Merge branch 'master' into fix/server_process_timer

Rob Ede 2021-04-01 07:46:04 +01:00 committed by GitHub
commit b3f5632ea9
3 changed files with 53 additions and 57 deletions


@@ -326,72 +326,74 @@ impl Accept {
}
}
fn accept_one(&mut self, sockets: &mut Slab<ServerSocketInfo>, mut msg: Conn) {
fn accept_one(&mut self, sockets: &mut Slab<ServerSocketInfo>, mut conn: Conn) {
if self.backpressure {
// send_connection would remove the faulted worker from handles.
// Worst case: the conn gets dropped after all handles are gone.
while !self.handles.is_empty() {
match self.handles[self.next].send(msg) {
Ok(_) => {
self.set_next();
break;
}
Err(tmp) => {
// The worker lost contact and could be gone. A message is sent to the
// `ServerBuilder` future to notify it that a new worker should be made;
// after that, remove the faulted worker.
self.srv.worker_faulted(self.handles[self.next].idx);
msg = tmp;
self.handles.swap_remove(self.next);
if self.handles.is_empty() {
error!("No workers");
return;
} else if self.handles.len() <= self.next {
self.next = 0;
}
continue;
}
match self.send_connection(sockets, conn) {
Ok(_) => return,
Err(c) => conn = c,
}
}
} else {
// Do one round and try to send conn to all workers until it succeeds.
// Start from self.next.
let mut idx = 0;
while idx < self.handles.len() {
idx += 1;
if self.handles[self.next].available() {
match self.handles[self.next].send(msg) {
Ok(_) => {
self.set_next();
return;
}
// The worker lost contact and could be gone. A message is sent to the
// `ServerBuilder` future to notify it that a new worker should be made;
// after that, remove the faulted worker and enter backpressure if necessary.
Err(tmp) => {
self.srv.worker_faulted(self.handles[self.next].idx);
msg = tmp;
self.handles.swap_remove(self.next);
if self.handles.is_empty() {
error!("No workers");
self.maybe_backpressure(sockets, true);
return;
} else if self.handles.len() <= self.next {
self.next = 0;
}
continue;
}
match self.send_connection(sockets, conn) {
Ok(_) => return,
Err(c) => conn = c,
}
} else {
self.set_next();
}
self.set_next();
}
// enable backpressure
// Sending the Conn failed because all workers are either in error or unavailable.
// Enter backpressure state and try again.
self.maybe_backpressure(sockets, true);
self.accept_one(sockets, msg);
self.accept_one(sockets, conn);
}
}
// set next worker handle that would accept work.
// Set the next worker handle that will accept work.
fn set_next(&mut self) {
self.next = (self.next + 1) % self.handles.len();
}
// Send connection to worker and handle error.
fn send_connection(
&mut self,
sockets: &mut Slab<ServerSocketInfo>,
conn: Conn,
) -> Result<(), Conn> {
match self.handles[self.next].send(conn) {
Ok(_) => {
self.set_next();
Ok(())
}
Err(conn) => {
// The worker lost contact and could be gone. A message is sent to the
// `ServerBuilder` future to notify it that a new worker should be made;
// after that, remove the faulted worker and enter backpressure if necessary.
self.srv.worker_faulted(self.handles[self.next].idx);
self.handles.swap_remove(self.next);
if self.handles.is_empty() {
error!("No workers");
self.maybe_backpressure(sockets, true);
// All workers are gone and the Conn has nowhere to be sent.
// Treat this situation as Ok and drop Conn.
return Ok(());
} else if self.handles.len() <= self.next {
self.next = 0;
}
Err(conn)
}
}
}
fn accept(&mut self, sockets: &mut Slab<ServerSocketInfo>, token: usize) {
loop {
let info = sockets
@@ -399,11 +401,10 @@ impl Accept {
.expect("ServerSocketInfo is removed from Slab");
match info.lst.accept() {
Ok((io, addr)) => {
Ok(io) => {
let msg = Conn {
io,
token: info.token,
peer: Some(addr),
};
self.accept_one(sockets, msg);
}
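
A rough, self-contained sketch of the dispatch pattern that the refactor above centralizes in send_connection: pick workers round-robin, treat a failed send as a dead worker and drop its handle, and hand the connection back to the caller when nobody can take it. It uses std channels and made-up names (Dispatcher, dispatch), not the actix-server types:

use std::sync::mpsc::{channel, Sender};

struct Dispatcher<T> {
    workers: Vec<Sender<T>>,
    next: usize,
}

impl<T> Dispatcher<T> {
    // Advance the round-robin cursor (mirrors set_next above).
    fn set_next(&mut self) {
        self.next = (self.next + 1) % self.workers.len();
    }

    // Try each remaining worker once, starting at self.next. A failed send
    // means the worker's receiver is gone, so its handle is removed
    // (mirrors swap_remove above). Err hands the item back so the caller
    // can retry under backpressure or drop it.
    fn dispatch(&mut self, mut item: T) -> Result<(), T> {
        let mut tried = 0;
        while tried < self.workers.len() {
            tried += 1;
            match self.workers[self.next].send(item) {
                Ok(()) => {
                    self.set_next();
                    return Ok(());
                }
                Err(err) => {
                    item = err.0;
                    self.workers.swap_remove(self.next);
                    if self.workers.is_empty() {
                        return Err(item);
                    }
                    if self.next >= self.workers.len() {
                        self.next = 0;
                    }
                }
            }
        }
        Err(item)
    }
}

fn main() {
    let (tx, rx) = channel::<u32>();
    let mut dispatcher = Dispatcher { workers: vec![tx], next: 0 };
    dispatcher.dispatch(42).expect("one live worker");
    assert_eq!(rx.recv().unwrap(), 42);
}

In the real acceptor the error path additionally notifies the ServerBuilder future (worker_faulted) and may toggle backpressure; the sketch only models the handle bookkeeping.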


@@ -40,15 +40,11 @@ impl MioListener {
}
}
pub(crate) fn accept(&self) -> io::Result<(MioStream, SocketAddr)> {
pub(crate) fn accept(&self) -> io::Result<MioStream> {
match *self {
MioListener::Tcp(ref lst) => lst
.accept()
.map(|(stream, addr)| (MioStream::Tcp(stream), SocketAddr::Tcp(addr))),
MioListener::Tcp(ref lst) => lst.accept().map(|(stream, _)| MioStream::Tcp(stream)),
#[cfg(unix)]
MioListener::Uds(ref lst) => lst
.accept()
.map(|(stream, addr)| (MioStream::Uds(stream), SocketAddr::Uds(addr))),
MioListener::Uds(ref lst) => lst.accept().map(|(stream, _)| MioStream::Uds(stream)),
}
}
}
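
Dropping the address from accept() works because the peer address is still recoverable from the accepted stream itself; mio's TcpStream and UnixStream expose peer_addr() much like their std counterparts. A small illustrative sketch using std::net (the helper name and setup are invented for the example):

use std::io;
use std::net::{TcpListener, TcpStream};

// Mirror the new shape: return just the stream, discard the address.
fn accept_stream(lst: &TcpListener) -> io::Result<TcpStream> {
    lst.accept().map(|(stream, _addr)| stream)
}

fn main() -> io::Result<()> {
    let lst = TcpListener::bind("127.0.0.1:0")?;
    let addr = lst.local_addr()?;
    let _client = TcpStream::connect(addr)?;
    let accepted = accept_stream(&lst)?;
    // The address is still available whenever a caller actually wants it.
    println!("peer = {}", accepted.peer_addr()?);
    Ok(())
}

This is presumably why Conn no longer needs to carry peer: Option<SocketAddr> (see the worker.rs hunk below).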


@@ -14,7 +14,7 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;
use crate::service::{BoxedServerService, InternalServiceFactory};
use crate::socket::{MioStream, SocketAddr};
use crate::socket::MioStream;
use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::{join_all, Token};
@@ -31,7 +31,6 @@ pub(crate) struct StopCommand {
pub(crate) struct Conn {
pub io: MioStream,
pub token: Token,
pub peer: Option<SocketAddr>,
}
static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600);
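
The MAX_CONNS atomic in the context above is the cap the acceptor checks before taking new connections. A minimal sketch of that counting pattern, assuming a RAII guard; ConnGuard and try_acquire are illustrative names, not the crate's API:

use std::sync::atomic::{AtomicUsize, Ordering};

const MAX_CONNS: usize = 25_600;
static CURRENT_CONNS: AtomicUsize = AtomicUsize::new(0);

// Returned for each accepted connection; dropping it releases the slot.
struct ConnGuard;

impl Drop for ConnGuard {
    fn drop(&mut self) {
        CURRENT_CONNS.fetch_sub(1, Ordering::Relaxed);
    }
}

// Try to reserve a connection slot; None means the server is saturated
// and the acceptor should apply backpressure.
fn try_acquire() -> Option<ConnGuard> {
    if CURRENT_CONNS.fetch_add(1, Ordering::Relaxed) < MAX_CONNS {
        Some(ConnGuard)
    } else {
        CURRENT_CONNS.fetch_sub(1, Ordering::Relaxed);
        None
    }
}

fn main() {
    let guard = try_acquire().expect("first connection fits under the cap");
    drop(guard);
    assert_eq!(CURRENT_CONNS.load(Ordering::Relaxed), 0);
}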