From 7ec2d8450ab5b7c491b343054f646f75029372a6 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Tue, 13 Apr 2021 11:55:34 +0800 Subject: [PATCH] make waker interests handling a cold method --- actix-server/src/accept.rs | 129 +++++++++++++++++++------------------ 1 file changed, 68 insertions(+), 61 deletions(-) diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 16ac3407..bb30ac0a 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -38,7 +38,7 @@ pub(crate) struct Accept { srv: ServerHandle, next: usize, backpressure: bool, - // poll time duration. + // poll time out duration. // use the smallest duration from sockets timeout_deadline. timeout: Option, } @@ -80,11 +80,9 @@ impl Accept { .into_iter() .unzip(); - let wake_queue_clone = waker_queue.clone(); - let (mut accept, sockets) = Accept::new_with_sockets( poll, - wake_queue_clone, + waker_queue.clone(), sockets, handles_accept, server_handle, @@ -143,67 +141,28 @@ impl Accept { let mut events = mio::Events::with_capacity(128); loop { - if let Err(e) = self.poll.poll(&mut events, self.timeout) { - match e.kind() { - std::io::ErrorKind::Interrupted => { - // check for timeout and re-register sockets. - self.process_timeout(&mut sockets); - continue; - } - _ => panic!("Poll error: {}", e), - } - } - - for event in events.iter() { - let token = event.token(); - match token { - // This is a loop because interests for command from previous version was - // a loop that would try to drain the command channel. It's yet unknown - // if it's necessary/good practice to actively drain the waker queue. - WAKER_TOKEN => 'waker: loop { - // take guard with every iteration so no new interest can be added - // until the current task is done. - let mut guard = self.waker_queue.guard(); - match guard.pop_front() { - // worker notify it becomes available. we may want to recover - // from backpressure. - Some(WakerInterest::WorkerAvailable) => { - drop(guard); - self.maybe_backpressure(&mut sockets, false); + match self.poll.poll(&mut events, self.timeout) { + Ok(_) => { + for event in events.iter() { + let token = event.token(); + match token { + WAKER_TOKEN => { + let should_return = self.handle_waker(&mut sockets); + if should_return { + return; + } } - // a new worker thread is made and it's handle would be added to Accept - Some(WakerInterest::Worker(handle)) => { - drop(guard); - // maybe we want to recover from a backpressure. - self.maybe_backpressure(&mut sockets, false); - self.handles.push(handle); - } - Some(WakerInterest::Pause) => { - drop(guard); - self.deregister_all(&mut sockets); - } - Some(WakerInterest::Resume) => { - drop(guard); - sockets.iter_mut().for_each(|(token, info)| { - self.register_logged(token, info); - }); - } - Some(WakerInterest::Stop) => { - return self.deregister_all(&mut sockets); - } - // waker queue is drained - None => { - // Reset the WakerQueue before break so it does not grow infinitely - WakerQueue::reset(&mut guard); - break 'waker; + _ => { + let token = usize::from(token); + self.accept(&mut sockets, token) } } - }, - _ => { - let token = usize::from(token); - self.accept(&mut sockets, token); } } + Err(e) => match e.kind() { + std::io::ErrorKind::Interrupted => {} + _ => panic!("Poll error: {}", e), + }, } // check for timeout and re-register sockets. @@ -211,6 +170,54 @@ impl Accept { } } + /// Return true to notify `Accept::poll_with` to return. + #[cold] + fn handle_waker(&mut self, sockets: &mut Slab) -> bool { + // This is a loop because interests for command from previous version was + // a loop that would try to drain the command channel. It's yet unknown + // if it's necessary/good practice to actively drain the waker queue. + loop { + // take guard with every iteration so no new interest can be added + // until the current task is done. + let mut guard = self.waker_queue.guard(); + match guard.pop_front() { + // worker notify it becomes available. we may want to recover + // from backpressure. + Some(WakerInterest::WorkerAvailable) => { + drop(guard); + self.maybe_backpressure(sockets, false); + } + // a new worker thread is made and it's handle would be added to Accept + Some(WakerInterest::Worker(handle)) => { + drop(guard); + // maybe we want to recover from a backpressure. + self.maybe_backpressure(sockets, false); + self.handles.push(handle); + } + Some(WakerInterest::Pause) => { + drop(guard); + self.deregister_all(sockets); + } + Some(WakerInterest::Resume) => { + drop(guard); + sockets.iter_mut().for_each(|(token, info)| { + self.register_logged(token, info); + }); + } + Some(WakerInterest::Stop) => { + self.deregister_all(sockets); + return true; + } + // waker queue is drained + None => { + // Reset the WakerQueue before break so it does not grow infinitely + WakerQueue::reset(&mut guard); + return false; + } + } + } + } + fn process_timeout(&mut self, sockets: &mut Slab) { // Take old timeout as it's no use after each iteration. if self.timeout.take().is_some() { @@ -238,7 +245,7 @@ impl Accept { } } - // update Accept timeout duration. would keep the smallest duration. + /// Update Accept timeout duration. would keep the smallest duration. fn set_timeout(&mut self, dur: Duration) { match self.timeout { Some(ref mut timeout) => {