From 7e6b23af6f10ab97dc23b24a65f4a5af651750bb Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Sun, 31 Jan 2021 02:51:29 +0000 Subject: [PATCH] rename back to arbiter --- actix-rt/CHANGES.md | 21 ++-- actix-rt/src/{worker.rs => arbiter.rs} | 134 ++++++++++++------------- actix-rt/src/lib.rs | 6 +- actix-rt/src/system.rs | 46 ++++----- actix-rt/tests/tests.rs | 102 +++++++++---------- actix-server/src/accept.rs | 2 +- actix-server/src/worker.rs | 2 +- 7 files changed, 155 insertions(+), 158 deletions(-) rename actix-rt/src/{worker.rs => arbiter.rs} (63%) diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index a22d3794..3f2db63f 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -1,25 +1,22 @@ # Changes ## Unreleased - 2021-xx-xx - -* Rename `Arbiter => Worker`. [#254] * Remove `run_in_tokio`, `attach_to_tokio` and `AsyncSystemRunner`. [#253] * Return `JoinHandle` from `actix_rt::spawn`. [#253] -* Remove old `Worker::spawn`. Implementation is now inlined into `actix_rt::spawn`. [#253] -* Rename `Worker::{send => spawn}` and `Worker::{exec_fn => spawn_fn}`. [#253] -* Remove `Worker::exec`. [#253] -* Remove `System::arbiter`. [#256] -* Remove deprecated `Worker::local_join` and `Worker::is_running`. [#253] -* `Worker::spawn` now accepts !Unpin futures. [#256] +* Remove old `Arbiter::spawn`. Implementation is now inlined into `actix_rt::spawn`. [#253] +* Rename `Arbiter::{send => spawn}` and `Arbiter::{exec_fn => spawn_fn}`. [#253] +* Remove `Arbiter::exec`. [#253] +* Remove deprecated `Arbiter::local_join` and `Arbiter::is_running`. [#253] +* `Arbiter::spawn` now accepts !Unpin futures. [#256] * `System::new` no longer takes arguments. [#257] * Remove `System::with_current`. [#257] * Remove `Builder`. [#257] * Add `System::with_init` as replacement for `Builder::run`. [#257] * Rename `System::{is_set => is_registered}`. [#257] -* Add `WorkerHandle` for sending messages to non-current-thread workers. [#257]. -* `System::worker` now returns a `&WorkerHandle`. [#257] -* Rename `Worker::{current => handle}` and return a `WorkerHandle` instead. [#257] -* `Worker::join` now takes self by value. [#257] +* Add `ArbiterHandle` for sending messages to non-current-thread arbiters. [#257]. +* `System::arbiter` now returns a `&ArbiterHandle`. [#257] +* Rename `Arbiter::{current => handle}` and return a `ArbiterHandle` instead. [#257] +* `Arbiter::join` now takes self by value. [#257] [#253]: https://github.com/actix/actix-net/pull/253 [#254]: https://github.com/actix/actix-net/pull/254 diff --git a/actix-rt/src/worker.rs b/actix-rt/src/arbiter.rs similarity index 63% rename from actix-rt/src/worker.rs rename to actix-rt/src/arbiter.rs index b7925252..a7f81fc1 100644 --- a/actix-rt/src/worker.rs +++ b/actix-rt/src/arbiter.rs @@ -21,55 +21,55 @@ use crate::{ pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0); thread_local!( - static HANDLE: RefCell> = RefCell::new(None); + static HANDLE: RefCell> = RefCell::new(None); static STORAGE: RefCell>> = RefCell::new(HashMap::new()); ); -pub(crate) enum WorkerCommand { +pub(crate) enum ArbiterCommand { Stop, Execute(Pin + Send>>), } -impl fmt::Debug for WorkerCommand { +impl fmt::Debug for ArbiterCommand { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - WorkerCommand::Stop => write!(f, "WorkerCommand::Stop"), - WorkerCommand::Execute(_) => write!(f, "WorkerCommand::Execute"), + ArbiterCommand::Stop => write!(f, "ArbiterCommand::Stop"), + ArbiterCommand::Execute(_) => write!(f, "ArbiterCommand::Execute"), } } } -/// A handle for sending spawn and stop messages to a [Worker]. +/// A handle for sending spawn and stop messages to an [Arbiter]. #[derive(Debug, Clone)] -pub struct WorkerHandle { - sender: mpsc::UnboundedSender, +pub struct ArbiterHandle { + sender: mpsc::UnboundedSender, } -impl WorkerHandle { - pub(crate) fn new(sender: mpsc::UnboundedSender) -> Self { +impl ArbiterHandle { + pub(crate) fn new(sender: mpsc::UnboundedSender) -> Self { Self { sender } } - /// Send a future to the Worker's thread and spawn it. + /// Send a future to the [Arbiter]'s thread and spawn it. /// /// If you require a result, include a response channel in the future. /// - /// Returns true if future was sent successfully and false if the Worker has died. + /// Returns true if future was sent successfully and false if the [Arbiter] has died. pub fn spawn(&self, future: Fut) -> bool where Fut: Future + Send + 'static, { self.sender - .send(WorkerCommand::Execute(Box::pin(future))) + .send(ArbiterCommand::Execute(Box::pin(future))) .is_ok() } - /// Send a function to the Worker's thread and execute it. + /// Send a function to the [Arbiter]'s thread and execute it. /// /// Any result from the function is discarded. If you require a result, include a response /// channel in the function. /// - /// Returns true if function was sent successfully and false if the Worker has died. + /// Returns true if function was sent successfully and false if the [Arbiter] has died. pub fn spawn_fn(&self, f: F) -> bool where F: FnOnce() + Send + 'static, @@ -77,35 +77,35 @@ impl WorkerHandle { self.spawn(async { f() }) } - /// Instruct worker to stop processing it's event loop. + /// Instruct [Arbiter] to stop processing it's event loop. /// - /// Returns true if stop message was sent successfully and false if the Worker has been dropped. + /// Returns true if stop message was sent successfully and false if the [Arbiter] has + /// been dropped. pub fn stop(&self) -> bool { - self.sender.send(WorkerCommand::Stop).is_ok() + self.sender.send(ArbiterCommand::Stop).is_ok() } } -/// A worker represent a thread that provides an asynchronous execution environment for futures +/// A arbiter represents a thread that provides an asynchronous execution environment for futures /// and functions. /// -/// When a Worker is created, it spawns a new [OS thread](thread), and hosts an event loop. +/// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop. #[derive(Debug)] -pub struct Worker { - sender: mpsc::UnboundedSender, +pub struct Arbiter { + sender: mpsc::UnboundedSender, thread_handle: thread::JoinHandle<()>, } -impl Worker { - /// Spawn new thread and run event loop in spawned thread. - /// - /// Returns handle of newly created worker. +impl Arbiter { + /// Spawn new Arbiter thread and start its event loop. /// /// # Panics /// Panics if a [System] is not registered on the current thread. #[allow(clippy::new_without_default)] - pub fn new() -> Worker { + pub fn new() -> Arbiter { let id = COUNT.fetch_add(1, Ordering::Relaxed); - let name = format!("actix-rt:worker:{}", id); + let system_id = System::current().id(); + let name = format!("actix-rt|system:{}|arbiter:{}", system_id, id); let sys = System::current(); let (tx, rx) = mpsc::unbounded_channel(); @@ -115,89 +115,89 @@ impl Worker { let tx = tx.clone(); move || { let rt = Runtime::new().expect("Can not create Runtime"); - let hnd = WorkerHandle::new(tx); + let hnd = ArbiterHandle::new(tx); System::set_current(sys); STORAGE.with(|cell| cell.borrow_mut().clear()); HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); - // register worker + // register arbiter let _ = System::current() .tx() - .send(SystemCommand::RegisterWorker(id, hnd)); + .send(SystemCommand::RegisterArbiter(id, hnd)); - // run worker event processing loop - rt.block_on(WorkerRunner { rx }); + // run arbiter event processing loop + rt.block_on(ArbiterRunner { rx }); - // deregister worker + // deregister arbiter let _ = System::current() .tx() - .send(SystemCommand::DeregisterWorker(id)); + .send(SystemCommand::DeregisterArbiter(id)); } }) .unwrap_or_else(|err| { - panic!("Cannot spawn a Worker's thread {:?}: {:?}", &name, err) + panic!("Cannot spawn Arbiter's thread: {:?}. {:?}", &name, err) }); - Worker { + Arbiter { sender: tx, thread_handle, } } - /// Sets up a worker runner on the current thread using the provided runtime local task set. - pub(crate) fn new_current_thread(local: &LocalSet) -> WorkerHandle { + /// Sets up an Arbiter runner on the current thread using the provided runtime local task set. + pub(crate) fn new_current_thread(local: &LocalSet) -> ArbiterHandle { let (tx, rx) = mpsc::unbounded_channel(); - let hnd = WorkerHandle::new(tx); + let hnd = ArbiterHandle::new(tx); HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); STORAGE.with(|cell| cell.borrow_mut().clear()); - local.spawn_local(WorkerRunner { rx }); + local.spawn_local(ArbiterRunner { rx }); hnd } - /// Return a handle to the worker. + /// Return a handle to the Arbiter's message sender. /// /// # Panics - /// Panics if no Worker is running on the current thread. - pub fn handle() -> WorkerHandle { + /// Panics if no Arbiter is running on the current thread. + pub fn handle() -> ArbiterHandle { HANDLE.with(|cell| match *cell.borrow() { Some(ref addr) => addr.clone(), - None => panic!("Worker is not running."), + None => panic!("Arbiter is not running."), }) } - /// Stop worker from continuing it's event loop. + /// Stop Arbiter from continuing it's event loop. /// - /// Returns true if stop message was sent successfully and false if the Worker has been dropped. + /// Returns true if stop message was sent successfully and false if the Arbiter has been dropped. pub fn stop(&self) -> bool { - self.sender.send(WorkerCommand::Stop).is_ok() + self.sender.send(ArbiterCommand::Stop).is_ok() } - /// Send a future to the Worker's thread and spawn it. + /// Send a future to the Arbiter's thread and spawn it. /// /// If you require a result, include a response channel in the future. /// - /// Returns true if future was sent successfully and false if the Worker has died. + /// Returns true if future was sent successfully and false if the Arbiter has died. pub fn spawn(&self, future: Fut) -> bool where Fut: Future + Send + 'static, { self.sender - .send(WorkerCommand::Execute(Box::pin(future))) + .send(ArbiterCommand::Execute(Box::pin(future))) .is_ok() } - /// Send a function to the Worker's thread and execute it. + /// Send a function to the Arbiter's thread and execute it. /// /// Any result from the function is discarded. If you require a result, include a response /// channel in the function. /// - /// Returns true if function was sent successfully and false if the Worker has died. + /// Returns true if function was sent successfully and false if the Arbiter has died. pub fn spawn_fn(&self, f: F) -> bool where F: FnOnce() + Send + 'static, @@ -205,29 +205,29 @@ impl Worker { self.spawn(async { f() }) } - /// Wait for worker's event loop to complete. + /// Wait for Arbiter's event loop to complete. /// /// Joins the underlying OS thread handle. See [`JoinHandle::join`](thread::JoinHandle::join). pub fn join(self) -> thread::Result<()> { self.thread_handle.join() } - /// Insert item into worker's thread-local storage. + /// Insert item into Arbiter's thread-local storage. /// /// Overwrites any item of the same type previously inserted. pub fn set_item(item: T) { STORAGE.with(move |cell| cell.borrow_mut().insert(TypeId::of::(), Box::new(item))); } - /// Check if worker's thread-local storage contains an item type. + /// Check if Arbiter's thread-local storage contains an item type. pub fn contains_item() -> bool { STORAGE.with(move |cell| cell.borrow().contains_key(&TypeId::of::())) } - /// Call a function with a shared reference to an item in this worker's thread-local storage. + /// Call a function with a shared reference to an item in this Arbiter's thread-local storage. /// /// # Panics - /// Panics if item is not in worker's thread-local item storage. + /// Panics if item is not in Arbiter's thread-local item storage. pub fn get_item(mut f: F) -> R where F: FnMut(&T) -> R, @@ -242,10 +242,10 @@ impl Worker { }) } - /// Call a function with a mutable reference to an item in this worker's thread-local storage. + /// Call a function with a mutable reference to an item in this Arbiter's thread-local storage. /// /// # Panics - /// Panics if item is not in worker's thread-local item storage. + /// Panics if item is not in Arbiter's thread-local item storage. pub fn get_mut_item(mut f: F) -> R where F: FnMut(&mut T) -> R, @@ -261,12 +261,12 @@ impl Worker { } } -/// A persistent worker future that processes worker commands. -struct WorkerRunner { - rx: mpsc::UnboundedReceiver, +/// A persistent future that processes [Arbiter] commands. +struct ArbiterRunner { + rx: mpsc::UnboundedReceiver, } -impl Future for WorkerRunner { +impl Future for ArbiterRunner { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -276,12 +276,12 @@ impl Future for WorkerRunner { // channel closed; no more messages can be received None => return Poll::Ready(()), - // process worker command + // process arbiter command Some(item) => match item { - WorkerCommand::Stop => { + ArbiterCommand::Stop => { return Poll::Ready(()); } - WorkerCommand::Execute(task_fut) => { + ArbiterCommand::Execute(task_fut) => { tokio::task::spawn_local(task_fut); } }, diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index 40d31f3c..bf8a4796 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -15,13 +15,13 @@ use tokio::task::JoinHandle; #[cfg(all(feature = "macros", not(test)))] pub use actix_macros::{main, test}; +mod arbiter; mod runtime; mod system; -mod worker; +pub use self::arbiter::{Arbiter, ArbiterHandle}; pub use self::runtime::Runtime; pub use self::system::{System, SystemRunner}; -pub use self::worker::{Worker, WorkerHandle}; pub mod signal { //! Asynchronous signal handling (Tokio re-exports). @@ -59,7 +59,7 @@ pub mod task { pub use tokio::task::{spawn_blocking, yield_now, JoinHandle}; } -/// Spawns a future on the current [worker](Worker). +/// Spawns a future on the current thread. /// /// # Panics /// Panics if Actix system is not running. diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index d26fa1d7..260eb23e 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -11,7 +11,7 @@ use std::{ use futures_core::ready; use tokio::sync::{mpsc, oneshot}; -use crate::{worker::WorkerHandle, Runtime, Worker}; +use crate::{arbiter::ArbiterHandle, Arbiter, Runtime}; static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); @@ -25,8 +25,8 @@ pub struct System { id: usize, sys_tx: mpsc::UnboundedSender, - /// First worker that is created as part of the System. - worker_handle: WorkerHandle, + /// Handle to the first [Arbiter] that is created with the System. + arbiter_handle: ArbiterHandle, } impl System { @@ -53,11 +53,11 @@ impl System { /// Constructs new system and registers it on the current thread. pub(crate) fn construct( sys_tx: mpsc::UnboundedSender, - worker: WorkerHandle, + arbiter_handle: ArbiterHandle, ) -> Self { let sys = System { sys_tx, - worker_handle: worker, + arbiter_handle, id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst), }; @@ -71,11 +71,11 @@ impl System { let (sys_tx, sys_rx) = mpsc::unbounded_channel(); let rt = Runtime::new().expect("Actix (Tokio) runtime could not be created."); - let system = System::construct(sys_tx, Worker::new_current_thread(rt.local_set())); + let system = System::construct(sys_tx, Arbiter::new_current_thread(rt.local_set())); - // init background system worker - let sys_worker = SystemController::new(sys_rx, stop_tx); - rt.spawn(sys_worker); + // init background system arbiter + let sys_ctrl = SystemController::new(sys_rx, stop_tx); + rt.spawn(sys_ctrl); // run system init future rt.block_on(init_fut); @@ -98,9 +98,9 @@ impl System { }) } - /// Get handle to a the System's initial [Worker]. - pub fn worker(&self) -> &WorkerHandle { - &self.worker_handle + /// Get handle to a the System's initial [Arbiter]. + pub fn arbiter(&self) -> &ArbiterHandle { + &self.arbiter_handle } /// Check if there is a System registered on the current thread. @@ -179,17 +179,17 @@ impl SystemRunner { #[derive(Debug)] pub(crate) enum SystemCommand { Exit(i32), - RegisterWorker(usize, WorkerHandle), - DeregisterWorker(usize), + RegisterArbiter(usize, ArbiterHandle), + DeregisterArbiter(usize), } /// There is one `SystemController` per [System]. It runs in the background, keeping track of -/// [Worker]s and is able to distribute a system-wide stop command. +/// [Arbiter]s and is able to distribute a system-wide stop command. #[derive(Debug)] pub(crate) struct SystemController { stop_tx: Option>, cmd_rx: mpsc::UnboundedReceiver, - workers: HashMap, + arbiters: HashMap, } impl SystemController { @@ -200,7 +200,7 @@ impl SystemController { SystemController { cmd_rx, stop_tx: Some(stop_tx), - workers: HashMap::with_capacity(4), + arbiters: HashMap::with_capacity(4), } } } @@ -218,8 +218,8 @@ impl Future for SystemController { // process system command Some(cmd) => match cmd { SystemCommand::Exit(code) => { - // stop workers - for wkr in self.workers.values() { + // stop all arbiters + for wkr in self.arbiters.values() { wkr.stop(); } @@ -230,12 +230,12 @@ impl Future for SystemController { } } - SystemCommand::RegisterWorker(name, hnd) => { - self.workers.insert(name, hnd); + SystemCommand::RegisterArbiter(name, hnd) => { + self.arbiters.insert(name, hnd); } - SystemCommand::DeregisterWorker(name) => { - self.workers.remove(&name); + SystemCommand::DeregisterArbiter(name) => { + self.arbiters.remove(&name); } }, } diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs index 7cd6619e..d4c16945 100644 --- a/actix-rt/tests/tests.rs +++ b/actix-rt/tests/tests.rs @@ -4,7 +4,7 @@ use std::{ time::{Duration, Instant}, }; -use actix_rt::{System, Worker}; +use actix_rt::{System, Arbiter}; use tokio::sync::oneshot; #[test] @@ -21,51 +21,51 @@ fn await_for_timer() { } #[test] -fn join_another_worker() { +fn join_another_arbiter() { let time = Duration::from_secs(1); let instant = Instant::now(); System::new().block_on(async move { - let worker = Worker::new(); - worker.spawn(Box::pin(async move { + let arbiter = Arbiter::new(); + arbiter.spawn(Box::pin(async move { tokio::time::sleep(time).await; - Worker::handle().stop(); + Arbiter::handle().stop(); })); - worker.join().unwrap(); + arbiter.join().unwrap(); }); assert!( instant.elapsed() >= time, - "Join on another worker should complete only when it calls stop" + "Join on another arbiter should complete only when it calls stop" ); let instant = Instant::now(); System::new().block_on(async move { - let worker = Worker::new(); - worker.spawn_fn(move || { + let arbiter = Arbiter::new(); + arbiter.spawn_fn(move || { actix_rt::spawn(async move { tokio::time::sleep(time).await; - Worker::handle().stop(); + Arbiter::handle().stop(); }); }); - worker.join().unwrap(); + arbiter.join().unwrap(); }); assert!( instant.elapsed() >= time, - "Join on a worker that has used actix_rt::spawn should wait for said future" + "Join on a arbiter that has used actix_rt::spawn should wait for said future" ); let instant = Instant::now(); System::new().block_on(async move { - let worker = Worker::new(); - worker.spawn(Box::pin(async move { + let arbiter = Arbiter::new(); + arbiter.spawn(Box::pin(async move { tokio::time::sleep(time).await; - Worker::handle().stop(); + Arbiter::handle().stop(); })); - worker.stop(); - worker.join().unwrap(); + arbiter.stop(); + arbiter.join().unwrap(); }); assert!( instant.elapsed() < time, - "Premature stop of worker should conclude regardless of it's current state" + "Premature stop of arbiter should conclude regardless of it's current state" ); } @@ -110,70 +110,70 @@ fn wait_for_spawns() { } #[test] -fn worker_spawn_fn_runs() { +fn arbiter_spawn_fn_runs() { let _ = System::new(); let (tx, rx) = channel::(); - let worker = Worker::new(); - worker.spawn_fn(move || tx.send(42).unwrap()); + let arbiter = Arbiter::new(); + arbiter.spawn_fn(move || tx.send(42).unwrap()); let num = rx.recv().unwrap(); assert_eq!(num, 42); - worker.stop(); - worker.join().unwrap(); + arbiter.stop(); + arbiter.join().unwrap(); } #[test] -fn worker_drop_no_panic_fn() { +fn arbiter_drop_no_panic_fn() { let _ = System::new(); - let worker = Worker::new(); - worker.spawn_fn(|| panic!("test")); + let arbiter = Arbiter::new(); + arbiter.spawn_fn(|| panic!("test")); - worker.stop(); - worker.join().unwrap(); + arbiter.stop(); + arbiter.join().unwrap(); } #[test] -fn worker_drop_no_panic_fut() { +fn arbiter_drop_no_panic_fut() { let _ = System::new(); - let worker = Worker::new(); - worker.spawn(async { panic!("test") }); + let arbiter = Arbiter::new(); + arbiter.spawn(async { panic!("test") }); - worker.stop(); - worker.join().unwrap(); + arbiter.stop(); + arbiter.join().unwrap(); } #[test] -fn worker_item_storage() { +fn arbiter_item_storage() { let _ = System::new(); - let worker = Worker::new(); + let arbiter = Arbiter::new(); - assert!(!Worker::contains_item::()); - Worker::set_item(42u32); - assert!(Worker::contains_item::()); + assert!(!Arbiter::contains_item::()); + Arbiter::set_item(42u32); + assert!(Arbiter::contains_item::()); - Worker::get_item(|&item: &u32| assert_eq!(item, 42)); - Worker::get_mut_item(|&mut item: &mut u32| assert_eq!(item, 42)); + Arbiter::get_item(|&item: &u32| assert_eq!(item, 42)); + Arbiter::get_mut_item(|&mut item: &mut u32| assert_eq!(item, 42)); let thread = thread::spawn(move || { - Worker::get_item(|&_item: &u32| unreachable!("u32 not in this thread")); + Arbiter::get_item(|&_item: &u32| unreachable!("u32 not in this thread")); }) .join(); assert!(thread.is_err()); let thread = thread::spawn(move || { - Worker::get_mut_item(|&mut _item: &mut i8| unreachable!("i8 not in this thread")); + Arbiter::get_mut_item(|&mut _item: &mut i8| unreachable!("i8 not in this thread")); }) .join(); assert!(thread.is_err()); - worker.stop(); - worker.join().unwrap(); + arbiter.stop(); + arbiter.join().unwrap(); } #[test] @@ -184,30 +184,30 @@ fn no_system_current_panic() { #[test] #[should_panic] -fn no_system_worker_new_panic() { - Worker::new(); +fn no_system_arbiter_new_panic() { + Arbiter::new(); } #[test] -fn system_worker_spawn() { +fn system_arbiter_spawn() { let runner = System::new(); let (tx, rx) = oneshot::channel(); let sys = System::current(); thread::spawn(|| { - // this thread will have no worker in it's thread local so call will panic - Worker::handle(); + // this thread will have no arbiter in it's thread local so call will panic + Arbiter::handle(); }) .join() .unwrap_err(); let thread = thread::spawn(|| { - // this thread will have no worker in it's thread local so use the system handle instead + // this thread will have no arbiter in it's thread local so use the system handle instead System::set_current(sys); let sys = System::current(); - let wrk = sys.worker(); + let wrk = sys.arbiter(); wrk.spawn(async move { tx.send(42u32).unwrap(); System::current().stop(); diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 82c00ef5..a52184d9 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -403,7 +403,7 @@ impl Accept { // after the sleep a Timer interest is sent to Accept Poll let waker = self.waker.clone(); - System::current().worker().spawn(async move { + System::current().arbiter().spawn(async move { sleep_until(Instant::now() + Duration::from_millis(510)).await; waker.wake(WakerInterest::Timer); }); diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 6f15d044..59abd491 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -6,7 +6,7 @@ use std::task::{Context, Poll}; use std::time::Duration; use actix_rt::time::{sleep_until, Instant, Sleep}; -use actix_rt::{spawn, Worker as Arbiter}; +use actix_rt::{spawn, Arbiter as Arbiter}; use actix_utils::counter::Counter; use futures_core::future::LocalBoxFuture; use log::{error, info, trace};