use boxed future for signals

This commit is contained in:
fakeshadow 2020-12-23 09:14:31 +08:00
parent aa8d33a22b
commit ab4ad69ad2
2 changed files with 42 additions and 59 deletions

View File

@ -30,6 +30,6 @@ actix-utils = { path = "actix-utils" }
actix-router = { path = "router" }
bytestring = { path = "string" }
bytes = { git = "https://github.com/tokio-rs/bytes.git" }
# FIXME: remove these overrides
tokio = { git = "https://github.com/tokio-rs/tokio.git" }
tokio-util = { git = "https://github.com/tokio-rs/tokio.git" }

View File

@ -1,8 +1,9 @@
// use std::future::Future;
// use std::pin::Pin;
// use std::task::{Context, Poll};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::server::Server;
use crate::LocalBoxFuture;
/// Different types of process signals
#[allow(dead_code)]
@ -19,11 +20,11 @@ pub(crate) enum Signal {
}
pub(crate) struct Signals {
// srv: Server,
// #[cfg(not(unix))]
// stream: Pin<Box<dyn Future<Output = std::io::Result<()>>>>,
// #[cfg(unix)]
// streams: Vec<(Signal, actix_rt::signal::unix::Signal)>,
srv: Server,
#[cfg(not(unix))]
signals: LocalBoxFuture<'static, std::io::Result<()>>,
#[cfg(unix)]
signals: Vec<(Signal, LocalBoxFuture<'static, ()>)>,
}
impl Signals {
@ -46,15 +47,15 @@ impl Signals {
(unix::SignalKind::quit(), Signal::Quit),
];
let mut signals = Vec::new();
for (kind, sig) in sig_map.iter() {
match unix::signal(*kind) {
Ok(mut stream) => {
let sig = *sig;
let srv = srv.clone();
actix_rt::spawn(async move {
stream.recv().await;
srv.signal(sig);
});
let fut = Box::pin(async move {
let _ = stream.recv().await;
}) as _;
signals.push((*sig, fut));
}
Err(e) => log::error!(
"Can not initialize stream handler for {:?} err: {}",
@ -64,51 +65,33 @@ impl Signals {
}
}
// for (kind, sig) in sig_map.iter() {
// match unix::signal(*kind) {
// Ok(stream) => streams.push((*sig, stream)),
// Err(e) => log::error!(
// "Can not initialize stream handler for {:?} err: {}",
// sig,
// e
// ),
// }
// }
//
// actix_rt::spawn(Signals { srv, streams });
actix_rt::spawn(Signals { srv, signals });
}
}
}
// impl Future for Signals {
// type Output = ();
//
// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// #[cfg(not(unix))]
// match Pin::new(&mut self.stream).poll(cx) {
// Poll::Ready(_) => {
// self.srv.signal(Signal::Int);
// Poll::Ready(())
// }
// Poll::Pending => Poll::Pending,
// }
// #[cfg(unix)]
// {
// use futures_util::stream::Stream;
//
// for idx in 0..self.streams.len() {
// loop {
// match Pin::new(&mut self.streams[idx].1).poll_next(cx) {
// Poll::Ready(None) => return Poll::Ready(()),
// Poll::Pending => break,
// Poll::Ready(Some(_)) => {
// let sig = self.streams[idx].0;
// self.srv.signal(sig);
// }
// }
// }
// }
// Poll::Pending
// }
// }
// }
impl Future for Signals {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
#[cfg(not(unix))]
match Pin::new(&mut self.stream).poll(cx) {
Poll::Ready(_) => {
self.srv.signal(Signal::Int);
Poll::Ready(())
}
Poll::Pending => Poll::Pending,
}
#[cfg(unix)]
{
for (sig, fut) in self.signals.iter_mut() {
if fut.as_mut().poll(cx).is_ready() {
let sig = *sig;
self.srv.signal(sig);
return Poll::Ready(());
}
}
Poll::Pending
}
}
}