diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml
index d8a873ba..f8f3984d 100644
--- a/actix-rt/Cargo.toml
+++ b/actix-rt/Cargo.toml
@@ -30,3 +30,4 @@ tokio = { version = "1", features = ["rt", "net", "parking_lot", "signal", "sync
[dev-dependencies]
tokio = { version = "1", features = ["full"] }
+hyper = { version = "0.14", default-features = false, features = ["server", "tcp", "http1"] }
diff --git a/actix-rt/examples/hyper.rs b/actix-rt/examples/hyper.rs
new file mode 100644
index 00000000..8bad1b33
--- /dev/null
+++ b/actix-rt/examples/hyper.rs
@@ -0,0 +1,28 @@
+use hyper::service::{make_service_fn, service_fn};
+use hyper::{Body, Request, Response, Server};
+use std::convert::Infallible;
+use std::net::SocketAddr;
+
+async fn handle(_req: Request
) -> Result, Infallible> {
+ Ok(Response::new(Body::from("Hello World")))
+}
+
+fn main() {
+ actix_rt::System::with_tokio_rt(|| {
+ tokio::runtime::Builder::new_multi_thread()
+ .enable_all()
+ .build()
+ .unwrap()
+ })
+ .block_on(async {
+ let make_service =
+ make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) });
+
+ let server =
+ Server::bind(&SocketAddr::from(([127, 0, 0, 1], 3000))).serve(make_service);
+
+ if let Err(e) = server.await {
+ eprintln!("server error: {}", e);
+ }
+ })
+}
diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs
index a3cb5272..8279cb90 100644
--- a/actix-rt/src/arbiter.rs
+++ b/actix-rt/src/arbiter.rs
@@ -12,7 +12,7 @@ use futures_core::ready;
use tokio::{sync::mpsc, task::LocalSet};
use crate::{
- runtime::Runtime,
+ runtime::{default_tokio_runtime, Runtime},
system::{System, SystemCommand},
};
@@ -94,12 +94,25 @@ pub struct Arbiter {
}
impl Arbiter {
- /// Spawn new Arbiter thread and start its event loop.
+ /// Spawn a new Arbiter thread and start its event loop.
///
/// # Panics
/// Panics if a [System] is not registered on the current thread.
#[allow(clippy::new_without_default)]
pub fn new() -> Arbiter {
+ Self::with_tokio_rt(|| {
+ default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.")
+ })
+ }
+
+ /// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
+ ///
+ /// [tokio-runtime]: tokio::runtime::Runtime
+ #[doc(hidden)]
+ pub fn with_tokio_rt(runtime_factory: F) -> Arbiter
+ where
+ F: Fn() -> tokio::runtime::Runtime + Send + 'static,
+ {
let id = COUNT.fetch_add(1, Ordering::Relaxed);
let system_id = System::current().id();
let name = format!("actix-rt|system:{}|arbiter:{}", system_id, id);
@@ -113,7 +126,7 @@ impl Arbiter {
.spawn({
let tx = tx.clone();
move || {
- let rt = Runtime::new().expect("Cannot create new Arbiter's Runtime.");
+ let rt = Runtime::from(runtime_factory());
let hnd = ArbiterHandle::new(tx);
System::set_current(sys);
diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs
index a20dfe7e..1adbf6c0 100644
--- a/actix-rt/src/runtime.rs
+++ b/actix-rt/src/runtime.rs
@@ -2,7 +2,7 @@ use std::{future::Future, io};
use tokio::task::{JoinHandle, LocalSet};
-/// A single-threaded runtime based on Tokio's "current thread" runtime.
+/// A Tokio-based runtime proxy.
///
/// All spawned futures will be executed on the current thread. Therefore, there is no `Send` bound
/// on submitted futures.
@@ -12,14 +12,18 @@ pub struct Runtime {
rt: tokio::runtime::Runtime,
}
+pub(crate) fn default_tokio_runtime() -> io::Result {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_io()
+ .enable_time()
+ .build()
+}
+
impl Runtime {
/// Returns a new runtime initialized with default configuration values.
#[allow(clippy::new_ret_no_self)]
- pub fn new() -> io::Result {
- let rt = tokio::runtime::Builder::new_current_thread()
- .enable_io()
- .enable_time()
- .build()?;
+ pub fn new() -> io::Result {
+ let rt = default_tokio_runtime()?;
Ok(Runtime {
rt,
@@ -81,3 +85,12 @@ impl Runtime {
self.local.block_on(&self.rt, f)
}
}
+
+impl From for Runtime {
+ fn from(rt: tokio::runtime::Runtime) -> Self {
+ Self {
+ local: LocalSet::new(),
+ rt,
+ }
+ }
+}
diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs
index 58fe3cab..b7f134cb 100644
--- a/actix-rt/src/system.rs
+++ b/actix-rt/src/system.rs
@@ -11,7 +11,7 @@ use std::{
use futures_core::ready;
use tokio::sync::{mpsc, oneshot};
-use crate::{arbiter::ArbiterHandle, Arbiter, Runtime};
+use crate::{arbiter::ArbiterHandle, runtime::default_tokio_runtime, Arbiter, Runtime};
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
@@ -36,10 +36,24 @@ impl System {
/// Panics if underlying Tokio runtime can not be created.
#[allow(clippy::new_ret_no_self)]
pub fn new() -> SystemRunner {
+ Self::with_tokio_rt(|| {
+ default_tokio_runtime()
+ .expect("Default Actix (Tokio) runtime could not be created.")
+ })
+ }
+
+ /// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure.
+ ///
+ /// [tokio-runtime]: tokio::runtime::Runtime
+ #[doc(hidden)]
+ pub fn with_tokio_rt(runtime_factory: F) -> SystemRunner
+ where
+ F: Fn() -> tokio::runtime::Runtime,
+ {
let (stop_tx, stop_rx) = oneshot::channel();
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
- let rt = Runtime::new().expect("Actix (Tokio) runtime could not be created.");
+ let rt = Runtime::from(runtime_factory());
let sys_arbiter = Arbiter::in_new_system(rt.local_set());
let system = System::construct(sys_tx, sys_arbiter.clone());
diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs
index f54e9909..f9634aba 100644
--- a/actix-rt/tests/tests.rs
+++ b/actix-rt/tests/tests.rs
@@ -1,5 +1,9 @@
use std::{
- sync::mpsc::channel,
+ sync::{
+ atomic::{AtomicBool, Ordering},
+ mpsc::channel,
+ Arc,
+ },
thread,
time::{Duration, Instant},
};
@@ -200,3 +204,62 @@ fn system_stop_stops_arbiters() {
arb.join().unwrap();
}
+
+#[test]
+fn new_system_with_tokio() {
+ let (tx, rx) = channel();
+
+ let res = System::with_tokio_rt(move || {
+ tokio::runtime::Builder::new_multi_thread()
+ .enable_io()
+ .enable_time()
+ .thread_keep_alive(Duration::from_millis(1000))
+ .worker_threads(2)
+ .max_blocking_threads(2)
+ .on_thread_start(|| {})
+ .on_thread_stop(|| {})
+ .build()
+ .unwrap()
+ })
+ .block_on(async {
+ actix_rt::time::sleep(Duration::from_millis(1)).await;
+
+ tokio::task::spawn(async move {
+ tx.send(42).unwrap();
+ })
+ .await
+ .unwrap();
+
+ 123usize
+ });
+
+ assert_eq!(res, 123);
+ assert_eq!(rx.recv().unwrap(), 42);
+}
+
+#[test]
+fn new_arbiter_with_tokio() {
+ let _ = System::new();
+
+ let arb = Arbiter::with_tokio_rt(|| {
+ tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap()
+ });
+
+ let counter = Arc::new(AtomicBool::new(true));
+
+ let counter1 = counter.clone();
+ let did_spawn = arb.spawn(async move {
+ actix_rt::time::sleep(Duration::from_millis(1)).await;
+ counter1.store(false, Ordering::SeqCst);
+ Arbiter::current().stop();
+ });
+
+ assert!(did_spawn);
+
+ arb.join().unwrap();
+
+ assert_eq!(false, counter.load(Ordering::SeqCst));
+}