From fb05714f08cbcecb602a9cf7dcab59035b5e4532 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sat, 21 Nov 2020 20:36:58 +0800 Subject: [PATCH] localwork --- actix-server/src/builder.rs | 60 +++++++++++++++++-------------------- actix-server/src/service.rs | 20 +++++-------- actix-server/src/signals.rs | 60 +++++++++++++++++-------------------- actix-server/src/worker.rs | 3 +- 4 files changed, 65 insertions(+), 78 deletions(-) diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 2c35864c..e2a38d92 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -9,9 +9,8 @@ use actix_rt::time::{sleep_until, Instant}; use actix_rt::{spawn, System}; use futures_channel::mpsc::{unbounded, UnboundedReceiver}; use futures_channel::oneshot; -use futures_util::future::ready; -use futures_util::stream::FuturesUnordered; -use futures_util::{ready, stream::Stream, FutureExt, StreamExt}; +use futures_util::future::join_all; +use futures_util::stream::Stream; use log::{error, info}; use crate::accept::AcceptLoop; @@ -360,31 +359,28 @@ impl ServerBuilder { // stop workers if !self.handles.is_empty() && graceful { - spawn( - self.handles - .iter() - .map(move |worker| worker.1.stop(graceful)) - .collect::>() - .collect::>() - .then(move |_| { - if let Some(tx) = completion { - let _ = tx.send(()); - } - for tx in notify { - let _ = tx.send(()); - } - if exit { - spawn(async { - sleep_until( - Instant::now() + Duration::from_millis(300), - ) - .await; - System::current().stop(); - }); - } - ready(()) - }), - ) + let iter = self + .handles + .iter() + .map(move |worker| worker.1.stop(graceful)); + + let fut = join_all(iter); + + spawn(async move { + let _ = fut.await; + if let Some(tx) = completion { + let _ = tx.send(()); + } + for tx in notify { + let _ = tx.send(()); + } + if exit { + spawn(async { + sleep_until(Instant::now() + Duration::from_millis(300)).await; + System::current().stop(); + }); + } + }) } else { // we need to stop system if server was spawned if self.exit { @@ -439,11 +435,9 @@ impl Future for ServerBuilder { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - match ready!(Pin::new(&mut self.cmd).poll_next(cx)) { - Some(it) => self.as_mut().get_mut().handle_cmd(it), - None => { - return Poll::Pending; - } + match Pin::new(&mut self.cmd).poll_next(cx) { + Poll::Ready(Some(it)) => self.as_mut().get_mut().handle_cmd(it), + _ => return Poll::Pending, } } } diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index 29359fb6..ca2a7c64 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -3,7 +3,6 @@ use std::net::SocketAddr; use std::task::{Context, Poll}; use std::time::Duration; -use actix_rt::spawn; use actix_service::{self as actix, Service, ServiceFactory as ActixServiceFactory}; use actix_utils::counter::CounterGuard; use futures_util::future::{ready, Ready}; @@ -75,23 +74,20 @@ where } fn call(&mut self, (guard, req): (Option, ServerMessage)) -> Self::Future { - match req { - ServerMessage::Connect(stream) => match FromStream::from_mio(stream) { + if let ServerMessage::Connect(stream) = req { + match FromStream::from_mio(stream) { Ok(stream) => { let f = self.service.call(stream); - spawn(async move { + actix_rt::spawn(async move { let _ = f.await; drop(guard); }); - ready(Ok(())) } - Err(e) => { - error!("Can not convert to an async tcp stream: {}", e); - ready(Err(())) - } - }, - _ => ready(Ok(())), + Err(e) => error!("Can not convert to an async tcp stream: {}", e), + }; } + + ready(Ok(())) } } @@ -149,7 +145,7 @@ where Box::pin(async move { match fut.await { Ok(inner) => { - let service = Box::new(StreamService::new(inner)) as BoxedServerService; + let service = Box::new(StreamService::new(inner)) as _; Ok(vec![(token, service)]) } Err(_) => Err(()), diff --git a/actix-server/src/signals.rs b/actix-server/src/signals.rs index a677814b..b44504c5 100644 --- a/actix-server/src/signals.rs +++ b/actix-server/src/signals.rs @@ -3,8 +3,6 @@ use std::io; use std::pin::Pin; use std::task::{Context, Poll}; -use futures_util::future::lazy; - use crate::server::Server; /// Different types of process signals @@ -31,41 +29,39 @@ pub(crate) struct Signals { impl Signals { pub(crate) fn start(srv: Server) -> io::Result<()> { - actix_rt::spawn(lazy(|_| { - #[cfg(not(unix))] - { - actix_rt::spawn(Signals { - srv, - stream: Box::pin(actix_rt::signal::ctrl_c()), - }); - } - #[cfg(unix)] - { - use actix_rt::signal::unix; + #[cfg(not(unix))] + { + actix_rt::spawn(Signals { + srv, + stream: Box::pin(actix_rt::signal::ctrl_c()), + }); + } + #[cfg(unix)] + { + use actix_rt::signal::unix; - let mut streams = Vec::new(); + let mut streams = Vec::new(); - let sig_map = [ - (unix::SignalKind::interrupt(), Signal::Int), - (unix::SignalKind::hangup(), Signal::Hup), - (unix::SignalKind::terminate(), Signal::Term), - (unix::SignalKind::quit(), Signal::Quit), - ]; + let sig_map = [ + (unix::SignalKind::interrupt(), Signal::Int), + (unix::SignalKind::hangup(), Signal::Hup), + (unix::SignalKind::terminate(), Signal::Term), + (unix::SignalKind::quit(), Signal::Quit), + ]; - for (kind, sig) in sig_map.iter() { - match unix::signal(*kind) { - Ok(stream) => streams.push((*sig, stream)), - Err(e) => log::error!( - "Can not initialize stream handler for {:?} err: {}", - sig, - e - ), - } + for (kind, sig) in sig_map.iter() { + match unix::signal(*kind) { + Ok(stream) => streams.push((*sig, stream)), + Err(e) => log::error!( + "Can not initialize stream handler for {:?} err: {}", + sig, + e + ), } - - actix_rt::spawn(Signals { srv, streams }) } - })); + + actix_rt::spawn(Signals { srv, streams }) + } Ok(()) } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 54158492..1c8baf8c 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -11,7 +11,8 @@ use actix_utils::counter::Counter; use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures_channel::oneshot; use futures_util::future::join_all; -use futures_util::{stream::Stream, TryFutureExt}; +use futures_util::stream::Stream; +use futures_util::TryFutureExt; use log::{error, info, trace}; use crate::service::{BoxedServerService, InternalServiceFactory, ServerMessage};