Add pause flag for Accept

This commit is contained in:
fakeshadow 2021-04-07 00:30:33 +08:00
parent 5961eb892e
commit ac78583fb5
1 changed files with 61 additions and 16 deletions

View File

@ -83,7 +83,7 @@ struct Accept {
handles: Vec<WorkerHandle>,
srv: Server,
next: usize,
backpressure: bool,
state: u8,
}
/// This function defines errors that are per-connection. Which basically
@ -154,7 +154,7 @@ impl Accept {
handles,
srv,
next: 0,
backpressure: false,
state: 0,
};
(accept, sockets)
@ -190,13 +190,17 @@ impl Accept {
// from backpressure.
Some(WakerInterest::WorkerAvailable) => {
drop(guard);
self.maybe_backpressure(&mut sockets, false);
if !self.pause() {
self.maybe_backpressure(&mut sockets, false);
}
}
// a new worker thread is made and it's handle would be added to Accept
Some(WakerInterest::Worker(handle)) => {
drop(guard);
// maybe we want to recover from a backpressure.
self.maybe_backpressure(&mut sockets, false);
if !self.pause() {
self.maybe_backpressure(&mut sockets, false);
}
self.handles.push(handle);
}
// got timer interest and it's time to try register socket(s) again
@ -206,16 +210,24 @@ impl Accept {
}
Some(WakerInterest::Pause) => {
drop(guard);
self.deregister_all(&mut sockets);
if !self.pause() {
self.set_pause(true);
self.deregister_all(&mut sockets);
}
}
Some(WakerInterest::Resume) => {
drop(guard);
sockets.iter_mut().for_each(|(token, info)| {
self.register_logged(token, info);
});
self.set_pause(false);
}
Some(WakerInterest::Stop) => {
return self.deregister_all(&mut sockets);
if !self.pause() {
self.deregister_all(&mut sockets);
}
return;
}
// waker queue is drained
None => {
@ -245,7 +257,7 @@ impl Accept {
if now < inst {
info.timeout = Some(inst);
} else if !self.backpressure {
} else if !self.backpressure() {
self.register_logged(token, info);
}
@ -286,12 +298,8 @@ impl Accept {
}
}
fn deregister(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
self.poll.registry().deregister(&mut info.lst)
}
fn deregister_logged(&self, info: &mut ServerSocketInfo) {
match self.deregister(info) {
match self.poll.registry().deregister(&mut info.lst) {
Ok(_) => info!("Paused accepting connections on {}", info.addr),
Err(e) => {
error!("Can not deregister server socket {}", e)
@ -320,8 +328,8 @@ impl Accept {
fn maybe_backpressure(&mut self, sockets: &mut Slab<ServerSocketInfo>, on: bool) {
// Only operate when server is in a different backpressure than the given flag.
if self.backpressure != on {
self.backpressure = on;
if self.backpressure() != on {
self.set_backpressure(on);
sockets
.iter_mut()
// Only operate on sockets without associated timeout.
@ -339,7 +347,7 @@ impl Accept {
}
fn accept_one(&mut self, sockets: &mut Slab<ServerSocketInfo>, mut conn: Conn) {
if self.backpressure {
if self.backpressure() {
// send_connection would remove fault worker from handles.
// worst case here is conn get dropped after all handles are gone.
while !self.handles.is_empty() {
@ -426,7 +434,9 @@ impl Accept {
error!("Error accepting connection: {}", e);
// deregister listener temporary
self.deregister_logged(info);
if !self.backpressure() {
self.deregister_logged(info);
}
// sleep after error. write the timeout to socket info as later
// the poll would need it mark which socket and when it's
@ -446,3 +456,38 @@ impl Accept {
}
}
}
/// bit flag for marking server in pause state;
const PAUSE: u8 = 1;
/// bit flag for marking server in back pressure state.
const BACK_PRESSURE: u8 = 1 << 1;
impl Accept {
fn backpressure(&self) -> bool {
self.state(BACK_PRESSURE)
}
fn pause(&self) -> bool {
self.state(PAUSE)
}
fn set_backpressure(&mut self, on: bool) {
self.set_state(BACK_PRESSURE, on);
}
fn set_pause(&mut self, on: bool) {
self.set_state(PAUSE, on);
}
fn state(&self, state: u8) -> bool {
self.state & state != 0
}
fn set_state(&mut self, state: u8, on: bool) {
if on {
self.state |= state;
} else {
self.state ^= state;
}
}
}