implement missing rt feature in tokio_uring runtime

This commit is contained in:
wireless4024 2023-02-12 10:08:58 +07:00
parent fe019304e1
commit e8bb559e64
No known key found for this signature in database
GPG Key ID: D102C2A868192A39
5 changed files with 86 additions and 26 deletions

View File

@ -10,13 +10,20 @@ async fn handle(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
} }
fn main() { fn main() {
actix_rt::System::with_tokio_rt(|| { #[cfg(not(feature = "io-uring"))]
let rt = actix_rt::System::with_tokio_rt(|| {
tokio::runtime::Builder::new_multi_thread() tokio::runtime::Builder::new_multi_thread()
.enable_all() .enable_all()
.build() .build()
.unwrap() .unwrap()
}) });
.block_on(async { #[cfg(feature = "io-uring")]
let rt = actix_rt::System::with_tokio_rt(|| {
tokio_uring::Runtime::new(&tokio_uring::builder())
.unwrap()
});
rt.block_on(async {
let make_service = let make_service =
make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) }); make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) });

View File

@ -4,15 +4,22 @@
use actix_rt::System; use actix_rt::System;
fn main() { fn main() {
System::with_tokio_rt(|| { #[cfg(not(feature = "io-uring"))]
let system = System::with_tokio_rt(|| {
// build system with a multi-thread tokio runtime. // build system with a multi-thread tokio runtime.
tokio::runtime::Builder::new_multi_thread() tokio::runtime::Builder::new_multi_thread()
.worker_threads(2) .worker_threads(2)
.enable_all() .enable_all()
.build() .build()
.unwrap() .unwrap()
}) });
.block_on(async_main()); #[cfg(feature = "io-uring")]
let system = System::with_tokio_rt(|| {
// build system with tokio uring runtime.
tokio_uring::Runtime::new(&tokio_uring::builder())
.unwrap()
});
system.block_on(async_main());
} }
// async main function that acts like #[actix_web::main] or #[tokio::main] // async main function that acts like #[actix_web::main] or #[tokio::main]

View File

