diff --git a/actix-threadpool/Cargo.toml b/actix-threadpool/Cargo.toml index 3a633d62..9802fa1e 100644 --- a/actix-threadpool/Cargo.toml +++ b/actix-threadpool/Cargo.toml @@ -18,10 +18,4 @@ name = "actix_threadpool" path = "src/lib.rs" [dependencies] -derive_more = "0.99.2" -futures-channel = "0.3.7" -parking_lot = "0.11" -lazy_static = "1.3" -log = "0.4" -num_cpus = "1.10" -threadpool = "1.7" +tokio = { version = "1", features = ["parking_lot", "rt"] } diff --git a/actix-threadpool/src/lib.rs b/actix-threadpool/src/lib.rs index 2fda28ef..e0a5db34 100644 --- a/actix-threadpool/src/lib.rs +++ b/actix-threadpool/src/lib.rs @@ -9,49 +9,24 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use derive_more::Display; -use futures_channel::oneshot; -use parking_lot::Mutex; -use threadpool::ThreadPool; - -/// Env variable for default cpu pool size. -const ENV_CPU_POOL_VAR: &str = "ACTIX_THREADPOOL"; - -lazy_static::lazy_static! { - pub(crate) static ref DEFAULT_POOL: Mutex = { - let num = std::env::var(ENV_CPU_POOL_VAR) - .map_err(|_| ()) - .and_then(|val| { - val.parse().map_err(|_| log::warn!( - "Can not parse {} value, using default", - ENV_CPU_POOL_VAR, - )) - }) - .unwrap_or_else(|_| num_cpus::get() * 5); - Mutex::new( - threadpool::Builder::new() - .thread_name("actix-web".to_owned()) - .num_threads(num) - .build(), - ) - }; -} - -thread_local! { - static POOL: ThreadPool = { - DEFAULT_POOL.lock().clone() - }; -} +use tokio::task::JoinHandle; /// Blocking operation execution error -#[derive(Debug, Display)] +#[derive(Debug)] pub enum BlockingError { - #[display(fmt = "{:?}", _0)] Error(E), - #[display(fmt = "Thread pool is gone")] Canceled, } +impl fmt::Display for BlockingError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + Self::Error(ref e) => write!(f, "{:?}", e), + Self::Canceled => write!(f, "Thread pool is gone"), + } + } +} + impl std::error::Error for BlockingError {} /// Execute blocking function on a thread pool, returns future that resolves @@ -62,35 +37,25 @@ where I: Send + 'static, E: Send + fmt::Debug + 'static, { - let (tx, rx) = oneshot::channel(); - POOL.with(|pool| { - pool.execute(move || { - if !tx.is_canceled() { - let _ = tx.send(f()); - } - }) - }); + let handle = tokio::task::spawn_blocking(f); - CpuFuture { rx } + CpuFuture { handle } } /// Blocking operation completion future. It resolves with results /// of blocking function execution. pub struct CpuFuture { - rx: oneshot::Receiver>, + handle: JoinHandle>, } impl Future for CpuFuture { type Output = Result>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let rx = Pin::new(&mut self.rx); - let res = match rx.poll(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(res) => res - .map_err(|_| BlockingError::Canceled) - .and_then(|res| res.map_err(BlockingError::Error)), - }; - Poll::Ready(res) + match Pin::new(&mut self.handle).poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(res)) => Poll::Ready(res.map_err(BlockingError::Error)), + Poll::Ready(Err(_)) => Poll::Ready(Err(BlockingError::Canceled)), + } } }