simplify server futures. reduce spawned tasks

This commit is contained in:
fakeshadow 2021-02-04 07:35:48 +08:00
parent 4a8693d000
commit c8affa95fe
4 changed files with 165 additions and 156 deletions

View File

@ -27,54 +27,8 @@ struct ServerSocketInfo {
timeout: Option<Instant>, timeout: Option<Instant>,
} }
/// Accept loop would live with `ServerBuilder`.
///
/// It's tasked with construct `Poll` instance and `WakerQueue` which would be distributed to
/// `Accept` and `Worker`.
///
/// It would also listen to `ServerCommand` and push interests to `WakerQueue`.
pub(crate) struct AcceptLoop {
srv: Option<Server>,
poll: Option<Poll>,
waker: WakerQueue,
}
impl AcceptLoop {
pub fn new(srv: Server) -> Self {
let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e));
let waker = WakerQueue::new(poll.registry())
.unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e));
Self {
srv: Some(srv),
poll: Some(poll),
waker,
}
}
pub(crate) fn waker_owned(&self) -> WakerQueue {
self.waker.clone()
}
pub fn wake(&self, i: WakerInterest) {
self.waker.wake(i);
}
pub(crate) fn start(
&mut self,
socks: Vec<(Token, MioListener)>,
handles: Vec<WorkerHandle>,
) {
let srv = self.srv.take().expect("Can not re-use AcceptInfo");
let poll = self.poll.take().unwrap();
let waker = self.waker.clone();
Accept::start(poll, waker, socks, srv, handles);
}
}
/// poll instance of the server. /// poll instance of the server.
struct Accept { pub(crate) struct Accept {
poll: Poll, poll: Poll,
waker: WakerQueue, waker: WakerQueue,
handles: Vec<WorkerHandle>, handles: Vec<WorkerHandle>,
@ -97,13 +51,23 @@ fn connection_error(e: &io::Error) -> bool {
} }
impl Accept { impl Accept {
pub(crate) fn start( pub(crate) fn start<F>(
poll: Poll, sockets: Vec<(Token, MioListener)>,
waker: WakerQueue, server: Server,
socks: Vec<(Token, MioListener)>, worker_factory: F,
srv: Server, ) -> WakerQueue
handles: Vec<WorkerHandle>, where
) { F: FnOnce(&WakerQueue) -> Vec<WorkerHandle>,
{
// construct poll instance and it's waker
let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e));
let waker = WakerQueue::new(poll.registry())
.unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e));
let waker_clone = waker.clone();
// construct workers and collect handles.
let handles = worker_factory(&waker);
// Accept runs in its own thread and would want to spawn additional futures to current // Accept runs in its own thread and would want to spawn additional futures to current
// actix system. // actix system.
let sys = System::current(); let sys = System::current();
@ -112,10 +76,13 @@ impl Accept {
.spawn(move || { .spawn(move || {
System::set_current(sys); System::set_current(sys);
let (mut accept, sockets) = let (mut accept, sockets) =
Accept::new_with_sockets(poll, waker, socks, handles, srv); Accept::new_with_sockets(poll, waker, sockets, handles, server);
accept.poll_with(sockets); accept.poll_with(sockets);
}) })
.unwrap(); .unwrap();
// return waker to server builder.
waker_clone
} }
fn new_with_sockets( fn new_with_sockets(

View File

@ -7,11 +7,12 @@ use std::{io, mem};
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use actix_rt::time::{sleep_until, Instant}; use actix_rt::time::{sleep_until, Instant};
use actix_rt::{self as rt, System}; use actix_rt::{self as rt, System};
use futures_core::future::BoxFuture;
use log::{error, info}; use log::{error, info};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio::sync::oneshot; use tokio::sync::oneshot;
use crate::accept::AcceptLoop; use crate::accept::Accept;
use crate::config::{ConfiguredService, ServiceConfig}; use crate::config::{ConfiguredService, ServiceConfig};
use crate::server::{Server, ServerCommand}; use crate::server::{Server, ServerCommand};
use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
@ -20,23 +21,20 @@ use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
use crate::socket::{MioTcpListener, MioTcpSocket}; use crate::socket::{MioTcpListener, MioTcpSocket};
use crate::waker_queue::{WakerInterest, WakerQueue}; use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::worker::{self, ServerWorker, WorkerAvailability, WorkerHandle}; use crate::worker::{self, ServerWorker, WorkerAvailability, WorkerHandle};
use crate::{join_all, Token}; use crate::Token;
/// Server builder /// Server builder
pub struct ServerBuilder { pub struct ServerBuilder {
threads: usize, threads: usize,
token: Token, token: Token,
backlog: u32, backlog: u32,
handles: Vec<(usize, WorkerHandle)>,
services: Vec<Box<dyn InternalServiceFactory>>, services: Vec<Box<dyn InternalServiceFactory>>,
sockets: Vec<(Token, String, MioListener)>, sockets: Vec<(Token, String, MioListener)>,
accept: AcceptLoop,
exit: bool, exit: bool,
shutdown_timeout: Duration, shutdown_timeout: Duration,
no_signals: bool, no_signals: bool,
cmd: UnboundedReceiver<ServerCommand>, cmd: UnboundedReceiver<ServerCommand>,
server: Server, server: Server,
notify: Vec<oneshot::Sender<()>>,
} }
impl Default for ServerBuilder { impl Default for ServerBuilder {
@ -54,16 +52,13 @@ impl ServerBuilder {
ServerBuilder { ServerBuilder {
threads: num_cpus::get(), threads: num_cpus::get(),
token: Token::default(), token: Token::default(),
handles: Vec::new(),
services: Vec::new(), services: Vec::new(),
sockets: Vec::new(), sockets: Vec::new(),
accept: AcceptLoop::new(server.clone()),
backlog: 2048, backlog: 2048,
exit: false, exit: false,
shutdown_timeout: Duration::from_secs(30), shutdown_timeout: Duration::from_secs(30),
no_signals: false, no_signals: false,
cmd: rx, cmd: rx,
notify: Vec::new(),
server, server,
} }
} }
@ -259,56 +254,94 @@ impl ServerBuilder {
} else { } else {
info!("Starting {} workers", self.threads); info!("Starting {} workers", self.threads);
// start workers let sockets = mem::take(&mut self.sockets)
let handles = (0..self.threads) .into_iter()
.map(|idx| { .map(|t| {
let handle = self.start_worker(idx, self.accept.waker_owned()); info!("Starting \"{}\" service on {}", t.1, t.2);
self.handles.push((idx, handle.clone())); (t.0, t.2)
handle
}) })
.collect(); .collect();
// start accept thread // collect worker handles on start.
for sock in &self.sockets { let mut handles = Vec::new();
info!("Starting \"{}\" service on {}", sock.1, sock.2);
} // start accept thread. return waker_queue for wake up it.
self.accept.start( let waker_queue = Accept::start(
mem::take(&mut self.sockets) sockets,
.into_iter() self.server.clone(),
.map(|t| (t.0, t.2)) // closure for construct worker and return it's handler.
.collect(), |waker| {
handles, (0..self.threads)
.map(|idx| {
// start workers
let avail = WorkerAvailability::new(waker.clone());
let services =
self.services.iter().map(|v| v.clone_factory()).collect();
let handle = ServerWorker::start(
idx,
services,
avail,
self.shutdown_timeout,
);
handles.push((idx, handle.clone()));
handle
})
.collect()
},
); );
// handle signals // construct signals future.
if !self.no_signals { let signals = if !self.no_signals {
Signals::start(self.server.clone()); Some(Signals::new())
} } else {
None
};
// start http server actor let server_future = ServerFuture {
let server = self.server.clone(); cmd: self.cmd,
rt::spawn(self); handles,
server services: self.services,
notify: Vec::new(),
exit: self.exit,
shutdown_timeout: self.shutdown_timeout,
signals,
on_stop_task: None,
waker_queue,
};
// spawn server future.
rt::spawn(server_future);
self.server
} }
} }
}
fn start_worker(&self, idx: usize, waker: WakerQueue) -> WorkerHandle { /// `ServerFuture` when awaited or spawned would listen to signal and message from `Server`.
let avail = WorkerAvailability::new(waker); struct ServerFuture {
let services = self.services.iter().map(|v| v.clone_factory()).collect(); cmd: UnboundedReceiver<ServerCommand>,
handles: Vec<(usize, WorkerHandle)>,
services: Vec<Box<dyn InternalServiceFactory>>,
notify: Vec<oneshot::Sender<()>>,
exit: bool,
shutdown_timeout: Duration,
signals: Option<Signals>,
on_stop_task: Option<BoxFuture<'static, ()>>,
waker_queue: WakerQueue,
}
ServerWorker::start(idx, services, avail, self.shutdown_timeout) impl ServerFuture {
} fn handle_cmd(&mut self, item: ServerCommand) -> Option<BoxFuture<'static, ()>> {
fn handle_cmd(&mut self, item: ServerCommand) {
match item { match item {
ServerCommand::Pause(tx) => { ServerCommand::Pause(tx) => {
self.accept.wake(WakerInterest::Pause); self.waker_queue.wake(WakerInterest::Pause);
let _ = tx.send(()); let _ = tx.send(());
None
} }
ServerCommand::Resume(tx) => { ServerCommand::Resume(tx) => {
self.accept.wake(WakerInterest::Resume); self.waker_queue.wake(WakerInterest::Resume);
let _ = tx.send(()); let _ = tx.send(());
None
} }
ServerCommand::Signal(sig) => { ServerCommand::Signal(sig) => {
// Signals support // Signals support
@ -338,11 +371,12 @@ impl ServerBuilder {
completion: None, completion: None,
}) })
} }
_ => (), _ => None,
} }
} }
ServerCommand::Notify(tx) => { ServerCommand::Notify(tx) => {
self.notify.push(tx); self.notify.push(tx);
None
} }
ServerCommand::Stop { ServerCommand::Stop {
graceful, graceful,
@ -351,7 +385,7 @@ impl ServerBuilder {
let exit = self.exit; let exit = self.exit;
// stop accept thread // stop accept thread
self.accept.wake(WakerInterest::Stop); self.waker_queue.wake(WakerInterest::Stop);
let notify = std::mem::take(&mut self.notify); let notify = std::mem::take(&mut self.notify);
// stop workers // stop workers
@ -360,12 +394,12 @@ impl ServerBuilder {
.handles .handles
.iter() .iter()
.map(move |worker| worker.1.stop(graceful)) .map(move |worker| worker.1.stop(graceful))
.collect(); .collect::<Vec<_>>();
let fut = join_all(iter); Some(Box::pin(async move {
for handle in iter {
rt::spawn(async move { let _ = handle.await;
let _ = fut.await; }
if let Some(tx) = completion { if let Some(tx) = completion {
let _ = tx.send(()); let _ = tx.send(());
} }
@ -373,26 +407,25 @@ impl ServerBuilder {
let _ = tx.send(()); let _ = tx.send(());
} }
if exit { if exit {
rt::spawn(async {
sleep_until(Instant::now() + Duration::from_millis(300)).await;
System::current().stop();
});
}
});
} else {
// we need to stop system if server was spawned
if self.exit {
rt::spawn(async {
sleep_until(Instant::now() + Duration::from_millis(300)).await; sleep_until(Instant::now() + Duration::from_millis(300)).await;
System::current().stop(); System::current().stop();
}); }
} }))
if let Some(tx) = completion { } else {
let _ = tx.send(()); // we need to stop system if server was spawned
} let exit = self.exit;
for tx in notify { Some(Box::pin(async move {
let _ = tx.send(()); if exit {
} sleep_until(Instant::now() + Duration::from_millis(300)).await;
System::current().stop();
}
if let Some(tx) = completion {
let _ = tx.send(());
}
for tx in notify {
let _ = tx.send(());
}
}))
} }
} }
ServerCommand::WorkerFaulted(idx) => { ServerCommand::WorkerFaulted(idx) => {
@ -419,22 +452,47 @@ impl ServerBuilder {
break; break;
} }
let handle = self.start_worker(new_idx, self.accept.waker_owned()); let avail = WorkerAvailability::new(self.waker_queue.clone());
let services = self.services.iter().map(|v| v.clone_factory()).collect();
let handle =
ServerWorker::start(new_idx, services, avail, self.shutdown_timeout);
self.handles.push((new_idx, handle.clone())); self.handles.push((new_idx, handle.clone()));
self.accept.wake(WakerInterest::Worker(handle)); self.waker_queue.wake(WakerInterest::Worker(handle));
} }
None
} }
} }
} }
} }
impl Future for ServerBuilder { impl Future for ServerFuture {
type Output = (); type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().get_mut();
// poll signals first. remove it on resolve.
if let Some(ref mut signals) = this.signals {
if let Poll::Ready(signal) = Pin::new(signals).poll(cx) {
this.on_stop_task = this.handle_cmd(ServerCommand::Signal(signal));
this.signals = None;
// poll another round for trying on_stop_task.
return self.poll(cx);
}
}
// actively poll command channel and handle command.
loop { loop {
match Pin::new(&mut self.cmd).poll_recv(cx) { // got on stop task. resolve it exclusively and exit.
Poll::Ready(Some(it)) => self.as_mut().get_mut().handle_cmd(it), if let Some(ref mut fut) = this.on_stop_task {
return fut.as_mut().poll(cx);
}
match Pin::new(&mut this.cmd).poll_recv(cx) {
Poll::Ready(Some(it)) => {
this.on_stop_task = this.handle_cmd(it);
}
_ => return Poll::Pending, _ => return Poll::Pending,
} }
} }

View File

@ -40,10 +40,6 @@ impl Server {
ServerBuilder::default() ServerBuilder::default()
} }
pub(crate) fn signal(&self, sig: Signal) {
let _ = self.0.send(ServerCommand::Signal(sig));
}
pub(crate) fn worker_faulted(&self, idx: usize) { pub(crate) fn worker_faulted(&self, idx: usize) {
let _ = self.0.send(ServerCommand::WorkerFaulted(idx)); let _ = self.0.send(ServerCommand::WorkerFaulted(idx));
} }
@ -104,9 +100,6 @@ impl Future for Server {
this.1 = Some(rx); this.1 = Some(rx);
} }
match Pin::new(this.1.as_mut().unwrap()).poll(cx) { Pin::new(this.1.as_mut().unwrap()).poll(cx).map(|_| Ok(()))
Poll::Pending => Poll::Pending,
Poll::Ready(_) => Poll::Ready(Ok(())),
}
} }
} }

