dont panic in accept loop

This commit is contained in:
Rob Ede 2021-11-02 23:57:50 +00:00
parent c56265d9aa
commit 9662a6e51c
2 changed files with 134 additions and 12 deletions

View File

@ -65,7 +65,7 @@ impl AcceptLoop {
let poll = self.poll.take().unwrap();
let waker = self.waker.clone();
Accept::start(poll, waker, socks, srv, handles);
Accept::start(poll, waker, socks, srv, handles).expect("accept failed to start");
}
}
@ -155,10 +155,13 @@ impl Accept {
socks: Vec<(usize, MioListener)>,
srv: ServerHandle,
handles: Vec<WorkerHandleAccept>,
) {
) -> io::Result<()> {
// Accept runs in its own thread and might spawn additional futures to current system
let sys = System::try_current();
let (mut accept, mut sockets) =
Accept::new_with_sockets(poll, waker, socks, handles, srv)?;
thread::Builder::new()
.name("actix-server accept loop".to_owned())
.spawn(move || {
@ -167,12 +170,11 @@ impl Accept {
System::set_current(sys);
}
let (mut accept, mut sockets) =
Accept::new_with_sockets(poll, waker, socks, handles, srv);
accept.poll_with(&mut sockets);
})
.unwrap();
Ok(())
}
fn new_with_sockets(
@ -181,22 +183,21 @@ impl Accept {
socks: Vec<(usize, MioListener)>,
handles: Vec<WorkerHandleAccept>,
srv: ServerHandle,
) -> (Accept, Vec<ServerSocketInfo>) {
) -> io::Result<(Accept, Box<[ServerSocketInfo]>)> {
let sockets = socks
.into_iter()
.map(|(token, mut lst)| {
// Start listening for incoming connections
poll.registry()
.register(&mut lst, MioToken(token), Interest::READABLE)
.unwrap_or_else(|e| panic!("Can not register io: {}", e));
.register(&mut lst, MioToken(token), Interest::READABLE)?;
ServerSocketInfo {
Ok(ServerSocketInfo {
token,
lst,
timeout: None,
}
})
})
.collect();
.collect::<io::Result<_>>()?;
let mut avail = Availability::default();
@ -213,7 +214,7 @@ impl Accept {
paused: false,
};
(accept, sockets)
Ok((accept, sockets))
}
fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) {

View File

@ -0,0 +1,121 @@
use crate::worker::WorkerHandleAccept;
/// Array of u128 with every bit as marker for a worker handle's availability.
#[derive(Debug, Default)]
pub(crate) struct Availability([u128; 4]);
impl Availability {
/// Check if any worker handle is available
#[inline(always)]
pub(crate) fn available(&self) -> bool {
self.0.iter().any(|a| *a != 0)
}
/// Check if worker handle is available by index
#[inline(always)]
pub(crate) fn get_available(&self, idx: usize) -> bool {
let (offset, idx) = Self::offset(idx);
self.0[offset] & (1 << idx as u128) != 0
}
/// Set worker handle available state by index.
pub(crate) fn set_available(&mut self, idx: usize, avail: bool) {
let (offset, idx) = Self::offset(idx);
let off = 1 << idx as u128;
if avail {
self.0[offset] |= off;
} else {
self.0[offset] &= !off
}
}
/// Set all worker handle to available state.
/// This would result in a re-check on all workers' availability.
pub(crate) fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) {
handles.iter().for_each(|handle| {
self.set_available(handle.idx(), true);
})
}
/// Get offset and adjusted index of given worker handle index.
pub(crate) fn offset(idx: usize) -> (usize, usize) {
if idx < 128 {
(0, idx)
} else if idx < 128 * 2 {
(1, idx - 128)
} else if idx < 128 * 3 {
(2, idx - 128 * 2)
} else if idx < 128 * 4 {
(3, idx - 128 * 3)
} else {
panic!("Max WorkerHandle count is 512")
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn single(aval: &mut Availability, idx: usize) {
aval.set_available(idx, true);
assert!(aval.available());
aval.set_available(idx, true);
aval.set_available(idx, false);
assert!(!aval.available());
aval.set_available(idx, false);
assert!(!aval.available());
}
fn multi(aval: &mut Availability, mut idx: Vec<usize>) {
idx.iter().for_each(|idx| aval.set_available(*idx, true));
assert!(aval.available());
while let Some(idx) = idx.pop() {
assert!(aval.available());
aval.set_available(idx, false);
}
assert!(!aval.available());
}
#[test]
fn availability() {
let mut aval = Availability::default();
single(&mut aval, 1);
single(&mut aval, 128);
single(&mut aval, 256);
single(&mut aval, 511);
let idx = (0..511).filter(|i| i % 3 == 0 && i % 5 == 0).collect();
multi(&mut aval, idx);
multi(&mut aval, (0..511).collect())
}
#[test]
#[should_panic]
fn overflow() {
let mut aval = Availability::default();
single(&mut aval, 512);
}
#[test]
fn pin_point() {
let mut aval = Availability::default();
aval.set_available(438, true);
aval.set_available(479, true);
assert_eq!(aval.0[3], 1 << (438 - 384) | 1 << (479 - 384));
}
}