Merge branch 'master' into less_logging

This commit is contained in:
Rob Ede 2021-10-22 15:23:53 +01:00 committed by GitHub
commit 742ea9596e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 366 additions and 153 deletions

View File

@ -5,18 +5,21 @@ ci-doctest = "test --workspace --all-features --doc --no-fail-fast -- --nocaptur
# just check the library (without dev deps)
ci-check-min = "hack --workspace check --no-default-features"
ci-check-lib = "hack --workspace --feature-powerset --exclude-features io-uring check"
ci-check-lib = "hack --workspace --feature-powerset --exclude-features=io-uring check"
ci-check-lib-linux = "hack --workspace --feature-powerset check"
# check everything
ci-check = "hack --workspace --feature-powerset --exclude-features io-uring check --tests --examples"
ci-check = "hack --workspace --feature-powerset --exclude-features=io-uring check --tests --examples"
ci-check-linux = "hack --workspace --feature-powerset check --tests --examples"
# tests avoiding io-uring feature
ci-test = "hack test --workspace --exclude=actix-rt --exclude=actix-server --all-features --lib --tests --no-fail-fast -- --nocapture"
ci-test-rt = " hack --feature-powerset --exclude-features io-uring test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture"
ci-test-server = "hack --feature-powerset --exclude-features io-uring test --package=actix-server --lib --tests --no-fail-fast -- --nocapture"
ci-test-rt = " hack --feature-powerset --exclude-features=io-uring test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture"
ci-test-server = "hack --feature-powerset --exclude-features=io-uring test --package=actix-server --lib --tests --no-fail-fast -- --nocapture"
# test with io-uring feature
ci-test-rt-linux = " hack --feature-powerset test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture"
ci-test-server-linux = "hack --feature-powerset test --package=actix-server --lib --tests --no-fail-fast -- --nocapture"
# test lower msrv
ci-test-lower-msrv = "hack --workspace --exclude=actix-server --exclude=actix-tls --feature-powerset test --lib --tests --no-fail-fast -- --nocapture"

View File

@ -18,7 +18,7 @@ jobs:
- { name: Windows (MinGW), os: windows-latest, triple: x86_64-pc-windows-gnu }
- { name: Windows (32-bit), os: windows-latest, triple: i686-pc-windows-msvc }
version:
- 1.46.0 # MSRV
- 1.52.0 # MSRV for -server and -tls
- stable
- nightly
@ -64,8 +64,7 @@ jobs:
# - name: Generate Cargo.lock
# uses: actions-rs/cargo@v1
# with:
# command: generate-lockfile
# with: { command: generate-lockfile }
# - name: Cache Dependencies
# uses: Swatinem/rust-cache@v1.2.0
@ -113,30 +112,69 @@ jobs:
- name: tests
if: matrix.target.os == 'ubuntu-latest'
run: |
cargo ci-test
cargo ci-test-rt-linux
cargo ci-test-server-linux
- name: Generate coverage file
if: >
matrix.target.os == 'ubuntu-latest'
&& matrix.version == 'stable'
&& github.ref == 'refs/heads/master'
run: |
cargo install cargo-tarpaulin
cargo tarpaulin --out Xml --verbose
- name: Upload to Codecov
if: >
matrix.target.os == 'ubuntu-latest'
&& matrix.version == 'stable'
&& github.ref == 'refs/heads/master'
uses: codecov/codecov-action@v1
with: { file: cobertura.xml }
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test-rt-linux && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test-server-linux"
- name: Clear the cargo caches
run: |
cargo install cargo-cache --version 0.6.2 --no-default-features --features ci-autoclean
cargo-cache
build_and_test_lower_msrv:
name: Linux / 1.46 (lower MSRV)
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install 1.46.0 # MSRV for all but -server and -tls
uses: actions-rs/toolchain@v1
with:
toolchain: 1.46.0-x86_64-unknown-linux-gnu
profile: minimal
override: true
- name: Install cargo-hack
uses: actions-rs/cargo@v1
with:
command: install
args: cargo-hack
- name: tests
run: |
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=1.46 cargo ci-test-lower-msrv"
- name: Clear the cargo caches
run: |
cargo install cargo-cache --version 0.6.2 --no-default-features --features ci-autoclean
cargo-cache
coverage:
name: coverage
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install Rust (nightly)
uses: actions-rs/toolchain@v1
with:
toolchain: stable-x86_64-unknown-linux-gnu
profile: minimal
override: true
- name: Generate Cargo.lock
uses: actions-rs/cargo@v1
with: { command: generate-lockfile }
- name: Cache Dependencies
uses: Swatinem/rust-cache@v1.3.0
- name: Generate coverage file
if: github.ref == 'refs/heads/master'
run: |
cargo install cargo-tarpaulin
cargo tarpaulin --out Xml --verbose
- name: Upload to Codecov
if: github.ref == 'refs/heads/master'
uses: codecov/codecov-action@v1
with: { file: cobertura.xml }
rustdoc:
name: rustdoc

