From 0a11cf5cba713d56b449658117a41b9c68e714ce Mon Sep 17 00:00:00 2001
From: fakeshadow <24548779@qq.com>
Date: Fri, 9 Apr 2021 17:03:28 -0700
Subject: [PATCH] Separate WorkerHandle to two parts (#323)

---
 actix-server/src/accept.rs      | 10 ++--
 actix-server/src/builder.rs     | 27 +++++++----
 actix-server/src/waker_queue.rs |  6 +--
 actix-server/src/worker.rs      | 86 ++++++++++++++++++---------------
 4 files changed, 74 insertions(+), 55 deletions(-)

diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs
index 5b6345cc..5b9f99c7 100644
--- a/actix-server/src/accept.rs
+++ b/actix-server/src/accept.rs
@@ -12,7 +12,7 @@ use slab::Slab;
 use crate::server::Server;
 use crate::socket::{MioListener, SocketAddr};
 use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
-use crate::worker::{Conn, WorkerHandle};
+use crate::worker::{Conn, WorkerHandleAccept};
 use crate::Token;
 
 struct ServerSocketInfo {
@@ -66,7 +66,7 @@ impl AcceptLoop {
     pub(crate) fn start(
         &mut self,
         socks: Vec<(Token, MioListener)>,
-        handles: Vec<WorkerHandle>,
+        handles: Vec<WorkerHandleAccept>,
     ) {
         let srv = self.srv.take().expect("Can not re-use AcceptInfo");
         let poll = self.poll.take().unwrap();
@@ -80,7 +80,7 @@ impl AcceptLoop {
 struct Accept {
     poll: Poll,
     waker: WakerQueue,
-    handles: Vec<WorkerHandle>,
+    handles: Vec<WorkerHandleAccept>,
     srv: Server,
     next: usize,
     backpressure: bool,
@@ -105,7 +105,7 @@ impl Accept {
         waker: WakerQueue,
         socks: Vec<(Token, MioListener)>,
         srv: Server,
-        handles: Vec<WorkerHandle>,
+        handles: Vec<WorkerHandleAccept>,
     ) {
         // Accept runs in its own thread and would want to spawn additional futures to current
         // actix system.
@@ -125,7 +125,7 @@ impl Accept {
         poll: Poll,
         waker: WakerQueue,
         socks: Vec<(Token, MioListener)>,
-        handles: Vec<WorkerHandle>,
+        handles: Vec<WorkerHandleAccept>,
         srv: Server,
     ) -> (Accept, Slab<ServerSocketInfo>) {
         let mut sockets = Slab::new();
diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs
index fdb02205..6019ff16 100644
--- a/actix-server/src/builder.rs
+++ b/actix-server/src/builder.rs
@@ -19,7 +19,10 @@ use crate::signals::{Signal, Signals};
 use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
 use crate::socket::{MioTcpListener, MioTcpSocket};
 use crate::waker_queue::{WakerInterest, WakerQueue};
-use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandle};
+use crate::worker::{
+    ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandleAccept,
+    WorkerHandleServer,
+};
 use crate::{join_all, Token};
 
 /// Server builder
@@ -27,7 +30,7 @@ pub struct ServerBuilder {
     threads: usize,
     token: Token,
     backlog: u32,
-    handles: Vec<(usize, WorkerHandle)>,
+    handles: Vec<(usize, WorkerHandleServer)>,
     services: Vec<Box<dyn InternalServiceFactory>>,
     sockets: Vec<(Token, String, MioListener)>,
     accept: AcceptLoop,
@@ -280,10 +283,11 @@ impl ServerBuilder {
             // start workers
             let handles = (0..self.threads)
                 .map(|idx| {
-                    let handle = self.start_worker(idx, self.accept.waker_owned());
-                    self.handles.push((idx, handle.clone()));
+                    let (handle_accept, handle_server) =
+                        self.start_worker(idx, self.accept.waker_owned());
+                    self.handles.push((idx, handle_server));
 
-                    handle
+                    handle_accept
                 })
                 .collect();
 
@@ -311,7 +315,11 @@ impl ServerBuilder {
         }
     }
 
-    fn start_worker(&self, idx: usize, waker: WakerQueue) -> WorkerHandle {
+    fn start_worker(
+        &self,
+        idx: usize,
+        waker: WakerQueue,
+    ) -> (WorkerHandleAccept, WorkerHandleServer) {
         let avail = WorkerAvailability::new(waker);
         let services = self.services.iter().map(|v| v.clone_factory()).collect();
 
@@ -437,9 +445,10 @@ impl ServerBuilder {
                         break;
                     }
 
-                    let handle = self.start_worker(new_idx, self.accept.waker_owned());
-                    self.handles.push((new_idx, handle.clone()));
-                    self.accept.wake(WakerInterest::Worker(handle));
+                    let (handle_accept, handle_server) =
+                        self.start_worker(new_idx, self.accept.waker_owned());
+                    self.handles.push((new_idx, handle_server));
+                    self.accept.wake(WakerInterest::Worker(handle_accept));
                 }
             }
         }
diff --git a/actix-server/src/waker_queue.rs b/actix-server/src/waker_queue.rs
index e38a9782..8aa493aa 100644
--- a/actix-server/src/waker_queue.rs
+++ b/actix-server/src/waker_queue.rs
@@ -6,7 +6,7 @@ use std::{
 
 use mio::{Registry, Token as MioToken, Waker};
 
-use crate::worker::WorkerHandle;
+use crate::worker::WorkerHandleAccept;
 
 /// Waker token for `mio::Poll` instance.
 pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX);
@@ -84,6 +84,6 @@ pub(crate) enum WakerInterest {
     Timer,
     /// `Worker` is an interest happen after a worker runs into faulted state(This is determined
     /// by if work can be sent to it successfully).`Accept` would be waked up and add the new
-    /// `WorkerHandle`.
-    Worker(WorkerHandle),
+    /// `WorkerHandleAccept`.
+    Worker(WorkerHandleAccept),
 }
diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs
index c3074646..8e122623 100644
--- a/actix-server/src/worker.rs
+++ b/actix-server/src/worker.rs
@@ -28,11 +28,9 @@ use crate::socket::MioStream;
 use crate::waker_queue::{WakerInterest, WakerQueue};
 use crate::{join_all, Token};
 
-pub(crate) struct WorkerCommand(Conn);
-
-/// Stop worker message. Returns `true` on successful shutdown
-/// and `false` if some connections still alive.
-pub(crate) struct StopCommand {
+/// Stop worker message. Returns `true` on successful graceful shutdown.
+/// and `false` if some connections still alive when shutdown execute.
+pub(crate) struct Stop {
     graceful: bool,
     tx: oneshot::Sender<bool>,
 }
@@ -43,42 +41,55 @@ pub(crate) struct Conn {
     pub token: Token,
 }
 
-// a handle to worker that can send message to worker and share the availability of worker to other
-// thread.
-#[derive(Clone)]
-pub(crate) struct WorkerHandle {
+fn handle_pair(
+    idx: usize,
+    tx1: UnboundedSender<Conn>,
+    tx2: UnboundedSender<Stop>,
+    avail: WorkerAvailability,
+) -> (WorkerHandleAccept, WorkerHandleServer) {
+    let accept = WorkerHandleAccept {
+        idx,
+        tx: tx1,
+        avail,
+    };
+
+    let server = WorkerHandleServer { idx, tx: tx2 };
+
+    (accept, server)
+}
+
+/// Handle to worker that can send connection message to worker and share the
+/// availability of worker to other thread.
+///
+/// Held by [Accept](crate::accept::Accept).
+pub(crate) struct WorkerHandleAccept {
     pub idx: usize,
-    tx1: UnboundedSender<WorkerCommand>,
-    tx2: UnboundedSender<StopCommand>,
+    tx: UnboundedSender<Conn>,
     avail: WorkerAvailability,
 }
 
-impl WorkerHandle {
-    pub fn new(
-        idx: usize,
-        tx1: UnboundedSender<WorkerCommand>,
-        tx2: UnboundedSender<StopCommand>,
-        avail: WorkerAvailability,
-    ) -> Self {
-        WorkerHandle {
-            idx,
-            tx1,
-            tx2,
-            avail,
-        }
+impl WorkerHandleAccept {
+    pub(crate) fn send(&self, msg: Conn) -> Result<(), Conn> {
+        self.tx.send(msg).map_err(|msg| msg.0)
     }
 
-    pub fn send(&self, msg: Conn) -> Result<(), Conn> {
-        self.tx1.send(WorkerCommand(msg)).map_err(|msg| msg.0 .0)
-    }
-
-    pub fn available(&self) -> bool {
+    pub(crate) fn available(&self) -> bool {
         self.avail.available()
     }
+}
 
-    pub fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
+/// Handle to worker than can send stop message to worker.
+///
+/// Held by [ServerBuilder](crate::builder::ServerBuilder).
+pub(crate) struct WorkerHandleServer {
+    pub idx: usize,
+    tx: UnboundedSender<Stop>,
+}
+
+impl WorkerHandleServer {
+    pub(crate) fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
         let (tx, rx) = oneshot::channel();
-        let _ = self.tx2.send(StopCommand { graceful, tx });
+        let _ = self.tx.send(Stop { graceful, tx });
         rx
     }
 }
@@ -114,8 +125,8 @@ impl WorkerAvailability {
 ///
 /// Worker accepts Socket objects via unbounded channel and starts stream processing.
 pub(crate) struct ServerWorker {
-    rx: UnboundedReceiver<WorkerCommand>,
-    rx2: UnboundedReceiver<StopCommand>,
+    rx: UnboundedReceiver<Conn>,
+    rx2: UnboundedReceiver<Stop>,
     services: Vec<WorkerService>,
     availability: WorkerAvailability,
     conns: Counter,
@@ -187,7 +198,7 @@ impl ServerWorker {
         factories: Vec<Box<dyn InternalServiceFactory>>,
         availability: WorkerAvailability,
         config: ServerWorkerConfig,
-    ) -> WorkerHandle {
+    ) -> (WorkerHandleAccept, WorkerHandleServer) {
         let (tx1, rx) = unbounded_channel();
         let (tx2, rx2) = unbounded_channel();
         let avail = availability.clone();
@@ -254,7 +265,7 @@ impl ServerWorker {
             });
         });
 
-        WorkerHandle::new(idx, tx1, tx2, avail)
+        handle_pair(idx, tx1, tx2, avail)
     }
 
     fn restart_service(&mut self, token: Token, factory_id: usize) {
@@ -360,8 +371,7 @@ impl Future for ServerWorker {
         let this = self.as_mut().get_mut();
 
         // `StopWorker` message handler
-        if let Poll::Ready(Some(StopCommand { graceful, tx })) =
-            Pin::new(&mut this.rx2).poll_recv(cx)
+        if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx)
         {
             this.availability.set(false);
             let num = this.conns.total();
@@ -472,7 +482,7 @@ impl Future for ServerWorker {
 
                 match ready!(Pin::new(&mut this.rx).poll_recv(cx)) {
                     // handle incoming io stream
-                    Some(WorkerCommand(msg)) => {
+                    Some(msg) => {
                         let guard = this.conns.get();
                         let _ = this.services[msg.token.0].service.call((guard, msg.io));
                     }