From 4936340901288e0c8b973563b2fd2c87de2f3173 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sun, 21 Feb 2021 08:30:50 -0800 Subject: [PATCH] reduce duplicate code --- actix-http/src/client/connection.rs | 63 +++++++--------- actix-http/src/client/connector.rs | 111 +++++++--------------------- 2 files changed, 52 insertions(+), 122 deletions(-) diff --git a/actix-http/src/client/connection.rs b/actix-http/src/client/connection.rs index 778083a1c..97ecd0515 100644 --- a/actix-http/src/client/connection.rs +++ b/actix-http/src/client/connection.rs @@ -75,11 +75,14 @@ pub trait Connection { type Io: AsyncRead + AsyncWrite + Unpin; /// Send request and body - fn send_request>( + fn send_request( self, head: H, body: B, - ) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>>; + ) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>> + where + B: MessageBody + 'static, + H: Into + 'static; /// Send request, returns Response and Framed fn open_tunnel + 'static>( @@ -144,47 +147,31 @@ impl IoConnection { pub(crate) fn into_parts(self) -> (ConnectionType, time::Instant, Acquired) { (self.io.unwrap(), self.created, self.pool.unwrap()) } -} -impl Connection for IoConnection -where - T: AsyncRead + AsyncWrite + Unpin + 'static, -{ - type Io = T; - - fn send_request>( + async fn send_request>( mut self, head: H, body: B, - ) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>> { + ) -> Result<(ResponseHead, Payload), SendRequestError> { match self.io.take().unwrap() { - ConnectionType::H1(io) => Box::pin(h1proto::send_request( - io, - head.into(), - body, - self.created, - self.pool, - )), - ConnectionType::H2(io) => Box::pin(h2proto::send_request( - io, - head.into(), - body, - self.created, - self.pool, - )), + ConnectionType::H1(io) => { + h1proto::send_request(io, head.into(), body, self.created, self.pool) + .await + } + ConnectionType::H2(io) => { + h2proto::send_request(io, head.into(), body, self.created, self.pool) + .await + } } } /// Send request, returns Response and Framed - fn open_tunnel>( + async fn open_tunnel>( mut self, head: H, - ) -> LocalBoxFuture< - 'static, - Result<(ResponseHead, Framed), SendRequestError>, - > { + ) -> Result<(ResponseHead, Framed), SendRequestError> { match self.io.take().unwrap() { - ConnectionType::H1(io) => Box::pin(h1proto::open_tunnel(io, head.into())), + ConnectionType::H1(io) => h1proto::open_tunnel(io, head.into()).await, ConnectionType::H2(io) => { if let Some(mut pool) = self.pool.take() { pool.release(IoConnection::new( @@ -193,7 +180,7 @@ where None, )); } - Box::pin(async { Err(SendRequestError::TunnelNotSupported) }) + Err(SendRequestError::TunnelNotSupported) } } } @@ -216,14 +203,18 @@ where { type Io = EitherIo; - fn send_request>( + fn send_request( self, head: H, body: RB, - ) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>> { + ) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>> + where + RB: MessageBody + 'static, + H: Into + 'static, + { match self { - EitherIoConnection::A(con) => con.send_request(head, body), - EitherIoConnection::B(con) => con.send_request(head, body), + EitherIoConnection::A(con) => Box::pin(con.send_request(head, body)), + EitherIoConnection::B(con) => Box::pin(con.send_request(head, body)), } } diff --git a/actix-http/src/client/connector.rs b/actix-http/src/client/connector.rs index 65536f257..c93f764ce 100644 --- a/actix-http/src/client/connector.rs +++ b/actix-http/src/client/connector.rs @@ -244,26 +244,27 @@ where self, ) -> impl Service + Clone { + let tcp_service = TimeoutService::new( + self.config.timeout, + apply_fn(self.connector.clone(), |msg: Connect, srv| { + srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr)) + }) + .map_err(ConnectError::from) + .map(|stream| (stream.into_parts().0, Protocol::Http1)), + ) + .map_err(|e| match e { + TimeoutError::Service(e) => e, + TimeoutError::Timeout => ConnectError::Timeout, + }); + #[cfg(not(any(feature = "openssl", feature = "rustls")))] { - let connector = TimeoutService::new( - self.config.timeout, - apply_fn(self.connector, |msg: Connect, srv| { - srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr)) - }) - .map_err(ConnectError::from) - .map(|stream| (stream.into_parts().0, Protocol::Http1)), - ) - .map_err(|e| match e { - TimeoutError::Service(e) => e, - TimeoutError::Timeout => ConnectError::Timeout, - }); - connect_impl::InnerConnector { tcp_pool: ConnectionPool::new( connector, self.config.no_disconnect_timeout(), ), + tls_pool: connect_impl::TlsPool::None, } } #[cfg(any(feature = "openssl", feature = "rustls"))] @@ -328,84 +329,17 @@ where TimeoutError::Timeout => ConnectError::Timeout, }); - let tcp_service = TimeoutService::new( - self.config.timeout, - apply_fn(self.connector, |msg: Connect, srv| { - srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr)) - }) - .map_err(ConnectError::from) - .map(|stream| (stream.into_parts().0, Protocol::Http1)), - ) - .map_err(|e| match e { - TimeoutError::Service(e) => e, - TimeoutError::Timeout => ConnectError::Timeout, - }); - connect_impl::InnerConnector { tcp_pool: ConnectionPool::new( tcp_service, self.config.no_disconnect_timeout(), ), - ssl_pool: ConnectionPool::new(ssl_service, self.config), + tls_pool: Some(ConnectionPool::new(ssl_service, self.config)), } } } } -#[cfg(not(any(feature = "openssl", feature = "rustls")))] -mod connect_impl { - use std::task::{Context, Poll}; - - use futures_core::future::LocalBoxFuture; - - use super::*; - use crate::client::connection::IoConnection; - - pub(crate) struct InnerConnector - where - Io: AsyncRead + AsyncWrite + Unpin + 'static, - T: Service + 'static, - { - pub(crate) tcp_pool: ConnectionPool, - } - - impl Clone for InnerConnector - where - Io: AsyncRead + AsyncWrite + Unpin + 'static, - T: Service + 'static, - { - fn clone(&self) -> Self { - InnerConnector { - tcp_pool: self.tcp_pool.clone(), - } - } - } - - impl Service for InnerConnector - where - Io: AsyncRead + AsyncWrite + Unpin + 'static, - T: Service + 'static, - { - type Response = IoConnection; - type Error = ConnectError; - type Future = LocalBoxFuture<'static, Result, ConnectError>>; - - fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { - self.tcp_pool.poll_ready(cx) - } - - fn call(&self, req: Connect) -> Self::Future { - match req.uri.scheme_str() { - Some("https") | Some("wss") => { - Box::pin(async { Err(ConnectError::SslIsNotSupported) }) - } - _ => self.tcp_pool.call(req), - } - } - } -} - -#[cfg(any(feature = "openssl", feature = "rustls"))] mod connect_impl { use std::future::Future; use std::pin::Pin; @@ -422,7 +356,7 @@ mod connect_impl { Io2: AsyncRead + AsyncWrite + Unpin + 'static, { pub(crate) tcp_pool: ConnectionPool, - pub(crate) ssl_pool: ConnectionPool, + pub(crate) tls_pool: Option>, } impl Clone for InnerConnector @@ -435,7 +369,7 @@ mod connect_impl { fn clone(&self) -> Self { InnerConnector { tcp_pool: self.tcp_pool.clone(), - ssl_pool: self.ssl_pool.clone(), + tls_pool: self.tls_pool.as_ref().cloned(), } } } @@ -457,9 +391,10 @@ mod connect_impl { fn call(&self, req: Connect) -> Self::Future { match req.uri.scheme_str() { - Some("https") | Some("wss") => { - InnerConnectorResponse::Io2(self.ssl_pool.call(req)) - } + Some("https") | Some("wss") => match self.tls_pool { + None => InnerConnectorResponse::SslIsNotSupported, + Some(ref pool) => InnerConnectorResponse::Io2(pool.call(req)), + }, _ => InnerConnectorResponse::Io1(self.tcp_pool.call(req)), } } @@ -475,6 +410,7 @@ mod connect_impl { { Io1(#[pin] as Service>::Future), Io2(#[pin] as Service>::Future), + SslIsNotSupported, } impl Future for InnerConnectorResponse @@ -494,6 +430,9 @@ mod connect_impl { InnerConnectorProj::Io2(fut) => { fut.poll(cx).map_ok(EitherIoConnection::B) } + InnerConnectorProj::SslIsNotSupported => { + Poll::Ready(Err(ConnectError::SslIsNotSupported)) + } } } }