From 2c401e08e401717448126d581d883bde15ff0743 Mon Sep 17 00:00:00 2001 From: Yuki Okushi Date: Sun, 8 Feb 2026 09:51:23 +0900 Subject: [PATCH] feat(rt): accept shared tokio runtimes in with_tokio_rt (#811) --- actix-rt/CHANGES.md | 1 + actix-rt/src/arbiter.rs | 12 +++-- actix-rt/src/runtime.rs | 45 ++++++++++++++++--- actix-rt/src/system.rs | 17 ++++--- actix-rt/tests/tests.rs | 98 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 159 insertions(+), 14 deletions(-) diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 5bbb230f..90822164 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -4,6 +4,7 @@ - Minimum supported Rust version (MSRV) is now 1.88. - Add `SystemRunner::stop_future` and `SystemRunner::into_parts` for awaiting system stop inside `block_on`. +- Allow `{System, Arbiter}::with_tokio_rt` to accept shared Tokio runtimes (e.g. `Arc` or `&'static tokio::runtime::Runtime`). ## 2.11.0 diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index b2d723e6..80ba055b 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -105,11 +105,17 @@ impl Arbiter { /// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure. /// + /// The closure may return any type that can be converted into [`Runtime`], such as + /// `tokio::runtime::Runtime`, `Arc`, or + /// `&'static tokio::runtime::Runtime`. + /// /// [tokio-runtime]: tokio::runtime::Runtime + /// [`Runtime`]: crate::Runtime #[cfg(not(all(target_os = "linux", feature = "io-uring")))] - pub fn with_tokio_rt(runtime_factory: F) -> Arbiter + pub fn with_tokio_rt(runtime_factory: F) -> Arbiter where - F: FnOnce() -> tokio::runtime::Runtime + Send + 'static, + F: FnOnce() -> R + Send + 'static, + R: Into + Send + 'static, { let sys = System::current(); let system_id = sys.id(); @@ -125,7 +131,7 @@ impl Arbiter { .spawn({ let tx = tx.clone(); move || { - let rt = crate::runtime::Runtime::from(runtime_factory()); + let rt = runtime_factory().into(); let hnd = ArbiterHandle::new(tx); System::set_current(sys); diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index 55e29a77..408853c8 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -1,7 +1,14 @@ -use std::{future::Future, io}; +use std::{future::Future, io, sync::Arc}; use tokio::task::{JoinHandle, LocalSet}; +#[derive(Debug)] +enum RuntimeInner { + Owned(tokio::runtime::Runtime), + Shared(Arc), + Static(&'static tokio::runtime::Runtime), +} + /// A Tokio-based runtime proxy. /// /// All spawned futures will be executed on the current thread. Therefore, there is no `Send` bound @@ -9,7 +16,7 @@ use tokio::task::{JoinHandle, LocalSet}; #[derive(Debug)] pub struct Runtime { local: LocalSet, - rt: tokio::runtime::Runtime, + rt: RuntimeInner, } pub(crate) fn default_tokio_runtime() -> io::Result { @@ -26,11 +33,19 @@ impl Runtime { let rt = default_tokio_runtime()?; Ok(Runtime { - rt, + rt: RuntimeInner::Owned(rt), local: LocalSet::new(), }) } + fn tokio_runtime_ref(&self) -> &tokio::runtime::Runtime { + match &self.rt { + RuntimeInner::Owned(rt) => rt, + RuntimeInner::Shared(rt) => rt, + RuntimeInner::Static(rt) => rt, + } + } + /// Offload a future onto the single-threaded runtime. /// /// The returned join handle can be used to await the future's result. @@ -114,7 +129,7 @@ impl Runtime { /// of the Actix runtime. This is because Tokio is responsible for driving the Actix system, /// and blocking tasks could delay or deadlock other tasks in run loop. pub fn tokio_runtime(&self) -> &tokio::runtime::Runtime { - &self.rt + self.tokio_runtime_ref() } /// Runs the provided future, blocking the current thread until the future completes. @@ -135,7 +150,7 @@ impl Runtime { where F: Future, { - self.local.block_on(&self.rt, f) + self.local.block_on(self.tokio_runtime_ref(), f) } } @@ -143,7 +158,25 @@ impl From for Runtime { fn from(rt: tokio::runtime::Runtime) -> Self { Self { local: LocalSet::new(), - rt, + rt: RuntimeInner::Owned(rt), + } + } +} + +impl From> for Runtime { + fn from(rt: Arc) -> Self { + Self { + local: LocalSet::new(), + rt: RuntimeInner::Shared(rt), + } + } +} + +impl From<&'static tokio::runtime::Runtime> for Runtime { + fn from(rt: &'static tokio::runtime::Runtime) -> Self { + Self { + local: LocalSet::new(), + rt: RuntimeInner::Static(rt), } } } diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 6375cb0e..76f29df5 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -45,15 +45,21 @@ impl System { /// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure. /// + /// The closure may return any type that can be converted into [`Runtime`], such as + /// `tokio::runtime::Runtime`, `Arc`, or + /// `&'static tokio::runtime::Runtime`. + /// /// [tokio-runtime]: tokio::runtime::Runtime - pub fn with_tokio_rt(runtime_factory: F) -> SystemRunner + /// [`Runtime`]: crate::Runtime + pub fn with_tokio_rt(runtime_factory: F) -> SystemRunner where - F: FnOnce() -> tokio::runtime::Runtime, + F: FnOnce() -> R, + R: Into, { let (stop_tx, stop_rx) = watch::channel(None); let (sys_tx, sys_rx) = mpsc::unbounded_channel(); - let rt = crate::runtime::Runtime::from(runtime_factory()); + let rt = runtime_factory().into(); let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() }); let system = System::construct(sys_tx, sys_arbiter.clone()); @@ -85,9 +91,10 @@ impl System { /// /// [tokio-runtime]: tokio::runtime::Runtime #[doc(hidden)] - pub fn with_tokio_rt(_: F) -> SystemRunner + pub fn with_tokio_rt(_: F) -> SystemRunner where - F: FnOnce() -> tokio::runtime::Runtime, + F: FnOnce() -> R, + R: Into, { unimplemented!("System::with_tokio_rt is not implemented for io-uring feature yet") } diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs index f5493b4d..1110a055 100644 --- a/actix-rt/tests/tests.rs +++ b/actix-rt/tests/tests.rs @@ -298,6 +298,65 @@ fn new_system_with_tokio() { assert_eq!(rx.recv().unwrap(), 42); } +#[cfg(not(feature = "io-uring"))] +#[test] +fn new_system_with_shared_tokio_runtime() { + use std::sync::Arc; + + let (tx, rx) = channel(); + + let rt = Arc::new( + tokio::runtime::Builder::new_multi_thread() + .enable_io() + .enable_time() + .worker_threads(2) + .max_blocking_threads(2) + .build() + .unwrap(), + ); + + let res = System::with_tokio_rt({ + let rt = rt.clone(); + move || rt + }) + .block_on(async { + actix_rt::time::sleep(Duration::from_millis(1)).await; + + tokio::task::spawn(async move { + tx.send(7).unwrap(); + }) + .await + .unwrap(); + + 321usize + }); + + assert_eq!(res, 321); + assert_eq!(rx.recv().unwrap(), 7); +} + +#[cfg(not(feature = "io-uring"))] +#[test] +fn new_system_with_static_tokio_runtime() { + use std::sync::OnceLock; + + static TOKIO: OnceLock = OnceLock::new(); + + let res = System::with_tokio_rt(|| -> &'static tokio::runtime::Runtime { + TOKIO.get_or_init(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_io() + .enable_time() + .worker_threads(1) + .build() + .unwrap() + }) + }) + .block_on(async { 7usize }); + + assert_eq!(res, 7); +} + #[cfg(not(feature = "io-uring"))] #[test] fn new_arbiter_with_tokio() { @@ -331,6 +390,45 @@ fn new_arbiter_with_tokio() { assert!(!counter.load(Ordering::SeqCst)); } +#[cfg(not(feature = "io-uring"))] +#[test] +fn new_arbiter_with_shared_tokio_runtime() { + use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }; + + let _ = System::new(); + + let rt = Arc::new( + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(2) + .build() + .unwrap(), + ); + + let arb = Arbiter::with_tokio_rt({ + let rt = rt.clone(); + move || rt + }); + + let flag = Arc::new(AtomicBool::new(false)); + + let flag1 = flag.clone(); + let did_spawn = arb.spawn(async move { + actix_rt::time::sleep(Duration::from_millis(1)).await; + flag1.store(true, Ordering::SeqCst); + Arbiter::current().stop(); + }); + + assert!(did_spawn); + + arb.join().unwrap(); + + assert!(flag.load(Ordering::SeqCst)); +} + #[test] #[should_panic] fn no_system_current_panic() {