Compare commits

...

5 Commits

Author SHA1 Message Date
Yuki Okushi 8c397f83a3
chore(http,web): upgrade `foldhash` to 0.2 (#4030) 2026-04-18 14:22:44 +09:00
Yuki Okushi cffe5d271e
http: add h1 write buffer size setting (#3986)
Co-authored-by: Nicolas Grondin <ngrondin78@gmail.com>
2026-04-18 13:22:45 +09:00
Yuki Okushi 10609f749d
actix-http: linger after early responses (#3985)
Co-authored-by: Ophir LOJKINE <contact@ophir.dev>
2026-04-18 12:46:44 +09:00
Yuki Okushi 0fb89457ed
chore(http): prepare v3.12.1 (#4029) 2026-04-18 11:41:32 +09:00
Yuki Okushi 3c056bd361
Merge commit from fork 2026-04-18 11:09:12 +09:00
13 changed files with 763 additions and 107 deletions

14
Cargo.lock generated
View File

@ -49,7 +49,7 @@ dependencies = [
[[package]]
name = "actix-http"
version = "3.12.0"
version = "3.12.1"
dependencies = [
"actix-codec",
"actix-http-test",
@ -72,7 +72,7 @@ dependencies = [
"encoding_rs",
"env_logger",
"flate2",
"foldhash",
"foldhash 0.2.0",
"futures-core",
"futures-util",
"h2",
@ -351,7 +351,7 @@ dependencies = [
"encoding_rs",
"env_logger",
"flate2",
"foldhash",
"foldhash 0.2.0",
"futures-core",
"futures-util",
"impl-more",
@ -1320,6 +1320,12 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
[[package]]
name = "foldhash"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb"
[[package]]
name = "foreign-types"
version = "0.3.2"
@ -1535,7 +1541,7 @@ version = "0.15.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
dependencies = [
"foldhash",
"foldhash 0.1.5",
]
[[package]]

View File

@ -2,10 +2,21 @@
## Unreleased
- When configured, gracefully close HTTP/1 connections after early responses to unread request bodies. [#3967]
- Update `foldhash` dependency to `0.2`.
[#3967]: https://github.com/actix/actix-web/issues/3967
## 3.12.1
**Notice: This release contains a security fix. Users are encouraged to update to this version ASAP.**
- SECURITY: Reject HTTP/1 requests with ambiguous request framing from `Content-Length` and `Transfer-Encoding` headers to prevent request smuggling.
- Encode the HTTP/1 `Connection: Upgrade` header in Camel-Case when camel-case header formatting is enabled.[#3953]
- Fix `HeaderMap` iterators' `len()` and `size_hint()` implementations for multi-value headers.
- Update `rand` dependency to `0.10`.
- Update `sha1` dependency to `0.11`.
- Add `ServiceConfigBuilder::h1_write_buffer_size()` and `HttpServiceBuilder::h1_write_buffer_size()`.
[#3953]: https://github.com/actix/actix-web/pull/3953

View File

@ -1,6 +1,6 @@
[package]
name = "actix-http"
version = "3.12.0"
version = "3.12.1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>", "Rob Ede <robjtede@icloud.com>"]
description = "HTTP types and services for the Actix ecosystem"
keywords = ["actix", "http", "framework", "async", "futures"]
@ -102,7 +102,7 @@ bytes = "1"
bytestring = "1"
derive_more = { version = "2", features = ["as_ref", "deref", "deref_mut", "display", "error", "from"] }
encoding_rs = "0.8"
foldhash = "0.1"
foldhash = "0.2"
futures-core = { version = "0.3.17", default-features = false, features = ["alloc"] }
http = "0.2.7"
httparse = "1.5.1"

View File

@ -5,7 +5,9 @@ use actix_service::{IntoServiceFactory, Service, ServiceFactory};
use crate::{
body::{BoxBody, MessageBody},
config::{DEFAULT_H2_CONN_WINDOW_SIZE, DEFAULT_H2_STREAM_WINDOW_SIZE},
config::{
DEFAULT_H1_WRITE_BUFFER_SIZE, DEFAULT_H2_CONN_WINDOW_SIZE, DEFAULT_H2_STREAM_WINDOW_SIZE,
},
h1::{self, ExpectHandler, H1Service, UpgradeHandler},
service::HttpService,
ConnectCallback, Extensions, KeepAlive, Request, Response, ServiceConfigBuilder,
@ -22,6 +24,7 @@ pub struct HttpServiceBuilder<T, S, X = ExpectHandler, U = UpgradeHandler> {
secure: bool,
local_addr: Option<net::SocketAddr>,
h1_allow_half_closed: bool,
h1_write_buffer_size: usize,
h2_conn_window_size: u32,
h2_stream_window_size: u32,
expect: X,
@ -47,6 +50,7 @@ where
secure: false,
local_addr: None,
h1_allow_half_closed: true,
h1_write_buffer_size: DEFAULT_H1_WRITE_BUFFER_SIZE,
h2_conn_window_size: DEFAULT_H2_CONN_WINDOW_SIZE,
h2_stream_window_size: DEFAULT_H2_STREAM_WINDOW_SIZE,
@ -151,6 +155,25 @@ where
self
}
/// Sets the maximum response write buffer size for HTTP/1 connections.
///
/// Once the response buffer reaches this size, the dispatcher flushes it to the I/O stream.
///
/// The default value is 32 KiB.
///
/// # Panics
///
/// Panics if `size` is 0.
pub fn h1_write_buffer_size(mut self, size: usize) -> Self {
assert!(
size > 0,
"HTTP/1 write buffer size must be greater than zero"
);
self.h1_write_buffer_size = size;
self
}
/// Sets initial stream-level flow control window size for HTTP/2 connections.
///
/// See [`ServiceConfigBuilder::h2_initial_window_size`] for more details.
@ -187,6 +210,7 @@ where
secure: self.secure,
local_addr: self.local_addr,
h1_allow_half_closed: self.h1_allow_half_closed,
h1_write_buffer_size: self.h1_write_buffer_size,
h2_conn_window_size: self.h2_conn_window_size,
h2_stream_window_size: self.h2_stream_window_size,
expect: expect.into_factory(),
@ -215,6 +239,7 @@ where
secure: self.secure,
local_addr: self.local_addr,
h1_allow_half_closed: self.h1_allow_half_closed,
h1_write_buffer_size: self.h1_write_buffer_size,
h2_conn_window_size: self.h2_conn_window_size,
h2_stream_window_size: self.h2_stream_window_size,
expect: self.expect,
@ -254,6 +279,7 @@ where
.secure(self.secure)
.local_addr(self.local_addr)
.h1_allow_half_closed(self.h1_allow_half_closed)
.h1_write_buffer_size(self.h1_write_buffer_size)
.h2_initial_window_size(self.h2_stream_window_size)
.h2_initial_connection_window_size(self.h2_conn_window_size)
.build();
@ -283,6 +309,7 @@ where
.secure(self.secure)
.local_addr(self.local_addr)
.h1_allow_half_closed(self.h1_allow_half_closed)
.h1_write_buffer_size(self.h1_write_buffer_size)
.h2_initial_window_size(self.h2_stream_window_size)
.h2_initial_connection_window_size(self.h2_conn_window_size)
.build();
@ -309,6 +336,7 @@ where
.secure(self.secure)
.local_addr(self.local_addr)
.h1_allow_half_closed(self.h1_allow_half_closed)
.h1_write_buffer_size(self.h1_write_buffer_size)
.h2_initial_window_size(self.h2_stream_window_size)
.h2_initial_connection_window_size(self.h2_conn_window_size)
.build();

View File

@ -18,6 +18,9 @@ pub(crate) const DEFAULT_H2_CONN_WINDOW_SIZE: u32 = 1024 * 1024 * 2; // 2MiB
/// Matches awc's defaults to avoid poor throughput on high-BDP links.
pub(crate) const DEFAULT_H2_STREAM_WINDOW_SIZE: u32 = 1024 * 1024; // 1MiB
/// Default HTTP/1 response write buffer size.
pub(crate) const DEFAULT_H1_WRITE_BUFFER_SIZE: usize = 32_768;
/// A builder for creating a [`ServiceConfig`]
#[derive(Default, Debug)]
pub struct ServiceConfigBuilder {
@ -86,6 +89,25 @@ impl ServiceConfigBuilder {
self
}
/// Sets the maximum response write buffer size for HTTP/1 connections.
///
/// Once the response buffer reaches this size, the dispatcher flushes it to the I/O stream.
///
/// The default value is 32 KiB.
///
/// # Panics
///
/// Panics if `size` is 0.
pub fn h1_write_buffer_size(mut self, size: usize) -> Self {
assert!(
size > 0,
"HTTP/1 write buffer size must be greater than zero"
);
self.inner.h1_write_buffer_size = size;
self
}
/// Sets initial stream-level flow control window size for HTTP/2 connections.
///
/// Higher values can improve upload performance on high-latency links at the cost of higher
@ -128,6 +150,7 @@ struct Inner {
tcp_nodelay: Option<bool>,
date_service: DateService,
h1_allow_half_closed: bool,
h1_write_buffer_size: usize,
h2_conn_window_size: u32,
h2_stream_window_size: u32,
}
@ -143,6 +166,7 @@ impl Default for Inner {
tcp_nodelay: None,
date_service: DateService::new(),
h1_allow_half_closed: true,
h1_write_buffer_size: DEFAULT_H1_WRITE_BUFFER_SIZE,
h2_conn_window_size: DEFAULT_H2_CONN_WINDOW_SIZE,
h2_stream_window_size: DEFAULT_H2_STREAM_WINDOW_SIZE,
}
@ -167,6 +191,7 @@ impl ServiceConfig {
tcp_nodelay: None,
date_service: DateService::new(),
h1_allow_half_closed: true,
h1_write_buffer_size: DEFAULT_H1_WRITE_BUFFER_SIZE,
h2_conn_window_size: DEFAULT_H2_CONN_WINDOW_SIZE,
h2_stream_window_size: DEFAULT_H2_STREAM_WINDOW_SIZE,
}))
@ -228,6 +253,11 @@ impl ServiceConfig {
self.0.h1_allow_half_closed
}
/// HTTP/1 response write buffer size (in bytes).
pub fn h1_write_buffer_size(&self) -> usize {
self.0.h1_write_buffer_size
}
/// Returns configured `TCP_NODELAY` setting for accepted TCP connections.
pub fn tcp_nodelay(&self) -> Option<bool> {
self.0.tcp_nodelay

View File

@ -237,4 +237,18 @@ mod tests {
assert_eq!(*req.method(), Method::POST);
assert!(req.chunked().unwrap());
}
#[actix_rt::test]
async fn test_http_request_rejects_content_length_and_chunked() {
let mut codec = Codec::default();
let mut buf = BytesMut::from(
"POST /test HTTP/1.1\r\n\
content-length: 11\r\n\
transfer-encoding: chunked\r\n\r\n\
0\r\n\r\n\
GET /test2 HTTP/1.1\r\n\r\n",
);
assert!(codec.decode(&mut buf).is_err());
}
}

View File

@ -275,6 +275,23 @@ impl MessageType for Request {
// convert headers
let mut length = msg.set_headers(&src.split_to(len).freeze(), &headers[..h_len], ver)?;
if msg.head().headers.contains_key(header::TRANSFER_ENCODING) {
if ver == Version::HTTP_10 {
debug!("Transfer-Encoding is not allowed in HTTP/1.0 requests");
return Err(ParseError::Header);
}
if !crate::HttpMessage::chunked(&msg)? {
debug!("request Transfer-Encoding must be chunked");
return Err(ParseError::Header);
}
if msg.head().headers.contains_key(header::CONTENT_LENGTH) {
debug!("both Content-Length and Transfer-Encoding are set");
return Err(ParseError::Header);
}
}
// disallow HTTP/1.0 POST requests that do not contain a Content-Length headers
// see https://datatracker.ietf.org/doc/html/rfc1945#section-7.2.2
if ver == Version::HTTP_10 && method == Method::POST && length.is_none() {
@ -1116,18 +1133,57 @@ mod tests {
#[test]
fn hrs_cl_and_te_http10() {
// in HTTP/1.0 transfer encoding is simply ignored so it's fine to have both
let mut buf = BytesMut::from(
expect_parse_err!(&mut BytesMut::from(
"GET / HTTP/1.0\r\n\
Host: example.com\r\n\
Content-Length: 3\r\n\
Transfer-Encoding: chunked\r\n\
\r\n\
000",
);
));
}
parse_ready!(&mut buf);
#[test]
fn hrs_cl_and_chunked_te_http11() {
expect_parse_err!(&mut BytesMut::from(
"POST / HTTP/1.1\r\n\
Host: example.com\r\n\
Content-Length: 3\r\n\
Transfer-Encoding: chunked\r\n\
\r\n\
0\r\n\
\r\n",
));
expect_parse_err!(&mut BytesMut::from(
"POST / HTTP/1.1\r\n\
Host: example.com\r\n\
Transfer-Encoding: chunked\r\n\
Content-Length: 3\r\n\
\r\n\
0\r\n\
\r\n",
));
}
#[test]
fn hrs_identity_te_http11() {
expect_parse_err!(&mut BytesMut::from(
"POST / HTTP/1.1\r\n\
Host: example.com\r\n\
Transfer-Encoding: identity\r\n\
\r\n\
0\r\n",
));
expect_parse_err!(&mut BytesMut::from(
"POST / HTTP/1.1\r\n\
Host: example.com\r\n\
Content-Length: 3\r\n\
Transfer-Encoding: identity\r\n\
\r\n\
0\r\n",
));
}
#[test]
@ -1165,14 +1221,16 @@ mod tests {
}
#[test]
fn transfer_encoding_agrees() {
fn hrs_chunked_te_http11() {
let mut buf = BytesMut::from(
"GET /test HTTP/1.1\r\n\
Host: example.com\r\n\
Content-Length: 3\r\n\
Transfer-Encoding: identity\r\n\
Transfer-Encoding: chunked\r\n\
\r\n\
0\r\n",
1\r\n\
a\r\n\
0\r\n\
\r\n",
);
let mut reader = MessageDecoder::<Request>::default();
@ -1180,6 +1238,6 @@ mod tests {
let mut pl = pl.unwrap();
let chunk = pl.decode(&mut buf).unwrap().unwrap();
assert_eq!(chunk, PayloadItem::Chunk(Bytes::from_static(b"0\r\n")));
assert_eq!(chunk, PayloadItem::Chunk(Bytes::from_static(b"a")));
}
}

View File

@ -31,7 +31,7 @@ use crate::{
config::ServiceConfig,
error::{DispatchError, ParseError, PayloadError},
service::HttpFlow,
Error, Extensions, HttpMessage, OnConnectData, Request, Response, StatusCode,
ConnectionType, Error, Extensions, HttpMessage, OnConnectData, Request, Response, StatusCode,
};
const LW_BUFFER_SIZE: usize = 1024;
@ -58,6 +58,9 @@ bitflags! {
/// Set if write-half is disconnected.
const WRITE_DISCONNECT = 0b0010_0000;
/// Set while gracefully closing a connection after an early response.
const LINGER = 0b0100_0000;
}
}
@ -168,6 +171,7 @@ pin_project! {
pub(super) io: Option<T>,
read_buf: BytesMut,
write_buf: BytesMut,
h1_write_buffer_size: usize,
codec: Codec,
}
}
@ -281,6 +285,7 @@ where
io: Some(io),
read_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
write_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
h1_write_buffer_size: config.h1_write_buffer_size(),
codec: Codec::new(config),
},
},
@ -361,6 +366,65 @@ where
io.poll_flush(cx)
}
fn enter_linger(mut self: Pin<&mut Self>) {
let this = self.as_mut().project();
this.flags.remove(Flags::KEEP_ALIVE);
this.flags.insert(Flags::LINGER | Flags::FINISHED);
}
fn ensure_linger_timer(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool {
let this = self.as_mut().project();
if matches!(this.shutdown_timer, TimerState::Active { .. }) {
return true;
}
if let Some(deadline) = this.config.client_disconnect_deadline() {
this.shutdown_timer
.set_and_init(cx, sleep_until(deadline.into()), line!());
true
} else {
false
}
}
fn poll_linger(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Result<Poll<()>, DispatchError> {
if self.as_mut().poll_flush(cx)?.is_pending() {
return Ok(Poll::Pending);
}
if !self.as_mut().ensure_linger_timer(cx) {
let this = self.as_mut().project();
this.flags.remove(Flags::LINGER);
this.flags.insert(Flags::SHUTDOWN);
return Ok(Poll::Ready(()));
}
loop {
let should_disconnect = self.as_mut().read_available(cx)?;
let this = self.as_mut().project();
let mut progressed = false;
if !this.read_buf.is_empty() {
this.read_buf.clear();
progressed = true;
}
if should_disconnect {
this.flags.remove(Flags::LINGER);
this.flags.insert(Flags::READ_DISCONNECT | Flags::SHUTDOWN);
return Ok(Poll::Ready(()));
}
if !progressed {
return Ok(Poll::Pending);
}
}
}
fn send_response_inner(
self: Pin<&mut Self>,
res: Response<()>,
@ -385,54 +449,90 @@ where
fn send_response(
mut self: Pin<&mut Self>,
res: Response<()>,
mut res: Response<()>,
body: B,
) -> Result<(), DispatchError> {
let size = self.as_mut().send_response_inner(res, &body)?;
let mut this = self.project();
this.state.set(match size {
BodySize::None | BodySize::Sized(0) => {
let payload_unfinished = this.payload.is_some();
let drain_payload = this.payload.as_ref().is_some_and(|pl| pl.is_dropped())
&& *this.payload_drainable;
let close_after_response = {
let this = self.as_mut().project();
should_close_after_response(this.payload.as_ref(), *this.payload_drainable)
};
if payload_unfinished && !drain_payload {
this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
if close_after_response {
res.head_mut().set_connection_type(ConnectionType::Close);
}
let size = self.as_mut().send_response_inner(res, &body)?;
match size {
BodySize::None | BodySize::Sized(0) => {
let this = self.as_mut().project();
if close_after_response {
if this.config.client_disconnect_deadline().is_some() {
drop(this);
self.as_mut().enter_linger();
} else {
self.as_mut()
.project()
.flags
.insert(Flags::SHUTDOWN | Flags::FINISHED);
}
} else {
this.flags.insert(Flags::FINISHED);
}
State::None
self.as_mut().project().state.set(State::None);
}
_ => State::SendPayload { body },
});
_ => self
.as_mut()
.project()
.state
.set(State::SendPayload { body }),
}
Ok(())
}
fn send_error_response(
mut self: Pin<&mut Self>,
res: Response<()>,
mut res: Response<()>,
body: BoxBody,
) -> Result<(), DispatchError> {
let size = self.as_mut().send_response_inner(res, &body)?;
let mut this = self.project();
this.state.set(match size {
BodySize::None | BodySize::Sized(0) => {
let payload_unfinished = this.payload.is_some();
let drain_payload = this.payload.as_ref().is_some_and(|pl| pl.is_dropped())
&& *this.payload_drainable;
let close_after_response = {
let this = self.as_mut().project();
should_close_after_response(this.payload.as_ref(), *this.payload_drainable)
};
if payload_unfinished && !drain_payload {
this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
if close_after_response {
res.head_mut().set_connection_type(ConnectionType::Close);
}
let size = self.as_mut().send_response_inner(res, &body)?;
match size {
BodySize::None | BodySize::Sized(0) => {
let this = self.as_mut().project();
if close_after_response {
if this.config.client_disconnect_deadline().is_some() {
drop(this);
self.as_mut().enter_linger();
} else {
self.as_mut()
.project()
.flags
.insert(Flags::SHUTDOWN | Flags::FINISHED);
}
} else {
this.flags.insert(Flags::FINISHED);
}
State::None
self.as_mut().project().state.set(State::None);
}
_ => State::SendErrorPayload { body },
});
_ => self
.as_mut()
.project()
.state
.set(State::SendErrorPayload { body }),
}
Ok(())
}
@ -520,7 +620,7 @@ where
StateProj::SendPayload { mut body } => {
// keep populate writer buffer until buffer size limit hit,
// get blocked or finished.
while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE {
while this.write_buf.len() < *this.h1_write_buffer_size {
match body.as_mut().poll_next(cx) {
Poll::Ready(Some(Ok(item))) => {
this.codec
@ -534,18 +634,26 @@ where
// this.payload was the payload for the request we just finished
// responding to. We can check to see if we finished reading it
// yet, and if not, shutdown the connection.
let payload_unfinished = this.payload.is_some();
let drain_payload =
this.payload.as_ref().is_some_and(|pl| pl.is_dropped())
&& *this.payload_drainable;
let close_after_response = should_close_after_response(
this.payload.as_ref(),
*this.payload_drainable,
);
let not_pipelined = this.messages.is_empty();
// payload stream finished.
// set state to None and handle next message
this.state.set(State::None);
if not_pipelined && payload_unfinished && !drain_payload {
this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
if not_pipelined && close_after_response {
if this.config.client_disconnect_deadline().is_some() {
drop(this);
self.as_mut().enter_linger();
} else {
self.as_mut()
.project()
.flags
.insert(Flags::SHUTDOWN | Flags::FINISHED);
}
} else {
this.flags.insert(Flags::FINISHED);
}
@ -574,7 +682,7 @@ where
// keep populate writer buffer until buffer size limit hit,
// get blocked or finished.
while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE {
while this.write_buf.len() < *this.h1_write_buffer_size {
match body.as_mut().poll_next(cx) {
Poll::Ready(Some(Ok(item))) => {
this.codec
@ -588,18 +696,26 @@ where
// this.payload was the payload for the request we just finished
// responding to. We can check to see if we finished reading it
// yet, and if not, shutdown the connection.
let payload_unfinished = this.payload.is_some();
let drain_payload =
this.payload.as_ref().is_some_and(|pl| pl.is_dropped())
&& *this.payload_drainable;
let close_after_response = should_close_after_response(
this.payload.as_ref(),
*this.payload_drainable,
);
let not_pipelined = this.messages.is_empty();
// payload stream finished.
// set state to None and handle next message
this.state.set(State::None);
if not_pipelined && payload_unfinished && !drain_payload {
this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
if not_pipelined && close_after_response {
if this.config.client_disconnect_deadline().is_some() {
drop(this);
self.as_mut().enter_linger();
} else {
self.as_mut()
.project()
.flags
.insert(Flags::SHUTDOWN | Flags::FINISHED);
}
} else {
this.flags.insert(Flags::FINISHED);
}
@ -960,14 +1076,20 @@ where
let this = self.as_mut().project();
if let TimerState::Active { timer } = this.shutdown_timer {
debug_assert!(
this.flags.contains(Flags::SHUTDOWN),
"shutdown flag should be set when timer is active",
this.flags.intersects(Flags::LINGER | Flags::SHUTDOWN),
"shutdown or linger flag should be set when timer is active",
);
// timed-out during shutdown; drop connection
if timer.as_mut().poll(cx).is_ready() {
trace!("timed-out during shutdown");
return Err(DispatchError::DisconnectTimeout);
if this.flags.contains(Flags::LINGER) {
trace!("timed-out during linger; shutting down connection");
this.flags.remove(Flags::LINGER);
this.flags.insert(Flags::SHUTDOWN);
this.shutdown_timer.clear(line!());
} else {
trace!("timed-out during shutdown");
return Err(DispatchError::DisconnectTimeout);
}
}
}
@ -1133,7 +1255,15 @@ where
inner.as_mut().poll_timers(cx)?;
let poll = if inner.flags.contains(Flags::SHUTDOWN) {
let poll = if inner.flags.contains(Flags::LINGER) {
match inner.as_mut().poll_linger(cx)? {
Poll::Ready(()) => {
cx.waker().wake_by_ref();
Poll::Pending
}
Poll::Pending => Poll::Pending,
}
} else if inner.flags.contains(Flags::SHUTDOWN) {
if inner.flags.contains(Flags::WRITE_DISCONNECT) {
Poll::Ready(Ok(()))
} else {
@ -1281,7 +1411,7 @@ where
inner_p.shutdown_timer,
);
if inner_p.flags.contains(Flags::SHUTDOWN) {
if inner_p.flags.intersects(Flags::LINGER | Flags::SHUTDOWN) {
cx.waker().wake_by_ref();
}
Poll::Pending
@ -1295,6 +1425,13 @@ where
}
}
fn should_close_after_response(payload: Option<&PayloadSender>, payload_drainable: bool) -> bool {
let payload_unfinished = payload.is_some();
let drain_payload = payload.is_some_and(|pl| pl.is_dropped()) && payload_drainable;
payload_unfinished && !drain_payload
}
#[allow(dead_code)]
fn trace_timer_states(
label: &str,

View File

@ -1,13 +1,19 @@
use std::{
cell::Cell,
future::Future,
io,
pin::Pin,
rc::Rc,
str,
task::{Context, Poll},
time::Duration,
};
use actix_codec::Framed;
use actix_rt::{pin, time::sleep};
use actix_rt::{
pin,
time::{sleep, timeout},
};
use actix_service::{fn_service, Service};
use actix_utils::future::{ready, Ready};
use bytes::{Buf, Bytes, BytesMut};
@ -43,6 +49,111 @@ impl Service<Request> for YieldService {
}
}
struct ReadyChunkBody {
chunk_polls: Rc<Cell<usize>>,
remaining: usize,
chunk_len: usize,
}
impl ReadyChunkBody {
fn new(chunk_polls: Rc<Cell<usize>>, remaining: usize, chunk_len: usize) -> Self {
Self {
chunk_polls,
remaining,
chunk_len,
}
}
}
impl MessageBody for ReadyChunkBody {
type Error = Error;
fn size(&self) -> crate::body::BodySize {
crate::body::BodySize::Stream
}
fn poll_next(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Self::Error>>> {
if self.remaining == 0 {
return Poll::Ready(None);
}
self.remaining -= 1;
self.chunk_polls.set(self.chunk_polls.get() + 1);
Poll::Ready(Some(Ok(Bytes::from(vec![b'x'; self.chunk_len]))))
}
}
struct PendingOnceWriteBuf {
io: TestBuffer,
block_next_write: bool,
}
impl PendingOnceWriteBuf {
fn new<T>(data: T) -> Self
where
T: Into<BytesMut>,
{
Self {
io: TestBuffer::new(data),
block_next_write: true,
}
}
}
impl io::Read for PendingOnceWriteBuf {
fn read(&mut self, dst: &mut [u8]) -> Result<usize, io::Error> {
self.io.read(dst)
}
}
impl io::Write for PendingOnceWriteBuf {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.io.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.io.flush()
}
}
impl actix_codec::AsyncRead for PendingOnceWriteBuf {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut actix_codec::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.io).poll_read(cx, buf)
}
}
impl actix_codec::AsyncWrite for PendingOnceWriteBuf {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
if self.block_next_write {
self.block_next_write = false;
cx.waker().wake_by_ref();
return Poll::Pending;
}
Pin::new(&mut self.io).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.io).poll_flush(cx)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.io).poll_shutdown(cx)
}
}
fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option<usize> {
memchr::memmem::find(&haystack[from..], needle)
}
@ -84,6 +195,11 @@ fn drop_payload_service() -> impl Service<Request, Response = Response<&'static
})
}
fn ignore_payload_service(
) -> impl Service<Request, Response = Response<&'static str>, Error = Error> {
fn_service(|_req: Request| ready(Ok::<_, Error>(Response::with_body(StatusCode::OK, "ok"))))
}
fn echo_payload_service() -> impl Service<Request, Response = Response<Bytes>, Error = Error> {
fn_service(|mut req: Request| {
Box::pin(async move {
@ -100,6 +216,18 @@ fn echo_payload_service() -> impl Service<Request, Response = Response<Bytes>, E
})
}
fn ready_chunk_body_service(
chunk_polls: Rc<Cell<usize>>,
chunk_count: usize,
chunk_len: usize,
) -> impl Service<Request, Response = Response<ReadyChunkBody>, Error = Error> {
fn_service(move |_req: Request| {
ready(Ok::<_, Error>(Response::ok().set_body(
ReadyChunkBody::new(chunk_polls.clone(), chunk_count, chunk_len),
)))
})
}
#[actix_rt::test]
async fn late_request() {
let mut buf = TestBuffer::empty();
@ -536,15 +664,14 @@ async fn pipelining_ok_then_ok() {
}
#[actix_rt::test]
async fn early_response_with_payload_closes_connection() {
async fn early_response_with_payload_lingers_before_closing() {
lazy(|cx| {
let buf = TestBuffer::new(
"\
GET /unfinished HTTP/1.1\r\n\
Content-Length: 2\r\n\
\r\n\
",
);
let buf = TestSeqBuffer::new(http_msg(
r"
GET /unfinished HTTP/1.1
Content-Length: 2
",
));
let cfg = ServiceConfig::new(
KeepAlive::Os,
@ -569,39 +696,172 @@ async fn early_response_with_payload_closes_connection() {
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
match h1.as_mut().poll(cx) {
Poll::Pending => panic!("Should have shut down"),
Poll::Ready(res) => assert!(res.is_ok()),
Poll::Pending => {}
Poll::Ready(res) => panic!("should still be lingering: {:?}", res),
}
// polls: initial => shutdown
assert_eq!(h1.poll_count, 2);
// polls: initial
assert_eq!(h1.poll_count, 1);
{
let mut res = buf.write_buf_slice_mut();
stabilize_date_header(&mut res);
let res = &res[..];
let mut res = buf.take_write_buf().to_vec();
stabilize_date_header(&mut res);
let res = &res[..];
let exp = b"\
HTTP/1.1 200 OK\r\n\
content-length: 11\r\n\
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
/unfinished\
";
let exp = b"\
HTTP/1.1 200 OK\r\n\
content-length: 11\r\n\
connection: close\r\n\
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
/unfinished\
";
assert_eq!(
res,
exp,
"\nexpected response not in write buffer:\n\
response: {:?}\n\
expected: {:?}",
String::from_utf8_lossy(res),
String::from_utf8_lossy(exp)
);
}
assert_eq!(
res,
exp,
"\nexpected response not in write buffer:\n\
response: {:?}\n\
expected: {:?}",
String::from_utf8_lossy(res),
String::from_utf8_lossy(exp)
);
buf.close_read();
assert!(h1.as_mut().poll(cx).is_pending());
assert!(h1.as_mut().poll(cx).is_ready());
})
.await;
}
#[actix_rt::test]
async fn buffered_upload_ignored_by_handler_should_not_shutdown_immediately() {
lazy(|cx| {
let buf = TestSeqBuffer::new(http_msg(
r"
POST / HTTP/1.1
Content-Length: 8
ab
",
));
let cfg = ServiceConfig::new(
KeepAlive::Os,
Duration::from_millis(1),
Duration::from_millis(1),
false,
None,
);
let services = HttpFlow::new(ignore_payload_service(), ExpectHandler, None);
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
buf.clone(),
services,
cfg,
None,
OnConnectData::default(),
);
pin!(h1);
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
match h1.as_mut().poll(cx) {
Poll::Pending => {}
Poll::Ready(res) => panic!("closed connection early: {:?}", res),
}
let mut res = BytesMut::from(buf.take_write_buf().as_ref());
stabilize_date_header(&mut res);
let res = &res[..];
let exp = http_msg(
r"
HTTP/1.1 200 OK
content-length: 2
connection: close
date: Thu, 01 Jan 1970 12:34:56 UTC
ok
",
);
assert_eq!(
res,
exp,
"\nexpected response not in write buffer:\n\
response: {:?}\n\
expected: {:?}",
String::from_utf8_lossy(res),
String::from_utf8_lossy(&exp)
);
buf.close_read();
assert!(h1.as_mut().poll(cx).is_pending());
assert!(h1.as_mut().poll(cx).is_ready());
})
.await;
}
#[actix_rt::test]
async fn lingering_timeout_uses_graceful_shutdown() {
let buf = TestSeqBuffer::new(
"\
POST / HTTP/1.1\r\n\
Content-Length: 8\r\n\
\r\n\
ab\
",
);
let cfg = ServiceConfig::new(
KeepAlive::Disabled,
Duration::ZERO,
Duration::from_millis(1),
false,
None,
);
let services = HttpFlow::new(ignore_payload_service(), ExpectHandler, None);
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
buf.clone(),
services,
cfg,
None,
OnConnectData::default(),
);
assert!(matches!(
timeout(Duration::from_millis(100), h1).await,
Ok(Ok(()))
));
let mut res = buf.take_write_buf().to_vec();
stabilize_date_header(&mut res);
let res = &res[..];
let exp = b"\
HTTP/1.1 200 OK\r\n\
content-length: 2\r\n\
connection: close\r\n\
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
ok\
";
assert_eq!(
res,
exp,
"\nexpected response not in write buffer:\n\
response: {:?}\n\
expected: {:?}",
String::from_utf8_lossy(res),
String::from_utf8_lossy(exp)
);
}
#[actix_rt::test]
async fn pipelining_ok_then_bad() {
lazy(|cx| {
@ -1224,6 +1484,58 @@ async fn disallow_half_closed() {
assert!(matches!(inner.state, State::ServiceCall { .. }))
}
#[actix_rt::test]
async fn h1_write_buffer_size_limits_buffering() {
let request = "GET /stream HTTP/1.1\r\nConnection: close\r\n\r\n";
let default_polls = Rc::new(Cell::new(0));
let default_services = HttpFlow::new(
ready_chunk_body_service(default_polls.clone(), 8, 1024),
ExpectHandler,
None::<UpgradeHandler>,
);
let default_io = PendingOnceWriteBuf::new(request);
let default_dispatcher = Dispatcher::new(
default_io,
default_services,
ServiceConfig::default(),
None,
OnConnectData::default(),
);
pin!(default_dispatcher);
let mut cx = Context::from_waker(futures_util::task::noop_waker_ref());
assert!(default_dispatcher.as_mut().poll(&mut cx).is_pending());
assert_eq!(default_polls.get(), 8);
let custom_polls = Rc::new(Cell::new(0));
let custom_services = HttpFlow::new(
ready_chunk_body_service(custom_polls.clone(), 8, 1024),
ExpectHandler,
None::<UpgradeHandler>,
);
let custom_io = PendingOnceWriteBuf::new(request);
let custom_dispatcher = Dispatcher::new(
custom_io,
custom_services,
crate::config::ServiceConfigBuilder::new()
.h1_write_buffer_size(1024)
.build(),
None,
OnConnectData::default(),
);
pin!(custom_dispatcher);
assert!(custom_dispatcher.as_mut().poll(&mut cx).is_pending());
assert_eq!(custom_polls.get(), 1);
}
#[actix_rt::test]
#[should_panic(expected = "HTTP/1 write buffer size must be greater than zero")]
async fn h1_write_buffer_size_rejects_zero() {
let _ = crate::config::ServiceConfigBuilder::new().h1_write_buffer_size(0);
}
fn http_msg(msg: impl AsRef<str>) -> BytesMut {
let mut msg = msg
.as_ref()

View File

@ -7,7 +7,9 @@
- Panic when calling `Route::to()` or `Route::service()` after `Route::wrap()` to prevent silently dropping route middleware. [#3944]
- Fix `HttpRequest::{match_pattern,match_name}` reporting path-only matches when route guards disambiguate overlapping resources. [#3346]
- Fix `Readlines` handling of lines split across payload chunks so combined line limits are enforced and complete lines are yielded.
- Update `foldhash` dependency to `0.2`.
- Update `rand` dependency to `0.10`.
- Add `HttpServer::h1_write_buffer_size()`.
[#3944]: https://github.com/actix/actix-web/pull/3944
[#3346]: https://github.com/actix/actix-web/issues/3346

View File

@ -137,7 +137,7 @@ actix-service = "2"
actix-tls = { version = "3.4", default-features = false, optional = true }
actix-utils = "3"
actix-http = "3.12.0"
actix-http = "3.12.1"
actix-router = { version = "0.5.4", default-features = false, features = ["http"] }
actix-web-codegen = { version = "4.3", optional = true, default-features = false }
@ -147,7 +147,7 @@ cfg-if = "1"
cookie = { version = "0.16", features = ["percent-encode"], optional = true }
derive_more = { version = "2", features = ["as_ref", "deref", "deref_mut", "display", "error", "from"] }
encoding_rs = "0.8"
foldhash = "0.1"
foldhash = "0.2"
futures-core = { version = "0.3.17", default-features = false }
futures-util = { version = "0.3.17", default-features = false }
impl-more = "0.1.4"

View File

@ -33,6 +33,7 @@ struct Config {
client_request_timeout: Duration,
client_disconnect_timeout: Duration,
h1_allow_half_closed: bool,
h1_write_buffer_size: Option<usize>,
h2_initial_window_size: Option<u32>,
h2_initial_connection_window_size: Option<u32>,
#[allow(dead_code)] // only dead when no TLS features are enabled
@ -122,6 +123,7 @@ where
client_request_timeout: Duration::from_secs(5),
client_disconnect_timeout: Duration::from_secs(1),
h1_allow_half_closed: true,
h1_write_buffer_size: None,
h2_initial_window_size: None,
h2_initial_connection_window_size: None,
tls_handshake_timeout: None,
@ -245,7 +247,7 @@ where
///
/// To disable timeout set value to 0.
///
/// By default client timeout is set to 5000 milliseconds.
/// By default client timeout is set to 1000 milliseconds.
pub fn client_disconnect_timeout(self, dur: Duration) -> Self {
self.config.lock().unwrap().client_disconnect_timeout = dur;
self
@ -286,6 +288,25 @@ where
self
}
/// Sets the maximum response write buffer size for HTTP/1 connections.
///
/// Once the response buffer reaches this size, the dispatcher flushes it to the I/O stream.
///
/// The default value is 32 KiB.
///
/// # Panics
///
/// Panics if `size` is 0.
pub fn h1_write_buffer_size(self, size: usize) -> Self {
assert!(
size > 0,
"HTTP/1 write buffer size must be greater than zero"
);
self.config.lock().unwrap().h1_write_buffer_size = Some(size);
self
}
/// Sets initial stream-level flow control window size for HTTP/2 connections.
///
/// Higher values can improve upload performance on high-latency links at the cost of higher
@ -629,6 +650,10 @@ where
svc = svc.tcp_nodelay(enabled);
}
if let Some(size) = cfg.h1_write_buffer_size {
svc = svc.h1_write_buffer_size(size);
}
if let Some(val) = cfg.h2_initial_window_size {
svc = svc.h2_initial_window_size(val);
}
@ -686,6 +711,10 @@ where
svc = svc.tcp_nodelay(enabled);
}
if let Some(size) = cfg.h1_write_buffer_size {
svc = svc.h1_write_buffer_size(size);
}
if let Some(val) = cfg.h2_initial_window_size {
svc = svc.h2_initial_window_size(val);
}
@ -774,6 +803,10 @@ where
svc = svc.tcp_nodelay(enabled);
}
if let Some(size) = c.h1_write_buffer_size {
svc = svc.h1_write_buffer_size(size);
}
if let Some(val) = c.h2_initial_window_size {
svc = svc.h2_initial_window_size(val);
}
@ -837,6 +870,10 @@ where
svc = svc.tcp_nodelay(enabled);
}
if let Some(size) = c.h1_write_buffer_size {
svc = svc.h1_write_buffer_size(size);
}
if let Some(val) = c.h2_initial_window_size {
svc = svc.h2_initial_window_size(val);
}
@ -915,6 +952,10 @@ where
svc = svc.tcp_nodelay(enabled);
}
if let Some(size) = c.h1_write_buffer_size {
svc = svc.h1_write_buffer_size(size);
}
if let Some(val) = c.h2_initial_window_size {
svc = svc.h2_initial_window_size(val);
}
@ -993,6 +1034,10 @@ where
svc = svc.tcp_nodelay(enabled);
}
if let Some(size) = c.h1_write_buffer_size {
svc = svc.h1_write_buffer_size(size);
}
if let Some(val) = c.h2_initial_window_size {
svc = svc.h2_initial_window_size(val);
}
@ -1072,6 +1117,10 @@ where
svc = svc.tcp_nodelay(enabled);
}
if let Some(size) = c.h1_write_buffer_size {
svc = svc.h1_write_buffer_size(size);
}
if let Some(val) = c.h2_initial_window_size {
svc = svc.h2_initial_window_size(val);
}
@ -1140,14 +1189,19 @@ where
.into_factory()
.map_err(|err| err.into().error_response());
fn_service(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) }).and_then(
HttpService::build()
fn_service(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) }).and_then({
let mut svc = HttpService::build()
.keep_alive(c.keep_alive)
.client_request_timeout(c.client_request_timeout)
.client_disconnect_timeout(c.client_disconnect_timeout)
.h1_allow_half_closed(c.h1_allow_half_closed)
.finish(map_config(fac, move |_| config.clone())),
)
.h1_allow_half_closed(c.h1_allow_half_closed);
if let Some(size) = c.h1_write_buffer_size {
svc = svc.h1_write_buffer_size(size);
}
svc.finish(map_config(fac, move |_| config.clone()))
})
},
)?;
@ -1194,6 +1248,10 @@ where
svc = svc.on_connect_ext(move |io: &_, ext: _| (handler)(io as &dyn Any, ext));
}
if let Some(size) = c.h1_write_buffer_size {
svc = svc.h1_write_buffer_size(size);
}
let fac = factory()
.into_factory()
.map_err(|err| err.into().error_response());

View File

@ -98,7 +98,7 @@ dangerous-h2c = []
[dependencies]
actix-codec = "0.5"
actix-http = { version = "3.12.0", features = ["http2", "ws"] }
actix-http = { version = "3.12.1", features = ["http2", "ws"] }
actix-rt = { version = "2.1", default-features = false }
actix-service = "2"
actix-tls = { version = "3.4", features = ["connect", "uri"] }