diff --git a/actix-threadpool/src/lib.rs b/actix-threadpool/src/lib.rs index 52895932..5ed6b7ca 100644 --- a/actix-threadpool/src/lib.rs +++ b/actix-threadpool/src/lib.rs @@ -1,34 +1,34 @@ //! Thread pool for blocking operations -use std::future::Future; -use std::task::{Poll,Context}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; use derive_more::Display; use futures::channel::oneshot; use parking_lot::Mutex; use threadpool::ThreadPool; -use std::pin::Pin; -/// Env variable for default cpu pool size +/// Env variable for default cpu pool size. const ENV_CPU_POOL_VAR: &str = "ACTIX_THREADPOOL"; lazy_static::lazy_static! { pub(crate) static ref DEFAULT_POOL: Mutex = { - let default = match std::env::var(ENV_CPU_POOL_VAR) { - Ok(val) => { - if let Ok(val) = val.parse() { - val - } else { - log::error!("Can not parse ACTIX_THREADPOOL value"); - num_cpus::get() * 5 - } - } - Err(_) => num_cpus::get() * 5, - }; + let num = std::env::var(ENV_CPU_POOL_VAR) + .map_err(|_| ()) + .and_then(|val| { + val.parse().map_err(|_| log::warn!( + "Can not parse {} value, using default", + ENV_CPU_POOL_VAR, + )) + }) + .unwrap_or_else(|_| num_cpus::get() * 5); Mutex::new( threadpool::Builder::new() .thread_name("actix-web".to_owned()) - .num_threads(default) + .num_threads(num) .build(), ) }; @@ -40,8 +40,8 @@ thread_local! { }; } -/// Blocking operation execution error -#[derive(Debug, Display)] +/// Error of blocking operation execution being cancelled. +#[derive(Clone, Copy, Debug, Display)] #[display(fmt = "Thread pool is gone")] pub struct Cancelled; @@ -71,12 +71,11 @@ pub struct CpuFuture { } impl Future for CpuFuture { - type Output = Result; + type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let rx = Pin::new(&mut Pin::get_mut(self).rx); + let rx = Pin::new(&mut Pin::get_mut(self).rx); let res = futures::ready!(rx.poll(cx)); Poll::Ready(res.map_err(|_| Cancelled)) } - }