mirror of https://github.com/fafhrd91/actix-web
Compare commits
5 Commits
e6d09913d9
...
8c397f83a3
| Author | SHA1 | Date |
|---|---|---|
|
|
8c397f83a3 | |
|
|
cffe5d271e | |
|
|
10609f749d | |
|
|
0fb89457ed | |
|
|
3c056bd361 |
|
|
@ -49,7 +49,7 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "actix-http"
|
||||
version = "3.12.0"
|
||||
version = "3.12.1"
|
||||
dependencies = [
|
||||
"actix-codec",
|
||||
"actix-http-test",
|
||||
|
|
@ -72,7 +72,7 @@ dependencies = [
|
|||
"encoding_rs",
|
||||
"env_logger",
|
||||
"flate2",
|
||||
"foldhash",
|
||||
"foldhash 0.2.0",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"h2",
|
||||
|
|
@ -351,7 +351,7 @@ dependencies = [
|
|||
"encoding_rs",
|
||||
"env_logger",
|
||||
"flate2",
|
||||
"foldhash",
|
||||
"foldhash 0.2.0",
|
||||
"futures-core",
|
||||
"futures-util",
|
||||
"impl-more",
|
||||
|
|
@ -1320,6 +1320,12 @@ version = "0.1.5"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
|
||||
|
||||
[[package]]
|
||||
name = "foldhash"
|
||||
version = "0.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb"
|
||||
|
||||
[[package]]
|
||||
name = "foreign-types"
|
||||
version = "0.3.2"
|
||||
|
|
@ -1535,7 +1541,7 @@ version = "0.15.5"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
|
||||
dependencies = [
|
||||
"foldhash",
|
||||
"foldhash 0.1.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
|
|||
|
|
@ -2,10 +2,21 @@
|
|||
|
||||
## Unreleased
|
||||
|
||||
- When configured, gracefully close HTTP/1 connections after early responses to unread request bodies. [#3967]
|
||||
- Update `foldhash` dependency to `0.2`.
|
||||
|
||||
[#3967]: https://github.com/actix/actix-web/issues/3967
|
||||
|
||||
## 3.12.1
|
||||
|
||||
**Notice: This release contains a security fix. Users are encouraged to update to this version ASAP.**
|
||||
|
||||
- SECURITY: Reject HTTP/1 requests with ambiguous request framing from `Content-Length` and `Transfer-Encoding` headers to prevent request smuggling.
|
||||
- Encode the HTTP/1 `Connection: Upgrade` header in Camel-Case when camel-case header formatting is enabled.[#3953]
|
||||
- Fix `HeaderMap` iterators' `len()` and `size_hint()` implementations for multi-value headers.
|
||||
- Update `rand` dependency to `0.10`.
|
||||
- Update `sha1` dependency to `0.11`.
|
||||
- Add `ServiceConfigBuilder::h1_write_buffer_size()` and `HttpServiceBuilder::h1_write_buffer_size()`.
|
||||
|
||||
[#3953]: https://github.com/actix/actix-web/pull/3953
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "actix-http"
|
||||
version = "3.12.0"
|
||||
version = "3.12.1"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>", "Rob Ede <robjtede@icloud.com>"]
|
||||
description = "HTTP types and services for the Actix ecosystem"
|
||||
keywords = ["actix", "http", "framework", "async", "futures"]
|
||||
|
|
@ -102,7 +102,7 @@ bytes = "1"
|
|||
bytestring = "1"
|
||||
derive_more = { version = "2", features = ["as_ref", "deref", "deref_mut", "display", "error", "from"] }
|
||||
encoding_rs = "0.8"
|
||||
foldhash = "0.1"
|
||||
foldhash = "0.2"
|
||||
futures-core = { version = "0.3.17", default-features = false, features = ["alloc"] }
|
||||
http = "0.2.7"
|
||||
httparse = "1.5.1"
|
||||
|
|
|
|||
|
|
@ -5,7 +5,9 @@ use actix_service::{IntoServiceFactory, Service, ServiceFactory};
|
|||
|
||||
use crate::{
|
||||
body::{BoxBody, MessageBody},
|
||||
config::{DEFAULT_H2_CONN_WINDOW_SIZE, DEFAULT_H2_STREAM_WINDOW_SIZE},
|
||||
config::{
|
||||
DEFAULT_H1_WRITE_BUFFER_SIZE, DEFAULT_H2_CONN_WINDOW_SIZE, DEFAULT_H2_STREAM_WINDOW_SIZE,
|
||||
},
|
||||
h1::{self, ExpectHandler, H1Service, UpgradeHandler},
|
||||
service::HttpService,
|
||||
ConnectCallback, Extensions, KeepAlive, Request, Response, ServiceConfigBuilder,
|
||||
|
|
@ -22,6 +24,7 @@ pub struct HttpServiceBuilder<T, S, X = ExpectHandler, U = UpgradeHandler> {
|
|||
secure: bool,
|
||||
local_addr: Option<net::SocketAddr>,
|
||||
h1_allow_half_closed: bool,
|
||||
h1_write_buffer_size: usize,
|
||||
h2_conn_window_size: u32,
|
||||
h2_stream_window_size: u32,
|
||||
expect: X,
|
||||
|
|
@ -47,6 +50,7 @@ where
|
|||
secure: false,
|
||||
local_addr: None,
|
||||
h1_allow_half_closed: true,
|
||||
h1_write_buffer_size: DEFAULT_H1_WRITE_BUFFER_SIZE,
|
||||
h2_conn_window_size: DEFAULT_H2_CONN_WINDOW_SIZE,
|
||||
h2_stream_window_size: DEFAULT_H2_STREAM_WINDOW_SIZE,
|
||||
|
||||
|
|
@ -151,6 +155,25 @@ where
|
|||
self
|
||||
}
|
||||
|
||||
/// Sets the maximum response write buffer size for HTTP/1 connections.
|
||||
///
|
||||
/// Once the response buffer reaches this size, the dispatcher flushes it to the I/O stream.
|
||||
///
|
||||
/// The default value is 32 KiB.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if `size` is 0.
|
||||
pub fn h1_write_buffer_size(mut self, size: usize) -> Self {
|
||||
assert!(
|
||||
size > 0,
|
||||
"HTTP/1 write buffer size must be greater than zero"
|
||||
);
|
||||
|
||||
self.h1_write_buffer_size = size;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets initial stream-level flow control window size for HTTP/2 connections.
|
||||
///
|
||||
/// See [`ServiceConfigBuilder::h2_initial_window_size`] for more details.
|
||||
|
|
@ -187,6 +210,7 @@ where
|
|||
secure: self.secure,
|
||||
local_addr: self.local_addr,
|
||||
h1_allow_half_closed: self.h1_allow_half_closed,
|
||||
h1_write_buffer_size: self.h1_write_buffer_size,
|
||||
h2_conn_window_size: self.h2_conn_window_size,
|
||||
h2_stream_window_size: self.h2_stream_window_size,
|
||||
expect: expect.into_factory(),
|
||||
|
|
@ -215,6 +239,7 @@ where
|
|||
secure: self.secure,
|
||||
local_addr: self.local_addr,
|
||||
h1_allow_half_closed: self.h1_allow_half_closed,
|
||||
h1_write_buffer_size: self.h1_write_buffer_size,
|
||||
h2_conn_window_size: self.h2_conn_window_size,
|
||||
h2_stream_window_size: self.h2_stream_window_size,
|
||||
expect: self.expect,
|
||||
|
|
@ -254,6 +279,7 @@ where
|
|||
.secure(self.secure)
|
||||
.local_addr(self.local_addr)
|
||||
.h1_allow_half_closed(self.h1_allow_half_closed)
|
||||
.h1_write_buffer_size(self.h1_write_buffer_size)
|
||||
.h2_initial_window_size(self.h2_stream_window_size)
|
||||
.h2_initial_connection_window_size(self.h2_conn_window_size)
|
||||
.build();
|
||||
|
|
@ -283,6 +309,7 @@ where
|
|||
.secure(self.secure)
|
||||
.local_addr(self.local_addr)
|
||||
.h1_allow_half_closed(self.h1_allow_half_closed)
|
||||
.h1_write_buffer_size(self.h1_write_buffer_size)
|
||||
.h2_initial_window_size(self.h2_stream_window_size)
|
||||
.h2_initial_connection_window_size(self.h2_conn_window_size)
|
||||
.build();
|
||||
|
|
@ -309,6 +336,7 @@ where
|
|||
.secure(self.secure)
|
||||
.local_addr(self.local_addr)
|
||||
.h1_allow_half_closed(self.h1_allow_half_closed)
|
||||
.h1_write_buffer_size(self.h1_write_buffer_size)
|
||||
.h2_initial_window_size(self.h2_stream_window_size)
|
||||
.h2_initial_connection_window_size(self.h2_conn_window_size)
|
||||
.build();
|
||||
|
|
|
|||
|
|
@ -18,6 +18,9 @@ pub(crate) const DEFAULT_H2_CONN_WINDOW_SIZE: u32 = 1024 * 1024 * 2; // 2MiB
|
|||
/// Matches awc's defaults to avoid poor throughput on high-BDP links.
|
||||
pub(crate) const DEFAULT_H2_STREAM_WINDOW_SIZE: u32 = 1024 * 1024; // 1MiB
|
||||
|
||||
/// Default HTTP/1 response write buffer size.
|
||||
pub(crate) const DEFAULT_H1_WRITE_BUFFER_SIZE: usize = 32_768;
|
||||
|
||||
/// A builder for creating a [`ServiceConfig`]
|
||||
#[derive(Default, Debug)]
|
||||
pub struct ServiceConfigBuilder {
|
||||
|
|
@ -86,6 +89,25 @@ impl ServiceConfigBuilder {
|
|||
self
|
||||
}
|
||||
|
||||
/// Sets the maximum response write buffer size for HTTP/1 connections.
|
||||
///
|
||||
/// Once the response buffer reaches this size, the dispatcher flushes it to the I/O stream.
|
||||
///
|
||||
/// The default value is 32 KiB.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if `size` is 0.
|
||||
pub fn h1_write_buffer_size(mut self, size: usize) -> Self {
|
||||
assert!(
|
||||
size > 0,
|
||||
"HTTP/1 write buffer size must be greater than zero"
|
||||
);
|
||||
|
||||
self.inner.h1_write_buffer_size = size;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets initial stream-level flow control window size for HTTP/2 connections.
|
||||
///
|
||||
/// Higher values can improve upload performance on high-latency links at the cost of higher
|
||||
|
|
@ -128,6 +150,7 @@ struct Inner {
|
|||
tcp_nodelay: Option<bool>,
|
||||
date_service: DateService,
|
||||
h1_allow_half_closed: bool,
|
||||
h1_write_buffer_size: usize,
|
||||
h2_conn_window_size: u32,
|
||||
h2_stream_window_size: u32,
|
||||
}
|
||||
|
|
@ -143,6 +166,7 @@ impl Default for Inner {
|
|||
tcp_nodelay: None,
|
||||
date_service: DateService::new(),
|
||||
h1_allow_half_closed: true,
|
||||
h1_write_buffer_size: DEFAULT_H1_WRITE_BUFFER_SIZE,
|
||||
h2_conn_window_size: DEFAULT_H2_CONN_WINDOW_SIZE,
|
||||
h2_stream_window_size: DEFAULT_H2_STREAM_WINDOW_SIZE,
|
||||
}
|
||||
|
|
@ -167,6 +191,7 @@ impl ServiceConfig {
|
|||
tcp_nodelay: None,
|
||||
date_service: DateService::new(),
|
||||
h1_allow_half_closed: true,
|
||||
h1_write_buffer_size: DEFAULT_H1_WRITE_BUFFER_SIZE,
|
||||
h2_conn_window_size: DEFAULT_H2_CONN_WINDOW_SIZE,
|
||||
h2_stream_window_size: DEFAULT_H2_STREAM_WINDOW_SIZE,
|
||||
}))
|
||||
|
|
@ -228,6 +253,11 @@ impl ServiceConfig {
|
|||
self.0.h1_allow_half_closed
|
||||
}
|
||||
|
||||
/// HTTP/1 response write buffer size (in bytes).
|
||||
pub fn h1_write_buffer_size(&self) -> usize {
|
||||
self.0.h1_write_buffer_size
|
||||
}
|
||||
|
||||
/// Returns configured `TCP_NODELAY` setting for accepted TCP connections.
|
||||
pub fn tcp_nodelay(&self) -> Option<bool> {
|
||||
self.0.tcp_nodelay
|
||||
|
|
|
|||
|
|
@ -237,4 +237,18 @@ mod tests {
|
|||
assert_eq!(*req.method(), Method::POST);
|
||||
assert!(req.chunked().unwrap());
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_http_request_rejects_content_length_and_chunked() {
|
||||
let mut codec = Codec::default();
|
||||
let mut buf = BytesMut::from(
|
||||
"POST /test HTTP/1.1\r\n\
|
||||
content-length: 11\r\n\
|
||||
transfer-encoding: chunked\r\n\r\n\
|
||||
0\r\n\r\n\
|
||||
GET /test2 HTTP/1.1\r\n\r\n",
|
||||
);
|
||||
|
||||
assert!(codec.decode(&mut buf).is_err());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -275,6 +275,23 @@ impl MessageType for Request {
|
|||
// convert headers
|
||||
let mut length = msg.set_headers(&src.split_to(len).freeze(), &headers[..h_len], ver)?;
|
||||
|
||||
if msg.head().headers.contains_key(header::TRANSFER_ENCODING) {
|
||||
if ver == Version::HTTP_10 {
|
||||
debug!("Transfer-Encoding is not allowed in HTTP/1.0 requests");
|
||||
return Err(ParseError::Header);
|
||||
}
|
||||
|
||||
if !crate::HttpMessage::chunked(&msg)? {
|
||||
debug!("request Transfer-Encoding must be chunked");
|
||||
return Err(ParseError::Header);
|
||||
}
|
||||
|
||||
if msg.head().headers.contains_key(header::CONTENT_LENGTH) {
|
||||
debug!("both Content-Length and Transfer-Encoding are set");
|
||||
return Err(ParseError::Header);
|
||||
}
|
||||
}
|
||||
|
||||
// disallow HTTP/1.0 POST requests that do not contain a Content-Length headers
|
||||
// see https://datatracker.ietf.org/doc/html/rfc1945#section-7.2.2
|
||||
if ver == Version::HTTP_10 && method == Method::POST && length.is_none() {
|
||||
|
|
@ -1116,18 +1133,57 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn hrs_cl_and_te_http10() {
|
||||
// in HTTP/1.0 transfer encoding is simply ignored so it's fine to have both
|
||||
|
||||
let mut buf = BytesMut::from(
|
||||
expect_parse_err!(&mut BytesMut::from(
|
||||
"GET / HTTP/1.0\r\n\
|
||||
Host: example.com\r\n\
|
||||
Content-Length: 3\r\n\
|
||||
Transfer-Encoding: chunked\r\n\
|
||||
\r\n\
|
||||
000",
|
||||
);
|
||||
));
|
||||
}
|
||||
|
||||
parse_ready!(&mut buf);
|
||||
#[test]
|
||||
fn hrs_cl_and_chunked_te_http11() {
|
||||
expect_parse_err!(&mut BytesMut::from(
|
||||
"POST / HTTP/1.1\r\n\
|
||||
Host: example.com\r\n\
|
||||
Content-Length: 3\r\n\
|
||||
Transfer-Encoding: chunked\r\n\
|
||||
\r\n\
|
||||
0\r\n\
|
||||
\r\n",
|
||||
));
|
||||
|
||||
expect_parse_err!(&mut BytesMut::from(
|
||||
"POST / HTTP/1.1\r\n\
|
||||
Host: example.com\r\n\
|
||||
Transfer-Encoding: chunked\r\n\
|
||||
Content-Length: 3\r\n\
|
||||
\r\n\
|
||||
0\r\n\
|
||||
\r\n",
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn hrs_identity_te_http11() {
|
||||
expect_parse_err!(&mut BytesMut::from(
|
||||
"POST / HTTP/1.1\r\n\
|
||||
Host: example.com\r\n\
|
||||
Transfer-Encoding: identity\r\n\
|
||||
\r\n\
|
||||
0\r\n",
|
||||
));
|
||||
|
||||
expect_parse_err!(&mut BytesMut::from(
|
||||
"POST / HTTP/1.1\r\n\
|
||||
Host: example.com\r\n\
|
||||
Content-Length: 3\r\n\
|
||||
Transfer-Encoding: identity\r\n\
|
||||
\r\n\
|
||||
0\r\n",
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
@ -1165,14 +1221,16 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
fn transfer_encoding_agrees() {
|
||||
fn hrs_chunked_te_http11() {
|
||||
let mut buf = BytesMut::from(
|
||||
"GET /test HTTP/1.1\r\n\
|
||||
Host: example.com\r\n\
|
||||
Content-Length: 3\r\n\
|
||||
Transfer-Encoding: identity\r\n\
|
||||
Transfer-Encoding: chunked\r\n\
|
||||
\r\n\
|
||||
0\r\n",
|
||||
1\r\n\
|
||||
a\r\n\
|
||||
0\r\n\
|
||||
\r\n",
|
||||
);
|
||||
|
||||
let mut reader = MessageDecoder::<Request>::default();
|
||||
|
|
@ -1180,6 +1238,6 @@ mod tests {
|
|||
let mut pl = pl.unwrap();
|
||||
|
||||
let chunk = pl.decode(&mut buf).unwrap().unwrap();
|
||||
assert_eq!(chunk, PayloadItem::Chunk(Bytes::from_static(b"0\r\n")));
|
||||
assert_eq!(chunk, PayloadItem::Chunk(Bytes::from_static(b"a")));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ use crate::{
|
|||
config::ServiceConfig,
|
||||
error::{DispatchError, ParseError, PayloadError},
|
||||
service::HttpFlow,
|
||||
Error, Extensions, HttpMessage, OnConnectData, Request, Response, StatusCode,
|
||||
ConnectionType, Error, Extensions, HttpMessage, OnConnectData, Request, Response, StatusCode,
|
||||
};
|
||||
|
||||
const LW_BUFFER_SIZE: usize = 1024;
|
||||
|
|
@ -58,6 +58,9 @@ bitflags! {
|
|||
|
||||
/// Set if write-half is disconnected.
|
||||
const WRITE_DISCONNECT = 0b0010_0000;
|
||||
|
||||
/// Set while gracefully closing a connection after an early response.
|
||||
const LINGER = 0b0100_0000;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -168,6 +171,7 @@ pin_project! {
|
|||
pub(super) io: Option<T>,
|
||||
read_buf: BytesMut,
|
||||
write_buf: BytesMut,
|
||||
h1_write_buffer_size: usize,
|
||||
codec: Codec,
|
||||
}
|
||||
}
|
||||
|
|
@ -281,6 +285,7 @@ where
|
|||
io: Some(io),
|
||||
read_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
|
||||
write_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
|
||||
h1_write_buffer_size: config.h1_write_buffer_size(),
|
||||
codec: Codec::new(config),
|
||||
},
|
||||
},
|
||||
|
|
@ -361,6 +366,65 @@ where
|
|||
io.poll_flush(cx)
|
||||
}
|
||||
|
||||
fn enter_linger(mut self: Pin<&mut Self>) {
|
||||
let this = self.as_mut().project();
|
||||
this.flags.remove(Flags::KEEP_ALIVE);
|
||||
this.flags.insert(Flags::LINGER | Flags::FINISHED);
|
||||
}
|
||||
|
||||
fn ensure_linger_timer(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool {
|
||||
let this = self.as_mut().project();
|
||||
|
||||
if matches!(this.shutdown_timer, TimerState::Active { .. }) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if let Some(deadline) = this.config.client_disconnect_deadline() {
|
||||
this.shutdown_timer
|
||||
.set_and_init(cx, sleep_until(deadline.into()), line!());
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_linger(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Result<Poll<()>, DispatchError> {
|
||||
if self.as_mut().poll_flush(cx)?.is_pending() {
|
||||
return Ok(Poll::Pending);
|
||||
}
|
||||
|
||||
if !self.as_mut().ensure_linger_timer(cx) {
|
||||
let this = self.as_mut().project();
|
||||
this.flags.remove(Flags::LINGER);
|
||||
this.flags.insert(Flags::SHUTDOWN);
|
||||
return Ok(Poll::Ready(()));
|
||||
}
|
||||
|
||||
loop {
|
||||
let should_disconnect = self.as_mut().read_available(cx)?;
|
||||
let this = self.as_mut().project();
|
||||
let mut progressed = false;
|
||||
|
||||
if !this.read_buf.is_empty() {
|
||||
this.read_buf.clear();
|
||||
progressed = true;
|
||||
}
|
||||
|
||||
if should_disconnect {
|
||||
this.flags.remove(Flags::LINGER);
|
||||
this.flags.insert(Flags::READ_DISCONNECT | Flags::SHUTDOWN);
|
||||
return Ok(Poll::Ready(()));
|
||||
}
|
||||
|
||||
if !progressed {
|
||||
return Ok(Poll::Pending);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn send_response_inner(
|
||||
self: Pin<&mut Self>,
|
||||
res: Response<()>,
|
||||
|
|
@ -385,54 +449,90 @@ where
|
|||
|
||||
fn send_response(
|
||||
mut self: Pin<&mut Self>,
|
||||
res: Response<()>,
|
||||
mut res: Response<()>,
|
||||
body: B,
|
||||
) -> Result<(), DispatchError> {
|
||||
let size = self.as_mut().send_response_inner(res, &body)?;
|
||||
let mut this = self.project();
|
||||
this.state.set(match size {
|
||||
BodySize::None | BodySize::Sized(0) => {
|
||||
let payload_unfinished = this.payload.is_some();
|
||||
let drain_payload = this.payload.as_ref().is_some_and(|pl| pl.is_dropped())
|
||||
&& *this.payload_drainable;
|
||||
let close_after_response = {
|
||||
let this = self.as_mut().project();
|
||||
should_close_after_response(this.payload.as_ref(), *this.payload_drainable)
|
||||
};
|
||||
|
||||
if payload_unfinished && !drain_payload {
|
||||
this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
|
||||
if close_after_response {
|
||||
res.head_mut().set_connection_type(ConnectionType::Close);
|
||||
}
|
||||
|
||||
let size = self.as_mut().send_response_inner(res, &body)?;
|
||||
match size {
|
||||
BodySize::None | BodySize::Sized(0) => {
|
||||
let this = self.as_mut().project();
|
||||
|
||||
if close_after_response {
|
||||
if this.config.client_disconnect_deadline().is_some() {
|
||||
drop(this);
|
||||
self.as_mut().enter_linger();
|
||||
} else {
|
||||
self.as_mut()
|
||||
.project()
|
||||
.flags
|
||||
.insert(Flags::SHUTDOWN | Flags::FINISHED);
|
||||
}
|
||||
} else {
|
||||
this.flags.insert(Flags::FINISHED);
|
||||
}
|
||||
|
||||
State::None
|
||||
self.as_mut().project().state.set(State::None);
|
||||
}
|
||||
_ => State::SendPayload { body },
|
||||
});
|
||||
_ => self
|
||||
.as_mut()
|
||||
.project()
|
||||
.state
|
||||
.set(State::SendPayload { body }),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn send_error_response(
|
||||
mut self: Pin<&mut Self>,
|
||||
res: Response<()>,
|
||||
mut res: Response<()>,
|
||||
body: BoxBody,
|
||||
) -> Result<(), DispatchError> {
|
||||
let size = self.as_mut().send_response_inner(res, &body)?;
|
||||
let mut this = self.project();
|
||||
this.state.set(match size {
|
||||
BodySize::None | BodySize::Sized(0) => {
|
||||
let payload_unfinished = this.payload.is_some();
|
||||
let drain_payload = this.payload.as_ref().is_some_and(|pl| pl.is_dropped())
|
||||
&& *this.payload_drainable;
|
||||
let close_after_response = {
|
||||
let this = self.as_mut().project();
|
||||
should_close_after_response(this.payload.as_ref(), *this.payload_drainable)
|
||||
};
|
||||
|
||||
if payload_unfinished && !drain_payload {
|
||||
this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
|
||||
if close_after_response {
|
||||
res.head_mut().set_connection_type(ConnectionType::Close);
|
||||
}
|
||||
|
||||
let size = self.as_mut().send_response_inner(res, &body)?;
|
||||
match size {
|
||||
BodySize::None | BodySize::Sized(0) => {
|
||||
let this = self.as_mut().project();
|
||||
|
||||
if close_after_response {
|
||||
if this.config.client_disconnect_deadline().is_some() {
|
||||
drop(this);
|
||||
self.as_mut().enter_linger();
|
||||
} else {
|
||||
self.as_mut()
|
||||
.project()
|
||||
.flags
|
||||
.insert(Flags::SHUTDOWN | Flags::FINISHED);
|
||||
}
|
||||
} else {
|
||||
this.flags.insert(Flags::FINISHED);
|
||||
}
|
||||
|
||||
State::None
|
||||
self.as_mut().project().state.set(State::None);
|
||||
}
|
||||
_ => State::SendErrorPayload { body },
|
||||
});
|
||||
_ => self
|
||||
.as_mut()
|
||||
.project()
|
||||
.state
|
||||
.set(State::SendErrorPayload { body }),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -520,7 +620,7 @@ where
|
|||
StateProj::SendPayload { mut body } => {
|
||||
// keep populate writer buffer until buffer size limit hit,
|
||||
// get blocked or finished.
|
||||
while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE {
|
||||
while this.write_buf.len() < *this.h1_write_buffer_size {
|
||||
match body.as_mut().poll_next(cx) {
|
||||
Poll::Ready(Some(Ok(item))) => {
|
||||
this.codec
|
||||
|
|
@ -534,18 +634,26 @@ where
|
|||
// this.payload was the payload for the request we just finished
|
||||
// responding to. We can check to see if we finished reading it
|
||||
// yet, and if not, shutdown the connection.
|
||||
let payload_unfinished = this.payload.is_some();
|
||||
let drain_payload =
|
||||
this.payload.as_ref().is_some_and(|pl| pl.is_dropped())
|
||||
&& *this.payload_drainable;
|
||||
let close_after_response = should_close_after_response(
|
||||
this.payload.as_ref(),
|
||||
*this.payload_drainable,
|
||||
);
|
||||
let not_pipelined = this.messages.is_empty();
|
||||
|
||||
// payload stream finished.
|
||||
// set state to None and handle next message
|
||||
this.state.set(State::None);
|
||||
|
||||
if not_pipelined && payload_unfinished && !drain_payload {
|
||||
this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
|
||||
if not_pipelined && close_after_response {
|
||||
if this.config.client_disconnect_deadline().is_some() {
|
||||
drop(this);
|
||||
self.as_mut().enter_linger();
|
||||
} else {
|
||||
self.as_mut()
|
||||
.project()
|
||||
.flags
|
||||
.insert(Flags::SHUTDOWN | Flags::FINISHED);
|
||||
}
|
||||
} else {
|
||||
this.flags.insert(Flags::FINISHED);
|
||||
}
|
||||
|
|
@ -574,7 +682,7 @@ where
|
|||
|
||||
// keep populate writer buffer until buffer size limit hit,
|
||||
// get blocked or finished.
|
||||
while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE {
|
||||
while this.write_buf.len() < *this.h1_write_buffer_size {
|
||||
match body.as_mut().poll_next(cx) {
|
||||
Poll::Ready(Some(Ok(item))) => {
|
||||
this.codec
|
||||
|
|
@ -588,18 +696,26 @@ where
|
|||
// this.payload was the payload for the request we just finished
|
||||
// responding to. We can check to see if we finished reading it
|
||||
// yet, and if not, shutdown the connection.
|
||||
let payload_unfinished = this.payload.is_some();
|
||||
let drain_payload =
|
||||
this.payload.as_ref().is_some_and(|pl| pl.is_dropped())
|
||||
&& *this.payload_drainable;
|
||||
let close_after_response = should_close_after_response(
|
||||
this.payload.as_ref(),
|
||||
*this.payload_drainable,
|
||||
);
|
||||
let not_pipelined = this.messages.is_empty();
|
||||
|
||||
// payload stream finished.
|
||||
// set state to None and handle next message
|
||||
this.state.set(State::None);
|
||||
|
||||
if not_pipelined && payload_unfinished && !drain_payload {
|
||||
this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
|
||||
if not_pipelined && close_after_response {
|
||||
if this.config.client_disconnect_deadline().is_some() {
|
||||
drop(this);
|
||||
self.as_mut().enter_linger();
|
||||
} else {
|
||||
self.as_mut()
|
||||
.project()
|
||||
.flags
|
||||
.insert(Flags::SHUTDOWN | Flags::FINISHED);
|
||||
}
|
||||
} else {
|
||||
this.flags.insert(Flags::FINISHED);
|
||||
}
|
||||
|
|
@ -960,14 +1076,20 @@ where
|
|||
let this = self.as_mut().project();
|
||||
if let TimerState::Active { timer } = this.shutdown_timer {
|
||||
debug_assert!(
|
||||
this.flags.contains(Flags::SHUTDOWN),
|
||||
"shutdown flag should be set when timer is active",
|
||||
this.flags.intersects(Flags::LINGER | Flags::SHUTDOWN),
|
||||
"shutdown or linger flag should be set when timer is active",
|
||||
);
|
||||
|
||||
// timed-out during shutdown; drop connection
|
||||
if timer.as_mut().poll(cx).is_ready() {
|
||||
trace!("timed-out during shutdown");
|
||||
return Err(DispatchError::DisconnectTimeout);
|
||||
if this.flags.contains(Flags::LINGER) {
|
||||
trace!("timed-out during linger; shutting down connection");
|
||||
this.flags.remove(Flags::LINGER);
|
||||
this.flags.insert(Flags::SHUTDOWN);
|
||||
this.shutdown_timer.clear(line!());
|
||||
} else {
|
||||
trace!("timed-out during shutdown");
|
||||
return Err(DispatchError::DisconnectTimeout);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1133,7 +1255,15 @@ where
|
|||
|
||||
inner.as_mut().poll_timers(cx)?;
|
||||
|
||||
let poll = if inner.flags.contains(Flags::SHUTDOWN) {
|
||||
let poll = if inner.flags.contains(Flags::LINGER) {
|
||||
match inner.as_mut().poll_linger(cx)? {
|
||||
Poll::Ready(()) => {
|
||||
cx.waker().wake_by_ref();
|
||||
Poll::Pending
|
||||
}
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
} else if inner.flags.contains(Flags::SHUTDOWN) {
|
||||
if inner.flags.contains(Flags::WRITE_DISCONNECT) {
|
||||
Poll::Ready(Ok(()))
|
||||
} else {
|
||||
|
|
@ -1281,7 +1411,7 @@ where
|
|||
inner_p.shutdown_timer,
|
||||
);
|
||||
|
||||
if inner_p.flags.contains(Flags::SHUTDOWN) {
|
||||
if inner_p.flags.intersects(Flags::LINGER | Flags::SHUTDOWN) {
|
||||
cx.waker().wake_by_ref();
|
||||
}
|
||||
Poll::Pending
|
||||
|
|
@ -1295,6 +1425,13 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
fn should_close_after_response(payload: Option<&PayloadSender>, payload_drainable: bool) -> bool {
|
||||
let payload_unfinished = payload.is_some();
|
||||
let drain_payload = payload.is_some_and(|pl| pl.is_dropped()) && payload_drainable;
|
||||
|
||||
payload_unfinished && !drain_payload
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
fn trace_timer_states(
|
||||
label: &str,
|
||||
|
|
|
|||
|
|
@ -1,13 +1,19 @@
|
|||
use std::{
|
||||
cell::Cell,
|
||||
future::Future,
|
||||
io,
|
||||
pin::Pin,
|
||||
rc::Rc,
|
||||
str,
|
||||
task::{Context, Poll},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use actix_codec::Framed;
|
||||
use actix_rt::{pin, time::sleep};
|
||||
use actix_rt::{
|
||||
pin,
|
||||
time::{sleep, timeout},
|
||||
};
|
||||
use actix_service::{fn_service, Service};
|
||||
use actix_utils::future::{ready, Ready};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
|
|
@ -43,6 +49,111 @@ impl Service<Request> for YieldService {
|
|||
}
|
||||
}
|
||||
|
||||
struct ReadyChunkBody {
|
||||
chunk_polls: Rc<Cell<usize>>,
|
||||
remaining: usize,
|
||||
chunk_len: usize,
|
||||
}
|
||||
|
||||
impl ReadyChunkBody {
|
||||
fn new(chunk_polls: Rc<Cell<usize>>, remaining: usize, chunk_len: usize) -> Self {
|
||||
Self {
|
||||
chunk_polls,
|
||||
remaining,
|
||||
chunk_len,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MessageBody for ReadyChunkBody {
|
||||
type Error = Error;
|
||||
|
||||
fn size(&self) -> crate::body::BodySize {
|
||||
crate::body::BodySize::Stream
|
||||
}
|
||||
|
||||
fn poll_next(
|
||||
mut self: Pin<&mut Self>,
|
||||
_: &mut Context<'_>,
|
||||
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
||||
if self.remaining == 0 {
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
|
||||
self.remaining -= 1;
|
||||
self.chunk_polls.set(self.chunk_polls.get() + 1);
|
||||
|
||||
Poll::Ready(Some(Ok(Bytes::from(vec![b'x'; self.chunk_len]))))
|
||||
}
|
||||
}
|
||||
|
||||
struct PendingOnceWriteBuf {
|
||||
io: TestBuffer,
|
||||
block_next_write: bool,
|
||||
}
|
||||
|
||||
impl PendingOnceWriteBuf {
|
||||
fn new<T>(data: T) -> Self
|
||||
where
|
||||
T: Into<BytesMut>,
|
||||
{
|
||||
Self {
|
||||
io: TestBuffer::new(data),
|
||||
block_next_write: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl io::Read for PendingOnceWriteBuf {
|
||||
fn read(&mut self, dst: &mut [u8]) -> Result<usize, io::Error> {
|
||||
self.io.read(dst)
|
||||
}
|
||||
}
|
||||
|
||||
impl io::Write for PendingOnceWriteBuf {
|
||||
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
|
||||
self.io.write(buf)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> io::Result<()> {
|
||||
self.io.flush()
|
||||
}
|
||||
}
|
||||
|
||||
impl actix_codec::AsyncRead for PendingOnceWriteBuf {
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut actix_codec::ReadBuf<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
Pin::new(&mut self.io).poll_read(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl actix_codec::AsyncWrite for PendingOnceWriteBuf {
|
||||
fn poll_write(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
if self.block_next_write {
|
||||
self.block_next_write = false;
|
||||
cx.waker().wake_by_ref();
|
||||
return Poll::Pending;
|
||||
}
|
||||
|
||||
Pin::new(&mut self.io).poll_write(cx, buf)
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
Pin::new(&mut self.io).poll_flush(cx)
|
||||
}
|
||||
|
||||
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
Pin::new(&mut self.io).poll_shutdown(cx)
|
||||
}
|
||||
}
|
||||
|
||||
fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option<usize> {
|
||||
memchr::memmem::find(&haystack[from..], needle)
|
||||
}
|
||||
|
|
@ -84,6 +195,11 @@ fn drop_payload_service() -> impl Service<Request, Response = Response<&'static
|
|||
})
|
||||
}
|
||||
|
||||
fn ignore_payload_service(
|
||||
) -> impl Service<Request, Response = Response<&'static str>, Error = Error> {
|
||||
fn_service(|_req: Request| ready(Ok::<_, Error>(Response::with_body(StatusCode::OK, "ok"))))
|
||||
}
|
||||
|
||||
fn echo_payload_service() -> impl Service<Request, Response = Response<Bytes>, Error = Error> {
|
||||
fn_service(|mut req: Request| {
|
||||
Box::pin(async move {
|
||||
|
|
@ -100,6 +216,18 @@ fn echo_payload_service() -> impl Service<Request, Response = Response<Bytes>, E
|
|||
})
|
||||
}
|
||||
|
||||
fn ready_chunk_body_service(
|
||||
chunk_polls: Rc<Cell<usize>>,
|
||||
chunk_count: usize,
|
||||
chunk_len: usize,
|
||||
) -> impl Service<Request, Response = Response<ReadyChunkBody>, Error = Error> {
|
||||
fn_service(move |_req: Request| {
|
||||
ready(Ok::<_, Error>(Response::ok().set_body(
|
||||
ReadyChunkBody::new(chunk_polls.clone(), chunk_count, chunk_len),
|
||||
)))
|
||||
})
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn late_request() {
|
||||
let mut buf = TestBuffer::empty();
|
||||
|
|
@ -536,15 +664,14 @@ async fn pipelining_ok_then_ok() {
|
|||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn early_response_with_payload_closes_connection() {
|
||||
async fn early_response_with_payload_lingers_before_closing() {
|
||||
lazy(|cx| {
|
||||
let buf = TestBuffer::new(
|
||||
"\
|
||||
GET /unfinished HTTP/1.1\r\n\
|
||||
Content-Length: 2\r\n\
|
||||
\r\n\
|
||||
",
|
||||
);
|
||||
let buf = TestSeqBuffer::new(http_msg(
|
||||
r"
|
||||
GET /unfinished HTTP/1.1
|
||||
Content-Length: 2
|
||||
",
|
||||
));
|
||||
|
||||
let cfg = ServiceConfig::new(
|
||||
KeepAlive::Os,
|
||||
|
|
@ -569,39 +696,172 @@ async fn early_response_with_payload_closes_connection() {
|
|||
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
|
||||
|
||||
match h1.as_mut().poll(cx) {
|
||||
Poll::Pending => panic!("Should have shut down"),
|
||||
Poll::Ready(res) => assert!(res.is_ok()),
|
||||
Poll::Pending => {}
|
||||
Poll::Ready(res) => panic!("should still be lingering: {:?}", res),
|
||||
}
|
||||
|
||||
// polls: initial => shutdown
|
||||
assert_eq!(h1.poll_count, 2);
|
||||
// polls: initial
|
||||
assert_eq!(h1.poll_count, 1);
|
||||
|
||||
{
|
||||
let mut res = buf.write_buf_slice_mut();
|
||||
stabilize_date_header(&mut res);
|
||||
let res = &res[..];
|
||||
let mut res = buf.take_write_buf().to_vec();
|
||||
stabilize_date_header(&mut res);
|
||||
let res = &res[..];
|
||||
|
||||
let exp = b"\
|
||||
HTTP/1.1 200 OK\r\n\
|
||||
content-length: 11\r\n\
|
||||
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
|
||||
/unfinished\
|
||||
";
|
||||
let exp = b"\
|
||||
HTTP/1.1 200 OK\r\n\
|
||||
content-length: 11\r\n\
|
||||
connection: close\r\n\
|
||||
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
|
||||
/unfinished\
|
||||
";
|
||||
|
||||
assert_eq!(
|
||||
res,
|
||||
exp,
|
||||
"\nexpected response not in write buffer:\n\
|
||||
response: {:?}\n\
|
||||
expected: {:?}",
|
||||
String::from_utf8_lossy(res),
|
||||
String::from_utf8_lossy(exp)
|
||||
);
|
||||
}
|
||||
assert_eq!(
|
||||
res,
|
||||
exp,
|
||||
"\nexpected response not in write buffer:\n\
|
||||
response: {:?}\n\
|
||||
expected: {:?}",
|
||||
String::from_utf8_lossy(res),
|
||||
String::from_utf8_lossy(exp)
|
||||
);
|
||||
|
||||
buf.close_read();
|
||||
|
||||
assert!(h1.as_mut().poll(cx).is_pending());
|
||||
assert!(h1.as_mut().poll(cx).is_ready());
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn buffered_upload_ignored_by_handler_should_not_shutdown_immediately() {
|
||||
lazy(|cx| {
|
||||
let buf = TestSeqBuffer::new(http_msg(
|
||||
r"
|
||||
POST / HTTP/1.1
|
||||
Content-Length: 8
|
||||
|
||||
ab
|
||||
",
|
||||
));
|
||||
|
||||
let cfg = ServiceConfig::new(
|
||||
KeepAlive::Os,
|
||||
Duration::from_millis(1),
|
||||
Duration::from_millis(1),
|
||||
false,
|
||||
None,
|
||||
);
|
||||
|
||||
let services = HttpFlow::new(ignore_payload_service(), ExpectHandler, None);
|
||||
|
||||
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
|
||||
buf.clone(),
|
||||
services,
|
||||
cfg,
|
||||
None,
|
||||
OnConnectData::default(),
|
||||
);
|
||||
|
||||
pin!(h1);
|
||||
|
||||
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
|
||||
|
||||
match h1.as_mut().poll(cx) {
|
||||
Poll::Pending => {}
|
||||
Poll::Ready(res) => panic!("closed connection early: {:?}", res),
|
||||
}
|
||||
|
||||
let mut res = BytesMut::from(buf.take_write_buf().as_ref());
|
||||
stabilize_date_header(&mut res);
|
||||
let res = &res[..];
|
||||
|
||||
let exp = http_msg(
|
||||
r"
|
||||
HTTP/1.1 200 OK
|
||||
content-length: 2
|
||||
connection: close
|
||||
date: Thu, 01 Jan 1970 12:34:56 UTC
|
||||
|
||||
ok
|
||||
",
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
res,
|
||||
exp,
|
||||
"\nexpected response not in write buffer:\n\
|
||||
response: {:?}\n\
|
||||
expected: {:?}",
|
||||
String::from_utf8_lossy(res),
|
||||
String::from_utf8_lossy(&exp)
|
||||
);
|
||||
|
||||
buf.close_read();
|
||||
|
||||
assert!(h1.as_mut().poll(cx).is_pending());
|
||||
assert!(h1.as_mut().poll(cx).is_ready());
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn lingering_timeout_uses_graceful_shutdown() {
|
||||
let buf = TestSeqBuffer::new(
|
||||
"\
|
||||
POST / HTTP/1.1\r\n\
|
||||
Content-Length: 8\r\n\
|
||||
\r\n\
|
||||
ab\
|
||||
",
|
||||
);
|
||||
|
||||
let cfg = ServiceConfig::new(
|
||||
KeepAlive::Disabled,
|
||||
Duration::ZERO,
|
||||
Duration::from_millis(1),
|
||||
false,
|
||||
None,
|
||||
);
|
||||
|
||||
let services = HttpFlow::new(ignore_payload_service(), ExpectHandler, None);
|
||||
|
||||
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
|
||||
buf.clone(),
|
||||
services,
|
||||
cfg,
|
||||
None,
|
||||
OnConnectData::default(),
|
||||
);
|
||||
|
||||
assert!(matches!(
|
||||
timeout(Duration::from_millis(100), h1).await,
|
||||
Ok(Ok(()))
|
||||
));
|
||||
|
||||
let mut res = buf.take_write_buf().to_vec();
|
||||
stabilize_date_header(&mut res);
|
||||
let res = &res[..];
|
||||
|
||||
let exp = b"\
|
||||
HTTP/1.1 200 OK\r\n\
|
||||
content-length: 2\r\n\
|
||||
connection: close\r\n\
|
||||
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
|
||||
ok\
|
||||
";
|
||||
|
||||
assert_eq!(
|
||||
res,
|
||||
exp,
|
||||
"\nexpected response not in write buffer:\n\
|
||||
response: {:?}\n\
|
||||
expected: {:?}",
|
||||
String::from_utf8_lossy(res),
|
||||
String::from_utf8_lossy(exp)
|
||||
);
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn pipelining_ok_then_bad() {
|
||||
lazy(|cx| {
|
||||
|
|
@ -1224,6 +1484,58 @@ async fn disallow_half_closed() {
|
|||
assert!(matches!(inner.state, State::ServiceCall { .. }))
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn h1_write_buffer_size_limits_buffering() {
|
||||
let request = "GET /stream HTTP/1.1\r\nConnection: close\r\n\r\n";
|
||||
|
||||
let default_polls = Rc::new(Cell::new(0));
|
||||
let default_services = HttpFlow::new(
|
||||
ready_chunk_body_service(default_polls.clone(), 8, 1024),
|
||||
ExpectHandler,
|
||||
None::<UpgradeHandler>,
|
||||
);
|
||||
let default_io = PendingOnceWriteBuf::new(request);
|
||||
let default_dispatcher = Dispatcher::new(
|
||||
default_io,
|
||||
default_services,
|
||||
ServiceConfig::default(),
|
||||
None,
|
||||
OnConnectData::default(),
|
||||
);
|
||||
pin!(default_dispatcher);
|
||||
|
||||
let mut cx = Context::from_waker(futures_util::task::noop_waker_ref());
|
||||
assert!(default_dispatcher.as_mut().poll(&mut cx).is_pending());
|
||||
assert_eq!(default_polls.get(), 8);
|
||||
|
||||
let custom_polls = Rc::new(Cell::new(0));
|
||||
let custom_services = HttpFlow::new(
|
||||
ready_chunk_body_service(custom_polls.clone(), 8, 1024),
|
||||
ExpectHandler,
|
||||
None::<UpgradeHandler>,
|
||||
);
|
||||
let custom_io = PendingOnceWriteBuf::new(request);
|
||||
let custom_dispatcher = Dispatcher::new(
|
||||
custom_io,
|
||||
custom_services,
|
||||
crate::config::ServiceConfigBuilder::new()
|
||||
.h1_write_buffer_size(1024)
|
||||
.build(),
|
||||
None,
|
||||
OnConnectData::default(),
|
||||
);
|
||||
pin!(custom_dispatcher);
|
||||
|
||||
assert!(custom_dispatcher.as_mut().poll(&mut cx).is_pending());
|
||||
assert_eq!(custom_polls.get(), 1);
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
#[should_panic(expected = "HTTP/1 write buffer size must be greater than zero")]
|
||||
async fn h1_write_buffer_size_rejects_zero() {
|
||||
let _ = crate::config::ServiceConfigBuilder::new().h1_write_buffer_size(0);
|
||||
}
|
||||
|
||||
fn http_msg(msg: impl AsRef<str>) -> BytesMut {
|
||||
let mut msg = msg
|
||||
.as_ref()
|
||||
|
|
|
|||
|
|
@ -7,7 +7,9 @@
|
|||
- Panic when calling `Route::to()` or `Route::service()` after `Route::wrap()` to prevent silently dropping route middleware. [#3944]
|
||||
- Fix `HttpRequest::{match_pattern,match_name}` reporting path-only matches when route guards disambiguate overlapping resources. [#3346]
|
||||
- Fix `Readlines` handling of lines split across payload chunks so combined line limits are enforced and complete lines are yielded.
|
||||
- Update `foldhash` dependency to `0.2`.
|
||||
- Update `rand` dependency to `0.10`.
|
||||
- Add `HttpServer::h1_write_buffer_size()`.
|
||||
|
||||
[#3944]: https://github.com/actix/actix-web/pull/3944
|
||||
[#3346]: https://github.com/actix/actix-web/issues/3346
|
||||
|
|
|
|||
|
|
@ -137,7 +137,7 @@ actix-service = "2"
|
|||
actix-tls = { version = "3.4", default-features = false, optional = true }
|
||||
actix-utils = "3"
|
||||
|
||||
actix-http = "3.12.0"
|
||||
actix-http = "3.12.1"
|
||||
actix-router = { version = "0.5.4", default-features = false, features = ["http"] }
|
||||
actix-web-codegen = { version = "4.3", optional = true, default-features = false }
|
||||
|
||||
|
|
@ -147,7 +147,7 @@ cfg-if = "1"
|
|||
cookie = { version = "0.16", features = ["percent-encode"], optional = true }
|
||||
derive_more = { version = "2", features = ["as_ref", "deref", "deref_mut", "display", "error", "from"] }
|
||||
encoding_rs = "0.8"
|
||||
foldhash = "0.1"
|
||||
foldhash = "0.2"
|
||||
futures-core = { version = "0.3.17", default-features = false }
|
||||
futures-util = { version = "0.3.17", default-features = false }
|
||||
impl-more = "0.1.4"
|
||||
|
|
|
|||
|
|
@ -33,6 +33,7 @@ struct Config {
|
|||
client_request_timeout: Duration,
|
||||
client_disconnect_timeout: Duration,
|
||||
h1_allow_half_closed: bool,
|
||||
h1_write_buffer_size: Option<usize>,
|
||||
h2_initial_window_size: Option<u32>,
|
||||
h2_initial_connection_window_size: Option<u32>,
|
||||
#[allow(dead_code)] // only dead when no TLS features are enabled
|
||||
|
|
@ -122,6 +123,7 @@ where
|
|||
client_request_timeout: Duration::from_secs(5),
|
||||
client_disconnect_timeout: Duration::from_secs(1),
|
||||
h1_allow_half_closed: true,
|
||||
h1_write_buffer_size: None,
|
||||
h2_initial_window_size: None,
|
||||
h2_initial_connection_window_size: None,
|
||||
tls_handshake_timeout: None,
|
||||
|
|
@ -245,7 +247,7 @@ where
|
|||
///
|
||||
/// To disable timeout set value to 0.
|
||||
///
|
||||
/// By default client timeout is set to 5000 milliseconds.
|
||||
/// By default client timeout is set to 1000 milliseconds.
|
||||
pub fn client_disconnect_timeout(self, dur: Duration) -> Self {
|
||||
self.config.lock().unwrap().client_disconnect_timeout = dur;
|
||||
self
|
||||
|
|
@ -286,6 +288,25 @@ where
|
|||
self
|
||||
}
|
||||
|
||||
/// Sets the maximum response write buffer size for HTTP/1 connections.
|
||||
///
|
||||
/// Once the response buffer reaches this size, the dispatcher flushes it to the I/O stream.
|
||||
///
|
||||
/// The default value is 32 KiB.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if `size` is 0.
|
||||
pub fn h1_write_buffer_size(self, size: usize) -> Self {
|
||||
assert!(
|
||||
size > 0,
|
||||
"HTTP/1 write buffer size must be greater than zero"
|
||||
);
|
||||
|
||||
self.config.lock().unwrap().h1_write_buffer_size = Some(size);
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets initial stream-level flow control window size for HTTP/2 connections.
|
||||
///
|
||||
/// Higher values can improve upload performance on high-latency links at the cost of higher
|
||||
|
|
@ -629,6 +650,10 @@ where
|
|||
svc = svc.tcp_nodelay(enabled);
|
||||
}
|
||||
|
||||
if let Some(size) = cfg.h1_write_buffer_size {
|
||||
svc = svc.h1_write_buffer_size(size);
|
||||
}
|
||||
|
||||
if let Some(val) = cfg.h2_initial_window_size {
|
||||
svc = svc.h2_initial_window_size(val);
|
||||
}
|
||||
|
|
@ -686,6 +711,10 @@ where
|
|||
svc = svc.tcp_nodelay(enabled);
|
||||
}
|
||||
|
||||
if let Some(size) = cfg.h1_write_buffer_size {
|
||||
svc = svc.h1_write_buffer_size(size);
|
||||
}
|
||||
|
||||
if let Some(val) = cfg.h2_initial_window_size {
|
||||
svc = svc.h2_initial_window_size(val);
|
||||
}
|
||||
|
|
@ -774,6 +803,10 @@ where
|
|||
svc = svc.tcp_nodelay(enabled);
|
||||
}
|
||||
|
||||
if let Some(size) = c.h1_write_buffer_size {
|
||||
svc = svc.h1_write_buffer_size(size);
|
||||
}
|
||||
|
||||
if let Some(val) = c.h2_initial_window_size {
|
||||
svc = svc.h2_initial_window_size(val);
|
||||
}
|
||||
|
|
@ -837,6 +870,10 @@ where
|
|||
svc = svc.tcp_nodelay(enabled);
|
||||
}
|
||||
|
||||
if let Some(size) = c.h1_write_buffer_size {
|
||||
svc = svc.h1_write_buffer_size(size);
|
||||
}
|
||||
|
||||
if let Some(val) = c.h2_initial_window_size {
|
||||
svc = svc.h2_initial_window_size(val);
|
||||
}
|
||||
|
|
@ -915,6 +952,10 @@ where
|
|||
svc = svc.tcp_nodelay(enabled);
|
||||
}
|
||||
|
||||
if let Some(size) = c.h1_write_buffer_size {
|
||||
svc = svc.h1_write_buffer_size(size);
|
||||
}
|
||||
|
||||
if let Some(val) = c.h2_initial_window_size {
|
||||
svc = svc.h2_initial_window_size(val);
|
||||
}
|
||||
|
|
@ -993,6 +1034,10 @@ where
|
|||
svc = svc.tcp_nodelay(enabled);
|
||||
}
|
||||
|
||||
if let Some(size) = c.h1_write_buffer_size {
|
||||
svc = svc.h1_write_buffer_size(size);
|
||||
}
|
||||
|
||||
if let Some(val) = c.h2_initial_window_size {
|
||||
svc = svc.h2_initial_window_size(val);
|
||||
}
|
||||
|
|
@ -1072,6 +1117,10 @@ where
|
|||
svc = svc.tcp_nodelay(enabled);
|
||||
}
|
||||
|
||||
if let Some(size) = c.h1_write_buffer_size {
|
||||
svc = svc.h1_write_buffer_size(size);
|
||||
}
|
||||
|
||||
if let Some(val) = c.h2_initial_window_size {
|
||||
svc = svc.h2_initial_window_size(val);
|
||||
}
|
||||
|
|
@ -1140,14 +1189,19 @@ where
|
|||
.into_factory()
|
||||
.map_err(|err| err.into().error_response());
|
||||
|
||||
fn_service(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) }).and_then(
|
||||
HttpService::build()
|
||||
fn_service(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) }).and_then({
|
||||
let mut svc = HttpService::build()
|
||||
.keep_alive(c.keep_alive)
|
||||
.client_request_timeout(c.client_request_timeout)
|
||||
.client_disconnect_timeout(c.client_disconnect_timeout)
|
||||
.h1_allow_half_closed(c.h1_allow_half_closed)
|
||||
.finish(map_config(fac, move |_| config.clone())),
|
||||
)
|
||||
.h1_allow_half_closed(c.h1_allow_half_closed);
|
||||
|
||||
if let Some(size) = c.h1_write_buffer_size {
|
||||
svc = svc.h1_write_buffer_size(size);
|
||||
}
|
||||
|
||||
svc.finish(map_config(fac, move |_| config.clone()))
|
||||
})
|
||||
},
|
||||
)?;
|
||||
|
||||
|
|
@ -1194,6 +1248,10 @@ where
|
|||
svc = svc.on_connect_ext(move |io: &_, ext: _| (handler)(io as &dyn Any, ext));
|
||||
}
|
||||
|
||||
if let Some(size) = c.h1_write_buffer_size {
|
||||
svc = svc.h1_write_buffer_size(size);
|
||||
}
|
||||
|
||||
let fac = factory()
|
||||
.into_factory()
|
||||
.map_err(|err| err.into().error_response());
|
||||
|
|
|
|||
|
|
@ -98,7 +98,7 @@ dangerous-h2c = []
|
|||
|
||||
[dependencies]
|
||||
actix-codec = "0.5"
|
||||
actix-http = { version = "3.12.0", features = ["http2", "ws"] }
|
||||
actix-http = { version = "3.12.1", features = ["http2", "ws"] }
|
||||
actix-rt = { version = "2.1", default-features = false }
|
||||
actix-service = "2"
|
||||
actix-tls = { version = "3.4", features = ["connect", "uri"] }
|
||||
|
|
|
|||
Loading…
Reference in New Issue