Merge branch 'master' into fix/server_deregister_all

This commit is contained in:
fakeshadow 2021-04-04 14:04:36 -07:00 committed by GitHub
commit 5381294b16
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 117 additions and 81 deletions

View File

@ -1,17 +1,27 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use std::{
future::Future,
mem,
pin::Pin,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll},
time::Duration,
};
use actix_rt::time::{sleep, Sleep};
use actix_rt::{spawn, Arbiter};
use actix_rt::{
spawn,
time::{sleep, Instant, Sleep},
Arbiter,
};
use actix_utils::counter::Counter;
use futures_core::{future::LocalBoxFuture, ready};
use log::{error, info, trace};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot,
};
use crate::service::{BoxedServerService, InternalServiceFactory};
use crate::socket::MioStream;
@ -24,7 +34,7 @@ pub(crate) struct WorkerCommand(Conn);
/// and `false` if some connections still alive.
pub(crate) struct StopCommand {
graceful: bool,
result: oneshot::Sender<bool>,
tx: oneshot::Sender<bool>,
}
#[derive(Debug)]
@ -88,8 +98,8 @@ impl WorkerHandle {
}
pub fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
let (result, rx) = oneshot::channel();
let _ = self.tx2.send(StopCommand { graceful, result });
let (tx, rx) = oneshot::channel();
let _ = self.tx2.send(StopCommand { graceful, tx });
rx
}
}
@ -132,7 +142,7 @@ pub(crate) struct ServerWorker {
conns: Counter,
factories: Vec<Box<dyn InternalServiceFactory>>,
state: WorkerState,
config: ServerWorkerConfig,
shutdown_timeout: Duration,
}
struct WorkerService {
@ -211,12 +221,12 @@ impl ServerWorker {
let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker {
rx,
rx2,
services: Vec::new(),
availability,
factories,
config,
services: Vec::new(),
state: Default::default(),
shutdown_timeout: config.shutdown_timeout,
conns: conns.clone(),
state: WorkerState::Unavailable,
});
let fut = wrk
@ -262,11 +272,15 @@ impl ServerWorker {
WorkerHandle::new(idx, tx1, tx2, avail)
}
fn restart_service(&mut self, token: Token, idx: usize) {
let factory = &self.factories[idx];
fn restart_service(&mut self, token: Token, factory_id: usize) {
let factory = &self.factories[factory_id];
trace!("Service {:?} failed, restarting", factory.name(token));
self.services[token.0].status = WorkerServiceStatus::Restarting;
self.state = WorkerState::Restarting(idx, token, factory.create());
self.state = WorkerState::Restarting(Restart {
factory_id,
token,
fut: factory.create(),
});
}
fn shutdown(&mut self, force: bool) {
@ -332,66 +346,87 @@ impl ServerWorker {
enum WorkerState {
Available,
Unavailable,
Restarting(
usize,
Token,
LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>,
),
Shutdown(
Pin<Box<Sleep>>,
Pin<Box<Sleep>>,
Option<oneshot::Sender<bool>>,
),
Restarting(Restart),
Shutdown(Shutdown),
}
struct Restart {
factory_id: usize,
token: Token,
fut: LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>,
}
// Shutdown keep states necessary for server shutdown:
// Sleep for interval check the shutdown progress.
// Instant for the start time of shutdown.
// Sender for send back the shutdown outcome(force/grace) to StopCommand caller.
struct Shutdown {
timer: Pin<Box<Sleep>>,
start_from: Instant,
tx: oneshot::Sender<bool>,
}
impl Default for WorkerState {
fn default() -> Self {
Self::Unavailable
}
}
impl Future for ServerWorker {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().get_mut();
// `StopWorker` message handler
if let Poll::Ready(Some(StopCommand { graceful, result })) =
Pin::new(&mut self.rx2).poll_recv(cx)
if let Poll::Ready(Some(StopCommand { graceful, tx })) =
Pin::new(&mut this.rx2).poll_recv(cx)
{
self.availability.set(false);
this.availability.set(false);
let num = num_connections();
if num == 0 {
info!("Shutting down worker, 0 connections");
let _ = result.send(true);
let _ = tx.send(true);
return Poll::Ready(());
} else if graceful {
self.shutdown(false);
info!("Graceful worker shutdown, {} connections", num);
self.state = WorkerState::Shutdown(
Box::pin(sleep(Duration::from_secs(1))),
Box::pin(sleep(self.config.shutdown_timeout)),
Some(result),
);
this.shutdown(false);
this.state = WorkerState::Shutdown(Shutdown {
timer: Box::pin(sleep(Duration::from_secs(1))),
start_from: Instant::now(),
tx,
});
} else {
info!("Force shutdown worker, {} connections", num);
self.shutdown(true);
let _ = result.send(false);
this.shutdown(true);
let _ = tx.send(false);
return Poll::Ready(());
}
}
match self.state {
WorkerState::Unavailable => match self.check_readiness(cx) {
match this.state {
WorkerState::Unavailable => match this.check_readiness(cx) {
Ok(true) => {
self.state = WorkerState::Available;
self.availability.set(true);
this.state = WorkerState::Available;
this.availability.set(true);
self.poll(cx)
}
Ok(false) => Poll::Pending,
Err((token, idx)) => {
self.restart_service(token, idx);
this.restart_service(token, idx);
self.poll(cx)
}
},
WorkerState::Restarting(idx, token, ref mut fut) => {
let item = ready!(fut.as_mut().poll(cx)).unwrap_or_else(|_| {
WorkerState::Restarting(ref mut restart) => {
let factory_id = restart.factory_id;
let token = restart.token;
let item = ready!(restart.fut.as_mut().poll(cx)).unwrap_or_else(|_| {
panic!(
"Can not restart {:?} service",
self.factories[idx].name(token)
this.factories[factory_id].name(token)
)
});
@ -403,60 +438,61 @@ impl Future for ServerWorker {
trace!(
"Service {:?} has been restarted",
self.factories[idx].name(token)
this.factories[factory_id].name(token)
);
self.services[token.0].created(service);
self.state = WorkerState::Unavailable;
this.services[token.0].created(service);
this.state = WorkerState::Unavailable;
self.poll(cx)
}
WorkerState::Shutdown(ref mut t1, ref mut t2, ref mut tx) => {
let num = num_connections();
if num == 0 {
let _ = tx.take().unwrap().send(true);
WorkerState::Shutdown(ref mut shutdown) => {
// Wait for 1 second.
ready!(shutdown.timer.as_mut().poll(cx));
if num_connections() == 0 {
// Graceful shutdown.
if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) {
let _ = shutdown.tx.send(true);
}
Arbiter::current().stop();
return Poll::Ready(());
}
// check graceful timeout
if Pin::new(t2).poll(cx).is_ready() {
let _ = tx.take().unwrap().send(false);
self.shutdown(true);
Poll::Ready(())
} else if shutdown.start_from.elapsed() >= this.shutdown_timeout {
// Timeout forceful shutdown.
if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) {
let _ = shutdown.tx.send(false);
}
Arbiter::current().stop();
return Poll::Ready(());
Poll::Ready(())
} else {
// Reset timer and wait for 1 second.
let time = Instant::now() + Duration::from_secs(1);
shutdown.timer.as_mut().reset(time);
shutdown.timer.as_mut().poll(cx)
}
// sleep for 1 second and then check again
if t1.as_mut().poll(cx).is_ready() {
*t1 = Box::pin(sleep(Duration::from_secs(1)));
let _ = t1.as_mut().poll(cx);
}
Poll::Pending
}
// actively poll stream and handle worker command
WorkerState::Available => loop {
match self.check_readiness(cx) {
match this.check_readiness(cx) {
Ok(true) => {}
Ok(false) => {
trace!("Worker is unavailable");
self.availability.set(false);
self.state = WorkerState::Unavailable;
this.availability.set(false);
this.state = WorkerState::Unavailable;
return self.poll(cx);
}
Err((token, idx)) => {
self.restart_service(token, idx);
self.availability.set(false);
this.restart_service(token, idx);
this.availability.set(false);
return self.poll(cx);
}
}
match ready!(Pin::new(&mut self.rx).poll_recv(cx)) {
match ready!(Pin::new(&mut this.rx).poll_recv(cx)) {
// handle incoming io stream
Some(WorkerCommand(msg)) => {
let guard = self.conns.get();
let _ = self.services[msg.token.0].service.call((guard, msg.io));
let guard = this.conns.get();
let _ = this.services[msg.token.0].service.call((guard, msg.io));
}
None => return Poll::Ready(()),
};