mirror of https://github.com/fafhrd91/actix-net
add actix single threaded runtime
This commit is contained in:
@ -18,6 +18,7 @@ members = [
@ -0,0 +1,5 @@
# Changes
## [0.1.0] - 2018-12-09
* Move codec to separate crate
@ -0,0 +1,27 @@
name = "actix-rt"
version = "0.1.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix runtime"
keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-rt/"
categories = ["network-programming", "asynchronous"]
license = "MIT/Apache-2.0"
exclude = [".gitignore", ".travis.yml", ".cargo/config", "appveyor.yml"]
edition = "2018"
workspace = "../"
name = "actix_rt"
path = "src/lib.rs"
log = "0.4"
bytes = "0.4"
futures = "0.1.24"
tokio-current-thread = "0.1"
tokio-executor = "0.1.5"
tokio-reactor = "0.1.7"
tokio-timer = "0.2.8"
@ -0,0 +1,238 @@
use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use futures::sync::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures::sync::oneshot::{channel, Sender};
use futures::{future, Async, Future, IntoFuture, Poll, Stream};
use tokio_current_thread::spawn;
use crate::builder::Builder;
use crate::system::System;
static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None);
static RUNNING: Cell<bool> = Cell::new(false);
static Q: RefCell<Vec<Box<Future<Item = (), Error = ()>>>> = RefCell::new(Vec::new());
pub(crate) const COUNT: AtomicUsize = AtomicUsize::new(0);
pub(crate) enum ArbiterCommand {
#[derive(Debug, Clone)]
pub struct Arbiter(UnboundedSender<ArbiterCommand>);
impl Default for Arbiter {
fn default() -> Self {
impl Arbiter {
pub(crate) fn new_system() -> Self {
let (tx, rx) = unbounded();
let arb = Arbiter(tx);
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
RUNNING.with(|cell| cell.set(false));
Arbiter::spawn(ArbiterController { stop: None, rx });
/// Stop arbiter
pub fn stop(&self) {
let _ = self.0.unbounded_send(ArbiterCommand::Stop);
/// Spawn new thread and run event loop in spawned thread.
/// Returns address of newly created arbiter.
pub fn new() -> Arbiter {
let id = COUNT.fetch_add(1, Ordering::Relaxed);
let name = format!("actix-rt:worker:{}", id);
let sys = System::current();
let (arb_tx, arb_rx) = unbounded();
let arb_tx2 = arb_tx.clone();
let _ = thread::Builder::new().name(name.clone()).spawn(move || {
let mut rt = Builder::new().build_rt().expect("Can not create Runtime");
let arb = Arbiter(arb_tx);
let (stop, stop_rx) = channel();
RUNNING.with(|cell| cell.set(true));
// start arbiter controller
rt.spawn(ArbiterController {
stop: Some(stop),
rx: arb_rx,
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
// register arbiter
let _ = System::current()
.unbounded_send(SystemCommand::RegisterArbiter(id, arb.clone()));
// run loop
let _ = match rt.block_on(stop_rx) {
Ok(code) => code,
Err(_) => 1,
// unregister arbiter
let _ = System::current()
pub(crate) fn run_system() {
RUNNING.with(|cell| cell.set(true));
Q.with(|cell| {
let mut v = cell.borrow_mut();
for fut in v.drain(..) {
pub(crate) fn stop_system() {
RUNNING.with(|cell| cell.set(false));
/// Executes a future on the current thread.
pub fn spawn<F>(future: F)
F: Future<Item = (), Error = ()> + 'static,
RUNNING.with(move |cell| {
if cell.get() {
} else {
Q.with(move |cell| cell.borrow_mut().push(Box::new(future)));
/// Executes a future on the current thread.
pub fn spawn_fn<F, R>(f: F)
F: FnOnce() -> R + 'static,
R: IntoFuture<Item = (), Error = ()> + 'static,
struct ArbiterController {
stop: Option<Sender<i32>>,
rx: UnboundedReceiver<ArbiterCommand>,
impl Drop for ArbiterController {
fn drop(&mut self) {
if thread::panicking() {
eprintln!("Panic in Arbiter thread, shutting down system.");
if System::current().stop_on_panic() {
impl Future for ArbiterController {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.rx.poll() {
Ok(Async::Ready(None)) | Err(_) => Ok(Async::Ready(())),
Ok(Async::Ready(Some(item))) => match item {
ArbiterCommand::Stop => {
if let Some(stop) = self.stop.take() {
let _ = stop.send(0);
Ok(Async::NotReady) => Ok(Async::NotReady),
pub(crate) enum SystemCommand {
RegisterArbiter(usize, Arbiter),
pub(crate) struct SystemArbiter {
stop: Option<Sender<i32>>,
commands: UnboundedReceiver<SystemCommand>,
arbiters: HashMap<usize, Arbiter>,
impl SystemArbiter {
pub(crate) fn new(stop: Sender<i32>, commands: UnboundedReceiver<SystemCommand>) -> Self {
SystemArbiter {
stop: Some(stop),
arbiters: HashMap::new(),
impl Future for SystemArbiter {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.commands.poll() {
Ok(Async::Ready(None)) | Err(_) => return Ok(Async::Ready(())),
Ok(Async::Ready(Some(cmd))) => match cmd {
SystemCommand::Exit(code) => {
// stop arbiters
for arb in self.arbiters.values() {
// stop event loop
if let Some(stop) = self.stop.take() {
let _ = stop.send(code);
SystemCommand::RegisterArbiter(name, hnd) => {
self.arbiters.insert(name, hnd);
SystemCommand::UnregisterArbiter(name) => {
Ok(Async::NotReady) => return Ok(Async::NotReady),
// /// Execute function in arbiter's thread
// impl<I: Send, E: Send> Handler<Execute<I, E>> for SystemArbiter {
// type Result = Result<I, E>;
// fn handle(&mut self, msg: Execute<I, E>, _: &mut Context<Self>) -> Result<I, E> {
// msg.exec()
// }
// }
@ -0,0 +1,175 @@
use std::borrow::Cow;
use std::io;
use futures::future::{lazy, Future};
use futures::sync::mpsc::unbounded;
use futures::sync::oneshot::{channel, Receiver};
use tokio_current_thread::CurrentThread;
use tokio_reactor::Reactor;
use tokio_timer::clock::Clock;
use tokio_timer::timer::Timer;
use crate::arbiter::{Arbiter, SystemArbiter};
use crate::runtime::Runtime;
use crate::system::System;
/// Builder struct for a actix runtime.
/// Either use `Builder::build` to create a system and start actors.
/// Alternatively, use `Builder::run` to start the tokio runtime and
/// run a function in its context.
pub struct Builder {
/// Name of the System. Defaults to "actix" if unset.
name: Cow<'static, str>,
/// The clock to use
clock: Clock,
/// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false.
stop_on_panic: bool,
impl Builder {
pub(crate) fn new() -> Self {
Builder {
name: Cow::Borrowed("actix"),
clock: Clock::new(),
stop_on_panic: false,
/// Sets the name of the System.
pub fn name<T: Into<String>>(mut self, name: T) -> Self {
self.name = Cow::Owned(name.into());
/// Set the Clock instance that will be used by this System.
/// Defaults to the system clock.
pub fn clock(mut self, clock: Clock) -> Self {
self.clock = clock;
/// Sets the option 'stop_on_panic' which controls whether the System is stopped when an
/// uncaught panic is thrown from a worker thread.
/// Defaults to false.
pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self {
self.stop_on_panic = stop_on_panic;
/// Create new System.
/// This method panics if it can not create tokio runtime
pub fn build(self) -> SystemRunner {
self.create_runtime(|| {})
/// This function will start tokio runtime and will finish once the
/// `System::stop()` message get called.
/// Function `f` get called within tokio runtime context.
pub fn run<F>(self, f: F) -> i32
F: FnOnce() + 'static,
fn create_runtime<F>(self, f: F) -> SystemRunner
F: FnOnce() + 'static,
let (stop_tx, stop) = channel();
let (sys_sender, sys_receiver) = unbounded();
let arbiter = Arbiter::new_system();
let system = System::construct(sys_sender, arbiter.clone(), self.stop_on_panic);
// system arbiter
let arb = SystemArbiter::new(stop_tx, sys_receiver);
let mut rt = self.build_rt().unwrap();
// init system arbiter and run configuration method
let _ = rt.block_on(lazy(move || {
Ok::<_, ()>(())
SystemRunner { rt, stop, system }
pub(crate) fn build_rt(&self) -> io::Result<Runtime> {
// We need a reactor to receive events about IO objects from kernel
let reactor = Reactor::new()?;
let reactor_handle = reactor.handle();
// Place a timer wheel on top of the reactor. If there are no timeouts to fire, it'll let the
// reactor pick up some new external events.
let timer = Timer::new_with_now(reactor, self.clock.clone());
let timer_handle = timer.handle();
// And now put a single-threaded executor on top of the timer. When there are no futures ready
// to do something, it'll let the timer or the reactor to generate some new stimuli for the
// futures to continue in their life.
let executor = CurrentThread::new_with_park(timer);
/// Helper object that runs System's event loop
#[must_use = "SystemRunner must be run"]
pub struct SystemRunner {
rt: Runtime,
stop: Receiver<i32>,
system: System,
impl SystemRunner {
/// This function will start event loop and will finish once the
/// `System::stop()` function is called.
pub fn run(self) -> i32 {
let SystemRunner { mut rt, stop, .. } = self;
// run loop
let _ = rt.block_on(lazy(move || {
Ok::<_, ()>(())
let code = match rt.block_on(stop) {
Ok(code) => code,
Err(_) => 1,
/// Execute a future and wait for result.
pub fn block_on<F, I, E>(&mut self, fut: F) -> Result<I, E>
F: Future<Item = I, Error = E>,
let _ = self.rt.block_on(lazy(move || {
Ok::<_, ()>(())
let res = self.rt.block_on(fut);
let _ = self.rt.block_on(lazy(move || {
Ok::<_, ()>(())
@ -0,0 +1,12 @@
//! A runtime implementation that runs everything on the current thread.
mod arbiter;
mod builder;
mod runtime;
mod system;
pub use self::builder::{Builder, SystemRunner};
pub use self::runtime::{Handle, Runtime};
pub use self::system::System;
// pub use tokio_current_thread::spawn;
// pub use tokio_current_thread::TaskExecutor;
@ -0,0 +1,92 @@
//! A runtime implementation that runs everything on the current thread.
//! [`current_thread::Runtime`][rt] is similar to the primary
//! [`Runtime`][concurrent-rt] except that it runs all components on the current
//! thread instead of using a thread pool. This means that it is able to spawn
//! futures that do not implement `Send`.
//! Same as the default [`Runtime`][concurrent-rt], the
//! [`current_thread::Runtime`][rt] includes:
//! * A [reactor] to drive I/O resources.
//! * An [executor] to execute tasks that use these I/O resources.
//! * A [timer] for scheduling work to run after a set period of time.
//! Note that [`current_thread::Runtime`][rt] does not implement `Send` itself
//! and cannot be safely moved to other threads.
//! # Spawning from other threads
//! While [`current_thread::Runtime`][rt] does not implement `Send` and cannot
//! safely be moved to other threads, it provides a `Handle` that can be sent
//! to other threads and allows to spawn new tasks from there.
//! For example:
//! ```
//! # extern crate tokio;
//! # extern crate futures;
//! use tokio::runtime::current_thread::Runtime;
//! use tokio::prelude::*;
//! use std::thread;
//! # fn main() {
//! let mut runtime = Runtime::new().unwrap();
//! let handle = runtime.handle();
//! thread::spawn(move || {
//! handle.spawn(future::ok(()));
//! }).join().unwrap();
//! # /*
//! runtime.run().unwrap();
//! # */
//! # }
//! ```
//! # Examples
//! Creating a new `Runtime` and running a future `f` until its completion and
//! returning its result.
//! ```
//! use tokio::runtime::current_thread::Runtime;
//! use tokio::prelude::*;
//! let mut runtime = Runtime::new().unwrap();
//! // Use the runtime...
//! // runtime.block_on(f); // where f is a future
//! ```
//! [rt]: struct.Runtime.html
//! [concurrent-rt]: ../struct.Runtime.html
//! [chan]: https://docs.rs/futures/0.1/futures/sync/mpsc/fn.channel.html
//! [reactor]: ../../reactor/struct.Reactor.html
//! [executor]: https://tokio.rs/docs/getting-started/runtime-model/#executors
//! [timer]: ../../timer/index.html
mod builder;
mod runtime;
pub use self::builder::Builder;
pub use self::runtime::{Runtime, Handle};
pub use tokio_current_thread::spawn;
pub use tokio_current_thread::TaskExecutor;
use futures::Future;
/// Run the provided future to completion using a runtime running on the current thread.
/// This first creates a new [`Runtime`], and calls [`Runtime::block_on`] with the provided future,
/// which blocks the current thread until the provided future completes. It then calls
/// [`Runtime::run`] to wait for any other spawned futures to resolve.
pub fn block_on_all<F>(future: F) -> Result<F::Item, F::Error>
F: Future,
let mut r = Runtime::new().expect("failed to start runtime on current thread");
let v = r.block_on(future)?;
r.run().expect("failed to resolve remaining futures");
@ -0,0 +1,236 @@
use std::error::Error;
use std::fmt;
use std::io;
use futures::{future, Future};
use tokio_current_thread::Handle as ExecutorHandle;
use tokio_current_thread::{self as current_thread, CurrentThread};
use tokio_executor;
use tokio_reactor::{self, Reactor};
use tokio_timer::clock::{self, Clock};
use tokio_timer::timer::{self, Timer};
use crate::builder::Builder;
/// Single-threaded runtime provides a way to start reactor
/// and executor on the current thread.
/// See [module level][mod] documentation for more details.
/// [mod]: index.html
pub struct Runtime {
reactor_handle: tokio_reactor::Handle,
timer_handle: timer::Handle,
clock: Clock,
executor: CurrentThread<Timer<Reactor>>,
/// Handle to spawn a future on the corresponding `CurrentThread` runtime instance
#[derive(Debug, Clone)]
pub struct Handle(ExecutorHandle);
impl Handle {
/// Spawn a future onto the `CurrentThread` runtime instance corresponding to this handle
/// # Panics
/// This function panics if the spawn fails. Failure occurs if the `CurrentThread`
/// instance of the `Handle` does not exist anymore.
pub fn spawn<F>(&self, future: F) -> Result<(), tokio_executor::SpawnError>
F: Future<Item = (), Error = ()> + Send + 'static,
/// Provides a best effort **hint** to whether or not `spawn` will succeed.
/// This function may return both false positives **and** false negatives.
/// If `status` returns `Ok`, then a call to `spawn` will *probably*
/// succeed, but may fail. If `status` returns `Err`, a call to `spawn` will
/// *probably* fail, but may succeed.
/// This allows a caller to avoid creating the task if the call to `spawn`
/// has a high likelihood of failing.
pub fn status(&self) -> Result<(), tokio_executor::SpawnError> {
impl<T> future::Executor<T> for Handle
T: Future<Item = (), Error = ()> + Send + 'static,
fn execute(&self, future: T) -> Result<(), future::ExecuteError<T>> {
if let Err(e) = self.status() {
let kind = if e.is_at_capacity() {
} else {
return Err(future::ExecuteError::new(kind, future));
let _ = self.spawn(future);
/// Error returned by the `run` function.
pub struct RunError {
inner: current_thread::RunError,
impl fmt::Display for RunError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{}", self.inner)
impl Error for RunError {
fn description(&self) -> &str {
fn cause(&self) -> Option<&Error> {
impl Runtime {
/// Returns a new runtime initialized with default configuration values.
pub fn new() -> io::Result<Runtime> {
pub(super) fn new2(
reactor_handle: tokio_reactor::Handle,
timer_handle: timer::Handle,
clock: Clock,
executor: CurrentThread<Timer<Reactor>>,
) -> Runtime {
Runtime {
/// Get a new handle to spawn futures on the single-threaded Tokio runtime
/// Different to the runtime itself, the handle can be sent to different
/// threads.
pub fn handle(&self) -> Handle {
/// Spawn a future onto the single-threaded Tokio runtime.
/// See [module level][mod] documentation for more details.
/// [mod]: index.html
/// # Examples
/// ```rust
/// # use futures::{future, Future, Stream};
/// use actix_rt::Runtime;
/// # fn dox() {
/// // Create the runtime
/// let mut rt = Runtime::new().unwrap();
/// // Spawn a future onto the runtime
/// rt.spawn(future::lazy(|| {
/// println!("running on the runtime");
/// Ok(())
/// }));
/// # }
/// # pub fn main() {}
/// ```
/// # Panics
/// This function panics if the spawn fails. Failure occurs if the executor
/// is currently at capacity and is unable to spawn a new future.
pub fn spawn<F>(&mut self, future: F) -> &mut Self
F: Future<Item = (), Error = ()> + 'static,
/// Runs the provided future, blocking the current thread until the future
/// completes.
/// This function can be used to synchronously block the current thread
/// until the provided `future` has resolved either successfully or with an
/// error. The result of the future is then returned from this function
/// call.
/// Note that this function will **also** execute any spawned futures on the
/// current thread, but will **not** block until these other spawned futures
/// have completed. Once the function returns, any uncompleted futures
/// remain pending in the `Runtime` instance. These futures will not run
/// until `block_on` or `run` is called again.
/// The caller is responsible for ensuring that other spawned futures
/// complete execution by calling `block_on` or `run`.
pub fn block_on<F>(&mut self, f: F) -> Result<F::Item, F::Error>
F: Future,
self.enter(|executor| {
// Run the provided future
let ret = executor.block_on(f);
ret.map_err(|e| e.into_inner().expect("unexpected execution error"))
/// Run the executor to completion, blocking the thread until **all**
/// spawned futures have completed.
pub fn run(&mut self) -> Result<(), RunError> {
self.enter(|executor| executor.run())
.map_err(|e| RunError { inner: e })
fn enter<F, R>(&mut self, f: F) -> R
F: FnOnce(&mut current_thread::Entered<Timer<Reactor>>) -> R,
let Runtime {
ref reactor_handle,
ref timer_handle,
ref clock,
ref mut executor,
} = *self;
// Binds an executor to this thread
let mut enter = tokio_executor::enter().expect("Multiple executors at once");
// This will set the default handle and timer to use inside the closure
// and run the future.
tokio_reactor::with_default(&reactor_handle, &mut enter, |enter| {
clock::with_default(clock, enter, |enter| {
timer::with_default(&timer_handle, enter, |enter| {
// The TaskExecutor is a fake executor that looks into the
// current single-threaded executor when used. This is a trick,
// because we need two mutable references to the executor (one
// to run the provided future, another to install as the default
// one). We use the fake one here as the default one.
let mut default_executor = current_thread::TaskExecutor::current();
tokio_executor::with_default(&mut default_executor, enter, |enter| {
let mut executor = executor.enter(enter);
f(&mut executor)
@ -0,0 +1,119 @@
use std::cell::RefCell;
use futures::sync::mpsc::UnboundedSender;
use crate::arbiter::{Arbiter, SystemCommand};
use crate::builder::{Builder, SystemRunner};
/// System is a runtime manager.
#[derive(Clone, Debug)]
pub struct System {
sys: UnboundedSender<SystemCommand>,
arbiter: Arbiter,
stop_on_panic: bool,
static CURRENT: RefCell<Option<System>> = RefCell::new(None);
impl System {
/// Constructs new system and sets it as current
pub(crate) fn construct(
sys: UnboundedSender<SystemCommand>,
arbiter: Arbiter,
stop_on_panic: bool,
) -> Self {
let sys = System {
/// Build a new system with a customized tokio runtime.
/// This allows to customize the runtime. See struct level docs on
/// `Builder` for more information.
pub fn builder() -> Builder {
/// Create new system.
/// This method panics if it can not create tokio runtime
pub fn new<T: Into<String>>(name: T) -> SystemRunner {
/// Get current running system.
pub fn current() -> System {
CURRENT.with(|cell| match *cell.borrow() {
Some(ref sys) => sys.clone(),
None => panic!("System is not running"),
/// Set current running system.
pub(crate) fn _is_set() -> bool {
CURRENT.with(|cell| cell.borrow().is_some())
/// Set current running system.
pub fn set_current(sys: System) {
CURRENT.with(|s| {
*s.borrow_mut() = Some(sys);
/// Execute function with system reference.
pub fn with_current<F, R>(f: F) -> R
F: FnOnce(&System) -> R,
CURRENT.with(|cell| match *cell.borrow() {
Some(ref sys) => f(sys),
None => panic!("System is not running"),
/// Stop the system
pub fn stop(&self) {
/// Stop the system with a particular exit code.
pub fn stop_with_code(&self, code: i32) {
let _ = self.sys.unbounded_send(SystemCommand::Exit(code));
pub(crate) fn sys(&self) -> &UnboundedSender<SystemCommand> {
/// Return status of 'stop_on_panic' option which controls whether the System is stopped when an
/// uncaught panic is thrown from a worker thread.
pub fn stop_on_panic(&self) -> bool {
/// System arbiter
pub fn arbiter(&self) -> &Arbiter {
/// This function will start tokio runtime and will finish once the
/// `System::stop()` message get called.
/// Function `f` get called within tokio runtime context.
pub fn run<F>(f: F) -> i32
F: FnOnce() + 'static,
Reference in New Issue