Box encoder to resolve degradation in `response-body-compression`

This commit is contained in:
Yury Yarashevich 2023-10-14 08:40:10 +02:00
parent d90883f8f0
commit ccae712b9b
1 changed files with 36 additions and 40 deletions

View File

@ -28,8 +28,7 @@ pin_project! {
pub struct Encoder<B> { pub struct Encoder<B> {
#[pin] #[pin]
body: EncoderBody<B>, body: EncoderBody<B>,
encoder: Option<SelectedContentEncoder>, encoder: Option<Box<SelectedContentEncoder>>,
chunk_ready_to_encode: Option<Bytes>,
eof: bool, eof: bool,
} }
} }
@ -41,7 +40,6 @@ impl<B: MessageBody> Encoder<B> {
body: body::None::new(), body: body::None::new(),
}, },
encoder: None, encoder: None,
chunk_ready_to_encode: None,
eof: true, eof: true,
} }
} }
@ -70,7 +68,6 @@ impl<B: MessageBody> Encoder<B> {
return Encoder { return Encoder {
body, body,
encoder: Some(selected_encoder), encoder: Some(selected_encoder),
chunk_ready_to_encode: None,
eof: false, eof: false,
}; };
} }
@ -79,7 +76,6 @@ impl<B: MessageBody> Encoder<B> {
Encoder { Encoder {
body, body,
encoder: None, encoder: None,
chunk_ready_to_encode: None,
eof: false, eof: false,
} }
} }
@ -174,31 +170,25 @@ where
return Poll::Ready(None); return Poll::Ready(None);
} }
if let Some(chunk) = this.chunk_ready_to_encode.as_mut() { if let Some(selected_encoder) = this.encoder.as_deref_mut() {
let selected_encoder = this.encoder.as_mut().expect( if let Some(chunk) = selected_encoder.chunk_ready_to_encode.as_mut() {
"when chunk_ready_to_encode is presented the encoder is expected to be presented as well", let encode_len = chunk.len().min(selected_encoder.preferred_chunk_size);
); selected_encoder
let encode_len = chunk.len().min(selected_encoder.preferred_chunk_size); .content_encoder
selected_encoder .write(&chunk[..encode_len])
.content_encoder .map_err(EncoderError::Io)?;
.write(&chunk[..encode_len]) chunk.advance(encode_len);
.map_err(EncoderError::Io)?;
chunk.advance(encode_len);
if chunk.is_empty() { if chunk.is_empty() {
*this.chunk_ready_to_encode = None; selected_encoder.chunk_ready_to_encode = None;
} }
let encoded_chunk = selected_encoder.content_encoder.take(); let encoded_chunk = selected_encoder.content_encoder.take();
if !encoded_chunk.is_empty() { if encoded_chunk.is_empty() {
continue;
}
return Poll::Ready(Some(Ok(encoded_chunk))); return Poll::Ready(Some(Ok(encoded_chunk)));
} }
if this.chunk_ready_to_encode.is_some() {
// Yield execution to give chance other futures to execute
cx.waker().wake_by_ref();
return Poll::Pending;
}
} }
let result = ready!(this.body.as_mut().poll_next(cx)); let result = ready!(this.body.as_mut().poll_next(cx));
@ -206,12 +196,12 @@ where
match result { match result {
Some(Err(err)) => return Poll::Ready(Some(Err(err))), Some(Err(err)) => return Poll::Ready(Some(Err(err))),
Some(Ok(chunk)) => { Some(Ok(chunk)) => match this.encoder.as_deref_mut() {
if this.encoder.is_none() { None => return Poll::Ready(Some(Ok(chunk))),
return Poll::Ready(Some(Ok(chunk))); Some(encoder) => {
encoder.chunk_ready_to_encode = Some(chunk);
} }
*this.chunk_ready_to_encode = Some(chunk); },
}
None => { None => {
if let Some(selected_encoder) = this.encoder.take() { if let Some(selected_encoder) = this.encoder.take() {
@ -281,10 +271,11 @@ enum ContentEncoder {
struct SelectedContentEncoder { struct SelectedContentEncoder {
content_encoder: ContentEncoder, content_encoder: ContentEncoder,
preferred_chunk_size: usize, preferred_chunk_size: usize,
chunk_ready_to_encode: Option<Bytes>,
} }
impl ContentEncoder { impl ContentEncoder {
fn select(encoding: ContentEncoding) -> Option<SelectedContentEncoder> { fn select(encoding: ContentEncoding) -> Option<Box<SelectedContentEncoder>> {
// Chunk size picked as max chunk size which took less that 50 µs to compress on "cargo bench --bench compression-chunk-size" // Chunk size picked as max chunk size which took less that 50 µs to compress on "cargo bench --bench compression-chunk-size"
// Rust 1.72 linux/arm64 in Docker on Apple M2 Pro: "time to compress chunk/deflate-16384" time: [39.114 µs 39.283 µs 39.457 µs] // Rust 1.72 linux/arm64 in Docker on Apple M2 Pro: "time to compress chunk/deflate-16384" time: [39.114 µs 39.283 µs 39.457 µs]
@ -298,36 +289,40 @@ impl ContentEncoder {
match encoding { match encoding {
#[cfg(feature = "compress-gzip")] #[cfg(feature = "compress-gzip")]
ContentEncoding::Deflate => Some(SelectedContentEncoder { ContentEncoding::Deflate => Some(Box::new(SelectedContentEncoder {
content_encoder: ContentEncoder::Deflate(ZlibEncoder::new( content_encoder: ContentEncoder::Deflate(ZlibEncoder::new(
Writer::new(), Writer::new(),
flate2::Compression::fast(), flate2::Compression::fast(),
)), )),
preferred_chunk_size: MAX_DEFLATE_CHUNK_SIZE, preferred_chunk_size: MAX_DEFLATE_CHUNK_SIZE,
}), chunk_ready_to_encode: None,
})),
#[cfg(feature = "compress-gzip")] #[cfg(feature = "compress-gzip")]
ContentEncoding::Gzip => Some(SelectedContentEncoder { ContentEncoding::Gzip => Some(Box::new(SelectedContentEncoder {
content_encoder: ContentEncoder::Gzip(GzEncoder::new( content_encoder: ContentEncoder::Gzip(GzEncoder::new(
Writer::new(), Writer::new(),
flate2::Compression::fast(), flate2::Compression::fast(),
)), )),
preferred_chunk_size: MAX_GZIP_CHUNK_SIZE, preferred_chunk_size: MAX_GZIP_CHUNK_SIZE,
}), chunk_ready_to_encode: None,
})),
#[cfg(feature = "compress-brotli")] #[cfg(feature = "compress-brotli")]
ContentEncoding::Brotli => Some(SelectedContentEncoder { ContentEncoding::Brotli => Some(Box::new(SelectedContentEncoder {
content_encoder: ContentEncoder::Brotli(new_brotli_compressor()), content_encoder: ContentEncoder::Brotli(new_brotli_compressor()),
preferred_chunk_size: MAX_BROTLI_CHUNK_SIZE, preferred_chunk_size: MAX_BROTLI_CHUNK_SIZE,
}), chunk_ready_to_encode: None,
})),
#[cfg(feature = "compress-zstd")] #[cfg(feature = "compress-zstd")]
ContentEncoding::Zstd => { ContentEncoding::Zstd => {
let encoder = ZstdEncoder::new(Writer::new(), 3).ok()?; let encoder = ZstdEncoder::new(Writer::new(), 3).ok()?;
Some(SelectedContentEncoder { Some(Box::new(SelectedContentEncoder {
content_encoder: ContentEncoder::Zstd(encoder), content_encoder: ContentEncoder::Zstd(encoder),
preferred_chunk_size: MAX_ZSTD_CHUNK_SIZE, preferred_chunk_size: MAX_ZSTD_CHUNK_SIZE,
}) chunk_ready_to_encode: None,
}))
} }
_ => None, _ => None,
@ -485,6 +480,7 @@ mod tests {
let SelectedContentEncoder { let SelectedContentEncoder {
content_encoder: mut compressor, content_encoder: mut compressor,
preferred_chunk_size: _, preferred_chunk_size: _,
chunk_ready_to_encode: _,
} = ContentEncoder::select(encoding).unwrap(); } = ContentEncoder::select(encoding).unwrap();
compressor.write(&body_to_compress).unwrap(); compressor.write(&body_to_compress).unwrap();
let reference_compressed_bytes = compressor.finish().unwrap(); let reference_compressed_bytes = compressor.finish().unwrap();