This commit is contained in:
Rob Ede 2021-04-01 07:56:46 +01:00 committed by GitHub
parent b3f5632ea9
commit d7a8d3ea96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 9 additions and 11 deletions

View File

@ -187,21 +187,19 @@ impl Accept {
let mut guard = self.waker.guard(); let mut guard = self.waker.guard();
match guard.pop_front() { match guard.pop_front() {
// worker notify it becomes available. we may want to recover // worker notify it becomes available. we may want to recover
// from backpressure. // from backpressure.
Some(WakerInterest::WorkerAvailable) => { Some(WakerInterest::WorkerAvailable) => {
drop(guard); drop(guard);
self.maybe_backpressure(&mut sockets, false); self.maybe_backpressure(&mut sockets, false);
} }
// a new worker thread is made and it's handle would be added // a new worker thread is made and it's handle would be added to Accept
// to Accept
Some(WakerInterest::Worker(handle)) => { Some(WakerInterest::Worker(handle)) => {
drop(guard); drop(guard);
// maybe we want to recover from a backpressure. // maybe we want to recover from a backpressure.
self.maybe_backpressure(&mut sockets, false); self.maybe_backpressure(&mut sockets, false);
self.handles.push(handle); self.handles.push(handle);
} }
// got timer interest and it's time to try register socket(s) // got timer interest and it's time to try register socket(s) again
// again.
Some(WakerInterest::Timer) => { Some(WakerInterest::Timer) => {
drop(guard); drop(guard);
self.process_timer(&mut sockets) self.process_timer(&mut sockets)
@ -240,7 +238,7 @@ impl Accept {
let now = Instant::now(); let now = Instant::now();
sockets sockets
.iter_mut() .iter_mut()
// Only the ServerSocketInfo have an associate timeout value was de registered. // Only sockets that had an associated timeout were deregistered.
.filter(|(_, info)| info.timeout.is_some()) .filter(|(_, info)| info.timeout.is_some())
.for_each(|(token, info)| { .for_each(|(token, info)| {
let inst = info.timeout.take().unwrap(); let inst = info.timeout.take().unwrap();
@ -250,9 +248,10 @@ impl Accept {
} else if !self.backpressure { } else if !self.backpressure {
self.register_logged(token, info); self.register_logged(token, info);
} }
// Drop the timeout if server is in backpressure and socket timeout is expired. // Drop the timeout if server is in backpressure and socket timeout is expired.
// When server is recovered from backpressure it would register all socket without // When server recovers from backpressure it will register all sockets without
// a timeout value so this socket register would be delayed till then. // a timeout value so this socket register will be delayed till then.
}); });
} }
@ -317,9 +316,8 @@ impl Accept {
self.backpressure = false; self.backpressure = false;
sockets sockets
.iter_mut() .iter_mut()
// Only operate on socket without associated timeout. // Only operate on sockets without associated timeout.
// Socket info with it would attempt to re-register itself when its timeout // Sockets with it will attempt to re-register when their timeout expires.
// expire.
.filter(|(_, info)| info.timeout.is_none()) .filter(|(_, info)| info.timeout.is_none())
.for_each(|(token, info)| self.register_logged(token, info)); .for_each(|(token, info)| self.register_logged(token, info));
} }