diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 4262c82e..379afbd7 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -8,6 +8,7 @@ * Remove old `Worker::spawn`. Implementation is now inlined into `actix_rt::spawn`. [#253] * Rename `Worker::{send => spawn}` and `Worker::{exec_fn => spawn_fn}`. [#253] * Remove `Worker::exec`. [#253] +* Remove `System::arbiter`. [#256] * Remove deprecated `Worker::local_join` and `Worker::is_running`. [#253] * `Worker::spawn` now accepts !Unpin futures. [#256] diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index 212c0c65..b0303d6c 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -25,18 +25,6 @@ pub use self::runtime::Runtime; pub use self::system::System; pub use self::worker::Worker; -/// Spawns a future on the current [Arbiter]. -/// -/// # Panics -/// Panics if Actix system is not running. -#[inline] -pub fn spawn(f: Fut) -> JoinHandle<()> -where - Fut: Future + 'static, -{ - tokio::task::spawn_local(f) -} - pub mod signal { //! Asynchronous signal handling (Tokio re-exports). @@ -72,3 +60,15 @@ pub mod task { pub use tokio::task::{spawn_blocking, yield_now, JoinHandle}; } + +/// Spawns a future on the current [Worker]. +/// +/// # Panics +/// Panics if Actix system is not running. +#[inline] +pub fn spawn(f: Fut) -> JoinHandle<()> +where + Fut: Future + 'static, +{ + tokio::task::spawn_local(f) +} diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index e3ee720b..86774a71 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -23,7 +23,7 @@ static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); #[derive(Clone, Debug)] pub struct System { id: usize, - tx: mpsc::UnboundedSender, + sys_tx: mpsc::UnboundedSender, worker: Worker, } @@ -32,10 +32,13 @@ thread_local!( ); impl System { - /// Constructs new system and sets it as current - pub(crate) fn construct(sys: mpsc::UnboundedSender, worker: Worker) -> Self { + /// Constructs new system and sets it as current. + pub(crate) fn construct( + sys_tx: mpsc::UnboundedSender, + worker: Worker, + ) -> Self { let sys = System { - tx: sys, + sys_tx, worker, id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst), }; @@ -103,16 +106,11 @@ impl System { /// Stop the system with a particular exit code. pub fn stop_with_code(&self, code: i32) { - let _ = self.tx.send(SystemCommand::Exit(code)); + let _ = self.sys_tx.send(SystemCommand::Exit(code)); } pub(crate) fn tx(&self) -> &mpsc::UnboundedSender { - &self.tx - } - - /// Get shared reference to system arbiter. - pub fn arbiter(&self) -> &Worker { - &self.worker + &self.sys_tx } /// This function will start Tokio runtime and will finish once the `System::stop()` message @@ -165,18 +163,21 @@ impl Future for SystemWorker { // process system command Some(cmd) => match cmd { SystemCommand::Exit(code) => { - // stop arbiters - for arb in self.workers.values() { - arb.stop(); + // stop workers + for wkr in self.workers.values() { + wkr.stop(); } + // stop event loop if let Some(stop) = self.stop.take() { let _ = stop.send(code); } } + SystemCommand::RegisterArbiter(name, hnd) => { self.workers.insert(name, hnd); } + SystemCommand::DeregisterArbiter(name) => { self.workers.remove(&name); } diff --git a/actix-rt/src/worker.rs b/actix-rt/src/worker.rs index 9447c2c7..d8538639 100644 --- a/actix-rt/src/worker.rs +++ b/actix-rt/src/worker.rs @@ -197,11 +197,6 @@ impl Worker { /// Call a function with a shared reference to an item in this worker's thread-local storage. /// - /// # Examples - /// ``` - /// - /// ``` - /// /// # Panics /// Panics if item is not in worker's thread-local item storage. pub fn get_item(mut f: F) -> R @@ -252,7 +247,7 @@ impl Future for WorkerRunner { // channel closed; no more messages can be received None => return Poll::Ready(()), - // process arbiter command + // process worker command Some(item) => match item { WorkerCommand::Stop => return Poll::Ready(()), WorkerCommand::Execute(task_fut) => { diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 5c434709..fea8acbd 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -1,8 +1,11 @@ use std::time::Duration; use std::{io, thread}; -use actix_rt::time::{sleep_until, Instant}; -use actix_rt::System; +use actix_rt::{ + self as rt, + time::{sleep_until, Instant}, + System, +}; use log::{error, info}; use mio::{Interest, Poll, Token as MioToken}; use slab::Slab; @@ -401,10 +404,11 @@ impl Accept { // after the sleep a Timer interest is sent to Accept Poll let waker = self.waker.clone(); - System::current().arbiter().spawn(Box::pin(async move { + rt::spawn(async move { sleep_until(Instant::now() + Duration::from_millis(510)).await; waker.wake(WakerInterest::Timer); - })); + }); + return; } }