diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index d0c5416e..405f0a9f 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -77,6 +77,7 @@ impl AcceptLoop { struct Accept { poll: Poll, waker_rx: WakerRx, + sockets: Box<[ServerSocketInfo]>, handles: Vec, srv: Server, next: usize, @@ -174,7 +175,7 @@ impl Accept { .spawn(move || { System::set_current(sys); - let mut sockets = socks + let sockets = socks .into_iter() .map(|(token, mut lst)| { // Start listening for incoming connections @@ -188,7 +189,7 @@ impl Accept { timeout: None, } }) - .collect::>(); + .collect(); let mut avail = Availability::default(); @@ -198,6 +199,7 @@ impl Accept { let mut accept = Accept { poll, waker_rx, + sockets, handles, srv, next: 0, @@ -209,49 +211,49 @@ impl Accept { let waker = waker::from_registry(accept.poll.registry()).unwrap(); let cx = &mut Context::from_waker(&waker); - accept.poll_with(&mut sockets, cx); + accept.poll(cx); }) .unwrap(); } - fn poll_with(&mut self, sockets: &mut [ServerSocketInfo], cx: &mut Context<'_>) { + fn poll(&mut self, cx: &mut Context<'_>) { let mut events = mio::Events::with_capacity(128); // poll waker channel once and register the context/waker. - let exit = self.poll_waker(sockets, cx); + let exit = self.poll_waker(cx); if exit { info!("Accept is stopped."); return; } loop { - if let Err(e) = self.poll.poll(&mut events, None) { - match e.kind() { - io::ErrorKind::Interrupted => continue, - _ => panic!("Poll error: {}", e), - } + match self.poll.poll(&mut events, None) { + Ok(_) => {} + Err(e) if e.kind() == io::ErrorKind::Interrupted => {} + Err(e) => panic!("Poll error: {}", e), } for event in events.iter() { let token = event.token(); match token { WAKER_TOKEN => { - let exit = self.poll_waker(sockets, cx); + let exit = self.poll_waker(cx); if exit { info!("Accept is stopped."); return; } } _ => { + println!("polling event"); let token = usize::from(token); - self.accept(sockets, token); + self.accept(token); } } } } } - fn poll_waker(&mut self, sockets: &mut [ServerSocketInfo], cx: &mut Context<'_>) -> bool { + fn poll_waker(&mut self, cx: &mut Context<'_>) -> bool { // This is a loop because interests for command from previous version was // a loop that would try to drain the command channel. It's yet unknown // if it's necessary/good practice to actively drain the waker queue. @@ -261,7 +263,7 @@ impl Accept { self.avail.set_available(idx, true); if !self.paused { - self.accept_all(sockets); + self.accept_all(); } } // a new worker thread is made and it's handle would be added to Accept @@ -270,29 +272,26 @@ impl Accept { self.handles.push(handle); if !self.paused { - self.accept_all(sockets); + self.accept_all(); } } Some(WakerInterest::Pause) => { if !self.paused { self.paused = true; - self.deregister_all(sockets); + self.deregister_all(); } } Some(WakerInterest::Resume) => { if self.paused { self.paused = false; + self.register_all(); - sockets.iter_mut().for_each(|info| { - self.register_logged(info); - }); - - self.accept_all(sockets); + self.accept_all(); } } Some(WakerInterest::Stop) | None => { if !self.paused { - self.deregister_all(sockets); + self.deregister_all(); } return true; @@ -303,47 +302,20 @@ impl Accept { false } - #[cfg(not(target_os = "windows"))] - fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> { - let token = MioToken(info.token); - self.poll - .registry() - .register(&mut info.lst, token, Interest::READABLE) - } + fn register_all(&mut self) { + let reg = self.poll.registry(); - #[cfg(target_os = "windows")] - fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> { - // On windows, calling register without deregister cause an error. - // See https://github.com/actix/actix-web/issues/905 - // Calling reregister seems to fix the issue. - let token = MioToken(info.token); - self.poll - .registry() - .register(&mut info.lst, token, Interest::READABLE) - .or_else(|_| { - self.poll - .registry() - .reregister(&mut info.lst, token, Interest::READABLE) - }) - } - - fn register_logged(&self, info: &mut ServerSocketInfo) { - match self.register(info) { - Ok(_) => info!("Resume accepting connections on {}", info.lst.local_addr()), - Err(e) => error!("Can not register server socket {}", e), - } - } - - fn deregister_logged(&self, info: &mut ServerSocketInfo) { - match self.poll.registry().deregister(&mut info.lst) { - Ok(_) => info!("Paused accepting connections on {}", info.lst.local_addr()), - Err(e) => { - error!("Can not deregister server socket {}", e) + self.sockets.iter_mut().for_each(|info| { + let token = MioToken(info.token); + match reg.register(&mut info.lst, token, Interest::READABLE) { + Ok(_) => info!("Resume accepting connections on {}", info.lst.local_addr()), + Err(e) => error!("Can not register server socket {}", e), } - } + }) } - fn deregister_all(&self, sockets: &mut [ServerSocketInfo]) { + fn deregister_all(&mut self) { + let reg = self.poll.registry(); // This is a best effort implementation with following limitation: // // Every ServerSocketInfo with associate timeout will be skipped and it's timeout @@ -352,14 +324,17 @@ impl Accept { // Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short // gap (less than 500ms) would cause all timing out ServerSocketInfos be reregistered // before expected timing. - sockets + self.sockets .iter_mut() // Take all timeout. // This is to prevent Accept::process_timer method re-register a socket afterwards. - .map(|info| (info.timeout.take(), info)) + .map(|info| (info.timeout.take(), &mut info.lst)) // Socket info with a timeout is already deregistered so skip them. .filter(|(timeout, _)| timeout.is_none()) - .for_each(|(_, info)| self.deregister_logged(info)); + .for_each(|(_, lst)| match reg.deregister(lst) { + Ok(_) => info!("Paused accepting connections on {}", lst.local_addr()), + Err(e) => error!("Can not deregister server socket {}", e), + }); } // Send connection to worker and handle error. @@ -419,9 +394,9 @@ impl Accept { } } - fn accept(&mut self, sockets: &mut [ServerSocketInfo], token: usize) { + fn accept(&mut self, token: usize) { while self.avail.available() { - let info = &mut sockets[token]; + let info = &mut self.sockets[token]; match info.lst.accept() { Ok(io) => { @@ -434,7 +409,12 @@ impl Accept { error!("Error accepting connection: {}", e); // deregister listener temporary - self.deregister_logged(info); + match self.poll.registry().deregister(&mut info.lst) { + Ok(_) => { + info!("Paused accepting connections on {}", info.lst.local_addr()) + } + Err(e) => error!("Can not deregister server socket {}", e), + }; // sleep after error. write the timeout to socket info as later // the poll would need it mark which socket and when it's @@ -447,13 +427,13 @@ impl Accept { } } - fn accept_all(&mut self, sockets: &mut [ServerSocketInfo]) { - sockets + fn accept_all(&mut self) { + self.sockets .iter_mut() .map(|info| info.token) .collect::>() .into_iter() - .for_each(|idx| self.accept(sockets, idx)) + .for_each(|idx| self.accept(idx)) } #[inline(always)]