Merge branch 'master' into router/rework

This commit is contained in:
Rob Ede 2022-01-31 21:27:31 +00:00 committed by GitHub
commit 4c8cd8a8e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
58 changed files with 1594 additions and 862 deletions

View File

@ -46,3 +46,21 @@ jobs:
with: with:
token: ${{ secrets.GITHUB_TOKEN }} token: ${{ secrets.GITHUB_TOKEN }}
args: --workspace --tests --examples --all-features args: --workspace --tests --examples --all-features
lint-docs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: stable
profile: minimal
components: rust-docs
- name: Check for broken intra-doc links
uses: actions-rs/cargo@v1
env:
RUSTDOCFLAGS: "-D warnings"
with:
command: doc
args: --no-deps --all-features --workspace

View File

@ -1,10 +1,15 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
### Changed
- Rename `HttpServer::{client_timeout => client_request_timeout}`. [#2611]
- Rename `HttpServer::{client_shutdown => client_disconnect_timeout}`. [#2611]
### Removed ### Removed
- `impl Future for HttpResponse`. [#2601] - `impl Future for HttpResponse`. [#2601]
[#2601]: https://github.com/actix/actix-web/pull/2601 [#2601]: https://github.com/actix/actix-web/pull/2601
[#2611]: https://github.com/actix/actix-web/pull/2611
## 4.0.0-beta.21 - 2022-01-21 ## 4.0.0-beta.21 - 2022-01-21

View File

@ -80,7 +80,7 @@ actix-service = "2.0.0"
actix-utils = "3.0.0" actix-utils = "3.0.0"
actix-tls = { version = "3.0.0", default-features = false, optional = true } actix-tls = { version = "3.0.0", default-features = false, optional = true }
actix-http = "3.0.0-beta.19" actix-http = { version = "3.0.0-beta.19", features = ["http2", "ws"] }
actix-router = "0.5.0-rc.2" actix-router = "0.5.0-rc.2"
actix-web-codegen = "0.5.0-rc.1" actix-web-codegen = "0.5.0-rc.1"

View File

@ -209,6 +209,7 @@ impl NamedFile {
Self::from_file(file, path) Self::from_file(file, path)
} }
#[allow(rustdoc::broken_intra_doc_links)]
/// Attempts to open a file asynchronously in read-only mode. /// Attempts to open a file asynchronously in read-only mode.
/// ///
/// When the `experimental-io-uring` crate feature is enabled, this will be async. /// When the `experimental-io-uring` crate feature is enabled, this will be async.
@ -298,9 +299,11 @@ impl NamedFile {
self self
} }
/// Set content encoding for serving this file /// Sets content encoding for this file.
/// ///
/// Must be used with [`actix_web::middleware::Compress`] to take effect. /// This prevents the `Compress` middleware from modifying the file contents and signals to
/// browsers/clients how to decode it. For example, if serving a compressed HTML file (e.g.,
/// `index.html.gz`) then use `.set_content_encoding(ContentEncoding::Gzip)`.
#[inline] #[inline]
pub fn set_content_encoding(mut self, enc: ContentEncoding) -> Self { pub fn set_content_encoding(mut self, enc: ContentEncoding) -> Self {
self.encoding = Some(enc); self.encoding = Some(enc);

View File

@ -1,6 +1,35 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
### Added
- Implement `Default` for `KeepAlive`. [#2611]
- Implement `From<Duration>` for `KeepAlive`. [#2611]
- Implement `From<Option<Duration>>` for `KeepAlive`. [#2611]
- Implement `Default` for `HttpServiceBuilder`. [#2611]
- Crate `ws` feature flag, disabled by default. [#2618]
- Crate `http2` feature flag, disabled by default. [#2618]
### Changed
- Rename `ServiceConfig::{client_timer_expire => client_request_deadline}`. [#2611]
- Rename `ServiceConfig::{client_disconnect_timer => client_disconnect_deadline}`. [#2611]
- Deadline methods in `ServiceConfig` now return `std::time::Instant`s instead of Tokio's wrapper type. [#2611]
- Rename `h1::Codec::{keepalive => keep_alive}`. [#2611]
- Rename `h1::Codec::{keepalive_enabled => keep_alive_enabled}`. [#2611]
- Rename `h1::ClientCodec::{keepalive => keep_alive}`. [#2611]
- Rename `h1::ClientPayloadCodec::{keepalive => keep_alive}`. [#2611]
- `ServiceConfig::keep_alive` now returns a `KeepAlive`. [#2611]
### Fixed
- HTTP/1.1 dispatcher correctly uses client request timeout. [#2611]
### Removed
- `ServiceConfig::{client_timer, keep_alive_timer}`. [#2611]
- `impl From<usize> for KeepAlive`; use `Duration`s instead. [#2611]
- `impl From<Option<usize>> for KeepAlive`; use `Duration`s instead. [#2611]
- `HttpServiceBuilder::new`; use `default` instead. [#2611]
[#2611]: https://github.com/actix/actix-web/pull/2611
[#2618]: https://github.com/actix/actix-web/pull/2618
## 3.0.0-beta.19 - 2022-01-21 ## 3.0.0-beta.19 - 2022-01-21

View File

@ -29,54 +29,69 @@ path = "src/lib.rs"
[features] [features]
default = [] default = []
# openssl # HTTP/2 protocol support
http2 = ["h2"]
# WebSocket protocol implementation
ws = [
"local-channel",
"base64",
"rand",
"sha-1",
]
# TLS via OpenSSL
openssl = ["actix-tls/accept", "actix-tls/openssl"] openssl = ["actix-tls/accept", "actix-tls/openssl"]
# rustls support # TLS via Rustls
rustls = ["actix-tls/accept", "actix-tls/rustls"] rustls = ["actix-tls/accept", "actix-tls/rustls"]
# enable compression support # Compression codecs
compress-brotli = ["brotli", "__compress"] compress-brotli = ["__compress", "brotli"]
compress-gzip = ["flate2", "__compress"] compress-gzip = ["__compress", "flate2"]
compress-zstd = ["zstd", "__compress"] compress-zstd = ["__compress", "zstd"]
# Internal (PRIVATE!) features used to aid testing and cheking feature status. # Internal (PRIVATE!) features used to aid testing and cheking feature status.
# Don't rely on these whatsoever. They may disappear at anytime. # Don't rely on these whatsoever. They are semver-exempt and may disappear at anytime.
__compress = [] __compress = []
[dependencies] [dependencies]
actix-service = "2.0.0" actix-service = "2"
actix-codec = "0.4.1" actix-codec = "0.4.1"
actix-utils = "3.0.0" actix-utils = "3"
actix-rt = { version = "2.2", default-features = false } actix-rt = { version = "2.2", default-features = false }
ahash = "0.7" ahash = "0.7"
base64 = "0.13"
bitflags = "1.2" bitflags = "1.2"
bytes = "1" bytes = "1"
bytestring = "1" bytestring = "1"
derive_more = "0.99.5" derive_more = "0.99.5"
encoding_rs = "0.8" encoding_rs = "0.8"
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
h2 = "0.3.9"
http = "0.2.5" http = "0.2.5"
httparse = "1.5.1" httparse = "1.5.1"
httpdate = "1.0.1" httpdate = "1.0.1"
itoa = "1" itoa = "1"
language-tags = "0.3" language-tags = "0.3"
local-channel = "0.1"
log = "0.4" log = "0.4"
mime = "0.3" mime = "0.3"
percent-encoding = "2.1" percent-encoding = "2.1"
pin-project-lite = "0.2" pin-project-lite = "0.2"
rand = "0.8"
sha-1 = "0.10"
smallvec = "1.6.1" smallvec = "1.6.1"
# tls # http2
h2 = { version = "0.3.9", optional = true }
# websockets
local-channel = { version = "0.1", optional = true }
base64 = { version = "0.13", optional = true }
rand = { version = "0.8", optional = true }
sha-1 = { version = "0.10", optional = true }
# openssl/rustls
actix-tls = { version = "3.0.0", default-features = false, optional = true } actix-tls = { version = "3.0.0", default-features = false, optional = true }
# compression # compress-*
brotli = { version = "3.3.3", optional = true } brotli = { version = "3.3.3", optional = true }
flate2 = { version = "1.0.13", optional = true } flate2 = { version = "1.0.13", optional = true }
zstd = { version = "0.9", optional = true } zstd = { version = "0.9", optional = true }
@ -92,6 +107,7 @@ criterion = { version = "0.3", features = ["html_reports"] }
env_logger = "0.9" env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] } futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] }
memchr = "2.4" memchr = "2.4"
once_cell = "1.9"
rcgen = "0.8" rcgen = "0.8"
regex = "1.3" regex = "1.3"
rustls-pemfile = "0.2" rustls-pemfile = "0.2"

View File

@ -0,0 +1,27 @@
use std::{convert::Infallible, io, time::Duration};
use actix_http::{HttpService, Request, Response, StatusCode};
use actix_server::Server;
use once_cell::sync::Lazy;
static STR: Lazy<String> = Lazy::new(|| "HELLO WORLD ".repeat(20));
#[actix_rt::main]
async fn main() -> io::Result<()> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
Server::build()
.bind("dispatcher-benchmark", ("127.0.0.1", 8080), || {
HttpService::build()
.client_request_timeout(Duration::from_secs(1))
.finish(|_: Request| async move {
let mut res = Response::build(StatusCode::OK);
Ok::<_, Infallible>(res.body(&**STR))
})
.tcp()
})?
// limiting number of workers so that bench client is not sharing as many resources
.workers(4)
.run()
.await
}

View File

@ -1,4 +1,4 @@
use std::io; use std::{io, time::Duration};
use actix_http::{Error, HttpService, Request, Response, StatusCode}; use actix_http::{Error, HttpService, Request, Response, StatusCode};
use actix_server::Server; use actix_server::Server;
@ -13,8 +13,8 @@ async fn main() -> io::Result<()> {
Server::build() Server::build()
.bind("echo", ("127.0.0.1", 8080), || { .bind("echo", ("127.0.0.1", 8080), || {
HttpService::build() HttpService::build()
.client_timeout(1000) .client_request_timeout(Duration::from_secs(1))
.client_disconnect(1000) .client_disconnect_timeout(Duration::from_secs(1))
// handles HTTP/1.1 and HTTP/2 // handles HTTP/1.1 and HTTP/2
.finish(|mut req: Request| async move { .finish(|mut req: Request| async move {
let mut body = BytesMut::new(); let mut body = BytesMut::new();

View File

@ -0,0 +1,25 @@
use std::{convert::Infallible, io};
use actix_http::{HttpService, Request, Response, StatusCode};
use actix_server::Server;
use once_cell::sync::Lazy;
static STR: Lazy<String> = Lazy::new(|| "HELLO WORLD ".repeat(100));
#[actix_rt::main]
async fn main() -> io::Result<()> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
Server::build()
.bind("h2spec", ("127.0.0.1", 8080), || {
HttpService::build()
.h2(|_: Request| async move {
let mut res = Response::build(StatusCode::OK);
Ok::<_, Infallible>(res.body(&**STR))
})
.tcp()
})?
.workers(4)
.run()
.await
}

View File

@ -1,4 +1,4 @@
use std::{convert::Infallible, io}; use std::{convert::Infallible, io, time::Duration};
use actix_http::{ use actix_http::{
header::HeaderValue, HttpMessage, HttpService, Request, Response, StatusCode, header::HeaderValue, HttpMessage, HttpService, Request, Response, StatusCode,
@ -12,8 +12,8 @@ async fn main() -> io::Result<()> {
Server::build() Server::build()
.bind("hello-world", ("127.0.0.1", 8080), || { .bind("hello-world", ("127.0.0.1", 8080), || {
HttpService::build() HttpService::build()
.client_timeout(1000) .client_request_timeout(Duration::from_secs(1))
.client_disconnect(1000) .client_disconnect_timeout(Duration::from_secs(1))
.on_connect_ext(|_, ext| { .on_connect_ext(|_, ext| {
ext.insert(42u32); ext.insert(42u32);
}) })

View File

@ -1,25 +1,22 @@
use std::{fmt, marker::PhantomData, net, rc::Rc}; use std::{fmt, marker::PhantomData, net, rc::Rc, time::Duration};
use actix_codec::Framed; use actix_codec::Framed;
use actix_service::{IntoServiceFactory, Service, ServiceFactory}; use actix_service::{IntoServiceFactory, Service, ServiceFactory};
use crate::{ use crate::{
body::{BoxBody, MessageBody}, body::{BoxBody, MessageBody},
config::{KeepAlive, ServiceConfig},
h1::{self, ExpectHandler, H1Service, UpgradeHandler}, h1::{self, ExpectHandler, H1Service, UpgradeHandler},
h2::H2Service,
service::HttpService, service::HttpService,
ConnectCallback, Extensions, Request, Response, ConnectCallback, Extensions, KeepAlive, Request, Response, ServiceConfig,
}; };
/// A HTTP service builder /// An HTTP service builder.
/// ///
/// This type can be used to construct an instance of [`HttpService`] through a /// This type can construct an instance of [`HttpService`] through a builder-like pattern.
/// builder-like pattern.
pub struct HttpServiceBuilder<T, S, X = ExpectHandler, U = UpgradeHandler> { pub struct HttpServiceBuilder<T, S, X = ExpectHandler, U = UpgradeHandler> {
keep_alive: KeepAlive, keep_alive: KeepAlive,
client_timeout: u64, client_request_timeout: Duration,
client_disconnect: u64, client_disconnect_timeout: Duration,
secure: bool, secure: bool,
local_addr: Option<net::SocketAddr>, local_addr: Option<net::SocketAddr>,
expect: X, expect: X,
@ -28,22 +25,23 @@ pub struct HttpServiceBuilder<T, S, X = ExpectHandler, U = UpgradeHandler> {
_phantom: PhantomData<S>, _phantom: PhantomData<S>,
} }
impl<T, S> HttpServiceBuilder<T, S, ExpectHandler, UpgradeHandler> impl<T, S> Default for HttpServiceBuilder<T, S, ExpectHandler, UpgradeHandler>
where where
S: ServiceFactory<Request, Config = ()>, S: ServiceFactory<Request, Config = ()>,
S::Error: Into<Response<BoxBody>> + 'static, S::Error: Into<Response<BoxBody>> + 'static,
S::InitError: fmt::Debug, S::InitError: fmt::Debug,
<S::Service as Service<Request>>::Future: 'static, <S::Service as Service<Request>>::Future: 'static,
{ {
/// Create instance of `ServiceConfigBuilder` fn default() -> Self {
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
HttpServiceBuilder { HttpServiceBuilder {
keep_alive: KeepAlive::Timeout(5), // ServiceConfig parts (make sure defaults match)
client_timeout: 5000, keep_alive: KeepAlive::default(),
client_disconnect: 0, client_request_timeout: Duration::from_secs(5),
client_disconnect_timeout: Duration::ZERO,
secure: false, secure: false,
local_addr: None, local_addr: None,
// dispatcher parts
expect: ExpectHandler, expect: ExpectHandler,
upgrade: None, upgrade: None,
on_connect_ext: None, on_connect_ext: None,
@ -65,9 +63,11 @@ where
U::Error: fmt::Display, U::Error: fmt::Display,
U::InitError: fmt::Debug, U::InitError: fmt::Debug,
{ {
/// Set server keep-alive setting. /// Set connection keep-alive setting.
/// ///
/// By default keep alive is set to a 5 seconds. /// Applies to HTTP/1.1 keep-alive and HTTP/2 ping-pong.
///
/// By default keep-alive is 5 seconds.
pub fn keep_alive<W: Into<KeepAlive>>(mut self, val: W) -> Self { pub fn keep_alive<W: Into<KeepAlive>>(mut self, val: W) -> Self {
self.keep_alive = val.into(); self.keep_alive = val.into();
self self
@ -85,33 +85,45 @@ where
self self
} }
/// Set server client timeout in milliseconds for first request. /// Set client request timeout (for first request).
/// ///
/// Defines a timeout for reading client request header. If a client does not transmit /// Defines a timeout for reading client request header. If the client does not transmit the
/// the entire set headers within this time, the request is terminated with /// request head within this duration, the connection is terminated with a `408 Request Timeout`
/// the 408 (Request Time-out) error. /// response error.
/// ///
/// To disable timeout set value to 0. /// A duration of zero disables the timeout.
/// ///
/// By default client timeout is set to 5000 milliseconds. /// By default, the client timeout is 5 seconds.
pub fn client_timeout(mut self, val: u64) -> Self { pub fn client_request_timeout(mut self, dur: Duration) -> Self {
self.client_timeout = val; self.client_request_timeout = dur;
self self
} }
/// Set server connection disconnect timeout in milliseconds. #[doc(hidden)]
#[deprecated(since = "3.0.0", note = "Renamed to `client_request_timeout`.")]
pub fn client_timeout(self, dur: Duration) -> Self {
self.client_request_timeout(dur)
}
/// Set client connection disconnect timeout.
/// ///
/// Defines a timeout for disconnect connection. If a disconnect procedure does not complete /// Defines a timeout for disconnect connection. If a disconnect procedure does not complete
/// within this time, the request get dropped. This timeout affects secure connections. /// within this time, the request get dropped. This timeout affects secure connections.
/// ///
/// To disable timeout set value to 0. /// A duration of zero disables the timeout.
/// ///
/// By default disconnect timeout is set to 0. /// By default, the disconnect timeout is disabled.
pub fn client_disconnect(mut self, val: u64) -> Self { pub fn client_disconnect_timeout(mut self, dur: Duration) -> Self {
self.client_disconnect = val; self.client_disconnect_timeout = dur;
self self
} }
#[doc(hidden)]
#[deprecated(since = "3.0.0", note = "Renamed to `client_disconnect_timeout`.")]
pub fn client_disconnect(self, dur: Duration) -> Self {
self.client_disconnect_timeout(dur)
}
/// Provide service for `EXPECT: 100-Continue` support. /// Provide service for `EXPECT: 100-Continue` support.
/// ///
/// Service get called with request that contains `EXPECT` header. /// Service get called with request that contains `EXPECT` header.
@ -126,8 +138,8 @@ where
{ {
HttpServiceBuilder { HttpServiceBuilder {
keep_alive: self.keep_alive, keep_alive: self.keep_alive,
client_timeout: self.client_timeout, client_request_timeout: self.client_request_timeout,
client_disconnect: self.client_disconnect, client_disconnect_timeout: self.client_disconnect_timeout,
secure: self.secure, secure: self.secure,
local_addr: self.local_addr, local_addr: self.local_addr,
expect: expect.into_factory(), expect: expect.into_factory(),
@ -150,8 +162,8 @@ where
{ {
HttpServiceBuilder { HttpServiceBuilder {
keep_alive: self.keep_alive, keep_alive: self.keep_alive,
client_timeout: self.client_timeout, client_request_timeout: self.client_request_timeout,
client_disconnect: self.client_disconnect, client_disconnect_timeout: self.client_disconnect_timeout,
secure: self.secure, secure: self.secure,
local_addr: self.local_addr, local_addr: self.local_addr,
expect: self.expect, expect: self.expect,
@ -185,8 +197,8 @@ where
{ {
let cfg = ServiceConfig::new( let cfg = ServiceConfig::new(
self.keep_alive, self.keep_alive,
self.client_timeout, self.client_request_timeout,
self.client_disconnect, self.client_disconnect_timeout,
self.secure, self.secure,
self.local_addr, self.local_addr,
); );
@ -198,7 +210,8 @@ where
} }
/// Finish service configuration and create a HTTP service for HTTP/2 protocol. /// Finish service configuration and create a HTTP service for HTTP/2 protocol.
pub fn h2<F, B>(self, service: F) -> H2Service<T, S, B> #[cfg(feature = "http2")]
pub fn h2<F, B>(self, service: F) -> crate::h2::H2Service<T, S, B>
where where
F: IntoServiceFactory<S, Request>, F: IntoServiceFactory<S, Request>,
S::Error: Into<Response<BoxBody>> + 'static, S::Error: Into<Response<BoxBody>> + 'static,
@ -209,13 +222,14 @@ where
{ {
let cfg = ServiceConfig::new( let cfg = ServiceConfig::new(
self.keep_alive, self.keep_alive,
self.client_timeout, self.client_request_timeout,
self.client_disconnect, self.client_disconnect_timeout,
self.secure, self.secure,
self.local_addr, self.local_addr,
); );
H2Service::with_config(cfg, service.into_factory()).on_connect_ext(self.on_connect_ext) crate::h2::H2Service::with_config(cfg, service.into_factory())
.on_connect_ext(self.on_connect_ext)
} }
/// Finish service configuration and create `HttpService` instance. /// Finish service configuration and create `HttpService` instance.
@ -230,8 +244,8 @@ where
{ {
let cfg = ServiceConfig::new( let cfg = ServiceConfig::new(
self.keep_alive, self.keep_alive,
self.client_timeout, self.client_request_timeout,
self.client_disconnect, self.client_disconnect_timeout,
self.secure, self.secure,
self.local_addr, self.local_addr,
); );

View File

@ -1,71 +1,36 @@
use std::{ use std::{
cell::Cell,
fmt::{self, Write},
net, net,
rc::Rc, rc::Rc,
time::{Duration, SystemTime}, time::{Duration, Instant},
}; };
use actix_rt::{
task::JoinHandle,
time::{interval, sleep_until, Instant, Sleep},
};
use bytes::BytesMut; use bytes::BytesMut;
/// "Sun, 06 Nov 1994 08:49:37 GMT".len() use crate::{date::DateService, KeepAlive};
pub(crate) const DATE_VALUE_LENGTH: usize = 29;
#[derive(Debug, PartialEq, Clone, Copy)] /// HTTP service configuration.
/// Server keep-alive setting #[derive(Debug, Clone)]
pub enum KeepAlive {
/// Keep alive in seconds
Timeout(usize),
/// Rely on OS to shutdown tcp connection
Os,
/// Disabled
Disabled,
}
impl From<usize> for KeepAlive {
fn from(keepalive: usize) -> Self {
KeepAlive::Timeout(keepalive)
}
}
impl From<Option<usize>> for KeepAlive {
fn from(keepalive: Option<usize>) -> Self {
if let Some(keepalive) = keepalive {
KeepAlive::Timeout(keepalive)
} else {
KeepAlive::Disabled
}
}
}
/// Http service configuration
pub struct ServiceConfig(Rc<Inner>); pub struct ServiceConfig(Rc<Inner>);
#[derive(Debug)]
struct Inner { struct Inner {
keep_alive: Option<Duration>, keep_alive: KeepAlive,
client_timeout: u64, client_request_timeout: Duration,
client_disconnect: u64, client_disconnect_timeout: Duration,
ka_enabled: bool,
secure: bool, secure: bool,
local_addr: Option<std::net::SocketAddr>, local_addr: Option<std::net::SocketAddr>,
date_service: DateService, date_service: DateService,
} }
impl Clone for ServiceConfig {
fn clone(&self) -> Self {
ServiceConfig(self.0.clone())
}
}
impl Default for ServiceConfig { impl Default for ServiceConfig {
fn default() -> Self { fn default() -> Self {
Self::new(KeepAlive::Timeout(5), 0, 0, false, None) Self::new(
KeepAlive::default(),
Duration::from_secs(5),
Duration::ZERO,
false,
None,
)
} }
} }
@ -73,34 +38,22 @@ impl ServiceConfig {
/// Create instance of `ServiceConfig` /// Create instance of `ServiceConfig`
pub fn new( pub fn new(
keep_alive: KeepAlive, keep_alive: KeepAlive,
client_timeout: u64, client_request_timeout: Duration,
client_disconnect: u64, client_disconnect_timeout: Duration,
secure: bool, secure: bool,
local_addr: Option<net::SocketAddr>, local_addr: Option<net::SocketAddr>,
) -> ServiceConfig { ) -> ServiceConfig {
let (keep_alive, ka_enabled) = match keep_alive {
KeepAlive::Timeout(val) => (val as u64, true),
KeepAlive::Os => (0, true),
KeepAlive::Disabled => (0, false),
};
let keep_alive = if ka_enabled && keep_alive > 0 {
Some(Duration::from_secs(keep_alive))
} else {
None
};
ServiceConfig(Rc::new(Inner { ServiceConfig(Rc::new(Inner {
keep_alive, keep_alive: keep_alive.normalize(),
ka_enabled, client_request_timeout,
client_timeout, client_disconnect_timeout,
client_disconnect,
secure, secure,
local_addr, local_addr,
date_service: DateService::new(), date_service: DateService::new(),
})) }))
} }
/// Returns true if connection is secure (HTTPS) /// Returns `true` if connection is secure (i.e., using TLS / HTTPS).
#[inline] #[inline]
pub fn secure(&self) -> bool { pub fn secure(&self) -> bool {
self.0.secure self.0.secure
@ -114,239 +67,92 @@ impl ServiceConfig {
self.0.local_addr self.0.local_addr
} }
/// Keep alive duration if configured. /// Connection keep-alive setting.
#[inline] #[inline]
pub fn keep_alive(&self) -> Option<Duration> { pub fn keep_alive(&self) -> KeepAlive {
self.0.keep_alive self.0.keep_alive
} }
/// Return state of connection keep-alive functionality /// Creates a time object representing the deadline for this connection's keep-alive period, if
#[inline] /// enabled.
pub fn keep_alive_enabled(&self) -> bool { ///
self.0.ka_enabled /// When [`KeepAlive::Os`] or [`KeepAlive::Disabled`] is set, this will return `None`.
} pub fn keep_alive_deadline(&self) -> Option<Instant> {
match self.keep_alive() {
/// Client timeout for first request. KeepAlive::Timeout(dur) => Some(self.now() + dur),
#[inline] KeepAlive::Os => None,
pub fn client_timer(&self) -> Option<Sleep> { KeepAlive::Disabled => None,
let delay_time = self.0.client_timeout;
if delay_time != 0 {
Some(sleep_until(self.now() + Duration::from_millis(delay_time)))
} else {
None
} }
} }
/// Client timeout for first request. /// Creates a time object representing the deadline for the client to finish sending the head of
pub fn client_timer_expire(&self) -> Option<Instant> { /// its first request.
let delay = self.0.client_timeout; ///
if delay != 0 { /// Returns `None` if this `ServiceConfig was` constructed with `client_request_timeout: 0`.
Some(self.now() + Duration::from_millis(delay)) pub fn client_request_deadline(&self) -> Option<Instant> {
} else { let timeout = self.0.client_request_timeout;
None (timeout != Duration::ZERO).then(|| self.now() + timeout)
}
} }
/// Client disconnect timer /// Creates a time object representing the deadline for the client to disconnect.
pub fn client_disconnect_timer(&self) -> Option<Instant> { pub fn client_disconnect_deadline(&self) -> Option<Instant> {
let delay = self.0.client_disconnect; let timeout = self.0.client_disconnect_timeout;
if delay != 0 { (timeout != Duration::ZERO).then(|| self.now() + timeout)
Some(self.now() + Duration::from_millis(delay))
} else {
None
}
} }
/// Return keep-alive timer delay is configured.
#[inline]
pub fn keep_alive_timer(&self) -> Option<Sleep> {
self.keep_alive().map(|ka| sleep_until(self.now() + ka))
}
/// Keep-alive expire time
pub fn keep_alive_expire(&self) -> Option<Instant> {
self.keep_alive().map(|ka| self.now() + ka)
}
#[inline]
pub(crate) fn now(&self) -> Instant { pub(crate) fn now(&self) -> Instant {
self.0.date_service.now() self.0.date_service.now()
} }
#[doc(hidden)] pub(crate) fn write_date_header(&self, dst: &mut BytesMut, camel_case: bool) {
pub fn set_date(&self, dst: &mut BytesMut, camel_case: bool) {
let mut buf: [u8; 39] = [0; 39]; let mut buf: [u8; 39] = [0; 39];
buf[..6].copy_from_slice(if camel_case { b"Date: " } else { b"date: " }); buf[..6].copy_from_slice(if camel_case { b"Date: " } else { b"date: " });
self.0 self.0
.date_service .date_service
.set_date(|date| buf[6..35].copy_from_slice(&date.bytes)); .with_date(|date| buf[6..35].copy_from_slice(&date.bytes));
buf[35..].copy_from_slice(b"\r\n\r\n"); buf[35..].copy_from_slice(b"\r\n\r\n");
dst.extend_from_slice(&buf); dst.extend_from_slice(&buf);
} }
pub(crate) fn set_date_header(&self, dst: &mut BytesMut) { #[allow(unused)] // used with `http2` feature flag
pub(crate) fn write_date_header_value(&self, dst: &mut BytesMut) {
self.0 self.0
.date_service .date_service
.set_date(|date| dst.extend_from_slice(&date.bytes)); .with_date(|date| dst.extend_from_slice(&date.bytes));
}
}
#[derive(Copy, Clone)]
struct Date {
bytes: [u8; DATE_VALUE_LENGTH],
pos: usize,
}
impl Date {
fn new() -> Date {
let mut date = Date {
bytes: [0; DATE_VALUE_LENGTH],
pos: 0,
};
date.update();
date
}
fn update(&mut self) {
self.pos = 0;
write!(self, "{}", httpdate::fmt_http_date(SystemTime::now())).unwrap();
}
}
impl fmt::Write for Date {
fn write_str(&mut self, s: &str) -> fmt::Result {
let len = s.len();
self.bytes[self.pos..self.pos + len].copy_from_slice(s.as_bytes());
self.pos += len;
Ok(())
}
}
/// Service for update Date and Instant periodically at 500 millis interval.
struct DateService {
current: Rc<Cell<(Date, Instant)>>,
handle: JoinHandle<()>,
}
impl Drop for DateService {
fn drop(&mut self) {
// stop the timer update async task on drop.
self.handle.abort();
}
}
impl DateService {
fn new() -> Self {
// shared date and timer for DateService and update async task.
let current = Rc::new(Cell::new((Date::new(), Instant::now())));
let current_clone = Rc::clone(&current);
// spawn an async task sleep for 500 milli and update current date/timer in a loop.
// handle is used to stop the task on DateService drop.
let handle = actix_rt::spawn(async move {
#[cfg(test)]
let _notify = notify_on_drop::NotifyOnDrop::new();
let mut interval = interval(Duration::from_millis(500));
loop {
let now = interval.tick().await;
let date = Date::new();
current_clone.set((date, now));
}
});
DateService { current, handle }
}
fn now(&self) -> Instant {
self.current.get().1
}
fn set_date<F: FnMut(&Date)>(&self, mut f: F) {
f(&self.current.get().0);
}
}
// TODO: move to a util module for testing all spawn handle drop style tasks.
/// Test Module for checking the drop state of certain async tasks that are spawned
/// with `actix_rt::spawn`
///
/// The target task must explicitly generate `NotifyOnDrop` when spawn the task
#[cfg(test)]
mod notify_on_drop {
use std::cell::RefCell;
thread_local! {
static NOTIFY_DROPPED: RefCell<Option<bool>> = RefCell::new(None);
}
/// Check if the spawned task is dropped.
///
/// # Panics
/// Panics when there was no `NotifyOnDrop` instance on current thread.
pub(crate) fn is_dropped() -> bool {
NOTIFY_DROPPED.with(|bool| {
bool.borrow()
.expect("No NotifyOnDrop existed on current thread")
})
}
pub(crate) struct NotifyOnDrop;
impl NotifyOnDrop {
/// # Panic:
///
/// When construct multiple instances on any given thread.
pub(crate) fn new() -> Self {
NOTIFY_DROPPED.with(|bool| {
let mut bool = bool.borrow_mut();
if bool.is_some() {
panic!("NotifyOnDrop existed on current thread");
} else {
*bool = Some(false);
}
});
NotifyOnDrop
}
}
impl Drop for NotifyOnDrop {
fn drop(&mut self) {
NOTIFY_DROPPED.with(|bool| {
if let Some(b) = bool.borrow_mut().as_mut() {
*b = true;
}
});
}
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::{date::DATE_VALUE_LENGTH, notify_on_drop};
use actix_rt::{task::yield_now, time::sleep}; use actix_rt::{
task::yield_now,
time::{sleep, sleep_until},
};
use memchr::memmem; use memchr::memmem;
#[actix_rt::test] #[actix_rt::test]
async fn test_date_service_update() { async fn test_date_service_update() {
let settings = ServiceConfig::new(KeepAlive::Os, 0, 0, false, None); let settings =
ServiceConfig::new(KeepAlive::Os, Duration::ZERO, Duration::ZERO, false, None);
yield_now().await; yield_now().await;
let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
settings.set_date(&mut buf1, false); settings.write_date_header(&mut buf1, false);
let now1 = settings.now(); let now1 = settings.now();
sleep_until(Instant::now() + Duration::from_secs(2)).await; sleep_until((Instant::now() + Duration::from_secs(2)).into()).await;
yield_now().await; yield_now().await;
let now2 = settings.now(); let now2 = settings.now();
let mut buf2 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); let mut buf2 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
settings.set_date(&mut buf2, false); settings.write_date_header(&mut buf2, false);
assert_ne!(now1, now2); assert_ne!(now1, now2);
@ -402,10 +208,10 @@ mod tests {
let settings = ServiceConfig::default(); let settings = ServiceConfig::default();
let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
settings.set_date(&mut buf1, false); settings.write_date_header(&mut buf1, false);
let mut buf2 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); let mut buf2 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
settings.set_date(&mut buf2, false); settings.write_date_header(&mut buf2, false);
assert_eq!(buf1, buf2); assert_eq!(buf1, buf2);
} }
@ -415,11 +221,11 @@ mod tests {
let settings = ServiceConfig::default(); let settings = ServiceConfig::default();
let mut buf = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); let mut buf = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
settings.set_date(&mut buf, false); settings.write_date_header(&mut buf, false);
assert!(memmem::find(&buf, b"date:").is_some()); assert!(memmem::find(&buf, b"date:").is_some());
let mut buf = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); let mut buf = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10);
settings.set_date(&mut buf, true); settings.write_date_header(&mut buf, true);
assert!(memmem::find(&buf, b"Date:").is_some()); assert!(memmem::find(&buf, b"Date:").is_some());
} }
} }

92
actix-http/src/date.rs Normal file
View File

@ -0,0 +1,92 @@
use std::{
cell::Cell,
fmt::{self, Write},
rc::Rc,
time::{Duration, Instant, SystemTime},
};
use actix_rt::{task::JoinHandle, time::interval};
/// "Thu, 01 Jan 1970 00:00:00 GMT".len()
pub(crate) const DATE_VALUE_LENGTH: usize = 29;
#[derive(Clone, Copy)]
pub(crate) struct Date {
pub(crate) bytes: [u8; DATE_VALUE_LENGTH],
pos: usize,
}
impl Date {
fn new() -> Date {
let mut date = Date {
bytes: [0; DATE_VALUE_LENGTH],
pos: 0,
};
date.update();
date
}
fn update(&mut self) {
self.pos = 0;
write!(self, "{}", httpdate::fmt_http_date(SystemTime::now())).unwrap();
}
}
impl fmt::Write for Date {
fn write_str(&mut self, s: &str) -> fmt::Result {
let len = s.len();
self.bytes[self.pos..self.pos + len].copy_from_slice(s.as_bytes());
self.pos += len;
Ok(())
}
}
/// Service for update Date and Instant periodically at 500 millis interval.
pub(crate) struct DateService {
current: Rc<Cell<(Date, Instant)>>,
handle: JoinHandle<()>,
}
impl DateService {
pub(crate) fn new() -> Self {
// shared date and timer for DateService and update async task.
let current = Rc::new(Cell::new((Date::new(), Instant::now())));
let current_clone = Rc::clone(&current);
// spawn an async task sleep for 500 millis and update current date/timer in a loop.
// handle is used to stop the task on DateService drop.
let handle = actix_rt::spawn(async move {
#[cfg(test)]
let _notify = crate::notify_on_drop::NotifyOnDrop::new();
let mut interval = interval(Duration::from_millis(500));
loop {
let now = interval.tick().await;
let date = Date::new();
current_clone.set((date, now.into_std()));
}
});
DateService { current, handle }
}
pub(crate) fn now(&self) -> Instant {
self.current.get().1
}
pub(crate) fn with_date<F: FnMut(&Date)>(&self, mut f: F) {
f(&self.current.get().0);
}
}
impl fmt::Debug for DateService {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("DateService").finish_non_exhaustive()
}
}
impl Drop for DateService {
fn drop(&mut self) {
// stop the timer update async task on drop.
self.handle.abort();
}
}

View File

@ -352,7 +352,7 @@ impl ContentEncoder {
ContentEncoder::Brotli(ref mut encoder) => match encoder.write_all(data) { ContentEncoder::Brotli(ref mut encoder) => match encoder.write_all(data) {
Ok(_) => Ok(()), Ok(_) => Ok(()),
Err(err) => { Err(err) => {
trace!("Error decoding br encoding: {}", err); log::trace!("Error decoding br encoding: {}", err);
Err(err) Err(err)
} }
}, },
@ -361,7 +361,7 @@ impl ContentEncoder {
ContentEncoder::Gzip(ref mut encoder) => match encoder.write_all(data) { ContentEncoder::Gzip(ref mut encoder) => match encoder.write_all(data) {
Ok(_) => Ok(()), Ok(_) => Ok(()),
Err(err) => { Err(err) => {
trace!("Error decoding gzip encoding: {}", err); log::trace!("Error decoding gzip encoding: {}", err);
Err(err) Err(err)
} }
}, },
@ -370,7 +370,7 @@ impl ContentEncoder {
ContentEncoder::Deflate(ref mut encoder) => match encoder.write_all(data) { ContentEncoder::Deflate(ref mut encoder) => match encoder.write_all(data) {
Ok(_) => Ok(()), Ok(_) => Ok(()),
Err(err) => { Err(err) => {
trace!("Error decoding deflate encoding: {}", err); log::trace!("Error decoding deflate encoding: {}", err);
Err(err) Err(err)
} }
}, },
@ -379,7 +379,7 @@ impl ContentEncoder {
ContentEncoder::Zstd(ref mut encoder) => match encoder.write_all(data) { ContentEncoder::Zstd(ref mut encoder) => match encoder.write_all(data) {
Ok(_) => Ok(()), Ok(_) => Ok(()),
Err(err) => { Err(err) => {
trace!("Error decoding ztsd encoding: {}", err); log::trace!("Error decoding ztsd encoding: {}", err);
Err(err) Err(err)
} }
}, },

View File

@ -5,7 +5,7 @@ use std::{error::Error as StdError, fmt, io, str::Utf8Error, string::FromUtf8Err
use derive_more::{Display, Error, From}; use derive_more::{Display, Error, From};
use http::{uri::InvalidUri, StatusCode}; use http::{uri::InvalidUri, StatusCode};
use crate::{body::BoxBody, ws, Response}; use crate::{body::BoxBody, Response};
pub use http::Error as HttpError; pub use http::Error as HttpError;
@ -61,6 +61,7 @@ impl Error {
Self::new(Kind::Encoder) Self::new(Kind::Encoder)
} }
#[allow(unused)] // used with `ws` feature flag
pub(crate) fn new_ws() -> Self { pub(crate) fn new_ws() -> Self {
Self::new(Kind::Ws) Self::new(Kind::Ws)
} }
@ -139,14 +140,16 @@ impl From<HttpError> for Error {
} }
} }
impl From<ws::HandshakeError> for Error { #[cfg(feature = "ws")]
fn from(err: ws::HandshakeError) -> Self { impl From<crate::ws::HandshakeError> for Error {
fn from(err: crate::ws::HandshakeError) -> Self {
Self::new_ws().with_cause(err) Self::new_ws().with_cause(err)
} }
} }
impl From<ws::ProtocolError> for Error { #[cfg(feature = "ws")]
fn from(err: ws::ProtocolError) -> Self { impl From<crate::ws::ProtocolError> for Error {
fn from(err: crate::ws::ProtocolError) -> Self {
Self::new_ws().with_cause(err) Self::new_ws().with_cause(err)
} }
} }
@ -277,8 +280,9 @@ pub enum PayloadError {
UnknownLength, UnknownLength,
/// HTTP/2 payload error. /// HTTP/2 payload error.
#[cfg(feature = "http2")]
#[display(fmt = "{}", _0)] #[display(fmt = "{}", _0)]
Http2Payload(h2::Error), Http2Payload(::h2::Error),
/// Generic I/O error. /// Generic I/O error.
#[display(fmt = "{}", _0)] #[display(fmt = "{}", _0)]
@ -293,14 +297,16 @@ impl std::error::Error for PayloadError {
PayloadError::EncodingCorrupted => None, PayloadError::EncodingCorrupted => None,
PayloadError::Overflow => None, PayloadError::Overflow => None,
PayloadError::UnknownLength => None, PayloadError::UnknownLength => None,
#[cfg(feature = "http2")]
PayloadError::Http2Payload(err) => Some(err as &dyn std::error::Error), PayloadError::Http2Payload(err) => Some(err as &dyn std::error::Error),
PayloadError::Io(err) => Some(err as &dyn std::error::Error), PayloadError::Io(err) => Some(err as &dyn std::error::Error),
} }
} }
} }
impl From<h2::Error> for PayloadError { #[cfg(feature = "http2")]
fn from(err: h2::Error) -> Self { impl From<::h2::Error> for PayloadError {
fn from(err: ::h2::Error) -> Self {
PayloadError::Http2Payload(err) PayloadError::Http2Payload(err)
} }
} }
@ -356,6 +362,7 @@ pub enum DispatchError {
/// HTTP/2 error. /// HTTP/2 error.
#[display(fmt = "{}", _0)] #[display(fmt = "{}", _0)]
#[cfg(feature = "http2")]
H2(h2::Error), H2(h2::Error),
/// The first request did not complete within the specified timeout. /// The first request did not complete within the specified timeout.
@ -379,7 +386,10 @@ impl StdError for DispatchError {
DispatchError::Body(err) => Some(&**err), DispatchError::Body(err) => Some(&**err),
DispatchError::Io(err) => Some(err), DispatchError::Io(err) => Some(err),
DispatchError::Parse(err) => Some(err), DispatchError::Parse(err) => Some(err),
#[cfg(feature = "http2")]
DispatchError::H2(err) => Some(err), DispatchError::H2(err) => Some(err),
_ => None, _ => None,
} }
} }

View File

@ -1,4 +1,4 @@
use std::io; use std::{fmt, io};
use actix_codec::{Decoder, Encoder}; use actix_codec::{Decoder, Encoder};
use bitflags::bitflags; use bitflags::bitflags;
@ -17,9 +17,9 @@ use crate::{
bitflags! { bitflags! {
struct Flags: u8 { struct Flags: u8 {
const HEAD = 0b0000_0001; const HEAD = 0b0000_0001;
const KEEPALIVE_ENABLED = 0b0000_1000; const KEEP_ALIVE_ENABLED = 0b0000_1000;
const STREAM = 0b0001_0000; const STREAM = 0b0001_0000;
} }
} }
@ -38,7 +38,7 @@ struct ClientCodecInner {
decoder: decoder::MessageDecoder<ResponseHead>, decoder: decoder::MessageDecoder<ResponseHead>,
payload: Option<PayloadDecoder>, payload: Option<PayloadDecoder>,
version: Version, version: Version,
ctype: ConnectionType, conn_type: ConnectionType,
// encoder part // encoder part
flags: Flags, flags: Flags,
@ -51,23 +51,32 @@ impl Default for ClientCodec {
} }
} }
impl fmt::Debug for ClientCodec {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("h1::ClientCodec")
.field("flags", &self.inner.flags)
.finish_non_exhaustive()
}
}
impl ClientCodec { impl ClientCodec {
/// Create HTTP/1 codec. /// Create HTTP/1 codec.
/// ///
/// `keepalive_enabled` how response `connection` header get generated. /// `keepalive_enabled` how response `connection` header get generated.
pub fn new(config: ServiceConfig) -> Self { pub fn new(config: ServiceConfig) -> Self {
let flags = if config.keep_alive_enabled() { let flags = if config.keep_alive().enabled() {
Flags::KEEPALIVE_ENABLED Flags::KEEP_ALIVE_ENABLED
} else { } else {
Flags::empty() Flags::empty()
}; };
ClientCodec { ClientCodec {
inner: ClientCodecInner { inner: ClientCodecInner {
config, config,
decoder: decoder::MessageDecoder::default(), decoder: decoder::MessageDecoder::default(),
payload: None, payload: None,
version: Version::HTTP_11, version: Version::HTTP_11,
ctype: ConnectionType::Close, conn_type: ConnectionType::Close,
flags, flags,
encoder: encoder::MessageEncoder::default(), encoder: encoder::MessageEncoder::default(),
@ -77,12 +86,12 @@ impl ClientCodec {
/// Check if request is upgrade /// Check if request is upgrade
pub fn upgrade(&self) -> bool { pub fn upgrade(&self) -> bool {
self.inner.ctype == ConnectionType::Upgrade self.inner.conn_type == ConnectionType::Upgrade
} }
/// Check if last response is keep-alive /// Check if last response is keep-alive
pub fn keepalive(&self) -> bool { pub fn keep_alive(&self) -> bool {
self.inner.ctype == ConnectionType::KeepAlive self.inner.conn_type == ConnectionType::KeepAlive
} }
/// Check last request's message type /// Check last request's message type
@ -104,8 +113,8 @@ impl ClientCodec {
impl ClientPayloadCodec { impl ClientPayloadCodec {
/// Check if last response is keep-alive /// Check if last response is keep-alive
pub fn keepalive(&self) -> bool { pub fn keep_alive(&self) -> bool {
self.inner.ctype == ConnectionType::KeepAlive self.inner.conn_type == ConnectionType::KeepAlive
} }
/// Transform payload codec to a message codec /// Transform payload codec to a message codec
@ -122,12 +131,12 @@ impl Decoder for ClientCodec {
debug_assert!(!self.inner.payload.is_some(), "Payload decoder is set"); debug_assert!(!self.inner.payload.is_some(), "Payload decoder is set");
if let Some((req, payload)) = self.inner.decoder.decode(src)? { if let Some((req, payload)) = self.inner.decoder.decode(src)? {
if let Some(ctype) = req.conn_type() { if let Some(conn_type) = req.conn_type() {
// do not use peer's keep-alive // do not use peer's keep-alive
self.inner.ctype = if ctype == ConnectionType::KeepAlive { self.inner.conn_type = if conn_type == ConnectionType::KeepAlive {
self.inner.ctype self.inner.conn_type
} else { } else {
ctype conn_type
}; };
} }
@ -192,9 +201,9 @@ impl Encoder<Message<(RequestHeadType, BodySize)>> for ClientCodec {
.set(Flags::HEAD, head.as_ref().method == Method::HEAD); .set(Flags::HEAD, head.as_ref().method == Method::HEAD);
// connection status // connection status
inner.ctype = match head.as_ref().connection_type() { inner.conn_type = match head.as_ref().connection_type() {
ConnectionType::KeepAlive => { ConnectionType::KeepAlive => {
if inner.flags.contains(Flags::KEEPALIVE_ENABLED) { if inner.flags.contains(Flags::KEEP_ALIVE_ENABLED) {
ConnectionType::KeepAlive ConnectionType::KeepAlive
} else { } else {
ConnectionType::Close ConnectionType::Close
@ -211,7 +220,7 @@ impl Encoder<Message<(RequestHeadType, BodySize)>> for ClientCodec {
false, false,
inner.version, inner.version,
length, length,
inner.ctype, inner.conn_type,
&inner.config, &inner.config,
)?; )?;
} }

View File

@ -15,9 +15,9 @@ use crate::{
bitflags! { bitflags! {
struct Flags: u8 { struct Flags: u8 {
const HEAD = 0b0000_0001; const HEAD = 0b0000_0001;
const KEEPALIVE_ENABLED = 0b0000_0010; const KEEP_ALIVE_ENABLED = 0b0000_0010;
const STREAM = 0b0000_0100; const STREAM = 0b0000_0100;
} }
} }
@ -42,7 +42,9 @@ impl Default for Codec {
impl fmt::Debug for Codec { impl fmt::Debug for Codec {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "h1::Codec({:?})", self.flags) f.debug_struct("h1::Codec")
.field("flags", &self.flags)
.finish_non_exhaustive()
} }
} }
@ -51,8 +53,8 @@ impl Codec {
/// ///
/// `keepalive_enabled` how response `connection` header get generated. /// `keepalive_enabled` how response `connection` header get generated.
pub fn new(config: ServiceConfig) -> Self { pub fn new(config: ServiceConfig) -> Self {
let flags = if config.keep_alive_enabled() { let flags = if config.keep_alive().enabled() {
Flags::KEEPALIVE_ENABLED Flags::KEEP_ALIVE_ENABLED
} else { } else {
Flags::empty() Flags::empty()
}; };
@ -76,14 +78,14 @@ impl Codec {
/// Check if last response is keep-alive. /// Check if last response is keep-alive.
#[inline] #[inline]
pub fn keepalive(&self) -> bool { pub fn keep_alive(&self) -> bool {
self.conn_type == ConnectionType::KeepAlive self.conn_type == ConnectionType::KeepAlive
} }
/// Check if keep-alive enabled on server level. /// Check if keep-alive enabled on server level.
#[inline] #[inline]
pub fn keepalive_enabled(&self) -> bool { pub fn keep_alive_enabled(&self) -> bool {
self.flags.contains(Flags::KEEPALIVE_ENABLED) self.flags.contains(Flags::KEEP_ALIVE_ENABLED)
} }
/// Check last request's message type. /// Check last request's message type.
@ -124,7 +126,7 @@ impl Decoder for Codec {
self.version = head.version; self.version = head.version;
self.conn_type = head.connection_type(); self.conn_type = head.connection_type();
if self.conn_type == ConnectionType::KeepAlive if self.conn_type == ConnectionType::KeepAlive
&& !self.flags.contains(Flags::KEEPALIVE_ENABLED) && !self.flags.contains(Flags::KEEP_ALIVE_ENABLED)
{ {
self.conn_type = ConnectionType::Close self.conn_type = ConnectionType::Close
} }
@ -179,9 +181,11 @@ impl Encoder<Message<(Response<()>, BodySize)>> for Codec {
&self.config, &self.config,
)?; )?;
} }
Message::Chunk(Some(bytes)) => { Message::Chunk(Some(bytes)) => {
self.encoder.encode_chunk(bytes.as_ref(), dst)?; self.encoder.encode_chunk(bytes.as_ref(), dst)?;
} }
Message::Chunk(None) => { Message::Chunk(None) => {
self.encoder.encode_eof(dst)?; self.encoder.encode_eof(dst)?;
} }

File diff suppressed because it is too large Load Diff

View File

@ -17,7 +17,7 @@ use crate::{
h1::{Codec, ExpectHandler, UpgradeHandler}, h1::{Codec, ExpectHandler, UpgradeHandler},
service::HttpFlow, service::HttpFlow,
test::{TestBuffer, TestSeqBuffer}, test::{TestBuffer, TestSeqBuffer},
Error, HttpMessage, KeepAlive, Method, OnConnectData, Request, Response, Error, HttpMessage, KeepAlive, Method, OnConnectData, Request, Response, StatusCode,
}; };
fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option<usize> { fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option<usize> {
@ -34,7 +34,13 @@ fn stabilize_date_header(payload: &mut [u8]) {
} }
fn ok_service() -> impl Service<Request, Response = Response<impl MessageBody>, Error = Error> { fn ok_service() -> impl Service<Request, Response = Response<impl MessageBody>, Error = Error> {
fn_service(|_req: Request| ready(Ok::<_, Error>(Response::ok()))) status_service(StatusCode::OK)
}
fn status_service(
status: StatusCode,
) -> impl Service<Request, Response = Response<impl MessageBody>, Error = Error> {
fn_service(move |_req: Request| ready(Ok::<_, Error>(Response::new(status))))
} }
fn echo_path_service( fn echo_path_service(
@ -64,10 +70,83 @@ fn echo_payload_service() -> impl Service<Request, Response = Response<Bytes>, E
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_basic() { async fn late_request() {
let mut buf = TestBuffer::empty();
let cfg = ServiceConfig::new(
KeepAlive::Disabled,
Duration::from_millis(100),
Duration::ZERO,
false,
None,
);
let services = HttpFlow::new(ok_service(), ExpectHandler, None);
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
buf.clone(),
services,
cfg,
None,
OnConnectData::default(),
);
actix_rt::pin!(h1);
lazy(|cx| {
assert!(matches!(&h1.inner, DispatcherState::Normal { .. }));
match h1.as_mut().poll(cx) {
Poll::Ready(_) => panic!("first poll should not be ready"),
Poll::Pending => {}
}
// polls: initial
assert_eq!(h1.poll_count, 1);
buf.extend_read_buf("GET /abcd HTTP/1.1\r\nConnection: close\r\n\r\n");
match h1.as_mut().poll(cx) {
Poll::Pending => panic!("second poll should not be pending"),
Poll::Ready(res) => assert!(res.is_ok()),
}
// polls: initial pending => handle req => shutdown
assert_eq!(h1.poll_count, 3);
let mut res = buf.take_write_buf().to_vec();
stabilize_date_header(&mut res);
let res = &res[..];
let exp = b"\
HTTP/1.1 200 OK\r\n\
content-length: 0\r\n\
connection: close\r\n\
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\
";
assert_eq!(
res,
exp,
"\nexpected response not in write buffer:\n\
response: {:?}\n\
expected: {:?}",
String::from_utf8_lossy(res),
String::from_utf8_lossy(exp)
);
})
.await;
}
#[actix_rt::test]
async fn oneshot_connection() {
let buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n"); let buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n");
let cfg = ServiceConfig::new(KeepAlive::Disabled, 100, 0, false, None); let cfg = ServiceConfig::new(
KeepAlive::Disabled,
Duration::from_millis(100),
Duration::ZERO,
false,
None,
);
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
@ -116,10 +195,16 @@ async fn test_basic() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_keep_alive_timeout() { async fn keep_alive_timeout() {
let buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n"); let buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n");
let cfg = ServiceConfig::new(KeepAlive::Timeout(1), 100, 0, false, None); let cfg = ServiceConfig::new(
KeepAlive::Timeout(Duration::from_millis(200)),
Duration::from_millis(100),
Duration::ZERO,
false,
None,
);
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
@ -166,7 +251,7 @@ async fn test_keep_alive_timeout() {
.await; .await;
// sleep slightly longer than keep-alive timeout // sleep slightly longer than keep-alive timeout
sleep(Duration::from_millis(1100)).await; sleep(Duration::from_millis(250)).await;
lazy(|cx| { lazy(|cx| {
assert!( assert!(
@ -189,10 +274,16 @@ async fn test_keep_alive_timeout() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_keep_alive_follow_up_req() { async fn keep_alive_follow_up_req() {
let mut buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n"); let mut buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n");
let cfg = ServiceConfig::new(KeepAlive::Timeout(2), 100, 0, false, None); let cfg = ServiceConfig::new(
KeepAlive::Timeout(Duration::from_millis(500)),
Duration::from_millis(100),
Duration::ZERO,
false,
None,
);
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new(
@ -239,7 +330,7 @@ async fn test_keep_alive_follow_up_req() {
.await; .await;
// sleep for less than KA timeout // sleep for less than KA timeout
sleep(Duration::from_millis(200)).await; sleep(Duration::from_millis(100)).await;
lazy(|cx| { lazy(|cx| {
assert!( assert!(
@ -308,7 +399,7 @@ async fn test_keep_alive_follow_up_req() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_req_parse_err() { async fn req_parse_err() {
lazy(|cx| { lazy(|cx| {
let buf = TestBuffer::new("GET /test HTTP/1\r\n\r\n"); let buf = TestBuffer::new("GET /test HTTP/1\r\n\r\n");
@ -350,7 +441,13 @@ async fn pipelining_ok_then_ok() {
", ",
); );
let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None); let cfg = ServiceConfig::new(
KeepAlive::Disabled,
Duration::from_millis(1),
Duration::from_millis(1),
false,
None,
);
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
@ -414,7 +511,13 @@ async fn pipelining_ok_then_bad() {
", ",
); );
let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None); let cfg = ServiceConfig::new(
KeepAlive::Disabled,
Duration::from_millis(1),
Duration::from_millis(1),
false,
None,
);
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
@ -468,10 +571,16 @@ async fn pipelining_ok_then_bad() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_expect() { async fn expect_handling() {
lazy(|cx| { lazy(|cx| {
let mut buf = TestSeqBuffer::empty(); let mut buf = TestSeqBuffer::empty();
let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); let cfg = ServiceConfig::new(
KeepAlive::Disabled,
Duration::ZERO,
Duration::ZERO,
false,
None,
);
let services = HttpFlow::new(echo_payload_service(), ExpectHandler, None); let services = HttpFlow::new(echo_payload_service(), ExpectHandler, None);
@ -499,7 +608,6 @@ async fn test_expect() {
// polls: manual // polls: manual
assert_eq!(h1.poll_count, 1); assert_eq!(h1.poll_count, 1);
eprintln!("poll count: {}", h1.poll_count);
if let DispatcherState::Normal { ref inner } = h1.inner { if let DispatcherState::Normal { ref inner } = h1.inner {
let io = inner.io.as_ref().unwrap(); let io = inner.io.as_ref().unwrap();
@ -540,10 +648,16 @@ async fn test_expect() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_eager_expect() { async fn expect_eager() {
lazy(|cx| { lazy(|cx| {
let mut buf = TestSeqBuffer::empty(); let mut buf = TestSeqBuffer::empty();
let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); let cfg = ServiceConfig::new(
KeepAlive::Disabled,
Duration::ZERO,
Duration::ZERO,
false,
None,
);
let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); let services = HttpFlow::new(echo_path_service(), ExpectHandler, None);
@ -600,7 +714,7 @@ async fn test_eager_expect() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_upgrade() { async fn upgrade_handling() {
struct TestUpgrade; struct TestUpgrade;
impl<T> Service<(Request, Framed<T, Codec>)> for TestUpgrade { impl<T> Service<(Request, Framed<T, Codec>)> for TestUpgrade {
@ -620,7 +734,13 @@ async fn test_upgrade() {
lazy(|cx| { lazy(|cx| {
let mut buf = TestSeqBuffer::empty(); let mut buf = TestSeqBuffer::empty();
let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); let cfg = ServiceConfig::new(
KeepAlive::Disabled,
Duration::ZERO,
Duration::ZERO,
false,
None,
);
let services = HttpFlow::new(ok_service(), ExpectHandler, Some(TestUpgrade)); let services = HttpFlow::new(ok_service(), ExpectHandler, Some(TestUpgrade));

View File

@ -212,7 +212,7 @@ pub(crate) trait MessageType: Sized {
// optimized date header, set_date writes \r\n // optimized date header, set_date writes \r\n
if !has_date { if !has_date {
config.set_date(dst, camel_case); config.write_date_header(dst, camel_case);
} else { } else {
// msg eof // msg eof
dst.extend_from_slice(b"\r\n"); dst.extend_from_slice(b"\r\n");
@ -318,16 +318,17 @@ impl MessageType for RequestHeadType {
} }
impl<T: MessageType> MessageEncoder<T> { impl<T: MessageType> MessageEncoder<T> {
/// Encode message /// Encode chunk.
pub fn encode_chunk(&mut self, msg: &[u8], buf: &mut BytesMut) -> io::Result<bool> { pub fn encode_chunk(&mut self, msg: &[u8], buf: &mut BytesMut) -> io::Result<bool> {
self.te.encode(msg, buf) self.te.encode(msg, buf)
} }
/// Encode eof /// Encode EOF.
pub fn encode_eof(&mut self, buf: &mut BytesMut) -> io::Result<()> { pub fn encode_eof(&mut self, buf: &mut BytesMut) -> io::Result<()> {
self.te.encode_eof(buf) self.te.encode_eof(buf)
} }
/// Encode message.
pub fn encode( pub fn encode(
&mut self, &mut self,
dst: &mut BytesMut, dst: &mut BytesMut,

View File

@ -13,6 +13,7 @@ mod encoder;
mod expect; mod expect;
mod payload; mod payload;
mod service; mod service;
mod timer;
mod upgrade; mod upgrade;
mod utils; mod utils;
@ -28,9 +29,10 @@ pub use self::utils::SendResponse;
#[derive(Debug)] #[derive(Debug)]
/// Codec message /// Codec message
pub enum Message<T> { pub enum Message<T> {
/// Http message /// HTTP message.
Item(T), Item(T),
/// Payload chunk
/// Payload chunk.
Chunk(Option<Bytes>), Chunk(Option<Bytes>),
} }

View File

@ -0,0 +1,80 @@
use std::{fmt, future::Future, pin::Pin, task::Context};
use actix_rt::time::{Instant, Sleep};
#[derive(Debug)]
pub(super) enum TimerState {
Disabled,
Inactive,
Active { timer: Pin<Box<Sleep>> },
}
impl TimerState {
pub(super) fn new(enabled: bool) -> Self {
if enabled {
Self::Inactive
} else {
Self::Disabled
}
}
pub(super) fn is_enabled(&self) -> bool {
matches!(self, Self::Active { .. } | Self::Inactive)
}
pub(super) fn set(&mut self, timer: Sleep, line: u32) {
if matches!(self, Self::Disabled) {
log::trace!("setting disabled timer from line {}", line);
}
*self = Self::Active {
timer: Box::pin(timer),
};
}
pub(super) fn set_and_init(&mut self, cx: &mut Context<'_>, timer: Sleep, line: u32) {
self.set(timer, line);
self.init(cx);
}
pub(super) fn clear(&mut self, line: u32) {
if matches!(self, Self::Disabled) {
log::trace!("trying to clear a disabled timer from line {}", line);
}
if matches!(self, Self::Inactive) {
log::trace!("trying to clear an inactive timer from line {}", line);
}
*self = Self::Inactive;
}
pub(super) fn init(&mut self, cx: &mut Context<'_>) {
if let TimerState::Active { timer } = self {
let _ = timer.as_mut().poll(cx);
}
}
}
impl fmt::Display for TimerState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
TimerState::Disabled => f.write_str("timer is disabled"),
TimerState::Inactive => f.write_str("timer is inactive"),
TimerState::Active { timer } => {
let deadline = timer.deadline();
let now = Instant::now();
if deadline < now {
f.write_str("timer is active and has reached deadline")
} else {
write!(
f,
"timer is active and due to expire in {} milliseconds",
((deadline - now).as_secs_f32() * 1000.0)
)
}
}
}
}
}

View File

@ -57,11 +57,11 @@ where
conn_data: OnConnectData, conn_data: OnConnectData,
timer: Option<Pin<Box<Sleep>>>, timer: Option<Pin<Box<Sleep>>>,
) -> Self { ) -> Self {
let ping_pong = config.keep_alive().map(|dur| H2PingPong { let ping_pong = config.keep_alive().duration().map(|dur| H2PingPong {
timer: timer timer: timer
.map(|mut timer| { .map(|mut timer| {
// reset timer if it's received from new function. // reuse timer slot if it was initialized for handshake
timer.as_mut().reset(config.now() + dur); timer.as_mut().reset((config.now() + dur).into());
timer timer
}) })
.unwrap_or_else(|| Box::pin(sleep(dur))), .unwrap_or_else(|| Box::pin(sleep(dur))),
@ -141,7 +141,7 @@ where
DispatchError::SendResponse(err) => { DispatchError::SendResponse(err) => {
trace!("Error sending HTTP/2 response: {:?}", err) trace!("Error sending HTTP/2 response: {:?}", err)
} }
DispatchError::SendData(err) => warn!("{:?}", err), DispatchError::SendData(err) => log::warn!("{:?}", err),
DispatchError::ResponseBody(err) => { DispatchError::ResponseBody(err) => {
error!("Response payload stream error: {:?}", err) error!("Response payload stream error: {:?}", err)
} }
@ -160,8 +160,8 @@ where
Poll::Ready(_) => { Poll::Ready(_) => {
ping_pong.on_flight = false; ping_pong.on_flight = false;
let dead_line = this.config.keep_alive_expire().unwrap(); let dead_line = this.config.keep_alive_deadline().unwrap();
ping_pong.timer.as_mut().reset(dead_line); ping_pong.timer.as_mut().reset(dead_line.into());
} }
Poll::Pending => { Poll::Pending => {
return ping_pong.timer.as_mut().poll(cx).map(|_| Ok(())) return ping_pong.timer.as_mut().poll(cx).map(|_| Ok(()))
@ -174,8 +174,8 @@ where
ping_pong.ping_pong.send_ping(Ping::opaque())?; ping_pong.ping_pong.send_ping(Ping::opaque())?;
let dead_line = this.config.keep_alive_expire().unwrap(); let dead_line = this.config.keep_alive_deadline().unwrap();
ping_pong.timer.as_mut().reset(dead_line); ping_pong.timer.as_mut().reset(dead_line.into());
ping_pong.on_flight = true; ping_pong.on_flight = true;
} }
@ -322,7 +322,7 @@ fn prepare_response(
// set date header // set date header
if !has_date { if !has_date {
let mut bytes = BytesMut::with_capacity(29); let mut bytes = BytesMut::with_capacity(29);
config.set_date_header(&mut bytes); config.write_date_header_value(&mut bytes);
res.headers_mut().insert( res.headers_mut().insert(
DATE, DATE,
// SAFETY: serialized date-times are known ASCII strings // SAFETY: serialized date-times are known ASCII strings

View File

@ -7,7 +7,7 @@ use std::{
}; };
use actix_codec::{AsyncRead, AsyncWrite}; use actix_codec::{AsyncRead, AsyncWrite};
use actix_rt::time::Sleep; use actix_rt::time::{sleep_until, Sleep};
use bytes::Bytes; use bytes::Bytes;
use futures_core::{ready, Stream}; use futures_core::{ready, Stream};
use h2::{ use h2::{
@ -15,17 +15,17 @@ use h2::{
RecvStream, RecvStream,
}; };
use crate::{
config::ServiceConfig,
error::{DispatchError, PayloadError},
};
mod dispatcher; mod dispatcher;
mod service; mod service;
pub use self::dispatcher::Dispatcher; pub use self::dispatcher::Dispatcher;
pub use self::service::H2Service; pub use self::service::H2Service;
use crate::{
config::ServiceConfig,
error::{DispatchError, PayloadError},
};
/// HTTP/2 peer stream. /// HTTP/2 peer stream.
pub struct Payload { pub struct Payload {
stream: RecvStream, stream: RecvStream,
@ -67,7 +67,9 @@ where
{ {
HandshakeWithTimeout { HandshakeWithTimeout {
handshake: handshake(io), handshake: handshake(io),
timer: config.client_timer().map(Box::pin), timer: config
.client_request_deadline()
.map(|deadline| Box::pin(sleep_until(deadline.into()))),
} }
} }
@ -86,7 +88,7 @@ where
let this = self.get_mut(); let this = self.get_mut();
match Pin::new(&mut this.handshake).poll(cx)? { match Pin::new(&mut this.handshake).poll(cx)? {
// return the timer on success handshake. It can be re-used for h2 ping-pong. // return the timer on success handshake; its slot can be re-used for h2 ping-pong
Poll::Ready(conn) => Poll::Ready(Ok((conn, this.timer.take()))), Poll::Ready(conn) => Poll::Ready(Ok((conn, this.timer.take()))),
Poll::Pending => match this.timer.as_mut() { Poll::Pending => match this.timer.as_mut() {
Some(timer) => { Some(timer) => {

View File

@ -355,7 +355,7 @@ where
} }
Err(err) => { Err(err) => {
trace!("H2 handshake error: {}", err); log::trace!("H2 handshake error: {}", err);
Poll::Ready(Err(err)) Poll::Ready(Err(err))
} }
}, },

View File

@ -630,7 +630,7 @@ impl Removed {
/// Returns true if iterator contains no elements, without consuming it. /// Returns true if iterator contains no elements, without consuming it.
/// ///
/// If called immediately after [`HeaderMap::insert`] or [`HeaderMap::remove`], it will indicate /// If called immediately after [`HeaderMap::insert`] or [`HeaderMap::remove`], it will indicate
/// wether any items were actually replaced or removed, respectively. /// whether any items were actually replaced or removed, respectively.
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
match self.inner { match self.inner {
// size hint lower bound of smallvec is the correct length // size hint lower bound of smallvec is the correct length

View File

@ -4,8 +4,7 @@ use bytes::BytesMut;
use http::header::{HeaderValue, InvalidHeaderValue}; use http::header::{HeaderValue, InvalidHeaderValue};
use crate::{ use crate::{
config::DATE_VALUE_LENGTH, error::ParseError, header::TryIntoHeaderValue, date::DATE_VALUE_LENGTH, error::ParseError, header::TryIntoHeaderValue, helpers::MutWriter,
helpers::MutWriter,
}; };
/// A timestamp with HTTP-style formatting and parsing. /// A timestamp with HTTP-style formatting and parsing.

View File

@ -55,7 +55,7 @@ pub trait HttpMessage: Sized {
"" ""
} }
/// Get content type encoding /// Get content type encoding.
/// ///
/// UTF-8 is used by default, If request charset is not set. /// UTF-8 is used by default, If request charset is not set.
fn encoding(&self) -> Result<&'static Encoding, ContentTypeError> { fn encoding(&self) -> Result<&'static Encoding, ContentTypeError> {

View File

@ -0,0 +1,84 @@
use std::time::Duration;
/// Connection keep-alive config.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum KeepAlive {
/// Keep-alive duration.
///
/// `KeepAlive::Timeout(Duration::ZERO)` is mapped to `KeepAlive::Disabled`.
Timeout(Duration),
/// Rely on OS to shutdown TCP connection.
///
/// Some defaults can be very long, check your OS documentation.
Os,
/// Keep-alive is disabled.
///
/// Connections will be closed immediately.
Disabled,
}
impl KeepAlive {
pub(crate) fn enabled(&self) -> bool {
!matches!(self, Self::Disabled)
}
#[allow(unused)] // used with `http2` feature flag
pub(crate) fn duration(&self) -> Option<Duration> {
match self {
KeepAlive::Timeout(dur) => Some(*dur),
_ => None,
}
}
/// Map zero duration to disabled.
pub(crate) fn normalize(self) -> KeepAlive {
match self {
KeepAlive::Timeout(Duration::ZERO) => KeepAlive::Disabled,
ka => ka,
}
}
}
impl Default for KeepAlive {
fn default() -> Self {
Self::Timeout(Duration::from_secs(5))
}
}
impl From<Duration> for KeepAlive {
fn from(dur: Duration) -> Self {
KeepAlive::Timeout(dur).normalize()
}
}
impl From<Option<Duration>> for KeepAlive {
fn from(ka_dur: Option<Duration>) -> Self {
match ka_dur {
Some(dur) => KeepAlive::from(dur),
None => KeepAlive::Disabled,
}
.normalize()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn from_impls() {
let test: KeepAlive = Duration::from_secs(1).into();
assert_eq!(test, KeepAlive::Timeout(Duration::from_secs(1)));
let test: KeepAlive = Duration::from_secs(0).into();
assert_eq!(test, KeepAlive::Disabled);
let test: KeepAlive = Some(Duration::from_secs(0)).into();
assert_eq!(test, KeepAlive::Disabled);
let test: KeepAlive = None.into();
assert_eq!(test, KeepAlive::Disabled);
}
}

View File

@ -24,38 +24,42 @@
#![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
#[macro_use]
extern crate log;
pub use ::http::{uri, uri::Uri}; pub use ::http::{uri, uri::Uri};
pub use ::http::{Method, StatusCode, Version}; pub use ::http::{Method, StatusCode, Version};
pub mod body; pub mod body;
mod builder; mod builder;
mod config; mod config;
mod date;
#[cfg(feature = "__compress")] #[cfg(feature = "__compress")]
pub mod encoding; pub mod encoding;
pub mod error; pub mod error;
mod extensions; mod extensions;
pub mod h1; pub mod h1;
#[cfg(feature = "http2")]
pub mod h2; pub mod h2;
pub mod header; pub mod header;
mod helpers; mod helpers;
mod http_message; mod http_message;
mod keep_alive;
mod message; mod message;
#[cfg(test)]
mod notify_on_drop;
mod payload; mod payload;
mod requests; mod requests;
mod responses; mod responses;
mod service; mod service;
pub mod test; pub mod test;
#[cfg(feature = "ws")]
pub mod ws; pub mod ws;
pub use self::builder::HttpServiceBuilder; pub use self::builder::HttpServiceBuilder;
pub use self::config::{KeepAlive, ServiceConfig}; pub use self::config::ServiceConfig;
pub use self::error::Error; pub use self::error::Error;
pub use self::extensions::Extensions; pub use self::extensions::Extensions;
pub use self::header::ContentEncoding; pub use self::header::ContentEncoding;
pub use self::http_message::HttpMessage; pub use self::http_message::HttpMessage;
pub use self::keep_alive::KeepAlive;
pub use self::message::ConnectionType; pub use self::message::ConnectionType;
pub use self::message::Message; pub use self::message::Message;
#[allow(deprecated)] #[allow(deprecated)]

View File

@ -0,0 +1,49 @@
/// Test Module for checking the drop state of certain async tasks that are spawned
/// with `actix_rt::spawn`
///
/// The target task must explicitly generate `NotifyOnDrop` when spawn the task
use std::cell::RefCell;
thread_local! {
static NOTIFY_DROPPED: RefCell<Option<bool>> = RefCell::new(None);
}
/// Check if the spawned task is dropped.
///
/// # Panics
/// Panics when there was no `NotifyOnDrop` instance on current thread.
pub(crate) fn is_dropped() -> bool {
NOTIFY_DROPPED.with(|bool| {
bool.borrow()
.expect("No NotifyOnDrop existed on current thread")
})
}
pub(crate) struct NotifyOnDrop;
impl NotifyOnDrop {
/// # Panics
/// Panics hen construct multiple instances on any given thread.
pub(crate) fn new() -> Self {
NOTIFY_DROPPED.with(|bool| {
let mut bool = bool.borrow_mut();
if bool.is_some() {
panic!("NotifyOnDrop existed on current thread");
} else {
*bool = Some(false);
}
});
NotifyOnDrop
}
}
impl Drop for NotifyOnDrop {
fn drop(&mut self) {
NOTIFY_DROPPED.with(|bool| {
if let Some(b) = bool.borrow_mut().as_mut() {
*b = true;
}
});
}
}

View File

@ -16,6 +16,18 @@ pub type BoxedPayloadStream = Pin<Box<dyn Stream<Item = Result<Bytes, PayloadErr
#[deprecated(since = "4.0.0", note = "Renamed to `BoxedPayloadStream`.")] #[deprecated(since = "4.0.0", note = "Renamed to `BoxedPayloadStream`.")]
pub type PayloadStream = BoxedPayloadStream; pub type PayloadStream = BoxedPayloadStream;
#[cfg(not(feature = "http2"))]
pin_project! {
/// A streaming payload.
#[project = PayloadProj]
pub enum Payload<S = BoxedPayloadStream> {
None,
H1 { payload: crate::h1::Payload },
Stream { #[pin] payload: S },
}
}
#[cfg(feature = "http2")]
pin_project! { pin_project! {
/// A streaming payload. /// A streaming payload.
#[project = PayloadProj] #[project = PayloadProj]
@ -33,14 +45,16 @@ impl<S> From<crate::h1::Payload> for Payload<S> {
} }
} }
#[cfg(feature = "http2")]
impl<S> From<crate::h2::Payload> for Payload<S> { impl<S> From<crate::h2::Payload> for Payload<S> {
fn from(payload: crate::h2::Payload) -> Self { fn from(payload: crate::h2::Payload) -> Self {
Payload::H2 { payload } Payload::H2 { payload }
} }
} }
impl<S> From<h2::RecvStream> for Payload<S> { #[cfg(feature = "http2")]
fn from(stream: h2::RecvStream) -> Self { impl<S> From<::h2::RecvStream> for Payload<S> {
fn from(stream: ::h2::RecvStream) -> Self {
Payload::H2 { Payload::H2 {
payload: crate::h2::Payload::new(stream), payload: crate::h2::Payload::new(stream),
} }
@ -71,7 +85,10 @@ where
match self.project() { match self.project() {
PayloadProj::None => Poll::Ready(None), PayloadProj::None => Poll::Ready(None),
PayloadProj::H1 { payload } => Pin::new(payload).poll_next(cx), PayloadProj::H1 { payload } => Pin::new(payload).poll_next(cx),
#[cfg(feature = "http2")]
PayloadProj::H2 { payload } => Pin::new(payload).poll_next(cx), PayloadProj::H2 { payload } => Pin::new(payload).poll_next(cx),
PayloadProj::Stream { payload } => payload.poll_next(cx), PayloadProj::Stream { payload } => payload.poll_next(cx),
} }
} }

View File

@ -130,8 +130,8 @@ impl RequestHead {
} }
} }
/// Request contains `EXPECT` header.
#[inline] #[inline]
/// Request contains `EXPECT` header
pub fn expect(&self) -> bool { pub fn expect(&self) -> bool {
self.flags.contains(Flags::EXPECT) self.flags.contains(Flags::EXPECT)
} }

View File

@ -42,7 +42,7 @@ impl ResponseHead {
&mut self.headers &mut self.headers
} }
/// Sets the flag that controls wether to send headers formatted as Camel-Case. /// Sets the flag that controls whether to send headers formatted as Camel-Case.
/// ///
/// Only applicable to HTTP/1.x responses; HTTP/2 header names are always lowercase. /// Only applicable to HTTP/1.x responses; HTTP/2 header names are always lowercase.
#[inline] #[inline]
@ -210,14 +210,15 @@ mod tests {
use memchr::memmem; use memchr::memmem;
use crate::{ use crate::{
h1::H1Service,
header::{HeaderName, HeaderValue}, header::{HeaderName, HeaderValue},
Error, HttpService, Request, Response, Error, Request, Response, ServiceConfig,
}; };
#[actix_rt::test] #[actix_rt::test]
async fn camel_case_headers() { async fn camel_case_headers() {
let mut srv = actix_http_test::test_server(|| { let mut srv = actix_http_test::test_server(|| {
HttpService::new(|req: Request| async move { H1Service::with_config(ServiceConfig::default(), |req: Request| async move {
let mut res = Response::ok(); let mut res = Response::ok();
if req.path().contains("camel") { if req.path().contains("camel") {
@ -228,6 +229,7 @@ mod tests {
HeaderName::from_static("foo-bar"), HeaderName::from_static("foo-bar"),
HeaderValue::from_static("baz"), HeaderValue::from_static("baz"),
); );
Ok::<_, Error>(res) Ok::<_, Error>(res)
}) })
.tcp() .tcp()
@ -235,9 +237,11 @@ mod tests {
.await; .await;
let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
let _ = stream.write_all(b"GET /camel HTTP/1.1\r\nConnection: Close\r\n\r\n"); let _ = stream
let mut data = vec![0; 1024]; .write_all(b"GET /camel HTTP/1.1\r\nConnection: Close\r\n\r\n")
let _ = stream.read(&mut data); .unwrap();
let mut data = vec![];
let _ = stream.read_to_end(&mut data).unwrap();
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n"); assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
assert!(memmem::find(&data, b"Foo-Bar").is_some()); assert!(memmem::find(&data, b"Foo-Bar").is_some());
assert!(memmem::find(&data, b"foo-bar").is_none()); assert!(memmem::find(&data, b"foo-bar").is_none());
@ -247,9 +251,11 @@ mod tests {
assert!(memmem::find(&data, b"content-length").is_none()); assert!(memmem::find(&data, b"content-length").is_none());
let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
let _ = stream.write_all(b"GET /lower HTTP/1.1\r\nConnection: Close\r\n\r\n"); let _ = stream
let mut data = vec![0; 1024]; .write_all(b"GET /lower HTTP/1.1\r\nConnection: Close\r\n\r\n")
let _ = stream.read(&mut data); .unwrap();
let mut data = vec![];
let _ = stream.read_to_end(&mut data).unwrap();
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n"); assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
assert!(memmem::find(&data, b"Foo-Bar").is_none()); assert!(memmem::find(&data, b"Foo-Bar").is_none());
assert!(memmem::find(&data, b"foo-bar").is_some()); assert!(memmem::find(&data, b"foo-bar").is_some());

View File

@ -19,9 +19,8 @@ use pin_project_lite::pin_project;
use crate::{ use crate::{
body::{BoxBody, MessageBody}, body::{BoxBody, MessageBody},
builder::HttpServiceBuilder, builder::HttpServiceBuilder,
config::{KeepAlive, ServiceConfig},
error::DispatchError, error::DispatchError,
h1, h2, ConnectCallback, OnConnectData, Protocol, Request, Response, h1, ConnectCallback, OnConnectData, Protocol, Request, Response, ServiceConfig,
}; };
/// A `ServiceFactory` for HTTP/1.1 or HTTP/2 protocol. /// A `ServiceFactory` for HTTP/1.1 or HTTP/2 protocol.
@ -43,9 +42,9 @@ where
<S::Service as Service<Request>>::Future: 'static, <S::Service as Service<Request>>::Future: 'static,
B: MessageBody + 'static, B: MessageBody + 'static,
{ {
/// Create builder for `HttpService` instance. /// Constructs builder for `HttpService` instance.
pub fn build() -> HttpServiceBuilder<T, S> { pub fn build() -> HttpServiceBuilder<T, S> {
HttpServiceBuilder::new() HttpServiceBuilder::default()
} }
} }
@ -58,12 +57,10 @@ where
<S::Service as Service<Request>>::Future: 'static, <S::Service as Service<Request>>::Future: 'static,
B: MessageBody + 'static, B: MessageBody + 'static,
{ {
/// Create new `HttpService` instance. /// Constructs new `HttpService` instance from service with default config.
pub fn new<F: IntoServiceFactory<S, Request>>(service: F) -> Self { pub fn new<F: IntoServiceFactory<S, Request>>(service: F) -> Self {
let cfg = ServiceConfig::new(KeepAlive::Timeout(5), 5000, 0, false, None);
HttpService { HttpService {
cfg, cfg: ServiceConfig::default(),
srv: service.into_factory(), srv: service.into_factory(),
expect: h1::ExpectHandler, expect: h1::ExpectHandler,
upgrade: None, upgrade: None,
@ -72,7 +69,7 @@ where
} }
} }
/// Create new `HttpService` instance with config. /// Constructs new `HttpService` instance from config and service.
pub(crate) fn with_config<F: IntoServiceFactory<S, Request>>( pub(crate) fn with_config<F: IntoServiceFactory<S, Request>>(
cfg: ServiceConfig, cfg: ServiceConfig,
service: F, service: F,
@ -97,11 +94,10 @@ where
<S::Service as Service<Request>>::Future: 'static, <S::Service as Service<Request>>::Future: 'static,
B: MessageBody, B: MessageBody,
{ {
/// Provide service for `EXPECT: 100-Continue` support. /// Sets service for `Expect: 100-Continue` handling.
/// ///
/// Service get called with request that contains `EXPECT` header. /// An expect service is called with requests that contain an `Expect` header. A successful
/// Service must return request in case of success, in that case /// response type is also a request which will be forwarded to the main service.
/// request will be forwarded to main service.
pub fn expect<X1>(self, expect: X1) -> HttpService<T, S, B, X1, U> pub fn expect<X1>(self, expect: X1) -> HttpService<T, S, B, X1, U>
where where
X1: ServiceFactory<Request, Config = (), Response = Request>, X1: ServiceFactory<Request, Config = (), Response = Request>,
@ -118,10 +114,10 @@ where
} }
} }
/// Provide service for custom `Connection: UPGRADE` support. /// Sets service for custom `Connection: Upgrade` handling.
/// ///
/// If service is provided then normal requests handling get halted /// If service is provided then normal requests handling get halted and this service get called
/// and this service get called with original request and framed object. /// with original request and framed object.
pub fn upgrade<U1>(self, upgrade: Option<U1>) -> HttpService<T, S, B, X, U1> pub fn upgrade<U1>(self, upgrade: Option<U1>) -> HttpService<T, S, B, X, U1>
where where
U1: ServiceFactory<(Request, Framed<T, h1::Codec>), Config = (), Response = ()>, U1: ServiceFactory<(Request, Framed<T, h1::Codec>), Config = (), Response = ()>,
@ -506,10 +502,11 @@ where
let conn_data = OnConnectData::from_io(&io, self.on_connect_ext.as_deref()); let conn_data = OnConnectData::from_io(&io, self.on_connect_ext.as_deref());
match proto { match proto {
#[cfg(feature = "http2")]
Protocol::Http2 => HttpServiceHandlerResponse { Protocol::Http2 => HttpServiceHandlerResponse {
state: State::H2Handshake { state: State::H2Handshake {
handshake: Some(( handshake: Some((
h2::handshake_with_timeout(io, &self.cfg), crate::h2::handshake_with_timeout(io, &self.cfg),
self.cfg.clone(), self.cfg.clone(),
self.flow.clone(), self.flow.clone(),
conn_data, conn_data,
@ -518,6 +515,11 @@ where
}, },
}, },
#[cfg(not(feature = "http2"))]
Protocol::Http2 => {
panic!("HTTP/2 support is disabled (enable with the `http2` feature flag)")
}
Protocol::Http1 => HttpServiceHandlerResponse { Protocol::Http1 => HttpServiceHandlerResponse {
state: State::H1 { state: State::H1 {
dispatcher: h1::Dispatcher::new( dispatcher: h1::Dispatcher::new(
@ -535,6 +537,7 @@ where
} }
} }
#[cfg(not(feature = "http2"))]
pin_project! { pin_project! {
#[project = StateProj] #[project = StateProj]
enum State<T, S, B, X, U> enum State<T, S, B, X, U>
@ -556,10 +559,37 @@ pin_project! {
U::Error: fmt::Display, U::Error: fmt::Display,
{ {
H1 { #[pin] dispatcher: h1::Dispatcher<T, S, B, X, U> }, H1 { #[pin] dispatcher: h1::Dispatcher<T, S, B, X, U> },
H2 { #[pin] dispatcher: h2::Dispatcher<T, S, B, X, U> }, }
}
#[cfg(feature = "http2")]
pin_project! {
#[project = StateProj]
enum State<T, S, B, X, U>
where
T: AsyncRead,
T: AsyncWrite,
T: Unpin,
S: Service<Request>,
S::Future: 'static,
S::Error: Into<Response<BoxBody>>,
B: MessageBody,
X: Service<Request, Response = Request>,
X::Error: Into<Response<BoxBody>>,
U: Service<(Request, Framed<T, h1::Codec>), Response = ()>,
U::Error: fmt::Display,
{
H1 { #[pin] dispatcher: h1::Dispatcher<T, S, B, X, U> },
H2 { #[pin] dispatcher: crate::h2::Dispatcher<T, S, B, X, U> },
H2Handshake { H2Handshake {
handshake: Option<( handshake: Option<(
h2::HandshakeWithTimeout<T>, crate::h2::HandshakeWithTimeout<T>,
ServiceConfig, ServiceConfig,
Rc<HttpFlow<S, X, U>>, Rc<HttpFlow<S, X, U>>,
OnConnectData, OnConnectData,
@ -618,21 +648,25 @@ where
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().project().state.project() { match self.as_mut().project().state.project() {
StateProj::H1 { dispatcher } => dispatcher.poll(cx), StateProj::H1 { dispatcher } => dispatcher.poll(cx),
#[cfg(feature = "http2")]
StateProj::H2 { dispatcher } => dispatcher.poll(cx), StateProj::H2 { dispatcher } => dispatcher.poll(cx),
#[cfg(feature = "http2")]
StateProj::H2Handshake { handshake: data } => { StateProj::H2Handshake { handshake: data } => {
match ready!(Pin::new(&mut data.as_mut().unwrap().0).poll(cx)) { match ready!(Pin::new(&mut data.as_mut().unwrap().0).poll(cx)) {
Ok((conn, timer)) => { Ok((conn, timer)) => {
let (_, config, flow, conn_data, peer_addr) = data.take().unwrap(); let (_, config, flow, conn_data, peer_addr) = data.take().unwrap();
self.as_mut().project().state.set(State::H2 { self.as_mut().project().state.set(State::H2 {
dispatcher: h2::Dispatcher::new( dispatcher: crate::h2::Dispatcher::new(
conn, flow, config, peer_addr, conn_data, timer, conn, flow, config, peer_addr, conn_data, timer,
), ),
}); });
self.poll(cx) self.poll(cx)
} }
Err(err) => { Err(err) => {
trace!("H2 handshake error: {}", err); log::trace!("H2 handshake error: {}", err);
Poll::Ready(Err(err)) Poll::Ready(Err(err))
} }
} }

View File

@ -242,7 +242,7 @@ impl io::Read for TestBuffer {
impl io::Write for TestBuffer { impl io::Write for TestBuffer {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
RefCell::borrow_mut(&self.write_buf).extend(buf); self.write_buf.borrow_mut().extend(buf);
Ok(buf.len()) Ok(buf.len())
} }

View File

@ -3,9 +3,11 @@ use bitflags::bitflags;
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use bytestring::ByteString; use bytestring::ByteString;
use super::frame::Parser; use super::{
use super::proto::{CloseReason, OpCode}; frame::Parser,
use super::ProtocolError; proto::{CloseReason, OpCode},
ProtocolError,
};
/// A WebSocket message. /// A WebSocket message.
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
@ -251,7 +253,7 @@ impl Decoder for Codec {
} }
} }
_ => { _ => {
error!("Unfinished fragment {:?}", opcode); log::error!("Unfinished fragment {:?}", opcode);
Err(ProtocolError::ContinuationFragment(opcode)) Err(ProtocolError::ContinuationFragment(opcode))
} }
}; };

View File

@ -1,6 +1,8 @@
use std::future::Future; use std::{
use std::pin::Pin; future::Future,
use std::task::{Context, Poll}; pin::Pin,
task::{Context, Poll},
};
use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_service::{IntoService, Service}; use actix_service::{IntoService, Service};

View File

@ -3,9 +3,11 @@ use std::convert::TryFrom;
use bytes::{Buf, BufMut, BytesMut}; use bytes::{Buf, BufMut, BytesMut};
use log::debug; use log::debug;
use crate::ws::mask::apply_mask; use super::{
use crate::ws::proto::{CloseCode, CloseReason, OpCode}; mask::apply_mask,
use crate::ws::ProtocolError; proto::{CloseCode, CloseReason, OpCode},
ProtocolError,
};
/// A struct representing a WebSocket frame. /// A struct representing a WebSocket frame.
#[derive(Debug)] #[derive(Debug)]

View File

@ -31,7 +31,7 @@ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World"; Hello World Hello World Hello World Hello World Hello World";
#[actix_rt::test] #[actix_rt::test]
async fn test_h1_v2() { async fn h1_v2() {
let srv = test_server(move || { let srv = test_server(move || {
HttpService::build() HttpService::build()
.finish(|_| future::ok::<_, Infallible>(Response::ok().set_body(STR))) .finish(|_| future::ok::<_, Infallible>(Response::ok().set_body(STR)))
@ -59,7 +59,7 @@ async fn test_h1_v2() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_connection_close() { async fn connection_close() {
let srv = test_server(move || { let srv = test_server(move || {
HttpService::build() HttpService::build()
.finish(|_| future::ok::<_, Infallible>(Response::ok().set_body(STR))) .finish(|_| future::ok::<_, Infallible>(Response::ok().set_body(STR)))
@ -73,7 +73,7 @@ async fn test_connection_close() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_with_query_parameter() { async fn with_query_parameter() {
let srv = test_server(move || { let srv = test_server(move || {
HttpService::build() HttpService::build()
.finish(|req: Request| async move { .finish(|req: Request| async move {
@ -104,7 +104,7 @@ impl From<ExpectFailed> for Response<BoxBody> {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_h1_expect() { async fn h1_expect() {
let srv = test_server(move || { let srv = test_server(move || {
HttpService::build() HttpService::build()
.expect(|req: Request| async { .expect(|req: Request| async {

View File

@ -1,4 +1,4 @@
use std::io; use std::{io, time::Duration};
use actix_http::{error::Error, HttpService, Response}; use actix_http::{error::Error, HttpService, Response};
use actix_server::Server; use actix_server::Server;
@ -19,7 +19,7 @@ async fn h2_ping_pong() -> io::Result<()> {
.workers(1) .workers(1)
.listen("h2_ping_pong", lst, || { .listen("h2_ping_pong", lst, || {
HttpService::build() HttpService::build()
.keep_alive(3) .keep_alive(Duration::from_secs(3))
.h2(|_| async { Ok::<_, Error>(Response::ok()) }) .h2(|_| async { Ok::<_, Error>(Response::ok()) })
.tcp() .tcp()
})? })?
@ -92,10 +92,10 @@ async fn h2_handshake_timeout() -> io::Result<()> {
.workers(1) .workers(1)
.listen("h2_ping_pong", lst, || { .listen("h2_ping_pong", lst, || {
HttpService::build() HttpService::build()
.keep_alive(30) .keep_alive(Duration::from_secs(30))
// set first request timeout to 5 seconds. // set first request timeout to 5 seconds.
// this is the timeout used for http2 handshake. // this is the timeout used for http2 handshake.
.client_timeout(5000) .client_request_timeout(Duration::from_secs(5))
.h2(|_| async { Ok::<_, Error>(Response::ok()) }) .h2(|_| async { Ok::<_, Error>(Response::ok()) })
.tcp() .tcp()
})? })?

View File

@ -2,7 +2,7 @@ use std::{
convert::Infallible, convert::Infallible,
io::{Read, Write}, io::{Read, Write},
net, thread, net, thread,
time::Duration, time::{Duration, Instant},
}; };
use actix_http::{ use actix_http::{
@ -22,12 +22,12 @@ use futures_util::{
use regex::Regex; use regex::Regex;
#[actix_rt::test] #[actix_rt::test]
async fn test_h1() { async fn h1_basic() {
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.keep_alive(KeepAlive::Disabled) .keep_alive(KeepAlive::Disabled)
.client_timeout(1000) .client_request_timeout(Duration::from_secs(1))
.client_disconnect(1000) .client_disconnect_timeout(Duration::from_secs(1))
.h1(|req: Request| { .h1(|req: Request| {
assert!(req.peer_addr().is_some()); assert!(req.peer_addr().is_some());
ok::<_, Infallible>(Response::ok()) ok::<_, Infallible>(Response::ok())
@ -43,12 +43,12 @@ async fn test_h1() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_h1_2() { async fn h1_2() {
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.keep_alive(KeepAlive::Disabled) .keep_alive(KeepAlive::Disabled)
.client_timeout(1000) .client_request_timeout(Duration::from_secs(1))
.client_disconnect(1000) .client_disconnect_timeout(Duration::from_secs(1))
.finish(|req: Request| { .finish(|req: Request| {
assert!(req.peer_addr().is_some()); assert!(req.peer_addr().is_some());
assert_eq!(req.version(), http::Version::HTTP_11); assert_eq!(req.version(), http::Version::HTTP_11);
@ -75,7 +75,7 @@ impl From<ExpectFailed> for Response<BoxBody> {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_expect_continue() { async fn expect_continue() {
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.expect(fn_service(|req: Request| { .expect(fn_service(|req: Request| {
@ -106,7 +106,7 @@ async fn test_expect_continue() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_expect_continue_h1() { async fn expect_continue_h1() {
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.expect(fn_service(|req: Request| { .expect(fn_service(|req: Request| {
@ -139,7 +139,7 @@ async fn test_expect_continue_h1() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_chunked_payload() { async fn chunked_payload() {
let chunk_sizes = vec![32768, 32, 32768]; let chunk_sizes = vec![32768, 32, 32768];
let total_size: usize = chunk_sizes.iter().sum(); let total_size: usize = chunk_sizes.iter().sum();
@ -197,26 +197,43 @@ async fn test_chunked_payload() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_slow_request() { async fn slow_request_408() {
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.client_timeout(100) .client_request_timeout(Duration::from_millis(200))
.keep_alive(Duration::from_secs(2))
.finish(|_| ok::<_, Infallible>(Response::ok())) .finish(|_| ok::<_, Infallible>(Response::ok()))
.tcp() .tcp()
}) })
.await; .await;
let start = Instant::now();
let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\n"); let _ = stream.write_all(b"GET /test HTTP/1.1\r\n");
let mut data = String::new(); let mut data = String::new();
let _ = stream.read_to_string(&mut data); let _ = stream.read_to_string(&mut data);
assert!(data.starts_with("HTTP/1.1 408 Request Timeout")); assert!(
data.starts_with("HTTP/1.1 408 Request Timeout"),
"response was not 408: {}",
data
);
let diff = start.elapsed();
if diff < Duration::from_secs(1) {
// test success
} else if diff < Duration::from_secs(3) {
panic!("request seems to have wrongly timed-out according to keep-alive");
} else {
panic!("request took way too long to time out");
}
srv.stop().await; srv.stop().await;
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_http1_malformed_request() { async fn http1_malformed_request() {
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.h1(|_| ok::<_, Infallible>(Response::ok())) .h1(|_| ok::<_, Infallible>(Response::ok()))
@ -234,7 +251,7 @@ async fn test_http1_malformed_request() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_http1_keepalive() { async fn http1_keepalive() {
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.h1(|_| ok::<_, Infallible>(Response::ok())) .h1(|_| ok::<_, Infallible>(Response::ok()))
@ -257,23 +274,25 @@ async fn test_http1_keepalive() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_http1_keepalive_timeout() { async fn http1_keepalive_timeout() {
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.keep_alive(1) .keep_alive(Duration::from_secs(1))
.h1(|_| ok::<_, Infallible>(Response::ok())) .h1(|_| ok::<_, Infallible>(Response::ok()))
.tcp() .tcp()
}) })
.await; .await;
let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\n\r\n");
let mut data = vec![0; 1024]; let _ = stream.write_all(b"GET /test HTTP/1.1\r\n\r\n");
let mut data = vec![0; 256];
let _ = stream.read(&mut data); let _ = stream.read(&mut data);
assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n"); assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n");
thread::sleep(Duration::from_millis(1100)); thread::sleep(Duration::from_millis(1100));
let mut data = vec![0; 1024]; let mut data = vec![0; 256];
let res = stream.read(&mut data).unwrap(); let res = stream.read(&mut data).unwrap();
assert_eq!(res, 0); assert_eq!(res, 0);
@ -281,7 +300,7 @@ async fn test_http1_keepalive_timeout() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_http1_keepalive_close() { async fn http1_keepalive_close() {
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.h1(|_| ok::<_, Infallible>(Response::ok())) .h1(|_| ok::<_, Infallible>(Response::ok()))
@ -303,7 +322,7 @@ async fn test_http1_keepalive_close() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_http10_keepalive_default_close() { async fn http10_keepalive_default_close() {
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.h1(|_| ok::<_, Infallible>(Response::ok())) .h1(|_| ok::<_, Infallible>(Response::ok()))
@ -325,7 +344,7 @@ async fn test_http10_keepalive_default_close() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_http10_keepalive() { async fn http10_keepalive() {
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.h1(|_| ok::<_, Infallible>(Response::ok())) .h1(|_| ok::<_, Infallible>(Response::ok()))
@ -354,7 +373,7 @@ async fn test_http10_keepalive() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_http1_keepalive_disabled() { async fn http1_keepalive_disabled() {
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.keep_alive(KeepAlive::Disabled) .keep_alive(KeepAlive::Disabled)
@ -377,7 +396,7 @@ async fn test_http1_keepalive_disabled() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_content_length() { async fn content_length() {
use actix_http::{ use actix_http::{
header::{HeaderName, HeaderValue}, header::{HeaderName, HeaderValue},
StatusCode, StatusCode,
@ -426,7 +445,7 @@ async fn test_content_length() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_h1_headers() { async fn h1_headers() {
let data = STR.repeat(10); let data = STR.repeat(10);
let data2 = data.clone(); let data2 = data.clone();
@ -492,7 +511,7 @@ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \
Hello World Hello World Hello World Hello World Hello World"; Hello World Hello World Hello World Hello World Hello World";
#[actix_rt::test] #[actix_rt::test]
async fn test_h1_body() { async fn h1_body() {
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.h1(|_| ok::<_, Infallible>(Response::ok().set_body(STR))) .h1(|_| ok::<_, Infallible>(Response::ok().set_body(STR)))
@ -511,7 +530,7 @@ async fn test_h1_body() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_h1_head_empty() { async fn h1_head_empty() {
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.h1(|_| ok::<_, Infallible>(Response::ok().set_body(STR))) .h1(|_| ok::<_, Infallible>(Response::ok().set_body(STR)))
@ -538,7 +557,7 @@ async fn test_h1_head_empty() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_h1_head_binary() { async fn h1_head_binary() {
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.h1(|_| ok::<_, Infallible>(Response::ok().set_body(STR))) .h1(|_| ok::<_, Infallible>(Response::ok().set_body(STR)))
@ -565,7 +584,7 @@ async fn test_h1_head_binary() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_h1_head_binary2() { async fn h1_head_binary2() {
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.h1(|_| ok::<_, Infallible>(Response::ok().set_body(STR))) .h1(|_| ok::<_, Infallible>(Response::ok().set_body(STR)))
@ -588,7 +607,7 @@ async fn test_h1_head_binary2() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_h1_body_length() { async fn h1_body_length() {
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.h1(|_| { .h1(|_| {
@ -612,7 +631,7 @@ async fn test_h1_body_length() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_h1_body_chunked_explicit() { async fn h1_body_chunked_explicit() {
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.h1(|_| { .h1(|_| {
@ -649,7 +668,7 @@ async fn test_h1_body_chunked_explicit() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_h1_body_chunked_implicit() { async fn h1_body_chunked_implicit() {
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.h1(|_| { .h1(|_| {
@ -680,7 +699,7 @@ async fn test_h1_body_chunked_implicit() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_h1_response_http_error_handling() { async fn h1_response_http_error_handling() {
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.h1(fn_service(|_| { .h1(fn_service(|_| {
@ -719,7 +738,7 @@ impl From<BadRequest> for Response<BoxBody> {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_h1_service_error() { async fn h1_service_error() {
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.h1(|_| err::<Response<()>, _>(BadRequest)) .h1(|_| err::<Response<()>, _>(BadRequest))
@ -738,7 +757,7 @@ async fn test_h1_service_error() {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_h1_on_connect() { async fn h1_on_connect() {
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.on_connect_ext(|_, data| { .on_connect_ext(|_, data| {
@ -761,7 +780,7 @@ async fn test_h1_on_connect() {
/// Tests compliance with 304 Not Modified spec in RFC 7232 §4.1. /// Tests compliance with 304 Not Modified spec in RFC 7232 §4.1.
/// https://datatracker.ietf.org/doc/html/rfc7232#section-4.1 /// https://datatracker.ietf.org/doc/html/rfc7232#section-4.1
#[actix_rt::test] #[actix_rt::test]
async fn test_not_modified_spec_h1() { async fn not_modified_spec_h1() {
// TODO: this test needing a few seconds to complete reveals some weirdness with either the // TODO: this test needing a few seconds to complete reveals some weirdness with either the
// dispatcher or the client, though similar hangs occur on other tests in this file, only // dispatcher or the client, though similar hangs occur on other tests in this file, only
// succeeding, it seems, because of the keepalive timer // succeeding, it seems, because of the keepalive timer

View File

@ -109,7 +109,7 @@ async fn service(msg: Frame) -> Result<Message, Error> {
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_simple() { async fn simple() {
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.upgrade(fn_factory(|| async { .upgrade(fn_factory(|| async {

View File

@ -6,8 +6,10 @@
- Change signature of `ResourceDef::capture_match_info_fn` to remove `user_data` parameter. [#2612] - Change signature of `ResourceDef::capture_match_info_fn` to remove `user_data` parameter. [#2612]
- Replace `Option<U>` with `U` in `Router` api. [#2612] - Replace `Option<U>` with `U` in `Router` api. [#2612]
- Relax bounds in `Router::recognize*` and `ResourceDef::capture_match_info`. [#2612] - Relax bounds in `Router::recognize*` and `ResourceDef::capture_match_info`. [#2612]
- `Quoter::requote` now returns `Option<Vec<u8>>`. [#2613]
[#2612]: https://github.com/actix/actix-web/pull/2612 [#2612]: https://github.com/actix/actix-web/pull/2612
[#2613]: https://github.com/actix/actix-web/pull/2613
## 0.5.0-rc.2 - 2022-01-21 ## 0.5.0-rc.2 - 2022-01-21

View File

@ -52,7 +52,7 @@ macro_rules! parse_value {
V: Visitor<'de>, V: Visitor<'de>,
{ {
let decoded = FULL_QUOTER let decoded = FULL_QUOTER
.with(|q| q.requote(self.value.as_bytes())) .with(|q| q.requote_str_lossy(self.value))
.map(Cow::Owned) .map(Cow::Owned)
.unwrap_or(Cow::Borrowed(self.value)); .unwrap_or(Cow::Borrowed(self.value));
@ -332,7 +332,7 @@ impl<'de> Deserializer<'de> for Value<'de> {
where where
V: Visitor<'de>, V: Visitor<'de>,
{ {
match FULL_QUOTER.with(|q| q.requote(self.value.as_bytes())) { match FULL_QUOTER.with(|q| q.requote_str_lossy(self.value)) {
Some(s) => visitor.visit_string(s), Some(s) => visitor.visit_string(s),
None => visitor.visit_borrowed_str(self.value), None => visitor.visit_borrowed_str(self.value),
} }
@ -342,7 +342,7 @@ impl<'de> Deserializer<'de> for Value<'de> {
where where
V: Visitor<'de>, V: Visitor<'de>,
{ {
match FULL_QUOTER.with(|q| q.requote(self.value.as_bytes())) { match FULL_QUOTER.with(|q| q.requote_str_lossy(self.value)) {
Some(s) => visitor.visit_byte_buf(s.into()), Some(s) => visitor.visit_byte_buf(s.into()),
None => visitor.visit_borrowed_bytes(self.value.as_bytes()), None => visitor.visit_borrowed_bytes(self.value.as_bytes()),
} }

View File

@ -66,8 +66,13 @@ impl Quoter {
/// Re-quotes... ? /// Re-quotes... ?
/// ///
/// Returns `None` when no modification to the original string was required. /// Returns `None` when no modification to the original byte string was required.
pub fn requote(&self, val: &[u8]) -> Option<String> { ///
/// Non-ASCII bytes are accepted as valid input.
///
/// Behavior for invalid/incomplete percent-encoding sequences is unspecified and may include removing
/// the invalid sequence from the output or passing it as it is.
pub fn requote(&self, val: &[u8]) -> Option<Vec<u8>> {
let mut has_pct = 0; let mut has_pct = 0;
let mut pct = [b'%', 0, 0]; let mut pct = [b'%', 0, 0];
let mut idx = 0; let mut idx = 0;
@ -121,7 +126,12 @@ impl Quoter {
idx += 1; idx += 1;
} }
cloned.map(|data| String::from_utf8_lossy(&data).into_owned()) cloned
}
pub(crate) fn requote_str_lossy(&self, val: &str) -> Option<String> {
self.requote(val.as_bytes())
.map(|data| String::from_utf8_lossy(&data).into_owned())
} }
} }
@ -201,14 +211,29 @@ mod tests {
#[test] #[test]
fn custom_quoter() { fn custom_quoter() {
let q = Quoter::new(b"", b"+"); let q = Quoter::new(b"", b"+");
assert_eq!(q.requote(b"/a%25c").unwrap(), "/a%c"); assert_eq!(q.requote(b"/a%25c").unwrap(), b"/a%c");
assert_eq!(q.requote(b"/a%2Bc").unwrap(), "/a%2Bc"); assert_eq!(q.requote(b"/a%2Bc").unwrap(), b"/a%2Bc");
let q = Quoter::new(b"%+", b"/"); let q = Quoter::new(b"%+", b"/");
assert_eq!(q.requote(b"/a%25b%2Bc").unwrap(), "/a%b+c"); assert_eq!(q.requote(b"/a%25b%2Bc").unwrap(), b"/a%b+c");
assert_eq!(q.requote(b"/a%2fb").unwrap(), "/a%2fb"); assert_eq!(q.requote(b"/a%2fb").unwrap(), b"/a%2fb");
assert_eq!(q.requote(b"/a%2Fb").unwrap(), "/a%2Fb"); assert_eq!(q.requote(b"/a%2Fb").unwrap(), b"/a%2Fb");
assert_eq!(q.requote(b"/a%0Ab").unwrap(), "/a\nb"); assert_eq!(q.requote(b"/a%0Ab").unwrap(), b"/a\nb");
assert_eq!(q.requote(b"/a%FE\xffb").unwrap(), b"/a\xfe\xffb");
assert_eq!(q.requote(b"/a\xfe\xffb"), None);
}
#[test]
fn non_ascii() {
let q = Quoter::new(b"%+", b"/");
assert_eq!(q.requote(b"/a%FE\xffb").unwrap(), b"/a\xfe\xffb");
assert_eq!(q.requote(b"/a\xfe\xffb"), None);
}
#[test]
fn invalid_sequences() {
let q = Quoter::new(b"%+", b"/");
assert_eq!(q.requote(b"/a%2x%2X%%").unwrap(), b"/a%2x%2X");
} }
#[test] #[test]

View File

@ -15,14 +15,14 @@ pub struct Url {
impl Url { impl Url {
#[inline] #[inline]
pub fn new(uri: http::Uri) -> Url { pub fn new(uri: http::Uri) -> Url {
let path = DEFAULT_QUOTER.with(|q| q.requote(uri.path().as_bytes())); let path = DEFAULT_QUOTER.with(|q| q.requote_str_lossy(uri.path()));
Url { uri, path } Url { uri, path }
} }
#[inline] #[inline]
pub fn new_with_quoter(uri: http::Uri, quoter: &Quoter) -> Url { pub fn new_with_quoter(uri: http::Uri, quoter: &Quoter) -> Url {
Url { Url {
path: quoter.requote(uri.path().as_bytes()), path: quoter.requote_str_lossy(uri.path()),
uri, uri,
} }
} }
@ -45,13 +45,13 @@ impl Url {
#[inline] #[inline]
pub fn update(&mut self, uri: &http::Uri) { pub fn update(&mut self, uri: &http::Uri) {
self.uri = uri.clone(); self.uri = uri.clone();
self.path = DEFAULT_QUOTER.with(|q| q.requote(uri.path().as_bytes())); self.path = DEFAULT_QUOTER.with(|q| q.requote_str_lossy(uri.path()));
} }
#[inline] #[inline]
pub fn update_with_quoter(&mut self, uri: &http::Uri, quoter: &Quoter) { pub fn update_with_quoter(&mut self, uri: &http::Uri, quoter: &Quoter) {
self.uri = uri.clone(); self.uri = uri.clone();
self.path = quoter.requote(uri.path().as_bytes()); self.path = quoter.requote_str_lossy(uri.path());
} }
} }

View File

@ -1,6 +1,9 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
- Rename `TestServerConfig::{client_timeout => client_request_timeout}`. [#2611]
[#2611]: https://github.com/actix/actix-web/pull/2611
## 0.1.0-beta.11 - 2022-01-04 ## 0.1.0-beta.11 - 2022-01-04

View File

@ -149,7 +149,7 @@ where
let local_addr = tcp.local_addr().unwrap(); let local_addr = tcp.local_addr().unwrap();
let factory = factory.clone(); let factory = factory.clone();
let srv_cfg = cfg.clone(); let srv_cfg = cfg.clone();
let timeout = cfg.client_timeout; let timeout = cfg.client_request_timeout;
let builder = Server::build().workers(1).disable_signals().system_exit(); let builder = Server::build().workers(1).disable_signals().system_exit();
@ -167,7 +167,7 @@ where
.map_err(|err| err.into().error_response()); .map_err(|err| err.into().error_response());
HttpService::build() HttpService::build()
.client_timeout(timeout) .client_request_timeout(timeout)
.h1(map_config(fac, move |_| app_cfg.clone())) .h1(map_config(fac, move |_| app_cfg.clone()))
.tcp() .tcp()
}), }),
@ -183,7 +183,7 @@ where
.map_err(|err| err.into().error_response()); .map_err(|err| err.into().error_response());
HttpService::build() HttpService::build()
.client_timeout(timeout) .client_request_timeout(timeout)
.h2(map_config(fac, move |_| app_cfg.clone())) .h2(map_config(fac, move |_| app_cfg.clone()))
.tcp() .tcp()
}), }),
@ -199,7 +199,7 @@ where
.map_err(|err| err.into().error_response()); .map_err(|err| err.into().error_response());
HttpService::build() HttpService::build()
.client_timeout(timeout) .client_request_timeout(timeout)
.finish(map_config(fac, move |_| app_cfg.clone())) .finish(map_config(fac, move |_| app_cfg.clone()))
.tcp() .tcp()
}), }),
@ -218,7 +218,7 @@ where
.map_err(|err| err.into().error_response()); .map_err(|err| err.into().error_response());
HttpService::build() HttpService::build()
.client_timeout(timeout) .client_request_timeout(timeout)
.h1(map_config(fac, move |_| app_cfg.clone())) .h1(map_config(fac, move |_| app_cfg.clone()))
.openssl(acceptor.clone()) .openssl(acceptor.clone())
}), }),
@ -234,7 +234,7 @@ where
.map_err(|err| err.into().error_response()); .map_err(|err| err.into().error_response());
HttpService::build() HttpService::build()
.client_timeout(timeout) .client_request_timeout(timeout)
.h2(map_config(fac, move |_| app_cfg.clone())) .h2(map_config(fac, move |_| app_cfg.clone()))
.openssl(acceptor.clone()) .openssl(acceptor.clone())
}), }),
@ -250,7 +250,7 @@ where
.map_err(|err| err.into().error_response()); .map_err(|err| err.into().error_response());
HttpService::build() HttpService::build()
.client_timeout(timeout) .client_request_timeout(timeout)
.finish(map_config(fac, move |_| app_cfg.clone())) .finish(map_config(fac, move |_| app_cfg.clone()))
.openssl(acceptor.clone()) .openssl(acceptor.clone())
}), }),
@ -269,7 +269,7 @@ where
.map_err(|err| err.into().error_response()); .map_err(|err| err.into().error_response());
HttpService::build() HttpService::build()
.client_timeout(timeout) .client_request_timeout(timeout)
.h1(map_config(fac, move |_| app_cfg.clone())) .h1(map_config(fac, move |_| app_cfg.clone()))
.rustls(config.clone()) .rustls(config.clone())
}), }),
@ -285,7 +285,7 @@ where
.map_err(|err| err.into().error_response()); .map_err(|err| err.into().error_response());
HttpService::build() HttpService::build()
.client_timeout(timeout) .client_request_timeout(timeout)
.h2(map_config(fac, move |_| app_cfg.clone())) .h2(map_config(fac, move |_| app_cfg.clone()))
.rustls(config.clone()) .rustls(config.clone())
}), }),
@ -301,7 +301,7 @@ where
.map_err(|err| err.into().error_response()); .map_err(|err| err.into().error_response());
HttpService::build() HttpService::build()
.client_timeout(timeout) .client_request_timeout(timeout)
.finish(map_config(fac, move |_| app_cfg.clone())) .finish(map_config(fac, move |_| app_cfg.clone()))
.rustls(config.clone()) .rustls(config.clone())
}), }),
@ -388,7 +388,7 @@ pub fn config() -> TestServerConfig {
pub struct TestServerConfig { pub struct TestServerConfig {
tp: HttpVer, tp: HttpVer,
stream: StreamType, stream: StreamType,
client_timeout: u64, client_request_timeout: Duration,
} }
impl Default for TestServerConfig { impl Default for TestServerConfig {
@ -403,7 +403,7 @@ impl TestServerConfig {
TestServerConfig { TestServerConfig {
tp: HttpVer::Both, tp: HttpVer::Both,
stream: StreamType::Tcp, stream: StreamType::Tcp,
client_timeout: 5000, client_request_timeout: Duration::from_secs(5),
} }
} }
@ -433,9 +433,9 @@ impl TestServerConfig {
self self
} }
/// Set client timeout in milliseconds for first request. /// Set client timeout for first request.
pub fn client_timeout(mut self, val: u64) -> Self { pub fn client_request_timeout(mut self, dur: Duration) -> Self {
self.client_timeout = val; self.client_request_timeout = dur;
self self
} }
} }

View File

@ -60,7 +60,7 @@ dangerous-h2c = []
[dependencies] [dependencies]
actix-codec = "0.4.1" actix-codec = "0.4.1"
actix-service = "2.0.0" actix-service = "2.0.0"
actix-http = "3.0.0-beta.19" actix-http = { version = "3.0.0-beta.19", features = ["http2", "ws"] }
actix-rt = { version = "2.1", default-features = false } actix-rt = { version = "2.1", default-features = false }
actix-tls = { version = "3.0.0", features = ["connect", "uri"] } actix-tls = { version = "3.0.0", features = ["connect", "uri"] }
actix-utils = "3.0.0" actix-utils = "3.0.0"

View File

@ -70,7 +70,7 @@ where
let is_expect = if head.as_ref().headers.contains_key(EXPECT) { let is_expect = if head.as_ref().headers.contains_key(EXPECT) {
match body.size() { match body.size() {
BodySize::None | BodySize::Sized(0) => { BodySize::None | BodySize::Sized(0) => {
let keep_alive = framed.codec_ref().keepalive(); let keep_alive = framed.codec_ref().keep_alive();
framed.io_mut().on_release(keep_alive); framed.io_mut().on_release(keep_alive);
// TODO: use a new variant or a new type better describing error violate // TODO: use a new variant or a new type better describing error violate
@ -119,7 +119,7 @@ where
match pin_framed.codec_ref().message_type() { match pin_framed.codec_ref().message_type() {
h1::MessageType::None => { h1::MessageType::None => {
let keep_alive = pin_framed.codec_ref().keepalive(); let keep_alive = pin_framed.codec_ref().keep_alive();
pin_framed.io_mut().on_release(keep_alive); pin_framed.io_mut().on_release(keep_alive);
Ok((head, Payload::None)) Ok((head, Payload::None))
@ -223,7 +223,7 @@ impl<Io: ConnectionIo> Stream for PlStream<Io> {
match ready!(this.framed.as_mut().next_item(cx)?) { match ready!(this.framed.as_mut().next_item(cx)?) {
Some(Some(chunk)) => Poll::Ready(Some(Ok(chunk))), Some(Some(chunk)) => Poll::Ready(Some(Ok(chunk))),
Some(None) => { Some(None) => {
let keep_alive = this.framed.codec_ref().keepalive(); let keep_alive = this.framed.codec_ref().keep_alive();
this.framed.io_mut().on_release(keep_alive); this.framed.io_mut().on_release(keep_alive);
Poll::Ready(None) Poll::Ready(None)
} }

View File

@ -236,10 +236,14 @@ where
self self
} }
/// Default service to be used if no matching resource could be found. /// Default service that is invoked when no matching resource could be found.
/// ///
/// It is possible to use services like `Resource`, `Route`. /// You can use a [`Route`] as default service.
/// ///
/// If a default service is not registered, an empty `404 Not Found` response will be sent to
/// the client instead.
///
/// # Examples
/// ``` /// ```
/// use actix_web::{web, App, HttpResponse}; /// use actix_web::{web, App, HttpResponse};
/// ///
@ -248,23 +252,8 @@ where
/// } /// }
/// ///
/// let app = App::new() /// let app = App::new()
/// .service( /// .service(web::resource("/index.html").route(web::get().to(index)))
/// web::resource("/index.html").route(web::get().to(index))) /// .default_service(web::to(|| HttpResponse::NotFound()));
/// .default_service(
/// web::route().to(|| HttpResponse::NotFound()));
/// ```
///
/// It is also possible to use static files as default service.
///
/// ```
/// use actix_web::{web, App, HttpResponse};
///
/// let app = App::new()
/// .service(
/// web::resource("/index.html").to(|| HttpResponse::Ok()))
/// .default_service(
/// web::to(|| HttpResponse::NotFound())
/// );
/// ``` /// ```
pub fn default_service<F, U>(mut self, svc: F) -> Self pub fn default_service<F, U>(mut self, svc: F) -> Self
where where

View File

@ -72,7 +72,7 @@ where
}))) })))
}); });
// App config // create App config to pass to child services
let mut config = AppService::new(config, default.clone()); let mut config = AppService::new(config, default.clone());
// register services // register services

View File

@ -313,8 +313,12 @@ where
} }
/// Default service to be used if no matching route could be found. /// Default service to be used if no matching route could be found.
/// By default *405* response get returned. Resource does not use ///
/// default handler from `App` or `Scope`. /// You can use a [`Route`] as default service.
///
/// If a default service is not registered, an empty `405 Method Not Allowed` response will be
/// sent to the client instead. Unlike [`Scope`](crate::Scope)s, a [`Resource`] does **not**
/// inherit its parent's default service.
pub fn default_service<F, U>(mut self, f: F) -> Self pub fn default_service<F, U>(mut self, f: F) -> Self
where where
F: IntoServiceFactory<U, ServiceRequest>, F: IntoServiceFactory<U, ServiceRequest>,

View File

@ -262,9 +262,10 @@ where
) )
} }
/// Default service to be used if no matching route could be found. /// Default service to be used if no matching resource could be found.
/// ///
/// If default resource is not registered, app's default resource is being used. /// If a default service is not registered, it will fall back to the default service of
/// the parent [`App`](crate::App) (see [`App::default_service`](crate::App::default_service)).
pub fn default_service<F, U>(mut self, f: F) -> Self pub fn default_service<F, U>(mut self, f: F) -> Self
where where
F: IntoServiceFactory<U, ServiceRequest>, F: IntoServiceFactory<U, ServiceRequest>,

View File

@ -4,6 +4,7 @@ use std::{
marker::PhantomData, marker::PhantomData,
net, net,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
time::Duration,
}; };
use actix_http::{body::MessageBody, Extensions, HttpService, KeepAlive, Request, Response}; use actix_http::{body::MessageBody, Extensions, HttpService, KeepAlive, Request, Response};
@ -27,8 +28,8 @@ struct Socket {
struct Config { struct Config {
host: Option<String>, host: Option<String>,
keep_alive: KeepAlive, keep_alive: KeepAlive,
client_timeout: u64, client_request_timeout: Duration,
client_shutdown: u64, client_disconnect_timeout: Duration,
} }
/// An HTTP Server. /// An HTTP Server.
@ -88,9 +89,9 @@ where
factory, factory,
config: Arc::new(Mutex::new(Config { config: Arc::new(Mutex::new(Config {
host: None, host: None,
keep_alive: KeepAlive::Timeout(5), keep_alive: KeepAlive::default(),
client_timeout: 5000, client_request_timeout: Duration::from_secs(5),
client_shutdown: 5000, client_disconnect_timeout: Duration::from_secs(1),
})), })),
backlog: 1024, backlog: 1024,
sockets: Vec::new(), sockets: Vec::new(),
@ -200,11 +201,17 @@ where
/// To disable timeout set value to 0. /// To disable timeout set value to 0.
/// ///
/// By default client timeout is set to 5000 milliseconds. /// By default client timeout is set to 5000 milliseconds.
pub fn client_timeout(self, val: u64) -> Self { pub fn client_request_timeout(self, dur: Duration) -> Self {
self.config.lock().unwrap().client_timeout = val; self.config.lock().unwrap().client_request_timeout = dur;
self self
} }
#[doc(hidden)]
#[deprecated(since = "4.0.0", note = "Renamed to `client_request_timeout`.")]
pub fn client_timeout(self, dur: Duration) -> Self {
self.client_request_timeout(dur)
}
/// Set server connection shutdown timeout in milliseconds. /// Set server connection shutdown timeout in milliseconds.
/// ///
/// Defines a timeout for shutdown connection. If a shutdown procedure does not complete /// Defines a timeout for shutdown connection. If a shutdown procedure does not complete
@ -213,11 +220,17 @@ where
/// To disable timeout set value to 0. /// To disable timeout set value to 0.
/// ///
/// By default client timeout is set to 5000 milliseconds. /// By default client timeout is set to 5000 milliseconds.
pub fn client_shutdown(self, val: u64) -> Self { pub fn client_disconnect_timeout(self, dur: Duration) -> Self {
self.config.lock().unwrap().client_shutdown = val; self.config.lock().unwrap().client_disconnect_timeout = dur;
self self
} }
#[doc(hidden)]
#[deprecated(since = "4.0.0", note = "Renamed to `client_request_timeout`.")]
pub fn client_shutdown(self, dur: u64) -> Self {
self.client_disconnect_timeout(Duration::from_millis(dur))
}
/// Set server host name. /// Set server host name.
/// ///
/// Host name is used by application router as a hostname for url generation. /// Host name is used by application router as a hostname for url generation.
@ -291,8 +304,8 @@ where
let mut svc = HttpService::build() let mut svc = HttpService::build()
.keep_alive(c.keep_alive) .keep_alive(c.keep_alive)
.client_timeout(c.client_timeout) .client_request_timeout(c.client_request_timeout)
.client_disconnect(c.client_shutdown) .client_disconnect_timeout(c.client_disconnect_timeout)
.local_addr(addr); .local_addr(addr);
if let Some(handler) = on_connect_fn.clone() { if let Some(handler) = on_connect_fn.clone() {
@ -349,8 +362,8 @@ where
let svc = HttpService::build() let svc = HttpService::build()
.keep_alive(c.keep_alive) .keep_alive(c.keep_alive)
.client_timeout(c.client_timeout) .client_request_timeout(c.client_request_timeout)
.client_disconnect(c.client_shutdown) .client_disconnect_timeout(c.client_disconnect_timeout)
.local_addr(addr); .local_addr(addr);
let svc = if let Some(handler) = on_connect_fn.clone() { let svc = if let Some(handler) = on_connect_fn.clone() {
@ -410,8 +423,8 @@ where
let svc = HttpService::build() let svc = HttpService::build()
.keep_alive(c.keep_alive) .keep_alive(c.keep_alive)
.client_timeout(c.client_timeout) .client_request_timeout(c.client_request_timeout)
.client_disconnect(c.client_shutdown); .client_disconnect_timeout(c.client_disconnect_timeout);
let svc = if let Some(handler) = on_connect_fn.clone() { let svc = if let Some(handler) = on_connect_fn.clone() {
svc.on_connect_ext(move |io: &_, ext: _| (handler)(io as &dyn Any, ext)) svc.on_connect_ext(move |io: &_, ext: _| (handler)(io as &dyn Any, ext))
@ -537,8 +550,8 @@ where
fn_service(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) }).and_then({ fn_service(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) }).and_then({
let mut svc = HttpService::build() let mut svc = HttpService::build()
.keep_alive(c.keep_alive) .keep_alive(c.keep_alive)
.client_timeout(c.client_timeout) .client_request_timeout(c.client_request_timeout)
.client_disconnect(c.client_shutdown); .client_disconnect_timeout(c.client_disconnect_timeout);
if let Some(handler) = on_connect_fn.clone() { if let Some(handler) = on_connect_fn.clone() {
svc = svc svc = svc
@ -593,8 +606,8 @@ where
fn_service(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) }).and_then( fn_service(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) }).and_then(
HttpService::build() HttpService::build()
.keep_alive(c.keep_alive) .keep_alive(c.keep_alive)
.client_timeout(c.client_timeout) .client_request_timeout(c.client_request_timeout)
.client_disconnect(c.client_shutdown) .client_disconnect_timeout(c.client_disconnect_timeout)
.finish(map_config(fac, move |_| config.clone())), .finish(map_config(fac, move |_| config.clone())),
) )
}, },

View File

@ -26,9 +26,9 @@ async fn test_start() {
.backlog(1) .backlog(1)
.max_connections(10) .max_connections(10)
.max_connection_rate(10) .max_connection_rate(10)
.keep_alive(10) .keep_alive(Duration::from_secs(10))
.client_timeout(5000) .client_request_timeout(Duration::from_secs(5))
.client_shutdown(0) .client_disconnect_timeout(Duration::ZERO)
.server_hostname("localhost") .server_hostname("localhost")
.system_exit() .system_exit()
.disable_signals() .disable_signals()

View File

@ -8,6 +8,7 @@ use std::{
io::{Read, Write}, io::{Read, Write},
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
time::Duration,
}; };
use actix_web::{ use actix_web::{
@ -835,9 +836,10 @@ async fn test_server_cookies() {
async fn test_slow_request() { async fn test_slow_request() {
use std::net; use std::net;
let srv = actix_test::start_with(actix_test::config().client_timeout(200), || { let srv = actix_test::start_with(
App::new().service(web::resource("/").route(web::to(HttpResponse::Ok))) actix_test::config().client_request_timeout(Duration::from_millis(200)),
}); || App::new().service(web::resource("/").route(web::to(HttpResponse::Ok))),
);
let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); let mut stream = net::TcpStream::connect(srv.addr()).unwrap();
let mut data = String::new(); let mut data = String::new();