diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 6b427363..a547593e 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -4,38 +4,21 @@ ## 2.0.0-beta.3 - 2021-01-31 -* Remove `run_in_tokio`, `attach_to_tokio` and `AsyncSystemRunner`. [#253] -* Return `JoinHandle` from `actix_rt::spawn`. [#253] -<<<<<<< HEAD -* Remove old `Worker::spawn`. Implementation is now inlined into `actix_rt::spawn`. [#253] -* Rename `Worker::{send => spawn}` and `Worker::{exec_fn => spawn_fn}`. [#253] -* Remove `Worker::exec`. [#253] -* Remove `System::arbiter`. [#256] -* Remove deprecated `Worker::local_join` and `Worker::is_running`. [#253] -* `Worker::spawn` now accepts !Unpin futures. [#256] -======= +* Remove `run_in_tokio`, `attach_to_tokio` and `AsyncSystemRunne * Remove old `Arbiter::spawn`. Implementation is now inlined into `actix_rt::spawn`. [#253] * Rename `Arbiter::{send => spawn}` and `Arbiter::{exec_fn => spawn_fn}`. [#253] * Remove `Arbiter::exec`. [#253] * Remove deprecated `Arbiter::local_join` and `Arbiter::is_running`. [#253] * `Arbiter::spawn` now accepts !Unpin futures. [#256] ->>>>>>> master * `System::new` no longer takes arguments. [#257] * Remove `System::with_current`. [#257] * Remove `Builder`. [#257] * Add `System::with_init` as replacement for `Builder::run`. [#257] * Rename `System::{is_set => is_registered}`. [#257] -<<<<<<< HEAD -* Add `WorkerHandle` for sending messages to non-current-thread workers. [#257]. -* `System::worker` now returns a `&WorkerHandle`. [#257] -* Rename `Worker::{current => handle}` and return a `WorkerHandle` instead. [#257] -* `Worker::join` now takes self by value. [#257] -======= * Add `ArbiterHandle` for sending messages to non-current-thread arbiters. [#257]. * `System::arbiter` now returns an `&ArbiterHandle`. [#257] * `Arbiter::current` now returns an `ArbiterHandle` instead. [#257] * `Arbiter::join` now takes self by value. [#257] ->>>>>>> master [#253]: https://github.com/actix/actix-net/pull/253 [#254]: https://github.com/actix/actix-net/pull/254 diff --git a/actix-rt/src/worker.rs b/actix-rt/src/worker.rs deleted file mode 100644 index b7925252..00000000 --- a/actix-rt/src/worker.rs +++ /dev/null @@ -1,299 +0,0 @@ -use std::{ - any::{Any, TypeId}, - cell::RefCell, - collections::HashMap, - fmt, - future::Future, - pin::Pin, - sync::atomic::{AtomicUsize, Ordering}, - task::{Context, Poll}, - thread, -}; - -use futures_core::ready; -use tokio::{sync::mpsc, task::LocalSet}; - -use crate::{ - runtime::Runtime, - system::{System, SystemCommand}, -}; - -pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0); - -thread_local!( - static HANDLE: RefCell> = RefCell::new(None); - static STORAGE: RefCell>> = RefCell::new(HashMap::new()); -); - -pub(crate) enum WorkerCommand { - Stop, - Execute(Pin + Send>>), -} - -impl fmt::Debug for WorkerCommand { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - WorkerCommand::Stop => write!(f, "WorkerCommand::Stop"), - WorkerCommand::Execute(_) => write!(f, "WorkerCommand::Execute"), - } - } -} - -/// A handle for sending spawn and stop messages to a [Worker]. -#[derive(Debug, Clone)] -pub struct WorkerHandle { - sender: mpsc::UnboundedSender, -} - -impl WorkerHandle { - pub(crate) fn new(sender: mpsc::UnboundedSender) -> Self { - Self { sender } - } - - /// Send a future to the Worker's thread and spawn it. - /// - /// If you require a result, include a response channel in the future. - /// - /// Returns true if future was sent successfully and false if the Worker has died. - pub fn spawn(&self, future: Fut) -> bool - where - Fut: Future + Send + 'static, - { - self.sender - .send(WorkerCommand::Execute(Box::pin(future))) - .is_ok() - } - - /// Send a function to the Worker's thread and execute it. - /// - /// Any result from the function is discarded. If you require a result, include a response - /// channel in the function. - /// - /// Returns true if function was sent successfully and false if the Worker has died. - pub fn spawn_fn(&self, f: F) -> bool - where - F: FnOnce() + Send + 'static, - { - self.spawn(async { f() }) - } - - /// Instruct worker to stop processing it's event loop. - /// - /// Returns true if stop message was sent successfully and false if the Worker has been dropped. - pub fn stop(&self) -> bool { - self.sender.send(WorkerCommand::Stop).is_ok() - } -} - -/// A worker represent a thread that provides an asynchronous execution environment for futures -/// and functions. -/// -/// When a Worker is created, it spawns a new [OS thread](thread), and hosts an event loop. -#[derive(Debug)] -pub struct Worker { - sender: mpsc::UnboundedSender, - thread_handle: thread::JoinHandle<()>, -} - -impl Worker { - /// Spawn new thread and run event loop in spawned thread. - /// - /// Returns handle of newly created worker. - /// - /// # Panics - /// Panics if a [System] is not registered on the current thread. - #[allow(clippy::new_without_default)] - pub fn new() -> Worker { - let id = COUNT.fetch_add(1, Ordering::Relaxed); - let name = format!("actix-rt:worker:{}", id); - let sys = System::current(); - let (tx, rx) = mpsc::unbounded_channel(); - - let thread_handle = thread::Builder::new() - .name(name.clone()) - .spawn({ - let tx = tx.clone(); - move || { - let rt = Runtime::new().expect("Can not create Runtime"); - let hnd = WorkerHandle::new(tx); - - System::set_current(sys); - - STORAGE.with(|cell| cell.borrow_mut().clear()); - HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); - - // register worker - let _ = System::current() - .tx() - .send(SystemCommand::RegisterWorker(id, hnd)); - - // run worker event processing loop - rt.block_on(WorkerRunner { rx }); - - // deregister worker - let _ = System::current() - .tx() - .send(SystemCommand::DeregisterWorker(id)); - } - }) - .unwrap_or_else(|err| { - panic!("Cannot spawn a Worker's thread {:?}: {:?}", &name, err) - }); - - Worker { - sender: tx, - thread_handle, - } - } - - /// Sets up a worker runner on the current thread using the provided runtime local task set. - pub(crate) fn new_current_thread(local: &LocalSet) -> WorkerHandle { - let (tx, rx) = mpsc::unbounded_channel(); - - let hnd = WorkerHandle::new(tx); - - HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); - STORAGE.with(|cell| cell.borrow_mut().clear()); - - local.spawn_local(WorkerRunner { rx }); - - hnd - } - - /// Return a handle to the worker. - /// - /// # Panics - /// Panics if no Worker is running on the current thread. - pub fn handle() -> WorkerHandle { - HANDLE.with(|cell| match *cell.borrow() { - Some(ref addr) => addr.clone(), - None => panic!("Worker is not running."), - }) - } - - /// Stop worker from continuing it's event loop. - /// - /// Returns true if stop message was sent successfully and false if the Worker has been dropped. - pub fn stop(&self) -> bool { - self.sender.send(WorkerCommand::Stop).is_ok() - } - - /// Send a future to the Worker's thread and spawn it. - /// - /// If you require a result, include a response channel in the future. - /// - /// Returns true if future was sent successfully and false if the Worker has died. - pub fn spawn(&self, future: Fut) -> bool - where - Fut: Future + Send + 'static, - { - self.sender - .send(WorkerCommand::Execute(Box::pin(future))) - .is_ok() - } - - /// Send a function to the Worker's thread and execute it. - /// - /// Any result from the function is discarded. If you require a result, include a response - /// channel in the function. - /// - /// Returns true if function was sent successfully and false if the Worker has died. - pub fn spawn_fn(&self, f: F) -> bool - where - F: FnOnce() + Send + 'static, - { - self.spawn(async { f() }) - } - - /// Wait for worker's event loop to complete. - /// - /// Joins the underlying OS thread handle. See [`JoinHandle::join`](thread::JoinHandle::join). - pub fn join(self) -> thread::Result<()> { - self.thread_handle.join() - } - - /// Insert item into worker's thread-local storage. - /// - /// Overwrites any item of the same type previously inserted. - pub fn set_item(item: T) { - STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::(), Box::new(item))); - } - - /// Check if worker's thread-local storage contains an item type. - pub fn contains_item() -> bool { - STORAGE.with(move |cell| cell.borrow().contains_key(&TypeId::of::())) - } - - /// Call a function with a shared reference to an item in this worker's thread-local storage. - /// - /// # Panics - /// Panics if item is not in worker's thread-local item storage. - pub fn get_item(mut f: F) -> R - where - F: FnMut(&T) -> R, - { - STORAGE.with(move |cell| { - let st = cell.borrow(); - - let type_id = TypeId::of::(); - let item = st.get(&type_id).and_then(downcast_ref).unwrap(); - - f(item) - }) - } - - /// Call a function with a mutable reference to an item in this worker's thread-local storage. - /// - /// # Panics - /// Panics if item is not in worker's thread-local item storage. - pub fn get_mut_item(mut f: F) -> R - where - F: FnMut(&mut T) -> R, - { - STORAGE.with(move |cell| { - let mut st = cell.borrow_mut(); - - let type_id = TypeId::of::(); - let item = st.get_mut(&type_id).and_then(downcast_mut).unwrap(); - - f(item) - }) - } -} - -/// A persistent worker future that processes worker commands. -struct WorkerRunner { - rx: mpsc::UnboundedReceiver, -} - -impl Future for WorkerRunner { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // process all items currently buffered in channel - loop { - match ready!(Pin::new(&mut self.rx).poll_recv(cx)) { - // channel closed; no more messages can be received - None => return Poll::Ready(()), - - // process worker command - Some(item) => match item { - WorkerCommand::Stop => { - return Poll::Ready(()); - } - WorkerCommand::Execute(task_fut) => { - tokio::task::spawn_local(task_fut); - } - }, - } - } - } -} - -fn downcast_ref(boxed: &Box) -> Option<&T> { - boxed.downcast_ref() -} - -fn downcast_mut(boxed: &mut Box) -> Option<&mut T> { - boxed.downcast_mut() -}