fix spawn call in server

This commit is contained in:
Rob Ede 2021-01-29 15:01:42 +00:00
parent 04c6456398
commit 38bc67a61c
No known key found for this signature in database
GPG Key ID: C2A3B36E841A91E6
5 changed files with 40 additions and 12 deletions

View File

@ -53,7 +53,7 @@ impl Builder {
where where
F: FnOnce(), F: FnOnce(),
{ {
let (stop_tx, stop) = channel(); let (stop_tx, stop_rx) = channel();
let (sys_sender, sys_receiver) = unbounded_channel(); let (sys_sender, sys_receiver) = unbounded_channel();
let rt = Runtime::new().unwrap(); let rt = Runtime::new().unwrap();
@ -67,27 +67,30 @@ impl Builder {
// run system init method // run system init method
rt.block_on(async { init_fn() }); rt.block_on(async { init_fn() });
SystemRunner { rt, stop, system } SystemRunner {
rt,
stop_rx,
system,
}
} }
} }
/// Helper object that runs System's event loop /// System runner object that keeps event loop alive and running until stop message is received.
#[must_use = "SystemRunner must be run"] #[must_use = "A SystemRunner does nothing unless `run` is called."]
#[derive(Debug)] #[derive(Debug)]
pub struct SystemRunner { pub struct SystemRunner {
rt: Runtime, rt: Runtime,
stop: Receiver<i32>, stop_rx: Receiver<i32>,
system: System, system: System,
} }
impl SystemRunner { impl SystemRunner {
/// This function will start event loop and will finish once the /// Starts event loop and will finish once [`System::stop()`] is called.
/// `System::stop()` function is called.
pub fn run(self) -> io::Result<()> { pub fn run(self) -> io::Result<()> {
let SystemRunner { rt, stop, .. } = self; let SystemRunner { rt, stop_rx, .. } = self;
// run loop // run loop
match rt.block_on(stop) { match rt.block_on(stop_rx) {
Ok(code) => { Ok(code) => {
if code != 0 { if code != 0 {
Err(io::Error::new( Err(io::Error::new(
@ -98,11 +101,12 @@ impl SystemRunner {
Ok(()) Ok(())
} }
} }
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
} }
} }
/// Execute a future and wait for result. /// Runs the provided future, blocking the current thread until the future completes.
#[inline] #[inline]
pub fn block_on<F: Future>(&self, fut: F) -> F::Output { pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
self.rt.block_on(fut) self.rt.block_on(fut)

View File

@ -24,6 +24,7 @@ static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
pub struct System { pub struct System {
id: usize, id: usize,
sys_tx: mpsc::UnboundedSender<SystemCommand>, sys_tx: mpsc::UnboundedSender<SystemCommand>,
// TODO: which worker is this exactly
worker: Worker, worker: Worker,
} }
@ -63,6 +64,9 @@ impl System {
} }
/// Get current running system. /// Get current running system.
///
/// # Panics
/// Panics if no system is registered on the current thread.
pub fn current() -> System { pub fn current() -> System {
CURRENT.with(|cell| match *cell.borrow() { CURRENT.with(|cell| match *cell.borrow() {
Some(ref sys) => sys.clone(), Some(ref sys) => sys.clone(),
@ -113,6 +117,12 @@ impl System {
&self.sys_tx &self.sys_tx
} }
// TODO: give clarity on which worker this is; previous documented as returning "system worker"
/// Get shared reference to a worker.
pub fn worker(&self) -> &Worker {
&self.worker
}
/// This function will start Tokio runtime and will finish once the `System::stop()` message /// This function will start Tokio runtime and will finish once the `System::stop()` message
/// is called. Function `f` is called within Tokio runtime context. /// is called. Function `f` is called within Tokio runtime context.
pub fn run<F>(f: F) -> io::Result<()> pub fn run<F>(f: F) -> io::Result<()>

View File

@ -66,6 +66,9 @@ impl Worker {
/// Spawn new thread and run event loop in spawned thread. /// Spawn new thread and run event loop in spawned thread.
/// ///
/// Returns handle of newly created worker. /// Returns handle of newly created worker.
///
/// # Panics
/// Panics if a [System] not registered on the current thread.
pub fn new() -> Worker { pub fn new() -> Worker {
let id = COUNT.fetch_add(1, Ordering::Relaxed); let id = COUNT.fetch_add(1, Ordering::Relaxed);
let name = format!("actix-rt:worker:{}", id); let name = format!("actix-rt:worker:{}", id);

View File

@ -185,3 +185,15 @@ fn system_name_cow_string() {
let _ = System::new("test-system".to_owned()); let _ = System::new("test-system".to_owned());
System::current().stop(); System::current().stop();
} }
#[test]
#[should_panic]
fn no_system_current_panic() {
System::current();
}
#[test]
#[should_panic]
fn no_system_worker_new_panic() {
Worker::new();
}

View File

@ -2,7 +2,6 @@ use std::time::Duration;
use std::{io, thread}; use std::{io, thread};
use actix_rt::{ use actix_rt::{
self as rt,
time::{sleep_until, Instant}, time::{sleep_until, Instant},
System, System,
}; };
@ -404,7 +403,7 @@ impl Accept {
// after the sleep a Timer interest is sent to Accept Poll // after the sleep a Timer interest is sent to Accept Poll
let waker = self.waker.clone(); let waker = self.waker.clone();
rt::spawn(async move { System::current().worker().spawn(async move {
sleep_until(Instant::now() + Duration::from_millis(510)).await; sleep_until(Instant::now() + Duration::from_millis(510)).await;
waker.wake(WakerInterest::Timer); waker.wake(WakerInterest::Timer);
}); });