localwork

This commit is contained in:
fakeshadow 2020-11-21 20:36:58 +08:00
parent f8189462c6
commit fb05714f08
4 changed files with 65 additions and 78 deletions

View File

@ -9,9 +9,8 @@ use actix_rt::time::{sleep_until, Instant};
use actix_rt::{spawn, System}; use actix_rt::{spawn, System};
use futures_channel::mpsc::{unbounded, UnboundedReceiver}; use futures_channel::mpsc::{unbounded, UnboundedReceiver};
use futures_channel::oneshot; use futures_channel::oneshot;
use futures_util::future::ready; use futures_util::future::join_all;
use futures_util::stream::FuturesUnordered; use futures_util::stream::Stream;
use futures_util::{ready, stream::Stream, FutureExt, StreamExt};
use log::{error, info}; use log::{error, info};
use crate::accept::AcceptLoop; use crate::accept::AcceptLoop;
@ -360,31 +359,28 @@ impl ServerBuilder {
// stop workers // stop workers
if !self.handles.is_empty() && graceful { if !self.handles.is_empty() && graceful {
spawn( let iter = self
self.handles .handles
.iter() .iter()
.map(move |worker| worker.1.stop(graceful)) .map(move |worker| worker.1.stop(graceful));
.collect::<FuturesUnordered<_>>()
.collect::<Vec<_>>() let fut = join_all(iter);
.then(move |_| {
if let Some(tx) = completion { spawn(async move {
let _ = tx.send(()); let _ = fut.await;
} if let Some(tx) = completion {
for tx in notify { let _ = tx.send(());
let _ = tx.send(()); }
} for tx in notify {
if exit { let _ = tx.send(());
spawn(async { }
sleep_until( if exit {
Instant::now() + Duration::from_millis(300), spawn(async {
) sleep_until(Instant::now() + Duration::from_millis(300)).await;
.await; System::current().stop();
System::current().stop(); });
}); }
} })
ready(())
}),
)
} else { } else {
// we need to stop system if server was spawned // we need to stop system if server was spawned
if self.exit { if self.exit {
@ -439,11 +435,9 @@ impl Future for ServerBuilder {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop { loop {
match ready!(Pin::new(&mut self.cmd).poll_next(cx)) { match Pin::new(&mut self.cmd).poll_next(cx) {
Some(it) => self.as_mut().get_mut().handle_cmd(it), Poll::Ready(Some(it)) => self.as_mut().get_mut().handle_cmd(it),
None => { _ => return Poll::Pending,
return Poll::Pending;
}
} }
} }
} }

View File

@ -3,7 +3,6 @@ use std::net::SocketAddr;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
use actix_rt::spawn;
use actix_service::{self as actix, Service, ServiceFactory as ActixServiceFactory}; use actix_service::{self as actix, Service, ServiceFactory as ActixServiceFactory};
use actix_utils::counter::CounterGuard; use actix_utils::counter::CounterGuard;
use futures_util::future::{ready, Ready}; use futures_util::future::{ready, Ready};
@ -75,23 +74,20 @@ where
} }
fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future { fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
match req { if let ServerMessage::Connect(stream) = req {
ServerMessage::Connect(stream) => match FromStream::from_mio(stream) { match FromStream::from_mio(stream) {
Ok(stream) => { Ok(stream) => {
let f = self.service.call(stream); let f = self.service.call(stream);
spawn(async move { actix_rt::spawn(async move {
let _ = f.await; let _ = f.await;
drop(guard); drop(guard);
}); });
ready(Ok(()))
} }
Err(e) => { Err(e) => error!("Can not convert to an async tcp stream: {}", e),
error!("Can not convert to an async tcp stream: {}", e); };
ready(Err(()))
}
},
_ => ready(Ok(())),
} }
ready(Ok(()))
} }
} }
@ -149,7 +145,7 @@ where
Box::pin(async move { Box::pin(async move {
match fut.await { match fut.await {
Ok(inner) => { Ok(inner) => {
let service = Box::new(StreamService::new(inner)) as BoxedServerService; let service = Box::new(StreamService::new(inner)) as _;
Ok(vec![(token, service)]) Ok(vec![(token, service)])
} }
Err(_) => Err(()), Err(_) => Err(()),

View File

@ -3,8 +3,6 @@ use std::io;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use futures_util::future::lazy;
use crate::server::Server; use crate::server::Server;
/// Different types of process signals /// Different types of process signals
@ -31,41 +29,39 @@ pub(crate) struct Signals {
impl Signals { impl Signals {
pub(crate) fn start(srv: Server) -> io::Result<()> { pub(crate) fn start(srv: Server) -> io::Result<()> {
actix_rt::spawn(lazy(|_| { #[cfg(not(unix))]
#[cfg(not(unix))] {
{ actix_rt::spawn(Signals {
actix_rt::spawn(Signals { srv,
srv, stream: Box::pin(actix_rt::signal::ctrl_c()),
stream: Box::pin(actix_rt::signal::ctrl_c()), });
}); }
} #[cfg(unix)]
#[cfg(unix)] {
{ use actix_rt::signal::unix;
use actix_rt::signal::unix;
let mut streams = Vec::new(); let mut streams = Vec::new();
let sig_map = [ let sig_map = [
(unix::SignalKind::interrupt(), Signal::Int), (unix::SignalKind::interrupt(), Signal::Int),
(unix::SignalKind::hangup(), Signal::Hup), (unix::SignalKind::hangup(), Signal::Hup),
(unix::SignalKind::terminate(), Signal::Term), (unix::SignalKind::terminate(), Signal::Term),
(unix::SignalKind::quit(), Signal::Quit), (unix::SignalKind::quit(), Signal::Quit),
]; ];
for (kind, sig) in sig_map.iter() { for (kind, sig) in sig_map.iter() {
match unix::signal(*kind) { match unix::signal(*kind) {
Ok(stream) => streams.push((*sig, stream)), Ok(stream) => streams.push((*sig, stream)),
Err(e) => log::error!( Err(e) => log::error!(
"Can not initialize stream handler for {:?} err: {}", "Can not initialize stream handler for {:?} err: {}",
sig, sig,
e e
), ),
}
} }
actix_rt::spawn(Signals { srv, streams })
} }
}));
actix_rt::spawn(Signals { srv, streams })
}
Ok(()) Ok(())
} }

View File

@ -11,7 +11,8 @@ use actix_utils::counter::Counter;
use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures_channel::oneshot; use futures_channel::oneshot;
use futures_util::future::join_all; use futures_util::future::join_all;
use futures_util::{stream::Stream, TryFutureExt}; use futures_util::stream::Stream;
use futures_util::TryFutureExt;
use log::{error, info, trace}; use log::{error, info, trace};
use crate::service::{BoxedServerService, InternalServiceFactory, ServerMessage}; use crate::service::{BoxedServerService, InternalServiceFactory, ServerMessage};