diff --git a/actix-threadpool/CHANGES.md b/actix-threadpool/CHANGES.md index 9b481fbf..26dc42a9 100644 --- a/actix-threadpool/CHANGES.md +++ b/actix-threadpool/CHANGES.md @@ -1,5 +1,7 @@ # Changes +* Rework pool's gut. + ## [0.3.3] - 2020-07-14 ### Changed diff --git a/actix-threadpool/Cargo.toml b/actix-threadpool/Cargo.toml index 5998bae7..2250c391 100644 --- a/actix-threadpool/Cargo.toml +++ b/actix-threadpool/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-threadpool" -version = "0.3.3" +version = "0.4.0" authors = ["Nikolay Kim "] description = "Actix thread pool for sync code" keywords = ["actix", "network", "framework", "async", "futures"] @@ -20,8 +20,10 @@ path = "src/lib.rs" [dependencies] derive_more = "0.99.2" futures-channel = "0.3.1" -parking_lot = "0.11" +jian_rs = { git = "https://github.com/fakeshadow/jian_rs"} lazy_static = "1.3" log = "0.4" num_cpus = "1.10" -threadpool = "1.7" + +[dev-dependencies] +actix-rt = "1" diff --git a/actix-threadpool/src/lib.rs b/actix-threadpool/src/lib.rs index 08b563ef..56e5c528 100644 --- a/actix-threadpool/src/lib.rs +++ b/actix-threadpool/src/lib.rs @@ -1,41 +1,80 @@ //! Thread pool for blocking operations +//! +//! The pool would lazily generate thread according to the workload and spawn up to a total amount +//! of you machine's `logical CPU cores * 5` threads. Any spawned threads kept idle for 3 minutes +//! would be recycled and de spawned. +//! +//! # Example: +//! ```rust +//! #[actix_rt::main] +//! async fn main() { +//! // Optional: Set the max thread count for the blocking pool. +//! std::env::set_var("ACTIX_THREADPOOL", "30"); +//! // Optional: Set the timeout duration IN SECONDS for the blocking pool's idle threads. +//! std::env::set_var("ACTIX_THREADPOOL_TIMEOUT", "300"); +//! +//! let future = actix_threadpool::run(|| { +//! /* Some blocking code with a Result as return type */ +//! Ok::(1usize) +//! }); +//! +//! // calling actix::web::block(|| {}) would have the same functionality. +//! +//! /* +//! We can await on this blocking code and NOT block our runtime. +//! When we waiting our actix runtime can switch to other async tasks. +//! */ +//! +//! let result: Result> = future.await; +//! +//! assert_eq!(1usize, result.unwrap()) +//! } +//! ``` use std::fmt; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; +use std::time::Duration; use derive_more::Display; use futures_channel::oneshot; -use parking_lot::Mutex; -use threadpool::ThreadPool; +use jian_rs::ThreadPool; /// Env variable for default cpu pool size. -const ENV_CPU_POOL_VAR: &str = "ACTIX_THREADPOOL"; +const ENV_MAX_THREADS: &str = "ACTIX_THREADPOOL"; + +/// Env variable for default thread idle timeout duration. +const ENV_IDLE_TIMEOUT: &str = "ACTIX_THREADPOOL_TIMEOUT"; lazy_static::lazy_static! { - pub(crate) static ref DEFAULT_POOL: Mutex = { - let num = std::env::var(ENV_CPU_POOL_VAR) + pub(crate) static ref POOL: ThreadPool = { + let num = std::env::var(ENV_MAX_THREADS) .map_err(|_| ()) .and_then(|val| { val.parse().map_err(|_| log::warn!( "Can not parse {} value, using default", - ENV_CPU_POOL_VAR, + ENV_MAX_THREADS, )) }) .unwrap_or_else(|_| num_cpus::get() * 5); - Mutex::new( - threadpool::Builder::new() - .thread_name("actix-web".to_owned()) - .num_threads(num) - .build(), - ) - }; -} -thread_local! { - static POOL: ThreadPool = { - DEFAULT_POOL.lock().clone() + let dur = std::env::var(ENV_IDLE_TIMEOUT) + .map_err(|_| ()) + .and_then(|val| { + val.parse().map_err(|_| log::warn!( + "Can not parse {} value, using default", + ENV_IDLE_TIMEOUT, + )) + }) + .unwrap_or_else(|_| 60 * 5); + + ThreadPool::builder() + .thread_name("actix-threadpool") + .max_threads(num) + .min_threads(1) + .idle_timeout(Duration::from_secs(dur)) + .build() }; } @@ -59,12 +98,11 @@ where E: Send + fmt::Debug + 'static, { let (tx, rx) = oneshot::channel(); - POOL.with(|pool| { - pool.execute(move || { - if !tx.is_canceled() { - let _ = tx.send(f()); - } - }) + + let _ = POOL.execute(move || { + if !tx.is_canceled() { + let _ = tx.send(f()); + } }); CpuFuture { rx } diff --git a/actix-threadpool/tests/test_thread_pool.rs b/actix-threadpool/tests/test_thread_pool.rs new file mode 100644 index 00000000..b482d189 --- /dev/null +++ b/actix-threadpool/tests/test_thread_pool.rs @@ -0,0 +1,21 @@ +#[actix_rt::test] +async fn async_work() { + use core::sync::atomic::{AtomicUsize, Ordering}; + use core::time::Duration; + + use std::sync::Arc; + + let counter = Arc::new(AtomicUsize::new(0)); + + for _ in 0..1024 { + let counter = counter.clone(); + let _ = actix_threadpool::run(move || { + counter.fetch_add(1, Ordering::Release); + std::thread::sleep(Duration::from_millis(1)); + Ok::<(), ()>(()) + }) + .await; + } + + assert_eq!(1024, counter.load(Ordering::Acquire)); +}