diff --git a/.cargo/config.toml b/.cargo/config.toml
new file mode 100644
index 00000000..40fe3e57
--- /dev/null
+++ b/.cargo/config.toml
@@ -0,0 +1,3 @@
+[alias]
+chk = "hack check --workspace --all-features --tests --examples"
+lint = "hack --clean-per-run clippy --workspace --tests --examples"
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
new file mode 100644
index 00000000..2bb9a234
--- /dev/null
+++ b/.github/workflows/ci.yml
@@ -0,0 +1,131 @@
+name: CI
+
+on:
+ pull_request:
+ types: [opened, synchronize, reopened]
+ push:
+ branches: [master]
+
+jobs:
+ build_and_test:
+ strategy:
+ fail-fast: false
+ matrix:
+ target:
+ - { name: Linux, os: ubuntu-latest, triple: x86_64-unknown-linux-gnu }
+ - { name: macOS, os: macos-latest, triple: x86_64-apple-darwin }
+ - { name: Windows, os: windows-latest, triple: x86_64-pc-windows-msvc }
+ - { name: Windows (MinGW), os: windows-latest, triple: x86_64-pc-windows-gnu }
+ - { name: Windows (32-bit), os: windows-latest, triple: i686-pc-windows-msvc }
+ version:
+ - 1.46.0 # MSRV
+ - stable
+ - nightly
+
+ name: ${{ matrix.target.name }} / ${{ matrix.version }}
+ runs-on: ${{ matrix.target.os }}
+
+ env:
+ VCPKGRS_DYNAMIC: 1
+
+ steps:
+ - name: Setup Routing
+ if: matrix.target.os == 'macos-latest'
+ run: sudo ifconfig lo0 alias 127.0.0.3
+
+ - uses: actions/checkout@v2
+
+ # install OpenSSL on Windows
+ - name: Set vcpkg root
+ if: matrix.target.triple == 'x86_64-pc-windows-msvc' || matrix.target.triple == 'i686-pc-windows-msvc'
+ run: echo "VCPKG_ROOT=$env:VCPKG_INSTALLATION_ROOT" | Out-File -FilePath $env:GITHUB_ENV -Append
+ - name: Install OpenSSL
+ if: matrix.target.triple == 'x86_64-pc-windows-msvc'
+ run: vcpkg install openssl:x64-windows
+ - name: Install OpenSSL
+ if: matrix.target.triple == 'i686-pc-windows-msvc'
+ run: vcpkg install openssl:x86-windows
+
+ - name: Install ${{ matrix.version }}
+ uses: actions-rs/toolchain@v1
+ with:
+ toolchain: ${{ matrix.version }}-${{ matrix.target.triple }}
+ profile: minimal
+ override: true
+
+ # - name: Install MSYS2
+ # if: matrix.target.triple == 'x86_64-pc-windows-gnu'
+ # uses: msys2/setup-msys2@v2
+ # - name: Install MinGW Packages
+ # if: matrix.target.triple == 'x86_64-pc-windows-gnu'
+ # run: |
+ # msys2 -c 'pacman -Sy --noconfirm pacman'
+ # msys2 -c 'pacman --noconfirm -S base-devel pkg-config'
+
+ # - name: Generate Cargo.lock
+ # uses: actions-rs/cargo@v1
+ # with:
+ # command: generate-lockfile
+ # - name: Cache Dependencies
+ # uses: Swatinem/rust-cache@v1.2.0
+
+ - name: Install cargo-hack
+ uses: actions-rs/cargo@v1
+ with:
+ command: install
+ args: cargo-hack
+
+ - name: check minimal
+ uses: actions-rs/cargo@v1
+ with:
+ command: hack
+ args: check --workspace --no-default-features
+
+ - name: check minimal + tests
+ uses: actions-rs/cargo@v1
+ with:
+ command: hack
+ args: check --workspace --no-default-features --tests --examples
+
+ - name: check default
+ uses: actions-rs/cargo@v1
+ with:
+ command: check
+ args: --workspace --tests --examples
+
+ - name: check full
+ # TODO: compile OpenSSL and run tests on MinGW
+ if: matrix.target.triple != 'x86_64-pc-windows-gnu'
+ uses: actions-rs/cargo@v1
+ with:
+ command: check
+ args: --workspace --all-features --tests --examples
+
+ - name: tests
+ if: matrix.target.triple != 'x86_64-pc-windows-gnu'
+ uses: actions-rs/cargo@v1
+ with:
+ command: test
+ args: --workspace --all-features --no-fail-fast -- --nocapture
+
+ - name: Generate coverage file
+ if: >
+ matrix.target.os == 'ubuntu-latest'
+ && matrix.version == 'stable'
+ && github.ref == 'refs/heads/master'
+ run: |
+ cargo install cargo-tarpaulin
+ cargo tarpaulin --out Xml --verbose
+ - name: Upload to Codecov
+ if: >
+ matrix.target.os == 'ubuntu-latest'
+ && matrix.version == 'stable'
+ && github.ref == 'refs/heads/master'
+ uses: codecov/codecov-action@v1
+ with:
+ file: cobertura.xml
+
+ - name: Clear the cargo caches
+ run: |
+ cargo install cargo-cache --no-default-features --features ci-autoclean
+ cargo-cache
diff --git a/.github/workflows/clippy-fmt.yml b/.github/workflows/clippy-fmt.yml
index 12343dd4..3bef81db 100644
--- a/.github/workflows/clippy-fmt.yml
+++ b/.github/workflows/clippy-fmt.yml
@@ -1,34 +1,42 @@
+name: Lint
+
on:
pull_request:
types: [opened, synchronize, reopened]
-name: Clippy and rustfmt Check
jobs:
- clippy_check:
+ fmt:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- - uses: actions-rs/toolchain@v1
+ - name: Install Rust
+ uses: actions-rs/toolchain@v1
with:
toolchain: stable
- components: rustfmt
profile: minimal
+ components: rustfmt
override: true
- - name: Check with rustfmt
+ - name: Rustfmt Check
uses: actions-rs/cargo@v1
with:
command: fmt
args: --all -- --check
- - uses: actions-rs/toolchain@v1
+ clippy:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v2
+
+ - name: Install Rust
+ uses: actions-rs/toolchain@v1
with:
- toolchain: nightly
- components: clippy
+ toolchain: stable
profile: minimal
+ components: clippy
override: true
- - name: Check with Clippy
+ - name: Clippy Check
uses: actions-rs/clippy-check@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}
- args: --workspace --tests
+ args: --workspace --tests --all-features
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml
deleted file mode 100644
index 8ea7823d..00000000
--- a/.github/workflows/linux.yml
+++ /dev/null
@@ -1,82 +0,0 @@
-name: CI (Linux)
-
-on:
- pull_request:
- types: [opened, synchronize, reopened]
- push:
- branches:
- - master
- - '1.0'
-
-jobs:
- build_and_test:
- strategy:
- fail-fast: false
- matrix:
- version:
- - 1.46.0
- - stable
- - nightly
-
- name: ${{ matrix.version }} - x86_64-unknown-linux-gnu
- runs-on: ubuntu-latest
-
- steps:
- - uses: actions/checkout@v2
-
- - name: Install ${{ matrix.version }}
- uses: actions-rs/toolchain@v1
- with:
- toolchain: ${{ matrix.version }}-x86_64-unknown-linux-gnu
- profile: minimal
- override: true
-
- - name: Generate Cargo.lock
- uses: actions-rs/cargo@v1
- with:
- command: generate-lockfile
- - name: Cache cargo dirs
- uses: actions/cache@v2
- with:
- path:
- ~/.cargo/registry
- ~/.cargo/git
- ~/.cargo/bin
- key: ${{ matrix.version }}-x86_64-unknown-linux-gnu-cargo-trimmed-${{ hashFiles('**/Cargo.lock') }}
- - name: Cache cargo build
- uses: actions/cache@v2
- with:
- path: target
- key: ${{ matrix.version }}-x86_64-unknown-linux-gnu-cargo-build-trimmed-${{ hashFiles('**/Cargo.lock') }}
-
- - name: check build
- uses: actions-rs/cargo@v1
- with:
- command: check
- args: --workspace --bins --examples --tests
-
- - name: tests
- uses: actions-rs/cargo@v1
- timeout-minutes: 40
- with:
- command: test
- args: --workspace --exclude=actix-tls --no-fail-fast -- --nocapture
-
- - name: Generate coverage file
- if: matrix.version == 'stable' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
- run: |
- cargo install cargo-tarpaulin
- cargo tarpaulin --out Xml --workspace
-
- - name: Upload to Codecov
- if: matrix.version == 'stable' && (github.ref == 'refs/heads/master' || github.event_name == 'pull_request')
- uses: codecov/codecov-action@v1
- with:
- file: cobertura.xml
-
- - name: Clear the cargo caches
- run: |
- rustup update stable
- rustup override set stable
- cargo install cargo-cache --no-default-features --features ci-autoclean
- cargo-cache
diff --git a/.github/workflows/macos.yml b/.github/workflows/macos.yml
deleted file mode 100644
index b2555bd3..00000000
--- a/.github/workflows/macos.yml
+++ /dev/null
@@ -1,43 +0,0 @@
-name: CI (macOS)
-
-on:
- pull_request:
- types: [opened, synchronize, reopened]
- push:
- branches:
- - master
- - '1.0'
-
-jobs:
- build_and_test:
- strategy:
- fail-fast: false
- matrix:
- version:
- - stable
- - nightly
-
- name: ${{ matrix.version }} - x86_64-apple-darwin
- runs-on: macos-latest
-
- steps:
- - uses: actions/checkout@v2
-
- - name: Install ${{ matrix.version }}
- uses: actions-rs/toolchain@v1
- with:
- toolchain: ${{ matrix.version }}-x86_64-apple-darwin
- profile: minimal
- override: true
-
- - name: check build
- uses: actions-rs/cargo@v1
- with:
- command: check
- args: --workspace --bins --examples --tests
-
- - name: tests
- uses: actions-rs/cargo@v1
- with:
- command: test
- args: --workspace --exclude=actix-tls --no-fail-fast -- --nocapture
diff --git a/.github/workflows/upload-doc.yml b/.github/workflows/upload-doc.yml
new file mode 100644
index 00000000..36044230
--- /dev/null
+++ b/.github/workflows/upload-doc.yml
@@ -0,0 +1,35 @@
+name: Upload documentation
+
+on:
+ push:
+ branches: [master]
+
+jobs:
+ build:
+ runs-on: ubuntu-latest
+
+ steps:
+ - uses: actions/checkout@v2
+
+ - name: Install Rust
+ uses: actions-rs/toolchain@v1
+ with:
+ toolchain: nightly-x86_64-unknown-linux-gnu
+ profile: minimal
+ override: true
+
+ - name: Build Docs
+ uses: actions-rs/cargo@v1
+ with:
+ command: doc
+ args: --workspace --all-features --no-deps
+
+ - name: Tweak HTML
+ run: echo '' > target/doc/index.html
+
+ - name: Deploy to GitHub Pages
+ uses: JamesIves/github-pages-deploy-action@3.7.1
+ with:
+ GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+ BRANCH: gh-pages
+ FOLDER: target/doc
diff --git a/.github/workflows/windows-mingw.yml b/.github/workflows/windows-mingw.yml
deleted file mode 100644
index 1fd5fc59..00000000
--- a/.github/workflows/windows-mingw.yml
+++ /dev/null
@@ -1,45 +0,0 @@
-name: CI (Windows-mingw)
-
-on:
- pull_request:
- types: [opened, synchronize, reopened]
- push:
- branches:
- - master
- - '1.0'
-
-jobs:
- build_and_test:
- strategy:
- fail-fast: false
- matrix:
- version:
- - stable
- - nightly
-
- name: ${{ matrix.version }} - x86_64-pc-windows-gnu
- runs-on: windows-latest
-
- steps:
- - uses: actions/checkout@v2
-
- - name: Install ${{ matrix.version }}
- uses: actions-rs/toolchain@v1
- with:
- toolchain: ${{ matrix.version }}-x86_64-pc-windows-gnu
- profile: minimal
- override: true
-
- - name: Install MSYS2
- uses: msys2/setup-msys2@v2
-
- - name: Install packages
- run: |
- msys2 -c 'pacman -Sy --noconfirm pacman'
- msys2 -c 'pacman --noconfirm -S base-devel pkg-config'
-
- - name: check build
- uses: actions-rs/cargo@v1
- with:
- command: check
- args: --workspace --bins --examples --tests
diff --git a/.github/workflows/windows.yml b/.github/workflows/windows.yml
deleted file mode 100644
index b2b57989..00000000
--- a/.github/workflows/windows.yml
+++ /dev/null
@@ -1,69 +0,0 @@
-name: CI (Windows)
-
-on:
- pull_request:
- types: [opened, synchronize, reopened]
- push:
- branches:
- - master
- - '1.0'
-
-env:
- VCPKGRS_DYNAMIC: 1
-
-jobs:
- build_and_test:
- strategy:
- fail-fast: false
- matrix:
- version:
- - stable
- - nightly
- target:
- - x86_64-pc-windows-msvc
- - i686-pc-windows-msvc
-
- name: ${{ matrix.version }} - ${{ matrix.target }}
- runs-on: windows-latest
-
- steps:
- - uses: actions/checkout@v2
-
- - name: Install ${{ matrix.version }}
- uses: actions-rs/toolchain@v1
- with:
- toolchain: ${{ matrix.version }}-${{ matrix.target }}
- profile: minimal
- override: true
-
- - name: Install OpenSSL (x64)
- if: matrix.target == 'x86_64-pc-windows-msvc'
- run: |
- vcpkg integrate install
- vcpkg install openssl:x64-windows
- Get-ChildItem C:\vcpkg\installed\x64-windows\bin
- Get-ChildItem C:\vcpkg\installed\x64-windows\lib
- Copy-Item C:\vcpkg\installed\x64-windows\bin\libcrypto-1_1-x64.dll C:\vcpkg\installed\x64-windows\bin\libcrypto.dll
- Copy-Item C:\vcpkg\installed\x64-windows\bin\libssl-1_1-x64.dll C:\vcpkg\installed\x64-windows\bin\libssl.dll
-
- - name: Install OpenSSL (x86)
- if: matrix.target == 'i686-pc-windows-msvc'
- run: |
- vcpkg integrate install
- vcpkg install openssl:x86-windows
- Get-ChildItem C:\vcpkg\installed\x86-windows\bin
- Get-ChildItem C:\vcpkg\installed\x86-windows\lib
- Copy-Item C:\vcpkg\installed\x86-windows\bin\libcrypto-1_1.dll C:\vcpkg\installed\x86-windows\bin\libcrypto.dll
- Copy-Item C:\vcpkg\installed\x86-windows\bin\libssl-1_1.dll C:\vcpkg\installed\x86-windows\bin\libssl.dll
-
- - name: check build
- uses: actions-rs/cargo@v1
- with:
- command: check
- args: --workspace --bins --examples --tests
-
- - name: tests
- uses: actions-rs/cargo@v1
- with:
- command: test
- args: --workspace --exclude=actix-tls --no-fail-fast -- --nocapture
diff --git a/Cargo.toml b/Cargo.toml
index 78e54d35..5bf72300 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,6 +10,8 @@ members = [
"actix-tracing",
"actix-utils",
"bytestring",
+ "local-channel",
+ "local-waker",
]
[patch.crates-io]
@@ -23,3 +25,5 @@ actix-tls = { path = "actix-tls" }
actix-tracing = { path = "actix-tracing" }
actix-utils = { path = "actix-utils" }
bytestring = { path = "bytestring" }
+local-channel = { path = "local-channel" }
+local-waker = { path = "local-waker" }
diff --git a/LICENSE-APACHE b/LICENSE-APACHE
index 6cdf2d16..8f5ba39b 100644
--- a/LICENSE-APACHE
+++ b/LICENSE-APACHE
@@ -186,7 +186,7 @@
same "printed page" as the copyright notice for easier
identification within third-party archives.
- Copyright 2017-NOW Nikolay Kim
+ Copyright 2017-NOW Actix Team
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
diff --git a/LICENSE-MIT b/LICENSE-MIT
index 0f80296a..d559b1cd 100644
--- a/LICENSE-MIT
+++ b/LICENSE-MIT
@@ -1,4 +1,4 @@
-Copyright (c) 2017 Nikolay Kim
+Copyright (c) 2017-NOW Actix Team
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
diff --git a/actix-codec/CHANGES.md b/actix-codec/CHANGES.md
index f6102cbf..fd893454 100644
--- a/actix-codec/CHANGES.md
+++ b/actix-codec/CHANGES.md
@@ -3,6 +3,10 @@
## Unreleased - 2021-xx-xx
+## 0.4.0 - 2021-04-20
+* No significant changes since v0.4.0-beta.1.
+
+
## 0.4.0-beta.1 - 2020-12-28
* Replace `pin-project` with `pin-project-lite`. [#237]
* Upgrade `tokio` dependency to `1`. [#237]
@@ -23,28 +27,28 @@
## 0.3.0-beta.1 - 2020-08-19
* Use `.advance()` instead of `.split_to()`.
* Upgrade `tokio-util` to `0.3`.
-* Improve `BytesCodec` `.encode()` performance
-* Simplify `BytesCodec` `.decode()`
+* Improve `BytesCodec::encode()` performance.
+* Simplify `BytesCodec::decode()`.
* Rename methods on `Framed` to better describe their use.
* Add method on `Framed` to get a pinned reference to the underlying I/O.
* Add method on `Framed` check emptiness of read buffer.
## 0.2.0 - 2019-12-10
-* Use specific futures dependencies
+* Use specific futures dependencies.
## 0.2.0-alpha.4
-* Fix buffer remaining capacity calculation
+* Fix buffer remaining capacity calculation.
## 0.2.0-alpha.3
-* Use tokio 0.2
-* Fix low/high watermark for write/read buffers
+* Use tokio 0.2.
+* Fix low/high watermark for write/read buffers.
## 0.2.0-alpha.2
-* Migrated to `std::future`
+* Migrated to `std::future`.
## 0.1.2 - 2019-03-27
@@ -56,4 +60,4 @@
## 0.1.0 - 2018-12-09
-* Move codec to separate crate
+* Move codec to separate crate.
diff --git a/actix-codec/Cargo.toml b/actix-codec/Cargo.toml
index 95a24764..815f1039 100644
--- a/actix-codec/Cargo.toml
+++ b/actix-codec/Cargo.toml
@@ -1,12 +1,10 @@
[package]
name = "actix-codec"
-version = "0.4.0-beta.1"
+version = "0.4.0"
authors = ["Nikolay Kim "]
description = "Codec utilities for working with framed protocols"
keywords = ["network", "framework", "async", "futures"]
-homepage = "https://actix.rs"
-repository = "https://github.com/actix/actix-net.git"
-documentation = "https://docs.rs/actix-codec"
+repository = "https://github.com/actix/actix-net"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
edition = "2018"
diff --git a/actix-codec/src/lib.rs b/actix-codec/src/lib.rs
index dec30ba6..c7713bfe 100644
--- a/actix-codec/src/lib.rs
+++ b/actix-codec/src/lib.rs
@@ -7,7 +7,7 @@
//! [`Sink`]: futures_sink::Sink
//! [`Stream`]: futures_core::Stream
-#![deny(rust_2018_idioms, nonstandard_style)]
+#![deny(rust_2018_idioms, nonstandard_style, future_incompatible)]
#![warn(missing_docs)]
#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
diff --git a/actix-macros/Cargo.toml b/actix-macros/Cargo.toml
index 0555f990..1664fc27 100644
--- a/actix-macros/Cargo.toml
+++ b/actix-macros/Cargo.toml
@@ -19,5 +19,5 @@ syn = { version = "^1", features = ["full"] }
[dev-dependencies]
actix-rt = "2.0.0"
-futures-util = { version = "0.3", default-features = false }
+futures-util = { version = "0.3.7", default-features = false }
trybuild = "1"
diff --git a/actix-router/src/resource.rs b/actix-router/src/resource.rs
index 8dbef26c..32162c53 100644
--- a/actix-router/src/resource.rs
+++ b/actix-router/src/resource.rs
@@ -581,10 +581,7 @@ impl ResourceDef {
mut for_prefix: bool,
) -> (String, Vec, bool, usize) {
if pattern.find('{').is_none() {
- // TODO: MSRV: 1.45
- #[allow(clippy::manual_strip)]
- return if pattern.ends_with('*') {
- let path = &pattern[..pattern.len() - 1];
+ return if let Some(path) = pattern.strip_suffix('*') {
let re = String::from("^") + path + "(.*)";
(re, vec![PatternElement::Str(String::from(path))], true, 0)
} else {
@@ -670,8 +667,6 @@ pub(crate) fn insert_slash(path: &str) -> String {
#[cfg(test)]
mod tests {
use super::*;
- use http::Uri;
- use std::convert::TryFrom;
#[test]
fn test_parse_static() {
@@ -833,8 +828,11 @@ mod tests {
assert!(re.is_match("/user/2345/sdg"));
}
+ #[cfg(feature = "http")]
#[test]
fn test_parse_urlencoded_param() {
+ use std::convert::TryFrom;
+
let re = ResourceDef::new("/user/{id}/test");
let mut path = Path::new("/user/2345/test");
@@ -845,7 +843,7 @@ mod tests {
assert!(re.match_path(&mut path));
assert_eq!(path.get("id").unwrap(), "qwe%25");
- let uri = Uri::try_from("/user/qwe%25/test").unwrap();
+ let uri = http::Uri::try_from("/user/qwe%25/test").unwrap();
let mut path = Path::new(uri);
assert!(re.match_path(&mut path));
assert_eq!(path.get("id").unwrap(), "qwe%25");
diff --git a/actix-router/src/url.rs b/actix-router/src/url.rs
index d2dd7a19..f669da99 100644
--- a/actix-router/src/url.rs
+++ b/actix-router/src/url.rs
@@ -170,13 +170,11 @@ impl Quoter {
idx += 1;
}
- if let Some(data) = cloned {
- // Unsafe: we get data from http::Uri, which does utf-8 checks already
+ cloned.map(|data| {
+ // SAFETY: we get data from http::Uri, which does UTF-8 checks already
// this code only decodes valid pct encoded values
- Some(unsafe { String::from_utf8_unchecked(data) })
- } else {
- None
- }
+ unsafe { String::from_utf8_unchecked(data) }
+ })
}
}
diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md
index 6754ca33..459d91a7 100644
--- a/actix-rt/CHANGES.md
+++ b/actix-rt/CHANGES.md
@@ -3,6 +3,22 @@
## Unreleased - 2021-xx-xx
+## 2.2.0 - 2021-03-29
+* **BREAKING** `ActixStream::{poll_read_ready, poll_write_ready}` methods now return
+ `Ready` object in ok variant. [#293]
+ * Breakage is acceptable since `ActixStream` was not intended to be public.
+
+[#293] https://github.com/actix/actix-net/pull/293
+
+
+## 2.1.0 - 2021-02-24
+* Add `ActixStream` extension trait to include readiness methods. [#276]
+* Re-export `tokio::net::TcpSocket` in `net` module [#282]
+
+[#276]: https://github.com/actix/actix-net/pull/276
+[#282]: https://github.com/actix/actix-net/pull/282
+
+
## 2.0.2 - 2021-02-06
* Add `Arbiter::handle` to get a handle of an owned Arbiter. [#274]
* Add `System::try_current` for situations where actix may or may not be running a System. [#275]
@@ -56,10 +72,7 @@
## 2.0.0-beta.1 - 2020-12-28
-### Added
* Add `System::attach_to_tokio` method. [#173]
-
-### Changed
* Update `tokio` dependency to `1.0`. [#236]
* Rename `time` module `delay_for` to `sleep`, `delay_until` to `sleep_until`, `Delay` to `Sleep`
to stay aligned with Tokio's naming. [#236]
@@ -67,27 +80,19 @@
* These methods now accept `&self` when calling. [#236]
* Remove `'static` lifetime requirement for `System::run` and `Builder::run`. [#236]
* `Arbiter::spawn` now panics when `System` is not in scope. [#207]
-
-### Fixed
* Fix work load issue by removing `PENDING` thread local. [#207]
[#207]: https://github.com/actix/actix-net/pull/207
[#236]: https://github.com/actix/actix-net/pull/236
-## [1.1.1] - 2020-04-30
-
-### Fixed
+## 1.1.1 - 2020-04-30
* Fix memory leak due to [#94] (see [#129] for more detail)
[#129]: https://github.com/actix/actix-net/issues/129
-## [1.1.0] - 2020-04-08
-
-**This version has been yanked.**
-
-### Added
+## 1.1.0 - 2020-04-08 (YANKED)
* Expose `System::is_set` to check if current system has ben started [#99]
* Add `Arbiter::is_running` to check if event loop is running [#124]
* Add `Arbiter::local_join` associated function
@@ -97,96 +102,57 @@
[#99]: https://github.com/actix/actix-net/pull/99
[#124]: https://github.com/actix/actix-net/pull/124
-## [1.0.0] - 2019-12-11
+## 1.0.0 - 2019-12-11
* Update dependencies
-## [1.0.0-alpha.3] - 2019-12-07
-
-### Fixed
+## 1.0.0-alpha.3 - 2019-12-07
+* Migrate to tokio 0.2
* Fix compilation on non-unix platforms
-### Changed
-
-* Migrate to tokio 0.2
-
-
-## [1.0.0-alpha.2] - 2019-12-02
-
-Added
+## 1.0.0-alpha.2 - 2019-12-02
* Export `main` and `test` attribute macros
-
* Export `time` module (re-export of tokio-timer)
-
* Export `net` module (re-export of tokio-net)
-## [1.0.0-alpha.1] - 2019-11-22
-
-### Changed
-
+## 1.0.0-alpha.1 - 2019-11-22
* Migrate to std::future and tokio 0.2
-## [0.2.6] - 2019-11-14
-
-### Fixed
-
+## 0.2.6 - 2019-11-14
+* Allow to join arbiter's thread. #60
* Fix arbiter's thread panic message.
-### Added
-
-* Allow to join arbiter's thread. #60
-
-
-## [0.2.5] - 2019-09-02
-
-### Added
+## 0.2.5 - 2019-09-02
* Add arbiter specific storage
-## [0.2.4] - 2019-07-17
-
-### Changed
-
+## 0.2.4 - 2019-07-17
* Avoid a copy of the Future when initializing the Box. #29
-## [0.2.3] - 2019-06-22
-
-### Added
-
-* Allow to start System using exsiting CurrentThread Handle #22
+## 0.2.3 - 2019-06-22
+* Allow to start System using existing CurrentThread Handle #22
-## [0.2.2] - 2019-03-28
-
-### Changed
-
+## 0.2.2 - 2019-03-28
* Moved `blocking` module to `actix-threadpool` crate
-## [0.2.1] - 2019-03-11
-
-### Added
-
+## 0.2.1 - 2019-03-11
* Added `blocking` module
-
-* Arbiter::exec_fn - execute fn on the arbiter's thread
-
-* Arbiter::exec - execute fn on the arbiter's thread and wait result
+* Added `Arbiter::exec_fn` - execute fn on the arbiter's thread
+* Added `Arbiter::exec` - execute fn on the arbiter's thread and wait result
-## [0.2.0] - 2019-03-06
-
+## 0.2.0 - 2019-03-06
* `run` method returns `io::Result<()>`
-
* Removed `Handle`
-## [0.1.0] - 2018-12-09
-
+## 0.1.0 - 2018-12-09
* Initial release
diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml
index 7990e67d..f4a90d2c 100644
--- a/actix-rt/Cargo.toml
+++ b/actix-rt/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "actix-rt"
-version = "2.0.2"
+version = "2.2.0"
authors = [
"Nikolay Kim ",
"Rob Ede ",
@@ -8,7 +8,7 @@ authors = [
description = "Tokio-based single-threaded async runtime for the Actix ecosystem"
keywords = ["async", "futures", "io", "runtime"]
homepage = "https://actix.rs"
-repository = "https://github.com/actix/actix-net.git"
+repository = "https://github.com/actix/actix-net"
documentation = "https://docs.rs/actix-rt"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
@@ -26,7 +26,7 @@ macros = ["actix-macros"]
actix-macros = { version = "0.2.0", optional = true }
futures-core = { version = "0.3", default-features = false }
-tokio = { version = "1.2", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] }
+tokio = { version = "1.3", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] }
[dev-dependencies]
tokio = { version = "1.2", features = ["full"] }
diff --git a/actix-rt/README.md b/actix-rt/README.md
index c29d563d..4ad75f09 100644
--- a/actix-rt/README.md
+++ b/actix-rt/README.md
@@ -2,4 +2,13 @@
> Tokio-based single-threaded async runtime for the Actix ecosystem.
+[](https://crates.io/crates/actix-rt)
+[](https://docs.rs/actix-rt/2.2.0)
+[](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html)
+
+
+[](https://deps.rs/crate/actix-rt/2.2.0)
+
+[](https://discord.gg/WghFtEH6Hb)
+
See crate documentation for more: https://docs.rs/actix-rt.
diff --git a/actix-rt/examples/multi_thread_system.rs b/actix-rt/examples/multi_thread_system.rs
new file mode 100644
index 00000000..0ecd1ef1
--- /dev/null
+++ b/actix-rt/examples/multi_thread_system.rs
@@ -0,0 +1,60 @@
+//! An example on how to build a multi-thread tokio runtime for Actix System.
+//! Then spawn async task that can make use of work stealing of tokio runtime.
+
+use actix_rt::System;
+
+fn main() {
+ System::with_tokio_rt(|| {
+ // build system with a multi-thread tokio runtime.
+ tokio::runtime::Builder::new_multi_thread()
+ .worker_threads(2)
+ .enable_all()
+ .build()
+ .unwrap()
+ })
+ .block_on(async_main());
+}
+
+// async main function that acts like #[actix_web::main] or #[tokio::main]
+async fn async_main() {
+ let (tx, rx) = tokio::sync::oneshot::channel();
+
+ // get a handle to system arbiter and spawn async task on it
+ System::current().arbiter().spawn(async {
+ // use tokio::spawn to get inside the context of multi thread tokio runtime
+ let h1 = tokio::spawn(async {
+ println!("thread id is {:?}", std::thread::current().id());
+ std::thread::sleep(std::time::Duration::from_secs(2));
+ });
+
+ // work stealing occurs for this task spawn
+ let h2 = tokio::spawn(async {
+ println!("thread id is {:?}", std::thread::current().id());
+ });
+
+ h1.await.unwrap();
+ h2.await.unwrap();
+ let _ = tx.send(());
+ });
+
+ rx.await.unwrap();
+
+ let (tx, rx) = tokio::sync::oneshot::channel();
+ let now = std::time::Instant::now();
+
+ // without additional tokio::spawn, all spawned tasks run on single thread
+ System::current().arbiter().spawn(async {
+ println!("thread id is {:?}", std::thread::current().id());
+ std::thread::sleep(std::time::Duration::from_secs(2));
+ let _ = tx.send(());
+ });
+
+ // previous spawn task has blocked the system arbiter thread
+ // so this task will wait for 2 seconds until it can be run
+ System::current().arbiter().spawn(async move {
+ println!("thread id is {:?}", std::thread::current().id());
+ assert!(now.elapsed() > std::time::Duration::from_secs(2));
+ });
+
+ rx.await.unwrap();
+}
diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs
index a7e9f309..4454b3c4 100644
--- a/actix-rt/src/lib.rs
+++ b/actix-rt/src/lib.rs
@@ -70,13 +70,74 @@ pub mod signal {
}
pub mod net {
- //! TCP/UDP/Unix bindings (Tokio re-exports).
+ //! TCP/UDP/Unix bindings (mostly Tokio re-exports).
+ use std::{
+ future::Future,
+ io,
+ task::{Context, Poll},
+ };
+
+ pub use tokio::io::Ready;
+ use tokio::io::{AsyncRead, AsyncWrite, Interest};
pub use tokio::net::UdpSocket;
- pub use tokio::net::{TcpListener, TcpStream};
+ pub use tokio::net::{TcpListener, TcpSocket, TcpStream};
#[cfg(unix)]
pub use tokio::net::{UnixDatagram, UnixListener, UnixStream};
+
+ /// Extension trait over async read+write types that can also signal readiness.
+ #[doc(hidden)]
+ pub trait ActixStream: AsyncRead + AsyncWrite + Unpin {
+ /// Poll stream and check read readiness of Self.
+ ///
+ /// See [tokio::net::TcpStream::poll_read_ready] for detail on intended use.
+ fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll>;
+
+ /// Poll stream and check write readiness of Self.
+ ///
+ /// See [tokio::net::TcpStream::poll_write_ready] for detail on intended use.
+ fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll>;
+ }
+
+ impl ActixStream for TcpStream {
+ fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll> {
+ let ready = self.ready(Interest::READABLE);
+ tokio::pin!(ready);
+ ready.poll(cx)
+ }
+
+ fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> {
+ let ready = self.ready(Interest::WRITABLE);
+ tokio::pin!(ready);
+ ready.poll(cx)
+ }
+ }
+
+ #[cfg(unix)]
+ impl ActixStream for UnixStream {
+ fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll> {
+ let ready = self.ready(Interest::READABLE);
+ tokio::pin!(ready);
+ ready.poll(cx)
+ }
+
+ fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> {
+ let ready = self.ready(Interest::WRITABLE);
+ tokio::pin!(ready);
+ ready.poll(cx)
+ }
+ }
+
+ impl ActixStream for Box {
+ fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll> {
+ (**self).poll_read_ready(cx)
+ }
+
+ fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> {
+ (**self).poll_write_ready(cx)
+ }
+ }
}
pub mod time {
diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md
index 966f2370..53b60d1c 100644
--- a/actix-server/CHANGES.md
+++ b/actix-server/CHANGES.md
@@ -6,6 +6,19 @@
[#230]: https://github.com/actix/actix-net/pull/230
+## 2.0.0-beta.5 - 2021-04-20
+* Server shutdown would notify all workers to exit regardless if shutdown is graceful.
+ This would make all worker shutdown immediately in force shutdown case. [#333]
+
+[#333]: https://github.com/actix/actix-net/pull/333
+
+
+## 2.0.0-beta.4 - 2021-04-01
+* Prevent panic when `shutdown_timeout` is very large. [f9262db]
+
+[f9262db]: https://github.com/actix/actix-net/commit/f9262db
+
+
## 2.0.0-beta.3 - 2021-02-06
* Hidden `ServerBuilder::start` method has been removed. Use `ServerBuilder::run`. [#246]
* Add retry for EINTR signal (`io::Interrupted`) in `Accept`'s poll loop. [#264]
diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml
index 9d13e456..333e7549 100755
--- a/actix-server/Cargo.toml
+++ b/actix-server/Cargo.toml
@@ -1,18 +1,15 @@
[package]
name = "actix-server"
-version = "2.0.0-beta.3"
+version = "2.0.0-beta.5"
authors = [
"Nikolay Kim ",
"fakeshadow <24548779@qq.com>",
]
description = "General purpose TCP server built for the Actix ecosystem"
keywords = ["network", "framework", "async", "futures"]
-homepage = "https://actix.rs"
-repository = "https://github.com/actix/actix-net.git"
-documentation = "https://docs.rs/actix-server"
+repository = "https://github.com/actix/actix-net"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
-exclude = [".gitignore", ".cargo/config"]
edition = "2018"
[lib]
@@ -23,20 +20,21 @@ path = "src/lib.rs"
default = []
[dependencies]
-actix-codec = "0.4.0-beta.1"
actix-rt = { version = "2.0.0", default-features = false }
-actix-service = "2.0.0-beta.4"
-actix-utils = "3.0.0-beta.2"
+actix-service = "2.0.0"
+actix-utils = "3.0.0"
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
log = "0.4"
mio = { version = "0.7.6", features = ["os-poll", "net"] }
num_cpus = "1.13"
slab = "0.4"
-tokio = { version = "1", features = ["sync"] }
+tokio = { version = "1.2", features = ["sync"] }
[dev-dependencies]
+actix-codec = "0.4.0-beta.1"
actix-rt = "2.0.0"
+
bytes = "1"
env_logger = "0.8"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
diff --git a/actix-server/examples/basic.rs b/actix-server/examples/tcp-echo.rs
similarity index 91%
rename from actix-server/examples/basic.rs
rename to actix-server/examples/tcp-echo.rs
index 45e473a9..8b038da4 100644
--- a/actix-server/examples/basic.rs
+++ b/actix-server/examples/tcp-echo.rs
@@ -9,15 +9,17 @@
//! Start typing. When you press enter the typed line will be echoed back. The server will log
//! the length of each line it echos and the total size of data sent when the connection is closed.
-use std::sync::{
- atomic::{AtomicUsize, Ordering},
- Arc,
+use std::{
+ env, io,
+ sync::{
+ atomic::{AtomicUsize, Ordering},
+ Arc,
+ },
};
-use std::{env, io};
use actix_rt::net::TcpStream;
use actix_server::Server;
-use actix_service::pipeline_factory;
+use actix_service::{fn_service, ServiceFactoryExt as _};
use bytes::BytesMut;
use futures_util::future::ok;
use log::{error, info};
@@ -25,7 +27,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[actix_rt::main]
async fn main() -> io::Result<()> {
- env::set_var("RUST_LOG", "actix=trace,basic=trace");
+ env::set_var("RUST_LOG", "info");
env_logger::init();
let count = Arc::new(AtomicUsize::new(0));
@@ -41,7 +43,7 @@ async fn main() -> io::Result<()> {
let count = Arc::clone(&count);
let num2 = Arc::clone(&count);
- pipeline_factory(move |mut stream: TcpStream| {
+ fn_service(move |mut stream: TcpStream| {
let count = Arc::clone(&count);
async move {
diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs
index c8c1da47..23ba616c 100644
--- a/actix-server/src/accept.rs
+++ b/actix-server/src/accept.rs
@@ -2,7 +2,7 @@ use std::time::Duration;
use std::{io, thread};
use actix_rt::{
- time::{sleep_until, Instant},
+ time::{sleep, Instant},
System,
};
use log::{error, info};
@@ -12,18 +12,21 @@ use slab::Slab;
use crate::server::Server;
use crate::socket::{MioListener, SocketAddr};
use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
-use crate::worker::{Conn, WorkerHandle};
+use crate::worker::{Conn, WorkerHandleAccept};
use crate::Token;
struct ServerSocketInfo {
- // addr for socket. mainly used for logging.
+ /// Address of socket. Mainly used for logging.
addr: SocketAddr,
- // be ware this is the crate token for identify socket and should not be confused with
- // mio::Token
+
+ /// Beware this is the crate token for identify socket and should not be confused
+ /// with `mio::Token`.
token: Token,
+
lst: MioListener,
- // timeout is used to mark the deadline when this socket's listener should be registered again
- // after an error.
+
+ /// Timeout is used to mark the deadline when this socket's listener should be registered again
+ /// after an error.
timeout: Option,
}
@@ -63,7 +66,7 @@ impl AcceptLoop {
pub(crate) fn start(
&mut self,
socks: Vec<(Token, MioListener)>,
- handles: Vec,
+ handles: Vec,
) {
let srv = self.srv.take().expect("Can not re-use AcceptInfo");
let poll = self.poll.take().unwrap();
@@ -77,12 +80,59 @@ impl AcceptLoop {
struct Accept {
poll: Poll,
waker: WakerQueue,
- handles: Vec,
+ handles: Vec,
srv: Server,
next: usize,
+ avail: Availability,
backpressure: bool,
}
+/// Array of u128 with every bit as marker for a worker handle's availability.
+struct Availability([u128; 4]);
+
+impl Default for Availability {
+ fn default() -> Self {
+ Self([0; 4])
+ }
+}
+
+impl Availability {
+ /// Check if any worker handle is available
+ fn available(&self) -> bool {
+ self.0.iter().any(|a| *a != 0)
+ }
+
+ /// Set worker handle available state by index.
+ fn set_available(&mut self, idx: usize, avail: bool) {
+ let (offset, idx) = if idx < 128 {
+ (0, idx)
+ } else if idx < 128 * 2 {
+ (1, idx - 128)
+ } else if idx < 128 * 3 {
+ (2, idx - 128 * 2)
+ } else if idx < 128 * 4 {
+ (3, idx - 128 * 3)
+ } else {
+ panic!("Max WorkerHandle count is 512")
+ };
+
+ let off = 1 << idx as u128;
+ if avail {
+ self.0[offset] |= off;
+ } else {
+ self.0[offset] &= !off
+ }
+ }
+
+ /// Set all worker handle to available state.
+ /// This would result in a re-check on all workers' availability.
+ fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) {
+ handles.iter().for_each(|handle| {
+ self.set_available(handle.idx(), true);
+ })
+ }
+}
+
/// This function defines errors that are per-connection. Which basically
/// means that if we get this error from `accept()` system call it means
/// next connection might be ready to be accepted.
@@ -102,7 +152,7 @@ impl Accept {
waker: WakerQueue,
socks: Vec<(Token, MioListener)>,
srv: Server,
- handles: Vec,
+ handles: Vec,
) {
// Accept runs in its own thread and would want to spawn additional futures to current
// actix system.
@@ -113,6 +163,7 @@ impl Accept {
System::set_current(sys);
let (mut accept, sockets) =
Accept::new_with_sockets(poll, waker, socks, handles, srv);
+
accept.poll_with(sockets);
})
.unwrap();
@@ -122,7 +173,7 @@ impl Accept {
poll: Poll,
waker: WakerQueue,
socks: Vec<(Token, MioListener)>,
- handles: Vec,
+ handles: Vec,
srv: Server,
) -> (Accept, Slab) {
let mut sockets = Slab::new();
@@ -145,12 +196,18 @@ impl Accept {
});
}
+ let mut avail = Availability::default();
+
+ // Assume all handles are avail at construct time.
+ avail.set_available_all(&handles);
+
let accept = Accept {
poll,
waker,
handles,
srv,
next: 0,
+ avail,
backpressure: false,
};
@@ -163,12 +220,8 @@ impl Accept {
loop {
if let Err(e) = self.poll.poll(&mut events, None) {
match e.kind() {
- std::io::ErrorKind::Interrupted => {
- continue;
- }
- _ => {
- panic!("Poll error: {}", e);
- }
+ std::io::ErrorKind::Interrupted => continue,
+ _ => panic!("Poll error: {}", e),
}
}
@@ -184,38 +237,28 @@ impl Accept {
let mut guard = self.waker.guard();
match guard.pop_front() {
// worker notify it becomes available. we may want to recover
- // from backpressure.
- Some(WakerInterest::WorkerAvailable) => {
+ // from backpressure.
+ Some(WakerInterest::WorkerAvailable(idx)) => {
drop(guard);
self.maybe_backpressure(&mut sockets, false);
+ self.avail.set_available(idx, true);
}
- // a new worker thread is made and it's handle would be added
- // to Accept
+ // a new worker thread is made and it's handle would be added to Accept
Some(WakerInterest::Worker(handle)) => {
drop(guard);
// maybe we want to recover from a backpressure.
self.maybe_backpressure(&mut sockets, false);
+ self.avail.set_available(handle.idx(), true);
self.handles.push(handle);
}
- // got timer interest and it's time to try register socket(s)
- // again.
+ // got timer interest and it's time to try register socket(s) again
Some(WakerInterest::Timer) => {
drop(guard);
self.process_timer(&mut sockets)
}
Some(WakerInterest::Pause) => {
drop(guard);
- sockets.iter_mut().for_each(|(_, info)| {
- match self.deregister(info) {
- Ok(_) => info!(
- "Paused accepting connections on {}",
- info.addr
- ),
- Err(e) => {
- error!("Can not deregister server socket {}", e)
- }
- }
- });
+ self.deregister_all(&mut sockets);
}
Some(WakerInterest::Resume) => {
drop(guard);
@@ -226,10 +269,9 @@ impl Accept {
Some(WakerInterest::Stop) => {
return self.deregister_all(&mut sockets);
}
- // waker queue is drained.
+ // waker queue is drained
None => {
- // Reset the WakerQueue before break so it does not grow
- // infinitely.
+ // Reset the WakerQueue before break so it does not grow infinitely
WakerQueue::reset(&mut guard);
break 'waker;
}
@@ -246,16 +288,23 @@ impl Accept {
fn process_timer(&self, sockets: &mut Slab) {
let now = Instant::now();
- sockets.iter_mut().for_each(|(token, info)| {
- // only the ServerSocketInfo have an associate timeout value was de registered.
- if let Some(inst) = info.timeout.take() {
- if now > inst {
- self.register_logged(token, info);
- } else {
+ sockets
+ .iter_mut()
+ // Only sockets that had an associated timeout were deregistered.
+ .filter(|(_, info)| info.timeout.is_some())
+ .for_each(|(token, info)| {
+ let inst = info.timeout.take().unwrap();
+
+ if now < inst {
info.timeout = Some(inst);
+ } else if !self.backpressure {
+ self.register_logged(token, info);
}
- }
- });
+
+ // Drop the timeout if server is in backpressure and socket timeout is expired.
+ // When server recovers from backpressure it will register all sockets without
+ // a timeout value so this socket register will be delayed till then.
+ });
}
#[cfg(not(target_os = "windows"))]
@@ -293,136 +342,236 @@ impl Accept {
self.poll.registry().deregister(&mut info.lst)
}
+ fn deregister_logged(&self, info: &mut ServerSocketInfo) {
+ match self.deregister(info) {
+ Ok(_) => info!("Paused accepting connections on {}", info.addr),
+ Err(e) => {
+ error!("Can not deregister server socket {}", e)
+ }
+ }
+ }
+
fn deregister_all(&self, sockets: &mut Slab) {
- sockets.iter_mut().for_each(|(_, info)| {
- info!("Accepting connections on {} has been paused", info.addr);
- let _ = self.deregister(info);
- });
+ // This is a best effort implementation with following limitation:
+ //
+ // Every ServerSocketInfo with associate timeout will be skipped and it's timeout
+ // is removed in the process.
+ //
+ // Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short
+ // gap (less than 500ms) would cause all timing out ServerSocketInfos be reregistered
+ // before expected timing.
+ sockets
+ .iter_mut()
+ // Take all timeout.
+ // This is to prevent Accept::process_timer method re-register a socket afterwards.
+ .map(|(_, info)| (info.timeout.take(), info))
+ // Socket info with a timeout is already deregistered so skip them.
+ .filter(|(timeout, _)| timeout.is_none())
+ .for_each(|(_, info)| self.deregister_logged(info));
}
fn maybe_backpressure(&mut self, sockets: &mut Slab, on: bool) {
- if self.backpressure {
- if !on {
- self.backpressure = false;
- for (token, info) in sockets.iter_mut() {
- if info.timeout.is_some() {
- // socket will attempt to re-register itself when its timeout completes
- continue;
+ // Only operate when server is in a different backpressure than the given flag.
+ if self.backpressure != on {
+ self.backpressure = on;
+ sockets
+ .iter_mut()
+ // Only operate on sockets without associated timeout.
+ // Sockets with it should be handled by `accept` and `process_timer` methods.
+ // They are already deregistered or need to be reregister in the future.
+ .filter(|(_, info)| info.timeout.is_none())
+ .for_each(|(token, info)| {
+ if on {
+ self.deregister_logged(info);
+ } else {
+ self.register_logged(token, info);
}
- self.register_logged(token, info);
- }
- }
- } else if on {
- self.backpressure = true;
- self.deregister_all(sockets);
+ });
}
}
- fn accept_one(&mut self, sockets: &mut Slab, mut msg: Conn) {
+ fn accept_one(&mut self, sockets: &mut Slab, mut conn: Conn) {
if self.backpressure {
- while !self.handles.is_empty() {
- match self.handles[self.next].send(msg) {
- Ok(_) => {
- self.set_next();
- break;
- }
- Err(tmp) => {
- // worker lost contact and could be gone. a message is sent to
- // `ServerBuilder` future to notify it a new worker should be made.
- // after that remove the fault worker.
- self.srv.worker_faulted(self.handles[self.next].idx);
- msg = tmp;
- self.handles.swap_remove(self.next);
- if self.handles.is_empty() {
- error!("No workers");
- return;
- } else if self.handles.len() <= self.next {
- self.next = 0;
- }
- continue;
- }
- }
+ // send_connection would remove fault worker from handles.
+ // worst case here is conn get dropped after all handles are gone.
+ while let Err(c) = self.send_connection(sockets, conn) {
+ conn = c
}
} else {
- let mut idx = 0;
- while idx < self.handles.len() {
- idx += 1;
- if self.handles[self.next].available() {
- match self.handles[self.next].send(msg) {
- Ok(_) => {
- self.set_next();
- return;
- }
- // worker lost contact and could be gone. a message is sent to
- // `ServerBuilder` future to notify it a new worker should be made.
- // after that remove the fault worker and enter backpressure if necessary.
- Err(tmp) => {
- self.srv.worker_faulted(self.handles[self.next].idx);
- msg = tmp;
- self.handles.swap_remove(self.next);
- if self.handles.is_empty() {
- error!("No workers");
- self.maybe_backpressure(sockets, true);
- return;
- } else if self.handles.len() <= self.next {
- self.next = 0;
- }
- continue;
- }
+ while self.avail.available() {
+ let next = self.next();
+ let idx = next.idx();
+ if next.available() {
+ self.avail.set_available(idx, true);
+ match self.send_connection(sockets, conn) {
+ Ok(_) => return,
+ Err(c) => conn = c,
}
+ } else {
+ self.avail.set_available(idx, false);
+ self.set_next();
}
- self.set_next();
}
- // enable backpressure
+
+ // Sending Conn failed due to either all workers are in error or not available.
+ // Enter backpressure state and try again.
self.maybe_backpressure(sockets, true);
- self.accept_one(sockets, msg);
+ self.accept_one(sockets, conn);
}
}
- // set next worker handle that would accept work.
- fn set_next(&mut self) {
- self.next = (self.next + 1) % self.handles.len();
+ // Send connection to worker and handle error.
+ fn send_connection(
+ &mut self,
+ sockets: &mut Slab,
+ conn: Conn,
+ ) -> Result<(), Conn> {
+ match self.next().send(conn) {
+ Ok(_) => {
+ self.set_next();
+ Ok(())
+ }
+ Err(conn) => {
+ // Worker thread is error and could be gone.
+ // Remove worker handle and notify `ServerBuilder`.
+ self.remove_next();
+
+ if self.handles.is_empty() {
+ error!("No workers");
+ self.maybe_backpressure(sockets, true);
+ // All workers are gone and Conn is nowhere to be sent.
+ // Treat this situation as Ok and drop Conn.
+ return Ok(());
+ } else if self.handles.len() <= self.next {
+ self.next = 0;
+ }
+
+ Err(conn)
+ }
+ }
}
fn accept(&mut self, sockets: &mut Slab, token: usize) {
loop {
- let msg = if let Some(info) = sockets.get_mut(token) {
- match info.lst.accept() {
- Ok(Some((io, addr))) => Conn {
+ let info = sockets
+ .get_mut(token)
+ .expect("ServerSocketInfo is removed from Slab");
+
+ match info.lst.accept() {
+ Ok(io) => {
+ let msg = Conn {
io,
token: info.token,
- peer: Some(addr),
- },
- Ok(None) => return,
- Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
- Err(ref e) if connection_error(e) => continue,
- Err(e) => {
- // deregister listener temporary
- error!("Error accepting connection: {}", e);
- if let Err(err) = self.deregister(info) {
- error!("Can not deregister server socket {}", err);
- }
-
- // sleep after error. write the timeout to socket info as later the poll
- // would need it mark which socket and when it's listener should be
- // registered.
- info.timeout = Some(Instant::now() + Duration::from_millis(500));
-
- // after the sleep a Timer interest is sent to Accept Poll
- let waker = self.waker.clone();
- System::current().arbiter().spawn(async move {
- sleep_until(Instant::now() + Duration::from_millis(510)).await;
- waker.wake(WakerInterest::Timer);
- });
-
- return;
- }
+ };
+ self.accept_one(sockets, msg);
}
- } else {
- return;
- };
+ Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
+ Err(ref e) if connection_error(e) => continue,
+ Err(e) => {
+ error!("Error accepting connection: {}", e);
- self.accept_one(sockets, msg);
+ // deregister listener temporary
+ self.deregister_logged(info);
+
+ // sleep after error. write the timeout to socket info as later
+ // the poll would need it mark which socket and when it's
+ // listener should be registered
+ info.timeout = Some(Instant::now() + Duration::from_millis(500));
+
+ // after the sleep a Timer interest is sent to Accept Poll
+ let waker = self.waker.clone();
+ System::current().arbiter().spawn(async move {
+ sleep(Duration::from_millis(510)).await;
+ waker.wake(WakerInterest::Timer);
+ });
+
+ return;
+ }
+ };
}
}
+
+ fn next(&self) -> &WorkerHandleAccept {
+ &self.handles[self.next]
+ }
+
+ /// Set next worker handle that would accept connection.
+ fn set_next(&mut self) {
+ self.next = (self.next + 1) % self.handles.len();
+ }
+
+ /// Remove next worker handle that fail to accept connection.
+ fn remove_next(&mut self) {
+ let handle = self.handles.swap_remove(self.next);
+ let idx = handle.idx();
+ // A message is sent to `ServerBuilder` future to notify it a new worker
+ // should be made.
+ self.srv.worker_faulted(idx);
+ self.avail.set_available(idx, false);
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::Availability;
+
+ fn single(aval: &mut Availability, idx: usize) {
+ aval.set_available(idx, true);
+ assert!(aval.available());
+
+ aval.set_available(idx, true);
+
+ aval.set_available(idx, false);
+ assert!(!aval.available());
+
+ aval.set_available(idx, false);
+ assert!(!aval.available());
+ }
+
+ fn multi(aval: &mut Availability, mut idx: Vec) {
+ idx.iter().for_each(|idx| aval.set_available(*idx, true));
+
+ assert!(aval.available());
+
+ while let Some(idx) = idx.pop() {
+ assert!(aval.available());
+ aval.set_available(idx, false);
+ }
+
+ assert!(!aval.available());
+ }
+
+ #[test]
+ fn availability() {
+ let mut aval = Availability::default();
+
+ single(&mut aval, 1);
+ single(&mut aval, 128);
+ single(&mut aval, 256);
+ single(&mut aval, 511);
+
+ let idx = (0..511).filter(|i| i % 3 == 0 && i % 5 == 0).collect();
+
+ multi(&mut aval, idx);
+
+ multi(&mut aval, (0..511).collect())
+ }
+
+ #[test]
+ #[should_panic]
+ fn overflow() {
+ let mut aval = Availability::default();
+ single(&mut aval, 512);
+ }
+
+ #[test]
+ fn pin_point() {
+ let mut aval = Availability::default();
+
+ aval.set_available(438, true);
+
+ aval.set_available(479, true);
+
+ assert_eq!(aval.0[3], 1 << (438 - 384) | 1 << (479 - 384));
+ }
}
diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs
index 66f40664..a509a21b 100644
--- a/actix-server/src/builder.rs
+++ b/actix-server/src/builder.rs
@@ -1,12 +1,12 @@
-use std::future::Future;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-use std::time::Duration;
-use std::{io, mem};
+use std::{
+ future::Future,
+ io, mem,
+ pin::Pin,
+ task::{Context, Poll},
+ time::Duration,
+};
-use actix_rt::net::TcpStream;
-use actix_rt::time::{sleep_until, Instant};
-use actix_rt::{self as rt, System};
+use actix_rt::{self as rt, net::TcpStream, time::sleep, System};
use log::{error, info};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio::sync::oneshot;
@@ -19,7 +19,10 @@ use crate::signals::{Signal, Signals};
use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
use crate::socket::{MioTcpListener, MioTcpSocket};
use crate::waker_queue::{WakerInterest, WakerQueue};
-use crate::worker::{self, ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandle};
+use crate::worker::{
+ ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandleAccept,
+ WorkerHandleServer,
+};
use crate::{join_all, Token};
use futures_core::future::LocalBoxFuture;
@@ -28,7 +31,7 @@ pub struct ServerBuilder {
threads: usize,
token: Token,
backlog: u32,
- handles: Vec<(usize, WorkerHandle)>,
+ handles: Vec<(usize, WorkerHandleServer)>,
services: Vec>,
sockets: Vec<(Token, String, MioListener)>,
accept: AcceptLoop,
@@ -120,18 +123,18 @@ impl ServerBuilder {
/// reached for each worker.
///
/// By default max connections is set to a 25k per worker.
- pub fn maxconn(self, num: usize) -> Self {
- worker::max_concurrent_connections(num);
+ pub fn maxconn(mut self, num: usize) -> Self {
+ self.worker_config.max_concurrent_connections(num);
self
}
- /// Stop actix system.
+ /// Stop Actix system.
pub fn system_exit(mut self) -> Self {
self.exit = true;
self
}
- /// Disable signal handling
+ /// Disable signal handling.
pub fn disable_signals(mut self) -> Self {
self.no_signals = true;
self
@@ -139,9 +142,8 @@ impl ServerBuilder {
/// Timeout for graceful workers shutdown in seconds.
///
- /// After receiving a stop signal, workers have this much time to finish
- /// serving requests. Workers still alive after the timeout are force
- /// dropped.
+ /// After receiving a stop signal, workers have this much time to finish serving requests.
+ /// Workers still alive after the timeout are force dropped.
///
/// By default shutdown timeout sets to 30 seconds.
pub fn shutdown_timeout(mut self, sec: u64) -> Self {
@@ -150,11 +152,10 @@ impl ServerBuilder {
self
}
- /// Execute external configuration as part of the server building
- /// process.
+ /// Execute external configuration as part of the server building process.
///
- /// This function is useful for moving parts of configuration to a
- /// different module or even library.
+ /// This function is useful for moving parts of configuration to a different module or
+ /// even library.
pub fn configure(mut self, f: F) -> io::Result
where
F: Fn(&mut ServiceConfig) -> io::Result<()>,
@@ -271,6 +272,7 @@ impl ServerBuilder {
self.sockets
.push((token, name.as_ref().to_string(), MioListener::from(lst)));
+
Ok(self)
}
@@ -284,10 +286,11 @@ impl ServerBuilder {
// start workers
let handles = (0..self.threads)
.map(|idx| {
- let handle = self.start_worker(idx, self.accept.waker_owned());
- self.handles.push((idx, handle.clone()));
+ let (handle_accept, handle_server) =
+ self.start_worker(idx, self.accept.waker_owned());
+ self.handles.push((idx, handle_server));
- handle
+ handle_accept
})
.collect();
@@ -402,41 +405,18 @@ impl ServerBuilder {
std::mem::swap(&mut self.on_stop, &mut on_stop);
// stop workers
- if !self.handles.is_empty() && graceful {
- let iter = self
- .handles
- .iter()
- .map(move |worker| worker.1.stop(graceful))
- .collect();
+ let stop = self
+ .handles
+ .iter()
+ .map(move |worker| worker.1.stop(graceful))
+ .collect();
- let fut = join_all(iter);
-
- rt::spawn(async move {
- on_stop().await;
-
- let _ = fut.await;
- if let Some(tx) = completion {
- let _ = tx.send(());
- }
- for tx in notify {
- let _ = tx.send(());
- }
- if exit {
- rt::spawn(async {
- sleep_until(Instant::now() + Duration::from_millis(300)).await;
- System::current().stop();
- });
- }
- });
- // we need to stop system if server was spawned
- } else {
- rt::spawn(async move {
- on_stop().await;
- if exit {
- sleep_until(Instant::now() + Duration::from_millis(300)).await;
- System::current().stop();
- }
- });
+ rt::spawn(async move {
+ on_stop().await;
+
+ if graceful {
+ let _ = join_all(stop).await;
+ }
if let Some(tx) = completion {
let _ = tx.send(());
@@ -444,7 +424,12 @@ impl ServerBuilder {
for tx in notify {
let _ = tx.send(());
}
- }
+
+ if exit {
+ sleep(Duration::from_millis(300)).await;
+ System::current().stop();
+ }
+ });
}
ServerCommand::WorkerFaulted(idx) => {
let mut found = false;
@@ -470,9 +455,10 @@ impl ServerBuilder {
break;
}
- let handle = self.start_worker(new_idx, self.accept.waker_owned());
- self.handles.push((new_idx, handle.clone()));
- self.accept.wake(WakerInterest::Worker(handle));
+ let (handle_accept, handle_server) =
+ self.start_worker(new_idx, self.accept.waker_owned());
+ self.handles.push((new_idx, handle_server));
+ self.accept.wake(WakerInterest::Worker(handle_accept));
}
}
}
diff --git a/actix-server/src/config.rs b/actix-server/src/config.rs
index 20270a2f..c5e63630 100644
--- a/actix-server/src/config.rs
+++ b/actix-server/src/config.rs
@@ -7,14 +7,14 @@ use actix_service::{
fn_service, IntoServiceFactory as IntoBaseServiceFactory,
ServiceFactory as BaseServiceFactory,
};
-use actix_utils::counter::CounterGuard;
+use actix_utils::{counter::CounterGuard, future::ready};
use futures_core::future::LocalBoxFuture;
use log::error;
use crate::builder::bind_addr;
use crate::service::{BoxedServerService, InternalServiceFactory, StreamService};
use crate::socket::{MioStream, MioTcpListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
-use crate::{ready, Token};
+use crate::Token;
pub struct ServiceConfig {
pub(crate) services: Vec<(String, MioTcpListener)>,
@@ -243,7 +243,7 @@ impl ServiceRuntime {
type BoxedNewService = Box<
dyn BaseServiceFactory<
- (Option, MioStream),
+ (CounterGuard, MioStream),
Response = (),
Error = (),
InitError = (),
@@ -257,7 +257,7 @@ struct ServiceFactory {
inner: T,
}
-impl BaseServiceFactory<(Option, MioStream)> for ServiceFactory
+impl BaseServiceFactory<(CounterGuard, MioStream)> for ServiceFactory
where
T: BaseServiceFactory,
T::Future: 'static,
diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs
index 24129b5a..af9ab0b0 100644
--- a/actix-server/src/lib.rs
+++ b/actix-server/src/lib.rs
@@ -55,24 +55,6 @@ pub fn new() -> ServerBuilder {
ServerBuilder::default()
}
-// temporary Ready type for std::future::{ready, Ready}; Can be removed when MSRV surpass 1.48
-#[doc(hidden)]
-pub struct Ready(Option);
-
-pub(crate) fn ready(t: T) -> Ready {
- Ready(Some(t))
-}
-
-impl Unpin for Ready {}
-
-impl Future for Ready {
- type Output = T;
-
- fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll {
- Poll::Ready(self.get_mut().0.take().unwrap())
- }
-}
-
// a poor man's join future. joined future is only used when starting/stopping the server.
// pin_project and pinned futures are overkill for this task.
pub(crate) struct JoinAll {
@@ -132,6 +114,8 @@ impl Future for JoinAll {
mod test {
use super::*;
+ use actix_utils::future::ready;
+
#[actix_rt::test]
async fn test_join_all() {
let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))];
diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs
index 63d2c1f5..da57af67 100644
--- a/actix-server/src/service.rs
+++ b/actix-server/src/service.rs
@@ -3,12 +3,15 @@ use std::net::SocketAddr;
use std::task::{Context, Poll};
use actix_service::{Service, ServiceFactory as BaseServiceFactory};
-use actix_utils::counter::CounterGuard;
+use actix_utils::{
+ counter::CounterGuard,
+ future::{ready, Ready},
+};
use futures_core::future::LocalBoxFuture;
use log::error;
use crate::socket::{FromStream, MioStream};
-use crate::{ready, Ready, Token};
+use crate::Token;
pub trait ServiceFactory: Send + Clone + 'static {
type Factory: BaseServiceFactory;
@@ -26,7 +29,7 @@ pub(crate) trait InternalServiceFactory: Send {
pub(crate) type BoxedServerService = Box<
dyn Service<
- (Option, MioStream),
+ (CounterGuard, MioStream),
Response = (),
Error = (),
Future = Ready>,
@@ -47,7 +50,7 @@ impl StreamService {
}
}
-impl Service<(Option, MioStream)> for StreamService
+impl Service<(CounterGuard, MioStream)> for StreamService
where
S: Service,
S::Future: 'static,
@@ -62,7 +65,7 @@ where
self.service.poll_ready(ctx).map_err(|_| ())
}
- fn call(&self, (guard, req): (Option, MioStream)) -> Self::Future {
+ fn call(&self, (guard, req): (CounterGuard, MioStream)) -> Self::Future {
ready(match FromStream::from_mio(req) {
Ok(stream) => {
let f = self.service.call(stream);
diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs
index 416e253b..948b5f1f 100644
--- a/actix-server/src/socket.rs
+++ b/actix-server/src/socket.rs
@@ -12,18 +12,7 @@ pub(crate) use {
use std::{fmt, io};
use actix_rt::net::TcpStream;
-use mio::event::Source;
-use mio::net::TcpStream as MioTcpStream;
-use mio::{Interest, Registry, Token};
-
-#[cfg(windows)]
-use std::os::windows::io::{FromRawSocket, IntoRawSocket};
-#[cfg(unix)]
-use {
- actix_rt::net::UnixStream,
- mio::net::{SocketAddr as MioSocketAddr, UnixStream as MioUnixStream},
- std::os::unix::io::{FromRawFd, IntoRawFd},
-};
+use mio::{event::Source, Interest, Registry, Token};
pub(crate) enum MioListener {
Tcp(MioTcpListener),
@@ -40,15 +29,11 @@ impl MioListener {
}
}
- pub(crate) fn accept(&self) -> io::Result