diff --git a/actix-http/src/h1/encoder.rs b/actix-http/src/h1/encoder.rs index 4427174ec..bb89905fb 100644 --- a/actix-http/src/h1/encoder.rs +++ b/actix-http/src/h1/encoder.rs @@ -8,7 +8,7 @@ use bytes::{BufMut, BytesMut}; use crate::body::BodySize; use crate::config::ServiceConfig; -use crate::header::map; +use crate::header::{map::Value, HeaderName}; use crate::helpers; use crate::http::header::{CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING}; use crate::http::{HeaderMap, StatusCode, Version}; @@ -121,21 +121,11 @@ pub(crate) trait MessageType: Sized { _ => {} } - // merging headers from head and extra headers. HeaderMap::new() does not allocate. - let empty_headers = HeaderMap::new(); - let extra_headers = self.extra_headers().unwrap_or(&empty_headers); - let headers = self - .headers() - .inner - .iter() - .filter(|(name, _)| !extra_headers.contains_key(*name)) - .chain(extra_headers.inner.iter()); - // write headers let mut has_date = false; - let mut buf = dst.chunk_mut().as_mut_ptr() as *mut u8; + let mut buf = dst.chunk_mut().as_mut_ptr(); let mut remaining = dst.capacity() - dst.len(); // tracks bytes written since last buffer resize @@ -143,10 +133,10 @@ pub(crate) trait MessageType: Sized { // container's knowledge, this is used to sync the containers cursor after data is written let mut pos = 0; - for (key, value) in headers { + self.write_headers(|key, value| { match *key { - CONNECTION => continue, - TRANSFER_ENCODING | CONTENT_LENGTH if skip_len => continue, + CONNECTION => return, + TRANSFER_ENCODING | CONTENT_LENGTH if skip_len => return, DATE => has_date = true, _ => {} } @@ -155,7 +145,7 @@ pub(crate) trait MessageType: Sized { let k_len = k.len(); match value { - map::Value::One(ref val) => { + Value::One(ref val) => { let v = val.as_ref(); let v_len = v.len(); @@ -177,7 +167,7 @@ pub(crate) trait MessageType: Sized { // re-assign buf raw pointer since it's possible that the buffer was // reallocated and/or resized - buf = dst.chunk_mut().as_mut_ptr() as *mut u8; + buf = dst.chunk_mut().as_mut_ptr(); } // SAFETY: on each write, it is enough to ensure that the advancement of the @@ -206,7 +196,7 @@ pub(crate) trait MessageType: Sized { remaining -= len; } - map::Value::Multi(ref vec) => { + Value::Multi(ref vec) => { for val in vec { let v = val.as_ref(); let v_len = v.len(); @@ -224,7 +214,7 @@ pub(crate) trait MessageType: Sized { // re-assign buf raw pointer since it's possible that the buffer was // reallocated and/or resized - buf = dst.chunk_mut().as_mut_ptr() as *mut u8; + buf = dst.chunk_mut().as_mut_ptr(); } // SAFETY: on each write, it is enough to ensure that the advancement of @@ -253,7 +243,7 @@ pub(crate) trait MessageType: Sized { } } } - } + }); // final cursor synchronization with the bytes container // @@ -273,6 +263,24 @@ pub(crate) trait MessageType: Sized { Ok(()) } + + fn write_headers(&mut self, mut f: F) + where + F: FnMut(&HeaderName, &Value), + { + match self.extra_headers() { + Some(headers) => { + // merging headers from head and extra headers. + self.headers() + .inner + .iter() + .filter(|(name, _)| !headers.contains_key(*name)) + .chain(headers.inner.iter()) + .for_each(|(k, v)| f(k, v)) + } + None => self.headers().inner.iter().for_each(|(k, v)| f(k, v)), + } + } } impl MessageType for Response<()> { diff --git a/src/middleware/err_handlers.rs b/src/middleware/err_handlers.rs index dfd9a7dc5..44962aa98 100644 --- a/src/middleware/err_handlers.rs +++ b/src/middleware/err_handlers.rs @@ -1,10 +1,15 @@ //! For middleware documentation, see [`ErrorHandlers`]. -use std::rc::Rc; +use std::{ + future::Future, + pin::Pin, + rc::Rc, + task::{Context, Poll}, +}; use actix_service::{Service, Transform}; use ahash::AHashMap; -use futures_util::future::{ok, FutureExt, LocalBoxFuture, Ready}; +use futures_core::{future::LocalBoxFuture, ready}; use crate::{ dev::{ServiceRequest, ServiceResponse}, @@ -51,9 +56,11 @@ type ErrorHandler = dyn Fn(ServiceResponse) -> Result { - handlers: Rc>>>, + handlers: Handlers, } +type Handlers = Rc>>>; + impl Default for ErrorHandlers { fn default() -> Self { ErrorHandlers { @@ -82,7 +89,7 @@ impl ErrorHandlers { impl Transform for ErrorHandlers where - S: Service, Error = Error>, + S: Service, Error = Error> + 'static, S::Future: 'static, B: 'static, { @@ -90,20 +97,18 @@ where type Error = Error; type Transform = ErrorHandlersMiddleware; type InitError = (); - type Future = Ready>; + type Future = LocalBoxFuture<'static, Result>; fn new_transform(&self, service: S) -> Self::Future { - ok(ErrorHandlersMiddleware { - service, - handlers: self.handlers.clone(), - }) + let handlers = self.handlers.clone(); + Box::pin(async move { Ok(ErrorHandlersMiddleware { service, handlers }) }) } } #[doc(hidden)] pub struct ErrorHandlersMiddleware { service: S, - handlers: Rc>>>, + handlers: Handlers, } impl Service for ErrorHandlersMiddleware @@ -114,35 +119,63 @@ where { type Response = ServiceResponse; type Error = Error; - type Future = LocalBoxFuture<'static, Result>; + type Future = ErrorHandlersFuture; actix_service::forward_ready!(service); fn call(&mut self, req: ServiceRequest) -> Self::Future { let handlers = self.handlers.clone(); let fut = self.service.call(req); + ErrorHandlersFuture::ServiceFuture { fut, handlers } + } +} - async move { - let res = fut.await?; +#[pin_project::pin_project(project = ErrorHandlersProj)] +pub enum ErrorHandlersFuture +where + Fut: Future, +{ + ServiceFuture { + #[pin] + fut: Fut, + handlers: Handlers, + }, + HandlerFuture { + fut: LocalBoxFuture<'static, Fut::Output>, + }, +} - if let Some(handler) = handlers.get(&res.status()) { - match handler(res) { - Ok(ErrorHandlerResponse::Response(res)) => Ok(res), - Ok(ErrorHandlerResponse::Future(fut)) => fut.await, - Err(e) => Err(e), +impl Future for ErrorHandlersFuture +where + Fut: Future, Error>>, +{ + type Output = Fut::Output; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.as_mut().project() { + ErrorHandlersProj::ServiceFuture { fut, handlers } => { + let res = ready!(fut.poll(cx))?; + match handlers.get(&res.status()) { + Some(handler) => match handler(res)? { + ErrorHandlerResponse::Response(res) => Poll::Ready(Ok(res)), + ErrorHandlerResponse::Future(fut) => { + self.as_mut() + .set(ErrorHandlersFuture::HandlerFuture { fut }); + self.poll(cx) + } + }, + None => Poll::Ready(Ok(res)), } - } else { - Ok(res) } + ErrorHandlersProj::HandlerFuture { fut } => fut.as_mut().poll(cx), } - .boxed_local() } } #[cfg(test)] mod tests { use actix_service::IntoService; - use futures_util::future::ok; + use futures_util::future::{ok, FutureExt}; use super::*; use crate::http::{header::CONTENT_TYPE, HeaderValue, StatusCode};