diff --git a/actix-http/src/body/boxed.rs b/actix-http/src/body/boxed.rs index 75a9e6b7a..f232c3332 100644 --- a/actix-http/src/body/boxed.rs +++ b/actix-http/src/body/boxed.rs @@ -70,7 +70,7 @@ mod tests { #[actix_rt::test] async fn nested_boxed_body() { - let body = Bytes::from(&[1, 2, 3]); + let body = Bytes::from_static(&[1, 2, 3]); let boxed_body = BoxBody::new(BoxBody::new(body)); assert_eq!( diff --git a/actix-http/src/body/either.rs b/actix-http/src/body/either.rs index e58d30237..ba50ff672 100644 --- a/actix-http/src/body/either.rs +++ b/actix-http/src/body/either.rs @@ -78,6 +78,6 @@ mod tests { #[test] fn either_body_works() { - EitherBody::left(()); + let body = EitherBody::new(()); } } diff --git a/actix-http/src/body/mod.rs b/actix-http/src/body/mod.rs index 83d20b1d0..b0a31609c 100644 --- a/actix-http/src/body/mod.rs +++ b/actix-http/src/body/mod.rs @@ -1,12 +1,5 @@ //! Traits and structures to aid consuming and writing HTTP payloads. -use std::task::Poll; - -use actix_rt::pin; -use actix_utils::future::poll_fn; -use bytes::{Bytes, BytesMut}; -use futures_core::ready; - #[allow(clippy::module_inception)] mod body; mod body_stream; @@ -15,6 +8,7 @@ mod either; mod message_body; mod size; mod sized_stream; +mod utils; pub use self::body::AnyBody; #[allow(deprecated)] @@ -26,51 +20,7 @@ pub use self::message_body::MessageBody; pub(crate) use self::message_body::MessageBodyMapErr; pub use self::size::BodySize; pub use self::sized_stream::SizedStream; - -/// Collects the body produced by a `MessageBody` implementation into `Bytes`. -/// -/// Any errors produced by the body stream are returned immediately. -/// -/// # Examples -/// ``` -/// use actix_http::body::{AnyBody, to_bytes}; -/// use bytes::Bytes; -/// -/// # async fn test_to_bytes() { -/// let body = AnyBody::none(); -/// let bytes = to_bytes(body).await.unwrap(); -/// assert!(bytes.is_empty()); -/// -/// let body = AnyBody::copy_from_slice(b"123"); -/// let bytes = to_bytes(body).await.unwrap(); -/// assert_eq!(bytes, b"123"[..]); -/// # } -/// ``` -pub async fn to_bytes(body: B) -> Result { - let cap = match body.size() { - BodySize::None | BodySize::Sized(0) => return Ok(Bytes::new()), - BodySize::Sized(size) => size as usize, - // good enough first guess for chunk size - BodySize::Stream => 32_768, - }; - - let mut buf = BytesMut::with_capacity(cap); - - pin!(body); - - poll_fn(|cx| loop { - let body = body.as_mut(); - - match ready!(body.poll_next(cx)) { - Some(Ok(bytes)) => buf.extend_from_slice(&*bytes), - None => return Poll::Ready(Ok(())), - Some(Err(err)) => return Poll::Ready(Err(err)), - } - }) - .await?; - - Ok(buf.freeze()) -} +pub use self::utils::to_bytes; #[cfg(test)] mod tests { @@ -263,15 +213,4 @@ mod tests { let not_body = resp_body.downcast_ref::<()>(); assert!(not_body.is_none()); } - - #[actix_rt::test] - async fn test_to_bytes() { - let body = AnyBody::empty(); - let bytes = to_bytes(body).await.unwrap(); - assert!(bytes.is_empty()); - - let body = AnyBody::copy_from_slice(b"123"); - let bytes = to_bytes(body).await.unwrap(); - assert_eq!(bytes, b"123"[..]); - } } diff --git a/actix-http/src/body/utils.rs b/actix-http/src/body/utils.rs new file mode 100644 index 000000000..ed85fac63 --- /dev/null +++ b/actix-http/src/body/utils.rs @@ -0,0 +1,78 @@ +use std::task::Poll; + +use actix_rt::pin; +use actix_utils::future::poll_fn; +use bytes::{Bytes, BytesMut}; +use futures_core::ready; + +use super::{BodySize, MessageBody}; + +/// Collects the body produced by a `MessageBody` implementation into `Bytes`. +/// +/// Any errors produced by the body stream are returned immediately. +/// +/// # Examples +/// ``` +/// use actix_http::body::{AnyBody, to_bytes}; +/// use bytes::Bytes; +/// +/// # async fn test_to_bytes() { +/// let body = AnyBody::none(); +/// let bytes = to_bytes(body).await.unwrap(); +/// assert!(bytes.is_empty()); +/// +/// let body = AnyBody::copy_from_slice(b"123"); +/// let bytes = to_bytes(body).await.unwrap(); +/// assert_eq!(bytes, b"123"[..]); +/// # } +/// ``` +pub async fn to_bytes(body: B) -> Result { + let cap = match body.size() { + BodySize::None | BodySize::Sized(0) => return Ok(Bytes::new()), + BodySize::Sized(size) => size as usize, + // good enough first guess for chunk size + BodySize::Stream => 32_768, + }; + + let mut buf = BytesMut::with_capacity(cap); + + pin!(body); + + poll_fn(|cx| loop { + let body = body.as_mut(); + + match ready!(body.poll_next(cx)) { + Some(Ok(bytes)) => buf.extend_from_slice(&*bytes), + None => return Poll::Ready(Ok(())), + Some(Err(err)) => return Poll::Ready(Err(err)), + } + }) + .await?; + + Ok(buf.freeze()) +} + +#[cfg(test)] +mod test { + use futures_util::{stream, StreamExt as _}; + + use super::*; + use crate::{body::BodyStream, Error}; + + #[actix_rt::test] + async fn test_to_bytes() { + let bytes = to_bytes(()).await.unwrap(); + assert!(bytes.is_empty()); + + let body = Bytes::from_static(b"123"); + let bytes = to_bytes(body).await.unwrap(); + assert_eq!(bytes, b"123"[..]); + + let stream = + stream::iter(vec![Bytes::from_static(b"123"), Bytes::from_static(b"abc")]) + .map(Ok::<_, Error>); + let body = BodyStream::new(stream); + let bytes = to_bytes(body).await.unwrap(); + assert_eq!(bytes, b"123abc"[..]); + } +}