http: add h1 write buffer size setting (#3986)

Co-authored-by: Nicolas Grondin <ngrondin78@gmail.com>
This commit is contained in:
Yuki Okushi 2026-04-18 13:22:45 +09:00 committed by GitHub
parent 10609f749d
commit cffe5d271e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 300 additions and 8 deletions

View File

@ -15,6 +15,7 @@
- Fix `HeaderMap` iterators' `len()` and `size_hint()` implementations for multi-value headers.
- Update `rand` dependency to `0.10`.
- Update `sha1` dependency to `0.11`.
- Add `ServiceConfigBuilder::h1_write_buffer_size()` and `HttpServiceBuilder::h1_write_buffer_size()`.
[#3953]: https://github.com/actix/actix-web/pull/3953

View File

@ -5,7 +5,9 @@ use actix_service::{IntoServiceFactory, Service, ServiceFactory};
use crate::{
body::{BoxBody, MessageBody},
config::{DEFAULT_H2_CONN_WINDOW_SIZE, DEFAULT_H2_STREAM_WINDOW_SIZE},
config::{
DEFAULT_H1_WRITE_BUFFER_SIZE, DEFAULT_H2_CONN_WINDOW_SIZE, DEFAULT_H2_STREAM_WINDOW_SIZE,
},
h1::{self, ExpectHandler, H1Service, UpgradeHandler},
service::HttpService,
ConnectCallback, Extensions, KeepAlive, Request, Response, ServiceConfigBuilder,
@ -22,6 +24,7 @@ pub struct HttpServiceBuilder<T, S, X = ExpectHandler, U = UpgradeHandler> {
secure: bool,
local_addr: Option<net::SocketAddr>,
h1_allow_half_closed: bool,
h1_write_buffer_size: usize,
h2_conn_window_size: u32,
h2_stream_window_size: u32,
expect: X,
@ -47,6 +50,7 @@ where
secure: false,
local_addr: None,
h1_allow_half_closed: true,
h1_write_buffer_size: DEFAULT_H1_WRITE_BUFFER_SIZE,
h2_conn_window_size: DEFAULT_H2_CONN_WINDOW_SIZE,
h2_stream_window_size: DEFAULT_H2_STREAM_WINDOW_SIZE,
@ -151,6 +155,25 @@ where
self
}
/// Sets the maximum response write buffer size for HTTP/1 connections.
///
/// Once the response buffer reaches this size, the dispatcher flushes it to the I/O stream.
///
/// The default value is 32 KiB.
///
/// # Panics
///
/// Panics if `size` is 0.
pub fn h1_write_buffer_size(mut self, size: usize) -> Self {
assert!(
size > 0,
"HTTP/1 write buffer size must be greater than zero"
);
self.h1_write_buffer_size = size;
self
}
/// Sets initial stream-level flow control window size for HTTP/2 connections.
///
/// See [`ServiceConfigBuilder::h2_initial_window_size`] for more details.
@ -187,6 +210,7 @@ where
secure: self.secure,
local_addr: self.local_addr,
h1_allow_half_closed: self.h1_allow_half_closed,
h1_write_buffer_size: self.h1_write_buffer_size,
h2_conn_window_size: self.h2_conn_window_size,
h2_stream_window_size: self.h2_stream_window_size,
expect: expect.into_factory(),
@ -215,6 +239,7 @@ where
secure: self.secure,
local_addr: self.local_addr,
h1_allow_half_closed: self.h1_allow_half_closed,
h1_write_buffer_size: self.h1_write_buffer_size,
h2_conn_window_size: self.h2_conn_window_size,
h2_stream_window_size: self.h2_stream_window_size,
expect: self.expect,
@ -254,6 +279,7 @@ where
.secure(self.secure)
.local_addr(self.local_addr)
.h1_allow_half_closed(self.h1_allow_half_closed)
.h1_write_buffer_size(self.h1_write_buffer_size)
.h2_initial_window_size(self.h2_stream_window_size)
.h2_initial_connection_window_size(self.h2_conn_window_size)
.build();
@ -283,6 +309,7 @@ where
.secure(self.secure)
.local_addr(self.local_addr)
.h1_allow_half_closed(self.h1_allow_half_closed)
.h1_write_buffer_size(self.h1_write_buffer_size)
.h2_initial_window_size(self.h2_stream_window_size)
.h2_initial_connection_window_size(self.h2_conn_window_size)
.build();
@ -309,6 +336,7 @@ where
.secure(self.secure)
.local_addr(self.local_addr)
.h1_allow_half_closed(self.h1_allow_half_closed)
.h1_write_buffer_size(self.h1_write_buffer_size)
.h2_initial_window_size(self.h2_stream_window_size)
.h2_initial_connection_window_size(self.h2_conn_window_size)
.build();

View File

@ -18,6 +18,9 @@ pub(crate) const DEFAULT_H2_CONN_WINDOW_SIZE: u32 = 1024 * 1024 * 2; // 2MiB
/// Matches awc's defaults to avoid poor throughput on high-BDP links.
pub(crate) const DEFAULT_H2_STREAM_WINDOW_SIZE: u32 = 1024 * 1024; // 1MiB
/// Default HTTP/1 response write buffer size.
pub(crate) const DEFAULT_H1_WRITE_BUFFER_SIZE: usize = 32_768;
/// A builder for creating a [`ServiceConfig`]
#[derive(Default, Debug)]
pub struct ServiceConfigBuilder {
@ -86,6 +89,25 @@ impl ServiceConfigBuilder {
self
}
/// Sets the maximum response write buffer size for HTTP/1 connections.
///
/// Once the response buffer reaches this size, the dispatcher flushes it to the I/O stream.
///
/// The default value is 32 KiB.
///
/// # Panics
///
/// Panics if `size` is 0.
pub fn h1_write_buffer_size(mut self, size: usize) -> Self {
assert!(
size > 0,
"HTTP/1 write buffer size must be greater than zero"
);
self.inner.h1_write_buffer_size = size;
self
}
/// Sets initial stream-level flow control window size for HTTP/2 connections.
///
/// Higher values can improve upload performance on high-latency links at the cost of higher
@ -128,6 +150,7 @@ struct Inner {
tcp_nodelay: Option<bool>,
date_service: DateService,
h1_allow_half_closed: bool,
h1_write_buffer_size: usize,
h2_conn_window_size: u32,
h2_stream_window_size: u32,
}
@ -143,6 +166,7 @@ impl Default for Inner {
tcp_nodelay: None,
date_service: DateService::new(),
h1_allow_half_closed: true,
h1_write_buffer_size: DEFAULT_H1_WRITE_BUFFER_SIZE,
h2_conn_window_size: DEFAULT_H2_CONN_WINDOW_SIZE,
h2_stream_window_size: DEFAULT_H2_STREAM_WINDOW_SIZE,
}
@ -167,6 +191,7 @@ impl ServiceConfig {
tcp_nodelay: None,
date_service: DateService::new(),
h1_allow_half_closed: true,
h1_write_buffer_size: DEFAULT_H1_WRITE_BUFFER_SIZE,
h2_conn_window_size: DEFAULT_H2_CONN_WINDOW_SIZE,
h2_stream_window_size: DEFAULT_H2_STREAM_WINDOW_SIZE,
}))
@ -228,6 +253,11 @@ impl ServiceConfig {
self.0.h1_allow_half_closed
}
/// HTTP/1 response write buffer size (in bytes).
pub fn h1_write_buffer_size(&self) -> usize {
self.0.h1_write_buffer_size
}
/// Returns configured `TCP_NODELAY` setting for accepted TCP connections.
pub fn tcp_nodelay(&self) -> Option<bool> {
self.0.tcp_nodelay

View File

@ -171,6 +171,7 @@ pin_project! {
pub(super) io: Option<T>,
read_buf: BytesMut,
write_buf: BytesMut,
h1_write_buffer_size: usize,
codec: Codec,
}
}
@ -284,6 +285,7 @@ where
io: Some(io),
read_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
write_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
h1_write_buffer_size: config.h1_write_buffer_size(),
codec: Codec::new(config),
},
},
@ -618,7 +620,7 @@ where
StateProj::SendPayload { mut body } => {
// keep populate writer buffer until buffer size limit hit,
// get blocked or finished.
while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE {
while this.write_buf.len() < *this.h1_write_buffer_size {
match body.as_mut().poll_next(cx) {
Poll::Ready(Some(Ok(item))) => {
this.codec
@ -680,7 +682,7 @@ where
// keep populate writer buffer until buffer size limit hit,
// get blocked or finished.
while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE {
while this.write_buf.len() < *this.h1_write_buffer_size {
match body.as_mut().poll_next(cx) {
Poll::Ready(Some(Ok(item))) => {
this.codec

View File

@ -1,6 +1,9 @@
use std::{
cell::Cell,
future::Future,
io,
pin::Pin,
rc::Rc,
str,
task::{Context, Poll},
time::Duration,
@ -46,6 +49,111 @@ impl Service<Request> for YieldService {
}
}
struct ReadyChunkBody {
chunk_polls: Rc<Cell<usize>>,
remaining: usize,
chunk_len: usize,
}
impl ReadyChunkBody {
fn new(chunk_polls: Rc<Cell<usize>>, remaining: usize, chunk_len: usize) -> Self {
Self {
chunk_polls,
remaining,
chunk_len,
}
}
}
impl MessageBody for ReadyChunkBody {
type Error = Error;
fn size(&self) -> crate::body::BodySize {
crate::body::BodySize::Stream
}
fn poll_next(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Self::Error>>> {
if self.remaining == 0 {
return Poll::Ready(None);
}
self.remaining -= 1;
self.chunk_polls.set(self.chunk_polls.get() + 1);
Poll::Ready(Some(Ok(Bytes::from(vec![b'x'; self.chunk_len]))))
}
}
struct PendingOnceWriteBuf {
io: TestBuffer,
block_next_write: bool,
}
impl PendingOnceWriteBuf {
fn new<T>(data: T) -> Self
where
T: Into<BytesMut>,
{
Self {
io: TestBuffer::new(data),
block_next_write: true,
}
}
}
impl io::Read for PendingOnceWriteBuf {
fn read(&mut self, dst: &mut [u8]) -> Result<usize, io::Error> {
self.io.read(dst)
}
}
impl io::Write for PendingOnceWriteBuf {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.io.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.io.flush()
}
}
impl actix_codec::AsyncRead for PendingOnceWriteBuf {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut actix_codec::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.io).poll_read(cx, buf)
}
}
impl actix_codec::AsyncWrite for PendingOnceWriteBuf {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
if self.block_next_write {
self.block_next_write = false;
cx.waker().wake_by_ref();
return Poll::Pending;
}
Pin::new(&mut self.io).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.io).poll_flush(cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.io).poll_shutdown(cx)
}
}
fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option<usize> {
memchr::memmem::find(&haystack[from..], needle)
}
@ -108,6 +216,18 @@ fn echo_payload_service() -> impl Service<Request, Response = Response<Bytes>, E
})
}
fn ready_chunk_body_service(
chunk_polls: Rc<Cell<usize>>,
chunk_count: usize,
chunk_len: usize,
) -> impl Service<Request, Response = Response<ReadyChunkBody>, Error = Error> {
fn_service(move |_req: Request| {
ready(Ok::<_, Error>(Response::ok().set_body(
ReadyChunkBody::new(chunk_polls.clone(), chunk_count, chunk_len),
)))
})
}
#[actix_rt::test]
async fn late_request() {
let mut buf = TestBuffer::empty();
@ -1364,6 +1484,58 @@ async fn disallow_half_closed() {
assert!(matches!(inner.state, State::ServiceCall { .. }))
}
#[actix_rt::test]
async fn h1_write_buffer_size_limits_buffering() {
let request = "GET /stream HTTP/1.1\r\nConnection: close\r\n\r\n";
let default_polls = Rc::new(Cell::new(0));
let default_services = HttpFlow::new(
ready_chunk_body_service(default_polls.clone(), 8, 1024),
ExpectHandler,
None::<UpgradeHandler>,
);
let default_io = PendingOnceWriteBuf::new(request);
let default_dispatcher = Dispatcher::new(
default_io,
default_services,
ServiceConfig::default(),
None,
OnConnectData::default(),
);
pin!(default_dispatcher);
let mut cx = Context::from_waker(futures_util::task::noop_waker_ref());
assert!(default_dispatcher.as_mut().poll(&mut cx).is_pending());
assert_eq!(default_polls.get(), 8);
let custom_polls = Rc::new(Cell::new(0));
let custom_services = HttpFlow::new(
ready_chunk_body_service(custom_polls.clone(), 8, 1024),
ExpectHandler,
None::<UpgradeHandler>,
);
let custom_io = PendingOnceWriteBuf::new(request);
let custom_dispatcher = Dispatcher::new(
custom_io,
custom_services,
crate::config::ServiceConfigBuilder::new()
.h1_write_buffer_size(1024)
.build(),
None,
OnConnectData::default(),
);
pin!(custom_dispatcher);
assert!(custom_dispatcher.as_mut().poll(&mut cx).is_pending());
assert_eq!(custom_polls.get(), 1);
}
#[actix_rt::test]
#[should_panic(expected = "HTTP/1 write buffer size must be greater than zero")]
async fn h1_write_buffer_size_rejects_zero() {
let _ = crate::config::ServiceConfigBuilder::new().h1_write_buffer_size(0);
}
fn http_msg(msg: impl AsRef<str>) -> BytesMut {
let mut msg = msg
.as_ref()

View File

@ -8,6 +8,7 @@
- Fix `HttpRequest::{match_pattern,match_name}` reporting path-only matches when route guards disambiguate overlapping resources. [#3346]
- Fix `Readlines` handling of lines split across payload chunks so combined line limits are enforced and complete lines are yielded.
- Update `rand` dependency to `0.10`.
- Add `HttpServer::h1_write_buffer_size()`.
[#3944]: https://github.com/actix/actix-web/pull/3944
[#3346]: https://github.com/actix/actix-web/issues/3346

View File

@ -33,6 +33,7 @@ struct Config {
client_request_timeout: Duration,
client_disconnect_timeout: Duration,
h1_allow_half_closed: bool,
h1_write_buffer_size: Option<usize>,
h2_initial_window_size: Option<u32>,
h2_initial_connection_window_size: Option<u32>,
#[allow(dead_code)] // only dead when no TLS features are enabled
@ -122,6 +123,7 @@ where
client_request_timeout: Duration::from_secs(5),
client_disconnect_timeout: Duration::from_secs(1),
h1_allow_half_closed: true,
h1_write_buffer_size: None,
h2_initial_window_size: None,
h2_initial_connection_window_size: None,
tls_handshake_timeout: None,
@ -286,6 +288,25 @@ where
self
}
/// Sets the maximum response write buffer size for HTTP/1 connections.
///
/// Once the response buffer reaches this size, the dispatcher flushes it to the I/O stream.
///
/// The default value is 32 KiB.
///
/// # Panics
///
/// Panics if `size` is 0.
pub fn h1_write_buffer_size(self, size: usize) -> Self {
assert!(
size > 0,
"HTTP/1 write buffer size must be greater than zero"
);
self.config.lock().unwrap().h1_write_buffer_size = Some(size);
self
}
/// Sets initial stream-level flow control window size for HTTP/2 connections.
///
/// Higher values can improve upload performance on high-latency links at the cost of higher
@ -629,6 +650,10 @@ where
svc = svc.tcp_nodelay(enabled);
}
if let Some(size) = cfg.h1_write_buffer_size {
svc = svc.h1_write_buffer_size(size);
}
if let Some(val) = cfg.h2_initial_window_size {
svc = svc.h2_initial_window_size(val);
}
@ -686,6 +711,10 @@ where
svc = svc.tcp_nodelay(enabled);
}
if let Some(size) = cfg.h1_write_buffer_size {
svc = svc.h1_write_buffer_size(size);
}
if let Some(val) = cfg.h2_initial_window_size {
svc = svc.h2_initial_window_size(val);
}
@ -774,6 +803,10 @@ where
svc = svc.tcp_nodelay(enabled);
}
if let Some(size) = c.h1_write_buffer_size {
svc = svc.h1_write_buffer_size(size);
}
if let Some(val) = c.h2_initial_window_size {
svc = svc.h2_initial_window_size(val);
}
@ -837,6 +870,10 @@ where
svc = svc.tcp_nodelay(enabled);
}
if let Some(size) = c.h1_write_buffer_size {
svc = svc.h1_write_buffer_size(size);
}
if let Some(val) = c.h2_initial_window_size {
svc = svc.h2_initial_window_size(val);
}
@ -915,6 +952,10 @@ where
svc = svc.tcp_nodelay(enabled);
}
if let Some(size) = c.h1_write_buffer_size {
svc = svc.h1_write_buffer_size(size);
}
if let Some(val) = c.h2_initial_window_size {
svc = svc.h2_initial_window_size(val);
}
@ -993,6 +1034,10 @@ where
svc = svc.tcp_nodelay(enabled);
}
if let Some(size) = c.h1_write_buffer_size {
svc = svc.h1_write_buffer_size(size);
}
if let Some(val) = c.h2_initial_window_size {
svc = svc.h2_initial_window_size(val);
}
@ -1072,6 +1117,10 @@ where
svc = svc.tcp_nodelay(enabled);
}
if let Some(size) = c.h1_write_buffer_size {
svc = svc.h1_write_buffer_size(size);
}
if let Some(val) = c.h2_initial_window_size {
svc = svc.h2_initial_window_size(val);
}
@ -1140,14 +1189,19 @@ where
.into_factory()
.map_err(|err| err.into().error_response());
fn_service(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) }).and_then(
HttpService::build()
fn_service(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) }).and_then({
let mut svc = HttpService::build()
.keep_alive(c.keep_alive)
.client_request_timeout(c.client_request_timeout)
.client_disconnect_timeout(c.client_disconnect_timeout)
.h1_allow_half_closed(c.h1_allow_half_closed)
.finish(map_config(fac, move |_| config.clone())),
)
.h1_allow_half_closed(c.h1_allow_half_closed);
if let Some(size) = c.h1_write_buffer_size {
svc = svc.h1_write_buffer_size(size);
}
svc.finish(map_config(fac, move |_| config.clone()))
})
},
)?;
@ -1194,6 +1248,10 @@ where
svc = svc.on_connect_ext(move |io: &_, ext: _| (handler)(io as &dyn Any, ext));
}
if let Some(size) = c.h1_write_buffer_size {
svc = svc.h1_write_buffer_size(size);
}
let fac = factory()
.into_factory()
.map_err(|err| err.into().error_response());