update tokio-uring to 0.3

This commit is contained in:
Rob Ede 2022-03-08 20:14:28 +00:00
parent bd9bda0504
commit 4f19584d03
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
16 changed files with 67 additions and 52 deletions

View File

@ -1,8 +1,11 @@
# Changes
## Unreleased - 2022-xx-xx
- Update `tokio-uring` dependency to `0.3.0`. [#???]
- Minimum supported Rust version (MSRV) is now 1.49.
[#???]: https://github.com/actix/actix-net/pull/???
## 2.6.0 - 2022-01-12
- Update `tokio-uring` dependency to `0.2.0`. [#436]

View File

@ -21,7 +21,10 @@ path = "src/lib.rs"
[features]
default = ["macros"]
macros = ["actix-macros"]
io-uring = ["tokio-uring"]
experimental-io-uring = ["tokio-uring"]
# deprecated `experimental-io-uring` alias
io-uring = ["experimental-io-uring"]
[dependencies]
actix-macros = { version = "0.2.3", optional = true }
@ -29,9 +32,9 @@ actix-macros = { version = "0.2.3", optional = true }
futures-core = { version = "0.3", default-features = false }
tokio = { version = "1.13.1", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] }
# runtime for io-uring feature
# runtime for `experimental-io-uring` feature
[target.'cfg(target_os = "linux")'.dependencies]
tokio-uring = { version = "0.2", optional = true }
tokio-uring = { version = "0.3", optional = true }
[dev-dependencies]
tokio = { version = "1.13.1", features = ["full"] }

View File

@ -95,7 +95,7 @@ impl Arbiter {
///
/// # Panics
/// Panics if a [System] is not registered on the current thread.
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
#[cfg(not(all(target_os = "linux", feature = "experimental-io-uring")))]
#[allow(clippy::new_without_default)]
pub fn new() -> Arbiter {
Self::with_tokio_rt(|| {
@ -107,7 +107,7 @@ impl Arbiter {
/// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
///
/// [tokio-runtime]: tokio::runtime::Runtime
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
#[cfg(not(all(target_os = "linux", feature = "experimental-io-uring")))]
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
where
F: Fn() -> tokio::runtime::Runtime + Send + 'static,
@ -162,7 +162,7 @@ impl Arbiter {
///
/// # Panics
/// Panics if a [System] is not registered on the current thread.
#[cfg(all(target_os = "linux", feature = "io-uring"))]
#[cfg(all(target_os = "linux", feature = "experimental-io-uring"))]
#[allow(clippy::new_without_default)]
pub fn new() -> Arbiter {
let sys = System::current();

View File

@ -35,7 +35,7 @@
//!
//! # `io-uring` Support
//! There is experimental support for using io-uring with this crate by enabling the
//! `io-uring` feature. For now, it is semver exempt.
//! `experimental-io-uring` feature. For now, it is semver exempt.
//!
//! Note that there are currently some unimplemented parts of using `actix-rt` with `io-uring`.
//! In particular, when running a `System`, only `System::block_on` is supported.
@ -46,7 +46,7 @@
#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
#[cfg(all(not(target_os = "linux"), feature = "io-uring"))]
#[cfg(all(not(target_os = "linux"), feature = "experimental-io-uring"))]
compile_error!("io_uring is a linux only feature.");
use std::future::Future;

View File

@ -29,7 +29,7 @@ pub struct System {
arbiter_handle: ArbiterHandle,
}
#[cfg(not(feature = "io-uring"))]
#[cfg(not(feature = "experimental-io-uring"))]
impl System {
/// Create a new system.
///
@ -70,7 +70,7 @@ impl System {
}
}
#[cfg(feature = "io-uring")]
#[cfg(feature = "experimental-io-uring")]
impl System {
/// Create a new system.
///
@ -171,7 +171,7 @@ impl System {
}
/// Runner that keeps a [System]'s event loop alive until stop message is received.
#[cfg(not(feature = "io-uring"))]
#[cfg(not(feature = "experimental-io-uring"))]
#[must_use = "A SystemRunner does nothing unless `run` is called."]
#[derive(Debug)]
pub struct SystemRunner {
@ -179,7 +179,7 @@ pub struct SystemRunner {
stop_rx: oneshot::Receiver<i32>,
}
#[cfg(not(feature = "io-uring"))]
#[cfg(not(feature = "experimental-io-uring"))]
impl SystemRunner {
/// Starts event loop and will return once [System] is [stopped](System::stop).
pub fn run(self) -> io::Result<()> {
@ -211,12 +211,12 @@ impl SystemRunner {
}
/// Runner that keeps a [System]'s event loop alive until stop message is received.
#[cfg(feature = "io-uring")]
#[cfg(feature = "experimental-io-uring")]
#[must_use = "A SystemRunner does nothing unless `run` is called."]
#[derive(Debug)]
pub struct SystemRunner;
#[cfg(feature = "io-uring")]
#[cfg(feature = "experimental-io-uring")]
impl SystemRunner {
/// Starts event loop and will return once [System] is [stopped](System::stop).
pub fn run(self) -> io::Result<()> {

View File

@ -5,7 +5,7 @@ use std::{
use actix_rt::{task::JoinError, Arbiter, System};
#[cfg(not(feature = "io-uring"))]
#[cfg(not(feature = "experimental-io-uring"))]
use {
std::{sync::mpsc::channel, thread},
tokio::sync::oneshot,
@ -24,7 +24,7 @@ fn await_for_timer() {
);
}
#[cfg(not(feature = "io-uring"))]
#[cfg(not(feature = "experimental-io-uring"))]
#[test]
fn run_with_code() {
let sys = System::new();
@ -118,7 +118,7 @@ fn wait_for_spawns() {
// Temporary disabled tests for io-uring feature.
// They should be enabled when possible.
#[cfg(not(feature = "io-uring"))]
#[cfg(not(feature = "experimental-io-uring"))]
#[test]
fn arbiter_spawn_fn_runs() {
let _ = System::new();
@ -135,7 +135,7 @@ fn arbiter_spawn_fn_runs() {
arbiter.join().unwrap();
}
#[cfg(not(feature = "io-uring"))]
#[cfg(not(feature = "experimental-io-uring"))]
#[test]
fn arbiter_handle_spawn_fn_runs() {
let sys = System::new();
@ -158,7 +158,7 @@ fn arbiter_handle_spawn_fn_runs() {
sys.run().unwrap();
}
#[cfg(not(feature = "io-uring"))]
#[cfg(not(feature = "experimental-io-uring"))]
#[test]
fn arbiter_drop_no_panic_fn() {
let _ = System::new();
@ -170,7 +170,7 @@ fn arbiter_drop_no_panic_fn() {
arbiter.join().unwrap();
}
#[cfg(not(feature = "io-uring"))]
#[cfg(not(feature = "experimental-io-uring"))]
#[test]
fn arbiter_drop_no_panic_fut() {
let _ = System::new();
@ -182,7 +182,7 @@ fn arbiter_drop_no_panic_fut() {
arbiter.join().unwrap();
}
#[cfg(not(feature = "io-uring"))]
#[cfg(not(feature = "experimental-io-uring"))]
#[test]
fn system_arbiter_spawn() {
let runner = System::new();
@ -213,7 +213,7 @@ fn system_arbiter_spawn() {
thread.join().unwrap();
}
#[cfg(not(feature = "io-uring"))]
#[cfg(not(feature = "experimental-io-uring"))]
#[test]
fn system_stop_stops_arbiters() {
let sys = System::new();
@ -236,7 +236,7 @@ fn system_stop_stops_arbiters() {
arb.join().unwrap();
}
#[cfg(not(feature = "io-uring"))]
#[cfg(not(feature = "experimental-io-uring"))]
#[test]
fn new_system_with_tokio() {
let (tx, rx) = channel();
@ -269,7 +269,7 @@ fn new_system_with_tokio() {
assert_eq!(rx.recv().unwrap(), 42);
}
#[cfg(not(feature = "io-uring"))]
#[cfg(not(feature = "experimental-io-uring"))]
#[test]
fn new_arbiter_with_tokio() {
use std::sync::{
@ -348,7 +348,7 @@ fn spawn_local() {
})
}
#[cfg(all(target_os = "linux", feature = "io-uring"))]
#[cfg(all(target_os = "linux", feature = "experimental-io-uring"))]
#[test]
fn tokio_uring_arbiter() {
System::new().block_on(async {

View File

@ -1,9 +1,11 @@
# Changes
## Unreleased - 2022-xx-xx
- Update `tokio-uring` dependency to `0.3.0`. [#???]
- Wait for accept thread to stop before sending completion signal. [#443]
[#443]: https://github.com/actix/actix-net/pull/443
[#???]: https://github.com/actix/actix-net/pull/???
## 2.0.0 - 2022-01-19

View File

@ -21,7 +21,10 @@ path = "src/lib.rs"
[features]
default = []
io-uring = ["tokio-uring", "actix-rt/io-uring"]
experimental-io-uring = ["tokio-uring", "actix-rt/io-uring"]
# deprecated `experimental-io-uring` alias
io-uring = ["experimental-io-uring"]
[dependencies]
actix-rt = { version = "2.6.0", default-features = false }
@ -30,7 +33,7 @@ actix-utils = "3.0.0"
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] }
log = "0.4"
tracing = { version = "0.1.30", features = ["log"] }
mio = { version = "0.8", features = ["os-poll", "net"] }
num_cpus = "1.13"
socket2 = "0.4.2"
@ -38,13 +41,13 @@ tokio = { version = "1.13.1", features = ["sync"] }
# runtime for io-uring feature
[target.'cfg(target_os = "linux")'.dependencies]
tokio-uring = { version = "0.2", optional = true }
tokio-uring = { version = "0.3", optional = true }
[dev-dependencies]
actix-codec = "0.5.0"
actix-rt = "2.6.0"
bytes = "1"
env_logger = "0.9"
tracing-subscriber = "0.3.8"
futures-util = { version = "0.3.7", default-features = false, features = ["sink", "async-await-macro"] }
tokio = { version = "1.13.1", features = ["io-util", "rt-multi-thread", "macros", "fs"] }

View File

@ -18,10 +18,10 @@ use futures_util::{SinkExt as _, StreamExt as _};
use tokio::{fs::File, io::AsyncReadExt as _};
async fn run() -> io::Result<()> {
env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));
tracing_subscriber::fmt::init();
let addr = ("127.0.0.1", 8080);
log::info!("starting server on port: {}", &addr.0);
tracing::info!("starting server on port: {}", &addr.0);
// Bind socket address and start worker(s). By default, the server uses the number of physical
// CPU cores as the worker count. For this reason, the closure passed to bind needs to return
@ -39,8 +39,10 @@ async fn run() -> io::Result<()> {
// wait for next line
match framed.next().await {
Some(Ok(line)) => {
match File::open(line).await {
match File::open(&line).await {
Ok(mut file) => {
tracing::info!("reading file: {line}");
// read file into String buffer
let mut buf = String::new();
file.read_to_string(&mut buf).await?;
@ -52,7 +54,7 @@ async fn run() -> io::Result<()> {
break;
}
Err(err) => {
log::error!("{}", err);
tracing::error!("{}", err);
framed
.send("File not found or not readable. Try again.")
.await?;
@ -72,7 +74,7 @@ async fn run() -> io::Result<()> {
// close connection after file has been copied to TCP stream
Ok(())
})
.map_err(|err| log::error!("Service Error: {:?}", err))
.map_err(|err| tracing::error!("Service Error: {:?}", err))
})?
.workers(2)
.run()

View File

@ -22,16 +22,15 @@ use actix_server::Server;
use actix_service::{fn_service, ServiceFactoryExt as _};
use bytes::BytesMut;
use futures_util::future::ok;
use log::{error, info};
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
async fn run() -> io::Result<()> {
env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));
tracing_subscriber::fmt::init();
let count = Arc::new(AtomicUsize::new(0));
let addr = ("127.0.0.1", 8080);
info!("starting server on port: {}", &addr.0);
tracing::info!("starting server on port: {}", &addr.0);
// Bind socket address and start worker(s). By default, the server uses the number of physical
// CPU cores as the worker count. For this reason, the closure passed to bind needs to return
@ -58,14 +57,14 @@ async fn run() -> io::Result<()> {
// more bytes to process
Ok(bytes_read) => {
info!("[{}] read {} bytes", num, bytes_read);
tracing::info!("[{}] read {} bytes", num, bytes_read);
stream.write_all(&buf[size..]).await.unwrap();
size += bytes_read;
}
// stream error; bail from loop with error
Err(err) => {
error!("Stream Error: {:?}", err);
tracing::error!("Stream Error: {:?}", err);
return Err(());
}
}
@ -75,10 +74,10 @@ async fn run() -> io::Result<()> {
Ok((buf.freeze(), size))
}
})
.map_err(|err| error!("Service Error: {:?}", err))
.map_err(|err| tracing::error!("Service Error: {:?}", err))
.and_then(move |(_, size)| {
let num = num2.load(Ordering::SeqCst);
info!("[{}] total bytes read: {}", num, size);
tracing::info!("[{}] total bytes read: {}", num, size);
ok(size)
})
})?

View File

@ -1,8 +1,8 @@
use std::{io, thread, time::Duration};
use actix_rt::time::Instant;
use log::{debug, error, info};
use mio::{Interest, Poll, Token as MioToken};
use tracing::{debug, error, info};
use crate::{
availability::Availability,

View File

@ -1,8 +1,8 @@
use std::{io, time::Duration};
use actix_rt::net::TcpStream;
use log::{info, trace};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tracing::{info, trace};
use crate::{
server::ServerCommand,

View File

@ -10,8 +10,8 @@ use std::{
use actix_rt::{time::sleep, System};
use futures_core::{future::BoxFuture, Stream};
use futures_util::stream::StreamExt as _;
use log::{error, info};
use tokio::sync::{mpsc::UnboundedReceiver, oneshot};
use tracing::{error, info};
use crate::{
accept::Accept,

View File

@ -7,7 +7,7 @@ use std::{
use actix_service::{Service, ServiceFactory as BaseServiceFactory};
use actix_utils::future::{ready, Ready};
use futures_core::future::LocalBoxFuture;
use log::error;
use tracing::error;
use crate::{
socket::{FromStream, MioStream},

View File

@ -5,7 +5,7 @@ use std::{
task::{Context, Poll},
};
use log::trace;
use tracing::trace;
/// Types of process signals.
// #[allow(dead_code)]
@ -69,7 +69,7 @@ impl Signals {
unix::signal(*kind)
.map(|tokio_sig| (*sig, tokio_sig))
.map_err(|e| {
log::error!(
tracing::error!(
"Can not initialize stream handler for {:?} err: {}",
sig,
e

View File

@ -17,11 +17,11 @@ use actix_rt::{
Arbiter, ArbiterHandle, System,
};
use futures_core::{future::LocalBoxFuture, ready};
use log::{error, info, trace};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot,
};
use tracing::{error, info, trace};
use crate::{
service::{BoxedServerService, InternalServiceFactory},
@ -383,7 +383,7 @@ impl ServerWorker {
worker_stopped_rx.await.unwrap();
};
#[cfg(all(target_os = "linux", feature = "io-uring"))]
#[cfg(all(target_os = "linux", feature = "experimental-io-uring"))]
{
// TODO: pass max blocking thread config when tokio-uring enable configuration
// on building runtime.
@ -391,7 +391,10 @@ impl ServerWorker {
tokio_uring::start(worker_fut);
}
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
#[cfg(not(all(
target_os = "linux",
feature = "experimental-io-uring"
)))]
{
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
@ -407,7 +410,7 @@ impl ServerWorker {
// with actix system
(Some(_sys), _) => {
#[cfg(all(target_os = "linux", feature = "io-uring"))]
#[cfg(all(target_os = "linux", feature = "experimental-io-uring"))]
let arbiter = {
// TODO: pass max blocking thread config when tokio-uring enable configuration
// on building runtime.
@ -415,7 +418,7 @@ impl ServerWorker {
Arbiter::new()
};
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
#[cfg(not(all(target_os = "linux", feature = "experimental-io-uring")))]
let arbiter = {
Arbiter::with_tokio_rt(move || {
tokio::runtime::Builder::new_current_thread()