Compare commits

...

4 Commits

Author SHA1 Message Date
asonix e972ead606
Merge af30b7e679 into 5041cd1c65 2025-08-29 13:43:51 +09:00
George Pollard 5041cd1c65
Make 'ws' feature of actix-http optional in actix-web (#3734)
* Make 'ws' feature of actix-http optional

* Update CHANGES.md

* Update actix-web-actors

* Update CHANGES.md

* nits

* nits

---------

Co-authored-by: Rob Ede <robjtede@icloud.com>
2025-08-29 02:50:05 +00:00
Thales d3c46537b3
fix(http): Wake Payload when feeding error or EOF (#3749)
* fix(http): Add failing tests to demonstrate the payload problem

Signed-off-by: Thales Fragoso <thales.fragoso@axiros.com>

* fix(http): Wake Payload when feeding error or eof

Signed-off-by: Thales Fragoso <thales.fragoso@axiros.com>

---------

Signed-off-by: Thales Fragoso <thales.fragoso@axiros.com>
2025-08-29 02:47:03 +00:00
asonix af30b7e679 actix-http: h1: stop pipelining when not reading full requests
The existing pipelining behavior of the h1 dispatcher can cause client timeouts
if the entire request body isn't read. It puts the dispatcher into a state where
it refuses to read more (payload dropped) but there are still bytes in the buffer
from the request body.

This solution adds the SHUTDOWN flag in addition to the FINISHED flag
when completing a response when both the following are true:

1. There are no messages in `this.messages`
2. There is still a payload in `this.payload`

This combination implies two things. First, that we have not parsed a
pipelined request after the request we have just responded to. Second,
that the current request payload has not been fed an EOF. Because there
are no pipelined requests, we know that the current request payload
belongs to the request we have just responded to, and because the
request payload has not been fed an EOF, we know we never finished
reading it.

When this occurs, adding the SHUTDOWN flag to the dispatcher triggers a
`flush` and a `poll_shutdown` on the IO resource on the next poll.
2025-08-07 18:01:06 -05:00
8 changed files with 135 additions and 13 deletions

View File

@ -2,6 +2,8 @@
## Unreleased
- Properly wake Payload receivers when feeding errors or EOF
## 3.11.1
- Prevent more hangs after client disconnects.

View File

@ -156,7 +156,7 @@ serde_json = "1.0"
static_assertions = "1"
tls-openssl = { package = "openssl", version = "0.10.55" }
tls-rustls_023 = { package = "rustls", version = "0.23" }
tokio = { version = "1.38.2", features = ["net", "rt", "macros"] }
tokio = { version = "1.38.2", features = ["net", "rt", "macros", "sync"] }
[lints]
workspace = true

View File

@ -386,7 +386,14 @@ where
let mut this = self.project();
this.state.set(match size {
BodySize::None | BodySize::Sized(0) => {
this.flags.insert(Flags::FINISHED);
let payload_unfinished = this.payload.is_some();
if payload_unfinished {
this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
} else {
this.flags.insert(Flags::FINISHED);
}
State::None
}
_ => State::SendPayload { body },
@ -404,7 +411,14 @@ where
let mut this = self.project();
this.state.set(match size {
BodySize::None | BodySize::Sized(0) => {
this.flags.insert(Flags::FINISHED);
let payload_unfinished = this.payload.is_some();
if payload_unfinished {
this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
} else {
this.flags.insert(Flags::FINISHED);
}
State::None
}
_ => State::SendErrorPayload { body },
@ -503,10 +517,25 @@ where
Poll::Ready(None) => {
this.codec.encode(Message::Chunk(None), this.write_buf)?;
// if we have not yet pipelined to the next request, then
// this.payload was the payload for the request we just finished
// responding to. We can check to see if we finished reading it
// yet, and if not, shutdown the connection.
let payload_unfinished = this.payload.is_some();
let not_pipelined = this.messages.is_empty();
println!("not pipelined: {not_pipelined}");
println!("payload unfinished: {payload_unfinished}");
// payload stream finished.
// set state to None and handle next message
this.state.set(State::None);
this.flags.insert(Flags::FINISHED);
if not_pipelined && payload_unfinished {
this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
} else {
this.flags.insert(Flags::FINISHED);
}
continue 'res;
}
@ -542,10 +571,25 @@ where
Poll::Ready(None) => {
this.codec.encode(Message::Chunk(None), this.write_buf)?;
// payload stream finished
// if we have not yet pipelined to the next request, then
// this.payload was the payload for the request we just finished
// responding to. We can check to see if we finished reading it
// yet, and if not, shutdown the connection.
let payload_unfinished = this.payload.is_some();
let not_pipelined = this.messages.is_empty();
println!("not pipelined: {not_pipelined}");
println!("payload unfinished: {payload_unfinished}");
// payload stream finished.
// set state to None and handle next message
this.state.set(State::None);
this.flags.insert(Flags::FINISHED);
if not_pipelined && payload_unfinished {
this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
} else {
this.flags.insert(Flags::FINISHED);
}
continue 'res;
}

View File

@ -200,11 +200,13 @@ impl Inner {
#[inline]
fn set_error(&mut self, err: PayloadError) {
self.err = Some(err);
self.wake();
}
#[inline]
fn feed_eof(&mut self) {
self.eof = true;
self.wake();
}
#[inline]
@ -253,8 +255,13 @@ impl Inner {
#[cfg(test)]
mod tests {
use std::{task::Poll, time::Duration};
use actix_rt::time::timeout;
use actix_utils::future::poll_fn;
use futures_util::{FutureExt, StreamExt};
use static_assertions::{assert_impl_all, assert_not_impl_any};
use tokio::sync::oneshot;
use super::*;
@ -263,6 +270,67 @@ mod tests {
assert_impl_all!(Inner: Unpin, Send, Sync);
const WAKE_TIMEOUT: Duration = Duration::from_secs(2);
fn prepare_waking_test(
mut payload: Payload,
expected: Option<Result<(), ()>>,
) -> (oneshot::Receiver<()>, actix_rt::task::JoinHandle<()>) {
let (tx, rx) = oneshot::channel();
let handle = actix_rt::spawn(async move {
// Make sure to poll once to set the waker
poll_fn(|cx| {
assert!(payload.poll_next_unpin(cx).is_pending());
Poll::Ready(())
})
.await;
tx.send(()).unwrap();
// actix-rt is single-threaded, so this won't race with `rx.await`
let mut pend_once = false;
poll_fn(|_| {
if pend_once {
Poll::Ready(())
} else {
// Return pending without storing wakers, we already did on the previous
// `poll_fn`, now this task will only continue if the `sender` wakes us
pend_once = true;
Poll::Pending
}
})
.await;
let got = payload.next().now_or_never().unwrap();
match expected {
Some(Ok(_)) => assert!(got.unwrap().is_ok()),
Some(Err(_)) => assert!(got.unwrap().is_err()),
None => assert!(got.is_none()),
}
});
(rx, handle)
}
#[actix_rt::test]
async fn wake_on_error() {
let (mut sender, payload) = Payload::create(false);
let (rx, handle) = prepare_waking_test(payload, Some(Err(())));
rx.await.unwrap();
sender.set_error(PayloadError::Incomplete(None));
timeout(WAKE_TIMEOUT, handle).await.unwrap().unwrap();
}
#[actix_rt::test]
async fn wake_on_eof() {
let (mut sender, payload) = Payload::create(false);
let (rx, handle) = prepare_waking_test(payload, None);
rx.await.unwrap();
sender.feed_eof();
timeout(WAKE_TIMEOUT, handle).await.unwrap().unwrap();
}
#[actix_rt::test]
async fn test_unread_data() {
let (_, mut payload) = Payload::create(false);

View File

@ -24,7 +24,7 @@ allowed_external_types = [
actix = { version = ">=0.12, <0.14", default-features = false }
actix-codec = "0.5"
actix-http = "3"
actix-web = { version = "4", default-features = false }
actix-web = { version = "4", default-features = false, features = ["ws"] }
bytes = "1"
bytestring = "1"

View File

@ -4,6 +4,7 @@
- `actix_web::response::builder::HttpResponseBuilder::streaming()` now sets `Content-Type` to `application/octet-stream` if `Content-Type` does not exist.
- `actix_web::response::builder::HttpResponseBuilder::streaming()` now calls `actix_web::response::builder::HttpResponseBuilder::no_chunking()` if `Content-Length` is set by user.
- Add `ws` crate feature (on-by-default) which forwards to `actix-http` and guards some of its `ResponseError` impls.
## 4.11.0

View File

@ -67,6 +67,7 @@ default = [
"http2",
"unicode",
"compat",
"ws",
]
# Brotli algorithm content-encoding support
@ -85,9 +86,12 @@ cookies = ["dep:cookie"]
# Secure & signed cookies
secure-cookies = ["cookies", "cookie/secure"]
# HTTP/2 support (including h2c).
# HTTP/2 support (including h2c)
http2 = ["actix-http/http2"]
# WebSocket support
ws = ["actix-http/ws"]
# TLS via OpenSSL
openssl = ["__tls", "http2", "actix-http/openssl", "actix-tls/accept", "actix-tls/openssl"]
@ -131,7 +135,7 @@ actix-service = "2"
actix-tls = { version = "3.4", default-features = false, optional = true }
actix-utils = "3"
actix-http = { version = "3.11", features = ["ws"] }
actix-http = "3.11"
actix-router = { version = "0.5.3", default-features = false, features = ["http"] }
actix-web-codegen = { version = "4.3", optional = true, default-features = false }

View File

@ -7,7 +7,6 @@ use std::{
io::{self, Write as _},
};
use actix_http::Response;
use bytes::BytesMut;
use crate::{
@ -126,20 +125,24 @@ impl ResponseError for actix_http::error::PayloadError {
}
}
impl ResponseError for actix_http::ws::ProtocolError {}
impl ResponseError for actix_http::error::ContentTypeError {
fn status_code(&self) -> StatusCode {
StatusCode::BAD_REQUEST
}
}
#[cfg(feature = "ws")]
impl ResponseError for actix_http::ws::HandshakeError {
fn error_response(&self) -> HttpResponse<BoxBody> {
Response::from(self).map_into_boxed_body().into()
actix_http::Response::from(self)
.map_into_boxed_body()
.into()
}
}
#[cfg(feature = "ws")]
impl ResponseError for actix_http::ws::ProtocolError {}
#[cfg(test)]
mod tests {
use super::*;