merge Server and ServerHandle to server module. fix doc link

This commit is contained in:
fakeshadow 2021-04-03 19:14:26 +08:00
parent 47cc858ab4
commit 7a773bc610
5 changed files with 363 additions and 358 deletions

View File

@ -5,11 +5,12 @@ use log::{error, info};
use mio::{Interest, Poll, Token as MioToken};
use slab::Slab;
use crate::server_handle::ServerHandle;
use crate::builder::ServerBuilder;
use crate::server::ServerHandle;
use crate::socket::{MioListener, SocketAddr};
use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
use crate::worker::{Conn, ServerWorker, WorkerAvailability, WorkerHandle};
use crate::{ServerBuilder, Token};
use crate::Token;
const DUR_ON_ERR: Duration = Duration::from_millis(500);
@ -86,7 +87,7 @@ impl Accept {
thread::Builder::new()
.name("actix-server acceptor".to_owned())
.spawn(move || accept.poll_with(sockets))
.unwrap();
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
// return waker and worker handle clones to server builder.
Ok((waker_queue, handles_clone))

View File

@ -1,26 +1,16 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::io;
use std::time::Duration;
use std::{io, mem};
use actix_rt::net::TcpStream;
use actix_rt::time::sleep;
use actix_rt::System;
use futures_core::future::BoxFuture;
use log::{error, info};
use log::info;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;
use crate::accept::Accept;
use crate::config::{ConfiguredService, ServiceConfig};
use crate::server_handle::{ServerCommand, ServerHandle};
use crate::server::{Server, ServerCommand};
use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
use crate::signals::{Signal, Signals};
use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
use crate::socket::{MioTcpListener, MioTcpSocket};
use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::worker::{self, ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandle};
use crate::worker::{self, ServerWorkerConfig};
use crate::Token;
/// Server builder
@ -29,11 +19,11 @@ pub struct ServerBuilder {
token: Token,
backlog: u32,
pub(super) services: Vec<Box<dyn InternalServiceFactory>>,
sockets: Vec<(Token, String, MioListener)>,
exit: bool,
no_signals: bool,
pub(super) sockets: Vec<(Token, String, MioListener)>,
pub(super) exit: bool,
pub(super) no_signals: bool,
pub(super) cmd_tx: UnboundedSender<ServerCommand>,
cmd_rx: UnboundedReceiver<ServerCommand>,
pub(super) cmd_rx: UnboundedReceiver<ServerCommand>,
pub(super) worker_config: ServerWorkerConfig,
}
@ -264,239 +254,12 @@ impl ServerBuilder {
}
/// Starts processing incoming connections and return server controller.
pub fn run(mut self) -> Server {
pub fn run(self) -> Server {
if self.sockets.is_empty() {
panic!("Server should have at least one bound socket");
} else {
info!("Starting {} workers", self.threads);
let sockets = mem::take(&mut self.sockets)
.into_iter()
.map(|t| {
info!("Starting \"{}\" service on {}", t.1, t.2);
(t.0, t.2)
})
.collect();
// start accept thread. return waker_queue and worker handles.
let (waker_queue, handles) = Accept::start(sockets, &self)
// TODO: include error to Server type and poll return it in Future.
.unwrap_or_else(|e| panic!("Can not start Accept: {}", e));
// construct signals future.
let signals = if !self.no_signals {
Some(Signals::new())
} else {
None
};
Server {
cmd_tx: self.cmd_tx,
cmd_rx: self.cmd_rx,
handles,
services: self.services,
notify: Vec::new(),
exit: self.exit,
worker_config: self.worker_config,
signals,
on_stop_task: None,
waker_queue,
}
}
}
}
/// When awaited or spawned would listen to signal and message from [ServerHandle](crate::server::ServerHandle).
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Server {
cmd_tx: UnboundedSender<ServerCommand>,
cmd_rx: UnboundedReceiver<ServerCommand>,
handles: Vec<(usize, WorkerHandle)>,
services: Vec<Box<dyn InternalServiceFactory>>,
notify: Vec<oneshot::Sender<()>>,
exit: bool,
worker_config: ServerWorkerConfig,
signals: Option<Signals>,
on_stop_task: Option<BoxFuture<'static, ()>>,
waker_queue: WakerQueue,
}
impl Server {
/// Obtain a Handle for ServerFuture that can be used to change state of actix server.
///
/// See [ServerHandle](crate::server::ServerHandle) for usage.
pub fn handle(&self) -> ServerHandle {
ServerHandle::new(self.cmd_tx.clone())
}
fn handle_cmd(&mut self, item: ServerCommand) -> Option<BoxFuture<'static, ()>> {
match item {
ServerCommand::Pause(tx) => {
self.waker_queue.wake(WakerInterest::Pause);
let _ = tx.send(());
None
}
ServerCommand::Resume(tx) => {
self.waker_queue.wake(WakerInterest::Resume);
let _ = tx.send(());
None
}
ServerCommand::Signal(sig) => {
// Signals support
// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system
match sig {
Signal::Int => {
info!("SIGINT received, exiting");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: false,
completion: None,
})
}
Signal::Term => {
info!("SIGTERM received, stopping");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: true,
completion: None,
})
}
Signal::Quit => {
info!("SIGQUIT received, exiting");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: false,
completion: None,
})
}
_ => None,
}
}
ServerCommand::Notify(tx) => {
self.notify.push(tx);
None
}
ServerCommand::Stop {
graceful,
completion,
} => {
let exit = self.exit;
// stop accept thread
self.waker_queue.wake(WakerInterest::Stop);
let notify = std::mem::take(&mut self.notify);
// stop workers
if !self.handles.is_empty() && graceful {
let iter = self
.handles
.iter()
.map(move |worker| worker.1.stop(graceful))
.collect::<Vec<_>>();
// TODO: this async block can return io::Error.
Some(Box::pin(async move {
for handle in iter {
let _ = handle.await;
}
if let Some(tx) = completion {
let _ = tx.send(());
}
for tx in notify {
let _ = tx.send(());
}
if exit {
sleep(Duration::from_millis(300)).await;
System::try_current().as_ref().map(System::stop);
}
}))
} else {
// we need to stop system if server was spawned
let exit = self.exit;
// TODO: this async block can return io::Error.
Some(Box::pin(async move {
if exit {
sleep(Duration::from_millis(300)).await;
System::try_current().as_ref().map(System::stop);
}
if let Some(tx) = completion {
let _ = tx.send(());
}
for tx in notify {
let _ = tx.send(());
}
}))
}
}
ServerCommand::WorkerFaulted(idx) => {
let mut found = false;
for i in 0..self.handles.len() {
if self.handles[i].0 == idx {
self.handles.swap_remove(i);
found = true;
break;
}
}
if found {
error!("Worker has died {:?}, restarting", idx);
let mut new_idx = self.handles.len();
'found: loop {
for i in 0..self.handles.len() {
if self.handles[i].0 == new_idx {
new_idx += 1;
continue 'found;
}
}
break;
}
let availability = WorkerAvailability::new(self.waker_queue.clone());
let factories = self.services.iter().map(|v| v.clone_factory()).collect();
let handle = ServerWorker::start(
new_idx,
factories,
availability,
self.worker_config,
);
self.handles.push((new_idx, handle.clone()));
self.waker_queue.wake(WakerInterest::Worker(handle));
}
None
}
}
}
}
impl Future for Server {
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().get_mut();
// poll signals first. remove it on resolve.
if let Some(ref mut signals) = this.signals {
if let Poll::Ready(signal) = Pin::new(signals).poll(cx) {
this.on_stop_task = this.handle_cmd(ServerCommand::Signal(signal));
this.signals = None;
}
}
// actively poll command channel and handle command.
loop {
// got on stop task. resolve it exclusively and exit.
if let Some(ref mut fut) = this.on_stop_task {
return fut.as_mut().poll(cx).map(|_| Ok(()));
}
match Pin::new(&mut this.cmd_rx).poll_recv(cx) {
Poll::Ready(Some(it)) => {
this.on_stop_task = this.handle_cmd(it);
}
_ => return Poll::Pending,
}
Server::new(self)
}
}
}

