mirror of https://github.com/fafhrd91/actix-web
make AnyBody generic on Stream body type
This commit is contained in:
parent
13cf5a9e44
commit
8d2cfd0a15
|
@ -3,9 +3,14 @@
|
|||
## Unreleased - 2021-xx-xx
|
||||
### Added
|
||||
* `AnyBody::empty` for quickly creating an empty body. [#2446]
|
||||
* `impl Clone` for `AnyBody<S> where S: Clone`. [#????]
|
||||
* `AnyBody::into_boxed` for quickly converting to a type-erased, boxed body type. [#????]
|
||||
|
||||
### Changed
|
||||
* Rename `AnyBody::{Message => Stream}`. [#2446]
|
||||
* Rename `AnyBody::{from_message => new_boxed}`. [#????]
|
||||
* Rename `BoxAnyBody` to `BoxBody` [#????]
|
||||
* Change representation of `AnyBody` to include a type parameter in `Stream` variant. Defaults to `BoxBody`. [#????]
|
||||
|
||||
### Removed
|
||||
* `AnyBody::Empty`; an empty body can now only be represented as a zero-length `Bytes` variant. [#2446]
|
||||
|
|
|
@ -92,6 +92,7 @@ regex = "1.3"
|
|||
rustls-pemfile = "0.2"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
static_assertions = "1"
|
||||
tls-openssl = { package = "openssl", version = "0.10.9" }
|
||||
tls-rustls = { package = "rustls", version = "0.20.0" }
|
||||
tokio = { version = "1.2", features = ["net", "rt"] }
|
||||
|
|
|
@ -16,7 +16,8 @@ use super::{BodySize, BodyStream, MessageBody, MessageBodyMapErr, SizedStream};
|
|||
pub type Body = AnyBody;
|
||||
|
||||
/// Represents various types of HTTP message body.
|
||||
pub enum AnyBody {
|
||||
#[derive(Clone)]
|
||||
pub enum AnyBody<S = BoxBody> {
|
||||
/// Empty response. `Content-Length` header is not set.
|
||||
None,
|
||||
|
||||
|
@ -24,7 +25,7 @@ pub enum AnyBody {
|
|||
Bytes(Bytes),
|
||||
|
||||
/// Generic message body.
|
||||
Stream(BoxAnyBody),
|
||||
Stream(S),
|
||||
}
|
||||
|
||||
impl AnyBody {
|
||||
|
@ -33,22 +34,45 @@ impl AnyBody {
|
|||
Self::Bytes(Bytes::new())
|
||||
}
|
||||
|
||||
/// Create body from slice (copy)
|
||||
pub fn from_slice(s: &[u8]) -> Self {
|
||||
Self::Bytes(Bytes::copy_from_slice(s))
|
||||
}
|
||||
|
||||
/// Create body from generic message body.
|
||||
pub fn from_message<B>(body: B) -> Self
|
||||
/// Create boxed body from generic message body.
|
||||
pub fn new_boxed<B>(body: B) -> Self
|
||||
where
|
||||
B: MessageBody + 'static,
|
||||
B::Error: Into<Box<dyn StdError + 'static>>,
|
||||
{
|
||||
Self::Stream(BoxAnyBody::from_body(body))
|
||||
Self::Stream(BoxBody::from_body(body))
|
||||
}
|
||||
|
||||
/// Create body from slice (copy)
|
||||
pub fn from_slice(s: &[u8]) -> Self {
|
||||
Self::Bytes(Bytes::copy_from_slice(s))
|
||||
}
|
||||
}
|
||||
|
||||
impl MessageBody for AnyBody {
|
||||
impl<B> AnyBody<B>
|
||||
where
|
||||
B: MessageBody + 'static,
|
||||
B::Error: Into<Box<dyn StdError + 'static>>,
|
||||
{
|
||||
/// Create body from generic message body.
|
||||
pub fn new(body: B) -> Self {
|
||||
Self::Stream(body)
|
||||
}
|
||||
|
||||
pub fn into_boxed(self) -> AnyBody {
|
||||
match self {
|
||||
AnyBody::None => AnyBody::new_boxed(()),
|
||||
AnyBody::Bytes(body) => AnyBody::new_boxed(body),
|
||||
AnyBody::Stream(body) => AnyBody::new_boxed(body),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> MessageBody for AnyBody<S>
|
||||
where
|
||||
S: MessageBody + Unpin,
|
||||
S::Error: StdError + 'static,
|
||||
{
|
||||
type Error = Error;
|
||||
|
||||
fn size(&self) -> BodySize {
|
||||
|
@ -74,8 +98,7 @@ impl MessageBody for AnyBody {
|
|||
}
|
||||
}
|
||||
|
||||
AnyBody::Stream(body) => body
|
||||
.as_pin_mut()
|
||||
AnyBody::Stream(body) => Pin::new(body)
|
||||
.poll_next(cx)
|
||||
.map_err(|err| Error::new_body().with_cause(err)),
|
||||
}
|
||||
|
@ -95,25 +118,25 @@ impl PartialEq for AnyBody {
|
|||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for AnyBody {
|
||||
impl<S: fmt::Debug> fmt::Debug for AnyBody<S> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match *self {
|
||||
AnyBody::None => write!(f, "AnyBody::None"),
|
||||
AnyBody::Bytes(ref b) => write!(f, "AnyBody::Bytes({:?})", b),
|
||||
AnyBody::Stream(_) => write!(f, "AnyBody::Message(_)"),
|
||||
AnyBody::Bytes(ref bytes) => write!(f, "AnyBody::Bytes({:?})", bytes),
|
||||
AnyBody::Stream(ref stream) => write!(f, "AnyBody::Message({:?})", stream),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&'static str> for AnyBody {
|
||||
fn from(s: &'static str) -> Body {
|
||||
AnyBody::Bytes(Bytes::from_static(s.as_ref()))
|
||||
fn from(string: &'static str) -> Body {
|
||||
AnyBody::Bytes(Bytes::from_static(string.as_ref()))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&'static [u8]> for AnyBody {
|
||||
fn from(s: &'static [u8]) -> Body {
|
||||
AnyBody::Bytes(Bytes::from_static(s))
|
||||
fn from(bytes: &'static [u8]) -> Body {
|
||||
AnyBody::Bytes(Bytes::from_static(bytes))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -124,20 +147,20 @@ impl From<Vec<u8>> for AnyBody {
|
|||
}
|
||||
|
||||
impl From<String> for AnyBody {
|
||||
fn from(s: String) -> Body {
|
||||
s.into_bytes().into()
|
||||
fn from(string: String) -> Body {
|
||||
string.into_bytes().into()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&'_ String> for AnyBody {
|
||||
fn from(s: &String) -> Body {
|
||||
AnyBody::Bytes(Bytes::copy_from_slice(AsRef::<[u8]>::as_ref(&s)))
|
||||
fn from(string: &String) -> Body {
|
||||
AnyBody::Bytes(Bytes::copy_from_slice(AsRef::<[u8]>::as_ref(&string)))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Cow<'_, str>> for AnyBody {
|
||||
fn from(s: Cow<'_, str>) -> Body {
|
||||
match s {
|
||||
fn from(string: Cow<'_, str>) -> Body {
|
||||
match string {
|
||||
Cow::Owned(s) => AnyBody::from(s),
|
||||
Cow::Borrowed(s) => {
|
||||
AnyBody::Bytes(Bytes::copy_from_slice(AsRef::<[u8]>::as_ref(s)))
|
||||
|
@ -147,41 +170,41 @@ impl From<Cow<'_, str>> for AnyBody {
|
|||
}
|
||||
|
||||
impl From<Bytes> for AnyBody {
|
||||
fn from(s: Bytes) -> Body {
|
||||
AnyBody::Bytes(s)
|
||||
fn from(bytes: Bytes) -> Body {
|
||||
AnyBody::Bytes(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<BytesMut> for AnyBody {
|
||||
fn from(s: BytesMut) -> Body {
|
||||
AnyBody::Bytes(s.freeze())
|
||||
fn from(bytes: BytesMut) -> Body {
|
||||
AnyBody::Bytes(bytes.freeze())
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, E> From<SizedStream<S>> for AnyBody
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, E>> + 'static,
|
||||
E: Into<Box<dyn StdError>> + 'static,
|
||||
E: StdError + 'static,
|
||||
{
|
||||
fn from(s: SizedStream<S>) -> Body {
|
||||
AnyBody::from_message(s)
|
||||
fn from(stream: SizedStream<S>) -> Body {
|
||||
AnyBody::new_boxed(stream)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, E> From<BodyStream<S>> for AnyBody
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, E>> + 'static,
|
||||
E: Into<Box<dyn StdError>> + 'static,
|
||||
E: StdError + 'static,
|
||||
{
|
||||
fn from(s: BodyStream<S>) -> Body {
|
||||
AnyBody::from_message(s)
|
||||
fn from(stream: BodyStream<S>) -> Body {
|
||||
AnyBody::new_boxed(stream)
|
||||
}
|
||||
}
|
||||
|
||||
/// A boxed message body with boxed errors.
|
||||
pub struct BoxAnyBody(Pin<Box<dyn MessageBody<Error = Box<dyn StdError + 'static>>>>);
|
||||
pub struct BoxBody(Pin<Box<dyn MessageBody<Error = Box<dyn StdError>>>>);
|
||||
|
||||
impl BoxAnyBody {
|
||||
impl BoxBody {
|
||||
/// Boxes a `MessageBody` and any errors it generates.
|
||||
pub fn from_body<B>(body: B) -> Self
|
||||
where
|
||||
|
@ -195,18 +218,18 @@ impl BoxAnyBody {
|
|||
/// Returns a mutable pinned reference to the inner message body type.
|
||||
pub fn as_pin_mut(
|
||||
&mut self,
|
||||
) -> Pin<&mut (dyn MessageBody<Error = Box<dyn StdError + 'static>>)> {
|
||||
) -> Pin<&mut (dyn MessageBody<Error = Box<dyn StdError>>)> {
|
||||
self.0.as_mut()
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for BoxAnyBody {
|
||||
impl fmt::Debug for BoxBody {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.write_str("BoxAnyBody(dyn MessageBody)")
|
||||
}
|
||||
}
|
||||
|
||||
impl MessageBody for BoxAnyBody {
|
||||
impl MessageBody for BoxBody {
|
||||
type Error = Error;
|
||||
|
||||
fn size(&self) -> BodySize {
|
||||
|
@ -223,3 +246,18 @@ impl MessageBody for BoxAnyBody {
|
|||
.map_err(|err| Error::new_body().with_cause(err))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use static_assertions::{assert_impl_all, assert_not_impl_all};
|
||||
|
||||
use super::*;
|
||||
|
||||
assert_impl_all!(AnyBody<()>: MessageBody, fmt::Debug, Send, Sync);
|
||||
assert_impl_all!(AnyBody<Bytes>: MessageBody, fmt::Debug, Send, Sync);
|
||||
assert_impl_all!(AnyBody: MessageBody, fmt::Debug);
|
||||
assert_impl_all!(BoxBody: MessageBody, fmt::Debug);
|
||||
|
||||
assert_not_impl_all!(AnyBody: Send, Sync);
|
||||
assert_not_impl_all!(BoxBody: Send, Sync);
|
||||
}
|
||||
|
|
|
@ -75,10 +75,22 @@ mod tests {
|
|||
use derive_more::{Display, Error};
|
||||
use futures_core::ready;
|
||||
use futures_util::{stream, FutureExt as _};
|
||||
use static_assertions::{assert_impl_all, assert_not_impl_all};
|
||||
|
||||
use super::*;
|
||||
use crate::body::to_bytes;
|
||||
|
||||
assert_impl_all!(BodyStream<stream::Empty<Result<Bytes, crate::Error>>>: MessageBody);
|
||||
assert_impl_all!(BodyStream<stream::Empty<Result<Bytes, &'static str>>>: MessageBody);
|
||||
assert_impl_all!(BodyStream<stream::Repeat<Result<Bytes, &'static str>>>: MessageBody);
|
||||
assert_impl_all!(BodyStream<stream::Empty<Result<Bytes, Infallible>>>: MessageBody);
|
||||
assert_impl_all!(BodyStream<stream::Repeat<Result<Bytes, Infallible>>>: MessageBody);
|
||||
|
||||
assert_not_impl_all!(BodyStream<stream::Empty<Bytes>>: MessageBody);
|
||||
assert_not_impl_all!(BodyStream<stream::Repeat<Bytes>>: MessageBody);
|
||||
// crate::Error is not Clone
|
||||
assert_not_impl_all!(BodyStream<stream::Repeat<Result<Bytes, crate::Error>>>: MessageBody);
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn skips_empty_chunks() {
|
||||
let body = BodyStream::new(stream::iter(
|
||||
|
@ -124,6 +136,30 @@ mod tests {
|
|||
assert!(matches!(to_bytes(body).await, Err(StreamErr)));
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn stream_string_error() {
|
||||
// `&'static str` does not impl `Error`
|
||||
// but it does impl `Into<Box<dyn Error>>`
|
||||
|
||||
let body = BodyStream::new(stream::once(async { Err("stringy error") }));
|
||||
assert!(matches!(to_bytes(body).await, Err("stringy error")));
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn stream_boxed_error() {
|
||||
// `Box<dyn Error>` does not impl `Error`
|
||||
// but it does impl `Into<Box<dyn Error>>`
|
||||
|
||||
let body = BodyStream::new(stream::once(async {
|
||||
Err(Box::<dyn StdError>::from("stringy error"))
|
||||
}));
|
||||
|
||||
assert_eq!(
|
||||
to_bytes(body).await.unwrap_err().to_string(),
|
||||
"stringy error"
|
||||
);
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn stream_delayed_error() {
|
||||
let body =
|
||||
|
|
|
@ -15,7 +15,7 @@ mod response_body;
|
|||
mod size;
|
||||
mod sized_stream;
|
||||
|
||||
pub use self::body::{AnyBody, Body, BoxAnyBody};
|
||||
pub use self::body::{AnyBody, Body, BoxBody};
|
||||
pub use self::body_stream::BodyStream;
|
||||
pub use self::message_body::MessageBody;
|
||||
pub(crate) use self::message_body::MessageBodyMapErr;
|
||||
|
|
|
@ -72,10 +72,22 @@ mod tests {
|
|||
use actix_rt::pin;
|
||||
use actix_utils::future::poll_fn;
|
||||
use futures_util::stream;
|
||||
use static_assertions::{assert_impl_all, assert_not_impl_all};
|
||||
|
||||
use super::*;
|
||||
use crate::body::to_bytes;
|
||||
|
||||
assert_impl_all!(SizedStream<stream::Empty<Result<Bytes, crate::Error>>>: MessageBody);
|
||||
assert_impl_all!(SizedStream<stream::Empty<Result<Bytes, &'static str>>>: MessageBody);
|
||||
assert_impl_all!(SizedStream<stream::Repeat<Result<Bytes, &'static str>>>: MessageBody);
|
||||
assert_impl_all!(SizedStream<stream::Empty<Result<Bytes, Infallible>>>: MessageBody);
|
||||
assert_impl_all!(SizedStream<stream::Repeat<Result<Bytes, Infallible>>>: MessageBody);
|
||||
|
||||
assert_not_impl_all!(SizedStream<stream::Empty<Bytes>>: MessageBody);
|
||||
assert_not_impl_all!(SizedStream<stream::Repeat<Bytes>>: MessageBody);
|
||||
// crate::Error is not Clone
|
||||
assert_not_impl_all!(SizedStream<stream::Repeat<Result<Bytes, crate::Error>>>: MessageBody);
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn skips_empty_chunks() {
|
||||
let body = SizedStream::new(
|
||||
|
@ -119,4 +131,37 @@ mod tests {
|
|||
|
||||
assert_eq!(to_bytes(body).await.ok(), Some(Bytes::from("12")));
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn stream_string_error() {
|
||||
// `&'static str` does not impl `Error`
|
||||
// but it does impl `Into<Box<dyn Error>>`
|
||||
|
||||
let body = SizedStream::new(0, stream::once(async { Err("stringy error") }));
|
||||
assert_eq!(to_bytes(body).await, Ok(Bytes::new()));
|
||||
|
||||
let body = SizedStream::new(1, stream::once(async { Err("stringy error") }));
|
||||
assert!(matches!(to_bytes(body).await, Err("stringy error")));
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn stream_boxed_error() {
|
||||
// `Box<dyn Error>` does not impl `Error`
|
||||
// but it does impl `Into<Box<dyn Error>>`
|
||||
|
||||
let body = SizedStream::new(
|
||||
0,
|
||||
stream::once(async { Err(Box::<dyn StdError>::from("stringy error")) }),
|
||||
);
|
||||
assert_eq!(to_bytes(body).await.unwrap(), Bytes::new());
|
||||
|
||||
let body = SizedStream::new(
|
||||
1,
|
||||
stream::once(async { Err(Box::<dyn StdError>::from("stringy error")) }),
|
||||
);
|
||||
assert_eq!(
|
||||
to_bytes(body).await.unwrap_err().to_string(),
|
||||
"stringy error"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,7 +24,7 @@ use flate2::write::{GzEncoder, ZlibEncoder};
|
|||
use zstd::stream::write::Encoder as ZstdEncoder;
|
||||
|
||||
use crate::{
|
||||
body::{Body, BodySize, BoxAnyBody, MessageBody, ResponseBody},
|
||||
body::{Body, BodySize, BoxBody, MessageBody, ResponseBody},
|
||||
http::{
|
||||
header::{ContentEncoding, CONTENT_ENCODING},
|
||||
HeaderValue, StatusCode,
|
||||
|
@ -100,7 +100,7 @@ impl<B: MessageBody> Encoder<B> {
|
|||
enum EncoderBody<B> {
|
||||
Bytes(Bytes),
|
||||
Stream(#[pin] B),
|
||||
BoxedStream(BoxAnyBody),
|
||||
BoxedStream(BoxBody),
|
||||
}
|
||||
|
||||
impl<B> MessageBody for EncoderBody<B>
|
||||
|
|
|
@ -262,7 +262,7 @@ impl ResponseBuilder {
|
|||
S: Stream<Item = Result<Bytes, E>> + 'static,
|
||||
E: Into<Box<dyn StdError>> + 'static,
|
||||
{
|
||||
self.body(AnyBody::from_message(BodyStream::new(stream)))
|
||||
self.body(AnyBody::new_boxed(BodyStream::new(stream)))
|
||||
}
|
||||
|
||||
/// Generate response with an empty body.
|
||||
|
|
|
@ -9,7 +9,7 @@ use std::{
|
|||
};
|
||||
|
||||
use actix_http::{
|
||||
body::{Body, BodyStream},
|
||||
body::{AnyBody, Body, BodyStream},
|
||||
http::{
|
||||
header::{self, HeaderMap, HeaderName, IntoHeaderValue},
|
||||
Error as HttpError,
|
||||
|
@ -286,7 +286,7 @@ impl RequestSender {
|
|||
response_decompress,
|
||||
timeout,
|
||||
config,
|
||||
Body::from_message(BodyStream::new(stream)),
|
||||
AnyBody::new_boxed(BodyStream::new(stream)),
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@ use std::{
|
|||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use actix_http::body::{Body, MessageBody};
|
||||
use actix_http::body::{AnyBody, MessageBody};
|
||||
use actix_service::{Service, Transform};
|
||||
use futures_core::{future::LocalBoxFuture, ready};
|
||||
|
||||
|
@ -124,7 +124,7 @@ where
|
|||
B::Error: Into<Box<dyn StdError + 'static>>,
|
||||
{
|
||||
fn map_body(self) -> ServiceResponse {
|
||||
self.map_body(|_, body| Body::from_message(body))
|
||||
self.map_body(|_, body| AnyBody::new_boxed(body))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -354,10 +354,10 @@ impl HttpResponseBuilder {
|
|||
#[inline]
|
||||
pub fn streaming<S, E>(&mut self, stream: S) -> HttpResponse
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
|
||||
S: Stream<Item = Result<Bytes, E>> + 'static,
|
||||
E: Into<Box<dyn StdError>> + 'static,
|
||||
{
|
||||
self.body(AnyBody::from_message(BodyStream::new(stream)))
|
||||
self.body(AnyBody::new_boxed(BodyStream::new(stream)))
|
||||
}
|
||||
|
||||
/// Set a json body and generate `Response`
|
||||
|
|
Loading…
Reference in New Issue