diff --git a/actix-rt/examples/hyper.rs b/actix-rt/examples/hyper.rs
index 45b5e551..3134f413 100644
--- a/actix-rt/examples/hyper.rs
+++ b/actix-rt/examples/hyper.rs
@@ -10,13 +10,20 @@ async fn handle(_req: Request
) -> Result, Infallible> {
}
fn main() {
- actix_rt::System::with_tokio_rt(|| {
+ #[cfg(not(feature = "io-uring"))]
+ let rt = actix_rt::System::with_tokio_rt(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
- })
- .block_on(async {
+ });
+ #[cfg(feature = "io-uring")]
+ let rt = actix_rt::System::with_tokio_rt(|| {
+ tokio_uring::Runtime::new(&tokio_uring::builder())
+ .unwrap()
+ });
+
+ rt.block_on(async {
let make_service =
make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) });
diff --git a/actix-rt/examples/multi_thread_system.rs b/actix-rt/examples/multi_thread_system.rs
index 0ecd1ef1..9e2c5db5 100644
--- a/actix-rt/examples/multi_thread_system.rs
+++ b/actix-rt/examples/multi_thread_system.rs
@@ -4,15 +4,22 @@
use actix_rt::System;
fn main() {
- System::with_tokio_rt(|| {
+ #[cfg(not(feature = "io-uring"))]
+ let system = System::with_tokio_rt(|| {
// build system with a multi-thread tokio runtime.
tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build()
.unwrap()
- })
- .block_on(async_main());
+ });
+ #[cfg(feature = "io-uring")]
+ let system = System::with_tokio_rt(|| {
+ // build system with tokio uring runtime.
+ tokio_uring::Runtime::new(&tokio_uring::builder())
+ .unwrap()
+ });
+ system.block_on(async_main());
}
// async main function that acts like #[actix_web::main] or #[tokio::main]
diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs
index a84e25ea..c8673749 100644
--- a/actix-rt/src/arbiter.rs
+++ b/actix-rt/src/arbiter.rs
@@ -95,7 +95,7 @@ impl Arbiter {
///
/// # Panics
/// Panics if a [System] is not registered on the current thread.
- #[cfg(not(all(target_os = "linux", feature = "io-uring")))]
+ //#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
#[allow(clippy::new_without_default)]
pub fn new() -> Arbiter {
Self::with_tokio_rt(|| {
@@ -107,10 +107,10 @@ impl Arbiter {
/// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
///
/// [tokio-runtime]: tokio::runtime::Runtime
- #[cfg(not(all(target_os = "linux", feature = "io-uring")))]
+ //#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
pub fn with_tokio_rt(runtime_factory: F) -> Arbiter
where
- F: Fn() -> tokio::runtime::Runtime + Send + 'static,
+ F: Fn() -> crate::runtime::GlobalRuntime + Send + 'static,
{
let sys = System::current();
let system_id = sys.id();
@@ -162,7 +162,7 @@ impl Arbiter {
///
/// # Panics
/// Panics if a [System] is not registered on the current thread.
- #[cfg(all(target_os = "linux", feature = "io-uring"))]
+ /*#[cfg(all(target_os = "linux", feature = "io-uring"))]
#[allow(clippy::new_without_default)]
pub fn new() -> Arbiter {
let sys = System::current();
@@ -208,7 +208,7 @@ impl Arbiter {
ready_rx.recv().unwrap();
Arbiter { tx, thread_handle }
- }
+ }*/
/// Sets up an Arbiter runner in a new System using the environment's local set.
pub(crate) fn in_new_system() -> ArbiterHandle {
diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs
index 557dfcfe..ae5d32a4 100644
--- a/actix-rt/src/runtime.rs
+++ b/actix-rt/src/runtime.rs
@@ -6,19 +6,40 @@ use tokio::task::{JoinHandle, LocalSet};
///
/// All spawned futures will be executed on the current thread. Therefore, there is no `Send` bound
/// on submitted futures.
-#[derive(Debug)]
+#[cfg_attr(not(all(target_os = "linux", feature = "io-uring")),derive(Debug))]
pub struct Runtime {
local: LocalSet,
- rt: tokio::runtime::Runtime,
+ rt: GlobalRuntime,
}
-pub(crate) fn default_tokio_runtime() -> io::Result {
+#[cfg(all(target_os = "linux", feature = "io-uring"))]
+impl std::fmt::Debug for Runtime {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("Runtime")
+ .field("local", &self.local)
+ .finish_non_exhaustive()
+ }
+}
+
+#[cfg(all(target_os = "linux", feature = "io-uring"))]
+pub(crate) type GlobalRuntime = tokio_uring::Runtime;
+
+#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
+pub(crate) type GlobalRuntime = tokio::runtime::Runtime;
+
+#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
+pub(crate) fn default_tokio_runtime() -> io::Result {
tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()
}
+#[cfg(all(target_os = "linux", feature = "io-uring"))]
+pub(crate) fn default_tokio_runtime() -> io::Result {
+ tokio_uring::Runtime::new(&tokio_uring::builder())
+}
+
impl Runtime {
/// Returns a new runtime initialized with default configuration values.
#[allow(clippy::new_ret_no_self)]
@@ -79,12 +100,12 @@ impl Runtime {
where
F: Future,
{
- self.local.block_on(&self.rt, f)
+ self.rt.block_on(self.local.run_until(f))
}
}
-impl From for Runtime {
- fn from(rt: tokio::runtime::Runtime) -> Self {
+impl From for Runtime {
+ fn from(rt: GlobalRuntime) -> Self {
Self {
local: LocalSet::new(),
rt,
diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs
index d0494a22..4441f2f1 100644
--- a/actix-rt/src/system.rs
+++ b/actix-rt/src/system.rs
@@ -78,18 +78,40 @@ impl System {
/// Panics if underlying Tokio runtime can not be created.
#[allow(clippy::new_ret_no_self)]
pub fn new() -> SystemRunner {
- SystemRunner
+ Self::with_tokio_rt(|| {
+ crate::runtime::default_tokio_runtime()
+ .expect("Default Actix (Tokio) runtime could not be created.")
+ })
}
/// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure.
///
/// [tokio-runtime]: tokio::runtime::Runtime
#[doc(hidden)]
- pub fn with_tokio_rt(_: F) -> SystemRunner
+ pub fn with_tokio_rt(runtime_factory: F) -> SystemRunner
where
- F: Fn() -> tokio::runtime::Runtime,
+ F: Fn() -> tokio_uring::Runtime,
{
- unimplemented!("System::with_tokio_rt is not implemented for io-uring feature yet")
+ let (stop_tx, stop_rx) = oneshot::channel();
+ let (sys_tx, sys_rx) = mpsc::unbounded_channel();
+
+ let rt = runtime_factory();
+ let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() });
+ let system = System::construct(sys_tx, sys_arbiter.clone());
+
+ system
+ .tx()
+ .send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter))
+ .unwrap();
+
+ // init background system arbiter
+ let sys_ctrl = SystemController::new(sys_rx, stop_tx);
+ rt.block_on(async move {
+ tokio::spawn(sys_ctrl);
+ });
+
+ let rt = crate::runtime::Runtime::from(rt);
+ SystemRunner { rt, stop_rx }
}
}
@@ -171,7 +193,7 @@ impl System {
}
/// Runner that keeps a [System]'s event loop alive until stop message is received.
-#[cfg(not(feature = "io-uring"))]
+//#[cfg(not(feature = "io-uring"))]
#[must_use = "A SystemRunner does nothing unless `run` is called."]
#[derive(Debug)]
pub struct SystemRunner {
@@ -179,7 +201,7 @@ pub struct SystemRunner {
stop_rx: oneshot::Receiver,
}
-#[cfg(not(feature = "io-uring"))]
+//#[cfg(not(feature = "io-uring"))]
impl SystemRunner {
/// Starts event loop and will return once [System] is [stopped](System::stop).
pub fn run(self) -> io::Result<()> {
@@ -210,12 +232,15 @@ impl SystemRunner {
self.rt.block_on(fut)
}
}
-
+/*
/// Runner that keeps a [System]'s event loop alive until stop message is received.
#[cfg(feature = "io-uring")]
#[must_use = "A SystemRunner does nothing unless `run` is called."]
#[derive(Debug)]
-pub struct SystemRunner;
+pub struct SystemRunner {
+ rt: crate::runtime::Runtime,
+ stop_rx: oneshot::Receiver,
+}
#[cfg(feature = "io-uring")]
impl SystemRunner {
@@ -255,7 +280,7 @@ impl SystemRunner {
res
})
}
-}
+}*/
#[derive(Debug)]
pub(crate) enum SystemCommand {