From 501488f86eaa3661277b595f4a0818928f657c50 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Wed, 14 Apr 2021 15:57:32 +0800 Subject: [PATCH] Fix bug where backpressure happen too early --- actix-server/src/accept.rs | 123 +++++++++++++++++++++++++++++-------- 1 file changed, 98 insertions(+), 25 deletions(-) diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 5b9f99c7..25eda37d 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -83,9 +83,55 @@ struct Accept { handles: Vec, srv: Server, next: usize, + avail: Availability, backpressure: bool, } +/// Array of u128 with every bit as marker for a worker handle's availability. +struct Availability([u128; 4]); + +impl Default for Availability { + fn default() -> Self { + Self([0; 4]) + } +} + +impl Availability { + /// Check if any worker handle is available + fn available(&self) -> bool { + self.0.iter().any(|a| *a != 0) + } + + /// Set worker handle available state by index. + fn set_available(&mut self, idx: usize, avail: bool) { + let (offset, idx) = if idx < 128 { + (0, idx) + } else if idx < 128 * 2 { + (1, idx - 128) + } else if idx < 128 * 3 { + (2, idx - 128 * 2) + } else if idx < 128 * 4 { + (3, idx - 128 * 3) + } else { + unreachable!("Max WorkerHandle count is 512") + }; + + if avail { + self.0[offset] |= 1 << idx as u128; + } else { + self.0[offset] ^= 1 << idx as u128; + } + } + + /// Set all worker handle to available state. + /// This would result in a re-check on all workers' availability. + fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) { + handles.iter().for_each(|handle| { + self.set_available(handle.idx, true); + }) + } +} + /// This function defines errors that are per-connection. Which basically /// means that if we get this error from `accept()` system call it means /// next connection might be ready to be accepted. @@ -116,6 +162,7 @@ impl Accept { System::set_current(sys); let (mut accept, sockets) = Accept::new_with_sockets(poll, waker, socks, handles, srv); + accept.poll_with(sockets); }) .unwrap(); @@ -148,16 +195,31 @@ impl Accept { }); } - let accept = Accept { + let accept = Accept::new(poll, waker, handles, srv); + + (accept, sockets) + } + + fn new( + poll: Poll, + waker: WakerQueue, + handles: Vec, + srv: Server, + ) -> Self { + let mut avail = Availability::default(); + + // Assume all handles are avail at construct time. + avail.set_available_all(&handles); + + Self { poll, waker, handles, srv, next: 0, + avail, backpressure: false, - }; - - (accept, sockets) + } } fn poll_with(&mut self, mut sockets: Slab) { @@ -190,6 +252,8 @@ impl Accept { // from backpressure. Some(WakerInterest::WorkerAvailable) => { drop(guard); + // Assume all worker are avail as no worker index returned. + self.avail.set_available_all(&self.handles); self.maybe_backpressure(&mut sockets, false); } // a new worker thread is made and it's handle would be added to Accept @@ -197,6 +261,7 @@ impl Accept { drop(guard); // maybe we want to recover from a backpressure. self.maybe_backpressure(&mut sockets, false); + self.avail.set_available(handle.idx, true); self.handles.push(handle); } // got timer interest and it's time to try register socket(s) again @@ -342,27 +407,25 @@ impl Accept { if self.backpressure { // send_connection would remove fault worker from handles. // worst case here is conn get dropped after all handles are gone. - while !self.handles.is_empty() { - match self.send_connection(sockets, conn) { - Ok(_) => return, - Err(c) => conn = c, - } + while let Err(c) = self.send_connection(sockets, conn) { + conn = c } } else { - // Do one round and try to send conn to all workers until it succeed. - // Start from self.next. - let mut idx = 0; - while idx < self.handles.len() { - idx += 1; - if self.handles[self.next].available() { + while self.avail.available() { + let next = self.next; + let idx = self.handles[next].idx; + if self.handles[next].available() { + self.avail.set_available(idx, true); match self.send_connection(sockets, conn) { Ok(_) => return, Err(c) => conn = c, } } else { + self.avail.set_available(idx, false); self.set_next(); } } + // Sending Conn failed due to either all workers are in error or not available. // Enter backpressure state and try again. self.maybe_backpressure(sockets, true); @@ -370,11 +433,6 @@ impl Accept { } } - // Set next worker handle that would accept work. - fn set_next(&mut self) { - self.next = (self.next + 1) % self.handles.len(); - } - // Send connection to worker and handle error. fn send_connection( &mut self, @@ -387,11 +445,10 @@ impl Accept { Ok(()) } Err(conn) => { - // worker lost contact and could be gone. a message is sent to - // `ServerBuilder` future to notify it a new worker should be made. - // after that remove the fault worker and enter backpressure if necessary. - self.srv.worker_faulted(self.handles[self.next].idx); - self.handles.swap_remove(self.next); + // Worker thread is error and could be gone. + // Remove worker handle and notify `ServerBuilder`. + self.remove_next(); + if self.handles.is_empty() { error!("No workers"); self.maybe_backpressure(sockets, true); @@ -401,6 +458,7 @@ impl Accept { } else if self.handles.len() <= self.next { self.next = 0; } + Err(conn) } } @@ -445,4 +503,19 @@ impl Accept { }; } } + + /// Set next worker handle that would accept connection. + fn set_next(&mut self) { + self.next = (self.next + 1) % self.handles.len(); + } + + /// Remove next worker handle that fail to accept connection. + fn remove_next(&mut self) { + let handle = self.handles.swap_remove(self.next); + let idx = handle.idx; + // A message is sent to `ServerBuilder` future to notify it a new worker + // should be made. + self.srv.worker_faulted(idx); + self.avail.set_available(idx, false); + } }