Compare commits

...

6 Commits

Author SHA1 Message Date
ngrondin 5141a8f04b
Merge 616aed669e into 5041cd1c65 2025-08-29 13:17:36 +09:00
Rob Ede 616aed669e
Merge branch 'master' into master 2025-08-29 04:02:11 +01:00
George Pollard 5041cd1c65
Make 'ws' feature of actix-http optional in actix-web (#3734)
* Make 'ws' feature of actix-http optional

* Update CHANGES.md

* Update actix-web-actors

* Update CHANGES.md

* nits

* nits

---------

Co-authored-by: Rob Ede <robjtede@icloud.com>
2025-08-29 02:50:05 +00:00
Thales d3c46537b3
fix(http): Wake Payload when feeding error or EOF (#3749)
* fix(http): Add failing tests to demonstrate the payload problem

Signed-off-by: Thales Fragoso <thales.fragoso@axiros.com>

* fix(http): Wake Payload when feeding error or eof

Signed-off-by: Thales Fragoso <thales.fragoso@axiros.com>

---------

Signed-off-by: Thales Fragoso <thales.fragoso@axiros.com>
2025-08-29 02:47:03 +00:00
Nicolas 9320df6339 Fix formatting 2025-06-01 18:37:55 +10:00
Nicolas ee7e37c62f Added max_buffer_size as a configuration item on the httpservice 2025-06-01 15:48:21 +10:00
12 changed files with 146 additions and 12 deletions

View File

@ -2,6 +2,8 @@
## Unreleased ## Unreleased
- Properly wake Payload receivers when feeding errors or EOF
## 3.11.1 ## 3.11.1
- Prevent more hangs after client disconnects. - Prevent more hangs after client disconnects.

View File

@ -156,7 +156,7 @@ serde_json = "1.0"
static_assertions = "1" static_assertions = "1"
tls-openssl = { package = "openssl", version = "0.10.55" } tls-openssl = { package = "openssl", version = "0.10.55" }
tls-rustls_023 = { package = "rustls", version = "0.23" } tls-rustls_023 = { package = "rustls", version = "0.23" }
tokio = { version = "1.38.2", features = ["net", "rt", "macros"] } tokio = { version = "1.38.2", features = ["net", "rt", "macros", "sync"] }
[lints] [lints]
workspace = true workspace = true

View File

@ -17,6 +17,7 @@ pub struct HttpServiceBuilder<T, S, X = ExpectHandler, U = UpgradeHandler> {
keep_alive: KeepAlive, keep_alive: KeepAlive,
client_request_timeout: Duration, client_request_timeout: Duration,
client_disconnect_timeout: Duration, client_disconnect_timeout: Duration,
max_buffer_size: Option<usize>,
secure: bool, secure: bool,
local_addr: Option<net::SocketAddr>, local_addr: Option<net::SocketAddr>,
expect: X, expect: X,
@ -38,6 +39,7 @@ where
keep_alive: KeepAlive::default(), keep_alive: KeepAlive::default(),
client_request_timeout: Duration::from_secs(5), client_request_timeout: Duration::from_secs(5),
client_disconnect_timeout: Duration::ZERO, client_disconnect_timeout: Duration::ZERO,
max_buffer_size: None,
secure: false, secure: false,
local_addr: None, local_addr: None,
@ -124,6 +126,15 @@ where
self.client_disconnect_timeout(dur) self.client_disconnect_timeout(dur)
} }
/// Set maximum buffer size.
///
/// Defines the maximum size of the buffer. When the size is reached, the dispatcher
/// will flush the data to the IO streams
pub fn max_buffer_size(mut self, size: usize) -> Self {
self.max_buffer_size = Some(size);
self
}
/// Provide service for `EXPECT: 100-Continue` support. /// Provide service for `EXPECT: 100-Continue` support.
/// ///
/// Service get called with request that contains `EXPECT` header. /// Service get called with request that contains `EXPECT` header.
@ -140,6 +151,7 @@ where
keep_alive: self.keep_alive, keep_alive: self.keep_alive,
client_request_timeout: self.client_request_timeout, client_request_timeout: self.client_request_timeout,
client_disconnect_timeout: self.client_disconnect_timeout, client_disconnect_timeout: self.client_disconnect_timeout,
max_buffer_size: self.max_buffer_size,
secure: self.secure, secure: self.secure,
local_addr: self.local_addr, local_addr: self.local_addr,
expect: expect.into_factory(), expect: expect.into_factory(),
@ -164,6 +176,7 @@ where
keep_alive: self.keep_alive, keep_alive: self.keep_alive,
client_request_timeout: self.client_request_timeout, client_request_timeout: self.client_request_timeout,
client_disconnect_timeout: self.client_disconnect_timeout, client_disconnect_timeout: self.client_disconnect_timeout,
max_buffer_size: self.max_buffer_size,
secure: self.secure, secure: self.secure,
local_addr: self.local_addr, local_addr: self.local_addr,
expect: self.expect, expect: self.expect,
@ -199,6 +212,7 @@ where
self.keep_alive, self.keep_alive,
self.client_request_timeout, self.client_request_timeout,
self.client_disconnect_timeout, self.client_disconnect_timeout,
self.max_buffer_size,
self.secure, self.secure,
self.local_addr, self.local_addr,
); );
@ -224,6 +238,7 @@ where
self.keep_alive, self.keep_alive,
self.client_request_timeout, self.client_request_timeout,
self.client_disconnect_timeout, self.client_disconnect_timeout,
self.max_buffer_size,
self.secure, self.secure,
self.local_addr, self.local_addr,
); );
@ -246,6 +261,7 @@ where
self.keep_alive, self.keep_alive,
self.client_request_timeout, self.client_request_timeout,
self.client_disconnect_timeout, self.client_disconnect_timeout,
self.max_buffer_size,
self.secure, self.secure,
self.local_addr, self.local_addr,
); );

