actix-rt: Make the process of running System in existing Runtime more clear

This commit is contained in:
Igor Aleksanov 2020-08-06 17:33:57 +03:00
parent 7632f51509
commit 8f8bee28fa
2 changed files with 124 additions and 2 deletions

View File

@ -23,3 +23,7 @@ futures-util = { version = "0.3.4", default-features = false, features = ["alloc
copyless = "0.1.4"
smallvec = "1"
tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] }
[dev-dependencies]
futures = "0.3"
tokio = { version = "0.2.6", features = ["full"] }

View File

@ -57,10 +57,58 @@ impl System {
Self::builder().name(name).build()
}
#[allow(clippy::new_ret_no_self)]
/// Create new system using provided tokio Handle.
/// Create new system using provided tokio `LocalSet`.
///
/// This method panics if it can not spawn system arbiter
///
/// Note: This method uses provided `LocalSet` to create a `System` future only.
/// All the [`Arbiter`]s will be started in separate threads using their own tokio `Runtime`s.
/// It means that using this method currently it is impossible to make `actix-rt` work in the `Runtime` other
/// than the provided by `tokio` 0.2 (e.g. provided by `tokio_compat`).
///
/// [`Arbiter`]: struct.Arbiter.html
///
/// # Examples
///
/// ```
/// use tokio::{runtime::Runtime, task::LocalSet};
/// use actix_rt::System;
/// use futures::future::try_join_all;
///
/// async fn run_application() {
/// let first_task = tokio::spawn(async {
/// // ...
/// # println!("One task");
/// # Ok::<(),()>(())
/// });
///
/// let second_task = tokio::spawn(async {
/// // ...
/// # println!("Another task");
/// # Ok::<(),()>(())
/// });
///
/// try_join_all(vec![first_task, second_task])
/// .await
/// .expect("Some of the futures finished unexpectedly");
/// }
///
///
/// let mut runtime = tokio::runtime::Builder::new()
/// .core_threads(2)
/// .enable_all()
/// .threaded_scheduler()
/// .build()
/// .unwrap();
///
///
/// let actix_system_task = LocalSet::new();
/// let sys = System::run_in_tokio("actix-main-system", &actix_system_task);
/// actix_system_task.spawn_local(sys);
///
/// let rest_operations = run_application();
/// runtime.block_on(actix_system_task.run_until(rest_operations));
/// ```
pub fn run_in_tokio<T: Into<String>>(
name: T,
local: &LocalSet,
@ -71,6 +119,76 @@ impl System {
.run_nonblocking()
}
/// Consume the provided tokio Runtime and start the `System` in it.
/// This method will create a `LocalSet` object and occupy the current thread
/// for the created `System` exclusively. All the other asynchronous tasks that
/// should be executed as well must be aggregated into one future, provided as the last
/// argument to this method.
///
/// Note: This method uses provided `Runtime` to create a `System` future only.
/// All the [`Arbiter`]s will be started in separate threads using their own tokio `Runtime`s.
/// It means that using this method currently it is impossible to make `actix-rt` work in the
/// `Runtime` other than the provided by `tokio` 0.2 (e.g. provided by `tokio_compat`).
///
/// [`Arbiter`]: struct.Arbiter.html
///
/// # Arguments
///
/// - `runtime`: A tokio Runtime to run the system in.
/// - `name`: Name of the System
/// - `rest_operations`: A future to be executed in the runtime along with the System.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Runtime;
/// use actix_rt::System;
/// use futures::future::try_join_all;
///
/// async fn run_application() {
/// let first_task = tokio::spawn(async {
/// // ...
/// # println!("One task");
/// # Ok::<(),()>(())
/// });
///
/// let second_task = tokio::spawn(async {
/// // ...
/// # println!("Another task");
/// # Ok::<(),()>(())
/// });
///
/// try_join_all(vec![first_task, second_task])
/// .await
/// .expect("Some of the futures finished unexpectedly");
/// }
///
///
/// let runtime = tokio::runtime::Builder::new()
/// .core_threads(2)
/// .enable_all()
/// .threaded_scheduler()
/// .build()
/// .unwrap();
///
/// let rest_operations = run_application();
/// System::attach_to_tokio(runtime, "actix-main-system", rest_operations);
/// ```
pub fn attach_to_tokio<Fut, R>(
mut runtime: tokio::runtime::Runtime,
name: impl Into<String>,
rest_operations: Fut,
) -> R
where
Fut: std::future::Future<Output = R>,
{
let actix_system_task = LocalSet::new();
let sys = System::run_in_tokio(name.into(), &actix_system_task);
actix_system_task.spawn_local(sys);
runtime.block_on(actix_system_task.run_until(rest_operations))
}
/// Get current running system.
pub fn current() -> System {
CURRENT.with(|cell| match *cell.borrow() {