From 1281a748d03aa63fb7662c638d725d91b29a44c7 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Mon, 29 Mar 2021 19:06:16 -0700 Subject: [PATCH] merge H1ServiceHandler requests into HttpServiceHandler (#2126) --- actix-http/src/h1/service.rs | 68 +-------- actix-http/src/service.rs | 260 ++++++++++++----------------------- 2 files changed, 95 insertions(+), 233 deletions(-) diff --git a/actix-http/src/h1/service.rs b/actix-http/src/h1/service.rs index 4fe79736..f915bfa4 100644 --- a/actix-http/src/h1/service.rs +++ b/actix-http/src/h1/service.rs @@ -6,7 +6,7 @@ use std::{fmt, net}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_rt::net::TcpStream; use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory}; -use futures_core::{future::LocalBoxFuture, ready}; +use futures_core::future::LocalBoxFuture; use futures_util::future::ready; use crate::body::MessageBody; @@ -14,7 +14,7 @@ use crate::config::ServiceConfig; use crate::error::{DispatchError, Error}; use crate::request::Request; use crate::response::Response; -use crate::service::HttpFlow; +use crate::service::HttpServiceHandler; use crate::{ConnectCallback, OnConnectData}; use super::codec::Codec; @@ -315,47 +315,10 @@ where } /// `Service` implementation for HTTP/1 transport -pub struct H1ServiceHandler<T, S, B, X, U> -where - S: Service<Request>, - X: Service<Request>, - U: Service<(Request, Framed<T, Codec>)>, -{ - flow: Rc<HttpFlow<S, X, U>>, - on_connect_ext: Option<Rc<ConnectCallback<T>>>, - cfg: ServiceConfig, - _phantom: PhantomData<B>, -} - -impl<T, S, B, X, U> H1ServiceHandler<T, S, B, X, U> -where - S: Service<Request>, - S::Error: Into<Error>, - S::Response: Into<Response<B>>, - B: MessageBody, - X: Service<Request, Response = Request>, - X::Error: Into<Error>, - U: Service<(Request, Framed<T, Codec>), Response = ()>, - U::Error: fmt::Display, -{ - fn new( - cfg: ServiceConfig, - service: S, - expect: X, - upgrade: Option<U>, - on_connect_ext: Option<Rc<ConnectCallback<T>>>, - ) -> H1ServiceHandler<T, S, B, X, U> { - H1ServiceHandler { - flow: HttpFlow::new(service, expect, upgrade), - cfg, - on_connect_ext, - _phantom: PhantomData, - } - } -} +pub type H1ServiceHandler<T, S, B, X, U> = HttpServiceHandler<T, S, B, X, U>; impl<T, S, B, X, U> Service<(T, Option<net::SocketAddr>)> - for H1ServiceHandler<T, S, B, X, U> + for HttpServiceHandler<T, S, B, X, U> where T: AsyncRead + AsyncWrite + Unpin, S: Service<Request>, @@ -372,27 +335,10 @@ where type Future = Dispatcher<T, S, B, X, U>; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - ready!(self.flow.expect.poll_ready(cx)).map_err(|e| { - let e = e.into(); - log::error!("Http expect service readiness error: {:?}", e); + self._poll_ready(cx).map_err(|e| { + log::error!("HTTP/1 service readiness error: {:?}", e); DispatchError::Service(e) - })?; - - if let Some(ref upg) = self.flow.upgrade { - ready!(upg.poll_ready(cx)).map_err(|e| { - let e = e.into(); - log::error!("Http upgrade service readiness error: {:?}", e); - DispatchError::Service(e) - })?; - }; - - ready!(self.flow.service.poll_ready(cx)).map_err(|e| { - let e = e.into(); - log::error!("Http service readiness error: {:?}", e); - DispatchError::Service(e) - })?; - - Poll::Ready(Ok(())) + }) } fn call(&self, (io, addr): (T, Option<net::SocketAddr>)) -> Self::Future { diff --git a/actix-http/src/service.rs b/actix-http/src/service.rs index 1a06cec3..fd97fb5e 100644 --- a/actix-http/src/service.rs +++ b/actix-http/src/service.rs @@ -12,7 +12,7 @@ use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_rt::net::TcpStream; use actix_service::{pipeline_factory, IntoServiceFactory, Service, ServiceFactory}; use bytes::Bytes; -use futures_core::ready; +use futures_core::{future::LocalBoxFuture, ready}; use h2::server::{handshake, Handshake}; use pin_project::pin_project; @@ -107,7 +107,6 @@ where X1: ServiceFactory<Request, Config = (), Response = Request>, X1::Error: Into<Error>, X1::InitError: fmt::Debug, - <X1::Service as Service<Request>>::Future: 'static, { HttpService { expect, @@ -128,7 +127,6 @@ where U1: ServiceFactory<(Request, Framed<T, h1::Codec>), Config = (), Response = ()>, U1::Error: fmt::Display, U1::InitError: fmt::Debug, - <U1::Service as Service<(Request, Framed<T, h1::Codec>)>>::Future: 'static, { HttpService { upgrade, @@ -150,23 +148,24 @@ where impl<S, B, X, U> HttpService<TcpStream, S, B, X, U> where S: ServiceFactory<Request, Config = ()>, + S::Future: 'static, S::Error: Into<Error> + 'static, S::InitError: fmt::Debug, S::Response: Into<Response<B>> + 'static, <S::Service as Service<Request>>::Future: 'static, B: MessageBody + 'static, X: ServiceFactory<Request, Config = (), Response = Request>, + X::Future: 'static, X::Error: Into<Error>, X::InitError: fmt::Debug, - <X::Service as Service<Request>>::Future: 'static, U: ServiceFactory< (Request, Framed<TcpStream, h1::Codec>), Config = (), Response = (), >, + U::Future: 'static, U::Error: fmt::Display + Into<Error>, U::InitError: fmt::Debug, - <U::Service as Service<(Request, Framed<TcpStream, h1::Codec>)>>::Future: 'static, { /// Create simple tcp stream service pub fn tcp( @@ -196,23 +195,24 @@ mod openssl { impl<S, B, X, U> HttpService<TlsStream<TcpStream>, S, B, X, U> where S: ServiceFactory<Request, Config = ()>, + S::Future: 'static, S::Error: Into<Error> + 'static, S::InitError: fmt::Debug, S::Response: Into<Response<B>> + 'static, <S::Service as Service<Request>>::Future: 'static, B: MessageBody + 'static, X: ServiceFactory<Request, Config = (), Response = Request>, + X::Future: 'static, X::Error: Into<Error>, X::InitError: fmt::Debug, - <X::Service as Service<Request>>::Future: 'static, U: ServiceFactory< (Request, Framed<TlsStream<TcpStream>, h1::Codec>), Config = (), Response = (), >, + U::Future: 'static, U::Error: fmt::Display + Into<Error>, U::InitError: fmt::Debug, - <U::Service as Service<(Request, Framed<TlsStream<TcpStream>, h1::Codec>)>>::Future: 'static, { /// Create openssl based service pub fn openssl( @@ -261,23 +261,24 @@ mod rustls { impl<S, B, X, U> HttpService<TlsStream<TcpStream>, S, B, X, U> where S: ServiceFactory<Request, Config = ()>, + S::Future: 'static, S::Error: Into<Error> + 'static, S::InitError: fmt::Debug, S::Response: Into<Response<B>> + 'static, <S::Service as Service<Request>>::Future: 'static, B: MessageBody + 'static, X: ServiceFactory<Request, Config = (), Response = Request>, + X::Future: 'static, X::Error: Into<Error>, X::InitError: fmt::Debug, - <X::Service as Service<Request>>::Future: 'static, U: ServiceFactory< (Request, Framed<TlsStream<TcpStream>, h1::Codec>), Config = (), Response = (), >, + U::Future: 'static, U::Error: fmt::Display + Into<Error>, U::InitError: fmt::Debug, - <U::Service as Service<(Request, Framed<TlsStream<TcpStream>, h1::Codec>)>>::Future: 'static, { /// Create openssl based service pub fn rustls( @@ -319,137 +320,117 @@ mod rustls { impl<T, S, B, X, U> ServiceFactory<(T, Protocol, Option<net::SocketAddr>)> for HttpService<T, S, B, X, U> where - T: AsyncRead + AsyncWrite + Unpin, + T: AsyncRead + AsyncWrite + Unpin + 'static, S: ServiceFactory<Request, Config = ()>, + S::Future: 'static, S::Error: Into<Error> + 'static, S::InitError: fmt::Debug, S::Response: Into<Response<B>> + 'static, <S::Service as Service<Request>>::Future: 'static, B: MessageBody + 'static, X: ServiceFactory<Request, Config = (), Response = Request>, + X::Future: 'static, X::Error: Into<Error>, X::InitError: fmt::Debug, - <X::Service as Service<Request>>::Future: 'static, U: ServiceFactory<(Request, Framed<T, h1::Codec>), Config = (), Response = ()>, + U::Future: 'static, U::Error: fmt::Display + Into<Error>, U::InitError: fmt::Debug, - <U::Service as Service<(Request, Framed<T, h1::Codec>)>>::Future: 'static, { type Response = (); type Error = DispatchError; type Config = (); type Service = HttpServiceHandler<T, S::Service, B, X::Service, U::Service>; type InitError = (); - type Future = HttpServiceResponse<T, S, B, X, U>; + type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>; fn new_service(&self, _: ()) -> Self::Future { - HttpServiceResponse { - fut: self.srv.new_service(()), - fut_ex: Some(self.expect.new_service(())), - fut_upg: self.upgrade.as_ref().map(|f| f.new_service(())), - expect: None, - upgrade: None, - on_connect_ext: self.on_connect_ext.clone(), - cfg: self.cfg.clone(), - _phantom: PhantomData, - } - } -} + let service = self.srv.new_service(()); + let expect = self.expect.new_service(()); + let upgrade = self.upgrade.as_ref().map(|s| s.new_service(())); + let on_connect_ext = self.on_connect_ext.clone(); + let cfg = self.cfg.clone(); -#[doc(hidden)] -#[pin_project] -pub struct HttpServiceResponse<T, S, B, X, U> -where - S: ServiceFactory<Request>, - X: ServiceFactory<Request>, - U: ServiceFactory<(Request, Framed<T, h1::Codec>)>, -{ - #[pin] - fut: S::Future, - #[pin] - fut_ex: Option<X::Future>, - #[pin] - fut_upg: Option<U::Future>, - expect: Option<X::Service>, - upgrade: Option<U::Service>, - on_connect_ext: Option<Rc<ConnectCallback<T>>>, - cfg: ServiceConfig, - _phantom: PhantomData<B>, -} + Box::pin(async move { + let expect = expect + .await + .map_err(|e| log::error!("Init http expect service error: {:?}", e))?; -impl<T, S, B, X, U> Future for HttpServiceResponse<T, S, B, X, U> -where - T: AsyncRead + AsyncWrite + Unpin, - S: ServiceFactory<Request>, - S::Error: Into<Error> + 'static, - S::InitError: fmt::Debug, - S::Response: Into<Response<B>> + 'static, - <S::Service as Service<Request>>::Future: 'static, - B: MessageBody + 'static, - X: ServiceFactory<Request, Response = Request>, - X::Error: Into<Error>, - X::InitError: fmt::Debug, - <X::Service as Service<Request>>::Future: 'static, - U: ServiceFactory<(Request, Framed<T, h1::Codec>), Response = ()>, - U::Error: fmt::Display, - U::InitError: fmt::Debug, - <U::Service as Service<(Request, Framed<T, h1::Codec>)>>::Future: 'static, -{ - type Output = - Result<HttpServiceHandler<T, S::Service, B, X::Service, U::Service>, ()>; + let upgrade = match upgrade { + Some(upgrade) => { + let upgrade = upgrade.await.map_err(|e| { + log::error!("Init http upgrade service error: {:?}", e) + })?; + Some(upgrade) + } + None => None, + }; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let mut this = self.as_mut().project(); + let service = service + .await + .map_err(|e| log::error!("Init http service error: {:?}", e))?; - if let Some(fut) = this.fut_ex.as_pin_mut() { - let expect = ready!(fut - .poll(cx) - .map_err(|e| log::error!("Init http service error: {:?}", e)))?; - this = self.as_mut().project(); - *this.expect = Some(expect); - this.fut_ex.set(None); - } - - if let Some(fut) = this.fut_upg.as_pin_mut() { - let upgrade = ready!(fut - .poll(cx) - .map_err(|e| log::error!("Init http service error: {:?}", e)))?; - this = self.as_mut().project(); - *this.upgrade = Some(upgrade); - this.fut_upg.set(None); - } - - let result = ready!(this - .fut - .poll(cx) - .map_err(|e| log::error!("Init http service error: {:?}", e))); - - Poll::Ready(result.map(|service| { - let this = self.as_mut().project(); - HttpServiceHandler::new( - this.cfg.clone(), + Ok(HttpServiceHandler::new( + cfg, service, - this.expect.take().unwrap(), - this.upgrade.take(), - this.on_connect_ext.clone(), - ) - })) + expect, + upgrade, + on_connect_ext, + )) + }) } } -/// `Service` implementation for HTTP transport +/// `Service` implementation for HTTP/1 and HTTP/2 transport pub struct HttpServiceHandler<T, S, B, X, U> where S: Service<Request>, X: Service<Request>, U: Service<(Request, Framed<T, h1::Codec>)>, { - flow: Rc<HttpFlow<S, X, U>>, - cfg: ServiceConfig, - on_connect_ext: Option<Rc<ConnectCallback<T>>>, + pub(super) flow: Rc<HttpFlow<S, X, U>>, + pub(super) cfg: ServiceConfig, + pub(super) on_connect_ext: Option<Rc<ConnectCallback<T>>>, _phantom: PhantomData<B>, } +impl<T, S, B, X, U> HttpServiceHandler<T, S, B, X, U> +where + S: Service<Request>, + S::Error: Into<Error>, + X: Service<Request>, + X::Error: Into<Error>, + U: Service<(Request, Framed<T, h1::Codec>)>, + U::Error: Into<Error>, +{ + pub(super) fn new( + cfg: ServiceConfig, + service: S, + expect: X, + upgrade: Option<U>, + on_connect_ext: Option<Rc<ConnectCallback<T>>>, + ) -> HttpServiceHandler<T, S, B, X, U> { + HttpServiceHandler { + cfg, + on_connect_ext, + flow: HttpFlow::new(service, expect, upgrade), + _phantom: PhantomData, + } + } + + pub(super) fn _poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> { + ready!(self.flow.expect.poll_ready(cx).map_err(Into::into))?; + + ready!(self.flow.service.poll_ready(cx).map_err(Into::into))?; + + if let Some(ref upg) = self.flow.upgrade { + ready!(upg.poll_ready(cx).map_err(Into::into))?; + }; + + Poll::Ready(Ok(())) + } +} + /// A collection of services that describe an HTTP request flow. pub(super) struct HttpFlow<S, X, U> { pub(super) service: S, @@ -467,34 +448,6 @@ impl<S, X, U> HttpFlow<S, X, U> { } } -impl<T, S, B, X, U> HttpServiceHandler<T, S, B, X, U> -where - S: Service<Request>, - S::Error: Into<Error> + 'static, - S::Future: 'static, - S::Response: Into<Response<B>> + 'static, - B: MessageBody + 'static, - X: Service<Request, Response = Request>, - X::Error: Into<Error>, - U: Service<(Request, Framed<T, h1::Codec>), Response = ()>, - U::Error: fmt::Display, -{ - fn new( - cfg: ServiceConfig, - service: S, - expect: X, - upgrade: Option<U>, - on_connect_ext: Option<Rc<ConnectCallback<T>>>, - ) -> HttpServiceHandler<T, S, B, X, U> { - HttpServiceHandler { - cfg, - on_connect_ext, - flow: HttpFlow::new(service, expect, upgrade), - _phantom: PhantomData, - } - } -} - impl<T, S, B, X, U> Service<(T, Protocol, Option<net::SocketAddr>)> for HttpServiceHandler<T, S, B, X, U> where @@ -514,47 +467,10 @@ where type Future = HttpServiceHandlerResponse<T, S, B, X, U>; fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - let ready = self - .flow - .expect - .poll_ready(cx) - .map_err(|e| { - let e = e.into(); - log::error!("Http service readiness error: {:?}", e); - DispatchError::Service(e) - })? - .is_ready(); - - let ready = self - .flow - .service - .poll_ready(cx) - .map_err(|e| { - let e = e.into(); - log::error!("Http service readiness error: {:?}", e); - DispatchError::Service(e) - })? - .is_ready() - && ready; - - let ready = if let Some(ref upg) = self.flow.upgrade { - upg.poll_ready(cx) - .map_err(|e| { - let e = e.into(); - log::error!("Http service readiness error: {:?}", e); - DispatchError::Service(e) - })? - .is_ready() - && ready - } else { - ready - }; - - if ready { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } + self._poll_ready(cx).map_err(|e| { + log::error!("HTTP service readiness error: {:?}", e); + DispatchError::Service(e) + }) } fn call(