document server struct

This commit is contained in:
Rob Ede 2021-11-03 16:37:43 +00:00
parent 5fe957d275
commit 3ba69afe9f
6 changed files with 123 additions and 57 deletions

View File

@ -23,7 +23,7 @@ use actix_service::{fn_service, ServiceFactoryExt as _};
use bytes::BytesMut;
use futures_util::future::ok;
use log::{error, info};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
async fn run() -> io::Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
@ -83,7 +83,6 @@ async fn run() -> io::Result<()> {
})
})?
.workers(1)
// .system_exit()
.run()
.await
}

View File

@ -6,11 +6,10 @@ use mio::{Interest, Poll, Token as MioToken};
use crate::{
availability::Availability,
server::ServerHandle,
socket::MioListener,
waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN},
worker::{Conn, ServerWorker, WorkerHandleAccept, WorkerHandleServer},
ServerBuilder,
ServerBuilder, ServerHandle,
};
const TIMEOUT_DURATION_ON_ERROR: Duration = Duration::from_millis(510);

View File

@ -14,7 +14,7 @@ use crate::{
Server,
};
/// Server builder.
/// [Server] builder.
pub struct ServerBuilder {
pub(crate) threads: usize,
pub(crate) token: usize,

View File

@ -0,0 +1,53 @@
use std::future::Future;
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use crate::server::ServerCommand;
/// Server handle.
#[derive(Debug, Clone)]
pub struct ServerHandle {
tx_cmd: UnboundedSender<ServerCommand>,
}
impl ServerHandle {
pub(crate) fn new(tx_cmd: UnboundedSender<ServerCommand>) -> Self {
ServerHandle { tx_cmd }
}
pub(crate) fn worker_faulted(&self, idx: usize) {
let _ = self.tx_cmd.send(ServerCommand::WorkerFaulted(idx));
}
/// Pause accepting incoming connections.
///
/// May drop socket pending connection. All open connections remain active.
pub fn pause(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.tx_cmd.send(ServerCommand::Pause(tx));
async {
let _ = rx.await;
}
}
/// Resume accepting incoming connections.
pub fn resume(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.tx_cmd.send(ServerCommand::Resume(tx));
async {
let _ = rx.await;
}
}
/// Stop incoming connection processing, stop all workers and exit.
pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.tx_cmd.send(ServerCommand::Stop {
graceful,
completion: Some(tx),
});
async {
let _ = rx.await;
}
}
}

View File

@ -7,6 +7,7 @@
mod accept;
mod availability;
mod builder;
mod handle;
mod join_all;
mod server;
mod service;
@ -17,7 +18,8 @@ mod waker_queue;
mod worker;
pub use self::builder::ServerBuilder;
pub use self::server::{Server, ServerHandle};
pub use self::handle::ServerHandle;
pub use self::server::Server;
pub use self::service::ServiceFactory;
pub use self::test_server::TestServer;

View File

@ -22,6 +22,7 @@ use crate::{
signals::{Signal, Signals},
waker_queue::{WakerInterest, WakerQueue},
worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer},
ServerHandle,
};
#[derive(Debug)]
@ -45,13 +46,73 @@ pub(crate) enum ServerCommand {
},
}
/// Server
/// General purpose TCP server that runs services receiving Tokio `TcpStream`s.
///
/// Handles creating worker threads, restarting faulted workers, connection accepting, and
/// back-pressure logic.
///
/// Creates a worker per CPU core (or the number specified in [`ServerBuilder::workers`]) and
/// distributes connections with a round-robin strategy.
///
/// The [Server] must be awaited to process stop commands and listen for OS signals. It will resolve
/// when the server has fully shut down.
///
/// # Shutdown Signals
/// On UNIX systems, `SIGQUIT` will start a graceful shutdown and `SIGTERM` or `SIGINT` will start a
/// forced shutdown. On Windows, a CTRL-C signal will start a forced shutdown.
/// forced shutdown. On Windows, a Ctrl-C signal will start a forced shutdown.
///
/// A graceful shutdown will wait for all workers to stop first.
///
/// # Examples
/// The following is a TCP echo server. Test using `telnet 127.0.0.1 8080`.
///
/// ```no_run
/// use std::io;
///
/// use actix_rt::net::TcpStream;
/// use actix_server::Server;
/// use actix_service::{fn_service, ServiceFactoryExt as _};
/// use bytes::BytesMut;
/// use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
///
/// #[actix_rt::main]
/// async fn main() -> io::Result<()> {
/// let bind_addr = ("127.0.0.1", 8080);
///
/// Server::build()
/// .bind("echo", bind_addr, move || {
/// fn_service(move |mut stream: TcpStream| {
/// async move {
/// let mut size = 0;
/// let mut buf = BytesMut::new();
///
/// loop {
/// match stream.read_buf(&mut buf).await {
/// // end of stream; bail from loop
/// Ok(0) => break,
///
/// // write bytes back to stream
/// Ok(bytes_read) => {
/// stream.write_all(&buf[size..]).await.unwrap();
/// size += bytes_read;
/// }
///
/// Err(err) => {
/// eprintln!("Stream Error: {:?}", err);
/// return Err(());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// })
/// .map_err(|err| eprintln!("Service Error: {:?}", err))
/// })?
/// .run()
/// .await
/// }
/// ```
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub enum Server {
Server(ServerInner),
@ -59,7 +120,7 @@ pub enum Server {
}
impl Server {
/// Start server building process.
/// Create server build.
pub fn build() -> ServerBuilder {
ServerBuilder::default()
}
@ -169,54 +230,6 @@ impl Future for Server {
}
}
/// Server handle.
#[derive(Debug, Clone)]
pub struct ServerHandle {
tx_cmd: UnboundedSender<ServerCommand>,
}
impl ServerHandle {
pub(crate) fn new(tx_cmd: UnboundedSender<ServerCommand>) -> Self {
ServerHandle { tx_cmd }
}
pub(crate) fn worker_faulted(&self, idx: usize) {
let _ = self.tx_cmd.send(ServerCommand::WorkerFaulted(idx));
}
/// Pause accepting incoming connections.
///
/// May drop socket pending connection. All open connections remain active.
pub fn pause(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.tx_cmd.send(ServerCommand::Pause(tx));
async {
let _ = rx.await;
}
}
/// Resume accepting incoming connections.
pub fn resume(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.tx_cmd.send(ServerCommand::Resume(tx));
async {
let _ = rx.await;
}
}
/// Stop incoming connection processing, stop all workers and exit.
pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.tx_cmd.send(ServerCommand::Stop {
graceful,
completion: Some(tx),
});
async {
let _ = rx.await;
}
}
}
pub struct ServerInner {
worker_handles: Vec<WorkerHandleServer>,
worker_config: ServerWorkerConfig,