use tokio::task::spawn_blocking as threadpool

This commit is contained in:
fakeshadow 2021-01-03 03:26:48 +08:00
parent 296294061f
commit 5d95014208
2 changed files with 20 additions and 61 deletions

View File

@ -18,10 +18,4 @@ name = "actix_threadpool"
path = "src/lib.rs"
[dependencies]
derive_more = "0.99.2"
futures-channel = "0.3.7"
parking_lot = "0.11"
lazy_static = "1.3"
log = "0.4"
num_cpus = "1.10"
threadpool = "1.7"
tokio = { version = "1", features = ["parking_lot", "rt"] }

View File

@ -9,49 +9,24 @@ use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use derive_more::Display;
use futures_channel::oneshot;
use parking_lot::Mutex;
use threadpool::ThreadPool;
/// Env variable for default cpu pool size.
const ENV_CPU_POOL_VAR: &str = "ACTIX_THREADPOOL";
lazy_static::lazy_static! {
pub(crate) static ref DEFAULT_POOL: Mutex<ThreadPool> = {
let num = std::env::var(ENV_CPU_POOL_VAR)
.map_err(|_| ())
.and_then(|val| {
val.parse().map_err(|_| log::warn!(
"Can not parse {} value, using default",
ENV_CPU_POOL_VAR,
))
})
.unwrap_or_else(|_| num_cpus::get() * 5);
Mutex::new(
threadpool::Builder::new()
.thread_name("actix-web".to_owned())
.num_threads(num)
.build(),
)
};
}
thread_local! {
static POOL: ThreadPool = {
DEFAULT_POOL.lock().clone()
};
}
use tokio::task::JoinHandle;
/// Blocking operation execution error
#[derive(Debug, Display)]
#[derive(Debug)]
pub enum BlockingError<E: fmt::Debug> {
#[display(fmt = "{:?}", _0)]
Error(E),
#[display(fmt = "Thread pool is gone")]
Canceled,
}
impl<E: fmt::Debug> fmt::Display for BlockingError<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
Self::Error(ref e) => write!(f, "{:?}", e),
Self::Canceled => write!(f, "Thread pool is gone"),
}
}
}
impl<E: fmt::Debug> std::error::Error for BlockingError<E> {}
/// Execute blocking function on a thread pool, returns future that resolves
@ -62,35 +37,25 @@ where
I: Send + 'static,
E: Send + fmt::Debug + 'static,
{
let (tx, rx) = oneshot::channel();
POOL.with(|pool| {
pool.execute(move || {
if !tx.is_canceled() {
let _ = tx.send(f());
}
})
});
let handle = tokio::task::spawn_blocking(f);
CpuFuture { rx }
CpuFuture { handle }
}
/// Blocking operation completion future. It resolves with results
/// of blocking function execution.
pub struct CpuFuture<I, E> {
rx: oneshot::Receiver<Result<I, E>>,
handle: JoinHandle<Result<I, E>>,
}
impl<I, E: fmt::Debug> Future for CpuFuture<I, E> {
type Output = Result<I, BlockingError<E>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let rx = Pin::new(&mut self.rx);
let res = match rx.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(res) => res
.map_err(|_| BlockingError::Canceled)
.and_then(|res| res.map_err(BlockingError::Error)),
};
Poll::Ready(res)
match Pin::new(&mut self.handle).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(res)) => Poll::Ready(res.map_err(BlockingError::Error)),
Poll::Ready(Err(_)) => Poll::Ready(Err(BlockingError::Canceled)),
}
}
}