diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 718f67922..f203e3f46 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -3,6 +3,7 @@ ## Unreleased - Properly wake Payload receivers when feeding errors or EOF +- Shutdown connections when HTTP Responses are written without reading full Requests ## 3.11.1 diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 3f0b78af4..0abd6226a 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -386,7 +386,14 @@ where let mut this = self.project(); this.state.set(match size { BodySize::None | BodySize::Sized(0) => { - this.flags.insert(Flags::FINISHED); + let payload_unfinished = this.payload.is_some(); + + if payload_unfinished { + this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED); + } else { + this.flags.insert(Flags::FINISHED); + } + State::None } _ => State::SendPayload { body }, @@ -404,7 +411,14 @@ where let mut this = self.project(); this.state.set(match size { BodySize::None | BodySize::Sized(0) => { - this.flags.insert(Flags::FINISHED); + let payload_unfinished = this.payload.is_some(); + + if payload_unfinished { + this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED); + } else { + this.flags.insert(Flags::FINISHED); + } + State::None } _ => State::SendErrorPayload { body }, @@ -503,10 +517,22 @@ where Poll::Ready(None) => { this.codec.encode(Message::Chunk(None), this.write_buf)?; + // if we have not yet pipelined to the next request, then + // this.payload was the payload for the request we just finished + // responding to. We can check to see if we finished reading it + // yet, and if not, shutdown the connection. + let payload_unfinished = this.payload.is_some(); + let not_pipelined = this.messages.is_empty(); + // payload stream finished. // set state to None and handle next message this.state.set(State::None); - this.flags.insert(Flags::FINISHED); + + if not_pipelined && payload_unfinished { + this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED); + } else { + this.flags.insert(Flags::FINISHED); + } continue 'res; } @@ -542,10 +568,22 @@ where Poll::Ready(None) => { this.codec.encode(Message::Chunk(None), this.write_buf)?; - // payload stream finished + // if we have not yet pipelined to the next request, then + // this.payload was the payload for the request we just finished + // responding to. We can check to see if we finished reading it + // yet, and if not, shutdown the connection. + let payload_unfinished = this.payload.is_some(); + let not_pipelined = this.messages.is_empty(); + + // payload stream finished. // set state to None and handle next message this.state.set(State::None); - this.flags.insert(Flags::FINISHED); + + if not_pipelined && payload_unfinished { + this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED); + } else { + this.flags.insert(Flags::FINISHED); + } continue 'res; } diff --git a/actix-http/src/h1/dispatcher_tests.rs b/actix-http/src/h1/dispatcher_tests.rs index 50259e6ce..267b5be70 100644 --- a/actix-http/src/h1/dispatcher_tests.rs +++ b/actix-http/src/h1/dispatcher_tests.rs @@ -509,6 +509,73 @@ async fn pipelining_ok_then_ok() { .await; } +#[actix_rt::test] +async fn early_response_with_payload_closes_connection() { + lazy(|cx| { + let buf = TestBuffer::new( + "\ + GET /unfinished HTTP/1.1\r\n\ + Content-Length: 2\r\n\ + \r\n\ + ", + ); + + let cfg = ServiceConfig::new( + KeepAlive::Os, + Duration::from_millis(1), + Duration::from_millis(1), + false, + None, + ); + + let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); + + let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( + buf.clone(), + services, + cfg, + None, + OnConnectData::default(), + ); + + pin!(h1); + + assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); + + match h1.as_mut().poll(cx) { + Poll::Pending => panic!("Should have shut down"), + Poll::Ready(res) => assert!(res.is_ok()), + } + + // polls: initial => shutdown + assert_eq!(h1.poll_count, 2); + + { + let mut res = buf.write_buf_slice_mut(); + stabilize_date_header(&mut res); + let res = &res[..]; + + let exp = b"\ + HTTP/1.1 200 OK\r\n\ + content-length: 11\r\n\ + date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\ + /unfinished\ + "; + + assert_eq!( + res, + exp, + "\nexpected response not in write buffer:\n\ + response: {:?}\n\ + expected: {:?}", + String::from_utf8_lossy(res), + String::from_utf8_lossy(exp) + ); + } + }) + .await; +} + #[actix_rt::test] async fn pipelining_ok_then_bad() { lazy(|cx| {