provide efficient bytes moving with more methods on MessageBody

This commit is contained in:
Rob Ede 2021-12-08 21:51:39 +00:00
parent fb5b4734a4
commit b6f366b71f
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
6 changed files with 256 additions and 30 deletions

View File

@ -51,6 +51,38 @@ impl MessageBody for BoxBody {
.poll_next(cx) .poll_next(cx)
.map_err(|err| Error::new_body().with_cause(err)) .map_err(|err| Error::new_body().with_cause(err))
} }
fn is_complete_body(&self) -> bool {
let a = self.0.is_complete_body();
eprintln!("BoxBody is complete?: {}", a);
a
}
fn take_complete_body(&mut self) -> Bytes {
eprintln!("taking box body contents");
debug_assert!(
self.is_complete_body(),
"boxed type does not allow taking complete body; caller should make sure to \
call `is_complete_body` first",
);
// we do not have DerefMut access to call take_complete_body directly but since
// is_complete_body is true we should expect the entire bytes chunk in one poll_next
let waker = futures_util::task::noop_waker();
let mut cx = Context::from_waker(&waker);
match self.as_pin_mut().poll_next(&mut cx) {
Poll::Ready(Some(Ok(data))) => data,
_ => {
panic!(
"boxed type indicated it allows taking complete body but failed to \
return Bytes when polled",
);
}
}
}
} }
#[cfg(test)] #[cfg(test)]

View File

@ -67,6 +67,20 @@ where
.map_err(|err| Error::new_body().with_cause(err)), .map_err(|err| Error::new_body().with_cause(err)),
} }
} }
fn is_complete_body(&self) -> bool {
match self {
EitherBody::Left { body } => body.is_complete_body(),
EitherBody::Right { body } => body.is_complete_body(),
}
}
fn take_complete_body(&mut self) -> Bytes {
match self {
EitherBody::Left { body } => body.take_complete_body(),
EitherBody::Right { body } => body.take_complete_body(),
}
}
} }
#[cfg(test)] #[cfg(test)]

View File

