diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 2beda3dcc..c3dc1de0f 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -3,9 +3,14 @@ ## Unreleased - 2021-xx-xx ### Added * `AnyBody::empty` for quickly creating an empty body. [#2446] +* `impl Clone` for `AnyBody where S: Clone`. [#????] +* `AnyBody::into_boxed` for quickly converting to a type-erased, boxed body type. [#????] ### Changed * Rename `AnyBody::{Message => Stream}`. [#2446] +* Rename `AnyBody::{from_message => new_boxed}`. [#????] +* Rename `BoxAnyBody` to `BoxBody` [#????] +* Change representation of `AnyBody` to include a type parameter in `Stream` variant. Defaults to `BoxBody`. [#????] ### Removed * `AnyBody::Empty`; an empty body can now only be represented as a zero-length `Bytes` variant. [#2446] diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index 784312445..27a147379 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -92,6 +92,7 @@ regex = "1.3" rustls-pemfile = "0.2" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +static_assertions = "1" tls-openssl = { package = "openssl", version = "0.10.9" } tls-rustls = { package = "rustls", version = "0.20.0" } tokio = { version = "1.2", features = ["net", "rt"] } diff --git a/actix-http/src/body/body.rs b/actix-http/src/body/body.rs index 32b464486..6b6263889 100644 --- a/actix-http/src/body/body.rs +++ b/actix-http/src/body/body.rs @@ -16,7 +16,8 @@ use super::{BodySize, BodyStream, MessageBody, MessageBodyMapErr, SizedStream}; pub type Body = AnyBody; /// Represents various types of HTTP message body. -pub enum AnyBody { +#[derive(Clone)] +pub enum AnyBody { /// Empty response. `Content-Length` header is not set. None, @@ -24,7 +25,7 @@ pub enum AnyBody { Bytes(Bytes), /// Generic message body. - Stream(BoxAnyBody), + Stream(S), } impl AnyBody { @@ -33,22 +34,45 @@ impl AnyBody { Self::Bytes(Bytes::new()) } - /// Create body from slice (copy) - pub fn from_slice(s: &[u8]) -> Self { - Self::Bytes(Bytes::copy_from_slice(s)) - } - - /// Create body from generic message body. - pub fn from_message(body: B) -> Self + /// Create boxed body from generic message body. + pub fn new_boxed(body: B) -> Self where B: MessageBody + 'static, B::Error: Into>, { - Self::Stream(BoxAnyBody::from_body(body)) + Self::Stream(BoxBody::from_body(body)) + } + + /// Create body from slice (copy) + pub fn from_slice(s: &[u8]) -> Self { + Self::Bytes(Bytes::copy_from_slice(s)) } } -impl MessageBody for AnyBody { +impl AnyBody +where + B: MessageBody + 'static, + B::Error: Into>, +{ + /// Create body from generic message body. + pub fn new(body: B) -> Self { + Self::Stream(body) + } + + pub fn into_boxed(self) -> AnyBody { + match self { + AnyBody::None => AnyBody::new_boxed(()), + AnyBody::Bytes(body) => AnyBody::new_boxed(body), + AnyBody::Stream(body) => AnyBody::new_boxed(body), + } + } +} + +impl MessageBody for AnyBody +where + S: MessageBody + Unpin, + S::Error: StdError + 'static, +{ type Error = Error; fn size(&self) -> BodySize { @@ -74,8 +98,7 @@ impl MessageBody for AnyBody { } } - AnyBody::Stream(body) => body - .as_pin_mut() + AnyBody::Stream(body) => Pin::new(body) .poll_next(cx) .map_err(|err| Error::new_body().with_cause(err)), } @@ -95,25 +118,25 @@ impl PartialEq for AnyBody { } } -impl fmt::Debug for AnyBody { +impl fmt::Debug for AnyBody { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { AnyBody::None => write!(f, "AnyBody::None"), - AnyBody::Bytes(ref b) => write!(f, "AnyBody::Bytes({:?})", b), - AnyBody::Stream(_) => write!(f, "AnyBody::Message(_)"), + AnyBody::Bytes(ref bytes) => write!(f, "AnyBody::Bytes({:?})", bytes), + AnyBody::Stream(ref stream) => write!(f, "AnyBody::Message({:?})", stream), } } } impl From<&'static str> for AnyBody { - fn from(s: &'static str) -> Body { - AnyBody::Bytes(Bytes::from_static(s.as_ref())) + fn from(string: &'static str) -> Body { + AnyBody::Bytes(Bytes::from_static(string.as_ref())) } } impl From<&'static [u8]> for AnyBody { - fn from(s: &'static [u8]) -> Body { - AnyBody::Bytes(Bytes::from_static(s)) + fn from(bytes: &'static [u8]) -> Body { + AnyBody::Bytes(Bytes::from_static(bytes)) } } @@ -124,20 +147,20 @@ impl From> for AnyBody { } impl From for AnyBody { - fn from(s: String) -> Body { - s.into_bytes().into() + fn from(string: String) -> Body { + string.into_bytes().into() } } impl From<&'_ String> for AnyBody { - fn from(s: &String) -> Body { - AnyBody::Bytes(Bytes::copy_from_slice(AsRef::<[u8]>::as_ref(&s))) + fn from(string: &String) -> Body { + AnyBody::Bytes(Bytes::copy_from_slice(AsRef::<[u8]>::as_ref(&string))) } } impl From> for AnyBody { - fn from(s: Cow<'_, str>) -> Body { - match s { + fn from(string: Cow<'_, str>) -> Body { + match string { Cow::Owned(s) => AnyBody::from(s), Cow::Borrowed(s) => { AnyBody::Bytes(Bytes::copy_from_slice(AsRef::<[u8]>::as_ref(s))) @@ -147,41 +170,41 @@ impl From> for AnyBody { } impl From for AnyBody { - fn from(s: Bytes) -> Body { - AnyBody::Bytes(s) + fn from(bytes: Bytes) -> Body { + AnyBody::Bytes(bytes) } } impl From for AnyBody { - fn from(s: BytesMut) -> Body { - AnyBody::Bytes(s.freeze()) + fn from(bytes: BytesMut) -> Body { + AnyBody::Bytes(bytes.freeze()) } } impl From> for AnyBody where S: Stream> + 'static, - E: Into> + 'static, + E: StdError + 'static, { - fn from(s: SizedStream) -> Body { - AnyBody::from_message(s) + fn from(stream: SizedStream) -> Body { + AnyBody::new_boxed(stream) } } impl From> for AnyBody where S: Stream> + 'static, - E: Into> + 'static, + E: StdError + 'static, { - fn from(s: BodyStream) -> Body { - AnyBody::from_message(s) + fn from(stream: BodyStream) -> Body { + AnyBody::new_boxed(stream) } } /// A boxed message body with boxed errors. -pub struct BoxAnyBody(Pin>>>); +pub struct BoxBody(Pin>>>); -impl BoxAnyBody { +impl BoxBody { /// Boxes a `MessageBody` and any errors it generates. pub fn from_body(body: B) -> Self where @@ -195,18 +218,18 @@ impl BoxAnyBody { /// Returns a mutable pinned reference to the inner message body type. pub fn as_pin_mut( &mut self, - ) -> Pin<&mut (dyn MessageBody>)> { + ) -> Pin<&mut (dyn MessageBody>)> { self.0.as_mut() } } -impl fmt::Debug for BoxAnyBody { +impl fmt::Debug for BoxBody { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str("BoxAnyBody(dyn MessageBody)") } } -impl MessageBody for BoxAnyBody { +impl MessageBody for BoxBody { type Error = Error; fn size(&self) -> BodySize { @@ -223,3 +246,18 @@ impl MessageBody for BoxAnyBody { .map_err(|err| Error::new_body().with_cause(err)) } } + +#[cfg(test)] +mod tests { + use static_assertions::{assert_impl_all, assert_not_impl_all}; + + use super::*; + + assert_impl_all!(AnyBody<()>: MessageBody, fmt::Debug, Send, Sync); + assert_impl_all!(AnyBody: MessageBody, fmt::Debug, Send, Sync); + assert_impl_all!(AnyBody: MessageBody, fmt::Debug); + assert_impl_all!(BoxBody: MessageBody, fmt::Debug); + + assert_not_impl_all!(AnyBody: Send, Sync); + assert_not_impl_all!(BoxBody: Send, Sync); +} diff --git a/actix-http/src/body/body_stream.rs b/actix-http/src/body/body_stream.rs index f726f4475..31de9b48f 100644 --- a/actix-http/src/body/body_stream.rs +++ b/actix-http/src/body/body_stream.rs @@ -75,10 +75,22 @@ mod tests { use derive_more::{Display, Error}; use futures_core::ready; use futures_util::{stream, FutureExt as _}; + use static_assertions::{assert_impl_all, assert_not_impl_all}; use super::*; use crate::body::to_bytes; + assert_impl_all!(BodyStream>>: MessageBody); + assert_impl_all!(BodyStream>>: MessageBody); + assert_impl_all!(BodyStream>>: MessageBody); + assert_impl_all!(BodyStream>>: MessageBody); + assert_impl_all!(BodyStream>>: MessageBody); + + assert_not_impl_all!(BodyStream>: MessageBody); + assert_not_impl_all!(BodyStream>: MessageBody); + // crate::Error is not Clone + assert_not_impl_all!(BodyStream>>: MessageBody); + #[actix_rt::test] async fn skips_empty_chunks() { let body = BodyStream::new(stream::iter( @@ -124,6 +136,30 @@ mod tests { assert!(matches!(to_bytes(body).await, Err(StreamErr))); } + #[actix_rt::test] + async fn stream_string_error() { + // `&'static str` does not impl `Error` + // but it does impl `Into>` + + let body = BodyStream::new(stream::once(async { Err("stringy error") })); + assert!(matches!(to_bytes(body).await, Err("stringy error"))); + } + + #[actix_rt::test] + async fn stream_boxed_error() { + // `Box` does not impl `Error` + // but it does impl `Into>` + + let body = BodyStream::new(stream::once(async { + Err(Box::::from("stringy error")) + })); + + assert_eq!( + to_bytes(body).await.unwrap_err().to_string(), + "stringy error" + ); + } + #[actix_rt::test] async fn stream_delayed_error() { let body = diff --git a/actix-http/src/body/mod.rs b/actix-http/src/body/mod.rs index 0d5b0f079..59c0eaf79 100644 --- a/actix-http/src/body/mod.rs +++ b/actix-http/src/body/mod.rs @@ -15,7 +15,7 @@ mod response_body; mod size; mod sized_stream; -pub use self::body::{AnyBody, Body, BoxAnyBody}; +pub use self::body::{AnyBody, Body, BoxBody}; pub use self::body_stream::BodyStream; pub use self::message_body::MessageBody; pub(crate) use self::message_body::MessageBodyMapErr; diff --git a/actix-http/src/body/sized_stream.rs b/actix-http/src/body/sized_stream.rs index b6ceb32fe..b92de44cc 100644 --- a/actix-http/src/body/sized_stream.rs +++ b/actix-http/src/body/sized_stream.rs @@ -72,10 +72,22 @@ mod tests { use actix_rt::pin; use actix_utils::future::poll_fn; use futures_util::stream; + use static_assertions::{assert_impl_all, assert_not_impl_all}; use super::*; use crate::body::to_bytes; + assert_impl_all!(SizedStream>>: MessageBody); + assert_impl_all!(SizedStream>>: MessageBody); + assert_impl_all!(SizedStream>>: MessageBody); + assert_impl_all!(SizedStream>>: MessageBody); + assert_impl_all!(SizedStream>>: MessageBody); + + assert_not_impl_all!(SizedStream>: MessageBody); + assert_not_impl_all!(SizedStream>: MessageBody); + // crate::Error is not Clone + assert_not_impl_all!(SizedStream>>: MessageBody); + #[actix_rt::test] async fn skips_empty_chunks() { let body = SizedStream::new( @@ -119,4 +131,37 @@ mod tests { assert_eq!(to_bytes(body).await.ok(), Some(Bytes::from("12"))); } + + #[actix_rt::test] + async fn stream_string_error() { + // `&'static str` does not impl `Error` + // but it does impl `Into>` + + let body = SizedStream::new(0, stream::once(async { Err("stringy error") })); + assert_eq!(to_bytes(body).await, Ok(Bytes::new())); + + let body = SizedStream::new(1, stream::once(async { Err("stringy error") })); + assert!(matches!(to_bytes(body).await, Err("stringy error"))); + } + + #[actix_rt::test] + async fn stream_boxed_error() { + // `Box` does not impl `Error` + // but it does impl `Into>` + + let body = SizedStream::new( + 0, + stream::once(async { Err(Box::::from("stringy error")) }), + ); + assert_eq!(to_bytes(body).await.unwrap(), Bytes::new()); + + let body = SizedStream::new( + 1, + stream::once(async { Err(Box::::from("stringy error")) }), + ); + assert_eq!( + to_bytes(body).await.unwrap_err().to_string(), + "stringy error" + ); + } } diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index 6cb034b76..b145cc26a 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -24,7 +24,7 @@ use flate2::write::{GzEncoder, ZlibEncoder}; use zstd::stream::write::Encoder as ZstdEncoder; use crate::{ - body::{Body, BodySize, BoxAnyBody, MessageBody, ResponseBody}, + body::{Body, BodySize, BoxBody, MessageBody, ResponseBody}, http::{ header::{ContentEncoding, CONTENT_ENCODING}, HeaderValue, StatusCode, @@ -100,7 +100,7 @@ impl Encoder { enum EncoderBody { Bytes(Bytes), Stream(#[pin] B), - BoxedStream(BoxAnyBody), + BoxedStream(BoxBody), } impl MessageBody for EncoderBody diff --git a/actix-http/src/response_builder.rs b/actix-http/src/response_builder.rs index a1cb1a423..e934f94dc 100644 --- a/actix-http/src/response_builder.rs +++ b/actix-http/src/response_builder.rs @@ -262,7 +262,7 @@ impl ResponseBuilder { S: Stream> + 'static, E: Into> + 'static, { - self.body(AnyBody::from_message(BodyStream::new(stream))) + self.body(AnyBody::new_boxed(BodyStream::new(stream))) } /// Generate response with an empty body. diff --git a/awc/src/sender.rs b/awc/src/sender.rs index fcd0c71af..02870aea9 100644 --- a/awc/src/sender.rs +++ b/awc/src/sender.rs @@ -9,7 +9,7 @@ use std::{ }; use actix_http::{ - body::{Body, BodyStream}, + body::{AnyBody, Body, BodyStream}, http::{ header::{self, HeaderMap, HeaderName, IntoHeaderValue}, Error as HttpError, @@ -286,7 +286,7 @@ impl RequestSender { response_decompress, timeout, config, - Body::from_message(BodyStream::new(stream)), + AnyBody::new_boxed(BodyStream::new(stream)), ) } diff --git a/src/middleware/compat.rs b/src/middleware/compat.rs index 0a6256fe2..752e90f94 100644 --- a/src/middleware/compat.rs +++ b/src/middleware/compat.rs @@ -7,7 +7,7 @@ use std::{ task::{Context, Poll}, }; -use actix_http::body::{Body, MessageBody}; +use actix_http::body::{AnyBody, MessageBody}; use actix_service::{Service, Transform}; use futures_core::{future::LocalBoxFuture, ready}; @@ -124,7 +124,7 @@ where B::Error: Into>, { fn map_body(self) -> ServiceResponse { - self.map_body(|_, body| Body::from_message(body)) + self.map_body(|_, body| AnyBody::new_boxed(body)) } } diff --git a/src/response/builder.rs b/src/response/builder.rs index f6099a019..e42d85f59 100644 --- a/src/response/builder.rs +++ b/src/response/builder.rs @@ -354,10 +354,10 @@ impl HttpResponseBuilder { #[inline] pub fn streaming(&mut self, stream: S) -> HttpResponse where - S: Stream> + Unpin + 'static, + S: Stream> + 'static, E: Into> + 'static, { - self.body(AnyBody::from_message(BodyStream::new(stream))) + self.body(AnyBody::new_boxed(BodyStream::new(stream))) } /// Set a json body and generate `Response`