From d3f8f7c854887189d6b1a4b8d2501eb0f6c0515a Mon Sep 17 00:00:00 2001 From: Simon Hornby Date: Tue, 5 Aug 2025 12:22:29 +0200 Subject: [PATCH 1/3] fix: force feed eof when there's a hanging body --- actix-http/src/h1/dispatcher.rs | 20 ++++++-------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 00b51360e..88f3bb361 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -20,18 +20,18 @@ use tokio_util::codec::{Decoder as _, Encoder as _}; use tracing::{error, trace}; use super::{ + Message, MessageType, codec::Codec, decoder::MAX_BUFFER_SIZE, payload::{Payload, PayloadSender, PayloadStatus}, timer::TimerState, - Message, MessageType, }; use crate::{ + Error, Extensions, OnConnectData, Request, Response, StatusCode, body::{BodySize, BoxBody, MessageBody}, config::ServiceConfig, error::{DispatchError, ParseError, PayloadError}, service::HttpFlow, - Error, Extensions, OnConnectData, Request, Response, StatusCode, }; const LW_BUFFER_SIZE: usize = 1024; @@ -236,16 +236,12 @@ enum PollResponse { impl Dispatcher where T: AsyncRead + AsyncWrite + Unpin, - S: Service, S::Error: Into>, S::Response: Into>, - B: MessageBody, - X: Service, X::Error: Into>, - U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { @@ -291,16 +287,12 @@ where impl InnerDispatcher where T: AsyncRead + AsyncWrite + Unpin, - S: Service, S::Error: Into>, S::Response: Into>, - B: MessageBody, - X: Service, X::Error: Into>, - U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { @@ -654,6 +646,10 @@ where // to notify the dispatcher a new state is set and the outer loop // should be continue. Poll::Ready(Ok(res)) => { + let this = self.as_mut().project(); + if let Some(mut payload) = this.payload.take() { + payload.feed_eof(); + } let (res, body) = res.into().replace_body(()); self.as_mut().send_response(res, body) } @@ -1036,16 +1032,12 @@ where impl Future for Dispatcher where T: AsyncRead + AsyncWrite + Unpin, - S: Service, S::Error: Into>, S::Response: Into>, - B: MessageBody, - X: Service, X::Error: Into>, - U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { From a816f617d1deece4f323c73ea4a12a51fd3f736b Mon Sep 17 00:00:00 2001 From: Simon Hornby Date: Tue, 5 Aug 2025 13:45:45 +0200 Subject: [PATCH 2/3] fix: finalize hanging payloads --- actix-http/src/h1/dispatcher.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 88f3bb361..86e4d2c76 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -296,6 +296,14 @@ where U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { + fn finalize_payload_if_present(mut self: Pin<&mut Self>, reason: &str) { + let this = self.as_mut().project(); + if let Some(mut payload) = this.payload.take() { + trace!("Finalizing payload early: {reason}"); + payload.feed_eof(); + } + } + fn can_read(&self, cx: &mut Context<'_>) -> bool { if self.flags.contains(Flags::READ_DISCONNECT) { false @@ -682,6 +690,11 @@ where // limit amount of non-processed requests if pipeline_queue_full || can_not_read { + // since we're here, it's possible the client has been sent a response before we've been able to read the body + // in this case, we should eof the payload to prevent the next request from reading invalid data + // this can occur with certain load balancers that pipeline requests + self.as_mut() + .finalize_payload_if_present("pipeline queue full or cannot read"); return Ok(false); } From 9b6818694699255697f5c5f80f3ea612b76da2f1 Mon Sep 17 00:00:00 2001 From: Simon Hornby Date: Tue, 5 Aug 2025 13:47:23 +0200 Subject: [PATCH 3/3] chore: reduce down to minimal fix --- actix-http/src/h1/dispatcher.rs | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 86e4d2c76..ed8fcc91c 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -20,18 +20,18 @@ use tokio_util::codec::{Decoder as _, Encoder as _}; use tracing::{error, trace}; use super::{ - Message, MessageType, codec::Codec, decoder::MAX_BUFFER_SIZE, payload::{Payload, PayloadSender, PayloadStatus}, timer::TimerState, + Message, MessageType, }; use crate::{ - Error, Extensions, OnConnectData, Request, Response, StatusCode, body::{BodySize, BoxBody, MessageBody}, config::ServiceConfig, error::{DispatchError, ParseError, PayloadError}, service::HttpFlow, + Error, Extensions, OnConnectData, Request, Response, StatusCode, }; const LW_BUFFER_SIZE: usize = 1024; @@ -236,12 +236,16 @@ enum PollResponse { impl Dispatcher where T: AsyncRead + AsyncWrite + Unpin, + S: Service, S::Error: Into>, S::Response: Into>, + B: MessageBody, + X: Service, X::Error: Into>, + U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { @@ -287,12 +291,16 @@ where impl InnerDispatcher where T: AsyncRead + AsyncWrite + Unpin, + S: Service, S::Error: Into>, S::Response: Into>, + B: MessageBody, + X: Service, X::Error: Into>, + U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { @@ -654,10 +662,6 @@ where // to notify the dispatcher a new state is set and the outer loop // should be continue. Poll::Ready(Ok(res)) => { - let this = self.as_mut().project(); - if let Some(mut payload) = this.payload.take() { - payload.feed_eof(); - } let (res, body) = res.into().replace_body(()); self.as_mut().send_response(res, body) } @@ -1045,12 +1049,16 @@ where impl Future for Dispatcher where T: AsyncRead + AsyncWrite + Unpin, + S: Service, S::Error: Into>, S::Response: Into>, + B: MessageBody, + X: Service, X::Error: Into>, + U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, {