Make counter limit switch accurate

This commit is contained in:
fakeshadow 2021-04-19 18:43:58 +08:00
parent 0bad2258b5
commit 8c7f0ef01b
1 changed files with 81 additions and 59 deletions

View File

@ -5,7 +5,7 @@ use actix_rt::{
time::{sleep, Instant}, time::{sleep, Instant},
System, System,
}; };
use log::{error, info}; use log::{error, info, warn};
use mio::{Interest, Poll, Token as MioToken}; use mio::{Interest, Poll, Token as MioToken};
use crate::server::Server; use crate::server::Server;
@ -99,19 +99,17 @@ impl Availability {
self.0.iter().any(|a| *a != 0) self.0.iter().any(|a| *a != 0)
} }
/// Check if worker handle is available by index
#[inline(always)]
fn get_available(&self, idx: usize) -> bool {
let (offset, idx) = Self::offset(idx);
self.0[offset] & (1 << idx as u128) != 0
}
/// Set worker handle available state by index. /// Set worker handle available state by index.
fn set_available(&mut self, idx: usize, avail: bool) { fn set_available(&mut self, idx: usize, avail: bool) {
let (offset, idx) = if idx < 128 { let (offset, idx) = Self::offset(idx);
(0, idx)
} else if idx < 128 * 2 {
(1, idx - 128)
} else if idx < 128 * 3 {
(2, idx - 128 * 2)
} else if idx < 128 * 4 {
(3, idx - 128 * 3)
} else {
panic!("Max WorkerHandle count is 512")
};
let off = 1 << idx as u128; let off = 1 << idx as u128;
if avail { if avail {
@ -128,6 +126,21 @@ impl Availability {
self.set_available(handle.idx(), true); self.set_available(handle.idx(), true);
}) })
} }
/// Get offset and adjusted index of given worker handle index.
fn offset(idx: usize) -> (usize, usize) {
if idx < 128 {
(0, idx)
} else if idx < 128 * 2 {
(1, idx - 128)
} else if idx < 128 * 3 {
(2, idx - 128 * 2)
} else if idx < 128 * 4 {
(3, idx - 128 * 3)
} else {
panic!("Max WorkerHandle count is 512")
}
}
} }
/// This function defines errors that are per-connection. Which basically /// This function defines errors that are per-connection. Which basically
@ -252,16 +265,21 @@ impl Accept {
// from backpressure. // from backpressure.
Some(WakerInterest::WorkerAvailable(idx)) => { Some(WakerInterest::WorkerAvailable(idx)) => {
drop(guard); drop(guard);
self.avail.set_available(idx, true); self.avail.set_available(idx, true);
self.maybe_backpressure(sockets, false);
self.accept_all(sockets);
// self.maybe_backpressure(sockets, false);
} }
// a new worker thread is made and it's handle would be added to Accept // a new worker thread is made and it's handle would be added to Accept
Some(WakerInterest::Worker(handle)) => { Some(WakerInterest::Worker(handle)) => {
drop(guard); drop(guard);
self.avail.set_available(handle.idx(), true); self.avail.set_available(handle.idx(), true);
self.handles.push(handle); self.handles.push(handle);
// maybe we want to recover from a backpressure.
self.maybe_backpressure(sockets, false); self.accept_all(sockets);
// self.maybe_backpressure(sockets, false);
} }
// got timer interest and it's time to try register socket(s) again // got timer interest and it's time to try register socket(s) again
Some(WakerInterest::Timer) => { Some(WakerInterest::Timer) => {
@ -372,42 +390,38 @@ impl Accept {
.for_each(|(_, info)| self.deregister_logged(info)); .for_each(|(_, info)| self.deregister_logged(info));
} }
fn maybe_backpressure(&mut self, sockets: &mut [ServerSocketInfo], on: bool) { // TODO: back pressure is disabled for now.
// Only operate when server is in a different backpressure than the given flag. // fn maybe_backpressure(&mut self, sockets: &mut [ServerSocketInfo], on: bool) {
if self.backpressure != on { // // Only operate when server is in a different backpressure than the given flag.
self.backpressure = on; // if self.backpressure != on {
sockets // self.backpressure = on;
.iter_mut() // sockets
// Only operate on sockets without associated timeout. // .iter_mut()
// Sockets with it should be handled by `accept` and `process_timer` methods. // // Only operate on sockets without associated timeout.
// They are already deregistered or need to be reregister in the future. // // Sockets with it should be handled by `accept` and `process_timer` methods.
.filter(|info| info.timeout.is_none()) // // They are already deregistered or need to be reregister in the future.
.for_each(|info| { // .filter(|info| info.timeout.is_none())
if on { // .for_each(|info| {
self.deregister_logged(info); // if on {
} else { // self.deregister_logged(info);
self.register_logged(info); // } else {
} // self.register_logged(info);
}); // }
// });
// Try to drain sockets backlog on recovery of backpressure. // }
// This is necessary for not hang listeners. // }
//
// In Accept::accept method listener does not always read to WouldBlock state.
if !self.backpressure {
self.accept_all(sockets);
}
}
}
// Send connection to worker and handle error. // Send connection to worker and handle error.
fn send_connection( fn send_connection(&mut self, conn: Conn) -> Result<(), Conn> {
&mut self, let next = self.next();
sockets: &mut [ServerSocketInfo], match next.send(conn) {
conn: Conn,
) -> Result<(), Conn> {
match self.next().send(conn) {
Ok(_) => { Ok(_) => {
// Increment counter of WorkerHandle.
// Set worker to unavailable with it hit max (Return false).
if !next.available() {
let idx = next.idx();
self.avail.set_available(idx, false);
}
self.set_next(); self.set_next();
Ok(()) Ok(())
} }
@ -418,7 +432,7 @@ impl Accept {
if self.handles.is_empty() { if self.handles.is_empty() {
error!("No workers"); error!("No workers");
self.maybe_backpressure(sockets, true); // self.maybe_backpressure(sockets, true);
// All workers are gone and Conn is nowhere to be sent. // All workers are gone and Conn is nowhere to be sent.
// Treat this situation as Ok and drop Conn. // Treat this situation as Ok and drop Conn.
return Ok(()); return Ok(());
@ -431,13 +445,13 @@ impl Accept {
} }
} }
fn accept_one(&mut self, sockets: &mut [ServerSocketInfo], mut conn: Conn) { fn accept_one(&mut self, mut conn: Conn) {
loop { loop {
let next = self.next(); let next = self.next();
let idx = next.idx(); let idx = next.idx();
if next.available() {
// self.avail.set_available(idx, true); if self.avail.get_available(idx) {
match self.send_connection(sockets, conn) { match self.send_connection(conn) {
Ok(_) => return, Ok(_) => return,
Err(c) => conn = c, Err(c) => conn = c,
} }
@ -445,21 +459,29 @@ impl Accept {
self.avail.set_available(idx, false); self.avail.set_available(idx, false);
self.set_next(); self.set_next();
// The break condition should never be met.
// Address TODO below and remove dead code if this
// assert is proven to be never triggered.
assert!(self.avail.available());
if !self.avail.available() { if !self.avail.available() {
break; break;
} }
} }
} }
// TODO: This branch is never entered. Consider remove.
warn!("Enter backpressure path. Please make report for issue");
// Sending connection failed due to either all workers are in error or not available. // Sending connection failed due to either all workers are in error or not available.
// Enter backpressure state and try again. // Enter backpressure state and try again.
self.maybe_backpressure(sockets, true); // self.maybe_backpressure(sockets, true);
// Force send connection to worker regardless it's avail state. // Force send connection to worker regardless it's avail state.
// Worst case here is conn get dropped after all handles are gone. // Worst case here is conn get dropped after all handles are gone.
while let Err(c) = self.send_connection(sockets, conn) { // while let Err(c) = self.send_connection(sockets, conn) {
conn = c // conn = c
} // }
} }
fn accept(&mut self, sockets: &mut [ServerSocketInfo], token: usize) { fn accept(&mut self, sockets: &mut [ServerSocketInfo], token: usize) {
@ -469,7 +491,7 @@ impl Accept {
match info.lst.accept() { match info.lst.accept() {
Ok(io) => { Ok(io) => {
let conn = Conn { io, token }; let conn = Conn { io, token };
self.accept_one(sockets, conn); self.accept_one(conn);
} }
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return, Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
Err(ref e) if connection_error(e) => continue, Err(ref e) if connection_error(e) => continue,
@ -503,7 +525,7 @@ impl Accept {
.map(|info| info.token) .map(|info| info.token)
.collect::<Vec<_>>() .collect::<Vec<_>>()
.into_iter() .into_iter()
.for_each(|idx| self.accept(sockets, idx)); .for_each(|idx| self.accept(sockets, idx))
} }
#[inline(always)] #[inline(always)]