mirror of https://github.com/fafhrd91/actix-web
Merge branch 'master' into feat/ws-codec-frame-size
This commit is contained in:
commit
8d6d584faf
|
@ -1,6 +1,11 @@
|
|||
# Changes
|
||||
|
||||
## Unreleased - 2021-xx-xx
|
||||
### Added
|
||||
* `Compat` middleware enabling generic response body/error type of middlewares
|
||||
like `Logger` and `Compress` to be used in `middleware::Condition`
|
||||
and `Resource`, `Scope` services. [#1865]
|
||||
|
||||
### Changed
|
||||
* Update `actix-*` dependencies to tokio `1.0` based versions. [#1813]
|
||||
* Bumped `rand` to `0.8`.
|
||||
|
@ -9,7 +14,7 @@
|
|||
* MSRV is now 1.46.0.
|
||||
|
||||
[#1813]: https://github.com/actix/actix-web/pull/1813
|
||||
|
||||
[#1865]: https://github.com/actix/actix-web/pull/1865
|
||||
|
||||
### Fixed
|
||||
* added the actual parsing error to `test::read_body_json` [#1812]
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
use std::marker::PhantomData;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::{fmt, mem};
|
||||
|
@ -68,7 +67,7 @@ impl<T: MessageBody + Unpin> MessageBody for Box<T> {
|
|||
#[pin_project(project = ResponseBodyProj)]
|
||||
pub enum ResponseBody<B> {
|
||||
Body(#[pin] B),
|
||||
Other(#[pin] Body),
|
||||
Other(Body),
|
||||
}
|
||||
|
||||
impl ResponseBody<Body> {
|
||||
|
@ -110,7 +109,7 @@ impl<B: MessageBody> MessageBody for ResponseBody<B> {
|
|||
) -> Poll<Option<Result<Bytes, Error>>> {
|
||||
match self.project() {
|
||||
ResponseBodyProj::Body(body) => body.poll_next(cx),
|
||||
ResponseBodyProj::Other(body) => body.poll_next(cx),
|
||||
ResponseBodyProj::Other(body) => Pin::new(body).poll_next(cx),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -124,12 +123,11 @@ impl<B: MessageBody> Stream for ResponseBody<B> {
|
|||
) -> Poll<Option<Self::Item>> {
|
||||
match self.project() {
|
||||
ResponseBodyProj::Body(body) => body.poll_next(cx),
|
||||
ResponseBodyProj::Other(body) => body.poll_next(cx),
|
||||
ResponseBodyProj::Other(body) => Pin::new(body).poll_next(cx),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[pin_project(project = BodyProj)]
|
||||
/// Represents various types of http message body.
|
||||
pub enum Body {
|
||||
/// Empty response. `Content-Length` header is not set.
|
||||
|
@ -168,10 +166,10 @@ impl MessageBody for Body {
|
|||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Result<Bytes, Error>>> {
|
||||
match self.project() {
|
||||
BodyProj::None => Poll::Ready(None),
|
||||
BodyProj::Empty => Poll::Ready(None),
|
||||
BodyProj::Bytes(ref mut bin) => {
|
||||
match self.get_mut() {
|
||||
Body::None => Poll::Ready(None),
|
||||
Body::Empty => Poll::Ready(None),
|
||||
Body::Bytes(ref mut bin) => {
|
||||
let len = bin.len();
|
||||
if len == 0 {
|
||||
Poll::Ready(None)
|
||||
|
@ -179,7 +177,7 @@ impl MessageBody for Body {
|
|||
Poll::Ready(Some(Ok(mem::take(bin))))
|
||||
}
|
||||
}
|
||||
BodyProj::Message(ref mut body) => Pin::new(body.as_mut()).poll_next(cx),
|
||||
Body::Message(body) => Pin::new(&mut **body).poll_next(cx),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -266,12 +264,12 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<S, E> From<BodyStream<S, E>> for Body
|
||||
impl<S, E> From<BodyStream<S>> for Body
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
|
||||
E: Into<Error> + 'static,
|
||||
{
|
||||
fn from(s: BodyStream<S, E>) -> Body {
|
||||
fn from(s: BodyStream<S>) -> Body {
|
||||
Body::from_message(s)
|
||||
}
|
||||
}
|
||||
|
@ -367,27 +365,21 @@ impl MessageBody for String {
|
|||
|
||||
/// Type represent streaming body.
|
||||
/// Response does not contain `content-length` header and appropriate transfer encoding is used.
|
||||
#[pin_project]
|
||||
pub struct BodyStream<S: Unpin, E> {
|
||||
#[pin]
|
||||
pub struct BodyStream<S: Unpin> {
|
||||
stream: S,
|
||||
_phantom: PhantomData<E>,
|
||||
}
|
||||
|
||||
impl<S, E> BodyStream<S, E>
|
||||
impl<S, E> BodyStream<S>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, E>> + Unpin,
|
||||
E: Into<Error>,
|
||||
{
|
||||
pub fn new(stream: S) -> Self {
|
||||
BodyStream {
|
||||
stream,
|
||||
_phantom: PhantomData,
|
||||
}
|
||||
BodyStream { stream }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, E> MessageBody for BodyStream<S, E>
|
||||
impl<S, E> MessageBody for BodyStream<S>
|
||||
where
|
||||
S: Stream<Item = Result<Bytes, E>> + Unpin,
|
||||
E: Into<Error>,
|
||||
|
@ -402,13 +394,12 @@ where
|
|||
/// ended on a zero-length chunk, but rather proceed until the underlying
|
||||
/// [`Stream`] ends.
|
||||
fn poll_next(
|
||||
self: Pin<&mut Self>,
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Result<Bytes, Error>>> {
|
||||
let mut stream = self.project().stream;
|
||||
loop {
|
||||
let stream = stream.as_mut();
|
||||
return Poll::Ready(match ready!(stream.poll_next(cx)) {
|
||||
let stream = &mut self.as_mut().stream;
|
||||
return Poll::Ready(match ready!(Pin::new(stream).poll_next(cx)) {
|
||||
Some(Ok(ref bytes)) if bytes.is_empty() => continue,
|
||||
opt => opt.map(|res| res.map_err(Into::into)),
|
||||
});
|
||||
|
@ -418,10 +409,8 @@ where
|
|||
|
||||
/// Type represent streaming body. This body implementation should be used
|
||||
/// if total size of stream is known. Data get sent as is without using transfer encoding.
|
||||
#[pin_project]
|
||||
pub struct SizedStream<S: Unpin> {
|
||||
size: u64,
|
||||
#[pin]
|
||||
stream: S,
|
||||
}
|
||||
|
||||
|
@ -448,13 +437,12 @@ where
|
|||
/// ended on a zero-length chunk, but rather proceed until the underlying
|
||||
/// [`Stream`] ends.
|
||||
fn poll_next(
|
||||
self: Pin<&mut Self>,
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Result<Bytes, Error>>> {
|
||||
let mut stream: Pin<&mut S> = self.project().stream;
|
||||
loop {
|
||||
let stream = stream.as_mut();
|
||||
return Poll::Ready(match ready!(stream.poll_next(cx)) {
|
||||
let stream = &mut self.as_mut().stream;
|
||||
return Poll::Ready(match ready!(Pin::new(stream).poll_next(cx)) {
|
||||
Some(Ok(ref bytes)) if bytes.is_empty() => continue,
|
||||
val => val,
|
||||
});
|
||||
|
|
|
@ -0,0 +1,192 @@
|
|||
//! `Middleware` for enabling any middleware to be used in `Resource`, `Scope` and `Condition`.
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use actix_http::body::Body;
|
||||
use actix_http::body::{MessageBody, ResponseBody};
|
||||
use actix_service::{Service, Transform};
|
||||
use futures_core::future::LocalBoxFuture;
|
||||
use futures_core::ready;
|
||||
|
||||
use crate::error::Error;
|
||||
use crate::service::ServiceResponse;
|
||||
|
||||
/// `Middleware` for enabling any middleware to be used in `Resource`, `Scope` and `Condition`.
|
||||
///
|
||||
///
|
||||
/// ## Usage
|
||||
///
|
||||
/// ```rust
|
||||
/// use actix_web::middleware::{Logger, Compat};
|
||||
/// use actix_web::{App, web};
|
||||
///
|
||||
/// let logger = Logger::default();
|
||||
///
|
||||
/// // this would not compile
|
||||
/// // let app = App::new().service(web::scope("scoped").wrap(logger));
|
||||
///
|
||||
/// // by using scoped middleware we can use logger in scope.
|
||||
/// let app = App::new().service(web::scope("scoped").wrap(Compat::new(logger)));
|
||||
/// ```
|
||||
pub struct Compat<T> {
|
||||
transform: T,
|
||||
}
|
||||
|
||||
impl<T> Compat<T> {
|
||||
pub fn new(transform: T) -> Self {
|
||||
Self { transform }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, T, Req> Transform<S, Req> for Compat<T>
|
||||
where
|
||||
S: Service<Req>,
|
||||
T: Transform<S, Req>,
|
||||
T::Future: 'static,
|
||||
T::Response: MapServiceResponseBody,
|
||||
Error: From<T::Error>,
|
||||
{
|
||||
type Response = ServiceResponse;
|
||||
type Error = Error;
|
||||
type Transform = CompatMiddleware<T::Transform>;
|
||||
type InitError = T::InitError;
|
||||
type Future = LocalBoxFuture<'static, Result<Self::Transform, Self::InitError>>;
|
||||
|
||||
fn new_transform(&self, service: S) -> Self::Future {
|
||||
let fut = self.transform.new_transform(service);
|
||||
Box::pin(async move {
|
||||
let service = fut.await?;
|
||||
Ok(CompatMiddleware { service })
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CompatMiddleware<S> {
|
||||
service: S,
|
||||
}
|
||||
|
||||
impl<S, Req> Service<Req> for CompatMiddleware<S>
|
||||
where
|
||||
S: Service<Req>,
|
||||
S::Response: MapServiceResponseBody,
|
||||
Error: From<S::Error>,
|
||||
{
|
||||
type Response = ServiceResponse;
|
||||
type Error = Error;
|
||||
type Future = CompatMiddlewareFuture<S::Future>;
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.service.poll_ready(cx).map_err(From::from)
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Req) -> Self::Future {
|
||||
let fut = self.service.call(req);
|
||||
CompatMiddlewareFuture { fut }
|
||||
}
|
||||
}
|
||||
|
||||
#[pin_project::pin_project]
|
||||
pub struct CompatMiddlewareFuture<Fut> {
|
||||
#[pin]
|
||||
fut: Fut,
|
||||
}
|
||||
|
||||
impl<Fut, T, E> Future for CompatMiddlewareFuture<Fut>
|
||||
where
|
||||
Fut: Future<Output = Result<T, E>>,
|
||||
T: MapServiceResponseBody,
|
||||
Error: From<E>,
|
||||
{
|
||||
type Output = Result<ServiceResponse, Error>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let res = ready!(self.project().fut.poll(cx))?;
|
||||
Poll::Ready(Ok(res.map_body()))
|
||||
}
|
||||
}
|
||||
|
||||
// trait for convert ServiceResponse's ResponseBody<B> generic type
|
||||
// to ResponseBody<Body>
|
||||
pub trait MapServiceResponseBody {
|
||||
fn map_body(self) -> ServiceResponse;
|
||||
}
|
||||
|
||||
impl<B: MessageBody + Unpin + 'static> MapServiceResponseBody for ServiceResponse<B> {
|
||||
fn map_body(self) -> ServiceResponse {
|
||||
self.map_body(|_, body| ResponseBody::Other(Body::from_message(body)))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
use actix_service::IntoService;
|
||||
|
||||
use crate::dev::ServiceRequest;
|
||||
use crate::http::StatusCode;
|
||||
use crate::middleware::{Compress, Condition, Logger};
|
||||
use crate::test::{call_service, init_service, TestRequest};
|
||||
use crate::{web, App, HttpResponse};
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_scope_middleware() {
|
||||
let logger = Logger::default();
|
||||
let compress = Compress::default();
|
||||
|
||||
let mut srv = init_service(
|
||||
App::new().service(
|
||||
web::scope("app")
|
||||
.wrap(Compat::new(logger))
|
||||
.wrap(Compat::new(compress))
|
||||
.service(
|
||||
web::resource("/test").route(web::get().to(HttpResponse::Ok)),
|
||||
),
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
let req = TestRequest::with_uri("/app/test").to_request();
|
||||
let resp = call_service(&mut srv, req).await;
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_resource_scope_middleware() {
|
||||
let logger = Logger::default();
|
||||
let compress = Compress::default();
|
||||
|
||||
let mut srv = init_service(
|
||||
App::new().service(
|
||||
web::resource("app/test")
|
||||
.wrap(Compat::new(logger))
|
||||
.wrap(Compat::new(compress))
|
||||
.route(web::get().to(HttpResponse::Ok)),
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
let req = TestRequest::with_uri("/app/test").to_request();
|
||||
let resp = call_service(&mut srv, req).await;
|
||||
assert_eq!(resp.status(), StatusCode::OK);
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_condition_scope_middleware() {
|
||||
let srv = |req: ServiceRequest| {
|
||||
Box::pin(async move {
|
||||
Ok(req.into_response(HttpResponse::InternalServerError().finish()))
|
||||
})
|
||||
};
|
||||
|
||||
let logger = Logger::default();
|
||||
|
||||
let mut mw = Condition::new(true, Compat::new(logger))
|
||||
.new_transform(srv.into_service())
|
||||
.await
|
||||
.unwrap();
|
||||
let resp = call_service(&mut mw, TestRequest::default().to_srv_request()).await;
|
||||
assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);
|
||||
}
|
||||
}
|
|
@ -6,7 +6,9 @@ use futures_util::future::{ok, Either, FutureExt, LocalBoxFuture};
|
|||
|
||||
/// `Middleware` for conditionally enables another middleware.
|
||||
/// The controlled middleware must not change the `Service` interfaces.
|
||||
/// This means you cannot control such middlewares like `Logger` or `Compress`.
|
||||
///
|
||||
/// This means you cannot control such middlewares like `Logger` or `Compress` directly.
|
||||
/// *. See `Compat` middleware for alternative.
|
||||
///
|
||||
/// ## Usage
|
||||
///
|
||||
|
|
|
@ -5,12 +5,14 @@ mod compress;
|
|||
#[cfg(feature = "compress")]
|
||||
pub use self::compress::Compress;
|
||||
|
||||
mod compat;
|
||||
mod condition;
|
||||
mod defaultheaders;
|
||||
pub mod errhandlers;
|
||||
mod logger;
|
||||
pub mod normalize;
|
||||
|
||||
pub use self::compat::Compat;
|
||||
pub use self::condition::Condition;
|
||||
pub use self::defaultheaders::DefaultHeaders;
|
||||
pub use self::logger::Logger;
|
||||
|
|
Loading…
Reference in New Issue