diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 7c97ee30..0e2cbe05 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -326,72 +326,74 @@ impl Accept { } } - fn accept_one(&mut self, sockets: &mut Slab, mut msg: Conn) { + fn accept_one(&mut self, sockets: &mut Slab, mut conn: Conn) { if self.backpressure { + // send_connection would remove fault worker from handles. + // worst case here is conn get dropped after all handles are gone. while !self.handles.is_empty() { - match self.handles[self.next].send(msg) { - Ok(_) => { - self.set_next(); - break; - } - Err(tmp) => { - // worker lost contact and could be gone. a message is sent to - // `ServerBuilder` future to notify it a new worker should be made - // after that remove the fault worker - self.srv.worker_faulted(self.handles[self.next].idx); - msg = tmp; - self.handles.swap_remove(self.next); - if self.handles.is_empty() { - error!("No workers"); - return; - } else if self.handles.len() <= self.next { - self.next = 0; - } - continue; - } + match self.send_connection(sockets, conn) { + Ok(_) => return, + Err(c) => conn = c, } } } else { + // Do one round and try to send conn to all workers until it succeed. + // Start from self.next. let mut idx = 0; while idx < self.handles.len() { idx += 1; if self.handles[self.next].available() { - match self.handles[self.next].send(msg) { - Ok(_) => { - self.set_next(); - return; - } - // worker lost contact and could be gone. a message is sent to - // `ServerBuilder` future to notify it a new worker should be made. - // after that remove the fault worker and enter backpressure if necessary. - Err(tmp) => { - self.srv.worker_faulted(self.handles[self.next].idx); - msg = tmp; - self.handles.swap_remove(self.next); - if self.handles.is_empty() { - error!("No workers"); - self.maybe_backpressure(sockets, true); - return; - } else if self.handles.len() <= self.next { - self.next = 0; - } - continue; - } + match self.send_connection(sockets, conn) { + Ok(_) => return, + Err(c) => conn = c, } + } else { + self.set_next(); } - self.set_next(); } - // enable backpressure + // Sending Conn failed due to either all workers are in error or not available. + // Enter backpressure state and try again. self.maybe_backpressure(sockets, true); - self.accept_one(sockets, msg); + self.accept_one(sockets, conn); } } - // set next worker handle that would accept work. + // Set next worker handle that would accept work. fn set_next(&mut self) { self.next = (self.next + 1) % self.handles.len(); } + // Send connection to worker and handle error. + fn send_connection( + &mut self, + sockets: &mut Slab, + conn: Conn, + ) -> Result<(), Conn> { + match self.handles[self.next].send(conn) { + Ok(_) => { + self.set_next(); + Ok(()) + } + Err(conn) => { + // worker lost contact and could be gone. a message is sent to + // `ServerBuilder` future to notify it a new worker should be made. + // after that remove the fault worker and enter backpressure if necessary. + self.srv.worker_faulted(self.handles[self.next].idx); + self.handles.swap_remove(self.next); + if self.handles.is_empty() { + error!("No workers"); + self.maybe_backpressure(sockets, true); + // All workers are gone and Conn is nowhere to be sent. + // Treat this situation as Ok and drop Conn. + return Ok(()); + } else if self.handles.len() <= self.next { + self.next = 0; + } + Err(conn) + } + } + } + fn accept(&mut self, sockets: &mut Slab, token: usize) { loop { let info = sockets @@ -399,11 +401,10 @@ impl Accept { .expect("ServerSocketInfo is removed from Slab"); match info.lst.accept() { - Ok((io, addr)) => { + Ok(io) => { let msg = Conn { io, token: info.token, - peer: Some(addr), }; self.accept_one(sockets, msg); } diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index baf02cbe..0625cfda 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -40,15 +40,11 @@ impl MioListener { } } - pub(crate) fn accept(&self) -> io::Result<(MioStream, SocketAddr)> { + pub(crate) fn accept(&self) -> io::Result { match *self { - MioListener::Tcp(ref lst) => lst - .accept() - .map(|(stream, addr)| (MioStream::Tcp(stream), SocketAddr::Tcp(addr))), + MioListener::Tcp(ref lst) => lst.accept().map(|(stream, _)| MioStream::Tcp(stream)), #[cfg(unix)] - MioListener::Uds(ref lst) => lst - .accept() - .map(|(stream, addr)| (MioStream::Uds(stream), SocketAddr::Uds(addr))), + MioListener::Uds(ref lst) => lst.accept().map(|(stream, _)| MioStream::Uds(stream)), } } } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index aa6d31fc..63c45757 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -14,7 +14,7 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot; use crate::service::{BoxedServerService, InternalServiceFactory}; -use crate::socket::{MioStream, SocketAddr}; +use crate::socket::MioStream; use crate::waker_queue::{WakerInterest, WakerQueue}; use crate::{join_all, Token}; @@ -31,7 +31,6 @@ pub(crate) struct StopCommand { pub(crate) struct Conn { pub io: MioStream, pub token: Token, - pub peer: Option, } static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600);