From 7804ed12ebe2b04861ecba792dc60299fc6ded6d Mon Sep 17 00:00:00 2001
From: fakeshadow <24548779@qq.com>
Date: Wed, 2 Mar 2022 11:52:12 +0800
Subject: [PATCH] block and wait for accept thread to exit. (#443)

Co-authored-by: Rob Ede <robjtede@icloud.com>
---
 actix-server/CHANGES.md    |  3 +++
 actix-server/src/accept.rs |  6 +++---
 actix-server/src/server.rs | 15 +++++++++++++--
 3 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md
index 14e5f4d7..daec4413 100644
--- a/actix-server/CHANGES.md
+++ b/actix-server/CHANGES.md
@@ -1,6 +1,9 @@
 # Changes
 
 ## Unreleased - 2021-xx-xx
+- Wait for accept thread to stop before sending completion signal. [#443]
+
+[#443]: https://github.com/actix/actix-net/pull/443
 
 
 ## 2.0.0 - 2022-01-19
diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs
index 9f7872f8..a1c4f732 100644
--- a/actix-server/src/accept.rs
+++ b/actix-server/src/accept.rs
@@ -41,7 +41,7 @@ impl Accept {
     pub(crate) fn start(
         sockets: Vec<(usize, MioListener)>,
         builder: &ServerBuilder,
-    ) -> io::Result<(WakerQueue, Vec<WorkerHandleServer>)> {
+    ) -> io::Result<(WakerQueue, Vec<WorkerHandleServer>, thread::JoinHandle<()>)> {
         let handle_server = ServerHandle::new(builder.cmd_tx.clone());
 
         // construct poll instance and its waker
@@ -73,12 +73,12 @@ impl Accept {
             handle_server,
         )?;
 
-        thread::Builder::new()
+        let accept_handle = thread::Builder::new()
             .name("actix-server acceptor".to_owned())
             .spawn(move || accept.poll_with(&mut sockets))
             .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
 
-        Ok((waker_queue, handles_server))
+        Ok((waker_queue, handles_server, accept_handle))
     }
 
     fn new_with_sockets(
diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs
index fec3b06e..8defa543 100644
--- a/actix-server/src/server.rs
+++ b/actix-server/src/server.rs
@@ -3,6 +3,7 @@ use std::{
     io, mem,
     pin::Pin,
     task::{Context, Poll},
+    thread,
     time::Duration,
 };
 
@@ -158,6 +159,7 @@ impl Future for Server {
 
 pub struct ServerInner {
     worker_handles: Vec<WorkerHandleServer>,
+    accept_handle: Option<thread::JoinHandle<()>>,
     worker_config: ServerWorkerConfig,
     services: Vec<Box<dyn InternalServiceFactory>>,
     waker_queue: WakerQueue,
@@ -205,7 +207,7 @@ impl ServerInner {
             );
         }
 
-        let (waker_queue, worker_handles) = Accept::start(sockets, &builder)?;
+        let (waker_queue, worker_handles, accept_handle) = Accept::start(sockets, &builder)?;
 
         let mux = ServerEventMultiplexer {
             signal_fut: (builder.listen_os_signals).then(Signals::new),
@@ -214,6 +216,7 @@ impl ServerInner {
 
         let server = ServerInner {
             waker_queue,
+            accept_handle: Some(accept_handle),
             worker_handles,
             worker_config: builder.worker_config,
             services: builder.factories,
@@ -243,7 +246,8 @@ impl ServerInner {
             } => {
                 self.stopping = true;
 
-                // stop accept thread
+                // Signal accept thread to stop.
+                // Signal is non-blocking; we wait for thread to stop later.
                 self.waker_queue.wake(WakerInterest::Stop);
 
                 // send stop signal to workers
@@ -258,6 +262,13 @@ impl ServerInner {
                     let _ = join_all(workers_stop).await;
                 }
 
+                // wait for accept thread stop
+                self.accept_handle
+                    .take()
+                    .unwrap()
+                    .join()
+                    .expect("Accept thread must not panic in any case");
+
                 if let Some(tx) = completion {
                     let _ = tx.send(());
                 }