Merge branch 'master' into feature/client_middleware

This commit is contained in:
fakeshadow 2021-02-22 07:36:41 -08:00 committed by GitHub
commit 3c6eebbb2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 155 additions and 202 deletions

View File

@ -35,7 +35,7 @@ actix-tls = "3.0.0-beta.3"
actix-utils = "3.0.0-beta.2" actix-utils = "3.0.0-beta.2"
actix-rt = "2" actix-rt = "2"
actix-server = "2.0.0-beta.3" actix-server = "2.0.0-beta.3"
awc = "3.0.0-beta.2" awc = { version = "3.0.0-beta.2", default-features = false }
base64 = "0.13" base64 = "0.13"
bytes = "1" bytes = "1"
@ -57,5 +57,5 @@ features = ["vendored"]
optional = true optional = true
[dev-dependencies] [dev-dependencies]
actix-web = "4.0.0-beta.3" actix-web = { version = "4.0.0-beta.3", default-features = false, features = ["cookies"] }
actix-http = "3.0.0-beta.3" actix-http = "3.0.0-beta.3"

View File

@ -75,11 +75,14 @@ pub trait Connection {
type Io: AsyncRead + AsyncWrite + Unpin; type Io: AsyncRead + AsyncWrite + Unpin;
/// Send request and body /// Send request and body
fn send_request<B: MessageBody + 'static, H: Into<RequestHeadType>>( fn send_request<B, H>(
self, self,
head: H, head: H,
body: B, body: B,
) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>>; ) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>>
where
B: MessageBody + 'static,
H: Into<RequestHeadType> + 'static;
/// Send request, returns Response and Framed /// Send request, returns Response and Framed
fn open_tunnel<H: Into<RequestHeadType> + 'static>( fn open_tunnel<H: Into<RequestHeadType> + 'static>(
@ -144,47 +147,31 @@ impl<T: AsyncRead + AsyncWrite + Unpin> IoConnection<T> {
pub(crate) fn into_parts(self) -> (ConnectionType<T>, time::Instant, Acquired<T>) { pub(crate) fn into_parts(self) -> (ConnectionType<T>, time::Instant, Acquired<T>) {
(self.io.unwrap(), self.created, self.pool.unwrap()) (self.io.unwrap(), self.created, self.pool.unwrap())
} }
}
impl<T> Connection for IoConnection<T> async fn send_request<B: MessageBody + 'static, H: Into<RequestHeadType>>(
where
T: AsyncRead + AsyncWrite + Unpin + 'static,
{
type Io = T;
fn send_request<B: MessageBody + 'static, H: Into<RequestHeadType>>(
mut self, mut self,
head: H, head: H,
body: B, body: B,
) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>> { ) -> Result<(ResponseHead, Payload), SendRequestError> {
match self.io.take().unwrap() { match self.io.take().unwrap() {
ConnectionType::H1(io) => Box::pin(h1proto::send_request( ConnectionType::H1(io) => {
io, h1proto::send_request(io, head.into(), body, self.created, self.pool)
head.into(), .await
body, }
self.created, ConnectionType::H2(io) => {
self.pool, h2proto::send_request(io, head.into(), body, self.created, self.pool)
)), .await
ConnectionType::H2(io) => Box::pin(h2proto::send_request( }
io,
head.into(),
body,
self.created,
self.pool,
)),
} }
} }
/// Send request, returns Response and Framed /// Send request, returns Response and Framed
fn open_tunnel<H: Into<RequestHeadType>>( async fn open_tunnel<H: Into<RequestHeadType>>(
mut self, mut self,
head: H, head: H,
) -> LocalBoxFuture< ) -> Result<(ResponseHead, Framed<T, ClientCodec>), SendRequestError> {
'static,
Result<(ResponseHead, Framed<Self::Io, ClientCodec>), SendRequestError>,
> {
match self.io.take().unwrap() { match self.io.take().unwrap() {
ConnectionType::H1(io) => Box::pin(h1proto::open_tunnel(io, head.into())), ConnectionType::H1(io) => h1proto::open_tunnel(io, head.into()).await,
ConnectionType::H2(io) => { ConnectionType::H2(io) => {
if let Some(mut pool) = self.pool.take() { if let Some(mut pool) = self.pool.take() {
pool.release(IoConnection::new( pool.release(IoConnection::new(
@ -193,7 +180,7 @@ where
None, None,
)); ));
} }
Box::pin(async { Err(SendRequestError::TunnelNotSupported) }) Err(SendRequestError::TunnelNotSupported)
} }
} }
} }
@ -216,14 +203,18 @@ where
{ {
type Io = EitherIo<A, B>; type Io = EitherIo<A, B>;
fn send_request<RB: MessageBody + 'static, H: Into<RequestHeadType>>( fn send_request<RB, H>(
self, self,
head: H, head: H,
body: RB, body: RB,
) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>> { ) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>>
where
RB: MessageBody + 'static,
H: Into<RequestHeadType> + 'static,
{
match self { match self {
EitherIoConnection::A(con) => con.send_request(head, body), EitherIoConnection::A(con) => Box::pin(con.send_request(head, body)),
EitherIoConnection::B(con) => con.send_request(head, body), EitherIoConnection::B(con) => Box::pin(con.send_request(head, body)),
} }
} }

