From 4ab39c9341bcbb0f3c9f5baf7d560acc6a2e1fd5 Mon Sep 17 00:00:00 2001 From: Yuki Okushi Date: Wed, 4 Feb 2026 06:06:32 +0900 Subject: [PATCH] feat(rt): implement `stop_future`/`into_parts` (#808) --- actix-rt/CHANGES.md | 1 + actix-rt/src/lib.rs | 2 +- actix-rt/src/system.rs | 97 +++++++++++++++++++++++++++++++++++++---- actix-rt/tests/tests.rs | 28 ++++++++++++ 4 files changed, 119 insertions(+), 9 deletions(-) diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 6ba3532b..5bbb230f 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -3,6 +3,7 @@ ## Unreleased - Minimum supported Rust version (MSRV) is now 1.88. +- Add `SystemRunner::stop_future` and `SystemRunner::into_parts` for awaiting system stop inside `block_on`. ## 2.11.0 diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index fb795f0d..ec2176d3 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -68,7 +68,7 @@ use tokio::task::JoinHandle; pub use self::{ arbiter::{Arbiter, ArbiterHandle}, runtime::Runtime, - system::{System, SystemRunner}, + system::{System, SystemRunner, SystemStop}, }; pub mod signal { diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 77b11f4b..6375cb0e 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -9,7 +9,7 @@ use std::{ }; use futures_core::ready; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{mpsc, watch}; use crate::{arbiter::ArbiterHandle, Arbiter}; @@ -50,7 +50,7 @@ impl System { where F: FnOnce() -> tokio::runtime::Runtime, { - let (stop_tx, stop_rx) = oneshot::channel(); + let (stop_tx, stop_rx) = watch::channel(None); let (sys_tx, sys_rx) = mpsc::unbounded_channel(); let rt = crate::runtime::Runtime::from(runtime_factory()); @@ -176,7 +176,7 @@ impl System { #[derive(Debug)] pub struct SystemRunner { rt: crate::runtime::Runtime, - stop_rx: oneshot::Receiver, + stop_rx: watch::Receiver>, } #[cfg(not(feature = "io-uring"))] @@ -196,7 +196,7 @@ impl SystemRunner { let SystemRunner { rt, stop_rx, .. } = self; // run loop - rt.block_on(stop_rx).map_err(io::Error::other) + rt.block_on(wait_for_stop(stop_rx)) } /// Retrieves a reference to the underlying [Actix runtime](crate::Runtime) associated with this @@ -233,6 +233,43 @@ impl SystemRunner { &self.rt } + /// Returns a future that resolves with the system's exit code when it is stopped. + /// + /// This can be used to react to a system stop signal while running a future with + /// [`SystemRunner::block_on`], such as when coordinating shutdown with `tokio::select!`. + /// + /// # Examples + /// ```no_run + /// use std::process::ExitCode; + /// use actix_rt::System; + /// + /// let sys = System::new(); + /// let stop = sys.stop_future(); + /// + /// let exit = sys.block_on(async move { + /// actix_rt::spawn(async { + /// System::current().stop_with_code(0); + /// }); + /// + /// let code = stop.await.unwrap_or(1); + /// ExitCode::from(code as u8) + /// }); + /// + /// # drop(exit); + /// ``` + pub fn stop_future(&self) -> SystemStop { + SystemStop::new(self.stop_rx.clone()) + } + + /// Splits this runner into its runtime and a future that resolves when the system stops. + /// + /// After calling this method, [`SystemRunner::run`] and [`SystemRunner::run_with_code`] can no + /// longer be used. + pub fn into_parts(self) -> (crate::runtime::Runtime, SystemStop) { + let SystemRunner { rt, stop_rx } = self; + (rt, SystemStop::new(stop_rx)) + } + /// Runs the provided future, blocking the current thread until the future completes. #[track_caller] #[inline] @@ -259,11 +296,21 @@ impl SystemRunner { unimplemented!("SystemRunner::run_with_code is not implemented for io-uring feature yet"); } + /// Returns a future that resolves with the system's exit code when it is stopped. + pub fn stop_future(&self) -> SystemStop { + unimplemented!("SystemRunner::stop_future is not implemented for io-uring feature yet"); + } + + /// Splits this runner into its runtime and a future that resolves when the system stops. + pub fn into_parts(self) -> (crate::runtime::Runtime, SystemStop) { + unimplemented!("SystemRunner::into_parts is not implemented for io-uring feature yet"); + } + /// Runs the provided future, blocking the current thread until the future completes. #[inline] pub fn block_on(&self, fut: F) -> F::Output { tokio_uring::start(async move { - let (stop_tx, stop_rx) = oneshot::channel(); + let (stop_tx, stop_rx) = watch::channel(None); let (sys_tx, sys_rx) = mpsc::unbounded_channel(); let sys_arbiter = Arbiter::in_new_system(); @@ -285,6 +332,40 @@ impl SystemRunner { } } +/// Future that resolves with the exit code when a [`System`] is stopped. +#[must_use = "SystemStop does nothing unless polled or awaited."] +pub struct SystemStop { + inner: Pin> + 'static>>, +} + +impl SystemStop { + #[cfg_attr(feature = "io-uring", allow(dead_code))] + fn new(stop_rx: watch::Receiver>) -> Self { + Self { + inner: Box::pin(wait_for_stop(stop_rx)), + } + } +} + +impl Future for SystemStop { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.inner.as_mut().poll(cx) + } +} + +#[cfg_attr(feature = "io-uring", allow(dead_code))] +async fn wait_for_stop(mut stop_rx: watch::Receiver>) -> io::Result { + loop { + if let Some(code) = *stop_rx.borrow() { + return Ok(code); + } + + stop_rx.changed().await.map_err(io::Error::other)?; + } +} + #[derive(Debug)] pub(crate) enum SystemCommand { Exit(i32), @@ -296,7 +377,7 @@ pub(crate) enum SystemCommand { /// [Arbiter]s and is able to distribute a system-wide stop command. #[derive(Debug)] pub(crate) struct SystemController { - stop_tx: Option>, + stop_tx: Option>>, cmd_rx: mpsc::UnboundedReceiver, arbiters: HashMap, } @@ -304,7 +385,7 @@ pub(crate) struct SystemController { impl SystemController { pub(crate) fn new( cmd_rx: mpsc::UnboundedReceiver, - stop_tx: oneshot::Sender, + stop_tx: watch::Sender>, ) -> Self { SystemController { cmd_rx, @@ -335,7 +416,7 @@ impl Future for SystemController { // stop event loop // will only fire once if let Some(stop_tx) = self.stop_tx.take() { - let _ = stop_tx.send(code); + let _ = stop_tx.send(Some(code)); } } diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs index ae4c6812..f5493b4d 100644 --- a/actix-rt/tests/tests.rs +++ b/actix-rt/tests/tests.rs @@ -34,6 +34,34 @@ fn run_with_code() { assert_eq!(exit_code, 42); } +#[cfg(not(feature = "io-uring"))] +#[test] +fn stop_future_resolves() { + let sys = System::new(); + let stop = sys.stop_future(); + + let exit_code = sys.block_on(async move { + System::current().stop_with_code(7); + stop.await.expect("stop future should resolve") + }); + + assert_eq!(exit_code, 7); +} + +#[cfg(not(feature = "io-uring"))] +#[test] +fn into_parts_stop_future_resolves() { + let sys = System::new(); + let (rt, stop) = sys.into_parts(); + + let exit_code = rt.block_on(async move { + System::current().stop_with_code(9); + stop.await.expect("stop future should resolve") + }); + + assert_eq!(exit_code, 9); +} + #[test] fn join_another_arbiter() { let time = Duration::from_secs(1);