From 9e401b6ef71fda227b0eb66e19b3df8b78437f00 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sun, 10 Jan 2021 02:06:49 +0800 Subject: [PATCH 1/4] refactor Scope (#1895) --- src/app_service.rs | 3 +- src/scope.rs | 215 +++++++++++++++------------------------------ 2 files changed, 71 insertions(+), 147 deletions(-) diff --git a/src/app_service.rs b/src/app_service.rs index 442de9362..3cfd84d5c 100644 --- a/src/app_service.rs +++ b/src/app_service.rs @@ -62,7 +62,8 @@ where type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, config: AppConfig) -> Self::Future { - // update resource default service + // set AppService's default service to 404 NotFound + // if no user defined default service exists. let default = self.default.clone().unwrap_or_else(|| { Rc::new(boxed::factory(fn_service(|req: ServiceRequest| async { Ok(req.into_response(Response::NotFound().finish())) diff --git a/src/scope.rs b/src/scope.rs index 419e572aa..2da4f5546 100644 --- a/src/scope.rs +++ b/src/scope.rs @@ -1,18 +1,18 @@ use std::cell::RefCell; use std::fmt; use std::future::Future; -use std::pin::Pin; use std::rc::Rc; -use std::task::{Context, Poll}; +use std::task::Poll; -use actix_http::{Extensions, Response}; -use actix_router::{ResourceDef, ResourceInfo, Router}; +use actix_http::Extensions; +use actix_router::{ResourceDef, Router}; use actix_service::boxed::{self, BoxService, BoxServiceFactory}; use actix_service::{ apply, apply_fn_factory, IntoServiceFactory, Service, ServiceFactory, ServiceFactoryExt, Transform, }; use futures_core::future::LocalBoxFuture; +use futures_util::future::join_all; use crate::config::ServiceConfig; use crate::data::Data; @@ -61,10 +61,10 @@ type HttpNewService = BoxServiceFactory<(), ServiceRequest, ServiceResponse, Err pub struct Scope { endpoint: T, rdef: String, - data: Option, + app_data: Option, services: Vec>, guards: Vec>, - default: Rc>>>, + default: Option>, external: Vec, factory_ref: Rc>>, } @@ -76,10 +76,10 @@ impl Scope { Scope { endpoint: ScopeEndpoint::new(fref.clone()), rdef: path.to_string(), - data: None, + app_data: None, guards: Vec::new(), services: Vec::new(), - default: Rc::new(RefCell::new(None)), + default: None, external: Vec::new(), factory_ref: fref, } @@ -155,10 +155,10 @@ where /// /// Data of different types from parent contexts will still be accessible. pub fn app_data(mut self, data: U) -> Self { - if self.data.is_none() { - self.data = Some(Extensions::new()); + if self.app_data.is_none() { + self.app_data = Some(Extensions::new()); } - self.data.as_mut().unwrap().insert(data); + self.app_data.as_mut().unwrap().insert(data); self } @@ -201,15 +201,15 @@ where self.external.extend(cfg.external); if !cfg.data.is_empty() { - let mut data = self.data.unwrap_or_else(Extensions::new); + let mut data = self.app_data.unwrap_or_else(Extensions::new); for value in cfg.data.iter() { value.create(&mut data); } - self.data = Some(data); + self.app_data = Some(data); } - self.data + self.app_data .get_or_insert_with(Extensions::new) .extend(cfg.extensions); self @@ -295,11 +295,9 @@ where U::InitError: fmt::Debug, { // create and configure default resource - self.default = Rc::new(RefCell::new(Some(Rc::new(boxed::factory( - f.into_factory().map_init_err(|e| { - log::error!("Can not construct default service: {:?}", e) - }), - ))))); + self.default = Some(Rc::new(boxed::factory(f.into_factory().map_init_err( + |e| log::error!("Can not construct default service: {:?}", e), + )))); self } @@ -337,7 +335,7 @@ where Scope { endpoint: apply(mw, self.endpoint), rdef: self.rdef, - data: self.data, + app_data: self.app_data, guards: self.guards, services: self.services, default: self.default, @@ -397,7 +395,7 @@ where Scope { endpoint: apply_fn_factory(self.endpoint, mw), rdef: self.rdef, - data: self.data, + app_data: self.app_data, guards: self.guards, services: self.services, default: self.default, @@ -419,9 +417,7 @@ where { fn register(mut self, config: &mut AppService) { // update default resource if needed - if self.default.borrow().is_none() { - *self.default.borrow_mut() = Some(config.default_service()); - } + let default = self.default.unwrap_or_else(|| config.default_service()); // register nested services let mut cfg = config.clone_config(); @@ -437,14 +433,14 @@ where } // custom app data storage - if let Some(ref mut ext) = self.data { + if let Some(ref mut ext) = self.app_data { config.set_service_data(ext); } // complete scope pipeline creation *self.factory_ref.borrow_mut() = Some(ScopeFactory { - data: self.data.take().map(Rc::new), - default: self.default.clone(), + app_data: self.app_data.take().map(Rc::new), + default, services: cfg .into_services() .1 @@ -476,129 +472,65 @@ where } pub struct ScopeFactory { - data: Option>, + app_data: Option>, services: Rc<[(ResourceDef, HttpNewService, RefCell>)]>, - default: Rc>>>, + default: Rc, } impl ServiceFactory for ScopeFactory { - type Config = (); type Response = ServiceResponse; type Error = Error; - type InitError = (); + type Config = (); type Service = ScopeService; - type Future = ScopeFactoryResponse; + type InitError = (); + type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { - let default_fut = if let Some(ref default) = *self.default.borrow() { - Some(default.new_service(())) - } else { - None - }; + // construct default service factory future + let default_fut = self.default.new_service(()); - ScopeFactoryResponse { - fut: self - .services - .iter() - .map(|(path, service, guards)| { - CreateScopeServiceItem::Future( - Some(path.clone()), - guards.borrow_mut().take(), - service.new_service(()), - ) - }) - .collect(), - default: None, - data: self.data.clone(), - default_fut, - } - } -} + // construct all services factory future with it's resource def and guards. + let factory_fut = + join_all(self.services.iter().map(|(path, factory, guards)| { + let path = path.clone(); + let guards = guards.borrow_mut().take(); + let factory_fut = factory.new_service(()); + async move { + let service = factory_fut.await?; + Ok((path, guards, service)) + } + })); -/// Create scope service -#[doc(hidden)] -#[pin_project::pin_project] -pub struct ScopeFactoryResponse { - fut: Vec, - data: Option>, - default: Option, - default_fut: Option>>, -} + let app_data = self.app_data.clone(); -type HttpServiceFut = LocalBoxFuture<'static, Result>; + Box::pin(async move { + let default = default_fut.await?; -enum CreateScopeServiceItem { - Future(Option, Option, HttpServiceFut), - Service(ResourceDef, Option, HttpService), -} - -impl Future for ScopeFactoryResponse { - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut done = true; - - if let Some(ref mut fut) = self.default_fut { - match Pin::new(fut).poll(cx)? { - Poll::Ready(default) => self.default = Some(default), - Poll::Pending => done = false, - } - } - - // poll http services - for item in &mut self.fut { - let res = match item { - CreateScopeServiceItem::Future( - ref mut path, - ref mut guards, - ref mut fut, - ) => match Pin::new(fut).poll(cx)? { - Poll::Ready(service) => { - Some((path.take().unwrap(), guards.take(), service)) - } - Poll::Pending => { - done = false; - None - } - }, - CreateScopeServiceItem::Service(_, _, _) => continue, - }; - - if let Some((path, guards, service)) = res { - *item = CreateScopeServiceItem::Service(path, guards, service); - } - } - - if done { - let router = self - .fut + // build router from the factory future result. + let router = factory_fut + .await + .into_iter() + .collect::, _>>()? .drain(..) - .fold(Router::build(), |mut router, item| { - match item { - CreateScopeServiceItem::Service(path, guards, service) => { - router.rdef(path, service).2 = guards; - } - CreateScopeServiceItem::Future(_, _, _) => unreachable!(), - } + .fold(Router::build(), |mut router, (path, guards, service)| { + router.rdef(path, service).2 = guards; router - }); - Poll::Ready(Ok(ScopeService { - data: self.data.clone(), - router: router.finish(), - default: self.default.take(), - _ready: None, - })) - } else { - Poll::Pending - } + }) + .finish(); + + Ok(ScopeService { + app_data, + router, + default, + }) + }) } } pub struct ScopeService { - data: Option>, + app_data: Option>, router: Router>>, - default: Option, - _ready: Option<(ServiceRequest, ResourceInfo)>, + default: HttpService, } impl Service for ScopeService { @@ -606,9 +538,7 @@ impl Service for ScopeService { type Error = Error; type Future = LocalBoxFuture<'static, Result>; - fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } + actix_service::always_ready!(); fn call(&mut self, mut req: ServiceRequest) -> Self::Future { let res = self.router.recognize_mut_checked(&mut req, |req, guards| { @@ -622,21 +552,14 @@ impl Service for ScopeService { true }); + if let Some(ref app_data) = self.app_data { + req.add_data_container(app_data.clone()); + } + if let Some((srv, _info)) = res { - if let Some(ref data) = self.data { - req.add_data_container(data.clone()); - } srv.call(req) - } else if let Some(ref mut default) = self.default { - if let Some(ref data) = self.data { - req.add_data_container(data.clone()); - } - default.call(req) } else { - let req = req.into_parts().0; - Box::pin(async { - Ok(ServiceResponse::new(req, Response::NotFound().finish())) - }) + self.default.call(req) } } } @@ -658,7 +581,7 @@ impl ServiceFactory for ScopeEndpoint { type Config = (); type Service = ScopeService; type InitError = (); - type Future = ScopeFactoryResponse; + type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { self.factory.borrow_mut().as_mut().unwrap().new_service(()) From 46b2f7eaaf602d9e30e086b7da18c089564caf0d Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Mon, 11 Jan 2021 06:59:44 +0800 Subject: [PATCH 2/4] use a non leak pool for HttpRequestInner (#1889) Co-authored-by: Rob Ede --- src/app_service.rs | 48 +++++++++++++++++----- src/config.rs | 12 +++--- src/request.rs | 100 +++++++++++++++++++++++++++++++++------------ src/server.rs | 36 +++++++--------- src/test.rs | 29 ++++++------- 5 files changed, 144 insertions(+), 81 deletions(-) diff --git a/src/app_service.rs b/src/app_service.rs index 3cfd84d5c..c4ac0b094 100644 --- a/src/app_service.rs +++ b/src/app_service.rs @@ -142,10 +142,8 @@ where Ok(AppInitService { service, - rmap, - config, app_data: Rc::new(app_data), - pool: HttpRequestPool::create(), + app_state: AppInitServiceState::new(rmap, config), }) }) } @@ -157,10 +155,42 @@ where T: Service, Error = Error>, { service: T, + app_data: Rc, + app_state: Rc, +} + +// a collection of AppInitService state that shared between HttpRequests. +pub(crate) struct AppInitServiceState { rmap: Rc, config: AppConfig, - app_data: Rc, - pool: &'static HttpRequestPool, + pool: HttpRequestPool, +} + +impl AppInitServiceState { + pub(crate) fn new(rmap: Rc, config: AppConfig) -> Rc { + Rc::new(AppInitServiceState { + rmap, + config, + // TODO: AppConfig can be used to pass user defined HttpRequestPool + // capacity. + pool: HttpRequestPool::default(), + }) + } + + #[inline] + pub(crate) fn rmap(&self) -> &ResourceMap { + &*self.rmap + } + + #[inline] + pub(crate) fn config(&self) -> &AppConfig { + &self.config + } + + #[inline] + pub(crate) fn pool(&self) -> &HttpRequestPool { + &self.pool + } } impl Service for AppInitService @@ -178,7 +208,7 @@ where fn call(&mut self, req: Request) -> Self::Future { let (head, payload) = req.into_parts(); - let req = if let Some(mut req) = self.pool.get_request() { + let req = if let Some(mut req) = self.app_state.pool().pop() { let inner = Rc::get_mut(&mut req.inner).unwrap(); inner.path.get_mut().update(&head.uri); inner.path.reset(); @@ -190,10 +220,8 @@ where Path::new(Url::new(head.uri.clone())), head, payload, - self.rmap.clone(), - self.config.clone(), + self.app_state.clone(), self.app_data.clone(), - self.pool, ) }; self.service.call(ServiceRequest::new(req)) @@ -205,7 +233,7 @@ where T: Service, Error = Error>, { fn drop(&mut self) { - self.pool.clear(); + self.app_state.pool().clear(); } } diff --git a/src/config.rs b/src/config.rs index 4ec36952a..2b93ae892 100644 --- a/src/config.rs +++ b/src/config.rs @@ -125,9 +125,7 @@ impl AppService { /// Application connection config #[derive(Clone)] -pub struct AppConfig(Rc); - -struct AppConfigInner { +pub struct AppConfig { secure: bool, host: String, addr: SocketAddr, @@ -135,7 +133,7 @@ struct AppConfigInner { impl AppConfig { pub(crate) fn new(secure: bool, addr: SocketAddr, host: String) -> Self { - AppConfig(Rc::new(AppConfigInner { secure, addr, host })) + AppConfig { secure, addr, host } } /// Server host name. @@ -146,17 +144,17 @@ impl AppConfig { /// /// By default host name is set to a "localhost" value. pub fn host(&self) -> &str { - &self.0.host + &self.host } /// Returns true if connection is secure(https) pub fn secure(&self) -> bool { - self.0.secure + self.secure } /// Returns the socket address of the local half of this TCP connection pub fn local_addr(&self) -> SocketAddr { - self.0.addr + self.addr } } diff --git a/src/request.rs b/src/request.rs index f8160ae47..437d07b6e 100644 --- a/src/request.rs +++ b/src/request.rs @@ -8,6 +8,7 @@ use actix_router::{Path, Url}; use futures_util::future::{ok, Ready}; use smallvec::SmallVec; +use crate::app_service::AppInitServiceState; use crate::config::AppConfig; use crate::error::UrlGenerationError; use crate::extract::FromRequest; @@ -29,9 +30,7 @@ pub(crate) struct HttpRequestInner { pub(crate) path: Path, pub(crate) payload: Payload, pub(crate) app_data: SmallVec<[Rc; 4]>, - rmap: Rc, - config: AppConfig, - pool: &'static HttpRequestPool, + app_state: Rc, } impl HttpRequest { @@ -40,10 +39,8 @@ impl HttpRequest { path: Path, head: Message, payload: Payload, - rmap: Rc, - config: AppConfig, + app_state: Rc, app_data: Rc, - pool: &'static HttpRequestPool, ) -> HttpRequest { let mut data = SmallVec::<[Rc; 4]>::new(); data.push(app_data); @@ -53,10 +50,8 @@ impl HttpRequest { head, path, payload, - rmap, - config, + app_state, app_data: data, - pool, }), } } @@ -142,7 +137,7 @@ impl HttpRequest { /// Returns a None when no resource is fully matched, including default services. #[inline] pub fn match_pattern(&self) -> Option { - self.inner.rmap.match_pattern(self.path()) + self.resource_map().match_pattern(self.path()) } /// The resource name that matched the path. Useful for logging and metrics. @@ -150,7 +145,7 @@ impl HttpRequest { /// Returns a None when no resource is fully matched, including default services. #[inline] pub fn match_name(&self) -> Option<&str> { - self.inner.rmap.match_name(self.path()) + self.resource_map().match_name(self.path()) } /// Request extensions @@ -192,7 +187,7 @@ impl HttpRequest { U: IntoIterator, I: AsRef, { - self.inner.rmap.url_for(&self, name, elements) + self.resource_map().url_for(&self, name, elements) } /// Generate url for named resource @@ -207,7 +202,7 @@ impl HttpRequest { #[inline] /// Get a reference to a `ResourceMap` of current application. pub fn resource_map(&self) -> &ResourceMap { - &self.inner.rmap + &self.app_state().rmap() } /// Peer socket address @@ -227,13 +222,13 @@ impl HttpRequest { /// borrowed. #[inline] pub fn connection_info(&self) -> Ref<'_, ConnectionInfo> { - ConnectionInfo::get(self.head(), &*self.app_config()) + ConnectionInfo::get(self.head(), self.app_config()) } /// App config #[inline] pub fn app_config(&self) -> &AppConfig { - &self.inner.config + self.app_state().config() } /// Get an application data object stored with `App::data` or `App::app_data` @@ -253,6 +248,11 @@ impl HttpRequest { None } + + #[inline] + fn app_state(&self) -> &AppInitServiceState { + &*self.inner.app_state + } } impl HttpMessage for HttpRequest { @@ -288,14 +288,16 @@ impl Drop for HttpRequest { // This relies on no Weak exists anywhere.(There is none) if let Some(inner) = Rc::get_mut(&mut self.inner) { - let v = &mut inner.pool.0.borrow_mut(); - if v.len() < 128 { + if inner.app_state.pool().is_available() { // clear additional app_data and keep the root one for reuse. inner.app_data.truncate(1); // inner is borrowed mut here. get head's Extension mutably // to reduce borrow check inner.head.extensions.get_mut().clear(); - v.push(self.inner.clone()); + + // a re-borrow of pool is necessary here. + let req = self.inner.clone(); + self.app_state().pool().push(req); } } } @@ -364,25 +366,50 @@ impl fmt::Debug for HttpRequest { /// Request objects are added when they are dropped (see `::drop`) and re-used /// in `::call` when there are available objects in the list. /// -/// The pool's initial capacity is 128 items. -pub(crate) struct HttpRequestPool(RefCell>>); +/// The pool's default capacity is 128 items. +pub(crate) struct HttpRequestPool { + inner: RefCell>>, + cap: usize, +} + +impl Default for HttpRequestPool { + fn default() -> Self { + Self::with_capacity(128) + } +} impl HttpRequestPool { - /// Allocates a slab of memory for pool use. - pub(crate) fn create() -> &'static HttpRequestPool { - let pool = HttpRequestPool(RefCell::new(Vec::with_capacity(128))); - Box::leak(Box::new(pool)) + pub(crate) fn with_capacity(cap: usize) -> Self { + HttpRequestPool { + inner: RefCell::new(Vec::with_capacity(cap)), + cap, + } } /// Re-use a previously allocated (but now completed/discarded) HttpRequest object. #[inline] - pub(crate) fn get_request(&self) -> Option { - self.0.borrow_mut().pop().map(|inner| HttpRequest { inner }) + pub(crate) fn pop(&self) -> Option { + self.inner + .borrow_mut() + .pop() + .map(|inner| HttpRequest { inner }) + } + + /// Check if the pool still has capacity for request storage. + #[inline] + pub(crate) fn is_available(&self) -> bool { + self.inner.borrow_mut().len() < self.cap + } + + /// Push a request to pool. + #[inline] + pub(crate) fn push(&self, req: Rc) { + self.inner.borrow_mut().push(req); } /// Clears all allocated HttpRequest objects. pub(crate) fn clear(&self) { - self.0.borrow_mut().clear() + self.inner.borrow_mut().clear() } } @@ -528,6 +555,25 @@ mod tests { ); } + #[actix_rt::test] + async fn test_drop_http_request_pool() { + let mut srv = init_service(App::new().service(web::resource("/").to( + |req: HttpRequest| { + HttpResponse::Ok() + .set_header("pool_cap", req.app_state().pool().cap) + .finish() + }, + ))) + .await; + + let req = TestRequest::default().to_request(); + let resp = call_service(&mut srv, req).await; + + drop(srv); + + assert_eq!(resp.headers().get("pool_cap").unwrap(), "128"); + } + #[actix_rt::test] async fn test_data() { let mut srv = init_service(App::new().app_data(10usize).service( diff --git a/src/server.rs b/src/server.rs index 26089ccba..8bfb27b77 100644 --- a/src/server.rs +++ b/src/server.rs @@ -283,11 +283,7 @@ where lst, move || { let c = cfg.lock().unwrap(); - let cfg = AppConfig::new( - false, - addr, - c.host.clone().unwrap_or_else(|| format!("{}", addr)), - ); + let host = c.host.clone().unwrap_or_else(|| format!("{}", addr)); let svc = HttpService::build() .keep_alive(c.keep_alive) @@ -302,8 +298,10 @@ where svc }; - svc.finish(map_config(factory(), move |_| cfg.clone())) - .tcp() + svc.finish(map_config(factory(), move |_| { + AppConfig::new(false, addr, host.clone()) + })) + .tcp() }, )?; Ok(self) @@ -342,11 +340,7 @@ where lst, move || { let c = cfg.lock().unwrap(); - let cfg = AppConfig::new( - true, - addr, - c.host.clone().unwrap_or_else(|| format!("{}", addr)), - ); + let host = c.host.clone().unwrap_or_else(|| format!("{}", addr)); let svc = HttpService::build() .keep_alive(c.keep_alive) @@ -361,8 +355,10 @@ where svc }; - svc.finish(map_config(factory(), move |_| cfg.clone())) - .openssl(acceptor.clone()) + svc.finish(map_config(factory(), move |_| { + AppConfig::new(true, addr, host.clone()) + })) + .openssl(acceptor.clone()) }, )?; Ok(self) @@ -401,11 +397,7 @@ where lst, move || { let c = cfg.lock().unwrap(); - let cfg = AppConfig::new( - true, - addr, - c.host.clone().unwrap_or_else(|| format!("{}", addr)), - ); + let host = c.host.clone().unwrap_or_else(|| format!("{}", addr)); let svc = HttpService::build() .keep_alive(c.keep_alive) @@ -420,8 +412,10 @@ where svc }; - svc.finish(map_config(factory(), move |_| cfg.clone())) - .rustls(config.clone()) + svc.finish(map_config(factory(), move |_| { + AppConfig::new(true, addr, host.clone()) + })) + .rustls(config.clone()) }, )?; Ok(self) diff --git a/src/test.rs b/src/test.rs index a76bae6a6..271ed4505 100644 --- a/src/test.rs +++ b/src/test.rs @@ -27,10 +27,10 @@ use socket2::{Domain, Protocol, Socket, Type}; pub use actix_http::test::TestBuffer; +use crate::app_service::AppInitServiceState; use crate::config::AppConfig; use crate::data::Data; use crate::dev::{Body, MessageBody, Payload, Server}; -use crate::request::HttpRequestPool; use crate::rmap::ResourceMap; use crate::service::{ServiceRequest, ServiceResponse}; use crate::{Error, HttpRequest, HttpResponse}; @@ -542,14 +542,15 @@ impl TestRequest { head.peer_addr = self.peer_addr; self.path.get_mut().update(&head.uri); + let app_state = + AppInitServiceState::new(Rc::new(self.rmap), self.config.clone()); + ServiceRequest::new(HttpRequest::new( self.path, head, payload, - Rc::new(self.rmap), - self.config.clone(), + app_state, Rc::new(self.app_data), - HttpRequestPool::create(), )) } @@ -564,15 +565,10 @@ impl TestRequest { head.peer_addr = self.peer_addr; self.path.get_mut().update(&head.uri); - HttpRequest::new( - self.path, - head, - payload, - Rc::new(self.rmap), - self.config.clone(), - Rc::new(self.app_data), - HttpRequestPool::create(), - ) + let app_state = + AppInitServiceState::new(Rc::new(self.rmap), self.config.clone()); + + HttpRequest::new(self.path, head, payload, app_state, Rc::new(self.app_data)) } /// Complete request creation and generate `HttpRequest` and `Payload` instances @@ -581,14 +577,15 @@ impl TestRequest { head.peer_addr = self.peer_addr; self.path.get_mut().update(&head.uri); + let app_state = + AppInitServiceState::new(Rc::new(self.rmap), self.config.clone()); + let req = HttpRequest::new( self.path, head, Payload::None, - Rc::new(self.rmap), - self.config.clone(), + app_state, Rc::new(self.app_data), - HttpRequestPool::create(), ); (req, payload) From 7affc6878e5d9346bf3f632c8594694b6be3d539 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Mon, 11 Jan 2021 08:13:56 +0800 Subject: [PATCH 3/4] simplify h1 dispatcher (#1899) Co-authored-by: Rob Ede --- actix-http/src/h1/dispatcher.rs | 53 +++++++++------------------------ 1 file changed, 14 insertions(+), 39 deletions(-) diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 8ef96fbef..feea7f34a 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -287,42 +287,35 @@ where self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Result { - if self.write_buf.is_empty() { + let len = self.write_buf.len(); + if len == 0 { return Ok(false); } - let len = self.write_buf.len(); - let mut written = 0; let InnerDispatcherProj { io, write_buf, .. } = self.project(); let mut io = Pin::new(io.as_mut().unwrap()); + + let mut written = 0; while written < len { match io.as_mut().poll_write(cx, &write_buf[written..]) { Poll::Ready(Ok(0)) => { return Err(DispatchError::Io(io::Error::new( io::ErrorKind::WriteZero, "", - ))); - } - Poll::Ready(Ok(n)) => { - written += n; + ))) } + Poll::Ready(Ok(n)) => written += n, Poll::Pending => { - if written > 0 { - write_buf.advance(written); - } + write_buf.advance(written); return Ok(true); } Poll::Ready(Err(err)) => return Err(DispatchError::Io(err)), } } - if written == write_buf.len() { - // SAFETY: setting length to 0 is safe - // skips one length check vs truncate - unsafe { write_buf.set_len(0) } - } else { - write_buf.advance(written); - } + // SAFETY: setting length to 0 is safe + // skips one length check vs truncate + unsafe { write_buf.set_len(0) } Ok(false) } @@ -766,19 +759,12 @@ where } else { // flush buffer inner.as_mut().poll_flush(cx)?; - if !inner.write_buf.is_empty() || inner.io.is_none() { + if !inner.write_buf.is_empty() { Poll::Pending } else { - match Pin::new(inner.project().io) - .as_pin_mut() - .unwrap() + Pin::new(inner.project().io.as_mut().unwrap()) .poll_shutdown(cx) - { - Poll::Ready(res) => { - Poll::Ready(res.map_err(DispatchError::from)) - } - Poll::Pending => Poll::Pending, - } + .map_err(DispatchError::from) } } } else { @@ -920,7 +906,7 @@ where buf.reserve(HW_BUFFER_SIZE - remaining); } - match read(cx, io, buf) { + match actix_codec::poll_read_buf(Pin::new(io), cx, buf) { Poll::Pending => { return if read_some { Ok(Some(false)) } else { Ok(None) }; } @@ -948,17 +934,6 @@ where } } -fn read( - cx: &mut Context<'_>, - io: &mut T, - buf: &mut BytesMut, -) -> Poll> -where - T: AsyncRead + Unpin, -{ - actix_codec::poll_read_buf(Pin::new(io), cx, buf) -} - #[cfg(test)] mod tests { use std::str; From 57398c6df190ab0149070f0eaacd735720d12c1c Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Mon, 11 Jan 2021 09:29:16 +0800 Subject: [PATCH 4/4] Refactor/service request (#1893) --- CHANGES.md | 4 ++ src/app_service.rs | 10 ++--- src/request.rs | 3 -- src/service.rs | 99 +++++++++++++++------------------------------- src/test.rs | 21 +++------- 5 files changed, 45 insertions(+), 92 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 70f7705c8..00608df76 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -8,6 +8,8 @@ ### Changed * Rework `Responder` trait to be sync and returns `Response`/`HttpResponse` directly. Making it more simple and performant. [#1891] +* `ServiceRequest::into_parts` and `ServiceRequest::from_parts` would not fail. + `ServiceRequest::from_request` would not fail and no payload would be generated [#1893] * Our `Either` type now uses `Left`/`Right` variants (instead of `A`/`B`) [#1894] ### Removed @@ -15,8 +17,10 @@ * Public field of `web::Query` has been made private. [#1894] [#1891]: https://github.com/actix/actix-web/pull/1891 +[#1893]: https://github.com/actix/actix-web/pull/1893 [#1894]: https://github.com/actix/actix-web/pull/1894 + ## 4.0.0-beta.1 - 2021-01-07 ### Added * `Compat` middleware enabling generic response body/error type of middlewares like `Logger` and diff --git a/src/app_service.rs b/src/app_service.rs index c4ac0b094..8169be517 100644 --- a/src/app_service.rs +++ b/src/app_service.rs @@ -1,6 +1,6 @@ use std::cell::RefCell; use std::rc::Rc; -use std::task::{Context, Poll}; +use std::task::Poll; use actix_http::{Extensions, Request, Response}; use actix_router::{Path, ResourceDef, Router, Url}; @@ -201,9 +201,7 @@ where type Error = T::Error; type Future = T::Future; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.service.poll_ready(cx) - } + actix_service::forward_ready!(service); fn call(&mut self, req: Request) -> Self::Future { let (head, payload) = req.into_parts(); @@ -213,18 +211,16 @@ where inner.path.get_mut().update(&head.uri); inner.path.reset(); inner.head = head; - inner.payload = payload; req } else { HttpRequest::new( Path::new(Url::new(head.uri.clone())), head, - payload, self.app_state.clone(), self.app_data.clone(), ) }; - self.service.call(ServiceRequest::new(req)) + self.service.call(ServiceRequest::new(req, payload)) } } diff --git a/src/request.rs b/src/request.rs index 437d07b6e..c0e26006c 100644 --- a/src/request.rs +++ b/src/request.rs @@ -28,7 +28,6 @@ pub struct HttpRequest { pub(crate) struct HttpRequestInner { pub(crate) head: Message, pub(crate) path: Path, - pub(crate) payload: Payload, pub(crate) app_data: SmallVec<[Rc; 4]>, app_state: Rc, } @@ -38,7 +37,6 @@ impl HttpRequest { pub(crate) fn new( path: Path, head: Message, - payload: Payload, app_state: Rc, app_data: Rc, ) -> HttpRequest { @@ -49,7 +47,6 @@ impl HttpRequest { inner: Rc::new(HttpRequestInner { head, path, - payload, app_state, app_data: data, }), diff --git a/src/service.rs b/src/service.rs index c6a961efc..668b7d1b4 100644 --- a/src/service.rs +++ b/src/service.rs @@ -52,75 +52,61 @@ where /// An service http request /// /// ServiceRequest allows mutable access to request's internal structures -pub struct ServiceRequest(HttpRequest); +pub struct ServiceRequest { + req: HttpRequest, + payload: Payload, +} impl ServiceRequest { /// Construct service request - pub(crate) fn new(req: HttpRequest) -> Self { - ServiceRequest(req) + pub(crate) fn new(req: HttpRequest, payload: Payload) -> Self { + Self { req, payload } } /// Deconstruct request into parts - pub fn into_parts(mut self) -> (HttpRequest, Payload) { - let pl = Rc::get_mut(&mut (self.0).inner).unwrap().payload.take(); - (self.0, pl) + #[inline] + pub fn into_parts(self) -> (HttpRequest, Payload) { + (self.req, self.payload) } /// Construct request from parts. - /// - /// `ServiceRequest` can be re-constructed only if `req` hasn't been cloned. - pub fn from_parts( - mut req: HttpRequest, - pl: Payload, - ) -> Result { - match Rc::get_mut(&mut req.inner) { - Some(p) => { - p.payload = pl; - Ok(ServiceRequest(req)) - } - None => Err((req, pl)), - } + pub fn from_parts(req: HttpRequest, payload: Payload) -> Self { + Self { req, payload } } /// Construct request from request. /// - /// `HttpRequest` implements `Clone` trait via `Rc` type. `ServiceRequest` - /// can be re-constructed only if rc's strong pointers count eq 1 and - /// weak pointers count is 0. - pub fn from_request(req: HttpRequest) -> Result { - // There is no weak pointer used on HttpRequest so intentionally - // ignore the check. - if Rc::strong_count(&req.inner) == 1 { - debug_assert!(Rc::weak_count(&req.inner) == 0); - Ok(ServiceRequest(req)) - } else { - Err(req) + /// The returned `ServiceRequest` would have no payload. + pub fn from_request(req: HttpRequest) -> Self { + ServiceRequest { + req, + payload: Payload::None, } } /// Create service response #[inline] pub fn into_response>>(self, res: R) -> ServiceResponse { - ServiceResponse::new(self.0, res.into()) + ServiceResponse::new(self.req, res.into()) } /// Create service response for error #[inline] pub fn error_response>(self, err: E) -> ServiceResponse { let res: Response = err.into().into(); - ServiceResponse::new(self.0, res.into_body()) + ServiceResponse::new(self.req, res.into_body()) } /// This method returns reference to the request head #[inline] pub fn head(&self) -> &RequestHead { - &self.0.head() + &self.req.head() } /// This method returns reference to the request head #[inline] pub fn head_mut(&mut self) -> &mut RequestHead { - self.0.head_mut() + self.req.head_mut() } /// Request's uri. @@ -196,42 +182,42 @@ impl ServiceRequest { /// access the matched value for that segment. #[inline] pub fn match_info(&self) -> &Path { - self.0.match_info() + self.req.match_info() } /// Counterpart to [`HttpRequest::match_name`](super::HttpRequest::match_name()). #[inline] pub fn match_name(&self) -> Option<&str> { - self.0.match_name() + self.req.match_name() } /// Counterpart to [`HttpRequest::match_pattern`](super::HttpRequest::match_pattern()). #[inline] pub fn match_pattern(&self) -> Option { - self.0.match_pattern() + self.req.match_pattern() } #[inline] /// Get a mutable reference to the Path parameters. pub fn match_info_mut(&mut self) -> &mut Path { - self.0.match_info_mut() + self.req.match_info_mut() } #[inline] /// Get a reference to a `ResourceMap` of current application. pub fn resource_map(&self) -> &ResourceMap { - self.0.resource_map() + self.req.resource_map() } /// Service configuration #[inline] pub fn app_config(&self) -> &AppConfig { - self.0.app_config() + self.req.app_config() } /// Counterpart to [`HttpRequest::app_data`](super::HttpRequest::app_data()). pub fn app_data(&self) -> Option<&T> { - for container in (self.0).inner.app_data.iter().rev() { + for container in self.req.inner.app_data.iter().rev() { if let Some(data) = container.get::() { return Some(data); } @@ -242,13 +228,13 @@ impl ServiceRequest { /// Set request payload. pub fn set_payload(&mut self, payload: Payload) { - Rc::get_mut(&mut (self.0).inner).unwrap().payload = payload; + self.payload = payload; } #[doc(hidden)] /// Add app data container to request's resolution set. pub fn add_data_container(&mut self, extensions: Rc) { - Rc::get_mut(&mut (self.0).inner) + Rc::get_mut(&mut (self.req).inner) .unwrap() .app_data .push(extensions); @@ -273,18 +259,18 @@ impl HttpMessage for ServiceRequest { /// Request extensions #[inline] fn extensions(&self) -> Ref<'_, Extensions> { - self.0.extensions() + self.req.extensions() } /// Mutable reference to a the request's extensions #[inline] fn extensions_mut(&self) -> RefMut<'_, Extensions> { - self.0.extensions_mut() + self.req.extensions_mut() } #[inline] fn take_payload(&mut self) -> Payload { - Rc::get_mut(&mut (self.0).inner).unwrap().payload.take() + self.payload.take() } } @@ -552,27 +538,6 @@ mod tests { use actix_service::Service; use futures_util::future::ok; - #[test] - fn test_service_request() { - let req = TestRequest::default().to_srv_request(); - let (r, pl) = req.into_parts(); - assert!(ServiceRequest::from_parts(r, pl).is_ok()); - - let req = TestRequest::default().to_srv_request(); - let (r, pl) = req.into_parts(); - let _r2 = r.clone(); - assert!(ServiceRequest::from_parts(r, pl).is_err()); - - let req = TestRequest::default().to_srv_request(); - let (r, _pl) = req.into_parts(); - assert!(ServiceRequest::from_request(r).is_ok()); - - let req = TestRequest::default().to_srv_request(); - let (r, _pl) = req.into_parts(); - let _r2 = r.clone(); - assert!(ServiceRequest::from_request(r).is_err()); - } - #[actix_rt::test] async fn test_service() { let mut srv = init_service( diff --git a/src/test.rs b/src/test.rs index 271ed4505..f8b789d1b 100644 --- a/src/test.rs +++ b/src/test.rs @@ -545,13 +545,10 @@ impl TestRequest { let app_state = AppInitServiceState::new(Rc::new(self.rmap), self.config.clone()); - ServiceRequest::new(HttpRequest::new( - self.path, - head, + ServiceRequest::new( + HttpRequest::new(self.path, head, app_state, Rc::new(self.app_data)), payload, - app_state, - Rc::new(self.app_data), - )) + ) } /// Complete request creation and generate `ServiceResponse` instance @@ -561,14 +558,14 @@ impl TestRequest { /// Complete request creation and generate `HttpRequest` instance pub fn to_http_request(mut self) -> HttpRequest { - let (mut head, payload) = self.req.finish().into_parts(); + let (mut head, _) = self.req.finish().into_parts(); head.peer_addr = self.peer_addr; self.path.get_mut().update(&head.uri); let app_state = AppInitServiceState::new(Rc::new(self.rmap), self.config.clone()); - HttpRequest::new(self.path, head, payload, app_state, Rc::new(self.app_data)) + HttpRequest::new(self.path, head, app_state, Rc::new(self.app_data)) } /// Complete request creation and generate `HttpRequest` and `Payload` instances @@ -580,13 +577,7 @@ impl TestRequest { let app_state = AppInitServiceState::new(Rc::new(self.rmap), self.config.clone()); - let req = HttpRequest::new( - self.path, - head, - Payload::None, - app_state, - Rc::new(self.app_data), - ); + let req = HttpRequest::new(self.path, head, app_state, Rc::new(self.app_data)); (req, payload) }