diff --git a/.github/workflows/clippy-fmt.yml b/.github/workflows/clippy-fmt.yml index 9fcb0a561..bc2cec145 100644 --- a/.github/workflows/clippy-fmt.yml +++ b/.github/workflows/clippy-fmt.yml @@ -46,3 +46,21 @@ jobs: with: token: ${{ secrets.GITHUB_TOKEN }} args: --workspace --tests --examples --all-features + + lint-docs: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Install Rust + uses: actions-rs/toolchain@v1 + with: + toolchain: stable + profile: minimal + components: rust-docs + - name: Check for broken intra-doc links + uses: actions-rs/cargo@v1 + env: + RUSTDOCFLAGS: "-D warnings" + with: + command: doc + args: --no-deps --all-features --workspace diff --git a/CHANGES.md b/CHANGES.md index 8c3997663..c00bc7198 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,10 +1,15 @@ # Changes ## Unreleased - 2021-xx-xx +### Changed +- Rename `HttpServer::{client_timeout => client_request_timeout}`. [#2611] +- Rename `HttpServer::{client_shutdown => client_disconnect_timeout}`. [#2611] + ### Removed - `impl Future for HttpResponse`. [#2601] [#2601]: https://github.com/actix/actix-web/pull/2601 +[#2611]: https://github.com/actix/actix-web/pull/2611 ## 4.0.0-beta.21 - 2022-01-21 diff --git a/Cargo.toml b/Cargo.toml index 99ff85e8d..38c8512bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,7 +80,7 @@ actix-service = "2.0.0" actix-utils = "3.0.0" actix-tls = { version = "3.0.0", default-features = false, optional = true } -actix-http = "3.0.0-beta.19" +actix-http = { version = "3.0.0-beta.19", features = ["http2", "ws"] } actix-router = "0.5.0-rc.2" actix-web-codegen = "0.5.0-rc.1" diff --git a/actix-files/src/named.rs b/actix-files/src/named.rs index 14495e660..baf9b5531 100644 --- a/actix-files/src/named.rs +++ b/actix-files/src/named.rs @@ -209,6 +209,7 @@ impl NamedFile { Self::from_file(file, path) } + #[allow(rustdoc::broken_intra_doc_links)] /// Attempts to open a file asynchronously in read-only mode. /// /// When the `experimental-io-uring` crate feature is enabled, this will be async. @@ -298,9 +299,11 @@ impl NamedFile { self } - /// Set content encoding for serving this file + /// Sets content encoding for this file. /// - /// Must be used with [`actix_web::middleware::Compress`] to take effect. + /// This prevents the `Compress` middleware from modifying the file contents and signals to + /// browsers/clients how to decode it. For example, if serving a compressed HTML file (e.g., + /// `index.html.gz`) then use `.set_content_encoding(ContentEncoding::Gzip)`. #[inline] pub fn set_content_encoding(mut self, enc: ContentEncoding) -> Self { self.encoding = Some(enc); diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 6047a6bc5..38bec78ba 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -1,6 +1,35 @@ # Changes ## Unreleased - 2021-xx-xx +### Added +- Implement `Default` for `KeepAlive`. [#2611] +- Implement `From` for `KeepAlive`. [#2611] +- Implement `From>` for `KeepAlive`. [#2611] +- Implement `Default` for `HttpServiceBuilder`. [#2611] +- Crate `ws` feature flag, disabled by default. [#2618] +- Crate `http2` feature flag, disabled by default. [#2618] + +### Changed +- Rename `ServiceConfig::{client_timer_expire => client_request_deadline}`. [#2611] +- Rename `ServiceConfig::{client_disconnect_timer => client_disconnect_deadline}`. [#2611] +- Deadline methods in `ServiceConfig` now return `std::time::Instant`s instead of Tokio's wrapper type. [#2611] +- Rename `h1::Codec::{keepalive => keep_alive}`. [#2611] +- Rename `h1::Codec::{keepalive_enabled => keep_alive_enabled}`. [#2611] +- Rename `h1::ClientCodec::{keepalive => keep_alive}`. [#2611] +- Rename `h1::ClientPayloadCodec::{keepalive => keep_alive}`. [#2611] +- `ServiceConfig::keep_alive` now returns a `KeepAlive`. [#2611] + +### Fixed +- HTTP/1.1 dispatcher correctly uses client request timeout. [#2611] + +### Removed +- `ServiceConfig::{client_timer, keep_alive_timer}`. [#2611] +- `impl From for KeepAlive`; use `Duration`s instead. [#2611] +- `impl From> for KeepAlive`; use `Duration`s instead. [#2611] +- `HttpServiceBuilder::new`; use `default` instead. [#2611] + +[#2611]: https://github.com/actix/actix-web/pull/2611 +[#2618]: https://github.com/actix/actix-web/pull/2618 ## 3.0.0-beta.19 - 2022-01-21 diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index e93d1b7af..f68eda074 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -29,54 +29,69 @@ path = "src/lib.rs" [features] default = [] -# openssl +# HTTP/2 protocol support +http2 = ["h2"] + +# WebSocket protocol implementation +ws = [ + "local-channel", + "base64", + "rand", + "sha-1", +] + +# TLS via OpenSSL openssl = ["actix-tls/accept", "actix-tls/openssl"] -# rustls support +# TLS via Rustls rustls = ["actix-tls/accept", "actix-tls/rustls"] -# enable compression support -compress-brotli = ["brotli", "__compress"] -compress-gzip = ["flate2", "__compress"] -compress-zstd = ["zstd", "__compress"] +# Compression codecs +compress-brotli = ["__compress", "brotli"] +compress-gzip = ["__compress", "flate2"] +compress-zstd = ["__compress", "zstd"] # Internal (PRIVATE!) features used to aid testing and cheking feature status. -# Don't rely on these whatsoever. They may disappear at anytime. +# Don't rely on these whatsoever. They are semver-exempt and may disappear at anytime. __compress = [] [dependencies] -actix-service = "2.0.0" +actix-service = "2" actix-codec = "0.4.1" -actix-utils = "3.0.0" +actix-utils = "3" actix-rt = { version = "2.2", default-features = false } ahash = "0.7" -base64 = "0.13" bitflags = "1.2" bytes = "1" bytestring = "1" derive_more = "0.99.5" encoding_rs = "0.8" futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } -h2 = "0.3.9" http = "0.2.5" httparse = "1.5.1" httpdate = "1.0.1" itoa = "1" language-tags = "0.3" -local-channel = "0.1" log = "0.4" mime = "0.3" percent-encoding = "2.1" pin-project-lite = "0.2" -rand = "0.8" -sha-1 = "0.10" smallvec = "1.6.1" -# tls +# http2 +h2 = { version = "0.3.9", optional = true } + +# websockets +local-channel = { version = "0.1", optional = true } +base64 = { version = "0.13", optional = true } +rand = { version = "0.8", optional = true } +sha-1 = { version = "0.10", optional = true } + +# openssl/rustls actix-tls = { version = "3.0.0", default-features = false, optional = true } -# compression +# compress-* brotli = { version = "3.3.3", optional = true } flate2 = { version = "1.0.13", optional = true } zstd = { version = "0.9", optional = true } @@ -92,6 +107,7 @@ criterion = { version = "0.3", features = ["html_reports"] } env_logger = "0.9" futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] } memchr = "2.4" +once_cell = "1.9" rcgen = "0.8" regex = "1.3" rustls-pemfile = "0.2" diff --git a/actix-http/examples/bench.rs b/actix-http/examples/bench.rs new file mode 100644 index 000000000..e41c0bb4f --- /dev/null +++ b/actix-http/examples/bench.rs @@ -0,0 +1,27 @@ +use std::{convert::Infallible, io, time::Duration}; + +use actix_http::{HttpService, Request, Response, StatusCode}; +use actix_server::Server; +use once_cell::sync::Lazy; + +static STR: Lazy = Lazy::new(|| "HELLO WORLD ".repeat(20)); + +#[actix_rt::main] +async fn main() -> io::Result<()> { + env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); + + Server::build() + .bind("dispatcher-benchmark", ("127.0.0.1", 8080), || { + HttpService::build() + .client_request_timeout(Duration::from_secs(1)) + .finish(|_: Request| async move { + let mut res = Response::build(StatusCode::OK); + Ok::<_, Infallible>(res.body(&**STR)) + }) + .tcp() + })? + // limiting number of workers so that bench client is not sharing as many resources + .workers(4) + .run() + .await +} diff --git a/actix-http/examples/echo.rs b/actix-http/examples/echo.rs index f9188ed9f..58de64530 100644 --- a/actix-http/examples/echo.rs +++ b/actix-http/examples/echo.rs @@ -1,4 +1,4 @@ -use std::io; +use std::{io, time::Duration}; use actix_http::{Error, HttpService, Request, Response, StatusCode}; use actix_server::Server; @@ -13,8 +13,8 @@ async fn main() -> io::Result<()> { Server::build() .bind("echo", ("127.0.0.1", 8080), || { HttpService::build() - .client_timeout(1000) - .client_disconnect(1000) + .client_request_timeout(Duration::from_secs(1)) + .client_disconnect_timeout(Duration::from_secs(1)) // handles HTTP/1.1 and HTTP/2 .finish(|mut req: Request| async move { let mut body = BytesMut::new(); diff --git a/actix-http/examples/h2spec.rs b/actix-http/examples/h2spec.rs new file mode 100644 index 000000000..4ab426c6c --- /dev/null +++ b/actix-http/examples/h2spec.rs @@ -0,0 +1,25 @@ +use std::{convert::Infallible, io}; + +use actix_http::{HttpService, Request, Response, StatusCode}; +use actix_server::Server; +use once_cell::sync::Lazy; + +static STR: Lazy = Lazy::new(|| "HELLO WORLD ".repeat(100)); + +#[actix_rt::main] +async fn main() -> io::Result<()> { + env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); + + Server::build() + .bind("h2spec", ("127.0.0.1", 8080), || { + HttpService::build() + .h2(|_: Request| async move { + let mut res = Response::build(StatusCode::OK); + Ok::<_, Infallible>(res.body(&**STR)) + }) + .tcp() + })? + .workers(4) + .run() + .await +} diff --git a/actix-http/examples/hello-world.rs b/actix-http/examples/hello-world.rs index a29903cc4..1a83d4d9c 100644 --- a/actix-http/examples/hello-world.rs +++ b/actix-http/examples/hello-world.rs @@ -1,4 +1,4 @@ -use std::{convert::Infallible, io}; +use std::{convert::Infallible, io, time::Duration}; use actix_http::{ header::HeaderValue, HttpMessage, HttpService, Request, Response, StatusCode, @@ -12,8 +12,8 @@ async fn main() -> io::Result<()> { Server::build() .bind("hello-world", ("127.0.0.1", 8080), || { HttpService::build() - .client_timeout(1000) - .client_disconnect(1000) + .client_request_timeout(Duration::from_secs(1)) + .client_disconnect_timeout(Duration::from_secs(1)) .on_connect_ext(|_, ext| { ext.insert(42u32); }) diff --git a/actix-http/src/builder.rs b/actix-http/src/builder.rs index 408ee7924..526a23d53 100644 --- a/actix-http/src/builder.rs +++ b/actix-http/src/builder.rs @@ -1,25 +1,22 @@ -use std::{fmt, marker::PhantomData, net, rc::Rc}; +use std::{fmt, marker::PhantomData, net, rc::Rc, time::Duration}; use actix_codec::Framed; use actix_service::{IntoServiceFactory, Service, ServiceFactory}; use crate::{ body::{BoxBody, MessageBody}, - config::{KeepAlive, ServiceConfig}, h1::{self, ExpectHandler, H1Service, UpgradeHandler}, - h2::H2Service, service::HttpService, - ConnectCallback, Extensions, Request, Response, + ConnectCallback, Extensions, KeepAlive, Request, Response, ServiceConfig, }; -/// A HTTP service builder +/// An HTTP service builder. /// -/// This type can be used to construct an instance of [`HttpService`] through a -/// builder-like pattern. +/// This type can construct an instance of [`HttpService`] through a builder-like pattern. pub struct HttpServiceBuilder { keep_alive: KeepAlive, - client_timeout: u64, - client_disconnect: u64, + client_request_timeout: Duration, + client_disconnect_timeout: Duration, secure: bool, local_addr: Option, expect: X, @@ -28,22 +25,23 @@ pub struct HttpServiceBuilder { _phantom: PhantomData, } -impl HttpServiceBuilder +impl Default for HttpServiceBuilder where S: ServiceFactory, S::Error: Into> + 'static, S::InitError: fmt::Debug, >::Future: 'static, { - /// Create instance of `ServiceConfigBuilder` - #[allow(clippy::new_without_default)] - pub fn new() -> Self { + fn default() -> Self { HttpServiceBuilder { - keep_alive: KeepAlive::Timeout(5), - client_timeout: 5000, - client_disconnect: 0, + // ServiceConfig parts (make sure defaults match) + keep_alive: KeepAlive::default(), + client_request_timeout: Duration::from_secs(5), + client_disconnect_timeout: Duration::ZERO, secure: false, local_addr: None, + + // dispatcher parts expect: ExpectHandler, upgrade: None, on_connect_ext: None, @@ -65,9 +63,11 @@ where U::Error: fmt::Display, U::InitError: fmt::Debug, { - /// Set server keep-alive setting. + /// Set connection keep-alive setting. /// - /// By default keep alive is set to a 5 seconds. + /// Applies to HTTP/1.1 keep-alive and HTTP/2 ping-pong. + /// + /// By default keep-alive is 5 seconds. pub fn keep_alive>(mut self, val: W) -> Self { self.keep_alive = val.into(); self @@ -85,33 +85,45 @@ where self } - /// Set server client timeout in milliseconds for first request. + /// Set client request timeout (for first request). /// - /// Defines a timeout for reading client request header. If a client does not transmit - /// the entire set headers within this time, the request is terminated with - /// the 408 (Request Time-out) error. + /// Defines a timeout for reading client request header. If the client does not transmit the + /// request head within this duration, the connection is terminated with a `408 Request Timeout` + /// response error. /// - /// To disable timeout set value to 0. + /// A duration of zero disables the timeout. /// - /// By default client timeout is set to 5000 milliseconds. - pub fn client_timeout(mut self, val: u64) -> Self { - self.client_timeout = val; + /// By default, the client timeout is 5 seconds. + pub fn client_request_timeout(mut self, dur: Duration) -> Self { + self.client_request_timeout = dur; self } - /// Set server connection disconnect timeout in milliseconds. + #[doc(hidden)] + #[deprecated(since = "3.0.0", note = "Renamed to `client_request_timeout`.")] + pub fn client_timeout(self, dur: Duration) -> Self { + self.client_request_timeout(dur) + } + + /// Set client connection disconnect timeout. /// /// Defines a timeout for disconnect connection. If a disconnect procedure does not complete /// within this time, the request get dropped. This timeout affects secure connections. /// - /// To disable timeout set value to 0. + /// A duration of zero disables the timeout. /// - /// By default disconnect timeout is set to 0. - pub fn client_disconnect(mut self, val: u64) -> Self { - self.client_disconnect = val; + /// By default, the disconnect timeout is disabled. + pub fn client_disconnect_timeout(mut self, dur: Duration) -> Self { + self.client_disconnect_timeout = dur; self } + #[doc(hidden)] + #[deprecated(since = "3.0.0", note = "Renamed to `client_disconnect_timeout`.")] + pub fn client_disconnect(self, dur: Duration) -> Self { + self.client_disconnect_timeout(dur) + } + /// Provide service for `EXPECT: 100-Continue` support. /// /// Service get called with request that contains `EXPECT` header. @@ -126,8 +138,8 @@ where { HttpServiceBuilder { keep_alive: self.keep_alive, - client_timeout: self.client_timeout, - client_disconnect: self.client_disconnect, + client_request_timeout: self.client_request_timeout, + client_disconnect_timeout: self.client_disconnect_timeout, secure: self.secure, local_addr: self.local_addr, expect: expect.into_factory(), @@ -150,8 +162,8 @@ where { HttpServiceBuilder { keep_alive: self.keep_alive, - client_timeout: self.client_timeout, - client_disconnect: self.client_disconnect, + client_request_timeout: self.client_request_timeout, + client_disconnect_timeout: self.client_disconnect_timeout, secure: self.secure, local_addr: self.local_addr, expect: self.expect, @@ -185,8 +197,8 @@ where { let cfg = ServiceConfig::new( self.keep_alive, - self.client_timeout, - self.client_disconnect, + self.client_request_timeout, + self.client_disconnect_timeout, self.secure, self.local_addr, ); @@ -198,7 +210,8 @@ where } /// Finish service configuration and create a HTTP service for HTTP/2 protocol. - pub fn h2(self, service: F) -> H2Service + #[cfg(feature = "http2")] + pub fn h2(self, service: F) -> crate::h2::H2Service where F: IntoServiceFactory, S::Error: Into> + 'static, @@ -209,13 +222,14 @@ where { let cfg = ServiceConfig::new( self.keep_alive, - self.client_timeout, - self.client_disconnect, + self.client_request_timeout, + self.client_disconnect_timeout, self.secure, self.local_addr, ); - H2Service::with_config(cfg, service.into_factory()).on_connect_ext(self.on_connect_ext) + crate::h2::H2Service::with_config(cfg, service.into_factory()) + .on_connect_ext(self.on_connect_ext) } /// Finish service configuration and create `HttpService` instance. @@ -230,8 +244,8 @@ where { let cfg = ServiceConfig::new( self.keep_alive, - self.client_timeout, - self.client_disconnect, + self.client_request_timeout, + self.client_disconnect_timeout, self.secure, self.local_addr, ); diff --git a/actix-http/src/config.rs b/actix-http/src/config.rs index b6d5a7d51..8045910be 100644 --- a/actix-http/src/config.rs +++ b/actix-http/src/config.rs @@ -1,71 +1,36 @@ use std::{ - cell::Cell, - fmt::{self, Write}, net, rc::Rc, - time::{Duration, SystemTime}, + time::{Duration, Instant}, }; -use actix_rt::{ - task::JoinHandle, - time::{interval, sleep_until, Instant, Sleep}, -}; use bytes::BytesMut; -/// "Sun, 06 Nov 1994 08:49:37 GMT".len() -pub(crate) const DATE_VALUE_LENGTH: usize = 29; +use crate::{date::DateService, KeepAlive}; -#[derive(Debug, PartialEq, Clone, Copy)] -/// Server keep-alive setting -pub enum KeepAlive { - /// Keep alive in seconds - Timeout(usize), - - /// Rely on OS to shutdown tcp connection - Os, - - /// Disabled - Disabled, -} - -impl From for KeepAlive { - fn from(keepalive: usize) -> Self { - KeepAlive::Timeout(keepalive) - } -} - -impl From> for KeepAlive { - fn from(keepalive: Option) -> Self { - if let Some(keepalive) = keepalive { - KeepAlive::Timeout(keepalive) - } else { - KeepAlive::Disabled - } - } -} - -/// Http service configuration +/// HTTP service configuration. +#[derive(Debug, Clone)] pub struct ServiceConfig(Rc); +#[derive(Debug)] struct Inner { - keep_alive: Option, - client_timeout: u64, - client_disconnect: u64, - ka_enabled: bool, + keep_alive: KeepAlive, + client_request_timeout: Duration, + client_disconnect_timeout: Duration, secure: bool, local_addr: Option, date_service: DateService, } -impl Clone for ServiceConfig { - fn clone(&self) -> Self { - ServiceConfig(self.0.clone()) - } -} - impl Default for ServiceConfig { fn default() -> Self { - Self::new(KeepAlive::Timeout(5), 0, 0, false, None) + Self::new( + KeepAlive::default(), + Duration::from_secs(5), + Duration::ZERO, + false, + None, + ) } } @@ -73,34 +38,22 @@ impl ServiceConfig { /// Create instance of `ServiceConfig` pub fn new( keep_alive: KeepAlive, - client_timeout: u64, - client_disconnect: u64, + client_request_timeout: Duration, + client_disconnect_timeout: Duration, secure: bool, local_addr: Option, ) -> ServiceConfig { - let (keep_alive, ka_enabled) = match keep_alive { - KeepAlive::Timeout(val) => (val as u64, true), - KeepAlive::Os => (0, true), - KeepAlive::Disabled => (0, false), - }; - let keep_alive = if ka_enabled && keep_alive > 0 { - Some(Duration::from_secs(keep_alive)) - } else { - None - }; - ServiceConfig(Rc::new(Inner { - keep_alive, - ka_enabled, - client_timeout, - client_disconnect, + keep_alive: keep_alive.normalize(), + client_request_timeout, + client_disconnect_timeout, secure, local_addr, date_service: DateService::new(), })) } - /// Returns true if connection is secure (HTTPS) + /// Returns `true` if connection is secure (i.e., using TLS / HTTPS). #[inline] pub fn secure(&self) -> bool { self.0.secure @@ -114,239 +67,92 @@ impl ServiceConfig { self.0.local_addr } - /// Keep alive duration if configured. + /// Connection keep-alive setting. #[inline] - pub fn keep_alive(&self) -> Option { + pub fn keep_alive(&self) -> KeepAlive { self.0.keep_alive } - /// Return state of connection keep-alive functionality - #[inline] - pub fn keep_alive_enabled(&self) -> bool { - self.0.ka_enabled - } - - /// Client timeout for first request. - #[inline] - pub fn client_timer(&self) -> Option { - let delay_time = self.0.client_timeout; - if delay_time != 0 { - Some(sleep_until(self.now() + Duration::from_millis(delay_time))) - } else { - None + /// Creates a time object representing the deadline for this connection's keep-alive period, if + /// enabled. + /// + /// When [`KeepAlive::Os`] or [`KeepAlive::Disabled`] is set, this will return `None`. + pub fn keep_alive_deadline(&self) -> Option { + match self.keep_alive() { + KeepAlive::Timeout(dur) => Some(self.now() + dur), + KeepAlive::Os => None, + KeepAlive::Disabled => None, } } - /// Client timeout for first request. - pub fn client_timer_expire(&self) -> Option { - let delay = self.0.client_timeout; - if delay != 0 { - Some(self.now() + Duration::from_millis(delay)) - } else { - None - } + /// Creates a time object representing the deadline for the client to finish sending the head of + /// its first request. + /// + /// Returns `None` if this `ServiceConfig was` constructed with `client_request_timeout: 0`. + pub fn client_request_deadline(&self) -> Option { + let timeout = self.0.client_request_timeout; + (timeout != Duration::ZERO).then(|| self.now() + timeout) } - /// Client disconnect timer - pub fn client_disconnect_timer(&self) -> Option { - let delay = self.0.client_disconnect; - if delay != 0 { - Some(self.now() + Duration::from_millis(delay)) - } else { - None - } + /// Creates a time object representing the deadline for the client to disconnect. + pub fn client_disconnect_deadline(&self) -> Option { + let timeout = self.0.client_disconnect_timeout; + (timeout != Duration::ZERO).then(|| self.now() + timeout) } - /// Return keep-alive timer delay is configured. - #[inline] - pub fn keep_alive_timer(&self) -> Option { - self.keep_alive().map(|ka| sleep_until(self.now() + ka)) - } - - /// Keep-alive expire time - pub fn keep_alive_expire(&self) -> Option { - self.keep_alive().map(|ka| self.now() + ka) - } - - #[inline] pub(crate) fn now(&self) -> Instant { self.0.date_service.now() } - #[doc(hidden)] - pub fn set_date(&self, dst: &mut BytesMut, camel_case: bool) { + pub(crate) fn write_date_header(&self, dst: &mut BytesMut, camel_case: bool) { let mut buf: [u8; 39] = [0; 39]; buf[..6].copy_from_slice(if camel_case { b"Date: " } else { b"date: " }); self.0 .date_service - .set_date(|date| buf[6..35].copy_from_slice(&date.bytes)); + .with_date(|date| buf[6..35].copy_from_slice(&date.bytes)); buf[35..].copy_from_slice(b"\r\n\r\n"); dst.extend_from_slice(&buf); } - pub(crate) fn set_date_header(&self, dst: &mut BytesMut) { + #[allow(unused)] // used with `http2` feature flag + pub(crate) fn write_date_header_value(&self, dst: &mut BytesMut) { self.0 .date_service - .set_date(|date| dst.extend_from_slice(&date.bytes)); - } -} - -#[derive(Copy, Clone)] -struct Date { - bytes: [u8; DATE_VALUE_LENGTH], - pos: usize, -} - -impl Date { - fn new() -> Date { - let mut date = Date { - bytes: [0; DATE_VALUE_LENGTH], - pos: 0, - }; - date.update(); - date - } - - fn update(&mut self) { - self.pos = 0; - write!(self, "{}", httpdate::fmt_http_date(SystemTime::now())).unwrap(); - } -} - -impl fmt::Write for Date { - fn write_str(&mut self, s: &str) -> fmt::Result { - let len = s.len(); - self.bytes[self.pos..self.pos + len].copy_from_slice(s.as_bytes()); - self.pos += len; - Ok(()) - } -} - -/// Service for update Date and Instant periodically at 500 millis interval. -struct DateService { - current: Rc>, - handle: JoinHandle<()>, -} - -impl Drop for DateService { - fn drop(&mut self) { - // stop the timer update async task on drop. - self.handle.abort(); - } -} - -impl DateService { - fn new() -> Self { - // shared date and timer for DateService and update async task. - let current = Rc::new(Cell::new((Date::new(), Instant::now()))); - let current_clone = Rc::clone(¤t); - // spawn an async task sleep for 500 milli and update current date/timer in a loop. - // handle is used to stop the task on DateService drop. - let handle = actix_rt::spawn(async move { - #[cfg(test)] - let _notify = notify_on_drop::NotifyOnDrop::new(); - - let mut interval = interval(Duration::from_millis(500)); - loop { - let now = interval.tick().await; - let date = Date::new(); - current_clone.set((date, now)); - } - }); - - DateService { current, handle } - } - - fn now(&self) -> Instant { - self.current.get().1 - } - - fn set_date(&self, mut f: F) { - f(&self.current.get().0); - } -} - -// TODO: move to a util module for testing all spawn handle drop style tasks. -/// Test Module for checking the drop state of certain async tasks that are spawned -/// with `actix_rt::spawn` -/// -/// The target task must explicitly generate `NotifyOnDrop` when spawn the task -#[cfg(test)] -mod notify_on_drop { - use std::cell::RefCell; - - thread_local! { - static NOTIFY_DROPPED: RefCell> = RefCell::new(None); - } - - /// Check if the spawned task is dropped. - /// - /// # Panics - /// Panics when there was no `NotifyOnDrop` instance on current thread. - pub(crate) fn is_dropped() -> bool { - NOTIFY_DROPPED.with(|bool| { - bool.borrow() - .expect("No NotifyOnDrop existed on current thread") - }) - } - - pub(crate) struct NotifyOnDrop; - - impl NotifyOnDrop { - /// # Panic: - /// - /// When construct multiple instances on any given thread. - pub(crate) fn new() -> Self { - NOTIFY_DROPPED.with(|bool| { - let mut bool = bool.borrow_mut(); - if bool.is_some() { - panic!("NotifyOnDrop existed on current thread"); - } else { - *bool = Some(false); - } - }); - - NotifyOnDrop - } - } - - impl Drop for NotifyOnDrop { - fn drop(&mut self) { - NOTIFY_DROPPED.with(|bool| { - if let Some(b) = bool.borrow_mut().as_mut() { - *b = true; - } - }); - } + .with_date(|date| dst.extend_from_slice(&date.bytes)); } } #[cfg(test)] mod tests { use super::*; + use crate::{date::DATE_VALUE_LENGTH, notify_on_drop}; - use actix_rt::{task::yield_now, time::sleep}; + use actix_rt::{ + task::yield_now, + time::{sleep, sleep_until}, + }; use memchr::memmem; #[actix_rt::test] async fn test_date_service_update() { - let settings = ServiceConfig::new(KeepAlive::Os, 0, 0, false, None); + let settings = + ServiceConfig::new(KeepAlive::Os, Duration::ZERO, Duration::ZERO, false, None); yield_now().await; let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); - settings.set_date(&mut buf1, false); + settings.write_date_header(&mut buf1, false); let now1 = settings.now(); - sleep_until(Instant::now() + Duration::from_secs(2)).await; + sleep_until((Instant::now() + Duration::from_secs(2)).into()).await; yield_now().await; let now2 = settings.now(); let mut buf2 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); - settings.set_date(&mut buf2, false); + settings.write_date_header(&mut buf2, false); assert_ne!(now1, now2); @@ -402,10 +208,10 @@ mod tests { let settings = ServiceConfig::default(); let mut buf1 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); - settings.set_date(&mut buf1, false); + settings.write_date_header(&mut buf1, false); let mut buf2 = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); - settings.set_date(&mut buf2, false); + settings.write_date_header(&mut buf2, false); assert_eq!(buf1, buf2); } @@ -415,11 +221,11 @@ mod tests { let settings = ServiceConfig::default(); let mut buf = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); - settings.set_date(&mut buf, false); + settings.write_date_header(&mut buf, false); assert!(memmem::find(&buf, b"date:").is_some()); let mut buf = BytesMut::with_capacity(DATE_VALUE_LENGTH + 10); - settings.set_date(&mut buf, true); + settings.write_date_header(&mut buf, true); assert!(memmem::find(&buf, b"Date:").is_some()); } } diff --git a/actix-http/src/date.rs b/actix-http/src/date.rs new file mode 100644 index 000000000..1358bbd8c --- /dev/null +++ b/actix-http/src/date.rs @@ -0,0 +1,92 @@ +use std::{ + cell::Cell, + fmt::{self, Write}, + rc::Rc, + time::{Duration, Instant, SystemTime}, +}; + +use actix_rt::{task::JoinHandle, time::interval}; + +/// "Thu, 01 Jan 1970 00:00:00 GMT".len() +pub(crate) const DATE_VALUE_LENGTH: usize = 29; + +#[derive(Clone, Copy)] +pub(crate) struct Date { + pub(crate) bytes: [u8; DATE_VALUE_LENGTH], + pos: usize, +} + +impl Date { + fn new() -> Date { + let mut date = Date { + bytes: [0; DATE_VALUE_LENGTH], + pos: 0, + }; + date.update(); + date + } + + fn update(&mut self) { + self.pos = 0; + write!(self, "{}", httpdate::fmt_http_date(SystemTime::now())).unwrap(); + } +} + +impl fmt::Write for Date { + fn write_str(&mut self, s: &str) -> fmt::Result { + let len = s.len(); + self.bytes[self.pos..self.pos + len].copy_from_slice(s.as_bytes()); + self.pos += len; + Ok(()) + } +} + +/// Service for update Date and Instant periodically at 500 millis interval. +pub(crate) struct DateService { + current: Rc>, + handle: JoinHandle<()>, +} + +impl DateService { + pub(crate) fn new() -> Self { + // shared date and timer for DateService and update async task. + let current = Rc::new(Cell::new((Date::new(), Instant::now()))); + let current_clone = Rc::clone(¤t); + // spawn an async task sleep for 500 millis and update current date/timer in a loop. + // handle is used to stop the task on DateService drop. + let handle = actix_rt::spawn(async move { + #[cfg(test)] + let _notify = crate::notify_on_drop::NotifyOnDrop::new(); + + let mut interval = interval(Duration::from_millis(500)); + loop { + let now = interval.tick().await; + let date = Date::new(); + current_clone.set((date, now.into_std())); + } + }); + + DateService { current, handle } + } + + pub(crate) fn now(&self) -> Instant { + self.current.get().1 + } + + pub(crate) fn with_date(&self, mut f: F) { + f(&self.current.get().0); + } +} + +impl fmt::Debug for DateService { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("DateService").finish_non_exhaustive() + } +} + +impl Drop for DateService { + fn drop(&mut self) { + // stop the timer update async task on drop. + self.handle.abort(); + } +} diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index 116fe76ab..2f104ee8f 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -352,7 +352,7 @@ impl ContentEncoder { ContentEncoder::Brotli(ref mut encoder) => match encoder.write_all(data) { Ok(_) => Ok(()), Err(err) => { - trace!("Error decoding br encoding: {}", err); + log::trace!("Error decoding br encoding: {}", err); Err(err) } }, @@ -361,7 +361,7 @@ impl ContentEncoder { ContentEncoder::Gzip(ref mut encoder) => match encoder.write_all(data) { Ok(_) => Ok(()), Err(err) => { - trace!("Error decoding gzip encoding: {}", err); + log::trace!("Error decoding gzip encoding: {}", err); Err(err) } }, @@ -370,7 +370,7 @@ impl ContentEncoder { ContentEncoder::Deflate(ref mut encoder) => match encoder.write_all(data) { Ok(_) => Ok(()), Err(err) => { - trace!("Error decoding deflate encoding: {}", err); + log::trace!("Error decoding deflate encoding: {}", err); Err(err) } }, @@ -379,7 +379,7 @@ impl ContentEncoder { ContentEncoder::Zstd(ref mut encoder) => match encoder.write_all(data) { Ok(_) => Ok(()), Err(err) => { - trace!("Error decoding ztsd encoding: {}", err); + log::trace!("Error decoding ztsd encoding: {}", err); Err(err) } }, diff --git a/actix-http/src/error.rs b/actix-http/src/error.rs index df6d3813a..841322861 100644 --- a/actix-http/src/error.rs +++ b/actix-http/src/error.rs @@ -5,7 +5,7 @@ use std::{error::Error as StdError, fmt, io, str::Utf8Error, string::FromUtf8Err use derive_more::{Display, Error, From}; use http::{uri::InvalidUri, StatusCode}; -use crate::{body::BoxBody, ws, Response}; +use crate::{body::BoxBody, Response}; pub use http::Error as HttpError; @@ -61,6 +61,7 @@ impl Error { Self::new(Kind::Encoder) } + #[allow(unused)] // used with `ws` feature flag pub(crate) fn new_ws() -> Self { Self::new(Kind::Ws) } @@ -139,14 +140,16 @@ impl From for Error { } } -impl From for Error { - fn from(err: ws::HandshakeError) -> Self { +#[cfg(feature = "ws")] +impl From for Error { + fn from(err: crate::ws::HandshakeError) -> Self { Self::new_ws().with_cause(err) } } -impl From for Error { - fn from(err: ws::ProtocolError) -> Self { +#[cfg(feature = "ws")] +impl From for Error { + fn from(err: crate::ws::ProtocolError) -> Self { Self::new_ws().with_cause(err) } } @@ -277,8 +280,9 @@ pub enum PayloadError { UnknownLength, /// HTTP/2 payload error. + #[cfg(feature = "http2")] #[display(fmt = "{}", _0)] - Http2Payload(h2::Error), + Http2Payload(::h2::Error), /// Generic I/O error. #[display(fmt = "{}", _0)] @@ -293,14 +297,16 @@ impl std::error::Error for PayloadError { PayloadError::EncodingCorrupted => None, PayloadError::Overflow => None, PayloadError::UnknownLength => None, + #[cfg(feature = "http2")] PayloadError::Http2Payload(err) => Some(err as &dyn std::error::Error), PayloadError::Io(err) => Some(err as &dyn std::error::Error), } } } -impl From for PayloadError { - fn from(err: h2::Error) -> Self { +#[cfg(feature = "http2")] +impl From<::h2::Error> for PayloadError { + fn from(err: ::h2::Error) -> Self { PayloadError::Http2Payload(err) } } @@ -356,6 +362,7 @@ pub enum DispatchError { /// HTTP/2 error. #[display(fmt = "{}", _0)] + #[cfg(feature = "http2")] H2(h2::Error), /// The first request did not complete within the specified timeout. @@ -379,7 +386,10 @@ impl StdError for DispatchError { DispatchError::Body(err) => Some(&**err), DispatchError::Io(err) => Some(err), DispatchError::Parse(err) => Some(err), + + #[cfg(feature = "http2")] DispatchError::H2(err) => Some(err), + _ => None, } } diff --git a/actix-http/src/h1/client.rs b/actix-http/src/h1/client.rs index 9bd896ae0..4e0ae8f48 100644 --- a/actix-http/src/h1/client.rs +++ b/actix-http/src/h1/client.rs @@ -1,4 +1,4 @@ -use std::io; +use std::{fmt, io}; use actix_codec::{Decoder, Encoder}; use bitflags::bitflags; @@ -17,9 +17,9 @@ use crate::{ bitflags! { struct Flags: u8 { - const HEAD = 0b0000_0001; - const KEEPALIVE_ENABLED = 0b0000_1000; - const STREAM = 0b0001_0000; + const HEAD = 0b0000_0001; + const KEEP_ALIVE_ENABLED = 0b0000_1000; + const STREAM = 0b0001_0000; } } @@ -38,7 +38,7 @@ struct ClientCodecInner { decoder: decoder::MessageDecoder, payload: Option, version: Version, - ctype: ConnectionType, + conn_type: ConnectionType, // encoder part flags: Flags, @@ -51,23 +51,32 @@ impl Default for ClientCodec { } } +impl fmt::Debug for ClientCodec { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("h1::ClientCodec") + .field("flags", &self.inner.flags) + .finish_non_exhaustive() + } +} + impl ClientCodec { /// Create HTTP/1 codec. /// /// `keepalive_enabled` how response `connection` header get generated. pub fn new(config: ServiceConfig) -> Self { - let flags = if config.keep_alive_enabled() { - Flags::KEEPALIVE_ENABLED + let flags = if config.keep_alive().enabled() { + Flags::KEEP_ALIVE_ENABLED } else { Flags::empty() }; + ClientCodec { inner: ClientCodecInner { config, decoder: decoder::MessageDecoder::default(), payload: None, version: Version::HTTP_11, - ctype: ConnectionType::Close, + conn_type: ConnectionType::Close, flags, encoder: encoder::MessageEncoder::default(), @@ -77,12 +86,12 @@ impl ClientCodec { /// Check if request is upgrade pub fn upgrade(&self) -> bool { - self.inner.ctype == ConnectionType::Upgrade + self.inner.conn_type == ConnectionType::Upgrade } /// Check if last response is keep-alive - pub fn keepalive(&self) -> bool { - self.inner.ctype == ConnectionType::KeepAlive + pub fn keep_alive(&self) -> bool { + self.inner.conn_type == ConnectionType::KeepAlive } /// Check last request's message type @@ -104,8 +113,8 @@ impl ClientCodec { impl ClientPayloadCodec { /// Check if last response is keep-alive - pub fn keepalive(&self) -> bool { - self.inner.ctype == ConnectionType::KeepAlive + pub fn keep_alive(&self) -> bool { + self.inner.conn_type == ConnectionType::KeepAlive } /// Transform payload codec to a message codec @@ -122,12 +131,12 @@ impl Decoder for ClientCodec { debug_assert!(!self.inner.payload.is_some(), "Payload decoder is set"); if let Some((req, payload)) = self.inner.decoder.decode(src)? { - if let Some(ctype) = req.conn_type() { + if let Some(conn_type) = req.conn_type() { // do not use peer's keep-alive - self.inner.ctype = if ctype == ConnectionType::KeepAlive { - self.inner.ctype + self.inner.conn_type = if conn_type == ConnectionType::KeepAlive { + self.inner.conn_type } else { - ctype + conn_type }; } @@ -192,9 +201,9 @@ impl Encoder> for ClientCodec { .set(Flags::HEAD, head.as_ref().method == Method::HEAD); // connection status - inner.ctype = match head.as_ref().connection_type() { + inner.conn_type = match head.as_ref().connection_type() { ConnectionType::KeepAlive => { - if inner.flags.contains(Flags::KEEPALIVE_ENABLED) { + if inner.flags.contains(Flags::KEEP_ALIVE_ENABLED) { ConnectionType::KeepAlive } else { ConnectionType::Close @@ -211,7 +220,7 @@ impl Encoder> for ClientCodec { false, inner.version, length, - inner.ctype, + inner.conn_type, &inner.config, )?; } diff --git a/actix-http/src/h1/codec.rs b/actix-http/src/h1/codec.rs index 9a8907579..df74bcc42 100644 --- a/actix-http/src/h1/codec.rs +++ b/actix-http/src/h1/codec.rs @@ -15,9 +15,9 @@ use crate::{ bitflags! { struct Flags: u8 { - const HEAD = 0b0000_0001; - const KEEPALIVE_ENABLED = 0b0000_0010; - const STREAM = 0b0000_0100; + const HEAD = 0b0000_0001; + const KEEP_ALIVE_ENABLED = 0b0000_0010; + const STREAM = 0b0000_0100; } } @@ -42,7 +42,9 @@ impl Default for Codec { impl fmt::Debug for Codec { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "h1::Codec({:?})", self.flags) + f.debug_struct("h1::Codec") + .field("flags", &self.flags) + .finish_non_exhaustive() } } @@ -51,8 +53,8 @@ impl Codec { /// /// `keepalive_enabled` how response `connection` header get generated. pub fn new(config: ServiceConfig) -> Self { - let flags = if config.keep_alive_enabled() { - Flags::KEEPALIVE_ENABLED + let flags = if config.keep_alive().enabled() { + Flags::KEEP_ALIVE_ENABLED } else { Flags::empty() }; @@ -76,14 +78,14 @@ impl Codec { /// Check if last response is keep-alive. #[inline] - pub fn keepalive(&self) -> bool { + pub fn keep_alive(&self) -> bool { self.conn_type == ConnectionType::KeepAlive } /// Check if keep-alive enabled on server level. #[inline] - pub fn keepalive_enabled(&self) -> bool { - self.flags.contains(Flags::KEEPALIVE_ENABLED) + pub fn keep_alive_enabled(&self) -> bool { + self.flags.contains(Flags::KEEP_ALIVE_ENABLED) } /// Check last request's message type. @@ -124,7 +126,7 @@ impl Decoder for Codec { self.version = head.version; self.conn_type = head.connection_type(); if self.conn_type == ConnectionType::KeepAlive - && !self.flags.contains(Flags::KEEPALIVE_ENABLED) + && !self.flags.contains(Flags::KEEP_ALIVE_ENABLED) { self.conn_type = ConnectionType::Close } @@ -179,9 +181,11 @@ impl Encoder, BodySize)>> for Codec { &self.config, )?; } + Message::Chunk(Some(bytes)) => { self.encoder.encode_chunk(bytes.as_ref(), dst)?; } + Message::Chunk(None) => { self.encoder.encode_eof(dst)?; } diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 5b790469f..3f327171d 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -8,13 +8,12 @@ use std::{ task::{Context, Poll}, }; -use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed, FramedParts}; -use actix_rt::time::{sleep_until, Instant, Sleep}; +use actix_codec::{AsyncRead, AsyncWrite, Decoder as _, Encoder as _, Framed, FramedParts}; +use actix_rt::time::sleep_until; use actix_service::Service; use bitflags::bitflags; use bytes::{Buf, BytesMut}; use futures_core::ready; -use log::{error, trace}; use pin_project_lite::pin_project; use crate::{ @@ -29,6 +28,7 @@ use super::{ codec::Codec, decoder::MAX_BUFFER_SIZE, payload::{Payload, PayloadSender, PayloadStatus}, + timer::TimerState, Message, MessageType, }; @@ -38,11 +38,23 @@ const MAX_PIPELINED_MESSAGES: usize = 16; bitflags! { pub struct Flags: u8 { - const STARTED = 0b0000_0001; - const KEEPALIVE = 0b0000_0010; - const SHUTDOWN = 0b0000_0100; - const READ_DISCONNECT = 0b0000_1000; - const WRITE_DISCONNECT = 0b0001_0000; + /// Set when stream is read for first time. + const STARTED = 0b0000_0001; + + /// Set when full request-response cycle has occurred. + const FINISHED = 0b0000_0010; + + /// Set if connection is in keep-alive (inactive) state. + const KEEP_ALIVE = 0b0000_0100; + + /// Set if in shutdown procedure. + const SHUTDOWN = 0b0000_1000; + + /// Set if read-half is disconnected. + const READ_DISCONNECT = 0b0001_0000; + + /// Set if write-half is disconnected. + const WRITE_DISCONNECT = 0b0010_0000; } } @@ -135,6 +147,7 @@ pin_project! { pub(super) flags: Flags, peer_addr: Option, conn_data: Option>, + config: ServiceConfig, error: Option, #[pin] @@ -142,9 +155,9 @@ pin_project! { payload: Option, messages: VecDeque, - ka_expire: Instant, - #[pin] - ka_timer: Option, + head_timer: TimerState, + ka_timer: TimerState, + shutdown_timer: TimerState, pub(super) io: Option, read_buf: BytesMut, @@ -165,7 +178,6 @@ pin_project! { where S: Service, X: Service, - B: MessageBody, { None, @@ -179,16 +191,40 @@ pin_project! { impl State where S: Service, - X: Service, - B: MessageBody, { - fn is_empty(&self) -> bool { + fn is_none(&self) -> bool { matches!(self, State::None) } } +impl fmt::Debug for State +where + S: Service, + X: Service, + B: MessageBody, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::None => write!(f, "State::None"), + Self::ExpectCall { .. } => { + f.debug_struct("State::ExpectCall").finish_non_exhaustive() + } + Self::ServiceCall { .. } => { + f.debug_struct("State::ServiceCall").finish_non_exhaustive() + } + Self::SendPayload { .. } => { + f.debug_struct("State::SendPayload").finish_non_exhaustive() + } + Self::SendErrorPayload { .. } => f + .debug_struct("State::SendErrorPayload") + .finish_non_exhaustive(), + } + } +} + +#[derive(Debug)] enum PollResponse { Upgrade(Request), DoNothing, @@ -219,33 +255,25 @@ where peer_addr: Option, conn_data: OnConnectData, ) -> Self { - let flags = if config.keep_alive_enabled() { - Flags::KEEPALIVE - } else { - Flags::empty() - }; - - // keep-alive timer - let (ka_expire, ka_timer) = match config.keep_alive_timer() { - Some(delay) => (delay.deadline(), Some(delay)), - None => (config.now(), None), - }; - Dispatcher { inner: DispatcherState::Normal { inner: InnerDispatcher { flow, - flags, + flags: Flags::empty(), peer_addr, conn_data: conn_data.0.map(Rc::new), + config: config.clone(), error: None, state: State::None, payload: None, messages: VecDeque::new(), - ka_expire, - ka_timer, + head_timer: TimerState::new(config.client_request_deadline().is_some()), + ka_timer: TimerState::new(config.keep_alive().enabled()), + shutdown_timer: TimerState::new( + config.client_disconnect_deadline().is_some(), + ), io: Some(io), read_buf: BytesMut::with_capacity(HW_BUFFER_SIZE), @@ -286,11 +314,12 @@ where } } - // if checked is set to true, delay disconnect until all tasks have finished. fn client_disconnected(self: Pin<&mut Self>) { let this = self.project(); + this.flags .insert(Flags::READ_DISCONNECT | Flags::WRITE_DISCONNECT); + if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::Incomplete(None)); } @@ -306,9 +335,12 @@ where while written < len { match io.as_mut().poll_write(cx, &write_buf[written..])? { Poll::Ready(0) => { - return Poll::Ready(Err(io::Error::new(io::ErrorKind::WriteZero, ""))) + log::error!("write zero; closing"); + return Poll::Ready(Err(io::Error::new(io::ErrorKind::WriteZero, ""))); } + Poll::Ready(n) => written += n, + Poll::Pending => { write_buf.advance(written); return Poll::Pending; @@ -316,59 +348,70 @@ where } } - // everything has written to io. clear buffer. + // everything has written to I/O; clear buffer write_buf.clear(); - // flush the io and check if get blocked. + // flush the I/O and check if get blocked io.poll_flush(cx) } fn send_response_inner( self: Pin<&mut Self>, - message: Response<()>, + res: Response<()>, body: &impl MessageBody, ) -> Result { - let size = body.size(); let this = self.project(); + + let size = body.size(); + this.codec - .encode(Message::Item((message, size)), this.write_buf) + .encode(Message::Item((res, size)), this.write_buf) .map_err(|err| { if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::Incomplete(None)); } + DispatchError::Io(err) })?; - this.flags.set(Flags::KEEPALIVE, this.codec.keepalive()); + this.flags.set(Flags::KEEP_ALIVE, this.codec.keep_alive()); Ok(size) } fn send_response( mut self: Pin<&mut Self>, - message: Response<()>, + res: Response<()>, body: B, ) -> Result<(), DispatchError> { - let size = self.as_mut().send_response_inner(message, &body)?; - let state = match size { - BodySize::None | BodySize::Sized(0) => State::None, + let size = self.as_mut().send_response_inner(res, &body)?; + let mut this = self.project(); + this.state.set(match size { + BodySize::None | BodySize::Sized(0) => { + this.flags.insert(Flags::FINISHED); + State::None + } _ => State::SendPayload { body }, - }; - self.project().state.set(state); + }); + Ok(()) } fn send_error_response( mut self: Pin<&mut Self>, - message: Response<()>, + res: Response<()>, body: BoxBody, ) -> Result<(), DispatchError> { - let size = self.as_mut().send_response_inner(message, &body)?; - let state = match size { - BodySize::None | BodySize::Sized(0) => State::None, + let size = self.as_mut().send_response_inner(res, &body)?; + let mut this = self.project(); + this.state.set(match size { + BodySize::None | BodySize::Sized(0) => { + this.flags.insert(Flags::FINISHED); + State::None + } _ => State::SendErrorPayload { body }, - }; - self.project().state.set(state); + }); + Ok(()) } @@ -385,63 +428,66 @@ where 'res: loop { let mut this = self.as_mut().project(); match this.state.as_mut().project() { - // no future is in InnerDispatcher state. pop next message. + // no future is in InnerDispatcher state; pop next message StateProj::None => match this.messages.pop_front() { - // handle request message. + // handle request message Some(DispatcherMessage::Item(req)) => { // Handle `EXPECT: 100-Continue` header if req.head().expect() { - // set InnerDispatcher state and continue loop to poll it. + // set InnerDispatcher state and continue loop to poll it let fut = this.flow.expect.call(req); this.state.set(State::ExpectCall { fut }); } else { - // the same as expect call. + // set InnerDispatcher state and continue loop to poll it let fut = this.flow.service.call(req); this.state.set(State::ServiceCall { fut }); }; } - // handle error message. + // handle error message Some(DispatcherMessage::Error(res)) => { - // send_response would update InnerDispatcher state to SendPayload or - // None(If response body is empty). - // continue loop to poll it. + // send_response would update InnerDispatcher state to SendPayload or None + // (If response body is empty) + // continue loop to poll it self.as_mut().send_error_response(res, BoxBody::new(()))?; } - // return with upgrade request and poll it exclusively. + // return with upgrade request and poll it exclusively Some(DispatcherMessage::Upgrade(req)) => { - return Ok(PollResponse::Upgrade(req)); + return Ok(PollResponse::Upgrade(req)) } - // all messages are dealt with. + // all messages are dealt with None => return Ok(PollResponse::DoNothing), }, - StateProj::ServiceCall { fut } => match fut.poll(cx) { - // service call resolved. send response. - Poll::Ready(Ok(res)) => { - let (res, body) = res.into().replace_body(()); - self.as_mut().send_response(res, body)?; - } - // send service call error as response - Poll::Ready(Err(err)) => { - let res: Response = err.into(); - let (res, body) = res.replace_body(()); - self.as_mut().send_error_response(res, body)?; - } - - // service call pending and could be waiting for more chunk messages. - // (pipeline message limit and/or payload can_read limit) - Poll::Pending => { - // no new message is decoded and no new payload is feed. - // nothing to do except waiting for new incoming data from client. - if !self.as_mut().poll_request(cx)? { - return Ok(PollResponse::DoNothing); + StateProj::ServiceCall { fut } => { + match fut.poll(cx) { + // service call resolved. send response. + Poll::Ready(Ok(res)) => { + let (res, body) = res.into().replace_body(()); + self.as_mut().send_response(res, body)?; + } + + // send service call error as response + Poll::Ready(Err(err)) => { + let res: Response = err.into(); + let (res, body) = res.replace_body(()); + self.as_mut().send_error_response(res, body)?; + } + + // service call pending and could be waiting for more chunk messages + // (pipeline message limit and/or payload can_read limit) + Poll::Pending => { + // no new message is decoded and no new payload is fed + // nothing to do except waiting for new incoming data from client + if !self.as_mut().poll_request(cx)? { + return Ok(PollResponse::DoNothing); + } + // else loop } - // otherwise keep loop. } - }, + } StateProj::SendPayload { mut body } => { // keep populate writer buffer until buffer size limit hit, @@ -455,21 +501,26 @@ where Poll::Ready(None) => { this.codec.encode(Message::Chunk(None), this.write_buf)?; + // payload stream finished. // set state to None and handle next message this.state.set(State::None); + this.flags.insert(Flags::FINISHED); + continue 'res; } Poll::Ready(Some(Err(err))) => { - return Err(DispatchError::Body(err.into())) + this.flags.insert(Flags::FINISHED); + return Err(DispatchError::Body(err.into())); } Poll::Pending => return Ok(PollResponse::DoNothing), } } - // buffer is beyond max size. - // return and try to write the whole buffer to io stream. + + // buffer is beyond max size + // return and try to write the whole buffer to I/O stream. return Ok(PollResponse::DrainWriteBuf); } @@ -487,46 +538,55 @@ where Poll::Ready(None) => { this.codec.encode(Message::Chunk(None), this.write_buf)?; - // payload stream finished. + + // payload stream finished // set state to None and handle next message this.state.set(State::None); + this.flags.insert(Flags::FINISHED); + continue 'res; } Poll::Ready(Some(Err(err))) => { + this.flags.insert(Flags::FINISHED); return Err(DispatchError::Body( Error::new_body().with_cause(err).into(), - )) + )); } Poll::Pending => return Ok(PollResponse::DoNothing), } } - // buffer is beyond max size. - // return and try to write the whole buffer to io stream. + + // buffer is beyond max size + // return and try to write the whole buffer to stream return Ok(PollResponse::DrainWriteBuf); } - StateProj::ExpectCall { fut } => match fut.poll(cx) { - // expect resolved. write continue to buffer and set InnerDispatcher state - // to service call. - Poll::Ready(Ok(req)) => { - this.write_buf - .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n"); - let fut = this.flow.service.call(req); - this.state.set(State::ServiceCall { fut }); - } + StateProj::ExpectCall { fut } => { + log::trace!(" calling expect service"); - // send expect error as response - Poll::Ready(Err(err)) => { - let res: Response = err.into(); - let (res, body) = res.replace_body(()); - self.as_mut().send_error_response(res, body)?; - } + match fut.poll(cx) { + // expect resolved. write continue to buffer and set InnerDispatcher state + // to service call. + Poll::Ready(Ok(req)) => { + this.write_buf + .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n"); + let fut = this.flow.service.call(req); + this.state.set(State::ServiceCall { fut }); + } - // expect must be solved before progress can be made. - Poll::Pending => return Ok(PollResponse::DoNothing), - }, + // send expect error as response + Poll::Ready(Err(err)) => { + let res: Response = err.into(); + let (res, body) = res.replace_body(()); + self.as_mut().send_error_response(res, body)?; + } + + // expect must be solved before progress can be made. + Poll::Pending => return Ok(PollResponse::DoNothing), + } + } } } } @@ -536,64 +596,76 @@ where req: Request, cx: &mut Context<'_>, ) -> Result<(), DispatchError> { - // Handle `EXPECT: 100-Continue` header - let mut this = self.as_mut().project(); - if req.head().expect() { - // set dispatcher state so the future is pinned. - let fut = this.flow.expect.call(req); - this.state.set(State::ExpectCall { fut }); - } else { - // the same as above. - let fut = this.flow.service.call(req); - this.state.set(State::ServiceCall { fut }); + // initialize dispatcher state + { + let mut this = self.as_mut().project(); + + // Handle `EXPECT: 100-Continue` header + if req.head().expect() { + // set dispatcher state to call expect handler + let fut = this.flow.expect.call(req); + this.state.set(State::ExpectCall { fut }); + } else { + // set dispatcher state to call service handler + let fut = this.flow.service.call(req); + this.state.set(State::ServiceCall { fut }); + }; }; - // eagerly poll the future for once(or twice if expect is resolved immediately). + // eagerly poll the future once (or twice if expect is resolved immediately). loop { match self.as_mut().project().state.project() { StateProj::ExpectCall { fut } => { match fut.poll(cx) { - // expect is resolved. continue loop and poll the service call branch. + // expect is resolved; continue loop and poll the service call branch. Poll::Ready(Ok(req)) => { self.as_mut().send_continue(); + let mut this = self.as_mut().project(); let fut = this.flow.service.call(req); this.state.set(State::ServiceCall { fut }); + continue; } - // future is pending. return Ok(()) to notify that a new state is - // set and the outer loop should be continue. - Poll::Pending => return Ok(()), - // future is error. send response and return a result. On success - // to notify the dispatcher a new state is set and the outer loop - // should be continue. + + // future is error; send response and return a result + // on success to notify the dispatcher a new state is set and the outer loop + // should be continued Poll::Ready(Err(err)) => { let res: Response = err.into(); let (res, body) = res.replace_body(()); return self.send_error_response(res, body); } + + // future is pending; return Ok(()) to notify that a new state is + // set and the outer loop should be continue. + Poll::Pending => return Ok(()), } } + StateProj::ServiceCall { fut } => { // return no matter the service call future's result. return match fut.poll(cx) { - // future is resolved. send response and return a result. On success + // Future is resolved. Send response and return a result. On success // to notify the dispatcher a new state is set and the outer loop // should be continue. Poll::Ready(Ok(res)) => { let (res, body) = res.into().replace_body(()); - self.send_response(res, body) + self.as_mut().send_response(res, body) } - // see the comment on ExpectCall state branch's Pending. + + // see the comment on ExpectCall state branch's Pending Poll::Pending => Ok(()), - // see the comment on ExpectCall state branch's Ready(Err(err)). + + // see the comment on ExpectCall state branch's Ready(Err(_)) Poll::Ready(Err(err)) => { let res: Response = err.into(); let (res, body) = res.replace_body(()); - self.send_error_response(res, body) + self.as_mut().send_error_response(res, body) } }; } + _ => { unreachable!( "State must be set to ServiceCall or ExceptCall in handle_request" @@ -604,72 +676,77 @@ where } /// Process one incoming request. + /// + /// Returns true if any meaningful work was done. fn poll_request( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Result { + let pipeline_queue_full = self.messages.len() >= MAX_PIPELINED_MESSAGES; + let can_not_read = !self.can_read(cx); + // limit amount of non-processed requests - if self.messages.len() >= MAX_PIPELINED_MESSAGES || !self.can_read(cx) { + if pipeline_queue_full || can_not_read { return Ok(false); } - let mut updated = false; let mut this = self.as_mut().project(); + + let mut updated = false; + loop { match this.codec.decode(this.read_buf) { Ok(Some(msg)) => { updated = true; - this.flags.insert(Flags::STARTED); match msg { Message::Item(mut req) => { + // head timer only applies to first request on connection + this.head_timer.clear(line!()); + req.head_mut().peer_addr = *this.peer_addr; req.conn_data = this.conn_data.as_ref().map(Rc::clone); match this.codec.message_type() { - // Request is upgradable. add upgrade message and break. - // everything remain in read buffer would be handed to + // request has no payload + MessageType::None => {} + + // Request is upgradable. Add upgrade message and break. + // Everything remaining in read buffer will be handed to // upgraded Request. MessageType::Stream if this.flow.upgrade.is_some() => { this.messages.push_back(DispatcherMessage::Upgrade(req)); break; } - // Request is not upgradable. + // request is not upgradable MessageType::Payload | MessageType::Stream => { - /* - PayloadSender and Payload are smart pointers share the - same state. - PayloadSender is attached to dispatcher and used to sink - new chunked request data to state. - Payload is attached to Request and passed to Service::call - where the state can be collected and consumed. - */ + // PayloadSender and Payload are smart pointers share the + // same state. PayloadSender is attached to dispatcher and used + // to sink new chunked request data to state. Payload is + // attached to Request and passed to Service::call where the + // state can be collected and consumed. let (sender, payload) = Payload::create(false); - let (req1, _) = - req.replace_payload(crate::Payload::H1 { payload }); - req = req1; + *req.payload() = crate::Payload::H1 { payload }; *this.payload = Some(sender); } - - // Request has no payload. - MessageType::None => {} } // handle request early when no future in InnerDispatcher state. - if this.state.is_empty() { + if this.state.is_none() { self.as_mut().handle_request(req, cx)?; this = self.as_mut().project(); } else { this.messages.push_back(DispatcherMessage::Item(req)); } } + Message::Chunk(Some(chunk)) => { if let Some(ref mut payload) = this.payload { payload.feed_data(chunk); } else { - error!("Internal server error: unexpected payload chunk"); + log::error!("Internal server error: unexpected payload chunk"); this.flags.insert(Flags::READ_DISCONNECT); this.messages.push_back(DispatcherMessage::Error( Response::internal_server_error().drop_body(), @@ -678,11 +755,12 @@ where break; } } + Message::Chunk(None) => { if let Some(mut payload) = this.payload.take() { payload.feed_eof(); } else { - error!("Internal server error: unexpected eof"); + log::error!("Internal server error: unexpected eof"); this.flags.insert(Flags::READ_DISCONNECT); this.messages.push_back(DispatcherMessage::Error( Response::internal_server_error().drop_body(), @@ -693,38 +771,51 @@ where } } } - // decode is partial and buffer is not full yet. - // break and wait for more read. + + // decode is partial and buffer is not full yet + // break and wait for more read Ok(None) => break, + Err(ParseError::Io(err)) => { + log::trace!("I/O error: {}", &err); self.as_mut().client_disconnected(); this = self.as_mut().project(); *this.error = Some(DispatchError::Io(err)); break; } + Err(ParseError::TooLarge) => { + log::trace!("request head was too big; returning 431 response"); + if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::Overflow); } - // Requests overflow buffer size should be responded with 431 + + // request heads that overflow buffer size return a 431 error this.messages .push_back(DispatcherMessage::Error(Response::with_body( StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE, (), ))); + this.flags.insert(Flags::READ_DISCONNECT); *this.error = Some(ParseError::TooLarge.into()); + break; } + Err(err) => { + log::trace!("parse error {}", &err); + if let Some(mut payload) = this.payload.take() { payload.set_error(PayloadError::EncodingCorrupted); } - // Malformed requests should be responded with 400 + // malformed requests should be responded with 400 this.messages.push_back(DispatcherMessage::Error( Response::bad_request().drop_body(), )); + this.flags.insert(Flags::READ_DISCONNECT); *this.error = Some(err.into()); break; @@ -732,92 +823,115 @@ where } } - if updated && this.ka_timer.is_some() { - if let Some(expire) = this.codec.config().keep_alive_expire() { - *this.ka_expire = expire; - } - } Ok(updated) } - /// keep-alive timer - fn poll_keepalive( + fn poll_head_timer( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Result<(), DispatchError> { - let mut this = self.as_mut().project(); + let this = self.as_mut().project(); - // when a branch is not explicit return early it's meant to fall through - // and return as Ok(()) - match this.ka_timer.as_mut().as_pin_mut() { - None => { - // conditionally go into shutdown timeout - if this.flags.contains(Flags::SHUTDOWN) { - if let Some(deadline) = this.codec.config().client_disconnect_timer() { - // write client disconnect time out and poll again to - // go into Some> branch - this.ka_timer.set(Some(sleep_until(deadline))); - return self.poll_keepalive(cx); - } - } - } - Some(mut timer) => { - // only operate when keep-alive timer is resolved. - if timer.as_mut().poll(cx).is_ready() { - // got timeout during shutdown, drop connection - if this.flags.contains(Flags::SHUTDOWN) { - return Err(DispatchError::DisconnectTimeout); - // exceed deadline. check for any outstanding tasks - } else if timer.deadline() >= *this.ka_expire { - // have no task at hand. - if this.state.is_empty() && this.write_buf.is_empty() { - if this.flags.contains(Flags::STARTED) { - trace!("Keep-alive timeout, close connection"); - this.flags.insert(Flags::SHUTDOWN); + if let TimerState::Active { timer } = this.head_timer { + if timer.as_mut().poll(cx).is_ready() { + // timeout on first request (slow request) return 408 - // start shutdown timeout - if let Some(deadline) = - this.codec.config().client_disconnect_timer() - { - timer.as_mut().reset(deadline); - let _ = timer.poll(cx); - } else { - // no shutdown timeout, drop socket - this.flags.insert(Flags::WRITE_DISCONNECT); - } - } else { - // timeout on first request (slow request) return 408 - trace!("Slow request timeout"); - let _ = self.as_mut().send_error_response( - Response::with_body(StatusCode::REQUEST_TIMEOUT, ()), - BoxBody::new(()), - ); - this = self.project(); - this.flags.insert(Flags::STARTED | Flags::SHUTDOWN); - } - // still have unfinished task. try to reset and register keep-alive. - } else if let Some(deadline) = this.codec.config().keep_alive_expire() { - timer.as_mut().reset(deadline); - let _ = timer.poll(cx); - } - // timer resolved but still have not met the keep-alive expire deadline. - // reset and register for later wakeup. - } else { - timer.as_mut().reset(*this.ka_expire); - let _ = timer.poll(cx); - } - } + log::trace!( + "timed out on slow request; \ + replying with 408 and closing connection" + ); + + let _ = self.as_mut().send_error_response( + Response::with_body(StatusCode::REQUEST_TIMEOUT, ()), + BoxBody::new(()), + ); + + self.project().flags.insert(Flags::SHUTDOWN); } - } + }; + Ok(()) } - /// Returns true when io stream can be disconnected after write to it. + fn poll_ka_timer( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Result<(), DispatchError> { + let this = self.as_mut().project(); + if let TimerState::Active { timer } = this.ka_timer { + debug_assert!( + this.flags.contains(Flags::KEEP_ALIVE), + "keep-alive flag should be set when timer is active", + ); + debug_assert!( + this.state.is_none(), + "dispatcher should not be in keep-alive phase if state is not none: {:?}", + this.state, + ); + debug_assert!( + this.write_buf.is_empty(), + "dispatcher should not be in keep-alive phase if write_buf is not empty", + ); + + // keep-alive timer has timed out + if timer.as_mut().poll(cx).is_ready() { + // no tasks at hand + log::trace!("timer timed out; closing connection"); + this.flags.insert(Flags::SHUTDOWN); + + if let Some(deadline) = this.config.client_disconnect_deadline() { + // start shutdown timeout if enabled + this.shutdown_timer + .set_and_init(cx, sleep_until(deadline.into()), line!()); + } else { + // no shutdown timeout, drop socket + this.flags.insert(Flags::WRITE_DISCONNECT); + } + } + } + + Ok(()) + } + + fn poll_shutdown_timer( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Result<(), DispatchError> { + let this = self.as_mut().project(); + if let TimerState::Active { timer } = this.shutdown_timer { + debug_assert!( + this.flags.contains(Flags::SHUTDOWN), + "shutdown flag should be set when timer is active", + ); + + // timed-out during shutdown; drop connection + if timer.as_mut().poll(cx).is_ready() { + log::trace!("timed-out during shutdown"); + return Err(DispatchError::DisconnectTimeout); + } + } + + Ok(()) + } + + /// Poll head, keep-alive, and disconnect timer. + fn poll_timers( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Result<(), DispatchError> { + self.as_mut().poll_head_timer(cx)?; + self.as_mut().poll_ka_timer(cx)?; + self.as_mut().poll_shutdown_timer(cx)?; + + Ok(()) + } + + /// Returns true when I/O stream can be disconnected after write to it. /// /// It covers these conditions: - /// - `std::io::ErrorKind::ConnectionReset` after partial read. + /// - `std::io::ErrorKind::ConnectionReset` after partial read; /// - all data read done. - #[inline(always)] + #[inline(always)] // TODO: bench this inline fn read_available( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -846,13 +960,12 @@ where // When read_buf is beyond max buffer size the early return could be successfully // be parsed as a new Request. This case would not generate ParseError::TooLarge and // at this point IO stream is not fully read to Pending and would result in - // dispatcher stuck until timeout (KA) + // dispatcher stuck until timeout (keep-alive). // // Note: // This is a perf choice to reduce branch on ::decode. // - // A Request head too large to parse is only checked on - // `httparse::Status::Partial` condition. + // A Request head too large to parse is only checked on `httparse::Status::Partial`. if this.payload.is_none() { // When dispatcher has a payload the responsibility of wake up it would be shift @@ -881,18 +994,29 @@ where match actix_codec::poll_read_buf(io.as_mut(), cx, this.read_buf) { Poll::Ready(Ok(n)) => { + this.flags.remove(Flags::FINISHED); + if n == 0 { return Ok(true); } + read_some = true; } - Poll::Pending => return Ok(false), + + Poll::Pending => { + return Ok(false); + } + Poll::Ready(Err(err)) => { return match err.kind() { + // convert WouldBlock error to the same as Pending return io::ErrorKind::WouldBlock => Ok(false), + + // connection reset after partial read io::ErrorKind::ConnectionReset if read_some => Ok(true), + _ => Err(DispatchError::Io(err)), - } + }; } } } @@ -940,27 +1064,60 @@ where } match this.inner.project() { - DispatcherStateProj::Normal { mut inner } => { - inner.as_mut().poll_keepalive(cx)?; + DispatcherStateProj::Upgrade { fut: upgrade } => upgrade.poll(cx).map_err(|err| { + log::error!("Upgrade handler error: {}", err); + DispatchError::Upgrade + }), - if inner.flags.contains(Flags::SHUTDOWN) { + DispatcherStateProj::Normal { mut inner } => { + log::trace!("start flags: {:?}", &inner.flags); + + trace_timer_states( + "start", + &inner.head_timer, + &inner.ka_timer, + &inner.shutdown_timer, + ); + + inner.as_mut().poll_timers(cx)?; + + let poll = if inner.flags.contains(Flags::SHUTDOWN) { if inner.flags.contains(Flags::WRITE_DISCONNECT) { Poll::Ready(Ok(())) } else { - // flush buffer and wait on blocked. + // flush buffer and wait on blocked ready!(inner.as_mut().poll_flush(cx))?; - Pin::new(inner.project().io.as_mut().unwrap()) + Pin::new(inner.as_mut().project().io.as_mut().unwrap()) .poll_shutdown(cx) .map_err(DispatchError::from) } } else { - // read from io stream and fill read buffer. + // read from I/O stream and fill read buffer let should_disconnect = inner.as_mut().read_available(cx)?; + // after reading something from stream, clear keep-alive timer + if !inner.read_buf.is_empty() && inner.flags.contains(Flags::KEEP_ALIVE) { + let inner = inner.as_mut().project(); + inner.flags.remove(Flags::KEEP_ALIVE); + inner.ka_timer.clear(line!()); + } + + if !inner.flags.contains(Flags::STARTED) { + inner.as_mut().project().flags.insert(Flags::STARTED); + + if let Some(deadline) = inner.config.client_request_deadline() { + inner.as_mut().project().head_timer.set_and_init( + cx, + sleep_until(deadline.into()), + line!(), + ); + } + } + inner.as_mut().poll_request(cx)?; - // io stream should to be closed. if should_disconnect { + // I/O stream should to be closed let inner = inner.as_mut().project(); inner.flags.insert(Flags::READ_DISCONNECT); if let Some(mut payload) = inner.payload.take() { @@ -969,11 +1126,27 @@ where }; loop { - // poll_response and populate write buffer. - // drain indicate if write buffer should be emptied before next run. + // poll response to populate write buffer + // drain indicates whether write buffer should be emptied before next run let drain = match inner.as_mut().poll_response(cx)? { PollResponse::DrainWriteBuf => true, - PollResponse::DoNothing => false, + + PollResponse::DoNothing => { + // KEEP_ALIVE is set in send_response_inner if client allows it + // FINISHED is set after writing last chunk of response + if inner.flags.contains(Flags::KEEP_ALIVE | Flags::FINISHED) { + if let Some(timer) = inner.config.keep_alive_deadline() { + inner.as_mut().project().ka_timer.set_and_init( + cx, + sleep_until(timer.into()), + line!(), + ); + } + } + + false + } + // upgrade request and goes Upgrade variant of DispatcherState. PollResponse::Upgrade(req) => { let upgrade = inner.upgrade(req); @@ -985,57 +1158,96 @@ where } }; - // we didn't get WouldBlock from write operation, - // so data get written to kernel completely (macOS) - // and we have to write again otherwise response can get stuck + // we didn't get WouldBlock from write operation, so data get written to + // kernel completely (macOS) and we have to write again otherwise response + // can get stuck // - // TODO: what? is WouldBlock good or bad? - // want to find a reference for this macOS behavior - if inner.as_mut().poll_flush(cx)?.is_pending() || !drain { + // TODO: want to find a reference for this behavior + // see introduced commit: 3872d3ba + let flush_was_ready = inner.as_mut().poll_flush(cx)?.is_ready(); + + // this assert seems to always be true but not willing to commit to it until + // we understand what Nikolay meant when writing the above comment + // debug_assert!(flush_was_ready); + + if !flush_was_ready || !drain { break; } } // client is gone if inner.flags.contains(Flags::WRITE_DISCONNECT) { + log::trace!("client is gone; disconnecting"); return Poll::Ready(Ok(())); } - let is_empty = inner.state.is_empty(); - let inner_p = inner.as_mut().project(); - // read half is closed and we do not processing any responses - if inner_p.flags.contains(Flags::READ_DISCONNECT) && is_empty { + let state_is_none = inner_p.state.is_none(); + + // read half is closed; we do not process any responses + if inner_p.flags.contains(Flags::READ_DISCONNECT) && state_is_none { + log::trace!("read half closed; start shutdown"); inner_p.flags.insert(Flags::SHUTDOWN); } // keep-alive and stream errors - if is_empty && inner_p.write_buf.is_empty() { + if state_is_none && inner_p.write_buf.is_empty() { if let Some(err) = inner_p.error.take() { - Poll::Ready(Err(err)) + log::error!("stream error: {}", &err); + return Poll::Ready(Err(err)); } + // disconnect if keep-alive is not enabled - else if inner_p.flags.contains(Flags::STARTED) - && !inner_p.flags.intersects(Flags::KEEPALIVE) + if inner_p.flags.contains(Flags::FINISHED) + && !inner_p.flags.contains(Flags::KEEP_ALIVE) { + inner_p.flags.remove(Flags::FINISHED); inner_p.flags.insert(Flags::SHUTDOWN); - self.poll(cx) + return self.poll(cx); } + // disconnect if shutdown - else if inner_p.flags.contains(Flags::SHUTDOWN) { - self.poll(cx) - } else { - Poll::Pending + if inner_p.flags.contains(Flags::SHUTDOWN) { + return self.poll(cx); } - } else { - Poll::Pending } - } + + trace_timer_states( + "end", + inner_p.head_timer, + inner_p.ka_timer, + inner_p.shutdown_timer, + ); + + Poll::Pending + }; + + log::trace!("end flags: {:?}", &inner.flags); + + poll } - DispatcherStateProj::Upgrade { fut: upgrade } => upgrade.poll(cx).map_err(|err| { - error!("Upgrade handler error: {}", err); - DispatchError::Upgrade - }), } } } + +#[allow(dead_code)] +fn trace_timer_states( + label: &str, + head_timer: &TimerState, + ka_timer: &TimerState, + shutdown_timer: &TimerState, +) { + log::trace!("{} timers:", label); + + if head_timer.is_enabled() { + log::trace!(" head {}", &head_timer); + } + + if ka_timer.is_enabled() { + log::trace!(" keep-alive {}", &ka_timer); + } + + if shutdown_timer.is_enabled() { + log::trace!(" shutdown {}", &shutdown_timer); + } +} diff --git a/actix-http/src/h1/dispatcher_tests.rs b/actix-http/src/h1/dispatcher_tests.rs index 057ef1583..891cce69c 100644 --- a/actix-http/src/h1/dispatcher_tests.rs +++ b/actix-http/src/h1/dispatcher_tests.rs @@ -17,7 +17,7 @@ use crate::{ h1::{Codec, ExpectHandler, UpgradeHandler}, service::HttpFlow, test::{TestBuffer, TestSeqBuffer}, - Error, HttpMessage, KeepAlive, Method, OnConnectData, Request, Response, + Error, HttpMessage, KeepAlive, Method, OnConnectData, Request, Response, StatusCode, }; fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option { @@ -34,7 +34,13 @@ fn stabilize_date_header(payload: &mut [u8]) { } fn ok_service() -> impl Service, Error = Error> { - fn_service(|_req: Request| ready(Ok::<_, Error>(Response::ok()))) + status_service(StatusCode::OK) +} + +fn status_service( + status: StatusCode, +) -> impl Service, Error = Error> { + fn_service(move |_req: Request| ready(Ok::<_, Error>(Response::new(status)))) } fn echo_path_service( @@ -64,10 +70,83 @@ fn echo_payload_service() -> impl Service, E } #[actix_rt::test] -async fn test_basic() { +async fn late_request() { + let mut buf = TestBuffer::empty(); + + let cfg = ServiceConfig::new( + KeepAlive::Disabled, + Duration::from_millis(100), + Duration::ZERO, + false, + None, + ); + let services = HttpFlow::new(ok_service(), ExpectHandler, None); + + let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( + buf.clone(), + services, + cfg, + None, + OnConnectData::default(), + ); + actix_rt::pin!(h1); + + lazy(|cx| { + assert!(matches!(&h1.inner, DispatcherState::Normal { .. })); + + match h1.as_mut().poll(cx) { + Poll::Ready(_) => panic!("first poll should not be ready"), + Poll::Pending => {} + } + + // polls: initial + assert_eq!(h1.poll_count, 1); + + buf.extend_read_buf("GET /abcd HTTP/1.1\r\nConnection: close\r\n\r\n"); + + match h1.as_mut().poll(cx) { + Poll::Pending => panic!("second poll should not be pending"), + Poll::Ready(res) => assert!(res.is_ok()), + } + + // polls: initial pending => handle req => shutdown + assert_eq!(h1.poll_count, 3); + + let mut res = buf.take_write_buf().to_vec(); + stabilize_date_header(&mut res); + let res = &res[..]; + + let exp = b"\ + HTTP/1.1 200 OK\r\n\ + content-length: 0\r\n\ + connection: close\r\n\ + date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\r\n\ + "; + + assert_eq!( + res, + exp, + "\nexpected response not in write buffer:\n\ + response: {:?}\n\ + expected: {:?}", + String::from_utf8_lossy(res), + String::from_utf8_lossy(exp) + ); + }) + .await; +} + +#[actix_rt::test] +async fn oneshot_connection() { let buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n"); - let cfg = ServiceConfig::new(KeepAlive::Disabled, 100, 0, false, None); + let cfg = ServiceConfig::new( + KeepAlive::Disabled, + Duration::from_millis(100), + Duration::ZERO, + false, + None, + ); let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( @@ -116,10 +195,16 @@ async fn test_basic() { } #[actix_rt::test] -async fn test_keep_alive_timeout() { +async fn keep_alive_timeout() { let buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n"); - let cfg = ServiceConfig::new(KeepAlive::Timeout(1), 100, 0, false, None); + let cfg = ServiceConfig::new( + KeepAlive::Timeout(Duration::from_millis(200)), + Duration::from_millis(100), + Duration::ZERO, + false, + None, + ); let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( @@ -166,7 +251,7 @@ async fn test_keep_alive_timeout() { .await; // sleep slightly longer than keep-alive timeout - sleep(Duration::from_millis(1100)).await; + sleep(Duration::from_millis(250)).await; lazy(|cx| { assert!( @@ -189,10 +274,16 @@ async fn test_keep_alive_timeout() { } #[actix_rt::test] -async fn test_keep_alive_follow_up_req() { +async fn keep_alive_follow_up_req() { let mut buf = TestBuffer::new("GET /abcd HTTP/1.1\r\n\r\n"); - let cfg = ServiceConfig::new(KeepAlive::Timeout(2), 100, 0, false, None); + let cfg = ServiceConfig::new( + KeepAlive::Timeout(Duration::from_millis(500)), + Duration::from_millis(100), + Duration::ZERO, + false, + None, + ); let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); let h1 = Dispatcher::<_, _, _, _, UpgradeHandler>::new( @@ -239,7 +330,7 @@ async fn test_keep_alive_follow_up_req() { .await; // sleep for less than KA timeout - sleep(Duration::from_millis(200)).await; + sleep(Duration::from_millis(100)).await; lazy(|cx| { assert!( @@ -308,7 +399,7 @@ async fn test_keep_alive_follow_up_req() { } #[actix_rt::test] -async fn test_req_parse_err() { +async fn req_parse_err() { lazy(|cx| { let buf = TestBuffer::new("GET /test HTTP/1\r\n\r\n"); @@ -350,7 +441,13 @@ async fn pipelining_ok_then_ok() { ", ); - let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None); + let cfg = ServiceConfig::new( + KeepAlive::Disabled, + Duration::from_millis(1), + Duration::from_millis(1), + false, + None, + ); let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); @@ -414,7 +511,13 @@ async fn pipelining_ok_then_bad() { ", ); - let cfg = ServiceConfig::new(KeepAlive::Disabled, 1, 1, false, None); + let cfg = ServiceConfig::new( + KeepAlive::Disabled, + Duration::from_millis(1), + Duration::from_millis(1), + false, + None, + ); let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); @@ -468,10 +571,16 @@ async fn pipelining_ok_then_bad() { } #[actix_rt::test] -async fn test_expect() { +async fn expect_handling() { lazy(|cx| { let mut buf = TestSeqBuffer::empty(); - let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); + let cfg = ServiceConfig::new( + KeepAlive::Disabled, + Duration::ZERO, + Duration::ZERO, + false, + None, + ); let services = HttpFlow::new(echo_payload_service(), ExpectHandler, None); @@ -499,7 +608,6 @@ async fn test_expect() { // polls: manual assert_eq!(h1.poll_count, 1); - eprintln!("poll count: {}", h1.poll_count); if let DispatcherState::Normal { ref inner } = h1.inner { let io = inner.io.as_ref().unwrap(); @@ -540,10 +648,16 @@ async fn test_expect() { } #[actix_rt::test] -async fn test_eager_expect() { +async fn expect_eager() { lazy(|cx| { let mut buf = TestSeqBuffer::empty(); - let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); + let cfg = ServiceConfig::new( + KeepAlive::Disabled, + Duration::ZERO, + Duration::ZERO, + false, + None, + ); let services = HttpFlow::new(echo_path_service(), ExpectHandler, None); @@ -600,7 +714,7 @@ async fn test_eager_expect() { } #[actix_rt::test] -async fn test_upgrade() { +async fn upgrade_handling() { struct TestUpgrade; impl Service<(Request, Framed)> for TestUpgrade { @@ -620,7 +734,13 @@ async fn test_upgrade() { lazy(|cx| { let mut buf = TestSeqBuffer::empty(); - let cfg = ServiceConfig::new(KeepAlive::Disabled, 0, 0, false, None); + let cfg = ServiceConfig::new( + KeepAlive::Disabled, + Duration::ZERO, + Duration::ZERO, + false, + None, + ); let services = HttpFlow::new(ok_service(), ExpectHandler, Some(TestUpgrade)); diff --git a/actix-http/src/h1/encoder.rs b/actix-http/src/h1/encoder.rs index 5fcb2f688..a24ba5911 100644 --- a/actix-http/src/h1/encoder.rs +++ b/actix-http/src/h1/encoder.rs @@ -212,7 +212,7 @@ pub(crate) trait MessageType: Sized { // optimized date header, set_date writes \r\n if !has_date { - config.set_date(dst, camel_case); + config.write_date_header(dst, camel_case); } else { // msg eof dst.extend_from_slice(b"\r\n"); @@ -318,16 +318,17 @@ impl MessageType for RequestHeadType { } impl MessageEncoder { - /// Encode message + /// Encode chunk. pub fn encode_chunk(&mut self, msg: &[u8], buf: &mut BytesMut) -> io::Result { self.te.encode(msg, buf) } - /// Encode eof + /// Encode EOF. pub fn encode_eof(&mut self, buf: &mut BytesMut) -> io::Result<()> { self.te.encode_eof(buf) } + /// Encode message. pub fn encode( &mut self, dst: &mut BytesMut, diff --git a/actix-http/src/h1/mod.rs b/actix-http/src/h1/mod.rs index 8c569165d..858cf542a 100644 --- a/actix-http/src/h1/mod.rs +++ b/actix-http/src/h1/mod.rs @@ -13,6 +13,7 @@ mod encoder; mod expect; mod payload; mod service; +mod timer; mod upgrade; mod utils; @@ -28,9 +29,10 @@ pub use self::utils::SendResponse; #[derive(Debug)] /// Codec message pub enum Message { - /// Http message + /// HTTP message. Item(T), - /// Payload chunk + + /// Payload chunk. Chunk(Option), } diff --git a/actix-http/src/h1/timer.rs b/actix-http/src/h1/timer.rs new file mode 100644 index 000000000..bb69fdb80 --- /dev/null +++ b/actix-http/src/h1/timer.rs @@ -0,0 +1,80 @@ +use std::{fmt, future::Future, pin::Pin, task::Context}; + +use actix_rt::time::{Instant, Sleep}; + +#[derive(Debug)] +pub(super) enum TimerState { + Disabled, + Inactive, + Active { timer: Pin> }, +} + +impl TimerState { + pub(super) fn new(enabled: bool) -> Self { + if enabled { + Self::Inactive + } else { + Self::Disabled + } + } + + pub(super) fn is_enabled(&self) -> bool { + matches!(self, Self::Active { .. } | Self::Inactive) + } + + pub(super) fn set(&mut self, timer: Sleep, line: u32) { + if matches!(self, Self::Disabled) { + log::trace!("setting disabled timer from line {}", line); + } + + *self = Self::Active { + timer: Box::pin(timer), + }; + } + + pub(super) fn set_and_init(&mut self, cx: &mut Context<'_>, timer: Sleep, line: u32) { + self.set(timer, line); + self.init(cx); + } + + pub(super) fn clear(&mut self, line: u32) { + if matches!(self, Self::Disabled) { + log::trace!("trying to clear a disabled timer from line {}", line); + } + + if matches!(self, Self::Inactive) { + log::trace!("trying to clear an inactive timer from line {}", line); + } + + *self = Self::Inactive; + } + + pub(super) fn init(&mut self, cx: &mut Context<'_>) { + if let TimerState::Active { timer } = self { + let _ = timer.as_mut().poll(cx); + } + } +} + +impl fmt::Display for TimerState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + TimerState::Disabled => f.write_str("timer is disabled"), + TimerState::Inactive => f.write_str("timer is inactive"), + TimerState::Active { timer } => { + let deadline = timer.deadline(); + let now = Instant::now(); + + if deadline < now { + f.write_str("timer is active and has reached deadline") + } else { + write!( + f, + "timer is active and due to expire in {} milliseconds", + ((deadline - now).as_secs_f32() * 1000.0) + ) + } + } + } + } +} diff --git a/actix-http/src/h2/dispatcher.rs b/actix-http/src/h2/dispatcher.rs index a90eb3466..d528bec96 100644 --- a/actix-http/src/h2/dispatcher.rs +++ b/actix-http/src/h2/dispatcher.rs @@ -57,11 +57,11 @@ where conn_data: OnConnectData, timer: Option>>, ) -> Self { - let ping_pong = config.keep_alive().map(|dur| H2PingPong { + let ping_pong = config.keep_alive().duration().map(|dur| H2PingPong { timer: timer .map(|mut timer| { - // reset timer if it's received from new function. - timer.as_mut().reset(config.now() + dur); + // reuse timer slot if it was initialized for handshake + timer.as_mut().reset((config.now() + dur).into()); timer }) .unwrap_or_else(|| Box::pin(sleep(dur))), @@ -141,7 +141,7 @@ where DispatchError::SendResponse(err) => { trace!("Error sending HTTP/2 response: {:?}", err) } - DispatchError::SendData(err) => warn!("{:?}", err), + DispatchError::SendData(err) => log::warn!("{:?}", err), DispatchError::ResponseBody(err) => { error!("Response payload stream error: {:?}", err) } @@ -160,8 +160,8 @@ where Poll::Ready(_) => { ping_pong.on_flight = false; - let dead_line = this.config.keep_alive_expire().unwrap(); - ping_pong.timer.as_mut().reset(dead_line); + let dead_line = this.config.keep_alive_deadline().unwrap(); + ping_pong.timer.as_mut().reset(dead_line.into()); } Poll::Pending => { return ping_pong.timer.as_mut().poll(cx).map(|_| Ok(())) @@ -174,8 +174,8 @@ where ping_pong.ping_pong.send_ping(Ping::opaque())?; - let dead_line = this.config.keep_alive_expire().unwrap(); - ping_pong.timer.as_mut().reset(dead_line); + let dead_line = this.config.keep_alive_deadline().unwrap(); + ping_pong.timer.as_mut().reset(dead_line.into()); ping_pong.on_flight = true; } @@ -322,7 +322,7 @@ fn prepare_response( // set date header if !has_date { let mut bytes = BytesMut::with_capacity(29); - config.set_date_header(&mut bytes); + config.write_date_header_value(&mut bytes); res.headers_mut().insert( DATE, // SAFETY: serialized date-times are known ASCII strings diff --git a/actix-http/src/h2/mod.rs b/actix-http/src/h2/mod.rs index 47d51b420..c8aaaaa5f 100644 --- a/actix-http/src/h2/mod.rs +++ b/actix-http/src/h2/mod.rs @@ -7,7 +7,7 @@ use std::{ }; use actix_codec::{AsyncRead, AsyncWrite}; -use actix_rt::time::Sleep; +use actix_rt::time::{sleep_until, Sleep}; use bytes::Bytes; use futures_core::{ready, Stream}; use h2::{ @@ -15,17 +15,17 @@ use h2::{ RecvStream, }; +use crate::{ + config::ServiceConfig, + error::{DispatchError, PayloadError}, +}; + mod dispatcher; mod service; pub use self::dispatcher::Dispatcher; pub use self::service::H2Service; -use crate::{ - config::ServiceConfig, - error::{DispatchError, PayloadError}, -}; - /// HTTP/2 peer stream. pub struct Payload { stream: RecvStream, @@ -67,7 +67,9 @@ where { HandshakeWithTimeout { handshake: handshake(io), - timer: config.client_timer().map(Box::pin), + timer: config + .client_request_deadline() + .map(|deadline| Box::pin(sleep_until(deadline.into()))), } } @@ -86,7 +88,7 @@ where let this = self.get_mut(); match Pin::new(&mut this.handshake).poll(cx)? { - // return the timer on success handshake. It can be re-used for h2 ping-pong. + // return the timer on success handshake; its slot can be re-used for h2 ping-pong Poll::Ready(conn) => Poll::Ready(Ok((conn, this.timer.take()))), Poll::Pending => match this.timer.as_mut() { Some(timer) => { diff --git a/actix-http/src/h2/service.rs b/actix-http/src/h2/service.rs index 469648054..653982d37 100644 --- a/actix-http/src/h2/service.rs +++ b/actix-http/src/h2/service.rs @@ -355,7 +355,7 @@ where } Err(err) => { - trace!("H2 handshake error: {}", err); + log::trace!("H2 handshake error: {}", err); Poll::Ready(Err(err)) } }, diff --git a/actix-http/src/header/map.rs b/actix-http/src/header/map.rs index 33fb262c4..8f6d1cead 100644 --- a/actix-http/src/header/map.rs +++ b/actix-http/src/header/map.rs @@ -630,7 +630,7 @@ impl Removed { /// Returns true if iterator contains no elements, without consuming it. /// /// If called immediately after [`HeaderMap::insert`] or [`HeaderMap::remove`], it will indicate - /// wether any items were actually replaced or removed, respectively. + /// whether any items were actually replaced or removed, respectively. pub fn is_empty(&self) -> bool { match self.inner { // size hint lower bound of smallvec is the correct length diff --git a/actix-http/src/header/shared/http_date.rs b/actix-http/src/header/shared/http_date.rs index 473d6cad0..21ed49f0c 100644 --- a/actix-http/src/header/shared/http_date.rs +++ b/actix-http/src/header/shared/http_date.rs @@ -4,8 +4,7 @@ use bytes::BytesMut; use http::header::{HeaderValue, InvalidHeaderValue}; use crate::{ - config::DATE_VALUE_LENGTH, error::ParseError, header::TryIntoHeaderValue, - helpers::MutWriter, + date::DATE_VALUE_LENGTH, error::ParseError, header::TryIntoHeaderValue, helpers::MutWriter, }; /// A timestamp with HTTP-style formatting and parsing. diff --git a/actix-http/src/http_message.rs b/actix-http/src/http_message.rs index 068e23b96..198254e02 100644 --- a/actix-http/src/http_message.rs +++ b/actix-http/src/http_message.rs @@ -55,7 +55,7 @@ pub trait HttpMessage: Sized { "" } - /// Get content type encoding + /// Get content type encoding. /// /// UTF-8 is used by default, If request charset is not set. fn encoding(&self) -> Result<&'static Encoding, ContentTypeError> { diff --git a/actix-http/src/keep_alive.rs b/actix-http/src/keep_alive.rs new file mode 100644 index 000000000..feb7ff5df --- /dev/null +++ b/actix-http/src/keep_alive.rs @@ -0,0 +1,84 @@ +use std::time::Duration; + +/// Connection keep-alive config. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum KeepAlive { + /// Keep-alive duration. + /// + /// `KeepAlive::Timeout(Duration::ZERO)` is mapped to `KeepAlive::Disabled`. + Timeout(Duration), + + /// Rely on OS to shutdown TCP connection. + /// + /// Some defaults can be very long, check your OS documentation. + Os, + + /// Keep-alive is disabled. + /// + /// Connections will be closed immediately. + Disabled, +} + +impl KeepAlive { + pub(crate) fn enabled(&self) -> bool { + !matches!(self, Self::Disabled) + } + + #[allow(unused)] // used with `http2` feature flag + pub(crate) fn duration(&self) -> Option { + match self { + KeepAlive::Timeout(dur) => Some(*dur), + _ => None, + } + } + + /// Map zero duration to disabled. + pub(crate) fn normalize(self) -> KeepAlive { + match self { + KeepAlive::Timeout(Duration::ZERO) => KeepAlive::Disabled, + ka => ka, + } + } +} + +impl Default for KeepAlive { + fn default() -> Self { + Self::Timeout(Duration::from_secs(5)) + } +} + +impl From for KeepAlive { + fn from(dur: Duration) -> Self { + KeepAlive::Timeout(dur).normalize() + } +} + +impl From> for KeepAlive { + fn from(ka_dur: Option) -> Self { + match ka_dur { + Some(dur) => KeepAlive::from(dur), + None => KeepAlive::Disabled, + } + .normalize() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn from_impls() { + let test: KeepAlive = Duration::from_secs(1).into(); + assert_eq!(test, KeepAlive::Timeout(Duration::from_secs(1))); + + let test: KeepAlive = Duration::from_secs(0).into(); + assert_eq!(test, KeepAlive::Disabled); + + let test: KeepAlive = Some(Duration::from_secs(0)).into(); + assert_eq!(test, KeepAlive::Disabled); + + let test: KeepAlive = None.into(); + assert_eq!(test, KeepAlive::Disabled); + } +} diff --git a/actix-http/src/lib.rs b/actix-http/src/lib.rs index f2b415790..dbff89612 100644 --- a/actix-http/src/lib.rs +++ b/actix-http/src/lib.rs @@ -24,38 +24,42 @@ #![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] -#[macro_use] -extern crate log; - pub use ::http::{uri, uri::Uri}; pub use ::http::{Method, StatusCode, Version}; pub mod body; mod builder; mod config; +mod date; #[cfg(feature = "__compress")] pub mod encoding; pub mod error; mod extensions; pub mod h1; +#[cfg(feature = "http2")] pub mod h2; pub mod header; mod helpers; mod http_message; +mod keep_alive; mod message; +#[cfg(test)] +mod notify_on_drop; mod payload; mod requests; mod responses; mod service; pub mod test; +#[cfg(feature = "ws")] pub mod ws; pub use self::builder::HttpServiceBuilder; -pub use self::config::{KeepAlive, ServiceConfig}; +pub use self::config::ServiceConfig; pub use self::error::Error; pub use self::extensions::Extensions; pub use self::header::ContentEncoding; pub use self::http_message::HttpMessage; +pub use self::keep_alive::KeepAlive; pub use self::message::ConnectionType; pub use self::message::Message; #[allow(deprecated)] diff --git a/actix-http/src/notify_on_drop.rs b/actix-http/src/notify_on_drop.rs new file mode 100644 index 000000000..98544bb5d --- /dev/null +++ b/actix-http/src/notify_on_drop.rs @@ -0,0 +1,49 @@ +/// Test Module for checking the drop state of certain async tasks that are spawned +/// with `actix_rt::spawn` +/// +/// The target task must explicitly generate `NotifyOnDrop` when spawn the task +use std::cell::RefCell; + +thread_local! { + static NOTIFY_DROPPED: RefCell> = RefCell::new(None); +} + +/// Check if the spawned task is dropped. +/// +/// # Panics +/// Panics when there was no `NotifyOnDrop` instance on current thread. +pub(crate) fn is_dropped() -> bool { + NOTIFY_DROPPED.with(|bool| { + bool.borrow() + .expect("No NotifyOnDrop existed on current thread") + }) +} + +pub(crate) struct NotifyOnDrop; + +impl NotifyOnDrop { + /// # Panics + /// Panics hen construct multiple instances on any given thread. + pub(crate) fn new() -> Self { + NOTIFY_DROPPED.with(|bool| { + let mut bool = bool.borrow_mut(); + if bool.is_some() { + panic!("NotifyOnDrop existed on current thread"); + } else { + *bool = Some(false); + } + }); + + NotifyOnDrop + } +} + +impl Drop for NotifyOnDrop { + fn drop(&mut self) { + NOTIFY_DROPPED.with(|bool| { + if let Some(b) = bool.borrow_mut().as_mut() { + *b = true; + } + }); + } +} diff --git a/actix-http/src/payload.rs b/actix-http/src/payload.rs index aed24e963..33d9ec6f5 100644 --- a/actix-http/src/payload.rs +++ b/actix-http/src/payload.rs @@ -16,6 +16,18 @@ pub type BoxedPayloadStream = Pin { + None, + H1 { payload: crate::h1::Payload }, + Stream { #[pin] payload: S }, + } +} + +#[cfg(feature = "http2")] pin_project! { /// A streaming payload. #[project = PayloadProj] @@ -33,14 +45,16 @@ impl From for Payload { } } +#[cfg(feature = "http2")] impl From for Payload { fn from(payload: crate::h2::Payload) -> Self { Payload::H2 { payload } } } -impl From for Payload { - fn from(stream: h2::RecvStream) -> Self { +#[cfg(feature = "http2")] +impl From<::h2::RecvStream> for Payload { + fn from(stream: ::h2::RecvStream) -> Self { Payload::H2 { payload: crate::h2::Payload::new(stream), } @@ -71,7 +85,10 @@ where match self.project() { PayloadProj::None => Poll::Ready(None), PayloadProj::H1 { payload } => Pin::new(payload).poll_next(cx), + + #[cfg(feature = "http2")] PayloadProj::H2 { payload } => Pin::new(payload).poll_next(cx), + PayloadProj::Stream { payload } => payload.poll_next(cx), } } diff --git a/actix-http/src/requests/head.rs b/actix-http/src/requests/head.rs index 06fd0429e..4558801f3 100644 --- a/actix-http/src/requests/head.rs +++ b/actix-http/src/requests/head.rs @@ -130,8 +130,8 @@ impl RequestHead { } } + /// Request contains `EXPECT` header. #[inline] - /// Request contains `EXPECT` header pub fn expect(&self) -> bool { self.flags.contains(Flags::EXPECT) } diff --git a/actix-http/src/responses/head.rs b/actix-http/src/responses/head.rs index 870073ab3..cb47c4b7a 100644 --- a/actix-http/src/responses/head.rs +++ b/actix-http/src/responses/head.rs @@ -42,7 +42,7 @@ impl ResponseHead { &mut self.headers } - /// Sets the flag that controls wether to send headers formatted as Camel-Case. + /// Sets the flag that controls whether to send headers formatted as Camel-Case. /// /// Only applicable to HTTP/1.x responses; HTTP/2 header names are always lowercase. #[inline] @@ -210,14 +210,15 @@ mod tests { use memchr::memmem; use crate::{ + h1::H1Service, header::{HeaderName, HeaderValue}, - Error, HttpService, Request, Response, + Error, Request, Response, ServiceConfig, }; #[actix_rt::test] async fn camel_case_headers() { let mut srv = actix_http_test::test_server(|| { - HttpService::new(|req: Request| async move { + H1Service::with_config(ServiceConfig::default(), |req: Request| async move { let mut res = Response::ok(); if req.path().contains("camel") { @@ -228,6 +229,7 @@ mod tests { HeaderName::from_static("foo-bar"), HeaderValue::from_static("baz"), ); + Ok::<_, Error>(res) }) .tcp() @@ -235,9 +237,11 @@ mod tests { .await; let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); - let _ = stream.write_all(b"GET /camel HTTP/1.1\r\nConnection: Close\r\n\r\n"); - let mut data = vec![0; 1024]; - let _ = stream.read(&mut data); + let _ = stream + .write_all(b"GET /camel HTTP/1.1\r\nConnection: Close\r\n\r\n") + .unwrap(); + let mut data = vec![]; + let _ = stream.read_to_end(&mut data).unwrap(); assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n"); assert!(memmem::find(&data, b"Foo-Bar").is_some()); assert!(memmem::find(&data, b"foo-bar").is_none()); @@ -247,9 +251,11 @@ mod tests { assert!(memmem::find(&data, b"content-length").is_none()); let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); - let _ = stream.write_all(b"GET /lower HTTP/1.1\r\nConnection: Close\r\n\r\n"); - let mut data = vec![0; 1024]; - let _ = stream.read(&mut data); + let _ = stream + .write_all(b"GET /lower HTTP/1.1\r\nConnection: Close\r\n\r\n") + .unwrap(); + let mut data = vec![]; + let _ = stream.read_to_end(&mut data).unwrap(); assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n"); assert!(memmem::find(&data, b"Foo-Bar").is_none()); assert!(memmem::find(&data, b"foo-bar").is_some()); diff --git a/actix-http/src/service.rs b/actix-http/src/service.rs index cd2efe678..b220e55a4 100644 --- a/actix-http/src/service.rs +++ b/actix-http/src/service.rs @@ -19,9 +19,8 @@ use pin_project_lite::pin_project; use crate::{ body::{BoxBody, MessageBody}, builder::HttpServiceBuilder, - config::{KeepAlive, ServiceConfig}, error::DispatchError, - h1, h2, ConnectCallback, OnConnectData, Protocol, Request, Response, + h1, ConnectCallback, OnConnectData, Protocol, Request, Response, ServiceConfig, }; /// A `ServiceFactory` for HTTP/1.1 or HTTP/2 protocol. @@ -43,9 +42,9 @@ where >::Future: 'static, B: MessageBody + 'static, { - /// Create builder for `HttpService` instance. + /// Constructs builder for `HttpService` instance. pub fn build() -> HttpServiceBuilder { - HttpServiceBuilder::new() + HttpServiceBuilder::default() } } @@ -58,12 +57,10 @@ where >::Future: 'static, B: MessageBody + 'static, { - /// Create new `HttpService` instance. + /// Constructs new `HttpService` instance from service with default config. pub fn new>(service: F) -> Self { - let cfg = ServiceConfig::new(KeepAlive::Timeout(5), 5000, 0, false, None); - HttpService { - cfg, + cfg: ServiceConfig::default(), srv: service.into_factory(), expect: h1::ExpectHandler, upgrade: None, @@ -72,7 +69,7 @@ where } } - /// Create new `HttpService` instance with config. + /// Constructs new `HttpService` instance from config and service. pub(crate) fn with_config>( cfg: ServiceConfig, service: F, @@ -97,11 +94,10 @@ where >::Future: 'static, B: MessageBody, { - /// Provide service for `EXPECT: 100-Continue` support. + /// Sets service for `Expect: 100-Continue` handling. /// - /// Service get called with request that contains `EXPECT` header. - /// Service must return request in case of success, in that case - /// request will be forwarded to main service. + /// An expect service is called with requests that contain an `Expect` header. A successful + /// response type is also a request which will be forwarded to the main service. pub fn expect(self, expect: X1) -> HttpService where X1: ServiceFactory, @@ -118,10 +114,10 @@ where } } - /// Provide service for custom `Connection: UPGRADE` support. + /// Sets service for custom `Connection: Upgrade` handling. /// - /// If service is provided then normal requests handling get halted - /// and this service get called with original request and framed object. + /// If service is provided then normal requests handling get halted and this service get called + /// with original request and framed object. pub fn upgrade(self, upgrade: Option) -> HttpService where U1: ServiceFactory<(Request, Framed), Config = (), Response = ()>, @@ -506,10 +502,11 @@ where let conn_data = OnConnectData::from_io(&io, self.on_connect_ext.as_deref()); match proto { + #[cfg(feature = "http2")] Protocol::Http2 => HttpServiceHandlerResponse { state: State::H2Handshake { handshake: Some(( - h2::handshake_with_timeout(io, &self.cfg), + crate::h2::handshake_with_timeout(io, &self.cfg), self.cfg.clone(), self.flow.clone(), conn_data, @@ -518,6 +515,11 @@ where }, }, + #[cfg(not(feature = "http2"))] + Protocol::Http2 => { + panic!("HTTP/2 support is disabled (enable with the `http2` feature flag)") + } + Protocol::Http1 => HttpServiceHandlerResponse { state: State::H1 { dispatcher: h1::Dispatcher::new( @@ -535,6 +537,7 @@ where } } +#[cfg(not(feature = "http2"))] pin_project! { #[project = StateProj] enum State @@ -556,10 +559,37 @@ pin_project! { U::Error: fmt::Display, { H1 { #[pin] dispatcher: h1::Dispatcher }, - H2 { #[pin] dispatcher: h2::Dispatcher }, + } +} + +#[cfg(feature = "http2")] +pin_project! { + #[project = StateProj] + enum State + where + T: AsyncRead, + T: AsyncWrite, + T: Unpin, + + S: Service, + S::Future: 'static, + S::Error: Into>, + + B: MessageBody, + + X: Service, + X::Error: Into>, + + U: Service<(Request, Framed), Response = ()>, + U::Error: fmt::Display, + { + H1 { #[pin] dispatcher: h1::Dispatcher }, + + H2 { #[pin] dispatcher: crate::h2::Dispatcher }, + H2Handshake { handshake: Option<( - h2::HandshakeWithTimeout, + crate::h2::HandshakeWithTimeout, ServiceConfig, Rc>, OnConnectData, @@ -618,21 +648,25 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.as_mut().project().state.project() { StateProj::H1 { dispatcher } => dispatcher.poll(cx), + + #[cfg(feature = "http2")] StateProj::H2 { dispatcher } => dispatcher.poll(cx), + + #[cfg(feature = "http2")] StateProj::H2Handshake { handshake: data } => { match ready!(Pin::new(&mut data.as_mut().unwrap().0).poll(cx)) { Ok((conn, timer)) => { let (_, config, flow, conn_data, peer_addr) = data.take().unwrap(); self.as_mut().project().state.set(State::H2 { - dispatcher: h2::Dispatcher::new( + dispatcher: crate::h2::Dispatcher::new( conn, flow, config, peer_addr, conn_data, timer, ), }); self.poll(cx) } Err(err) => { - trace!("H2 handshake error: {}", err); + log::trace!("H2 handshake error: {}", err); Poll::Ready(Err(err)) } } diff --git a/actix-http/src/test.rs b/actix-http/src/test.rs index 0d4d342ec..6212c19d1 100644 --- a/actix-http/src/test.rs +++ b/actix-http/src/test.rs @@ -242,7 +242,7 @@ impl io::Read for TestBuffer { impl io::Write for TestBuffer { fn write(&mut self, buf: &[u8]) -> io::Result { - RefCell::borrow_mut(&self.write_buf).extend(buf); + self.write_buf.borrow_mut().extend(buf); Ok(buf.len()) } diff --git a/actix-http/src/ws/codec.rs b/actix-http/src/ws/codec.rs index f5b755eec..6e7aa7c11 100644 --- a/actix-http/src/ws/codec.rs +++ b/actix-http/src/ws/codec.rs @@ -3,9 +3,11 @@ use bitflags::bitflags; use bytes::{Bytes, BytesMut}; use bytestring::ByteString; -use super::frame::Parser; -use super::proto::{CloseReason, OpCode}; -use super::ProtocolError; +use super::{ + frame::Parser, + proto::{CloseReason, OpCode}, + ProtocolError, +}; /// A WebSocket message. #[derive(Debug, PartialEq)] @@ -251,7 +253,7 @@ impl Decoder for Codec { } } _ => { - error!("Unfinished fragment {:?}", opcode); + log::error!("Unfinished fragment {:?}", opcode); Err(ProtocolError::ContinuationFragment(opcode)) } }; diff --git a/actix-http/src/ws/dispatcher.rs b/actix-http/src/ws/dispatcher.rs index f12ae1b1a..4c7470d37 100644 --- a/actix-http/src/ws/dispatcher.rs +++ b/actix-http/src/ws/dispatcher.rs @@ -1,6 +1,8 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_service::{IntoService, Service}; diff --git a/actix-http/src/ws/frame.rs b/actix-http/src/ws/frame.rs index b58ef7362..78cef1046 100644 --- a/actix-http/src/ws/frame.rs +++ b/actix-http/src/ws/frame.rs @@ -3,9 +3,11 @@ use std::convert::TryFrom; use bytes::{Buf, BufMut, BytesMut}; use log::debug; -use crate::ws::mask::apply_mask; -use crate::ws::proto::{CloseCode, CloseReason, OpCode}; -use crate::ws::ProtocolError; +use super::{ + mask::apply_mask, + proto::{CloseCode, CloseReason, OpCode}, + ProtocolError, +}; /// A struct representing a WebSocket frame. #[derive(Debug)] diff --git a/actix-http/tests/test_client.rs b/actix-http/tests/test_client.rs index a3adcdfd6..5888527f1 100644 --- a/actix-http/tests/test_client.rs +++ b/actix-http/tests/test_client.rs @@ -31,7 +31,7 @@ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \ Hello World Hello World Hello World Hello World Hello World"; #[actix_rt::test] -async fn test_h1_v2() { +async fn h1_v2() { let srv = test_server(move || { HttpService::build() .finish(|_| future::ok::<_, Infallible>(Response::ok().set_body(STR))) @@ -59,7 +59,7 @@ async fn test_h1_v2() { } #[actix_rt::test] -async fn test_connection_close() { +async fn connection_close() { let srv = test_server(move || { HttpService::build() .finish(|_| future::ok::<_, Infallible>(Response::ok().set_body(STR))) @@ -73,7 +73,7 @@ async fn test_connection_close() { } #[actix_rt::test] -async fn test_with_query_parameter() { +async fn with_query_parameter() { let srv = test_server(move || { HttpService::build() .finish(|req: Request| async move { @@ -104,7 +104,7 @@ impl From for Response { } #[actix_rt::test] -async fn test_h1_expect() { +async fn h1_expect() { let srv = test_server(move || { HttpService::build() .expect(|req: Request| async { diff --git a/actix-http/tests/test_h2_timer.rs b/actix-http/tests/test_h2_timer.rs index 2b9c26e4a..2e1480297 100644 --- a/actix-http/tests/test_h2_timer.rs +++ b/actix-http/tests/test_h2_timer.rs @@ -1,4 +1,4 @@ -use std::io; +use std::{io, time::Duration}; use actix_http::{error::Error, HttpService, Response}; use actix_server::Server; @@ -19,7 +19,7 @@ async fn h2_ping_pong() -> io::Result<()> { .workers(1) .listen("h2_ping_pong", lst, || { HttpService::build() - .keep_alive(3) + .keep_alive(Duration::from_secs(3)) .h2(|_| async { Ok::<_, Error>(Response::ok()) }) .tcp() })? @@ -92,10 +92,10 @@ async fn h2_handshake_timeout() -> io::Result<()> { .workers(1) .listen("h2_ping_pong", lst, || { HttpService::build() - .keep_alive(30) + .keep_alive(Duration::from_secs(30)) // set first request timeout to 5 seconds. // this is the timeout used for http2 handshake. - .client_timeout(5000) + .client_request_timeout(Duration::from_secs(5)) .h2(|_| async { Ok::<_, Error>(Response::ok()) }) .tcp() })? diff --git a/actix-http/tests/test_server.rs b/actix-http/tests/test_server.rs index 1bb574fd6..1b5de3425 100644 --- a/actix-http/tests/test_server.rs +++ b/actix-http/tests/test_server.rs @@ -2,7 +2,7 @@ use std::{ convert::Infallible, io::{Read, Write}, net, thread, - time::Duration, + time::{Duration, Instant}, }; use actix_http::{ @@ -22,12 +22,12 @@ use futures_util::{ use regex::Regex; #[actix_rt::test] -async fn test_h1() { +async fn h1_basic() { let mut srv = test_server(|| { HttpService::build() .keep_alive(KeepAlive::Disabled) - .client_timeout(1000) - .client_disconnect(1000) + .client_request_timeout(Duration::from_secs(1)) + .client_disconnect_timeout(Duration::from_secs(1)) .h1(|req: Request| { assert!(req.peer_addr().is_some()); ok::<_, Infallible>(Response::ok()) @@ -43,12 +43,12 @@ async fn test_h1() { } #[actix_rt::test] -async fn test_h1_2() { +async fn h1_2() { let mut srv = test_server(|| { HttpService::build() .keep_alive(KeepAlive::Disabled) - .client_timeout(1000) - .client_disconnect(1000) + .client_request_timeout(Duration::from_secs(1)) + .client_disconnect_timeout(Duration::from_secs(1)) .finish(|req: Request| { assert!(req.peer_addr().is_some()); assert_eq!(req.version(), http::Version::HTTP_11); @@ -75,7 +75,7 @@ impl From for Response { } #[actix_rt::test] -async fn test_expect_continue() { +async fn expect_continue() { let mut srv = test_server(|| { HttpService::build() .expect(fn_service(|req: Request| { @@ -106,7 +106,7 @@ async fn test_expect_continue() { } #[actix_rt::test] -async fn test_expect_continue_h1() { +async fn expect_continue_h1() { let mut srv = test_server(|| { HttpService::build() .expect(fn_service(|req: Request| { @@ -139,7 +139,7 @@ async fn test_expect_continue_h1() { } #[actix_rt::test] -async fn test_chunked_payload() { +async fn chunked_payload() { let chunk_sizes = vec![32768, 32, 32768]; let total_size: usize = chunk_sizes.iter().sum(); @@ -197,26 +197,43 @@ async fn test_chunked_payload() { } #[actix_rt::test] -async fn test_slow_request() { +async fn slow_request_408() { let mut srv = test_server(|| { HttpService::build() - .client_timeout(100) + .client_request_timeout(Duration::from_millis(200)) + .keep_alive(Duration::from_secs(2)) .finish(|_| ok::<_, Infallible>(Response::ok())) .tcp() }) .await; + let start = Instant::now(); + let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); - let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\n"); + let _ = stream.write_all(b"GET /test HTTP/1.1\r\n"); let mut data = String::new(); let _ = stream.read_to_string(&mut data); - assert!(data.starts_with("HTTP/1.1 408 Request Timeout")); + assert!( + data.starts_with("HTTP/1.1 408 Request Timeout"), + "response was not 408: {}", + data + ); + + let diff = start.elapsed(); + + if diff < Duration::from_secs(1) { + // test success + } else if diff < Duration::from_secs(3) { + panic!("request seems to have wrongly timed-out according to keep-alive"); + } else { + panic!("request took way too long to time out"); + } srv.stop().await; } #[actix_rt::test] -async fn test_http1_malformed_request() { +async fn http1_malformed_request() { let mut srv = test_server(|| { HttpService::build() .h1(|_| ok::<_, Infallible>(Response::ok())) @@ -234,7 +251,7 @@ async fn test_http1_malformed_request() { } #[actix_rt::test] -async fn test_http1_keepalive() { +async fn http1_keepalive() { let mut srv = test_server(|| { HttpService::build() .h1(|_| ok::<_, Infallible>(Response::ok())) @@ -257,23 +274,25 @@ async fn test_http1_keepalive() { } #[actix_rt::test] -async fn test_http1_keepalive_timeout() { +async fn http1_keepalive_timeout() { let mut srv = test_server(|| { HttpService::build() - .keep_alive(1) + .keep_alive(Duration::from_secs(1)) .h1(|_| ok::<_, Infallible>(Response::ok())) .tcp() }) .await; let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); - let _ = stream.write_all(b"GET /test/tests/test HTTP/1.1\r\n\r\n"); - let mut data = vec![0; 1024]; + + let _ = stream.write_all(b"GET /test HTTP/1.1\r\n\r\n"); + let mut data = vec![0; 256]; let _ = stream.read(&mut data); assert_eq!(&data[..17], b"HTTP/1.1 200 OK\r\n"); + thread::sleep(Duration::from_millis(1100)); - let mut data = vec![0; 1024]; + let mut data = vec![0; 256]; let res = stream.read(&mut data).unwrap(); assert_eq!(res, 0); @@ -281,7 +300,7 @@ async fn test_http1_keepalive_timeout() { } #[actix_rt::test] -async fn test_http1_keepalive_close() { +async fn http1_keepalive_close() { let mut srv = test_server(|| { HttpService::build() .h1(|_| ok::<_, Infallible>(Response::ok())) @@ -303,7 +322,7 @@ async fn test_http1_keepalive_close() { } #[actix_rt::test] -async fn test_http10_keepalive_default_close() { +async fn http10_keepalive_default_close() { let mut srv = test_server(|| { HttpService::build() .h1(|_| ok::<_, Infallible>(Response::ok())) @@ -325,7 +344,7 @@ async fn test_http10_keepalive_default_close() { } #[actix_rt::test] -async fn test_http10_keepalive() { +async fn http10_keepalive() { let mut srv = test_server(|| { HttpService::build() .h1(|_| ok::<_, Infallible>(Response::ok())) @@ -354,7 +373,7 @@ async fn test_http10_keepalive() { } #[actix_rt::test] -async fn test_http1_keepalive_disabled() { +async fn http1_keepalive_disabled() { let mut srv = test_server(|| { HttpService::build() .keep_alive(KeepAlive::Disabled) @@ -377,7 +396,7 @@ async fn test_http1_keepalive_disabled() { } #[actix_rt::test] -async fn test_content_length() { +async fn content_length() { use actix_http::{ header::{HeaderName, HeaderValue}, StatusCode, @@ -426,7 +445,7 @@ async fn test_content_length() { } #[actix_rt::test] -async fn test_h1_headers() { +async fn h1_headers() { let data = STR.repeat(10); let data2 = data.clone(); @@ -492,7 +511,7 @@ const STR: &str = "Hello World Hello World Hello World Hello World Hello World \ Hello World Hello World Hello World Hello World Hello World"; #[actix_rt::test] -async fn test_h1_body() { +async fn h1_body() { let mut srv = test_server(|| { HttpService::build() .h1(|_| ok::<_, Infallible>(Response::ok().set_body(STR))) @@ -511,7 +530,7 @@ async fn test_h1_body() { } #[actix_rt::test] -async fn test_h1_head_empty() { +async fn h1_head_empty() { let mut srv = test_server(|| { HttpService::build() .h1(|_| ok::<_, Infallible>(Response::ok().set_body(STR))) @@ -538,7 +557,7 @@ async fn test_h1_head_empty() { } #[actix_rt::test] -async fn test_h1_head_binary() { +async fn h1_head_binary() { let mut srv = test_server(|| { HttpService::build() .h1(|_| ok::<_, Infallible>(Response::ok().set_body(STR))) @@ -565,7 +584,7 @@ async fn test_h1_head_binary() { } #[actix_rt::test] -async fn test_h1_head_binary2() { +async fn h1_head_binary2() { let mut srv = test_server(|| { HttpService::build() .h1(|_| ok::<_, Infallible>(Response::ok().set_body(STR))) @@ -588,7 +607,7 @@ async fn test_h1_head_binary2() { } #[actix_rt::test] -async fn test_h1_body_length() { +async fn h1_body_length() { let mut srv = test_server(|| { HttpService::build() .h1(|_| { @@ -612,7 +631,7 @@ async fn test_h1_body_length() { } #[actix_rt::test] -async fn test_h1_body_chunked_explicit() { +async fn h1_body_chunked_explicit() { let mut srv = test_server(|| { HttpService::build() .h1(|_| { @@ -649,7 +668,7 @@ async fn test_h1_body_chunked_explicit() { } #[actix_rt::test] -async fn test_h1_body_chunked_implicit() { +async fn h1_body_chunked_implicit() { let mut srv = test_server(|| { HttpService::build() .h1(|_| { @@ -680,7 +699,7 @@ async fn test_h1_body_chunked_implicit() { } #[actix_rt::test] -async fn test_h1_response_http_error_handling() { +async fn h1_response_http_error_handling() { let mut srv = test_server(|| { HttpService::build() .h1(fn_service(|_| { @@ -719,7 +738,7 @@ impl From for Response { } #[actix_rt::test] -async fn test_h1_service_error() { +async fn h1_service_error() { let mut srv = test_server(|| { HttpService::build() .h1(|_| err::, _>(BadRequest)) @@ -738,7 +757,7 @@ async fn test_h1_service_error() { } #[actix_rt::test] -async fn test_h1_on_connect() { +async fn h1_on_connect() { let mut srv = test_server(|| { HttpService::build() .on_connect_ext(|_, data| { @@ -761,7 +780,7 @@ async fn test_h1_on_connect() { /// Tests compliance with 304 Not Modified spec in RFC 7232 ยง4.1. /// https://datatracker.ietf.org/doc/html/rfc7232#section-4.1 #[actix_rt::test] -async fn test_not_modified_spec_h1() { +async fn not_modified_spec_h1() { // TODO: this test needing a few seconds to complete reveals some weirdness with either the // dispatcher or the client, though similar hangs occur on other tests in this file, only // succeeding, it seems, because of the keepalive timer diff --git a/actix-http/tests/test_ws.rs b/actix-http/tests/test_ws.rs index ed8c61fd6..8b3ab8e1b 100644 --- a/actix-http/tests/test_ws.rs +++ b/actix-http/tests/test_ws.rs @@ -109,7 +109,7 @@ async fn service(msg: Frame) -> Result { } #[actix_rt::test] -async fn test_simple() { +async fn simple() { let mut srv = test_server(|| { HttpService::build() .upgrade(fn_factory(|| async { diff --git a/actix-router/CHANGES.md b/actix-router/CHANGES.md index 41104db98..1e6c72c14 100644 --- a/actix-router/CHANGES.md +++ b/actix-router/CHANGES.md @@ -6,8 +6,10 @@ - Change signature of `ResourceDef::capture_match_info_fn` to remove `user_data` parameter. [#2612] - Replace `Option` with `U` in `Router` api. [#2612] - Relax bounds in `Router::recognize*` and `ResourceDef::capture_match_info`. [#2612] +- `Quoter::requote` now returns `Option>`. [#2613] [#2612]: https://github.com/actix/actix-web/pull/2612 +[#2613]: https://github.com/actix/actix-web/pull/2613 ## 0.5.0-rc.2 - 2022-01-21 diff --git a/actix-router/src/de.rs b/actix-router/src/de.rs index 27aa49ef2..efafd08db 100644 --- a/actix-router/src/de.rs +++ b/actix-router/src/de.rs @@ -52,7 +52,7 @@ macro_rules! parse_value { V: Visitor<'de>, { let decoded = FULL_QUOTER - .with(|q| q.requote(self.value.as_bytes())) + .with(|q| q.requote_str_lossy(self.value)) .map(Cow::Owned) .unwrap_or(Cow::Borrowed(self.value)); @@ -332,7 +332,7 @@ impl<'de> Deserializer<'de> for Value<'de> { where V: Visitor<'de>, { - match FULL_QUOTER.with(|q| q.requote(self.value.as_bytes())) { + match FULL_QUOTER.with(|q| q.requote_str_lossy(self.value)) { Some(s) => visitor.visit_string(s), None => visitor.visit_borrowed_str(self.value), } @@ -342,7 +342,7 @@ impl<'de> Deserializer<'de> for Value<'de> { where V: Visitor<'de>, { - match FULL_QUOTER.with(|q| q.requote(self.value.as_bytes())) { + match FULL_QUOTER.with(|q| q.requote_str_lossy(self.value)) { Some(s) => visitor.visit_byte_buf(s.into()), None => visitor.visit_borrowed_bytes(self.value.as_bytes()), } diff --git a/actix-router/src/quoter.rs b/actix-router/src/quoter.rs index 26ecc92cd..73b1e72dd 100644 --- a/actix-router/src/quoter.rs +++ b/actix-router/src/quoter.rs @@ -66,8 +66,13 @@ impl Quoter { /// Re-quotes... ? /// - /// Returns `None` when no modification to the original string was required. - pub fn requote(&self, val: &[u8]) -> Option { + /// Returns `None` when no modification to the original byte string was required. + /// + /// Non-ASCII bytes are accepted as valid input. + /// + /// Behavior for invalid/incomplete percent-encoding sequences is unspecified and may include removing + /// the invalid sequence from the output or passing it as it is. + pub fn requote(&self, val: &[u8]) -> Option> { let mut has_pct = 0; let mut pct = [b'%', 0, 0]; let mut idx = 0; @@ -121,7 +126,12 @@ impl Quoter { idx += 1; } - cloned.map(|data| String::from_utf8_lossy(&data).into_owned()) + cloned + } + + pub(crate) fn requote_str_lossy(&self, val: &str) -> Option { + self.requote(val.as_bytes()) + .map(|data| String::from_utf8_lossy(&data).into_owned()) } } @@ -201,14 +211,29 @@ mod tests { #[test] fn custom_quoter() { let q = Quoter::new(b"", b"+"); - assert_eq!(q.requote(b"/a%25c").unwrap(), "/a%c"); - assert_eq!(q.requote(b"/a%2Bc").unwrap(), "/a%2Bc"); + assert_eq!(q.requote(b"/a%25c").unwrap(), b"/a%c"); + assert_eq!(q.requote(b"/a%2Bc").unwrap(), b"/a%2Bc"); let q = Quoter::new(b"%+", b"/"); - assert_eq!(q.requote(b"/a%25b%2Bc").unwrap(), "/a%b+c"); - assert_eq!(q.requote(b"/a%2fb").unwrap(), "/a%2fb"); - assert_eq!(q.requote(b"/a%2Fb").unwrap(), "/a%2Fb"); - assert_eq!(q.requote(b"/a%0Ab").unwrap(), "/a\nb"); + assert_eq!(q.requote(b"/a%25b%2Bc").unwrap(), b"/a%b+c"); + assert_eq!(q.requote(b"/a%2fb").unwrap(), b"/a%2fb"); + assert_eq!(q.requote(b"/a%2Fb").unwrap(), b"/a%2Fb"); + assert_eq!(q.requote(b"/a%0Ab").unwrap(), b"/a\nb"); + assert_eq!(q.requote(b"/a%FE\xffb").unwrap(), b"/a\xfe\xffb"); + assert_eq!(q.requote(b"/a\xfe\xffb"), None); + } + + #[test] + fn non_ascii() { + let q = Quoter::new(b"%+", b"/"); + assert_eq!(q.requote(b"/a%FE\xffb").unwrap(), b"/a\xfe\xffb"); + assert_eq!(q.requote(b"/a\xfe\xffb"), None); + } + + #[test] + fn invalid_sequences() { + let q = Quoter::new(b"%+", b"/"); + assert_eq!(q.requote(b"/a%2x%2X%%").unwrap(), b"/a%2x%2X"); } #[test] diff --git a/actix-router/src/url.rs b/actix-router/src/url.rs index f8d94ae4a..e7dda3fca 100644 --- a/actix-router/src/url.rs +++ b/actix-router/src/url.rs @@ -15,14 +15,14 @@ pub struct Url { impl Url { #[inline] pub fn new(uri: http::Uri) -> Url { - let path = DEFAULT_QUOTER.with(|q| q.requote(uri.path().as_bytes())); + let path = DEFAULT_QUOTER.with(|q| q.requote_str_lossy(uri.path())); Url { uri, path } } #[inline] pub fn new_with_quoter(uri: http::Uri, quoter: &Quoter) -> Url { Url { - path: quoter.requote(uri.path().as_bytes()), + path: quoter.requote_str_lossy(uri.path()), uri, } } @@ -45,13 +45,13 @@ impl Url { #[inline] pub fn update(&mut self, uri: &http::Uri) { self.uri = uri.clone(); - self.path = DEFAULT_QUOTER.with(|q| q.requote(uri.path().as_bytes())); + self.path = DEFAULT_QUOTER.with(|q| q.requote_str_lossy(uri.path())); } #[inline] pub fn update_with_quoter(&mut self, uri: &http::Uri, quoter: &Quoter) { self.uri = uri.clone(); - self.path = quoter.requote(uri.path().as_bytes()); + self.path = quoter.requote_str_lossy(uri.path()); } } diff --git a/actix-test/CHANGES.md b/actix-test/CHANGES.md index 32ab2344f..3877f4fbf 100644 --- a/actix-test/CHANGES.md +++ b/actix-test/CHANGES.md @@ -1,6 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx +- Rename `TestServerConfig::{client_timeout => client_request_timeout}`. [#2611] + +[#2611]: https://github.com/actix/actix-web/pull/2611 ## 0.1.0-beta.11 - 2022-01-04 diff --git a/actix-test/src/lib.rs b/actix-test/src/lib.rs index f86120f2f..d44bc7a45 100644 --- a/actix-test/src/lib.rs +++ b/actix-test/src/lib.rs @@ -149,7 +149,7 @@ where let local_addr = tcp.local_addr().unwrap(); let factory = factory.clone(); let srv_cfg = cfg.clone(); - let timeout = cfg.client_timeout; + let timeout = cfg.client_request_timeout; let builder = Server::build().workers(1).disable_signals().system_exit(); @@ -167,7 +167,7 @@ where .map_err(|err| err.into().error_response()); HttpService::build() - .client_timeout(timeout) + .client_request_timeout(timeout) .h1(map_config(fac, move |_| app_cfg.clone())) .tcp() }), @@ -183,7 +183,7 @@ where .map_err(|err| err.into().error_response()); HttpService::build() - .client_timeout(timeout) + .client_request_timeout(timeout) .h2(map_config(fac, move |_| app_cfg.clone())) .tcp() }), @@ -199,7 +199,7 @@ where .map_err(|err| err.into().error_response()); HttpService::build() - .client_timeout(timeout) + .client_request_timeout(timeout) .finish(map_config(fac, move |_| app_cfg.clone())) .tcp() }), @@ -218,7 +218,7 @@ where .map_err(|err| err.into().error_response()); HttpService::build() - .client_timeout(timeout) + .client_request_timeout(timeout) .h1(map_config(fac, move |_| app_cfg.clone())) .openssl(acceptor.clone()) }), @@ -234,7 +234,7 @@ where .map_err(|err| err.into().error_response()); HttpService::build() - .client_timeout(timeout) + .client_request_timeout(timeout) .h2(map_config(fac, move |_| app_cfg.clone())) .openssl(acceptor.clone()) }), @@ -250,7 +250,7 @@ where .map_err(|err| err.into().error_response()); HttpService::build() - .client_timeout(timeout) + .client_request_timeout(timeout) .finish(map_config(fac, move |_| app_cfg.clone())) .openssl(acceptor.clone()) }), @@ -269,7 +269,7 @@ where .map_err(|err| err.into().error_response()); HttpService::build() - .client_timeout(timeout) + .client_request_timeout(timeout) .h1(map_config(fac, move |_| app_cfg.clone())) .rustls(config.clone()) }), @@ -285,7 +285,7 @@ where .map_err(|err| err.into().error_response()); HttpService::build() - .client_timeout(timeout) + .client_request_timeout(timeout) .h2(map_config(fac, move |_| app_cfg.clone())) .rustls(config.clone()) }), @@ -301,7 +301,7 @@ where .map_err(|err| err.into().error_response()); HttpService::build() - .client_timeout(timeout) + .client_request_timeout(timeout) .finish(map_config(fac, move |_| app_cfg.clone())) .rustls(config.clone()) }), @@ -388,7 +388,7 @@ pub fn config() -> TestServerConfig { pub struct TestServerConfig { tp: HttpVer, stream: StreamType, - client_timeout: u64, + client_request_timeout: Duration, } impl Default for TestServerConfig { @@ -403,7 +403,7 @@ impl TestServerConfig { TestServerConfig { tp: HttpVer::Both, stream: StreamType::Tcp, - client_timeout: 5000, + client_request_timeout: Duration::from_secs(5), } } @@ -433,9 +433,9 @@ impl TestServerConfig { self } - /// Set client timeout in milliseconds for first request. - pub fn client_timeout(mut self, val: u64) -> Self { - self.client_timeout = val; + /// Set client timeout for first request. + pub fn client_request_timeout(mut self, dur: Duration) -> Self { + self.client_request_timeout = dur; self } } diff --git a/awc/Cargo.toml b/awc/Cargo.toml index 222765991..b3afdec10 100644 --- a/awc/Cargo.toml +++ b/awc/Cargo.toml @@ -60,7 +60,7 @@ dangerous-h2c = [] [dependencies] actix-codec = "0.4.1" actix-service = "2.0.0" -actix-http = "3.0.0-beta.19" +actix-http = { version = "3.0.0-beta.19", features = ["http2", "ws"] } actix-rt = { version = "2.1", default-features = false } actix-tls = { version = "3.0.0", features = ["connect", "uri"] } actix-utils = "3.0.0" diff --git a/awc/src/client/h1proto.rs b/awc/src/client/h1proto.rs index cf716db72..4f6a87ac5 100644 --- a/awc/src/client/h1proto.rs +++ b/awc/src/client/h1proto.rs @@ -70,7 +70,7 @@ where let is_expect = if head.as_ref().headers.contains_key(EXPECT) { match body.size() { BodySize::None | BodySize::Sized(0) => { - let keep_alive = framed.codec_ref().keepalive(); + let keep_alive = framed.codec_ref().keep_alive(); framed.io_mut().on_release(keep_alive); // TODO: use a new variant or a new type better describing error violate @@ -119,7 +119,7 @@ where match pin_framed.codec_ref().message_type() { h1::MessageType::None => { - let keep_alive = pin_framed.codec_ref().keepalive(); + let keep_alive = pin_framed.codec_ref().keep_alive(); pin_framed.io_mut().on_release(keep_alive); Ok((head, Payload::None)) @@ -223,7 +223,7 @@ impl Stream for PlStream { match ready!(this.framed.as_mut().next_item(cx)?) { Some(Some(chunk)) => Poll::Ready(Some(Ok(chunk))), Some(None) => { - let keep_alive = this.framed.codec_ref().keepalive(); + let keep_alive = this.framed.codec_ref().keep_alive(); this.framed.io_mut().on_release(keep_alive); Poll::Ready(None) } diff --git a/src/app.rs b/src/app.rs index da33ebc4b..a63cf5d50 100644 --- a/src/app.rs +++ b/src/app.rs @@ -236,10 +236,14 @@ where self } - /// Default service to be used if no matching resource could be found. + /// Default service that is invoked when no matching resource could be found. /// - /// It is possible to use services like `Resource`, `Route`. + /// You can use a [`Route`] as default service. /// + /// If a default service is not registered, an empty `404 Not Found` response will be sent to + /// the client instead. + /// + /// # Examples /// ``` /// use actix_web::{web, App, HttpResponse}; /// @@ -248,23 +252,8 @@ where /// } /// /// let app = App::new() - /// .service( - /// web::resource("/index.html").route(web::get().to(index))) - /// .default_service( - /// web::route().to(|| HttpResponse::NotFound())); - /// ``` - /// - /// It is also possible to use static files as default service. - /// - /// ``` - /// use actix_web::{web, App, HttpResponse}; - /// - /// let app = App::new() - /// .service( - /// web::resource("/index.html").to(|| HttpResponse::Ok())) - /// .default_service( - /// web::to(|| HttpResponse::NotFound()) - /// ); + /// .service(web::resource("/index.html").route(web::get().to(index))) + /// .default_service(web::to(|| HttpResponse::NotFound())); /// ``` pub fn default_service(mut self, svc: F) -> Self where diff --git a/src/app_service.rs b/src/app_service.rs index 2ee8232e2..7910f10a3 100644 --- a/src/app_service.rs +++ b/src/app_service.rs @@ -72,7 +72,7 @@ where }))) }); - // App config + // create App config to pass to child services let mut config = AppService::new(config, default.clone()); // register services diff --git a/src/resource.rs b/src/resource.rs index a0fc19faf..6a01a0496 100644 --- a/src/resource.rs +++ b/src/resource.rs @@ -313,8 +313,12 @@ where } /// Default service to be used if no matching route could be found. - /// By default *405* response get returned. Resource does not use - /// default handler from `App` or `Scope`. + /// + /// You can use a [`Route`] as default service. + /// + /// If a default service is not registered, an empty `405 Method Not Allowed` response will be + /// sent to the client instead. Unlike [`Scope`](crate::Scope)s, a [`Resource`] does **not** + /// inherit its parent's default service. pub fn default_service(mut self, f: F) -> Self where F: IntoServiceFactory, diff --git a/src/scope.rs b/src/scope.rs index 9ef28be50..f8d836810 100644 --- a/src/scope.rs +++ b/src/scope.rs @@ -262,9 +262,10 @@ where ) } - /// Default service to be used if no matching route could be found. + /// Default service to be used if no matching resource could be found. /// - /// If default resource is not registered, app's default resource is being used. + /// If a default service is not registered, it will fall back to the default service of + /// the parent [`App`](crate::App) (see [`App::default_service`](crate::App::default_service)). pub fn default_service(mut self, f: F) -> Self where F: IntoServiceFactory, diff --git a/src/server.rs b/src/server.rs index ed0c965b3..83e025fb0 100644 --- a/src/server.rs +++ b/src/server.rs @@ -4,6 +4,7 @@ use std::{ marker::PhantomData, net, sync::{Arc, Mutex}, + time::Duration, }; use actix_http::{body::MessageBody, Extensions, HttpService, KeepAlive, Request, Response}; @@ -27,8 +28,8 @@ struct Socket { struct Config { host: Option, keep_alive: KeepAlive, - client_timeout: u64, - client_shutdown: u64, + client_request_timeout: Duration, + client_disconnect_timeout: Duration, } /// An HTTP Server. @@ -88,9 +89,9 @@ where factory, config: Arc::new(Mutex::new(Config { host: None, - keep_alive: KeepAlive::Timeout(5), - client_timeout: 5000, - client_shutdown: 5000, + keep_alive: KeepAlive::default(), + client_request_timeout: Duration::from_secs(5), + client_disconnect_timeout: Duration::from_secs(1), })), backlog: 1024, sockets: Vec::new(), @@ -200,11 +201,17 @@ where /// To disable timeout set value to 0. /// /// By default client timeout is set to 5000 milliseconds. - pub fn client_timeout(self, val: u64) -> Self { - self.config.lock().unwrap().client_timeout = val; + pub fn client_request_timeout(self, dur: Duration) -> Self { + self.config.lock().unwrap().client_request_timeout = dur; self } + #[doc(hidden)] + #[deprecated(since = "4.0.0", note = "Renamed to `client_request_timeout`.")] + pub fn client_timeout(self, dur: Duration) -> Self { + self.client_request_timeout(dur) + } + /// Set server connection shutdown timeout in milliseconds. /// /// Defines a timeout for shutdown connection. If a shutdown procedure does not complete @@ -213,11 +220,17 @@ where /// To disable timeout set value to 0. /// /// By default client timeout is set to 5000 milliseconds. - pub fn client_shutdown(self, val: u64) -> Self { - self.config.lock().unwrap().client_shutdown = val; + pub fn client_disconnect_timeout(self, dur: Duration) -> Self { + self.config.lock().unwrap().client_disconnect_timeout = dur; self } + #[doc(hidden)] + #[deprecated(since = "4.0.0", note = "Renamed to `client_request_timeout`.")] + pub fn client_shutdown(self, dur: u64) -> Self { + self.client_disconnect_timeout(Duration::from_millis(dur)) + } + /// Set server host name. /// /// Host name is used by application router as a hostname for url generation. @@ -291,8 +304,8 @@ where let mut svc = HttpService::build() .keep_alive(c.keep_alive) - .client_timeout(c.client_timeout) - .client_disconnect(c.client_shutdown) + .client_request_timeout(c.client_request_timeout) + .client_disconnect_timeout(c.client_disconnect_timeout) .local_addr(addr); if let Some(handler) = on_connect_fn.clone() { @@ -349,8 +362,8 @@ where let svc = HttpService::build() .keep_alive(c.keep_alive) - .client_timeout(c.client_timeout) - .client_disconnect(c.client_shutdown) + .client_request_timeout(c.client_request_timeout) + .client_disconnect_timeout(c.client_disconnect_timeout) .local_addr(addr); let svc = if let Some(handler) = on_connect_fn.clone() { @@ -410,8 +423,8 @@ where let svc = HttpService::build() .keep_alive(c.keep_alive) - .client_timeout(c.client_timeout) - .client_disconnect(c.client_shutdown); + .client_request_timeout(c.client_request_timeout) + .client_disconnect_timeout(c.client_disconnect_timeout); let svc = if let Some(handler) = on_connect_fn.clone() { svc.on_connect_ext(move |io: &_, ext: _| (handler)(io as &dyn Any, ext)) @@ -537,8 +550,8 @@ where fn_service(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) }).and_then({ let mut svc = HttpService::build() .keep_alive(c.keep_alive) - .client_timeout(c.client_timeout) - .client_disconnect(c.client_shutdown); + .client_request_timeout(c.client_request_timeout) + .client_disconnect_timeout(c.client_disconnect_timeout); if let Some(handler) = on_connect_fn.clone() { svc = svc @@ -593,8 +606,8 @@ where fn_service(|io: UnixStream| async { Ok((io, Protocol::Http1, None)) }).and_then( HttpService::build() .keep_alive(c.keep_alive) - .client_timeout(c.client_timeout) - .client_disconnect(c.client_shutdown) + .client_request_timeout(c.client_request_timeout) + .client_disconnect_timeout(c.client_disconnect_timeout) .finish(map_config(fac, move |_| config.clone())), ) }, diff --git a/tests/test_httpserver.rs b/tests/test_httpserver.rs index 6ea8e520c..86e0575f3 100644 --- a/tests/test_httpserver.rs +++ b/tests/test_httpserver.rs @@ -26,9 +26,9 @@ async fn test_start() { .backlog(1) .max_connections(10) .max_connection_rate(10) - .keep_alive(10) - .client_timeout(5000) - .client_shutdown(0) + .keep_alive(Duration::from_secs(10)) + .client_request_timeout(Duration::from_secs(5)) + .client_disconnect_timeout(Duration::ZERO) .server_hostname("localhost") .system_exit() .disable_signals() diff --git a/tests/test_server.rs b/tests/test_server.rs index b8193a004..bd8934061 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -8,6 +8,7 @@ use std::{ io::{Read, Write}, pin::Pin, task::{Context, Poll}, + time::Duration, }; use actix_web::{ @@ -835,9 +836,10 @@ async fn test_server_cookies() { async fn test_slow_request() { use std::net; - let srv = actix_test::start_with(actix_test::config().client_timeout(200), || { - App::new().service(web::resource("/").route(web::to(HttpResponse::Ok))) - }); + let srv = actix_test::start_with( + actix_test::config().client_request_timeout(Duration::from_millis(200)), + || App::new().service(web::resource("/").route(web::to(HttpResponse::Ok))), + ); let mut stream = net::TcpStream::connect(srv.addr()).unwrap(); let mut data = String::new();