From 8aade720ede221ec0962f52692fe4a1f00c182ee Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sun, 4 Apr 2021 12:34:52 -0700 Subject: [PATCH 1/2] Refactor WorkerState::Shutdown (#310) --- actix-server/src/worker.rs | 149 +++++++++++++++++++++---------------- 1 file changed, 84 insertions(+), 65 deletions(-) diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index bd28ccda..9409dfb4 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -1,17 +1,27 @@ -use std::future::Future; -use std::pin::Pin; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::Arc; -use std::task::{Context, Poll}; -use std::time::Duration; +use std::{ + future::Future, + mem, + pin::Pin, + sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, + }, + task::{Context, Poll}, + time::Duration, +}; -use actix_rt::time::{sleep, Sleep}; -use actix_rt::{spawn, Arbiter}; +use actix_rt::{ + spawn, + time::{sleep, Instant, Sleep}, + Arbiter, +}; use actix_utils::counter::Counter; use futures_core::{future::LocalBoxFuture, ready}; use log::{error, info, trace}; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; -use tokio::sync::oneshot; +use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + oneshot, +}; use crate::service::{BoxedServerService, InternalServiceFactory}; use crate::socket::MioStream; @@ -132,7 +142,7 @@ pub(crate) struct ServerWorker { conns: Counter, factories: Vec>, state: WorkerState, - config: ServerWorkerConfig, + shutdown_timeout: Duration, } struct WorkerService { @@ -211,12 +221,12 @@ impl ServerWorker { let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker { rx, rx2, + services: Default::default(), availability, factories, - config, - services: Vec::new(), + state: Default::default(), + shutdown_timeout: config.shutdown_timeout, conns: conns.clone(), - state: WorkerState::Unavailable, }); let fut = wrk @@ -337,53 +347,61 @@ enum WorkerState { Token, LocalBoxFuture<'static, Result, ()>>, ), - Shutdown( - Pin>, - Pin>, - Option>, - ), + // Shutdown keep states necessary for server shutdown: + // Sleep for interval check the shutdown progress. + // Instant for the start time of shutdown. + // Sender for send back the shutdown outcome(force/grace) to StopCommand caller. + Shutdown(Pin>, Instant, oneshot::Sender), +} + +impl Default for WorkerState { + fn default() -> Self { + Self::Unavailable + } } impl Future for ServerWorker { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.as_mut().get_mut(); + // `StopWorker` message handler if let Poll::Ready(Some(StopCommand { graceful, result })) = - Pin::new(&mut self.rx2).poll_recv(cx) + Pin::new(&mut this.rx2).poll_recv(cx) { - self.availability.set(false); + this.availability.set(false); let num = num_connections(); if num == 0 { info!("Shutting down worker, 0 connections"); let _ = result.send(true); return Poll::Ready(()); } else if graceful { - self.shutdown(false); info!("Graceful worker shutdown, {} connections", num); - self.state = WorkerState::Shutdown( - Box::pin(sleep(Duration::from_secs(1))), - Box::pin(sleep(self.config.shutdown_timeout)), - Some(result), - ); + this.shutdown(false); + + let timer = Box::pin(sleep(Duration::from_secs(1))); + let start_from = Instant::now(); + this.state = WorkerState::Shutdown(timer, start_from, result); } else { info!("Force shutdown worker, {} connections", num); - self.shutdown(true); + this.shutdown(true); + let _ = result.send(false); return Poll::Ready(()); } } - match self.state { - WorkerState::Unavailable => match self.check_readiness(cx) { + match this.state { + WorkerState::Unavailable => match this.check_readiness(cx) { Ok(true) => { - self.state = WorkerState::Available; - self.availability.set(true); + this.state = WorkerState::Available; + this.availability.set(true); self.poll(cx) } Ok(false) => Poll::Pending, Err((token, idx)) => { - self.restart_service(token, idx); + this.restart_service(token, idx); self.poll(cx) } }, @@ -391,7 +409,7 @@ impl Future for ServerWorker { let item = ready!(fut.as_mut().poll(cx)).unwrap_or_else(|_| { panic!( "Can not restart {:?} service", - self.factories[idx].name(token) + this.factories[idx].name(token) ) }); @@ -403,60 +421,61 @@ impl Future for ServerWorker { trace!( "Service {:?} has been restarted", - self.factories[idx].name(token) + this.factories[idx].name(token) ); - self.services[token.0].created(service); - self.state = WorkerState::Unavailable; + this.services[token.0].created(service); + this.state = WorkerState::Unavailable; self.poll(cx) } - WorkerState::Shutdown(ref mut t1, ref mut t2, ref mut tx) => { - let num = num_connections(); - if num == 0 { - let _ = tx.take().unwrap().send(true); + WorkerState::Shutdown(ref mut timer, ref start_from, _) => { + // Wait for 1 second. + ready!(timer.as_mut().poll(cx)); + + if num_connections() == 0 { + // Graceful shutdown. + if let WorkerState::Shutdown(_, _, sender) = mem::take(&mut this.state) { + let _ = sender.send(true); + } Arbiter::current().stop(); - return Poll::Ready(()); - } - - // check graceful timeout - if Pin::new(t2).poll(cx).is_ready() { - let _ = tx.take().unwrap().send(false); - self.shutdown(true); + Poll::Ready(()) + } else if start_from.elapsed() >= this.shutdown_timeout { + // Timeout forceful shutdown. + if let WorkerState::Shutdown(_, _, sender) = mem::take(&mut this.state) { + let _ = sender.send(false); + } Arbiter::current().stop(); - return Poll::Ready(()); + Poll::Ready(()) + } else { + // Reset timer and wait for 1 second. + let time = Instant::now() + Duration::from_secs(1); + timer.as_mut().reset(time); + timer.as_mut().poll(cx) } - - // sleep for 1 second and then check again - if t1.as_mut().poll(cx).is_ready() { - *t1 = Box::pin(sleep(Duration::from_secs(1))); - let _ = t1.as_mut().poll(cx); - } - - Poll::Pending } // actively poll stream and handle worker command WorkerState::Available => loop { - match self.check_readiness(cx) { + match this.check_readiness(cx) { Ok(true) => {} Ok(false) => { trace!("Worker is unavailable"); - self.availability.set(false); - self.state = WorkerState::Unavailable; + this.availability.set(false); + this.state = WorkerState::Unavailable; return self.poll(cx); } Err((token, idx)) => { - self.restart_service(token, idx); - self.availability.set(false); + this.restart_service(token, idx); + this.availability.set(false); return self.poll(cx); } } - match ready!(Pin::new(&mut self.rx).poll_recv(cx)) { + match ready!(Pin::new(&mut this.rx).poll_recv(cx)) { // handle incoming io stream Some(WorkerCommand(msg)) => { - let guard = self.conns.get(); - let _ = self.services[msg.token.0].service.call((guard, msg.io)); + let guard = this.conns.get(); + let _ = this.services[msg.token.0].service.call((guard, msg.io)); } None => return Poll::Ready(()), }; From 3859e91799aae4a7fdaf667091ab1416a7e66f1e Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sun, 4 Apr 2021 13:53:19 -0700 Subject: [PATCH 2/2] Use named type for WorkerState::Restarting and Shutdown (#317) --- actix-server/src/worker.rs | 89 +++++++++++++++++++++++--------------- 1 file changed, 53 insertions(+), 36 deletions(-) diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 9409dfb4..6417dd0b 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -34,7 +34,7 @@ pub(crate) struct WorkerCommand(Conn); /// and `false` if some connections still alive. pub(crate) struct StopCommand { graceful: bool, - result: oneshot::Sender, + tx: oneshot::Sender, } #[derive(Debug)] @@ -98,8 +98,8 @@ impl WorkerHandle { } pub fn stop(&self, graceful: bool) -> oneshot::Receiver { - let (result, rx) = oneshot::channel(); - let _ = self.tx2.send(StopCommand { graceful, result }); + let (tx, rx) = oneshot::channel(); + let _ = self.tx2.send(StopCommand { graceful, tx }); rx } } @@ -221,7 +221,7 @@ impl ServerWorker { let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker { rx, rx2, - services: Default::default(), + services: Vec::new(), availability, factories, state: Default::default(), @@ -272,11 +272,15 @@ impl ServerWorker { WorkerHandle::new(idx, tx1, tx2, avail) } - fn restart_service(&mut self, token: Token, idx: usize) { - let factory = &self.factories[idx]; + fn restart_service(&mut self, token: Token, factory_id: usize) { + let factory = &self.factories[factory_id]; trace!("Service {:?} failed, restarting", factory.name(token)); self.services[token.0].status = WorkerServiceStatus::Restarting; - self.state = WorkerState::Restarting(idx, token, factory.create()); + self.state = WorkerState::Restarting(Restart { + factory_id, + token, + fut: factory.create(), + }); } fn shutdown(&mut self, force: bool) { @@ -342,16 +346,24 @@ impl ServerWorker { enum WorkerState { Available, Unavailable, - Restarting( - usize, - Token, - LocalBoxFuture<'static, Result, ()>>, - ), - // Shutdown keep states necessary for server shutdown: - // Sleep for interval check the shutdown progress. - // Instant for the start time of shutdown. - // Sender for send back the shutdown outcome(force/grace) to StopCommand caller. - Shutdown(Pin>, Instant, oneshot::Sender), + Restarting(Restart), + Shutdown(Shutdown), +} + +struct Restart { + factory_id: usize, + token: Token, + fut: LocalBoxFuture<'static, Result, ()>>, +} + +// Shutdown keep states necessary for server shutdown: +// Sleep for interval check the shutdown progress. +// Instant for the start time of shutdown. +// Sender for send back the shutdown outcome(force/grace) to StopCommand caller. +struct Shutdown { + timer: Pin>, + start_from: Instant, + tx: oneshot::Sender, } impl Default for WorkerState { @@ -367,27 +379,29 @@ impl Future for ServerWorker { let this = self.as_mut().get_mut(); // `StopWorker` message handler - if let Poll::Ready(Some(StopCommand { graceful, result })) = + if let Poll::Ready(Some(StopCommand { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx) { this.availability.set(false); let num = num_connections(); if num == 0 { info!("Shutting down worker, 0 connections"); - let _ = result.send(true); + let _ = tx.send(true); return Poll::Ready(()); } else if graceful { info!("Graceful worker shutdown, {} connections", num); this.shutdown(false); - let timer = Box::pin(sleep(Duration::from_secs(1))); - let start_from = Instant::now(); - this.state = WorkerState::Shutdown(timer, start_from, result); + this.state = WorkerState::Shutdown(Shutdown { + timer: Box::pin(sleep(Duration::from_secs(1))), + start_from: Instant::now(), + tx, + }); } else { info!("Force shutdown worker, {} connections", num); this.shutdown(true); - let _ = result.send(false); + let _ = tx.send(false); return Poll::Ready(()); } } @@ -405,11 +419,14 @@ impl Future for ServerWorker { self.poll(cx) } }, - WorkerState::Restarting(idx, token, ref mut fut) => { - let item = ready!(fut.as_mut().poll(cx)).unwrap_or_else(|_| { + WorkerState::Restarting(ref mut restart) => { + let factory_id = restart.factory_id; + let token = restart.token; + + let item = ready!(restart.fut.as_mut().poll(cx)).unwrap_or_else(|_| { panic!( "Can not restart {:?} service", - this.factories[idx].name(token) + this.factories[factory_id].name(token) ) }); @@ -421,7 +438,7 @@ impl Future for ServerWorker { trace!( "Service {:?} has been restarted", - this.factories[idx].name(token) + this.factories[factory_id].name(token) ); this.services[token.0].created(service); @@ -429,29 +446,29 @@ impl Future for ServerWorker { self.poll(cx) } - WorkerState::Shutdown(ref mut timer, ref start_from, _) => { + WorkerState::Shutdown(ref mut shutdown) => { // Wait for 1 second. - ready!(timer.as_mut().poll(cx)); + ready!(shutdown.timer.as_mut().poll(cx)); if num_connections() == 0 { // Graceful shutdown. - if let WorkerState::Shutdown(_, _, sender) = mem::take(&mut this.state) { - let _ = sender.send(true); + if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) { + let _ = shutdown.tx.send(true); } Arbiter::current().stop(); Poll::Ready(()) - } else if start_from.elapsed() >= this.shutdown_timeout { + } else if shutdown.start_from.elapsed() >= this.shutdown_timeout { // Timeout forceful shutdown. - if let WorkerState::Shutdown(_, _, sender) = mem::take(&mut this.state) { - let _ = sender.send(false); + if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) { + let _ = shutdown.tx.send(false); } Arbiter::current().stop(); Poll::Ready(()) } else { // Reset timer and wait for 1 second. let time = Instant::now() + Duration::from_secs(1); - timer.as_mut().reset(time); - timer.as_mut().poll(cx) + shutdown.timer.as_mut().reset(time); + shutdown.timer.as_mut().poll(cx) } } // actively poll stream and handle worker command