mirror of https://github.com/fafhrd91/actix-web
introduce encoder error type
This commit is contained in:
parent
4261cab3a5
commit
ea379d532a
|
@ -1,6 +1,7 @@
|
||||||
//! Stream encoders.
|
//! Stream encoders.
|
||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
|
error::Error as StdError,
|
||||||
future::Future,
|
future::Future,
|
||||||
io::{self, Write as _},
|
io::{self, Write as _},
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
|
@ -10,6 +11,7 @@ use std::{
|
||||||
use actix_rt::task::{spawn_blocking, JoinHandle};
|
use actix_rt::task::{spawn_blocking, JoinHandle};
|
||||||
use brotli2::write::BrotliEncoder;
|
use brotli2::write::BrotliEncoder;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
|
use derive_more::Display;
|
||||||
use flate2::write::{GzEncoder, ZlibEncoder};
|
use flate2::write::{GzEncoder, ZlibEncoder};
|
||||||
use futures_core::ready;
|
use futures_core::ready;
|
||||||
use pin_project::pin_project;
|
use pin_project::pin_project;
|
||||||
|
@ -95,8 +97,12 @@ enum EncoderBody<B> {
|
||||||
BoxedStream(BoxAnyBody),
|
BoxedStream(BoxAnyBody),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<B: MessageBody<Error = Error>> MessageBody for EncoderBody<B> {
|
impl<B> MessageBody for EncoderBody<B>
|
||||||
type Error = Error;
|
where
|
||||||
|
B: MessageBody,
|
||||||
|
B::Error: Into<Error>,
|
||||||
|
{
|
||||||
|
type Error = EncoderError<B::Error>;
|
||||||
|
|
||||||
fn size(&self) -> BodySize {
|
fn size(&self) -> BodySize {
|
||||||
match self {
|
match self {
|
||||||
|
@ -109,7 +115,7 @@ impl<B: MessageBody<Error = Error>> MessageBody for EncoderBody<B> {
|
||||||
fn poll_next(
|
fn poll_next(
|
||||||
self: Pin<&mut Self>,
|
self: Pin<&mut Self>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
) -> Poll<Option<Result<Bytes, Error>>> {
|
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
||||||
match self.project() {
|
match self.project() {
|
||||||
EncoderBodyProj::Bytes(b) => {
|
EncoderBodyProj::Bytes(b) => {
|
||||||
if b.is_empty() {
|
if b.is_empty() {
|
||||||
|
@ -118,10 +124,17 @@ impl<B: MessageBody<Error = Error>> MessageBody for EncoderBody<B> {
|
||||||
Poll::Ready(Some(Ok(std::mem::take(b))))
|
Poll::Ready(Some(Ok(std::mem::take(b))))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
EncoderBodyProj::Stream(b) => b.poll_next(cx),
|
// TODO: MSRV 1.51: poll_map_err
|
||||||
|
EncoderBodyProj::Stream(b) => match ready!(b.poll_next(cx)) {
|
||||||
|
Some(Err(err)) => Poll::Ready(Some(Err(EncoderError::Body(err)))),
|
||||||
|
Some(Ok(val)) => Poll::Ready(Some(Ok(val))),
|
||||||
|
None => Poll::Ready(None),
|
||||||
|
},
|
||||||
EncoderBodyProj::BoxedStream(ref mut b) => {
|
EncoderBodyProj::BoxedStream(ref mut b) => {
|
||||||
match ready!(b.as_mut().poll_next(cx)) {
|
match ready!(b.as_mut().poll_next(cx)) {
|
||||||
Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
|
Some(Err(err)) => {
|
||||||
|
Poll::Ready(Some(Err(EncoderError::Boxed(err.into()))))
|
||||||
|
}
|
||||||
Some(Ok(val)) => Poll::Ready(Some(Ok(val))),
|
Some(Ok(val)) => Poll::Ready(Some(Ok(val))),
|
||||||
None => Poll::Ready(None),
|
None => Poll::Ready(None),
|
||||||
}
|
}
|
||||||
|
@ -130,8 +143,12 @@ impl<B: MessageBody<Error = Error>> MessageBody for EncoderBody<B> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<B: MessageBody<Error = Error>> MessageBody for Encoder<B> {
|
impl<B> MessageBody for Encoder<B>
|
||||||
type Error = Error;
|
where
|
||||||
|
B: MessageBody,
|
||||||
|
B::Error: Into<Error>,
|
||||||
|
{
|
||||||
|
type Error = EncoderError<B::Error>;
|
||||||
|
|
||||||
fn size(&self) -> BodySize {
|
fn size(&self) -> BodySize {
|
||||||
if self.encoder.is_none() {
|
if self.encoder.is_none() {
|
||||||
|
@ -144,7 +161,7 @@ impl<B: MessageBody<Error = Error>> MessageBody for Encoder<B> {
|
||||||
fn poll_next(
|
fn poll_next(
|
||||||
self: Pin<&mut Self>,
|
self: Pin<&mut Self>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
||||||
) -> Poll<Option<Result<Bytes, Error>>> {
|
) -> Poll<Option<Result<Bytes, Self::Error>>> {
|
||||||
let mut this = self.project();
|
let mut this = self.project();
|
||||||
loop {
|
loop {
|
||||||
if *this.eof {
|
if *this.eof {
|
||||||
|
@ -152,8 +169,9 @@ impl<B: MessageBody<Error = Error>> MessageBody for Encoder<B> {
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(ref mut fut) = this.fut {
|
if let Some(ref mut fut) = this.fut {
|
||||||
let mut encoder =
|
let mut encoder = ready!(Pin::new(fut).poll(cx))
|
||||||
ready!(Pin::new(fut).poll(cx)).map_err(|_| BlockingError)??;
|
.map_err(|_| EncoderError::Blocking(BlockingError))?
|
||||||
|
.map_err(EncoderError::Io)?;
|
||||||
|
|
||||||
let chunk = encoder.take();
|
let chunk = encoder.take();
|
||||||
*this.encoder = Some(encoder);
|
*this.encoder = Some(encoder);
|
||||||
|
@ -172,7 +190,7 @@ impl<B: MessageBody<Error = Error>> MessageBody for Encoder<B> {
|
||||||
Some(Ok(chunk)) => {
|
Some(Ok(chunk)) => {
|
||||||
if let Some(mut encoder) = this.encoder.take() {
|
if let Some(mut encoder) = this.encoder.take() {
|
||||||
if chunk.len() < MAX_CHUNK_SIZE_ENCODE_IN_PLACE {
|
if chunk.len() < MAX_CHUNK_SIZE_ENCODE_IN_PLACE {
|
||||||
encoder.write(&chunk)?;
|
encoder.write(&chunk).map_err(EncoderError::Io)?;
|
||||||
let chunk = encoder.take();
|
let chunk = encoder.take();
|
||||||
*this.encoder = Some(encoder);
|
*this.encoder = Some(encoder);
|
||||||
|
|
||||||
|
@ -192,7 +210,7 @@ impl<B: MessageBody<Error = Error>> MessageBody for Encoder<B> {
|
||||||
|
|
||||||
None => {
|
None => {
|
||||||
if let Some(encoder) = this.encoder.take() {
|
if let Some(encoder) = this.encoder.take() {
|
||||||
let chunk = encoder.finish()?;
|
let chunk = encoder.finish().map_err(EncoderError::Io)?;
|
||||||
if chunk.is_empty() {
|
if chunk.is_empty() {
|
||||||
return Poll::Ready(None);
|
return Poll::Ready(None);
|
||||||
} else {
|
} else {
|
||||||
|
@ -291,3 +309,35 @@ impl ContentEncoder {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Display)]
|
||||||
|
pub enum EncoderError<E> {
|
||||||
|
#[display(fmt = "body")]
|
||||||
|
Body(E),
|
||||||
|
|
||||||
|
#[display(fmt = "boxed")]
|
||||||
|
Boxed(Error),
|
||||||
|
|
||||||
|
#[display(fmt = "blocking")]
|
||||||
|
Blocking(BlockingError),
|
||||||
|
|
||||||
|
#[display(fmt = "io")]
|
||||||
|
Io(io::Error),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<E: StdError> StdError for EncoderError<E> {
|
||||||
|
fn source(&self) -> Option<&(dyn StdError + 'static)> {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<E: Into<Error>> From<EncoderError<E>> for Error {
|
||||||
|
fn from(err: EncoderError<E>) -> Self {
|
||||||
|
match err {
|
||||||
|
EncoderError::Body(err) => err.into(),
|
||||||
|
EncoderError::Boxed(err) => err,
|
||||||
|
EncoderError::Blocking(err) => err.into(),
|
||||||
|
EncoderError::Io(err) => err.into(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue