diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 3bb940f4..07fdfb4e 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -5,7 +5,7 @@ use actix_rt::{ time::{sleep, Instant}, System, }; -use log::{error, info}; +use log::{error, info, warn}; use mio::{Interest, Poll, Token as MioToken}; use crate::server::Server; @@ -99,19 +99,17 @@ impl Availability { self.0.iter().any(|a| *a != 0) } + /// Check if worker handle is available by index + #[inline(always)] + fn get_available(&self, idx: usize) -> bool { + let (offset, idx) = Self::offset(idx); + + self.0[offset] & (1 << idx as u128) != 0 + } + /// Set worker handle available state by index. fn set_available(&mut self, idx: usize, avail: bool) { - let (offset, idx) = if idx < 128 { - (0, idx) - } else if idx < 128 * 2 { - (1, idx - 128) - } else if idx < 128 * 3 { - (2, idx - 128 * 2) - } else if idx < 128 * 4 { - (3, idx - 128 * 3) - } else { - panic!("Max WorkerHandle count is 512") - }; + let (offset, idx) = Self::offset(idx); let off = 1 << idx as u128; if avail { @@ -128,6 +126,21 @@ impl Availability { self.set_available(handle.idx(), true); }) } + + /// Get offset and adjusted index of given worker handle index. + fn offset(idx: usize) -> (usize, usize) { + if idx < 128 { + (0, idx) + } else if idx < 128 * 2 { + (1, idx - 128) + } else if idx < 128 * 3 { + (2, idx - 128 * 2) + } else if idx < 128 * 4 { + (3, idx - 128 * 3) + } else { + panic!("Max WorkerHandle count is 512") + } + } } /// This function defines errors that are per-connection. Which basically @@ -252,16 +265,21 @@ impl Accept { // from backpressure. Some(WakerInterest::WorkerAvailable(idx)) => { drop(guard); + self.avail.set_available(idx, true); - self.maybe_backpressure(sockets, false); + + self.accept_all(sockets); + // self.maybe_backpressure(sockets, false); } // a new worker thread is made and it's handle would be added to Accept Some(WakerInterest::Worker(handle)) => { drop(guard); + self.avail.set_available(handle.idx(), true); self.handles.push(handle); - // maybe we want to recover from a backpressure. - self.maybe_backpressure(sockets, false); + + self.accept_all(sockets); + // self.maybe_backpressure(sockets, false); } // got timer interest and it's time to try register socket(s) again Some(WakerInterest::Timer) => { @@ -372,42 +390,38 @@ impl Accept { .for_each(|(_, info)| self.deregister_logged(info)); } - fn maybe_backpressure(&mut self, sockets: &mut [ServerSocketInfo], on: bool) { - // Only operate when server is in a different backpressure than the given flag. - if self.backpressure != on { - self.backpressure = on; - sockets - .iter_mut() - // Only operate on sockets without associated timeout. - // Sockets with it should be handled by `accept` and `process_timer` methods. - // They are already deregistered or need to be reregister in the future. - .filter(|info| info.timeout.is_none()) - .for_each(|info| { - if on { - self.deregister_logged(info); - } else { - self.register_logged(info); - } - }); - - // Try to drain sockets backlog on recovery of backpressure. - // This is necessary for not hang listeners. - // - // In Accept::accept method listener does not always read to WouldBlock state. - if !self.backpressure { - self.accept_all(sockets); - } - } - } + // TODO: back pressure is disabled for now. + // fn maybe_backpressure(&mut self, sockets: &mut [ServerSocketInfo], on: bool) { + // // Only operate when server is in a different backpressure than the given flag. + // if self.backpressure != on { + // self.backpressure = on; + // sockets + // .iter_mut() + // // Only operate on sockets without associated timeout. + // // Sockets with it should be handled by `accept` and `process_timer` methods. + // // They are already deregistered or need to be reregister in the future. + // .filter(|info| info.timeout.is_none()) + // .for_each(|info| { + // if on { + // self.deregister_logged(info); + // } else { + // self.register_logged(info); + // } + // }); + // } + // } // Send connection to worker and handle error. - fn send_connection( - &mut self, - sockets: &mut [ServerSocketInfo], - conn: Conn, - ) -> Result<(), Conn> { - match self.next().send(conn) { + fn send_connection(&mut self, conn: Conn) -> Result<(), Conn> { + let next = self.next(); + match next.send(conn) { Ok(_) => { + // Increment counter of WorkerHandle. + // Set worker to unavailable with it hit max (Return false). + if !next.available() { + let idx = next.idx(); + self.avail.set_available(idx, false); + } self.set_next(); Ok(()) } @@ -418,7 +432,7 @@ impl Accept { if self.handles.is_empty() { error!("No workers"); - self.maybe_backpressure(sockets, true); + // self.maybe_backpressure(sockets, true); // All workers are gone and Conn is nowhere to be sent. // Treat this situation as Ok and drop Conn. return Ok(()); @@ -431,13 +445,13 @@ impl Accept { } } - fn accept_one(&mut self, sockets: &mut [ServerSocketInfo], mut conn: Conn) { + fn accept_one(&mut self, mut conn: Conn) { loop { let next = self.next(); let idx = next.idx(); - if next.available() { - // self.avail.set_available(idx, true); - match self.send_connection(sockets, conn) { + + if self.avail.get_available(idx) { + match self.send_connection(conn) { Ok(_) => return, Err(c) => conn = c, } @@ -445,21 +459,29 @@ impl Accept { self.avail.set_available(idx, false); self.set_next(); + // The break condition should never be met. + // Address TODO below and remove dead code if this + // assert is proven to be never triggered. + assert!(self.avail.available()); + if !self.avail.available() { break; } } } + // TODO: This branch is never entered. Consider remove. + warn!("Enter backpressure path. Please make report for issue"); + // Sending connection failed due to either all workers are in error or not available. // Enter backpressure state and try again. - self.maybe_backpressure(sockets, true); + // self.maybe_backpressure(sockets, true); // Force send connection to worker regardless it's avail state. // Worst case here is conn get dropped after all handles are gone. - while let Err(c) = self.send_connection(sockets, conn) { - conn = c - } + // while let Err(c) = self.send_connection(sockets, conn) { + // conn = c + // } } fn accept(&mut self, sockets: &mut [ServerSocketInfo], token: usize) { @@ -469,7 +491,7 @@ impl Accept { match info.lst.accept() { Ok(io) => { let conn = Conn { io, token }; - self.accept_one(sockets, conn); + self.accept_one(conn); } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return, Err(ref e) if connection_error(e) => continue, @@ -503,7 +525,7 @@ impl Accept { .map(|info| info.token) .collect::>() .into_iter() - .for_each(|idx| self.accept(sockets, idx)); + .for_each(|idx| self.accept(sockets, idx)) } #[inline(always)]