View File

@ -7,7 +7,7 @@
mod accept;
mod builder;
mod config;
mod server_handle;
mod server;
mod service;
mod signals;
mod socket;
@ -15,9 +15,9 @@ mod test_server;
mod waker_queue;
mod worker;
pub use self::builder::{Server, ServerBuilder};
pub use self::builder::ServerBuilder;
pub use self::config::{ServiceConfig, ServiceRuntime};
pub use self::server_handle::ServerHandle;
pub use self::server::{Server, ServerHandle};
pub use self::service::ServiceFactory;
pub use self::test_server::TestServer;

346
actix-server/src/server.rs Normal file
View File

@ -0,0 +1,346 @@
use std::{
future::Future,
io, mem,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use actix_rt::{time::sleep, System};
use futures_core::future::BoxFuture;
use log::{error, info};
use tokio::sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
};
use crate::accept::Accept;
use crate::builder::ServerBuilder;
use crate::service::InternalServiceFactory;
use crate::signals::{Signal, Signals};
use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandle};
/// When awaited or spawned would listen to signal and message from [ServerHandle](ServerHandle).
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Server {
cmd_tx: UnboundedSender<ServerCommand>,
cmd_rx: UnboundedReceiver<ServerCommand>,
handles: Vec<(usize, WorkerHandle)>,
services: Vec<Box<dyn InternalServiceFactory>>,
notify: Vec<oneshot::Sender<()>>,
exit: bool,
worker_config: ServerWorkerConfig,
signals: Option<Signals>,
on_stop_task: Option<BoxFuture<'static, ()>>,
waker_queue: WakerQueue,
}
impl Server {
pub(crate) fn new(mut builder: ServerBuilder) -> Self {
let sockets = mem::take(&mut builder.sockets)
.into_iter()
.map(|(token, name, lst)| {
info!("Starting \"{}\" service on {}", name, lst);
(token, lst)
})
.collect();
// start accept thread. return waker_queue and worker handles.
let (waker_queue, handles) = Accept::start(sockets, &builder)
// TODO: include error to Server type and poll return it in Future.
.unwrap_or_else(|e| panic!("Can not start Accept: {}", e));
// construct signals future.
let signals = if !builder.no_signals {
Some(Signals::new())
} else {
None
};
Self {
cmd_tx: builder.cmd_tx,
cmd_rx: builder.cmd_rx,
handles,
services: builder.services,
notify: Vec::new(),
exit: builder.exit,
worker_config: builder.worker_config,
signals,
on_stop_task: None,
waker_queue,
}
}
/// Obtain a Handle for ServerFuture that can be used to change state of actix server.
///
/// See [ServerHandle](ServerHandle) for usage.
pub fn handle(&self) -> ServerHandle {
ServerHandle::new(self.cmd_tx.clone())
}
fn handle_cmd(&mut self, item: ServerCommand) -> Option<BoxFuture<'static, ()>> {
match item {
ServerCommand::Pause(tx) => {
self.waker_queue.wake(WakerInterest::Pause);
let _ = tx.send(());
None
}
ServerCommand::Resume(tx) => {
self.waker_queue.wake(WakerInterest::Resume);
let _ = tx.send(());
None
}
ServerCommand::Signal(sig) => {
// Signals support
// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system
match sig {
Signal::Int => {
info!("SIGINT received, exiting");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: false,
completion: None,
})
}
Signal::Term => {
info!("SIGTERM received, stopping");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: true,
completion: None,
})
}
Signal::Quit => {
info!("SIGQUIT received, exiting");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: false,
completion: None,
})
}
_ => None,
}
}
ServerCommand::Notify(tx) => {
self.notify.push(tx);
None
}
ServerCommand::Stop {
graceful,
completion,
} => {
let exit = self.exit;
// stop accept thread
self.waker_queue.wake(WakerInterest::Stop);
let notify = std::mem::take(&mut self.notify);
// stop workers
if !self.handles.is_empty() && graceful {
let iter = self
.handles
.iter()
.map(move |worker| worker.1.stop(graceful))
.collect::<Vec<_>>();
// TODO: this async block can return io::Error.
Some(Box::pin(async move {
for handle in iter {
let _ = handle.await;
}
if let Some(tx) = completion {
let _ = tx.send(());
}
for tx in notify {
let _ = tx.send(());
}
if exit {
sleep(Duration::from_millis(300)).await;
System::try_current().as_ref().map(System::stop);
}
}))
} else {
// we need to stop system if server was spawned
// TODO: this async block can return io::Error.
Some(Box::pin(async move {
if exit {
sleep(Duration::from_millis(300)).await;
System::try_current().as_ref().map(System::stop);
}
if let Some(tx) = completion {
let _ = tx.send(());
}
for tx in notify {
let _ = tx.send(());
}
}))
}
}
ServerCommand::WorkerFaulted(idx) => {
let mut found = false;
for i in 0..self.handles.len() {
if self.handles[i].0 == idx {
self.handles.swap_remove(i);
found = true;
break;
}
}
if found {
error!("Worker has died {:?}, restarting", idx);
let mut new_idx = self.handles.len();
'found: loop {
for i in 0..self.handles.len() {
if self.handles[i].0 == new_idx {
new_idx += 1;
continue 'found;
}
}
break;
}
let availability = WorkerAvailability::new(self.waker_queue.clone());
let factories = self.services.iter().map(|v| v.clone_factory()).collect();
let handle = ServerWorker::start(
new_idx,
factories,
availability,
self.worker_config,
);
self.handles.push((new_idx, handle.clone()));
self.waker_queue.wake(WakerInterest::Worker(handle));
}
None
}
}
}
}
impl Future for Server {
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().get_mut();
// poll signals first. remove it on resolve.
if let Some(ref mut signals) = this.signals {
if let Poll::Ready(signal) = Pin::new(signals).poll(cx) {
this.on_stop_task = this.handle_cmd(ServerCommand::Signal(signal));
this.signals = None;
}
}
// actively poll command channel and handle command.
loop {
// got on stop task. resolve it exclusively and exit.
if let Some(ref mut fut) = this.on_stop_task {
return fut.as_mut().poll(cx).map(|_| Ok(()));
}
match Pin::new(&mut this.cmd_rx).poll_recv(cx) {
Poll::Ready(Some(it)) => {
this.on_stop_task = this.handle_cmd(it);
}
_ => return Poll::Pending,
}
}
}
}
#[derive(Debug)]
pub(crate) enum ServerCommand {
WorkerFaulted(usize),
Pause(oneshot::Sender<()>),
Resume(oneshot::Sender<()>),
Signal(Signal),
/// Whether to try and shut down gracefully
Stop {
graceful: bool,
completion: Option<oneshot::Sender<()>>,
},
/// Notify of server stop
Notify(oneshot::Sender<()>),
}
#[derive(Debug)]
pub struct ServerHandle(
UnboundedSender<ServerCommand>,
Option<oneshot::Receiver<()>>,
);
impl ServerHandle {
pub(crate) fn new(tx: UnboundedSender<ServerCommand>) -> Self {
ServerHandle(tx, None)
}
/// Start server building process
pub fn build() -> ServerBuilder {
ServerBuilder::default()
}
pub(crate) fn worker_faulted(&self, idx: usize) {
let _ = self.0.send(ServerCommand::WorkerFaulted(idx));
}
/// Pause accepting incoming connections
///
/// If socket contains some pending connection, they might be dropped.
/// All opened connection remains active.
pub fn pause(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.send(ServerCommand::Pause(tx));
async {
let _ = rx.await;
}
}
/// Resume accepting incoming connections
pub fn resume(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.send(ServerCommand::Resume(tx));
async {
let _ = rx.await;
}
}
/// Stop incoming connection processing, stop all workers and exit.
///
/// If server starts with `spawn()` method, then spawned thread get terminated.
pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.send(ServerCommand::Stop {
graceful,
completion: Some(tx),
});
async {
let _ = rx.await;
}
}
}
impl Clone for ServerHandle {
fn clone(&self) -> Self {
Self(self.0.clone(), None)
}
}
impl Future for ServerHandle {
type Output = io::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
if this.1.is_none() {
let (tx, rx) = oneshot::channel();
if this.0.send(ServerCommand::Notify(tx)).is_err() {
return Poll::Ready(Ok(()));
}
this.1 = Some(rx);
}
Pin::new(this.1.as_mut().unwrap()).poll(cx).map(|_| Ok(()))
}
}

