use std::{ future::Future, net, pin::Pin, rc::Rc, task::{Context, Poll}, }; use actix_codec::Framed; use actix_http::{h1::ClientCodec, Payload, RequestHead, RequestHeadType, ResponseHead}; use actix_service::Service; use futures_core::{future::LocalBoxFuture, ready}; use crate::{ any_body::AnyBody, client::{ Connect as ClientConnect, ConnectError, Connection, ConnectionIo, SendRequestError, ServerName, }, ClientResponse, }; pub type BoxConnectorService = Rc< dyn Service< ConnectRequest, Response = ConnectResponse, Error = SendRequestError, Future = LocalBoxFuture<'static, Result>, >, >; pub type BoxedSocket = Box; /// Combined HTTP and WebSocket request type received by connection service. pub enum ConnectRequest { /// Standard HTTP request. /// /// Contains the request head, body type, optional pre-resolved socket address and optional sni host. Client( RequestHeadType, AnyBody, Option, Option, ), /// Tunnel used by WebSocket connection requests. /// /// Contains the request head, optional pre-resolved socket address and optional sni host. Tunnel(RequestHead, Option, Option), } /// Combined HTTP response & WebSocket tunnel type returned from connection service. pub enum ConnectResponse { /// Standard HTTP response. Client(ClientResponse), /// Tunnel used for WebSocket communication. /// /// Contains response head and framed HTTP/1.1 codec. Tunnel(ResponseHead, Framed), } impl ConnectResponse { /// Unwraps type into HTTP response. /// /// # Panics /// Panics if enum variant is not `Client`. pub fn into_client_response(self) -> ClientResponse { match self { ConnectResponse::Client(res) => res, _ => { panic!("ClientResponse only reachable with ConnectResponse::ClientResponse variant") } } } /// Unwraps type into WebSocket tunnel response. /// /// # Panics /// Panics if enum variant is not `Tunnel`. pub fn into_tunnel_response(self) -> (ResponseHead, Framed) { match self { ConnectResponse::Tunnel(head, framed) => (head, framed), _ => { panic!("TunnelResponse only reachable with ConnectResponse::TunnelResponse variant") } } } } pub struct DefaultConnector { connector: S, } impl DefaultConnector { pub(crate) fn new(connector: S) -> Self { Self { connector } } } impl Service for DefaultConnector where S: Service>, Io: ConnectionIo, { type Response = ConnectResponse; type Error = SendRequestError; type Future = ConnectRequestFuture; actix_service::forward_ready!(connector); fn call(&self, req: ConnectRequest) -> Self::Future { // connect to the host let (head, addr, sni_host) = match req { ConnectRequest::Client(ref head, .., addr, ref sni_host) => { (head.as_ref(), addr, sni_host.clone()) } ConnectRequest::Tunnel(ref head, addr, ref sni_host) => (head, addr, sni_host.clone()), }; let authority = if let Some(authority) = head.uri.authority() { authority } else { return ConnectRequestFuture::Error { err: ConnectError::Unresolved, }; }; let tls = match head.uri.scheme_str() { Some("https") | Some("wss") => true, _ => false, }; let fut = self.connector.call(ClientConnect { hostname: authority.host().to_string(), port: authority.port().map(|p| p.as_u16()).unwrap_or_else(|| { if tls { 443 } else { 80 } }), tls, sni_host, addr, }); ConnectRequestFuture::Connection { fut, req: Some(req), } } } pin_project_lite::pin_project! { #[project = ConnectRequestProj] pub enum ConnectRequestFuture where Io: ConnectionIo { Error { err: ConnectError }, Connection { #[pin] fut: Fut, req: Option }, Client { fut: LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>> }, Tunnel { fut: LocalBoxFuture< 'static, Result<(ResponseHead, Framed, ClientCodec>), SendRequestError>, >, } } } impl Future for ConnectRequestFuture where Fut: Future, ConnectError>>, Io: ConnectionIo, { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.as_mut().project() { ConnectRequestProj::Connection { fut, req } => { let connection = ready!(fut.poll(cx))?; let req = req.take().unwrap(); match req { ConnectRequest::Client(head, body, ..) => { // send request let fut = ConnectRequestFuture::Client { fut: connection.send_request(head, body), }; self.set(fut); } ConnectRequest::Tunnel(head, ..) => { // send request let fut = ConnectRequestFuture::Tunnel { fut: connection.open_tunnel(RequestHeadType::from(head)), }; self.set(fut); } } self.poll(cx) } ConnectRequestProj::Client { fut } => { let (head, payload) = ready!(fut.as_mut().poll(cx))?; Poll::Ready(Ok(ConnectResponse::Client(ClientResponse::new( head, payload, )))) } ConnectRequestProj::Tunnel { fut } => { let (head, framed) = ready!(fut.as_mut().poll(cx))?; let framed = framed.into_map_io(|io| Box::new(io) as _); Poll::Ready(Ok(ConnectResponse::Tunnel(head, framed))) } ConnectRequestProj::Error { .. } => { Poll::Ready(Err(SendRequestError::Connect(ConnectError::Unresolved))) } } } }