From 7dd0ef88ee58078a5fabd3db54f3fc918db8698e Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sat, 5 Feb 2022 22:48:45 +0800 Subject: [PATCH] block and wait for accept thread to exit. --- actix-server/src/accept.rs | 6 +++--- actix-server/src/server.rs | 14 ++++++++++++-- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 9f7872f8..a1c4f732 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -41,7 +41,7 @@ impl Accept { pub(crate) fn start( sockets: Vec<(usize, MioListener)>, builder: &ServerBuilder, - ) -> io::Result<(WakerQueue, Vec)> { + ) -> io::Result<(WakerQueue, Vec, thread::JoinHandle<()>)> { let handle_server = ServerHandle::new(builder.cmd_tx.clone()); // construct poll instance and its waker @@ -73,12 +73,12 @@ impl Accept { handle_server, )?; - thread::Builder::new() + let accept_handle = thread::Builder::new() .name("actix-server acceptor".to_owned()) .spawn(move || accept.poll_with(&mut sockets)) .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; - Ok((waker_queue, handles_server)) + Ok((waker_queue, handles_server, accept_handle)) } fn new_with_sockets( diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index fec3b06e..1dd0c597 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -3,6 +3,7 @@ use std::{ io, mem, pin::Pin, task::{Context, Poll}, + thread, time::Duration, }; @@ -158,6 +159,7 @@ impl Future for Server { pub struct ServerInner { worker_handles: Vec, + accept_handle: Option>, worker_config: ServerWorkerConfig, services: Vec>, waker_queue: WakerQueue, @@ -205,7 +207,7 @@ impl ServerInner { ); } - let (waker_queue, worker_handles) = Accept::start(sockets, &builder)?; + let (waker_queue, worker_handles, accept_handle) = Accept::start(sockets, &builder)?; let mux = ServerEventMultiplexer { signal_fut: (builder.listen_os_signals).then(Signals::new), @@ -214,6 +216,7 @@ impl ServerInner { let server = ServerInner { waker_queue, + accept_handle: Some(accept_handle), worker_handles, worker_config: builder.worker_config, services: builder.factories, @@ -243,7 +246,7 @@ impl ServerInner { } => { self.stopping = true; - // stop accept thread + // signal accept thread to stop. This signal is non-blocking with no guarantee for immediate stop. self.waker_queue.wake(WakerInterest::Stop); // send stop signal to workers @@ -258,6 +261,13 @@ impl ServerInner { let _ = join_all(workers_stop).await; } + // wait for accept thread stop. + self.accept_handle + .take() + .unwrap() + .join() + .expect("Accept thread must not panic in any case"); + if let Some(tx) = completion { let _ = tx.send(()); }