mirror of https://github.com/fafhrd91/actix-net
minimal support of System type with io-uring
This commit is contained in:
parent
00775884f8
commit
528ff435e9
|
@ -15,7 +15,7 @@
|
||||||
//! blocking task thread-pool using [`task::spawn_blocking`].
|
//! blocking task thread-pool using [`task::spawn_blocking`].
|
||||||
//!
|
//!
|
||||||
//! # Examples
|
//! # Examples
|
||||||
//! ```
|
//! ```no_run
|
||||||
//! use std::sync::mpsc;
|
//! use std::sync::mpsc;
|
||||||
//! use actix_rt::{Arbiter, System};
|
//! use actix_rt::{Arbiter, System};
|
||||||
//!
|
//!
|
||||||
|
|
|
@ -11,7 +11,7 @@ use std::{
|
||||||
use futures_core::ready;
|
use futures_core::ready;
|
||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
|
|
||||||
use crate::{arbiter::ArbiterHandle, runtime::default_tokio_runtime, Arbiter, Runtime};
|
use crate::{arbiter::ArbiterHandle, Arbiter};
|
||||||
|
|
||||||
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
|
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
|
||||||
|
|
||||||
|
@ -29,6 +29,7 @@ pub struct System {
|
||||||
arbiter_handle: ArbiterHandle,
|
arbiter_handle: ArbiterHandle,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(not(feature = "io-uring"))]
|
||||||
impl System {
|
impl System {
|
||||||
/// Create a new system.
|
/// Create a new system.
|
||||||
///
|
///
|
||||||
|
@ -37,7 +38,7 @@ impl System {
|
||||||
#[allow(clippy::new_ret_no_self)]
|
#[allow(clippy::new_ret_no_self)]
|
||||||
pub fn new() -> SystemRunner {
|
pub fn new() -> SystemRunner {
|
||||||
Self::with_tokio_rt(|| {
|
Self::with_tokio_rt(|| {
|
||||||
default_tokio_runtime()
|
crate::runtime::default_tokio_runtime()
|
||||||
.expect("Default Actix (Tokio) runtime could not be created.")
|
.expect("Default Actix (Tokio) runtime could not be created.")
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -53,7 +54,7 @@ impl System {
|
||||||
let (stop_tx, stop_rx) = oneshot::channel();
|
let (stop_tx, stop_rx) = oneshot::channel();
|
||||||
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
|
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
let rt = Runtime::from(runtime_factory());
|
let rt = crate::runtime::Runtime::from(runtime_factory());
|
||||||
let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() });
|
let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() });
|
||||||
let system = System::construct(sys_tx, sys_arbiter.clone());
|
let system = System::construct(sys_tx, sys_arbiter.clone());
|
||||||
|
|
||||||
|
@ -72,7 +73,32 @@ impl System {
|
||||||
system,
|
system,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "io-uring")]
|
||||||
|
impl System {
|
||||||
|
/// Create a new system.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
/// Panics if underlying Tokio runtime can not be created.
|
||||||
|
#[allow(clippy::new_ret_no_self)]
|
||||||
|
pub fn new() -> SystemRunner {
|
||||||
|
SystemRunner
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure.
|
||||||
|
///
|
||||||
|
/// [tokio-runtime]: tokio::runtime::Runtime
|
||||||
|
#[doc(hidden)]
|
||||||
|
pub fn with_tokio_rt<F>(_: F) -> SystemRunner
|
||||||
|
where
|
||||||
|
F: Fn() -> tokio::runtime::Runtime,
|
||||||
|
{
|
||||||
|
unimplemented!("System::with_tokio_rt is not implemented yet")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl System {
|
||||||
/// Constructs new system and registers it on the current thread.
|
/// Constructs new system and registers it on the current thread.
|
||||||
pub(crate) fn construct(
|
pub(crate) fn construct(
|
||||||
sys_tx: mpsc::UnboundedSender<SystemCommand>,
|
sys_tx: mpsc::UnboundedSender<SystemCommand>,
|
||||||
|
@ -149,15 +175,18 @@ impl System {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(not(feature = "io-uring"))]
|
||||||
/// Runner that keeps a [System]'s event loop alive until stop message is received.
|
/// Runner that keeps a [System]'s event loop alive until stop message is received.
|
||||||
#[must_use = "A SystemRunner does nothing unless `run` is called."]
|
#[must_use = "A SystemRunner does nothing unless `run` is called."]
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct SystemRunner {
|
pub struct SystemRunner {
|
||||||
rt: Runtime,
|
rt: crate::runtime::Runtime,
|
||||||
stop_rx: oneshot::Receiver<i32>,
|
stop_rx: oneshot::Receiver<i32>,
|
||||||
|
#[allow(dead_code)]
|
||||||
system: System,
|
system: System,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(not(feature = "io-uring"))]
|
||||||
impl SystemRunner {
|
impl SystemRunner {
|
||||||
/// Starts event loop and will return once [System] is [stopped](System::stop).
|
/// Starts event loop and will return once [System] is [stopped](System::stop).
|
||||||
pub fn run(self) -> io::Result<()> {
|
pub fn run(self) -> io::Result<()> {
|
||||||
|
@ -187,6 +216,45 @@ impl SystemRunner {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "io-uring")]
|
||||||
|
/// Runner that keeps a [System]'s event loop alive until stop message is received.
|
||||||
|
#[must_use = "A SystemRunner does nothing unless `run` is called."]
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct SystemRunner;
|
||||||
|
|
||||||
|
#[cfg(feature = "io-uring")]
|
||||||
|
impl SystemRunner {
|
||||||
|
/// Starts event loop and will return once [System] is [stopped](System::stop).
|
||||||
|
pub fn run(self) -> io::Result<()> {
|
||||||
|
unimplemented!("SystemRunner::run is not implemented yet")
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Runs the provided future, blocking the current thread until the future completes.
|
||||||
|
#[inline]
|
||||||
|
pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
|
||||||
|
tokio_uring::start(async move {
|
||||||
|
let (stop_tx, stop_rx) = oneshot::channel();
|
||||||
|
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
|
let sys_arbiter = Arbiter::in_new_system();
|
||||||
|
let system = System::construct(sys_tx, sys_arbiter.clone());
|
||||||
|
|
||||||
|
system
|
||||||
|
.tx()
|
||||||
|
.send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
// init background system arbiter
|
||||||
|
let sys_ctrl = SystemController::new(sys_rx, stop_tx);
|
||||||
|
tokio_uring::spawn(sys_ctrl);
|
||||||
|
|
||||||
|
let res = fut.await;
|
||||||
|
drop(stop_rx);
|
||||||
|
res
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(crate) enum SystemCommand {
|
pub(crate) enum SystemCommand {
|
||||||
Exit(i32),
|
Exit(i32),
|
||||||
|
|
|
@ -1,12 +1,15 @@
|
||||||
use std::{
|
use std::{
|
||||||
future::Future,
|
future::Future,
|
||||||
sync::mpsc::channel,
|
|
||||||
thread,
|
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
|
|
||||||
use actix_rt::{task::JoinError, Arbiter, System};
|
use actix_rt::{task::JoinError, Arbiter, System};
|
||||||
use tokio::sync::oneshot;
|
|
||||||
|
#[cfg(not(feature = "io-uring"))]
|
||||||
|
use {
|
||||||
|
std::{sync::mpsc::channel, thread},
|
||||||
|
tokio::sync::oneshot,
|
||||||
|
};
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn await_for_timer() {
|
fn await_for_timer() {
|
||||||
|
@ -103,6 +106,10 @@ fn wait_for_spawns() {
|
||||||
assert!(rt.block_on(handle).is_err());
|
assert!(rt.block_on(handle).is_err());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Temporary disabled tests for io-uring feature.
|
||||||
|
// They should be enabled when possible.
|
||||||
|
|
||||||
|
#[cfg(not(feature = "io-uring"))]
|
||||||
#[test]
|
#[test]
|
||||||
fn arbiter_spawn_fn_runs() {
|
fn arbiter_spawn_fn_runs() {
|
||||||
let _ = System::new();
|
let _ = System::new();
|
||||||
|
@ -119,6 +126,7 @@ fn arbiter_spawn_fn_runs() {
|
||||||
arbiter.join().unwrap();
|
arbiter.join().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(not(feature = "io-uring"))]
|
||||||
#[test]
|
#[test]
|
||||||
fn arbiter_handle_spawn_fn_runs() {
|
fn arbiter_handle_spawn_fn_runs() {
|
||||||
let sys = System::new();
|
let sys = System::new();
|
||||||
|
@ -141,6 +149,7 @@ fn arbiter_handle_spawn_fn_runs() {
|
||||||
sys.run().unwrap();
|
sys.run().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(not(feature = "io-uring"))]
|
||||||
#[test]
|
#[test]
|
||||||
fn arbiter_drop_no_panic_fn() {
|
fn arbiter_drop_no_panic_fn() {
|
||||||
let _ = System::new();
|
let _ = System::new();
|
||||||
|
@ -152,6 +161,7 @@ fn arbiter_drop_no_panic_fn() {
|
||||||
arbiter.join().unwrap();
|
arbiter.join().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(not(feature = "io-uring"))]
|
||||||
#[test]
|
#[test]
|
||||||
fn arbiter_drop_no_panic_fut() {
|
fn arbiter_drop_no_panic_fut() {
|
||||||
let _ = System::new();
|
let _ = System::new();
|
||||||
|
@ -163,18 +173,7 @@ fn arbiter_drop_no_panic_fut() {
|
||||||
arbiter.join().unwrap();
|
arbiter.join().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[cfg(not(feature = "io-uring"))]
|
||||||
#[should_panic]
|
|
||||||
fn no_system_current_panic() {
|
|
||||||
System::current();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
#[should_panic]
|
|
||||||
fn no_system_arbiter_new_panic() {
|
|
||||||
Arbiter::new();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn system_arbiter_spawn() {
|
fn system_arbiter_spawn() {
|
||||||
let runner = System::new();
|
let runner = System::new();
|
||||||
|
@ -205,6 +204,7 @@ fn system_arbiter_spawn() {
|
||||||
thread.join().unwrap();
|
thread.join().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(not(feature = "io-uring"))]
|
||||||
#[test]
|
#[test]
|
||||||
fn system_stop_stops_arbiters() {
|
fn system_stop_stops_arbiters() {
|
||||||
let sys = System::new();
|
let sys = System::new();
|
||||||
|
@ -293,6 +293,18 @@ fn new_arbiter_with_tokio() {
|
||||||
assert!(!counter.load(Ordering::SeqCst));
|
assert!(!counter.load(Ordering::SeqCst));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[should_panic]
|
||||||
|
fn no_system_current_panic() {
|
||||||
|
System::current();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[should_panic]
|
||||||
|
fn no_system_arbiter_new_panic() {
|
||||||
|
Arbiter::new();
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn try_current_no_system() {
|
fn try_current_no_system() {
|
||||||
assert!(System::try_current().is_none())
|
assert!(System::try_current().is_none())
|
||||||
|
@ -330,7 +342,7 @@ fn spawn_local() {
|
||||||
#[cfg(all(target_os = "linux", feature = "io-uring"))]
|
#[cfg(all(target_os = "linux", feature = "io-uring"))]
|
||||||
#[test]
|
#[test]
|
||||||
fn tokio_uring_arbiter() {
|
fn tokio_uring_arbiter() {
|
||||||
let system = System::new();
|
System::new().block_on(async {
|
||||||
let (tx, rx) = std::sync::mpsc::channel();
|
let (tx, rx) = std::sync::mpsc::channel();
|
||||||
|
|
||||||
Arbiter::new().spawn(async move {
|
Arbiter::new().spawn(async move {
|
||||||
|
@ -352,6 +364,5 @@ fn tokio_uring_arbiter() {
|
||||||
});
|
});
|
||||||
|
|
||||||
assert!(rx.recv().unwrap());
|
assert!(rx.recv().unwrap());
|
||||||
|
})
|
||||||
drop(system);
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue