diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 625b4100d..a6bca0696 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -12,7 +12,11 @@ * MessageBody is not implemented for &'static [u8] anymore. -* Change defaul initial window size and connection window size for HTTP2 to 5MB and 2MB respectively. This 7 times improves download speed for awc when downloading large objects. +* Change default initial window size and connection window size for HTTP2 to 2MB and 1MB respectively to improve download speed for awc when downloading large objects. + +* client::Connector accepts initial_window_size and initial_connection_window_size HTTP2 configuration + +* client::Connector allowing to set max_http_version to limit HTTP version to be used ### Fixed diff --git a/actix-http/src/client/config.rs b/actix-http/src/client/config.rs new file mode 100644 index 000000000..2844849df --- /dev/null +++ b/actix-http/src/client/config.rs @@ -0,0 +1,39 @@ +use std::time::Duration; + +// These values are taken from hyper/src/proto/h2/client.rs +const DEFAULT_H2_CONN_WINDOW: u32 = 1024 * 1024 * 2; // 2mb +const DEFAULT_H2_STREAM_WINDOW: u32 = 1024 * 1024 * 1; // 1mb + +/// Connector configuration +#[derive(Clone)] +pub(crate) struct ConnectorConfig { + pub(crate) timeout: Duration, + pub(crate) conn_lifetime: Duration, + pub(crate) conn_keep_alive: Duration, + pub(crate) disconnect_timeout: Option, + pub(crate) limit: usize, + pub(crate) conn_window_size: u32, + pub(crate) stream_window_size: u32, +} + +impl Default for ConnectorConfig { + fn default() -> Self { + Self { + timeout: Duration::from_secs(1), + conn_lifetime: Duration::from_secs(75), + conn_keep_alive: Duration::from_secs(15), + disconnect_timeout: Some(Duration::from_millis(3000)), + limit: 100, + conn_window_size: DEFAULT_H2_CONN_WINDOW, + stream_window_size: DEFAULT_H2_STREAM_WINDOW, + } + } +} + +impl ConnectorConfig { + pub(crate) fn no_disconnect_timeout(&self) -> Self { + let mut res = self.clone(); + res.disconnect_timeout = None; + res + } +} diff --git a/actix-http/src/client/connector.rs b/actix-http/src/client/connector.rs index 055d4276d..d689ba955 100644 --- a/actix-http/src/client/connector.rs +++ b/actix-http/src/client/connector.rs @@ -11,6 +11,7 @@ use actix_service::{apply_fn, Service}; use actix_utils::timeout::{TimeoutError, TimeoutService}; use http::Uri; +use super::config::ConnectorConfig; use super::connection::Connection; use super::error::ConnectError; use super::pool::{ConnectionPool, Protocol}; @@ -48,11 +49,14 @@ type SslConnector = (); /// ``` pub struct Connector { connector: T, - timeout: Duration, + config: ConnectorConfig, + /* timeout: Duration, conn_lifetime: Duration, conn_keep_alive: Duration, disconnect_timeout: Duration, limit: usize, + conn_window_size: u32, + stream_window_size: u32,*/ #[allow(dead_code)] ssl: SslConnector, _t: PhantomData, @@ -71,42 +75,52 @@ impl Connector<(), ()> { > + Clone, TcpStream, > { - let ssl = { - #[cfg(feature = "openssl")] - { - use actix_connect::ssl::openssl::SslMethod; - - let mut ssl = OpensslConnector::builder(SslMethod::tls()).unwrap(); - let _ = ssl - .set_alpn_protos(b"\x02h2\x08http/1.1") - .map_err(|e| error!("Can not set alpn protocol: {:?}", e)); - SslConnector::Openssl(ssl.build()) - } - #[cfg(all(not(feature = "openssl"), feature = "rustls"))] - { - let protos = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; - let mut config = ClientConfig::new(); - config.set_protocols(&protos); - config - .root_store - .add_server_trust_anchors(&actix_tls::rustls::TLS_SERVER_ROOTS); - SslConnector::Rustls(Arc::new(config)) - } - #[cfg(not(any(feature = "openssl", feature = "rustls")))] - {} - }; - Connector { - ssl, + ssl: Self::build_ssl(vec![b"h2".to_vec(), b"http/1.1".to_vec()]), connector: default_connector(), - timeout: Duration::from_secs(1), + config: ConnectorConfig::default(), + /* timeout: Duration::from_secs(1), conn_lifetime: Duration::from_secs(75), conn_keep_alive: Duration::from_secs(15), disconnect_timeout: Duration::from_millis(3000), limit: 100, + conn_window_size: DEFAULT_H2_CONN_WINDOW, + stream_window_size: DEFAULT_H2_STREAM_WINDOW,*/ _t: PhantomData, } } + + // Build Ssl connector based on features config and supplied alpn protocols + fn build_ssl(protocols: Vec>) -> SslConnector { + #[cfg(feature = "openssl")] + { + use actix_connect::ssl::openssl::SslMethod; + use bytes::{BufMut, BytesMut}; + + let mut alpn = BytesMut::with_capacity(20); + for proto in protocols.iter() { + alpn.put_u8(proto.len() as u8); + alpn.put(proto.as_slice()); + } + + let mut ssl = OpensslConnector::builder(SslMethod::tls()).unwrap(); + let _ = ssl + .set_alpn_protos(&alpn) + .map_err(|e| error!("Can not set alpn protocol: {:?}", e)); + SslConnector::Openssl(ssl.build()) + } + #[cfg(all(not(feature = "openssl"), feature = "rustls"))] + { + let mut config = ClientConfig::new(); + config.set_protocols(&protocols); + config + .root_store + .add_server_trust_anchors(&actix_tls::rustls::TLS_SERVER_ROOTS); + SslConnector::Rustls(Arc::new(config)) + } + #[cfg(not(any(feature = "openssl", feature = "rustls")))] + {} + } } impl Connector { @@ -122,11 +136,7 @@ impl Connector { { Connector { connector, - timeout: self.timeout, - conn_lifetime: self.conn_lifetime, - conn_keep_alive: self.conn_keep_alive, - disconnect_timeout: self.disconnect_timeout, - limit: self.limit, + config: self.config, ssl: self.ssl, _t: PhantomData, } @@ -146,7 +156,7 @@ where /// Connection timeout, i.e. max time to connect to remote host including dns name resolution. /// Set to 1 second by default. pub fn timeout(mut self, timeout: Duration) -> Self { - self.timeout = timeout; + self.config.timeout = timeout; self } @@ -163,12 +173,41 @@ where self } + /// Maximum supported http major version + /// When supplied 1 both HTTP/1.0 and HTTP/1.1 will be allowed + pub fn max_http_version(mut self, val: u8) -> Self { + self.ssl = Connector::build_ssl(if val == 1 { + vec![b"http/1.1".to_vec()] + } else { + vec![b"h2".to_vec(), b"http/1.1".to_vec()] + }); + self + } + + /// Indicates the initial window size (in octets) for + /// HTTP2 stream-level flow control for received data. + /// + /// The default value is 65,535 and is good for APIs, but not for big objects. + pub fn initial_window_size(mut self, size: u32) -> Self { + self.config.stream_window_size = size; + self + } + + /// Indicates the initial window size (in octets) for + /// HTTP2 connection-level flow control for received data. + /// + /// The default value is 65,535 and is good for APIs, but not for big objects. + pub fn initial_connection_window_size(mut self, size: u32) -> Self { + self.config.conn_window_size = size; + self + } + /// Set total number of simultaneous connections per type of scheme. /// /// If limit is 0, the connector has no limit. /// The default limit size is 100. pub fn limit(mut self, limit: usize) -> Self { - self.limit = limit; + self.config.limit = limit; self } @@ -179,7 +218,7 @@ where /// exceeds this period, the connection is closed. /// Default keep-alive period is 15 seconds. pub fn conn_keep_alive(mut self, dur: Duration) -> Self { - self.conn_keep_alive = dur; + self.config.conn_keep_alive = dur; self } @@ -189,7 +228,7 @@ where /// until it is closed regardless of keep-alive period. /// Default lifetime period is 75 seconds. pub fn conn_lifetime(mut self, dur: Duration) -> Self { - self.conn_lifetime = dur; + self.config.conn_lifetime = dur; self } @@ -202,7 +241,7 @@ where /// /// By default disconnect timeout is set to 3000 milliseconds. pub fn disconnect_timeout(mut self, dur: Duration) -> Self { - self.disconnect_timeout = dur; + self.config.disconnect_timeout = Some(dur); self } @@ -216,7 +255,7 @@ where #[cfg(not(any(feature = "openssl", feature = "rustls")))] { let connector = TimeoutService::new( - self.timeout, + self.config.timeout, apply_fn(self.connector, |msg: Connect, srv| { srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr)) }) @@ -231,10 +270,7 @@ where connect_impl::InnerConnector { tcp_pool: ConnectionPool::new( connector, - self.conn_lifetime, - self.conn_keep_alive, - None, - self.limit, + self.config.no_disconnect_timeout(), ), } } @@ -248,7 +284,7 @@ where use actix_service::{boxed::service, pipeline}; let ssl_service = TimeoutService::new( - self.timeout, + self.config.timeout, pipeline( apply_fn(self.connector.clone(), |msg: Connect, srv| { srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr)) @@ -301,7 +337,7 @@ where }); let tcp_service = TimeoutService::new( - self.timeout, + self.config.timeout, apply_fn(self.connector, |msg: Connect, srv| { srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr)) }) @@ -316,18 +352,9 @@ where connect_impl::InnerConnector { tcp_pool: ConnectionPool::new( tcp_service, - self.conn_lifetime, - self.conn_keep_alive, - None, - self.limit, - ), - ssl_pool: ConnectionPool::new( - ssl_service, - self.conn_lifetime, - self.conn_keep_alive, - Some(self.disconnect_timeout), - self.limit, + self.config.no_disconnect_timeout(), ), + ssl_pool: ConnectionPool::new(ssl_service, self.config), } } } diff --git a/actix-http/src/client/h2proto.rs b/actix-http/src/client/h2proto.rs index 3f609f94d..2afd2d80b 100644 --- a/actix-http/src/client/h2proto.rs +++ b/actix-http/src/client/h2proto.rs @@ -1,12 +1,15 @@ use std::convert::TryFrom; -use std::time; use std::future::Future; +use std::time; use actix_codec::{AsyncRead, AsyncWrite}; use bytes::Bytes; use futures_util::future::poll_fn; use futures_util::pin_mut; -use h2::{client::{SendRequest, Connection, Builder}, SendStream}; +use h2::{ + client::{Builder, Connection, SendRequest}, + SendStream, +}; use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING}; use http::{request::Request, Method, Version}; @@ -15,6 +18,7 @@ use crate::header::HeaderMap; use crate::message::{RequestHeadType, ResponseHead}; use crate::payload::Payload; +use super::config::ConnectorConfig; use super::connection::{ConnectionType, IoConnection}; use super::error::SendRequestError; use super::pool::Acquired; @@ -187,20 +191,17 @@ fn release( } } - -// These values are taken from hyper/src/proto/h2/client.rs -const DEFAULT_H2_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb -const DEFAULT_H2_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb - -pub(crate) fn handshake(io: Io) - -> impl Future, Connection), h2::Error>> +pub(crate) fn handshake( + io: Io, + config: &ConnectorConfig, +) -> impl Future, Connection), h2::Error>> where Io: AsyncRead + AsyncWrite + Unpin + 'static, { let mut builder = Builder::new(); builder - .initial_window_size(DEFAULT_H2_CONN_WINDOW) - .initial_connection_window_size(DEFAULT_H2_STREAM_WINDOW) + .initial_window_size(config.stream_window_size) + .initial_connection_window_size(config.conn_window_size) .enable_push(false); builder.handshake(io) } diff --git a/actix-http/src/client/mod.rs b/actix-http/src/client/mod.rs index a45aebcd5..dd1e9b25a 100644 --- a/actix-http/src/client/mod.rs +++ b/actix-http/src/client/mod.rs @@ -1,6 +1,7 @@ //! Http client api use http::Uri; +mod config; mod connection; mod connector; mod error; diff --git a/actix-http/src/client/pool.rs b/actix-http/src/client/pool.rs index f86ef582e..983396f92 100644 --- a/actix-http/src/client/pool.rs +++ b/actix-http/src/client/pool.rs @@ -19,9 +19,10 @@ use indexmap::IndexSet; use pin_project::pin_project; use slab::Slab; -use super::h2proto::handshake; +use super::config::ConnectorConfig; use super::connection::{ConnectionType, IoConnection}; use super::error::ConnectError; +use super::h2proto::handshake; use super::Connect; #[derive(Clone, Copy, PartialEq)] @@ -51,20 +52,11 @@ where T: Service + 'static, { - pub(crate) fn new( - connector: T, - conn_lifetime: Duration, - conn_keep_alive: Duration, - disconnect_timeout: Option, - limit: usize, - ) -> Self { + pub(crate) fn new(connector: T, config: ConnectorConfig) -> Self { ConnectionPool( Rc::new(RefCell::new(connector)), Rc::new(RefCell::new(Inner { - conn_lifetime, - conn_keep_alive, - disconnect_timeout, - limit, + config, acquired: 0, waiters: Slab::new(), waiters_queue: IndexSet::new(), @@ -130,6 +122,8 @@ where // open tcp connection let (io, proto) = connector.call(req).await?; + let config = inner.borrow().config.clone(); + let guard = OpenGuard::new(key, inner); if proto == Protocol::Http1 { @@ -139,7 +133,7 @@ where Some(guard.consume()), )) } else { - let (snd, connection) = handshake(io).await?; + let (snd, connection) = handshake(io, &config).await?; actix_rt::spawn(connection.map(|_| ())); Ok(IoConnection::new( ConnectionType::H2(snd), @@ -256,10 +250,7 @@ struct AvailableConnection { } pub(crate) struct Inner { - conn_lifetime: Duration, - conn_keep_alive: Duration, - disconnect_timeout: Option, - limit: usize, + config: ConnectorConfig, acquired: usize, available: FxHashMap>>, waiters: Slab< @@ -312,7 +303,7 @@ where fn acquire(&mut self, key: &Key, cx: &mut Context<'_>) -> Acquire { // check limits - if self.limit > 0 && self.acquired >= self.limit { + if self.config.limit > 0 && self.acquired >= self.config.limit { return Acquire::NotAvailable; } @@ -324,10 +315,10 @@ where let now = Instant::now(); while let Some(conn) = connections.pop_back() { // check if it still usable - if (now - conn.used) > self.conn_keep_alive - || (now - conn.created) > self.conn_lifetime + if (now - conn.used) > self.config.conn_keep_alive + || (now - conn.created) > self.config.conn_lifetime { - if let Some(timeout) = self.disconnect_timeout { + if let Some(timeout) = self.config.disconnect_timeout { if let ConnectionType::H1(io) = conn.io { actix_rt::spawn(CloseConnection::new(io, timeout)) } @@ -339,7 +330,7 @@ where match Pin::new(s).poll_read(cx, &mut buf) { Poll::Pending => (), Poll::Ready(Ok(n)) if n > 0 => { - if let Some(timeout) = self.disconnect_timeout { + if let Some(timeout) = self.config.disconnect_timeout { if let ConnectionType::H1(io) = io { actix_rt::spawn(CloseConnection::new( io, timeout, @@ -373,7 +364,7 @@ where fn release_close(&mut self, io: ConnectionType) { self.acquired -= 1; - if let Some(timeout) = self.disconnect_timeout { + if let Some(timeout) = self.config.disconnect_timeout { if let ConnectionType::H1(io) = io { actix_rt::spawn(CloseConnection::new(io, timeout)) } @@ -382,7 +373,7 @@ where } fn check_availibility(&self) { - if !self.waiters_queue.is_empty() && self.acquired < self.limit { + if !self.waiters_queue.is_empty() && self.acquired < self.config.limit { self.waker.wake(); } } @@ -481,6 +472,7 @@ where tx, this.inner.clone(), this.connector.call(connect), + inner.config.clone(), ); } } @@ -507,6 +499,7 @@ where >, rx: Option, ConnectError>>>, inner: Option>>>, + config: ConnectorConfig, } impl OpenWaitingConnection @@ -519,6 +512,7 @@ where rx: oneshot::Sender, ConnectError>>, inner: Rc>>, fut: F, + config: ConnectorConfig, ) { actix_rt::spawn(OpenWaitingConnection { key, @@ -526,6 +520,7 @@ where h2: None, rx: Some(rx), inner: Some(inner), + config, }) } } @@ -595,7 +590,7 @@ where ))); Poll::Ready(()) } else { - *this.h2 = Some(handshake(io).boxed_local()); + *this.h2 = Some(handshake(io, this.config).boxed_local()); self.poll(cx) } }