diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 3e77e990..085e608c 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -62,6 +62,11 @@ jobs: command: test args: --all --all-features --no-fail-fast -- --nocapture + - name: tokio compat tests + run: + cd actix-rt + cargo test --features tokio-compat-executor --no-default-features + - name: Generate coverage file if: matrix.version == 'stable' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request') run: | diff --git a/.github/workflows/macos.yml b/.github/workflows/macos.yml index 98788274..ea24016f 100644 --- a/.github/workflows/macos.yml +++ b/.github/workflows/macos.yml @@ -41,3 +41,8 @@ jobs: with: command: test args: --all --all-features --no-fail-fast -- --nocapture + + - name: tokio compat tests + run: + cd actix-rt + cargo test --features tokio-compat-executor --no-default-features diff --git a/.github/workflows/windows.yml b/.github/workflows/windows.yml index a319fd65..fd50cc7a 100644 --- a/.github/workflows/windows.yml +++ b/.github/workflows/windows.yml @@ -67,3 +67,8 @@ jobs: with: command: test args: --all --all-features --no-fail-fast -- --nocapture + + - name: tokio compat tests + run: + cd actix-rt + cargo test --features tokio-compat-executor --no-default-features diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index b7d272cd..6ba1d617 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -22,7 +22,15 @@ futures-channel = { version = "0.3.4", default-features = false } futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] } copyless = "0.1.4" smallvec = "1" -tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] } +tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"], optional = true } +tokio-compat = { version = "0.1.6", features = ["rt-full"], optional = true } [dev-dependencies] tokio = { version = "0.2.6", features = ["full"] } +tokio01 = { package = "tokio", version = "0.1" } +futures01 = { package = "futures", version = "0.1" } + +[features] +default = ["tokio-executor"] +tokio-executor = ["tokio"] +tokio-compat-executor = ["tokio", "tokio-compat"] diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index eff10ca3..9bb6ea4a 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -13,13 +13,14 @@ use futures_util::{ stream::Stream, }; +use crate::executor::spawn_local; use crate::runtime::Runtime; use crate::system::System; use copyless::BoxHelper; +pub use crate::executor::JoinHandle; use smallvec::SmallVec; -pub use tokio::task::JoinHandle; thread_local!( static ADDR: RefCell> = RefCell::new(None); @@ -162,7 +163,7 @@ impl Arbiter { if let Some(rt) = rt { rt.spawn(fut); } else { - tokio::task::spawn_local(fut); + spawn_local(fut); } } }); @@ -184,12 +185,12 @@ impl Arbiter { // Spawn the future on running executor let len = PENDING.with(move |cell| { let mut p = cell.borrow_mut(); - p.push(tokio::task::spawn_local(future)); + p.push(spawn_local(future)); p.len() }); if len > 7 { // Before reaching the inline size - tokio::task::spawn_local(CleanupPending); + spawn_local(CleanupPending); } } else { // Box the future and push it to the queue, this results in double boxing @@ -383,12 +384,12 @@ impl Future for ArbiterController { ArbiterCommand::Execute(fut) => { let len = PENDING.with(move |cell| { let mut p = cell.borrow_mut(); - p.push(tokio::task::spawn_local(fut)); + p.push(spawn_local(fut)); p.len() }); if len > 7 { // Before reaching the inline size - tokio::task::spawn_local(CleanupPending); + spawn_local(CleanupPending); } } ArbiterCommand::ExecuteFn(f) => { diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index f4d9b1bf..f3806a22 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -4,9 +4,9 @@ use std::io; use futures_channel::mpsc::unbounded; use futures_channel::oneshot::{channel, Receiver}; use futures_util::future::{lazy, Future, FutureExt}; -use tokio::task::LocalSet; use crate::arbiter::{Arbiter, SystemArbiter}; +use crate::executor::LocalSet; use crate::runtime::Runtime; use crate::system::System; diff --git a/actix-rt/src/executor.rs b/actix-rt/src/executor.rs new file mode 100644 index 00000000..5f27d5c2 --- /dev/null +++ b/actix-rt/src/executor.rs @@ -0,0 +1,50 @@ +//! This module provides a wrapper abstraction around used executor. +//! +//! Currently supported executors: +//! +//! - `tokio` (default) +//! - `tokio-compat` + +pub use self::executor_impl::*; + +#[cfg(feature = "tokio-executor")] +mod executor_impl { + use std::io::Result; + pub use tokio::runtime::Runtime; + pub use tokio::task::{spawn_local, JoinHandle, LocalSet}; + + pub fn build_runtime() -> Result { + tokio::runtime::Builder::new() + .enable_io() + .enable_time() + .basic_scheduler() + .build() + } + + pub fn block_on_local(rt: &mut Runtime, local: &LocalSet, f: F) -> F::Output + where + F: std::future::Future + 'static, + { + local.block_on(rt, f) + } +} + +#[cfg(all(not(feature = "tokio-executor"), feature = "tokio-compat-executor"))] +mod executor_impl { + use std::io::Result; + pub use tokio::task::{spawn_local, JoinHandle, LocalSet}; + pub use tokio_compat::runtime::Runtime; + + pub fn build_runtime() -> Result { + tokio_compat::runtime::Builder::new() + .core_threads(1) + .build() + } + + pub fn block_on_local(rt: &mut Runtime, local: &LocalSet, f: F) -> F::Output + where + F: std::future::Future + 'static, + { + rt.block_on_std(local.run_until(f)) + } +} diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index aef78f12..14f463a6 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -7,6 +7,7 @@ pub use actix_macros::{main, test}; mod arbiter; mod builder; +mod executor; mod runtime; mod system; diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index bbafb6f0..f3b677c0 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -1,6 +1,6 @@ +use crate::executor::{LocalSet, Runtime as ExecutorRuntime}; use std::future::Future; use std::io; -use tokio::{runtime, task::LocalSet}; /// Single-threaded runtime provides a way to start reactor /// and runtime on the current thread. @@ -11,18 +11,14 @@ use tokio::{runtime, task::LocalSet}; #[derive(Debug)] pub struct Runtime { local: LocalSet, - rt: runtime::Runtime, + rt: ExecutorRuntime, } impl Runtime { #[allow(clippy::new_ret_no_self)] /// Returns a new runtime initialized with default configuration values. pub fn new() -> io::Result { - let rt = runtime::Builder::new() - .enable_io() - .enable_time() - .basic_scheduler() - .build()?; + let rt = crate::executor::build_runtime()?; Ok(Runtime { rt, @@ -86,6 +82,6 @@ impl Runtime { where F: Future + 'static, { - self.local.block_on(&mut self.rt, f) + crate::executor::block_on_local(&mut self.rt, &self.local, f) } } diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index f33854ba..b2fc3834 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -4,10 +4,10 @@ use std::io; use std::sync::atomic::{AtomicUsize, Ordering}; use futures_channel::mpsc::UnboundedSender; -use tokio::task::LocalSet; use crate::arbiter::{Arbiter, SystemCommand}; use crate::builder::{Builder, SystemRunner}; +use crate::executor::LocalSet; static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); @@ -143,6 +143,8 @@ impl System { /// # Examples /// /// ``` + /// # #[cfg(feature = "tokio-executor")] // Test won't work for `tokio-compat-executor`. + /// # fn main() { /// use tokio::runtime::Runtime; /// use actix_rt::System; /// use futures_util::future::try_join_all; @@ -175,20 +177,22 @@ impl System { /// /// let rest_operations = run_application(); /// System::attach_to_tokio("actix-main-system", runtime, rest_operations); + /// # } + /// # #[cfg(not(feature = "tokio-executor"))] fn main() {} // Provie a blanket main function, so test will compile. /// ``` pub fn attach_to_tokio( name: impl Into, - mut runtime: tokio::runtime::Runtime, + mut runtime: crate::executor::Runtime, rest_operations: Fut, ) -> R where - Fut: std::future::Future, + Fut: std::future::Future + 'static, { let actix_system_task = LocalSet::new(); let sys = System::run_in_tokio(name.into(), &actix_system_task); actix_system_task.spawn_local(sys); - runtime.block_on(actix_system_task.run_until(rest_operations)) + crate::executor::block_on_local(&mut runtime, &actix_system_task, rest_operations) } /// Get current running system. diff --git a/actix-rt/tests/integration_tests.rs b/actix-rt/tests/integration_tests.rs index 8e775bab..e57f45ca 100644 --- a/actix-rt/tests/integration_tests.rs +++ b/actix-rt/tests/integration_tests.rs @@ -112,3 +112,39 @@ fn join_current_arbiter() { "local_join should await only for the already spawned futures" ); } + +#[test] +#[cfg(all(feature = "tokio-compat-executor", not(feature = "tokio-executor")))] +fn tokio_compat_timer() { + use tokio_compat::prelude::*; + + let time = Duration::from_secs(2); + let instant = Instant::now(); + actix_rt::System::new("test_wait_timer").block_on(async move { + // Spawn a `std::Future`. + tokio::time::delay_for(time).await; + let when = Instant::now() + time; + // Spawn a `futures01::Future`. + tokio01::timer::Delay::new(when) + // convert the delay future into a `std::future` that we can `await`. + .compat() + .await + .expect("tokio 0.1 timer should work!"); + }); + assert!( + instant.elapsed() >= time * 2, + "Block on should poll awaited future to completion" + ); +} + +#[test] +#[cfg(all(feature = "tokio-compat-executor", not(feature = "tokio-executor")))] +fn tokio_compat_spawn() { + actix_rt::System::new("test_wait_timer").block_on(async move { + // Spawning with tokio 0.1 works on the main thread + tokio01::spawn(futures01::lazy(|| futures01::future::ok(()))); + + // Spawning with tokio 0.2 works of course + tokio::spawn(async {}).await.unwrap(); + }); +}