View File

@ -1,105 +0,0 @@
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot;
use crate::builder::ServerBuilder;
use crate::signals::Signal;
#[derive(Debug)]
pub(crate) enum ServerCommand {
WorkerFaulted(usize),
Pause(oneshot::Sender<()>),
Resume(oneshot::Sender<()>),
Signal(Signal),
/// Whether to try and shut down gracefully
Stop {
graceful: bool,
completion: Option<oneshot::Sender<()>>,
},
/// Notify of server stop
Notify(oneshot::Sender<()>),
}
#[derive(Debug)]
pub struct ServerHandle(
UnboundedSender<ServerCommand>,
Option<oneshot::Receiver<()>>,
);
impl ServerHandle {
pub(crate) fn new(tx: UnboundedSender<ServerCommand>) -> Self {
ServerHandle(tx, None)
}
/// Start server building process
pub fn build() -> ServerBuilder {
ServerBuilder::default()
}
pub(crate) fn worker_faulted(&self, idx: usize) {
let _ = self.0.send(ServerCommand::WorkerFaulted(idx));
}
/// Pause accepting incoming connections
///
/// If socket contains some pending connection, they might be dropped.
/// All opened connection remains active.
pub fn pause(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.send(ServerCommand::Pause(tx));
async {
let _ = rx.await;
}
}
/// Resume accepting incoming connections
pub fn resume(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.send(ServerCommand::Resume(tx));
async {
let _ = rx.await;
}
}
/// Stop incoming connection processing, stop all workers and exit.
///
/// If server starts with `spawn()` method, then spawned thread get terminated.
pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.send(ServerCommand::Stop {
graceful,
completion: Some(tx),
});
async {
let _ = rx.await;
}
}
}
impl Clone for ServerHandle {
fn clone(&self) -> Self {
Self(self.0.clone(), None)
}
}
impl Future for ServerHandle {
type Output = io::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
if this.1.is_none() {
let (tx, rx) = oneshot::channel();
if this.0.send(ServerCommand::Notify(tx)).is_err() {
return Poll::Ready(Ok(()));
}
this.1 = Some(rx);
}
Pin::new(this.1.as_mut().unwrap()).poll(cx).map(|_| Ok(()))
}
}