View File

@ -2,9 +2,7 @@ use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use futures_core::future::LocalBoxFuture; use futures_core::future::BoxFuture;
use crate::server::Server;
/// Different types of process signals /// Different types of process signals
#[allow(dead_code)] #[allow(dead_code)]
@ -21,21 +19,19 @@ pub(crate) enum Signal {
} }
pub(crate) struct Signals { pub(crate) struct Signals {
srv: Server,
#[cfg(not(unix))] #[cfg(not(unix))]
signals: LocalBoxFuture<'static, std::io::Result<()>>, signals: BoxFuture<'static, std::io::Result<()>>,
#[cfg(unix)] #[cfg(unix)]
signals: Vec<(Signal, LocalBoxFuture<'static, ()>)>, signals: Vec<(Signal, BoxFuture<'static, ()>)>,
} }
impl Signals { impl Signals {
pub(crate) fn start(srv: Server) { pub(crate) fn new() -> Self {
#[cfg(not(unix))] #[cfg(not(unix))]
{ {
actix_rt::spawn(Signals { Signals {
srv,
signals: Box::pin(actix_rt::signal::ctrl_c()), signals: Box::pin(actix_rt::signal::ctrl_c()),
}); }
} }
#[cfg(unix)] #[cfg(unix)]
{ {
@ -66,30 +62,25 @@ impl Signals {
} }
} }
actix_rt::spawn(Signals { srv, signals }); Signals { signals }
} }
} }
} }
impl Future for Signals { impl Future for Signals {
type Output = (); type Output = Signal;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
#[cfg(not(unix))] #[cfg(not(unix))]
match self.signals.as_mut().poll(cx) { {
Poll::Ready(_) => { self.signals.as_mut().poll(cx).map(|_| Signal::Int)
self.srv.signal(Signal::Int);
Poll::Ready(())
}
Poll::Pending => Poll::Pending,
} }
#[cfg(unix)] #[cfg(unix)]
{ {
for (sig, fut) in self.signals.iter_mut() { for (sig, fut) in self.signals.iter_mut() {
if fut.as_mut().poll(cx).is_ready() { if fut.as_mut().poll(cx).is_ready() {
let sig = *sig; return Poll::Ready(*sig);
self.srv.signal(sig);
return Poll::Ready(());
} }
} }
Poll::Pending Poll::Pending