From c8affa95fe3787ba3a1be5205f5780453d36a69a Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Thu, 4 Feb 2021 07:35:48 +0800 Subject: [PATCH] simplify server futures. reduce spawned tasks --- actix-server/src/accept.rs | 77 ++++---------- actix-server/src/builder.rs | 202 +++++++++++++++++++++++------------- actix-server/src/server.rs | 9 +- actix-server/src/signals.rs | 33 +++--- 4 files changed, 165 insertions(+), 156 deletions(-) diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index a52184d9..ac960ae8 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -27,54 +27,8 @@ struct ServerSocketInfo { timeout: Option, } -/// Accept loop would live with `ServerBuilder`. -/// -/// It's tasked with construct `Poll` instance and `WakerQueue` which would be distributed to -/// `Accept` and `Worker`. -/// -/// It would also listen to `ServerCommand` and push interests to `WakerQueue`. -pub(crate) struct AcceptLoop { - srv: Option, - poll: Option, - waker: WakerQueue, -} - -impl AcceptLoop { - pub fn new(srv: Server) -> Self { - let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e)); - let waker = WakerQueue::new(poll.registry()) - .unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e)); - - Self { - srv: Some(srv), - poll: Some(poll), - waker, - } - } - - pub(crate) fn waker_owned(&self) -> WakerQueue { - self.waker.clone() - } - - pub fn wake(&self, i: WakerInterest) { - self.waker.wake(i); - } - - pub(crate) fn start( - &mut self, - socks: Vec<(Token, MioListener)>, - handles: Vec, - ) { - let srv = self.srv.take().expect("Can not re-use AcceptInfo"); - let poll = self.poll.take().unwrap(); - let waker = self.waker.clone(); - - Accept::start(poll, waker, socks, srv, handles); - } -} - /// poll instance of the server. -struct Accept { +pub(crate) struct Accept { poll: Poll, waker: WakerQueue, handles: Vec, @@ -97,13 +51,23 @@ fn connection_error(e: &io::Error) -> bool { } impl Accept { - pub(crate) fn start( - poll: Poll, - waker: WakerQueue, - socks: Vec<(Token, MioListener)>, - srv: Server, - handles: Vec, - ) { + pub(crate) fn start( + sockets: Vec<(Token, MioListener)>, + server: Server, + worker_factory: F, + ) -> WakerQueue + where + F: FnOnce(&WakerQueue) -> Vec, + { + // construct poll instance and it's waker + let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e)); + let waker = WakerQueue::new(poll.registry()) + .unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e)); + let waker_clone = waker.clone(); + + // construct workers and collect handles. + let handles = worker_factory(&waker); + // Accept runs in its own thread and would want to spawn additional futures to current // actix system. let sys = System::current(); @@ -112,10 +76,13 @@ impl Accept { .spawn(move || { System::set_current(sys); let (mut accept, sockets) = - Accept::new_with_sockets(poll, waker, socks, handles, srv); + Accept::new_with_sockets(poll, waker, sockets, handles, server); accept.poll_with(sockets); }) .unwrap(); + + // return waker to server builder. + waker_clone } fn new_with_sockets( diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 7290f9dd..fdaf94f4 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -7,11 +7,12 @@ use std::{io, mem}; use actix_rt::net::TcpStream; use actix_rt::time::{sleep_until, Instant}; use actix_rt::{self as rt, System}; +use futures_core::future::BoxFuture; use log::{error, info}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; use tokio::sync::oneshot; -use crate::accept::AcceptLoop; +use crate::accept::Accept; use crate::config::{ConfiguredService, ServiceConfig}; use crate::server::{Server, ServerCommand}; use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; @@ -20,23 +21,20 @@ use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; use crate::socket::{MioTcpListener, MioTcpSocket}; use crate::waker_queue::{WakerInterest, WakerQueue}; use crate::worker::{self, ServerWorker, WorkerAvailability, WorkerHandle}; -use crate::{join_all, Token}; +use crate::Token; /// Server builder pub struct ServerBuilder { threads: usize, token: Token, backlog: u32, - handles: Vec<(usize, WorkerHandle)>, services: Vec>, sockets: Vec<(Token, String, MioListener)>, - accept: AcceptLoop, exit: bool, shutdown_timeout: Duration, no_signals: bool, cmd: UnboundedReceiver, server: Server, - notify: Vec>, } impl Default for ServerBuilder { @@ -54,16 +52,13 @@ impl ServerBuilder { ServerBuilder { threads: num_cpus::get(), token: Token::default(), - handles: Vec::new(), services: Vec::new(), sockets: Vec::new(), - accept: AcceptLoop::new(server.clone()), backlog: 2048, exit: false, shutdown_timeout: Duration::from_secs(30), no_signals: false, cmd: rx, - notify: Vec::new(), server, } } @@ -259,56 +254,94 @@ impl ServerBuilder { } else { info!("Starting {} workers", self.threads); - // start workers - let handles = (0..self.threads) - .map(|idx| { - let handle = self.start_worker(idx, self.accept.waker_owned()); - self.handles.push((idx, handle.clone())); - - handle + let sockets = mem::take(&mut self.sockets) + .into_iter() + .map(|t| { + info!("Starting \"{}\" service on {}", t.1, t.2); + (t.0, t.2) }) .collect(); - // start accept thread - for sock in &self.sockets { - info!("Starting \"{}\" service on {}", sock.1, sock.2); - } - self.accept.start( - mem::take(&mut self.sockets) - .into_iter() - .map(|t| (t.0, t.2)) - .collect(), - handles, + // collect worker handles on start. + let mut handles = Vec::new(); + + // start accept thread. return waker_queue for wake up it. + let waker_queue = Accept::start( + sockets, + self.server.clone(), + // closure for construct worker and return it's handler. + |waker| { + (0..self.threads) + .map(|idx| { + // start workers + let avail = WorkerAvailability::new(waker.clone()); + let services = + self.services.iter().map(|v| v.clone_factory()).collect(); + let handle = ServerWorker::start( + idx, + services, + avail, + self.shutdown_timeout, + ); + handles.push((idx, handle.clone())); + handle + }) + .collect() + }, ); - // handle signals - if !self.no_signals { - Signals::start(self.server.clone()); - } + // construct signals future. + let signals = if !self.no_signals { + Some(Signals::new()) + } else { + None + }; - // start http server actor - let server = self.server.clone(); - rt::spawn(self); - server + let server_future = ServerFuture { + cmd: self.cmd, + handles, + services: self.services, + notify: Vec::new(), + exit: self.exit, + shutdown_timeout: self.shutdown_timeout, + signals, + on_stop_task: None, + waker_queue, + }; + + // spawn server future. + rt::spawn(server_future); + + self.server } } +} - fn start_worker(&self, idx: usize, waker: WakerQueue) -> WorkerHandle { - let avail = WorkerAvailability::new(waker); - let services = self.services.iter().map(|v| v.clone_factory()).collect(); +/// `ServerFuture` when awaited or spawned would listen to signal and message from `Server`. +struct ServerFuture { + cmd: UnboundedReceiver, + handles: Vec<(usize, WorkerHandle)>, + services: Vec>, + notify: Vec>, + exit: bool, + shutdown_timeout: Duration, + signals: Option, + on_stop_task: Option>, + waker_queue: WakerQueue, +} - ServerWorker::start(idx, services, avail, self.shutdown_timeout) - } - - fn handle_cmd(&mut self, item: ServerCommand) { +impl ServerFuture { + fn handle_cmd(&mut self, item: ServerCommand) -> Option> { match item { ServerCommand::Pause(tx) => { - self.accept.wake(WakerInterest::Pause); + self.waker_queue.wake(WakerInterest::Pause); let _ = tx.send(()); + None } ServerCommand::Resume(tx) => { - self.accept.wake(WakerInterest::Resume); + self.waker_queue.wake(WakerInterest::Resume); let _ = tx.send(()); + None } ServerCommand::Signal(sig) => { // Signals support @@ -338,11 +371,12 @@ impl ServerBuilder { completion: None, }) } - _ => (), + _ => None, } } ServerCommand::Notify(tx) => { self.notify.push(tx); + None } ServerCommand::Stop { graceful, @@ -351,7 +385,7 @@ impl ServerBuilder { let exit = self.exit; // stop accept thread - self.accept.wake(WakerInterest::Stop); + self.waker_queue.wake(WakerInterest::Stop); let notify = std::mem::take(&mut self.notify); // stop workers @@ -360,12 +394,12 @@ impl ServerBuilder { .handles .iter() .map(move |worker| worker.1.stop(graceful)) - .collect(); + .collect::>(); - let fut = join_all(iter); - - rt::spawn(async move { - let _ = fut.await; + Some(Box::pin(async move { + for handle in iter { + let _ = handle.await; + } if let Some(tx) = completion { let _ = tx.send(()); } @@ -373,26 +407,25 @@ impl ServerBuilder { let _ = tx.send(()); } if exit { - rt::spawn(async { - sleep_until(Instant::now() + Duration::from_millis(300)).await; - System::current().stop(); - }); - } - }); - } else { - // we need to stop system if server was spawned - if self.exit { - rt::spawn(async { sleep_until(Instant::now() + Duration::from_millis(300)).await; System::current().stop(); - }); - } - if let Some(tx) = completion { - let _ = tx.send(()); - } - for tx in notify { - let _ = tx.send(()); - } + } + })) + } else { + // we need to stop system if server was spawned + let exit = self.exit; + Some(Box::pin(async move { + if exit { + sleep_until(Instant::now() + Duration::from_millis(300)).await; + System::current().stop(); + } + if let Some(tx) = completion { + let _ = tx.send(()); + } + for tx in notify { + let _ = tx.send(()); + } + })) } } ServerCommand::WorkerFaulted(idx) => { @@ -419,22 +452,47 @@ impl ServerBuilder { break; } - let handle = self.start_worker(new_idx, self.accept.waker_owned()); + let avail = WorkerAvailability::new(self.waker_queue.clone()); + let services = self.services.iter().map(|v| v.clone_factory()).collect(); + let handle = + ServerWorker::start(new_idx, services, avail, self.shutdown_timeout); + self.handles.push((new_idx, handle.clone())); - self.accept.wake(WakerInterest::Worker(handle)); + self.waker_queue.wake(WakerInterest::Worker(handle)); } + None } } } } -impl Future for ServerBuilder { +impl Future for ServerFuture { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.as_mut().get_mut(); + + // poll signals first. remove it on resolve. + if let Some(ref mut signals) = this.signals { + if let Poll::Ready(signal) = Pin::new(signals).poll(cx) { + this.on_stop_task = this.handle_cmd(ServerCommand::Signal(signal)); + this.signals = None; + // poll another round for trying on_stop_task. + return self.poll(cx); + } + } + + // actively poll command channel and handle command. loop { - match Pin::new(&mut self.cmd).poll_recv(cx) { - Poll::Ready(Some(it)) => self.as_mut().get_mut().handle_cmd(it), + // got on stop task. resolve it exclusively and exit. + if let Some(ref mut fut) = this.on_stop_task { + return fut.as_mut().poll(cx); + } + + match Pin::new(&mut this.cmd).poll_recv(cx) { + Poll::Ready(Some(it)) => { + this.on_stop_task = this.handle_cmd(it); + } _ => return Poll::Pending, } } diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index 6b0d0aea..74228b3f 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -40,10 +40,6 @@ impl Server { ServerBuilder::default() } - pub(crate) fn signal(&self, sig: Signal) { - let _ = self.0.send(ServerCommand::Signal(sig)); - } - pub(crate) fn worker_faulted(&self, idx: usize) { let _ = self.0.send(ServerCommand::WorkerFaulted(idx)); } @@ -104,9 +100,6 @@ impl Future for Server { this.1 = Some(rx); } - match Pin::new(this.1.as_mut().unwrap()).poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(_) => Poll::Ready(Ok(())), - } + Pin::new(this.1.as_mut().unwrap()).poll(cx).map(|_| Ok(())) } } diff --git a/actix-server/src/signals.rs b/actix-server/src/signals.rs index ea1de47e..23cc8986 100644 --- a/actix-server/src/signals.rs +++ b/actix-server/src/signals.rs @@ -2,9 +2,7 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use futures_core::future::LocalBoxFuture; - -use crate::server::Server; +use futures_core::future::BoxFuture; /// Different types of process signals #[allow(dead_code)] @@ -21,21 +19,19 @@ pub(crate) enum Signal { } pub(crate) struct Signals { - srv: Server, #[cfg(not(unix))] - signals: LocalBoxFuture<'static, std::io::Result<()>>, + signals: BoxFuture<'static, std::io::Result<()>>, #[cfg(unix)] - signals: Vec<(Signal, LocalBoxFuture<'static, ()>)>, + signals: Vec<(Signal, BoxFuture<'static, ()>)>, } impl Signals { - pub(crate) fn start(srv: Server) { + pub(crate) fn new() -> Self { #[cfg(not(unix))] { - actix_rt::spawn(Signals { - srv, + Signals { signals: Box::pin(actix_rt::signal::ctrl_c()), - }); + } } #[cfg(unix)] { @@ -66,30 +62,25 @@ impl Signals { } } - actix_rt::spawn(Signals { srv, signals }); + Signals { signals } } } } impl Future for Signals { - type Output = (); + type Output = Signal; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { #[cfg(not(unix))] - match self.signals.as_mut().poll(cx) { - Poll::Ready(_) => { - self.srv.signal(Signal::Int); - Poll::Ready(()) - } - Poll::Pending => Poll::Pending, + { + self.signals.as_mut().poll(cx).map(|_| Signal::Int) } + #[cfg(unix)] { for (sig, fut) in self.signals.iter_mut() { if fut.as_mut().poll(cx).is_ready() { - let sig = *sig; - self.srv.signal(sig); - return Poll::Ready(()); + return Poll::Ready(*sig); } } Poll::Pending