Merge branch 'master' into refactor/actix_server_worker_restart

This commit is contained in:
fakeshadow 2021-04-02 08:44:38 -07:00 committed by GitHub
commit 036900208c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 18 additions and 28 deletions

View File

@ -263,19 +263,16 @@ impl ServerWorker {
} }
fn shutdown(&mut self, force: bool) { fn shutdown(&mut self, force: bool) {
if force { self.services
self.services.iter_mut().for_each(|srv| { .iter_mut()
if srv.status == WorkerServiceStatus::Available { .filter(|srv| srv.status == WorkerServiceStatus::Available)
srv.status = WorkerServiceStatus::Stopped; .for_each(|srv| {
} srv.status = if force {
WorkerServiceStatus::Stopped
} else {
WorkerServiceStatus::Stopping
};
}); });
} else {
self.services.iter_mut().for_each(move |srv| {
if srv.status == WorkerServiceStatus::Available {
srv.status = WorkerServiceStatus::Stopping;
}
});
}
} }
fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result<bool, (Token, usize)> { fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result<bool, (Token, usize)> {
@ -356,18 +353,12 @@ impl Future for ServerWorker {
return Poll::Ready(()); return Poll::Ready(());
} else if graceful { } else if graceful {
self.shutdown(false); self.shutdown(false);
let num = num_connections(); info!("Graceful worker shutdown, {} connections", num);
if num != 0 { self.state = WorkerState::Shutdown(
info!("Graceful worker shutdown, {} connections", num); Box::pin(sleep(Duration::from_secs(1))),
self.state = WorkerState::Shutdown( Box::pin(sleep(self.config.shutdown_timeout)),
Box::pin(sleep(Duration::from_secs(1))), Some(result),
Box::pin(sleep(self.config.shutdown_timeout)), );
Some(result),
);
} else {
let _ = result.send(true);
return Poll::Ready(());
}
} else { } else {
info!("Force shutdown worker, {} connections", num); info!("Force shutdown worker, {} connections", num);
self.shutdown(true); self.shutdown(true);
@ -466,16 +457,15 @@ impl Future for ServerWorker {
} }
} }
match Pin::new(&mut self.rx).poll_recv(cx) { match ready!(Pin::new(&mut self.rx).poll_recv(cx)) {
// handle incoming io stream // handle incoming io stream
Poll::Ready(Some(WorkerCommand(msg))) => { Some(WorkerCommand(msg)) => {
let guard = self.conns.get(); let guard = self.conns.get();
let _ = self.services[msg.token.0] let _ = self.services[msg.token.0]
.service .service
.call((Some(guard), msg.io)); .call((Some(guard), msg.io));
} }
Poll::Pending => return Poll::Pending, None => return Poll::Ready(()),
Poll::Ready(None) => return Poll::Ready(()),
}; };
}, },
} }