@ -29,6 +29,17 @@ pub trait MessageBody {
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Self::Error>>>; ) -> Poll<Option<Result<Bytes, Self::Error>>>;
fn is_complete_body(&self) -> bool {
false
}
fn take_complete_body(&mut self) -> Bytes {
unimplemented!(
"type ({}) allows taking complete body but did not provide an implementation",
std::any::type_name::<Self>()
);
}
} }
mod foreign_impls { mod foreign_impls {
@ -49,6 +60,14 @@ mod foreign_impls {
) -> Poll<Option<Result<Bytes, Self::Error>>> { ) -> Poll<Option<Result<Bytes, Self::Error>>> {
match *self {} match *self {}
} }
fn is_complete_body(&self) -> bool {
true
}
fn take_complete_body(&mut self) -> Bytes {
match *self {}
}
} }
impl MessageBody for () { impl MessageBody for () {
@ -66,6 +85,14 @@ mod foreign_impls {
) -> Poll<Option<Result<Bytes, Self::Error>>> { ) -> Poll<Option<Result<Bytes, Self::Error>>> {
Poll::Ready(None) Poll::Ready(None)
} }
fn is_complete_body(&self) -> bool {
true
}
fn take_complete_body(&mut self) -> Bytes {
Bytes::new()
}
} }
impl<B> MessageBody for Box<B> impl<B> MessageBody for Box<B>
@ -86,6 +113,14 @@ mod foreign_impls {
) -> Poll<Option<Result<Bytes, Self::Error>>> { ) -> Poll<Option<Result<Bytes, Self::Error>>> {
Pin::new(self.get_mut().as_mut()).poll_next(cx) Pin::new(self.get_mut().as_mut()).poll_next(cx)
} }
fn is_complete_body(&self) -> bool {
self.as_ref().is_complete_body()
}
fn take_complete_body(&mut self) -> Bytes {
self.as_mut().take_complete_body()
}
} }
impl<B> MessageBody for Pin<Box<B>> impl<B> MessageBody for Pin<Box<B>>
@ -106,6 +141,36 @@ mod foreign_impls {
) -> Poll<Option<Result<Bytes, Self::Error>>> { ) -> Poll<Option<Result<Bytes, Self::Error>>> {
self.as_mut().poll_next(cx) self.as_mut().poll_next(cx)
} }
fn is_complete_body(&self) -> bool {
self.as_ref().is_complete_body()
}
fn take_complete_body(&mut self) -> Bytes {
debug_assert!(
self.is_complete_body(),
"inner type \"{}\" does not allow taking complete body; caller should make sure to \
call `is_complete_body` first",
std::any::type_name::<B>(),
);
// we do not have DerefMut access to call take_complete_body directly but since
// is_complete_body is true we should expect the entire bytes chunk in one poll_next
let waker = futures_util::task::noop_waker();
let mut cx = Context::from_waker(&waker);
match self.as_mut().poll_next(&mut cx) {
Poll::Ready(Some(Ok(data))) => data,
_ => {
panic!(
"inner type \"{}\" indicated it allows taking complete body but failed to \
return Bytes when polled",
std::any::type_name::<B>()
);
}
}
}
} }
impl MessageBody for &'static [u8] { impl MessageBody for &'static [u8] {
@ -116,17 +181,23 @@ mod foreign_impls {
} }
fn poll_next( fn poll_next(
self: Pin<&mut Self>, mut self: Pin<&mut Self>,
_cx: &mut Context<'_>, _cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Self::Error>>> { ) -> Poll<Option<Result<Bytes, Self::Error>>> {
if self.is_empty() { if self.is_empty() {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
let bytes = mem::take(self.get_mut()); Poll::Ready(Some(Ok(self.take_complete_body())))
let bytes = Bytes::from_static(bytes);
Poll::Ready(Some(Ok(bytes)))
} }
} }
fn is_complete_body(&self) -> bool {
true
}
fn take_complete_body(&mut self) -> Bytes {
Bytes::from_static(mem::take(self))
}
} }
impl MessageBody for Bytes { impl MessageBody for Bytes {
@ -137,16 +208,23 @@ mod foreign_impls {
} }
fn poll_next( fn poll_next(
self: Pin<&mut Self>, mut self: Pin<&mut Self>,
_cx: &mut Context<'_>, _cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Self::Error>>> { ) -> Poll<Option<Result<Bytes, Self::Error>>> {
if self.is_empty() { if self.is_empty() {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
let bytes = mem::take(self.get_mut()); Poll::Ready(Some(Ok(self.take_complete_body())))
Poll::Ready(Some(Ok(bytes)))
} }
} }
fn is_complete_body(&self) -> bool {
true
}
fn take_complete_body(&mut self) -> Bytes {
mem::take(self)
}
} }
impl MessageBody for BytesMut { impl MessageBody for BytesMut {
@ -157,16 +235,23 @@ mod foreign_impls {
} }
fn poll_next( fn poll_next(
self: Pin<&mut Self>, mut self: Pin<&mut Self>,
_cx: &mut Context<'_>, _cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Self::Error>>> { ) -> Poll<Option<Result<Bytes, Self::Error>>> {
if self.is_empty() { if self.is_empty() {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
let bytes = mem::take(self.get_mut()).freeze(); Poll::Ready(Some(Ok(self.take_complete_body())))
Poll::Ready(Some(Ok(bytes)))
} }
} }
fn is_complete_body(&self) -> bool {
true
}
fn take_complete_body(&mut self) -> Bytes {
mem::take(self).freeze()
}
} }
impl MessageBody for Vec<u8> { impl MessageBody for Vec<u8> {
@ -177,16 +262,23 @@ mod foreign_impls {
} }
fn poll_next( fn poll_next(
self: Pin<&mut Self>, mut self: Pin<&mut Self>,
_cx: &mut Context<'_>, _cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Self::Error>>> { ) -> Poll<Option<Result<Bytes, Self::Error>>> {
if self.is_empty() { if self.is_empty() {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
let bytes = mem::take(self.get_mut()); Poll::Ready(Some(Ok(self.take_complete_body())))
Poll::Ready(Some(Ok(Bytes::from(bytes))))
} }
} }
fn is_complete_body(&self) -> bool {
true
}
fn take_complete_body(&mut self) -> Bytes {
Bytes::from(mem::take(self))
}
} }
impl MessageBody for &'static str { impl MessageBody for &'static str {
@ -208,6 +300,14 @@ mod foreign_impls {
Poll::Ready(Some(Ok(bytes))) Poll::Ready(Some(Ok(bytes)))
} }
} }
fn is_complete_body(&self) -> bool {
true
}
fn take_complete_body(&mut self) -> Bytes {
Bytes::from_static(mem::take(self).as_bytes())
}
} }
impl MessageBody for String { impl MessageBody for String {
@ -228,6 +328,14 @@ mod foreign_impls {
Poll::Ready(Some(Ok(Bytes::from(string)))) Poll::Ready(Some(Ok(Bytes::from(string))))
} }
} }
fn is_complete_body(&self) -> bool {
true
}
fn take_complete_body(&mut self) -> Bytes {
Bytes::from(mem::take(self))
}
} }
impl MessageBody for bytestring::ByteString { impl MessageBody for bytestring::ByteString {
@ -244,6 +352,14 @@ mod foreign_impls {
let string = mem::take(self.get_mut()); let string = mem::take(self.get_mut());
Poll::Ready(Some(Ok(string.into_bytes()))) Poll::Ready(Some(Ok(string.into_bytes())))
} }
fn is_complete_body(&self) -> bool {
true
}
fn take_complete_body(&mut self) -> Bytes {
mem::take(self).into_bytes()
}
} }
} }

