From 11872101a3f30db825a03db0dec07e839d01eb2b Mon Sep 17 00:00:00 2001 From: Yuki Okushi Date: Wed, 18 Feb 2026 16:46:16 +0900 Subject: [PATCH] fix(http): drain unread body if chunked and dropped payload (#3936) fix(http): drain unready body if chunked and dropped payload --- actix-http/CHANGES.md | 2 + actix-http/src/h1/dispatcher.rs | 72 +++++++++------ actix-http/src/h1/dispatcher_tests.rs | 122 ++++++++++++++++++++++++++ actix-http/src/h1/payload.rs | 5 ++ 4 files changed, 174 insertions(+), 27 deletions(-) diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index c8a2bbd92..25f5901df 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -8,11 +8,13 @@ - Fix truncated body ending without error when connection closed abnormally. [#3067] - Add config/method for `TCP_NODELAY`. [#3918] - Do not compress 206 Partial Content responses. [#3191] +- Fix lingering sockets and client stalls when responding early to dropped chunked request payloads. [#2972] [#3638]: https://github.com/actix/actix-web/issues/3638 [#3067]: https://github.com/actix/actix-web/pull/3067 [#3918]: https://github.com/actix/actix-web/pull/3918 [#3191]: https://github.com/actix/actix-web/issues/3191 +[#2972]: https://github.com/actix/actix-web/issues/2972 ## 3.11.2 diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index c59be2d50..2ed78cfca 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -31,7 +31,7 @@ use crate::{ config::ServiceConfig, error::{DispatchError, ParseError, PayloadError}, service::HttpFlow, - Error, Extensions, OnConnectData, Request, Response, StatusCode, + Error, Extensions, HttpMessage, OnConnectData, Request, Response, StatusCode, }; const LW_BUFFER_SIZE: usize = 1024; @@ -157,6 +157,8 @@ pin_project! { pub(super) state: State, // when Some(_) dispatcher is in state of receiving request payload payload: Option, + // true when current request uses chunked transfer encoding (drainable when payload is dropped) + payload_drainable: bool, messages: VecDeque, head_timer: TimerState, @@ -269,6 +271,7 @@ where state: State::None, payload: None, + payload_drainable: false, messages: VecDeque::new(), head_timer: TimerState::new(config.client_request_deadline().is_some()), @@ -308,7 +311,10 @@ where if self.flags.contains(Flags::READ_DISCONNECT) { false } else if let Some(ref info) = self.payload { - info.need_read(cx) == PayloadStatus::Read + matches!( + info.need_read(cx), + PayloadStatus::Read | PayloadStatus::Dropped + ) } else { true } @@ -387,8 +393,10 @@ where this.state.set(match size { BodySize::None | BodySize::Sized(0) => { let payload_unfinished = this.payload.is_some(); + let drain_payload = this.payload.as_ref().is_some_and(|pl| pl.is_dropped()) + && *this.payload_drainable; - if payload_unfinished { + if payload_unfinished && !drain_payload { this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED); } else { this.flags.insert(Flags::FINISHED); @@ -412,8 +420,10 @@ where this.state.set(match size { BodySize::None | BodySize::Sized(0) => { let payload_unfinished = this.payload.is_some(); + let drain_payload = this.payload.as_ref().is_some_and(|pl| pl.is_dropped()) + && *this.payload_drainable; - if payload_unfinished { + if payload_unfinished && !drain_payload { this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED); } else { this.flags.insert(Flags::FINISHED); @@ -469,8 +479,11 @@ where // all messages are dealt with None => { - // start keep-alive if last request allowed it - this.flags.set(Flags::KEEP_ALIVE, this.codec.keep_alive()); + // start keep-alive only if request payload is fully read/drained + this.flags.set( + Flags::KEEP_ALIVE, + this.payload.is_none() && this.codec.keep_alive(), + ); return Ok(PollResponse::DoNothing); } @@ -522,13 +535,16 @@ where // responding to. We can check to see if we finished reading it // yet, and if not, shutdown the connection. let payload_unfinished = this.payload.is_some(); + let drain_payload = + this.payload.as_ref().is_some_and(|pl| pl.is_dropped()) + && *this.payload_drainable; let not_pipelined = this.messages.is_empty(); // payload stream finished. // set state to None and handle next message this.state.set(State::None); - if not_pipelined && payload_unfinished { + if not_pipelined && payload_unfinished && !drain_payload { this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED); } else { this.flags.insert(Flags::FINISHED); @@ -573,13 +589,16 @@ where // responding to. We can check to see if we finished reading it // yet, and if not, shutdown the connection. let payload_unfinished = this.payload.is_some(); + let drain_payload = + this.payload.as_ref().is_some_and(|pl| pl.is_dropped()) + && *this.payload_drainable; let not_pipelined = this.messages.is_empty(); // payload stream finished. // set state to None and handle next message this.state.set(State::None); - if not_pipelined && payload_unfinished { + if not_pipelined && payload_unfinished && !drain_payload { this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED); } else { this.flags.insert(Flags::FINISHED); @@ -748,12 +767,13 @@ where match this.codec.message_type() { // request has no payload - MessageType::None => {} + MessageType::None => *this.payload_drainable = false, // Request is upgradable. Add upgrade message and break. // Everything remaining in read buffer will be handed to // upgraded Request. MessageType::Stream if this.flow.upgrade.is_some() => { + *this.payload_drainable = false; this.messages.push_back(DispatcherMessage::Upgrade(req)); break; } @@ -768,6 +788,7 @@ where let (sender, payload) = Payload::create(false); *req.payload() = crate::Payload::H1 { payload }; *this.payload = Some(sender); + *this.payload_drainable = req.chunked().unwrap_or(false); } } @@ -797,6 +818,7 @@ where Message::Chunk(None) => { if let Some(mut payload) = this.payload.take() { payload.feed_eof(); + *this.payload_drainable = false; } else { error!("Internal server error: unexpected eof"); this.flags.insert(Flags::READ_DISCONNECT); @@ -999,23 +1021,14 @@ where // // A Request head too large to parse is only checked on `httparse::Status::Partial`. - match this.payload { - // When dispatcher has a payload the responsibility of wake ups is shifted to - // `h1::payload::Payload` unless the payload is needing a read, in which case it - // might not have access to the waker and could result in the dispatcher - // getting stuck until timeout. - // - // Reason: - // Self wake up when there is payload would waste poll and/or result in - // over read. - // - // Case: - // When payload is (partial) dropped by user there is no need to do - // read anymore. At this case read_buf could always remain beyond - // MAX_BUFFER_SIZE and self wake up would be busy poll dispatcher and - // waste resources. - Some(ref p) if p.need_read(cx) != PayloadStatus::Read => {} - _ => cx.waker().wake_by_ref(), + match this.payload.as_ref().map(|p| p.need_read(cx)) { + // Payload consumer is alive but applying backpressure. Wait for its waker. + Some(PayloadStatus::Pause) => {} + + // Consumer dropped means drain/discard mode; keep polling to make progress. + Some(PayloadStatus::Dropped) | Some(PayloadStatus::Read) | None => { + cx.waker().wake_by_ref() + } } return Ok(false); @@ -1029,7 +1042,11 @@ where match tokio_util::io::poll_read_buf(io.as_mut(), cx, this.read_buf) { Poll::Ready(Ok(n)) => { - this.flags.remove(Flags::FINISHED); + // When draining a dropped request payload, keep FINISHED set so the + // disconnect/keep-alive decision can be made once the payload is fully drained. + if !this.payload.as_ref().is_some_and(|pl| pl.is_dropped()) { + this.flags.remove(Flags::FINISHED); + } if n == 0 { return Ok(true); @@ -1244,6 +1261,7 @@ where // disconnect if keep-alive is not enabled if inner_p.flags.contains(Flags::FINISHED) && !inner_p.flags.contains(Flags::KEEP_ALIVE) + && inner_p.payload.is_none() { inner_p.flags.remove(Flags::FINISHED); inner_p.flags.insert(Flags::SHUTDOWN); diff --git a/actix-http/src/h1/dispatcher_tests.rs b/actix-http/src/h1/dispatcher_tests.rs index 49582ad8a..e3a907e5c 100644 --- a/actix-http/src/h1/dispatcher_tests.rs +++ b/actix-http/src/h1/dispatcher_tests.rs @@ -1017,6 +1017,128 @@ async fn handler_drop_payload() { .await; } +// Handler drops request payload without reading it. Server should keep reading and discarding the +// rest of the request body so clients that do not read the response until they've finished +// writing the request (like `requests` in Python) do not deadlock. +// ref. https://github.com/actix/actix-web/issues/2972 +#[actix_rt::test] +async fn handler_drop_payload_drains_body() { + let _ = env_logger::try_init(); + + let mut buf = TestSeqBuffer::new(http_msg( + r" + POST /drop-payload HTTP/1.1 + Transfer-Encoding: chunked + + ", + )); + + let services = HttpFlow::new( + drop_payload_service(), + ExpectHandler, + None::, + ); + + let h1 = Dispatcher::new( + buf.clone(), + services, + ServiceConfig::default(), + None, + OnConnectData::default(), + ); + pin!(h1); + + lazy(|cx| { + assert!(h1.as_mut().poll(cx).is_pending()); + + let mut res = BytesMut::from(buf.take_write_buf().as_ref()); + stabilize_date_header(&mut res); + let res = &res[..]; + + let exp = http_msg( + r" + HTTP/1.1 200 OK + content-length: 15 + date: Thu, 01 Jan 1970 12:34:56 UTC + + payload dropped + ", + ); + + assert_eq!( + res, + exp, + "\nexpected response not in write buffer:\n\ + response: {:?}\n\ + expected: {:?}", + String::from_utf8_lossy(res), + String::from_utf8_lossy(&exp) + ); + }) + .await; + + // stream a body larger than the dispatcher read buffer limit; it should still be drained + // (read + decoded + discarded) without stalling. + for _ in 0..32 { + let data = vec![b'a'; 8192]; + let mut chunk = BytesMut::new(); + chunk.extend_from_slice(format!("{:x}\r\n", data.len()).as_bytes()); + chunk.extend_from_slice(&data); + chunk.extend_from_slice(b"\r\n"); + + buf.extend_read_buf(chunk); + + lazy(|cx| { + assert!(h1.as_mut().poll(cx).is_pending()); + assert!(buf.take_write_buf().is_empty()); + assert!(buf.read_buf().is_empty()); + }) + .await; + } + + // terminating chunk + buf.extend_read_buf(b"0\r\n\r\n"); + + lazy(|cx| { + assert!(h1.as_mut().poll(cx).is_pending()); + assert!(buf.take_write_buf().is_empty()); + assert!(buf.read_buf().is_empty()); + }) + .await; + + // connection should be able to accept another request after draining the previous body + buf.extend_read_buf(http_msg("GET /drop-payload HTTP/1.1")); + + lazy(|cx| { + assert!(h1.as_mut().poll(cx).is_pending()); + + let mut res = BytesMut::from(buf.take_write_buf().as_ref()); + stabilize_date_header(&mut res); + let res = &res[..]; + + let exp = http_msg( + r" + HTTP/1.1 200 OK + content-length: 15 + date: Thu, 01 Jan 1970 12:34:56 UTC + + payload dropped + ", + ); + + assert_eq!( + res, + exp, + "\nexpected response not in write buffer:\n\ + response: {:?}\n\ + expected: {:?}", + String::from_utf8_lossy(res), + String::from_utf8_lossy(&exp) + ); + }) + .await; +} + #[actix_rt::test] async fn allow_half_closed() { let buf = TestSeqBuffer::new(http_msg("GET / HTTP/1.1")); diff --git a/actix-http/src/h1/payload.rs b/actix-http/src/h1/payload.rs index 92875a9db..e12c87806 100644 --- a/actix-http/src/h1/payload.rs +++ b/actix-http/src/h1/payload.rs @@ -133,6 +133,11 @@ impl PayloadSender { PayloadStatus::Dropped } } + + #[inline] + pub fn is_dropped(&self) -> bool { + self.inner.strong_count() == 0 + } } #[derive(Debug)]