Merge branch 'master' into refactor/awc_client_builder

This commit is contained in:
fakeshadow 2021-02-18 03:56:51 -08:00 committed by GitHub
commit 51353c4dd8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 84 additions and 118 deletions

View File

@ -16,7 +16,7 @@ use crate::message::{RequestHeadType, ResponseHead};
use crate::payload::Payload; use crate::payload::Payload;
use super::error::SendRequestError; use super::error::SendRequestError;
use super::pool::{Acquired, Protocol}; use super::pool::Acquired;
use super::{h1proto, h2proto}; use super::{h1proto, h2proto};
pub(crate) enum ConnectionType<Io> { pub(crate) enum ConnectionType<Io> {
@ -49,7 +49,7 @@ impl H2Connection {
} }
} }
// wake up waker when drop // cancel spawned connection task on drop.
impl Drop for H2Connection { impl Drop for H2Connection {
fn drop(&mut self) { fn drop(&mut self) {
self.handle.abort(); self.handle.abort();
@ -74,8 +74,6 @@ impl DerefMut for H2Connection {
pub trait Connection { pub trait Connection {
type Io: AsyncRead + AsyncWrite + Unpin; type Io: AsyncRead + AsyncWrite + Unpin;
fn protocol(&self) -> Protocol;
/// Send request and body /// Send request and body
fn send_request<B: MessageBody + 'static, H: Into<RequestHeadType>>( fn send_request<B: MessageBody + 'static, H: Into<RequestHeadType>>(
self, self,
@ -154,14 +152,6 @@ where
{ {
type Io = T; type Io = T;
fn protocol(&self) -> Protocol {
match self.io {
Some(ConnectionType::H1(_)) => Protocol::Http1,
Some(ConnectionType::H2(_)) => Protocol::Http2,
None => Protocol::Http1,
}
}
fn send_request<B: MessageBody + 'static, H: Into<RequestHeadType>>( fn send_request<B: MessageBody + 'static, H: Into<RequestHeadType>>(
mut self, mut self,
head: H, head: H,
@ -210,7 +200,7 @@ where
} }
#[allow(dead_code)] #[allow(dead_code)]
pub(crate) enum EitherConnection<A, B> pub(crate) enum EitherIoConnection<A, B>
where where
A: AsyncRead + AsyncWrite + Unpin + 'static, A: AsyncRead + AsyncWrite + Unpin + 'static,
B: AsyncRead + AsyncWrite + Unpin + 'static, B: AsyncRead + AsyncWrite + Unpin + 'static,
@ -219,28 +209,21 @@ where
B(IoConnection<B>), B(IoConnection<B>),
} }
impl<A, B> Connection for EitherConnection<A, B> impl<A, B> Connection for EitherIoConnection<A, B>
where where
A: AsyncRead + AsyncWrite + Unpin + 'static, A: AsyncRead + AsyncWrite + Unpin + 'static,
B: AsyncRead + AsyncWrite + Unpin + 'static, B: AsyncRead + AsyncWrite + Unpin + 'static,
{ {
type Io = EitherIo<A, B>; type Io = EitherIo<A, B>;
fn protocol(&self) -> Protocol {
match self {
EitherConnection::A(con) => con.protocol(),
EitherConnection::B(con) => con.protocol(),
}
}
fn send_request<RB: MessageBody + 'static, H: Into<RequestHeadType>>( fn send_request<RB: MessageBody + 'static, H: Into<RequestHeadType>>(
self, self,
head: H, head: H,
body: RB, body: RB,
) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>> { ) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>> {
match self { match self {
EitherConnection::A(con) => con.send_request(head, body), EitherIoConnection::A(con) => con.send_request(head, body),
EitherConnection::B(con) => con.send_request(head, body), EitherIoConnection::B(con) => con.send_request(head, body),
} }
} }
@ -253,11 +236,11 @@ where
Result<(ResponseHead, Framed<Self::Io, ClientCodec>), SendRequestError>, Result<(ResponseHead, Framed<Self::Io, ClientCodec>), SendRequestError>,
> { > {
match self { match self {
EitherConnection::A(con) => Box::pin(async { EitherIoConnection::A(con) => Box::pin(async {
let (head, framed) = con.open_tunnel(head).await?; let (head, framed) = con.open_tunnel(head).await?;
Ok((head, framed.into_map_io(EitherIo::A))) Ok((head, framed.into_map_io(EitherIo::A)))
}), }),
EitherConnection::B(con) => Box::pin(async { EitherIoConnection::B(con) => Box::pin(async {
let (head, framed) = con.open_tunnel(head).await?; let (head, framed) = con.open_tunnel(head).await?;
Ok((head, framed.into_map_io(EitherIo::B))) Ok((head, framed.into_map_io(EitherIo::B)))
}), }),

View File

@ -356,7 +356,7 @@ where
mod connect_impl { mod connect_impl {
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use futures_util::future::{err, Either, Ready}; use futures_core::future::LocalBoxFuture;
use super::*; use super::*;
use crate::client::connection::IoConnection; use crate::client::connection::IoConnection;
@ -388,10 +388,7 @@ mod connect_impl {
{ {
type Response = IoConnection<Io>; type Response = IoConnection<Io>;
type Error = ConnectError; type Error = ConnectError;
type Future = Either< type Future = LocalBoxFuture<'static, Result<IoConnection<Io>, ConnectError>>;
<ConnectionPool<T, Io> as Service<Connect>>::Future,
Ready<Result<IoConnection<Io>, ConnectError>>,
>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.tcp_pool.poll_ready(cx) self.tcp_pool.poll_ready(cx)
@ -400,9 +397,9 @@ mod connect_impl {
fn call(&self, req: Connect) -> Self::Future { fn call(&self, req: Connect) -> Self::Future {
match req.uri.scheme_str() { match req.uri.scheme_str() {
Some("https") | Some("wss") => { Some("https") | Some("wss") => {
Either::Right(err(ConnectError::SslIsNotSupported)) Box::pin(async { Err(ConnectError::SslIsNotSupported) })
} }
_ => Either::Left(self.tcp_pool.call(req)), _ => self.tcp_pool.call(req),
} }
} }
} }
@ -411,33 +408,29 @@ mod connect_impl {
#[cfg(any(feature = "openssl", feature = "rustls"))] #[cfg(any(feature = "openssl", feature = "rustls"))]
mod connect_impl { mod connect_impl {
use std::future::Future; use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use futures_core::ready;
use futures_util::future::Either;
use super::*; use super::*;
use crate::client::connection::EitherConnection; use crate::client::connection::EitherIoConnection;
pub(crate) struct InnerConnector<T1, T2, Io1, Io2> pub(crate) struct InnerConnector<S1, S2, Io1, Io2>
where where
S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + 'static,
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + 'static,
Io1: AsyncRead + AsyncWrite + Unpin + 'static, Io1: AsyncRead + AsyncWrite + Unpin + 'static,
Io2: AsyncRead + AsyncWrite + Unpin + 'static, Io2: AsyncRead + AsyncWrite + Unpin + 'static,
T1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError>,
T2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError>,
{ {
pub(crate) tcp_pool: ConnectionPool<T1, Io1>, pub(crate) tcp_pool: ConnectionPool<S1, Io1>,
pub(crate) ssl_pool: ConnectionPool<T2, Io2>, pub(crate) ssl_pool: ConnectionPool<S2, Io2>,
} }
impl<T1, T2, Io1, Io2> Clone for InnerConnector<T1, T2, Io1, Io2> impl<S1, S2, Io1, Io2> Clone for InnerConnector<S1, S2, Io1, Io2>
where where
S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + 'static,
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + 'static,
Io1: AsyncRead + AsyncWrite + Unpin + 'static, Io1: AsyncRead + AsyncWrite + Unpin + 'static,
Io2: AsyncRead + AsyncWrite + Unpin + 'static, Io2: AsyncRead + AsyncWrite + Unpin + 'static,
T1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + 'static,
T2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + 'static,
{ {
fn clone(&self) -> Self { fn clone(&self) -> Self {
InnerConnector { InnerConnector {
@ -447,19 +440,16 @@ mod connect_impl {
} }
} }
impl<T1, T2, Io1, Io2> Service<Connect> for InnerConnector<T1, T2, Io1, Io2> impl<S1, S2, Io1, Io2> Service<Connect> for InnerConnector<S1, S2, Io1, Io2>
where where
S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + 'static,
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + 'static,
Io1: AsyncRead + AsyncWrite + Unpin + 'static, Io1: AsyncRead + AsyncWrite + Unpin + 'static,
Io2: AsyncRead + AsyncWrite + Unpin + 'static, Io2: AsyncRead + AsyncWrite + Unpin + 'static,
T1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + 'static,
T2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + 'static,
{ {
type Response = EitherConnection<Io1, Io2>; type Response = EitherIoConnection<Io1, Io2>;
type Error = ConnectError; type Error = ConnectError;
type Future = Either< type Future = InnerConnectorResponse<S1, S2, Io1, Io2>;
InnerConnectorResponseA<T1, Io1, Io2>,
InnerConnectorResponseB<T2, Io1, Io2>,
>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.tcp_pool.poll_ready(cx) self.tcp_pool.poll_ready(cx)
@ -467,69 +457,44 @@ mod connect_impl {
fn call(&self, req: Connect) -> Self::Future { fn call(&self, req: Connect) -> Self::Future {
match req.uri.scheme_str() { match req.uri.scheme_str() {
Some("https") | Some("wss") => Either::Right(InnerConnectorResponseB { Some("https") | Some("wss") => {
fut: self.ssl_pool.call(req), InnerConnectorResponse::Io2(self.ssl_pool.call(req))
_phantom: PhantomData, }
}), _ => InnerConnectorResponse::Io1(self.tcp_pool.call(req)),
_ => Either::Left(InnerConnectorResponseA {
fut: self.tcp_pool.call(req),
_phantom: PhantomData,
}),
} }
} }
} }
#[pin_project::pin_project] #[pin_project::pin_project(project = InnerConnectorProj)]
pub(crate) struct InnerConnectorResponseA<T, Io1, Io2> pub(crate) enum InnerConnectorResponse<S1, S2, Io1, Io2>
where where
Io1: AsyncRead + AsyncWrite + Unpin + 'static, S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + 'static,
T: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + 'static, S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + 'static,
{
#[pin]
fut: <ConnectionPool<T, Io1> as Service<Connect>>::Future,
_phantom: PhantomData<Io2>,
}
impl<T, Io1, Io2> Future for InnerConnectorResponseA<T, Io1, Io2>
where
T: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + 'static,
Io1: AsyncRead + AsyncWrite + Unpin + 'static, Io1: AsyncRead + AsyncWrite + Unpin + 'static,
Io2: AsyncRead + AsyncWrite + Unpin + 'static, Io2: AsyncRead + AsyncWrite + Unpin + 'static,
{ {
type Output = Result<EitherConnection<Io1, Io2>, ConnectError>; Io1(#[pin] <ConnectionPool<S1, Io1> as Service<Connect>>::Future),
Io2(#[pin] <ConnectionPool<S2, Io2> as Service<Connect>>::Future),
}
impl<S1, S2, Io1, Io2> Future for InnerConnectorResponse<S1, S2, Io1, Io2>
where
S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + 'static,
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + 'static,
Io1: AsyncRead + AsyncWrite + Unpin + 'static,
Io2: AsyncRead + AsyncWrite + Unpin + 'static,
{
type Output = Result<EitherIoConnection<Io1, Io2>, ConnectError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready( match self.project() {
ready!(Pin::new(&mut self.get_mut().fut).poll(cx)) InnerConnectorProj::Io1(fut) => {
.map(EitherConnection::A), fut.poll(cx).map_ok(EitherIoConnection::A)
) }
} InnerConnectorProj::Io2(fut) => {
} fut.poll(cx).map_ok(EitherIoConnection::B)
}
#[pin_project::pin_project] }
pub(crate) struct InnerConnectorResponseB<T, Io1, Io2>
where
Io2: AsyncRead + AsyncWrite + Unpin + 'static,
T: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + 'static,
{
#[pin]
fut: <ConnectionPool<T, Io2> as Service<Connect>>::Future,
_phantom: PhantomData<Io1>,
}
impl<T, Io1, Io2> Future for InnerConnectorResponseB<T, Io1, Io2>
where
T: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + 'static,
Io1: AsyncRead + AsyncWrite + Unpin + 'static,
Io2: AsyncRead + AsyncWrite + Unpin + 'static,
{
type Output = Result<EitherConnection<Io1, Io2>, ConnectError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(
ready!(Pin::new(&mut self.get_mut().fut).poll(cx))
.map(EitherConnection::B),
)
} }
} }
} }

View File

@ -510,25 +510,43 @@ where
match msg { match msg {
Message::Item(mut req) => { Message::Item(mut req) => {
let pl = this.codec.message_type();
req.head_mut().peer_addr = *this.peer_addr; req.head_mut().peer_addr = *this.peer_addr;
// merge on_connect_ext data into request extensions // merge on_connect_ext data into request extensions
this.on_connect_data.merge_into(&mut req); this.on_connect_data.merge_into(&mut req);
if pl == MessageType::Stream && this.flow.upgrade.is_some() { match this.codec.message_type() {
this.messages.push_back(DispatcherMessage::Upgrade(req)); // Request is upgradable. add upgrade message and break.
break; // everything remain in read buffer would be handed to
} // upgraded Request.
if pl == MessageType::Payload || pl == MessageType::Stream { MessageType::Stream if this.flow.upgrade.is_some() => {
let (ps, pl) = Payload::create(false); this.messages
let (req1, _) = .push_back(DispatcherMessage::Upgrade(req));
req.replace_payload(crate::Payload::H1(pl)); break;
req = req1; }
*this.payload = Some(ps);
// Request is not upgradable.
MessageType::Payload | MessageType::Stream => {
/*
PayloadSender and Payload are smart pointers share the
same state.
PayloadSender is attached to dispatcher and used to sink
new chunked request data to state.
Payload is attached to Request and passed to Service::call
where the state can be collected and consumed.
*/
let (ps, pl) = Payload::create(false);
let (req1, _) =
req.replace_payload(crate::Payload::H1(pl));
req = req1;
*this.payload = Some(ps);
}
// Request has no payload.
MessageType::None => {}
} }
// handle request early // handle request early when no future in InnerDispatcher state.
if this.state.is_empty() { if this.state.is_empty() {
self.as_mut().handle_request(req, cx)?; self.as_mut().handle_request(req, cx)?;
this = self.as_mut().project(); this = self.as_mut().project();