add generic executor support for actix-rt

This commit is contained in:
fakeshadow 2020-10-29 18:57:23 +08:00
parent 1d5219da17
commit 9a7a626f7b
7 changed files with 236 additions and 323 deletions

View File

@ -24,3 +24,6 @@ tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "r
[dev-dependencies] [dev-dependencies]
futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] } futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] }
tokio = { version = "0.2.6", features = ["full"] } tokio = { version = "0.2.6", features = ["full"] }
tokio-compat = "0.1.6"
futures-01 = { package = "futures", version = "0.1" }
tokio-01 = { package = "tokio", version = "0.1" }

View File

@ -2,6 +2,7 @@ use std::any::{Any, TypeId};
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::HashMap; use std::collections::HashMap;
use std::future::Future; use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
@ -10,9 +11,8 @@ use std::{fmt, thread};
use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures_channel::oneshot::{channel, Canceled, Sender}; use futures_channel::oneshot::{channel, Canceled, Sender};
use tokio::stream::Stream; use tokio::stream::Stream;
use tokio::task::LocalSet;
use crate::runtime::Runtime; use crate::runtime::{DefaultExec, ExecFactory};
use crate::system::System; use crate::system::System;
thread_local!( thread_local!(
@ -60,18 +60,28 @@ impl Default for Arbiter {
} }
impl Arbiter { impl Arbiter {
pub(crate) fn new_system(local: &LocalSet) -> Self { pub(crate) fn new_system<E: ExecFactory>(exec: &mut E::Executor) -> Self {
let (tx, rx) = unbounded(); let (tx, rx) = unbounded();
let arb = Arbiter::with_sender(tx); let arb = Arbiter::with_sender(tx);
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
STORAGE.with(|cell| cell.borrow_mut().clear()); STORAGE.with(|cell| cell.borrow_mut().clear());
local.spawn_local(ArbiterController { rx }); let controller: ArbiterController<E> = ArbiterController {
rx,
_exec: Default::default(),
};
E::spawn_ref(exec, controller);
arb arb
} }
#[deprecated(since = "1.2.0", note = "Please use actix_rt::spawn instead")]
pub fn spawn<F: Future<Output = ()> + 'static>(f: F) {
DefaultExec::spawn(f)
}
/// Returns the current thread's arbiter's address. If no Arbiter is present, then this /// Returns the current thread's arbiter's address. If no Arbiter is present, then this
/// function will panic! /// function will panic!
pub fn current() -> Arbiter { pub fn current() -> Arbiter {
@ -95,6 +105,10 @@ impl Arbiter {
/// Spawn new thread and run event loop in spawned thread. /// Spawn new thread and run event loop in spawned thread.
/// Returns address of newly created arbiter. /// Returns address of newly created arbiter.
pub fn new() -> Arbiter { pub fn new() -> Arbiter {
Self::new_with::<DefaultExec>()
}
pub fn new_with<E: ExecFactory>() -> Arbiter {
let id = COUNT.fetch_add(1, Ordering::Relaxed); let id = COUNT.fetch_add(1, Ordering::Relaxed);
let name = format!("actix-rt:worker:{}", id); let name = format!("actix-rt:worker:{}", id);
let sys = System::current(); let sys = System::current();
@ -105,7 +119,7 @@ impl Arbiter {
.spawn({ .spawn({
let tx = tx.clone(); let tx = tx.clone();
move || { move || {
let mut rt = Runtime::new().expect("Can not create Runtime"); let mut exec = E::build().expect("Can not create Runtime");
let arb = Arbiter::with_sender(tx); let arb = Arbiter::with_sender(tx);
STORAGE.with(|cell| cell.borrow_mut().clear()); STORAGE.with(|cell| cell.borrow_mut().clear());
@ -121,7 +135,11 @@ impl Arbiter {
// start arbiter controller // start arbiter controller
// run loop // run loop
rt.block_on(ArbiterController { rx }); let fut: ArbiterController<E> = ArbiterController {
rx,
_exec: PhantomData,
};
E::block_on(&mut exec, fut);
// unregister arbiter // unregister arbiter
let _ = System::current() let _ = System::current()
@ -139,29 +157,6 @@ impl Arbiter {
} }
} }
/// Spawn a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for spawning futures on the current
/// thread.
pub fn spawn<F>(future: F)
where
F: Future<Output = ()> + 'static,
{
tokio::task::spawn_local(future);
}
/// Executes a future on the current thread. This does not create a new Arbiter
/// or Arbiter address, it is simply a helper for executing futures on the current
/// thread.
pub fn spawn_fn<F, R>(f: F)
where
F: FnOnce() -> R + 'static,
R: Future<Output = ()> + 'static,
{
Arbiter::spawn(async {
f();
})
}
/// Send a future to the Arbiter's thread, and spawn it. /// Send a future to the Arbiter's thread, and spawn it.
pub fn send<F>(&self, future: F) pub fn send<F>(&self, future: F)
where where
@ -272,11 +267,12 @@ impl Arbiter {
} }
} }
struct ArbiterController { struct ArbiterController<Exec> {
rx: UnboundedReceiver<ArbiterCommand>, rx: UnboundedReceiver<ArbiterCommand>,
_exec: PhantomData<Exec>,
} }
impl Drop for ArbiterController { impl<Exec> Drop for ArbiterController<Exec> {
fn drop(&mut self) { fn drop(&mut self) {
if thread::panicking() { if thread::panicking() {
if System::current().stop_on_panic() { if System::current().stop_on_panic() {
@ -289,7 +285,7 @@ impl Drop for ArbiterController {
} }
} }
impl Future for ArbiterController { impl<E: ExecFactory> Future for ArbiterController<E> {
type Output = (); type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@ -299,7 +295,7 @@ impl Future for ArbiterController {
Poll::Ready(Some(item)) => match item { Poll::Ready(Some(item)) => match item {
ArbiterCommand::Stop => return Poll::Ready(()), ArbiterCommand::Stop => return Poll::Ready(()),
ArbiterCommand::Execute(fut) => { ArbiterCommand::Execute(fut) => {
tokio::task::spawn_local(fut); E::spawn(fut);
} }
ArbiterCommand::ExecuteFn(f) => { ArbiterCommand::ExecuteFn(f) => {
f.call_box(); f.call_box();

View File

@ -1,13 +1,13 @@
use std::borrow::Cow; use std::borrow::Cow;
use std::future::Future; use std::future::Future;
use std::io; use std::io;
use std::marker::PhantomData;
use futures_channel::mpsc::unbounded; use futures_channel::mpsc::unbounded;
use futures_channel::oneshot::{channel, Receiver}; use futures_channel::oneshot::{channel, Receiver};
use tokio::task::LocalSet;
use crate::arbiter::{Arbiter, SystemArbiter}; use crate::arbiter::{Arbiter, SystemArbiter};
use crate::runtime::Runtime; use crate::runtime::ExecFactory;
use crate::system::System; use crate::system::System;
/// Builder struct for a actix runtime. /// Builder struct for a actix runtime.
@ -15,19 +15,21 @@ use crate::system::System;
/// Either use `Builder::build` to create a system and start actors. /// Either use `Builder::build` to create a system and start actors.
/// Alternatively, use `Builder::run` to start the tokio runtime and /// Alternatively, use `Builder::run` to start the tokio runtime and
/// run a function in its context. /// run a function in its context.
pub struct Builder { pub struct Builder<E> {
/// Name of the System. Defaults to "actix" if unset. /// Name of the System. Defaults to "actix" if unset.
name: Cow<'static, str>, name: Cow<'static, str>,
/// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false. /// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false.
stop_on_panic: bool, stop_on_panic: bool,
exec: PhantomData<E>,
} }
impl Builder { impl<E: ExecFactory> Builder<E> {
pub(crate) fn new() -> Self { pub(crate) fn new() -> Builder<E> {
Builder { Builder {
name: Cow::Borrowed("actix"), name: Cow::Borrowed("actix"),
stop_on_panic: false, stop_on_panic: false,
exec: PhantomData,
} }
} }
@ -49,17 +51,10 @@ impl Builder {
/// Create new System. /// Create new System.
/// ///
/// This method panics if it can not create tokio runtime /// This method panics if it can not create tokio runtime
pub fn build(self) -> SystemRunner { pub fn build(self) -> SystemRunner<E> {
self.create_runtime(|| {}) self.create_runtime(|| {})
} }
/// Create new System that can run asynchronously.
///
/// This method panics if it cannot start the system arbiter
pub(crate) fn build_async(self, local: &LocalSet) -> AsyncSystemRunner {
self.create_async_runtime(local)
}
/// This function will start tokio runtime and will finish once the /// This function will start tokio runtime and will finish once the
/// `System::stop()` message get called. /// `System::stop()` message get called.
/// Function `f` get called within tokio runtime context. /// Function `f` get called within tokio runtime context.
@ -70,97 +65,57 @@ impl Builder {
self.create_runtime(f).run() self.create_runtime(f).run()
} }
fn create_async_runtime(self, local: &LocalSet) -> AsyncSystemRunner { /// Create runtime with a given instance of type that impl `ExecFactory::Executor` trait.
let (stop_tx, stop) = channel(); pub fn create_with_runtime<F>(self, mut rt: E::Executor, f: F) -> SystemRunner<E>
let (sys_sender, sys_receiver) = unbounded();
let system =
System::construct(sys_sender, Arbiter::new_system(local), self.stop_on_panic);
// system arbiter
let arb = SystemArbiter::new(stop_tx, sys_receiver);
// start the system arbiter
let _ = local.spawn_local(arb);
AsyncSystemRunner { stop, system }
}
fn create_runtime<F>(self, f: F) -> SystemRunner
where where
F: FnOnce() + 'static, F: FnOnce() + 'static,
{ {
let (stop_tx, stop) = channel(); let (stop_tx, stop) = channel();
let (sys_sender, sys_receiver) = unbounded(); let (sys_sender, sys_receiver) = unbounded();
let mut rt = Runtime::new().unwrap();
let system = System::construct( let system = System::construct(
sys_sender, sys_sender,
Arbiter::new_system(rt.local()), Arbiter::new_system::<E>(&mut rt),
self.stop_on_panic, self.stop_on_panic,
); );
// system arbiter // system arbiter
let arb = SystemArbiter::new(stop_tx, sys_receiver); let arb = SystemArbiter::new(stop_tx, sys_receiver);
rt.spawn(arb); E::spawn_ref(&mut rt, arb);
// init system arbiter and run configuration method // init system arbiter and run configuration method
rt.block_on(async { f() }); E::block_on(&mut rt, async { f() });
SystemRunner { rt, stop, system } SystemRunner { rt, stop, system }
} }
}
#[derive(Debug)] fn create_runtime<F>(self, f: F) -> SystemRunner<E>
pub(crate) struct AsyncSystemRunner { where
stop: Receiver<i32>, F: FnOnce() + 'static,
system: System, {
} let rt = E::build().unwrap();
self.create_with_runtime(rt, f)
impl AsyncSystemRunner {
/// This function will start event loop and returns a future that
/// resolves once the `System::stop()` function is called.
pub(crate) fn run_nonblocking(self) -> impl Future<Output = Result<(), io::Error>> + Send {
let AsyncSystemRunner { stop, .. } = self;
// run loop
async {
match stop.await {
Ok(code) => {
if code != 0 {
Err(io::Error::new(
io::ErrorKind::Other,
format!("Non-zero exit code: {}", code),
))
} else {
Ok(())
}
}
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
}
}
} }
} }
/// Helper object that runs System's event loop /// Helper object that runs System's event loop
#[must_use = "SystemRunner must be run"] #[must_use = "SystemRunner must be run"]
#[derive(Debug)] #[derive(Debug)]
pub struct SystemRunner { pub struct SystemRunner<E: ExecFactory> {
rt: Runtime, rt: E::Executor,
stop: Receiver<i32>, stop: Receiver<i32>,
system: System, system: System,
} }
impl SystemRunner { impl<E: ExecFactory> SystemRunner<E> {
/// This function will start event loop and will finish once the /// This function will start event loop and will finish once the
/// `System::stop()` function is called. /// `System::stop()` function is called.
pub fn run(self) -> io::Result<()> { pub fn run(self) -> io::Result<()> {
let SystemRunner { mut rt, stop, .. } = self; let SystemRunner { mut rt, stop, .. } = self;
// run loop // run loop
match rt.block_on(stop) { match E::block_on(&mut rt, stop) {
Ok(code) => { Ok(code) => {
if code != 0 { if code != 0 {
Err(io::Error::new( Err(io::Error::new(
@ -181,6 +136,6 @@ impl SystemRunner {
where where
F: Future<Output = O>, F: Future<Output = O>,
{ {
self.rt.block_on(fut) E::block_on(&mut self.rt, fut)
} }
} }

View File

@ -14,7 +14,7 @@ mod system;
pub use self::arbiter::Arbiter; pub use self::arbiter::Arbiter;
pub use self::builder::{Builder, SystemRunner}; pub use self::builder::{Builder, SystemRunner};
pub use self::runtime::Runtime; pub use self::runtime::{DefaultExec, ExecFactory};
pub use self::system::System; pub use self::system::System;
#[doc(hidden)] #[doc(hidden)]
@ -25,12 +25,15 @@ pub use actix_threadpool as blocking;
/// # Panics /// # Panics
/// ///
/// This function panics if actix system is not running. /// This function panics if actix system is not running.
///
/// This function panics if actix system does not runs on default executor
/// (tokio current-thread executor with a matching version to the actix-rt dependency).
#[inline] #[inline]
pub fn spawn<F>(f: F) pub fn spawn<F>(f: F)
where where
F: Future<Output = ()> + 'static, F: Future<Output = ()> + 'static,
{ {
Arbiter::spawn(f) DefaultExec::spawn(f)
} }
/// Asynchronous signal handling /// Asynchronous signal handling

View File

@ -2,39 +2,34 @@ use std::future::Future;
use std::io; use std::io;
use tokio::{runtime, task::LocalSet}; use tokio::{runtime, task::LocalSet};
/// Single-threaded runtime provides a way to start reactor /// A trait for construct async executor and run future on it.
/// and runtime on the current thread.
/// ///
/// See [module level][mod] documentation for more details. /// A factory trait is necessary as `actix` and `actix-web` can run on multiple instances of
/// executors. Therefore the executor would be constructed multiple times
pub trait ExecFactory: Sized + Unpin + 'static {
type Executor;
fn build() -> io::Result<Self::Executor>;
/// Runs the provided future, blocking the current thread until the future
/// completes.
/// ///
/// [mod]: index.html /// This function can be used to synchronously block the current thread
#[derive(Debug)] /// until the provided `future` has resolved either successfully or with an
pub struct Runtime { /// error. The result of the future is then returned from this function
local: LocalSet, /// call.
rt: runtime::Runtime, ///
} /// Note that this function will **also** execute any spawned futures on the
/// current thread, but will **not** block until these other spawned futures
/// have completed. Once the function returns, any uncompleted futures
/// remain pending in the `Runtime` instance. These futures will not run
/// until `block_on` or `run` is called again.
///
/// The caller is responsible for ensuring that other spawned futures
/// complete execution by calling `block_on` or `run`.
fn block_on<F: Future>(exec: &mut Self::Executor, f: F) -> F::Output;
impl Runtime { /// Spawn a future onto the single-threaded runtime without reference it.
#[allow(clippy::new_ret_no_self)]
/// Returns a new runtime initialized with default configuration values.
pub fn new() -> io::Result<Runtime> {
let rt = runtime::Builder::new()
.enable_io()
.enable_time()
.basic_scheduler()
.build()?;
Ok(Runtime {
rt,
local: LocalSet::new(),
})
}
pub(super) fn local(&self) -> &LocalSet {
&self.local
}
/// Spawn a future onto the single-threaded runtime.
/// ///
/// See [module level][mod] documentation for more details. /// See [module level][mod] documentation for more details.
/// ///
@ -62,34 +57,49 @@ impl Runtime {
/// ///
/// This function panics if the spawn fails. Failure occurs if the executor /// This function panics if the spawn fails. Failure occurs if the executor
/// is currently at capacity and is unable to spawn a new future. /// is currently at capacity and is unable to spawn a new future.
pub fn spawn<F>(&self, future: F) -> &Self fn spawn<F: Future<Output = ()> + 'static>(f: F);
where
F: Future<Output = ()> + 'static, /// Spawn a future onto the single-threaded runtime reference. Useful when you have direct
{ /// access to executor.
self.local.spawn_local(future); ///
self /// *. `spawn_ref` is preferred when you can choose between it and `spawn`.
fn spawn_ref<F: Future<Output = ()> + 'static>(exec: &mut Self::Executor, f: F);
} }
/// Runs the provided future, blocking the current thread until the future /// Default Single-threaded tokio executor on the current thread.
/// completes.
/// ///
/// This function can be used to synchronously block the current thread /// See [module level][mod] documentation for more details.
/// until the provided `future` has resolved either successfully or with an
/// error. The result of the future is then returned from this function
/// call.
/// ///
/// Note that this function will **also** execute any spawned futures on the /// [mod]: index.html
/// current thread, but will **not** block until these other spawned futures #[derive(Copy, Clone, Debug)]
/// have completed. Once the function returns, any uncompleted futures pub struct DefaultExec;
/// remain pending in the `Runtime` instance. These futures will not run
/// until `block_on` or `run` is called again. pub type DefaultExecutor = (runtime::Runtime, LocalSet);
///
/// The caller is responsible for ensuring that other spawned futures impl ExecFactory for DefaultExec {
/// complete execution by calling `block_on` or `run`. type Executor = DefaultExecutor;
pub fn block_on<F>(&mut self, f: F) -> F::Output
where fn build() -> io::Result<Self::Executor> {
F: Future, let rt = runtime::Builder::new()
{ .enable_io()
self.local.block_on(&mut self.rt, f) .enable_time()
.basic_scheduler()
.build()?;
Ok((rt, LocalSet::new()))
}
fn block_on<F: Future>(exec: &mut Self::Executor, f: F) -> <F as Future>::Output {
let (rt, local) = exec;
rt.block_on(local.run_until(f))
}
fn spawn<F: Future<Output = ()> + 'static>(f: F) {
tokio::task::spawn_local(f);
}
fn spawn_ref<F: Future<Output = ()> + 'static>(exec: &mut Self::Executor, f: F) {
exec.1.spawn_local(f);
} }
} }

View File

@ -1,13 +1,13 @@
use std::cell::RefCell; use std::cell::RefCell;
use std::future::Future;
use std::io; use std::io;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use futures_channel::mpsc::UnboundedSender; use futures_channel::mpsc::UnboundedSender;
use tokio::task::LocalSet;
use crate::arbiter::{Arbiter, SystemCommand}; use crate::arbiter::{Arbiter, SystemCommand};
use crate::builder::{Builder, SystemRunner}; use crate::builder::{Builder, SystemRunner};
use crate::runtime::ExecFactory;
use crate::DefaultExec;
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
@ -25,170 +25,22 @@ thread_local!(
); );
impl System { impl System {
/// Constructs new system and sets it as current
pub(crate) fn construct(
sys: UnboundedSender<SystemCommand>,
arbiter: Arbiter,
stop_on_panic: bool,
) -> Self {
let sys = System {
sys,
arbiter,
stop_on_panic,
id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst),
};
System::set_current(sys.clone());
sys
}
/// Build a new system with a customized tokio runtime.
///
/// This allows to customize the runtime. See struct level docs on
/// `Builder` for more information.
pub fn builder() -> Builder {
Builder::new()
}
#[allow(clippy::new_ret_no_self)] #[allow(clippy::new_ret_no_self)]
/// Create new system. /// Create new system.
/// ///
/// This method panics if it can not create tokio runtime /// This method panics if it can not create tokio runtime
pub fn new<T: Into<String>>(name: T) -> SystemRunner { pub fn new<T: Into<String>>(name: T) -> SystemRunner<DefaultExec> {
Self::builder().name(name).build() Self::new_with::<DefaultExec, T>(name)
} }
/// Create new system using provided tokio `LocalSet`. /// This function will start tokio runtime and will finish once the
/// /// `System::stop()` message get called.
/// This method panics if it can not spawn system arbiter /// Function `f` get called within tokio runtime context.
/// pub fn run<F>(f: F) -> io::Result<()>
/// Note: This method uses provided `LocalSet` to create a `System` future only.
/// All the [`Arbiter`]s will be started in separate threads using their own tokio `Runtime`s.
/// It means that using this method currently it is impossible to make `actix-rt` work in the
/// alternative `tokio` `Runtime`s (e.g. provided by [`tokio_compat`]).
///
/// [`Arbiter`]: struct.Arbiter.html
/// [`tokio_compat`]: https://crates.io/crates/tokio-compat
///
/// # Examples
///
/// ```
/// use tokio::{runtime::Runtime, task::LocalSet};
/// use actix_rt::System;
/// use futures_util::future::try_join_all;
///
/// async fn run_application() {
/// let first_task = tokio::spawn(async {
/// // ...
/// # println!("One task");
/// # Ok::<(),()>(())
/// });
///
/// let second_task = tokio::spawn(async {
/// // ...
/// # println!("Another task");
/// # Ok::<(),()>(())
/// });
///
/// try_join_all(vec![first_task, second_task])
/// .await
/// .expect("Some of the futures finished unexpectedly");
/// }
///
///
/// let mut runtime = tokio::runtime::Builder::new()
/// .core_threads(2)
/// .enable_all()
/// .threaded_scheduler()
/// .build()
/// .unwrap();
///
///
/// let actix_system_task = LocalSet::new();
/// let sys = System::run_in_tokio("actix-main-system", &actix_system_task);
/// actix_system_task.spawn_local(sys);
///
/// let rest_operations = run_application();
/// runtime.block_on(actix_system_task.run_until(rest_operations));
/// ```
pub fn run_in_tokio<T: Into<String>>(
name: T,
local: &LocalSet,
) -> impl Future<Output = io::Result<()>> {
Self::builder()
.name(name)
.build_async(local)
.run_nonblocking()
}
/// Consume the provided tokio Runtime and start the `System` in it.
/// This method will create a `LocalSet` object and occupy the current thread
/// for the created `System` exclusively. All the other asynchronous tasks that
/// should be executed as well must be aggregated into one future, provided as the last
/// argument to this method.
///
/// Note: This method uses provided `Runtime` to create a `System` future only.
/// All the [`Arbiter`]s will be started in separate threads using their own tokio `Runtime`s.
/// It means that using this method currently it is impossible to make `actix-rt` work in the
/// alternative `tokio` `Runtime`s (e.g. provided by `tokio_compat`).
///
/// [`Arbiter`]: struct.Arbiter.html
/// [`tokio_compat`]: https://crates.io/crates/tokio-compat
///
/// # Arguments
///
/// - `name`: Name of the System
/// - `runtime`: A tokio Runtime to run the system in.
/// - `rest_operations`: A future to be executed in the runtime along with the System.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
/// use actix_rt::System;
/// use futures_util::future::try_join_all;
///
/// async fn run_application() {
/// let first_task = tokio::spawn(async {
/// // ...
/// # println!("One task");
/// # Ok::<(),()>(())
/// });
///
/// let second_task = tokio::spawn(async {
/// // ...
/// # println!("Another task");
/// # Ok::<(),()>(())
/// });
///
/// try_join_all(vec![first_task, second_task])
/// .await
/// .expect("Some of the futures finished unexpectedly");
/// }
///
///
/// let runtime = tokio::runtime::Builder::new()
/// .core_threads(2)
/// .enable_all()
/// .threaded_scheduler()
/// .build()
/// .unwrap();
///
/// let rest_operations = run_application();
/// System::attach_to_tokio("actix-main-system", runtime, rest_operations);
/// ```
pub fn attach_to_tokio<Fut, R>(
name: impl Into<String>,
mut runtime: tokio::runtime::Runtime,
rest_operations: Fut,
) -> R
where where
Fut: std::future::Future<Output = R>, F: FnOnce() + 'static,
{ {
let actix_system_task = LocalSet::new(); Self::run_with::<DefaultExec, F>(f)
let sys = System::run_in_tokio(name.into(), &actix_system_task);
actix_system_task.spawn_local(sys);
runtime.block_on(actix_system_task.run_until(rest_operations))
} }
/// Get current running system. /// Get current running system.
@ -253,13 +105,42 @@ impl System {
&self.arbiter &self.arbiter
} }
/// This function will start tokio runtime and will finish once the /// Build a new system with custom executor.
/// `System::stop()` message get called. pub fn builder<E: ExecFactory>() -> Builder<E> {
/// Function `f` get called within tokio runtime context. Builder::new()
pub fn run<F>(f: F) -> io::Result<()> }
#[allow(clippy::new_ret_no_self)]
/// Create new system with custom runtime.
///
/// This method panics if it can not create the runtime type.
pub fn new_with<E: ExecFactory, T: Into<String>>(name: T) -> SystemRunner<E> {
Self::builder().name(name).build()
}
pub fn run_with<E, F>(f: F) -> io::Result<()>
where where
E: ExecFactory,
F: FnOnce() + 'static, F: FnOnce() + 'static,
{ {
Self::builder().run(f) Self::builder::<E>().run(f)
}
}
impl System {
/// Constructs new system and sets it as current
pub(crate) fn construct(
sys: UnboundedSender<SystemCommand>,
arbiter: Arbiter,
stop_on_panic: bool,
) -> Self {
let sys = System {
sys,
arbiter,
stop_on_panic,
id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst),
};
System::set_current(sys.clone());
sys
} }
} }

View File

@ -1,10 +1,18 @@
use actix_rt::ExecFactory;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::macros::support::Future;
#[test] #[test]
fn await_for_timer() { fn await_for_timer() {
let time = Duration::from_secs(2); let time = Duration::from_secs(2);
let instant = Instant::now(); let instant = Instant::now();
actix_rt::System::new("test_wait_timer").block_on(async move { actix_rt::System::new("test_wait_timer").block_on(async move {
let arbiter = actix_rt::Arbiter::new();
arbiter.send(Box::pin(async move {
tokio::time::delay_for(time).await;
actix_rt::Arbiter::current().stop();
}));
tokio::time::delay_for(time).await; tokio::time::delay_for(time).await;
}); });
assert!( assert!(
@ -61,3 +69,60 @@ fn join_another_arbiter() {
"Premature stop of arbiter should conclude regardless of it's current state" "Premature stop of arbiter should conclude regardless of it's current state"
); );
} }
struct TokioCompatExec;
impl ExecFactory for TokioCompatExec {
type Executor = tokio_compat::runtime::current_thread::Runtime;
fn build() -> std::io::Result<Self::Executor> {
let rt = tokio_compat::runtime::current_thread::Runtime::new()?;
Ok(rt)
}
fn block_on<F: Future>(exec: &mut Self::Executor, f: F) -> <F as Future>::Output {
exec.block_on_std(f)
}
fn spawn<F: Future<Output = ()> + 'static>(f: F) {
tokio_compat::runtime::current_thread::TaskExecutor::current()
.spawn_local_std(f)
.unwrap();
}
fn spawn_ref<F: Future<Output = ()> + 'static>(exec: &mut Self::Executor, f: F) {
exec.spawn_std(f);
}
}
#[test]
fn tokio_compat() -> std::io::Result<()> {
// manually construct a compat executor.
let rt = TokioCompatExec::build()?;
// do some work with rt and pass it to builder
actix_rt::System::builder::<TokioCompatExec>()
.create_with_runtime(rt, || {})
.block_on(async {
let (tx, rx) = tokio::sync::oneshot::channel();
tokio_01::spawn(futures_01::lazy(|| {
tx.send(251).unwrap();
Ok(())
}));
assert_eq!(251, rx.await.unwrap());
});
// let the system construct the executor and block on it directly.
actix_rt::System::new_with::<TokioCompatExec, _>("compat").block_on(async {
let (tx, rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
tokio::time::delay_for(Duration::from_secs(1)).await;
tx.send(996).unwrap();
});
assert_eq!(996, rx.await.unwrap());
});
Ok(())
}