From fb5c45b11de9207a9c10d3c76137189fac5bad1d Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Wed, 23 Dec 2020 03:37:01 +0800 Subject: [PATCH] add docs and changelog. fix force shutdown --- actix-server/CHANGES.md | 3 +- actix-server/src/builder.rs | 25 +++++++++------ actix-server/tests/test_server.rs | 52 ++++++++++++++++++++++++++++++- 3 files changed, 69 insertions(+), 11 deletions(-) diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 5e28fe0b..32d182c7 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -3,10 +3,11 @@ ## Unreleased - 2020-xx-xx * Added explicit info log message on accept queue pause. [#215] * Prevent double registration of sockets when back-pressure is resolved. [#223] +* Added ServerBuilder::on_stop to run an async closure before Server shutdown [#230] [#215]: https://github.com/actix/actix-net/pull/215 [#223]: https://github.com/actix/actix-net/pull/223 - +[#230]: https://github.com/actix/actix-net/pull/230 ## 1.0.4 - 2020-09-12 * Update actix-codec to 0.3.0. diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 3572f3a7..9572024c 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -1,3 +1,4 @@ +use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; @@ -9,7 +10,7 @@ use actix_rt::{spawn, System}; use futures_channel::mpsc::{unbounded, UnboundedReceiver}; use futures_channel::oneshot; use futures_util::stream::FuturesUnordered; -use futures_util::{future::Future, ready, stream::Stream, StreamExt}; +use futures_util::{ready, stream::Stream, StreamExt}; use log::{error, info}; use socket2::{Domain, Protocol, Socket, Type}; @@ -297,13 +298,19 @@ impl ServerBuilder { } } + /// Async closure that would run before Server is shutdown. + /// + /// The exact timing is after `ServerCommand::Stop` is received. + /// Before worker threads stopped if the shutdown is graceful. + /// + /// Or before `actix::System` is stopped when not graceful. + /// (If `ServerBuilder::system_exit` is set to true) pub fn on_stop(mut self, future: F) -> Self where F: FnOnce() -> Fut + 'static, Fut: Future, { self.on_stop = Box::pin(async move { future().await }); - self } @@ -369,6 +376,7 @@ impl ServerBuilder { self.accept.send(Command::Stop); let notify = std::mem::take(&mut self.notify); + // take the on_stop future. let mut on_stop = Box::pin(async {}) as _; std::mem::swap(&mut self.on_stop, &mut on_stop); @@ -398,22 +406,21 @@ impl ServerBuilder { } }); // we need to stop system if server was spawned - } else if self.exit { + } else { spawn(async move { on_stop.await; - delay_until(Instant::now() + Duration::from_millis(300)).await; - System::current().stop(); + if exit { + delay_until(Instant::now() + Duration::from_millis(300)).await; + System::current().stop(); + } }); + if let Some(tx) = completion { let _ = tx.send(()); } for tx in notify { let _ = tx.send(()); } - } else { - spawn(async move { - on_stop.await; - }); } } ServerCommand::WorkerFaulted(idx) => { diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index eb6c75a6..4f987781 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -183,7 +183,7 @@ fn test_configure() { #[test] #[cfg(unix)] -fn test_on_stop() { +fn test_on_stop_graceful() { use actix_codec::{BytesCodec, Framed}; use actix_rt::net::TcpStream; use bytes::Bytes; @@ -230,3 +230,53 @@ fn test_on_stop() { sys.stop(); let _ = h.join(); } + +#[test] +#[cfg(unix)] +fn test_on_stop_force() { + use actix_codec::{BytesCodec, Framed}; + use actix_rt::net::TcpStream; + use bytes::Bytes; + use futures_util::sink::SinkExt; + + let bool = std::sync::Arc::new(AtomicBool::new(false)); + + let addr = unused_addr(); + let (tx, rx) = mpsc::channel(); + + let h = thread::spawn({ + let bool = bool.clone(); + move || { + let sys = actix_rt::System::new("test"); + let srv: Server = Server::build() + .backlog(100) + .disable_signals() + .on_stop(|| async move { + bool.store(true, Ordering::SeqCst); + }) + .bind("test", addr, move || { + fn_service(|io: TcpStream| async move { + let mut f = Framed::new(io, BytesCodec); + f.send(Bytes::from_static(b"test")).await.unwrap(); + Ok::<_, ()>(()) + }) + }) + .unwrap() + .start(); + + let _ = tx.send((srv, actix_rt::System::current())); + let _ = sys.run(); + } + }); + + let (srv, sys) = rx.recv().unwrap(); + + let _ = srv.stop(false); + + thread::sleep(time::Duration::from_millis(100)); + + assert!(bool.load(Ordering::SeqCst)); + + sys.stop(); + let _ = h.join(); +}