@ -95,7 +95,7 @@ impl Arbiter {
/// ///
/// # Panics /// # Panics
/// Panics if a [System] is not registered on the current thread. /// Panics if a [System] is not registered on the current thread.
#[cfg(not(all(target_os = "linux", feature = "io-uring")))] //#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
#[allow(clippy::new_without_default)] #[allow(clippy::new_without_default)]
pub fn new() -> Arbiter { pub fn new() -> Arbiter {
Self::with_tokio_rt(|| { Self::with_tokio_rt(|| {
@ -107,10 +107,10 @@ impl Arbiter {
/// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure. /// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
/// ///
/// [tokio-runtime]: tokio::runtime::Runtime /// [tokio-runtime]: tokio::runtime::Runtime
#[cfg(not(all(target_os = "linux", feature = "io-uring")))] //#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
where where
F: Fn() -> tokio::runtime::Runtime + Send + 'static, F: Fn() -> crate::runtime::GlobalRuntime + Send + 'static,
{ {
let sys = System::current(); let sys = System::current();
let system_id = sys.id(); let system_id = sys.id();
@ -162,7 +162,7 @@ impl Arbiter {
/// ///
/// # Panics /// # Panics
/// Panics if a [System] is not registered on the current thread. /// Panics if a [System] is not registered on the current thread.
#[cfg(all(target_os = "linux", feature = "io-uring"))] /*#[cfg(all(target_os = "linux", feature = "io-uring"))]
#[allow(clippy::new_without_default)] #[allow(clippy::new_without_default)]
pub fn new() -> Arbiter { pub fn new() -> Arbiter {
let sys = System::current(); let sys = System::current();
@ -208,7 +208,7 @@ impl Arbiter {
ready_rx.recv().unwrap(); ready_rx.recv().unwrap();
Arbiter { tx, thread_handle } Arbiter { tx, thread_handle }
} }*/
/// Sets up an Arbiter runner in a new System using the environment's local set. /// Sets up an Arbiter runner in a new System using the environment's local set.
pub(crate) fn in_new_system() -> ArbiterHandle { pub(crate) fn in_new_system() -> ArbiterHandle {

View File

@ -6,19 +6,40 @@ use tokio::task::{JoinHandle, LocalSet};
/// ///
/// All spawned futures will be executed on the current thread. Therefore, there is no `Send` bound /// All spawned futures will be executed on the current thread. Therefore, there is no `Send` bound
/// on submitted futures. /// on submitted futures.
#[derive(Debug)] #[cfg_attr(not(all(target_os = "linux", feature = "io-uring")),derive(Debug))]
pub struct Runtime { pub struct Runtime {
local: LocalSet, local: LocalSet,
rt: tokio::runtime::Runtime, rt: GlobalRuntime,
} }
pub(crate) fn default_tokio_runtime() -> io::Result<tokio::runtime::Runtime> { #[cfg(all(target_os = "linux", feature = "io-uring"))]
impl std::fmt::Debug for Runtime {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Runtime")
.field("local", &self.local)
.finish_non_exhaustive()
}
}
#[cfg(all(target_os = "linux", feature = "io-uring"))]
pub(crate) type GlobalRuntime = tokio_uring::Runtime;
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
pub(crate) type GlobalRuntime = tokio::runtime::Runtime;
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
pub(crate) fn default_tokio_runtime() -> io::Result<GlobalRuntime> {
tokio::runtime::Builder::new_current_thread() tokio::runtime::Builder::new_current_thread()
.enable_io() .enable_io()
.enable_time() .enable_time()
.build() .build()
} }
#[cfg(all(target_os = "linux", feature = "io-uring"))]
pub(crate) fn default_tokio_runtime() -> io::Result<GlobalRuntime> {
tokio_uring::Runtime::new(&tokio_uring::builder())
}
impl Runtime { impl Runtime {
/// Returns a new runtime initialized with default configuration values. /// Returns a new runtime initialized with default configuration values.
#[allow(clippy::new_ret_no_self)] #[allow(clippy::new_ret_no_self)]
@ -79,12 +100,12 @@ impl Runtime {
where where
F: Future, F: Future,
{ {
self.local.block_on(&self.rt, f) self.rt.block_on(self.local.run_until(f))
} }
} }
impl From<tokio::runtime::Runtime> for Runtime { impl From<GlobalRuntime> for Runtime {
fn from(rt: tokio::runtime::Runtime) -> Self { fn from(rt: GlobalRuntime) -> Self {
Self { Self {
local: LocalSet::new(), local: LocalSet::new(),
rt, rt,

View File

@ -78,18 +78,40 @@ impl System {
/// Panics if underlying Tokio runtime can not be created. /// Panics if underlying Tokio runtime can not be created.
#[allow(clippy::new_ret_no_self)] #[allow(clippy::new_ret_no_self)]
pub fn new() -> SystemRunner { pub fn new() -> SystemRunner {
SystemRunner Self::with_tokio_rt(|| {
crate::runtime::default_tokio_runtime()
.expect("Default Actix (Tokio) runtime could not be created.")
})
} }
/// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure. /// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure.
/// ///
/// [tokio-runtime]: tokio::runtime::Runtime /// [tokio-runtime]: tokio::runtime::Runtime
#[doc(hidden)] #[doc(hidden)]
pub fn with_tokio_rt<F>(_: F) -> SystemRunner pub fn with_tokio_rt<F>(runtime_factory: F) -> SystemRunner
where where
F: Fn() -> tokio::runtime::Runtime, F: Fn() -> tokio_uring::Runtime,
{ {
unimplemented!("System::with_tokio_rt is not implemented for io-uring feature yet") let (stop_tx, stop_rx) = oneshot::channel();
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
let rt = runtime_factory();
let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() });
let system = System::construct(sys_tx, sys_arbiter.clone());
system
.tx()
.send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter))
.unwrap();
// init background system arbiter
let sys_ctrl = SystemController::new(sys_rx, stop_tx);
rt.block_on(async move {
tokio::spawn(sys_ctrl);
});
let rt = crate::runtime::Runtime::from(rt);
SystemRunner { rt, stop_rx }
} }
} }
@ -171,7 +193,7 @@ impl System {
} }
/// Runner that keeps a [System]'s event loop alive until stop message is received. /// Runner that keeps a [System]'s event loop alive until stop message is received.
#[cfg(not(feature = "io-uring"))] //#[cfg(not(feature = "io-uring"))]
#[must_use = "A SystemRunner does nothing unless `run` is called."] #[must_use = "A SystemRunner does nothing unless `run` is called."]
#[derive(Debug)] #[derive(Debug)]
pub struct SystemRunner { pub struct SystemRunner {
@ -179,7 +201,7 @@ pub struct SystemRunner {
stop_rx: oneshot::Receiver<i32>, stop_rx: oneshot::Receiver<i32>,
} }
#[cfg(not(feature = "io-uring"))] //#[cfg(not(feature = "io-uring"))]
impl SystemRunner { impl SystemRunner {
/// Starts event loop and will return once [System] is [stopped](System::stop). /// Starts event loop and will return once [System] is [stopped](System::stop).
pub fn run(self) -> io::Result<()> { pub fn run(self) -> io::Result<()> {
@ -210,12 +232,15 @@ impl SystemRunner {
self.rt.block_on(fut) self.rt.block_on(fut)
} }
} }
/*
/// Runner that keeps a [System]'s event loop alive until stop message is received. /// Runner that keeps a [System]'s event loop alive until stop message is received.
#[cfg(feature = "io-uring")] #[cfg(feature = "io-uring")]
#[must_use = "A SystemRunner does nothing unless `run` is called."] #[must_use = "A SystemRunner does nothing unless `run` is called."]
#[derive(Debug)] #[derive(Debug)]
pub struct SystemRunner; pub struct SystemRunner {
rt: crate::runtime::Runtime,
stop_rx: oneshot::Receiver<i32>,
}
#[cfg(feature = "io-uring")] #[cfg(feature = "io-uring")]
impl SystemRunner { impl SystemRunner {
@ -255,7 +280,7 @@ impl SystemRunner {
res res
}) })
} }
} }*/
#[derive(Debug)] #[derive(Debug)]
pub(crate) enum SystemCommand { pub(crate) enum SystemCommand {