From d90883f8f071c977a9dd3c44deee17394348313a Mon Sep 17 00:00:00 2001 From: Yury Yarashevich Date: Sun, 24 Sep 2023 00:12:54 +0200 Subject: [PATCH 1/7] Prefer cooperative compression over blocking pool. --- actix-http/CHANGES.md | 2 + actix-http/Cargo.toml | 5 + actix-http/benches/compression-chunk-size.rs | 51 +++++ actix-http/src/encoding/encoder.rs | 212 ++++++++++++++----- 4 files changed, 216 insertions(+), 54 deletions(-) create mode 100644 actix-http/benches/compression-chunk-size.rs diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 860ee3b6c..d36213242 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -2,6 +2,8 @@ ## Unreleased +- Revise compression middleware to perform compression cooperatively, periodically yielding control to other tasks instead of offloading compression to a background thread. + ## 3.4.0 ### Added diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index c19ce0161..5b33af7e1 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -140,3 +140,8 @@ required-features = ["http2", "rustls-0_21"] name = "response-body-compression" harness = false required-features = ["compress-brotli", "compress-gzip", "compress-zstd"] + +[[bench]] +name = "compression-chunk-size" +harness = false +required-features = ["compress-brotli", "compress-gzip", "compress-zstd"] diff --git a/actix-http/benches/compression-chunk-size.rs b/actix-http/benches/compression-chunk-size.rs new file mode 100644 index 000000000..68294f368 --- /dev/null +++ b/actix-http/benches/compression-chunk-size.rs @@ -0,0 +1,51 @@ +#![allow(clippy::uninlined_format_args)] + +use actix_http::{body, encoding::Encoder, ContentEncoding, ResponseHead, StatusCode}; +use criterion::{criterion_group, criterion_main, Criterion}; + +const BODY: &[u8] = include_bytes!("../../Cargo.lock"); + +const CHUNK_SIZES: [usize; 7] = [512, 1024, 2048, 4096, 8192, 16384, 32768]; + +const CONTENT_ENCODING: [ContentEncoding; 4] = [ + ContentEncoding::Deflate, + 
ContentEncoding::Gzip, + ContentEncoding::Zstd, + ContentEncoding::Brotli, +]; + +fn compression_responses(c: &mut Criterion) { + static_assertions::const_assert!(BODY.len() > CHUNK_SIZES[6]); + + let mut group = c.benchmark_group("time to compress chunk"); + + for content_encoding in CONTENT_ENCODING { + for chunk_size in CHUNK_SIZES { + group.bench_function( + format!("{}-{}", content_encoding.as_str(), chunk_size), + |b| { + let rt = actix_rt::Runtime::new().unwrap(); + b.iter(|| { + rt.block_on(async move { + let encoder = Encoder::response( + content_encoding, + &mut ResponseHead::new(StatusCode::OK), + &BODY[..chunk_size], + ) + .with_encode_chunk_size(chunk_size); + body::to_bytes_limited(encoder, chunk_size + 256) + .await + .unwrap() + .unwrap(); + }); + }); + }, + ); + } + } + + group.finish(); +} + +criterion_group!(benches, compression_responses); +criterion_main!(benches); diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index 527bfebaa..75079c8b7 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -2,14 +2,12 @@ use std::{ error::Error as StdError, - future::Future, io::{self, Write as _}, pin::Pin, task::{Context, Poll}, }; -use actix_rt::task::{spawn_blocking, JoinHandle}; -use bytes::Bytes; +use bytes::{Buf, Bytes}; use derive_more::Display; #[cfg(feature = "compress-gzip")] use flate2::write::{GzEncoder, ZlibEncoder}; @@ -26,14 +24,12 @@ use crate::{ ResponseHead, StatusCode, }; -const MAX_CHUNK_SIZE_ENCODE_IN_PLACE: usize = 1024; - pin_project! 
{ pub struct Encoder { #[pin] body: EncoderBody, - encoder: Option, - fut: Option>>, + encoder: Option, + chunk_ready_to_encode: Option, eof: bool, } } @@ -45,7 +41,7 @@ impl Encoder { body: body::None::new(), }, encoder: None, - fut: None, + chunk_ready_to_encode: None, eof: true, } } @@ -68,13 +64,13 @@ impl Encoder { if should_encode { // wrap body only if encoder is feature-enabled - if let Some(enc) = ContentEncoder::select(encoding) { + if let Some(selected_encoder) = ContentEncoder::select(encoding) { update_head(encoding, head); return Encoder { body, - encoder: Some(enc), - fut: None, + encoder: Some(selected_encoder), + chunk_ready_to_encode: None, eof: false, }; } @@ -83,10 +79,19 @@ impl Encoder { Encoder { body, encoder: None, - fut: None, + chunk_ready_to_encode: None, eof: false, } } + + pub fn with_encode_chunk_size(mut self, size: usize) -> Self { + if size > 0 { + if let Some(selected_encoder) = self.encoder.as_mut() { + selected_encoder.preferred_chunk_size = size; + } + } + self + } } pin_project! { @@ -169,22 +174,30 @@ where return Poll::Ready(None); } - if let Some(ref mut fut) = this.fut { - let mut encoder = ready!(Pin::new(fut).poll(cx)) - .map_err(|_| { - EncoderError::Io(io::Error::new( - io::ErrorKind::Other, - "Blocking task was cancelled unexpectedly", - )) - })? 
+ if let Some(chunk) = this.chunk_ready_to_encode.as_mut() { + let selected_encoder = this.encoder.as_mut().expect( + "when chunk_ready_to_encode is presented the encoder is expected to be presented as well", + ); + let encode_len = chunk.len().min(selected_encoder.preferred_chunk_size); + selected_encoder + .content_encoder + .write(&chunk[..encode_len]) .map_err(EncoderError::Io)?; + chunk.advance(encode_len); - let chunk = encoder.take(); - *this.encoder = Some(encoder); - this.fut.take(); + if chunk.is_empty() { + *this.chunk_ready_to_encode = None; + } - if !chunk.is_empty() { - return Poll::Ready(Some(Ok(chunk))); + let encoded_chunk = selected_encoder.content_encoder.take(); + if !encoded_chunk.is_empty() { + return Poll::Ready(Some(Ok(encoded_chunk))); + } + + if this.chunk_ready_to_encode.is_some() { + // Yield execution to give chance other futures to execute + cx.waker().wake_by_ref(); + return Poll::Pending; } } @@ -194,29 +207,18 @@ where Some(Err(err)) => return Poll::Ready(Some(Err(err))), Some(Ok(chunk)) => { - if let Some(mut encoder) = this.encoder.take() { - if chunk.len() < MAX_CHUNK_SIZE_ENCODE_IN_PLACE { - encoder.write(&chunk).map_err(EncoderError::Io)?; - let chunk = encoder.take(); - *this.encoder = Some(encoder); - - if !chunk.is_empty() { - return Poll::Ready(Some(Ok(chunk))); - } - } else { - *this.fut = Some(spawn_blocking(move || { - encoder.write(&chunk)?; - Ok(encoder) - })); - } - } else { + if this.encoder.is_none() { return Poll::Ready(Some(Ok(chunk))); } + *this.chunk_ready_to_encode = Some(chunk); } None => { - if let Some(encoder) = this.encoder.take() { - let chunk = encoder.finish().map_err(EncoderError::Io)?; + if let Some(selected_encoder) = this.encoder.take() { + let chunk = selected_encoder + .content_encoder + .finish() + .map_err(EncoderError::Io)?; if chunk.is_empty() { return Poll::Ready(None); @@ -276,28 +278,56 @@ enum ContentEncoder { Zstd(ZstdEncoder<'static, Writer>), } +struct SelectedContentEncoder { + 
content_encoder: ContentEncoder, + preferred_chunk_size: usize, +} + impl ContentEncoder { - fn select(encoding: ContentEncoding) -> Option { + fn select(encoding: ContentEncoding) -> Option { + // Chunk size picked as max chunk size which took less that 50 µs to compress on "cargo bench --bench compression-chunk-size" + + // Rust 1.72 linux/arm64 in Docker on Apple M2 Pro: "time to compress chunk/deflate-16384" time: [39.114 µs 39.283 µs 39.457 µs] + const MAX_DEFLATE_CHUNK_SIZE: usize = 16384; + // Rust 1.72 linux/arm64 in Docker on Apple M2 Pro: "time to compress chunk/gzip-16384" time: [40.121 µs 40.340 µs 40.566 µs] + const MAX_GZIP_CHUNK_SIZE: usize = 16384; + // Rust 1.72 linux/arm64 in Docker on Apple M2 Pro: "time to compress chunk/br-8192" time: [46.076 µs 46.208 µs 46.343 µs] + const MAX_BROTLI_CHUNK_SIZE: usize = 8192; + // Rust 1.72 linux/arm64 in Docker on Apple M2 Pro: "time to compress chunk/zstd-16384" time: [32.872 µs 32.967 µs 33.068 µs] + const MAX_ZSTD_CHUNK_SIZE: usize = 16384; + match encoding { #[cfg(feature = "compress-gzip")] - ContentEncoding::Deflate => Some(ContentEncoder::Deflate(ZlibEncoder::new( - Writer::new(), - flate2::Compression::fast(), - ))), + ContentEncoding::Deflate => Some(SelectedContentEncoder { + content_encoder: ContentEncoder::Deflate(ZlibEncoder::new( + Writer::new(), + flate2::Compression::fast(), + )), + preferred_chunk_size: MAX_DEFLATE_CHUNK_SIZE, + }), #[cfg(feature = "compress-gzip")] - ContentEncoding::Gzip => Some(ContentEncoder::Gzip(GzEncoder::new( - Writer::new(), - flate2::Compression::fast(), - ))), + ContentEncoding::Gzip => Some(SelectedContentEncoder { + content_encoder: ContentEncoder::Gzip(GzEncoder::new( + Writer::new(), + flate2::Compression::fast(), + )), + preferred_chunk_size: MAX_GZIP_CHUNK_SIZE, + }), #[cfg(feature = "compress-brotli")] - ContentEncoding::Brotli => Some(ContentEncoder::Brotli(new_brotli_compressor())), + ContentEncoding::Brotli => Some(SelectedContentEncoder { + 
content_encoder: ContentEncoder::Brotli(new_brotli_compressor()), + preferred_chunk_size: MAX_BROTLI_CHUNK_SIZE, + }), #[cfg(feature = "compress-zstd")] ContentEncoding::Zstd => { let encoder = ZstdEncoder::new(Writer::new(), 3).ok()?; - Some(ContentEncoder::Zstd(encoder)) + Some(SelectedContentEncoder { + content_encoder: ContentEncoder::Zstd(encoder), + preferred_chunk_size: MAX_ZSTD_CHUNK_SIZE, + }) } _ => None, @@ -426,3 +456,77 @@ impl From for crate::Error { crate::Error::new_encoder().with_cause(err) } } + +#[cfg(test)] +mod tests { + use bytes::BytesMut; + use rand::{seq::SliceRandom, Rng}; + + use super::*; + + static EMPTY_BODY: &[u8] = &[]; + + static SHORT_BODY: &[u8] = &[1, 2, 3, 4, 6, 7, 8]; + + static LONG_BODY: &[u8] = include_bytes!("encoder.rs"); + + static BODIES: &[&[u8]] = &[EMPTY_BODY, SHORT_BODY, LONG_BODY]; + + async fn test_compression_of_conentent_enconding(encoding: ContentEncoding, body: &[u8]) { + let mut head = ResponseHead::new(StatusCode::OK); + let body_to_compress = { + let mut body = BytesMut::from(body); + body.shuffle(&mut rand::thread_rng()); + body.freeze() + }; + let compressed_body = Encoder::response(encoding, &mut head, body_to_compress.clone()) + .with_encode_chunk_size(rand::thread_rng().gen_range(32..128)); + + let SelectedContentEncoder { + content_encoder: mut compressor, + preferred_chunk_size: _, + } = ContentEncoder::select(encoding).unwrap(); + compressor.write(&body_to_compress).unwrap(); + let reference_compressed_bytes = compressor.finish().unwrap(); + + let compressed_bytes = + body::to_bytes_limited(compressed_body, 256 + body_to_compress.len()) + .await + .unwrap() + .unwrap(); + + assert_eq!(reference_compressed_bytes, compressed_bytes); + } + + #[actix_rt::test] + #[cfg(feature = "compress-gzip")] + async fn test_gzip_compression_in_chunks_is_the_same_as_whole_chunk_compression() { + for body in BODIES { + test_compression_of_conentent_enconding(ContentEncoding::Gzip, body).await; + } + } + + 
#[actix_rt::test] + #[cfg(feature = "compress-gzip")] + async fn test_deflate_compression_in_chunks_is_the_same_as_whole_chunk_compression() { + for body in BODIES { + test_compression_of_conentent_enconding(ContentEncoding::Deflate, body).await; + } + } + + #[actix_rt::test] + #[cfg(feature = "compress-brotli")] + async fn test_brotli_compression_in_chunks_is_the_same_as_whole_chunk_compression() { + for body in BODIES { + test_compression_of_conentent_enconding(ContentEncoding::Brotli, body).await; + } + } + + #[actix_rt::test] + #[cfg(feature = "compress-zstd")] + async fn test_zstd_compression_in_chunks_is_the_same_as_whole_chunk_compression() { + for body in BODIES { + test_compression_of_conentent_enconding(ContentEncoding::Zstd, body).await; + } + } +} From ccae712b9b9e81f7e50a4e755cc8cec2a48fcb7e Mon Sep 17 00:00:00 2001 From: Yury Yarashevich Date: Sat, 14 Oct 2023 08:40:10 +0200 Subject: [PATCH 2/7] Box encoder to resolve degradation in `response-body-compression` --- actix-http/src/encoding/encoder.rs | 76 ++++++++++++++---------------- 1 file changed, 36 insertions(+), 40 deletions(-) diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index 75079c8b7..28f5d2fd6 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -28,8 +28,7 @@ pin_project! 
{ pub struct Encoder { #[pin] body: EncoderBody, - encoder: Option, - chunk_ready_to_encode: Option, + encoder: Option>, eof: bool, } } @@ -41,7 +40,6 @@ impl Encoder { body: body::None::new(), }, encoder: None, - chunk_ready_to_encode: None, eof: true, } } @@ -70,7 +68,6 @@ impl Encoder { return Encoder { body, encoder: Some(selected_encoder), - chunk_ready_to_encode: None, eof: false, }; } @@ -79,7 +76,6 @@ impl Encoder { Encoder { body, encoder: None, - chunk_ready_to_encode: None, eof: false, } } @@ -174,31 +170,25 @@ where return Poll::Ready(None); } - if let Some(chunk) = this.chunk_ready_to_encode.as_mut() { - let selected_encoder = this.encoder.as_mut().expect( - "when chunk_ready_to_encode is presented the encoder is expected to be presented as well", - ); - let encode_len = chunk.len().min(selected_encoder.preferred_chunk_size); - selected_encoder - .content_encoder - .write(&chunk[..encode_len]) - .map_err(EncoderError::Io)?; - chunk.advance(encode_len); + if let Some(selected_encoder) = this.encoder.as_deref_mut() { + if let Some(chunk) = selected_encoder.chunk_ready_to_encode.as_mut() { + let encode_len = chunk.len().min(selected_encoder.preferred_chunk_size); + selected_encoder + .content_encoder + .write(&chunk[..encode_len]) + .map_err(EncoderError::Io)?; + chunk.advance(encode_len); - if chunk.is_empty() { - *this.chunk_ready_to_encode = None; - } + if chunk.is_empty() { + selected_encoder.chunk_ready_to_encode = None; + } - let encoded_chunk = selected_encoder.content_encoder.take(); - if !encoded_chunk.is_empty() { + let encoded_chunk = selected_encoder.content_encoder.take(); + if encoded_chunk.is_empty() { + continue; + } return Poll::Ready(Some(Ok(encoded_chunk))); } - - if this.chunk_ready_to_encode.is_some() { - // Yield execution to give chance other futures to execute - cx.waker().wake_by_ref(); - return Poll::Pending; - } } let result = ready!(this.body.as_mut().poll_next(cx)); @@ -206,12 +196,12 @@ where match result { Some(Err(err)) => 
return Poll::Ready(Some(Err(err))), - Some(Ok(chunk)) => { - if this.encoder.is_none() { - return Poll::Ready(Some(Ok(chunk))); + Some(Ok(chunk)) => match this.encoder.as_deref_mut() { + None => return Poll::Ready(Some(Ok(chunk))), + Some(encoder) => { + encoder.chunk_ready_to_encode = Some(chunk); } - *this.chunk_ready_to_encode = Some(chunk); - } + }, None => { if let Some(selected_encoder) = this.encoder.take() { @@ -281,10 +271,11 @@ enum ContentEncoder { struct SelectedContentEncoder { content_encoder: ContentEncoder, preferred_chunk_size: usize, + chunk_ready_to_encode: Option, } impl ContentEncoder { - fn select(encoding: ContentEncoding) -> Option { + fn select(encoding: ContentEncoding) -> Option> { // Chunk size picked as max chunk size which took less that 50 µs to compress on "cargo bench --bench compression-chunk-size" // Rust 1.72 linux/arm64 in Docker on Apple M2 Pro: "time to compress chunk/deflate-16384" time: [39.114 µs 39.283 µs 39.457 µs] @@ -298,36 +289,40 @@ impl ContentEncoder { match encoding { #[cfg(feature = "compress-gzip")] - ContentEncoding::Deflate => Some(SelectedContentEncoder { + ContentEncoding::Deflate => Some(Box::new(SelectedContentEncoder { content_encoder: ContentEncoder::Deflate(ZlibEncoder::new( Writer::new(), flate2::Compression::fast(), )), preferred_chunk_size: MAX_DEFLATE_CHUNK_SIZE, - }), + chunk_ready_to_encode: None, + })), #[cfg(feature = "compress-gzip")] - ContentEncoding::Gzip => Some(SelectedContentEncoder { + ContentEncoding::Gzip => Some(Box::new(SelectedContentEncoder { content_encoder: ContentEncoder::Gzip(GzEncoder::new( Writer::new(), flate2::Compression::fast(), )), preferred_chunk_size: MAX_GZIP_CHUNK_SIZE, - }), + chunk_ready_to_encode: None, + })), #[cfg(feature = "compress-brotli")] - ContentEncoding::Brotli => Some(SelectedContentEncoder { + ContentEncoding::Brotli => Some(Box::new(SelectedContentEncoder { content_encoder: ContentEncoder::Brotli(new_brotli_compressor()), preferred_chunk_size: 
MAX_BROTLI_CHUNK_SIZE, - }), + chunk_ready_to_encode: None, + })), #[cfg(feature = "compress-zstd")] ContentEncoding::Zstd => { let encoder = ZstdEncoder::new(Writer::new(), 3).ok()?; - Some(SelectedContentEncoder { + Some(Box::new(SelectedContentEncoder { content_encoder: ContentEncoder::Zstd(encoder), preferred_chunk_size: MAX_ZSTD_CHUNK_SIZE, - }) + chunk_ready_to_encode: None, + })) } _ => None, @@ -485,6 +480,7 @@ mod tests { let SelectedContentEncoder { content_encoder: mut compressor, preferred_chunk_size: _, + chunk_ready_to_encode: _, } = ContentEncoder::select(encoding).unwrap(); compressor.write(&body_to_compress).unwrap(); let reference_compressed_bytes = compressor.finish().unwrap(); From e3adb45f557db3e96816c2d2d78c9b82c5516964 Mon Sep 17 00:00:00 2001 From: Yury Yarashevich Date: Sat, 14 Oct 2023 17:32:08 +0200 Subject: [PATCH 3/7] Fix tests. --- actix-http/src/encoding/encoder.rs | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index 28f5d2fd6..1cb9ece91 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -467,7 +467,7 @@ mod tests { static BODIES: &[&[u8]] = &[EMPTY_BODY, SHORT_BODY, LONG_BODY]; - async fn test_compression_of_conentent_enconding(encoding: ContentEncoding, body: &[u8]) { + async fn test_compression_of_content_encoding(encoding: ContentEncoding, body: &[u8]) { let mut head = ResponseHead::new(StatusCode::OK); let body_to_compress = { let mut body = BytesMut::from(body); @@ -477,11 +477,8 @@ mod tests { let compressed_body = Encoder::response(encoding, &mut head, body_to_compress.clone()) .with_encode_chunk_size(rand::thread_rng().gen_range(32..128)); - let SelectedContentEncoder { - content_encoder: mut compressor, - preferred_chunk_size: _, - chunk_ready_to_encode: _, - } = ContentEncoder::select(encoding).unwrap(); + let encoder = ContentEncoder::select(encoding).unwrap(); + let mut 
compressor = encoder.content_encoder; compressor.write(&body_to_compress).unwrap(); let reference_compressed_bytes = compressor.finish().unwrap(); @@ -498,7 +495,7 @@ mod tests { #[cfg(feature = "compress-gzip")] async fn test_gzip_compression_in_chunks_is_the_same_as_whole_chunk_compression() { for body in BODIES { - test_compression_of_conentent_enconding(ContentEncoding::Gzip, body).await; + test_compression_of_content_encoding(ContentEncoding::Gzip, body).await; } } @@ -506,7 +503,7 @@ mod tests { #[cfg(feature = "compress-gzip")] async fn test_deflate_compression_in_chunks_is_the_same_as_whole_chunk_compression() { for body in BODIES { - test_compression_of_conentent_enconding(ContentEncoding::Deflate, body).await; + test_compression_of_content_encoding(ContentEncoding::Deflate, body).await; } } @@ -514,7 +511,7 @@ mod tests { #[cfg(feature = "compress-brotli")] async fn test_brotli_compression_in_chunks_is_the_same_as_whole_chunk_compression() { for body in BODIES { - test_compression_of_conentent_enconding(ContentEncoding::Brotli, body).await; + test_compression_of_content_encoding(ContentEncoding::Brotli, body).await; } } @@ -522,7 +519,7 @@ mod tests { #[cfg(feature = "compress-zstd")] async fn test_zstd_compression_in_chunks_is_the_same_as_whole_chunk_compression() { for body in BODIES { - test_compression_of_conentent_enconding(ContentEncoding::Zstd, body).await; + test_compression_of_content_encoding(ContentEncoding::Zstd, body).await; } } } From 3f518e0001dadaa05820d25f591dfac744d879cf Mon Sep 17 00:00:00 2001 From: Yury Yarashevich Date: Wed, 18 Oct 2023 13:00:32 +0200 Subject: [PATCH 4/7] Rename to ChunkedContentEncoder. 
--- actix-http/src/encoding/encoder.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index 1cb9ece91..eefe16c7e 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -28,7 +28,7 @@ pin_project! { pub struct Encoder { #[pin] body: EncoderBody, - encoder: Option>, + encoder: Option>, eof: bool, } } @@ -268,14 +268,14 @@ enum ContentEncoder { Zstd(ZstdEncoder<'static, Writer>), } -struct SelectedContentEncoder { +struct ChunkedContentEncoder { content_encoder: ContentEncoder, preferred_chunk_size: usize, chunk_ready_to_encode: Option, } impl ContentEncoder { - fn select(encoding: ContentEncoding) -> Option> { + fn select(encoding: ContentEncoding) -> Option> { // Chunk size picked as max chunk size which took less that 50 µs to compress on "cargo bench --bench compression-chunk-size" // Rust 1.72 linux/arm64 in Docker on Apple M2 Pro: "time to compress chunk/deflate-16384" time: [39.114 µs 39.283 µs 39.457 µs] @@ -289,7 +289,7 @@ impl ContentEncoder { match encoding { #[cfg(feature = "compress-gzip")] - ContentEncoding::Deflate => Some(Box::new(SelectedContentEncoder { + ContentEncoding::Deflate => Some(Box::new(ChunkedContentEncoder { content_encoder: ContentEncoder::Deflate(ZlibEncoder::new( Writer::new(), flate2::Compression::fast(), @@ -299,7 +299,7 @@ impl ContentEncoder { })), #[cfg(feature = "compress-gzip")] - ContentEncoding::Gzip => Some(Box::new(SelectedContentEncoder { + ContentEncoding::Gzip => Some(Box::new(ChunkedContentEncoder { content_encoder: ContentEncoder::Gzip(GzEncoder::new( Writer::new(), flate2::Compression::fast(), @@ -309,7 +309,7 @@ impl ContentEncoder { })), #[cfg(feature = "compress-brotli")] - ContentEncoding::Brotli => Some(Box::new(SelectedContentEncoder { + ContentEncoding::Brotli => Some(Box::new(ChunkedContentEncoder { content_encoder: ContentEncoder::Brotli(new_brotli_compressor()), 
preferred_chunk_size: MAX_BROTLI_CHUNK_SIZE, chunk_ready_to_encode: None, @@ -318,7 +318,7 @@ impl ContentEncoder { #[cfg(feature = "compress-zstd")] ContentEncoding::Zstd => { let encoder = ZstdEncoder::new(Writer::new(), 3).ok()?; - Some(Box::new(SelectedContentEncoder { + Some(Box::new(ChunkedContentEncoder { content_encoder: ContentEncoder::Zstd(encoder), preferred_chunk_size: MAX_ZSTD_CHUNK_SIZE, chunk_ready_to_encode: None, From 292c95c8a408f0c184e950250032b177cc12c929 Mon Sep 17 00:00:00 2001 From: Yury Yarashevich Date: Wed, 18 Oct 2023 22:18:42 +0200 Subject: [PATCH 5/7] Added cooperative budget. --- actix-http/src/encoding/encoder.rs | 110 +++++++++++++++++++---------- 1 file changed, 73 insertions(+), 37 deletions(-) diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index eefe16c7e..422993bcd 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -11,7 +11,7 @@ use bytes::{Buf, Bytes}; use derive_more::Display; #[cfg(feature = "compress-gzip")] use flate2::write::{GzEncoder, ZlibEncoder}; -use futures_core::ready; +use futures_core::{ready, Stream}; use pin_project_lite::pin_project; use tracing::trace; #[cfg(feature = "compress-zstd")] @@ -28,7 +28,7 @@ pin_project! 
{ pub struct Encoder { #[pin] body: EncoderBody, - encoder: Option>, + encoder: Option>, eof: bool, } } @@ -62,12 +62,12 @@ impl Encoder { if should_encode { // wrap body only if encoder is feature-enabled - if let Some(selected_encoder) = ContentEncoder::select(encoding) { + if let Some(coop_encoder) = CooperativeContentEncoder::select(encoding) { update_head(encoding, head); return Encoder { body, - encoder: Some(selected_encoder), + encoder: Some(coop_encoder), eof: false, }; } @@ -82,8 +82,8 @@ impl Encoder { pub fn with_encode_chunk_size(mut self, size: usize) -> Self { if size > 0 { - if let Some(selected_encoder) = self.encoder.as_mut() { - selected_encoder.preferred_chunk_size = size; + if let Some(coop_encoder) = self.encoder.as_mut() { + coop_encoder.preferred_chunk_size = size; } } self @@ -170,24 +170,16 @@ where return Poll::Ready(None); } - if let Some(selected_encoder) = this.encoder.as_deref_mut() { - if let Some(chunk) = selected_encoder.chunk_ready_to_encode.as_mut() { - let encode_len = chunk.len().min(selected_encoder.preferred_chunk_size); - selected_encoder - .content_encoder - .write(&chunk[..encode_len]) - .map_err(EncoderError::Io)?; - chunk.advance(encode_len); - - if chunk.is_empty() { - selected_encoder.chunk_ready_to_encode = None; + if let Some(cooperative_encoder) = this.encoder.as_deref_mut() { + match ready!(Pin::new(cooperative_encoder).poll_next(cx)) { + Some(Ok(Some(chunk))) => return Poll::Ready(Some(Ok(chunk))), + Some(Ok(None)) => { + // Need more data from uncompressed body } - - let encoded_chunk = selected_encoder.content_encoder.take(); - if encoded_chunk.is_empty() { - continue; + Some(Err(err)) => return Poll::Ready(Some(Err(err))), + None => { + unreachable!() } - return Poll::Ready(Some(Ok(encoded_chunk))); } } @@ -199,13 +191,15 @@ where Some(Ok(chunk)) => match this.encoder.as_deref_mut() { None => return Poll::Ready(Some(Ok(chunk))), Some(encoder) => { + debug_assert!(encoder.chunk_ready_to_encode.is_none()); 
encoder.chunk_ready_to_encode = Some(chunk); + encoder.budget_used = 0; } }, None => { - if let Some(selected_encoder) = this.encoder.take() { - let chunk = selected_encoder + if let Some(coop_encoder) = this.encoder.take() { + let chunk = coop_encoder .content_encoder .finish() .map_err(EncoderError::Io)?; @@ -268,14 +262,15 @@ enum ContentEncoder { Zstd(ZstdEncoder<'static, Writer>), } -struct ChunkedContentEncoder { +struct CooperativeContentEncoder { content_encoder: ContentEncoder, preferred_chunk_size: usize, chunk_ready_to_encode: Option, + budget_used: u8, } -impl ContentEncoder { - fn select(encoding: ContentEncoding) -> Option> { +impl CooperativeContentEncoder { + fn select(encoding: ContentEncoding) -> Option> { // Chunk size picked as max chunk size which took less that 50 µs to compress on "cargo bench --bench compression-chunk-size" // Rust 1.72 linux/arm64 in Docker on Apple M2 Pro: "time to compress chunk/deflate-16384" time: [39.114 µs 39.283 µs 39.457 µs] @@ -289,49 +284,55 @@ impl ContentEncoder { match encoding { #[cfg(feature = "compress-gzip")] - ContentEncoding::Deflate => Some(Box::new(ChunkedContentEncoder { + ContentEncoding::Deflate => Some(Box::new(CooperativeContentEncoder { content_encoder: ContentEncoder::Deflate(ZlibEncoder::new( Writer::new(), flate2::Compression::fast(), )), preferred_chunk_size: MAX_DEFLATE_CHUNK_SIZE, chunk_ready_to_encode: None, + budget_used: 0, })), #[cfg(feature = "compress-gzip")] - ContentEncoding::Gzip => Some(Box::new(ChunkedContentEncoder { + ContentEncoding::Gzip => Some(Box::new(CooperativeContentEncoder { content_encoder: ContentEncoder::Gzip(GzEncoder::new( Writer::new(), flate2::Compression::fast(), )), preferred_chunk_size: MAX_GZIP_CHUNK_SIZE, chunk_ready_to_encode: None, + budget_used: 0, })), #[cfg(feature = "compress-brotli")] - ContentEncoding::Brotli => Some(Box::new(ChunkedContentEncoder { + ContentEncoding::Brotli => Some(Box::new(CooperativeContentEncoder { content_encoder: 
ContentEncoder::Brotli(new_brotli_compressor()), preferred_chunk_size: MAX_BROTLI_CHUNK_SIZE, chunk_ready_to_encode: None, + budget_used: 0, })), #[cfg(feature = "compress-zstd")] ContentEncoding::Zstd => { let encoder = ZstdEncoder::new(Writer::new(), 3).ok()?; - Some(Box::new(ChunkedContentEncoder { + Some(Box::new(CooperativeContentEncoder { content_encoder: ContentEncoder::Zstd(encoder), preferred_chunk_size: MAX_ZSTD_CHUNK_SIZE, chunk_ready_to_encode: None, + budget_used: 0, })) } _ => None, } } +} +impl ContentEncoder { #[inline] pub(crate) fn take(&mut self) -> Bytes { - match *self { + match self { #[cfg(feature = "compress-brotli")] ContentEncoder::Brotli(ref mut encoder) => encoder.get_mut().take(), @@ -375,7 +376,7 @@ impl ContentEncoder { } fn write(&mut self, data: &[u8]) -> Result<(), io::Error> { - match *self { + match self { #[cfg(feature = "compress-brotli")] ContentEncoder::Brotli(ref mut encoder) => match encoder.write_all(data) { Ok(_) => Ok(()), @@ -415,6 +416,42 @@ impl ContentEncoder { } } +impl futures_core::Stream for CooperativeContentEncoder { + type Item = Result, EncoderError>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + loop { + if this.budget_used > 8 { + this.budget_used = 0; + cx.waker().wake_by_ref(); + return Poll::Pending; + } + if let Some(mut chunk) = this.chunk_ready_to_encode.take() { + let encode_len = chunk.len().min(this.preferred_chunk_size); + this.content_encoder + .write(&chunk[..encode_len]) + .map_err(EncoderError::Io)?; + chunk.advance(encode_len); + + if !chunk.is_empty() { + this.chunk_ready_to_encode = Some(chunk); + } + + let encoded_chunk = this.content_encoder.take(); + if encoded_chunk.is_empty() { + continue; + } + + this.budget_used += 1; + return Poll::Ready(Some(Ok(Some(encoded_chunk)))); + } else { + return Poll::Ready(Some(Ok(None))); + } + } + } +} + #[cfg(feature = "compress-brotli")] fn new_brotli_compressor() -> Box> { 
Box::new(brotli::CompressorWriter::new( @@ -477,10 +514,9 @@ mod tests { let compressed_body = Encoder::response(encoding, &mut head, body_to_compress.clone()) .with_encode_chunk_size(rand::thread_rng().gen_range(32..128)); - let encoder = ContentEncoder::select(encoding).unwrap(); - let mut compressor = encoder.content_encoder; - compressor.write(&body_to_compress).unwrap(); - let reference_compressed_bytes = compressor.finish().unwrap(); + let mut encoder = CooperativeContentEncoder::select(encoding).unwrap(); + encoder.content_encoder.write(&body_to_compress).unwrap(); + let reference_compressed_bytes = encoder.content_encoder.finish().unwrap(); let compressed_bytes = body::to_bytes_limited(compressed_body, 256 + body_to_compress.len()) From 7cc4210eb3f8323aacefac8064002090e5f62fa3 Mon Sep 17 00:00:00 2001 From: Yury Yarashevich Date: Wed, 18 Oct 2023 22:35:44 +0200 Subject: [PATCH 6/7] Unuse stream trait. --- actix-http/src/encoding/encoder.rs | 96 +++++++++++++++--------------- 1 file changed, 48 insertions(+), 48 deletions(-) diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index 422993bcd..7385f71c5 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -11,7 +11,7 @@ use bytes::{Buf, Bytes}; use derive_more::Display; #[cfg(feature = "compress-gzip")] use flate2::write::{GzEncoder, ZlibEncoder}; -use futures_core::{ready, Stream}; +use futures_core::ready; use pin_project_lite::pin_project; use tracing::trace; #[cfg(feature = "compress-zstd")] @@ -171,15 +171,12 @@ where } if let Some(cooperative_encoder) = this.encoder.as_deref_mut() { - match ready!(Pin::new(cooperative_encoder).poll_next(cx)) { - Some(Ok(Some(chunk))) => return Poll::Ready(Some(Ok(chunk))), - Some(Ok(None)) => { - // Need more data from uncompressed body - } - Some(Err(err)) => return Poll::Ready(Some(Err(err))), - None => { - unreachable!() + match ready!(Pin::new(cooperative_encoder).poll_encoded_chunk(cx)) { + 
Ok(Some(encoded_chunk)) => return Poll::Ready(Some(Ok(encoded_chunk))), + Ok(None) => { + // Need next chunk from uncompressed body } + Err(err) => return Poll::Ready(Some(Err(err))), } } @@ -191,9 +188,7 @@ where Some(Ok(chunk)) => match this.encoder.as_deref_mut() { None => return Poll::Ready(Some(Ok(chunk))), Some(encoder) => { - debug_assert!(encoder.chunk_ready_to_encode.is_none()); - encoder.chunk_ready_to_encode = Some(chunk); - encoder.budget_used = 0; + encoder.push_chunk(chunk); } }, @@ -327,6 +322,47 @@ impl CooperativeContentEncoder { _ => None, } } + + fn push_chunk(&mut self, chunk: Bytes) { + debug_assert!(self.chunk_ready_to_encode.is_none()); + self.chunk_ready_to_encode = Some(chunk); + self.budget_used = 0 + } + + fn poll_encoded_chunk( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, EncoderError>> { + let this = self.get_mut(); + loop { + if this.budget_used > 8 { + this.budget_used = 0; + cx.waker().wake_by_ref(); + return Poll::Pending; + } + if let Some(mut chunk) = this.chunk_ready_to_encode.take() { + let encode_len = chunk.len().min(this.preferred_chunk_size); + this.content_encoder + .write(&chunk[..encode_len]) + .map_err(EncoderError::Io)?; + chunk.advance(encode_len); + + if !chunk.is_empty() { + this.chunk_ready_to_encode = Some(chunk); + } + + let encoded_chunk = this.content_encoder.take(); + if encoded_chunk.is_empty() { + continue; + } + + this.budget_used += 1; + return Poll::Ready(Ok(Some(encoded_chunk))); + } else { + return Poll::Ready(Ok(None)); + } + } + } } impl ContentEncoder { @@ -416,42 +452,6 @@ impl ContentEncoder { } } -impl futures_core::Stream for CooperativeContentEncoder { - type Item = Result, EncoderError>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.get_mut(); - loop { - if this.budget_used > 8 { - this.budget_used = 0; - cx.waker().wake_by_ref(); - return Poll::Pending; - } - if let Some(mut chunk) = this.chunk_ready_to_encode.take() { - let encode_len 
= chunk.len().min(this.preferred_chunk_size); - this.content_encoder - .write(&chunk[..encode_len]) - .map_err(EncoderError::Io)?; - chunk.advance(encode_len); - - if !chunk.is_empty() { - this.chunk_ready_to_encode = Some(chunk); - } - - let encoded_chunk = this.content_encoder.take(); - if encoded_chunk.is_empty() { - continue; - } - - this.budget_used += 1; - return Poll::Ready(Some(Ok(Some(encoded_chunk)))); - } else { - return Poll::Ready(Some(Ok(None))); - } - } - } -} - #[cfg(feature = "compress-brotli")] fn new_brotli_compressor() -> Box> { Box::new(brotli::CompressorWriter::new( From 7ecbefde38b6b39e54ed6b046cfd255479bb044d Mon Sep 17 00:00:00 2001 From: Yury Yarashevich Date: Thu, 19 Oct 2023 11:24:14 +0200 Subject: [PATCH 7/7] Added comment. --- actix-http/src/encoding/encoder.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index 7385f71c5..ca7597658 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -333,9 +333,14 @@ impl CooperativeContentEncoder { self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, EncoderError>> { + // Maximum compute budget that may be used before voluntarily yielding to other tasks. + // Inspired by: + // https://tokio.rs/blog/2020-04-preemption + // https://ryhl.io/blog/async-what-is-blocking/ + const BUDGET_LIMIT: u8 = 8; let this = self.get_mut(); loop { - if this.budget_used > 8 { + if this.budget_used > BUDGET_LIMIT { this.budget_used = 0; cx.waker().wake_by_ref(); return Poll::Pending;