ResponseBody does not require Unpin

This commit is contained in:
Rob Ede 2021-12-25 01:36:12 +00:00
parent 27f3d8f517
commit fd99390054
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
5 changed files with 63 additions and 47 deletions

View File

@ -4,6 +4,8 @@
- Rename `Connector::{ssl => openssl}`. [#2503] - Rename `Connector::{ssl => openssl}`. [#2503]
- Improve `Client` instantiation efficiency when using `openssl` by only building connectors once. [#2503] - Improve `Client` instantiation efficiency when using `openssl` by only building connectors once. [#2503]
- `ClientRequest::send_body` now takes an `impl MessageBody`. [#2546] - `ClientRequest::send_body` now takes an `impl MessageBody`. [#2546]
- Rename `MessageBody => ResponseBody` to avoid conflicts with `MessageBody` trait. [#2546]
- `impl Future` for `ResponseBody` no longer requires the body type be `Unpin`. [#2546]
- `impl Future` for `JsonBody` no longer requires the body type be `Unpin`. [#2546] - `impl Future` for `JsonBody` no longer requires the body type be `Unpin`. [#2546]
- `impl Stream` for `ClientResponse` no longer requires the body type be `Unpin`. [#2546] - `impl Stream` for `ClientResponse` no longer requires the body type be `Unpin`. [#2546]
- `ClientResponse` is no longer `Unpin`. [#2546] - `ClientResponse` is no longer `Unpin`. [#2546]

View File

@ -128,7 +128,8 @@ pub use self::client::Connector;
pub use self::connect::{BoxConnectorService, BoxedSocket, ConnectRequest, ConnectResponse}; pub use self::connect::{BoxConnectorService, BoxedSocket, ConnectRequest, ConnectResponse};
pub use self::frozen::{FrozenClientRequest, FrozenSendBuilder}; pub use self::frozen::{FrozenClientRequest, FrozenSendBuilder};
pub use self::request::ClientRequest; pub use self::request::ClientRequest;
pub use self::responses::{ClientResponse, JsonBody, ResponseBody}; #[allow(deprecated)]
pub use self::responses::{ClientResponse, JsonBody, MessageBody, ResponseBody};
pub use self::sender::SendClientRequest; pub use self::sender::SendClientRequest;
use std::{convert::TryFrom, rc::Rc, time::Duration}; use std::{convert::TryFrom, rc::Rc, time::Duration};

View File

@ -16,15 +16,15 @@ use super::{read_body::ReadBody, ResponseTimeout, DEFAULT_BODY_LIMIT};
use crate::{error::JsonPayloadError, ClientResponse}; use crate::{error::JsonPayloadError, ClientResponse};
pin_project! { pin_project! {
/// Consumes body stream and parses JSON, resolving to a deserialized `T` value. /// A `Future` that reads a body stream, parses JSON, resolving to a deserialized `T`.
/// ///
/// # Errors /// # Errors
/// `Future` implementation returns error if: /// `Future` implementation returns error if:
/// - content type is not `application/json`; /// - content type is not `application/json`;
/// - content length is greater than [limit](JsonBody::limit) (default: 2 MiB). /// - content length is greater than [limit](JsonBody::limit) (default: 2 MiB).
pub struct JsonBody<B, T> { pub struct JsonBody<S, T> {
#[pin] #[pin]
body: Option<ReadBody<B>>, body: Option<ReadBody<S>>,
length: Option<usize>, length: Option<usize>,
timeout: ResponseTimeout, timeout: ResponseTimeout,
err: Option<JsonPayloadError>, err: Option<JsonPayloadError>,
@ -32,13 +32,13 @@ pin_project! {
} }
} }
impl<B, T> JsonBody<B, T> impl<S, T> JsonBody<S, T>
where where
B: Stream<Item = Result<Bytes, PayloadError>>, S: Stream<Item = Result<Bytes, PayloadError>>,
T: DeserializeOwned, T: DeserializeOwned,
{ {
/// Create `JsonBody` for request. /// Creates a JSON body stream reader from a response by taking its payload.
pub fn new(res: &mut ClientResponse<B>) -> Self { pub fn new(res: &mut ClientResponse<S>) -> Self {
// check content-type // check content-type
let json = if let Ok(Some(mime)) = res.mime_type() { let json = if let Ok(Some(mime)) = res.mime_type() {
mime.subtype() == mime::JSON || mime.suffix() == Some(mime::JSON) mime.subtype() == mime::JSON || mime.suffix() == Some(mime::JSON)
@ -56,19 +56,15 @@ where
}; };
} }
let mut len = None; let length = res
.headers()
if let Some(l) = res.headers().get(&header::CONTENT_LENGTH) { .get(&header::CONTENT_LENGTH)
if let Ok(s) = l.to_str() { .and_then(|len_hdr| len_hdr.to_str().ok())
if let Ok(l) = s.parse::<usize>() { .and_then(|len_str| len_str.parse::<usize>().ok());
len = Some(l)
}
}
}
JsonBody { JsonBody {
body: Some(ReadBody::new(res.take_payload(), DEFAULT_BODY_LIMIT)), body: Some(ReadBody::new(res.take_payload(), DEFAULT_BODY_LIMIT)),
length: len, length,
timeout: mem::take(&mut res.timeout), timeout: mem::take(&mut res.timeout),
err: None, err: None,
_phantom: PhantomData, _phantom: PhantomData,
@ -85,9 +81,9 @@ where
} }
} }
impl<B, T> Future for JsonBody<B, T> impl<S, T> Future for JsonBody<S, T>
where where
B: Stream<Item = Result<Bytes, PayloadError>>, S: Stream<Item = Result<Bytes, PayloadError>>,
T: DeserializeOwned, T: DeserializeOwned,
{ {
type Output = Result<T, JsonPayloadError>; type Output = Result<T, JsonPayloadError>;
@ -142,7 +138,7 @@ mod tests {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_json_body() { async fn read_json_body() {
let mut req = TestResponse::default().finish(); let mut req = TestResponse::default().finish();
let json = JsonBody::<_, MyObject>::new(&mut req).await; let json = JsonBody::<_, MyObject>::new(&mut req).await;
assert!(json_eq(json.err().unwrap(), JsonPayloadError::ContentType)); assert!(json_eq(json.err().unwrap(), JsonPayloadError::ContentType));

View File

@ -25,7 +25,8 @@ mod read_body;
mod response_body; mod response_body;
pub use self::json_body::JsonBody; pub use self::json_body::JsonBody;
pub use self::response_body::ResponseBody; #[allow(deprecated)]
pub use self::response_body::{MessageBody, ResponseBody};
/// Client Response /// Client Response
pub struct ClientResponse<S = BoxedPayloadStream> { pub struct ClientResponse<S = BoxedPayloadStream> {
@ -249,7 +250,7 @@ impl<S> fmt::Debug for ClientResponse<S> {
} }
} }
/// Default body size limit: 2MiB /// Default body size limit: 2 MiB
const DEFAULT_BODY_LIMIT: usize = 2 * 1024 * 1024; const DEFAULT_BODY_LIMIT: usize = 2 * 1024 * 1024;
#[cfg(test)] #[cfg(test)]

View File

@ -8,22 +8,35 @@ use std::{
use actix_http::{error::PayloadError, header, HttpMessage}; use actix_http::{error::PayloadError, header, HttpMessage};
use bytes::Bytes; use bytes::Bytes;
use futures_core::Stream; use futures_core::Stream;
use pin_project_lite::pin_project;
use super::{read_body::ReadBody, ResponseTimeout, DEFAULT_BODY_LIMIT}; use super::{read_body::ReadBody, ResponseTimeout, DEFAULT_BODY_LIMIT};
use crate::ClientResponse; use crate::ClientResponse;
/// Future that resolves to a complete response body. pin_project! {
pub struct ResponseBody<S> { /// A `Future` that reads a body stream, resolving as [`Bytes`].
body: Result<ReadBody<S>, Option<PayloadError>>, ///
length: Option<usize>, /// # Errors
timeout: ResponseTimeout, /// `Future` implementation returns error if:
/// - content type is not `application/json`;
/// - content length is greater than [limit](JsonBody::limit) (default: 2 MiB).
pub struct ResponseBody<S> {
#[pin]
body: Option<ReadBody<S>>,
length: Option<usize>,
timeout: ResponseTimeout,
err: Option<PayloadError>,
}
} }
#[deprecated(since = "3.0.0", note = "Renamed to `ResponseBody`.")]
pub type MessageBody<B> = ResponseBody<B>;
impl<S> ResponseBody<S> impl<S> ResponseBody<S>
where where
S: Stream<Item = Result<Bytes, PayloadError>>, S: Stream<Item = Result<Bytes, PayloadError>>,
{ {
/// Create `MessageBody` for request. /// Creates a body stream reader from a response by taking its payload.
pub fn new(res: &mut ClientResponse<S>) -> ResponseBody<S> { pub fn new(res: &mut ClientResponse<S>) -> ResponseBody<S> {
let length = match res.headers().get(&header::CONTENT_LENGTH) { let length = match res.headers().get(&header::CONTENT_LENGTH) {
Some(value) => { Some(value) => {
@ -38,9 +51,10 @@ where
}; };
ResponseBody { ResponseBody {
body: Some(ReadBody::new(res.take_payload(), DEFAULT_BODY_LIMIT)),
length, length,
timeout: mem::take(&mut res.timeout), timeout: mem::take(&mut res.timeout),
body: Ok(ReadBody::new(res.take_payload(), DEFAULT_BODY_LIMIT)), err: None,
} }
} }
@ -48,44 +62,46 @@ where
/// ///
/// The default limit is 2 MiB. /// The default limit is 2 MiB.
pub fn limit(mut self, limit: usize) -> Self { pub fn limit(mut self, limit: usize) -> Self {
if let Ok(ref mut body) = self.body { if let Some(ref mut body) = self.body {
body.limit = limit; body.limit = limit;
} }
self self
} }
fn err(e: PayloadError) -> Self { fn err(err: PayloadError) -> Self {
ResponseBody { ResponseBody {
body: None,
length: None, length: None,
timeout: ResponseTimeout::default(), timeout: ResponseTimeout::default(),
body: Err(Some(e)), err: Some(err),
} }
} }
} }
impl<S> Future for ResponseBody<S> impl<S> Future for ResponseBody<S>
where where
S: Stream<Item = Result<Bytes, PayloadError>> + Unpin, S: Stream<Item = Result<Bytes, PayloadError>>,
{ {
type Output = Result<Bytes, PayloadError>; type Output = Result<Bytes, PayloadError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut(); let this = self.project();
match this.body { if let Some(err) = this.err.take() {
Err(ref mut err) => Poll::Ready(Err(err.take().unwrap())), return Poll::Ready(Err(err));
Ok(ref mut body) => { }
if let Some(len) = this.length.take() {
if len > body.limit {
return Poll::Ready(Err(PayloadError::Overflow));
}
}
this.timeout.poll_timeout(cx)?; if let Some(len) = this.length.take() {
let body = Option::as_ref(&this.body).unwrap();
Pin::new(body).poll(cx) if len > body.limit {
return Poll::Ready(Err(PayloadError::Overflow));
} }
} }
this.timeout.poll_timeout(cx)?;
this.body.as_pin_mut().unwrap().poll(cx)
} }
} }
@ -99,7 +115,7 @@ mod tests {
assert_impl_all!(ResponseBody<()>: Unpin); assert_impl_all!(ResponseBody<()>: Unpin);
#[actix_rt::test] #[actix_rt::test]
async fn test_body() { async fn read_body() {
let mut req = TestResponse::with_header((header::CONTENT_LENGTH, "xxxx")).finish(); let mut req = TestResponse::with_header((header::CONTENT_LENGTH, "xxxx")).finish();
match req.body().await.err().unwrap() { match req.body().await.err().unwrap() {
PayloadError::UnknownLength => {} PayloadError::UnknownLength => {}