From 4f19584d03e65653fa4ff0f824c6e547cfc8f27e Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Tue, 8 Mar 2022 20:14:28 +0000 Subject: [PATCH] update tokio-uring to 0.3 --- actix-rt/CHANGES.md | 3 +++ actix-rt/Cargo.toml | 9 ++++++--- actix-rt/src/arbiter.rs | 6 +++--- actix-rt/src/lib.rs | 4 ++-- actix-rt/src/system.rs | 12 ++++++------ actix-rt/tests/tests.rs | 22 +++++++++++----------- actix-server/CHANGES.md | 2 ++ actix-server/Cargo.toml | 11 +++++++---- actix-server/examples/file-reader.rs | 12 +++++++----- actix-server/examples/tcp-echo.rs | 13 ++++++------- actix-server/src/accept.rs | 2 +- actix-server/src/builder.rs | 2 +- actix-server/src/server.rs | 2 +- actix-server/src/service.rs | 2 +- actix-server/src/signals.rs | 4 ++-- actix-server/src/worker.rs | 13 ++++++++----- 16 files changed, 67 insertions(+), 52 deletions(-) diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 80733537..cf8b4fd1 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -1,8 +1,11 @@ # Changes ## Unreleased - 2022-xx-xx +- Update `tokio-uring` dependency to `0.3.0`. [#???] - Minimum supported Rust version (MSRV) is now 1.49. +[#???]: https://github.com/actix/actix-net/pull/??? + ## 2.6.0 - 2022-01-12 - Update `tokio-uring` dependency to `0.2.0`. [#436] diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index 406c1de5..c3f1424b 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -21,7 +21,10 @@ path = "src/lib.rs" [features] default = ["macros"] macros = ["actix-macros"] -io-uring = ["tokio-uring"] +experimental-io-uring = ["tokio-uring"] + +# deprecated `experimental-io-uring` alias +io-uring = ["experimental-io-uring"] [dependencies] actix-macros = { version = "0.2.3", optional = true } @@ -29,9 +32,9 @@ actix-macros = { version = "0.2.3", optional = true } futures-core = { version = "0.3", default-features = false } tokio = { version = "1.13.1", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] } -# runtime for io-uring feature +# runtime for `experimental-io-uring` feature [target.'cfg(target_os = "linux")'.dependencies] -tokio-uring = { version = "0.2", optional = true } +tokio-uring = { version = "0.3", optional = true } [dev-dependencies] tokio = { version = "1.13.1", features = ["full"] } diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 95256360..5677e561 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -95,7 +95,7 @@ impl Arbiter { /// /// # Panics /// Panics if a [System] is not registered on the current thread. - #[cfg(not(all(target_os = "linux", feature = "io-uring")))] + #[cfg(not(all(target_os = "linux", feature = "experimental-io-uring")))] #[allow(clippy::new_without_default)] pub fn new() -> Arbiter { Self::with_tokio_rt(|| { @@ -107,7 +107,7 @@ impl Arbiter { /// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure. /// /// [tokio-runtime]: tokio::runtime::Runtime - #[cfg(not(all(target_os = "linux", feature = "io-uring")))] + #[cfg(not(all(target_os = "linux", feature = "experimental-io-uring")))] pub fn with_tokio_rt(runtime_factory: F) -> Arbiter where F: Fn() -> tokio::runtime::Runtime + Send + 'static, @@ -162,7 +162,7 @@ impl Arbiter { /// /// # Panics /// Panics if a [System] is not registered on the current thread. - #[cfg(all(target_os = "linux", feature = "io-uring"))] + #[cfg(all(target_os = "linux", feature = "experimental-io-uring"))] #[allow(clippy::new_without_default)] pub fn new() -> Arbiter { let sys = System::current(); diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index 7fb2b632..40df7213 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -35,7 +35,7 @@ //! //! # `io-uring` Support //! There is experimental support for using io-uring with this crate by enabling the -//! `io-uring` feature. For now, it is semver exempt. +//! `experimental-io-uring` feature. For now, it is semver exempt. //! //! Note that there are currently some unimplemented parts of using `actix-rt` with `io-uring`. //! In particular, when running a `System`, only `System::block_on` is supported. @@ -46,7 +46,7 @@ #![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] -#[cfg(all(not(target_os = "linux"), feature = "io-uring"))] +#[cfg(all(not(target_os = "linux"), feature = "experimental-io-uring"))] compile_error!("io_uring is a linux only feature."); use std::future::Future; diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 0ea3547d..eec853be 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -29,7 +29,7 @@ pub struct System { arbiter_handle: ArbiterHandle, } -#[cfg(not(feature = "io-uring"))] +#[cfg(not(feature = "experimental-io-uring"))] impl System { /// Create a new system. /// @@ -70,7 +70,7 @@ impl System { } } -#[cfg(feature = "io-uring")] +#[cfg(feature = "experimental-io-uring")] impl System { /// Create a new system. /// @@ -171,7 +171,7 @@ impl System { } /// Runner that keeps a [System]'s event loop alive until stop message is received. -#[cfg(not(feature = "io-uring"))] +#[cfg(not(feature = "experimental-io-uring"))] #[must_use = "A SystemRunner does nothing unless `run` is called."] #[derive(Debug)] pub struct SystemRunner { @@ -179,7 +179,7 @@ pub struct SystemRunner { stop_rx: oneshot::Receiver, } -#[cfg(not(feature = "io-uring"))] +#[cfg(not(feature = "experimental-io-uring"))] impl SystemRunner { /// Starts event loop and will return once [System] is [stopped](System::stop). pub fn run(self) -> io::Result<()> { @@ -211,12 +211,12 @@ impl SystemRunner { } /// Runner that keeps a [System]'s event loop alive until stop message is received. -#[cfg(feature = "io-uring")] +#[cfg(feature = "experimental-io-uring")] #[must_use = "A SystemRunner does nothing unless `run` is called."] #[derive(Debug)] pub struct SystemRunner; -#[cfg(feature = "io-uring")] +#[cfg(feature = "experimental-io-uring")] impl SystemRunner { /// Starts event loop and will return once [System] is [stopped](System::stop). pub fn run(self) -> io::Result<()> { diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs index 551a395d..dd810025 100644 --- a/actix-rt/tests/tests.rs +++ b/actix-rt/tests/tests.rs @@ -5,7 +5,7 @@ use std::{ use actix_rt::{task::JoinError, Arbiter, System}; -#[cfg(not(feature = "io-uring"))] +#[cfg(not(feature = "experimental-io-uring"))] use { std::{sync::mpsc::channel, thread}, tokio::sync::oneshot, @@ -24,7 +24,7 @@ fn await_for_timer() { ); } -#[cfg(not(feature = "io-uring"))] +#[cfg(not(feature = "experimental-io-uring"))] #[test] fn run_with_code() { let sys = System::new(); @@ -118,7 +118,7 @@ fn wait_for_spawns() { // Temporary disabled tests for io-uring feature. // They should be enabled when possible. -#[cfg(not(feature = "io-uring"))] +#[cfg(not(feature = "experimental-io-uring"))] #[test] fn arbiter_spawn_fn_runs() { let _ = System::new(); @@ -135,7 +135,7 @@ fn arbiter_spawn_fn_runs() { arbiter.join().unwrap(); } -#[cfg(not(feature = "io-uring"))] +#[cfg(not(feature = "experimental-io-uring"))] #[test] fn arbiter_handle_spawn_fn_runs() { let sys = System::new(); @@ -158,7 +158,7 @@ fn arbiter_handle_spawn_fn_runs() { sys.run().unwrap(); } -#[cfg(not(feature = "io-uring"))] +#[cfg(not(feature = "experimental-io-uring"))] #[test] fn arbiter_drop_no_panic_fn() { let _ = System::new(); @@ -170,7 +170,7 @@ fn arbiter_drop_no_panic_fn() { arbiter.join().unwrap(); } -#[cfg(not(feature = "io-uring"))] +#[cfg(not(feature = "experimental-io-uring"))] #[test] fn arbiter_drop_no_panic_fut() { let _ = System::new(); @@ -182,7 +182,7 @@ fn arbiter_drop_no_panic_fut() { arbiter.join().unwrap(); } -#[cfg(not(feature = "io-uring"))] +#[cfg(not(feature = "experimental-io-uring"))] #[test] fn system_arbiter_spawn() { let runner = System::new(); @@ -213,7 +213,7 @@ fn system_arbiter_spawn() { thread.join().unwrap(); } -#[cfg(not(feature = "io-uring"))] +#[cfg(not(feature = "experimental-io-uring"))] #[test] fn system_stop_stops_arbiters() { let sys = System::new(); @@ -236,7 +236,7 @@ fn system_stop_stops_arbiters() { arb.join().unwrap(); } -#[cfg(not(feature = "io-uring"))] +#[cfg(not(feature = "experimental-io-uring"))] #[test] fn new_system_with_tokio() { let (tx, rx) = channel(); @@ -269,7 +269,7 @@ fn new_system_with_tokio() { assert_eq!(rx.recv().unwrap(), 42); } -#[cfg(not(feature = "io-uring"))] +#[cfg(not(feature = "experimental-io-uring"))] #[test] fn new_arbiter_with_tokio() { use std::sync::{ @@ -348,7 +348,7 @@ fn spawn_local() { }) } -#[cfg(all(target_os = "linux", feature = "io-uring"))] +#[cfg(all(target_os = "linux", feature = "experimental-io-uring"))] #[test] fn tokio_uring_arbiter() { System::new().block_on(async { diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 90b291c2..ec09ea79 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -1,9 +1,11 @@ # Changes ## Unreleased - 2022-xx-xx +- Update `tokio-uring` dependency to `0.3.0`. [#???] - Wait for accept thread to stop before sending completion signal. [#443] [#443]: https://github.com/actix/actix-net/pull/443 +[#???]: https://github.com/actix/actix-net/pull/??? ## 2.0.0 - 2022-01-19 diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 4a788a9b..1d07df80 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -21,7 +21,10 @@ path = "src/lib.rs" [features] default = [] -io-uring = ["tokio-uring", "actix-rt/io-uring"] +experimental-io-uring = ["tokio-uring", "actix-rt/io-uring"] + +# deprecated `experimental-io-uring` alias +io-uring = ["experimental-io-uring"] [dependencies] actix-rt = { version = "2.6.0", default-features = false } @@ -30,7 +33,7 @@ actix-utils = "3.0.0" futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] } -log = "0.4" +tracing = { version = "0.1.30", features = ["log"] } mio = { version = "0.8", features = ["os-poll", "net"] } num_cpus = "1.13" socket2 = "0.4.2" @@ -38,13 +41,13 @@ tokio = { version = "1.13.1", features = ["sync"] } # runtime for io-uring feature [target.'cfg(target_os = "linux")'.dependencies] -tokio-uring = { version = "0.2", optional = true } +tokio-uring = { version = "0.3", optional = true } [dev-dependencies] actix-codec = "0.5.0" actix-rt = "2.6.0" bytes = "1" -env_logger = "0.9" +tracing-subscriber = "0.3.8" futures-util = { version = "0.3.7", default-features = false, features = ["sink", "async-await-macro"] } tokio = { version = "1.13.1", features = ["io-util", "rt-multi-thread", "macros", "fs"] } diff --git a/actix-server/examples/file-reader.rs b/actix-server/examples/file-reader.rs index 3cc991d3..dff0d0ed 100644 --- a/actix-server/examples/file-reader.rs +++ b/actix-server/examples/file-reader.rs @@ -18,10 +18,10 @@ use futures_util::{SinkExt as _, StreamExt as _}; use tokio::{fs::File, io::AsyncReadExt as _}; async fn run() -> io::Result<()> { - env_logger::init_from_env(env_logger::Env::default().default_filter_or("info")); + tracing_subscriber::fmt::init(); let addr = ("127.0.0.1", 8080); - log::info!("starting server on port: {}", &addr.0); + tracing::info!("starting server on port: {}", &addr.0); // Bind socket address and start worker(s). By default, the server uses the number of physical // CPU cores as the worker count. For this reason, the closure passed to bind needs to return @@ -39,8 +39,10 @@ async fn run() -> io::Result<()> { // wait for next line match framed.next().await { Some(Ok(line)) => { - match File::open(line).await { + match File::open(&line).await { Ok(mut file) => { + tracing::info!("reading file: {line}"); + // read file into String buffer let mut buf = String::new(); file.read_to_string(&mut buf).await?; @@ -52,7 +54,7 @@ async fn run() -> io::Result<()> { break; } Err(err) => { - log::error!("{}", err); + tracing::error!("{}", err); framed .send("File not found or not readable. Try again.") .await?; @@ -72,7 +74,7 @@ async fn run() -> io::Result<()> { // close connection after file has been copied to TCP stream Ok(()) }) - .map_err(|err| log::error!("Service Error: {:?}", err)) + .map_err(|err| tracing::error!("Service Error: {:?}", err)) })? .workers(2) .run() diff --git a/actix-server/examples/tcp-echo.rs b/actix-server/examples/tcp-echo.rs index da0b7053..37d7ad00 100644 --- a/actix-server/examples/tcp-echo.rs +++ b/actix-server/examples/tcp-echo.rs @@ -22,16 +22,15 @@ use actix_server::Server; use actix_service::{fn_service, ServiceFactoryExt as _}; use bytes::BytesMut; use futures_util::future::ok; -use log::{error, info}; use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; async fn run() -> io::Result<()> { - env_logger::init_from_env(env_logger::Env::default().default_filter_or("info")); + tracing_subscriber::fmt::init(); let count = Arc::new(AtomicUsize::new(0)); let addr = ("127.0.0.1", 8080); - info!("starting server on port: {}", &addr.0); + tracing::info!("starting server on port: {}", &addr.0); // Bind socket address and start worker(s). By default, the server uses the number of physical // CPU cores as the worker count. For this reason, the closure passed to bind needs to return @@ -58,14 +57,14 @@ async fn run() -> io::Result<()> { // more bytes to process Ok(bytes_read) => { - info!("[{}] read {} bytes", num, bytes_read); + tracing::info!("[{}] read {} bytes", num, bytes_read); stream.write_all(&buf[size..]).await.unwrap(); size += bytes_read; } // stream error; bail from loop with error Err(err) => { - error!("Stream Error: {:?}", err); + tracing::error!("Stream Error: {:?}", err); return Err(()); } } @@ -75,10 +74,10 @@ async fn run() -> io::Result<()> { Ok((buf.freeze(), size)) } }) - .map_err(|err| error!("Service Error: {:?}", err)) + .map_err(|err| tracing::error!("Service Error: {:?}", err)) .and_then(move |(_, size)| { let num = num2.load(Ordering::SeqCst); - info!("[{}] total bytes read: {}", num, size); + tracing::info!("[{}] total bytes read: {}", num, size); ok(size) }) })? diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index a1c4f732..a5c8225d 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -1,8 +1,8 @@ use std::{io, thread, time::Duration}; use actix_rt::time::Instant; -use log::{debug, error, info}; use mio::{Interest, Poll, Token as MioToken}; +use tracing::{debug, error, info}; use crate::{ availability::Availability, diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index c3bc0269..8660b4a9 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -1,8 +1,8 @@ use std::{io, time::Duration}; use actix_rt::net::TcpStream; -use log::{info, trace}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tracing::{info, trace}; use crate::{ server::ServerCommand, diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index 8defa543..3e0d7733 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -10,8 +10,8 @@ use std::{ use actix_rt::{time::sleep, System}; use futures_core::{future::BoxFuture, Stream}; use futures_util::stream::StreamExt as _; -use log::{error, info}; use tokio::sync::{mpsc::UnboundedReceiver, oneshot}; +use tracing::{error, info}; use crate::{ accept::Accept, diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index 3a9aeee4..03ee72d5 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -7,7 +7,7 @@ use std::{ use actix_service::{Service, ServiceFactory as BaseServiceFactory}; use actix_utils::future::{ready, Ready}; use futures_core::future::LocalBoxFuture; -use log::error; +use tracing::error; use crate::{ socket::{FromStream, MioStream}, diff --git a/actix-server/src/signals.rs b/actix-server/src/signals.rs index d8cb84e3..91bd52cc 100644 --- a/actix-server/src/signals.rs +++ b/actix-server/src/signals.rs @@ -5,7 +5,7 @@ use std::{ task::{Context, Poll}, }; -use log::trace; +use tracing::trace; /// Types of process signals. // #[allow(dead_code)] @@ -69,7 +69,7 @@ impl Signals { unix::signal(*kind) .map(|tokio_sig| (*sig, tokio_sig)) .map_err(|e| { - log::error!( + tracing::error!( "Can not initialize stream handler for {:?} err: {}", sig, e diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 2fef9a7b..4fbd0aa0 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -17,11 +17,11 @@ use actix_rt::{ Arbiter, ArbiterHandle, System, }; use futures_core::{future::LocalBoxFuture, ready}; -use log::{error, info, trace}; use tokio::sync::{ mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, oneshot, }; +use tracing::{error, info, trace}; use crate::{ service::{BoxedServerService, InternalServiceFactory}, @@ -383,7 +383,7 @@ impl ServerWorker { worker_stopped_rx.await.unwrap(); }; - #[cfg(all(target_os = "linux", feature = "io-uring"))] + #[cfg(all(target_os = "linux", feature = "experimental-io-uring"))] { // TODO: pass max blocking thread config when tokio-uring enable configuration // on building runtime. @@ -391,7 +391,10 @@ impl ServerWorker { tokio_uring::start(worker_fut); } - #[cfg(not(all(target_os = "linux", feature = "io-uring")))] + #[cfg(not(all( + target_os = "linux", + feature = "experimental-io-uring" + )))] { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() @@ -407,7 +410,7 @@ impl ServerWorker { // with actix system (Some(_sys), _) => { - #[cfg(all(target_os = "linux", feature = "io-uring"))] + #[cfg(all(target_os = "linux", feature = "experimental-io-uring"))] let arbiter = { // TODO: pass max blocking thread config when tokio-uring enable configuration // on building runtime. @@ -415,7 +418,7 @@ impl ServerWorker { Arbiter::new() }; - #[cfg(not(all(target_os = "linux", feature = "io-uring")))] + #[cfg(not(all(target_os = "linux", feature = "experimental-io-uring")))] let arbiter = { Arbiter::with_tokio_rt(move || { tokio::runtime::Builder::new_current_thread()