WIP: Update actix-rt to tokio 0.2

This commit is contained in:
Martell Malone 2019-11-27 23:41:49 -08:00
parent b98b897c8e
commit dca7cad09d
6 changed files with 50 additions and 453 deletions

View File

@ -9,9 +9,8 @@ use std::{fmt, thread};
use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures::channel::oneshot::{channel, Canceled, Sender};
use futures::{future, Future, FutureExt, Stream};
use tokio_executor::current_thread::spawn;
use tokio::runtime::{Builder, Runtime};
use crate::builder::Builder;
use crate::system::System;
use copyless::BoxHelper;
@ -101,7 +100,7 @@ impl Arbiter {
let handle = thread::Builder::new()
.name(name.clone())
.spawn(move || {
let mut rt = Builder::new().build_rt().expect("Can not create Runtime");
let mut rt = Builder::new().build().expect("Can not create Runtime");
let arb = Arbiter::with_sender(arb_tx);
let (stop, stop_rx) = channel();
@ -150,7 +149,8 @@ impl Arbiter {
for fut in v.drain(..) {
// We pin the boxed future, so it can never again be moved.
let fut = unsafe { Pin::new_unchecked(fut) };
tokio_executor::current_thread::spawn(fut);
// FIXME
//tokio_executor::current_thread::spawn(fut);
}
});
}
@ -169,7 +169,8 @@ impl Arbiter {
RUNNING.with(move |cell| {
if cell.get() {
// Spawn the future on running executor
spawn(future);
// FIXME
//spawn(future);
} else {
// Box the future and push it to the queue, this results in double boxing
// because the executor boxes the future again, but works for now
@ -325,7 +326,8 @@ impl Future for ArbiterController {
return Poll::Ready(());
}
ArbiterCommand::Execute(fut) => {
spawn(fut);
// FIXME
//spawn(fut);
}
ArbiterCommand::ExecuteFn(f) => {
f.call_box();

View File

@ -1,242 +0,0 @@
use std::borrow::Cow;
use std::io;
use futures::channel::mpsc::unbounded;
use futures::channel::oneshot::{channel, Receiver};
use futures::future::{lazy, Future, FutureExt};
use tokio::runtime::current_thread::Handle;
use tokio_executor::current_thread::CurrentThread;
use tokio::net::driver::Reactor;
use tokio::time::{clock::Clock, timer::Timer};
use crate::arbiter::{Arbiter, SystemArbiter};
use crate::runtime::Runtime;
use crate::system::System;
/// Builder struct for a actix runtime.
///
/// Either use `Builder::build` to create a system and start actors.
/// Alternatively, use `Builder::run` to start the tokio runtime and
/// run a function in its context.
pub struct Builder {
/// Name of the System. Defaults to "actix" if unset.
name: Cow<'static, str>,
/// The clock to use
clock: Clock,
/// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false.
stop_on_panic: bool,
}
impl Builder {
pub(crate) fn new() -> Self {
Builder {
name: Cow::Borrowed("actix"),
clock: Clock::new(),
stop_on_panic: false,
}
}
/// Sets the name of the System.
pub fn name<T: Into<String>>(mut self, name: T) -> Self {
self.name = Cow::Owned(name.into());
self
}
/// Set the Clock instance that will be used by this System.
///
/// Defaults to the system clock.
pub fn clock(mut self, clock: Clock) -> Self {
self.clock = clock;
self
}
/// Sets the option 'stop_on_panic' which controls whether the System is stopped when an
/// uncaught panic is thrown from a worker thread.
///
/// Defaults to false.
pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self {
self.stop_on_panic = stop_on_panic;
self
}
/// Create new System.
///
/// This method panics if it can not create tokio runtime
pub fn build(self) -> SystemRunner {
self.create_runtime(|| {})
}
/// Create new System that can run asynchronously.
///
/// This method panics if it cannot start the system arbiter
pub(crate) fn build_async(self, executor: Handle) -> AsyncSystemRunner {
self.create_async_runtime(executor)
}
/// This function will start tokio runtime and will finish once the
/// `System::stop()` message get called.
/// Function `f` get called within tokio runtime context.
pub fn run<F>(self, f: F) -> io::Result<()>
where
F: FnOnce() + 'static,
{
self.create_runtime(f).run()
}
fn create_async_runtime(self, executor: Handle) -> AsyncSystemRunner {
let (stop_tx, stop) = channel();
let (sys_sender, sys_receiver) = unbounded();
let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic);
// system arbiter
let arb = SystemArbiter::new(stop_tx, sys_receiver);
// start the system arbiter
executor.spawn(arb).expect("could not start system arbiter");
AsyncSystemRunner { stop, system }
}
fn create_runtime<F>(self, f: F) -> SystemRunner
where
F: FnOnce() + 'static,
{
let (stop_tx, stop) = channel();
let (sys_sender, sys_receiver) = unbounded();
let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic);
// system arbiter
let arb = SystemArbiter::new(stop_tx, sys_receiver);
let mut rt = self.build_rt().unwrap();
rt.spawn(arb);
// init system arbiter and run configuration method
let _ = rt.block_on(lazy(move |_| {
f();
Ok::<_, ()>(())
}));
SystemRunner { rt, stop, system }
}
pub(crate) fn build_rt(&self) -> io::Result<Runtime> {
// We need a reactor to receive events about IO objects from kernel
let reactor = Reactor::new()?;
let reactor_handle = reactor.handle();
// Place a timer wheel on top of the reactor. If there are no timeouts to fire, it'll let the
// reactor pick up some new external events.
let timer = Timer::new_with_now(reactor, self.clock.clone());
let timer_handle = timer.handle();
// And now put a single-threaded executor on top of the timer. When there are no futures ready
// to do something, it'll let the timer or the reactor to generate some new stimuli for the
// futures to continue in their life.
let executor = CurrentThread::new_with_park(timer);
Ok(Runtime::new2(
reactor_handle,
timer_handle,
self.clock.clone(),
executor,
))
}
}
#[derive(Debug)]
pub(crate) struct AsyncSystemRunner {
stop: Receiver<i32>,
system: System,
}
impl AsyncSystemRunner {
/// This function will start event loop and returns a future that
/// resolves once the `System::stop()` function is called.
pub(crate) fn run_nonblocking(self) -> impl Future<Output = Result<(), io::Error>> + Send {
let AsyncSystemRunner { stop, .. } = self;
// run loop
lazy(|_| {
Arbiter::run_system();
async {
let res = match stop.await {
Ok(code) => {
if code != 0 {
Err(io::Error::new(
io::ErrorKind::Other,
format!("Non-zero exit code: {}", code),
))
} else {
Ok(())
}
}
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
};
Arbiter::stop_system();
return res;
}
})
.flatten()
}
}
/// Helper object that runs System's event loop
#[must_use = "SystemRunner must be run"]
#[derive(Debug)]
pub struct SystemRunner {
rt: Runtime,
stop: Receiver<i32>,
system: System,
}
impl SystemRunner {
/// This function will start event loop and will finish once the
/// `System::stop()` function is called.
pub fn run(self) -> io::Result<()> {
let SystemRunner { mut rt, stop, .. } = self;
// run loop
let _ = rt.block_on(async {
Arbiter::run_system();
Ok::<_, ()>(())
});
let result = match rt.block_on(stop) {
Ok(code) => {
if code != 0 {
Err(io::Error::new(
io::ErrorKind::Other,
format!("Non-zero exit code: {}", code),
))
} else {
Ok(())
}
}
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
};
Arbiter::stop_system();
result
}
/// Execute a future and wait for result.
pub fn block_on<F, O>(&mut self, fut: F) -> O
where
F: Future<Output = O>,
{
self.rt.block_on(async {
Arbiter::run_system();
});
let res = self.rt.block_on(fut);
self.rt.block_on(async {
Arbiter::stop_system();
});
res
}
}

View File

@ -4,13 +4,9 @@
pub use actix_macros::{main, test};
mod arbiter;
mod builder;
mod runtime;
mod system;
pub use self::arbiter::Arbiter;
pub use self::builder::{Builder, SystemRunner};
pub use self::runtime::Runtime;
pub use self::system::System;
#[doc(hidden)]
@ -34,21 +30,8 @@ where
/// Utilities for tracking time.
pub mod time {
use std::time::{Duration, Instant};
pub use tokio_timer::Interval;
pub use tokio_timer::{delay, delay_for, Delay};
pub use tokio_timer::{timeout, Timeout};
/// Creates new `Interval` that yields with interval of `duration`. The first
/// tick completes immediately.
pub fn interval(duration: Duration) -> Interval {
Interval::new(Instant::now(), duration)
}
/// Creates new `Interval` that yields with interval of `period` with the
/// first tick completing at `at`.
pub fn interval_at(start: Instant, duration: Duration) -> Interval {
Interval::new(start, duration)
}
pub use tokio::time::{Duration, Instant};
pub use tokio::time::{delay_for, delay_until, Delay};
pub use tokio::time::{interval, interval_at, Interval};
pub use tokio::time::{timeout, Timeout};
}

View File

@ -66,15 +66,8 @@
//! [executor]: https://tokio.rs/docs/getting-started/runtime-model/#executors
//! [timer]: ../../timer/index.html
mod builder;
mod runtime;
pub use self::builder::Builder;
pub use self::runtime::{Runtime, Handle};
pub use tokio_current_thread::spawn;
pub use tokio_current_thread::TaskExecutor;
use futures::Future;
use tokio::runtime;
/// Run the provided future to completion using a runtime running on the current thread.
///
@ -85,7 +78,11 @@ pub fn block_on_all<F>(future: F) -> Result<F::Item, F::Error>
where
F: Future,
{
let mut r = Runtime::new().expect("failed to start runtime on current thread");
let mut r = runtime::Builder::new()
.basic_scheduler()
.enable_io()
.build()
.expect("failed to start runtime on current thread");
let v = r.block_on(future)?;
r.run().expect("failed to resolve remaining futures");
Ok(v)

View File

@ -1,161 +0,0 @@
use std::error::Error;
use std::{fmt, io};
use futures::Future;
use tokio_executor::current_thread::{self, CurrentThread};
use tokio::net::driver::{Handle as ReactorHandle, Reactor};
use tokio_timer::{
clock::Clock,
timer::{self, Timer},
};
use crate::builder::Builder;
/// Single-threaded runtime provides a way to start reactor
/// and executor on the current thread.
///
/// See [module level][mod] documentation for more details.
///
/// [mod]: index.html
#[derive(Debug)]
pub struct Runtime {
reactor_handle: ReactorHandle,
timer_handle: timer::Handle,
clock: Clock,
executor: CurrentThread<Timer<Reactor>>,
}
/// Error returned by the `run` function.
#[derive(Debug)]
pub struct RunError {
inner: current_thread::RunError,
}
impl fmt::Display for RunError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{}", self.inner)
}
}
impl Error for RunError {
fn description(&self) -> &str {
self.inner.description()
}
fn cause(&self) -> Option<&dyn Error> {
self.inner.source()
}
}
impl Runtime {
#[allow(clippy::new_ret_no_self)]
/// Returns a new runtime initialized with default configuration values.
pub fn new() -> io::Result<Runtime> {
Builder::new().build_rt()
}
pub(super) fn new2(
reactor_handle: ReactorHandle,
timer_handle: timer::Handle,
clock: Clock,
executor: CurrentThread<Timer<Reactor>>,
) -> Runtime {
Runtime {
reactor_handle,
timer_handle,
clock,
executor,
}
}
/// Spawn a future onto the single-threaded Tokio runtime.
///
/// See [module level][mod] documentation for more details.
///
/// [mod]: index.html
///
/// # Examples
///
/// ```rust
/// # use futures::{future, Future, Stream};
/// use actix_rt::Runtime;
///
/// # fn dox() {
/// // Create the runtime
/// let mut rt = Runtime::new().unwrap();
///
/// // Spawn a future onto the runtime
/// rt.spawn(future::lazy(|_| {
/// println!("running on the runtime");
/// }));
/// # }
/// # pub fn main() {}
/// ```
///
/// # Panics
///
/// This function panics if the spawn fails. Failure occurs if the executor
/// is currently at capacity and is unable to spawn a new future.
pub fn spawn<F>(&mut self, future: F) -> &mut Self
where
F: Future<Output = ()> + 'static,
{
self.executor.spawn(future);
self
}
/// Runs the provided future, blocking the current thread until the future
/// completes.
///
/// This function can be used to synchronously block the current thread
/// until the provided `future` has resolved either successfully or with an
/// error. The result of the future is then returned from this function
/// call.
///
/// Note that this function will **also** execute any spawned futures on the
/// current thread, but will **not** block until these other spawned futures
/// have completed. Once the function returns, any uncompleted futures
/// remain pending in the `Runtime` instance. These futures will not run
/// until `block_on` or `run` is called again.
///
/// The caller is responsible for ensuring that other spawned futures
/// complete execution by calling `block_on` or `run`.
pub fn block_on<F>(&mut self, f: F) -> F::Output
where
F: Future,
{
self.enter(|executor| {
// Run the provided future
executor.block_on(f)
})
}
/// Run the executor to completion, blocking the thread until **all**
/// spawned futures have completed.
pub fn run(&mut self) -> Result<(), RunError> {
self.enter(|executor| executor.run())
.map_err(|e| RunError { inner: e })
}
fn enter<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&mut CurrentThread<Timer<Reactor>>) -> R,
{
let Runtime {
ref reactor_handle,
ref timer_handle,
ref clock,
ref mut executor,
..
} = *self;
// WARN: We do not enter the executor here, since in tokio 0.2 the executor is entered
// automatically inside its `block_on` and `run` methods
tokio_executor::with_default(&mut current_thread::TaskExecutor::current(), || {
tokio_timer::clock::with_default(clock, || {
let _reactor_guard = tokio::net::driver::set_default(reactor_handle);
let _timer_guard = tokio_timer::set_default(timer_handle);
f(executor)
})
})
}
}

View File

@ -4,10 +4,10 @@ use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};
use futures::channel::mpsc::UnboundedSender;
use tokio::runtime::current_thread::Handle;
use tokio::runtime::{Builder, Runtime};
use crate::arbiter::{Arbiter, SystemCommand};
use crate::builder::{Builder, SystemRunner};
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
@ -53,23 +53,23 @@ impl System {
/// Create new system.
///
/// This method panics if it can not create tokio runtime
pub fn new<T: Into<String>>(name: T) -> SystemRunner {
Self::builder().name(name).build()
pub fn new<T: Into<String>>(name: T) -> Runtime {
Self::builder().thread_name(name).build().unwrap()
}
#[allow(clippy::new_ret_no_self)]
/// Create new system using provided CurrentThread Handle.
///
/// This method panics if it can not spawn system arbiter
pub fn run_in_executor<T: Into<String>>(
name: T,
executor: Handle,
) -> impl Future<Output = Result<(), io::Error>> + Send {
Self::builder()
.name(name)
.build_async(executor)
.run_nonblocking()
}
// pub fn run_in_executor<T: Into<String>>(
// name: T,
// executor: Handle,
// ) -> impl Future<Output = Result<(), io::Error>> + Send {
// Self::builder()
// .name(name)
// .build_async(executor)
// .run_nonblocking()
// }
/// Get current running system.
pub fn current() -> System {
@ -140,6 +140,24 @@ impl System {
where
F: FnOnce() + 'static,
{
Self::builder().run(f)
// let (stop_tx, stop) = channel();
// let (sys_sender, sys_receiver) = unbounded();
// let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic);
// // system arbiter
// let arb = SystemArbiter::new(stop_tx, sys_receiver);
// let mut rt = self.build_rt().unwrap();
// rt.spawn(arb);
// // init system arbiter and run configuration method
// let _ = rt.block_on(lazy(move |_| {
// f();
// Ok::<_, ()>(())
// }));
// FIXME
Ok(())
//Self::builder().run(f)
}
}