mirror of https://github.com/fafhrd91/actix-web
reduce duplicate code
This commit is contained in:
parent
2dbdf61c37
commit
4936340901
|
@ -75,11 +75,14 @@ pub trait Connection {
|
|||
type Io: AsyncRead + AsyncWrite + Unpin;
|
||||
|
||||
/// Send request and body
|
||||
fn send_request<B: MessageBody + 'static, H: Into<RequestHeadType>>(
|
||||
fn send_request<B, H>(
|
||||
self,
|
||||
head: H,
|
||||
body: B,
|
||||
) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>>;
|
||||
) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>>
|
||||
where
|
||||
B: MessageBody + 'static,
|
||||
H: Into<RequestHeadType> + 'static;
|
||||
|
||||
/// Send request, returns Response and Framed
|
||||
fn open_tunnel<H: Into<RequestHeadType> + 'static>(
|
||||
|
@ -144,47 +147,31 @@ impl<T: AsyncRead + AsyncWrite + Unpin> IoConnection<T> {
|
|||
pub(crate) fn into_parts(self) -> (ConnectionType<T>, time::Instant, Acquired<T>) {
|
||||
(self.io.unwrap(), self.created, self.pool.unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Connection for IoConnection<T>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
type Io = T;
|
||||
|
||||
fn send_request<B: MessageBody + 'static, H: Into<RequestHeadType>>(
|
||||
async fn send_request<B: MessageBody + 'static, H: Into<RequestHeadType>>(
|
||||
mut self,
|
||||
head: H,
|
||||
body: B,
|
||||
) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>> {
|
||||
) -> Result<(ResponseHead, Payload), SendRequestError> {
|
||||
match self.io.take().unwrap() {
|
||||
ConnectionType::H1(io) => Box::pin(h1proto::send_request(
|
||||
io,
|
||||
head.into(),
|
||||
body,
|
||||
self.created,
|
||||
self.pool,
|
||||
)),
|
||||
ConnectionType::H2(io) => Box::pin(h2proto::send_request(
|
||||
io,
|
||||
head.into(),
|
||||
body,
|
||||
self.created,
|
||||
self.pool,
|
||||
)),
|
||||
ConnectionType::H1(io) => {
|
||||
h1proto::send_request(io, head.into(), body, self.created, self.pool)
|
||||
.await
|
||||
}
|
||||
ConnectionType::H2(io) => {
|
||||
h2proto::send_request(io, head.into(), body, self.created, self.pool)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Send request, returns Response and Framed
|
||||
fn open_tunnel<H: Into<RequestHeadType>>(
|
||||
async fn open_tunnel<H: Into<RequestHeadType>>(
|
||||
mut self,
|
||||
head: H,
|
||||
) -> LocalBoxFuture<
|
||||
'static,
|
||||
Result<(ResponseHead, Framed<Self::Io, ClientCodec>), SendRequestError>,
|
||||
> {
|
||||
) -> Result<(ResponseHead, Framed<T, ClientCodec>), SendRequestError> {
|
||||
match self.io.take().unwrap() {
|
||||
ConnectionType::H1(io) => Box::pin(h1proto::open_tunnel(io, head.into())),
|
||||
ConnectionType::H1(io) => h1proto::open_tunnel(io, head.into()).await,
|
||||
ConnectionType::H2(io) => {
|
||||
if let Some(mut pool) = self.pool.take() {
|
||||
pool.release(IoConnection::new(
|
||||
|
@ -193,7 +180,7 @@ where
|
|||
None,
|
||||
));
|
||||
}
|
||||
Box::pin(async { Err(SendRequestError::TunnelNotSupported) })
|
||||
Err(SendRequestError::TunnelNotSupported)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -216,14 +203,18 @@ where
|
|||
{
|
||||
type Io = EitherIo<A, B>;
|
||||
|
||||
fn send_request<RB: MessageBody + 'static, H: Into<RequestHeadType>>(
|
||||
fn send_request<RB, H>(
|
||||
self,
|
||||
head: H,
|
||||
body: RB,
|
||||
) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>> {
|
||||
) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>>
|
||||
where
|
||||
RB: MessageBody + 'static,
|
||||
H: Into<RequestHeadType> + 'static,
|
||||
{
|
||||
match self {
|
||||
EitherIoConnection::A(con) => con.send_request(head, body),
|
||||
EitherIoConnection::B(con) => con.send_request(head, body),
|
||||
EitherIoConnection::A(con) => Box::pin(con.send_request(head, body)),
|
||||
EitherIoConnection::B(con) => Box::pin(con.send_request(head, body)),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -244,26 +244,27 @@ where
|
|||
self,
|
||||
) -> impl Service<Connect, Response = impl Connection, Error = ConnectError> + Clone
|
||||
{
|
||||
let tcp_service = TimeoutService::new(
|
||||
self.config.timeout,
|
||||
apply_fn(self.connector.clone(), |msg: Connect, srv| {
|
||||
srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr))
|
||||
})
|
||||
.map_err(ConnectError::from)
|
||||
.map(|stream| (stream.into_parts().0, Protocol::Http1)),
|
||||
)
|
||||
.map_err(|e| match e {
|
||||
TimeoutError::Service(e) => e,
|
||||
TimeoutError::Timeout => ConnectError::Timeout,
|
||||
});
|
||||
|
||||
#[cfg(not(any(feature = "openssl", feature = "rustls")))]
|
||||
{
|
||||
let connector = TimeoutService::new(
|
||||
self.config.timeout,
|
||||
apply_fn(self.connector, |msg: Connect, srv| {
|
||||
srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr))
|
||||
})
|
||||
.map_err(ConnectError::from)
|
||||
.map(|stream| (stream.into_parts().0, Protocol::Http1)),
|
||||
)
|
||||
.map_err(|e| match e {
|
||||
TimeoutError::Service(e) => e,
|
||||
TimeoutError::Timeout => ConnectError::Timeout,
|
||||
});
|
||||
|
||||
connect_impl::InnerConnector {
|
||||
tcp_pool: ConnectionPool::new(
|
||||
connector,
|
||||
self.config.no_disconnect_timeout(),
|
||||
),
|
||||
tls_pool: connect_impl::TlsPool::None,
|
||||
}
|
||||
}
|
||||
#[cfg(any(feature = "openssl", feature = "rustls"))]
|
||||
|
@ -328,84 +329,17 @@ where
|
|||
TimeoutError::Timeout => ConnectError::Timeout,
|
||||
});
|
||||
|
||||
let tcp_service = TimeoutService::new(
|
||||
self.config.timeout,
|
||||
apply_fn(self.connector, |msg: Connect, srv| {
|
||||
srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr))
|
||||
})
|
||||
.map_err(ConnectError::from)
|
||||
.map(|stream| (stream.into_parts().0, Protocol::Http1)),
|
||||
)
|
||||
.map_err(|e| match e {
|
||||
TimeoutError::Service(e) => e,
|
||||
TimeoutError::Timeout => ConnectError::Timeout,
|
||||
});
|
||||
|
||||
connect_impl::InnerConnector {
|
||||
tcp_pool: ConnectionPool::new(
|
||||
tcp_service,
|
||||
self.config.no_disconnect_timeout(),
|
||||
),
|
||||
ssl_pool: ConnectionPool::new(ssl_service, self.config),
|
||||
tls_pool: Some(ConnectionPool::new(ssl_service, self.config)),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(any(feature = "openssl", feature = "rustls")))]
|
||||
mod connect_impl {
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use futures_core::future::LocalBoxFuture;
|
||||
|
||||
use super::*;
|
||||
use crate::client::connection::IoConnection;
|
||||
|
||||
pub(crate) struct InnerConnector<T, Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
T: Service<Connect, Response = (Io, Protocol), Error = ConnectError> + 'static,
|
||||
{
|
||||
pub(crate) tcp_pool: ConnectionPool<T, Io>,
|
||||
}
|
||||
|
||||
impl<T, Io> Clone for InnerConnector<T, Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
T: Service<Connect, Response = (Io, Protocol), Error = ConnectError> + 'static,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
InnerConnector {
|
||||
tcp_pool: self.tcp_pool.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, Io> Service<Connect> for InnerConnector<T, Io>
|
||||
where
|
||||
Io: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
T: Service<Connect, Response = (Io, Protocol), Error = ConnectError> + 'static,
|
||||
{
|
||||
type Response = IoConnection<Io>;
|
||||
type Error = ConnectError;
|
||||
type Future = LocalBoxFuture<'static, Result<IoConnection<Io>, ConnectError>>;
|
||||
|
||||
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.tcp_pool.poll_ready(cx)
|
||||
}
|
||||
|
||||
fn call(&self, req: Connect) -> Self::Future {
|
||||
match req.uri.scheme_str() {
|
||||
Some("https") | Some("wss") => {
|
||||
Box::pin(async { Err(ConnectError::SslIsNotSupported) })
|
||||
}
|
||||
_ => self.tcp_pool.call(req),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any(feature = "openssl", feature = "rustls"))]
|
||||
mod connect_impl {
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
|
@ -422,7 +356,7 @@ mod connect_impl {
|
|||
Io2: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
pub(crate) tcp_pool: ConnectionPool<S1, Io1>,
|
||||
pub(crate) ssl_pool: ConnectionPool<S2, Io2>,
|
||||
pub(crate) tls_pool: Option<ConnectionPool<S2, Io2>>,
|
||||
}
|
||||
|
||||
impl<S1, S2, Io1, Io2> Clone for InnerConnector<S1, S2, Io1, Io2>
|
||||
|
@ -435,7 +369,7 @@ mod connect_impl {
|
|||
fn clone(&self) -> Self {
|
||||
InnerConnector {
|
||||
tcp_pool: self.tcp_pool.clone(),
|
||||
ssl_pool: self.ssl_pool.clone(),
|
||||
tls_pool: self.tls_pool.as_ref().cloned(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -457,9 +391,10 @@ mod connect_impl {
|
|||
|
||||
fn call(&self, req: Connect) -> Self::Future {
|
||||
match req.uri.scheme_str() {
|
||||
Some("https") | Some("wss") => {
|
||||
InnerConnectorResponse::Io2(self.ssl_pool.call(req))
|
||||
}
|
||||
Some("https") | Some("wss") => match self.tls_pool {
|
||||
None => InnerConnectorResponse::SslIsNotSupported,
|
||||
Some(ref pool) => InnerConnectorResponse::Io2(pool.call(req)),
|
||||
},
|
||||
_ => InnerConnectorResponse::Io1(self.tcp_pool.call(req)),
|
||||
}
|
||||
}
|
||||
|
@ -475,6 +410,7 @@ mod connect_impl {
|
|||
{
|
||||
Io1(#[pin] <ConnectionPool<S1, Io1> as Service<Connect>>::Future),
|
||||
Io2(#[pin] <ConnectionPool<S2, Io2> as Service<Connect>>::Future),
|
||||
SslIsNotSupported,
|
||||
}
|
||||
|
||||
impl<S1, S2, Io1, Io2> Future for InnerConnectorResponse<S1, S2, Io1, Io2>
|
||||
|
@ -494,6 +430,9 @@ mod connect_impl {
|
|||
InnerConnectorProj::Io2(fut) => {
|
||||
fut.poll(cx).map_ok(EitherIoConnection::B)
|
||||
}
|
||||
InnerConnectorProj::SslIsNotSupported => {
|
||||
Poll::Ready(Err(ConnectError::SslIsNotSupported))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue