From 63e48356e3b14e35ffabf924dae9e7b10dab1870 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Thu, 28 Jan 2021 23:34:06 +0000 Subject: [PATCH] remove tokio runners --- actix-rt/Cargo.toml | 1 + actix-rt/src/arbiter.rs | 113 ++++++++++---------------- actix-rt/src/builder.rs | 61 +------------- actix-rt/src/lib.rs | 10 ++- actix-rt/src/system.rs | 121 +--------------------------- actix-rt/tests/integration_tests.rs | 80 ++---------------- 6 files changed, 59 insertions(+), 327 deletions(-) diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index 5bc79416..f5a6ba6a 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -22,6 +22,7 @@ macros = ["actix-macros"] [dependencies] actix-macros = { version = "0.2.0-beta.1", optional = true } +futures-core = { version = "0.3", default-features = false } tokio = { version = "1", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] } [dev-dependencies] diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 48d11b36..1a9dbf51 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -10,10 +10,11 @@ use std::{ thread, }; +use futures_core::ready; use tokio::{ sync::{ mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, - oneshot::{channel, error::RecvError as Canceled, Sender}, + oneshot::Sender, }, task::LocalSet, }; @@ -65,7 +66,8 @@ impl Default for Arbiter { } impl Arbiter { - pub(crate) fn new_system(local: &LocalSet) -> Self { + /// TODO: make pub(crate) again + pub fn new_system(local: &LocalSet) -> Self { let (tx, rx) = unbounded_channel(); let arb = Arbiter::with_sender(tx); @@ -128,7 +130,7 @@ impl Arbiter { // run loop rt.block_on(ArbiterController { rx }); - // unregister arbiter + // deregister arbiter let _ = System::current() .sys() .send(SystemCommand::DeregisterArbiter(id)); @@ -144,67 +146,35 @@ impl Arbiter { } } - /// Spawn a future on the current thread. This does not create a new Arbiter - /// or Arbiter address, it is simply a helper for spawning futures on the current - /// thread. - pub fn spawn(future: F) + /// Send a future to the Arbiter's thread and spawn it. + /// + /// If you require a result, include a response channel in the future. + /// + /// Returns true if function was sent successfully and false if the Arbiter has died. + pub fn spawn(&self, future: Fut) -> bool where - F: Future + 'static, + Fut: Future + Unpin + Send + 'static, { - let _ = tokio::task::spawn_local(future); + match self.sender.send(ArbiterCommand::Execute(Box::new(future))) { + Ok(_) => true, + Err(_) => false, + } } - /// Executes a future on the current thread. This does not create a new Arbiter - /// or Arbiter address, it is simply a helper for executing futures on the current - /// thread. - pub fn spawn_fn(f: F) - where - F: FnOnce() -> R + 'static, - R: Future + 'static, - { - Arbiter::spawn(async { - f(); - }) - } - - /// Send a future to the Arbiter's thread, and spawn it. - pub fn send(&self, future: F) - where - F: Future + Send + Unpin + 'static, - { - let _ = self.sender.send(ArbiterCommand::Execute(Box::new(future))); - } - - /// Send a function to the Arbiter's thread, and execute it. Any result from the function - /// is discarded. - pub fn exec_fn(&self, f: F) + /// Send a function to the Arbiter's thread and execute it. + /// + /// Any result from the function is discarded. If you require a result, include a response + /// channel in the function. + /// + /// Returns true if function was sent successfully and false if the Arbiter has died. + pub fn spawn_fn(&self, f: F) -> bool where F: FnOnce() + Send + 'static, { - let _ = self - .sender - .send(ArbiterCommand::ExecuteFn(Box::new(move || { - f(); - }))); - } - - /// Send a function to the Arbiter's thread. This function will be executed asynchronously. - /// A future is created, and when resolved will contain the result of the function sent - /// to the Arbiters thread. - pub fn exec(&self, f: F) -> impl Future> - where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, - { - let (tx, rx) = channel(); - let _ = self - .sender - .send(ArbiterCommand::ExecuteFn(Box::new(move || { - if !tx.is_closed() { - let _ = tx.send(f()); - } - }))); - rx + match self.sender.send(ArbiterCommand::ExecuteFn(Box::new(f))) { + Ok(_) => true, + Err(_) => false, + } } /// Set item to arbiter storage @@ -266,13 +236,6 @@ impl Arbiter { Ok(()) } } - - /// Returns a future that will be completed once all currently spawned futures - /// have completed. - #[deprecated(since = "2.0.0", note = "Arbiter::local_join function is removed.")] - pub async fn local_join() { - unimplemented!("Arbiter::local_join function is removed.") - } } struct ArbiterController { @@ -296,10 +259,14 @@ impl Future for ArbiterController { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // process all items currently buffered in channel loop { - match Pin::new(&mut self.rx).poll_recv(cx) { - Poll::Ready(None) => return Poll::Ready(()), - Poll::Ready(Some(item)) => match item { + match ready!(Pin::new(&mut self.rx).poll_recv(cx)) { + // channel closed; no more messages can be received + None => return Poll::Ready(()), + + // process arbiter command + Some(item) => match item { ArbiterCommand::Stop => return Poll::Ready(()), ArbiterCommand::Execute(fut) => { tokio::task::spawn_local(fut); @@ -308,7 +275,6 @@ impl Future for ArbiterController { f.call_box(); } }, - Poll::Pending => return Poll::Pending, } } } @@ -342,10 +308,14 @@ impl Future for SystemArbiter { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // process all items currently buffered in channel loop { - match Pin::new(&mut self.commands).poll_recv(cx) { - Poll::Ready(None) => return Poll::Ready(()), - Poll::Ready(Some(cmd)) => match cmd { + match ready!(Pin::new(&mut self.commands).poll_recv(cx)) { + // channel closed; no more messages can be received + None => return Poll::Ready(()), + + // process system command + Some(cmd) => match cmd { SystemCommand::Exit(code) => { // stop arbiters for arb in self.arbiters.values() { @@ -363,7 +333,6 @@ impl Future for SystemArbiter { self.arbiters.remove(&name); } }, - Poll::Pending => return Poll::Pending, } } } diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index c43af7c5..56cfcb91 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -1,11 +1,8 @@ use std::{borrow::Cow, future::Future, io}; -use tokio::{ - sync::{ - mpsc::unbounded_channel, - oneshot::{channel, Receiver}, - }, - task::LocalSet, +use tokio::sync::{ + mpsc::unbounded_channel, + oneshot::{channel, Receiver}, }; use crate::{ @@ -56,13 +53,6 @@ impl Builder { self.create_runtime(|| {}) } - /// Create new System that can run asynchronously. - /// - /// This method panics if it cannot start the system arbiter - pub(crate) fn build_async(self, local: &LocalSet) -> AsyncSystemRunner { - self.create_async_runtime(local) - } - /// This function will start Tokio runtime and will finish once the `System::stop()` message /// is called. Function `f` is called within Tokio runtime context. pub fn run(self, f: F) -> io::Result<()> @@ -72,22 +62,6 @@ impl Builder { self.create_runtime(f).run() } - fn create_async_runtime(self, local: &LocalSet) -> AsyncSystemRunner { - let (stop_tx, stop_rx) = channel(); - let (sys_sender, sys_receiver) = unbounded_channel(); - - let system = - System::construct(sys_sender, Arbiter::new_system(local), self.stop_on_panic); - - // system arbiter - let arb = SystemArbiter::new(stop_tx, sys_receiver); - - // start the system arbiter - let _ = local.spawn_local(arb); - - AsyncSystemRunner { system, stop_rx } - } - fn create_runtime(self, f: F) -> SystemRunner where F: FnOnce(), @@ -115,35 +89,6 @@ impl Builder { } } -#[derive(Debug)] -pub(crate) struct AsyncSystemRunner { - system: System, - stop_rx: Receiver, -} - -impl AsyncSystemRunner { - /// This function will start event loop and returns a future that resolves once the - /// `System::stop()` function is called. - pub(crate) async fn run(self) -> Result<(), io::Error> { - let AsyncSystemRunner { stop_rx: stop, .. } = self; - - // run loop - match stop.await { - Ok(code) => { - if code != 0 { - Err(io::Error::new( - io::ErrorKind::Other, - format!("Non-zero exit code: {}", code), - )) - } else { - Ok(()) - } - } - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), - } - } -} - /// Helper object that runs System's event loop #[must_use = "SystemRunner must be run"] #[derive(Debug)] diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index 2151952e..c2222a79 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -8,6 +8,8 @@ use std::future::Future; +use tokio::task::JoinHandle; + // Cannot define a main macro when compiled into test harness. // Workaround for https://github.com/rust-lang/rust/issues/62127. #[cfg(all(feature = "macros", not(test)))] @@ -26,13 +28,13 @@ pub use self::system::System; /// Spawns a future on the current arbiter. /// /// # Panics -/// This function panics if actix system is not running. +/// Panics if Actix system is not running. #[inline] -pub fn spawn(f: F) +pub fn spawn(f: Fut) -> JoinHandle<()> where - F: Future + 'static, + Fut: Future + 'static, { - Arbiter::spawn(f) + tokio::task::spawn_local(f) } /// Asynchronous signal handling diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 262f60a6..df0ab57f 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -1,11 +1,10 @@ use std::{ cell::RefCell, - future::Future, io, sync::atomic::{AtomicUsize, Ordering}, }; -use tokio::{sync::mpsc::UnboundedSender, task::LocalSet}; +use tokio::sync::mpsc::UnboundedSender; use crate::{ arbiter::{Arbiter, SystemCommand}, @@ -55,126 +54,10 @@ impl System { /// /// This method panics if it can not create tokio runtime #[allow(clippy::new_ret_no_self)] - pub fn new>(name: T) -> SystemRunner { + pub fn new(name: impl Into) -> SystemRunner { Self::builder().name(name).build() } - /// Create new system using provided tokio `LocalSet`. - /// - /// This method panics if it can not spawn system arbiter - /// - /// Note: This method uses provided `LocalSet` to create a `System` future only. - /// All the [`Arbiter`]s will be started in separate threads using their own tokio `Runtime`s. - /// It means that using this method currently it is impossible to make `actix-rt` work in the - /// alternative Tokio runtimes such as those provided by `tokio_compat`. - /// - /// # Examples - /// ``` - /// use tokio::{runtime::Runtime, task::LocalSet}; - /// use actix_rt::System; - /// use futures_util::future::try_join_all; - /// - /// async fn run_application() { - /// let first_task = tokio::spawn(async { - /// // ... - /// # println!("One task"); - /// # Ok::<(),()>(()) - /// }); - /// - /// let second_task = tokio::spawn(async { - /// // ... - /// # println!("Another task"); - /// # Ok::<(),()>(()) - /// }); - /// - /// try_join_all(vec![first_task, second_task]) - /// .await - /// .expect("Some of the futures finished unexpectedly"); - /// } - /// - /// let runtime = tokio::runtime::Builder::new_multi_thread() - /// .worker_threads(2) - /// .enable_all() - /// .build() - /// .unwrap(); - /// - /// let actix_system_task = LocalSet::new(); - /// let sys = System::run_in_tokio("actix-main-system", &actix_system_task); - /// actix_system_task.spawn_local(sys); - /// - /// let rest_operations = run_application(); - /// runtime.block_on(actix_system_task.run_until(rest_operations)); - /// ``` - pub fn run_in_tokio>( - name: T, - local: &LocalSet, - ) -> impl Future> { - Self::builder().name(name).build_async(local).run() - } - - /// Consume the provided Tokio Runtime and start the `System` in it. - /// This method will create a `LocalSet` object and occupy the current thread - /// for the created `System` exclusively. All the other asynchronous tasks that - /// should be executed as well must be aggregated into one future, provided as the last - /// argument to this method. - /// - /// Note: This method uses provided `Runtime` to create a `System` future only. - /// All the [`Arbiter`]s will be started in separate threads using their own Tokio `Runtime`s. - /// It means that using this method currently it is impossible to make `actix-rt` work in the - /// alternative Tokio runtimes such as those provided by `tokio_compat`. - /// - /// # Arguments - /// - /// - `name`: Name of the System - /// - `runtime`: A Tokio Runtime to run the system in. - /// - `rest_operations`: A future to be executed in the runtime along with the System. - /// - /// # Examples - /// ``` - /// use tokio::runtime::Runtime; - /// use actix_rt::System; - /// use futures_util::future::try_join_all; - /// - /// async fn run_application() { - /// let first_task = tokio::spawn(async { - /// // ... - /// # println!("One task"); - /// # Ok::<(),()>(()) - /// }); - /// - /// let second_task = tokio::spawn(async { - /// // ... - /// # println!("Another task"); - /// # Ok::<(),()>(()) - /// }); - /// - /// try_join_all(vec![first_task, second_task]) - /// .await - /// .expect("Some of the futures finished unexpectedly"); - /// } - /// - /// - /// let runtime = tokio::runtime::Builder::new_multi_thread() - /// .worker_threads(2) - /// .enable_all() - /// .build() - /// .unwrap(); - /// - /// let rest_operations = run_application(); - /// System::attach_to_tokio("actix-main-system", runtime, rest_operations); - /// ``` - pub fn attach_to_tokio( - name: impl Into, - runtime: tokio::runtime::Runtime, - rest_operations: Fut, - ) -> Fut::Output { - let actix_system_task = LocalSet::new(); - let sys = System::run_in_tokio(name.into(), &actix_system_task); - actix_system_task.spawn_local(sys); - - runtime.block_on(actix_system_task.run_until(rest_operations)) - } - /// Get current running system. pub fn current() -> System { CURRENT.with(|cell| match *cell.borrow() { diff --git a/actix-rt/tests/integration_tests.rs b/actix-rt/tests/integration_tests.rs index 225fd53b..5bfd0733 100644 --- a/actix-rt/tests/integration_tests.rs +++ b/actix-rt/tests/integration_tests.rs @@ -1,6 +1,8 @@ use std::time::{Duration, Instant}; -use futures_util::future::try_join_all; +use actix_rt::{Arbiter, System}; +use futures_util::future::lazy; +use tokio::task::LocalSet; #[test] fn await_for_timer() { @@ -21,7 +23,7 @@ fn join_another_arbiter() { let instant = Instant::now(); actix_rt::System::new("test_join_another_arbiter").block_on(async move { let mut arbiter = actix_rt::Arbiter::new(); - arbiter.send(Box::pin(async move { + arbiter.spawn(Box::pin(async move { tokio::time::sleep(time).await; actix_rt::Arbiter::current().stop(); })); @@ -35,7 +37,7 @@ fn join_another_arbiter() { let instant = Instant::now(); actix_rt::System::new("test_join_another_arbiter").block_on(async move { let mut arbiter = actix_rt::Arbiter::new(); - arbiter.exec_fn(move || { + arbiter.spawn_fn(move || { actix_rt::spawn(async move { tokio::time::sleep(time).await; actix_rt::Arbiter::current().stop(); @@ -51,7 +53,7 @@ fn join_another_arbiter() { let instant = Instant::now(); actix_rt::System::new("test_join_another_arbiter").block_on(async move { let mut arbiter = actix_rt::Arbiter::new(); - arbiter.send(Box::pin(async move { + arbiter.spawn(Box::pin(async move { tokio::time::sleep(time).await; actix_rt::Arbiter::current().stop(); })); @@ -102,73 +104,3 @@ fn wait_for_spawns() { assert!(rt.block_on(handle).is_err()); } - -#[test] -fn run_in_existing_tokio() { - use actix_rt::System; - use futures_util::future::try_join_all; - use tokio::task::LocalSet; - - async fn run_application() { - let first_task = tokio::spawn(async { - println!("One task"); - Ok::<(), ()>(()) - }); - - let second_task = tokio::spawn(async { - println!("Another task"); - Ok::<(), ()>(()) - }); - - try_join_all(vec![first_task, second_task]) - .await - .expect("Some of the futures finished unexpectedly"); - } - - let runtime = tokio::runtime::Builder::new_multi_thread() - .worker_threads(2) - .enable_all() - .build() - .unwrap(); - - let actix_local_set = LocalSet::new(); - let sys = System::run_in_tokio("actix-main-system", &actix_local_set); - actix_local_set.spawn_local(sys); - - let rest_operations = run_application(); - runtime.block_on(actix_local_set.run_until(rest_operations)); -} - -async fn run_application() -> usize { - let first_task = tokio::spawn(async { - println!("One task"); - Ok::<(), ()>(()) - }); - - let second_task = tokio::spawn(async { - println!("Another task"); - Ok::<(), ()>(()) - }); - - let tasks = try_join_all(vec![first_task, second_task]) - .await - .expect("Some of the futures finished unexpectedly"); - - tasks.len() -} - -#[test] -fn attack_to_tokio() { - use actix_rt::System; - - let runtime = tokio::runtime::Builder::new_multi_thread() - .worker_threads(2) - .enable_all() - .build() - .unwrap(); - - let rest_operations = run_application(); - let res = System::attach_to_tokio("actix-main-system", runtime, rest_operations); - - assert_eq!(res, 2); -}