From 6b40c4818505680aaca74d7601c07983dc54fbff Mon Sep 17 00:00:00 2001 From: Ali MJ Al-Nasrawy Date: Fri, 17 Dec 2021 04:52:38 +0300 Subject: [PATCH] wip --- actix-http/src/body/boxed.rs | 55 ++++-- actix-http/src/body/either.rs | 18 +- actix-http/src/body/message_body.rs | 251 ++++------------------------ actix-http/src/body/none.rs | 10 -- 4 files changed, 77 insertions(+), 257 deletions(-) diff --git a/actix-http/src/body/boxed.rs b/actix-http/src/body/boxed.rs index 7581bec88..2603aa762 100644 --- a/actix-http/src/body/boxed.rs +++ b/actix-http/src/body/boxed.rs @@ -1,6 +1,6 @@ use std::{ error::Error as StdError, - fmt, + fmt, mem, pin::Pin, task::{Context, Poll}, }; @@ -11,7 +11,13 @@ use super::{BodySize, MessageBody, MessageBodyMapErr}; use crate::Error; /// A boxed message body with boxed errors. -pub struct BoxBody(Pin>>>); +pub struct BoxBody(BoxBodyInner); + +enum BoxBodyInner { + None, + Bytes(Bytes), + Stream(Pin>>>), +} impl BoxBody { /// Same as `MessageBody::boxed`. @@ -23,19 +29,29 @@ impl BoxBody { where B: MessageBody + 'static, { - let body = MessageBodyMapErr::new(body, Into::into); - Self(Box::pin(body)) + match body.size() { + BodySize::None => Self(BoxBodyInner::None), + _ => match body.try_into_bytes() { + Ok(bytes) => Self(BoxBodyInner::Bytes(bytes)), + Err(body) => { + let body = MessageBodyMapErr::new(body, Into::into); + Self(BoxBodyInner::Stream(Box::pin(body))) + } + }, + } } /// Returns a mutable pinned reference to the inner message body type. #[inline] - pub fn as_pin_mut(&mut self) -> Pin<&mut (dyn MessageBody>)> { - self.0.as_mut() + //pub fn as_pin_mut(&mut self) -> Pin<&mut (dyn MessageBody>)> { + pub fn as_pin_mut(&mut self) -> Pin<&mut Self> { + Pin::new(self) } } impl fmt::Debug for BoxBody { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // TODO show BoxBodyInner f.write_str("BoxBody(dyn MessageBody)") } } @@ -45,7 +61,11 @@ impl MessageBody for BoxBody { #[inline] fn size(&self) -> BodySize { - self.0.size() + match &self.0 { + BoxBodyInner::None => BodySize::None, + BoxBodyInner::Bytes(bytes) => BodySize::Sized(bytes.len() as u64), + BoxBodyInner::Stream(stream) => stream.size(), + } } #[inline] @@ -53,20 +73,19 @@ impl MessageBody for BoxBody { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { - self.0 - .as_mut() - .poll_next(cx) - .map_err(|err| Error::new_body().with_cause(err)) + match &mut self.0 { + BoxBodyInner::None => Poll::Ready(None), + BoxBodyInner::Bytes(bytes) => Poll::Ready(Some(Ok(mem::take(bytes)))), + BoxBodyInner::Stream(stream) => Pin::new(stream).poll_next(cx).map_err(|err| Error::new_body().with_cause(err)), + } } #[inline] - fn is_complete_body(&self) -> bool { - self.0.is_complete_body() - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - self.0.take_complete_body() + fn try_into_bytes(self) -> Result { + match self.0 { + BoxBodyInner::Bytes(bytes) => Ok(bytes), + _ => Err(self), + } } #[inline] diff --git a/actix-http/src/body/either.rs b/actix-http/src/body/either.rs index 3a4082dc9..add1eab7c 100644 --- a/actix-http/src/body/either.rs +++ b/actix-http/src/body/either.rs @@ -74,18 +74,14 @@ where } #[inline] - fn is_complete_body(&self) -> bool { + fn try_into_bytes(self) -> Result { match self { - EitherBody::Left { body } => body.is_complete_body(), - EitherBody::Right { body } => body.is_complete_body(), - } - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - match self { - EitherBody::Left { body } => body.take_complete_body(), - EitherBody::Right { body } => body.take_complete_body(), + EitherBody::Left { body } => body + .try_into_bytes() + .map_err(|body| EitherBody::Left { body }), + EitherBody::Right { body } => body + .try_into_bytes() + .map_err(|body| EitherBody::Right { body }), } } diff --git a/actix-http/src/body/message_body.rs b/actix-http/src/body/message_body.rs index 075ae7220..7826ef54f 100644 --- a/actix-http/src/body/message_body.rs +++ b/actix-http/src/body/message_body.rs @@ -31,51 +31,11 @@ pub trait MessageBody { cx: &mut Context<'_>, ) -> Poll>>; - /// Returns true if entire body bytes chunk is obtainable in one call to `poll_next`. - /// - /// This method's implementation should agree with [`take_complete_body`] and should always be - /// checked before taking the body. - /// - /// The default implementation returns `false. - /// - /// [`take_complete_body`]: MessageBody::take_complete_body - fn is_complete_body(&self) -> bool { - false - } - - /// Returns the complete chunk of body bytes. - /// - /// Implementors of this method should note the following: - /// - It is acceptable to skip the omit checks of [`is_complete_body`]. The responsibility of - /// performing this check is delegated to the caller. - /// - If the result of [`is_complete_body`] is conditional, that condition should be given - /// equivalent attention here. - /// - A second call call to [`take_complete_body`] should return an empty `Bytes` or panic. - /// - A call to [`poll_next`] after calling [`take_complete_body`] should return `None` unless - /// the chunk is guaranteed to be empty. - /// - /// The default implementation panics unconditionally, indicating a control flow bug in the - /// calling code. - /// - /// # Panics - /// With a correct implementation, panics if called without first checking [`is_complete_body`]. - /// - /// [`is_complete_body`]: MessageBody::is_complete_body - /// [`take_complete_body`]: MessageBody::take_complete_body - /// [`poll_next`]: MessageBody::poll_next - fn take_complete_body(&mut self) -> Bytes { - assert!( - self.is_complete_body(), - "type ({}) allows taking complete body but did not provide an implementation \ - of `take_complete_body`", - std::any::type_name::() - ); - - unimplemented!( - "type ({}) does not allow taking complete body; caller should make sure to \ - check `is_complete_body` first", - std::any::type_name::() - ); + fn try_into_bytes(self) -> Result + where + Self: Sized, + { + Err(self) } /// Converts this body into `BoxBody`. @@ -104,14 +64,6 @@ mod foreign_impls { ) -> Poll>> { match *self {} } - - fn is_complete_body(&self) -> bool { - true - } - - fn take_complete_body(&mut self) -> Bytes { - match *self {} - } } impl MessageBody for () { @@ -131,13 +83,8 @@ mod foreign_impls { } #[inline] - fn is_complete_body(&self) -> bool { - true - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - Bytes::new() + fn try_into_bytes(self) -> Result { + Ok(Bytes::new()) } } @@ -159,16 +106,6 @@ mod foreign_impls { ) -> Poll>> { Pin::new(self.get_mut().as_mut()).poll_next(cx) } - - #[inline] - fn is_complete_body(&self) -> bool { - self.as_ref().is_complete_body() - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - self.as_mut().take_complete_body() - } } impl MessageBody for Pin> @@ -189,38 +126,6 @@ mod foreign_impls { ) -> Poll>> { self.get_mut().as_mut().poll_next(cx) } - - #[inline] - fn is_complete_body(&self) -> bool { - self.as_ref().is_complete_body() - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - debug_assert!( - self.is_complete_body(), - "inner type \"{}\" does not allow taking complete body; caller should make sure to \ - call `is_complete_body` first", - std::any::type_name::(), - ); - - // we do not have DerefMut access to call take_complete_body directly but since - // is_complete_body is true we should expect the entire bytes chunk in one poll_next - - let waker = futures_task::noop_waker(); - let mut cx = Context::from_waker(&waker); - - match self.as_mut().poll_next(&mut cx) { - Poll::Ready(Some(Ok(data))) => data, - _ => { - panic!( - "inner type \"{}\" indicated it allows taking complete body but failed to \ - return Bytes when polled", - std::any::type_name::() - ); - } - } - } } impl MessageBody for &'static [u8] { @@ -232,24 +137,19 @@ mod foreign_impls { } fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { - Poll::Ready(Some(Ok(self.take_complete_body()))) + Poll::Ready(Some(Ok(Bytes::from_static(mem::take(self.get_mut()))))) } } #[inline] - fn is_complete_body(&self) -> bool { - true - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - Bytes::from_static(mem::take(self)) + fn try_into_bytes(self) -> Result { + Ok(Bytes::from_static(self)) } } @@ -262,24 +162,19 @@ mod foreign_impls { } fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { - Poll::Ready(Some(Ok(self.take_complete_body()))) + Poll::Ready(Some(Ok(mem::take(self.get_mut())))) } } #[inline] - fn is_complete_body(&self) -> bool { - true - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - mem::take(self) + fn try_into_bytes(self) -> Result { + Ok(self) } } @@ -292,24 +187,19 @@ mod foreign_impls { } fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { - Poll::Ready(Some(Ok(self.take_complete_body()))) + Poll::Ready(Some(Ok(mem::take(self.get_mut()).freeze()))) } } #[inline] - fn is_complete_body(&self) -> bool { - true - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - mem::take(self).freeze() + fn try_into_bytes(self) -> Result { + Ok(self.freeze()) } } @@ -322,24 +212,19 @@ mod foreign_impls { } fn poll_next( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { - Poll::Ready(Some(Ok(self.take_complete_body()))) + Poll::Ready(Some(Ok(mem::take(self.get_mut()).into()))) } } #[inline] - fn is_complete_body(&self) -> bool { - true - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - Bytes::from(mem::take(self)) + fn try_into_bytes(self) -> Result { + Ok(Bytes::from(self)) } } @@ -365,13 +250,8 @@ mod foreign_impls { } #[inline] - fn is_complete_body(&self) -> bool { - true - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - Bytes::from_static(mem::take(self).as_bytes()) + fn try_into_bytes(self) -> Result { + Ok(Bytes::from_static(self.as_bytes())) } } @@ -396,13 +276,8 @@ mod foreign_impls { } #[inline] - fn is_complete_body(&self) -> bool { - true - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - Bytes::from(mem::take(self)) + fn try_into_bytes(self) -> Result { + Ok(Bytes::from(self)) } } @@ -423,13 +298,8 @@ mod foreign_impls { } #[inline] - fn is_complete_body(&self) -> bool { - true - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - mem::take(self).into_bytes() + fn try_into_bytes(self) -> Result { + Ok(self.into_bytes()) } } } @@ -486,13 +356,9 @@ where } #[inline] - fn is_complete_body(&self) -> bool { - self.body.is_complete_body() - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - self.body.take_complete_body() + fn try_into_bytes(self) -> Result { + let Self { body, mapper } = self; + body.try_into_bytes().map_err(|body| Self { body, mapper }) } } @@ -604,51 +470,6 @@ mod tests { assert_poll_next!(pl, Bytes::from("test")); } - #[test] - fn take_string() { - let mut data = "test".repeat(2); - let data_bytes = Bytes::from(data.clone()); - assert!(data.is_complete_body()); - assert_eq!(data.take_complete_body(), data_bytes); - - let mut big_data = "test".repeat(64 * 1024); - let data_bytes = Bytes::from(big_data.clone()); - assert!(big_data.is_complete_body()); - assert_eq!(big_data.take_complete_body(), data_bytes); - } - - #[test] - fn take_boxed_equivalence() { - let mut data = Bytes::from_static(b"test"); - assert!(data.is_complete_body()); - assert_eq!(data.take_complete_body(), b"test".as_ref()); - - let mut data = Box::new(Bytes::from_static(b"test")); - assert!(data.is_complete_body()); - assert_eq!(data.take_complete_body(), b"test".as_ref()); - - let mut data = Box::pin(Bytes::from_static(b"test")); - assert!(data.is_complete_body()); - assert_eq!(data.take_complete_body(), b"test".as_ref()); - } - - #[test] - fn take_policy() { - let mut data = Bytes::from_static(b"test"); - // first call returns chunk - assert_eq!(data.take_complete_body(), b"test".as_ref()); - // second call returns empty - assert_eq!(data.take_complete_body(), b"".as_ref()); - - let waker = futures_task::noop_waker(); - let mut cx = Context::from_waker(&waker); - let mut data = Bytes::from_static(b"test"); - // take returns whole chunk - assert_eq!(data.take_complete_body(), b"test".as_ref()); - // subsequent poll_next returns None - assert_eq!(Pin::new(&mut data).poll_next(&mut cx), Poll::Ready(None)); - } - #[test] fn complete_body_combinators() { use crate::body::{BoxBody, EitherBody}; @@ -661,13 +482,7 @@ mod tests { let body = Box::pin(body); let mut body = body; - assert!(body.is_complete_body()); - assert_eq!(body.take_complete_body(), b"test".as_ref()); - - // subsequent poll_next returns None - let waker = futures_task::noop_waker(); - let mut cx = Context::from_waker(&waker); - assert!(Pin::new(&mut body).poll_next(&mut cx).map_err(drop) == Poll::Ready(None)); + assert_eq!(body.try_into_bytes().unwrap(), b"test".as_ref()); } // down-casting used to be done with a method on MessageBody trait diff --git a/actix-http/src/body/none.rs b/actix-http/src/body/none.rs index bb494078f..0fc7c8c9f 100644 --- a/actix-http/src/body/none.rs +++ b/actix-http/src/body/none.rs @@ -40,14 +40,4 @@ impl MessageBody for None { ) -> Poll>> { Poll::Ready(Option::None) } - - #[inline] - fn is_complete_body(&self) -> bool { - true - } - - #[inline] - fn take_complete_body(&mut self) -> Bytes { - Bytes::new() - } }