Merge branch 'master' into feat/tokio-blocking-pool

This commit is contained in:
Rob Ede 2021-01-06 18:17:24 +00:00 committed by GitHub
commit 77c2edf762
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 90 additions and 218 deletions

View File

@ -458,11 +458,12 @@ where
Error = Error, Error = Error,
InitError = (), InitError = (),
>, >,
T::Future: 'static,
{ {
fn into_factory(self) -> AppInit<T, B> { fn into_factory(self) -> AppInit<T, B> {
AppInit { AppInit {
data: self.data.into_boxed_slice().into(), data_factories: self.data.into_boxed_slice().into(),
data_factories: self.data_factories.into_boxed_slice().into(), async_data_factories: self.data_factories.into_boxed_slice().into(),
endpoint: self.endpoint, endpoint: self.endpoint,
services: Rc::new(RefCell::new(self.services)), services: Rc::new(RefCell::new(self.services)),
external: RefCell::new(self.external), external: RefCell::new(self.external),

View File

@ -1,15 +1,13 @@
use std::cell::RefCell; use std::cell::RefCell;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc; use std::rc::Rc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use actix_http::{Extensions, Request, Response}; use actix_http::{Extensions, Request, Response};
use actix_router::{Path, ResourceDef, ResourceInfo, Router, Url}; use actix_router::{Path, ResourceDef, Router, Url};
use actix_service::boxed::{self, BoxService, BoxServiceFactory}; use actix_service::boxed::{self, BoxService, BoxServiceFactory};
use actix_service::{fn_service, Service, ServiceFactory}; use actix_service::{fn_service, Service, ServiceFactory};
use futures_util::future::{join_all, ok, FutureExt, LocalBoxFuture}; use futures_core::future::LocalBoxFuture;
use futures_util::future::join_all;
use crate::config::{AppConfig, AppService}; use crate::config::{AppConfig, AppService};
use crate::data::{DataFactory, FnDataFactory}; use crate::data::{DataFactory, FnDataFactory};
@ -22,7 +20,6 @@ use crate::service::{AppServiceFactory, ServiceRequest, ServiceResponse};
type Guards = Vec<Box<dyn Guard>>; type Guards = Vec<Box<dyn Guard>>;
type HttpService = BoxService<ServiceRequest, ServiceResponse, Error>; type HttpService = BoxService<ServiceRequest, ServiceResponse, Error>;
type HttpNewService = BoxServiceFactory<(), ServiceRequest, ServiceResponse, Error, ()>; type HttpNewService = BoxServiceFactory<(), ServiceRequest, ServiceResponse, Error, ()>;
type BoxResponse = LocalBoxFuture<'static, Result<ServiceResponse, Error>>;
/// Service factory to convert `Request` to a `ServiceRequest<S>`. /// Service factory to convert `Request` to a `ServiceRequest<S>`.
/// It also executes data factories. /// It also executes data factories.
@ -38,8 +35,8 @@ where
{ {
pub(crate) endpoint: T, pub(crate) endpoint: T,
pub(crate) extensions: RefCell<Option<Extensions>>, pub(crate) extensions: RefCell<Option<Extensions>>,
pub(crate) data: Rc<[Box<dyn DataFactory>]>, pub(crate) data_factories: Rc<[Box<dyn DataFactory>]>,
pub(crate) data_factories: Rc<[FnDataFactory]>, pub(crate) async_data_factories: Rc<[FnDataFactory]>,
pub(crate) services: Rc<RefCell<Vec<Box<dyn AppServiceFactory>>>>, pub(crate) services: Rc<RefCell<Vec<Box<dyn AppServiceFactory>>>>,
pub(crate) default: Option<Rc<HttpNewService>>, pub(crate) default: Option<Rc<HttpNewService>>,
pub(crate) factory_ref: Rc<RefCell<Option<AppRoutingFactory>>>, pub(crate) factory_ref: Rc<RefCell<Option<AppRoutingFactory>>>,
@ -55,24 +52,26 @@ where
Error = Error, Error = Error,
InitError = (), InitError = (),
>, >,
T::Future: 'static,
{ {
type Response = ServiceResponse<B>; type Response = ServiceResponse<B>;
type Error = T::Error; type Error = T::Error;
type Config = AppConfig; type Config = AppConfig;
type Service = AppInitService<T::Service, B>; type Service = AppInitService<T::Service, B>;
type InitError = T::InitError; type InitError = T::InitError;
type Future = AppInitResult<T, B>; type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
fn new_service(&self, config: AppConfig) -> Self::Future { fn new_service(&self, config: AppConfig) -> Self::Future {
// update resource default service // update resource default service
let default = self.default.clone().unwrap_or_else(|| { let default = self.default.clone().unwrap_or_else(|| {
Rc::new(boxed::factory(fn_service(|req: ServiceRequest| { Rc::new(boxed::factory(fn_service(|req: ServiceRequest| async {
ok(req.into_response(Response::NotFound().finish())) Ok(req.into_response(Response::NotFound().finish()))
}))) })))
}); });
// App config // App config
let mut config = AppService::new(config, default.clone(), self.data.clone()); let mut config =
AppService::new(config, default.clone(), self.data_factories.clone());
// register services // register services
std::mem::take(&mut *self.services.borrow_mut()) std::mem::take(&mut *self.services.borrow_mut())
@ -83,7 +82,7 @@ where
let (config, services) = config.into_services(); let (config, services) = config.into_services();
// complete pipeline creation // complete pipeline creation.
*self.factory_ref.borrow_mut() = Some(AppRoutingFactory { *self.factory_ref.borrow_mut() = Some(AppRoutingFactory {
default, default,
services: services services: services
@ -106,107 +105,48 @@ where
let rmap = Rc::new(rmap); let rmap = Rc::new(rmap);
rmap.finish(rmap.clone()); rmap.finish(rmap.clone());
// start all data factory futures // construct all async data factory futures
let factory_futs = join_all(self.data_factories.iter().map(|f| f())); let factory_futs = join_all(self.async_data_factories.iter().map(|f| f()));
AppInitResult { // construct app service and middleware service factory future.
endpoint: None, let endpoint_fut = self.endpoint.new_service(());
endpoint_fut: self.endpoint.new_service(()),
data: self.data.clone(), // take extensions or create new one as app data container.
data_factories: None, let mut app_data = self
data_factories_fut: factory_futs.boxed_local(), .extensions
extensions: Some(
self.extensions
.borrow_mut() .borrow_mut()
.take() .take()
.unwrap_or_else(Extensions::new), .unwrap_or_else(Extensions::new);
),
config,
rmap,
_phantom: PhantomData,
}
}
}
#[pin_project::pin_project] let data_factories = self.data_factories.clone();
pub struct AppInitResult<T, B>
where
T: ServiceFactory<ServiceRequest>,
{
#[pin]
endpoint_fut: T::Future,
// a Some signals completion of endpoint creation
endpoint: Option<T::Service>,
#[pin]
data_factories_fut: LocalBoxFuture<'static, Vec<Result<Box<dyn DataFactory>, ()>>>,
// a Some signals completion of factory futures
data_factories: Option<Vec<Box<dyn DataFactory>>>,
rmap: Rc<ResourceMap>,
config: AppConfig,
data: Rc<[Box<dyn DataFactory>]>,
extensions: Option<Extensions>,
_phantom: PhantomData<B>,
}
impl<T, B> Future for AppInitResult<T, B>
where
T: ServiceFactory<
ServiceRequest,
Config = (),
Response = ServiceResponse<B>,
Error = Error,
InitError = (),
>,
{
type Output = Result<AppInitService<T::Service, B>, ()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
Box::pin(async move {
// async data factories // async data factories
if let Poll::Ready(factories) = this.data_factories_fut.poll(cx) { let async_data_factories = factory_futs
let factories: Result<Vec<_>, ()> = factories.into_iter().collect(); .await
.into_iter()
if let Ok(factories) = factories { .collect::<Result<Vec<_>, _>>()
this.data_factories.replace(factories); .map_err(|_| ())?;
} else {
return Poll::Ready(Err(()));
}
}
// app service and middleware // app service and middleware
if this.endpoint.is_none() { let service = endpoint_fut.await?;
if let Poll::Ready(srv) = this.endpoint_fut.poll(cx)? {
*this.endpoint = Some(srv);
}
}
// not using if let so condition only needs shared ref // populate app data container from (async) data factories.
if this.endpoint.is_some() && this.data_factories.is_some() { data_factories
// create app data container .iter()
let mut data = this.extensions.take().unwrap(); .chain(&async_data_factories)
.for_each(|factory| {
factory.create(&mut app_data);
});
for f in this.data.iter() { Ok(AppInitService {
f.create(&mut data); service,
} rmap,
config,
for f in this.data_factories.take().unwrap().iter() { app_data: Rc::new(app_data),
f.create(&mut data);
}
return Poll::Ready(Ok(AppInitService {
service: this.endpoint.take().unwrap(),
rmap: this.rmap.clone(),
config: this.config.clone(),
data: Rc::new(data),
pool: HttpRequestPool::create(), pool: HttpRequestPool::create(),
})); })
} })
Poll::Pending
} }
} }
@ -218,7 +158,7 @@ where
service: T, service: T,
rmap: Rc<ResourceMap>, rmap: Rc<ResourceMap>,
config: AppConfig, config: AppConfig,
data: Rc<Extensions>, app_data: Rc<Extensions>,
pool: &'static HttpRequestPool, pool: &'static HttpRequestPool,
} }
@ -251,7 +191,7 @@ where
payload, payload,
self.rmap.clone(), self.rmap.clone(),
self.config.clone(), self.config.clone(),
self.data.clone(), self.app_data.clone(),
self.pool, self.pool,
) )
}; };
@ -274,127 +214,60 @@ pub struct AppRoutingFactory {
} }
impl ServiceFactory<ServiceRequest> for AppRoutingFactory { impl ServiceFactory<ServiceRequest> for AppRoutingFactory {
type Config = ();
type Response = ServiceResponse; type Response = ServiceResponse;
type Error = Error; type Error = Error;
type InitError = (); type Config = ();
type Service = AppRouting; type Service = AppRouting;
type Future = AppRoutingFactoryResponse; type InitError = ();
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future { fn new_service(&self, _: ()) -> Self::Future {
AppRoutingFactoryResponse { // construct all services factory future with it's resource def and guards.
fut: self let factory_fut =
.services join_all(self.services.iter().map(|(path, factory, guards)| {
.iter() let path = path.clone();
.map(|(path, service, guards)| { let guards = guards.borrow_mut().take();
CreateAppRoutingItem::Future( let factory_fut = factory.new_service(());
Some(path.clone()), async move {
guards.borrow_mut().take(), let service = factory_fut.await?;
service.new_service(()).boxed_local(), Ok((path, guards, service))
)
})
.collect(),
default: None,
default_fut: Some(self.default.new_service(())),
}
}
} }
}));
type HttpServiceFut = LocalBoxFuture<'static, Result<HttpService, ()>>; // construct default service factory future
let default_fut = self.default.new_service(());
/// Create app service Box::pin(async move {
#[doc(hidden)] let default = default_fut.await?;
pub struct AppRoutingFactoryResponse {
fut: Vec<CreateAppRoutingItem>,
default: Option<HttpService>,
default_fut: Option<LocalBoxFuture<'static, Result<HttpService, ()>>>,
}
enum CreateAppRoutingItem { // build router from the factory future result.
Future(Option<ResourceDef>, Option<Guards>, HttpServiceFut), let router = factory_fut
Service(ResourceDef, Option<Guards>, HttpService), .await
} .into_iter()
.collect::<Result<Vec<_>, _>>()?
impl Future for AppRoutingFactoryResponse {
type Output = Result<AppRouting, ()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut done = true;
if let Some(ref mut fut) = self.default_fut {
match Pin::new(fut).poll(cx)? {
Poll::Ready(default) => self.default = Some(default),
Poll::Pending => done = false,
}
}
// poll http services
for item in &mut self.fut {
let res = match item {
CreateAppRoutingItem::Future(
ref mut path,
ref mut guards,
ref mut fut,
) => match Pin::new(fut).poll(cx) {
Poll::Ready(Ok(service)) => {
Some((path.take().unwrap(), guards.take(), service))
}
Poll::Ready(Err(_)) => return Poll::Ready(Err(())),
Poll::Pending => {
done = false;
None
}
},
CreateAppRoutingItem::Service(_, _, _) => continue,
};
if let Some((path, guards, service)) = res {
*item = CreateAppRoutingItem::Service(path, guards, service);
}
}
if done {
let router = self
.fut
.drain(..) .drain(..)
.fold(Router::build(), |mut router, item| { .fold(Router::build(), |mut router, (path, guards, service)| {
match item {
CreateAppRoutingItem::Service(path, guards, service) => {
router.rdef(path, service).2 = guards; router.rdef(path, service).2 = guards;
}
CreateAppRoutingItem::Future(_, _, _) => unreachable!(),
}
router router
}); })
Poll::Ready(Ok(AppRouting { .finish();
ready: None,
router: router.finish(), Ok(AppRouting { router, default })
default: self.default.take(), })
}))
} else {
Poll::Pending
}
} }
} }
pub struct AppRouting { pub struct AppRouting {
router: Router<HttpService, Guards>, router: Router<HttpService, Guards>,
ready: Option<(ServiceRequest, ResourceInfo)>, default: HttpService,
default: Option<HttpService>,
} }
impl Service<ServiceRequest> for AppRouting { impl Service<ServiceRequest> for AppRouting {
type Response = ServiceResponse; type Response = ServiceResponse;
type Error = Error; type Error = Error;
type Future = BoxResponse; type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { actix_service::always_ready!();
if self.ready.is_none() {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
fn call(&mut self, mut req: ServiceRequest) -> Self::Future { fn call(&mut self, mut req: ServiceRequest) -> Self::Future {
let res = self.router.recognize_mut_checked(&mut req, |req, guards| { let res = self.router.recognize_mut_checked(&mut req, |req, guards| {
@ -410,11 +283,8 @@ impl Service<ServiceRequest> for AppRouting {
if let Some((srv, _info)) = res { if let Some((srv, _info)) = res {
srv.call(req) srv.call(req)
} else if let Some(ref mut default) = self.default {
default.call(req)
} else { } else {
let req = req.into_parts().0; self.default.call(req)
ok(ServiceResponse::new(req, Response::NotFound().finish())).boxed_local()
} }
} }
} }
@ -431,12 +301,12 @@ impl AppEntry {
} }
impl ServiceFactory<ServiceRequest> for AppEntry { impl ServiceFactory<ServiceRequest> for AppEntry {
type Config = ();
type Response = ServiceResponse; type Response = ServiceResponse;
type Error = Error; type Error = Error;
type InitError = (); type Config = ();
type Service = AppRouting; type Service = AppRouting;
type Future = AppRoutingFactoryResponse; type InitError = ();
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future { fn new_service(&self, _: ()) -> Self::Future {
self.factory.borrow_mut().as_mut().unwrap().new_service(()) self.factory.borrow_mut().as_mut().unwrap().new_service(())
@ -448,9 +318,10 @@ mod tests {
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use actix_service::Service;
use crate::test::{init_service, TestRequest}; use crate::test::{init_service, TestRequest};
use crate::{web, App, HttpResponse}; use crate::{web, App, HttpResponse};
use actix_service::Service;
struct DropData(Arc<AtomicBool>); struct DropData(Arc<AtomicBool>);