From ab4ad69ad22e938f41d13aa7410fe60603eb445a Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Wed, 23 Dec 2020 09:14:31 +0800 Subject: [PATCH] use boxed future for signals --- Cargo.toml | 2 +- actix-server/src/signals.rs | 99 +++++++++++++++---------------------- 2 files changed, 42 insertions(+), 59 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 63941d7d..99cde3e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,6 @@ actix-utils = { path = "actix-utils" } actix-router = { path = "router" } bytestring = { path = "string" } -bytes = { git = "https://github.com/tokio-rs/bytes.git" } +# FIXME: remove these overrides tokio = { git = "https://github.com/tokio-rs/tokio.git" } tokio-util = { git = "https://github.com/tokio-rs/tokio.git" } \ No newline at end of file diff --git a/actix-server/src/signals.rs b/actix-server/src/signals.rs index 7a85b9a8..ed16e86f 100644 --- a/actix-server/src/signals.rs +++ b/actix-server/src/signals.rs @@ -1,8 +1,9 @@ -// use std::future::Future; -// use std::pin::Pin; -// use std::task::{Context, Poll}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; use crate::server::Server; +use crate::LocalBoxFuture; /// Different types of process signals #[allow(dead_code)] @@ -19,11 +20,11 @@ pub(crate) enum Signal { } pub(crate) struct Signals { - // srv: Server, -// #[cfg(not(unix))] -// stream: Pin>>>, -// #[cfg(unix)] -// streams: Vec<(Signal, actix_rt::signal::unix::Signal)>, + srv: Server, + #[cfg(not(unix))] + signals: LocalBoxFuture<'static, std::io::Result<()>>, + #[cfg(unix)] + signals: Vec<(Signal, LocalBoxFuture<'static, ()>)>, } impl Signals { @@ -46,15 +47,15 @@ impl Signals { (unix::SignalKind::quit(), Signal::Quit), ]; + let mut signals = Vec::new(); + for (kind, sig) in sig_map.iter() { match unix::signal(*kind) { Ok(mut stream) => { - let sig = *sig; - let srv = srv.clone(); - actix_rt::spawn(async move { - stream.recv().await; - srv.signal(sig); - }); + let fut = Box::pin(async move { + let _ = stream.recv().await; + }) as _; + signals.push((*sig, fut)); } Err(e) => log::error!( "Can not initialize stream handler for {:?} err: {}", @@ -64,51 +65,33 @@ impl Signals { } } - // for (kind, sig) in sig_map.iter() { - // match unix::signal(*kind) { - // Ok(stream) => streams.push((*sig, stream)), - // Err(e) => log::error!( - // "Can not initialize stream handler for {:?} err: {}", - // sig, - // e - // ), - // } - // } - // - // actix_rt::spawn(Signals { srv, streams }); + actix_rt::spawn(Signals { srv, signals }); } } } -// impl Future for Signals { -// type Output = (); -// -// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { -// #[cfg(not(unix))] -// match Pin::new(&mut self.stream).poll(cx) { -// Poll::Ready(_) => { -// self.srv.signal(Signal::Int); -// Poll::Ready(()) -// } -// Poll::Pending => Poll::Pending, -// } -// #[cfg(unix)] -// { -// use futures_util::stream::Stream; -// -// for idx in 0..self.streams.len() { -// loop { -// match Pin::new(&mut self.streams[idx].1).poll_next(cx) { -// Poll::Ready(None) => return Poll::Ready(()), -// Poll::Pending => break, -// Poll::Ready(Some(_)) => { -// let sig = self.streams[idx].0; -// self.srv.signal(sig); -// } -// } -// } -// } -// Poll::Pending -// } -// } -// } +impl Future for Signals { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + #[cfg(not(unix))] + match Pin::new(&mut self.stream).poll(cx) { + Poll::Ready(_) => { + self.srv.signal(Signal::Int); + Poll::Ready(()) + } + Poll::Pending => Poll::Pending, + } + #[cfg(unix)] + { + for (sig, fut) in self.signals.iter_mut() { + if fut.as_mut().poll(cx).is_ready() { + let sig = *sig; + self.srv.signal(sig); + return Poll::Ready(()); + } + } + Poll::Pending + } + } +}