From ce627bf93286782951617a79315efd3b53723612 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Fri, 29 Jan 2021 20:33:04 +0000 Subject: [PATCH] remove builder and introduce worker handle --- actix-macros/src/lib.rs | 11 +- actix-rt/src/builder.rs | 114 ------------- actix-rt/src/lib.rs | 8 +- actix-rt/src/runtime.rs | 7 +- actix-rt/src/system.rs | 240 +++++++++++++++++---------- actix-rt/src/worker.rs | 278 +++++++++++++++++++------------- actix-rt/tests/tests.rs | 94 +++++++---- actix-server/src/test_server.rs | 4 +- actix-server/src/worker.rs | 6 +- 9 files changed, 392 insertions(+), 370 deletions(-) delete mode 100644 actix-rt/src/builder.rs diff --git a/actix-macros/src/lib.rs b/actix-macros/src/lib.rs index 60f177fa..54b89565 100644 --- a/actix-macros/src/lib.rs +++ b/actix-macros/src/lib.rs @@ -7,7 +7,7 @@ use proc_macro::TokenStream; use quote::quote; -/// Marks async function to be executed by actix system. +/// Marks async function to be executed by Actix system. /// /// ## Usage /// @@ -26,7 +26,6 @@ pub fn main(_: TokenStream, item: TokenStream) -> TokenStream { let vis = &input.vis; let sig = &mut input.sig; let body = &input.block; - let name = &sig.ident; if sig.asyncness.is_none() { return syn::Error::new_spanned(sig.fn_token, "only async fn is supported") @@ -39,14 +38,14 @@ pub fn main(_: TokenStream, item: TokenStream) -> TokenStream { (quote! { #(#attrs)* #vis #sig { - actix_rt::System::new(stringify!(#name)) + actix_rt::System::new() .block_on(async move { #body }) } }) .into() } -/// Marks async test function to be executed by actix runtime. +/// Marks async test function to be executed by Actix system. /// /// ## Usage /// @@ -86,7 +85,7 @@ pub fn test(_: TokenStream, item: TokenStream) -> TokenStream { quote! { #(#attrs)* #vis #sig { - actix_rt::System::new("test") + actix_rt::System::new() .block_on(async { #body }) } } @@ -95,7 +94,7 @@ pub fn test(_: TokenStream, item: TokenStream) -> TokenStream { #[test] #(#attrs)* #vis #sig { - actix_rt::System::new("test") + actix_rt::System::new() .block_on(async { #body }) } } diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs deleted file mode 100644 index f9a3fca2..00000000 --- a/actix-rt/src/builder.rs +++ /dev/null @@ -1,114 +0,0 @@ -use std::{borrow::Cow, future::Future, io}; - -use tokio::sync::{ - mpsc::unbounded_channel, - oneshot::{channel, Receiver}, -}; - -use crate::{ - runtime::Runtime, - system::{System, SystemWorker}, - worker::Worker, -}; - -/// System builder. -/// -/// Either use `Builder::build` to create a system and start actors. Alternatively, use -/// `Builder::run` to start the Tokio runtime and run a function in its context. -pub struct Builder { - /// Name of the System. Defaults to "actix-rt" if unset. - name: Cow<'static, str>, -} - -impl Builder { - pub(crate) fn new() -> Self { - Builder { - name: Cow::Borrowed("actix-rt"), - } - } - - /// Sets the name of the System. - pub fn name(mut self, name: impl Into>) -> Self { - self.name = name.into(); - self - } - - /// Create new System. - /// - /// This method panics if it can not create Tokio runtime - pub fn build(self) -> SystemRunner { - self.create_runtime(|| {}) - } - - /// This function will start Tokio runtime and will finish once the `System::stop()` message - /// is called. Function `f` is called within Tokio runtime context. - pub fn run(self, init_fn: F) -> io::Result<()> - where - F: FnOnce(), - { - self.create_runtime(init_fn).run() - } - - fn create_runtime(self, init_fn: F) -> SystemRunner - where - F: FnOnce(), - { - let (stop_tx, stop_rx) = channel(); - let (sys_sender, sys_receiver) = unbounded_channel(); - - let rt = Runtime::new().unwrap(); - - let system = System::construct(sys_sender, Worker::new_system(rt.local())); - - // init system worker - let sys_worker = SystemWorker::new(sys_receiver, stop_tx); - rt.spawn(sys_worker); - - // run system init method - rt.block_on(async { init_fn() }); - - SystemRunner { - rt, - stop_rx, - system, - } - } -} - -/// System runner object that keeps event loop alive and running until stop message is received. -#[must_use = "A SystemRunner does nothing unless `run` is called."] -#[derive(Debug)] -pub struct SystemRunner { - rt: Runtime, - stop_rx: Receiver, - system: System, -} - -impl SystemRunner { - /// Starts event loop and will finish once [`System::stop()`] is called. - pub fn run(self) -> io::Result<()> { - let SystemRunner { rt, stop_rx, .. } = self; - - // run loop - match rt.block_on(stop_rx) { - Ok(code) => { - if code != 0 { - Err(io::Error::new( - io::ErrorKind::Other, - format!("Non-zero exit code: {}", code), - )) - } else { - Ok(()) - } - } - - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), - } - } - - /// Runs the provided future, blocking the current thread until the future completes. - #[inline] - pub fn block_on(&self, fut: F) -> F::Output { - self.rt.block_on(fut) - } -} diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index b0303d6c..40d31f3c 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -15,15 +15,13 @@ use tokio::task::JoinHandle; #[cfg(all(feature = "macros", not(test)))] pub use actix_macros::{main, test}; -mod builder; mod runtime; mod system; mod worker; -pub use self::builder::{Builder, SystemRunner}; pub use self::runtime::Runtime; -pub use self::system::System; -pub use self::worker::Worker; +pub use self::system::{System, SystemRunner}; +pub use self::worker::{Worker, WorkerHandle}; pub mod signal { //! Asynchronous signal handling (Tokio re-exports). @@ -61,7 +59,7 @@ pub mod task { pub use tokio::task::{spawn_blocking, yield_now, JoinHandle}; } -/// Spawns a future on the current [Worker]. +/// Spawns a future on the current [worker](Worker). /// /// # Panics /// Panics if Actix system is not running. diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index c7f611ed..a20dfe7e 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -2,9 +2,10 @@ use std::{future::Future, io}; use tokio::task::{JoinHandle, LocalSet}; -/// Single-threaded runtime provides a way to start reactor and runtime on the current thread. +/// A single-threaded runtime based on Tokio's "current thread" runtime. /// -/// See [crate root][crate] documentation for more details. +/// All spawned futures will be executed on the current thread. Therefore, there is no `Send` bound +/// on submitted futures. #[derive(Debug)] pub struct Runtime { local: LocalSet, @@ -27,7 +28,7 @@ impl Runtime { } /// Reference to local task set. - pub(crate) fn local(&self) -> &LocalSet { + pub(crate) fn local_set(&self) -> &LocalSet { &self.local } diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 0182136e..8a4cb365 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -1,5 +1,4 @@ use std::{ - borrow::Cow, cell::RefCell, collections::HashMap, future::Future, @@ -12,55 +11,92 @@ use std::{ use futures_core::ready; use tokio::sync::{mpsc, oneshot}; -use crate::{ - builder::{Builder, SystemRunner}, - worker::Worker, -}; +use crate::{worker::WorkerHandle, Runtime, Worker}; static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); -/// System is a runtime manager. -#[derive(Clone, Debug)] -pub struct System { - id: usize, - sys_tx: mpsc::UnboundedSender, - // TODO: which worker is this exactly - worker: Worker, -} - thread_local!( static CURRENT: RefCell> = RefCell::new(None); ); +/// A manager for a per-thread distributed async runtime. +#[derive(Clone, Debug)] +pub struct System { + id: usize, + sys_tx: mpsc::UnboundedSender, + + /// First worker that is created as part of the System. + worker_handle: WorkerHandle, +} + +impl Drop for System { + fn drop(&mut self) { + eprintln!("dropping System") + } +} + impl System { - /// Constructs new system and sets it as current. - pub(crate) fn construct( - sys_tx: mpsc::UnboundedSender, - worker: Worker, - ) -> Self { - let sys = System { - sys_tx, - worker, - id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst), - }; - System::set_current(sys.clone()); - sys - } - - /// Build a new system with a customized Tokio runtime. - /// - /// This allows to customize the runtime. See [`Builder`] for more information. - pub fn builder() -> Builder { - Builder::new() - } - - /// Create new system. + /// Create a new system. /// /// # Panics /// Panics if underlying Tokio runtime can not be created. #[allow(clippy::new_ret_no_self)] - pub fn new(name: impl Into>) -> SystemRunner { - Self::builder().name(name).build() + pub fn new() -> SystemRunner { + eprintln!("System::new"); + Self::create_runtime(async {}) + } + + /// Create a new system with given initialization future. + /// + /// The initialization future be run to completion (blocking current thread) before the system + /// runner is returned. + /// + /// # Panics + /// Panics if underlying Tokio runtime can not be created. + pub fn with_init(init_fut: impl Future) -> SystemRunner { + Self::create_runtime(init_fut) + } + + /// Constructs new system and registers it on the current thread. + pub(crate) fn construct( + sys_tx: mpsc::UnboundedSender, + worker: WorkerHandle, + ) -> Self { + let sys = System { + sys_tx, + worker_handle: worker, + id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst), + }; + + System::set_current(sys.clone()); + + sys + } + + fn create_runtime(init_fut: impl Future) -> SystemRunner { + let (stop_tx, stop_rx) = oneshot::channel(); + let (sys_tx, sys_rx) = mpsc::unbounded_channel(); + + eprintln!("creating runtime"); + let rt = Runtime::new().expect("Actix (Tokio) runtime could not be created."); + eprintln!("creating worker and system"); + let system = System::construct(sys_tx, Worker::new_current_thread(rt.local_set())); + + eprintln!("creating system controller"); + // init background system worker + let sys_worker = SystemController::new(sys_rx, stop_tx); + rt.spawn(sys_worker); + + eprintln!("running init future"); + // run system init future + rt.block_on(init_fut); + + eprintln!("done; here's your system runner"); + SystemRunner { + rt, + stop_rx, + system, + } } /// Get current running system. @@ -68,37 +104,34 @@ impl System { /// # Panics /// Panics if no system is registered on the current thread. pub fn current() -> System { + eprintln!("gib current system"); CURRENT.with(|cell| match *cell.borrow() { Some(ref sys) => sys.clone(), None => panic!("System is not running"), }) } - /// Check if current system has started. - pub fn is_set() -> bool { - CURRENT.with(|cell| cell.borrow().is_some()) + /// Get handle to a the System's initial [Worker]. + pub fn worker(&self) -> &WorkerHandle { + &self.worker_handle } - /// Set current running system. + /// Check if there is a System registered on the current thread. + pub fn is_registered() -> bool { + CURRENT.with(|sys| sys.borrow().is_some()) + } + + /// Register given system on current thread. #[doc(hidden)] pub fn set_current(sys: System) { - CURRENT.with(|s| { - *s.borrow_mut() = Some(sys); + CURRENT.with(|cell| { + *cell.borrow_mut() = Some(sys); }) } - /// Execute function with system reference. - pub fn with_current(f: F) -> R - where - F: FnOnce(&System) -> R, - { - CURRENT.with(|cell| match *cell.borrow() { - Some(ref sys) => f(sys), - None => panic!("System is not running"), - }) - } - - /// Numeric system ID. + /// Numeric system identifier. + /// + /// Useful when using multiple Systems. pub fn id(&self) -> usize { self.id } @@ -108,7 +141,7 @@ impl System { self.stop_with_code(0) } - /// Stop the system with a particular exit code. + /// Stop the system with a given exit code. pub fn stop_with_code(&self, code: i32) { let _ = self.sys_tx.send(SystemCommand::Exit(code)); } @@ -116,57 +149,89 @@ impl System { pub(crate) fn tx(&self) -> &mpsc::UnboundedSender { &self.sys_tx } +} - // TODO: give clarity on which worker this is; previous documented as returning "system worker" - /// Get shared reference to a worker. - pub fn worker(&self) -> &Worker { - &self.worker +/// Runner that keeps a [System]'s event loop alive until stop message is received. +#[must_use = "A SystemRunner does nothing unless `run` is called."] +#[derive(Debug)] +pub struct SystemRunner { + rt: Runtime, + stop_rx: oneshot::Receiver, + system: System, +} + +impl SystemRunner { + /// Starts event loop and will return once [System] is [stopped](System::stop). + pub fn run(self) -> io::Result<()> { + eprintln!("SystemRunner: run"); + + let SystemRunner { rt, stop_rx, .. } = self; + + // run loop + match rt.block_on(stop_rx) { + Ok(code) => { + if code != 0 { + Err(io::Error::new( + io::ErrorKind::Other, + format!("Non-zero exit code: {}", code), + )) + } else { + Ok(()) + } + } + + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), + } } - /// This function will start Tokio runtime and will finish once the `System::stop()` message - /// is called. Function `f` is called within Tokio runtime context. - pub fn run(f: F) -> io::Result<()> - where - F: FnOnce(), - { - Self::builder().run(f) + /// Runs the provided future, blocking the current thread until the future completes. + #[inline] + pub fn block_on(&self, fut: F) -> F::Output { + self.rt.block_on(fut) } } #[derive(Debug)] pub(crate) enum SystemCommand { Exit(i32), - RegisterArbiter(usize, Worker), - DeregisterArbiter(usize), + RegisterWorker(usize, WorkerHandle), + DeregisterWorker(usize), } +/// There is one `SystemController` per [System]. It runs in the background, keeping track of +/// [Worker]s and is able to distribute a system-wide stop command. #[derive(Debug)] -pub(crate) struct SystemWorker { - stop: Option>, - commands: mpsc::UnboundedReceiver, - workers: HashMap, +pub(crate) struct SystemController { + stop_tx: Option>, + cmd_rx: mpsc::UnboundedReceiver, + workers: HashMap, } -impl SystemWorker { +impl SystemController { pub(crate) fn new( - commands: mpsc::UnboundedReceiver, - stop: oneshot::Sender, + cmd_rx: mpsc::UnboundedReceiver, + stop_tx: oneshot::Sender, ) -> Self { - SystemWorker { - commands, - stop: Some(stop), - workers: HashMap::new(), + SystemController { + cmd_rx, + stop_tx: Some(stop_tx), + workers: HashMap::with_capacity(4), } } } +impl Drop for SystemController { + fn drop(&mut self) { + eprintln!("dropping SystemController") + } +} -impl Future for SystemWorker { +impl Future for SystemController { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // process all items currently buffered in channel loop { - match ready!(Pin::new(&mut self.commands).poll_recv(cx)) { + match ready!(Pin::new(&mut self.cmd_rx).poll_recv(cx)) { // channel closed; no more messages can be received None => return Poll::Ready(()), @@ -179,16 +244,17 @@ impl Future for SystemWorker { } // stop event loop - if let Some(stop) = self.stop.take() { - let _ = stop.send(code); + // will only fire once + if let Some(stop_tx) = self.stop_tx.take() { + let _ = stop_tx.send(code); } } - SystemCommand::RegisterArbiter(name, hnd) => { + SystemCommand::RegisterWorker(name, hnd) => { self.workers.insert(name, hnd); } - SystemCommand::DeregisterArbiter(name) => { + SystemCommand::DeregisterWorker(name) => { self.workers.remove(&name); } }, diff --git a/actix-rt/src/worker.rs b/actix-rt/src/worker.rs index adda3cff..b1f4a713 100644 --- a/actix-rt/src/worker.rs +++ b/actix-rt/src/worker.rs @@ -21,7 +21,7 @@ use crate::{ pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0); thread_local!( - static ADDR: RefCell> = RefCell::new(None); + static HANDLE: RefCell> = RefCell::new(None); static STORAGE: RefCell>> = RefCell::new(HashMap::new()); ); @@ -33,107 +33,35 @@ pub(crate) enum WorkerCommand { impl fmt::Debug for WorkerCommand { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - WorkerCommand::Stop => write!(f, "ArbiterCommand::Stop"), - WorkerCommand::Execute(_) => write!(f, "ArbiterCommand::Execute"), + WorkerCommand::Stop => write!(f, "WorkerCommand::Stop"), + WorkerCommand::Execute(_) => write!(f, "WorkerCommand::Execute"), } } } -/// A worker represent a thread that provides an asynchronous execution environment for futures -/// and functions. -/// -/// When a Worker is created, it spawns a new [OS thread](thread), and hosts an event loop. -/// Some Arbiter functions execute on the current thread. -#[derive(Debug)] -pub struct Worker { +/// A handle for sending spawn and stop messages to a [Worker]. +#[derive(Debug, Clone)] +pub struct WorkerHandle { sender: mpsc::UnboundedSender, - thread_handle: Option>, } -impl Clone for Worker { - fn clone(&self) -> Self { - Self::new_handle(self.sender.clone()) +impl Drop for WorkerHandle { + fn drop(&mut self) { + eprintln!("dropping WorkerHandle") } } -impl Default for Worker { - fn default() -> Self { - Self::new() - } -} - -impl Worker { - /// Spawn new thread and run event loop in spawned thread. - /// - /// Returns handle of newly created worker. - /// - /// # Panics - /// Panics if a [System] not registered on the current thread. - pub fn new() -> Worker { - let id = COUNT.fetch_add(1, Ordering::Relaxed); - let name = format!("actix-rt:worker:{}", id); - let sys = System::current(); - let (tx, rx) = mpsc::unbounded_channel(); - - let handle = thread::Builder::new() - .name(name.clone()) - .spawn({ - let tx = tx.clone(); - move || { - let rt = Runtime::new().expect("Can not create Runtime"); - let arb = Worker::new_handle(tx); - - STORAGE.with(|cell| cell.borrow_mut().clear()); - - System::set_current(sys); - - ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); - - // register worker - let _ = System::current() - .tx() - .send(SystemCommand::RegisterArbiter(id, arb)); - - // run worker event processing loop - rt.block_on(WorkerRunner { rx }); - - // deregister worker - let _ = System::current() - .tx() - .send(SystemCommand::DeregisterArbiter(id)); - } - }) - .unwrap_or_else(|err| { - panic!("Cannot spawn a Worker's thread {:?}: {:?}", &name, err) - }); - - Worker { - sender: tx, - thread_handle: Some(handle), - } +impl WorkerHandle { + pub(crate) fn new(sender: mpsc::UnboundedSender) -> Self { + eprintln!("WorkerHandle::new"); + Self { sender } } - /// Returns the current Worker's handle. - /// - /// # Panics - /// Panics if no Worker is running on the current thread. - pub fn current() -> Worker { - ADDR.with(|cell| match *cell.borrow() { - Some(ref addr) => addr.clone(), - None => panic!("Worker is not running."), - }) - } - - /// Stop worker from continuing it's event loop. - pub fn stop(&self) { - let _ = self.sender.send(WorkerCommand::Stop); - } - - /// Send a future to the Arbiter's thread and spawn it. + /// Send a future to the Worker's thread and spawn it. /// /// If you require a result, include a response channel in the future. /// - /// Returns true if future was sent successfully and false if the Arbiter has died. + /// Returns true if future was sent successfully and false if the Worker has died. pub fn spawn(&self, future: Fut) -> bool where Fut: Future + Send + 'static, @@ -143,12 +71,139 @@ impl Worker { .is_ok() } - /// Send a function to the Arbiter's thread and execute it. + /// Send a function to the Worker's thread and execute it. /// /// Any result from the function is discarded. If you require a result, include a response /// channel in the function. /// - /// Returns true if function was sent successfully and false if the Arbiter has died. + /// Returns true if function was sent successfully and false if the Worker has died. + pub fn spawn_fn(&self, f: F) -> bool + where + F: FnOnce() + Send + 'static, + { + self.spawn(async { f() }) + } + + /// Instruct worker to stop processing it's event loop. + /// + /// Returns true if stop message was sent successfully and false if the Worker has been dropped. + pub fn stop(&self) -> bool { + self.sender.send(WorkerCommand::Stop).is_ok() + } +} + +/// A worker represent a thread that provides an asynchronous execution environment for futures +/// and functions. +/// +/// When a Worker is created, it spawns a new [OS thread](thread), and hosts an event loop. +#[derive(Debug)] +pub struct Worker { + sender: mpsc::UnboundedSender, + thread_handle: thread::JoinHandle<()>, +} + +impl Worker { + /// Spawn new thread and run event loop in spawned thread. + /// + /// Returns handle of newly created worker. + /// + /// # Panics + /// Panics if a [System] is not registered on the current thread. + pub fn new() -> Worker { + let id = COUNT.fetch_add(1, Ordering::Relaxed); + let name = format!("actix-rt:worker:{}", id); + let sys = System::current(); + let (tx, rx) = mpsc::unbounded_channel(); + + let thread_handle = thread::Builder::new() + .name(name.clone()) + .spawn({ + let tx = tx.clone(); + move || { + let rt = Runtime::new().expect("Can not create Runtime"); + let hnd = WorkerHandle::new(tx); + + System::set_current(sys); + + STORAGE.with(|cell| cell.borrow_mut().clear()); + HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); + + // register worker + let _ = System::current() + .tx() + .send(SystemCommand::RegisterWorker(id, hnd)); + + // run worker event processing loop + rt.block_on(WorkerRunner { rx }); + + // deregister worker + let _ = System::current() + .tx() + .send(SystemCommand::DeregisterWorker(id)); + } + }) + .unwrap_or_else(|err| { + panic!("Cannot spawn a Worker's thread {:?}: {:?}", &name, err) + }); + + Worker { + sender: tx, + thread_handle, + } + } + + /// Sets up a worker runner on the current thread using the provided runtime local task set. + pub(crate) fn new_current_thread(local: &LocalSet) -> WorkerHandle { + let (tx, rx) = mpsc::unbounded_channel(); + + let hnd = WorkerHandle::new(tx); + + HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); + STORAGE.with(|cell| cell.borrow_mut().clear()); + + local.spawn_local(WorkerRunner { rx }); + + hnd + } + + /// Return a handle to the worker. + /// + /// # Panics + /// Panics if no Worker is running on the current thread. + pub fn handle() -> WorkerHandle { + HANDLE.with(|cell| match *cell.borrow() { + Some(ref addr) => addr.clone(), + None => panic!("Worker is not running."), + }) + } + + /// Stop worker from continuing it's event loop. + /// + /// Returns true if stop message was sent successfully and false if the Worker has been dropped. + pub fn stop(&self) -> bool { + self.sender.send(WorkerCommand::Stop).is_ok() + } + + /// Send a future to the Worker's thread and spawn it. + /// + /// If you require a result, include a response channel in the future. + /// + /// Returns true if future was sent successfully and false if the Worker has died. + pub fn spawn(&self, future: Fut) -> bool + where + Fut: Future + Send + 'static, + { + self.sender + .send(WorkerCommand::Execute(Box::pin(future))) + .is_ok() + } + + /// Send a function to the Worker's thread and execute it. + /// + /// Any result from the function is discarded. If you require a result, include a response + /// channel in the function. + /// + /// Returns true if function was sent successfully and false if the Worker has died. pub fn spawn_fn(&self, f: F) -> bool where F: FnOnce() + Send + 'static, @@ -158,32 +213,9 @@ impl Worker { /// Wait for worker's event loop to complete. /// - /// Joins the underlying OS thread handle, if contained. - pub fn join(&mut self) -> thread::Result<()> { - if let Some(thread_handle) = self.thread_handle.take() { - thread_handle.join() - } else { - Ok(()) - } - } - - pub(crate) fn new_system(local: &LocalSet) -> Self { - let (tx, rx) = mpsc::unbounded_channel(); - - let arb = Worker::new_handle(tx); - ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); - STORAGE.with(|cell| cell.borrow_mut().clear()); - - local.spawn_local(WorkerRunner { rx }); - - arb - } - - fn new_handle(sender: mpsc::UnboundedSender) -> Self { - Self { - sender, - thread_handle: None, - } + /// Joins the underlying OS thread handle. See [`JoinHandle::join`](thread::JoinHandle::join). + pub fn join(self) -> thread::Result<()> { + self.thread_handle.join() } /// Insert item into worker's thread-local storage. @@ -240,20 +272,34 @@ struct WorkerRunner { rx: mpsc::UnboundedReceiver, } +impl Drop for WorkerRunner { + fn drop(&mut self) { + eprintln!("dropping WorkerRunner") + } +} + impl Future for WorkerRunner { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + eprintln!("WorkerRunner: poll"); + // process all items currently buffered in channel loop { + eprintln!("WorkerRunner: loop"); + match ready!(Pin::new(&mut self.rx).poll_recv(cx)) { // channel closed; no more messages can be received None => return Poll::Ready(()), // process worker command Some(item) => match item { - WorkerCommand::Stop => return Poll::Ready(()), + WorkerCommand::Stop => { + eprintln!("WorkerRunner: stopping"); + return Poll::Ready(()); + } WorkerCommand::Execute(task_fut) => { + eprintln!("WorkerRunner: executing task"); tokio::task::spawn_local(task_fut); } }, diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs index ec71656c..76b25715 100644 --- a/actix-rt/tests/tests.rs +++ b/actix-rt/tests/tests.rs @@ -1,16 +1,17 @@ use std::{ - sync::mpsc::sync_channel, + sync::mpsc::channel, thread, time::{Duration, Instant}, }; use actix_rt::{System, Worker}; +use tokio::sync::oneshot; #[test] fn await_for_timer() { let time = Duration::from_secs(1); let instant = Instant::now(); - System::new("test_wait_timer").block_on(async move { + System::new().block_on(async move { tokio::time::sleep(time).await; }); assert!( @@ -23,11 +24,11 @@ fn await_for_timer() { fn join_another_worker() { let time = Duration::from_secs(1); let instant = Instant::now(); - System::new("test_join_another_worker").block_on(async move { - let mut worker = Worker::new(); + System::new().block_on(async move { + let worker = Worker::new(); worker.spawn(Box::pin(async move { tokio::time::sleep(time).await; - Worker::current().stop(); + Worker::handle().stop(); })); worker.join().unwrap(); }); @@ -37,12 +38,12 @@ fn join_another_worker() { ); let instant = Instant::now(); - System::new("test_join_another_worker").block_on(async move { - let mut worker = Worker::new(); + System::new().block_on(async move { + let worker = Worker::new(); worker.spawn_fn(move || { actix_rt::spawn(async move { tokio::time::sleep(time).await; - Worker::current().stop(); + Worker::handle().stop(); }); }); worker.join().unwrap(); @@ -53,11 +54,11 @@ fn join_another_worker() { ); let instant = Instant::now(); - System::new("test_join_another_worker").block_on(async move { - let mut worker = Worker::new(); + System::new().block_on(async move { + let worker = Worker::new(); worker.spawn(Box::pin(async move { tokio::time::sleep(time).await; - Worker::current().stop(); + Worker::handle().stop(); })); worker.stop(); worker.join().unwrap(); @@ -73,7 +74,7 @@ fn non_static_block_on() { let string = String::from("test_str"); let str = string.as_str(); - let sys = System::new("borrow some"); + let sys = System::new(); sys.block_on(async { actix_rt::time::sleep(Duration::from_millis(1)).await; @@ -87,10 +88,11 @@ fn non_static_block_on() { assert_eq!("test_str", str); }); - System::run(|| { + System::with_init(async { assert_eq!("test_str", str); System::current().stop(); }) + .run() .unwrap(); } @@ -109,11 +111,11 @@ fn wait_for_spawns() { #[test] fn worker_spawn_fn_runs() { - let _ = System::new("test-system"); + let _ = System::new(); - let (tx, rx) = sync_channel::(1); + let (tx, rx) = channel::(); - let mut worker = Worker::new(); + let worker = Worker::new(); worker.spawn_fn(move || tx.send(42).unwrap()); let num = rx.recv().unwrap(); @@ -125,9 +127,9 @@ fn worker_spawn_fn_runs() { #[test] fn worker_drop_no_panic_fn() { - let _ = System::new("test-system"); + let _ = System::new(); - let mut worker = Worker::new(); + let worker = Worker::new(); worker.spawn_fn(|| panic!("test")); worker.stop(); @@ -136,9 +138,9 @@ fn worker_drop_no_panic_fn() { #[test] fn worker_drop_no_panic_fut() { - let _ = System::new("test-system"); + let _ = System::new(); - let mut worker = Worker::new(); + let worker = Worker::new(); worker.spawn(async { panic!("test") }); worker.stop(); @@ -147,9 +149,9 @@ fn worker_drop_no_panic_fut() { #[test] fn worker_item_storage() { - let _ = System::new("test-system"); + let _ = System::new(); - let mut worker = Worker::new(); + let worker = Worker::new(); assert!(!Worker::contains_item::()); Worker::set_item(42u32); @@ -174,18 +176,6 @@ fn worker_item_storage() { worker.join().unwrap(); } -#[test] -fn system_name_cow_str() { - let _ = System::new("test-system"); - System::current().stop(); -} - -#[test] -fn system_name_cow_string() { - let _ = System::new("test-system".to_owned()); - System::current().stop(); -} - #[test] #[should_panic] fn no_system_current_panic() { @@ -197,3 +187,39 @@ fn no_system_current_panic() { fn no_system_worker_new_panic() { Worker::new(); } + +#[test] +fn system_worker_spawn() { + let runner = System::new(); + + eprintln!("making channel"); + let (tx, rx) = oneshot::channel(); + + eprintln!("getting initial system worker"); + let sys = System::current(); + + thread::spawn(|| { + // this thread will have no worker in it's thread local so call will panic + Worker::handle(); + }) + .join() + .unwrap_err(); + + let thread = thread::spawn(|| { + // this thread will have no worker in it's thread local so use the system handle instead + System::set_current(sys); + let sys = System::current(); + + let wrk = sys.worker(); + wrk.spawn(async move { + eprintln!("before send"); + tx.send(42u32).unwrap(); + + eprintln!("after send"); + System::current().stop(); + }); + }); + + assert_eq!(runner.block_on(rx).unwrap(), 42); + thread.join().unwrap(); +} diff --git a/actix-server/src/test_server.rs b/actix-server/src/test_server.rs index 4b7f7873..864f391c 100644 --- a/actix-server/src/test_server.rs +++ b/actix-server/src/test_server.rs @@ -48,7 +48,7 @@ impl TestServer { // run server in separate thread thread::spawn(move || { - let sys = System::new("actix-test-server"); + let sys = System::new(); factory(Server::build()).workers(1).disable_signals().run(); tx.send(System::current()).unwrap(); @@ -70,7 +70,7 @@ impl TestServer { // run server in separate thread thread::spawn(move || { - let sys = System::new("actix-test-server"); + let sys = System::new(); let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let local_addr = tcp.local_addr().unwrap(); diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index ae387cbd..6f15d044 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -215,7 +215,7 @@ impl ServerWorker { } Err(e) => { error!("Can not start worker: {:?}", e); - Arbiter::current().stop(); + Arbiter::handle().stop(); } } wrk.await @@ -386,7 +386,7 @@ impl Future for ServerWorker { let num = num_connections(); if num == 0 { let _ = tx.take().unwrap().send(true); - Arbiter::current().stop(); + Arbiter::handle().stop(); return Poll::Ready(()); } @@ -394,7 +394,7 @@ impl Future for ServerWorker { if Pin::new(t2).poll(cx).is_ready() { let _ = tx.take().unwrap().send(false); self.shutdown(true); - Arbiter::current().stop(); + Arbiter::handle().stop(); return Poll::Ready(()); }