diff --git a/Cargo.toml b/Cargo.toml index 8d8ebed2..13fc843e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,12 +1,10 @@ [workspace] members = [ "actix-codec", - "actix-connect", "actix-rt", "actix-macros", "actix-service", "actix-server", - "actix-testing", "actix-threadpool", "actix-tls", "actix-tracing", @@ -16,21 +14,14 @@ members = [ ] [patch.crates-io] -actix-codec = { git = "https://github.com/actix/actix-net.git", rev = "ba44ea7d0bafaf5fccb9a34003d503e1910943ee" } -actix-connect = { path = "actix-connect" } -actix-rt = { git = "https://github.com/actix/actix-net.git", rev = "ba44ea7d0bafaf5fccb9a34003d503e1910943ee" } +actix-codec = { path = "actix-codec" } +actix-rt = { path = "actix-rt" } actix-macros = { path = "actix-macros" } actix-server = { path = "actix-server" } -actix-service = { git = "https://github.com/actix/actix-net.git", rev = "ba44ea7d0bafaf5fccb9a34003d503e1910943ee" } -actix-testing = { path = "actix-testing" } +actix-service = { path = "actix-service" } actix-threadpool = { path = "actix-threadpool" } actix-tls = { path = "actix-tls" } actix-tracing = { path = "actix-tracing" } -actix-utils = { git = "https://github.com/actix/actix-net.git", rev = "ba44ea7d0bafaf5fccb9a34003d503e1910943ee" } +actix-utils = { path = "actix-utils" } actix-router = { path = "router" } bytestring = { path = "string" } - -#FIXME: remove override -#http = { git = "https://github.com/fakeshadow/http.git" } -trust-dns-proto = { git = "https://github.com/messense/trust-dns.git", branch = "tokio-1" } -trust-dns-resolver = { git = "https://github.com/messense/trust-dns.git", branch = "tokio-1" } \ No newline at end of file diff --git a/actix-connect/CHANGES.md b/actix-connect/CHANGES.md deleted file mode 100644 index 274564f3..00000000 --- a/actix-connect/CHANGES.md +++ /dev/null @@ -1,156 +0,0 @@ -# Changes - -## Unreleased - 2020-xx-xx -* Update `tokio-openssl` to `0.6.0` -* Update `tokio-rustls` to `0.22` -* Update `rustls` to `0.19.0` - -## 2.0.0 - 2020-09-02 -- No significant changes from `2.0.0-alpha.4`. - -## 2.0.0-alpha.4 - 2020-08-17 - -### Changed - -* Update `rustls` dependency to 0.18 -* Update `tokio-rustls` dependency to 0.14 - - -## [2.0.0-alpha.3] - 2020-05-08 - -### Fixed - -* Corrected spelling of `ConnectError::Unresolverd` to `ConnectError::Unresolved` - -## [2.0.0-alpha.2] - 2020-03-08 - -### Changed - -* Update `trust-dns-proto` dependency to 0.19. [#116] -* Update `trust-dns-resolver` dependency to 0.19. [#116] -* `Address` trait is now required to have static lifetime. [#116] -* `start_resolver` and `start_default_resolver` are now `async` and may return a `ConnectError`. [#116] - -[#116]: https://github.com/actix/actix-net/pull/116 - -## [2.0.0-alpha.1] - 2020-03-03 - -### Changed - -* Update `rustls` dependency to 0.17 -* Update `tokio-rustls` dependency to 0.13 - -## [1.0.2] - 2020-01-15 - -* Fix actix-service 1.0.3 compatibility - -## [1.0.1] - 2019-12-15 - -* Fix trust-dns-resolver compilation - -## [1.0.0] - 2019-12-11 - -* Release - -## [1.0.0-alpha.3] - 2019-12-07 - -### Changed - -* Migrate to tokio 0.2 - - -## [1.0.0-alpha.2] - 2019-12-02 - -### Changed - -* Migrated to `std::future` - - -## [0.3.0] - 2019-10-03 - -### Changed - -* Update `rustls` to 0.16 -* Minimum required Rust version upped to 1.37.0 - -## [0.2.5] - 2019-09-05 - -* Add `TcpConnectService` - -## [0.2.4] - 2019-09-02 - -* Use arbiter's storage for default async resolver - -## [0.2.3] - 2019-08-05 - -* Add `ConnectService` and `OpensslConnectService` - -## [0.2.2] - 2019-07-24 - -* Add `rustls` support - -## [0.2.1] - 2019-07-17 - -### Added - -* Expose Connect addrs #30 - -### Changed - -* Update `derive_more` to 0.15 - - -## [0.2.0] - 2019-05-12 - -### Changed - -* Upgrade to actix-service 0.4 - - -## [0.1.5] - 2019-04-19 - -### Added - -* `Connect::set_addr()` - -### Changed - -* Use trust-dns-resolver 0.11.0 - - -## [0.1.4] - 2019-04-12 - -### Changed - -* Do not start default resolver immediately for default connector. - - -## [0.1.3] - 2019-04-11 - -### Changed - -* Start trust-dns default resolver on first use - -## [0.1.2] - 2019-04-04 - -### Added - -* Log error if dns system config could not be loaded. - -### Changed - -* Rename connect Connector to TcpConnector #10 - - -## [0.1.1] - 2019-03-15 - -### Fixed - -* Fix error handling for single address - - -## [0.1.0] - 2019-03-14 - -* Refactor resolver and connector services - -* Rename crate diff --git a/actix-connect/Cargo.toml b/actix-connect/Cargo.toml deleted file mode 100644 index cdec658f..00000000 --- a/actix-connect/Cargo.toml +++ /dev/null @@ -1,61 +0,0 @@ -[package] -name = "actix-connect" -version = "2.0.0" -authors = ["Nikolay Kim "] -description = "TCP connector service for Actix ecosystem." -keywords = ["network", "framework", "async", "futures"] -homepage = "https://actix.rs" -repository = "https://github.com/actix/actix-net.git" -documentation = "https://docs.rs/actix-connect/" -categories = ["network-programming", "asynchronous"] -license = "MIT OR Apache-2.0" -edition = "2018" - -[package.metadata.docs.rs] -features = ["openssl", "rustls", "uri"] - -[lib] -name = "actix_connect" -path = "src/lib.rs" - -[features] -default = ["uri"] - -# openssl -openssl = ["open-ssl", "tokio-openssl"] - -# rustls -rustls = ["rust-tls", "tokio-rustls", "webpki"] - -# support http::Uri as connect address -uri = ["http"] - -[dependencies] -actix-service = "1.0.6" -actix-codec = "0.3.0" -actix-utils = "2.0.0" -actix-rt = "1.1.1" - -derive_more = "0.99.2" -either = "1.5.3" -futures-util = { version = "0.3.7", default-features = false } -# FIXME: update to 0.3 -http = { version = "0.2.2", optional = true } -log = "0.4" -# FIXME: Use release version -trust-dns-proto = "0.20.0-alpha.3" -trust-dns-resolver = "0.20.0-alpha.3" - -# openssl -open-ssl = { package = "openssl", version = "0.10", optional = true } -tokio-openssl = { version = "0.6.0", optional = true } - -# rustls -rust-tls = { package = "rustls", version = "0.19.0", optional = true } -tokio-rustls = { version = "0.22.0", optional = true } -webpki = { version = "0.21", optional = true } - -[dev-dependencies] -bytes = "1" -actix-testing = "1.0.0" -futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } diff --git a/actix-connect/LICENSE-APACHE b/actix-connect/LICENSE-APACHE deleted file mode 120000 index 965b606f..00000000 --- a/actix-connect/LICENSE-APACHE +++ /dev/null @@ -1 +0,0 @@ -../LICENSE-APACHE \ No newline at end of file diff --git a/actix-connect/LICENSE-MIT b/actix-connect/LICENSE-MIT deleted file mode 120000 index 76219eb7..00000000 --- a/actix-connect/LICENSE-MIT +++ /dev/null @@ -1 +0,0 @@ -../LICENSE-MIT \ No newline at end of file diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 98d55e82..646c5b64 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -1,6 +1,7 @@ # Changes ## Unreleased - 2020-xx-xx +<<<<<<< HEAD * Update `mio` dependency to 0.7.3. * Remove `socket2` dependency. * `ServerBuilder::backlog` would accept `u32` instead of `i32`. @@ -8,11 +9,24 @@ * Remove `AcceptNotify` type and pass `WakerQueue` to `Worker` for wake up the `Accept`'s `Poll`. * Convert `mio::net::TcpStream` to `actix_rt::net::TcpStream`(`UnixStream` for uds) using `FromRawFd` and `IntoRawFd`(`FromRawSocket` and `IntoRawSocket` on windows). * Remove `AsyncRead` and `AsyncWrite` trait bound for `socket::FromStream` trait. +======= +* Merge `actix-testing` to `actix-server` as `test_server` mod. + +## 2.0.0-beta.1 - 2020-12-28 +>>>>>>> upstream/master * Added explicit info log message on accept queue pause. [#215] * Prevent double registration of sockets when back-pressure is resolved. [#223] +* Update `mio` dependency to `0.7.3`. [#239] +* Remove `socket2` dependency. [#239] +* `ServerBuilder::backlog` now accepts `u32` instead of `i32`. [#239] +* Remove `AcceptNotify` type and pass `WakerQueue` to `Worker` to wake up `Accept`'s `Poll`. [#239] +* Convert `mio::net::TcpStream` to `actix_rt::net::TcpStream`(`UnixStream` for uds) using + `FromRawFd` and `IntoRawFd`(`FromRawSocket` and `IntoRawSocket` on windows). [#239] +* Remove `AsyncRead` and `AsyncWrite` trait bound for `socket::FromStream` trait. [#239] [#215]: https://github.com/actix/actix-net/pull/215 [#223]: https://github.com/actix/actix-net/pull/223 +[#239]: https://github.com/actix/actix-net/pull/239 [#215]: https://github.com/actix/actix-net/pull/215 diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 162dcc63..0bdd5d23 100644 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -1,7 +1,10 @@ [package] name = "actix-server" -version = "1.0.4" -authors = ["Nikolay Kim "] +version = "2.0.0-beta.1" +authors = [ + "Nikolay Kim ", + "fakeshadow <24548779@qq.com>", +] description = "General purpose TCP server built for the Actix ecosystem" keywords = ["network", "framework", "async", "futures"] homepage = "https://actix.rs" @@ -20,22 +23,20 @@ path = "src/lib.rs" default = [] [dependencies] -actix-service = "1.0.6" -actix-rt = "1.1.1" -actix-codec = "0.3.0" -actix-utils = "2.0.0" +actix-codec = "0.4.0-beta.1" +actix-rt = "2.0.0-beta.1" +actix-service = "2.0.0-beta.1" +actix-utils = "3.0.0-beta.1" -concurrent-queue = "1.2.2" futures-core = { version = "0.3.7", default-features = false } log = "0.4" -mio = { version = "0.7.3", features = [ "os-poll", "tcp", "uds"] } +mio = { version = "0.7.6", features = ["os-poll", "net"] } num_cpus = "1.13" slab = "0.4" tokio = { version = "1", features = ["sync"] } [dev-dependencies] -actix-testing = "1.0.0" bytes = "1" -env_logger = "0.7" +env_logger = "0.8" futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } tokio = { version = "1", features = ["io-util"] } diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 1ced51e4..bf895f06 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -9,7 +9,7 @@ use slab::Slab; use crate::server::Server; use crate::socket::{MioListener, SocketAddr}; -use crate::waker_queue::{WakerInterest, WakerQueue, WakerQueueError, WAKER_TOKEN}; +use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; use crate::worker::{Conn, WorkerHandle}; use crate::Token; @@ -166,26 +166,36 @@ impl Accept { for event in events.iter() { let token = event.token(); match token { - // This is a loop because interests for command from previous version was a - // loop that would try to drain the command channel. It's yet unknown if it's - // necessary/good practice to actively drain the waker queue. + // This is a loop because interests for command from previous version was + // a loop that would try to drain the command channel. It's yet unknown + // if it's necessary/good practice to actively drain the waker queue. WAKER_TOKEN => 'waker: loop { - match self.waker.pop() { - // worker notify it becomes available. we may want to recover from - // backpressure. - Ok(WakerInterest::WorkerAvailable) => { + // take guard with every iteration so no new interest can be added + // until the current task is done. + let mut guard = self.waker.guard(); + match guard.pop_front() { + // worker notify it becomes available. we may want to recover + // from backpressure. + Some(WakerInterest::WorkerAvailable) => { + drop(guard); self.maybe_backpressure(&mut sockets, false); } - // a new worker thread is made and it's handle would be added to Accept - Ok(WakerInterest::Worker(handle)) => { + // a new worker thread is made and it's handle would be added + // to Accept + Some(WakerInterest::Worker(handle)) => { + drop(guard); // maybe we want to recover from a backpressure. self.maybe_backpressure(&mut sockets, false); self.handles.push(handle); } - // got timer interest and it's time to try register socket(s) again. - Ok(WakerInterest::Timer) => self.process_timer(&mut sockets), - Err(WakerQueueError::Empty) => break 'waker, - Ok(WakerInterest::Pause) => { + // got timer interest and it's time to try register socket(s) + // again. + Some(WakerInterest::Timer) => { + drop(guard); + self.process_timer(&mut sockets) + } + Some(WakerInterest::Pause) => { + drop(guard); sockets.iter_mut().for_each(|(_, info)| { match self.deregister(info) { Ok(_) => info!( @@ -198,14 +208,22 @@ impl Accept { } }); } - Ok(WakerInterest::Resume) => { + Some(WakerInterest::Resume) => { + drop(guard); sockets.iter_mut().for_each(|(token, info)| { self.register_logged(token, info); }); } - Ok(WakerInterest::Stop) | Err(WakerQueueError::Closed) => { + Some(WakerInterest::Stop) => { return self.deregister_all(&mut sockets); } + // waker queue is drained. + None => { + // Reset the WakerQueue before break so it does not grow + // infinitely. + WakerQueue::reset(&mut guard); + break 'waker; + } } }, _ => { @@ -278,6 +296,10 @@ impl Accept { if !on { self.backpressure = false; for (token, info) in sockets.iter_mut() { + if info.timeout.is_some() { + // socket will attempt to re-register itself when its timeout completes + continue; + } self.register_logged(token, info); } } diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index 79b5d57b..24129b5a 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -11,6 +11,7 @@ mod server; mod service; mod signals; mod socket; +mod test_server; mod waker_queue; mod worker; @@ -18,6 +19,7 @@ pub use self::builder::ServerBuilder; pub use self::config::{ServiceConfig, ServiceRuntime}; pub use self::server::Server; pub use self::service::ServiceFactory; +pub use self::test_server::TestServer; #[doc(hidden)] pub use self::socket::FromStream; @@ -125,3 +127,17 @@ impl Future for JoinAll { } } } + +#[cfg(test)] +mod test { + use super::*; + + #[actix_rt::test] + async fn test_join_all() { + let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))]; + let mut res = join_all(futs).await.into_iter(); + assert_eq!(Ok(1), res.next().unwrap()); + assert_eq!(Err(3), res.next().unwrap()); + assert_eq!(Ok(9), res.next().unwrap()); + } +} diff --git a/actix-testing/src/lib.rs b/actix-server/src/test_server.rs similarity index 80% rename from actix-testing/src/lib.rs rename to actix-server/src/test_server.rs index de933085..627cc675 100644 --- a/actix-testing/src/lib.rs +++ b/actix-server/src/test_server.rs @@ -1,19 +1,9 @@ -//! Various helpers for Actix applications to use during testing. - -#![deny(rust_2018_idioms, nonstandard_style)] -#![allow(clippy::type_complexity, clippy::needless_doctest_main)] -#![doc(html_logo_url = "https://actix.rs/img/logo.png")] -#![doc(html_favicon_url = "https://actix.rs/favicon.ico")] - use std::sync::mpsc; use std::{net, thread}; use actix_rt::{net::TcpStream, System}; -use actix_server::{Server, ServerBuilder, ServiceFactory}; -use socket2::{Domain, Protocol, Socket, Type}; -#[cfg(not(test))] // Work around for rust-lang/rust#62127 -pub use actix_macros::test; +use crate::{Server, ServerBuilder, ServiceFactory}; /// The `TestServer` type. /// @@ -24,7 +14,7 @@ pub use actix_macros::test; /// /// ```rust /// use actix_service::fn_service; -/// use actix_testing::TestServer; +/// use actix_server::TestServer; /// /// #[actix_rt::main] /// async fn main() { @@ -94,9 +84,8 @@ impl TestServer { .workers(1) .disable_signals() .start(); + tx.send((System::current(), local_addr)).unwrap(); }); - - tx.send((System::current(), local_addr)).unwrap(); sys.run() }); @@ -116,11 +105,10 @@ impl TestServer { /// Get first available unused local address pub fn unused_addr() -> net::SocketAddr { let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap(); - let socket = - Socket::new(Domain::ipv4(), Type::stream(), Some(Protocol::tcp())).unwrap(); - socket.bind(&addr.into()).unwrap(); - socket.set_reuse_address(true).unwrap(); - let tcp = socket.into_tcp_listener(); + let socket = mio::net::TcpSocket::new_v4().unwrap(); + socket.bind(addr).unwrap(); + socket.set_reuseaddr(true).unwrap(); + let tcp = socket.listen(1024).unwrap(); tcp.local_addr().unwrap() } } diff --git a/actix-server/src/waker_queue.rs b/actix-server/src/waker_queue.rs index 056979f7..f92363b5 100644 --- a/actix-server/src/waker_queue.rs +++ b/actix-server/src/waker_queue.rs @@ -1,7 +1,9 @@ -use std::ops::Deref; -use std::sync::Arc; +use std::{ + collections::VecDeque, + ops::Deref, + sync::{Arc, Mutex, MutexGuard}, +}; -use concurrent_queue::{ConcurrentQueue, PopError}; use mio::{Registry, Token as MioToken, Waker}; use crate::worker::WorkerHandle; @@ -11,7 +13,7 @@ pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX); /// `mio::Waker` with a queue for waking up the `Accept`'s `Poll` and contains the `WakerInterest` /// the `Poll` would want to look into. -pub(crate) struct WakerQueue(Arc<(Waker, ConcurrentQueue)>); +pub(crate) struct WakerQueue(Arc<(Waker, Mutex>)>); impl Clone for WakerQueue { fn clone(&self) -> Self { @@ -20,7 +22,7 @@ impl Clone for WakerQueue { } impl Deref for WakerQueue { - type Target = (Waker, ConcurrentQueue); + type Target = (Waker, Mutex>); fn deref(&self) -> &Self::Target { self.0.deref() @@ -34,7 +36,7 @@ impl WakerQueue { /// event's token for it to properly handle `WakerInterest`. pub(crate) fn new(registry: &Registry) -> std::io::Result { let waker = Waker::new(registry, WAKER_TOKEN)?; - let queue = ConcurrentQueue::unbounded(); + let queue = Mutex::new(VecDeque::with_capacity(16)); Ok(Self(Arc::new((waker, queue)))) } @@ -44,17 +46,23 @@ impl WakerQueue { let (waker, queue) = self.deref(); queue - .push(interest) - .unwrap_or_else(|e| panic!("WakerQueue closed: {}", e)); + .lock() + .expect("Failed to lock WakerQueue") + .push_back(interest); waker .wake() .unwrap_or_else(|e| panic!("can not wake up Accept Poll: {}", e)); } - /// pop an `WakerInterest` from the back of the queue. - pub(crate) fn pop(&self) -> Result { - self.deref().1.pop() + /// get a MutexGuard of the waker queue. + pub(crate) fn guard(&self) -> MutexGuard<'_, VecDeque> { + self.deref().1.lock().expect("Failed to lock WakerQueue") + } + + /// reset the waker queue so it does not grow infinitely. + pub(crate) fn reset(queue: &mut VecDeque) { + std::mem::swap(&mut VecDeque::::with_capacity(16), queue); } } @@ -74,10 +82,8 @@ pub(crate) enum WakerInterest { /// connection `Accept` would deregister socket listener temporary and wake up the poll and /// register them again after the delayed future resolve. Timer, - /// `WorkerNew` is an interest happen after a worker runs into faulted state(This is determined + /// `Worker` is an interest happen after a worker runs into faulted state(This is determined /// by if work can be sent to it successfully).`Accept` would be waked up and add the new /// `WorkerHandle`. Worker(WorkerHandle), } - -pub(crate) type WakerQueueError = PopError; diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 1f567934..2604df74 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -55,9 +55,9 @@ fn test_listen() { .workers(1) .listen("test", lst, move || fn_service(|_| ok::<_, ()>(()))) .unwrap() - .start() + .start(); + let _ = tx.send(actix_rt::System::current()); }); - let _ = tx.send(actix_rt::System::current()); let _ = sys.run(); }); let sys = rx.recv().unwrap(); diff --git a/actix-service/src/map_config.rs b/actix-service/src/map_config.rs index d6d6f6b2..1297f7a0 100644 --- a/actix-service/src/map_config.rs +++ b/actix-service/src/map_config.rs @@ -6,7 +6,7 @@ use super::{IntoServiceFactory, ServiceFactory}; /// /// Note that this function consumes the receiving service factory and returns /// a wrapped version of it. -pub fn map_config(factory: I, f: F) -> MapConfig +pub fn map_config(factory: I, f: F) -> MapConfig where I: IntoServiceFactory, SF: ServiceFactory, diff --git a/actix-testing/CHANGES.md b/actix-testing/CHANGES.md deleted file mode 100644 index 86a79ed7..00000000 --- a/actix-testing/CHANGES.md +++ /dev/null @@ -1,33 +0,0 @@ -# Changes - -## [1.0.1] - 2020-05-19 - -* Replace deprecated `net2` crate with `socket2` - -* Remove unused `futures` dependency - -## [1.0.0] - 2019-12-11 - -* Update actix-server to 1.0.0 - -## [1.0.0-alpha.3] - 2019-12-07 - -* Migrate to tokio 0.2 - -## [1.0.0-alpha.2] - 2019-12-02 - -* Re-export `test` attribute macros - - -## [0.3.0-alpha.1] - 2019-11-22 - -* Migrate to std::future - -## [0.2.0] - 2019-10-14 - -* Upgrade actix-server and actix-server-config deps - - -## [0.1.0] - 2019-09-25 - -* Initial impl diff --git a/actix-testing/Cargo.toml b/actix-testing/Cargo.toml deleted file mode 100644 index 957f59c3..00000000 --- a/actix-testing/Cargo.toml +++ /dev/null @@ -1,27 +0,0 @@ -[package] -name = "actix-testing" -version = "1.0.1" -authors = ["Nikolay Kim "] -description = "Actix testing utils" -keywords = ["network", "framework", "async", "futures"] -homepage = "https://actix.rs" -repository = "https://github.com/actix/actix-net.git" -documentation = "https://docs.rs/actix-testing/" -categories = ["network-programming", "asynchronous"] -license = "MIT OR Apache-2.0" -edition = "2018" -workspace = ".." -readme = "README.md" - -[lib] -name = "actix_testing" -path = "src/lib.rs" - -[dependencies] -actix-rt = "1.1.1" -actix-macros = "0.1.0" -actix-server = "1.0.0" -actix-service = "1.0.0" - -log = "0.4" -socket2 = "0.3" diff --git a/actix-testing/LICENSE-APACHE b/actix-testing/LICENSE-APACHE deleted file mode 120000 index 965b606f..00000000 --- a/actix-testing/LICENSE-APACHE +++ /dev/null @@ -1 +0,0 @@ -../LICENSE-APACHE \ No newline at end of file diff --git a/actix-testing/LICENSE-MIT b/actix-testing/LICENSE-MIT deleted file mode 120000 index 76219eb7..00000000 --- a/actix-testing/LICENSE-MIT +++ /dev/null @@ -1 +0,0 @@ -../LICENSE-MIT \ No newline at end of file diff --git a/actix-testing/README.md b/actix-testing/README.md deleted file mode 100644 index bd4eec2f..00000000 --- a/actix-testing/README.md +++ /dev/null @@ -1,9 +0,0 @@ -# Actix test utilities [![crates.io](https://meritbadge.herokuapp.com/actix-testing)](https://crates.io/crates/actix-testint) [![Join the chat at https://gitter.im/actix/actix](https://badges.gitter.im/actix/actix.svg)](https://gitter.im/actix/actix?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) - -## Documentation & community resources - -* [User Guide](https://actix.rs/docs/) -* [API Documentation](https://docs.rs/actix-testing/) -* [Chat on gitter](https://gitter.im/actix/actix) -* Cargo package: [actix-http-test](https://crates.io/crates/actix-testing) -* Minimum supported Rust version: 1.37 or later diff --git a/actix-tls/CHANGES.md b/actix-tls/CHANGES.md index b120005b..dcb477d8 100644 --- a/actix-tls/CHANGES.md +++ b/actix-tls/CHANGES.md @@ -1,10 +1,15 @@ # Changes -## Unreleased - 2020-xx-xx -* move from `tokio-tls` to `tokio-native-tls` for native-tls feature. -* Update `tokio-openssl` to `0.6.0` -* Update `tokio-rustls` to `0.22.0` -* Update `rust-tls` to `0.19.0` +## Unreleased - 2021-xx-xx + + +## 3.0.0-beta.1 - 2020-12-29 +* Move acceptors under `accept` module. [#238] +* Merge `actix-connect` crate under `connect` module. [#238] +* Add feature flags to enable acceptors and/or connectors individually. [#238] + +[#238]: https://github.com/actix/actix-net/pull/238 + ## 2.0.0 - 2020-09-03 * `nativetls::NativeTlsAcceptor` is renamed to `nativetls::Acceptor`. diff --git a/actix-tls/Cargo.toml b/actix-tls/Cargo.toml index 09e1ba08..017f17fe 100644 --- a/actix-tls/Cargo.toml +++ b/actix-tls/Cargo.toml @@ -1,9 +1,9 @@ [package] name = "actix-tls" -version = "2.0.0" +version = "3.0.0-beta.1" authors = ["Nikolay Kim "] -description = "TLS acceptor services for Actix ecosystem." -keywords = ["network", "framework", "async", "tls", "ssl"] +description = "TLS acceptor and connector services for Actix ecosystem" +keywords = ["network", "tls", "ssl", "async", "transport"] homepage = "https://actix.rs" repository = "https://github.com/actix/actix-net.git" documentation = "https://docs.rs/actix-tls/" @@ -12,7 +12,7 @@ license = "MIT OR Apache-2.0" edition = "2018" [package.metadata.docs.rs] -features = ["openssl", "rustls", "nativetls"] +features = ["openssl", "rustls", "native-tls", "accept", "connect", "uri"] [lib] name = "actix_tls" @@ -20,45 +20,64 @@ path = "src/lib.rs" [[example]] name = "basic" -required-features = ["rustls"] +required-features = ["accept", "rustls"] [features] -default = [] +default = ["accept", "connect", "uri"] -# openssl -openssl = ["open-ssl", "tokio-openssl"] +# enable acceptor services +accept = [] -# rustls -rustls = ["rust-tls", "webpki", "webpki-roots", "tokio-rustls"] +# enable connector services +connect = ["trust-dns-proto/tokio-runtime", "trust-dns-resolver/tokio-runtime", "trust-dns-resolver/system-config"] -# nativetls -nativetls = ["native-tls", "tokio-native-tls"] +# use openssl impls +openssl = ["tls-openssl", "tokio-openssl"] + +# use rustls impls +rustls = ["tls-rustls", "webpki", "webpki-roots", "tokio-rustls"] + +# use native-tls impls +native-tls = ["tls-native-tls", "tokio-native-tls"] + +# support http::Uri as connect address +uri = ["http"] [dependencies] -actix-service = "1.0.0" -actix-codec = "0.3.0" -actix-utils = "2.0.0" +actix-codec = "0.4.0-beta.1" +actix-rt = "2.0.0-beta.1" +actix-service = "2.0.0-beta.1" +actix-utils = "3.0.0-beta.1" +derive_more = "0.99.5" +either = "1.6" futures-util = { version = "0.3.7", default-features = false } +http = { version = "0.2.2", optional = true } +log = "0.4" + +# resolver +trust-dns-proto = { version = "0.20.0", default-features = false, optional = true } +trust-dns-resolver = { version = "0.20.0", default-features = false, optional = true } # openssl -open-ssl = { package = "openssl", version = "0.10", optional = true } -tokio-openssl = { version = "0.6.0", optional = true } +tls-openssl = { package = "openssl", version = "0.10", optional = true } +tokio-openssl = { version = "0.6", optional = true } + +# TODO: Reduce dependencies where tokio wrappers re-export base crate. # rustls -rust-tls = { package = "rustls", version = "0.19.0", optional = true } -tokio-rustls = { version = "0.22.0", optional = true } +tls-rustls = { package = "rustls", version = "0.19", optional = true } +tokio-rustls = { version = "0.22", optional = true } webpki = { version = "0.21", optional = true } -webpki-roots = { version = "0.20", optional = true } +webpki-roots = { version = "0.21", optional = true } # native-tls -native-tls = { version = "0.2", optional = true } -tokio-native-tls = { version = "0.3.0", optional = true } +tls-native-tls = { package = "native-tls", version = "0.2", optional = true } +tokio-native-tls = { version = "0.3", optional = true } [dev-dependencies] +actix-server = "2.0.0-beta.1" bytes = "1" +env_logger = "0.8" +futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } log = "0.4" -env_logger = "0.7" -actix-testing = "1.0.0" -actix-server = "1" -actix-rt = "1" diff --git a/actix-tls/examples/basic.rs b/actix-tls/examples/basic.rs index cd706958..3f4cdb24 100644 --- a/actix-tls/examples/basic.rs +++ b/actix-tls/examples/basic.rs @@ -15,6 +15,10 @@ //! http --verify=false https://127.0.0.1:8443 //! ``` +// this rename only exists because of how we have organised the crate's feature flags +// it is not necessary for your actual code +extern crate tls_rustls as rustls; + use std::{ env, fs::File, @@ -27,10 +31,10 @@ use std::{ use actix_server::Server; use actix_service::pipeline_factory; -use actix_tls::rustls::Acceptor as RustlsAcceptor; +use actix_tls::accept::rustls::Acceptor as RustlsAcceptor; use futures_util::future::ok; use log::info; -use rust_tls::{ +use rustls::{ internal::pemfile::certs, internal::pemfile::rsa_private_keys, NoClientAuth, ServerConfig, }; diff --git a/actix-tls/src/accept/mod.rs b/actix-tls/src/accept/mod.rs new file mode 100644 index 00000000..8b1fe47c --- /dev/null +++ b/actix-tls/src/accept/mod.rs @@ -0,0 +1,42 @@ +//! TLS acceptor services for Actix ecosystem. +//! +//! ## Crate Features +//! * `openssl` - TLS acceptor using the `openssl` crate. +//! * `rustls` - TLS acceptor using the `rustls` crate. +//! * `native-tls` - TLS acceptor using the `native-tls` crate. + +use std::sync::atomic::{AtomicUsize, Ordering}; + +use actix_utils::counter::Counter; + +#[cfg(feature = "openssl")] +pub mod openssl; + +#[cfg(feature = "rustls")] +pub mod rustls; + +#[cfg(feature = "native-tls")] +pub mod nativetls; + +pub(crate) static MAX_CONN: AtomicUsize = AtomicUsize::new(256); + +thread_local! { + static MAX_CONN_COUNTER: Counter = Counter::new(MAX_CONN.load(Ordering::Relaxed)); +} + +/// Sets the maximum per-worker concurrent TLS connection limit. +/// +/// All listeners will stop accepting connections when this limit is reached. +/// It can be used to regulate the global TLS CPU usage. +/// +/// By default, the connection limit is 256. +pub fn max_concurrent_tls_connect(num: usize) { + MAX_CONN.store(num, Ordering::Relaxed); +} + +/// TLS error combined with service error. +#[derive(Debug)] +pub enum TlsError { + Tls(E1), + Service(E2), +} diff --git a/actix-tls/src/nativetls.rs b/actix-tls/src/accept/nativetls.rs similarity index 71% rename from actix-tls/src/nativetls.rs rename to actix-tls/src/accept/nativetls.rs index 0710d22d..5d80ce8b 100644 --- a/actix-tls/src/nativetls.rs +++ b/actix-tls/src/accept/nativetls.rs @@ -1,20 +1,18 @@ -use std::future::Future; -use std::pin::Pin; use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite}; use actix_service::{Service, ServiceFactory}; use actix_utils::counter::Counter; -use futures_util::future::{ready, Ready}; +use futures_util::future::{ready, LocalBoxFuture, Ready}; pub use native_tls::Error; pub use tokio_native_tls::{TlsAcceptor, TlsStream}; -use crate::MAX_CONN_COUNTER; +use super::MAX_CONN_COUNTER; /// Accept TLS connections via `native-tls` package. /// -/// `nativetls` feature enables this `Acceptor` type. +/// `native-tls` feature enables this `Acceptor` type. pub struct Acceptor { acceptor: TlsAcceptor, } @@ -36,11 +34,11 @@ impl Clone for Acceptor { } } -impl ServiceFactory for Acceptor +impl ServiceFactory for Acceptor where - Req: AsyncRead + AsyncWrite + Unpin + 'static, + T: AsyncRead + AsyncWrite + Unpin + 'static, { - type Response = TlsStream; + type Response = TlsStream; type Error = Error; type Config = (); @@ -72,15 +70,13 @@ impl Clone for NativeTlsAcceptorService { } } -type LocalBoxFuture<'a, T> = Pin + 'a>>; - -impl Service for NativeTlsAcceptorService +impl Service for NativeTlsAcceptorService where - Req: AsyncRead + AsyncWrite + Unpin + 'static, + T: AsyncRead + AsyncWrite + Unpin + 'static, { - type Response = TlsStream; + type Response = TlsStream; type Error = Error; - type Future = LocalBoxFuture<'static, Result, Error>>; + type Future = LocalBoxFuture<'static, Result, Error>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { if self.conns.available(cx) { @@ -90,13 +86,13 @@ where } } - fn call(&mut self, req: Req) -> Self::Future { + fn call(&mut self, io: T) -> Self::Future { let guard = self.conns.get(); let this = self.clone(); Box::pin(async move { - let res = this.acceptor.accept(req).await; + let io = this.acceptor.accept(io).await; drop(guard); - res + io }) } } diff --git a/actix-tls/src/accept/openssl.rs b/actix-tls/src/accept/openssl.rs new file mode 100644 index 00000000..efda5c38 --- /dev/null +++ b/actix-tls/src/accept/openssl.rs @@ -0,0 +1,111 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use actix_codec::{AsyncRead, AsyncWrite}; +use actix_service::{Service, ServiceFactory}; +use actix_utils::counter::{Counter, CounterGuard}; +use futures_util::{ + future::{ready, Ready}, + ready, +}; + +pub use openssl::ssl::{ + AlpnError, Error as SslError, HandshakeError, Ssl, SslAcceptor, SslAcceptorBuilder, +}; +pub use tokio_openssl::SslStream; + +use super::MAX_CONN_COUNTER; + +/// Accept TLS connections via `openssl` package. +/// +/// `openssl` feature enables this `Acceptor` type. +pub struct Acceptor { + acceptor: SslAcceptor, +} + +impl Acceptor { + /// Create OpenSSL based `Acceptor` service factory. + #[inline] + pub fn new(acceptor: SslAcceptor) -> Self { + Acceptor { acceptor } + } +} + +impl Clone for Acceptor { + #[inline] + fn clone(&self) -> Self { + Self { + acceptor: self.acceptor.clone(), + } + } +} + +impl ServiceFactory for Acceptor +where + T: AsyncRead + AsyncWrite + Unpin + 'static, +{ + type Response = SslStream; + type Error = SslError; + type Config = (); + type Service = AcceptorService; + type InitError = (); + type Future = Ready>; + + fn new_service(&self, _: ()) -> Self::Future { + MAX_CONN_COUNTER.with(|conns| { + ready(Ok(AcceptorService { + acceptor: self.acceptor.clone(), + conns: conns.clone(), + })) + }) + } +} + +pub struct AcceptorService { + acceptor: SslAcceptor, + conns: Counter, +} + +impl Service for AcceptorService +where + T: AsyncRead + AsyncWrite + Unpin + 'static, +{ + type Response = SslStream; + type Error = SslError; + type Future = AcceptorServiceResponse; + + fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll> { + if self.conns.available(ctx) { + Poll::Ready(Ok(())) + } else { + Poll::Pending + } + } + + fn call(&mut self, io: T) -> Self::Future { + let ssl_ctx = self.acceptor.context(); + let ssl = Ssl::new(ssl_ctx).expect("Provided SSL acceptor was invalid."); + AcceptorServiceResponse { + _guard: self.conns.get(), + stream: Some(SslStream::new(ssl, io).unwrap()), + } + } +} + +pub struct AcceptorServiceResponse +where + T: AsyncRead + AsyncWrite, +{ + stream: Option>, + _guard: CounterGuard, +} + +impl Future for AcceptorServiceResponse { + type Output = Result, SslError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + ready!(Pin::new(self.stream.as_mut().unwrap()).poll_accept(cx))?; + Poll::Ready(Ok(self.stream.take().expect("SSL connect has resolved."))) + } +} diff --git a/actix-tls/src/rustls.rs b/actix-tls/src/accept/rustls.rs similarity index 76% rename from actix-tls/src/rustls.rs rename to actix-tls/src/accept/rustls.rs index ca68dc6b..a6686f44 100644 --- a/actix-tls/src/rustls.rs +++ b/actix-tls/src/accept/rustls.rs @@ -10,11 +10,11 @@ use actix_utils::counter::{Counter, CounterGuard}; use futures_util::future::{ready, Ready}; use tokio_rustls::{Accept, TlsAcceptor}; -pub use rust_tls::{ServerConfig, Session}; +pub use rustls::{ServerConfig, Session}; pub use tokio_rustls::server::TlsStream; pub use webpki_roots::TLS_SERVER_ROOTS; -use crate::MAX_CONN_COUNTER; +use super::MAX_CONN_COUNTER; /// Accept TLS connections via `rustls` package. /// @@ -42,8 +42,11 @@ impl Clone for Acceptor { } } -impl ServiceFactory for Acceptor { - type Response = TlsStream; +impl ServiceFactory for Acceptor +where + T: AsyncRead + AsyncWrite + Unpin, +{ + type Response = TlsStream; type Error = io::Error; type Config = (); @@ -67,10 +70,13 @@ pub struct AcceptorService { conns: Counter, } -impl Service for AcceptorService { - type Response = TlsStream; +impl Service for AcceptorService +where + T: AsyncRead + AsyncWrite + Unpin, +{ + type Response = TlsStream; type Error = io::Error; - type Future = AcceptorServiceFut; + type Future = AcceptorServiceFut; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { if self.conns.available(cx) { @@ -80,7 +86,7 @@ impl Service for AcceptorService { } } - fn call(&mut self, req: Req) -> Self::Future { + fn call(&mut self, req: T) -> Self::Future { AcceptorServiceFut { _guard: self.conns.get(), fut: self.acceptor.accept(req), @@ -88,16 +94,19 @@ impl Service for AcceptorService { } } -pub struct AcceptorServiceFut +pub struct AcceptorServiceFut where - Req: AsyncRead + AsyncWrite + Unpin, + T: AsyncRead + AsyncWrite + Unpin, { - fut: Accept, + fut: Accept, _guard: CounterGuard, } -impl Future for AcceptorServiceFut { - type Output = Result, io::Error>; +impl Future for AcceptorServiceFut +where + T: AsyncRead + AsyncWrite + Unpin, +{ + type Output = Result, io::Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); diff --git a/actix-connect/src/connect.rs b/actix-tls/src/connect/connect.rs similarity index 100% rename from actix-connect/src/connect.rs rename to actix-tls/src/connect/connect.rs diff --git a/actix-connect/src/connector.rs b/actix-tls/src/connect/connector.rs similarity index 91% rename from actix-connect/src/connector.rs rename to actix-tls/src/connect/connector.rs index c5a3450e..a0a6b8b5 100644 --- a/actix-connect/src/connector.rs +++ b/actix-tls/src/connect/connector.rs @@ -9,6 +9,7 @@ use std::task::{Context, Poll}; use actix_rt::net::TcpStream; use actix_service::{Service, ServiceFactory}; use futures_util::future::{ready, Ready}; +use log::{error, trace}; use super::connect::{Address, Connect, Connection}; use super::error::ConnectError; @@ -151,18 +152,18 @@ impl Future for TcpConnectorResponse { Poll::Ready(Ok(sock)) => { let req = req.take().unwrap(); trace!( - "TCP connector - successfully connected to connecting to {:?} - {:?}", - req.host(), sock.peer_addr() - ); + "TCP connector - successfully connected to connecting to {:?} - {:?}", + req.host(), sock.peer_addr() + ); return Poll::Ready(Ok(Connection::new(sock, req))); } Poll::Pending => return Poll::Pending, Poll::Ready(Err(err)) => { trace!( - "TCP connector - failed to connect to connecting to {:?} port: {}", - req.as_ref().unwrap().host(), - port, - ); + "TCP connector - failed to connect to connecting to {:?} port: {}", + req.as_ref().unwrap().host(), + port, + ); if addrs.is_none() || addrs.as_ref().unwrap().is_empty() { return Poll::Ready(Err(err.into())); } diff --git a/actix-connect/src/error.rs b/actix-tls/src/connect/error.rs similarity index 100% rename from actix-connect/src/error.rs rename to actix-tls/src/connect/error.rs diff --git a/actix-connect/src/lib.rs b/actix-tls/src/connect/mod.rs similarity index 93% rename from actix-connect/src/lib.rs rename to actix-tls/src/connect/mod.rs index 5794498a..75312c59 100644 --- a/actix-connect/src/lib.rs +++ b/actix-tls/src/connect/mod.rs @@ -5,21 +5,12 @@ //! * `openssl` - enables TLS support via `openssl` crate //! * `rustls` - enables TLS support via `rustls` crate -#![deny(rust_2018_idioms, nonstandard_style)] -#![recursion_limit = "128"] -#![doc(html_logo_url = "https://actix.rs/img/logo.png")] -#![doc(html_favicon_url = "https://actix.rs/favicon.ico")] - -#[macro_use] -extern crate log; - mod connect; mod connector; mod error; mod resolve; mod service; pub mod ssl; - #[cfg(feature = "uri")] mod uri; diff --git a/actix-connect/src/resolve.rs b/actix-tls/src/connect/resolve.rs similarity index 82% rename from actix-connect/src/resolve.rs rename to actix-tls/src/connect/resolve.rs index 16b3b37d..61535faa 100644 --- a/actix-connect/src/resolve.rs +++ b/actix-tls/src/connect/resolve.rs @@ -5,13 +5,14 @@ use std::pin::Pin; use std::task::{Context, Poll}; use actix_service::{Service, ServiceFactory}; -use futures_util::future::{ready, Ready}; +use futures_util::future::{ok, Either, Ready}; +use log::trace; use trust_dns_resolver::TokioAsyncResolver as AsyncResolver; use trust_dns_resolver::{error::ResolveError, lookup_ip::LookupIp}; -use crate::connect::{Address, Connect}; -use crate::error::ConnectError; -use crate::get_default_resolver; +use super::connect::{Address, Connect}; +use super::error::ConnectError; +use super::get_default_resolver; /// DNS Resolver Service factory pub struct ResolverFactory { @@ -63,7 +64,7 @@ impl ServiceFactory> for ResolverFactory { type Future = Ready>; fn new_service(&self, _: ()) -> Self::Future { - ready(Ok(self.service())) + ok(self.service()) } } @@ -104,19 +105,23 @@ impl Clone for Resolver { impl Service> for Resolver { type Response = Connect; type Error = ConnectError; - type Future = ResolverServiceFuture; + #[allow(clippy::type_complexity)] + type Future = Either< + Pin>>>, + Ready, Self::Error>>, + >; actix_service::always_ready!(); fn call(&mut self, mut req: Connect) -> Self::Future { if req.addr.is_some() { - ResolverServiceFuture::NoLookUp(Some(req)) + Either::Right(ok(req)) } else if let Ok(ip) = req.host().parse() { req.addr = Some(either::Either::Left(SocketAddr::new(ip, req.port()))); - ResolverServiceFuture::NoLookUp(Some(req)) + Either::Right(ok(req)) } else { let resolver = self.resolver.as_ref().map(AsyncResolver::clone); - ResolverServiceFuture::LookUp(Box::pin(async move { + Either::Left(Box::pin(async move { trace!("DNS resolver: resolving host {:?}", req.host()); let resolver = if let Some(resolver) = resolver { resolver @@ -131,30 +136,13 @@ impl Service> for Resolver { } } -type LocalBoxFuture<'a, T> = Pin + 'a>>; - -#[doc(hidden)] -pub enum ResolverServiceFuture { - NoLookUp(Option>), - LookUp(LocalBoxFuture<'static, Result, ConnectError>>), -} - -impl Future for ResolverServiceFuture { - type Output = Result, ConnectError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.get_mut() { - Self::NoLookUp(conn) => Poll::Ready(Ok(conn.take().unwrap())), - Self::LookUp(fut) => fut.as_mut().poll(cx), - } - } -} +type LookupIpFuture = Pin>>>; #[doc(hidden)] /// Resolver future pub struct ResolverFuture { req: Option>, - lookup: LocalBoxFuture<'static, Result>, + lookup: LookupIpFuture, } impl ResolverFuture { diff --git a/actix-connect/src/service.rs b/actix-tls/src/connect/service.rs similarity index 96% rename from actix-connect/src/service.rs rename to actix-tls/src/connect/service.rs index a7960da9..59fe20cc 100644 --- a/actix-connect/src/service.rs +++ b/actix-tls/src/connect/service.rs @@ -5,13 +5,13 @@ use std::task::{Context, Poll}; use actix_rt::net::TcpStream; use actix_service::{Service, ServiceFactory}; use either::Either; -use futures_util::future::{ready, Ready}; +use futures_util::future::{ok, Ready}; use trust_dns_resolver::TokioAsyncResolver as AsyncResolver; -use crate::connect::{Address, Connect, Connection}; -use crate::connector::{TcpConnector, TcpConnectorFactory}; -use crate::error::ConnectError; -use crate::resolve::{Resolver, ResolverFactory}; +use super::connect::{Address, Connect, Connection}; +use super::connector::{TcpConnector, TcpConnectorFactory}; +use super::error::ConnectError; +use super::resolve::{Resolver, ResolverFactory}; pub struct ConnectServiceFactory { tcp: TcpConnectorFactory, @@ -79,7 +79,7 @@ impl ServiceFactory> for ConnectServiceFactory { type Future = Ready>; fn new_service(&self, _: ()) -> Self::Future { - ready(Ok(self.service())) + ok(self.service()) } } diff --git a/actix-connect/src/ssl/mod.rs b/actix-tls/src/connect/ssl/mod.rs similarity index 100% rename from actix-connect/src/ssl/mod.rs rename to actix-tls/src/connect/ssl/mod.rs diff --git a/actix-connect/src/ssl/openssl.rs b/actix-tls/src/connect/ssl/openssl.rs similarity index 72% rename from actix-connect/src/ssl/openssl.rs rename to actix-tls/src/connect/ssl/openssl.rs index 64ebde09..5193ce37 100644 --- a/actix-connect/src/ssl/openssl.rs +++ b/actix-tls/src/connect/ssl/openssl.rs @@ -1,19 +1,22 @@ use std::future::Future; +use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; use std::{fmt, io}; -pub use open_ssl::ssl::{Error as SslError, SslConnector, SslMethod}; -pub use tokio_openssl::SslStream; - use actix_codec::{AsyncRead, AsyncWrite}; use actix_rt::net::TcpStream; use actix_service::{Service, ServiceFactory}; -use futures_util::future::{ready, Ready}; -use futures_util::ready; +use futures_util::{ + future::{ready, Either, Ready}, + ready, +}; +use log::trace; +pub use openssl::ssl::{Error as SslError, HandshakeError, SslConnector, SslMethod}; +pub use tokio_openssl::SslStream; use trust_dns_resolver::TokioAsyncResolver as AsyncResolver; -use crate::{ +use crate::connect::{ Address, Connect, ConnectError, ConnectService, ConnectServiceFactory, Connection, }; @@ -80,77 +83,59 @@ where { type Response = Connection>; type Error = io::Error; - type Future = OpensslConnectorServiceFuture; + #[allow(clippy::type_complexity)] + type Future = Either, Ready>>; actix_service::always_ready!(); fn call(&mut self, stream: Connection) -> Self::Future { - match self.ssl_stream(stream) { - Ok(acc) => OpensslConnectorServiceFuture::Accept(Some(acc)), - Err(e) => OpensslConnectorServiceFuture::Error(Some(e)), + trace!("SSL Handshake start for: {:?}", stream.host()); + let (io, stream) = stream.replace(()); + let host = stream.host().to_string(); + + match self.connector.configure() { + Err(e) => Either::Right(ready(Err(io::Error::new(io::ErrorKind::Other, e)))), + Ok(config) => { + let ssl = config + .into_ssl(&host) + .expect("SSL connect configuration was invalid."); + + Either::Left(ConnectAsyncExt { + io: Some(SslStream::new(ssl, io).unwrap()), + stream: Some(stream), + _t: PhantomData, + }) + } } } } -impl OpensslConnectorService { - // construct SslStream with connector. - // At this point SslStream does not perform any I/O. - // handshake would happen later in OpensslConnectorServiceFuture - fn ssl_stream( - &self, - stream: Connection, - ) -> Result<(SslStream, Connection), SslError> - where - T: Address + 'static, - U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static, - { - trace!("SSL Handshake start for: {:?}", stream.host()); - let (stream, connection) = stream.replace(()); - let host = connection.host().to_string(); - - let config = self.connector.configure()?; - let ssl = config.into_ssl(host.as_str())?; - let stream = tokio_openssl::SslStream::new(ssl, stream)?; - Ok((stream, connection)) - } +pub struct ConnectAsyncExt { + io: Option>, + stream: Option>, + _t: PhantomData, } -#[doc(hidden)] -pub enum OpensslConnectorServiceFuture +impl Future for ConnectAsyncExt where - T: Address + 'static, - U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static, -{ - Accept(Option<(SslStream, Connection)>), - Error(Option), -} - -impl Future for OpensslConnectorServiceFuture -where - T: Address, U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static, { type Output = Result>, io::Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let e = match self.get_mut() { - Self::Error(e) => e.take().unwrap(), - Self::Accept(acc) => { - let (stream, _) = acc.as_mut().unwrap(); - match ready!(Pin::new(stream).poll_connect(cx)) { - Ok(()) => { - let (stream, connection) = acc.take().unwrap(); - trace!("SSL Handshake success: {:?}", connection.host()); - let (_, connection) = connection.replace(stream); - return Poll::Ready(Ok(connection)); - } - Err(e) => e, - } - } - }; + let this = self.get_mut(); - trace!("SSL Handshake error: {:?}", e); - Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, format!("{}", e)))) + match ready!(Pin::new(this.io.as_mut().unwrap()).poll_connect(cx)) { + Ok(_) => { + let stream = this.stream.take().unwrap(); + trace!("SSL Handshake success: {:?}", stream.host()); + Poll::Ready(Ok(stream.replace(this.io.take().unwrap()).1)) + } + Err(e) => { + trace!("SSL Handshake error: {:?}", e); + Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, format!("{}", e)))) + } + } } } diff --git a/actix-connect/src/ssl/rustls.rs b/actix-tls/src/connect/ssl/rustls.rs similarity index 93% rename from actix-connect/src/ssl/rustls.rs rename to actix-tls/src/connect/ssl/rustls.rs index d45b84ef..390ba413 100644 --- a/actix-connect/src/ssl/rustls.rs +++ b/actix-tls/src/connect/ssl/rustls.rs @@ -4,17 +4,20 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -pub use rust_tls::Session; +pub use rustls::Session; pub use tokio_rustls::{client::TlsStream, rustls::ClientConfig}; use actix_codec::{AsyncRead, AsyncWrite}; use actix_service::{Service, ServiceFactory}; -use futures_util::future::{ready, Ready}; -use futures_util::ready; +use futures_util::{ + future::{ready, Ready}, + ready, +}; +use log::trace; use tokio_rustls::{Connect, TlsConnector}; use webpki::DNSNameRef; -use crate::{Address, Connection}; +use crate::connect::{Address, Connection}; /// Rustls connector factory pub struct RustlsConnector { @@ -99,8 +102,9 @@ pub struct ConnectAsyncExt { stream: Option>, } -impl Future for ConnectAsyncExt +impl Future for ConnectAsyncExt where + T: Address, U: AsyncRead + AsyncWrite + Unpin + fmt::Debug, { type Output = Result>, std::io::Error>; diff --git a/actix-connect/src/uri.rs b/actix-tls/src/connect/uri.rs similarity index 97% rename from actix-connect/src/uri.rs rename to actix-tls/src/connect/uri.rs index 5f5f15de..b208a8b3 100644 --- a/actix-connect/src/uri.rs +++ b/actix-tls/src/connect/uri.rs @@ -1,6 +1,6 @@ use http::Uri; -use crate::Address; +use super::Address; impl Address for Uri { fn host(&self) -> &str { diff --git a/actix-tls/src/lib.rs b/actix-tls/src/lib.rs index 8cc18046..1fa08b6b 100644 --- a/actix-tls/src/lib.rs +++ b/actix-tls/src/lib.rs @@ -1,46 +1,17 @@ -//! TLS acceptor services for Actix ecosystem. -//! -//! ## Crate Features -//! * `openssl` - TLS acceptor using the `openssl` crate. -//! * `rustls` - TLS acceptor using the `rustls` crate. -//! * `nativetls` - TLS acceptor using the `native-tls` crate. +//! TLS acceptor and connector services for Actix ecosystem #![deny(rust_2018_idioms, nonstandard_style)] #![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] -use std::sync::atomic::{AtomicUsize, Ordering}; - -use actix_utils::counter::Counter; - +#[cfg(feature = "native-tls")] +extern crate tls_native_tls as native_tls; #[cfg(feature = "openssl")] -pub mod openssl; - +extern crate tls_openssl as openssl; #[cfg(feature = "rustls")] -pub mod rustls; +extern crate tls_rustls as rustls; -#[cfg(feature = "nativetls")] -pub mod nativetls; - -pub(crate) static MAX_CONN: AtomicUsize = AtomicUsize::new(256); - -thread_local! { - static MAX_CONN_COUNTER: Counter = Counter::new(MAX_CONN.load(Ordering::Relaxed)); -} - -/// Sets the maximum per-worker concurrent TLS connection limit. -/// -/// All listeners will stop accepting connections when this limit is reached. -/// It can be used to regulate the global TLS CPU usage. -/// -/// By default, the connection limit is 256. -pub fn max_concurrent_tls_connect(num: usize) { - MAX_CONN.store(num, Ordering::Relaxed); -} - -/// TLS error combined with service error. -#[derive(Debug)] -pub enum TlsError { - Tls(E1), - Service(E2), -} +#[cfg(feature = "accept")] +pub mod accept; +#[cfg(feature = "connect")] +pub mod connect; diff --git a/actix-tls/src/openssl.rs b/actix-tls/src/openssl.rs deleted file mode 100644 index 31072c96..00000000 --- a/actix-tls/src/openssl.rs +++ /dev/null @@ -1,126 +0,0 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use actix_codec::{AsyncRead, AsyncWrite}; -use actix_service::{Service, ServiceFactory}; -use actix_utils::counter::{Counter, CounterGuard}; -use futures_util::future::{ready, Ready}; -use futures_util::ready; - -pub use open_ssl::ssl::{AlpnError, Error, Ssl, SslAcceptor, SslAcceptorBuilder}; -pub use tokio_openssl::SslStream; - -use crate::MAX_CONN_COUNTER; - -/// Accept TLS connections via `openssl` package. -/// -/// `openssl` feature enables this `Acceptor` type. -pub struct Acceptor { - acceptor: SslAcceptor, -} - -impl Acceptor { - /// Create OpenSSL based `Acceptor` service factory. - #[inline] - pub fn new(acceptor: SslAcceptor) -> Self { - Acceptor { acceptor } - } -} - -impl Clone for Acceptor { - #[inline] - fn clone(&self) -> Self { - Self { - acceptor: self.acceptor.clone(), - } - } -} - -impl ServiceFactory for Acceptor { - type Response = SslStream; - type Error = Error; - type Config = (); - type Service = AcceptorService; - type InitError = (); - type Future = Ready>; - - fn new_service(&self, _: ()) -> Self::Future { - MAX_CONN_COUNTER.with(|conns| { - ready(Ok(AcceptorService { - acceptor: self.acceptor.clone(), - conns: conns.clone(), - })) - }) - } -} - -pub struct AcceptorService { - acceptor: SslAcceptor, - conns: Counter, -} - -impl Service for AcceptorService { - type Response = SslStream; - type Error = Error; - type Future = AcceptorServiceResponse; - - fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll> { - if self.conns.available(ctx) { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } - } - - fn call(&mut self, req: Req) -> Self::Future { - match self.ssl_stream(req) { - Ok(stream) => { - let guard = self.conns.get(); - AcceptorServiceResponse::Accept(Some(stream), Some(guard)) - } - Err(e) => AcceptorServiceResponse::Error(Some(e)), - } - } -} - -impl AcceptorService { - // construct a new SslStream. - // At this point the SslStream does not perform any IO. - // The handshake would happen later in AcceptorServiceResponse - fn ssl_stream( - &self, - stream: Req, - ) -> Result, Error> { - let ssl = Ssl::new(self.acceptor.context())?; - let stream = SslStream::new(ssl, stream)?; - Ok(stream) - } -} - -pub enum AcceptorServiceResponse -where - Req: AsyncRead + AsyncWrite, -{ - Accept(Option>, Option), - Error(Option), -} - -impl Future for AcceptorServiceResponse -where - Req: AsyncRead + AsyncWrite + Unpin, -{ - type Output = Result, Error>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.get_mut() { - AcceptorServiceResponse::Error(e) => Poll::Ready(Err(e.take().unwrap())), - AcceptorServiceResponse::Accept(stream, guard) => { - ready!(Pin::new(stream.as_mut().unwrap()).poll_accept(cx))?; - // drop counter guard a little early as the accept has finished - guard.take(); - Poll::Ready(Ok(stream.take().unwrap())) - } - } - } -} diff --git a/actix-connect/tests/test_connect.rs b/actix-tls/tests/test_connect.rs similarity index 95% rename from actix-connect/tests/test_connect.rs rename to actix-tls/tests/test_connect.rs index 21d78d2c..aa773c7f 100644 --- a/actix-connect/tests/test_connect.rs +++ b/actix-tls/tests/test_connect.rs @@ -2,15 +2,18 @@ use std::io; use actix_codec::{BytesCodec, Framed}; use actix_rt::net::TcpStream; +use actix_server::TestServer; use actix_service::{fn_service, Service, ServiceFactory}; -use actix_testing::TestServer; use bytes::Bytes; use futures_util::sink::SinkExt; -use actix_connect::resolver::{ResolverConfig, ResolverOpts}; -use actix_connect::Connect; +use actix_tls::connect::{ + self as actix_connect, + resolver::{ResolverConfig, ResolverOpts}, + Connect, +}; -#[cfg(feature = "openssl")] +#[cfg(all(feature = "connect", feature = "openssl"))] #[actix_rt::test] async fn test_string() { let srv = TestServer::with(|| { diff --git a/actix-tracing/Cargo.toml b/actix-tracing/Cargo.toml index f1de6286..720e7ad5 100644 --- a/actix-tracing/Cargo.toml +++ b/actix-tracing/Cargo.toml @@ -16,11 +16,12 @@ name = "actix_tracing" path = "src/lib.rs" [dependencies] -actix-service = "1.0.4" -futures-util = { version = "0.3.7", default-features = false } +actix-service = "2.0.0-beta.1" + +futures-util = { version = "0.3.4", default-features = false } tracing = "0.1" tracing-futures = "0.2" [dev_dependencies] actix-rt = "1.0" -slab = "0.4" \ No newline at end of file +slab = "0.4"