propagate into<error> change

This commit is contained in:
Rob Ede 2021-04-22 15:26:38 +01:00
parent 70f5f2ae40
commit f1ab78a44c
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
18 changed files with 82 additions and 40 deletions

View File

@ -72,6 +72,8 @@ impl MessageBody for Body {
Poll::Ready(Some(Ok(mem::take(bin)))) Poll::Ready(Some(Ok(mem::take(bin))))
} }
} }
// TODO: MSRV 1.51: poll_map_err
Body::Message(body) => match ready!(body.as_mut().poll_next(cx)) { Body::Message(body) => match ready!(body.as_mut().poll_next(cx)) {
Some(Err(err)) => Poll::Ready(Some(Err(err.into()))), Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
Some(Ok(val)) => Poll::Ready(Some(Ok(val))), Some(Ok(val)) => Poll::Ready(Some(Ok(val))),

View File

@ -45,7 +45,8 @@ impl MessageBody for () {
impl<T> MessageBody for Box<T> impl<T> MessageBody for Box<T>
where where
T: MessageBody<Error = Error> + Unpin, T: MessageBody + Unpin,
T::Error: Into<Error>,
{ {
type Error = Error; type Error = Error;
@ -57,11 +58,19 @@ where
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Self::Error>>> { ) -> Poll<Option<Result<Bytes, Self::Error>>> {
Pin::new(self.get_mut().as_mut()).poll_next(cx) match ready!(Pin::new(self.get_mut().as_mut()).poll_next(cx)) {
Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
Some(Ok(val)) => Poll::Ready(Some(Ok(val))),
None => Poll::Ready(None),
}
} }
} }
impl<T: MessageBody> MessageBody for Pin<Box<T>> { impl<T> MessageBody for Pin<Box<T>>
where
T: MessageBody,
T::Error: Into<Error>,
{
type Error = Error; type Error = Error;
fn size(&self) -> BodySize { fn size(&self) -> BodySize {
@ -72,7 +81,11 @@ impl<T: MessageBody> MessageBody for Pin<Box<T>> {
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Self::Error>>> { ) -> Poll<Option<Result<Bytes, Self::Error>>> {
self.as_mut().poll_next(cx) match ready!(self.as_mut().poll_next(cx)) {
Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
Some(Ok(val)) => Poll::Ready(Some(Ok(val))),
None => Poll::Ready(None),
}
} }
} }

View File

@ -208,7 +208,7 @@ where
S::Response: Into<Response<B>> + 'static, S::Response: Into<Response<B>> + 'static,
B: MessageBody + 'static, B: MessageBody + 'static,
B: MessageBody<Error = Error>, B::Error: Into<Error>,
{ {
let cfg = ServiceConfig::new( let cfg = ServiceConfig::new(
self.keep_alive, self.keep_alive,
@ -225,11 +225,13 @@ where
/// Finish service configuration and create `HttpService` instance. /// Finish service configuration and create `HttpService` instance.
pub fn finish<F, B>(self, service: F) -> HttpService<T, S, B, X, U> pub fn finish<F, B>(self, service: F) -> HttpService<T, S, B, X, U>
where where
B: MessageBody + 'static,
F: IntoServiceFactory<S, Request>, F: IntoServiceFactory<S, Request>,
S::Error: Into<Error> + 'static, S::Error: Into<Error> + 'static,
S::InitError: fmt::Debug, S::InitError: fmt::Debug,
S::Response: Into<Response<B>> + 'static, S::Response: Into<Response<B>> + 'static,
B: MessageBody + 'static,
B::Error: Into<Error>,
{ {
let cfg = ServiceConfig::new( let cfg = ServiceConfig::new(
self.keep_alive, self.keep_alive,

View File

@ -257,7 +257,8 @@ where
) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>> ) -> LocalBoxFuture<'static, Result<(ResponseHead, Payload), SendRequestError>>
where where
H: Into<RequestHeadType> + 'static, H: Into<RequestHeadType> + 'static,
RB: MessageBody<Error = Error> + 'static, RB: MessageBody + 'static,
RB::Error: Into<Error>,
{ {
Box::pin(async move { Box::pin(async move {
match self { match self {

View File

@ -31,7 +31,8 @@ pub(crate) async fn send_request<Io, B>(
) -> Result<(ResponseHead, Payload), SendRequestError> ) -> Result<(ResponseHead, Payload), SendRequestError>
where where
Io: ConnectionIo, Io: ConnectionIo,
B: MessageBody<Error = Error>, B: MessageBody,
B::Error: Into<Error>,
{ {
// set request host header // set request host header
if !head.as_ref().headers.contains_key(HOST) if !head.as_ref().headers.contains_key(HOST)
@ -153,7 +154,8 @@ pub(crate) async fn send_body<Io, B>(
) -> Result<(), SendRequestError> ) -> Result<(), SendRequestError>
where where
Io: ConnectionIo, Io: ConnectionIo,
B: MessageBody<Error = Error>, B: MessageBody,
B::Error: Into<Error>,
{ {
actix_rt::pin!(body); actix_rt::pin!(body);
@ -161,9 +163,10 @@ where
while !eof { while !eof {
while !eof && !framed.as_ref().is_write_buf_full() { while !eof && !framed.as_ref().is_write_buf_full() {
match poll_fn(|cx| body.as_mut().poll_next(cx)).await { match poll_fn(|cx| body.as_mut().poll_next(cx)).await {
Some(result) => { Some(Ok(chunk)) => {
framed.as_mut().write(h1::Message::Chunk(Some(result?)))?; framed.as_mut().write(h1::Message::Chunk(Some(chunk)))?;
} }
Some(Err(err)) => return Err(err.into().into()),
None => { None => {
eof = true; eof = true;
framed.as_mut().write(h1::Message::Chunk(None))?; framed.as_mut().write(h1::Message::Chunk(None))?;

View File

@ -30,7 +30,8 @@ pub(crate) async fn send_request<Io, B>(
) -> Result<(ResponseHead, Payload), SendRequestError> ) -> Result<(ResponseHead, Payload), SendRequestError>
where where
Io: ConnectionIo, Io: ConnectionIo,
B: MessageBody<Error = Error>, B: MessageBody,
B::Error: Into<Error>,
{ {
trace!("Sending client request: {:?} {:?}", head, body.size()); trace!("Sending client request: {:?} {:?}", head, body.size());
@ -135,7 +136,8 @@ async fn send_body<B>(
mut send: SendStream<Bytes>, mut send: SendStream<Bytes>,
) -> Result<(), SendRequestError> ) -> Result<(), SendRequestError>
where where
B: MessageBody<Error = Error>, B: MessageBody,
B::Error: Into<Error>,
{ {
let mut buf = None; let mut buf = None;
actix_rt::pin!(body); actix_rt::pin!(body);
@ -146,7 +148,7 @@ where
send.reserve_capacity(b.len()); send.reserve_capacity(b.len());
buf = Some(b); buf = Some(b);
} }
Some(Err(e)) => return Err(e.into()), Some(Err(e)) => return Err(e.into().into()),
None => { None => {
if let Err(e) = send.send_data(Bytes::new(), true) { if let Err(e) = send.send_data(Bytes::new(), true) {
return Err(e.into()); return Err(e.into());

View File

@ -66,7 +66,7 @@ where
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
B: MessageBody, B: MessageBody,
B: MessageBody<Error = Error>, B::Error: Into<Error>,
X: ServiceFactory<Request, Config = (), Response = Request>, X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static, X::Future: 'static,
@ -115,7 +115,7 @@ mod openssl {
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
B: MessageBody, B: MessageBody,
B: MessageBody<Error = Error>, B::Error: Into<Error>,
X: ServiceFactory<Request, Config = (), Response = Request>, X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static, X::Future: 'static,
@ -175,7 +175,7 @@ mod rustls {
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
B: MessageBody, B: MessageBody,
B: MessageBody<Error = Error>, B::Error: Into<Error>,
X: ServiceFactory<Request, Config = (), Response = Request>, X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static, X::Future: 'static,
@ -273,7 +273,7 @@ where
S::InitError: fmt::Debug, S::InitError: fmt::Debug,
B: MessageBody, B: MessageBody,
B: MessageBody<Error = Error>, B::Error: Into<Error>,
X: ServiceFactory<Request, Config = (), Response = Request>, X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static, X::Future: 'static,
@ -342,7 +342,7 @@ where
S::Response: Into<Response<B>>, S::Response: Into<Response<B>>,
B: MessageBody, B: MessageBody,
B: MessageBody<Error = Error>, B::Error: Into<Error>,
X: Service<Request, Response = Request>, X: Service<Request, Response = Request>,
X::Error: Into<Error>, X::Error: Into<Error>,

View File

@ -22,7 +22,7 @@ pub struct SendResponse<T, B> {
impl<T, B> SendResponse<T, B> impl<T, B> SendResponse<T, B>
where where
B: MessageBody, B: MessageBody,
B: MessageBody<Error = Error>, B::Error: Into<Error>,
{ {
pub fn new(framed: Framed<T, Codec>, response: Response<B>) -> Self { pub fn new(framed: Framed<T, Codec>, response: Response<B>) -> Self {
let (res, body) = response.into_parts(); let (res, body) = response.into_parts();
@ -39,7 +39,7 @@ impl<T, B> Future for SendResponse<T, B>
where where
T: AsyncRead + AsyncWrite + Unpin, T: AsyncRead + AsyncWrite + Unpin,
B: MessageBody + Unpin, B: MessageBody + Unpin,
B: MessageBody<Error = Error>, B::Error: Into<Error>,
{ {
type Output = Result<Framed<T, Codec>, Error>; type Output = Result<Framed<T, Codec>, Error>;

View File

@ -76,7 +76,7 @@ where
S::Response: Into<Response<B>> + 'static, S::Response: Into<Response<B>> + 'static,
B: MessageBody + 'static, B: MessageBody + 'static,
B::Error: Into<Error> + 'static, B::Error: Into<Error>,
{ {
type Output = Result<(), DispatchError>; type Output = Result<(), DispatchError>;

View File

@ -42,7 +42,7 @@ where
<S::Service as Service<Request>>::Future: 'static, <S::Service as Service<Request>>::Future: 'static,
B: MessageBody + 'static, B: MessageBody + 'static,
B: MessageBody<Error = Error>, B::Error: Into<Error>,
{ {
/// Create new `H2Service` instance with config. /// Create new `H2Service` instance with config.
pub(crate) fn with_config<F: IntoServiceFactory<S, Request>>( pub(crate) fn with_config<F: IntoServiceFactory<S, Request>>(
@ -73,7 +73,7 @@ where
<S::Service as Service<Request>>::Future: 'static, <S::Service as Service<Request>>::Future: 'static,
B: MessageBody + 'static, B: MessageBody + 'static,
B: MessageBody<Error = Error>, B::Error: Into<Error>,
{ {
/// Create plain TCP based service /// Create plain TCP based service
pub fn tcp( pub fn tcp(
@ -112,7 +112,7 @@ mod openssl {
<S::Service as Service<Request>>::Future: 'static, <S::Service as Service<Request>>::Future: 'static,
B: MessageBody + 'static, B: MessageBody + 'static,
B: MessageBody<Error = Error>, B::Error: Into<Error>,
{ {
/// Create OpenSSL based service /// Create OpenSSL based service
pub fn openssl( pub fn openssl(
@ -158,7 +158,7 @@ mod rustls {
<S::Service as Service<Request>>::Future: 'static, <S::Service as Service<Request>>::Future: 'static,
B: MessageBody + 'static, B: MessageBody + 'static,
B: MessageBody<Error = Error>, B::Error: Into<Error>,
{ {
/// Create Rustls based service /// Create Rustls based service
pub fn rustls( pub fn rustls(
@ -201,7 +201,7 @@ where
<S::Service as Service<Request>>::Future: 'static, <S::Service as Service<Request>>::Future: 'static,
B: MessageBody + 'static, B: MessageBody + 'static,
B: MessageBody<Error = Error>, B::Error: Into<Error>,
{ {
type Response = (); type Response = ();
type Error = DispatchError; type Error = DispatchError;
@ -263,7 +263,7 @@ where
S::Future: 'static, S::Future: 'static,
S::Response: Into<Response<B>> + 'static, S::Response: Into<Response<B>> + 'static,
B: MessageBody + 'static, B: MessageBody + 'static,
B: MessageBody<Error = Error>, B::Error: Into<Error>,
{ {
type Response = (); type Response = ();
type Error = DispatchError; type Error = DispatchError;
@ -327,7 +327,8 @@ where
S::Error: Into<Error> + 'static, S::Error: Into<Error> + 'static,
S::Future: 'static, S::Future: 'static,
S::Response: Into<Response<B>> + 'static, S::Response: Into<Response<B>> + 'static,
B: MessageBody<Error = Error>, B: MessageBody,
B::Error: Into<Error>,
{ {
type Output = Result<(), DispatchError>; type Output = Result<(), DispatchError>;

View File

@ -244,7 +244,8 @@ impl<B> Response<B> {
impl<B> fmt::Debug for Response<B> impl<B> fmt::Debug for Response<B>
where where
B: MessageBody<Error = Error>, B: MessageBody,
B::Error: Into<Error>,
{ {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let res = writeln!( let res = writeln!(

View File

@ -59,6 +59,7 @@ where
S::Response: Into<Response<B>> + 'static, S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static, <S::Service as Service<Request>>::Future: 'static,
B: MessageBody + 'static, B: MessageBody + 'static,
B::Error: Into<Error>,
{ {
/// Create new `HttpService` instance. /// Create new `HttpService` instance.
pub fn new<F: IntoServiceFactory<S, Request>>(service: F) -> Self { pub fn new<F: IntoServiceFactory<S, Request>>(service: F) -> Self {

View File

@ -113,8 +113,10 @@ pub trait MapServiceResponseBody {
fn map_body(self) -> ServiceResponse; fn map_body(self) -> ServiceResponse;
} }
impl<B: MessageBody<Error = Error> + Unpin + 'static> MapServiceResponseBody impl<B> MapServiceResponseBody for ServiceResponse<B>
for ServiceResponse<B> where
B: MessageBody + Unpin + 'static,
B::Error: Into<Error>,
{ {
fn map_body(self) -> ServiceResponse { fn map_body(self) -> ServiceResponse {
self.map_body(|_, body| ResponseBody::Other(Body::from_message(body))) self.map_body(|_, body| ResponseBody::Other(Body::from_message(body)))

View File

@ -326,7 +326,11 @@ impl<B> PinnedDrop for StreamLog<B> {
} }
} }
impl<B: MessageBody<Error = Error>> MessageBody for StreamLog<B> { impl<B> MessageBody for StreamLog<B>
where
B: MessageBody,
B::Error: Into<Error>,
{
type Error = Error; type Error = Error;
fn size(&self) -> BodySize { fn size(&self) -> BodySize {

View File

@ -243,7 +243,11 @@ impl<B> HttpResponse<B> {
} }
} }
impl<B: MessageBody<Error = Error>> fmt::Debug for HttpResponse<B> { impl<B> fmt::Debug for HttpResponse<B>
where
B: MessageBody,
B::Error: Into<Error>,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("HttpResponse") f.debug_struct("HttpResponse")
.field("error", &self.error) .field("error", &self.error)

View File

@ -80,7 +80,8 @@ where
<S::Service as Service<Request>>::Future: 'static, <S::Service as Service<Request>>::Future: 'static,
S::Service: 'static, S::Service: 'static,
// S::Service: 'static, // S::Service: 'static,
B: MessageBody<Error = Error> + 'static, B: MessageBody + 'static,
B::Error: Into<Error>,
{ {
/// Create new HTTP server with application factory /// Create new HTTP server with application factory
pub fn new(factory: F) -> Self { pub fn new(factory: F) -> Self {

View File

@ -445,7 +445,8 @@ impl<B> From<ServiceResponse<B>> for Response<B> {
impl<B> fmt::Debug for ServiceResponse<B> impl<B> fmt::Debug for ServiceResponse<B>
where where
B: MessageBody<Error = Error>, B: MessageBody,
B::Error: Into<Error>,
{ {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let res = writeln!( let res = writeln!(

View File

@ -150,7 +150,8 @@ where
pub async fn read_response<S, B>(app: &S, req: Request) -> Bytes pub async fn read_response<S, B>(app: &S, req: Request) -> Bytes
where where
S: Service<Request, Response = ServiceResponse<B>, Error = Error>, S: Service<Request, Response = ServiceResponse<B>, Error = Error>,
B: MessageBody<Error = Error> + Unpin, B: MessageBody + Unpin,
B::Error: Into<Error>,
{ {
let mut resp = app let mut resp = app
.call(req) .call(req)
@ -195,7 +196,8 @@ where
/// ``` /// ```
pub async fn read_body<B>(mut res: ServiceResponse<B>) -> Bytes pub async fn read_body<B>(mut res: ServiceResponse<B>) -> Bytes
where where
B: MessageBody<Error = Error> + Unpin, B: MessageBody + Unpin,
B::Error: Into<Error>,
{ {
let mut body = res.take_body(); let mut body = res.take_body();
let mut bytes = BytesMut::new(); let mut bytes = BytesMut::new();
@ -244,7 +246,8 @@ where
/// ``` /// ```
pub async fn read_body_json<T, B>(res: ServiceResponse<B>) -> T pub async fn read_body_json<T, B>(res: ServiceResponse<B>) -> T
where where
B: MessageBody<Error = Error> + Unpin, B: MessageBody + Unpin,
B::Error: Into<Error>,
T: DeserializeOwned, T: DeserializeOwned,
{ {
let body = read_body(res).await; let body = read_body(res).await;
@ -305,7 +308,8 @@ where
pub async fn read_response_json<S, B, T>(app: &S, req: Request) -> T pub async fn read_response_json<S, B, T>(app: &S, req: Request) -> T
where where
S: Service<Request, Response = ServiceResponse<B>, Error = Error>, S: Service<Request, Response = ServiceResponse<B>, Error = Error>,
B: MessageBody<Error = Error> + Unpin, B: MessageBody + Unpin,
B::Error: Into<Error>,
T: DeserializeOwned, T: DeserializeOwned,
{ {
let body = read_response(app, req).await; let body = read_response(app, req).await;