View File

@ -1,5 +1,8 @@
use std::fmt; use std::fmt;
use std::future::Future;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
use actix_codec::{AsyncRead, AsyncWrite}; use actix_codec::{AsyncRead, AsyncWrite};
@ -12,7 +15,7 @@ use actix_utils::timeout::{TimeoutError, TimeoutService};
use http::Uri; use http::Uri;
use super::config::ConnectorConfig; use super::config::ConnectorConfig;
use super::connection::Connection; use super::connection::{Connection, EitherIoConnection};
use super::error::ConnectError; use super::error::ConnectError;
use super::pool::{ConnectionPool, Protocol}; use super::pool::{ConnectionPool, Protocol};
use super::Connect; use super::Connect;
@ -55,7 +58,7 @@ pub struct Connector<T, U> {
_phantom: PhantomData<U>, _phantom: PhantomData<U>,
} }
trait Io: AsyncRead + AsyncWrite + Unpin {} pub trait Io: AsyncRead + AsyncWrite + Unpin {}
impl<T: AsyncRead + AsyncWrite + Unpin> Io for T {} impl<T: AsyncRead + AsyncWrite + Unpin> Io for T {}
impl Connector<(), ()> { impl Connector<(), ()> {
@ -244,28 +247,43 @@ where
self, self,
) -> impl Service<Connect, Response = impl Connection, Error = ConnectError> + Clone ) -> impl Service<Connect, Response = impl Connection, Error = ConnectError> + Clone
{ {
let tcp_service = TimeoutService::new(
self.config.timeout,
apply_fn(self.connector.clone(), |msg: Connect, srv| {
srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr))
})
.map_err(ConnectError::from)
.map(|stream| (stream.into_parts().0, Protocol::Http1)),
)
.map_err(|e| match e {
TimeoutError::Service(e) => e,
TimeoutError::Timeout => ConnectError::Timeout,
});
#[cfg(not(any(feature = "openssl", feature = "rustls")))] #[cfg(not(any(feature = "openssl", feature = "rustls")))]
{ {
let connector = TimeoutService::new( // A dummy service for annotate tls pool's type signature.
self.config.timeout, pub type DummyService = Box<
apply_fn(self.connector, |msg: Connect, srv| { dyn Service<
srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr)) Connect,
}) Response = (Box<dyn Io>, Protocol),
.map_err(ConnectError::from) Error = ConnectError,
.map(|stream| (stream.into_parts().0, Protocol::Http1)), Future = futures_core::future::LocalBoxFuture<
) 'static,
.map_err(|e| match e { Result<(Box<dyn Io>, Protocol), ConnectError>,
TimeoutError::Service(e) => e, >,
TimeoutError::Timeout => ConnectError::Timeout, >,
}); >;
connect_impl::InnerConnector { InnerConnector::<_, DummyService, _, Box<dyn Io>> {
tcp_pool: ConnectionPool::new( tcp_pool: ConnectionPool::new(
connector, tcp_service,
self.config.no_disconnect_timeout(), self.config.no_disconnect_timeout(),
), ),
tls_pool: None,
} }
} }
#[cfg(any(feature = "openssl", feature = "rustls"))] #[cfg(any(feature = "openssl", feature = "rustls"))]
{ {
const H2: &[u8] = b"h2"; const H2: &[u8] = b"h2";
@ -328,172 +346,97 @@ where
TimeoutError::Timeout => ConnectError::Timeout, TimeoutError::Timeout => ConnectError::Timeout,
}); });
let tcp_service = TimeoutService::new( InnerConnector {
self.config.timeout,
apply_fn(self.connector, |msg: Connect, srv| {
srv.call(TcpConnect::new(msg.uri).set_addr(msg.addr))
})
.map_err(ConnectError::from)
.map(|stream| (stream.into_parts().0, Protocol::Http1)),
)
.map_err(|e| match e {
TimeoutError::Service(e) => e,
TimeoutError::Timeout => ConnectError::Timeout,
});
connect_impl::InnerConnector {
tcp_pool: ConnectionPool::new( tcp_pool: ConnectionPool::new(
tcp_service, tcp_service,
self.config.no_disconnect_timeout(), self.config.no_disconnect_timeout(),
), ),
ssl_pool: ConnectionPool::new(ssl_service, self.config), tls_pool: Some(ConnectionPool::new(ssl_service, self.config)),
} }
} }
} }
} }
#[cfg(not(any(feature = "openssl", feature = "rustls")))] struct InnerConnector<S1, S2, Io1, Io2>
mod connect_impl { where
use std::task::{Context, Poll}; S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + 'static,
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + 'static,
Io1: AsyncRead + AsyncWrite + Unpin + 'static,
Io2: AsyncRead + AsyncWrite + Unpin + 'static,
{
tcp_pool: ConnectionPool<S1, Io1>,
tls_pool: Option<ConnectionPool<S2, Io2>>,
}
use futures_core::future::LocalBoxFuture; impl<S1, S2, Io1, Io2> Clone for InnerConnector<S1, S2, Io1, Io2>
where
use super::*; S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + 'static,
use crate::client::connection::IoConnection; S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + 'static,
Io1: AsyncRead + AsyncWrite + Unpin + 'static,
pub(crate) struct InnerConnector<T, Io> Io2: AsyncRead + AsyncWrite + Unpin + 'static,
where {
Io: AsyncRead + AsyncWrite + Unpin + 'static, fn clone(&self) -> Self {
T: Service<Connect, Response = (Io, Protocol), Error = ConnectError> + 'static, InnerConnector {
{ tcp_pool: self.tcp_pool.clone(),
pub(crate) tcp_pool: ConnectionPool<T, Io>, tls_pool: self.tls_pool.as_ref().cloned(),
}
impl<T, Io> Clone for InnerConnector<T, Io>
where
Io: AsyncRead + AsyncWrite + Unpin + 'static,
T: Service<Connect, Response = (Io, Protocol), Error = ConnectError> + 'static,
{
fn clone(&self) -> Self {
InnerConnector {
tcp_pool: self.tcp_pool.clone(),
}
}
}
impl<T, Io> Service<Connect> for InnerConnector<T, Io>
where
Io: AsyncRead + AsyncWrite + Unpin + 'static,
T: Service<Connect, Response = (Io, Protocol), Error = ConnectError> + 'static,
{
type Response = IoConnection<Io>;
type Error = ConnectError;
type Future = LocalBoxFuture<'static, Result<IoConnection<Io>, ConnectError>>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.tcp_pool.poll_ready(cx)
}
fn call(&self, req: Connect) -> Self::Future {
match req.uri.scheme_str() {
Some("https") | Some("wss") => {
Box::pin(async { Err(ConnectError::SslIsNotSupported) })
}
_ => self.tcp_pool.call(req),
}
} }
} }
} }
#[cfg(any(feature = "openssl", feature = "rustls"))] impl<S1, S2, Io1, Io2> Service<Connect> for InnerConnector<S1, S2, Io1, Io2>
mod connect_impl { where
use std::future::Future; S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + 'static,
use std::pin::Pin; S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + 'static,
use std::task::{Context, Poll}; Io1: AsyncRead + AsyncWrite + Unpin + 'static,
Io2: AsyncRead + AsyncWrite + Unpin + 'static,
{
type Response = EitherIoConnection<Io1, Io2>;
type Error = ConnectError;
type Future = InnerConnectorResponse<S1, S2, Io1, Io2>;
use super::*; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
use crate::client::connection::EitherIoConnection; self.tcp_pool.poll_ready(cx)
pub(crate) struct InnerConnector<S1, S2, Io1, Io2>
where
S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + 'static,
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + 'static,
Io1: AsyncRead + AsyncWrite + Unpin + 'static,
Io2: AsyncRead + AsyncWrite + Unpin + 'static,
{
pub(crate) tcp_pool: ConnectionPool<S1, Io1>,
pub(crate) ssl_pool: ConnectionPool<S2, Io2>,
} }
impl<S1, S2, Io1, Io2> Clone for InnerConnector<S1, S2, Io1, Io2> fn call(&self, req: Connect) -> Self::Future {
where match req.uri.scheme_str() {
S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + 'static, Some("https") | Some("wss") => match self.tls_pool {
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + 'static, None => InnerConnectorResponse::SslIsNotSupported,
Io1: AsyncRead + AsyncWrite + Unpin + 'static, Some(ref pool) => InnerConnectorResponse::Io2(pool.call(req)),
Io2: AsyncRead + AsyncWrite + Unpin + 'static, },
{ _ => InnerConnectorResponse::Io1(self.tcp_pool.call(req)),
fn clone(&self) -> Self {
InnerConnector {
tcp_pool: self.tcp_pool.clone(),
ssl_pool: self.ssl_pool.clone(),
}
} }
} }
}
impl<S1, S2, Io1, Io2> Service<Connect> for InnerConnector<S1, S2, Io1, Io2> #[pin_project::pin_project(project = InnerConnectorProj)]
where enum InnerConnectorResponse<S1, S2, Io1, Io2>
S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + 'static, where
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + 'static, S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + 'static,
Io1: AsyncRead + AsyncWrite + Unpin + 'static, S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + 'static,
Io2: AsyncRead + AsyncWrite + Unpin + 'static, Io1: AsyncRead + AsyncWrite + Unpin + 'static,
{ Io2: AsyncRead + AsyncWrite + Unpin + 'static,
type Response = EitherIoConnection<Io1, Io2>; {
type Error = ConnectError; Io1(#[pin] <ConnectionPool<S1, Io1> as Service<Connect>>::Future),
type Future = InnerConnectorResponse<S1, S2, Io1, Io2>; Io2(#[pin] <ConnectionPool<S2, Io2> as Service<Connect>>::Future),
SslIsNotSupported,
}
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { impl<S1, S2, Io1, Io2> Future for InnerConnectorResponse<S1, S2, Io1, Io2>
self.tcp_pool.poll_ready(cx) where
} S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + 'static,
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + 'static,
Io1: AsyncRead + AsyncWrite + Unpin + 'static,
Io2: AsyncRead + AsyncWrite + Unpin + 'static,
{
type Output = Result<EitherIoConnection<Io1, Io2>, ConnectError>;
fn call(&self, req: Connect) -> Self::Future { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match req.uri.scheme_str() { match self.project() {
Some("https") | Some("wss") => { InnerConnectorProj::Io1(fut) => fut.poll(cx).map_ok(EitherIoConnection::A),
InnerConnectorResponse::Io2(self.ssl_pool.call(req)) InnerConnectorProj::Io2(fut) => fut.poll(cx).map_ok(EitherIoConnection::B),
} InnerConnectorProj::SslIsNotSupported => {
_ => InnerConnectorResponse::Io1(self.tcp_pool.call(req)), Poll::Ready(Err(ConnectError::SslIsNotSupported))
}
}
}
#[pin_project::pin_project(project = InnerConnectorProj)]
pub(crate) enum InnerConnectorResponse<S1, S2, Io1, Io2>
where
S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + 'static,
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + 'static,
Io1: AsyncRead + AsyncWrite + Unpin + 'static,
Io2: AsyncRead + AsyncWrite + Unpin + 'static,
{
Io1(#[pin] <ConnectionPool<S1, Io1> as Service<Connect>>::Future),
Io2(#[pin] <ConnectionPool<S2, Io2> as Service<Connect>>::Future),
}
impl<S1, S2, Io1, Io2> Future for InnerConnectorResponse<S1, S2, Io1, Io2>
where
S1: Service<Connect, Response = (Io1, Protocol), Error = ConnectError> + 'static,
S2: Service<Connect, Response = (Io2, Protocol), Error = ConnectError> + 'static,
Io1: AsyncRead + AsyncWrite + Unpin + 'static,
Io2: AsyncRead + AsyncWrite + Unpin + 'static,
{
type Output = Result<EitherIoConnection<Io1, Io2>, ConnectError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project() {
InnerConnectorProj::Io1(fut) => {
fut.poll(cx).map_ok(EitherIoConnection::A)
}
InnerConnectorProj::Io2(fut) => {
fut.poll(cx).map_ok(EitherIoConnection::B)
}
} }
} }
} }

View File

@ -983,6 +983,7 @@ mod tests {
assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR); assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);
} }
#[cfg(feature = "cookies")]
#[test] #[test]
fn test_cookie_parse() { fn test_cookie_parse() {
let resp: Response = CookieParseError::EmptyName.error_response(); let resp: Response = CookieParseError::EmptyName.error_response();

View File

@ -896,8 +896,9 @@ mod tests {
use super::*; use super::*;
use crate::body::Body; use crate::body::Body;
use crate::http::header::{HeaderValue, CONTENT_TYPE, COOKIE, SET_COOKIE}; use crate::http::header::{HeaderValue, CONTENT_TYPE, COOKIE};
use crate::HttpMessage; #[cfg(feature = "cookies")]
use crate::{http::header::SET_COOKIE, HttpMessage};
#[test] #[test]
fn test_debug() { fn test_debug() {
@ -909,6 +910,7 @@ mod tests {
assert!(dbg.contains("Response")); assert!(dbg.contains("Response"));
} }
#[cfg(feature = "cookies")]
#[test] #[test]
fn test_response_cookies() { fn test_response_cookies() {
let req = crate::test::TestRequest::default() let req = crate::test::TestRequest::default()
@ -946,6 +948,7 @@ mod tests {
); );
} }
#[cfg(feature = "cookies")]
#[test] #[test]
fn test_update_response_cookies() { fn test_update_response_cookies() {
let mut r = Response::Ok() let mut r = Response::Ok()
@ -1097,6 +1100,7 @@ mod tests {
assert_eq!(resp.body().get_ref(), b"test"); assert_eq!(resp.body().get_ref(), b"test");
} }
#[cfg(feature = "cookies")]
#[test] #[test]
fn test_into_builder() { fn test_into_builder() {
let mut resp: Response = "test".into(); let mut resp: Response = "test".into();

View File

@ -29,7 +29,7 @@ use crate::{dev::Payload, error::QueryPayloadError, Error, FromRequest, HttpRequ
/// Code /// Code
/// } /// }
/// ///
/// #[derive(Deserialize)] /// #[derive(Debug, Deserialize)]
/// pub struct AuthRequest { /// pub struct AuthRequest {
/// id: u64, /// id: u64,
/// response_type: ResponseType, /// response_type: ResponseType,
@ -42,9 +42,23 @@ use crate::{dev::Payload, error::QueryPayloadError, Error, FromRequest, HttpRequ
/// async fn index(info: web::Query<AuthRequest>) -> String { /// async fn index(info: web::Query<AuthRequest>) -> String {
/// format!("Authorization request for id={} and type={:?}!", info.id, info.response_type) /// format!("Authorization request for id={} and type={:?}!", info.id, info.response_type)
/// } /// }
///
/// // To access the entire underlying query struct, use `.into_inner()`.
/// #[get("/debug1")]
/// async fn debug1(info: web::Query<AuthRequest>) -> String {
/// dbg!("Authorization object={:?}", info.into_inner());
/// "OK".to_string()
/// }
///
/// // Or use `.0`, which is equivalent to `.into_inner()`.
/// #[get("/debug2")]
/// async fn debug2(info: web::Query<AuthRequest>) -> String {
/// dbg!("Authorization object={:?}", info.0);
/// "OK".to_string()
/// }
/// ``` /// ```
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)] #[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct Query<T>(T); pub struct Query<T>(pub T);
impl<T> Query<T> { impl<T> Query<T> {
/// Unwrap into inner `T` value. /// Unwrap into inner `T` value.