From 9662a6e51c57ed7e0d781b74582c2ebe72afa7ff Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Tue, 2 Nov 2021 23:57:50 +0000 Subject: [PATCH] dont panic in accept loop --- actix-server/src/accept.rs | 25 ++++--- actix-server/src/availability.rs | 121 +++++++++++++++++++++++++++++++ 2 files changed, 134 insertions(+), 12 deletions(-) create mode 100644 actix-server/src/availability.rs diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index bb75a1be..07192f45 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -65,7 +65,7 @@ impl AcceptLoop { let poll = self.poll.take().unwrap(); let waker = self.waker.clone(); - Accept::start(poll, waker, socks, srv, handles); + Accept::start(poll, waker, socks, srv, handles).expect("accept failed to start"); } } @@ -155,10 +155,13 @@ impl Accept { socks: Vec<(usize, MioListener)>, srv: ServerHandle, handles: Vec, - ) { + ) -> io::Result<()> { // Accept runs in its own thread and might spawn additional futures to current system let sys = System::try_current(); + let (mut accept, mut sockets) = + Accept::new_with_sockets(poll, waker, socks, handles, srv)?; + thread::Builder::new() .name("actix-server accept loop".to_owned()) .spawn(move || { @@ -167,12 +170,11 @@ impl Accept { System::set_current(sys); } - let (mut accept, mut sockets) = - Accept::new_with_sockets(poll, waker, socks, handles, srv); - accept.poll_with(&mut sockets); }) .unwrap(); + + Ok(()) } fn new_with_sockets( @@ -181,22 +183,21 @@ impl Accept { socks: Vec<(usize, MioListener)>, handles: Vec, srv: ServerHandle, - ) -> (Accept, Vec) { + ) -> io::Result<(Accept, Box<[ServerSocketInfo]>)> { let sockets = socks .into_iter() .map(|(token, mut lst)| { // Start listening for incoming connections poll.registry() - .register(&mut lst, MioToken(token), Interest::READABLE) - .unwrap_or_else(|e| panic!("Can not register io: {}", e)); + .register(&mut lst, MioToken(token), Interest::READABLE)?; - ServerSocketInfo { + Ok(ServerSocketInfo { token, lst, timeout: None, - } + }) }) - .collect(); + .collect::>()?; let mut avail = Availability::default(); @@ -213,7 +214,7 @@ impl Accept { paused: false, }; - (accept, sockets) + Ok((accept, sockets)) } fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) { diff --git a/actix-server/src/availability.rs b/actix-server/src/availability.rs new file mode 100644 index 00000000..801b08f2 --- /dev/null +++ b/actix-server/src/availability.rs @@ -0,0 +1,121 @@ +use crate::worker::WorkerHandleAccept; + +/// Array of u128 with every bit as marker for a worker handle's availability. +#[derive(Debug, Default)] +pub(crate) struct Availability([u128; 4]); + +impl Availability { + /// Check if any worker handle is available + #[inline(always)] + pub(crate) fn available(&self) -> bool { + self.0.iter().any(|a| *a != 0) + } + + /// Check if worker handle is available by index + #[inline(always)] + pub(crate) fn get_available(&self, idx: usize) -> bool { + let (offset, idx) = Self::offset(idx); + + self.0[offset] & (1 << idx as u128) != 0 + } + + /// Set worker handle available state by index. + pub(crate) fn set_available(&mut self, idx: usize, avail: bool) { + let (offset, idx) = Self::offset(idx); + + let off = 1 << idx as u128; + if avail { + self.0[offset] |= off; + } else { + self.0[offset] &= !off + } + } + + /// Set all worker handle to available state. + /// This would result in a re-check on all workers' availability. + pub(crate) fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) { + handles.iter().for_each(|handle| { + self.set_available(handle.idx(), true); + }) + } + + /// Get offset and adjusted index of given worker handle index. + pub(crate) fn offset(idx: usize) -> (usize, usize) { + if idx < 128 { + (0, idx) + } else if idx < 128 * 2 { + (1, idx - 128) + } else if idx < 128 * 3 { + (2, idx - 128 * 2) + } else if idx < 128 * 4 { + (3, idx - 128 * 3) + } else { + panic!("Max WorkerHandle count is 512") + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn single(aval: &mut Availability, idx: usize) { + aval.set_available(idx, true); + assert!(aval.available()); + + aval.set_available(idx, true); + + aval.set_available(idx, false); + assert!(!aval.available()); + + aval.set_available(idx, false); + assert!(!aval.available()); + } + + fn multi(aval: &mut Availability, mut idx: Vec) { + idx.iter().for_each(|idx| aval.set_available(*idx, true)); + + assert!(aval.available()); + + while let Some(idx) = idx.pop() { + assert!(aval.available()); + aval.set_available(idx, false); + } + + assert!(!aval.available()); + } + + #[test] + fn availability() { + let mut aval = Availability::default(); + + single(&mut aval, 1); + single(&mut aval, 128); + single(&mut aval, 256); + single(&mut aval, 511); + + let idx = (0..511).filter(|i| i % 3 == 0 && i % 5 == 0).collect(); + + multi(&mut aval, idx); + + multi(&mut aval, (0..511).collect()) + } + + #[test] + #[should_panic] + fn overflow() { + let mut aval = Availability::default(); + single(&mut aval, 512); + } + + #[test] + fn pin_point() { + let mut aval = Availability::default(); + + aval.set_available(438, true); + + aval.set_available(479, true); + + assert_eq!(aval.0[3], 1 << (438 - 384) | 1 << (479 - 384)); + } +}