From 22f251de1a7c255f980a82040744f4d2a205534e Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sun, 21 Feb 2021 13:42:02 -0800 Subject: [PATCH] expose multiple types related to InnerConnector --- actix-http/src/client/connection.rs | 2 +- actix-http/src/client/connector.rs | 188 +++++++++++++--------------- actix-http/src/client/pool.rs | 2 +- 3 files changed, 86 insertions(+), 106 deletions(-) diff --git a/actix-http/src/client/connection.rs b/actix-http/src/client/connection.rs index 97ecd0515..fcbbf1c92 100644 --- a/actix-http/src/client/connection.rs +++ b/actix-http/src/client/connection.rs @@ -187,7 +187,7 @@ impl IoConnection { } #[allow(dead_code)] -pub(crate) enum EitherIoConnection +pub enum EitherIoConnection where A: AsyncRead + AsyncWrite + Unpin + 'static, B: AsyncRead + AsyncWrite + Unpin + 'static, diff --git a/actix-http/src/client/connector.rs b/actix-http/src/client/connector.rs index 78d488d9c..138808613 100644 --- a/actix-http/src/client/connector.rs +++ b/actix-http/src/client/connector.rs @@ -1,5 +1,8 @@ use std::fmt; +use std::future::Future; use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::time::Duration; use actix_codec::{AsyncRead, AsyncWrite}; @@ -12,7 +15,7 @@ use actix_utils::timeout::{TimeoutError, TimeoutService}; use http::Uri; use super::config::ConnectorConfig; -use super::connection::Connection; +use super::connection::EitherIoConnection; use super::error::ConnectError; use super::pool::{ConnectionPool, Protocol}; use super::Connect; @@ -55,7 +58,7 @@ pub struct Connector { _phantom: PhantomData, } -trait Io: AsyncRead + AsyncWrite + Unpin {} +pub trait Io: AsyncRead + AsyncWrite + Unpin {} impl Io for T {} impl Connector<(), ()> { @@ -242,8 +245,12 @@ where /// its combinator chain. pub fn finish( self, - ) -> impl Service + Clone - { + ) -> InnerConnector< + impl Service, + impl Service, Protocol), Error = ConnectError>, + U, + Box, + > { let tcp_service = TimeoutService::new( self.config.timeout, apply_fn(self.connector.clone(), |msg: Connect, srv| { @@ -259,22 +266,7 @@ where #[cfg(not(any(feature = "openssl", feature = "rustls")))] { - use futures_core::future::LocalBoxFuture; - - // A dummy service for annotate tls pool's type signature. - type DummyService = Box< - dyn Service< - Connect, - Response = (Box, Protocol), - Error = ConnectError, - Future = LocalBoxFuture< - 'static, - Result<(Box, Protocol), ConnectError>, - >, - >, - >; - - connect_impl::InnerConnector::<_, DummyService, _, Box> { + InnerConnector { tcp_pool: ConnectionPool::new( tcp_service, self.config.no_disconnect_timeout(), @@ -282,6 +274,7 @@ where tls_pool: None, } } + #[cfg(any(feature = "openssl", feature = "rustls"))] { const H2: &[u8] = b"h2"; @@ -344,7 +337,7 @@ where TimeoutError::Timeout => ConnectError::Timeout, }); - connect_impl::InnerConnector { + InnerConnector { tcp_pool: ConnectionPool::new( tcp_service, self.config.no_disconnect_timeout(), @@ -355,99 +348,86 @@ where } } -mod connect_impl { - use std::future::Future; - use std::pin::Pin; - use std::task::{Context, Poll}; +pub struct InnerConnector +where + S1: Service + 'static, + S2: Service + 'static, + Io1: AsyncRead + AsyncWrite + Unpin + 'static, + Io2: AsyncRead + AsyncWrite + Unpin + 'static, +{ + tcp_pool: ConnectionPool, + tls_pool: Option>, +} - use super::*; - use crate::client::connection::EitherIoConnection; - - pub(crate) struct InnerConnector - where - S1: Service + 'static, - S2: Service + 'static, - Io1: AsyncRead + AsyncWrite + Unpin + 'static, - Io2: AsyncRead + AsyncWrite + Unpin + 'static, - { - pub(crate) tcp_pool: ConnectionPool, - pub(crate) tls_pool: Option>, - } - - impl Clone for InnerConnector - where - S1: Service + 'static, - S2: Service + 'static, - Io1: AsyncRead + AsyncWrite + Unpin + 'static, - Io2: AsyncRead + AsyncWrite + Unpin + 'static, - { - fn clone(&self) -> Self { - InnerConnector { - tcp_pool: self.tcp_pool.clone(), - tls_pool: self.tls_pool.as_ref().cloned(), - } +impl Clone for InnerConnector +where + S1: Service + 'static, + S2: Service + 'static, + Io1: AsyncRead + AsyncWrite + Unpin + 'static, + Io2: AsyncRead + AsyncWrite + Unpin + 'static, +{ + fn clone(&self) -> Self { + InnerConnector { + tcp_pool: self.tcp_pool.clone(), + tls_pool: self.tls_pool.as_ref().cloned(), } } +} - impl Service for InnerConnector - where - S1: Service + 'static, - S2: Service + 'static, - Io1: AsyncRead + AsyncWrite + Unpin + 'static, - Io2: AsyncRead + AsyncWrite + Unpin + 'static, - { - type Response = EitherIoConnection; - type Error = ConnectError; - type Future = InnerConnectorResponse; +impl Service for InnerConnector +where + S1: Service + 'static, + S2: Service + 'static, + Io1: AsyncRead + AsyncWrite + Unpin + 'static, + Io2: AsyncRead + AsyncWrite + Unpin + 'static, +{ + type Response = EitherIoConnection; + type Error = ConnectError; + type Future = InnerConnectorResponse; - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - self.tcp_pool.poll_ready(cx) - } - - fn call(&self, req: Connect) -> Self::Future { - match req.uri.scheme_str() { - Some("https") | Some("wss") => match self.tls_pool { - None => InnerConnectorResponse::SslIsNotSupported, - Some(ref pool) => InnerConnectorResponse::Io2(pool.call(req)), - }, - _ => InnerConnectorResponse::Io1(self.tcp_pool.call(req)), - } - } + fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.tcp_pool.poll_ready(cx) } - #[pin_project::pin_project(project = InnerConnectorProj)] - pub(crate) enum InnerConnectorResponse - where - S1: Service + 'static, - S2: Service + 'static, - Io1: AsyncRead + AsyncWrite + Unpin + 'static, - Io2: AsyncRead + AsyncWrite + Unpin + 'static, - { - Io1(#[pin] as Service>::Future), - Io2(#[pin] as Service>::Future), - SslIsNotSupported, + fn call(&self, req: Connect) -> Self::Future { + match req.uri.scheme_str() { + Some("https") | Some("wss") => match self.tls_pool { + None => InnerConnectorResponse::SslIsNotSupported, + Some(ref pool) => InnerConnectorResponse::Io2(pool.call(req)), + }, + _ => InnerConnectorResponse::Io1(self.tcp_pool.call(req)), + } } +} - impl Future for InnerConnectorResponse - where - S1: Service + 'static, - S2: Service + 'static, - Io1: AsyncRead + AsyncWrite + Unpin + 'static, - Io2: AsyncRead + AsyncWrite + Unpin + 'static, - { - type Output = Result, ConnectError>; +#[pin_project::pin_project(project = InnerConnectorProj)] +pub enum InnerConnectorResponse +where + S1: Service + 'static, + S2: Service + 'static, + Io1: AsyncRead + AsyncWrite + Unpin + 'static, + Io2: AsyncRead + AsyncWrite + Unpin + 'static, +{ + Io1(#[pin] as Service>::Future), + Io2(#[pin] as Service>::Future), + SslIsNotSupported, +} - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.project() { - InnerConnectorProj::Io1(fut) => { - fut.poll(cx).map_ok(EitherIoConnection::A) - } - InnerConnectorProj::Io2(fut) => { - fut.poll(cx).map_ok(EitherIoConnection::B) - } - InnerConnectorProj::SslIsNotSupported => { - Poll::Ready(Err(ConnectError::SslIsNotSupported)) - } +impl Future for InnerConnectorResponse +where + S1: Service + 'static, + S2: Service + 'static, + Io1: AsyncRead + AsyncWrite + Unpin + 'static, + Io2: AsyncRead + AsyncWrite + Unpin + 'static, +{ + type Output = Result, ConnectError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.project() { + InnerConnectorProj::Io1(fut) => fut.poll(cx).map_ok(EitherIoConnection::A), + InnerConnectorProj::Io2(fut) => fut.poll(cx).map_ok(EitherIoConnection::B), + InnerConnectorProj::SslIsNotSupported => { + Poll::Ready(Err(ConnectError::SslIsNotSupported)) } } } diff --git a/actix-http/src/client/pool.rs b/actix-http/src/client/pool.rs index 3800696fa..c93670d8e 100644 --- a/actix-http/src/client/pool.rs +++ b/actix-http/src/client/pool.rs @@ -45,7 +45,7 @@ impl From for Key { } /// Connections pool for reuse Io type for certain [`http::uri::Authority`] as key. -pub(crate) struct ConnectionPool +pub struct ConnectionPool where Io: AsyncWrite + Unpin + 'static, {