diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index f3fcb5a8..ad76a74d 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -5,11 +5,12 @@ use log::{error, info}; use mio::{Interest, Poll, Token as MioToken}; use slab::Slab; -use crate::server_handle::ServerHandle; +use crate::builder::ServerBuilder; +use crate::server::ServerHandle; use crate::socket::{MioListener, SocketAddr}; use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; use crate::worker::{Conn, ServerWorker, WorkerAvailability, WorkerHandle}; -use crate::{ServerBuilder, Token}; +use crate::Token; const DUR_ON_ERR: Duration = Duration::from_millis(500); @@ -86,7 +87,7 @@ impl Accept { thread::Builder::new() .name("actix-server acceptor".to_owned()) .spawn(move || accept.poll_with(sockets)) - .unwrap(); + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; // return waker and worker handle clones to server builder. Ok((waker_queue, handles_clone)) diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 2da34703..ef01f5a7 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -1,26 +1,16 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; +use std::io; use std::time::Duration; -use std::{io, mem}; use actix_rt::net::TcpStream; -use actix_rt::time::sleep; -use actix_rt::System; -use futures_core::future::BoxFuture; -use log::{error, info}; +use log::info; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; -use tokio::sync::oneshot; -use crate::accept::Accept; use crate::config::{ConfiguredService, ServiceConfig}; -use crate::server_handle::{ServerCommand, ServerHandle}; +use crate::server::{Server, ServerCommand}; use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; -use crate::signals::{Signal, Signals}; use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; use crate::socket::{MioTcpListener, MioTcpSocket}; -use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::worker::{self, ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandle}; +use crate::worker::{self, ServerWorkerConfig}; use crate::Token; /// Server builder @@ -29,11 +19,11 @@ pub struct ServerBuilder { token: Token, backlog: u32, pub(super) services: Vec>, - sockets: Vec<(Token, String, MioListener)>, - exit: bool, - no_signals: bool, + pub(super) sockets: Vec<(Token, String, MioListener)>, + pub(super) exit: bool, + pub(super) no_signals: bool, pub(super) cmd_tx: UnboundedSender, - cmd_rx: UnboundedReceiver, + pub(super) cmd_rx: UnboundedReceiver, pub(super) worker_config: ServerWorkerConfig, } @@ -264,239 +254,12 @@ impl ServerBuilder { } /// Starts processing incoming connections and return server controller. - pub fn run(mut self) -> Server { + pub fn run(self) -> Server { if self.sockets.is_empty() { panic!("Server should have at least one bound socket"); } else { info!("Starting {} workers", self.threads); - - let sockets = mem::take(&mut self.sockets) - .into_iter() - .map(|t| { - info!("Starting \"{}\" service on {}", t.1, t.2); - (t.0, t.2) - }) - .collect(); - - // start accept thread. return waker_queue and worker handles. - let (waker_queue, handles) = Accept::start(sockets, &self) - // TODO: include error to Server type and poll return it in Future. - .unwrap_or_else(|e| panic!("Can not start Accept: {}", e)); - - // construct signals future. - let signals = if !self.no_signals { - Some(Signals::new()) - } else { - None - }; - - Server { - cmd_tx: self.cmd_tx, - cmd_rx: self.cmd_rx, - handles, - services: self.services, - notify: Vec::new(), - exit: self.exit, - worker_config: self.worker_config, - signals, - on_stop_task: None, - waker_queue, - } - } - } -} - -/// When awaited or spawned would listen to signal and message from [ServerHandle](crate::server::ServerHandle). -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Server { - cmd_tx: UnboundedSender, - cmd_rx: UnboundedReceiver, - handles: Vec<(usize, WorkerHandle)>, - services: Vec>, - notify: Vec>, - exit: bool, - worker_config: ServerWorkerConfig, - signals: Option, - on_stop_task: Option>, - waker_queue: WakerQueue, -} - -impl Server { - /// Obtain a Handle for ServerFuture that can be used to change state of actix server. - /// - /// See [ServerHandle](crate::server::ServerHandle) for usage. - pub fn handle(&self) -> ServerHandle { - ServerHandle::new(self.cmd_tx.clone()) - } - - fn handle_cmd(&mut self, item: ServerCommand) -> Option> { - match item { - ServerCommand::Pause(tx) => { - self.waker_queue.wake(WakerInterest::Pause); - let _ = tx.send(()); - None - } - ServerCommand::Resume(tx) => { - self.waker_queue.wake(WakerInterest::Resume); - let _ = tx.send(()); - None - } - ServerCommand::Signal(sig) => { - // Signals support - // Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system - match sig { - Signal::Int => { - info!("SIGINT received, exiting"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { - graceful: false, - completion: None, - }) - } - Signal::Term => { - info!("SIGTERM received, stopping"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { - graceful: true, - completion: None, - }) - } - Signal::Quit => { - info!("SIGQUIT received, exiting"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { - graceful: false, - completion: None, - }) - } - _ => None, - } - } - ServerCommand::Notify(tx) => { - self.notify.push(tx); - None - } - ServerCommand::Stop { - graceful, - completion, - } => { - let exit = self.exit; - - // stop accept thread - self.waker_queue.wake(WakerInterest::Stop); - let notify = std::mem::take(&mut self.notify); - - // stop workers - if !self.handles.is_empty() && graceful { - let iter = self - .handles - .iter() - .map(move |worker| worker.1.stop(graceful)) - .collect::>(); - - // TODO: this async block can return io::Error. - Some(Box::pin(async move { - for handle in iter { - let _ = handle.await; - } - if let Some(tx) = completion { - let _ = tx.send(()); - } - for tx in notify { - let _ = tx.send(()); - } - if exit { - sleep(Duration::from_millis(300)).await; - System::try_current().as_ref().map(System::stop); - } - })) - } else { - // we need to stop system if server was spawned - let exit = self.exit; - // TODO: this async block can return io::Error. - Some(Box::pin(async move { - if exit { - sleep(Duration::from_millis(300)).await; - System::try_current().as_ref().map(System::stop); - } - if let Some(tx) = completion { - let _ = tx.send(()); - } - for tx in notify { - let _ = tx.send(()); - } - })) - } - } - ServerCommand::WorkerFaulted(idx) => { - let mut found = false; - for i in 0..self.handles.len() { - if self.handles[i].0 == idx { - self.handles.swap_remove(i); - found = true; - break; - } - } - - if found { - error!("Worker has died {:?}, restarting", idx); - - let mut new_idx = self.handles.len(); - 'found: loop { - for i in 0..self.handles.len() { - if self.handles[i].0 == new_idx { - new_idx += 1; - continue 'found; - } - } - break; - } - - let availability = WorkerAvailability::new(self.waker_queue.clone()); - let factories = self.services.iter().map(|v| v.clone_factory()).collect(); - let handle = ServerWorker::start( - new_idx, - factories, - availability, - self.worker_config, - ); - - self.handles.push((new_idx, handle.clone())); - self.waker_queue.wake(WakerInterest::Worker(handle)); - } - None - } - } - } -} - -impl Future for Server { - type Output = io::Result<()>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().get_mut(); - - // poll signals first. remove it on resolve. - if let Some(ref mut signals) = this.signals { - if let Poll::Ready(signal) = Pin::new(signals).poll(cx) { - this.on_stop_task = this.handle_cmd(ServerCommand::Signal(signal)); - this.signals = None; - } - } - - // actively poll command channel and handle command. - loop { - // got on stop task. resolve it exclusively and exit. - if let Some(ref mut fut) = this.on_stop_task { - return fut.as_mut().poll(cx).map(|_| Ok(())); - } - - match Pin::new(&mut this.cmd_rx).poll_recv(cx) { - Poll::Ready(Some(it)) => { - this.on_stop_task = this.handle_cmd(it); - } - _ => return Poll::Pending, - } + Server::new(self) } } } diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index 8a87b4b3..fcb4ed84 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -7,7 +7,7 @@ mod accept; mod builder; mod config; -mod server_handle; +mod server; mod service; mod signals; mod socket; @@ -15,9 +15,9 @@ mod test_server; mod waker_queue; mod worker; -pub use self::builder::{Server, ServerBuilder}; +pub use self::builder::ServerBuilder; pub use self::config::{ServiceConfig, ServiceRuntime}; -pub use self::server_handle::ServerHandle; +pub use self::server::{Server, ServerHandle}; pub use self::service::ServiceFactory; pub use self::test_server::TestServer; diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs new file mode 100644 index 00000000..38d218b8 --- /dev/null +++ b/actix-server/src/server.rs @@ -0,0 +1,346 @@ +use std::{ + future::Future, + io, mem, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +use actix_rt::{time::sleep, System}; +use futures_core::future::BoxFuture; +use log::{error, info}; +use tokio::sync::{ + mpsc::{UnboundedReceiver, UnboundedSender}, + oneshot, +}; + +use crate::accept::Accept; +use crate::builder::ServerBuilder; +use crate::service::InternalServiceFactory; +use crate::signals::{Signal, Signals}; +use crate::waker_queue::{WakerInterest, WakerQueue}; +use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandle}; + +/// When awaited or spawned would listen to signal and message from [ServerHandle](ServerHandle). +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Server { + cmd_tx: UnboundedSender, + cmd_rx: UnboundedReceiver, + handles: Vec<(usize, WorkerHandle)>, + services: Vec>, + notify: Vec>, + exit: bool, + worker_config: ServerWorkerConfig, + signals: Option, + on_stop_task: Option>, + waker_queue: WakerQueue, +} + +impl Server { + pub(crate) fn new(mut builder: ServerBuilder) -> Self { + let sockets = mem::take(&mut builder.sockets) + .into_iter() + .map(|(token, name, lst)| { + info!("Starting \"{}\" service on {}", name, lst); + (token, lst) + }) + .collect(); + + // start accept thread. return waker_queue and worker handles. + let (waker_queue, handles) = Accept::start(sockets, &builder) + // TODO: include error to Server type and poll return it in Future. + .unwrap_or_else(|e| panic!("Can not start Accept: {}", e)); + + // construct signals future. + let signals = if !builder.no_signals { + Some(Signals::new()) + } else { + None + }; + + Self { + cmd_tx: builder.cmd_tx, + cmd_rx: builder.cmd_rx, + handles, + services: builder.services, + notify: Vec::new(), + exit: builder.exit, + worker_config: builder.worker_config, + signals, + on_stop_task: None, + waker_queue, + } + } + + /// Obtain a Handle for ServerFuture that can be used to change state of actix server. + /// + /// See [ServerHandle](ServerHandle) for usage. + pub fn handle(&self) -> ServerHandle { + ServerHandle::new(self.cmd_tx.clone()) + } + + fn handle_cmd(&mut self, item: ServerCommand) -> Option> { + match item { + ServerCommand::Pause(tx) => { + self.waker_queue.wake(WakerInterest::Pause); + let _ = tx.send(()); + None + } + ServerCommand::Resume(tx) => { + self.waker_queue.wake(WakerInterest::Resume); + let _ = tx.send(()); + None + } + ServerCommand::Signal(sig) => { + // Signals support + // Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system + match sig { + Signal::Int => { + info!("SIGINT received, exiting"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: false, + completion: None, + }) + } + Signal::Term => { + info!("SIGTERM received, stopping"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: true, + completion: None, + }) + } + Signal::Quit => { + info!("SIGQUIT received, exiting"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: false, + completion: None, + }) + } + _ => None, + } + } + ServerCommand::Notify(tx) => { + self.notify.push(tx); + None + } + ServerCommand::Stop { + graceful, + completion, + } => { + let exit = self.exit; + + // stop accept thread + self.waker_queue.wake(WakerInterest::Stop); + let notify = std::mem::take(&mut self.notify); + + // stop workers + if !self.handles.is_empty() && graceful { + let iter = self + .handles + .iter() + .map(move |worker| worker.1.stop(graceful)) + .collect::>(); + + // TODO: this async block can return io::Error. + Some(Box::pin(async move { + for handle in iter { + let _ = handle.await; + } + if let Some(tx) = completion { + let _ = tx.send(()); + } + for tx in notify { + let _ = tx.send(()); + } + if exit { + sleep(Duration::from_millis(300)).await; + System::try_current().as_ref().map(System::stop); + } + })) + } else { + // we need to stop system if server was spawned + // TODO: this async block can return io::Error. + Some(Box::pin(async move { + if exit { + sleep(Duration::from_millis(300)).await; + System::try_current().as_ref().map(System::stop); + } + if let Some(tx) = completion { + let _ = tx.send(()); + } + for tx in notify { + let _ = tx.send(()); + } + })) + } + } + ServerCommand::WorkerFaulted(idx) => { + let mut found = false; + for i in 0..self.handles.len() { + if self.handles[i].0 == idx { + self.handles.swap_remove(i); + found = true; + break; + } + } + + if found { + error!("Worker has died {:?}, restarting", idx); + + let mut new_idx = self.handles.len(); + 'found: loop { + for i in 0..self.handles.len() { + if self.handles[i].0 == new_idx { + new_idx += 1; + continue 'found; + } + } + break; + } + + let availability = WorkerAvailability::new(self.waker_queue.clone()); + let factories = self.services.iter().map(|v| v.clone_factory()).collect(); + let handle = ServerWorker::start( + new_idx, + factories, + availability, + self.worker_config, + ); + + self.handles.push((new_idx, handle.clone())); + self.waker_queue.wake(WakerInterest::Worker(handle)); + } + None + } + } + } +} + +impl Future for Server { + type Output = io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.as_mut().get_mut(); + + // poll signals first. remove it on resolve. + if let Some(ref mut signals) = this.signals { + if let Poll::Ready(signal) = Pin::new(signals).poll(cx) { + this.on_stop_task = this.handle_cmd(ServerCommand::Signal(signal)); + this.signals = None; + } + } + + // actively poll command channel and handle command. + loop { + // got on stop task. resolve it exclusively and exit. + if let Some(ref mut fut) = this.on_stop_task { + return fut.as_mut().poll(cx).map(|_| Ok(())); + } + + match Pin::new(&mut this.cmd_rx).poll_recv(cx) { + Poll::Ready(Some(it)) => { + this.on_stop_task = this.handle_cmd(it); + } + _ => return Poll::Pending, + } + } + } +} + +#[derive(Debug)] +pub(crate) enum ServerCommand { + WorkerFaulted(usize), + Pause(oneshot::Sender<()>), + Resume(oneshot::Sender<()>), + Signal(Signal), + /// Whether to try and shut down gracefully + Stop { + graceful: bool, + completion: Option>, + }, + /// Notify of server stop + Notify(oneshot::Sender<()>), +} + +#[derive(Debug)] +pub struct ServerHandle( + UnboundedSender, + Option>, +); + +impl ServerHandle { + pub(crate) fn new(tx: UnboundedSender) -> Self { + ServerHandle(tx, None) + } + + /// Start server building process + pub fn build() -> ServerBuilder { + ServerBuilder::default() + } + + pub(crate) fn worker_faulted(&self, idx: usize) { + let _ = self.0.send(ServerCommand::WorkerFaulted(idx)); + } + + /// Pause accepting incoming connections + /// + /// If socket contains some pending connection, they might be dropped. + /// All opened connection remains active. + pub fn pause(&self) -> impl Future { + let (tx, rx) = oneshot::channel(); + let _ = self.0.send(ServerCommand::Pause(tx)); + async { + let _ = rx.await; + } + } + + /// Resume accepting incoming connections + pub fn resume(&self) -> impl Future { + let (tx, rx) = oneshot::channel(); + let _ = self.0.send(ServerCommand::Resume(tx)); + async { + let _ = rx.await; + } + } + + /// Stop incoming connection processing, stop all workers and exit. + /// + /// If server starts with `spawn()` method, then spawned thread get terminated. + pub fn stop(&self, graceful: bool) -> impl Future { + let (tx, rx) = oneshot::channel(); + let _ = self.0.send(ServerCommand::Stop { + graceful, + completion: Some(tx), + }); + async { + let _ = rx.await; + } + } +} + +impl Clone for ServerHandle { + fn clone(&self) -> Self { + Self(self.0.clone(), None) + } +} + +impl Future for ServerHandle { + type Output = io::Result<()>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + if this.1.is_none() { + let (tx, rx) = oneshot::channel(); + if this.0.send(ServerCommand::Notify(tx)).is_err() { + return Poll::Ready(Ok(())); + } + this.1 = Some(rx); + } + + Pin::new(this.1.as_mut().unwrap()).poll(cx).map(|_| Ok(())) + } +} diff --git a/actix-server/src/server_handle.rs b/actix-server/src/server_handle.rs deleted file mode 100644 index 679dc1a1..00000000 --- a/actix-server/src/server_handle.rs +++ /dev/null @@ -1,105 +0,0 @@ -use std::future::Future; -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use tokio::sync::mpsc::UnboundedSender; -use tokio::sync::oneshot; - -use crate::builder::ServerBuilder; -use crate::signals::Signal; - -#[derive(Debug)] -pub(crate) enum ServerCommand { - WorkerFaulted(usize), - Pause(oneshot::Sender<()>), - Resume(oneshot::Sender<()>), - Signal(Signal), - /// Whether to try and shut down gracefully - Stop { - graceful: bool, - completion: Option>, - }, - /// Notify of server stop - Notify(oneshot::Sender<()>), -} - -#[derive(Debug)] -pub struct ServerHandle( - UnboundedSender, - Option>, -); - -impl ServerHandle { - pub(crate) fn new(tx: UnboundedSender) -> Self { - ServerHandle(tx, None) - } - - /// Start server building process - pub fn build() -> ServerBuilder { - ServerBuilder::default() - } - - pub(crate) fn worker_faulted(&self, idx: usize) { - let _ = self.0.send(ServerCommand::WorkerFaulted(idx)); - } - - /// Pause accepting incoming connections - /// - /// If socket contains some pending connection, they might be dropped. - /// All opened connection remains active. - pub fn pause(&self) -> impl Future { - let (tx, rx) = oneshot::channel(); - let _ = self.0.send(ServerCommand::Pause(tx)); - async { - let _ = rx.await; - } - } - - /// Resume accepting incoming connections - pub fn resume(&self) -> impl Future { - let (tx, rx) = oneshot::channel(); - let _ = self.0.send(ServerCommand::Resume(tx)); - async { - let _ = rx.await; - } - } - - /// Stop incoming connection processing, stop all workers and exit. - /// - /// If server starts with `spawn()` method, then spawned thread get terminated. - pub fn stop(&self, graceful: bool) -> impl Future { - let (tx, rx) = oneshot::channel(); - let _ = self.0.send(ServerCommand::Stop { - graceful, - completion: Some(tx), - }); - async { - let _ = rx.await; - } - } -} - -impl Clone for ServerHandle { - fn clone(&self) -> Self { - Self(self.0.clone(), None) - } -} - -impl Future for ServerHandle { - type Output = io::Result<()>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - if this.1.is_none() { - let (tx, rx) = oneshot::channel(); - if this.0.send(ServerCommand::Notify(tx)).is_err() { - return Poll::Ready(Ok(())); - } - this.1 = Some(rx); - } - - Pin::new(this.1.as_mut().unwrap()).poll(cx).map(|_| Ok(())) - } -}