View File

@ -40,4 +40,14 @@ impl MessageBody for None {
) -> Poll<Option<Result<Bytes, Self::Error>>> { ) -> Poll<Option<Result<Bytes, Self::Error>>> {
Poll::Ready(Option::None) Poll::Ready(Option::None)
} }
#[inline]
fn is_complete_body(&self) -> bool {
true
}
#[inline]
fn take_complete_body(&mut self) -> Bytes {
Bytes::new()
}
} }

View File

@ -53,41 +53,53 @@ impl<B: MessageBody> Encoder<B> {
} }
} }
pub fn response(encoding: ContentEncoding, head: &mut ResponseHead, body: B) -> Self { pub fn response(encoding: ContentEncoding, head: &mut ResponseHead, mut body: B) -> Self {
let can_encode = !(head.headers().contains_key(&CONTENT_ENCODING) let can_encode = !(head.headers().contains_key(&CONTENT_ENCODING)
|| head.status == StatusCode::SWITCHING_PROTOCOLS || head.status == StatusCode::SWITCHING_PROTOCOLS
|| head.status == StatusCode::NO_CONTENT || head.status == StatusCode::NO_CONTENT
|| encoding == ContentEncoding::Identity || encoding == ContentEncoding::Identity
|| encoding == ContentEncoding::Auto); || encoding == ContentEncoding::Auto);
match body.size() { // no need to compress an empty body
// no need to compress an empty body if matches!(body.size(), BodySize::None) {
BodySize::None => return Self::none(), return Self::none();
// we cannot assume that Sized is not a stream
BodySize::Sized(_) | BodySize::Stream => {}
} }
// TODO potentially some optimisation for single-chunk responses here by trying to read the eprintln!("body type: {}", std::any::type_name::<B>());
// payload eagerly, stopping after 2 polls if the first is a chunk and the second is None
let body = if body.is_complete_body() {
eprintln!("reducing allocation");
let body = body.take_complete_body();
EncoderBody::Full { body }
} else {
eprintln!("using stream type");
EncoderBody::Stream { body }
};
if can_encode { if can_encode {
eprintln!("I CAN ENCODE WOO");
// Modify response body only if encoder is set // Modify response body only if encoder is set
if let Some(enc) = ContentEncoder::encoder(encoding) { if let Some(enc) = ContentEncoder::encoder(encoding) {
eprintln!("AND i have an encoder - lucky day");
update_head(encoding, head); update_head(encoding, head);
head.no_chunking(false);
return Encoder { return Encoder {
body: EncoderBody::Stream { body }, body,
encoder: Some(enc), encoder: Some(enc),
fut: None, fut: None,
eof: false, eof: false,
}; };
} }
eprintln!("but i DONT have an encoder :(");
} }
eprintln!("rip, no compression for you");
Encoder { Encoder {
body: EncoderBody::Stream { body }, body,
encoder: None, encoder: None,
fut: None, fut: None,
eof: false, eof: false,
@ -99,6 +111,7 @@ pin_project! {
#[project = EncoderBodyProj] #[project = EncoderBodyProj]
enum EncoderBody<B> { enum EncoderBody<B> {
None, None,
Full { body: Bytes },
Stream { #[pin] body: B }, Stream { #[pin] body: B },
} }
} }
@ -112,6 +125,7 @@ where
fn size(&self) -> BodySize { fn size(&self) -> BodySize {
match self { match self {
EncoderBody::None => BodySize::None, EncoderBody::None => BodySize::None,
EncoderBody::Full { body } => body.size(),
EncoderBody::Stream { body } => body.size(), EncoderBody::Stream { body } => body.size(),
} }
} }
@ -122,12 +136,32 @@ where
) -> Poll<Option<Result<Bytes, Self::Error>>> { ) -> Poll<Option<Result<Bytes, Self::Error>>> {
match self.project() { match self.project() {
EncoderBodyProj::None => Poll::Ready(None), EncoderBodyProj::None => Poll::Ready(None),
EncoderBodyProj::Full { body } => {
Pin::new(body).poll_next(cx).map_err(|err| match err {})
}
EncoderBodyProj::Stream { body } => body EncoderBodyProj::Stream { body } => body
.poll_next(cx) .poll_next(cx)
.map_err(|err| EncoderError::Body(err.into())), .map_err(|err| EncoderError::Body(err.into())),
} }
} }
fn is_complete_body(&self) -> bool {
match self {
EncoderBody::None => true,
EncoderBody::Full { .. } => true,
EncoderBody::Stream { .. } => false,
}
}
fn take_complete_body(&mut self) -> Bytes {
match self {
EncoderBody::None => Bytes::new(),
EncoderBody::Full { body } => body.take_complete_body(),
EncoderBody::Stream { .. } => {
panic!("EncoderBody::Stream variant cannot be taken")
}
}
}
} }
impl<B> MessageBody for Encoder<B> impl<B> MessageBody for Encoder<B>
@ -137,10 +171,10 @@ where
type Error = EncoderError; type Error = EncoderError;
fn size(&self) -> BodySize { fn size(&self) -> BodySize {
if self.encoder.is_none() { if self.encoder.is_some() {
self.body.size()
} else {
BodySize::Stream BodySize::Stream
} else {
self.body.size()
} }
} }
@ -211,6 +245,22 @@ where
} }
} }
} }
fn is_complete_body(&self) -> bool {
if self.encoder.is_some() {
false
} else {
self.body.is_complete_body()
}
}
fn take_complete_body(&mut self) -> Bytes {
if self.encoder.is_some() {
panic!("compressed body stream cannot be taken")
} else {
self.body.take_complete_body()
}
}
} }
fn update_head(encoding: ContentEncoding, head: &mut ResponseHead) { fn update_head(encoding: ContentEncoding, head: &mut ResponseHead) {
@ -218,6 +268,8 @@ fn update_head(encoding: ContentEncoding, head: &mut ResponseHead) {
header::CONTENT_ENCODING, header::CONTENT_ENCODING,
HeaderValue::from_static(encoding.as_str()), HeaderValue::from_static(encoding.as_str()),
); );
head.no_chunking(false);
} }
enum ContentEncoder { enum ContentEncoder {

View File

@ -11,9 +11,11 @@ async fn index_async(req: HttpRequest) -> &'static str {
"Hello world!\r\n" "Hello world!\r\n"
} }
static A: [u8; 4096] = [b'0'; 4096];
#[get("/")] #[get("/")]
async fn no_params() -> &'static str { async fn no_params() -> &'static str {
"Hello world!\r\n" std::str::from_utf8(A.as_ref()).unwrap()
} }
#[actix_web::main] #[actix_web::main]