From b410a5e7a12b3e005bacb328cc396f1e44f385c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Hornick=C3=BD?= Date: Sat, 14 Sep 2019 12:38:14 +0200 Subject: [PATCH] Migrate actix-threadpool to std::future --- actix-threadpool/Cargo.toml | 2 +- actix-threadpool/src/lib.rs | 40 ++++++++++++++++--------------------- 2 files changed, 18 insertions(+), 24 deletions(-) diff --git a/actix-threadpool/Cargo.toml b/actix-threadpool/Cargo.toml index 5e9d855d..643c380b 100644 --- a/actix-threadpool/Cargo.toml +++ b/actix-threadpool/Cargo.toml @@ -19,7 +19,7 @@ path = "src/lib.rs" [dependencies] derive_more = "0.15" -futures = "0.1.25" +futures = { package = "futures-preview", version = "0.3.0-alpha.18" } parking_lot = "0.9" lazy_static = "1.2" log = "0.4" diff --git a/actix-threadpool/src/lib.rs b/actix-threadpool/src/lib.rs index b9952e62..52895932 100644 --- a/actix-threadpool/src/lib.rs +++ b/actix-threadpool/src/lib.rs @@ -1,12 +1,13 @@ //! Thread pool for blocking operations -use std::fmt; +use std::future::Future; +use std::task::{Poll,Context}; use derive_more::Display; -use futures::sync::oneshot; -use futures::{Async, Future, Poll}; +use futures::channel::oneshot; use parking_lot::Mutex; use threadpool::ThreadPool; +use std::pin::Pin; /// Env variable for default cpu pool size const ENV_CPU_POOL_VAR: &str = "ACTIX_THREADPOOL"; @@ -41,20 +42,15 @@ thread_local! { /// Blocking operation execution error #[derive(Debug, Display)] -pub enum BlockingError { - #[display(fmt = "{:?}", _0)] - Error(E), - #[display(fmt = "Thread pool is gone")] - Canceled, -} +#[display(fmt = "Thread pool is gone")] +pub struct Cancelled; /// Execute blocking function on a thread pool, returns future that resolves /// to result of the function execution. -pub fn run(f: F) -> CpuFuture +pub fn run(f: F) -> CpuFuture where - F: FnOnce() -> Result + Send + 'static, + F: FnOnce() -> I + Send + 'static, I: Send + 'static, - E: Send + fmt::Debug + 'static, { let (tx, rx) = oneshot::channel(); POOL.with(|pool| { @@ -70,19 +66,17 @@ where /// Blocking operation completion future. It resolves with results /// of blocking function execution. -pub struct CpuFuture { - rx: oneshot::Receiver>, +pub struct CpuFuture { + rx: oneshot::Receiver, } -impl Future for CpuFuture { - type Item = I; - type Error = BlockingError; +impl Future for CpuFuture { + type Output = Result; - fn poll(&mut self) -> Poll { - let res = futures::try_ready!(self.rx.poll().map_err(|_| BlockingError::Canceled)); - match res { - Ok(val) => Ok(Async::Ready(val)), - Err(err) => Err(BlockingError::Error(err)), - } + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let rx = Pin::new(&mut Pin::get_mut(self).rx); + let res = futures::ready!(rx.poll(cx)); + Poll::Ready(res.map_err(|_| Cancelled)) } + }