From f596a779e4434260d9b0d014b205d1eeaa394adc Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sat, 17 Jul 2021 12:25:02 +0800 Subject: [PATCH] add support for io-uring --- actix-router/src/path.rs | 4 +-- actix-rt/Cargo.toml | 2 ++ actix-rt/src/arbiter.rs | 70 +++++++++++++++++++++++++++++++++----- actix-rt/src/lib.rs | 3 ++ actix-rt/src/runtime.rs | 5 --- actix-rt/src/system.rs | 3 +- actix-rt/tests/tests.rs | 37 +++++++++++++++++--- actix-server/Cargo.toml | 1 + actix-server/src/worker.rs | 16 +++++++-- 9 files changed, 116 insertions(+), 25 deletions(-) diff --git a/actix-router/src/path.rs b/actix-router/src/path.rs index b937665c..38931c35 100644 --- a/actix-router/src/path.rs +++ b/actix-router/src/path.rs @@ -122,7 +122,7 @@ impl Path { /// Get matched parameter by name without type conversion pub fn get(&self, key: &str) -> Option<&str> { profile_method!(get); - + for item in self.segments.iter() { if key == item.0 { return match item.1 { @@ -150,7 +150,7 @@ impl Path { /// If keyed parameter is not available empty string is used as default value. pub fn query(&self, key: &str) -> &str { profile_method!(query); - + if let Some(s) = self.get(key) { s } else { diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index f4a90d2c..70ab0af8 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -21,12 +21,14 @@ path = "src/lib.rs" [features] default = ["macros"] macros = ["actix-macros"] +io-uring = ["tokio-uring"] [dependencies] actix-macros = { version = "0.2.0", optional = true } futures-core = { version = "0.3", default-features = false } tokio = { version = "1.3", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] } +tokio-uring = { git = "https://github.com/tokio-rs/tokio-uring.git", optional = true } [dev-dependencies] tokio = { version = "1.2", features = ["full"] } diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 9ff1419d..8d51a7f2 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -9,12 +9,9 @@ use std::{ }; use futures_core::ready; -use tokio::{sync::mpsc, task::LocalSet}; +use tokio::sync::mpsc; -use crate::{ - runtime::{default_tokio_runtime, Runtime}, - system::{System, SystemCommand}, -}; +use crate::system::{System, SystemCommand}; pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0); @@ -94,6 +91,7 @@ pub struct Arbiter { } impl Arbiter { + #[cfg(not(feature = "io-uring"))] /// Spawn a new Arbiter thread and start its event loop. /// /// # Panics @@ -101,10 +99,12 @@ impl Arbiter { #[allow(clippy::new_without_default)] pub fn new() -> Arbiter { Self::with_tokio_rt(|| { - default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.") + crate::runtime::default_tokio_runtime() + .expect("Cannot create new Arbiter's Runtime.") }) } + #[cfg(not(feature = "io-uring"))] /// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure. /// /// [tokio-runtime]: tokio::runtime::Runtime @@ -127,7 +127,7 @@ impl Arbiter { .spawn({ let tx = tx.clone(); move || { - let rt = Runtime::from(runtime_factory()); + let rt = crate::runtime::Runtime::from(runtime_factory()); let hnd = ArbiterHandle::new(tx); System::set_current(sys); @@ -159,15 +159,67 @@ impl Arbiter { Arbiter { tx, thread_handle } } + #[cfg(feature = "io-uring")] + /// Spawn a new Arbiter thread and start its event loop with `tokio-uring` runtime. + /// + /// # Panics + /// Panics if a [System] is not registered on the current thread. + #[allow(clippy::new_without_default)] + pub fn new() -> Arbiter { + let sys = System::current(); + let system_id = sys.id(); + let arb_id = COUNT.fetch_add(1, Ordering::Relaxed); + + let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id); + let (tx, rx) = mpsc::unbounded_channel(); + + let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>(); + + let thread_handle = thread::Builder::new() + .name(name.clone()) + .spawn({ + let tx = tx.clone(); + move || { + let hnd = ArbiterHandle::new(tx); + + System::set_current(sys); + + HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); + + // register arbiter + let _ = System::current() + .tx() + .send(SystemCommand::RegisterArbiter(arb_id, hnd)); + + ready_tx.send(()).unwrap(); + + // run arbiter event processing loop + tokio_uring::start(ArbiterRunner { rx }); + + // deregister arbiter + let _ = System::current() + .tx() + .send(SystemCommand::DeregisterArbiter(arb_id)); + } + }) + .unwrap_or_else(|err| { + panic!("Cannot spawn Arbiter's thread: {:?}. {:?}", &name, err) + }); + + ready_rx.recv().unwrap(); + + Arbiter { tx, thread_handle } + } + /// Sets up an Arbiter runner in a new System using the provided runtime local task set. - pub(crate) fn in_new_system(local: &LocalSet) -> ArbiterHandle { + pub(crate) fn in_new_system() -> ArbiterHandle { let (tx, rx) = mpsc::unbounded_channel(); let hnd = ArbiterHandle::new(tx); HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); - local.spawn_local(ArbiterRunner { rx }); + crate::spawn(ArbiterRunner { rx }); hnd } diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index 4454b3c4..511a2cbd 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -39,6 +39,9 @@ #![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] +#[cfg(all(not(target_os = "linux"), feature = "io-uring"))] +compile_error!("io_uring is a linux only feature."); + use std::future::Future; use tokio::task::JoinHandle; diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index 1adbf6c0..25937003 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -31,11 +31,6 @@ impl Runtime { }) } - /// Reference to local task set. - pub(crate) fn local_set(&self) -> &LocalSet { - &self.local - } - /// Offload a future onto the single-threaded runtime. /// /// The returned join handle can be used to await the future's result. diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 3bc8a6e3..796c7d74 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -54,7 +54,8 @@ impl System { let (sys_tx, sys_rx) = mpsc::unbounded_channel(); let rt = Runtime::from(runtime_factory()); - let sys_arbiter = Arbiter::in_new_system(rt.local_set()); + + let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() }); let system = System::construct(sys_tx, sys_arbiter.clone()); system diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs index 839b1fbc..e94cbd00 100644 --- a/actix-rt/tests/tests.rs +++ b/actix-rt/tests/tests.rs @@ -1,9 +1,5 @@ use std::{ - sync::{ - atomic::{AtomicBool, Ordering}, - mpsc::channel, - Arc, - }, + sync::mpsc::channel, thread, time::{Duration, Instant}, }; @@ -230,6 +226,7 @@ fn system_stop_stops_arbiters() { arb.join().unwrap(); } +#[cfg(not(feature = "io-uring"))] #[test] fn new_system_with_tokio() { let (tx, rx) = channel(); @@ -262,8 +259,14 @@ fn new_system_with_tokio() { assert_eq!(rx.recv().unwrap(), 42); } +#[cfg(not(feature = "io-uring"))] #[test] fn new_arbiter_with_tokio() { + use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }; + let _ = System::new(); let arb = Arbiter::with_tokio_rt(|| { @@ -298,3 +301,27 @@ fn try_current_no_system() { fn try_current_with_system() { System::new().block_on(async { assert!(System::try_current().is_some()) }); } + +#[cfg(feature = "io-uring")] +#[test] +fn tokio_uring_arbiter() { + let system = System::new(); + let (tx, rx) = std::sync::mpsc::channel(); + Arbiter::new().spawn(async move { + let handle = actix_rt::spawn(async move { + let f = tokio_uring::fs::File::create("test.txt").await.unwrap(); + let buf = b"Hello World!"; + let (res, _) = f.write_at(&buf[..], 0).await; + assert!(res.is_ok()); + f.sync_all().await.unwrap(); + f.close().await.unwrap(); + std::fs::remove_file("test.txt").unwrap(); + }); + handle.await.unwrap(); + tx.send(true).unwrap(); + }); + + assert!(rx.recv().unwrap()); + + drop(system); +} diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 58471cf9..89e1d4e2 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -18,6 +18,7 @@ path = "src/lib.rs" [features] default = [] +io-uring = ["actix-rt/io-uring"] [dependencies] actix-rt = { version = "2.0.0", default-features = false } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index a974522a..3a2da590 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -280,14 +280,24 @@ impl ServerWorker { let counter_clone = counter.clone(); // every worker runs in it's own arbiter. // use a custom tokio runtime builder to change the settings of runtime. - Arbiter::with_tokio_rt(move || { + #[cfg(feature = "io-uring")] + let arbiter = { + // TODO: pass max blocking thread config when tokio-uring enable configuration + // on building runtime. + let _ = config.max_blocking_threads; + Arbiter::new() + }; + + #[cfg(not(feature = "io-uring"))] + let arbiter = Arbiter::with_tokio_rt(move || { tokio::runtime::Builder::new_current_thread() .enable_all() .max_blocking_threads(config.max_blocking_threads) .build() .unwrap() - }) - .spawn(async move { + }); + + arbiter.spawn(async move { let fut = factories .iter() .enumerate()