From 0e3d1068da34b499feb4d7542d7db5647149e1ec Mon Sep 17 00:00:00 2001
From: Nikolay Kim <fafhrd91@gmail.com>
Date: Thu, 1 Nov 2018 15:33:35 -0700
Subject: [PATCH] separate stop worker channel

---
 src/server/server.rs |   8 ++--
 src/server/worker.rs | 107 ++++++++++++++++++++++++-------------------
 2 files changed, 64 insertions(+), 51 deletions(-)

diff --git a/src/server/server.rs b/src/server/server.rs
index 09e795d0..834fa747 100644
--- a/src/server/server.rs
+++ b/src/server/server.rs
@@ -249,15 +249,16 @@ impl Server {
     }
 
     fn start_worker(&self, idx: usize, notify: AcceptNotify) -> WorkerClient {
-        let (tx, rx) = unbounded();
+        let (tx1, rx1) = unbounded();
+        let (tx2, rx2) = unbounded();
         let timeout = self.shutdown_timeout;
         let avail = WorkerAvailability::new(notify);
-        let worker = WorkerClient::new(idx, tx, avail.clone());
+        let worker = WorkerClient::new(idx, tx1, tx2, avail.clone());
         let services: Vec<Box<InternalServiceFactory>> =
             self.services.iter().map(|v| v.clone_factory()).collect();
 
         Arbiter::new(format!("actix-net-worker-{}", idx)).do_send(Execute::new(move || {
-            Worker::start(rx, services, avail, timeout.clone());
+            Worker::start(rx1, rx2, services, avail, timeout.clone());
             Ok::<_, ()>(())
         }));
 
@@ -317,6 +318,7 @@ impl Handler<StopServer> for Server {
     type Result = Response<(), ()>;
 
     fn handle(&mut self, msg: StopServer, ctx: &mut Context<Self>) -> Self::Result {
+        println!("STOP command");
         // stop accept thread
         self.accept.send(Command::Stop);
 
diff --git a/src/server/worker.rs b/src/server/worker.rs
index 68e9cbf7..e9d8f8c8 100644
--- a/src/server/worker.rs
+++ b/src/server/worker.rs
@@ -16,11 +16,13 @@ use super::services::{BoxedServerService, InternalServiceFactory, ServerMessage}
 use super::Token;
 use counter::Counter;
 
-pub(crate) enum WorkerCommand {
-    Message(Conn),
-    /// Stop worker message. Returns `true` on successful shutdown
-    /// and `false` if some connections still alive.
-    Stop(bool, oneshot::Sender<bool>),
+pub(crate) struct WorkerCommand(Conn);
+
+/// Stop worker message. Returns `true` on successful shutdown
+/// and `false` if some connections still alive.
+pub(crate) struct StopCommand {
+    graceful: bool,
+    result: oneshot::Sender<bool>,
 }
 
 #[derive(Debug, Message)]
@@ -55,26 +57,30 @@ thread_local! {
 #[derive(Clone)]
 pub(crate) struct WorkerClient {
     pub idx: usize,
-    tx: UnboundedSender<WorkerCommand>,
+    tx1: UnboundedSender<WorkerCommand>,
+    tx2: UnboundedSender<StopCommand>,
     avail: WorkerAvailability,
 }
 
 impl WorkerClient {
     pub fn new(
         idx: usize,
-        tx: UnboundedSender<WorkerCommand>,
+        tx1: UnboundedSender<WorkerCommand>,
+        tx2: UnboundedSender<StopCommand>,
         avail: WorkerAvailability,
     ) -> Self {
-        WorkerClient { idx, tx, avail }
+        WorkerClient {
+            idx,
+            tx1,
+            tx2,
+            avail,
+        }
     }
 
     pub fn send(&self, msg: Conn) -> Result<(), Conn> {
-        self.tx
-            .unbounded_send(WorkerCommand::Message(msg))
-            .map_err(|e| match e.into_inner() {
-                WorkerCommand::Message(msg) => msg,
-                _ => panic!(),
-            })
+        self.tx1
+            .unbounded_send(WorkerCommand(msg))
+            .map_err(|msg| msg.into_inner().0)
     }
 
     pub fn available(&self) -> bool {
@@ -82,8 +88,8 @@ impl WorkerClient {
     }
 
     pub fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
-        let (tx, rx) = oneshot::channel();
-        let _ = self.tx.unbounded_send(WorkerCommand::Stop(graceful, tx));
+        let (result, rx) = oneshot::channel();
+        let _ = self.tx2.unbounded_send(StopCommand { graceful, result });
         rx
     }
 }
@@ -120,6 +126,7 @@ impl WorkerAvailability {
 /// processing.
 pub(crate) struct Worker {
     rx: UnboundedReceiver<WorkerCommand>,
+    rx2: UnboundedReceiver<StopCommand>,
     services: Vec<BoxedServerService>,
     availability: WorkerAvailability,
     conns: Counter,
@@ -131,6 +138,7 @@ pub(crate) struct Worker {
 impl Worker {
     pub(crate) fn start(
         rx: UnboundedReceiver<WorkerCommand>,
+        rx2: UnboundedReceiver<StopCommand>,
         factories: Vec<Box<InternalServiceFactory>>,
         availability: WorkerAvailability,
         shutdown_timeout: time::Duration,
@@ -138,6 +146,7 @@ impl Worker {
         availability.set(false);
         let mut wrk = MAX_CONNS_COUNTER.with(|conns| Worker {
             rx,
+            rx2,
             availability,
             factories,
             shutdown_timeout,
@@ -216,6 +225,39 @@ impl Future for Worker {
     type Error = ();
 
     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
+        // `StopWorker` message handler
+        match self.rx2.poll() {
+            Ok(Async::Ready(Some(StopCommand { graceful, result }))) => {
+                self.availability.set(false);
+                let num = num_connections();
+                if num == 0 {
+                    info!("Shutting down worker, 0 connections");
+                    let _ = result.send(true);
+                    return Ok(Async::Ready(()));
+                } else if graceful {
+                    self.shutdown(false);
+                    let num = num_connections();
+                    if num != 0 {
+                        info!("Graceful worker shutdown, {} connections", num);
+                        self.state = WorkerState::Shutdown(
+                            sleep(time::Duration::from_secs(1)),
+                            sleep(self.shutdown_timeout),
+                            result,
+                        );
+                    } else {
+                        let _ = result.send(true);
+                        return Ok(Async::Ready(()));
+                    }
+                } else {
+                    info!("Force shutdown worker, {} connections", num);
+                    self.shutdown(true);
+                    let _ = result.send(false);
+                    return Ok(Async::Ready(()));
+                }
+            }
+            _ => (),
+        }
+
         let state = mem::replace(&mut self.state, WorkerState::None);
 
         match state {
@@ -321,7 +363,7 @@ impl Future for Worker {
                 loop {
                     match self.rx.poll() {
                         // handle incoming tcp stream
-                        Ok(Async::Ready(Some(WorkerCommand::Message(msg)))) => {
+                        Ok(Async::Ready(Some(WorkerCommand(msg)))) => {
                             match self.check_readiness(false) {
                                 Ok(true) => {
                                     let guard = self.conns.get();
@@ -348,35 +390,6 @@ impl Future for Worker {
                             }
                             return self.poll();
                         }
-                        // `StopWorker` message handler
-                        Ok(Async::Ready(Some(WorkerCommand::Stop(graceful, tx)))) => {
-                            self.availability.set(false);
-                            let num = num_connections();
-                            if num == 0 {
-                                info!("Shutting down worker, 0 connections");
-                                let _ = tx.send(true);
-                                return Ok(Async::Ready(()));
-                            } else if graceful {
-                                self.shutdown(false);
-                                let num = num_connections();
-                                if num != 0 {
-                                    info!("Graceful worker shutdown, {} connections", num);
-                                    break Some(WorkerState::Shutdown(
-                                        sleep(time::Duration::from_secs(1)),
-                                        sleep(self.shutdown_timeout),
-                                        tx,
-                                    ));
-                                } else {
-                                    let _ = tx.send(true);
-                                    return Ok(Async::Ready(()));
-                                }
-                            } else {
-                                info!("Force shutdown worker, {} connections", num);
-                                self.shutdown(true);
-                                let _ = tx.send(false);
-                                return Ok(Async::Ready(()));
-                            }
-                        }
                         Ok(Async::NotReady) => {
                             self.state = WorkerState::Available;
                             return Ok(Async::NotReady);
@@ -387,7 +400,5 @@ impl Future for Worker {
             }
             WorkerState::None => panic!(),
         };
-
-        Ok(Async::NotReady)
     }
 }