diff --git a/actix-server/examples/tcp-echo.rs b/actix-server/examples/tcp-echo.rs index 01f58479..930ebf0a 100644 --- a/actix-server/examples/tcp-echo.rs +++ b/actix-server/examples/tcp-echo.rs @@ -23,7 +23,7 @@ use actix_service::{fn_service, ServiceFactoryExt as _}; use bytes::BytesMut; use futures_util::future::ok; use log::{error, info}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; async fn run() -> io::Result<()> { env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); @@ -83,7 +83,6 @@ async fn run() -> io::Result<()> { }) })? .workers(1) - // .system_exit() .run() .await } diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 3bca4305..bdeb6004 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -6,11 +6,10 @@ use mio::{Interest, Poll, Token as MioToken}; use crate::{ availability::Availability, - server::ServerHandle, socket::MioListener, waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}, worker::{Conn, ServerWorker, WorkerHandleAccept, WorkerHandleServer}, - ServerBuilder, + ServerBuilder, ServerHandle, }; const TIMEOUT_DURATION_ON_ERROR: Duration = Duration::from_millis(510); diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index f0eef5c0..0d4abe78 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -14,7 +14,7 @@ use crate::{ Server, }; -/// Server builder. +/// [Server] builder. pub struct ServerBuilder { pub(crate) threads: usize, pub(crate) token: usize, diff --git a/actix-server/src/handle.rs b/actix-server/src/handle.rs new file mode 100644 index 00000000..55f2bb25 --- /dev/null +++ b/actix-server/src/handle.rs @@ -0,0 +1,53 @@ +use std::future::Future; + +use tokio::sync::{mpsc::UnboundedSender, oneshot}; + +use crate::server::ServerCommand; + +/// Server handle. +#[derive(Debug, Clone)] +pub struct ServerHandle { + tx_cmd: UnboundedSender, +} + +impl ServerHandle { + pub(crate) fn new(tx_cmd: UnboundedSender) -> Self { + ServerHandle { tx_cmd } + } + + pub(crate) fn worker_faulted(&self, idx: usize) { + let _ = self.tx_cmd.send(ServerCommand::WorkerFaulted(idx)); + } + + /// Pause accepting incoming connections. + /// + /// May drop socket pending connection. All open connections remain active. + pub fn pause(&self) -> impl Future { + let (tx, rx) = oneshot::channel(); + let _ = self.tx_cmd.send(ServerCommand::Pause(tx)); + async { + let _ = rx.await; + } + } + + /// Resume accepting incoming connections. + pub fn resume(&self) -> impl Future { + let (tx, rx) = oneshot::channel(); + let _ = self.tx_cmd.send(ServerCommand::Resume(tx)); + async { + let _ = rx.await; + } + } + + /// Stop incoming connection processing, stop all workers and exit. + pub fn stop(&self, graceful: bool) -> impl Future { + let (tx, rx) = oneshot::channel(); + let _ = self.tx_cmd.send(ServerCommand::Stop { + graceful, + completion: Some(tx), + }); + async { + let _ = rx.await; + } + } +} diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index 86db82d2..6ac8ba7e 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -7,6 +7,7 @@ mod accept; mod availability; mod builder; +mod handle; mod join_all; mod server; mod service; @@ -17,7 +18,8 @@ mod waker_queue; mod worker; pub use self::builder::ServerBuilder; -pub use self::server::{Server, ServerHandle}; +pub use self::handle::ServerHandle; +pub use self::server::Server; pub use self::service::ServiceFactory; pub use self::test_server::TestServer; diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index 58ee647f..f1edcb23 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -22,6 +22,7 @@ use crate::{ signals::{Signal, Signals}, waker_queue::{WakerInterest, WakerQueue}, worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer}, + ServerHandle, }; #[derive(Debug)] @@ -45,13 +46,73 @@ pub(crate) enum ServerCommand { }, } -/// Server +/// General purpose TCP server that runs services receiving Tokio `TcpStream`s. +/// +/// Handles creating worker threads, restarting faulted workers, connection accepting, and +/// back-pressure logic. +/// +/// Creates a worker per CPU core (or the number specified in [`ServerBuilder::workers`]) and +/// distributes connections with a round-robin strategy. +/// +/// The [Server] must be awaited to process stop commands and listen for OS signals. It will resolve +/// when the server has fully shut down. /// /// # Shutdown Signals /// On UNIX systems, `SIGQUIT` will start a graceful shutdown and `SIGTERM` or `SIGINT` will start a -/// forced shutdown. On Windows, a CTRL-C signal will start a forced shutdown. +/// forced shutdown. On Windows, a Ctrl-C signal will start a forced shutdown. /// /// A graceful shutdown will wait for all workers to stop first. +/// +/// # Examples +/// The following is a TCP echo server. Test using `telnet 127.0.0.1 8080`. +/// +/// ```no_run +/// use std::io; +/// +/// use actix_rt::net::TcpStream; +/// use actix_server::Server; +/// use actix_service::{fn_service, ServiceFactoryExt as _}; +/// use bytes::BytesMut; +/// use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; +/// +/// #[actix_rt::main] +/// async fn main() -> io::Result<()> { +/// let bind_addr = ("127.0.0.1", 8080); +/// +/// Server::build() +/// .bind("echo", bind_addr, move || { +/// fn_service(move |mut stream: TcpStream| { +/// async move { +/// let mut size = 0; +/// let mut buf = BytesMut::new(); +/// +/// loop { +/// match stream.read_buf(&mut buf).await { +/// // end of stream; bail from loop +/// Ok(0) => break, +/// +/// // write bytes back to stream +/// Ok(bytes_read) => { +/// stream.write_all(&buf[size..]).await.unwrap(); +/// size += bytes_read; +/// } +/// +/// Err(err) => { +/// eprintln!("Stream Error: {:?}", err); +/// return Err(()); +/// } +/// } +/// } +/// +/// Ok(()) +/// } +/// }) +/// .map_err(|err| eprintln!("Service Error: {:?}", err)) +/// })? +/// .run() +/// .await +/// } +/// ``` #[must_use = "futures do nothing unless you `.await` or poll them"] pub enum Server { Server(ServerInner), @@ -59,7 +120,7 @@ pub enum Server { } impl Server { - /// Start server building process. + /// Create server build. pub fn build() -> ServerBuilder { ServerBuilder::default() } @@ -169,54 +230,6 @@ impl Future for Server { } } -/// Server handle. -#[derive(Debug, Clone)] -pub struct ServerHandle { - tx_cmd: UnboundedSender, -} - -impl ServerHandle { - pub(crate) fn new(tx_cmd: UnboundedSender) -> Self { - ServerHandle { tx_cmd } - } - - pub(crate) fn worker_faulted(&self, idx: usize) { - let _ = self.tx_cmd.send(ServerCommand::WorkerFaulted(idx)); - } - - /// Pause accepting incoming connections. - /// - /// May drop socket pending connection. All open connections remain active. - pub fn pause(&self) -> impl Future { - let (tx, rx) = oneshot::channel(); - let _ = self.tx_cmd.send(ServerCommand::Pause(tx)); - async { - let _ = rx.await; - } - } - - /// Resume accepting incoming connections. - pub fn resume(&self) -> impl Future { - let (tx, rx) = oneshot::channel(); - let _ = self.tx_cmd.send(ServerCommand::Resume(tx)); - async { - let _ = rx.await; - } - } - - /// Stop incoming connection processing, stop all workers and exit. - pub fn stop(&self, graceful: bool) -> impl Future { - let (tx, rx) = oneshot::channel(); - let _ = self.tx_cmd.send(ServerCommand::Stop { - graceful, - completion: Some(tx), - }); - async { - let _ = rx.await; - } - } -} - pub struct ServerInner { worker_handles: Vec, worker_config: ServerWorkerConfig,