diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 00000000..2bb9a234 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,131 @@ +name: CI + +on: + pull_request: + types: [opened, synchronize, reopened] + push: + branches: [master] + +jobs: + build_and_test: + strategy: + fail-fast: false + matrix: + target: + - { name: Linux, os: ubuntu-latest, triple: x86_64-unknown-linux-gnu } + - { name: macOS, os: macos-latest, triple: x86_64-apple-darwin } + - { name: Windows, os: windows-latest, triple: x86_64-pc-windows-msvc } + - { name: Windows (MinGW), os: windows-latest, triple: x86_64-pc-windows-gnu } + - { name: Windows (32-bit), os: windows-latest, triple: i686-pc-windows-msvc } + version: + - 1.46.0 # MSRV + - stable + - nightly + + name: ${{ matrix.target.name }} / ${{ matrix.version }} + runs-on: ${{ matrix.target.os }} + + env: + VCPKGRS_DYNAMIC: 1 + + steps: + - name: Setup Routing + if: matrix.target.os == 'macos-latest' + run: sudo ifconfig lo0 alias 127.0.0.3 + + - uses: actions/checkout@v2 + + # install OpenSSL on Windows + - name: Set vcpkg root + if: matrix.target.triple == 'x86_64-pc-windows-msvc' || matrix.target.triple == 'i686-pc-windows-msvc' + run: echo "VCPKG_ROOT=$env:VCPKG_INSTALLATION_ROOT" | Out-File -FilePath $env:GITHUB_ENV -Append + - name: Install OpenSSL + if: matrix.target.triple == 'x86_64-pc-windows-msvc' + run: vcpkg install openssl:x64-windows + - name: Install OpenSSL + if: matrix.target.triple == 'i686-pc-windows-msvc' + run: vcpkg install openssl:x86-windows + + - name: Install ${{ matrix.version }} + uses: actions-rs/toolchain@v1 + with: + toolchain: ${{ matrix.version }}-${{ matrix.target.triple }} + profile: minimal + override: true + + # - name: Install MSYS2 + # if: matrix.target.triple == 'x86_64-pc-windows-gnu' + # uses: msys2/setup-msys2@v2 + # - name: Install MinGW Packages + # if: matrix.target.triple == 'x86_64-pc-windows-gnu' + # run: | + # msys2 -c 'pacman -Sy --noconfirm pacman' + # msys2 -c 'pacman --noconfirm -S base-devel pkg-config' + + # - name: Generate Cargo.lock + # uses: actions-rs/cargo@v1 + # with: + # command: generate-lockfile + # - name: Cache Dependencies + # uses: Swatinem/rust-cache@v1.2.0 + + - name: Install cargo-hack + uses: actions-rs/cargo@v1 + with: + command: install + args: cargo-hack + + - name: check minimal + uses: actions-rs/cargo@v1 + with: + command: hack + args: check --workspace --no-default-features + + - name: check minimal + tests + uses: actions-rs/cargo@v1 + with: + command: hack + args: check --workspace --no-default-features --tests --examples + + - name: check default + uses: actions-rs/cargo@v1 + with: + command: check + args: --workspace --tests --examples + + - name: check full + # TODO: compile OpenSSL and run tests on MinGW + if: matrix.target.triple != 'x86_64-pc-windows-gnu' + uses: actions-rs/cargo@v1 + with: + command: check + args: --workspace --all-features --tests --examples + + - name: tests + if: matrix.target.triple != 'x86_64-pc-windows-gnu' + uses: actions-rs/cargo@v1 + with: + command: test + args: --workspace --all-features --no-fail-fast -- --nocapture + + - name: Generate coverage file + if: > + matrix.target.os == 'ubuntu-latest' + && matrix.version == 'stable' + && github.ref == 'refs/heads/master' + run: | + cargo install cargo-tarpaulin + cargo tarpaulin --out Xml --verbose + - name: Upload to Codecov + if: > + matrix.target.os == 'ubuntu-latest' + && matrix.version == 'stable' + && github.ref == 'refs/heads/master' + uses: codecov/codecov-action@v1 + with: + file: cobertura.xml + + - name: Clear the cargo caches + run: | + cargo install cargo-cache --no-default-features --features ci-autoclean + cargo-cache diff --git a/.github/workflows/clippy-fmt.yml b/.github/workflows/clippy-fmt.yml index 12343dd4..3bef81db 100644 --- a/.github/workflows/clippy-fmt.yml +++ b/.github/workflows/clippy-fmt.yml @@ -1,34 +1,42 @@ +name: Lint + on: pull_request: types: [opened, synchronize, reopened] -name: Clippy and rustfmt Check jobs: - clippy_check: + fmt: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - - uses: actions-rs/toolchain@v1 + - name: Install Rust + uses: actions-rs/toolchain@v1 with: toolchain: stable - components: rustfmt profile: minimal + components: rustfmt override: true - - name: Check with rustfmt + - name: Rustfmt Check uses: actions-rs/cargo@v1 with: command: fmt args: --all -- --check - - uses: actions-rs/toolchain@v1 + clippy: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - name: Install Rust + uses: actions-rs/toolchain@v1 with: - toolchain: nightly - components: clippy + toolchain: stable profile: minimal + components: clippy override: true - - name: Check with Clippy + - name: Clippy Check uses: actions-rs/clippy-check@v1 with: token: ${{ secrets.GITHUB_TOKEN }} - args: --workspace --tests + args: --workspace --tests --all-features diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml deleted file mode 100644 index 8ea7823d..00000000 --- a/.github/workflows/linux.yml +++ /dev/null @@ -1,82 +0,0 @@ -name: CI (Linux) - -on: - pull_request: - types: [opened, synchronize, reopened] - push: - branches: - - master - - '1.0' - -jobs: - build_and_test: - strategy: - fail-fast: false - matrix: - version: - - 1.46.0 - - stable - - nightly - - name: ${{ matrix.version }} - x86_64-unknown-linux-gnu - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v2 - - - name: Install ${{ matrix.version }} - uses: actions-rs/toolchain@v1 - with: - toolchain: ${{ matrix.version }}-x86_64-unknown-linux-gnu - profile: minimal - override: true - - - name: Generate Cargo.lock - uses: actions-rs/cargo@v1 - with: - command: generate-lockfile - - name: Cache cargo dirs - uses: actions/cache@v2 - with: - path: - ~/.cargo/registry - ~/.cargo/git - ~/.cargo/bin - key: ${{ matrix.version }}-x86_64-unknown-linux-gnu-cargo-trimmed-${{ hashFiles('**/Cargo.lock') }} - - name: Cache cargo build - uses: actions/cache@v2 - with: - path: target - key: ${{ matrix.version }}-x86_64-unknown-linux-gnu-cargo-build-trimmed-${{ hashFiles('**/Cargo.lock') }} - - - name: check build - uses: actions-rs/cargo@v1 - with: - command: check - args: --workspace --bins --examples --tests - - - name: tests - uses: actions-rs/cargo@v1 - timeout-minutes: 40 - with: - command: test - args: --workspace --exclude=actix-tls --no-fail-fast -- --nocapture - - - name: Generate coverage file - if: matrix.version == 'stable' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request') - run: | - cargo install cargo-tarpaulin - cargo tarpaulin --out Xml --workspace - - - name: Upload to Codecov - if: matrix.version == 'stable' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request') - uses: codecov/codecov-action@v1 - with: - file: cobertura.xml - - - name: Clear the cargo caches - run: | - rustup update stable - rustup override set stable - cargo install cargo-cache --no-default-features --features ci-autoclean - cargo-cache diff --git a/.github/workflows/macos.yml b/.github/workflows/macos.yml deleted file mode 100644 index b2555bd3..00000000 --- a/.github/workflows/macos.yml +++ /dev/null @@ -1,43 +0,0 @@ -name: CI (macOS) - -on: - pull_request: - types: [opened, synchronize, reopened] - push: - branches: - - master - - '1.0' - -jobs: - build_and_test: - strategy: - fail-fast: false - matrix: - version: - - stable - - nightly - - name: ${{ matrix.version }} - x86_64-apple-darwin - runs-on: macos-latest - - steps: - - uses: actions/checkout@v2 - - - name: Install ${{ matrix.version }} - uses: actions-rs/toolchain@v1 - with: - toolchain: ${{ matrix.version }}-x86_64-apple-darwin - profile: minimal - override: true - - - name: check build - uses: actions-rs/cargo@v1 - with: - command: check - args: --workspace --bins --examples --tests - - - name: tests - uses: actions-rs/cargo@v1 - with: - command: test - args: --workspace --exclude=actix-tls --no-fail-fast -- --nocapture diff --git a/.github/workflows/upload-doc.yml b/.github/workflows/upload-doc.yml new file mode 100644 index 00000000..36044230 --- /dev/null +++ b/.github/workflows/upload-doc.yml @@ -0,0 +1,35 @@ +name: Upload documentation + +on: + push: + branches: [master] + +jobs: + build: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + + - name: Install Rust + uses: actions-rs/toolchain@v1 + with: + toolchain: nightly-x86_64-unknown-linux-gnu + profile: minimal + override: true + + - name: Build Docs + uses: actions-rs/cargo@v1 + with: + command: doc + args: --workspace --all-features --no-deps + + - name: Tweak HTML + run: echo '' > target/doc/index.html + + - name: Deploy to GitHub Pages + uses: JamesIves/github-pages-deploy-action@3.7.1 + with: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + BRANCH: gh-pages + FOLDER: target/doc diff --git a/.github/workflows/windows-mingw.yml b/.github/workflows/windows-mingw.yml deleted file mode 100644 index 1fd5fc59..00000000 --- a/.github/workflows/windows-mingw.yml +++ /dev/null @@ -1,45 +0,0 @@ -name: CI (Windows-mingw) - -on: - pull_request: - types: [opened, synchronize, reopened] - push: - branches: - - master - - '1.0' - -jobs: - build_and_test: - strategy: - fail-fast: false - matrix: - version: - - stable - - nightly - - name: ${{ matrix.version }} - x86_64-pc-windows-gnu - runs-on: windows-latest - - steps: - - uses: actions/checkout@v2 - - - name: Install ${{ matrix.version }} - uses: actions-rs/toolchain@v1 - with: - toolchain: ${{ matrix.version }}-x86_64-pc-windows-gnu - profile: minimal - override: true - - - name: Install MSYS2 - uses: msys2/setup-msys2@v2 - - - name: Install packages - run: | - msys2 -c 'pacman -Sy --noconfirm pacman' - msys2 -c 'pacman --noconfirm -S base-devel pkg-config' - - - name: check build - uses: actions-rs/cargo@v1 - with: - command: check - args: --workspace --bins --examples --tests diff --git a/.github/workflows/windows.yml b/.github/workflows/windows.yml deleted file mode 100644 index b2b57989..00000000 --- a/.github/workflows/windows.yml +++ /dev/null @@ -1,69 +0,0 @@ -name: CI (Windows) - -on: - pull_request: - types: [opened, synchronize, reopened] - push: - branches: - - master - - '1.0' - -env: - VCPKGRS_DYNAMIC: 1 - -jobs: - build_and_test: - strategy: - fail-fast: false - matrix: - version: - - stable - - nightly - target: - - x86_64-pc-windows-msvc - - i686-pc-windows-msvc - - name: ${{ matrix.version }} - ${{ matrix.target }} - runs-on: windows-latest - - steps: - - uses: actions/checkout@v2 - - - name: Install ${{ matrix.version }} - uses: actions-rs/toolchain@v1 - with: - toolchain: ${{ matrix.version }}-${{ matrix.target }} - profile: minimal - override: true - - - name: Install OpenSSL (x64) - if: matrix.target == 'x86_64-pc-windows-msvc' - run: | - vcpkg integrate install - vcpkg install openssl:x64-windows - Get-ChildItem C:\vcpkg\installed\x64-windows\bin - Get-ChildItem C:\vcpkg\installed\x64-windows\lib - Copy-Item C:\vcpkg\installed\x64-windows\bin\libcrypto-1_1-x64.dll C:\vcpkg\installed\x64-windows\bin\libcrypto.dll - Copy-Item C:\vcpkg\installed\x64-windows\bin\libssl-1_1-x64.dll C:\vcpkg\installed\x64-windows\bin\libssl.dll - - - name: Install OpenSSL (x86) - if: matrix.target == 'i686-pc-windows-msvc' - run: | - vcpkg integrate install - vcpkg install openssl:x86-windows - Get-ChildItem C:\vcpkg\installed\x86-windows\bin - Get-ChildItem C:\vcpkg\installed\x86-windows\lib - Copy-Item C:\vcpkg\installed\x86-windows\bin\libcrypto-1_1.dll C:\vcpkg\installed\x86-windows\bin\libcrypto.dll - Copy-Item C:\vcpkg\installed\x86-windows\bin\libssl-1_1.dll C:\vcpkg\installed\x86-windows\bin\libssl.dll - - - name: check build - uses: actions-rs/cargo@v1 - with: - command: check - args: --workspace --bins --examples --tests - - - name: tests - uses: actions-rs/cargo@v1 - with: - command: test - args: --workspace --exclude=actix-tls --no-fail-fast -- --nocapture diff --git a/actix-router/src/resource.rs b/actix-router/src/resource.rs index 8dbef26c..98b4a709 100644 --- a/actix-router/src/resource.rs +++ b/actix-router/src/resource.rs @@ -670,8 +670,6 @@ pub(crate) fn insert_slash(path: &str) -> String { #[cfg(test)] mod tests { use super::*; - use http::Uri; - use std::convert::TryFrom; #[test] fn test_parse_static() { @@ -833,8 +831,11 @@ mod tests { assert!(re.is_match("/user/2345/sdg")); } + #[cfg(feature = "http")] #[test] fn test_parse_urlencoded_param() { + use std::convert::TryFrom; + let re = ResourceDef::new("/user/{id}/test"); let mut path = Path::new("/user/2345/test"); @@ -845,7 +846,7 @@ mod tests { assert!(re.match_path(&mut path)); assert_eq!(path.get("id").unwrap(), "qwe%25"); - let uri = Uri::try_from("/user/qwe%25/test").unwrap(); + let uri = http::Uri::try_from("/user/qwe%25/test").unwrap(); let mut path = Path::new(uri); assert!(re.match_path(&mut path)); assert_eq!(path.get("id").unwrap(), "qwe%25"); diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 6754ca33..83ecc5ed 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -1,6 +1,17 @@ # Changes ## Unreleased - 2021-xx-xx +* `ActixStream::{poll_read_ready, poll_write_ready}` would return `Ready` in Ok variant. [#293] + +[#293] https://github.com/actix/actix-net/pull/293 + + +## 2.1.0 - 2021-02-24 +* Add `ActixStream` extension trait to include readiness methods. [#276] +* Re-export `tokio::net::TcpSocket` in `net` module [#282] + +[#276]: https://github.com/actix/actix-net/pull/276 +[#282]: https://github.com/actix/actix-net/pull/282 ## 2.0.2 - 2021-02-06 diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index 7990e67d..126056ec 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-rt" -version = "2.0.2" +version = "2.1.0" authors = [ "Nikolay Kim ", "Rob Ede ", @@ -8,7 +8,7 @@ authors = [ description = "Tokio-based single-threaded async runtime for the Actix ecosystem" keywords = ["async", "futures", "io", "runtime"] homepage = "https://actix.rs" -repository = "https://github.com/actix/actix-net.git" +repository = "https://github.com/actix/actix-net" documentation = "https://docs.rs/actix-rt" categories = ["network-programming", "asynchronous"] license = "MIT OR Apache-2.0" @@ -26,7 +26,7 @@ macros = ["actix-macros"] actix-macros = { version = "0.2.0", optional = true } futures-core = { version = "0.3", default-features = false } -tokio = { version = "1.2", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] } +tokio = { version = "1.3", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] } [dev-dependencies] tokio = { version = "1.2", features = ["full"] } diff --git a/actix-rt/README.md b/actix-rt/README.md index c29d563d..f9a3ed31 100644 --- a/actix-rt/README.md +++ b/actix-rt/README.md @@ -2,4 +2,13 @@ > Tokio-based single-threaded async runtime for the Actix ecosystem. +[![crates.io](https://img.shields.io/crates/v/actix-rt?label=latest)](https://crates.io/crates/actix-rt) +[![Documentation](https://docs.rs/actix-rt/badge.svg?version=2.1.0)](https://docs.rs/actix-rt/2.1.0) +[![Version](https://img.shields.io/badge/rustc-1.46+-ab6000.svg)](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html) +![MIT or Apache 2.0 licensed](https://img.shields.io/crates/l/actix-rt.svg) +
+[![dependency status](https://deps.rs/crate/actix-rt/2.1.0/status.svg)](https://deps.rs/crate/actix-rt/2.1.0) +![Download](https://img.shields.io/crates/d/actix-rt.svg) +[![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/WghFtEH6Hb) + See crate documentation for more: https://docs.rs/actix-rt. diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index a7e9f309..bd2e165d 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -70,13 +70,63 @@ pub mod signal { } pub mod net { - //! TCP/UDP/Unix bindings (Tokio re-exports). + //! TCP/UDP/Unix bindings (mostly Tokio re-exports). + use std::{ + future::Future, + io, + task::{Context, Poll}, + }; + + pub use tokio::io::Ready; + use tokio::io::{AsyncRead, AsyncWrite, Interest}; pub use tokio::net::UdpSocket; - pub use tokio::net::{TcpListener, TcpStream}; + pub use tokio::net::{TcpListener, TcpSocket, TcpStream}; #[cfg(unix)] pub use tokio::net::{UnixDatagram, UnixListener, UnixStream}; + + /// Extension trait over async read+write types that can also signal readiness. + pub trait ActixStream: AsyncRead + AsyncWrite + Unpin + 'static { + /// Poll stream and check read readiness of Self. + /// + /// See [tokio::net::TcpStream::poll_read_ready] for detail on intended use. + fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll>; + + /// Poll stream and check write readiness of Self. + /// + /// See [tokio::net::TcpStream::poll_write_ready] for detail on intended use. + fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll>; + } + + impl ActixStream for TcpStream { + fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll> { + let ready = self.ready(Interest::READABLE); + tokio::pin!(ready); + ready.poll(cx) + } + + fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> { + let ready = self.ready(Interest::WRITABLE); + tokio::pin!(ready); + ready.poll(cx) + } + } + + #[cfg(unix)] + impl ActixStream for UnixStream { + fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll> { + let ready = self.ready(Interest::READABLE); + tokio::pin!(ready); + ready.poll(cx) + } + + fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> { + let ready = self.ready(Interest::WRITABLE); + tokio::pin!(ready); + ready.poll(cx) + } + } } pub mod time { diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 1b088e30..83350a81 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -12,7 +12,6 @@ repository = "https://github.com/actix/actix-net.git" documentation = "https://docs.rs/actix-server" categories = ["network-programming", "asynchronous"] license = "MIT OR Apache-2.0" -exclude = [".gitignore", ".cargo/config"] edition = "2018" [lib] @@ -25,7 +24,7 @@ default = [] [dependencies] actix-codec = "0.4.0-beta.1" actix-rt = { version = "2.0.0", default-features = false } -actix-service = "2.0.0-beta.4" +actix-service = "2.0.0-beta.5" actix-utils = "3.0.0-beta.2" futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } @@ -33,7 +32,7 @@ log = "0.4" mio = { version = "0.7.6", features = ["os-poll", "net"] } num_cpus = "1.13" slab = "0.4" -tokio = { version = "1", features = ["sync"] } +tokio = { version = "1.2", features = ["sync"] } [dev-dependencies] actix-rt = "2.0.0" diff --git a/actix-server/examples/basic.rs b/actix-server/examples/tcp-echo.rs similarity index 98% rename from actix-server/examples/basic.rs rename to actix-server/examples/tcp-echo.rs index 76918967..ad18a1ac 100644 --- a/actix-server/examples/basic.rs +++ b/actix-server/examples/tcp-echo.rs @@ -16,7 +16,7 @@ use std::sync::{ use std::{env, io}; use actix_rt::net::TcpStream; -use actix_server::ServerHandle; +use actix_server::Server; use actix_service::pipeline_factory; use bytes::BytesMut; use futures_util::future::ok; @@ -36,7 +36,7 @@ async fn main() -> io::Result<()> { // Bind socket address and start worker(s). By default, the server uses the number of available // logical CPU cores as the worker count. For this reason, the closure passed to bind needs // to return a service *factory*; so it can be created once per worker. - ServerHandle::build() + Server::build() .bind("echo", addr, move || { let count = Arc::clone(&count); let num2 = Arc::clone(&count); diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index b78ebf70..2b5bd764 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -5,7 +5,7 @@ use log::{error, info}; use mio::{Interest, Poll, Token as MioToken}; use slab::Slab; -use crate::server_handle::ServerHandle; +use crate::server_handle::Server; use crate::socket::{MioListener, SocketAddr}; use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; use crate::worker::{Conn, WorkerHandle}; @@ -29,7 +29,7 @@ pub(crate) struct Accept { poll: Poll, waker_queue: WakerQueue, handles: Vec, - srv: ServerHandle, + srv: Server, next: usize, backpressure: bool, // poll time duration. @@ -53,7 +53,7 @@ fn connection_error(e: &io::Error) -> bool { impl Accept { pub(crate) fn start( sockets: Vec<(Token, MioListener)>, - server_handle: ServerHandle, + server_handle: Server, worker_factory: F, ) -> WakerQueue where @@ -92,7 +92,7 @@ impl Accept { waker_queue: WakerQueue, socks: Vec<(Token, MioListener)>, handles: Vec, - srv: ServerHandle, + srv: Server, ) -> (Accept, Slab) { let mut sockets = Slab::new(); for (hnd_token, mut lst) in socks.into_iter() { diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 7fed6f5e..8fd130fc 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -14,7 +14,7 @@ use tokio::sync::oneshot; use crate::accept::Accept; use crate::config::{ConfiguredService, ServiceConfig}; -use crate::server_handle::{ServerCommand, ServerHandle}; +use crate::server_handle::{Server, ServerCommand}; use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; use crate::signals::{Signal, Signals}; use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; @@ -285,7 +285,7 @@ impl ServerBuilder { // start accept thread. return waker_queue for wake up it. let waker_queue = Accept::start( sockets, - ServerHandle::new(self.cmd_tx.clone()), + Server::new(self.cmd_tx.clone()), // closure for construct worker and return it's handler. |waker| { (0..self.threads) @@ -349,8 +349,8 @@ impl ServerFuture { /// Obtain a Handle for ServerFuture that can be used to change state of actix server. /// /// See [ServerHandle](crate::server::ServerHandle) for usage. - pub fn handle(&self) -> ServerHandle { - ServerHandle::new(self.cmd_tx.clone()) + pub fn handle(&self) -> Server { + Server::new(self.cmd_tx.clone()) } fn handle_cmd(&mut self, item: ServerCommand) -> Option> { diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index d9c1eeb2..6be373bd 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -15,9 +15,9 @@ mod test_server; mod waker_queue; mod worker; -pub use self::builder::ServerBuilder; +pub use self::builder::{ServerBuilder, ServerFuture}; pub use self::config::{ServiceConfig, ServiceRuntime}; -pub use self::server_handle::ServerHandle; +pub use self::server_handle::Server; pub use self::service::ServiceFactory; pub use self::test_server::TestServer; diff --git a/actix-server/src/server_handle.rs b/actix-server/src/server_handle.rs index 679dc1a1..74228b3f 100644 --- a/actix-server/src/server_handle.rs +++ b/actix-server/src/server_handle.rs @@ -25,14 +25,14 @@ pub(crate) enum ServerCommand { } #[derive(Debug)] -pub struct ServerHandle( +pub struct Server( UnboundedSender, Option>, ); -impl ServerHandle { +impl Server { pub(crate) fn new(tx: UnboundedSender) -> Self { - ServerHandle(tx, None) + Server(tx, None) } /// Start server building process @@ -80,13 +80,13 @@ impl ServerHandle { } } -impl Clone for ServerHandle { +impl Clone for Server { fn clone(&self) -> Self { Self(self.0.clone(), None) } } -impl Future for ServerHandle { +impl Future for Server { type Output = io::Result<()>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/actix-server/src/test_server.rs b/actix-server/src/test_server.rs index 996faa2a..c8e5941c 100644 --- a/actix-server/src/test_server.rs +++ b/actix-server/src/test_server.rs @@ -3,7 +3,7 @@ use std::{net, thread}; use actix_rt::{net::TcpStream, System}; -use crate::{ServerBuilder, ServerHandle, ServiceFactory}; +use crate::{Server, ServerBuilder, ServiceFactory}; /// The `TestServer` type. /// @@ -51,7 +51,7 @@ impl TestServer { let sys = System::new(); sys.block_on(async { actix_rt::spawn(async move { - let _ = factory(ServerHandle::build()) + let _ = factory(Server::build()) .workers(1) .disable_signals() .run() @@ -84,7 +84,7 @@ impl TestServer { sys.block_on(async { actix_rt::spawn(async move { - let _ = ServerHandle::build() + let _ = Server::build() .listen("test", tcp, factory) .unwrap() .workers(1) diff --git a/actix-server/src/waker_queue.rs b/actix-server/src/waker_queue.rs index 6b103689..9ac101b3 100644 --- a/actix-server/src/waker_queue.rs +++ b/actix-server/src/waker_queue.rs @@ -8,7 +8,7 @@ use mio::{Registry, Token as MioToken, Waker}; use crate::worker::WorkerHandle; -/// waker token for `mio::Poll` instance +/// Waker token for `mio::Poll` instance. pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX); /// `mio::Waker` with a queue for waking up the `Accept`'s `Poll` and contains the `WakerInterest` @@ -30,7 +30,7 @@ impl Deref for WakerQueue { } impl WakerQueue { - /// construct a waker queue with given `Poll`'s `Registry` and capacity. + /// Construct a waker queue with given `Poll`'s `Registry` and capacity. /// /// A fixed `WAKER_TOKEN` is used to identify the wake interest and the `Poll` needs to match /// event's token for it to properly handle `WakerInterest`. @@ -41,7 +41,7 @@ impl WakerQueue { Ok(Self(Arc::new((waker, queue)))) } - /// push a new interest to the queue and wake up the accept poll afterwards. + /// Push a new interest to the queue and wake up the accept poll afterwards. pub(crate) fn wake(&self, interest: WakerInterest) { let (waker, queue) = self.deref(); @@ -55,20 +55,20 @@ impl WakerQueue { .unwrap_or_else(|e| panic!("can not wake up Accept Poll: {}", e)); } - /// get a MutexGuard of the waker queue. + /// Get a MutexGuard of the waker queue. pub(crate) fn guard(&self) -> MutexGuard<'_, VecDeque> { self.deref().1.lock().expect("Failed to lock WakerQueue") } - /// reset the waker queue so it does not grow infinitely. + /// Reset the waker queue so it does not grow infinitely. pub(crate) fn reset(queue: &mut VecDeque) { std::mem::swap(&mut VecDeque::::with_capacity(16), queue); } } -/// types of interests we would look into when `Accept`'s `Poll` is waked up by waker. +/// Types of interests we would look into when `Accept`'s `Poll` is waked up by waker. /// -/// *. These interests should not be confused with `mio::Interest` and mostly not I/O related +/// These interests should not be confused with `mio::Interest` and mostly not I/O related pub(crate) enum WakerInterest { /// `WorkerAvailable` is an interest from `Worker` notifying `Accept` there is a worker /// available and can accept new tasks. diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index aad4c7a3..a6527fc0 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -199,62 +199,59 @@ impl ServerWorker { availability.set(false); + let handle = tokio::runtime::Handle::current(); + // every worker runs in it's own arbiter. // use a custom tokio runtime builder to change the settings of runtime. std::thread::spawn(move || { - let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker { - rx, - rx2, - availability, - factories, - config, - services: Vec::new(), - conns: conns.clone(), - state: WorkerState::Unavailable, - }); - let fut = wrk - .factories - .iter() - .enumerate() - .map(|(idx, factory)| { - let fut = factory.create(); - async move { - fut.await.map(|r| { - r.into_iter().map(|(t, s)| (idx, t, s)).collect::>() - }) - } - }) - .collect::>(); + handle.block_on(tokio::task::LocalSet::new().run_until(async move { + let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker { + rx, + rx2, + availability, + factories, + config, + services: Vec::new(), + conns: conns.clone(), + state: WorkerState::Unavailable, + }); + let fut = wrk + .factories + .iter() + .enumerate() + .map(|(idx, factory)| { + let fut = factory.create(); + async move { + fut.await.map(|r| { + r.into_iter().map(|(t, s)| (idx, t, s)).collect::>() + }) + } + }) + .collect::>(); - tokio::runtime::Builder::new_current_thread() - .enable_all() - .max_blocking_threads(config.max_blocking_threads) - .build() - .unwrap() - .block_on(tokio::task::LocalSet::new().run_until(async move { - let res = join_all(fut) - .await - .into_iter() - .collect::, _>>(); - match res { - Ok(services) => { - for item in services { - for (factory, token, service) in item { - assert_eq!(token.0, wrk.services.len()); - wrk.services.push(WorkerService { - factory, - service, - status: WorkerServiceStatus::Unavailable, - }); - } + let res = join_all(fut) + .await + .into_iter() + .collect::, _>>(); + match res { + Ok(services) => { + for item in services { + for (factory, token, service) in item { + assert_eq!(token.0, wrk.services.len()); + wrk.services.push(WorkerService { + factory, + service, + status: WorkerServiceStatus::Unavailable, + }); } } - Err(e) => { - error!("Can not start worker: {:?}", e); - } } - wrk.await - })) + Err(e) => { + error!("Can not start worker: {:?}", e); + } + } + wrk.await + })) }); WorkerHandle::new(idx, tx1, tx2, avail) diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 23e4635c..55774b1f 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicUsize, Ordering::Relaxed}; use std::sync::{mpsc, Arc}; use std::{net, thread, time}; -use actix_server::ServerHandle; +use actix_server::Server; use actix_service::fn_service; use futures_util::future::{lazy, ok}; @@ -24,7 +24,7 @@ fn test_bind() { let sys = actix_rt::System::new(); sys.block_on(async { actix_rt::spawn(async move { - let _ = ServerHandle::build() + let _ = Server::build() .workers(1) .disable_signals() .bind("test", addr, move || fn_service(|_| ok::<_, ()>(()))) @@ -54,7 +54,7 @@ fn test_listen() { let lst = net::TcpListener::bind(addr).unwrap(); sys.block_on(async { actix_rt::spawn(async move { - let _ = ServerHandle::build() + let _ = Server::build() .disable_signals() .workers(1) .listen("test", lst, move || fn_service(|_| ok::<_, ()>(()))) @@ -88,7 +88,7 @@ fn test_start() { let h = thread::spawn(move || { actix_rt::System::new().block_on(async { - let server = ServerHandle::build() + let server = Server::build() .backlog(100) .disable_signals() .bind("test", addr, move || { @@ -156,7 +156,7 @@ fn test_configure() { let h = thread::spawn(move || { let num = num2.clone(); actix_rt::System::new().block_on(async { - let server = ServerHandle::build() + let server = Server::build() .disable_signals() .configure(move |cfg| { let num = num.clone(); diff --git a/actix-service/CHANGES.md b/actix-service/CHANGES.md index f5da9d2e..51749ecd 100644 --- a/actix-service/CHANGES.md +++ b/actix-service/CHANGES.md @@ -3,6 +3,14 @@ ## Unreleased - 2021-xx-xx +## 2.0.0-beta.5 - 2021-03-15 +* Add default `Service` trait impl for `Rc` and `&S: Service`. [#288] +* Add `boxed::rc_service` function for constructing `boxed::RcService` type [#290] + +[#288]: https://github.com/actix/actix-net/pull/288 +[#290]: https://github.com/actix/actix-net/pull/290 + + ## 2.0.0-beta.4 - 2021-02-04 * `Service::poll_ready` and `Service::call` receive `&self`. [#247] * `apply_fn` and `apply_fn_factory` now receive `Fn(Req, &Service)` function type. [#247] diff --git a/actix-service/Cargo.toml b/actix-service/Cargo.toml index eecf4669..84a0c172 100644 --- a/actix-service/Cargo.toml +++ b/actix-service/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-service" -version = "2.0.0-beta.4" +version = "2.0.0-beta.5" authors = [ "Nikolay Kim ", "Rob Ede ", diff --git a/actix-service/README.md b/actix-service/README.md index 28c38295..54171274 100644 --- a/actix-service/README.md +++ b/actix-service/README.md @@ -3,11 +3,11 @@ > Service trait and combinators for representing asynchronous request/response operations. [![crates.io](https://img.shields.io/crates/v/actix-service?label=latest)](https://crates.io/crates/actix-service) -[![Documentation](https://docs.rs/actix-service/badge.svg?version=2.0.0-beta.4)](https://docs.rs/actix-service/2.0.0-beta.4) +[![Documentation](https://docs.rs/actix-service/badge.svg?version=2.0.0-beta.5)](https://docs.rs/actix-service/2.0.0-beta.5) [![Version](https://img.shields.io/badge/rustc-1.46+-ab6000.svg)](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html) ![License](https://img.shields.io/crates/l/actix-service.svg) -[![Dependency Status](https://deps.rs/crate/actix-service/2.0.0-beta.4/status.svg)](https://deps.rs/crate/actix-service/2.0.0-beta.4) -[![Download](https://img.shields.io/crates/d/actix-service.svg)](https://crates.io/crates/actix-service) +[![Dependency Status](https://deps.rs/crate/actix-service/2.0.0-beta.5/status.svg)](https://deps.rs/crate/actix-service/2.0.0-beta.5) +![Download](https://img.shields.io/crates/d/actix-service.svg) [![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/NWpN5mmg3x) See documentation for detailed explanations of these components: https://docs.rs/actix-service. diff --git a/actix-service/src/boxed.rs b/actix-service/src/boxed.rs index 6ad2eaf4..a872ca9f 100644 --- a/actix-service/src/boxed.rs +++ b/actix-service/src/boxed.rs @@ -1,21 +1,65 @@ -use alloc::boxed::Box; -use core::{ - future::Future, - marker::PhantomData, - pin::Pin, - task::{Context, Poll}, -}; +//! Trait object forms of services and service factories. + +use alloc::{boxed::Box, rc::Rc}; +use core::{future::Future, pin::Pin}; use crate::{Service, ServiceFactory}; +/// A boxed future without a Send bound or lifetime parameters. pub type BoxFuture = Pin>>; -pub type BoxService = - Box>>>; +macro_rules! service_object { + ($name: ident, $type: tt, $fn_name: ident) => { + /// Type alias for service trait object. + pub type $name = $type< + dyn Service>>, + >; + /// Create service trait object. + pub fn $fn_name(service: S) -> $name + where + S: Service + 'static, + Req: 'static, + S::Future: 'static, + { + $type::new(ServiceWrapper::new(service)) + } + }; +} + +service_object!(BoxService, Box, service); +service_object!(RcService, Rc, rc_service); + +struct ServiceWrapper { + inner: S, +} + +impl ServiceWrapper { + fn new(inner: S) -> Self { + Self { inner } + } +} + +impl Service for ServiceWrapper +where + S: Service, + S::Future: 'static, +{ + type Response = Res; + type Error = Err; + type Future = BoxFuture>; + + crate::forward_ready!(inner); + + fn call(&self, req: Req) -> Self::Future { + Box::pin(self.inner.call(req)) + } +} + +/// Wrapper for a service factory trait object that will produce a boxed trait object service. pub struct BoxServiceFactory(Inner); -/// Create boxed service factory +/// Create service factory trait object. pub fn factory( factory: SF, ) -> BoxServiceFactory @@ -28,20 +72,7 @@ where SF::Error: 'static, SF::InitError: 'static, { - BoxServiceFactory(Box::new(FactoryWrapper { - factory, - _t: PhantomData, - })) -} - -/// Create boxed service -pub fn service(service: S) -> BoxService -where - S: Service + 'static, - Req: 'static, - S::Future: 'static, -{ - Box::new(ServiceWrapper(service, PhantomData)) + BoxServiceFactory(Box::new(FactoryWrapper(factory))) } type Inner = Box< @@ -66,9 +97,9 @@ where { type Response = Res; type Error = Err; - type InitError = InitErr; type Config = C; type Service = BoxService; + type InitError = InitErr; type Future = BoxFuture>; @@ -77,12 +108,9 @@ where } } -struct FactoryWrapper { - factory: SF, - _t: PhantomData<(Req, Cfg)>, -} +struct FactoryWrapper(SF); -impl ServiceFactory for FactoryWrapper +impl ServiceFactory for FactoryWrapper where Req: 'static, Res: 'static, @@ -95,47 +123,13 @@ where { type Response = Res; type Error = Err; - type InitError = InitErr; type Config = Cfg; type Service = BoxService; + type InitError = InitErr; type Future = BoxFuture>; fn new_service(&self, cfg: Cfg) -> Self::Future { - let fut = self.factory.new_service(cfg); - Box::pin(async { - let res = fut.await; - res.map(ServiceWrapper::boxed) - }) - } -} - -struct ServiceWrapper, Req>(S, PhantomData); - -impl ServiceWrapper -where - S: Service + 'static, - Req: 'static, - S::Future: 'static, -{ - fn boxed(service: S) -> BoxService { - Box::new(ServiceWrapper(service, PhantomData)) - } -} - -impl Service for ServiceWrapper -where - S: Service, - S::Future: 'static, -{ - type Response = Res; - type Error = Err; - type Future = BoxFuture>; - - fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll> { - self.0.poll_ready(ctx) - } - - fn call(&self, req: Req) -> Self::Future { - Box::pin(self.0.call(req)) + let f = self.0.new_service(cfg); + Box::pin(async { f.await.map(|s| Box::new(ServiceWrapper::new(s)) as _) }) } } diff --git a/actix-service/src/ext.rs b/actix-service/src/ext.rs index e778d11e..d931596b 100644 --- a/actix-service/src/ext.rs +++ b/actix-service/src/ext.rs @@ -1,4 +1,7 @@ -use crate::{dev, Service, ServiceFactory}; +use crate::{ + map::Map, map_err::MapErr, transform_err::TransformMapInitErr, Service, ServiceFactory, + Transform, +}; pub trait ServiceExt: Service { /// Map this service's output to a different type, returning a new service @@ -10,12 +13,12 @@ pub trait ServiceExt: Service { /// Note that this function consumes the receiving service and returns a /// wrapped version of it, similar to the existing `map` methods in the /// standard library. - fn map(self, f: F) -> dev::Map + fn map(self, f: F) -> Map where Self: Sized, F: FnMut(Self::Response) -> R, { - dev::Map::new(self, f) + Map::new(self, f) } /// Map this service's error to a different error, returning a new service. @@ -26,12 +29,12 @@ pub trait ServiceExt: Service { /// /// Note that this function consumes the receiving service and returns a /// wrapped version of it. - fn map_err(self, f: F) -> dev::MapErr + fn map_err(self, f: F) -> MapErr where Self: Sized, F: Fn(Self::Error) -> E, { - dev::MapErr::new(self, f) + MapErr::new(self, f) } } @@ -67,4 +70,17 @@ pub trait ServiceFactoryExt: ServiceFactory { } } -impl ServiceFactoryExt for S where S: ServiceFactory {} +impl ServiceFactoryExt for SF where SF: ServiceFactory {} + +pub trait TransformExt: Transform { + /// Return a new `Transform` whose init error is mapped to to a different type. + fn map_init_err(self, f: F) -> TransformMapInitErr + where + Self: Sized, + F: Fn(Self::InitError) -> E + Clone, + { + TransformMapInitErr::new(self, f) + } +} + +impl TransformExt for T where T: Transform {} diff --git a/actix-service/src/fn_service.rs b/actix-service/src/fn_service.rs index 230f437b..8c1a6f51 100644 --- a/actix-service/src/fn_service.rs +++ b/actix-service/src/fn_service.rs @@ -1,4 +1,4 @@ -use core::{future::Future, marker::PhantomData, task::Poll}; +use core::{future::Future, marker::PhantomData}; use crate::{ok, IntoService, IntoServiceFactory, Ready, Service, ServiceFactory}; @@ -15,8 +15,7 @@ where /// Create `ServiceFactory` for function that can produce services /// -/// # Example -/// +/// # Examples /// ``` /// use std::io; /// use actix_service::{fn_factory, fn_service, Service, ServiceFactory}; @@ -62,11 +61,10 @@ where /// Create `ServiceFactory` for function that accepts config argument and can produce services /// -/// Any function that has following form `Fn(Config) -> Future` could -/// act as a `ServiceFactory`. -/// -/// # Example +/// Any function that has following form `Fn(Config) -> Future` could act as +/// a `ServiceFactory`. /// +/// # Examples /// ``` /// use std::io; /// use actix_service::{fn_factory_with_config, fn_service, Service, ServiceFactory}; diff --git a/actix-service/src/lib.rs b/actix-service/src/lib.rs index 7c3a271c..cc82bfa6 100644 --- a/actix-service/src/lib.rs +++ b/actix-service/src/lib.rs @@ -21,6 +21,7 @@ mod apply_cfg; pub mod boxed; mod ext; mod fn_service; +mod macros; mod map; mod map_config; mod map_err; @@ -33,11 +34,11 @@ mod transform_err; pub use self::apply::{apply_fn, apply_fn_factory}; pub use self::apply_cfg::{apply_cfg, apply_cfg_factory}; -pub use self::ext::{ServiceExt, ServiceFactoryExt}; +pub use self::ext::{ServiceExt, ServiceFactoryExt, TransformExt}; pub use self::fn_service::{fn_factory, fn_factory_with_config, fn_service}; pub use self::map_config::{map_config, unit_config}; pub use self::pipeline::{pipeline, pipeline_factory, Pipeline, PipelineFactory}; -pub use self::transform::{apply, Transform}; +pub use self::transform::{apply, ApplyTransform, Transform}; #[allow(unused_imports)] use self::ready::{err, ok, ready, Ready}; @@ -102,8 +103,8 @@ pub trait Service { /// call and the next invocation of `call` results in an error. /// /// # Notes - /// 1. `.poll_ready()` might be called on different task from actual service call. - /// 1. In case of chained services, `.poll_ready()` get called for all services at once. + /// 1. `poll_ready` might be called on a different task to `call`. + /// 1. In cases of chained services, `.poll_ready()` is called for all services at once. fn poll_ready(&self, ctx: &mut task::Context<'_>) -> Poll>; /// Process the request and return the response asynchronously. @@ -150,6 +151,7 @@ pub trait ServiceFactory { fn new_service(&self, cfg: Self::Config) -> Self::Future; } +// TODO: remove implement on mut reference. impl<'a, S, Req> Service for &'a mut S where S: Service + 'a, @@ -167,6 +169,23 @@ where } } +impl<'a, S, Req> Service for &'a S +where + S: Service + 'a, +{ + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll> { + (**self).poll_ready(ctx) + } + + fn call(&self, request: Req) -> S::Future { + (**self).call(request) + } +} + impl Service for Box where S: Service + ?Sized, @@ -184,24 +203,25 @@ where } } -impl Service for RefCell +impl Service for Rc where - S: Service, + S: Service + ?Sized, { type Response = S::Response; type Error = S::Error; type Future = S::Future; fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll> { - self.borrow().poll_ready(ctx) + (**self).poll_ready(ctx) } fn call(&self, request: Req) -> S::Future { - self.borrow().call(request) + (**self).call(request) } } -impl Service for Rc> +/// This impl is deprecated since v2 because the `Service` trait now receives shared reference. +impl Service for RefCell where S: Service, { @@ -294,44 +314,3 @@ where { tp.into_service() } - -pub mod dev { - pub use crate::apply::{Apply, ApplyFactory}; - pub use crate::fn_service::{ - FnService, FnServiceConfig, FnServiceFactory, FnServiceNoConfig, - }; - pub use crate::map::{Map, MapServiceFactory}; - pub use crate::map_config::{MapConfig, UnitConfig}; - pub use crate::map_err::{MapErr, MapErrServiceFactory}; - pub use crate::map_init_err::MapInitErr; - pub use crate::transform::ApplyTransform; - pub use crate::transform_err::TransformMapInitErr; -} - -#[macro_export] -macro_rules! always_ready { - () => { - #[inline] - fn poll_ready( - &self, - _: &mut ::core::task::Context<'_>, - ) -> ::core::task::Poll> { - Poll::Ready(Ok(())) - } - }; -} - -#[macro_export] -macro_rules! forward_ready { - ($field:ident) => { - #[inline] - fn poll_ready( - &self, - cx: &mut ::core::task::Context<'_>, - ) -> ::core::task::Poll> { - self.$field - .poll_ready(cx) - .map_err(::core::convert::Into::into) - } - }; -} diff --git a/actix-service/src/macros.rs b/actix-service/src/macros.rs new file mode 100644 index 00000000..4a083895 --- /dev/null +++ b/actix-service/src/macros.rs @@ -0,0 +1,181 @@ +/// A boilerplate implementation of [`Service::poll_ready`] that always signals readiness. +/// +/// [`Service::poll_ready`]: crate::Service::poll_ready +/// +/// # Examples +/// ```no_run +/// use actix_service::Service; +/// use futures_util::future::{ready, Ready}; +/// +/// struct IdentityService; +/// +/// impl Service for IdentityService { +/// type Response = u32; +/// type Error = (); +/// type Future = Ready>; +/// +/// actix_service::always_ready!(); +/// +/// fn call(&self, req: u32) -> Self::Future { +/// ready(Ok(req)) +/// } +/// } +/// ``` +#[macro_export] +macro_rules! always_ready { + () => { + #[inline] + fn poll_ready( + &self, + _: &mut ::core::task::Context<'_>, + ) -> ::core::task::Poll> { + ::core::task::Poll::Ready(Ok(())) + } + }; +} + +/// A boilerplate implementation of [`Service::poll_ready`] that forwards readiness checks to a +/// named struct field. +/// +/// Tuple structs are not supported. +/// +/// [`Service::poll_ready`]: crate::Service::poll_ready +/// +/// # Examples +/// ```no_run +/// use actix_service::Service; +/// use futures_util::future::{ready, Ready}; +/// +/// struct WrapperService { +/// inner: S, +/// } +/// +/// impl Service<()> for WrapperService +/// where +/// S: Service<()>, +/// { +/// type Response = S::Response; +/// type Error = S::Error; +/// type Future = S::Future; +/// +/// actix_service::forward_ready!(inner); +/// +/// fn call(&self, req: ()) -> Self::Future { +/// self.inner.call(req) +/// } +/// } +/// ``` +#[macro_export] +macro_rules! forward_ready { + ($field:ident) => { + #[inline] + fn poll_ready( + &self, + cx: &mut ::core::task::Context<'_>, + ) -> ::core::task::Poll> { + self.$field + .poll_ready(cx) + .map_err(::core::convert::Into::into) + } + }; +} + +#[cfg(test)] +mod tests { + use core::{ + cell::Cell, + convert::Infallible, + task::{self, Context, Poll}, + }; + + use futures_util::{ + future::{ready, Ready}, + task::noop_waker, + }; + + use crate::Service; + + struct IdentityService; + + impl Service for IdentityService { + type Response = u32; + type Error = Infallible; + type Future = Ready>; + + always_ready!(); + + fn call(&self, req: u32) -> Self::Future { + ready(Ok(req)) + } + } + + struct CountdownService(Cell); + + impl Service<()> for CountdownService { + type Response = (); + type Error = Infallible; + type Future = Ready>; + + fn poll_ready(&self, cx: &mut Context<'_>) -> Poll> { + let count = self.0.get(); + + if count == 0 { + Poll::Ready(Ok(())) + } else { + self.0.set(count - 1); + cx.waker().wake_by_ref(); + Poll::Pending + } + } + + fn call(&self, _: ()) -> Self::Future { + ready(Ok(())) + } + } + + struct WrapperService { + inner: S, + } + + impl Service<()> for WrapperService + where + S: Service<()>, + { + type Response = S::Response; + type Error = S::Error; + type Future = S::Future; + + forward_ready!(inner); + + fn call(&self, req: ()) -> Self::Future { + self.inner.call(req) + } + } + + #[test] + fn test_always_ready_macro() { + let waker = noop_waker(); + let mut cx = task::Context::from_waker(&waker); + + let svc = IdentityService; + + assert!(svc.poll_ready(&mut cx).is_ready()); + assert!(svc.poll_ready(&mut cx).is_ready()); + assert!(svc.poll_ready(&mut cx).is_ready()); + } + + #[test] + fn test_forward_ready_macro() { + let waker = noop_waker(); + let mut cx = task::Context::from_waker(&waker); + + let svc = WrapperService { + inner: CountdownService(Cell::new(3)), + }; + + assert!(svc.poll_ready(&mut cx).is_pending()); + assert!(svc.poll_ready(&mut cx).is_pending()); + assert!(svc.poll_ready(&mut cx).is_pending()); + assert!(svc.poll_ready(&mut cx).is_ready()); + } +} diff --git a/actix-service/src/transform.rs b/actix-service/src/transform.rs index 7f477e54..b0abe72b 100644 --- a/actix-service/src/transform.rs +++ b/actix-service/src/transform.rs @@ -9,10 +9,9 @@ use core::{ use futures_core::ready; use pin_project_lite::pin_project; -use crate::transform_err::TransformMapInitErr; use crate::{IntoServiceFactory, Service, ServiceFactory}; -/// Apply transform to a service. +/// Apply a [`Transform`] to a [`Service`]. pub fn apply(t: T, factory: I) -> ApplyTransform where I: IntoServiceFactory, @@ -25,9 +24,8 @@ where /// The `Transform` trait defines the interface of a service factory that wraps inner service /// during construction. /// -/// Transform(middleware) wraps inner service and runs during -/// inbound and/or outbound processing in the request/response lifecycle. -/// It may modify request and/or response. +/// Transform(middleware) wraps inner service and runs during inbound and/or outbound processing in +/// the request/response lifecycle. It may modify request and/or response. /// /// For example, timeout transform: /// @@ -51,20 +49,19 @@ where /// fn call(&self, req: S::Request) -> Self::Future { /// TimeoutServiceResponse { /// fut: self.service.call(req), -/// sleep: Delay::new(clock::now() + self.timeout), +/// sleep: Sleep::new(clock::now() + self.timeout), /// } /// } /// } /// ``` /// -/// Timeout service in above example is decoupled from underlying service implementation -/// and could be applied to any service. +/// Timeout service in above example is decoupled from underlying service implementation and could +/// be applied to any service. /// -/// The `Transform` trait defines the interface of a Service factory. `Transform` -/// is often implemented for middleware, defining how to construct a -/// middleware Service. A Service that is constructed by the factory takes -/// the Service that follows it during execution as a parameter, assuming -/// ownership of the next Service. +/// The `Transform` trait defines the interface of a Service factory. `Transform` is often +/// implemented for middleware, defining how to construct a middleware Service. A Service that is +/// constructed by the factory takes the Service that follows it during execution as a parameter, +/// assuming ownership of the next Service. /// /// Factory for `Timeout` middleware from the above example could look like this: /// @@ -85,15 +82,15 @@ where /// type Future = Ready>; /// /// fn new_transform(&self, service: S) -> Self::Future { -/// ok(TimeoutService { +/// ready(Ok(TimeoutService { /// service, /// timeout: self.timeout, -/// }) +/// })) /// } /// } /// ``` pub trait Transform { - /// Responses given by the service. + /// Responses produced by the service. type Response; /// Errors produced by the service. @@ -110,16 +107,6 @@ pub trait Transform { /// Creates and returns a new Transform component, asynchronously fn new_transform(&self, service: S) -> Self::Future; - - /// Map this transform's factory error to a different error, - /// returning a new transform service factory. - fn map_init_err(self, f: F) -> TransformMapInitErr - where - Self: Sized, - F: Fn(Self::InitError) -> E + Clone, - { - TransformMapInitErr::new(self, f) - } } impl Transform for Rc @@ -152,7 +139,7 @@ where } } -/// `Apply` transform to new service +/// Apply a [`Transform`] to a [`Service`]. pub struct ApplyTransform(Rc<(T, S)>, PhantomData); impl ApplyTransform diff --git a/actix-service/src/transform_err.rs b/actix-service/src/transform_err.rs index cbf5fe3b..b4695d5c 100644 --- a/actix-service/src/transform_err.rs +++ b/actix-service/src/transform_err.rs @@ -9,10 +9,8 @@ use pin_project_lite::pin_project; use super::Transform; -/// Transform for the `map_init_err` combinator, changing the type of a new -/// transform's init error. -/// -/// This is created by the `Transform::map_init_err` method. +/// Transform for the [`TransformExt::map_init_err`] combinator, changing the type of a new +/// [`Transform`]'s initialization error. pub struct TransformMapInitErr { transform: T, mapper: F, diff --git a/actix-tls/CHANGES.md b/actix-tls/CHANGES.md index a87f0fc5..824663b0 100644 --- a/actix-tls/CHANGES.md +++ b/actix-tls/CHANGES.md @@ -3,6 +3,14 @@ ## Unreleased - 2021-xx-xx +## 3.0.0-beta.4 - 2021-02-24 +* Rename `accept::openssl::{SslStream => TlsStream}`. +* Add `connect::Connect::set_local_addr` to attach local `IpAddr`. [#282] +* `connector::TcpConnector` service will try to bind to local_addr of `IpAddr` when given. [#282] + +[#282]: https://github.com/actix/actix-net/pull/282 + + ## 3.0.0-beta.3 - 2021-02-06 * Remove `trust-dns-proto` and `trust-dns-resolver`. [#248] * Use `std::net::ToSocketAddrs` as simple and basic default resolver. [#248] diff --git a/actix-tls/Cargo.toml b/actix-tls/Cargo.toml index db79d6ab..d14c65ac 100755 --- a/actix-tls/Cargo.toml +++ b/actix-tls/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-tls" -version = "3.0.0-beta.3" +version = "3.0.0-beta.4" authors = ["Nikolay Kim "] description = "TLS acceptor and connector services for Actix ecosystem" keywords = ["network", "tls", "ssl", "async", "transport"] @@ -41,8 +41,8 @@ uri = ["http"] [dependencies] actix-codec = "0.4.0-beta.1" -actix-rt = { version = "2.0.0", default-features = false } -actix-service = "2.0.0-beta.4" +actix-rt = { version = "2.1.0", default-features = false } +actix-service = "2.0.0-beta.5" actix-utils = "3.0.0-beta.2" derive_more = "0.99.5" @@ -52,7 +52,7 @@ log = "0.4" tokio-util = { version = "0.6.3", default-features = false } # openssl -tls-openssl = { package = "openssl", version = "0.10", optional = true } +tls-openssl = { package = "openssl", version = "0.10.9", optional = true } tokio-openssl = { version = "0.6", optional = true } # rustls @@ -63,7 +63,7 @@ webpki-roots = { version = "0.21", optional = true } tokio-native-tls = { version = "0.3", optional = true } [dev-dependencies] -actix-rt = "2.0.0" +actix-rt = "2.1.0" actix-server = "2.0.0-beta.3" bytes = "1" env_logger = "0.8" @@ -72,5 +72,5 @@ log = "0.4" trust-dns-resolver = "0.20.0" [[example]] -name = "basic" +name = "tcp-rustls" required-features = ["accept", "rustls"] diff --git a/actix-tls/examples/basic.rs b/actix-tls/examples/tcp-rustls.rs similarity index 91% rename from actix-tls/examples/basic.rs rename to actix-tls/examples/tcp-rustls.rs index 0f02cf38..d0c20428 100644 --- a/actix-tls/examples/basic.rs +++ b/actix-tls/examples/tcp-rustls.rs @@ -29,9 +29,10 @@ use std::{ }, }; -use actix_server::ServerHandle; +use actix_rt::net::TcpStream; +use actix_server::Server; use actix_service::pipeline_factory; -use actix_tls::accept::rustls::Acceptor as RustlsAcceptor; +use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream}; use futures_util::future::ok; use log::info; use rustls::{ @@ -67,16 +68,16 @@ async fn main() -> io::Result<()> { let addr = ("127.0.0.1", 8443); info!("starting server on port: {}", &addr.0); - ServerHandle::build() + Server::build() .bind("tls-example", addr, move || { let count = Arc::clone(&count); // Set up TLS service factory pipeline_factory(tls_acceptor.clone()) .map_err(|err| println!("Rustls error: {:?}", err)) - .and_then(move |stream| { + .and_then(move |stream: TlsStream| { let num = count.fetch_add(1, Ordering::Relaxed); - info!("[{}] Got TLS connection: {:?}", num, stream); + info!("[{}] Got TLS connection: {:?}", num, &*stream); ok(()) }) })? diff --git a/actix-tls/src/accept/nativetls.rs b/actix-tls/src/accept/nativetls.rs index 236ce973..614bdad3 100644 --- a/actix-tls/src/accept/nativetls.rs +++ b/actix-tls/src/accept/nativetls.rs @@ -1,15 +1,94 @@ -use std::task::{Context, Poll}; +use std::{ + io::{self, IoSlice}, + ops::{Deref, DerefMut}, + pin::Pin, + task::{Context, Poll}, +}; -use actix_codec::{AsyncRead, AsyncWrite}; +use actix_codec::{AsyncRead, AsyncWrite, ReadBuf}; +use actix_rt::net::{ActixStream, Ready}; use actix_service::{Service, ServiceFactory}; use actix_utils::counter::Counter; use futures_core::future::LocalBoxFuture; pub use tokio_native_tls::native_tls::Error; -pub use tokio_native_tls::{TlsAcceptor, TlsStream}; +pub use tokio_native_tls::TlsAcceptor; use super::MAX_CONN_COUNTER; +/// Wrapper type for `tokio_native_tls::TlsStream` in order to impl `ActixStream` trait. +pub struct TlsStream(tokio_native_tls::TlsStream); + +impl From> for TlsStream { + fn from(stream: tokio_native_tls::TlsStream) -> Self { + Self(stream) + } +} + +impl Deref for TlsStream { + type Target = tokio_native_tls::TlsStream; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for TlsStream { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl AsyncRead for TlsStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut **self.get_mut()).poll_read(cx, buf) + } +} + +impl AsyncWrite for TlsStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut **self.get_mut()).poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut **self.get_mut()).poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut **self.get_mut()).poll_shutdown(cx) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + Pin::new(&mut **self.get_mut()).poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + (&**self).is_write_vectored() + } +} + +impl ActixStream for TlsStream { + fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll> { + T::poll_read_ready((&**self).get_ref().get_ref().get_ref(), cx) + } + + fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> { + T::poll_write_ready((&**self).get_ref().get_ref().get_ref(), cx) + } +} + /// Accept TLS connections via `native-tls` package. /// /// `native-tls` feature enables this `Acceptor` type. @@ -34,10 +113,7 @@ impl Clone for Acceptor { } } -impl ServiceFactory for Acceptor -where - T: AsyncRead + AsyncWrite + Unpin + 'static, -{ +impl ServiceFactory for Acceptor { type Response = TlsStream; type Error = Error; type Config = (); @@ -71,10 +147,7 @@ impl Clone for NativeTlsAcceptorService { } } -impl Service for NativeTlsAcceptorService -where - T: AsyncRead + AsyncWrite + Unpin + 'static, -{ +impl Service for NativeTlsAcceptorService { type Response = TlsStream; type Error = Error; type Future = LocalBoxFuture<'static, Result, Error>>; @@ -93,7 +166,7 @@ where Box::pin(async move { let io = this.acceptor.accept(io).await; drop(guard); - io + io.map(Into::into) }) } } diff --git a/actix-tls/src/accept/openssl.rs b/actix-tls/src/accept/openssl.rs index 8ca88578..4afcdcab 100644 --- a/actix-tls/src/accept/openssl.rs +++ b/actix-tls/src/accept/openssl.rs @@ -1,10 +1,13 @@ use std::{ future::Future, + io::{self, IoSlice}, + ops::{Deref, DerefMut}, pin::Pin, task::{Context, Poll}, }; -use actix_codec::{AsyncRead, AsyncWrite}; +use actix_codec::{AsyncRead, AsyncWrite, ReadBuf}; +use actix_rt::net::{ActixStream, Ready}; use actix_service::{Service, ServiceFactory}; use actix_utils::counter::{Counter, CounterGuard}; use futures_core::{future::LocalBoxFuture, ready}; @@ -12,10 +15,82 @@ use futures_core::{future::LocalBoxFuture, ready}; pub use openssl::ssl::{ AlpnError, Error as SslError, HandshakeError, Ssl, SslAcceptor, SslAcceptorBuilder, }; -pub use tokio_openssl::SslStream; use super::MAX_CONN_COUNTER; +/// Wrapper type for `tokio_openssl::SslStream` in order to impl `ActixStream` trait. +pub struct TlsStream(tokio_openssl::SslStream); + +impl From> for TlsStream { + fn from(stream: tokio_openssl::SslStream) -> Self { + Self(stream) + } +} + +impl Deref for TlsStream { + type Target = tokio_openssl::SslStream; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for TlsStream { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl AsyncRead for TlsStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut **self.get_mut()).poll_read(cx, buf) + } +} + +impl AsyncWrite for TlsStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut **self.get_mut()).poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut **self.get_mut()).poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut **self.get_mut()).poll_shutdown(cx) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + Pin::new(&mut **self.get_mut()).poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + (&**self).is_write_vectored() + } +} + +impl ActixStream for TlsStream { + fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll> { + T::poll_read_ready((&**self).get_ref(), cx) + } + + fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> { + T::poll_write_ready((&**self).get_ref(), cx) + } +} + /// Accept TLS connections via `openssl` package. /// /// `openssl` feature enables this `Acceptor` type. @@ -40,11 +115,8 @@ impl Clone for Acceptor { } } -impl ServiceFactory for Acceptor -where - T: AsyncRead + AsyncWrite + Unpin + 'static, -{ - type Response = SslStream; +impl ServiceFactory for Acceptor { + type Response = TlsStream; type Error = SslError; type Config = (); type Service = AcceptorService; @@ -67,11 +139,8 @@ pub struct AcceptorService { conns: Counter, } -impl Service for AcceptorService -where - T: AsyncRead + AsyncWrite + Unpin + 'static, -{ - type Response = SslStream; +impl Service for AcceptorService { + type Response = TlsStream; type Error = SslError; type Future = AcceptorServiceResponse; @@ -88,24 +157,25 @@ where let ssl = Ssl::new(ssl_ctx).expect("Provided SSL acceptor was invalid."); AcceptorServiceResponse { _guard: self.conns.get(), - stream: Some(SslStream::new(ssl, io).unwrap()), + stream: Some(tokio_openssl::SslStream::new(ssl, io).unwrap()), } } } -pub struct AcceptorServiceResponse -where - T: AsyncRead + AsyncWrite, -{ - stream: Option>, +pub struct AcceptorServiceResponse { + stream: Option>, _guard: CounterGuard, } -impl Future for AcceptorServiceResponse { - type Output = Result, SslError>; +impl Future for AcceptorServiceResponse { + type Output = Result, SslError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { ready!(Pin::new(self.stream.as_mut().unwrap()).poll_accept(cx))?; - Poll::Ready(Ok(self.stream.take().expect("SSL connect has resolved."))) + Poll::Ready(Ok(self + .stream + .take() + .expect("SSL connect has resolved.") + .into())) } } diff --git a/actix-tls/src/accept/rustls.rs b/actix-tls/src/accept/rustls.rs index ff5cf3e5..ffac687a 100644 --- a/actix-tls/src/accept/rustls.rs +++ b/actix-tls/src/accept/rustls.rs @@ -1,22 +1,96 @@ use std::{ future::Future, - io, + io::{self, IoSlice}, + ops::{Deref, DerefMut}, pin::Pin, sync::Arc, task::{Context, Poll}, }; -use actix_codec::{AsyncRead, AsyncWrite}; +use actix_codec::{AsyncRead, AsyncWrite, ReadBuf}; +use actix_rt::net::{ActixStream, Ready}; use actix_service::{Service, ServiceFactory}; use actix_utils::counter::{Counter, CounterGuard}; use futures_core::future::LocalBoxFuture; use tokio_rustls::{Accept, TlsAcceptor}; pub use tokio_rustls::rustls::{ServerConfig, Session}; -pub use tokio_rustls::server::TlsStream; use super::MAX_CONN_COUNTER; +/// Wrapper type for `tokio_openssl::SslStream` in order to impl `ActixStream` trait. +pub struct TlsStream(tokio_rustls::server::TlsStream); + +impl From> for TlsStream { + fn from(stream: tokio_rustls::server::TlsStream) -> Self { + Self(stream) + } +} + +impl Deref for TlsStream { + type Target = tokio_rustls::server::TlsStream; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for TlsStream { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl AsyncRead for TlsStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut **self.get_mut()).poll_read(cx, buf) + } +} + +impl AsyncWrite for TlsStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut **self.get_mut()).poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut **self.get_mut()).poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut **self.get_mut()).poll_shutdown(cx) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>], + ) -> Poll> { + Pin::new(&mut **self.get_mut()).poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + (&**self).is_write_vectored() + } +} + +impl ActixStream for TlsStream { + fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll> { + T::poll_read_ready((&**self).get_ref().0, cx) + } + + fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> { + T::poll_write_ready((&**self).get_ref().0, cx) + } +} + /// Accept TLS connections via `rustls` package. /// /// `rustls` feature enables this `Acceptor` type. @@ -43,10 +117,7 @@ impl Clone for Acceptor { } } -impl ServiceFactory for Acceptor -where - T: AsyncRead + AsyncWrite + Unpin, -{ +impl ServiceFactory for Acceptor { type Response = TlsStream; type Error = io::Error; type Config = (); @@ -72,10 +143,7 @@ pub struct AcceptorService { conns: Counter, } -impl Service for AcceptorService -where - T: AsyncRead + AsyncWrite + Unpin, -{ +impl Service for AcceptorService { type Response = TlsStream; type Error = io::Error; type Future = AcceptorServiceFut; @@ -96,22 +164,16 @@ where } } -pub struct AcceptorServiceFut -where - T: AsyncRead + AsyncWrite + Unpin, -{ +pub struct AcceptorServiceFut { fut: Accept, _guard: CounterGuard, } -impl Future for AcceptorServiceFut -where - T: AsyncRead + AsyncWrite + Unpin, -{ +impl Future for AcceptorServiceFut { type Output = Result, io::Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - Pin::new(&mut this.fut).poll(cx) + Pin::new(&mut this.fut).poll(cx).map_ok(TlsStream) } } diff --git a/actix-tls/src/connect/connect.rs b/actix-tls/src/connect/connect.rs index 9e5d417f..bd4b3fdf 100755 --- a/actix-tls/src/connect/connect.rs +++ b/actix-tls/src/connect/connect.rs @@ -3,7 +3,7 @@ use std::{ fmt, iter::{self, FromIterator as _}, mem, - net::SocketAddr, + net::{IpAddr, SocketAddr}, }; /// Parse a host into parts (hostname and port). @@ -67,6 +67,7 @@ pub struct Connect { pub(crate) req: T, pub(crate) port: u16, pub(crate) addr: ConnectAddrs, + pub(crate) local_addr: Option, } impl Connect { @@ -78,6 +79,7 @@ impl Connect { req, port: port.unwrap_or(0), addr: ConnectAddrs::None, + local_addr: None, } } @@ -88,6 +90,7 @@ impl Connect { req, port: 0, addr: ConnectAddrs::One(addr), + local_addr: None, } } @@ -119,6 +122,12 @@ impl Connect { self } + /// Set local_addr of connect. + pub fn set_local_addr(mut self, addr: impl Into) -> Self { + self.local_addr = Some(addr.into()); + self + } + /// Get hostname. pub fn hostname(&self) -> &str { self.req.hostname() @@ -285,7 +294,7 @@ fn parse_host(host: &str) -> (&str, Option) { #[cfg(test)] mod tests { - use std::net::{IpAddr, Ipv4Addr}; + use std::net::Ipv4Addr; use super::*; @@ -329,4 +338,13 @@ mod tests { let mut iter = ConnectAddrsIter::None; assert_eq!(iter.next(), None); } + + #[test] + fn test_local_addr() { + let conn = Connect::new("hello").set_local_addr([127, 0, 0, 1]); + assert_eq!( + conn.local_addr.unwrap(), + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)) + ) + } } diff --git a/actix-tls/src/connect/connector.rs b/actix-tls/src/connect/connector.rs index 9acb1dd5..8f32270f 100755 --- a/actix-tls/src/connect/connector.rs +++ b/actix-tls/src/connect/connector.rs @@ -2,12 +2,12 @@ use std::{ collections::VecDeque, future::Future, io, - net::SocketAddr, + net::{IpAddr, SocketAddr, SocketAddrV4, SocketAddrV6}, pin::Pin, task::{Context, Poll}, }; -use actix_rt::net::TcpStream; +use actix_rt::net::{TcpSocket, TcpStream}; use actix_service::{Service, ServiceFactory}; use futures_core::{future::LocalBoxFuture, ready}; use log::{error, trace}; @@ -54,9 +54,14 @@ impl Service> for TcpConnector { fn call(&self, req: Connect) -> Self::Future { let port = req.port(); - let Connect { req, addr, .. } = req; + let Connect { + req, + addr, + local_addr, + .. + } = req; - TcpConnectorResponse::new(req, port, addr) + TcpConnectorResponse::new(req, port, local_addr, addr) } } @@ -65,6 +70,7 @@ pub enum TcpConnectorResponse { Response { req: Option, port: u16, + local_addr: Option, addrs: Option>, stream: Option>>, }, @@ -72,7 +78,12 @@ pub enum TcpConnectorResponse { } impl TcpConnectorResponse { - pub(crate) fn new(req: T, port: u16, addr: ConnectAddrs) -> TcpConnectorResponse { + pub(crate) fn new( + req: T, + port: u16, + local_addr: Option, + addr: ConnectAddrs, + ) -> TcpConnectorResponse { if addr.is_none() { error!("TCP connector: unresolved connection address"); return TcpConnectorResponse::Error(Some(ConnectError::Unresolved)); @@ -90,8 +101,9 @@ impl TcpConnectorResponse { ConnectAddrs::One(addr) => TcpConnectorResponse::Response { req: Some(req), port, + local_addr, addrs: None, - stream: Some(ReusableBoxFuture::new(TcpStream::connect(addr))), + stream: Some(ReusableBoxFuture::new(connect(addr, local_addr))), }, // when resolver returns multiple socket addr for request they would be popped from @@ -99,6 +111,7 @@ impl TcpConnectorResponse { ConnectAddrs::Multi(addrs) => TcpConnectorResponse::Response { req: Some(req), port, + local_addr, addrs: Some(addrs), stream: None, }, @@ -116,6 +129,7 @@ impl Future for TcpConnectorResponse { TcpConnectorResponse::Response { req, port, + local_addr, addrs, stream, } => loop { @@ -148,11 +162,38 @@ impl Future for TcpConnectorResponse { // try to connect let addr = addrs.as_mut().unwrap().pop_front().unwrap(); + let fut = connect(addr, *local_addr); match stream { - Some(rbf) => rbf.set(TcpStream::connect(addr)), - None => *stream = Some(ReusableBoxFuture::new(TcpStream::connect(addr))), + Some(rbf) => rbf.set(fut), + None => *stream = Some(ReusableBoxFuture::new(fut)), } }, } } } + +async fn connect(addr: SocketAddr, local_addr: Option) -> io::Result { + // use local addr if connect asks for it. + match local_addr { + Some(ip_addr) => { + let socket = match ip_addr { + IpAddr::V4(ip_addr) => { + let socket = TcpSocket::new_v4()?; + let addr = SocketAddr::V4(SocketAddrV4::new(ip_addr, 0)); + socket.bind(addr)?; + socket + } + IpAddr::V6(ip_addr) => { + let socket = TcpSocket::new_v6()?; + let addr = SocketAddr::V6(SocketAddrV6::new(ip_addr, 0, 0, 0)); + socket.bind(addr)?; + socket + } + }; + + socket.connect(addr).await + } + + None => TcpStream::connect(addr).await, + } +} diff --git a/actix-tls/tests/test_connect.rs b/actix-tls/tests/test_connect.rs index 7ee7afda..564151ce 100755 --- a/actix-tls/tests/test_connect.rs +++ b/actix-tls/tests/test_connect.rs @@ -1,4 +1,9 @@ -use std::io; +#![cfg(feature = "connect")] + +use std::{ + io, + net::{IpAddr, Ipv4Addr}, +}; use actix_codec::{BytesCodec, Framed}; use actix_rt::net::TcpStream; @@ -9,7 +14,7 @@ use futures_util::sink::SinkExt; use actix_tls::connect::{self as actix_connect, Connect}; -#[cfg(all(feature = "connect", feature = "openssl"))] +#[cfg(feature = "openssl")] #[actix_rt::test] async fn test_string() { let srv = TestServer::with(|| { @@ -125,3 +130,25 @@ async fn test_rustls_uri() { let con = conn.call(addr.into()).await.unwrap(); assert_eq!(con.peer_addr().unwrap(), srv.addr()); } + +#[actix_rt::test] +async fn test_local_addr() { + let srv = TestServer::with(|| { + fn_service(|io: TcpStream| async { + let mut framed = Framed::new(io, BytesCodec); + framed.send(Bytes::from_static(b"test")).await?; + Ok::<_, io::Error>(()) + }) + }); + + let conn = actix_connect::default_connector(); + let local = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 3)); + + let (con, _) = conn + .call(Connect::with_addr("10", srv.addr()).set_local_addr(local)) + .await + .unwrap() + .into_parts(); + + assert_eq!(con.local_addr().unwrap().ip(), local) +} diff --git a/actix-tls/tests/test_resolvers.rs b/actix-tls/tests/test_resolvers.rs index 0f49c486..40ee21fa 100644 --- a/actix-tls/tests/test_resolvers.rs +++ b/actix-tls/tests/test_resolvers.rs @@ -1,3 +1,5 @@ +#![cfg(feature = "connect")] + use std::{ io, net::{Ipv4Addr, SocketAddr}, diff --git a/actix-tracing/Cargo.toml b/actix-tracing/Cargo.toml index 60ad1454..7f043f4b 100644 --- a/actix-tracing/Cargo.toml +++ b/actix-tracing/Cargo.toml @@ -16,7 +16,7 @@ name = "actix_tracing" path = "src/lib.rs" [dependencies] -actix-service = "2.0.0-beta.4" +actix-service = "2.0.0-beta.5" futures-util = { version = "0.3.4", default-features = false } tracing = "0.1" diff --git a/actix-tracing/src/lib.rs b/actix-tracing/src/lib.rs index b34f40d6..89e93be1 100644 --- a/actix-tracing/src/lib.rs +++ b/actix-tracing/src/lib.rs @@ -7,7 +7,7 @@ use core::marker::PhantomData; use actix_service::{ - apply, dev::ApplyTransform, IntoServiceFactory, Service, ServiceFactory, Transform, + apply, ApplyTransform, IntoServiceFactory, Service, ServiceFactory, Transform, }; use futures_util::future::{ok, Either, Ready}; use tracing_futures::{Instrument, Instrumented}; diff --git a/actix-utils/CHANGES.md b/actix-utils/CHANGES.md index a7871612..8d97b741 100644 --- a/actix-utils/CHANGES.md +++ b/actix-utils/CHANGES.md @@ -1,6 +1,15 @@ # Changes ## Unreleased - 2021-xx-xx +* Add `async fn mpsc::Receiver::recv`. [#286] +* `SendError` inner field is now public. [#286] +* Rename `Dispatcher::{get_sink => tx}`. [#286] +* Rename `Dispatcher::{get_ref => service}`. [#286] +* Rename `Dispatcher::{get_mut => service_mut}`. [#286] +* Rename `Dispatcher::{get_framed => framed}`. [#286] +* Rename `Dispatcher::{get_framed_mut => framed_mut}`. [#286] + +[#286]: https://github.com/actix/actix-net/pull/286 ## 3.0.0-beta.2 - 2021-02-06 diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index da46256e..9c21dd1b 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -18,7 +18,7 @@ path = "src/lib.rs" [dependencies] actix-codec = "0.4.0-beta.1" actix-rt = { version = "2.0.0", default-features = false } -actix-service = "2.0.0-beta.4" +actix-service = "2.0.0-beta.5" futures-core = { version = "0.3.7", default-features = false } futures-sink = { version = "0.3.7", default-features = false } diff --git a/actix-utils/src/dispatcher.rs b/actix-utils/src/dispatcher.rs index 1e55aa2c..94ac9971 100644 --- a/actix-utils/src/dispatcher.rs +++ b/actix-utils/src/dispatcher.rs @@ -163,29 +163,28 @@ where } } - /// Get sink - pub fn get_sink(&self) -> mpsc::Sender, S::Error>> { + /// Get sender handle. + pub fn tx(&self) -> mpsc::Sender, S::Error>> { self.tx.clone() } /// Get reference to a service wrapped by `Dispatcher` instance. - pub fn get_ref(&self) -> &S { + pub fn service(&self) -> &S { &self.service } /// Get mutable reference to a service wrapped by `Dispatcher` instance. - pub fn get_mut(&mut self) -> &mut S { + pub fn service_mut(&mut self) -> &mut S { &mut self.service } - /// Get reference to a framed instance wrapped by `Dispatcher` - /// instance. - pub fn get_framed(&self) -> &Framed { + /// Get reference to a framed instance wrapped by `Dispatcher` instance. + pub fn framed(&self) -> &Framed { &self.framed } /// Get mutable reference to a framed instance wrapped by `Dispatcher` instance. - pub fn get_framed_mut(&mut self) -> &mut Framed { + pub fn framed_mut(&mut self) -> &mut Framed { &mut self.framed } @@ -268,7 +267,7 @@ where if !this.framed.is_write_buf_empty() { match this.framed.flush(cx) { Poll::Pending => break, - Poll::Ready(Ok(_)) => (), + Poll::Ready(Ok(_)) => {} Poll::Ready(Err(err)) => { debug!("Error sending data: {:?}", err); *this.state = State::FramedError(DispatcherError::Encoder(err)); @@ -318,14 +317,13 @@ where } State::FlushAndStop => { if !this.framed.is_write_buf_empty() { - match this.framed.flush(cx) { - Poll::Ready(Err(err)) => { + this.framed.flush(cx).map(|res| { + if let Err(err) = res { debug!("Error sending data: {:?}", err); - Poll::Ready(Ok(())) } - Poll::Pending => Poll::Pending, - Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), - } + + Ok(()) + }) } else { Poll::Ready(Ok(())) } diff --git a/actix-utils/src/lib.rs b/actix-utils/src/lib.rs index 5c10bac6..6658cba8 100644 --- a/actix-utils/src/lib.rs +++ b/actix-utils/src/lib.rs @@ -8,5 +8,8 @@ pub mod counter; pub mod dispatcher; pub mod mpsc; +mod poll_fn; pub mod task; pub mod timeout; + +use self::poll_fn::poll_fn; diff --git a/actix-utils/src/mpsc.rs b/actix-utils/src/mpsc.rs index 2f2b3f04..9c7a5a0e 100644 --- a/actix-utils/src/mpsc.rs +++ b/actix-utils/src/mpsc.rs @@ -1,31 +1,35 @@ //! A multi-producer, single-consumer, futures-aware, FIFO queue. -use core::any::Any; -use core::cell::RefCell; -use core::fmt; -use core::pin::Pin; -use core::task::{Context, Poll}; +use core::{ + cell::RefCell, + fmt, + pin::Pin, + task::{Context, Poll}, +}; -use std::collections::VecDeque; -use std::error::Error; -use std::rc::Rc; +use std::{collections::VecDeque, error::Error, rc::Rc}; use futures_core::stream::Stream; use futures_sink::Sink; -use crate::task::LocalWaker; +use crate::{poll_fn, task::LocalWaker}; /// Creates a unbounded in-memory channel with buffered storage. +/// +/// [Sender]s and [Receiver]s are `!Send`. pub fn channel() -> (Sender, Receiver) { let shared = Rc::new(RefCell::new(Shared { has_receiver: true, buffer: VecDeque::new(), blocked_recv: LocalWaker::new(), })); + let sender = Sender { shared: shared.clone(), }; + let receiver = Receiver { shared }; + (sender, receiver) } @@ -50,18 +54,22 @@ impl Sender { /// Sends the provided message along this channel. pub fn send(&self, item: T) -> Result<(), SendError> { let mut shared = self.shared.borrow_mut(); + if !shared.has_receiver { - return Err(SendError(item)); // receiver was dropped + // receiver was dropped + return Err(SendError(item)); }; + shared.buffer.push_back(item); shared.blocked_recv.wake(); + Ok(()) } - /// Closes the sender half + /// Closes the sender half. /// - /// This prevents any further messages from being sent on the channel while - /// still enabling the receiver to drain messages that are buffered. + /// This prevents any further messages from being sent on the channel, by any sender, while + /// still enabling the receiver to drain messages that are already buffered. pub fn close(&mut self) { self.shared.borrow_mut().has_receiver = false; } @@ -110,14 +118,24 @@ impl Drop for Sender { /// The receiving end of a channel which implements the `Stream` trait. /// -/// This is created by the `channel` function. +/// This is created by the [`channel`] function. #[derive(Debug)] pub struct Receiver { shared: Rc>>, } impl Receiver { - /// Create Sender + /// Receive the next value. + /// + /// Returns `None` if the channel is empty and has been [closed](Sender::close) explicitly or + /// when all senders have been dropped and, therefore, no more values can ever be sent though + /// this channel. + pub async fn recv(&mut self) -> Option { + let mut this = Pin::new(self); + poll_fn(|cx| this.as_mut().poll_next(cx)).await + } + + /// Create an associated [Sender]. pub fn sender(&self) -> Sender { Sender { shared: self.shared.clone(), @@ -132,11 +150,13 @@ impl Stream for Receiver { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut shared = self.shared.borrow_mut(); + if Rc::strong_count(&self.shared) == 1 { - // All senders have been dropped, so drain the buffer and end the - // stream. - Poll::Ready(shared.buffer.pop_front()) - } else if let Some(msg) = shared.buffer.pop_front() { + // All senders have been dropped, so drain the buffer and end the stream. + return Poll::Ready(shared.buffer.pop_front()); + } + + if let Some(msg) = shared.buffer.pop_front() { Poll::Ready(Some(msg)) } else { shared.blocked_recv.register(cx.waker()); @@ -153,9 +173,15 @@ impl Drop for Receiver { } } -/// Error type for sending, used when the receiving end of a channel is -/// dropped -pub struct SendError(T); +/// Error returned when attempting to send after the channels' [Receiver] is dropped or closed. +pub struct SendError(pub T); + +impl SendError { + /// Returns the message that was attempted to be sent but failed. + pub fn into_inner(self) -> T { + self.0 + } +} impl fmt::Debug for SendError { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -169,18 +195,7 @@ impl fmt::Display for SendError { } } -impl Error for SendError { - fn description(&self) -> &str { - "send failed because receiver is gone" - } -} - -impl SendError { - /// Returns the message that was attempted to be sent but failed. - pub fn into_inner(self) -> T { - self.0 - } -} +impl Error for SendError {} #[cfg(test)] mod tests { @@ -221,4 +236,18 @@ mod tests { assert!(tx.send("test").is_err()); assert!(tx2.send("test").is_err()); } + + #[actix_rt::test] + async fn test_recv() { + let (tx, mut rx) = channel(); + tx.send("test").unwrap(); + assert_eq!(rx.recv().await.unwrap(), "test"); + drop(tx); + + let (tx, mut rx) = channel(); + tx.send("test").unwrap(); + assert_eq!(rx.recv().await.unwrap(), "test"); + drop(tx); + assert!(rx.recv().await.is_none()); + } } diff --git a/actix-utils/src/poll_fn.rs b/actix-utils/src/poll_fn.rs new file mode 100644 index 00000000..2180f4a4 --- /dev/null +++ b/actix-utils/src/poll_fn.rs @@ -0,0 +1,65 @@ +//! Simple "poll function" future and factory. + +use core::{ + fmt, + future::Future, + task::{self, Poll}, +}; +use std::pin::Pin; + +/// Create a future driven by the provided function that receives a task context. +pub(crate) fn poll_fn(f: F) -> PollFn +where + F: FnMut(&mut task::Context<'_>) -> Poll, +{ + PollFn { f } +} + +/// A Future driven by the inner function. +pub(crate) struct PollFn { + f: F, +} + +impl Unpin for PollFn {} + +impl fmt::Debug for PollFn { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PollFn").finish() + } +} + +impl Future for PollFn +where + F: FnMut(&mut task::Context<'_>) -> task::Poll, +{ + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + (self.f)(cx) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[actix_rt::test] + async fn test_poll_fn() { + let res = poll_fn(|_| Poll::Ready(42)).await; + assert_eq!(res, 42); + + let mut i = 5; + let res = poll_fn(|cx| { + i -= 1; + + if i > 0 { + cx.waker().wake_by_ref(); + Poll::Pending + } else { + Poll::Ready(42) + } + }) + .await; + assert_eq!(res, 42); + } +} diff --git a/actix-utils/src/task.rs b/actix-utils/src/task.rs index 2a3469cf..507bfc14 100644 --- a/actix-utils/src/task.rs +++ b/actix-utils/src/task.rs @@ -1,49 +1,33 @@ -use core::cell::UnsafeCell; -use core::fmt; -use core::marker::PhantomData; -use core::task::Waker; +use core::{cell::Cell, fmt, marker::PhantomData, task::Waker}; /// A synchronization primitive for task wakeup. /// -/// Sometimes the task interested in a given event will change over time. -/// An `LocalWaker` can coordinate concurrent notifications with the consumer -/// potentially "updating" the underlying task to wake up. This is useful in -/// scenarios where a computation completes in another task and wants to -/// notify the consumer, but the consumer is in the process of being migrated to -/// a new logical task. +/// Sometimes the task interested in a given event will change over time. A `LocalWaker` can +/// coordinate concurrent notifications with the consumer, potentially "updating" the underlying +/// task to wake up. This is useful in scenarios where a computation completes in another task and +/// wants to notify the consumer, but the consumer is in the process of being migrated to a new +/// logical task. /// -/// Consumers should call `register` before checking the result of a computation -/// and producers should call `wake` after producing the computation (this -/// differs from the usual `thread::park` pattern). It is also permitted for -/// `wake` to be called **before** `register`. This results in a no-op. +/// Consumers should call [`register`] before checking the result of a computation and producers +/// should call [`wake`] after producing the computation (this differs from the usual `thread::park` +/// pattern). It is also permitted for [`wake`] to be called _before_ [`register`]. This results in +/// a no-op. /// -/// A single `AtomicWaker` may be reused for any number of calls to `register` or -/// `wake`. -// TODO: Refactor to Cell when remove deprecated methods (@botika) +/// A single `LocalWaker` may be reused for any number of calls to [`register`] or [`wake`]. +/// +/// [`register`]: LocalWaker::register +/// [`wake`]: LocalWaker::wake #[derive(Default)] pub struct LocalWaker { - pub(crate) waker: UnsafeCell>, + pub(crate) waker: Cell>, // mark LocalWaker as a !Send type. - _t: PhantomData<*const ()>, + _phantom: PhantomData<*const ()>, } impl LocalWaker { - /// Create an `LocalWaker`. + /// Creates a new, empty `LocalWaker`. pub fn new() -> Self { - LocalWaker { - waker: UnsafeCell::new(None), - _t: PhantomData, - } - } - - #[deprecated( - since = "2.1.0", - note = "In favor of `wake`. State of the register doesn't matter at `wake` up" - )] - /// Check if waker has been registered. - #[inline] - pub fn is_registered(&self) -> bool { - unsafe { (*self.waker.get()).is_some() } + LocalWaker::default() } /// Registers the waker to be notified on calls to `wake`. @@ -51,11 +35,8 @@ impl LocalWaker { /// Returns `true` if waker was registered before. #[inline] pub fn register(&self, waker: &Waker) -> bool { - unsafe { - let w = self.waker.get(); - let last_waker = w.replace(Some(waker.clone())); - last_waker.is_some() - } + let last_waker = self.waker.replace(Some(waker.clone())); + last_waker.is_some() } /// Calls `wake` on the last `Waker` passed to `register`. @@ -73,7 +54,7 @@ impl LocalWaker { /// If a waker has not been registered, this returns `None`. #[inline] pub fn take(&self) -> Option { - unsafe { (*self.waker.get()).take() } + self.waker.take() } } diff --git a/actix-utils/src/timeout.rs b/actix-utils/src/timeout.rs index 9304e5f6..f13c7ffa 100644 --- a/actix-utils/src/timeout.rs +++ b/actix-utils/src/timeout.rs @@ -197,7 +197,6 @@ where #[cfg(test)] mod tests { - use core::task::Poll; use core::time::Duration; use super::*;