mirror of https://github.com/fafhrd91/actix-net
merge master into branch
This commit is contained in:
commit
69314fa648
|
@ -2,24 +2,40 @@
|
||||||
|
|
||||||
## Unreleased - 2021-xx-xx
|
## Unreleased - 2021-xx-xx
|
||||||
|
|
||||||
* Rename `Arbiter => Worker`. [#254]
|
|
||||||
|
## 2.0.0-beta.3 - 2021-01-31
|
||||||
* Remove `run_in_tokio`, `attach_to_tokio` and `AsyncSystemRunner`. [#253]
|
* Remove `run_in_tokio`, `attach_to_tokio` and `AsyncSystemRunner`. [#253]
|
||||||
* Return `JoinHandle` from `actix_rt::spawn`. [#253]
|
* Return `JoinHandle` from `actix_rt::spawn`. [#253]
|
||||||
|
<<<<<<< HEAD
|
||||||
* Remove old `Worker::spawn`. Implementation is now inlined into `actix_rt::spawn`. [#253]
|
* Remove old `Worker::spawn`. Implementation is now inlined into `actix_rt::spawn`. [#253]
|
||||||
* Rename `Worker::{send => spawn}` and `Worker::{exec_fn => spawn_fn}`. [#253]
|
* Rename `Worker::{send => spawn}` and `Worker::{exec_fn => spawn_fn}`. [#253]
|
||||||
* Remove `Worker::exec`. [#253]
|
* Remove `Worker::exec`. [#253]
|
||||||
* Remove `System::arbiter`. [#256]
|
* Remove `System::arbiter`. [#256]
|
||||||
* Remove deprecated `Worker::local_join` and `Worker::is_running`. [#253]
|
* Remove deprecated `Worker::local_join` and `Worker::is_running`. [#253]
|
||||||
* `Worker::spawn` now accepts !Unpin futures. [#256]
|
* `Worker::spawn` now accepts !Unpin futures. [#256]
|
||||||
|
=======
|
||||||
|
* Remove old `Arbiter::spawn`. Implementation is now inlined into `actix_rt::spawn`. [#253]
|
||||||
|
* Rename `Arbiter::{send => spawn}` and `Arbiter::{exec_fn => spawn_fn}`. [#253]
|
||||||
|
* Remove `Arbiter::exec`. [#253]
|
||||||
|
* Remove deprecated `Arbiter::local_join` and `Arbiter::is_running`. [#253]
|
||||||
|
* `Arbiter::spawn` now accepts !Unpin futures. [#256]
|
||||||
|
>>>>>>> master
|
||||||
* `System::new` no longer takes arguments. [#257]
|
* `System::new` no longer takes arguments. [#257]
|
||||||
* Remove `System::with_current`. [#257]
|
* Remove `System::with_current`. [#257]
|
||||||
* Remove `Builder`. [#257]
|
* Remove `Builder`. [#257]
|
||||||
* Add `System::with_init` as replacement for `Builder::run`. [#257]
|
* Add `System::with_init` as replacement for `Builder::run`. [#257]
|
||||||
* Rename `System::{is_set => is_registered}`. [#257]
|
* Rename `System::{is_set => is_registered}`. [#257]
|
||||||
|
<<<<<<< HEAD
|
||||||
* Add `WorkerHandle` for sending messages to non-current-thread workers. [#257].
|
* Add `WorkerHandle` for sending messages to non-current-thread workers. [#257].
|
||||||
* `System::worker` now returns a `&WorkerHandle`. [#257]
|
* `System::worker` now returns a `&WorkerHandle`. [#257]
|
||||||
* Rename `Worker::{current => handle}` and return a `WorkerHandle` instead. [#257]
|
* Rename `Worker::{current => handle}` and return a `WorkerHandle` instead. [#257]
|
||||||
* `Worker::join` now takes self by value. [#257]
|
* `Worker::join` now takes self by value. [#257]
|
||||||
|
=======
|
||||||
|
* Add `ArbiterHandle` for sending messages to non-current-thread arbiters. [#257].
|
||||||
|
* `System::arbiter` now returns an `&ArbiterHandle`. [#257]
|
||||||
|
* `Arbiter::current` now returns an `ArbiterHandle` instead. [#257]
|
||||||
|
* `Arbiter::join` now takes self by value. [#257]
|
||||||
|
>>>>>>> master
|
||||||
|
|
||||||
[#253]: https://github.com/actix/actix-net/pull/253
|
[#253]: https://github.com/actix/actix-net/pull/253
|
||||||
[#254]: https://github.com/actix/actix-net/pull/254
|
[#254]: https://github.com/actix/actix-net/pull/254
|
||||||
|
|
|
@ -1,7 +1,10 @@
|
||||||
[package]
|
[package]
|
||||||
name = "actix-rt"
|
name = "actix-rt"
|
||||||
version = "2.0.0-beta.2"
|
version = "2.0.0-beta.3"
|
||||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
authors = [
|
||||||
|
"Nikolay Kim <fafhrd91@gmail.com>",
|
||||||
|
"Rob Ede <robjtede@icloud.com>",
|
||||||
|
]
|
||||||
description = "Tokio-based single-thread async runtime for the Actix ecosystem"
|
description = "Tokio-based single-thread async runtime for the Actix ecosystem"
|
||||||
keywords = ["network", "framework", "async", "futures"]
|
keywords = ["network", "framework", "async", "futures"]
|
||||||
homepage = "https://actix.rs"
|
homepage = "https://actix.rs"
|
||||||
|
|
|
@ -0,0 +1,7 @@
|
||||||
|
# actix-rt
|
||||||
|
|
||||||
|
> Tokio-based single-thread async runtime for the Actix ecosystem.
|
||||||
|
|
||||||
|
See documentation for detailed explanations these components: [https://docs.rs/actix-rt][docs].
|
||||||
|
|
||||||
|
[docs]: https://docs.rs/actix-rt
|
|
@ -0,0 +1,317 @@
|
||||||
|
use std::{
|
||||||
|
any::{Any, TypeId},
|
||||||
|
cell::RefCell,
|
||||||
|
collections::HashMap,
|
||||||
|
fmt,
|
||||||
|
future::Future,
|
||||||
|
pin::Pin,
|
||||||
|
sync::atomic::{AtomicUsize, Ordering},
|
||||||
|
task::{Context, Poll},
|
||||||
|
thread,
|
||||||
|
};
|
||||||
|
|
||||||
|
use futures_core::ready;
|
||||||
|
use tokio::{sync::mpsc, task::LocalSet};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
runtime::ActixRuntime,
|
||||||
|
system::{System, SystemCommand},
|
||||||
|
};
|
||||||
|
|
||||||
|
pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);
|
||||||
|
|
||||||
|
thread_local!(
|
||||||
|
static HANDLE: RefCell<Option<ArbiterHandle>> = RefCell::new(None);
|
||||||
|
static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
|
||||||
|
);
|
||||||
|
|
||||||
|
pub(crate) enum ArbiterCommand {
|
||||||
|
Stop,
|
||||||
|
Execute(Pin<Box<dyn Future<Output = ()> + Send>>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for ArbiterCommand {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
match self {
|
||||||
|
ArbiterCommand::Stop => write!(f, "ArbiterCommand::Stop"),
|
||||||
|
ArbiterCommand::Execute(_) => write!(f, "ArbiterCommand::Execute"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A handle for sending spawn and stop messages to an [Arbiter].
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct ArbiterHandle {
|
||||||
|
tx: mpsc::UnboundedSender<ArbiterCommand>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ArbiterHandle {
|
||||||
|
pub(crate) fn new(tx: mpsc::UnboundedSender<ArbiterCommand>) -> Self {
|
||||||
|
Self { tx }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send a future to the [Arbiter]'s thread and spawn it.
|
||||||
|
///
|
||||||
|
/// If you require a result, include a response channel in the future.
|
||||||
|
///
|
||||||
|
/// Returns true if future was sent successfully and false if the [Arbiter] has died.
|
||||||
|
pub fn spawn<Fut>(&self, future: Fut) -> bool
|
||||||
|
where
|
||||||
|
Fut: Future<Output = ()> + Send + 'static,
|
||||||
|
{
|
||||||
|
self.tx
|
||||||
|
.send(ArbiterCommand::Execute(Box::pin(future)))
|
||||||
|
.is_ok()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send a function to the [Arbiter]'s thread and execute it.
|
||||||
|
///
|
||||||
|
/// Any result from the function is discarded. If you require a result, include a response
|
||||||
|
/// channel in the function.
|
||||||
|
///
|
||||||
|
/// Returns true if function was sent successfully and false if the [Arbiter] has died.
|
||||||
|
pub fn spawn_fn<F>(&self, f: F) -> bool
|
||||||
|
where
|
||||||
|
F: FnOnce() + Send + 'static,
|
||||||
|
{
|
||||||
|
self.spawn(async { f() })
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Instruct [Arbiter] to stop processing it's event loop.
|
||||||
|
///
|
||||||
|
/// Returns true if stop message was sent successfully and false if the [Arbiter] has
|
||||||
|
/// been dropped.
|
||||||
|
pub fn stop(&self) -> bool {
|
||||||
|
self.tx.send(ArbiterCommand::Stop).is_ok()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An Arbiter represents a thread that provides an asynchronous execution environment for futures
|
||||||
|
/// and functions.
|
||||||
|
///
|
||||||
|
/// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Arbiter {
|
||||||
|
tx: mpsc::UnboundedSender<ArbiterCommand>,
|
||||||
|
thread_handle: thread::JoinHandle<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Arbiter {
|
||||||
|
/// Spawn new Arbiter thread and start its event loop.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
/// Panics if a [System] is not registered on the current thread.
|
||||||
|
#[allow(clippy::new_without_default)]
|
||||||
|
pub fn new() -> Arbiter {
|
||||||
|
Self::with_tokio_rt(|| {
|
||||||
|
ActixRuntime::new_tokio_rt().expect("Cannot create new Arbiter's Runtime.")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a new system with a closure that return a tokio Runtime instance
|
||||||
|
pub fn with_tokio_rt<F>(f: F) -> Arbiter
|
||||||
|
where
|
||||||
|
F: Fn() -> tokio::runtime::Runtime + Send + 'static,
|
||||||
|
{
|
||||||
|
let id = COUNT.fetch_add(1, Ordering::Relaxed);
|
||||||
|
let system_id = System::current().id();
|
||||||
|
let name = format!("actix-rt|system:{}|arbiter:{}", system_id, id);
|
||||||
|
let sys = System::current();
|
||||||
|
let (tx, rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
|
let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
|
||||||
|
|
||||||
|
let thread_handle = thread::Builder::new()
|
||||||
|
.name(name.clone())
|
||||||
|
.spawn({
|
||||||
|
let tx = tx.clone();
|
||||||
|
move || {
|
||||||
|
let tokio_rt = f();
|
||||||
|
let rt = ActixRuntime::from(tokio_rt);
|
||||||
|
let hnd = ArbiterHandle::new(tx);
|
||||||
|
|
||||||
|
System::set_current(sys);
|
||||||
|
|
||||||
|
STORAGE.with(|cell| cell.borrow_mut().clear());
|
||||||
|
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
|
||||||
|
|
||||||
|
// register arbiter
|
||||||
|
let _ = System::current()
|
||||||
|
.tx()
|
||||||
|
.send(SystemCommand::RegisterArbiter(id, hnd));
|
||||||
|
|
||||||
|
ready_tx.send(()).unwrap();
|
||||||
|
|
||||||
|
// run arbiter event processing loop
|
||||||
|
rt.block_on(ArbiterRunner { rx });
|
||||||
|
|
||||||
|
// deregister arbiter
|
||||||
|
let _ = System::current()
|
||||||
|
.tx()
|
||||||
|
.send(SystemCommand::DeregisterArbiter(id));
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap_or_else(|err| {
|
||||||
|
panic!("Cannot spawn Arbiter's thread: {:?}. {:?}", &name, err)
|
||||||
|
});
|
||||||
|
|
||||||
|
ready_rx.recv().unwrap();
|
||||||
|
|
||||||
|
Arbiter { tx, thread_handle }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sets up an Arbiter runner in a new System using the provided runtime local task set.
|
||||||
|
pub(crate) fn in_new_system(local: &LocalSet) -> ArbiterHandle {
|
||||||
|
let (tx, rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
|
let hnd = ArbiterHandle::new(tx);
|
||||||
|
|
||||||
|
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
|
||||||
|
STORAGE.with(|cell| cell.borrow_mut().clear());
|
||||||
|
|
||||||
|
local.spawn_local(ArbiterRunner { rx });
|
||||||
|
|
||||||
|
hnd
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return a handle to the current thread's Arbiter's message sender.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
/// Panics if no Arbiter is running on the current thread.
|
||||||
|
pub fn current() -> ArbiterHandle {
|
||||||
|
HANDLE.with(|cell| match *cell.borrow() {
|
||||||
|
Some(ref addr) => addr.clone(),
|
||||||
|
None => panic!("Arbiter is not running."),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Stop Arbiter from continuing it's event loop.
|
||||||
|
///
|
||||||
|
/// Returns true if stop message was sent successfully and false if the Arbiter has been dropped.
|
||||||
|
pub fn stop(&self) -> bool {
|
||||||
|
self.tx.send(ArbiterCommand::Stop).is_ok()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send a future to the Arbiter's thread and spawn it.
|
||||||
|
///
|
||||||
|
/// If you require a result, include a response channel in the future.
|
||||||
|
///
|
||||||
|
/// Returns true if future was sent successfully and false if the Arbiter has died.
|
||||||
|
pub fn spawn<Fut>(&self, future: Fut) -> bool
|
||||||
|
where
|
||||||
|
Fut: Future<Output = ()> + Send + 'static,
|
||||||
|
{
|
||||||
|
self.tx
|
||||||
|
.send(ArbiterCommand::Execute(Box::pin(future)))
|
||||||
|
.is_ok()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send a function to the Arbiter's thread and execute it.
|
||||||
|
///
|
||||||
|
/// Any result from the function is discarded. If you require a result, include a response
|
||||||
|
/// channel in the function.
|
||||||
|
///
|
||||||
|
/// Returns true if function was sent successfully and false if the Arbiter has died.
|
||||||
|
pub fn spawn_fn<F>(&self, f: F) -> bool
|
||||||
|
where
|
||||||
|
F: FnOnce() + Send + 'static,
|
||||||
|
{
|
||||||
|
self.spawn(async { f() })
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wait for Arbiter's event loop to complete.
|
||||||
|
///
|
||||||
|
/// Joins the underlying OS thread handle. See [`JoinHandle::join`](thread::JoinHandle::join).
|
||||||
|
pub fn join(self) -> thread::Result<()> {
|
||||||
|
self.thread_handle.join()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Insert item into Arbiter's thread-local storage.
|
||||||
|
///
|
||||||
|
/// Overwrites any item of the same type previously inserted.
|
||||||
|
#[deprecated = "Will be removed in stable v2."]
|
||||||
|
pub fn set_item<T: 'static>(item: T) {
|
||||||
|
STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::<T>(), Box::new(item)));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check if Arbiter's thread-local storage contains an item type.
|
||||||
|
#[deprecated = "Will be removed in stable v2."]
|
||||||
|
pub fn contains_item<T: 'static>() -> bool {
|
||||||
|
STORAGE.with(move |cell| cell.borrow().contains_key(&TypeId::of::<T>()))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Call a function with a shared reference to an item in this Arbiter's thread-local storage.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
/// Panics if item is not in Arbiter's thread-local item storage.
|
||||||
|
#[deprecated = "Will be removed in stable v2."]
|
||||||
|
pub fn get_item<T: 'static, F, R>(mut f: F) -> R
|
||||||
|
where
|
||||||
|
F: FnMut(&T) -> R,
|
||||||
|
{
|
||||||
|
STORAGE.with(move |cell| {
|
||||||
|
let st = cell.borrow();
|
||||||
|
|
||||||
|
let type_id = TypeId::of::<T>();
|
||||||
|
let item = st.get(&type_id).and_then(downcast_ref).unwrap();
|
||||||
|
|
||||||
|
f(item)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Call a function with a mutable reference to an item in this Arbiter's thread-local storage.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
/// Panics if item is not in Arbiter's thread-local item storage.
|
||||||
|
#[deprecated = "Will be removed in stable v2."]
|
||||||
|
pub fn get_mut_item<T: 'static, F, R>(mut f: F) -> R
|
||||||
|
where
|
||||||
|
F: FnMut(&mut T) -> R,
|
||||||
|
{
|
||||||
|
STORAGE.with(move |cell| {
|
||||||
|
let mut st = cell.borrow_mut();
|
||||||
|
|
||||||
|
let type_id = TypeId::of::<T>();
|
||||||
|
let item = st.get_mut(&type_id).and_then(downcast_mut).unwrap();
|
||||||
|
|
||||||
|
f(item)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A persistent future that processes [Arbiter] commands.
|
||||||
|
struct ArbiterRunner {
|
||||||
|
rx: mpsc::UnboundedReceiver<ArbiterCommand>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Future for ArbiterRunner {
|
||||||
|
type Output = ();
|
||||||
|
|
||||||
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
// process all items currently buffered in channel
|
||||||
|
loop {
|
||||||
|
match ready!(Pin::new(&mut self.rx).poll_recv(cx)) {
|
||||||
|
// channel closed; no more messages can be received
|
||||||
|
None => return Poll::Ready(()),
|
||||||
|
|
||||||
|
// process arbiter command
|
||||||
|
Some(item) => match item {
|
||||||
|
ArbiterCommand::Stop => {
|
||||||
|
return Poll::Ready(());
|
||||||
|
}
|
||||||
|
ArbiterCommand::Execute(task_fut) => {
|
||||||
|
tokio::task::spawn_local(task_fut);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn downcast_ref<T: 'static>(boxed: &Box<dyn Any>) -> Option<&T> {
|
||||||
|
boxed.downcast_ref()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn downcast_mut<T: 'static>(boxed: &mut Box<dyn Any>) -> Option<&mut T> {
|
||||||
|
boxed.downcast_mut()
|
||||||
|
}
|
|
@ -1,4 +1,37 @@
|
||||||
//! Tokio-based single-thread async runtime for the Actix ecosystem.
|
//! Tokio-based single-thread async runtime for the Actix ecosystem.
|
||||||
|
//!
|
||||||
|
//! In most parts of the the Actix ecosystem, it has been chosen to use !Send futures. For this
|
||||||
|
//! reason, a single-threaded runtime is appropriate since it is guaranteed that futures will not
|
||||||
|
//! be moved between threads. This can result in small performance improvements over cases where
|
||||||
|
//! atomics would otherwise be needed.
|
||||||
|
//!
|
||||||
|
//! To achieve similar performance to multi-threaded, work-stealing runtimes, applications
|
||||||
|
//! using `actix-rt` will create multiple, mostly disconnected, single-threaded runtimes.
|
||||||
|
//! This approach has good performance characteristics for workloads where the majority of tasks
|
||||||
|
//! have similar runtime expense.
|
||||||
|
//!
|
||||||
|
//! The disadvantage is that idle threads will not steal work from very busy, stuck or otherwise
|
||||||
|
//! backlogged threads. Tasks that are disproportionately expensive should be offloaded to the
|
||||||
|
//! blocking thread-pool using [`task::spawn_blocking`].
|
||||||
|
//!
|
||||||
|
//! # Examples
|
||||||
|
//! ```
|
||||||
|
//! use std::sync::mpsc;
|
||||||
|
//! use actix_rt::{Arbiter, System};
|
||||||
|
//!
|
||||||
|
//! let _ = System::new();
|
||||||
|
//!
|
||||||
|
//! let (tx, rx) = mpsc::channel::<u32>();
|
||||||
|
//!
|
||||||
|
//! let arbiter = Arbiter::new();
|
||||||
|
//! arbiter.spawn_fn(move || tx.send(42).unwrap());
|
||||||
|
//!
|
||||||
|
//! let num = rx.recv().unwrap();
|
||||||
|
//! assert_eq!(num, 42);
|
||||||
|
//!
|
||||||
|
//! arbiter.stop();
|
||||||
|
//! arbiter.join().unwrap();
|
||||||
|
//! ```
|
||||||
|
|
||||||
#![deny(rust_2018_idioms, nonstandard_style)]
|
#![deny(rust_2018_idioms, nonstandard_style)]
|
||||||
#![allow(clippy::type_complexity)]
|
#![allow(clippy::type_complexity)]
|
||||||
|
@ -15,13 +48,13 @@ use tokio::task::JoinHandle;
|
||||||
#[cfg(all(feature = "macros", not(test)))]
|
#[cfg(all(feature = "macros", not(test)))]
|
||||||
pub use actix_macros::{main, test};
|
pub use actix_macros::{main, test};
|
||||||
|
|
||||||
|
mod arbiter;
|
||||||
mod runtime;
|
mod runtime;
|
||||||
mod system;
|
mod system;
|
||||||
mod worker;
|
|
||||||
|
|
||||||
pub use self::runtime::Runtime;
|
pub use self::arbiter::{Arbiter, ArbiterHandle};
|
||||||
|
pub use self::runtime::ActixRuntime;
|
||||||
pub use self::system::{System, SystemRunner};
|
pub use self::system::{System, SystemRunner};
|
||||||
pub use self::worker::{Worker, WorkerHandle};
|
|
||||||
|
|
||||||
pub mod signal {
|
pub mod signal {
|
||||||
//! Asynchronous signal handling (Tokio re-exports).
|
//! Asynchronous signal handling (Tokio re-exports).
|
||||||
|
@ -59,7 +92,7 @@ pub mod task {
|
||||||
pub use tokio::task::{spawn_blocking, yield_now, JoinHandle};
|
pub use tokio::task::{spawn_blocking, yield_now, JoinHandle};
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Spawns a future on the current [worker](Worker).
|
/// Spawns a future on the current thread.
|
||||||
///
|
///
|
||||||
/// # Panics
|
/// # Panics
|
||||||
/// Panics if Actix system is not running.
|
/// Panics if Actix system is not running.
|
||||||
|
|
|
@ -2,29 +2,40 @@ use std::{future::Future, io};
|
||||||
|
|
||||||
use tokio::task::{JoinHandle, LocalSet};
|
use tokio::task::{JoinHandle, LocalSet};
|
||||||
|
|
||||||
/// A single-threaded runtime based on Tokio's "current thread" runtime.
|
/// A runtime based on Tokio runtime.
|
||||||
///
|
///
|
||||||
/// All spawned futures will be executed on the current thread. Therefore, there is no `Send` bound
|
/// All spawned futures will be executed on the current thread. Therefore, there is no `Send` bound
|
||||||
/// on submitted futures.
|
/// on submitted futures.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Runtime {
|
pub struct ActixRuntime {
|
||||||
local: LocalSet,
|
local: LocalSet,
|
||||||
rt: tokio::runtime::Runtime,
|
rt: tokio::runtime::Runtime,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Runtime {
|
impl From<tokio::runtime::Runtime> for ActixRuntime {
|
||||||
/// Returns a new runtime initialized with default configuration values.
|
fn from(rt: tokio::runtime::Runtime) -> Self {
|
||||||
|
Self {
|
||||||
|
local: LocalSet::new(),
|
||||||
|
rt,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ActixRuntime {
|
||||||
|
/// Returns a new ActixRuntime instance.
|
||||||
#[allow(clippy::new_ret_no_self)]
|
#[allow(clippy::new_ret_no_self)]
|
||||||
pub fn new() -> io::Result<Runtime> {
|
pub fn new() -> io::Result<Self> {
|
||||||
let rt = tokio::runtime::Builder::new_current_thread()
|
let rt = Self::new_tokio_rt()?;
|
||||||
|
Ok(ActixRuntime::from(rt))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a new tokio current thread runtime initialized with default configuration values.
|
||||||
|
#[allow(clippy::new_ret_no_self)]
|
||||||
|
pub fn new_tokio_rt() -> io::Result<tokio::runtime::Runtime> {
|
||||||
|
tokio::runtime::Builder::new_current_thread()
|
||||||
.enable_io()
|
.enable_io()
|
||||||
.enable_time()
|
.enable_time()
|
||||||
.build()?;
|
.build()
|
||||||
|
|
||||||
Ok(Runtime {
|
|
||||||
rt,
|
|
||||||
local: LocalSet::new(),
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reference to local task set.
|
/// Reference to local task set.
|
||||||
|
@ -40,7 +51,7 @@ impl Runtime {
|
||||||
///
|
///
|
||||||
/// # Examples
|
/// # Examples
|
||||||
/// ```
|
/// ```
|
||||||
/// let rt = actix_rt::Runtime::new().unwrap();
|
/// let rt = actix_rt::ActixRuntime::new().unwrap();
|
||||||
///
|
///
|
||||||
/// // Spawn a future onto the runtime
|
/// // Spawn a future onto the runtime
|
||||||
/// let handle = rt.spawn(async {
|
/// let handle = rt.spawn(async {
|
||||||
|
|
|
@ -11,7 +11,7 @@ use std::{
|
||||||
use futures_core::ready;
|
use futures_core::ready;
|
||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
|
|
||||||
use crate::{worker::WorkerHandle, Runtime, Worker};
|
use crate::{arbiter::ArbiterHandle, ActixRuntime, Arbiter};
|
||||||
|
|
||||||
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
|
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
|
||||||
|
|
||||||
|
@ -25,8 +25,8 @@ pub struct System {
|
||||||
id: usize,
|
id: usize,
|
||||||
sys_tx: mpsc::UnboundedSender<SystemCommand>,
|
sys_tx: mpsc::UnboundedSender<SystemCommand>,
|
||||||
|
|
||||||
/// First worker that is created as part of the System.
|
/// Handle to the first [Arbiter] that is created with the System.
|
||||||
worker_handle: WorkerHandle,
|
arbiter_handle: ArbiterHandle,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl System {
|
impl System {
|
||||||
|
@ -36,49 +36,34 @@ impl System {
|
||||||
/// Panics if underlying Tokio runtime can not be created.
|
/// Panics if underlying Tokio runtime can not be created.
|
||||||
#[allow(clippy::new_ret_no_self)]
|
#[allow(clippy::new_ret_no_self)]
|
||||||
pub fn new() -> SystemRunner {
|
pub fn new() -> SystemRunner {
|
||||||
Self::create_runtime(async {})
|
Self::with_tokio_rt(|| {
|
||||||
|
ActixRuntime::new_tokio_rt().expect("Cannot create new System's Runtime.")
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a new system with given initialization future.
|
/// Create a new system with a closure that return a tokio Runtime instance
|
||||||
///
|
pub fn with_tokio_rt<F>(f: F) -> SystemRunner
|
||||||
/// The initialization future be run to completion (blocking current thread) before the system
|
where
|
||||||
/// runner is returned.
|
F: Fn() -> tokio::runtime::Runtime,
|
||||||
///
|
{
|
||||||
/// # Panics
|
|
||||||
/// Panics if underlying Tokio runtime can not be created.
|
|
||||||
pub fn with_init(init_fut: impl Future) -> SystemRunner {
|
|
||||||
Self::create_runtime(init_fut)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Constructs new system and registers it on the current thread.
|
|
||||||
pub(crate) fn construct(
|
|
||||||
sys_tx: mpsc::UnboundedSender<SystemCommand>,
|
|
||||||
worker: WorkerHandle,
|
|
||||||
) -> Self {
|
|
||||||
let sys = System {
|
|
||||||
sys_tx,
|
|
||||||
worker_handle: worker,
|
|
||||||
id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst),
|
|
||||||
};
|
|
||||||
|
|
||||||
System::set_current(sys.clone());
|
|
||||||
|
|
||||||
sys
|
|
||||||
}
|
|
||||||
|
|
||||||
fn create_runtime(init_fut: impl Future) -> SystemRunner {
|
|
||||||
let (stop_tx, stop_rx) = oneshot::channel();
|
let (stop_tx, stop_rx) = oneshot::channel();
|
||||||
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
|
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
let rt = Runtime::new().expect("Actix (Tokio) runtime could not be created.");
|
let tokio_rt = f();
|
||||||
let system = System::construct(sys_tx, Worker::new_current_thread(rt.local_set()));
|
let rt = ActixRuntime::from(tokio_rt);
|
||||||
|
|
||||||
// init background system worker
|
let sys_arbiter = Arbiter::in_new_system(rt.local_set());
|
||||||
let sys_worker = SystemController::new(sys_rx, stop_tx);
|
let system = System::construct(sys_tx, sys_arbiter.clone());
|
||||||
rt.spawn(sys_worker);
|
|
||||||
|
|
||||||
// run system init future
|
system
|
||||||
rt.block_on(init_fut);
|
.tx()
|
||||||
|
.send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// init background system arbiter
|
||||||
|
let sys_ctrl = SystemController::new(sys_rx, stop_tx);
|
||||||
|
|
||||||
|
rt.spawn(sys_ctrl);
|
||||||
|
|
||||||
SystemRunner {
|
SystemRunner {
|
||||||
rt,
|
rt,
|
||||||
|
@ -87,6 +72,22 @@ impl System {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Constructs new system and registers it on the current thread.
|
||||||
|
pub(crate) fn construct(
|
||||||
|
sys_tx: mpsc::UnboundedSender<SystemCommand>,
|
||||||
|
arbiter_handle: ArbiterHandle,
|
||||||
|
) -> Self {
|
||||||
|
let sys = System {
|
||||||
|
sys_tx,
|
||||||
|
arbiter_handle,
|
||||||
|
id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst),
|
||||||
|
};
|
||||||
|
|
||||||
|
System::set_current(sys.clone());
|
||||||
|
|
||||||
|
sys
|
||||||
|
}
|
||||||
|
|
||||||
/// Get current running system.
|
/// Get current running system.
|
||||||
///
|
///
|
||||||
/// # Panics
|
/// # Panics
|
||||||
|
@ -98,9 +99,9 @@ impl System {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get handle to a the System's initial [Worker].
|
/// Get handle to a the System's initial [Arbiter].
|
||||||
pub fn worker(&self) -> &WorkerHandle {
|
pub fn arbiter(&self) -> &ArbiterHandle {
|
||||||
&self.worker_handle
|
&self.arbiter_handle
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if there is a System registered on the current thread.
|
/// Check if there is a System registered on the current thread.
|
||||||
|
@ -142,7 +143,7 @@ impl System {
|
||||||
#[must_use = "A SystemRunner does nothing unless `run` is called."]
|
#[must_use = "A SystemRunner does nothing unless `run` is called."]
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct SystemRunner {
|
pub struct SystemRunner {
|
||||||
rt: Runtime,
|
rt: ActixRuntime,
|
||||||
stop_rx: oneshot::Receiver<i32>,
|
stop_rx: oneshot::Receiver<i32>,
|
||||||
system: System,
|
system: System,
|
||||||
}
|
}
|
||||||
|
@ -179,17 +180,17 @@ impl SystemRunner {
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(crate) enum SystemCommand {
|
pub(crate) enum SystemCommand {
|
||||||
Exit(i32),
|
Exit(i32),
|
||||||
RegisterWorker(usize, WorkerHandle),
|
RegisterArbiter(usize, ArbiterHandle),
|
||||||
DeregisterWorker(usize),
|
DeregisterArbiter(usize),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// There is one `SystemController` per [System]. It runs in the background, keeping track of
|
/// There is one `SystemController` per [System]. It runs in the background, keeping track of
|
||||||
/// [Worker]s and is able to distribute a system-wide stop command.
|
/// [Arbiter]s and is able to distribute a system-wide stop command.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(crate) struct SystemController {
|
pub(crate) struct SystemController {
|
||||||
stop_tx: Option<oneshot::Sender<i32>>,
|
stop_tx: Option<oneshot::Sender<i32>>,
|
||||||
cmd_rx: mpsc::UnboundedReceiver<SystemCommand>,
|
cmd_rx: mpsc::UnboundedReceiver<SystemCommand>,
|
||||||
workers: HashMap<usize, WorkerHandle>,
|
arbiters: HashMap<usize, ArbiterHandle>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SystemController {
|
impl SystemController {
|
||||||
|
@ -200,7 +201,7 @@ impl SystemController {
|
||||||
SystemController {
|
SystemController {
|
||||||
cmd_rx,
|
cmd_rx,
|
||||||
stop_tx: Some(stop_tx),
|
stop_tx: Some(stop_tx),
|
||||||
workers: HashMap::with_capacity(4),
|
arbiters: HashMap::with_capacity(4),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -218,9 +219,9 @@ impl Future for SystemController {
|
||||||
// process system command
|
// process system command
|
||||||
Some(cmd) => match cmd {
|
Some(cmd) => match cmd {
|
||||||
SystemCommand::Exit(code) => {
|
SystemCommand::Exit(code) => {
|
||||||
// stop workers
|
// stop all arbiters
|
||||||
for wkr in self.workers.values() {
|
for arb in self.arbiters.values() {
|
||||||
wkr.stop();
|
arb.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
// stop event loop
|
// stop event loop
|
||||||
|
@ -230,12 +231,12 @@ impl Future for SystemController {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SystemCommand::RegisterWorker(name, hnd) => {
|
SystemCommand::RegisterArbiter(id, arb) => {
|
||||||
self.workers.insert(name, hnd);
|
self.arbiters.insert(id, arb);
|
||||||
}
|
}
|
||||||
|
|
||||||
SystemCommand::DeregisterWorker(name) => {
|
SystemCommand::DeregisterArbiter(id) => {
|
||||||
self.workers.remove(&name);
|
self.arbiters.remove(&id);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,7 +4,7 @@ use std::{
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
|
|
||||||
use actix_rt::{System, Worker};
|
use actix_rt::{Arbiter, System};
|
||||||
use tokio::sync::oneshot;
|
use tokio::sync::oneshot;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -21,84 +21,77 @@ fn await_for_timer() {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn join_another_worker() {
|
fn join_another_arbiter() {
|
||||||
let time = Duration::from_secs(1);
|
let time = Duration::from_secs(1);
|
||||||
let instant = Instant::now();
|
let instant = Instant::now();
|
||||||
System::new().block_on(async move {
|
System::new().block_on(async move {
|
||||||
let worker = Worker::new();
|
let arbiter = Arbiter::new();
|
||||||
worker.spawn(Box::pin(async move {
|
arbiter.spawn(Box::pin(async move {
|
||||||
tokio::time::sleep(time).await;
|
tokio::time::sleep(time).await;
|
||||||
Worker::handle().stop();
|
Arbiter::current().stop();
|
||||||
}));
|
}));
|
||||||
worker.join().unwrap();
|
arbiter.join().unwrap();
|
||||||
});
|
});
|
||||||
assert!(
|
assert!(
|
||||||
instant.elapsed() >= time,
|
instant.elapsed() >= time,
|
||||||
"Join on another worker should complete only when it calls stop"
|
"Join on another arbiter should complete only when it calls stop"
|
||||||
);
|
);
|
||||||
|
|
||||||
let instant = Instant::now();
|
let instant = Instant::now();
|
||||||
System::new().block_on(async move {
|
System::new().block_on(async move {
|
||||||
let worker = Worker::new();
|
let arbiter = Arbiter::new();
|
||||||
worker.spawn_fn(move || {
|
arbiter.spawn_fn(move || {
|
||||||
actix_rt::spawn(async move {
|
actix_rt::spawn(async move {
|
||||||
tokio::time::sleep(time).await;
|
tokio::time::sleep(time).await;
|
||||||
Worker::handle().stop();
|
Arbiter::current().stop();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
worker.join().unwrap();
|
arbiter.join().unwrap();
|
||||||
});
|
});
|
||||||
assert!(
|
assert!(
|
||||||
instant.elapsed() >= time,
|
instant.elapsed() >= time,
|
||||||
"Join on a worker that has used actix_rt::spawn should wait for said future"
|
"Join on an arbiter that has used actix_rt::spawn should wait for said future"
|
||||||
);
|
);
|
||||||
|
|
||||||
let instant = Instant::now();
|
let instant = Instant::now();
|
||||||
System::new().block_on(async move {
|
System::new().block_on(async move {
|
||||||
let worker = Worker::new();
|
let arbiter = Arbiter::new();
|
||||||
worker.spawn(Box::pin(async move {
|
arbiter.spawn(Box::pin(async move {
|
||||||
tokio::time::sleep(time).await;
|
tokio::time::sleep(time).await;
|
||||||
Worker::handle().stop();
|
Arbiter::current().stop();
|
||||||
}));
|
}));
|
||||||
worker.stop();
|
arbiter.stop();
|
||||||
worker.join().unwrap();
|
arbiter.join().unwrap();
|
||||||
});
|
});
|
||||||
assert!(
|
assert!(
|
||||||
instant.elapsed() < time,
|
instant.elapsed() < time,
|
||||||
"Premature stop of worker should conclude regardless of it's current state"
|
"Premature stop of arbiter should conclude regardless of it's current state"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn non_static_block_on() {
|
fn non_static_block_on() {
|
||||||
let string = String::from("test_str");
|
let string = String::from("test_str");
|
||||||
let str = string.as_str();
|
let string = string.as_str();
|
||||||
|
|
||||||
let sys = System::new();
|
let sys = System::new();
|
||||||
|
|
||||||
sys.block_on(async {
|
sys.block_on(async {
|
||||||
actix_rt::time::sleep(Duration::from_millis(1)).await;
|
actix_rt::time::sleep(Duration::from_millis(1)).await;
|
||||||
assert_eq!("test_str", str);
|
assert_eq!("test_str", string);
|
||||||
});
|
});
|
||||||
|
|
||||||
let rt = actix_rt::Runtime::new().unwrap();
|
let rt = actix_rt::ActixRuntime::new().unwrap();
|
||||||
|
|
||||||
rt.block_on(async {
|
rt.block_on(async {
|
||||||
actix_rt::time::sleep(Duration::from_millis(1)).await;
|
actix_rt::time::sleep(Duration::from_millis(1)).await;
|
||||||
assert_eq!("test_str", str);
|
assert_eq!("test_str", string);
|
||||||
});
|
});
|
||||||
|
|
||||||
System::with_init(async {
|
|
||||||
assert_eq!("test_str", str);
|
|
||||||
System::current().stop();
|
|
||||||
})
|
|
||||||
.run()
|
|
||||||
.unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn wait_for_spawns() {
|
fn wait_for_spawns() {
|
||||||
let rt = actix_rt::Runtime::new().unwrap();
|
let rt = actix_rt::ActixRuntime::new().unwrap();
|
||||||
|
|
||||||
let handle = rt.spawn(async {
|
let handle = rt.spawn(async {
|
||||||
println!("running on the runtime");
|
println!("running on the runtime");
|
||||||
|
@ -110,70 +103,71 @@ fn wait_for_spawns() {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn worker_spawn_fn_runs() {
|
fn arbiter_spawn_fn_runs() {
|
||||||
let _ = System::new();
|
let _ = System::new();
|
||||||
|
|
||||||
let (tx, rx) = channel::<u32>();
|
let (tx, rx) = channel::<u32>();
|
||||||
|
|
||||||
let worker = Worker::new();
|
let arbiter = Arbiter::new();
|
||||||
worker.spawn_fn(move || tx.send(42).unwrap());
|
arbiter.spawn_fn(move || tx.send(42).unwrap());
|
||||||
|
|
||||||
let num = rx.recv().unwrap();
|
let num = rx.recv().unwrap();
|
||||||
assert_eq!(num, 42);
|
assert_eq!(num, 42);
|
||||||
|
|
||||||
worker.stop();
|
arbiter.stop();
|
||||||
worker.join().unwrap();
|
arbiter.join().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn worker_drop_no_panic_fn() {
|
fn arbiter_drop_no_panic_fn() {
|
||||||
let _ = System::new();
|
let _ = System::new();
|
||||||
|
|
||||||
let worker = Worker::new();
|
let arbiter = Arbiter::new();
|
||||||
worker.spawn_fn(|| panic!("test"));
|
arbiter.spawn_fn(|| panic!("test"));
|
||||||
|
|
||||||
worker.stop();
|
arbiter.stop();
|
||||||
worker.join().unwrap();
|
arbiter.join().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn worker_drop_no_panic_fut() {
|
fn arbiter_drop_no_panic_fut() {
|
||||||
let _ = System::new();
|
let _ = System::new();
|
||||||
|
|
||||||
let worker = Worker::new();
|
let arbiter = Arbiter::new();
|
||||||
worker.spawn(async { panic!("test") });
|
arbiter.spawn(async { panic!("test") });
|
||||||
|
|
||||||
worker.stop();
|
arbiter.stop();
|
||||||
worker.join().unwrap();
|
arbiter.join().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn worker_item_storage() {
|
#[allow(deprecated)]
|
||||||
|
fn arbiter_item_storage() {
|
||||||
let _ = System::new();
|
let _ = System::new();
|
||||||
|
|
||||||
let worker = Worker::new();
|
let arbiter = Arbiter::new();
|
||||||
|
|
||||||
assert!(!Worker::contains_item::<u32>());
|
assert!(!Arbiter::contains_item::<u32>());
|
||||||
Worker::set_item(42u32);
|
Arbiter::set_item(42u32);
|
||||||
assert!(Worker::contains_item::<u32>());
|
assert!(Arbiter::contains_item::<u32>());
|
||||||
|
|
||||||
Worker::get_item(|&item: &u32| assert_eq!(item, 42));
|
Arbiter::get_item(|&item: &u32| assert_eq!(item, 42));
|
||||||
Worker::get_mut_item(|&mut item: &mut u32| assert_eq!(item, 42));
|
Arbiter::get_mut_item(|&mut item: &mut u32| assert_eq!(item, 42));
|
||||||
|
|
||||||
let thread = thread::spawn(move || {
|
let thread = thread::spawn(move || {
|
||||||
Worker::get_item(|&_item: &u32| unreachable!("u32 not in this thread"));
|
Arbiter::get_item(|&_item: &u32| unreachable!("u32 not in this thread"));
|
||||||
})
|
})
|
||||||
.join();
|
.join();
|
||||||
assert!(thread.is_err());
|
assert!(thread.is_err());
|
||||||
|
|
||||||
let thread = thread::spawn(move || {
|
let thread = thread::spawn(move || {
|
||||||
Worker::get_mut_item(|&mut _item: &mut i8| unreachable!("i8 not in this thread"));
|
Arbiter::get_mut_item(|&mut _item: &mut i8| unreachable!("i8 not in this thread"));
|
||||||
})
|
})
|
||||||
.join();
|
.join();
|
||||||
assert!(thread.is_err());
|
assert!(thread.is_err());
|
||||||
|
|
||||||
worker.stop();
|
arbiter.stop();
|
||||||
worker.join().unwrap();
|
arbiter.join().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -184,31 +178,31 @@ fn no_system_current_panic() {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
#[should_panic]
|
#[should_panic]
|
||||||
fn no_system_worker_new_panic() {
|
fn no_system_arbiter_new_panic() {
|
||||||
Worker::new();
|
Arbiter::new();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn system_worker_spawn() {
|
fn system_arbiter_spawn() {
|
||||||
let runner = System::new();
|
let runner = System::new();
|
||||||
|
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
let sys = System::current();
|
let sys = System::current();
|
||||||
|
|
||||||
thread::spawn(|| {
|
thread::spawn(|| {
|
||||||
// this thread will have no worker in it's thread local so call will panic
|
// this thread will have no arbiter in it's thread local so call will panic
|
||||||
Worker::handle();
|
Arbiter::current();
|
||||||
})
|
})
|
||||||
.join()
|
.join()
|
||||||
.unwrap_err();
|
.unwrap_err();
|
||||||
|
|
||||||
let thread = thread::spawn(|| {
|
let thread = thread::spawn(|| {
|
||||||
// this thread will have no worker in it's thread local so use the system handle instead
|
// this thread will have no arbiter in it's thread local so use the system handle instead
|
||||||
System::set_current(sys);
|
System::set_current(sys);
|
||||||
let sys = System::current();
|
let sys = System::current();
|
||||||
|
|
||||||
let wrk = sys.worker();
|
let arb = sys.arbiter();
|
||||||
wrk.spawn(async move {
|
arb.spawn(async move {
|
||||||
tx.send(42u32).unwrap();
|
tx.send(42u32).unwrap();
|
||||||
System::current().stop();
|
System::current().stop();
|
||||||
});
|
});
|
||||||
|
@ -217,3 +211,73 @@ fn system_worker_spawn() {
|
||||||
assert_eq!(runner.block_on(rx).unwrap(), 42);
|
assert_eq!(runner.block_on(rx).unwrap(), 42);
|
||||||
thread.join().unwrap();
|
thread.join().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn system_stop_stops_arbiters() {
|
||||||
|
let sys = System::new();
|
||||||
|
let arb = Arbiter::new();
|
||||||
|
|
||||||
|
// arbiter should be alive to receive spawn msg
|
||||||
|
assert!(Arbiter::current().spawn_fn(|| {}));
|
||||||
|
assert!(arb.spawn_fn(|| {}));
|
||||||
|
|
||||||
|
System::current().stop();
|
||||||
|
sys.run().unwrap();
|
||||||
|
|
||||||
|
// arbiter should be dead and return false
|
||||||
|
assert!(!Arbiter::current().spawn_fn(|| {}));
|
||||||
|
assert!(!arb.spawn_fn(|| {}));
|
||||||
|
|
||||||
|
arb.join().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn new_system_with_tokio() {
|
||||||
|
let res = System::with_tokio_rt(move || {
|
||||||
|
tokio::runtime::Builder::new_multi_thread()
|
||||||
|
.enable_io()
|
||||||
|
.enable_time()
|
||||||
|
.thread_keep_alive(std::time::Duration::from_millis(1000))
|
||||||
|
.worker_threads(2)
|
||||||
|
.max_blocking_threads(2)
|
||||||
|
.on_thread_start(|| {})
|
||||||
|
.on_thread_stop(|| {})
|
||||||
|
.build()
|
||||||
|
.unwrap()
|
||||||
|
})
|
||||||
|
.block_on(async {
|
||||||
|
actix_rt::time::sleep(std::time::Duration::from_millis(1)).await;
|
||||||
|
123usize
|
||||||
|
});
|
||||||
|
|
||||||
|
assert_eq!(res, 123);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn new_arbiter_with_tokio() {
|
||||||
|
let _ = System::new();
|
||||||
|
|
||||||
|
let arb = Arbiter::with_tokio_rt(|| {
|
||||||
|
tokio::runtime::Builder::new_current_thread()
|
||||||
|
.enable_all()
|
||||||
|
.build()
|
||||||
|
.unwrap()
|
||||||
|
});
|
||||||
|
|
||||||
|
let counter = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(true));
|
||||||
|
|
||||||
|
let counter1 = counter.clone();
|
||||||
|
let did_spawn = arb.spawn(async move {
|
||||||
|
actix_rt::time::sleep(std::time::Duration::from_millis(1)).await;
|
||||||
|
counter1.store(false, std::sync::atomic::Ordering::SeqCst);
|
||||||
|
Arbiter::current().stop();
|
||||||
|
});
|
||||||
|
|
||||||
|
assert!(did_spawn);
|
||||||
|
|
||||||
|
arb.join().unwrap();
|
||||||
|
|
||||||
|
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||||
|
|
||||||
|
assert_eq!(false, counter.load(std::sync::atomic::Ordering::SeqCst));
|
||||||
|
}
|
||||||
|
|
|
@ -180,7 +180,7 @@ impl Worker {
|
||||||
state: WorkerState::Unavailable,
|
state: WorkerState::Unavailable,
|
||||||
});
|
});
|
||||||
|
|
||||||
actix_rt::Runtime::new().unwrap().block_on(async move {
|
actix_rt::ActixRuntime::new().unwrap().block_on(async move {
|
||||||
let fut = wrk
|
let fut = wrk
|
||||||
.factories
|
.factories
|
||||||
.iter()
|
.iter()
|
||||||
|
|
Loading…
Reference in New Issue