Implement fix for graceful shutdown

fakeshadow 2021-04-30 21:51:37 +08:00
parent 341fcb3daf
commit 9a3e4818b4
2 changed files with 341 additions and 259 deletions
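
The core of this change is the delayed-shutdown drain loop in `Worker::poll_shutdown` below: the worker keeps serving already-accepted connections, re-checks roughly once per second, and reports a graceful outcome only if the connection count reaches zero before `shutdown_timeout`. A minimal, self-contained sketch of that idea follows; it is illustrative only — `drain_with_timeout`, `active_connections`, and the use of `#[tokio::main]` (tokio with the `macros`/`time`/`rt` features) are assumptions for the sketch, not the crate's API.

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

// Returns true for a graceful shutdown (all connections finished in time)
// and false when the timeout forces shutdown, mirroring the bool sent back
// over the `Stop` oneshot channel in the diff below.
async fn drain_with_timeout(active_connections: Arc<AtomicUsize>, timeout: Duration) -> bool {
    let start = Instant::now();
    loop {
        if active_connections.load(Ordering::SeqCst) == 0 {
            return true; // graceful: nothing left to drain
        }
        if start.elapsed() >= timeout {
            return false; // forced: deadline reached with connections still open
        }
        // Re-check once per second, like the 1-second sleep in `poll_shutdown`.
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

#[tokio::main]
async fn main() {
    let conns = Arc::new(AtomicUsize::new(3));
    let conns_bg = Arc::clone(&conns);
    // Simulate in-flight connections finishing one per second.
    tokio::spawn(async move {
        for _ in 0..3 {
            tokio::time::sleep(Duration::from_secs(1)).await;
            conns_bg.fetch_sub(1, Ordering::SeqCst);
        }
    });
    let graceful = drain_with_timeout(conns, Duration::from_secs(30)).await;
    println!("graceful shutdown: {}", graceful);
}

In the real worker the count comes from `WorkerCounter` and the drain loop also keeps polling the `Conn` channel; the sketch only captures the timing logic.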


@@ -192,10 +192,112 @@ impl WorkerHandleServer {
}
}
/// Service worker.
/// Server worker.
///
/// Worker accepts `Socket` objects via an unbounded channel and starts stream processing.
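///
/// A second unbounded channel delivers `Stop` messages; a graceful stop drains
/// in-flight connections for up to `shutdown_timeout` before forcing shutdown.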
pub(crate) struct ServerWorker {
worker: Worker,
state: WorkerState,
}
impl ServerWorker {
fn new(
rx: UnboundedReceiver<Conn>,
rx2: UnboundedReceiver<Stop>,
counter: WorkerCounter,
services: Box<[WorkerService]>,
factories: Box<[Box<dyn InternalServiceFactory>]>,
shutdown_timeout: Duration,
) -> Self {
Self {
worker: Worker {
rx,
rx2,
counter,
services,
factories,
shutdown_timeout,
},
state: WorkerState::default(),
}
}
pub(crate) fn start(
idx: usize,
factories: Vec<Box<dyn InternalServiceFactory>>,
waker_queue: WakerQueue,
config: ServerWorkerConfig,
) -> (WorkerHandleAccept, WorkerHandleServer) {
let (tx1, rx) = unbounded_channel();
let (tx2, rx2) = unbounded_channel();
let counter = Counter::new(config.max_concurrent_connections);
let counter_clone = counter.clone();
// Every worker runs in its own arbiter.
// A custom tokio runtime builder is used so the runtime settings can be changed.
Arbiter::with_tokio_rt(move || {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.max_blocking_threads(config.max_blocking_threads)
.build()
.unwrap()
})
.spawn(async move {
let fut = factories
.iter()
.enumerate()
.map(|(idx, factory)| {
let fut = factory.create();
async move { fut.await.map(|(t, s)| (idx, t, s)) }
})
.collect::<Vec<_>>();
// a second spawn to run !Send future tasks.
spawn(async move {
let res = join_all(fut)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>();
let services = match res {
Ok(res) => res
.into_iter()
.fold(Vec::new(), |mut services, (factory, token, service)| {
assert_eq!(token, services.len());
services.push(WorkerService {
factory,
service,
status: WorkerServiceStatus::Unavailable,
});
services
})
.into_boxed_slice(),
Err(e) => {
error!("Can not start worker: {:?}", e);
Arbiter::current().stop();
return;
}
};
let counter = WorkerCounter::new(idx, waker_queue, counter_clone);
// A third spawn makes sure ServerWorker runs as a non-boxed future.
spawn(ServerWorker::new(
rx,
rx2,
counter,
services,
factories.into_boxed_slice(),
config.shutdown_timeout,
));
});
});
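// Hand back the accept/server handle pair built from the channel senders and the connection counter.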
handle_pair(idx, tx1, tx2, counter)
}
}
struct Worker {
// UnboundedReceiver<Conn> should always be the first field.
// It must be the first thing dropped when ServerWorker is dropped.
rx: UnboundedReceiver<Conn>,
@@ -203,10 +305,208 @@ pub(crate) struct ServerWorker {
counter: WorkerCounter,
services: Box<[WorkerService]>,
factories: Box<[Box<dyn InternalServiceFactory>]>,
state: WorkerState,
shutdown_timeout: Duration,
}
impl Worker {
/// `Conn` message and worker/service state switch handler
fn poll_running(&mut self, running: &mut Running, cx: &mut Context<'_>) -> Poll<()> {
// The loop only exits when the `Conn` channel shuts down or a poll method returns Pending.
loop {
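// State machine:
//   Unavailable - wait until every service reports ready;
//   Restart     - await the factory re-creating a failed service;
//   Available   - pull `Conn` messages and dispatch them to the matching service.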
match *running {
Running::Unavailable => match self.poll_ready(cx) {
Ok(true) => {
*running = Running::Available;
}
Ok(false) => return Poll::Pending,
Err((token, idx)) => {
let restart = self.restart_service(token, idx);
*running = Running::Restart(restart);
}
},
Running::Restart(Restart {
factory_id,
token,
ref mut fut,
}) => {
let (token_new, service) =
ready!(fut.as_mut().poll(cx)).unwrap_or_else(|_| {
// A restart failure results in a panic that drops ServerWorker,
// which prevents a busy loop in poll_running.
panic!(
"Can not restart {:?} service",
self.factories[factory_id].name(token)
)
});
assert_eq!(token, token_new);
trace!(
"Service {:?} has been restarted",
self.factories[factory_id].name(token)
);
self.services[token].created(service);
*running = Running::Unavailable;
}
// actively poll Conn channel and handle MioStream.
Running::Available => loop {
match self.poll_ready(cx) {
Ok(true) => {}
Ok(false) => {
trace!("Worker is unavailable");
*running = Running::Unavailable;
}
Err((token, idx)) => {
let restart = self.restart_service(token, idx);
*running = Running::Restart(restart);
}
}
match ready!(Pin::new(&mut self.rx).poll_recv(cx)) {
Some(msg) => {
let guard = self.counter.guard();
let _ = self.services[msg.token].service.call((guard, msg.io));
}
None => return Poll::Ready(()),
};
},
}
}
}
/// `Stop` message handler.
///
/// Returns Ready when the worker should shut down.
fn poll_stop(&mut self, cx: &mut Context<'_>) -> Poll<Option<DelayShutdown>> {
match ready!(Pin::new(&mut self.rx2).poll_recv(cx)) {
Some(Stop { graceful, tx }) => {
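// Close the `Stop` channel so no further Stop messages are accepted.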
self.rx2.close();
let num = self.counter.total();
if num == 0 {
info!("Shutting down worker, 0 connections");
let _ = tx.send(true);
Poll::Ready(None)
} else if graceful {
info!("Graceful worker shutdown, {} connections", num);
self.shutdown(false);
let shutdown = DelayShutdown {
timer: Box::pin(sleep(Duration::from_secs(1))),
start_from: Instant::now(),
tx,
};
Poll::Ready(Some(shutdown))
} else {
info!("Force shutdown worker, {} connections", num);
self.shutdown(true);
let _ = tx.send(false);
Poll::Ready(None)
}
}
None => Poll::Pending,
}
}
/// Check readiness of services.
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Result<bool, (usize, usize)> {
let mut ready = true;
for (idx, srv) in self.services.iter_mut().enumerate() {
if srv.status == WorkerServiceStatus::Available
|| srv.status == WorkerServiceStatus::Unavailable
{
match srv.service.poll_ready(cx) {
Poll::Ready(Ok(_)) => {
if srv.status == WorkerServiceStatus::Unavailable {
trace!(
"Service {:?} is available",
self.factories[srv.factory].name(idx)
);
srv.status = WorkerServiceStatus::Available;
}
}
Poll::Pending => {
ready = false;
if srv.status == WorkerServiceStatus::Available {
trace!(
"Service {:?} is unavailable",
self.factories[srv.factory].name(idx)
);
srv.status = WorkerServiceStatus::Unavailable;
}
}
Poll::Ready(Err(_)) => {
error!(
"Service {:?} readiness check returned error, restarting",
self.factories[srv.factory].name(idx)
);
srv.status = WorkerServiceStatus::Failed;
return Err((idx, srv.factory));
}
}
}
}
Ok(ready)
}
/// Delay shutdown and drain all unhandled `Conn` messages.
fn poll_shutdown(
&mut self,
delay: &mut DelayShutdown,
running: &mut Running,
cx: &mut Context<'_>,
) -> Poll<bool> {
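// Resolves to true when all connections drain before the deadline (graceful)
// and to false once `shutdown_timeout` has elapsed (forced).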
if self.counter.total() == 0 {
// Graceful shutdown.
Poll::Ready(true)
} else if delay.start_from.elapsed() >= self.shutdown_timeout {
// Timeout forceful shutdown.
Poll::Ready(false)
} else {
// Poll Running state and try to drain all `Conn` from channel.
let _ = self.poll_running(running, cx);
// Wait for 1 second.
ready!(delay.timer.as_mut().poll(cx));
// Reset timer and try again.
let time = Instant::now() + Duration::from_secs(1);
delay.timer.as_mut().reset(time);
delay.timer.as_mut().poll(cx).map(|_| false)
}
}
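/// Mark a failed service as restarting and start its factory to re-create it.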
fn restart_service(&mut self, idx: usize, factory_id: usize) -> Restart {
let factory = &self.factories[factory_id];
trace!("Service {:?} failed, restarting", factory.name(idx));
self.services[idx].status = WorkerServiceStatus::Restarting;
Restart {
factory_id,
token: idx,
fut: factory.create(),
}
}
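/// Mark all available services as stopped (force) or stopping (graceful).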
fn shutdown(&mut self, force: bool) {
self.services
.iter_mut()
.filter(|srv| srv.status == WorkerServiceStatus::Available)
.for_each(|srv| {
srv.status = if force {
WorkerServiceStatus::Stopped
} else {
WorkerServiceStatus::Stopping
};
});
}
}
struct WorkerService {
factory: usize,
status: WorkerServiceStatus,
@@ -264,152 +564,21 @@ impl ServerWorkerConfig {
}
}
impl ServerWorker {
pub(crate) fn start(
idx: usize,
factories: Vec<Box<dyn InternalServiceFactory>>,
waker_queue: WakerQueue,
config: ServerWorkerConfig,
) -> (WorkerHandleAccept, WorkerHandleServer) {
let (tx1, rx) = unbounded_channel();
let (tx2, rx2) = unbounded_channel();
enum WorkerState {
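// Running: normal operation, driven by `Worker::poll_running`.
// DelayShutdown: draining remaining connections via `Worker::poll_shutdown`.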
Running(Running),
DelayShutdown(DelayShutdown, Running),
}
let counter = Counter::new(config.max_concurrent_connections);
let counter_clone = counter.clone();
// Every worker runs in its own arbiter.
// A custom tokio runtime builder is used so the runtime settings can be changed.
Arbiter::with_tokio_rt(move || {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.max_blocking_threads(config.max_blocking_threads)
.build()
.unwrap()
})
.spawn(async move {
let fut = factories
.iter()
.enumerate()
.map(|(idx, factory)| {
let fut = factory.create();
async move { fut.await.map(|(t, s)| (idx, t, s)) }
})
.collect::<Vec<_>>();
// a second spawn to run !Send future tasks.
spawn(async move {
let res = join_all(fut)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>();
let services = match res {
Ok(res) => res
.into_iter()
.fold(Vec::new(), |mut services, (factory, token, service)| {
assert_eq!(token, services.len());
services.push(WorkerService {
factory,
service,
status: WorkerServiceStatus::Unavailable,
});
services
})
.into_boxed_slice(),
Err(e) => {
error!("Can not start worker: {:?}", e);
Arbiter::current().stop();
return;
}
};
// A third spawn makes sure ServerWorker runs as a non-boxed future.
spawn(ServerWorker {
rx,
rx2,
services,
counter: WorkerCounter::new(idx, waker_queue, counter_clone),
factories: factories.into_boxed_slice(),
state: Default::default(),
shutdown_timeout: config.shutdown_timeout,
});
});
});
handle_pair(idx, tx1, tx2, counter)
}
fn restart_service(&mut self, idx: usize, factory_id: usize) {
let factory = &self.factories[factory_id];
trace!("Service {:?} failed, restarting", factory.name(idx));
self.services[idx].status = WorkerServiceStatus::Restarting;
self.state = WorkerState::Restarting(Restart {
factory_id,
token: idx,
fut: factory.create(),
});
}
fn shutdown(&mut self, force: bool) {
self.services
.iter_mut()
.filter(|srv| srv.status == WorkerServiceStatus::Available)
.for_each(|srv| {
srv.status = if force {
WorkerServiceStatus::Stopped
} else {
WorkerServiceStatus::Stopping
};
});
}
fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result<bool, (usize, usize)> {
let mut ready = true;
for (idx, srv) in self.services.iter_mut().enumerate() {
if srv.status == WorkerServiceStatus::Available
|| srv.status == WorkerServiceStatus::Unavailable
{
match srv.service.poll_ready(cx) {
Poll::Ready(Ok(_)) => {
if srv.status == WorkerServiceStatus::Unavailable {
trace!(
"Service {:?} is available",
self.factories[srv.factory].name(idx)
);
srv.status = WorkerServiceStatus::Available;
}
}
Poll::Pending => {
ready = false;
if srv.status == WorkerServiceStatus::Available {
trace!(
"Service {:?} is unavailable",
self.factories[srv.factory].name(idx)
);
srv.status = WorkerServiceStatus::Unavailable;
}
}
Poll::Ready(Err(_)) => {
error!(
"Service {:?} readiness check returned error, restarting",
self.factories[srv.factory].name(idx)
);
srv.status = WorkerServiceStatus::Failed;
return Err((idx, srv.factory));
}
}
}
}
Ok(ready)
impl Default for WorkerState {
fn default() -> Self {
Self::Running(Running::Unavailable)
}
}
enum WorkerState {
enum Running {
Available,
Unavailable,
Restarting(Restart),
Shutdown(Shutdown),
Restart(Restart),
}
struct Restart {
@@ -418,22 +587,16 @@ struct Restart {
fut: LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>,
}
// Shutdown keeps the states necessary for server shutdown:
// Keeps the states necessary for delayed server shutdown:
// Sleep timer for checking shutdown progress at an interval.
// Instant marking the start time of shutdown.
// Sender for sending the shutdown outcome (force/graceful) back to the StopCommand caller.
struct Shutdown {
// Sender for sending the shutdown outcome (force/graceful) back to the `Stop` caller.
struct DelayShutdown {
timer: Pin<Box<Sleep>>,
start_from: Instant,
tx: oneshot::Sender<bool>,
}
impl Default for WorkerState {
fn default() -> Self {
Self::Unavailable
}
}
impl Drop for ServerWorker {
fn drop(&mut self) {
// Stop the Arbiter that ServerWorker runs on when dropped.
@@ -447,115 +610,36 @@ impl Future for ServerWorker {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().get_mut();
// `StopWorker` message handler
if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx)
{
let num = this.counter.total();
if num == 0 {
info!("Shutting down worker, 0 connections");
let _ = tx.send(true);
return Poll::Ready(());
} else if graceful {
info!("Graceful worker shutdown, {} connections", num);
this.shutdown(false);
this.state = WorkerState::Shutdown(Shutdown {
timer: Box::pin(sleep(Duration::from_secs(1))),
start_from: Instant::now(),
tx,
});
} else {
info!("Force shutdown worker, {} connections", num);
this.shutdown(true);
let _ = tx.send(false);
return Poll::Ready(());
// poll Stop message first.
match this.worker.poll_stop(cx) {
Poll::Pending => {}
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(delay)) => {
// Take the running state and pass it to DelayShutdown.
// During shutdown there could be unhandled `Conn` messages left in the channel.
// They should be drained, and the worker keeps handling them until the
// delayed-shutdown timeout is reached.
this.state = match mem::take(&mut this.state) {
WorkerState::Running(running) => WorkerState::DelayShutdown(delay, running),
_ => unreachable!(
"Duplicate worker::Stop message sent to one worker::ServerWorker."
),
};
}
}
match this.state {
WorkerState::Unavailable => match this.check_readiness(cx) {
Ok(true) => {
this.state = WorkerState::Available;
self.poll(cx)
WorkerState::Running(ref mut running) => this.worker.poll_running(running, cx),
WorkerState::DelayShutdown(ref mut delay, ref mut running) => {
let is_graceful = ready!(this.worker.poll_shutdown(delay, running, cx));
// Report back shutdown outcome to caller.
if let WorkerState::DelayShutdown(delay, _) = mem::take(&mut this.state) {
let _ = delay.tx.send(is_graceful);
}
Ok(false) => Poll::Pending,
Err((token, idx)) => {
this.restart_service(token, idx);
self.poll(cx)
}
},
WorkerState::Restarting(ref mut restart) => {
let factory_id = restart.factory_id;
let token = restart.token;
let (token_new, service) = ready!(restart.fut.as_mut().poll(cx))
.unwrap_or_else(|_| {
panic!(
"Can not restart {:?} service",
this.factories[factory_id].name(token)
)
});
assert_eq!(token, token_new);
trace!(
"Service {:?} has been restarted",
this.factories[factory_id].name(token)
);
this.services[token].created(service);
this.state = WorkerState::Unavailable;
self.poll(cx)
Poll::Ready(())
}
WorkerState::Shutdown(ref mut shutdown) => {
// Wait for 1 second.
ready!(shutdown.timer.as_mut().poll(cx));
if this.counter.total() == 0 {
// Graceful shutdown.
if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) {
let _ = shutdown.tx.send(true);
}
Poll::Ready(())
} else if shutdown.start_from.elapsed() >= this.shutdown_timeout {
// Timeout forceful shutdown.
if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) {
let _ = shutdown.tx.send(false);
}
Poll::Ready(())
} else {
// Reset timer and wait for 1 second.
let time = Instant::now() + Duration::from_secs(1);
shutdown.timer.as_mut().reset(time);
shutdown.timer.as_mut().poll(cx)
}
}
// actively poll stream and handle worker command
WorkerState::Available => loop {
match this.check_readiness(cx) {
Ok(true) => {}
Ok(false) => {
trace!("Worker is unavailable");
this.state = WorkerState::Unavailable;
return self.poll(cx);
}
Err((token, idx)) => {
this.restart_service(token, idx);
return self.poll(cx);
}
}
// handle incoming io stream
match ready!(Pin::new(&mut this.rx).poll_recv(cx)) {
Some(msg) => {
let guard = this.counter.guard();
let _ = this.services[msg.token].service.call((guard, msg.io));
}
None => return Poll::Ready(()),
};
},
}
}
}


@@ -28,8 +28,7 @@ fn test_bind() {
.workers(1)
.disable_signals()
.shutdown_timeout(3600)
.bind("test", addr, move || fn_service(|_| ok::<_, ()>(())))
.unwrap()
.bind("test", addr, move || fn_service(|_| ok::<_, ()>(())))?
.run();
let _ = tx.send(srv.clone());
@@ -51,15 +50,14 @@ fn test_listen() {
let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || {
let lst = net::TcpListener::bind(addr).unwrap();
actix_rt::System::new().block_on(async {
let lst = net::TcpListener::bind(addr).unwrap();
let srv = Server::build()
.disable_signals()
.shutdown_timeout(3600)
.workers(1)
.listen("test", lst, move || fn_service(|_| ok::<_, ()>(())))
.unwrap()
.listen("test", lst, move || fn_service(|_| ok::<_, ()>(())))?
.run();
let _ = tx.send(srv.clone());