mirror of https://github.com/fafhrd91/actix-web
client: configurable H2 window sizes and max_http_version
This commit is contained in:
parent
188de637f4
commit
eab0a292e9
|
@ -12,7 +12,11 @@
|
||||||
|
|
||||||
* MessageBody is not implemented for &'static [u8] anymore.
|
* MessageBody is not implemented for &'static [u8] anymore.
|
||||||
|
|
||||||
* Change defaul initial window size and connection window size for HTTP2 to 5MB and 2MB respectively. This 7 times improves download speed for awc when downloading large objects.
|
* Change default initial window size and connection window size for HTTP2 to 2MB and 1MB respectively to improve download speed for awc when downloading large objects.
|
||||||
|
|
||||||
|
* client::Connector accepts initial_window_size and initial_connection_window_size HTTP2 configuration
|
||||||
|
|
||||||
|
* client::Connector allowing to set max_http_version to limit HTTP version to be used
|
||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,39 @@
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
// These values are taken from hyper/src/proto/h2/client.rs
|
||||||
|
const DEFAULT_H2_CONN_WINDOW: u32 = 1024 * 1024 * 2; // 2mb
|
||||||
|
const DEFAULT_H2_STREAM_WINDOW: u32 = 1024 * 1024 * 1; // 1mb
|
||||||
|
|
||||||
|
/// Connector configuration
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub(crate) struct ConnectorConfig {
|
||||||
|
pub(crate) timeout: Duration,
|
||||||
|
pub(crate) conn_lifetime: Duration,
|
||||||
|
pub(crate) conn_keep_alive: Duration,
|
||||||
|
pub(crate) disconnect_timeout: Option<Duration>,
|
||||||
|
pub(crate) limit: usize,
|
||||||
|
pub(crate) conn_window_size: u32,
|
||||||
|
pub(crate) stream_window_size: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for ConnectorConfig {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
timeout: Duration::from_secs(1),
|
||||||
|
conn_lifetime: Duration::from_secs(75),
|
||||||
|
conn_keep_alive: Duration::from_secs(15),
|
||||||
|
disconnect_timeout: Some(Duration::from_millis(3000)),
|
||||||
|
limit: 100,
|
||||||
|
conn_window_size: DEFAULT_H2_CONN_WINDOW,
|
||||||
|
stream_window_size: DEFAULT_H2_STREAM_WINDOW,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConnectorConfig {
|
||||||
|
pub(crate) fn no_disconnect_timeout(&self) -> Self {
|
||||||
|
let mut res = self.clone();
|
||||||
|
res.disconnect_timeout = None;
|
||||||
|
res
|
||||||
|
}
|
||||||
|
}
|
|
@ -11,6 +11,7 @@ use actix_service::{apply_fn, Service};
|
||||||
use actix_utils::timeout::{TimeoutError, TimeoutService};
|
use actix_utils::timeout::{TimeoutError, TimeoutService};
|
||||||
use http::Uri;
|
use http::Uri;
|
||||||
|
|
||||||
|
use super::config::ConnectorConfig;
|
||||||
use super::connection::Connection;
|
use super::connection::Connection;
|
||||||
use super::error::ConnectError;
|
use super::error::ConnectError;
|
||||||
use super::pool::{ConnectionPool, Protocol};
|
use super::pool::{ConnectionPool, Protocol};
|
||||||
|
@ -48,11 +49,14 @@ type SslConnector = ();
|
||||||
/// ```
|
/// ```
|
||||||
pub struct Connector<T, U> {
|
pub struct Connector<T, U> {
|
||||||
connector: T,
|
connector: T,
|
||||||
timeout: Duration,
|
config: ConnectorConfig,
|
||||||
|
/* timeout: Duration,
|
||||||
conn_lifetime: Duration,
|
conn_lifetime: Duration,
|
||||||
conn_keep_alive: Duration,
|
conn_keep_alive: Duration,
|
||||||
disconnect_timeout: Duration,
|
disconnect_timeout: Duration,
|
||||||
limit: usize,
|
limit: usize,
|
||||||
|
conn_window_size: u32,
|
||||||
|
stream_window_size: u32,*/
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
ssl: SslConnector,
|
ssl: SslConnector,
|
||||||
_t: PhantomData<U>,
|
_t: PhantomData<U>,
|
||||||
|
@ -71,42 +75,52 @@ impl Connector<(), ()> {
|
||||||
> + Clone,
|
> + Clone,
|
||||||
TcpStream,
|
TcpStream,
|
||||||
> {
|
> {
|
||||||
let ssl = {
|
|
||||||
#[cfg(feature = "openssl")]
|
|
||||||
{
|
|
||||||
use actix_connect::ssl::openssl::SslMethod;
|
|
||||||
|
|
||||||
let mut ssl = OpensslConnector::builder(SslMethod::tls()).unwrap();
|
|
||||||
let _ = ssl
|
|
||||||
.set_alpn_protos(b"\x02h2\x08http/1.1")
|
|
||||||
.map_err(|e| error!("Can not set alpn protocol: {:?}", e));
|
|
||||||
SslConnector::Openssl(ssl.build())
|
|
||||||
}
|
|
||||||
#[cfg(all(not(feature = "openssl"), feature = "rustls"))]
|
|
||||||
{
|
|
||||||
let protos = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
|
|
||||||
let mut config = ClientConfig::new();
|
|
||||||
config.set_protocols(&protos);
|
|
||||||
config
|
|
||||||
.root_store
|
|
||||||
.add_server_trust_anchors(&actix_tls::rustls::TLS_SERVER_ROOTS);
|
|
||||||
SslConnector::Rustls(Arc::new(config))
|
|
||||||
}
|
|
||||||
#[cfg(not(any(feature = "openssl", feature = "rustls")))]
|
|
||||||
{}
|
|
||||||
};
|
|
||||||
|
|
||||||
Connector {
|
Connector {
|
||||||
ssl,
|
ssl: Self::build_ssl(vec![b"h2".to_vec(), b"http/1.1".to_vec()]),
|
||||||
connector: default_connector(),
|
connector: default_connector(),
|
||||||
timeout: Duration::from_secs(1),
|
config: ConnectorConfig::default(),
|
||||||
|
/* timeout: Duration::from_secs(1),
|
||||||
conn_lifetime: Duration::from_secs(75),
|
conn_lifetime: Duration::from_secs(75),
|
||||||
conn_keep_alive: Duration::from_secs(15),
|
conn_keep_alive: Duration::from_secs(15),
|
||||||
disconnect_timeout: Duration::from_millis(3000),
|
disconnect_timeout: Duration::from_millis(3000),
|
||||||
limit: 100,
|
limit: 100,
|
||||||
|
conn_window_size: DEFAULT_H2_CONN_WINDOW,
|
||||||
|
stream_window_size: DEFAULT_H2_STREAM_WINDOW,*/
|
||||||
_t: PhantomData,
|
_t: PhantomData,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Build Ssl connector based on features config and supplied alpn protocols
|
||||||
|
fn build_ssl(protocols: Vec<Vec<u8>>) -> SslConnector {
|
||||||
|
#[cfg(feature = "openssl")]
|
||||||
|
{
|
||||||
|
use actix_connect::ssl::openssl::SslMethod;
|
||||||
|
use bytes::{BufMut, BytesMut};
|
||||||
|
|
||||||
|
let mut alpn = BytesMut::with_capacity(20);
|
||||||
|
for proto in protocols.iter() {
|
||||||
|
alpn.put_u8(proto.len() as u8);
|
||||||
|
alpn.put(proto.as_slice());
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut ssl = OpensslConnector::builder(SslMethod::tls()).unwrap();
|
||||||
|
let _ = ssl
|
||||||
|
.set_alpn_protos(&alpn)
|
||||||
|
.map_err(|e| error!("Can not set alpn protocol: {:?}", e));
|
||||||
|
SslConnector::Openssl(ssl.build())
|
||||||
|
}
|
||||||
|
#[cfg(all(not(feature = "openssl"), feature = "rustls"))]
|
||||||
|
{
|
||||||
|
let mut config = ClientConfig::new();
|
||||||
|
config.set_protocols(&protocols);
|
||||||
|
config
|
||||||
|
.root_store
|
||||||
|
.add_server_trust_anchors(&actix_tls::rustls::TLS_SERVER_ROOTS);
|
||||||
|
SslConnector::Rustls(Arc::new(config))
|
||||||
|
}
|
||||||
|
#[cfg(not(any(feature = "openssl", feature = "rustls")))]
|
||||||
|
{}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T, U> Connector<T, U> {
|
impl<T, U> Connector<T, U> {
|
||||||
|
@ -122,11 +136,7 @@ impl<T, U> Connector<T, U> {
|
||||||
{
|
{
|
||||||
Connector {
|
Connector {
|
||||||
connector,
|
connector,
|
||||||
timeout: self.timeout,
|
config: self.config,
|
||||||
conn_lifetime: self.conn_lifetime,
|
|
||||||
conn_keep_alive: self.conn_keep_alive,
|
|
||||||
disconnect_timeout: self.disconnect_timeout,
|
|
||||||
limit: self.limit,
|
|
||||||
ssl: self.ssl,
|
ssl: self.ssl,
|
||||||
_t: PhantomData,
|
_t: PhantomData,
|
||||||
}
|
}
|
||||||
|
@ -146,7 +156,7 @@ where
|
||||||
/// Connection timeout, i.e. max time to connect to remote host including dns name resolution.
|
/// Connection timeout, i.e. max time to connect to remote host including dns name resolution.
|
||||||
/// Set to 1 second by default.
|
/// Set to 1 second by default.
|
||||||
pub fn timeout(mut self, timeout: Duration) -> Self {
|
pub fn timeout(mut self, timeout: Duration) -> Self {
|
||||||
self.timeout = timeout;
|
self.config.timeout = timeout;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -163,12 +173,41 @@ where
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Maximum supported http major version
|
||||||
|
/// When supplied 1 both HTTP/1.0 and HTTP/1.1 will be allowed
|
||||||
|
pub fn max_http_version(mut self, val: u8) -> Self {
|
||||||
|
self.ssl = Connector::build_ssl(if val == 1 {
|
||||||
|
vec![b"http/1.1".to_vec()]
|
||||||
|
} else {
|
||||||
|
vec![b"h2".to_vec(), b"http/1.1".to_vec()]
|
||||||
|
});
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Indicates the initial window size (in octets) for
|
||||||
|
/// HTTP2 stream-level flow control for received data.
|
||||||
|
///
|
||||||
|
/// The default value is 65,535 and is good for APIs, but not for big objects.
|
||||||
|
pub fn initial_window_size(mut self, size: u32) -> Self {
|
||||||
|
self.config.stream_window_size = size;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Indicates the initial window size (in octets) for
|
||||||
|
/// HTTP2 connection-level flow control for received data.
|
||||||
|
///
|
||||||
|
/// The default value is 65,535 and is good for APIs, but not for big objects.
|
||||||
|
pub fn initial_connection_window_size(mut self, size: u32) -> Self {
|
||||||
|
self.config.conn_window_size = size;
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
/// Set total number of simultaneous connections per type of scheme.
|
/// Set total number of simultaneous connections per type of scheme.
|
||||||
///
|
///
|
||||||
/// If limit is 0, the connector has no limit.
|
/// If limit is 0, the connector has no limit.
|
||||||
/// The default limit size is 100.
|
/// The default limit size is 100.
|
||||||
pub fn limit(mut self, limit: usize) -> Self {
|
pub fn limit(mut self, limit: usize) -> Self {
|
||||||
self.limit = limit;
|
self.config.limit = limit;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -179,7 +218,7 @@ where
|
||||||
/// exceeds this period, the connection is closed.
|
/// exceeds this period, the connection is closed.
|
||||||
/// Default keep-alive period is 15 seconds.
|
/// Default keep-alive period is 15 seconds.
|
||||||
pub fn conn_keep_alive(mut self, dur: Duration) -> Self {
|
pub fn conn_keep_alive(mut self, dur: Duration) -> Self {
|
||||||
self.conn_keep_alive = dur;
|
self.config.conn_keep_alive = dur;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -189,7 +228,7 @@ where
|
||||||
/// until it is closed regardless of keep-alive period.
|
/// until it is closed regardless of keep-alive period.
|
||||||
/// Default lifetime period is 75 seconds.
|
/// Default lifetime period is 75 seconds.
|
||||||
pub fn conn_lifetime(mut self, dur: Duration) -> Self {
|
pub fn conn_lifetime(mut self, dur: Duration) -> Self {
|
||||||
self.conn_lifetime = dur;
|
self.config.conn_lifetime = dur;
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -202,7 +241,7 @@ where
|
||||||
///
|
///
|
||||||
/// By default disconnect timeout is set to 3000 milliseconds.
|
/// By default disconnect timeout is set to 3000 milliseconds.
|
||||||
pub fn disconnect_timeout(mut self, dur: Duration) -> Self {
|
pub fn disconnect_timeout(mut self, dur: Duration) -> Self {
|
||||||
self.disconnect_timeout = dur;
|
self.config.disconnect_timeout = Some(dur);
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -216,7 +255,7 @@ where
|
||||||
#[cfg(not(any(feature = "openssl", feature = "rustls")))]
|
#[cfg(not(any(feature = "openssl", feature = "rustls")))]
|
||||||
{
|
{
|
||||||
let connector = TimeoutService::new(
|
let connector = TimeoutService::new(
|
||||||
self.timeout,
|
self.config.timeout,
|
||||||
apply_fn(self.connector, |msg: Connect, srv| {
|
apply_fn(self.connector, |msg: Connect, srv| {
|
||||||
srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr))
|
srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr))
|
||||||
})
|
})
|
||||||
|
@ -231,10 +270,7 @@ where
|
||||||
connect_impl::InnerConnector {
|
connect_impl::InnerConnector {
|
||||||
tcp_pool: ConnectionPool::new(
|
tcp_pool: ConnectionPool::new(
|
||||||
connector,
|
connector,
|
||||||
self.conn_lifetime,
|
self.config.no_disconnect_timeout(),
|
||||||
self.conn_keep_alive,
|
|
||||||
None,
|
|
||||||
self.limit,
|
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -248,7 +284,7 @@ where
|
||||||
use actix_service::{boxed::service, pipeline};
|
use actix_service::{boxed::service, pipeline};
|
||||||
|
|
||||||
let ssl_service = TimeoutService::new(
|
let ssl_service = TimeoutService::new(
|
||||||
self.timeout,
|
self.config.timeout,
|
||||||
pipeline(
|
pipeline(
|
||||||
apply_fn(self.connector.clone(), |msg: Connect, srv| {
|
apply_fn(self.connector.clone(), |msg: Connect, srv| {
|
||||||
srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr))
|
srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr))
|
||||||
|
@ -301,7 +337,7 @@ where
|
||||||
});
|
});
|
||||||
|
|
||||||
let tcp_service = TimeoutService::new(
|
let tcp_service = TimeoutService::new(
|
||||||
self.timeout,
|
self.config.timeout,
|
||||||
apply_fn(self.connector, |msg: Connect, srv| {
|
apply_fn(self.connector, |msg: Connect, srv| {
|
||||||
srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr))
|
srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr))
|
||||||
})
|
})
|
||||||
|
@ -316,18 +352,9 @@ where
|
||||||
connect_impl::InnerConnector {
|
connect_impl::InnerConnector {
|
||||||
tcp_pool: ConnectionPool::new(
|
tcp_pool: ConnectionPool::new(
|
||||||
tcp_service,
|
tcp_service,
|
||||||
self.conn_lifetime,
|
self.config.no_disconnect_timeout(),
|
||||||
self.conn_keep_alive,
|
|
||||||
None,
|
|
||||||
self.limit,
|
|
||||||
),
|
|
||||||
ssl_pool: ConnectionPool::new(
|
|
||||||
ssl_service,
|
|
||||||
self.conn_lifetime,
|
|
||||||
self.conn_keep_alive,
|
|
||||||
Some(self.disconnect_timeout),
|
|
||||||
self.limit,
|
|
||||||
),
|
),
|
||||||
|
ssl_pool: ConnectionPool::new(ssl_service, self.config),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,12 +1,15 @@
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
use std::time;
|
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
|
use std::time;
|
||||||
|
|
||||||
use actix_codec::{AsyncRead, AsyncWrite};
|
use actix_codec::{AsyncRead, AsyncWrite};
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures_util::future::poll_fn;
|
use futures_util::future::poll_fn;
|
||||||
use futures_util::pin_mut;
|
use futures_util::pin_mut;
|
||||||
use h2::{client::{SendRequest, Connection, Builder}, SendStream};
|
use h2::{
|
||||||
|
client::{Builder, Connection, SendRequest},
|
||||||
|
SendStream,
|
||||||
|
};
|
||||||
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING};
|
use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING};
|
||||||
use http::{request::Request, Method, Version};
|
use http::{request::Request, Method, Version};
|
||||||
|
|
||||||
|
@ -15,6 +18,7 @@ use crate::header::HeaderMap;
|
||||||
use crate::message::{RequestHeadType, ResponseHead};
|
use crate::message::{RequestHeadType, ResponseHead};
|
||||||
use crate::payload::Payload;
|
use crate::payload::Payload;
|
||||||
|
|
||||||
|
use super::config::ConnectorConfig;
|
||||||
use super::connection::{ConnectionType, IoConnection};
|
use super::connection::{ConnectionType, IoConnection};
|
||||||
use super::error::SendRequestError;
|
use super::error::SendRequestError;
|
||||||
use super::pool::Acquired;
|
use super::pool::Acquired;
|
||||||
|
@ -187,20 +191,17 @@ fn release<T: AsyncRead + AsyncWrite + Unpin + 'static>(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn handshake<Io>(
|
||||||
// These values are taken from hyper/src/proto/h2/client.rs
|
io: Io,
|
||||||
const DEFAULT_H2_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb
|
config: &ConnectorConfig,
|
||||||
const DEFAULT_H2_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb
|
) -> impl Future<Output = Result<(SendRequest<Bytes>, Connection<Io, Bytes>), h2::Error>>
|
||||||
|
|
||||||
pub(crate) fn handshake<Io>(io: Io)
|
|
||||||
-> impl Future<Output=Result<(SendRequest<Bytes>, Connection<Io, Bytes>), h2::Error>>
|
|
||||||
where
|
where
|
||||||
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||||
{
|
{
|
||||||
let mut builder = Builder::new();
|
let mut builder = Builder::new();
|
||||||
builder
|
builder
|
||||||
.initial_window_size(DEFAULT_H2_CONN_WINDOW)
|
.initial_window_size(config.stream_window_size)
|
||||||
.initial_connection_window_size(DEFAULT_H2_STREAM_WINDOW)
|
.initial_connection_window_size(config.conn_window_size)
|
||||||
.enable_push(false);
|
.enable_push(false);
|
||||||
builder.handshake(io)
|
builder.handshake(io)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
//! Http client api
|
//! Http client api
|
||||||
use http::Uri;
|
use http::Uri;
|
||||||
|
|
||||||
|
mod config;
|
||||||
mod connection;
|
mod connection;
|
||||||
mod connector;
|
mod connector;
|
||||||
mod error;
|
mod error;
|
||||||
|
|
|
@ -19,9 +19,10 @@ use indexmap::IndexSet;
|
||||||
use pin_project::pin_project;
|
use pin_project::pin_project;
|
||||||
use slab::Slab;
|
use slab::Slab;
|
||||||
|
|
||||||
use super::h2proto::handshake;
|
use super::config::ConnectorConfig;
|
||||||
use super::connection::{ConnectionType, IoConnection};
|
use super::connection::{ConnectionType, IoConnection};
|
||||||
use super::error::ConnectError;
|
use super::error::ConnectError;
|
||||||
|
use super::h2proto::handshake;
|
||||||
use super::Connect;
|
use super::Connect;
|
||||||
|
|
||||||
#[derive(Clone, Copy, PartialEq)]
|
#[derive(Clone, Copy, PartialEq)]
|
||||||
|
@ -51,20 +52,11 @@ where
|
||||||
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
|
T: Service<Request = Connect, Response = (Io, Protocol), Error = ConnectError>
|
||||||
+ 'static,
|
+ 'static,
|
||||||
{
|
{
|
||||||
pub(crate) fn new(
|
pub(crate) fn new(connector: T, config: ConnectorConfig) -> Self {
|
||||||
connector: T,
|
|
||||||
conn_lifetime: Duration,
|
|
||||||
conn_keep_alive: Duration,
|
|
||||||
disconnect_timeout: Option<Duration>,
|
|
||||||
limit: usize,
|
|
||||||
) -> Self {
|
|
||||||
ConnectionPool(
|
ConnectionPool(
|
||||||
Rc::new(RefCell::new(connector)),
|
Rc::new(RefCell::new(connector)),
|
||||||
Rc::new(RefCell::new(Inner {
|
Rc::new(RefCell::new(Inner {
|
||||||
conn_lifetime,
|
config,
|
||||||
conn_keep_alive,
|
|
||||||
disconnect_timeout,
|
|
||||||
limit,
|
|
||||||
acquired: 0,
|
acquired: 0,
|
||||||
waiters: Slab::new(),
|
waiters: Slab::new(),
|
||||||
waiters_queue: IndexSet::new(),
|
waiters_queue: IndexSet::new(),
|
||||||
|
@ -130,6 +122,8 @@ where
|
||||||
// open tcp connection
|
// open tcp connection
|
||||||
let (io, proto) = connector.call(req).await?;
|
let (io, proto) = connector.call(req).await?;
|
||||||
|
|
||||||
|
let config = inner.borrow().config.clone();
|
||||||
|
|
||||||
let guard = OpenGuard::new(key, inner);
|
let guard = OpenGuard::new(key, inner);
|
||||||
|
|
||||||
if proto == Protocol::Http1 {
|
if proto == Protocol::Http1 {
|
||||||
|
@ -139,7 +133,7 @@ where
|
||||||
Some(guard.consume()),
|
Some(guard.consume()),
|
||||||
))
|
))
|
||||||
} else {
|
} else {
|
||||||
let (snd, connection) = handshake(io).await?;
|
let (snd, connection) = handshake(io, &config).await?;
|
||||||
actix_rt::spawn(connection.map(|_| ()));
|
actix_rt::spawn(connection.map(|_| ()));
|
||||||
Ok(IoConnection::new(
|
Ok(IoConnection::new(
|
||||||
ConnectionType::H2(snd),
|
ConnectionType::H2(snd),
|
||||||
|
@ -256,10 +250,7 @@ struct AvailableConnection<Io> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) struct Inner<Io> {
|
pub(crate) struct Inner<Io> {
|
||||||
conn_lifetime: Duration,
|
config: ConnectorConfig,
|
||||||
conn_keep_alive: Duration,
|
|
||||||
disconnect_timeout: Option<Duration>,
|
|
||||||
limit: usize,
|
|
||||||
acquired: usize,
|
acquired: usize,
|
||||||
available: FxHashMap<Key, VecDeque<AvailableConnection<Io>>>,
|
available: FxHashMap<Key, VecDeque<AvailableConnection<Io>>>,
|
||||||
waiters: Slab<
|
waiters: Slab<
|
||||||
|
@ -312,7 +303,7 @@ where
|
||||||
|
|
||||||
fn acquire(&mut self, key: &Key, cx: &mut Context<'_>) -> Acquire<Io> {
|
fn acquire(&mut self, key: &Key, cx: &mut Context<'_>) -> Acquire<Io> {
|
||||||
// check limits
|
// check limits
|
||||||
if self.limit > 0 && self.acquired >= self.limit {
|
if self.config.limit > 0 && self.acquired >= self.config.limit {
|
||||||
return Acquire::NotAvailable;
|
return Acquire::NotAvailable;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -324,10 +315,10 @@ where
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
while let Some(conn) = connections.pop_back() {
|
while let Some(conn) = connections.pop_back() {
|
||||||
// check if it still usable
|
// check if it still usable
|
||||||
if (now - conn.used) > self.conn_keep_alive
|
if (now - conn.used) > self.config.conn_keep_alive
|
||||||
|| (now - conn.created) > self.conn_lifetime
|
|| (now - conn.created) > self.config.conn_lifetime
|
||||||
{
|
{
|
||||||
if let Some(timeout) = self.disconnect_timeout {
|
if let Some(timeout) = self.config.disconnect_timeout {
|
||||||
if let ConnectionType::H1(io) = conn.io {
|
if let ConnectionType::H1(io) = conn.io {
|
||||||
actix_rt::spawn(CloseConnection::new(io, timeout))
|
actix_rt::spawn(CloseConnection::new(io, timeout))
|
||||||
}
|
}
|
||||||
|
@ -339,7 +330,7 @@ where
|
||||||
match Pin::new(s).poll_read(cx, &mut buf) {
|
match Pin::new(s).poll_read(cx, &mut buf) {
|
||||||
Poll::Pending => (),
|
Poll::Pending => (),
|
||||||
Poll::Ready(Ok(n)) if n > 0 => {
|
Poll::Ready(Ok(n)) if n > 0 => {
|
||||||
if let Some(timeout) = self.disconnect_timeout {
|
if let Some(timeout) = self.config.disconnect_timeout {
|
||||||
if let ConnectionType::H1(io) = io {
|
if let ConnectionType::H1(io) = io {
|
||||||
actix_rt::spawn(CloseConnection::new(
|
actix_rt::spawn(CloseConnection::new(
|
||||||
io, timeout,
|
io, timeout,
|
||||||
|
@ -373,7 +364,7 @@ where
|
||||||
|
|
||||||
fn release_close(&mut self, io: ConnectionType<Io>) {
|
fn release_close(&mut self, io: ConnectionType<Io>) {
|
||||||
self.acquired -= 1;
|
self.acquired -= 1;
|
||||||
if let Some(timeout) = self.disconnect_timeout {
|
if let Some(timeout) = self.config.disconnect_timeout {
|
||||||
if let ConnectionType::H1(io) = io {
|
if let ConnectionType::H1(io) = io {
|
||||||
actix_rt::spawn(CloseConnection::new(io, timeout))
|
actix_rt::spawn(CloseConnection::new(io, timeout))
|
||||||
}
|
}
|
||||||
|
@ -382,7 +373,7 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
fn check_availibility(&self) {
|
fn check_availibility(&self) {
|
||||||
if !self.waiters_queue.is_empty() && self.acquired < self.limit {
|
if !self.waiters_queue.is_empty() && self.acquired < self.config.limit {
|
||||||
self.waker.wake();
|
self.waker.wake();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -481,6 +472,7 @@ where
|
||||||
tx,
|
tx,
|
||||||
this.inner.clone(),
|
this.inner.clone(),
|
||||||
this.connector.call(connect),
|
this.connector.call(connect),
|
||||||
|
inner.config.clone(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -507,6 +499,7 @@ where
|
||||||
>,
|
>,
|
||||||
rx: Option<oneshot::Sender<Result<IoConnection<Io>, ConnectError>>>,
|
rx: Option<oneshot::Sender<Result<IoConnection<Io>, ConnectError>>>,
|
||||||
inner: Option<Rc<RefCell<Inner<Io>>>>,
|
inner: Option<Rc<RefCell<Inner<Io>>>>,
|
||||||
|
config: ConnectorConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F, Io> OpenWaitingConnection<F, Io>
|
impl<F, Io> OpenWaitingConnection<F, Io>
|
||||||
|
@ -519,6 +512,7 @@ where
|
||||||
rx: oneshot::Sender<Result<IoConnection<Io>, ConnectError>>,
|
rx: oneshot::Sender<Result<IoConnection<Io>, ConnectError>>,
|
||||||
inner: Rc<RefCell<Inner<Io>>>,
|
inner: Rc<RefCell<Inner<Io>>>,
|
||||||
fut: F,
|
fut: F,
|
||||||
|
config: ConnectorConfig,
|
||||||
) {
|
) {
|
||||||
actix_rt::spawn(OpenWaitingConnection {
|
actix_rt::spawn(OpenWaitingConnection {
|
||||||
key,
|
key,
|
||||||
|
@ -526,6 +520,7 @@ where
|
||||||
h2: None,
|
h2: None,
|
||||||
rx: Some(rx),
|
rx: Some(rx),
|
||||||
inner: Some(inner),
|
inner: Some(inner),
|
||||||
|
config,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -595,7 +590,7 @@ where
|
||||||
)));
|
)));
|
||||||
Poll::Ready(())
|
Poll::Ready(())
|
||||||
} else {
|
} else {
|
||||||
*this.h2 = Some(handshake(io).boxed_local());
|
*this.h2 = Some(handshake(io, this.config).boxed_local());
|
||||||
self.poll(cx)
|
self.poll(cx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue