From f51237900a71986658c2a3b3097b134e23a2e12d Mon Sep 17 00:00:00 2001 From: Alex Whitney Date: Sun, 2 Aug 2020 14:25:31 +0100 Subject: [PATCH] document accept loop --- actix-server/src/accept.rs | 198 +++++++++++++++++++++++++----------- actix-server/src/builder.rs | 8 +- actix-server/src/server.rs | 5 + actix-server/src/socket.rs | 1 + actix-server/src/worker.rs | 24 +++-- 5 files changed, 167 insertions(+), 69 deletions(-) diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 4dc218fd..5092ad0d 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -106,7 +106,7 @@ struct Accept { workers: Vec, srv: Server, timer: (mio::Registration, mio::SetReadiness), - next: usize, + next_worker_ix: usize, backpressure: bool, } @@ -128,6 +128,12 @@ fn connection_error(e: &io::Error) -> bool { || e.kind() == io::ErrorKind::ConnectionReset } +// One-shot enum, indicates how to repond to commands +enum ShouldAbort { + Abort, + Continue, +} + impl Accept { #![allow(clippy::too_many_arguments)] pub(crate) fn start( @@ -138,9 +144,9 @@ impl Accept { srv: Server, workers: Vec, ) { + // start accepting events (within separate thread) let sys = System::current(); - // start accept thread let _ = thread::Builder::new() .name("actix-server accept loop".to_owned()) .spawn(move || { @@ -167,6 +173,7 @@ impl Accept { panic!("Can not register Registration: {}", err); } + // Start core accept loop. Blocks indefinitely except in case of error accept.poll(); }); } @@ -224,32 +231,46 @@ impl Accept { sockets, workers, srv, - next: 0, + next_worker_ix: 0, timer: (tm, tmr), backpressure: false, } } + // Core acceptor logic. Receive notifications from the event loop and respond. + // In particular, receive notifications of pending connections, and accept them. fn poll(&mut self) { // Create storage for events let mut events = mio::Events::with_capacity(128); loop { + // block here, waiting to receive events from the `tokio` event loop if let Err(err) = self.poll.poll(&mut events, None) { panic!("Poll error: {}", err); } + // now process the events for event in events.iter() { let token = event.token(); + // as well as responding to socket events, + // we also use `mio` as a messaging layer for + // actix events match token { CMD => { - if !self.process_cmd() { - return; + // There is a pending message from the server + match self.process_cmd() { + ShouldAbort::Abort => return, + ShouldAbort::Continue => continue, } } TIMER => self.process_timer(), - NOTIFY => self.backpressure(false), + NOTIFY => { + // A message from a worker thread indicating that it is + // available again - therefore remove backpressure, if any + self.set_backpressure(false) + } _ => { + // any other token indicates a pending connection - accept it let token = usize::from(token); if token < DELTA { continue; @@ -258,10 +279,14 @@ impl Accept { } } } + // all events processed - loop! } } fn process_timer(&mut self) { + // This function is triggered after an IO error. During error recovery + // the affected socket is de-registered, and after some timeout + // we must re-register it let now = Instant::now(); for (token, info) in self.sockets.iter_mut() { if let Some(inst) = info.timeout.take() { @@ -283,7 +308,8 @@ impl Accept { } } - fn process_cmd(&mut self) -> bool { + /// Process messages received from server + fn process_cmd(&mut self) -> ShouldAbort { loop { match self.rx.try_recv() { Ok(cmd) => match cmd { @@ -312,10 +338,10 @@ impl Accept { for (_, info) in self.sockets.iter() { let _ = self.poll.deregister(&info.sock); } - return false; + return ShouldAbort::Abort; } Command::Worker(worker) => { - self.backpressure(false); + self.set_backpressure(false); self.workers.push(worker); } }, @@ -325,12 +351,12 @@ impl Accept { for (_, info) in self.sockets.iter() { let _ = self.poll.deregister(&info.sock); } - return false; + return ShouldAbort::Abort; } }, } } - true + ShouldAbort::Continue } #[cfg(not(target_os = "windows"))] @@ -365,101 +391,157 @@ impl Accept { }) } - fn backpressure(&mut self, on: bool) { + /// While backpressure is enabled, we will not accept any + /// new connections (but existing ones will be served) + fn set_backpressure(&mut self, on: bool) { + if self.backpressure == on { + // already set -> no op + return; + } + self.backpressure = on; if self.backpressure { - if !on { - self.backpressure = false; - for (token, info) in self.sockets.iter() { - if let Err(err) = self.register(token, info) { - error!("Can not resume socket accept process: {}", err); - } else { - info!("Accepting connections on {} has been resumed", info.addr); - } - } - } - } else if on { - self.backpressure = true; + // stop being notified of pending connections for (_, info) in self.sockets.iter() { let _ = self.poll.deregister(&info.sock); } + } else { + // resume being notified of pending connections + for (token, info) in self.sockets.iter() { + if let Err(err) = self.register(token, info) { + error!("Can not resume socket accept process: {}", err); + } else { + info!("Accepting connections on {} has been resumed", info.addr); + } + } } } - fn accept_one(&mut self, mut msg: Conn) { + fn accept_one(&mut self, mut conn: Conn) { + // we have an incomming connection, we must send it to a worker + if self.backpressure { while !self.workers.is_empty() { - match self.workers[self.next].send(msg) { + match self.workers[self.next_worker_ix].send(conn) { Ok(_) => (), Err(tmp) => { - self.srv.worker_faulted(self.workers[self.next].idx); - msg = tmp; - self.workers.swap_remove(self.next); + // the receiving end of the channel is closed, + // probably because the worker thread has crashed + + // recover the connection and notify the server + conn = tmp; + self.srv + .worker_faulted(self.workers[self.next_worker_ix].idx); + + self.workers.swap_remove(self.next_worker_ix); if self.workers.is_empty() { error!("No workers"); return; - } else if self.workers.len() <= self.next { - self.next = 0; + } else if self.workers.len() <= self.next_worker_ix { + self.next_worker_ix = 0; } continue; } } - self.next = (self.next + 1) % self.workers.len(); + self.next_worker_ix = (self.next_worker_ix + 1) % self.workers.len(); break; } } else { + // We iterate through our workers, starting from + // `self.next_worker_ix`, and try to find one that is not busy let mut idx = 0; while idx < self.workers.len() { idx += 1; - if self.workers[self.next].available() { - match self.workers[self.next].send(msg) { - Ok(_) => { - self.next = (self.next + 1) % self.workers.len(); + if self.workers[self.next_worker_ix].is_available() { + // worker has indicated that it is available, so + // send the connection through a channel to the worker thread + match self.workers[self.next_worker_ix].send(conn) { + Ok(()) => { + // connection sent to worker, bump the index and we're done + self.next_worker_ix = + (self.next_worker_ix + 1) % self.workers.len(); return; } Err(tmp) => { - self.srv.worker_faulted(self.workers[self.next].idx); - msg = tmp; - self.workers.swap_remove(self.next); + // the receiving end of the channel is closed, + // probably because the worker thread has crashed + + // recover the connection and notify the server + conn = tmp; + self.srv + .worker_faulted(self.workers[self.next_worker_ix].idx); + + // discard the crashed worker + self.workers.swap_remove(self.next_worker_ix); + if self.workers.is_empty() { + // all workers have crashed! Drop the connection, + // we'll try to recover next time error!("No workers"); - self.backpressure(true); + self.set_backpressure(true); return; - } else if self.workers.len() <= self.next { - self.next = 0; + } else if self.workers.len() <= self.next_worker_ix { + // self.next_worker_ix now points out-of-bounds, so reset it + self.next_worker_ix = 0; } continue; } } } - self.next = (self.next + 1) % self.workers.len(); + self.next_worker_ix = (self.next_worker_ix + 1) % self.workers.len(); } - // enable backpressure - self.backpressure(true); - self.accept_one(msg); + // No workers are available. Enable backpressure and try again + self.set_backpressure(true); + self.accept_one(conn); } } fn accept(&mut self, token: usize) { + // This is the core 'accept' loop and is critical for overall performace. + // The overall logic is: We have received a token the mio event loop + // This could indicate // FIXME + // Now we need to do something useful with it loop { - let msg = if let Some(info) = self.sockets.get_mut(token) { + let conn = if let Some(info) = self.sockets.get_mut(token) { + // We have already registered this token, which means + // that the event relates to a request that is has already + // been accepted and is in the middle of being handled by actix + match info.sock.accept() { - Ok(Some((io, addr))) => Conn { - io, - token: info.token, - peer: Some(addr), - }, - Ok(None) => return, - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return, - Err(ref e) if connection_error(e) => continue, + Ok(Some((io, addr))) => { + // connection accepted (happy path) + Conn { + io, + token: info.token, + peer: Some(addr), + } + } + Ok(None) => { + // Only reachable for unix domain sockets. No waiting connection + // so nothing to be done. Yield to the event loop + return; + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + // Socket not ready - yield to the event loop + return; + } + Err(ref e) if connection_error(e) => { + // connection error, retry the socket + continue; + } Err(e) => { + // some other (fatal) IO error + // We will attempt to recover by deregistering the socket + // with mio, then after a short pause sending a notification + // to re-register the socket error!("Error accepting connection: {}", e); if let Err(err) = self.poll.deregister(&info.sock) { error!("Can not deregister server socket {}", err); } - // sleep after error info.timeout = Some(Instant::now() + Duration::from_millis(500)); + // create and run a future which will sleep for a short period + // then trigger a mio event let r = self.timer.1.clone(); System::current().arbiter().send(Box::pin(async move { delay_until(Instant::now() + Duration::from_millis(510)).await; @@ -469,10 +551,12 @@ impl Accept { } } } else { + // no socket associated with the token, implies the token is + // stale in some way. Nothing to do so yield to the event loop return; }; - self.accept_one(msg); + self.accept_one(conn); } } } diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 2e8ae30e..5a233e64 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -303,6 +303,7 @@ impl ServerBuilder { Worker::start(idx, services, avail, self.shutdown_timeout) } + /// Handle commands received from the `Server` fn handle_cmd(&mut self, item: ServerCommand) { match item { ServerCommand::Pause(tx) => { @@ -408,6 +409,9 @@ impl ServerBuilder { } } ServerCommand::WorkerFaulted(idx) => { + // a worker has crashed, attempt to restart + + // discard crashed worker let mut found = false; for i in 0..self.workers.len() { if self.workers[i].0 == idx { @@ -418,8 +422,9 @@ impl ServerBuilder { } if found { - error!("Worker has died {:?}, restarting", idx); + error!("Worker {} has died, restarting", idx); + // generate a new (unused) worker index let mut new_idx = self.workers.len(); 'found: loop { for i in 0..self.workers.len() { @@ -431,6 +436,7 @@ impl ServerBuilder { break; } + // start a new worker and send to the accept loop let worker = self.start_worker(new_idx, self.accept.get_notify()); self.workers.push((new_idx, worker.clone())); self.accept.send(Command::Worker(worker)); diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index b29a9e02..08e72c2a 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -25,6 +25,11 @@ pub(crate) enum ServerCommand { Notify(oneshot::Sender<()>), } +// A `Server` is just a handle through which we can send messages. +// It may be cloned and handed out to other parts of the `actix` +// runtime (e.g. one is held by the `Accept` struct) +// On the receiving end is a ServerBuilder, spawned onto the event +// loop, which responds to the messages #[derive(Debug)] pub struct Server( UnboundedSender, diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index 3025660a..86238175 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -83,6 +83,7 @@ pub(crate) enum SocketListener { } impl SocketListener { + /// Accept the socket *in blocking mode* pub(crate) fn accept(&self) -> io::Result> { match *self { SocketListener::Tcp(ref lst) => lst diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 4ae3e4f3..72e26fee 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -78,14 +78,14 @@ impl WorkerClient { } } - pub fn send(&self, msg: Conn) -> Result<(), Conn> { + pub fn send(&self, conn: Conn) -> Result<(), Conn> { self.tx1 - .unbounded_send(WorkerCommand(msg)) - .map_err(|msg| msg.into_inner().0) + .unbounded_send(WorkerCommand(conn)) + .map_err(|conn| conn.into_inner().0) } - pub fn available(&self) -> bool { - self.avail.available() + pub fn is_available(&self) -> bool { + self.avail.is_available() } pub fn stop(&self, graceful: bool) -> oneshot::Receiver { @@ -109,12 +109,14 @@ impl WorkerAvailability { } } - pub fn available(&self) -> bool { + pub fn is_available(&self) -> bool { self.available.load(Ordering::Acquire) } pub fn set(&self, val: bool) { let old = self.available.swap(val, Ordering::Release); + // If changing availability to 'true', also + // send a notification event via `mio` if !old && val { self.notify.notify() } @@ -318,7 +320,7 @@ impl Future for Worker { // FIXME: remove this attribute #[allow(clippy::never_loop)] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // `StopWorker` message handler + // first we check the `StopWorker` message receiver if let Poll::Ready(Some(StopCommand { graceful, result })) = Pin::new(&mut self.rx2).poll_next(cx) { @@ -448,19 +450,19 @@ impl Future for Worker { loop { match Pin::new(&mut self.rx).poll_next(cx) { // handle incoming io stream - Poll::Ready(Some(WorkerCommand(msg))) => { + Poll::Ready(Some(WorkerCommand(conn))) => { match self.check_readiness(cx) { Ok(true) => { let guard = self.conns.get(); - let _ = self.services[msg.token.0] + let _ = self.services[conn.token.0] .service - .call((Some(guard), ServerMessage::Connect(msg.io))); + .call((Some(guard), ServerMessage::Connect(conn.io))); continue; } Ok(false) => { trace!("Worker is unavailable"); self.availability.set(false); - self.state = WorkerState::Unavailable(vec![msg]); + self.state = WorkerState::Unavailable(vec![conn]); } Err((token, idx)) => { trace!(