Compare commits

...

4 Commits

Author SHA1 Message Date
Gerd Zellweger 092f00ef41
Merge 192d0661e9 into 5267bc2c89 2025-08-31 17:04:58 -07:00
Rob Ede 5267bc2c89
chore(rt): include correct tokio features 2025-08-30 02:31:17 +01:00
Rob Ede 3c0702cfcd
chore(actix-rt): prepare release 2.11.0 2025-08-30 02:25:46 +01:00
Gerd Zellweger 192d0661e9 Change with_tokio_rt to accept Arc<Runtime>.
This allows to share tokio runtimes across different sub-systems
inside your application.

Signed-off-by: Gerd Zellweger <mail@gerdzellweger.com>
2024-09-04 15:08:47 -07:00
12 changed files with 67 additions and 44 deletions

31
Cargo.lock generated
View File

@ -34,7 +34,7 @@ dependencies = [
[[package]] [[package]]
name = "actix-rt" name = "actix-rt"
version = "2.10.0" version = "2.11.0"
dependencies = [ dependencies = [
"actix-macros", "actix-macros",
"futures-core", "futures-core",
@ -406,18 +406,18 @@ dependencies = [
[[package]] [[package]]
name = "clap" name = "clap"
version = "4.5.45" version = "4.5.46"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fc0e74a703892159f5ae7d3aac52c8e6c392f5ae5f359c70b5881d60aaac318" checksum = "2c5e4fcf9c21d2e544ca1ee9d8552de13019a42aa7dbf32747fa7aaf1df76e57"
dependencies = [ dependencies = [
"clap_builder", "clap_builder",
] ]
[[package]] [[package]]
name = "clap_builder" name = "clap_builder"
version = "4.5.44" version = "4.5.46"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3e7f4214277f3c7aa526a59dd3fbe306a370daee1f8b7b8c987069cd8e888a8" checksum = "fecb53a0e6fcfb055f686001bc2e2592fa527efaf38dbe81a6a9563562e57d41"
dependencies = [ dependencies = [
"anstyle", "anstyle",
"clap_lex", "clap_lex",
@ -721,7 +721,7 @@ dependencies = [
"cfg-if", "cfg-if",
"libc", "libc",
"r-efi", "r-efi",
"wasi 0.14.2+wasi-0.2.4", "wasi 0.14.3+wasi-0.2.4",
] ]
[[package]] [[package]]
@ -1394,9 +1394,9 @@ dependencies = [
[[package]] [[package]]
name = "potential_utf" name = "potential_utf"
version = "0.1.2" version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5a7c30837279ca13e7c867e9e40053bc68740f988cb07f7ca6df43cc734b585" checksum = "84df19adbe5b5a0782edcab45899906947ab039ccf4573713735ee7de1e6b08a"
dependencies = [ dependencies = [
"zerovec", "zerovec",
] ]
@ -2515,11 +2515,11 @@ checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b"
[[package]] [[package]]
name = "wasi" name = "wasi"
version = "0.14.2+wasi-0.2.4" version = "0.14.3+wasi-0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3" checksum = "6a51ae83037bdd272a9e28ce236db8c07016dd0d50c27038b3f407533c030c95"
dependencies = [ dependencies = [
"wit-bindgen-rt", "wit-bindgen",
] ]
[[package]] [[package]]
@ -2927,13 +2927,10 @@ dependencies = [
] ]
[[package]] [[package]]
name = "wit-bindgen-rt" name = "wit-bindgen"
version = "0.39.0" version = "0.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" checksum = "052283831dbae3d879dc7f51f3d92703a316ca49f91540417d38591826127814"
dependencies = [
"bitflags 2.9.3",
]
[[package]] [[package]]
name = "writeable" name = "writeable"

View File

@ -15,6 +15,8 @@ members = [
] ]
[workspace.package] [workspace.package]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net"
license = "MIT OR Apache-2.0" license = "MIT OR Apache-2.0"
edition = "2021" edition = "2021"
rust-version = "1.75" rust-version = "1.75"

View File

@ -2,6 +2,10 @@
## Unreleased ## Unreleased
## 2.11.0
- Implement `ActixStream` for `tokio::io::BufReader<IO>`.
- Deprecate the `pin` re-export.
- Minimum supported Rust version (MSRV) is now 1.75. - Minimum supported Rust version (MSRV) is now 1.75.
## 2.10.0 ## 2.10.0

View File

@ -1,13 +1,13 @@
[package] [package]
name = "actix-rt" name = "actix-rt"
version = "2.10.0" version = "2.11.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>", "Rob Ede <robjtede@icloud.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>", "Rob Ede <robjtede@icloud.com>"]
description = "Tokio-based single-threaded async runtime for the Actix ecosystem" description = "Tokio-based single-threaded async runtime for the Actix ecosystem"
keywords = ["async", "futures", "io", "runtime"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net"
categories = ["network-programming", "asynchronous"] categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0" keywords = ["async", "futures", "io", "runtime"]
homepage.workspace = true
repository.workspace = true
license.workspace = true
edition.workspace = true edition.workspace = true
rust-version.workspace = true rust-version.workspace = true
@ -23,7 +23,7 @@ io-uring = ["tokio-uring"]
actix-macros = { version = "0.2.3", optional = true } actix-macros = { version = "0.2.3", optional = true }
futures-core = { version = "0.3", default-features = false } futures-core = { version = "0.3", default-features = false }
tokio = { version = "1.44.2", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] } tokio = { version = "1.44.2", features = ["rt", "io-util", "net", "parking_lot", "signal", "sync", "time"] }
# runtime for `io-uring` feature # runtime for `io-uring` feature
[target.'cfg(target_os = "linux")'.dependencies] [target.'cfg(target_os = "linux")'.dependencies]

View File

@ -3,11 +3,11 @@
> Tokio-based single-threaded async runtime for the Actix ecosystem. > Tokio-based single-threaded async runtime for the Actix ecosystem.
[![crates.io](https://img.shields.io/crates/v/actix-rt?label=latest)](https://crates.io/crates/actix-rt) [![crates.io](https://img.shields.io/crates/v/actix-rt?label=latest)](https://crates.io/crates/actix-rt)
[![Documentation](https://docs.rs/actix-rt/badge.svg?version=2.10.0)](https://docs.rs/actix-rt/2.10.0) [![Documentation](https://docs.rs/actix-rt/badge.svg?version=2.11.0)](https://docs.rs/actix-rt/2.11.0)
[![Version](https://img.shields.io/badge/rustc-1.46+-ab6000.svg)](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html) [![Version](https://img.shields.io/badge/rustc-1.46+-ab6000.svg)](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html)
![MIT or Apache 2.0 licensed](https://img.shields.io/crates/l/actix-rt.svg) ![MIT or Apache 2.0 licensed](https://img.shields.io/crates/l/actix-rt.svg)
<br /> <br />
[![dependency status](https://deps.rs/crate/actix-rt/2.10.0/status.svg)](https://deps.rs/crate/actix-rt/2.10.0) [![dependency status](https://deps.rs/crate/actix-rt/2.11.0/status.svg)](https://deps.rs/crate/actix-rt/2.11.0)
![Download](https://img.shields.io/crates/d/actix-rt.svg) ![Download](https://img.shields.io/crates/d/actix-rt.svg)
[![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/WghFtEH6Hb) [![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/WghFtEH6Hb)

View File

@ -10,6 +10,7 @@ fn main() {
.worker_threads(2) .worker_threads(2)
.enable_all() .enable_all()
.build() .build()
.map(std::sync::Arc::new)
.unwrap() .unwrap()
}) })
.block_on(async_main()); .block_on(async_main());

View File

@ -109,7 +109,7 @@ impl Arbiter {
#[cfg(not(all(target_os = "linux", feature = "io-uring")))] #[cfg(not(all(target_os = "linux", feature = "io-uring")))]
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
where where
F: FnOnce() -> tokio::runtime::Runtime + Send + 'static, F: FnOnce() -> std::sync::Arc<tokio::runtime::Runtime> + Send + 'static,
{ {
let sys = System::current(); let sys = System::current();
let system_id = sys.id(); let system_id = sys.id();

View File

@ -61,6 +61,7 @@ mod arbiter;
mod runtime; mod runtime;
mod system; mod system;
#[deprecated(since = "2.11.0", note = "Prefer `std::pin::pin!`.")]
pub use tokio::pin; pub use tokio::pin;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
@ -87,10 +88,11 @@ pub mod net {
use std::{ use std::{
future::Future, future::Future,
io, io,
pin::pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
use tokio::io::{AsyncRead, AsyncWrite, Interest}; use tokio::io::{AsyncRead, AsyncWrite, BufReader, Interest};
#[cfg(unix)] #[cfg(unix)]
pub use tokio::net::{UnixDatagram, UnixListener, UnixStream}; pub use tokio::net::{UnixDatagram, UnixListener, UnixStream};
pub use tokio::{ pub use tokio::{
@ -115,14 +117,12 @@ pub mod net {
impl ActixStream for TcpStream { impl ActixStream for TcpStream {
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> { fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
let ready = self.ready(Interest::READABLE); let ready = self.ready(Interest::READABLE);
tokio::pin!(ready); pin!(ready).poll(cx)
ready.poll(cx)
} }
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> { fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
let ready = self.ready(Interest::WRITABLE); let ready = self.ready(Interest::WRITABLE);
tokio::pin!(ready); pin!(ready).poll(cx)
ready.poll(cx)
} }
} }
@ -130,14 +130,12 @@ pub mod net {
impl ActixStream for UnixStream { impl ActixStream for UnixStream {
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> { fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
let ready = self.ready(Interest::READABLE); let ready = self.ready(Interest::READABLE);
tokio::pin!(ready); pin!(ready).poll(cx)
ready.poll(cx)
} }
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> { fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
let ready = self.ready(Interest::WRITABLE); let ready = self.ready(Interest::WRITABLE);
tokio::pin!(ready); pin!(ready).poll(cx)
ready.poll(cx)
} }
} }
@ -150,6 +148,16 @@ pub mod net {
(**self).poll_write_ready(cx) (**self).poll_write_ready(cx)
} }
} }
impl<Io: ActixStream> ActixStream for BufReader<Io> {
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
self.get_ref().poll_read_ready(cx)
}
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
self.get_ref().poll_write_ready(cx)
}
}
} }
pub mod time { pub mod time {

View File

@ -1,5 +1,4 @@
use std::{future::Future, io}; use std::{sync::Arc, future::Future, io};
use tokio::task::{JoinHandle, LocalSet}; use tokio::task::{JoinHandle, LocalSet};
/// A Tokio-based runtime proxy. /// A Tokio-based runtime proxy.
@ -9,14 +8,14 @@ use tokio::task::{JoinHandle, LocalSet};
#[derive(Debug)] #[derive(Debug)]
pub struct Runtime { pub struct Runtime {
local: LocalSet, local: LocalSet,
rt: tokio::runtime::Runtime, rt: Arc<tokio::runtime::Runtime>,
} }
pub(crate) fn default_tokio_runtime() -> io::Result<tokio::runtime::Runtime> { pub(crate) fn default_tokio_runtime() -> io::Result<Arc<tokio::runtime::Runtime>> {
tokio::runtime::Builder::new_current_thread() tokio::runtime::Builder::new_current_thread()
.enable_io() .enable_io()
.enable_time() .enable_time()
.build() .build().map(Arc::new)
} }
impl Runtime { impl Runtime {
@ -141,6 +140,15 @@ impl Runtime {
impl From<tokio::runtime::Runtime> for Runtime { impl From<tokio::runtime::Runtime> for Runtime {
fn from(rt: tokio::runtime::Runtime) -> Self { fn from(rt: tokio::runtime::Runtime) -> Self {
Self {
local: LocalSet::new(),
rt: Arc::new(rt),
}
}
}
impl From<Arc<tokio::runtime::Runtime>> for Runtime {
fn from(rt: Arc<tokio::runtime::Runtime>) -> Self {
Self { Self {
local: LocalSet::new(), local: LocalSet::new(),
rt, rt,

View File

@ -5,9 +5,9 @@ use std::{
io, io,
pin::Pin, pin::Pin,
sync::atomic::{AtomicUsize, Ordering}, sync::atomic::{AtomicUsize, Ordering},
sync::Arc,
task::{Context, Poll}, task::{Context, Poll},
}; };
use futures_core::ready; use futures_core::ready;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
@ -48,7 +48,7 @@ impl System {
/// [tokio-runtime]: tokio::runtime::Runtime /// [tokio-runtime]: tokio::runtime::Runtime
pub fn with_tokio_rt<F>(runtime_factory: F) -> SystemRunner pub fn with_tokio_rt<F>(runtime_factory: F) -> SystemRunner
where where
F: FnOnce() -> tokio::runtime::Runtime, F: FnOnce() -> Arc<tokio::runtime::Runtime>,
{ {
let (stop_tx, stop_rx) = oneshot::channel(); let (stop_tx, stop_rx) = oneshot::channel();
let (sys_tx, sys_rx) = mpsc::unbounded_channel(); let (sys_tx, sys_rx) = mpsc::unbounded_channel();
@ -87,7 +87,7 @@ impl System {
#[doc(hidden)] #[doc(hidden)]
pub fn with_tokio_rt<F>(_: F) -> SystemRunner pub fn with_tokio_rt<F>(_: F) -> SystemRunner
where where
F: FnOnce() -> tokio::runtime::Runtime, F: FnOnce() -> Arc<tokio::runtime::Runtime>,
{ {
unimplemented!("System::with_tokio_rt is not implemented for io-uring feature yet") unimplemented!("System::with_tokio_rt is not implemented for io-uring feature yet")
} }

View File

@ -8,7 +8,7 @@ use std::{
use actix_rt::{task::JoinError, Arbiter, System}; use actix_rt::{task::JoinError, Arbiter, System};
#[cfg(not(feature = "io-uring"))] #[cfg(not(feature = "io-uring"))]
use { use {
std::{sync::mpsc::channel, thread}, std::{sync::Arc, sync::mpsc::channel, thread},
tokio::sync::oneshot, tokio::sync::oneshot,
}; };
@ -252,6 +252,7 @@ fn new_system_with_tokio() {
.on_thread_start(|| {}) .on_thread_start(|| {})
.on_thread_stop(|| {}) .on_thread_stop(|| {})
.build() .build()
.map(Arc::new)
.unwrap() .unwrap()
}) })
.block_on(async { .block_on(async {
@ -284,6 +285,7 @@ fn new_arbiter_with_tokio() {
tokio::runtime::Builder::new_current_thread() tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build() .build()
.map(Arc::new)
.unwrap() .unwrap()
}); });

View File

@ -425,6 +425,7 @@ impl ServerWorker {
.enable_all() .enable_all()
.max_blocking_threads(config.max_blocking_threads) .max_blocking_threads(config.max_blocking_threads)
.build() .build()
.map(Arc::new)
.unwrap() .unwrap()
}) })
}; };