fix(http): drain unread body if chunked and dropped payload (#3936)

fix(http): drain unready body if chunked and dropped payload
This commit is contained in:
Yuki Okushi 2026-02-18 16:46:16 +09:00 committed by GitHub
parent f120479030
commit 11872101a3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 174 additions and 27 deletions

View File

@ -8,11 +8,13 @@
- Fix truncated body ending without error when connection closed abnormally. [#3067] - Fix truncated body ending without error when connection closed abnormally. [#3067]
- Add config/method for `TCP_NODELAY`. [#3918] - Add config/method for `TCP_NODELAY`. [#3918]
- Do not compress 206 Partial Content responses. [#3191] - Do not compress 206 Partial Content responses. [#3191]
- Fix lingering sockets and client stalls when responding early to dropped chunked request payloads. [#2972]
[#3638]: https://github.com/actix/actix-web/issues/3638 [#3638]: https://github.com/actix/actix-web/issues/3638
[#3067]: https://github.com/actix/actix-web/pull/3067 [#3067]: https://github.com/actix/actix-web/pull/3067
[#3918]: https://github.com/actix/actix-web/pull/3918 [#3918]: https://github.com/actix/actix-web/pull/3918
[#3191]: https://github.com/actix/actix-web/issues/3191 [#3191]: https://github.com/actix/actix-web/issues/3191
[#2972]: https://github.com/actix/actix-web/issues/2972
## 3.11.2 ## 3.11.2

View File

@ -31,7 +31,7 @@ use crate::{
config::ServiceConfig, config::ServiceConfig,
error::{DispatchError, ParseError, PayloadError}, error::{DispatchError, ParseError, PayloadError},
service::HttpFlow, service::HttpFlow,
Error, Extensions, OnConnectData, Request, Response, StatusCode, Error, Extensions, HttpMessage, OnConnectData, Request, Response, StatusCode,
}; };
const LW_BUFFER_SIZE: usize = 1024; const LW_BUFFER_SIZE: usize = 1024;
@ -157,6 +157,8 @@ pin_project! {
pub(super) state: State<S, B, X>, pub(super) state: State<S, B, X>,
// when Some(_) dispatcher is in state of receiving request payload // when Some(_) dispatcher is in state of receiving request payload
payload: Option<PayloadSender>, payload: Option<PayloadSender>,
// true when current request uses chunked transfer encoding (drainable when payload is dropped)
payload_drainable: bool,
messages: VecDeque<DispatcherMessage>, messages: VecDeque<DispatcherMessage>,
head_timer: TimerState, head_timer: TimerState,
@ -269,6 +271,7 @@ where
state: State::None, state: State::None,
payload: None, payload: None,
payload_drainable: false,
messages: VecDeque::new(), messages: VecDeque::new(),
head_timer: TimerState::new(config.client_request_deadline().is_some()), head_timer: TimerState::new(config.client_request_deadline().is_some()),
@ -308,7 +311,10 @@ where
if self.flags.contains(Flags::READ_DISCONNECT) { if self.flags.contains(Flags::READ_DISCONNECT) {
false false
} else if let Some(ref info) = self.payload { } else if let Some(ref info) = self.payload {
info.need_read(cx) == PayloadStatus::Read matches!(
info.need_read(cx),
PayloadStatus::Read | PayloadStatus::Dropped
)
} else { } else {
true true
} }
@ -387,8 +393,10 @@ where
this.state.set(match size { this.state.set(match size {
BodySize::None | BodySize::Sized(0) => { BodySize::None | BodySize::Sized(0) => {
let payload_unfinished = this.payload.is_some(); let payload_unfinished = this.payload.is_some();
let drain_payload = this.payload.as_ref().is_some_and(|pl| pl.is_dropped())
&& *this.payload_drainable;
if payload_unfinished { if payload_unfinished && !drain_payload {
this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED); this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
} else { } else {
this.flags.insert(Flags::FINISHED); this.flags.insert(Flags::FINISHED);
@ -412,8 +420,10 @@ where
this.state.set(match size { this.state.set(match size {
BodySize::None | BodySize::Sized(0) => { BodySize::None | BodySize::Sized(0) => {
let payload_unfinished = this.payload.is_some(); let payload_unfinished = this.payload.is_some();
let drain_payload = this.payload.as_ref().is_some_and(|pl| pl.is_dropped())
&& *this.payload_drainable;
if payload_unfinished { if payload_unfinished && !drain_payload {
this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED); this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
} else { } else {
this.flags.insert(Flags::FINISHED); this.flags.insert(Flags::FINISHED);
@ -469,8 +479,11 @@ where
// all messages are dealt with // all messages are dealt with
None => { None => {
// start keep-alive if last request allowed it // start keep-alive only if request payload is fully read/drained
this.flags.set(Flags::KEEP_ALIVE, this.codec.keep_alive()); this.flags.set(
Flags::KEEP_ALIVE,
this.payload.is_none() && this.codec.keep_alive(),
);
return Ok(PollResponse::DoNothing); return Ok(PollResponse::DoNothing);
} }
@ -522,13 +535,16 @@ where
// responding to. We can check to see if we finished reading it // responding to. We can check to see if we finished reading it
// yet, and if not, shutdown the connection. // yet, and if not, shutdown the connection.
let payload_unfinished = this.payload.is_some(); let payload_unfinished = this.payload.is_some();
let drain_payload =
this.payload.as_ref().is_some_and(|pl| pl.is_dropped())
&& *this.payload_drainable;
let not_pipelined = this.messages.is_empty(); let not_pipelined = this.messages.is_empty();
// payload stream finished. // payload stream finished.
// set state to None and handle next message // set state to None and handle next message
this.state.set(State::None); this.state.set(State::None);
if not_pipelined && payload_unfinished { if not_pipelined && payload_unfinished && !drain_payload {
this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED); this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
} else { } else {
this.flags.insert(Flags::FINISHED); this.flags.insert(Flags::FINISHED);
@ -573,13 +589,16 @@ where
// responding to. We can check to see if we finished reading it // responding to. We can check to see if we finished reading it
// yet, and if not, shutdown the connection. // yet, and if not, shutdown the connection.
let payload_unfinished = this.payload.is_some(); let payload_unfinished = this.payload.is_some();
let drain_payload =
this.payload.as_ref().is_some_and(|pl| pl.is_dropped())
&& *this.payload_drainable;
let not_pipelined = this.messages.is_empty(); let not_pipelined = this.messages.is_empty();
// payload stream finished. // payload stream finished.
// set state to None and handle next message // set state to None and handle next message
this.state.set(State::None); this.state.set(State::None);
if not_pipelined && payload_unfinished { if not_pipelined && payload_unfinished && !drain_payload {
this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED); this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
} else { } else {
this.flags.insert(Flags::FINISHED); this.flags.insert(Flags::FINISHED);
@ -748,12 +767,13 @@ where
match this.codec.message_type() { match this.codec.message_type() {
// request has no payload // request has no payload
MessageType::None => {} MessageType::None => *this.payload_drainable = false,
// Request is upgradable. Add upgrade message and break. // Request is upgradable. Add upgrade message and break.
// Everything remaining in read buffer will be handed to // Everything remaining in read buffer will be handed to
// upgraded Request. // upgraded Request.
MessageType::Stream if this.flow.upgrade.is_some() => { MessageType::Stream if this.flow.upgrade.is_some() => {
*this.payload_drainable = false;
this.messages.push_back(DispatcherMessage::Upgrade(req)); this.messages.push_back(DispatcherMessage::Upgrade(req));
break; break;
} }
@ -768,6 +788,7 @@ where
let (sender, payload) = Payload::create(false); let (sender, payload) = Payload::create(false);
*req.payload() = crate::Payload::H1 { payload }; *req.payload() = crate::Payload::H1 { payload };
*this.payload = Some(sender); *this.payload = Some(sender);
*this.payload_drainable = req.chunked().unwrap_or(false);
} }
} }
@ -797,6 +818,7 @@ where
Message::Chunk(None) => { Message::Chunk(None) => {
if let Some(mut payload) = this.payload.take() { if let Some(mut payload) = this.payload.take() {
payload.feed_eof(); payload.feed_eof();
*this.payload_drainable = false;
} else { } else {
error!("Internal server error: unexpected eof"); error!("Internal server error: unexpected eof");
this.flags.insert(Flags::READ_DISCONNECT); this.flags.insert(Flags::READ_DISCONNECT);
@ -999,23 +1021,14 @@ where
// //
// A Request head too large to parse is only checked on `httparse::Status::Partial`. // A Request head too large to parse is only checked on `httparse::Status::Partial`.
match this.payload { match this.payload.as_ref().map(|p| p.need_read(cx)) {
// When dispatcher has a payload the responsibility of wake ups is shifted to // Payload consumer is alive but applying backpressure. Wait for its waker.
// `h1::payload::Payload` unless the payload is needing a read, in which case it Some(PayloadStatus::Pause) => {}
// might not have access to the waker and could result in the dispatcher
// getting stuck until timeout. // Consumer dropped means drain/discard mode; keep polling to make progress.
// Some(PayloadStatus::Dropped) | Some(PayloadStatus::Read) | None => {
// Reason: cx.waker().wake_by_ref()
// Self wake up when there is payload would waste poll and/or result in }
// over read.
//
// Case:
// When payload is (partial) dropped by user there is no need to do
// read anymore. At this case read_buf could always remain beyond
// MAX_BUFFER_SIZE and self wake up would be busy poll dispatcher and
// waste resources.
Some(ref p) if p.need_read(cx) != PayloadStatus::Read => {}
_ => cx.waker().wake_by_ref(),
} }
return Ok(false); return Ok(false);
@ -1029,7 +1042,11 @@ where
match tokio_util::io::poll_read_buf(io.as_mut(), cx, this.read_buf) { match tokio_util::io::poll_read_buf(io.as_mut(), cx, this.read_buf) {
Poll::Ready(Ok(n)) => { Poll::Ready(Ok(n)) => {
this.flags.remove(Flags::FINISHED); // When draining a dropped request payload, keep FINISHED set so the
// disconnect/keep-alive decision can be made once the payload is fully drained.
if !this.payload.as_ref().is_some_and(|pl| pl.is_dropped()) {
this.flags.remove(Flags::FINISHED);
}
if n == 0 { if n == 0 {
return Ok(true); return Ok(true);
@ -1244,6 +1261,7 @@ where
// disconnect if keep-alive is not enabled // disconnect if keep-alive is not enabled
if inner_p.flags.contains(Flags::FINISHED) if inner_p.flags.contains(Flags::FINISHED)
&& !inner_p.flags.contains(Flags::KEEP_ALIVE) && !inner_p.flags.contains(Flags::KEEP_ALIVE)
&& inner_p.payload.is_none()
{ {
inner_p.flags.remove(Flags::FINISHED); inner_p.flags.remove(Flags::FINISHED);
inner_p.flags.insert(Flags::SHUTDOWN); inner_p.flags.insert(Flags::SHUTDOWN);

View File

@ -1017,6 +1017,128 @@ async fn handler_drop_payload() {
.await; .await;
} }
// Handler drops request payload without reading it. Server should keep reading and discarding the
// rest of the request body so clients that do not read the response until they've finished
// writing the request (like `requests` in Python) do not deadlock.
// ref. https://github.com/actix/actix-web/issues/2972
#[actix_rt::test]
async fn handler_drop_payload_drains_body() {
let _ = env_logger::try_init();
let mut buf = TestSeqBuffer::new(http_msg(
r"
POST /drop-payload HTTP/1.1
Transfer-Encoding: chunked
",
));
let services = HttpFlow::new(
drop_payload_service(),
ExpectHandler,
None::<UpgradeHandler>,
);
let h1 = Dispatcher::new(
buf.clone(),
services,
ServiceConfig::default(),
None,
OnConnectData::default(),
);
pin!(h1);
lazy(|cx| {
assert!(h1.as_mut().poll(cx).is_pending());
let mut res = BytesMut::from(buf.take_write_buf().as_ref());
stabilize_date_header(&mut res);
let res = &res[..];
let exp = http_msg(
r"
HTTP/1.1 200 OK
content-length: 15
date: Thu, 01 Jan 1970 12:34:56 UTC
payload dropped
",
);
assert_eq!(
res,
exp,
"\nexpected response not in write buffer:\n\
response: {:?}\n\
expected: {:?}",
String::from_utf8_lossy(res),
String::from_utf8_lossy(&exp)
);
})
.await;
// stream a body larger than the dispatcher read buffer limit; it should still be drained
// (read + decoded + discarded) without stalling.
for _ in 0..32 {
let data = vec![b'a'; 8192];
let mut chunk = BytesMut::new();
chunk.extend_from_slice(format!("{:x}\r\n", data.len()).as_bytes());
chunk.extend_from_slice(&data);
chunk.extend_from_slice(b"\r\n");
buf.extend_read_buf(chunk);
lazy(|cx| {
assert!(h1.as_mut().poll(cx).is_pending());
assert!(buf.take_write_buf().is_empty());
assert!(buf.read_buf().is_empty());
})
.await;
}
// terminating chunk
buf.extend_read_buf(b"0\r\n\r\n");
lazy(|cx| {
assert!(h1.as_mut().poll(cx).is_pending());
assert!(buf.take_write_buf().is_empty());
assert!(buf.read_buf().is_empty());
})
.await;
// connection should be able to accept another request after draining the previous body
buf.extend_read_buf(http_msg("GET /drop-payload HTTP/1.1"));
lazy(|cx| {
assert!(h1.as_mut().poll(cx).is_pending());
let mut res = BytesMut::from(buf.take_write_buf().as_ref());
stabilize_date_header(&mut res);
let res = &res[..];
let exp = http_msg(
r"
HTTP/1.1 200 OK
content-length: 15
date: Thu, 01 Jan 1970 12:34:56 UTC
payload dropped
",
);
assert_eq!(
res,
exp,
"\nexpected response not in write buffer:\n\
response: {:?}\n\
expected: {:?}",
String::from_utf8_lossy(res),
String::from_utf8_lossy(&exp)
);
})
.await;
}
#[actix_rt::test] #[actix_rt::test]
async fn allow_half_closed() { async fn allow_half_closed() {
let buf = TestSeqBuffer::new(http_msg("GET / HTTP/1.1")); let buf = TestSeqBuffer::new(http_msg("GET / HTTP/1.1"));

View File

@ -133,6 +133,11 @@ impl PayloadSender {
PayloadStatus::Dropped PayloadStatus::Dropped
} }
} }
#[inline]
pub fn is_dropped(&self) -> bool {
self.inner.strong_count() == 0
}
} }
#[derive(Debug)] #[derive(Debug)]