feat(rt): implement `stop_future`/`into_parts` (#808)

This commit is contained in:
Yuki Okushi 2026-02-04 06:06:32 +09:00 committed by GitHub
parent 13e17e77b8
commit 4ab39c9341
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 119 additions and 9 deletions

View File

@ -3,6 +3,7 @@
## Unreleased
- Minimum supported Rust version (MSRV) is now 1.88.
- Add `SystemRunner::stop_future` and `SystemRunner::into_parts` for awaiting system stop inside `block_on`.
## 2.11.0

View File

@ -68,7 +68,7 @@ use tokio::task::JoinHandle;
pub use self::{
arbiter::{Arbiter, ArbiterHandle},
runtime::Runtime,
system::{System, SystemRunner},
system::{System, SystemRunner, SystemStop},
};
pub mod signal {

View File

@ -9,7 +9,7 @@ use std::{
};
use futures_core::ready;
use tokio::sync::{mpsc, oneshot};
use tokio::sync::{mpsc, watch};
use crate::{arbiter::ArbiterHandle, Arbiter};
@ -50,7 +50,7 @@ impl System {
where
F: FnOnce() -> tokio::runtime::Runtime,
{
let (stop_tx, stop_rx) = oneshot::channel();
let (stop_tx, stop_rx) = watch::channel(None);
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
let rt = crate::runtime::Runtime::from(runtime_factory());
@ -176,7 +176,7 @@ impl System {
#[derive(Debug)]
pub struct SystemRunner {
rt: crate::runtime::Runtime,
stop_rx: oneshot::Receiver<i32>,
stop_rx: watch::Receiver<Option<i32>>,
}
#[cfg(not(feature = "io-uring"))]
@ -196,7 +196,7 @@ impl SystemRunner {
let SystemRunner { rt, stop_rx, .. } = self;
// run loop
rt.block_on(stop_rx).map_err(io::Error::other)
rt.block_on(wait_for_stop(stop_rx))
}
/// Retrieves a reference to the underlying [Actix runtime](crate::Runtime) associated with this
@ -233,6 +233,43 @@ impl SystemRunner {
&self.rt
}
/// Returns a future that resolves with the system's exit code when it is stopped.
///
/// This can be used to react to a system stop signal while running a future with
/// [`SystemRunner::block_on`], such as when coordinating shutdown with `tokio::select!`.
///
/// # Examples
/// ```no_run
/// use std::process::ExitCode;
/// use actix_rt::System;
///
/// let sys = System::new();
/// let stop = sys.stop_future();
///
/// let exit = sys.block_on(async move {
/// actix_rt::spawn(async {
/// System::current().stop_with_code(0);
/// });
///
/// let code = stop.await.unwrap_or(1);
/// ExitCode::from(code as u8)
/// });
///
/// # drop(exit);
/// ```
pub fn stop_future(&self) -> SystemStop {
SystemStop::new(self.stop_rx.clone())
}
/// Splits this runner into its runtime and a future that resolves when the system stops.
///
/// After calling this method, [`SystemRunner::run`] and [`SystemRunner::run_with_code`] can no
/// longer be used.
pub fn into_parts(self) -> (crate::runtime::Runtime, SystemStop) {
let SystemRunner { rt, stop_rx } = self;
(rt, SystemStop::new(stop_rx))
}
/// Runs the provided future, blocking the current thread until the future completes.
#[track_caller]
#[inline]
@ -259,11 +296,21 @@ impl SystemRunner {
unimplemented!("SystemRunner::run_with_code is not implemented for io-uring feature yet");
}
/// Returns a future that resolves with the system's exit code when it is stopped.
pub fn stop_future(&self) -> SystemStop {
unimplemented!("SystemRunner::stop_future is not implemented for io-uring feature yet");
}
/// Splits this runner into its runtime and a future that resolves when the system stops.
pub fn into_parts(self) -> (crate::runtime::Runtime, SystemStop) {
unimplemented!("SystemRunner::into_parts is not implemented for io-uring feature yet");
}
/// Runs the provided future, blocking the current thread until the future completes.
#[inline]
pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
tokio_uring::start(async move {
let (stop_tx, stop_rx) = oneshot::channel();
let (stop_tx, stop_rx) = watch::channel(None);
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
let sys_arbiter = Arbiter::in_new_system();
@ -285,6 +332,40 @@ impl SystemRunner {
}
}
/// Future that resolves with the exit code when a [`System`] is stopped.
#[must_use = "SystemStop does nothing unless polled or awaited."]
pub struct SystemStop {
inner: Pin<Box<dyn Future<Output = io::Result<i32>> + 'static>>,
}
impl SystemStop {
#[cfg_attr(feature = "io-uring", allow(dead_code))]
fn new(stop_rx: watch::Receiver<Option<i32>>) -> Self {
Self {
inner: Box::pin(wait_for_stop(stop_rx)),
}
}
}
impl Future for SystemStop {
type Output = io::Result<i32>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.inner.as_mut().poll(cx)
}
}
#[cfg_attr(feature = "io-uring", allow(dead_code))]
async fn wait_for_stop(mut stop_rx: watch::Receiver<Option<i32>>) -> io::Result<i32> {
loop {
if let Some(code) = *stop_rx.borrow() {
return Ok(code);
}
stop_rx.changed().await.map_err(io::Error::other)?;
}
}
#[derive(Debug)]
pub(crate) enum SystemCommand {
Exit(i32),
@ -296,7 +377,7 @@ pub(crate) enum SystemCommand {
/// [Arbiter]s and is able to distribute a system-wide stop command.
#[derive(Debug)]
pub(crate) struct SystemController {
stop_tx: Option<oneshot::Sender<i32>>,
stop_tx: Option<watch::Sender<Option<i32>>>,
cmd_rx: mpsc::UnboundedReceiver<SystemCommand>,
arbiters: HashMap<usize, ArbiterHandle>,
}
@ -304,7 +385,7 @@ pub(crate) struct SystemController {
impl SystemController {
pub(crate) fn new(
cmd_rx: mpsc::UnboundedReceiver<SystemCommand>,
stop_tx: oneshot::Sender<i32>,
stop_tx: watch::Sender<Option<i32>>,
) -> Self {
SystemController {
cmd_rx,
@ -335,7 +416,7 @@ impl Future for SystemController {
// stop event loop
// will only fire once
if let Some(stop_tx) = self.stop_tx.take() {
let _ = stop_tx.send(code);
let _ = stop_tx.send(Some(code));
}
}

View File

@ -34,6 +34,34 @@ fn run_with_code() {
assert_eq!(exit_code, 42);
}
#[cfg(not(feature = "io-uring"))]
#[test]
fn stop_future_resolves() {
let sys = System::new();
let stop = sys.stop_future();
let exit_code = sys.block_on(async move {
System::current().stop_with_code(7);
stop.await.expect("stop future should resolve")
});
assert_eq!(exit_code, 7);
}
#[cfg(not(feature = "io-uring"))]
#[test]
fn into_parts_stop_future_resolves() {
let sys = System::new();
let (rt, stop) = sys.into_parts();
let exit_code = rt.block_on(async move {
System::current().stop_with_code(9);
stop.await.expect("stop future should resolve")
});
assert_eq!(exit_code, 9);
}
#[test]
fn join_another_arbiter() {
let time = Duration::from_secs(1);