diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index c48a5dc3..02c175db 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -28,7 +28,6 @@ futures-core = { version = "0.3.7", default-features = false, features = ["alloc log = "0.4" mio = { version = "0.7.6", features = ["os-poll", "net"] } num_cpus = "1.13" -slab = "0.4" tokio = { version = "1.4", features = ["sync"] } [dev-dependencies] @@ -38,4 +37,4 @@ actix-rt = "2.0.0" bytes = "1" env_logger = "0.8" futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } -tokio = { version = "1", features = ["io-util", "macros", "rt-multi-thread"] } +tokio = { version = "1.4", features = ["io-util", "macros", "rt-multi-thread"] } diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 81dd87a1..c7540ed9 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -3,28 +3,23 @@ use std::{io, thread}; use log::{error, info}; use mio::{Interest, Poll, Token as MioToken}; -use slab::Slab; use crate::builder::ServerBuilder; use crate::server::ServerHandle; use crate::socket::MioListener; use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; -use crate::worker::{ - Conn, ServerWorker, WorkerAvailability, WorkerHandleAccept, WorkerHandleServer, -}; -use crate::Token; +use crate::worker::{Conn, ServerWorker, WorkerHandleAccept, WorkerHandleServer}; const DUR_ON_ERR: Duration = Duration::from_millis(500); struct ServerSocketInfo { - /// Beware this is the crate token for identify socket and should not be confused - /// with `mio::Token`. - token: Token, + token: usize, lst: MioListener, - // mark the deadline when this socket's listener should be registered again - timeout_deadline: Option, + /// Timeout is used to mark the deadline when this socket's listener should be registered again + /// after an error. + timeout: Option, } /// poll instance of the server. @@ -35,10 +30,9 @@ pub(crate) struct Accept { srv: ServerHandle, next: usize, avail: Availability, - backpressure: bool, - // poll time out duration. - // use the smallest duration from sockets timeout_deadline. + // use the smallest duration from sockets timeout. timeout: Option, + paused: bool, } /// Array of u128 with every bit as marker for a worker handle's availability. @@ -52,23 +46,22 @@ impl Default for Availability { impl Availability { /// Check if any worker handle is available + #[inline(always)] fn available(&self) -> bool { self.0.iter().any(|a| *a != 0) } + /// Check if worker handle is available by index + #[inline(always)] + fn get_available(&self, idx: usize) -> bool { + let (offset, idx) = Self::offset(idx); + + self.0[offset] & (1 << idx as u128) != 0 + } + /// Set worker handle available state by index. fn set_available(&mut self, idx: usize, avail: bool) { - let (offset, idx) = if idx < 128 { - (0, idx) - } else if idx < 128 * 2 { - (1, idx - 128) - } else if idx < 128 * 3 { - (2, idx - 128 * 2) - } else if idx < 128 * 4 { - (3, idx - 128 * 3) - } else { - panic!("Max WorkerHandle count is 512") - }; + let (offset, idx) = Self::offset(idx); let off = 1 << idx as u128; if avail { @@ -85,6 +78,21 @@ impl Availability { self.set_available(handle.idx(), true); }) } + + /// Get offset and adjusted index of given worker handle index. 
+ fn offset(idx: usize) -> (usize, usize) { + if idx < 128 { + (0, idx) + } else if idx < 128 * 2 { + (1, idx - 128) + } else if idx < 128 * 3 { + (2, idx - 128 * 2) + } else if idx < 128 * 4 { + (3, idx - 128 * 3) + } else { + panic!("Max WorkerHandle count is 512") + } + } } /// This function defines errors that are per-connection. Which basically @@ -102,7 +110,7 @@ fn connection_error(e: &io::Error) -> bool { impl Accept { pub(crate) fn start( - sockets: Vec<(Token, MioListener)>, + sockets: Vec<(usize, MioListener)>, builder: &ServerBuilder, ) -> io::Result<(WakerQueue, Vec)> { let server_handle = ServerHandle::new(builder.cmd_tx.clone()); @@ -115,16 +123,15 @@ impl Accept { let (handles_accept, handles_server) = (0..builder.threads) .map(|idx| { // start workers - let availability = WorkerAvailability::new(idx, waker_queue.clone()); let factories = builder.services.iter().map(|v| v.clone_factory()).collect(); - ServerWorker::start(idx, factories, availability, builder.worker_config) + ServerWorker::start(idx, factories, waker_queue.clone(), builder.worker_config) }) .collect::, io::Error>>()? .into_iter() .unzip(); - let (mut accept, sockets) = Accept::new_with_sockets( + let (mut accept, mut sockets) = Accept::new_with_sockets( poll, waker_queue.clone(), sockets, @@ -135,7 +142,7 @@ impl Accept { // Accept runs in its own thread. thread::Builder::new() .name("actix-server acceptor".to_owned()) - .spawn(move || accept.poll_with(sockets)) + .spawn(move || accept.poll_with(&mut sockets)) .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; // return waker and worker handle clones to server builder. @@ -145,25 +152,24 @@ impl Accept { fn new_with_sockets( poll: Poll, waker_queue: WakerQueue, - socks: Vec<(Token, MioListener)>, + socks: Vec<(usize, MioListener)>, handles: Vec, srv: ServerHandle, - ) -> io::Result<(Accept, Slab)> { - let mut sockets = Slab::new(); - for (hnd_token, mut lst) in socks.into_iter() { - let entry = sockets.vacant_entry(); - let token = entry.key(); + ) -> io::Result<(Accept, Box<[ServerSocketInfo]>)> { + let sockets = socks + .into_iter() + .map(|(token, mut lst)| { + // Start listening for incoming connections + poll.registry() + .register(&mut lst, MioToken(token), Interest::READABLE)?; - // Start listening for incoming connections - poll.registry() - .register(&mut lst, MioToken(token), Interest::READABLE)?; - - entry.insert(ServerSocketInfo { - token: hnd_token, - lst, - timeout_deadline: None, - }); - } + Ok(ServerSocketInfo { + token, + lst, + timeout: None, + }) + }) + .collect::>()?; let mut avail = Availability::default(); @@ -177,46 +183,47 @@ impl Accept { srv, next: 0, avail, - backpressure: false, timeout: None, + paused: false, }; Ok((accept, sockets)) } - fn poll_with(&mut self, mut sockets: Slab) { + fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) { let mut events = mio::Events::with_capacity(128); loop { - match self.poll.poll(&mut events, self.timeout) { - Ok(_) => {} - Err(e) if e.kind() == io::ErrorKind::Interrupted => {} - Err(e) => panic!("Poll error: {}", e), + if let Err(e) = self.poll.poll(&mut events, None) { + match e.kind() { + io::ErrorKind::Interrupted => continue, + _ => panic!("Poll error: {}", e), + } } for event in events.iter() { let token = event.token(); match token { WAKER_TOKEN => { - let should_return = self.handle_waker(&mut sockets); - if should_return { + let exit = self.handle_waker(sockets); + if exit { + info!("Accept is stopped."); return; } } _ => { let token = usize::from(token); - self.accept(&mut 
sockets, token) + self.accept(sockets, token); } } } // check for timeout and re-register sockets. - self.process_timeout(&mut sockets); + self.process_timeout(sockets); } } - /// Return true to notify `Accept::poll_with` to return. - fn handle_waker(&mut self, sockets: &mut Slab) -> bool { + fn handle_waker(&mut self, sockets: &mut [ServerSocketInfo]) -> bool { // This is a loop because interests for command from previous version was // a loop that would try to drain the command channel. It's yet unknown // if it's necessary/good practice to actively drain the waker queue. @@ -231,7 +238,10 @@ impl Accept { drop(guard); self.avail.set_available(idx, true); - self.maybe_backpressure(sockets, false); + + if !self.paused { + self.accept_all(sockets); + } } // a new worker thread is made and it's handle would be added to Accept Some(WakerInterest::Worker(handle)) => { @@ -239,55 +249,67 @@ impl Accept { self.avail.set_available(handle.idx(), true); self.handles.push(handle); - // maybe we want to recover from a backpressure. - self.maybe_backpressure(sockets, false); + + if !self.paused { + self.accept_all(sockets); + } } Some(WakerInterest::Pause) => { drop(guard); + + self.paused = true; + self.deregister_all(sockets); } Some(WakerInterest::Resume) => { drop(guard); - sockets.iter_mut().for_each(|(token, info)| { - self.register_logged(token, info); + + self.paused = false; + + sockets.iter_mut().for_each(|info| { + self.register_logged(info); }); + + self.accept_all(sockets); } Some(WakerInterest::Stop) => { self.deregister_all(sockets); + return true; } // waker queue is drained None => { // Reset the WakerQueue before break so it does not grow infinitely WakerQueue::reset(&mut guard); + return false; } } } } - fn process_timeout(&mut self, sockets: &mut Slab) { + fn process_timeout(&mut self, sockets: &mut [ServerSocketInfo]) { // Take old timeout as it's no use after each iteration. if self.timeout.take().is_some() { let now = Instant::now(); sockets .iter_mut() // Only sockets that had an associated timeout were deregistered. - .filter(|(_, info)| info.timeout_deadline.is_some()) - .for_each(|(token, info)| { - let inst = info.timeout_deadline.take().unwrap(); + .filter(|info| info.timeout.is_some()) + .for_each(|info| { + let inst = info.timeout.take().unwrap(); if now < inst { // still timed out. try set new timeout. - info.timeout_deadline = Some(inst); + info.timeout = Some(inst); self.set_timeout(inst - now); - } else if !self.backpressure { + } else if !self.paused { // timeout expired register socket again. - self.register_logged(token, info); + self.register_logged(info); } - // Drop the timeout if server is in backpressure and socket timeout is expired. - // When server recovers from backpressure it will register all sockets without + // Drop the timeout if server is paused and socket timeout is expired. + // When server recovers from pause it will register all sockets without // a timeout value so this socket register will be delayed till then. 
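// Concretely: a listener that hit an accept error at time T was deregistered and given a deadline of T + DUR_ON_ERR (500ms); if that deadline is found expired here while the server is paused, it is simply dropped, and the Resume handling in handle_waker later re-registers this socket along with all the others.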
}); } @@ -306,31 +328,31 @@ impl Accept { } #[cfg(not(target_os = "windows"))] - fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> { + fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> { + let token = MioToken(info.token); self.poll .registry() - .register(&mut info.lst, MioToken(token), Interest::READABLE) + .register(&mut info.lst, token, Interest::READABLE) } #[cfg(target_os = "windows")] - fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> { + fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> { // On windows, calling register without deregister cause an error. // See https://github.com/actix/actix-web/issues/905 // Calling reregister seems to fix the issue. + let token = MioToken(info.token); self.poll .registry() - .register(&mut info.lst, mio::Token(token), Interest::READABLE) + .register(&mut info.lst, token, Interest::READABLE) .or_else(|_| { - self.poll.registry().reregister( - &mut info.lst, - mio::Token(token), - Interest::READABLE, - ) + self.poll + .registry() + .reregister(&mut info.lst, token, Interest::READABLE) }) } - fn register_logged(&self, token: usize, info: &mut ServerSocketInfo) { - match self.register(token, info) { + fn register_logged(&self, info: &mut ServerSocketInfo) { + match self.register(info) { Ok(_) => info!("Resume accepting connections on {}", info.lst.local_addr()), Err(e) => error!("Can not register server socket {}", e), } @@ -345,7 +367,7 @@ impl Accept { } } - fn deregister_all(&self, sockets: &mut Slab) { + fn deregister_all(&self, sockets: &mut [ServerSocketInfo]) { // This is a best effort implementation with following limitation: // // Every ServerSocketInfo with associate timeout will be skipped and it's timeout @@ -358,70 +380,23 @@ impl Accept { .iter_mut() // Take all timeout. // This is to prevent Accept::process_timer method re-register a socket afterwards. - .map(|(_, info)| (info.timeout_deadline.take(), info)) + .map(|info| (info.timeout.take(), info)) // Socket info with a timeout is already deregistered so skip them. .filter(|(timeout, _)| timeout.is_none()) .for_each(|(_, info)| self.deregister_logged(info)); } - fn maybe_backpressure(&mut self, sockets: &mut Slab, on: bool) { - // Only operate when server is in a different backpressure than the given flag. - if self.backpressure != on { - self.backpressure = on; - sockets - .iter_mut() - // Only operate on sockets without associated timeout. - // Sockets with it should be handled by `accept` and `process_timer` methods. - // They are already deregistered or need to be reregister in the future. - .filter(|(_, info)| info.timeout_deadline.is_none()) - .for_each(|(token, info)| { - if on { - self.deregister_logged(info); - } else { - self.register_logged(token, info); - } - }); - } - } - - fn accept_one(&mut self, sockets: &mut Slab, mut conn: Conn) { - if self.backpressure { - // send_connection would remove fault worker from handles. - // worst case here is conn get dropped after all handles are gone. - while let Err(c) = self.send_connection(sockets, conn) { - conn = c - } - } else { - while self.avail.available() { - let next = self.next(); - let idx = next.idx(); - if next.available() { - self.avail.set_available(idx, true); - match self.send_connection(sockets, conn) { - Ok(_) => return, - Err(c) => conn = c, - } - } else { - self.avail.set_available(idx, false); - self.set_next(); - } - } - - // Sending Conn failed due to either all workers are in error or not available. 
- // Enter backpressure state and try again. - self.maybe_backpressure(sockets, true); - self.accept_one(sockets, conn); - } - } - // Send connection to worker and handle error. - fn send_connection( - &mut self, - sockets: &mut Slab, - conn: Conn, - ) -> Result<(), Conn> { - match self.next().send(conn) { + fn send_connection(&mut self, conn: Conn) -> Result<(), Conn> { + let next = self.next(); + match next.send(conn) { Ok(_) => { + // Increment counter of WorkerHandle. + // Set worker to unavailable with it hit max (Return false). + if !next.inc_counter() { + let idx = next.idx(); + self.avail.set_available(idx, false); + } self.set_next(); Ok(()) } @@ -432,7 +407,6 @@ impl Accept { if self.handles.is_empty() { error!("No workers"); - self.maybe_backpressure(sockets, true); // All workers are gone and Conn is nowhere to be sent. // Treat this situation as Ok and drop Conn. return Ok(()); @@ -445,17 +419,38 @@ impl Accept { } } - fn accept(&mut self, sockets: &mut Slab, token: usize) { + fn accept_one(&mut self, mut conn: Conn) { loop { + let next = self.next(); + let idx = next.idx(); + + if self.avail.get_available(idx) { + match self.send_connection(conn) { + Ok(_) => return, + Err(c) => conn = c, + } + } else { + self.avail.set_available(idx, false); + self.set_next(); + + if !self.avail.available() { + while let Err(c) = self.send_connection(conn) { + conn = c; + } + return; + } + } + } + } + + fn accept(&mut self, sockets: &mut [ServerSocketInfo], token: usize) { + while self.avail.available() { let info = &mut sockets[token]; match info.lst.accept() { Ok(io) => { - let msg = Conn { - io, - token: info.token, - }; - self.accept_one(sockets, msg); + let conn = Conn { io, token }; + self.accept_one(conn); } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return, Err(ref e) if connection_error(e) => continue, @@ -468,7 +463,7 @@ impl Accept { // sleep after error. write the timeout deadline to socket info // as later the poll would need it mark which socket and when // it's listener should be registered again. 
- info.timeout_deadline = Some(Instant::now() + DUR_ON_ERR); + info.timeout = Some(Instant::now() + DUR_ON_ERR); self.set_timeout(DUR_ON_ERR); return; @@ -477,6 +472,15 @@ impl Accept { } } + fn accept_all(&mut self, sockets: &mut [ServerSocketInfo]) { + sockets + .iter_mut() + .map(|info| info.token) + .collect::>() + .into_iter() + .for_each(|idx| self.accept(sockets, idx)) + } + #[inline(always)] fn next(&self) -> &WorkerHandleAccept { &self.handles[self.next] diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 3971932b..1888d972 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -11,15 +11,14 @@ use crate::socket::{ MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs, }; use crate::worker::ServerWorkerConfig; -use crate::Token; /// Server builder pub struct ServerBuilder { pub(super) threads: usize, - token: Token, + token: usize, backlog: u32, pub(super) services: Vec>, - pub(super) sockets: Vec<(Token, String, MioListener)>, + pub(super) sockets: Vec<(usize, String, MioListener)>, pub(super) exit: bool, pub(super) no_signals: bool, pub(super) cmd_tx: UnboundedSender, @@ -39,7 +38,7 @@ impl ServerBuilder { let (tx, rx) = unbounded_channel(); ServerBuilder { threads: num_cpus::get(), - token: Token::default(), + token: 0, services: Vec::new(), sockets: Vec::new(), backlog: 2048, @@ -138,7 +137,7 @@ impl ServerBuilder { let sockets = bind_addr(addr, self.backlog)?; for lst in sockets { - let token = self.token.next(); + let token = self.next_token(); self.services.push(StreamNewService::create( name.as_ref().to_string(), token, @@ -187,7 +186,7 @@ impl ServerBuilder { { use std::net::{IpAddr, Ipv4Addr}; lst.set_nonblocking(true)?; - let token = self.token.next(); + let token = self.next_token(); let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); self.services.push(StreamNewService::create( name.as_ref().to_string(), @@ -213,7 +212,7 @@ impl ServerBuilder { lst.set_nonblocking(true)?; let addr = lst.local_addr()?; - let token = self.token.next(); + let token = self.next_token(); self.services.push(StreamNewService::create( name.as_ref().to_string(), token, @@ -236,6 +235,12 @@ impl ServerBuilder { Server::new(self) } } + + pub(crate) fn next_token(&mut self) -> usize { + let token = self.token; + self.token += 1; + token + } } pub(super) fn bind_addr( diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index a11e62a7..7805e5fa 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -22,28 +22,6 @@ pub use self::test_server::TestServer; #[doc(hidden)] pub use self::socket::FromStream; -/// Socket ID token -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] -pub(crate) struct Token(usize); - -impl Default for Token { - fn default() -> Self { - Self::new() - } -} - -impl Token { - fn new() -> Self { - Self(0) - } - - pub(crate) fn next(&mut self) -> Token { - let token = Token(self.0); - self.0 += 1; - token - } -} - /// Start server building process pub fn new() -> ServerBuilder { ServerBuilder::default() diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index f37f2867..743d4e39 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -19,7 +19,7 @@ use crate::builder::ServerBuilder; use crate::service::InternalServiceFactory; use crate::signals::{Signal, Signals}; use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandleServer}; 
+use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer}; /// When awaited or spawned would listen to signal and message from [ServerHandle](ServerHandle). #[must_use = "futures do nothing unless you `.await` or poll them"] @@ -198,14 +198,13 @@ impl ServerInner { error!("Worker {} has died, restarting", idx); - let availability = WorkerAvailability::new(idx, self.waker_queue.clone()); let factories = self .services .iter() .map(|service| service.clone_factory()) .collect(); - match ServerWorker::start(idx, factories, availability, self.worker_config) { + match ServerWorker::start(idx, factories, self.waker_queue.clone(), self.worker_config) { Ok((handle_accept, handle_server)) => { *self .handles diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index d0eea966..28ffb4f1 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -3,15 +3,12 @@ use std::net::SocketAddr; use std::task::{Context, Poll}; use actix_service::{Service, ServiceFactory as BaseServiceFactory}; -use actix_utils::{ - counter::CounterGuard, - future::{ready, Ready}, -}; +use actix_utils::future::{ready, Ready}; use futures_core::future::LocalBoxFuture; use log::error; use crate::socket::{FromStream, MioStream}; -use crate::Token; +use crate::worker::WorkerCounterGuard; pub trait ServiceFactory: Send + Clone + 'static { type Factory: BaseServiceFactory; @@ -20,16 +17,16 @@ pub trait ServiceFactory: Send + Clone + 'static { } pub(crate) trait InternalServiceFactory: Send { - fn name(&self, token: Token) -> &str; + fn name(&self, token: usize) -> &str; fn clone_factory(&self) -> Box; - fn create(&self) -> LocalBoxFuture<'static, Result<(Token, BoxedServerService), ()>>; + fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>; } pub(crate) type BoxedServerService = Box< dyn Service< - (CounterGuard, MioStream), + (WorkerCounterGuard, MioStream), Response = (), Error = (), Future = Ready>, @@ -50,7 +47,7 @@ impl StreamService { } } -impl Service<(CounterGuard, MioStream)> for StreamService +impl Service<(WorkerCounterGuard, MioStream)> for StreamService where S: Service, S::Future: 'static, @@ -65,7 +62,7 @@ where self.service.poll_ready(ctx).map_err(|_| ()) } - fn call(&self, (guard, req): (CounterGuard, MioStream)) -> Self::Future { + fn call(&self, (guard, req): (WorkerCounterGuard, MioStream)) -> Self::Future { ready(match FromStream::from_mio(req) { Ok(stream) => { let f = self.service.call(stream); @@ -86,7 +83,7 @@ where pub(crate) struct StreamNewService, Io: FromStream> { name: String, inner: F, - token: Token, + token: usize, addr: SocketAddr, _t: PhantomData, } @@ -98,7 +95,7 @@ where { pub(crate) fn create( name: String, - token: Token, + token: usize, inner: F, addr: SocketAddr, ) -> Box { @@ -117,7 +114,7 @@ where F: ServiceFactory, Io: FromStream + Send + 'static, { - fn name(&self, _: Token) -> &str { + fn name(&self, _: usize) -> &str { &self.name } @@ -131,7 +128,7 @@ where }) } - fn create(&self) -> LocalBoxFuture<'static, Result<(Token, BoxedServerService), ()>> { + fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>> { let token = self.token; let fut = self.inner.create().new_service(()); Box::pin(async move { diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index ab8ee460..c1f8b2e0 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -2,8 +2,9 @@ use std::{ future::Future, io, mem, pin::Pin, + rc::Rc, sync::{ - atomic::{AtomicBool, 
Ordering}, + atomic::{AtomicUsize, Ordering}, Arc, }, task::{Context, Poll}, @@ -16,7 +17,6 @@ use actix_rt::{ time::{sleep, Instant, Sleep}, System, }; -use actix_utils::counter::Counter; use futures_core::{future::LocalBoxFuture, ready}; use log::{error, info, trace}; use tokio::sync::{ @@ -27,7 +27,6 @@ use tokio::sync::{ use crate::service::{BoxedServerService, InternalServiceFactory}; use crate::socket::MioStream; use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::Token; /// Stop worker message. Returns `true` on successful graceful shutdown. /// and `false` if some connections still alive when shutdown execute. @@ -39,35 +38,131 @@ struct Stop { #[derive(Debug)] pub(crate) struct Conn { pub io: MioStream, - pub token: Token, + pub token: usize, } fn handle_pair( idx: usize, tx1: UnboundedSender<Conn>, tx2: UnboundedSender<Stop>, - avail: WorkerAvailability, + counter: Counter, ) -> (WorkerHandleAccept, WorkerHandleServer) { - let accept = WorkerHandleAccept { tx: tx1, avail }; + let accept = WorkerHandleAccept { + idx, + tx: tx1, + counter, + }; let server = WorkerHandleServer { idx, tx: tx2 }; (accept, server) } +/// The `counter` field is an `Arc` shared by the `Accept` thread and the `ServerWorker` thread. +/// +/// `Accept` increments the counter and `ServerWorker` decrements it. +/// +/// # Atomic Ordering: +/// +/// `Accept` always looks at its cached `Availability` field for `ServerWorker` state. +/// It lazily increments the counter after successfully dispatching new work to `ServerWorker`. +/// On reaching the counter limit, `Accept` updates its cached `Availability` and marks the worker as +/// unable to accept any work. +/// +/// `ServerWorker` always decrements the counter when a piece of work received from `Accept` is done. +/// When a decrement crosses the counter limit, the worker uses `mio::Waker` and `WakerQueue` to wake up `Accept` +/// and notify it to update its cached `Availability` again, marking the worker as able to accept work. +/// +/// Hence a wake-up would only happen after `Accept` has incremented the counter to the limit, +/// and a decrement to the limit always wakes up `Accept`. +#[derive(Clone)] +pub(crate) struct Counter { + counter: Arc<AtomicUsize>, + limit: usize, +} + +impl Counter { + pub(crate) fn new(limit: usize) -> Self { + Self { + counter: Arc::new(AtomicUsize::new(1)), + limit, + } + } + + /// Increment counter by 1 and return false when hitting the limit. + #[inline(always)] + pub(crate) fn inc(&self) -> bool { + self.counter.fetch_add(1, Ordering::Relaxed) != self.limit + } + + /// Decrement counter by 1 and return true if crossing limit. 
+ #[inline(always)] + pub(crate) fn dec(&self) -> bool { + self.counter.fetch_sub(1, Ordering::Relaxed) == self.limit + } + + pub(crate) fn total(&self) -> usize { + self.counter.load(Ordering::SeqCst) - 1 + } +} + +pub(crate) struct WorkerCounter { + idx: usize, + inner: Rc<(WakerQueue, Counter)>, +} + +impl Clone for WorkerCounter { + fn clone(&self) -> Self { + Self { + idx: self.idx, + inner: self.inner.clone(), + } + } +} + +impl WorkerCounter { + pub(crate) fn new(idx: usize, waker_queue: WakerQueue, counter: Counter) -> Self { + Self { + idx, + inner: Rc::new((waker_queue, counter)), + } + } + + #[inline(always)] + pub(crate) fn guard(&self) -> WorkerCounterGuard { + WorkerCounterGuard(self.clone()) + } + + fn total(&self) -> usize { + self.inner.1.total() + } +} + +pub(crate) struct WorkerCounterGuard(WorkerCounter); + +impl Drop for WorkerCounterGuard { + fn drop(&mut self) { + let (waker_queue, counter) = &*self.0.inner; + if counter.dec() { + waker_queue.wake(WakerInterest::WorkerAvailable(self.0.idx)); + } + } +} + /// Handle to worker that can send connection message to worker and share the /// availability of worker to other thread. /// /// Held by [Accept](crate::accept::Accept). pub(crate) struct WorkerHandleAccept { + idx: usize, tx: UnboundedSender, - avail: WorkerAvailability, + counter: Counter, } impl WorkerHandleAccept { #[inline(always)] pub(crate) fn idx(&self) -> usize { - self.avail.idx + self.idx } #[inline(always)] @@ -76,8 +171,8 @@ impl WorkerHandleAccept { } #[inline(always)] - pub(crate) fn available(&self) -> bool { - self.avail.available() + pub(crate) fn inc_counter(&self) -> bool { + self.counter.inc() } } @@ -97,40 +192,6 @@ impl WorkerHandleServer { } } -#[derive(Clone)] -pub(crate) struct WorkerAvailability { - idx: usize, - waker: WakerQueue, - available: Arc, -} - -impl WorkerAvailability { - pub fn new(idx: usize, waker: WakerQueue) -> Self { - WorkerAvailability { - idx, - waker, - available: Arc::new(AtomicBool::new(false)), - } - } - - #[inline(always)] - pub fn available(&self) -> bool { - self.available.load(Ordering::Acquire) - } - - pub fn set(&self, val: bool) { - // Ordering: - // - // There could be multiple set calls happen in one ::poll. - // Order is important between them. - let old = self.available.swap(val, Ordering::AcqRel); - // Notify the accept on switched to available. - if !old && val { - self.waker.wake(WakerInterest::WorkerAvailable(self.idx)); - } - } -} - /// Service worker. /// /// Worker accepts Socket objects via unbounded channel and starts stream processing. @@ -139,9 +200,8 @@ pub(crate) struct ServerWorker { // It must be dropped as soon as ServerWorker dropping. 
rx: UnboundedReceiver, rx2: UnboundedReceiver, + counter: WorkerCounter, services: Box<[WorkerService]>, - availability: WorkerAvailability, - conns: Counter, factories: Box<[Box]>, state: WorkerState, shutdown_timeout: Duration, @@ -209,20 +269,29 @@ impl ServerWorker { pub(crate) fn start( idx: usize, factories: Box<[Box]>, - avail: WorkerAvailability, + waker_queue: WakerQueue, config: ServerWorkerConfig, ) -> io::Result<(WorkerHandleAccept, WorkerHandleServer)> { + let counter = Counter::new(config.max_concurrent_connections); + + let counter_clone = counter.clone(); + let (factory_tx, factory_rx) = std::sync::mpsc::sync_channel(1); - let (tx1, tx2) = Self::_start(idx, factories, avail.clone(), config, move |err| { - factory_tx.send(err).unwrap() - })?; + let (tx1, tx2) = Self::_start( + idx, + factories, + waker_queue, + counter_clone, + config, + move |err| factory_tx.send(err).unwrap(), + )?; factory_rx .recv() .map_err(|e| io::Error::new(io::ErrorKind::Other, e))? .map(Err) - .unwrap_or_else(|| Ok(handle_pair(idx, tx1, tx2, avail))) + .unwrap_or_else(|| Ok(handle_pair(idx, tx1, tx2, counter))) } // TODO: Use start_non_blocking when restarting worker. @@ -231,14 +300,23 @@ impl ServerWorker { pub(crate) fn start_non_blocking( idx: usize, factories: Box<[Box]>, - avail: WorkerAvailability, + waker_queue: WakerQueue, config: ServerWorkerConfig, ) -> impl Future> { + let counter = Counter::new(config.max_concurrent_connections); + + let counter_clone = counter.clone(); + let (factory_tx, factory_rx) = oneshot::channel(); - let res = Self::_start(idx, factories, avail.clone(), config, move |err| { - factory_tx.send(err).unwrap() - }); + let res = Self::_start( + idx, + factories, + waker_queue, + counter_clone, + config, + move |err| factory_tx.send(err).unwrap(), + ); async move { let (tx1, tx2) = res?; @@ -246,22 +324,21 @@ impl ServerWorker { .await .map_err(|e| io::Error::new(io::ErrorKind::Other, e))? .map(Err) - .unwrap_or_else(|| Ok(handle_pair(idx, tx1, tx2, avail))) + .unwrap_or_else(|| Ok(handle_pair(idx, tx1, tx2, counter))) } } fn _start( idx: usize, factories: Box<[Box]>, - availability: WorkerAvailability, + waker_queue: WakerQueue, + counter: Counter, config: ServerWorkerConfig, f: F, ) -> io::Result<(UnboundedSender, UnboundedSender)> where F: FnOnce(Option) + Send + 'static, { - assert!(!availability.available()); - // Try to get actix system. let system = System::try_current(); @@ -283,14 +360,14 @@ impl ServerWorker { io::Error::new(io::ErrorKind::Other, "Can not start worker service") })?; - assert_eq!(token.0, services.len()); + assert_eq!(token, services.len()); services.push(WorkerService { factory: idx, service, status: WorkerServiceStatus::Unavailable, }); } - Ok::<_, io::Error>(services) + Ok::<_, io::Error>(services.into_boxed_slice()) }; // All future runs in a LocalSet for being able to run !Send future. 
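The counter-based handshake above replaces the old `WorkerAvailability` flag: `Accept` keeps a clone of each worker's `Counter` inside `WorkerHandleAccept`, while the worker wraps its clone in a `WorkerCounter` and hands a `WorkerCounterGuard` to every connection. A rough standalone model of that flow follows — the names `ConnCounter`, `ConnGuard`, and the mpsc channel standing in for the `WakerQueue`/`mio::Waker` wake-up are assumptions for illustration, not the actix-server types themselves:

```rust
// Standalone sketch of the accept/worker counter handshake described above.
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    mpsc::{channel, Sender},
    Arc,
};

/// Shared in-flight connection counter; cloned into the "accept" side and the "worker" side.
#[derive(Clone)]
struct ConnCounter {
    count: Arc<AtomicUsize>,
    limit: usize,
}

impl ConnCounter {
    fn new(limit: usize) -> Self {
        Self {
            count: Arc::new(AtomicUsize::new(0)),
            limit,
        }
    }

    /// Called by the accept side after dispatching a connection.
    /// Returns false once this dispatch fills the worker, so the caller can mark it unavailable.
    fn inc(&self) -> bool {
        self.count.fetch_add(1, Ordering::Relaxed) + 1 < self.limit
    }

    /// Called when a connection finishes. Returns true only on the transition from
    /// full back to having capacity, i.e. exactly when the accept side should be woken.
    fn dec(&self) -> bool {
        self.count.fetch_sub(1, Ordering::Relaxed) == self.limit
    }
}

/// Guard held for the lifetime of one connection; dropping it decrements the counter
/// and, on crossing back under the limit, notifies the accept side.
struct ConnGuard {
    counter: ConnCounter,
    wake: Sender<usize>, // carries the worker index, like WakerInterest::WorkerAvailable(idx)
    idx: usize,
}

impl Drop for ConnGuard {
    fn drop(&mut self) {
        if self.counter.dec() {
            let _ = self.wake.send(self.idx);
        }
    }
}

fn main() {
    let (wake_tx, wake_rx) = channel();
    let counter = ConnCounter::new(2);
    let mut guards = Vec::new();

    // "Accept" side: after each dispatched connection, inc() reports whether the
    // worker can take more; on false the worker would be marked unavailable.
    loop {
        guards.push(ConnGuard {
            counter: counter.clone(),
            wake: wake_tx.clone(),
            idx: 0,
        });
        if !counter.inc() {
            println!("worker 0 marked unavailable at {} connections", guards.len());
            break;
        }
    }

    // "Worker" side: finishing a connection drops its guard; the drop that crosses
    // back under the limit wakes the accept side with the worker index.
    guards.pop();
    println!("accept woken for worker {}", wake_rx.recv().unwrap());
}
```

In this model the dispatcher is only woken on the transition back under the limit rather than once per finished connection, which is the property the `Counter` doc comment above describes.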
@@ -306,9 +383,8 @@ impl ServerWorker { let worker = ServerWorker { rx, rx2, - services: services.into_boxed_slice(), - availability, - conns: Counter::new(config.max_concurrent_connections), + services, + counter: WorkerCounter::new(idx, waker_queue, counter), factories, state: Default::default(), shutdown_timeout: config.shutdown_timeout, @@ -346,9 +422,8 @@ impl ServerWorker { let worker = ServerWorker { rx, rx2, - services: services.into_boxed_slice(), - availability, - conns: Counter::new(config.max_concurrent_connections), + services, + counter: WorkerCounter::new(idx, waker_queue, counter), factories, state: Default::default(), shutdown_timeout: config.shutdown_timeout, @@ -371,13 +446,13 @@ impl ServerWorker { .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) } - fn restart_service(&mut self, token: Token, factory_id: usize) { + fn restart_service(&mut self, idx: usize, factory_id: usize) { let factory = &self.factories[factory_id]; - trace!("Service {:?} failed, restarting", factory.name(token)); - self.services[token.0].status = WorkerServiceStatus::Restarting; + trace!("Service {:?} failed, restarting", factory.name(idx)); + self.services[idx].status = WorkerServiceStatus::Restarting; self.state = WorkerState::Restarting(Restart { factory_id, - token, + token: idx, fut: factory.create(), }); } @@ -395,8 +470,8 @@ impl ServerWorker { }); } - fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result { - let mut ready = self.conns.available(cx); + fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result { + let mut ready = true; for (idx, srv) in self.services.iter_mut().enumerate() { if srv.status == WorkerServiceStatus::Available || srv.status == WorkerServiceStatus::Unavailable @@ -406,7 +481,7 @@ impl ServerWorker { if srv.status == WorkerServiceStatus::Unavailable { trace!( "Service {:?} is available", - self.factories[srv.factory].name(Token(idx)) + self.factories[srv.factory].name(idx) ); srv.status = WorkerServiceStatus::Available; } @@ -417,7 +492,7 @@ impl ServerWorker { if srv.status == WorkerServiceStatus::Available { trace!( "Service {:?} is unavailable", - self.factories[srv.factory].name(Token(idx)) + self.factories[srv.factory].name(idx) ); srv.status = WorkerServiceStatus::Unavailable; } @@ -425,10 +500,10 @@ impl ServerWorker { Poll::Ready(Err(_)) => { error!( "Service {:?} readiness check returned error, restarting", - self.factories[srv.factory].name(Token(idx)) + self.factories[srv.factory].name(idx) ); srv.status = WorkerServiceStatus::Failed; - return Err((Token(idx), srv.factory)); + return Err((idx, srv.factory)); } } } @@ -447,8 +522,8 @@ enum WorkerState { struct Restart { factory_id: usize, - token: Token, - fut: LocalBoxFuture<'static, Result<(Token, BoxedServerService), ()>>, + token: usize, + fut: LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>, } // Shutdown keep states necessary for server shutdown: @@ -467,15 +542,6 @@ impl Default for WorkerState { } } -impl Drop for ServerWorker { - fn drop(&mut self) { - // Set availability to true so if accept try to send connection to this worker - // it would find worker is gone and remove it. - // This is helpful when worker is dropped unexpected. 
- self.availability.set(true); - } -} - impl Future for ServerWorker { type Output = (); @@ -485,8 +551,7 @@ impl Future for ServerWorker { // `StopWorker` message handler if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx) { - this.availability.set(false); - let num = this.conns.total(); + let num = this.counter.total(); if num == 0 { info!("Shutting down worker, 0 connections"); let _ = tx.send(true); @@ -513,7 +578,6 @@ impl Future for ServerWorker { WorkerState::Unavailable => match this.check_readiness(cx) { Ok(true) => { this.state = WorkerState::Available; - this.availability.set(true); self.poll(cx) } Ok(false) => Poll::Pending, @@ -541,7 +605,7 @@ impl Future for ServerWorker { this.factories[factory_id].name(token) ); - this.services[token.0].created(service); + this.services[token].created(service); this.state = WorkerState::Unavailable; self.poll(cx) @@ -550,7 +614,7 @@ impl Future for ServerWorker { // Wait for 1 second. ready!(shutdown.timer.as_mut().poll(cx)); - if this.conns.total() == 0 { + if this.counter.total() == 0 { // Graceful shutdown. if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) { let _ = shutdown.tx.send(true); @@ -575,22 +639,20 @@ impl Future for ServerWorker { Ok(true) => {} Ok(false) => { trace!("Worker is unavailable"); - this.availability.set(false); this.state = WorkerState::Unavailable; return self.poll(cx); } Err((token, idx)) => { this.restart_service(token, idx); - this.availability.set(false); return self.poll(cx); } } + // handle incoming io stream match ready!(Pin::new(&mut this.rx).poll_recv(cx)) { - // handle incoming io stream Some(msg) => { - let guard = this.conns.get(); - let _ = this.services[msg.token.0].service.call((guard, msg.io)); + let guard = this.counter.guard(); + let _ = this.services[msg.token].service.call((guard, msg.io)); } None => return Poll::Ready(()), };
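For reference, the `Availability` value that `Accept` caches (accept.rs above) is a 512-bit set: each worker index maps to one bit of a `[u128; 4]`, which is what the new `offset`/`get_available`/`set_available` helpers implement. A minimal standalone sketch of equivalent bookkeeping (not the in-crate type; it asserts where the original panics):

```rust
// Standalone sketch of the `Availability` bit set used by `Accept`.
#[derive(Default)]
struct Availability([u128; 4]);

impl Availability {
    /// Map a worker handle index to (word offset, bit position) within the [u128; 4].
    fn offset(idx: usize) -> (usize, usize) {
        assert!(idx < 512, "max WorkerHandle count is 512");
        (idx / 128, idx % 128)
    }

    /// True if any worker has spare capacity.
    fn available(&self) -> bool {
        self.0.iter().any(|word| *word != 0)
    }

    /// True if the worker at `idx` has spare capacity.
    fn get_available(&self, idx: usize) -> bool {
        let (offset, bit) = Self::offset(idx);
        self.0[offset] & (1 << bit) != 0
    }

    /// Set or clear the bit for worker `idx`.
    fn set_available(&mut self, idx: usize, avail: bool) {
        let (offset, bit) = Self::offset(idx);
        if avail {
            self.0[offset] |= 1 << bit;
        } else {
            self.0[offset] &= !(1 << bit);
        }
    }
}

fn main() {
    let mut avail = Availability::default();
    assert!(!avail.available());

    avail.set_available(0, true);
    avail.set_available(200, true); // lands in the second u128 word
    assert!(avail.get_available(0) && avail.get_available(200));

    avail.set_available(0, false);
    avail.set_available(200, false);
    assert!(!avail.available());
}
```

Keeping this as four plain integer words means the hot `available()` check in `Accept::accept` is just four comparisons, with no allocation or locking on the accept thread.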