diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index c0bc750b..d4a71f3f 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -23,3 +23,7 @@ futures-util = { version = "0.3.4", default-features = false, features = ["alloc copyless = "0.1.4" smallvec = "1" tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] } + +[dev-dependencies] +futures = "0.3" +tokio = { version = "0.2.6", features = ["full"] } diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 21264669..937f94aa 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -57,10 +57,58 @@ impl System { Self::builder().name(name).build() } - #[allow(clippy::new_ret_no_self)] - /// Create new system using provided tokio Handle. + /// Create new system using provided tokio `LocalSet`. /// /// This method panics if it can not spawn system arbiter + /// + /// Note: This method uses provided `LocalSet` to create a `System` future only. + /// All the [`Arbiter`]s will be started in separate threads using their own tokio `Runtime`s. + /// It means that using this method currently it is impossible to make `actix-rt` work in the `Runtime` other + /// than the provided by `tokio` 0.2 (e.g. provided by `tokio_compat`). + /// + /// [`Arbiter`]: struct.Arbiter.html + /// + /// # Examples + /// + /// ``` + /// use tokio::{runtime::Runtime, task::LocalSet}; + /// use actix_rt::System; + /// use futures::future::try_join_all; + /// + /// async fn run_application() { + /// let first_task = tokio::spawn(async { + /// // ... + /// # println!("One task"); + /// # Ok::<(),()>(()) + /// }); + /// + /// let second_task = tokio::spawn(async { + /// // ... + /// # println!("Another task"); + /// # Ok::<(),()>(()) + /// }); + /// + /// try_join_all(vec![first_task, second_task]) + /// .await + /// .expect("Some of the futures finished unexpectedly"); + /// } + /// + /// + /// let mut runtime = tokio::runtime::Builder::new() + /// .core_threads(2) + /// .enable_all() + /// .threaded_scheduler() + /// .build() + /// .unwrap(); + /// + /// + /// let actix_system_task = LocalSet::new(); + /// let sys = System::run_in_tokio("actix-main-system", &actix_system_task); + /// actix_system_task.spawn_local(sys); + /// + /// let rest_operations = run_application(); + /// runtime.block_on(actix_system_task.run_until(rest_operations)); + /// ``` pub fn run_in_tokio>( name: T, local: &LocalSet, @@ -71,6 +119,76 @@ impl System { .run_nonblocking() } + /// Consume the provided tokio Runtime and start the `System` in it. + /// This method will create a `LocalSet` object and occupy the current thread + /// for the created `System` exclusively. All the other asynchronous tasks that + /// should be executed as well must be aggregated into one future, provided as the last + /// argument to this method. + /// + /// Note: This method uses provided `Runtime` to create a `System` future only. + /// All the [`Arbiter`]s will be started in separate threads using their own tokio `Runtime`s. + /// It means that using this method currently it is impossible to make `actix-rt` work in the + /// `Runtime` other than the provided by `tokio` 0.2 (e.g. provided by `tokio_compat`). + /// + /// [`Arbiter`]: struct.Arbiter.html + /// + /// # Arguments + /// + /// - `runtime`: A tokio Runtime to run the system in. + /// - `name`: Name of the System + /// - `rest_operations`: A future to be executed in the runtime along with the System. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime::Runtime; + /// use actix_rt::System; + /// use futures::future::try_join_all; + /// + /// async fn run_application() { + /// let first_task = tokio::spawn(async { + /// // ... + /// # println!("One task"); + /// # Ok::<(),()>(()) + /// }); + /// + /// let second_task = tokio::spawn(async { + /// // ... + /// # println!("Another task"); + /// # Ok::<(),()>(()) + /// }); + /// + /// try_join_all(vec![first_task, second_task]) + /// .await + /// .expect("Some of the futures finished unexpectedly"); + /// } + /// + /// + /// let runtime = tokio::runtime::Builder::new() + /// .core_threads(2) + /// .enable_all() + /// .threaded_scheduler() + /// .build() + /// .unwrap(); + /// + /// let rest_operations = run_application(); + /// System::attach_to_tokio(runtime, "actix-main-system", rest_operations); + /// ``` + pub fn attach_to_tokio( + mut runtime: tokio::runtime::Runtime, + name: impl Into, + rest_operations: Fut, + ) -> R + where + Fut: std::future::Future, + { + let actix_system_task = LocalSet::new(); + let sys = System::run_in_tokio(name.into(), &actix_system_task); + actix_system_task.spawn_local(sys); + + runtime.block_on(actix_system_task.run_until(rest_operations)) + } + /// Get current running system. pub fn current() -> System { CURRENT.with(|cell| match *cell.borrow() {