From eb061205b9243db52d3f3ead2d789d77e99c719b Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Fri, 9 May 2025 05:37:20 +0100 Subject: [PATCH] feat: server shutdown signal --- .cspell.yml | 5 ++ Cargo.lock | 2 + actix-server/CHANGES.md | 1 + actix-server/Cargo.toml | 4 +- actix-server/examples/shutdown-signal.rs | 49 +++++++++++++++++ actix-server/src/builder.rs | 40 +++++++++++++- actix-server/src/server.rs | 26 +++++++-- actix-server/src/signals.rs | 70 +++++++++++++++++------- 8 files changed, 170 insertions(+), 27 deletions(-) create mode 100644 actix-server/examples/shutdown-signal.rs diff --git a/.cspell.yml b/.cspell.yml index ce712a0e..fe18fd1e 100644 --- a/.cspell.yml +++ b/.cspell.yml @@ -1,3 +1,8 @@ version: "0.2" words: + - actix + - addrs + - mptcp + - nonblocking + - oneshot - rustup diff --git a/Cargo.lock b/Cargo.lock index cdadbbd4..145d6127 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -56,8 +56,10 @@ dependencies = [ "mio", "pretty_env_logger", "socket2 0.5.9", + "static_assertions", "tokio", "tokio-uring", + "tokio-util", "tracing", ] diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 268361c6..02640b16 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -2,6 +2,7 @@ ## Unreleased +- Add `ServerBuilder::shutdown_signal()` method. - Minimum supported Rust version (MSRV) is now 1.74. ## 2.5.1 diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 9e2f138e..a6d78e89 100644 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -44,7 +44,9 @@ actix-rt = "2.8" bytes = "1" futures-util = { version = "0.3.17", default-features = false, features = ["sink", "async-await-macro"] } pretty_env_logger = "0.5" -tokio = { version = "1.44.2", features = ["io-util", "rt-multi-thread", "macros", "fs"] } +static_assertions = "1" +tokio = { version = "1.44.2", features = ["io-util", "rt-multi-thread", "macros", "fs", "time"] } +tokio-util = "0.7" [lints] workspace = true diff --git a/actix-server/examples/shutdown-signal.rs b/actix-server/examples/shutdown-signal.rs new file mode 100644 index 00000000..843b9101 --- /dev/null +++ b/actix-server/examples/shutdown-signal.rs @@ -0,0 +1,49 @@ +//! Demonstrates use of the `ServerBuilder::shutdown_signal` method using `tokio-util`s +//! `CancellationToken` helper using a nonsensical timer. In practice, this cancellation token would +//! be wired throughout your application and typically triggered by OS signals elsewhere. + +use std::{io, time::Duration}; + +use actix_rt::net::TcpStream; +use actix_server::Server; +use actix_service::fn_service; +use tokio_util::sync::CancellationToken; + +async fn run(stop_signal: CancellationToken) -> io::Result<()> { + pretty_env_logger::formatted_timed_builder() + .parse_env(pretty_env_logger::env_logger::Env::default().default_filter_or("info")); + + let addr = ("127.0.0.1", 8080); + tracing::info!("starting server on port: {}", &addr.0); + + // let (tx, rx) = tokio::sync::oneshot::channel::<()>(); + // let (tx, mut rx) = tokio::sync::broadcast::channel::<()>(1); + + Server::build() + .bind("shutdown-signal", addr, || { + fn_service(|_stream: TcpStream| async { Ok::<_, io::Error>(()) }) + })? + .shutdown_signal(stop_signal.cancelled_owned()) + // .shutdown_signal(async move { + // rx.await; + // // rx.recv().await; + // }) + .run() + .await +} + +#[tokio::main] +async fn main() -> io::Result<()> { + let stop_signal = CancellationToken::new(); + + tokio::spawn({ + let stop_signal = stop_signal.clone(); + async move { + tokio::time::sleep(Duration::from_secs(10)).await; + stop_signal.cancel(); + } + }); + + run(stop_signal).await?; + Ok(()) +} diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index bd5418c1..fac873b6 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -1,6 +1,7 @@ -use std::{io, num::NonZeroUsize, time::Duration}; +use std::{future::Future, io, num::NonZeroUsize, time::Duration}; use actix_rt::net::TcpStream; +use futures_core::future::BoxFuture; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use crate::{ @@ -39,6 +40,7 @@ pub struct ServerBuilder { pub(crate) mptcp: MpTcp, pub(crate) exit: bool, pub(crate) listen_os_signals: bool, + pub(crate) shutdown_signal: Option>, pub(crate) cmd_tx: UnboundedSender, pub(crate) cmd_rx: UnboundedReceiver, pub(crate) worker_config: ServerWorkerConfig, @@ -64,6 +66,7 @@ impl ServerBuilder { mptcp: MpTcp::Disabled, exit: false, listen_os_signals: true, + shutdown_signal: None, cmd_tx, cmd_rx, worker_config: ServerWorkerConfig::default(), @@ -170,6 +173,41 @@ impl ServerBuilder { self } + /// Specify shutdown signal from a future. + /// + /// Using this method will prevent OS signal handlers being set up. + /// + /// Typically, a `CancellationToken` or `oneshot` / `broadcast` channel will be used. + /// + /// # Examples + /// + /// ``` + /// # use std::io; + /// # use tokio::net::TcpStream; + /// # use actix_server::Server; + /// # async fn run() -> io::Result<()> { + /// use actix_service::fn_service; + /// use tokio_util::sync::CancellationToken; + /// + /// let stop_signal = CancellationToken::new(); + /// + /// Server::build() + /// .bind("shutdown-signal", "127.0.0.1:12345", || { + /// fn_service(|_stream: TcpStream| async { Ok::<_, io::Error>(()) }) + /// })? + /// .shutdown_signal(stop_signal.cancelled_owned()) + /// .run() + /// .await + /// # } + /// ``` + pub fn shutdown_signal(mut self, shutdown_signal: Fut) -> Self + where + Fut: Future + Send + 'static, + { + self.shutdown_signal = Some(Box::pin(shutdown_signal)); + self + } + /// Timeout for graceful workers shutdown in seconds. /// /// After receiving a stop signal, workers have this much time to finish serving requests. diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index db7d67fd..0d582978 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -18,7 +18,7 @@ use crate::{ builder::ServerBuilder, join_all::join_all, service::InternalServiceFactory, - signals::{SignalKind, Signals}, + signals::{OsSignals, SignalKind, StopSignal}, waker_queue::{WakerInterest, WakerQueue}, worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer}, ServerHandle, @@ -210,7 +210,12 @@ impl ServerInner { let (waker_queue, worker_handles, accept_handle) = Accept::start(sockets, &builder)?; let mux = ServerEventMultiplexer { - signal_fut: (builder.listen_os_signals).then(Signals::new), + signal_fut: builder.shutdown_signal.map(StopSignal::Cancel).or_else(|| { + builder + .listen_os_signals + .then(OsSignals::new) + .map(StopSignal::Os) + }), cmd_rx: builder.cmd_rx, }; @@ -315,7 +320,16 @@ impl ServerInner { fn map_signal(signal: SignalKind) -> ServerCommand { match signal { - SignalKind::Int => { + SignalKind::Cancel => { + info!("Cancellation token/channel received; starting graceful shutdown"); + ServerCommand::Stop { + graceful: true, + completion: None, + force_system_stop: true, + } + } + + SignalKind::OsInt => { info!("SIGINT received; starting forced shutdown"); ServerCommand::Stop { graceful: false, @@ -324,7 +338,7 @@ impl ServerInner { } } - SignalKind::Term => { + SignalKind::OsTerm => { info!("SIGTERM received; starting graceful shutdown"); ServerCommand::Stop { graceful: true, @@ -333,7 +347,7 @@ impl ServerInner { } } - SignalKind::Quit => { + SignalKind::OsQuit => { info!("SIGQUIT received; starting forced shutdown"); ServerCommand::Stop { graceful: false, @@ -347,7 +361,7 @@ impl ServerInner { struct ServerEventMultiplexer { cmd_rx: UnboundedReceiver, - signal_fut: Option, + signal_fut: Option, } impl Stream for ServerEventMultiplexer { diff --git a/actix-server/src/signals.rs b/actix-server/src/signals.rs index 2b01f015..7d01887d 100644 --- a/actix-server/src/signals.rs +++ b/actix-server/src/signals.rs @@ -1,10 +1,11 @@ use std::{ fmt, future::Future, - pin::Pin, + pin::{pin, Pin}, task::{Context, Poll}, }; +use futures_core::future::BoxFuture; use tracing::trace; /// Types of process signals. @@ -12,28 +13,52 @@ use tracing::trace; #[derive(Debug, Clone, Copy, PartialEq)] #[allow(dead_code)] // variants are never constructed on non-unix pub(crate) enum SignalKind { - /// `SIGINT` - Int, + /// Cancellation token or channel. + Cancel, - /// `SIGTERM` - Term, + /// OS `SIGINT`. + OsInt, - /// `SIGQUIT` - Quit, + /// OS `SIGTERM`. + OsTerm, + + /// OS `SIGQUIT`. + OsQuit, } impl fmt::Display for SignalKind { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str(match self { - SignalKind::Int => "SIGINT", - SignalKind::Term => "SIGTERM", - SignalKind::Quit => "SIGQUIT", + SignalKind::Cancel => "Cancellation token or channel", + SignalKind::OsInt => "SIGINT", + SignalKind::OsTerm => "SIGTERM", + SignalKind::OsQuit => "SIGQUIT", }) } } +pub(crate) enum StopSignal { + /// OS signal handling is configured. + Os(OsSignals), + + /// Cancellation token or channel. + Cancel(BoxFuture<'static, ()>), +} + +impl Future for StopSignal { + type Output = SignalKind; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.get_mut() { + StopSignal::Os(os_signals) => pin!(os_signals).poll(cx), + StopSignal::Cancel(cancel) => pin!(cancel).poll(cx).map(|()| SignalKind::Cancel), + } + } +} + /// Process signal listener. -pub(crate) struct Signals { +#[derive(Debug)] +pub(crate) struct OsSignals { #[cfg(not(unix))] signals: futures_core::future::BoxFuture<'static, std::io::Result<()>>, @@ -41,14 +66,14 @@ pub(crate) struct Signals { signals: Vec<(SignalKind, actix_rt::signal::unix::Signal)>, } -impl Signals { +impl OsSignals { /// Constructs an OS signal listening future. pub(crate) fn new() -> Self { trace!("setting up OS signal listener"); #[cfg(not(unix))] { - Signals { + OsSignals { signals: Box::pin(actix_rt::signal::ctrl_c()), } } @@ -58,9 +83,9 @@ impl Signals { use actix_rt::signal::unix; let sig_map = [ - (unix::SignalKind::interrupt(), SignalKind::Int), - (unix::SignalKind::terminate(), SignalKind::Term), - (unix::SignalKind::quit(), SignalKind::Quit), + (unix::SignalKind::interrupt(), SignalKind::OsInt), + (unix::SignalKind::terminate(), SignalKind::OsTerm), + (unix::SignalKind::quit(), SignalKind::OsQuit), ]; let signals = sig_map @@ -79,18 +104,18 @@ impl Signals { }) .collect::>(); - Signals { signals } + OsSignals { signals } } } } -impl Future for Signals { +impl Future for OsSignals { type Output = SignalKind; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { #[cfg(not(unix))] { - self.signals.as_mut().poll(cx).map(|_| SignalKind::Int) + self.signals.as_mut().poll(cx).map(|_| SignalKind::OsInt) } #[cfg(unix)] @@ -106,3 +131,10 @@ impl Future for Signals { } } } + +#[cfg(test)] +mod tests { + use super::*; + + static_assertions::assert_impl_all!(StopSignal: Send, Unpin); +}