From ee7e37c62f702121b3e6e13a78adb031961064fd Mon Sep 17 00:00:00 2001 From: Nicolas Date: Sun, 1 Jun 2025 15:48:21 +1000 Subject: [PATCH] Added max_buffer_size as a configuration item on the httpservice --- actix-http/src/builder.rs | 16 ++++++++++++++++ actix-http/src/config.rs | 10 +++++++++- actix-http/src/h1/dispatcher.rs | 6 ++++-- actix-http/src/h1/dispatcher_tests.rs | 11 ++++++++++- actix-web/src/server.rs | 15 +++++++++++++++ 5 files changed, 54 insertions(+), 4 deletions(-) diff --git a/actix-http/src/builder.rs b/actix-http/src/builder.rs index 916083a98..e06a0c008 100644 --- a/actix-http/src/builder.rs +++ b/actix-http/src/builder.rs @@ -17,6 +17,7 @@ pub struct HttpServiceBuilder { keep_alive: KeepAlive, client_request_timeout: Duration, client_disconnect_timeout: Duration, + max_buffer_size: Option, secure: bool, local_addr: Option, expect: X, @@ -38,6 +39,7 @@ where keep_alive: KeepAlive::default(), client_request_timeout: Duration::from_secs(5), client_disconnect_timeout: Duration::ZERO, + max_buffer_size: None, secure: false, local_addr: None, @@ -124,6 +126,15 @@ where self.client_disconnect_timeout(dur) } + /// Set maximum buffer size. + /// + /// Defines the maximum size of the buffer. When the size is reached, the dispatcher + /// will flush the data to the IO streams + pub fn max_buffer_size(mut self, size: usize) -> Self { + self.max_buffer_size = Some(size); + self + } + /// Provide service for `EXPECT: 100-Continue` support. /// /// Service get called with request that contains `EXPECT` header. @@ -140,6 +151,7 @@ where keep_alive: self.keep_alive, client_request_timeout: self.client_request_timeout, client_disconnect_timeout: self.client_disconnect_timeout, + max_buffer_size: self.max_buffer_size, secure: self.secure, local_addr: self.local_addr, expect: expect.into_factory(), @@ -164,6 +176,7 @@ where keep_alive: self.keep_alive, client_request_timeout: self.client_request_timeout, client_disconnect_timeout: self.client_disconnect_timeout, + max_buffer_size: self.max_buffer_size, secure: self.secure, local_addr: self.local_addr, expect: self.expect, @@ -199,6 +212,7 @@ where self.keep_alive, self.client_request_timeout, self.client_disconnect_timeout, + self.max_buffer_size, self.secure, self.local_addr, ); @@ -224,6 +238,7 @@ where self.keep_alive, self.client_request_timeout, self.client_disconnect_timeout, + self.max_buffer_size, self.secure, self.local_addr, ); @@ -246,6 +261,7 @@ where self.keep_alive, self.client_request_timeout, self.client_disconnect_timeout, + self.max_buffer_size, self.secure, self.local_addr, ); diff --git a/actix-http/src/config.rs b/actix-http/src/config.rs index b3b215da4..98895a82f 100644 --- a/actix-http/src/config.rs +++ b/actix-http/src/config.rs @@ -17,6 +17,7 @@ struct Inner { keep_alive: KeepAlive, client_request_timeout: Duration, client_disconnect_timeout: Duration, + max_buffer_size: Option, secure: bool, local_addr: Option, date_service: DateService, @@ -28,6 +29,7 @@ impl Default for ServiceConfig { KeepAlive::default(), Duration::from_secs(5), Duration::ZERO, + None, false, None, ) @@ -40,6 +42,7 @@ impl ServiceConfig { keep_alive: KeepAlive, client_request_timeout: Duration, client_disconnect_timeout: Duration, + max_buffer_size: Option, secure: bool, local_addr: Option, ) -> ServiceConfig { @@ -47,6 +50,7 @@ impl ServiceConfig { keep_alive: keep_alive.normalize(), client_request_timeout, client_disconnect_timeout, + max_buffer_size, secure, local_addr, date_service: DateService::new(), @@ -104,6 +108,10 @@ impl ServiceConfig { self.0.date_service.now() } + pub fn max_buffer_size(&self) -> Option { + self.0.max_buffer_size + } + /// Writes date header to `dst` buffer. /// /// Low-level method that utilizes the built-in efficient date service, requiring fewer syscalls @@ -144,7 +152,7 @@ mod tests { #[actix_rt::test] async fn test_date_service_update() { let settings = - ServiceConfig::new(KeepAlive::Os, Duration::ZERO, Duration::ZERO, false, None); + ServiceConfig::new(KeepAlive::Os, Duration::ZERO, Duration::ZERO, None, false, None); yield_now().await; diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 00b51360e..c9bbdae37 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -166,6 +166,7 @@ pin_project! { pub(super) io: Option, read_buf: BytesMut, write_buf: BytesMut, + max_buffer_size: usize, codec: Codec, } } @@ -278,6 +279,7 @@ where io: Some(io), read_buf: BytesMut::with_capacity(HW_BUFFER_SIZE), write_buf: BytesMut::with_capacity(HW_BUFFER_SIZE), + max_buffer_size: config.max_buffer_size().unwrap_or(MAX_BUFFER_SIZE), codec: Codec::new(config), }, }, @@ -493,7 +495,7 @@ where StateProj::SendPayload { mut body } => { // keep populate writer buffer until buffer size limit hit, // get blocked or finished. - while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE { + while this.write_buf.len() < *this.max_buffer_size /*super::payload::MAX_BUFFER_SIZE*/ { match body.as_mut().poll_next(cx) { Poll::Ready(Some(Ok(item))) => { this.codec @@ -532,7 +534,7 @@ where // keep populate writer buffer until buffer size limit hit, // get blocked or finished. - while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE { + while this.write_buf.len() < *this.max_buffer_size /*super::payload::MAX_BUFFER_SIZE*/ { match body.as_mut().poll_next(cx) { Poll::Ready(Some(Ok(item))) => { this.codec diff --git a/actix-http/src/h1/dispatcher_tests.rs b/actix-http/src/h1/dispatcher_tests.rs index 50259e6ce..ff03881cd 100644 --- a/actix-http/src/h1/dispatcher_tests.rs +++ b/actix-http/src/h1/dispatcher_tests.rs @@ -10,7 +10,7 @@ use futures_util::future::lazy; use super::dispatcher::{Dispatcher, DispatcherState, DispatcherStateProj, Flags}; use crate::{ body::MessageBody, - config::ServiceConfig, + config::{ServiceConfig}, h1::{Codec, ExpectHandler, UpgradeHandler}, service::HttpFlow, test::{TestBuffer, TestSeqBuffer}, @@ -82,6 +82,7 @@ async fn late_request() { KeepAlive::Disabled, Duration::from_millis(100), Duration::ZERO, + None, false, None, ); @@ -149,6 +150,7 @@ async fn oneshot_connection() { KeepAlive::Disabled, Duration::from_millis(100), Duration::ZERO, + None, false, None, ); @@ -210,6 +212,7 @@ async fn keep_alive_timeout() { KeepAlive::Timeout(Duration::from_millis(200)), Duration::from_millis(100), Duration::ZERO, + None, false, None, ); @@ -289,6 +292,7 @@ async fn keep_alive_follow_up_req() { KeepAlive::Timeout(Duration::from_millis(500)), Duration::from_millis(100), Duration::ZERO, + None, false, None, ); @@ -453,6 +457,7 @@ async fn pipelining_ok_then_ok() { KeepAlive::Disabled, Duration::from_millis(1), Duration::from_millis(1), + None, false, None, ); @@ -523,6 +528,7 @@ async fn pipelining_ok_then_bad() { KeepAlive::Disabled, Duration::from_millis(1), Duration::from_millis(1), + None, false, None, ); @@ -586,6 +592,7 @@ async fn expect_handling() { KeepAlive::Disabled, Duration::ZERO, Duration::ZERO, + None, false, None, ); @@ -663,6 +670,7 @@ async fn expect_eager() { KeepAlive::Disabled, Duration::ZERO, Duration::ZERO, + None, false, None, ); @@ -746,6 +754,7 @@ async fn upgrade_handling() { KeepAlive::Disabled, Duration::ZERO, Duration::ZERO, + None, false, None, ); diff --git a/actix-web/src/server.rs b/actix-web/src/server.rs index 0717f5bc6..e52ad0b37 100644 --- a/actix-web/src/server.rs +++ b/actix-web/src/server.rs @@ -31,6 +31,7 @@ struct Config { keep_alive: KeepAlive, client_request_timeout: Duration, client_disconnect_timeout: Duration, + max_buffer_size: Option, #[allow(dead_code)] // only dead when no TLS features are enabled tls_handshake_timeout: Option, } @@ -116,6 +117,7 @@ where keep_alive: KeepAlive::default(), client_request_timeout: Duration::from_secs(5), client_disconnect_timeout: Duration::from_secs(1), + max_buffer_size: None, tls_handshake_timeout: None, })), backlog: 1024, @@ -234,6 +236,15 @@ where self } + /// Set maximum buffer size. + /// + /// Defines the maximum size of the write buffer. When the size is reached, the dispatcher + /// will flush the data to the IO streams + pub fn max_buffer_size(self, size: usize) -> Self { + self.config.lock().unwrap().max_buffer_size = Some(size); + self + } + /// Sets TLS handshake timeout. /// /// Defines a timeout for TLS handshake. If the TLS handshake does not complete within this @@ -560,6 +571,10 @@ where .client_disconnect_timeout(cfg.client_disconnect_timeout) .local_addr(addr); + if let Some(size) = cfg.max_buffer_size { + svc = svc.max_buffer_size(size); + }; + if let Some(handler) = on_connect_fn.clone() { svc = svc.on_connect_ext(move |io: &_, ext: _| (handler)(io as &dyn Any, ext))