change stream type errors

This commit is contained in:
Rob Ede 2021-05-18 14:48:04 +01:00
parent 18106170ce
commit ea6f0cd49a
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
14 changed files with 89 additions and 62 deletions

View File

@ -11,6 +11,7 @@
* `ServiceResponse::take_body`. [#2201] * `ServiceResponse::take_body`. [#2201]
* `ServiceResponse::map_body` closure receives and returns `B` instead of `ResponseBody<B>` types. [#2201] * `ServiceResponse::map_body` closure receives and returns `B` instead of `ResponseBody<B>` types. [#2201]
* All error trait bounds in server service builders have changed from `Into<Error>` to `Into<Response<AnyBody>>`. [#2224] * All error trait bounds in server service builders have changed from `Into<Error>` to `Into<Response<AnyBody>>`. [#2224]
* All error trait bounds in message body and stream impls changed from `Into<Error>` to `Into<Box<dyn std::error::Error>>`. [#2224]
### Removed ### Removed
* `HttpResponse::take_body` and old `HttpResponse::into_body` method that casted body type. [#2201] * `HttpResponse::take_body` and old `HttpResponse::into_body` method that casted body type. [#2201]

View File

@ -13,6 +13,7 @@
### Changed ### Changed
* The `MessageBody` trait now has an associated `Error` type. [#2183] * The `MessageBody` trait now has an associated `Error` type. [#2183]
* All error trait bounds in server service builders have changed from `Into<Error>` to `Into<Response<AnyBody>>`. [#2224] * All error trait bounds in server service builders have changed from `Into<Error>` to `Into<Response<AnyBody>>`. [#2224]
* All error trait bounds in message body and stream impls changed from `Into<Error>` to `Into<Box<dyn std::error::Error>>`. [#2224]
* Places in `Response` where `ResponseBody<B>` was received or returned now simply use `B`. [#2201] * Places in `Response` where `ResponseBody<B>` was received or returned now simply use `B`. [#2201]
* `header` mod is now public. [#2171] * `header` mod is now public. [#2171]
* `uri` mod is now public. [#2171] * `uri` mod is now public. [#2171]

View File

@ -174,7 +174,7 @@ where
impl<S, E> From<BodyStream<S>> for AnyBody impl<S, E> From<BodyStream<S>> for AnyBody
where where
S: Stream<Item = Result<Bytes, E>> + 'static, S: Stream<Item = Result<Bytes, E>> + 'static,
E: Into<Error> + 'static, E: Into<Box<dyn StdError>> + 'static,
{ {
fn from(s: BodyStream<S>) -> Body { fn from(s: BodyStream<S>) -> Body {
AnyBody::from_message(s) AnyBody::from_message(s)

View File

@ -1,4 +1,5 @@
use std::{ use std::{
error::Error as StdError,
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
@ -7,8 +8,6 @@ use bytes::Bytes;
use futures_core::{ready, Stream}; use futures_core::{ready, Stream};
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use crate::error::Error;
use super::{BodySize, MessageBody}; use super::{BodySize, MessageBody};
pin_project! { pin_project! {
@ -24,7 +23,7 @@ pin_project! {
impl<S, E> BodyStream<S> impl<S, E> BodyStream<S>
where where
S: Stream<Item = Result<Bytes, E>>, S: Stream<Item = Result<Bytes, E>>,
E: Into<Error>, E: Into<Box<dyn StdError>> + 'static,
{ {
pub fn new(stream: S) -> Self { pub fn new(stream: S) -> Self {
BodyStream { stream } BodyStream { stream }
@ -34,9 +33,9 @@ where
impl<S, E> MessageBody for BodyStream<S> impl<S, E> MessageBody for BodyStream<S>
where where
S: Stream<Item = Result<Bytes, E>>, S: Stream<Item = Result<Bytes, E>>,
E: Into<Error>, E: Into<Box<dyn StdError>> + 'static,
{ {
type Error = Error; type Error = E;
fn size(&self) -> BodySize { fn size(&self) -> BodySize {
BodySize::Stream BodySize::Stream
@ -56,7 +55,7 @@ where
let chunk = match ready!(stream.poll_next(cx)) { let chunk = match ready!(stream.poll_next(cx)) {
Some(Ok(ref bytes)) if bytes.is_empty() => continue, Some(Ok(ref bytes)) if bytes.is_empty() => continue,
opt => opt.map(|res| res.map_err(Into::into)), opt => opt,
}; };
return Poll::Ready(chunk); return Poll::Ready(chunk);
@ -66,6 +65,8 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::convert::Infallible;
use actix_rt::pin; use actix_rt::pin;
use actix_utils::future::poll_fn; use actix_utils::future::poll_fn;
use futures_util::stream; use futures_util::stream;
@ -78,7 +79,7 @@ mod tests {
let body = BodyStream::new(stream::iter( let body = BodyStream::new(stream::iter(
["1", "", "2"] ["1", "", "2"]
.iter() .iter()
.map(|&v| Ok(Bytes::from(v)) as Result<Bytes, ()>), .map(|&v| Ok::<_, Infallible>(Bytes::from(v))),
)); ));
pin!(body); pin!(body);
@ -103,7 +104,7 @@ mod tests {
let body = BodyStream::new(stream::iter( let body = BodyStream::new(stream::iter(
["1", "", "2"] ["1", "", "2"]
.iter() .iter()
.map(|&v| Ok(Bytes::from(v)) as Result<Bytes, ()>), .map(|&v| Ok::<_, Infallible>(Bytes::from(v))),
)); ));
assert_eq!(to_bytes(body).await.ok(), Some(Bytes::from("12"))); assert_eq!(to_bytes(body).await.ok(), Some(Bytes::from("12")));

View File

@ -1,4 +1,5 @@
use std::{ use std::{
error::Error as StdError,
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
@ -7,15 +8,13 @@ use bytes::Bytes;
use futures_core::{ready, Stream}; use futures_core::{ready, Stream};
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use crate::error::Error;
use super::{BodySize, MessageBody}; use super::{BodySize, MessageBody};
pin_project! { pin_project! {
/// Known sized streaming response wrapper. /// Known sized streaming response wrapper.
/// ///
/// This body implementation should be used if total size of stream is known. Data get sent as is /// This body implementation should be used if total size of stream is known. Data is sent as-is
/// without using transfer encoding. /// without using chunked transfer encoding.
pub struct SizedStream<S> { pub struct SizedStream<S> {
size: u64, size: u64,
#[pin] #[pin]
@ -23,20 +22,22 @@ pin_project! {
} }
} }
impl<S> SizedStream<S> impl<S, E> SizedStream<S>
where where
S: Stream<Item = Result<Bytes, Error>>, S: Stream<Item = Result<Bytes, E>>,
E: Into<Box<dyn StdError>> + 'static,
{ {
pub fn new(size: u64, stream: S) -> Self { pub fn new(size: u64, stream: S) -> Self {
SizedStream { size, stream } SizedStream { size, stream }
} }
} }
impl<S> MessageBody for SizedStream<S> impl<S, E> MessageBody for SizedStream<S>
where where
S: Stream<Item = Result<Bytes, Error>>, S: Stream<Item = Result<Bytes, E>>,
E: Into<Box<dyn StdError>> + 'static,
{ {
type Error = Error; type Error = E;
fn size(&self) -> BodySize { fn size(&self) -> BodySize {
BodySize::Sized(self.size as u64) BodySize::Sized(self.size as u64)
@ -66,6 +67,8 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::convert::Infallible;
use actix_rt::pin; use actix_rt::pin;
use actix_utils::future::poll_fn; use actix_utils::future::poll_fn;
use futures_util::stream; use futures_util::stream;
@ -77,7 +80,11 @@ mod tests {
async fn skips_empty_chunks() { async fn skips_empty_chunks() {
let body = SizedStream::new( let body = SizedStream::new(
2, 2,
stream::iter(["1", "", "2"].iter().map(|&v| Ok(Bytes::from(v)))), stream::iter(
["1", "", "2"]
.iter()
.map(|&v| Ok::<_, Infallible>(Bytes::from(v))),
),
); );
pin!(body); pin!(body);
@ -103,7 +110,11 @@ mod tests {
async fn read_to_bytes() { async fn read_to_bytes() {
let body = SizedStream::new( let body = SizedStream::new(
2, 2,
stream::iter(["1", "", "2"].iter().map(|&v| Ok(Bytes::from(v)))), stream::iter(
["1", "", "2"]
.iter()
.map(|&v| Ok::<_, Infallible>(Bytes::from(v))),
),
); );
assert_eq!(to_bytes(body).await.ok(), Some(Bytes::from("12"))); assert_eq!(to_bytes(body).await.ok(), Some(Bytes::from("12")));

View File

@ -90,7 +90,7 @@ impl fmt::Debug for Error {
} }
} }
impl std::error::Error for Error { impl StdError for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
None None
} }

View File

@ -2,6 +2,7 @@
use std::{ use std::{
cell::{Ref, RefMut}, cell::{Ref, RefMut},
error::Error as StdError,
fmt, fmt,
future::Future, future::Future,
pin::Pin, pin::Pin,
@ -259,7 +260,7 @@ impl ResponseBuilder {
pub fn streaming<S, E>(&mut self, stream: S) -> Response<Body> pub fn streaming<S, E>(&mut self, stream: S) -> Response<Body>
where where
S: Stream<Item = Result<Bytes, E>> + 'static, S: Stream<Item = Result<Bytes, E>> + 'static,
E: Into<Error> + 'static, E: Into<Box<dyn StdError>> + 'static,
{ {
self.body(Body::from_message(BodyStream::new(stream))) self.body(Body::from_message(BodyStream::new(stream)))
} }

View File

@ -2,7 +2,7 @@
extern crate tls_openssl as openssl; extern crate tls_openssl as openssl;
use std::io; use std::{convert::Infallible, io};
use actix_http::{ use actix_http::{
body::{AnyBody, Body, SizedStream}, body::{AnyBody, Body, SizedStream},
@ -330,9 +330,12 @@ async fn test_h2_head_binary2() {
async fn test_h2_body_length() { async fn test_h2_body_length() {
let mut srv = test_server(move || { let mut srv = test_server(move || {
HttpService::build() HttpService::build()
.h2(|_| { .h2(|_| async {
let body = once(ok(Bytes::from_static(STR.as_ref()))); let body = once(async {
ok::<_, ()>( Ok::<_, Infallible>(Bytes::from_static(STR.as_ref()))
});
Ok::<_, Infallible>(
Response::ok().set_body(SizedStream::new(STR.len() as u64, body)), Response::ok().set_body(SizedStream::new(STR.len() as u64, body)),
) )
}) })

View File

@ -2,6 +2,11 @@
extern crate tls_rustls as rustls; extern crate tls_rustls as rustls;
use std::{
convert::Infallible,
io::{self, BufReader},
};
use actix_http::{ use actix_http::{
body::{AnyBody, Body, SizedStream}, body::{AnyBody, Body, SizedStream},
error::PayloadError, error::PayloadError,
@ -23,8 +28,6 @@ use rustls::{
NoClientAuth, ServerConfig as RustlsServerConfig, NoClientAuth, ServerConfig as RustlsServerConfig,
}; };
use std::io::{self, BufReader};
async fn load_body<S>(mut stream: S) -> Result<BytesMut, PayloadError> async fn load_body<S>(mut stream: S) -> Result<BytesMut, PayloadError>
where where
S: Stream<Item = Result<Bytes, PayloadError>> + Unpin, S: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
@ -347,7 +350,7 @@ async fn test_h2_body_length() {
let mut srv = test_server(move || { let mut srv = test_server(move || {
HttpService::build() HttpService::build()
.h2(|_| { .h2(|_| {
let body = once(ok(Bytes::from_static(STR.as_ref()))); let body = once(ok::<_, Infallible>(Bytes::from_static(STR.as_ref())));
ok::<_, ()>( ok::<_, ()>(
Response::ok().set_body(SizedStream::new(STR.len() as u64, body)), Response::ok().set_body(SizedStream::new(STR.len() as u64, body)),
) )

View File

@ -1,6 +1,9 @@
use std::io::{Read, Write}; use std::{
use std::time::Duration; convert::Infallible,
use std::{net, thread}; io::{Read, Write},
net, thread,
time::Duration,
};
use actix_http::{ use actix_http::{
body::{AnyBody, Body, SizedStream}, body::{AnyBody, Body, SizedStream},
@ -557,7 +560,7 @@ async fn test_h1_body_length() {
let mut srv = test_server(|| { let mut srv = test_server(|| {
HttpService::build() HttpService::build()
.h1(|_| { .h1(|_| {
let body = once(ok(Bytes::from_static(STR.as_ref()))); let body = once(ok::<_, Infallible>(Bytes::from_static(STR.as_ref())));
ok::<_, ()>( ok::<_, ()>(
Response::ok().set_body(SizedStream::new(STR.len() as u64, body)), Response::ok().set_body(SizedStream::new(STR.len() as u64, body)),
) )

View File

@ -1,21 +1,21 @@
use std::convert::TryFrom; use std::{convert::TryFrom, error::Error as StdError, net, rc::Rc, time::Duration};
use std::net;
use std::rc::Rc;
use std::time::Duration;
use bytes::Bytes; use bytes::Bytes;
use futures_core::Stream; use futures_core::Stream;
use serde::Serialize; use serde::Serialize;
use actix_http::body::Body; use actix_http::{
use actix_http::http::header::IntoHeaderValue; body::Body,
use actix_http::http::{Error as HttpError, HeaderMap, HeaderName, Method, Uri}; http::{header::IntoHeaderValue, Error as HttpError, HeaderMap, HeaderName, Method, Uri},
use actix_http::{Error, RequestHead}; RequestHead,
};
use crate::sender::{RequestSender, SendClientRequest}; use crate::{
use crate::ClientConfig; sender::{RequestSender, SendClientRequest},
ClientConfig,
};
/// `FrozenClientRequest` struct represents clonable client request. /// `FrozenClientRequest` struct represents cloneable client request.
/// It could be used to send same request multiple times. /// It could be used to send same request multiple times.
#[derive(Clone)] #[derive(Clone)]
pub struct FrozenClientRequest { pub struct FrozenClientRequest {
@ -82,7 +82,7 @@ impl FrozenClientRequest {
pub fn send_stream<S, E>(&self, stream: S) -> SendClientRequest pub fn send_stream<S, E>(&self, stream: S) -> SendClientRequest
where where
S: Stream<Item = Result<Bytes, E>> + Unpin + 'static, S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
E: Into<Error> + 'static, E: Into<Box<dyn StdError>> + 'static,
{ {
RequestSender::Rc(self.head.clone(), None).send_stream( RequestSender::Rc(self.head.clone(), None).send_stream(
self.addr, self.addr,
@ -207,7 +207,7 @@ impl FrozenSendBuilder {
pub fn send_stream<S, E>(self, stream: S) -> SendClientRequest pub fn send_stream<S, E>(self, stream: S) -> SendClientRequest
where where
S: Stream<Item = Result<Bytes, E>> + Unpin + 'static, S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
E: Into<Error> + 'static, E: Into<Box<dyn StdError>> + 'static,
{ {
if let Some(e) = self.err { if let Some(e) = self.err {
return e.into(); return e.into();

View File

@ -1,25 +1,26 @@
use std::convert::TryFrom; use std::{convert::TryFrom, error::Error as StdError, fmt, net, rc::Rc, time::Duration};
use std::rc::Rc;
use std::time::Duration;
use std::{fmt, net};
use bytes::Bytes; use bytes::Bytes;
use futures_core::Stream; use futures_core::Stream;
use serde::Serialize; use serde::Serialize;
use actix_http::body::Body; use actix_http::{
use actix_http::http::header::{self, IntoHeaderPair}; body::Body,
use actix_http::http::{ http::{
uri, ConnectionType, Error as HttpError, HeaderMap, HeaderValue, Method, Uri, Version, header::{self, IntoHeaderPair},
uri, ConnectionType, Error as HttpError, HeaderMap, HeaderValue, Method, Uri, Version,
},
RequestHead,
}; };
use actix_http::{Error, RequestHead};
#[cfg(feature = "cookies")] #[cfg(feature = "cookies")]
use crate::cookie::{Cookie, CookieJar}; use crate::cookie::{Cookie, CookieJar};
use crate::error::{FreezeRequestError, InvalidUrl}; use crate::{
use crate::frozen::FrozenClientRequest; error::{FreezeRequestError, InvalidUrl},
use crate::sender::{PrepForSendingError, RequestSender, SendClientRequest}; frozen::FrozenClientRequest,
use crate::ClientConfig; sender::{PrepForSendingError, RequestSender, SendClientRequest},
ClientConfig,
};
#[cfg(feature = "compress")] #[cfg(feature = "compress")]
const HTTPS_ENCODING: &str = "br, gzip, deflate"; const HTTPS_ENCODING: &str = "br, gzip, deflate";
@ -408,7 +409,7 @@ impl ClientRequest {
pub fn send_stream<S, E>(self, stream: S) -> SendClientRequest pub fn send_stream<S, E>(self, stream: S) -> SendClientRequest
where where
S: Stream<Item = Result<Bytes, E>> + Unpin + 'static, S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
E: Into<Error> + 'static, E: Into<Box<dyn StdError>> + 'static,
{ {
let slf = match self.prep_for_sending() { let slf = match self.prep_for_sending() {
Ok(slf) => slf, Ok(slf) => slf,

View File

@ -1,4 +1,5 @@
use std::{ use std::{
error::Error as StdError,
future::Future, future::Future,
io, net, io, net,
pin::Pin, pin::Pin,
@ -266,7 +267,7 @@ impl RequestSender {
) -> SendClientRequest ) -> SendClientRequest
where where
S: Stream<Item = Result<Bytes, E>> + Unpin + 'static, S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
E: Into<Error> + 'static, E: Into<Box<dyn StdError>> + 'static,
{ {
self.send_body( self.send_body(
addr, addr,

View File

@ -1,6 +1,7 @@
use std::{ use std::{
cell::{Ref, RefMut}, cell::{Ref, RefMut},
convert::TryInto, convert::TryInto,
error::Error as StdError,
future::Future, future::Future,
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
@ -354,7 +355,7 @@ impl HttpResponseBuilder {
pub fn streaming<S, E>(&mut self, stream: S) -> HttpResponse pub fn streaming<S, E>(&mut self, stream: S) -> HttpResponse
where where
S: Stream<Item = Result<Bytes, E>> + Unpin + 'static, S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
E: Into<Error> + 'static, E: Into<Box<dyn StdError>> + 'static,
{ {
self.body(Body::from_message(BodyStream::new(stream))) self.body(Body::from_message(BodyStream::new(stream)))
} }