Merge branch 'master' into master

This commit is contained in:
Rob Ede 2021-10-18 16:06:14 +01:00 committed by GitHub
commit aafaef1fc5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 385 additions and 116 deletions

View File

@ -1,5 +1,22 @@
[alias] [alias]
chk = "check --workspace --all-features --tests --examples --bins"
lint = "clippy --workspace --all-features --tests --examples --bins -- -Dclippy::todo" lint = "clippy --workspace --all-features --tests --examples --bins -- -Dclippy::todo"
ci-test = "test --workspace --all-features --lib --tests --no-fail-fast -- --nocapture"
ci-doctest = "test --workspace --all-features --doc --no-fail-fast -- --nocapture" ci-doctest = "test --workspace --all-features --doc --no-fail-fast -- --nocapture"
# just check the library (without dev deps)
ci-check-min = "hack --workspace check --no-default-features"
ci-check-lib = "hack --workspace --feature-powerset --exclude-features io-uring check"
ci-check-lib-linux = "hack --workspace --feature-powerset check"
# check everything
ci-check = "hack --workspace --feature-powerset --exclude-features io-uring check --tests --examples"
ci-check-linux = "hack --workspace --feature-powerset check --tests --examples"
# tests avoiding io-uring feature
ci-test = "hack test --workspace --exclude=actix-rt --exclude=actix-server --all-features --lib --tests --no-fail-fast -- --nocapture"
ci-test-rt = " hack --feature-powerset --exclude-features io-uring test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture"
ci-test-server = "hack --feature-powerset --exclude-features io-uring test --package=actix-server --lib --tests --no-fail-fast -- --nocapture"
# test with io-uring feature
ci-test-rt-linux = " hack --feature-powerset test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture"
ci-test-server-linux = "hack --feature-powerset test --package=actix-server --lib --tests --no-fail-fast -- --nocapture"

View File

@ -75,36 +75,47 @@ jobs:
command: install command: install
args: cargo-hack args: cargo-hack
- name: check minimal - name: check lib
if: >
matrix.target.os != 'ubuntu-latest'
&& matrix.target.triple != 'x86_64-pc-windows-gnu'
uses: actions-rs/cargo@v1 uses: actions-rs/cargo@v1
with: with: { command: ci-check-lib }
command: hack - name: check lib
args: check --workspace --no-default-features if: matrix.target.os == 'ubuntu-latest'
- name: check minimal + tests
uses: actions-rs/cargo@v1 uses: actions-rs/cargo@v1
with: with: { command: ci-check-lib-linux }
command: hack - name: check lib
args: check --workspace --no-default-features --tests --examples if: matrix.target.triple == 'x86_64-pc-windows-gnu'
- name: check default
uses: actions-rs/cargo@v1 uses: actions-rs/cargo@v1
with: with: { command: ci-check-min }
command: check
args: --workspace --tests --examples
- name: check full - name: check full
# TODO: compile OpenSSL and run tests on MinGW # TODO: compile OpenSSL and run tests on MinGW
if: matrix.target.triple != 'x86_64-pc-windows-gnu' if: >
matrix.target.os != 'ubuntu-latest'
&& matrix.target.triple != 'x86_64-pc-windows-gnu'
uses: actions-rs/cargo@v1 uses: actions-rs/cargo@v1
with: with: { command: ci-check }
command: check - name: check all
args: --workspace --all-features --tests --examples if: matrix.target.os == 'ubuntu-latest'
uses: actions-rs/cargo@v1
with: { command: ci-check-linux }
- name: tests - name: tests
if: matrix.target.triple != 'x86_64-pc-windows-gnu' if: >
uses: actions-rs/cargo@v1 matrix.target.os != 'ubuntu-latest'
with: { command: ci-test } && matrix.target.triple != 'x86_64-pc-windows-gnu'
run: |
cargo ci-test
cargo ci-test-rt
cargo ci-test-server
- name: tests
if: matrix.target.os == 'ubuntu-latest'
run: |
cargo ci-test
cargo ci-test-rt-linux
cargo ci-test-server-linux
- name: Generate coverage file - name: Generate coverage file
if: > if: >
@ -120,8 +131,7 @@ jobs:
&& matrix.version == 'stable' && matrix.version == 'stable'
&& github.ref == 'refs/heads/master' && github.ref == 'refs/heads/master'
uses: codecov/codecov-action@v1 uses: codecov/codecov-action@v1
with: with: { file: cobertura.xml }
file: cobertura.xml
- name: Clear the cargo caches - name: Clear the cargo caches
run: | run: |

View File

@ -20,5 +20,5 @@ futures-core = { version = "0.3.7", default-features = false }
futures-sink = { version = "0.3.7", default-features = false } futures-sink = { version = "0.3.7", default-features = false }
log = "0.4" log = "0.4"
pin-project-lite = "0.2" pin-project-lite = "0.2"
tokio = "1" tokio = "1.5.1"
tokio-util = { version = "0.6", features = ["codec", "io"] } tokio-util = { version = "0.6", features = ["codec", "io"] }

View File

