diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index 422993bcd..7385f71c5 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -11,7 +11,7 @@ use bytes::{Buf, Bytes}; use derive_more::Display; #[cfg(feature = "compress-gzip")] use flate2::write::{GzEncoder, ZlibEncoder}; -use futures_core::{ready, Stream}; +use futures_core::ready; use pin_project_lite::pin_project; use tracing::trace; #[cfg(feature = "compress-zstd")] @@ -171,15 +171,12 @@ where } if let Some(cooperative_encoder) = this.encoder.as_deref_mut() { - match ready!(Pin::new(cooperative_encoder).poll_next(cx)) { - Some(Ok(Some(chunk))) => return Poll::Ready(Some(Ok(chunk))), - Some(Ok(None)) => { - // Need more data from uncompressed body - } - Some(Err(err)) => return Poll::Ready(Some(Err(err))), - None => { - unreachable!() + match ready!(Pin::new(cooperative_encoder).poll_encoded_chunk(cx)) { + Ok(Some(encoded_chunk)) => return Poll::Ready(Some(Ok(encoded_chunk))), + Ok(None) => { + // Need next chunk from uncompressed body } + Err(err) => return Poll::Ready(Some(Err(err))), } } @@ -191,9 +188,7 @@ where Some(Ok(chunk)) => match this.encoder.as_deref_mut() { None => return Poll::Ready(Some(Ok(chunk))), Some(encoder) => { - debug_assert!(encoder.chunk_ready_to_encode.is_none()); - encoder.chunk_ready_to_encode = Some(chunk); - encoder.budget_used = 0; + encoder.push_chunk(chunk); } }, @@ -327,6 +322,47 @@ impl CooperativeContentEncoder { _ => None, } } + + fn push_chunk(&mut self, chunk: Bytes) { + debug_assert!(self.chunk_ready_to_encode.is_none()); + self.chunk_ready_to_encode = Some(chunk); + self.budget_used = 0 + } + + fn poll_encoded_chunk( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, EncoderError>> { + let this = self.get_mut(); + loop { + if this.budget_used > 8 { + this.budget_used = 0; + cx.waker().wake_by_ref(); + return Poll::Pending; + } + if let Some(mut chunk) = this.chunk_ready_to_encode.take() { + let encode_len = chunk.len().min(this.preferred_chunk_size); + this.content_encoder + .write(&chunk[..encode_len]) + .map_err(EncoderError::Io)?; + chunk.advance(encode_len); + + if !chunk.is_empty() { + this.chunk_ready_to_encode = Some(chunk); + } + + let encoded_chunk = this.content_encoder.take(); + if encoded_chunk.is_empty() { + continue; + } + + this.budget_used += 1; + return Poll::Ready(Ok(Some(encoded_chunk))); + } else { + return Poll::Ready(Ok(None)); + } + } + } } impl ContentEncoder { @@ -416,42 +452,6 @@ impl ContentEncoder { } } -impl futures_core::Stream for CooperativeContentEncoder { - type Item = Result, EncoderError>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.get_mut(); - loop { - if this.budget_used > 8 { - this.budget_used = 0; - cx.waker().wake_by_ref(); - return Poll::Pending; - } - if let Some(mut chunk) = this.chunk_ready_to_encode.take() { - let encode_len = chunk.len().min(this.preferred_chunk_size); - this.content_encoder - .write(&chunk[..encode_len]) - .map_err(EncoderError::Io)?; - chunk.advance(encode_len); - - if !chunk.is_empty() { - this.chunk_ready_to_encode = Some(chunk); - } - - let encoded_chunk = this.content_encoder.take(); - if encoded_chunk.is_empty() { - continue; - } - - this.budget_used += 1; - return Poll::Ready(Some(Ok(Some(encoded_chunk)))); - } else { - return Poll::Ready(Some(Ok(None))); - } - } - } -} - #[cfg(feature = "compress-brotli")] fn new_brotli_compressor() -> Box> { Box::new(brotli::CompressorWriter::new(