Fix panic when sending message through closed waker channel

This commit is contained in:
fakeshadow 2021-04-25 04:19:54 +08:00
parent f0501d53d3
commit 40544d01ae
3 changed files with 56 additions and 63 deletions

View File

@ -9,14 +9,11 @@ use log::{error, info};
use mio::{Interest, Poll, Token as MioToken}; use mio::{Interest, Poll, Token as MioToken};
use crate::server::Server; use crate::server::Server;
use crate::socket::{MioListener, SocketAddr}; use crate::socket::MioListener;
use crate::waker::{self, WakerInterest, WakerRx, WakerTx, WAKER_TOKEN}; use crate::waker::{self, WakerInterest, WakerRx, WakerTx, WAKER_TOKEN};
use crate::worker::{Conn, WorkerHandleAccept}; use crate::worker::{Conn, WorkerHandleAccept};
struct ServerSocketInfo { struct ServerSocketInfo {
/// Address of socket. Mainly used for logging.
addr: SocketAddr,
token: usize, token: usize,
lst: MioListener, lst: MioListener,
@ -171,12 +168,42 @@ impl Accept {
// Accept runs in its own thread and would want to spawn additional futures to current // Accept runs in its own thread and would want to spawn additional futures to current
// actix system. // actix system.
let sys = System::current(); let sys = System::current();
thread::Builder::new() thread::Builder::new()
.name("actix-server accept loop".to_owned()) .name("actix-server accept loop".to_owned())
.spawn(move || { .spawn(move || {
System::set_current(sys); System::set_current(sys);
let (mut accept, mut sockets) =
Accept::new_with_sockets(poll, waker_rx, socks, handles, srv); let mut sockets = socks
.into_iter()
.map(|(token, mut lst)| {
// Start listening for incoming connections
poll.registry()
.register(&mut lst, MioToken(token), Interest::READABLE)
.unwrap_or_else(|e| panic!("Can not register io: {}", e));
ServerSocketInfo {
token,
lst,
timeout: None,
}
})
.collect::<Vec<_>>();
let mut avail = Availability::default();
// Assume all handles are avail at construct time.
avail.set_available_all(&handles);
let mut accept = Accept {
poll,
waker_rx,
handles,
srv,
next: 0,
avail,
paused: false,
};
// Construct Context from waker. // Construct Context from waker.
let waker = waker::from_registry(accept.poll.registry()).unwrap(); let waker = waker::from_registry(accept.poll.registry()).unwrap();
@ -187,50 +214,6 @@ impl Accept {
.unwrap(); .unwrap();
} }
fn new_with_sockets(
poll: Poll,
waker_rx: WakerRx,
socks: Vec<(usize, MioListener)>,
handles: Vec<WorkerHandleAccept>,
srv: Server,
) -> (Accept, Vec<ServerSocketInfo>) {
let sockets = socks
.into_iter()
.map(|(token, mut lst)| {
let addr = lst.local_addr();
// Start listening for incoming connections
poll.registry()
.register(&mut lst, MioToken(token), Interest::READABLE)
.unwrap_or_else(|e| panic!("Can not register io: {}", e));
ServerSocketInfo {
addr,
token,
lst,
timeout: None,
}
})
.collect();
let mut avail = Availability::default();
// Assume all handles are avail at construct time.
avail.set_available_all(&handles);
let accept = Accept {
poll,
waker_rx,
handles,
srv,
next: 0,
avail,
paused: false,
};
(accept, sockets)
}
fn poll_with(&mut self, sockets: &mut [ServerSocketInfo], cx: &mut Context<'_>) { fn poll_with(&mut self, sockets: &mut [ServerSocketInfo], cx: &mut Context<'_>) {
let mut events = mio::Events::with_capacity(128); let mut events = mio::Events::with_capacity(128);
@ -346,14 +329,14 @@ impl Accept {
fn register_logged(&self, info: &mut ServerSocketInfo) { fn register_logged(&self, info: &mut ServerSocketInfo) {
match self.register(info) { match self.register(info) {
Ok(_) => info!("Resume accepting connections on {}", info.addr), Ok(_) => info!("Resume accepting connections on {}", info.lst.local_addr()),
Err(e) => error!("Can not register server socket {}", e), Err(e) => error!("Can not register server socket {}", e),
} }
} }
fn deregister_logged(&self, info: &mut ServerSocketInfo) { fn deregister_logged(&self, info: &mut ServerSocketInfo) {
match self.poll.registry().deregister(&mut info.lst) { match self.poll.registry().deregister(&mut info.lst) {
Ok(_) => info!("Paused accepting connections on {}", info.addr), Ok(_) => info!("Paused accepting connections on {}", info.lst.local_addr()),
Err(e) => { Err(e) => {
error!("Can not deregister server socket {}", e) error!("Can not deregister server socket {}", e)
} }

View File

@ -23,9 +23,15 @@ pub(crate) enum MioListener {
impl MioListener { impl MioListener {
pub(crate) fn local_addr(&self) -> SocketAddr { pub(crate) fn local_addr(&self) -> SocketAddr {
match *self { match *self {
MioListener::Tcp(ref lst) => SocketAddr::Tcp(lst.local_addr().unwrap()), MioListener::Tcp(ref lst) => lst
.local_addr()
.map(SocketAddr::Tcp)
.unwrap_or(SocketAddr::Unknown),
#[cfg(unix)] #[cfg(unix)]
MioListener::Uds(ref lst) => SocketAddr::Uds(lst.local_addr().unwrap()), MioListener::Uds(ref lst) => lst
.local_addr()
.map(SocketAddr::Uds)
.unwrap_or(SocketAddr::Unknown),
} }
} }
@ -110,14 +116,15 @@ impl fmt::Debug for MioListener {
impl fmt::Display for MioListener { impl fmt::Display for MioListener {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self { match *self {
MioListener::Tcp(ref lst) => write!(f, "{}", lst.local_addr().ok().unwrap()), MioListener::Tcp(ref lst) => write!(f, "{:?}", lst),
#[cfg(unix)] #[cfg(unix)]
MioListener::Uds(ref lst) => write!(f, "{:?}", lst.local_addr().ok().unwrap()), MioListener::Uds(ref lst) => write!(f, "{:?}", lst),
} }
} }
} }
pub(crate) enum SocketAddr { pub(crate) enum SocketAddr {
Unknown,
Tcp(StdSocketAddr), Tcp(StdSocketAddr),
#[cfg(unix)] #[cfg(unix)]
Uds(mio::net::SocketAddr), Uds(mio::net::SocketAddr),
@ -126,9 +133,10 @@ pub(crate) enum SocketAddr {
impl fmt::Display for SocketAddr { impl fmt::Display for SocketAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self { match *self {
SocketAddr::Tcp(ref addr) => write!(f, "{}", addr), Self::Unknown => write!(f, "Unknown Socket Address"),
Self::Tcp(ref addr) => write!(f, "{}", addr),
#[cfg(unix)] #[cfg(unix)]
SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), Self::Uds(ref addr) => write!(f, "{:?}", addr),
} }
} }
} }
@ -136,9 +144,10 @@ impl fmt::Display for SocketAddr {
impl fmt::Debug for SocketAddr { impl fmt::Debug for SocketAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self { match *self {
SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr), Self::Unknown => write!(f, "Unknown Socket Address"),
Self::Tcp(ref addr) => write!(f, "{:?}", addr),
#[cfg(unix)] #[cfg(unix)]
SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), Self::Uds(ref addr) => write!(f, "{:?}", addr),
} }
} }
} }

View File

@ -57,9 +57,10 @@ impl Clone for WakerTx {
impl WakerTx { impl WakerTx {
/// Send WakerInterest through channel and panic on error(shutdown). /// Send WakerInterest through channel and panic on error(shutdown).
pub(crate) fn wake(&self, interest: WakerInterest) { pub(crate) fn wake(&self, interest: WakerInterest) {
self.0 // ingore result. tokio UnboundedSender only fail when the channel
.send(interest) // is closed. In that case the Accept thread is gone and no further
.unwrap_or_else(|e| panic!("Can not send WakerInterest: {}", e)); // wake up is needed/possible.
let _ = self.0.send(interest);
} }
} }