Compare commits

...

3 Commits

Author SHA1 Message Date
Yuki Okushi 25963dbdf1
Prepare release for actix-http, awc (#3791) 2025-10-05 11:30:27 +09:00
asonix 4df9953c86
actix-http: h1: stop pipelining when not reading full requests (#3721)
* actix-http: h1: stop pipelining when not reading full requests

The existing pipelining behavior of the h1 dispatcher can cause client timeouts
if the entire request body isn't read. It puts the dispatcher into a state where
it refuses to read more (payload dropped) but there are still bytes in the buffer
from the request body.

This solution adds the SHUTDOWN flag in addition to the FINISHED flag
when completing a response when both the following are true:

1. There are no messages in `this.messages`
2. There is still a payload in `this.payload`

This combination implies two things. First, that we have not parsed a
pipelined request after the request we have just responded to. Second,
that the current request payload has not been fed an EOF. Because there
are no pipelined requests, we know that the current request payload
belongs to the request we have just responded to, and because the
request payload has not been fed an EOF, we know we never finished
reading it.

When this occurs, adding the SHUTDOWN flag to the dispatcher triggers a
`flush` and a `poll_shutdown` on the IO resource on the next poll.

* Remove printlns from dispatcher

* Add test that fails without changes & passes with changes

* Add changelog entry for h1 shutdown

---------

Co-authored-by: Rob Ede <robjtede@icloud.com>
Co-authored-by: Yuki Okushi <huyuumi.dev@gmail.com>
2025-10-05 01:07:35 +00:00
Kevin Logan 016841137e
fix(awc): close connections after GO_AWAY local or remote GO_AWAY errors (#3790) 2025-10-05 00:44:02 +00:00
10 changed files with 127 additions and 15 deletions

4
Cargo.lock generated
View File

@ -71,7 +71,7 @@ dependencies = [
[[package]]
name = "actix-http"
version = "3.11.1"
version = "3.11.2"
dependencies = [
"actix-codec",
"actix-http-test",
@ -620,7 +620,7 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
[[package]]
name = "awc"
version = "3.8.0"
version = "3.8.1"
dependencies = [
"actix-codec",
"actix-http",

View File

@ -2,9 +2,12 @@
## Unreleased
## 3.11.2
- Properly wake Payload receivers when feeding errors or EOF.
- Add `ServiceConfigBuilder` type to facilitate future configuration extensions.
- Add a configuration option to allow/disallow half closed connections in HTTP/1. This defaults to allow, reverting the change made in 3.11.1.
- Shutdown connections when HTTP Responses are written without reading full Requests.
## 3.11.1

View File

@ -1,6 +1,6 @@
[package]
name = "actix-http"
version = "3.11.1"
version = "3.11.2"
authors = ["Nikolay Kim <fafhrd91@gmail.com>", "Rob Ede <robjtede@icloud.com>"]
description = "HTTP types and services for the Actix ecosystem"
keywords = ["actix", "http", "framework", "async", "futures"]

View File

@ -5,11 +5,11 @@
<!-- prettier-ignore-start -->
[![crates.io](https://img.shields.io/crates/v/actix-http?label=latest)](https://crates.io/crates/actix-http)
[![Documentation](https://docs.rs/actix-http/badge.svg?version=3.11.1)](https://docs.rs/actix-http/3.11.1)
[![Documentation](https://docs.rs/actix-http/badge.svg?version=3.11.2)](https://docs.rs/actix-http/3.11.2)
![Version](https://img.shields.io/badge/rustc-1.72+-ab6000.svg)
![MIT or Apache 2.0 licensed](https://img.shields.io/crates/l/actix-http.svg)
<br />
[![dependency status](https://deps.rs/crate/actix-http/3.11.1/status.svg)](https://deps.rs/crate/actix-http/3.11.1)
[![dependency status](https://deps.rs/crate/actix-http/3.11.2/status.svg)](https://deps.rs/crate/actix-http/3.11.2)
[![Download](https://img.shields.io/crates/d/actix-http.svg)](https://crates.io/crates/actix-http)
[![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/NWpN5mmg3x)

View File

@ -386,7 +386,14 @@ where
let mut this = self.project();
this.state.set(match size {
BodySize::None | BodySize::Sized(0) => {
this.flags.insert(Flags::FINISHED);
let payload_unfinished = this.payload.is_some();
if payload_unfinished {
this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
} else {
this.flags.insert(Flags::FINISHED);
}
State::None
}
_ => State::SendPayload { body },
@ -404,7 +411,14 @@ where
let mut this = self.project();
this.state.set(match size {
BodySize::None | BodySize::Sized(0) => {
this.flags.insert(Flags::FINISHED);
let payload_unfinished = this.payload.is_some();
if payload_unfinished {
this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
} else {
this.flags.insert(Flags::FINISHED);
}
State::None
}
_ => State::SendErrorPayload { body },
@ -503,10 +517,22 @@ where
Poll::Ready(None) => {
this.codec.encode(Message::Chunk(None), this.write_buf)?;
// if we have not yet pipelined to the next request, then
// this.payload was the payload for the request we just finished
// responding to. We can check to see if we finished reading it
// yet, and if not, shutdown the connection.
let payload_unfinished = this.payload.is_some();
let not_pipelined = this.messages.is_empty();
// payload stream finished.
// set state to None and handle next message
this.state.set(State::None);
this.flags.insert(Flags::FINISHED);
if not_pipelined && payload_unfinished {
this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
} else {
this.flags.insert(Flags::FINISHED);
}
continue 'res;
}
@ -542,10 +568,22 @@ where
Poll::Ready(None) => {
this.codec.encode(Message::Chunk(None), this.write_buf)?;
// payload stream finished
// if we have not yet pipelined to the next request, then
// this.payload was the payload for the request we just finished
// responding to. We can check to see if we finished reading it
// yet, and if not, shutdown the connection.
let payload_unfinished = this.payload.is_some();
let not_pipelined = this.messages.is_empty();
// payload stream finished.
// set state to None and handle next message
this.state.set(State::None);
this.flags.insert(Flags::FINISHED);
if not_pipelined && payload_unfinished {
this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
} else {
this.flags.insert(Flags::FINISHED);
}
continue 'res;
}

View File

@ -535,6 +535,73 @@ async fn pipelining_ok_then_ok() {
.await;
}
#[actix_rt::test]
async fn early_response_with_payload_closes_connection() {
lazy(|cx| {
let buf = TestBuffer::new(
"\
GET /unfinished HTTP/1.1\r\n\
Content-Length: 2\r\n\
\r\n\
",
);
let cfg = ServiceConfig::new(
KeepAlive::Os,
Duration::from_millis(1),
Duration::from_millis(1),
false,
None,
);
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
buf.clone(),
services,
cfg,
None,
OnConnectData::default(),
);
pin!(h1);
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
match h1.as_mut().poll(cx) {
Poll::Pending => panic!("Should have shut down"),
Poll::Ready(res) => assert!(res.is_ok()),
}
// polls: initial => shutdown
assert_eq!(h1.poll_count, 2);
{
let mut res = buf.write_buf_slice_mut();
stabilize_date_header(&mut res);
let res = &res[..];
let exp = b"\
HTTP/1.1 200 OK\r\n\
content-length: 11\r\n\
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
/unfinished\
";
assert_eq!(
res,
exp,
"\nexpected response not in write buffer:\n\
response: {:?}\n\
expected: {:?}",
String::from_utf8_lossy(res),
String::from_utf8_lossy(exp)
);
}
})
.await;
}
#[actix_rt::test]
async fn pipelining_ok_then_bad() {
lazy(|cx| {

View File

@ -2,6 +2,10 @@
## Unreleased
## 3.8.1
- Fix a bug where `GO_AWAY` errors did not stop connections from returning to the pool.
## 3.8.0
- Add `hickory-dns` crate feature (off-by-default).

View File

@ -1,6 +1,6 @@
[package]
name = "awc"
version = "3.8.0"
version = "3.8.1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Async HTTP and WebSocket client library"
keywords = ["actix", "http", "framework", "async", "web"]

View File

@ -5,9 +5,9 @@
<!-- prettier-ignore-start -->
[![crates.io](https://img.shields.io/crates/v/awc?label=latest)](https://crates.io/crates/awc)
[![Documentation](https://docs.rs/awc/badge.svg?version=3.8.0)](https://docs.rs/awc/3.8.0)
[![Documentation](https://docs.rs/awc/badge.svg?version=3.8.1)](https://docs.rs/awc/3.8.1)
![MIT or Apache 2.0 licensed](https://img.shields.io/crates/l/awc)
[![Dependency Status](https://deps.rs/crate/awc/3.8.0/status.svg)](https://deps.rs/crate/awc/3.8.0)
[![Dependency Status](https://deps.rs/crate/awc/3.8.1/status.svg)](https://deps.rs/crate/awc/3.8.1)
[![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/NWpN5mmg3x)
<!-- prettier-ignore-end -->

View File

@ -107,7 +107,7 @@ where
let res = poll_fn(|cx| io.poll_ready(cx)).await;
if let Err(err) = res {
io.on_release(err.is_io());
io.on_release(err.is_io() || err.is_go_away());
return Err(SendRequestError::from(err));
}
@ -121,7 +121,7 @@ where
fut.await.map_err(SendRequestError::from)?
}
Err(err) => {
io.on_release(err.is_io());
io.on_release(err.is_io() || err.is_go_away());
return Err(err.into());
}
};