merge master into mio-0.7.3

This commit is contained in:
fakeshadow 2020-12-30 11:32:20 +08:00
commit 358f81f34e
39 changed files with 467 additions and 715 deletions

View File

@ -1,12 +1,10 @@
[workspace] [workspace]
members = [ members = [
"actix-codec", "actix-codec",
"actix-connect",
"actix-rt", "actix-rt",
"actix-macros", "actix-macros",
"actix-service", "actix-service",
"actix-server", "actix-server",
"actix-testing",
"actix-threadpool", "actix-threadpool",
"actix-tls", "actix-tls",
"actix-tracing", "actix-tracing",
@ -16,21 +14,14 @@ members = [
] ]
[patch.crates-io] [patch.crates-io]
actix-codec = { git = "https://github.com/actix/actix-net.git", rev = "ba44ea7d0bafaf5fccb9a34003d503e1910943ee" } actix-codec = { path = "actix-codec" }
actix-connect = { path = "actix-connect" } actix-rt = { path = "actix-rt" }
actix-rt = { git = "https://github.com/actix/actix-net.git", rev = "ba44ea7d0bafaf5fccb9a34003d503e1910943ee" }
actix-macros = { path = "actix-macros" } actix-macros = { path = "actix-macros" }
actix-server = { path = "actix-server" } actix-server = { path = "actix-server" }
actix-service = { git = "https://github.com/actix/actix-net.git", rev = "ba44ea7d0bafaf5fccb9a34003d503e1910943ee" } actix-service = { path = "actix-service" }
actix-testing = { path = "actix-testing" }
actix-threadpool = { path = "actix-threadpool" } actix-threadpool = { path = "actix-threadpool" }
actix-tls = { path = "actix-tls" } actix-tls = { path = "actix-tls" }
actix-tracing = { path = "actix-tracing" } actix-tracing = { path = "actix-tracing" }
actix-utils = { git = "https://github.com/actix/actix-net.git", rev = "ba44ea7d0bafaf5fccb9a34003d503e1910943ee" } actix-utils = { path = "actix-utils" }
actix-router = { path = "router" } actix-router = { path = "router" }
bytestring = { path = "string" } bytestring = { path = "string" }
#FIXME: remove override
#http = { git = "https://github.com/fakeshadow/http.git" }
trust-dns-proto = { git = "https://github.com/messense/trust-dns.git", branch = "tokio-1" }
trust-dns-resolver = { git = "https://github.com/messense/trust-dns.git", branch = "tokio-1" }

View File

@ -1,156 +0,0 @@
# Changes
## Unreleased - 2020-xx-xx
* Update `tokio-openssl` to `0.6.0`
* Update `tokio-rustls` to `0.22`
* Update `rustls` to `0.19.0`
## 2.0.0 - 2020-09-02
- No significant changes from `2.0.0-alpha.4`.
## 2.0.0-alpha.4 - 2020-08-17
### Changed
* Update `rustls` dependency to 0.18
* Update `tokio-rustls` dependency to 0.14
## [2.0.0-alpha.3] - 2020-05-08
### Fixed
* Corrected spelling of `ConnectError::Unresolverd` to `ConnectError::Unresolved`
## [2.0.0-alpha.2] - 2020-03-08
### Changed
* Update `trust-dns-proto` dependency to 0.19. [#116]
* Update `trust-dns-resolver` dependency to 0.19. [#116]
* `Address` trait is now required to have static lifetime. [#116]
* `start_resolver` and `start_default_resolver` are now `async` and may return a `ConnectError`. [#116]
[#116]: https://github.com/actix/actix-net/pull/116
## [2.0.0-alpha.1] - 2020-03-03
### Changed
* Update `rustls` dependency to 0.17
* Update `tokio-rustls` dependency to 0.13
## [1.0.2] - 2020-01-15
* Fix actix-service 1.0.3 compatibility
## [1.0.1] - 2019-12-15
* Fix trust-dns-resolver compilation
## [1.0.0] - 2019-12-11
* Release
## [1.0.0-alpha.3] - 2019-12-07
### Changed
* Migrate to tokio 0.2
## [1.0.0-alpha.2] - 2019-12-02
### Changed
* Migrated to `std::future`
## [0.3.0] - 2019-10-03
### Changed
* Update `rustls` to 0.16
* Minimum required Rust version upped to 1.37.0
## [0.2.5] - 2019-09-05
* Add `TcpConnectService`
## [0.2.4] - 2019-09-02
* Use arbiter's storage for default async resolver
## [0.2.3] - 2019-08-05
* Add `ConnectService` and `OpensslConnectService`
## [0.2.2] - 2019-07-24
* Add `rustls` support
## [0.2.1] - 2019-07-17
### Added
* Expose Connect addrs #30
### Changed
* Update `derive_more` to 0.15
## [0.2.0] - 2019-05-12
### Changed
* Upgrade to actix-service 0.4
## [0.1.5] - 2019-04-19
### Added
* `Connect::set_addr()`
### Changed
* Use trust-dns-resolver 0.11.0
## [0.1.4] - 2019-04-12
### Changed
* Do not start default resolver immediately for default connector.
## [0.1.3] - 2019-04-11
### Changed
* Start trust-dns default resolver on first use
## [0.1.2] - 2019-04-04
### Added
* Log error if dns system config could not be loaded.
### Changed
* Rename connect Connector to TcpConnector #10
## [0.1.1] - 2019-03-15
### Fixed
* Fix error handling for single address
## [0.1.0] - 2019-03-14
* Refactor resolver and connector services
* Rename crate

View File

@ -1,61 +0,0 @@
[package]
name = "actix-connect"
version = "2.0.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "TCP connector service for Actix ecosystem."
keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-connect/"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
edition = "2018"
[package.metadata.docs.rs]
features = ["openssl", "rustls", "uri"]
[lib]
name = "actix_connect"
path = "src/lib.rs"
[features]
default = ["uri"]
# openssl
openssl = ["open-ssl", "tokio-openssl"]
# rustls
rustls = ["rust-tls", "tokio-rustls", "webpki"]
# support http::Uri as connect address
uri = ["http"]
[dependencies]
actix-service = "1.0.6"
actix-codec = "0.3.0"
actix-utils = "2.0.0"
actix-rt = "1.1.1"
derive_more = "0.99.2"
either = "1.5.3"
futures-util = { version = "0.3.7", default-features = false }
# FIXME: update to 0.3
http = { version = "0.2.2", optional = true }
log = "0.4"
# FIXME: Use release version
trust-dns-proto = "0.20.0-alpha.3"
trust-dns-resolver = "0.20.0-alpha.3"
# openssl
open-ssl = { package = "openssl", version = "0.10", optional = true }
tokio-openssl = { version = "0.6.0", optional = true }
# rustls
rust-tls = { package = "rustls", version = "0.19.0", optional = true }
tokio-rustls = { version = "0.22.0", optional = true }
webpki = { version = "0.21", optional = true }
[dev-dependencies]
bytes = "1"
actix-testing = "1.0.0"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }

View File

@ -1 +0,0 @@
../LICENSE-APACHE

View File

@ -1 +0,0 @@
../LICENSE-MIT

View File

@ -1,6 +1,7 @@
# Changes # Changes
## Unreleased - 2020-xx-xx ## Unreleased - 2020-xx-xx
<<<<<<< HEAD
* Update `mio` dependency to 0.7.3. * Update `mio` dependency to 0.7.3.
* Remove `socket2` dependency. * Remove `socket2` dependency.
* `ServerBuilder::backlog` would accept `u32` instead of `i32`. * `ServerBuilder::backlog` would accept `u32` instead of `i32`.
@ -8,11 +9,24 @@
* Remove `AcceptNotify` type and pass `WakerQueue` to `Worker` for wake up the `Accept`'s `Poll`. * Remove `AcceptNotify` type and pass `WakerQueue` to `Worker` for wake up the `Accept`'s `Poll`.
* Convert `mio::net::TcpStream` to `actix_rt::net::TcpStream`(`UnixStream` for uds) using `FromRawFd` and `IntoRawFd`(`FromRawSocket` and `IntoRawSocket` on windows). * Convert `mio::net::TcpStream` to `actix_rt::net::TcpStream`(`UnixStream` for uds) using `FromRawFd` and `IntoRawFd`(`FromRawSocket` and `IntoRawSocket` on windows).
* Remove `AsyncRead` and `AsyncWrite` trait bound for `socket::FromStream` trait. * Remove `AsyncRead` and `AsyncWrite` trait bound for `socket::FromStream` trait.
=======
* Merge `actix-testing` to `actix-server` as `test_server` mod.
## 2.0.0-beta.1 - 2020-12-28
>>>>>>> upstream/master
* Added explicit info log message on accept queue pause. [#215] * Added explicit info log message on accept queue pause. [#215]
* Prevent double registration of sockets when back-pressure is resolved. [#223] * Prevent double registration of sockets when back-pressure is resolved. [#223]
* Update `mio` dependency to `0.7.3`. [#239]
* Remove `socket2` dependency. [#239]
* `ServerBuilder::backlog` now accepts `u32` instead of `i32`. [#239]
* Remove `AcceptNotify` type and pass `WakerQueue` to `Worker` to wake up `Accept`'s `Poll`. [#239]
* Convert `mio::net::TcpStream` to `actix_rt::net::TcpStream`(`UnixStream` for uds) using
`FromRawFd` and `IntoRawFd`(`FromRawSocket` and `IntoRawSocket` on windows). [#239]
* Remove `AsyncRead` and `AsyncWrite` trait bound for `socket::FromStream` trait. [#239]
[#215]: https://github.com/actix/actix-net/pull/215 [#215]: https://github.com/actix/actix-net/pull/215
[#223]: https://github.com/actix/actix-net/pull/223 [#223]: https://github.com/actix/actix-net/pull/223
[#239]: https://github.com/actix/actix-net/pull/239
[#215]: https://github.com/actix/actix-net/pull/215 [#215]: https://github.com/actix/actix-net/pull/215

View File

@ -1,7 +1,10 @@
[package] [package]
name = "actix-server" name = "actix-server"
version = "1.0.4" version = "2.0.0-beta.1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"fakeshadow <24548779@qq.com>",
]
description = "General purpose TCP server built for the Actix ecosystem" description = "General purpose TCP server built for the Actix ecosystem"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs" homepage = "https://actix.rs"
@ -20,22 +23,20 @@ path = "src/lib.rs"
default = [] default = []
[dependencies] [dependencies]
actix-service = "1.0.6" actix-codec = "0.4.0-beta.1"
actix-rt = "1.1.1" actix-rt = "2.0.0-beta.1"
actix-codec = "0.3.0" actix-service = "2.0.0-beta.1"
actix-utils = "2.0.0" actix-utils = "3.0.0-beta.1"
concurrent-queue = "1.2.2"
futures-core = { version = "0.3.7", default-features = false } futures-core = { version = "0.3.7", default-features = false }
log = "0.4" log = "0.4"
mio = { version = "0.7.3", features = [ "os-poll", "tcp", "uds"] } mio = { version = "0.7.6", features = ["os-poll", "net"] }
num_cpus = "1.13" num_cpus = "1.13"
slab = "0.4" slab = "0.4"
tokio = { version = "1", features = ["sync"] } tokio = { version = "1", features = ["sync"] }
[dev-dependencies] [dev-dependencies]
actix-testing = "1.0.0"
bytes = "1" bytes = "1"
env_logger = "0.7" env_logger = "0.8"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
tokio = { version = "1", features = ["io-util"] } tokio = { version = "1", features = ["io-util"] }

View File

@ -9,7 +9,7 @@ use slab::Slab;
use crate::server::Server; use crate::server::Server;
use crate::socket::{MioListener, SocketAddr}; use crate::socket::{MioListener, SocketAddr};
use crate::waker_queue::{WakerInterest, WakerQueue, WakerQueueError, WAKER_TOKEN}; use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
use crate::worker::{Conn, WorkerHandle}; use crate::worker::{Conn, WorkerHandle};
use crate::Token; use crate::Token;
@ -166,26 +166,36 @@ impl Accept {
for event in events.iter() { for event in events.iter() {
let token = event.token(); let token = event.token();
match token { match token {
// This is a loop because interests for command from previous version was a // This is a loop because interests for command from previous version was
// loop that would try to drain the command channel. It's yet unknown if it's // a loop that would try to drain the command channel. It's yet unknown
// necessary/good practice to actively drain the waker queue. // if it's necessary/good practice to actively drain the waker queue.
WAKER_TOKEN => 'waker: loop { WAKER_TOKEN => 'waker: loop {
match self.waker.pop() { // take guard with every iteration so no new interest can be added
// worker notify it becomes available. we may want to recover from // until the current task is done.
// backpressure. let mut guard = self.waker.guard();
Ok(WakerInterest::WorkerAvailable) => { match guard.pop_front() {
// worker notify it becomes available. we may want to recover
// from backpressure.
Some(WakerInterest::WorkerAvailable) => {
drop(guard);
self.maybe_backpressure(&mut sockets, false); self.maybe_backpressure(&mut sockets, false);
} }
// a new worker thread is made and it's handle would be added to Accept // a new worker thread is made and it's handle would be added
Ok(WakerInterest::Worker(handle)) => { // to Accept
Some(WakerInterest::Worker(handle)) => {
drop(guard);
// maybe we want to recover from a backpressure. // maybe we want to recover from a backpressure.
self.maybe_backpressure(&mut sockets, false); self.maybe_backpressure(&mut sockets, false);
self.handles.push(handle); self.handles.push(handle);
} }
// got timer interest and it's time to try register socket(s) again. // got timer interest and it's time to try register socket(s)
Ok(WakerInterest::Timer) => self.process_timer(&mut sockets), // again.
Err(WakerQueueError::Empty) => break 'waker, Some(WakerInterest::Timer) => {
Ok(WakerInterest::Pause) => { drop(guard);
self.process_timer(&mut sockets)
}
Some(WakerInterest::Pause) => {
drop(guard);
sockets.iter_mut().for_each(|(_, info)| { sockets.iter_mut().for_each(|(_, info)| {
match self.deregister(info) { match self.deregister(info) {
Ok(_) => info!( Ok(_) => info!(
@ -198,14 +208,22 @@ impl Accept {
} }
}); });
} }
Ok(WakerInterest::Resume) => { Some(WakerInterest::Resume) => {
drop(guard);
sockets.iter_mut().for_each(|(token, info)| { sockets.iter_mut().for_each(|(token, info)| {
self.register_logged(token, info); self.register_logged(token, info);
}); });
} }
Ok(WakerInterest::Stop) | Err(WakerQueueError::Closed) => { Some(WakerInterest::Stop) => {
return self.deregister_all(&mut sockets); return self.deregister_all(&mut sockets);
} }
// waker queue is drained.
None => {
// Reset the WakerQueue before break so it does not grow
// infinitely.
WakerQueue::reset(&mut guard);
break 'waker;
}
} }
}, },
_ => { _ => {
@ -278,6 +296,10 @@ impl Accept {
if !on { if !on {
self.backpressure = false; self.backpressure = false;
for (token, info) in sockets.iter_mut() { for (token, info) in sockets.iter_mut() {
if info.timeout.is_some() {
// socket will attempt to re-register itself when its timeout completes
continue;
}
self.register_logged(token, info); self.register_logged(token, info);
} }
} }

View File

@ -11,6 +11,7 @@ mod server;
mod service; mod service;
mod signals; mod signals;
mod socket; mod socket;
mod test_server;
mod waker_queue; mod waker_queue;
mod worker; mod worker;
@ -18,6 +19,7 @@ pub use self::builder::ServerBuilder;
pub use self::config::{ServiceConfig, ServiceRuntime}; pub use self::config::{ServiceConfig, ServiceRuntime};
pub use self::server::Server; pub use self::server::Server;
pub use self::service::ServiceFactory; pub use self::service::ServiceFactory;
pub use self::test_server::TestServer;
#[doc(hidden)] #[doc(hidden)]
pub use self::socket::FromStream; pub use self::socket::FromStream;
@ -125,3 +127,17 @@ impl<T> Future for JoinAll<T> {
} }
} }
} }
#[cfg(test)]
mod test {
use super::*;
#[actix_rt::test]
async fn test_join_all() {
let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))];
let mut res = join_all(futs).await.into_iter();
assert_eq!(Ok(1), res.next().unwrap());
assert_eq!(Err(3), res.next().unwrap());
assert_eq!(Ok(9), res.next().unwrap());
}
}

View File

@ -1,19 +1,9 @@
//! Various helpers for Actix applications to use during testing.
#![deny(rust_2018_idioms, nonstandard_style)]
#![allow(clippy::type_complexity, clippy::needless_doctest_main)]
#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
use std::sync::mpsc; use std::sync::mpsc;
use std::{net, thread}; use std::{net, thread};
use actix_rt::{net::TcpStream, System}; use actix_rt::{net::TcpStream, System};
use actix_server::{Server, ServerBuilder, ServiceFactory};
use socket2::{Domain, Protocol, Socket, Type};
#[cfg(not(test))] // Work around for rust-lang/rust#62127 use crate::{Server, ServerBuilder, ServiceFactory};
pub use actix_macros::test;
/// The `TestServer` type. /// The `TestServer` type.
/// ///
@ -24,7 +14,7 @@ pub use actix_macros::test;
/// ///
/// ```rust /// ```rust
/// use actix_service::fn_service; /// use actix_service::fn_service;
/// use actix_testing::TestServer; /// use actix_server::TestServer;
/// ///
/// #[actix_rt::main] /// #[actix_rt::main]
/// async fn main() { /// async fn main() {
@ -94,9 +84,8 @@ impl TestServer {
.workers(1) .workers(1)
.disable_signals() .disable_signals()
.start(); .start();
tx.send((System::current(), local_addr)).unwrap();
}); });
tx.send((System::current(), local_addr)).unwrap();
sys.run() sys.run()
}); });
@ -116,11 +105,10 @@ impl TestServer {
/// Get first available unused local address /// Get first available unused local address
pub fn unused_addr() -> net::SocketAddr { pub fn unused_addr() -> net::SocketAddr {
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap(); let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
let socket = let socket = mio::net::TcpSocket::new_v4().unwrap();
Socket::new(Domain::ipv4(), Type::stream(), Some(Protocol::tcp())).unwrap(); socket.bind(addr).unwrap();
socket.bind(&addr.into()).unwrap(); socket.set_reuseaddr(true).unwrap();
socket.set_reuse_address(true).unwrap(); let tcp = socket.listen(1024).unwrap();
let tcp = socket.into_tcp_listener();
tcp.local_addr().unwrap() tcp.local_addr().unwrap()
} }
} }

View File

@ -1,7 +1,9 @@
use std::ops::Deref; use std::{
use std::sync::Arc; collections::VecDeque,
ops::Deref,
sync::{Arc, Mutex, MutexGuard},
};
use concurrent_queue::{ConcurrentQueue, PopError};
use mio::{Registry, Token as MioToken, Waker}; use mio::{Registry, Token as MioToken, Waker};
use crate::worker::WorkerHandle; use crate::worker::WorkerHandle;
@ -11,7 +13,7 @@ pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX);
/// `mio::Waker` with a queue for waking up the `Accept`'s `Poll` and contains the `WakerInterest` /// `mio::Waker` with a queue for waking up the `Accept`'s `Poll` and contains the `WakerInterest`
/// the `Poll` would want to look into. /// the `Poll` would want to look into.
pub(crate) struct WakerQueue(Arc<(Waker, ConcurrentQueue<WakerInterest>)>); pub(crate) struct WakerQueue(Arc<(Waker, Mutex<VecDeque<WakerInterest>>)>);
impl Clone for WakerQueue { impl Clone for WakerQueue {
fn clone(&self) -> Self { fn clone(&self) -> Self {
@ -20,7 +22,7 @@ impl Clone for WakerQueue {
} }
impl Deref for WakerQueue { impl Deref for WakerQueue {
type Target = (Waker, ConcurrentQueue<WakerInterest>); type Target = (Waker, Mutex<VecDeque<WakerInterest>>);
fn deref(&self) -> &Self::Target { fn deref(&self) -> &Self::Target {
self.0.deref() self.0.deref()
@ -34,7 +36,7 @@ impl WakerQueue {
/// event's token for it to properly handle `WakerInterest`. /// event's token for it to properly handle `WakerInterest`.
pub(crate) fn new(registry: &Registry) -> std::io::Result<Self> { pub(crate) fn new(registry: &Registry) -> std::io::Result<Self> {
let waker = Waker::new(registry, WAKER_TOKEN)?; let waker = Waker::new(registry, WAKER_TOKEN)?;
let queue = ConcurrentQueue::unbounded(); let queue = Mutex::new(VecDeque::with_capacity(16));
Ok(Self(Arc::new((waker, queue)))) Ok(Self(Arc::new((waker, queue))))
} }
@ -44,17 +46,23 @@ impl WakerQueue {
let (waker, queue) = self.deref(); let (waker, queue) = self.deref();
queue queue
.push(interest) .lock()
.unwrap_or_else(|e| panic!("WakerQueue closed: {}", e)); .expect("Failed to lock WakerQueue")
.push_back(interest);
waker waker
.wake() .wake()
.unwrap_or_else(|e| panic!("can not wake up Accept Poll: {}", e)); .unwrap_or_else(|e| panic!("can not wake up Accept Poll: {}", e));
} }
/// pop an `WakerInterest` from the back of the queue. /// get a MutexGuard of the waker queue.
pub(crate) fn pop(&self) -> Result<WakerInterest, WakerQueueError> { pub(crate) fn guard(&self) -> MutexGuard<'_, VecDeque<WakerInterest>> {
self.deref().1.pop() self.deref().1.lock().expect("Failed to lock WakerQueue")
}
/// reset the waker queue so it does not grow infinitely.
pub(crate) fn reset(queue: &mut VecDeque<WakerInterest>) {
std::mem::swap(&mut VecDeque::<WakerInterest>::with_capacity(16), queue);
} }
} }
@ -74,10 +82,8 @@ pub(crate) enum WakerInterest {
/// connection `Accept` would deregister socket listener temporary and wake up the poll and /// connection `Accept` would deregister socket listener temporary and wake up the poll and
/// register them again after the delayed future resolve. /// register them again after the delayed future resolve.
Timer, Timer,
/// `WorkerNew` is an interest happen after a worker runs into faulted state(This is determined /// `Worker` is an interest happen after a worker runs into faulted state(This is determined
/// by if work can be sent to it successfully).`Accept` would be waked up and add the new /// by if work can be sent to it successfully).`Accept` would be waked up and add the new
/// `WorkerHandle`. /// `WorkerHandle`.
Worker(WorkerHandle), Worker(WorkerHandle),
} }
pub(crate) type WakerQueueError = PopError;

View File

@ -55,9 +55,9 @@ fn test_listen() {
.workers(1) .workers(1)
.listen("test", lst, move || fn_service(|_| ok::<_, ()>(()))) .listen("test", lst, move || fn_service(|_| ok::<_, ()>(())))
.unwrap() .unwrap()
.start() .start();
let _ = tx.send(actix_rt::System::current());
}); });
let _ = tx.send(actix_rt::System::current());
let _ = sys.run(); let _ = sys.run();
}); });
let sys = rx.recv().unwrap(); let sys = rx.recv().unwrap();

View File

@ -6,7 +6,7 @@ use super::{IntoServiceFactory, ServiceFactory};
/// ///
/// Note that this function consumes the receiving service factory and returns /// Note that this function consumes the receiving service factory and returns
/// a wrapped version of it. /// a wrapped version of it.
pub fn map_config<I, SF, S, Req, F, Cfg>(factory: I, f: F) -> MapConfig<SF, Req, F, Cfg> pub fn map_config<I, SF, Req, F, Cfg>(factory: I, f: F) -> MapConfig<SF, Req, F, Cfg>
where where
I: IntoServiceFactory<SF, Req>, I: IntoServiceFactory<SF, Req>,
SF: ServiceFactory<Req>, SF: ServiceFactory<Req>,

View File

@ -1,33 +0,0 @@
# Changes
## [1.0.1] - 2020-05-19
* Replace deprecated `net2` crate with `socket2`
* Remove unused `futures` dependency
## [1.0.0] - 2019-12-11
* Update actix-server to 1.0.0
## [1.0.0-alpha.3] - 2019-12-07
* Migrate to tokio 0.2
## [1.0.0-alpha.2] - 2019-12-02
* Re-export `test` attribute macros
## [0.3.0-alpha.1] - 2019-11-22
* Migrate to std::future
## [0.2.0] - 2019-10-14
* Upgrade actix-server and actix-server-config deps
## [0.1.0] - 2019-09-25
* Initial impl

View File

@ -1,27 +0,0 @@
[package]
name = "actix-testing"
version = "1.0.1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix testing utils"
keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-testing/"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
edition = "2018"
workspace = ".."
readme = "README.md"
[lib]
name = "actix_testing"
path = "src/lib.rs"
[dependencies]
actix-rt = "1.1.1"
actix-macros = "0.1.0"
actix-server = "1.0.0"
actix-service = "1.0.0"
log = "0.4"
socket2 = "0.3"

View File

@ -1 +0,0 @@
../LICENSE-APACHE

View File

@ -1 +0,0 @@
../LICENSE-MIT

View File

@ -1,9 +0,0 @@
# Actix test utilities [![crates.io](https://meritbadge.herokuapp.com/actix-testing)](https://crates.io/crates/actix-testint) [![Join the chat at https://gitter.im/actix/actix](https://badges.gitter.im/actix/actix.svg)](https://gitter.im/actix/actix?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
## Documentation & community resources
* [User Guide](https://actix.rs/docs/)
* [API Documentation](https://docs.rs/actix-testing/)
* [Chat on gitter](https://gitter.im/actix/actix)
* Cargo package: [actix-http-test](https://crates.io/crates/actix-testing)
* Minimum supported Rust version: 1.37 or later

View File

@ -1,10 +1,15 @@
# Changes # Changes
## Unreleased - 2020-xx-xx ## Unreleased - 2021-xx-xx
* move from `tokio-tls` to `tokio-native-tls` for native-tls feature.
* Update `tokio-openssl` to `0.6.0`
* Update `tokio-rustls` to `0.22.0` ## 3.0.0-beta.1 - 2020-12-29
* Update `rust-tls` to `0.19.0` * Move acceptors under `accept` module. [#238]
* Merge `actix-connect` crate under `connect` module. [#238]
* Add feature flags to enable acceptors and/or connectors individually. [#238]
[#238]: https://github.com/actix/actix-net/pull/238
## 2.0.0 - 2020-09-03 ## 2.0.0 - 2020-09-03
* `nativetls::NativeTlsAcceptor` is renamed to `nativetls::Acceptor`. * `nativetls::NativeTlsAcceptor` is renamed to `nativetls::Acceptor`.

View File

@ -1,9 +1,9 @@
[package] [package]
name = "actix-tls" name = "actix-tls"
version = "2.0.0" version = "3.0.0-beta.1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "TLS acceptor services for Actix ecosystem." description = "TLS acceptor and connector services for Actix ecosystem"
keywords = ["network", "framework", "async", "tls", "ssl"] keywords = ["network", "tls", "ssl", "async", "transport"]
homepage = "https://actix.rs" homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git" repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-tls/" documentation = "https://docs.rs/actix-tls/"
@ -12,7 +12,7 @@ license = "MIT OR Apache-2.0"
edition = "2018" edition = "2018"
[package.metadata.docs.rs] [package.metadata.docs.rs]
features = ["openssl", "rustls", "nativetls"] features = ["openssl", "rustls", "native-tls", "accept", "connect", "uri"]
[lib] [lib]
name = "actix_tls" name = "actix_tls"
@ -20,45 +20,64 @@ path = "src/lib.rs"
[[example]] [[example]]
name = "basic" name = "basic"
required-features = ["rustls"] required-features = ["accept", "rustls"]
[features] [features]
default = [] default = ["accept", "connect", "uri"]
# openssl # enable acceptor services
openssl = ["open-ssl", "tokio-openssl"] accept = []
# rustls # enable connector services
rustls = ["rust-tls", "webpki", "webpki-roots", "tokio-rustls"] connect = ["trust-dns-proto/tokio-runtime", "trust-dns-resolver/tokio-runtime", "trust-dns-resolver/system-config"]
# nativetls # use openssl impls
nativetls = ["native-tls", "tokio-native-tls"] openssl = ["tls-openssl", "tokio-openssl"]
# use rustls impls
rustls = ["tls-rustls", "webpki", "webpki-roots", "tokio-rustls"]
# use native-tls impls
native-tls = ["tls-native-tls", "tokio-native-tls"]
# support http::Uri as connect address
uri = ["http"]
[dependencies] [dependencies]
actix-service = "1.0.0" actix-codec = "0.4.0-beta.1"
actix-codec = "0.3.0" actix-rt = "2.0.0-beta.1"
actix-utils = "2.0.0" actix-service = "2.0.0-beta.1"
actix-utils = "3.0.0-beta.1"
derive_more = "0.99.5"
either = "1.6"
futures-util = { version = "0.3.7", default-features = false } futures-util = { version = "0.3.7", default-features = false }
http = { version = "0.2.2", optional = true }
log = "0.4"
# resolver
trust-dns-proto = { version = "0.20.0", default-features = false, optional = true }
trust-dns-resolver = { version = "0.20.0", default-features = false, optional = true }
# openssl # openssl
open-ssl = { package = "openssl", version = "0.10", optional = true } tls-openssl = { package = "openssl", version = "0.10", optional = true }
tokio-openssl = { version = "0.6.0", optional = true } tokio-openssl = { version = "0.6", optional = true }
# TODO: Reduce dependencies where tokio wrappers re-export base crate.
# rustls # rustls
rust-tls = { package = "rustls", version = "0.19.0", optional = true } tls-rustls = { package = "rustls", version = "0.19", optional = true }
tokio-rustls = { version = "0.22.0", optional = true } tokio-rustls = { version = "0.22", optional = true }
webpki = { version = "0.21", optional = true } webpki = { version = "0.21", optional = true }
webpki-roots = { version = "0.20", optional = true } webpki-roots = { version = "0.21", optional = true }
# native-tls # native-tls
native-tls = { version = "0.2", optional = true } tls-native-tls = { package = "native-tls", version = "0.2", optional = true }
tokio-native-tls = { version = "0.3.0", optional = true } tokio-native-tls = { version = "0.3", optional = true }
[dev-dependencies] [dev-dependencies]
actix-server = "2.0.0-beta.1"
bytes = "1" bytes = "1"
env_logger = "0.8"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
log = "0.4" log = "0.4"
env_logger = "0.7"
actix-testing = "1.0.0"
actix-server = "1"
actix-rt = "1"

View File

@ -15,6 +15,10 @@
//! http --verify=false https://127.0.0.1:8443 //! http --verify=false https://127.0.0.1:8443
//! ``` //! ```
// this rename only exists because of how we have organised the crate's feature flags
// it is not necessary for your actual code
extern crate tls_rustls as rustls;
use std::{ use std::{
env, env,
fs::File, fs::File,
@ -27,10 +31,10 @@ use std::{
use actix_server::Server; use actix_server::Server;
use actix_service::pipeline_factory; use actix_service::pipeline_factory;
use actix_tls::rustls::Acceptor as RustlsAcceptor; use actix_tls::accept::rustls::Acceptor as RustlsAcceptor;
use futures_util::future::ok; use futures_util::future::ok;
use log::info; use log::info;
use rust_tls::{ use rustls::{
internal::pemfile::certs, internal::pemfile::rsa_private_keys, NoClientAuth, ServerConfig, internal::pemfile::certs, internal::pemfile::rsa_private_keys, NoClientAuth, ServerConfig,
}; };

View File

@ -0,0 +1,42 @@
//! TLS acceptor services for Actix ecosystem.
//!
//! ## Crate Features
//! * `openssl` - TLS acceptor using the `openssl` crate.
//! * `rustls` - TLS acceptor using the `rustls` crate.
//! * `native-tls` - TLS acceptor using the `native-tls` crate.
use std::sync::atomic::{AtomicUsize, Ordering};
use actix_utils::counter::Counter;
#[cfg(feature = "openssl")]
pub mod openssl;
#[cfg(feature = "rustls")]
pub mod rustls;
#[cfg(feature = "native-tls")]
pub mod nativetls;
pub(crate) static MAX_CONN: AtomicUsize = AtomicUsize::new(256);
thread_local! {
static MAX_CONN_COUNTER: Counter = Counter::new(MAX_CONN.load(Ordering::Relaxed));
}
/// Sets the maximum per-worker concurrent TLS connection limit.
///
/// All listeners will stop accepting connections when this limit is reached.
/// It can be used to regulate the global TLS CPU usage.
///
/// By default, the connection limit is 256.
pub fn max_concurrent_tls_connect(num: usize) {
MAX_CONN.store(num, Ordering::Relaxed);
}
/// TLS error combined with service error.
#[derive(Debug)]
pub enum TlsError<E1, E2> {
Tls(E1),
Service(E2),
}

View File

@ -1,20 +1,18 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite}; use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::{Service, ServiceFactory}; use actix_service::{Service, ServiceFactory};
use actix_utils::counter::Counter; use actix_utils::counter::Counter;
use futures_util::future::{ready, Ready}; use futures_util::future::{ready, LocalBoxFuture, Ready};
pub use native_tls::Error; pub use native_tls::Error;
pub use tokio_native_tls::{TlsAcceptor, TlsStream}; pub use tokio_native_tls::{TlsAcceptor, TlsStream};
use crate::MAX_CONN_COUNTER; use super::MAX_CONN_COUNTER;
/// Accept TLS connections via `native-tls` package. /// Accept TLS connections via `native-tls` package.
/// ///
/// `nativetls` feature enables this `Acceptor` type. /// `native-tls` feature enables this `Acceptor` type.
pub struct Acceptor { pub struct Acceptor {
acceptor: TlsAcceptor, acceptor: TlsAcceptor,
} }
@ -36,11 +34,11 @@ impl Clone for Acceptor {
} }
} }
impl<Req> ServiceFactory<Req> for Acceptor impl<T> ServiceFactory<T> for Acceptor
where where
Req: AsyncRead + AsyncWrite + Unpin + 'static, T: AsyncRead + AsyncWrite + Unpin + 'static,
{ {
type Response = TlsStream<Req>; type Response = TlsStream<T>;
type Error = Error; type Error = Error;
type Config = (); type Config = ();
@ -72,15 +70,13 @@ impl Clone for NativeTlsAcceptorService {
} }
} }
type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>; impl<T> Service<T> for NativeTlsAcceptorService
impl<Req> Service<Req> for NativeTlsAcceptorService
where where
Req: AsyncRead + AsyncWrite + Unpin + 'static, T: AsyncRead + AsyncWrite + Unpin + 'static,
{ {
type Response = TlsStream<Req>; type Response = TlsStream<T>;
type Error = Error; type Error = Error;
type Future = LocalBoxFuture<'static, Result<TlsStream<Req>, Error>>; type Future = LocalBoxFuture<'static, Result<TlsStream<T>, Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.conns.available(cx) { if self.conns.available(cx) {
@ -90,13 +86,13 @@ where
} }
} }
fn call(&mut self, req: Req) -> Self::Future { fn call(&mut self, io: T) -> Self::Future {
let guard = self.conns.get(); let guard = self.conns.get();
let this = self.clone(); let this = self.clone();
Box::pin(async move { Box::pin(async move {
let res = this.acceptor.accept(req).await; let io = this.acceptor.accept(io).await;
drop(guard); drop(guard);
res io
}) })
} }
} }

View File

@ -0,0 +1,111 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::{Service, ServiceFactory};
use actix_utils::counter::{Counter, CounterGuard};
use futures_util::{
future::{ready, Ready},
ready,
};
pub use openssl::ssl::{
AlpnError, Error as SslError, HandshakeError, Ssl, SslAcceptor, SslAcceptorBuilder,
};
pub use tokio_openssl::SslStream;
use super::MAX_CONN_COUNTER;
/// Accept TLS connections via `openssl` package.
///
/// `openssl` feature enables this `Acceptor` type.
pub struct Acceptor {
acceptor: SslAcceptor,
}
impl Acceptor {
/// Create OpenSSL based `Acceptor` service factory.
#[inline]
pub fn new(acceptor: SslAcceptor) -> Self {
Acceptor { acceptor }
}
}
impl Clone for Acceptor {
#[inline]
fn clone(&self) -> Self {
Self {
acceptor: self.acceptor.clone(),
}
}
}
impl<T> ServiceFactory<T> for Acceptor
where
T: AsyncRead + AsyncWrite + Unpin + 'static,
{
type Response = SslStream<T>;
type Error = SslError;
type Config = ();
type Service = AcceptorService;
type InitError = ();
type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future {
MAX_CONN_COUNTER.with(|conns| {
ready(Ok(AcceptorService {
acceptor: self.acceptor.clone(),
conns: conns.clone(),
}))
})
}
}
pub struct AcceptorService {
acceptor: SslAcceptor,
conns: Counter,
}
impl<T> Service<T> for AcceptorService
where
T: AsyncRead + AsyncWrite + Unpin + 'static,
{
type Response = SslStream<T>;
type Error = SslError;
type Future = AcceptorServiceResponse<T>;
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.conns.available(ctx) {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
fn call(&mut self, io: T) -> Self::Future {
let ssl_ctx = self.acceptor.context();
let ssl = Ssl::new(ssl_ctx).expect("Provided SSL acceptor was invalid.");
AcceptorServiceResponse {
_guard: self.conns.get(),
stream: Some(SslStream::new(ssl, io).unwrap()),
}
}
}
pub struct AcceptorServiceResponse<T>
where
T: AsyncRead + AsyncWrite,
{
stream: Option<SslStream<T>>,
_guard: CounterGuard,
}
impl<T: AsyncRead + AsyncWrite + Unpin> Future for AcceptorServiceResponse<T> {
type Output = Result<SslStream<T>, SslError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(Pin::new(self.stream.as_mut().unwrap()).poll_accept(cx))?;
Poll::Ready(Ok(self.stream.take().expect("SSL connect has resolved.")))
}
}

View File

@ -10,11 +10,11 @@ use actix_utils::counter::{Counter, CounterGuard};
use futures_util::future::{ready, Ready}; use futures_util::future::{ready, Ready};
use tokio_rustls::{Accept, TlsAcceptor}; use tokio_rustls::{Accept, TlsAcceptor};
pub use rust_tls::{ServerConfig, Session}; pub use rustls::{ServerConfig, Session};
pub use tokio_rustls::server::TlsStream; pub use tokio_rustls::server::TlsStream;
pub use webpki_roots::TLS_SERVER_ROOTS; pub use webpki_roots::TLS_SERVER_ROOTS;
use crate::MAX_CONN_COUNTER; use super::MAX_CONN_COUNTER;
/// Accept TLS connections via `rustls` package. /// Accept TLS connections via `rustls` package.
/// ///
@ -42,8 +42,11 @@ impl Clone for Acceptor {
} }
} }
impl<Req: AsyncRead + AsyncWrite + Unpin> ServiceFactory<Req> for Acceptor { impl<T> ServiceFactory<T> for Acceptor
type Response = TlsStream<Req>; where
T: AsyncRead + AsyncWrite + Unpin,
{
type Response = TlsStream<T>;
type Error = io::Error; type Error = io::Error;
type Config = (); type Config = ();
@ -67,10 +70,13 @@ pub struct AcceptorService {
conns: Counter, conns: Counter,
} }
impl<Req: AsyncRead + AsyncWrite + Unpin> Service<Req> for AcceptorService { impl<T> Service<T> for AcceptorService
type Response = TlsStream<Req>; where
T: AsyncRead + AsyncWrite + Unpin,
{
type Response = TlsStream<T>;
type Error = io::Error; type Error = io::Error;
type Future = AcceptorServiceFut<Req>; type Future = AcceptorServiceFut<T>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.conns.available(cx) { if self.conns.available(cx) {
@ -80,7 +86,7 @@ impl<Req: AsyncRead + AsyncWrite + Unpin> Service<Req> for AcceptorService {
} }
} }
fn call(&mut self, req: Req) -> Self::Future { fn call(&mut self, req: T) -> Self::Future {
AcceptorServiceFut { AcceptorServiceFut {
_guard: self.conns.get(), _guard: self.conns.get(),
fut: self.acceptor.accept(req), fut: self.acceptor.accept(req),
@ -88,16 +94,19 @@ impl<Req: AsyncRead + AsyncWrite + Unpin> Service<Req> for AcceptorService {
} }
} }
pub struct AcceptorServiceFut<Req> pub struct AcceptorServiceFut<T>
where where
Req: AsyncRead + AsyncWrite + Unpin, T: AsyncRead + AsyncWrite + Unpin,
{ {
fut: Accept<Req>, fut: Accept<T>,
_guard: CounterGuard, _guard: CounterGuard,
} }
impl<Req: AsyncRead + AsyncWrite + Unpin> Future for AcceptorServiceFut<Req> { impl<T> Future for AcceptorServiceFut<T>
type Output = Result<TlsStream<Req>, io::Error>; where
T: AsyncRead + AsyncWrite + Unpin,
{
type Output = Result<TlsStream<T>, io::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut(); let this = self.get_mut();

View File

@ -9,6 +9,7 @@ use std::task::{Context, Poll};
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use actix_service::{Service, ServiceFactory}; use actix_service::{Service, ServiceFactory};
use futures_util::future::{ready, Ready}; use futures_util::future::{ready, Ready};
use log::{error, trace};
use super::connect::{Address, Connect, Connection}; use super::connect::{Address, Connect, Connection};
use super::error::ConnectError; use super::error::ConnectError;
@ -151,18 +152,18 @@ impl<T: Address> Future for TcpConnectorResponse<T> {
Poll::Ready(Ok(sock)) => { Poll::Ready(Ok(sock)) => {
let req = req.take().unwrap(); let req = req.take().unwrap();
trace!( trace!(
"TCP connector - successfully connected to connecting to {:?} - {:?}", "TCP connector - successfully connected to connecting to {:?} - {:?}",
req.host(), sock.peer_addr() req.host(), sock.peer_addr()
); );
return Poll::Ready(Ok(Connection::new(sock, req))); return Poll::Ready(Ok(Connection::new(sock, req)));
} }
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(Err(err)) => { Poll::Ready(Err(err)) => {
trace!( trace!(
"TCP connector - failed to connect to connecting to {:?} port: {}", "TCP connector - failed to connect to connecting to {:?} port: {}",
req.as_ref().unwrap().host(), req.as_ref().unwrap().host(),
port, port,
); );
if addrs.is_none() || addrs.as_ref().unwrap().is_empty() { if addrs.is_none() || addrs.as_ref().unwrap().is_empty() {
return Poll::Ready(Err(err.into())); return Poll::Ready(Err(err.into()));
} }

View File

@ -5,21 +5,12 @@
//! * `openssl` - enables TLS support via `openssl` crate //! * `openssl` - enables TLS support via `openssl` crate
//! * `rustls` - enables TLS support via `rustls` crate //! * `rustls` - enables TLS support via `rustls` crate
#![deny(rust_2018_idioms, nonstandard_style)]
#![recursion_limit = "128"]
#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
#[macro_use]
extern crate log;
mod connect; mod connect;
mod connector; mod connector;
mod error; mod error;
mod resolve; mod resolve;
mod service; mod service;
pub mod ssl; pub mod ssl;
#[cfg(feature = "uri")] #[cfg(feature = "uri")]
mod uri; mod uri;

View File

@ -5,13 +5,14 @@ use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use actix_service::{Service, ServiceFactory}; use actix_service::{Service, ServiceFactory};
use futures_util::future::{ready, Ready}; use futures_util::future::{ok, Either, Ready};
use log::trace;
use trust_dns_resolver::TokioAsyncResolver as AsyncResolver; use trust_dns_resolver::TokioAsyncResolver as AsyncResolver;
use trust_dns_resolver::{error::ResolveError, lookup_ip::LookupIp}; use trust_dns_resolver::{error::ResolveError, lookup_ip::LookupIp};
use crate::connect::{Address, Connect}; use super::connect::{Address, Connect};
use crate::error::ConnectError; use super::error::ConnectError;
use crate::get_default_resolver; use super::get_default_resolver;
/// DNS Resolver Service factory /// DNS Resolver Service factory
pub struct ResolverFactory<T> { pub struct ResolverFactory<T> {
@ -63,7 +64,7 @@ impl<T: Address> ServiceFactory<Connect<T>> for ResolverFactory<T> {
type Future = Ready<Result<Self::Service, Self::InitError>>; type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future { fn new_service(&self, _: ()) -> Self::Future {
ready(Ok(self.service())) ok(self.service())
} }
} }
@ -104,19 +105,23 @@ impl<T> Clone for Resolver<T> {
impl<T: Address> Service<Connect<T>> for Resolver<T> { impl<T: Address> Service<Connect<T>> for Resolver<T> {
type Response = Connect<T>; type Response = Connect<T>;
type Error = ConnectError; type Error = ConnectError;
type Future = ResolverServiceFuture<T>; #[allow(clippy::type_complexity)]
type Future = Either<
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>,
Ready<Result<Connect<T>, Self::Error>>,
>;
actix_service::always_ready!(); actix_service::always_ready!();
fn call(&mut self, mut req: Connect<T>) -> Self::Future { fn call(&mut self, mut req: Connect<T>) -> Self::Future {
if req.addr.is_some() { if req.addr.is_some() {
ResolverServiceFuture::NoLookUp(Some(req)) Either::Right(ok(req))
} else if let Ok(ip) = req.host().parse() { } else if let Ok(ip) = req.host().parse() {
req.addr = Some(either::Either::Left(SocketAddr::new(ip, req.port()))); req.addr = Some(either::Either::Left(SocketAddr::new(ip, req.port())));
ResolverServiceFuture::NoLookUp(Some(req)) Either::Right(ok(req))
} else { } else {
let resolver = self.resolver.as_ref().map(AsyncResolver::clone); let resolver = self.resolver.as_ref().map(AsyncResolver::clone);
ResolverServiceFuture::LookUp(Box::pin(async move { Either::Left(Box::pin(async move {
trace!("DNS resolver: resolving host {:?}", req.host()); trace!("DNS resolver: resolving host {:?}", req.host());
let resolver = if let Some(resolver) = resolver { let resolver = if let Some(resolver) = resolver {
resolver resolver
@ -131,30 +136,13 @@ impl<T: Address> Service<Connect<T>> for Resolver<T> {
} }
} }
type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>; type LookupIpFuture = Pin<Box<dyn Future<Output = Result<LookupIp, ResolveError>>>>;
#[doc(hidden)]
pub enum ResolverServiceFuture<T: Address> {
NoLookUp(Option<Connect<T>>),
LookUp(LocalBoxFuture<'static, Result<Connect<T>, ConnectError>>),
}
impl<T: Address> Future for ResolverServiceFuture<T> {
type Output = Result<Connect<T>, ConnectError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.get_mut() {
Self::NoLookUp(conn) => Poll::Ready(Ok(conn.take().unwrap())),
Self::LookUp(fut) => fut.as_mut().poll(cx),
}
}
}
#[doc(hidden)] #[doc(hidden)]
/// Resolver future /// Resolver future
pub struct ResolverFuture<T: Address> { pub struct ResolverFuture<T: Address> {
req: Option<Connect<T>>, req: Option<Connect<T>>,
lookup: LocalBoxFuture<'static, Result<LookupIp, ResolveError>>, lookup: LookupIpFuture,
} }
impl<T: Address> ResolverFuture<T> { impl<T: Address> ResolverFuture<T> {

View File

@ -5,13 +5,13 @@ use std::task::{Context, Poll};
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use actix_service::{Service, ServiceFactory}; use actix_service::{Service, ServiceFactory};
use either::Either; use either::Either;
use futures_util::future::{ready, Ready}; use futures_util::future::{ok, Ready};
use trust_dns_resolver::TokioAsyncResolver as AsyncResolver; use trust_dns_resolver::TokioAsyncResolver as AsyncResolver;
use crate::connect::{Address, Connect, Connection}; use super::connect::{Address, Connect, Connection};
use crate::connector::{TcpConnector, TcpConnectorFactory}; use super::connector::{TcpConnector, TcpConnectorFactory};
use crate::error::ConnectError; use super::error::ConnectError;
use crate::resolve::{Resolver, ResolverFactory}; use super::resolve::{Resolver, ResolverFactory};
pub struct ConnectServiceFactory<T> { pub struct ConnectServiceFactory<T> {
tcp: TcpConnectorFactory<T>, tcp: TcpConnectorFactory<T>,
@ -79,7 +79,7 @@ impl<T: Address> ServiceFactory<Connect<T>> for ConnectServiceFactory<T> {
type Future = Ready<Result<Self::Service, Self::InitError>>; type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future { fn new_service(&self, _: ()) -> Self::Future {
ready(Ok(self.service())) ok(self.service())
} }
} }

View File

@ -1,19 +1,22 @@
use std::future::Future; use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::{fmt, io}; use std::{fmt, io};
pub use open_ssl::ssl::{Error as SslError, SslConnector, SslMethod};
pub use tokio_openssl::SslStream;
use actix_codec::{AsyncRead, AsyncWrite}; use actix_codec::{AsyncRead, AsyncWrite};
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use actix_service::{Service, ServiceFactory}; use actix_service::{Service, ServiceFactory};
use futures_util::future::{ready, Ready}; use futures_util::{
use futures_util::ready; future::{ready, Either, Ready},
ready,
};
use log::trace;
pub use openssl::ssl::{Error as SslError, HandshakeError, SslConnector, SslMethod};
pub use tokio_openssl::SslStream;
use trust_dns_resolver::TokioAsyncResolver as AsyncResolver; use trust_dns_resolver::TokioAsyncResolver as AsyncResolver;
use crate::{ use crate::connect::{
Address, Connect, ConnectError, ConnectService, ConnectServiceFactory, Connection, Address, Connect, ConnectError, ConnectService, ConnectServiceFactory, Connection,
}; };
@ -80,77 +83,59 @@ where
{ {
type Response = Connection<T, SslStream<U>>; type Response = Connection<T, SslStream<U>>;
type Error = io::Error; type Error = io::Error;
type Future = OpensslConnectorServiceFuture<T, U>; #[allow(clippy::type_complexity)]
type Future = Either<ConnectAsyncExt<T, U>, Ready<Result<Self::Response, Self::Error>>>;
actix_service::always_ready!(); actix_service::always_ready!();
fn call(&mut self, stream: Connection<T, U>) -> Self::Future { fn call(&mut self, stream: Connection<T, U>) -> Self::Future {
match self.ssl_stream(stream) { trace!("SSL Handshake start for: {:?}", stream.host());
Ok(acc) => OpensslConnectorServiceFuture::Accept(Some(acc)), let (io, stream) = stream.replace(());
Err(e) => OpensslConnectorServiceFuture::Error(Some(e)), let host = stream.host().to_string();
match self.connector.configure() {
Err(e) => Either::Right(ready(Err(io::Error::new(io::ErrorKind::Other, e)))),
Ok(config) => {
let ssl = config
.into_ssl(&host)
.expect("SSL connect configuration was invalid.");
Either::Left(ConnectAsyncExt {
io: Some(SslStream::new(ssl, io).unwrap()),
stream: Some(stream),
_t: PhantomData,
})
}
} }
} }
} }
impl OpensslConnectorService { pub struct ConnectAsyncExt<T, U> {
// construct SslStream with connector. io: Option<SslStream<U>>,
// At this point SslStream does not perform any I/O. stream: Option<Connection<T, ()>>,
// handshake would happen later in OpensslConnectorServiceFuture _t: PhantomData<U>,
fn ssl_stream<T, U>(
&self,
stream: Connection<T, U>,
) -> Result<(SslStream<U>, Connection<T, ()>), SslError>
where
T: Address + 'static,
U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static,
{
trace!("SSL Handshake start for: {:?}", stream.host());
let (stream, connection) = stream.replace(());
let host = connection.host().to_string();
let config = self.connector.configure()?;
let ssl = config.into_ssl(host.as_str())?;
let stream = tokio_openssl::SslStream::new(ssl, stream)?;
Ok((stream, connection))
}
} }
#[doc(hidden)] impl<T: Address, U> Future for ConnectAsyncExt<T, U>
pub enum OpensslConnectorServiceFuture<T, U>
where where
T: Address + 'static,
U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static,
{
Accept(Option<(SslStream<U>, Connection<T, ()>)>),
Error(Option<SslError>),
}
impl<T, U> Future for OpensslConnectorServiceFuture<T, U>
where
T: Address,
U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static, U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static,
{ {
type Output = Result<Connection<T, SslStream<U>>, io::Error>; type Output = Result<Connection<T, SslStream<U>>, io::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let e = match self.get_mut() { let this = self.get_mut();
Self::Error(e) => e.take().unwrap(),
Self::Accept(acc) => {
let (stream, _) = acc.as_mut().unwrap();
match ready!(Pin::new(stream).poll_connect(cx)) {
Ok(()) => {
let (stream, connection) = acc.take().unwrap();
trace!("SSL Handshake success: {:?}", connection.host());
let (_, connection) = connection.replace(stream);
return Poll::Ready(Ok(connection));
}
Err(e) => e,
}
}
};
trace!("SSL Handshake error: {:?}", e); match ready!(Pin::new(this.io.as_mut().unwrap()).poll_connect(cx)) {
Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, format!("{}", e)))) Ok(_) => {
let stream = this.stream.take().unwrap();
trace!("SSL Handshake success: {:?}", stream.host());
Poll::Ready(Ok(stream.replace(this.io.take().unwrap()).1))
}
Err(e) => {
trace!("SSL Handshake error: {:?}", e);
Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, format!("{}", e))))
}
}
} }
} }

View File

@ -4,17 +4,20 @@ use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
pub use rust_tls::Session; pub use rustls::Session;
pub use tokio_rustls::{client::TlsStream, rustls::ClientConfig}; pub use tokio_rustls::{client::TlsStream, rustls::ClientConfig};
use actix_codec::{AsyncRead, AsyncWrite}; use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::{Service, ServiceFactory}; use actix_service::{Service, ServiceFactory};
use futures_util::future::{ready, Ready}; use futures_util::{
use futures_util::ready; future::{ready, Ready},
ready,
};
use log::trace;
use tokio_rustls::{Connect, TlsConnector}; use tokio_rustls::{Connect, TlsConnector};
use webpki::DNSNameRef; use webpki::DNSNameRef;
use crate::{Address, Connection}; use crate::connect::{Address, Connection};
/// Rustls connector factory /// Rustls connector factory
pub struct RustlsConnector { pub struct RustlsConnector {
@ -99,8 +102,9 @@ pub struct ConnectAsyncExt<T, U> {
stream: Option<Connection<T, ()>>, stream: Option<Connection<T, ()>>,
} }
impl<T: Address, U> Future for ConnectAsyncExt<T, U> impl<T, U> Future for ConnectAsyncExt<T, U>
where where
T: Address,
U: AsyncRead + AsyncWrite + Unpin + fmt::Debug, U: AsyncRead + AsyncWrite + Unpin + fmt::Debug,
{ {
type Output = Result<Connection<T, TlsStream<U>>, std::io::Error>; type Output = Result<Connection<T, TlsStream<U>>, std::io::Error>;

View File

@ -1,6 +1,6 @@
use http::Uri; use http::Uri;
use crate::Address; use super::Address;
impl Address for Uri { impl Address for Uri {
fn host(&self) -> &str { fn host(&self) -> &str {

View File

@ -1,46 +1,17 @@
//! TLS acceptor services for Actix ecosystem. //! TLS acceptor and connector services for Actix ecosystem
//!
//! ## Crate Features
//! * `openssl` - TLS acceptor using the `openssl` crate.
//! * `rustls` - TLS acceptor using the `rustls` crate.
//! * `nativetls` - TLS acceptor using the `native-tls` crate.
#![deny(rust_2018_idioms, nonstandard_style)] #![deny(rust_2018_idioms, nonstandard_style)]
#![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
use std::sync::atomic::{AtomicUsize, Ordering}; #[cfg(feature = "native-tls")]
extern crate tls_native_tls as native_tls;
use actix_utils::counter::Counter;
#[cfg(feature = "openssl")] #[cfg(feature = "openssl")]
pub mod openssl; extern crate tls_openssl as openssl;
#[cfg(feature = "rustls")] #[cfg(feature = "rustls")]
pub mod rustls; extern crate tls_rustls as rustls;
#[cfg(feature = "nativetls")] #[cfg(feature = "accept")]
pub mod nativetls; pub mod accept;
#[cfg(feature = "connect")]
pub(crate) static MAX_CONN: AtomicUsize = AtomicUsize::new(256); pub mod connect;
thread_local! {
static MAX_CONN_COUNTER: Counter = Counter::new(MAX_CONN.load(Ordering::Relaxed));
}
/// Sets the maximum per-worker concurrent TLS connection limit.
///
/// All listeners will stop accepting connections when this limit is reached.
/// It can be used to regulate the global TLS CPU usage.
///
/// By default, the connection limit is 256.
pub fn max_concurrent_tls_connect(num: usize) {
MAX_CONN.store(num, Ordering::Relaxed);
}
/// TLS error combined with service error.
#[derive(Debug)]
pub enum TlsError<E1, E2> {
Tls(E1),
Service(E2),
}

View File

@ -1,126 +0,0 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::{Service, ServiceFactory};
use actix_utils::counter::{Counter, CounterGuard};
use futures_util::future::{ready, Ready};
use futures_util::ready;
pub use open_ssl::ssl::{AlpnError, Error, Ssl, SslAcceptor, SslAcceptorBuilder};
pub use tokio_openssl::SslStream;
use crate::MAX_CONN_COUNTER;
/// Accept TLS connections via `openssl` package.
///
/// `openssl` feature enables this `Acceptor` type.
pub struct Acceptor {
acceptor: SslAcceptor,
}
impl Acceptor {
/// Create OpenSSL based `Acceptor` service factory.
#[inline]
pub fn new(acceptor: SslAcceptor) -> Self {
Acceptor { acceptor }
}
}
impl Clone for Acceptor {
#[inline]
fn clone(&self) -> Self {
Self {
acceptor: self.acceptor.clone(),
}
}
}
impl<T: AsyncRead + AsyncWrite + Unpin + 'static> ServiceFactory<T> for Acceptor {
type Response = SslStream<T>;
type Error = Error;
type Config = ();
type Service = AcceptorService;
type InitError = ();
type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future {
MAX_CONN_COUNTER.with(|conns| {
ready(Ok(AcceptorService {
acceptor: self.acceptor.clone(),
conns: conns.clone(),
}))
})
}
}
pub struct AcceptorService {
acceptor: SslAcceptor,
conns: Counter,
}
impl<Req: AsyncRead + AsyncWrite + Unpin + 'static> Service<Req> for AcceptorService {
type Response = SslStream<Req>;
type Error = Error;
type Future = AcceptorServiceResponse<Req>;
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.conns.available(ctx) {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
fn call(&mut self, req: Req) -> Self::Future {
match self.ssl_stream(req) {
Ok(stream) => {
let guard = self.conns.get();
AcceptorServiceResponse::Accept(Some(stream), Some(guard))
}
Err(e) => AcceptorServiceResponse::Error(Some(e)),
}
}
}
impl AcceptorService {
// construct a new SslStream.
// At this point the SslStream does not perform any IO.
// The handshake would happen later in AcceptorServiceResponse
fn ssl_stream<Req: AsyncRead + AsyncWrite + Unpin + 'static>(
&self,
stream: Req,
) -> Result<SslStream<Req>, Error> {
let ssl = Ssl::new(self.acceptor.context())?;
let stream = SslStream::new(ssl, stream)?;
Ok(stream)
}
}
pub enum AcceptorServiceResponse<Req>
where
Req: AsyncRead + AsyncWrite,
{
Accept(Option<SslStream<Req>>, Option<CounterGuard>),
Error(Option<Error>),
}
impl<Req: AsyncRead + AsyncWrite + Unpin> Future for AcceptorServiceResponse<Req>
where
Req: AsyncRead + AsyncWrite + Unpin,
{
type Output = Result<SslStream<Req>, Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.get_mut() {
AcceptorServiceResponse::Error(e) => Poll::Ready(Err(e.take().unwrap())),
AcceptorServiceResponse::Accept(stream, guard) => {
ready!(Pin::new(stream.as_mut().unwrap()).poll_accept(cx))?;
// drop counter guard a little early as the accept has finished
guard.take();
Poll::Ready(Ok(stream.take().unwrap()))
}
}
}
}

View File

@ -2,15 +2,18 @@ use std::io;
use actix_codec::{BytesCodec, Framed}; use actix_codec::{BytesCodec, Framed};
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use actix_server::TestServer;
use actix_service::{fn_service, Service, ServiceFactory}; use actix_service::{fn_service, Service, ServiceFactory};
use actix_testing::TestServer;
use bytes::Bytes; use bytes::Bytes;
use futures_util::sink::SinkExt; use futures_util::sink::SinkExt;
use actix_connect::resolver::{ResolverConfig, ResolverOpts}; use actix_tls::connect::{
use actix_connect::Connect; self as actix_connect,
resolver::{ResolverConfig, ResolverOpts},
Connect,
};
#[cfg(feature = "openssl")] #[cfg(all(feature = "connect", feature = "openssl"))]
#[actix_rt::test] #[actix_rt::test]
async fn test_string() { async fn test_string() {
let srv = TestServer::with(|| { let srv = TestServer::with(|| {

View File

@ -16,11 +16,12 @@ name = "actix_tracing"
path = "src/lib.rs" path = "src/lib.rs"
[dependencies] [dependencies]
actix-service = "1.0.4" actix-service = "2.0.0-beta.1"
futures-util = { version = "0.3.7", default-features = false }
futures-util = { version = "0.3.4", default-features = false }
tracing = "0.1" tracing = "0.1"
tracing-futures = "0.2" tracing-futures = "0.2"
[dev_dependencies] [dev_dependencies]
actix-rt = "1.0" actix-rt = "1.0"
slab = "0.4" slab = "0.4"