mirror of https://github.com/fafhrd91/actix-net
merge master
This commit is contained in:
commit
c307b11b08
|
@ -37,12 +37,62 @@ pub(crate) struct Accept {
|
||||||
handles: Vec<WorkerHandleAccept>,
|
handles: Vec<WorkerHandleAccept>,
|
||||||
srv: ServerHandle,
|
srv: ServerHandle,
|
||||||
next: usize,
|
next: usize,
|
||||||
|
avail: Availability,
|
||||||
backpressure: bool,
|
backpressure: bool,
|
||||||
// poll time out duration.
|
// poll time out duration.
|
||||||
// use the smallest duration from sockets timeout_deadline.
|
// use the smallest duration from sockets timeout_deadline.
|
||||||
timeout: Option<Duration>,
|
timeout: Option<Duration>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Array of u128 with every bit as marker for a worker handle's availability.
|
||||||
|
struct Availability([u128; 4]);
|
||||||
|
|
||||||
|
impl Default for Availability {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self([0; 4])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Availability {
|
||||||
|
/// Check if any worker handle is available
|
||||||
|
fn available(&self) -> bool {
|
||||||
|
self.0.iter().any(|a| *a != 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set worker handle available state by index.
|
||||||
|
fn set_available(&mut self, idx: usize, avail: bool) {
|
||||||
|
let (offset, idx) = if idx < 128 {
|
||||||
|
(0, idx)
|
||||||
|
} else if idx < 128 * 2 {
|
||||||
|
(1, idx - 128)
|
||||||
|
} else if idx < 128 * 3 {
|
||||||
|
(2, idx - 128 * 2)
|
||||||
|
} else if idx < 128 * 4 {
|
||||||
|
(3, idx - 128 * 3)
|
||||||
|
} else {
|
||||||
|
panic!("Max WorkerHandle count is 512")
|
||||||
|
};
|
||||||
|
|
||||||
|
if avail {
|
||||||
|
self.0[offset] |= 1 << idx as u128;
|
||||||
|
} else {
|
||||||
|
let shift = 1 << idx as u128;
|
||||||
|
|
||||||
|
debug_assert_ne!(self.0[offset] & shift, 0);
|
||||||
|
|
||||||
|
self.0[offset] ^= shift;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set all worker handle to available state.
|
||||||
|
/// This would result in a re-check on all workers' availability.
|
||||||
|
fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) {
|
||||||
|
handles.iter().for_each(|handle| {
|
||||||
|
self.set_available(handle.idx, true);
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// This function defines errors that are per-connection. Which basically
|
/// This function defines errors that are per-connection. Which basically
|
||||||
/// means that if we get this error from `accept()` system call it means
|
/// means that if we get this error from `accept()` system call it means
|
||||||
/// next connection might be ready to be accepted.
|
/// next connection might be ready to be accepted.
|
||||||
|
@ -124,12 +174,18 @@ impl Accept {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let mut avail = Availability::default();
|
||||||
|
|
||||||
|
// Assume all handles are avail at construct time.
|
||||||
|
avail.set_available_all(&handles);
|
||||||
|
|
||||||
let accept = Accept {
|
let accept = Accept {
|
||||||
poll,
|
poll,
|
||||||
waker_queue,
|
waker_queue,
|
||||||
handles,
|
handles,
|
||||||
srv,
|
srv,
|
||||||
next: 0,
|
next: 0,
|
||||||
|
avail,
|
||||||
backpressure: false,
|
backpressure: false,
|
||||||
timeout: None,
|
timeout: None,
|
||||||
};
|
};
|
||||||
|
@ -186,12 +242,17 @@ impl Accept {
|
||||||
Some(WakerInterest::WorkerAvailable) => {
|
Some(WakerInterest::WorkerAvailable) => {
|
||||||
drop(guard);
|
drop(guard);
|
||||||
self.maybe_backpressure(sockets, false);
|
self.maybe_backpressure(sockets, false);
|
||||||
|
|
||||||
|
// Assume all worker are avail as no worker index returned.
|
||||||
|
self.avail.set_available_all(&self.handles);
|
||||||
}
|
}
|
||||||
// a new worker thread is made and it's handle would be added to Accept
|
// a new worker thread is made and it's handle would be added to Accept
|
||||||
Some(WakerInterest::Worker(handle)) => {
|
Some(WakerInterest::Worker(handle)) => {
|
||||||
drop(guard);
|
drop(guard);
|
||||||
// maybe we want to recover from a backpressure.
|
// maybe we want to recover from a backpressure.
|
||||||
self.maybe_backpressure(sockets, false);
|
self.maybe_backpressure(sockets, false);
|
||||||
|
|
||||||
|
self.avail.set_available(handle.idx, true);
|
||||||
self.handles.push(handle);
|
self.handles.push(handle);
|
||||||
}
|
}
|
||||||
Some(WakerInterest::Pause) => {
|
Some(WakerInterest::Pause) => {
|
||||||
|
@ -340,27 +401,25 @@ impl Accept {
|
||||||
if self.backpressure {
|
if self.backpressure {
|
||||||
// send_connection would remove fault worker from handles.
|
// send_connection would remove fault worker from handles.
|
||||||
// worst case here is conn get dropped after all handles are gone.
|
// worst case here is conn get dropped after all handles are gone.
|
||||||
while !self.handles.is_empty() {
|
while let Err(c) = self.send_connection(sockets, conn) {
|
||||||
match self.send_connection(sockets, conn) {
|
conn = c
|
||||||
Ok(_) => return,
|
|
||||||
Err(c) => conn = c,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Do one round and try to send conn to all workers until it succeed.
|
while self.avail.available() {
|
||||||
// Start from self.next.
|
let next = self.next();
|
||||||
let mut idx = 0;
|
let idx = next.idx;
|
||||||
while idx < self.handles.len() {
|
if next.available() {
|
||||||
idx += 1;
|
self.avail.set_available(idx, true);
|
||||||
if self.handles[self.next].available() {
|
|
||||||
match self.send_connection(sockets, conn) {
|
match self.send_connection(sockets, conn) {
|
||||||
Ok(_) => return,
|
Ok(_) => return,
|
||||||
Err(c) => conn = c,
|
Err(c) => conn = c,
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
self.avail.set_available(idx, false);
|
||||||
self.set_next();
|
self.set_next();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sending Conn failed due to either all workers are in error or not available.
|
// Sending Conn failed due to either all workers are in error or not available.
|
||||||
// Enter backpressure state and try again.
|
// Enter backpressure state and try again.
|
||||||
self.maybe_backpressure(sockets, true);
|
self.maybe_backpressure(sockets, true);
|
||||||
|
@ -368,28 +427,22 @@ impl Accept {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set next worker handle that would accept work.
|
|
||||||
fn set_next(&mut self) {
|
|
||||||
self.next = (self.next + 1) % self.handles.len();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send connection to worker and handle error.
|
// Send connection to worker and handle error.
|
||||||
fn send_connection(
|
fn send_connection(
|
||||||
&mut self,
|
&mut self,
|
||||||
sockets: &mut Slab<ServerSocketInfo>,
|
sockets: &mut Slab<ServerSocketInfo>,
|
||||||
conn: Conn,
|
conn: Conn,
|
||||||
) -> Result<(), Conn> {
|
) -> Result<(), Conn> {
|
||||||
match self.handles[self.next].send(conn) {
|
match self.next().send(conn) {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
self.set_next();
|
self.set_next();
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
Err(conn) => {
|
Err(conn) => {
|
||||||
// worker lost contact and could be gone. a message is sent to
|
// Worker thread is error and could be gone.
|
||||||
// `ServerBuilder` future to notify it a new worker should be made.
|
// Remove worker handle and notify `ServerBuilder`.
|
||||||
// after that remove the fault worker and enter backpressure if necessary.
|
self.remove_next();
|
||||||
self.srv.worker_faulted(self.handles[self.next].idx);
|
|
||||||
self.handles.swap_remove(self.next);
|
|
||||||
if self.handles.is_empty() {
|
if self.handles.is_empty() {
|
||||||
error!("No workers");
|
error!("No workers");
|
||||||
self.maybe_backpressure(sockets, true);
|
self.maybe_backpressure(sockets, true);
|
||||||
|
@ -399,6 +452,7 @@ impl Accept {
|
||||||
} else if self.handles.len() <= self.next {
|
} else if self.handles.len() <= self.next {
|
||||||
self.next = 0;
|
self.next = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
Err(conn)
|
Err(conn)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -437,4 +491,92 @@ impl Accept {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn next(&self) -> &WorkerHandleAccept {
|
||||||
|
&self.handles[self.next]
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set next worker handle that would accept connection.
|
||||||
|
fn set_next(&mut self) {
|
||||||
|
self.next = (self.next + 1) % self.handles.len();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove next worker handle that fail to accept connection.
|
||||||
|
fn remove_next(&mut self) {
|
||||||
|
let handle = self.handles.swap_remove(self.next);
|
||||||
|
let idx = handle.idx;
|
||||||
|
// A message is sent to `ServerBuilder` future to notify it a new worker
|
||||||
|
// should be made.
|
||||||
|
self.srv.worker_faulted(idx);
|
||||||
|
self.avail.set_available(idx, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
use super::Availability;
|
||||||
|
|
||||||
|
fn single(aval: &mut Availability, idx: usize) {
|
||||||
|
aval.set_available(idx, true);
|
||||||
|
assert!(aval.available());
|
||||||
|
|
||||||
|
aval.set_available(idx, true);
|
||||||
|
|
||||||
|
aval.set_available(idx, false);
|
||||||
|
assert!(!aval.available());
|
||||||
|
}
|
||||||
|
|
||||||
|
fn multi(aval: &mut Availability, mut idx: Vec<usize>) {
|
||||||
|
idx.iter().for_each(|idx| aval.set_available(*idx, true));
|
||||||
|
|
||||||
|
assert!(aval.available());
|
||||||
|
|
||||||
|
while let Some(idx) = idx.pop() {
|
||||||
|
assert!(aval.available());
|
||||||
|
aval.set_available(idx, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert!(!aval.available());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn availability() {
|
||||||
|
let mut aval = Availability::default();
|
||||||
|
|
||||||
|
single(&mut aval, 1);
|
||||||
|
single(&mut aval, 128);
|
||||||
|
single(&mut aval, 256);
|
||||||
|
single(&mut aval, 511);
|
||||||
|
|
||||||
|
let idx = (0..511).filter(|i| i % 3 == 0 && i % 5 == 0).collect();
|
||||||
|
|
||||||
|
multi(&mut aval, idx);
|
||||||
|
|
||||||
|
multi(&mut aval, (0..511).collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[should_panic]
|
||||||
|
fn overflow() {
|
||||||
|
let mut aval = Availability::default();
|
||||||
|
single(&mut aval, 512);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[should_panic]
|
||||||
|
fn double_set_unavailable() {
|
||||||
|
let mut aval = Availability::default();
|
||||||
|
aval.set_available(233, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn pin_point() {
|
||||||
|
let mut aval = Availability::default();
|
||||||
|
|
||||||
|
aval.set_available(438, true);
|
||||||
|
|
||||||
|
aval.set_available(479, true);
|
||||||
|
|
||||||
|
assert_eq!(aval.0[3], 1 << (438 - 384) | 1 << (479 - 384));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue