From 21ba890c0a4ddae2a3a1009dc397f5e94d5175dc Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Wed, 3 Feb 2021 00:57:56 +0000 Subject: [PATCH] add hidden from tokio methods --- actix-rt/Cargo.toml | 1 + actix-rt/examples/hyper.rs | 28 ++++++++++++++++ actix-rt/src/arbiter.rs | 19 +++++++++-- actix-rt/src/runtime.rs | 25 +++++++++++---- actix-rt/src/system.rs | 18 +++++++++-- actix-rt/tests/tests.rs | 65 +++++++++++++++++++++++++++++++++++++- 6 files changed, 144 insertions(+), 12 deletions(-) create mode 100644 actix-rt/examples/hyper.rs diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index d8a873ba..f8f3984d 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -30,3 +30,4 @@ tokio = { version = "1", features = ["rt", "net", "parking_lot", "signal", "sync [dev-dependencies] tokio = { version = "1", features = ["full"] } +hyper = { version = "0.14", default-features = false, features = ["server", "tcp", "http1"] } diff --git a/actix-rt/examples/hyper.rs b/actix-rt/examples/hyper.rs new file mode 100644 index 00000000..8bad1b33 --- /dev/null +++ b/actix-rt/examples/hyper.rs @@ -0,0 +1,28 @@ +use hyper::service::{make_service_fn, service_fn}; +use hyper::{Body, Request, Response, Server}; +use std::convert::Infallible; +use std::net::SocketAddr; + +async fn handle(_req: Request) -> Result, Infallible> { + Ok(Response::new(Body::from("Hello World"))) +} + +fn main() { + actix_rt::System::with_tokio_rt(|| { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + }) + .block_on(async { + let make_service = + make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) }); + + let server = + Server::bind(&SocketAddr::from(([127, 0, 0, 1], 3000))).serve(make_service); + + if let Err(e) = server.await { + eprintln!("server error: {}", e); + } + }) +} diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index a3cb5272..8279cb90 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -12,7 +12,7 @@ use futures_core::ready; use tokio::{sync::mpsc, task::LocalSet}; use crate::{ - runtime::Runtime, + runtime::{default_tokio_runtime, Runtime}, system::{System, SystemCommand}, }; @@ -94,12 +94,25 @@ pub struct Arbiter { } impl Arbiter { - /// Spawn new Arbiter thread and start its event loop. + /// Spawn a new Arbiter thread and start its event loop. /// /// # Panics /// Panics if a [System] is not registered on the current thread. #[allow(clippy::new_without_default)] pub fn new() -> Arbiter { + Self::with_tokio_rt(|| { + default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.") + }) + } + + /// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure. + /// + /// [tokio-runtime]: tokio::runtime::Runtime + #[doc(hidden)] + pub fn with_tokio_rt(runtime_factory: F) -> Arbiter + where + F: Fn() -> tokio::runtime::Runtime + Send + 'static, + { let id = COUNT.fetch_add(1, Ordering::Relaxed); let system_id = System::current().id(); let name = format!("actix-rt|system:{}|arbiter:{}", system_id, id); @@ -113,7 +126,7 @@ impl Arbiter { .spawn({ let tx = tx.clone(); move || { - let rt = Runtime::new().expect("Cannot create new Arbiter's Runtime."); + let rt = Runtime::from(runtime_factory()); let hnd = ArbiterHandle::new(tx); System::set_current(sys); diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index a20dfe7e..1adbf6c0 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -2,7 +2,7 @@ use std::{future::Future, io}; use tokio::task::{JoinHandle, LocalSet}; -/// A single-threaded runtime based on Tokio's "current thread" runtime. +/// A Tokio-based runtime proxy. /// /// All spawned futures will be executed on the current thread. Therefore, there is no `Send` bound /// on submitted futures. @@ -12,14 +12,18 @@ pub struct Runtime { rt: tokio::runtime::Runtime, } +pub(crate) fn default_tokio_runtime() -> io::Result { + tokio::runtime::Builder::new_current_thread() + .enable_io() + .enable_time() + .build() +} + impl Runtime { /// Returns a new runtime initialized with default configuration values. #[allow(clippy::new_ret_no_self)] - pub fn new() -> io::Result { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_io() - .enable_time() - .build()?; + pub fn new() -> io::Result { + let rt = default_tokio_runtime()?; Ok(Runtime { rt, @@ -81,3 +85,12 @@ impl Runtime { self.local.block_on(&self.rt, f) } } + +impl From for Runtime { + fn from(rt: tokio::runtime::Runtime) -> Self { + Self { + local: LocalSet::new(), + rt, + } + } +} diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 58fe3cab..b7f134cb 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -11,7 +11,7 @@ use std::{ use futures_core::ready; use tokio::sync::{mpsc, oneshot}; -use crate::{arbiter::ArbiterHandle, Arbiter, Runtime}; +use crate::{arbiter::ArbiterHandle, runtime::default_tokio_runtime, Arbiter, Runtime}; static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); @@ -36,10 +36,24 @@ impl System { /// Panics if underlying Tokio runtime can not be created. #[allow(clippy::new_ret_no_self)] pub fn new() -> SystemRunner { + Self::with_tokio_rt(|| { + default_tokio_runtime() + .expect("Default Actix (Tokio) runtime could not be created.") + }) + } + + /// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure. + /// + /// [tokio-runtime]: tokio::runtime::Runtime + #[doc(hidden)] + pub fn with_tokio_rt(runtime_factory: F) -> SystemRunner + where + F: Fn() -> tokio::runtime::Runtime, + { let (stop_tx, stop_rx) = oneshot::channel(); let (sys_tx, sys_rx) = mpsc::unbounded_channel(); - let rt = Runtime::new().expect("Actix (Tokio) runtime could not be created."); + let rt = Runtime::from(runtime_factory()); let sys_arbiter = Arbiter::in_new_system(rt.local_set()); let system = System::construct(sys_tx, sys_arbiter.clone()); diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs index f54e9909..f9634aba 100644 --- a/actix-rt/tests/tests.rs +++ b/actix-rt/tests/tests.rs @@ -1,5 +1,9 @@ use std::{ - sync::mpsc::channel, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::channel, + Arc, + }, thread, time::{Duration, Instant}, }; @@ -200,3 +204,62 @@ fn system_stop_stops_arbiters() { arb.join().unwrap(); } + +#[test] +fn new_system_with_tokio() { + let (tx, rx) = channel(); + + let res = System::with_tokio_rt(move || { + tokio::runtime::Builder::new_multi_thread() + .enable_io() + .enable_time() + .thread_keep_alive(Duration::from_millis(1000)) + .worker_threads(2) + .max_blocking_threads(2) + .on_thread_start(|| {}) + .on_thread_stop(|| {}) + .build() + .unwrap() + }) + .block_on(async { + actix_rt::time::sleep(Duration::from_millis(1)).await; + + tokio::task::spawn(async move { + tx.send(42).unwrap(); + }) + .await + .unwrap(); + + 123usize + }); + + assert_eq!(res, 123); + assert_eq!(rx.recv().unwrap(), 42); +} + +#[test] +fn new_arbiter_with_tokio() { + let _ = System::new(); + + let arb = Arbiter::with_tokio_rt(|| { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + }); + + let counter = Arc::new(AtomicBool::new(true)); + + let counter1 = counter.clone(); + let did_spawn = arb.spawn(async move { + actix_rt::time::sleep(Duration::from_millis(1)).await; + counter1.store(false, Ordering::SeqCst); + Arbiter::current().stop(); + }); + + assert!(did_spawn); + + arb.join().unwrap(); + + assert_eq!(false, counter.load(Ordering::SeqCst)); +}