From eae605b959a0b6c947cb00f52c7a6f7c3c0444c9 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Wed, 6 Jan 2021 19:20:16 +0800 Subject: [PATCH] remove actix-threadpool.use actix_rt::task::spawn_blocking --- CHANGES.md | 4 +++- Cargo.toml | 4 +++- actix-http/CHANGES.md | 5 ++++- actix-http/Cargo.toml | 1 - actix-http/src/encoding/decoder.rs | 17 +++++++++++------ actix-http/src/encoding/encoder.rs | 18 +++++++++++++----- actix-http/src/error.rs | 18 ++++++++++++++---- src/web.rs | 8 +++++++- 8 files changed, 55 insertions(+), 20 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index f0b55801b..03f5fc19e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -21,13 +21,15 @@ ### Removed * Public modules `middleware::{normalize, err_handlers}`. All necessary middleware structs are now exposed directly by the `middleware` module. +* Remove `actix-threadpool` as dependency. `actix_threadpool::Blocking` error type can be imported + from `actix_hweb::error` module. [#1878] [#1812]: https://github.com/actix/actix-web/pull/1812 [#1813]: https://github.com/actix/actix-web/pull/1813 [#1852]: https://github.com/actix/actix-web/pull/1852 [#1865]: https://github.com/actix/actix-web/pull/1865 [#1875]: https://github.com/actix/actix-web/pull/1875 - +[#1878]: https://github.com/actix/actix-web/pull/1878 ## 3.3.2 - 2020-12-01 ### Fixed diff --git a/Cargo.toml b/Cargo.toml index 5388de4ed..1d951aefb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,7 +80,6 @@ actix-rt = "2.0.0-beta.1" actix-server = "2.0.0-beta.2" actix-service = "2.0.0-beta.2" actix-utils = "3.0.0-beta.1" -actix-threadpool = "0.3.1" actix-tls = { version = "3.0.0-beta.2", default-features = false, optional = true } actix-web-codegen = "0.4.0" @@ -131,6 +130,9 @@ actix-multipart = { path = "actix-multipart" } actix-files = { path = "actix-files" } awc = { path = "awc" } +# TODO: remove override +actix-rt = { git = "https://github.com/actix/actix-net.git" } + [[bench]] name = "server" harness = false diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index eadbf6f46..284791aee 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -16,11 +16,14 @@ * Remove `ConnectError::SslHandshakeError` and re-export of `HandshakeError`. due to the removal of this type from `tokio-openssl` crate. openssl handshake error would return as `ConnectError::SslError`. [#1813] +* Remove `actix-threadpool` dependency. Use `actix_rt::task::spawn_blocking`. + Due to this change `actix_threadpool::BlockingError` type is moved into + `actix_http::error` module. [#1878] [#1813]: https://github.com/actix/actix-web/pull/1813 [#1857]: https://github.com/actix/actix-web/pull/1857 [#1864]: https://github.com/actix/actix-web/pull/1864 - +[#1878]: https://github.com/actix/actix-web/pull/1878 ## 2.2.0 - 2020-11-25 ### Added diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index e80800d06..ae71632f2 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -44,7 +44,6 @@ actix-service = "2.0.0-beta.2" actix-codec = "0.4.0-beta.1" actix-utils = "3.0.0-beta.1" actix-rt = "2.0.0-beta.1" -actix-threadpool = "0.3.1" actix-tls = "3.0.0-beta.2" actix = { version = "0.11.0-beta.1", optional = true } diff --git a/actix-http/src/encoding/decoder.rs b/actix-http/src/encoding/decoder.rs index b60435859..b26609911 100644 --- a/actix-http/src/encoding/decoder.rs +++ b/actix-http/src/encoding/decoder.rs @@ -3,14 +3,14 @@ use std::io::{self, Write}; use std::pin::Pin; use std::task::{Context, Poll}; -use actix_threadpool::{run, CpuFuture}; +use actix_rt::task::{spawn_blocking, JoinHandle}; use brotli2::write::BrotliDecoder; use bytes::Bytes; use flate2::write::{GzDecoder, ZlibDecoder}; use futures_core::{ready, Stream}; use super::Writer; -use crate::error::PayloadError; +use crate::error::{BlockingError, PayloadError}; use crate::http::header::{ContentEncoding, HeaderMap, CONTENT_ENCODING}; const INPLACE: usize = 2049; @@ -19,7 +19,7 @@ pub struct Decoder { decoder: Option, stream: S, eof: bool, - fut: Option, ContentDecoder), io::Error>>, + fut: Option, ContentDecoder), io::Error>>>, } impl Decoder @@ -80,8 +80,13 @@ where loop { if let Some(ref mut fut) = self.fut { let (chunk, decoder) = match ready!(Pin::new(fut).poll(cx)) { - Ok(item) => item, - Err(e) => return Poll::Ready(Some(Err(e.into()))), + Ok(Ok(item)) => item, + Ok(Err(e)) => { + return Poll::Ready(Some(Err(BlockingError::Error(e).into()))) + } + Err(_) => { + return Poll::Ready(Some(Err(BlockingError::Canceled.into()))) + } }; self.decoder = Some(decoder); self.fut.take(); @@ -105,7 +110,7 @@ where return Poll::Ready(Some(Ok(chunk))); } } else { - self.fut = Some(run(move || { + self.fut = Some(spawn_blocking(move || { let chunk = decoder.feed_data(chunk)?; Ok((chunk, decoder)) })); diff --git a/actix-http/src/encoding/encoder.rs b/actix-http/src/encoding/encoder.rs index eb1821285..28c757076 100644 --- a/actix-http/src/encoding/encoder.rs +++ b/actix-http/src/encoding/encoder.rs @@ -4,7 +4,7 @@ use std::io::{self, Write}; use std::pin::Pin; use std::task::{Context, Poll}; -use actix_threadpool::{run, CpuFuture}; +use actix_rt::task::{spawn_blocking, JoinHandle}; use brotli2::write::BrotliEncoder; use bytes::Bytes; use flate2::write::{GzEncoder, ZlibEncoder}; @@ -17,6 +17,7 @@ use crate::http::{HeaderValue, StatusCode}; use crate::{Error, ResponseHead}; use super::Writer; +use crate::error::BlockingError; const INPLACE: usize = 1024; @@ -26,7 +27,7 @@ pub struct Encoder { #[pin] body: EncoderBody, encoder: Option, - fut: Option>, + fut: Option>>, } impl Encoder { @@ -136,8 +137,15 @@ impl MessageBody for Encoder { if let Some(ref mut fut) = this.fut { let mut encoder = match ready!(Pin::new(fut).poll(cx)) { - Ok(item) => item, - Err(e) => return Poll::Ready(Some(Err(e.into()))), + Ok(Ok(item)) => item, + Ok(Err(e)) => { + return Poll::Ready(Some(Err(BlockingError::Error(e).into()))) + } + Err(_) => { + return Poll::Ready(Some(Err( + BlockingError::::Canceled.into(), + ))) + } }; let chunk = encoder.take(); *this.encoder = Some(encoder); @@ -160,7 +168,7 @@ impl MessageBody for Encoder { return Poll::Ready(Some(Ok(chunk))); } } else { - *this.fut = Some(run(move || { + *this.fut = Some(spawn_blocking(move || { encoder.write(&chunk)?; Ok(encoder) })); diff --git a/actix-http/src/error.rs b/actix-http/src/error.rs index 03e5467c5..df099790c 100644 --- a/actix-http/src/error.rs +++ b/actix-http/src/error.rs @@ -7,7 +7,6 @@ use std::string::FromUtf8Error; use std::{fmt, io, result}; use actix_codec::{Decoder, Encoder}; -pub use actix_threadpool::BlockingError; use actix_utils::dispatcher::DispatcherError as FramedDispatcherError; use actix_utils::timeout::TimeoutError; use bytes::BytesMut; @@ -190,9 +189,6 @@ impl ResponseError for DeError { /// `InternalServerError` for `Canceled` impl ResponseError for Canceled {} -/// `InternalServerError` for `BlockingError` -impl ResponseError for BlockingError {} - /// Return `BAD_REQUEST` for `Utf8Error` impl ResponseError for Utf8Error { fn status_code(&self) -> StatusCode { @@ -304,6 +300,20 @@ impl From for ParseError { } } +#[derive(Debug, Display)] +/// A set of errors that can occur running blocking tasks in thread pool. +pub enum BlockingError { + #[display(fmt = "{:?}", _0)] + Error(E), + #[display(fmt = "Thread pool is gone")] + Canceled, +} + +impl std::error::Error for BlockingError {} + +/// `InternalServerError` for `BlockingError` +impl ResponseError for BlockingError {} + #[derive(Display, Debug)] /// A set of errors that can occur during payload parsing pub enum PayloadError { diff --git a/src/web.rs b/src/web.rs index 39dfc450a..2c0a9e26a 100644 --- a/src/web.rs +++ b/src/web.rs @@ -280,5 +280,11 @@ where I: Send + 'static, E: Send + std::fmt::Debug + 'static, { - actix_threadpool::run(f).await + // map error to BlockingError. this is for not breaking the existing api. + // preferably this should just return actix_rt::task::JoinHandle for a more + // flexible control of when the task is awaited and/or the abort(if needed). + match actix_rt::task::spawn_blocking(f).await { + Ok(res) => res.map_err(BlockingError::Error), + Err(_) => Err(BlockingError::Canceled), + } }