Add System::spawn

This commit is contained in:
fakeshadow 2021-04-17 12:30:14 +08:00
parent 978e4f25fb
commit 07dec1efa4
2 changed files with 67 additions and 0 deletions

View File

@ -11,6 +11,7 @@ use std::{
use futures_core::ready;
use tokio::sync::{mpsc, oneshot};
use crate::task::JoinHandle;
use crate::{arbiter::ArbiterHandle, runtime::default_tokio_runtime, Arbiter, Runtime};
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
@ -144,6 +145,27 @@ impl System {
let _ = self.sys_tx.send(SystemCommand::Exit(code));
}
/// Send an async task to [System] and spawn running it.
/// Returned [JoinHandle](crate::task::JoinHandle) can be used to await for the task's output.
///
/// # Error
/// [actix_rt::spawn](crate::spawn) can not be used inside `System::spawn`.
pub fn spawn<T>(task: T) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
tokio::spawn(async {
let (tx, rx) = oneshot::channel();
System::current().arbiter().spawn(async move {
let res = tokio::spawn(task).await;
let _ = tx.send(res);
});
// unwrap would be caught by tokio and output as JoinError
rx.await.unwrap().unwrap()
})
}
pub(crate) fn tx(&self) -> &mpsc::UnboundedSender<SystemCommand> {
&self.sys_tx
}

View File

@ -298,3 +298,48 @@ fn try_current_no_system() {
fn try_current_with_system() {
System::new().block_on(async { assert!(System::try_current().is_some()) });
}
#[test]
fn system_spawn() {
System::new().block_on(async {
// Test detached with dropped handle
let num = Arc::new(AtomicBool::new(false));
let num_clone = num.clone();
System::spawn(async move {
actix_rt::time::sleep(Duration::from_millis(500)).await;
num_clone.store(true, Ordering::SeqCst);
});
let now = Instant::now();
while !num.load(Ordering::SeqCst) {
actix_rt::time::sleep(Duration::from_millis(500)).await;
if now.elapsed() > Duration::from_secs(10) {
panic!("System::spawn deadlocked")
}
}
// test join handle await
let num = Arc::new(AtomicBool::new(false));
let num_clone = num.clone();
System::spawn(async move {
actix_rt::task::yield_now().await;
num_clone.store(true, Ordering::SeqCst);
actix_rt::task::yield_now().await;
})
.await
.unwrap();
assert!(num.load(Ordering::SeqCst));
// test extra actix_rt::spawn
let res = System::spawn(async move {
actix_rt::task::yield_now().await;
actix_rt::spawn(async {
actix_rt::task::yield_now().await;
});
})
.await;
assert!(res.is_err())
});
}