diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index c01273b2..fac9202e 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -1,17 +1,27 @@ -use std::future::Future; -use std::pin::Pin; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; -use std::task::{Context, Poll}; -use std::time::Duration; +use std::{ + future::Future, + mem, + pin::Pin, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + task::{Context, Poll}, + time::Duration, +}; -use actix_rt::time::{sleep, Sleep}; -use actix_rt::{spawn, Arbiter}; +use actix_rt::{ + spawn, + time::{sleep, Instant, Sleep}, + Arbiter, +}; use actix_utils::counter::Counter; use futures_core::{future::LocalBoxFuture, ready}; use log::{error, info, trace}; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; -use tokio::sync::oneshot; +use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + oneshot, +}; use crate::service::{BoxedServerService, InternalServiceFactory}; use crate::socket::MioStream; @@ -24,7 +34,7 @@ pub(crate) struct WorkerCommand(Conn); /// and `false` if some connections still alive. pub(crate) struct StopCommand { graceful: bool, - result: oneshot::Sender, + tx: oneshot::Sender, } #[derive(Debug)] @@ -67,8 +77,8 @@ impl WorkerHandle { } pub fn stop(&self, graceful: bool) -> oneshot::Receiver { - let (result, rx) = oneshot::channel(); - let _ = self.tx2.send(StopCommand { graceful, result }); + let (tx, rx) = oneshot::channel(); + let _ = self.tx2.send(StopCommand { graceful, tx }); rx } } @@ -111,7 +121,7 @@ pub(crate) struct ServerWorker { conns: Counter, factories: Vec>, state: WorkerState, - config: ServerWorkerConfig, + shutdown_timeout: Duration, } struct WorkerService { @@ -196,12 +206,12 @@ impl ServerWorker { let mut wrk = ServerWorker { rx, rx2, - availability, - factories, - config, services: Vec::new(), + availability, conns: Counter::new(config.max_concurrent_connections), - state: WorkerState::Unavailable, + factories, + state: Default::default(), + shutdown_timeout: config.shutdown_timeout, }; let fut = wrk @@ -247,11 +257,15 @@ impl ServerWorker { WorkerHandle::new(idx, tx1, tx2, avail) } - fn restart_service(&mut self, token: Token, idx: usize) { - let factory = &self.factories[idx]; + fn restart_service(&mut self, token: Token, factory_id: usize) { + let factory = &self.factories[factory_id]; trace!("Service {:?} failed, restarting", factory.name(token)); self.services[token.0].status = WorkerServiceStatus::Restarting; - self.state = WorkerState::Restarting(idx, token, factory.create()); + self.state = WorkerState::Restarting(Restart { + factory_id, + token, + fut: factory.create(), + }); } fn shutdown(&mut self, force: bool) { @@ -317,16 +331,30 @@ impl ServerWorker { enum WorkerState { Available, Unavailable, - Restarting( - usize, - Token, - LocalBoxFuture<'static, Result, ()>>, - ), - Shutdown( - Pin>, - Pin>, - Option>, - ), + Restarting(Restart), + Shutdown(Shutdown), +} + +struct Restart { + factory_id: usize, + token: Token, + fut: LocalBoxFuture<'static, Result, ()>>, +} + +// Shutdown keep states necessary for server shutdown: +// Sleep for interval check the shutdown progress. +// Instant for the start time of shutdown. +// Sender for send back the shutdown outcome(force/grace) to StopCommand caller. +struct Shutdown { + timer: Pin>, + start_from: Instant, + tx: oneshot::Sender, +} + +impl Default for WorkerState { + fn default() -> Self { + Self::Unavailable + } } impl Future for ServerWorker { @@ -334,28 +362,31 @@ impl Future for ServerWorker { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.as_mut().get_mut(); + // `StopWorker` message handler - if let Poll::Ready(Some(StopCommand { graceful, result })) = + if let Poll::Ready(Some(StopCommand { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx) { this.availability.set(false); let num = this.conns.total(); if num == 0 { info!("Shutting down worker, 0 connections"); - let _ = result.send(true); + let _ = tx.send(true); return Poll::Ready(()); } else if graceful { - this.shutdown(false); info!("Graceful worker shutdown, {} connections", num); - this.state = WorkerState::Shutdown( - Box::pin(sleep(Duration::from_secs(1))), - Box::pin(sleep(this.config.shutdown_timeout)), - Some(result), - ); + this.shutdown(false); + + this.state = WorkerState::Shutdown(Shutdown { + timer: Box::pin(sleep(Duration::from_secs(1))), + start_from: Instant::now(), + tx, + }); } else { info!("Force shutdown worker, {} connections", num); this.shutdown(true); - let _ = result.send(false); + + let _ = tx.send(false); return Poll::Ready(()); } } @@ -369,15 +400,18 @@ impl Future for ServerWorker { } Ok(false) => Poll::Pending, Err((token, idx)) => { - self.restart_service(token, idx); + this.restart_service(token, idx); self.poll(cx) } }, - WorkerState::Restarting(idx, token, ref mut fut) => { - let item = ready!(fut.as_mut().poll(cx)).unwrap_or_else(|_| { + WorkerState::Restarting(ref mut restart) => { + let factory_id = restart.factory_id; + let token = restart.token; + + let item = ready!(restart.fut.as_mut().poll(cx)).unwrap_or_else(|_| { panic!( "Can not restart {:?} service", - this.factories[idx].name(token) + this.factories[factory_id].name(token) ) }); @@ -389,7 +423,7 @@ impl Future for ServerWorker { trace!( "Service {:?} has been restarted", - this.factories[idx].name(token) + this.factories[factory_id].name(token) ); this.services[token.0].created(service); @@ -397,28 +431,30 @@ impl Future for ServerWorker { self.poll(cx) } - WorkerState::Shutdown(ref mut t1, ref mut t2, ref mut tx) => { + WorkerState::Shutdown(ref mut shutdown) => { + // Wait for 1 second. + ready!(shutdown.timer.as_mut().poll(cx)); + if this.conns.total() == 0 { - let _ = tx.take().unwrap().send(true); + // Graceful shutdown. + if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) { + let _ = shutdown.tx.send(true); + } Arbiter::current().stop(); - return Poll::Ready(()); - } - - // check graceful timeout - if Pin::new(t2).poll(cx).is_ready() { - let _ = tx.take().unwrap().send(false); - this.shutdown(true); + Poll::Ready(()) + } else if shutdown.start_from.elapsed() >= this.shutdown_timeout { + // Timeout forceful shutdown. + if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) { + let _ = shutdown.tx.send(false); + } Arbiter::current().stop(); - return Poll::Ready(()); + Poll::Ready(()) + } else { + // Reset timer and wait for 1 second. + let time = Instant::now() + Duration::from_secs(1); + shutdown.timer.as_mut().reset(time); + shutdown.timer.as_mut().poll(cx) } - - // sleep for 1 second and then check again - if t1.as_mut().poll(cx).is_ready() { - *t1 = Box::pin(sleep(Duration::from_secs(1))); - let _ = t1.as_mut().poll(cx); - } - - Poll::Pending } // actively poll stream and handle worker command WorkerState::Available => loop {