From d4808acee1b3d39f4bfcdcf06518595ae66c0bfe Mon Sep 17 00:00:00 2001
From: Nikolay Kim <fafhrd91@gmail.com>
Date: Fri, 7 Sep 2018 13:06:51 -0700
Subject: [PATCH] implement worker availability system

---
 src/lib.rs            |  1 +
 src/server.rs         | 74 ++--------------------------------
 src/server_service.rs | 58 +++++++++++++++------------
 src/worker.rs         | 92 ++++++++++++++++++++++++++++++++++++-------
 4 files changed, 115 insertions(+), 110 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index 2f694c7d..60533ff3 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -26,6 +26,7 @@ extern crate tokio_timer;
 extern crate tower_service;
 extern crate trust_dns_resolver;
 
+#[allow(unused_imports)]
 #[macro_use]
 extern crate actix;
 
diff --git a/src/server.rs b/src/server.rs
index c61a3f56..c8041005 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1,7 +1,3 @@
-use std::sync::{
-    atomic::{AtomicUsize, Ordering},
-    Arc,
-};
 use std::time::Duration;
 use std::{fmt, io, mem, net};
 
@@ -18,7 +14,7 @@ use actix::{
 
 use super::accept::{AcceptLoop, AcceptNotify, Command};
 use super::server_service::{self, ServerNewService, ServerServiceFactory};
-use super::worker::{Conn, StopWorker, Worker, WorkerClient};
+use super::worker::{Conn, StopWorker, Worker, WorkerAvailability, WorkerClient};
 use super::NewService;
 use super::{PauseServer, ResumeServer, StopServer, Token};
 
@@ -258,14 +254,14 @@ impl Server {
 
     fn start_worker(&self, idx: usize, notify: AcceptNotify) -> (Addr<Worker>, WorkerClient) {
         let (tx, rx) = unbounded::<Conn>();
-        let conns = Connections::new(notify, 0, 0);
-        let worker = WorkerClient::new(idx, tx, conns.clone());
+        let avail = WorkerAvailability::new(notify);
+        let worker = WorkerClient::new(idx, tx, avail.clone());
         let services: Vec<Box<ServerServiceFactory + Send>> =
             self.services.iter().map(|v| v.clone_factory()).collect();
 
         let addr = Arbiter::start(move |ctx: &mut Context<_>| {
             ctx.add_message_stream(rx);
-            Worker::new(ctx, services)
+            Worker::new(ctx, services, avail)
         });
 
         (addr, worker)
@@ -413,68 +409,6 @@ impl StreamHandler<ServerCommand, ()> for Server {
     }
 }
 
-#[derive(Clone, Default)]
-/// Contains information about connection.
-pub struct Connections(Arc<ConnectionsInner>);
-
-impl Connections {
-    fn new(notify: AcceptNotify, maxconn: usize, maxconnrate: usize) -> Self {
-        let maxconn_low = if maxconn > 10 { maxconn - 10 } else { 0 };
-        let maxconnrate_low = if maxconnrate > 10 {
-            maxconnrate - 10
-        } else {
-            0
-        };
-
-        Connections(Arc::new(ConnectionsInner {
-            notify,
-            maxconn,
-            maxconnrate,
-            maxconn_low,
-            maxconnrate_low,
-            conn: AtomicUsize::new(0),
-            connrate: AtomicUsize::new(0),
-        }))
-    }
-
-    pub(crate) fn available(&self) -> bool {
-        self.0.available()
-    }
-}
-
-#[derive(Default)]
-struct ConnectionsInner {
-    notify: AcceptNotify,
-    conn: AtomicUsize,
-    connrate: AtomicUsize,
-    maxconn: usize,
-    maxconnrate: usize,
-    maxconn_low: usize,
-    maxconnrate_low: usize,
-}
-
-impl ConnectionsInner {
-    fn available(&self) -> bool {
-        if self.maxconnrate <= self.connrate.load(Ordering::Relaxed) {
-            false
-        } else {
-            self.maxconn > self.conn.load(Ordering::Relaxed)
-        }
-    }
-
-    fn notify_maxconn(&self, maxconn: usize) {
-        if maxconn > self.maxconn_low && maxconn <= self.maxconn {
-            self.notify.notify();
-        }
-    }
-
-    fn notify_maxconnrate(&self, connrate: usize) {
-        if connrate > self.maxconnrate_low && connrate <= self.maxconnrate {
-            self.notify.notify();
-        }
-    }
-}
-
 fn bind_addr<S: net::ToSocketAddrs>(addr: S) -> io::Result<Vec<net::TcpListener>> {
     let mut err = None;
     let mut succ = false;
diff --git a/src/server_service.rs b/src/server_service.rs
index e6292f64..6debdbdb 100644
--- a/src/server_service.rs
+++ b/src/server_service.rs
@@ -3,16 +3,23 @@ use std::rc::Rc;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::{fmt, net};
 
+use futures::future::{err, ok};
 use futures::task::AtomicTask;
-use futures::{future, Async, Future, Poll};
+use futures::{Async, Future, Poll};
 use tokio_reactor::Handle;
 use tokio_tcp::TcpStream;
 
 use super::{NewService, Service};
 
+pub enum ServerMessage {
+    Connect(net::TcpStream),
+    Shutdown,
+    ForceShutdown,
+}
+
 pub(crate) type BoxedServerService = Box<
     Service<
-        Request = net::TcpStream,
+        Request = ServerMessage,
         Response = (),
         Error = (),
         Future = Box<Future<Item = (), Error = ()>>,
@@ -59,7 +66,7 @@ where
     T::Future: 'static,
     T::Error: fmt::Display + 'static,
 {
-    type Request = net::TcpStream;
+    type Request = ServerMessage;
     type Response = ();
     type Error = ();
     type Future = Box<Future<Item = (), Error = ()>>;
@@ -72,22 +79,27 @@ where
         }
     }
 
-    fn call(&mut self, stream: net::TcpStream) -> Self::Future {
-        let stream = TcpStream::from_std(stream, &Handle::default()).map_err(|e| {
-            error!("Can not convert to an async tcp stream: {}", e);
-        });
+    fn call(&mut self, req: ServerMessage) -> Self::Future {
+        match req {
+            ServerMessage::Connect(stream) => {
+                let stream = TcpStream::from_std(stream, &Handle::default()).map_err(|e| {
+                    error!("Can not convert to an async tcp stream: {}", e);
+                });
 
-        if let Ok(stream) = stream {
-            let guard = self.counter.get();
+                if let Ok(stream) = stream {
+                    let guard = self.counter.get();
 
-            Box::new(
-                self.service
-                    .call(stream)
-                    .map_err(|_| ())
-                    .map(move |_| drop(guard)),
-            )
-        } else {
-            Box::new(future::err(()))
+                    Box::new(
+                        self.service
+                            .call(stream)
+                            .map_err(|_| ())
+                            .map(move |_| drop(guard)),
+                    )
+                } else {
+                    Box::new(err(()))
+                }
+            }
+            _ => Box::new(ok(())),
         }
     }
 }
@@ -133,14 +145,10 @@ where
     }
 
     fn create(&self) -> Box<Future<Item = BoxedServerService, Error = ()>> {
-        Box::new(
-            (self.inner)()
-                .new_service()
-                .map(move |inner| {
-                    let service: BoxedServerService = Box::new(ServerService::new(inner));
-                    service
-                }),
-        )
+        Box::new((self.inner)().new_service().map(move |inner| {
+            let service: BoxedServerService = Box::new(ServerService::new(inner));
+            service
+        }))
     }
 }
 
diff --git a/src/worker.rs b/src/worker.rs
index f88f4db8..9cc054f3 100644
--- a/src/worker.rs
+++ b/src/worker.rs
@@ -1,8 +1,10 @@
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
 use std::{net, time};
 
 use futures::sync::mpsc::{SendError, UnboundedSender};
 use futures::sync::oneshot;
-use futures::{future, Future};
+use futures::{future, Async, Future, Poll};
 
 use actix::msgs::StopArbiter;
 use actix::{
@@ -10,8 +12,9 @@ use actix::{
     Response, WrapFuture,
 };
 
-use super::server_service::{self, BoxedServerService, ServerServiceFactory};
-use super::{server::Connections, Token};
+use super::accept::AcceptNotify;
+use super::server_service::{self, BoxedServerService, ServerMessage, ServerServiceFactory};
+use super::Token;
 
 #[derive(Message)]
 pub(crate) struct Conn {
@@ -25,12 +28,12 @@ pub(crate) struct Conn {
 pub(crate) struct WorkerClient {
     pub idx: usize,
     tx: UnboundedSender<Conn>,
-    conns: Connections,
+    avail: WorkerAvailability,
 }
 
 impl WorkerClient {
-    pub fn new(idx: usize, tx: UnboundedSender<Conn>, conns: Connections) -> Self {
-        WorkerClient { idx, tx, conns }
+    pub fn new(idx: usize, tx: UnboundedSender<Conn>, avail: WorkerAvailability) -> Self {
+        WorkerClient { idx, tx, avail }
     }
 
     pub fn send(&self, msg: Conn) -> Result<(), SendError<Conn>> {
@@ -38,7 +41,33 @@ impl WorkerClient {
     }
 
     pub fn available(&self) -> bool {
-        self.conns.available()
+        self.avail.available()
+    }
+}
+
+#[derive(Clone)]
+pub(crate) struct WorkerAvailability {
+    notify: AcceptNotify,
+    available: Arc<AtomicBool>,
+}
+
+impl WorkerAvailability {
+    pub fn new(notify: AcceptNotify) -> Self {
+        WorkerAvailability {
+            notify,
+            available: Arc::new(AtomicBool::new(false)),
+        }
+    }
+
+    pub fn available(&self) -> bool {
+        self.available.load(Ordering::Acquire)
+    }
+
+    pub fn set(&self, val: bool) {
+        let old = self.available.swap(val, Ordering::Release);
+        if !old && val {
+            self.notify.notify()
+        }
     }
 }
 
@@ -57,9 +86,8 @@ impl Message for StopWorker {
 /// Worker accepts Socket objects via unbounded channel and start requests
 /// processing.
 pub(crate) struct Worker {
-    // conns: Connections,
     services: Vec<BoxedServerService>,
-    // counters: Vec<Arc<AtomicUsize>>,
+    availability: WorkerAvailability,
 }
 
 impl Actor for Worker {
@@ -69,10 +97,11 @@ impl Actor for Worker {
 impl Worker {
     pub(crate) fn new(
         ctx: &mut Context<Self>, services: Vec<Box<ServerServiceFactory + Send>>,
+        availability: WorkerAvailability,
     ) -> Self {
         let wrk = Worker {
+            availability,
             services: Vec::new(),
-            // counters: services.iter().map(|i| i.counter()).collect(),
         };
 
         ctx.wait(
@@ -82,8 +111,10 @@ impl Worker {
                     error!("Can not start worker: {:?}", e);
                     Arbiter::current().do_send(StopArbiter(0));
                     ctx.stop();
-                }).and_then(|services, act, _| {
+                }).and_then(|services, act, ctx| {
                     act.services.extend(services);
+                    act.availability.set(true);
+                    ctx.spawn(CheckReadiness(true));
                     fut::ok(())
                 }),
         );
@@ -91,12 +122,20 @@ impl Worker {
         wrk
     }
 
-    fn shutdown(&self, _force: bool) {
-        // self.services.iter().for_each(|h| h.shutdown(force));
+    fn shutdown(&mut self, force: bool) {
+        if force {
+            self.services.iter_mut().for_each(|h| {
+                h.call(ServerMessage::ForceShutdown);
+            });
+        } else {
+            self.services.iter_mut().for_each(|h| {
+                h.call(ServerMessage::Shutdown);
+            });
+        }
     }
 
     fn shutdown_timeout(
-        &self, ctx: &mut Context<Worker>, tx: oneshot::Sender<bool>, dur: time::Duration,
+        &mut self, ctx: &mut Context<Worker>, tx: oneshot::Sender<bool>, dur: time::Duration,
     ) {
         // sleep for 1 second and then check again
         ctx.run_later(time::Duration::new(1, 0), move |slf, ctx| {
@@ -120,7 +159,7 @@ impl Handler<Conn> for Worker {
     type Result = ();
 
     fn handle(&mut self, msg: Conn, _: &mut Context<Self>) {
-        Arbiter::spawn(self.services[msg.handler.0].call(msg.io))
+        Arbiter::spawn(self.services[msg.handler.0].call(ServerMessage::Connect(msg.io)))
     }
 }
 
@@ -151,3 +190,26 @@ impl Handler<StopWorker> for Worker {
         }
     }
 }
+
+struct CheckReadiness(bool);
+
+impl ActorFuture for CheckReadiness {
+    type Item = ();
+    type Error = ();
+    type Actor = Worker;
+
+    fn poll(&mut self, act: &mut Worker, _: &mut Context<Worker>) -> Poll<(), ()> {
+        let mut val = true;
+        for service in &mut act.services {
+            if let Ok(Async::NotReady) = service.poll_ready() {
+                val = false;
+                break;
+            }
+        }
+        if self.0 != val {
+            self.0 = val;
+            act.availability.set(val);
+        }
+        Ok(Async::NotReady)
+    }
+}