fix signal shutdown

This commit is contained in:
fakeshadow 2021-04-15 16:31:58 +08:00
parent c307b11b08
commit 46fa3d55e3
3 changed files with 63 additions and 70 deletions

View File

@ -227,7 +227,6 @@ impl Accept {
} }
/// Return true to notify `Accept::poll_with` to return. /// Return true to notify `Accept::poll_with` to return.
#[cold]
fn handle_waker(&mut self, sockets: &mut Slab<ServerSocketInfo>) -> bool { fn handle_waker(&mut self, sockets: &mut Slab<ServerSocketInfo>) -> bool {
// This is a loop because interests for command from previous version was // This is a loop because interests for command from previous version was
// a loop that would try to drain the command channel. It's yet unknown // a loop that would try to drain the command channel. It's yet unknown
@ -460,9 +459,7 @@ impl Accept {
fn accept(&mut self, sockets: &mut Slab<ServerSocketInfo>, token: usize) { fn accept(&mut self, sockets: &mut Slab<ServerSocketInfo>, token: usize) {
loop { loop {
let info = sockets let info = &mut sockets[token];
.get_mut(token)
.expect("ServerSocketInfo is removed from Slab");
match info.lst.accept() { match info.lst.accept() {
Ok(io) => { Ok(io) => {
@ -492,11 +489,13 @@ impl Accept {
} }
} }
#[inline(always)]
fn next(&self) -> &WorkerHandleAccept { fn next(&self) -> &WorkerHandleAccept {
&self.handles[self.next] &self.handles[self.next]
} }
/// Set next worker handle that would accept connection. /// Set next worker handle that would accept connection.
#[inline(always)]
fn set_next(&mut self) { fn set_next(&mut self) {
self.next = (self.next + 1) % self.handles.len(); self.next = (self.next + 1) % self.handles.len();
} }

View File

@ -101,6 +101,38 @@ impl Server {
} }
impl ServerInner { impl ServerInner {
fn handle_signal(&mut self, signal: Signal) -> Option<BoxFuture<'static, ()>> {
// Signals support
// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system
match signal {
Signal::Int => {
info!("SIGINT received, exiting");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: false,
completion: None,
})
}
Signal::Term => {
info!("SIGTERM received, stopping");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: true,
completion: None,
})
}
Signal::Quit => {
info!("SIGQUIT received, exiting");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: false,
completion: None,
})
}
_ => None,
}
}
fn handle_cmd(&mut self, item: ServerCommand) -> Option<BoxFuture<'static, ()>> { fn handle_cmd(&mut self, item: ServerCommand) -> Option<BoxFuture<'static, ()>> {
match item { match item {
ServerCommand::Pause(tx) => { ServerCommand::Pause(tx) => {
@ -113,37 +145,6 @@ impl ServerInner {
let _ = tx.send(()); let _ = tx.send(());
None None
} }
ServerCommand::Signal(sig) => {
// Signals support
// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system
match sig {
Signal::Int => {
info!("SIGINT received, exiting");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: false,
completion: None,
})
}
Signal::Term => {
info!("SIGTERM received, stopping");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: true,
completion: None,
})
}
Signal::Quit => {
info!("SIGQUIT received, exiting");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: false,
completion: None,
})
}
_ => None,
}
}
ServerCommand::Stop { ServerCommand::Stop {
graceful, graceful,
completion, completion,
@ -154,39 +155,29 @@ impl ServerInner {
self.waker_queue.wake(WakerInterest::Stop); self.waker_queue.wake(WakerInterest::Stop);
// stop workers // stop workers
if !self.handles.is_empty() && graceful { let stop = self
let iter = self .handles
.handles .iter()
.iter() .map(move |worker| worker.stop(graceful))
.map(move |worker| worker.stop(graceful)) .collect::<Vec<_>>();
.collect::<Vec<_>>();
// TODO: this async block can return io::Error. // TODO: this async block can return io::Error.
Some(Box::pin(async move { Some(Box::pin(async move {
for handle in iter { if graceful {
for handle in stop {
let _ = handle.await; let _ = handle.await;
} }
if let Some(tx) = completion { }
let _ = tx.send(());
} if let Some(tx) = completion {
if exit { let _ = tx.send(());
sleep(Duration::from_millis(300)).await; }
System::try_current().as_ref().map(System::stop);
} if exit {
})) sleep(Duration::from_millis(300)).await;
} else { System::try_current().as_ref().map(System::stop);
// we need to stop system if server was spawned }
// TODO: this async block can return io::Error. }))
Some(Box::pin(async move {
if exit {
sleep(Duration::from_millis(300)).await;
System::try_current().as_ref().map(System::stop);
}
if let Some(tx) = completion {
let _ = tx.send(());
}
}))
}
} }
ServerCommand::WorkerFaulted(idx) => { ServerCommand::WorkerFaulted(idx) => {
assert!(self.handles.iter().any(|handle| handle.idx == idx)); assert!(self.handles.iter().any(|handle| handle.idx == idx));
@ -199,9 +190,8 @@ impl ServerInner {
.iter() .iter()
.map(|service| service.clone_factory()) .map(|service| service.clone_factory())
.collect(); .collect();
let res = ServerWorker::start(idx, factories, availability, self.worker_config);
match res { match ServerWorker::start(idx, factories, availability, self.worker_config) {
Ok((handle_accept, handle_server)) => { Ok((handle_accept, handle_server)) => {
*self *self
.handles .handles
@ -229,7 +219,7 @@ impl Future for Server {
// poll signals first. remove it on resolve. // poll signals first. remove it on resolve.
if let Some(ref mut signals) = inner.signals { if let Some(ref mut signals) = inner.signals {
if let Poll::Ready(signal) = Pin::new(signals).poll(cx) { if let Poll::Ready(signal) = Pin::new(signals).poll(cx) {
inner.on_stop_task = inner.handle_cmd(ServerCommand::Signal(signal)); inner.on_stop_task = inner.handle_signal(signal);
inner.signals = None; inner.signals = None;
} }
} }
@ -258,7 +248,6 @@ pub(crate) enum ServerCommand {
WorkerFaulted(usize), WorkerFaulted(usize),
Pause(oneshot::Sender<()>), Pause(oneshot::Sender<()>),
Resume(oneshot::Sender<()>), Resume(oneshot::Sender<()>),
Signal(Signal),
/// Whether to try and shut down gracefully /// Whether to try and shut down gracefully
Stop { Stop {
graceful: bool, graceful: bool,

View File

@ -272,6 +272,7 @@ impl ServerWorker {
.enable_all() .enable_all()
.max_blocking_threads(config.max_blocking_threads) .max_blocking_threads(config.max_blocking_threads)
.build(); .build();
let res = rt.and_then(|rt| { let res = rt.and_then(|rt| {
let fut = async { let fut = async {
for (idx, factory) in factories.iter().enumerate() { for (idx, factory) in factories.iter().enumerate() {
@ -312,7 +313,11 @@ impl ServerWorker {
shutdown_timeout: config.shutdown_timeout, shutdown_timeout: config.shutdown_timeout,
}; };
local.block_on(&rt, async { worker.await }); let handle = local.spawn_local(worker);
local.block_on(&rt, async {
let _ = handle.await;
});
} }
Err(e) => f(Some(e)), Err(e) => f(Some(e)),
} }