feat: server shutdown signal

This commit is contained in:
Rob Ede 2025-05-09 05:37:20 +01:00
parent 270360e095
commit eb061205b9
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
8 changed files with 170 additions and 27 deletions

View File

@ -1,3 +1,8 @@
version: "0.2" version: "0.2"
words: words:
- actix
- addrs
- mptcp
- nonblocking
- oneshot
- rustup - rustup

2
Cargo.lock generated
View File

@ -56,8 +56,10 @@ dependencies = [
"mio", "mio",
"pretty_env_logger", "pretty_env_logger",
"socket2 0.5.9", "socket2 0.5.9",
"static_assertions",
"tokio", "tokio",
"tokio-uring", "tokio-uring",
"tokio-util",
"tracing", "tracing",
] ]

View File

@ -2,6 +2,7 @@
## Unreleased ## Unreleased
- Add `ServerBuilder::shutdown_signal()` method.
- Minimum supported Rust version (MSRV) is now 1.74. - Minimum supported Rust version (MSRV) is now 1.74.
## 2.5.1 ## 2.5.1

View File

@ -44,7 +44,9 @@ actix-rt = "2.8"
bytes = "1" bytes = "1"
futures-util = { version = "0.3.17", default-features = false, features = ["sink", "async-await-macro"] } futures-util = { version = "0.3.17", default-features = false, features = ["sink", "async-await-macro"] }
pretty_env_logger = "0.5" pretty_env_logger = "0.5"
tokio = { version = "1.44.2", features = ["io-util", "rt-multi-thread", "macros", "fs"] } static_assertions = "1"
tokio = { version = "1.44.2", features = ["io-util", "rt-multi-thread", "macros", "fs", "time"] }
tokio-util = "0.7"
[lints] [lints]
workspace = true workspace = true

View File

@ -0,0 +1,49 @@
//! Demonstrates use of the `ServerBuilder::shutdown_signal` method using `tokio-util`s
//! `CancellationToken` helper using a nonsensical timer. In practice, this cancellation token would
//! be wired throughout your application and typically triggered by OS signals elsewhere.
use std::{io, time::Duration};
use actix_rt::net::TcpStream;
use actix_server::Server;
use actix_service::fn_service;
use tokio_util::sync::CancellationToken;
async fn run(stop_signal: CancellationToken) -> io::Result<()> {
pretty_env_logger::formatted_timed_builder()
.parse_env(pretty_env_logger::env_logger::Env::default().default_filter_or("info"));
let addr = ("127.0.0.1", 8080);
tracing::info!("starting server on port: {}", &addr.0);
// let (tx, rx) = tokio::sync::oneshot::channel::<()>();
// let (tx, mut rx) = tokio::sync::broadcast::channel::<()>(1);
Server::build()
.bind("shutdown-signal", addr, || {
fn_service(|_stream: TcpStream| async { Ok::<_, io::Error>(()) })
})?
.shutdown_signal(stop_signal.cancelled_owned())
// .shutdown_signal(async move {
// rx.await;
// // rx.recv().await;
// })
.run()
.await
}
#[tokio::main]
async fn main() -> io::Result<()> {
let stop_signal = CancellationToken::new();
tokio::spawn({
let stop_signal = stop_signal.clone();
async move {
tokio::time::sleep(Duration::from_secs(10)).await;
stop_signal.cancel();
}
});
run(stop_signal).await?;
Ok(())
}

View File

