make waker interests handling a cold method

This commit is contained in:
fakeshadow 2021-04-13 11:55:34 +08:00
parent 42aa0aa9ff
commit 7ec2d8450a
1 changed files with 68 additions and 61 deletions

View File

@ -38,7 +38,7 @@ pub(crate) struct Accept {
srv: ServerHandle, srv: ServerHandle,
next: usize, next: usize,
backpressure: bool, backpressure: bool,
// poll time duration. // poll time out duration.
// use the smallest duration from sockets timeout_deadline. // use the smallest duration from sockets timeout_deadline.
timeout: Option<Duration>, timeout: Option<Duration>,
} }
@ -80,11 +80,9 @@ impl Accept {
.into_iter() .into_iter()
.unzip(); .unzip();
let wake_queue_clone = waker_queue.clone();
let (mut accept, sockets) = Accept::new_with_sockets( let (mut accept, sockets) = Accept::new_with_sockets(
poll, poll,
wake_queue_clone, waker_queue.clone(),
sockets, sockets,
handles_accept, handles_accept,
server_handle, server_handle,
@ -143,67 +141,28 @@ impl Accept {
let mut events = mio::Events::with_capacity(128); let mut events = mio::Events::with_capacity(128);
loop { loop {
if let Err(e) = self.poll.poll(&mut events, self.timeout) { match self.poll.poll(&mut events, self.timeout) {
match e.kind() { Ok(_) => {
std::io::ErrorKind::Interrupted => { for event in events.iter() {
// check for timeout and re-register sockets. let token = event.token();
self.process_timeout(&mut sockets); match token {
continue; WAKER_TOKEN => {
} let should_return = self.handle_waker(&mut sockets);
_ => panic!("Poll error: {}", e), if should_return {
} return;
} }
for event in events.iter() {
let token = event.token();
match token {
// This is a loop because interests for command from previous version was
// a loop that would try to drain the command channel. It's yet unknown
// if it's necessary/good practice to actively drain the waker queue.
WAKER_TOKEN => 'waker: loop {
// take guard with every iteration so no new interest can be added
// until the current task is done.
let mut guard = self.waker_queue.guard();
match guard.pop_front() {
// worker notify it becomes available. we may want to recover
// from backpressure.
Some(WakerInterest::WorkerAvailable) => {
drop(guard);
self.maybe_backpressure(&mut sockets, false);
} }
// a new worker thread is made and it's handle would be added to Accept _ => {
Some(WakerInterest::Worker(handle)) => { let token = usize::from(token);
drop(guard); self.accept(&mut sockets, token)
// maybe we want to recover from a backpressure.
self.maybe_backpressure(&mut sockets, false);
self.handles.push(handle);
}
Some(WakerInterest::Pause) => {
drop(guard);
self.deregister_all(&mut sockets);
}
Some(WakerInterest::Resume) => {
drop(guard);
sockets.iter_mut().for_each(|(token, info)| {
self.register_logged(token, info);
});
}
Some(WakerInterest::Stop) => {
return self.deregister_all(&mut sockets);
}
// waker queue is drained
None => {
// Reset the WakerQueue before break so it does not grow infinitely
WakerQueue::reset(&mut guard);
break 'waker;
} }
} }
},
_ => {
let token = usize::from(token);
self.accept(&mut sockets, token);
} }
} }
Err(e) => match e.kind() {
std::io::ErrorKind::Interrupted => {}
_ => panic!("Poll error: {}", e),
},
} }
// check for timeout and re-register sockets. // check for timeout and re-register sockets.
@ -211,6 +170,54 @@ impl Accept {
} }
} }
/// Return true to notify `Accept::poll_with` to return.
#[cold]
fn handle_waker(&mut self, sockets: &mut Slab<ServerSocketInfo>) -> bool {
// This is a loop because interests for command from previous version was
// a loop that would try to drain the command channel. It's yet unknown
// if it's necessary/good practice to actively drain the waker queue.
loop {
// take guard with every iteration so no new interest can be added
// until the current task is done.
let mut guard = self.waker_queue.guard();
match guard.pop_front() {
// worker notify it becomes available. we may want to recover
// from backpressure.
Some(WakerInterest::WorkerAvailable) => {
drop(guard);
self.maybe_backpressure(sockets, false);
}
// a new worker thread is made and it's handle would be added to Accept
Some(WakerInterest::Worker(handle)) => {
drop(guard);
// maybe we want to recover from a backpressure.
self.maybe_backpressure(sockets, false);
self.handles.push(handle);
}
Some(WakerInterest::Pause) => {
drop(guard);
self.deregister_all(sockets);
}
Some(WakerInterest::Resume) => {
drop(guard);
sockets.iter_mut().for_each(|(token, info)| {
self.register_logged(token, info);
});
}
Some(WakerInterest::Stop) => {
self.deregister_all(sockets);
return true;
}
// waker queue is drained
None => {
// Reset the WakerQueue before break so it does not grow infinitely
WakerQueue::reset(&mut guard);
return false;
}
}
}
}
fn process_timeout(&mut self, sockets: &mut Slab<ServerSocketInfo>) { fn process_timeout(&mut self, sockets: &mut Slab<ServerSocketInfo>) {
// Take old timeout as it's no use after each iteration. // Take old timeout as it's no use after each iteration.
if self.timeout.take().is_some() { if self.timeout.take().is_some() {
@ -238,7 +245,7 @@ impl Accept {
} }
} }
// update Accept timeout duration. would keep the smallest duration. /// Update Accept timeout duration. would keep the smallest duration.
fn set_timeout(&mut self, dur: Duration) { fn set_timeout(&mut self, dur: Duration) {
match self.timeout { match self.timeout {
Some(ref mut timeout) => { Some(ref mut timeout) => {