diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index c38373dd..73bcac5c 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -8,8 +8,10 @@ * Rename `Arbiter::{send => spawn}` and `Arbiter::{exec_fn => spawn_fn}`. [#253] * Remove `Arbiter::exec`. [#253] * Remove deprecated `Arbiter::local_join` and `Arbiter::is_running`. [#253] +* Rename `Arbiter => Worker`. [#254] [#253]: https://github.com/actix/actix-net/pull/253 +[#254]: https://github.com/actix/actix-net/pull/254 ## 2.0.0-beta.2 - 2021-01-09 diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index 273917eb..f5a6ba6a 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -22,7 +22,6 @@ macros = ["actix-macros"] [dependencies] actix-macros = { version = "0.2.0-beta.1", optional = true } -ahash = "0.7" futures-core = { version = "0.3", default-features = false } tokio = { version = "1", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] } diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index b8acf71b..01c85512 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -6,9 +6,9 @@ use tokio::sync::{ }; use crate::{ - worker::Worker, runtime::Runtime, system::{System, SystemWorker}, + worker::Worker, }; /// Builder an actix runtime. diff --git a/actix-rt/src/worker.rs b/actix-rt/src/worker.rs index 44c14cb7..6b022eb2 100644 --- a/actix-rt/src/worker.rs +++ b/actix-rt/src/worker.rs @@ -1,6 +1,7 @@ use std::{ any::{Any, TypeId}, cell::RefCell, + collections::HashMap, fmt, future::Future, pin::Pin, @@ -9,12 +10,8 @@ use std::{ thread, }; -use ahash::AHashMap; use futures_core::ready; -use tokio::{ - sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, - task::LocalSet, -}; +use tokio::{sync::mpsc, task::LocalSet}; use crate::{ runtime::Runtime, @@ -25,7 +22,7 @@ pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0); thread_local!( static ADDR: RefCell> = RefCell::new(None); - static STORAGE: RefCell>> = RefCell::new(AHashMap::new()); + static STORAGE: RefCell>> = RefCell::new(HashMap::new()); ); pub(crate) enum WorkerCommand { @@ -51,13 +48,13 @@ impl fmt::Debug for WorkerCommand { /// Some Arbiter functions execute on the current thread. #[derive(Debug)] pub struct Worker { - sender: UnboundedSender, + sender: mpsc::UnboundedSender, thread_handle: Option>, } impl Clone for Worker { fn clone(&self) -> Self { - Self::new_with_sender(self.sender.clone()) + Self::new_handle(self.sender.clone()) } } @@ -69,9 +66,9 @@ impl Default for Worker { impl Worker { pub(crate) fn new_system(local: &LocalSet) -> Self { - let (tx, rx) = unbounded_channel(); + let (tx, rx) = mpsc::unbounded_channel(); - let arb = Worker::new_with_sender(tx); + let arb = Worker::new_handle(tx); ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); STORAGE.with(|cell| cell.borrow_mut().clear()); @@ -80,7 +77,7 @@ impl Worker { arb } - fn new_with_sender(sender: UnboundedSender) -> Self { + fn new_handle(sender: mpsc::UnboundedSender) -> Self { Self { sender, thread_handle: None, @@ -110,7 +107,7 @@ impl Worker { let id = COUNT.fetch_add(1, Ordering::Relaxed); let name = format!("actix-rt:worker:{}", id); let sys = System::current(); - let (tx, rx) = unbounded_channel(); + let (tx, rx) = mpsc::unbounded_channel(); let handle = thread::Builder::new() .name(name.clone()) @@ -118,7 +115,7 @@ impl Worker { let tx = tx.clone(); move || { let rt = Runtime::new().expect("Can not create Runtime"); - let arb = Worker::new_with_sender(tx); + let arb = Worker::new_handle(tx); STORAGE.with(|cell| cell.borrow_mut().clear()); @@ -179,8 +176,10 @@ impl Worker { .is_ok() } - /// Insert item to worker's thread-local storage. - pub fn insert_item(item: T) { + /// Insert item into worker's thread-local storage. + /// + /// Overwrites any item of the same type previously inserted. + pub fn set_item(item: T) { STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::(), Box::new(item))); } @@ -216,7 +215,7 @@ impl Worker { /// /// # Panics /// Panics if item is not in worker's thread-local item storage. - pub fn get_item_mut(mut f: F) -> R + pub fn get_mut_item(mut f: F) -> R where F: FnMut(&mut T) -> R, { @@ -244,7 +243,7 @@ impl Worker { /// A persistent worker future that processes worker commands. struct WorkerRunner { - rx: UnboundedReceiver, + rx: mpsc::UnboundedReceiver, } impl Drop for WorkerRunner {