fix code style/update to mio 0.7.6

This commit is contained in:
fakeshadow 2020-12-29 06:46:03 +08:00
parent cb58696682
commit a6d6600cbf
2 changed files with 58 additions and 57 deletions

View File

@ -27,7 +27,7 @@ actix-utils = "3.0.0-beta.1"
futures-core = { version = "0.3.7", default-features = false } futures-core = { version = "0.3.7", default-features = false }
log = "0.4" log = "0.4"
mio = { version = "0.7.3", features = [ "os-poll", "tcp", "uds"] } mio = { version = "0.7.6", features = ["os-poll", "net"] }
num_cpus = "1.13" num_cpus = "1.13"
slab = "0.4" slab = "0.4"
tokio = { version = "1", features = ["sync"] } tokio = { version = "1", features = ["sync"] }

View File

@ -166,65 +166,66 @@ impl Accept {
for event in events.iter() { for event in events.iter() {
let token = event.token(); let token = event.token();
match token { match token {
// This is a loop because interests for command from previous version was a // This is a loop because interests for command from previous version was
// loop that would try to drain the command channel. It's yet unknown if it's // a loop that would try to drain the command channel. It's yet unknown
// necessary/good practice to actively drain the waker queue. // if it's necessary/good practice to actively drain the waker queue.
WAKER_TOKEN => { WAKER_TOKEN => 'waker: loop {
'waker: loop { // take guard with every iteration so no new interest can be added
// take guard with every iteration so no new interest can be added // until the current task is done.
// until the current task is done. let mut guard = self.waker.guard();
let mut guard = self.waker.guard(); match guard.pop_front() {
match guard.pop_front() { // worker notify it becomes available. we may want to recover
// worker notify it becomes available. we may want to recover from // from backpressure.
// backpressure. Some(WakerInterest::WorkerAvailable) => {
Some(WakerInterest::WorkerAvailable) => { drop(guard);
drop(guard); self.maybe_backpressure(&mut sockets, false);
self.maybe_backpressure(&mut sockets, false); }
} // a new worker thread is made and it's handle would be added
// a new worker thread is made and it's handle would be added to Accept // to Accept
Some(WakerInterest::Worker(handle)) => { Some(WakerInterest::Worker(handle)) => {
drop(guard); drop(guard);
// maybe we want to recover from a backpressure. // maybe we want to recover from a backpressure.
self.maybe_backpressure(&mut sockets, false); self.maybe_backpressure(&mut sockets, false);
self.handles.push(handle); self.handles.push(handle);
} }
// got timer interest and it's time to try register socket(s) again. // got timer interest and it's time to try register socket(s)
Some(WakerInterest::Timer) => { // again.
drop(guard); Some(WakerInterest::Timer) => {
self.process_timer(&mut sockets) drop(guard);
} self.process_timer(&mut sockets)
Some(WakerInterest::Pause) => { }
drop(guard); Some(WakerInterest::Pause) => {
sockets.iter_mut().for_each(|(_, info)| { drop(guard);
match self.deregister(info) { sockets.iter_mut().for_each(|(_, info)| {
Ok(_) => info!( match self.deregister(info) {
"Paused accepting connections on {}", Ok(_) => info!(
info.addr "Paused accepting connections on {}",
), info.addr
Err(e) => { ),
error!("Can not deregister server socket {}", e) Err(e) => {
} error!("Can not deregister server socket {}", e)
} }
}); }
} });
Some(WakerInterest::Resume) => { }
drop(guard); Some(WakerInterest::Resume) => {
sockets.iter_mut().for_each(|(token, info)| { drop(guard);
self.register_logged(token, info); sockets.iter_mut().for_each(|(token, info)| {
}); self.register_logged(token, info);
} });
Some(WakerInterest::Stop) => { }
return self.deregister_all(&mut sockets); Some(WakerInterest::Stop) => {
} return self.deregister_all(&mut sockets);
// waker queue is drained. }
None => { // waker queue is drained.
// Reset the VecDeque before break so it does not grow infinitely. None => {
WakerQueue::reset(&mut guard); // Reset the WakerQueue before break so it does not grow
break 'waker; // infinitely.
} WakerQueue::reset(&mut guard);
break 'waker;
} }
} }
} },
_ => { _ => {
let token = usize::from(token); let token = usize::from(token);
self.accept(&mut sockets, token); self.accept(&mut sockets, token);