move to_bytes to utils mod

This commit is contained in:
Rob Ede 2021-11-24 02:01:48 +00:00
parent d36ca3a133
commit 21e3e1d620
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
4 changed files with 82 additions and 65 deletions

View File

@ -70,7 +70,7 @@ mod tests {
#[actix_rt::test] #[actix_rt::test]
async fn nested_boxed_body() { async fn nested_boxed_body() {
let body = Bytes::from(&[1, 2, 3]); let body = Bytes::from_static(&[1, 2, 3]);
let boxed_body = BoxBody::new(BoxBody::new(body)); let boxed_body = BoxBody::new(BoxBody::new(body));
assert_eq!( assert_eq!(

View File

@ -78,6 +78,6 @@ mod tests {
#[test] #[test]
fn either_body_works() { fn either_body_works() {
EitherBody::left(()); let body = EitherBody::new(());
} }
} }

View File

@ -1,12 +1,5 @@
//! Traits and structures to aid consuming and writing HTTP payloads. //! Traits and structures to aid consuming and writing HTTP payloads.
use std::task::Poll;
use actix_rt::pin;
use actix_utils::future::poll_fn;
use bytes::{Bytes, BytesMut};
use futures_core::ready;
#[allow(clippy::module_inception)] #[allow(clippy::module_inception)]
mod body; mod body;
mod body_stream; mod body_stream;
@ -15,6 +8,7 @@ mod either;
mod message_body; mod message_body;
mod size; mod size;
mod sized_stream; mod sized_stream;
mod utils;
pub use self::body::AnyBody; pub use self::body::AnyBody;
#[allow(deprecated)] #[allow(deprecated)]
@ -26,51 +20,7 @@ pub use self::message_body::MessageBody;
pub(crate) use self::message_body::MessageBodyMapErr; pub(crate) use self::message_body::MessageBodyMapErr;
pub use self::size::BodySize; pub use self::size::BodySize;
pub use self::sized_stream::SizedStream; pub use self::sized_stream::SizedStream;
pub use self::utils::to_bytes;
/// Collects the body produced by a `MessageBody` implementation into `Bytes`.
///
/// Any errors produced by the body stream are returned immediately.
///
/// # Examples
/// ```
/// use actix_http::body::{AnyBody, to_bytes};
/// use bytes::Bytes;
///
/// # async fn test_to_bytes() {
/// let body = AnyBody::none();
/// let bytes = to_bytes(body).await.unwrap();
/// assert!(bytes.is_empty());
///
/// let body = AnyBody::copy_from_slice(b"123");
/// let bytes = to_bytes(body).await.unwrap();
/// assert_eq!(bytes, b"123"[..]);
/// # }
/// ```
pub async fn to_bytes<B: MessageBody>(body: B) -> Result<Bytes, B::Error> {
let cap = match body.size() {
BodySize::None | BodySize::Sized(0) => return Ok(Bytes::new()),
BodySize::Sized(size) => size as usize,
// good enough first guess for chunk size
BodySize::Stream => 32_768,
};
let mut buf = BytesMut::with_capacity(cap);
pin!(body);
poll_fn(|cx| loop {
let body = body.as_mut();
match ready!(body.poll_next(cx)) {
Some(Ok(bytes)) => buf.extend_from_slice(&*bytes),
None => return Poll::Ready(Ok(())),
Some(Err(err)) => return Poll::Ready(Err(err)),
}
})
.await?;
Ok(buf.freeze())
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
@ -263,15 +213,4 @@ mod tests {
let not_body = resp_body.downcast_ref::<()>(); let not_body = resp_body.downcast_ref::<()>();
assert!(not_body.is_none()); assert!(not_body.is_none());
} }
#[actix_rt::test]
async fn test_to_bytes() {
let body = AnyBody::empty();
let bytes = to_bytes(body).await.unwrap();
assert!(bytes.is_empty());
let body = AnyBody::copy_from_slice(b"123");
let bytes = to_bytes(body).await.unwrap();
assert_eq!(bytes, b"123"[..]);
}
} }

View File

@ -0,0 +1,78 @@
use std::task::Poll;
use actix_rt::pin;
use actix_utils::future::poll_fn;
use bytes::{Bytes, BytesMut};
use futures_core::ready;
use super::{BodySize, MessageBody};
/// Collects the body produced by a `MessageBody` implementation into `Bytes`.
///
/// Any errors produced by the body stream are returned immediately.
///
/// # Examples
/// ```
/// use actix_http::body::{AnyBody, to_bytes};
/// use bytes::Bytes;
///
/// # async fn test_to_bytes() {
/// let body = AnyBody::none();
/// let bytes = to_bytes(body).await.unwrap();
/// assert!(bytes.is_empty());
///
/// let body = AnyBody::copy_from_slice(b"123");
/// let bytes = to_bytes(body).await.unwrap();
/// assert_eq!(bytes, b"123"[..]);
/// # }
/// ```
pub async fn to_bytes<B: MessageBody>(body: B) -> Result<Bytes, B::Error> {
let cap = match body.size() {
BodySize::None | BodySize::Sized(0) => return Ok(Bytes::new()),
BodySize::Sized(size) => size as usize,
// good enough first guess for chunk size
BodySize::Stream => 32_768,
};
let mut buf = BytesMut::with_capacity(cap);
pin!(body);
poll_fn(|cx| loop {
let body = body.as_mut();
match ready!(body.poll_next(cx)) {
Some(Ok(bytes)) => buf.extend_from_slice(&*bytes),
None => return Poll::Ready(Ok(())),
Some(Err(err)) => return Poll::Ready(Err(err)),
}
})
.await?;
Ok(buf.freeze())
}
#[cfg(test)]
mod test {
use futures_util::{stream, StreamExt as _};
use super::*;
use crate::{body::BodyStream, Error};
#[actix_rt::test]
async fn test_to_bytes() {
let bytes = to_bytes(()).await.unwrap();
assert!(bytes.is_empty());
let body = Bytes::from_static(b"123");
let bytes = to_bytes(body).await.unwrap();
assert_eq!(bytes, b"123"[..]);
let stream =
stream::iter(vec![Bytes::from_static(b"123"), Bytes::from_static(b"abc")])
.map(Ok::<_, Error>);
let body = BodyStream::new(stream);
let bytes = to_bytes(body).await.unwrap();
assert_eq!(bytes, b"123abc"[..]);
}
}