diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 718f67922..d9bef53cd 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -2,7 +2,9 @@ ## Unreleased -- Properly wake Payload receivers when feeding errors or EOF +- Properly wake Payload receivers when feeding errors or EOF. +- Add `ServiceConfigBuilder` type to facilitate future configuration extensions. +- Add a configuration option to allow/disallow half closed connections in HTTP/1. This defaults to allow, reverting the change made in 3.11.1. ## 3.11.1 diff --git a/actix-http/src/builder.rs b/actix-http/src/builder.rs index 916083a98..09b379e87 100644 --- a/actix-http/src/builder.rs +++ b/actix-http/src/builder.rs @@ -7,7 +7,7 @@ use crate::{ body::{BoxBody, MessageBody}, h1::{self, ExpectHandler, H1Service, UpgradeHandler}, service::HttpService, - ConnectCallback, Extensions, KeepAlive, Request, Response, ServiceConfig, + ConnectCallback, Extensions, KeepAlive, Request, Response, ServiceConfigBuilder, }; /// An HTTP service builder. @@ -19,6 +19,7 @@ pub struct HttpServiceBuilder { client_disconnect_timeout: Duration, secure: bool, local_addr: Option, + h1_allow_half_closed: bool, expect: X, upgrade: Option, on_connect_ext: Option>>, @@ -40,6 +41,7 @@ where client_disconnect_timeout: Duration::ZERO, secure: false, local_addr: None, + h1_allow_half_closed: true, // dispatcher parts expect: ExpectHandler, @@ -124,6 +126,18 @@ where self.client_disconnect_timeout(dur) } + /// Sets whether HTTP/1 connections should support half-closures. + /// + /// Clients can choose to shutdown their writer-side of the connection after completing their + /// request and while waiting for the server response. Setting this to `false` will cause the + /// server to abort the connection handling as soon as it detects an EOF from the client. + /// + /// The default behavior is to allow, i.e. `true` + pub fn h1_allow_half_closed(mut self, allow: bool) -> Self { + self.h1_allow_half_closed = allow; + self + } + /// Provide service for `EXPECT: 100-Continue` support. /// /// Service get called with request that contains `EXPECT` header. @@ -142,6 +156,7 @@ where client_disconnect_timeout: self.client_disconnect_timeout, secure: self.secure, local_addr: self.local_addr, + h1_allow_half_closed: self.h1_allow_half_closed, expect: expect.into_factory(), upgrade: self.upgrade, on_connect_ext: self.on_connect_ext, @@ -166,6 +181,7 @@ where client_disconnect_timeout: self.client_disconnect_timeout, secure: self.secure, local_addr: self.local_addr, + h1_allow_half_closed: self.h1_allow_half_closed, expect: self.expect, upgrade: Some(upgrade.into_factory()), on_connect_ext: self.on_connect_ext, @@ -195,13 +211,14 @@ where S::InitError: fmt::Debug, S::Response: Into>, { - let cfg = ServiceConfig::new( - self.keep_alive, - self.client_request_timeout, - self.client_disconnect_timeout, - self.secure, - self.local_addr, - ); + let cfg = ServiceConfigBuilder::new() + .keep_alive(self.keep_alive) + .client_request_timeout(self.client_request_timeout) + .client_disconnect_timeout(self.client_disconnect_timeout) + .secure(self.secure) + .local_addr(self.local_addr) + .h1_allow_half_closed(self.h1_allow_half_closed) + .build(); H1Service::with_config(cfg, service.into_factory()) .expect(self.expect) @@ -220,13 +237,14 @@ where B: MessageBody + 'static, { - let cfg = ServiceConfig::new( - self.keep_alive, - self.client_request_timeout, - self.client_disconnect_timeout, - self.secure, - self.local_addr, - ); + let cfg = ServiceConfigBuilder::new() + .keep_alive(self.keep_alive) + .client_request_timeout(self.client_request_timeout) + .client_disconnect_timeout(self.client_disconnect_timeout) + .secure(self.secure) + .local_addr(self.local_addr) + .h1_allow_half_closed(self.h1_allow_half_closed) + .build(); crate::h2::H2Service::with_config(cfg, service.into_factory()) .on_connect_ext(self.on_connect_ext) @@ -242,13 +260,14 @@ where B: MessageBody + 'static, { - let cfg = ServiceConfig::new( - self.keep_alive, - self.client_request_timeout, - self.client_disconnect_timeout, - self.secure, - self.local_addr, - ); + let cfg = ServiceConfigBuilder::new() + .keep_alive(self.keep_alive) + .client_request_timeout(self.client_request_timeout) + .client_disconnect_timeout(self.client_disconnect_timeout) + .secure(self.secure) + .local_addr(self.local_addr) + .h1_allow_half_closed(self.h1_allow_half_closed) + .build(); HttpService::with_config(cfg, service.into_factory()) .expect(self.expect) diff --git a/actix-http/src/config.rs b/actix-http/src/config.rs index b3b215da4..96e2aef07 100644 --- a/actix-http/src/config.rs +++ b/actix-http/src/config.rs @@ -1,5 +1,5 @@ use std::{ - net, + net::SocketAddr, rc::Rc, time::{Duration, Instant}, }; @@ -8,8 +8,76 @@ use bytes::BytesMut; use crate::{date::DateService, KeepAlive}; +/// A builder for creating a [`ServiceConfig`] +#[derive(Default, Debug)] +pub struct ServiceConfigBuilder { + inner: Inner, +} + +impl ServiceConfigBuilder { + /// Creates a new, default, [`ServiceConfigBuilder`] + /// + /// It uses the following default values: + /// + /// - [`KeepAlive::default`] for the connection keep-alive setting + /// - 5 seconds for the client request timeout + /// - 0 seconds for the client shutdown timeout + /// - secure value of `false` + /// - [`None`] for the local address setting + /// - Allow for half closed HTTP/1 connections + pub fn new() -> Self { + Self::default() + } + + /// Sets the `secure` attribute for this configuration + pub fn secure(mut self, secure: bool) -> Self { + self.inner.secure = secure; + self + } + + /// Sets the local address for this configuration + pub fn local_addr(mut self, local_addr: Option) -> Self { + self.inner.local_addr = local_addr; + self + } + + /// Sets connection keep-alive setting + pub fn keep_alive(mut self, keep_alive: KeepAlive) -> Self { + self.inner.keep_alive = keep_alive; + self + } + + /// Sets the timeout for the client to finish sending the head of its first request + pub fn client_request_timeout(mut self, timeout: Duration) -> Self { + self.inner.client_request_timeout = timeout; + self + } + + /// Sets the timeout for cleanly disconnecting from the client after connection shutdown has + /// started + pub fn client_disconnect_timeout(mut self, timeout: Duration) -> Self { + self.inner.client_disconnect_timeout = timeout; + self + } + + /// Sets whether HTTP/1 connections should support half-closures. + /// + /// Clients can choose to shutdown their writer-side of the connection after completing their + /// request and while waiting for the server response. Setting this to `false` will cause the + /// server to abort the connection handling as soon as it detects an EOF from the client + pub fn h1_allow_half_closed(mut self, allow: bool) -> Self { + self.inner.h1_allow_half_closed = allow; + self + } + + /// Builds a [`ServiceConfig`] from this [`ServiceConfigBuilder`] instance + pub fn build(self) -> ServiceConfig { + ServiceConfig(Rc::new(self.inner)) + } +} + /// HTTP service configuration. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] pub struct ServiceConfig(Rc); #[derive(Debug)] @@ -18,19 +86,22 @@ struct Inner { client_request_timeout: Duration, client_disconnect_timeout: Duration, secure: bool, - local_addr: Option, + local_addr: Option, date_service: DateService, + h1_allow_half_closed: bool, } -impl Default for ServiceConfig { +impl Default for Inner { fn default() -> Self { - Self::new( - KeepAlive::default(), - Duration::from_secs(5), - Duration::ZERO, - false, - None, - ) + Self { + keep_alive: KeepAlive::default(), + client_request_timeout: Duration::from_secs(5), + client_disconnect_timeout: Duration::ZERO, + secure: false, + local_addr: None, + date_service: DateService::new(), + h1_allow_half_closed: true, + } } } @@ -41,7 +112,7 @@ impl ServiceConfig { client_request_timeout: Duration, client_disconnect_timeout: Duration, secure: bool, - local_addr: Option, + local_addr: Option, ) -> ServiceConfig { ServiceConfig(Rc::new(Inner { keep_alive: keep_alive.normalize(), @@ -50,6 +121,7 @@ impl ServiceConfig { secure, local_addr, date_service: DateService::new(), + h1_allow_half_closed: true, })) } @@ -63,7 +135,7 @@ impl ServiceConfig { /// /// Returns `None` for connections via UDS (Unix Domain Socket). #[inline] - pub fn local_addr(&self) -> Option { + pub fn local_addr(&self) -> Option { self.0.local_addr } @@ -100,6 +172,15 @@ impl ServiceConfig { (timeout != Duration::ZERO).then(|| self.now() + timeout) } + /// Whether HTTP/1 connections should support half-closures. + /// + /// Clients can choose to shutdown their writer-side of the connection after completing their + /// request and while waiting for the server response. If this configuration is `false`, the + /// server will abort the connection handling as soon as it detects an EOF from the client + pub fn h1_allow_half_closed(&self) -> bool { + self.0.h1_allow_half_closed + } + pub(crate) fn now(&self) -> Instant { self.0.date_service.now() } diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 3f0b78af4..41266659a 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -1181,8 +1181,16 @@ where let inner_p = inner.as_mut().project(); let state_is_none = inner_p.state.is_none(); - // read half is closed; we do not process any responses - if inner_p.flags.contains(Flags::READ_DISCONNECT) { + // If the read-half is closed, we start the shutdown procedure if either is + // true: + // + // - state is [`State::None`], which means that we're done with request + // processing, so if the client closed its writer-side it means that it won't + // send more requests. + // - The user requested to not allow half-closures + if inner_p.flags.contains(Flags::READ_DISCONNECT) + && (!inner_p.config.h1_allow_half_closed() || state_is_none) + { trace!("read half closed; start shutdown"); inner_p.flags.insert(Flags::SHUTDOWN); } diff --git a/actix-http/src/h1/dispatcher_tests.rs b/actix-http/src/h1/dispatcher_tests.rs index 50259e6ce..3edf47ac4 100644 --- a/actix-http/src/h1/dispatcher_tests.rs +++ b/actix-http/src/h1/dispatcher_tests.rs @@ -1,4 +1,10 @@ -use std::{future::Future, str, task::Poll, time::Duration}; +use std::{ + future::Future, + pin::Pin, + str, + task::{Context, Poll}, + time::Duration, +}; use actix_codec::Framed; use actix_rt::{pin, time::sleep}; @@ -9,7 +15,7 @@ use futures_util::future::lazy; use super::dispatcher::{Dispatcher, DispatcherState, DispatcherStateProj, Flags}; use crate::{ - body::MessageBody, + body::{BoxBody, MessageBody}, config::ServiceConfig, h1::{Codec, ExpectHandler, UpgradeHandler}, service::HttpFlow, @@ -17,6 +23,26 @@ use crate::{ Error, HttpMessage, KeepAlive, Method, OnConnectData, Request, Response, StatusCode, }; +struct YieldService; + +impl Service for YieldService { + type Response = Response; + type Error = Response; + type Future = Pin>>>; + + actix_service::always_ready!(); + + fn call(&self, _: Request) -> Self::Future { + Box::pin(async { + // Yield twice because the dispatcher can poll the service twice per dispatcher's poll: + // once in `handle_request` and another in `poll_response` + actix_rt::task::yield_now().await; + actix_rt::task::yield_now().await; + Ok(Response::ok()) + }) + } +} + fn find_slice(haystack: &[u8], needle: &[u8], from: usize) -> Option { memchr::memmem::find(&haystack[from..], needle) } @@ -924,6 +950,91 @@ async fn handler_drop_payload() { .await; } +#[actix_rt::test] +async fn allow_half_closed() { + let buf = TestSeqBuffer::new(http_msg("GET / HTTP/1.1")); + buf.close_read(); + let services = HttpFlow::new(YieldService, ExpectHandler, None::); + + let mut cx = Context::from_waker(futures_util::task::noop_waker_ref()); + let disptacher = Dispatcher::new( + buf.clone(), + services, + ServiceConfig::default(), + None, + OnConnectData::default(), + ); + pin!(disptacher); + + assert!(disptacher.as_mut().poll(&mut cx).is_pending()); + assert_eq!(disptacher.poll_count, 1); + + assert!(disptacher.as_mut().poll(&mut cx).is_ready()); + assert_eq!(disptacher.poll_count, 3); + + let mut res = BytesMut::from(buf.take_write_buf().as_ref()); + stabilize_date_header(&mut res); + let exp = http_msg( + r" + HTTP/1.1 200 OK + content-length: 0 + date: Thu, 01 Jan 1970 12:34:56 UTC + ", + ); + assert_eq!( + res, + exp, + "\nexpected response not in write buffer:\n\ + response: {:?}\n\ + expected: {:?}", + String::from_utf8_lossy(&res), + String::from_utf8_lossy(&exp) + ); + + let DispatcherStateProj::Normal { inner } = disptacher.as_mut().project().inner.project() + else { + panic!("End dispatcher state should be Normal"); + }; + assert!(inner.state.is_none()); +} + +#[actix_rt::test] +async fn disallow_half_closed() { + use crate::{config::ServiceConfigBuilder, h1::dispatcher::State}; + + let buf = TestSeqBuffer::new(http_msg("GET / HTTP/1.1")); + buf.close_read(); + let services = HttpFlow::new(YieldService, ExpectHandler, None::); + let config = ServiceConfigBuilder::new() + .h1_allow_half_closed(false) + .build(); + + let mut cx = Context::from_waker(futures_util::task::noop_waker_ref()); + let disptacher = Dispatcher::new( + buf.clone(), + services, + config, + None, + OnConnectData::default(), + ); + pin!(disptacher); + + assert!(disptacher.as_mut().poll(&mut cx).is_pending()); + assert_eq!(disptacher.poll_count, 1); + + assert!(disptacher.as_mut().poll(&mut cx).is_ready()); + assert_eq!(disptacher.poll_count, 2); + + let res = BytesMut::from(buf.take_write_buf().as_ref()); + assert!(res.is_empty()); + + let DispatcherStateProj::Normal { inner } = disptacher.as_mut().project().inner.project() + else { + panic!("End dispatcher state should be Normal"); + }; + assert!(matches!(inner.state, State::ServiceCall { .. })) +} + fn http_msg(msg: impl AsRef) -> BytesMut { let mut msg = msg .as_ref() diff --git a/actix-http/src/lib.rs b/actix-http/src/lib.rs index a970056de..ae3713f20 100644 --- a/actix-http/src/lib.rs +++ b/actix-http/src/lib.rs @@ -63,7 +63,7 @@ pub use self::payload::PayloadStream; pub use self::service::TlsAcceptorConfig; pub use self::{ builder::HttpServiceBuilder, - config::ServiceConfig, + config::{ServiceConfig, ServiceConfigBuilder}, error::Error, extensions::Extensions, header::ContentEncoding, diff --git a/actix-http/src/test.rs b/actix-http/src/test.rs index 5632a310c..926efe2f5 100644 --- a/actix-http/src/test.rs +++ b/actix-http/src/test.rs @@ -275,6 +275,7 @@ impl TestSeqBuffer { { Self(Rc::new(RefCell::new(TestSeqInner { read_buf: data.into(), + read_closed: false, write_buf: BytesMut::new(), err: None, }))) @@ -293,36 +294,59 @@ impl TestSeqBuffer { Ref::map(self.0.borrow(), |inner| &inner.write_buf) } + pub fn take_write_buf(&self) -> Bytes { + self.0.borrow_mut().write_buf.split().freeze() + } + pub fn err(&self) -> Ref<'_, Option> { Ref::map(self.0.borrow(), |inner| &inner.err) } /// Add data to read buffer. + /// + /// # Panics + /// + /// Panics if called after [`TestSeqBuffer::close_read`] has been called pub fn extend_read_buf>(&mut self, data: T) { - self.0 - .borrow_mut() - .read_buf - .extend_from_slice(data.as_ref()) + let mut inner = self.0.borrow_mut(); + if inner.read_closed { + panic!("Tried to extend the read buffer after calling close_read"); + } + + inner.read_buf.extend_from_slice(data.as_ref()) + } + + /// Closes the [`AsyncRead`]/[`Read`] part of this test buffer. + /// + /// The current data in the buffer will still be returned by a call to read/poll_read, however, + /// after the buffer is empty, it will return `Ok(0)` to signify the EOF condition + pub fn close_read(&self) { + self.0.borrow_mut().read_closed = true; } } pub struct TestSeqInner { read_buf: BytesMut, + read_closed: bool, write_buf: BytesMut, err: Option, } impl io::Read for TestSeqBuffer { fn read(&mut self, dst: &mut [u8]) -> Result { - if self.0.borrow().read_buf.is_empty() { - if self.0.borrow().err.is_some() { - Err(self.0.borrow_mut().err.take().unwrap()) + let mut inner = self.0.borrow_mut(); + + if inner.read_buf.is_empty() { + if let Some(err) = inner.err.take() { + Err(err) + } else if inner.read_closed { + Ok(0) } else { Err(io::Error::new(io::ErrorKind::WouldBlock, "")) } } else { - let size = std::cmp::min(self.0.borrow().read_buf.len(), dst.len()); - let b = self.0.borrow_mut().read_buf.split_to(size); + let size = std::cmp::min(inner.read_buf.len(), dst.len()); + let b = inner.read_buf.split_to(size); dst[..size].copy_from_slice(&b); Ok(size) } diff --git a/actix-web/src/server.rs b/actix-web/src/server.rs index 0717f5bc6..2bd7c4463 100644 --- a/actix-web/src/server.rs +++ b/actix-web/src/server.rs @@ -31,6 +31,7 @@ struct Config { keep_alive: KeepAlive, client_request_timeout: Duration, client_disconnect_timeout: Duration, + h1_allow_half_closed: bool, #[allow(dead_code)] // only dead when no TLS features are enabled tls_handshake_timeout: Option, } @@ -116,6 +117,7 @@ where keep_alive: KeepAlive::default(), client_request_timeout: Duration::from_secs(5), client_disconnect_timeout: Duration::from_secs(1), + h1_allow_half_closed: true, tls_handshake_timeout: None, })), backlog: 1024, @@ -257,6 +259,18 @@ where self.client_disconnect_timeout(Duration::from_millis(dur)) } + /// Sets whether HTTP/1 connections should support half-closures. + /// + /// Clients can choose to shutdown their writer-side of the connection after completing their + /// request and while waiting for the server response. Setting this to `false` will cause the + /// server to abort the connection handling as soon as it detects an EOF from the client. + /// + /// The default behavior is to allow, i.e. `true` + pub fn h1_allow_half_closed(self, allow: bool) -> Self { + self.config.lock().unwrap().h1_allow_half_closed = allow; + self + } + /// Sets function that will be called once before each connection is handled. /// /// It will receive a `&std::any::Any`, which contains underlying connection type and an @@ -558,6 +572,7 @@ where .keep_alive(cfg.keep_alive) .client_request_timeout(cfg.client_request_timeout) .client_disconnect_timeout(cfg.client_disconnect_timeout) + .h1_allow_half_closed(cfg.h1_allow_half_closed) .local_addr(addr); if let Some(handler) = on_connect_fn.clone() { @@ -602,6 +617,7 @@ where .keep_alive(cfg.keep_alive) .client_request_timeout(cfg.client_request_timeout) .client_disconnect_timeout(cfg.client_disconnect_timeout) + .h1_allow_half_closed(cfg.h1_allow_half_closed) .local_addr(addr); if let Some(handler) = on_connect_fn.clone() { @@ -677,6 +693,7 @@ where let svc = HttpService::build() .keep_alive(c.keep_alive) .client_request_timeout(c.client_request_timeout) + .h1_allow_half_closed(c.h1_allow_half_closed) .client_disconnect_timeout(c.client_disconnect_timeout); let svc = if let Some(handler) = on_connect_fn.clone() { @@ -728,6 +745,7 @@ where let svc = HttpService::build() .keep_alive(c.keep_alive) .client_request_timeout(c.client_request_timeout) + .h1_allow_half_closed(c.h1_allow_half_closed) .client_disconnect_timeout(c.client_disconnect_timeout); let svc = if let Some(handler) = on_connect_fn.clone() { @@ -794,6 +812,7 @@ where let svc = HttpService::build() .keep_alive(c.keep_alive) .client_request_timeout(c.client_request_timeout) + .h1_allow_half_closed(c.h1_allow_half_closed) .client_disconnect_timeout(c.client_disconnect_timeout); let svc = if let Some(handler) = on_connect_fn.clone() { @@ -860,6 +879,7 @@ where let svc = HttpService::build() .keep_alive(c.keep_alive) .client_request_timeout(c.client_request_timeout) + .h1_allow_half_closed(c.h1_allow_half_closed) .client_disconnect_timeout(c.client_disconnect_timeout); let svc = if let Some(handler) = on_connect_fn.clone() { @@ -927,6 +947,7 @@ where .keep_alive(c.keep_alive) .client_request_timeout(c.client_request_timeout) .client_disconnect_timeout(c.client_disconnect_timeout) + .h1_allow_half_closed(c.h1_allow_half_closed) .local_addr(addr); let svc = if let Some(handler) = on_connect_fn.clone() { @@ -995,6 +1016,7 @@ where .keep_alive(c.keep_alive) .client_request_timeout(c.client_request_timeout) .client_disconnect_timeout(c.client_disconnect_timeout) + .h1_allow_half_closed(c.h1_allow_half_closed) .finish(map_config(fac, move |_| config.clone())), ) }, @@ -1036,6 +1058,7 @@ where let mut svc = HttpService::build() .keep_alive(c.keep_alive) .client_request_timeout(c.client_request_timeout) + .h1_allow_half_closed(c.h1_allow_half_closed) .client_disconnect_timeout(c.client_disconnect_timeout); if let Some(handler) = on_connect_fn.clone() {