From a36f7f4ca83f45cf36a47ca486f5fd1976e09253 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Hornick=C3=BD?= Date: Tue, 1 Oct 2019 15:02:12 +0200 Subject: [PATCH] Migrate actix-rt to std::future --- actix-rt/Cargo.toml | 14 +++-- actix-rt/src/arbiter.rs | 124 ++++++++++++++++++++++------------------ actix-rt/src/builder.rs | 72 +++++++++++------------ actix-rt/src/lib.rs | 2 +- actix-rt/src/runtime.rs | 45 ++++++--------- actix-rt/src/system.rs | 6 +- 6 files changed, 134 insertions(+), 129 deletions(-) diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index 8131ed19..9deeaaf6 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -19,9 +19,13 @@ path = "src/lib.rs" [dependencies] actix-threadpool = "0.1.1" -futures = "0.1.25" -tokio-current-thread = "0.1" -tokio-executor = "0.1.5" -tokio-reactor = "0.1.7" -tokio-timer = "0.2.8" +futures = { package = "futures-preview", version = "0.3.0-alpha.18" } + + +# TODO: Replace this with dependency on tokio-runtime once it is ready +tokio = { version = "0.2.0-alpha.4" } +tokio-timer = "=0.3.0-alpha.4" +tokio-executor = "=0.2.0-alpha.4" +tokio-net = "=0.2.0-alpha.4" + copyless = "0.1.4" diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 285ad3c9..8b4150a4 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -3,11 +3,13 @@ use std::cell::{Cell, RefCell}; use std::collections::HashMap; use std::sync::atomic::{AtomicUsize, Ordering}; use std::{fmt, thread}; +use std::pin::Pin; +use std::task::Context; -use futures::sync::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; -use futures::sync::oneshot::{channel, Canceled, Sender}; -use futures::{future, Async, Future, IntoFuture, Poll, Stream}; -use tokio_current_thread::spawn; +use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; +use futures::channel::oneshot::{channel, Canceled, Sender}; +use futures::{future, Future, Poll, FutureExt, Stream}; +use tokio::runtime::current_thread::spawn; use crate::builder::Builder; use crate::system::System; @@ -17,7 +19,7 @@ use copyless::BoxHelper; thread_local!( static ADDR: RefCell> = RefCell::new(None); static RUNNING: Cell = Cell::new(false); - static Q: RefCell>>> = RefCell::new(Vec::new()); + static Q: RefCell>>> = RefCell::new(Vec::new()); static STORAGE: RefCell>> = RefCell::new(HashMap::new()); ); @@ -25,7 +27,7 @@ pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0); pub(crate) enum ArbiterCommand { Stop, - Execute(Box + Send>), + Execute(Box + Unpin + Send>), ExecuteFn(Box), } @@ -129,7 +131,9 @@ impl Arbiter { Q.with(|cell| { let mut v = cell.borrow_mut(); for fut in v.drain(..) { - spawn(fut); + // We pin the boxed future, so it can never again be moved. + let fut = unsafe { Pin::new_unchecked(fut) }; + tokio_executor::current_thread::spawn( fut); } }); } @@ -142,14 +146,19 @@ impl Arbiter { /// or Arbiter address, it is simply a helper for spawning futures on the current /// thread. pub fn spawn(future: F) - where - F: Future + 'static, + where + F: Future + 'static, { RUNNING.with(move |cell| { if cell.get() { - spawn(Box::alloc().init(future)); + // Spawn the future on running executor + spawn(future); } else { - Q.with(move |cell| cell.borrow_mut().push(Box::alloc().init(future))); + // Box the future and push it to the queue, this results in double boxing + // because the executor boxes the future again, but works for now + Q.with(move |cell| { + cell.borrow_mut().push(Box::alloc().init(future)) + }); } }); } @@ -158,17 +167,17 @@ impl Arbiter { /// or Arbiter address, it is simply a helper for executing futures on the current /// thread. pub fn spawn_fn(f: F) - where - F: FnOnce() -> R + 'static, - R: IntoFuture + 'static, + where + F: FnOnce() -> R + 'static, + R: Future + 'static, { - Arbiter::spawn(future::lazy(f)) + Arbiter::spawn(future::lazy(|_| f()).flatten()) } /// Send a future to the Arbiter's thread, and spawn it. pub fn send(&self, future: F) - where - F: Future + Send + 'static, + where + F: Future + Send + Unpin + 'static, { let _ = self .0 @@ -178,8 +187,8 @@ impl Arbiter { /// Send a function to the Arbiter's thread, and execute it. Any result from the function /// is discarded. pub fn exec_fn(&self, f: F) - where - F: FnOnce() + Send + 'static, + where + F: FnOnce() + Send + 'static, { let _ = self .0 @@ -191,10 +200,10 @@ impl Arbiter { /// Send a function to the Arbiter's thread. This function will be executed asynchronously. /// A future is created, and when resolved will contain the result of the function sent /// to the Arbiters thread. - pub fn exec(&self, f: F) -> impl Future - where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, + pub fn exec(&self, f: F) -> impl Future> + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, { let (tx, rx) = channel(); let _ = self @@ -221,8 +230,8 @@ impl Arbiter { /// /// Panics is item is not inserted pub fn get_item(mut f: F) -> R - where - F: FnMut(&T) -> R, + where + F: FnMut(&T) -> R, { STORAGE.with(move |cell| { let st = cell.borrow(); @@ -238,8 +247,8 @@ impl Arbiter { /// /// Panics is item is not inserted pub fn get_mut_item(mut f: F) -> R - where - F: FnMut(&mut T) -> R, + where + F: FnMut(&mut T) -> R, { STORAGE.with(move |cell| { let mut st = cell.borrow_mut(); @@ -269,28 +278,34 @@ impl Drop for ArbiterController { } impl Future for ArbiterController { - type Item = (); - type Error = (); + type Output = (); - fn poll(&mut self) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match self.rx.poll() { - Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), - Ok(Async::Ready(Some(item))) => match item { - ArbiterCommand::Stop => { - if let Some(stop) = self.stop.take() { - let _ = stop.send(0); - }; - return Ok(Async::Ready(())); - } - ArbiterCommand::Execute(fut) => { - spawn(fut); - } - ArbiterCommand::ExecuteFn(f) => { - f.call_box(); - } + + match Pin::new(&mut self.rx).poll_next(cx) { + Poll::Ready(None) => { + return Poll::Ready(()) + }, + Poll::Ready(Some(item)) => { + match item { + ArbiterCommand::Stop => { + if let Some(stop) = self.stop.take() { + let _ = stop.send(0); + }; + return Poll::Ready(()); + } + ArbiterCommand::Execute(fut) => { + spawn(fut); + } + ArbiterCommand::ExecuteFn(f) => { + f.call_box(); + } + } + } + Poll::Pending => { + return Poll::Pending }, - Ok(Async::NotReady) => return Ok(Async::NotReady), } } } @@ -321,14 +336,13 @@ impl SystemArbiter { } impl Future for SystemArbiter { - type Item = (); - type Error = (); + type Output = (); - fn poll(&mut self) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match self.commands.poll() { - Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), - Ok(Async::Ready(Some(cmd))) => match cmd { + match Pin::new(&mut self.commands).poll_next(cx) { + Poll::Ready(None) => return Poll::Ready(()), + Poll::Ready(Some(cmd)) => match cmd { SystemCommand::Exit(code) => { // stop arbiters for arb in self.arbiters.values() { @@ -346,7 +360,7 @@ impl Future for SystemArbiter { self.arbiters.remove(&name); } }, - Ok(Async::NotReady) => return Ok(Async::NotReady), + Poll::Pending => return Poll::Pending, } } } @@ -357,8 +371,8 @@ pub trait FnExec: Send + 'static { } impl FnExec for F -where - F: FnOnce() + Send + 'static, + where + F: FnOnce() + Send + 'static, { #[allow(clippy::boxed_local)] fn call_box(self: Box) { diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index 71509f14..1aa95045 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -1,19 +1,19 @@ use std::borrow::Cow; use std::io; -use futures::future; +use futures::{future, FutureExt}; use futures::future::{lazy, Future}; -use futures::sync::mpsc::unbounded; -use futures::sync::oneshot::{channel, Receiver}; +use futures::channel::mpsc::unbounded; +use futures::channel::oneshot::{channel, Receiver}; -use tokio_current_thread::{CurrentThread, Handle}; -use tokio_reactor::Reactor; -use tokio_timer::clock::Clock; -use tokio_timer::timer::Timer; +use tokio::runtime::current_thread::Handle; +use tokio_timer::{timer::Timer, clock::Clock}; +use tokio_net::driver::Reactor; use crate::arbiter::{Arbiter, SystemArbiter}; use crate::runtime::Runtime; use crate::system::System; +use tokio_executor::current_thread::CurrentThread; /// Builder struct for a actix runtime. /// @@ -118,7 +118,7 @@ impl Builder { rt.spawn(arb); // init system arbiter and run configuration method - let _ = rt.block_on(lazy(move || { + let _ = rt.block_on(lazy(move |_| { f(); Ok::<_, ()>(()) })); @@ -159,30 +159,30 @@ pub(crate) struct AsyncSystemRunner { impl AsyncSystemRunner { /// This function will start event loop and returns a future that /// resolves once the `System::stop()` function is called. - pub(crate) fn run_nonblocking(self) -> impl Future + Send { + pub(crate) fn run_nonblocking(self) -> impl Future> + Send { let AsyncSystemRunner { stop, .. } = self; // run loop - future::lazy(|| { + future::lazy(|_| { Arbiter::run_system(); - stop.then(|res| match res { - Ok(code) => { - if code != 0 { - Err(io::Error::new( - io::ErrorKind::Other, - format!("Non-zero exit code: {}", code), - )) - } else { - Ok(()) + async { + let res = match stop.await { + Ok(code) => { + if code != 0 { + Err(io::Error::new( + io::ErrorKind::Other, + format!("Non-zero exit code: {}", code), + )) + } else { + Ok(()) + } } - } - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), - }) - .then(|result| { + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), + }; Arbiter::stop_system(); - result - }) - }) + return res; + } + }).flatten() } } @@ -202,10 +202,10 @@ impl SystemRunner { let SystemRunner { mut rt, stop, .. } = self; // run loop - let _ = rt.block_on(lazy(move || { + let _ = rt.block_on(async { Arbiter::run_system(); Ok::<_, ()>(()) - })); + }); let result = match rt.block_on(stop) { Ok(code) => { if code != 0 { @@ -224,19 +224,19 @@ impl SystemRunner { } /// Execute a future and wait for result. - pub fn block_on(&mut self, fut: F) -> Result + pub fn block_on(&mut self, fut: F) -> O where - F: Future, + F: Future, { - let _ = self.rt.block_on(lazy(move || { + let _ = self.rt.block_on(async { Arbiter::run_system(); - Ok::<_, ()>(()) - })); + }); + let res = self.rt.block_on(fut); - let _ = self.rt.block_on(lazy(move || { + let _ = self.rt.block_on(async { Arbiter::stop_system(); - Ok::<_, ()>(()) - })); + }); + res } } diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index 9b16b959..97c56102 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -20,7 +20,7 @@ pub use actix_threadpool as blocking; /// This function panics if actix system is not running. pub fn spawn(f: F) where - F: futures::Future + 'static, + F: futures::Future + 'static, { if !System::is_set() { panic!("System is not running"); diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index c6b2a9fc..0e7ffc41 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -2,11 +2,9 @@ use std::error::Error; use std::{fmt, io}; use futures::Future; -use tokio_current_thread::{self as current_thread, CurrentThread}; -use tokio_executor; -use tokio_reactor::{self, Reactor}; -use tokio_timer::clock::{self, Clock}; -use tokio_timer::timer::{self, Timer}; +use tokio_executor::current_thread::{self, CurrentThread}; +use tokio_timer::{timer::{self, Timer}, clock::Clock}; +use tokio_net::driver::{Reactor, Handle as ReactorHandle}; use crate::builder::Builder; @@ -18,7 +16,7 @@ use crate::builder::Builder; /// [mod]: index.html #[derive(Debug)] pub struct Runtime { - reactor_handle: tokio_reactor::Handle, + reactor_handle: ReactorHandle, timer_handle: timer::Handle, clock: Clock, executor: CurrentThread>, @@ -53,7 +51,7 @@ impl Runtime { } pub(super) fn new2( - reactor_handle: tokio_reactor::Handle, + reactor_handle: ReactorHandle, timer_handle: timer::Handle, clock: Clock, executor: CurrentThread>, @@ -97,7 +95,7 @@ impl Runtime { /// is currently at capacity and is unable to spawn a new future. pub fn spawn(&mut self, future: F) -> &mut Self where - F: Future + 'static, + F: Future + 'static, { self.executor.spawn(future); self @@ -119,14 +117,14 @@ impl Runtime { /// /// The caller is responsible for ensuring that other spawned futures /// complete execution by calling `block_on` or `run`. - pub fn block_on(&mut self, f: F) -> Result + pub fn block_on(&mut self, f: F) -> F::Output where F: Future, { self.enter(|executor| { // Run the provided future let ret = executor.block_on(f); - ret.map_err(|e| e.into_inner().expect("unexpected execution error")) + ret }) } @@ -139,7 +137,7 @@ impl Runtime { fn enter(&mut self, f: F) -> R where - F: FnOnce(&mut current_thread::Entered>) -> R, + F: FnOnce(&mut CurrentThread>) -> R, { let Runtime { ref reactor_handle, @@ -149,25 +147,14 @@ impl Runtime { .. } = *self; - // Binds an executor to this thread - let mut enter = tokio_executor::enter().expect("Multiple executors at once"); + // WARN: We do not enter the executor here, since in tokio 0.2 the executor is entered + // automatically inside its `block_on` and `run` methods - // This will set the default handle and timer to use inside the closure - // and run the future. - tokio_reactor::with_default(&reactor_handle, &mut enter, |enter| { - clock::with_default(clock, enter, |enter| { - timer::with_default(&timer_handle, enter, |enter| { - // The TaskExecutor is a fake executor that looks into the - // current single-threaded executor when used. This is a trick, - // because we need two mutable references to the executor (one - // to run the provided future, another to install as the default - // one). We use the fake one here as the default one. - let mut default_executor = current_thread::TaskExecutor::current(); - tokio_executor::with_default(&mut default_executor, enter, |enter| { - let mut executor = executor.enter(enter); - f(&mut executor) - }) - }) + tokio_executor::with_default(&mut current_thread::TaskExecutor::current(),|| { + tokio_timer::clock::with_default(clock, || { + let _reactor_guard = tokio_net::driver::set_default(reactor_handle); + let _timer_guard = tokio_timer::set_default(timer_handle); + f(executor) }) }) } diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 745de00f..c2eb9f37 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -2,9 +2,9 @@ use std::cell::RefCell; use std::io; use std::sync::atomic::{AtomicUsize, Ordering}; -use futures::sync::mpsc::UnboundedSender; +use futures::channel::mpsc::UnboundedSender; use futures::Future; -use tokio_current_thread::Handle; +use tokio::runtime::current_thread::Handle; use crate::arbiter::{Arbiter, SystemCommand}; use crate::builder::{Builder, SystemRunner}; @@ -64,7 +64,7 @@ impl System { pub fn run_in_executor>( name: T, executor: Handle, - ) -> impl Future + Send { + ) -> impl Future> + Send { Self::builder() .name(name) .build_async(executor)