Refactor Accept::accept_one

This commit is contained in:
fakeshadow 2021-04-01 08:33:14 +08:00
parent 8becb0db70
commit 8ed34cc46e
1 changed files with 48 additions and 46 deletions

View File

@ -318,71 +318,73 @@ impl Accept {
}
}
fn accept_one(&mut self, sockets: &mut Slab<ServerSocketInfo>, mut msg: Conn) {
fn accept_one(&mut self, sockets: &mut Slab<ServerSocketInfo>, mut conn: Conn) {
if self.backpressure {
// send_connection would remove fault worker from handles.
// worst case here is conn get dropped after all handles are gone.
while !self.handles.is_empty() {
match self.handles[self.next].send(msg) {
Ok(_) => {
self.set_next();
break;
}
Err(tmp) => {
// worker lost contact and could be gone. a message is sent to
// `ServerBuilder` future to notify it a new worker should be made
// after that remove the fault worker
self.srv.worker_faulted(self.handles[self.next].idx);
msg = tmp;
self.handles.swap_remove(self.next);
if self.handles.is_empty() {
error!("No workers");
return;
} else if self.handles.len() <= self.next {
self.next = 0;
}
continue;
}
match self.send_connection(sockets, conn) {
Ok(_) => return,
Err(c) => conn = c,
}
}
} else {
// Do one round and try to send conn to all workers until it succeed.
// Start from self.next.
let mut idx = 0;
while idx < self.handles.len() {
idx += 1;
if self.handles[self.next].available() {
match self.handles[self.next].send(msg) {
match self.send_connection(sockets, conn) {
Ok(_) => return,
Err(c) => conn = c,
}
} else {
self.set_next();
}
}
// Sending Conn failed due to either all workers are in error or not available.
// Enter backpressure state and try again.
self.maybe_backpressure(sockets, true);
self.accept_one(sockets, conn);
}
}
// Set next worker handle that would accept work.
fn set_next(&mut self) {
self.next = (self.next + 1) % self.handles.len();
}
// Send connection to worker and handle error.
fn send_connection(
&mut self,
sockets: &mut Slab<ServerSocketInfo>,
conn: Conn,
) -> Result<(), Conn> {
match self.handles[self.next].send(conn) {
Ok(_) => {
self.set_next();
return;
}
Ok(())
},
Err(conn) => {
// worker lost contact and could be gone. a message is sent to
// `ServerBuilder` future to notify it a new worker should be made.
// after that remove the fault worker and enter backpressure if necessary.
Err(tmp) => {
self.srv.worker_faulted(self.handles[self.next].idx);
msg = tmp;
self.handles.swap_remove(self.next);
if self.handles.is_empty() {
error!("No workers");
self.maybe_backpressure(sockets, true);
return;
// All workers are gone and Conn is nowhere to be sent.
// Treat this situation as Ok and drop Conn.
return Ok(());
} else if self.handles.len() <= self.next {
self.next = 0;
}
continue;
Err(conn)
}
}
}
self.set_next();
}
// enable backpressure
self.maybe_backpressure(sockets, true);
self.accept_one(sockets, msg);
}
}
// set next worker handle that would accept work.
fn set_next(&mut self) {
self.next = (self.next + 1) % self.handles.len();
}
fn accept(&mut self, sockets: &mut Slab<ServerSocketInfo>, token: usize) {
loop {