From 0394b49841c93fcf8336aee633a803ccf7aa942c Mon Sep 17 00:00:00 2001 From: wireless4024 <wireless4024@proton.me> Date: Wed, 15 Feb 2023 11:38:21 +0700 Subject: [PATCH] cleanup and rustfmt --- actix-rt/examples/multi_thread_system.rs | 2 +- actix-rt/src/arbiter.rs | 54 ------------------------ actix-rt/src/runtime.rs | 2 +- actix-rt/src/system.rs | 52 ----------------------- actix-rt/tests/tests.rs | 5 +-- actix-server/src/builder.rs | 12 +++--- actix-server/src/worker.rs | 7 +-- 7 files changed, 14 insertions(+), 120 deletions(-) diff --git a/actix-rt/examples/multi_thread_system.rs b/actix-rt/examples/multi_thread_system.rs index 4d03cd2d..bd998471 100644 --- a/actix-rt/examples/multi_thread_system.rs +++ b/actix-rt/examples/multi_thread_system.rs @@ -17,7 +17,7 @@ fn main() { .unwrap() } // you can remove this two lines if you don't have tokio_uring feature enabled. - // Note: tokio_uring is single thread and can't be multithreaded, + // Note: tokio_uring is single thread and can't be multithreaded, // unless you spawn multiple of them #[cfg(feature = "io-uring")] tokio_uring::Runtime::new(&tokio_uring::builder()).unwrap() diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index c8673749..47d00cd9 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -95,7 +95,6 @@ impl Arbiter { /// /// # Panics /// Panics if a [System] is not registered on the current thread. - //#[cfg(not(all(target_os = "linux", feature = "io-uring")))] #[allow(clippy::new_without_default)] pub fn new() -> Arbiter { Self::with_tokio_rt(|| { @@ -107,7 +106,6 @@ impl Arbiter { /// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure. /// /// [tokio-runtime]: tokio::runtime::Runtime - //#[cfg(not(all(target_os = "linux", feature = "io-uring")))] pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter where F: Fn() -> crate::runtime::GlobalRuntime + Send + 'static, @@ -158,58 +156,6 @@ impl Arbiter { Arbiter { tx, thread_handle } } - /// Spawn a new Arbiter thread and start its event loop with `tokio-uring` runtime. - /// - /// # Panics - /// Panics if a [System] is not registered on the current thread. - /*#[cfg(all(target_os = "linux", feature = "io-uring"))] - #[allow(clippy::new_without_default)] - pub fn new() -> Arbiter { - let sys = System::current(); - let system_id = sys.id(); - let arb_id = COUNT.fetch_add(1, Ordering::Relaxed); - - let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id); - let (tx, rx) = mpsc::unbounded_channel(); - - let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>(); - - let thread_handle = thread::Builder::new() - .name(name.clone()) - .spawn({ - let tx = tx.clone(); - move || { - let hnd = ArbiterHandle::new(tx); - - System::set_current(sys); - - HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); - - // register arbiter - let _ = System::current() - .tx() - .send(SystemCommand::RegisterArbiter(arb_id, hnd)); - - ready_tx.send(()).unwrap(); - - // run arbiter event processing loop - tokio_uring::start(ArbiterRunner { rx }); - - // deregister arbiter - let _ = System::current() - .tx() - .send(SystemCommand::DeregisterArbiter(arb_id)); - } - }) - .unwrap_or_else(|err| { - panic!("Cannot spawn Arbiter's thread: {:?}. {:?}", &name, err) - }); - - ready_rx.recv().unwrap(); - - Arbiter { tx, thread_handle } - }*/ - /// Sets up an Arbiter runner in a new System using the environment's local set. pub(crate) fn in_new_system() -> ArbiterHandle { let (tx, rx) = mpsc::unbounded_channel(); diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index ae5d32a4..ed153712 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -6,7 +6,7 @@ use tokio::task::{JoinHandle, LocalSet}; /// /// All spawned futures will be executed on the current thread. Therefore, there is no `Send` bound /// on submitted futures. -#[cfg_attr(not(all(target_os = "linux", feature = "io-uring")),derive(Debug))] +#[cfg_attr(not(all(target_os = "linux", feature = "io-uring")), derive(Debug))] pub struct Runtime { local: LocalSet, rt: GlobalRuntime, diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 6968efda..175fe660 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -29,7 +29,6 @@ pub struct System { arbiter_handle: ArbiterHandle, } -//#[cfg(not(feature = "io-uring"))] impl System { /// Create a new system. /// @@ -193,7 +192,6 @@ impl System { } /// Runner that keeps a [System]'s event loop alive until stop message is received. -//#[cfg(not(feature = "io-uring"))] #[must_use = "A SystemRunner does nothing unless `run` is called."] #[derive(Debug)] pub struct SystemRunner { @@ -201,7 +199,6 @@ pub struct SystemRunner { stop_rx: oneshot::Receiver<i32>, } -//#[cfg(not(feature = "io-uring"))] impl SystemRunner { /// Starts event loop and will return once [System] is [stopped](System::stop). pub fn run(self) -> io::Result<()> { @@ -232,55 +229,6 @@ impl SystemRunner { self.rt.block_on(fut) } } -/* -/// Runner that keeps a [System]'s event loop alive until stop message is received. -#[cfg(feature = "io-uring")] -#[must_use = "A SystemRunner does nothing unless `run` is called."] -#[derive(Debug)] -pub struct SystemRunner { - rt: crate::runtime::Runtime, - stop_rx: oneshot::Receiver<i32>, -} - -#[cfg(feature = "io-uring")] -impl SystemRunner { - /// Starts event loop and will return once [System] is [stopped](System::stop). - pub fn run(self) -> io::Result<()> { - unimplemented!("SystemRunner::run is not implemented for io-uring feature yet"); - } - - /// Runs the event loop until [stopped](System::stop_with_code), returning the exit code. - pub fn run_with_code(self) -> io::Result<i32> { - unimplemented!( - "SystemRunner::run_with_code is not implemented for io-uring feature yet" - ); - } - - /// Runs the provided future, blocking the current thread until the future completes. - #[inline] - pub fn block_on<F: Future>(&self, fut: F) -> F::Output { - tokio_uring::start(async move { - let (stop_tx, stop_rx) = oneshot::channel(); - let (sys_tx, sys_rx) = mpsc::unbounded_channel(); - - let sys_arbiter = Arbiter::in_new_system(); - let system = System::construct(sys_tx, sys_arbiter.clone()); - - system - .tx() - .send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter)) - .unwrap(); - - // init background system arbiter - let sys_ctrl = SystemController::new(sys_rx, stop_tx); - tokio_uring::spawn(sys_ctrl); - - let res = fut.await; - drop(stop_rx); - res - }) - } -}*/ #[derive(Debug)] pub(crate) enum SystemCommand { diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs index a1f7e473..28c07820 100644 --- a/actix-rt/tests/tests.rs +++ b/actix-rt/tests/tests.rs @@ -337,9 +337,8 @@ fn new_arbiter_with_tokio() { let _ = System::new(); - let arb = Arbiter::with_tokio_rt(|| { - tokio_uring::Runtime::new(&tokio_uring::builder()).unwrap() - }); + let arb = + Arbiter::with_tokio_rt(|| tokio_uring::Runtime::new(&tokio_uring::builder()).unwrap()); let counter = Arc::new(AtomicBool::new(true)); diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 5d2b7529..974977ab 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -84,13 +84,13 @@ impl ServerBuilder { self.worker_config.max_blocking_threads(num); self } - + #[cfg(all(target_os = "linux", feature = "io-uring"))] - /// Set max number of submission queue and completion queue AKA. ring size + /// Set max number of submission queue and completion queue AKA. ring size /// for each worker's ring. - /// + /// /// Max ring size is defined here: [io_uring.c](https://github.com/torvalds/linux/blob/f339c2597ebb00e738f2b6328c14804ed19f5d57/io_uring/io_uring.c#L99) - /// + /// /// # Examples: /// ``` /// # use actix_server::ServerBuilder; @@ -98,13 +98,13 @@ impl ServerBuilder { /// .workers(4) // server has 4 worker thread. /// .worker_max_blocking_threads(512); // every worker has 512 sq & cq. /// ``` - /// + /// /// See [tokio_uring::Builder::entries] for behavior reference. pub fn worker_max_blocking_threads(mut self, num: usize) -> Self { self.worker_config.max_blocking_threads(num); self } - + /// Set the maximum number of pending connections. /// /// This refers to the number of clients that can be waiting to be served. Exceeding this number diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index f7291794..cbc6e4f0 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -252,7 +252,7 @@ impl Default for ServerWorkerConfig { // 512 is the default max blocking thread count of tokio runtime. #[cfg(not(all(target_os = "linux", feature = "io-uring")))] let max_blocking_threads = std::cmp::max(512 / num_cpus::get_physical(), 1); - // 256 is default sq & cq size used by tokio_uring + // 256 is default sq & cq size used by tokio_uring #[cfg(all(target_os = "linux", feature = "io-uring"))] let max_blocking_threads = 256; Self { @@ -389,9 +389,10 @@ impl ServerWorker { #[cfg(all(target_os = "linux", feature = "io-uring"))] { - // passing `max_blocking_threads` as submission queue & completion queue + // passing `max_blocking_threads` as submission queue & completion queue // should be useful than let it sit here - let queue_size = config.max_blocking_threads.clamp(1, u32::MAX as usize) as u32; + let queue_size = + config.max_blocking_threads.clamp(1, u32::MAX as usize) as u32; let mut builder = tokio_uring::builder(); builder.entries(queue_size); builder.start(worker_fut);