This commit is contained in:
Ali MJ Al-Nasrawy 2021-12-17 04:52:38 +03:00
parent 3c0d059d92
commit 6b40c48185
4 changed files with 77 additions and 257 deletions

View File

@ -1,6 +1,6 @@
use std::{ use std::{
error::Error as StdError, error::Error as StdError,
fmt, fmt, mem,
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
@ -11,7 +11,13 @@ use super::{BodySize, MessageBody, MessageBodyMapErr};
use crate::Error; use crate::Error;
/// A boxed message body with boxed errors. /// A boxed message body with boxed errors.
pub struct BoxBody(Pin<Box<dyn MessageBody<Error = Box<dyn StdError>>>>); pub struct BoxBody(BoxBodyInner);
enum BoxBodyInner {
None,
Bytes(Bytes),
Stream(Pin<Box<dyn MessageBody<Error = Box<dyn StdError>>>>),
}
impl BoxBody { impl BoxBody {
/// Same as `MessageBody::boxed`. /// Same as `MessageBody::boxed`.
@ -23,19 +29,29 @@ impl BoxBody {
where where
B: MessageBody + 'static, B: MessageBody + 'static,
{ {
let body = MessageBodyMapErr::new(body, Into::into); match body.size() {
Self(Box::pin(body)) BodySize::None => Self(BoxBodyInner::None),
_ => match body.try_into_bytes() {
Ok(bytes) => Self(BoxBodyInner::Bytes(bytes)),
Err(body) => {
let body = MessageBodyMapErr::new(body, Into::into);
Self(BoxBodyInner::Stream(Box::pin(body)))
}
},
}
} }
/// Returns a mutable pinned reference to the inner message body type. /// Returns a mutable pinned reference to the inner message body type.
#[inline] #[inline]
pub fn as_pin_mut(&mut self) -> Pin<&mut (dyn MessageBody<Error = Box<dyn StdError>>)> { //pub fn as_pin_mut(&mut self) -> Pin<&mut (dyn MessageBody<Error = Box<dyn StdError>>)> {
self.0.as_mut() pub fn as_pin_mut(&mut self) -> Pin<&mut Self> {
Pin::new(self)
} }
} }
impl fmt::Debug for BoxBody { impl fmt::Debug for BoxBody {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO show BoxBodyInner
f.write_str("BoxBody(dyn MessageBody)") f.write_str("BoxBody(dyn MessageBody)")
} }
} }
@ -45,7 +61,11 @@ impl MessageBody for BoxBody {
#[inline] #[inline]
fn size(&self) -> BodySize { fn size(&self) -> BodySize {
self.0.size() match &self.0 {
BoxBodyInner::None => BodySize::None,
BoxBodyInner::Bytes(bytes) => BodySize::Sized(bytes.len() as u64),
BoxBodyInner::Stream(stream) => stream.size(),
}
} }
#[inline] #[inline]
@ -53,20 +73,19 @@ impl MessageBody for BoxBody {
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Self::Error>>> { ) -> Poll<Option<Result<Bytes, Self::Error>>> {
self.0 match &mut self.0 {
.as_mut() BoxBodyInner::None => Poll::Ready(None),
.poll_next(cx) BoxBodyInner::Bytes(bytes) => Poll::Ready(Some(Ok(mem::take(bytes)))),
.map_err(|err| Error::new_body().with_cause(err)) BoxBodyInner::Stream(stream) => Pin::new(stream).poll_next(cx).map_err(|err| Error::new_body().with_cause(err)),
}
} }
#[inline] #[inline]
fn is_complete_body(&self) -> bool { fn try_into_bytes(self) -> Result<Bytes, Self> {
self.0.is_complete_body() match self.0 {
} BoxBodyInner::Bytes(bytes) => Ok(bytes),
_ => Err(self),
#[inline] }
fn take_complete_body(&mut self) -> Bytes {
self.0.take_complete_body()
} }
#[inline] #[inline]

View File

@ -74,18 +74,14 @@ where
} }
#[inline] #[inline]
fn is_complete_body(&self) -> bool { fn try_into_bytes(self) -> Result<Bytes, Self> {
match self { match self {
EitherBody::Left { body } => body.is_complete_body(), EitherBody::Left { body } => body
EitherBody::Right { body } => body.is_complete_body(), .try_into_bytes()
} .map_err(|body| EitherBody::Left { body }),
} EitherBody::Right { body } => body
.try_into_bytes()
#[inline] .map_err(|body| EitherBody::Right { body }),
fn take_complete_body(&mut self) -> Bytes {
match self {
EitherBody::Left { body } => body.take_complete_body(),
EitherBody::Right { body } => body.take_complete_body(),
} }
} }

View File

@ -31,51 +31,11 @@ pub trait MessageBody {
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Self::Error>>>; ) -> Poll<Option<Result<Bytes, Self::Error>>>;
/// Returns true if entire body bytes chunk is obtainable in one call to `poll_next`. fn try_into_bytes(self) -> Result<Bytes, Self>
/// where
/// This method's implementation should agree with [`take_complete_body`] and should always be Self: Sized,
/// checked before taking the body. {
/// Err(self)
/// The default implementation returns `false.
///
/// [`take_complete_body`]: MessageBody::take_complete_body
fn is_complete_body(&self) -> bool {
false
}
/// Returns the complete chunk of body bytes.
///
/// Implementors of this method should note the following:
/// - It is acceptable to skip the omit checks of [`is_complete_body`]. The responsibility of
/// performing this check is delegated to the caller.
/// - If the result of [`is_complete_body`] is conditional, that condition should be given
/// equivalent attention here.
/// - A second call call to [`take_complete_body`] should return an empty `Bytes` or panic.
/// - A call to [`poll_next`] after calling [`take_complete_body`] should return `None` unless
/// the chunk is guaranteed to be empty.
///
/// The default implementation panics unconditionally, indicating a control flow bug in the
/// calling code.
///
/// # Panics
/// With a correct implementation, panics if called without first checking [`is_complete_body`].
///
/// [`is_complete_body`]: MessageBody::is_complete_body
/// [`take_complete_body`]: MessageBody::take_complete_body
/// [`poll_next`]: MessageBody::poll_next
fn take_complete_body(&mut self) -> Bytes {
assert!(
self.is_complete_body(),
"type ({}) allows taking complete body but did not provide an implementation \
of `take_complete_body`",
std::any::type_name::<Self>()
);
unimplemented!(
"type ({}) does not allow taking complete body; caller should make sure to \
check `is_complete_body` first",
std::any::type_name::<Self>()
);
} }
/// Converts this body into `BoxBody`. /// Converts this body into `BoxBody`.
@ -104,14 +64,6 @@ mod foreign_impls {
) -> Poll<Option<Result<Bytes, Self::Error>>> { ) -> Poll<Option<Result<Bytes, Self::Error>>> {
match *self {} match *self {}
} }
fn is_complete_body(&self) -> bool {
true
}
fn take_complete_body(&mut self) -> Bytes {
match *self {}
}
} }
impl MessageBody for () { impl MessageBody for () {
@ -131,13 +83,8 @@ mod foreign_impls {
} }
#[inline] #[inline]
fn is_complete_body(&self) -> bool { fn try_into_bytes(self) -> Result<Bytes, Self> {
true Ok(Bytes::new())
}
#[inline]
fn take_complete_body(&mut self) -> Bytes {
Bytes::new()
} }
} }
@ -159,16 +106,6 @@ mod foreign_impls {
) -> Poll<Option<Result<Bytes, Self::Error>>> { ) -> Poll<Option<Result<Bytes, Self::Error>>> {
Pin::new(self.get_mut().as_mut()).poll_next(cx) Pin::new(self.get_mut().as_mut()).poll_next(cx)
} }
#[inline]
fn is_complete_body(&self) -> bool {
self.as_ref().is_complete_body()
}
#[inline]
fn take_complete_body(&mut self) -> Bytes {
self.as_mut().take_complete_body()
}
} }
impl<B> MessageBody for Pin<Box<B>> impl<B> MessageBody for Pin<Box<B>>
@ -189,38 +126,6 @@ mod foreign_impls {
) -> Poll<Option<Result<Bytes, Self::Error>>> { ) -> Poll<Option<Result<Bytes, Self::Error>>> {
self.get_mut().as_mut().poll_next(cx) self.get_mut().as_mut().poll_next(cx)
} }
#[inline]
fn is_complete_body(&self) -> bool {
self.as_ref().is_complete_body()
}
#[inline]
fn take_complete_body(&mut self) -> Bytes {
debug_assert!(
self.is_complete_body(),
"inner type \"{}\" does not allow taking complete body; caller should make sure to \
call `is_complete_body` first",
std::any::type_name::<B>(),
);
// we do not have DerefMut access to call take_complete_body directly but since
// is_complete_body is true we should expect the entire bytes chunk in one poll_next
let waker = futures_task::noop_waker();
let mut cx = Context::from_waker(&waker);
match self.as_mut().poll_next(&mut cx) {
Poll::Ready(Some(Ok(data))) => data,
_ => {
panic!(
"inner type \"{}\" indicated it allows taking complete body but failed to \
return Bytes when polled",
std::any::type_name::<B>()
);
}
}
}
} }
impl MessageBody for &'static [u8] { impl MessageBody for &'static [u8] {
@ -232,24 +137,19 @@ mod foreign_impls {
} }
fn poll_next( fn poll_next(
mut self: Pin<&mut Self>, self: Pin<&mut Self>,
_cx: &mut Context<'_>, _cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Self::Error>>> { ) -> Poll<Option<Result<Bytes, Self::Error>>> {
if self.is_empty() { if self.is_empty() {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
Poll::Ready(Some(Ok(self.take_complete_body()))) Poll::Ready(Some(Ok(Bytes::from_static(mem::take(self.get_mut())))))
} }
} }
#[inline] #[inline]
fn is_complete_body(&self) -> bool { fn try_into_bytes(self) -> Result<Bytes, Self> {
true Ok(Bytes::from_static(self))
}
#[inline]
fn take_complete_body(&mut self) -> Bytes {
Bytes::from_static(mem::take(self))
} }
} }
@ -262,24 +162,19 @@ mod foreign_impls {
} }
fn poll_next( fn poll_next(
mut self: Pin<&mut Self>, self: Pin<&mut Self>,
_cx: &mut Context<'_>, _cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Self::Error>>> { ) -> Poll<Option<Result<Bytes, Self::Error>>> {
if self.is_empty() { if self.is_empty() {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
Poll::Ready(Some(Ok(self.take_complete_body()))) Poll::Ready(Some(Ok(mem::take(self.get_mut()))))
} }
} }
#[inline] #[inline]
fn is_complete_body(&self) -> bool { fn try_into_bytes(self) -> Result<Bytes, Self> {
true Ok(self)
}
#[inline]
fn take_complete_body(&mut self) -> Bytes {
mem::take(self)
} }
} }
@ -292,24 +187,19 @@ mod foreign_impls {
} }
fn poll_next( fn poll_next(
mut self: Pin<&mut Self>, self: Pin<&mut Self>,
_cx: &mut Context<'_>, _cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Self::Error>>> { ) -> Poll<Option<Result<Bytes, Self::Error>>> {
if self.is_empty() { if self.is_empty() {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
Poll::Ready(Some(Ok(self.take_complete_body()))) Poll::Ready(Some(Ok(mem::take(self.get_mut()).freeze())))
} }
} }
#[inline] #[inline]
fn is_complete_body(&self) -> bool { fn try_into_bytes(self) -> Result<Bytes, Self> {
true Ok(self.freeze())
}
#[inline]
fn take_complete_body(&mut self) -> Bytes {
mem::take(self).freeze()
} }
} }
@ -322,24 +212,19 @@ mod foreign_impls {
} }
fn poll_next( fn poll_next(
mut self: Pin<&mut Self>, self: Pin<&mut Self>,
_cx: &mut Context<'_>, _cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Self::Error>>> { ) -> Poll<Option<Result<Bytes, Self::Error>>> {
if self.is_empty() { if self.is_empty() {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
Poll::Ready(Some(Ok(self.take_complete_body()))) Poll::Ready(Some(Ok(mem::take(self.get_mut()).into())))
} }
} }
#[inline] #[inline]
fn is_complete_body(&self) -> bool { fn try_into_bytes(self) -> Result<Bytes, Self> {
true Ok(Bytes::from(self))
}
#[inline]
fn take_complete_body(&mut self) -> Bytes {
Bytes::from(mem::take(self))
} }
} }
@ -365,13 +250,8 @@ mod foreign_impls {
} }
#[inline] #[inline]
fn is_complete_body(&self) -> bool { fn try_into_bytes(self) -> Result<Bytes, Self> {
true Ok(Bytes::from_static(self.as_bytes()))
}
#[inline]
fn take_complete_body(&mut self) -> Bytes {
Bytes::from_static(mem::take(self).as_bytes())
} }
} }
@ -396,13 +276,8 @@ mod foreign_impls {
} }
#[inline] #[inline]
fn is_complete_body(&self) -> bool { fn try_into_bytes(self) -> Result<Bytes, Self> {
true Ok(Bytes::from(self))
}
#[inline]
fn take_complete_body(&mut self) -> Bytes {
Bytes::from(mem::take(self))
} }
} }
@ -423,13 +298,8 @@ mod foreign_impls {
} }
#[inline] #[inline]
fn is_complete_body(&self) -> bool { fn try_into_bytes(self) -> Result<Bytes, Self> {
true Ok(self.into_bytes())
}
#[inline]
fn take_complete_body(&mut self) -> Bytes {
mem::take(self).into_bytes()
} }
} }
} }
@ -486,13 +356,9 @@ where
} }
#[inline] #[inline]
fn is_complete_body(&self) -> bool { fn try_into_bytes(self) -> Result<Bytes, Self> {
self.body.is_complete_body() let Self { body, mapper } = self;
} body.try_into_bytes().map_err(|body| Self { body, mapper })
#[inline]
fn take_complete_body(&mut self) -> Bytes {
self.body.take_complete_body()
} }
} }
@ -604,51 +470,6 @@ mod tests {
assert_poll_next!(pl, Bytes::from("test")); assert_poll_next!(pl, Bytes::from("test"));
} }
#[test]
fn take_string() {
let mut data = "test".repeat(2);
let data_bytes = Bytes::from(data.clone());
assert!(data.is_complete_body());
assert_eq!(data.take_complete_body(), data_bytes);
let mut big_data = "test".repeat(64 * 1024);
let data_bytes = Bytes::from(big_data.clone());
assert!(big_data.is_complete_body());
assert_eq!(big_data.take_complete_body(), data_bytes);
}
#[test]
fn take_boxed_equivalence() {
let mut data = Bytes::from_static(b"test");
assert!(data.is_complete_body());
assert_eq!(data.take_complete_body(), b"test".as_ref());
let mut data = Box::new(Bytes::from_static(b"test"));
assert!(data.is_complete_body());
assert_eq!(data.take_complete_body(), b"test".as_ref());
let mut data = Box::pin(Bytes::from_static(b"test"));
assert!(data.is_complete_body());
assert_eq!(data.take_complete_body(), b"test".as_ref());
}
#[test]
fn take_policy() {
let mut data = Bytes::from_static(b"test");
// first call returns chunk
assert_eq!(data.take_complete_body(), b"test".as_ref());
// second call returns empty
assert_eq!(data.take_complete_body(), b"".as_ref());
let waker = futures_task::noop_waker();
let mut cx = Context::from_waker(&waker);
let mut data = Bytes::from_static(b"test");
// take returns whole chunk
assert_eq!(data.take_complete_body(), b"test".as_ref());
// subsequent poll_next returns None
assert_eq!(Pin::new(&mut data).poll_next(&mut cx), Poll::Ready(None));
}
#[test] #[test]
fn complete_body_combinators() { fn complete_body_combinators() {
use crate::body::{BoxBody, EitherBody}; use crate::body::{BoxBody, EitherBody};
@ -661,13 +482,7 @@ mod tests {
let body = Box::pin(body); let body = Box::pin(body);
let mut body = body; let mut body = body;
assert!(body.is_complete_body()); assert_eq!(body.try_into_bytes().unwrap(), b"test".as_ref());
assert_eq!(body.take_complete_body(), b"test".as_ref());
// subsequent poll_next returns None
let waker = futures_task::noop_waker();
let mut cx = Context::from_waker(&waker);
assert!(Pin::new(&mut body).poll_next(&mut cx).map_err(drop) == Poll::Ready(None));
} }
// down-casting used to be done with a method on MessageBody trait // down-casting used to be done with a method on MessageBody trait

View File

@ -40,14 +40,4 @@ impl MessageBody for None {
) -> Poll<Option<Result<Bytes, Self::Error>>> { ) -> Poll<Option<Result<Bytes, Self::Error>>> {
Poll::Ready(Option::None) Poll::Ready(Option::None)
} }
#[inline]
fn is_complete_body(&self) -> bool {
true
}
#[inline]
fn take_complete_body(&mut self) -> Bytes {
Bytes::new()
}
} }