diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 0a5732d8..34fb3775 100644 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -27,7 +27,7 @@ actix-utils = "3.0.0-beta.1" futures-core = { version = "0.3.7", default-features = false } log = "0.4" -mio = { version = "0.7.3", features = [ "os-poll", "tcp", "uds"] } +mio = { version = "0.7.6", features = ["os-poll", "net"] } num_cpus = "1.13" slab = "0.4" tokio = { version = "1", features = ["sync"] } diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index ef62d7dc..bf895f06 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -166,65 +166,66 @@ impl Accept { for event in events.iter() { let token = event.token(); match token { - // This is a loop because interests for command from previous version was a - // loop that would try to drain the command channel. It's yet unknown if it's - // necessary/good practice to actively drain the waker queue. - WAKER_TOKEN => { - 'waker: loop { - // take guard with every iteration so no new interest can be added - // until the current task is done. - let mut guard = self.waker.guard(); - match guard.pop_front() { - // worker notify it becomes available. we may want to recover from - // backpressure. - Some(WakerInterest::WorkerAvailable) => { - drop(guard); - self.maybe_backpressure(&mut sockets, false); - } - // a new worker thread is made and it's handle would be added to Accept - Some(WakerInterest::Worker(handle)) => { - drop(guard); - // maybe we want to recover from a backpressure. - self.maybe_backpressure(&mut sockets, false); - self.handles.push(handle); - } - // got timer interest and it's time to try register socket(s) again. - Some(WakerInterest::Timer) => { - drop(guard); - self.process_timer(&mut sockets) - } - Some(WakerInterest::Pause) => { - drop(guard); - sockets.iter_mut().for_each(|(_, info)| { - match self.deregister(info) { - Ok(_) => info!( - "Paused accepting connections on {}", - info.addr - ), - Err(e) => { - error!("Can not deregister server socket {}", e) - } + // This is a loop because interests for command from previous version was + // a loop that would try to drain the command channel. It's yet unknown + // if it's necessary/good practice to actively drain the waker queue. + WAKER_TOKEN => 'waker: loop { + // take guard with every iteration so no new interest can be added + // until the current task is done. + let mut guard = self.waker.guard(); + match guard.pop_front() { + // worker notify it becomes available. we may want to recover + // from backpressure. + Some(WakerInterest::WorkerAvailable) => { + drop(guard); + self.maybe_backpressure(&mut sockets, false); + } + // a new worker thread is made and it's handle would be added + // to Accept + Some(WakerInterest::Worker(handle)) => { + drop(guard); + // maybe we want to recover from a backpressure. + self.maybe_backpressure(&mut sockets, false); + self.handles.push(handle); + } + // got timer interest and it's time to try register socket(s) + // again. + Some(WakerInterest::Timer) => { + drop(guard); + self.process_timer(&mut sockets) + } + Some(WakerInterest::Pause) => { + drop(guard); + sockets.iter_mut().for_each(|(_, info)| { + match self.deregister(info) { + Ok(_) => info!( + "Paused accepting connections on {}", + info.addr + ), + Err(e) => { + error!("Can not deregister server socket {}", e) } - }); - } - Some(WakerInterest::Resume) => { - drop(guard); - sockets.iter_mut().for_each(|(token, info)| { - self.register_logged(token, info); - }); - } - Some(WakerInterest::Stop) => { - return self.deregister_all(&mut sockets); - } - // waker queue is drained. - None => { - // Reset the VecDeque before break so it does not grow infinitely. - WakerQueue::reset(&mut guard); - break 'waker; - } + } + }); + } + Some(WakerInterest::Resume) => { + drop(guard); + sockets.iter_mut().for_each(|(token, info)| { + self.register_logged(token, info); + }); + } + Some(WakerInterest::Stop) => { + return self.deregister_all(&mut sockets); + } + // waker queue is drained. + None => { + // Reset the WakerQueue before break so it does not grow + // infinitely. + WakerQueue::reset(&mut guard); + break 'waker; } } - } + }, _ => { let token = usize::from(token); self.accept(&mut sockets, token);