Migrate actix-rt to std::future

This commit is contained in:
Michal Hornický 2019-10-01 15:02:12 +02:00
parent aa9bbe2114
commit a36f7f4ca8
6 changed files with 134 additions and 129 deletions

View File

@ -19,9 +19,13 @@ path = "src/lib.rs"
[dependencies] [dependencies]
actix-threadpool = "0.1.1" actix-threadpool = "0.1.1"
futures = "0.1.25" futures = { package = "futures-preview", version = "0.3.0-alpha.18" }
tokio-current-thread = "0.1"
tokio-executor = "0.1.5"
tokio-reactor = "0.1.7" # TODO: Replace this with dependency on tokio-runtime once it is ready
tokio-timer = "0.2.8" tokio = { version = "0.2.0-alpha.4" }
tokio-timer = "=0.3.0-alpha.4"
tokio-executor = "=0.2.0-alpha.4"
tokio-net = "=0.2.0-alpha.4"
copyless = "0.1.4" copyless = "0.1.4"

View File

@ -3,11 +3,13 @@ use std::cell::{Cell, RefCell};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::{fmt, thread}; use std::{fmt, thread};
use std::pin::Pin;
use std::task::Context;
use futures::sync::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures::sync::oneshot::{channel, Canceled, Sender}; use futures::channel::oneshot::{channel, Canceled, Sender};
use futures::{future, Async, Future, IntoFuture, Poll, Stream}; use futures::{future, Future, Poll, FutureExt, Stream};
use tokio_current_thread::spawn; use tokio::runtime::current_thread::spawn;
use crate::builder::Builder; use crate::builder::Builder;
use crate::system::System; use crate::system::System;
@ -17,7 +19,7 @@ use copyless::BoxHelper;
thread_local!( thread_local!(
static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None); static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None);
static RUNNING: Cell<bool> = Cell::new(false); static RUNNING: Cell<bool> = Cell::new(false);
static Q: RefCell<Vec<Box<dyn Future<Item = (), Error = ()>>>> = RefCell::new(Vec::new()); static Q: RefCell<Vec<Box<dyn Future<Output = ()>>>> = RefCell::new(Vec::new());
static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new()); static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
); );
@ -25,7 +27,7 @@ pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);
pub(crate) enum ArbiterCommand { pub(crate) enum ArbiterCommand {
Stop, Stop,
Execute(Box<dyn Future<Item = (), Error = ()> + Send>), Execute(Box<dyn Future<Output=()> + Unpin + Send>),
ExecuteFn(Box<dyn FnExec>), ExecuteFn(Box<dyn FnExec>),
} }
@ -129,7 +131,9 @@ impl Arbiter {
Q.with(|cell| { Q.with(|cell| {
let mut v = cell.borrow_mut(); let mut v = cell.borrow_mut();
for fut in v.drain(..) { for fut in v.drain(..) {
spawn(fut); // We pin the boxed future, so it can never again be moved.
let fut = unsafe { Pin::new_unchecked(fut) };
tokio_executor::current_thread::spawn( fut);
} }
}); });
} }
@ -142,14 +146,19 @@ impl Arbiter {
/// or Arbiter address, it is simply a helper for spawning futures on the current /// or Arbiter address, it is simply a helper for spawning futures on the current
/// thread. /// thread.
pub fn spawn<F>(future: F) pub fn spawn<F>(future: F)
where where
F: Future<Item = (), Error = ()> + 'static, F: Future<Output=()> + 'static,
{ {
RUNNING.with(move |cell| { RUNNING.with(move |cell| {
if cell.get() { if cell.get() {
spawn(Box::alloc().init(future)); // Spawn the future on running executor
spawn(future);
} else { } else {
Q.with(move |cell| cell.borrow_mut().push(Box::alloc().init(future))); // Box the future and push it to the queue, this results in double boxing
// because the executor boxes the future again, but works for now
Q.with(move |cell| {
cell.borrow_mut().push(Box::alloc().init(future))
});
} }
}); });
} }
@ -158,17 +167,17 @@ impl Arbiter {
/// or Arbiter address, it is simply a helper for executing futures on the current /// or Arbiter address, it is simply a helper for executing futures on the current
/// thread. /// thread.
pub fn spawn_fn<F, R>(f: F) pub fn spawn_fn<F, R>(f: F)
where where
F: FnOnce() -> R + 'static, F: FnOnce() -> R + 'static,
R: IntoFuture<Item = (), Error = ()> + 'static, R: Future<Output=()> + 'static,
{ {
Arbiter::spawn(future::lazy(f)) Arbiter::spawn(future::lazy(|_| f()).flatten())
} }
/// Send a future to the Arbiter's thread, and spawn it. /// Send a future to the Arbiter's thread, and spawn it.
pub fn send<F>(&self, future: F) pub fn send<F>(&self, future: F)
where where
F: Future<Item = (), Error = ()> + Send + 'static, F: Future<Output=()> + Send + Unpin + 'static,
{ {
let _ = self let _ = self
.0 .0
@ -178,8 +187,8 @@ impl Arbiter {
/// Send a function to the Arbiter's thread, and execute it. Any result from the function /// Send a function to the Arbiter's thread, and execute it. Any result from the function
/// is discarded. /// is discarded.
pub fn exec_fn<F>(&self, f: F) pub fn exec_fn<F>(&self, f: F)
where where
F: FnOnce() + Send + 'static, F: FnOnce() + Send + 'static,
{ {
let _ = self let _ = self
.0 .0
@ -191,10 +200,10 @@ impl Arbiter {
/// Send a function to the Arbiter's thread. This function will be executed asynchronously. /// Send a function to the Arbiter's thread. This function will be executed asynchronously.
/// A future is created, and when resolved will contain the result of the function sent /// A future is created, and when resolved will contain the result of the function sent
/// to the Arbiters thread. /// to the Arbiters thread.
pub fn exec<F, R>(&self, f: F) -> impl Future<Item = R, Error = Canceled> pub fn exec<F, R>(&self, f: F) -> impl Future<Output=Result<R, Canceled>>
where where
F: FnOnce() -> R + Send + 'static, F: FnOnce() -> R + Send + 'static,
R: Send + 'static, R: Send + 'static,
{ {
let (tx, rx) = channel(); let (tx, rx) = channel();
let _ = self let _ = self
@ -221,8 +230,8 @@ impl Arbiter {
/// ///
/// Panics is item is not inserted /// Panics is item is not inserted
pub fn get_item<T: 'static, F, R>(mut f: F) -> R pub fn get_item<T: 'static, F, R>(mut f: F) -> R
where where
F: FnMut(&T) -> R, F: FnMut(&T) -> R,
{ {
STORAGE.with(move |cell| { STORAGE.with(move |cell| {
let st = cell.borrow(); let st = cell.borrow();
@ -238,8 +247,8 @@ impl Arbiter {
/// ///
/// Panics is item is not inserted /// Panics is item is not inserted
pub fn get_mut_item<T: 'static, F, R>(mut f: F) -> R pub fn get_mut_item<T: 'static, F, R>(mut f: F) -> R
where where
F: FnMut(&mut T) -> R, F: FnMut(&mut T) -> R,
{ {
STORAGE.with(move |cell| { STORAGE.with(move |cell| {
let mut st = cell.borrow_mut(); let mut st = cell.borrow_mut();
@ -269,28 +278,34 @@ impl Drop for ArbiterController {
} }
impl Future for ArbiterController { impl Future for ArbiterController {
type Item = (); type Output = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop { loop {
match self.rx.poll() {
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), match Pin::new(&mut self.rx).poll_next(cx) {
Ok(Async::Ready(Some(item))) => match item { Poll::Ready(None) => {
ArbiterCommand::Stop => { return Poll::Ready(())
if let Some(stop) = self.stop.take() { },
let _ = stop.send(0); Poll::Ready(Some(item)) => {
}; match item {
return Ok(Async::Ready(())); ArbiterCommand::Stop => {
} if let Some(stop) = self.stop.take() {
ArbiterCommand::Execute(fut) => { let _ = stop.send(0);
spawn(fut); };
} return Poll::Ready(());
ArbiterCommand::ExecuteFn(f) => { }
f.call_box(); ArbiterCommand::Execute(fut) => {
} spawn(fut);
}
ArbiterCommand::ExecuteFn(f) => {
f.call_box();
}
}
}
Poll::Pending => {
return Poll::Pending
}, },
Ok(Async::NotReady) => return Ok(Async::NotReady),
} }
} }
} }
@ -321,14 +336,13 @@ impl SystemArbiter {
} }
impl Future for SystemArbiter { impl Future for SystemArbiter {
type Item = (); type Output = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop { loop {
match self.commands.poll() { match Pin::new(&mut self.commands).poll_next(cx) {
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())), Poll::Ready(None) => return Poll::Ready(()),
Ok(Async::Ready(Some(cmd))) => match cmd { Poll::Ready(Some(cmd)) => match cmd {
SystemCommand::Exit(code) => { SystemCommand::Exit(code) => {
// stop arbiters // stop arbiters
for arb in self.arbiters.values() { for arb in self.arbiters.values() {
@ -346,7 +360,7 @@ impl Future for SystemArbiter {
self.arbiters.remove(&name); self.arbiters.remove(&name);
} }
}, },
Ok(Async::NotReady) => return Ok(Async::NotReady), Poll::Pending => return Poll::Pending,
} }
} }
} }
@ -357,8 +371,8 @@ pub trait FnExec: Send + 'static {
} }
impl<F> FnExec for F impl<F> FnExec for F
where where
F: FnOnce() + Send + 'static, F: FnOnce() + Send + 'static,
{ {
#[allow(clippy::boxed_local)] #[allow(clippy::boxed_local)]
fn call_box(self: Box<Self>) { fn call_box(self: Box<Self>) {

View File

@ -1,19 +1,19 @@
use std::borrow::Cow; use std::borrow::Cow;
use std::io; use std::io;
use futures::future; use futures::{future, FutureExt};
use futures::future::{lazy, Future}; use futures::future::{lazy, Future};
use futures::sync::mpsc::unbounded; use futures::channel::mpsc::unbounded;
use futures::sync::oneshot::{channel, Receiver}; use futures::channel::oneshot::{channel, Receiver};
use tokio_current_thread::{CurrentThread, Handle}; use tokio::runtime::current_thread::Handle;
use tokio_reactor::Reactor; use tokio_timer::{timer::Timer, clock::Clock};
use tokio_timer::clock::Clock; use tokio_net::driver::Reactor;
use tokio_timer::timer::Timer;
use crate::arbiter::{Arbiter, SystemArbiter}; use crate::arbiter::{Arbiter, SystemArbiter};
use crate::runtime::Runtime; use crate::runtime::Runtime;
use crate::system::System; use crate::system::System;
use tokio_executor::current_thread::CurrentThread;
/// Builder struct for a actix runtime. /// Builder struct for a actix runtime.
/// ///
@ -118,7 +118,7 @@ impl Builder {
rt.spawn(arb); rt.spawn(arb);
// init system arbiter and run configuration method // init system arbiter and run configuration method
let _ = rt.block_on(lazy(move || { let _ = rt.block_on(lazy(move |_| {
f(); f();
Ok::<_, ()>(()) Ok::<_, ()>(())
})); }));
@ -159,30 +159,30 @@ pub(crate) struct AsyncSystemRunner {
impl AsyncSystemRunner { impl AsyncSystemRunner {
/// This function will start event loop and returns a future that /// This function will start event loop and returns a future that
/// resolves once the `System::stop()` function is called. /// resolves once the `System::stop()` function is called.
pub(crate) fn run_nonblocking(self) -> impl Future<Item = (), Error = io::Error> + Send { pub(crate) fn run_nonblocking(self) -> impl Future<Output = Result<(),io::Error>> + Send {
let AsyncSystemRunner { stop, .. } = self; let AsyncSystemRunner { stop, .. } = self;
// run loop // run loop
future::lazy(|| { future::lazy(|_| {
Arbiter::run_system(); Arbiter::run_system();
stop.then(|res| match res { async {
Ok(code) => { let res = match stop.await {
if code != 0 { Ok(code) => {
Err(io::Error::new( if code != 0 {
io::ErrorKind::Other, Err(io::Error::new(
format!("Non-zero exit code: {}", code), io::ErrorKind::Other,
)) format!("Non-zero exit code: {}", code),
} else { ))
Ok(()) } else {
Ok(())
}
} }
} Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), };
})
.then(|result| {
Arbiter::stop_system(); Arbiter::stop_system();
result return res;
}) }
}) }).flatten()
} }
} }
@ -202,10 +202,10 @@ impl SystemRunner {
let SystemRunner { mut rt, stop, .. } = self; let SystemRunner { mut rt, stop, .. } = self;
// run loop // run loop
let _ = rt.block_on(lazy(move || { let _ = rt.block_on(async {
Arbiter::run_system(); Arbiter::run_system();
Ok::<_, ()>(()) Ok::<_, ()>(())
})); });
let result = match rt.block_on(stop) { let result = match rt.block_on(stop) {
Ok(code) => { Ok(code) => {
if code != 0 { if code != 0 {
@ -224,19 +224,19 @@ impl SystemRunner {
} }
/// Execute a future and wait for result. /// Execute a future and wait for result.
pub fn block_on<F, I, E>(&mut self, fut: F) -> Result<I, E> pub fn block_on<F, O>(&mut self, fut: F) -> O
where where
F: Future<Item = I, Error = E>, F: Future<Output = O>,
{ {
let _ = self.rt.block_on(lazy(move || { let _ = self.rt.block_on(async {
Arbiter::run_system(); Arbiter::run_system();
Ok::<_, ()>(()) });
}));
let res = self.rt.block_on(fut); let res = self.rt.block_on(fut);
let _ = self.rt.block_on(lazy(move || { let _ = self.rt.block_on(async {
Arbiter::stop_system(); Arbiter::stop_system();
Ok::<_, ()>(()) });
}));
res res
} }
} }

View File

@ -20,7 +20,7 @@ pub use actix_threadpool as blocking;
/// This function panics if actix system is not running. /// This function panics if actix system is not running.
pub fn spawn<F>(f: F) pub fn spawn<F>(f: F)
where where
F: futures::Future<Item = (), Error = ()> + 'static, F: futures::Future<Output = ()> + 'static,
{ {
if !System::is_set() { if !System::is_set() {
panic!("System is not running"); panic!("System is not running");

View File

@ -2,11 +2,9 @@ use std::error::Error;
use std::{fmt, io}; use std::{fmt, io};
use futures::Future; use futures::Future;
use tokio_current_thread::{self as current_thread, CurrentThread}; use tokio_executor::current_thread::{self, CurrentThread};
use tokio_executor; use tokio_timer::{timer::{self, Timer}, clock::Clock};
use tokio_reactor::{self, Reactor}; use tokio_net::driver::{Reactor, Handle as ReactorHandle};
use tokio_timer::clock::{self, Clock};
use tokio_timer::timer::{self, Timer};
use crate::builder::Builder; use crate::builder::Builder;
@ -18,7 +16,7 @@ use crate::builder::Builder;
/// [mod]: index.html /// [mod]: index.html
#[derive(Debug)] #[derive(Debug)]
pub struct Runtime { pub struct Runtime {
reactor_handle: tokio_reactor::Handle, reactor_handle: ReactorHandle,
timer_handle: timer::Handle, timer_handle: timer::Handle,
clock: Clock, clock: Clock,
executor: CurrentThread<Timer<Reactor>>, executor: CurrentThread<Timer<Reactor>>,
@ -53,7 +51,7 @@ impl Runtime {
} }
pub(super) fn new2( pub(super) fn new2(
reactor_handle: tokio_reactor::Handle, reactor_handle: ReactorHandle,
timer_handle: timer::Handle, timer_handle: timer::Handle,
clock: Clock, clock: Clock,
executor: CurrentThread<Timer<Reactor>>, executor: CurrentThread<Timer<Reactor>>,
@ -97,7 +95,7 @@ impl Runtime {
/// is currently at capacity and is unable to spawn a new future. /// is currently at capacity and is unable to spawn a new future.
pub fn spawn<F>(&mut self, future: F) -> &mut Self pub fn spawn<F>(&mut self, future: F) -> &mut Self
where where
F: Future<Item = (), Error = ()> + 'static, F: Future<Output = (),> + 'static,
{ {
self.executor.spawn(future); self.executor.spawn(future);
self self
@ -119,14 +117,14 @@ impl Runtime {
/// ///
/// The caller is responsible for ensuring that other spawned futures /// The caller is responsible for ensuring that other spawned futures
/// complete execution by calling `block_on` or `run`. /// complete execution by calling `block_on` or `run`.
pub fn block_on<F>(&mut self, f: F) -> Result<F::Item, F::Error> pub fn block_on<F>(&mut self, f: F) -> F::Output
where where
F: Future, F: Future,
{ {
self.enter(|executor| { self.enter(|executor| {
// Run the provided future // Run the provided future
let ret = executor.block_on(f); let ret = executor.block_on(f);
ret.map_err(|e| e.into_inner().expect("unexpected execution error")) ret
}) })
} }
@ -139,7 +137,7 @@ impl Runtime {
fn enter<F, R>(&mut self, f: F) -> R fn enter<F, R>(&mut self, f: F) -> R
where where
F: FnOnce(&mut current_thread::Entered<Timer<Reactor>>) -> R, F: FnOnce(&mut CurrentThread<Timer<Reactor>>) -> R,
{ {
let Runtime { let Runtime {
ref reactor_handle, ref reactor_handle,
@ -149,25 +147,14 @@ impl Runtime {
.. ..
} = *self; } = *self;
// Binds an executor to this thread // WARN: We do not enter the executor here, since in tokio 0.2 the executor is entered
let mut enter = tokio_executor::enter().expect("Multiple executors at once"); // automatically inside its `block_on` and `run` methods
// This will set the default handle and timer to use inside the closure tokio_executor::with_default(&mut current_thread::TaskExecutor::current(),|| {
// and run the future. tokio_timer::clock::with_default(clock, || {
tokio_reactor::with_default(&reactor_handle, &mut enter, |enter| { let _reactor_guard = tokio_net::driver::set_default(reactor_handle);
clock::with_default(clock, enter, |enter| { let _timer_guard = tokio_timer::set_default(timer_handle);
timer::with_default(&timer_handle, enter, |enter| { f(executor)
// The TaskExecutor is a fake executor that looks into the
// current single-threaded executor when used. This is a trick,
// because we need two mutable references to the executor (one
// to run the provided future, another to install as the default
// one). We use the fake one here as the default one.
let mut default_executor = current_thread::TaskExecutor::current();
tokio_executor::with_default(&mut default_executor, enter, |enter| {
let mut executor = executor.enter(enter);
f(&mut executor)
})
})
}) })
}) })
} }

View File

@ -2,9 +2,9 @@ use std::cell::RefCell;
use std::io; use std::io;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use futures::sync::mpsc::UnboundedSender; use futures::channel::mpsc::UnboundedSender;
use futures::Future; use futures::Future;
use tokio_current_thread::Handle; use tokio::runtime::current_thread::Handle;
use crate::arbiter::{Arbiter, SystemCommand}; use crate::arbiter::{Arbiter, SystemCommand};
use crate::builder::{Builder, SystemRunner}; use crate::builder::{Builder, SystemRunner};
@ -64,7 +64,7 @@ impl System {
pub fn run_in_executor<T: Into<String>>( pub fn run_in_executor<T: Into<String>>(
name: T, name: T,
executor: Handle, executor: Handle,
) -> impl Future<Item = (), Error = io::Error> + Send { ) -> impl Future<Output = Result<(), io::Error>> + Send {
Self::builder() Self::builder()
.name(name) .name(name)
.build_async(executor) .build_async(executor)