diff --git a/actix-http/src/client/pool.rs b/actix-http/src/client/pool.rs index 08abc6277..a8687dbeb 100644 --- a/actix-http/src/client/pool.rs +++ b/actix-http/src/client/pool.rs @@ -9,8 +9,9 @@ use std::time::{Duration, Instant}; use actix_codec::{AsyncRead, AsyncWrite}; use actix_rt::time::{delay_for, Delay}; use actix_service::Service; -use actix_utils::{oneshot, task::LocalWaker}; +use actix_utils::task::LocalWaker; use bytes::Bytes; +use futures_channel::oneshot; use futures_util::future::{poll_fn, FutureExt, LocalBoxFuture}; use fxhash::FxHashMap; use h2::client::{Connection, SendRequest}; diff --git a/src/handler.rs b/src/handler.rs index 669512ab3..0dc06b3ce 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -90,26 +90,20 @@ where } fn call(&mut self, (param, req): (T, HttpRequest)) -> Self::Future { - HandlerServiceResponse { - fut: self.hnd.call(param), - fut2: None, - req: Some(req), - } + let fut = self.hnd.call(param); + HandlerServiceResponse::Future(fut, Some(req)) } } #[doc(hidden)] -#[pin_project] -pub struct HandlerServiceResponse +#[pin_project(project = HandlerProj)] +pub enum HandlerServiceResponse where T: Future, R: Responder, { - #[pin] - fut: T, - #[pin] - fut2: Option, - req: Option, + Future(#[pin] T, Option), + Responder(#[pin] R::Future, Option), } impl Future for HandlerServiceResponse @@ -120,28 +114,26 @@ where type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().project(); - - if let Some(fut) = this.fut2.as_pin_mut() { - return match fut.poll(cx) { - Poll::Ready(Ok(res)) => { - Poll::Ready(Ok(ServiceResponse::new(this.req.take().unwrap(), res))) + loop { + match self.as_mut().project() { + HandlerProj::Future(fut, req) => { + let res = ready!(fut.poll(cx)); + let fut = res.respond_to(req.as_ref().unwrap()); + let state = HandlerServiceResponse::Responder(fut, req.take()); + self.as_mut().set(state); } - Poll::Pending => Poll::Pending, - Poll::Ready(Err(e)) => { - let res: Response = e.into().into(); - Poll::Ready(Ok(ServiceResponse::new(this.req.take().unwrap(), res))) + HandlerProj::Responder(fut, req) => { + let res = ready!(fut.poll(cx)); + let req = req.take().unwrap(); + return match res { + Ok(res) => Poll::Ready(Ok(ServiceResponse::new(req, res))), + Err(e) => { + let res: Response = e.into().into(); + Poll::Ready(Ok(ServiceResponse::new(req, res))) + } + }; } - }; - } - - match this.fut.poll(cx) { - Poll::Ready(res) => { - let fut = res.respond_to(this.req.as_ref().unwrap()); - self.as_mut().project().fut2.set(Some(fut)); - self.poll(cx) } - Poll::Pending => Poll::Pending, } } } @@ -169,12 +161,12 @@ where Error = Infallible, > + Clone, { - type Config = (); type Request = ServiceRequest; type Response = ServiceResponse; - type Error = (Error, ServiceRequest); - type InitError = (); + type Error = Error; + type Config = (); type Service = ExtractService; + type InitError = (); type Future = Ready>; fn new_service(&self, _: ()) -> Self::Future { @@ -200,7 +192,7 @@ where { type Request = ServiceRequest; type Response = ServiceResponse; - type Error = (Error, ServiceRequest); + type Error = Error; type Future = ExtractResponse; fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { @@ -210,24 +202,14 @@ where fn call(&mut self, req: ServiceRequest) -> Self::Future { let (req, mut payload) = req.into_parts(); let fut = T::from_request(&req, &mut payload); - - ExtractResponse { - fut, - req, - fut_s: None, - service: self.service.clone(), - } + ExtractResponse::Future(fut, Some(req), self.service.clone()) } } -#[pin_project] -pub struct ExtractResponse { - req: HttpRequest, - service: S, - #[pin] - fut: T::Future, - #[pin] - fut_s: Option, +#[pin_project(project = ExtractProj)] +pub enum ExtractResponse { + Future(#[pin] T::Future, Option, S), + Response(#[pin] S::Future), } impl Future for ExtractResponse @@ -238,24 +220,26 @@ where Error = Infallible, >, { - type Output = Result; + type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().project(); - - if let Some(fut) = this.fut_s.as_pin_mut() { - return fut.poll(cx).map_err(|_| panic!()); - } - - match ready!(this.fut.poll(cx)) { - Err(e) => { - let req = ServiceRequest::new(this.req.clone()); - Poll::Ready(Err((e.into(), req))) - } - Ok(item) => { - let fut = Some(this.service.call((item, this.req.clone()))); - self.as_mut().project().fut_s.set(fut); - self.poll(cx) + loop { + match self.as_mut().project() { + ExtractProj::Future(fut, req, srv) => { + let res = ready!(fut.poll(cx)); + let req = req.take().unwrap(); + match res { + Err(e) => { + let req = ServiceRequest::new(req); + return Poll::Ready(Ok(req.error_response(e.into()))); + } + Ok(item) => { + let fut = srv.call((item, req)); + self.as_mut().set(ExtractResponse::Response(fut)); + } + } + } + ExtractProj::Response(fut) => return fut.poll(cx).map_err(|_| panic!()), } } } diff --git a/src/route.rs b/src/route.rs index 45efd9e3c..f8ef458f9 100644 --- a/src/route.rs +++ b/src/route.rs @@ -234,7 +234,7 @@ impl Route { struct RouteNewService where - T: ServiceFactory, + T: ServiceFactory, { service: T, } @@ -245,7 +245,7 @@ where Config = (), Request = ServiceRequest, Response = ServiceResponse, - Error = (Error, ServiceRequest), + Error = Error, >, T::Future: 'static, T::Service: 'static, @@ -262,7 +262,7 @@ where Config = (), Request = ServiceRequest, Response = ServiceResponse, - Error = (Error, ServiceRequest), + Error = Error, >, T::Future: 'static, T::Service: 'static, @@ -297,11 +297,7 @@ struct RouteServiceWrapper { impl Service for RouteServiceWrapper where T::Future: 'static, - T: Service< - Request = ServiceRequest, - Response = ServiceResponse, - Error = (Error, ServiceRequest), - >, + T: Service, { type Request = ServiceRequest; type Response = ServiceResponse; @@ -309,27 +305,11 @@ where type Future = LocalBoxFuture<'static, Result>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.service.poll_ready(cx).map_err(|(e, _)| e) + self.service.poll_ready(cx) } fn call(&mut self, req: ServiceRequest) -> Self::Future { - // let mut fut = self.service.call(req); - self.service - .call(req) - .map(|res| match res { - Ok(res) => Ok(res), - Err((err, req)) => Ok(req.error_response(err)), - }) - .boxed_local() - - // match fut.poll() { - // Poll::Ready(Ok(res)) => Either::Left(ok(res)), - // Poll::Ready(Err((e, req))) => Either::Left(ok(req.error_response(e))), - // Poll::Pending => Either::Right(Box::new(fut.then(|res| match res { - // Ok(res) => Ok(res), - // Err((err, req)) => Ok(req.error_response(err)), - // }))), - // } + Box::pin(self.service.call(req)) } }