From fd99390054cd9956d7f9cf3544dafb6e786e9f3c Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Sat, 25 Dec 2021 01:36:12 +0000 Subject: [PATCH] ResponseBody does not require Unpin --- awc/CHANGES.md | 2 + awc/src/lib.rs | 3 +- awc/src/responses/json_body.rs | 36 ++++++++--------- awc/src/responses/mod.rs | 5 ++- awc/src/responses/response_body.rs | 64 +++++++++++++++++++----------- 5 files changed, 63 insertions(+), 47 deletions(-) diff --git a/awc/CHANGES.md b/awc/CHANGES.md index 2864c9112..af0b3de16 100644 --- a/awc/CHANGES.md +++ b/awc/CHANGES.md @@ -4,6 +4,8 @@ - Rename `Connector::{ssl => openssl}`. [#2503] - Improve `Client` instantiation efficiency when using `openssl` by only building connectors once. [#2503] - `ClientRequest::send_body` now takes an `impl MessageBody`. [#2546] +- Rename `MessageBody => ResponseBody` to avoid conflicts with `MessageBody` trait. [#2546] +- `impl Future` for `ResponseBody` no longer requires the body type be `Unpin`. [#2546] - `impl Future` for `JsonBody` no longer requires the body type be `Unpin`. [#2546] - `impl Stream` for `ClientResponse` no longer requires the body type be `Unpin`. [#2546] - `ClientResponse` is no longer `Unpin`. [#2546] diff --git a/awc/src/lib.rs b/awc/src/lib.rs index 6c13bc129..cef8e03dc 100644 --- a/awc/src/lib.rs +++ b/awc/src/lib.rs @@ -128,7 +128,8 @@ pub use self::client::Connector; pub use self::connect::{BoxConnectorService, BoxedSocket, ConnectRequest, ConnectResponse}; pub use self::frozen::{FrozenClientRequest, FrozenSendBuilder}; pub use self::request::ClientRequest; -pub use self::responses::{ClientResponse, JsonBody, ResponseBody}; +#[allow(deprecated)] +pub use self::responses::{ClientResponse, JsonBody, MessageBody, ResponseBody}; pub use self::sender::SendClientRequest; use std::{convert::TryFrom, rc::Rc, time::Duration}; diff --git a/awc/src/responses/json_body.rs b/awc/src/responses/json_body.rs index 1d81dd2fc..3912324b6 100644 --- a/awc/src/responses/json_body.rs +++ b/awc/src/responses/json_body.rs @@ -16,15 +16,15 @@ use super::{read_body::ReadBody, ResponseTimeout, DEFAULT_BODY_LIMIT}; use crate::{error::JsonPayloadError, ClientResponse}; pin_project! { - /// Consumes body stream and parses JSON, resolving to a deserialized `T` value. + /// A `Future` that reads a body stream, parses JSON, resolving to a deserialized `T`. /// /// # Errors /// `Future` implementation returns error if: /// - content type is not `application/json`; /// - content length is greater than [limit](JsonBody::limit) (default: 2 MiB). - pub struct JsonBody { + pub struct JsonBody { #[pin] - body: Option>, + body: Option>, length: Option, timeout: ResponseTimeout, err: Option, @@ -32,13 +32,13 @@ pin_project! { } } -impl JsonBody +impl JsonBody where - B: Stream>, + S: Stream>, T: DeserializeOwned, { - /// Create `JsonBody` for request. - pub fn new(res: &mut ClientResponse) -> Self { + /// Creates a JSON body stream reader from a response by taking its payload. + pub fn new(res: &mut ClientResponse) -> Self { // check content-type let json = if let Ok(Some(mime)) = res.mime_type() { mime.subtype() == mime::JSON || mime.suffix() == Some(mime::JSON) @@ -56,19 +56,15 @@ where }; } - let mut len = None; - - if let Some(l) = res.headers().get(&header::CONTENT_LENGTH) { - if let Ok(s) = l.to_str() { - if let Ok(l) = s.parse::() { - len = Some(l) - } - } - } + let length = res + .headers() + .get(&header::CONTENT_LENGTH) + .and_then(|len_hdr| len_hdr.to_str().ok()) + .and_then(|len_str| len_str.parse::().ok()); JsonBody { body: Some(ReadBody::new(res.take_payload(), DEFAULT_BODY_LIMIT)), - length: len, + length, timeout: mem::take(&mut res.timeout), err: None, _phantom: PhantomData, @@ -85,9 +81,9 @@ where } } -impl Future for JsonBody +impl Future for JsonBody where - B: Stream>, + S: Stream>, T: DeserializeOwned, { type Output = Result; @@ -142,7 +138,7 @@ mod tests { } #[actix_rt::test] - async fn test_json_body() { + async fn read_json_body() { let mut req = TestResponse::default().finish(); let json = JsonBody::<_, MyObject>::new(&mut req).await; assert!(json_eq(json.err().unwrap(), JsonPayloadError::ContentType)); diff --git a/awc/src/responses/mod.rs b/awc/src/responses/mod.rs index 82d8ada5c..8a409b0de 100644 --- a/awc/src/responses/mod.rs +++ b/awc/src/responses/mod.rs @@ -25,7 +25,8 @@ mod read_body; mod response_body; pub use self::json_body::JsonBody; -pub use self::response_body::ResponseBody; +#[allow(deprecated)] +pub use self::response_body::{MessageBody, ResponseBody}; /// Client Response pub struct ClientResponse { @@ -249,7 +250,7 @@ impl fmt::Debug for ClientResponse { } } -/// Default body size limit: 2MiB +/// Default body size limit: 2 MiB const DEFAULT_BODY_LIMIT: usize = 2 * 1024 * 1024; #[cfg(test)] diff --git a/awc/src/responses/response_body.rs b/awc/src/responses/response_body.rs index e5437760e..8d9d1274a 100644 --- a/awc/src/responses/response_body.rs +++ b/awc/src/responses/response_body.rs @@ -8,22 +8,35 @@ use std::{ use actix_http::{error::PayloadError, header, HttpMessage}; use bytes::Bytes; use futures_core::Stream; +use pin_project_lite::pin_project; use super::{read_body::ReadBody, ResponseTimeout, DEFAULT_BODY_LIMIT}; use crate::ClientResponse; -/// Future that resolves to a complete response body. -pub struct ResponseBody { - body: Result, Option>, - length: Option, - timeout: ResponseTimeout, +pin_project! { + /// A `Future` that reads a body stream, resolving as [`Bytes`]. + /// + /// # Errors + /// `Future` implementation returns error if: + /// - content type is not `application/json`; + /// - content length is greater than [limit](JsonBody::limit) (default: 2 MiB). + pub struct ResponseBody { + #[pin] + body: Option>, + length: Option, + timeout: ResponseTimeout, + err: Option, + } } +#[deprecated(since = "3.0.0", note = "Renamed to `ResponseBody`.")] +pub type MessageBody = ResponseBody; + impl ResponseBody where S: Stream>, { - /// Create `MessageBody` for request. + /// Creates a body stream reader from a response by taking its payload. pub fn new(res: &mut ClientResponse) -> ResponseBody { let length = match res.headers().get(&header::CONTENT_LENGTH) { Some(value) => { @@ -38,9 +51,10 @@ where }; ResponseBody { + body: Some(ReadBody::new(res.take_payload(), DEFAULT_BODY_LIMIT)), length, timeout: mem::take(&mut res.timeout), - body: Ok(ReadBody::new(res.take_payload(), DEFAULT_BODY_LIMIT)), + err: None, } } @@ -48,44 +62,46 @@ where /// /// The default limit is 2 MiB. pub fn limit(mut self, limit: usize) -> Self { - if let Ok(ref mut body) = self.body { + if let Some(ref mut body) = self.body { body.limit = limit; } + self } - fn err(e: PayloadError) -> Self { + fn err(err: PayloadError) -> Self { ResponseBody { + body: None, length: None, timeout: ResponseTimeout::default(), - body: Err(Some(e)), + err: Some(err), } } } impl Future for ResponseBody where - S: Stream> + Unpin, + S: Stream>, { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); + let this = self.project(); - match this.body { - Err(ref mut err) => Poll::Ready(Err(err.take().unwrap())), - Ok(ref mut body) => { - if let Some(len) = this.length.take() { - if len > body.limit { - return Poll::Ready(Err(PayloadError::Overflow)); - } - } + if let Some(err) = this.err.take() { + return Poll::Ready(Err(err)); + } - this.timeout.poll_timeout(cx)?; - - Pin::new(body).poll(cx) + if let Some(len) = this.length.take() { + let body = Option::as_ref(&this.body).unwrap(); + if len > body.limit { + return Poll::Ready(Err(PayloadError::Overflow)); } } + + this.timeout.poll_timeout(cx)?; + + this.body.as_pin_mut().unwrap().poll(cx) } } @@ -99,7 +115,7 @@ mod tests { assert_impl_all!(ResponseBody<()>: Unpin); #[actix_rt::test] - async fn test_body() { + async fn read_body() { let mut req = TestResponse::with_header((header::CONTENT_LENGTH, "xxxx")).finish(); match req.body().await.err().unwrap() { PayloadError::UnknownLength => {}