add docs and changelog. fix force shutdown

This commit is contained in:
fakeshadow 2020-12-23 03:37:01 +08:00
parent 54e8b6628c
commit fb5c45b11d
3 changed files with 69 additions and 11 deletions

View File

@ -3,10 +3,11 @@
## Unreleased - 2020-xx-xx
* Added explicit info log message on accept queue pause. [#215]
* Prevent double registration of sockets when back-pressure is resolved. [#223]
* Added ServerBuilder::on_stop to run an async closure before Server shutdown [#230]
[#215]: https://github.com/actix/actix-net/pull/215
[#223]: https://github.com/actix/actix-net/pull/223
[#230]: https://github.com/actix/actix-net/pull/230
## 1.0.4 - 2020-09-12
* Update actix-codec to 0.3.0.

View File

@ -1,3 +1,4 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
@ -9,7 +10,7 @@ use actix_rt::{spawn, System};
use futures_channel::mpsc::{unbounded, UnboundedReceiver};
use futures_channel::oneshot;
use futures_util::stream::FuturesUnordered;
use futures_util::{future::Future, ready, stream::Stream, StreamExt};
use futures_util::{ready, stream::Stream, StreamExt};
use log::{error, info};
use socket2::{Domain, Protocol, Socket, Type};
@ -297,13 +298,19 @@ impl ServerBuilder {
}
}
/// Async closure that would run before Server is shutdown.
///
/// The exact timing is after `ServerCommand::Stop` is received.
/// Before worker threads stopped if the shutdown is graceful.
///
/// Or before `actix::System` is stopped when not graceful.
/// (If `ServerBuilder::system_exit` is set to true)
pub fn on_stop<F, Fut>(mut self, future: F) -> Self
where
F: FnOnce() -> Fut + 'static,
Fut: Future<Output = ()>,
{
self.on_stop = Box::pin(async move { future().await });
self
}
@ -369,6 +376,7 @@ impl ServerBuilder {
self.accept.send(Command::Stop);
let notify = std::mem::take(&mut self.notify);
// take the on_stop future.
let mut on_stop = Box::pin(async {}) as _;
std::mem::swap(&mut self.on_stop, &mut on_stop);
@ -398,22 +406,21 @@ impl ServerBuilder {
}
});
// we need to stop system if server was spawned
} else if self.exit {
} else {
spawn(async move {
on_stop.await;
delay_until(Instant::now() + Duration::from_millis(300)).await;
System::current().stop();
if exit {
delay_until(Instant::now() + Duration::from_millis(300)).await;
System::current().stop();
}
});
if let Some(tx) = completion {
let _ = tx.send(());
}
for tx in notify {
let _ = tx.send(());
}
} else {
spawn(async move {
on_stop.await;
});
}
}
ServerCommand::WorkerFaulted(idx) => {

View File

@ -183,7 +183,7 @@ fn test_configure() {
#[test]
#[cfg(unix)]
fn test_on_stop() {
fn test_on_stop_graceful() {
use actix_codec::{BytesCodec, Framed};
use actix_rt::net::TcpStream;
use bytes::Bytes;
@ -230,3 +230,53 @@ fn test_on_stop() {
sys.stop();
let _ = h.join();
}
#[test]
#[cfg(unix)]
fn test_on_stop_force() {
use actix_codec::{BytesCodec, Framed};
use actix_rt::net::TcpStream;
use bytes::Bytes;
use futures_util::sink::SinkExt;
let bool = std::sync::Arc::new(AtomicBool::new(false));
let addr = unused_addr();
let (tx, rx) = mpsc::channel();
let h = thread::spawn({
let bool = bool.clone();
move || {
let sys = actix_rt::System::new("test");
let srv: Server = Server::build()
.backlog(100)
.disable_signals()
.on_stop(|| async move {
bool.store(true, Ordering::SeqCst);
})
.bind("test", addr, move || {
fn_service(|io: TcpStream| async move {
let mut f = Framed::new(io, BytesCodec);
f.send(Bytes::from_static(b"test")).await.unwrap();
Ok::<_, ()>(())
})
})
.unwrap()
.start();
let _ = tx.send((srv, actix_rt::System::current()));
let _ = sys.run();
}
});
let (srv, sys) = rx.recv().unwrap();
let _ = srv.stop(false);
thread::sleep(time::Duration::from_millis(100));
assert!(bool.load(Ordering::SeqCst));
sys.stop();
let _ = h.join();
}