From d3c46537b34b5de60443797e649d043d9b6c1358 Mon Sep 17 00:00:00 2001 From: Thales Date: Thu, 28 Aug 2025 23:47:03 -0300 Subject: [PATCH] fix(http): Wake Payload when feeding error or EOF (#3749) * fix(http): Add failing tests to demonstrate the payload problem Signed-off-by: Thales Fragoso * fix(http): Wake Payload when feeding error or eof Signed-off-by: Thales Fragoso --------- Signed-off-by: Thales Fragoso --- actix-http/CHANGES.md | 2 ++ actix-http/Cargo.toml | 2 +- actix-http/src/h1/payload.rs | 68 ++++++++++++++++++++++++++++++++++++ 3 files changed, 71 insertions(+), 1 deletion(-) diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 5d59f7e52..718f67922 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -2,6 +2,8 @@ ## Unreleased +- Properly wake Payload receivers when feeding errors or EOF + ## 3.11.1 - Prevent more hangs after client disconnects. diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index 2725e944b..e28e6c400 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -156,7 +156,7 @@ serde_json = "1.0" static_assertions = "1" tls-openssl = { package = "openssl", version = "0.10.55" } tls-rustls_023 = { package = "rustls", version = "0.23" } -tokio = { version = "1.38.2", features = ["net", "rt", "macros"] } +tokio = { version = "1.38.2", features = ["net", "rt", "macros", "sync"] } [lints] workspace = true diff --git a/actix-http/src/h1/payload.rs b/actix-http/src/h1/payload.rs index 2ad3a14a3..d478c677a 100644 --- a/actix-http/src/h1/payload.rs +++ b/actix-http/src/h1/payload.rs @@ -200,11 +200,13 @@ impl Inner { #[inline] fn set_error(&mut self, err: PayloadError) { self.err = Some(err); + self.wake(); } #[inline] fn feed_eof(&mut self) { self.eof = true; + self.wake(); } #[inline] @@ -253,8 +255,13 @@ impl Inner { #[cfg(test)] mod tests { + use std::{task::Poll, time::Duration}; + + use actix_rt::time::timeout; use actix_utils::future::poll_fn; + use futures_util::{FutureExt, StreamExt}; use static_assertions::{assert_impl_all, assert_not_impl_any}; + use tokio::sync::oneshot; use super::*; @@ -263,6 +270,67 @@ mod tests { assert_impl_all!(Inner: Unpin, Send, Sync); + const WAKE_TIMEOUT: Duration = Duration::from_secs(2); + + fn prepare_waking_test( + mut payload: Payload, + expected: Option>, + ) -> (oneshot::Receiver<()>, actix_rt::task::JoinHandle<()>) { + let (tx, rx) = oneshot::channel(); + + let handle = actix_rt::spawn(async move { + // Make sure to poll once to set the waker + poll_fn(|cx| { + assert!(payload.poll_next_unpin(cx).is_pending()); + Poll::Ready(()) + }) + .await; + tx.send(()).unwrap(); + + // actix-rt is single-threaded, so this won't race with `rx.await` + let mut pend_once = false; + poll_fn(|_| { + if pend_once { + Poll::Ready(()) + } else { + // Return pending without storing wakers, we already did on the previous + // `poll_fn`, now this task will only continue if the `sender` wakes us + pend_once = true; + Poll::Pending + } + }) + .await; + + let got = payload.next().now_or_never().unwrap(); + match expected { + Some(Ok(_)) => assert!(got.unwrap().is_ok()), + Some(Err(_)) => assert!(got.unwrap().is_err()), + None => assert!(got.is_none()), + } + }); + (rx, handle) + } + + #[actix_rt::test] + async fn wake_on_error() { + let (mut sender, payload) = Payload::create(false); + let (rx, handle) = prepare_waking_test(payload, Some(Err(()))); + + rx.await.unwrap(); + sender.set_error(PayloadError::Incomplete(None)); + timeout(WAKE_TIMEOUT, handle).await.unwrap().unwrap(); + } + + #[actix_rt::test] + async fn wake_on_eof() { + let (mut sender, payload) = Payload::create(false); + let (rx, handle) = prepare_waking_test(payload, None); + + rx.await.unwrap(); + sender.feed_eof(); + timeout(WAKE_TIMEOUT, handle).await.unwrap().unwrap(); + } + #[actix_rt::test] async fn test_unread_data() { let (_, mut payload) = Payload::create(false);