Server: run after await

This commit is contained in:
Ali MJ Al-Nasrawy 2021-12-03 23:44:31 +03:00
parent 6335921085
commit 9229c590d2
4 changed files with 99 additions and 99 deletions

View File

@ -1,6 +1,9 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
* `Server` now runs only after awaiting it. [#425]
[#425]: https://github.com/actix/actix-net/pull/425
## 2.0.0-beta.9 - 2021-11-15 ## 2.0.0-beta.9 - 2021-11-15

View File

@ -31,7 +31,7 @@ log = "0.4"
mio = { version = "0.8", features = ["os-poll", "net"] } mio = { version = "0.8", features = ["os-poll", "net"] }
num_cpus = "1.13" num_cpus = "1.13"
socket2 = "0.4.2" socket2 = "0.4.2"
tokio = { version = "1.5.1", features = ["sync"] } tokio = { version = "1.5.1", features = ["sync", "macros"] }
# runtime for io-uring feature # runtime for io-uring feature
tokio-uring = { version = "0.1", optional = true } tokio-uring = { version = "0.1", optional = true }
@ -42,5 +42,5 @@ actix-rt = "2.4.0"
bytes = "1" bytes = "1"
env_logger = "0.9" env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } futures-util = { version = "0.3.7", default-features = false, features = ["sink", "async-await-macro"] }
tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] } tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] }

View File

@ -9,10 +9,7 @@ use std::{
use actix_rt::{time::sleep, System}; use actix_rt::{time::sleep, System};
use futures_core::future::BoxFuture; use futures_core::future::BoxFuture;
use log::{error, info}; use log::{error, info};
use tokio::sync::{ use tokio::sync::{mpsc::UnboundedReceiver, oneshot};
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
};
use crate::{ use crate::{
accept::Accept, accept::Accept,
@ -120,7 +117,10 @@ pub(crate) enum ServerCommand {
/// } /// }
/// ``` /// ```
#[must_use = "futures do nothing unless you `.await` or poll them"] #[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Server(Result<ServerInner, Option<io::Error>>); pub struct Server {
handle: ServerHandle,
fut: BoxFuture<'static, io::Result<()>>,
}
impl Server { impl Server {
/// Create server build. /// Create server build.
@ -129,62 +129,26 @@ impl Server {
} }
pub(crate) fn new(builder: ServerBuilder) -> Self { pub(crate) fn new(builder: ServerBuilder) -> Self {
Server(ServerInner::new(builder).map_err(Some)) Server {
handle: ServerHandle::new(builder.cmd_tx.clone()),
fut: Box::pin(ServerInner::run(builder)),
}
} }
/// Get a handle for ServerFuture that can be used to change state of actix server. /// Get a handle for ServerFuture that can be used to change state of actix server.
/// ///
/// See [ServerHandle](ServerHandle) for usage. /// See [ServerHandle](ServerHandle) for usage.
pub fn handle(&self) -> ServerHandle { pub fn handle(&self) -> ServerHandle {
match &self.0 { self.handle.clone()
Ok(inner) => ServerHandle::new(inner.cmd_tx.clone()),
Err(err) => {
// TODO: i don't think this is the best way to handle server startup fail
panic!(
"server handle can not be obtained because server failed to start up: {}",
err.as_ref().unwrap()
);
}
}
} }
} }
impl Future for Server { impl Future for Server {
type Output = io::Result<()>; type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { #[inline]
match &mut self.as_mut().get_mut().0 { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Err(err) => Poll::Ready(Err(err Pin::new(&mut Pin::into_inner(self).fut).poll(cx)
.take()
.expect("Server future cannot be polled after error"))),
Ok(inner) => {
// poll Signals
if let Some(ref mut signals) = inner.signals {
if let Poll::Ready(signal) = Pin::new(signals).poll(cx) {
inner.stop_task = inner.handle_signal(signal);
// drop signals listener
inner.signals = None;
}
}
// handle stop tasks and eager drain command channel
loop {
if let Some(ref mut fut) = inner.stop_task {
// only resolve stop task and exit
return fut.as_mut().poll(cx).map(|_| Ok(()));
}
match Pin::new(&mut inner.cmd_rx).poll_recv(cx) {
Poll::Ready(Some(cmd)) => {
// if stop task is required, set it and loop
inner.stop_task = inner.handle_cmd(cmd);
}
_ => return Poll::Pending,
}
}
}
}
} }
} }
@ -193,15 +157,35 @@ pub struct ServerInner {
worker_config: ServerWorkerConfig, worker_config: ServerWorkerConfig,
services: Vec<Box<dyn InternalServiceFactory>>, services: Vec<Box<dyn InternalServiceFactory>>,
exit: bool, exit: bool,
cmd_tx: UnboundedSender<ServerCommand>,
cmd_rx: UnboundedReceiver<ServerCommand>,
signals: Option<Signals>,
waker_queue: WakerQueue, waker_queue: WakerQueue,
stop_task: Option<BoxFuture<'static, ()>>, stopped: bool,
} }
impl ServerInner { impl ServerInner {
fn new(mut builder: ServerBuilder) -> io::Result<Self> { async fn run(builder: ServerBuilder) -> io::Result<()> {
let (mut this, mut cmd_rx, mut signal_fut) = Self::run_sync(builder)?;
let listen_to_signals = signal_fut.is_some();
while !this.stopped {
tokio::select! {
signal = async {
signal_fut.as_mut().unwrap().await
}, if listen_to_signals => {
this.handle_signal(signal).await;
},
Some(cmd) = cmd_rx.recv() => {
this.handle_cmd(cmd).await;
},
else => break,
};
}
Ok(())
}
fn run_sync(
mut builder: ServerBuilder,
) -> io::Result<(Self, UnboundedReceiver<ServerCommand>, Option<Signals>)> {
let sockets = mem::take(&mut builder.sockets) let sockets = mem::take(&mut builder.sockets)
.into_iter() .into_iter()
.map(|t| (t.0, t.2)) .map(|t| (t.0, t.2))
@ -229,66 +213,59 @@ impl ServerInner {
let (waker_queue, worker_handles) = Accept::start(sockets, &builder)?; let (waker_queue, worker_handles) = Accept::start(sockets, &builder)?;
// construct OS signals listener future // construct OS signals listener future
let signals = (builder.listen_os_signals).then(Signals::new); let signal_fut = (builder.listen_os_signals).then(Signals::new);
Ok(ServerInner { let server = ServerInner {
cmd_tx: builder.cmd_tx.clone(),
cmd_rx: builder.cmd_rx,
signals,
waker_queue, waker_queue,
worker_handles, worker_handles,
worker_config: builder.worker_config, worker_config: builder.worker_config,
services: builder.factories, services: builder.factories,
exit: builder.exit, exit: builder.exit,
stop_task: None, stopped: false,
}) };
Ok((server, builder.cmd_rx, signal_fut))
} }
fn handle_cmd(&mut self, item: ServerCommand) -> Option<BoxFuture<'static, ()>> { async fn handle_cmd(&mut self, item: ServerCommand) {
match item { match item {
ServerCommand::Pause(tx) => { ServerCommand::Pause(tx) => {
self.waker_queue.wake(WakerInterest::Pause); self.waker_queue.wake(WakerInterest::Pause);
let _ = tx.send(()); let _ = tx.send(());
None
} }
ServerCommand::Resume(tx) => { ServerCommand::Resume(tx) => {
self.waker_queue.wake(WakerInterest::Resume); self.waker_queue.wake(WakerInterest::Resume);
let _ = tx.send(()); let _ = tx.send(());
None
} }
ServerCommand::Stop { ServerCommand::Stop {
graceful, graceful,
completion, completion,
} => { } => {
let exit = self.exit; self.stopped = true;
// stop accept thread // stop accept thread
self.waker_queue.wake(WakerInterest::Stop); self.waker_queue.wake(WakerInterest::Stop);
// stop workers if graceful {
let workers_stop = self // wait for all workers to shut down
.worker_handles let workers_stop = self
.iter() .worker_handles
.map(|worker| worker.stop(graceful)) .iter()
.collect::<Vec<_>>(); .map(|worker| worker.stop(graceful))
.collect::<Vec<_>>();
let _ = join_all(workers_stop).await;
}
Some(Box::pin(async move { if let Some(tx) = completion {
if graceful { let _ = tx.send(());
// wait for all workers to shut down }
let _ = join_all(workers_stop).await;
}
if let Some(tx) = completion { if self.exit {
let _ = tx.send(()); sleep(Duration::from_millis(300)).await;
} System::try_current().as_ref().map(System::stop);
}
if exit {
sleep(Duration::from_millis(300)).await;
System::try_current().as_ref().map(System::stop);
}
}))
} }
ServerCommand::WorkerFaulted(idx) => { ServerCommand::WorkerFaulted(idx) => {
@ -321,13 +298,11 @@ impl ServerInner {
Err(err) => error!("can not restart worker {}: {}", idx, err), Err(err) => error!("can not restart worker {}: {}", idx, err),
}; };
None
} }
} }
} }
fn handle_signal(&mut self, signal: SignalKind) -> Option<BoxFuture<'static, ()>> { async fn handle_signal(&mut self, signal: SignalKind) {
match signal { match signal {
SignalKind::Int => { SignalKind::Int => {
info!("SIGINT received; starting forced shutdown"); info!("SIGINT received; starting forced shutdown");
@ -336,6 +311,7 @@ impl ServerInner {
graceful: false, graceful: false,
completion: None, completion: None,
}) })
.await
} }
SignalKind::Term => { SignalKind::Term => {
@ -345,6 +321,7 @@ impl ServerInner {
graceful: true, graceful: true,
completion: None, completion: None,
}) })
.await
} }
SignalKind::Quit => { SignalKind::Quit => {
@ -354,6 +331,7 @@ impl ServerInner {
graceful: false, graceful: false,
completion: None, completion: None,
}) })
.await
} }
} }
} }

View File

@ -487,27 +487,46 @@ async fn worker_restart() {
} }
#[test] #[test]
#[should_panic] fn no_runtime_on_init() {
fn no_runtime() { use std::{thread::sleep, time::Duration};
// test set up in a way that would prevent time out if support for runtime-less init was added
let addr = unused_addr(); let addr = unused_addr();
let counter = Arc::new(AtomicUsize::new(0));
let srv = Server::build() let mut srv = Server::build()
.workers(1) .workers(2)
.disable_signals() .disable_signals()
.bind("test", addr, move || { .bind("test", addr, {
fn_service(|_| async { Ok::<_, ()>(()) }) let counter = counter.clone();
move || {
counter.fetch_add(1, Ordering::SeqCst);
fn_service(|_| async { Ok::<_, ()>(()) })
}
}) })
.unwrap() .unwrap()
.run(); .run();
fn is_send<T: Send>(_: &T) {}
is_send(&srv);
is_send(&srv.handle());
sleep(Duration::from_millis(1_000));
assert_eq!(counter.load(Ordering::SeqCst), 0);
let rt = tokio::runtime::Builder::new_current_thread() let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build() .build()
.unwrap(); .unwrap();
let _ = srv.handle().stop(true); rt.block_on(async move {
let _ = futures_util::poll!(&mut srv);
rt.block_on(async { srv.await }).unwrap(); // available after the first poll
sleep(Duration::from_millis(500));
assert_eq!(counter.load(Ordering::SeqCst), 2);
let _ = srv.handle().stop(true);
srv.await
})
.unwrap();
} }