View File

@ -178,7 +178,7 @@ impl<T, U> Framed<T, U> {
U: Decoder,
{
loop {
let mut this = self.as_mut().project();
let this = self.as_mut().project();
// Repeatedly call `decode` or `decode_eof` as long as it is "readable". Readable is
// defined as not having returned `None`. If the upstream has returned EOF, and the
// decoder is no longer readable, it can be assumed that the decoder will never become
@ -186,7 +186,7 @@ impl<T, U> Framed<T, U> {
if this.flags.contains(Flags::READABLE) {
if this.flags.contains(Flags::EOF) {
match this.codec.decode_eof(&mut this.read_buf) {
match this.codec.decode_eof(this.read_buf) {
Ok(Some(frame)) => return Poll::Ready(Some(Ok(frame))),
Ok(None) => return Poll::Ready(None),
Err(e) => return Poll::Ready(Some(Err(e))),
@ -195,7 +195,7 @@ impl<T, U> Framed<T, U> {
log::trace!("attempting to decode a frame");
match this.codec.decode(&mut this.read_buf) {
match this.codec.decode(this.read_buf) {
Ok(Some(frame)) => {
log::trace!("frame decoded from buffer");
return Poll::Ready(Some(Ok(frame)));

View File

@ -3,6 +3,12 @@
## Unreleased - 2021-xx-xx
## 0.2.3 - 2021-10-19
* Fix test macro in presence of other imports named "test". [#399]
[#399]: https://github.com/actix/actix-net/pull/399
## 0.2.2 - 2021-10-14
* Improve error recovery potential when macro input is invalid. [#391]
* Allow custom `System`s on test macro. [#391]

View File

@ -1,9 +1,10 @@
[package]
name = "actix-macros"
version = "0.2.2"
version = "0.2.3"
authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"Ibraheem Ahmed <ibrah1440@gmail.com>",
"Rob Ede <robjtede@icloud.com>",
]
description = "Macros for Actix system and runtime"
repository = "https://github.com/actix/actix-net.git"
@ -22,4 +23,5 @@ syn = { version = "^1", features = ["full"] }
actix-rt = "2.0.0"
futures-util = { version = "0.3.7", default-features = false }
rustversion = "1"
trybuild = "1"

View File

@ -139,9 +139,9 @@ pub fn test(args: TokenStream, item: TokenStream) -> TokenStream {
sig.asyncness = None;
let missing_test_attr = if has_test_attr {
quote!()
quote! {}
} else {
quote!(#[test])
quote! { #[::core::prelude::v1::test] }
};
let mut system = syn::parse_str::<syn::Path>("::actix_rt::System").unwrap();
@ -198,5 +198,5 @@ pub fn test(args: TokenStream, item: TokenStream) -> TokenStream {
fn input_and_compile_error(mut item: TokenStream, err: syn::Error) -> TokenStream {
let compile_err = TokenStream::from(err.to_compile_error());
item.extend(compile_err);
return item;
item
}

View File

@ -1,3 +1,4 @@
#[rustversion::stable(1.46)] // MSRV
#[test]
fn compile_macros() {
let t = trybuild::TestCases::new();

View File

@ -23,7 +23,7 @@ macros = ["actix-macros"]
io-uring = ["tokio-uring"]
[dependencies]
actix-macros = { version = "0.2.0", optional = true }
actix-macros = { version = "0.2.3", optional = true }
futures-core = { version = "0.3", default-features = false }
tokio = { version = "1.5.1", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] }

View File

@ -15,7 +15,7 @@
//! blocking task thread-pool using [`task::spawn_blocking`].
//!
//! # Examples
//! ```
//! ```no_run
//! use std::sync::mpsc;
//! use actix_rt::{Arbiter, System};
//!

View File

@ -11,7 +11,7 @@ use std::{
use futures_core::ready;
use tokio::sync::{mpsc, oneshot};
use crate::{arbiter::ArbiterHandle, runtime::default_tokio_runtime, Arbiter, Runtime};
use crate::{arbiter::ArbiterHandle, Arbiter};
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
@ -29,6 +29,7 @@ pub struct System {
arbiter_handle: ArbiterHandle,
}
#[cfg(not(feature = "io-uring"))]
impl System {
/// Create a new system.
///
@ -37,7 +38,7 @@ impl System {
#[allow(clippy::new_ret_no_self)]
pub fn new() -> SystemRunner {
Self::with_tokio_rt(|| {
default_tokio_runtime()
crate::runtime::default_tokio_runtime()
.expect("Default Actix (Tokio) runtime could not be created.")
})
}
@ -53,7 +54,7 @@ impl System {
let (stop_tx, stop_rx) = oneshot::channel();
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
let rt = Runtime::from(runtime_factory());
let rt = crate::runtime::Runtime::from(runtime_factory());
let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() });
let system = System::construct(sys_tx, sys_arbiter.clone());
@ -72,7 +73,32 @@ impl System {
system,
}
}
}
#[cfg(feature = "io-uring")]
impl System {
/// Create a new system.
///
/// # Panics
/// Panics if underlying Tokio runtime can not be created.
#[allow(clippy::new_ret_no_self)]
pub fn new() -> SystemRunner {
SystemRunner
}
/// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure.
///
/// [tokio-runtime]: tokio::runtime::Runtime
#[doc(hidden)]
pub fn with_tokio_rt<F>(_: F) -> SystemRunner
where
F: Fn() -> tokio::runtime::Runtime,
{
unimplemented!("System::with_tokio_rt is not implemented yet")
}
}
impl System {
/// Constructs new system and registers it on the current thread.
pub(crate) fn construct(
sys_tx: mpsc::UnboundedSender<SystemCommand>,
@ -149,15 +175,18 @@ impl System {
}
}
#[cfg(not(feature = "io-uring"))]
/// Runner that keeps a [System]'s event loop alive until stop message is received.
#[must_use = "A SystemRunner does nothing unless `run` is called."]
#[derive(Debug)]
pub struct SystemRunner {
rt: Runtime,
rt: crate::runtime::Runtime,
stop_rx: oneshot::Receiver<i32>,
#[allow(dead_code)]
system: System,
}
#[cfg(not(feature = "io-uring"))]
impl SystemRunner {
/// Starts event loop and will return once [System] is [stopped](System::stop).
pub fn run(self) -> io::Result<()> {
@ -187,6 +216,45 @@ impl SystemRunner {
}
}
#[cfg(feature = "io-uring")]
/// Runner that keeps a [System]'s event loop alive until stop message is received.
#[must_use = "A SystemRunner does nothing unless `run` is called."]
#[derive(Debug)]
pub struct SystemRunner;
#[cfg(feature = "io-uring")]
impl SystemRunner {
/// Starts event loop and will return once [System] is [stopped](System::stop).
pub fn run(self) -> io::Result<()> {
unimplemented!("SystemRunner::run is not implemented yet")
}
/// Runs the provided future, blocking the current thread until the future completes.
#[inline]
pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
tokio_uring::start(async move {
let (stop_tx, stop_rx) = oneshot::channel();
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
let sys_arbiter = Arbiter::in_new_system();
let system = System::construct(sys_tx, sys_arbiter.clone());
system
.tx()
.send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter))
.unwrap();
// init background system arbiter
let sys_ctrl = SystemController::new(sys_rx, stop_tx);
tokio_uring::spawn(sys_ctrl);
let res = fut.await;
drop(stop_rx);
res
})
}
}
#[derive(Debug)]
pub(crate) enum SystemCommand {
Exit(i32),

View File

@ -0,0 +1,17 @@
//! Checks that test macro does not cause problems in the presence of imports named "test" that
//! could be either a module with test items or the "test with runtime" macro itself.
//!
//! Before actix/actix-net#399 was implemented, this macro was running twice. The first run output
//! `#[test]` and it got run again and since it was in scope.
//!
//! Prevented by using the fully-qualified test marker (`#[::core::prelude::v1::test]`).
#![cfg(feature = "macros")]
use actix_rt::time as test;
#[actix_rt::test]
async fn test_naming_conflict() {
use test as time;
time::sleep(std::time::Duration::from_millis(2)).await;
}

View File

@ -1,12 +1,15 @@
use std::{
future::Future,
sync::mpsc::channel,
thread,
time::{Duration, Instant},
};
use actix_rt::{task::JoinError, Arbiter, System};
use tokio::sync::oneshot;
#[cfg(not(feature = "io-uring"))]
use {
std::{sync::mpsc::channel, thread},
tokio::sync::oneshot,
};
#[test]
fn await_for_timer() {
@ -103,6 +106,10 @@ fn wait_for_spawns() {
assert!(rt.block_on(handle).is_err());
}
// Temporary disabled tests for io-uring feature.
// They should be enabled when possible.
#[cfg(not(feature = "io-uring"))]
#[test]
fn arbiter_spawn_fn_runs() {
let _ = System::new();
@ -119,6 +126,7 @@ fn arbiter_spawn_fn_runs() {
arbiter.join().unwrap();
}
#[cfg(not(feature = "io-uring"))]
#[test]
fn arbiter_handle_spawn_fn_runs() {
let sys = System::new();
@ -141,6 +149,7 @@ fn arbiter_handle_spawn_fn_runs() {
sys.run().unwrap();
}
#[cfg(not(feature = "io-uring"))]
#[test]
fn arbiter_drop_no_panic_fn() {
let _ = System::new();
@ -152,6 +161,7 @@ fn arbiter_drop_no_panic_fn() {
arbiter.join().unwrap();
}
#[cfg(not(feature = "io-uring"))]
#[test]
fn arbiter_drop_no_panic_fut() {
let _ = System::new();
@ -163,18 +173,7 @@ fn arbiter_drop_no_panic_fut() {
arbiter.join().unwrap();
}
#[test]
#[should_panic]
fn no_system_current_panic() {
System::current();
}
#[test]
#[should_panic]
fn no_system_arbiter_new_panic() {
Arbiter::new();
}
#[cfg(not(feature = "io-uring"))]
#[test]
fn system_arbiter_spawn() {
let runner = System::new();
@ -205,6 +204,7 @@ fn system_arbiter_spawn() {
thread.join().unwrap();
}
#[cfg(not(feature = "io-uring"))]
#[test]
fn system_stop_stops_arbiters() {
let sys = System::new();
@ -293,6 +293,18 @@ fn new_arbiter_with_tokio() {
assert!(!counter.load(Ordering::SeqCst));
}
#[test]
#[should_panic]
fn no_system_current_panic() {
System::current();
}
#[test]
#[should_panic]
fn no_system_arbiter_new_panic() {
Arbiter::new();
}
#[test]
fn try_current_no_system() {
assert!(System::try_current().is_none())
@ -330,28 +342,27 @@ fn spawn_local() {
#[cfg(all(target_os = "linux", feature = "io-uring"))]
#[test]
fn tokio_uring_arbiter() {
let system = System::new();
let (tx, rx) = std::sync::mpsc::channel();
System::new().block_on(async {
let (tx, rx) = std::sync::mpsc::channel();
Arbiter::new().spawn(async move {
let handle = actix_rt::spawn(async move {
let f = tokio_uring::fs::File::create("test.txt").await.unwrap();
let buf = b"Hello World!";
Arbiter::new().spawn(async move {
let handle = actix_rt::spawn(async move {
let f = tokio_uring::fs::File::create("test.txt").await.unwrap();
let buf = b"Hello World!";
let (res, _) = f.write_at(&buf[..], 0).await;
assert!(res.is_ok());
let (res, _) = f.write_at(&buf[..], 0).await;
assert!(res.is_ok());
f.sync_all().await.unwrap();
f.close().await.unwrap();
f.sync_all().await.unwrap();
f.close().await.unwrap();
std::fs::remove_file("test.txt").unwrap();
std::fs::remove_file("test.txt").unwrap();
});
handle.await.unwrap();
tx.send(true).unwrap();
});
handle.await.unwrap();
tx.send(true).unwrap();
});
assert!(rx.recv().unwrap());
drop(system);
assert!(rx.recv().unwrap());
})
}

View File

@ -1,6 +1,7 @@
# Changes
## Unreleased - 2021-xx-xx
* Minimum supported Rust version (MSRV) is now 1.52.
* Reduce the log noise. [#394]

View File

@ -32,7 +32,7 @@ num_cpus = "1.13"
tokio = { version = "1.5.1", features = ["sync"] }
[dev-dependencies]
actix-codec = "0.4.0-beta.1"
actix-codec = "0.4.0"
actix-rt = "2.0.0"
bytes = "1"

View File

@ -81,14 +81,9 @@ struct Accept {
}
/// Array of u128 with every bit as marker for a worker handle's availability.
#[derive(Debug, Default)]
struct Availability([u128; 4]);
impl Default for Availability {
fn default() -> Self {
Self([0; 4])
}
}
impl Availability {
/// Check if any worker handle is available
#[inline(always)]

View File

@ -181,6 +181,7 @@ impl WorkerHandleAccept {
/// Held by [ServerBuilder](crate::builder::ServerBuilder).
#[derive(Debug)]
pub(crate) struct WorkerHandleServer {
#[allow(dead_code)]
idx: usize,
tx: UnboundedSender<Stop>,
}

View File

@ -5,8 +5,6 @@ use std::{net, thread, time::Duration};
use actix_rt::{net::TcpStream, time::sleep};
use actix_server::Server;
use actix_service::fn_service;
use actix_utils::future::ok;
use futures_util::future::lazy;
fn unused_addr() -> net::SocketAddr {
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
@ -23,25 +21,28 @@ fn test_bind() {
let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || {
let sys = actix_rt::System::new();
let srv = sys.block_on(lazy(|_| {
Server::build()
actix_rt::System::new().block_on(async {
let srv = Server::build()
.workers(1)
.disable_signals()
.bind("test", addr, move || fn_service(|_| ok::<_, ()>(())))
.unwrap()
.run()
}));
.bind("test", addr, move || {
fn_service(|_| async { Ok::<_, ()>(()) })
})?
.run();
let _ = tx.send((srv, actix_rt::System::current()));
let _ = sys.run();
let _ = tx.send((srv.clone(), actix_rt::System::current()));
srv.await
})
});
let (_, sys) = rx.recv().unwrap();
let (srv, sys) = rx.recv().unwrap();
thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok());
let _ = srv.stop(true);
sys.stop();
let _ = h.join();
h.join().unwrap().unwrap();
}
#[test]
@ -50,25 +51,30 @@ fn test_listen() {
let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || {
let sys = actix_rt::System::new();
let lst = net::TcpListener::bind(addr).unwrap();
sys.block_on(async {
Server::build()
let lst = net::TcpListener::bind(addr)?;
actix_rt::System::new().block_on(async {
let srv = Server::build()
.disable_signals()
.workers(1)
.listen("test", lst, move || fn_service(|_| ok::<_, ()>(())))
.unwrap()
.listen("test", lst, move || {
fn_service(|_| async { Ok::<_, ()>(()) })
})?
.run();
let _ = tx.send(actix_rt::System::current());
});
let _ = sys.run();
let _ = tx.send((srv.clone(), actix_rt::System::current()));
srv.await
})
});
let sys = rx.recv().unwrap();
let (srv, sys) = rx.recv().unwrap();
thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok());
let _ = srv.stop(true);
sys.stop();
let _ = h.join();
h.join().unwrap().unwrap();
}
#[test]
@ -84,9 +90,8 @@ fn test_start() {
let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || {
let sys = actix_rt::System::new();
let srv = sys.block_on(lazy(|_| {
Server::build()
actix_rt::System::new().block_on(async {
let srv = Server::build()
.backlog(100)
.disable_signals()
.bind("test", addr, move || {
@ -95,13 +100,13 @@ fn test_start() {
f.send(Bytes::from_static(b"test")).await.unwrap();
Ok::<_, ()>(())
})
})
.unwrap()
.run()
}));
})?
.run();
let _ = tx.send((srv, actix_rt::System::current()));
let _ = sys.run();
let _ = tx.send((srv.clone(), actix_rt::System::current()));
srv.await
})
});
let (srv, sys) = rx.recv().unwrap();
@ -134,12 +139,11 @@ fn test_start() {
// stop
let _ = srv.stop(false);
thread::sleep(Duration::from_millis(100));
assert!(net::TcpStream::connect(addr).is_err());
thread::sleep(Duration::from_millis(100));
sys.stop();
let _ = h.join();
h.join().unwrap().unwrap();
thread::sleep(Duration::from_secs(1));
assert!(net::TcpStream::connect(addr).is_err());
}
#[actix_rt::test]
@ -209,9 +213,8 @@ async fn test_max_concurrent_connections() {
}
srv.stop(false).await;
sys.stop();
let _ = h.join().unwrap();
h.join().unwrap().unwrap();
}
#[actix_rt::test]
@ -266,16 +269,14 @@ async fn test_service_restart() {
let num = num.clone();
async move { Ok::<_, ()>(TestService(num)) }
})
})
.unwrap()
})?
.bind("addr2", addr2, move || {
let num2 = num2.clone();
fn_factory(move || {
let num2 = num2.clone();
async move { Ok::<_, ()>(TestService(num2)) }
})
})
.unwrap()
})?
.workers(1)
.run();
@ -306,9 +307,9 @@ async fn test_service_restart() {
assert!(num_clone.load(Ordering::SeqCst) > 5);
assert!(num2_clone.load(Ordering::SeqCst) > 5);
sys.stop();
let _ = server.stop(false);
let _ = h.join().unwrap();
sys.stop();
h.join().unwrap().unwrap();
}
#[ignore]
@ -380,12 +381,12 @@ async fn worker_restart() {
actix_rt::System::new().block_on(async {
let server = Server::build()
.disable_signals()
.bind("addr", addr, move || TestServiceFactory(counter.clone()))
.unwrap()
.bind("addr", addr, move || TestServiceFactory(counter.clone()))?
.workers(2)
.run();
let _ = tx.send((server.clone(), actix_rt::System::current()));
server.await
})
});
@ -447,7 +448,7 @@ async fn worker_restart() {
assert_eq!("3", id);
stream.shutdown().await.unwrap();
sys.stop();
let _ = server.stop(false);
let _ = h.join().unwrap();
sys.stop();
h.join().unwrap().unwrap();
}

View File

@ -0,0 +1,33 @@
use std::{future::Future, sync::mpsc, time::Duration};
async fn oracle<F, Fut>(f: F) -> (u32, u32)
where
F: FnOnce() -> Fut + Clone + Send + 'static,
Fut: Future<Output = u32> + 'static,
{
let f1 = actix_rt::spawn(f.clone()());
let f2 = actix_rt::spawn(f());
(f1.await.unwrap(), f2.await.unwrap())
}
#[actix_rt::main]
async fn main() {
let (tx, rx) = mpsc::channel();
let (r1, r2) = oracle({
let tx = tx.clone();
|| async move {
tx.send(()).unwrap();
4 * 4
}
})
.await;
assert_eq!(r1, r2);
tx.send(()).unwrap();
rx.recv_timeout(Duration::from_millis(100)).unwrap();
rx.recv_timeout(Duration::from_millis(100)).unwrap();
}

View File

@ -52,9 +52,9 @@ pub fn fn_factory<F, Cfg, Srv, Req, Fut, Err>(
f: F,
) -> FnServiceNoConfig<F, Cfg, Srv, Req, Fut, Err>
where
Srv: Service<Req>,
F: Fn() -> Fut,
Fut: Future<Output = Result<Srv, Err>>,
Srv: Service<Req>,
{
FnServiceNoConfig::new(f)
}

View File

@ -3,14 +3,29 @@
## Unreleased - 2021-xx-xx
## 3.0.0-beta.7 - 2021-10-20
* Add `webpki_roots_cert_store()` to get rustls compatible webpki roots cert store. [#401]
* Alias `connect::ssl` to `connect::tls`. [#401]
[#401]: https://github.com/actix/actix-net/pull/401
## 3.0.0-beta.6 - 2021-10-19
* Update `tokio-rustls` to `0.23` which uses `rustls` `0.20`. [#396]
* Removed a re-export of `Session` from `rustls` as it no longer exist. [#396]
* Minimum supported Rust version (MSRV) is now 1.52.
[#396]: https://github.com/actix/actix-net/pull/396
## 3.0.0-beta.5 - 2021-03-29
* Changed `connect::ssl::rustls::RustlsConnectorService` to return error when `DNSNameRef`
* Changed `connect::ssl::rustls::RustlsConnectorService` to return error when `DNSNameRef`
generation failed instead of panic. [#296]
* Remove `connect::ssl::openssl::OpensslConnectServiceFactory`. [#297]
* Remove `connect::ssl::openssl::OpensslConnectService`. [#297]
* Add `connect::ssl::native_tls` module for native tls support. [#295]
* Rename `accept::{nativetls => native_tls}`. [#295]
* Remove `connect::TcpConnectService` type. service caller expect a `TcpStream` should use
* Remove `connect::TcpConnectService` type. service caller expect a `TcpStream` should use
`connect::ConnectService` instead and call `Connection<T, TcpStream>::into_parts`. [#299]
[#295]: https://github.com/actix/actix-net/pull/295

View File

@ -1,12 +1,10 @@
[package]
name = "actix-tls"
version = "3.0.0-beta.5"
version = "3.0.0-beta.7"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "TLS acceptor and connector services for Actix ecosystem"
keywords = ["network", "tls", "ssl", "async", "transport"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-tls"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
edition = "2018"
@ -40,7 +38,7 @@ native-tls = ["tokio-native-tls"]
uri = ["http"]
[dependencies]
actix-codec = "0.4.0-beta.1"
actix-codec = "0.4.0"
actix-rt = { version = "2.2.0", default-features = false }
actix-service = "2.0.0"
actix-utils = "3.0.0"
@ -56,8 +54,8 @@ tls-openssl = { package = "openssl", version = "0.10.9", optional = true }
tokio-openssl = { version = "0.6", optional = true }
# rustls
tokio-rustls = { version = "0.22", optional = true }
webpki-roots = { version = "0.21", optional = true }
tokio-rustls = { version = "0.23", optional = true }
webpki-roots = { version = "0.22", optional = true }
# native-tls
tokio-native-tls = { version = "0.3", optional = true }
@ -66,9 +64,10 @@ tokio-native-tls = { version = "0.3", optional = true }
actix-rt = "2.2.0"
actix-server = "2.0.0-beta.6"
bytes = "1"
env_logger = "0.8"
env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
log = "0.4"
rustls-pemfile = "0.2.1"
trust-dns-resolver = "0.20.0"
[[example]]

View File

@ -35,25 +35,29 @@ use actix_service::ServiceFactoryExt as _;
use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream};
use futures_util::future::ok;
use log::info;
use rustls::{
internal::pemfile::certs, internal::pemfile::rsa_private_keys, NoClientAuth, ServerConfig,
};
use rustls::{server::ServerConfig, Certificate, PrivateKey};
use rustls_pemfile::{certs, rsa_private_keys};
#[actix_rt::main]
async fn main() -> io::Result<()> {
env::set_var("RUST_LOG", "info");
env_logger::init();
let mut tls_config = ServerConfig::new(NoClientAuth::new());
// Load TLS key and cert files
let cert_file = &mut BufReader::new(File::open("./examples/cert.pem").unwrap());
let key_file = &mut BufReader::new(File::open("./examples/key.pem").unwrap());
let cert_chain = certs(cert_file).unwrap();
let cert_chain = certs(cert_file)
.unwrap()
.into_iter()
.map(Certificate)
.collect();
let mut keys = rsa_private_keys(key_file).unwrap();
tls_config
.set_single_cert(cert_chain, keys.remove(0))
let tls_config = ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_single_cert(cert_chain, PrivateKey(keys.remove(0)))
.unwrap();
let tls_acceptor = RustlsAcceptor::new(tls_config);

View File

@ -14,7 +14,7 @@ use actix_utils::counter::{Counter, CounterGuard};
use futures_core::future::LocalBoxFuture;
use tokio_rustls::{Accept, TlsAcceptor};
pub use tokio_rustls::rustls::{ServerConfig, Session};
pub use tokio_rustls::rustls::ServerConfig;
use super::MAX_CONN_COUNTER;

View File

@ -21,7 +21,9 @@ mod connector;
mod error;
mod resolve;
mod service;
pub mod ssl;
pub mod tls;
#[doc(hidden)]
pub use tls as ssl;
#[cfg(feature = "uri")]
mod uri;

View File

@ -1,4 +1,4 @@
//! SSL Services
//! TLS Services
#[cfg(feature = "openssl")]
pub mod openssl;

View File

@ -1,4 +1,5 @@
use std::{
convert::TryFrom,
future::Future,
io,
pin::Pin,
@ -6,7 +7,6 @@ use std::{
task::{Context, Poll},
};
pub use tokio_rustls::rustls::Session;
pub use tokio_rustls::{client::TlsStream, rustls::ClientConfig};
pub use webpki_roots::TLS_SERVER_ROOTS;
@ -14,11 +14,26 @@ use actix_rt::net::ActixStream;
use actix_service::{Service, ServiceFactory};
use futures_core::{future::LocalBoxFuture, ready};
use log::trace;
use tokio_rustls::webpki::DNSNameRef;
use tokio_rustls::rustls::{client::ServerName, OwnedTrustAnchor, RootCertStore};
use tokio_rustls::{Connect, TlsConnector};
use crate::connect::{Address, Connection};
/// Returns standard root certificates from `webpki-roots` crate as a rustls certificate store.
pub fn webpki_roots_cert_store() -> RootCertStore {
let mut root_certs = RootCertStore::empty();
for cert in TLS_SERVER_ROOTS.0 {
let cert = OwnedTrustAnchor::from_subject_spki_name_constraints(
cert.subject,
cert.spki,
cert.name_constraints,
);
let certs = vec![cert].into_iter();
root_certs.add_server_trust_anchors(certs);
}
root_certs
}
/// Rustls connector factory
pub struct RustlsConnector {
connector: Arc<ClientConfig>,
@ -89,7 +104,7 @@ where
trace!("SSL Handshake start for: {:?}", connection.host());
let (stream, connection) = connection.replace_io(());
match DNSNameRef::try_from_ascii_str(connection.host()) {
match ServerName::try_from(connection.host()) {
Ok(host) => RustlsConnectorServiceFuture::Future {
connect: TlsConnector::from(self.connector.clone()).connect(host, stream),
connection: Some(connection),

1
clippy.toml Normal file
View File

@ -0,0 +1 @@
msrv = "1.48"