Merge branch 'master' into header-rework

This commit is contained in:
Rob Ede 2021-01-14 02:10:08 +00:00 committed by GitHub
commit 13bab910da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 83 additions and 42 deletions

View File

@ -8,7 +8,7 @@ use bytes::{BufMut, BytesMut};
use crate::body::BodySize; use crate::body::BodySize;
use crate::config::ServiceConfig; use crate::config::ServiceConfig;
use crate::header::map; use crate::header::{map::Value, HeaderName};
use crate::helpers; use crate::helpers;
use crate::http::header::{CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING}; use crate::http::header::{CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING};
use crate::http::{HeaderMap, StatusCode, Version}; use crate::http::{HeaderMap, StatusCode, Version};
@ -121,21 +121,11 @@ pub(crate) trait MessageType: Sized {
_ => {} _ => {}
} }
// merging headers from head and extra headers. HeaderMap::new() does not allocate.
let empty_headers = HeaderMap::new();
let extra_headers = self.extra_headers().unwrap_or(&empty_headers);
let headers = self
.headers()
.inner
.iter()
.filter(|(name, _)| !extra_headers.contains_key(*name))
.chain(extra_headers.inner.iter());
// write headers // write headers
let mut has_date = false; let mut has_date = false;
let mut buf = dst.chunk_mut().as_mut_ptr() as *mut u8; let mut buf = dst.chunk_mut().as_mut_ptr();
let mut remaining = dst.capacity() - dst.len(); let mut remaining = dst.capacity() - dst.len();
// tracks bytes written since last buffer resize // tracks bytes written since last buffer resize
@ -143,10 +133,10 @@ pub(crate) trait MessageType: Sized {
// container's knowledge, this is used to sync the containers cursor after data is written // container's knowledge, this is used to sync the containers cursor after data is written
let mut pos = 0; let mut pos = 0;
for (key, value) in headers { self.write_headers(|key, value| {
match *key { match *key {
CONNECTION => continue, CONNECTION => return,
TRANSFER_ENCODING | CONTENT_LENGTH if skip_len => continue, TRANSFER_ENCODING | CONTENT_LENGTH if skip_len => return,
DATE => has_date = true, DATE => has_date = true,
_ => {} _ => {}
} }
@ -155,7 +145,7 @@ pub(crate) trait MessageType: Sized {
let k_len = k.len(); let k_len = k.len();
match value { match value {
map::Value::One(ref val) => { Value::One(ref val) => {
let v = val.as_ref(); let v = val.as_ref();
let v_len = v.len(); let v_len = v.len();
@ -177,7 +167,7 @@ pub(crate) trait MessageType: Sized {
// re-assign buf raw pointer since it's possible that the buffer was // re-assign buf raw pointer since it's possible that the buffer was
// reallocated and/or resized // reallocated and/or resized
buf = dst.chunk_mut().as_mut_ptr() as *mut u8; buf = dst.chunk_mut().as_mut_ptr();
} }
// SAFETY: on each write, it is enough to ensure that the advancement of the // SAFETY: on each write, it is enough to ensure that the advancement of the
@ -206,7 +196,7 @@ pub(crate) trait MessageType: Sized {
remaining -= len; remaining -= len;
} }
map::Value::Multi(ref vec) => { Value::Multi(ref vec) => {
for val in vec { for val in vec {
let v = val.as_ref(); let v = val.as_ref();
let v_len = v.len(); let v_len = v.len();
@ -224,7 +214,7 @@ pub(crate) trait MessageType: Sized {
// re-assign buf raw pointer since it's possible that the buffer was // re-assign buf raw pointer since it's possible that the buffer was
// reallocated and/or resized // reallocated and/or resized
buf = dst.chunk_mut().as_mut_ptr() as *mut u8; buf = dst.chunk_mut().as_mut_ptr();
} }
// SAFETY: on each write, it is enough to ensure that the advancement of // SAFETY: on each write, it is enough to ensure that the advancement of
@ -253,7 +243,7 @@ pub(crate) trait MessageType: Sized {
} }
} }
} }
} });
// final cursor synchronization with the bytes container // final cursor synchronization with the bytes container
// //
@ -273,6 +263,24 @@ pub(crate) trait MessageType: Sized {
Ok(()) Ok(())
} }
fn write_headers<F>(&mut self, mut f: F)
where
F: FnMut(&HeaderName, &Value),
{
match self.extra_headers() {
Some(headers) => {
// merging headers from head and extra headers.
self.headers()
.inner
.iter()
.filter(|(name, _)| !headers.contains_key(*name))
.chain(headers.inner.iter())
.for_each(|(k, v)| f(k, v))
}
None => self.headers().inner.iter().for_each(|(k, v)| f(k, v)),
}
}
} }
impl MessageType for Response<()> { impl MessageType for Response<()> {

View File

@ -1,10 +1,15 @@
//! For middleware documentation, see [`ErrorHandlers`]. //! For middleware documentation, see [`ErrorHandlers`].
use std::rc::Rc; use std::{
future::Future,
pin::Pin,
rc::Rc,
task::{Context, Poll},
};
use actix_service::{Service, Transform}; use actix_service::{Service, Transform};
use ahash::AHashMap; use ahash::AHashMap;
use futures_util::future::{ok, FutureExt, LocalBoxFuture, Ready}; use futures_core::{future::LocalBoxFuture, ready};
use crate::{ use crate::{
dev::{ServiceRequest, ServiceResponse}, dev::{ServiceRequest, ServiceResponse},
@ -51,9 +56,11 @@ type ErrorHandler<B> = dyn Fn(ServiceResponse<B>) -> Result<ErrorHandlerResponse
/// )); /// ));
/// ``` /// ```
pub struct ErrorHandlers<B> { pub struct ErrorHandlers<B> {
handlers: Rc<AHashMap<StatusCode, Box<ErrorHandler<B>>>>, handlers: Handlers<B>,
} }
type Handlers<B> = Rc<AHashMap<StatusCode, Box<ErrorHandler<B>>>>;
impl<B> Default for ErrorHandlers<B> { impl<B> Default for ErrorHandlers<B> {
fn default() -> Self { fn default() -> Self {
ErrorHandlers { ErrorHandlers {
@ -82,7 +89,7 @@ impl<B> ErrorHandlers<B> {
impl<S, B> Transform<S, ServiceRequest> for ErrorHandlers<B> impl<S, B> Transform<S, ServiceRequest> for ErrorHandlers<B>
where where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>, S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error> + 'static,
S::Future: 'static, S::Future: 'static,
B: 'static, B: 'static,
{ {
@ -90,20 +97,18 @@ where
type Error = Error; type Error = Error;
type Transform = ErrorHandlersMiddleware<S, B>; type Transform = ErrorHandlersMiddleware<S, B>;
type InitError = (); type InitError = ();
type Future = Ready<Result<Self::Transform, Self::InitError>>; type Future = LocalBoxFuture<'static, Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future { fn new_transform(&self, service: S) -> Self::Future {
ok(ErrorHandlersMiddleware { let handlers = self.handlers.clone();
service, Box::pin(async move { Ok(ErrorHandlersMiddleware { service, handlers }) })
handlers: self.handlers.clone(),
})
} }
} }
#[doc(hidden)] #[doc(hidden)]
pub struct ErrorHandlersMiddleware<S, B> { pub struct ErrorHandlersMiddleware<S, B> {
service: S, service: S,
handlers: Rc<AHashMap<StatusCode, Box<ErrorHandler<B>>>>, handlers: Handlers<B>,
} }
impl<S, B> Service<ServiceRequest> for ErrorHandlersMiddleware<S, B> impl<S, B> Service<ServiceRequest> for ErrorHandlersMiddleware<S, B>
@ -114,35 +119,63 @@ where
{ {
type Response = ServiceResponse<B>; type Response = ServiceResponse<B>;
type Error = Error; type Error = Error;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>; type Future = ErrorHandlersFuture<S::Future, B>;
actix_service::forward_ready!(service); actix_service::forward_ready!(service);
fn call(&mut self, req: ServiceRequest) -> Self::Future { fn call(&mut self, req: ServiceRequest) -> Self::Future {
let handlers = self.handlers.clone(); let handlers = self.handlers.clone();
let fut = self.service.call(req); let fut = self.service.call(req);
ErrorHandlersFuture::ServiceFuture { fut, handlers }
}
}
async move { #[pin_project::pin_project(project = ErrorHandlersProj)]
let res = fut.await?; pub enum ErrorHandlersFuture<Fut, B>
where
Fut: Future,
{
ServiceFuture {
#[pin]
fut: Fut,
handlers: Handlers<B>,
},
HandlerFuture {
fut: LocalBoxFuture<'static, Fut::Output>,
},
}
if let Some(handler) = handlers.get(&res.status()) { impl<Fut, B> Future for ErrorHandlersFuture<Fut, B>
match handler(res) { where
Ok(ErrorHandlerResponse::Response(res)) => Ok(res), Fut: Future<Output = Result<ServiceResponse<B>, Error>>,
Ok(ErrorHandlerResponse::Future(fut)) => fut.await, {
Err(e) => Err(e), type Output = Fut::Output;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().project() {
ErrorHandlersProj::ServiceFuture { fut, handlers } => {
let res = ready!(fut.poll(cx))?;
match handlers.get(&res.status()) {
Some(handler) => match handler(res)? {
ErrorHandlerResponse::Response(res) => Poll::Ready(Ok(res)),
ErrorHandlerResponse::Future(fut) => {
self.as_mut()
.set(ErrorHandlersFuture::HandlerFuture { fut });
self.poll(cx)
} }
} else { },
Ok(res) None => Poll::Ready(Ok(res)),
} }
} }
.boxed_local() ErrorHandlersProj::HandlerFuture { fut } => fut.as_mut().poll(cx),
}
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use actix_service::IntoService; use actix_service::IntoService;
use futures_util::future::ok; use futures_util::future::{ok, FutureExt};
use super::*; use super::*;
use crate::http::{header::CONTENT_TYPE, HeaderValue, StatusCode}; use crate::http::{header::CONTENT_TYPE, HeaderValue, StatusCode};