From de149527ff0f3351f2a25620b079115fa3b9c069 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Tue, 20 Apr 2021 00:17:31 +0800 Subject: [PATCH] Remove backpressure. Add pause state --- actix-server/src/accept.rs | 86 ++++++++++++++------------------------ actix-server/src/worker.rs | 9 ++-- 2 files changed, 37 insertions(+), 58 deletions(-) diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 07fdfb4e..ab0ae707 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -5,7 +5,7 @@ use actix_rt::{ time::{sleep, Instant}, System, }; -use log::{error, info, warn}; +use log::{error, info}; use mio::{Interest, Poll, Token as MioToken}; use crate::server::Server; @@ -80,7 +80,7 @@ struct Accept { srv: Server, next: usize, avail: Availability, - backpressure: bool, + paused: bool, } /// Array of u128 with every bit as marker for a worker handle's availability. @@ -217,7 +217,7 @@ impl Accept { srv, next: 0, avail, - backpressure: false, + paused: false, }; (accept, sockets) @@ -229,7 +229,7 @@ impl Accept { loop { if let Err(e) = self.poll.poll(&mut events, None) { match e.kind() { - std::io::ErrorKind::Interrupted => continue, + io::ErrorKind::Interrupted => continue, _ => panic!("Poll error: {}", e), } } @@ -240,6 +240,7 @@ impl Accept { WAKER_TOKEN => { let exit = self.handle_waker(sockets); if exit { + info!("Accept is stopped."); return; } } @@ -261,15 +262,15 @@ impl Accept { // until the current task is done. let mut guard = self.waker.guard(); match guard.pop_front() { - // worker notify it becomes available. we may want to recover - // from backpressure. + // worker notify it becomes available. Some(WakerInterest::WorkerAvailable(idx)) => { drop(guard); self.avail.set_available(idx, true); - self.accept_all(sockets); - // self.maybe_backpressure(sockets, false); + if !self.paused { + self.accept_all(sockets); + } } // a new worker thread is made and it's handle would be added to Accept Some(WakerInterest::Worker(handle)) => { @@ -278,32 +279,44 @@ impl Accept { self.avail.set_available(handle.idx(), true); self.handles.push(handle); - self.accept_all(sockets); - // self.maybe_backpressure(sockets, false); + if !self.paused { + self.accept_all(sockets); + } } // got timer interest and it's time to try register socket(s) again Some(WakerInterest::Timer) => { drop(guard); + self.process_timer(sockets) } Some(WakerInterest::Pause) => { drop(guard); + + self.paused = true; + self.deregister_all(sockets); } Some(WakerInterest::Resume) => { drop(guard); + + self.paused = false; + sockets.iter_mut().for_each(|info| { self.register_logged(info); }); + + self.accept_all(sockets); } Some(WakerInterest::Stop) => { self.deregister_all(sockets); + return true; } // waker queue is drained None => { // Reset the WakerQueue before break so it does not grow infinitely WakerQueue::reset(&mut guard); + return false; } } @@ -321,12 +334,12 @@ impl Accept { if now < inst { info.timeout = Some(inst); - } else if !self.backpressure { + } else if !self.paused { self.register_logged(info); } - // Drop the timeout if server is in backpressure and socket timeout is expired. - // When server recovers from backpressure it will register all sockets without + // Drop the timeout if server is paused and socket timeout is expired. + // When server recovers from pause it will register all sockets without // a timeout value so this socket register will be delayed till then. }); } @@ -390,27 +403,6 @@ impl Accept { .for_each(|(_, info)| self.deregister_logged(info)); } - // TODO: back pressure is disabled for now. - // fn maybe_backpressure(&mut self, sockets: &mut [ServerSocketInfo], on: bool) { - // // Only operate when server is in a different backpressure than the given flag. - // if self.backpressure != on { - // self.backpressure = on; - // sockets - // .iter_mut() - // // Only operate on sockets without associated timeout. - // // Sockets with it should be handled by `accept` and `process_timer` methods. - // // They are already deregistered or need to be reregister in the future. - // .filter(|info| info.timeout.is_none()) - // .for_each(|info| { - // if on { - // self.deregister_logged(info); - // } else { - // self.register_logged(info); - // } - // }); - // } - // } - // Send connection to worker and handle error. fn send_connection(&mut self, conn: Conn) -> Result<(), Conn> { let next = self.next(); @@ -418,7 +410,7 @@ impl Accept { Ok(_) => { // Increment counter of WorkerHandle. // Set worker to unavailable with it hit max (Return false). - if !next.available() { + if !next.incr_counter() { let idx = next.idx(); self.avail.set_available(idx, false); } @@ -432,7 +424,6 @@ impl Accept { if self.handles.is_empty() { error!("No workers"); - // self.maybe_backpressure(sockets, true); // All workers are gone and Conn is nowhere to be sent. // Treat this situation as Ok and drop Conn. return Ok(()); @@ -459,29 +450,14 @@ impl Accept { self.avail.set_available(idx, false); self.set_next(); - // The break condition should never be met. - // Address TODO below and remove dead code if this - // assert is proven to be never triggered. - assert!(self.avail.available()); - if !self.avail.available() { - break; + while let Err(c) = self.send_connection(conn) { + conn = c; + } + return; } } } - - // TODO: This branch is never entered. Consider remove. - warn!("Enter backpressure path. Please make report for issue"); - - // Sending connection failed due to either all workers are in error or not available. - // Enter backpressure state and try again. - // self.maybe_backpressure(sockets, true); - - // Force send connection to worker regardless it's avail state. - // Worst case here is conn get dropped after all handles are gone. - // while let Err(c) = self.send_connection(sockets, conn) { - // conn = c - // } } fn accept(&mut self, sockets: &mut [ServerSocketInfo], token: usize) { diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 8a67046b..df8bc723 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -72,12 +72,14 @@ impl Counter { } } - /// Increment counter it by 1 and return true when hitting limit + /// Increment counter it by 1 and return false when hitting limit + #[inline(always)] pub(crate) fn incr(&self) -> bool { self.counter.fetch_add(1, Ordering::Relaxed) != self.limit } /// Decrement counter it by 1 and return true when hitting limit + #[inline(always)] pub(crate) fn derc(&self) -> bool { self.counter.fetch_sub(1, Ordering::Relaxed) == self.limit } @@ -109,6 +111,7 @@ impl WorkerCounter { } } + #[inline(always)] pub(crate) fn guard(&self) -> WorkerCounterGuard { WorkerCounterGuard(self.clone()) } @@ -151,7 +154,7 @@ impl WorkerHandleAccept { } #[inline(always)] - pub(crate) fn available(&self) -> bool { + pub(crate) fn incr_counter(&self) -> bool { self.counter.incr() } } @@ -536,8 +539,8 @@ impl Future for ServerWorker { } } + // handle incoming io stream match ready!(Pin::new(&mut this.rx).poll_recv(cx)) { - // handle incoming io stream Some(msg) => { let guard = this.counter.guard(); let _ = this.services[msg.token].service.call((guard, msg.io));