diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index f40338689..810379566 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -208,6 +208,7 @@ where if let Some(mut encoder) = this.encoder.take() { if chunk.len() < MAX_CHUNK_SIZE_ENCODE_IN_PLACE { encoder.write(&chunk).map_err(EncoderError::Io)?; + encoder.flush().map_err(EncoderError::Io)?; let chunk = encoder.take(); *this.encoder = Some(encoder); @@ -217,6 +218,7 @@ where } else { *this.fut = Some(spawn_blocking(move || { encoder.write(&chunk)?; + encoder.flush()?; Ok(encoder) })); } @@ -360,6 +362,26 @@ impl ContentEncoder { } } + /// Flush internal codec buffers so compressed bytes are available via [`take`](Self::take). + /// + /// Calling this after every [`write`](Self::write) ensures streaming responses emit data + /// promptly instead of waiting until the stream ends. + fn flush(&mut self) -> Result<(), io::Error> { + match *self { + #[cfg(feature = "compress-brotli")] + ContentEncoder::Brotli(ref mut encoder) => encoder.flush(), + + #[cfg(feature = "compress-gzip")] + ContentEncoder::Deflate(ref mut encoder) => encoder.flush(), + + #[cfg(feature = "compress-gzip")] + ContentEncoder::Gzip(ref mut encoder) => encoder.flush(), + + #[cfg(feature = "compress-zstd")] + ContentEncoder::Zstd(ref mut encoder) => encoder.flush(), + } + } + fn write(&mut self, data: &[u8]) -> Result<(), io::Error> { match *self { #[cfg(feature = "compress-brotli")] diff --git a/actix-web/tests/test_server.rs b/actix-web/tests/test_server.rs index 343b7f104..a710930f3 100644 --- a/actix-web/tests/test_server.rs +++ b/actix-web/tests/test_server.rs @@ -405,6 +405,52 @@ async fn test_body_zstd_streaming() { srv.stop().await; } +/// Regression test for https://github.com/actix/actix-web/issues/3410 +/// +/// Compress middleware must flush each chunk immediately so streaming responses +/// deliver data progressively rather than buffering everything until the stream ends. +#[actix_rt::test] +#[cfg(feature = "compress-gzip")] +async fn test_compress_streaming_flushes_chunks() { + use futures_util::StreamExt as _; + + let srv = actix_test::start_with(actix_test::config().h1(), || { + App::new().wrap(Compress::default()).service( + web::resource("/").route(web::get().to(|| async { + // Two-chunk stream: first chunk arrives immediately, second after 500ms. + // Without the flush fix both chunks arrive together after 500ms. + let s = futures_util::stream::once(async { + Ok::<_, std::io::Error>(Bytes::from("hello")) + }) + .chain(futures_util::stream::once(async { + tokio::time::sleep(Duration::from_millis(500)).await; + Ok::<_, std::io::Error>(Bytes::from(" world")) + })); + HttpResponse::Ok().streaming(s) + })), + ) + }); + + let mut res = srv + .get("/") + .no_decompress() + .append_header((header::ACCEPT_ENCODING, "gzip")) + .send() + .await + .unwrap(); + + assert_eq!(res.status(), StatusCode::OK); + + // The first compressed chunk must arrive well before the 500ms delay. + let chunk = tokio::time::timeout(Duration::from_millis(200), res.next()) + .await + .expect("first chunk must arrive before the 500ms stream delay (compress flush bug)") + .expect("stream should not be empty"); + assert!(chunk.is_ok(), "chunk should not be an error"); + + srv.stop().await; +} + #[actix_rt::test] async fn test_zstd_encoding() { let srv = actix_test::start_with(actix_test::config().h1(), || {