diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 5b6345cc..06432e54 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -83,7 +83,7 @@ struct Accept { handles: Vec, srv: Server, next: usize, - backpressure: bool, + state: u8, } /// This function defines errors that are per-connection. Which basically @@ -154,7 +154,7 @@ impl Accept { handles, srv, next: 0, - backpressure: false, + state: 0, }; (accept, sockets) @@ -190,13 +190,17 @@ impl Accept { // from backpressure. Some(WakerInterest::WorkerAvailable) => { drop(guard); - self.maybe_backpressure(&mut sockets, false); + if !self.pause() { + self.maybe_backpressure(&mut sockets, false); + } } // a new worker thread is made and it's handle would be added to Accept Some(WakerInterest::Worker(handle)) => { drop(guard); // maybe we want to recover from a backpressure. - self.maybe_backpressure(&mut sockets, false); + if !self.pause() { + self.maybe_backpressure(&mut sockets, false); + } self.handles.push(handle); } // got timer interest and it's time to try register socket(s) again @@ -206,16 +210,24 @@ impl Accept { } Some(WakerInterest::Pause) => { drop(guard); - self.deregister_all(&mut sockets); + if !self.pause() { + self.set_pause(true); + self.deregister_all(&mut sockets); + } } Some(WakerInterest::Resume) => { drop(guard); sockets.iter_mut().for_each(|(token, info)| { self.register_logged(token, info); }); + self.set_pause(false); } Some(WakerInterest::Stop) => { - return self.deregister_all(&mut sockets); + if !self.pause() { + self.deregister_all(&mut sockets); + } + + return; } // waker queue is drained None => { @@ -245,7 +257,7 @@ impl Accept { if now < inst { info.timeout = Some(inst); - } else if !self.backpressure { + } else if !self.backpressure() { self.register_logged(token, info); } @@ -286,12 +298,8 @@ impl Accept { } } - fn deregister(&self, info: &mut ServerSocketInfo) -> io::Result<()> { - self.poll.registry().deregister(&mut info.lst) - } - fn deregister_logged(&self, info: &mut ServerSocketInfo) { - match self.deregister(info) { + match self.poll.registry().deregister(&mut info.lst) { Ok(_) => info!("Paused accepting connections on {}", info.addr), Err(e) => { error!("Can not deregister server socket {}", e) @@ -320,8 +328,8 @@ impl Accept { fn maybe_backpressure(&mut self, sockets: &mut Slab, on: bool) { // Only operate when server is in a different backpressure than the given flag. - if self.backpressure != on { - self.backpressure = on; + if self.backpressure() != on { + self.set_backpressure(on); sockets .iter_mut() // Only operate on sockets without associated timeout. @@ -339,7 +347,7 @@ impl Accept { } fn accept_one(&mut self, sockets: &mut Slab, mut conn: Conn) { - if self.backpressure { + if self.backpressure() { // send_connection would remove fault worker from handles. // worst case here is conn get dropped after all handles are gone. while !self.handles.is_empty() { @@ -426,7 +434,9 @@ impl Accept { error!("Error accepting connection: {}", e); // deregister listener temporary - self.deregister_logged(info); + if !self.backpressure() { + self.deregister_logged(info); + } // sleep after error. write the timeout to socket info as later // the poll would need it mark which socket and when it's @@ -446,3 +456,38 @@ impl Accept { } } } + +/// bit flag for marking server in pause state; +const PAUSE: u8 = 1; +/// bit flag for marking server in back pressure state. +const BACK_PRESSURE: u8 = 1 << 1; + +impl Accept { + fn backpressure(&self) -> bool { + self.state(BACK_PRESSURE) + } + + fn pause(&self) -> bool { + self.state(PAUSE) + } + + fn set_backpressure(&mut self, on: bool) { + self.set_state(BACK_PRESSURE, on); + } + + fn set_pause(&mut self, on: bool) { + self.set_state(PAUSE, on); + } + + fn state(&self, state: u8) -> bool { + self.state & state != 0 + } + + fn set_state(&mut self, state: u8, on: bool) { + if on { + self.state |= state; + } else { + self.state ^= state; + } + } +}