rename back to arbiter

This commit is contained in:
Rob Ede 2021-01-31 02:51:29 +00:00
parent df15ef40f6
commit 7e6b23af6f
No known key found for this signature in database
GPG Key ID: C2A3B36E841A91E6
7 changed files with 155 additions and 158 deletions

View File

@ -1,25 +1,22 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
* Rename `Arbiter => Worker`. [#254]
* Remove `run_in_tokio`, `attach_to_tokio` and `AsyncSystemRunner`. [#253] * Remove `run_in_tokio`, `attach_to_tokio` and `AsyncSystemRunner`. [#253]
* Return `JoinHandle` from `actix_rt::spawn`. [#253] * Return `JoinHandle` from `actix_rt::spawn`. [#253]
* Remove old `Worker::spawn`. Implementation is now inlined into `actix_rt::spawn`. [#253] * Remove old `Arbiter::spawn`. Implementation is now inlined into `actix_rt::spawn`. [#253]
* Rename `Worker::{send => spawn}` and `Worker::{exec_fn => spawn_fn}`. [#253] * Rename `Arbiter::{send => spawn}` and `Arbiter::{exec_fn => spawn_fn}`. [#253]
* Remove `Worker::exec`. [#253] * Remove `Arbiter::exec`. [#253]
* Remove `System::arbiter`. [#256] * Remove deprecated `Arbiter::local_join` and `Arbiter::is_running`. [#253]
* Remove deprecated `Worker::local_join` and `Worker::is_running`. [#253] * `Arbiter::spawn` now accepts !Unpin futures. [#256]
* `Worker::spawn` now accepts !Unpin futures. [#256]
* `System::new` no longer takes arguments. [#257] * `System::new` no longer takes arguments. [#257]
* Remove `System::with_current`. [#257] * Remove `System::with_current`. [#257]
* Remove `Builder`. [#257] * Remove `Builder`. [#257]
* Add `System::with_init` as replacement for `Builder::run`. [#257] * Add `System::with_init` as replacement for `Builder::run`. [#257]
* Rename `System::{is_set => is_registered}`. [#257] * Rename `System::{is_set => is_registered}`. [#257]
* Add `WorkerHandle` for sending messages to non-current-thread workers. [#257]. * Add `ArbiterHandle` for sending messages to non-current-thread arbiters. [#257].
* `System::worker` now returns a `&WorkerHandle`. [#257] * `System::arbiter` now returns a `&ArbiterHandle`. [#257]
* Rename `Worker::{current => handle}` and return a `WorkerHandle` instead. [#257] * Rename `Arbiter::{current => handle}` and return a `ArbiterHandle` instead. [#257]
* `Worker::join` now takes self by value. [#257] * `Arbiter::join` now takes self by value. [#257]
[#253]: https://github.com/actix/actix-net/pull/253 [#253]: https://github.com/actix/actix-net/pull/253
[#254]: https://github.com/actix/actix-net/pull/254 [#254]: https://github.com/actix/actix-net/pull/254

View File

@ -21,55 +21,55 @@ use crate::{
pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0); pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);
thread_local!( thread_local!(
static HANDLE: RefCell<Option<WorkerHandle>> = RefCell::new(None); static HANDLE: RefCell<Option<ArbiterHandle>> = RefCell::new(None);
static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new()); static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
); );
pub(crate) enum WorkerCommand { pub(crate) enum ArbiterCommand {
Stop, Stop,
Execute(Pin<Box<dyn Future<Output = ()> + Send>>), Execute(Pin<Box<dyn Future<Output = ()> + Send>>),
} }
impl fmt::Debug for WorkerCommand { impl fmt::Debug for ArbiterCommand {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self { match self {
WorkerCommand::Stop => write!(f, "WorkerCommand::Stop"), ArbiterCommand::Stop => write!(f, "ArbiterCommand::Stop"),
WorkerCommand::Execute(_) => write!(f, "WorkerCommand::Execute"), ArbiterCommand::Execute(_) => write!(f, "ArbiterCommand::Execute"),
} }
} }
} }
/// A handle for sending spawn and stop messages to a [Worker]. /// A handle for sending spawn and stop messages to an [Arbiter].
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct WorkerHandle { pub struct ArbiterHandle {
sender: mpsc::UnboundedSender<WorkerCommand>, sender: mpsc::UnboundedSender<ArbiterCommand>,
} }
impl WorkerHandle { impl ArbiterHandle {
pub(crate) fn new(sender: mpsc::UnboundedSender<WorkerCommand>) -> Self { pub(crate) fn new(sender: mpsc::UnboundedSender<ArbiterCommand>) -> Self {
Self { sender } Self { sender }
} }
/// Send a future to the Worker's thread and spawn it. /// Send a future to the [Arbiter]'s thread and spawn it.
/// ///
/// If you require a result, include a response channel in the future. /// If you require a result, include a response channel in the future.
/// ///
/// Returns true if future was sent successfully and false if the Worker has died. /// Returns true if future was sent successfully and false if the [Arbiter] has died.
pub fn spawn<Fut>(&self, future: Fut) -> bool pub fn spawn<Fut>(&self, future: Fut) -> bool
where where
Fut: Future<Output = ()> + Send + 'static, Fut: Future<Output = ()> + Send + 'static,
{ {
self.sender self.sender
.send(WorkerCommand::Execute(Box::pin(future))) .send(ArbiterCommand::Execute(Box::pin(future)))
.is_ok() .is_ok()
} }
/// Send a function to the Worker's thread and execute it. /// Send a function to the [Arbiter]'s thread and execute it.
/// ///
/// Any result from the function is discarded. If you require a result, include a response /// Any result from the function is discarded. If you require a result, include a response
/// channel in the function. /// channel in the function.
/// ///
/// Returns true if function was sent successfully and false if the Worker has died. /// Returns true if function was sent successfully and false if the [Arbiter] has died.
pub fn spawn_fn<F>(&self, f: F) -> bool pub fn spawn_fn<F>(&self, f: F) -> bool
where where
F: FnOnce() + Send + 'static, F: FnOnce() + Send + 'static,
@ -77,35 +77,35 @@ impl WorkerHandle {
self.spawn(async { f() }) self.spawn(async { f() })
} }
/// Instruct worker to stop processing it's event loop. /// Instruct [Arbiter] to stop processing it's event loop.
/// ///
/// Returns true if stop message was sent successfully and false if the Worker has been dropped. /// Returns true if stop message was sent successfully and false if the [Arbiter] has
/// been dropped.
pub fn stop(&self) -> bool { pub fn stop(&self) -> bool {
self.sender.send(WorkerCommand::Stop).is_ok() self.sender.send(ArbiterCommand::Stop).is_ok()
} }
} }
/// A worker represent a thread that provides an asynchronous execution environment for futures /// A arbiter represents a thread that provides an asynchronous execution environment for futures
/// and functions. /// and functions.
/// ///
/// When a Worker is created, it spawns a new [OS thread](thread), and hosts an event loop. /// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop.
#[derive(Debug)] #[derive(Debug)]
pub struct Worker { pub struct Arbiter {
sender: mpsc::UnboundedSender<WorkerCommand>, sender: mpsc::UnboundedSender<ArbiterCommand>,
thread_handle: thread::JoinHandle<()>, thread_handle: thread::JoinHandle<()>,
} }
impl Worker { impl Arbiter {
/// Spawn new thread and run event loop in spawned thread. /// Spawn new Arbiter thread and start its event loop.
///
/// Returns handle of newly created worker.
/// ///
/// # Panics /// # Panics
/// Panics if a [System] is not registered on the current thread. /// Panics if a [System] is not registered on the current thread.
#[allow(clippy::new_without_default)] #[allow(clippy::new_without_default)]
pub fn new() -> Worker { pub fn new() -> Arbiter {
let id = COUNT.fetch_add(1, Ordering::Relaxed); let id = COUNT.fetch_add(1, Ordering::Relaxed);
let name = format!("actix-rt:worker:{}", id); let system_id = System::current().id();
let name = format!("actix-rt|system:{}|arbiter:{}", system_id, id);
let sys = System::current(); let sys = System::current();
let (tx, rx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel();
@ -115,89 +115,89 @@ impl Worker {
let tx = tx.clone(); let tx = tx.clone();
move || { move || {
let rt = Runtime::new().expect("Can not create Runtime"); let rt = Runtime::new().expect("Can not create Runtime");
let hnd = WorkerHandle::new(tx); let hnd = ArbiterHandle::new(tx);
System::set_current(sys); System::set_current(sys);
STORAGE.with(|cell| cell.borrow_mut().clear()); STORAGE.with(|cell| cell.borrow_mut().clear());
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
// register worker // register arbiter
let _ = System::current() let _ = System::current()
.tx() .tx()
.send(SystemCommand::RegisterWorker(id, hnd)); .send(SystemCommand::RegisterArbiter(id, hnd));
// run worker event processing loop // run arbiter event processing loop
rt.block_on(WorkerRunner { rx }); rt.block_on(ArbiterRunner { rx });
// deregister worker // deregister arbiter
let _ = System::current() let _ = System::current()
.tx() .tx()
.send(SystemCommand::DeregisterWorker(id)); .send(SystemCommand::DeregisterArbiter(id));
} }
}) })
.unwrap_or_else(|err| { .unwrap_or_else(|err| {
panic!("Cannot spawn a Worker's thread {:?}: {:?}", &name, err) panic!("Cannot spawn Arbiter's thread: {:?}. {:?}", &name, err)
}); });
Worker { Arbiter {
sender: tx, sender: tx,
thread_handle, thread_handle,
} }
} }
/// Sets up a worker runner on the current thread using the provided runtime local task set. /// Sets up an Arbiter runner on the current thread using the provided runtime local task set.
pub(crate) fn new_current_thread(local: &LocalSet) -> WorkerHandle { pub(crate) fn new_current_thread(local: &LocalSet) -> ArbiterHandle {
let (tx, rx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel();
let hnd = WorkerHandle::new(tx); let hnd = ArbiterHandle::new(tx);
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
STORAGE.with(|cell| cell.borrow_mut().clear()); STORAGE.with(|cell| cell.borrow_mut().clear());
local.spawn_local(WorkerRunner { rx }); local.spawn_local(ArbiterRunner { rx });
hnd hnd
} }
/// Return a handle to the worker. /// Return a handle to the Arbiter's message sender.
/// ///
/// # Panics /// # Panics
/// Panics if no Worker is running on the current thread. /// Panics if no Arbiter is running on the current thread.
pub fn handle() -> WorkerHandle { pub fn handle() -> ArbiterHandle {
HANDLE.with(|cell| match *cell.borrow() { HANDLE.with(|cell| match *cell.borrow() {
Some(ref addr) => addr.clone(), Some(ref addr) => addr.clone(),
None => panic!("Worker is not running."), None => panic!("Arbiter is not running."),
}) })
} }
/// Stop worker from continuing it's event loop. /// Stop Arbiter from continuing it's event loop.
/// ///
/// Returns true if stop message was sent successfully and false if the Worker has been dropped. /// Returns true if stop message was sent successfully and false if the Arbiter has been dropped.
pub fn stop(&self) -> bool { pub fn stop(&self) -> bool {
self.sender.send(WorkerCommand::Stop).is_ok() self.sender.send(ArbiterCommand::Stop).is_ok()
} }
/// Send a future to the Worker's thread and spawn it. /// Send a future to the Arbiter's thread and spawn it.
/// ///
/// If you require a result, include a response channel in the future. /// If you require a result, include a response channel in the future.
/// ///
/// Returns true if future was sent successfully and false if the Worker has died. /// Returns true if future was sent successfully and false if the Arbiter has died.
pub fn spawn<Fut>(&self, future: Fut) -> bool pub fn spawn<Fut>(&self, future: Fut) -> bool
where where
Fut: Future<Output = ()> + Send + 'static, Fut: Future<Output = ()> + Send + 'static,
{ {
self.sender self.sender
.send(WorkerCommand::Execute(Box::pin(future))) .send(ArbiterCommand::Execute(Box::pin(future)))
.is_ok() .is_ok()
} }
/// Send a function to the Worker's thread and execute it. /// Send a function to the Arbiter's thread and execute it.
/// ///
/// Any result from the function is discarded. If you require a result, include a response /// Any result from the function is discarded. If you require a result, include a response
/// channel in the function. /// channel in the function.
/// ///
/// Returns true if function was sent successfully and false if the Worker has died. /// Returns true if function was sent successfully and false if the Arbiter has died.
pub fn spawn_fn<F>(&self, f: F) -> bool pub fn spawn_fn<F>(&self, f: F) -> bool
where where
F: FnOnce() + Send + 'static, F: FnOnce() + Send + 'static,
@ -205,29 +205,29 @@ impl Worker {
self.spawn(async { f() }) self.spawn(async { f() })
} }
/// Wait for worker's event loop to complete. /// Wait for Arbiter's event loop to complete.
/// ///
/// Joins the underlying OS thread handle. See [`JoinHandle::join`](thread::JoinHandle::join). /// Joins the underlying OS thread handle. See [`JoinHandle::join`](thread::JoinHandle::join).
pub fn join(self) -> thread::Result<()> { pub fn join(self) -> thread::Result<()> {
self.thread_handle.join() self.thread_handle.join()
} }
/// Insert item into worker's thread-local storage. /// Insert item into Arbiter's thread-local storage.
/// ///
/// Overwrites any item of the same type previously inserted. /// Overwrites any item of the same type previously inserted.
pub fn set_item<T: 'static>(item: T) { pub fn set_item<T: 'static>(item: T) {
STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::<T>(), Box::new(item))); STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::<T>(), Box::new(item)));
} }
/// Check if worker's thread-local storage contains an item type. /// Check if Arbiter's thread-local storage contains an item type.
pub fn contains_item<T: 'static>() -> bool { pub fn contains_item<T: 'static>() -> bool {
STORAGE.with(move |cell| cell.borrow().contains_key(&TypeId::of::<T>())) STORAGE.with(move |cell| cell.borrow().contains_key(&TypeId::of::<T>()))
} }
/// Call a function with a shared reference to an item in this worker's thread-local storage. /// Call a function with a shared reference to an item in this Arbiter's thread-local storage.
/// ///
/// # Panics /// # Panics
/// Panics if item is not in worker's thread-local item storage. /// Panics if item is not in Arbiter's thread-local item storage.
pub fn get_item<T: 'static, F, R>(mut f: F) -> R pub fn get_item<T: 'static, F, R>(mut f: F) -> R
where where
F: FnMut(&T) -> R, F: FnMut(&T) -> R,
@ -242,10 +242,10 @@ impl Worker {
}) })
} }
/// Call a function with a mutable reference to an item in this worker's thread-local storage. /// Call a function with a mutable reference to an item in this Arbiter's thread-local storage.
/// ///
/// # Panics /// # Panics
/// Panics if item is not in worker's thread-local item storage. /// Panics if item is not in Arbiter's thread-local item storage.
pub fn get_mut_item<T: 'static, F, R>(mut f: F) -> R pub fn get_mut_item<T: 'static, F, R>(mut f: F) -> R
where where
F: FnMut(&mut T) -> R, F: FnMut(&mut T) -> R,
@ -261,12 +261,12 @@ impl Worker {
} }
} }
/// A persistent worker future that processes worker commands. /// A persistent future that processes [Arbiter] commands.
struct WorkerRunner { struct ArbiterRunner {
rx: mpsc::UnboundedReceiver<WorkerCommand>, rx: mpsc::UnboundedReceiver<ArbiterCommand>,
} }
impl Future for WorkerRunner { impl Future for ArbiterRunner {
type Output = (); type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@ -276,12 +276,12 @@ impl Future for WorkerRunner {
// channel closed; no more messages can be received // channel closed; no more messages can be received
None => return Poll::Ready(()), None => return Poll::Ready(()),
// process worker command // process arbiter command
Some(item) => match item { Some(item) => match item {
WorkerCommand::Stop => { ArbiterCommand::Stop => {
return Poll::Ready(()); return Poll::Ready(());
} }
WorkerCommand::Execute(task_fut) => { ArbiterCommand::Execute(task_fut) => {
tokio::task::spawn_local(task_fut); tokio::task::spawn_local(task_fut);
} }
}, },

View File

@ -15,13 +15,13 @@ use tokio::task::JoinHandle;
#[cfg(all(feature = "macros", not(test)))] #[cfg(all(feature = "macros", not(test)))]
pub use actix_macros::{main, test}; pub use actix_macros::{main, test};
mod arbiter;
mod runtime; mod runtime;
mod system; mod system;
mod worker;
pub use self::arbiter::{Arbiter, ArbiterHandle};
pub use self::runtime::Runtime; pub use self::runtime::Runtime;
pub use self::system::{System, SystemRunner}; pub use self::system::{System, SystemRunner};
pub use self::worker::{Worker, WorkerHandle};
pub mod signal { pub mod signal {
//! Asynchronous signal handling (Tokio re-exports). //! Asynchronous signal handling (Tokio re-exports).
@ -59,7 +59,7 @@ pub mod task {
pub use tokio::task::{spawn_blocking, yield_now, JoinHandle}; pub use tokio::task::{spawn_blocking, yield_now, JoinHandle};
} }
/// Spawns a future on the current [worker](Worker). /// Spawns a future on the current thread.
/// ///
/// # Panics /// # Panics
/// Panics if Actix system is not running. /// Panics if Actix system is not running.

View File

@ -11,7 +11,7 @@ use std::{
use futures_core::ready; use futures_core::ready;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use crate::{worker::WorkerHandle, Runtime, Worker}; use crate::{arbiter::ArbiterHandle, Arbiter, Runtime};
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
@ -25,8 +25,8 @@ pub struct System {
id: usize, id: usize,
sys_tx: mpsc::UnboundedSender<SystemCommand>, sys_tx: mpsc::UnboundedSender<SystemCommand>,
/// First worker that is created as part of the System. /// Handle to the first [Arbiter] that is created with the System.
worker_handle: WorkerHandle, arbiter_handle: ArbiterHandle,
} }
impl System { impl System {
@ -53,11 +53,11 @@ impl System {
/// Constructs new system and registers it on the current thread. /// Constructs new system and registers it on the current thread.
pub(crate) fn construct( pub(crate) fn construct(
sys_tx: mpsc::UnboundedSender<SystemCommand>, sys_tx: mpsc::UnboundedSender<SystemCommand>,
worker: WorkerHandle, arbiter_handle: ArbiterHandle,
) -> Self { ) -> Self {
let sys = System { let sys = System {
sys_tx, sys_tx,
worker_handle: worker, arbiter_handle,
id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst), id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst),
}; };
@ -71,11 +71,11 @@ impl System {
let (sys_tx, sys_rx) = mpsc::unbounded_channel(); let (sys_tx, sys_rx) = mpsc::unbounded_channel();
let rt = Runtime::new().expect("Actix (Tokio) runtime could not be created."); let rt = Runtime::new().expect("Actix (Tokio) runtime could not be created.");
let system = System::construct(sys_tx, Worker::new_current_thread(rt.local_set())); let system = System::construct(sys_tx, Arbiter::new_current_thread(rt.local_set()));
// init background system worker // init background system arbiter
let sys_worker = SystemController::new(sys_rx, stop_tx); let sys_ctrl = SystemController::new(sys_rx, stop_tx);
rt.spawn(sys_worker); rt.spawn(sys_ctrl);
// run system init future // run system init future
rt.block_on(init_fut); rt.block_on(init_fut);
@ -98,9 +98,9 @@ impl System {
}) })
} }
/// Get handle to a the System's initial [Worker]. /// Get handle to a the System's initial [Arbiter].
pub fn worker(&self) -> &WorkerHandle { pub fn arbiter(&self) -> &ArbiterHandle {
&self.worker_handle &self.arbiter_handle
} }
/// Check if there is a System registered on the current thread. /// Check if there is a System registered on the current thread.
@ -179,17 +179,17 @@ impl SystemRunner {
#[derive(Debug)] #[derive(Debug)]
pub(crate) enum SystemCommand { pub(crate) enum SystemCommand {
Exit(i32), Exit(i32),
RegisterWorker(usize, WorkerHandle), RegisterArbiter(usize, ArbiterHandle),
DeregisterWorker(usize), DeregisterArbiter(usize),
} }
/// There is one `SystemController` per [System]. It runs in the background, keeping track of /// There is one `SystemController` per [System]. It runs in the background, keeping track of
/// [Worker]s and is able to distribute a system-wide stop command. /// [Arbiter]s and is able to distribute a system-wide stop command.
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct SystemController { pub(crate) struct SystemController {
stop_tx: Option<oneshot::Sender<i32>>, stop_tx: Option<oneshot::Sender<i32>>,
cmd_rx: mpsc::UnboundedReceiver<SystemCommand>, cmd_rx: mpsc::UnboundedReceiver<SystemCommand>,
workers: HashMap<usize, WorkerHandle>, arbiters: HashMap<usize, ArbiterHandle>,
} }
impl SystemController { impl SystemController {
@ -200,7 +200,7 @@ impl SystemController {
SystemController { SystemController {
cmd_rx, cmd_rx,
stop_tx: Some(stop_tx), stop_tx: Some(stop_tx),
workers: HashMap::with_capacity(4), arbiters: HashMap::with_capacity(4),
} }
} }
} }
@ -218,8 +218,8 @@ impl Future for SystemController {
// process system command // process system command
Some(cmd) => match cmd { Some(cmd) => match cmd {
SystemCommand::Exit(code) => { SystemCommand::Exit(code) => {
// stop workers // stop all arbiters
for wkr in self.workers.values() { for wkr in self.arbiters.values() {
wkr.stop(); wkr.stop();
} }
@ -230,12 +230,12 @@ impl Future for SystemController {
} }
} }
SystemCommand::RegisterWorker(name, hnd) => { SystemCommand::RegisterArbiter(name, hnd) => {
self.workers.insert(name, hnd); self.arbiters.insert(name, hnd);
} }
SystemCommand::DeregisterWorker(name) => { SystemCommand::DeregisterArbiter(name) => {
self.workers.remove(&name); self.arbiters.remove(&name);
} }
}, },
} }

View File

@ -4,7 +4,7 @@ use std::{
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use actix_rt::{System, Worker}; use actix_rt::{System, Arbiter};
use tokio::sync::oneshot; use tokio::sync::oneshot;
#[test] #[test]
@ -21,51 +21,51 @@ fn await_for_timer() {
} }
#[test] #[test]
fn join_another_worker() { fn join_another_arbiter() {
let time = Duration::from_secs(1); let time = Duration::from_secs(1);
let instant = Instant::now(); let instant = Instant::now();
System::new().block_on(async move { System::new().block_on(async move {
let worker = Worker::new(); let arbiter = Arbiter::new();
worker.spawn(Box::pin(async move { arbiter.spawn(Box::pin(async move {
tokio::time::sleep(time).await; tokio::time::sleep(time).await;
Worker::handle().stop(); Arbiter::handle().stop();
})); }));
worker.join().unwrap(); arbiter.join().unwrap();
}); });
assert!( assert!(
instant.elapsed() >= time, instant.elapsed() >= time,
"Join on another worker should complete only when it calls stop" "Join on another arbiter should complete only when it calls stop"
); );
let instant = Instant::now(); let instant = Instant::now();
System::new().block_on(async move { System::new().block_on(async move {
let worker = Worker::new(); let arbiter = Arbiter::new();
worker.spawn_fn(move || { arbiter.spawn_fn(move || {
actix_rt::spawn(async move { actix_rt::spawn(async move {
tokio::time::sleep(time).await; tokio::time::sleep(time).await;
Worker::handle().stop(); Arbiter::handle().stop();
}); });
}); });
worker.join().unwrap(); arbiter.join().unwrap();
}); });
assert!( assert!(
instant.elapsed() >= time, instant.elapsed() >= time,
"Join on a worker that has used actix_rt::spawn should wait for said future" "Join on a arbiter that has used actix_rt::spawn should wait for said future"
); );
let instant = Instant::now(); let instant = Instant::now();
System::new().block_on(async move { System::new().block_on(async move {
let worker = Worker::new(); let arbiter = Arbiter::new();
worker.spawn(Box::pin(async move { arbiter.spawn(Box::pin(async move {
tokio::time::sleep(time).await; tokio::time::sleep(time).await;
Worker::handle().stop(); Arbiter::handle().stop();
})); }));
worker.stop(); arbiter.stop();
worker.join().unwrap(); arbiter.join().unwrap();
}); });
assert!( assert!(
instant.elapsed() < time, instant.elapsed() < time,
"Premature stop of worker should conclude regardless of it's current state" "Premature stop of arbiter should conclude regardless of it's current state"
); );
} }
@ -110,70 +110,70 @@ fn wait_for_spawns() {
} }
#[test] #[test]
fn worker_spawn_fn_runs() { fn arbiter_spawn_fn_runs() {
let _ = System::new(); let _ = System::new();
let (tx, rx) = channel::<u32>(); let (tx, rx) = channel::<u32>();
let worker = Worker::new(); let arbiter = Arbiter::new();
worker.spawn_fn(move || tx.send(42).unwrap()); arbiter.spawn_fn(move || tx.send(42).unwrap());
let num = rx.recv().unwrap(); let num = rx.recv().unwrap();
assert_eq!(num, 42); assert_eq!(num, 42);
worker.stop(); arbiter.stop();
worker.join().unwrap(); arbiter.join().unwrap();
} }
#[test] #[test]
fn worker_drop_no_panic_fn() { fn arbiter_drop_no_panic_fn() {
let _ = System::new(); let _ = System::new();
let worker = Worker::new(); let arbiter = Arbiter::new();
worker.spawn_fn(|| panic!("test")); arbiter.spawn_fn(|| panic!("test"));
worker.stop(); arbiter.stop();
worker.join().unwrap(); arbiter.join().unwrap();
} }
#[test] #[test]
fn worker_drop_no_panic_fut() { fn arbiter_drop_no_panic_fut() {
let _ = System::new(); let _ = System::new();
let worker = Worker::new(); let arbiter = Arbiter::new();
worker.spawn(async { panic!("test") }); arbiter.spawn(async { panic!("test") });
worker.stop(); arbiter.stop();
worker.join().unwrap(); arbiter.join().unwrap();
} }
#[test] #[test]
fn worker_item_storage() { fn arbiter_item_storage() {
let _ = System::new(); let _ = System::new();
let worker = Worker::new(); let arbiter = Arbiter::new();
assert!(!Worker::contains_item::<u32>()); assert!(!Arbiter::contains_item::<u32>());
Worker::set_item(42u32); Arbiter::set_item(42u32);
assert!(Worker::contains_item::<u32>()); assert!(Arbiter::contains_item::<u32>());
Worker::get_item(|&item: &u32| assert_eq!(item, 42)); Arbiter::get_item(|&item: &u32| assert_eq!(item, 42));
Worker::get_mut_item(|&mut item: &mut u32| assert_eq!(item, 42)); Arbiter::get_mut_item(|&mut item: &mut u32| assert_eq!(item, 42));
let thread = thread::spawn(move || { let thread = thread::spawn(move || {
Worker::get_item(|&_item: &u32| unreachable!("u32 not in this thread")); Arbiter::get_item(|&_item: &u32| unreachable!("u32 not in this thread"));
}) })
.join(); .join();
assert!(thread.is_err()); assert!(thread.is_err());
let thread = thread::spawn(move || { let thread = thread::spawn(move || {
Worker::get_mut_item(|&mut _item: &mut i8| unreachable!("i8 not in this thread")); Arbiter::get_mut_item(|&mut _item: &mut i8| unreachable!("i8 not in this thread"));
}) })
.join(); .join();
assert!(thread.is_err()); assert!(thread.is_err());
worker.stop(); arbiter.stop();
worker.join().unwrap(); arbiter.join().unwrap();
} }
#[test] #[test]
@ -184,30 +184,30 @@ fn no_system_current_panic() {
#[test] #[test]
#[should_panic] #[should_panic]
fn no_system_worker_new_panic() { fn no_system_arbiter_new_panic() {
Worker::new(); Arbiter::new();
} }
#[test] #[test]
fn system_worker_spawn() { fn system_arbiter_spawn() {
let runner = System::new(); let runner = System::new();
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let sys = System::current(); let sys = System::current();
thread::spawn(|| { thread::spawn(|| {
// this thread will have no worker in it's thread local so call will panic // this thread will have no arbiter in it's thread local so call will panic
Worker::handle(); Arbiter::handle();
}) })
.join() .join()
.unwrap_err(); .unwrap_err();
let thread = thread::spawn(|| { let thread = thread::spawn(|| {
// this thread will have no worker in it's thread local so use the system handle instead // this thread will have no arbiter in it's thread local so use the system handle instead
System::set_current(sys); System::set_current(sys);
let sys = System::current(); let sys = System::current();
let wrk = sys.worker(); let wrk = sys.arbiter();
wrk.spawn(async move { wrk.spawn(async move {
tx.send(42u32).unwrap(); tx.send(42u32).unwrap();
System::current().stop(); System::current().stop();

View File

@ -403,7 +403,7 @@ impl Accept {
// after the sleep a Timer interest is sent to Accept Poll // after the sleep a Timer interest is sent to Accept Poll
let waker = self.waker.clone(); let waker = self.waker.clone();
System::current().worker().spawn(async move { System::current().arbiter().spawn(async move {
sleep_until(Instant::now() + Duration::from_millis(510)).await; sleep_until(Instant::now() + Duration::from_millis(510)).await;
waker.wake(WakerInterest::Timer); waker.wake(WakerInterest::Timer);
}); });

View File

@ -6,7 +6,7 @@ use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
use actix_rt::time::{sleep_until, Instant, Sleep}; use actix_rt::time::{sleep_until, Instant, Sleep};
use actix_rt::{spawn, Worker as Arbiter}; use actix_rt::{spawn, Arbiter as Arbiter};
use actix_utils::counter::Counter; use actix_utils::counter::Counter;
use futures_core::future::LocalBoxFuture; use futures_core::future::LocalBoxFuture;
use log::{error, info, trace}; use log::{error, info, trace};