diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3aac6efa8..585b3f497 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -86,7 +86,7 @@ jobs: uses: actions-rs/cargo@v1 with: command: test - args: -v --workspace --all-features --no-fail-fast -- --nocapture + args: --workspace --all-features --no-fail-fast -- --nocapture --skip=test_h2_content_length --skip=test_reading_deflate_encoding_large_random_rustls diff --git a/CHANGES.md b/CHANGES.md index 6fe0174ad..162f9f61b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,8 +1,21 @@ # Changes ## Unreleased - 2021-xx-xx +### Added +* `HttpServer::worker_max_blocking_threads` for setting block thread pool. [#2200] + ### Changed +* `ServiceResponse::error_response` now uses body type of `Body`. [#2201] +* `ServiceResponse::checked_expr` now returns a `Result`. [#2201] * Update `language-tags` to `0.3`. +* `ServiceResponse::take_body`. [#2201] +* `ServiceResponse::map_body` closure receives and returns `B` instead of `ResponseBody` types. [#2201] + +### Removed +* `HttpResponse::take_body` and old `HttpResponse::into_body` method that casted body type. [#2201] + +[#2200]: https://github.com/actix/actix-web/pull/2200 +[#2201]: https://github.com/actix/actix-web/pull/2201 ## 4.0.0-beta.6 - 2021-04-17 diff --git a/actix-files/src/files.rs b/actix-files/src/files.rs index 8e28cb45e..25706a232 100644 --- a/actix-files/src/files.rs +++ b/actix-files/src/files.rs @@ -66,6 +66,7 @@ impl Clone for Files { } } } + impl Files { /// Create new `Files` instance for a specified base directory. /// @@ -83,7 +84,7 @@ impl Files { /// /// `Files` utilizes the existing Tokio thread-pool for blocking filesystem operations. /// The number of running threads is adjusted over time as needed, up to a maximum of 512 times - /// the number of server [workers](HttpServer::workers), by default. + /// the number of server [workers](actix_web::HttpServer::workers), by default. pub fn new>(mount_path: &str, serve_from: T) -> Files { let orig_dir = serve_from.into(); let dir = match orig_dir.canonicalize() { diff --git a/actix-files/src/service.rs b/actix-files/src/service.rs index dc51ada18..31e1434bd 100644 --- a/actix-files/src/service.rs +++ b/actix-files/src/service.rs @@ -96,8 +96,7 @@ impl Service for FilesService { return Box::pin(ok(req.into_response( HttpResponse::Found() .insert_header((header::LOCATION, redirect_to)) - .body("") - .into_body(), + .finish(), ))); } diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index 9f59563ec..37a818f4a 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -2,24 +2,38 @@ ## Unreleased - 2021-xx-xx ### Added +* `BoxAnyBody`: a boxed message body with boxed errors. [#2183] * Re-export `http` crate's `Error` type as `error::HttpError`. [#2171] * Re-export `StatusCode`, `Method`, `Version` and `Uri` at the crate root. [#2171] * Re-export `ContentEncoding` and `ConnectionType` at the crate root. [#2171] +* `Response::into_body` that consumes response and returns body type. [#2201] +* `impl Default` for `Response`. [#2201] ### Changed +* The `MessageBody` trait now has an associated `Error` type. [#2183] +* Places in `Response` where `ResponseBody` was received or returned now simply use `B`. [#2201] * `header` mod is now public. [#2171] * `uri` mod is now public. [#2171] * Update `language-tags` to `0.3`. -* Reduce the level from `error` to `debug` for the log line that is emitted when a `500 Internal Server Error` is built using `HttpResponse::from_error`. [#2196] +* Reduce the level from `error` to `debug` for the log line that is emitted when a `500 Internal Server Error` is built using `HttpResponse::from_error`. [#2201] +* `ResponseBuilder::message_body` now returns a `Result`. [#2201] ### Removed * Stop re-exporting `http` crate's `HeaderMap` types in addition to ours. [#2171] +* Down-casting for `MessageBody` types. [#2183] +* `error::Result` alias. [#2201] +* Error field from `Response` and `Response::error`. [#2205] +* `impl Future` for `Response`. [#2201] +* `Response::take_body` and old `Response::into_body` method that casted body type. [#2201] ### Fixed * Converting an `HttpResponse` to an `Error` return the underlying `Error` if `HttpResponse` was built using `HttpResponse::from_error`. [#2171]: https://github.com/actix/actix-web/pull/2171 +[#2183]: https://github.com/actix/actix-web/pull/2183 [#2196]: https://github.com/actix/actix-web/pull/2196 +[#2201]: https://github.com/actix/actix-web/pull/2201 +[#2205]: https://github.com/actix/actix-web/pull/2205 ## 3.0.0-beta.6 - 2021-04-17 diff --git a/actix-http/examples/echo.rs b/actix-http/examples/echo.rs index b2cdb0be1..54a71a106 100644 --- a/actix-http/examples/echo.rs +++ b/actix-http/examples/echo.rs @@ -1,4 +1,4 @@ -use std::{env, io}; +use std::io; use actix_http::{http::StatusCode, Error, HttpService, Request, Response}; use actix_server::Server; @@ -9,8 +9,7 @@ use log::info; #[actix_rt::main] async fn main() -> io::Result<()> { - env::set_var("RUST_LOG", "echo=info"); - env_logger::init(); + env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); Server::build() .bind("echo", "127.0.0.1:8080", || { diff --git a/actix-http/examples/echo2.rs b/actix-http/examples/echo2.rs index 9acf4bbae..3974cf20b 100644 --- a/actix-http/examples/echo2.rs +++ b/actix-http/examples/echo2.rs @@ -1,4 +1,4 @@ -use std::{env, io}; +use std::io; use actix_http::{body::Body, http::HeaderValue, http::StatusCode}; use actix_http::{Error, HttpService, Request, Response}; @@ -21,8 +21,7 @@ async fn handle_request(mut req: Request) -> Result, Error> { #[actix_rt::main] async fn main() -> io::Result<()> { - env::set_var("RUST_LOG", "echo=info"); - env_logger::init(); + env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); Server::build() .bind("echo", "127.0.0.1:8080", || { diff --git a/actix-http/examples/hello-world.rs b/actix-http/examples/hello-world.rs index 85994556d..d51de6f4e 100644 --- a/actix-http/examples/hello-world.rs +++ b/actix-http/examples/hello-world.rs @@ -1,4 +1,4 @@ -use std::{env, io}; +use std::io; use actix_http::{http::StatusCode, HttpService, Response}; use actix_server::Server; @@ -8,8 +8,7 @@ use log::info; #[actix_rt::main] async fn main() -> io::Result<()> { - env::set_var("RUST_LOG", "hello_world=info"); - env_logger::init(); + env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); Server::build() .bind("hello-world", "127.0.0.1:8080", || { diff --git a/actix-http/examples/ws.rs b/actix-http/examples/ws.rs index af66f7d71..d3cedf870 100644 --- a/actix-http/examples/ws.rs +++ b/actix-http/examples/ws.rs @@ -4,7 +4,7 @@ extern crate tls_rustls as rustls; use std::{ - env, io, + io, pin::Pin, task::{Context, Poll}, time::Duration, @@ -20,8 +20,7 @@ use futures_core::{ready, Stream}; #[actix_rt::main] async fn main() -> io::Result<()> { - env::set_var("RUST_LOG", "actix=info,h2_ws=info"); - env_logger::init(); + env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); Server::build() .bind("tcp", ("127.0.0.1", 8080), || { @@ -41,7 +40,7 @@ async fn handler(req: Request) -> Result>, Error> // handshake will always fail under HTTP/2 log::info!("responding"); - Ok(res.message_body(BodyStream::new(Heartbeat::new(ws::Codec::new())))) + Ok(res.message_body(BodyStream::new(Heartbeat::new(ws::Codec::new())))?) } struct Heartbeat { diff --git a/actix-http/src/body/body.rs b/actix-http/src/body/body.rs index 4fe18338a..4c95bd31a 100644 --- a/actix-http/src/body/body.rs +++ b/actix-http/src/body/body.rs @@ -1,16 +1,17 @@ use std::{ borrow::Cow, + error::Error as StdError, fmt, mem, pin::Pin, task::{Context, Poll}, }; use bytes::{Bytes, BytesMut}; -use futures_core::Stream; +use futures_core::{ready, Stream}; use crate::error::Error; -use super::{BodySize, BodyStream, MessageBody, SizedStream}; +use super::{BodySize, BodyStream, MessageBody, MessageBodyMapErr, SizedStream}; /// Represents various types of HTTP message body. // #[deprecated(since = "4.0.0", note = "Use body types directly.")] @@ -25,7 +26,7 @@ pub enum Body { Bytes(Bytes), /// Generic message body. - Message(Pin>), + Message(BoxAnyBody), } impl Body { @@ -35,12 +36,18 @@ impl Body { } /// Create body from generic message body. - pub fn from_message(body: B) -> Body { - Body::Message(Box::pin(body)) + pub fn from_message(body: B) -> Body + where + B: MessageBody + 'static, + B::Error: Into>, + { + Self::Message(BoxAnyBody::from_body(body)) } } impl MessageBody for Body { + type Error = Error; + fn size(&self) -> BodySize { match self { Body::None => BodySize::None, @@ -53,7 +60,7 @@ impl MessageBody for Body { fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { match self.get_mut() { Body::None => Poll::Ready(None), Body::Empty => Poll::Ready(None), @@ -65,7 +72,13 @@ impl MessageBody for Body { Poll::Ready(Some(Ok(mem::take(bin)))) } } - Body::Message(body) => body.as_mut().poll_next(cx), + + // TODO: MSRV 1.51: poll_map_err + Body::Message(body) => match ready!(body.as_pin_mut().poll_next(cx)) { + Some(Err(err)) => Poll::Ready(Some(Err(err.into()))), + Some(Ok(val)) => Poll::Ready(Some(Ok(val))), + None => Poll::Ready(None), + }, } } } @@ -166,3 +179,51 @@ where Body::from_message(s) } } + +/// A boxed message body with boxed errors. +pub struct BoxAnyBody(Pin>>>); + +impl BoxAnyBody { + /// Boxes a `MessageBody` and any errors it generates. + pub fn from_body(body: B) -> Self + where + B: MessageBody + 'static, + B::Error: Into>, + { + let body = MessageBodyMapErr::new(body, Into::into); + Self(Box::pin(body)) + } + + /// Returns a mutable pinned reference to the inner message body type. + pub fn as_pin_mut( + &mut self, + ) -> Pin<&mut (dyn MessageBody>)> { + self.0.as_mut() + } +} + +impl fmt::Debug for BoxAnyBody { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("BoxAnyBody(dyn MessageBody)") + } +} + +impl MessageBody for BoxAnyBody { + type Error = Error; + + fn size(&self) -> BodySize { + self.0.size() + } + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + // TODO: MSRV 1.51: poll_map_err + match ready!(self.0.as_mut().poll_next(cx)) { + Some(Err(err)) => Poll::Ready(Some(Err(err.into()))), + Some(Ok(val)) => Poll::Ready(Some(Ok(val))), + None => Poll::Ready(None), + } + } +} diff --git a/actix-http/src/body/body_stream.rs b/actix-http/src/body/body_stream.rs index b81aeb4c1..ebe872022 100644 --- a/actix-http/src/body/body_stream.rs +++ b/actix-http/src/body/body_stream.rs @@ -36,6 +36,8 @@ where S: Stream>, E: Into, { + type Error = Error; + fn size(&self) -> BodySize { BodySize::Stream } @@ -48,7 +50,7 @@ where fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { loop { let stream = self.as_mut().project().stream; diff --git a/actix-http/src/body/message_body.rs b/actix-http/src/body/message_body.rs index 894a5fa98..2d2642ba7 100644 --- a/actix-http/src/body/message_body.rs +++ b/actix-http/src/body/message_body.rs @@ -1,12 +1,15 @@ //! [`MessageBody`] trait and foreign implementations. use std::{ + convert::Infallible, mem, pin::Pin, task::{Context, Poll}, }; use bytes::{Bytes, BytesMut}; +use futures_core::ready; +use pin_project_lite::pin_project; use crate::error::Error; @@ -14,6 +17,8 @@ use super::BodySize; /// An interface for response bodies. pub trait MessageBody { + type Error; + /// Body size hint. fn size(&self) -> BodySize; @@ -21,14 +26,12 @@ pub trait MessageBody { fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>>; - - downcast_get_type_id!(); + ) -> Poll>>; } -downcast!(MessageBody); - impl MessageBody for () { + type Error = Infallible; + fn size(&self) -> BodySize { BodySize::Empty } @@ -36,12 +39,18 @@ impl MessageBody for () { fn poll_next( self: Pin<&mut Self>, _: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { Poll::Ready(None) } } -impl MessageBody for Box { +impl MessageBody for Box +where + B: MessageBody + Unpin, + B::Error: Into, +{ + type Error = B::Error; + fn size(&self) -> BodySize { self.as_ref().size() } @@ -49,12 +58,18 @@ impl MessageBody for Box { fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { Pin::new(self.get_mut().as_mut()).poll_next(cx) } } -impl MessageBody for Pin> { +impl MessageBody for Pin> +where + B: MessageBody, + B::Error: Into, +{ + type Error = B::Error; + fn size(&self) -> BodySize { self.as_ref().size() } @@ -62,12 +77,14 @@ impl MessageBody for Pin> { fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { self.as_mut().poll_next(cx) } } impl MessageBody for Bytes { + type Error = Infallible; + fn size(&self) -> BodySize { BodySize::Sized(self.len() as u64) } @@ -75,7 +92,7 @@ impl MessageBody for Bytes { fn poll_next( self: Pin<&mut Self>, _: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { @@ -85,6 +102,8 @@ impl MessageBody for Bytes { } impl MessageBody for BytesMut { + type Error = Infallible; + fn size(&self) -> BodySize { BodySize::Sized(self.len() as u64) } @@ -92,7 +111,7 @@ impl MessageBody for BytesMut { fn poll_next( self: Pin<&mut Self>, _: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { @@ -102,6 +121,8 @@ impl MessageBody for BytesMut { } impl MessageBody for &'static str { + type Error = Infallible; + fn size(&self) -> BodySize { BodySize::Sized(self.len() as u64) } @@ -109,7 +130,7 @@ impl MessageBody for &'static str { fn poll_next( self: Pin<&mut Self>, _: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { @@ -121,6 +142,8 @@ impl MessageBody for &'static str { } impl MessageBody for Vec { + type Error = Infallible; + fn size(&self) -> BodySize { BodySize::Sized(self.len() as u64) } @@ -128,7 +151,7 @@ impl MessageBody for Vec { fn poll_next( self: Pin<&mut Self>, _: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { @@ -138,6 +161,8 @@ impl MessageBody for Vec { } impl MessageBody for String { + type Error = Infallible; + fn size(&self) -> BodySize { BodySize::Sized(self.len() as u64) } @@ -145,7 +170,7 @@ impl MessageBody for String { fn poll_next( self: Pin<&mut Self>, _: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { if self.is_empty() { Poll::Ready(None) } else { @@ -155,3 +180,53 @@ impl MessageBody for String { } } } + +pin_project! { + pub(crate) struct MessageBodyMapErr { + #[pin] + body: B, + mapper: Option, + } +} + +impl MessageBodyMapErr +where + B: MessageBody, + F: FnOnce(B::Error) -> E, +{ + pub(crate) fn new(body: B, mapper: F) -> Self { + Self { + body, + mapper: Some(mapper), + } + } +} + +impl MessageBody for MessageBodyMapErr +where + B: MessageBody, + F: FnOnce(B::Error) -> E, +{ + type Error = E; + + fn size(&self) -> BodySize { + self.body.size() + } + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let this = self.as_mut().project(); + + match ready!(this.body.poll_next(cx)) { + Some(Err(err)) => { + let f = self.as_mut().project().mapper.take().unwrap(); + let mapped_err = (f)(err); + Poll::Ready(Some(Err(mapped_err))) + } + Some(Ok(val)) => Poll::Ready(Some(Ok(val))), + None => Poll::Ready(None), + } + } +} diff --git a/actix-http/src/body/mod.rs b/actix-http/src/body/mod.rs index f26d6a8cf..cdfcd226b 100644 --- a/actix-http/src/body/mod.rs +++ b/actix-http/src/body/mod.rs @@ -15,9 +15,10 @@ mod response_body; mod size; mod sized_stream; -pub use self::body::Body; +pub use self::body::{Body, BoxAnyBody}; pub use self::body_stream::BodyStream; pub use self::message_body::MessageBody; +pub(crate) use self::message_body::MessageBodyMapErr; pub use self::response_body::ResponseBody; pub use self::size::BodySize; pub use self::sized_stream::SizedStream; @@ -41,7 +42,7 @@ pub use self::sized_stream::SizedStream; /// assert_eq!(bytes, b"123"[..]); /// # } /// ``` -pub async fn to_bytes(body: impl MessageBody) -> Result { +pub async fn to_bytes(body: B) -> Result { let cap = match body.size() { BodySize::None | BodySize::Empty | BodySize::Sized(0) => return Ok(Bytes::new()), BodySize::Sized(size) => size as usize, @@ -85,15 +86,6 @@ mod tests { } } - impl ResponseBody { - pub(crate) fn get_ref(&self) -> &[u8] { - match *self { - ResponseBody::Body(ref b) => b.get_ref(), - ResponseBody::Other(ref b) => b.get_ref(), - } - } - } - #[actix_rt::test] async fn test_static_str() { assert_eq!(Body::from("").size(), BodySize::Sized(0)); @@ -237,10 +229,13 @@ mod tests { ); } + // down-casting used to be done with a method on MessageBody trait + // test is kept to demonstrate equivalence of Any trait #[actix_rt::test] async fn test_body_casting() { let mut body = String::from("hello cast"); - let resp_body: &mut dyn MessageBody = &mut body; + // let mut resp_body: &mut dyn MessageBody = &mut body; + let resp_body: &mut dyn std::any::Any = &mut body; let body = resp_body.downcast_ref::().unwrap(); assert_eq!(body, "hello cast"); let body = &mut resp_body.downcast_mut::().unwrap(); diff --git a/actix-http/src/body/response_body.rs b/actix-http/src/body/response_body.rs index b27112475..855c742f2 100644 --- a/actix-http/src/body/response_body.rs +++ b/actix-http/src/body/response_body.rs @@ -5,7 +5,7 @@ use std::{ }; use bytes::Bytes; -use futures_core::Stream; +use futures_core::{ready, Stream}; use pin_project::pin_project; use crate::error::Error; @@ -43,7 +43,13 @@ impl ResponseBody { } } -impl MessageBody for ResponseBody { +impl MessageBody for ResponseBody +where + B: MessageBody, + B::Error: Into, +{ + type Error = Error; + fn size(&self) -> BodySize { match self { ResponseBody::Body(ref body) => body.size(), @@ -54,12 +60,16 @@ impl MessageBody for ResponseBody { fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { Stream::poll_next(self, cx) } } -impl Stream for ResponseBody { +impl Stream for ResponseBody +where + B: MessageBody, + B::Error: Into, +{ type Item = Result; fn poll_next( @@ -67,7 +77,12 @@ impl Stream for ResponseBody { cx: &mut Context<'_>, ) -> Poll> { match self.project() { - ResponseBodyProj::Body(body) => body.poll_next(cx), + // TODO: MSRV 1.51: poll_map_err + ResponseBodyProj::Body(body) => match ready!(body.poll_next(cx)) { + Some(Err(err)) => Poll::Ready(Some(Err(err.into()))), + Some(Ok(val)) => Poll::Ready(Some(Ok(val))), + None => Poll::Ready(None), + }, ResponseBodyProj::Other(body) => Pin::new(body).poll_next(cx), } } diff --git a/actix-http/src/body/sized_stream.rs b/actix-http/src/body/sized_stream.rs index f0332fc8f..4af132389 100644 --- a/actix-http/src/body/sized_stream.rs +++ b/actix-http/src/body/sized_stream.rs @@ -36,6 +36,8 @@ impl MessageBody for SizedStream where S: Stream>, { + type Error = Error; + fn size(&self) -> BodySize { BodySize::Sized(self.size as u64) } @@ -48,7 +50,7 @@ where fn poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { loop { let stream = self.as_mut().project().stream; diff --git a/actix-http/src/builder.rs b/actix-http/src/builder.rs index 623bfdda2..660cd9817 100644 --- a/actix-http/src/builder.rs +++ b/actix-http/src/builder.rs @@ -202,11 +202,13 @@ where /// Finish service configuration and create a HTTP service for HTTP/2 protocol. pub fn h2(self, service: F) -> H2Service where - B: MessageBody + 'static, F: IntoServiceFactory, S::Error: Into + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, + + B: MessageBody + 'static, + B::Error: Into, { let cfg = ServiceConfig::new( self.keep_alive, @@ -223,11 +225,13 @@ where /// Finish service configuration and create `HttpService` instance. pub fn finish(self, service: F) -> HttpService where - B: MessageBody + 'static, F: IntoServiceFactory, S::Error: Into + 'static, S::InitError: fmt::Debug, S::Response: Into> + 'static, + + B: MessageBody + 'static, + B::Error: Into, { let cfg = ServiceConfig::new( self.keep_alive, diff --git a/actix-http/src/client/connection.rs b/actix-http/src/client/connection.rs index 0e3e97f3f..a30f651ca 100644 --- a/actix-http/src/client/connection.rs +++ b/actix-http/src/client/connection.rs @@ -12,10 +12,10 @@ use bytes::Bytes; use futures_core::future::LocalBoxFuture; use h2::client::SendRequest; -use crate::body::MessageBody; use crate::h1::ClientCodec; use crate::message::{RequestHeadType, ResponseHead}; use crate::payload::Payload; +use crate::{body::MessageBody, Error}; use super::error::SendRequestError; use super::pool::Acquired; @@ -256,8 +256,9 @@ where body: RB, ) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>> where - RB: MessageBody + 'static, H: Into + 'static, + RB: MessageBody + 'static, + RB::Error: Into, { Box::pin(async move { match self { diff --git a/actix-http/src/client/h1proto.rs b/actix-http/src/client/h1proto.rs index fa4469d35..65a30748c 100644 --- a/actix-http/src/client/h1proto.rs +++ b/actix-http/src/client/h1proto.rs @@ -11,7 +11,6 @@ use bytes::{Bytes, BytesMut}; use futures_core::{ready, Stream}; use futures_util::SinkExt as _; -use crate::error::PayloadError; use crate::h1; use crate::http::{ header::{HeaderMap, IntoHeaderValue, EXPECT, HOST}, @@ -19,6 +18,7 @@ use crate::http::{ }; use crate::message::{RequestHeadType, ResponseHead}; use crate::payload::Payload; +use crate::{error::PayloadError, Error}; use super::connection::{ConnectionIo, H1Connection}; use super::error::{ConnectError, SendRequestError}; @@ -32,6 +32,7 @@ pub(crate) async fn send_request( where Io: ConnectionIo, B: MessageBody, + B::Error: Into, { // set request host header if !head.as_ref().headers.contains_key(HOST) @@ -154,6 +155,7 @@ pub(crate) async fn send_body( where Io: ConnectionIo, B: MessageBody, + B::Error: Into, { actix_rt::pin!(body); @@ -161,9 +163,10 @@ where while !eof { while !eof && !framed.as_ref().is_write_buf_full() { match poll_fn(|cx| body.as_mut().poll_next(cx)).await { - Some(result) => { - framed.as_mut().write(h1::Message::Chunk(Some(result?)))?; + Some(Ok(chunk)) => { + framed.as_mut().write(h1::Message::Chunk(Some(chunk)))?; } + Some(Err(err)) => return Err(err.into().into()), None => { eof = true; framed.as_mut().write(h1::Message::Chunk(None))?; diff --git a/actix-http/src/client/h2proto.rs b/actix-http/src/client/h2proto.rs index 8cb2e2522..cf423ef12 100644 --- a/actix-http/src/client/h2proto.rs +++ b/actix-http/src/client/h2proto.rs @@ -9,14 +9,19 @@ use h2::{ use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, TRANSFER_ENCODING}; use http::{request::Request, Method, Version}; -use crate::body::{BodySize, MessageBody}; -use crate::header::HeaderMap; -use crate::message::{RequestHeadType, ResponseHead}; -use crate::payload::Payload; +use crate::{ + body::{BodySize, MessageBody}, + header::HeaderMap, + message::{RequestHeadType, ResponseHead}, + payload::Payload, + Error, +}; -use super::config::ConnectorConfig; -use super::connection::{ConnectionIo, H2Connection}; -use super::error::SendRequestError; +use super::{ + config::ConnectorConfig, + connection::{ConnectionIo, H2Connection}, + error::SendRequestError, +}; pub(crate) async fn send_request( mut io: H2Connection, @@ -26,6 +31,7 @@ pub(crate) async fn send_request( where Io: ConnectionIo, B: MessageBody, + B::Error: Into, { trace!("Sending client request: {:?} {:?}", head, body.size()); @@ -125,10 +131,14 @@ where Ok((head, payload)) } -async fn send_body( +async fn send_body( body: B, mut send: SendStream, -) -> Result<(), SendRequestError> { +) -> Result<(), SendRequestError> +where + B: MessageBody, + B::Error: Into, +{ let mut buf = None; actix_rt::pin!(body); loop { @@ -138,7 +148,7 @@ async fn send_body( send.reserve_capacity(b.len()); buf = Some(b); } - Some(Err(e)) => return Err(e.into()), + Some(Err(e)) => return Err(e.into().into()), None => { if let Err(e) = send.send_data(Bytes::new(), true) { return Err(e.into()); diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index add6ee980..b8bc8b68d 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -1,6 +1,7 @@ //! Stream encoders. use std::{ + error::Error as StdError, future::Future, io::{self, Write as _}, pin::Pin, @@ -10,12 +11,13 @@ use std::{ use actix_rt::task::{spawn_blocking, JoinHandle}; use brotli2::write::BrotliEncoder; use bytes::Bytes; +use derive_more::Display; use flate2::write::{GzEncoder, ZlibEncoder}; use futures_core::ready; use pin_project::pin_project; use crate::{ - body::{Body, BodySize, MessageBody, ResponseBody}, + body::{Body, BodySize, BoxAnyBody, MessageBody, ResponseBody}, http::{ header::{ContentEncoding, CONTENT_ENCODING}, HeaderValue, StatusCode, @@ -92,10 +94,16 @@ impl Encoder { enum EncoderBody { Bytes(Bytes), Stream(#[pin] B), - BoxedStream(Pin>), + BoxedStream(BoxAnyBody), } -impl MessageBody for EncoderBody { +impl MessageBody for EncoderBody +where + B: MessageBody, + B::Error: Into, +{ + type Error = EncoderError; + fn size(&self) -> BodySize { match self { EncoderBody::Bytes(ref b) => b.size(), @@ -107,7 +115,7 @@ impl MessageBody for EncoderBody { fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { match self.project() { EncoderBodyProj::Bytes(b) => { if b.is_empty() { @@ -116,13 +124,32 @@ impl MessageBody for EncoderBody { Poll::Ready(Some(Ok(std::mem::take(b)))) } } - EncoderBodyProj::Stream(b) => b.poll_next(cx), - EncoderBodyProj::BoxedStream(ref mut b) => b.as_mut().poll_next(cx), + // TODO: MSRV 1.51: poll_map_err + EncoderBodyProj::Stream(b) => match ready!(b.poll_next(cx)) { + Some(Err(err)) => Poll::Ready(Some(Err(EncoderError::Body(err)))), + Some(Ok(val)) => Poll::Ready(Some(Ok(val))), + None => Poll::Ready(None), + }, + EncoderBodyProj::BoxedStream(ref mut b) => { + match ready!(b.as_pin_mut().poll_next(cx)) { + Some(Err(err)) => { + Poll::Ready(Some(Err(EncoderError::Boxed(err.into())))) + } + Some(Ok(val)) => Poll::Ready(Some(Ok(val))), + None => Poll::Ready(None), + } + } } } } -impl MessageBody for Encoder { +impl MessageBody for Encoder +where + B: MessageBody, + B::Error: Into, +{ + type Error = EncoderError; + fn size(&self) -> BodySize { if self.encoder.is_none() { self.body.size() @@ -134,7 +161,7 @@ impl MessageBody for Encoder { fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { let mut this = self.project(); loop { if *this.eof { @@ -142,8 +169,9 @@ impl MessageBody for Encoder { } if let Some(ref mut fut) = this.fut { - let mut encoder = - ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??; + let mut encoder = ready!(Pin::new(fut).poll(cx)) + .map_err(|_| EncoderError::Blocking(BlockingError))? + .map_err(EncoderError::Io)?; let chunk = encoder.take(); *this.encoder = Some(encoder); @@ -162,7 +190,7 @@ impl MessageBody for Encoder { Some(Ok(chunk)) => { if let Some(mut encoder) = this.encoder.take() { if chunk.len() < MAX_CHUNK_SIZE_ENCODE_IN_PLACE { - encoder.write(&chunk)?; + encoder.write(&chunk).map_err(EncoderError::Io)?; let chunk = encoder.take(); *this.encoder = Some(encoder); @@ -182,7 +210,7 @@ impl MessageBody for Encoder { None => { if let Some(encoder) = this.encoder.take() { - let chunk = encoder.finish()?; + let chunk = encoder.finish().map_err(EncoderError::Io)?; if chunk.is_empty() { return Poll::Ready(None); } else { @@ -281,3 +309,36 @@ impl ContentEncoder { } } } + +#[derive(Debug, Display)] +#[non_exhaustive] +pub enum EncoderError { + #[display(fmt = "body")] + Body(E), + + #[display(fmt = "boxed")] + Boxed(Error), + + #[display(fmt = "blocking")] + Blocking(BlockingError), + + #[display(fmt = "io")] + Io(io::Error), +} + +impl StdError for EncoderError { + fn source(&self) -> Option<&(dyn StdError + 'static)> { + None + } +} + +impl> From> for Error { + fn from(err: EncoderError) -> Self { + match err { + EncoderError::Body(err) => err.into(), + EncoderError::Boxed(err) => err, + EncoderError::Blocking(err) => err.into(), + EncoderError::Io(err) => err.into(), + } + } +} diff --git a/actix-http/src/error.rs b/actix-http/src/error.rs index 002331e47..88a7da26e 100644 --- a/actix-http/src/error.rs +++ b/actix-http/src/error.rs @@ -2,6 +2,7 @@ use std::{ cell::RefCell, + error::Error as StdError, fmt, io::{self, Write as _}, str::Utf8Error, @@ -17,12 +18,6 @@ use crate::{body::Body, helpers::Writer, Response, ResponseBuilder}; pub use http::Error as HttpError; -/// A specialized [`std::result::Result`] for Actix Web operations. -/// -/// This typedef is generally used to avoid writing out `actix_http::error::Error` directly and is -/// otherwise a direct mapping to `Result`. -pub type Result = std::result::Result; - /// General purpose actix web error. /// /// An actix web error is used to carry errors from `std::error` @@ -105,8 +100,7 @@ impl From<()> for Error { impl From for Error { fn from(_: std::convert::Infallible) -> Self { - // `std::convert::Infallible` indicates an error - // that will never happen + // hint that an error that will never happen unreachable!() } } @@ -149,6 +143,8 @@ impl From for Error { #[display(fmt = "Unknown Error")] struct UnitError; +impl ResponseError for Box {} + /// Returns [`StatusCode::INTERNAL_SERVER_ERROR`] for [`UnitError`]. impl ResponseError for UnitError {} @@ -472,9 +468,8 @@ impl ResponseError for ContentTypeError { /// /// ``` /// # use std::io; -/// # use actix_http::*; -/// -/// fn index(req: Request) -> Result<&'static str> { +/// # use actix_http::{error, Request}; +/// fn index(req: Request) -> Result<&'static str, actix_http::Error> { /// Err(error::ErrorBadRequest(io::Error::new(io::ErrorKind::Other, "error"))) /// } /// ``` diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 3b272f0fb..574f0b2a9 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -17,7 +17,7 @@ use futures_core::ready; use log::{error, trace}; use pin_project::pin_project; -use crate::body::{Body, BodySize, MessageBody, ResponseBody}; +use crate::body::{Body, BodySize, MessageBody}; use crate::config::ServiceConfig; use crate::error::{DispatchError, Error}; use crate::error::{ParseError, PayloadError}; @@ -51,9 +51,13 @@ pub struct Dispatcher where S: Service, S::Error: Into, + B: MessageBody, + B::Error: Into, + X: Service, X::Error: Into, + U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { @@ -69,9 +73,13 @@ enum DispatcherState where S: Service, S::Error: Into, + B: MessageBody, + B::Error: Into, + X: Service, X::Error: Into, + U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { @@ -84,9 +92,13 @@ struct InnerDispatcher where S: Service, S::Error: Into, + B: MessageBody, + B::Error: Into, + X: Service, X::Error: Into, + U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { @@ -122,19 +134,25 @@ enum State where S: Service, X: Service, + B: MessageBody, + B::Error: Into, { None, ExpectCall(#[pin] X::Future), ServiceCall(#[pin] S::Future), - SendPayload(#[pin] ResponseBody), + SendPayload(#[pin] B), + SendErrorPayload(#[pin] Body), } impl State where S: Service, + X: Service, + B: MessageBody, + B::Error: Into, { fn is_empty(&self) -> bool { matches!(self, State::None) @@ -150,12 +168,17 @@ enum PollResponse { impl Dispatcher where T: AsyncRead + AsyncWrite + Unpin, + S: Service, S::Error: Into, S::Response: Into>, + B: MessageBody, + B::Error: Into, + X: Service, X::Error: Into, + U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { @@ -206,12 +229,17 @@ where impl InnerDispatcher where T: AsyncRead + AsyncWrite + Unpin, + S: Service, S::Error: Into, S::Response: Into>, + B: MessageBody, + B::Error: Into, + X: Service, X::Error: Into, + U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { @@ -268,11 +296,11 @@ where io.poll_flush(cx) } - fn send_response( + fn send_response_inner( self: Pin<&mut Self>, message: Response<()>, - body: ResponseBody, - ) -> Result<(), DispatchError> { + body: &impl MessageBody, + ) -> Result { let size = body.size(); let mut this = self.project(); this.codec @@ -285,10 +313,35 @@ where })?; this.flags.set(Flags::KEEPALIVE, this.codec.keepalive()); - match size { - BodySize::None | BodySize::Empty => this.state.set(State::None), - _ => this.state.set(State::SendPayload(body)), + + Ok(size) + } + + fn send_response( + mut self: Pin<&mut Self>, + message: Response<()>, + body: B, + ) -> Result<(), DispatchError> { + let size = self.as_mut().send_response_inner(message, &body)?; + let state = match size { + BodySize::None | BodySize::Empty => State::None, + _ => State::SendPayload(body), }; + self.project().state.set(state); + Ok(()) + } + + fn send_error_response( + mut self: Pin<&mut Self>, + message: Response<()>, + body: Body, + ) -> Result<(), DispatchError> { + let size = self.as_mut().send_response_inner(message, &body)?; + let state = match size { + BodySize::None | BodySize::Empty => State::None, + _ => State::SendErrorPayload(body), + }; + self.project().state.set(state); Ok(()) } @@ -326,8 +379,7 @@ where // send_response would update InnerDispatcher state to SendPayload or // None(If response body is empty). // continue loop to poll it. - self.as_mut() - .send_response(res, ResponseBody::Other(Body::Empty))?; + self.as_mut().send_error_response(res, Body::Empty)?; } // return with upgrade request and poll it exclusively. @@ -349,7 +401,7 @@ where Poll::Ready(Err(err)) => { let res = Response::from_error(err.into()); let (res, body) = res.replace_body(()); - self.as_mut().send_response(res, body.into_body())?; + self.as_mut().send_error_response(res, body)?; } // service call pending and could be waiting for more chunk messages. @@ -365,6 +417,41 @@ where }, StateProj::SendPayload(mut stream) => { + // keep populate writer buffer until buffer size limit hit, + // get blocked or finished. + while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE { + match stream.as_mut().poll_next(cx) { + Poll::Ready(Some(Ok(item))) => { + this.codec.encode( + Message::Chunk(Some(item)), + &mut this.write_buf, + )?; + } + + Poll::Ready(None) => { + this.codec + .encode(Message::Chunk(None), &mut this.write_buf)?; + // payload stream finished. + // set state to None and handle next message + this.state.set(State::None); + continue 'res; + } + + Poll::Ready(Some(Err(err))) => { + return Err(DispatchError::Service(err.into())) + } + + Poll::Pending => return Ok(PollResponse::DoNothing), + } + } + // buffer is beyond max size. + // return and try to write the whole buffer to io stream. + return Ok(PollResponse::DrainWriteBuf); + } + + StateProj::SendErrorPayload(mut stream) => { + // TODO: de-dupe impl with SendPayload + // keep populate writer buffer until buffer size limit hit, // get blocked or finished. while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE { @@ -406,12 +493,14 @@ where let fut = this.flow.service.call(req); this.state.set(State::ServiceCall(fut)); } + // send expect error as response Poll::Ready(Err(err)) => { let res = Response::from_error(err.into()); let (res, body) = res.replace_body(()); - self.as_mut().send_response(res, body.into_body())?; + self.as_mut().send_error_response(res, body)?; } + // expect must be solved before progress can be made. Poll::Pending => return Ok(PollResponse::DoNothing), }, @@ -459,7 +548,7 @@ where Poll::Ready(Err(err)) => { let res = Response::from_error(err.into()); let (res, body) = res.replace_body(()); - return self.send_response(res, body.into_body()); + return self.send_error_response(res, body); } } } @@ -479,7 +568,7 @@ where Poll::Ready(Err(err)) => { let res = Response::from_error(err.into()); let (res, body) = res.replace_body(()); - self.send_response(res, body.into_body()) + self.send_error_response(res, body) } }; } @@ -599,8 +688,10 @@ where } // Requests overflow buffer size should be responded with 431 this.messages.push_back(DispatcherMessage::Error( - Response::new(StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE) - .drop_body(), + Response::with_body( + StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE, + (), + ), )); this.flags.insert(Flags::READ_DISCONNECT); *this.error = Some(ParseError::TooLarge.into()); @@ -679,10 +770,9 @@ where } else { // timeout on first request (slow request) return 408 trace!("Slow request timeout"); - let _ = self.as_mut().send_response( - Response::new(StatusCode::REQUEST_TIMEOUT) - .drop_body(), - ResponseBody::Other(Body::Empty), + let _ = self.as_mut().send_error_response( + Response::with_body(StatusCode::REQUEST_TIMEOUT, ()), + Body::Empty, ); this = self.project(); this.flags.insert(Flags::STARTED | Flags::SHUTDOWN); @@ -817,12 +907,17 @@ where impl Future for Dispatcher where T: AsyncRead + AsyncWrite + Unpin, + S: Service, S::Error: Into, S::Response: Into>, + B: MessageBody, + B::Error: Into, + X: Service, X::Error: Into, + U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { diff --git a/actix-http/src/h1/encoder.rs b/actix-http/src/h1/encoder.rs index 4e9903284..eaabcb687 100644 --- a/actix-http/src/h1/encoder.rs +++ b/actix-http/src/h1/encoder.rs @@ -630,8 +630,7 @@ mod tests { async fn test_no_content_length() { let mut bytes = BytesMut::with_capacity(2048); - let mut res: Response<()> = - Response::new(StatusCode::SWITCHING_PROTOCOLS).into_body::<()>(); + let mut res = Response::with_body(StatusCode::SWITCHING_PROTOCOLS, ()); res.headers_mut().insert(DATE, HeaderValue::from_static("")); res.headers_mut() .insert(CONTENT_LENGTH, HeaderValue::from_static("0")); diff --git a/actix-http/src/h1/service.rs b/actix-http/src/h1/service.rs index 916643a18..1ab85cbf3 100644 --- a/actix-http/src/h1/service.rs +++ b/actix-http/src/h1/service.rs @@ -64,11 +64,15 @@ where S::Error: Into, S::InitError: fmt::Debug, S::Response: Into>, + B: MessageBody, + B::Error: Into, + X: ServiceFactory, X::Future: 'static, X::Error: Into, X::InitError: fmt::Debug, + U: ServiceFactory<(Request, Framed), Config = (), Response = ()>, U::Future: 'static, U::Error: fmt::Display + Into, @@ -109,11 +113,15 @@ mod openssl { S::Error: Into, S::InitError: fmt::Debug, S::Response: Into>, + B: MessageBody, + B::Error: Into, + X: ServiceFactory, X::Future: 'static, X::Error: Into, X::InitError: fmt::Debug, + U: ServiceFactory< (Request, Framed, Codec>), Config = (), @@ -165,11 +173,15 @@ mod rustls { S::Error: Into, S::InitError: fmt::Debug, S::Response: Into>, + B: MessageBody, + B::Error: Into, + X: ServiceFactory, X::Future: 'static, X::Error: Into, X::InitError: fmt::Debug, + U: ServiceFactory< (Request, Framed, Codec>), Config = (), @@ -253,16 +265,21 @@ impl ServiceFactory<(T, Option)> for H1Service where T: AsyncRead + AsyncWrite + Unpin + 'static, + S: ServiceFactory, S::Future: 'static, S::Error: Into, S::Response: Into>, S::InitError: fmt::Debug, + B: MessageBody, + B::Error: Into, + X: ServiceFactory, X::Future: 'static, X::Error: Into, X::InitError: fmt::Debug, + U: ServiceFactory<(Request, Framed), Config = (), Response = ()>, U::Future: 'static, U::Error: fmt::Display + Into, @@ -319,12 +336,17 @@ impl Service<(T, Option)> for HttpServiceHandler where T: AsyncRead + AsyncWrite + Unpin, + S: Service, S::Error: Into, S::Response: Into>, + B: MessageBody, + B::Error: Into, + X: Service, X::Error: Into, + U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display + Into, { diff --git a/actix-http/src/h1/utils.rs b/actix-http/src/h1/utils.rs index 9e9c57137..90e44daa4 100644 --- a/actix-http/src/h1/utils.rs +++ b/actix-http/src/h1/utils.rs @@ -4,7 +4,7 @@ use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite, Framed}; -use crate::body::{BodySize, MessageBody, ResponseBody}; +use crate::body::{BodySize, MessageBody}; use crate::error::Error; use crate::h1::{Codec, Message}; use crate::response::Response; @@ -14,7 +14,7 @@ use crate::response::Response; pub struct SendResponse { res: Option, BodySize)>>, #[pin] - body: Option>, + body: Option, #[pin] framed: Option>, } @@ -22,6 +22,7 @@ pub struct SendResponse { impl SendResponse where B: MessageBody, + B::Error: Into, { pub fn new(framed: Framed, response: Response) -> Self { let (res, body) = response.into_parts(); @@ -38,6 +39,7 @@ impl Future for SendResponse where T: AsyncRead + AsyncWrite + Unpin, B: MessageBody + Unpin, + B::Error: Into, { type Output = Result, Error>; @@ -60,7 +62,18 @@ where .unwrap() .is_write_buf_full() { - match this.body.as_mut().as_pin_mut().unwrap().poll_next(cx)? { + let next = + // TODO: MSRV 1.51: poll_map_err + match this.body.as_mut().as_pin_mut().unwrap().poll_next(cx) { + Poll::Ready(Some(Ok(item))) => Poll::Ready(Some(item)), + Poll::Ready(Some(Err(err))) => { + return Poll::Ready(Err(err.into())) + } + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + }; + + match next { Poll::Ready(item) => { // body is done when item is None body_done = item.is_none(); diff --git a/actix-http/src/h2/dispatcher.rs b/actix-http/src/h2/dispatcher.rs index 87dd66fe7..5be172aaf 100644 --- a/actix-http/src/h2/dispatcher.rs +++ b/actix-http/src/h2/dispatcher.rs @@ -12,7 +12,7 @@ use h2::{ use http::header::{HeaderValue, CONNECTION, CONTENT_LENGTH, DATE, TRANSFER_ENCODING}; use log::{error, trace}; -use crate::body::{BodySize, MessageBody, ResponseBody}; +use crate::body::{Body, BodySize, MessageBody}; use crate::config::ServiceConfig; use crate::error::{DispatchError, Error}; use crate::message::ResponseHead; @@ -69,11 +69,14 @@ where impl Future for Dispatcher where T: AsyncRead + AsyncWrite + Unpin, + S: Service, S::Error: Into + 'static, S::Future: 'static, S::Response: Into> + 'static, + B: MessageBody + 'static, + B::Error: Into, { type Output = Result<(), DispatchError>; @@ -132,7 +135,8 @@ struct ServiceResponse { #[pin_project::pin_project(project = ServiceResponseStateProj)] enum ServiceResponseState { ServiceCall(#[pin] F, Option>), - SendPayload(SendStream, #[pin] ResponseBody), + SendPayload(SendStream, #[pin] B), + SendErrorPayload(SendStream, #[pin] Body), } impl ServiceResponse @@ -140,7 +144,9 @@ where F: Future>, E: Into, I: Into>, + B: MessageBody, + B::Error: Into, { fn prepare_response( &self, @@ -216,7 +222,9 @@ where F: Future>, E: Into, I: Into>, + B: MessageBody, + B::Error: Into, { type Output = (); @@ -273,9 +281,8 @@ where if size.is_eof() { Poll::Ready(()) } else { - this.state.set(ServiceResponseState::SendPayload( - stream, - body.into_body(), + this.state.set(ServiceResponseState::SendErrorPayload( + stream, body, )); self.poll(cx) } @@ -324,8 +331,65 @@ where *this.buffer = Some(chunk); } + Some(Err(err)) => { + error!( + "Response payload stream error: {:?}", + err.into() + ); + + return Poll::Ready(()); + } + }, + } + } + } + + ServiceResponseStateProj::SendErrorPayload(ref mut stream, ref mut body) => { + // TODO: de-dupe impl with SendPayload + + loop { + match this.buffer { + Some(ref mut buffer) => match ready!(stream.poll_capacity(cx)) { + None => return Poll::Ready(()), + + Some(Ok(cap)) => { + let len = buffer.len(); + let bytes = buffer.split_to(cmp::min(cap, len)); + + if let Err(e) = stream.send_data(bytes, false) { + warn!("{:?}", e); + return Poll::Ready(()); + } else if !buffer.is_empty() { + let cap = cmp::min(buffer.len(), CHUNK_SIZE); + stream.reserve_capacity(cap); + } else { + this.buffer.take(); + } + } + Some(Err(e)) => { - error!("Response payload stream error: {:?}", e); + warn!("{:?}", e); + return Poll::Ready(()); + } + }, + + None => match ready!(body.as_mut().poll_next(cx)) { + None => { + if let Err(e) = stream.send_data(Bytes::new(), true) { + warn!("{:?}", e); + } + return Poll::Ready(()); + } + + Some(Ok(chunk)) => { + stream + .reserve_capacity(cmp::min(chunk.len(), CHUNK_SIZE)); + *this.buffer = Some(chunk); + } + + Some(Err(err)) => { + error!("Response payload stream error: {:?}", err); + return Poll::Ready(()); } }, diff --git a/actix-http/src/h2/service.rs b/actix-http/src/h2/service.rs index 1a0b8c7f5..a75abef7d 100644 --- a/actix-http/src/h2/service.rs +++ b/actix-http/src/h2/service.rs @@ -40,7 +40,9 @@ where S::Error: Into + 'static, S::Response: Into> + 'static, >::Future: 'static, + B: MessageBody + 'static, + B::Error: Into, { /// Create new `H2Service` instance with config. pub(crate) fn with_config>( @@ -69,7 +71,9 @@ where S::Error: Into + 'static, S::Response: Into> + 'static, >::Future: 'static, + B: MessageBody + 'static, + B::Error: Into, { /// Create plain TCP based service pub fn tcp( @@ -106,7 +110,9 @@ mod openssl { S::Error: Into + 'static, S::Response: Into> + 'static, >::Future: 'static, + B: MessageBody + 'static, + B::Error: Into, { /// Create OpenSSL based service pub fn openssl( @@ -150,7 +156,9 @@ mod rustls { S::Error: Into + 'static, S::Response: Into> + 'static, >::Future: 'static, + B: MessageBody + 'static, + B::Error: Into, { /// Create Rustls based service pub fn rustls( @@ -185,12 +193,15 @@ mod rustls { impl ServiceFactory<(T, Option)> for H2Service where T: AsyncRead + AsyncWrite + Unpin + 'static, + S: ServiceFactory, S::Future: 'static, S::Error: Into + 'static, S::Response: Into> + 'static, >::Future: 'static, + B: MessageBody + 'static, + B::Error: Into, { type Response = (); type Error = DispatchError; @@ -252,6 +263,7 @@ where S::Future: 'static, S::Response: Into> + 'static, B: MessageBody + 'static, + B::Error: Into, { type Response = (); type Error = DispatchError; @@ -316,6 +328,7 @@ where S::Future: 'static, S::Response: Into> + 'static, B: MessageBody, + B::Error: Into, { type Output = Result<(), DispatchError>; diff --git a/actix-http/src/header/shared/charset.rs b/actix-http/src/header/shared/charset.rs index 36bdbf7e2..b482f6bce 100644 --- a/actix-http/src/header/shared/charset.rs +++ b/actix-http/src/header/shared/charset.rs @@ -104,7 +104,7 @@ impl Display for Charset { impl FromStr for Charset { type Err = crate::Error; - fn from_str(s: &str) -> crate::Result { + fn from_str(s: &str) -> Result { Ok(match s.to_ascii_uppercase().as_ref() { "US-ASCII" => Us_Ascii, "ISO-8859-1" => Iso_8859_1, diff --git a/actix-http/src/lib.rs b/actix-http/src/lib.rs index 82d0415c2..7c2c3b4e3 100644 --- a/actix-http/src/lib.rs +++ b/actix-http/src/lib.rs @@ -54,7 +54,7 @@ pub mod ws; pub use self::builder::HttpServiceBuilder; pub use self::config::{KeepAlive, ServiceConfig}; -pub use self::error::{Error, ResponseError, Result}; +pub use self::error::{Error, ResponseError}; pub use self::extensions::Extensions; pub use self::header::ContentEncoding; pub use self::http_message::HttpMessage; diff --git a/actix-http/src/macros.rs b/actix-http/src/macros.rs index 7cf0e288b..be8e63d6e 100644 --- a/actix-http/src/macros.rs +++ b/actix-http/src/macros.rs @@ -15,8 +15,15 @@ macro_rules! downcast_get_type_id { /// making it impossible for safe code to construct outside of /// this module. This ensures that safe code cannot violate /// type-safety by implementing this method. + /// + /// We also take `PrivateHelper` as a parameter, to ensure that + /// safe code cannot obtain a `PrivateHelper` instance by + /// delegating to an existing implementation of `__private_get_type_id__` #[doc(hidden)] - fn __private_get_type_id__(&self) -> (std::any::TypeId, PrivateHelper) + fn __private_get_type_id__( + &self, + _: PrivateHelper, + ) -> (std::any::TypeId, PrivateHelper) where Self: 'static, { @@ -39,7 +46,9 @@ macro_rules! downcast { impl dyn $name + 'static { /// Downcasts generic body to a specific type. pub fn downcast_ref(&self) -> Option<&T> { - if self.__private_get_type_id__().0 == std::any::TypeId::of::() { + if self.__private_get_type_id__(PrivateHelper(())).0 + == std::any::TypeId::of::() + { // SAFETY: external crates cannot override the default // implementation of `__private_get_type_id__`, since // it requires returning a private type. We can therefore @@ -53,7 +62,9 @@ macro_rules! downcast { /// Downcasts a generic body to a mutable specific type. pub fn downcast_mut(&mut self) -> Option<&mut T> { - if self.__private_get_type_id__().0 == std::any::TypeId::of::() { + if self.__private_get_type_id__(PrivateHelper(())).0 + == std::any::TypeId::of::() + { // SAFETY: external crates cannot override the default // implementation of `__private_get_type_id__`, since // it requires returning a private type. We can therefore diff --git a/actix-http/src/message.rs b/actix-http/src/message.rs index 8cb99d43a..0a3f3a915 100644 --- a/actix-http/src/message.rs +++ b/actix-http/src/message.rs @@ -293,14 +293,14 @@ impl ResponseHead { } } - #[inline] /// Check if keep-alive is enabled + #[inline] pub fn keep_alive(&self) -> bool { self.connection_type() == ConnectionType::KeepAlive } - #[inline] /// Check upgrade status of this message + #[inline] pub fn upgrade(&self) -> bool { self.connection_type() == ConnectionType::Upgrade } @@ -389,12 +389,6 @@ impl BoxedResponseHead { pub fn new(status: StatusCode) -> Self { RESPONSE_POOL.with(|p| p.get_message(status)) } - - pub(crate) fn take(&mut self) -> Self { - BoxedResponseHead { - head: self.head.take(), - } - } } impl std::ops::Deref for BoxedResponseHead { diff --git a/actix-http/src/response.rs b/actix-http/src/response.rs index 4d2c550e1..bde17edf2 100644 --- a/actix-http/src/response.rs +++ b/actix-http/src/response.rs @@ -2,17 +2,13 @@ use std::{ cell::{Ref, RefMut}, - fmt, - future::Future, - pin::Pin, - str, - task::{Context, Poll}, + fmt, str, }; use bytes::{Bytes, BytesMut}; use crate::{ - body::{Body, MessageBody, ResponseBody}, + body::{Body, MessageBody}, error::Error, extensions::Extensions, http::{HeaderMap, StatusCode}, @@ -23,22 +19,20 @@ use crate::{ /// An HTTP response. pub struct Response { pub(crate) head: BoxedResponseHead, - pub(crate) body: ResponseBody, - pub(crate) error: Option, + pub(crate) body: B, } impl Response { - /// Constructs a response + /// Constructs a new response with default body. #[inline] pub fn new(status: StatusCode) -> Response { Response { head: BoxedResponseHead::new(status), - body: ResponseBody::Body(Body::Empty), - error: None, + body: Body::Empty, } } - /// Create HTTP response builder with specific status. + /// Constructs a new response builder. #[inline] pub fn build(status: StatusCode) -> ResponseBuilder { ResponseBuilder::new(status) @@ -47,25 +41,25 @@ impl Response { // just a couple frequently used shortcuts // this list should not grow larger than a few - /// Creates a new response with status 200 OK. + /// Constructs a new response with status 200 OK. #[inline] pub fn ok() -> Response { Response::new(StatusCode::OK) } - /// Creates a new response with status 400 Bad Request. + /// Constructs a new response with status 400 Bad Request. #[inline] pub fn bad_request() -> Response { Response::new(StatusCode::BAD_REQUEST) } - /// Creates a new response with status 404 Not Found. + /// Constructs a new response with status 404 Not Found. #[inline] pub fn not_found() -> Response { Response::new(StatusCode::NOT_FOUND) } - /// Creates a new response with status 500 Internal Server Error. + /// Constructs a new response with status 500 Internal Server Error. #[inline] pub fn internal_server_error() -> Response { Response::new(StatusCode::INTERNAL_SERVER_ERROR) @@ -73,176 +67,149 @@ impl Response { // end shortcuts - /// Constructs an error response + /// Constructs a new response from an error. #[inline] pub fn from_error(error: Error) -> Response { - let mut resp = error.as_response_error().error_response(); + let resp = error.as_response_error().error_response(); if resp.head.status == StatusCode::INTERNAL_SERVER_ERROR { debug!("Internal Server Error: {:?}", error); } - resp.error = Some(error); resp } - - /// Convert response to response with body - pub fn into_body(self) -> Response { - let b = match self.body { - ResponseBody::Body(b) => b, - ResponseBody::Other(b) => b, - }; - Response { - head: self.head, - error: self.error, - body: ResponseBody::Other(b), - } - } } impl Response { - /// Constructs a response with body + /// Constructs a new response with given body. #[inline] pub fn with_body(status: StatusCode, body: B) -> Response { Response { head: BoxedResponseHead::new(status), - body: ResponseBody::Body(body), - error: None, + body: body, } } + /// Returns a reference to the head of this response. #[inline] - /// Http message part of the response pub fn head(&self) -> &ResponseHead { &*self.head } + /// Returns a mutable reference to the head of this response. #[inline] - /// Mutable reference to a HTTP message part of the response pub fn head_mut(&mut self) -> &mut ResponseHead { &mut *self.head } - /// The source `error` for this response - #[inline] - pub fn error(&self) -> Option<&Error> { - self.error.as_ref() - } - - /// Get the response status code + /// Returns the status code of this response. #[inline] pub fn status(&self) -> StatusCode { self.head.status } - /// Set the `StatusCode` for this response + /// Returns a mutable reference the status code of this response. #[inline] pub fn status_mut(&mut self) -> &mut StatusCode { &mut self.head.status } - /// Get the headers from the response + /// Returns a reference to response headers. #[inline] pub fn headers(&self) -> &HeaderMap { &self.head.headers } - /// Get a mutable reference to the headers + /// Returns a mutable reference to response headers. #[inline] pub fn headers_mut(&mut self) -> &mut HeaderMap { &mut self.head.headers } - /// Connection upgrade status + /// Returns true if connection upgrade is enabled. #[inline] pub fn upgrade(&self) -> bool { self.head.upgrade() } - /// Keep-alive status for this connection + /// Returns true if keep-alive is enabled. pub fn keep_alive(&self) -> bool { self.head.keep_alive() } - /// Responses extensions + /// Returns a reference to the extensions of this response. #[inline] pub fn extensions(&self) -> Ref<'_, Extensions> { self.head.extensions.borrow() } - /// Mutable reference to a the response's extensions + /// Returns a mutable reference to the extensions of this response. #[inline] pub fn extensions_mut(&mut self) -> RefMut<'_, Extensions> { self.head.extensions.borrow_mut() } - /// Get body of this response + /// Returns a reference to the body of this response. #[inline] - pub fn body(&self) -> &ResponseBody { + pub fn body(&self) -> &B { &self.body } - /// Set a body + /// Sets new body. pub fn set_body(self, body: B2) -> Response { Response { head: self.head, - body: ResponseBody::Body(body), - error: None, + body, } } - /// Split response and body - pub fn into_parts(self) -> (Response<()>, ResponseBody) { - ( - Response { - head: self.head, - body: ResponseBody::Body(()), - error: self.error, - }, - self.body, - ) - } - - /// Drop request's body + /// Drops body and returns new response. pub fn drop_body(self) -> Response<()> { - Response { - head: self.head, - body: ResponseBody::Body(()), - error: None, - } + self.set_body(()) } - /// Set a body and return previous body value - pub(crate) fn replace_body(self, body: B2) -> (Response, ResponseBody) { + /// Sets new body, returning new response and previous body value. + pub(crate) fn replace_body(self, body: B2) -> (Response, B) { ( Response { head: self.head, - body: ResponseBody::Body(body), - error: self.error, + body, }, self.body, ) } - /// Set a body and return previous body value + /// Returns split head and body. + /// + /// # Implementation Notes + /// Due to internal performance optimisations, the first element of the returned tuple is a + /// `Response` as well but only contains the head of the response this was called on. + pub fn into_parts(self) -> (Response<()>, B) { + self.replace_body(()) + } + + /// Returns new response with mapped body. pub fn map_body(mut self, f: F) -> Response where - F: FnOnce(&mut ResponseHead, ResponseBody) -> ResponseBody, + F: FnOnce(&mut ResponseHead, B) -> B2, { let body = f(&mut self.head, self.body); Response { - body, head: self.head, - error: self.error, + body, } } - /// Extract response body - pub fn take_body(&mut self) -> ResponseBody { - self.body.take_body() + /// Returns body, consuming this response. + pub fn into_body(self) -> B { + self.body } } -impl fmt::Debug for Response { +impl fmt::Debug for Response +where + B: MessageBody, + B::Error: Into, +{ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let res = writeln!( f, @@ -260,19 +227,13 @@ impl fmt::Debug for Response { } } -impl Future for Response { - type Output = Result, Error>; - - fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { - Poll::Ready(Ok(Response { - head: self.head.take(), - body: self.body.take_body(), - error: self.error.take(), - })) +impl Default for Response { + #[inline] + fn default() -> Response { + Response::with_body(StatusCode::default(), B::default()) } } -/// Helper converters impl>, E: Into> From> for Response { fn from(res: Result) -> Self { match res { diff --git a/actix-http/src/response_builder.rs b/actix-http/src/response_builder.rs index 0105f70cf..df9079d70 100644 --- a/actix-http/src/response_builder.rs +++ b/actix-http/src/response_builder.rs @@ -13,7 +13,7 @@ use bytes::Bytes; use futures_core::Stream; use crate::{ - body::{Body, BodyStream, ResponseBody}, + body::{Body, BodyStream}, error::{Error, HttpError}, header::{self, IntoHeaderPair, IntoHeaderValue}, message::{BoxedResponseHead, ConnectionType, ResponseHead}, @@ -38,10 +38,11 @@ use crate::{ /// .body("1234"); /// /// assert_eq!(res.status(), StatusCode::OK); -/// assert_eq!(body::to_bytes(res.take_body()).await.unwrap(), &b"1234"[..]); /// /// assert!(res.headers().contains_key("server")); /// assert_eq!(res.headers().get_all("set-cookie").count(), 2); +/// +/// assert_eq!(body::to_bytes(res.into_body()).await.unwrap(), &b"1234"[..]); /// # }) /// ``` pub struct ResponseBuilder { @@ -236,23 +237,19 @@ impl ResponseBuilder { #[inline] pub fn body>(&mut self, body: B) -> Response { self.message_body(body.into()) + .unwrap_or_else(Response::from_error) } /// Generate response with a body. /// /// This `ResponseBuilder` will be left in a useless state. - pub fn message_body(&mut self, body: B) -> Response { - if let Some(e) = self.err.take() { - return Response::from(Error::from(e)).into_body(); + pub fn message_body(&mut self, body: B) -> Result, Error> { + if let Some(err) = self.err.take() { + return Err(err.into()); } - let response = self.head.take().expect("cannot reuse response builder"); - - Response { - head: response, - body: ResponseBody::Body(body), - error: None, - } + let head = self.head.take().expect("cannot reuse response builder"); + Ok(Response { head, body }) } /// Generate response with a streaming body. diff --git a/actix-http/src/service.rs b/actix-http/src/service.rs index ff4b49f1d..d25a67a19 100644 --- a/actix-http/src/service.rs +++ b/actix-http/src/service.rs @@ -59,6 +59,7 @@ where S::Response: Into> + 'static, >::Future: 'static, B: MessageBody + 'static, + B::Error: Into, { /// Create new `HttpService` instance. pub fn new>(service: F) -> Self { @@ -157,6 +158,7 @@ where >::Future: 'static, B: MessageBody + 'static, + B::Error: Into, X: ServiceFactory, X::Future: 'static, @@ -208,6 +210,7 @@ mod openssl { >::Future: 'static, B: MessageBody + 'static, + B::Error: Into, X: ServiceFactory, X::Future: 'static, @@ -275,6 +278,7 @@ mod rustls { >::Future: 'static, B: MessageBody + 'static, + B::Error: Into, X: ServiceFactory, X::Future: 'static, @@ -339,6 +343,7 @@ where >::Future: 'static, B: MessageBody + 'static, + B::Error: Into, X: ServiceFactory, X::Future: 'static, @@ -465,13 +470,18 @@ impl Service<(T, Protocol, Option)> for HttpServiceHandler where T: AsyncRead + AsyncWrite + Unpin, + S: Service, S::Error: Into + 'static, S::Future: 'static, S::Response: Into> + 'static, + B: MessageBody + 'static, + B::Error: Into, + X: Service, X::Error: Into, + U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display + Into, { @@ -522,13 +532,18 @@ where #[pin_project(project = StateProj)] enum State where + T: AsyncRead + AsyncWrite + Unpin, + S: Service, S::Future: 'static, S::Error: Into, - T: AsyncRead + AsyncWrite + Unpin, + B: MessageBody, + B::Error: Into, + X: Service, X::Error: Into, + U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { @@ -549,13 +564,18 @@ where pub struct HttpServiceHandlerResponse where T: AsyncRead + AsyncWrite + Unpin, + S: Service, S::Error: Into + 'static, S::Future: 'static, S::Response: Into> + 'static, - B: MessageBody + 'static, + + B: MessageBody, + B::Error: Into, + X: Service, X::Error: Into, + U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { @@ -566,13 +586,18 @@ where impl Future for HttpServiceHandlerResponse where T: AsyncRead + AsyncWrite + Unpin, + S: Service, S::Error: Into + 'static, S::Future: 'static, S::Response: Into> + 'static, - B: MessageBody, + + B: MessageBody + 'static, + B::Error: Into, + X: Service, X::Error: Into, + U: Service<(Request, Framed), Response = ()>, U::Error: fmt::Display, { diff --git a/actix-http/tests/test_ws.rs b/actix-http/tests/test_ws.rs index 72870bab5..bf1ca9385 100644 --- a/actix-http/tests/test_ws.rs +++ b/actix-http/tests/test_ws.rs @@ -52,7 +52,7 @@ where fn call(&self, (req, mut framed): (Request, Framed)) -> Self::Future { let fut = async move { - let res = ws::handshake(req.head()).unwrap().message_body(()); + let res = ws::handshake(req.head()).unwrap().message_body(()).unwrap(); framed .send((res, body::BodySize::None).into()) diff --git a/actix-test/src/lib.rs b/actix-test/src/lib.rs index 8fab33289..5d85c2687 100644 --- a/actix-test/src/lib.rs +++ b/actix-test/src/lib.rs @@ -86,6 +86,7 @@ where S::Response: Into> + 'static, >::Future: 'static, B: MessageBody + 'static, + B::Error: Into, { start_with(TestServerConfig::default(), factory) } @@ -125,6 +126,7 @@ where S::Response: Into> + 'static, >::Future: 'static, B: MessageBody + 'static, + B::Error: Into, { let (tx, rx) = mpsc::channel(); diff --git a/src/app_service.rs b/src/app_service.rs index 32c779a32..ca6f36202 100644 --- a/src/app_service.rs +++ b/src/app_service.rs @@ -166,8 +166,7 @@ impl AppInitServiceState { Rc::new(AppInitServiceState { rmap, config, - // TODO: AppConfig can be used to pass user defined HttpRequestPool - // capacity. + // TODO: AppConfig can be used to pass user defined HttpRequestPool capacity. pool: HttpRequestPool::default(), }) } diff --git a/src/error.rs b/src/error.rs index cc1a055b8..a5a245693 100644 --- a/src/error.rs +++ b/src/error.rs @@ -9,6 +9,11 @@ use url::ParseError as UrlParseError; use crate::http::StatusCode; +/// A convenience [`Result`](std::result::Result) for Actix Web operations. +/// +/// This type alias is generally used to avoid writing out `actix_http::Error` directly. +pub type Result = std::result::Result; + /// Errors which can occur when attempting to generate resource uri. #[derive(Debug, PartialEq, Display, Error, From)] #[non_exhaustive] @@ -26,7 +31,6 @@ pub enum UrlGenerationError { ParseError(UrlParseError), } -/// `InternalServerError` for `UrlGeneratorError` impl ResponseError for UrlGenerationError {} /// A set of errors that can occur during parsing urlencoded payloads @@ -70,7 +74,6 @@ pub enum UrlencodedError { Payload(PayloadError), } -/// Return `BadRequest` for `UrlencodedError` impl ResponseError for UrlencodedError { fn status_code(&self) -> StatusCode { match self { @@ -149,7 +152,6 @@ pub enum QueryPayloadError { Deserialize(serde::de::value::Error), } -/// Return `BadRequest` for `QueryPayloadError` impl ResponseError for QueryPayloadError { fn status_code(&self) -> StatusCode { StatusCode::BAD_REQUEST @@ -177,7 +179,6 @@ pub enum ReadlinesError { ContentTypeError(ContentTypeError), } -/// Return `BadRequest` for `ReadlinesError` impl ResponseError for ReadlinesError { fn status_code(&self) -> StatusCode { match *self { diff --git a/src/lib.rs b/src/lib.rs index 4d0ad26ed..96e6ecbf8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -97,7 +97,7 @@ pub(crate) mod types; pub mod web; pub use actix_http::Response as BaseHttpResponse; -pub use actix_http::{body, Error, HttpMessage, ResponseError, Result}; +pub use actix_http::{body, Error, HttpMessage, ResponseError}; #[doc(inline)] pub use actix_rt as rt; pub use actix_web_codegen::*; @@ -105,6 +105,7 @@ pub use actix_web_codegen::*; pub use cookie; pub use crate::app::App; +pub use crate::error::Result; pub use crate::extract::FromRequest; pub use crate::request::HttpRequest; pub use crate::resource::Resource; diff --git a/src/middleware/compat.rs b/src/middleware/compat.rs index 0e3a4f2b7..4f2f2a504 100644 --- a/src/middleware/compat.rs +++ b/src/middleware/compat.rs @@ -1,12 +1,13 @@ //! For middleware documentation, see [`Compat`]. use std::{ + error::Error as StdError, future::Future, pin::Pin, task::{Context, Poll}, }; -use actix_http::body::{Body, MessageBody, ResponseBody}; +use actix_http::body::{Body, MessageBody}; use actix_service::{Service, Transform}; use futures_core::{future::LocalBoxFuture, ready}; @@ -113,9 +114,13 @@ pub trait MapServiceResponseBody { fn map_body(self) -> ServiceResponse; } -impl MapServiceResponseBody for ServiceResponse { +impl MapServiceResponseBody for ServiceResponse +where + B: MessageBody + Unpin + 'static, + B::Error: Into>, +{ fn map_body(self) -> ServiceResponse { - self.map_body(|_, body| ResponseBody::Other(Body::from_message(body))) + self.map_body(|_, body| Body::from_message(body)) } } diff --git a/src/middleware/compress.rs b/src/middleware/compress.rs index 6a56e6de0..f8514c7cc 100644 --- a/src/middleware/compress.rs +++ b/src/middleware/compress.rs @@ -10,7 +10,7 @@ use std::{ }; use actix_http::{ - body::MessageBody, + body::{MessageBody, ResponseBody}, encoding::Encoder, http::header::{ContentEncoding, ACCEPT_ENCODING}, Error, @@ -59,7 +59,7 @@ where B: MessageBody, S: Service, Error = Error>, { - type Response = ServiceResponse>; + type Response = ServiceResponse>>; type Error = Error; type Transform = CompressMiddleware; type InitError = (); @@ -83,7 +83,7 @@ where B: MessageBody, S: Service, Error = Error>, { - type Response = ServiceResponse>; + type Response = ServiceResponse>>; type Error = Error; type Future = CompressResponse; @@ -127,7 +127,7 @@ where B: MessageBody, S: Service, Error = Error>, { - type Output = Result>, Error>; + type Output = Result>>, Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); @@ -140,9 +140,9 @@ where *this.encoding }; - Poll::Ready(Ok( - resp.map_body(move |head, body| Encoder::response(enc, head, body)) - )) + Poll::Ready(Ok(resp.map_body(move |head, body| { + Encoder::response(enc, head, ResponseBody::Body(body)) + }))) } Err(e) => Poll::Ready(Err(e)), } diff --git a/src/middleware/logger.rs b/src/middleware/logger.rs index 40ed9258f..bbb0e3dc4 100644 --- a/src/middleware/logger.rs +++ b/src/middleware/logger.rs @@ -21,11 +21,10 @@ use regex::{Regex, RegexSet}; use time::OffsetDateTime; use crate::{ - dev::{BodySize, MessageBody, ResponseBody}, - error::{Error, Result}, + dev::{BodySize, MessageBody}, http::{HeaderName, StatusCode}, service::{ServiceRequest, ServiceResponse}, - HttpResponse, + Error, HttpResponse, Result, }; /// Middleware for logging request and response summaries to the terminal. @@ -290,13 +289,11 @@ where let time = *this.time; let format = this.format.take(); - Poll::Ready(Ok(res.map_body(move |_, body| { - ResponseBody::Body(StreamLog { - body, - time, - format, - size: 0, - }) + Poll::Ready(Ok(res.map_body(move |_, body| StreamLog { + body, + time, + format, + size: 0, }))) } } @@ -306,7 +303,7 @@ use pin_project::{pin_project, pinned_drop}; #[pin_project(PinnedDrop)] pub struct StreamLog { #[pin] - body: ResponseBody, + body: B, format: Option, size: usize, time: OffsetDateTime, @@ -327,7 +324,13 @@ impl PinnedDrop for StreamLog { } } -impl MessageBody for StreamLog { +impl MessageBody for StreamLog +where + B: MessageBody, + B::Error: Into, +{ + type Error = Error; + fn size(&self) -> BodySize { self.body.size() } @@ -335,14 +338,17 @@ impl MessageBody for StreamLog { fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { + ) -> Poll>> { let this = self.project(); - match this.body.poll_next(cx) { - Poll::Ready(Some(Ok(chunk))) => { + + // TODO: MSRV 1.51: poll_map_err + match ready!(this.body.poll_next(cx)) { + Some(Ok(chunk)) => { *this.size += chunk.len(); Poll::Ready(Some(Ok(chunk))) } - val => val, + Some(Err(err)) => Poll::Ready(Some(Err(err.into()))), + None => Poll::Ready(None), } } } diff --git a/src/responder.rs b/src/responder.rs index 7b8288ed8..2393d046b 100644 --- a/src/responder.rs +++ b/src/responder.rs @@ -264,7 +264,7 @@ pub(crate) mod tests { let resp = srv.call(req).await.unwrap(); assert_eq!(resp.status(), StatusCode::OK); match resp.response().body() { - ResponseBody::Body(Body::Bytes(ref b)) => { + Body::Bytes(ref b) => { let bytes = b.clone(); assert_eq!(bytes, Bytes::from_static(b"some")); } @@ -277,16 +277,28 @@ pub(crate) mod tests { fn body(&self) -> &Body; } + impl BodyTest for Body { + fn bin_ref(&self) -> &[u8] { + match self { + Body::Bytes(ref bin) => &bin, + _ => unreachable!("bug in test impl"), + } + } + fn body(&self) -> &Body { + self + } + } + impl BodyTest for ResponseBody { fn bin_ref(&self) -> &[u8] { match self { ResponseBody::Body(ref b) => match b { Body::Bytes(ref bin) => &bin, - _ => panic!(), + _ => unreachable!("bug in test impl"), }, ResponseBody::Other(ref b) => match b { Body::Bytes(ref bin) => &bin, - _ => panic!(), + _ => unreachable!("bug in test impl"), }, } } diff --git a/src/response/builder.rs b/src/response/builder.rs index 8b3c0f10d..b9a10c56b 100644 --- a/src/response/builder.rs +++ b/src/response/builder.rs @@ -310,16 +310,19 @@ impl HttpResponseBuilder { /// /// `HttpResponseBuilder` can not be used after this call. #[inline] - pub fn body>(&mut self, body: B) -> HttpResponse { - self.message_body(body.into()) + pub fn body>(&mut self, body: B) -> HttpResponse { + match self.message_body(body.into()) { + Ok(res) => res, + Err(err) => HttpResponse::from_error(err), + } } /// Set a body and generate `Response`. /// /// `HttpResponseBuilder` can not be used after this call. - pub fn message_body(&mut self, body: B) -> HttpResponse { + pub fn message_body(&mut self, body: B) -> Result, Error> { if let Some(err) = self.err.take() { - return HttpResponse::from_error(Error::from(err)).into_body(); + return Err(err.into()); } let res = self @@ -336,12 +339,12 @@ impl HttpResponseBuilder { for cookie in jar.delta() { match HeaderValue::from_str(&cookie.to_string()) { Ok(val) => res.headers_mut().append(header::SET_COOKIE, val), - Err(err) => return HttpResponse::from_error(Error::from(err)).into_body(), + Err(err) => return Err(err.into()), }; } } - res + Ok(res) } /// Set a streaming body and generate `Response`. @@ -422,7 +425,6 @@ impl Future for HttpResponseBuilder { type Output = Result; fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { - eprintln!("httpresponse future error"); Poll::Ready(Ok(self.finish())) } } @@ -478,42 +480,42 @@ mod tests { #[actix_rt::test] async fn test_json() { - let mut resp = HttpResponse::Ok().json(vec!["v1", "v2", "v3"]); + let resp = HttpResponse::Ok().json(vec!["v1", "v2", "v3"]); let ct = resp.headers().get(CONTENT_TYPE).unwrap(); assert_eq!(ct, HeaderValue::from_static("application/json")); assert_eq!( - body::to_bytes(resp.take_body()).await.unwrap().as_ref(), + body::to_bytes(resp.into_body()).await.unwrap().as_ref(), br#"["v1","v2","v3"]"# ); - let mut resp = HttpResponse::Ok().json(&["v1", "v2", "v3"]); + let resp = HttpResponse::Ok().json(&["v1", "v2", "v3"]); let ct = resp.headers().get(CONTENT_TYPE).unwrap(); assert_eq!(ct, HeaderValue::from_static("application/json")); assert_eq!( - body::to_bytes(resp.take_body()).await.unwrap().as_ref(), + body::to_bytes(resp.into_body()).await.unwrap().as_ref(), br#"["v1","v2","v3"]"# ); // content type override - let mut resp = HttpResponse::Ok() + let resp = HttpResponse::Ok() .insert_header((CONTENT_TYPE, "text/json")) .json(&vec!["v1", "v2", "v3"]); let ct = resp.headers().get(CONTENT_TYPE).unwrap(); assert_eq!(ct, HeaderValue::from_static("text/json")); assert_eq!( - body::to_bytes(resp.take_body()).await.unwrap().as_ref(), + body::to_bytes(resp.into_body()).await.unwrap().as_ref(), br#"["v1","v2","v3"]"# ); } #[actix_rt::test] async fn test_serde_json_in_body() { - let mut resp = HttpResponse::Ok().body( + let resp = HttpResponse::Ok().body( serde_json::to_vec(&serde_json::json!({ "test-key": "test-value" })).unwrap(), ); assert_eq!( - body::to_bytes(resp.take_body()).await.unwrap().as_ref(), + body::to_bytes(resp.into_body()).await.unwrap().as_ref(), br#"{"test-key":"test-value"}"# ); } diff --git a/src/response/response.rs b/src/response/response.rs index 31868fe0b..194e2dff8 100644 --- a/src/response/response.rs +++ b/src/response/response.rs @@ -8,7 +8,7 @@ use std::{ }; use actix_http::{ - body::{Body, MessageBody, ResponseBody}, + body::{Body, MessageBody}, http::{header::HeaderMap, StatusCode}, Extensions, Response, ResponseHead, }; @@ -27,7 +27,7 @@ use crate::{error::Error, HttpResponseBuilder}; /// An HTTP Response pub struct HttpResponse { res: Response, - error: Option, + pub(crate) error: Option, } impl HttpResponse { @@ -56,14 +56,6 @@ impl HttpResponse { error: Some(error), } } - - /// Convert response to response with body - pub fn into_body(self) -> HttpResponse { - HttpResponse { - res: self.res.into_body(), - error: self.error, - } - } } impl HttpResponse { @@ -192,7 +184,7 @@ impl HttpResponse { /// Get body of this response #[inline] - pub fn body(&self) -> &ResponseBody { + pub fn body(&self) -> &B { self.res.body() } @@ -206,7 +198,7 @@ impl HttpResponse { } /// Split response and body - pub fn into_parts(self) -> (HttpResponse<()>, ResponseBody) { + pub fn into_parts(self) -> (HttpResponse<()>, B) { let (head, body) = self.res.into_parts(); ( @@ -229,7 +221,7 @@ impl HttpResponse { /// Set a body and return previous body value pub fn map_body(self, f: F) -> HttpResponse where - F: FnOnce(&mut ResponseHead, ResponseBody) -> ResponseBody, + F: FnOnce(&mut ResponseHead, B) -> B2, { HttpResponse { res: self.res.map_body(f), @@ -238,12 +230,16 @@ impl HttpResponse { } /// Extract response body - pub fn take_body(&mut self) -> ResponseBody { - self.res.take_body() + pub fn into_body(self) -> B { + self.res.into_body() } } -impl fmt::Debug for HttpResponse { +impl fmt::Debug for HttpResponse +where + B: MessageBody, + B::Error: Into, +{ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("HttpResponse") .field("error", &self.error) @@ -270,20 +266,25 @@ impl From> for Response { // TODO: expose cause somewhere? // if let Some(err) = res.error { - // eprintln!("impl From> for Response let Some(err)"); - // return Response::from_error(err).into_body(); + // return Response::from_error(err); // } res.res } } -impl Future for HttpResponse { +// Future is only implemented for Body payload type because it's the most useful for making simple +// handlers without async blocks. Making it generic over all MessageBody types requires a future +// impl on Response which would cause it's body field to be, undesirably, Option. +// +// This impl is not particularly efficient due to the Response construction and should probably +// not be invoked if performance is important. Prefer an async fn/block in such cases. +impl Future for HttpResponse { type Output = Result, Error>; fn poll(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { if let Some(err) = self.error.take() { - return Poll::Ready(Ok(Response::from_error(err).into_body())); + return Poll::Ready(Err(err)); } Poll::Ready(Ok(mem::replace( diff --git a/src/scope.rs b/src/scope.rs index 3be6adb0c..412c01d95 100644 --- a/src/scope.rs +++ b/src/scope.rs @@ -578,7 +578,7 @@ mod tests { use actix_utils::future::ok; use bytes::Bytes; - use crate::dev::{Body, ResponseBody}; + use crate::dev::Body; use crate::http::{header, HeaderValue, Method, StatusCode}; use crate::middleware::DefaultHeaders; use crate::service::ServiceRequest; @@ -748,7 +748,7 @@ mod tests { assert_eq!(resp.status(), StatusCode::OK); match resp.response().body() { - ResponseBody::Body(Body::Bytes(ref b)) => { + Body::Bytes(ref b) => { let bytes = b.clone(); assert_eq!(bytes, Bytes::from_static(b"project: project1")); } @@ -849,7 +849,7 @@ mod tests { assert_eq!(resp.status(), StatusCode::CREATED); match resp.response().body() { - ResponseBody::Body(Body::Bytes(ref b)) => { + Body::Bytes(ref b) => { let bytes = b.clone(); assert_eq!(bytes, Bytes::from_static(b"project: project_1")); } @@ -877,7 +877,7 @@ mod tests { assert_eq!(resp.status(), StatusCode::CREATED); match resp.response().body() { - ResponseBody::Body(Body::Bytes(ref b)) => { + Body::Bytes(ref b) => { let bytes = b.clone(); assert_eq!(bytes, Bytes::from_static(b"project: test - 1")); } diff --git a/src/server.rs b/src/server.rs index 6577f4d1f..44ae6f880 100644 --- a/src/server.rs +++ b/src/server.rs @@ -81,6 +81,7 @@ where S::Service: 'static, // S::Service: 'static, B: MessageBody + 'static, + B::Error: Into, { /// Create new HTTP server with application factory pub fn new(factory: F) -> Self { @@ -173,6 +174,16 @@ where self } + /// Set max number of threads for each worker's blocking task thread pool. + /// + /// One thread pool is set up **per worker**; not shared across workers. + /// + /// By default set to 512 / workers. + pub fn worker_max_blocking_threads(mut self, num: usize) -> Self { + self.builder = self.builder.worker_max_blocking_threads(num); + self + } + /// Set server keep-alive setting. /// /// By default keep alive is set to a 5 seconds. diff --git a/src/service.rs b/src/service.rs index f6d1f9ebf..b7f244797 100644 --- a/src/service.rs +++ b/src/service.rs @@ -2,7 +2,7 @@ use std::cell::{Ref, RefMut}; use std::rc::Rc; use std::{fmt, net}; -use actix_http::body::{Body, MessageBody, ResponseBody}; +use actix_http::body::{Body, MessageBody}; use actix_http::http::{HeaderMap, Method, StatusCode, Uri, Version}; use actix_http::{ Error, Extensions, HttpMessage, Payload, PayloadStream, RequestHead, Response, ResponseHead, @@ -110,9 +110,9 @@ impl ServiceRequest { /// Create service response for error #[inline] - pub fn error_response>(self, err: E) -> ServiceResponse { + pub fn error_response>(self, err: E) -> ServiceResponse { let res = HttpResponse::from_error(err.into()); - ServiceResponse::new(self.req, res.into_body()) + ServiceResponse::new(self.req, res) } /// This method returns reference to the request head @@ -335,22 +335,24 @@ pub struct ServiceResponse { response: HttpResponse, } +impl ServiceResponse { + /// Create service response from the error + pub fn from_err>(err: E, request: HttpRequest) -> Self { + let response = HttpResponse::from_error(err.into()); + ServiceResponse { request, response } + } +} + impl ServiceResponse { /// Create service response instance pub fn new(request: HttpRequest, response: HttpResponse) -> Self { ServiceResponse { request, response } } - /// Create service response from the error - pub fn from_err>(err: E, request: HttpRequest) -> Self { - let response = HttpResponse::from_error(err.into()).into_body(); - ServiceResponse { request, response } - } - /// Create service response for error #[inline] - pub fn error_response>(self, err: E) -> Self { - Self::from_err(err, self.request) + pub fn error_response>(self, err: E) -> ServiceResponse { + ServiceResponse::from_err(err, self.request) } /// Create service response @@ -396,23 +398,18 @@ impl ServiceResponse { } /// Execute closure and in case of error convert it to response. - pub fn checked_expr(mut self, f: F) -> Self + pub fn checked_expr(mut self, f: F) -> Result where F: FnOnce(&mut Self) -> Result<(), E>, E: Into, { - match f(&mut self) { - Ok(_) => self, - Err(err) => { - let res = HttpResponse::from_error(err.into()); - ServiceResponse::new(self.request, res.into_body()) - } - } + f(&mut self).map_err(Into::into)?; + Ok(self) } /// Extract response body - pub fn take_body(&mut self) -> ResponseBody { - self.response.take_body() + pub fn into_body(self) -> B { + self.response.into_body() } } @@ -420,7 +417,7 @@ impl ServiceResponse { /// Set a new body pub fn map_body(self, f: F) -> ServiceResponse where - F: FnOnce(&mut ResponseHead, ResponseBody) -> ResponseBody, + F: FnOnce(&mut ResponseHead, B) -> B2, { let response = self.response.map_body(f); @@ -443,7 +440,11 @@ impl From> for Response { } } -impl fmt::Debug for ServiceResponse { +impl fmt::Debug for ServiceResponse +where + B: MessageBody, + B::Error: Into, +{ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let res = writeln!( f, diff --git a/src/test.rs b/src/test.rs index c2e456e58..de97dc8aa 100644 --- a/src/test.rs +++ b/src/test.rs @@ -4,13 +4,14 @@ use std::{net::SocketAddr, rc::Rc}; pub use actix_http::test::TestBuffer; use actix_http::{ + body, http::{header::IntoHeaderPair, Method, StatusCode, Uri, Version}, test::TestRequest as HttpTestRequest, Extensions, Request, }; use actix_router::{Path, ResourceDef, Url}; use actix_service::{IntoService, IntoServiceFactory, Service, ServiceFactory}; -use actix_utils::future::ok; +use actix_utils::future::{ok, poll_fn}; use futures_core::Stream; use futures_util::StreamExt as _; use serde::{de::DeserializeOwned, Serialize}; @@ -151,17 +152,19 @@ pub async fn read_response(app: &S, req: Request) -> Bytes where S: Service, Error = Error>, B: MessageBody + Unpin, + B::Error: Into, { - let mut resp = app + let resp = app .call(req) .await .unwrap_or_else(|e| panic!("read_response failed at application call: {}", e)); - let mut body = resp.take_body(); + let body = resp.into_body(); let mut bytes = BytesMut::new(); - while let Some(item) = body.next().await { - bytes.extend_from_slice(&item.unwrap()); + actix_rt::pin!(body); + while let Some(item) = poll_fn(|cx| body.as_mut().poll_next(cx)).await { + bytes.extend_from_slice(&item.map_err(Into::into).unwrap()); } bytes.freeze() @@ -193,15 +196,19 @@ where /// assert_eq!(result, Bytes::from_static(b"welcome!")); /// } /// ``` -pub async fn read_body(mut res: ServiceResponse) -> Bytes +pub async fn read_body(res: ServiceResponse) -> Bytes where B: MessageBody + Unpin, + B::Error: Into, { - let mut body = res.take_body(); + let body = res.into_body(); let mut bytes = BytesMut::new(); - while let Some(item) = body.next().await { - bytes.extend_from_slice(&item.unwrap()); + + actix_rt::pin!(body); + while let Some(item) = poll_fn(|cx| body.as_mut().poll_next(cx)).await { + bytes.extend_from_slice(&item.map_err(Into::into).unwrap()); } + bytes.freeze() } @@ -245,6 +252,7 @@ where pub async fn read_body_json(res: ServiceResponse) -> T where B: MessageBody + Unpin, + B::Error: Into, T: DeserializeOwned, { let body = read_body(res).await; @@ -268,6 +276,14 @@ where Ok(data.freeze()) } +pub async fn load_body(body: B) -> Result +where + B: MessageBody + Unpin, + B::Error: Into, +{ + body::to_bytes(body).await.map_err(Into::into) +} + /// Helper function that returns a deserialized response body of a TestRequest /// /// ``` @@ -306,6 +322,7 @@ pub async fn read_response_json(app: &S, req: Request) -> T where S: Service, Error = Error>, B: MessageBody + Unpin, + B::Error: Into, T: DeserializeOwned, { let body = read_response(app, req).await; diff --git a/src/types/json.rs b/src/types/json.rs index 322e5cbf3..5762c6428 100644 --- a/src/types/json.rs +++ b/src/types/json.rs @@ -435,7 +435,7 @@ mod tests { header::{self, CONTENT_LENGTH, CONTENT_TYPE}, StatusCode, }, - test::{load_stream, TestRequest}, + test::{load_body, TestRequest}, }; #[derive(Serialize, Deserialize, PartialEq, Debug)] @@ -492,10 +492,10 @@ mod tests { .to_http_parts(); let s = Json::::from_request(&req, &mut pl).await; - let mut resp = HttpResponse::from_error(s.err().unwrap()); + let resp = HttpResponse::from_error(s.err().unwrap()); assert_eq!(resp.status(), StatusCode::BAD_REQUEST); - let body = load_stream(resp.take_body()).await.unwrap(); + let body = load_body(resp.into_body()).await.unwrap(); let msg: MyObject = serde_json::from_slice(&body).unwrap(); assert_eq!(msg.name, "invalid request"); } diff --git a/tests/test_server.rs b/tests/test_server.rs index 2760cc7fb..756c180fc 100644 --- a/tests/test_server.rs +++ b/tests/test_server.rs @@ -32,7 +32,7 @@ use rand::{distributions::Alphanumeric, Rng}; use actix_web::dev::BodyEncoding; use actix_web::middleware::{Compress, NormalizePath, TrailingSlash}; -use actix_web::{dev, web, App, Error, HttpResponse}; +use actix_web::{web, App, Error, HttpResponse}; const STR: &str = "Hello World Hello World Hello World Hello World Hello World \ Hello World Hello World Hello World Hello World Hello World \ @@ -160,9 +160,7 @@ async fn test_body_gzip2() { let srv = actix_test::start_with(actix_test::config().h1(), || { App::new() .wrap(Compress::new(ContentEncoding::Gzip)) - .service(web::resource("/").route(web::to(|| { - HttpResponse::Ok().body(STR).into_body::() - }))) + .service(web::resource("/").route(web::to(|| HttpResponse::Ok().body(STR)))) }); let mut response = srv @@ -903,7 +901,7 @@ async fn test_normalize() { let srv = actix_test::start_with(actix_test::config().h1(), || { App::new() .wrap(NormalizePath::new(TrailingSlash::Trim)) - .service(web::resource("/one").route(web::to(|| HttpResponse::Ok().finish()))) + .service(web::resource("/one").route(web::to(|| HttpResponse::Ok()))) }); let response = srv.get("/one/").send().await.unwrap();