From 172a87c8680f3f1c94ff20c8b6eb5e7cac423227 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sun, 7 Nov 2021 22:16:31 +0800 Subject: [PATCH] fix io-uring feature for actix-server --- actix-server/Cargo.toml | 5 ++++- actix-server/src/worker.rs | 40 ++++++++++++++++++++++---------------- 2 files changed, 27 insertions(+), 18 deletions(-) diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index d58aaa39..7e23c861 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -18,7 +18,7 @@ path = "src/lib.rs" [features] default = [] -io-uring = ["actix-rt/io-uring"] +io-uring = ["tokio-uring"] [dependencies] actix-rt = { version = "2.4.0", default-features = false } @@ -31,6 +31,9 @@ mio = { version = "0.7.6", features = ["os-poll", "net"] } num_cpus = "1.13" tokio = { version = "1.5.1", features = ["sync"] } +# runtime for io-uring feature +tokio-uring = { version = "0.1", optional = true } + [dev-dependencies] actix-codec = "0.4.0" actix-rt = "2.0.0" diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 2d104b8d..02f68294 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -283,15 +283,6 @@ impl ServerWorker { let counter = Counter::new(config.max_concurrent_connections); let counter_clone = counter.clone(); - // every worker runs in it's own arbiter. - // use a custom tokio runtime builder to change the settings of runtime. - #[cfg(all(target_os = "linux", feature = "io-uring"))] - let arbiter = { - // TODO: pass max blocking thread config when tokio-uring enable configuration - // on building runtime. - let _ = config.max_blocking_threads; - Arbiter::new() - }; // get actix system context if it is set let sys = System::try_current(); @@ -299,6 +290,8 @@ impl ServerWorker { // service factories initialization channel let (factory_tx, factory_rx) = std::sync::mpsc::sync_channel(1); + // every worker runs in it's own thread and tokio runtime. + // use a custom tokio runtime builder to change the settings of runtime. std::thread::Builder::new() .name(format!("actix-server worker {}", idx)) .spawn(move || { @@ -307,13 +300,7 @@ impl ServerWorker { System::set_current(sys); } - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .max_blocking_threads(config.max_blocking_threads) - .build() - .unwrap(); - - rt.block_on(tokio::task::LocalSet::new().run_until(async move { + let worker_fut = async move { let fut = factories .iter() .enumerate() @@ -368,7 +355,26 @@ impl ServerWorker { }) .await .expect("task 2 panic"); - })) + }; + + #[cfg(all(target_os = "linux", feature = "io-uring"))] + { + // TODO: pass max blocking thread config when tokio-uring enable configuration + // on building runtime. + let _ = config.max_blocking_threads; + tokio_uring::start(worker_fut) + } + + #[cfg(not(all(target_os = "linux", feature = "io-uring")))] + { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .max_blocking_threads(config.max_blocking_threads) + .build() + .unwrap(); + + rt.block_on(tokio::task::LocalSet::new().run_until(worker_fut)) + } }) .expect("worker thread error/panic");