mirror of https://github.com/fafhrd91/actix-net
use tokio::runtime::Handle block on worker thread
This commit is contained in:
commit
75068a1ba0
|
@ -0,0 +1,131 @@
|
|||
name: CI
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
types: [opened, synchronize, reopened]
|
||||
push:
|
||||
branches: [master]
|
||||
|
||||
jobs:
|
||||
build_and_test:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
target:
|
||||
- { name: Linux, os: ubuntu-latest, triple: x86_64-unknown-linux-gnu }
|
||||
- { name: macOS, os: macos-latest, triple: x86_64-apple-darwin }
|
||||
- { name: Windows, os: windows-latest, triple: x86_64-pc-windows-msvc }
|
||||
- { name: Windows (MinGW), os: windows-latest, triple: x86_64-pc-windows-gnu }
|
||||
- { name: Windows (32-bit), os: windows-latest, triple: i686-pc-windows-msvc }
|
||||
version:
|
||||
- 1.46.0 # MSRV
|
||||
- stable
|
||||
- nightly
|
||||
|
||||
name: ${{ matrix.target.name }} / ${{ matrix.version }}
|
||||
runs-on: ${{ matrix.target.os }}
|
||||
|
||||
env:
|
||||
VCPKGRS_DYNAMIC: 1
|
||||
|
||||
steps:
|
||||
- name: Setup Routing
|
||||
if: matrix.target.os == 'macos-latest'
|
||||
run: sudo ifconfig lo0 alias 127.0.0.3
|
||||
|
||||
- uses: actions/checkout@v2
|
||||
|
||||
# install OpenSSL on Windows
|
||||
- name: Set vcpkg root
|
||||
if: matrix.target.triple == 'x86_64-pc-windows-msvc' || matrix.target.triple == 'i686-pc-windows-msvc'
|
||||
run: echo "VCPKG_ROOT=$env:VCPKG_INSTALLATION_ROOT" | Out-File -FilePath $env:GITHUB_ENV -Append
|
||||
- name: Install OpenSSL
|
||||
if: matrix.target.triple == 'x86_64-pc-windows-msvc'
|
||||
run: vcpkg install openssl:x64-windows
|
||||
- name: Install OpenSSL
|
||||
if: matrix.target.triple == 'i686-pc-windows-msvc'
|
||||
run: vcpkg install openssl:x86-windows
|
||||
|
||||
- name: Install ${{ matrix.version }}
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: ${{ matrix.version }}-${{ matrix.target.triple }}
|
||||
profile: minimal
|
||||
override: true
|
||||
|
||||
# - name: Install MSYS2
|
||||
# if: matrix.target.triple == 'x86_64-pc-windows-gnu'
|
||||
# uses: msys2/setup-msys2@v2
|
||||
# - name: Install MinGW Packages
|
||||
# if: matrix.target.triple == 'x86_64-pc-windows-gnu'
|
||||
# run: |
|
||||
# msys2 -c 'pacman -Sy --noconfirm pacman'
|
||||
# msys2 -c 'pacman --noconfirm -S base-devel pkg-config'
|
||||
|
||||
# - name: Generate Cargo.lock
|
||||
# uses: actions-rs/cargo@v1
|
||||
# with:
|
||||
# command: generate-lockfile
|
||||
# - name: Cache Dependencies
|
||||
# uses: Swatinem/rust-cache@v1.2.0
|
||||
|
||||
- name: Install cargo-hack
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: install
|
||||
args: cargo-hack
|
||||
|
||||
- name: check minimal
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: hack
|
||||
args: check --workspace --no-default-features
|
||||
|
||||
- name: check minimal + tests
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: hack
|
||||
args: check --workspace --no-default-features --tests --examples
|
||||
|
||||
- name: check default
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: check
|
||||
args: --workspace --tests --examples
|
||||
|
||||
- name: check full
|
||||
# TODO: compile OpenSSL and run tests on MinGW
|
||||
if: matrix.target.triple != 'x86_64-pc-windows-gnu'
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: check
|
||||
args: --workspace --all-features --tests --examples
|
||||
|
||||
- name: tests
|
||||
if: matrix.target.triple != 'x86_64-pc-windows-gnu'
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: test
|
||||
args: --workspace --all-features --no-fail-fast -- --nocapture
|
||||
|
||||
- name: Generate coverage file
|
||||
if: >
|
||||
matrix.target.os == 'ubuntu-latest'
|
||||
&& matrix.version == 'stable'
|
||||
&& github.ref == 'refs/heads/master'
|
||||
run: |
|
||||
cargo install cargo-tarpaulin
|
||||
cargo tarpaulin --out Xml --verbose
|
||||
- name: Upload to Codecov
|
||||
if: >
|
||||
matrix.target.os == 'ubuntu-latest'
|
||||
&& matrix.version == 'stable'
|
||||
&& github.ref == 'refs/heads/master'
|
||||
uses: codecov/codecov-action@v1
|
||||
with:
|
||||
file: cobertura.xml
|
||||
|
||||
- name: Clear the cargo caches
|
||||
run: |
|
||||
cargo install cargo-cache --no-default-features --features ci-autoclean
|
||||
cargo-cache
|
|
@ -1,34 +1,42 @@
|
|||
name: Lint
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
types: [opened, synchronize, reopened]
|
||||
|
||||
name: Clippy and rustfmt Check
|
||||
jobs:
|
||||
clippy_check:
|
||||
fmt:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
|
||||
- uses: actions-rs/toolchain@v1
|
||||
- name: Install Rust
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: stable
|
||||
components: rustfmt
|
||||
profile: minimal
|
||||
components: rustfmt
|
||||
override: true
|
||||
- name: Check with rustfmt
|
||||
- name: Rustfmt Check
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: fmt
|
||||
args: --all -- --check
|
||||
|
||||
- uses: actions-rs/toolchain@v1
|
||||
clippy:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
|
||||
- name: Install Rust
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: nightly
|
||||
components: clippy
|
||||
toolchain: stable
|
||||
profile: minimal
|
||||
components: clippy
|
||||
override: true
|
||||
- name: Check with Clippy
|
||||
- name: Clippy Check
|
||||
uses: actions-rs/clippy-check@v1
|
||||
with:
|
||||
token: ${{ secrets.GITHUB_TOKEN }}
|
||||
args: --workspace --tests
|
||||
args: --workspace --tests --all-features
|
||||
|
|
|
@ -1,82 +0,0 @@
|
|||
name: CI (Linux)
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
types: [opened, synchronize, reopened]
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
- '1.0'
|
||||
|
||||
jobs:
|
||||
build_and_test:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
version:
|
||||
- 1.46.0
|
||||
- stable
|
||||
- nightly
|
||||
|
||||
name: ${{ matrix.version }} - x86_64-unknown-linux-gnu
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
|
||||
- name: Install ${{ matrix.version }}
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: ${{ matrix.version }}-x86_64-unknown-linux-gnu
|
||||
profile: minimal
|
||||
override: true
|
||||
|
||||
- name: Generate Cargo.lock
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: generate-lockfile
|
||||
- name: Cache cargo dirs
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path:
|
||||
~/.cargo/registry
|
||||
~/.cargo/git
|
||||
~/.cargo/bin
|
||||
key: ${{ matrix.version }}-x86_64-unknown-linux-gnu-cargo-trimmed-${{ hashFiles('**/Cargo.lock') }}
|
||||
- name: Cache cargo build
|
||||
uses: actions/cache@v2
|
||||
with:
|
||||
path: target
|
||||
key: ${{ matrix.version }}-x86_64-unknown-linux-gnu-cargo-build-trimmed-${{ hashFiles('**/Cargo.lock') }}
|
||||
|
||||
- name: check build
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: check
|
||||
args: --workspace --bins --examples --tests
|
||||
|
||||
- name: tests
|
||||
uses: actions-rs/cargo@v1
|
||||
timeout-minutes: 40
|
||||
with:
|
||||
command: test
|
||||
args: --workspace --exclude=actix-tls --no-fail-fast -- --nocapture
|
||||
|
||||
- name: Generate coverage file
|
||||
if: matrix.version == 'stable' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
|
||||
run: |
|
||||
cargo install cargo-tarpaulin
|
||||
cargo tarpaulin --out Xml --workspace
|
||||
|
||||
- name: Upload to Codecov
|
||||
if: matrix.version == 'stable' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
|
||||
uses: codecov/codecov-action@v1
|
||||
with:
|
||||
file: cobertura.xml
|
||||
|
||||
- name: Clear the cargo caches
|
||||
run: |
|
||||
rustup update stable
|
||||
rustup override set stable
|
||||
cargo install cargo-cache --no-default-features --features ci-autoclean
|
||||
cargo-cache
|
|
@ -1,43 +0,0 @@
|
|||
name: CI (macOS)
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
types: [opened, synchronize, reopened]
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
- '1.0'
|
||||
|
||||
jobs:
|
||||
build_and_test:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
version:
|
||||
- stable
|
||||
- nightly
|
||||
|
||||
name: ${{ matrix.version }} - x86_64-apple-darwin
|
||||
runs-on: macos-latest
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
|
||||
- name: Install ${{ matrix.version }}
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: ${{ matrix.version }}-x86_64-apple-darwin
|
||||
profile: minimal
|
||||
override: true
|
||||
|
||||
- name: check build
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: check
|
||||
args: --workspace --bins --examples --tests
|
||||
|
||||
- name: tests
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: test
|
||||
args: --workspace --exclude=actix-tls --no-fail-fast -- --nocapture
|
|
@ -0,0 +1,35 @@
|
|||
name: Upload documentation
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [master]
|
||||
|
||||
jobs:
|
||||
build:
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
|
||||
- name: Install Rust
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: nightly-x86_64-unknown-linux-gnu
|
||||
profile: minimal
|
||||
override: true
|
||||
|
||||
- name: Build Docs
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: doc
|
||||
args: --workspace --all-features --no-deps
|
||||
|
||||
- name: Tweak HTML
|
||||
run: echo '<meta http-equiv="refresh" content="0;url=actix_server/index.html">' > target/doc/index.html
|
||||
|
||||
- name: Deploy to GitHub Pages
|
||||
uses: JamesIves/github-pages-deploy-action@3.7.1
|
||||
with:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
BRANCH: gh-pages
|
||||
FOLDER: target/doc
|
|
@ -1,45 +0,0 @@
|
|||
name: CI (Windows-mingw)
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
types: [opened, synchronize, reopened]
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
- '1.0'
|
||||
|
||||
jobs:
|
||||
build_and_test:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
version:
|
||||
- stable
|
||||
- nightly
|
||||
|
||||
name: ${{ matrix.version }} - x86_64-pc-windows-gnu
|
||||
runs-on: windows-latest
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
|
||||
- name: Install ${{ matrix.version }}
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: ${{ matrix.version }}-x86_64-pc-windows-gnu
|
||||
profile: minimal
|
||||
override: true
|
||||
|
||||
- name: Install MSYS2
|
||||
uses: msys2/setup-msys2@v2
|
||||
|
||||
- name: Install packages
|
||||
run: |
|
||||
msys2 -c 'pacman -Sy --noconfirm pacman'
|
||||
msys2 -c 'pacman --noconfirm -S base-devel pkg-config'
|
||||
|
||||
- name: check build
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: check
|
||||
args: --workspace --bins --examples --tests
|
|
@ -1,69 +0,0 @@
|
|||
name: CI (Windows)
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
types: [opened, synchronize, reopened]
|
||||
push:
|
||||
branches:
|
||||
- master
|
||||
- '1.0'
|
||||
|
||||
env:
|
||||
VCPKGRS_DYNAMIC: 1
|
||||
|
||||
jobs:
|
||||
build_and_test:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
version:
|
||||
- stable
|
||||
- nightly
|
||||
target:
|
||||
- x86_64-pc-windows-msvc
|
||||
- i686-pc-windows-msvc
|
||||
|
||||
name: ${{ matrix.version }} - ${{ matrix.target }}
|
||||
runs-on: windows-latest
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
|
||||
- name: Install ${{ matrix.version }}
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: ${{ matrix.version }}-${{ matrix.target }}
|
||||
profile: minimal
|
||||
override: true
|
||||
|
||||
- name: Install OpenSSL (x64)
|
||||
if: matrix.target == 'x86_64-pc-windows-msvc'
|
||||
run: |
|
||||
vcpkg integrate install
|
||||
vcpkg install openssl:x64-windows
|
||||
Get-ChildItem C:\vcpkg\installed\x64-windows\bin
|
||||
Get-ChildItem C:\vcpkg\installed\x64-windows\lib
|
||||
Copy-Item C:\vcpkg\installed\x64-windows\bin\libcrypto-1_1-x64.dll C:\vcpkg\installed\x64-windows\bin\libcrypto.dll
|
||||
Copy-Item C:\vcpkg\installed\x64-windows\bin\libssl-1_1-x64.dll C:\vcpkg\installed\x64-windows\bin\libssl.dll
|
||||
|
||||
- name: Install OpenSSL (x86)
|
||||
if: matrix.target == 'i686-pc-windows-msvc'
|
||||
run: |
|
||||
vcpkg integrate install
|
||||
vcpkg install openssl:x86-windows
|
||||
Get-ChildItem C:\vcpkg\installed\x86-windows\bin
|
||||
Get-ChildItem C:\vcpkg\installed\x86-windows\lib
|
||||
Copy-Item C:\vcpkg\installed\x86-windows\bin\libcrypto-1_1.dll C:\vcpkg\installed\x86-windows\bin\libcrypto.dll
|
||||
Copy-Item C:\vcpkg\installed\x86-windows\bin\libssl-1_1.dll C:\vcpkg\installed\x86-windows\bin\libssl.dll
|
||||
|
||||
- name: check build
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: check
|
||||
args: --workspace --bins --examples --tests
|
||||
|
||||
- name: tests
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
command: test
|
||||
args: --workspace --exclude=actix-tls --no-fail-fast -- --nocapture
|
|
@ -670,8 +670,6 @@ pub(crate) fn insert_slash(path: &str) -> String {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use http::Uri;
|
||||
use std::convert::TryFrom;
|
||||
|
||||
#[test]
|
||||
fn test_parse_static() {
|
||||
|
@ -833,8 +831,11 @@ mod tests {
|
|||
assert!(re.is_match("/user/2345/sdg"));
|
||||
}
|
||||
|
||||
#[cfg(feature = "http")]
|
||||
#[test]
|
||||
fn test_parse_urlencoded_param() {
|
||||
use std::convert::TryFrom;
|
||||
|
||||
let re = ResourceDef::new("/user/{id}/test");
|
||||
|
||||
let mut path = Path::new("/user/2345/test");
|
||||
|
@ -845,7 +846,7 @@ mod tests {
|
|||
assert!(re.match_path(&mut path));
|
||||
assert_eq!(path.get("id").unwrap(), "qwe%25");
|
||||
|
||||
let uri = Uri::try_from("/user/qwe%25/test").unwrap();
|
||||
let uri = http::Uri::try_from("/user/qwe%25/test").unwrap();
|
||||
let mut path = Path::new(uri);
|
||||
assert!(re.match_path(&mut path));
|
||||
assert_eq!(path.get("id").unwrap(), "qwe%25");
|
||||
|
|
|
@ -1,6 +1,17 @@
|
|||
# Changes
|
||||
|
||||
## Unreleased - 2021-xx-xx
|
||||
* `ActixStream::{poll_read_ready, poll_write_ready}` would return `Ready` in Ok variant. [#293]
|
||||
|
||||
[#293] https://github.com/actix/actix-net/pull/293
|
||||
|
||||
|
||||
## 2.1.0 - 2021-02-24
|
||||
* Add `ActixStream` extension trait to include readiness methods. [#276]
|
||||
* Re-export `tokio::net::TcpSocket` in `net` module [#282]
|
||||
|
||||
[#276]: https://github.com/actix/actix-net/pull/276
|
||||
[#282]: https://github.com/actix/actix-net/pull/282
|
||||
|
||||
|
||||
## 2.0.2 - 2021-02-06
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "actix-rt"
|
||||
version = "2.0.2"
|
||||
version = "2.1.0"
|
||||
authors = [
|
||||
"Nikolay Kim <fafhrd91@gmail.com>",
|
||||
"Rob Ede <robjtede@icloud.com>",
|
||||
|
@ -8,7 +8,7 @@ authors = [
|
|||
description = "Tokio-based single-threaded async runtime for the Actix ecosystem"
|
||||
keywords = ["async", "futures", "io", "runtime"]
|
||||
homepage = "https://actix.rs"
|
||||
repository = "https://github.com/actix/actix-net.git"
|
||||
repository = "https://github.com/actix/actix-net"
|
||||
documentation = "https://docs.rs/actix-rt"
|
||||
categories = ["network-programming", "asynchronous"]
|
||||
license = "MIT OR Apache-2.0"
|
||||
|
@ -26,7 +26,7 @@ macros = ["actix-macros"]
|
|||
actix-macros = { version = "0.2.0", optional = true }
|
||||
|
||||
futures-core = { version = "0.3", default-features = false }
|
||||
tokio = { version = "1.2", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] }
|
||||
tokio = { version = "1.3", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { version = "1.2", features = ["full"] }
|
||||
|
|
|
@ -2,4 +2,13 @@
|
|||
|
||||
> Tokio-based single-threaded async runtime for the Actix ecosystem.
|
||||
|
||||
[](https://crates.io/crates/actix-rt)
|
||||
[](https://docs.rs/actix-rt/2.1.0)
|
||||
[](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html)
|
||||

|
||||
<br />
|
||||
[](https://deps.rs/crate/actix-rt/2.1.0)
|
||||

|
||||
[](https://discord.gg/WghFtEH6Hb)
|
||||
|
||||
See crate documentation for more: https://docs.rs/actix-rt.
|
||||
|
|
|
@ -70,13 +70,63 @@ pub mod signal {
|
|||
}
|
||||
|
||||
pub mod net {
|
||||
//! TCP/UDP/Unix bindings (Tokio re-exports).
|
||||
//! TCP/UDP/Unix bindings (mostly Tokio re-exports).
|
||||
|
||||
use std::{
|
||||
future::Future,
|
||||
io,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
pub use tokio::io::Ready;
|
||||
use tokio::io::{AsyncRead, AsyncWrite, Interest};
|
||||
pub use tokio::net::UdpSocket;
|
||||
pub use tokio::net::{TcpListener, TcpStream};
|
||||
pub use tokio::net::{TcpListener, TcpSocket, TcpStream};
|
||||
|
||||
#[cfg(unix)]
|
||||
pub use tokio::net::{UnixDatagram, UnixListener, UnixStream};
|
||||
|
||||
/// Extension trait over async read+write types that can also signal readiness.
|
||||
pub trait ActixStream: AsyncRead + AsyncWrite + Unpin + 'static {
|
||||
/// Poll stream and check read readiness of Self.
|
||||
///
|
||||
/// See [tokio::net::TcpStream::poll_read_ready] for detail on intended use.
|
||||
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>>;
|
||||
|
||||
/// Poll stream and check write readiness of Self.
|
||||
///
|
||||
/// See [tokio::net::TcpStream::poll_write_ready] for detail on intended use.
|
||||
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>>;
|
||||
}
|
||||
|
||||
impl ActixStream for TcpStream {
|
||||
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
|
||||
let ready = self.ready(Interest::READABLE);
|
||||
tokio::pin!(ready);
|
||||
ready.poll(cx)
|
||||
}
|
||||
|
||||
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
|
||||
let ready = self.ready(Interest::WRITABLE);
|
||||
tokio::pin!(ready);
|
||||
ready.poll(cx)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
impl ActixStream for UnixStream {
|
||||
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
|
||||
let ready = self.ready(Interest::READABLE);
|
||||
tokio::pin!(ready);
|
||||
ready.poll(cx)
|
||||
}
|
||||
|
||||
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
|
||||
let ready = self.ready(Interest::WRITABLE);
|
||||
tokio::pin!(ready);
|
||||
ready.poll(cx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub mod time {
|
||||
|
|
|
@ -12,7 +12,6 @@ repository = "https://github.com/actix/actix-net.git"
|
|||
documentation = "https://docs.rs/actix-server"
|
||||
categories = ["network-programming", "asynchronous"]
|
||||
license = "MIT OR Apache-2.0"
|
||||
exclude = [".gitignore", ".cargo/config"]
|
||||
edition = "2018"
|
||||
|
||||
[lib]
|
||||
|
@ -25,7 +24,7 @@ default = []
|
|||
[dependencies]
|
||||
actix-codec = "0.4.0-beta.1"
|
||||
actix-rt = { version = "2.0.0", default-features = false }
|
||||
actix-service = "2.0.0-beta.4"
|
||||
actix-service = "2.0.0-beta.5"
|
||||
actix-utils = "3.0.0-beta.2"
|
||||
|
||||
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
|
||||
|
@ -33,7 +32,7 @@ log = "0.4"
|
|||
mio = { version = "0.7.6", features = ["os-poll", "net"] }
|
||||
num_cpus = "1.13"
|
||||
slab = "0.4"
|
||||
tokio = { version = "1", features = ["sync"] }
|
||||
tokio = { version = "1.2", features = ["sync"] }
|
||||
|
||||
[dev-dependencies]
|
||||
actix-rt = "2.0.0"
|
||||
|
|
|
@ -16,7 +16,7 @@ use std::sync::{
|
|||
use std::{env, io};
|
||||
|
||||
use actix_rt::net::TcpStream;
|
||||
use actix_server::ServerHandle;
|
||||
use actix_server::Server;
|
||||
use actix_service::pipeline_factory;
|
||||
use bytes::BytesMut;
|
||||
use futures_util::future::ok;
|
||||
|
@ -36,7 +36,7 @@ async fn main() -> io::Result<()> {
|
|||
// Bind socket address and start worker(s). By default, the server uses the number of available
|
||||
// logical CPU cores as the worker count. For this reason, the closure passed to bind needs
|
||||
// to return a service *factory*; so it can be created once per worker.
|
||||
ServerHandle::build()
|
||||
Server::build()
|
||||
.bind("echo", addr, move || {
|
||||
let count = Arc::clone(&count);
|
||||
let num2 = Arc::clone(&count);
|
|
@ -5,7 +5,7 @@ use log::{error, info};
|
|||
use mio::{Interest, Poll, Token as MioToken};
|
||||
use slab::Slab;
|
||||
|
||||
use crate::server_handle::ServerHandle;
|
||||
use crate::server_handle::Server;
|
||||
use crate::socket::{MioListener, SocketAddr};
|
||||
use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
|
||||
use crate::worker::{Conn, WorkerHandle};
|
||||
|
@ -29,7 +29,7 @@ pub(crate) struct Accept {
|
|||
poll: Poll,
|
||||
waker_queue: WakerQueue,
|
||||
handles: Vec<WorkerHandle>,
|
||||
srv: ServerHandle,
|
||||
srv: Server,
|
||||
next: usize,
|
||||
backpressure: bool,
|
||||
// poll time duration.
|
||||
|
@ -53,7 +53,7 @@ fn connection_error(e: &io::Error) -> bool {
|
|||
impl Accept {
|
||||
pub(crate) fn start<F>(
|
||||
sockets: Vec<(Token, MioListener)>,
|
||||
server_handle: ServerHandle,
|
||||
server_handle: Server,
|
||||
worker_factory: F,
|
||||
) -> WakerQueue
|
||||
where
|
||||
|
@ -92,7 +92,7 @@ impl Accept {
|
|||
waker_queue: WakerQueue,
|
||||
socks: Vec<(Token, MioListener)>,
|
||||
handles: Vec<WorkerHandle>,
|
||||
srv: ServerHandle,
|
||||
srv: Server,
|
||||
) -> (Accept, Slab<ServerSocketInfo>) {
|
||||
let mut sockets = Slab::new();
|
||||
for (hnd_token, mut lst) in socks.into_iter() {
|
||||
|
|
|
@ -14,7 +14,7 @@ use tokio::sync::oneshot;
|
|||
|
||||
use crate::accept::Accept;
|
||||
use crate::config::{ConfiguredService, ServiceConfig};
|
||||
use crate::server_handle::{ServerCommand, ServerHandle};
|
||||
use crate::server_handle::{Server, ServerCommand};
|
||||
use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
|
||||
use crate::signals::{Signal, Signals};
|
||||
use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
|
||||
|
@ -285,7 +285,7 @@ impl ServerBuilder {
|
|||
// start accept thread. return waker_queue for wake up it.
|
||||
let waker_queue = Accept::start(
|
||||
sockets,
|
||||
ServerHandle::new(self.cmd_tx.clone()),
|
||||
Server::new(self.cmd_tx.clone()),
|
||||
// closure for construct worker and return it's handler.
|
||||
|waker| {
|
||||
(0..self.threads)
|
||||
|
@ -349,8 +349,8 @@ impl ServerFuture {
|
|||
/// Obtain a Handle for ServerFuture that can be used to change state of actix server.
|
||||
///
|
||||
/// See [ServerHandle](crate::server::ServerHandle) for usage.
|
||||
pub fn handle(&self) -> ServerHandle {
|
||||
ServerHandle::new(self.cmd_tx.clone())
|
||||
pub fn handle(&self) -> Server {
|
||||
Server::new(self.cmd_tx.clone())
|
||||
}
|
||||
|
||||
fn handle_cmd(&mut self, item: ServerCommand) -> Option<BoxFuture<'static, ()>> {
|
||||
|
|
|
@ -15,9 +15,9 @@ mod test_server;
|
|||
mod waker_queue;
|
||||
mod worker;
|
||||
|
||||
pub use self::builder::ServerBuilder;
|
||||
pub use self::builder::{ServerBuilder, ServerFuture};
|
||||
pub use self::config::{ServiceConfig, ServiceRuntime};
|
||||
pub use self::server_handle::ServerHandle;
|
||||
pub use self::server_handle::Server;
|
||||
pub use self::service::ServiceFactory;
|
||||
pub use self::test_server::TestServer;
|
||||
|
||||
|
|
|
@ -25,14 +25,14 @@ pub(crate) enum ServerCommand {
|
|||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ServerHandle(
|
||||
pub struct Server(
|
||||
UnboundedSender<ServerCommand>,
|
||||
Option<oneshot::Receiver<()>>,
|
||||
);
|
||||
|
||||
impl ServerHandle {
|
||||
impl Server {
|
||||
pub(crate) fn new(tx: UnboundedSender<ServerCommand>) -> Self {
|
||||
ServerHandle(tx, None)
|
||||
Server(tx, None)
|
||||
}
|
||||
|
||||
/// Start server building process
|
||||
|
@ -80,13 +80,13 @@ impl ServerHandle {
|
|||
}
|
||||
}
|
||||
|
||||
impl Clone for ServerHandle {
|
||||
impl Clone for Server {
|
||||
fn clone(&self) -> Self {
|
||||
Self(self.0.clone(), None)
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for ServerHandle {
|
||||
impl Future for Server {
|
||||
type Output = io::Result<()>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
|
|
|
@ -3,7 +3,7 @@ use std::{net, thread};
|
|||
|
||||
use actix_rt::{net::TcpStream, System};
|
||||
|
||||
use crate::{ServerBuilder, ServerHandle, ServiceFactory};
|
||||
use crate::{Server, ServerBuilder, ServiceFactory};
|
||||
|
||||
/// The `TestServer` type.
|
||||
///
|
||||
|
@ -51,7 +51,7 @@ impl TestServer {
|
|||
let sys = System::new();
|
||||
sys.block_on(async {
|
||||
actix_rt::spawn(async move {
|
||||
let _ = factory(ServerHandle::build())
|
||||
let _ = factory(Server::build())
|
||||
.workers(1)
|
||||
.disable_signals()
|
||||
.run()
|
||||
|
@ -84,7 +84,7 @@ impl TestServer {
|
|||
|
||||
sys.block_on(async {
|
||||
actix_rt::spawn(async move {
|
||||
let _ = ServerHandle::build()
|
||||
let _ = Server::build()
|
||||
.listen("test", tcp, factory)
|
||||
.unwrap()
|
||||
.workers(1)
|
||||
|
|
|
@ -8,7 +8,7 @@ use mio::{Registry, Token as MioToken, Waker};
|
|||
|
||||
use crate::worker::WorkerHandle;
|
||||
|
||||
/// waker token for `mio::Poll` instance
|
||||
/// Waker token for `mio::Poll` instance.
|
||||
pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX);
|
||||
|
||||
/// `mio::Waker` with a queue for waking up the `Accept`'s `Poll` and contains the `WakerInterest`
|
||||
|
@ -30,7 +30,7 @@ impl Deref for WakerQueue {
|
|||
}
|
||||
|
||||
impl WakerQueue {
|
||||
/// construct a waker queue with given `Poll`'s `Registry` and capacity.
|
||||
/// Construct a waker queue with given `Poll`'s `Registry` and capacity.
|
||||
///
|
||||
/// A fixed `WAKER_TOKEN` is used to identify the wake interest and the `Poll` needs to match
|
||||
/// event's token for it to properly handle `WakerInterest`.
|
||||
|
@ -41,7 +41,7 @@ impl WakerQueue {
|
|||
Ok(Self(Arc::new((waker, queue))))
|
||||
}
|
||||
|
||||
/// push a new interest to the queue and wake up the accept poll afterwards.
|
||||
/// Push a new interest to the queue and wake up the accept poll afterwards.
|
||||
pub(crate) fn wake(&self, interest: WakerInterest) {
|
||||
let (waker, queue) = self.deref();
|
||||
|
||||
|
@ -55,20 +55,20 @@ impl WakerQueue {
|
|||
.unwrap_or_else(|e| panic!("can not wake up Accept Poll: {}", e));
|
||||
}
|
||||
|
||||
/// get a MutexGuard of the waker queue.
|
||||
/// Get a MutexGuard of the waker queue.
|
||||
pub(crate) fn guard(&self) -> MutexGuard<'_, VecDeque<WakerInterest>> {
|
||||
self.deref().1.lock().expect("Failed to lock WakerQueue")
|
||||
}
|
||||
|
||||
/// reset the waker queue so it does not grow infinitely.
|
||||
/// Reset the waker queue so it does not grow infinitely.
|
||||
pub(crate) fn reset(queue: &mut VecDeque<WakerInterest>) {
|
||||
std::mem::swap(&mut VecDeque::<WakerInterest>::with_capacity(16), queue);
|
||||
}
|
||||
}
|
||||
|
||||
/// types of interests we would look into when `Accept`'s `Poll` is waked up by waker.
|
||||
/// Types of interests we would look into when `Accept`'s `Poll` is waked up by waker.
|
||||
///
|
||||
/// *. These interests should not be confused with `mio::Interest` and mostly not I/O related
|
||||
/// These interests should not be confused with `mio::Interest` and mostly not I/O related
|
||||
pub(crate) enum WakerInterest {
|
||||
/// `WorkerAvailable` is an interest from `Worker` notifying `Accept` there is a worker
|
||||
/// available and can accept new tasks.
|
||||
|
|
|
@ -199,62 +199,59 @@ impl ServerWorker {
|
|||
|
||||
availability.set(false);
|
||||
|
||||
let handle = tokio::runtime::Handle::current();
|
||||
|
||||
// every worker runs in it's own arbiter.
|
||||
// use a custom tokio runtime builder to change the settings of runtime.
|
||||
std::thread::spawn(move || {
|
||||
let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker {
|
||||
rx,
|
||||
rx2,
|
||||
availability,
|
||||
factories,
|
||||
config,
|
||||
services: Vec::new(),
|
||||
conns: conns.clone(),
|
||||
state: WorkerState::Unavailable,
|
||||
});
|
||||
let fut = wrk
|
||||
.factories
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(idx, factory)| {
|
||||
let fut = factory.create();
|
||||
async move {
|
||||
fut.await.map(|r| {
|
||||
r.into_iter().map(|(t, s)| (idx, t, s)).collect::<Vec<_>>()
|
||||
})
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
handle.block_on(tokio::task::LocalSet::new().run_until(async move {
|
||||
let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker {
|
||||
rx,
|
||||
rx2,
|
||||
availability,
|
||||
factories,
|
||||
config,
|
||||
services: Vec::new(),
|
||||
conns: conns.clone(),
|
||||
state: WorkerState::Unavailable,
|
||||
});
|
||||
let fut = wrk
|
||||
.factories
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(idx, factory)| {
|
||||
let fut = factory.create();
|
||||
async move {
|
||||
fut.await.map(|r| {
|
||||
r.into_iter().map(|(t, s)| (idx, t, s)).collect::<Vec<_>>()
|
||||
})
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.max_blocking_threads(config.max_blocking_threads)
|
||||
.build()
|
||||
.unwrap()
|
||||
.block_on(tokio::task::LocalSet::new().run_until(async move {
|
||||
let res = join_all(fut)
|
||||
.await
|
||||
.into_iter()
|
||||
.collect::<Result<Vec<_>, _>>();
|
||||
match res {
|
||||
Ok(services) => {
|
||||
for item in services {
|
||||
for (factory, token, service) in item {
|
||||
assert_eq!(token.0, wrk.services.len());
|
||||
wrk.services.push(WorkerService {
|
||||
factory,
|
||||
service,
|
||||
status: WorkerServiceStatus::Unavailable,
|
||||
});
|
||||
}
|
||||
let res = join_all(fut)
|
||||
.await
|
||||
.into_iter()
|
||||
.collect::<Result<Vec<_>, _>>();
|
||||
match res {
|
||||
Ok(services) => {
|
||||
for item in services {
|
||||
for (factory, token, service) in item {
|
||||
assert_eq!(token.0, wrk.services.len());
|
||||
wrk.services.push(WorkerService {
|
||||
factory,
|
||||
service,
|
||||
status: WorkerServiceStatus::Unavailable,
|
||||
});
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Can not start worker: {:?}", e);
|
||||
}
|
||||
}
|
||||
wrk.await
|
||||
}))
|
||||
Err(e) => {
|
||||
error!("Can not start worker: {:?}", e);
|
||||
}
|
||||
}
|
||||
wrk.await
|
||||
}))
|
||||
});
|
||||
|
||||
WorkerHandle::new(idx, tx1, tx2, avail)
|
||||
|
|
|
@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
|
|||
use std::sync::{mpsc, Arc};
|
||||
use std::{net, thread, time};
|
||||
|
||||
use actix_server::ServerHandle;
|
||||
use actix_server::Server;
|
||||
use actix_service::fn_service;
|
||||
use futures_util::future::{lazy, ok};
|
||||
|
||||
|
@ -24,7 +24,7 @@ fn test_bind() {
|
|||
let sys = actix_rt::System::new();
|
||||
sys.block_on(async {
|
||||
actix_rt::spawn(async move {
|
||||
let _ = ServerHandle::build()
|
||||
let _ = Server::build()
|
||||
.workers(1)
|
||||
.disable_signals()
|
||||
.bind("test", addr, move || fn_service(|_| ok::<_, ()>(())))
|
||||
|
@ -54,7 +54,7 @@ fn test_listen() {
|
|||
let lst = net::TcpListener::bind(addr).unwrap();
|
||||
sys.block_on(async {
|
||||
actix_rt::spawn(async move {
|
||||
let _ = ServerHandle::build()
|
||||
let _ = Server::build()
|
||||
.disable_signals()
|
||||
.workers(1)
|
||||
.listen("test", lst, move || fn_service(|_| ok::<_, ()>(())))
|
||||
|
@ -88,7 +88,7 @@ fn test_start() {
|
|||
|
||||
let h = thread::spawn(move || {
|
||||
actix_rt::System::new().block_on(async {
|
||||
let server = ServerHandle::build()
|
||||
let server = Server::build()
|
||||
.backlog(100)
|
||||
.disable_signals()
|
||||
.bind("test", addr, move || {
|
||||
|
@ -156,7 +156,7 @@ fn test_configure() {
|
|||
let h = thread::spawn(move || {
|
||||
let num = num2.clone();
|
||||
actix_rt::System::new().block_on(async {
|
||||
let server = ServerHandle::build()
|
||||
let server = Server::build()
|
||||
.disable_signals()
|
||||
.configure(move |cfg| {
|
||||
let num = num.clone();
|
||||
|
|
|
@ -3,6 +3,14 @@
|
|||
## Unreleased - 2021-xx-xx
|
||||
|
||||
|
||||
## 2.0.0-beta.5 - 2021-03-15
|
||||
* Add default `Service` trait impl for `Rc<S: Service>` and `&S: Service`. [#288]
|
||||
* Add `boxed::rc_service` function for constructing `boxed::RcService` type [#290]
|
||||
|
||||
[#288]: https://github.com/actix/actix-net/pull/288
|
||||
[#290]: https://github.com/actix/actix-net/pull/290
|
||||
|
||||
|
||||
## 2.0.0-beta.4 - 2021-02-04
|
||||
* `Service::poll_ready` and `Service::call` receive `&self`. [#247]
|
||||
* `apply_fn` and `apply_fn_factory` now receive `Fn(Req, &Service)` function type. [#247]
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "actix-service"
|
||||
version = "2.0.0-beta.4"
|
||||
version = "2.0.0-beta.5"
|
||||
authors = [
|
||||
"Nikolay Kim <fafhrd91@gmail.com>",
|
||||
"Rob Ede <robjtede@icloud.com>",
|
||||
|
|
|
@ -3,11 +3,11 @@
|
|||
> Service trait and combinators for representing asynchronous request/response operations.
|
||||
|
||||
[](https://crates.io/crates/actix-service)
|
||||
[](https://docs.rs/actix-service/2.0.0-beta.4)
|
||||
[](https://docs.rs/actix-service/2.0.0-beta.5)
|
||||
[](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html)
|
||||

|
||||
[](https://deps.rs/crate/actix-service/2.0.0-beta.4)
|
||||
[](https://crates.io/crates/actix-service)
|
||||
[](https://deps.rs/crate/actix-service/2.0.0-beta.5)
|
||||

|
||||
[](https://discord.gg/NWpN5mmg3x)
|
||||
|
||||
See documentation for detailed explanations of these components: https://docs.rs/actix-service.
|
||||
|
|
|
@ -1,21 +1,65 @@
|
|||
use alloc::boxed::Box;
|
||||
use core::{
|
||||
future::Future,
|
||||
marker::PhantomData,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
//! Trait object forms of services and service factories.
|
||||
|
||||
use alloc::{boxed::Box, rc::Rc};
|
||||
use core::{future::Future, pin::Pin};
|
||||
|
||||
use crate::{Service, ServiceFactory};
|
||||
|
||||
/// A boxed future without a Send bound or lifetime parameters.
|
||||
pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T>>>;
|
||||
|
||||
pub type BoxService<Req, Res, Err> =
|
||||
Box<dyn Service<Req, Response = Res, Error = Err, Future = BoxFuture<Result<Res, Err>>>>;
|
||||
macro_rules! service_object {
|
||||
($name: ident, $type: tt, $fn_name: ident) => {
|
||||
/// Type alias for service trait object.
|
||||
pub type $name<Req, Res, Err> = $type<
|
||||
dyn Service<Req, Response = Res, Error = Err, Future = BoxFuture<Result<Res, Err>>>,
|
||||
>;
|
||||
|
||||
/// Create service trait object.
|
||||
pub fn $fn_name<S, Req>(service: S) -> $name<Req, S::Response, S::Error>
|
||||
where
|
||||
S: Service<Req> + 'static,
|
||||
Req: 'static,
|
||||
S::Future: 'static,
|
||||
{
|
||||
$type::new(ServiceWrapper::new(service))
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
service_object!(BoxService, Box, service);
|
||||
service_object!(RcService, Rc, rc_service);
|
||||
|
||||
struct ServiceWrapper<S> {
|
||||
inner: S,
|
||||
}
|
||||
|
||||
impl<S> ServiceWrapper<S> {
|
||||
fn new(inner: S) -> Self {
|
||||
Self { inner }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, Req, Res, Err> Service<Req> for ServiceWrapper<S>
|
||||
where
|
||||
S: Service<Req, Response = Res, Error = Err>,
|
||||
S::Future: 'static,
|
||||
{
|
||||
type Response = Res;
|
||||
type Error = Err;
|
||||
type Future = BoxFuture<Result<Res, Err>>;
|
||||
|
||||
crate::forward_ready!(inner);
|
||||
|
||||
fn call(&self, req: Req) -> Self::Future {
|
||||
Box::pin(self.inner.call(req))
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrapper for a service factory trait object that will produce a boxed trait object service.
|
||||
pub struct BoxServiceFactory<Cfg, Req, Res, Err, InitErr>(Inner<Cfg, Req, Res, Err, InitErr>);
|
||||
|
||||
/// Create boxed service factory
|
||||
/// Create service factory trait object.
|
||||
pub fn factory<SF, Req>(
|
||||
factory: SF,
|
||||
) -> BoxServiceFactory<SF::Config, Req, SF::Response, SF::Error, SF::InitError>
|
||||
|
@ -28,20 +72,7 @@ where
|
|||
SF::Error: 'static,
|
||||
SF::InitError: 'static,
|
||||
{
|
||||
BoxServiceFactory(Box::new(FactoryWrapper {
|
||||
factory,
|
||||
_t: PhantomData,
|
||||
}))
|
||||
}
|
||||
|
||||
/// Create boxed service
|
||||
pub fn service<S, Req>(service: S) -> BoxService<Req, S::Response, S::Error>
|
||||
where
|
||||
S: Service<Req> + 'static,
|
||||
Req: 'static,
|
||||
S::Future: 'static,
|
||||
{
|
||||
Box::new(ServiceWrapper(service, PhantomData))
|
||||
BoxServiceFactory(Box::new(FactoryWrapper(factory)))
|
||||
}
|
||||
|
||||
type Inner<C, Req, Res, Err, InitErr> = Box<
|
||||
|
@ -66,9 +97,9 @@ where
|
|||
{
|
||||
type Response = Res;
|
||||
type Error = Err;
|
||||
type InitError = InitErr;
|
||||
type Config = C;
|
||||
type Service = BoxService<Req, Res, Err>;
|
||||
type InitError = InitErr;
|
||||
|
||||
type Future = BoxFuture<Result<Self::Service, InitErr>>;
|
||||
|
||||
|
@ -77,12 +108,9 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
struct FactoryWrapper<SF, Req, Cfg> {
|
||||
factory: SF,
|
||||
_t: PhantomData<(Req, Cfg)>,
|
||||
}
|
||||
struct FactoryWrapper<SF>(SF);
|
||||
|
||||
impl<SF, Req, Cfg, Res, Err, InitErr> ServiceFactory<Req> for FactoryWrapper<SF, Req, Cfg>
|
||||
impl<SF, Req, Cfg, Res, Err, InitErr> ServiceFactory<Req> for FactoryWrapper<SF>
|
||||
where
|
||||
Req: 'static,
|
||||
Res: 'static,
|
||||
|
@ -95,47 +123,13 @@ where
|
|||
{
|
||||
type Response = Res;
|
||||
type Error = Err;
|
||||
type InitError = InitErr;
|
||||
type Config = Cfg;
|
||||
type Service = BoxService<Req, Res, Err>;
|
||||
type InitError = InitErr;
|
||||
type Future = BoxFuture<Result<Self::Service, Self::InitError>>;
|
||||
|
||||
fn new_service(&self, cfg: Cfg) -> Self::Future {
|
||||
let fut = self.factory.new_service(cfg);
|
||||
Box::pin(async {
|
||||
let res = fut.await;
|
||||
res.map(ServiceWrapper::boxed)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
struct ServiceWrapper<S: Service<Req>, Req>(S, PhantomData<Req>);
|
||||
|
||||
impl<S, Req> ServiceWrapper<S, Req>
|
||||
where
|
||||
S: Service<Req> + 'static,
|
||||
Req: 'static,
|
||||
S::Future: 'static,
|
||||
{
|
||||
fn boxed(service: S) -> BoxService<Req, S::Response, S::Error> {
|
||||
Box::new(ServiceWrapper(service, PhantomData))
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, Req, Res, Err> Service<Req> for ServiceWrapper<S, Req>
|
||||
where
|
||||
S: Service<Req, Response = Res, Error = Err>,
|
||||
S::Future: 'static,
|
||||
{
|
||||
type Response = Res;
|
||||
type Error = Err;
|
||||
type Future = BoxFuture<Result<Res, Err>>;
|
||||
|
||||
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.0.poll_ready(ctx)
|
||||
}
|
||||
|
||||
fn call(&self, req: Req) -> Self::Future {
|
||||
Box::pin(self.0.call(req))
|
||||
let f = self.0.new_service(cfg);
|
||||
Box::pin(async { f.await.map(|s| Box::new(ServiceWrapper::new(s)) as _) })
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,7 @@
|
|||
use crate::{dev, Service, ServiceFactory};
|
||||
use crate::{
|
||||
map::Map, map_err::MapErr, transform_err::TransformMapInitErr, Service, ServiceFactory,
|
||||
Transform,
|
||||
};
|
||||
|
||||
pub trait ServiceExt<Req>: Service<Req> {
|
||||
/// Map this service's output to a different type, returning a new service
|
||||
|
@ -10,12 +13,12 @@ pub trait ServiceExt<Req>: Service<Req> {
|
|||
/// Note that this function consumes the receiving service and returns a
|
||||
/// wrapped version of it, similar to the existing `map` methods in the
|
||||
/// standard library.
|
||||
fn map<F, R>(self, f: F) -> dev::Map<Self, F, Req, R>
|
||||
fn map<F, R>(self, f: F) -> Map<Self, F, Req, R>
|
||||
where
|
||||
Self: Sized,
|
||||
F: FnMut(Self::Response) -> R,
|
||||
{
|
||||
dev::Map::new(self, f)
|
||||
Map::new(self, f)
|
||||
}
|
||||
|
||||
/// Map this service's error to a different error, returning a new service.
|
||||
|
@ -26,12 +29,12 @@ pub trait ServiceExt<Req>: Service<Req> {
|
|||
///
|
||||
/// Note that this function consumes the receiving service and returns a
|
||||
/// wrapped version of it.
|
||||
fn map_err<F, E>(self, f: F) -> dev::MapErr<Self, Req, F, E>
|
||||
fn map_err<F, E>(self, f: F) -> MapErr<Self, Req, F, E>
|
||||
where
|
||||
Self: Sized,
|
||||
F: Fn(Self::Error) -> E,
|
||||
{
|
||||
dev::MapErr::new(self, f)
|
||||
MapErr::new(self, f)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -67,4 +70,17 @@ pub trait ServiceFactoryExt<Req>: ServiceFactory<Req> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<S, Req> ServiceFactoryExt<Req> for S where S: ServiceFactory<Req> {}
|
||||
impl<SF, Req> ServiceFactoryExt<Req> for SF where SF: ServiceFactory<Req> {}
|
||||
|
||||
pub trait TransformExt<S, Req>: Transform<S, Req> {
|
||||
/// Return a new `Transform` whose init error is mapped to to a different type.
|
||||
fn map_init_err<F, E>(self, f: F) -> TransformMapInitErr<Self, S, Req, F, E>
|
||||
where
|
||||
Self: Sized,
|
||||
F: Fn(Self::InitError) -> E + Clone,
|
||||
{
|
||||
TransformMapInitErr::new(self, f)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, Req> TransformExt<T, Req> for T where T: Transform<T, Req> {}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use core::{future::Future, marker::PhantomData, task::Poll};
|
||||
use core::{future::Future, marker::PhantomData};
|
||||
|
||||
use crate::{ok, IntoService, IntoServiceFactory, Ready, Service, ServiceFactory};
|
||||
|
||||
|
@ -15,8 +15,7 @@ where
|
|||
|
||||
/// Create `ServiceFactory` for function that can produce services
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// # Examples
|
||||
/// ```
|
||||
/// use std::io;
|
||||
/// use actix_service::{fn_factory, fn_service, Service, ServiceFactory};
|
||||
|
@ -62,11 +61,10 @@ where
|
|||
|
||||
/// Create `ServiceFactory` for function that accepts config argument and can produce services
|
||||
///
|
||||
/// Any function that has following form `Fn(Config) -> Future<Output = Service>` could
|
||||
/// act as a `ServiceFactory`.
|
||||
///
|
||||
/// # Example
|
||||
/// Any function that has following form `Fn(Config) -> Future<Output = Service>` could act as
|
||||
/// a `ServiceFactory`.
|
||||
///
|
||||
/// # Examples
|
||||
/// ```
|
||||
/// use std::io;
|
||||
/// use actix_service::{fn_factory_with_config, fn_service, Service, ServiceFactory};
|
||||
|
|
|
@ -21,6 +21,7 @@ mod apply_cfg;
|
|||
pub mod boxed;
|
||||
mod ext;
|
||||
mod fn_service;
|
||||
mod macros;
|
||||
mod map;
|
||||
mod map_config;
|
||||
mod map_err;
|
||||
|
@ -33,11 +34,11 @@ mod transform_err;
|
|||
|
||||
pub use self::apply::{apply_fn, apply_fn_factory};
|
||||
pub use self::apply_cfg::{apply_cfg, apply_cfg_factory};
|
||||
pub use self::ext::{ServiceExt, ServiceFactoryExt};
|
||||
pub use self::ext::{ServiceExt, ServiceFactoryExt, TransformExt};
|
||||
pub use self::fn_service::{fn_factory, fn_factory_with_config, fn_service};
|
||||
pub use self::map_config::{map_config, unit_config};
|
||||
pub use self::pipeline::{pipeline, pipeline_factory, Pipeline, PipelineFactory};
|
||||
pub use self::transform::{apply, Transform};
|
||||
pub use self::transform::{apply, ApplyTransform, Transform};
|
||||
|
||||
#[allow(unused_imports)]
|
||||
use self::ready::{err, ok, ready, Ready};
|
||||
|
@ -102,8 +103,8 @@ pub trait Service<Req> {
|
|||
/// call and the next invocation of `call` results in an error.
|
||||
///
|
||||
/// # Notes
|
||||
/// 1. `.poll_ready()` might be called on different task from actual service call.
|
||||
/// 1. In case of chained services, `.poll_ready()` get called for all services at once.
|
||||
/// 1. `poll_ready` might be called on a different task to `call`.
|
||||
/// 1. In cases of chained services, `.poll_ready()` is called for all services at once.
|
||||
fn poll_ready(&self, ctx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>>;
|
||||
|
||||
/// Process the request and return the response asynchronously.
|
||||
|
@ -150,6 +151,7 @@ pub trait ServiceFactory<Req> {
|
|||
fn new_service(&self, cfg: Self::Config) -> Self::Future;
|
||||
}
|
||||
|
||||
// TODO: remove implement on mut reference.
|
||||
impl<'a, S, Req> Service<Req> for &'a mut S
|
||||
where
|
||||
S: Service<Req> + 'a,
|
||||
|
@ -167,6 +169,23 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<'a, S, Req> Service<Req> for &'a S
|
||||
where
|
||||
S: Service<Req> + 'a,
|
||||
{
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type Future = S::Future;
|
||||
|
||||
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
(**self).poll_ready(ctx)
|
||||
}
|
||||
|
||||
fn call(&self, request: Req) -> S::Future {
|
||||
(**self).call(request)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, Req> Service<Req> for Box<S>
|
||||
where
|
||||
S: Service<Req> + ?Sized,
|
||||
|
@ -184,24 +203,25 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
impl<S, Req> Service<Req> for RefCell<S>
|
||||
impl<S, Req> Service<Req> for Rc<S>
|
||||
where
|
||||
S: Service<Req>,
|
||||
S: Service<Req> + ?Sized,
|
||||
{
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type Future = S::Future;
|
||||
|
||||
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.borrow().poll_ready(ctx)
|
||||
(**self).poll_ready(ctx)
|
||||
}
|
||||
|
||||
fn call(&self, request: Req) -> S::Future {
|
||||
self.borrow().call(request)
|
||||
(**self).call(request)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, Req> Service<Req> for Rc<RefCell<S>>
|
||||
/// This impl is deprecated since v2 because the `Service` trait now receives shared reference.
|
||||
impl<S, Req> Service<Req> for RefCell<S>
|
||||
where
|
||||
S: Service<Req>,
|
||||
{
|
||||
|
@ -294,44 +314,3 @@ where
|
|||
{
|
||||
tp.into_service()
|
||||
}
|
||||
|
||||
pub mod dev {
|
||||
pub use crate::apply::{Apply, ApplyFactory};
|
||||
pub use crate::fn_service::{
|
||||
FnService, FnServiceConfig, FnServiceFactory, FnServiceNoConfig,
|
||||
};
|
||||
pub use crate::map::{Map, MapServiceFactory};
|
||||
pub use crate::map_config::{MapConfig, UnitConfig};
|
||||
pub use crate::map_err::{MapErr, MapErrServiceFactory};
|
||||
pub use crate::map_init_err::MapInitErr;
|
||||
pub use crate::transform::ApplyTransform;
|
||||
pub use crate::transform_err::TransformMapInitErr;
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! always_ready {
|
||||
() => {
|
||||
#[inline]
|
||||
fn poll_ready(
|
||||
&self,
|
||||
_: &mut ::core::task::Context<'_>,
|
||||
) -> ::core::task::Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! forward_ready {
|
||||
($field:ident) => {
|
||||
#[inline]
|
||||
fn poll_ready(
|
||||
&self,
|
||||
cx: &mut ::core::task::Context<'_>,
|
||||
) -> ::core::task::Poll<Result<(), Self::Error>> {
|
||||
self.$field
|
||||
.poll_ready(cx)
|
||||
.map_err(::core::convert::Into::into)
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -0,0 +1,181 @@
|
|||
/// A boilerplate implementation of [`Service::poll_ready`] that always signals readiness.
|
||||
///
|
||||
/// [`Service::poll_ready`]: crate::Service::poll_ready
|
||||
///
|
||||
/// # Examples
|
||||
/// ```no_run
|
||||
/// use actix_service::Service;
|
||||
/// use futures_util::future::{ready, Ready};
|
||||
///
|
||||
/// struct IdentityService;
|
||||
///
|
||||
/// impl Service<u32> for IdentityService {
|
||||
/// type Response = u32;
|
||||
/// type Error = ();
|
||||
/// type Future = Ready<Result<Self::Response, Self::Error>>;
|
||||
///
|
||||
/// actix_service::always_ready!();
|
||||
///
|
||||
/// fn call(&self, req: u32) -> Self::Future {
|
||||
/// ready(Ok(req))
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
#[macro_export]
|
||||
macro_rules! always_ready {
|
||||
() => {
|
||||
#[inline]
|
||||
fn poll_ready(
|
||||
&self,
|
||||
_: &mut ::core::task::Context<'_>,
|
||||
) -> ::core::task::Poll<Result<(), Self::Error>> {
|
||||
::core::task::Poll::Ready(Ok(()))
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/// A boilerplate implementation of [`Service::poll_ready`] that forwards readiness checks to a
|
||||
/// named struct field.
|
||||
///
|
||||
/// Tuple structs are not supported.
|
||||
///
|
||||
/// [`Service::poll_ready`]: crate::Service::poll_ready
|
||||
///
|
||||
/// # Examples
|
||||
/// ```no_run
|
||||
/// use actix_service::Service;
|
||||
/// use futures_util::future::{ready, Ready};
|
||||
///
|
||||
/// struct WrapperService<S> {
|
||||
/// inner: S,
|
||||
/// }
|
||||
///
|
||||
/// impl<S> Service<()> for WrapperService<S>
|
||||
/// where
|
||||
/// S: Service<()>,
|
||||
/// {
|
||||
/// type Response = S::Response;
|
||||
/// type Error = S::Error;
|
||||
/// type Future = S::Future;
|
||||
///
|
||||
/// actix_service::forward_ready!(inner);
|
||||
///
|
||||
/// fn call(&self, req: ()) -> Self::Future {
|
||||
/// self.inner.call(req)
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
#[macro_export]
|
||||
macro_rules! forward_ready {
|
||||
($field:ident) => {
|
||||
#[inline]
|
||||
fn poll_ready(
|
||||
&self,
|
||||
cx: &mut ::core::task::Context<'_>,
|
||||
) -> ::core::task::Poll<Result<(), Self::Error>> {
|
||||
self.$field
|
||||
.poll_ready(cx)
|
||||
.map_err(::core::convert::Into::into)
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use core::{
|
||||
cell::Cell,
|
||||
convert::Infallible,
|
||||
task::{self, Context, Poll},
|
||||
};
|
||||
|
||||
use futures_util::{
|
||||
future::{ready, Ready},
|
||||
task::noop_waker,
|
||||
};
|
||||
|
||||
use crate::Service;
|
||||
|
||||
struct IdentityService;
|
||||
|
||||
impl Service<u32> for IdentityService {
|
||||
type Response = u32;
|
||||
type Error = Infallible;
|
||||
type Future = Ready<Result<Self::Response, Self::Error>>;
|
||||
|
||||
always_ready!();
|
||||
|
||||
fn call(&self, req: u32) -> Self::Future {
|
||||
ready(Ok(req))
|
||||
}
|
||||
}
|
||||
|
||||
struct CountdownService(Cell<u32>);
|
||||
|
||||
impl Service<()> for CountdownService {
|
||||
type Response = ();
|
||||
type Error = Infallible;
|
||||
type Future = Ready<Result<Self::Response, Self::Error>>;
|
||||
|
||||
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
let count = self.0.get();
|
||||
|
||||
if count == 0 {
|
||||
Poll::Ready(Ok(()))
|
||||
} else {
|
||||
self.0.set(count - 1);
|
||||
cx.waker().wake_by_ref();
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
fn call(&self, _: ()) -> Self::Future {
|
||||
ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
struct WrapperService<S> {
|
||||
inner: S,
|
||||
}
|
||||
|
||||
impl<S> Service<()> for WrapperService<S>
|
||||
where
|
||||
S: Service<()>,
|
||||
{
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type Future = S::Future;
|
||||
|
||||
forward_ready!(inner);
|
||||
|
||||
fn call(&self, req: ()) -> Self::Future {
|
||||
self.inner.call(req)
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_always_ready_macro() {
|
||||
let waker = noop_waker();
|
||||
let mut cx = task::Context::from_waker(&waker);
|
||||
|
||||
let svc = IdentityService;
|
||||
|
||||
assert!(svc.poll_ready(&mut cx).is_ready());
|
||||
assert!(svc.poll_ready(&mut cx).is_ready());
|
||||
assert!(svc.poll_ready(&mut cx).is_ready());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_forward_ready_macro() {
|
||||
let waker = noop_waker();
|
||||
let mut cx = task::Context::from_waker(&waker);
|
||||
|
||||
let svc = WrapperService {
|
||||
inner: CountdownService(Cell::new(3)),
|
||||
};
|
||||
|
||||
assert!(svc.poll_ready(&mut cx).is_pending());
|
||||
assert!(svc.poll_ready(&mut cx).is_pending());
|
||||
assert!(svc.poll_ready(&mut cx).is_pending());
|
||||
assert!(svc.poll_ready(&mut cx).is_ready());
|
||||
}
|
||||
}
|
|
@ -9,10 +9,9 @@ use core::{
|
|||
use futures_core::ready;
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
use crate::transform_err::TransformMapInitErr;
|
||||
use crate::{IntoServiceFactory, Service, ServiceFactory};
|
||||
|
||||
/// Apply transform to a service.
|
||||
/// Apply a [`Transform`] to a [`Service`].
|
||||
pub fn apply<T, S, I, Req>(t: T, factory: I) -> ApplyTransform<T, S, Req>
|
||||
where
|
||||
I: IntoServiceFactory<S, Req>,
|
||||
|
@ -25,9 +24,8 @@ where
|
|||
/// The `Transform` trait defines the interface of a service factory that wraps inner service
|
||||
/// during construction.
|
||||
///
|
||||
/// Transform(middleware) wraps inner service and runs during
|
||||
/// inbound and/or outbound processing in the request/response lifecycle.
|
||||
/// It may modify request and/or response.
|
||||
/// Transform(middleware) wraps inner service and runs during inbound and/or outbound processing in
|
||||
/// the request/response lifecycle. It may modify request and/or response.
|
||||
///
|
||||
/// For example, timeout transform:
|
||||
///
|
||||
|
@ -51,20 +49,19 @@ where
|
|||
/// fn call(&self, req: S::Request) -> Self::Future {
|
||||
/// TimeoutServiceResponse {
|
||||
/// fut: self.service.call(req),
|
||||
/// sleep: Delay::new(clock::now() + self.timeout),
|
||||
/// sleep: Sleep::new(clock::now() + self.timeout),
|
||||
/// }
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
///
|
||||
/// Timeout service in above example is decoupled from underlying service implementation
|
||||
/// and could be applied to any service.
|
||||
/// Timeout service in above example is decoupled from underlying service implementation and could
|
||||
/// be applied to any service.
|
||||
///
|
||||
/// The `Transform` trait defines the interface of a Service factory. `Transform`
|
||||
/// is often implemented for middleware, defining how to construct a
|
||||
/// middleware Service. A Service that is constructed by the factory takes
|
||||
/// the Service that follows it during execution as a parameter, assuming
|
||||
/// ownership of the next Service.
|
||||
/// The `Transform` trait defines the interface of a Service factory. `Transform` is often
|
||||
/// implemented for middleware, defining how to construct a middleware Service. A Service that is
|
||||
/// constructed by the factory takes the Service that follows it during execution as a parameter,
|
||||
/// assuming ownership of the next Service.
|
||||
///
|
||||
/// Factory for `Timeout` middleware from the above example could look like this:
|
||||
///
|
||||
|
@ -85,15 +82,15 @@ where
|
|||
/// type Future = Ready<Result<Self::Transform, Self::InitError>>;
|
||||
///
|
||||
/// fn new_transform(&self, service: S) -> Self::Future {
|
||||
/// ok(TimeoutService {
|
||||
/// ready(Ok(TimeoutService {
|
||||
/// service,
|
||||
/// timeout: self.timeout,
|
||||
/// })
|
||||
/// }))
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
pub trait Transform<S, Req> {
|
||||
/// Responses given by the service.
|
||||
/// Responses produced by the service.
|
||||
type Response;
|
||||
|
||||
/// Errors produced by the service.
|
||||
|
@ -110,16 +107,6 @@ pub trait Transform<S, Req> {
|
|||
|
||||
/// Creates and returns a new Transform component, asynchronously
|
||||
fn new_transform(&self, service: S) -> Self::Future;
|
||||
|
||||
/// Map this transform's factory error to a different error,
|
||||
/// returning a new transform service factory.
|
||||
fn map_init_err<F, E>(self, f: F) -> TransformMapInitErr<Self, S, Req, F, E>
|
||||
where
|
||||
Self: Sized,
|
||||
F: Fn(Self::InitError) -> E + Clone,
|
||||
{
|
||||
TransformMapInitErr::new(self, f)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, S, Req> Transform<S, Req> for Rc<T>
|
||||
|
@ -152,7 +139,7 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
/// `Apply` transform to new service
|
||||
/// Apply a [`Transform`] to a [`Service`].
|
||||
pub struct ApplyTransform<T, S, Req>(Rc<(T, S)>, PhantomData<Req>);
|
||||
|
||||
impl<T, S, Req> ApplyTransform<T, S, Req>
|
||||
|
|
|
@ -9,10 +9,8 @@ use pin_project_lite::pin_project;
|
|||
|
||||
use super::Transform;
|
||||
|
||||
/// Transform for the `map_init_err` combinator, changing the type of a new
|
||||
/// transform's init error.
|
||||
///
|
||||
/// This is created by the `Transform::map_init_err` method.
|
||||
/// Transform for the [`TransformExt::map_init_err`] combinator, changing the type of a new
|
||||
/// [`Transform`]'s initialization error.
|
||||
pub struct TransformMapInitErr<T, S, Req, F, E> {
|
||||
transform: T,
|
||||
mapper: F,
|
||||
|
|
|
@ -3,6 +3,14 @@
|
|||
## Unreleased - 2021-xx-xx
|
||||
|
||||
|
||||
## 3.0.0-beta.4 - 2021-02-24
|
||||
* Rename `accept::openssl::{SslStream => TlsStream}`.
|
||||
* Add `connect::Connect::set_local_addr` to attach local `IpAddr`. [#282]
|
||||
* `connector::TcpConnector` service will try to bind to local_addr of `IpAddr` when given. [#282]
|
||||
|
||||
[#282]: https://github.com/actix/actix-net/pull/282
|
||||
|
||||
|
||||
## 3.0.0-beta.3 - 2021-02-06
|
||||
* Remove `trust-dns-proto` and `trust-dns-resolver`. [#248]
|
||||
* Use `std::net::ToSocketAddrs` as simple and basic default resolver. [#248]
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "actix-tls"
|
||||
version = "3.0.0-beta.3"
|
||||
version = "3.0.0-beta.4"
|
||||
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
|
||||
description = "TLS acceptor and connector services for Actix ecosystem"
|
||||
keywords = ["network", "tls", "ssl", "async", "transport"]
|
||||
|
@ -41,8 +41,8 @@ uri = ["http"]
|
|||
|
||||
[dependencies]
|
||||
actix-codec = "0.4.0-beta.1"
|
||||
actix-rt = { version = "2.0.0", default-features = false }
|
||||
actix-service = "2.0.0-beta.4"
|
||||
actix-rt = { version = "2.1.0", default-features = false }
|
||||
actix-service = "2.0.0-beta.5"
|
||||
actix-utils = "3.0.0-beta.2"
|
||||
|
||||
derive_more = "0.99.5"
|
||||
|
@ -52,7 +52,7 @@ log = "0.4"
|
|||
tokio-util = { version = "0.6.3", default-features = false }
|
||||
|
||||
# openssl
|
||||
tls-openssl = { package = "openssl", version = "0.10", optional = true }
|
||||
tls-openssl = { package = "openssl", version = "0.10.9", optional = true }
|
||||
tokio-openssl = { version = "0.6", optional = true }
|
||||
|
||||
# rustls
|
||||
|
@ -63,7 +63,7 @@ webpki-roots = { version = "0.21", optional = true }
|
|||
tokio-native-tls = { version = "0.3", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
actix-rt = "2.0.0"
|
||||
actix-rt = "2.1.0"
|
||||
actix-server = "2.0.0-beta.3"
|
||||
bytes = "1"
|
||||
env_logger = "0.8"
|
||||
|
@ -72,5 +72,5 @@ log = "0.4"
|
|||
trust-dns-resolver = "0.20.0"
|
||||
|
||||
[[example]]
|
||||
name = "basic"
|
||||
name = "tcp-rustls"
|
||||
required-features = ["accept", "rustls"]
|
||||
|
|
|
@ -29,9 +29,10 @@ use std::{
|
|||
},
|
||||
};
|
||||
|
||||
use actix_server::ServerHandle;
|
||||
use actix_rt::net::TcpStream;
|
||||
use actix_server::Server;
|
||||
use actix_service::pipeline_factory;
|
||||
use actix_tls::accept::rustls::Acceptor as RustlsAcceptor;
|
||||
use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream};
|
||||
use futures_util::future::ok;
|
||||
use log::info;
|
||||
use rustls::{
|
||||
|
@ -67,16 +68,16 @@ async fn main() -> io::Result<()> {
|
|||
let addr = ("127.0.0.1", 8443);
|
||||
info!("starting server on port: {}", &addr.0);
|
||||
|
||||
ServerHandle::build()
|
||||
Server::build()
|
||||
.bind("tls-example", addr, move || {
|
||||
let count = Arc::clone(&count);
|
||||
|
||||
// Set up TLS service factory
|
||||
pipeline_factory(tls_acceptor.clone())
|
||||
.map_err(|err| println!("Rustls error: {:?}", err))
|
||||
.and_then(move |stream| {
|
||||
.and_then(move |stream: TlsStream<TcpStream>| {
|
||||
let num = count.fetch_add(1, Ordering::Relaxed);
|
||||
info!("[{}] Got TLS connection: {:?}", num, stream);
|
||||
info!("[{}] Got TLS connection: {:?}", num, &*stream);
|
||||
ok(())
|
||||
})
|
||||
})?
|
|
@ -1,15 +1,94 @@
|
|||
use std::task::{Context, Poll};
|
||||
use std::{
|
||||
io::{self, IoSlice},
|
||||
ops::{Deref, DerefMut},
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use actix_codec::{AsyncRead, AsyncWrite};
|
||||
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
|
||||
use actix_rt::net::{ActixStream, Ready};
|
||||
use actix_service::{Service, ServiceFactory};
|
||||
use actix_utils::counter::Counter;
|
||||
use futures_core::future::LocalBoxFuture;
|
||||
|
||||
pub use tokio_native_tls::native_tls::Error;
|
||||
pub use tokio_native_tls::{TlsAcceptor, TlsStream};
|
||||
pub use tokio_native_tls::TlsAcceptor;
|
||||
|
||||
use super::MAX_CONN_COUNTER;
|
||||
|
||||
/// Wrapper type for `tokio_native_tls::TlsStream` in order to impl `ActixStream` trait.
|
||||
pub struct TlsStream<T>(tokio_native_tls::TlsStream<T>);
|
||||
|
||||
impl<T> From<tokio_native_tls::TlsStream<T>> for TlsStream<T> {
|
||||
fn from(stream: tokio_native_tls::TlsStream<T>) -> Self {
|
||||
Self(stream)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ActixStream> Deref for TlsStream<T> {
|
||||
type Target = tokio_native_tls::TlsStream<T>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ActixStream> DerefMut for TlsStream<T> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ActixStream> AsyncRead for TlsStream<T> {
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut ReadBuf<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
Pin::new(&mut **self.get_mut()).poll_read(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ActixStream> AsyncWrite for TlsStream<T> {
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
Pin::new(&mut **self.get_mut()).poll_write(cx, buf)
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
Pin::new(&mut **self.get_mut()).poll_flush(cx)
|
||||
}
|
||||
|
||||
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
Pin::new(&mut **self.get_mut()).poll_shutdown(cx)
|
||||
}
|
||||
|
||||
fn poll_write_vectored(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
bufs: &[IoSlice<'_>],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
Pin::new(&mut **self.get_mut()).poll_write_vectored(cx, bufs)
|
||||
}
|
||||
|
||||
fn is_write_vectored(&self) -> bool {
|
||||
(&**self).is_write_vectored()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ActixStream> ActixStream for TlsStream<T> {
|
||||
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
|
||||
T::poll_read_ready((&**self).get_ref().get_ref().get_ref(), cx)
|
||||
}
|
||||
|
||||
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
|
||||
T::poll_write_ready((&**self).get_ref().get_ref().get_ref(), cx)
|
||||
}
|
||||
}
|
||||
|
||||
/// Accept TLS connections via `native-tls` package.
|
||||
///
|
||||
/// `native-tls` feature enables this `Acceptor` type.
|
||||
|
@ -34,10 +113,7 @@ impl Clone for Acceptor {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T> ServiceFactory<T> for Acceptor
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
impl<T: ActixStream> ServiceFactory<T> for Acceptor {
|
||||
type Response = TlsStream<T>;
|
||||
type Error = Error;
|
||||
type Config = ();
|
||||
|
@ -71,10 +147,7 @@ impl Clone for NativeTlsAcceptorService {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T> Service<T> for NativeTlsAcceptorService
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
impl<T: ActixStream> Service<T> for NativeTlsAcceptorService {
|
||||
type Response = TlsStream<T>;
|
||||
type Error = Error;
|
||||
type Future = LocalBoxFuture<'static, Result<TlsStream<T>, Error>>;
|
||||
|
@ -93,7 +166,7 @@ where
|
|||
Box::pin(async move {
|
||||
let io = this.acceptor.accept(io).await;
|
||||
drop(guard);
|
||||
io
|
||||
io.map(Into::into)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,10 +1,13 @@
|
|||
use std::{
|
||||
future::Future,
|
||||
io::{self, IoSlice},
|
||||
ops::{Deref, DerefMut},
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use actix_codec::{AsyncRead, AsyncWrite};
|
||||
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
|
||||
use actix_rt::net::{ActixStream, Ready};
|
||||
use actix_service::{Service, ServiceFactory};
|
||||
use actix_utils::counter::{Counter, CounterGuard};
|
||||
use futures_core::{future::LocalBoxFuture, ready};
|
||||
|
@ -12,10 +15,82 @@ use futures_core::{future::LocalBoxFuture, ready};
|
|||
pub use openssl::ssl::{
|
||||
AlpnError, Error as SslError, HandshakeError, Ssl, SslAcceptor, SslAcceptorBuilder,
|
||||
};
|
||||
pub use tokio_openssl::SslStream;
|
||||
|
||||
use super::MAX_CONN_COUNTER;
|
||||
|
||||
/// Wrapper type for `tokio_openssl::SslStream` in order to impl `ActixStream` trait.
|
||||
pub struct TlsStream<T>(tokio_openssl::SslStream<T>);
|
||||
|
||||
impl<T> From<tokio_openssl::SslStream<T>> for TlsStream<T> {
|
||||
fn from(stream: tokio_openssl::SslStream<T>) -> Self {
|
||||
Self(stream)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Deref for TlsStream<T> {
|
||||
type Target = tokio_openssl::SslStream<T>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> DerefMut for TlsStream<T> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ActixStream> AsyncRead for TlsStream<T> {
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut ReadBuf<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
Pin::new(&mut **self.get_mut()).poll_read(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ActixStream> AsyncWrite for TlsStream<T> {
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
Pin::new(&mut **self.get_mut()).poll_write(cx, buf)
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
Pin::new(&mut **self.get_mut()).poll_flush(cx)
|
||||
}
|
||||
|
||||
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
Pin::new(&mut **self.get_mut()).poll_shutdown(cx)
|
||||
}
|
||||
|
||||
fn poll_write_vectored(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
bufs: &[IoSlice<'_>],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
Pin::new(&mut **self.get_mut()).poll_write_vectored(cx, bufs)
|
||||
}
|
||||
|
||||
fn is_write_vectored(&self) -> bool {
|
||||
(&**self).is_write_vectored()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ActixStream> ActixStream for TlsStream<T> {
|
||||
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
|
||||
T::poll_read_ready((&**self).get_ref(), cx)
|
||||
}
|
||||
|
||||
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
|
||||
T::poll_write_ready((&**self).get_ref(), cx)
|
||||
}
|
||||
}
|
||||
|
||||
/// Accept TLS connections via `openssl` package.
|
||||
///
|
||||
/// `openssl` feature enables this `Acceptor` type.
|
||||
|
@ -40,11 +115,8 @@ impl Clone for Acceptor {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T> ServiceFactory<T> for Acceptor
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
type Response = SslStream<T>;
|
||||
impl<T: ActixStream> ServiceFactory<T> for Acceptor {
|
||||
type Response = TlsStream<T>;
|
||||
type Error = SslError;
|
||||
type Config = ();
|
||||
type Service = AcceptorService;
|
||||
|
@ -67,11 +139,8 @@ pub struct AcceptorService {
|
|||
conns: Counter,
|
||||
}
|
||||
|
||||
impl<T> Service<T> for AcceptorService
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin + 'static,
|
||||
{
|
||||
type Response = SslStream<T>;
|
||||
impl<T: ActixStream> Service<T> for AcceptorService {
|
||||
type Response = TlsStream<T>;
|
||||
type Error = SslError;
|
||||
type Future = AcceptorServiceResponse<T>;
|
||||
|
||||
|
@ -88,24 +157,25 @@ where
|
|||
let ssl = Ssl::new(ssl_ctx).expect("Provided SSL acceptor was invalid.");
|
||||
AcceptorServiceResponse {
|
||||
_guard: self.conns.get(),
|
||||
stream: Some(SslStream::new(ssl, io).unwrap()),
|
||||
stream: Some(tokio_openssl::SslStream::new(ssl, io).unwrap()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AcceptorServiceResponse<T>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite,
|
||||
{
|
||||
stream: Option<SslStream<T>>,
|
||||
pub struct AcceptorServiceResponse<T: ActixStream> {
|
||||
stream: Option<tokio_openssl::SslStream<T>>,
|
||||
_guard: CounterGuard,
|
||||
}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite + Unpin> Future for AcceptorServiceResponse<T> {
|
||||
type Output = Result<SslStream<T>, SslError>;
|
||||
impl<T: ActixStream> Future for AcceptorServiceResponse<T> {
|
||||
type Output = Result<TlsStream<T>, SslError>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
ready!(Pin::new(self.stream.as_mut().unwrap()).poll_accept(cx))?;
|
||||
Poll::Ready(Ok(self.stream.take().expect("SSL connect has resolved.")))
|
||||
Poll::Ready(Ok(self
|
||||
.stream
|
||||
.take()
|
||||
.expect("SSL connect has resolved.")
|
||||
.into()))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,22 +1,96 @@
|
|||
use std::{
|
||||
future::Future,
|
||||
io,
|
||||
io::{self, IoSlice},
|
||||
ops::{Deref, DerefMut},
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use actix_codec::{AsyncRead, AsyncWrite};
|
||||
use actix_codec::{AsyncRead, AsyncWrite, ReadBuf};
|
||||
use actix_rt::net::{ActixStream, Ready};
|
||||
use actix_service::{Service, ServiceFactory};
|
||||
use actix_utils::counter::{Counter, CounterGuard};
|
||||
use futures_core::future::LocalBoxFuture;
|
||||
use tokio_rustls::{Accept, TlsAcceptor};
|
||||
|
||||
pub use tokio_rustls::rustls::{ServerConfig, Session};
|
||||
pub use tokio_rustls::server::TlsStream;
|
||||
|
||||
use super::MAX_CONN_COUNTER;
|
||||
|
||||
/// Wrapper type for `tokio_openssl::SslStream` in order to impl `ActixStream` trait.
|
||||
pub struct TlsStream<T>(tokio_rustls::server::TlsStream<T>);
|
||||
|
||||
impl<T> From<tokio_rustls::server::TlsStream<T>> for TlsStream<T> {
|
||||
fn from(stream: tokio_rustls::server::TlsStream<T>) -> Self {
|
||||
Self(stream)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Deref for TlsStream<T> {
|
||||
type Target = tokio_rustls::server::TlsStream<T>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> DerefMut for TlsStream<T> {
|
||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ActixStream> AsyncRead for TlsStream<T> {
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut ReadBuf<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
Pin::new(&mut **self.get_mut()).poll_read(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ActixStream> AsyncWrite for TlsStream<T> {
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
Pin::new(&mut **self.get_mut()).poll_write(cx, buf)
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
Pin::new(&mut **self.get_mut()).poll_flush(cx)
|
||||
}
|
||||
|
||||
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
Pin::new(&mut **self.get_mut()).poll_shutdown(cx)
|
||||
}
|
||||
|
||||
fn poll_write_vectored(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
bufs: &[IoSlice<'_>],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
Pin::new(&mut **self.get_mut()).poll_write_vectored(cx, bufs)
|
||||
}
|
||||
|
||||
fn is_write_vectored(&self) -> bool {
|
||||
(&**self).is_write_vectored()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ActixStream> ActixStream for TlsStream<T> {
|
||||
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
|
||||
T::poll_read_ready((&**self).get_ref().0, cx)
|
||||
}
|
||||
|
||||
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
|
||||
T::poll_write_ready((&**self).get_ref().0, cx)
|
||||
}
|
||||
}
|
||||
|
||||
/// Accept TLS connections via `rustls` package.
|
||||
///
|
||||
/// `rustls` feature enables this `Acceptor` type.
|
||||
|
@ -43,10 +117,7 @@ impl Clone for Acceptor {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T> ServiceFactory<T> for Acceptor
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
impl<T: ActixStream> ServiceFactory<T> for Acceptor {
|
||||
type Response = TlsStream<T>;
|
||||
type Error = io::Error;
|
||||
type Config = ();
|
||||
|
@ -72,10 +143,7 @@ pub struct AcceptorService {
|
|||
conns: Counter,
|
||||
}
|
||||
|
||||
impl<T> Service<T> for AcceptorService
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
impl<T: ActixStream> Service<T> for AcceptorService {
|
||||
type Response = TlsStream<T>;
|
||||
type Error = io::Error;
|
||||
type Future = AcceptorServiceFut<T>;
|
||||
|
@ -96,22 +164,16 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
pub struct AcceptorServiceFut<T>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
pub struct AcceptorServiceFut<T: ActixStream> {
|
||||
fut: Accept<T>,
|
||||
_guard: CounterGuard,
|
||||
}
|
||||
|
||||
impl<T> Future for AcceptorServiceFut<T>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
impl<T: ActixStream> Future for AcceptorServiceFut<T> {
|
||||
type Output = Result<TlsStream<T>, io::Error>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.get_mut();
|
||||
Pin::new(&mut this.fut).poll(cx)
|
||||
Pin::new(&mut this.fut).poll(cx).map_ok(TlsStream)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ use std::{
|
|||
fmt,
|
||||
iter::{self, FromIterator as _},
|
||||
mem,
|
||||
net::SocketAddr,
|
||||
net::{IpAddr, SocketAddr},
|
||||
};
|
||||
|
||||
/// Parse a host into parts (hostname and port).
|
||||
|
@ -67,6 +67,7 @@ pub struct Connect<T> {
|
|||
pub(crate) req: T,
|
||||
pub(crate) port: u16,
|
||||
pub(crate) addr: ConnectAddrs,
|
||||
pub(crate) local_addr: Option<IpAddr>,
|
||||
}
|
||||
|
||||
impl<T: Address> Connect<T> {
|
||||
|
@ -78,6 +79,7 @@ impl<T: Address> Connect<T> {
|
|||
req,
|
||||
port: port.unwrap_or(0),
|
||||
addr: ConnectAddrs::None,
|
||||
local_addr: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -88,6 +90,7 @@ impl<T: Address> Connect<T> {
|
|||
req,
|
||||
port: 0,
|
||||
addr: ConnectAddrs::One(addr),
|
||||
local_addr: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -119,6 +122,12 @@ impl<T: Address> Connect<T> {
|
|||
self
|
||||
}
|
||||
|
||||
/// Set local_addr of connect.
|
||||
pub fn set_local_addr(mut self, addr: impl Into<IpAddr>) -> Self {
|
||||
self.local_addr = Some(addr.into());
|
||||
self
|
||||
}
|
||||
|
||||
/// Get hostname.
|
||||
pub fn hostname(&self) -> &str {
|
||||
self.req.hostname()
|
||||
|
@ -285,7 +294,7 @@ fn parse_host(host: &str) -> (&str, Option<u16>) {
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::net::{IpAddr, Ipv4Addr};
|
||||
use std::net::Ipv4Addr;
|
||||
|
||||
use super::*;
|
||||
|
||||
|
@ -329,4 +338,13 @@ mod tests {
|
|||
let mut iter = ConnectAddrsIter::None;
|
||||
assert_eq!(iter.next(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_local_addr() {
|
||||
let conn = Connect::new("hello").set_local_addr([127, 0, 0, 1]);
|
||||
assert_eq!(
|
||||
conn.local_addr.unwrap(),
|
||||
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,12 +2,12 @@ use std::{
|
|||
collections::VecDeque,
|
||||
future::Future,
|
||||
io,
|
||||
net::SocketAddr,
|
||||
net::{IpAddr, SocketAddr, SocketAddrV4, SocketAddrV6},
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use actix_rt::net::TcpStream;
|
||||
use actix_rt::net::{TcpSocket, TcpStream};
|
||||
use actix_service::{Service, ServiceFactory};
|
||||
use futures_core::{future::LocalBoxFuture, ready};
|
||||
use log::{error, trace};
|
||||
|
@ -54,9 +54,14 @@ impl<T: Address> Service<Connect<T>> for TcpConnector {
|
|||
|
||||
fn call(&self, req: Connect<T>) -> Self::Future {
|
||||
let port = req.port();
|
||||
let Connect { req, addr, .. } = req;
|
||||
let Connect {
|
||||
req,
|
||||
addr,
|
||||
local_addr,
|
||||
..
|
||||
} = req;
|
||||
|
||||
TcpConnectorResponse::new(req, port, addr)
|
||||
TcpConnectorResponse::new(req, port, local_addr, addr)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -65,6 +70,7 @@ pub enum TcpConnectorResponse<T> {
|
|||
Response {
|
||||
req: Option<T>,
|
||||
port: u16,
|
||||
local_addr: Option<IpAddr>,
|
||||
addrs: Option<VecDeque<SocketAddr>>,
|
||||
stream: Option<ReusableBoxFuture<Result<TcpStream, io::Error>>>,
|
||||
},
|
||||
|
@ -72,7 +78,12 @@ pub enum TcpConnectorResponse<T> {
|
|||
}
|
||||
|
||||
impl<T: Address> TcpConnectorResponse<T> {
|
||||
pub(crate) fn new(req: T, port: u16, addr: ConnectAddrs) -> TcpConnectorResponse<T> {
|
||||
pub(crate) fn new(
|
||||
req: T,
|
||||
port: u16,
|
||||
local_addr: Option<IpAddr>,
|
||||
addr: ConnectAddrs,
|
||||
) -> TcpConnectorResponse<T> {
|
||||
if addr.is_none() {
|
||||
error!("TCP connector: unresolved connection address");
|
||||
return TcpConnectorResponse::Error(Some(ConnectError::Unresolved));
|
||||
|
@ -90,8 +101,9 @@ impl<T: Address> TcpConnectorResponse<T> {
|
|||
ConnectAddrs::One(addr) => TcpConnectorResponse::Response {
|
||||
req: Some(req),
|
||||
port,
|
||||
local_addr,
|
||||
addrs: None,
|
||||
stream: Some(ReusableBoxFuture::new(TcpStream::connect(addr))),
|
||||
stream: Some(ReusableBoxFuture::new(connect(addr, local_addr))),
|
||||
},
|
||||
|
||||
// when resolver returns multiple socket addr for request they would be popped from
|
||||
|
@ -99,6 +111,7 @@ impl<T: Address> TcpConnectorResponse<T> {
|
|||
ConnectAddrs::Multi(addrs) => TcpConnectorResponse::Response {
|
||||
req: Some(req),
|
||||
port,
|
||||
local_addr,
|
||||
addrs: Some(addrs),
|
||||
stream: None,
|
||||
},
|
||||
|
@ -116,6 +129,7 @@ impl<T: Address> Future for TcpConnectorResponse<T> {
|
|||
TcpConnectorResponse::Response {
|
||||
req,
|
||||
port,
|
||||
local_addr,
|
||||
addrs,
|
||||
stream,
|
||||
} => loop {
|
||||
|
@ -148,11 +162,38 @@ impl<T: Address> Future for TcpConnectorResponse<T> {
|
|||
// try to connect
|
||||
let addr = addrs.as_mut().unwrap().pop_front().unwrap();
|
||||
|
||||
let fut = connect(addr, *local_addr);
|
||||
match stream {
|
||||
Some(rbf) => rbf.set(TcpStream::connect(addr)),
|
||||
None => *stream = Some(ReusableBoxFuture::new(TcpStream::connect(addr))),
|
||||
Some(rbf) => rbf.set(fut),
|
||||
None => *stream = Some(ReusableBoxFuture::new(fut)),
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn connect(addr: SocketAddr, local_addr: Option<IpAddr>) -> io::Result<TcpStream> {
|
||||
// use local addr if connect asks for it.
|
||||
match local_addr {
|
||||
Some(ip_addr) => {
|
||||
let socket = match ip_addr {
|
||||
IpAddr::V4(ip_addr) => {
|
||||
let socket = TcpSocket::new_v4()?;
|
||||
let addr = SocketAddr::V4(SocketAddrV4::new(ip_addr, 0));
|
||||
socket.bind(addr)?;
|
||||
socket
|
||||
}
|
||||
IpAddr::V6(ip_addr) => {
|
||||
let socket = TcpSocket::new_v6()?;
|
||||
let addr = SocketAddr::V6(SocketAddrV6::new(ip_addr, 0, 0, 0));
|
||||
socket.bind(addr)?;
|
||||
socket
|
||||
}
|
||||
};
|
||||
|
||||
socket.connect(addr).await
|
||||
}
|
||||
|
||||
None => TcpStream::connect(addr).await,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,9 @@
|
|||
use std::io;
|
||||
#![cfg(feature = "connect")]
|
||||
|
||||
use std::{
|
||||
io,
|
||||
net::{IpAddr, Ipv4Addr},
|
||||
};
|
||||
|
||||
use actix_codec::{BytesCodec, Framed};
|
||||
use actix_rt::net::TcpStream;
|
||||
|
@ -9,7 +14,7 @@ use futures_util::sink::SinkExt;
|
|||
|
||||
use actix_tls::connect::{self as actix_connect, Connect};
|
||||
|
||||
#[cfg(all(feature = "connect", feature = "openssl"))]
|
||||
#[cfg(feature = "openssl")]
|
||||
#[actix_rt::test]
|
||||
async fn test_string() {
|
||||
let srv = TestServer::with(|| {
|
||||
|
@ -125,3 +130,25 @@ async fn test_rustls_uri() {
|
|||
let con = conn.call(addr.into()).await.unwrap();
|
||||
assert_eq!(con.peer_addr().unwrap(), srv.addr());
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_local_addr() {
|
||||
let srv = TestServer::with(|| {
|
||||
fn_service(|io: TcpStream| async {
|
||||
let mut framed = Framed::new(io, BytesCodec);
|
||||
framed.send(Bytes::from_static(b"test")).await?;
|
||||
Ok::<_, io::Error>(())
|
||||
})
|
||||
});
|
||||
|
||||
let conn = actix_connect::default_connector();
|
||||
let local = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 3));
|
||||
|
||||
let (con, _) = conn
|
||||
.call(Connect::with_addr("10", srv.addr()).set_local_addr(local))
|
||||
.await
|
||||
.unwrap()
|
||||
.into_parts();
|
||||
|
||||
assert_eq!(con.local_addr().unwrap().ip(), local)
|
||||
}
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
#![cfg(feature = "connect")]
|
||||
|
||||
use std::{
|
||||
io,
|
||||
net::{Ipv4Addr, SocketAddr},
|
||||
|
|
|
@ -16,7 +16,7 @@ name = "actix_tracing"
|
|||
path = "src/lib.rs"
|
||||
|
||||
[dependencies]
|
||||
actix-service = "2.0.0-beta.4"
|
||||
actix-service = "2.0.0-beta.5"
|
||||
|
||||
futures-util = { version = "0.3.4", default-features = false }
|
||||
tracing = "0.1"
|
||||
|
|
|
@ -7,7 +7,7 @@
|
|||
use core::marker::PhantomData;
|
||||
|
||||
use actix_service::{
|
||||
apply, dev::ApplyTransform, IntoServiceFactory, Service, ServiceFactory, Transform,
|
||||
apply, ApplyTransform, IntoServiceFactory, Service, ServiceFactory, Transform,
|
||||
};
|
||||
use futures_util::future::{ok, Either, Ready};
|
||||
use tracing_futures::{Instrument, Instrumented};
|
||||
|
|
|
@ -1,6 +1,15 @@
|
|||
# Changes
|
||||
|
||||
## Unreleased - 2021-xx-xx
|
||||
* Add `async fn mpsc::Receiver::recv`. [#286]
|
||||
* `SendError` inner field is now public. [#286]
|
||||
* Rename `Dispatcher::{get_sink => tx}`. [#286]
|
||||
* Rename `Dispatcher::{get_ref => service}`. [#286]
|
||||
* Rename `Dispatcher::{get_mut => service_mut}`. [#286]
|
||||
* Rename `Dispatcher::{get_framed => framed}`. [#286]
|
||||
* Rename `Dispatcher::{get_framed_mut => framed_mut}`. [#286]
|
||||
|
||||
[#286]: https://github.com/actix/actix-net/pull/286
|
||||
|
||||
|
||||
## 3.0.0-beta.2 - 2021-02-06
|
||||
|
|
|
@ -18,7 +18,7 @@ path = "src/lib.rs"
|
|||
[dependencies]
|
||||
actix-codec = "0.4.0-beta.1"
|
||||
actix-rt = { version = "2.0.0", default-features = false }
|
||||
actix-service = "2.0.0-beta.4"
|
||||
actix-service = "2.0.0-beta.5"
|
||||
|
||||
futures-core = { version = "0.3.7", default-features = false }
|
||||
futures-sink = { version = "0.3.7", default-features = false }
|
||||
|
|
|
@ -163,29 +163,28 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
/// Get sink
|
||||
pub fn get_sink(&self) -> mpsc::Sender<Result<Message<I>, S::Error>> {
|
||||
/// Get sender handle.
|
||||
pub fn tx(&self) -> mpsc::Sender<Result<Message<I>, S::Error>> {
|
||||
self.tx.clone()
|
||||
}
|
||||
|
||||
/// Get reference to a service wrapped by `Dispatcher` instance.
|
||||
pub fn get_ref(&self) -> &S {
|
||||
pub fn service(&self) -> &S {
|
||||
&self.service
|
||||
}
|
||||
|
||||
/// Get mutable reference to a service wrapped by `Dispatcher` instance.
|
||||
pub fn get_mut(&mut self) -> &mut S {
|
||||
pub fn service_mut(&mut self) -> &mut S {
|
||||
&mut self.service
|
||||
}
|
||||
|
||||
/// Get reference to a framed instance wrapped by `Dispatcher`
|
||||
/// instance.
|
||||
pub fn get_framed(&self) -> &Framed<T, U> {
|
||||
/// Get reference to a framed instance wrapped by `Dispatcher` instance.
|
||||
pub fn framed(&self) -> &Framed<T, U> {
|
||||
&self.framed
|
||||
}
|
||||
|
||||
/// Get mutable reference to a framed instance wrapped by `Dispatcher` instance.
|
||||
pub fn get_framed_mut(&mut self) -> &mut Framed<T, U> {
|
||||
pub fn framed_mut(&mut self) -> &mut Framed<T, U> {
|
||||
&mut self.framed
|
||||
}
|
||||
|
||||
|
@ -268,7 +267,7 @@ where
|
|||
if !this.framed.is_write_buf_empty() {
|
||||
match this.framed.flush(cx) {
|
||||
Poll::Pending => break,
|
||||
Poll::Ready(Ok(_)) => (),
|
||||
Poll::Ready(Ok(_)) => {}
|
||||
Poll::Ready(Err(err)) => {
|
||||
debug!("Error sending data: {:?}", err);
|
||||
*this.state = State::FramedError(DispatcherError::Encoder(err));
|
||||
|
@ -318,14 +317,13 @@ where
|
|||
}
|
||||
State::FlushAndStop => {
|
||||
if !this.framed.is_write_buf_empty() {
|
||||
match this.framed.flush(cx) {
|
||||
Poll::Ready(Err(err)) => {
|
||||
this.framed.flush(cx).map(|res| {
|
||||
if let Err(err) = res {
|
||||
debug!("Error sending data: {:?}", err);
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
} else {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
|
|
@ -8,5 +8,8 @@
|
|||
pub mod counter;
|
||||
pub mod dispatcher;
|
||||
pub mod mpsc;
|
||||
mod poll_fn;
|
||||
pub mod task;
|
||||
pub mod timeout;
|
||||
|
||||
use self::poll_fn::poll_fn;
|
||||
|
|
|
@ -1,31 +1,35 @@
|
|||
//! A multi-producer, single-consumer, futures-aware, FIFO queue.
|
||||
|
||||
use core::any::Any;
|
||||
use core::cell::RefCell;
|
||||
use core::fmt;
|
||||
use core::pin::Pin;
|
||||
use core::task::{Context, Poll};
|
||||
use core::{
|
||||
cell::RefCell,
|
||||
fmt,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use std::collections::VecDeque;
|
||||
use std::error::Error;
|
||||
use std::rc::Rc;
|
||||
use std::{collections::VecDeque, error::Error, rc::Rc};
|
||||
|
||||
use futures_core::stream::Stream;
|
||||
use futures_sink::Sink;
|
||||
|
||||
use crate::task::LocalWaker;
|
||||
use crate::{poll_fn, task::LocalWaker};
|
||||
|
||||
/// Creates a unbounded in-memory channel with buffered storage.
|
||||
///
|
||||
/// [Sender]s and [Receiver]s are `!Send`.
|
||||
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
|
||||
let shared = Rc::new(RefCell::new(Shared {
|
||||
has_receiver: true,
|
||||
buffer: VecDeque::new(),
|
||||
blocked_recv: LocalWaker::new(),
|
||||
}));
|
||||
|
||||
let sender = Sender {
|
||||
shared: shared.clone(),
|
||||
};
|
||||
|
||||
let receiver = Receiver { shared };
|
||||
|
||||
(sender, receiver)
|
||||
}
|
||||
|
||||
|
@ -50,18 +54,22 @@ impl<T> Sender<T> {
|
|||
/// Sends the provided message along this channel.
|
||||
pub fn send(&self, item: T) -> Result<(), SendError<T>> {
|
||||
let mut shared = self.shared.borrow_mut();
|
||||
|
||||
if !shared.has_receiver {
|
||||
return Err(SendError(item)); // receiver was dropped
|
||||
// receiver was dropped
|
||||
return Err(SendError(item));
|
||||
};
|
||||
|
||||
shared.buffer.push_back(item);
|
||||
shared.blocked_recv.wake();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Closes the sender half
|
||||
/// Closes the sender half.
|
||||
///
|
||||
/// This prevents any further messages from being sent on the channel while
|
||||
/// still enabling the receiver to drain messages that are buffered.
|
||||
/// This prevents any further messages from being sent on the channel, by any sender, while
|
||||
/// still enabling the receiver to drain messages that are already buffered.
|
||||
pub fn close(&mut self) {
|
||||
self.shared.borrow_mut().has_receiver = false;
|
||||
}
|
||||
|
@ -110,14 +118,24 @@ impl<T> Drop for Sender<T> {
|
|||
|
||||
/// The receiving end of a channel which implements the `Stream` trait.
|
||||
///
|
||||
/// This is created by the `channel` function.
|
||||
/// This is created by the [`channel`] function.
|
||||
#[derive(Debug)]
|
||||
pub struct Receiver<T> {
|
||||
shared: Rc<RefCell<Shared<T>>>,
|
||||
}
|
||||
|
||||
impl<T> Receiver<T> {
|
||||
/// Create Sender
|
||||
/// Receive the next value.
|
||||
///
|
||||
/// Returns `None` if the channel is empty and has been [closed](Sender::close) explicitly or
|
||||
/// when all senders have been dropped and, therefore, no more values can ever be sent though
|
||||
/// this channel.
|
||||
pub async fn recv(&mut self) -> Option<T> {
|
||||
let mut this = Pin::new(self);
|
||||
poll_fn(|cx| this.as_mut().poll_next(cx)).await
|
||||
}
|
||||
|
||||
/// Create an associated [Sender].
|
||||
pub fn sender(&self) -> Sender<T> {
|
||||
Sender {
|
||||
shared: self.shared.clone(),
|
||||
|
@ -132,11 +150,13 @@ impl<T> Stream for Receiver<T> {
|
|||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let mut shared = self.shared.borrow_mut();
|
||||
|
||||
if Rc::strong_count(&self.shared) == 1 {
|
||||
// All senders have been dropped, so drain the buffer and end the
|
||||
// stream.
|
||||
Poll::Ready(shared.buffer.pop_front())
|
||||
} else if let Some(msg) = shared.buffer.pop_front() {
|
||||
// All senders have been dropped, so drain the buffer and end the stream.
|
||||
return Poll::Ready(shared.buffer.pop_front());
|
||||
}
|
||||
|
||||
if let Some(msg) = shared.buffer.pop_front() {
|
||||
Poll::Ready(Some(msg))
|
||||
} else {
|
||||
shared.blocked_recv.register(cx.waker());
|
||||
|
@ -153,9 +173,15 @@ impl<T> Drop for Receiver<T> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Error type for sending, used when the receiving end of a channel is
|
||||
/// dropped
|
||||
pub struct SendError<T>(T);
|
||||
/// Error returned when attempting to send after the channels' [Receiver] is dropped or closed.
|
||||
pub struct SendError<T>(pub T);
|
||||
|
||||
impl<T> SendError<T> {
|
||||
/// Returns the message that was attempted to be sent but failed.
|
||||
pub fn into_inner(self) -> T {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> fmt::Debug for SendError<T> {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
|
@ -169,18 +195,7 @@ impl<T> fmt::Display for SendError<T> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T: Any> Error for SendError<T> {
|
||||
fn description(&self) -> &str {
|
||||
"send failed because receiver is gone"
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> SendError<T> {
|
||||
/// Returns the message that was attempted to be sent but failed.
|
||||
pub fn into_inner(self) -> T {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
impl<T> Error for SendError<T> {}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
@ -221,4 +236,18 @@ mod tests {
|
|||
assert!(tx.send("test").is_err());
|
||||
assert!(tx2.send("test").is_err());
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_recv() {
|
||||
let (tx, mut rx) = channel();
|
||||
tx.send("test").unwrap();
|
||||
assert_eq!(rx.recv().await.unwrap(), "test");
|
||||
drop(tx);
|
||||
|
||||
let (tx, mut rx) = channel();
|
||||
tx.send("test").unwrap();
|
||||
assert_eq!(rx.recv().await.unwrap(), "test");
|
||||
drop(tx);
|
||||
assert!(rx.recv().await.is_none());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
//! Simple "poll function" future and factory.
|
||||
|
||||
use core::{
|
||||
fmt,
|
||||
future::Future,
|
||||
task::{self, Poll},
|
||||
};
|
||||
use std::pin::Pin;
|
||||
|
||||
/// Create a future driven by the provided function that receives a task context.
|
||||
pub(crate) fn poll_fn<F, T>(f: F) -> PollFn<F>
|
||||
where
|
||||
F: FnMut(&mut task::Context<'_>) -> Poll<T>,
|
||||
{
|
||||
PollFn { f }
|
||||
}
|
||||
|
||||
/// A Future driven by the inner function.
|
||||
pub(crate) struct PollFn<F> {
|
||||
f: F,
|
||||
}
|
||||
|
||||
impl<F> Unpin for PollFn<F> {}
|
||||
|
||||
impl<F> fmt::Debug for PollFn<F> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("PollFn").finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<F, T> Future for PollFn<F>
|
||||
where
|
||||
F: FnMut(&mut task::Context<'_>) -> task::Poll<T>,
|
||||
{
|
||||
type Output = T;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
|
||||
(self.f)(cx)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_poll_fn() {
|
||||
let res = poll_fn(|_| Poll::Ready(42)).await;
|
||||
assert_eq!(res, 42);
|
||||
|
||||
let mut i = 5;
|
||||
let res = poll_fn(|cx| {
|
||||
i -= 1;
|
||||
|
||||
if i > 0 {
|
||||
cx.waker().wake_by_ref();
|
||||
Poll::Pending
|
||||
} else {
|
||||
Poll::Ready(42)
|
||||
}
|
||||
})
|
||||
.await;
|
||||
assert_eq!(res, 42);
|
||||
}
|
||||
}
|
|
@ -1,49 +1,33 @@
|
|||
use core::cell::UnsafeCell;
|
||||
use core::fmt;
|
||||
use core::marker::PhantomData;
|
||||
use core::task::Waker;
|
||||
use core::{cell::Cell, fmt, marker::PhantomData, task::Waker};
|
||||
|
||||
/// A synchronization primitive for task wakeup.
|
||||
///
|
||||
/// Sometimes the task interested in a given event will change over time.
|
||||
/// An `LocalWaker` can coordinate concurrent notifications with the consumer
|
||||
/// potentially "updating" the underlying task to wake up. This is useful in
|
||||
/// scenarios where a computation completes in another task and wants to
|
||||
/// notify the consumer, but the consumer is in the process of being migrated to
|
||||
/// a new logical task.
|
||||
/// Sometimes the task interested in a given event will change over time. A `LocalWaker` can
|
||||
/// coordinate concurrent notifications with the consumer, potentially "updating" the underlying
|
||||
/// task to wake up. This is useful in scenarios where a computation completes in another task and
|
||||
/// wants to notify the consumer, but the consumer is in the process of being migrated to a new
|
||||
/// logical task.
|
||||
///
|
||||
/// Consumers should call `register` before checking the result of a computation
|
||||
/// and producers should call `wake` after producing the computation (this
|
||||
/// differs from the usual `thread::park` pattern). It is also permitted for
|
||||
/// `wake` to be called **before** `register`. This results in a no-op.
|
||||
/// Consumers should call [`register`] before checking the result of a computation and producers
|
||||
/// should call [`wake`] after producing the computation (this differs from the usual `thread::park`
|
||||
/// pattern). It is also permitted for [`wake`] to be called _before_ [`register`]. This results in
|
||||
/// a no-op.
|
||||
///
|
||||
/// A single `AtomicWaker` may be reused for any number of calls to `register` or
|
||||
/// `wake`.
|
||||
// TODO: Refactor to Cell when remove deprecated methods (@botika)
|
||||
/// A single `LocalWaker` may be reused for any number of calls to [`register`] or [`wake`].
|
||||
///
|
||||
/// [`register`]: LocalWaker::register
|
||||
/// [`wake`]: LocalWaker::wake
|
||||
#[derive(Default)]
|
||||
pub struct LocalWaker {
|
||||
pub(crate) waker: UnsafeCell<Option<Waker>>,
|
||||
pub(crate) waker: Cell<Option<Waker>>,
|
||||
// mark LocalWaker as a !Send type.
|
||||
_t: PhantomData<*const ()>,
|
||||
_phantom: PhantomData<*const ()>,
|
||||
}
|
||||
|
||||
impl LocalWaker {
|
||||
/// Create an `LocalWaker`.
|
||||
/// Creates a new, empty `LocalWaker`.
|
||||
pub fn new() -> Self {
|
||||
LocalWaker {
|
||||
waker: UnsafeCell::new(None),
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
#[deprecated(
|
||||
since = "2.1.0",
|
||||
note = "In favor of `wake`. State of the register doesn't matter at `wake` up"
|
||||
)]
|
||||
/// Check if waker has been registered.
|
||||
#[inline]
|
||||
pub fn is_registered(&self) -> bool {
|
||||
unsafe { (*self.waker.get()).is_some() }
|
||||
LocalWaker::default()
|
||||
}
|
||||
|
||||
/// Registers the waker to be notified on calls to `wake`.
|
||||
|
@ -51,11 +35,8 @@ impl LocalWaker {
|
|||
/// Returns `true` if waker was registered before.
|
||||
#[inline]
|
||||
pub fn register(&self, waker: &Waker) -> bool {
|
||||
unsafe {
|
||||
let w = self.waker.get();
|
||||
let last_waker = w.replace(Some(waker.clone()));
|
||||
last_waker.is_some()
|
||||
}
|
||||
let last_waker = self.waker.replace(Some(waker.clone()));
|
||||
last_waker.is_some()
|
||||
}
|
||||
|
||||
/// Calls `wake` on the last `Waker` passed to `register`.
|
||||
|
@ -73,7 +54,7 @@ impl LocalWaker {
|
|||
/// If a waker has not been registered, this returns `None`.
|
||||
#[inline]
|
||||
pub fn take(&self) -> Option<Waker> {
|
||||
unsafe { (*self.waker.get()).take() }
|
||||
self.waker.take()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -197,7 +197,6 @@ where
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use core::task::Poll;
|
||||
use core::time::Duration;
|
||||
|
||||
use super::*;
|
||||
|
|
Loading…
Reference in New Issue