mirror of https://github.com/fafhrd91/actix-net
Add support of tokio-compat executor
This commit is contained in:
parent
98a2197a09
commit
6b33e7c624
|
@ -62,6 +62,11 @@ jobs:
|
||||||
command: test
|
command: test
|
||||||
args: --all --all-features --no-fail-fast -- --nocapture
|
args: --all --all-features --no-fail-fast -- --nocapture
|
||||||
|
|
||||||
|
- name: tokio compat tests
|
||||||
|
run:
|
||||||
|
cd actix-rt
|
||||||
|
cargo test --features tokio-compat-executor --no-default-features
|
||||||
|
|
||||||
- name: Generate coverage file
|
- name: Generate coverage file
|
||||||
if: matrix.version == 'stable' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
|
if: matrix.version == 'stable' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
|
||||||
run: |
|
run: |
|
||||||
|
|
|
@ -41,3 +41,8 @@ jobs:
|
||||||
with:
|
with:
|
||||||
command: test
|
command: test
|
||||||
args: --all --all-features --no-fail-fast -- --nocapture
|
args: --all --all-features --no-fail-fast -- --nocapture
|
||||||
|
|
||||||
|
- name: tokio compat tests
|
||||||
|
run:
|
||||||
|
cd actix-rt
|
||||||
|
cargo test --features tokio-compat-executor --no-default-features
|
||||||
|
|
|
@ -67,3 +67,8 @@ jobs:
|
||||||
with:
|
with:
|
||||||
command: test
|
command: test
|
||||||
args: --all --all-features --no-fail-fast -- --nocapture
|
args: --all --all-features --no-fail-fast -- --nocapture
|
||||||
|
|
||||||
|
- name: tokio compat tests
|
||||||
|
run:
|
||||||
|
cd actix-rt
|
||||||
|
cargo test --features tokio-compat-executor --no-default-features
|
||||||
|
|
|
@ -22,7 +22,15 @@ futures-channel = { version = "0.3.4", default-features = false }
|
||||||
futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] }
|
futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] }
|
||||||
copyless = "0.1.4"
|
copyless = "0.1.4"
|
||||||
smallvec = "1"
|
smallvec = "1"
|
||||||
tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] }
|
tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"], optional = true }
|
||||||
|
tokio-compat = { version = "0.1.6", features = ["rt-full"], optional = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tokio = { version = "0.2.6", features = ["full"] }
|
tokio = { version = "0.2.6", features = ["full"] }
|
||||||
|
tokio01 = { package = "tokio", version = "0.1" }
|
||||||
|
futures01 = { package = "futures", version = "0.1" }
|
||||||
|
|
||||||
|
[features]
|
||||||
|
default = ["tokio-executor"]
|
||||||
|
tokio-executor = ["tokio"]
|
||||||
|
tokio-compat-executor = ["tokio", "tokio-compat"]
|
||||||
|
|
|
@ -13,13 +13,14 @@ use futures_util::{
|
||||||
stream::Stream,
|
stream::Stream,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use crate::executor::spawn_local;
|
||||||
use crate::runtime::Runtime;
|
use crate::runtime::Runtime;
|
||||||
use crate::system::System;
|
use crate::system::System;
|
||||||
|
|
||||||
use copyless::BoxHelper;
|
use copyless::BoxHelper;
|
||||||
|
|
||||||
|
pub use crate::executor::JoinHandle;
|
||||||
use smallvec::SmallVec;
|
use smallvec::SmallVec;
|
||||||
pub use tokio::task::JoinHandle;
|
|
||||||
|
|
||||||
thread_local!(
|
thread_local!(
|
||||||
static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None);
|
static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None);
|
||||||
|
@ -162,7 +163,7 @@ impl Arbiter {
|
||||||
if let Some(rt) = rt {
|
if let Some(rt) = rt {
|
||||||
rt.spawn(fut);
|
rt.spawn(fut);
|
||||||
} else {
|
} else {
|
||||||
tokio::task::spawn_local(fut);
|
spawn_local(fut);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -184,12 +185,12 @@ impl Arbiter {
|
||||||
// Spawn the future on running executor
|
// Spawn the future on running executor
|
||||||
let len = PENDING.with(move |cell| {
|
let len = PENDING.with(move |cell| {
|
||||||
let mut p = cell.borrow_mut();
|
let mut p = cell.borrow_mut();
|
||||||
p.push(tokio::task::spawn_local(future));
|
p.push(spawn_local(future));
|
||||||
p.len()
|
p.len()
|
||||||
});
|
});
|
||||||
if len > 7 {
|
if len > 7 {
|
||||||
// Before reaching the inline size
|
// Before reaching the inline size
|
||||||
tokio::task::spawn_local(CleanupPending);
|
spawn_local(CleanupPending);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Box the future and push it to the queue, this results in double boxing
|
// Box the future and push it to the queue, this results in double boxing
|
||||||
|
@ -383,12 +384,12 @@ impl Future for ArbiterController {
|
||||||
ArbiterCommand::Execute(fut) => {
|
ArbiterCommand::Execute(fut) => {
|
||||||
let len = PENDING.with(move |cell| {
|
let len = PENDING.with(move |cell| {
|
||||||
let mut p = cell.borrow_mut();
|
let mut p = cell.borrow_mut();
|
||||||
p.push(tokio::task::spawn_local(fut));
|
p.push(spawn_local(fut));
|
||||||
p.len()
|
p.len()
|
||||||
});
|
});
|
||||||
if len > 7 {
|
if len > 7 {
|
||||||
// Before reaching the inline size
|
// Before reaching the inline size
|
||||||
tokio::task::spawn_local(CleanupPending);
|
spawn_local(CleanupPending);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ArbiterCommand::ExecuteFn(f) => {
|
ArbiterCommand::ExecuteFn(f) => {
|
||||||
|
|
|
@ -4,9 +4,9 @@ use std::io;
|
||||||
use futures_channel::mpsc::unbounded;
|
use futures_channel::mpsc::unbounded;
|
||||||
use futures_channel::oneshot::{channel, Receiver};
|
use futures_channel::oneshot::{channel, Receiver};
|
||||||
use futures_util::future::{lazy, Future, FutureExt};
|
use futures_util::future::{lazy, Future, FutureExt};
|
||||||
use tokio::task::LocalSet;
|
|
||||||
|
|
||||||
use crate::arbiter::{Arbiter, SystemArbiter};
|
use crate::arbiter::{Arbiter, SystemArbiter};
|
||||||
|
use crate::executor::LocalSet;
|
||||||
use crate::runtime::Runtime;
|
use crate::runtime::Runtime;
|
||||||
use crate::system::System;
|
use crate::system::System;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,50 @@
|
||||||
|
//! This module provides a wrapper abstraction around used executor.
|
||||||
|
//!
|
||||||
|
//! Currently supported executors:
|
||||||
|
//!
|
||||||
|
//! - `tokio` (default)
|
||||||
|
//! - `tokio-compat`
|
||||||
|
|
||||||
|
pub use self::executor_impl::*;
|
||||||
|
|
||||||
|
#[cfg(feature = "tokio-executor")]
|
||||||
|
mod executor_impl {
|
||||||
|
use std::io::Result;
|
||||||
|
pub use tokio::runtime::Runtime;
|
||||||
|
pub use tokio::task::{spawn_local, JoinHandle, LocalSet};
|
||||||
|
|
||||||
|
pub fn build_runtime() -> Result<Runtime> {
|
||||||
|
tokio::runtime::Builder::new()
|
||||||
|
.enable_io()
|
||||||
|
.enable_time()
|
||||||
|
.basic_scheduler()
|
||||||
|
.build()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn block_on_local<F>(rt: &mut Runtime, local: &LocalSet, f: F) -> F::Output
|
||||||
|
where
|
||||||
|
F: std::future::Future + 'static,
|
||||||
|
{
|
||||||
|
local.block_on(rt, f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(all(not(feature = "tokio-executor"), feature = "tokio-compat-executor"))]
|
||||||
|
mod executor_impl {
|
||||||
|
use std::io::Result;
|
||||||
|
pub use tokio::task::{spawn_local, JoinHandle, LocalSet};
|
||||||
|
pub use tokio_compat::runtime::Runtime;
|
||||||
|
|
||||||
|
pub fn build_runtime() -> Result<Runtime> {
|
||||||
|
tokio_compat::runtime::Builder::new()
|
||||||
|
.core_threads(1)
|
||||||
|
.build()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn block_on_local<F>(rt: &mut Runtime, local: &LocalSet, f: F) -> F::Output
|
||||||
|
where
|
||||||
|
F: std::future::Future + 'static,
|
||||||
|
{
|
||||||
|
rt.block_on_std(local.run_until(f))
|
||||||
|
}
|
||||||
|
}
|
|
@ -7,6 +7,7 @@ pub use actix_macros::{main, test};
|
||||||
|
|
||||||
mod arbiter;
|
mod arbiter;
|
||||||
mod builder;
|
mod builder;
|
||||||
|
mod executor;
|
||||||
mod runtime;
|
mod runtime;
|
||||||
mod system;
|
mod system;
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
|
use crate::executor::{LocalSet, Runtime as ExecutorRuntime};
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
use std::io;
|
use std::io;
|
||||||
use tokio::{runtime, task::LocalSet};
|
|
||||||
|
|
||||||
/// Single-threaded runtime provides a way to start reactor
|
/// Single-threaded runtime provides a way to start reactor
|
||||||
/// and runtime on the current thread.
|
/// and runtime on the current thread.
|
||||||
|
@ -11,18 +11,14 @@ use tokio::{runtime, task::LocalSet};
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Runtime {
|
pub struct Runtime {
|
||||||
local: LocalSet,
|
local: LocalSet,
|
||||||
rt: runtime::Runtime,
|
rt: ExecutorRuntime,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Runtime {
|
impl Runtime {
|
||||||
#[allow(clippy::new_ret_no_self)]
|
#[allow(clippy::new_ret_no_self)]
|
||||||
/// Returns a new runtime initialized with default configuration values.
|
/// Returns a new runtime initialized with default configuration values.
|
||||||
pub fn new() -> io::Result<Runtime> {
|
pub fn new() -> io::Result<Runtime> {
|
||||||
let rt = runtime::Builder::new()
|
let rt = crate::executor::build_runtime()?;
|
||||||
.enable_io()
|
|
||||||
.enable_time()
|
|
||||||
.basic_scheduler()
|
|
||||||
.build()?;
|
|
||||||
|
|
||||||
Ok(Runtime {
|
Ok(Runtime {
|
||||||
rt,
|
rt,
|
||||||
|
@ -86,6 +82,6 @@ impl Runtime {
|
||||||
where
|
where
|
||||||
F: Future + 'static,
|
F: Future + 'static,
|
||||||
{
|
{
|
||||||
self.local.block_on(&mut self.rt, f)
|
crate::executor::block_on_local(&mut self.rt, &self.local, f)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,10 +4,10 @@ use std::io;
|
||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
|
|
||||||
use futures_channel::mpsc::UnboundedSender;
|
use futures_channel::mpsc::UnboundedSender;
|
||||||
use tokio::task::LocalSet;
|
|
||||||
|
|
||||||
use crate::arbiter::{Arbiter, SystemCommand};
|
use crate::arbiter::{Arbiter, SystemCommand};
|
||||||
use crate::builder::{Builder, SystemRunner};
|
use crate::builder::{Builder, SystemRunner};
|
||||||
|
use crate::executor::LocalSet;
|
||||||
|
|
||||||
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
|
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
|
||||||
|
|
||||||
|
@ -143,6 +143,8 @@ impl System {
|
||||||
/// # Examples
|
/// # Examples
|
||||||
///
|
///
|
||||||
/// ```
|
/// ```
|
||||||
|
/// # #[cfg(feature = "tokio-executor")] // Test won't work for `tokio-compat-executor`.
|
||||||
|
/// # fn main() {
|
||||||
/// use tokio::runtime::Runtime;
|
/// use tokio::runtime::Runtime;
|
||||||
/// use actix_rt::System;
|
/// use actix_rt::System;
|
||||||
/// use futures_util::future::try_join_all;
|
/// use futures_util::future::try_join_all;
|
||||||
|
@ -175,20 +177,22 @@ impl System {
|
||||||
///
|
///
|
||||||
/// let rest_operations = run_application();
|
/// let rest_operations = run_application();
|
||||||
/// System::attach_to_tokio("actix-main-system", runtime, rest_operations);
|
/// System::attach_to_tokio("actix-main-system", runtime, rest_operations);
|
||||||
|
/// # }
|
||||||
|
/// # #[cfg(not(feature = "tokio-executor"))] fn main() {} // Provie a blanket main function, so test will compile.
|
||||||
/// ```
|
/// ```
|
||||||
pub fn attach_to_tokio<Fut, R>(
|
pub fn attach_to_tokio<Fut, R>(
|
||||||
name: impl Into<String>,
|
name: impl Into<String>,
|
||||||
mut runtime: tokio::runtime::Runtime,
|
mut runtime: crate::executor::Runtime,
|
||||||
rest_operations: Fut,
|
rest_operations: Fut,
|
||||||
) -> R
|
) -> R
|
||||||
where
|
where
|
||||||
Fut: std::future::Future<Output = R>,
|
Fut: std::future::Future<Output = R> + 'static,
|
||||||
{
|
{
|
||||||
let actix_system_task = LocalSet::new();
|
let actix_system_task = LocalSet::new();
|
||||||
let sys = System::run_in_tokio(name.into(), &actix_system_task);
|
let sys = System::run_in_tokio(name.into(), &actix_system_task);
|
||||||
actix_system_task.spawn_local(sys);
|
actix_system_task.spawn_local(sys);
|
||||||
|
|
||||||
runtime.block_on(actix_system_task.run_until(rest_operations))
|
crate::executor::block_on_local(&mut runtime, &actix_system_task, rest_operations)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get current running system.
|
/// Get current running system.
|
||||||
|
|
|
@ -112,3 +112,39 @@ fn join_current_arbiter() {
|
||||||
"local_join should await only for the already spawned futures"
|
"local_join should await only for the already spawned futures"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[cfg(all(feature = "tokio-compat-executor", not(feature = "tokio-executor")))]
|
||||||
|
fn tokio_compat_timer() {
|
||||||
|
use tokio_compat::prelude::*;
|
||||||
|
|
||||||
|
let time = Duration::from_secs(2);
|
||||||
|
let instant = Instant::now();
|
||||||
|
actix_rt::System::new("test_wait_timer").block_on(async move {
|
||||||
|
// Spawn a `std::Future`.
|
||||||
|
tokio::time::delay_for(time).await;
|
||||||
|
let when = Instant::now() + time;
|
||||||
|
// Spawn a `futures01::Future`.
|
||||||
|
tokio01::timer::Delay::new(when)
|
||||||
|
// convert the delay future into a `std::future` that we can `await`.
|
||||||
|
.compat()
|
||||||
|
.await
|
||||||
|
.expect("tokio 0.1 timer should work!");
|
||||||
|
});
|
||||||
|
assert!(
|
||||||
|
instant.elapsed() >= time * 2,
|
||||||
|
"Block on should poll awaited future to completion"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[cfg(all(feature = "tokio-compat-executor", not(feature = "tokio-executor")))]
|
||||||
|
fn tokio_compat_spawn() {
|
||||||
|
actix_rt::System::new("test_wait_timer").block_on(async move {
|
||||||
|
// Spawning with tokio 0.1 works on the main thread
|
||||||
|
tokio01::spawn(futures01::lazy(|| futures01::future::ok(())));
|
||||||
|
|
||||||
|
// Spawning with tokio 0.2 works of course
|
||||||
|
tokio::spawn(async {}).await.unwrap();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue