From 93161df141544277b1f7c983a5c3a616cfd44dd3 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Tue, 5 Jan 2021 07:47:38 +0800 Subject: [PATCH 1/2] clean up body type (#1872) --- actix-http/src/body.rs | 52 ++++++++++++++++-------------------------- 1 file changed, 20 insertions(+), 32 deletions(-) diff --git a/actix-http/src/body.rs b/actix-http/src/body.rs index 9636f2941..5d1cf7329 100644 --- a/actix-http/src/body.rs +++ b/actix-http/src/body.rs @@ -1,4 +1,3 @@ -use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; use std::{fmt, mem}; @@ -68,7 +67,7 @@ impl MessageBody for Box { #[pin_project(project = ResponseBodyProj)] pub enum ResponseBody { Body(#[pin] B), - Other(#[pin] Body), + Other(Body), } impl ResponseBody { @@ -110,7 +109,7 @@ impl MessageBody for ResponseBody { ) -> Poll>> { match self.project() { ResponseBodyProj::Body(body) => body.poll_next(cx), - ResponseBodyProj::Other(body) => body.poll_next(cx), + ResponseBodyProj::Other(body) => Pin::new(body).poll_next(cx), } } } @@ -124,12 +123,11 @@ impl Stream for ResponseBody { ) -> Poll> { match self.project() { ResponseBodyProj::Body(body) => body.poll_next(cx), - ResponseBodyProj::Other(body) => body.poll_next(cx), + ResponseBodyProj::Other(body) => Pin::new(body).poll_next(cx), } } } -#[pin_project(project = BodyProj)] /// Represents various types of http message body. pub enum Body { /// Empty response. `Content-Length` header is not set. @@ -168,10 +166,10 @@ impl MessageBody for Body { self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { - match self.project() { - BodyProj::None => Poll::Ready(None), - BodyProj::Empty => Poll::Ready(None), - BodyProj::Bytes(ref mut bin) => { + match self.get_mut() { + Body::None => Poll::Ready(None), + Body::Empty => Poll::Ready(None), + Body::Bytes(ref mut bin) => { let len = bin.len(); if len == 0 { Poll::Ready(None) @@ -179,7 +177,7 @@ impl MessageBody for Body { Poll::Ready(Some(Ok(mem::take(bin)))) } } - BodyProj::Message(ref mut body) => Pin::new(body.as_mut()).poll_next(cx), + Body::Message(body) => Pin::new(&mut **body).poll_next(cx), } } } @@ -266,12 +264,12 @@ where } } -impl From> for Body +impl From> for Body where S: Stream> + Unpin + 'static, E: Into + 'static, { - fn from(s: BodyStream) -> Body { + fn from(s: BodyStream) -> Body { Body::from_message(s) } } @@ -367,27 +365,21 @@ impl MessageBody for String { /// Type represent streaming body. /// Response does not contain `content-length` header and appropriate transfer encoding is used. -#[pin_project] -pub struct BodyStream { - #[pin] +pub struct BodyStream { stream: S, - _phantom: PhantomData, } -impl BodyStream +impl BodyStream where S: Stream> + Unpin, E: Into, { pub fn new(stream: S) -> Self { - BodyStream { - stream, - _phantom: PhantomData, - } + BodyStream { stream } } } -impl MessageBody for BodyStream +impl MessageBody for BodyStream where S: Stream> + Unpin, E: Into, @@ -402,13 +394,12 @@ where /// ended on a zero-length chunk, but rather proceed until the underlying /// [`Stream`] ends. fn poll_next( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { - let mut stream = self.project().stream; loop { - let stream = stream.as_mut(); - return Poll::Ready(match ready!(stream.poll_next(cx)) { + let stream = &mut self.as_mut().stream; + return Poll::Ready(match ready!(Pin::new(stream).poll_next(cx)) { Some(Ok(ref bytes)) if bytes.is_empty() => continue, opt => opt.map(|res| res.map_err(Into::into)), }); @@ -418,10 +409,8 @@ where /// Type represent streaming body. This body implementation should be used /// if total size of stream is known. Data get sent as is without using transfer encoding. -#[pin_project] pub struct SizedStream { size: u64, - #[pin] stream: S, } @@ -448,13 +437,12 @@ where /// ended on a zero-length chunk, but rather proceed until the underlying /// [`Stream`] ends. fn poll_next( - self: Pin<&mut Self>, + mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { - let mut stream: Pin<&mut S> = self.project().stream; loop { - let stream = stream.as_mut(); - return Poll::Ready(match ready!(stream.poll_next(cx)) { + let stream = &mut self.as_mut().stream; + return Poll::Ready(match ready!(Pin::new(stream).poll_next(cx)) { Some(Ok(ref bytes)) if bytes.is_empty() => continue, val => val, }); From 4f5971d79ef7d3217cfaa9b82169bee950b945b9 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Tue, 5 Jan 2021 08:22:57 +0800 Subject: [PATCH 2/2] add Compat middleware (#1865) --- CHANGES.md | 7 +- src/middleware/compat.rs | 192 ++++++++++++++++++++++++++++++++++++ src/middleware/condition.rs | 4 +- src/middleware/mod.rs | 2 + 4 files changed, 203 insertions(+), 2 deletions(-) create mode 100644 src/middleware/compat.rs diff --git a/CHANGES.md b/CHANGES.md index 32f444ec1..0077cd518 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,11 @@ # Changes ## Unreleased - 2021-xx-xx +### Added +* `Compat` middleware enabling generic response body/error type of middlewares + like `Logger` and `Compress` to be used in `middleware::Condition` + and `Resource`, `Scope` services. [#1865] + ### Changed * Update `actix-*` dependencies to tokio `1.0` based versions. [#1813] * Bumped `rand` to `0.8`. @@ -9,7 +14,7 @@ * MSRV is now 1.46.0. [#1813]: https://github.com/actix/actix-web/pull/1813 - +[#1865]: https://github.com/actix/actix-web/pull/1865 ### Fixed * added the actual parsing error to `test::read_body_json` [#1812] diff --git a/src/middleware/compat.rs b/src/middleware/compat.rs new file mode 100644 index 000000000..66346b6d6 --- /dev/null +++ b/src/middleware/compat.rs @@ -0,0 +1,192 @@ +//! `Middleware` for enabling any middleware to be used in `Resource`, `Scope` and `Condition`. +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use actix_http::body::Body; +use actix_http::body::{MessageBody, ResponseBody}; +use actix_service::{Service, Transform}; +use futures_core::future::LocalBoxFuture; +use futures_core::ready; + +use crate::error::Error; +use crate::service::ServiceResponse; + +/// `Middleware` for enabling any middleware to be used in `Resource`, `Scope` and `Condition`. +/// +/// +/// ## Usage +/// +/// ```rust +/// use actix_web::middleware::{Logger, Compat}; +/// use actix_web::{App, web}; +/// +/// let logger = Logger::default(); +/// +/// // this would not compile +/// // let app = App::new().service(web::scope("scoped").wrap(logger)); +/// +/// // by using scoped middleware we can use logger in scope. +/// let app = App::new().service(web::scope("scoped").wrap(Compat::new(logger))); +/// ``` +pub struct Compat { + transform: T, +} + +impl Compat { + pub fn new(transform: T) -> Self { + Self { transform } + } +} + +impl Transform for Compat +where + S: Service, + T: Transform, + T::Future: 'static, + T::Response: MapServiceResponseBody, + Error: From, +{ + type Response = ServiceResponse; + type Error = Error; + type Transform = CompatMiddleware; + type InitError = T::InitError; + type Future = LocalBoxFuture<'static, Result>; + + fn new_transform(&self, service: S) -> Self::Future { + let fut = self.transform.new_transform(service); + Box::pin(async move { + let service = fut.await?; + Ok(CompatMiddleware { service }) + }) + } +} + +pub struct CompatMiddleware { + service: S, +} + +impl Service for CompatMiddleware +where + S: Service, + S::Response: MapServiceResponseBody, + Error: From, +{ + type Response = ServiceResponse; + type Error = Error; + type Future = CompatMiddlewareFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.service.poll_ready(cx).map_err(From::from) + } + + fn call(&mut self, req: Req) -> Self::Future { + let fut = self.service.call(req); + CompatMiddlewareFuture { fut } + } +} + +#[pin_project::pin_project] +pub struct CompatMiddlewareFuture { + #[pin] + fut: Fut, +} + +impl Future for CompatMiddlewareFuture +where + Fut: Future>, + T: MapServiceResponseBody, + Error: From, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let res = ready!(self.project().fut.poll(cx))?; + Poll::Ready(Ok(res.map_body())) + } +} + +// trait for convert ServiceResponse's ResponseBody generic type +// to ResponseBody +pub trait MapServiceResponseBody { + fn map_body(self) -> ServiceResponse; +} + +impl MapServiceResponseBody for ServiceResponse { + fn map_body(self) -> ServiceResponse { + self.map_body(|_, body| ResponseBody::Other(Body::from_message(body))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use actix_service::IntoService; + + use crate::dev::ServiceRequest; + use crate::http::StatusCode; + use crate::middleware::{Compress, Condition, Logger}; + use crate::test::{call_service, init_service, TestRequest}; + use crate::{web, App, HttpResponse}; + + #[actix_rt::test] + async fn test_scope_middleware() { + let logger = Logger::default(); + let compress = Compress::default(); + + let mut srv = init_service( + App::new().service( + web::scope("app") + .wrap(Compat::new(logger)) + .wrap(Compat::new(compress)) + .service( + web::resource("/test").route(web::get().to(HttpResponse::Ok)), + ), + ), + ) + .await; + + let req = TestRequest::with_uri("/app/test").to_request(); + let resp = call_service(&mut srv, req).await; + assert_eq!(resp.status(), StatusCode::OK); + } + + #[actix_rt::test] + async fn test_resource_scope_middleware() { + let logger = Logger::default(); + let compress = Compress::default(); + + let mut srv = init_service( + App::new().service( + web::resource("app/test") + .wrap(Compat::new(logger)) + .wrap(Compat::new(compress)) + .route(web::get().to(HttpResponse::Ok)), + ), + ) + .await; + + let req = TestRequest::with_uri("/app/test").to_request(); + let resp = call_service(&mut srv, req).await; + assert_eq!(resp.status(), StatusCode::OK); + } + + #[actix_rt::test] + async fn test_condition_scope_middleware() { + let srv = |req: ServiceRequest| { + Box::pin(async move { + Ok(req.into_response(HttpResponse::InternalServerError().finish())) + }) + }; + + let logger = Logger::default(); + + let mut mw = Condition::new(true, Compat::new(logger)) + .new_transform(srv.into_service()) + .await + .unwrap(); + let resp = call_service(&mut mw, TestRequest::default().to_srv_request()).await; + assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR); + } +} diff --git a/src/middleware/condition.rs b/src/middleware/condition.rs index 87323e325..04173e053 100644 --- a/src/middleware/condition.rs +++ b/src/middleware/condition.rs @@ -6,7 +6,9 @@ use futures_util::future::{ok, Either, FutureExt, LocalBoxFuture}; /// `Middleware` for conditionally enables another middleware. /// The controlled middleware must not change the `Service` interfaces. -/// This means you cannot control such middlewares like `Logger` or `Compress`. +/// +/// This means you cannot control such middlewares like `Logger` or `Compress` directly. +/// *. See `Compat` middleware for alternative. /// /// ## Usage /// diff --git a/src/middleware/mod.rs b/src/middleware/mod.rs index 12c12a98c..af44fd8c7 100644 --- a/src/middleware/mod.rs +++ b/src/middleware/mod.rs @@ -5,12 +5,14 @@ mod compress; #[cfg(feature = "compress")] pub use self::compress::Compress; +mod compat; mod condition; mod defaultheaders; pub mod errhandlers; mod logger; pub mod normalize; +pub use self::compat::Compat; pub use self::condition::Condition; pub use self::defaultheaders::DefaultHeaders; pub use self::logger::Logger;