From 05ae2585f353a007a532bf6dc7709fa6afb2faa3 Mon Sep 17 00:00:00 2001 From: Kai Ren Date: Sun, 10 Nov 2019 19:43:02 +0200 Subject: [PATCH] Migrate actix-threadpool to std::future (#59) * Migrate actix-threadpool to std::future * Cosmetic refactor - turn log::error! into log::warn! as it doesn't throw any error - add Clone and Copy impls for Cancelled making it cheap to operate with - apply rustfmt * Bump up crate version to 0.2.0 and pre-fill its changelog * Disable patching 'actix-threadpool' crate in global workspace as unnecessary * Revert patching and fix 'actix-rt' --- actix-rt/Cargo.toml | 2 +- actix-rt/src/arbiter.rs | 4 ++-- actix-threadpool/CHANGES.md | 6 +++++ actix-threadpool/Cargo.toml | 2 +- actix-threadpool/src/lib.rs | 44 ++++++++++++++++++------------------- 5 files changed, 31 insertions(+), 27 deletions(-) diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index db3782ab..2130afaa 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -18,7 +18,7 @@ name = "actix_rt" path = "src/lib.rs" [dependencies] -actix-threadpool = "0.1.1" +actix-threadpool = "0.2" futures = "0.3.1" # TODO: Replace this with dependency on tokio-runtime once it is ready diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index f85dcc96..dc3d0fdc 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -4,11 +4,11 @@ use std::collections::HashMap; use std::sync::atomic::{AtomicUsize, Ordering}; use std::{fmt, thread}; use std::pin::Pin; -use std::task::Context; +use std::task::{Context, Poll}; use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures::channel::oneshot::{channel, Canceled, Sender}; -use futures::{future, Future, Poll, FutureExt, Stream}; +use futures::{future, Future, FutureExt, Stream}; use tokio::runtime::current_thread::spawn; use crate::builder::Builder; diff --git a/actix-threadpool/CHANGES.md b/actix-threadpool/CHANGES.md index 1050c503..aadf9ac9 100644 --- a/actix-threadpool/CHANGES.md +++ b/actix-threadpool/CHANGES.md @@ -1,5 +1,11 @@ # Changes +## [0.2.0] - 2019-??-?? + +### Changed + +* Migrate to `std::future` + ## [0.1.2] - 2019-08-05 ### Changed diff --git a/actix-threadpool/Cargo.toml b/actix-threadpool/Cargo.toml index 03e3e027..da6a9133 100644 --- a/actix-threadpool/Cargo.toml +++ b/actix-threadpool/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-threadpool" -version = "0.1.2" +version = "0.2.0" authors = ["Nikolay Kim "] description = "Actix thread pool for sync code" keywords = ["actix", "network", "framework", "async", "futures"] diff --git a/actix-threadpool/src/lib.rs b/actix-threadpool/src/lib.rs index e78438f5..5ed6b7ca 100644 --- a/actix-threadpool/src/lib.rs +++ b/actix-threadpool/src/lib.rs @@ -1,34 +1,34 @@ //! Thread pool for blocking operations -use std::future::Future; -use std::task::{Poll,Context}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; use derive_more::Display; use futures::channel::oneshot; use parking_lot::Mutex; use threadpool::ThreadPool; -use std::pin::Pin; -/// Env variable for default cpu pool size +/// Env variable for default cpu pool size. const ENV_CPU_POOL_VAR: &str = "ACTIX_THREADPOOL"; lazy_static::lazy_static! { pub(crate) static ref DEFAULT_POOL: Mutex = { - let default = match std::env::var(ENV_CPU_POOL_VAR) { - Ok(val) => { - if let Ok(val) = val.parse() { - val - } else { - log::error!("Can not parse ACTIX_THREADPOOL value"); - num_cpus::get() * 5 - } - } - Err(_) => num_cpus::get() * 5, - }; + let num = std::env::var(ENV_CPU_POOL_VAR) + .map_err(|_| ()) + .and_then(|val| { + val.parse().map_err(|_| log::warn!( + "Can not parse {} value, using default", + ENV_CPU_POOL_VAR, + )) + }) + .unwrap_or_else(|_| num_cpus::get() * 5); Mutex::new( threadpool::Builder::new() .thread_name("actix-web".to_owned()) - .num_threads(default) + .num_threads(num) .build(), ) }; @@ -40,8 +40,8 @@ thread_local! { }; } -/// Blocking operation execution error -#[derive(Debug, Display)] +/// Error of blocking operation execution being cancelled. +#[derive(Clone, Copy, Debug, Display)] #[display(fmt = "Thread pool is gone")] pub struct Cancelled; @@ -71,13 +71,11 @@ pub struct CpuFuture { } impl Future for CpuFuture { - type Output = Result; + type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let rx = unsafe{ self.map_unchecked_mut(|s|&mut s.rx)}; - let res = futures::ready!(rx.poll(cx)); - + let rx = Pin::new(&mut Pin::get_mut(self).rx); + let res = futures::ready!(rx.poll(cx)); Poll::Ready(res.map_err(|_| Cancelled)) } - }