Merge branch 'master' into refactor/simplify_server_future

fakeshadow 2021-04-30 02:11:06 +08:00
commit f92d87d721
7 changed files with 338 additions and 294 deletions

View File

@ -28,7 +28,6 @@ futures-core = { version = "0.3.7", default-features = false, features = ["alloc
log = "0.4" log = "0.4"
mio = { version = "0.7.6", features = ["os-poll", "net"] } mio = { version = "0.7.6", features = ["os-poll", "net"] }
num_cpus = "1.13" num_cpus = "1.13"
slab = "0.4"
tokio = { version = "1.4", features = ["sync"] } tokio = { version = "1.4", features = ["sync"] }
[dev-dependencies] [dev-dependencies]
@ -38,4 +37,4 @@ actix-rt = "2.0.0"
bytes = "1" bytes = "1"
env_logger = "0.8" env_logger = "0.8"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
tokio = { version = "1", features = ["io-util", "macros", "rt-multi-thread"] } tokio = { version = "1.4", features = ["io-util", "macros", "rt-multi-thread"] }

View File

@ -3,28 +3,23 @@ use std::{io, thread};
use log::{error, info}; use log::{error, info};
use mio::{Interest, Poll, Token as MioToken}; use mio::{Interest, Poll, Token as MioToken};
use slab::Slab;
use crate::builder::ServerBuilder; use crate::builder::ServerBuilder;
use crate::server::ServerHandle; use crate::server::ServerHandle;
use crate::socket::MioListener; use crate::socket::MioListener;
use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
use crate::worker::{ use crate::worker::{Conn, ServerWorker, WorkerHandleAccept, WorkerHandleServer};
Conn, ServerWorker, WorkerAvailability, WorkerHandleAccept, WorkerHandleServer,
};
use crate::Token;
const DUR_ON_ERR: Duration = Duration::from_millis(500); const DUR_ON_ERR: Duration = Duration::from_millis(500);
struct ServerSocketInfo { struct ServerSocketInfo {
/// Beware this is the crate token for identify socket and should not be confused token: usize,
/// with `mio::Token`.
token: Token,
lst: MioListener, lst: MioListener,
// mark the deadline when this socket's listener should be registered again /// Timeout is used to mark the deadline when this socket's listener should be registered again
timeout_deadline: Option<Instant>, /// after an error.
timeout: Option<Instant>,
} }
/// poll instance of the server. /// poll instance of the server.
@ -35,10 +30,9 @@ pub(crate) struct Accept {
srv: ServerHandle, srv: ServerHandle,
next: usize, next: usize,
avail: Availability, avail: Availability,
backpressure: bool, // use the smallest duration from sockets timeout.
// poll time out duration.
// use the smallest duration from sockets timeout_deadline.
timeout: Option<Duration>, timeout: Option<Duration>,
paused: bool,
} }
/// Array of u128 with every bit as marker for a worker handle's availability. /// Array of u128 with every bit as marker for a worker handle's availability.
@ -52,23 +46,22 @@ impl Default for Availability {
impl Availability { impl Availability {
/// Check if any worker handle is available /// Check if any worker handle is available
#[inline(always)]
fn available(&self) -> bool { fn available(&self) -> bool {
self.0.iter().any(|a| *a != 0) self.0.iter().any(|a| *a != 0)
} }
/// Check if worker handle is available by index
#[inline(always)]
fn get_available(&self, idx: usize) -> bool {
let (offset, idx) = Self::offset(idx);
self.0[offset] & (1 << idx as u128) != 0
}
/// Set worker handle available state by index. /// Set worker handle available state by index.
fn set_available(&mut self, idx: usize, avail: bool) { fn set_available(&mut self, idx: usize, avail: bool) {
let (offset, idx) = if idx < 128 { let (offset, idx) = Self::offset(idx);
(0, idx)
} else if idx < 128 * 2 {
(1, idx - 128)
} else if idx < 128 * 3 {
(2, idx - 128 * 2)
} else if idx < 128 * 4 {
(3, idx - 128 * 3)
} else {
panic!("Max WorkerHandle count is 512")
};
let off = 1 << idx as u128; let off = 1 << idx as u128;
if avail { if avail {
@ -85,6 +78,21 @@ impl Availability {
self.set_available(handle.idx(), true); self.set_available(handle.idx(), true);
}) })
} }
/// Get offset and adjusted index of given worker handle index.
fn offset(idx: usize) -> (usize, usize) {
if idx < 128 {
(0, idx)
} else if idx < 128 * 2 {
(1, idx - 128)
} else if idx < 128 * 3 {
(2, idx - 128 * 2)
} else if idx < 128 * 4 {
(3, idx - 128 * 3)
} else {
panic!("Max WorkerHandle count is 512")
}
}
} }
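To make the bit bookkeeping above concrete, here is a minimal self-contained sketch (not the crate's exact code) of the same scheme: one bit per worker handle spread across four u128 words, so any index below 512 maps to a (word, bit) pair. The struct name and method names mirror the diff; the main() walk-through is purely illustrative.

// Sketch of the Availability bitmask bookkeeping, assuming at most 512 worker handles.
struct Availability([u128; 4]);

impl Availability {
    fn offset(idx: usize) -> (usize, usize) {
        assert!(idx < 128 * 4, "Max WorkerHandle count is 512");
        (idx / 128, idx % 128) // (word index, bit index within that word)
    }

    fn set_available(&mut self, idx: usize, avail: bool) {
        let (word, bit) = Self::offset(idx);
        let mask = 1u128 << bit;
        if avail {
            self.0[word] |= mask;
        } else {
            self.0[word] &= !mask;
        }
    }

    fn get_available(&self, idx: usize) -> bool {
        let (word, bit) = Self::offset(idx);
        self.0[word] & (1u128 << bit) != 0
    }

    fn available(&self) -> bool {
        self.0.iter().any(|w| *w != 0)
    }
}

fn main() {
    let mut avail = Availability([0; 4]);
    avail.set_available(130, true); // index 130 lands in word 1, bit 2
    assert!(avail.get_available(130));
    assert!(avail.available());
    avail.set_available(130, false);
    assert!(!avail.available());
}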
/// This function defines errors that are per-connection. Which basically /// This function defines errors that are per-connection. Which basically
@ -102,7 +110,7 @@ fn connection_error(e: &io::Error) -> bool {
impl Accept { impl Accept {
pub(crate) fn start( pub(crate) fn start(
sockets: Vec<(Token, MioListener)>, sockets: Vec<(usize, MioListener)>,
builder: &ServerBuilder, builder: &ServerBuilder,
) -> io::Result<(WakerQueue, Vec<WorkerHandleServer>)> { ) -> io::Result<(WakerQueue, Vec<WorkerHandleServer>)> {
let server_handle = ServerHandle::new(builder.cmd_tx.clone()); let server_handle = ServerHandle::new(builder.cmd_tx.clone());
@ -115,16 +123,15 @@ impl Accept {
let (handles_accept, handles_server) = (0..builder.threads) let (handles_accept, handles_server) = (0..builder.threads)
.map(|idx| { .map(|idx| {
// start workers // start workers
let availability = WorkerAvailability::new(idx, waker_queue.clone());
let factories = builder.services.iter().map(|v| v.clone_factory()).collect(); let factories = builder.services.iter().map(|v| v.clone_factory()).collect();
ServerWorker::start(idx, factories, availability, builder.worker_config) ServerWorker::start(idx, factories, waker_queue.clone(), builder.worker_config)
}) })
.collect::<Result<Vec<_>, io::Error>>()? .collect::<Result<Vec<_>, io::Error>>()?
.into_iter() .into_iter()
.unzip(); .unzip();
let (mut accept, sockets) = Accept::new_with_sockets( let (mut accept, mut sockets) = Accept::new_with_sockets(
poll, poll,
waker_queue.clone(), waker_queue.clone(),
sockets, sockets,
@ -135,7 +142,7 @@ impl Accept {
// Accept runs in its own thread. // Accept runs in its own thread.
thread::Builder::new() thread::Builder::new()
.name("actix-server acceptor".to_owned()) .name("actix-server acceptor".to_owned())
.spawn(move || accept.poll_with(sockets)) .spawn(move || accept.poll_with(&mut sockets))
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
// return waker and worker handle clones to server builder. // return waker and worker handle clones to server builder.
@ -145,25 +152,24 @@ impl Accept {
fn new_with_sockets( fn new_with_sockets(
poll: Poll, poll: Poll,
waker_queue: WakerQueue, waker_queue: WakerQueue,
socks: Vec<(Token, MioListener)>, socks: Vec<(usize, MioListener)>,
handles: Vec<WorkerHandleAccept>, handles: Vec<WorkerHandleAccept>,
srv: ServerHandle, srv: ServerHandle,
) -> io::Result<(Accept, Slab<ServerSocketInfo>)> { ) -> io::Result<(Accept, Box<[ServerSocketInfo]>)> {
let mut sockets = Slab::new(); let sockets = socks
for (hnd_token, mut lst) in socks.into_iter() { .into_iter()
let entry = sockets.vacant_entry(); .map(|(token, mut lst)| {
let token = entry.key(); // Start listening for incoming connections
poll.registry()
.register(&mut lst, MioToken(token), Interest::READABLE)?;
// Start listening for incoming connections Ok(ServerSocketInfo {
poll.registry() token,
.register(&mut lst, MioToken(token), Interest::READABLE)?; lst,
timeout: None,
entry.insert(ServerSocketInfo { })
token: hnd_token, })
lst, .collect::<Result<_, io::Error>>()?;
timeout_deadline: None,
});
}
let mut avail = Availability::default(); let mut avail = Availability::default();
@ -177,46 +183,47 @@ impl Accept {
srv, srv,
next: 0, next: 0,
avail, avail,
backpressure: false,
timeout: None, timeout: None,
paused: false,
}; };
Ok((accept, sockets)) Ok((accept, sockets))
} }
fn poll_with(&mut self, mut sockets: Slab<ServerSocketInfo>) { fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) {
let mut events = mio::Events::with_capacity(128); let mut events = mio::Events::with_capacity(128);
loop { loop {
match self.poll.poll(&mut events, self.timeout) { if let Err(e) = self.poll.poll(&mut events, None) {
Ok(_) => {} match e.kind() {
Err(e) if e.kind() == io::ErrorKind::Interrupted => {} io::ErrorKind::Interrupted => continue,
Err(e) => panic!("Poll error: {}", e), _ => panic!("Poll error: {}", e),
}
} }
for event in events.iter() { for event in events.iter() {
let token = event.token(); let token = event.token();
match token { match token {
WAKER_TOKEN => { WAKER_TOKEN => {
let should_return = self.handle_waker(&mut sockets); let exit = self.handle_waker(sockets);
if should_return { if exit {
info!("Accept is stopped.");
return; return;
} }
} }
_ => { _ => {
let token = usize::from(token); let token = usize::from(token);
self.accept(&mut sockets, token) self.accept(sockets, token);
} }
} }
} }
// check for timeout and re-register sockets. // check for timeout and re-register sockets.
self.process_timeout(&mut sockets); self.process_timeout(sockets);
} }
} }
/// Return true to notify `Accept::poll_with` to return. fn handle_waker(&mut self, sockets: &mut [ServerSocketInfo]) -> bool {
fn handle_waker(&mut self, sockets: &mut Slab<ServerSocketInfo>) -> bool {
// This is a loop because the interest handling for commands in the previous version
// was a loop that would try to drain the command channel. It's not yet known whether
// it's necessary or good practice to actively drain the waker queue.
@ -231,7 +238,10 @@ impl Accept {
drop(guard); drop(guard);
self.avail.set_available(idx, true); self.avail.set_available(idx, true);
self.maybe_backpressure(sockets, false);
if !self.paused {
self.accept_all(sockets);
}
} }
// a new worker thread is made and its handle would be added to Accept // a new worker thread is made and its handle would be added to Accept
Some(WakerInterest::Worker(handle)) => { Some(WakerInterest::Worker(handle)) => {
@ -239,55 +249,67 @@ impl Accept {
self.avail.set_available(handle.idx(), true); self.avail.set_available(handle.idx(), true);
self.handles.push(handle); self.handles.push(handle);
// maybe we want to recover from a backpressure.
self.maybe_backpressure(sockets, false); if !self.paused {
self.accept_all(sockets);
}
} }
Some(WakerInterest::Pause) => { Some(WakerInterest::Pause) => {
drop(guard); drop(guard);
self.paused = true;
self.deregister_all(sockets); self.deregister_all(sockets);
} }
Some(WakerInterest::Resume) => { Some(WakerInterest::Resume) => {
drop(guard); drop(guard);
sockets.iter_mut().for_each(|(token, info)| {
self.register_logged(token, info); self.paused = false;
sockets.iter_mut().for_each(|info| {
self.register_logged(info);
}); });
self.accept_all(sockets);
} }
Some(WakerInterest::Stop) => { Some(WakerInterest::Stop) => {
self.deregister_all(sockets); self.deregister_all(sockets);
return true; return true;
} }
// waker queue is drained // waker queue is drained
None => { None => {
// Reset the WakerQueue before break so it does not grow infinitely // Reset the WakerQueue before break so it does not grow infinitely
WakerQueue::reset(&mut guard); WakerQueue::reset(&mut guard);
return false; return false;
} }
} }
} }
} }
fn process_timeout(&mut self, sockets: &mut Slab<ServerSocketInfo>) { fn process_timeout(&mut self, sockets: &mut [ServerSocketInfo]) {
// Take old timeout as it's no use after each iteration. // Take old timeout as it's no use after each iteration.
if self.timeout.take().is_some() { if self.timeout.take().is_some() {
let now = Instant::now(); let now = Instant::now();
sockets sockets
.iter_mut() .iter_mut()
// Only sockets that had an associated timeout were deregistered. // Only sockets that had an associated timeout were deregistered.
.filter(|(_, info)| info.timeout_deadline.is_some()) .filter(|info| info.timeout.is_some())
.for_each(|(token, info)| { .for_each(|info| {
let inst = info.timeout_deadline.take().unwrap(); let inst = info.timeout.take().unwrap();
if now < inst { if now < inst {
// still timed out. try set new timeout. // still timed out. try set new timeout.
info.timeout_deadline = Some(inst); info.timeout = Some(inst);
self.set_timeout(inst - now); self.set_timeout(inst - now);
} else if !self.backpressure { } else if !self.paused {
// timeout expired register socket again. // timeout expired register socket again.
self.register_logged(token, info); self.register_logged(info);
} }
// Drop the timeout if server is in backpressure and socket timeout is expired. // Drop the timeout if server is paused and socket timeout is expired.
// When server recovers from backpressure it will register all sockets without // When server recovers from pause it will register all sockets without
// a timeout value so this socket register will be delayed till then. // a timeout value so this socket register will be delayed till then.
}); });
} }
@ -306,31 +328,31 @@ impl Accept {
} }
#[cfg(not(target_os = "windows"))] #[cfg(not(target_os = "windows"))]
fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> { fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
let token = MioToken(info.token);
self.poll self.poll
.registry() .registry()
.register(&mut info.lst, MioToken(token), Interest::READABLE) .register(&mut info.lst, token, Interest::READABLE)
} }
#[cfg(target_os = "windows")] #[cfg(target_os = "windows")]
fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> { fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
// On Windows, calling register without deregister causes an error.
// See https://github.com/actix/actix-web/issues/905
// Calling reregister seems to fix the issue.
let token = MioToken(info.token);
self.poll self.poll
.registry() .registry()
.register(&mut info.lst, mio::Token(token), Interest::READABLE) .register(&mut info.lst, token, Interest::READABLE)
.or_else(|_| { .or_else(|_| {
self.poll.registry().reregister( self.poll
&mut info.lst, .registry()
mio::Token(token), .reregister(&mut info.lst, token, Interest::READABLE)
Interest::READABLE,
)
}) })
} }
fn register_logged(&self, token: usize, info: &mut ServerSocketInfo) { fn register_logged(&self, info: &mut ServerSocketInfo) {
match self.register(token, info) { match self.register(info) {
Ok(_) => info!("Resume accepting connections on {}", info.lst.local_addr()), Ok(_) => info!("Resume accepting connections on {}", info.lst.local_addr()),
Err(e) => error!("Can not register server socket {}", e), Err(e) => error!("Can not register server socket {}", e),
} }
@ -345,7 +367,7 @@ impl Accept {
} }
} }
fn deregister_all(&self, sockets: &mut Slab<ServerSocketInfo>) { fn deregister_all(&self, sockets: &mut [ServerSocketInfo]) {
// This is a best-effort implementation with the following limitation:
//
// Every ServerSocketInfo with an associated timeout will be skipped and its timeout
@ -358,70 +380,23 @@ impl Accept {
.iter_mut() .iter_mut()
// Take all timeouts.
// This is to prevent the Accept::process_timeout method from re-registering a socket afterwards.
.map(|(_, info)| (info.timeout_deadline.take(), info)) .map(|info| (info.timeout.take(), info))
// Sockets with a timeout are already deregistered, so skip them.
.filter(|(timeout, _)| timeout.is_none()) .filter(|(timeout, _)| timeout.is_none())
.for_each(|(_, info)| self.deregister_logged(info)); .for_each(|(_, info)| self.deregister_logged(info));
} }
fn maybe_backpressure(&mut self, sockets: &mut Slab<ServerSocketInfo>, on: bool) {
// Only operate when server is in a different backpressure than the given flag.
if self.backpressure != on {
self.backpressure = on;
sockets
.iter_mut()
// Only operate on sockets without associated timeout.
// Sockets with it should be handled by `accept` and `process_timer` methods.
// They are already deregistered or need to be reregister in the future.
.filter(|(_, info)| info.timeout_deadline.is_none())
.for_each(|(token, info)| {
if on {
self.deregister_logged(info);
} else {
self.register_logged(token, info);
}
});
}
}
fn accept_one(&mut self, sockets: &mut Slab<ServerSocketInfo>, mut conn: Conn) {
if self.backpressure {
// send_connection would remove fault worker from handles.
// worst case here is conn get dropped after all handles are gone.
while let Err(c) = self.send_connection(sockets, conn) {
conn = c
}
} else {
while self.avail.available() {
let next = self.next();
let idx = next.idx();
if next.available() {
self.avail.set_available(idx, true);
match self.send_connection(sockets, conn) {
Ok(_) => return,
Err(c) => conn = c,
}
} else {
self.avail.set_available(idx, false);
self.set_next();
}
}
// Sending Conn failed due to either all workers are in error or not available.
// Enter backpressure state and try again.
self.maybe_backpressure(sockets, true);
self.accept_one(sockets, conn);
}
}
// Send connection to worker and handle error. // Send connection to worker and handle error.
fn send_connection( fn send_connection(&mut self, conn: Conn) -> Result<(), Conn> {
&mut self, let next = self.next();
sockets: &mut Slab<ServerSocketInfo>, match next.send(conn) {
conn: Conn,
) -> Result<(), Conn> {
match self.next().send(conn) {
Ok(_) => { Ok(_) => {
// Increment the connection counter of the WorkerHandle.
// Set the worker to unavailable when it hits max (inc_counter returns false).
if !next.inc_counter() {
let idx = next.idx();
self.avail.set_available(idx, false);
}
self.set_next(); self.set_next();
Ok(()) Ok(())
} }
@ -432,7 +407,6 @@ impl Accept {
if self.handles.is_empty() { if self.handles.is_empty() {
error!("No workers"); error!("No workers");
self.maybe_backpressure(sockets, true);
// All workers are gone and Conn is nowhere to be sent. // All workers are gone and Conn is nowhere to be sent.
// Treat this situation as Ok and drop Conn. // Treat this situation as Ok and drop Conn.
return Ok(()); return Ok(());
@ -445,17 +419,38 @@ impl Accept {
} }
} }
fn accept(&mut self, sockets: &mut Slab<ServerSocketInfo>, token: usize) { fn accept_one(&mut self, mut conn: Conn) {
loop { loop {
let next = self.next();
let idx = next.idx();
if self.avail.get_available(idx) {
match self.send_connection(conn) {
Ok(_) => return,
Err(c) => conn = c,
}
} else {
self.avail.set_available(idx, false);
self.set_next();
if !self.avail.available() {
while let Err(c) = self.send_connection(conn) {
conn = c;
}
return;
}
}
}
}
fn accept(&mut self, sockets: &mut [ServerSocketInfo], token: usize) {
while self.avail.available() {
let info = &mut sockets[token]; let info = &mut sockets[token];
match info.lst.accept() { match info.lst.accept() {
Ok(io) => { Ok(io) => {
let msg = Conn { let conn = Conn { io, token };
io, self.accept_one(conn);
token: info.token,
};
self.accept_one(sockets, msg);
} }
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return, Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
Err(ref e) if connection_error(e) => continue, Err(ref e) if connection_error(e) => continue,
@ -468,7 +463,7 @@ impl Accept {
// Sleep after an error: write the timeout deadline to the socket info, as the poll loop
// later needs it to mark which socket's listener should be registered again, and when.
info.timeout_deadline = Some(Instant::now() + DUR_ON_ERR); info.timeout = Some(Instant::now() + DUR_ON_ERR);
self.set_timeout(DUR_ON_ERR); self.set_timeout(DUR_ON_ERR);
return; return;
@ -477,6 +472,15 @@ impl Accept {
} }
} }
fn accept_all(&mut self, sockets: &mut [ServerSocketInfo]) {
sockets
.iter_mut()
.map(|info| info.token)
.collect::<Vec<_>>()
.into_iter()
.for_each(|idx| self.accept(sockets, idx))
}
#[inline(always)] #[inline(always)]
fn next(&self) -> &WorkerHandleAccept { fn next(&self) -> &WorkerHandleAccept {
&self.handles[self.next] &self.handles[self.next]

View File

@ -11,15 +11,14 @@ use crate::socket::{
MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs, MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs,
}; };
use crate::worker::ServerWorkerConfig; use crate::worker::ServerWorkerConfig;
use crate::Token;
/// Server builder /// Server builder
pub struct ServerBuilder { pub struct ServerBuilder {
pub(super) threads: usize, pub(super) threads: usize,
token: Token, token: usize,
backlog: u32, backlog: u32,
pub(super) services: Vec<Box<dyn InternalServiceFactory>>, pub(super) services: Vec<Box<dyn InternalServiceFactory>>,
pub(super) sockets: Vec<(Token, String, MioListener)>, pub(super) sockets: Vec<(usize, String, MioListener)>,
pub(super) exit: bool, pub(super) exit: bool,
pub(super) no_signals: bool, pub(super) no_signals: bool,
pub(super) cmd_tx: UnboundedSender<ServerCommand>, pub(super) cmd_tx: UnboundedSender<ServerCommand>,
@ -39,7 +38,7 @@ impl ServerBuilder {
let (tx, rx) = unbounded_channel(); let (tx, rx) = unbounded_channel();
ServerBuilder { ServerBuilder {
threads: num_cpus::get(), threads: num_cpus::get(),
token: Token::default(), token: 0,
services: Vec::new(), services: Vec::new(),
sockets: Vec::new(), sockets: Vec::new(),
backlog: 2048, backlog: 2048,
@ -138,7 +137,7 @@ impl ServerBuilder {
let sockets = bind_addr(addr, self.backlog)?; let sockets = bind_addr(addr, self.backlog)?;
for lst in sockets { for lst in sockets {
let token = self.token.next(); let token = self.next_token();
self.services.push(StreamNewService::create( self.services.push(StreamNewService::create(
name.as_ref().to_string(), name.as_ref().to_string(),
token, token,
@ -187,7 +186,7 @@ impl ServerBuilder {
{ {
use std::net::{IpAddr, Ipv4Addr}; use std::net::{IpAddr, Ipv4Addr};
lst.set_nonblocking(true)?; lst.set_nonblocking(true)?;
let token = self.token.next(); let token = self.next_token();
let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
self.services.push(StreamNewService::create( self.services.push(StreamNewService::create(
name.as_ref().to_string(), name.as_ref().to_string(),
@ -213,7 +212,7 @@ impl ServerBuilder {
lst.set_nonblocking(true)?; lst.set_nonblocking(true)?;
let addr = lst.local_addr()?; let addr = lst.local_addr()?;
let token = self.token.next(); let token = self.next_token();
self.services.push(StreamNewService::create( self.services.push(StreamNewService::create(
name.as_ref().to_string(), name.as_ref().to_string(),
token, token,
@ -236,6 +235,12 @@ impl ServerBuilder {
Server::new(self) Server::new(self)
} }
} }
pub(crate) fn next_token(&mut self) -> usize {
let token = self.token;
self.token += 1;
token
}
} }
pub(super) fn bind_addr<S: ToSocketAddrs>( pub(super) fn bind_addr<S: ToSocketAddrs>(

View File

@ -22,28 +22,6 @@ pub use self::test_server::TestServer;
#[doc(hidden)] #[doc(hidden)]
pub use self::socket::FromStream; pub use self::socket::FromStream;
/// Socket ID token
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub(crate) struct Token(usize);
impl Default for Token {
fn default() -> Self {
Self::new()
}
}
impl Token {
fn new() -> Self {
Self(0)
}
pub(crate) fn next(&mut self) -> Token {
let token = Token(self.0);
self.0 += 1;
token
}
}
/// Start server building process /// Start server building process
pub fn new() -> ServerBuilder { pub fn new() -> ServerBuilder {
ServerBuilder::default() ServerBuilder::default()

View File

@ -19,7 +19,7 @@ use crate::builder::ServerBuilder;
use crate::service::InternalServiceFactory; use crate::service::InternalServiceFactory;
use crate::signals::{Signal, Signals}; use crate::signals::{Signal, Signals};
use crate::waker_queue::{WakerInterest, WakerQueue}; use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandleServer}; use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer};
/// When awaited or spawned, this future listens for signals and messages from [ServerHandle](ServerHandle).
#[must_use = "futures do nothing unless you `.await` or poll them"] #[must_use = "futures do nothing unless you `.await` or poll them"]
@ -198,14 +198,13 @@ impl ServerInner {
error!("Worker {} has died, restarting", idx); error!("Worker {} has died, restarting", idx);
let availability = WorkerAvailability::new(idx, self.waker_queue.clone());
let factories = self let factories = self
.services .services
.iter() .iter()
.map(|service| service.clone_factory()) .map(|service| service.clone_factory())
.collect(); .collect();
match ServerWorker::start(idx, factories, availability, self.worker_config) { match ServerWorker::start(idx, factories, self.waker_queue.clone(), self.worker_config) {
Ok((handle_accept, handle_server)) => { Ok((handle_accept, handle_server)) => {
*self *self
.handles .handles

View File

@ -3,15 +3,12 @@ use std::net::SocketAddr;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use actix_service::{Service, ServiceFactory as BaseServiceFactory}; use actix_service::{Service, ServiceFactory as BaseServiceFactory};
use actix_utils::{ use actix_utils::future::{ready, Ready};
counter::CounterGuard,
future::{ready, Ready},
};
use futures_core::future::LocalBoxFuture; use futures_core::future::LocalBoxFuture;
use log::error; use log::error;
use crate::socket::{FromStream, MioStream}; use crate::socket::{FromStream, MioStream};
use crate::Token; use crate::worker::WorkerCounterGuard;
pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static { pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
type Factory: BaseServiceFactory<Stream, Config = ()>; type Factory: BaseServiceFactory<Stream, Config = ()>;
@ -20,16 +17,16 @@ pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
} }
pub(crate) trait InternalServiceFactory: Send { pub(crate) trait InternalServiceFactory: Send {
fn name(&self, token: Token) -> &str; fn name(&self, token: usize) -> &str;
fn clone_factory(&self) -> Box<dyn InternalServiceFactory>; fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;
fn create(&self) -> LocalBoxFuture<'static, Result<(Token, BoxedServerService), ()>>; fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>;
} }
pub(crate) type BoxedServerService = Box< pub(crate) type BoxedServerService = Box<
dyn Service< dyn Service<
(CounterGuard, MioStream), (WorkerCounterGuard, MioStream),
Response = (), Response = (),
Error = (), Error = (),
Future = Ready<Result<(), ()>>, Future = Ready<Result<(), ()>>,
@ -50,7 +47,7 @@ impl<S, I> StreamService<S, I> {
} }
} }
impl<S, I> Service<(CounterGuard, MioStream)> for StreamService<S, I> impl<S, I> Service<(WorkerCounterGuard, MioStream)> for StreamService<S, I>
where where
S: Service<I>, S: Service<I>,
S::Future: 'static, S::Future: 'static,
@ -65,7 +62,7 @@ where
self.service.poll_ready(ctx).map_err(|_| ()) self.service.poll_ready(ctx).map_err(|_| ())
} }
fn call(&self, (guard, req): (CounterGuard, MioStream)) -> Self::Future { fn call(&self, (guard, req): (WorkerCounterGuard, MioStream)) -> Self::Future {
ready(match FromStream::from_mio(req) { ready(match FromStream::from_mio(req) {
Ok(stream) => { Ok(stream) => {
let f = self.service.call(stream); let f = self.service.call(stream);
@ -86,7 +83,7 @@ where
pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> { pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> {
name: String, name: String,
inner: F, inner: F,
token: Token, token: usize,
addr: SocketAddr, addr: SocketAddr,
_t: PhantomData<Io>, _t: PhantomData<Io>,
} }
@ -98,7 +95,7 @@ where
{ {
pub(crate) fn create( pub(crate) fn create(
name: String, name: String,
token: Token, token: usize,
inner: F, inner: F,
addr: SocketAddr, addr: SocketAddr,
) -> Box<dyn InternalServiceFactory> { ) -> Box<dyn InternalServiceFactory> {
@ -117,7 +114,7 @@ where
F: ServiceFactory<Io>, F: ServiceFactory<Io>,
Io: FromStream + Send + 'static, Io: FromStream + Send + 'static,
{ {
fn name(&self, _: Token) -> &str { fn name(&self, _: usize) -> &str {
&self.name &self.name
} }
@ -131,7 +128,7 @@ where
}) })
} }
fn create(&self) -> LocalBoxFuture<'static, Result<(Token, BoxedServerService), ()>> { fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>> {
let token = self.token; let token = self.token;
let fut = self.inner.create().new_service(()); let fut = self.inner.create().new_service(());
Box::pin(async move { Box::pin(async move {

View File

@ -2,8 +2,9 @@ use std::{
future::Future, future::Future,
io, mem, io, mem,
pin::Pin, pin::Pin,
rc::Rc,
sync::{ sync::{
atomic::{AtomicBool, Ordering}, atomic::{AtomicUsize, Ordering},
Arc, Arc,
}, },
task::{Context, Poll}, task::{Context, Poll},
@ -16,7 +17,6 @@ use actix_rt::{
time::{sleep, Instant, Sleep}, time::{sleep, Instant, Sleep},
System, System,
}; };
use actix_utils::counter::Counter;
use futures_core::{future::LocalBoxFuture, ready}; use futures_core::{future::LocalBoxFuture, ready};
use log::{error, info, trace}; use log::{error, info, trace};
use tokio::sync::{ use tokio::sync::{
@ -27,7 +27,6 @@ use tokio::sync::{
use crate::service::{BoxedServerService, InternalServiceFactory}; use crate::service::{BoxedServerService, InternalServiceFactory};
use crate::socket::MioStream; use crate::socket::MioStream;
use crate::waker_queue::{WakerInterest, WakerQueue}; use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::Token;
/// Stop worker message. Returns `true` on successful graceful shutdown
/// and `false` if some connections are still alive when shutdown executes.
@ -39,35 +38,131 @@ struct Stop {
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct Conn { pub(crate) struct Conn {
pub io: MioStream, pub io: MioStream,
pub token: Token, pub token: usize,
} }
fn handle_pair( fn handle_pair(
idx: usize, idx: usize,
tx1: UnboundedSender<Conn>, tx1: UnboundedSender<Conn>,
tx2: UnboundedSender<Stop>, tx2: UnboundedSender<Stop>,
avail: WorkerAvailability, counter: Counter,
) -> (WorkerHandleAccept, WorkerHandleServer) { ) -> (WorkerHandleAccept, WorkerHandleServer) {
let accept = WorkerHandleAccept { tx: tx1, avail }; let accept = WorkerHandleAccept {
idx,
tx: tx1,
counter,
};
let server = WorkerHandleServer { idx, tx: tx2 }; let server = WorkerHandleServer { idx, tx: tx2 };
(accept, server) (accept, server)
} }
/// The `counter: Arc<AtomicUsize>` field is shared between the `Accept` thread and the
/// `ServerWorker` thread.
///
/// `Accept` increments the counter and `ServerWorker` decrements it.
///
/// # Atomic Ordering
///
/// `Accept` always looks into its cached `Availability` field for the `ServerWorker` state.
/// It lazily increments the counter after successfully dispatching new work to a `ServerWorker`.
/// On reaching the counter limit, `Accept` updates its cached `Availability` and marks the
/// worker as unable to accept any work.
///
/// `ServerWorker` always decrements the counter when work received from `Accept` is done.
/// On reaching the counter limit, the worker uses `mio::Waker` and `WakerQueue` to wake up
/// `Accept` and notify it to update its cached `Availability` again, marking the worker as
/// able to accept work again.
///
/// Hence a wake-up only happens after `Accept` has incremented the counter to the limit,
/// and a decrement back to the limit always wakes up `Accept`.
#[derive(Clone)]
pub(crate) struct Counter {
counter: Arc<AtomicUsize>,
limit: usize,
}
impl Counter {
pub(crate) fn new(limit: usize) -> Self {
Self {
counter: Arc::new(AtomicUsize::new(1)),
limit,
}
}
/// Increment counter by 1 and return false when hitting the limit.
#[inline(always)]
pub(crate) fn inc(&self) -> bool {
self.counter.fetch_add(1, Ordering::Relaxed) != self.limit
}
/// Decrement counter by 1 and return true when crossing back over the limit.
#[inline(always)]
pub(crate) fn dec(&self) -> bool {
self.counter.fetch_sub(1, Ordering::Relaxed) == self.limit
}
pub(crate) fn total(&self) -> usize {
self.counter.load(Ordering::SeqCst) - 1
}
}
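As a worked example of the handshake described in the doc comment above, here is a self-contained trace of the `Counter` type with limit = 2. The struct and methods mirror the diff; the scenario comments are illustrative, and in the real code the wake-up is issued by `WorkerCounterGuard::drop`, not by `dec` itself.

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

struct Counter {
    counter: Arc<AtomicUsize>,
    limit: usize,
}

impl Counter {
    fn new(limit: usize) -> Self {
        Self { counter: Arc::new(AtomicUsize::new(1)), limit }
    }

    // Returns false once the previous value already reached the limit (worker is full).
    fn inc(&self) -> bool {
        self.counter.fetch_add(1, Ordering::Relaxed) != self.limit
    }

    // Returns true when the previous value was exactly at the limit (capacity regained).
    fn dec(&self) -> bool {
        self.counter.fetch_sub(1, Ordering::Relaxed) == self.limit
    }
}

fn main() {
    let c = Counter::new(2);
    assert!(c.inc());  // Accept dispatches conn #1: counter 1 -> 2
    assert!(!c.inc()); // Accept dispatches conn #2: counter 2 -> 3, previous value hit
                       // the limit, so Accept marks this worker unavailable
    assert!(!c.dec()); // the worker finishes one conn: counter 3 -> 2, no wake-up yet
    assert!(c.dec());  // it finishes the next conn: counter 2 -> 1, the guard would now
                       // wake Accept via WakerInterest::WorkerAvailable
}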
pub(crate) struct WorkerCounter {
idx: usize,
inner: Rc<(WakerQueue, Counter)>,
}
impl Clone for WorkerCounter {
fn clone(&self) -> Self {
Self {
idx: self.idx,
inner: self.inner.clone(),
}
}
}
impl WorkerCounter {
pub(crate) fn new(idx: usize, waker_queue: WakerQueue, counter: Counter) -> Self {
Self {
idx,
inner: Rc::new((waker_queue, counter)),
}
}
#[inline(always)]
pub(crate) fn guard(&self) -> WorkerCounterGuard {
WorkerCounterGuard(self.clone())
}
fn total(&self) -> usize {
self.inner.1.total()
}
}
pub(crate) struct WorkerCounterGuard(WorkerCounter);
impl Drop for WorkerCounterGuard {
fn drop(&mut self) {
let (waker_queue, counter) = &*self.0.inner;
if counter.dec() {
waker_queue.wake(WakerInterest::WorkerAvailable(self.0.idx));
}
}
}
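A compact sketch (not the crate's code) of the RAII idea behind `WorkerCounter` and `WorkerCounterGuard`: each connection handed to a service holds a guard, and dropping the guard runs the decrement-and-maybe-wake bookkeeping exactly once, even on early returns. The `Shared` type and `wake_accept` closure below are stand-ins for the crate-internal `Counter` and `WakerQueue`.

use std::rc::Rc;

struct Shared {
    worker_idx: usize,
    wake_accept: Box<dyn Fn(usize)>, // plays the role of WakerQueue::wake
}

struct CounterGuard(Rc<Shared>);

impl Drop for CounterGuard {
    fn drop(&mut self) {
        // The real guard calls Counter::dec() here and only wakes Accept when the
        // counter crosses back over the limit; this sketch wakes unconditionally.
        (self.0.wake_accept)(self.0.worker_idx);
    }
}

fn main() {
    let shared = Rc::new(Shared {
        worker_idx: 0,
        wake_accept: Box::new(|idx| println!("wake Accept: worker {} available", idx)),
    });

    {
        // Taken right before calling the service with the accepted stream.
        let _guard = CounterGuard(Rc::clone(&shared));
        // ... connection is processed here ...
    } // guard dropped -> bookkeeping runs exactly once
}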
/// Handle to a worker that can send connection messages to the worker and share the
/// worker's availability with other threads.
/// ///
/// Held by [Accept](crate::accept::Accept). /// Held by [Accept](crate::accept::Accept).
pub(crate) struct WorkerHandleAccept { pub(crate) struct WorkerHandleAccept {
idx: usize,
tx: UnboundedSender<Conn>, tx: UnboundedSender<Conn>,
avail: WorkerAvailability, counter: Counter,
} }
impl WorkerHandleAccept { impl WorkerHandleAccept {
#[inline(always)] #[inline(always)]
pub(crate) fn idx(&self) -> usize { pub(crate) fn idx(&self) -> usize {
self.avail.idx self.idx
} }
#[inline(always)] #[inline(always)]
@ -76,8 +171,8 @@ impl WorkerHandleAccept {
} }
#[inline(always)] #[inline(always)]
pub(crate) fn available(&self) -> bool { pub(crate) fn inc_counter(&self) -> bool {
self.avail.available() self.counter.inc()
} }
} }
@ -97,40 +192,6 @@ impl WorkerHandleServer {
} }
} }
#[derive(Clone)]
pub(crate) struct WorkerAvailability {
idx: usize,
waker: WakerQueue,
available: Arc<AtomicBool>,
}
impl WorkerAvailability {
pub fn new(idx: usize, waker: WakerQueue) -> Self {
WorkerAvailability {
idx,
waker,
available: Arc::new(AtomicBool::new(false)),
}
}
#[inline(always)]
pub fn available(&self) -> bool {
self.available.load(Ordering::Acquire)
}
pub fn set(&self, val: bool) {
// Ordering:
//
// There could be multiple set calls happen in one <ServerWorker as Future>::poll.
// Order is important between them.
let old = self.available.swap(val, Ordering::AcqRel);
// Notify the accept on switched to available.
if !old && val {
self.waker.wake(WakerInterest::WorkerAvailable(self.idx));
}
}
}
/// Service worker. /// Service worker.
/// ///
/// Worker accepts Socket objects via unbounded channel and starts stream processing. /// Worker accepts Socket objects via unbounded channel and starts stream processing.
@ -139,9 +200,8 @@ pub(crate) struct ServerWorker {
// It must be dropped as soon as ServerWorker dropping. // It must be dropped as soon as ServerWorker dropping.
rx: UnboundedReceiver<Conn>, rx: UnboundedReceiver<Conn>,
rx2: UnboundedReceiver<Stop>, rx2: UnboundedReceiver<Stop>,
counter: WorkerCounter,
services: Box<[WorkerService]>, services: Box<[WorkerService]>,
availability: WorkerAvailability,
conns: Counter,
factories: Box<[Box<dyn InternalServiceFactory>]>, factories: Box<[Box<dyn InternalServiceFactory>]>,
state: WorkerState, state: WorkerState,
shutdown_timeout: Duration, shutdown_timeout: Duration,
@ -209,20 +269,29 @@ impl ServerWorker {
pub(crate) fn start( pub(crate) fn start(
idx: usize, idx: usize,
factories: Box<[Box<dyn InternalServiceFactory>]>, factories: Box<[Box<dyn InternalServiceFactory>]>,
avail: WorkerAvailability, waker_queue: WakerQueue,
config: ServerWorkerConfig, config: ServerWorkerConfig,
) -> io::Result<(WorkerHandleAccept, WorkerHandleServer)> { ) -> io::Result<(WorkerHandleAccept, WorkerHandleServer)> {
let counter = Counter::new(config.max_concurrent_connections);
let counter_clone = counter.clone();
let (factory_tx, factory_rx) = std::sync::mpsc::sync_channel(1); let (factory_tx, factory_rx) = std::sync::mpsc::sync_channel(1);
let (tx1, tx2) = Self::_start(idx, factories, avail.clone(), config, move |err| { let (tx1, tx2) = Self::_start(
factory_tx.send(err).unwrap() idx,
})?; factories,
waker_queue,
counter_clone,
config,
move |err| factory_tx.send(err).unwrap(),
)?;
factory_rx factory_rx
.recv() .recv()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))? .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?
.map(Err) .map(Err)
.unwrap_or_else(|| Ok(handle_pair(idx, tx1, tx2, avail))) .unwrap_or_else(|| Ok(handle_pair(idx, tx1, tx2, counter)))
} }
// TODO: Use start_non_blocking when restarting worker. // TODO: Use start_non_blocking when restarting worker.
@ -231,14 +300,23 @@ impl ServerWorker {
pub(crate) fn start_non_blocking( pub(crate) fn start_non_blocking(
idx: usize, idx: usize,
factories: Box<[Box<dyn InternalServiceFactory>]>, factories: Box<[Box<dyn InternalServiceFactory>]>,
avail: WorkerAvailability, waker_queue: WakerQueue,
config: ServerWorkerConfig, config: ServerWorkerConfig,
) -> impl Future<Output = io::Result<(WorkerHandleAccept, WorkerHandleServer)>> { ) -> impl Future<Output = io::Result<(WorkerHandleAccept, WorkerHandleServer)>> {
let counter = Counter::new(config.max_concurrent_connections);
let counter_clone = counter.clone();
let (factory_tx, factory_rx) = oneshot::channel(); let (factory_tx, factory_rx) = oneshot::channel();
let res = Self::_start(idx, factories, avail.clone(), config, move |err| { let res = Self::_start(
factory_tx.send(err).unwrap() idx,
}); factories,
waker_queue,
counter_clone,
config,
move |err| factory_tx.send(err).unwrap(),
);
async move { async move {
let (tx1, tx2) = res?; let (tx1, tx2) = res?;
@ -246,22 +324,21 @@ impl ServerWorker {
.await .await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))? .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?
.map(Err) .map(Err)
.unwrap_or_else(|| Ok(handle_pair(idx, tx1, tx2, avail))) .unwrap_or_else(|| Ok(handle_pair(idx, tx1, tx2, counter)))
} }
} }
fn _start<F>( fn _start<F>(
idx: usize, idx: usize,
factories: Box<[Box<dyn InternalServiceFactory>]>, factories: Box<[Box<dyn InternalServiceFactory>]>,
availability: WorkerAvailability, waker_queue: WakerQueue,
counter: Counter,
config: ServerWorkerConfig, config: ServerWorkerConfig,
f: F, f: F,
) -> io::Result<(UnboundedSender<Conn>, UnboundedSender<Stop>)> ) -> io::Result<(UnboundedSender<Conn>, UnboundedSender<Stop>)>
where where
F: FnOnce(Option<io::Error>) + Send + 'static, F: FnOnce(Option<io::Error>) + Send + 'static,
{ {
assert!(!availability.available());
// Try to get actix system. // Try to get actix system.
let system = System::try_current(); let system = System::try_current();
@ -283,14 +360,14 @@ impl ServerWorker {
io::Error::new(io::ErrorKind::Other, "Can not start worker service") io::Error::new(io::ErrorKind::Other, "Can not start worker service")
})?; })?;
assert_eq!(token.0, services.len()); assert_eq!(token, services.len());
services.push(WorkerService { services.push(WorkerService {
factory: idx, factory: idx,
service, service,
status: WorkerServiceStatus::Unavailable, status: WorkerServiceStatus::Unavailable,
}); });
} }
Ok::<_, io::Error>(services) Ok::<_, io::Error>(services.into_boxed_slice())
}; };
// All futures run in a LocalSet so that !Send futures can be used.
@ -306,9 +383,8 @@ impl ServerWorker {
let worker = ServerWorker { let worker = ServerWorker {
rx, rx,
rx2, rx2,
services: services.into_boxed_slice(), services,
availability, counter: WorkerCounter::new(idx, waker_queue, counter),
conns: Counter::new(config.max_concurrent_connections),
factories, factories,
state: Default::default(), state: Default::default(),
shutdown_timeout: config.shutdown_timeout, shutdown_timeout: config.shutdown_timeout,
@ -346,9 +422,8 @@ impl ServerWorker {
let worker = ServerWorker { let worker = ServerWorker {
rx, rx,
rx2, rx2,
services: services.into_boxed_slice(), services,
availability, counter: WorkerCounter::new(idx, waker_queue, counter),
conns: Counter::new(config.max_concurrent_connections),
factories, factories,
state: Default::default(), state: Default::default(),
shutdown_timeout: config.shutdown_timeout, shutdown_timeout: config.shutdown_timeout,
@ -371,13 +446,13 @@ impl ServerWorker {
.map_err(|e| io::Error::new(io::ErrorKind::Other, e)) .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
} }
fn restart_service(&mut self, token: Token, factory_id: usize) { fn restart_service(&mut self, idx: usize, factory_id: usize) {
let factory = &self.factories[factory_id]; let factory = &self.factories[factory_id];
trace!("Service {:?} failed, restarting", factory.name(token)); trace!("Service {:?} failed, restarting", factory.name(idx));
self.services[token.0].status = WorkerServiceStatus::Restarting; self.services[idx].status = WorkerServiceStatus::Restarting;
self.state = WorkerState::Restarting(Restart { self.state = WorkerState::Restarting(Restart {
factory_id, factory_id,
token, token: idx,
fut: factory.create(), fut: factory.create(),
}); });
} }
@ -395,8 +470,8 @@ impl ServerWorker {
}); });
} }
fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result<bool, (Token, usize)> { fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result<bool, (usize, usize)> {
let mut ready = self.conns.available(cx); let mut ready = true;
for (idx, srv) in self.services.iter_mut().enumerate() { for (idx, srv) in self.services.iter_mut().enumerate() {
if srv.status == WorkerServiceStatus::Available if srv.status == WorkerServiceStatus::Available
|| srv.status == WorkerServiceStatus::Unavailable || srv.status == WorkerServiceStatus::Unavailable
@ -406,7 +481,7 @@ impl ServerWorker {
if srv.status == WorkerServiceStatus::Unavailable { if srv.status == WorkerServiceStatus::Unavailable {
trace!( trace!(
"Service {:?} is available", "Service {:?} is available",
self.factories[srv.factory].name(Token(idx)) self.factories[srv.factory].name(idx)
); );
srv.status = WorkerServiceStatus::Available; srv.status = WorkerServiceStatus::Available;
} }
@ -417,7 +492,7 @@ impl ServerWorker {
if srv.status == WorkerServiceStatus::Available { if srv.status == WorkerServiceStatus::Available {
trace!( trace!(
"Service {:?} is unavailable", "Service {:?} is unavailable",
self.factories[srv.factory].name(Token(idx)) self.factories[srv.factory].name(idx)
); );
srv.status = WorkerServiceStatus::Unavailable; srv.status = WorkerServiceStatus::Unavailable;
} }
@ -425,10 +500,10 @@ impl ServerWorker {
Poll::Ready(Err(_)) => { Poll::Ready(Err(_)) => {
error!( error!(
"Service {:?} readiness check returned error, restarting", "Service {:?} readiness check returned error, restarting",
self.factories[srv.factory].name(Token(idx)) self.factories[srv.factory].name(idx)
); );
srv.status = WorkerServiceStatus::Failed; srv.status = WorkerServiceStatus::Failed;
return Err((Token(idx), srv.factory)); return Err((idx, srv.factory));
} }
} }
} }
@ -447,8 +522,8 @@ enum WorkerState {
struct Restart { struct Restart {
factory_id: usize, factory_id: usize,
token: Token, token: usize,
fut: LocalBoxFuture<'static, Result<(Token, BoxedServerService), ()>>, fut: LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>,
} }
// Shutdown keep states necessary for server shutdown: // Shutdown keep states necessary for server shutdown:
@ -467,15 +542,6 @@ impl Default for WorkerState {
} }
} }
impl Drop for ServerWorker {
fn drop(&mut self) {
// Set availability to true so if accept try to send connection to this worker
// it would find worker is gone and remove it.
// This is helpful when worker is dropped unexpected.
self.availability.set(true);
}
}
impl Future for ServerWorker { impl Future for ServerWorker {
type Output = (); type Output = ();
@ -485,8 +551,7 @@ impl Future for ServerWorker {
// `StopWorker` message handler // `StopWorker` message handler
if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx) if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx)
{ {
this.availability.set(false); let num = this.counter.total();
let num = this.conns.total();
if num == 0 { if num == 0 {
info!("Shutting down worker, 0 connections"); info!("Shutting down worker, 0 connections");
let _ = tx.send(true); let _ = tx.send(true);
@ -513,7 +578,6 @@ impl Future for ServerWorker {
WorkerState::Unavailable => match this.check_readiness(cx) { WorkerState::Unavailable => match this.check_readiness(cx) {
Ok(true) => { Ok(true) => {
this.state = WorkerState::Available; this.state = WorkerState::Available;
this.availability.set(true);
self.poll(cx) self.poll(cx)
} }
Ok(false) => Poll::Pending, Ok(false) => Poll::Pending,
@ -541,7 +605,7 @@ impl Future for ServerWorker {
this.factories[factory_id].name(token) this.factories[factory_id].name(token)
); );
this.services[token.0].created(service); this.services[token].created(service);
this.state = WorkerState::Unavailable; this.state = WorkerState::Unavailable;
self.poll(cx) self.poll(cx)
@ -550,7 +614,7 @@ impl Future for ServerWorker {
// Wait for 1 second. // Wait for 1 second.
ready!(shutdown.timer.as_mut().poll(cx)); ready!(shutdown.timer.as_mut().poll(cx));
if this.conns.total() == 0 { if this.counter.total() == 0 {
// Graceful shutdown. // Graceful shutdown.
if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) { if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) {
let _ = shutdown.tx.send(true); let _ = shutdown.tx.send(true);
@ -575,22 +639,20 @@ impl Future for ServerWorker {
Ok(true) => {} Ok(true) => {}
Ok(false) => { Ok(false) => {
trace!("Worker is unavailable"); trace!("Worker is unavailable");
this.availability.set(false);
this.state = WorkerState::Unavailable; this.state = WorkerState::Unavailable;
return self.poll(cx); return self.poll(cx);
} }
Err((token, idx)) => { Err((token, idx)) => {
this.restart_service(token, idx); this.restart_service(token, idx);
this.availability.set(false);
return self.poll(cx); return self.poll(cx);
} }
} }
// handle incoming io stream
match ready!(Pin::new(&mut this.rx).poll_recv(cx)) { match ready!(Pin::new(&mut this.rx).poll_recv(cx)) {
// handle incoming io stream
Some(msg) => { Some(msg) => {
let guard = this.conns.get(); let guard = this.counter.guard();
let _ = this.services[msg.token.0].service.call((guard, msg.io)); let _ = this.services[msg.token].service.call((guard, msg.io));
} }
None => return Poll::Ready(()), None => return Poll::Ready(()),
}; };