This commit is contained in:
asonix 2025-09-04 17:58:00 -05:00 committed by GitHub
commit 98c98a4707
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 111 additions and 5 deletions

View File

@ -3,6 +3,7 @@
## Unreleased
- Properly wake Payload receivers when feeding errors or EOF
- Shutdown connections when HTTP Responses are written without reading full Requests
## 3.11.1

View File

@ -386,7 +386,14 @@ where
let mut this = self.project();
this.state.set(match size {
BodySize::None | BodySize::Sized(0) => {
this.flags.insert(Flags::FINISHED);
let payload_unfinished = this.payload.is_some();
if payload_unfinished {
this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
} else {
this.flags.insert(Flags::FINISHED);
}
State::None
}
_ => State::SendPayload { body },
@ -404,7 +411,14 @@ where
let mut this = self.project();
this.state.set(match size {
BodySize::None | BodySize::Sized(0) => {
this.flags.insert(Flags::FINISHED);
let payload_unfinished = this.payload.is_some();
if payload_unfinished {
this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
} else {
this.flags.insert(Flags::FINISHED);
}
State::None
}
_ => State::SendErrorPayload { body },
@ -503,10 +517,22 @@ where
Poll::Ready(None) => {
this.codec.encode(Message::Chunk(None), this.write_buf)?;
// if we have not yet pipelined to the next request, then
// this.payload was the payload for the request we just finished
// responding to. We can check to see if we finished reading it
// yet, and if not, shutdown the connection.
let payload_unfinished = this.payload.is_some();
let not_pipelined = this.messages.is_empty();
// payload stream finished.
// set state to None and handle next message
this.state.set(State::None);
this.flags.insert(Flags::FINISHED);
if not_pipelined && payload_unfinished {
this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
} else {
this.flags.insert(Flags::FINISHED);
}
continue 'res;
}
@ -542,10 +568,22 @@ where
Poll::Ready(None) => {
this.codec.encode(Message::Chunk(None), this.write_buf)?;
// payload stream finished
// if we have not yet pipelined to the next request, then
// this.payload was the payload for the request we just finished
// responding to. We can check to see if we finished reading it
// yet, and if not, shutdown the connection.
let payload_unfinished = this.payload.is_some();
let not_pipelined = this.messages.is_empty();
// payload stream finished.
// set state to None and handle next message
this.state.set(State::None);
this.flags.insert(Flags::FINISHED);
if not_pipelined && payload_unfinished {
this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
} else {
this.flags.insert(Flags::FINISHED);
}
continue 'res;
}

View File

@ -509,6 +509,73 @@ async fn pipelining_ok_then_ok() {
.await;
}
#[actix_rt::test]
async fn early_response_with_payload_closes_connection() {
lazy(|cx| {
let buf = TestBuffer::new(
"\
GET /unfinished HTTP/1.1\r\n\
Content-Length: 2\r\n\
\r\n\
",
);
let cfg = ServiceConfig::new(
KeepAlive::Os,
Duration::from_millis(1),
Duration::from_millis(1),
false,
None,
);
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
buf.clone(),
services,
cfg,
None,
OnConnectData::default(),
);
pin!(h1);
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
match h1.as_mut().poll(cx) {
Poll::Pending => panic!("Should have shut down"),
Poll::Ready(res) => assert!(res.is_ok()),
}
// polls: initial => shutdown
assert_eq!(h1.poll_count, 2);
{
let mut res = buf.write_buf_slice_mut();
stabilize_date_header(&mut res);
let res = &res[..];
let exp = b"\
HTTP/1.1 200 OK\r\n\
content-length: 11\r\n\
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
/unfinished\
";
assert_eq!(
res,
exp,
"\nexpected response not in write buffer:\n\
response: {:?}\n\
expected: {:?}",
String::from_utf8_lossy(res),
String::from_utf8_lossy(exp)
);
}
})
.await;
}
#[actix_rt::test]
async fn pipelining_ok_then_bad() {
lazy(|cx| {