diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs
index 79f15b16..17535c05 100644
--- a/actix-server/src/worker.rs
+++ b/actix-server/src/worker.rs
@@ -192,10 +192,112 @@ impl WorkerHandleServer {
     }
 }
 
-/// Service worker.
+/// Server worker.
 ///
 /// Worker accepts Socket objects via unbounded channel and starts stream processing.
 pub(crate) struct ServerWorker {
+    worker: Worker,
+    state: WorkerState,
+}
+
+impl ServerWorker {
+    fn new(
+        rx: UnboundedReceiver<Conn>,
+        rx2: UnboundedReceiver<Stop>,
+        counter: WorkerCounter,
+        services: Box<[WorkerService]>,
+        factories: Box<[Box<dyn InternalServiceFactory>]>,
+        shutdown_timeout: Duration,
+    ) -> Self {
+        Self {
+            worker: Worker {
+                rx,
+                rx2,
+                counter,
+                services,
+                factories,
+                shutdown_timeout,
+            },
+            state: WorkerState::default(),
+        }
+    }
+
+    pub(crate) fn start(
+        idx: usize,
+        factories: Vec<Box<dyn InternalServiceFactory>>,
+        waker_queue: WakerQueue,
+        config: ServerWorkerConfig,
+    ) -> (WorkerHandleAccept, WorkerHandleServer) {
+        let (tx1, rx) = unbounded_channel();
+        let (tx2, rx2) = unbounded_channel();
+
+        let counter = Counter::new(config.max_concurrent_connections);
+
+        let counter_clone = counter.clone();
+        // every worker runs in its own arbiter.
+        // use a custom tokio runtime builder to change the settings of the runtime.
+        Arbiter::with_tokio_rt(move || {
+            tokio::runtime::Builder::new_current_thread()
+                .enable_all()
+                .max_blocking_threads(config.max_blocking_threads)
+                .build()
+                .unwrap()
+        })
+        .spawn(async move {
+            let fut = factories
+                .iter()
+                .enumerate()
+                .map(|(idx, factory)| {
+                    let fut = factory.create();
+                    async move { fut.await.map(|(t, s)| (idx, t, s)) }
+                })
+                .collect::<Vec<_>>();
+
+            // a second spawn to run !Send future tasks.
+            spawn(async move {
+                let res = join_all(fut)
+                    .await
+                    .into_iter()
+                    .collect::<Result<Vec<_>, _>>();
+                let services = match res {
+                    Ok(res) => res
+                        .into_iter()
+                        .fold(Vec::new(), |mut services, (factory, token, service)| {
+                            assert_eq!(token, services.len());
+                            services.push(WorkerService {
+                                factory,
+                                service,
+                                status: WorkerServiceStatus::Unavailable,
+                            });
+                            services
+                        })
+                        .into_boxed_slice(),
+                    Err(e) => {
+                        error!("Cannot start worker: {:?}", e);
+                        Arbiter::current().stop();
+                        return;
+                    }
+                };
+
+                let counter = WorkerCounter::new(idx, waker_queue, counter_clone);
+
+                // a third spawn to make sure ServerWorker runs as a non-boxed future.
+                spawn(ServerWorker::new(
+                    rx,
+                    rx2,
+                    counter,
+                    services,
+                    factories.into_boxed_slice(),
+                    config.shutdown_timeout,
+                ));
+            });
+        });
+
+        handle_pair(idx, tx1, tx2, counter)
+    }
+}
+
+struct Worker {
     // UnboundedReceiver<Conn> should always be the first field.
     // It must be dropped as soon as ServerWorker dropping.
     rx: UnboundedReceiver<Conn>,
@@ -203,10 +305,208 @@ pub(crate) struct ServerWorker {
     counter: WorkerCounter,
     services: Box<[WorkerService]>,
     factories: Box<[Box<dyn InternalServiceFactory>]>,
-    state: WorkerState,
    shutdown_timeout: Duration,
 }
 
+impl Worker {
+    /// `Conn` message and worker/service state switch handler.
+    fn poll_running(&mut self, running: &mut Running, cx: &mut Context<'_>) -> Poll<()> {
+        // The loop only exits when the `Conn` channel shuts down or a poll method returns Pending.
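+        //
+        // State transitions driven by this loop:
+        //   Unavailable -> Available   when all services report ready.
+        //   Available   -> Unavailable when a service reports not-ready.
+        //   Available/Unavailable -> Restart when a readiness check errors.
+        //   Restart     -> Unavailable once the replacement service is created.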
+        loop {
+            match *running {
+                Running::Unavailable => match self.poll_ready(cx) {
+                    Ok(true) => {
+                        *running = Running::Available;
+                    }
+                    Ok(false) => return Poll::Pending,
+                    Err((token, idx)) => {
+                        let restart = self.restart_service(token, idx);
+                        *running = Running::Restart(restart);
+                    }
+                },
+                Running::Restart(Restart {
+                    factory_id,
+                    token,
+                    ref mut fut,
+                }) => {
+                    let (token_new, service) =
+                        ready!(fut.as_mut().poll(cx)).unwrap_or_else(|_| {
+                            // A restart failure results in a panicked drop of ServerWorker,
+                            // which prevents a busy loop of poll_running.
+                            panic!(
+                                "Cannot restart {:?} service",
+                                self.factories[factory_id].name(token)
+                            )
+                        });
+
+                    assert_eq!(token, token_new);
+
+                    trace!(
+                        "Service {:?} has been restarted",
+                        self.factories[factory_id].name(token)
+                    );
+
+                    self.services[token].created(service);
+                    *running = Running::Unavailable;
+                }
+                // actively poll the `Conn` channel and handle incoming MioStream.
+                Running::Available => loop {
+                    match self.poll_ready(cx) {
+                        Ok(true) => {}
+                        Ok(false) => {
+                            trace!("Worker is unavailable");
+                            *running = Running::Unavailable;
+                        }
+                        Err((token, idx)) => {
+                            let restart = self.restart_service(token, idx);
+                            *running = Running::Restart(restart);
+                        }
+                    }
+
+                    match ready!(Pin::new(&mut self.rx).poll_recv(cx)) {
+                        Some(msg) => {
+                            let guard = self.counter.guard();
+                            let _ = self.services[msg.token].service.call((guard, msg.io));
+                        }
+                        None => return Poll::Ready(()),
+                    };
+                },
+            }
+        }
+    }
+
+    /// `Stop` message handler.
+    ///
+    /// Returns Ready when the worker should shut down.
+    fn poll_stop(&mut self, cx: &mut Context<'_>) -> Poll<Option<DelayShutdown>> {
+        match ready!(Pin::new(&mut self.rx2).poll_recv(cx)) {
+            Some(Stop { graceful, tx }) => {
+                self.rx2.close();
+
+                let num = self.counter.total();
+                if num == 0 {
+                    info!("Shutting down worker, 0 connections");
+                    let _ = tx.send(true);
+                    Poll::Ready(None)
+                } else if graceful {
+                    info!("Graceful worker shutdown, {} connections", num);
+                    self.shutdown(false);
+
+                    let shutdown = DelayShutdown {
+                        timer: Box::pin(sleep(Duration::from_secs(1))),
+                        start_from: Instant::now(),
+                        tx,
+                    };
+
+                    Poll::Ready(Some(shutdown))
+                } else {
+                    info!("Force shutdown worker, {} connections", num);
+                    self.shutdown(true);
+
+                    let _ = tx.send(false);
+                    Poll::Ready(None)
+                }
+            }
+            None => Poll::Pending,
+        }
+    }
+
+    /// Check readiness of services.
+    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Result<bool, (usize, usize)> {
+        let mut ready = true;
+        for (idx, srv) in self.services.iter_mut().enumerate() {
+            if srv.status == WorkerServiceStatus::Available
+                || srv.status == WorkerServiceStatus::Unavailable
+            {
+                match srv.service.poll_ready(cx) {
+                    Poll::Ready(Ok(_)) => {
+                        if srv.status == WorkerServiceStatus::Unavailable {
+                            trace!(
+                                "Service {:?} is available",
+                                self.factories[srv.factory].name(idx)
+                            );
+                            srv.status = WorkerServiceStatus::Available;
+                        }
+                    }
+                    Poll::Pending => {
+                        ready = false;
+
+                        if srv.status == WorkerServiceStatus::Available {
+                            trace!(
+                                "Service {:?} is unavailable",
+                                self.factories[srv.factory].name(idx)
+                            );
+                            srv.status = WorkerServiceStatus::Unavailable;
+                        }
+                    }
+                    Poll::Ready(Err(_)) => {
+                        error!(
+                            "Service {:?} readiness check returned error, restarting",
+                            self.factories[srv.factory].name(idx)
+                        );
+                        srv.status = WorkerServiceStatus::Failed;
+                        return Err((idx, srv.factory));
+                    }
+                }
+            }
+        }
+
+        Ok(ready)
+    }
+
+    /// Delay shutdown and drain all unhandled `Conn` messages.
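+    ///
+    /// Returns `Ready(true)` once all connections are handled in time (graceful
+    /// shutdown) and `Ready(false)` when the shutdown timeout is reached first
+    /// (forced shutdown).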
+    fn poll_shutdown(
+        &mut self,
+        delay: &mut DelayShutdown,
+        running: &mut Running,
+        cx: &mut Context<'_>,
+    ) -> Poll<bool> {
+        if self.counter.total() == 0 {
+            // Graceful shutdown.
+            Poll::Ready(true)
+        } else if delay.start_from.elapsed() >= self.shutdown_timeout {
+            // Timeout reached; forceful shutdown.
+            Poll::Ready(false)
+        } else {
+            // Poll the Running state and try to drain all `Conn` from the channel.
+            let _ = self.poll_running(running, cx);
+
+            // Wait for 1 second.
+            ready!(delay.timer.as_mut().poll(cx));
+
+            // Reset the timer and try again.
+            let time = Instant::now() + Duration::from_secs(1);
+            delay.timer.as_mut().reset(time);
+
+            delay.timer.as_mut().poll(cx).map(|_| false)
+        }
+    }
+
+    fn restart_service(&mut self, idx: usize, factory_id: usize) -> Restart {
+        let factory = &self.factories[factory_id];
+        trace!("Service {:?} failed, restarting", factory.name(idx));
+        self.services[idx].status = WorkerServiceStatus::Restarting;
+        Restart {
+            factory_id,
+            token: idx,
+            fut: factory.create(),
+        }
+    }
+
+    fn shutdown(&mut self, force: bool) {
+        self.services
+            .iter_mut()
+            .filter(|srv| srv.status == WorkerServiceStatus::Available)
+            .for_each(|srv| {
+                srv.status = if force {
+                    WorkerServiceStatus::Stopped
+                } else {
+                    WorkerServiceStatus::Stopping
+                };
+            });
+    }
+}
+
 struct WorkerService {
     factory: usize,
     status: WorkerServiceStatus,
@@ -264,152 +564,21 @@ impl ServerWorkerConfig {
     }
 }
 
-impl ServerWorker {
-    pub(crate) fn start(
-        idx: usize,
-        factories: Vec<Box<dyn InternalServiceFactory>>,
-        waker_queue: WakerQueue,
-        config: ServerWorkerConfig,
-    ) -> (WorkerHandleAccept, WorkerHandleServer) {
-        let (tx1, rx) = unbounded_channel();
-        let (tx2, rx2) = unbounded_channel();
+enum WorkerState {
+    Running(Running),
+    DelayShutdown(DelayShutdown, Running),
+}
 
-        let counter = Counter::new(config.max_concurrent_connections);
-
-        let counter_clone = counter.clone();
-        // every worker runs in it's own arbiter.
-        // use a custom tokio runtime builder to change the settings of runtime.
-        Arbiter::with_tokio_rt(move || {
-            tokio::runtime::Builder::new_current_thread()
-                .enable_all()
-                .max_blocking_threads(config.max_blocking_threads)
-                .build()
-                .unwrap()
-        })
-        .spawn(async move {
-            let fut = factories
-                .iter()
-                .enumerate()
-                .map(|(idx, factory)| {
-                    let fut = factory.create();
-                    async move { fut.await.map(|(t, s)| (idx, t, s)) }
-                })
-                .collect::<Vec<_>>();
-
-            // a second spawn to run !Send future tasks.
-            spawn(async move {
-                let res = join_all(fut)
-                    .await
-                    .into_iter()
-                    .collect::<Result<Vec<_>, _>>();
-                let services = match res {
-                    Ok(res) => res
-                        .into_iter()
-                        .fold(Vec::new(), |mut services, (factory, token, service)| {
-                            assert_eq!(token, services.len());
-                            services.push(WorkerService {
-                                factory,
-                                service,
-                                status: WorkerServiceStatus::Unavailable,
-                            });
-                            services
-                        })
-                        .into_boxed_slice(),
-                    Err(e) => {
-                        error!("Can not start worker: {:?}", e);
-                        Arbiter::current().stop();
-                        return;
-                    }
-                };
-
-                // a third spawn to make sure ServerWorker runs as non boxed future.
-                spawn(ServerWorker {
-                    rx,
-                    rx2,
-                    services,
-                    counter: WorkerCounter::new(idx, waker_queue, counter_clone),
-                    factories: factories.into_boxed_slice(),
-                    state: Default::default(),
-                    shutdown_timeout: config.shutdown_timeout,
-                });
-            });
-        });
-
-        handle_pair(idx, tx1, tx2, counter)
-    }
-
-    fn restart_service(&mut self, idx: usize, factory_id: usize) {
-        let factory = &self.factories[factory_id];
-        trace!("Service {:?} failed, restarting", factory.name(idx));
-        self.services[idx].status = WorkerServiceStatus::Restarting;
-        self.state = WorkerState::Restarting(Restart {
-            factory_id,
-            token: idx,
-            fut: factory.create(),
-        });
-    }
-
-    fn shutdown(&mut self, force: bool) {
-        self.services
-            .iter_mut()
-            .filter(|srv| srv.status == WorkerServiceStatus::Available)
-            .for_each(|srv| {
-                srv.status = if force {
-                    WorkerServiceStatus::Stopped
-                } else {
-                    WorkerServiceStatus::Stopping
-                };
-            });
-    }
-
-    fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result<bool, (usize, usize)> {
-        let mut ready = true;
-        for (idx, srv) in self.services.iter_mut().enumerate() {
-            if srv.status == WorkerServiceStatus::Available
-                || srv.status == WorkerServiceStatus::Unavailable
-            {
-                match srv.service.poll_ready(cx) {
-                    Poll::Ready(Ok(_)) => {
-                        if srv.status == WorkerServiceStatus::Unavailable {
-                            trace!(
-                                "Service {:?} is available",
-                                self.factories[srv.factory].name(idx)
-                            );
-                            srv.status = WorkerServiceStatus::Available;
-                        }
-                    }
-                    Poll::Pending => {
-                        ready = false;
-
-                        if srv.status == WorkerServiceStatus::Available {
-                            trace!(
-                                "Service {:?} is unavailable",
-                                self.factories[srv.factory].name(idx)
-                            );
-                            srv.status = WorkerServiceStatus::Unavailable;
-                        }
-                    }
-                    Poll::Ready(Err(_)) => {
-                        error!(
-                            "Service {:?} readiness check returned error, restarting",
-                            self.factories[srv.factory].name(idx)
-                        );
-                        srv.status = WorkerServiceStatus::Failed;
-                        return Err((idx, srv.factory));
-                    }
-                }
-            }
-        }
-
-        Ok(ready)
+impl Default for WorkerState {
+    fn default() -> Self {
+        Self::Running(Running::Unavailable)
     }
 }
 
-enum WorkerState {
+enum Running {
     Available,
     Unavailable,
-    Restarting(Restart),
-    Shutdown(Shutdown),
+    Restart(Restart),
 }
 
 struct Restart {
@@ -418,22 +587,16 @@
     fut: LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>,
 }
 
-// Shutdown keep states necessary for server shutdown:
+// Keeps the state necessary for a delayed server shutdown:
 // Sleep for interval check the shutdown progress.
 // Instant for the start time of shutdown.
-// Sender for send back the shutdown outcome(force/grace) to StopCommand caller.
-struct Shutdown {
+// Sender for sending the shutdown outcome (force/graceful) back to the `Stop` caller.
+struct DelayShutdown {
     timer: Pin<Box<Sleep>>,
     start_from: Instant,
     tx: oneshot::Sender<bool>,
 }
 
-impl Default for WorkerState {
-    fn default() -> Self {
-        Self::Unavailable
-    }
-}
-
 impl Drop for ServerWorker {
     fn drop(&mut self) {
         // Stop the Arbiter ServerWorker runs on on drop.
@@ -447,115 +610,36 @@ impl Future for ServerWorker {
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         let this = self.as_mut().get_mut();
 
-        // `StopWorker` message handler
-        if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx)
-        {
-            let num = this.counter.total();
-            if num == 0 {
-                info!("Shutting down worker, 0 connections");
-                let _ = tx.send(true);
-                return Poll::Ready(());
-            } else if graceful {
-                info!("Graceful worker shutdown, {} connections", num);
-                this.shutdown(false);
-
-                this.state = WorkerState::Shutdown(Shutdown {
-                    timer: Box::pin(sleep(Duration::from_secs(1))),
-                    start_from: Instant::now(),
-                    tx,
-                });
-            } else {
-                info!("Force shutdown worker, {} connections", num);
-                this.shutdown(true);
-
-                let _ = tx.send(false);
-                return Poll::Ready(());
+        // poll the `Stop` message first.
+        match this.worker.poll_stop(cx) {
+            Poll::Pending => {}
+            Poll::Ready(None) => return Poll::Ready(()),
+            Poll::Ready(Some(delay)) => {
+                // Take the Running state and pass it to DelayShutdown.
+                // During shutdown there can be unhandled `Conn` messages left in the
+                // channel. They should be drained, and the worker keeps handling them
+                // until the shutdown timeout is reached.
+                this.state = match mem::take(&mut this.state) {
+                    WorkerState::Running(running) => WorkerState::DelayShutdown(delay, running),
+                    _ => unreachable!(
+                        "Duplicate worker::Stop message sent to one worker::ServerWorker."
+                    ),
+                };
             }
         }
 
         match this.state {
-            WorkerState::Unavailable => match this.check_readiness(cx) {
-                Ok(true) => {
-                    this.state = WorkerState::Available;
-                    self.poll(cx)
+            WorkerState::Running(ref mut running) => this.worker.poll_running(running, cx),
+            WorkerState::DelayShutdown(ref mut delay, ref mut running) => {
+                let is_graceful = ready!(this.worker.poll_shutdown(delay, running, cx));
+
+                // Report the shutdown outcome back to the caller.
+                if let WorkerState::DelayShutdown(delay, _) = mem::take(&mut this.state) {
+                    let _ = delay.tx.send(is_graceful);
                 }
-                Ok(false) => Poll::Pending,
-                Err((token, idx)) => {
-                    this.restart_service(token, idx);
-                    self.poll(cx)
-                }
-            },
-            WorkerState::Restarting(ref mut restart) => {
-                let factory_id = restart.factory_id;
-                let token = restart.token;
-                let (token_new, service) = ready!(restart.fut.as_mut().poll(cx))
-                    .unwrap_or_else(|_| {
-                        panic!(
-                            "Can not restart {:?} service",
-                            this.factories[factory_id].name(token)
-                        )
-                    });
-
-                assert_eq!(token, token_new);
-
-                trace!(
-                    "Service {:?} has been restarted",
-                    this.factories[factory_id].name(token)
-                );
-
-                this.services[token].created(service);
-                this.state = WorkerState::Unavailable;
-
-                self.poll(cx)
+                Poll::Ready(())
             }
-            WorkerState::Shutdown(ref mut shutdown) => {
-                // Wait for 1 second.
-                ready!(shutdown.timer.as_mut().poll(cx));
-
-                if this.counter.total() == 0 {
-                    // Graceful shutdown.
-                    if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) {
-                        let _ = shutdown.tx.send(true);
-                    }
-                    Poll::Ready(())
-                } else if shutdown.start_from.elapsed() >= this.shutdown_timeout {
-                    // Timeout forceful shutdown.
-                    if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) {
-                        let _ = shutdown.tx.send(false);
-                    }
-                    Poll::Ready(())
-                } else {
-                    // Reset timer and wait for 1 second.
- let time = Instant::now() + Duration::from_secs(1); - shutdown.timer.as_mut().reset(time); - shutdown.timer.as_mut().poll(cx) - } - } - // actively poll stream and handle worker command - WorkerState::Available => loop { - match this.check_readiness(cx) { - Ok(true) => {} - Ok(false) => { - trace!("Worker is unavailable"); - this.state = WorkerState::Unavailable; - return self.poll(cx); - } - Err((token, idx)) => { - this.restart_service(token, idx); - return self.poll(cx); - } - } - - // handle incoming io stream - match ready!(Pin::new(&mut this.rx).poll_recv(cx)) { - Some(msg) => { - let guard = this.counter.guard(); - let _ = this.services[msg.token].service.call((guard, msg.io)); - } - None => return Poll::Ready(()), - }; - }, } } } diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 83357be1..5d28998f 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -28,8 +28,7 @@ fn test_bind() { .workers(1) .disable_signals() .shutdown_timeout(3600) - .bind("test", addr, move || fn_service(|_| ok::<_, ()>(()))) - .unwrap() + .bind("test", addr, move || fn_service(|_| ok::<_, ()>(())))? .run(); let _ = tx.send(srv.clone()); @@ -51,15 +50,14 @@ fn test_listen() { let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let lst = net::TcpListener::bind(addr).unwrap(); - actix_rt::System::new().block_on(async { + let lst = net::TcpListener::bind(addr).unwrap(); + let srv = Server::build() .disable_signals() .shutdown_timeout(3600) .workers(1) - .listen("test", lst, move || fn_service(|_| ok::<_, ()>(()))) - .unwrap() + .listen("test", lst, move || fn_service(|_| ok::<_, ()>(())))? .run(); let _ = tx.send(srv.clone());