@ -1,6 +1,7 @@
use std::{io, num::NonZeroUsize, time::Duration}; use std::{future::Future, io, num::NonZeroUsize, time::Duration};
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use futures_core::future::BoxFuture;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use crate::{ use crate::{
@ -39,6 +40,7 @@ pub struct ServerBuilder {
pub(crate) mptcp: MpTcp, pub(crate) mptcp: MpTcp,
pub(crate) exit: bool, pub(crate) exit: bool,
pub(crate) listen_os_signals: bool, pub(crate) listen_os_signals: bool,
pub(crate) shutdown_signal: Option<BoxFuture<'static, ()>>,
pub(crate) cmd_tx: UnboundedSender<ServerCommand>, pub(crate) cmd_tx: UnboundedSender<ServerCommand>,
pub(crate) cmd_rx: UnboundedReceiver<ServerCommand>, pub(crate) cmd_rx: UnboundedReceiver<ServerCommand>,
pub(crate) worker_config: ServerWorkerConfig, pub(crate) worker_config: ServerWorkerConfig,
@ -64,6 +66,7 @@ impl ServerBuilder {
mptcp: MpTcp::Disabled, mptcp: MpTcp::Disabled,
exit: false, exit: false,
listen_os_signals: true, listen_os_signals: true,
shutdown_signal: None,
cmd_tx, cmd_tx,
cmd_rx, cmd_rx,
worker_config: ServerWorkerConfig::default(), worker_config: ServerWorkerConfig::default(),
@ -170,6 +173,41 @@ impl ServerBuilder {
self self
} }
/// Specify shutdown signal from a future.
///
/// Using this method will prevent OS signal handlers being set up.
///
/// Typically, a `CancellationToken` or `oneshot` / `broadcast` channel will be used.
///
/// # Examples
///
/// ```
/// # use std::io;
/// # use tokio::net::TcpStream;
/// # use actix_server::Server;
/// # async fn run() -> io::Result<()> {
/// use actix_service::fn_service;
/// use tokio_util::sync::CancellationToken;
///
/// let stop_signal = CancellationToken::new();
///
/// Server::build()
/// .bind("shutdown-signal", "127.0.0.1:12345", || {
/// fn_service(|_stream: TcpStream| async { Ok::<_, io::Error>(()) })
/// })?
/// .shutdown_signal(stop_signal.cancelled_owned())
/// .run()
/// .await
/// # }
/// ```
pub fn shutdown_signal<Fut>(mut self, shutdown_signal: Fut) -> Self
where
Fut: Future<Output = ()> + Send + 'static,
{
self.shutdown_signal = Some(Box::pin(shutdown_signal));
self
}
/// Timeout for graceful workers shutdown in seconds. /// Timeout for graceful workers shutdown in seconds.
/// ///
/// After receiving a stop signal, workers have this much time to finish serving requests. /// After receiving a stop signal, workers have this much time to finish serving requests.

View File

@ -18,7 +18,7 @@ use crate::{
builder::ServerBuilder, builder::ServerBuilder,
join_all::join_all, join_all::join_all,
service::InternalServiceFactory, service::InternalServiceFactory,
signals::{SignalKind, Signals}, signals::{OsSignals, SignalKind, StopSignal},
waker_queue::{WakerInterest, WakerQueue}, waker_queue::{WakerInterest, WakerQueue},
worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer}, worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer},
ServerHandle, ServerHandle,
@ -210,7 +210,12 @@ impl ServerInner {
let (waker_queue, worker_handles, accept_handle) = Accept::start(sockets, &builder)?; let (waker_queue, worker_handles, accept_handle) = Accept::start(sockets, &builder)?;
let mux = ServerEventMultiplexer { let mux = ServerEventMultiplexer {
signal_fut: (builder.listen_os_signals).then(Signals::new), signal_fut: builder.shutdown_signal.map(StopSignal::Cancel).or_else(|| {
builder
.listen_os_signals
.then(OsSignals::new)
.map(StopSignal::Os)
}),
cmd_rx: builder.cmd_rx, cmd_rx: builder.cmd_rx,
}; };
@ -315,7 +320,16 @@ impl ServerInner {
fn map_signal(signal: SignalKind) -> ServerCommand { fn map_signal(signal: SignalKind) -> ServerCommand {
match signal { match signal {
SignalKind::Int => { SignalKind::Cancel => {
info!("Cancellation token/channel received; starting graceful shutdown");
ServerCommand::Stop {
graceful: true,
completion: None,
force_system_stop: true,
}
}
SignalKind::OsInt => {
info!("SIGINT received; starting forced shutdown"); info!("SIGINT received; starting forced shutdown");
ServerCommand::Stop { ServerCommand::Stop {
graceful: false, graceful: false,
@ -324,7 +338,7 @@ impl ServerInner {
} }
} }
SignalKind::Term => { SignalKind::OsTerm => {
info!("SIGTERM received; starting graceful shutdown"); info!("SIGTERM received; starting graceful shutdown");
ServerCommand::Stop { ServerCommand::Stop {
graceful: true, graceful: true,
@ -333,7 +347,7 @@ impl ServerInner {
} }
} }
SignalKind::Quit => { SignalKind::OsQuit => {
info!("SIGQUIT received; starting forced shutdown"); info!("SIGQUIT received; starting forced shutdown");
ServerCommand::Stop { ServerCommand::Stop {
graceful: false, graceful: false,
@ -347,7 +361,7 @@ impl ServerInner {
struct ServerEventMultiplexer { struct ServerEventMultiplexer {
cmd_rx: UnboundedReceiver<ServerCommand>, cmd_rx: UnboundedReceiver<ServerCommand>,
signal_fut: Option<Signals>, signal_fut: Option<StopSignal>,
} }
impl Stream for ServerEventMultiplexer { impl Stream for ServerEventMultiplexer {

View File

@ -1,10 +1,11 @@
use std::{ use std::{
fmt, fmt,
future::Future, future::Future,
pin::Pin, pin::{pin, Pin},
task::{Context, Poll}, task::{Context, Poll},
}; };
use futures_core::future::BoxFuture;
use tracing::trace; use tracing::trace;
/// Types of process signals. /// Types of process signals.
@ -12,28 +13,52 @@ use tracing::trace;
#[derive(Debug, Clone, Copy, PartialEq)] #[derive(Debug, Clone, Copy, PartialEq)]
#[allow(dead_code)] // variants are never constructed on non-unix #[allow(dead_code)] // variants are never constructed on non-unix
pub(crate) enum SignalKind { pub(crate) enum SignalKind {
/// `SIGINT` /// Cancellation token or channel.
Int, Cancel,
/// `SIGTERM` /// OS `SIGINT`.
Term, OsInt,
/// `SIGQUIT` /// OS `SIGTERM`.
Quit, OsTerm,
/// OS `SIGQUIT`.
OsQuit,
} }
impl fmt::Display for SignalKind { impl fmt::Display for SignalKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(match self { f.write_str(match self {
SignalKind::Int => "SIGINT", SignalKind::Cancel => "Cancellation token or channel",
SignalKind::Term => "SIGTERM", SignalKind::OsInt => "SIGINT",
SignalKind::Quit => "SIGQUIT", SignalKind::OsTerm => "SIGTERM",
SignalKind::OsQuit => "SIGQUIT",
}) })
} }
} }
pub(crate) enum StopSignal {
/// OS signal handling is configured.
Os(OsSignals),
/// Cancellation token or channel.
Cancel(BoxFuture<'static, ()>),
}
impl Future for StopSignal {
type Output = SignalKind;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.get_mut() {
StopSignal::Os(os_signals) => pin!(os_signals).poll(cx),
StopSignal::Cancel(cancel) => pin!(cancel).poll(cx).map(|()| SignalKind::Cancel),
}
}
}
/// Process signal listener. /// Process signal listener.
pub(crate) struct Signals { #[derive(Debug)]
pub(crate) struct OsSignals {
#[cfg(not(unix))] #[cfg(not(unix))]
signals: futures_core::future::BoxFuture<'static, std::io::Result<()>>, signals: futures_core::future::BoxFuture<'static, std::io::Result<()>>,
@ -41,14 +66,14 @@ pub(crate) struct Signals {
signals: Vec<(SignalKind, actix_rt::signal::unix::Signal)>, signals: Vec<(SignalKind, actix_rt::signal::unix::Signal)>,
} }
impl Signals { impl OsSignals {
/// Constructs an OS signal listening future. /// Constructs an OS signal listening future.
pub(crate) fn new() -> Self { pub(crate) fn new() -> Self {
trace!("setting up OS signal listener"); trace!("setting up OS signal listener");
#[cfg(not(unix))] #[cfg(not(unix))]
{ {
Signals { OsSignals {
signals: Box::pin(actix_rt::signal::ctrl_c()), signals: Box::pin(actix_rt::signal::ctrl_c()),
} }
} }
@ -58,9 +83,9 @@ impl Signals {
use actix_rt::signal::unix; use actix_rt::signal::unix;
let sig_map = [ let sig_map = [
(unix::SignalKind::interrupt(), SignalKind::Int), (unix::SignalKind::interrupt(), SignalKind::OsInt),
(unix::SignalKind::terminate(), SignalKind::Term), (unix::SignalKind::terminate(), SignalKind::OsTerm),
(unix::SignalKind::quit(), SignalKind::Quit), (unix::SignalKind::quit(), SignalKind::OsQuit),
]; ];
let signals = sig_map let signals = sig_map
@ -79,18 +104,18 @@ impl Signals {
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
Signals { signals } OsSignals { signals }
} }
} }
} }
impl Future for Signals { impl Future for OsSignals {
type Output = SignalKind; type Output = SignalKind;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
#[cfg(not(unix))] #[cfg(not(unix))]
{ {
self.signals.as_mut().poll(cx).map(|_| SignalKind::Int) self.signals.as_mut().poll(cx).map(|_| SignalKind::OsInt)
} }
#[cfg(unix)] #[cfg(unix)]
@ -106,3 +131,10 @@ impl Future for Signals {
} }
} }
} }
#[cfg(test)]
mod tests {
use super::*;
static_assertions::assert_impl_all!(StopSignal: Send, Unpin);
}