mirror of https://github.com/fafhrd91/actix-web
Compare commits
6 Commits
993e737b57
...
5141a8f04b
Author | SHA1 | Date |
---|---|---|
|
5141a8f04b | |
|
616aed669e | |
|
5041cd1c65 | |
|
d3c46537b3 | |
|
9320df6339 | |
|
ee7e37c62f |
|
@ -2,6 +2,8 @@
|
||||||
|
|
||||||
## Unreleased
|
## Unreleased
|
||||||
|
|
||||||
|
- Properly wake Payload receivers when feeding errors or EOF
|
||||||
|
|
||||||
## 3.11.1
|
## 3.11.1
|
||||||
|
|
||||||
- Prevent more hangs after client disconnects.
|
- Prevent more hangs after client disconnects.
|
||||||
|
|
|
@ -156,7 +156,7 @@ serde_json = "1.0"
|
||||||
static_assertions = "1"
|
static_assertions = "1"
|
||||||
tls-openssl = { package = "openssl", version = "0.10.55" }
|
tls-openssl = { package = "openssl", version = "0.10.55" }
|
||||||
tls-rustls_023 = { package = "rustls", version = "0.23" }
|
tls-rustls_023 = { package = "rustls", version = "0.23" }
|
||||||
tokio = { version = "1.38.2", features = ["net", "rt", "macros"] }
|
tokio = { version = "1.38.2", features = ["net", "rt", "macros", "sync"] }
|
||||||
|
|
||||||
[lints]
|
[lints]
|
||||||
workspace = true
|
workspace = true
|
||||||
|
|
|
@ -17,6 +17,7 @@ pub struct HttpServiceBuilder<T, S, X = ExpectHandler, U = UpgradeHandler> {
|
||||||
keep_alive: KeepAlive,
|
keep_alive: KeepAlive,
|
||||||
client_request_timeout: Duration,
|
client_request_timeout: Duration,
|
||||||
client_disconnect_timeout: Duration,
|
client_disconnect_timeout: Duration,
|
||||||
|
max_buffer_size: Option<usize>,
|
||||||
secure: bool,
|
secure: bool,
|
||||||
local_addr: Option<net::SocketAddr>,
|
local_addr: Option<net::SocketAddr>,
|
||||||
expect: X,
|
expect: X,
|
||||||
|
@ -38,6 +39,7 @@ where
|
||||||
keep_alive: KeepAlive::default(),
|
keep_alive: KeepAlive::default(),
|
||||||
client_request_timeout: Duration::from_secs(5),
|
client_request_timeout: Duration::from_secs(5),
|
||||||
client_disconnect_timeout: Duration::ZERO,
|
client_disconnect_timeout: Duration::ZERO,
|
||||||
|
max_buffer_size: None,
|
||||||
secure: false,
|
secure: false,
|
||||||
local_addr: None,
|
local_addr: None,
|
||||||
|
|
||||||
|
@ -124,6 +126,15 @@ where
|
||||||
self.client_disconnect_timeout(dur)
|
self.client_disconnect_timeout(dur)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set maximum buffer size.
|
||||||
|
///
|
||||||
|
/// Defines the maximum size of the buffer. When the size is reached, the dispatcher
|
||||||
|
/// will flush the data to the IO streams
|
||||||
|
pub fn max_buffer_size(mut self, size: usize) -> Self {
|
||||||
|
self.max_buffer_size = Some(size);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Provide service for `EXPECT: 100-Continue` support.
|
/// Provide service for `EXPECT: 100-Continue` support.
|
||||||
///
|
///
|
||||||
/// Service get called with request that contains `EXPECT` header.
|
/// Service get called with request that contains `EXPECT` header.
|
||||||
|
@ -140,6 +151,7 @@ where
|
||||||
keep_alive: self.keep_alive,
|
keep_alive: self.keep_alive,
|
||||||
client_request_timeout: self.client_request_timeout,
|
client_request_timeout: self.client_request_timeout,
|
||||||
client_disconnect_timeout: self.client_disconnect_timeout,
|
client_disconnect_timeout: self.client_disconnect_timeout,
|
||||||
|
max_buffer_size: self.max_buffer_size,
|
||||||
secure: self.secure,
|
secure: self.secure,
|
||||||
local_addr: self.local_addr,
|
local_addr: self.local_addr,
|
||||||
expect: expect.into_factory(),
|
expect: expect.into_factory(),
|
||||||
|
@ -164,6 +176,7 @@ where
|
||||||
keep_alive: self.keep_alive,
|
keep_alive: self.keep_alive,
|
||||||
client_request_timeout: self.client_request_timeout,
|
client_request_timeout: self.client_request_timeout,
|
||||||
client_disconnect_timeout: self.client_disconnect_timeout,
|
client_disconnect_timeout: self.client_disconnect_timeout,
|
||||||
|
max_buffer_size: self.max_buffer_size,
|
||||||
secure: self.secure,
|
secure: self.secure,
|
||||||
local_addr: self.local_addr,
|
local_addr: self.local_addr,
|
||||||
expect: self.expect,
|
expect: self.expect,
|
||||||
|
@ -199,6 +212,7 @@ where
|
||||||
self.keep_alive,
|
self.keep_alive,
|
||||||
self.client_request_timeout,
|
self.client_request_timeout,
|
||||||
self.client_disconnect_timeout,
|
self.client_disconnect_timeout,
|
||||||
|
self.max_buffer_size,
|
||||||
self.secure,
|
self.secure,
|
||||||
self.local_addr,
|
self.local_addr,
|
||||||
);
|
);
|
||||||
|
@ -224,6 +238,7 @@ where
|
||||||
self.keep_alive,
|
self.keep_alive,
|
||||||
self.client_request_timeout,
|
self.client_request_timeout,
|
||||||
self.client_disconnect_timeout,
|
self.client_disconnect_timeout,
|
||||||
|
self.max_buffer_size,
|
||||||
self.secure,
|
self.secure,
|
||||||
self.local_addr,
|
self.local_addr,
|
||||||
);
|
);
|
||||||
|
@ -246,6 +261,7 @@ where
|
||||||
self.keep_alive,
|
self.keep_alive,
|
||||||
self.client_request_timeout,
|
self.client_request_timeout,
|
||||||
self.client_disconnect_timeout,
|
self.client_disconnect_timeout,
|
||||||
|
self.max_buffer_size,
|
||||||
self.secure,
|
self.secure,
|
||||||
self.local_addr,
|
self.local_addr,
|
||||||
);
|
);
|
||||||
|
|
|
@ -17,6 +17,7 @@ struct Inner {
|
||||||
keep_alive: KeepAlive,
|
keep_alive: KeepAlive,
|
||||||
client_request_timeout: Duration,
|
client_request_timeout: Duration,
|
||||||
client_disconnect_timeout: Duration,
|
client_disconnect_timeout: Duration,
|
||||||
|
max_buffer_size: Option<usize>,
|
||||||
secure: bool,
|
secure: bool,
|
||||||
local_addr: Option<std::net::SocketAddr>,
|
local_addr: Option<std::net::SocketAddr>,
|
||||||
date_service: DateService,
|
date_service: DateService,
|
||||||
|
@ -28,6 +29,7 @@ impl Default for ServiceConfig {
|
||||||
KeepAlive::default(),
|
KeepAlive::default(),
|
||||||
Duration::from_secs(5),
|
Duration::from_secs(5),
|
||||||
Duration::ZERO,
|
Duration::ZERO,
|
||||||
|
None,
|
||||||
false,
|
false,
|
||||||
None,
|
None,
|
||||||
)
|
)
|
||||||
|
@ -40,6 +42,7 @@ impl ServiceConfig {
|
||||||
keep_alive: KeepAlive,
|
keep_alive: KeepAlive,
|
||||||
client_request_timeout: Duration,
|
client_request_timeout: Duration,
|
||||||
client_disconnect_timeout: Duration,
|
client_disconnect_timeout: Duration,
|
||||||
|
max_buffer_size: Option<usize>,
|
||||||
secure: bool,
|
secure: bool,
|
||||||
local_addr: Option<net::SocketAddr>,
|
local_addr: Option<net::SocketAddr>,
|
||||||
) -> ServiceConfig {
|
) -> ServiceConfig {
|
||||||
|
@ -47,6 +50,7 @@ impl ServiceConfig {
|
||||||
keep_alive: keep_alive.normalize(),
|
keep_alive: keep_alive.normalize(),
|
||||||
client_request_timeout,
|
client_request_timeout,
|
||||||
client_disconnect_timeout,
|
client_disconnect_timeout,
|
||||||
|
max_buffer_size,
|
||||||
secure,
|
secure,
|
||||||
local_addr,
|
local_addr,
|
||||||
date_service: DateService::new(),
|
date_service: DateService::new(),
|
||||||
|
@ -104,6 +108,10 @@ impl ServiceConfig {
|
||||||
self.0.date_service.now()
|
self.0.date_service.now()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn max_buffer_size(&self) -> Option<usize> {
|
||||||
|
self.0.max_buffer_size
|
||||||
|
}
|
||||||
|
|
||||||
/// Writes date header to `dst` buffer.
|
/// Writes date header to `dst` buffer.
|
||||||
///
|
///
|
||||||
/// Low-level method that utilizes the built-in efficient date service, requiring fewer syscalls
|
/// Low-level method that utilizes the built-in efficient date service, requiring fewer syscalls
|
||||||
|
@ -143,8 +151,14 @@ mod tests {
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_date_service_update() {
|
async fn test_date_service_update() {
|
||||||
let settings =
|
let settings = ServiceConfig::new(
|
||||||
ServiceConfig::new(KeepAlive::Os, Duration::ZERO, Duration::ZERO, false, None);
|
KeepAlive::Os,
|
||||||
|
Duration::ZERO,
|
||||||
|
Duration::ZERO,
|
||||||
|
None,
|
||||||
|
false,
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
|
||||||
yield_now().await;
|
yield_now().await;
|
||||||
|
|
||||||
|
|
|
@ -166,6 +166,7 @@ pin_project! {
|
||||||
pub(super) io: Option<T>,
|
pub(super) io: Option<T>,
|
||||||
read_buf: BytesMut,
|
read_buf: BytesMut,
|
||||||
write_buf: BytesMut,
|
write_buf: BytesMut,
|
||||||
|
max_buffer_size: usize,
|
||||||
codec: Codec,
|
codec: Codec,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -278,6 +279,7 @@ where
|
||||||
io: Some(io),
|
io: Some(io),
|
||||||
read_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
|
read_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
|
||||||
write_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
|
write_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
|
||||||
|
max_buffer_size: config.max_buffer_size().unwrap_or(MAX_BUFFER_SIZE),
|
||||||
codec: Codec::new(config),
|
codec: Codec::new(config),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -493,7 +495,7 @@ where
|
||||||
StateProj::SendPayload { mut body } => {
|
StateProj::SendPayload { mut body } => {
|
||||||
// keep populate writer buffer until buffer size limit hit,
|
// keep populate writer buffer until buffer size limit hit,
|
||||||
// get blocked or finished.
|
// get blocked or finished.
|
||||||
while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE {
|
while this.write_buf.len() < *this.max_buffer_size {
|
||||||
match body.as_mut().poll_next(cx) {
|
match body.as_mut().poll_next(cx) {
|
||||||
Poll::Ready(Some(Ok(item))) => {
|
Poll::Ready(Some(Ok(item))) => {
|
||||||
this.codec
|
this.codec
|
||||||
|
@ -532,7 +534,7 @@ where
|
||||||
|
|
||||||
// keep populate writer buffer until buffer size limit hit,
|
// keep populate writer buffer until buffer size limit hit,
|
||||||
// get blocked or finished.
|
// get blocked or finished.
|
||||||
while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE {
|
while this.write_buf.len() < *this.max_buffer_size {
|
||||||
match body.as_mut().poll_next(cx) {
|
match body.as_mut().poll_next(cx) {
|
||||||
Poll::Ready(Some(Ok(item))) => {
|
Poll::Ready(Some(Ok(item))) => {
|
||||||
this.codec
|
this.codec
|
||||||
|
|
|
@ -82,6 +82,7 @@ async fn late_request() {
|
||||||
KeepAlive::Disabled,
|
KeepAlive::Disabled,
|
||||||
Duration::from_millis(100),
|
Duration::from_millis(100),
|
||||||
Duration::ZERO,
|
Duration::ZERO,
|
||||||
|
None,
|
||||||
false,
|
false,
|
||||||
None,
|
None,
|
||||||
);
|
);
|
||||||
|
@ -149,6 +150,7 @@ async fn oneshot_connection() {
|
||||||
KeepAlive::Disabled,
|
KeepAlive::Disabled,
|
||||||
Duration::from_millis(100),
|
Duration::from_millis(100),
|
||||||
Duration::ZERO,
|
Duration::ZERO,
|
||||||
|
None,
|
||||||
false,
|
false,
|
||||||
None,
|
None,
|
||||||
);
|
);
|
||||||
|
@ -210,6 +212,7 @@ async fn keep_alive_timeout() {
|
||||||
KeepAlive::Timeout(Duration::from_millis(200)),
|
KeepAlive::Timeout(Duration::from_millis(200)),
|
||||||
Duration::from_millis(100),
|
Duration::from_millis(100),
|
||||||
Duration::ZERO,
|
Duration::ZERO,
|
||||||
|
None,
|
||||||
false,
|
false,
|
||||||
None,
|
None,
|
||||||
);
|
);
|
||||||
|
@ -289,6 +292,7 @@ async fn keep_alive_follow_up_req() {
|
||||||
KeepAlive::Timeout(Duration::from_millis(500)),
|
KeepAlive::Timeout(Duration::from_millis(500)),
|
||||||
Duration::from_millis(100),
|
Duration::from_millis(100),
|
||||||
Duration::ZERO,
|
Duration::ZERO,
|
||||||
|
None,
|
||||||
false,
|
false,
|
||||||
None,
|
None,
|
||||||
);
|
);
|
||||||
|
@ -453,6 +457,7 @@ async fn pipelining_ok_then_ok() {
|
||||||
KeepAlive::Disabled,
|
KeepAlive::Disabled,
|
||||||
Duration::from_millis(1),
|
Duration::from_millis(1),
|
||||||
Duration::from_millis(1),
|
Duration::from_millis(1),
|
||||||
|
None,
|
||||||
false,
|
false,
|
||||||
None,
|
None,
|
||||||
);
|
);
|
||||||
|
@ -523,6 +528,7 @@ async fn pipelining_ok_then_bad() {
|
||||||
KeepAlive::Disabled,
|
KeepAlive::Disabled,
|
||||||
Duration::from_millis(1),
|
Duration::from_millis(1),
|
||||||
Duration::from_millis(1),
|
Duration::from_millis(1),
|
||||||
|
None,
|
||||||
false,
|
false,
|
||||||
None,
|
None,
|
||||||
);
|
);
|
||||||
|
@ -586,6 +592,7 @@ async fn expect_handling() {
|
||||||
KeepAlive::Disabled,
|
KeepAlive::Disabled,
|
||||||
Duration::ZERO,
|
Duration::ZERO,
|
||||||
Duration::ZERO,
|
Duration::ZERO,
|
||||||
|
None,
|
||||||
false,
|
false,
|
||||||
None,
|
None,
|
||||||
);
|
);
|
||||||
|
@ -663,6 +670,7 @@ async fn expect_eager() {
|
||||||
KeepAlive::Disabled,
|
KeepAlive::Disabled,
|
||||||
Duration::ZERO,
|
Duration::ZERO,
|
||||||
Duration::ZERO,
|
Duration::ZERO,
|
||||||
|
None,
|
||||||
false,
|
false,
|
||||||
None,
|
None,
|
||||||
);
|
);
|
||||||
|
@ -746,6 +754,7 @@ async fn upgrade_handling() {
|
||||||
KeepAlive::Disabled,
|
KeepAlive::Disabled,
|
||||||
Duration::ZERO,
|
Duration::ZERO,
|
||||||
Duration::ZERO,
|
Duration::ZERO,
|
||||||
|
None,
|
||||||
false,
|
false,
|
||||||
None,
|
None,
|
||||||
);
|
);
|
||||||
|
|
|
@ -200,11 +200,13 @@ impl Inner {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn set_error(&mut self, err: PayloadError) {
|
fn set_error(&mut self, err: PayloadError) {
|
||||||
self.err = Some(err);
|
self.err = Some(err);
|
||||||
|
self.wake();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn feed_eof(&mut self) {
|
fn feed_eof(&mut self) {
|
||||||
self.eof = true;
|
self.eof = true;
|
||||||
|
self.wake();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
|
@ -253,8 +255,13 @@ impl Inner {
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
use std::{task::Poll, time::Duration};
|
||||||
|
|
||||||
|
use actix_rt::time::timeout;
|
||||||
use actix_utils::future::poll_fn;
|
use actix_utils::future::poll_fn;
|
||||||
|
use futures_util::{FutureExt, StreamExt};
|
||||||
use static_assertions::{assert_impl_all, assert_not_impl_any};
|
use static_assertions::{assert_impl_all, assert_not_impl_any};
|
||||||
|
use tokio::sync::oneshot;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
|
@ -263,6 +270,67 @@ mod tests {
|
||||||
|
|
||||||
assert_impl_all!(Inner: Unpin, Send, Sync);
|
assert_impl_all!(Inner: Unpin, Send, Sync);
|
||||||
|
|
||||||
|
const WAKE_TIMEOUT: Duration = Duration::from_secs(2);
|
||||||
|
|
||||||
|
fn prepare_waking_test(
|
||||||
|
mut payload: Payload,
|
||||||
|
expected: Option<Result<(), ()>>,
|
||||||
|
) -> (oneshot::Receiver<()>, actix_rt::task::JoinHandle<()>) {
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
|
let handle = actix_rt::spawn(async move {
|
||||||
|
// Make sure to poll once to set the waker
|
||||||
|
poll_fn(|cx| {
|
||||||
|
assert!(payload.poll_next_unpin(cx).is_pending());
|
||||||
|
Poll::Ready(())
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
tx.send(()).unwrap();
|
||||||
|
|
||||||
|
// actix-rt is single-threaded, so this won't race with `rx.await`
|
||||||
|
let mut pend_once = false;
|
||||||
|
poll_fn(|_| {
|
||||||
|
if pend_once {
|
||||||
|
Poll::Ready(())
|
||||||
|
} else {
|
||||||
|
// Return pending without storing wakers, we already did on the previous
|
||||||
|
// `poll_fn`, now this task will only continue if the `sender` wakes us
|
||||||
|
pend_once = true;
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let got = payload.next().now_or_never().unwrap();
|
||||||
|
match expected {
|
||||||
|
Some(Ok(_)) => assert!(got.unwrap().is_ok()),
|
||||||
|
Some(Err(_)) => assert!(got.unwrap().is_err()),
|
||||||
|
None => assert!(got.is_none()),
|
||||||
|
}
|
||||||
|
});
|
||||||
|
(rx, handle)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn wake_on_error() {
|
||||||
|
let (mut sender, payload) = Payload::create(false);
|
||||||
|
let (rx, handle) = prepare_waking_test(payload, Some(Err(())));
|
||||||
|
|
||||||
|
rx.await.unwrap();
|
||||||
|
sender.set_error(PayloadError::Incomplete(None));
|
||||||
|
timeout(WAKE_TIMEOUT, handle).await.unwrap().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn wake_on_eof() {
|
||||||
|
let (mut sender, payload) = Payload::create(false);
|
||||||
|
let (rx, handle) = prepare_waking_test(payload, None);
|
||||||
|
|
||||||
|
rx.await.unwrap();
|
||||||
|
sender.feed_eof();
|
||||||
|
timeout(WAKE_TIMEOUT, handle).await.unwrap().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
async fn test_unread_data() {
|
async fn test_unread_data() {
|
||||||
let (_, mut payload) = Payload::create(false);
|
let (_, mut payload) = Payload::create(false);
|
||||||
|
|
|
@ -24,7 +24,7 @@ allowed_external_types = [
|
||||||
actix = { version = ">=0.12, <0.14", default-features = false }
|
actix = { version = ">=0.12, <0.14", default-features = false }
|
||||||
actix-codec = "0.5"
|
actix-codec = "0.5"
|
||||||
actix-http = "3"
|
actix-http = "3"
|
||||||
actix-web = { version = "4", default-features = false }
|
actix-web = { version = "4", default-features = false, features = ["ws"] }
|
||||||
|
|
||||||
bytes = "1"
|
bytes = "1"
|
||||||
bytestring = "1"
|
bytestring = "1"
|
||||||
|
|
|
@ -4,6 +4,7 @@
|
||||||
|
|
||||||
- `actix_web::response::builder::HttpResponseBuilder::streaming()` now sets `Content-Type` to `application/octet-stream` if `Content-Type` does not exist.
|
- `actix_web::response::builder::HttpResponseBuilder::streaming()` now sets `Content-Type` to `application/octet-stream` if `Content-Type` does not exist.
|
||||||
- `actix_web::response::builder::HttpResponseBuilder::streaming()` now calls `actix_web::response::builder::HttpResponseBuilder::no_chunking()` if `Content-Length` is set by user.
|
- `actix_web::response::builder::HttpResponseBuilder::streaming()` now calls `actix_web::response::builder::HttpResponseBuilder::no_chunking()` if `Content-Length` is set by user.
|
||||||
|
- Add `ws` crate feature (on-by-default) which forwards to `actix-http` and guards some of its `ResponseError` impls.
|
||||||
|
|
||||||
## 4.11.0
|
## 4.11.0
|
||||||
|
|
||||||
|
|
|
@ -67,6 +67,7 @@ default = [
|
||||||
"http2",
|
"http2",
|
||||||
"unicode",
|
"unicode",
|
||||||
"compat",
|
"compat",
|
||||||
|
"ws",
|
||||||
]
|
]
|
||||||
|
|
||||||
# Brotli algorithm content-encoding support
|
# Brotli algorithm content-encoding support
|
||||||
|
@ -85,9 +86,12 @@ cookies = ["dep:cookie"]
|
||||||
# Secure & signed cookies
|
# Secure & signed cookies
|
||||||
secure-cookies = ["cookies", "cookie/secure"]
|
secure-cookies = ["cookies", "cookie/secure"]
|
||||||
|
|
||||||
# HTTP/2 support (including h2c).
|
# HTTP/2 support (including h2c)
|
||||||
http2 = ["actix-http/http2"]
|
http2 = ["actix-http/http2"]
|
||||||
|
|
||||||
|
# WebSocket support
|
||||||
|
ws = ["actix-http/ws"]
|
||||||
|
|
||||||
# TLS via OpenSSL
|
# TLS via OpenSSL
|
||||||
openssl = ["__tls", "http2", "actix-http/openssl", "actix-tls/accept", "actix-tls/openssl"]
|
openssl = ["__tls", "http2", "actix-http/openssl", "actix-tls/accept", "actix-tls/openssl"]
|
||||||
|
|
||||||
|
@ -131,7 +135,7 @@ actix-service = "2"
|
||||||
actix-tls = { version = "3.4", default-features = false, optional = true }
|
actix-tls = { version = "3.4", default-features = false, optional = true }
|
||||||
actix-utils = "3"
|
actix-utils = "3"
|
||||||
|
|
||||||
actix-http = { version = "3.11", features = ["ws"] }
|
actix-http = "3.11"
|
||||||
actix-router = { version = "0.5.3", default-features = false, features = ["http"] }
|
actix-router = { version = "0.5.3", default-features = false, features = ["http"] }
|
||||||
actix-web-codegen = { version = "4.3", optional = true, default-features = false }
|
actix-web-codegen = { version = "4.3", optional = true, default-features = false }
|
||||||
|
|
||||||
|
|
|
@ -7,7 +7,6 @@ use std::{
|
||||||
io::{self, Write as _},
|
io::{self, Write as _},
|
||||||
};
|
};
|
||||||
|
|
||||||
use actix_http::Response;
|
|
||||||
use bytes::BytesMut;
|
use bytes::BytesMut;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
@ -126,20 +125,24 @@ impl ResponseError for actix_http::error::PayloadError {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ResponseError for actix_http::ws::ProtocolError {}
|
|
||||||
|
|
||||||
impl ResponseError for actix_http::error::ContentTypeError {
|
impl ResponseError for actix_http::error::ContentTypeError {
|
||||||
fn status_code(&self) -> StatusCode {
|
fn status_code(&self) -> StatusCode {
|
||||||
StatusCode::BAD_REQUEST
|
StatusCode::BAD_REQUEST
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "ws")]
|
||||||
impl ResponseError for actix_http::ws::HandshakeError {
|
impl ResponseError for actix_http::ws::HandshakeError {
|
||||||
fn error_response(&self) -> HttpResponse<BoxBody> {
|
fn error_response(&self) -> HttpResponse<BoxBody> {
|
||||||
Response::from(self).map_into_boxed_body().into()
|
actix_http::Response::from(self)
|
||||||
|
.map_into_boxed_body()
|
||||||
|
.into()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "ws")]
|
||||||
|
impl ResponseError for actix_http::ws::ProtocolError {}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
|
@ -31,6 +31,7 @@ struct Config {
|
||||||
keep_alive: KeepAlive,
|
keep_alive: KeepAlive,
|
||||||
client_request_timeout: Duration,
|
client_request_timeout: Duration,
|
||||||
client_disconnect_timeout: Duration,
|
client_disconnect_timeout: Duration,
|
||||||
|
max_buffer_size: Option<usize>,
|
||||||
#[allow(dead_code)] // only dead when no TLS features are enabled
|
#[allow(dead_code)] // only dead when no TLS features are enabled
|
||||||
tls_handshake_timeout: Option<Duration>,
|
tls_handshake_timeout: Option<Duration>,
|
||||||
}
|
}
|
||||||
|
@ -116,6 +117,7 @@ where
|
||||||
keep_alive: KeepAlive::default(),
|
keep_alive: KeepAlive::default(),
|
||||||
client_request_timeout: Duration::from_secs(5),
|
client_request_timeout: Duration::from_secs(5),
|
||||||
client_disconnect_timeout: Duration::from_secs(1),
|
client_disconnect_timeout: Duration::from_secs(1),
|
||||||
|
max_buffer_size: None,
|
||||||
tls_handshake_timeout: None,
|
tls_handshake_timeout: None,
|
||||||
})),
|
})),
|
||||||
backlog: 1024,
|
backlog: 1024,
|
||||||
|
@ -234,6 +236,15 @@ where
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set maximum buffer size.
|
||||||
|
///
|
||||||
|
/// Defines the maximum size of the write buffer. When the size is reached, the dispatcher
|
||||||
|
/// will flush the data to the IO streams
|
||||||
|
pub fn max_buffer_size(self, size: usize) -> Self {
|
||||||
|
self.config.lock().unwrap().max_buffer_size = Some(size);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Sets TLS handshake timeout.
|
/// Sets TLS handshake timeout.
|
||||||
///
|
///
|
||||||
/// Defines a timeout for TLS handshake. If the TLS handshake does not complete within this
|
/// Defines a timeout for TLS handshake. If the TLS handshake does not complete within this
|
||||||
|
@ -560,6 +571,10 @@ where
|
||||||
.client_disconnect_timeout(cfg.client_disconnect_timeout)
|
.client_disconnect_timeout(cfg.client_disconnect_timeout)
|
||||||
.local_addr(addr);
|
.local_addr(addr);
|
||||||
|
|
||||||
|
if let Some(size) = cfg.max_buffer_size {
|
||||||
|
svc = svc.max_buffer_size(size);
|
||||||
|
};
|
||||||
|
|
||||||
if let Some(handler) = on_connect_fn.clone() {
|
if let Some(handler) = on_connect_fn.clone() {
|
||||||
svc =
|
svc =
|
||||||
svc.on_connect_ext(move |io: &_, ext: _| (handler)(io as &dyn Any, ext))
|
svc.on_connect_ext(move |io: &_, ext: _| (handler)(io as &dyn Any, ext))
|
||||||
|
|
Loading…
Reference in New Issue