remove arbiter ref method

This commit is contained in:
Rob Ede 2021-01-29 14:36:35 +00:00
parent 7589f3c621
commit 04c6456398
No known key found for this signature in database
GPG Key ID: C2A3B36E841A91E6
5 changed files with 37 additions and 36 deletions

View File

@ -8,6 +8,7 @@
* Remove old `Worker::spawn`. Implementation is now inlined into `actix_rt::spawn`. [#253] * Remove old `Worker::spawn`. Implementation is now inlined into `actix_rt::spawn`. [#253]
* Rename `Worker::{send => spawn}` and `Worker::{exec_fn => spawn_fn}`. [#253] * Rename `Worker::{send => spawn}` and `Worker::{exec_fn => spawn_fn}`. [#253]
* Remove `Worker::exec`. [#253] * Remove `Worker::exec`. [#253]
* Remove `System::arbiter`. [#256]
* Remove deprecated `Worker::local_join` and `Worker::is_running`. [#253] * Remove deprecated `Worker::local_join` and `Worker::is_running`. [#253]
* `Worker::spawn` now accepts !Unpin futures. [#256] * `Worker::spawn` now accepts !Unpin futures. [#256]

View File

@ -25,18 +25,6 @@ pub use self::runtime::Runtime;
pub use self::system::System; pub use self::system::System;
pub use self::worker::Worker; pub use self::worker::Worker;
/// Spawns a future on the current [Arbiter].
///
/// # Panics
/// Panics if Actix system is not running.
#[inline]
pub fn spawn<Fut>(f: Fut) -> JoinHandle<()>
where
Fut: Future<Output = ()> + 'static,
{
tokio::task::spawn_local(f)
}
pub mod signal { pub mod signal {
//! Asynchronous signal handling (Tokio re-exports). //! Asynchronous signal handling (Tokio re-exports).
@ -72,3 +60,15 @@ pub mod task {
pub use tokio::task::{spawn_blocking, yield_now, JoinHandle}; pub use tokio::task::{spawn_blocking, yield_now, JoinHandle};
} }
/// Spawns a future on the current [Worker].
///
/// # Panics
/// Panics if Actix system is not running.
#[inline]
pub fn spawn<Fut>(f: Fut) -> JoinHandle<()>
where
Fut: Future<Output = ()> + 'static,
{
tokio::task::spawn_local(f)
}

View File

@ -23,7 +23,7 @@ static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct System { pub struct System {
id: usize, id: usize,
tx: mpsc::UnboundedSender<SystemCommand>, sys_tx: mpsc::UnboundedSender<SystemCommand>,
worker: Worker, worker: Worker,
} }
@ -32,10 +32,13 @@ thread_local!(
); );
impl System { impl System {
/// Constructs new system and sets it as current /// Constructs new system and sets it as current.
pub(crate) fn construct(sys: mpsc::UnboundedSender<SystemCommand>, worker: Worker) -> Self { pub(crate) fn construct(
sys_tx: mpsc::UnboundedSender<SystemCommand>,
worker: Worker,
) -> Self {
let sys = System { let sys = System {
tx: sys, sys_tx,
worker, worker,
id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst), id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst),
}; };
@ -103,16 +106,11 @@ impl System {
/// Stop the system with a particular exit code. /// Stop the system with a particular exit code.
pub fn stop_with_code(&self, code: i32) { pub fn stop_with_code(&self, code: i32) {
let _ = self.tx.send(SystemCommand::Exit(code)); let _ = self.sys_tx.send(SystemCommand::Exit(code));
} }
pub(crate) fn tx(&self) -> &mpsc::UnboundedSender<SystemCommand> { pub(crate) fn tx(&self) -> &mpsc::UnboundedSender<SystemCommand> {
&self.tx &self.sys_tx
}
/// Get shared reference to system arbiter.
pub fn arbiter(&self) -> &Worker {
&self.worker
} }
/// This function will start Tokio runtime and will finish once the `System::stop()` message /// This function will start Tokio runtime and will finish once the `System::stop()` message
@ -165,18 +163,21 @@ impl Future for SystemWorker {
// process system command // process system command
Some(cmd) => match cmd { Some(cmd) => match cmd {
SystemCommand::Exit(code) => { SystemCommand::Exit(code) => {
// stop arbiters // stop workers
for arb in self.workers.values() { for wkr in self.workers.values() {
arb.stop(); wkr.stop();
} }
// stop event loop // stop event loop
if let Some(stop) = self.stop.take() { if let Some(stop) = self.stop.take() {
let _ = stop.send(code); let _ = stop.send(code);
} }
} }
SystemCommand::RegisterArbiter(name, hnd) => { SystemCommand::RegisterArbiter(name, hnd) => {
self.workers.insert(name, hnd); self.workers.insert(name, hnd);
} }
SystemCommand::DeregisterArbiter(name) => { SystemCommand::DeregisterArbiter(name) => {
self.workers.remove(&name); self.workers.remove(&name);
} }

View File

@ -197,11 +197,6 @@ impl Worker {
/// Call a function with a shared reference to an item in this worker's thread-local storage. /// Call a function with a shared reference to an item in this worker's thread-local storage.
/// ///
/// # Examples
/// ```
///
/// ```
///
/// # Panics /// # Panics
/// Panics if item is not in worker's thread-local item storage. /// Panics if item is not in worker's thread-local item storage.
pub fn get_item<T: 'static, F, R>(mut f: F) -> R pub fn get_item<T: 'static, F, R>(mut f: F) -> R
@ -252,7 +247,7 @@ impl Future for WorkerRunner {
// channel closed; no more messages can be received // channel closed; no more messages can be received
None => return Poll::Ready(()), None => return Poll::Ready(()),
// process arbiter command // process worker command
Some(item) => match item { Some(item) => match item {
WorkerCommand::Stop => return Poll::Ready(()), WorkerCommand::Stop => return Poll::Ready(()),
WorkerCommand::Execute(task_fut) => { WorkerCommand::Execute(task_fut) => {

View File

@ -1,8 +1,11 @@
use std::time::Duration; use std::time::Duration;
use std::{io, thread}; use std::{io, thread};
use actix_rt::time::{sleep_until, Instant}; use actix_rt::{
use actix_rt::System; self as rt,
time::{sleep_until, Instant},
System,
};
use log::{error, info}; use log::{error, info};
use mio::{Interest, Poll, Token as MioToken}; use mio::{Interest, Poll, Token as MioToken};
use slab::Slab; use slab::Slab;
@ -401,10 +404,11 @@ impl Accept {
// after the sleep a Timer interest is sent to Accept Poll // after the sleep a Timer interest is sent to Accept Poll
let waker = self.waker.clone(); let waker = self.waker.clone();
System::current().arbiter().spawn(Box::pin(async move { rt::spawn(async move {
sleep_until(Instant::now() + Duration::from_millis(510)).await; sleep_until(Instant::now() + Duration::from_millis(510)).await;
waker.wake(WakerInterest::Timer); waker.wake(WakerInterest::Timer);
})); });
return; return;
} }
} }