fix: force feed eof when there's a hanging body

This commit is contained in:
Simon Hornby 2025-08-05 12:22:29 +02:00 committed by Yuki Okushi
parent e3ae20de30
commit d3f8f7c854
1 changed files with 6 additions and 14 deletions

View File

@ -20,18 +20,18 @@ use tokio_util::codec::{Decoder as _, Encoder as _};
use tracing::{error, trace};
use super::{
Message, MessageType,
codec::Codec,
decoder::MAX_BUFFER_SIZE,
payload::{Payload, PayloadSender, PayloadStatus},
timer::TimerState,
Message, MessageType,
};
use crate::{
Error, Extensions, OnConnectData, Request, Response, StatusCode,
body::{BodySize, BoxBody, MessageBody},
config::ServiceConfig,
error::{DispatchError, ParseError, PayloadError},
service::HttpFlow,
Error, Extensions, OnConnectData, Request, Response, StatusCode,
};
const LW_BUFFER_SIZE: usize = 1024;
@ -236,16 +236,12 @@ enum PollResponse {
impl<T, S, B, X, U> Dispatcher<T, S, B, X, U>
where
T: AsyncRead + AsyncWrite + Unpin,
S: Service<Request>,
S::Error: Into<Response<BoxBody>>,
S::Response: Into<Response<B>>,
B: MessageBody,
X: Service<Request, Response = Request>,
X::Error: Into<Response<BoxBody>>,
U: Service<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
@ -291,16 +287,12 @@ where
impl<T, S, B, X, U> InnerDispatcher<T, S, B, X, U>
where
T: AsyncRead + AsyncWrite + Unpin,
S: Service<Request>,
S::Error: Into<Response<BoxBody>>,
S::Response: Into<Response<B>>,
B: MessageBody,
X: Service<Request, Response = Request>,
X::Error: Into<Response<BoxBody>>,
U: Service<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{
@ -654,6 +646,10 @@ where
// to notify the dispatcher a new state is set and the outer loop
// should be continue.
Poll::Ready(Ok(res)) => {
let this = self.as_mut().project();
if let Some(mut payload) = this.payload.take() {
payload.feed_eof();
}
let (res, body) = res.into().replace_body(());
self.as_mut().send_response(res, body)
}
@ -1036,16 +1032,12 @@ where
impl<T, S, B, X, U> Future for Dispatcher<T, S, B, X, U>
where
T: AsyncRead + AsyncWrite + Unpin,
S: Service<Request>,
S::Error: Into<Response<BoxBody>>,
S::Response: Into<Response<B>>,
B: MessageBody,
X: Service<Request, Response = Request>,
X::Error: Into<Response<BoxBody>>,
U: Service<(Request, Framed<T, Codec>), Response = ()>,
U::Error: fmt::Display,
{