mirror of https://github.com/fafhrd91/actix-net
add support for io-uring
This commit is contained in:
parent
e4ec956001
commit
f596a779e4
|
@ -122,7 +122,7 @@ impl<T: ResourcePath> Path<T> {
|
||||||
/// Get matched parameter by name without type conversion
|
/// Get matched parameter by name without type conversion
|
||||||
pub fn get(&self, key: &str) -> Option<&str> {
|
pub fn get(&self, key: &str) -> Option<&str> {
|
||||||
profile_method!(get);
|
profile_method!(get);
|
||||||
|
|
||||||
for item in self.segments.iter() {
|
for item in self.segments.iter() {
|
||||||
if key == item.0 {
|
if key == item.0 {
|
||||||
return match item.1 {
|
return match item.1 {
|
||||||
|
@ -150,7 +150,7 @@ impl<T: ResourcePath> Path<T> {
|
||||||
/// If keyed parameter is not available empty string is used as default value.
|
/// If keyed parameter is not available empty string is used as default value.
|
||||||
pub fn query(&self, key: &str) -> &str {
|
pub fn query(&self, key: &str) -> &str {
|
||||||
profile_method!(query);
|
profile_method!(query);
|
||||||
|
|
||||||
if let Some(s) = self.get(key) {
|
if let Some(s) = self.get(key) {
|
||||||
s
|
s
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -21,12 +21,14 @@ path = "src/lib.rs"
|
||||||
[features]
|
[features]
|
||||||
default = ["macros"]
|
default = ["macros"]
|
||||||
macros = ["actix-macros"]
|
macros = ["actix-macros"]
|
||||||
|
io-uring = ["tokio-uring"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix-macros = { version = "0.2.0", optional = true }
|
actix-macros = { version = "0.2.0", optional = true }
|
||||||
|
|
||||||
futures-core = { version = "0.3", default-features = false }
|
futures-core = { version = "0.3", default-features = false }
|
||||||
tokio = { version = "1.3", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] }
|
tokio = { version = "1.3", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] }
|
||||||
|
tokio-uring = { git = "https://github.com/tokio-rs/tokio-uring.git", optional = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tokio = { version = "1.2", features = ["full"] }
|
tokio = { version = "1.2", features = ["full"] }
|
||||||
|
|
|
@ -9,12 +9,9 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use futures_core::ready;
|
use futures_core::ready;
|
||||||
use tokio::{sync::mpsc, task::LocalSet};
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
use crate::{
|
use crate::system::{System, SystemCommand};
|
||||||
runtime::{default_tokio_runtime, Runtime},
|
|
||||||
system::{System, SystemCommand},
|
|
||||||
};
|
|
||||||
|
|
||||||
pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);
|
pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);
|
||||||
|
|
||||||
|
@ -94,6 +91,7 @@ pub struct Arbiter {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Arbiter {
|
impl Arbiter {
|
||||||
|
#[cfg(not(feature = "io-uring"))]
|
||||||
/// Spawn a new Arbiter thread and start its event loop.
|
/// Spawn a new Arbiter thread and start its event loop.
|
||||||
///
|
///
|
||||||
/// # Panics
|
/// # Panics
|
||||||
|
@ -101,10 +99,12 @@ impl Arbiter {
|
||||||
#[allow(clippy::new_without_default)]
|
#[allow(clippy::new_without_default)]
|
||||||
pub fn new() -> Arbiter {
|
pub fn new() -> Arbiter {
|
||||||
Self::with_tokio_rt(|| {
|
Self::with_tokio_rt(|| {
|
||||||
default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.")
|
crate::runtime::default_tokio_runtime()
|
||||||
|
.expect("Cannot create new Arbiter's Runtime.")
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(not(feature = "io-uring"))]
|
||||||
/// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
|
/// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
|
||||||
///
|
///
|
||||||
/// [tokio-runtime]: tokio::runtime::Runtime
|
/// [tokio-runtime]: tokio::runtime::Runtime
|
||||||
|
@ -127,7 +127,7 @@ impl Arbiter {
|
||||||
.spawn({
|
.spawn({
|
||||||
let tx = tx.clone();
|
let tx = tx.clone();
|
||||||
move || {
|
move || {
|
||||||
let rt = Runtime::from(runtime_factory());
|
let rt = crate::runtime::Runtime::from(runtime_factory());
|
||||||
let hnd = ArbiterHandle::new(tx);
|
let hnd = ArbiterHandle::new(tx);
|
||||||
|
|
||||||
System::set_current(sys);
|
System::set_current(sys);
|
||||||
|
@ -159,15 +159,67 @@ impl Arbiter {
|
||||||
Arbiter { tx, thread_handle }
|
Arbiter { tx, thread_handle }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "io-uring")]
|
||||||
|
/// Spawn a new Arbiter thread and start its event loop with `tokio-uring` runtime.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
/// Panics if a [System] is not registered on the current thread.
|
||||||
|
#[allow(clippy::new_without_default)]
|
||||||
|
pub fn new() -> Arbiter {
|
||||||
|
let sys = System::current();
|
||||||
|
let system_id = sys.id();
|
||||||
|
let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
|
let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
|
||||||
|
let (tx, rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
|
let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
|
||||||
|
|
||||||
|
let thread_handle = thread::Builder::new()
|
||||||
|
.name(name.clone())
|
||||||
|
.spawn({
|
||||||
|
let tx = tx.clone();
|
||||||
|
move || {
|
||||||
|
let hnd = ArbiterHandle::new(tx);
|
||||||
|
|
||||||
|
System::set_current(sys);
|
||||||
|
|
||||||
|
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
|
||||||
|
|
||||||
|
// register arbiter
|
||||||
|
let _ = System::current()
|
||||||
|
.tx()
|
||||||
|
.send(SystemCommand::RegisterArbiter(arb_id, hnd));
|
||||||
|
|
||||||
|
ready_tx.send(()).unwrap();
|
||||||
|
|
||||||
|
// run arbiter event processing loop
|
||||||
|
tokio_uring::start(ArbiterRunner { rx });
|
||||||
|
|
||||||
|
// deregister arbiter
|
||||||
|
let _ = System::current()
|
||||||
|
.tx()
|
||||||
|
.send(SystemCommand::DeregisterArbiter(arb_id));
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.unwrap_or_else(|err| {
|
||||||
|
panic!("Cannot spawn Arbiter's thread: {:?}. {:?}", &name, err)
|
||||||
|
});
|
||||||
|
|
||||||
|
ready_rx.recv().unwrap();
|
||||||
|
|
||||||
|
Arbiter { tx, thread_handle }
|
||||||
|
}
|
||||||
|
|
||||||
/// Sets up an Arbiter runner in a new System using the provided runtime local task set.
|
/// Sets up an Arbiter runner in a new System using the provided runtime local task set.
|
||||||
pub(crate) fn in_new_system(local: &LocalSet) -> ArbiterHandle {
|
pub(crate) fn in_new_system() -> ArbiterHandle {
|
||||||
let (tx, rx) = mpsc::unbounded_channel();
|
let (tx, rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
let hnd = ArbiterHandle::new(tx);
|
let hnd = ArbiterHandle::new(tx);
|
||||||
|
|
||||||
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
|
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
|
||||||
|
|
||||||
local.spawn_local(ArbiterRunner { rx });
|
crate::spawn(ArbiterRunner { rx });
|
||||||
|
|
||||||
hnd
|
hnd
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,6 +39,9 @@
|
||||||
#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
|
#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
|
||||||
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
|
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
|
||||||
|
|
||||||
|
#[cfg(all(not(target_os = "linux"), feature = "io-uring"))]
|
||||||
|
compile_error!("io_uring is a linux only feature.");
|
||||||
|
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
|
|
||||||
use tokio::task::JoinHandle;
|
use tokio::task::JoinHandle;
|
||||||
|
|
|
@ -31,11 +31,6 @@ impl Runtime {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Reference to local task set.
|
|
||||||
pub(crate) fn local_set(&self) -> &LocalSet {
|
|
||||||
&self.local
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Offload a future onto the single-threaded runtime.
|
/// Offload a future onto the single-threaded runtime.
|
||||||
///
|
///
|
||||||
/// The returned join handle can be used to await the future's result.
|
/// The returned join handle can be used to await the future's result.
|
||||||
|
|
|
@ -54,7 +54,8 @@ impl System {
|
||||||
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
|
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
let rt = Runtime::from(runtime_factory());
|
let rt = Runtime::from(runtime_factory());
|
||||||
let sys_arbiter = Arbiter::in_new_system(rt.local_set());
|
|
||||||
|
let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() });
|
||||||
let system = System::construct(sys_tx, sys_arbiter.clone());
|
let system = System::construct(sys_tx, sys_arbiter.clone());
|
||||||
|
|
||||||
system
|
system
|
||||||
|
|
|
@ -1,9 +1,5 @@
|
||||||
use std::{
|
use std::{
|
||||||
sync::{
|
sync::mpsc::channel,
|
||||||
atomic::{AtomicBool, Ordering},
|
|
||||||
mpsc::channel,
|
|
||||||
Arc,
|
|
||||||
},
|
|
||||||
thread,
|
thread,
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
|
@ -230,6 +226,7 @@ fn system_stop_stops_arbiters() {
|
||||||
arb.join().unwrap();
|
arb.join().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(not(feature = "io-uring"))]
|
||||||
#[test]
|
#[test]
|
||||||
fn new_system_with_tokio() {
|
fn new_system_with_tokio() {
|
||||||
let (tx, rx) = channel();
|
let (tx, rx) = channel();
|
||||||
|
@ -262,8 +259,14 @@ fn new_system_with_tokio() {
|
||||||
assert_eq!(rx.recv().unwrap(), 42);
|
assert_eq!(rx.recv().unwrap(), 42);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(not(feature = "io-uring"))]
|
||||||
#[test]
|
#[test]
|
||||||
fn new_arbiter_with_tokio() {
|
fn new_arbiter_with_tokio() {
|
||||||
|
use std::sync::{
|
||||||
|
atomic::{AtomicBool, Ordering},
|
||||||
|
Arc,
|
||||||
|
};
|
||||||
|
|
||||||
let _ = System::new();
|
let _ = System::new();
|
||||||
|
|
||||||
let arb = Arbiter::with_tokio_rt(|| {
|
let arb = Arbiter::with_tokio_rt(|| {
|
||||||
|
@ -298,3 +301,27 @@ fn try_current_no_system() {
|
||||||
fn try_current_with_system() {
|
fn try_current_with_system() {
|
||||||
System::new().block_on(async { assert!(System::try_current().is_some()) });
|
System::new().block_on(async { assert!(System::try_current().is_some()) });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "io-uring")]
|
||||||
|
#[test]
|
||||||
|
fn tokio_uring_arbiter() {
|
||||||
|
let system = System::new();
|
||||||
|
let (tx, rx) = std::sync::mpsc::channel();
|
||||||
|
Arbiter::new().spawn(async move {
|
||||||
|
let handle = actix_rt::spawn(async move {
|
||||||
|
let f = tokio_uring::fs::File::create("test.txt").await.unwrap();
|
||||||
|
let buf = b"Hello World!";
|
||||||
|
let (res, _) = f.write_at(&buf[..], 0).await;
|
||||||
|
assert!(res.is_ok());
|
||||||
|
f.sync_all().await.unwrap();
|
||||||
|
f.close().await.unwrap();
|
||||||
|
std::fs::remove_file("test.txt").unwrap();
|
||||||
|
});
|
||||||
|
handle.await.unwrap();
|
||||||
|
tx.send(true).unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
assert!(rx.recv().unwrap());
|
||||||
|
|
||||||
|
drop(system);
|
||||||
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ path = "src/lib.rs"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = []
|
default = []
|
||||||
|
io-uring = ["actix-rt/io-uring"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix-rt = { version = "2.0.0", default-features = false }
|
actix-rt = { version = "2.0.0", default-features = false }
|
||||||
|
|
|
@ -280,14 +280,24 @@ impl ServerWorker {
|
||||||
let counter_clone = counter.clone();
|
let counter_clone = counter.clone();
|
||||||
// every worker runs in it's own arbiter.
|
// every worker runs in it's own arbiter.
|
||||||
// use a custom tokio runtime builder to change the settings of runtime.
|
// use a custom tokio runtime builder to change the settings of runtime.
|
||||||
Arbiter::with_tokio_rt(move || {
|
#[cfg(feature = "io-uring")]
|
||||||
|
let arbiter = {
|
||||||
|
// TODO: pass max blocking thread config when tokio-uring enable configuration
|
||||||
|
// on building runtime.
|
||||||
|
let _ = config.max_blocking_threads;
|
||||||
|
Arbiter::new()
|
||||||
|
};
|
||||||
|
|
||||||
|
#[cfg(not(feature = "io-uring"))]
|
||||||
|
let arbiter = Arbiter::with_tokio_rt(move || {
|
||||||
tokio::runtime::Builder::new_current_thread()
|
tokio::runtime::Builder::new_current_thread()
|
||||||
.enable_all()
|
.enable_all()
|
||||||
.max_blocking_threads(config.max_blocking_threads)
|
.max_blocking_threads(config.max_blocking_threads)
|
||||||
.build()
|
.build()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
})
|
});
|
||||||
.spawn(async move {
|
|
||||||
|
arbiter.spawn(async move {
|
||||||
let fut = factories
|
let fut = factories
|
||||||
.iter()
|
.iter()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
|
|
Loading…
Reference in New Issue