From 42c53159daf12d5cd3299df99c5f91f7ce8790b4 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Tue, 13 Apr 2021 06:11:20 +0800 Subject: [PATCH] make server start panic free. All errors are caught and return as std::io::Error --- actix-server/src/accept.rs | 7 +- actix-server/src/server.rs | 184 ++++++++++++++++++------------------- actix-server/src/worker.rs | 2 +- 3 files changed, 92 insertions(+), 101 deletions(-) diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 1d608719..16ac3407 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -60,7 +60,7 @@ impl Accept { pub(crate) fn start( sockets: Vec<(Token, MioListener)>, builder: &ServerBuilder, - ) -> io::Result<(WakerQueue, Vec<(usize, WorkerHandleServer)>)> { + ) -> io::Result<(WakerQueue, Vec)> { let server_handle = ServerHandle::new(builder.cmd_tx.clone()); // construct poll instance and it's waker @@ -73,9 +73,8 @@ impl Accept { // start workers let availability = WorkerAvailability::new(waker_queue.clone()); let factories = builder.services.iter().map(|v| v.clone_factory()).collect(); - let (handle_accept, handle_server) = - ServerWorker::start(idx, factories, availability, builder.worker_config)?; - Ok((handle_accept, (idx, handle_server))) + + ServerWorker::start(idx, factories, availability, builder.worker_config) }) .collect::, io::Error>>()? .into_iter() diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index d7987d56..99e4385e 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -23,12 +23,16 @@ use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerAvailability, Worker /// When awaited or spawned would listen to signal and message from [ServerHandle](ServerHandle). #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct Server { +pub enum Server { + Server(ServerInner), + Error(Option), +} + +pub struct ServerInner { cmd_tx: UnboundedSender, cmd_rx: UnboundedReceiver, - handles: Vec<(usize, WorkerHandleServer)>, + handles: Vec, services: Vec>, - notify: Vec>, exit: bool, worker_config: ServerWorkerConfig, signals: Option, @@ -52,32 +56,33 @@ impl Server { .collect(); // start accept thread. return waker_queue and worker handles. - let (waker_queue, handles) = Accept::start(sockets, &builder) - // TODO: include error to Server type and poll return it in Future. - .unwrap_or_else(|e| panic!("Can not start Accept: {}", e)); + match Accept::start(sockets, &builder) { + Ok((waker_queue, handles)) => { + // construct signals future. + let signals = if !builder.no_signals { + // Check tokio runtime. + if tokio::runtime::Handle::try_current().is_err() { + let err = io::Error::new(io::ErrorKind::Other, "there is no reactor running. Please enable ServerBuilder::disable_signals when start server in non tokio 1.x runtime."); + return Self::Error(Some(err)); + } + Some(Signals::new()) + } else { + None + }; - // construct signals future. - let signals = if !builder.no_signals { - // Check tokio runtime. - tokio::runtime::Handle::try_current() - .map(|_|()) - .expect("there is no reactor running. Please enable ServerBuilder::disable_signals when start server in non tokio 1.x runtime."); - Some(Signals::new()) - } else { - None - }; - - Self { - cmd_tx: builder.cmd_tx, - cmd_rx: builder.cmd_rx, - handles, - services: builder.services, - notify: Vec::new(), - exit: builder.exit, - worker_config: builder.worker_config, - signals, - on_stop_task: None, - waker_queue, + Self::Server(ServerInner { + cmd_tx: builder.cmd_tx, + cmd_rx: builder.cmd_rx, + handles, + services: builder.services, + exit: builder.exit, + worker_config: builder.worker_config, + signals, + on_stop_task: None, + waker_queue, + }) + } + Err(e) => Self::Error(Some(e)), } } @@ -85,9 +90,17 @@ impl Server { /// /// See [ServerHandle](ServerHandle) for usage. pub fn handle(&self) -> ServerHandle { - ServerHandle::new(self.cmd_tx.clone()) + match self { + Self::Server(ref inner) => ServerHandle::new(inner.cmd_tx.clone()), + Self::Error(err) => panic!( + "ServerHandle can not be obtained. Server failed to start due to error: {:?}", + err.as_ref().unwrap() + ), + } } +} +impl ServerInner { fn handle_cmd(&mut self, item: ServerCommand) -> Option> { match item { ServerCommand::Pause(tx) => { @@ -139,14 +152,13 @@ impl Server { // stop accept thread self.waker_queue.wake(WakerInterest::Stop); - let notify = std::mem::take(&mut self.notify); // stop workers if !self.handles.is_empty() && graceful { let iter = self .handles .iter() - .map(move |worker| worker.1.stop(graceful)) + .map(move |worker| worker.stop(graceful)) .collect::>(); // TODO: this async block can return io::Error. @@ -157,9 +169,6 @@ impl Server { if let Some(tx) = completion { let _ = tx.send(()); } - for tx in notify { - let _ = tx.send(()); - } if exit { sleep(Duration::from_millis(300)).await; System::try_current().as_ref().map(System::stop); @@ -176,52 +185,32 @@ impl Server { if let Some(tx) = completion { let _ = tx.send(()); } - for tx in notify { - let _ = tx.send(()); - } })) } } ServerCommand::WorkerFaulted(idx) => { - let mut found = false; - for i in 0..self.handles.len() { - if self.handles[i].0 == idx { - self.handles.swap_remove(i); - found = true; - break; - } - } - - if found { - error!("Worker has died {:?}, restarting", idx); - - let mut new_idx = self.handles.len(); - 'found: loop { - for i in 0..self.handles.len() { - if self.handles[i].0 == new_idx { - new_idx += 1; - continue 'found; - } - } - break; - } - - let availability = WorkerAvailability::new(self.waker_queue.clone()); - let factories = self.services.iter().map(|v| v.clone_factory()).collect(); - let res = ServerWorker::start( - new_idx, - factories, - availability, - self.worker_config, - ); - - match res { - Ok((handle_accept, handle_server)) => { - self.handles.push((new_idx, handle_server)); - self.waker_queue.wake(WakerInterest::Worker(handle_accept)); - } - Err(e) => error!("Can not start worker: {:?}", e), + assert!(self.handles.iter().any(|handle| handle.idx == idx)); + + error!("Worker {} has died, restarting", idx); + + let availability = WorkerAvailability::new(self.waker_queue.clone()); + let factories = self + .services + .iter() + .map(|service| service.clone_factory()) + .collect(); + let res = ServerWorker::start(idx, factories, availability, self.worker_config); + + match res { + Ok((handle_accept, handle_server)) => { + *self + .handles + .iter_mut() + .find(|handle| handle.idx == idx) + .unwrap() = handle_server; + self.waker_queue.wake(WakerInterest::Worker(handle_accept)); } + Err(e) => error!("Can not start worker: {:?}", e), } None @@ -234,28 +223,31 @@ impl Future for Server { type Output = io::Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().get_mut(); - - // poll signals first. remove it on resolve. - if let Some(ref mut signals) = this.signals { - if let Poll::Ready(signal) = Pin::new(signals).poll(cx) { - this.on_stop_task = this.handle_cmd(ServerCommand::Signal(signal)); - this.signals = None; - } - } - - // actively poll command channel and handle command. - loop { - // got on stop task. resolve it exclusively and exit. - if let Some(ref mut fut) = this.on_stop_task { - return fut.as_mut().poll(cx).map(|_| Ok(())); - } - - match Pin::new(&mut this.cmd_rx).poll_recv(cx) { - Poll::Ready(Some(it)) => { - this.on_stop_task = this.handle_cmd(it); + match self.as_mut().get_mut() { + Self::Error(e) => Poll::Ready(Err(e.take().unwrap())), + Self::Server(inner) => { + // poll signals first. remove it on resolve. + if let Some(ref mut signals) = inner.signals { + if let Poll::Ready(signal) = Pin::new(signals).poll(cx) { + inner.on_stop_task = inner.handle_cmd(ServerCommand::Signal(signal)); + inner.signals = None; + } + } + + // actively poll command channel and handle command. + loop { + // got on stop task. resolve it exclusively and exit. + if let Some(ref mut fut) = inner.on_stop_task { + return fut.as_mut().poll(cx).map(|_| Ok(())); + } + + match Pin::new(&mut inner.cmd_rx).poll_recv(cx) { + Poll::Ready(Some(it)) => { + inner.on_stop_task = inner.handle_cmd(it); + } + _ => return Poll::Pending, + } } - _ => return Poll::Pending, } } } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index fa5d22dc..2a1d9316 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -200,10 +200,10 @@ impl ServerWorker { config: ServerWorkerConfig, ) -> io::Result<(WorkerHandleAccept, WorkerHandleServer)> { assert!(!availability.available()); + let avail = availability.clone(); let (tx1, rx) = unbounded_channel(); let (tx2, rx2) = unbounded_channel(); - let avail = availability.clone(); // Try to get actix system when have one. let system = System::try_current();