Merge branch 'master' into normalize-path-docs

This commit is contained in:
Rob Ede 2020-12-22 23:26:59 +00:00 committed by GitHub
commit f3c91ed975
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 58 additions and 93 deletions

View File

@ -9,8 +9,9 @@ use std::time::{Duration, Instant};
use actix_codec::{AsyncRead, AsyncWrite}; use actix_codec::{AsyncRead, AsyncWrite};
use actix_rt::time::{delay_for, Delay}; use actix_rt::time::{delay_for, Delay};
use actix_service::Service; use actix_service::Service;
use actix_utils::{oneshot, task::LocalWaker}; use actix_utils::task::LocalWaker;
use bytes::Bytes; use bytes::Bytes;
use futures_channel::oneshot;
use futures_util::future::{poll_fn, FutureExt, LocalBoxFuture}; use futures_util::future::{poll_fn, FutureExt, LocalBoxFuture};
use fxhash::FxHashMap; use fxhash::FxHashMap;
use h2::client::{Connection, SendRequest}; use h2::client::{Connection, SendRequest};

View File

@ -90,26 +90,20 @@ where
} }
fn call(&mut self, (param, req): (T, HttpRequest)) -> Self::Future { fn call(&mut self, (param, req): (T, HttpRequest)) -> Self::Future {
HandlerServiceResponse { let fut = self.hnd.call(param);
fut: self.hnd.call(param), HandlerServiceResponse::Future(fut, Some(req))
fut2: None,
req: Some(req),
}
} }
} }
#[doc(hidden)] #[doc(hidden)]
#[pin_project] #[pin_project(project = HandlerProj)]
pub struct HandlerServiceResponse<T, R> pub enum HandlerServiceResponse<T, R>
where where
T: Future<Output = R>, T: Future<Output = R>,
R: Responder, R: Responder,
{ {
#[pin] Future(#[pin] T, Option<HttpRequest>),
fut: T, Responder(#[pin] R::Future, Option<HttpRequest>),
#[pin]
fut2: Option<R::Future>,
req: Option<HttpRequest>,
} }
impl<T, R> Future for HandlerServiceResponse<T, R> impl<T, R> Future for HandlerServiceResponse<T, R>
@ -120,28 +114,26 @@ where
type Output = Result<ServiceResponse, Infallible>; type Output = Result<ServiceResponse, Infallible>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().project(); loop {
match self.as_mut().project() {
if let Some(fut) = this.fut2.as_pin_mut() { HandlerProj::Future(fut, req) => {
return match fut.poll(cx) { let res = ready!(fut.poll(cx));
Poll::Ready(Ok(res)) => { let fut = res.respond_to(req.as_ref().unwrap());
Poll::Ready(Ok(ServiceResponse::new(this.req.take().unwrap(), res))) let state = HandlerServiceResponse::Responder(fut, req.take());
self.as_mut().set(state);
} }
Poll::Pending => Poll::Pending, HandlerProj::Responder(fut, req) => {
Poll::Ready(Err(e)) => { let res = ready!(fut.poll(cx));
let res: Response = e.into().into(); let req = req.take().unwrap();
Poll::Ready(Ok(ServiceResponse::new(this.req.take().unwrap(), res))) return match res {
Ok(res) => Poll::Ready(Ok(ServiceResponse::new(req, res))),
Err(e) => {
let res: Response = e.into().into();
Poll::Ready(Ok(ServiceResponse::new(req, res)))
}
};
} }
};
}
match this.fut.poll(cx) {
Poll::Ready(res) => {
let fut = res.respond_to(this.req.as_ref().unwrap());
self.as_mut().project().fut2.set(Some(fut));
self.poll(cx)
} }
Poll::Pending => Poll::Pending,
} }
} }
} }
@ -169,12 +161,12 @@ where
Error = Infallible, Error = Infallible,
> + Clone, > + Clone,
{ {
type Config = ();
type Request = ServiceRequest; type Request = ServiceRequest;
type Response = ServiceResponse; type Response = ServiceResponse;
type Error = (Error, ServiceRequest); type Error = Error;
type InitError = (); type Config = ();
type Service = ExtractService<T, S>; type Service = ExtractService<T, S>;
type InitError = ();
type Future = Ready<Result<Self::Service, ()>>; type Future = Ready<Result<Self::Service, ()>>;
fn new_service(&self, _: ()) -> Self::Future { fn new_service(&self, _: ()) -> Self::Future {
@ -200,7 +192,7 @@ where
{ {
type Request = ServiceRequest; type Request = ServiceRequest;
type Response = ServiceResponse; type Response = ServiceResponse;
type Error = (Error, ServiceRequest); type Error = Error;
type Future = ExtractResponse<T, S>; type Future = ExtractResponse<T, S>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
@ -210,24 +202,14 @@ where
fn call(&mut self, req: ServiceRequest) -> Self::Future { fn call(&mut self, req: ServiceRequest) -> Self::Future {
let (req, mut payload) = req.into_parts(); let (req, mut payload) = req.into_parts();
let fut = T::from_request(&req, &mut payload); let fut = T::from_request(&req, &mut payload);
ExtractResponse::Future(fut, Some(req), self.service.clone())
ExtractResponse {
fut,
req,
fut_s: None,
service: self.service.clone(),
}
} }
} }
#[pin_project] #[pin_project(project = ExtractProj)]
pub struct ExtractResponse<T: FromRequest, S: Service> { pub enum ExtractResponse<T: FromRequest, S: Service> {
req: HttpRequest, Future(#[pin] T::Future, Option<HttpRequest>, S),
service: S, Response(#[pin] S::Future),
#[pin]
fut: T::Future,
#[pin]
fut_s: Option<S::Future>,
} }
impl<T: FromRequest, S> Future for ExtractResponse<T, S> impl<T: FromRequest, S> Future for ExtractResponse<T, S>
@ -238,24 +220,26 @@ where
Error = Infallible, Error = Infallible,
>, >,
{ {
type Output = Result<ServiceResponse, (Error, ServiceRequest)>; type Output = Result<ServiceResponse, Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().project(); loop {
match self.as_mut().project() {
if let Some(fut) = this.fut_s.as_pin_mut() { ExtractProj::Future(fut, req, srv) => {
return fut.poll(cx).map_err(|_| panic!()); let res = ready!(fut.poll(cx));
} let req = req.take().unwrap();
match res {
match ready!(this.fut.poll(cx)) { Err(e) => {
Err(e) => { let req = ServiceRequest::new(req);
let req = ServiceRequest::new(this.req.clone()); return Poll::Ready(Ok(req.error_response(e.into())));
Poll::Ready(Err((e.into(), req))) }
} Ok(item) => {
Ok(item) => { let fut = srv.call((item, req));
let fut = Some(this.service.call((item, this.req.clone()))); self.as_mut().set(ExtractResponse::Response(fut));
self.as_mut().project().fut_s.set(fut); }
self.poll(cx) }
}
ExtractProj::Response(fut) => return fut.poll(cx).map_err(|_| panic!()),
} }
} }
} }

View File

@ -234,7 +234,7 @@ impl Route {
struct RouteNewService<T> struct RouteNewService<T>
where where
T: ServiceFactory<Request = ServiceRequest, Error = (Error, ServiceRequest)>, T: ServiceFactory<Request = ServiceRequest, Error = Error>,
{ {
service: T, service: T,
} }
@ -245,7 +245,7 @@ where
Config = (), Config = (),
Request = ServiceRequest, Request = ServiceRequest,
Response = ServiceResponse, Response = ServiceResponse,
Error = (Error, ServiceRequest), Error = Error,
>, >,
T::Future: 'static, T::Future: 'static,
T::Service: 'static, T::Service: 'static,
@ -262,7 +262,7 @@ where
Config = (), Config = (),
Request = ServiceRequest, Request = ServiceRequest,
Response = ServiceResponse, Response = ServiceResponse,
Error = (Error, ServiceRequest), Error = Error,
>, >,
T::Future: 'static, T::Future: 'static,
T::Service: 'static, T::Service: 'static,
@ -297,11 +297,7 @@ struct RouteServiceWrapper<T: Service> {
impl<T> Service for RouteServiceWrapper<T> impl<T> Service for RouteServiceWrapper<T>
where where
T::Future: 'static, T::Future: 'static,
T: Service< T: Service<Request = ServiceRequest, Response = ServiceResponse, Error = Error>,
Request = ServiceRequest,
Response = ServiceResponse,
Error = (Error, ServiceRequest),
>,
{ {
type Request = ServiceRequest; type Request = ServiceRequest;
type Response = ServiceResponse; type Response = ServiceResponse;
@ -309,27 +305,11 @@ where
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>; type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx).map_err(|(e, _)| e) self.service.poll_ready(cx)
} }
fn call(&mut self, req: ServiceRequest) -> Self::Future { fn call(&mut self, req: ServiceRequest) -> Self::Future {
// let mut fut = self.service.call(req); Box::pin(self.service.call(req))
self.service
.call(req)
.map(|res| match res {
Ok(res) => Ok(res),
Err((err, req)) => Ok(req.error_response(err)),
})
.boxed_local()
// match fut.poll() {
// Poll::Ready(Ok(res)) => Either::Left(ok(res)),
// Poll::Ready(Err((e, req))) => Either::Left(ok(req.error_response(e))),
// Poll::Pending => Either::Right(Box::new(fut.then(|res| match res {
// Ok(res) => Ok(res),
// Err((err, req)) => Ok(req.error_response(err)),
// }))),
// }
} }
} }