From af30b7e679b58c1d6aacc380d234636ea8760112 Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 7 Aug 2025 17:56:44 -0500 Subject: [PATCH 1/4] actix-http: h1: stop pipelining when not reading full requests The existing pipelining behavior of the h1 dispatcher can cause client timeouts if the entire request body isn't read. It puts the dispatcher into a state where it refuses to read more (payload dropped) but there are still bytes in the buffer from the request body. This solution adds the SHUTDOWN flag in addition to the FINISHED flag when completing a response when both the following are true: 1. There are no messages in `this.messages` 2. There is still a payload in `this.payload` This combination implies two things. First, that we have not parsed a pipelined request after the request we have just responded to. Second, that the current request payload has not been fed an EOF. Because there are no pipelined requests, we know that the current request payload belongs to the request we have just responded to, and because the request payload has not been fed an EOF, we know we never finished reading it. When this occurs, adding the SHUTDOWN flag to the dispatcher triggers a `flush` and a `poll_shutdown` on the IO resource on the next poll. --- actix-http/src/h1/dispatcher.rs | 54 ++++++++++++++++++++++++++++++--- 1 file changed, 49 insertions(+), 5 deletions(-) diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 00b51360e..adcda10c6 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -386,7 +386,14 @@ where let mut this = self.project(); this.state.set(match size { BodySize::None | BodySize::Sized(0) => { - this.flags.insert(Flags::FINISHED); + let payload_unfinished = this.payload.is_some(); + + if payload_unfinished { + this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED); + } else { + this.flags.insert(Flags::FINISHED); + } + State::None } _ => State::SendPayload { body }, @@ -404,7 +411,14 @@ where let mut this = self.project(); this.state.set(match size { BodySize::None | BodySize::Sized(0) => { - this.flags.insert(Flags::FINISHED); + let payload_unfinished = this.payload.is_some(); + + if payload_unfinished { + this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED); + } else { + this.flags.insert(Flags::FINISHED); + } + State::None } _ => State::SendErrorPayload { body }, @@ -503,10 +517,25 @@ where Poll::Ready(None) => { this.codec.encode(Message::Chunk(None), this.write_buf)?; + // if we have not yet pipelined to the next request, then + // this.payload was the payload for the request we just finished + // responding to. We can check to see if we finished reading it + // yet, and if not, shutdown the connection. + let payload_unfinished = this.payload.is_some(); + let not_pipelined = this.messages.is_empty(); + + println!("not pipelined: {not_pipelined}"); + println!("payload unfinished: {payload_unfinished}"); + // payload stream finished. // set state to None and handle next message this.state.set(State::None); - this.flags.insert(Flags::FINISHED); + + if not_pipelined && payload_unfinished { + this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED); + } else { + this.flags.insert(Flags::FINISHED); + } continue 'res; } @@ -542,10 +571,25 @@ where Poll::Ready(None) => { this.codec.encode(Message::Chunk(None), this.write_buf)?; - // payload stream finished + // if we have not yet pipelined to the next request, then + // this.payload was the payload for the request we just finished + // responding to. We can check to see if we finished reading it + // yet, and if not, shutdown the connection. + let payload_unfinished = this.payload.is_some(); + let not_pipelined = this.messages.is_empty(); + + println!("not pipelined: {not_pipelined}"); + println!("payload unfinished: {payload_unfinished}"); + + // payload stream finished. // set state to None and handle next message this.state.set(State::None); - this.flags.insert(Flags::FINISHED); + + if not_pipelined && payload_unfinished { + this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED); + } else { + this.flags.insert(Flags::FINISHED); + } continue 'res; } From 016e923138c3fc39aa7f670ef2c12d58552277fb Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 4 Sep 2025 17:51:09 -0500 Subject: [PATCH 2/4] Remove printlns from dispatcher --- actix-http/src/h1/dispatcher.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index b3f8762c6..0abd6226a 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -524,9 +524,6 @@ where let payload_unfinished = this.payload.is_some(); let not_pipelined = this.messages.is_empty(); - println!("not pipelined: {not_pipelined}"); - println!("payload unfinished: {payload_unfinished}"); - // payload stream finished. // set state to None and handle next message this.state.set(State::None); @@ -578,9 +575,6 @@ where let payload_unfinished = this.payload.is_some(); let not_pipelined = this.messages.is_empty(); - println!("not pipelined: {not_pipelined}"); - println!("payload unfinished: {payload_unfinished}"); - // payload stream finished. // set state to None and handle next message this.state.set(State::None); From 965bb5c2033d9c44090f9e0b42e82e399f63ceb6 Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 4 Sep 2025 17:51:21 -0500 Subject: [PATCH 3/4] Add test that fails without changes & passes with changes --- actix-http/src/h1/dispatcher_tests.rs | 67 +++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/actix-http/src/h1/dispatcher_tests.rs b/actix-http/src/h1/dispatcher_tests.rs index 50259e6ce..267b5be70 100644 --- a/actix-http/src/h1/dispatcher_tests.rs +++ b/actix-http/src/h1/dispatcher_tests.rs @@ -509,6 +509,73 @@ async fn pipelining_ok_then_ok() { .await; } +#[actix_rt::test] +async fn early_response_with_payload_closes_connection() { + lazy(|cx| { + let buf = TestBuffer::new( + "\ + GET /unfinished HTTP/1.1\r\n\ + Content-Length: 2\r\n\ + \r\n\ + ", + ); + + let cfg = ServiceConfig::new( + KeepAlive::Os, + Duration::from_millis(1), + Duration::from_millis(1), + false, + None, + ); + + let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); + + let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( + buf.clone(), + services, + cfg, + None, + OnConnectData::default(), + ); + + pin!(h1); + + assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); + + match h1.as_mut().poll(cx) { + Poll::Pending => panic!("Should have shut down"), + Poll::Ready(res) => assert!(res.is_ok()), + } + + // polls: initial => shutdown + assert_eq!(h1.poll_count, 2); + + { + let mut res = buf.write_buf_slice_mut(); + stabilize_date_header(&mut res); + let res = &res[..]; + + let exp = b"\ + HTTP/1.1 200 OK\r\n\ + content-length: 11\r\n\ + date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\ + /unfinished\ + "; + + assert_eq!( + res, + exp, + "\nexpected response not in write buffer:\n\ + response: {:?}\n\ + expected: {:?}", + String::from_utf8_lossy(res), + String::from_utf8_lossy(exp) + ); + } + }) + .await; +} + #[actix_rt::test] async fn pipelining_ok_then_bad() { lazy(|cx| { From b9942bcb3dda79dbd0cf81ada19842ab06d15802 Mon Sep 17 00:00:00 2001 From: asonix Date: Thu, 4 Sep 2025 17:57:53 -0500 Subject: [PATCH 4/4] Add changelog entry for h1 shutdown --- actix-http/CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 718f67922..f203e3f46 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -3,6 +3,7 @@ ## Unreleased - Properly wake Payload receivers when feeding errors or EOF +- Shutdown connections when HTTP Responses are written without reading full Requests ## 3.11.1