From cffe5d271e8b619078952b68860ae64e5d11732d Mon Sep 17 00:00:00 2001 From: Yuki Okushi Date: Sat, 18 Apr 2026 13:22:45 +0900 Subject: [PATCH] http: add h1 write buffer size setting (#3986) Co-authored-by: Nicolas Grondin --- actix-http/CHANGES.md | 1 + actix-http/src/builder.rs | 30 ++++- actix-http/src/config.rs | 30 +++++ actix-http/src/h1/dispatcher.rs | 6 +- actix-http/src/h1/dispatcher_tests.rs | 172 ++++++++++++++++++++++++++ actix-web/CHANGES.md | 1 + actix-web/src/server.rs | 68 +++++++++- 7 files changed, 300 insertions(+), 8 deletions(-) diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index e68acb254..68a6188c4 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -15,6 +15,7 @@ - Fix `HeaderMap` iterators' `len()` and `size_hint()` implementations for multi-value headers. - Update `rand` dependency to `0.10`. - Update `sha1` dependency to `0.11`. +- Add `ServiceConfigBuilder::h1_write_buffer_size()` and `HttpServiceBuilder::h1_write_buffer_size()`. [#3953]: https://github.com/actix/actix-web/pull/3953 diff --git a/actix-http/src/builder.rs b/actix-http/src/builder.rs index fff7ceefe..f14db068d 100644 --- a/actix-http/src/builder.rs +++ b/actix-http/src/builder.rs @@ -5,7 +5,9 @@ use actix_service::{IntoServiceFactory, Service, ServiceFactory}; use crate::{ body::{BoxBody, MessageBody}, - config::{DEFAULT_H2_CONN_WINDOW_SIZE, DEFAULT_H2_STREAM_WINDOW_SIZE}, + config::{ + DEFAULT_H1_WRITE_BUFFER_SIZE, DEFAULT_H2_CONN_WINDOW_SIZE, DEFAULT_H2_STREAM_WINDOW_SIZE, + }, h1::{self, ExpectHandler, H1Service, UpgradeHandler}, service::HttpService, ConnectCallback, Extensions, KeepAlive, Request, Response, ServiceConfigBuilder, @@ -22,6 +24,7 @@ pub struct HttpServiceBuilder { secure: bool, local_addr: Option, h1_allow_half_closed: bool, + h1_write_buffer_size: usize, h2_conn_window_size: u32, h2_stream_window_size: u32, expect: X, @@ -47,6 +50,7 @@ where secure: false, local_addr: None, h1_allow_half_closed: true, + h1_write_buffer_size: DEFAULT_H1_WRITE_BUFFER_SIZE, h2_conn_window_size: DEFAULT_H2_CONN_WINDOW_SIZE, h2_stream_window_size: DEFAULT_H2_STREAM_WINDOW_SIZE, @@ -151,6 +155,25 @@ where self } + /// Sets the maximum response write buffer size for HTTP/1 connections. + /// + /// Once the response buffer reaches this size, the dispatcher flushes it to the I/O stream. + /// + /// The default value is 32 KiB. + /// + /// # Panics + /// + /// Panics if `size` is 0. + pub fn h1_write_buffer_size(mut self, size: usize) -> Self { + assert!( + size > 0, + "HTTP/1 write buffer size must be greater than zero" + ); + + self.h1_write_buffer_size = size; + self + } + /// Sets initial stream-level flow control window size for HTTP/2 connections. /// /// See [`ServiceConfigBuilder::h2_initial_window_size`] for more details. @@ -187,6 +210,7 @@ where secure: self.secure, local_addr: self.local_addr, h1_allow_half_closed: self.h1_allow_half_closed, + h1_write_buffer_size: self.h1_write_buffer_size, h2_conn_window_size: self.h2_conn_window_size, h2_stream_window_size: self.h2_stream_window_size, expect: expect.into_factory(), @@ -215,6 +239,7 @@ where secure: self.secure, local_addr: self.local_addr, h1_allow_half_closed: self.h1_allow_half_closed, + h1_write_buffer_size: self.h1_write_buffer_size, h2_conn_window_size: self.h2_conn_window_size, h2_stream_window_size: self.h2_stream_window_size, expect: self.expect, @@ -254,6 +279,7 @@ where .secure(self.secure) .local_addr(self.local_addr) .h1_allow_half_closed(self.h1_allow_half_closed) + .h1_write_buffer_size(self.h1_write_buffer_size) .h2_initial_window_size(self.h2_stream_window_size) .h2_initial_connection_window_size(self.h2_conn_window_size) .build(); @@ -283,6 +309,7 @@ where .secure(self.secure) .local_addr(self.local_addr) .h1_allow_half_closed(self.h1_allow_half_closed) + .h1_write_buffer_size(self.h1_write_buffer_size) .h2_initial_window_size(self.h2_stream_window_size) .h2_initial_connection_window_size(self.h2_conn_window_size) .build(); @@ -309,6 +336,7 @@ where .secure(self.secure) .local_addr(self.local_addr) .h1_allow_half_closed(self.h1_allow_half_closed) + .h1_write_buffer_size(self.h1_write_buffer_size) .h2_initial_window_size(self.h2_stream_window_size) .h2_initial_connection_window_size(self.h2_conn_window_size) .build(); diff --git a/actix-http/src/config.rs b/actix-http/src/config.rs index c0fbc7521..9c86b9d63 100644 --- a/actix-http/src/config.rs +++ b/actix-http/src/config.rs @@ -18,6 +18,9 @@ pub(crate) const DEFAULT_H2_CONN_WINDOW_SIZE: u32 = 1024 * 1024 * 2; // 2MiB /// Matches awc's defaults to avoid poor throughput on high-BDP links. pub(crate) const DEFAULT_H2_STREAM_WINDOW_SIZE: u32 = 1024 * 1024; // 1MiB +/// Default HTTP/1 response write buffer size. +pub(crate) const DEFAULT_H1_WRITE_BUFFER_SIZE: usize = 32_768; + /// A builder for creating a [`ServiceConfig`] #[derive(Default, Debug)] pub struct ServiceConfigBuilder { @@ -86,6 +89,25 @@ impl ServiceConfigBuilder { self } + /// Sets the maximum response write buffer size for HTTP/1 connections. + /// + /// Once the response buffer reaches this size, the dispatcher flushes it to the I/O stream. + /// + /// The default value is 32 KiB. + /// + /// # Panics + /// + /// Panics if `size` is 0. + pub fn h1_write_buffer_size(mut self, size: usize) -> Self { + assert!( + size > 0, + "HTTP/1 write buffer size must be greater than zero" + ); + + self.inner.h1_write_buffer_size = size; + self + } + /// Sets initial stream-level flow control window size for HTTP/2 connections. /// /// Higher values can improve upload performance on high-latency links at the cost of higher @@ -128,6 +150,7 @@ struct Inner { tcp_nodelay: Option, date_service: DateService, h1_allow_half_closed: bool, + h1_write_buffer_size: usize, h2_conn_window_size: u32, h2_stream_window_size: u32, } @@ -143,6 +166,7 @@ impl Default for Inner { tcp_nodelay: None, date_service: DateService::new(), h1_allow_half_closed: true, + h1_write_buffer_size: DEFAULT_H1_WRITE_BUFFER_SIZE, h2_conn_window_size: DEFAULT_H2_CONN_WINDOW_SIZE, h2_stream_window_size: DEFAULT_H2_STREAM_WINDOW_SIZE, } @@ -167,6 +191,7 @@ impl ServiceConfig { tcp_nodelay: None, date_service: DateService::new(), h1_allow_half_closed: true, + h1_write_buffer_size: DEFAULT_H1_WRITE_BUFFER_SIZE, h2_conn_window_size: DEFAULT_H2_CONN_WINDOW_SIZE, h2_stream_window_size: DEFAULT_H2_STREAM_WINDOW_SIZE, })) @@ -228,6 +253,11 @@ impl ServiceConfig { self.0.h1_allow_half_closed } + /// HTTP/1 response write buffer size (in bytes). + pub fn h1_write_buffer_size(&self) -> usize { + self.0.h1_write_buffer_size + } + /// Returns configured `TCP_NODELAY` setting for accepted TCP connections. pub fn tcp_nodelay(&self) -> Option { self.0.tcp_nodelay diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 267b9dfb1..6ef48b038 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -171,6 +171,7 @@ pin_project! { pub(super) io: Option, read_buf: BytesMut, write_buf: BytesMut, + h1_write_buffer_size: usize, codec: Codec, } } @@ -284,6 +285,7 @@ where io: Some(io), read_buf: BytesMut::with_capacity(HW_BUFFER_SIZE), write_buf: BytesMut::with_capacity(HW_BUFFER_SIZE), + h1_write_buffer_size: config.h1_write_buffer_size(), codec: Codec::new(config), }, }, @@ -618,7 +620,7 @@ where StateProj::SendPayload { mut body } => { // keep populate writer buffer until buffer size limit hit, // get blocked or finished. - while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE { + while this.write_buf.len() < *this.h1_write_buffer_size { match body.as_mut().poll_next(cx) { Poll::Ready(Some(Ok(item))) => { this.codec @@ -680,7 +682,7 @@ where // keep populate writer buffer until buffer size limit hit, // get blocked or finished. - while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE { + while this.write_buf.len() < *this.h1_write_buffer_size { match body.as_mut().poll_next(cx) { Poll::Ready(Some(Ok(item))) => { this.codec diff --git a/actix-http/src/h1/dispatcher_tests.rs b/actix-http/src/h1/dispatcher_tests.rs index fb9c1823b..a9262a483 100644 --- a/actix-http/src/h1/dispatcher_tests.rs +++ b/actix-http/src/h1/dispatcher_tests.rs @@ -1,6 +1,9 @@ use std::{ + cell::Cell, future::Future, + io, pin::Pin, + rc::Rc, str, task::{Context, Poll}, time::Duration, @@ -46,6 +49,111 @@ impl Service for YieldService { } } +struct ReadyChunkBody { + chunk_polls: Rc>, + remaining: usize, + chunk_len: usize, +} + +impl ReadyChunkBody { + fn new(chunk_polls: Rc>, remaining: usize, chunk_len: usize) -> Self { + Self { + chunk_polls, + remaining, + chunk_len, + } + } +} + +impl MessageBody for ReadyChunkBody { + type Error = Error; + + fn size(&self) -> crate::body::BodySize { + crate::body::BodySize::Stream + } + + fn poll_next( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll>> { + if self.remaining == 0 { + return Poll::Ready(None); + } + + self.remaining -= 1; + self.chunk_polls.set(self.chunk_polls.get() + 1); + + Poll::Ready(Some(Ok(Bytes::from(vec![b'x'; self.chunk_len])))) + } +} + +struct PendingOnceWriteBuf { + io: TestBuffer, + block_next_write: bool, +} + +impl PendingOnceWriteBuf { + fn new(data: T) -> Self + where + T: Into, + { + Self { + io: TestBuffer::new(data), + block_next_write: true, + } + } +} + +impl io::Read for PendingOnceWriteBuf { + fn read(&mut self, dst: &mut [u8]) -> Result { + self.io.read(dst) + } +} + +impl io::Write for PendingOnceWriteBuf { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.io.write(buf) + } + + fn flush(&mut self) -> io::Result<()> { + self.io.flush() + } +} + +impl actix_codec::AsyncRead for PendingOnceWriteBuf { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut actix_codec::ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut self.io).poll_read(cx, buf) + } +} + +impl actix_codec::AsyncWrite for PendingOnceWriteBuf { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + if self.block_next_write { + self.block_next_write = false; + cx.waker().wake_by_ref(); + return Poll::Pending; + } + + Pin::new(&mut self.io).poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.io).poll_flush(cx) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.io).poll_shutdown(cx) + } +} + fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option { memchr::memmem::find(&haystack[from..], needle) } @@ -108,6 +216,18 @@ fn echo_payload_service() -> impl Service, E }) } +fn ready_chunk_body_service( + chunk_polls: Rc>, + chunk_count: usize, + chunk_len: usize, +) -> impl Service, Error = Error> { + fn_service(move |_req: Request| { + ready(Ok::<_, Error>(Response::ok().set_body( + ReadyChunkBody::new(chunk_polls.clone(), chunk_count, chunk_len), + ))) + }) +} + #[actix_rt::test] async fn late_request() { let mut buf = TestBuffer::empty(); @@ -1364,6 +1484,58 @@ async fn disallow_half_closed() { assert!(matches!(inner.state, State::ServiceCall { .. })) } +#[actix_rt::test] +async fn h1_write_buffer_size_limits_buffering() { + let request = "GET /stream HTTP/1.1\r\nConnection: close\r\n\r\n"; + + let default_polls = Rc::new(Cell::new(0)); + let default_services = HttpFlow::new( + ready_chunk_body_service(default_polls.clone(), 8, 1024), + ExpectHandler, + None::, + ); + let default_io = PendingOnceWriteBuf::new(request); + let default_dispatcher = Dispatcher::new( + default_io, + default_services, + ServiceConfig::default(), + None, + OnConnectData::default(), + ); + pin!(default_dispatcher); + + let mut cx = Context::from_waker(futures_util::task::noop_waker_ref()); + assert!(default_dispatcher.as_mut().poll(&mut cx).is_pending()); + assert_eq!(default_polls.get(), 8); + + let custom_polls = Rc::new(Cell::new(0)); + let custom_services = HttpFlow::new( + ready_chunk_body_service(custom_polls.clone(), 8, 1024), + ExpectHandler, + None::, + ); + let custom_io = PendingOnceWriteBuf::new(request); + let custom_dispatcher = Dispatcher::new( + custom_io, + custom_services, + crate::config::ServiceConfigBuilder::new() + .h1_write_buffer_size(1024) + .build(), + None, + OnConnectData::default(), + ); + pin!(custom_dispatcher); + + assert!(custom_dispatcher.as_mut().poll(&mut cx).is_pending()); + assert_eq!(custom_polls.get(), 1); +} + +#[actix_rt::test] +#[should_panic(expected = "HTTP/1 write buffer size must be greater than zero")] +async fn h1_write_buffer_size_rejects_zero() { + let _ = crate::config::ServiceConfigBuilder::new().h1_write_buffer_size(0); +} + fn http_msg(msg: impl AsRef) -> BytesMut { let mut msg = msg .as_ref() diff --git a/actix-web/CHANGES.md b/actix-web/CHANGES.md index 1c48a27ce..884017a47 100644 --- a/actix-web/CHANGES.md +++ b/actix-web/CHANGES.md @@ -8,6 +8,7 @@ - Fix `HttpRequest::{match_pattern,match_name}` reporting path-only matches when route guards disambiguate overlapping resources. [#3346] - Fix `Readlines` handling of lines split across payload chunks so combined line limits are enforced and complete lines are yielded. - Update `rand` dependency to `0.10`. +- Add `HttpServer::h1_write_buffer_size()`. [#3944]: https://github.com/actix/actix-web/pull/3944 [#3346]: https://github.com/actix/actix-web/issues/3346 diff --git a/actix-web/src/server.rs b/actix-web/src/server.rs index 1965d458e..3f5899b50 100644 --- a/actix-web/src/server.rs +++ b/actix-web/src/server.rs @@ -33,6 +33,7 @@ struct Config { client_request_timeout: Duration, client_disconnect_timeout: Duration, h1_allow_half_closed: bool, + h1_write_buffer_size: Option, h2_initial_window_size: Option, h2_initial_connection_window_size: Option, #[allow(dead_code)] // only dead when no TLS features are enabled @@ -122,6 +123,7 @@ where client_request_timeout: Duration::from_secs(5), client_disconnect_timeout: Duration::from_secs(1), h1_allow_half_closed: true, + h1_write_buffer_size: None, h2_initial_window_size: None, h2_initial_connection_window_size: None, tls_handshake_timeout: None, @@ -286,6 +288,25 @@ where self } + /// Sets the maximum response write buffer size for HTTP/1 connections. + /// + /// Once the response buffer reaches this size, the dispatcher flushes it to the I/O stream. + /// + /// The default value is 32 KiB. + /// + /// # Panics + /// + /// Panics if `size` is 0. + pub fn h1_write_buffer_size(self, size: usize) -> Self { + assert!( + size > 0, + "HTTP/1 write buffer size must be greater than zero" + ); + + self.config.lock().unwrap().h1_write_buffer_size = Some(size); + self + } + /// Sets initial stream-level flow control window size for HTTP/2 connections. /// /// Higher values can improve upload performance on high-latency links at the cost of higher @@ -629,6 +650,10 @@ where svc = svc.tcp_nodelay(enabled); } + if let Some(size) = cfg.h1_write_buffer_size { + svc = svc.h1_write_buffer_size(size); + } + if let Some(val) = cfg.h2_initial_window_size { svc = svc.h2_initial_window_size(val); } @@ -686,6 +711,10 @@ where svc = svc.tcp_nodelay(enabled); } + if let Some(size) = cfg.h1_write_buffer_size { + svc = svc.h1_write_buffer_size(size); + } + if let Some(val) = cfg.h2_initial_window_size { svc = svc.h2_initial_window_size(val); } @@ -774,6 +803,10 @@ where svc = svc.tcp_nodelay(enabled); } + if let Some(size) = c.h1_write_buffer_size { + svc = svc.h1_write_buffer_size(size); + } + if let Some(val) = c.h2_initial_window_size { svc = svc.h2_initial_window_size(val); } @@ -837,6 +870,10 @@ where svc = svc.tcp_nodelay(enabled); } + if let Some(size) = c.h1_write_buffer_size { + svc = svc.h1_write_buffer_size(size); + } + if let Some(val) = c.h2_initial_window_size { svc = svc.h2_initial_window_size(val); } @@ -915,6 +952,10 @@ where svc = svc.tcp_nodelay(enabled); } + if let Some(size) = c.h1_write_buffer_size { + svc = svc.h1_write_buffer_size(size); + } + if let Some(val) = c.h2_initial_window_size { svc = svc.h2_initial_window_size(val); } @@ -993,6 +1034,10 @@ where svc = svc.tcp_nodelay(enabled); } + if let Some(size) = c.h1_write_buffer_size { + svc = svc.h1_write_buffer_size(size); + } + if let Some(val) = c.h2_initial_window_size { svc = svc.h2_initial_window_size(val); } @@ -1072,6 +1117,10 @@ where svc = svc.tcp_nodelay(enabled); } + if let Some(size) = c.h1_write_buffer_size { + svc = svc.h1_write_buffer_size(size); + } + if let Some(val) = c.h2_initial_window_size { svc = svc.h2_initial_window_size(val); } @@ -1140,14 +1189,19 @@ where .into_factory() .map_err(|err| err.into().error_response()); - fn_service(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) }).and_then( - HttpService::build() + fn_service(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) }).and_then({ + let mut svc = HttpService::build() .keep_alive(c.keep_alive) .client_request_timeout(c.client_request_timeout) .client_disconnect_timeout(c.client_disconnect_timeout) - .h1_allow_half_closed(c.h1_allow_half_closed) - .finish(map_config(fac, move |_| config.clone())), - ) + .h1_allow_half_closed(c.h1_allow_half_closed); + + if let Some(size) = c.h1_write_buffer_size { + svc = svc.h1_write_buffer_size(size); + } + + svc.finish(map_config(fac, move |_| config.clone())) + }) }, )?; @@ -1194,6 +1248,10 @@ where svc = svc.on_connect_ext(move |io: &_, ext: _| (handler)(io as &dyn Any, ext)); } + if let Some(size) = c.h1_write_buffer_size { + svc = svc.h1_write_buffer_size(size); + } + let fac = factory() .into_factory() .map_err(|err| err.into().error_response());