fix(encoding): flush codec after each chunk for streaming responses Without an explicit flush, gzip/deflate/zstd buffers all chunks and releases them only at stream EOF, causing middleware::Compress to delay all output until the response stream ends. Fixes #3410

This commit is contained in:
Rachit2323 2026-05-03 16:47:23 +05:30
parent 101c608124
commit a09b738d04
2 changed files with 68 additions and 0 deletions

View File

@ -208,6 +208,7 @@ where
if let Some(mut encoder) = this.encoder.take() { if let Some(mut encoder) = this.encoder.take() {
if chunk.len() < MAX_CHUNK_SIZE_ENCODE_IN_PLACE { if chunk.len() < MAX_CHUNK_SIZE_ENCODE_IN_PLACE {
encoder.write(&chunk).map_err(EncoderError::Io)?; encoder.write(&chunk).map_err(EncoderError::Io)?;
encoder.flush().map_err(EncoderError::Io)?;
let chunk = encoder.take(); let chunk = encoder.take();
*this.encoder = Some(encoder); *this.encoder = Some(encoder);
@ -217,6 +218,7 @@ where
} else { } else {
*this.fut = Some(spawn_blocking(move || { *this.fut = Some(spawn_blocking(move || {
encoder.write(&chunk)?; encoder.write(&chunk)?;
encoder.flush()?;
Ok(encoder) Ok(encoder)
})); }));
} }
@ -360,6 +362,26 @@ impl ContentEncoder {
} }
} }
/// Flush internal codec buffers so compressed bytes are available via [`take`](Self::take).
///
/// Calling this after every [`write`](Self::write) ensures streaming responses emit data
/// promptly instead of waiting until the stream ends.
fn flush(&mut self) -> Result<(), io::Error> {
match *self {
#[cfg(feature = "compress-brotli")]
ContentEncoder::Brotli(ref mut encoder) => encoder.flush(),
#[cfg(feature = "compress-gzip")]
ContentEncoder::Deflate(ref mut encoder) => encoder.flush(),
#[cfg(feature = "compress-gzip")]
ContentEncoder::Gzip(ref mut encoder) => encoder.flush(),
#[cfg(feature = "compress-zstd")]
ContentEncoder::Zstd(ref mut encoder) => encoder.flush(),
}
}
fn write(&mut self, data: &[u8]) -> Result<(), io::Error> { fn write(&mut self, data: &[u8]) -> Result<(), io::Error> {
match *self { match *self {
#[cfg(feature = "compress-brotli")] #[cfg(feature = "compress-brotli")]

View File

@ -405,6 +405,52 @@ async fn test_body_zstd_streaming() {
srv.stop().await; srv.stop().await;
} }
/// Regression test for https://github.com/actix/actix-web/issues/3410
///
/// Compress middleware must flush each chunk immediately so streaming responses
/// deliver data progressively rather than buffering everything until the stream ends.
#[actix_rt::test]
#[cfg(feature = "compress-gzip")]
async fn test_compress_streaming_flushes_chunks() {
use futures_util::StreamExt as _;
let srv = actix_test::start_with(actix_test::config().h1(), || {
App::new().wrap(Compress::default()).service(
web::resource("/").route(web::get().to(|| async {
// Two-chunk stream: first chunk arrives immediately, second after 500ms.
// Without the flush fix both chunks arrive together after 500ms.
let s = futures_util::stream::once(async {
Ok::<_, std::io::Error>(Bytes::from("hello"))
})
.chain(futures_util::stream::once(async {
tokio::time::sleep(Duration::from_millis(500)).await;
Ok::<_, std::io::Error>(Bytes::from(" world"))
}));
HttpResponse::Ok().streaming(s)
})),
)
});
let mut res = srv
.get("/")
.no_decompress()
.append_header((header::ACCEPT_ENCODING, "gzip"))
.send()
.await
.unwrap();
assert_eq!(res.status(), StatusCode::OK);
// The first compressed chunk must arrive well before the 500ms delay.
let chunk = tokio::time::timeout(Duration::from_millis(200), res.next())
.await
.expect("first chunk must arrive before the 500ms stream delay (compress flush bug)")
.expect("stream should not be empty");
assert!(chunk.is_ok(), "chunk should not be an error");
srv.stop().await;
}
#[actix_rt::test] #[actix_rt::test]
async fn test_zstd_encoding() { async fn test_zstd_encoding() {
let srv = actix_test::start_with(actix_test::config().h1(), || { let srv = actix_test::start_with(actix_test::config().h1(), || {