diff --git a/actix-threadpool/Cargo.toml b/actix-threadpool/Cargo.toml index 2250c391..58a1d7a0 100644 --- a/actix-threadpool/Cargo.toml +++ b/actix-threadpool/Cargo.toml @@ -24,6 +24,7 @@ jian_rs = { git = "https://github.com/fakeshadow/jian_rs"} lazy_static = "1.3" log = "0.4" num_cpus = "1.10" +parking_lot = "0.11.0" [dev-dependencies] actix-rt = "1" diff --git a/actix-threadpool/src/lib.rs b/actix-threadpool/src/lib.rs index 65db87eb..d06763f4 100644 --- a/actix-threadpool/src/lib.rs +++ b/actix-threadpool/src/lib.rs @@ -1,7 +1,7 @@ //! Thread pool for blocking operations //! //! The pool would lazily generate thread according to the workload and spawn up to a total amount -//! of you machine's `logical CPU cores * 5` threads. Any spawned threads kept idle for 5 minutes +//! of you machine's `logical CPU cores * 5` threads. Any spawned threads kept idle for 1 minutes //! would be recycled and de spawned. //! //! *. Settings are configuable through env variables. @@ -15,14 +15,14 @@ //! // Optional: Set the min thread count for the blocking pool. //! std::env::set_var("ACTIX_THREADPOOL_MIN", "1"); //! // Optional: Set the timeout duration IN SECONDS for the blocking pool's idle threads. -//! std::env::set_var("ACTIX_THREADPOOL_TIMEOUT", "300"); +//! std::env::set_var("ACTIX_THREADPOOL_TIMEOUT", "60"); //! //! let future = actix_threadpool::run(|| { //! /* Some blocking code with a Result as return type */ //! Ok::(1usize) //! }); //! -//! // calling actix::web::block(|| {}) would have the same functionality. +//! // calling actix_web::web::block(|| {}) would have the same functionality. //! //! /* //! We can await on this blocking code and NOT block our runtime. @@ -44,6 +44,7 @@ use std::time::Duration; use derive_more::Display; use futures_channel::oneshot; use jian_rs::ThreadPool; +use parking_lot::Mutex; /// Env variable for default cpu pool max size. const ENV_MAX_THREADS: &str = "ACTIX_THREADPOOL"; @@ -55,46 +56,34 @@ const ENV_MIN_THREADS: &str = "ACTIX_THREADPOOL_MIN"; const ENV_IDLE_TIMEOUT: &str = "ACTIX_THREADPOOL_TIMEOUT"; lazy_static::lazy_static! { - pub(crate) static ref POOL: ThreadPool = { - let max = std::env::var(ENV_MAX_THREADS) - .map_err(|_| ()) - .and_then(|val| { - val.parse().map_err(|_| log::warn!( - "Can not parse {} value, using default", - ENV_MAX_THREADS, - )) - }) - .unwrap_or_else(|_| num_cpus::get() * 5); + pub(crate) static ref POOL: Mutex = { + let max = parse_env(ENV_MAX_THREADS).unwrap_or_else(|| num_cpus::get() * 5); + let min = parse_env(ENV_MIN_THREADS).unwrap_or(1); + let dur = parse_env(ENV_IDLE_TIMEOUT).unwrap_or(60); - let min = std::env::var(ENV_MIN_THREADS) - .map_err(|_| ()) - .and_then(|val| { - val.parse().map_err(|_| log::warn!( - "Can not parse {} value, using default", - ENV_MIN_THREADS, - )) - }) - .unwrap_or_else(|_| 1); - - let dur = std::env::var(ENV_IDLE_TIMEOUT) - .map_err(|_| ()) - .and_then(|val| { - val.parse().map_err(|_| log::warn!( - "Can not parse {} value, using default", - ENV_IDLE_TIMEOUT, - )) - }) - .unwrap_or_else(|_| 60 * 5); - - ThreadPool::builder() + Mutex::new(ThreadPool::builder() .thread_name("actix-threadpool") .max_threads(max) .min_threads(min) .idle_timeout(Duration::from_secs(dur)) - .build() + .build()) }; } +thread_local! { + static POOL_LOCAL: ThreadPool = { + POOL.lock().clone() + } +} + +fn parse_env(env: &str) -> Option { + std::env::var(env).ok().and_then(|val| { + val.parse() + .map_err(|_| log::warn!("Can not parse {} value, using default", env)) + .ok() + }) +} + /// Blocking operation execution error #[derive(Debug, Display)] pub enum BlockingError { @@ -116,10 +105,10 @@ where { let (tx, rx) = oneshot::channel(); - let _ = POOL.execute(move || { - if !tx.is_canceled() { + POOL_LOCAL.with(|pool| { + let _ = pool.execute(move || { let _ = tx.send(f()); - } + }); }); CpuFuture { rx }