From 9a7a626f7bc251df723492a4e1da6d4bd641e9e4 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Thu, 29 Oct 2020 18:57:23 +0800 Subject: [PATCH] add generic executor support for actix-rt --- actix-rt/Cargo.toml | 3 + actix-rt/src/arbiter.rs | 62 ++++----- actix-rt/src/builder.rs | 93 ++++--------- actix-rt/src/lib.rs | 7 +- actix-rt/src/runtime.rs | 122 ++++++++-------- actix-rt/src/system.rs | 207 ++++++---------------------- actix-rt/tests/integration_tests.rs | 65 +++++++++ 7 files changed, 236 insertions(+), 323 deletions(-) diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index 50da68b8..41a050c0 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -24,3 +24,6 @@ tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "r [dev-dependencies] futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] } tokio = { version = "0.2.6", features = ["full"] } +tokio-compat = "0.1.6" +futures-01 = { package = "futures", version = "0.1" } +tokio-01 = { package = "tokio", version = "0.1" } \ No newline at end of file diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 1b94df2e..0e42838a 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -2,6 +2,7 @@ use std::any::{Any, TypeId}; use std::cell::RefCell; use std::collections::HashMap; use std::future::Future; +use std::marker::PhantomData; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::{Context, Poll}; @@ -10,9 +11,8 @@ use std::{fmt, thread}; use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures_channel::oneshot::{channel, Canceled, Sender}; use tokio::stream::Stream; -use tokio::task::LocalSet; -use crate::runtime::Runtime; +use crate::runtime::{DefaultExec, ExecFactory}; use crate::system::System; thread_local!( @@ -60,18 +60,28 @@ impl Default for Arbiter { } impl Arbiter { - pub(crate) fn new_system(local: &LocalSet) -> Self { + pub(crate) fn new_system(exec: &mut E::Executor) -> Self { let (tx, rx) = unbounded(); let arb = Arbiter::with_sender(tx); ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); STORAGE.with(|cell| cell.borrow_mut().clear()); - local.spawn_local(ArbiterController { rx }); + let controller: ArbiterController = ArbiterController { + rx, + _exec: Default::default(), + }; + + E::spawn_ref(exec, controller); arb } + #[deprecated(since = "1.2.0", note = "Please use actix_rt::spawn instead")] + pub fn spawn + 'static>(f: F) { + DefaultExec::spawn(f) + } + /// Returns the current thread's arbiter's address. If no Arbiter is present, then this /// function will panic! pub fn current() -> Arbiter { @@ -95,6 +105,10 @@ impl Arbiter { /// Spawn new thread and run event loop in spawned thread. /// Returns address of newly created arbiter. pub fn new() -> Arbiter { + Self::new_with::() + } + + pub fn new_with() -> Arbiter { let id = COUNT.fetch_add(1, Ordering::Relaxed); let name = format!("actix-rt:worker:{}", id); let sys = System::current(); @@ -105,7 +119,7 @@ impl Arbiter { .spawn({ let tx = tx.clone(); move || { - let mut rt = Runtime::new().expect("Can not create Runtime"); + let mut exec = E::build().expect("Can not create Runtime"); let arb = Arbiter::with_sender(tx); STORAGE.with(|cell| cell.borrow_mut().clear()); @@ -121,7 +135,11 @@ impl Arbiter { // start arbiter controller // run loop - rt.block_on(ArbiterController { rx }); + let fut: ArbiterController = ArbiterController { + rx, + _exec: PhantomData, + }; + E::block_on(&mut exec, fut); // unregister arbiter let _ = System::current() @@ -139,29 +157,6 @@ impl Arbiter { } } - /// Spawn a future on the current thread. This does not create a new Arbiter - /// or Arbiter address, it is simply a helper for spawning futures on the current - /// thread. - pub fn spawn(future: F) - where - F: Future + 'static, - { - tokio::task::spawn_local(future); - } - - /// Executes a future on the current thread. This does not create a new Arbiter - /// or Arbiter address, it is simply a helper for executing futures on the current - /// thread. - pub fn spawn_fn(f: F) - where - F: FnOnce() -> R + 'static, - R: Future + 'static, - { - Arbiter::spawn(async { - f(); - }) - } - /// Send a future to the Arbiter's thread, and spawn it. pub fn send(&self, future: F) where @@ -272,11 +267,12 @@ impl Arbiter { } } -struct ArbiterController { +struct ArbiterController { rx: UnboundedReceiver, + _exec: PhantomData, } -impl Drop for ArbiterController { +impl Drop for ArbiterController { fn drop(&mut self) { if thread::panicking() { if System::current().stop_on_panic() { @@ -289,7 +285,7 @@ impl Drop for ArbiterController { } } -impl Future for ArbiterController { +impl Future for ArbiterController { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -299,7 +295,7 @@ impl Future for ArbiterController { Poll::Ready(Some(item)) => match item { ArbiterCommand::Stop => return Poll::Ready(()), ArbiterCommand::Execute(fut) => { - tokio::task::spawn_local(fut); + E::spawn(fut); } ArbiterCommand::ExecuteFn(f) => { f.call_box(); diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index d0e2a627..4eb65991 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -1,13 +1,13 @@ use std::borrow::Cow; use std::future::Future; use std::io; +use std::marker::PhantomData; use futures_channel::mpsc::unbounded; use futures_channel::oneshot::{channel, Receiver}; -use tokio::task::LocalSet; use crate::arbiter::{Arbiter, SystemArbiter}; -use crate::runtime::Runtime; +use crate::runtime::ExecFactory; use crate::system::System; /// Builder struct for a actix runtime. @@ -15,19 +15,21 @@ use crate::system::System; /// Either use `Builder::build` to create a system and start actors. /// Alternatively, use `Builder::run` to start the tokio runtime and /// run a function in its context. -pub struct Builder { +pub struct Builder { /// Name of the System. Defaults to "actix" if unset. name: Cow<'static, str>, /// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false. stop_on_panic: bool, + exec: PhantomData, } -impl Builder { - pub(crate) fn new() -> Self { +impl Builder { + pub(crate) fn new() -> Builder { Builder { name: Cow::Borrowed("actix"), stop_on_panic: false, + exec: PhantomData, } } @@ -49,17 +51,10 @@ impl Builder { /// Create new System. /// /// This method panics if it can not create tokio runtime - pub fn build(self) -> SystemRunner { + pub fn build(self) -> SystemRunner { self.create_runtime(|| {}) } - /// Create new System that can run asynchronously. - /// - /// This method panics if it cannot start the system arbiter - pub(crate) fn build_async(self, local: &LocalSet) -> AsyncSystemRunner { - self.create_async_runtime(local) - } - /// This function will start tokio runtime and will finish once the /// `System::stop()` message get called. /// Function `f` get called within tokio runtime context. @@ -70,97 +65,57 @@ impl Builder { self.create_runtime(f).run() } - fn create_async_runtime(self, local: &LocalSet) -> AsyncSystemRunner { - let (stop_tx, stop) = channel(); - let (sys_sender, sys_receiver) = unbounded(); - - let system = - System::construct(sys_sender, Arbiter::new_system(local), self.stop_on_panic); - - // system arbiter - let arb = SystemArbiter::new(stop_tx, sys_receiver); - - // start the system arbiter - let _ = local.spawn_local(arb); - - AsyncSystemRunner { stop, system } - } - - fn create_runtime(self, f: F) -> SystemRunner + /// Create runtime with a given instance of type that impl `ExecFactory::Executor` trait. + pub fn create_with_runtime(self, mut rt: E::Executor, f: F) -> SystemRunner where F: FnOnce() + 'static, { let (stop_tx, stop) = channel(); let (sys_sender, sys_receiver) = unbounded(); - let mut rt = Runtime::new().unwrap(); - let system = System::construct( sys_sender, - Arbiter::new_system(rt.local()), + Arbiter::new_system::(&mut rt), self.stop_on_panic, ); // system arbiter let arb = SystemArbiter::new(stop_tx, sys_receiver); - rt.spawn(arb); + E::spawn_ref(&mut rt, arb); // init system arbiter and run configuration method - rt.block_on(async { f() }); + E::block_on(&mut rt, async { f() }); SystemRunner { rt, stop, system } } -} -#[derive(Debug)] -pub(crate) struct AsyncSystemRunner { - stop: Receiver, - system: System, -} - -impl AsyncSystemRunner { - /// This function will start event loop and returns a future that - /// resolves once the `System::stop()` function is called. - pub(crate) fn run_nonblocking(self) -> impl Future> + Send { - let AsyncSystemRunner { stop, .. } = self; - - // run loop - async { - match stop.await { - Ok(code) => { - if code != 0 { - Err(io::Error::new( - io::ErrorKind::Other, - format!("Non-zero exit code: {}", code), - )) - } else { - Ok(()) - } - } - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), - } - } + fn create_runtime(self, f: F) -> SystemRunner + where + F: FnOnce() + 'static, + { + let rt = E::build().unwrap(); + self.create_with_runtime(rt, f) } } /// Helper object that runs System's event loop #[must_use = "SystemRunner must be run"] #[derive(Debug)] -pub struct SystemRunner { - rt: Runtime, +pub struct SystemRunner { + rt: E::Executor, stop: Receiver, system: System, } -impl SystemRunner { +impl SystemRunner { /// This function will start event loop and will finish once the /// `System::stop()` function is called. pub fn run(self) -> io::Result<()> { let SystemRunner { mut rt, stop, .. } = self; // run loop - match rt.block_on(stop) { + match E::block_on(&mut rt, stop) { Ok(code) => { if code != 0 { Err(io::Error::new( @@ -181,6 +136,6 @@ impl SystemRunner { where F: Future, { - self.rt.block_on(fut) + E::block_on(&mut self.rt, fut) } } diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index 8097ca05..8d92eca5 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -14,7 +14,7 @@ mod system; pub use self::arbiter::Arbiter; pub use self::builder::{Builder, SystemRunner}; -pub use self::runtime::Runtime; +pub use self::runtime::{DefaultExec, ExecFactory}; pub use self::system::System; #[doc(hidden)] @@ -25,12 +25,15 @@ pub use actix_threadpool as blocking; /// # Panics /// /// This function panics if actix system is not running. +/// +/// This function panics if actix system does not runs on default executor +/// (tokio current-thread executor with a matching version to the actix-rt dependency). #[inline] pub fn spawn(f: F) where F: Future + 'static, { - Arbiter::spawn(f) + DefaultExec::spawn(f) } /// Asynchronous signal handling diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index da1f6d0d..91920d11 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -2,39 +2,34 @@ use std::future::Future; use std::io; use tokio::{runtime, task::LocalSet}; -/// Single-threaded runtime provides a way to start reactor -/// and runtime on the current thread. +/// A trait for construct async executor and run future on it. /// -/// See [module level][mod] documentation for more details. -/// -/// [mod]: index.html -#[derive(Debug)] -pub struct Runtime { - local: LocalSet, - rt: runtime::Runtime, -} +/// A factory trait is necessary as `actix` and `actix-web` can run on multiple instances of +/// executors. Therefore the executor would be constructed multiple times +pub trait ExecFactory: Sized + Unpin + 'static { + type Executor; -impl Runtime { - #[allow(clippy::new_ret_no_self)] - /// Returns a new runtime initialized with default configuration values. - pub fn new() -> io::Result { - let rt = runtime::Builder::new() - .enable_io() - .enable_time() - .basic_scheduler() - .build()?; + fn build() -> io::Result; - Ok(Runtime { - rt, - local: LocalSet::new(), - }) - } + /// Runs the provided future, blocking the current thread until the future + /// completes. + /// + /// This function can be used to synchronously block the current thread + /// until the provided `future` has resolved either successfully or with an + /// error. The result of the future is then returned from this function + /// call. + /// + /// Note that this function will **also** execute any spawned futures on the + /// current thread, but will **not** block until these other spawned futures + /// have completed. Once the function returns, any uncompleted futures + /// remain pending in the `Runtime` instance. These futures will not run + /// until `block_on` or `run` is called again. + /// + /// The caller is responsible for ensuring that other spawned futures + /// complete execution by calling `block_on` or `run`. + fn block_on(exec: &mut Self::Executor, f: F) -> F::Output; - pub(super) fn local(&self) -> &LocalSet { - &self.local - } - - /// Spawn a future onto the single-threaded runtime. + /// Spawn a future onto the single-threaded runtime without reference it. /// /// See [module level][mod] documentation for more details. /// @@ -62,34 +57,49 @@ impl Runtime { /// /// This function panics if the spawn fails. Failure occurs if the executor /// is currently at capacity and is unable to spawn a new future. - pub fn spawn(&self, future: F) -> &Self - where - F: Future + 'static, - { - self.local.spawn_local(future); - self + fn spawn + 'static>(f: F); + + /// Spawn a future onto the single-threaded runtime reference. Useful when you have direct + /// access to executor. + /// + /// *. `spawn_ref` is preferred when you can choose between it and `spawn`. + fn spawn_ref + 'static>(exec: &mut Self::Executor, f: F); +} + +/// Default Single-threaded tokio executor on the current thread. +/// +/// See [module level][mod] documentation for more details. +/// +/// [mod]: index.html +#[derive(Copy, Clone, Debug)] +pub struct DefaultExec; + +pub type DefaultExecutor = (runtime::Runtime, LocalSet); + +impl ExecFactory for DefaultExec { + type Executor = DefaultExecutor; + + fn build() -> io::Result { + let rt = runtime::Builder::new() + .enable_io() + .enable_time() + .basic_scheduler() + .build()?; + + Ok((rt, LocalSet::new())) } - /// Runs the provided future, blocking the current thread until the future - /// completes. - /// - /// This function can be used to synchronously block the current thread - /// until the provided `future` has resolved either successfully or with an - /// error. The result of the future is then returned from this function - /// call. - /// - /// Note that this function will **also** execute any spawned futures on the - /// current thread, but will **not** block until these other spawned futures - /// have completed. Once the function returns, any uncompleted futures - /// remain pending in the `Runtime` instance. These futures will not run - /// until `block_on` or `run` is called again. - /// - /// The caller is responsible for ensuring that other spawned futures - /// complete execution by calling `block_on` or `run`. - pub fn block_on(&mut self, f: F) -> F::Output - where - F: Future, - { - self.local.block_on(&mut self.rt, f) + fn block_on(exec: &mut Self::Executor, f: F) -> ::Output { + let (rt, local) = exec; + + rt.block_on(local.run_until(f)) + } + + fn spawn + 'static>(f: F) { + tokio::task::spawn_local(f); + } + + fn spawn_ref + 'static>(exec: &mut Self::Executor, f: F) { + exec.1.spawn_local(f); } } diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index f33854ba..feaddc28 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -1,13 +1,13 @@ use std::cell::RefCell; -use std::future::Future; use std::io; use std::sync::atomic::{AtomicUsize, Ordering}; use futures_channel::mpsc::UnboundedSender; -use tokio::task::LocalSet; use crate::arbiter::{Arbiter, SystemCommand}; use crate::builder::{Builder, SystemRunner}; +use crate::runtime::ExecFactory; +use crate::DefaultExec; static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); @@ -25,170 +25,22 @@ thread_local!( ); impl System { - /// Constructs new system and sets it as current - pub(crate) fn construct( - sys: UnboundedSender, - arbiter: Arbiter, - stop_on_panic: bool, - ) -> Self { - let sys = System { - sys, - arbiter, - stop_on_panic, - id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst), - }; - System::set_current(sys.clone()); - sys - } - - /// Build a new system with a customized tokio runtime. - /// - /// This allows to customize the runtime. See struct level docs on - /// `Builder` for more information. - pub fn builder() -> Builder { - Builder::new() - } - #[allow(clippy::new_ret_no_self)] /// Create new system. /// /// This method panics if it can not create tokio runtime - pub fn new>(name: T) -> SystemRunner { - Self::builder().name(name).build() + pub fn new>(name: T) -> SystemRunner { + Self::new_with::(name) } - /// Create new system using provided tokio `LocalSet`. - /// - /// This method panics if it can not spawn system arbiter - /// - /// Note: This method uses provided `LocalSet` to create a `System` future only. - /// All the [`Arbiter`]s will be started in separate threads using their own tokio `Runtime`s. - /// It means that using this method currently it is impossible to make `actix-rt` work in the - /// alternative `tokio` `Runtime`s (e.g. provided by [`tokio_compat`]). - /// - /// [`Arbiter`]: struct.Arbiter.html - /// [`tokio_compat`]: https://crates.io/crates/tokio-compat - /// - /// # Examples - /// - /// ``` - /// use tokio::{runtime::Runtime, task::LocalSet}; - /// use actix_rt::System; - /// use futures_util::future::try_join_all; - /// - /// async fn run_application() { - /// let first_task = tokio::spawn(async { - /// // ... - /// # println!("One task"); - /// # Ok::<(),()>(()) - /// }); - /// - /// let second_task = tokio::spawn(async { - /// // ... - /// # println!("Another task"); - /// # Ok::<(),()>(()) - /// }); - /// - /// try_join_all(vec![first_task, second_task]) - /// .await - /// .expect("Some of the futures finished unexpectedly"); - /// } - /// - /// - /// let mut runtime = tokio::runtime::Builder::new() - /// .core_threads(2) - /// .enable_all() - /// .threaded_scheduler() - /// .build() - /// .unwrap(); - /// - /// - /// let actix_system_task = LocalSet::new(); - /// let sys = System::run_in_tokio("actix-main-system", &actix_system_task); - /// actix_system_task.spawn_local(sys); - /// - /// let rest_operations = run_application(); - /// runtime.block_on(actix_system_task.run_until(rest_operations)); - /// ``` - pub fn run_in_tokio>( - name: T, - local: &LocalSet, - ) -> impl Future> { - Self::builder() - .name(name) - .build_async(local) - .run_nonblocking() - } - - /// Consume the provided tokio Runtime and start the `System` in it. - /// This method will create a `LocalSet` object and occupy the current thread - /// for the created `System` exclusively. All the other asynchronous tasks that - /// should be executed as well must be aggregated into one future, provided as the last - /// argument to this method. - /// - /// Note: This method uses provided `Runtime` to create a `System` future only. - /// All the [`Arbiter`]s will be started in separate threads using their own tokio `Runtime`s. - /// It means that using this method currently it is impossible to make `actix-rt` work in the - /// alternative `tokio` `Runtime`s (e.g. provided by `tokio_compat`). - /// - /// [`Arbiter`]: struct.Arbiter.html - /// [`tokio_compat`]: https://crates.io/crates/tokio-compat - /// - /// # Arguments - /// - /// - `name`: Name of the System - /// - `runtime`: A tokio Runtime to run the system in. - /// - `rest_operations`: A future to be executed in the runtime along with the System. - /// - /// # Examples - /// - /// ``` - /// use tokio::runtime::Runtime; - /// use actix_rt::System; - /// use futures_util::future::try_join_all; - /// - /// async fn run_application() { - /// let first_task = tokio::spawn(async { - /// // ... - /// # println!("One task"); - /// # Ok::<(),()>(()) - /// }); - /// - /// let second_task = tokio::spawn(async { - /// // ... - /// # println!("Another task"); - /// # Ok::<(),()>(()) - /// }); - /// - /// try_join_all(vec![first_task, second_task]) - /// .await - /// .expect("Some of the futures finished unexpectedly"); - /// } - /// - /// - /// let runtime = tokio::runtime::Builder::new() - /// .core_threads(2) - /// .enable_all() - /// .threaded_scheduler() - /// .build() - /// .unwrap(); - /// - /// let rest_operations = run_application(); - /// System::attach_to_tokio("actix-main-system", runtime, rest_operations); - /// ``` - pub fn attach_to_tokio( - name: impl Into, - mut runtime: tokio::runtime::Runtime, - rest_operations: Fut, - ) -> R + /// This function will start tokio runtime and will finish once the + /// `System::stop()` message get called. + /// Function `f` get called within tokio runtime context. + pub fn run(f: F) -> io::Result<()> where - Fut: std::future::Future, + F: FnOnce() + 'static, { - let actix_system_task = LocalSet::new(); - let sys = System::run_in_tokio(name.into(), &actix_system_task); - actix_system_task.spawn_local(sys); - - runtime.block_on(actix_system_task.run_until(rest_operations)) + Self::run_with::(f) } /// Get current running system. @@ -253,13 +105,42 @@ impl System { &self.arbiter } - /// This function will start tokio runtime and will finish once the - /// `System::stop()` message get called. - /// Function `f` get called within tokio runtime context. - pub fn run(f: F) -> io::Result<()> + /// Build a new system with custom executor. + pub fn builder() -> Builder { + Builder::new() + } + + #[allow(clippy::new_ret_no_self)] + /// Create new system with custom runtime. + /// + /// This method panics if it can not create the runtime type. + pub fn new_with>(name: T) -> SystemRunner { + Self::builder().name(name).build() + } + + pub fn run_with(f: F) -> io::Result<()> where + E: ExecFactory, F: FnOnce() + 'static, { - Self::builder().run(f) + Self::builder::().run(f) + } +} + +impl System { + /// Constructs new system and sets it as current + pub(crate) fn construct( + sys: UnboundedSender, + arbiter: Arbiter, + stop_on_panic: bool, + ) -> Self { + let sys = System { + sys, + arbiter, + stop_on_panic, + id: SYSTEM_COUNT.fetch_add(1, Ordering::SeqCst), + }; + System::set_current(sys.clone()); + sys } } diff --git a/actix-rt/tests/integration_tests.rs b/actix-rt/tests/integration_tests.rs index e3296e89..4e228d38 100644 --- a/actix-rt/tests/integration_tests.rs +++ b/actix-rt/tests/integration_tests.rs @@ -1,10 +1,18 @@ +use actix_rt::ExecFactory; use std::time::{Duration, Instant}; +use tokio::macros::support::Future; #[test] fn await_for_timer() { let time = Duration::from_secs(2); let instant = Instant::now(); + actix_rt::System::new("test_wait_timer").block_on(async move { + let arbiter = actix_rt::Arbiter::new(); + arbiter.send(Box::pin(async move { + tokio::time::delay_for(time).await; + actix_rt::Arbiter::current().stop(); + })); tokio::time::delay_for(time).await; }); assert!( @@ -61,3 +69,60 @@ fn join_another_arbiter() { "Premature stop of arbiter should conclude regardless of it's current state" ); } + +struct TokioCompatExec; + +impl ExecFactory for TokioCompatExec { + type Executor = tokio_compat::runtime::current_thread::Runtime; + + fn build() -> std::io::Result { + let rt = tokio_compat::runtime::current_thread::Runtime::new()?; + + Ok(rt) + } + + fn block_on(exec: &mut Self::Executor, f: F) -> ::Output { + exec.block_on_std(f) + } + + fn spawn + 'static>(f: F) { + tokio_compat::runtime::current_thread::TaskExecutor::current() + .spawn_local_std(f) + .unwrap(); + } + + fn spawn_ref + 'static>(exec: &mut Self::Executor, f: F) { + exec.spawn_std(f); + } +} + +#[test] +fn tokio_compat() -> std::io::Result<()> { + // manually construct a compat executor. + let rt = TokioCompatExec::build()?; + + // do some work with rt and pass it to builder + actix_rt::System::builder::() + .create_with_runtime(rt, || {}) + .block_on(async { + let (tx, rx) = tokio::sync::oneshot::channel(); + tokio_01::spawn(futures_01::lazy(|| { + tx.send(251).unwrap(); + Ok(()) + })); + + assert_eq!(251, rx.await.unwrap()); + }); + + // let the system construct the executor and block on it directly. + actix_rt::System::new_with::("compat").block_on(async { + let (tx, rx) = tokio::sync::oneshot::channel(); + tokio::spawn(async move { + tokio::time::delay_for(Duration::from_secs(1)).await; + tx.send(996).unwrap(); + }); + assert_eq!(996, rx.await.unwrap()); + }); + + Ok(()) +}