mirror of https://github.com/fafhrd91/actix-web
remove actix-threadpool.use actix_rt::task::spawn_blocking
This commit is contained in:
parent
68117543ea
commit
eae605b959
|
@ -21,13 +21,15 @@
|
|||
### Removed
|
||||
* Public modules `middleware::{normalize, err_handlers}`. All necessary middleware structs are now
|
||||
exposed directly by the `middleware` module.
|
||||
* Remove `actix-threadpool` as dependency. `actix_threadpool::Blocking` error type can be imported
|
||||
from `actix_hweb::error` module. [#1878]
|
||||
|
||||
[#1812]: https://github.com/actix/actix-web/pull/1812
|
||||
[#1813]: https://github.com/actix/actix-web/pull/1813
|
||||
[#1852]: https://github.com/actix/actix-web/pull/1852
|
||||
[#1865]: https://github.com/actix/actix-web/pull/1865
|
||||
[#1875]: https://github.com/actix/actix-web/pull/1875
|
||||
|
||||
[#1878]: https://github.com/actix/actix-web/pull/1878
|
||||
|
||||
## 3.3.2 - 2020-12-01
|
||||
### Fixed
|
||||
|
|
|
@ -80,7 +80,6 @@ actix-rt = "2.0.0-beta.1"
|
|||
actix-server = "2.0.0-beta.2"
|
||||
actix-service = "2.0.0-beta.2"
|
||||
actix-utils = "3.0.0-beta.1"
|
||||
actix-threadpool = "0.3.1"
|
||||
actix-tls = { version = "3.0.0-beta.2", default-features = false, optional = true }
|
||||
|
||||
actix-web-codegen = "0.4.0"
|
||||
|
@ -131,6 +130,9 @@ actix-multipart = { path = "actix-multipart" }
|
|||
actix-files = { path = "actix-files" }
|
||||
awc = { path = "awc" }
|
||||
|
||||
# TODO: remove override
|
||||
actix-rt = { git = "https://github.com/actix/actix-net.git" }
|
||||
|
||||
[[bench]]
|
||||
name = "server"
|
||||
harness = false
|
||||
|
|
|
@ -16,11 +16,14 @@
|
|||
* Remove `ConnectError::SslHandshakeError` and re-export of `HandshakeError`.
|
||||
due to the removal of this type from `tokio-openssl` crate. openssl handshake
|
||||
error would return as `ConnectError::SslError`. [#1813]
|
||||
* Remove `actix-threadpool` dependency. Use `actix_rt::task::spawn_blocking`.
|
||||
Due to this change `actix_threadpool::BlockingError` type is moved into
|
||||
`actix_http::error` module. [#1878]
|
||||
|
||||
[#1813]: https://github.com/actix/actix-web/pull/1813
|
||||
[#1857]: https://github.com/actix/actix-web/pull/1857
|
||||
[#1864]: https://github.com/actix/actix-web/pull/1864
|
||||
|
||||
[#1878]: https://github.com/actix/actix-web/pull/1878
|
||||
|
||||
## 2.2.0 - 2020-11-25
|
||||
### Added
|
||||
|
|
|
@ -44,7 +44,6 @@ actix-service = "2.0.0-beta.2"
|
|||
actix-codec = "0.4.0-beta.1"
|
||||
actix-utils = "3.0.0-beta.1"
|
||||
actix-rt = "2.0.0-beta.1"
|
||||
actix-threadpool = "0.3.1"
|
||||
actix-tls = "3.0.0-beta.2"
|
||||
actix = { version = "0.11.0-beta.1", optional = true }
|
||||
|
||||
|
|
|
@ -3,14 +3,14 @@ use std::io::{self, Write};
|
|||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use actix_threadpool::{run, CpuFuture};
|
||||
use actix_rt::task::{spawn_blocking, JoinHandle};
|
||||
use brotli2::write::BrotliDecoder;
|
||||
use bytes::Bytes;
|
||||
use flate2::write::{GzDecoder, ZlibDecoder};
|
||||
use futures_core::{ready, Stream};
|
||||
|
||||
use super::Writer;
|
||||
use crate::error::PayloadError;
|
||||
use crate::error::{BlockingError, PayloadError};
|
||||
use crate::http::header::{ContentEncoding, HeaderMap, CONTENT_ENCODING};
|
||||
|
||||
const INPLACE: usize = 2049;
|
||||
|
@ -19,7 +19,7 @@ pub struct Decoder<S> {
|
|||
decoder: Option<ContentDecoder>,
|
||||
stream: S,
|
||||
eof: bool,
|
||||
fut: Option<CpuFuture<(Option<Bytes>, ContentDecoder), io::Error>>,
|
||||
fut: Option<JoinHandle<Result<(Option<Bytes>, ContentDecoder), io::Error>>>,
|
||||
}
|
||||
|
||||
impl<S> Decoder<S>
|
||||
|
@ -80,8 +80,13 @@ where
|
|||
loop {
|
||||
if let Some(ref mut fut) = self.fut {
|
||||
let (chunk, decoder) = match ready!(Pin::new(fut).poll(cx)) {
|
||||
Ok(item) => item,
|
||||
Err(e) => return Poll::Ready(Some(Err(e.into()))),
|
||||
Ok(Ok(item)) => item,
|
||||
Ok(Err(e)) => {
|
||||
return Poll::Ready(Some(Err(BlockingError::Error(e).into())))
|
||||
}
|
||||
Err(_) => {
|
||||
return Poll::Ready(Some(Err(BlockingError::Canceled.into())))
|
||||
}
|
||||
};
|
||||
self.decoder = Some(decoder);
|
||||
self.fut.take();
|
||||
|
@ -105,7 +110,7 @@ where
|
|||
return Poll::Ready(Some(Ok(chunk)));
|
||||
}
|
||||
} else {
|
||||
self.fut = Some(run(move || {
|
||||
self.fut = Some(spawn_blocking(move || {
|
||||
let chunk = decoder.feed_data(chunk)?;
|
||||
Ok((chunk, decoder))
|
||||
}));
|
||||
|
|
|
@ -4,7 +4,7 @@ use std::io::{self, Write};
|
|||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use actix_threadpool::{run, CpuFuture};
|
||||
use actix_rt::task::{spawn_blocking, JoinHandle};
|
||||
use brotli2::write::BrotliEncoder;
|
||||
use bytes::Bytes;
|
||||
use flate2::write::{GzEncoder, ZlibEncoder};
|
||||
|
@ -17,6 +17,7 @@ use crate::http::{HeaderValue, StatusCode};
|
|||
use crate::{Error, ResponseHead};
|
||||
|
||||
use super::Writer;
|
||||
use crate::error::BlockingError;
|
||||
|
||||
const INPLACE: usize = 1024;
|
||||
|
||||
|
@ -26,7 +27,7 @@ pub struct Encoder<B> {
|
|||
#[pin]
|
||||
body: EncoderBody<B>,
|
||||
encoder: Option<ContentEncoder>,
|
||||
fut: Option<CpuFuture<ContentEncoder, io::Error>>,
|
||||
fut: Option<JoinHandle<Result<ContentEncoder, io::Error>>>,
|
||||
}
|
||||
|
||||
impl<B: MessageBody> Encoder<B> {
|
||||
|
@ -136,8 +137,15 @@ impl<B: MessageBody> MessageBody for Encoder<B> {
|
|||
|
||||
if let Some(ref mut fut) = this.fut {
|
||||
let mut encoder = match ready!(Pin::new(fut).poll(cx)) {
|
||||
Ok(item) => item,
|
||||
Err(e) => return Poll::Ready(Some(Err(e.into()))),
|
||||
Ok(Ok(item)) => item,
|
||||
Ok(Err(e)) => {
|
||||
return Poll::Ready(Some(Err(BlockingError::Error(e).into())))
|
||||
}
|
||||
Err(_) => {
|
||||
return Poll::Ready(Some(Err(
|
||||
BlockingError::<io::Error>::Canceled.into(),
|
||||
)))
|
||||
}
|
||||
};
|
||||
let chunk = encoder.take();
|
||||
*this.encoder = Some(encoder);
|
||||
|
@ -160,7 +168,7 @@ impl<B: MessageBody> MessageBody for Encoder<B> {
|
|||
return Poll::Ready(Some(Ok(chunk)));
|
||||
}
|
||||
} else {
|
||||
*this.fut = Some(run(move || {
|
||||
*this.fut = Some(spawn_blocking(move || {
|
||||
encoder.write(&chunk)?;
|
||||
Ok(encoder)
|
||||
}));
|
||||
|
|
|
@ -7,7 +7,6 @@ use std::string::FromUtf8Error;
|
|||
use std::{fmt, io, result};
|
||||
|
||||
use actix_codec::{Decoder, Encoder};
|
||||
pub use actix_threadpool::BlockingError;
|
||||
use actix_utils::dispatcher::DispatcherError as FramedDispatcherError;
|
||||
use actix_utils::timeout::TimeoutError;
|
||||
use bytes::BytesMut;
|
||||
|
@ -190,9 +189,6 @@ impl ResponseError for DeError {
|
|||
/// `InternalServerError` for `Canceled`
|
||||
impl ResponseError for Canceled {}
|
||||
|
||||
/// `InternalServerError` for `BlockingError`
|
||||
impl<E: fmt::Debug> ResponseError for BlockingError<E> {}
|
||||
|
||||
/// Return `BAD_REQUEST` for `Utf8Error`
|
||||
impl ResponseError for Utf8Error {
|
||||
fn status_code(&self) -> StatusCode {
|
||||
|
@ -304,6 +300,20 @@ impl From<httparse::Error> for ParseError {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Display)]
|
||||
/// A set of errors that can occur running blocking tasks in thread pool.
|
||||
pub enum BlockingError<E: fmt::Debug> {
|
||||
#[display(fmt = "{:?}", _0)]
|
||||
Error(E),
|
||||
#[display(fmt = "Thread pool is gone")]
|
||||
Canceled,
|
||||
}
|
||||
|
||||
impl<E: fmt::Debug> std::error::Error for BlockingError<E> {}
|
||||
|
||||
/// `InternalServerError` for `BlockingError`
|
||||
impl<E: fmt::Debug> ResponseError for BlockingError<E> {}
|
||||
|
||||
#[derive(Display, Debug)]
|
||||
/// A set of errors that can occur during payload parsing
|
||||
pub enum PayloadError {
|
||||
|
|
|
@ -280,5 +280,11 @@ where
|
|||
I: Send + 'static,
|
||||
E: Send + std::fmt::Debug + 'static,
|
||||
{
|
||||
actix_threadpool::run(f).await
|
||||
// map error to BlockingError. this is for not breaking the existing api.
|
||||
// preferably this should just return actix_rt::task::JoinHandle for a more
|
||||
// flexible control of when the task is awaited and/or the abort(if needed).
|
||||
match actix_rt::task::spawn_blocking(f).await {
|
||||
Ok(res) => res.map_err(BlockingError::Error),
|
||||
Err(_) => Err(BlockingError::Canceled),
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue