mirror of https://github.com/fafhrd91/actix-net
remove builder and introduce worker handle
This commit is contained in:
parent
1b35ff8ee6
commit
ce627bf932
|
@ -7,7 +7,7 @@
|
||||||
use proc_macro::TokenStream;
|
use proc_macro::TokenStream;
|
||||||
use quote::quote;
|
use quote::quote;
|
||||||
|
|
||||||
/// Marks async function to be executed by actix system.
|
/// Marks async function to be executed by Actix system.
|
||||||
///
|
///
|
||||||
/// ## Usage
|
/// ## Usage
|
||||||
///
|
///
|
||||||
|
@ -26,7 +26,6 @@ pub fn main(_: TokenStream, item: TokenStream) -> TokenStream {
|
||||||
let vis = &input.vis;
|
let vis = &input.vis;
|
||||||
let sig = &mut input.sig;
|
let sig = &mut input.sig;
|
||||||
let body = &input.block;
|
let body = &input.block;
|
||||||
let name = &sig.ident;
|
|
||||||
|
|
||||||
if sig.asyncness.is_none() {
|
if sig.asyncness.is_none() {
|
||||||
return syn::Error::new_spanned(sig.fn_token, "only async fn is supported")
|
return syn::Error::new_spanned(sig.fn_token, "only async fn is supported")
|
||||||
|
@ -39,14 +38,14 @@ pub fn main(_: TokenStream, item: TokenStream) -> TokenStream {
|
||||||
(quote! {
|
(quote! {
|
||||||
#(#attrs)*
|
#(#attrs)*
|
||||||
#vis #sig {
|
#vis #sig {
|
||||||
actix_rt::System::new(stringify!(#name))
|
actix_rt::System::new()
|
||||||
.block_on(async move { #body })
|
.block_on(async move { #body })
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.into()
|
.into()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Marks async test function to be executed by actix runtime.
|
/// Marks async test function to be executed by Actix system.
|
||||||
///
|
///
|
||||||
/// ## Usage
|
/// ## Usage
|
||||||
///
|
///
|
||||||
|
@ -86,7 +85,7 @@ pub fn test(_: TokenStream, item: TokenStream) -> TokenStream {
|
||||||
quote! {
|
quote! {
|
||||||
#(#attrs)*
|
#(#attrs)*
|
||||||
#vis #sig {
|
#vis #sig {
|
||||||
actix_rt::System::new("test")
|
actix_rt::System::new()
|
||||||
.block_on(async { #body })
|
.block_on(async { #body })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -95,7 +94,7 @@ pub fn test(_: TokenStream, item: TokenStream) -> TokenStream {
|
||||||
#[test]
|
#[test]
|
||||||
#(#attrs)*
|
#(#attrs)*
|
||||||
#vis #sig {
|
#vis #sig {
|
||||||
actix_rt::System::new("test")
|
actix_rt::System::new()
|
||||||
.block_on(async { #body })
|
.block_on(async { #body })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,114 +0,0 @@
|
||||||
use std::{borrow::Cow, future::Future, io};
|
|
||||||
|
|
||||||
use tokio::sync::{
|
|
||||||
mpsc::unbounded_channel,
|
|
||||||
oneshot::{channel, Receiver},
|
|
||||||
};
|
|
||||||
|
|
||||||
use crate::{
|
|
||||||
runtime::Runtime,
|
|
||||||
system::{System, SystemWorker},
|
|
||||||
worker::Worker,
|
|
||||||
};
|
|
||||||
|
|
||||||
/// System builder.
|
|
||||||
///
|
|
||||||
/// Either use `Builder::build` to create a system and start actors. Alternatively, use
|
|
||||||
/// `Builder::run` to start the Tokio runtime and run a function in its context.
|
|
||||||
pub struct Builder {
|
|
||||||
/// Name of the System. Defaults to "actix-rt" if unset.
|
|
||||||
name: Cow<'static, str>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Builder {
|
|
||||||
pub(crate) fn new() -> Self {
|
|
||||||
Builder {
|
|
||||||
name: Cow::Borrowed("actix-rt"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Sets the name of the System.
|
|
||||||
pub fn name(mut self, name: impl Into<Cow<'static, str>>) -> Self {
|
|
||||||
self.name = name.into();
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Create new System.
|
|
||||||
///
|
|
||||||
/// This method panics if it can not create Tokio runtime
|
|
||||||
pub fn build(self) -> SystemRunner {
|
|
||||||
self.create_runtime(|| {})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// This function will start Tokio runtime and will finish once the `System::stop()` message
|
|
||||||
/// is called. Function `f` is called within Tokio runtime context.
|
|
||||||
pub fn run<F>(self, init_fn: F) -> io::Result<()>
|
|
||||||
where
|
|
||||||
F: FnOnce(),
|
|
||||||
{
|
|
||||||
self.create_runtime(init_fn).run()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn create_runtime<F>(self, init_fn: F) -> SystemRunner
|
|
||||||
where
|
|
||||||
F: FnOnce(),
|
|
||||||
{
|
|
||||||
let (stop_tx, stop_rx) = channel();
|
|
||||||
let (sys_sender, sys_receiver) = unbounded_channel();
|
|
||||||
|
|
||||||
let rt = Runtime::new().unwrap();
|
|
||||||
|
|
||||||
let system = System::construct(sys_sender, Worker::new_system(rt.local()));
|
|
||||||
|
|
||||||
// init system worker
|
|
||||||
let sys_worker = SystemWorker::new(sys_receiver, stop_tx);
|
|
||||||
rt.spawn(sys_worker);
|
|
||||||
|
|
||||||
// run system init method
|
|
||||||
rt.block_on(async { init_fn() });
|
|
||||||
|
|
||||||
SystemRunner {
|
|
||||||
rt,
|
|
||||||
stop_rx,
|
|
||||||
system,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// System runner object that keeps event loop alive and running until stop message is received.
|
|
||||||
#[must_use = "A SystemRunner does nothing unless `run` is called."]
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct SystemRunner {
|
|
||||||
rt: Runtime,
|
|
||||||
stop_rx: Receiver<i32>,
|
|
||||||
system: System,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SystemRunner {
|
|
||||||
/// Starts event loop and will finish once [`System::stop()`] is called.
|
|
||||||
pub fn run(self) -> io::Result<()> {
|
|
||||||
let SystemRunner { rt, stop_rx, .. } = self;
|
|
||||||
|
|
||||||
// run loop
|
|
||||||
match rt.block_on(stop_rx) {
|
|
||||||
Ok(code) => {
|
|
||||||
if code != 0 {
|
|
||||||
Err(io::Error::new(
|
|
||||||
io::ErrorKind::Other,
|
|
||||||
format!("Non-zero exit code: {}", code),
|
|
||||||
))
|
|
||||||
} else {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Runs the provided future, blocking the current thread until the future completes.
|
|
||||||
#[inline]
|
|
||||||
pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
|
|
||||||
self.rt.block_on(fut)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -15,15 +15,13 @@ use tokio::task::JoinHandle;
|
||||||
#[cfg(all(feature = "macros", not(test)))]
|
#[cfg(all(feature = "macros", not(test)))]
|
||||||
pub use actix_macros::{main, test};
|
pub use actix_macros::{main, test};
|
||||||
|
|
||||||
mod builder;
|
|
||||||
mod runtime;
|
mod runtime;
|
||||||
mod system;
|
mod system;
|
||||||
mod worker;
|
mod worker;
|
||||||
|
|
||||||
pub use self::builder::{Builder, SystemRunner};
|
|
||||||
pub use self::runtime::Runtime;
|
pub use self::runtime::Runtime;
|
||||||
pub use self::system::System;
|
pub use self::system::{System, SystemRunner};
|
||||||
pub use self::worker::Worker;
|
pub use self::worker::{Worker, WorkerHandle};
|
||||||
|
|
||||||
pub mod signal {
|
pub mod signal {
|
||||||
//! Asynchronous signal handling (Tokio re-exports).
|
//! Asynchronous signal handling (Tokio re-exports).
|
||||||
|
@ -61,7 +59,7 @@ pub mod task {
|
||||||
pub use tokio::task::{spawn_blocking, yield_now, JoinHandle};
|
pub use tokio::task::{spawn_blocking, yield_now, JoinHandle};
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Spawns a future on the current [Worker].
|
/// Spawns a future on the current [worker](Worker).
|
||||||
///
|
///
|
||||||
/// # Panics
|
/// # Panics
|
||||||
/// Panics if Actix system is not running.
|
/// Panics if Actix system is not running.
|
||||||
|
|
|
@ -2,9 +2,10 @@ use std::{future::Future, io};
|
||||||
|
|
||||||
use tokio::task::{JoinHandle, LocalSet};
|
use tokio::task::{JoinHandle, LocalSet};
|
||||||
|
|
||||||
/// Single-threaded runtime provides a way to start reactor and runtime on the current thread.
|
/// A single-threaded runtime based on Tokio's "current thread" runtime.
|
||||||
///
|
///
|
||||||
/// See [crate root][crate] documentation for more details.
|
/// All spawned futures will be executed on the current thread. Therefore, there is no `Send` bound
|
||||||
|
/// on submitted futures.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Runtime {
|
pub struct Runtime {
|
||||||
local: LocalSet,
|
local: LocalSet,
|
||||||
|
@ -27,7 +28,7 @@ impl Runtime {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reference to local task set.
|
/// Reference to local task set.
|
||||||
pub(crate) fn local(&self) -> &LocalSet {
|
pub(crate) fn local_set(&self) -> &LocalSet {
|
||||||
&self.local
|
&self.local
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
use std::{
|
use std::{
|
||||||
borrow::Cow,
|
|
||||||
cell::RefCell,
|
cell::RefCell,
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
future::Future,
|
future::Future,
|
||||||
|
@ -12,55 +11,92 @@ use std::{
|
||||||
use futures_core::ready;
|
use futures_core::ready;
|
||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
|
|
||||||
use crate::{
|
use crate::{worker::WorkerHandle, Runtime, Worker};
|
||||||
builder::{Builder, SystemRunner},
|
|
||||||
worker::Worker,
|
|
||||||
};
|
|
||||||
|
|
||||||
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
|
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
|
||||||
|
|
||||||
/// System is a runtime manager.
|
|
||||||
#[derive(Clone, Debug)]
|
|
||||||
pub struct System {
|
|
||||||
id: usize,
|
|
||||||
sys_tx: mpsc::UnboundedSender<SystemCommand>,
|
|
||||||
// TODO: which worker is this exactly
|
|
||||||
worker: Worker,
|
|
||||||
}
|
|
||||||
|
|
||||||
thread_local!(
|
thread_local!(
|
||||||
static CURRENT: RefCell<Option<System>> = RefCell::new(None);
|
static CURRENT: RefCell<Option<System>> = RefCell::new(None);
|
||||||
);
|
);
|
||||||
|
|
||||||
|
/// A manager for a per-thread distributed async runtime.
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct System {
|
||||||
|
id: usize,
|
||||||
|
sys_tx: mpsc::UnboundedSender<SystemCommand>,
|
||||||
|
|
||||||
|
/// First worker that is created as part of the System.
|
||||||
|
worker_handle: WorkerHandle,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for System {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
eprintln!("dropping System")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl System {
|
impl System {
|
||||||
/// Constructs new system and sets it as current.
|
/// Create a new system.
|
||||||
pub(crate) fn construct(
|
|
||||||
sys_tx: mpsc::UnboundedSender<SystemCommand>,
|
|
||||||
worker: Worker,
|
|
||||||
) -> Self {
|
|
||||||
let sys = System {
|
|
||||||
sys_tx,
|
|
||||||
worker,
|
|
||||||
id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst),
|
|
||||||
};
|
|
||||||
System::set_current(sys.clone());
|
|
||||||
sys
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Build a new system with a customized Tokio runtime.
|
|
||||||
///
|
|
||||||
/// This allows to customize the runtime. See [`Builder`] for more information.
|
|
||||||
pub fn builder() -> Builder {
|
|
||||||
Builder::new()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Create new system.
|
|
||||||
///
|
///
|
||||||
/// # Panics
|
/// # Panics
|
||||||
/// Panics if underlying Tokio runtime can not be created.
|
/// Panics if underlying Tokio runtime can not be created.
|
||||||
#[allow(clippy::new_ret_no_self)]
|
#[allow(clippy::new_ret_no_self)]
|
||||||
pub fn new(name: impl Into<Cow<'static, str>>) -> SystemRunner {
|
pub fn new() -> SystemRunner {
|
||||||
Self::builder().name(name).build()
|
eprintln!("System::new");
|
||||||
|
Self::create_runtime(async {})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a new system with given initialization future.
|
||||||
|
///
|
||||||
|
/// The initialization future be run to completion (blocking current thread) before the system
|
||||||
|
/// runner is returned.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
/// Panics if underlying Tokio runtime can not be created.
|
||||||
|
pub fn with_init(init_fut: impl Future) -> SystemRunner {
|
||||||
|
Self::create_runtime(init_fut)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Constructs new system and registers it on the current thread.
|
||||||
|
pub(crate) fn construct(
|
||||||
|
sys_tx: mpsc::UnboundedSender<SystemCommand>,
|
||||||
|
worker: WorkerHandle,
|
||||||
|
) -> Self {
|
||||||
|
let sys = System {
|
||||||
|
sys_tx,
|
||||||
|
worker_handle: worker,
|
||||||
|
id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst),
|
||||||
|
};
|
||||||
|
|
||||||
|
System::set_current(sys.clone());
|
||||||
|
|
||||||
|
sys
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_runtime(init_fut: impl Future) -> SystemRunner {
|
||||||
|
let (stop_tx, stop_rx) = oneshot::channel();
|
||||||
|
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
|
eprintln!("creating runtime");
|
||||||
|
let rt = Runtime::new().expect("Actix (Tokio) runtime could not be created.");
|
||||||
|
eprintln!("creating worker and system");
|
||||||
|
let system = System::construct(sys_tx, Worker::new_current_thread(rt.local_set()));
|
||||||
|
|
||||||
|
eprintln!("creating system controller");
|
||||||
|
// init background system worker
|
||||||
|
let sys_worker = SystemController::new(sys_rx, stop_tx);
|
||||||
|
rt.spawn(sys_worker);
|
||||||
|
|
||||||
|
eprintln!("running init future");
|
||||||
|
// run system init future
|
||||||
|
rt.block_on(init_fut);
|
||||||
|
|
||||||
|
eprintln!("done; here's your system runner");
|
||||||
|
SystemRunner {
|
||||||
|
rt,
|
||||||
|
stop_rx,
|
||||||
|
system,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get current running system.
|
/// Get current running system.
|
||||||
|
@ -68,37 +104,34 @@ impl System {
|
||||||
/// # Panics
|
/// # Panics
|
||||||
/// Panics if no system is registered on the current thread.
|
/// Panics if no system is registered on the current thread.
|
||||||
pub fn current() -> System {
|
pub fn current() -> System {
|
||||||
|
eprintln!("gib current system");
|
||||||
CURRENT.with(|cell| match *cell.borrow() {
|
CURRENT.with(|cell| match *cell.borrow() {
|
||||||
Some(ref sys) => sys.clone(),
|
Some(ref sys) => sys.clone(),
|
||||||
None => panic!("System is not running"),
|
None => panic!("System is not running"),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if current system has started.
|
/// Get handle to a the System's initial [Worker].
|
||||||
pub fn is_set() -> bool {
|
pub fn worker(&self) -> &WorkerHandle {
|
||||||
CURRENT.with(|cell| cell.borrow().is_some())
|
&self.worker_handle
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set current running system.
|
/// Check if there is a System registered on the current thread.
|
||||||
|
pub fn is_registered() -> bool {
|
||||||
|
CURRENT.with(|sys| sys.borrow().is_some())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Register given system on current thread.
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
pub fn set_current(sys: System) {
|
pub fn set_current(sys: System) {
|
||||||
CURRENT.with(|s| {
|
CURRENT.with(|cell| {
|
||||||
*s.borrow_mut() = Some(sys);
|
*cell.borrow_mut() = Some(sys);
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Execute function with system reference.
|
/// Numeric system identifier.
|
||||||
pub fn with_current<F, R>(f: F) -> R
|
///
|
||||||
where
|
/// Useful when using multiple Systems.
|
||||||
F: FnOnce(&System) -> R,
|
|
||||||
{
|
|
||||||
CURRENT.with(|cell| match *cell.borrow() {
|
|
||||||
Some(ref sys) => f(sys),
|
|
||||||
None => panic!("System is not running"),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Numeric system ID.
|
|
||||||
pub fn id(&self) -> usize {
|
pub fn id(&self) -> usize {
|
||||||
self.id
|
self.id
|
||||||
}
|
}
|
||||||
|
@ -108,7 +141,7 @@ impl System {
|
||||||
self.stop_with_code(0)
|
self.stop_with_code(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Stop the system with a particular exit code.
|
/// Stop the system with a given exit code.
|
||||||
pub fn stop_with_code(&self, code: i32) {
|
pub fn stop_with_code(&self, code: i32) {
|
||||||
let _ = self.sys_tx.send(SystemCommand::Exit(code));
|
let _ = self.sys_tx.send(SystemCommand::Exit(code));
|
||||||
}
|
}
|
||||||
|
@ -116,57 +149,89 @@ impl System {
|
||||||
pub(crate) fn tx(&self) -> &mpsc::UnboundedSender<SystemCommand> {
|
pub(crate) fn tx(&self) -> &mpsc::UnboundedSender<SystemCommand> {
|
||||||
&self.sys_tx
|
&self.sys_tx
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: give clarity on which worker this is; previous documented as returning "system worker"
|
/// Runner that keeps a [System]'s event loop alive until stop message is received.
|
||||||
/// Get shared reference to a worker.
|
#[must_use = "A SystemRunner does nothing unless `run` is called."]
|
||||||
pub fn worker(&self) -> &Worker {
|
#[derive(Debug)]
|
||||||
&self.worker
|
pub struct SystemRunner {
|
||||||
|
rt: Runtime,
|
||||||
|
stop_rx: oneshot::Receiver<i32>,
|
||||||
|
system: System,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SystemRunner {
|
||||||
|
/// Starts event loop and will return once [System] is [stopped](System::stop).
|
||||||
|
pub fn run(self) -> io::Result<()> {
|
||||||
|
eprintln!("SystemRunner: run");
|
||||||
|
|
||||||
|
let SystemRunner { rt, stop_rx, .. } = self;
|
||||||
|
|
||||||
|
// run loop
|
||||||
|
match rt.block_on(stop_rx) {
|
||||||
|
Ok(code) => {
|
||||||
|
if code != 0 {
|
||||||
|
Err(io::Error::new(
|
||||||
|
io::ErrorKind::Other,
|
||||||
|
format!("Non-zero exit code: {}", code),
|
||||||
|
))
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This function will start Tokio runtime and will finish once the `System::stop()` message
|
/// Runs the provided future, blocking the current thread until the future completes.
|
||||||
/// is called. Function `f` is called within Tokio runtime context.
|
#[inline]
|
||||||
pub fn run<F>(f: F) -> io::Result<()>
|
pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
|
||||||
where
|
self.rt.block_on(fut)
|
||||||
F: FnOnce(),
|
|
||||||
{
|
|
||||||
Self::builder().run(f)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(crate) enum SystemCommand {
|
pub(crate) enum SystemCommand {
|
||||||
Exit(i32),
|
Exit(i32),
|
||||||
RegisterArbiter(usize, Worker),
|
RegisterWorker(usize, WorkerHandle),
|
||||||
DeregisterArbiter(usize),
|
DeregisterWorker(usize),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// There is one `SystemController` per [System]. It runs in the background, keeping track of
|
||||||
|
/// [Worker]s and is able to distribute a system-wide stop command.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(crate) struct SystemWorker {
|
pub(crate) struct SystemController {
|
||||||
stop: Option<oneshot::Sender<i32>>,
|
stop_tx: Option<oneshot::Sender<i32>>,
|
||||||
commands: mpsc::UnboundedReceiver<SystemCommand>,
|
cmd_rx: mpsc::UnboundedReceiver<SystemCommand>,
|
||||||
workers: HashMap<usize, Worker>,
|
workers: HashMap<usize, WorkerHandle>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SystemWorker {
|
impl SystemController {
|
||||||
pub(crate) fn new(
|
pub(crate) fn new(
|
||||||
commands: mpsc::UnboundedReceiver<SystemCommand>,
|
cmd_rx: mpsc::UnboundedReceiver<SystemCommand>,
|
||||||
stop: oneshot::Sender<i32>,
|
stop_tx: oneshot::Sender<i32>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
SystemWorker {
|
SystemController {
|
||||||
commands,
|
cmd_rx,
|
||||||
stop: Some(stop),
|
stop_tx: Some(stop_tx),
|
||||||
workers: HashMap::new(),
|
workers: HashMap::with_capacity(4),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
impl Drop for SystemController {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
eprintln!("dropping SystemController")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Future for SystemWorker {
|
impl Future for SystemController {
|
||||||
type Output = ();
|
type Output = ();
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
// process all items currently buffered in channel
|
// process all items currently buffered in channel
|
||||||
loop {
|
loop {
|
||||||
match ready!(Pin::new(&mut self.commands).poll_recv(cx)) {
|
match ready!(Pin::new(&mut self.cmd_rx).poll_recv(cx)) {
|
||||||
// channel closed; no more messages can be received
|
// channel closed; no more messages can be received
|
||||||
None => return Poll::Ready(()),
|
None => return Poll::Ready(()),
|
||||||
|
|
||||||
|
@ -179,16 +244,17 @@ impl Future for SystemWorker {
|
||||||
}
|
}
|
||||||
|
|
||||||
// stop event loop
|
// stop event loop
|
||||||
if let Some(stop) = self.stop.take() {
|
// will only fire once
|
||||||
let _ = stop.send(code);
|
if let Some(stop_tx) = self.stop_tx.take() {
|
||||||
|
let _ = stop_tx.send(code);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SystemCommand::RegisterArbiter(name, hnd) => {
|
SystemCommand::RegisterWorker(name, hnd) => {
|
||||||
self.workers.insert(name, hnd);
|
self.workers.insert(name, hnd);
|
||||||
}
|
}
|
||||||
|
|
||||||
SystemCommand::DeregisterArbiter(name) => {
|
SystemCommand::DeregisterWorker(name) => {
|
||||||
self.workers.remove(&name);
|
self.workers.remove(&name);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
|
@ -21,7 +21,7 @@ use crate::{
|
||||||
pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);
|
pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);
|
||||||
|
|
||||||
thread_local!(
|
thread_local!(
|
||||||
static ADDR: RefCell<Option<Worker>> = RefCell::new(None);
|
static HANDLE: RefCell<Option<WorkerHandle>> = RefCell::new(None);
|
||||||
static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
|
static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -33,107 +33,35 @@ pub(crate) enum WorkerCommand {
|
||||||
impl fmt::Debug for WorkerCommand {
|
impl fmt::Debug for WorkerCommand {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
match self {
|
match self {
|
||||||
WorkerCommand::Stop => write!(f, "ArbiterCommand::Stop"),
|
WorkerCommand::Stop => write!(f, "WorkerCommand::Stop"),
|
||||||
WorkerCommand::Execute(_) => write!(f, "ArbiterCommand::Execute"),
|
WorkerCommand::Execute(_) => write!(f, "WorkerCommand::Execute"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A worker represent a thread that provides an asynchronous execution environment for futures
|
/// A handle for sending spawn and stop messages to a [Worker].
|
||||||
/// and functions.
|
#[derive(Debug, Clone)]
|
||||||
///
|
pub struct WorkerHandle {
|
||||||
/// When a Worker is created, it spawns a new [OS thread](thread), and hosts an event loop.
|
|
||||||
/// Some Arbiter functions execute on the current thread.
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct Worker {
|
|
||||||
sender: mpsc::UnboundedSender<WorkerCommand>,
|
sender: mpsc::UnboundedSender<WorkerCommand>,
|
||||||
thread_handle: Option<thread::JoinHandle<()>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Clone for Worker {
|
impl Drop for WorkerHandle {
|
||||||
fn clone(&self) -> Self {
|
fn drop(&mut self) {
|
||||||
Self::new_handle(self.sender.clone())
|
eprintln!("dropping WorkerHandle")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Worker {
|
impl WorkerHandle {
|
||||||
fn default() -> Self {
|
pub(crate) fn new(sender: mpsc::UnboundedSender<WorkerCommand>) -> Self {
|
||||||
Self::new()
|
eprintln!("WorkerHandle::new");
|
||||||
}
|
Self { sender }
|
||||||
}
|
|
||||||
|
|
||||||
impl Worker {
|
|
||||||
/// Spawn new thread and run event loop in spawned thread.
|
|
||||||
///
|
|
||||||
/// Returns handle of newly created worker.
|
|
||||||
///
|
|
||||||
/// # Panics
|
|
||||||
/// Panics if a [System] not registered on the current thread.
|
|
||||||
pub fn new() -> Worker {
|
|
||||||
let id = COUNT.fetch_add(1, Ordering::Relaxed);
|
|
||||||
let name = format!("actix-rt:worker:{}", id);
|
|
||||||
let sys = System::current();
|
|
||||||
let (tx, rx) = mpsc::unbounded_channel();
|
|
||||||
|
|
||||||
let handle = thread::Builder::new()
|
|
||||||
.name(name.clone())
|
|
||||||
.spawn({
|
|
||||||
let tx = tx.clone();
|
|
||||||
move || {
|
|
||||||
let rt = Runtime::new().expect("Can not create Runtime");
|
|
||||||
let arb = Worker::new_handle(tx);
|
|
||||||
|
|
||||||
STORAGE.with(|cell| cell.borrow_mut().clear());
|
|
||||||
|
|
||||||
System::set_current(sys);
|
|
||||||
|
|
||||||
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
|
|
||||||
|
|
||||||
// register worker
|
|
||||||
let _ = System::current()
|
|
||||||
.tx()
|
|
||||||
.send(SystemCommand::RegisterArbiter(id, arb));
|
|
||||||
|
|
||||||
// run worker event processing loop
|
|
||||||
rt.block_on(WorkerRunner { rx });
|
|
||||||
|
|
||||||
// deregister worker
|
|
||||||
let _ = System::current()
|
|
||||||
.tx()
|
|
||||||
.send(SystemCommand::DeregisterArbiter(id));
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.unwrap_or_else(|err| {
|
|
||||||
panic!("Cannot spawn a Worker's thread {:?}: {:?}", &name, err)
|
|
||||||
});
|
|
||||||
|
|
||||||
Worker {
|
|
||||||
sender: tx,
|
|
||||||
thread_handle: Some(handle),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the current Worker's handle.
|
/// Send a future to the Worker's thread and spawn it.
|
||||||
///
|
|
||||||
/// # Panics
|
|
||||||
/// Panics if no Worker is running on the current thread.
|
|
||||||
pub fn current() -> Worker {
|
|
||||||
ADDR.with(|cell| match *cell.borrow() {
|
|
||||||
Some(ref addr) => addr.clone(),
|
|
||||||
None => panic!("Worker is not running."),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Stop worker from continuing it's event loop.
|
|
||||||
pub fn stop(&self) {
|
|
||||||
let _ = self.sender.send(WorkerCommand::Stop);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Send a future to the Arbiter's thread and spawn it.
|
|
||||||
///
|
///
|
||||||
/// If you require a result, include a response channel in the future.
|
/// If you require a result, include a response channel in the future.
|
||||||
///
|
///
|
||||||
/// Returns true if future was sent successfully and false if the Arbiter has died.
|
/// Returns true if future was sent successfully and false if the Worker has died.
|
||||||
pub fn spawn<Fut>(&self, future: Fut) -> bool
|
pub fn spawn<Fut>(&self, future: Fut) -> bool
|
||||||
where
|
where
|
||||||
Fut: Future<Output = ()> + Send + 'static,
|
Fut: Future<Output = ()> + Send + 'static,
|
||||||
|
@ -143,12 +71,139 @@ impl Worker {
|
||||||
.is_ok()
|
.is_ok()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send a function to the Arbiter's thread and execute it.
|
/// Send a function to the Worker's thread and execute it.
|
||||||
///
|
///
|
||||||
/// Any result from the function is discarded. If you require a result, include a response
|
/// Any result from the function is discarded. If you require a result, include a response
|
||||||
/// channel in the function.
|
/// channel in the function.
|
||||||
///
|
///
|
||||||
/// Returns true if function was sent successfully and false if the Arbiter has died.
|
/// Returns true if function was sent successfully and false if the Worker has died.
|
||||||
|
pub fn spawn_fn<F>(&self, f: F) -> bool
|
||||||
|
where
|
||||||
|
F: FnOnce() + Send + 'static,
|
||||||
|
{
|
||||||
|
self.spawn(async { f() })
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Instruct worker to stop processing it's event loop.
|
||||||
|
///
|
||||||
|
/// Returns true if stop message was sent successfully and false if the Worker has been dropped.
|
||||||
|
pub fn stop(&self) -> bool {
|
||||||
|
self.sender.send(WorkerCommand::Stop).is_ok()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A worker represent a thread that provides an asynchronous execution environment for futures
|
||||||
|
/// and functions.
|
||||||
|
///
|
||||||
|
/// When a Worker is created, it spawns a new [OS thread](thread), and hosts an event loop.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Worker {
|
||||||
|
sender: mpsc::UnboundedSender<WorkerCommand>,
|
||||||
|
thread_handle: thread::JoinHandle<()>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Worker {
|
||||||
|
/// Spawn new thread and run event loop in spawned thread.
|
||||||
|
///
|
||||||
|
/// Returns handle of newly created worker.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
/// Panics if a [System] is not registered on the current thread.
|
||||||
|
pub fn new() -> Worker {
|
||||||
|
let id = COUNT.fetch_add(1, Ordering::Relaxed);
|
||||||
|
let name = format!("actix-rt:worker:{}", id);
|
||||||
|
let sys = System::current();
|
||||||
|
let (tx, rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
|
let thread_handle = thread::Builder::new()
|
||||||
|
.name(name.clone())
|
||||||
|
.spawn({
|
||||||
|
let tx = tx.clone();
|
||||||
|
move || {
|
||||||
|
let rt = Runtime::new().expect("Can not create Runtime");
|
||||||
|
let hnd = WorkerHandle::new(tx);
|
||||||
|
|
||||||
|
System::set_current(sys);
|
||||||
|
|
||||||
|
STORAGE.with(|cell| cell.borrow_mut().clear());
|
||||||
|
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
|
||||||
|
|
||||||
|
// register worker
|
||||||
|
let _ = System::current()
|
||||||
|
.tx()
|
||||||
|
.send(SystemCommand::RegisterWorker(id, hnd));
|
||||||
|
|
||||||
|
// run worker event processing loop
|
||||||
|
rt.block_on(WorkerRunner { rx });
|
||||||
|
|
||||||
|
// deregister worker
|
||||||
|
let _ = System::current()
|
||||||
|
.tx()
|
||||||
|
.send(SystemCommand::DeregisterWorker(id));
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap_or_else(|err| {
|
||||||
|
panic!("Cannot spawn a Worker's thread {:?}: {:?}", &name, err)
|
||||||
|
});
|
||||||
|
|
||||||
|
Worker {
|
||||||
|
sender: tx,
|
||||||
|
thread_handle,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sets up a worker runner on the current thread using the provided runtime local task set.
|
||||||
|
pub(crate) fn new_current_thread(local: &LocalSet) -> WorkerHandle {
|
||||||
|
let (tx, rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
|
let hnd = WorkerHandle::new(tx);
|
||||||
|
|
||||||
|
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
|
||||||
|
STORAGE.with(|cell| cell.borrow_mut().clear());
|
||||||
|
|
||||||
|
local.spawn_local(WorkerRunner { rx });
|
||||||
|
|
||||||
|
hnd
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return a handle to the worker.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
/// Panics if no Worker is running on the current thread.
|
||||||
|
pub fn handle() -> WorkerHandle {
|
||||||
|
HANDLE.with(|cell| match *cell.borrow() {
|
||||||
|
Some(ref addr) => addr.clone(),
|
||||||
|
None => panic!("Worker is not running."),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Stop worker from continuing it's event loop.
|
||||||
|
///
|
||||||
|
/// Returns true if stop message was sent successfully and false if the Worker has been dropped.
|
||||||
|
pub fn stop(&self) -> bool {
|
||||||
|
self.sender.send(WorkerCommand::Stop).is_ok()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send a future to the Worker's thread and spawn it.
|
||||||
|
///
|
||||||
|
/// If you require a result, include a response channel in the future.
|
||||||
|
///
|
||||||
|
/// Returns true if future was sent successfully and false if the Worker has died.
|
||||||
|
pub fn spawn<Fut>(&self, future: Fut) -> bool
|
||||||
|
where
|
||||||
|
Fut: Future<Output = ()> + Send + 'static,
|
||||||
|
{
|
||||||
|
self.sender
|
||||||
|
.send(WorkerCommand::Execute(Box::pin(future)))
|
||||||
|
.is_ok()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send a function to the Worker's thread and execute it.
|
||||||
|
///
|
||||||
|
/// Any result from the function is discarded. If you require a result, include a response
|
||||||
|
/// channel in the function.
|
||||||
|
///
|
||||||
|
/// Returns true if function was sent successfully and false if the Worker has died.
|
||||||
pub fn spawn_fn<F>(&self, f: F) -> bool
|
pub fn spawn_fn<F>(&self, f: F) -> bool
|
||||||
where
|
where
|
||||||
F: FnOnce() + Send + 'static,
|
F: FnOnce() + Send + 'static,
|
||||||
|
@ -158,32 +213,9 @@ impl Worker {
|
||||||
|
|
||||||
/// Wait for worker's event loop to complete.
|
/// Wait for worker's event loop to complete.
|
||||||
///
|
///
|
||||||
/// Joins the underlying OS thread handle, if contained.
|
/// Joins the underlying OS thread handle. See [`JoinHandle::join`](thread::JoinHandle::join).
|
||||||
pub fn join(&mut self) -> thread::Result<()> {
|
pub fn join(self) -> thread::Result<()> {
|
||||||
if let Some(thread_handle) = self.thread_handle.take() {
|
self.thread_handle.join()
|
||||||
thread_handle.join()
|
|
||||||
} else {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn new_system(local: &LocalSet) -> Self {
|
|
||||||
let (tx, rx) = mpsc::unbounded_channel();
|
|
||||||
|
|
||||||
let arb = Worker::new_handle(tx);
|
|
||||||
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
|
|
||||||
STORAGE.with(|cell| cell.borrow_mut().clear());
|
|
||||||
|
|
||||||
local.spawn_local(WorkerRunner { rx });
|
|
||||||
|
|
||||||
arb
|
|
||||||
}
|
|
||||||
|
|
||||||
fn new_handle(sender: mpsc::UnboundedSender<WorkerCommand>) -> Self {
|
|
||||||
Self {
|
|
||||||
sender,
|
|
||||||
thread_handle: None,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Insert item into worker's thread-local storage.
|
/// Insert item into worker's thread-local storage.
|
||||||
|
@ -240,20 +272,34 @@ struct WorkerRunner {
|
||||||
rx: mpsc::UnboundedReceiver<WorkerCommand>,
|
rx: mpsc::UnboundedReceiver<WorkerCommand>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Drop for WorkerRunner {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
eprintln!("dropping WorkerRunner")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Future for WorkerRunner {
|
impl Future for WorkerRunner {
|
||||||
type Output = ();
|
type Output = ();
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
eprintln!("WorkerRunner: poll");
|
||||||
|
|
||||||
// process all items currently buffered in channel
|
// process all items currently buffered in channel
|
||||||
loop {
|
loop {
|
||||||
|
eprintln!("WorkerRunner: loop");
|
||||||
|
|
||||||
match ready!(Pin::new(&mut self.rx).poll_recv(cx)) {
|
match ready!(Pin::new(&mut self.rx).poll_recv(cx)) {
|
||||||
// channel closed; no more messages can be received
|
// channel closed; no more messages can be received
|
||||||
None => return Poll::Ready(()),
|
None => return Poll::Ready(()),
|
||||||
|
|
||||||
// process worker command
|
// process worker command
|
||||||
Some(item) => match item {
|
Some(item) => match item {
|
||||||
WorkerCommand::Stop => return Poll::Ready(()),
|
WorkerCommand::Stop => {
|
||||||
|
eprintln!("WorkerRunner: stopping");
|
||||||
|
return Poll::Ready(());
|
||||||
|
}
|
||||||
WorkerCommand::Execute(task_fut) => {
|
WorkerCommand::Execute(task_fut) => {
|
||||||
|
eprintln!("WorkerRunner: executing task");
|
||||||
tokio::task::spawn_local(task_fut);
|
tokio::task::spawn_local(task_fut);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
|
@ -1,16 +1,17 @@
|
||||||
use std::{
|
use std::{
|
||||||
sync::mpsc::sync_channel,
|
sync::mpsc::channel,
|
||||||
thread,
|
thread,
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
|
|
||||||
use actix_rt::{System, Worker};
|
use actix_rt::{System, Worker};
|
||||||
|
use tokio::sync::oneshot;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn await_for_timer() {
|
fn await_for_timer() {
|
||||||
let time = Duration::from_secs(1);
|
let time = Duration::from_secs(1);
|
||||||
let instant = Instant::now();
|
let instant = Instant::now();
|
||||||
System::new("test_wait_timer").block_on(async move {
|
System::new().block_on(async move {
|
||||||
tokio::time::sleep(time).await;
|
tokio::time::sleep(time).await;
|
||||||
});
|
});
|
||||||
assert!(
|
assert!(
|
||||||
|
@ -23,11 +24,11 @@ fn await_for_timer() {
|
||||||
fn join_another_worker() {
|
fn join_another_worker() {
|
||||||
let time = Duration::from_secs(1);
|
let time = Duration::from_secs(1);
|
||||||
let instant = Instant::now();
|
let instant = Instant::now();
|
||||||
System::new("test_join_another_worker").block_on(async move {
|
System::new().block_on(async move {
|
||||||
let mut worker = Worker::new();
|
let worker = Worker::new();
|
||||||
worker.spawn(Box::pin(async move {
|
worker.spawn(Box::pin(async move {
|
||||||
tokio::time::sleep(time).await;
|
tokio::time::sleep(time).await;
|
||||||
Worker::current().stop();
|
Worker::handle().stop();
|
||||||
}));
|
}));
|
||||||
worker.join().unwrap();
|
worker.join().unwrap();
|
||||||
});
|
});
|
||||||
|
@ -37,12 +38,12 @@ fn join_another_worker() {
|
||||||
);
|
);
|
||||||
|
|
||||||
let instant = Instant::now();
|
let instant = Instant::now();
|
||||||
System::new("test_join_another_worker").block_on(async move {
|
System::new().block_on(async move {
|
||||||
let mut worker = Worker::new();
|
let worker = Worker::new();
|
||||||
worker.spawn_fn(move || {
|
worker.spawn_fn(move || {
|
||||||
actix_rt::spawn(async move {
|
actix_rt::spawn(async move {
|
||||||
tokio::time::sleep(time).await;
|
tokio::time::sleep(time).await;
|
||||||
Worker::current().stop();
|
Worker::handle().stop();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
worker.join().unwrap();
|
worker.join().unwrap();
|
||||||
|
@ -53,11 +54,11 @@ fn join_another_worker() {
|
||||||
);
|
);
|
||||||
|
|
||||||
let instant = Instant::now();
|
let instant = Instant::now();
|
||||||
System::new("test_join_another_worker").block_on(async move {
|
System::new().block_on(async move {
|
||||||
let mut worker = Worker::new();
|
let worker = Worker::new();
|
||||||
worker.spawn(Box::pin(async move {
|
worker.spawn(Box::pin(async move {
|
||||||
tokio::time::sleep(time).await;
|
tokio::time::sleep(time).await;
|
||||||
Worker::current().stop();
|
Worker::handle().stop();
|
||||||
}));
|
}));
|
||||||
worker.stop();
|
worker.stop();
|
||||||
worker.join().unwrap();
|
worker.join().unwrap();
|
||||||
|
@ -73,7 +74,7 @@ fn non_static_block_on() {
|
||||||
let string = String::from("test_str");
|
let string = String::from("test_str");
|
||||||
let str = string.as_str();
|
let str = string.as_str();
|
||||||
|
|
||||||
let sys = System::new("borrow some");
|
let sys = System::new();
|
||||||
|
|
||||||
sys.block_on(async {
|
sys.block_on(async {
|
||||||
actix_rt::time::sleep(Duration::from_millis(1)).await;
|
actix_rt::time::sleep(Duration::from_millis(1)).await;
|
||||||
|
@ -87,10 +88,11 @@ fn non_static_block_on() {
|
||||||
assert_eq!("test_str", str);
|
assert_eq!("test_str", str);
|
||||||
});
|
});
|
||||||
|
|
||||||
System::run(|| {
|
System::with_init(async {
|
||||||
assert_eq!("test_str", str);
|
assert_eq!("test_str", str);
|
||||||
System::current().stop();
|
System::current().stop();
|
||||||
})
|
})
|
||||||
|
.run()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,11 +111,11 @@ fn wait_for_spawns() {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn worker_spawn_fn_runs() {
|
fn worker_spawn_fn_runs() {
|
||||||
let _ = System::new("test-system");
|
let _ = System::new();
|
||||||
|
|
||||||
let (tx, rx) = sync_channel::<u32>(1);
|
let (tx, rx) = channel::<u32>();
|
||||||
|
|
||||||
let mut worker = Worker::new();
|
let worker = Worker::new();
|
||||||
worker.spawn_fn(move || tx.send(42).unwrap());
|
worker.spawn_fn(move || tx.send(42).unwrap());
|
||||||
|
|
||||||
let num = rx.recv().unwrap();
|
let num = rx.recv().unwrap();
|
||||||
|
@ -125,9 +127,9 @@ fn worker_spawn_fn_runs() {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn worker_drop_no_panic_fn() {
|
fn worker_drop_no_panic_fn() {
|
||||||
let _ = System::new("test-system");
|
let _ = System::new();
|
||||||
|
|
||||||
let mut worker = Worker::new();
|
let worker = Worker::new();
|
||||||
worker.spawn_fn(|| panic!("test"));
|
worker.spawn_fn(|| panic!("test"));
|
||||||
|
|
||||||
worker.stop();
|
worker.stop();
|
||||||
|
@ -136,9 +138,9 @@ fn worker_drop_no_panic_fn() {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn worker_drop_no_panic_fut() {
|
fn worker_drop_no_panic_fut() {
|
||||||
let _ = System::new("test-system");
|
let _ = System::new();
|
||||||
|
|
||||||
let mut worker = Worker::new();
|
let worker = Worker::new();
|
||||||
worker.spawn(async { panic!("test") });
|
worker.spawn(async { panic!("test") });
|
||||||
|
|
||||||
worker.stop();
|
worker.stop();
|
||||||
|
@ -147,9 +149,9 @@ fn worker_drop_no_panic_fut() {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn worker_item_storage() {
|
fn worker_item_storage() {
|
||||||
let _ = System::new("test-system");
|
let _ = System::new();
|
||||||
|
|
||||||
let mut worker = Worker::new();
|
let worker = Worker::new();
|
||||||
|
|
||||||
assert!(!Worker::contains_item::<u32>());
|
assert!(!Worker::contains_item::<u32>());
|
||||||
Worker::set_item(42u32);
|
Worker::set_item(42u32);
|
||||||
|
@ -174,18 +176,6 @@ fn worker_item_storage() {
|
||||||
worker.join().unwrap();
|
worker.join().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn system_name_cow_str() {
|
|
||||||
let _ = System::new("test-system");
|
|
||||||
System::current().stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn system_name_cow_string() {
|
|
||||||
let _ = System::new("test-system".to_owned());
|
|
||||||
System::current().stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
#[should_panic]
|
#[should_panic]
|
||||||
fn no_system_current_panic() {
|
fn no_system_current_panic() {
|
||||||
|
@ -197,3 +187,39 @@ fn no_system_current_panic() {
|
||||||
fn no_system_worker_new_panic() {
|
fn no_system_worker_new_panic() {
|
||||||
Worker::new();
|
Worker::new();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn system_worker_spawn() {
|
||||||
|
let runner = System::new();
|
||||||
|
|
||||||
|
eprintln!("making channel");
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
|
eprintln!("getting initial system worker");
|
||||||
|
let sys = System::current();
|
||||||
|
|
||||||
|
thread::spawn(|| {
|
||||||
|
// this thread will have no worker in it's thread local so call will panic
|
||||||
|
Worker::handle();
|
||||||
|
})
|
||||||
|
.join()
|
||||||
|
.unwrap_err();
|
||||||
|
|
||||||
|
let thread = thread::spawn(|| {
|
||||||
|
// this thread will have no worker in it's thread local so use the system handle instead
|
||||||
|
System::set_current(sys);
|
||||||
|
let sys = System::current();
|
||||||
|
|
||||||
|
let wrk = sys.worker();
|
||||||
|
wrk.spawn(async move {
|
||||||
|
eprintln!("before send");
|
||||||
|
tx.send(42u32).unwrap();
|
||||||
|
|
||||||
|
eprintln!("after send");
|
||||||
|
System::current().stop();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
assert_eq!(runner.block_on(rx).unwrap(), 42);
|
||||||
|
thread.join().unwrap();
|
||||||
|
}
|
||||||
|
|
|
@ -48,7 +48,7 @@ impl TestServer {
|
||||||
|
|
||||||
// run server in separate thread
|
// run server in separate thread
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
let sys = System::new("actix-test-server");
|
let sys = System::new();
|
||||||
factory(Server::build()).workers(1).disable_signals().run();
|
factory(Server::build()).workers(1).disable_signals().run();
|
||||||
|
|
||||||
tx.send(System::current()).unwrap();
|
tx.send(System::current()).unwrap();
|
||||||
|
@ -70,7 +70,7 @@ impl TestServer {
|
||||||
|
|
||||||
// run server in separate thread
|
// run server in separate thread
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
let sys = System::new("actix-test-server");
|
let sys = System::new();
|
||||||
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
|
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
|
||||||
let local_addr = tcp.local_addr().unwrap();
|
let local_addr = tcp.local_addr().unwrap();
|
||||||
|
|
||||||
|
|
|
@ -215,7 +215,7 @@ impl ServerWorker {
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Can not start worker: {:?}", e);
|
error!("Can not start worker: {:?}", e);
|
||||||
Arbiter::current().stop();
|
Arbiter::handle().stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
wrk.await
|
wrk.await
|
||||||
|
@ -386,7 +386,7 @@ impl Future for ServerWorker {
|
||||||
let num = num_connections();
|
let num = num_connections();
|
||||||
if num == 0 {
|
if num == 0 {
|
||||||
let _ = tx.take().unwrap().send(true);
|
let _ = tx.take().unwrap().send(true);
|
||||||
Arbiter::current().stop();
|
Arbiter::handle().stop();
|
||||||
return Poll::Ready(());
|
return Poll::Ready(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -394,7 +394,7 @@ impl Future for ServerWorker {
|
||||||
if Pin::new(t2).poll(cx).is_ready() {
|
if Pin::new(t2).poll(cx).is_ready() {
|
||||||
let _ = tx.take().unwrap().send(false);
|
let _ = tx.take().unwrap().send(false);
|
||||||
self.shutdown(true);
|
self.shutdown(true);
|
||||||
Arbiter::current().stop();
|
Arbiter::handle().stop();
|
||||||
return Poll::Ready(());
|
return Poll::Ready(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue