From 79de04d8625deff301fe76fdf8649543f2976ae5 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sun, 20 Dec 2020 00:33:34 +0800 Subject: [PATCH 1/3] optimise Extract service (#1841) --- src/handler.rs | 110 +++++++++++++++++++++---------------------------- 1 file changed, 47 insertions(+), 63 deletions(-) diff --git a/src/handler.rs b/src/handler.rs index 669512ab3..db6c5ce0a 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -90,26 +90,20 @@ where } fn call(&mut self, (param, req): (T, HttpRequest)) -> Self::Future { - HandlerServiceResponse { - fut: self.hnd.call(param), - fut2: None, - req: Some(req), - } + let fut = self.hnd.call(param); + HandlerServiceResponse::Future(fut, Some(req)) } } #[doc(hidden)] -#[pin_project] -pub struct HandlerServiceResponse +#[pin_project(project = HandlerProj)] +pub enum HandlerServiceResponse where T: Future, R: Responder, { - #[pin] - fut: T, - #[pin] - fut2: Option, - req: Option, + Future(#[pin] T, Option), + Responder(#[pin] R::Future, Option), } impl Future for HandlerServiceResponse @@ -120,28 +114,26 @@ where type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().project(); - - if let Some(fut) = this.fut2.as_pin_mut() { - return match fut.poll(cx) { - Poll::Ready(Ok(res)) => { - Poll::Ready(Ok(ServiceResponse::new(this.req.take().unwrap(), res))) + loop { + match self.as_mut().project() { + HandlerProj::Future(fut, req) => { + let res = ready!(fut.poll(cx)); + let fut = res.respond_to(req.as_ref().unwrap()); + let state = HandlerServiceResponse::Responder(fut, req.take()); + self.as_mut().set(state); } - Poll::Pending => Poll::Pending, - Poll::Ready(Err(e)) => { - let res: Response = e.into().into(); - Poll::Ready(Ok(ServiceResponse::new(this.req.take().unwrap(), res))) + HandlerProj::Responder(fut, req) => { + let res = ready!(fut.poll(cx)); + let req = req.take().unwrap(); + return match res { + Ok(res) => Poll::Ready(Ok(ServiceResponse::new(req, res))), + Err(e) => { + let res: Response = e.into().into(); + Poll::Ready(Ok(ServiceResponse::new(req, res))) + } + }; } - }; - } - - match this.fut.poll(cx) { - Poll::Ready(res) => { - let fut = res.respond_to(this.req.as_ref().unwrap()); - self.as_mut().project().fut2.set(Some(fut)); - self.poll(cx) } - Poll::Pending => Poll::Pending, } } } @@ -169,12 +161,12 @@ where Error = Infallible, > + Clone, { - type Config = (); type Request = ServiceRequest; type Response = ServiceResponse; type Error = (Error, ServiceRequest); - type InitError = (); + type Config = (); type Service = ExtractService; + type InitError = (); type Future = Ready>; fn new_service(&self, _: ()) -> Self::Future { @@ -210,24 +202,14 @@ where fn call(&mut self, req: ServiceRequest) -> Self::Future { let (req, mut payload) = req.into_parts(); let fut = T::from_request(&req, &mut payload); - - ExtractResponse { - fut, - req, - fut_s: None, - service: self.service.clone(), - } + ExtractResponse::Future(fut, Some(req), self.service.clone()) } } -#[pin_project] -pub struct ExtractResponse { - req: HttpRequest, - service: S, - #[pin] - fut: T::Future, - #[pin] - fut_s: Option, +#[pin_project(project = ExtractProj)] +pub enum ExtractResponse { + Future(#[pin] T::Future, Option, S), + Response(#[pin] S::Future), } impl Future for ExtractResponse @@ -241,21 +223,23 @@ where type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().project(); - - if let Some(fut) = this.fut_s.as_pin_mut() { - return fut.poll(cx).map_err(|_| panic!()); - } - - match ready!(this.fut.poll(cx)) { - Err(e) => { - let req = ServiceRequest::new(this.req.clone()); - Poll::Ready(Err((e.into(), req))) - } - Ok(item) => { - let fut = Some(this.service.call((item, this.req.clone()))); - self.as_mut().project().fut_s.set(fut); - self.poll(cx) + loop { + match self.as_mut().project() { + ExtractProj::Future(fut, req, srv) => { + let res = ready!(fut.poll(cx)); + let req = req.take().unwrap(); + match res { + Err(e) => { + let req = ServiceRequest::new(req); + return Poll::Ready(Err((e.into(), req))); + } + Ok(item) => { + let fut = srv.call((item, req)); + self.as_mut().set(ExtractResponse::Response(fut)); + } + } + } + ExtractProj::Response(fut) => return fut.poll(cx).map_err(|_| panic!()), } } } From 6cbf27508af7e8c38d2fe174fdb26062b35340ed Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sun, 20 Dec 2020 10:20:29 +0800 Subject: [PATCH 2/3] simplify ExtractService's return type (#1842) --- src/handler.rs | 8 ++++---- src/route.rs | 32 ++++++-------------------------- 2 files changed, 10 insertions(+), 30 deletions(-) diff --git a/src/handler.rs b/src/handler.rs index db6c5ce0a..0dc06b3ce 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -163,7 +163,7 @@ where { type Request = ServiceRequest; type Response = ServiceResponse; - type Error = (Error, ServiceRequest); + type Error = Error; type Config = (); type Service = ExtractService; type InitError = (); @@ -192,7 +192,7 @@ where { type Request = ServiceRequest; type Response = ServiceResponse; - type Error = (Error, ServiceRequest); + type Error = Error; type Future = ExtractResponse; fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { @@ -220,7 +220,7 @@ where Error = Infallible, >, { - type Output = Result; + type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { @@ -231,7 +231,7 @@ where match res { Err(e) => { let req = ServiceRequest::new(req); - return Poll::Ready(Err((e.into(), req))); + return Poll::Ready(Ok(req.error_response(e.into()))); } Ok(item) => { let fut = srv.call((item, req)); diff --git a/src/route.rs b/src/route.rs index 45efd9e3c..f8ef458f9 100644 --- a/src/route.rs +++ b/src/route.rs @@ -234,7 +234,7 @@ impl Route { struct RouteNewService where - T: ServiceFactory, + T: ServiceFactory, { service: T, } @@ -245,7 +245,7 @@ where Config = (), Request = ServiceRequest, Response = ServiceResponse, - Error = (Error, ServiceRequest), + Error = Error, >, T::Future: 'static, T::Service: 'static, @@ -262,7 +262,7 @@ where Config = (), Request = ServiceRequest, Response = ServiceResponse, - Error = (Error, ServiceRequest), + Error = Error, >, T::Future: 'static, T::Service: 'static, @@ -297,11 +297,7 @@ struct RouteServiceWrapper { impl Service for RouteServiceWrapper where T::Future: 'static, - T: Service< - Request = ServiceRequest, - Response = ServiceResponse, - Error = (Error, ServiceRequest), - >, + T: Service, { type Request = ServiceRequest; type Response = ServiceResponse; @@ -309,27 +305,11 @@ where type Future = LocalBoxFuture<'static, Result>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.service.poll_ready(cx).map_err(|(e, _)| e) + self.service.poll_ready(cx) } fn call(&mut self, req: ServiceRequest) -> Self::Future { - // let mut fut = self.service.call(req); - self.service - .call(req) - .map(|res| match res { - Ok(res) => Ok(res), - Err((err, req)) => Ok(req.error_response(err)), - }) - .boxed_local() - - // match fut.poll() { - // Poll::Ready(Ok(res)) => Either::Left(ok(res)), - // Poll::Ready(Err((e, req))) => Either::Left(ok(req.error_response(e))), - // Poll::Pending => Either::Right(Box::new(fut.then(|res| match res { - // Ok(res) => Ok(res), - // Err((err, req)) => Ok(req.error_response(err)), - // }))), - // } + Box::pin(self.service.call(req)) } } From 95ccf1c9bc09e24f11126c6e48927d8e5ea3cf9c Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Tue, 22 Dec 2020 00:42:20 +0800 Subject: [PATCH 3/3] replace actix_utils::oneshot with futures_channle::oneshot (#1844) --- actix-http/src/client/pool.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/actix-http/src/client/pool.rs b/actix-http/src/client/pool.rs index 08abc6277..a8687dbeb 100644 --- a/actix-http/src/client/pool.rs +++ b/actix-http/src/client/pool.rs @@ -9,8 +9,9 @@ use std::time::{Duration, Instant}; use actix_codec::{AsyncRead, AsyncWrite}; use actix_rt::time::{delay_for, Delay}; use actix_service::Service; -use actix_utils::{oneshot, task::LocalWaker}; +use actix_utils::task::LocalWaker; use bytes::Bytes; +use futures_channel::oneshot; use futures_util::future::{poll_fn, FutureExt, LocalBoxFuture}; use fxhash::FxHashMap; use h2::client::{Connection, SendRequest};