remove init future arg

This commit is contained in:
Rob Ede 2021-01-31 03:11:53 +00:00
parent 7e6b23af6f
commit 13bc72a71e
No known key found for this signature in database
GPG Key ID: C2A3B36E841A91E6
3 changed files with 23 additions and 44 deletions

View File

@ -86,7 +86,7 @@ impl ArbiterHandle {
}
}
/// A arbiter represents a thread that provides an asynchronous execution environment for futures
/// An Arbiter represents a thread that provides an asynchronous execution environment for futures
/// and functions.
///
/// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop.

View File

@ -36,18 +36,25 @@ impl System {
/// Panics if underlying Tokio runtime can not be created.
#[allow(clippy::new_ret_no_self)]
pub fn new() -> SystemRunner {
Self::create_runtime(async {})
Self::create_runtime()
}
/// Create a new system with given initialization future.
///
/// The initialization future be run to completion (blocking current thread) before the system
/// runner is returned.
///
/// # Panics
/// Panics if underlying Tokio runtime can not be created.
pub fn with_init(init_fut: impl Future) -> SystemRunner {
Self::create_runtime(init_fut)
fn create_runtime() -> SystemRunner {
let (stop_tx, stop_rx) = oneshot::channel();
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
let rt = Runtime::new().expect("Actix (Tokio) runtime could not be created.");
let system = System::construct(sys_tx, Arbiter::new_current_thread(rt.local_set()));
// init background system arbiter
let sys_ctrl = SystemController::new(sys_rx, stop_tx);
rt.spawn(sys_ctrl);
SystemRunner {
rt,
stop_rx,
system,
}
}
/// Constructs new system and registers it on the current thread.
@ -66,27 +73,6 @@ impl System {
sys
}
fn create_runtime(init_fut: impl Future) -> SystemRunner {
let (stop_tx, stop_rx) = oneshot::channel();
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
let rt = Runtime::new().expect("Actix (Tokio) runtime could not be created.");
let system = System::construct(sys_tx, Arbiter::new_current_thread(rt.local_set()));
// init background system arbiter
let sys_ctrl = SystemController::new(sys_rx, stop_tx);
rt.spawn(sys_ctrl);
// run system init future
rt.block_on(init_fut);
SystemRunner {
rt,
stop_rx,
system,
}
}
/// Get current running system.
///
/// # Panics

View File

@ -4,7 +4,7 @@ use std::{
time::{Duration, Instant},
};
use actix_rt::{System, Arbiter};
use actix_rt::{Arbiter, System};
use tokio::sync::oneshot;
#[test]
@ -50,7 +50,7 @@ fn join_another_arbiter() {
});
assert!(
instant.elapsed() >= time,
"Join on a arbiter that has used actix_rt::spawn should wait for said future"
"Join on an arbiter that has used actix_rt::spawn should wait for said future"
);
let instant = Instant::now();
@ -72,28 +72,21 @@ fn join_another_arbiter() {
#[test]
fn non_static_block_on() {
let string = String::from("test_str");
let str = string.as_str();
let string = string.as_str();
let sys = System::new();
sys.block_on(async {
actix_rt::time::sleep(Duration::from_millis(1)).await;
assert_eq!("test_str", str);
assert_eq!("test_str", string);
});
let rt = actix_rt::Runtime::new().unwrap();
rt.block_on(async {
actix_rt::time::sleep(Duration::from_millis(1)).await;
assert_eq!("test_str", str);
assert_eq!("test_str", string);
});
System::with_init(async {
assert_eq!("test_str", str);
System::current().stop();
})
.run()
.unwrap();
}
#[test]