View File

@ -17,6 +17,7 @@ struct Inner {
keep_alive: KeepAlive, keep_alive: KeepAlive,
client_request_timeout: Duration, client_request_timeout: Duration,
client_disconnect_timeout: Duration, client_disconnect_timeout: Duration,
max_buffer_size: Option<usize>,
secure: bool, secure: bool,
local_addr: Option<std::net::SocketAddr>, local_addr: Option<std::net::SocketAddr>,
date_service: DateService, date_service: DateService,
@ -28,6 +29,7 @@ impl Default for ServiceConfig {
KeepAlive::default(), KeepAlive::default(),
Duration::from_secs(5), Duration::from_secs(5),
Duration::ZERO, Duration::ZERO,
None,
false, false,
None, None,
) )
@ -40,6 +42,7 @@ impl ServiceConfig {
keep_alive: KeepAlive, keep_alive: KeepAlive,
client_request_timeout: Duration, client_request_timeout: Duration,
client_disconnect_timeout: Duration, client_disconnect_timeout: Duration,
max_buffer_size: Option<usize>,
secure: bool, secure: bool,
local_addr: Option<net::SocketAddr>, local_addr: Option<net::SocketAddr>,
) -> ServiceConfig { ) -> ServiceConfig {
@ -47,6 +50,7 @@ impl ServiceConfig {
keep_alive: keep_alive.normalize(), keep_alive: keep_alive.normalize(),
client_request_timeout, client_request_timeout,
client_disconnect_timeout, client_disconnect_timeout,
max_buffer_size,
secure, secure,
local_addr, local_addr,
date_service: DateService::new(), date_service: DateService::new(),
@ -104,6 +108,10 @@ impl ServiceConfig {
self.0.date_service.now() self.0.date_service.now()
} }
pub fn max_buffer_size(&self) -> Option<usize> {
self.0.max_buffer_size
}
/// Writes date header to `dst` buffer. /// Writes date header to `dst` buffer.
/// ///
/// Low-level method that utilizes the built-in efficient date service, requiring fewer syscalls /// Low-level method that utilizes the built-in efficient date service, requiring fewer syscalls
@ -143,8 +151,14 @@ mod tests {
#[actix_rt::test] #[actix_rt::test]
async fn test_date_service_update() { async fn test_date_service_update() {
let settings = let settings = ServiceConfig::new(
ServiceConfig::new(KeepAlive::Os, Duration::ZERO, Duration::ZERO, false, None); KeepAlive::Os,
Duration::ZERO,
Duration::ZERO,
None,
false,
None,
);
yield_now().await; yield_now().await;

View File

@ -166,6 +166,7 @@ pin_project! {
pub(super) io: Option<T>, pub(super) io: Option<T>,
read_buf: BytesMut, read_buf: BytesMut,
write_buf: BytesMut, write_buf: BytesMut,
max_buffer_size: usize,
codec: Codec, codec: Codec,
} }
} }
@ -278,6 +279,7 @@ where
io: Some(io), io: Some(io),
read_buf: BytesMut::with_capacity(HW_BUFFER_SIZE), read_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
write_buf: BytesMut::with_capacity(HW_BUFFER_SIZE), write_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
max_buffer_size: config.max_buffer_size().unwrap_or(MAX_BUFFER_SIZE),
codec: Codec::new(config), codec: Codec::new(config),
}, },
}, },
@ -493,7 +495,7 @@ where
StateProj::SendPayload { mut body } => { StateProj::SendPayload { mut body } => {
// keep populate writer buffer until buffer size limit hit, // keep populate writer buffer until buffer size limit hit,
// get blocked or finished. // get blocked or finished.
while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE { while this.write_buf.len() < *this.max_buffer_size {
match body.as_mut().poll_next(cx) { match body.as_mut().poll_next(cx) {
Poll::Ready(Some(Ok(item))) => { Poll::Ready(Some(Ok(item))) => {
this.codec this.codec
@ -532,7 +534,7 @@ where
// keep populate writer buffer until buffer size limit hit, // keep populate writer buffer until buffer size limit hit,
// get blocked or finished. // get blocked or finished.
while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE { while this.write_buf.len() < *this.max_buffer_size {
match body.as_mut().poll_next(cx) { match body.as_mut().poll_next(cx) {
Poll::Ready(Some(Ok(item))) => { Poll::Ready(Some(Ok(item))) => {
this.codec this.codec

View File

@ -82,6 +82,7 @@ async fn late_request() {
KeepAlive::Disabled, KeepAlive::Disabled,
Duration::from_millis(100), Duration::from_millis(100),
Duration::ZERO, Duration::ZERO,
None,
false, false,
None, None,
); );
@ -149,6 +150,7 @@ async fn oneshot_connection() {
KeepAlive::Disabled, KeepAlive::Disabled,
Duration::from_millis(100), Duration::from_millis(100),
Duration::ZERO, Duration::ZERO,
None,
false, false,
None, None,
); );
@ -210,6 +212,7 @@ async fn keep_alive_timeout() {
KeepAlive::Timeout(Duration::from_millis(200)), KeepAlive::Timeout(Duration::from_millis(200)),
Duration::from_millis(100), Duration::from_millis(100),
Duration::ZERO, Duration::ZERO,
None,
false, false,
None, None,
); );
@ -289,6 +292,7 @@ async fn keep_alive_follow_up_req() {
KeepAlive::Timeout(Duration::from_millis(500)), KeepAlive::Timeout(Duration::from_millis(500)),
Duration::from_millis(100), Duration::from_millis(100),
Duration::ZERO, Duration::ZERO,
None,
false, false,
None, None,
); );
@ -453,6 +457,7 @@ async fn pipelining_ok_then_ok() {
KeepAlive::Disabled, KeepAlive::Disabled,
Duration::from_millis(1), Duration::from_millis(1),
Duration::from_millis(1), Duration::from_millis(1),
None,
false, false,
None, None,
); );
@ -523,6 +528,7 @@ async fn pipelining_ok_then_bad() {
KeepAlive::Disabled, KeepAlive::Disabled,
Duration::from_millis(1), Duration::from_millis(1),
Duration::from_millis(1), Duration::from_millis(1),
None,
false, false,
None, None,
); );
@ -586,6 +592,7 @@ async fn expect_handling() {
KeepAlive::Disabled, KeepAlive::Disabled,
Duration::ZERO, Duration::ZERO,
Duration::ZERO, Duration::ZERO,
None,
false, false,
None, None,
); );
@ -663,6 +670,7 @@ async fn expect_eager() {
KeepAlive::Disabled, KeepAlive::Disabled,
Duration::ZERO, Duration::ZERO,
Duration::ZERO, Duration::ZERO,
None,
false, false,
None, None,
); );
@ -746,6 +754,7 @@ async fn upgrade_handling() {
KeepAlive::Disabled, KeepAlive::Disabled,
Duration::ZERO, Duration::ZERO,
Duration::ZERO, Duration::ZERO,
None,
false, false,
None, None,
); );

View File

@ -200,11 +200,13 @@ impl Inner {
#[inline] #[inline]
fn set_error(&mut self, err: PayloadError) { fn set_error(&mut self, err: PayloadError) {
self.err = Some(err); self.err = Some(err);
self.wake();
} }
#[inline] #[inline]
fn feed_eof(&mut self) { fn feed_eof(&mut self) {
self.eof = true; self.eof = true;
self.wake();
} }
#[inline] #[inline]
@ -253,8 +255,13 @@ impl Inner {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::{task::Poll, time::Duration};
use actix_rt::time::timeout;
use actix_utils::future::poll_fn; use actix_utils::future::poll_fn;
use futures_util::{FutureExt, StreamExt};
use static_assertions::{assert_impl_all, assert_not_impl_any}; use static_assertions::{assert_impl_all, assert_not_impl_any};
use tokio::sync::oneshot;
use super::*; use super::*;
@ -263,6 +270,67 @@ mod tests {
assert_impl_all!(Inner: Unpin, Send, Sync); assert_impl_all!(Inner: Unpin, Send, Sync);
const WAKE_TIMEOUT: Duration = Duration::from_secs(2);
fn prepare_waking_test(
mut payload: Payload,
expected: Option<Result<(), ()>>,
) -> (oneshot::Receiver<()>, actix_rt::task::JoinHandle<()>) {
let (tx, rx) = oneshot::channel();
let handle = actix_rt::spawn(async move {
// Make sure to poll once to set the waker
poll_fn(|cx| {
assert!(payload.poll_next_unpin(cx).is_pending());
Poll::Ready(())
})
.await;
tx.send(()).unwrap();
// actix-rt is single-threaded, so this won't race with `rx.await`
let mut pend_once = false;
poll_fn(|_| {
if pend_once {
Poll::Ready(())
} else {
// Return pending without storing wakers, we already did on the previous
// `poll_fn`, now this task will only continue if the `sender` wakes us
pend_once = true;
Poll::Pending
}
})
.await;
let got = payload.next().now_or_never().unwrap();
match expected {
Some(Ok(_)) => assert!(got.unwrap().is_ok()),
Some(Err(_)) => assert!(got.unwrap().is_err()),
None => assert!(got.is_none()),
}
});
(rx, handle)
}
#[actix_rt::test]
async fn wake_on_error() {
let (mut sender, payload) = Payload::create(false);
let (rx, handle) = prepare_waking_test(payload, Some(Err(())));
rx.await.unwrap();
sender.set_error(PayloadError::Incomplete(None));
timeout(WAKE_TIMEOUT, handle).await.unwrap().unwrap();
}
#[actix_rt::test]
async fn wake_on_eof() {
let (mut sender, payload) = Payload::create(false);
let (rx, handle) = prepare_waking_test(payload, None);
rx.await.unwrap();
sender.feed_eof();
timeout(WAKE_TIMEOUT, handle).await.unwrap().unwrap();
}
#[actix_rt::test] #[actix_rt::test]
async fn test_unread_data() { async fn test_unread_data() {
let (_, mut payload) = Payload::create(false); let (_, mut payload) = Payload::create(false);

View File

@ -24,7 +24,7 @@ allowed_external_types = [
actix = { version = ">=0.12, <0.14", default-features = false } actix = { version = ">=0.12, <0.14", default-features = false }
actix-codec = "0.5" actix-codec = "0.5"
actix-http = "3" actix-http = "3"
actix-web = { version = "4", default-features = false } actix-web = { version = "4", default-features = false, features = ["ws"] }
bytes = "1" bytes = "1"
bytestring = "1" bytestring = "1"

View File

@ -4,6 +4,7 @@
- `actix_web::response::builder::HttpResponseBuilder::streaming()` now sets `Content-Type` to `application/octet-stream` if `Content-Type` does not exist. - `actix_web::response::builder::HttpResponseBuilder::streaming()` now sets `Content-Type` to `application/octet-stream` if `Content-Type` does not exist.
- `actix_web::response::builder::HttpResponseBuilder::streaming()` now calls `actix_web::response::builder::HttpResponseBuilder::no_chunking()` if `Content-Length` is set by user. - `actix_web::response::builder::HttpResponseBuilder::streaming()` now calls `actix_web::response::builder::HttpResponseBuilder::no_chunking()` if `Content-Length` is set by user.
- Add `ws` crate feature (on-by-default) which forwards to `actix-http` and guards some of its `ResponseError` impls.
## 4.11.0 ## 4.11.0

View File

@ -67,6 +67,7 @@ default = [
"http2", "http2",
"unicode", "unicode",
"compat", "compat",
"ws",
] ]
# Brotli algorithm content-encoding support # Brotli algorithm content-encoding support
@ -85,9 +86,12 @@ cookies = ["dep:cookie"]
# Secure & signed cookies # Secure & signed cookies
secure-cookies = ["cookies", "cookie/secure"] secure-cookies = ["cookies", "cookie/secure"]
# HTTP/2 support (including h2c). # HTTP/2 support (including h2c)
http2 = ["actix-http/http2"] http2 = ["actix-http/http2"]
# WebSocket support
ws = ["actix-http/ws"]
# TLS via OpenSSL # TLS via OpenSSL
openssl = ["__tls", "http2", "actix-http/openssl", "actix-tls/accept", "actix-tls/openssl"] openssl = ["__tls", "http2", "actix-http/openssl", "actix-tls/accept", "actix-tls/openssl"]
@ -131,7 +135,7 @@ actix-service = "2"
actix-tls = { version = "3.4", default-features = false, optional = true } actix-tls = { version = "3.4", default-features = false, optional = true }
actix-utils = "3" actix-utils = "3"
actix-http = { version = "3.11", features = ["ws"] } actix-http = "3.11"
actix-router = { version = "0.5.3", default-features = false, features = ["http"] } actix-router = { version = "0.5.3", default-features = false, features = ["http"] }
actix-web-codegen = { version = "4.3", optional = true, default-features = false } actix-web-codegen = { version = "4.3", optional = true, default-features = false }

View File

@ -7,7 +7,6 @@ use std::{
io::{self, Write as _}, io::{self, Write as _},
}; };
use actix_http::Response;
use bytes::BytesMut; use bytes::BytesMut;
use crate::{ use crate::{
@ -126,20 +125,24 @@ impl ResponseError for actix_http::error::PayloadError {
} }
} }
impl ResponseError for actix_http::ws::ProtocolError {}
impl ResponseError for actix_http::error::ContentTypeError { impl ResponseError for actix_http::error::ContentTypeError {
fn status_code(&self) -> StatusCode { fn status_code(&self) -> StatusCode {
StatusCode::BAD_REQUEST StatusCode::BAD_REQUEST
} }
} }
#[cfg(feature = "ws")]
impl ResponseError for actix_http::ws::HandshakeError { impl ResponseError for actix_http::ws::HandshakeError {
fn error_response(&self) -> HttpResponse<BoxBody> { fn error_response(&self) -> HttpResponse<BoxBody> {
Response::from(self).map_into_boxed_body().into() actix_http::Response::from(self)
.map_into_boxed_body()
.into()
} }
} }
#[cfg(feature = "ws")]
impl ResponseError for actix_http::ws::ProtocolError {}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@ -31,6 +31,7 @@ struct Config {
keep_alive: KeepAlive, keep_alive: KeepAlive,
client_request_timeout: Duration, client_request_timeout: Duration,
client_disconnect_timeout: Duration, client_disconnect_timeout: Duration,
max_buffer_size: Option<usize>,
#[allow(dead_code)] // only dead when no TLS features are enabled #[allow(dead_code)] // only dead when no TLS features are enabled
tls_handshake_timeout: Option<Duration>, tls_handshake_timeout: Option<Duration>,
} }
@ -116,6 +117,7 @@ where
keep_alive: KeepAlive::default(), keep_alive: KeepAlive::default(),
client_request_timeout: Duration::from_secs(5), client_request_timeout: Duration::from_secs(5),
client_disconnect_timeout: Duration::from_secs(1), client_disconnect_timeout: Duration::from_secs(1),
max_buffer_size: None,
tls_handshake_timeout: None, tls_handshake_timeout: None,
})), })),
backlog: 1024, backlog: 1024,
@ -234,6 +236,15 @@ where
self self
} }
/// Set maximum buffer size.
///
/// Defines the maximum size of the write buffer. When the size is reached, the dispatcher
/// will flush the data to the IO streams
pub fn max_buffer_size(self, size: usize) -> Self {
self.config.lock().unwrap().max_buffer_size = Some(size);
self
}
/// Sets TLS handshake timeout. /// Sets TLS handshake timeout.
/// ///
/// Defines a timeout for TLS handshake. If the TLS handshake does not complete within this /// Defines a timeout for TLS handshake. If the TLS handshake does not complete within this
@ -560,6 +571,10 @@ where
.client_disconnect_timeout(cfg.client_disconnect_timeout) .client_disconnect_timeout(cfg.client_disconnect_timeout)
.local_addr(addr); .local_addr(addr);
if let Some(size) = cfg.max_buffer_size {
svc = svc.max_buffer_size(size);
};
if let Some(handler) = on_connect_fn.clone() { if let Some(handler) = on_connect_fn.clone() {
svc = svc =
svc.on_connect_ext(move |io: &_, ext: _| (handler)(io as &dyn Any, ext)) svc.on_connect_ext(move |io: &_, ext: _| (handler)(io as &dyn Any, ext))