diff --git a/actix-server/examples/tcp-echo.rs b/actix-server/examples/tcp-echo.rs index 5e2b99ad..3529fab8 100644 --- a/actix-server/examples/tcp-echo.rs +++ b/actix-server/examples/tcp-echo.rs @@ -88,15 +88,9 @@ async fn run() -> io::Result<()> { .await } -fn main() -> io::Result<()> { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - - let ls = tokio::task::LocalSet::new(); - rt.block_on(ls.run_until(run()))?; - +#[tokio::main] +async fn main() -> io::Result<()> { + run().await?; Ok(()) } diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 07192f45..1f2ecafa 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -1,17 +1,20 @@ use std::time::Duration; use std::{io, thread}; -use actix_rt::{ - time::{sleep, Instant}, - System, -}; +use actix_rt::time::Instant; +use actix_rt::{time::sleep, System}; use log::{debug, error, info}; use mio::{Interest, Poll, Token as MioToken}; -use crate::server::ServerHandle; -use crate::socket::MioListener; -use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; -use crate::worker::{Conn, WorkerHandleAccept}; +use crate::worker::ServerWorker; +use crate::{ + availability::Availability, + server::ServerHandle, + socket::MioListener, + waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}, + worker::{Conn, WorkerHandleAccept, WorkerHandleServer}, + ServerBuilder, +}; struct ServerSocketInfo { token: usize, @@ -20,59 +23,13 @@ struct ServerSocketInfo { /// Timeout is used to mark the deadline when this socket's listener should be registered again /// after an error. - timeout: Option, -} - -/// Accept loop would live with `ServerBuilder`. -/// -/// It's tasked with construct `Poll` instance and `WakerQueue` which would be distributed to -/// `Accept` and `Worker`. -/// -/// It would also listen to `ServerCommand` and push interests to `WakerQueue`. -pub(crate) struct AcceptLoop { - srv: Option, - poll: Option, - waker: WakerQueue, -} - -impl AcceptLoop { - pub fn new(srv: ServerHandle) -> Self { - let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e)); - let waker = WakerQueue::new(poll.registry()) - .unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e)); - - Self { - srv: Some(srv), - poll: Some(poll), - waker, - } - } - - pub(crate) fn waker_owned(&self) -> WakerQueue { - self.waker.clone() - } - - pub fn wake(&self, i: WakerInterest) { - self.waker.wake(i); - } - - pub(crate) fn start( - &mut self, - socks: Vec<(usize, MioListener)>, - handles: Vec, - ) { - let srv = self.srv.take().expect("Can not re-use AcceptInfo"); - let poll = self.poll.take().unwrap(); - let waker = self.waker.clone(); - - Accept::start(poll, waker, socks, srv, handles).expect("accept failed to start"); - } + timeout: Option, } /// poll instance of the server. -struct Accept { +pub(crate) struct Accept { poll: Poll, - waker: WakerQueue, + waker_queue: WakerQueue, handles: Vec, srv: ServerHandle, next: usize, @@ -80,111 +37,58 @@ struct Accept { paused: bool, } -/// Array of u128 with every bit as marker for a worker handle's availability. 
-#[derive(Debug, Default)]
-struct Availability([u128; 4]);
-
-impl Availability {
-    /// Check if any worker handle is available
-    #[inline(always)]
-    fn available(&self) -> bool {
-        self.0.iter().any(|a| *a != 0)
-    }
-
-    /// Check if worker handle is available by index
-    #[inline(always)]
-    fn get_available(&self, idx: usize) -> bool {
-        let (offset, idx) = Self::offset(idx);
-
-        self.0[offset] & (1 << idx as u128) != 0
-    }
-
-    /// Set worker handle available state by index.
-    fn set_available(&mut self, idx: usize, avail: bool) {
-        let (offset, idx) = Self::offset(idx);
-
-        let off = 1 << idx as u128;
-        if avail {
-            self.0[offset] |= off;
-        } else {
-            self.0[offset] &= !off
-        }
-    }
-
-    /// Set all worker handle to available state.
-    /// This would result in a re-check on all workers' availability.
-    fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) {
-        handles.iter().for_each(|handle| {
-            self.set_available(handle.idx(), true);
-        })
-    }
-
-    /// Get offset and adjusted index of given worker handle index.
-    fn offset(idx: usize) -> (usize, usize) {
-        if idx < 128 {
-            (0, idx)
-        } else if idx < 128 * 2 {
-            (1, idx - 128)
-        } else if idx < 128 * 3 {
-            (2, idx - 128 * 2)
-        } else if idx < 128 * 4 {
-            (3, idx - 128 * 3)
-        } else {
-            panic!("Max WorkerHandle count is 512")
-        }
-    }
-}
-
-/// This function defines errors that are per-connection. Which basically
-/// means that if we get this error from `accept()` system call it means
-/// next connection might be ready to be accepted.
-///
-/// All other errors will incur a timeout before next `accept()` is performed.
-/// The timeout is useful to handle resource exhaustion errors like ENFILE
-/// and EMFILE. Otherwise, could enter into tight loop.
-fn connection_error(e: &io::Error) -> bool {
-    e.kind() == io::ErrorKind::ConnectionRefused
-        || e.kind() == io::ErrorKind::ConnectionAborted
-        || e.kind() == io::ErrorKind::ConnectionReset
-}
-
 impl Accept {
     pub(crate) fn start(
-        poll: Poll,
-        waker: WakerQueue,
-        socks: Vec<(usize, MioListener)>,
-        srv: ServerHandle,
-        handles: Vec<WorkerHandleAccept>,
-    ) -> io::Result<()> {
-        // Accept runs in its own thread and might spawn additional futures to current system
-        let sys = System::try_current();
+        sockets: Vec<(usize, MioListener)>,
+        builder: &ServerBuilder,
+    ) -> io::Result<(WakerQueue, Vec<WorkerHandleServer>)> {
+        let handle_server = ServerHandle::new(builder.cmd_tx.clone());
 
-        let (mut accept, mut sockets) =
-            Accept::new_with_sockets(poll, waker, socks, handles, srv)?;
+        // construct poll instance and its waker
+        let poll = Poll::new()?;
+        let waker_queue = WakerQueue::new(poll.registry())?;
+
+        // start workers and collect handles
+        let (handles_accept, handles_server) = (0..builder.threads)
+            .map(|idx| {
+                // clone service factories
+                let factories = builder
+                    .factories
+                    .iter()
+                    .map(|f| f.clone_factory())
+                    .collect::<Vec<_>>();
+
+                // start worker using service factories
+                ServerWorker::start(idx, factories, waker_queue.clone(), builder.worker_config)
+            })
+            .collect::<io::Result<Vec<_>>>()?
+ .into_iter() + .unzip(); + + let (mut accept, mut sockets) = Accept::new_with_sockets( + poll, + waker_queue.clone(), + sockets, + handles_accept, + handle_server, + )?; thread::Builder::new() - .name("actix-server accept loop".to_owned()) - .spawn(move || { - // forward existing actix system context - if let Some(sys) = sys { - System::set_current(sys); - } + .name("actix-server acceptor".to_owned()) + .spawn(move || accept.poll_with(&mut sockets)) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; - accept.poll_with(&mut sockets); - }) - .unwrap(); - - Ok(()) + Ok((waker_queue, handles_server)) } fn new_with_sockets( poll: Poll, - waker: WakerQueue, - socks: Vec<(usize, MioListener)>, - handles: Vec, - srv: ServerHandle, + waker_queue: WakerQueue, + sockets: Vec<(usize, MioListener)>, + accept_handles: Vec, + server_handle: ServerHandle, ) -> io::Result<(Accept, Box<[ServerSocketInfo]>)> { - let sockets = socks + let sockets = sockets .into_iter() .map(|(token, mut lst)| { // Start listening for incoming connections @@ -202,13 +106,13 @@ impl Accept { let mut avail = Availability::default(); // Assume all handles are avail at construct time. - avail.set_available_all(&handles); + avail.set_available_all(&accept_handles); let accept = Accept { poll, - waker, - handles, - srv, + waker_queue, + handles: accept_handles, + srv: server_handle, next: 0, avail, paused: false, @@ -217,8 +121,9 @@ impl Accept { Ok((accept, sockets)) } + /// blocking wait for readiness events triggered by mio fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) { - let mut events = mio::Events::with_capacity(128); + let mut events = mio::Events::with_capacity(256); loop { if let Err(e) = self.poll.poll(&mut events, None) { @@ -254,7 +159,7 @@ impl Accept { loop { // take guard with every iteration so no new interest can be added // until the current task is done. - let mut guard = self.waker.guard(); + let mut guard = self.waker_queue.guard(); match guard.pop_front() { // worker notify it becomes available. Some(WakerInterest::WorkerAvailable(idx)) => { @@ -325,6 +230,7 @@ impl Accept { fn process_timer(&self, sockets: &mut [ServerSocketInfo]) { let now = Instant::now(); + sockets .iter_mut() // Only sockets that had an associated timeout were deregistered. @@ -387,12 +293,12 @@ impl Accept { fn deregister_all(&self, sockets: &mut [ServerSocketInfo]) { // This is a best effort implementation with following limitation: // - // Every ServerSocketInfo with associate timeout will be skipped and it's timeout - // is removed in the process. + // Every ServerSocketInfo with associated timeout will be skipped and it's timeout is + // removed in the process. // - // Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short - // gap (less than 500ms) would cause all timing out ServerSocketInfos be reregistered - // before expected timing. + // Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short gap + // (less than 500ms) would cause all timing out ServerSocketInfos be re-registered before + // expected timing. sockets .iter_mut() // Take all timeout. 
@@ -483,7 +389,7 @@ impl Accept { info.timeout = Some(Instant::now() + Duration::from_millis(500)); // after the sleep a Timer interest is sent to Accept Poll - let waker = self.waker.clone(); + let waker = self.waker_queue.clone(); match System::try_current() { Some(sys) => { @@ -539,67 +445,14 @@ impl Accept { } } -#[cfg(test)] -mod test { - use super::Availability; - - fn single(aval: &mut Availability, idx: usize) { - aval.set_available(idx, true); - assert!(aval.available()); - - aval.set_available(idx, true); - - aval.set_available(idx, false); - assert!(!aval.available()); - - aval.set_available(idx, false); - assert!(!aval.available()); - } - - fn multi(aval: &mut Availability, mut idx: Vec) { - idx.iter().for_each(|idx| aval.set_available(*idx, true)); - - assert!(aval.available()); - - while let Some(idx) = idx.pop() { - assert!(aval.available()); - aval.set_available(idx, false); - } - - assert!(!aval.available()); - } - - #[test] - fn availability() { - let mut aval = Availability::default(); - - single(&mut aval, 1); - single(&mut aval, 128); - single(&mut aval, 256); - single(&mut aval, 511); - - let idx = (0..511).filter(|i| i % 3 == 0 && i % 5 == 0).collect(); - - multi(&mut aval, idx); - - multi(&mut aval, (0..511).collect()) - } - - #[test] - #[should_panic] - fn overflow() { - let mut aval = Availability::default(); - single(&mut aval, 512); - } - - #[test] - fn pin_point() { - let mut aval = Availability::default(); - - aval.set_available(438, true); - - aval.set_available(479, true); - - assert_eq!(aval.0[3], 1 << (438 - 384) | 1 << (479 - 384)); - } +/// This function defines errors that are per-connection; if we get this error from the `accept()` +/// system call it means the next connection might be ready to be accepted. +/// +/// All other errors will incur a timeout before next `accept()` call is attempted. The timeout is +/// useful to handle resource exhaustion errors like `ENFILE` and `EMFILE`. Otherwise, it could +/// enter into a temporary spin loop. 
+fn connection_error(e: &io::Error) -> bool {
+    e.kind() == io::ErrorKind::ConnectionRefused
+        || e.kind() == io::ErrorKind::ConnectionAborted
+        || e.kind() == io::ErrorKind::ConnectionReset
 }
diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs
index 36c79655..62ae57e4 100644
--- a/actix-server/src/builder.rs
+++ b/actix-server/src/builder.rs
@@ -1,43 +1,31 @@
-use std::{
-    future::Future,
-    io, mem,
-    pin::Pin,
-    task::{Context, Poll},
-    time::Duration,
-};
+use std::{io, time::Duration};
 
-use actix_rt::{self as rt, net::TcpStream, time::sleep, System};
-use log::{error, info, trace};
-use tokio::sync::{
-    mpsc::{unbounded_channel, UnboundedReceiver},
-    oneshot,
-};
+use actix_rt::net::TcpStream;
+use log::trace;
+use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
 
-use crate::accept::AcceptLoop;
-use crate::join_all;
-use crate::server::{ServerCommand, ServerHandle};
-use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
-use crate::signals::{Signal, Signals};
-use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
-use crate::socket::{MioTcpListener, MioTcpSocket};
-use crate::waker_queue::{WakerInterest, WakerQueue};
-use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer};
+use crate::{
+    server::{ServerCommand, ServerHandle},
+    service::{InternalServiceFactory, ServiceFactory, StreamNewService},
+    socket::{
+        MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs,
+    },
+    worker::ServerWorkerConfig,
+    Server,
+};
 
 /// Server builder
 pub struct ServerBuilder {
-    threads: usize,
-    token: usize,
-    backlog: u32,
-    handles: Vec<(usize, WorkerHandleServer)>,
-    services: Vec<Box<dyn InternalServiceFactory>>,
-    sockets: Vec<(usize, String, MioListener)>,
-    accept: AcceptLoop,
-    exit: bool,
-    no_signals: bool,
-    cmd: UnboundedReceiver<ServerCommand>,
-    server: ServerHandle,
-    notify: Vec<oneshot::Sender<()>>,
-    worker_config: ServerWorkerConfig,
+    pub(super) threads: usize,
+    pub(super) token: usize,
+    pub(super) backlog: u32,
+    pub(super) factories: Vec<Box<dyn InternalServiceFactory>>,
+    pub(super) sockets: Vec<(usize, String, MioListener)>,
+    pub(super) exit: bool,
+    pub(super) listen_os_signals: bool,
+    pub(super) cmd_tx: UnboundedSender<ServerCommand>,
+    pub(super) cmd_rx: UnboundedReceiver<ServerCommand>,
+    pub(super) worker_config: ServerWorkerConfig,
 }
 
 impl Default for ServerBuilder {
@@ -50,21 +38,18 @@ impl ServerBuilder {
     /// Create new Server builder instance
    pub fn new() -> ServerBuilder {
         let (tx, rx) = unbounded_channel();
-        let server = ServerHandle::new(tx);
+        let _server = ServerHandle::new(tx.clone());
 
         ServerBuilder {
             threads: num_cpus::get(),
             token: 0,
-            handles: Vec::new(),
-            services: Vec::new(),
+            factories: Vec::new(),
             sockets: Vec::new(),
-            accept: AcceptLoop::new(server.clone()),
             backlog: 2048,
             exit: false,
-            no_signals: false,
-            cmd: rx,
-            notify: Vec::new(),
-            server,
+            listen_os_signals: true,
+            cmd_tx: tx,
+            cmd_rx: rx,
             worker_config: ServerWorkerConfig::default(),
         }
     }
@@ -128,15 +113,16 @@ impl ServerBuilder {
         self.max_concurrent_connections(num)
     }
 
+    // TODO: clarify what this setting is for and document it
     /// Stop Actix system.
     pub fn system_exit(mut self) -> Self {
        self.exit = true;
        self
    }
 
-    /// Disable signal handling.
+    /// Disable OS signal handling.
pub fn disable_signals(mut self) -> Self { - self.no_signals = true; + self.listen_os_signals = false; self } @@ -164,7 +150,7 @@ impl ServerBuilder { for lst in sockets { let token = self.next_token(); - self.services.push(StreamNewService::create( + self.factories.push(StreamNewService::create( name.as_ref().to_string(), token, factory.clone(), @@ -215,7 +201,7 @@ impl ServerBuilder { lst.set_nonblocking(true)?; let token = self.next_token(); let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); - self.services.push(StreamNewService::create( + self.factories.push(StreamNewService::create( name.as_ref().to_string(), token, factory, @@ -240,7 +226,7 @@ impl ServerBuilder { let addr = lst.local_addr()?; let token = self.next_token(); - self.services.push(StreamNewService::create( + self.factories.push(StreamNewService::create( name.as_ref().to_string(), token, factory, @@ -254,177 +240,11 @@ impl ServerBuilder { } /// Starts processing incoming connections and return server controller. - pub fn run(mut self) -> ServerHandle { + pub fn run(self) -> Server { if self.sockets.is_empty() { panic!("Server should have at least one bound socket"); } else { - trace!("start running server"); - - for (_, name, lst) in &self.sockets { - info!( - r#"Starting service: "{}", workers: {}, listening on: {}"#, - name, - self.threads, - lst.local_addr() - ); - } - - trace!("run server"); - - // start workers - let handles = (0..self.threads) - .map(|idx| { - let (handle_accept, handle_server) = - self.start_worker(idx, self.accept.waker_owned()); - self.handles.push((idx, handle_server)); - - handle_accept - }) - .collect(); - - // start accept thread - self.accept.start( - mem::take(&mut self.sockets) - .into_iter() - .map(|t| (t.0, t.2)) - .collect(), - handles, - ); - - // handle signals - if !self.no_signals { - Signals::start(self.server.clone()); - } - - // start http server actor - let server = self.server.clone(); - rt::spawn(self); - server - } - } - - fn start_worker( - &self, - idx: usize, - waker_queue: WakerQueue, - ) -> (WorkerHandleAccept, WorkerHandleServer) { - trace!("start server worker {}", idx); - let services = self.services.iter().map(|v| v.clone_factory()).collect(); - ServerWorker::start(idx, services, waker_queue, self.worker_config) - } - - fn handle_cmd(&mut self, item: ServerCommand) { - match item { - ServerCommand::Pause(tx) => { - self.accept.wake(WakerInterest::Pause); - let _ = tx.send(()); - } - ServerCommand::Resume(tx) => { - self.accept.wake(WakerInterest::Resume); - let _ = tx.send(()); - } - ServerCommand::Signal(sig) => { - // Signals support - // Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system - match sig { - Signal::Int => { - info!("SIGINT received; starting forced shutdown"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { - graceful: false, - completion: None, - }) - } - - Signal::Term => { - info!("SIGTERM received; starting graceful shutdown"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { - graceful: true, - completion: None, - }) - } - - Signal::Quit => { - info!("SIGQUIT received; starting forced shutdown"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { - graceful: false, - completion: None, - }) - } - } - } - ServerCommand::Notify(tx) => { - self.notify.push(tx); - } - ServerCommand::Stop { - graceful, - completion, - } => { - let exit = self.exit; - - // stop accept thread - self.accept.wake(WakerInterest::Stop); - let notify = std::mem::take(&mut self.notify); - - // stop 
workers - let stop = self - .handles - .iter() - .map(move |worker| worker.1.stop(graceful)) - .collect(); - - rt::spawn(async move { - if graceful { - // wait for all workers to shut down - let _ = join_all(stop).await; - } - - if let Some(tx) = completion { - let _ = tx.send(()); - } - - for tx in notify { - let _ = tx.send(()); - } - - if exit { - sleep(Duration::from_millis(300)).await; - System::try_current().as_ref().map(System::stop); - } - }); - } - ServerCommand::WorkerFaulted(idx) => { - let mut found = false; - for i in 0..self.handles.len() { - if self.handles[i].0 == idx { - self.handles.swap_remove(i); - found = true; - break; - } - } - - if found { - error!("Worker {} has died; restarting", idx); - - let mut new_idx = self.handles.len(); - 'found: loop { - for i in 0..self.handles.len() { - if self.handles[i].0 == new_idx { - new_idx += 1; - continue 'found; - } - } - break; - } - - let (handle_accept, handle_server) = - self.start_worker(new_idx, self.accept.waker_owned()); - self.handles.push((new_idx, handle_server)); - self.accept.wake(WakerInterest::Worker(handle_accept)); - } - } + Server::new(self) } } @@ -435,19 +255,6 @@ impl ServerBuilder { } } -impl Future for ServerBuilder { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - match Pin::new(&mut self.cmd).poll_recv(cx) { - Poll::Ready(Some(it)) => self.as_mut().get_mut().handle_cmd(it), - _ => return Poll::Pending, - } - } - } -} - pub(super) fn bind_addr( addr: S, backlog: u32, diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index 5bfc8faf..d0c6de3e 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -5,6 +5,7 @@ #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] mod accept; +mod availability; mod builder; mod server; mod service; diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index 46ffb3cd..633bf1c0 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -1,64 +1,170 @@ use std::future::Future; -use std::io; use std::pin::Pin; use std::task::{Context, Poll}; +use std::time::Duration; +use std::{io, mem}; -use tokio::sync::mpsc::UnboundedSender; +use actix_rt::time::sleep; +use actix_rt::System; +use futures_core::future::LocalBoxFuture; +use log::{error, info, trace}; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot; +use crate::accept::Accept; use crate::builder::ServerBuilder; -use crate::signals::Signal; +use crate::join_all; +use crate::service::InternalServiceFactory; +use crate::signals::{Signal, Signals}; +use crate::waker_queue::{WakerInterest, WakerQueue}; +use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer}; #[derive(Debug)] pub(crate) enum ServerCommand { WorkerFaulted(usize), Pause(oneshot::Sender<()>), Resume(oneshot::Sender<()>), - Signal(Signal), Stop { /// True if shut down should be graceful. graceful: bool, completion: Option>, }, - /// Notify of server stop - Notify(oneshot::Sender<()>), } -#[derive(Debug)] -#[non_exhaustive] -pub struct Server; +// TODO: docs + must use -impl Server { - /// Start server building process. - pub fn build() -> ServerBuilder { - ServerBuilder::default() - } -} - -/// Server handle. +/// Server /// /// # Shutdown Signals /// On UNIX systems, `SIGQUIT` will start a graceful shutdown and `SIGTERM` or `SIGINT` will start a /// forced shutdown. On Windows, a CTRL-C signal will start a forced shutdown. /// /// A graceful shutdown will wait for all workers to stop first. 
-#[derive(Debug)]
-pub struct ServerHandle(
-    UnboundedSender<ServerCommand>,
-    Option<oneshot::Receiver<()>>,
-);
+#[must_use = "futures do nothing unless you `.await` or poll them"]
+pub enum Server {
+    Server(ServerInner),
+    Error(Option<io::Error>),
+}
 
-impl ServerHandle {
-    pub(crate) fn new(tx: UnboundedSender<ServerCommand>) -> Self {
-        ServerHandle(tx, None)
+impl Server {
+    /// Start server building process.
+    pub fn build() -> ServerBuilder {
+        ServerBuilder::default()
     }
 
-    pub(crate) fn signal(&self, sig: Signal) {
-        let _ = self.0.send(ServerCommand::Signal(sig));
+    pub(crate) fn new(mut builder: ServerBuilder) -> Self {
+        trace!("start running server");
+
+        for (_, name, lst) in &builder.sockets {
+            info!(
+                r#"Starting service: "{}", workers: {}, listening on: {}"#,
+                name,
+                builder.threads,
+                lst.local_addr()
+            );
+        }
+
+        let sockets = mem::take(&mut builder.sockets)
+            .into_iter()
+            .map(|t| (t.0, t.2))
+            .collect();
+
+        // Give log information on what runtime will be used.
+        let is_tokio = tokio::runtime::Handle::try_current().is_ok();
+        let is_actix = actix_rt::System::try_current().is_some();
+
+        match (is_tokio, is_actix) {
+            (true, false) => info!("Tokio runtime found. Starting in existing Tokio runtime"),
+            (_, true) => info!("Actix runtime found. Starting in Actix runtime"),
+            (_, _) => info!(
+                "Actix/Tokio runtime not found. Starting in new Tokio current-thread runtime"
+            ),
+        }
+
+        trace!("run server");
+
+        match Accept::start(sockets, &builder) {
+            Ok((waker_queue, worker_handles)) => {
+                // construct OS signals listener future
+                let signals = (builder.listen_os_signals).then(Signals::new);
+
+                Self::Server(ServerInner {
+                    cmd_tx: builder.cmd_tx.clone(),
+                    cmd_rx: builder.cmd_rx,
+                    signals,
+                    waker_queue,
+                    worker_handles,
+                    worker_config: builder.worker_config,
+                    services: builder.factories,
+                    exit: builder.exit,
+                    stop_task: None,
+                })
+            }
+
+            Err(err) => Self::Error(Some(err)),
+        }
+    }
+
+    pub fn handle(&self) -> ServerHandle {
+        match self {
+            Server::Server(inner) => ServerHandle::new(inner.cmd_tx.clone()),
+            Server::Error(err) => {
+                // TODO: this is probably not the best way to handle server startup failure
+                panic!(
+                    "server handle can not be obtained because server failed to start up: {:?}",
+                    err
+                );
+            }
+        }
+    }
+}
+
+impl Future for Server {
+    type Output = io::Result<()>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        match self.as_mut().get_mut() {
+            Server::Error(err) => Poll::Ready(Err(err
+                .take()
+                .expect("Server future cannot be polled after error"))),
+
+            Server::Server(inner) => {
+                // poll Signals
+                if let Some(ref mut signals) = inner.signals {
+                    if let Poll::Ready(signal) = Pin::new(signals).poll(cx) {
+                        inner.stop_task = inner.handle_signal(signal);
+                        // drop signals listener
+                        inner.signals = None;
+                    }
+                }
+
+                // eagerly drain command channel and handle commands
+                loop {
+                    match Pin::new(&mut inner.cmd_rx).poll_recv(cx) {
+                        Poll::Ready(Some(cmd)) => {
+                            inner.stop_task = inner.handle_cmd(cmd);
+                        }
+                        _ => return Poll::Pending,
+                    }
+                }
+            }
+        }
+    }
+}
+
+/// Server handle.
+#[derive(Debug, Clone)]
+pub struct ServerHandle {
+    tx_cmd: UnboundedSender<ServerCommand>,
+}
+
+impl ServerHandle {
+    pub(crate) fn new(tx_cmd: UnboundedSender<ServerCommand>) -> Self {
+        ServerHandle { tx_cmd }
     }
 
     pub(crate) fn worker_faulted(&self, idx: usize) {
-        let _ = self.0.send(ServerCommand::WorkerFaulted(idx));
+        let _ = self.tx_cmd.send(ServerCommand::WorkerFaulted(idx));
     }
 
     /// Pause accepting incoming connections
@@ -67,7 +173,7 @@ impl ServerHandle {
     /// All opened connection remains active.
     pub fn pause(&self) -> impl Future<Output = ()> {
         let (tx, rx) = oneshot::channel();
-        let _ = self.0.send(ServerCommand::Pause(tx));
+        let _ = self.tx_cmd.send(ServerCommand::Pause(tx));
         async {
             let _ = rx.await;
         }
     }
@@ -76,7 +182,7 @@ impl ServerHandle {
     /// Resume accepting incoming connections
     pub fn resume(&self) -> impl Future<Output = ()> {
         let (tx, rx) = oneshot::channel();
-        let _ = self.0.send(ServerCommand::Resume(tx));
+        let _ = self.tx_cmd.send(ServerCommand::Resume(tx));
         async {
             let _ = rx.await;
         }
     }
@@ -87,7 +193,7 @@ impl ServerHandle {
     /// If server starts with `spawn()` method, then spawned thread get terminated.
     pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
         let (tx, rx) = oneshot::channel();
-        let _ = self.0.send(ServerCommand::Stop {
+        let _ = self.tx_cmd.send(ServerCommand::Stop {
             graceful,
             completion: Some(tx),
         });
@@ -97,29 +203,129 @@ impl ServerHandle {
     }
 }
 
-impl Clone for ServerHandle {
-    fn clone(&self) -> Self {
-        Self(self.0.clone(), None)
-    }
+pub struct ServerInner {
+    worker_handles: Vec<WorkerHandleServer>,
+    worker_config: ServerWorkerConfig,
+    services: Vec<Box<dyn InternalServiceFactory>>,
+    exit: bool,
+    cmd_tx: UnboundedSender<ServerCommand>,
+    cmd_rx: UnboundedReceiver<ServerCommand>,
+    signals: Option<Signals>,
+    waker_queue: WakerQueue,
+    stop_task: Option<LocalBoxFuture<'static, ()>>,
 }
 
-impl Future for ServerHandle {
-    type Output = io::Result<()>;
-
-    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-        let this = self.get_mut();
-
-        if this.1.is_none() {
-            let (tx, rx) = oneshot::channel();
-            if this.0.send(ServerCommand::Notify(tx)).is_err() {
-                return Poll::Ready(Ok(()));
+impl ServerInner {
+    fn handle_cmd(&mut self, item: ServerCommand) -> Option<LocalBoxFuture<'static, ()>> {
+        match item {
+            ServerCommand::Pause(tx) => {
+                self.waker_queue.wake(WakerInterest::Pause);
+                let _ = tx.send(());
+                None
             }
-            this.1 = Some(rx);
-        }
 
-        match Pin::new(this.1.as_mut().unwrap()).poll(cx) {
-            Poll::Pending => Poll::Pending,
-            Poll::Ready(_) => Poll::Ready(Ok(())),
+            ServerCommand::Resume(tx) => {
+                self.waker_queue.wake(WakerInterest::Resume);
+                let _ = tx.send(());
+                None
+            }
+
+            ServerCommand::Stop {
+                graceful,
+                completion,
+            } => {
+                let exit = self.exit;
+
+                // stop accept thread
+                self.waker_queue.wake(WakerInterest::Stop);
+
+                // stop workers
+                let stop = self
+                    .worker_handles
+                    .iter()
+                    .map(|worker| worker.stop(graceful))
+                    .collect::<Vec<_>>();
+
+                Some(Box::pin(async move {
+                    if graceful {
+                        // wait for all workers to shut down
+                        let _ = join_all(stop).await;
+                    }
+
+                    if let Some(tx) = completion {
+                        let _ = tx.send(());
+                    }
+
+                    if exit {
+                        sleep(Duration::from_millis(300)).await;
+                        System::try_current().as_ref().map(System::stop);
+                    }
+                }))
+            }
+
+            ServerCommand::WorkerFaulted(idx) => {
+                // TODO: maybe just return if not found?
+ assert!(self.worker_handles.iter().any(|wrk| wrk.idx == idx)); + + error!("Worker {} has died; restarting", idx); + + let factories = self + .services + .iter() + .map(|service| service.clone_factory()) + .collect(); + + match ServerWorker::start( + idx, + factories, + self.waker_queue.clone(), + self.worker_config, + ) { + Ok((handle_accept, handle_server)) => { + *self + .worker_handles + .iter_mut() + .find(|wrk| wrk.idx == idx) + .unwrap() = handle_server; + + self.waker_queue.wake(WakerInterest::Worker(handle_accept)); + } + Err(_) => todo!(), + }; + + None + } + } + } + + fn handle_signal(&mut self, signal: Signal) -> Option> { + match signal { + Signal::Int => { + info!("SIGINT received; starting forced shutdown"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: false, + completion: None, + }) + } + + Signal::Term => { + info!("SIGTERM received; starting graceful shutdown"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: true, + completion: None, + }) + } + + Signal::Quit => { + info!("SIGQUIT received; starting forced shutdown"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: false, + completion: None, + }) + } } } } diff --git a/actix-server/src/signals.rs b/actix-server/src/signals.rs index b80fa759..00821808 100644 --- a/actix-server/src/signals.rs +++ b/actix-server/src/signals.rs @@ -2,11 +2,9 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use crate::server::ServerHandle; - /// Types of process signals. -#[allow(dead_code)] -#[derive(PartialEq, Clone, Copy, Debug)] +// #[allow(dead_code)] +#[derive(Debug, Clone, Copy, PartialEq)] pub(crate) enum Signal { /// `SIGINT` Int, @@ -20,8 +18,6 @@ pub(crate) enum Signal { /// Process signal listener. pub(crate) struct Signals { - srv: ServerHandle, - #[cfg(not(unix))] signals: futures_core::future::LocalBoxFuture<'static, std::io::Result<()>>, @@ -30,14 +26,13 @@ pub(crate) struct Signals { } impl Signals { - /// Spawns a signal listening future that is able to send commands to the `Server`. - pub(crate) fn start(srv: ServerHandle) { + /// Constructs an OS signal listening future. + pub(crate) fn new() -> Self { #[cfg(not(unix))] { - actix_rt::spawn(Signals { - srv, + Signals { signals: Box::pin(actix_rt::signal::ctrl_c()), - }); + } } #[cfg(unix)] @@ -66,33 +61,29 @@ impl Signals { }) .collect::>(); - actix_rt::spawn(Signals { srv, signals }); + Signals { signals } } } } impl Future for Signals { - type Output = (); + type Output = Signal; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { #[cfg(not(unix))] - match self.signals.as_mut().poll(cx) { - Poll::Ready(_) => { - self.srv.signal(Signal::Int); - Poll::Ready(()) - } - Poll::Pending => Poll::Pending, + { + self.signals.as_mut().poll(cx).map(|_| Signal::Int) } #[cfg(unix)] { for (sig, fut) in self.signals.iter_mut() { + // TODO: match on if let Some ? 
if Pin::new(fut).poll_recv(cx).is_ready() { - let sig = *sig; - self.srv.signal(sig); - return Poll::Ready(()); + return Poll::Ready(*sig); } } + Poll::Pending } } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 39ae7914..24f79e60 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -1,6 +1,6 @@ use std::{ future::Future, - mem, + io, mem, pin::Pin, rc::Rc, sync::{ @@ -43,19 +43,20 @@ pub(crate) struct Conn { pub token: usize, } +/// fn handle_pair( idx: usize, - tx1: UnboundedSender, - tx2: UnboundedSender, + tx_conn: UnboundedSender, + tx_stop: UnboundedSender, counter: Counter, ) -> (WorkerHandleAccept, WorkerHandleServer) { let accept = WorkerHandleAccept { idx, - tx: tx1, + tx_conn, counter, }; - let server = WorkerHandleServer { idx, tx: tx2 }; + let server = WorkerHandleServer { idx, tx_stop }; (accept, server) } @@ -151,13 +152,13 @@ impl Drop for WorkerCounterGuard { } } -/// Handle to worker that can send connection message to worker and share the -/// availability of worker to other thread. +/// Handle to worker that can send connection message to worker and share the availability of worker +/// to other threads. /// /// Held by [Accept](crate::accept::Accept). pub(crate) struct WorkerHandleAccept { idx: usize, - tx: UnboundedSender, + tx_conn: UnboundedSender, counter: Counter, } @@ -168,8 +169,8 @@ impl WorkerHandleAccept { } #[inline(always)] - pub(crate) fn send(&self, msg: Conn) -> Result<(), Conn> { - self.tx.send(msg).map_err(|msg| msg.0) + pub(crate) fn send(&self, conn: Conn) -> Result<(), Conn> { + self.tx_conn.send(conn).map_err(|msg| msg.0) } #[inline(always)] @@ -183,15 +184,14 @@ impl WorkerHandleAccept { /// Held by [ServerBuilder](crate::builder::ServerBuilder). #[derive(Debug)] pub(crate) struct WorkerHandleServer { - #[allow(dead_code)] - idx: usize, - tx: UnboundedSender, + pub(crate) idx: usize, + tx_stop: UnboundedSender, } impl WorkerHandleServer { pub(crate) fn stop(&self, graceful: bool) -> oneshot::Receiver { let (tx, rx) = oneshot::channel(); - let _ = self.tx.send(Stop { graceful, tx }); + let _ = self.tx_stop.send(Stop { graceful, tx }); rx } } @@ -274,7 +274,7 @@ impl ServerWorker { factories: Vec>, waker_queue: WakerQueue, config: ServerWorkerConfig, - ) -> (WorkerHandleAccept, WorkerHandleServer) { + ) -> io::Result<(WorkerHandleAccept, WorkerHandleServer)> { trace!("starting server worker {}", idx); let (tx1, rx) = unbounded_channel(); @@ -296,6 +296,8 @@ impl ServerWorker { // get actix system context if it is set let sys = System::try_current(); + // TODO: wait for server startup with sync channel + std::thread::Builder::new() .name("eofibef".to_owned()) .spawn(move || { @@ -339,6 +341,7 @@ impl ServerWorker { services }) .into_boxed_slice(), + Err(e) => { error!("Can not start worker: {:?}", e); Arbiter::try_current().as_ref().map(ArbiterHandle::stop); @@ -365,7 +368,7 @@ impl ServerWorker { }) .expect("worker thread error/panic"); - handle_pair(idx, tx1, tx2, counter) + Ok(handle_pair(idx, tx1, tx2, counter)) } fn restart_service(&mut self, idx: usize, factory_id: usize) { diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 5919438b..ee9790a5 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -30,7 +30,7 @@ fn test_bind() { })? .run(); - let _ = tx.send((srv.clone(), actix_rt::System::current())); + let _ = tx.send((srv.handle(), actix_rt::System::current())); srv.await }) @@ -61,7 +61,7 @@ fn test_listen() { })? 
.run(); - let _ = tx.send((srv.clone(), actix_rt::System::current())); + let _ = tx.send((srv.handle(), actix_rt::System::current())); srv.await }) @@ -103,7 +103,7 @@ fn test_start() { })? .run(); - let _ = tx.send((srv.clone(), actix_rt::System::current())); + let _ = tx.send((srv.handle(), actix_rt::System::current())); srv.await }) @@ -166,7 +166,7 @@ async fn test_max_concurrent_connections() { let h = thread::spawn(move || { actix_rt::System::new().block_on(async { - let server = Server::build() + let srv = Server::build() // Set a relative higher backlog. .backlog(12) // max connection for a worker is 3. @@ -187,9 +187,9 @@ async fn test_max_concurrent_connections() { })? .run(); - let _ = tx.send((server.clone(), actix_rt::System::current())); + let _ = tx.send((srv.handle(), actix_rt::System::current())); - server.await + srv.await }) }); @@ -260,7 +260,7 @@ async fn test_service_restart() { let h = thread::spawn(move || { let num = num.clone(); actix_rt::System::new().block_on(async { - let server = Server::build() + let srv = Server::build() .backlog(1) .disable_signals() .bind("addr1", addr1, move || { @@ -280,12 +280,12 @@ async fn test_service_restart() { .workers(1) .run(); - let _ = tx.send((server.clone(), actix_rt::System::current())); - server.await + let _ = tx.send((srv.handle(), actix_rt::System::current())); + srv.await }) }); - let (server, sys) = rx.recv().unwrap(); + let (srv, sys) = rx.recv().unwrap(); for _ in 0..5 { TcpStream::connect(addr1) @@ -307,7 +307,7 @@ async fn test_service_restart() { assert!(num_clone.load(Ordering::SeqCst) > 5); assert!(num2_clone.load(Ordering::SeqCst) > 5); - let _ = server.stop(false); + let _ = srv.stop(false); sys.stop(); h.join().unwrap().unwrap(); } @@ -379,19 +379,19 @@ async fn worker_restart() { let h = thread::spawn(move || { let counter = counter.clone(); actix_rt::System::new().block_on(async { - let server = Server::build() + let srv = Server::build() .disable_signals() .bind("addr", addr, move || TestServiceFactory(counter.clone()))? .workers(2) .run(); - let _ = tx.send((server.clone(), actix_rt::System::current())); + let _ = tx.send((srv.handle(), actix_rt::System::current())); - server.await + srv.await }) }); - let (server, sys) = rx.recv().unwrap(); + let (srv, sys) = rx.recv().unwrap(); sleep(Duration::from_secs(3)).await; @@ -448,7 +448,7 @@ async fn worker_restart() { assert_eq!("3", id); stream.shutdown().await.unwrap(); - let _ = server.stop(false); + let _ = srv.stop(false); sys.stop(); h.join().unwrap().unwrap(); }
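
Usage note (appended for reviewers, not part of the patch): after this change `ServerBuilder::run()` returns the `Server` future itself, and pause/resume/stop go through a `ServerHandle` obtained from `Server::handle()` rather than by cloning the server. Below is a minimal sketch of the reworked API, following the patterns in the updated `tcp-echo` example and the tests above; the service name, bind address, delay, and the no-op service are illustrative only.

```rust
use std::{io, time::Duration};

use actix_rt::net::TcpStream;
use actix_server::Server;
use actix_service::fn_service;

#[tokio::main]
async fn main() -> io::Result<()> {
    // `run()` now returns the `Server` future; nothing happens until it is awaited/polled.
    let srv = Server::build()
        // name, address, and the no-op service are placeholders for illustration
        .bind("sketch", ("127.0.0.1", 8080), || {
            fn_service(|_stream: TcpStream| async { Ok::<_, ()>(()) })
        })?
        .workers(2)
        .run();

    // Control messages (pause/resume/stop) go through a handle obtained from the
    // running server instead of a clone of the server itself.
    let handle = srv.handle();

    // For illustration: request a graceful shutdown from another task after 30 seconds.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(30)).await;
        handle.stop(true).await;
    });

    // Awaiting the server drives the accept loop and workers to completion.
    srv.await
}
```

As in the updated example, this relies on the server detecting and reusing the ambient Tokio runtime started by `#[tokio::main]`, per the runtime-detection logging added in `Server::new`.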