@ -3,6 +3,13 @@
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
## 0.2.2 - 2021-10-14
* Improve error recovery potential when macro input is invalid. [#391]
* Allow custom `System`s on test macro. [#391]
[#391]: https://github.com/actix/actix-net/pull/391
## 0.2.1 - 2021-02-02 ## 0.2.1 - 2021-02-02
* Add optional argument `system` to `main` macro which can be used to specify the path to `actix_rt::System` (useful for re-exports). [#363] * Add optional argument `system` to `main` macro which can be used to specify the path to `actix_rt::System` (useful for re-exports). [#363]

View File

@ -1,12 +1,12 @@
[package] [package]
name = "actix-macros" name = "actix-macros"
version = "0.2.1" version = "0.2.2"
authors = [ authors = [
"Nikolay Kim <fafhrd91@gmail.com>", "Nikolay Kim <fafhrd91@gmail.com>",
"Ibraheem Ahmed <ibrah1440@gmail.com>", "Ibraheem Ahmed <ibrah1440@gmail.com>",
] ]
description = "Macros for Actix system and runtime" description = "Macros for Actix system and runtime"
repository = "https://github.com/actix/actix-net" repository = "https://github.com/actix/actix-net.git"
categories = ["network-programming", "asynchronous"] categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0" license = "MIT OR Apache-2.0"
edition = "2018" edition = "2018"

View File

@ -28,7 +28,12 @@ use quote::quote;
#[proc_macro_attribute] #[proc_macro_attribute]
#[cfg(not(test))] // Work around for rust-lang/rust#62127 #[cfg(not(test))] // Work around for rust-lang/rust#62127
pub fn main(args: TokenStream, item: TokenStream) -> TokenStream { pub fn main(args: TokenStream, item: TokenStream) -> TokenStream {
let mut input = syn::parse_macro_input!(item as syn::ItemFn); let mut input = match syn::parse::<syn::ItemFn>(item.clone()) {
Ok(input) => input,
// on parse err, make IDEs happy; see fn docs
Err(err) => return input_and_compile_error(item, err),
};
let args = syn::parse_macro_input!(args as syn::AttributeArgs); let args = syn::parse_macro_input!(args as syn::AttributeArgs);
let attrs = &input.attrs; let attrs = &input.attrs;
@ -101,8 +106,15 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream {
/// } /// }
/// ``` /// ```
#[proc_macro_attribute] #[proc_macro_attribute]
pub fn test(_: TokenStream, item: TokenStream) -> TokenStream { pub fn test(args: TokenStream, item: TokenStream) -> TokenStream {
let mut input = syn::parse_macro_input!(item as syn::ItemFn); let mut input = match syn::parse::<syn::ItemFn>(item.clone()) {
Ok(input) => input,
// on parse err, make IDEs happy; see fn docs
Err(err) => return input_and_compile_error(item, err),
};
let args = syn::parse_macro_input!(args as syn::AttributeArgs);
let attrs = &input.attrs; let attrs = &input.attrs;
let vis = &input.vis; let vis = &input.vis;
let sig = &mut input.sig; let sig = &mut input.sig;
@ -132,13 +144,59 @@ pub fn test(_: TokenStream, item: TokenStream) -> TokenStream {
quote!(#[test]) quote!(#[test])
}; };
let mut system = syn::parse_str::<syn::Path>("::actix_rt::System").unwrap();
for arg in &args {
match arg {
syn::NestedMeta::Meta(syn::Meta::NameValue(syn::MetaNameValue {
lit: syn::Lit::Str(lit),
path,
..
})) => match path
.get_ident()
.map(|i| i.to_string().to_lowercase())
.as_deref()
{
Some("system") => match lit.parse() {
Ok(path) => system = path,
Err(_) => {
return syn::Error::new_spanned(lit, "Expected path")
.to_compile_error()
.into();
}
},
_ => {
return syn::Error::new_spanned(arg, "Unknown attribute specified")
.to_compile_error()
.into();
}
},
_ => {
return syn::Error::new_spanned(arg, "Unknown attribute specified")
.to_compile_error()
.into();
}
}
}
(quote! { (quote! {
#missing_test_attr #missing_test_attr
#(#attrs)* #(#attrs)*
#vis #sig { #vis #sig {
actix_rt::System::new() <#system>::new().block_on(async { #body })
.block_on(async { #body })
} }
}) })
.into() .into()
} }
/// Converts the error to a token stream and appends it to the original input.
///
/// Returning the original input in addition to the error is good for IDEs which can gracefully
/// recover and show more precise errors within the macro body.
///
/// See <https://github.com/rust-analyzer/rust-analyzer/issues/10468> for more info.
fn input_and_compile_error(mut item: TokenStream, err: syn::Error) -> TokenStream {
let compile_err = TokenStream::from(err.to_compile_error());
item.extend(compile_err);
return item;
}

View File

@ -11,4 +11,7 @@ fn compile_macros() {
t.pass("tests/trybuild/test-01-basic.rs"); t.pass("tests/trybuild/test-01-basic.rs");
t.pass("tests/trybuild/test-02-keep-attrs.rs"); t.pass("tests/trybuild/test-02-keep-attrs.rs");
t.compile_fail("tests/trybuild/test-03-only-async.rs"); t.compile_fail("tests/trybuild/test-03-only-async.rs");
t.pass("tests/trybuild/test-04-system-path.rs");
t.compile_fail("tests/trybuild/test-05-system-expect-path.rs");
t.compile_fail("tests/trybuild/test-06-unknown-attr.rs");
} }

View File

@ -0,0 +1,10 @@
mod system {
pub use actix_rt::System as MySystem;
}
#[actix_rt::test(system = "system::MySystem")]
async fn my_test() {
futures_util::future::ready(()).await
}
fn main() {}

View File

@ -0,0 +1,4 @@
#[actix_rt::test(system = "!@#*&")]
async fn my_test() {}
fn main() {}

View File

@ -0,0 +1,5 @@
error: Expected path
--> $DIR/test-05-system-expect-path.rs:1:27
|
1 | #[actix_rt::test(system = "!@#*&")]
| ^^^^^^^

View File

@ -0,0 +1,7 @@
#[actix_rt::test(foo = "bar")]
async fn my_test_1() {}
#[actix_rt::test(bar::baz)]
async fn my_test_2() {}
fn main() {}

View File

@ -0,0 +1,11 @@
error: Unknown attribute specified
--> $DIR/test-06-unknown-attr.rs:1:18
|
1 | #[actix_rt::test(foo = "bar")]
| ^^^^^^^^^^^
error: Unknown attribute specified
--> $DIR/test-06-unknown-attr.rs:4:18
|
4 | #[actix_rt::test(bar::baz)]
| ^^^^^^^^

View File

@ -1,9 +1,14 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
## 2.3.0 - 2021-10-11
* The `spawn` method can now resolve with non-unit outputs. [#369] * The `spawn` method can now resolve with non-unit outputs. [#369]
* Add experimental (semver-exempt) `io-uring` feature for enabling async file I/O on linux. [#374]
[#369]: https://github.com/actix/actix-net/pull/369 [#369]: https://github.com/actix/actix-net/pull/369
[#374]: https://github.com/actix/actix-net/pull/374
## 2.2.0 - 2021-03-29 ## 2.2.0 - 2021-03-29

View File

@ -1,6 +1,6 @@
[package] [package]
name = "actix-rt" name = "actix-rt"
version = "2.2.0" version = "2.3.0"
authors = [ authors = [
"Nikolay Kim <fafhrd91@gmail.com>", "Nikolay Kim <fafhrd91@gmail.com>",
"Rob Ede <robjtede@icloud.com>", "Rob Ede <robjtede@icloud.com>",
@ -8,8 +8,7 @@ authors = [
description = "Tokio-based single-threaded async runtime for the Actix ecosystem" description = "Tokio-based single-threaded async runtime for the Actix ecosystem"
keywords = ["async", "futures", "io", "runtime"] keywords = ["async", "futures", "io", "runtime"]
homepage = "https://actix.rs" homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net" repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-rt"
categories = ["network-programming", "asynchronous"] categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0" license = "MIT OR Apache-2.0"
edition = "2018" edition = "2018"
@ -21,13 +20,17 @@ path = "src/lib.rs"
[features] [features]
default = ["macros"] default = ["macros"]
macros = ["actix-macros"] macros = ["actix-macros"]
io-uring = ["tokio-uring"]
[dependencies] [dependencies]
actix-macros = { version = "0.2.0", optional = true } actix-macros = { version = "0.2.0", optional = true }
futures-core = { version = "0.3", default-features = false } futures-core = { version = "0.3", default-features = false }
tokio = { version = "1.3", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] } tokio = { version = "1.5.1", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] }
[target.'cfg(target_os = "linux")'.dependencies]
tokio-uring = { version = "0.1", optional = true }
[dev-dependencies] [dev-dependencies]
tokio = { version = "1.2", features = ["full"] } tokio = { version = "1.5.1", features = ["full"] }
hyper = { version = "0.14", default-features = false, features = ["server", "tcp", "http1"] } hyper = { version = "0.14", default-features = false, features = ["server", "tcp", "http1"] }

View File

@ -3,11 +3,11 @@
> Tokio-based single-threaded async runtime for the Actix ecosystem. > Tokio-based single-threaded async runtime for the Actix ecosystem.
[![crates.io](https://img.shields.io/crates/v/actix-rt?label=latest)](https://crates.io/crates/actix-rt) [![crates.io](https://img.shields.io/crates/v/actix-rt?label=latest)](https://crates.io/crates/actix-rt)
[![Documentation](https://docs.rs/actix-rt/badge.svg?version=2.2.0)](https://docs.rs/actix-rt/2.2.0) [![Documentation](https://docs.rs/actix-rt/badge.svg?version=2.3.0)](https://docs.rs/actix-rt/2.3.0)
[![Version](https://img.shields.io/badge/rustc-1.46+-ab6000.svg)](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html) [![Version](https://img.shields.io/badge/rustc-1.46+-ab6000.svg)](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html)
![MIT or Apache 2.0 licensed](https://img.shields.io/crates/l/actix-rt.svg) ![MIT or Apache 2.0 licensed](https://img.shields.io/crates/l/actix-rt.svg)
<br /> <br />
[![dependency status](https://deps.rs/crate/actix-rt/2.2.0/status.svg)](https://deps.rs/crate/actix-rt/2.2.0) [![dependency status](https://deps.rs/crate/actix-rt/2.3.0/status.svg)](https://deps.rs/crate/actix-rt/2.3.0)
![Download](https://img.shields.io/crates/d/actix-rt.svg) ![Download](https://img.shields.io/crates/d/actix-rt.svg)
[![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/WghFtEH6Hb) [![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/WghFtEH6Hb)

View File

@ -9,12 +9,9 @@ use std::{
}; };
use futures_core::ready; use futures_core::ready;
use tokio::{sync::mpsc, task::LocalSet}; use tokio::sync::mpsc;
use crate::{ use crate::system::{System, SystemCommand};
runtime::{default_tokio_runtime, Runtime},
system::{System, SystemCommand},
};
pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0); pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);
@ -98,16 +95,19 @@ impl Arbiter {
/// ///
/// # Panics /// # Panics
/// Panics if a [System] is not registered on the current thread. /// Panics if a [System] is not registered on the current thread.
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
#[allow(clippy::new_without_default)] #[allow(clippy::new_without_default)]
pub fn new() -> Arbiter { pub fn new() -> Arbiter {
Self::with_tokio_rt(|| { Self::with_tokio_rt(|| {
default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.") crate::runtime::default_tokio_runtime()
.expect("Cannot create new Arbiter's Runtime.")
}) })
} }
/// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure. /// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
/// ///
/// [tokio-runtime]: tokio::runtime::Runtime /// [tokio-runtime]: tokio::runtime::Runtime
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
#[doc(hidden)] #[doc(hidden)]
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
where where
@ -127,7 +127,7 @@ impl Arbiter {
.spawn({ .spawn({
let tx = tx.clone(); let tx = tx.clone();
move || { move || {
let rt = Runtime::from(runtime_factory()); let rt = crate::runtime::Runtime::from(runtime_factory());
let hnd = ArbiterHandle::new(tx); let hnd = ArbiterHandle::new(tx);
System::set_current(sys); System::set_current(sys);
@ -159,15 +159,67 @@ impl Arbiter {
Arbiter { tx, thread_handle } Arbiter { tx, thread_handle }
} }
/// Sets up an Arbiter runner in a new System using the provided runtime local task set. /// Spawn a new Arbiter thread and start its event loop with `tokio-uring` runtime.
pub(crate) fn in_new_system(local: &LocalSet) -> ArbiterHandle { ///
/// # Panics
/// Panics if a [System] is not registered on the current thread.
#[cfg(all(target_os = "linux", feature = "io-uring"))]
#[allow(clippy::new_without_default)]
pub fn new() -> Arbiter {
let sys = System::current();
let system_id = sys.id();
let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);
let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
let (tx, rx) = mpsc::unbounded_channel();
let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
let thread_handle = thread::Builder::new()
.name(name.clone())
.spawn({
let tx = tx.clone();
move || {
let hnd = ArbiterHandle::new(tx);
System::set_current(sys);
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
// register arbiter
let _ = System::current()
.tx()
.send(SystemCommand::RegisterArbiter(arb_id, hnd));
ready_tx.send(()).unwrap();
// run arbiter event processing loop
tokio_uring::start(ArbiterRunner { rx });
// deregister arbiter
let _ = System::current()
.tx()
.send(SystemCommand::DeregisterArbiter(arb_id));
}
})
.unwrap_or_else(|err| {
panic!("Cannot spawn Arbiter's thread: {:?}. {:?}", &name, err)
});
ready_rx.recv().unwrap();
Arbiter { tx, thread_handle }
}
/// Sets up an Arbiter runner in a new System using the environment's local set.
pub(crate) fn in_new_system() -> ArbiterHandle {
let (tx, rx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::unbounded_channel();
let hnd = ArbiterHandle::new(tx); let hnd = ArbiterHandle::new(tx);
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone())); HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
local.spawn_local(ArbiterRunner { rx }); crate::spawn(ArbiterRunner { rx });
hnd hnd
} }

View File

@ -32,6 +32,10 @@
//! arbiter.stop(); //! arbiter.stop();
//! arbiter.join().unwrap(); //! arbiter.join().unwrap();
//! ``` //! ```
//!
//! # `io-uring` Support
//! There is experimental support for using io-uring with this crate by enabling the
//! `io-uring` feature. For now, it is semver exempt.
#![deny(rust_2018_idioms, nonstandard_style)] #![deny(rust_2018_idioms, nonstandard_style)]
#![allow(clippy::type_complexity)] #![allow(clippy::type_complexity)]
@ -39,6 +43,9 @@
#![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
#[cfg(all(not(target_os = "linux"), feature = "io-uring"))]
compile_error!("io_uring is a linux only feature.");
use std::future::Future; use std::future::Future;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;

View File

@ -31,11 +31,6 @@ impl Runtime {
}) })
} }
/// Reference to local task set.
pub(crate) fn local_set(&self) -> &LocalSet {
&self.local
}
/// Offload a future onto the single-threaded runtime. /// Offload a future onto the single-threaded runtime.
/// ///
/// The returned join handle can be used to await the future's result. /// The returned join handle can be used to await the future's result.

View File

@ -54,7 +54,7 @@ impl System {
let (sys_tx, sys_rx) = mpsc::unbounded_channel(); let (sys_tx, sys_rx) = mpsc::unbounded_channel();
let rt = Runtime::from(runtime_factory()); let rt = Runtime::from(runtime_factory());
let sys_arbiter = Arbiter::in_new_system(rt.local_set()); let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() });
let system = System::construct(sys_tx, sys_arbiter.clone()); let system = System::construct(sys_tx, sys_arbiter.clone());
system system

View File

@ -1,10 +1,6 @@
use std::{ use std::{
future::Future, future::Future,
sync::{ sync::mpsc::channel,
atomic::{AtomicBool, Ordering},
mpsc::channel,
Arc,
},
thread, thread,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
@ -221,8 +217,8 @@ fn system_stop_stops_arbiters() {
System::current().stop(); System::current().stop();
sys.run().unwrap(); sys.run().unwrap();
// account for slightly slow thread de-spawns (only observed on windows) // account for slightly slow thread de-spawns
thread::sleep(Duration::from_millis(100)); thread::sleep(Duration::from_millis(500));
// arbiter should be dead and return false // arbiter should be dead and return false
assert!(!Arbiter::current().spawn_fn(|| {})); assert!(!Arbiter::current().spawn_fn(|| {}));
@ -231,6 +227,7 @@ fn system_stop_stops_arbiters() {
arb.join().unwrap(); arb.join().unwrap();
} }
#[cfg(not(feature = "io-uring"))]
#[test] #[test]
fn new_system_with_tokio() { fn new_system_with_tokio() {
let (tx, rx) = channel(); let (tx, rx) = channel();
@ -263,8 +260,14 @@ fn new_system_with_tokio() {
assert_eq!(rx.recv().unwrap(), 42); assert_eq!(rx.recv().unwrap(), 42);
} }
#[cfg(not(feature = "io-uring"))]
#[test] #[test]
fn new_arbiter_with_tokio() { fn new_arbiter_with_tokio() {
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
let _ = System::new(); let _ = System::new();
let arb = Arbiter::with_tokio_rt(|| { let arb = Arbiter::with_tokio_rt(|| {
@ -323,3 +326,32 @@ fn spawn_local() {
h(actix_rt::spawn(async { 1 })); h(actix_rt::spawn(async { 1 }));
}) })
} }
#[cfg(all(target_os = "linux", feature = "io-uring"))]
#[test]
fn tokio_uring_arbiter() {
let system = System::new();
let (tx, rx) = std::sync::mpsc::channel();
Arbiter::new().spawn(async move {
let handle = actix_rt::spawn(async move {
let f = tokio_uring::fs::File::create("test.txt").await.unwrap();
let buf = b"Hello World!";
let (res, _) = f.write_at(&buf[..], 0).await;
assert!(res.is_ok());
f.sync_all().await.unwrap();
f.close().await.unwrap();
std::fs::remove_file("test.txt").unwrap();
});
handle.await.unwrap();
tx.send(true).unwrap();
});
assert!(rx.recv().unwrap());
drop(system);
}

View File

@ -1,21 +1,25 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
* Remove `config` module. `ServiceConfig`, `ServiceRuntime` public types are removed due to this change. [#349]
* Remove `ServerBuilder::configure` [#349]
* Server no long listens to SIGHUP signal.
It actually did not take any action when receiving SIGHUP, the only thing SIGHUP did was to stop
the Server from receiving any future signal, because the `Signals` future stops on the first
signal received [#389]
## 2.0.0-beta.6 - 2021-10-11
* Add experimental (semver-exempt) `io-uring` feature for enabling async file I/O on linux. [#374]
* Server no long listens to `SIGHUP` signal. Previously, the received was not used but did block
subsequent exit signals from working. [#389]
* Remove `config` module. `ServiceConfig`, `ServiceRuntime` public types are removed due to
this change. [#349]
* Remove `ServerBuilder::configure` [#349]
[#374]: https://github.com/actix/actix-net/pull/374
[#349]: https://github.com/actix/actix-net/pull/349 [#349]: https://github.com/actix/actix-net/pull/349
[#389]: https://github.com/actix/actix-net/pull/389 [#389]: https://github.com/actix/actix-net/pull/389
## 2.0.0-beta.5 - 2021-04-20 ## 2.0.0-beta.5 - 2021-04-20
* Server shutdown would notify all workers to exit regardless if shutdown is graceful. * Server shutdown notifies all workers to exit regardless if shutdown is graceful. This causes all
This would make all worker shutdown immediately in force shutdown case. [#333] workers to shutdown immediately in force shutdown case. [#333]
[#333]: https://github.com/actix/actix-net/pull/333 [#333]: https://github.com/actix/actix-net/pull/333

View File

@ -1,13 +1,13 @@
[package] [package]
name = "actix-server" name = "actix-server"
version = "2.0.0-beta.5" version = "2.0.0-beta.6"
authors = [ authors = [
"Nikolay Kim <fafhrd91@gmail.com>", "Nikolay Kim <fafhrd91@gmail.com>",
"fakeshadow <24548779@qq.com>", "fakeshadow <24548779@qq.com>",
] ]
description = "General purpose TCP server built for the Actix ecosystem" description = "General purpose TCP server built for the Actix ecosystem"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
repository = "https://github.com/actix/actix-net" repository = "https://github.com/actix/actix-net.git"
categories = ["network-programming", "asynchronous"] categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0" license = "MIT OR Apache-2.0"
edition = "2018" edition = "2018"
@ -18,6 +18,7 @@ path = "src/lib.rs"
[features] [features]
default = [] default = []
io-uring = ["actix-rt/io-uring"]
[dependencies] [dependencies]
actix-rt = { version = "2.0.0", default-features = false } actix-rt = { version = "2.0.0", default-features = false }
@ -28,13 +29,13 @@ futures-core = { version = "0.3.7", default-features = false, features = ["alloc
log = "0.4" log = "0.4"
mio = { version = "0.7.6", features = ["os-poll", "net"] } mio = { version = "0.7.6", features = ["os-poll", "net"] }
num_cpus = "1.13" num_cpus = "1.13"
tokio = { version = "1.2", features = ["sync"] } tokio = { version = "1.5.1", features = ["sync"] }
[dev-dependencies] [dev-dependencies]
actix-codec = "0.4.0-beta.1" actix-codec = "0.4.0-beta.1"
actix-rt = "2.0.0" actix-rt = "2.0.0"
bytes = "1" bytes = "1"
env_logger = "0.8" env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
tokio = { version = "1", features = ["io-util"] } tokio = { version = "1.5.1", features = ["io-util"] }

View File

@ -312,23 +312,25 @@ impl ServerBuilder {
// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system // Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system
match sig { match sig {
Signal::Int => { Signal::Int => {
info!("SIGINT received, exiting"); info!("SIGINT received, starting forced shutdown");
self.exit = true; self.exit = true;
self.handle_cmd(ServerCommand::Stop { self.handle_cmd(ServerCommand::Stop {
graceful: false, graceful: false,
completion: None, completion: None,
}) })
} }
Signal::Term => { Signal::Term => {
info!("SIGTERM received, stopping"); info!("SIGTERM received, starting graceful shutdown");
self.exit = true; self.exit = true;
self.handle_cmd(ServerCommand::Stop { self.handle_cmd(ServerCommand::Stop {
graceful: true, graceful: true,
completion: None, completion: None,
}) })
} }
Signal::Quit => { Signal::Quit => {
info!("SIGQUIT received, exiting"); info!("SIGQUIT received, starting forced shutdown");
self.exit = true; self.exit = true;
self.handle_cmd(ServerCommand::Stop { self.handle_cmd(ServerCommand::Stop {
graceful: false, graceful: false,
@ -359,12 +361,14 @@ impl ServerBuilder {
rt::spawn(async move { rt::spawn(async move {
if graceful { if graceful {
// wait for all workers to shut down
let _ = join_all(stop).await; let _ = join_all(stop).await;
} }
if let Some(tx) = completion { if let Some(tx) = completion {
let _ = tx.send(()); let _ = tx.send(());
} }
for tx in notify { for tx in notify {
let _ = tx.send(()); let _ = tx.send(());
} }

View File

@ -15,8 +15,8 @@ pub(crate) enum ServerCommand {
Pause(oneshot::Sender<()>), Pause(oneshot::Sender<()>),
Resume(oneshot::Sender<()>), Resume(oneshot::Sender<()>),
Signal(Signal), Signal(Signal),
/// Whether to try and shut down gracefully
Stop { Stop {
/// True if shut down should be graceful.
graceful: bool, graceful: bool,
completion: Option<oneshot::Sender<()>>, completion: Option<oneshot::Sender<()>>,
}, },
@ -24,6 +24,13 @@ pub(crate) enum ServerCommand {
Notify(oneshot::Sender<()>), Notify(oneshot::Sender<()>),
} }
/// Server handle.
///
/// # Shutdown Signals
/// On UNIX systems, `SIGQUIT` will start a graceful shutdown and `SIGTERM` or `SIGINT` will start a
/// forced shutdown. On Windows, a CTRL-C signal will start a forced shutdown.
///
/// A graceful shutdown will wait for all workers to stop first.
#[derive(Debug)] #[derive(Debug)]
pub struct Server( pub struct Server(
UnboundedSender<ServerCommand>, UnboundedSender<ServerCommand>,

View File

@ -4,27 +4,33 @@ use std::task::{Context, Poll};
use crate::server::Server; use crate::server::Server;
/// Different types of process signals /// Types of process signals.
#[allow(dead_code)] #[allow(dead_code)]
#[derive(PartialEq, Clone, Copy, Debug)] #[derive(PartialEq, Clone, Copy, Debug)]
pub(crate) enum Signal { pub(crate) enum Signal {
/// SIGINT /// `SIGINT`
Int, Int,
/// SIGTERM
/// `SIGTERM`
Term, Term,
/// SIGQUIT
/// `SIGQUIT`
Quit, Quit,
} }
/// Process signal listener.
pub(crate) struct Signals { pub(crate) struct Signals {
srv: Server, srv: Server,
#[cfg(not(unix))] #[cfg(not(unix))]
signals: futures_core::future::LocalBoxFuture<'static, std::io::Result<()>>, signals: futures_core::future::LocalBoxFuture<'static, std::io::Result<()>>,
#[cfg(unix)] #[cfg(unix)]
signals: Vec<(Signal, actix_rt::signal::unix::Signal)>, signals: Vec<(Signal, actix_rt::signal::unix::Signal)>,
} }
impl Signals { impl Signals {
/// Spawns a signal listening future that is able to send commands to the `Server`.
pub(crate) fn start(srv: Server) { pub(crate) fn start(srv: Server) {
#[cfg(not(unix))] #[cfg(not(unix))]
{ {
@ -33,6 +39,7 @@ impl Signals {
signals: Box::pin(actix_rt::signal::ctrl_c()), signals: Box::pin(actix_rt::signal::ctrl_c()),
}); });
} }
#[cfg(unix)] #[cfg(unix)]
{ {
use actix_rt::signal::unix; use actix_rt::signal::unix;
@ -76,6 +83,7 @@ impl Future for Signals {
} }
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
} }
#[cfg(unix)] #[cfg(unix)]
{ {
for (sig, fut) in self.signals.iter_mut() { for (sig, fut) in self.signals.iter_mut() {

View File

@ -5,13 +5,12 @@ use actix_rt::{net::TcpStream, System};
use crate::{Server, ServerBuilder, ServiceFactory}; use crate::{Server, ServerBuilder, ServiceFactory};
/// The `TestServer` type. /// A testing server.
/// ///
/// `TestServer` is very simple test server that simplify process of writing /// `TestServer` is very simple test server that simplify process of writing integration tests for
/// integration tests for actix-net applications. /// network applications.
/// ///
/// # Examples /// # Examples
///
/// ``` /// ```
/// use actix_service::fn_service; /// use actix_service::fn_service;
/// use actix_server::TestServer; /// use actix_server::TestServer;
@ -39,7 +38,7 @@ pub struct TestServerRuntime {
} }
impl TestServer { impl TestServer {
/// Start new server with server builder /// Start new server with server builder.
pub fn start<F>(mut factory: F) -> TestServerRuntime pub fn start<F>(mut factory: F) -> TestServerRuntime
where where
F: FnMut(ServerBuilder) -> ServerBuilder + Send + 'static, F: FnMut(ServerBuilder) -> ServerBuilder + Send + 'static,
@ -64,7 +63,7 @@ impl TestServer {
} }
} }
/// Start new test server with application factory /// Start new test server with application factory.
pub fn with<F: ServiceFactory<TcpStream>>(factory: F) -> TestServerRuntime { pub fn with<F: ServiceFactory<TcpStream>>(factory: F) -> TestServerRuntime {
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
@ -99,7 +98,7 @@ impl TestServer {
} }
} }
/// Get first available unused local address /// Get first available unused local address.
pub fn unused_addr() -> net::SocketAddr { pub fn unused_addr() -> net::SocketAddr {
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap(); let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
let socket = mio::net::TcpSocket::new_v4().unwrap(); let socket = mio::net::TcpSocket::new_v4().unwrap();
@ -111,27 +110,27 @@ impl TestServer {
} }
impl TestServerRuntime { impl TestServerRuntime {
/// Test server host /// Test server host.
pub fn host(&self) -> &str { pub fn host(&self) -> &str {
&self.host &self.host
} }
/// Test server port /// Test server port.
pub fn port(&self) -> u16 { pub fn port(&self) -> u16 {
self.port self.port
} }
/// Get test server address /// Get test server address.
pub fn addr(&self) -> net::SocketAddr { pub fn addr(&self) -> net::SocketAddr {
self.addr self.addr
} }
/// Stop http server /// Stop server.
fn stop(&mut self) { fn stop(&mut self) {
self.system.stop(); self.system.stop();
} }
/// Connect to server, return tokio TcpStream /// Connect to server, returning a Tokio `TcpStream`.
pub fn connect(&self) -> std::io::Result<TcpStream> { pub fn connect(&self) -> std::io::Result<TcpStream> {
TcpStream::from_std(net::TcpStream::connect(self.addr)?) TcpStream::from_std(net::TcpStream::connect(self.addr)?)
} }

View File

@ -280,14 +280,24 @@ impl ServerWorker {
let counter_clone = counter.clone(); let counter_clone = counter.clone();
// every worker runs in it's own arbiter. // every worker runs in it's own arbiter.
// use a custom tokio runtime builder to change the settings of runtime. // use a custom tokio runtime builder to change the settings of runtime.
Arbiter::with_tokio_rt(move || { #[cfg(all(target_os = "linux", feature = "io-uring"))]
let arbiter = {
// TODO: pass max blocking thread config when tokio-uring enable configuration
// on building runtime.
let _ = config.max_blocking_threads;
Arbiter::new()
};
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
let arbiter = Arbiter::with_tokio_rt(move || {
tokio::runtime::Builder::new_current_thread() tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.max_blocking_threads(config.max_blocking_threads) .max_blocking_threads(config.max_blocking_threads)
.build() .build()
.unwrap() .unwrap()
}) });
.spawn(async move {
arbiter.spawn(async move {
let fut = factories let fut = factories
.iter() .iter()
.enumerate() .enumerate()
@ -419,13 +429,15 @@ struct Restart {
fut: LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>, fut: LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>,
} }
// Shutdown keep states necessary for server shutdown: /// State necessary for server shutdown.
// Sleep for interval check the shutdown progress.
// Instant for the start time of shutdown.
// Sender for send back the shutdown outcome(force/grace) to StopCommand caller.
struct Shutdown { struct Shutdown {
// Interval for checking the shutdown progress.
timer: Pin<Box<Sleep>>, timer: Pin<Box<Sleep>>,
/// Start time of shutdown.
start_from: Instant, start_from: Instant,
/// Notify of the shutdown outcome (force/grace) to stop caller.
tx: oneshot::Sender<bool>, tx: oneshot::Sender<bool>,
} }
@ -511,23 +523,25 @@ impl Future for ServerWorker {
self.poll(cx) self.poll(cx)
} }
WorkerState::Shutdown(ref mut shutdown) => { WorkerState::Shutdown(ref mut shutdown) => {
// Wait for 1 second. // wait for 1 second
ready!(shutdown.timer.as_mut().poll(cx)); ready!(shutdown.timer.as_mut().poll(cx));
if this.counter.total() == 0 { if this.counter.total() == 0 {
// Graceful shutdown. // graceful shutdown
if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) { if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) {
let _ = shutdown.tx.send(true); let _ = shutdown.tx.send(true);
} }
Poll::Ready(()) Poll::Ready(())
} else if shutdown.start_from.elapsed() >= this.shutdown_timeout { } else if shutdown.start_from.elapsed() >= this.shutdown_timeout {
// Timeout forceful shutdown. // timeout forceful shutdown
if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) { if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) {
let _ = shutdown.tx.send(false); let _ = shutdown.tx.send(false);
} }
Poll::Ready(()) Poll::Ready(())
} else { } else {
// Reset timer and wait for 1 second. // reset timer and wait for 1 second
let time = Instant::now() + Duration::from_secs(1); let time = Instant::now() + Duration::from_secs(1);
shutdown.timer.as_mut().reset(time); shutdown.timer.as_mut().reset(time);
shutdown.timer.as_mut().poll(cx) shutdown.timer.as_mut().poll(cx)

View File

@ -3,6 +3,10 @@
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
## 2.0.1 - 2021-10-11
* Documentation fix.
## 2.0.0 - 2021-04-16 ## 2.0.0 - 2021-04-16
* Removed pipeline and related structs/functions. [#335] * Removed pipeline and related structs/functions. [#335]

View File

@ -1,6 +1,6 @@
[package] [package]
name = "actix-service" name = "actix-service"
version = "2.0.0" version = "2.0.1"
authors = [ authors = [
"Nikolay Kim <fafhrd91@gmail.com>", "Nikolay Kim <fafhrd91@gmail.com>",
"Rob Ede <robjtede@icloud.com>", "Rob Ede <robjtede@icloud.com>",
@ -8,7 +8,7 @@ authors = [
] ]
description = "Service trait and combinators for representing asynchronous request/response operations." description = "Service trait and combinators for representing asynchronous request/response operations."
keywords = ["network", "framework", "async", "futures", "service"] keywords = ["network", "framework", "async", "futures", "service"]
categories = ["network-programming", "asynchronous"] categories = ["network-programming", "asynchronous", "no-std"]
repository = "https://github.com/actix/actix-net" repository = "https://github.com/actix/actix-net"
license = "MIT OR Apache-2.0" license = "MIT OR Apache-2.0"
edition = "2018" edition = "2018"

View File

@ -3,10 +3,10 @@
> Service trait and combinators for representing asynchronous request/response operations. > Service trait and combinators for representing asynchronous request/response operations.
[![crates.io](https://img.shields.io/crates/v/actix-service?label=latest)](https://crates.io/crates/actix-service) [![crates.io](https://img.shields.io/crates/v/actix-service?label=latest)](https://crates.io/crates/actix-service)
[![Documentation](https://docs.rs/actix-service/badge.svg?version=2.0.0)](https://docs.rs/actix-service/2.0.0) [![Documentation](https://docs.rs/actix-service/badge.svg?version=2.0.1)](https://docs.rs/actix-service/2.0.1)
[![Version](https://img.shields.io/badge/rustc-1.46+-ab6000.svg)](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html) [![Version](https://img.shields.io/badge/rustc-1.46+-ab6000.svg)](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html)
![License](https://img.shields.io/crates/l/actix-service.svg) ![License](https://img.shields.io/crates/l/actix-service.svg)
[![Dependency Status](https://deps.rs/crate/actix-service/2.0.0/status.svg)](https://deps.rs/crate/actix-service/2.0.0) [![Dependency Status](https://deps.rs/crate/actix-service/2.0.1/status.svg)](https://deps.rs/crate/actix-service/2.0.1)
![Download](https://img.shields.io/crates/d/actix-service.svg) ![Download](https://img.shields.io/crates/d/actix-service.svg)
[![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/NWpN5mmg3x) [![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/NWpN5mmg3x)

View File

@ -64,7 +64,7 @@ tokio-native-tls = { version = "0.3", optional = true }
[dev-dependencies] [dev-dependencies]
actix-rt = "2.2.0" actix-rt = "2.2.0"
actix-server = "2.0.0-beta.5" actix-server = "2.0.0-beta.6"
bytes = "1" bytes = "1"
env_logger = "0.8" env_logger = "0.8"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }

View File

@ -5,6 +5,7 @@
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
#[cfg(feature = "openssl")] #[cfg(feature = "openssl")]
#[allow(unused_extern_crates)]
extern crate tls_openssl as openssl; extern crate tls_openssl as openssl;
#[cfg(feature = "accept")] #[cfg(feature = "accept")]

View File

@ -24,4 +24,5 @@ serde = { version = "1.0", optional = true }
[dev-dependencies] [dev-dependencies]
serde_json = "1.0" serde_json = "1.0"
ahash = { version = "0.7", default-features = false } # TODO: remove when ahash MSRV is restored
ahash = { version = "=0.7.4", default-features = false }

View File

@ -18,4 +18,4 @@ futures-util = { version = "0.3.7", default-features = false }
local-waker = "0.1" local-waker = "0.1"
[dev-dependencies] [dev-dependencies]
tokio = { version = "1", features = ["rt", "macros"] } tokio = { version = "1.5.1", features = ["rt", "macros"] }