use std::cell::RefCell; use std::future::Future; use std::pin::Pin; use std::rc::Rc; use std::task::{Context, Poll}; use std::{fmt, io, mem, net}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_http::body::Body; use actix_http::client::{ Connect as ClientConnect, ConnectError, Connection, SendRequestError, }; use actix_http::h1::ClientCodec; use actix_http::http::{HeaderMap, Uri}; use actix_http::Extensions; use actix_http::{RequestHead, RequestHeadType, ResponseHead}; use actix_service::Service; use crate::response::ClientResponse; pub(crate) struct ConnectorWrapper(pub Rc>); pub(crate) trait Connect { fn send_request( &mut self, head: RequestHead, body: Body, addr: Option, ) -> Pin>>>; fn send_request_extra( &mut self, head: Rc, extra_headers: Option, body: Body, addr: Option, ) -> Pin>>>; /// Send request, returns Response and Framed fn open_tunnel( &mut self, head: RequestHead, addr: Option, ) -> Pin< Box< dyn Future< Output = Result< (ResponseHead, Framed), SendRequestError, >, >, >, >; /// Send request and extra headers, returns Response and Framed fn open_tunnel_extra( &mut self, head: Rc, extra_headers: Option, addr: Option, ) -> Pin< Box< dyn Future< Output = Result< (ResponseHead, Framed), SendRequestError, >, >, >, >; } impl Connect for ConnectorWrapper where T: Service + 'static, T::Response: Connection, ::Io: 'static, ::Future: 'static, ::TunnelFuture: 'static, T::Future: 'static, { fn send_request( &mut self, head: RequestHead, body: Body, addr: Option, ) -> Pin>>> { fn deal_with_redirects( backend: Rc>, head: RequestHead, body: Body, addr: Option, ) -> Pin>>> where S: Service + 'static, S::Response: Connection, ::Io: 'static, ::Future: 'static, ::TunnelFuture: 'static, S::Future: 'static, { // connect to the host let fut = backend.borrow_mut().call(ClientConnect { uri: head.uri.clone(), addr, }); Box::pin(async move { let connection = fut.await?; // FIXME: whether we'll resend the body depends on the redirect status code let reqbody = match body { Body::None => Body::None, Body::Empty => Body::Empty, Body::Bytes(ref b) => Body::Bytes(b.clone()), // can't re-stream body, send an empty one instead // TODO: maybe emit some kind of warning? Body::Message(_) => Body::Empty, }; let mut reqhead = RequestHead::default(); // FIXME: method depends on redirect code reqhead.method = head.method.clone(); reqhead.version = head.version.clone(); // FIXME: not all headers should be mirrored on redirect reqhead.headers = head.headers.clone(); // FIXME: should we mirror extensions? reqhead.extensions = RefCell::new(Extensions::new()); reqhead.peer_addr = head.peer_addr.clone(); // send request let resp = connection .send_request(RequestHeadType::from(head), body) .await; match resp { Ok((resphead, payload)) => { if resphead.status.is_redirection() { reqhead.uri = resphead .headers .get(actix_http::http::header::LOCATION) .unwrap() .to_str() .unwrap() .parse::() .unwrap(); return deal_with_redirects( backend.clone(), reqhead, reqbody, addr, ) .await; } Ok(ClientResponse::new(resphead, payload)) } Err(e) => Err(e), } }) } deal_with_redirects(self.0.clone(), head, body, addr) } fn send_request_extra( &mut self, head: Rc, extra_headers: Option, body: Body, addr: Option, ) -> Pin>>> { // connect to the host let fut = self.0.call(ClientConnect { uri: head.uri.clone(), addr, }); Box::pin(async move { let connection = fut.await?; // send request let (head, payload) = connection .send_request(RequestHeadType::Rc(head, extra_headers), body) .await?; Ok(ClientResponse::new(head, payload)) }) } fn open_tunnel( &mut self, head: RequestHead, addr: Option, ) -> Pin< Box< dyn Future< Output = Result< (ResponseHead, Framed), SendRequestError, >, >, >, > { // connect to the host let fut = self.0.call(ClientConnect { uri: head.uri.clone(), addr, }); Box::pin(async move { let connection = fut.await?; // send request let (head, framed) = connection.open_tunnel(RequestHeadType::from(head)).await?; let framed = framed.into_map_io(|io| BoxedSocket(Box::new(Socket(io)))); Ok((head, framed)) }) } fn open_tunnel_extra( &mut self, head: Rc, extra_headers: Option, addr: Option, ) -> Pin< Box< dyn Future< Output = Result< (ResponseHead, Framed), SendRequestError, >, >, >, > { // connect to the host let fut = self.0.call(ClientConnect { uri: head.uri.clone(), addr, }); Box::pin(async move { let connection = fut.await?; // send request let (head, framed) = connection .open_tunnel(RequestHeadType::Rc(head, extra_headers)) .await?; let framed = framed.into_map_io(|io| BoxedSocket(Box::new(Socket(io)))); Ok((head, framed)) }) } } trait AsyncSocket { fn as_read(&self) -> &(dyn AsyncRead + Unpin); fn as_read_mut(&mut self) -> &mut (dyn AsyncRead + Unpin); fn as_write(&mut self) -> &mut (dyn AsyncWrite + Unpin); } struct Socket(T); impl AsyncSocket for Socket { fn as_read(&self) -> &(dyn AsyncRead + Unpin) { &self.0 } fn as_read_mut(&mut self) -> &mut (dyn AsyncRead + Unpin) { &mut self.0 } fn as_write(&mut self) -> &mut (dyn AsyncWrite + Unpin) { &mut self.0 } } pub struct BoxedSocket(Box); impl fmt::Debug for BoxedSocket { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "BoxedSocket") } } impl AsyncRead for BoxedSocket { unsafe fn prepare_uninitialized_buffer( &self, buf: &mut [mem::MaybeUninit], ) -> bool { self.0.as_read().prepare_uninitialized_buffer(buf) } fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { Pin::new(self.get_mut().0.as_read_mut()).poll_read(cx, buf) } } impl AsyncWrite for BoxedSocket { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { Pin::new(self.get_mut().0.as_write()).poll_write(cx, buf) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(self.get_mut().0.as_write()).poll_flush(cx) } fn poll_shutdown( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { Pin::new(self.get_mut().0.as_write()).poll_shutdown(cx) } }