diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index a22d3794..6b427363 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -2,24 +2,40 @@ ## Unreleased - 2021-xx-xx -* Rename `Arbiter => Worker`. [#254] + +## 2.0.0-beta.3 - 2021-01-31 * Remove `run_in_tokio`, `attach_to_tokio` and `AsyncSystemRunner`. [#253] * Return `JoinHandle` from `actix_rt::spawn`. [#253] +<<<<<<< HEAD * Remove old `Worker::spawn`. Implementation is now inlined into `actix_rt::spawn`. [#253] * Rename `Worker::{send => spawn}` and `Worker::{exec_fn => spawn_fn}`. [#253] * Remove `Worker::exec`. [#253] * Remove `System::arbiter`. [#256] * Remove deprecated `Worker::local_join` and `Worker::is_running`. [#253] * `Worker::spawn` now accepts !Unpin futures. [#256] +======= +* Remove old `Arbiter::spawn`. Implementation is now inlined into `actix_rt::spawn`. [#253] +* Rename `Arbiter::{send => spawn}` and `Arbiter::{exec_fn => spawn_fn}`. [#253] +* Remove `Arbiter::exec`. [#253] +* Remove deprecated `Arbiter::local_join` and `Arbiter::is_running`. [#253] +* `Arbiter::spawn` now accepts !Unpin futures. [#256] +>>>>>>> master * `System::new` no longer takes arguments. [#257] * Remove `System::with_current`. [#257] * Remove `Builder`. [#257] * Add `System::with_init` as replacement for `Builder::run`. [#257] * Rename `System::{is_set => is_registered}`. [#257] +<<<<<<< HEAD * Add `WorkerHandle` for sending messages to non-current-thread workers. [#257]. * `System::worker` now returns a `&WorkerHandle`. [#257] * Rename `Worker::{current => handle}` and return a `WorkerHandle` instead. [#257] * `Worker::join` now takes self by value. [#257] +======= +* Add `ArbiterHandle` for sending messages to non-current-thread arbiters. [#257]. +* `System::arbiter` now returns an `&ArbiterHandle`. [#257] +* `Arbiter::current` now returns an `ArbiterHandle` instead. [#257] +* `Arbiter::join` now takes self by value. [#257] +>>>>>>> master [#253]: https://github.com/actix/actix-net/pull/253 [#254]: https://github.com/actix/actix-net/pull/254 diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index 68ed8563..65877abd 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -1,7 +1,10 @@ [package] name = "actix-rt" -version = "2.0.0-beta.2" -authors = ["Nikolay Kim "] +version = "2.0.0-beta.3" +authors = [ + "Nikolay Kim ", + "Rob Ede ", +] description = "Tokio-based single-thread async runtime for the Actix ecosystem" keywords = ["network", "framework", "async", "futures"] homepage = "https://actix.rs" diff --git a/actix-rt/README.md b/actix-rt/README.md new file mode 100644 index 00000000..f58c2f4f --- /dev/null +++ b/actix-rt/README.md @@ -0,0 +1,7 @@ +# actix-rt + +> Tokio-based single-thread async runtime for the Actix ecosystem. + +See documentation for detailed explanations these components: [https://docs.rs/actix-rt][docs]. + +[docs]: https://docs.rs/actix-rt diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs new file mode 100644 index 00000000..81b5e409 --- /dev/null +++ b/actix-rt/src/arbiter.rs @@ -0,0 +1,317 @@ +use std::{ + any::{Any, TypeId}, + cell::RefCell, + collections::HashMap, + fmt, + future::Future, + pin::Pin, + sync::atomic::{AtomicUsize, Ordering}, + task::{Context, Poll}, + thread, +}; + +use futures_core::ready; +use tokio::{sync::mpsc, task::LocalSet}; + +use crate::{ + runtime::ActixRuntime, + system::{System, SystemCommand}, +}; + +pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0); + +thread_local!( + static HANDLE: RefCell> = RefCell::new(None); + static STORAGE: RefCell>> = RefCell::new(HashMap::new()); +); + +pub(crate) enum ArbiterCommand { + Stop, + Execute(Pin + Send>>), +} + +impl fmt::Debug for ArbiterCommand { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ArbiterCommand::Stop => write!(f, "ArbiterCommand::Stop"), + ArbiterCommand::Execute(_) => write!(f, "ArbiterCommand::Execute"), + } + } +} + +/// A handle for sending spawn and stop messages to an [Arbiter]. +#[derive(Debug, Clone)] +pub struct ArbiterHandle { + tx: mpsc::UnboundedSender, +} + +impl ArbiterHandle { + pub(crate) fn new(tx: mpsc::UnboundedSender) -> Self { + Self { tx } + } + + /// Send a future to the [Arbiter]'s thread and spawn it. + /// + /// If you require a result, include a response channel in the future. + /// + /// Returns true if future was sent successfully and false if the [Arbiter] has died. + pub fn spawn(&self, future: Fut) -> bool + where + Fut: Future + Send + 'static, + { + self.tx + .send(ArbiterCommand::Execute(Box::pin(future))) + .is_ok() + } + + /// Send a function to the [Arbiter]'s thread and execute it. + /// + /// Any result from the function is discarded. If you require a result, include a response + /// channel in the function. + /// + /// Returns true if function was sent successfully and false if the [Arbiter] has died. + pub fn spawn_fn(&self, f: F) -> bool + where + F: FnOnce() + Send + 'static, + { + self.spawn(async { f() }) + } + + /// Instruct [Arbiter] to stop processing it's event loop. + /// + /// Returns true if stop message was sent successfully and false if the [Arbiter] has + /// been dropped. + pub fn stop(&self) -> bool { + self.tx.send(ArbiterCommand::Stop).is_ok() + } +} + +/// An Arbiter represents a thread that provides an asynchronous execution environment for futures +/// and functions. +/// +/// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop. +#[derive(Debug)] +pub struct Arbiter { + tx: mpsc::UnboundedSender, + thread_handle: thread::JoinHandle<()>, +} + +impl Arbiter { + /// Spawn new Arbiter thread and start its event loop. + /// + /// # Panics + /// Panics if a [System] is not registered on the current thread. + #[allow(clippy::new_without_default)] + pub fn new() -> Arbiter { + Self::with_tokio_rt(|| { + ActixRuntime::new_tokio_rt().expect("Cannot create new Arbiter's Runtime.") + }) + } + + /// Create a new system with a closure that return a tokio Runtime instance + pub fn with_tokio_rt(f: F) -> Arbiter + where + F: Fn() -> tokio::runtime::Runtime + Send + 'static, + { + let id = COUNT.fetch_add(1, Ordering::Relaxed); + let system_id = System::current().id(); + let name = format!("actix-rt|system:{}|arbiter:{}", system_id, id); + let sys = System::current(); + let (tx, rx) = mpsc::unbounded_channel(); + + let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>(); + + let thread_handle = thread::Builder::new() + .name(name.clone()) + .spawn({ + let tx = tx.clone(); + move || { + let tokio_rt = f(); + let rt = ActixRuntime::from(tokio_rt); + let hnd = ArbiterHandle::new(tx); + + System::set_current(sys); + + STORAGE.with(|cell| cell.borrow_mut().clear()); + HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); + + // register arbiter + let _ = System::current() + .tx() + .send(SystemCommand::RegisterArbiter(id, hnd)); + + ready_tx.send(()).unwrap(); + + // run arbiter event processing loop + rt.block_on(ArbiterRunner { rx }); + + // deregister arbiter + let _ = System::current() + .tx() + .send(SystemCommand::DeregisterArbiter(id)); + } + }) + .unwrap_or_else(|err| { + panic!("Cannot spawn Arbiter's thread: {:?}. {:?}", &name, err) + }); + + ready_rx.recv().unwrap(); + + Arbiter { tx, thread_handle } + } + + /// Sets up an Arbiter runner in a new System using the provided runtime local task set. + pub(crate) fn in_new_system(local: &LocalSet) -> ArbiterHandle { + let (tx, rx) = mpsc::unbounded_channel(); + + let hnd = ArbiterHandle::new(tx); + + HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); + STORAGE.with(|cell| cell.borrow_mut().clear()); + + local.spawn_local(ArbiterRunner { rx }); + + hnd + } + + /// Return a handle to the current thread's Arbiter's message sender. + /// + /// # Panics + /// Panics if no Arbiter is running on the current thread. + pub fn current() -> ArbiterHandle { + HANDLE.with(|cell| match *cell.borrow() { + Some(ref addr) => addr.clone(), + None => panic!("Arbiter is not running."), + }) + } + + /// Stop Arbiter from continuing it's event loop. + /// + /// Returns true if stop message was sent successfully and false if the Arbiter has been dropped. + pub fn stop(&self) -> bool { + self.tx.send(ArbiterCommand::Stop).is_ok() + } + + /// Send a future to the Arbiter's thread and spawn it. + /// + /// If you require a result, include a response channel in the future. + /// + /// Returns true if future was sent successfully and false if the Arbiter has died. + pub fn spawn(&self, future: Fut) -> bool + where + Fut: Future + Send + 'static, + { + self.tx + .send(ArbiterCommand::Execute(Box::pin(future))) + .is_ok() + } + + /// Send a function to the Arbiter's thread and execute it. + /// + /// Any result from the function is discarded. If you require a result, include a response + /// channel in the function. + /// + /// Returns true if function was sent successfully and false if the Arbiter has died. + pub fn spawn_fn(&self, f: F) -> bool + where + F: FnOnce() + Send + 'static, + { + self.spawn(async { f() }) + } + + /// Wait for Arbiter's event loop to complete. + /// + /// Joins the underlying OS thread handle. See [`JoinHandle::join`](thread::JoinHandle::join). + pub fn join(self) -> thread::Result<()> { + self.thread_handle.join() + } + + /// Insert item into Arbiter's thread-local storage. + /// + /// Overwrites any item of the same type previously inserted. + #[deprecated = "Will be removed in stable v2."] + pub fn set_item(item: T) { + STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::(), Box::new(item))); + } + + /// Check if Arbiter's thread-local storage contains an item type. + #[deprecated = "Will be removed in stable v2."] + pub fn contains_item() -> bool { + STORAGE.with(move |cell| cell.borrow().contains_key(&TypeId::of::())) + } + + /// Call a function with a shared reference to an item in this Arbiter's thread-local storage. + /// + /// # Panics + /// Panics if item is not in Arbiter's thread-local item storage. + #[deprecated = "Will be removed in stable v2."] + pub fn get_item(mut f: F) -> R + where + F: FnMut(&T) -> R, + { + STORAGE.with(move |cell| { + let st = cell.borrow(); + + let type_id = TypeId::of::(); + let item = st.get(&type_id).and_then(downcast_ref).unwrap(); + + f(item) + }) + } + + /// Call a function with a mutable reference to an item in this Arbiter's thread-local storage. + /// + /// # Panics + /// Panics if item is not in Arbiter's thread-local item storage. + #[deprecated = "Will be removed in stable v2."] + pub fn get_mut_item(mut f: F) -> R + where + F: FnMut(&mut T) -> R, + { + STORAGE.with(move |cell| { + let mut st = cell.borrow_mut(); + + let type_id = TypeId::of::(); + let item = st.get_mut(&type_id).and_then(downcast_mut).unwrap(); + + f(item) + }) + } +} + +/// A persistent future that processes [Arbiter] commands. +struct ArbiterRunner { + rx: mpsc::UnboundedReceiver, +} + +impl Future for ArbiterRunner { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // process all items currently buffered in channel + loop { + match ready!(Pin::new(&mut self.rx).poll_recv(cx)) { + // channel closed; no more messages can be received + None => return Poll::Ready(()), + + // process arbiter command + Some(item) => match item { + ArbiterCommand::Stop => { + return Poll::Ready(()); + } + ArbiterCommand::Execute(task_fut) => { + tokio::task::spawn_local(task_fut); + } + }, + } + } + } +} + +fn downcast_ref(boxed: &Box) -> Option<&T> { + boxed.downcast_ref() +} + +fn downcast_mut(boxed: &mut Box) -> Option<&mut T> { + boxed.downcast_mut() +} diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index 40d31f3c..d649a344 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -1,4 +1,37 @@ //! Tokio-based single-thread async runtime for the Actix ecosystem. +//! +//! In most parts of the the Actix ecosystem, it has been chosen to use !Send futures. For this +//! reason, a single-threaded runtime is appropriate since it is guaranteed that futures will not +//! be moved between threads. This can result in small performance improvements over cases where +//! atomics would otherwise be needed. +//! +//! To achieve similar performance to multi-threaded, work-stealing runtimes, applications +//! using `actix-rt` will create multiple, mostly disconnected, single-threaded runtimes. +//! This approach has good performance characteristics for workloads where the majority of tasks +//! have similar runtime expense. +//! +//! The disadvantage is that idle threads will not steal work from very busy, stuck or otherwise +//! backlogged threads. Tasks that are disproportionately expensive should be offloaded to the +//! blocking thread-pool using [`task::spawn_blocking`]. +//! +//! # Examples +//! ``` +//! use std::sync::mpsc; +//! use actix_rt::{Arbiter, System}; +//! +//! let _ = System::new(); +//! +//! let (tx, rx) = mpsc::channel::(); +//! +//! let arbiter = Arbiter::new(); +//! arbiter.spawn_fn(move || tx.send(42).unwrap()); +//! +//! let num = rx.recv().unwrap(); +//! assert_eq!(num, 42); +//! +//! arbiter.stop(); +//! arbiter.join().unwrap(); +//! ``` #![deny(rust_2018_idioms, nonstandard_style)] #![allow(clippy::type_complexity)] @@ -15,13 +48,13 @@ use tokio::task::JoinHandle; #[cfg(all(feature = "macros", not(test)))] pub use actix_macros::{main, test}; +mod arbiter; mod runtime; mod system; -mod worker; -pub use self::runtime::Runtime; +pub use self::arbiter::{Arbiter, ArbiterHandle}; +pub use self::runtime::ActixRuntime; pub use self::system::{System, SystemRunner}; -pub use self::worker::{Worker, WorkerHandle}; pub mod signal { //! Asynchronous signal handling (Tokio re-exports). @@ -59,7 +92,7 @@ pub mod task { pub use tokio::task::{spawn_blocking, yield_now, JoinHandle}; } -/// Spawns a future on the current [worker](Worker). +/// Spawns a future on the current thread. /// /// # Panics /// Panics if Actix system is not running. diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index a20dfe7e..2ed41491 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -2,29 +2,40 @@ use std::{future::Future, io}; use tokio::task::{JoinHandle, LocalSet}; -/// A single-threaded runtime based on Tokio's "current thread" runtime. +/// A runtime based on Tokio runtime. /// /// All spawned futures will be executed on the current thread. Therefore, there is no `Send` bound /// on submitted futures. #[derive(Debug)] -pub struct Runtime { +pub struct ActixRuntime { local: LocalSet, rt: tokio::runtime::Runtime, } -impl Runtime { - /// Returns a new runtime initialized with default configuration values. +impl From for ActixRuntime { + fn from(rt: tokio::runtime::Runtime) -> Self { + Self { + local: LocalSet::new(), + rt, + } + } +} + +impl ActixRuntime { + /// Returns a new ActixRuntime instance. #[allow(clippy::new_ret_no_self)] - pub fn new() -> io::Result { - let rt = tokio::runtime::Builder::new_current_thread() + pub fn new() -> io::Result { + let rt = Self::new_tokio_rt()?; + Ok(ActixRuntime::from(rt)) + } + + /// Returns a new tokio current thread runtime initialized with default configuration values. + #[allow(clippy::new_ret_no_self)] + pub fn new_tokio_rt() -> io::Result { + tokio::runtime::Builder::new_current_thread() .enable_io() .enable_time() - .build()?; - - Ok(Runtime { - rt, - local: LocalSet::new(), - }) + .build() } /// Reference to local task set. @@ -40,7 +51,7 @@ impl Runtime { /// /// # Examples /// ``` - /// let rt = actix_rt::Runtime::new().unwrap(); + /// let rt = actix_rt::ActixRuntime::new().unwrap(); /// /// // Spawn a future onto the runtime /// let handle = rt.spawn(async { diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index d26fa1d7..84fca3d5 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -11,7 +11,7 @@ use std::{ use futures_core::ready; use tokio::sync::{mpsc, oneshot}; -use crate::{worker::WorkerHandle, Runtime, Worker}; +use crate::{arbiter::ArbiterHandle, ActixRuntime, Arbiter}; static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); @@ -25,8 +25,8 @@ pub struct System { id: usize, sys_tx: mpsc::UnboundedSender, - /// First worker that is created as part of the System. - worker_handle: WorkerHandle, + /// Handle to the first [Arbiter] that is created with the System. + arbiter_handle: ArbiterHandle, } impl System { @@ -36,49 +36,34 @@ impl System { /// Panics if underlying Tokio runtime can not be created. #[allow(clippy::new_ret_no_self)] pub fn new() -> SystemRunner { - Self::create_runtime(async {}) + Self::with_tokio_rt(|| { + ActixRuntime::new_tokio_rt().expect("Cannot create new System's Runtime.") + }) } - /// Create a new system with given initialization future. - /// - /// The initialization future be run to completion (blocking current thread) before the system - /// runner is returned. - /// - /// # Panics - /// Panics if underlying Tokio runtime can not be created. - pub fn with_init(init_fut: impl Future) -> SystemRunner { - Self::create_runtime(init_fut) - } - - /// Constructs new system and registers it on the current thread. - pub(crate) fn construct( - sys_tx: mpsc::UnboundedSender, - worker: WorkerHandle, - ) -> Self { - let sys = System { - sys_tx, - worker_handle: worker, - id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst), - }; - - System::set_current(sys.clone()); - - sys - } - - fn create_runtime(init_fut: impl Future) -> SystemRunner { + /// Create a new system with a closure that return a tokio Runtime instance + pub fn with_tokio_rt(f: F) -> SystemRunner + where + F: Fn() -> tokio::runtime::Runtime, + { let (stop_tx, stop_rx) = oneshot::channel(); let (sys_tx, sys_rx) = mpsc::unbounded_channel(); - let rt = Runtime::new().expect("Actix (Tokio) runtime could not be created."); - let system = System::construct(sys_tx, Worker::new_current_thread(rt.local_set())); + let tokio_rt = f(); + let rt = ActixRuntime::from(tokio_rt); - // init background system worker - let sys_worker = SystemController::new(sys_rx, stop_tx); - rt.spawn(sys_worker); + let sys_arbiter = Arbiter::in_new_system(rt.local_set()); + let system = System::construct(sys_tx, sys_arbiter.clone()); - // run system init future - rt.block_on(init_fut); + system + .tx() + .send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter)) + .unwrap(); + + // init background system arbiter + let sys_ctrl = SystemController::new(sys_rx, stop_tx); + + rt.spawn(sys_ctrl); SystemRunner { rt, @@ -87,6 +72,22 @@ impl System { } } + /// Constructs new system and registers it on the current thread. + pub(crate) fn construct( + sys_tx: mpsc::UnboundedSender, + arbiter_handle: ArbiterHandle, + ) -> Self { + let sys = System { + sys_tx, + arbiter_handle, + id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst), + }; + + System::set_current(sys.clone()); + + sys + } + /// Get current running system. /// /// # Panics @@ -98,9 +99,9 @@ impl System { }) } - /// Get handle to a the System's initial [Worker]. - pub fn worker(&self) -> &WorkerHandle { - &self.worker_handle + /// Get handle to a the System's initial [Arbiter]. + pub fn arbiter(&self) -> &ArbiterHandle { + &self.arbiter_handle } /// Check if there is a System registered on the current thread. @@ -142,7 +143,7 @@ impl System { #[must_use = "A SystemRunner does nothing unless `run` is called."] #[derive(Debug)] pub struct SystemRunner { - rt: Runtime, + rt: ActixRuntime, stop_rx: oneshot::Receiver, system: System, } @@ -179,17 +180,17 @@ impl SystemRunner { #[derive(Debug)] pub(crate) enum SystemCommand { Exit(i32), - RegisterWorker(usize, WorkerHandle), - DeregisterWorker(usize), + RegisterArbiter(usize, ArbiterHandle), + DeregisterArbiter(usize), } /// There is one `SystemController` per [System]. It runs in the background, keeping track of -/// [Worker]s and is able to distribute a system-wide stop command. +/// [Arbiter]s and is able to distribute a system-wide stop command. #[derive(Debug)] pub(crate) struct SystemController { stop_tx: Option>, cmd_rx: mpsc::UnboundedReceiver, - workers: HashMap, + arbiters: HashMap, } impl SystemController { @@ -200,7 +201,7 @@ impl SystemController { SystemController { cmd_rx, stop_tx: Some(stop_tx), - workers: HashMap::with_capacity(4), + arbiters: HashMap::with_capacity(4), } } } @@ -218,9 +219,9 @@ impl Future for SystemController { // process system command Some(cmd) => match cmd { SystemCommand::Exit(code) => { - // stop workers - for wkr in self.workers.values() { - wkr.stop(); + // stop all arbiters + for arb in self.arbiters.values() { + arb.stop(); } // stop event loop @@ -230,12 +231,12 @@ impl Future for SystemController { } } - SystemCommand::RegisterWorker(name, hnd) => { - self.workers.insert(name, hnd); + SystemCommand::RegisterArbiter(id, arb) => { + self.arbiters.insert(id, arb); } - SystemCommand::DeregisterWorker(name) => { - self.workers.remove(&name); + SystemCommand::DeregisterArbiter(id) => { + self.arbiters.remove(&id); } }, } diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs index 7cd6619e..de777628 100644 --- a/actix-rt/tests/tests.rs +++ b/actix-rt/tests/tests.rs @@ -4,7 +4,7 @@ use std::{ time::{Duration, Instant}, }; -use actix_rt::{System, Worker}; +use actix_rt::{Arbiter, System}; use tokio::sync::oneshot; #[test] @@ -21,84 +21,77 @@ fn await_for_timer() { } #[test] -fn join_another_worker() { +fn join_another_arbiter() { let time = Duration::from_secs(1); let instant = Instant::now(); System::new().block_on(async move { - let worker = Worker::new(); - worker.spawn(Box::pin(async move { + let arbiter = Arbiter::new(); + arbiter.spawn(Box::pin(async move { tokio::time::sleep(time).await; - Worker::handle().stop(); + Arbiter::current().stop(); })); - worker.join().unwrap(); + arbiter.join().unwrap(); }); assert!( instant.elapsed() >= time, - "Join on another worker should complete only when it calls stop" + "Join on another arbiter should complete only when it calls stop" ); let instant = Instant::now(); System::new().block_on(async move { - let worker = Worker::new(); - worker.spawn_fn(move || { + let arbiter = Arbiter::new(); + arbiter.spawn_fn(move || { actix_rt::spawn(async move { tokio::time::sleep(time).await; - Worker::handle().stop(); + Arbiter::current().stop(); }); }); - worker.join().unwrap(); + arbiter.join().unwrap(); }); assert!( instant.elapsed() >= time, - "Join on a worker that has used actix_rt::spawn should wait for said future" + "Join on an arbiter that has used actix_rt::spawn should wait for said future" ); let instant = Instant::now(); System::new().block_on(async move { - let worker = Worker::new(); - worker.spawn(Box::pin(async move { + let arbiter = Arbiter::new(); + arbiter.spawn(Box::pin(async move { tokio::time::sleep(time).await; - Worker::handle().stop(); + Arbiter::current().stop(); })); - worker.stop(); - worker.join().unwrap(); + arbiter.stop(); + arbiter.join().unwrap(); }); assert!( instant.elapsed() < time, - "Premature stop of worker should conclude regardless of it's current state" + "Premature stop of arbiter should conclude regardless of it's current state" ); } #[test] fn non_static_block_on() { let string = String::from("test_str"); - let str = string.as_str(); + let string = string.as_str(); let sys = System::new(); sys.block_on(async { actix_rt::time::sleep(Duration::from_millis(1)).await; - assert_eq!("test_str", str); + assert_eq!("test_str", string); }); - let rt = actix_rt::Runtime::new().unwrap(); + let rt = actix_rt::ActixRuntime::new().unwrap(); rt.block_on(async { actix_rt::time::sleep(Duration::from_millis(1)).await; - assert_eq!("test_str", str); + assert_eq!("test_str", string); }); - - System::with_init(async { - assert_eq!("test_str", str); - System::current().stop(); - }) - .run() - .unwrap(); } #[test] fn wait_for_spawns() { - let rt = actix_rt::Runtime::new().unwrap(); + let rt = actix_rt::ActixRuntime::new().unwrap(); let handle = rt.spawn(async { println!("running on the runtime"); @@ -110,70 +103,71 @@ fn wait_for_spawns() { } #[test] -fn worker_spawn_fn_runs() { +fn arbiter_spawn_fn_runs() { let _ = System::new(); let (tx, rx) = channel::(); - let worker = Worker::new(); - worker.spawn_fn(move || tx.send(42).unwrap()); + let arbiter = Arbiter::new(); + arbiter.spawn_fn(move || tx.send(42).unwrap()); let num = rx.recv().unwrap(); assert_eq!(num, 42); - worker.stop(); - worker.join().unwrap(); + arbiter.stop(); + arbiter.join().unwrap(); } #[test] -fn worker_drop_no_panic_fn() { +fn arbiter_drop_no_panic_fn() { let _ = System::new(); - let worker = Worker::new(); - worker.spawn_fn(|| panic!("test")); + let arbiter = Arbiter::new(); + arbiter.spawn_fn(|| panic!("test")); - worker.stop(); - worker.join().unwrap(); + arbiter.stop(); + arbiter.join().unwrap(); } #[test] -fn worker_drop_no_panic_fut() { +fn arbiter_drop_no_panic_fut() { let _ = System::new(); - let worker = Worker::new(); - worker.spawn(async { panic!("test") }); + let arbiter = Arbiter::new(); + arbiter.spawn(async { panic!("test") }); - worker.stop(); - worker.join().unwrap(); + arbiter.stop(); + arbiter.join().unwrap(); } #[test] -fn worker_item_storage() { +#[allow(deprecated)] +fn arbiter_item_storage() { let _ = System::new(); - let worker = Worker::new(); + let arbiter = Arbiter::new(); - assert!(!Worker::contains_item::()); - Worker::set_item(42u32); - assert!(Worker::contains_item::()); + assert!(!Arbiter::contains_item::()); + Arbiter::set_item(42u32); + assert!(Arbiter::contains_item::()); - Worker::get_item(|&item: &u32| assert_eq!(item, 42)); - Worker::get_mut_item(|&mut item: &mut u32| assert_eq!(item, 42)); + Arbiter::get_item(|&item: &u32| assert_eq!(item, 42)); + Arbiter::get_mut_item(|&mut item: &mut u32| assert_eq!(item, 42)); let thread = thread::spawn(move || { - Worker::get_item(|&_item: &u32| unreachable!("u32 not in this thread")); + Arbiter::get_item(|&_item: &u32| unreachable!("u32 not in this thread")); }) .join(); assert!(thread.is_err()); let thread = thread::spawn(move || { - Worker::get_mut_item(|&mut _item: &mut i8| unreachable!("i8 not in this thread")); + Arbiter::get_mut_item(|&mut _item: &mut i8| unreachable!("i8 not in this thread")); }) .join(); assert!(thread.is_err()); - worker.stop(); - worker.join().unwrap(); + arbiter.stop(); + arbiter.join().unwrap(); } #[test] @@ -184,31 +178,31 @@ fn no_system_current_panic() { #[test] #[should_panic] -fn no_system_worker_new_panic() { - Worker::new(); +fn no_system_arbiter_new_panic() { + Arbiter::new(); } #[test] -fn system_worker_spawn() { +fn system_arbiter_spawn() { let runner = System::new(); let (tx, rx) = oneshot::channel(); let sys = System::current(); thread::spawn(|| { - // this thread will have no worker in it's thread local so call will panic - Worker::handle(); + // this thread will have no arbiter in it's thread local so call will panic + Arbiter::current(); }) .join() .unwrap_err(); let thread = thread::spawn(|| { - // this thread will have no worker in it's thread local so use the system handle instead + // this thread will have no arbiter in it's thread local so use the system handle instead System::set_current(sys); let sys = System::current(); - let wrk = sys.worker(); - wrk.spawn(async move { + let arb = sys.arbiter(); + arb.spawn(async move { tx.send(42u32).unwrap(); System::current().stop(); }); @@ -217,3 +211,73 @@ fn system_worker_spawn() { assert_eq!(runner.block_on(rx).unwrap(), 42); thread.join().unwrap(); } + +#[test] +fn system_stop_stops_arbiters() { + let sys = System::new(); + let arb = Arbiter::new(); + + // arbiter should be alive to receive spawn msg + assert!(Arbiter::current().spawn_fn(|| {})); + assert!(arb.spawn_fn(|| {})); + + System::current().stop(); + sys.run().unwrap(); + + // arbiter should be dead and return false + assert!(!Arbiter::current().spawn_fn(|| {})); + assert!(!arb.spawn_fn(|| {})); + + arb.join().unwrap(); +} + +#[test] +fn new_system_with_tokio() { + let res = System::with_tokio_rt(move || { + tokio::runtime::Builder::new_multi_thread() + .enable_io() + .enable_time() + .thread_keep_alive(std::time::Duration::from_millis(1000)) + .worker_threads(2) + .max_blocking_threads(2) + .on_thread_start(|| {}) + .on_thread_stop(|| {}) + .build() + .unwrap() + }) + .block_on(async { + actix_rt::time::sleep(std::time::Duration::from_millis(1)).await; + 123usize + }); + + assert_eq!(res, 123); +} + +#[test] +fn new_arbiter_with_tokio() { + let _ = System::new(); + + let arb = Arbiter::with_tokio_rt(|| { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + }); + + let counter = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(true)); + + let counter1 = counter.clone(); + let did_spawn = arb.spawn(async move { + actix_rt::time::sleep(std::time::Duration::from_millis(1)).await; + counter1.store(false, std::sync::atomic::Ordering::SeqCst); + Arbiter::current().stop(); + }); + + assert!(did_spawn); + + arb.join().unwrap(); + + std::thread::sleep(std::time::Duration::from_millis(100)); + + assert_eq!(false, counter.load(std::sync::atomic::Ordering::SeqCst)); +} diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 4665612b..387e11b4 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -180,7 +180,7 @@ impl Worker { state: WorkerState::Unavailable, }); - actix_rt::Runtime::new().unwrap().block_on(async move { + actix_rt::ActixRuntime::new().unwrap().block_on(async move { let fut = wrk .factories .iter()