From 38bc67a61cde2be649216b19f5f93f6de2438de6 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Fri, 29 Jan 2021 15:01:42 +0000 Subject: [PATCH] fix spawn call in server --- actix-rt/src/builder.rs | 24 ++++++++++++++---------- actix-rt/src/system.rs | 10 ++++++++++ actix-rt/src/worker.rs | 3 +++ actix-rt/tests/tests.rs | 12 ++++++++++++ actix-server/src/accept.rs | 3 +-- 5 files changed, 40 insertions(+), 12 deletions(-) diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index 93d2fe28..f9a3fca2 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -53,7 +53,7 @@ impl Builder { where F: FnOnce(), { - let (stop_tx, stop) = channel(); + let (stop_tx, stop_rx) = channel(); let (sys_sender, sys_receiver) = unbounded_channel(); let rt = Runtime::new().unwrap(); @@ -67,27 +67,30 @@ impl Builder { // run system init method rt.block_on(async { init_fn() }); - SystemRunner { rt, stop, system } + SystemRunner { + rt, + stop_rx, + system, + } } } -/// Helper object that runs System's event loop -#[must_use = "SystemRunner must be run"] +/// System runner object that keeps event loop alive and running until stop message is received. +#[must_use = "A SystemRunner does nothing unless `run` is called."] #[derive(Debug)] pub struct SystemRunner { rt: Runtime, - stop: Receiver, + stop_rx: Receiver, system: System, } impl SystemRunner { - /// This function will start event loop and will finish once the - /// `System::stop()` function is called. + /// Starts event loop and will finish once [`System::stop()`] is called. pub fn run(self) -> io::Result<()> { - let SystemRunner { rt, stop, .. } = self; + let SystemRunner { rt, stop_rx, .. } = self; // run loop - match rt.block_on(stop) { + match rt.block_on(stop_rx) { Ok(code) => { if code != 0 { Err(io::Error::new( @@ -98,11 +101,12 @@ impl SystemRunner { Ok(()) } } + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), } } - /// Execute a future and wait for result. + /// Runs the provided future, blocking the current thread until the future completes. #[inline] pub fn block_on(&self, fut: F) -> F::Output { self.rt.block_on(fut) diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 86774a71..0182136e 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -24,6 +24,7 @@ static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); pub struct System { id: usize, sys_tx: mpsc::UnboundedSender, + // TODO: which worker is this exactly worker: Worker, } @@ -63,6 +64,9 @@ impl System { } /// Get current running system. + /// + /// # Panics + /// Panics if no system is registered on the current thread. pub fn current() -> System { CURRENT.with(|cell| match *cell.borrow() { Some(ref sys) => sys.clone(), @@ -113,6 +117,12 @@ impl System { &self.sys_tx } + // TODO: give clarity on which worker this is; previous documented as returning "system worker" + /// Get shared reference to a worker. + pub fn worker(&self) -> &Worker { + &self.worker + } + /// This function will start Tokio runtime and will finish once the `System::stop()` message /// is called. Function `f` is called within Tokio runtime context. pub fn run(f: F) -> io::Result<()> diff --git a/actix-rt/src/worker.rs b/actix-rt/src/worker.rs index d8538639..adda3cff 100644 --- a/actix-rt/src/worker.rs +++ b/actix-rt/src/worker.rs @@ -66,6 +66,9 @@ impl Worker { /// Spawn new thread and run event loop in spawned thread. /// /// Returns handle of newly created worker. + /// + /// # Panics + /// Panics if a [System] not registered on the current thread. pub fn new() -> Worker { let id = COUNT.fetch_add(1, Ordering::Relaxed); let name = format!("actix-rt:worker:{}", id); diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs index f100c7ce..ec71656c 100644 --- a/actix-rt/tests/tests.rs +++ b/actix-rt/tests/tests.rs @@ -185,3 +185,15 @@ fn system_name_cow_string() { let _ = System::new("test-system".to_owned()); System::current().stop(); } + +#[test] +#[should_panic] +fn no_system_current_panic() { + System::current(); +} + +#[test] +#[should_panic] +fn no_system_worker_new_panic() { + Worker::new(); +} diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index fea8acbd..82c00ef5 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -2,7 +2,6 @@ use std::time::Duration; use std::{io, thread}; use actix_rt::{ - self as rt, time::{sleep_until, Instant}, System, }; @@ -404,7 +403,7 @@ impl Accept { // after the sleep a Timer interest is sent to Accept Poll let waker = self.waker.clone(); - rt::spawn(async move { + System::current().worker().spawn(async move { sleep_until(Instant::now() + Duration::from_millis(510)).await; waker.wake(WakerInterest::Timer); });