Merge pull request #1358 from dunnock/msg-body

Adapt workspace crates to MessageBody::poll with Pin
This commit is contained in:
Yuki Okushi 2020-02-20 03:50:25 +09:00 committed by GitHub
commit 4e9caadc3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 16 additions and 14 deletions

View File

@ -108,7 +108,7 @@ codegen-units = 1
[patch.crates-io] [patch.crates-io]
actix-web = { path = "." } actix-web = { path = "." }
#actix-http = { path = "actix-http" } actix-http = { path = "actix-http" }
actix-http-test = { path = "test-server" } actix-http-test = { path = "test-server" }
actix-web-codegen = { path = "actix-web-codegen" } actix-web-codegen = { path = "actix-web-codegen" }
actix-cors = { path = "actix-cors" } actix-cors = { path = "actix-cors" }

View File

@ -81,9 +81,6 @@ async fn service(msg: ws::Frame) -> Result<ws::Message, Error> {
Ok(msg) Ok(msg)
} }
/*
Temporarily commented out due to dependency on actix-http-test
#[actix_rt::test] #[actix_rt::test]
async fn test_simple() { async fn test_simple() {
let ws_service = WsService::new(); let ws_service = WsService::new();
@ -195,5 +192,3 @@ async fn test_simple() {
assert!(ws_service.was_polled()); assert!(ws_service.was_polled());
} }
*/

View File

@ -238,15 +238,20 @@ where
} }
} }
use pin_project::{pin_project, pinned_drop};
#[pin_project(PinnedDrop)]
pub struct StreamLog<B> { pub struct StreamLog<B> {
#[pin]
body: ResponseBody<B>, body: ResponseBody<B>,
format: Option<Format>, format: Option<Format>,
size: usize, size: usize,
time: OffsetDateTime, time: OffsetDateTime,
} }
impl<B> Drop for StreamLog<B> { #[pinned_drop]
fn drop(&mut self) { impl<B> PinnedDrop for StreamLog<B> {
fn drop(self: Pin<&mut Self>) {
if let Some(ref format) = self.format { if let Some(ref format) = self.format {
let render = |fmt: &mut Formatter<'_>| { let render = |fmt: &mut Formatter<'_>| {
for unit in &format.0 { for unit in &format.0 {
@ -259,15 +264,17 @@ impl<B> Drop for StreamLog<B> {
} }
} }
impl<B: MessageBody> MessageBody for StreamLog<B> { impl<B: MessageBody> MessageBody for StreamLog<B> {
fn size(&self) -> BodySize { fn size(&self) -> BodySize {
self.body.size() self.body.size()
} }
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, Error>>> {
match self.body.poll_next(cx) { let this = self.project();
match this.body.poll_next(cx) {
Poll::Ready(Some(Ok(chunk))) => { Poll::Ready(Some(Ok(chunk))) => {
self.size += chunk.len(); *this.size += chunk.len();
Poll::Ready(Some(Ok(chunk))) Poll::Ready(Some(Ok(chunk)))
} }
val => val, val => val,

View File

@ -150,7 +150,7 @@ where
pub async fn read_response<S, B>(app: &mut S, req: Request) -> Bytes pub async fn read_response<S, B>(app: &mut S, req: Request) -> Bytes
where where
S: Service<Request = Request, Response = ServiceResponse<B>, Error = Error>, S: Service<Request = Request, Response = ServiceResponse<B>, Error = Error>,
B: MessageBody, B: MessageBody + Unpin,
{ {
let mut resp = app let mut resp = app
.call(req) .call(req)
@ -193,7 +193,7 @@ where
/// ``` /// ```
pub async fn read_body<B>(mut res: ServiceResponse<B>) -> Bytes pub async fn read_body<B>(mut res: ServiceResponse<B>) -> Bytes
where where
B: MessageBody, B: MessageBody + Unpin,
{ {
let mut body = res.take_body(); let mut body = res.take_body();
let mut bytes = BytesMut::new(); let mut bytes = BytesMut::new();
@ -251,7 +251,7 @@ where
pub async fn read_response_json<S, B, T>(app: &mut S, req: Request) -> T pub async fn read_response_json<S, B, T>(app: &mut S, req: Request) -> T
where where
S: Service<Request = Request, Response = ServiceResponse<B>, Error = Error>, S: Service<Request = Request, Response = ServiceResponse<B>, Error = Error>,
B: MessageBody, B: MessageBody + Unpin,
T: DeserializeOwned, T: DeserializeOwned,
{ {
let body = read_response(app, req).await; let body = read_response(app, req).await;