diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 5813f091..d0c5416e 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -9,14 +9,11 @@ use log::{error, info}; use mio::{Interest, Poll, Token as MioToken}; use crate::server::Server; -use crate::socket::{MioListener, SocketAddr}; +use crate::socket::MioListener; use crate::waker::{self, WakerInterest, WakerRx, WakerTx, WAKER_TOKEN}; use crate::worker::{Conn, WorkerHandleAccept}; struct ServerSocketInfo { - /// Address of socket. Mainly used for logging. - addr: SocketAddr, - token: usize, lst: MioListener, @@ -171,12 +168,42 @@ impl Accept { // Accept runs in its own thread and would want to spawn additional futures to current // actix system. let sys = System::current(); + thread::Builder::new() .name("actix-server accept loop".to_owned()) .spawn(move || { System::set_current(sys); - let (mut accept, mut sockets) = - Accept::new_with_sockets(poll, waker_rx, socks, handles, srv); + + let mut sockets = socks + .into_iter() + .map(|(token, mut lst)| { + // Start listening for incoming connections + poll.registry() + .register(&mut lst, MioToken(token), Interest::READABLE) + .unwrap_or_else(|e| panic!("Can not register io: {}", e)); + + ServerSocketInfo { + token, + lst, + timeout: None, + } + }) + .collect::>(); + + let mut avail = Availability::default(); + + // Assume all handles are avail at construct time. + avail.set_available_all(&handles); + + let mut accept = Accept { + poll, + waker_rx, + handles, + srv, + next: 0, + avail, + paused: false, + }; // Construct Context from waker. let waker = waker::from_registry(accept.poll.registry()).unwrap(); @@ -187,50 +214,6 @@ impl Accept { .unwrap(); } - fn new_with_sockets( - poll: Poll, - waker_rx: WakerRx, - socks: Vec<(usize, MioListener)>, - handles: Vec, - srv: Server, - ) -> (Accept, Vec) { - let sockets = socks - .into_iter() - .map(|(token, mut lst)| { - let addr = lst.local_addr(); - - // Start listening for incoming connections - poll.registry() - .register(&mut lst, MioToken(token), Interest::READABLE) - .unwrap_or_else(|e| panic!("Can not register io: {}", e)); - - ServerSocketInfo { - addr, - token, - lst, - timeout: None, - } - }) - .collect(); - - let mut avail = Availability::default(); - - // Assume all handles are avail at construct time. - avail.set_available_all(&handles); - - let accept = Accept { - poll, - waker_rx, - handles, - srv, - next: 0, - avail, - paused: false, - }; - - (accept, sockets) - } - fn poll_with(&mut self, sockets: &mut [ServerSocketInfo], cx: &mut Context<'_>) { let mut events = mio::Events::with_capacity(128); @@ -346,14 +329,14 @@ impl Accept { fn register_logged(&self, info: &mut ServerSocketInfo) { match self.register(info) { - Ok(_) => info!("Resume accepting connections on {}", info.addr), + Ok(_) => info!("Resume accepting connections on {}", info.lst.local_addr()), Err(e) => error!("Can not register server socket {}", e), } } fn deregister_logged(&self, info: &mut ServerSocketInfo) { match self.poll.registry().deregister(&mut info.lst) { - Ok(_) => info!("Paused accepting connections on {}", info.addr), + Ok(_) => info!("Paused accepting connections on {}", info.lst.local_addr()), Err(e) => { error!("Can not deregister server socket {}", e) } diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index 948b5f1f..3a31584a 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -23,9 +23,15 @@ pub(crate) enum MioListener { impl MioListener { pub(crate) fn local_addr(&self) -> SocketAddr { match *self { - MioListener::Tcp(ref lst) => SocketAddr::Tcp(lst.local_addr().unwrap()), + MioListener::Tcp(ref lst) => lst + .local_addr() + .map(SocketAddr::Tcp) + .unwrap_or(SocketAddr::Unknown), #[cfg(unix)] - MioListener::Uds(ref lst) => SocketAddr::Uds(lst.local_addr().unwrap()), + MioListener::Uds(ref lst) => lst + .local_addr() + .map(SocketAddr::Uds) + .unwrap_or(SocketAddr::Unknown), } } @@ -110,14 +116,15 @@ impl fmt::Debug for MioListener { impl fmt::Display for MioListener { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { - MioListener::Tcp(ref lst) => write!(f, "{}", lst.local_addr().ok().unwrap()), + MioListener::Tcp(ref lst) => write!(f, "{:?}", lst), #[cfg(unix)] - MioListener::Uds(ref lst) => write!(f, "{:?}", lst.local_addr().ok().unwrap()), + MioListener::Uds(ref lst) => write!(f, "{:?}", lst), } } } pub(crate) enum SocketAddr { + Unknown, Tcp(StdSocketAddr), #[cfg(unix)] Uds(mio::net::SocketAddr), @@ -126,9 +133,10 @@ pub(crate) enum SocketAddr { impl fmt::Display for SocketAddr { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { - SocketAddr::Tcp(ref addr) => write!(f, "{}", addr), + Self::Unknown => write!(f, "Unknown Socket Address"), + Self::Tcp(ref addr) => write!(f, "{}", addr), #[cfg(unix)] - SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), + Self::Uds(ref addr) => write!(f, "{:?}", addr), } } } @@ -136,9 +144,10 @@ impl fmt::Display for SocketAddr { impl fmt::Debug for SocketAddr { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { - SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr), + Self::Unknown => write!(f, "Unknown Socket Address"), + Self::Tcp(ref addr) => write!(f, "{:?}", addr), #[cfg(unix)] - SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), + Self::Uds(ref addr) => write!(f, "{:?}", addr), } } } diff --git a/actix-server/src/waker.rs b/actix-server/src/waker.rs index b19c22a7..34178764 100644 --- a/actix-server/src/waker.rs +++ b/actix-server/src/waker.rs @@ -57,9 +57,10 @@ impl Clone for WakerTx { impl WakerTx { /// Send WakerInterest through channel and panic on error(shutdown). pub(crate) fn wake(&self, interest: WakerInterest) { - self.0 - .send(interest) - .unwrap_or_else(|e| panic!("Can not send WakerInterest: {}", e)); + // ingore result. tokio UnboundedSender only fail when the channel + // is closed. In that case the Accept thread is gone and no further + // wake up is needed/possible. + let _ = self.0.send(interest); } }