diff --git a/awc/Cargo.toml b/awc/Cargo.toml index 9beecc6d4..e403dc3aa 100644 --- a/awc/Cargo.toml +++ b/awc/Cargo.toml @@ -47,7 +47,7 @@ trust-dns = ["actix-http/trust-dns"] actix-codec = "0.4.0-beta.1" actix-service = "2.0.0-beta.4" actix-http = "3.0.0-beta.3" -actix-rt = "2" +actix-rt = { version = "2", default-features = false } base64 = "0.13" bytes = "1" @@ -57,6 +57,7 @@ futures-core = { version = "0.3.7", default-features = false } log =" 0.4" mime = "0.3" percent-encoding = "2.1" +pin-project-lite = "0.2" rand = "0.8" serde = "1.0" serde_json = "1.0" diff --git a/awc/src/builder.rs b/awc/src/builder.rs index 4495b39fd..bfef55c1e 100644 --- a/awc/src/builder.rs +++ b/awc/src/builder.rs @@ -1,7 +1,4 @@ -use std::convert::TryFrom; -use std::fmt; -use std::rc::Rc; -use std::time::Duration; +use std::{convert::TryFrom, fmt, rc::Rc, time::Duration}; use actix_codec::{AsyncRead, AsyncWrite}; use actix_http::{ @@ -9,23 +6,26 @@ use actix_http::{ http::{self, header, Error as HttpError, HeaderMap, HeaderName, Uri}, }; use actix_rt::net::TcpStream; -use actix_service::Service; +use actix_service::{boxed, Service}; -use crate::connect::ConnectorWrapper; -use crate::{Client, ClientConfig}; +use crate::connect::DefaultConnector; +use crate::error::SendRequestError; +use crate::middleware::{NestTransform, Transform}; +use crate::{Client, ClientConfig, ConnectRequest, ConnectResponse, ConnectorService}; /// An HTTP Client builder /// /// This type can be used to construct an instance of `Client` through a /// builder-like pattern. -pub struct ClientBuilder { +pub struct ClientBuilder { + middleware: M, default_headers: bool, max_http_version: Option, stream_window_size: Option, conn_window_size: Option, headers: HeaderMap, timeout: Option, - connector: Connector, + connector: Connector, } impl ClientBuilder { @@ -37,8 +37,10 @@ impl ClientBuilder { Error = TcpConnectError, > + Clone, TcpStream, + (), > { ClientBuilder { + middleware: (), default_headers: true, headers: HeaderMap::new(), timeout: Some(Duration::from_secs(5)), @@ -50,7 +52,7 @@ impl ClientBuilder { } } -impl ClientBuilder +impl ClientBuilder where S: Service, Response = TcpConnection, Error = TcpConnectError> + Clone @@ -58,7 +60,7 @@ where Io: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static, { /// Use custom connector service. - pub fn connector(self, connector: Connector) -> ClientBuilder + pub fn connector(self, connector: Connector) -> ClientBuilder where S1: Service< TcpConnect, @@ -69,6 +71,7 @@ where Io1: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static, { ClientBuilder { + middleware: self.middleware, default_headers: self.default_headers, headers: self.headers, timeout: self.timeout, @@ -171,8 +174,37 @@ where self.header(header::AUTHORIZATION, format!("Bearer {}", token)) } + /// Registers middleware, in the form of a middleware component (type), + /// that runs during inbound and/or outbound processing in the request + /// life-cycle (request -> response), modifying request/response as + /// necessary, across all requests managed by the Client. + pub fn wrap( + self, + mw: M1, + ) -> ClientBuilder> + where + M: Transform, + M1: Transform, + { + ClientBuilder { + middleware: NestTransform::new(self.middleware, mw), + default_headers: self.default_headers, + max_http_version: self.max_http_version, + stream_window_size: self.stream_window_size, + conn_window_size: self.conn_window_size, + headers: self.headers, + timeout: self.timeout, + connector: self.connector, + } + } + /// Finish build process and create `Client` instance. - pub fn finish(self) -> Client { + pub fn finish(self) -> Client + where + M: Transform + 'static, + M::Transform: + Service, + { let mut connector = self.connector; if let Some(val) = self.max_http_version { @@ -185,10 +217,14 @@ where connector = connector.initial_window_size(val) }; + let connector = boxed::service(DefaultConnector::new(connector.finish())); + + let connector = boxed::service(self.middleware.new_transform(connector)); + let config = ClientConfig { headers: self.headers, timeout: self.timeout, - connector: Box::new(ConnectorWrapper::new(connector.finish())) as _, + connector, }; Client(Rc::new(config)) diff --git a/awc/src/connect.rs b/awc/src/connect.rs index 97af2d1cc..a4abbc46b 100644 --- a/awc/src/connect.rs +++ b/awc/src/connect.rs @@ -1,5 +1,7 @@ use std::{ - fmt, io, net, + fmt, + future::Future, + io, net, pin::Pin, task::{Context, Poll}, }; @@ -9,24 +11,14 @@ use actix_http::{ body::Body, client::{Connect as ClientConnect, ConnectError, Connection, SendRequestError}, h1::ClientCodec, - RequestHead, RequestHeadType, ResponseHead, + Payload, RequestHead, RequestHeadType, ResponseHead, }; use actix_service::Service; -use futures_core::future::LocalBoxFuture; +use futures_core::{future::LocalBoxFuture, ready}; use crate::response::ClientResponse; -pub(crate) struct ConnectorWrapper { - connector: T, -} - -impl ConnectorWrapper { - pub(crate) fn new(connector: T) -> Self { - Self { connector } - } -} - -pub type ConnectService = Box< +pub type ConnectorService = Box< dyn Service< ConnectRequest, Response = ConnectResponse, @@ -65,16 +57,25 @@ impl ConnectResponse { } } -impl Service for ConnectorWrapper +pub(crate) struct DefaultConnector { + connector: S, +} + +impl DefaultConnector { + pub(crate) fn new(connector: S) -> Self { + Self { connector } + } +} + +impl Service for DefaultConnector where - T: Service, - T::Response: Connection, - ::Io: 'static, - T::Future: 'static, + S: Service, + S::Response: Connection, + ::Io: 'static, { type Response = ConnectResponse; type Error = SendRequestError; - type Future = LocalBoxFuture<'static, Result>; + type Future = ConnectRequestFuture::Io>; actix_service::forward_ready!(connector); @@ -91,26 +92,76 @@ where }), }; - Box::pin(async move { - let connection = fut.await?; + ConnectRequestFuture::Connection { + fut, + req: Some(req), + } + } +} - match req { - ConnectRequest::Client(head, body, ..) => { - // send request - let (head, payload) = connection.send_request(head, body).await?; +pin_project_lite::pin_project! { + #[project = ConnectRequestProj] + pub(crate) enum ConnectRequestFuture { + Connection { + #[pin] + fut: Fut, + req: Option + }, + Client { + fut: LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>> + }, + Tunnel { + fut: LocalBoxFuture< + 'static, + Result<(ResponseHead, Framed), SendRequestError>, + >, + } + } +} - Ok(ConnectResponse::Client(ClientResponse::new(head, payload))) - } - ConnectRequest::Tunnel(head, ..) => { - // send request - let (head, framed) = - connection.open_tunnel(RequestHeadType::from(head)).await?; - - let framed = framed.into_map_io(|io| BoxedSocket(Box::new(Socket(io)))); - Ok(ConnectResponse::Tunnel(head, framed)) +impl Future for ConnectRequestFuture +where + Fut: Future>, + C: Connection, + Io: AsyncRead + AsyncWrite + Unpin + 'static, +{ + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.as_mut().project() { + ConnectRequestProj::Connection { fut, req } => { + let connection = ready!(fut.poll(cx))?; + let req = req.take().unwrap(); + match req { + ConnectRequest::Client(head, body, ..) => { + // send request + let fut = ConnectRequestFuture::Client { + fut: connection.send_request(head, body), + }; + self.as_mut().set(fut); + } + ConnectRequest::Tunnel(head, ..) => { + // send request + let fut = ConnectRequestFuture::Tunnel { + fut: connection.open_tunnel(RequestHeadType::from(head)), + }; + self.as_mut().set(fut); + } } + self.poll(cx) } - }) + ConnectRequestProj::Client { fut } => { + let (head, payload) = ready!(fut.as_mut().poll(cx))?; + Poll::Ready(Ok(ConnectResponse::Client(ClientResponse::new( + head, payload, + )))) + } + ConnectRequestProj::Tunnel { fut } => { + let (head, framed) = ready!(fut.as_mut().poll(cx))?; + let framed = framed.into_map_io(|io| BoxedSocket(Box::new(Socket(io)))); + Poll::Ready(Ok(ConnectResponse::Tunnel(head, framed))) + } + } } } diff --git a/awc/src/lib.rs b/awc/src/lib.rs index 66ff55402..2f48dca79 100644 --- a/awc/src/lib.rs +++ b/awc/src/lib.rs @@ -107,12 +107,13 @@ use actix_http::{ RequestHead, }; use actix_rt::net::TcpStream; -use actix_service::Service; +use actix_service::{boxed, Service}; mod builder; mod connect; pub mod error; mod frozen; +pub mod middleware; mod request; mod response; mod sender; @@ -120,14 +121,12 @@ pub mod test; pub mod ws; pub use self::builder::ClientBuilder; -pub use self::connect::{BoxedSocket, ConnectRequest, ConnectResponse, ConnectService}; +pub use self::connect::{BoxedSocket, ConnectRequest, ConnectResponse, ConnectorService}; pub use self::frozen::{FrozenClientRequest, FrozenSendBuilder}; pub use self::request::ClientRequest; pub use self::response::{ClientResponse, JsonBody, MessageBody}; pub use self::sender::SendClientRequest; -use self::connect::ConnectorWrapper; - /// An asynchronous HTTP and WebSocket client. /// /// ## Examples @@ -151,7 +150,7 @@ use self::connect::ConnectorWrapper; pub struct Client(Rc); pub(crate) struct ClientConfig { - pub(crate) connector: ConnectService, + pub(crate) connector: ConnectorService, pub(crate) headers: HeaderMap, pub(crate) timeout: Option, } @@ -159,7 +158,9 @@ pub(crate) struct ClientConfig { impl Default for Client { fn default() -> Self { Client(Rc::new(ClientConfig { - connector: Box::new(ConnectorWrapper::new(Connector::new().finish())), + connector: boxed::service(self::connect::DefaultConnector::new( + Connector::new().finish(), + )), headers: HeaderMap::new(), timeout: Some(Duration::from_secs(5)), })) diff --git a/awc/src/middleware/mod.rs b/awc/src/middleware/mod.rs new file mode 100644 index 000000000..ae767c4e8 --- /dev/null +++ b/awc/src/middleware/mod.rs @@ -0,0 +1,71 @@ +mod redirect; + +pub use self::redirect::RedirectMiddleware; + +use std::marker::PhantomData; + +use actix_service::Service; + +/// Trait for transform a type to another one. +/// Both the input and output type should impl [actix_service::Service] trait. +pub trait Transform { + type Transform: Service; + + /// Creates and returns a new Transform component. + fn new_transform(self, service: S) -> Self::Transform; +} + +#[doc(hidden)] +/// Helper struct for constructing Nested types that would call `Transform::new_transform` +/// in a chain. +/// +/// The child field would be called first and the output `Service` type is +/// passed to parent as input type. +pub struct NestTransform +where + T1: Transform, + T2: Transform, +{ + child: T1, + parent: T2, + _service: PhantomData<(S, Req)>, +} + +impl NestTransform +where + T1: Transform, + T2: Transform, +{ + pub(crate) fn new(child: T1, parent: T2) -> Self { + NestTransform { + child, + parent, + _service: PhantomData, + } + } +} + +impl Transform for NestTransform +where + T1: Transform, + T2: Transform, +{ + type Transform = T2::Transform; + + fn new_transform(self, service: S) -> Self::Transform { + let service = self.child.new_transform(service); + self.parent.new_transform(service) + } +} + +/// Dummy impl for kick start `NestTransform` type in `ClientBuilder` type +impl Transform for () +where + S: Service, +{ + type Transform = S; + + fn new_transform(self, service: S) -> Self::Transform { + service + } +} diff --git a/awc/src/middleware/redirect.rs b/awc/src/middleware/redirect.rs new file mode 100644 index 000000000..02d37c3b4 --- /dev/null +++ b/awc/src/middleware/redirect.rs @@ -0,0 +1,202 @@ +use std::rc::Rc; + +use actix_http::client::InvalidUrl; +use actix_http::{ + body::Body, + client::SendRequestError, + http::{header, StatusCode, Uri}, + RequestHead, RequestHeadType, +}; +use actix_service::Service; +use futures_core::future::LocalBoxFuture; + +use super::Transform; + +use crate::connect::{ConnectRequest, ConnectResponse}; + +pub struct RedirectMiddleware { + max_redirect_times: u8, +} + +impl Default for RedirectMiddleware { + fn default() -> Self { + Self::new() + } +} + +impl RedirectMiddleware { + pub fn new() -> Self { + Self { + max_redirect_times: 10, + } + } + + pub fn max_redirect_times(mut self, times: u8) -> Self { + self.max_redirect_times = times; + self + } +} + +impl Transform for RedirectMiddleware +where + S: Service + 'static, +{ + type Transform = RedirectService; + + fn new_transform(self, service: S) -> Self::Transform { + RedirectService { + max_redirect_times: self.max_redirect_times, + connector: Rc::new(service), + } + } +} + +pub struct RedirectService { + max_redirect_times: u8, + connector: Rc, +} + +impl Service for RedirectService +where + S: Service + 'static, +{ + type Response = S::Response; + type Error = S::Error; + type Future = LocalBoxFuture<'static, Result>; + + actix_service::forward_ready!(connector); + + fn call(&self, req: ConnectRequest) -> Self::Future { + let connector = self.connector.clone(); + let mut max_redirect_times = self.max_redirect_times; + + Box::pin(async move { + match req { + // tunnel request is skipped. + ConnectRequest::Tunnel(head, addr) => { + return connector.call(ConnectRequest::Tunnel(head, addr)).await + } + ConnectRequest::Client(mut head, mut body, addr) => { + // backup the uri for reuse schema and authority. + let uri = match head { + RequestHeadType::Owned(ref head) => head.uri.clone(), + RequestHeadType::Rc(ref head, ..) => head.uri.clone(), + }; + + loop { + let res = connector + .call(ConnectRequest::Client(head, body, addr.clone())) + .await?; + match res { + ConnectResponse::Client(res) => match res.head().status { + StatusCode::MOVED_PERMANENTLY + | StatusCode::FOUND + | StatusCode::SEE_OTHER + | StatusCode::TEMPORARY_REDIRECT + | StatusCode::PERMANENT_REDIRECT + if max_redirect_times > 0 => + { + // rebuild uri from the location header value. + let uri = res + .headers() + .get(header::LOCATION) + .map(|value| { + Uri::builder() + .scheme(uri.scheme().cloned().unwrap()) + .authority(uri.authority().cloned().unwrap()) + .path_and_query(value.as_bytes()) + }) + .ok_or(SendRequestError::Url( + InvalidUrl::MissingScheme, + ))? + .build()?; + + // use a new request head. + let mut head_new = RequestHead::default(); + head_new.uri = uri; + + head = RequestHeadType::Owned(head_new); + + // throw body + body = Body::None; + + max_redirect_times -= 1; + } + _ => return Ok(ConnectResponse::Client(res)), + }, + _ => unreachable!( + " ConnectRequest::Tunnel is not handled by Redirect" + ), + } + } + } + } + }) + } +} + +#[cfg(test)] +mod tests { + use actix_web::{test::start, web, App, Error, HttpResponse}; + + use super::*; + + use crate::ClientBuilder; + + #[actix_rt::test] + async fn test_basic_redirect() { + let client = ClientBuilder::new() + .wrap(RedirectMiddleware::new().max_redirect_times(10)) + .finish(); + + let srv = start(|| { + App::new() + .service(web::resource("/test").route(web::to(|| async { + Ok::<_, Error>(HttpResponse::BadRequest()) + }))) + .service(web::resource("/").route(web::to(|| async { + Ok::<_, Error>( + HttpResponse::Found() + .append_header(("location", "/test")) + .finish(), + ) + }))) + }); + + let res = client.get(srv.url("/")).send().await.unwrap(); + + assert_eq!(res.status().as_u16(), 400); + } + + #[actix_rt::test] + async fn test_redirect_limit() { + let client = ClientBuilder::new() + .wrap(RedirectMiddleware::new().max_redirect_times(1)) + .finish(); + + let srv = start(|| { + App::new() + .service(web::resource("/").route(web::to(|| async { + Ok::<_, Error>( + HttpResponse::Found() + .append_header(("location", "/test")) + .finish(), + ) + }))) + .service(web::resource("/test").route(web::to(|| async { + Ok::<_, Error>( + HttpResponse::Found() + .append_header(("location", "/test2")) + .finish(), + ) + }))) + .service(web::resource("/test2").route(web::to(|| async { + Ok::<_, Error>(HttpResponse::BadRequest()) + }))) + }); + + let res = client.get(srv.url("/")).send().await.unwrap(); + + assert_eq!(res.status().as_u16(), 302); + } +}