force message body error types to be errors

This commit is contained in:
Rob Ede 2021-11-30 18:13:28 +00:00
parent aec0e46909
commit c3c59e40d1
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
20 changed files with 293 additions and 350 deletions

View File

@ -75,6 +75,7 @@ mod tests {
use derive_more::{Display, Error}; use derive_more::{Display, Error};
use futures_core::ready; use futures_core::ready;
use futures_util::{stream, FutureExt as _}; use futures_util::{stream, FutureExt as _};
use pin_project_lite::pin_project;
use static_assertions::{assert_impl_all, assert_not_impl_all}; use static_assertions::{assert_impl_all, assert_not_impl_all};
use super::*; use super::*;
@ -166,12 +167,14 @@ mod tests {
BodyStream::new(stream::iter(vec![Ok(Bytes::from("1")), Err(StreamErr)])); BodyStream::new(stream::iter(vec![Ok(Bytes::from("1")), Err(StreamErr)]));
assert!(matches!(to_bytes(body).await, Err(StreamErr))); assert!(matches!(to_bytes(body).await, Err(StreamErr)));
#[pin_project::pin_project(project = TimeDelayStreamProj)] pin_project! {
#[derive(Debug)] #[derive(Debug)]
enum TimeDelayStream { #[project = TimeDelayStreamProj]
Start, enum TimeDelayStream {
Sleep(Pin<Box<Sleep>>), Start,
Done, Sleep { delay: Pin<Box<Sleep>> },
Done,
}
} }
impl Stream for TimeDelayStream { impl Stream for TimeDelayStream {
@ -184,12 +187,14 @@ mod tests {
match self.as_mut().get_mut() { match self.as_mut().get_mut() {
TimeDelayStream::Start => { TimeDelayStream::Start => {
let sleep = sleep(Duration::from_millis(1)); let sleep = sleep(Duration::from_millis(1));
self.as_mut().set(TimeDelayStream::Sleep(Box::pin(sleep))); self.as_mut().set(TimeDelayStream::Sleep {
delay: Box::pin(sleep),
});
cx.waker().wake_by_ref(); cx.waker().wake_by_ref();
Poll::Pending Poll::Pending
} }
TimeDelayStream::Sleep(ref mut delay) => { TimeDelayStream::Sleep { ref mut delay } => {
ready!(delay.poll_unpin(cx)); ready!(delay.poll_unpin(cx));
self.set(TimeDelayStream::Done); self.set(TimeDelayStream::Done);
cx.waker().wake_by_ref(); cx.waker().wake_by_ref();

View File

@ -2,6 +2,7 @@
use std::{ use std::{
convert::Infallible, convert::Infallible,
error::Error as StdError,
mem, mem,
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
@ -16,7 +17,7 @@ use super::BodySize;
/// An interface types that can converted to bytes and used as response bodies. /// An interface types that can converted to bytes and used as response bodies.
// TODO: examples // TODO: examples
pub trait MessageBody { pub trait MessageBody {
type Error; type Error: Into<Box<dyn StdError>>;
/// Body size hint. /// Body size hint.
fn size(&self) -> BodySize; fn size(&self) -> BodySize;
@ -135,25 +136,6 @@ impl MessageBody for Bytes {
} }
} }
// impl<'a> MessageBody for &'a Bytes {
// type Error = Infallible;
// fn size(&self) -> BodySize {
// BodySize::Sized(self.len() as u64)
// }
// fn poll_next(
// self: Pin<&mut Self>,
// _cx: &mut Context<'_>,
// ) -> Poll<Option<Result<Bytes, Self::Error>>> {
// if self.is_empty() {
// Poll::Ready(None)
// } else {
// Poll::Ready(Some(Ok(self.clone())))
// }
// }
// }
impl MessageBody for BytesMut { impl MessageBody for BytesMut {
type Error = Infallible; type Error = Infallible;
@ -168,29 +150,31 @@ impl MessageBody for BytesMut {
if self.is_empty() { if self.is_empty() {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
Poll::Ready(Some(Ok(mem::take(self.get_mut()).freeze()))) let bytes = mem::take(self.get_mut()).freeze();
Poll::Ready(Some(Ok(bytes)))
} }
} }
} }
// impl<'a> MessageBody for &'a BytesMut { impl MessageBody for Vec<u8> {
// type Error = Infallible; type Error = Infallible;
// fn size(&self) -> BodySize { fn size(&self) -> BodySize {
// BodySize::Sized(self.len() as u64) BodySize::Sized(self.len() as u64)
// } }
// fn poll_next( fn poll_next(
// self: Pin<&mut Self>, self: Pin<&mut Self>,
// _cx: &mut Context<'_>, _cx: &mut Context<'_>,
// ) -> Poll<Option<Result<Bytes, Self::Error>>> { ) -> Poll<Option<Result<Bytes, Self::Error>>> {
// if self.is_empty() { if self.is_empty() {
// Poll::Ready(None) Poll::Ready(None)
// } else { } else {
// Poll::Ready(Some(Ok(self.clone().freeze()))) let bytes = mem::take(self.get_mut());
// } Poll::Ready(Some(Ok(Bytes::from(bytes))))
// } }
// } }
}
impl MessageBody for &'static str { impl MessageBody for &'static str {
type Error = Infallible; type Error = Infallible;
@ -213,25 +197,6 @@ impl MessageBody for &'static str {
} }
} }
impl MessageBody for Vec<u8> {
type Error = Infallible;
fn size(&self) -> BodySize {
BodySize::Sized(self.len() as u64)
}
fn poll_next(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Self::Error>>> {
if self.is_empty() {
Poll::Ready(None)
} else {
Poll::Ready(Some(Ok(Bytes::from(mem::take(self.get_mut())))))
}
}
}
impl MessageBody for String { impl MessageBody for String {
type Error = Infallible; type Error = Infallible;
@ -246,51 +211,12 @@ impl MessageBody for String {
if self.is_empty() { if self.is_empty() {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
Poll::Ready(Some(Ok(Bytes::from(mem::take(self.get_mut()))))) let string = mem::take(self.get_mut());
Poll::Ready(Some(Ok(Bytes::from(string))))
} }
} }
} }
// impl<'a> MessageBody for &'a String {
// type Error = Infallible;
// fn size(&self) -> BodySize {
// BodySize::Sized(self.len() as u64)
// }
// fn poll_next(
// self: Pin<&mut Self>,
// _cx: &mut Context<'_>,
// ) -> Poll<Option<Result<Bytes, Self::Error>>> {
// if self.is_empty() {
// Poll::Ready(None)
// } else {
// Poll::Ready(Some(Ok(Bytes::from(self.clone()))))
// }
// }
// }
// impl MessageBody for Cow<'_, str> {
// type Error = Infallible;
// fn size(&self) -> BodySize {
// BodySize::Sized(self.len() as u64)
// }
// fn poll_next(
// self: Pin<&mut Self>,
// cx: &mut Context<'_>,
// ) -> Poll<Option<Result<Bytes, Self::Error>>> {
// if self.is_empty() {
// Poll::Ready(None)
// } else {
// let cow = Pin::into_inner(self);
// let mut string = cow.clone().into_owned();
// Pin::new(&mut string).poll_next(cx)
// }
// }
// }
impl MessageBody for bytestring::ByteString { impl MessageBody for bytestring::ByteString {
type Error = Infallible; type Error = Infallible;
@ -307,8 +233,6 @@ impl MessageBody for bytestring::ByteString {
} }
} }
// TODO: ensure consistent impls of MessageBody that always terminate
pin_project! { pin_project! {
pub(crate) struct MessageBodyMapErr<B, F> { pub(crate) struct MessageBodyMapErr<B, F> {
#[pin] #[pin]
@ -334,6 +258,7 @@ impl<B, F, E> MessageBody for MessageBodyMapErr<B, F>
where where
B: MessageBody, B: MessageBody,
F: FnOnce(B::Error) -> E, F: FnOnce(B::Error) -> E,
E: Into<Box<dyn StdError>>,
{ {
type Error = E; type Error = E;

View File

@ -103,8 +103,10 @@ pin_project! {
#[project = EncoderBodyProj] #[project = EncoderBodyProj]
enum EncoderBody<B> { enum EncoderBody<B> {
None, None,
// TODO: this variant is not used but RA can't see it because of macro wrapper // TODO: this variant is not used but RA can't see it because of macro wrapper
Bytes { bytes: Bytes }, Bytes { body: Bytes },
Stream { #[pin] body: B }, Stream { #[pin] body: B },
} }
} }
@ -113,12 +115,12 @@ impl<B> MessageBody for EncoderBody<B>
where where
B: MessageBody, B: MessageBody,
{ {
type Error = EncoderError<B::Error>; type Error = EncoderError;
fn size(&self) -> BodySize { fn size(&self) -> BodySize {
match self { match self {
EncoderBody::None => BodySize::None, EncoderBody::None => BodySize::None,
EncoderBody::Bytes { bytes } => bytes.size(), EncoderBody::Bytes { body } => body.size(),
EncoderBody::Stream { body } => body.size(), EncoderBody::Stream { body } => body.size(),
} }
} }
@ -130,17 +132,16 @@ where
match self.project() { match self.project() {
EncoderBodyProj::None => Poll::Ready(None), EncoderBodyProj::None => Poll::Ready(None),
EncoderBodyProj::Bytes { bytes } => { EncoderBodyProj::Bytes { body } => {
if bytes.is_empty() { if body.is_empty() {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
Poll::Ready(Some(Ok(std::mem::take(bytes)))) Poll::Ready(Some(Ok(std::mem::take(body))))
} }
} }
EncoderBodyProj::Stream { body } => body
EncoderBodyProj::Stream { body } => { .poll_next(cx)
body.poll_next(cx).map_err(EncoderError::Body) .map_err(|err| EncoderError::Body(err.into())),
}
} }
} }
} }
@ -149,7 +150,7 @@ impl<B> MessageBody for Encoder<B>
where where
B: MessageBody, B: MessageBody,
{ {
type Error = EncoderError<B::Error>; type Error = EncoderError;
fn size(&self) -> BodySize { fn size(&self) -> BodySize {
if self.encoder.is_none() { if self.encoder.is_none() {
@ -369,9 +370,9 @@ impl ContentEncoder {
#[derive(Debug, Display)] #[derive(Debug, Display)]
#[non_exhaustive] #[non_exhaustive]
pub enum EncoderError<E> { pub enum EncoderError {
#[display(fmt = "body")] #[display(fmt = "body")]
Body(E), Body(Box<dyn StdError>),
#[display(fmt = "blocking")] #[display(fmt = "blocking")]
Blocking(BlockingError), Blocking(BlockingError),
@ -380,18 +381,18 @@ pub enum EncoderError<E> {
Io(io::Error), Io(io::Error),
} }
impl<E: StdError + 'static> StdError for EncoderError<E> { impl StdError for EncoderError {
fn source(&self) -> Option<&(dyn StdError + 'static)> { fn source(&self) -> Option<&(dyn StdError + 'static)> {
match self { match self {
EncoderError::Body(err) => Some(err), EncoderError::Body(err) => Some(&**err),
EncoderError::Blocking(err) => Some(err), EncoderError::Blocking(err) => Some(err),
EncoderError::Io(err) => Some(err), EncoderError::Io(err) => Some(err),
} }
} }
} }
impl<E: StdError + 'static> From<EncoderError<E>> for crate::Error { impl From<EncoderError> for crate::Error {
fn from(err: EncoderError<E>) -> Self { fn from(err: EncoderError) -> Self {
crate::Error::new_encoder().with_cause(err) crate::Error::new_encoder().with_cause(err)
} }
} }

View File

@ -1,22 +1,30 @@
use std::future::Future; use std::{
use std::pin::Pin; future::Future,
use std::task::{Context, Poll}; pin::Pin,
task::{Context, Poll},
};
use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_codec::{AsyncRead, AsyncWrite, Framed};
use pin_project_lite::pin_project;
use crate::body::{BodySize, MessageBody}; use crate::{
use crate::error::Error; body::{BodySize, MessageBody},
use crate::h1::{Codec, Message}; error::Error,
use crate::response::Response; h1::{Codec, Message},
response::Response,
};
/// Send HTTP/1 response pin_project! {
#[pin_project::pin_project] /// Send HTTP/1 response
pub struct SendResponse<T, B> { pub struct SendResponse<T, B> {
res: Option<Message<(Response<()>, BodySize)>>, res: Option<Message<(Response<()>, BodySize)>>,
#[pin]
body: Option<B>, #[pin]
#[pin] body: Option<B>,
framed: Option<Framed<T, Codec>>,
#[pin]
framed: Option<Framed<T, Codec>>,
}
} }
impl<T, B> SendResponse<T, B> impl<T, B> SendResponse<T, B>

View File

@ -51,7 +51,7 @@ where
{ {
pub(crate) fn new( pub(crate) fn new(
flow: Rc<HttpFlow<S, X, U>>, flow: Rc<HttpFlow<S, X, U>>,
mut connection: Connection<T, Bytes>, mut conn: Connection<T, Bytes>,
on_connect_data: OnConnectData, on_connect_data: OnConnectData,
config: ServiceConfig, config: ServiceConfig,
peer_addr: Option<net::SocketAddr>, peer_addr: Option<net::SocketAddr>,
@ -66,14 +66,14 @@ where
}) })
.unwrap_or_else(|| Box::pin(sleep(dur))), .unwrap_or_else(|| Box::pin(sleep(dur))),
on_flight: false, on_flight: false,
ping_pong: connection.ping_pong().unwrap(), ping_pong: conn.ping_pong().unwrap(),
}); });
Self { Self {
flow, flow,
config, config,
peer_addr, peer_addr,
connection, connection: conn,
on_connect_data, on_connect_data,
ping_pong, ping_pong,
_phantom: PhantomData, _phantom: PhantomData,

View File

@ -15,7 +15,7 @@ use crate::{
header::{self, IntoHeaderValue}, header::{self, IntoHeaderValue},
http::{HeaderMap, StatusCode}, http::{HeaderMap, StatusCode},
message::{BoxedResponseHead, ResponseHead}, message::{BoxedResponseHead, ResponseHead},
ResponseBuilder, Error, ResponseBuilder,
}; };
/// An HTTP response. /// An HTTP response.
@ -233,21 +233,16 @@ impl<B: Default> Default for Response<B> {
} }
} }
// TODO: fix this impl impl<I: Into<Response<BoxBody>>, E: Into<Error>> From<Result<I, E>>
// impl<B, I, E> From<Result<I, E>> for Response<BoxBody> for Response<BoxBody>
// where {
// B: MessageBody + 'static, fn from(res: Result<I, E>) -> Self {
// B::Error: Into<Box<dyn StdError + 'static>>, match res {
// I: Into<Response<B>>, Ok(val) => val.into(),
// E: Into<Error>, Err(err) => Response::from(err.into()),
// { }
// fn from(res: Result<I, E>) -> Self { }
// match res { }
// Ok(val) => val.into(),
// Err(err) => err.into().into(),
// }
// }
// }
impl From<ResponseBuilder> for Response<BoxBody> { impl From<ResponseBuilder> for Response<BoxBody> {
fn from(mut builder: ResponseBuilder) -> Self { fn from(mut builder: ResponseBuilder) -> Self {
@ -288,12 +283,14 @@ impl From<String> for Response<String> {
} }
} }
// TODO: was this is useful impl impl From<&String> for Response<String> {
// impl<'a> From<&'a String> for Response<&'a String> { fn from(val: &String) -> Self {
// fn from(val: &'a String) -> Self { let mut res = Response::with_body(StatusCode::OK, val.clone());
// todo!() let mime = mime::TEXT_PLAIN_UTF_8.try_into_value().unwrap();
// } res.headers_mut().insert(header::CONTENT_TYPE, mime);
// } res
}
}
impl From<Bytes> for Response<Bytes> { impl From<Bytes> for Response<Bytes> {
fn from(val: Bytes) -> Self { fn from(val: Bytes) -> Self {
@ -322,8 +319,6 @@ impl From<ByteString> for Response<ByteString> {
} }
} }
// TODO: impl into Response for ByteString
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@ -239,7 +239,6 @@ impl ResponseBuilder {
{ {
match self.message_body(body) { match self.message_body(body) {
Ok(res) => res.map_body(|_, body| EitherBody::left(body)), Ok(res) => res.map_body(|_, body| EitherBody::left(body)),
// TODO: add error path
Err(err) => Response::from(err).map_body(|_, body| EitherBody::right(body)), Err(err) => Response::from(err).map_body(|_, body| EitherBody::right(body)),
} }
} }

View File

@ -15,7 +15,7 @@ use actix_service::{
fn_service, IntoServiceFactory, Service, ServiceFactory, ServiceFactoryExt as _, fn_service, IntoServiceFactory, Service, ServiceFactory, ServiceFactoryExt as _,
}; };
use futures_core::{future::LocalBoxFuture, ready}; use futures_core::{future::LocalBoxFuture, ready};
use pin_project::pin_project; use pin_project_lite::pin_project;
use crate::{ use crate::{
body::{BoxBody, MessageBody}, body::{BoxBody, MessageBody},
@ -519,23 +519,27 @@ where
match proto { match proto {
Protocol::Http2 => HttpServiceHandlerResponse { Protocol::Http2 => HttpServiceHandlerResponse {
state: State::H2Handshake(Some(( state: State::H2Handshake {
h2::handshake_with_timeout(io, &self.cfg), handshake: Some((
self.cfg.clone(), h2::handshake_with_timeout(io, &self.cfg),
self.flow.clone(), self.cfg.clone(),
on_connect_data, self.flow.clone(),
peer_addr, on_connect_data,
))), peer_addr,
)),
},
}, },
Protocol::Http1 => HttpServiceHandlerResponse { Protocol::Http1 => HttpServiceHandlerResponse {
state: State::H1(h1::Dispatcher::new( state: State::H1 {
io, dispatcher: h1::Dispatcher::new(
self.cfg.clone(), io,
self.flow.clone(), self.cfg.clone(),
on_connect_data, self.flow.clone(),
peer_addr, on_connect_data,
)), peer_addr,
),
},
}, },
proto => unimplemented!("Unsupported HTTP version: {:?}.", proto), proto => unimplemented!("Unsupported HTTP version: {:?}.", proto),
@ -543,58 +547,67 @@ where
} }
} }
#[pin_project(project = StateProj)] pin_project! {
enum State<T, S, B, X, U> #[project = StateProj]
where enum State<T, S, B, X, U>
T: AsyncRead + AsyncWrite + Unpin, where
T: AsyncRead,
T: AsyncWrite,
T: Unpin,
S: Service<Request>, S: Service<Request>,
S::Future: 'static, S::Future: 'static,
S::Error: Into<Response<BoxBody>>, S::Error: Into<Response<BoxBody>>,
B: MessageBody, B: MessageBody,
B::Error: Into<Box<dyn StdError>>, B::Error: Into<Box<dyn StdError>>,
X: Service<Request, Response = Request>, X: Service<Request, Response = Request>,
X::Error: Into<Response<BoxBody>>, X::Error: Into<Response<BoxBody>>,
U: Service<(Request, Framed<T, h1::Codec>), Response = ()>, U: Service<(Request, Framed<T, h1::Codec>), Response = ()>,
U::Error: fmt::Display, U::Error: fmt::Display,
{ {
H1(#[pin] h1::Dispatcher<T, S, B, X, U>), H1 { #[pin] dispatcher: h1::Dispatcher<T, S, B, X, U> },
H2(#[pin] h2::Dispatcher<T, S, B, X, U>), H2 { #[pin] dispatcher: h2::Dispatcher<T, S, B, X, U> },
H2Handshake( H2Handshake {
Option<( handshake: Option<(
h2::HandshakeWithTimeout<T>, h2::HandshakeWithTimeout<T>,
ServiceConfig, ServiceConfig,
Rc<HttpFlow<S, X, U>>, Rc<HttpFlow<S, X, U>>,
OnConnectData, OnConnectData,
Option<net::SocketAddr>, Option<net::SocketAddr>,
)>, )>,
), },
}
} }
#[pin_project] pin_project! {
pub struct HttpServiceHandlerResponse<T, S, B, X, U> pub struct HttpServiceHandlerResponse<T, S, B, X, U>
where where
T: AsyncRead + AsyncWrite + Unpin, T: AsyncRead,
T: AsyncWrite,
T: Unpin,
S: Service<Request>, S: Service<Request>,
S::Error: Into<Response<BoxBody>> + 'static, S::Error: Into<Response<BoxBody>>,
S::Future: 'static, S::Error: 'static,
S::Response: Into<Response<B>> + 'static, S::Future: 'static,
S::Response: Into<Response<B>>,
S::Response: 'static,
B: MessageBody, B: MessageBody,
B::Error: Into<Box<dyn StdError>>, B::Error: Into<Box<dyn StdError>>,
X: Service<Request, Response = Request>, X: Service<Request, Response = Request>,
X::Error: Into<Response<BoxBody>>, X::Error: Into<Response<BoxBody>>,
U: Service<(Request, Framed<T, h1::Codec>), Response = ()>, U: Service<(Request, Framed<T, h1::Codec>), Response = ()>,
U::Error: fmt::Display, U::Error: fmt::Display,
{ {
#[pin] #[pin]
state: State<T, S, B, X, U>, state: State<T, S, B, X, U>,
}
} }
impl<T, S, B, X, U> Future for HttpServiceHandlerResponse<T, S, B, X, U> impl<T, S, B, X, U> Future for HttpServiceHandlerResponse<T, S, B, X, U>
@ -619,23 +632,24 @@ where
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().project().state.project() { match self.as_mut().project().state.project() {
StateProj::H1(disp) => disp.poll(cx), StateProj::H1 { dispatcher } => dispatcher.poll(cx),
StateProj::H2(disp) => disp.poll(cx), StateProj::H2 { dispatcher } => dispatcher.poll(cx),
StateProj::H2Handshake(data) => { StateProj::H2Handshake { handshake: data } => {
match ready!(Pin::new(&mut data.as_mut().unwrap().0).poll(cx)) { match ready!(Pin::new(&mut data.as_mut().unwrap().0).poll(cx)) {
Ok((conn, timer)) => { Ok((conn, timer)) => {
let (_, cfg, srv, on_connect_data, peer_addr) = let (_, config, flow, on_connect_data, peer_addr) =
data.take().unwrap(); data.take().unwrap();
self.as_mut().project().state.set(State::H2(
h2::Dispatcher::new( self.as_mut().project().state.set(State::H2 {
srv, dispatcher: h2::Dispatcher::new(
flow,
conn, conn,
on_connect_data, on_connect_data,
cfg, config,
peer_addr, peer_addr,
timer, timer,
), ),
)); });
self.poll(cx) self.poll(cx)
} }
Err(err) => { Err(err) => {

View File

@ -4,17 +4,21 @@ use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite, Framed}; use actix_codec::{AsyncRead, AsyncWrite, Framed};
use actix_service::{IntoService, Service}; use actix_service::{IntoService, Service};
use pin_project_lite::pin_project;
use super::{Codec, Frame, Message}; use super::{Codec, Frame, Message};
#[pin_project::pin_project] pin_project! {
pub struct Dispatcher<S, T> pub struct Dispatcher<S, T>
where where
S: Service<Frame, Response = Message> + 'static, S: Service<Frame, Response = Message>,
T: AsyncRead + AsyncWrite, S: 'static,
{ T: AsyncRead,
#[pin] T: AsyncWrite,
inner: inner::Dispatcher<S, T, Codec, Message>, {
#[pin]
inner: inner::Dispatcher<S, T, Codec, Message>,
}
} }
impl<S, T> Dispatcher<S, T> impl<S, T> Dispatcher<S, T>

View File

@ -22,7 +22,7 @@ actix-web = { version = "4.0.0-beta.11", default-features = false }
bytes = "1" bytes = "1"
bytestring = "1" bytestring = "1"
futures-core = { version = "0.3.7", default-features = false } futures-core = { version = "0.3.7", default-features = false }
pin-project = "1.0.0" pin-project-lite = "0.2"
tokio = { version = "1", features = ["sync"] } tokio = { version = "1", features = ["sync"] }
[dev-dependencies] [dev-dependencies]

View File

@ -30,6 +30,7 @@ use actix_web::{
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use bytestring::ByteString; use bytestring::ByteString;
use futures_core::Stream; use futures_core::Stream;
use pin_project_lite::pin_project;
use tokio::sync::oneshot::Sender; use tokio::sync::oneshot::Sender;
/// Perform WebSocket handshake and start actor. /// Perform WebSocket handshake and start actor.
@ -462,13 +463,14 @@ where
} }
} }
#[pin_project::pin_project] pin_project! {
struct WsStream<S> { struct WsStream<S> {
#[pin] #[pin]
stream: S, stream: S,
decoder: Codec, decoder: Codec,
buf: BytesMut, buf: BytesMut,
closed: bool, closed: bool,
}
} }
impl<S> WsStream<S> impl<S> WsStream<S>

View File

@ -1,37 +1,35 @@
use std::cell::RefCell; use std::{cell::RefCell, fmt, future::Future, marker::PhantomData, rc::Rc};
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::rc::Rc;
use actix_http::body::{BoxBody, MessageBody}; use actix_http::{
use actix_http::{Extensions, Request}; body::{BoxBody, MessageBody},
use actix_service::boxed::{self, BoxServiceFactory}; Extensions, Request,
};
use actix_service::{ use actix_service::{
apply, apply_fn_factory, IntoServiceFactory, ServiceFactory, ServiceFactoryExt, Transform, apply, apply_fn_factory, boxed, IntoServiceFactory, ServiceFactory, ServiceFactoryExt,
Transform,
}; };
use futures_util::future::FutureExt as _; use futures_util::future::FutureExt as _;
use crate::app_service::{AppEntry, AppInit, AppRoutingFactory}; use crate::{
use crate::config::ServiceConfig; app_service::{AppEntry, AppInit, AppRoutingFactory},
use crate::data::{Data, DataFactory, FnDataFactory}; config::ServiceConfig,
use crate::dev::ResourceDef; data::{Data, DataFactory, FnDataFactory},
use crate::error::Error; dev::ResourceDef,
use crate::resource::Resource; error::Error,
use crate::route::Route; resource::Resource,
use crate::service::{ route::Route,
AppServiceFactory, HttpServiceFactory, ServiceFactoryWrapper, ServiceRequest, service::{
ServiceResponse, AppServiceFactory, BoxedHttpServiceFactory, HttpServiceFactory, ServiceFactoryWrapper,
ServiceRequest, ServiceResponse,
},
}; };
type HttpNewService = BoxServiceFactory<(), ServiceRequest, ServiceResponse, Error, ()>;
/// Application builder - structure that follows the builder pattern /// Application builder - structure that follows the builder pattern
/// for building application instances. /// for building application instances.
pub struct App<T, B> { pub struct App<T, B> {
endpoint: T, endpoint: T,
services: Vec<Box<dyn AppServiceFactory>>, services: Vec<Box<dyn AppServiceFactory>>,
default: Option<Rc<HttpNewService>>, default: Option<Rc<BoxedHttpServiceFactory>>,
factory_ref: Rc<RefCell<Option<AppRoutingFactory>>>, factory_ref: Rc<RefCell<Option<AppRoutingFactory>>>,
data_factories: Vec<FnDataFactory>, data_factories: Vec<FnDataFactory>,
external: Vec<ResourceDef>, external: Vec<ResourceDef>,
@ -287,7 +285,7 @@ where
/// ); /// );
/// } /// }
/// ``` /// ```
pub fn default_service<F, U>(mut self, f: F) -> Self pub fn default_service<F, U>(mut self, svc: F) -> Self
where where
F: IntoServiceFactory<U, ServiceRequest>, F: IntoServiceFactory<U, ServiceRequest>,
U: ServiceFactory< U: ServiceFactory<
@ -298,10 +296,12 @@ where
> + 'static, > + 'static,
U::InitError: fmt::Debug, U::InitError: fmt::Debug,
{ {
// create and configure default resource let svc = svc
self.default = Some(Rc::new(boxed::factory(f.into_factory().map_init_err( .into_factory()
|e| log::error!("Can not construct default service: {:?}", e), .map(|res| res.map_into_boxed_body())
)))); .map_init_err(|e| log::error!("Can not construct default service: {:?}", e));
self.default = Some(Rc::new(boxed::factory(svc)));
self self
} }

View File

@ -1,14 +1,11 @@
use std::future::Future; use std::future::Future;
use actix_service::{ use actix_service::{boxed, fn_service};
boxed::{self, BoxServiceFactory},
fn_service,
};
use crate::{ use crate::{
body::EitherBody, body::MessageBody,
service::{ServiceRequest, ServiceResponse}, service::{BoxedHttpServiceFactory, ServiceRequest, ServiceResponse},
Error, FromRequest, HttpResponse, Responder, BoxError, FromRequest, HttpResponse, Responder,
}; };
/// A request handler is an async function that accepts zero or more parameters that can be /// A request handler is an async function that accepts zero or more parameters that can be
@ -25,20 +22,14 @@ where
fn call(&self, param: T) -> R; fn call(&self, param: T) -> R;
} }
pub fn handler_service<F, T, R>( pub fn handler_service<F, T, R>(handler: F) -> BoxedHttpServiceFactory
handler: F,
) -> BoxServiceFactory<
(),
ServiceRequest,
ServiceResponse<EitherBody<<R::Output as Responder>::Body>>,
Error,
(),
>
where where
F: Handler<T, R>, F: Handler<T, R>,
T: FromRequest, T: FromRequest,
R: Future, R: Future,
R::Output: Responder, R::Output: Responder,
<R::Output as Responder>::Body: MessageBody,
<<R::Output as Responder>::Body as MessageBody>::Error: Into<BoxError>,
{ {
boxed::factory(fn_service(move |req: ServiceRequest| { boxed::factory(fn_service(move |req: ServiceRequest| {
let handler = handler.clone(); let handler = handler.clone();
@ -46,15 +37,13 @@ where
async move { async move {
let (req, mut payload) = req.into_parts(); let (req, mut payload) = req.into_parts();
let res = match T::from_request(&req, &mut payload).await { let res = match T::from_request(&req, &mut payload).await {
Err(err) => { Err(err) => HttpResponse::from_error(err).map_into_boxed_body(),
HttpResponse::from_error(err).map_body(|_, body| EitherBody::right(body))
}
Ok(data) => handler Ok(data) => handler
.call(data) .call(data)
.await .await
.respond_to(&req) .respond_to(&req)
.map_body(|_, body| EitherBody::left(body)), .map_into_boxed_body(),
}; };
Ok(ServiceResponse::new(req, res)) Ok(ServiceResponse::new(req, res))

View File

@ -115,3 +115,5 @@ pub use crate::scope::Scope;
pub use crate::server::HttpServer; pub use crate::server::HttpServer;
// TODO: is exposing the error directly really needed // TODO: is exposing the error directly really needed
pub use crate::types::{Either, EitherExtractError}; pub use crate::types::{Either, EitherExtractError};
pub(crate) type BoxError = Box<dyn std::error::Error>;

View File

@ -1,4 +1,4 @@
use std::{cell::RefCell, error::Error as StdError, fmt, future::Future, rc::Rc}; use std::{cell::RefCell, fmt, future::Future, rc::Rc};
use actix_http::Extensions; use actix_http::Extensions;
use actix_router::{IntoPatterns, Patterns}; use actix_router::{IntoPatterns, Patterns};
@ -21,7 +21,7 @@ use crate::{
BoxedHttpService, BoxedHttpServiceFactory, HttpServiceFactory, ServiceRequest, BoxedHttpService, BoxedHttpServiceFactory, HttpServiceFactory, ServiceRequest,
ServiceResponse, ServiceResponse,
}, },
Error, FromRequest, HttpResponse, BoxError, Error, FromRequest, HttpResponse,
}; };
/// *Resource* is an entry in resources table which corresponds to requested URL. /// *Resource* is an entry in resources table which corresponds to requested URL.
@ -239,9 +239,8 @@ where
I: FromRequest + 'static, I: FromRequest + 'static,
R: Future + 'static, R: Future + 'static,
R::Output: Responder + 'static, R::Output: Responder + 'static,
<R::Output as Responder>::Body: MessageBody + 'static, <R::Output as Responder>::Body: MessageBody,
<<R::Output as Responder>::Body as MessageBody>::Error: <<R::Output as Responder>::Body as MessageBody>::Error: Into<BoxError>,
Into<Box<dyn StdError + 'static>>,
{ {
self.routes.push(Route::new().to(handler)); self.routes.push(Route::new().to(handler));
self self

View File

@ -1,4 +1,4 @@
use std::error::Error as StdError; use std::borrow::Cow;
use actix_http::{ use actix_http::{
body::{BoxBody, EitherBody, MessageBody}, body::{BoxBody, EitherBody, MessageBody},
@ -6,7 +6,7 @@ use actix_http::{
}; };
use bytes::{Bytes, BytesMut}; use bytes::{Bytes, BytesMut};
use crate::{Error, HttpRequest, HttpResponse, HttpResponseBuilder}; use crate::{BoxError, Error, HttpRequest, HttpResponse, HttpResponseBuilder};
/// Trait implemented by types that can be converted to an HTTP response. /// Trait implemented by types that can be converted to an HTTP response.
/// ///
@ -99,7 +99,7 @@ impl Responder for actix_http::ResponseBuilder {
impl<T> Responder for Option<T> impl<T> Responder for Option<T>
where where
T: Responder, T: Responder,
<T::Body as MessageBody>::Error: Into<Box<dyn StdError + 'static>>, <T::Body as MessageBody>::Error: Into<BoxError>,
{ {
type Body = EitherBody<T::Body>; type Body = EitherBody<T::Body>;
@ -118,7 +118,7 @@ where
impl<T, E> Responder for Result<T, E> impl<T, E> Responder for Result<T, E>
where where
T: Responder, T: Responder,
<T::Body as MessageBody>::Error: Into<Box<dyn StdError + 'static>>, <T::Body as MessageBody>::Error: Into<BoxError>,
E: Into<Error>, E: Into<Error>,
{ {
type Body = EitherBody<T::Body>; type Body = EitherBody<T::Body>;
@ -129,8 +129,8 @@ where
.respond_to(req) .respond_to(req)
.map_body(|_, body| EitherBody::left(body)), .map_body(|_, body| EitherBody::left(body)),
Err(e) => { Err(err) => {
HttpResponse::from_error(e.into()).map_body(|_, body| EitherBody::right(body)) HttpResponse::from_error(err.into()).map_body(|_, body| EitherBody::right(body))
} }
} }
} }
@ -170,25 +170,22 @@ impl_responder_by_forward_into_base_response!(BytesMut);
impl_responder_by_forward_into_base_response!(&'static str); impl_responder_by_forward_into_base_response!(&'static str);
impl_responder_by_forward_into_base_response!(String); impl_responder_by_forward_into_base_response!(String);
// macro_rules! impl_responder { macro_rules! impl_into_string_responder {
// ($res:ty, $body:ty, $ct:path) => { ($res:ty) => {
// impl Responder for $res { impl Responder for $res {
// type Body = $body; type Body = String;
// fn respond_to(self, _: &HttpRequest) -> HttpResponse<Self::Body> { fn respond_to(self, _: &HttpRequest) -> HttpResponse<Self::Body> {
// HttpResponse::Ok().content_type($ct).body(self) let string: String = self.into();
// } let res: actix_http::Response<_> = string.into();
// } res.into()
// }; }
}
};
}
// ($res:ty, $ct:path) => { impl_into_string_responder!(&'_ String);
// impl_responder!($res, $res, $ct); impl_into_string_responder!(Cow<'_, str>);
// };
// }
// impl_responder!(&'_ String, mime::TEXT_PLAIN_UTF_8);
// impl_responder!(Cow<'_, str>, mime::TEXT_PLAIN_UTF_8);
/// Allows overriding status code and headers for a responder. /// Allows overriding status code and headers for a responder.
pub struct CustomResponder<T> { pub struct CustomResponder<T> {
@ -257,7 +254,7 @@ impl<T: Responder> CustomResponder<T> {
impl<T> Responder for CustomResponder<T> impl<T> Responder for CustomResponder<T>
where where
T: Responder, T: Responder,
<T::Body as MessageBody>::Error: Into<Box<dyn StdError + 'static>>, <T::Body as MessageBody>::Error: Into<BoxError>,
{ {
type Body = EitherBody<T::Body>; type Body = EitherBody<T::Body>;
@ -265,7 +262,7 @@ where
let headers = match self.headers { let headers = match self.headers {
Ok(headers) => headers, Ok(headers) => headers,
Err(err) => { Err(err) => {
return HttpResponse::from_error(Error::from(err)) return HttpResponse::from_error(err)
.map_body(|_, body| EitherBody::right(body)) .map_body(|_, body| EitherBody::right(body))
} }
}; };

View File

@ -9,7 +9,7 @@ use std::{
}; };
use actix_http::{ use actix_http::{
body::{BoxBody, MessageBody}, body::{BoxBody, EitherBody, MessageBody},
http::{header::HeaderMap, StatusCode}, http::{header::HeaderMap, StatusCode},
Extensions, Response, ResponseHead, Extensions, Response, ResponseHead,
}; };
@ -228,7 +228,15 @@ impl<B> HttpResponse<B> {
} }
} }
// TODO: old into_body equivalent, maybe // TODO: docs for the body map methods below
pub fn map_into_left_body<R>(self) -> HttpResponse<EitherBody<B, R>> {
self.map_body(|_, body| EitherBody::left(body))
}
pub fn map_into_right_body<L>(self) -> HttpResponse<EitherBody<L, B>> {
self.map_body(|_, body| EitherBody::right(body))
}
pub fn map_into_boxed_body(self) -> HttpResponse<BoxBody> pub fn map_into_boxed_body(self) -> HttpResponse<BoxBody>
where where

View File

@ -1,11 +1,11 @@
#![allow(clippy::rc_buffer)] // inner value is mutated before being shared (`Rc::get_mut`) #![allow(clippy::rc_buffer)] // inner value is mutated before being shared (`Rc::get_mut`)
use std::{error::Error as StdError, future::Future, mem, rc::Rc}; use std::{future::Future, mem, rc::Rc};
use actix_http::http::Method; use actix_http::http::Method;
use actix_service::{ use actix_service::{
boxed::{self, BoxService, BoxServiceFactory}, boxed::{self, BoxService},
Service, ServiceFactory, ServiceFactoryExt, fn_service, Service, ServiceFactory, ServiceFactoryExt,
}; };
use futures_core::future::LocalBoxFuture; use futures_core::future::LocalBoxFuture;
@ -13,8 +13,8 @@ use crate::{
body::MessageBody, body::MessageBody,
guard::{self, Guard}, guard::{self, Guard},
handler::{handler_service, Handler}, handler::{handler_service, Handler},
service::{ServiceRequest, ServiceResponse}, service::{BoxedHttpServiceFactory, ServiceRequest, ServiceResponse},
Error, FromRequest, HttpResponse, Responder, BoxError, Error, FromRequest, HttpResponse, Responder,
}; };
/// Resource route definition /// Resource route definition
@ -22,7 +22,7 @@ use crate::{
/// Route uses builder-like pattern for configuration. /// Route uses builder-like pattern for configuration.
/// If handler is not explicitly set, default *404 Not Found* handler is used. /// If handler is not explicitly set, default *404 Not Found* handler is used.
pub struct Route { pub struct Route {
service: BoxServiceFactory<(), ServiceRequest, ServiceResponse, Error, ()>, service: BoxedHttpServiceFactory,
guards: Rc<Vec<Box<dyn Guard>>>, guards: Rc<Vec<Box<dyn Guard>>>,
} }
@ -31,10 +31,9 @@ impl Route {
#[allow(clippy::new_without_default)] #[allow(clippy::new_without_default)]
pub fn new() -> Route { pub fn new() -> Route {
Route { Route {
// TODO: remove double boxing service: boxed::factory(fn_service(|req: ServiceRequest| async {
service: boxed::factory( Ok(req.into_response(HttpResponse::NotFound()))
handler_service(HttpResponse::NotFound).map(|res| res.map_into_boxed_body()), })),
),
guards: Rc::new(Vec::new()), guards: Rc::new(Vec::new()),
} }
} }
@ -185,13 +184,10 @@ impl Route {
T: FromRequest + 'static, T: FromRequest + 'static,
R: Future + 'static, R: Future + 'static,
R::Output: Responder + 'static, R::Output: Responder + 'static,
<R::Output as Responder>::Body: MessageBody + 'static, <R::Output as Responder>::Body: MessageBody,
<<R::Output as Responder>::Body as MessageBody>::Error: <<R::Output as Responder>::Body as MessageBody>::Error: Into<BoxError>,
Into<Box<dyn StdError + 'static>>,
{ {
// TODO: remove double boxing self.service = handler_service(handler);
self.service =
boxed::factory(handler_service(handler).map(|res| res.map_into_boxed_body()));
self self
} }

View File

@ -27,9 +27,9 @@ use crate::{
Error, HttpRequest, HttpResponse, Error, HttpRequest, HttpResponse,
}; };
pub(crate) type BoxedHttpService = BoxService<ServiceRequest, ServiceResponse, Error>; pub(crate) type BoxedHttpService = BoxService<ServiceRequest, ServiceResponse<BoxBody>, Error>;
pub(crate) type BoxedHttpServiceFactory = pub(crate) type BoxedHttpServiceFactory =
BoxServiceFactory<(), ServiceRequest, ServiceResponse, Error, ()>; BoxServiceFactory<(), ServiceRequest, ServiceResponse<BoxBody>, Error, ()>;
pub trait HttpServiceFactory { pub trait HttpServiceFactory {
fn register(self, config: &mut AppService); fn register(self, config: &mut AppService);

View File

@ -1,7 +1,6 @@
//! For either helper, see [`Either`]. //! For either helper, see [`Either`].
use std::{ use std::{
error::Error as StdError,
future::Future, future::Future,
mem, mem,
pin::Pin, pin::Pin,
@ -15,7 +14,7 @@ use pin_project_lite::pin_project;
use crate::{ use crate::{
body, dev, body, dev,
web::{Form, Json}, web::{Form, Json},
Error, FromRequest, HttpRequest, HttpResponse, Responder, BoxError, Error, FromRequest, HttpRequest, HttpResponse, Responder,
}; };
/// Combines two extractor or responder types into a single type. /// Combines two extractor or responder types into a single type.
@ -145,9 +144,9 @@ impl<L, R> Either<L, R> {
impl<L, R> Responder for Either<L, R> impl<L, R> Responder for Either<L, R>
where where
L: Responder, L: Responder,
<L::Body as dev::MessageBody>::Error: Into<Box<dyn StdError + 'static>>, <L::Body as dev::MessageBody>::Error: Into<BoxError>,
R: Responder, R: Responder,
<R::Body as dev::MessageBody>::Error: Into<Box<dyn StdError + 'static>>, <R::Body as dev::MessageBody>::Error: Into<BoxError>,
{ {
type Body = body::EitherBody<L::Body, R::Body>; type Body = body::EitherBody<L::Body, R::Body>;