From e8bb559e64b7a20268737db3433a5c7c2805278f Mon Sep 17 00:00:00 2001 From: wireless4024 Date: Sun, 12 Feb 2023 10:08:58 +0700 Subject: [PATCH] implement missing rt feature in tokio_uring runtime --- actix-rt/examples/hyper.rs | 13 +++++-- actix-rt/examples/multi_thread_system.rs | 13 +++++-- actix-rt/src/arbiter.rs | 10 +++--- actix-rt/src/runtime.rs | 33 ++++++++++++++---- actix-rt/src/system.rs | 43 +++++++++++++++++++----- 5 files changed, 86 insertions(+), 26 deletions(-) diff --git a/actix-rt/examples/hyper.rs b/actix-rt/examples/hyper.rs index 45b5e551..3134f413 100644 --- a/actix-rt/examples/hyper.rs +++ b/actix-rt/examples/hyper.rs @@ -10,13 +10,20 @@ async fn handle(_req: Request) -> Result, Infallible> { } fn main() { - actix_rt::System::with_tokio_rt(|| { + #[cfg(not(feature = "io-uring"))] + let rt = actix_rt::System::with_tokio_rt(|| { tokio::runtime::Builder::new_multi_thread() .enable_all() .build() .unwrap() - }) - .block_on(async { + }); + #[cfg(feature = "io-uring")] + let rt = actix_rt::System::with_tokio_rt(|| { + tokio_uring::Runtime::new(&tokio_uring::builder()) + .unwrap() + }); + + rt.block_on(async { let make_service = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) }); diff --git a/actix-rt/examples/multi_thread_system.rs b/actix-rt/examples/multi_thread_system.rs index 0ecd1ef1..9e2c5db5 100644 --- a/actix-rt/examples/multi_thread_system.rs +++ b/actix-rt/examples/multi_thread_system.rs @@ -4,15 +4,22 @@ use actix_rt::System; fn main() { - System::with_tokio_rt(|| { + #[cfg(not(feature = "io-uring"))] + let system = System::with_tokio_rt(|| { // build system with a multi-thread tokio runtime. tokio::runtime::Builder::new_multi_thread() .worker_threads(2) .enable_all() .build() .unwrap() - }) - .block_on(async_main()); + }); + #[cfg(feature = "io-uring")] + let system = System::with_tokio_rt(|| { + // build system with tokio uring runtime. + tokio_uring::Runtime::new(&tokio_uring::builder()) + .unwrap() + }); + system.block_on(async_main()); } // async main function that acts like #[actix_web::main] or #[tokio::main] diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index a84e25ea..c8673749 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -95,7 +95,7 @@ impl Arbiter { /// /// # Panics /// Panics if a [System] is not registered on the current thread. - #[cfg(not(all(target_os = "linux", feature = "io-uring")))] + //#[cfg(not(all(target_os = "linux", feature = "io-uring")))] #[allow(clippy::new_without_default)] pub fn new() -> Arbiter { Self::with_tokio_rt(|| { @@ -107,10 +107,10 @@ impl Arbiter { /// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure. /// /// [tokio-runtime]: tokio::runtime::Runtime - #[cfg(not(all(target_os = "linux", feature = "io-uring")))] + //#[cfg(not(all(target_os = "linux", feature = "io-uring")))] pub fn with_tokio_rt(runtime_factory: F) -> Arbiter where - F: Fn() -> tokio::runtime::Runtime + Send + 'static, + F: Fn() -> crate::runtime::GlobalRuntime + Send + 'static, { let sys = System::current(); let system_id = sys.id(); @@ -162,7 +162,7 @@ impl Arbiter { /// /// # Panics /// Panics if a [System] is not registered on the current thread. - #[cfg(all(target_os = "linux", feature = "io-uring"))] + /*#[cfg(all(target_os = "linux", feature = "io-uring"))] #[allow(clippy::new_without_default)] pub fn new() -> Arbiter { let sys = System::current(); @@ -208,7 +208,7 @@ impl Arbiter { ready_rx.recv().unwrap(); Arbiter { tx, thread_handle } - } + }*/ /// Sets up an Arbiter runner in a new System using the environment's local set. pub(crate) fn in_new_system() -> ArbiterHandle { diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index 557dfcfe..ae5d32a4 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -6,19 +6,40 @@ use tokio::task::{JoinHandle, LocalSet}; /// /// All spawned futures will be executed on the current thread. Therefore, there is no `Send` bound /// on submitted futures. -#[derive(Debug)] +#[cfg_attr(not(all(target_os = "linux", feature = "io-uring")),derive(Debug))] pub struct Runtime { local: LocalSet, - rt: tokio::runtime::Runtime, + rt: GlobalRuntime, } -pub(crate) fn default_tokio_runtime() -> io::Result { +#[cfg(all(target_os = "linux", feature = "io-uring"))] +impl std::fmt::Debug for Runtime { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Runtime") + .field("local", &self.local) + .finish_non_exhaustive() + } +} + +#[cfg(all(target_os = "linux", feature = "io-uring"))] +pub(crate) type GlobalRuntime = tokio_uring::Runtime; + +#[cfg(not(all(target_os = "linux", feature = "io-uring")))] +pub(crate) type GlobalRuntime = tokio::runtime::Runtime; + +#[cfg(not(all(target_os = "linux", feature = "io-uring")))] +pub(crate) fn default_tokio_runtime() -> io::Result { tokio::runtime::Builder::new_current_thread() .enable_io() .enable_time() .build() } +#[cfg(all(target_os = "linux", feature = "io-uring"))] +pub(crate) fn default_tokio_runtime() -> io::Result { + tokio_uring::Runtime::new(&tokio_uring::builder()) +} + impl Runtime { /// Returns a new runtime initialized with default configuration values. #[allow(clippy::new_ret_no_self)] @@ -79,12 +100,12 @@ impl Runtime { where F: Future, { - self.local.block_on(&self.rt, f) + self.rt.block_on(self.local.run_until(f)) } } -impl From for Runtime { - fn from(rt: tokio::runtime::Runtime) -> Self { +impl From for Runtime { + fn from(rt: GlobalRuntime) -> Self { Self { local: LocalSet::new(), rt, diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index d0494a22..4441f2f1 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -78,18 +78,40 @@ impl System { /// Panics if underlying Tokio runtime can not be created. #[allow(clippy::new_ret_no_self)] pub fn new() -> SystemRunner { - SystemRunner + Self::with_tokio_rt(|| { + crate::runtime::default_tokio_runtime() + .expect("Default Actix (Tokio) runtime could not be created.") + }) } /// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure. /// /// [tokio-runtime]: tokio::runtime::Runtime #[doc(hidden)] - pub fn with_tokio_rt(_: F) -> SystemRunner + pub fn with_tokio_rt(runtime_factory: F) -> SystemRunner where - F: Fn() -> tokio::runtime::Runtime, + F: Fn() -> tokio_uring::Runtime, { - unimplemented!("System::with_tokio_rt is not implemented for io-uring feature yet") + let (stop_tx, stop_rx) = oneshot::channel(); + let (sys_tx, sys_rx) = mpsc::unbounded_channel(); + + let rt = runtime_factory(); + let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() }); + let system = System::construct(sys_tx, sys_arbiter.clone()); + + system + .tx() + .send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter)) + .unwrap(); + + // init background system arbiter + let sys_ctrl = SystemController::new(sys_rx, stop_tx); + rt.block_on(async move { + tokio::spawn(sys_ctrl); + }); + + let rt = crate::runtime::Runtime::from(rt); + SystemRunner { rt, stop_rx } } } @@ -171,7 +193,7 @@ impl System { } /// Runner that keeps a [System]'s event loop alive until stop message is received. -#[cfg(not(feature = "io-uring"))] +//#[cfg(not(feature = "io-uring"))] #[must_use = "A SystemRunner does nothing unless `run` is called."] #[derive(Debug)] pub struct SystemRunner { @@ -179,7 +201,7 @@ pub struct SystemRunner { stop_rx: oneshot::Receiver, } -#[cfg(not(feature = "io-uring"))] +//#[cfg(not(feature = "io-uring"))] impl SystemRunner { /// Starts event loop and will return once [System] is [stopped](System::stop). pub fn run(self) -> io::Result<()> { @@ -210,12 +232,15 @@ impl SystemRunner { self.rt.block_on(fut) } } - +/* /// Runner that keeps a [System]'s event loop alive until stop message is received. #[cfg(feature = "io-uring")] #[must_use = "A SystemRunner does nothing unless `run` is called."] #[derive(Debug)] -pub struct SystemRunner; +pub struct SystemRunner { + rt: crate::runtime::Runtime, + stop_rx: oneshot::Receiver, +} #[cfg(feature = "io-uring")] impl SystemRunner { @@ -255,7 +280,7 @@ impl SystemRunner { res }) } -} +}*/ #[derive(Debug)] pub(crate) enum SystemCommand {