feat(rt): accept shared tokio runtimes in with_tokio_rt (#811)

This commit is contained in:
Yuki Okushi 2026-02-08 09:51:23 +09:00 committed by GitHub
parent 1e07dce27e
commit 2c401e08e4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 159 additions and 14 deletions

View File

@ -4,6 +4,7 @@
- Minimum supported Rust version (MSRV) is now 1.88.
- Add `SystemRunner::stop_future` and `SystemRunner::into_parts` for awaiting system stop inside `block_on`.
- Allow `{System, Arbiter}::with_tokio_rt` to accept shared Tokio runtimes (e.g. `Arc<tokio::runtime::Runtime>` or `&'static tokio::runtime::Runtime`).
## 2.11.0

View File

@ -105,11 +105,17 @@ impl Arbiter {
/// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
///
/// The closure may return any type that can be converted into [`Runtime`], such as
/// `tokio::runtime::Runtime`, `Arc<tokio::runtime::Runtime>`, or
/// `&'static tokio::runtime::Runtime`.
///
/// [tokio-runtime]: tokio::runtime::Runtime
/// [`Runtime`]: crate::Runtime
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
pub fn with_tokio_rt<F, R>(runtime_factory: F) -> Arbiter
where
F: FnOnce() -> tokio::runtime::Runtime + Send + 'static,
F: FnOnce() -> R + Send + 'static,
R: Into<crate::runtime::Runtime> + Send + 'static,
{
let sys = System::current();
let system_id = sys.id();
@ -125,7 +131,7 @@ impl Arbiter {
.spawn({
let tx = tx.clone();
move || {
let rt = crate::runtime::Runtime::from(runtime_factory());
let rt = runtime_factory().into();
let hnd = ArbiterHandle::new(tx);
System::set_current(sys);

View File

@ -1,7 +1,14 @@
use std::{future::Future, io};
use std::{future::Future, io, sync::Arc};
use tokio::task::{JoinHandle, LocalSet};
#[derive(Debug)]
enum RuntimeInner {
Owned(tokio::runtime::Runtime),
Shared(Arc<tokio::runtime::Runtime>),
Static(&'static tokio::runtime::Runtime),
}
/// A Tokio-based runtime proxy.
///
/// All spawned futures will be executed on the current thread. Therefore, there is no `Send` bound
@ -9,7 +16,7 @@ use tokio::task::{JoinHandle, LocalSet};
#[derive(Debug)]
pub struct Runtime {
local: LocalSet,
rt: tokio::runtime::Runtime,
rt: RuntimeInner,
}
pub(crate) fn default_tokio_runtime() -> io::Result<tokio::runtime::Runtime> {
@ -26,11 +33,19 @@ impl Runtime {
let rt = default_tokio_runtime()?;
Ok(Runtime {
rt,
rt: RuntimeInner::Owned(rt),
local: LocalSet::new(),
})
}
fn tokio_runtime_ref(&self) -> &tokio::runtime::Runtime {
match &self.rt {
RuntimeInner::Owned(rt) => rt,
RuntimeInner::Shared(rt) => rt,
RuntimeInner::Static(rt) => rt,
}
}
/// Offload a future onto the single-threaded runtime.
///
/// The returned join handle can be used to await the future's result.
@ -114,7 +129,7 @@ impl Runtime {
/// of the Actix runtime. This is because Tokio is responsible for driving the Actix system,
/// and blocking tasks could delay or deadlock other tasks in run loop.
pub fn tokio_runtime(&self) -> &tokio::runtime::Runtime {
&self.rt
self.tokio_runtime_ref()
}
/// Runs the provided future, blocking the current thread until the future completes.
@ -135,7 +150,7 @@ impl Runtime {
where
F: Future,
{
self.local.block_on(&self.rt, f)
self.local.block_on(self.tokio_runtime_ref(), f)
}
}
@ -143,7 +158,25 @@ impl From<tokio::runtime::Runtime> for Runtime {
fn from(rt: tokio::runtime::Runtime) -> Self {
Self {
local: LocalSet::new(),
rt,
rt: RuntimeInner::Owned(rt),
}
}
}
impl From<Arc<tokio::runtime::Runtime>> for Runtime {
fn from(rt: Arc<tokio::runtime::Runtime>) -> Self {
Self {
local: LocalSet::new(),
rt: RuntimeInner::Shared(rt),
}
}
}
impl From<&'static tokio::runtime::Runtime> for Runtime {
fn from(rt: &'static tokio::runtime::Runtime) -> Self {
Self {
local: LocalSet::new(),
rt: RuntimeInner::Static(rt),
}
}
}

View File

@ -45,15 +45,21 @@ impl System {
/// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure.
///
/// The closure may return any type that can be converted into [`Runtime`], such as
/// `tokio::runtime::Runtime`, `Arc<tokio::runtime::Runtime>`, or
/// `&'static tokio::runtime::Runtime`.
///
/// [tokio-runtime]: tokio::runtime::Runtime
pub fn with_tokio_rt<F>(runtime_factory: F) -> SystemRunner
/// [`Runtime`]: crate::Runtime
pub fn with_tokio_rt<F, R>(runtime_factory: F) -> SystemRunner
where
F: FnOnce() -> tokio::runtime::Runtime,
F: FnOnce() -> R,
R: Into<crate::runtime::Runtime>,
{
let (stop_tx, stop_rx) = watch::channel(None);
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
let rt = crate::runtime::Runtime::from(runtime_factory());
let rt = runtime_factory().into();
let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() });
let system = System::construct(sys_tx, sys_arbiter.clone());
@ -85,9 +91,10 @@ impl System {
///
/// [tokio-runtime]: tokio::runtime::Runtime
#[doc(hidden)]
pub fn with_tokio_rt<F>(_: F) -> SystemRunner
pub fn with_tokio_rt<F, R>(_: F) -> SystemRunner
where
F: FnOnce() -> tokio::runtime::Runtime,
F: FnOnce() -> R,
R: Into<crate::runtime::Runtime>,
{
unimplemented!("System::with_tokio_rt is not implemented for io-uring feature yet")
}

View File

@ -298,6 +298,65 @@ fn new_system_with_tokio() {
assert_eq!(rx.recv().unwrap(), 42);
}
#[cfg(not(feature = "io-uring"))]
#[test]
fn new_system_with_shared_tokio_runtime() {
use std::sync::Arc;
let (tx, rx) = channel();
let rt = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.enable_io()
.enable_time()
.worker_threads(2)
.max_blocking_threads(2)
.build()
.unwrap(),
);
let res = System::with_tokio_rt({
let rt = rt.clone();
move || rt
})
.block_on(async {
actix_rt::time::sleep(Duration::from_millis(1)).await;
tokio::task::spawn(async move {
tx.send(7).unwrap();
})
.await
.unwrap();
321usize
});
assert_eq!(res, 321);
assert_eq!(rx.recv().unwrap(), 7);
}
#[cfg(not(feature = "io-uring"))]
#[test]
fn new_system_with_static_tokio_runtime() {
use std::sync::OnceLock;
static TOKIO: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
let res = System::with_tokio_rt(|| -> &'static tokio::runtime::Runtime {
TOKIO.get_or_init(|| {
tokio::runtime::Builder::new_multi_thread()
.enable_io()
.enable_time()
.worker_threads(1)
.build()
.unwrap()
})
})
.block_on(async { 7usize });
assert_eq!(res, 7);
}
#[cfg(not(feature = "io-uring"))]
#[test]
fn new_arbiter_with_tokio() {
@ -331,6 +390,45 @@ fn new_arbiter_with_tokio() {
assert!(!counter.load(Ordering::SeqCst));
}
#[cfg(not(feature = "io-uring"))]
#[test]
fn new_arbiter_with_shared_tokio_runtime() {
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
let _ = System::new();
let rt = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(2)
.build()
.unwrap(),
);
let arb = Arbiter::with_tokio_rt({
let rt = rt.clone();
move || rt
});
let flag = Arc::new(AtomicBool::new(false));
let flag1 = flag.clone();
let did_spawn = arb.spawn(async move {
actix_rt::time::sleep(Duration::from_millis(1)).await;
flag1.store(true, Ordering::SeqCst);
Arbiter::current().stop();
});
assert!(did_spawn);
arb.join().unwrap();
assert!(flag.load(Ordering::SeqCst));
}
#[test]
#[should_panic]
fn no_system_current_panic() {