From b6f366b71f451f4ae5af1e489b1a143d8a4354ad Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Wed, 8 Dec 2021 21:51:39 +0000 Subject: [PATCH] provide efficient bytes moving with more methods on MessageBody --- actix-http/src/body/boxed.rs | 32 +++++++ actix-http/src/body/either.rs | 14 +++ actix-http/src/body/message_body.rs | 142 +++++++++++++++++++++++++--- actix-http/src/body/none.rs | 10 ++ actix-http/src/encoding/encoder.rs | 84 ++++++++++++---- examples/basic.rs | 4 +- 6 files changed, 256 insertions(+), 30 deletions(-) diff --git a/actix-http/src/body/boxed.rs b/actix-http/src/body/boxed.rs index c39da10c0..0c3473d7a 100644 --- a/actix-http/src/body/boxed.rs +++ b/actix-http/src/body/boxed.rs @@ -51,6 +51,38 @@ impl MessageBody for BoxBody { .poll_next(cx) .map_err(|err| Error::new_body().with_cause(err)) } + + fn is_complete_body(&self) -> bool { + let a = self.0.is_complete_body(); + eprintln!("BoxBody is complete?: {}", a); + a + } + + fn take_complete_body(&mut self) -> Bytes { + eprintln!("taking box body contents"); + + debug_assert!( + self.is_complete_body(), + "boxed type does not allow taking complete body; caller should make sure to \ + call `is_complete_body` first", + ); + + // we do not have DerefMut access to call take_complete_body directly but since + // is_complete_body is true we should expect the entire bytes chunk in one poll_next + + let waker = futures_util::task::noop_waker(); + let mut cx = Context::from_waker(&waker); + + match self.as_pin_mut().poll_next(&mut cx) { + Poll::Ready(Some(Ok(data))) => data, + _ => { + panic!( + "boxed type indicated it allows taking complete body but failed to \ + return Bytes when polled", + ); + } + } + } } #[cfg(test)] diff --git a/actix-http/src/body/either.rs b/actix-http/src/body/either.rs index 6169ee627..6135d834d 100644 --- a/actix-http/src/body/either.rs +++ b/actix-http/src/body/either.rs @@ -67,6 +67,20 @@ where .map_err(|err| Error::new_body().with_cause(err)), } } + + fn is_complete_body(&self) -> bool { + match self { + EitherBody::Left { body } => body.is_complete_body(), + EitherBody::Right { body } => body.is_complete_body(), + } + } + + fn take_complete_body(&mut self) -> Bytes { + match self { + EitherBody::Left { body } => body.take_complete_body(), + EitherBody::Right { body } => body.take_complete_body(), + } + } } #[cfg(test)] diff --git a/actix-http/src/body/message_body.rs b/actix-http/src/body/message_body.rs index 053b6f286..c360a7d28 100644 --- a/actix-http/src/body/message_body.rs +++ b/actix-http/src/body/message_body.rs @@ -29,6 +29,17 @@ pub trait MessageBody { self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>>; + + fn is_complete_body(&self) -> bool { + false + } + + fn take_complete_body(&mut self) -> Bytes { + unimplemented!( + "type ({}) allows taking complete body but did not provide an implementation", + std::any::type_name::() + ); + } } mod foreign_impls { @@ -49,6 +60,14 @@ mod foreign_impls { ) -> Poll>> { match *self {} } + + fn is_complete_body(&self) -> bool { + true + } + + fn take_complete_body(&mut self) -> Bytes { + match *self {} + } } impl MessageBody for () { @@ -66,6 +85,14 @@ mod foreign_impls { ) -> Poll>> { Poll::Ready(None) } + + fn is_complete_body(&self) -> bool { + true + } + + fn take_complete_body(&mut self) -> Bytes { + Bytes::new() + } } impl MessageBody for Box @@ -86,6 +113,14 @@ mod foreign_impls { ) -> Poll>> { Pin::new(self.get_mut().as_mut()).poll_next(cx) } + + fn is_complete_body(&self) -> bool { + self.as_ref().is_complete_body() + } + + fn take_complete_body(&mut self) -> Bytes { + self.as_mut().take_complete_body() + } } impl MessageBody for Pin> @@ -106,6 +141,36 @@ mod foreign_impls { ) -> Poll>> { self.as_mut().poll_next(cx) } + + fn is_complete_body(&self) -> bool { + self.as_ref().is_complete_body() + } + + fn take_complete_body(&mut self) -> Bytes { + debug_assert!( + self.is_complete_body(), + "inner type \"{}\" does not allow taking complete body; caller should make sure to \ + call `is_complete_body` first", + std::any::type_name::(), + ); + + // we do not have DerefMut access to call take_complete_body directly but since + // is_complete_body is true we should expect the entire bytes chunk in one poll_next + + let waker = futures_util::task::noop_waker(); + let mut cx = Context::from_waker(&waker); + + match self.as_mut().poll_next(&mut cx) { + Poll::Ready(Some(Ok(data))) => data, + _ => { + panic!( + "inner type \"{}\" indicated it allows taking complete body but failed to \ + return Bytes when polled", + std::any::type_name::() + ); + } + } + } } impl MessageBody for &'static [u8] { @@ -116,17 +181,23 @@ mod foreign_impls { } fn poll_next( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { - let bytes = mem::take(self.get_mut()); - let bytes = Bytes::from_static(bytes); - Poll::Ready(Some(Ok(bytes))) + Poll::Ready(Some(Ok(self.take_complete_body()))) } } + + fn is_complete_body(&self) -> bool { + true + } + + fn take_complete_body(&mut self) -> Bytes { + Bytes::from_static(mem::take(self)) + } } impl MessageBody for Bytes { @@ -137,16 +208,23 @@ mod foreign_impls { } fn poll_next( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { - let bytes = mem::take(self.get_mut()); - Poll::Ready(Some(Ok(bytes))) + Poll::Ready(Some(Ok(self.take_complete_body()))) } } + + fn is_complete_body(&self) -> bool { + true + } + + fn take_complete_body(&mut self) -> Bytes { + mem::take(self) + } } impl MessageBody for BytesMut { @@ -157,16 +235,23 @@ mod foreign_impls { } fn poll_next( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { - let bytes = mem::take(self.get_mut()).freeze(); - Poll::Ready(Some(Ok(bytes))) + Poll::Ready(Some(Ok(self.take_complete_body()))) } } + + fn is_complete_body(&self) -> bool { + true + } + + fn take_complete_body(&mut self) -> Bytes { + mem::take(self).freeze() + } } impl MessageBody for Vec { @@ -177,16 +262,23 @@ mod foreign_impls { } fn poll_next( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { - let bytes = mem::take(self.get_mut()); - Poll::Ready(Some(Ok(Bytes::from(bytes)))) + Poll::Ready(Some(Ok(self.take_complete_body()))) } } + + fn is_complete_body(&self) -> bool { + true + } + + fn take_complete_body(&mut self) -> Bytes { + Bytes::from(mem::take(self)) + } } impl MessageBody for &'static str { @@ -208,6 +300,14 @@ mod foreign_impls { Poll::Ready(Some(Ok(bytes))) } } + + fn is_complete_body(&self) -> bool { + true + } + + fn take_complete_body(&mut self) -> Bytes { + Bytes::from_static(mem::take(self).as_bytes()) + } } impl MessageBody for String { @@ -228,6 +328,14 @@ mod foreign_impls { Poll::Ready(Some(Ok(Bytes::from(string)))) } } + + fn is_complete_body(&self) -> bool { + true + } + + fn take_complete_body(&mut self) -> Bytes { + Bytes::from(mem::take(self)) + } } impl MessageBody for bytestring::ByteString { @@ -244,6 +352,14 @@ mod foreign_impls { let string = mem::take(self.get_mut()); Poll::Ready(Some(Ok(string.into_bytes()))) } + + fn is_complete_body(&self) -> bool { + true + } + + fn take_complete_body(&mut self) -> Bytes { + mem::take(self).into_bytes() + } } } diff --git a/actix-http/src/body/none.rs b/actix-http/src/body/none.rs index 0fc7c8c9f..bb494078f 100644 --- a/actix-http/src/body/none.rs +++ b/actix-http/src/body/none.rs @@ -40,4 +40,14 @@ impl MessageBody for None { ) -> Poll>> { Poll::Ready(Option::None) } + + #[inline] + fn is_complete_body(&self) -> bool { + true + } + + #[inline] + fn take_complete_body(&mut self) -> Bytes { + Bytes::new() + } } diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index 0886221cc..8dc4a6886 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -53,41 +53,53 @@ impl Encoder { } } - pub fn response(encoding: ContentEncoding, head: &mut ResponseHead, body: B) -> Self { + pub fn response(encoding: ContentEncoding, head: &mut ResponseHead, mut body: B) -> Self { let can_encode = !(head.headers().contains_key(&CONTENT_ENCODING) || head.status == StatusCode::SWITCHING_PROTOCOLS || head.status == StatusCode::NO_CONTENT || encoding == ContentEncoding::Identity || encoding == ContentEncoding::Auto); - match body.size() { - // no need to compress an empty body - BodySize::None => return Self::none(), - - // we cannot assume that Sized is not a stream - BodySize::Sized(_) | BodySize::Stream => {} + // no need to compress an empty body + if matches!(body.size(), BodySize::None) { + return Self::none(); } - // TODO potentially some optimisation for single-chunk responses here by trying to read the - // payload eagerly, stopping after 2 polls if the first is a chunk and the second is None + eprintln!("body type: {}", std::any::type_name::()); + + let body = if body.is_complete_body() { + eprintln!("reducing allocation"); + let body = body.take_complete_body(); + EncoderBody::Full { body } + } else { + eprintln!("using stream type"); + EncoderBody::Stream { body } + }; if can_encode { + eprintln!("I CAN ENCODE WOO"); + // Modify response body only if encoder is set if let Some(enc) = ContentEncoder::encoder(encoding) { + eprintln!("AND i have an encoder - lucky day"); + update_head(encoding, head); - head.no_chunking(false); return Encoder { - body: EncoderBody::Stream { body }, + body, encoder: Some(enc), fut: None, eof: false, }; } + + eprintln!("but i DONT have an encoder :("); } + eprintln!("rip, no compression for you"); + Encoder { - body: EncoderBody::Stream { body }, + body, encoder: None, fut: None, eof: false, @@ -99,6 +111,7 @@ pin_project! { #[project = EncoderBodyProj] enum EncoderBody { None, + Full { body: Bytes }, Stream { #[pin] body: B }, } } @@ -112,6 +125,7 @@ where fn size(&self) -> BodySize { match self { EncoderBody::None => BodySize::None, + EncoderBody::Full { body } => body.size(), EncoderBody::Stream { body } => body.size(), } } @@ -122,12 +136,32 @@ where ) -> Poll>> { match self.project() { EncoderBodyProj::None => Poll::Ready(None), - + EncoderBodyProj::Full { body } => { + Pin::new(body).poll_next(cx).map_err(|err| match err {}) + } EncoderBodyProj::Stream { body } => body .poll_next(cx) .map_err(|err| EncoderError::Body(err.into())), } } + + fn is_complete_body(&self) -> bool { + match self { + EncoderBody::None => true, + EncoderBody::Full { .. } => true, + EncoderBody::Stream { .. } => false, + } + } + + fn take_complete_body(&mut self) -> Bytes { + match self { + EncoderBody::None => Bytes::new(), + EncoderBody::Full { body } => body.take_complete_body(), + EncoderBody::Stream { .. } => { + panic!("EncoderBody::Stream variant cannot be taken") + } + } + } } impl MessageBody for Encoder @@ -137,10 +171,10 @@ where type Error = EncoderError; fn size(&self) -> BodySize { - if self.encoder.is_none() { - self.body.size() - } else { + if self.encoder.is_some() { BodySize::Stream + } else { + self.body.size() } } @@ -211,6 +245,22 @@ where } } } + + fn is_complete_body(&self) -> bool { + if self.encoder.is_some() { + false + } else { + self.body.is_complete_body() + } + } + + fn take_complete_body(&mut self) -> Bytes { + if self.encoder.is_some() { + panic!("compressed body stream cannot be taken") + } else { + self.body.take_complete_body() + } + } } fn update_head(encoding: ContentEncoding, head: &mut ResponseHead) { @@ -218,6 +268,8 @@ fn update_head(encoding: ContentEncoding, head: &mut ResponseHead) { header::CONTENT_ENCODING, HeaderValue::from_static(encoding.as_str()), ); + + head.no_chunking(false); } enum ContentEncoder { diff --git a/examples/basic.rs b/examples/basic.rs index d29546129..d10abb0a1 100644 --- a/examples/basic.rs +++ b/examples/basic.rs @@ -11,9 +11,11 @@ async fn index_async(req: HttpRequest) -> &'static str { "Hello world!\r\n" } +static A: [u8; 4096] = [b'0'; 4096]; + #[get("/")] async fn no_params() -> &'static str { - "Hello world!\r\n" + std::str::from_utf8(A.as_ref()).unwrap() } #[actix_web::main]