move accept_all to backpressure

This commit is contained in:
fakeshadow 2021-04-17 11:49:25 +08:00
parent b0f02ae8fc
commit 428a288d3f
1 changed files with 14 additions and 8 deletions

View File

@ -252,18 +252,16 @@ impl Accept {
// from backpressure. // from backpressure.
Some(WakerInterest::WorkerAvailable(idx)) => { Some(WakerInterest::WorkerAvailable(idx)) => {
drop(guard); drop(guard);
self.maybe_backpressure(sockets, false);
self.avail.set_available(idx, true); self.avail.set_available(idx, true);
self.accept_all(sockets); self.maybe_backpressure(sockets, false);
} }
// a new worker thread is made and it's handle would be added to Accept // a new worker thread is made and it's handle would be added to Accept
Some(WakerInterest::Worker(handle)) => { Some(WakerInterest::Worker(handle)) => {
drop(guard); drop(guard);
// maybe we want to recover from a backpressure.
self.maybe_backpressure(sockets, false);
self.avail.set_available(handle.idx(), true); self.avail.set_available(handle.idx(), true);
self.handles.push(handle); self.handles.push(handle);
self.accept_all(sockets); // maybe we want to recover from a backpressure.
self.maybe_backpressure(sockets, false);
} }
// got timer interest and it's time to try register socket(s) again // got timer interest and it's time to try register socket(s) again
Some(WakerInterest::Timer) => { Some(WakerInterest::Timer) => {
@ -391,6 +389,14 @@ impl Accept {
self.register_logged(info); self.register_logged(info);
} }
}); });
// Try to drain sockets backlog on recovery of backpressure.
// This is necessary for not hang listeners.
//
// In Accept::accept method listener does not always read to WouldBlock state.
if !self.backpressure {
self.accept_all(sockets);
}
} }
} }
@ -425,12 +431,12 @@ impl Accept {
} }
} }
fn send_conn(&mut self, sockets: &mut [ServerSocketInfo], mut conn: Conn) { fn accept_one(&mut self, sockets: &mut [ServerSocketInfo], mut conn: Conn) {
loop { loop {
let next = self.next(); let next = self.next();
let idx = next.idx(); let idx = next.idx();
if next.available() { if next.available() {
self.avail.set_available(idx, true); // self.avail.set_available(idx, true);
match self.send_connection(sockets, conn) { match self.send_connection(sockets, conn) {
Ok(_) => return, Ok(_) => return,
Err(c) => conn = c, Err(c) => conn = c,
@ -463,7 +469,7 @@ impl Accept {
match info.lst.accept() { match info.lst.accept() {
Ok(io) => { Ok(io) => {
let conn = Conn { io, token }; let conn = Conn { io, token };
self.send_conn(sockets, conn); self.accept_one(sockets, conn);
} }
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return, Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
Err(ref e) if connection_error(e) => continue, Err(ref e) if connection_error(e) => continue,