diff --git a/actix-rt/src/worker.rs b/actix-rt/src/worker.rs index f586d15f..9447c2c7 100644 --- a/actix-rt/src/worker.rs +++ b/actix-rt/src/worker.rs @@ -27,8 +27,7 @@ thread_local!( pub(crate) enum WorkerCommand { Stop, - Execute(Box + Unpin + Send>), - ExecuteFn(Box), + Execute(Pin + Send>>), } impl fmt::Debug for WorkerCommand { @@ -36,7 +35,6 @@ impl fmt::Debug for WorkerCommand { match self { WorkerCommand::Stop => write!(f, "ArbiterCommand::Stop"), WorkerCommand::Execute(_) => write!(f, "ArbiterCommand::Execute"), - WorkerCommand::ExecuteFn(_) => write!(f, "ArbiterCommand::ExecuteFn"), } } } @@ -65,41 +63,6 @@ impl Default for Worker { } impl Worker { - pub(crate) fn new_system(local: &LocalSet) -> Self { - let (tx, rx) = mpsc::unbounded_channel(); - - let arb = Worker::new_handle(tx); - ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); - STORAGE.with(|cell| cell.borrow_mut().clear()); - - local.spawn_local(WorkerRunner { rx }); - - arb - } - - fn new_handle(sender: mpsc::UnboundedSender) -> Self { - Self { - sender, - thread_handle: None, - } - } - - /// Returns the current Worker's handle. - /// - /// # Panics - /// Panics if no Worker is running on the current thread. - pub fn current() -> Worker { - ADDR.with(|cell| match *cell.borrow() { - Some(ref addr) => addr.clone(), - None => panic!("Worker is not running."), - }) - } - - /// Stop worker from continuing it's event loop. - pub fn stop(&self) { - let _ = self.sender.send(WorkerCommand::Stop); - } - /// Spawn new thread and run event loop in spawned thread. /// /// Returns handle of newly created worker. @@ -147,6 +110,22 @@ impl Worker { } } + /// Returns the current Worker's handle. + /// + /// # Panics + /// Panics if no Worker is running on the current thread. + pub fn current() -> Worker { + ADDR.with(|cell| match *cell.borrow() { + Some(ref addr) => addr.clone(), + None => panic!("Worker is not running."), + }) + } + + /// Stop worker from continuing it's event loop. + pub fn stop(&self) { + let _ = self.sender.send(WorkerCommand::Stop); + } + /// Send a future to the Arbiter's thread and spawn it. /// /// If you require a result, include a response channel in the future. @@ -154,10 +133,10 @@ impl Worker { /// Returns true if future was sent successfully and false if the Arbiter has died. pub fn spawn(&self, future: Fut) -> bool where - Fut: Future + Unpin + Send + 'static, + Fut: Future + Send + 'static, { self.sender - .send(WorkerCommand::Execute(Box::new(future))) + .send(WorkerCommand::Execute(Box::pin(future))) .is_ok() } @@ -171,9 +150,37 @@ impl Worker { where F: FnOnce() + Send + 'static, { - self.sender - .send(WorkerCommand::ExecuteFn(Box::new(f))) - .is_ok() + self.spawn(async { f() }) + } + + /// Wait for worker's event loop to complete. + /// + /// Joins the underlying OS thread handle, if contained. + pub fn join(&mut self) -> thread::Result<()> { + if let Some(thread_handle) = self.thread_handle.take() { + thread_handle.join() + } else { + Ok(()) + } + } + + pub(crate) fn new_system(local: &LocalSet) -> Self { + let (tx, rx) = mpsc::unbounded_channel(); + + let arb = Worker::new_handle(tx); + ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); + STORAGE.with(|cell| cell.borrow_mut().clear()); + + local.spawn_local(WorkerRunner { rx }); + + arb + } + + fn new_handle(sender: mpsc::UnboundedSender) -> Self { + Self { + sender, + thread_handle: None, + } } /// Insert item into worker's thread-local storage. @@ -228,17 +235,6 @@ impl Worker { f(item) }) } - - /// Wait for worker's event loop to complete. - /// - /// Joins the underlying OS thread handle, if contained. - pub fn join(&mut self) -> thread::Result<()> { - if let Some(thread_handle) = self.thread_handle.take() { - thread_handle.join() - } else { - Ok(()) - } - } } /// A persistent worker future that processes worker commands. @@ -262,11 +258,6 @@ impl Future for WorkerRunner { WorkerCommand::Execute(task_fut) => { tokio::task::spawn_local(task_fut); } - WorkerCommand::ExecuteFn(task_fn) => { - tokio::task::spawn_local(async { - task_fn(); - }); - } }, } }