mirror of https://github.com/fafhrd91/actix-web
rework client connection pool
This commit is contained in:
parent
68d1bd88b1
commit
7e38644b60
|
@ -4,6 +4,10 @@
|
|||
### Changed
|
||||
* Feature `cookies` is now optional and disabled by default. [#1981]
|
||||
|
||||
### Removed
|
||||
* re-export of `futures_channel::oneshot::Canceled` is removed from `error` mod.
|
||||
* `ResponseError` impl for `futures_channel::oneshot::Canceled` is removed.
|
||||
|
||||
[#1981]: https://github.com/actix/actix-web/pull/1981
|
||||
|
||||
|
||||
|
|
|
@ -59,13 +59,11 @@ cfg-if = "1"
|
|||
cookie = { version = "0.14.1", features = ["percent-encode"], optional = true }
|
||||
derive_more = "0.99.5"
|
||||
encoding_rs = "0.8"
|
||||
futures-channel = { version = "0.3.7", default-features = false, features = ["alloc"] }
|
||||
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
|
||||
futures-util = { version = "0.3.7", default-features = false, features = ["alloc", "sink"] }
|
||||
h2 = "0.3.0"
|
||||
http = "0.2.2"
|
||||
httparse = "1.3"
|
||||
indexmap = "1.3"
|
||||
itoa = "0.4"
|
||||
language-tags = "0.2"
|
||||
lazy_static = "1.4"
|
||||
|
@ -79,9 +77,9 @@ serde = "1.0"
|
|||
serde_json = "1.0"
|
||||
serde_urlencoded = "0.7"
|
||||
sha-1 = "0.9"
|
||||
slab = "0.4"
|
||||
smallvec = "1.6"
|
||||
time = { version = "0.2.23", default-features = false, features = ["std"] }
|
||||
tokio = { version = "1.2", features = ["sync"] }
|
||||
|
||||
# compression
|
||||
brotli2 = { version="0.3.2", optional = true }
|
||||
|
|
|
@ -103,7 +103,10 @@ pub(crate) trait ConnectionLifetime: AsyncRead + AsyncWrite + 'static {
|
|||
|
||||
#[doc(hidden)]
|
||||
/// HTTP client connection
|
||||
pub struct IoConnection<T> {
|
||||
pub struct IoConnection<T>
|
||||
where
|
||||
T: AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
io: Option<ConnectionType<T>>,
|
||||
created: time::Instant,
|
||||
pool: Option<Acquired<T>>,
|
||||
|
@ -111,7 +114,7 @@ pub struct IoConnection<T> {
|
|||
|
||||
impl<T> fmt::Debug for IoConnection<T>
|
||||
where
|
||||
T: fmt::Debug,
|
||||
T: AsyncWrite + Unpin + fmt::Debug + 'static,
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self.io {
|
||||
|
@ -202,7 +205,11 @@ where
|
|||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub(crate) enum EitherConnection<A, B> {
|
||||
pub(crate) enum EitherConnection<A, B>
|
||||
where
|
||||
A: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
B: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
A(IoConnection<A>),
|
||||
B(IoConnection<B>),
|
||||
}
|
||||
|
|
|
@ -165,7 +165,10 @@ where
|
|||
|
||||
#[doc(hidden)]
|
||||
/// HTTP client connection
|
||||
pub struct H1Connection<T> {
|
||||
pub struct H1Connection<T>
|
||||
where
|
||||
T: AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
/// T should be `Unpin`
|
||||
io: Option<T>,
|
||||
created: time::Instant,
|
||||
|
|
|
@ -1,32 +1,27 @@
|
|||
use std::cell::RefCell;
|
||||
use std::collections::VecDeque;
|
||||
use std::future::Future;
|
||||
use std::ops::Deref;
|
||||
use std::pin::Pin;
|
||||
use std::rc::Rc;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
|
||||
use actix_rt::time::{sleep, Sleep};
|
||||
use actix_service::Service;
|
||||
use actix_utils::task::LocalWaker;
|
||||
use ahash::AHashMap;
|
||||
use bytes::Bytes;
|
||||
use futures_channel::oneshot;
|
||||
use futures_core::future::LocalBoxFuture;
|
||||
use futures_util::future::{poll_fn, FutureExt};
|
||||
use h2::client::{Connection, SendRequest};
|
||||
use http::uri::Authority;
|
||||
use indexmap::IndexSet;
|
||||
use pin_project::pin_project;
|
||||
use slab::Slab;
|
||||
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
||||
|
||||
use super::config::ConnectorConfig;
|
||||
use super::connection::{ConnectionType, IoConnection};
|
||||
use super::connection::{ConnectionType, H2Connection, IoConnection};
|
||||
use super::error::ConnectError;
|
||||
use super::h2proto::handshake;
|
||||
use super::Connect;
|
||||
use crate::client::connection::H2Connection;
|
||||
|
||||
#[derive(Clone, Copy, PartialEq)]
|
||||
/// Protocol version
|
||||
|
@ -46,358 +41,281 @@ impl From<Authority> for Key {
|
|||
}
|
||||
}
|
||||
|
||||
/// Connections pool
|
||||
pub(crate) struct ConnectionPool<T, Io: 'static>(Rc<T>, Rc<RefCell<Inner<Io>>>);
|
||||
|
||||
impl<T, Io> ConnectionPool<T, Io>
|
||||
/// Connections pool for reuse Io type for certain [`http::uri::Authority`] as key
|
||||
pub(crate) struct ConnectionPool<S, Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
T: Service<Connect, Response = (Io, Protocol), Error = ConnectError> + 'static,
|
||||
Io: AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
pub(crate) fn new(connector: T, config: ConnectorConfig) -> Self {
|
||||
let connector_rc = Rc::new(connector);
|
||||
let inner_rc = Rc::new(RefCell::new(Inner {
|
||||
config,
|
||||
acquired: 0,
|
||||
waiters: Slab::new(),
|
||||
waiters_queue: IndexSet::new(),
|
||||
available: AHashMap::default(),
|
||||
waker: LocalWaker::new(),
|
||||
}));
|
||||
connector: Rc<S>,
|
||||
inner: ConnectionPoolInner<Io>,
|
||||
}
|
||||
|
||||
// start support future
|
||||
actix_rt::spawn(ConnectorPoolSupport {
|
||||
connector: Rc::clone(&connector_rc),
|
||||
inner: Rc::clone(&inner_rc),
|
||||
});
|
||||
/// wrapper type for check the ref count of Rc.
|
||||
struct ConnectionPoolInner<Io>(Rc<ConnectionPoolInnerPriv<Io>>)
|
||||
where
|
||||
Io: AsyncWrite + Unpin + 'static;
|
||||
|
||||
ConnectionPool(connector_rc, inner_rc)
|
||||
impl<Io> ConnectionPoolInner<Io>
|
||||
where
|
||||
Io: AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
/// spawn a async for graceful shutdown h1 Io type with a timeout.
|
||||
fn close(&self, conn: ConnectionType<Io>) {
|
||||
if let Some(timeout) = self.config.disconnect_timeout {
|
||||
if let ConnectionType::H1(io) = conn {
|
||||
actix_rt::spawn(CloseConnection::new(io, timeout));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, Io> Clone for ConnectionPool<T, Io>
|
||||
impl<Io> Clone for ConnectionPoolInner<Io>
|
||||
where
|
||||
Io: 'static,
|
||||
Io: AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
ConnectionPool(self.0.clone(), self.1.clone())
|
||||
Self(Rc::clone(&self.0))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, Io> Drop for ConnectionPool<T, Io> {
|
||||
fn drop(&mut self) {
|
||||
// wake up the ConnectorPoolSupport when dropping so it can exit properly.
|
||||
self.1.borrow().waker.wake();
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, Io> Service<Connect> for ConnectionPool<T, Io>
|
||||
impl<Io> Deref for ConnectionPoolInner<Io>
|
||||
where
|
||||
Io: AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
type Target = ConnectionPoolInnerPriv<Io>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&*self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<Io> Drop for ConnectionPoolInner<Io>
|
||||
where
|
||||
Io: AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
fn drop(&mut self) {
|
||||
// When strong count is one it means the pool is dropped
|
||||
// remove and drop all Io types.
|
||||
if Rc::strong_count(&self.0) == 1 {
|
||||
self.permits.close();
|
||||
std::mem::take(&mut *self.available.borrow_mut())
|
||||
.into_iter()
|
||||
.for_each(|(_, conns)| {
|
||||
conns.into_iter().for_each(|pooled| self.close(pooled.conn))
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct ConnectionPoolInnerPriv<Io>
|
||||
where
|
||||
Io: AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
config: ConnectorConfig,
|
||||
available: RefCell<AHashMap<Key, VecDeque<PooledConnection<Io>>>>,
|
||||
permits: Arc<Semaphore>,
|
||||
}
|
||||
|
||||
impl<S, Io> ConnectionPool<S, Io>
|
||||
where
|
||||
Io: AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
/// construct a new connection pool.
|
||||
///
|
||||
/// [`super::config::ConnectorConfig`]'s `limit` is used as the max permits allowed
|
||||
/// for on flight connections.
|
||||
///
|
||||
/// The pool can only have equal to `limit` amount of requests spawning/using Io type
|
||||
/// concurrently.
|
||||
///
|
||||
/// Any requests beyond limit would be wait in fifo order and get notified in async
|
||||
/// manner by [`tokio::sync::Semaphore`]
|
||||
pub(crate) fn new(connector: S, config: ConnectorConfig) -> Self {
|
||||
let permits = Arc::new(Semaphore::new(config.limit));
|
||||
let available = RefCell::new(AHashMap::default());
|
||||
let connector = Rc::new(connector);
|
||||
|
||||
let inner = ConnectionPoolInner(Rc::new(ConnectionPoolInnerPriv {
|
||||
config,
|
||||
available,
|
||||
permits,
|
||||
}));
|
||||
|
||||
Self { connector, inner }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, Io> Clone for ConnectionPool<S, Io>
|
||||
where
|
||||
Io: AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
connector: self.connector.clone(),
|
||||
inner: self.inner.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, Io> Service<Connect> for ConnectionPool<S, Io>
|
||||
where
|
||||
S: Service<Connect, Response = (Io, Protocol), Error = ConnectError> + 'static,
|
||||
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
T: Service<Connect, Response = (Io, Protocol), Error = ConnectError> + 'static,
|
||||
{
|
||||
type Response = IoConnection<Io>;
|
||||
type Error = ConnectError;
|
||||
type Future = LocalBoxFuture<'static, Result<IoConnection<Io>, ConnectError>>;
|
||||
|
||||
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.0.poll_ready(cx)
|
||||
self.connector.poll_ready(cx)
|
||||
}
|
||||
|
||||
fn call(&self, req: Connect) -> Self::Future {
|
||||
let connector = self.0.clone();
|
||||
let inner = self.1.clone();
|
||||
let connector = self.connector.clone();
|
||||
let inner = self.inner.clone();
|
||||
|
||||
let fut = async move {
|
||||
Box::pin(async move {
|
||||
let key = if let Some(authority) = req.uri.authority() {
|
||||
authority.clone().into()
|
||||
} else {
|
||||
return Err(ConnectError::Unresolved);
|
||||
};
|
||||
|
||||
// acquire connection
|
||||
match poll_fn(|cx| Poll::Ready(inner.borrow_mut().acquire(&key, cx))).await {
|
||||
Acquire::Acquired(io, created) => {
|
||||
// use existing connection
|
||||
Ok(IoConnection::new(
|
||||
io,
|
||||
created,
|
||||
Some(Acquired(key, Some(inner))),
|
||||
))
|
||||
// acquire an owned permit and carry it with connection
|
||||
let permit = inner
|
||||
.permits
|
||||
.clone()
|
||||
.acquire_owned()
|
||||
.await
|
||||
// TODO: use specific error for semaphore acquire error
|
||||
.map_err(|_| ConnectError::NoRecords)?;
|
||||
|
||||
// check if there is idle connection for given key.
|
||||
let mut map = inner.available.borrow_mut();
|
||||
|
||||
let mut conn = None;
|
||||
if let Some(conns) = map.get_mut(&key) {
|
||||
let now = Instant::now();
|
||||
while let Some(mut c) = conns.pop_front() {
|
||||
// check the lifetime and drop connection that live for too long.
|
||||
if (now - c.used) > inner.config.conn_keep_alive
|
||||
|| (now - c.created) > inner.config.conn_lifetime
|
||||
{
|
||||
inner.close(c.conn);
|
||||
// check if the connection is still usable.
|
||||
} else {
|
||||
if let ConnectionType::H1(ref mut io) = c.conn {
|
||||
let check = ConnectionCheckFuture { io };
|
||||
match check.await {
|
||||
ConnectionState::Break => {
|
||||
inner.close(c.conn);
|
||||
continue;
|
||||
}
|
||||
Acquire::Available => {
|
||||
// open tcp connection
|
||||
ConnectionState::Skip => continue,
|
||||
ConnectionState::Live => conn = Some(c),
|
||||
}
|
||||
} else {
|
||||
conn = Some(c);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// drop map early to end the borrow_mut of RefCell.
|
||||
drop(map);
|
||||
|
||||
// construct acquired. It's used to put Io type back to pool/ close the Io type.
|
||||
// permit is carried with the whole lifecycle of Acquired.
|
||||
let acquired = Some(Acquired { key, inner, permit });
|
||||
|
||||
// match the connection and spawn new one if did not get anything.
|
||||
match conn {
|
||||
Some(conn) => {
|
||||
Ok(IoConnection::new(conn.conn, conn.created.into(), acquired))
|
||||
}
|
||||
None => {
|
||||
let (io, proto) = connector.call(req).await?;
|
||||
|
||||
let config = inner.borrow().config.clone();
|
||||
|
||||
let guard = OpenGuard::new(key, inner);
|
||||
|
||||
if proto == Protocol::Http1 {
|
||||
Ok(IoConnection::new(
|
||||
ConnectionType::H1(io),
|
||||
Instant::now(),
|
||||
Some(guard.consume()),
|
||||
Instant::now().into(),
|
||||
acquired,
|
||||
))
|
||||
} else {
|
||||
let (sender, connection) = handshake(io, &config).await?;
|
||||
let config = &acquired.as_ref().unwrap().inner.config;
|
||||
let (sender, connection) = handshake(io, config).await?;
|
||||
Ok(IoConnection::new(
|
||||
ConnectionType::H2(H2Connection::new(sender, connection)),
|
||||
Instant::now(),
|
||||
Some(guard.consume()),
|
||||
Instant::now().into(),
|
||||
acquired,
|
||||
))
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
// connection is not available, wait
|
||||
let (rx, token) = inner.borrow_mut().wait_for(req);
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
let guard = WaiterGuard::new(key, token, inner);
|
||||
let res = match rx.await {
|
||||
Err(_) => Err(ConnectError::Disconnected),
|
||||
Ok(res) => res,
|
||||
};
|
||||
guard.consume();
|
||||
res
|
||||
}
|
||||
/// Type for check the connection and determine if it's usable.
|
||||
struct ConnectionCheckFuture<'a, Io> {
|
||||
io: &'a mut Io,
|
||||
}
|
||||
|
||||
enum ConnectionState {
|
||||
Live,
|
||||
Break,
|
||||
Skip,
|
||||
}
|
||||
|
||||
impl<Io> Future for ConnectionCheckFuture<'_, Io>
|
||||
where
|
||||
Io: AsyncRead + Unpin,
|
||||
{
|
||||
type Output = ConnectionState;
|
||||
|
||||
// this future is only used to get access to Context.
|
||||
// It should never return Poll::Pending.
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.get_mut();
|
||||
let mut buf = [0; 2];
|
||||
let mut read_buf = ReadBuf::new(&mut buf);
|
||||
|
||||
let state = match Pin::new(&mut this.io).poll_read(cx, &mut read_buf) {
|
||||
// io is pending and new data would wake up it.
|
||||
Poll::Pending => ConnectionState::Live,
|
||||
// io have data inside. drop it.
|
||||
Poll::Ready(Ok(())) if !read_buf.filled().is_empty() => {
|
||||
ConnectionState::Break
|
||||
}
|
||||
// otherwise skip to next.
|
||||
_ => ConnectionState::Skip,
|
||||
};
|
||||
|
||||
fut.boxed_local()
|
||||
Poll::Ready(state)
|
||||
}
|
||||
}
|
||||
|
||||
struct WaiterGuard<Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
key: Key,
|
||||
token: usize,
|
||||
inner: Option<Rc<RefCell<Inner<Io>>>>,
|
||||
}
|
||||
|
||||
impl<Io> WaiterGuard<Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
fn new(key: Key, token: usize, inner: Rc<RefCell<Inner<Io>>>) -> Self {
|
||||
Self {
|
||||
key,
|
||||
token,
|
||||
inner: Some(inner),
|
||||
}
|
||||
}
|
||||
|
||||
fn consume(mut self) {
|
||||
let _ = self.inner.take();
|
||||
}
|
||||
}
|
||||
|
||||
impl<Io> Drop for WaiterGuard<Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
fn drop(&mut self) {
|
||||
if let Some(i) = self.inner.take() {
|
||||
let mut inner = i.as_ref().borrow_mut();
|
||||
inner.release_waiter(&self.key, self.token);
|
||||
inner.check_availability();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct OpenGuard<Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
key: Key,
|
||||
inner: Option<Rc<RefCell<Inner<Io>>>>,
|
||||
}
|
||||
|
||||
impl<Io> OpenGuard<Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
fn new(key: Key, inner: Rc<RefCell<Inner<Io>>>) -> Self {
|
||||
Self {
|
||||
key,
|
||||
inner: Some(inner),
|
||||
}
|
||||
}
|
||||
|
||||
fn consume(mut self) -> Acquired<Io> {
|
||||
Acquired(self.key.clone(), self.inner.take())
|
||||
}
|
||||
}
|
||||
|
||||
impl<Io> Drop for OpenGuard<Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
fn drop(&mut self) {
|
||||
if let Some(i) = self.inner.take() {
|
||||
let mut inner = i.as_ref().borrow_mut();
|
||||
inner.release();
|
||||
inner.check_availability();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum Acquire<T> {
|
||||
Acquired(ConnectionType<T>, Instant),
|
||||
Available,
|
||||
NotAvailable,
|
||||
}
|
||||
|
||||
struct AvailableConnection<Io> {
|
||||
io: ConnectionType<Io>,
|
||||
struct PooledConnection<Io> {
|
||||
conn: ConnectionType<Io>,
|
||||
used: Instant,
|
||||
created: Instant,
|
||||
}
|
||||
|
||||
pub(crate) struct Inner<Io> {
|
||||
config: ConnectorConfig,
|
||||
acquired: usize,
|
||||
available: AHashMap<Key, VecDeque<AvailableConnection<Io>>>,
|
||||
waiters: Slab<
|
||||
Option<(
|
||||
Connect,
|
||||
oneshot::Sender<Result<IoConnection<Io>, ConnectError>>,
|
||||
)>,
|
||||
>,
|
||||
waiters_queue: IndexSet<(Key, usize)>,
|
||||
waker: LocalWaker,
|
||||
}
|
||||
|
||||
impl<Io> Inner<Io> {
|
||||
fn reserve(&mut self) {
|
||||
self.acquired += 1;
|
||||
}
|
||||
|
||||
fn release(&mut self) {
|
||||
self.acquired -= 1;
|
||||
}
|
||||
|
||||
fn release_waiter(&mut self, key: &Key, token: usize) {
|
||||
self.waiters.remove(token);
|
||||
let _ = self.waiters_queue.shift_remove(&(key.clone(), token));
|
||||
}
|
||||
}
|
||||
|
||||
impl<Io> Inner<Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
/// connection is not available, wait
|
||||
fn wait_for(
|
||||
&mut self,
|
||||
connect: Connect,
|
||||
) -> (
|
||||
oneshot::Receiver<Result<IoConnection<Io>, ConnectError>>,
|
||||
usize,
|
||||
) {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let key: Key = connect.uri.authority().unwrap().clone().into();
|
||||
let entry = self.waiters.vacant_entry();
|
||||
let token = entry.key();
|
||||
entry.insert(Some((connect, tx)));
|
||||
assert!(self.waiters_queue.insert((key, token)));
|
||||
|
||||
(rx, token)
|
||||
}
|
||||
|
||||
fn acquire(&mut self, key: &Key, cx: &mut Context<'_>) -> Acquire<Io> {
|
||||
// check limits
|
||||
if self.config.limit > 0 && self.acquired >= self.config.limit {
|
||||
return Acquire::NotAvailable;
|
||||
}
|
||||
|
||||
self.reserve();
|
||||
|
||||
// check if open connection is available
|
||||
// cleanup stale connections at the same time
|
||||
if let Some(ref mut connections) = self.available.get_mut(key) {
|
||||
let now = Instant::now();
|
||||
while let Some(conn) = connections.pop_back() {
|
||||
// check if it still usable
|
||||
if (now - conn.used) > self.config.conn_keep_alive
|
||||
|| (now - conn.created) > self.config.conn_lifetime
|
||||
{
|
||||
if let Some(timeout) = self.config.disconnect_timeout {
|
||||
if let ConnectionType::H1(io) = conn.io {
|
||||
actix_rt::spawn(CloseConnection::new(io, timeout));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let mut io = conn.io;
|
||||
let mut buf = [0; 2];
|
||||
let mut read_buf = ReadBuf::new(&mut buf);
|
||||
if let ConnectionType::H1(ref mut s) = io {
|
||||
match Pin::new(s).poll_read(cx, &mut read_buf) {
|
||||
Poll::Pending => {}
|
||||
Poll::Ready(Ok(())) if !read_buf.filled().is_empty() => {
|
||||
if let Some(timeout) = self.config.disconnect_timeout {
|
||||
if let ConnectionType::H1(io) = io {
|
||||
actix_rt::spawn(CloseConnection::new(
|
||||
io, timeout,
|
||||
));
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
_ => continue,
|
||||
}
|
||||
}
|
||||
return Acquire::Acquired(io, conn.created);
|
||||
}
|
||||
}
|
||||
}
|
||||
Acquire::Available
|
||||
}
|
||||
|
||||
fn release_conn(&mut self, key: &Key, io: ConnectionType<Io>, created: Instant) {
|
||||
self.acquired -= 1;
|
||||
self.available
|
||||
.entry(key.clone())
|
||||
.or_insert_with(VecDeque::new)
|
||||
.push_back(AvailableConnection {
|
||||
io,
|
||||
created,
|
||||
used: Instant::now(),
|
||||
});
|
||||
self.check_availability();
|
||||
}
|
||||
|
||||
fn release_close(&mut self, io: ConnectionType<Io>) {
|
||||
self.acquired -= 1;
|
||||
if let Some(timeout) = self.config.disconnect_timeout {
|
||||
if let ConnectionType::H1(io) = io {
|
||||
actix_rt::spawn(CloseConnection::new(io, timeout));
|
||||
}
|
||||
}
|
||||
self.check_availability();
|
||||
}
|
||||
|
||||
fn check_availability(&self) {
|
||||
if !self.waiters_queue.is_empty() && self.acquired < self.config.limit {
|
||||
self.waker.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[pin_project::pin_project]
|
||||
struct CloseConnection<T> {
|
||||
io: T,
|
||||
#[pin_project]
|
||||
struct CloseConnection<Io> {
|
||||
io: Io,
|
||||
#[pin]
|
||||
timeout: Sleep,
|
||||
}
|
||||
|
||||
impl<T> CloseConnection<T>
|
||||
impl<Io> CloseConnection<Io>
|
||||
where
|
||||
T: AsyncWrite + Unpin,
|
||||
Io: AsyncWrite + Unpin,
|
||||
{
|
||||
fn new(io: T, timeout: Duration) -> Self {
|
||||
fn new(io: Io, timeout: Duration) -> Self {
|
||||
CloseConnection {
|
||||
io,
|
||||
timeout: sleep(timeout),
|
||||
|
@ -405,9 +323,9 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<T> Future for CloseConnection<T>
|
||||
impl<Io> Future for CloseConnection<Io>
|
||||
where
|
||||
T: AsyncWrite + Unpin,
|
||||
Io: AsyncWrite + Unpin,
|
||||
{
|
||||
type Output = ();
|
||||
|
||||
|
@ -416,230 +334,46 @@ where
|
|||
|
||||
match this.timeout.poll(cx) {
|
||||
Poll::Ready(_) => Poll::Ready(()),
|
||||
Poll::Pending => match Pin::new(this.io).poll_shutdown(cx) {
|
||||
Poll::Ready(_) => Poll::Ready(()),
|
||||
Poll::Pending => Poll::Pending,
|
||||
},
|
||||
Poll::Pending => Pin::new(this.io).poll_shutdown(cx).map(|_| ()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[pin_project]
|
||||
struct ConnectorPoolSupport<T, Io>
|
||||
pub(crate) struct Acquired<Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
Io: AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
connector: Rc<T>,
|
||||
inner: Rc<RefCell<Inner<Io>>>,
|
||||
}
|
||||
|
||||
impl<T, Io> Future for ConnectorPoolSupport<T, Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
T: Service<Connect, Response = (Io, Protocol), Error = ConnectError>,
|
||||
T::Future: 'static,
|
||||
{
|
||||
type Output = ();
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.project();
|
||||
|
||||
if Rc::strong_count(this.inner) == 1 {
|
||||
// If we are last copy of Inner<Io> it means the ConnectionPool is already gone
|
||||
// and we are safe to exit.
|
||||
return Poll::Ready(());
|
||||
}
|
||||
|
||||
let mut inner = this.inner.borrow_mut();
|
||||
inner.waker.register(cx.waker());
|
||||
|
||||
// check waiters
|
||||
loop {
|
||||
let (key, token) = {
|
||||
if let Some((key, token)) = inner.waiters_queue.get_index(0) {
|
||||
(key.clone(), *token)
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
};
|
||||
if inner.waiters.get(token).unwrap().is_none() {
|
||||
continue;
|
||||
}
|
||||
|
||||
match inner.acquire(&key, cx) {
|
||||
Acquire::NotAvailable => break,
|
||||
Acquire::Acquired(io, created) => {
|
||||
let tx = inner.waiters.get_mut(token).unwrap().take().unwrap().1;
|
||||
if let Err(conn) = tx.send(Ok(IoConnection::new(
|
||||
io,
|
||||
created,
|
||||
Some(Acquired(key.clone(), Some(this.inner.clone()))),
|
||||
))) {
|
||||
let (io, created) = conn.unwrap().into_inner();
|
||||
inner.release_conn(&key, io, created);
|
||||
}
|
||||
}
|
||||
Acquire::Available => {
|
||||
let (connect, tx) =
|
||||
inner.waiters.get_mut(token).unwrap().take().unwrap();
|
||||
OpenWaitingConnection::spawn(
|
||||
key.clone(),
|
||||
tx,
|
||||
this.inner.clone(),
|
||||
this.connector.call(connect),
|
||||
inner.config.clone(),
|
||||
);
|
||||
}
|
||||
}
|
||||
let _ = inner.waiters_queue.swap_remove_index(0);
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
#[pin_project::pin_project(PinnedDrop)]
|
||||
struct OpenWaitingConnection<F, Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
#[pin]
|
||||
fut: F,
|
||||
key: Key,
|
||||
h2: Option<
|
||||
LocalBoxFuture<
|
||||
'static,
|
||||
Result<(SendRequest<Bytes>, Connection<Io, Bytes>), h2::Error>,
|
||||
>,
|
||||
>,
|
||||
rx: Option<oneshot::Sender<Result<IoConnection<Io>, ConnectError>>>,
|
||||
inner: Option<Rc<RefCell<Inner<Io>>>>,
|
||||
config: ConnectorConfig,
|
||||
inner: ConnectionPoolInner<Io>,
|
||||
permit: OwnedSemaphorePermit,
|
||||
}
|
||||
|
||||
impl<F, Io> OpenWaitingConnection<F, Io>
|
||||
where
|
||||
F: Future<Output = Result<(Io, Protocol), ConnectError>> + 'static,
|
||||
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
fn spawn(
|
||||
key: Key,
|
||||
rx: oneshot::Sender<Result<IoConnection<Io>, ConnectError>>,
|
||||
inner: Rc<RefCell<Inner<Io>>>,
|
||||
fut: F,
|
||||
config: ConnectorConfig,
|
||||
) {
|
||||
actix_rt::spawn(OpenWaitingConnection {
|
||||
key,
|
||||
fut,
|
||||
h2: None,
|
||||
rx: Some(rx),
|
||||
inner: Some(inner),
|
||||
config,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
#[pin_project::pinned_drop]
|
||||
impl<F, Io> PinnedDrop for OpenWaitingConnection<F, Io>
|
||||
impl<Io> Acquired<Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
fn drop(self: Pin<&mut Self>) {
|
||||
if let Some(inner) = self.project().inner.take() {
|
||||
let mut inner = inner.as_ref().borrow_mut();
|
||||
inner.release();
|
||||
inner.check_availability();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<F, Io> Future for OpenWaitingConnection<F, Io>
|
||||
where
|
||||
F: Future<Output = Result<(Io, Protocol), ConnectError>>,
|
||||
Io: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
type Output = ();
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.as_mut().project();
|
||||
|
||||
if let Some(ref mut h2) = this.h2 {
|
||||
return match Pin::new(h2).poll(cx) {
|
||||
Poll::Ready(Ok((sender, connection))) => {
|
||||
let rx = this.rx.take().unwrap();
|
||||
let _ = rx.send(Ok(IoConnection::new(
|
||||
ConnectionType::H2(H2Connection::new(sender, connection)),
|
||||
Instant::now(),
|
||||
Some(Acquired(this.key.clone(), this.inner.take())),
|
||||
)));
|
||||
Poll::Ready(())
|
||||
}
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(Err(err)) => {
|
||||
let _ = this.inner.take();
|
||||
if let Some(rx) = this.rx.take() {
|
||||
let _ = rx.send(Err(ConnectError::H2(err)));
|
||||
}
|
||||
Poll::Ready(())
|
||||
}
|
||||
};
|
||||
// close the Io type.
|
||||
pub(crate) fn close(&mut self, conn: IoConnection<Io>) {
|
||||
let (conn, _) = conn.into_inner();
|
||||
self.inner.close(conn);
|
||||
}
|
||||
|
||||
match this.fut.poll(cx) {
|
||||
Poll::Ready(Err(err)) => {
|
||||
let _ = this.inner.take();
|
||||
if let Some(rx) = this.rx.take() {
|
||||
let _ = rx.send(Err(err));
|
||||
}
|
||||
Poll::Ready(())
|
||||
}
|
||||
Poll::Ready(Ok((io, proto))) => {
|
||||
if proto == Protocol::Http1 {
|
||||
let rx = this.rx.take().unwrap();
|
||||
let _ = rx.send(Ok(IoConnection::new(
|
||||
ConnectionType::H1(io),
|
||||
Instant::now(),
|
||||
Some(Acquired(this.key.clone(), this.inner.take())),
|
||||
)));
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
*this.h2 = Some(handshake(io, this.config).boxed_local());
|
||||
self.poll(cx)
|
||||
}
|
||||
}
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct Acquired<T>(Key, Option<Rc<RefCell<Inner<T>>>>);
|
||||
|
||||
impl<T> Acquired<T>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
pub(crate) fn close(&mut self, conn: IoConnection<T>) {
|
||||
if let Some(inner) = self.1.take() {
|
||||
let (io, _) = conn.into_inner();
|
||||
inner.as_ref().borrow_mut().release_close(io);
|
||||
}
|
||||
}
|
||||
pub(crate) fn release(&mut self, conn: IoConnection<T>) {
|
||||
if let Some(inner) = self.1.take() {
|
||||
// put the Io type back to pool.
|
||||
pub(crate) fn release(&mut self, conn: IoConnection<Io>) {
|
||||
let (io, created) = conn.into_inner();
|
||||
let Acquired { key, inner, .. } = self;
|
||||
inner
|
||||
.as_ref()
|
||||
.available
|
||||
.borrow_mut()
|
||||
.release_conn(&self.0, io, created);
|
||||
}
|
||||
}
|
||||
}
|
||||
.entry(key.clone())
|
||||
.or_insert_with(VecDeque::new)
|
||||
.push_back(PooledConnection {
|
||||
conn: io,
|
||||
created,
|
||||
used: Instant::now(),
|
||||
});
|
||||
|
||||
impl<T> Drop for Acquired<T> {
|
||||
fn drop(&mut self) {
|
||||
if let Some(inner) = self.1.take() {
|
||||
inner.as_ref().borrow_mut().release();
|
||||
}
|
||||
// a no op bind. used to stop clippy warning without adding allow attribute.
|
||||
let _permit = &mut self.permit;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,7 +11,6 @@ use actix_utils::dispatcher::DispatcherError as FramedDispatcherError;
|
|||
use actix_utils::timeout::TimeoutError;
|
||||
use bytes::BytesMut;
|
||||
use derive_more::{Display, From};
|
||||
pub use futures_channel::oneshot::Canceled;
|
||||
use http::uri::InvalidUri;
|
||||
use http::{header, Error as HttpError, StatusCode};
|
||||
use serde::de::value::Error as DeError;
|
||||
|
@ -186,9 +185,6 @@ impl ResponseError for DeError {
|
|||
}
|
||||
}
|
||||
|
||||
/// Returns [`StatusCode::INTERNAL_SERVER_ERROR`] for [`Canceled`].
|
||||
impl ResponseError for Canceled {}
|
||||
|
||||
/// Returns [`StatusCode::BAD_REQUEST`] for [`Utf8Error`].
|
||||
impl ResponseError for Utf8Error {
|
||||
fn status_code(&self) -> StatusCode {
|
||||
|
|
Loading…
Reference in New Issue