From 46fa3d55e3f64d744f80d04c58ec1a081f018212 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Thu, 15 Apr 2021 16:31:58 +0800 Subject: [PATCH] fix signal shutdown --- actix-server/src/accept.rs | 7 +-- actix-server/src/server.rs | 119 +++++++++++++++++-------------------- actix-server/src/worker.rs | 7 ++- 3 files changed, 63 insertions(+), 70 deletions(-) diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index ed027610..820fbcd9 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -227,7 +227,6 @@ impl Accept { } /// Return true to notify `Accept::poll_with` to return. - #[cold] fn handle_waker(&mut self, sockets: &mut Slab) -> bool { // This is a loop because interests for command from previous version was // a loop that would try to drain the command channel. It's yet unknown @@ -460,9 +459,7 @@ impl Accept { fn accept(&mut self, sockets: &mut Slab, token: usize) { loop { - let info = sockets - .get_mut(token) - .expect("ServerSocketInfo is removed from Slab"); + let info = &mut sockets[token]; match info.lst.accept() { Ok(io) => { @@ -492,11 +489,13 @@ impl Accept { } } + #[inline(always)] fn next(&self) -> &WorkerHandleAccept { &self.handles[self.next] } /// Set next worker handle that would accept connection. + #[inline(always)] fn set_next(&mut self) { self.next = (self.next + 1) % self.handles.len(); } diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index 99e4385e..d58938ff 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -101,6 +101,38 @@ impl Server { } impl ServerInner { + fn handle_signal(&mut self, signal: Signal) -> Option> { + // Signals support + // Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system + match signal { + Signal::Int => { + info!("SIGINT received, exiting"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: false, + completion: None, + }) + } + Signal::Term => { + info!("SIGTERM received, stopping"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: true, + completion: None, + }) + } + Signal::Quit => { + info!("SIGQUIT received, exiting"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: false, + completion: None, + }) + } + _ => None, + } + } + fn handle_cmd(&mut self, item: ServerCommand) -> Option> { match item { ServerCommand::Pause(tx) => { @@ -113,37 +145,6 @@ impl ServerInner { let _ = tx.send(()); None } - ServerCommand::Signal(sig) => { - // Signals support - // Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system - match sig { - Signal::Int => { - info!("SIGINT received, exiting"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { - graceful: false, - completion: None, - }) - } - Signal::Term => { - info!("SIGTERM received, stopping"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { - graceful: true, - completion: None, - }) - } - Signal::Quit => { - info!("SIGQUIT received, exiting"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { - graceful: false, - completion: None, - }) - } - _ => None, - } - } ServerCommand::Stop { graceful, completion, @@ -154,39 +155,29 @@ impl ServerInner { self.waker_queue.wake(WakerInterest::Stop); // stop workers - if !self.handles.is_empty() && graceful { - let iter = self - .handles - .iter() - .map(move |worker| worker.stop(graceful)) - .collect::>(); + let stop = self + .handles + .iter() + .map(move |worker| worker.stop(graceful)) + .collect::>(); - // TODO: this async block can return io::Error. - Some(Box::pin(async move { - for handle in iter { + // TODO: this async block can return io::Error. + Some(Box::pin(async move { + if graceful { + for handle in stop { let _ = handle.await; } - if let Some(tx) = completion { - let _ = tx.send(()); - } - if exit { - sleep(Duration::from_millis(300)).await; - System::try_current().as_ref().map(System::stop); - } - })) - } else { - // we need to stop system if server was spawned - // TODO: this async block can return io::Error. - Some(Box::pin(async move { - if exit { - sleep(Duration::from_millis(300)).await; - System::try_current().as_ref().map(System::stop); - } - if let Some(tx) = completion { - let _ = tx.send(()); - } - })) - } + } + + if let Some(tx) = completion { + let _ = tx.send(()); + } + + if exit { + sleep(Duration::from_millis(300)).await; + System::try_current().as_ref().map(System::stop); + } + })) } ServerCommand::WorkerFaulted(idx) => { assert!(self.handles.iter().any(|handle| handle.idx == idx)); @@ -199,9 +190,8 @@ impl ServerInner { .iter() .map(|service| service.clone_factory()) .collect(); - let res = ServerWorker::start(idx, factories, availability, self.worker_config); - match res { + match ServerWorker::start(idx, factories, availability, self.worker_config) { Ok((handle_accept, handle_server)) => { *self .handles @@ -229,7 +219,7 @@ impl Future for Server { // poll signals first. remove it on resolve. if let Some(ref mut signals) = inner.signals { if let Poll::Ready(signal) = Pin::new(signals).poll(cx) { - inner.on_stop_task = inner.handle_cmd(ServerCommand::Signal(signal)); + inner.on_stop_task = inner.handle_signal(signal); inner.signals = None; } } @@ -258,7 +248,6 @@ pub(crate) enum ServerCommand { WorkerFaulted(usize), Pause(oneshot::Sender<()>), Resume(oneshot::Sender<()>), - Signal(Signal), /// Whether to try and shut down gracefully Stop { graceful: bool, diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index ca5c2314..f249c119 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -272,6 +272,7 @@ impl ServerWorker { .enable_all() .max_blocking_threads(config.max_blocking_threads) .build(); + let res = rt.and_then(|rt| { let fut = async { for (idx, factory) in factories.iter().enumerate() { @@ -312,7 +313,11 @@ impl ServerWorker { shutdown_timeout: config.shutdown_timeout, }; - local.block_on(&rt, async { worker.await }); + let handle = local.spawn_local(worker); + + local.block_on(&rt, async { + let _ = handle.await; + }); } Err(e) => f(Some(e)), }