Remove backpressure. Add pause state

This commit is contained in:
fakeshadow 2021-04-20 00:17:31 +08:00
parent 8c7f0ef01b
commit de149527ff
2 changed files with 37 additions and 58 deletions

View File

@ -5,7 +5,7 @@ use actix_rt::{
time::{sleep, Instant}, time::{sleep, Instant},
System, System,
}; };
use log::{error, info, warn}; use log::{error, info};
use mio::{Interest, Poll, Token as MioToken}; use mio::{Interest, Poll, Token as MioToken};
use crate::server::Server; use crate::server::Server;
@ -80,7 +80,7 @@ struct Accept {
srv: Server, srv: Server,
next: usize, next: usize,
avail: Availability, avail: Availability,
backpressure: bool, paused: bool,
} }
/// Array of u128 with every bit as marker for a worker handle's availability. /// Array of u128 with every bit as marker for a worker handle's availability.
@ -217,7 +217,7 @@ impl Accept {
srv, srv,
next: 0, next: 0,
avail, avail,
backpressure: false, paused: false,
}; };
(accept, sockets) (accept, sockets)
@ -229,7 +229,7 @@ impl Accept {
loop { loop {
if let Err(e) = self.poll.poll(&mut events, None) { if let Err(e) = self.poll.poll(&mut events, None) {
match e.kind() { match e.kind() {
std::io::ErrorKind::Interrupted => continue, io::ErrorKind::Interrupted => continue,
_ => panic!("Poll error: {}", e), _ => panic!("Poll error: {}", e),
} }
} }
@ -240,6 +240,7 @@ impl Accept {
WAKER_TOKEN => { WAKER_TOKEN => {
let exit = self.handle_waker(sockets); let exit = self.handle_waker(sockets);
if exit { if exit {
info!("Accept is stopped.");
return; return;
} }
} }
@ -261,15 +262,15 @@ impl Accept {
// until the current task is done. // until the current task is done.
let mut guard = self.waker.guard(); let mut guard = self.waker.guard();
match guard.pop_front() { match guard.pop_front() {
// worker notify it becomes available. we may want to recover // worker notify it becomes available.
// from backpressure.
Some(WakerInterest::WorkerAvailable(idx)) => { Some(WakerInterest::WorkerAvailable(idx)) => {
drop(guard); drop(guard);
self.avail.set_available(idx, true); self.avail.set_available(idx, true);
self.accept_all(sockets); if !self.paused {
// self.maybe_backpressure(sockets, false); self.accept_all(sockets);
}
} }
// a new worker thread is made and it's handle would be added to Accept // a new worker thread is made and it's handle would be added to Accept
Some(WakerInterest::Worker(handle)) => { Some(WakerInterest::Worker(handle)) => {
@ -278,32 +279,44 @@ impl Accept {
self.avail.set_available(handle.idx(), true); self.avail.set_available(handle.idx(), true);
self.handles.push(handle); self.handles.push(handle);
self.accept_all(sockets); if !self.paused {
// self.maybe_backpressure(sockets, false); self.accept_all(sockets);
}
} }
// got timer interest and it's time to try register socket(s) again // got timer interest and it's time to try register socket(s) again
Some(WakerInterest::Timer) => { Some(WakerInterest::Timer) => {
drop(guard); drop(guard);
self.process_timer(sockets) self.process_timer(sockets)
} }
Some(WakerInterest::Pause) => { Some(WakerInterest::Pause) => {
drop(guard); drop(guard);
self.paused = true;
self.deregister_all(sockets); self.deregister_all(sockets);
} }
Some(WakerInterest::Resume) => { Some(WakerInterest::Resume) => {
drop(guard); drop(guard);
self.paused = false;
sockets.iter_mut().for_each(|info| { sockets.iter_mut().for_each(|info| {
self.register_logged(info); self.register_logged(info);
}); });
self.accept_all(sockets);
} }
Some(WakerInterest::Stop) => { Some(WakerInterest::Stop) => {
self.deregister_all(sockets); self.deregister_all(sockets);
return true; return true;
} }
// waker queue is drained // waker queue is drained
None => { None => {
// Reset the WakerQueue before break so it does not grow infinitely // Reset the WakerQueue before break so it does not grow infinitely
WakerQueue::reset(&mut guard); WakerQueue::reset(&mut guard);
return false; return false;
} }
} }
@ -321,12 +334,12 @@ impl Accept {
if now < inst { if now < inst {
info.timeout = Some(inst); info.timeout = Some(inst);
} else if !self.backpressure { } else if !self.paused {
self.register_logged(info); self.register_logged(info);
} }
// Drop the timeout if server is in backpressure and socket timeout is expired. // Drop the timeout if server is paused and socket timeout is expired.
// When server recovers from backpressure it will register all sockets without // When server recovers from pause it will register all sockets without
// a timeout value so this socket register will be delayed till then. // a timeout value so this socket register will be delayed till then.
}); });
} }
@ -390,27 +403,6 @@ impl Accept {
.for_each(|(_, info)| self.deregister_logged(info)); .for_each(|(_, info)| self.deregister_logged(info));
} }
// TODO: back pressure is disabled for now.
// fn maybe_backpressure(&mut self, sockets: &mut [ServerSocketInfo], on: bool) {
// // Only operate when server is in a different backpressure than the given flag.
// if self.backpressure != on {
// self.backpressure = on;
// sockets
// .iter_mut()
// // Only operate on sockets without associated timeout.
// // Sockets with it should be handled by `accept` and `process_timer` methods.
// // They are already deregistered or need to be reregister in the future.
// .filter(|info| info.timeout.is_none())
// .for_each(|info| {
// if on {
// self.deregister_logged(info);
// } else {
// self.register_logged(info);
// }
// });
// }
// }
// Send connection to worker and handle error. // Send connection to worker and handle error.
fn send_connection(&mut self, conn: Conn) -> Result<(), Conn> { fn send_connection(&mut self, conn: Conn) -> Result<(), Conn> {
let next = self.next(); let next = self.next();
@ -418,7 +410,7 @@ impl Accept {
Ok(_) => { Ok(_) => {
// Increment counter of WorkerHandle. // Increment counter of WorkerHandle.
// Set worker to unavailable with it hit max (Return false). // Set worker to unavailable with it hit max (Return false).
if !next.available() { if !next.incr_counter() {
let idx = next.idx(); let idx = next.idx();
self.avail.set_available(idx, false); self.avail.set_available(idx, false);
} }
@ -432,7 +424,6 @@ impl Accept {
if self.handles.is_empty() { if self.handles.is_empty() {
error!("No workers"); error!("No workers");
// self.maybe_backpressure(sockets, true);
// All workers are gone and Conn is nowhere to be sent. // All workers are gone and Conn is nowhere to be sent.
// Treat this situation as Ok and drop Conn. // Treat this situation as Ok and drop Conn.
return Ok(()); return Ok(());
@ -459,29 +450,14 @@ impl Accept {
self.avail.set_available(idx, false); self.avail.set_available(idx, false);
self.set_next(); self.set_next();
// The break condition should never be met.
// Address TODO below and remove dead code if this
// assert is proven to be never triggered.
assert!(self.avail.available());
if !self.avail.available() { if !self.avail.available() {
break; while let Err(c) = self.send_connection(conn) {
conn = c;
}
return;
} }
} }
} }
// TODO: This branch is never entered. Consider remove.
warn!("Enter backpressure path. Please make report for issue");
// Sending connection failed due to either all workers are in error or not available.
// Enter backpressure state and try again.
// self.maybe_backpressure(sockets, true);
// Force send connection to worker regardless it's avail state.
// Worst case here is conn get dropped after all handles are gone.
// while let Err(c) = self.send_connection(sockets, conn) {
// conn = c
// }
} }
fn accept(&mut self, sockets: &mut [ServerSocketInfo], token: usize) { fn accept(&mut self, sockets: &mut [ServerSocketInfo], token: usize) {

View File

@ -72,12 +72,14 @@ impl Counter {
} }
} }
/// Increment counter it by 1 and return true when hitting limit /// Increment counter it by 1 and return false when hitting limit
#[inline(always)]
pub(crate) fn incr(&self) -> bool { pub(crate) fn incr(&self) -> bool {
self.counter.fetch_add(1, Ordering::Relaxed) != self.limit self.counter.fetch_add(1, Ordering::Relaxed) != self.limit
} }
/// Decrement counter it by 1 and return true when hitting limit /// Decrement counter it by 1 and return true when hitting limit
#[inline(always)]
pub(crate) fn derc(&self) -> bool { pub(crate) fn derc(&self) -> bool {
self.counter.fetch_sub(1, Ordering::Relaxed) == self.limit self.counter.fetch_sub(1, Ordering::Relaxed) == self.limit
} }
@ -109,6 +111,7 @@ impl WorkerCounter {
} }
} }
#[inline(always)]
pub(crate) fn guard(&self) -> WorkerCounterGuard { pub(crate) fn guard(&self) -> WorkerCounterGuard {
WorkerCounterGuard(self.clone()) WorkerCounterGuard(self.clone())
} }
@ -151,7 +154,7 @@ impl WorkerHandleAccept {
} }
#[inline(always)] #[inline(always)]
pub(crate) fn available(&self) -> bool { pub(crate) fn incr_counter(&self) -> bool {
self.counter.incr() self.counter.incr()
} }
} }
@ -536,8 +539,8 @@ impl Future for ServerWorker {
} }
} }
// handle incoming io stream
match ready!(Pin::new(&mut this.rx).poll_recv(cx)) { match ready!(Pin::new(&mut this.rx).poll_recv(cx)) {
// handle incoming io stream
Some(msg) => { Some(msg) => {
let guard = this.counter.guard(); let guard = this.counter.guard();
let _ = this.services[msg.token].service.call((guard, msg.io)); let _ = this.services[msg.token].service.call((guard, msg.io));