diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 3bc8a6e3..2fdab778 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -11,6 +11,7 @@ use std::{ use futures_core::ready; use tokio::sync::{mpsc, oneshot}; +use crate::task::JoinHandle; use crate::{arbiter::ArbiterHandle, runtime::default_tokio_runtime, Arbiter, Runtime}; static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); @@ -144,6 +145,27 @@ impl System { let _ = self.sys_tx.send(SystemCommand::Exit(code)); } + /// Send an async task to [System] and spawn running it. + /// Returned [JoinHandle](crate::task::JoinHandle) can be used to await for the task's output. + /// + /// # Error + /// [actix_rt::spawn](crate::spawn) can not be used inside `System::spawn`. + pub fn spawn(task: T) -> JoinHandle + where + T: Future + Send + 'static, + T::Output: Send + 'static, + { + tokio::spawn(async { + let (tx, rx) = oneshot::channel(); + System::current().arbiter().spawn(async move { + let res = tokio::spawn(task).await; + let _ = tx.send(res); + }); + // unwrap would be caught by tokio and output as JoinError + rx.await.unwrap().unwrap() + }) + } + pub(crate) fn tx(&self) -> &mpsc::UnboundedSender { &self.sys_tx } diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs index 86fba96d..e01d73c5 100644 --- a/actix-rt/tests/tests.rs +++ b/actix-rt/tests/tests.rs @@ -298,3 +298,48 @@ fn try_current_no_system() { fn try_current_with_system() { System::new().block_on(async { assert!(System::try_current().is_some()) }); } + +#[test] +fn system_spawn() { + System::new().block_on(async { + // Test detached with dropped handle + let num = Arc::new(AtomicBool::new(false)); + let num_clone = num.clone(); + System::spawn(async move { + actix_rt::time::sleep(Duration::from_millis(500)).await; + num_clone.store(true, Ordering::SeqCst); + }); + + let now = Instant::now(); + while !num.load(Ordering::SeqCst) { + actix_rt::time::sleep(Duration::from_millis(500)).await; + if now.elapsed() > Duration::from_secs(10) { + panic!("System::spawn deadlocked") + } + } + + // test join handle await + let num = Arc::new(AtomicBool::new(false)); + let num_clone = num.clone(); + System::spawn(async move { + actix_rt::task::yield_now().await; + num_clone.store(true, Ordering::SeqCst); + actix_rt::task::yield_now().await; + }) + .await + .unwrap(); + + assert!(num.load(Ordering::SeqCst)); + + // test extra actix_rt::spawn + let res = System::spawn(async move { + actix_rt::task::yield_now().await; + actix_rt::spawn(async { + actix_rt::task::yield_now().await; + }); + }) + .await; + + assert!(res.is_err()) + }); +}