Merge branch 'master' into fix/tls-handshake-timeout

This commit is contained in:
Rob Ede 2021-11-15 18:49:35 +00:00 committed by GitHub
commit defa599a0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
56 changed files with 2264 additions and 1166 deletions

View File

@ -1,22 +1,26 @@
[alias]
lint = "clippy --workspace --all-features --tests --examples --bins -- -Dclippy::todo"
lint = "clippy --workspace --tests --examples --bins -- -Dclippy::todo"
lint-all = "clippy --workspace --all-features --tests --examples --bins -- -Dclippy::todo"
ci-doctest = "test --workspace --all-features --doc --no-fail-fast -- --nocapture"
# just check the library (without dev deps)
ci-check-min = "hack --workspace check --no-default-features"
ci-check-lib = "hack --workspace --feature-powerset --exclude-features io-uring check"
ci-check-lib = "hack --workspace --feature-powerset --exclude-features=io-uring check"
ci-check-lib-linux = "hack --workspace --feature-powerset check"
# check everything
ci-check = "hack --workspace --feature-powerset --exclude-features io-uring check --tests --examples"
ci-check = "hack --workspace --feature-powerset --exclude-features=io-uring check --tests --examples"
ci-check-linux = "hack --workspace --feature-powerset check --tests --examples"
# tests avoiding io-uring feature
ci-test = "hack test --workspace --exclude=actix-rt --exclude=actix-server --all-features --lib --tests --no-fail-fast -- --nocapture"
ci-test-rt = " hack --feature-powerset --exclude-features io-uring test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture"
ci-test-server = "hack --feature-powerset --exclude-features io-uring test --package=actix-server --lib --tests --no-fail-fast -- --nocapture"
ci-test-rt = " hack --feature-powerset --exclude-features=io-uring test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture"
ci-test-server = "hack --feature-powerset --exclude-features=io-uring test --package=actix-server --lib --tests --no-fail-fast -- --nocapture"
# test with io-uring feature
ci-test-rt-linux = " hack --feature-powerset test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture"
ci-test-server-linux = "hack --feature-powerset test --package=actix-server --lib --tests --no-fail-fast -- --nocapture"
# test lower msrv
ci-test-lower-msrv = "hack --workspace --exclude=actix-server --exclude=actix-tls --feature-powerset test --lib --tests --no-fail-fast -- --nocapture"

View File

@ -18,7 +18,7 @@ jobs:
- { name: Windows (MinGW), os: windows-latest, triple: x86_64-pc-windows-gnu }
- { name: Windows (32-bit), os: windows-latest, triple: i686-pc-windows-msvc }
version:
- 1.46.0 # MSRV
- 1.52.0 # MSRV for -server and -tls
- stable
- nightly
@ -64,8 +64,7 @@ jobs:
# - name: Generate Cargo.lock
# uses: actions-rs/cargo@v1
# with:
# command: generate-lockfile
# with: { command: generate-lockfile }
# - name: Cache Dependencies
# uses: Swatinem/rust-cache@v1.2.0
@ -113,30 +112,69 @@ jobs:
- name: tests
if: matrix.target.os == 'ubuntu-latest'
run: |
cargo ci-test
cargo ci-test-rt-linux
cargo ci-test-server-linux
- name: Generate coverage file
if: >
matrix.target.os == 'ubuntu-latest'
&& matrix.version == 'stable'
&& github.ref == 'refs/heads/master'
run: |
cargo install cargo-tarpaulin
cargo tarpaulin --out Xml --verbose
- name: Upload to Codecov
if: >
matrix.target.os == 'ubuntu-latest'
&& matrix.version == 'stable'
&& github.ref == 'refs/heads/master'
uses: codecov/codecov-action@v1
with: { file: cobertura.xml }
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test-rt-linux && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test-server-linux"
- name: Clear the cargo caches
run: |
cargo install cargo-cache --version 0.6.2 --no-default-features --features ci-autoclean
cargo-cache
build_and_test_lower_msrv:
name: Linux / 1.46 (lower MSRV)
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install 1.46.0 # MSRV for all but -server and -tls
uses: actions-rs/toolchain@v1
with:
toolchain: 1.46.0-x86_64-unknown-linux-gnu
profile: minimal
override: true
- name: Install cargo-hack
uses: actions-rs/cargo@v1
with:
command: install
args: cargo-hack
- name: tests
run: |
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=1.46 cargo ci-test-lower-msrv"
- name: Clear the cargo caches
run: |
cargo install cargo-cache --version 0.6.2 --no-default-features --features ci-autoclean
cargo-cache
coverage:
name: coverage
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install Rust (nightly)
uses: actions-rs/toolchain@v1
with:
toolchain: stable-x86_64-unknown-linux-gnu
profile: minimal
override: true
- name: Generate Cargo.lock
uses: actions-rs/cargo@v1
with: { command: generate-lockfile }
- name: Cache Dependencies
uses: Swatinem/rust-cache@v1.3.0
- name: Generate coverage file
if: github.ref == 'refs/heads/master'
run: |
cargo install cargo-tarpaulin
cargo tarpaulin --out Xml --verbose
- name: Upload to Codecov
if: github.ref == 'refs/heads/master'
uses: codecov/codecov-action@v1
with: { file: cobertura.xml }
rustdoc:
name: rustdoc
@ -158,13 +196,6 @@ jobs:
- name: Cache Dependencies
uses: Swatinem/rust-cache@v1.3.0
- name: Install cargo-hack
uses: actions-rs/cargo@v1
with:
command: install
args: cargo-hack
- name: doc tests
uses: actions-rs/cargo@v1
timeout-minutes: 40
with: { command: ci-doctest }
- name: doc tests io-uring
run: |
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=nightly cargo ci-doctest"

View File

@ -3,6 +3,14 @@
## Unreleased - 2021-xx-xx
## 0.4.1 - 2021-11-05
* Added `LinesCodec.` [#338]
* `Framed::poll_ready` flushes when the buffer is full. [#409]
[#338]: https://github.com/actix/actix-net/pull/338
[#409]: https://github.com/actix/actix-net/pull/409
## 0.4.0 - 2021-04-20
* No significant changes since v0.4.0-beta.1.

View File

@ -1,7 +1,10 @@
[package]
name = "actix-codec"
version = "0.4.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
version = "0.4.1"
authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"Rob Ede <robjtede@icloud.com>",
]
description = "Codec utilities for working with framed protocols"
keywords = ["network", "framework", "async", "futures"]
repository = "https://github.com/actix/actix-net"
@ -19,6 +22,15 @@ bytes = "1"
futures-core = { version = "0.3.7", default-features = false }
futures-sink = { version = "0.3.7", default-features = false }
log = "0.4"
memchr = "2.3"
pin-project-lite = "0.2"
tokio = "1.5.1"
tokio-util = { version = "0.6", features = ["codec", "io"] }
[dev-dependencies]
criterion = { version = "0.3", features = ["html_reports"] }
tokio-test = "0.4.2"
[[bench]]
name = "lines"
harness = false

View File

@ -0,0 +1,57 @@
use bytes::BytesMut;
use criterion::{criterion_group, criterion_main, Criterion};
const INPUT: &[u8] = include_bytes!("./lorem.txt");
fn bench_lines_codec(c: &mut Criterion) {
let mut decode_group = c.benchmark_group("lines decode");
decode_group.bench_function("actix", |b| {
b.iter(|| {
use actix_codec::Decoder as _;
let mut codec = actix_codec::LinesCodec::default();
let mut buf = BytesMut::from(INPUT);
while let Ok(Some(_bytes)) = codec.decode_eof(&mut buf) {}
});
});
decode_group.bench_function("tokio", |b| {
b.iter(|| {
use tokio_util::codec::Decoder as _;
let mut codec = tokio_util::codec::LinesCodec::new();
let mut buf = BytesMut::from(INPUT);
while let Ok(Some(_bytes)) = codec.decode_eof(&mut buf) {}
});
});
decode_group.finish();
let mut encode_group = c.benchmark_group("lines encode");
encode_group.bench_function("actix", |b| {
b.iter(|| {
use actix_codec::Encoder as _;
let mut codec = actix_codec::LinesCodec::default();
let mut buf = BytesMut::new();
codec.encode("123", &mut buf).unwrap();
});
});
encode_group.bench_function("tokio", |b| {
b.iter(|| {
use tokio_util::codec::Encoder as _;
let mut codec = tokio_util::codec::LinesCodec::new();
let mut buf = BytesMut::new();
codec.encode("123", &mut buf).unwrap();
});
});
encode_group.finish();
}
criterion_group!(benches, bench_lines_codec);
criterion_main!(benches);

View File

@ -0,0 +1,5 @@
Lorem ipsum dolor sit amet, consectetur adipiscing elit. In tortor quam, pulvinar sit amet vestibulum eget, tincidunt non urna. Sed eu sem in felis malesuada venenatis. Suspendisse volutpat aliquet nisi, in condimentum nibh convallis id. Quisque gravida felis scelerisque ipsum aliquam consequat. Praesent libero odio, malesuada vitae odio quis, aliquam aliquet enim. In fringilla ut turpis nec pharetra. Duis eu posuere metus. Sed a aliquet massa. Mauris non tempus mi, quis mattis libero. Vivamus ornare ex at semper cursus. Vestibulum sed facilisis erat, aliquet mollis est. In interdum, magna iaculis ultricies elementum, mi ante vestibulum mauris, nec viverra turpis lorem quis ante. Proin in auctor erat. Vivamus dictum congue massa, fermentum bibendum leo pretium quis. Integer dapibus sodales ligula, sit amet imperdiet felis suscipit eu. Phasellus non ornare enim.
Nam feugiat neque sit amet hendrerit rhoncus. Nunc suscipit molestie vehicula. Aenean vulputate porttitor augue, sit amet molestie dolor volutpat vitae. Nulla vitae condimentum eros. Aliquam tristique purus at metus lacinia egestas. Cras euismod lorem eu orci lobortis, sed tincidunt nisl laoreet. Ut suscipit fermentum mi, et euismod tortor. Pellentesque vitae tempor quam, sed dignissim mi. Suspendisse luctus lacus vitae ligula blandit vehicula. Quisque interdum iaculis tincidunt. Nunc elementum mi vitae tempor placerat. Suspendisse potenti. Donec blandit laoreet ipsum, quis rhoncus velit vulputate sed.
Aliquam suscipit lectus eros, at maximus dolor efficitur quis. Integer blandit tortor orci, nec mattis nunc eleifend ac. Mauris pharetra vel quam quis lacinia. Duis lobortis condimentum nunc ut facilisis. Praesent arcu nisi, porta sit amet viverra sit amet, pellentesque ut nisi. Nunc gravida tortor eu ligula tempus, in interdum magna pretium. Fusce eu ornare sapien. Nullam pellentesque cursus eros. Nam orci massa, faucibus eget leo eget, elementum vulputate erat. Fusce vehicula augue et dui hendrerit vulputate. Mauris neque lacus, porttitor ut condimentum id, efficitur ac neque. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia curae; Donec accumsan, lectus fermentum elementum tristique, ipsum tortor mollis ante, non lacinia nibh ex quis sapien.
Donec pharetra, elit eget rutrum luctus, urna ligula facilisis lorem, sit amet rhoncus ante est eu mi. Vestibulum vestibulum ultricies interdum. Nulla tincidunt ante non hendrerit venenatis. Curabitur vestibulum turpis erat, id efficitur quam venenatis eu. Fusce nulla sem, dapibus vel quam feugiat, ornare fermentum ligula. Praesent tempus tincidunt mauris, non pellentesque felis varius in. Aenean eu arcu ligula. Morbi dapibus maximus nulla a pharetra. Fusce leo metus, luctus ut cursus non, sollicitudin non lectus. Integer pellentesque eleifend erat, vel gravida purus tempus a. Mauris id vestibulum quam. Nunc vitae ullamcorper metus, pharetra placerat enim. Fusce in ultrices nisl. Curabitur justo mauris, dignissim in aliquam sit amet, sollicitudin ut risus. Cras tempor rutrum justo, non tincidunt est maximus at.
Aliquam ac velit tincidunt, ullamcorper velit sit amet, pulvinar nisi. Nullam rhoncus rhoncus egestas. Cras ac luctus nisi. Mauris sit amet risus at magna volutpat ultrices quis ac dui. Aliquam condimentum tellus purus, vel sagittis odio vulputate at. Sed ut finibus tellus. Aliquam tincidunt vehicula diam.

View File

@ -1,11 +1,10 @@
use bytes::{Buf, Bytes, BytesMut};
use std::io;
use bytes::{Buf, Bytes, BytesMut};
use super::{Decoder, Encoder};
/// Bytes codec.
///
/// Reads/Writes chunks of bytes from a stream.
/// Bytes codec. Reads/writes chunks of bytes from a stream.
#[derive(Debug, Copy, Clone)]
pub struct BytesCodec;

View File

@ -178,7 +178,7 @@ impl<T, U> Framed<T, U> {
U: Decoder,
{
loop {
let mut this = self.as_mut().project();
let this = self.as_mut().project();
// Repeatedly call `decode` or `decode_eof` as long as it is "readable". Readable is
// defined as not having returned `None`. If the upstream has returned EOF, and the
// decoder is no longer readable, it can be assumed that the decoder will never become
@ -186,7 +186,7 @@ impl<T, U> Framed<T, U> {
if this.flags.contains(Flags::READABLE) {
if this.flags.contains(Flags::EOF) {
match this.codec.decode_eof(&mut this.read_buf) {
match this.codec.decode_eof(this.read_buf) {
Ok(Some(frame)) => return Poll::Ready(Some(Ok(frame))),
Ok(None) => return Poll::Ready(None),
Err(e) => return Poll::Ready(Some(Err(e))),
@ -195,7 +195,7 @@ impl<T, U> Framed<T, U> {
log::trace!("attempting to decode a frame");
match this.codec.decode(&mut this.read_buf) {
match this.codec.decode(this.read_buf) {
Ok(Some(frame)) => {
log::trace!("frame decoded from buffer");
return Poll::Ready(Some(Ok(frame)));
@ -300,11 +300,11 @@ where
{
type Error = U::Error;
fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.is_write_ready() {
Poll::Ready(Ok(()))
} else {
Poll::Pending
self.flush(cx)
}
}

View File

@ -14,9 +14,11 @@
mod bcodec;
mod framed;
mod lines;
pub use self::bcodec::BytesCodec;
pub use self::framed::{Framed, FramedParts};
pub use self::lines::LinesCodec;
pub use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
pub use tokio_util::codec::{Decoder, Encoder};

158
actix-codec/src/lines.rs Normal file
View File

@ -0,0 +1,158 @@
use std::io;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use memchr::memchr;
use super::{Decoder, Encoder};
/// Lines codec. Reads/writes line delimited strings.
///
/// Will split input up by LF or CRLF delimiters. I.e. carriage return characters at the end of
/// lines are not preserved.
#[derive(Debug, Copy, Clone, Default)]
#[non_exhaustive]
pub struct LinesCodec;
impl<T: AsRef<str>> Encoder<T> for LinesCodec {
type Error = io::Error;
#[inline]
fn encode(&mut self, item: T, dst: &mut BytesMut) -> Result<(), Self::Error> {
let item = item.as_ref();
dst.reserve(item.len() + 1);
dst.put_slice(item.as_bytes());
dst.put_u8(b'\n');
Ok(())
}
}
impl Decoder for LinesCodec {
type Item = String;
type Error = io::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.is_empty() {
return Ok(None);
}
let len = match memchr(b'\n', src) {
Some(n) => n,
None => {
return Ok(None);
}
};
// split up to new line char
let mut buf = src.split_to(len);
debug_assert_eq!(len, buf.len());
// remove new line char from source
src.advance(1);
match buf.last() {
// remove carriage returns at the end of buf
Some(b'\r') => buf.truncate(len - 1),
// line is empty
None => return Ok(Some(String::new())),
_ => {}
}
try_into_utf8(buf.freeze())
}
fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match self.decode(src)? {
Some(frame) => Ok(Some(frame)),
None if src.is_empty() => Ok(None),
None => {
let buf = match src.last() {
// if last line ends in a CR then take everything up to it
Some(b'\r') => src.split_to(src.len() - 1),
// take all bytes from source
_ => src.split(),
};
if buf.is_empty() {
return Ok(None);
}
try_into_utf8(buf.freeze())
}
}
}
}
// Attempts to convert bytes into a `String`.
fn try_into_utf8(buf: Bytes) -> io::Result<Option<String>> {
String::from_utf8(buf.to_vec())
.map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
.map(Some)
}
#[cfg(test)]
mod tests {
use bytes::BufMut as _;
use super::*;
#[test]
fn lines_decoder() {
let mut codec = LinesCodec::default();
let mut buf = BytesMut::from("\nline 1\nline 2\r\nline 3\n\r\n\r");
assert_eq!("", codec.decode(&mut buf).unwrap().unwrap());
assert_eq!("line 1", codec.decode(&mut buf).unwrap().unwrap());
assert_eq!("line 2", codec.decode(&mut buf).unwrap().unwrap());
assert_eq!("line 3", codec.decode(&mut buf).unwrap().unwrap());
assert_eq!("", codec.decode(&mut buf).unwrap().unwrap());
assert!(codec.decode(&mut buf).unwrap().is_none());
assert!(codec.decode_eof(&mut buf).unwrap().is_none());
buf.put_slice(b"k");
assert!(codec.decode(&mut buf).unwrap().is_none());
assert_eq!("\rk", codec.decode_eof(&mut buf).unwrap().unwrap());
assert!(codec.decode(&mut buf).unwrap().is_none());
assert!(codec.decode_eof(&mut buf).unwrap().is_none());
}
#[test]
fn lines_encoder() {
let mut codec = LinesCodec::default();
let mut buf = BytesMut::new();
codec.encode("", &mut buf).unwrap();
assert_eq!(&buf[..], b"\n");
codec.encode("test", &mut buf).unwrap();
assert_eq!(&buf[..], b"\ntest\n");
codec.encode("a\nb", &mut buf).unwrap();
assert_eq!(&buf[..], b"\ntest\na\nb\n");
}
#[test]
fn lines_encoder_no_overflow() {
let mut codec = LinesCodec::default();
let mut buf = BytesMut::new();
codec.encode("1234567", &mut buf).unwrap();
assert_eq!(&buf[..], b"1234567\n");
let mut buf = BytesMut::new();
codec.encode("12345678", &mut buf).unwrap();
assert_eq!(&buf[..], b"12345678\n");
let mut buf = BytesMut::new();
codec.encode("123456789111213", &mut buf).unwrap();
assert_eq!(&buf[..], b"123456789111213\n");
let mut buf = BytesMut::new();
codec.encode("1234567891112131", &mut buf).unwrap();
assert_eq!(&buf[..], b"1234567891112131\n");
}
}

View File

@ -0,0 +1,221 @@
use actix_codec::*;
use bytes::Buf;
use bytes::{BufMut, BytesMut};
use futures_sink::Sink;
use std::collections::VecDeque;
use std::io::{self, Write};
use std::pin::Pin;
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};
use tokio_test::{assert_ready, task};
macro_rules! bilateral {
($($x:expr,)*) => {{
let mut v = VecDeque::new();
v.extend(vec![$($x),*]);
Bilateral { calls: v }
}};
}
macro_rules! assert_ready {
($e:expr) => {{
use core::task::Poll::*;
match $e {
Ready(v) => v,
Pending => panic!("pending"),
}
}};
($e:expr, $($msg:tt),+) => {{
use core::task::Poll::*;
match $e {
Ready(v) => v,
Pending => {
let msg = format_args!($($msg),+);
panic!("pending; {}", msg)
}
}
}};
}
#[derive(Debug)]
pub struct Bilateral {
pub calls: VecDeque<io::Result<Vec<u8>>>,
}
impl Write for Bilateral {
fn write(&mut self, src: &[u8]) -> io::Result<usize> {
match self.calls.pop_front() {
Some(Ok(data)) => {
assert!(src.len() >= data.len());
assert_eq!(&data[..], &src[..data.len()]);
Ok(data.len())
}
Some(Err(e)) => Err(e),
None => panic!("unexpected write; {:?}", src),
}
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
impl AsyncWrite for Bilateral {
fn poll_write(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
match Pin::get_mut(self).write(buf) {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Pending,
other => Ready(other),
}
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
match Pin::get_mut(self).flush() {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Pending,
other => Ready(other),
}
}
fn poll_shutdown(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
unimplemented!()
}
}
impl AsyncRead for Bilateral {
fn poll_read(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<Result<(), std::io::Error>> {
use io::ErrorKind::WouldBlock;
match self.calls.pop_front() {
Some(Ok(data)) => {
debug_assert!(buf.remaining() >= data.len());
buf.put_slice(&data);
Ready(Ok(()))
}
Some(Err(ref e)) if e.kind() == WouldBlock => Pending,
Some(Err(e)) => Ready(Err(e)),
None => Ready(Ok(())),
}
}
}
pub struct U32;
impl Encoder<u32> for U32 {
type Error = io::Error;
fn encode(&mut self, item: u32, dst: &mut BytesMut) -> io::Result<()> {
// Reserve space
dst.reserve(4);
dst.put_u32(item);
Ok(())
}
}
impl Decoder for U32 {
type Item = u32;
type Error = io::Error;
fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<u32>> {
if buf.len() < 4 {
return Ok(None);
}
let n = buf.split_to(4).get_u32();
Ok(Some(n))
}
}
#[test]
fn test_write_hits_highwater_mark() {
// see here for what this test is based on:
// https://github.com/tokio-rs/tokio/blob/75c07770bfbfea4e5fd914af819c741ed9c3fc36/tokio-util/tests/framed_write.rs#L69
const ITER: usize = 2 * 1024;
let mut bi = bilateral! {
Err(io::Error::new(io::ErrorKind::WouldBlock, "not ready")),
Ok(b"".to_vec()),
};
for i in 0..=ITER {
let mut b = BytesMut::with_capacity(4);
b.put_u32(i as u32);
// Append to the end
match bi.calls.back_mut().unwrap() {
Ok(ref mut data) => {
// Write in 2kb chunks
if data.len() < ITER {
data.extend_from_slice(&b[..]);
continue;
} // else fall through and create a new buffer
}
_ => unreachable!(),
}
// Push a new new chunk
bi.calls.push_back(Ok(b[..].to_vec()));
}
assert_eq!(bi.calls.len(), 6);
let mut framed = Framed::new(bi, U32);
// Send 8KB. This fills up FramedWrite2 buffer
let mut task = task::spawn(());
task.enter(|cx, _| {
// Send 8KB. This fills up Framed buffer
for i in 0..ITER {
{
#[allow(unused_mut)]
let mut framed = Pin::new(&mut framed);
assert!(assert_ready!(framed.poll_ready(cx)).is_ok());
}
#[allow(unused_mut)]
let mut framed = Pin::new(&mut framed);
// write the buffer
assert!(framed.start_send(i as u32).is_ok());
}
{
#[allow(unused_mut)]
let mut framed = Pin::new(&mut framed);
// Now we poll_ready which forces a flush. The bilateral pops the front message
// and decides to block.
assert!(framed.poll_ready(cx).is_pending());
}
{
#[allow(unused_mut)]
let mut framed = Pin::new(&mut framed);
// We poll again, forcing another flush, which this time succeeds
// The whole 8KB buffer is flushed
assert!(assert_ready!(framed.poll_ready(cx)).is_ok());
}
{
#[allow(unused_mut)]
let mut framed = Pin::new(&mut framed);
// Send more data. This matches the final message expected by the bilateral
assert!(framed.start_send(ITER as u32).is_ok());
}
{
#[allow(unused_mut)]
let mut framed = Pin::new(&mut framed);
// Flush the rest of the buffer
assert!(assert_ready!(framed.poll_flush(cx)).is_ok());
}
// Ensure the mock is empty
assert_eq!(0, Pin::new(&framed).get_ref().io_ref().calls.len());
});
}

View File

@ -3,6 +3,12 @@
## Unreleased - 2021-xx-xx
## 0.2.3 - 2021-10-19
* Fix test macro in presence of other imports named "test". [#399]
[#399]: https://github.com/actix/actix-net/pull/399
## 0.2.2 - 2021-10-14
* Improve error recovery potential when macro input is invalid. [#391]
* Allow custom `System`s on test macro. [#391]

View File

@ -1,9 +1,10 @@
[package]
name = "actix-macros"
version = "0.2.2"
version = "0.2.3"
authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"Ibraheem Ahmed <ibrah1440@gmail.com>",
"Rob Ede <robjtede@icloud.com>",
]
description = "Macros for Actix system and runtime"
repository = "https://github.com/actix/actix-net.git"
@ -22,4 +23,5 @@ syn = { version = "^1", features = ["full"] }
actix-rt = "2.0.0"
futures-util = { version = "0.3.7", default-features = false }
rustversion = "1"
trybuild = "1"

View File

@ -139,9 +139,9 @@ pub fn test(args: TokenStream, item: TokenStream) -> TokenStream {
sig.asyncness = None;
let missing_test_attr = if has_test_attr {
quote!()
quote! {}
} else {
quote!(#[test])
quote! { #[::core::prelude::v1::test] }
};
let mut system = syn::parse_str::<syn::Path>("::actix_rt::System").unwrap();
@ -198,5 +198,5 @@ pub fn test(args: TokenStream, item: TokenStream) -> TokenStream {
fn input_and_compile_error(mut item: TokenStream, err: syn::Error) -> TokenStream {
let compile_err = TokenStream::from(err.to_compile_error());
item.extend(compile_err);
return item;
item
}

View File

@ -1,3 +1,4 @@
#[rustversion::stable(1.46)] // MSRV
#[test]
fn compile_macros() {
let t = trybuild::TestCases::new();

View File

@ -1,6 +1,16 @@
# Changes
## Unreleased - 2021-xx-xx
* Add `System::run_with_code` to allow retrieving the exit code on stop. [#411]
[#411]: https://github.com/actix/actix-net/pull/411
## 2.4.0 - 2021-11-05
* Add `Arbiter::try_current` for situations where thread may or may not have Arbiter context. [#408]
* Start io-uring with `System::new` when feature is enabled. [#395]
[#395]: https://github.com/actix/actix-net/pull/395
[#408]: https://github.com/actix/actix-net/pull/408
## 2.3.0 - 2021-10-11
@ -100,7 +110,7 @@
[#129]: https://github.com/actix/actix-net/issues/129
## 1.1.0 - 2020-04-08 (YANKED)
## 1.1.0 - 2020-04-08 _(YANKED)_
* Expose `System::is_set` to check if current system has ben started [#99]
* Add `Arbiter::is_running` to check if event loop is running [#124]
* Add `Arbiter::local_join` associated function

View File

@ -1,6 +1,6 @@
[package]
name = "actix-rt"
version = "2.3.0"
version = "2.4.0"
authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"Rob Ede <robjtede@icloud.com>",
@ -23,7 +23,7 @@ macros = ["actix-macros"]
io-uring = ["tokio-uring"]
[dependencies]
actix-macros = { version = "0.2.0", optional = true }
actix-macros = { version = "0.2.3", optional = true }
futures-core = { version = "0.3", default-features = false }
tokio = { version = "1.5.1", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] }

View File

@ -3,11 +3,11 @@
> Tokio-based single-threaded async runtime for the Actix ecosystem.
[![crates.io](https://img.shields.io/crates/v/actix-rt?label=latest)](https://crates.io/crates/actix-rt)
[![Documentation](https://docs.rs/actix-rt/badge.svg?version=2.3.0)](https://docs.rs/actix-rt/2.3.0)
[![Documentation](https://docs.rs/actix-rt/badge.svg?version=2.4.0)](https://docs.rs/actix-rt/2.4.0)
[![Version](https://img.shields.io/badge/rustc-1.46+-ab6000.svg)](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html)
![MIT or Apache 2.0 licensed](https://img.shields.io/crates/l/actix-rt.svg)
<br />
[![dependency status](https://deps.rs/crate/actix-rt/2.3.0/status.svg)](https://deps.rs/crate/actix-rt/2.3.0)
[![dependency status](https://deps.rs/crate/actix-rt/2.4.0/status.svg)](https://deps.rs/crate/actix-rt/2.4.0)
![Download](https://img.shields.io/crates/d/actix-rt.svg)
[![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/WghFtEH6Hb)

View File

@ -240,6 +240,15 @@ impl Arbiter {
})
}
/// Try to get current running arbiter handle.
///
/// Returns `None` if no Arbiter has been started.
///
/// Unlike [`current`](Self::current), this never panics.
pub fn try_current() -> Option<ArbiterHandle> {
HANDLE.with(|cell| cell.borrow().clone())
}
/// Stop Arbiter from continuing it's event loop.
///
/// Returns true if stop message was sent successfully and false if the Arbiter has been dropped.

View File

@ -15,7 +15,7 @@
//! blocking task thread-pool using [`task::spawn_blocking`].
//!
//! # Examples
//! ```
//! ```no_run
//! use std::sync::mpsc;
//! use actix_rt::{Arbiter, System};
//!

View File

@ -11,7 +11,7 @@ use std::{
use futures_core::ready;
use tokio::sync::{mpsc, oneshot};
use crate::{arbiter::ArbiterHandle, runtime::default_tokio_runtime, Arbiter, Runtime};
use crate::{arbiter::ArbiterHandle, Arbiter};
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
@ -29,6 +29,7 @@ pub struct System {
arbiter_handle: ArbiterHandle,
}
#[cfg(not(feature = "io-uring"))]
impl System {
/// Create a new system.
///
@ -37,7 +38,7 @@ impl System {
#[allow(clippy::new_ret_no_self)]
pub fn new() -> SystemRunner {
Self::with_tokio_rt(|| {
default_tokio_runtime()
crate::runtime::default_tokio_runtime()
.expect("Default Actix (Tokio) runtime could not be created.")
})
}
@ -53,7 +54,7 @@ impl System {
let (stop_tx, stop_rx) = oneshot::channel();
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
let rt = Runtime::from(runtime_factory());
let rt = crate::runtime::Runtime::from(runtime_factory());
let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() });
let system = System::construct(sys_tx, sys_arbiter.clone());
@ -72,7 +73,32 @@ impl System {
system,
}
}
}
#[cfg(feature = "io-uring")]
impl System {
/// Create a new system.
///
/// # Panics
/// Panics if underlying Tokio runtime can not be created.
#[allow(clippy::new_ret_no_self)]
pub fn new() -> SystemRunner {
SystemRunner
}
/// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure.
///
/// [tokio-runtime]: tokio::runtime::Runtime
#[doc(hidden)]
pub fn with_tokio_rt<F>(_: F) -> SystemRunner
where
F: Fn() -> tokio::runtime::Runtime,
{
unimplemented!("System::with_tokio_rt is not implemented yet")
}
}
impl System {
/// Constructs new system and registers it on the current thread.
pub(crate) fn construct(
sys_tx: mpsc::UnboundedSender<SystemCommand>,
@ -104,7 +130,7 @@ impl System {
///
/// Returns `None` if no System has been started.
///
/// Contrary to `current`, this never panics.
/// Unlike [`current`](Self::current), this never panics.
pub fn try_current() -> Option<System> {
CURRENT.with(|cell| cell.borrow().clone())
}
@ -150,34 +176,38 @@ impl System {
}
/// Runner that keeps a [System]'s event loop alive until stop message is received.
#[cfg(not(feature = "io-uring"))]
#[must_use = "A SystemRunner does nothing unless `run` is called."]
#[derive(Debug)]
pub struct SystemRunner {
rt: Runtime,
rt: crate::runtime::Runtime,
stop_rx: oneshot::Receiver<i32>,
#[allow(dead_code)]
system: System,
}
#[cfg(not(feature = "io-uring"))]
impl SystemRunner {
/// Starts event loop and will return once [System] is [stopped](System::stop).
pub fn run(self) -> io::Result<()> {
let exit_code = self.run_with_code()?;
match exit_code {
0 => Ok(()),
nonzero => Err(io::Error::new(
io::ErrorKind::Other,
format!("Non-zero exit code: {}", nonzero),
)),
}
}
/// Runs the event loop until [stopped](System::stop_with_code), returning the exit code.
pub fn run_with_code(self) -> io::Result<i32> {
let SystemRunner { rt, stop_rx, .. } = self;
// run loop
match rt.block_on(stop_rx) {
Ok(code) => {
if code != 0 {
Err(io::Error::new(
io::ErrorKind::Other,
format!("Non-zero exit code: {}", code),
))
} else {
Ok(())
}
}
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
}
rt.block_on(stop_rx)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
}
/// Runs the provided future, blocking the current thread until the future completes.
@ -187,6 +217,52 @@ impl SystemRunner {
}
}
/// Runner that keeps a [System]'s event loop alive until stop message is received.
#[cfg(feature = "io-uring")]
#[must_use = "A SystemRunner does nothing unless `run` is called."]
#[derive(Debug)]
pub struct SystemRunner;
#[cfg(feature = "io-uring")]
impl SystemRunner {
/// Starts event loop and will return once [System] is [stopped](System::stop).
pub fn run(self) -> io::Result<()> {
unimplemented!("SystemRunner::run is not implemented for io-uring feature yet");
}
/// Runs the event loop until [stopped](System::stop_with_code), returning the exit code.
pub fn run_with_code(self) -> io::Result<i32> {
unimplemented!(
"SystemRunner::run_with_code is not implemented for io-uring feature yet"
);
}
/// Runs the provided future, blocking the current thread until the future completes.
#[inline]
pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
tokio_uring::start(async move {
let (stop_tx, stop_rx) = oneshot::channel();
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
let sys_arbiter = Arbiter::in_new_system();
let system = System::construct(sys_tx, sys_arbiter.clone());
system
.tx()
.send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter))
.unwrap();
// init background system arbiter
let sys_ctrl = SystemController::new(sys_rx, stop_tx);
tokio_uring::spawn(sys_ctrl);
let res = fut.await;
drop(stop_rx);
res
})
}
}
#[derive(Debug)]
pub(crate) enum SystemCommand {
Exit(i32),

View File

@ -0,0 +1,17 @@
//! Checks that test macro does not cause problems in the presence of imports named "test" that
//! could be either a module with test items or the "test with runtime" macro itself.
//!
//! Before actix/actix-net#399 was implemented, this macro was running twice. The first run output
//! `#[test]` and it got run again and since it was in scope.
//!
//! Prevented by using the fully-qualified test marker (`#[::core::prelude::v1::test]`).
#![cfg(feature = "macros")]
use actix_rt::time as test;
#[actix_rt::test]
async fn test_naming_conflict() {
use test as time;
time::sleep(std::time::Duration::from_millis(2)).await;
}

View File

@ -1,12 +1,15 @@
use std::{
future::Future,
sync::mpsc::channel,
thread,
time::{Duration, Instant},
};
use actix_rt::{task::JoinError, Arbiter, System};
use tokio::sync::oneshot;
#[cfg(not(feature = "io-uring"))]
use {
std::{sync::mpsc::channel, thread},
tokio::sync::oneshot,
};
#[test]
fn await_for_timer() {
@ -21,6 +24,15 @@ fn await_for_timer() {
);
}
#[cfg(not(feature = "io-uring"))]
#[test]
fn run_with_code() {
let sys = System::new();
System::current().stop_with_code(42);
let exit_code = sys.run_with_code().expect("system stop should not error");
assert_eq!(exit_code, 42);
}
#[test]
fn join_another_arbiter() {
let time = Duration::from_secs(1);
@ -96,13 +108,17 @@ fn wait_for_spawns() {
let handle = rt.spawn(async {
println!("running on the runtime");
// assertion panic is caught at task boundary
assert_eq!(1, 2);
// panic is caught at task boundary
panic!("intentional test panic");
});
assert!(rt.block_on(handle).is_err());
}
// Temporary disabled tests for io-uring feature.
// They should be enabled when possible.
#[cfg(not(feature = "io-uring"))]
#[test]
fn arbiter_spawn_fn_runs() {
let _ = System::new();
@ -119,6 +135,7 @@ fn arbiter_spawn_fn_runs() {
arbiter.join().unwrap();
}
#[cfg(not(feature = "io-uring"))]
#[test]
fn arbiter_handle_spawn_fn_runs() {
let sys = System::new();
@ -141,6 +158,7 @@ fn arbiter_handle_spawn_fn_runs() {
sys.run().unwrap();
}
#[cfg(not(feature = "io-uring"))]
#[test]
fn arbiter_drop_no_panic_fn() {
let _ = System::new();
@ -152,6 +170,7 @@ fn arbiter_drop_no_panic_fn() {
arbiter.join().unwrap();
}
#[cfg(not(feature = "io-uring"))]
#[test]
fn arbiter_drop_no_panic_fut() {
let _ = System::new();
@ -163,18 +182,7 @@ fn arbiter_drop_no_panic_fut() {
arbiter.join().unwrap();
}
#[test]
#[should_panic]
fn no_system_current_panic() {
System::current();
}
#[test]
#[should_panic]
fn no_system_arbiter_new_panic() {
Arbiter::new();
}
#[cfg(not(feature = "io-uring"))]
#[test]
fn system_arbiter_spawn() {
let runner = System::new();
@ -205,6 +213,7 @@ fn system_arbiter_spawn() {
thread.join().unwrap();
}
#[cfg(not(feature = "io-uring"))]
#[test]
fn system_stop_stops_arbiters() {
let sys = System::new();
@ -293,6 +302,18 @@ fn new_arbiter_with_tokio() {
assert!(!counter.load(Ordering::SeqCst));
}
#[test]
#[should_panic]
fn no_system_current_panic() {
System::current();
}
#[test]
#[should_panic]
fn no_system_arbiter_new_panic() {
Arbiter::new();
}
#[test]
fn try_current_no_system() {
assert!(System::try_current().is_none())
@ -330,28 +351,27 @@ fn spawn_local() {
#[cfg(all(target_os = "linux", feature = "io-uring"))]
#[test]
fn tokio_uring_arbiter() {
let system = System::new();
let (tx, rx) = std::sync::mpsc::channel();
System::new().block_on(async {
let (tx, rx) = std::sync::mpsc::channel();
Arbiter::new().spawn(async move {
let handle = actix_rt::spawn(async move {
let f = tokio_uring::fs::File::create("test.txt").await.unwrap();
let buf = b"Hello World!";
Arbiter::new().spawn(async move {
let handle = actix_rt::spawn(async move {
let f = tokio_uring::fs::File::create("test.txt").await.unwrap();
let buf = b"Hello World!";
let (res, _) = f.write_at(&buf[..], 0).await;
assert!(res.is_ok());
let (res, _) = f.write_at(&buf[..], 0).await;
assert!(res.is_ok());
f.sync_all().await.unwrap();
f.close().await.unwrap();
f.sync_all().await.unwrap();
f.close().await.unwrap();
std::fs::remove_file("test.txt").unwrap();
std::fs::remove_file("test.txt").unwrap();
});
handle.await.unwrap();
tx.send(true).unwrap();
});
handle.await.unwrap();
tx.send(true).unwrap();
});
assert!(rx.recv().unwrap());
drop(system);
assert!(rx.recv().unwrap());
})
}

View File

@ -3,6 +3,31 @@
## Unreleased - 2021-xx-xx
## 2.0.0-beta.9 - 2021-11-15
* Restore `Arbiter` support lost in `beta.8`. [#417]
[#417]: https://github.com/actix/actix-net/pull/417
## 2.0.0-beta.8 - 2021-11-05 _(YANKED)_
* Fix non-unix signal handler. [#410]
[#410]: https://github.com/actix/actix-net/pull/410
## 2.0.0-beta.7 - 2021-11-05 _(YANKED)_
* Server can be started in regular Tokio runtime. [#408]
* Expose new `Server` type whose `Future` impl resolves when server stops. [#408]
* Rename `Server` to `ServerHandle`. [#407]
* Add `Server::handle` to obtain handle to server. [#408]
* Rename `ServerBuilder::{maxconn => max_concurrent_connections}`. [#407]
* Deprecate crate-level `new` shortcut for server builder. [#408]
* Minimum supported Rust version (MSRV) is now 1.52.
[#407]: https://github.com/actix/actix-net/pull/407
[#408]: https://github.com/actix/actix-net/pull/408
## 2.0.0-beta.6 - 2021-10-11
* Add experimental (semver-exempt) `io-uring` feature for enabling async file I/O on linux. [#374]
* Server no long listens to `SIGHUP` signal. Previously, the received was not used but did block

View File

@ -1,6 +1,6 @@
[package]
name = "actix-server"
version = "2.0.0-beta.6"
version = "2.0.0-beta.9"
authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"fakeshadow <24548779@qq.com>",
@ -18,24 +18,29 @@ path = "src/lib.rs"
[features]
default = []
io-uring = ["actix-rt/io-uring"]
io-uring = ["tokio-uring", "actix-rt/io-uring"]
[dependencies]
actix-rt = { version = "2.0.0", default-features = false }
actix-rt = { version = "2.4.0", default-features = false }
actix-service = "2.0.0"
actix-utils = "3.0.0"
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] }
log = "0.4"
mio = { version = "0.7.6", features = ["os-poll", "net"] }
mio = { version = "0.8", features = ["os-poll", "net"] }
num_cpus = "1.13"
socket2 = "0.4.2"
tokio = { version = "1.5.1", features = ["sync"] }
# runtime for io-uring feature
tokio-uring = { version = "0.1", optional = true }
[dev-dependencies]
actix-codec = "0.4.0-beta.1"
actix-codec = "0.4.0"
actix-rt = "2.0.0"
bytes = "1"
env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
tokio = { version = "1.5.1", features = ["io-util"] }
tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] }

View File

@ -10,7 +10,7 @@
//! the length of each line it echos and the total size of data sent when the connection is closed.
use std::{
env, io,
io,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
@ -23,12 +23,10 @@ use actix_service::{fn_service, ServiceFactoryExt as _};
use bytes::BytesMut;
use futures_util::future::ok;
use log::{error, info};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
#[actix_rt::main]
async fn main() -> io::Result<()> {
env::set_var("RUST_LOG", "info");
env_logger::init();
async fn run() -> io::Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let count = Arc::new(AtomicUsize::new(0));
@ -88,3 +86,16 @@ async fn main() -> io::Result<()> {
.run()
.await
}
#[tokio::main]
async fn main() -> io::Result<()> {
run().await?;
Ok(())
}
// alternatively:
// #[actix_rt::main]
// async fn main() -> io::Result<()> {
// run().await?;
// Ok(())
// }

View File

@ -1,17 +1,18 @@
use std::time::Duration;
use std::{io, thread};
use std::{io, thread, time::Duration};
use actix_rt::{
time::{sleep, Instant},
System,
};
use log::{error, info};
use actix_rt::time::Instant;
use log::{debug, error, info};
use mio::{Interest, Poll, Token as MioToken};
use crate::server::Server;
use crate::socket::MioListener;
use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
use crate::worker::{Conn, WorkerHandleAccept};
use crate::{
availability::Availability,
socket::MioListener,
waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN},
worker::{Conn, ServerWorker, WorkerHandleAccept, WorkerHandleServer},
ServerBuilder, ServerHandle,
};
const TIMEOUT_DURATION_ON_ERROR: Duration = Duration::from_millis(510);
struct ServerSocketInfo {
token: usize,
@ -20,208 +21,113 @@ struct ServerSocketInfo {
/// Timeout is used to mark the deadline when this socket's listener should be registered again
/// after an error.
timeout: Option<Instant>,
}
/// Accept loop would live with `ServerBuilder`.
///
/// It's tasked with construct `Poll` instance and `WakerQueue` which would be distributed to
/// `Accept` and `Worker`.
///
/// It would also listen to `ServerCommand` and push interests to `WakerQueue`.
pub(crate) struct AcceptLoop {
srv: Option<Server>,
poll: Option<Poll>,
waker: WakerQueue,
}
impl AcceptLoop {
pub fn new(srv: Server) -> Self {
let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e));
let waker = WakerQueue::new(poll.registry())
.unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e));
Self {
srv: Some(srv),
poll: Some(poll),
waker,
}
}
pub(crate) fn waker_owned(&self) -> WakerQueue {
self.waker.clone()
}
pub fn wake(&self, i: WakerInterest) {
self.waker.wake(i);
}
pub(crate) fn start(
&mut self,
socks: Vec<(usize, MioListener)>,
handles: Vec<WorkerHandleAccept>,
) {
let srv = self.srv.take().expect("Can not re-use AcceptInfo");
let poll = self.poll.take().unwrap();
let waker = self.waker.clone();
Accept::start(poll, waker, socks, srv, handles);
}
timeout: Option<actix_rt::time::Instant>,
}
/// poll instance of the server.
struct Accept {
pub(crate) struct Accept {
poll: Poll,
waker: WakerQueue,
waker_queue: WakerQueue,
handles: Vec<WorkerHandleAccept>,
srv: Server,
srv: ServerHandle,
next: usize,
avail: Availability,
/// use the smallest duration from sockets timeout.
timeout: Option<Duration>,
paused: bool,
}
/// Array of u128 with every bit as marker for a worker handle's availability.
struct Availability([u128; 4]);
impl Default for Availability {
fn default() -> Self {
Self([0; 4])
}
}
impl Availability {
/// Check if any worker handle is available
#[inline(always)]
fn available(&self) -> bool {
self.0.iter().any(|a| *a != 0)
}
/// Check if worker handle is available by index
#[inline(always)]
fn get_available(&self, idx: usize) -> bool {
let (offset, idx) = Self::offset(idx);
self.0[offset] & (1 << idx as u128) != 0
}
/// Set worker handle available state by index.
fn set_available(&mut self, idx: usize, avail: bool) {
let (offset, idx) = Self::offset(idx);
let off = 1 << idx as u128;
if avail {
self.0[offset] |= off;
} else {
self.0[offset] &= !off
}
}
/// Set all worker handle to available state.
/// This would result in a re-check on all workers' availability.
fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) {
handles.iter().for_each(|handle| {
self.set_available(handle.idx(), true);
})
}
/// Get offset and adjusted index of given worker handle index.
fn offset(idx: usize) -> (usize, usize) {
if idx < 128 {
(0, idx)
} else if idx < 128 * 2 {
(1, idx - 128)
} else if idx < 128 * 3 {
(2, idx - 128 * 2)
} else if idx < 128 * 4 {
(3, idx - 128 * 3)
} else {
panic!("Max WorkerHandle count is 512")
}
}
}
/// This function defines errors that are per-connection. Which basically
/// means that if we get this error from `accept()` system call it means
/// next connection might be ready to be accepted.
///
/// All other errors will incur a timeout before next `accept()` is performed.
/// The timeout is useful to handle resource exhaustion errors like ENFILE
/// and EMFILE. Otherwise, could enter into tight loop.
fn connection_error(e: &io::Error) -> bool {
e.kind() == io::ErrorKind::ConnectionRefused
|| e.kind() == io::ErrorKind::ConnectionAborted
|| e.kind() == io::ErrorKind::ConnectionReset
}
impl Accept {
pub(crate) fn start(
poll: Poll,
waker: WakerQueue,
socks: Vec<(usize, MioListener)>,
srv: Server,
handles: Vec<WorkerHandleAccept>,
) {
// Accept runs in its own thread and would want to spawn additional futures to current
// actix system.
let sys = System::current();
thread::Builder::new()
.name("actix-server accept loop".to_owned())
.spawn(move || {
System::set_current(sys);
let (mut accept, mut sockets) =
Accept::new_with_sockets(poll, waker, socks, handles, srv);
sockets: Vec<(usize, MioListener)>,
builder: &ServerBuilder,
) -> io::Result<(WakerQueue, Vec<WorkerHandleServer>)> {
let handle_server = ServerHandle::new(builder.cmd_tx.clone());
accept.poll_with(&mut sockets);
// construct poll instance and its waker
let poll = Poll::new()?;
let waker_queue = WakerQueue::new(poll.registry())?;
// start workers and collect handles
let (handles_accept, handles_server) = (0..builder.threads)
.map(|idx| {
// clone service factories
let factories = builder
.factories
.iter()
.map(|f| f.clone_factory())
.collect::<Vec<_>>();
// start worker using service factories
ServerWorker::start(idx, factories, waker_queue.clone(), builder.worker_config)
})
.unwrap();
.collect::<io::Result<Vec<_>>>()?
.into_iter()
.unzip();
let (mut accept, mut sockets) = Accept::new_with_sockets(
poll,
waker_queue.clone(),
sockets,
handles_accept,
handle_server,
)?;
thread::Builder::new()
.name("actix-server acceptor".to_owned())
.spawn(move || accept.poll_with(&mut sockets))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
Ok((waker_queue, handles_server))
}
fn new_with_sockets(
poll: Poll,
waker: WakerQueue,
socks: Vec<(usize, MioListener)>,
handles: Vec<WorkerHandleAccept>,
srv: Server,
) -> (Accept, Vec<ServerSocketInfo>) {
let sockets = socks
waker_queue: WakerQueue,
sockets: Vec<(usize, MioListener)>,
accept_handles: Vec<WorkerHandleAccept>,
server_handle: ServerHandle,
) -> io::Result<(Accept, Box<[ServerSocketInfo]>)> {
let sockets = sockets
.into_iter()
.map(|(token, mut lst)| {
// Start listening for incoming connections
poll.registry()
.register(&mut lst, MioToken(token), Interest::READABLE)
.unwrap_or_else(|e| panic!("Can not register io: {}", e));
.register(&mut lst, MioToken(token), Interest::READABLE)?;
ServerSocketInfo {
Ok(ServerSocketInfo {
token,
lst,
timeout: None,
}
})
})
.collect();
.collect::<io::Result<_>>()?;
let mut avail = Availability::default();
// Assume all handles are avail at construct time.
avail.set_available_all(&handles);
avail.set_available_all(&accept_handles);
let accept = Accept {
poll,
waker,
handles,
srv,
waker_queue,
handles: accept_handles,
srv: server_handle,
next: 0,
avail,
timeout: None,
paused: false,
};
(accept, sockets)
Ok((accept, sockets))
}
/// blocking wait for readiness events triggered by mio
fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) {
let mut events = mio::Events::with_capacity(128);
let mut events = mio::Events::with_capacity(256);
loop {
if let Err(e) = self.poll.poll(&mut events, None) {
if let Err(e) = self.poll.poll(&mut events, self.timeout) {
match e.kind() {
io::ErrorKind::Interrupted => {}
_ => panic!("Poll error: {}", e),
@ -234,7 +140,7 @@ impl Accept {
WAKER_TOKEN => {
let exit = self.handle_waker(sockets);
if exit {
info!("Accept is stopped.");
info!("Accept thread stopped");
return;
}
}
@ -244,6 +150,9 @@ impl Accept {
}
}
}
// check for timeout and re-register sockets
self.process_timeout(sockets);
}
}
@ -254,7 +163,7 @@ impl Accept {
loop {
// take guard with every iteration so no new interest can be added
// until the current task is done.
let mut guard = self.waker.guard();
let mut guard = self.waker_queue.guard();
match guard.pop_front() {
// worker notify it becomes available.
Some(WakerInterest::WorkerAvailable(idx)) => {
@ -266,6 +175,7 @@ impl Accept {
self.accept_all(sockets);
}
}
// a new worker thread is made and it's handle would be added to Accept
Some(WakerInterest::Worker(handle)) => {
drop(guard);
@ -277,12 +187,7 @@ impl Accept {
self.accept_all(sockets);
}
}
// got timer interest and it's time to try register socket(s) again
Some(WakerInterest::Timer) => {
drop(guard);
self.process_timer(sockets)
}
Some(WakerInterest::Pause) => {
drop(guard);
@ -292,6 +197,7 @@ impl Accept {
self.deregister_all(sockets);
}
}
Some(WakerInterest::Resume) => {
drop(guard);
@ -305,6 +211,7 @@ impl Accept {
self.accept_all(sockets);
}
}
Some(WakerInterest::Stop) => {
if !self.paused {
self.deregister_all(sockets);
@ -312,6 +219,7 @@ impl Accept {
return true;
}
// waker queue is drained
None => {
// Reset the WakerQueue before break so it does not grow infinitely
@ -323,25 +231,44 @@ impl Accept {
}
}
fn process_timer(&self, sockets: &mut [ServerSocketInfo]) {
let now = Instant::now();
sockets
.iter_mut()
// Only sockets that had an associated timeout were deregistered.
.filter(|info| info.timeout.is_some())
.for_each(|info| {
let inst = info.timeout.take().unwrap();
fn process_timeout(&mut self, sockets: &mut [ServerSocketInfo]) {
// always remove old timeouts
if self.timeout.take().is_some() {
let now = Instant::now();
if now < inst {
info.timeout = Some(inst);
} else if !self.paused {
self.register_logged(info);
sockets
.iter_mut()
// Only sockets that had an associated timeout were deregistered.
.filter(|info| info.timeout.is_some())
.for_each(|info| {
let inst = info.timeout.take().unwrap();
if now < inst {
// still timed out; try to set new timeout
info.timeout = Some(inst);
self.set_timeout(inst - now);
} else if !self.paused {
// timeout expired; register socket again
self.register_logged(info);
}
// Drop the timeout if server is paused and socket timeout is expired.
// When server recovers from pause it will register all sockets without
// a timeout value so this socket register will be delayed till then.
});
}
}
/// Update accept timeout with `duration` if it is shorter than current timeout.
fn set_timeout(&mut self, duration: Duration) {
match self.timeout {
Some(ref mut timeout) => {
if *timeout > duration {
*timeout = duration;
}
// Drop the timeout if server is paused and socket timeout is expired.
// When server recovers from pause it will register all sockets without
// a timeout value so this socket register will be delayed till then.
});
}
None => self.timeout = Some(duration),
}
}
#[cfg(not(target_os = "windows"))]
@ -370,14 +297,14 @@ impl Accept {
fn register_logged(&self, info: &mut ServerSocketInfo) {
match self.register(info) {
Ok(_) => info!("Resume accepting connections on {}", info.lst.local_addr()),
Ok(_) => debug!("Resume accepting connections on {}", info.lst.local_addr()),
Err(e) => error!("Can not register server socket {}", e),
}
}
fn deregister_logged(&self, info: &mut ServerSocketInfo) {
match self.poll.registry().deregister(&mut info.lst) {
Ok(_) => info!("Paused accepting connections on {}", info.lst.local_addr()),
Ok(_) => debug!("Paused accepting connections on {}", info.lst.local_addr()),
Err(e) => {
error!("Can not deregister server socket {}", e)
}
@ -387,12 +314,12 @@ impl Accept {
fn deregister_all(&self, sockets: &mut [ServerSocketInfo]) {
// This is a best effort implementation with following limitation:
//
// Every ServerSocketInfo with associate timeout will be skipped and it's timeout
// is removed in the process.
// Every ServerSocketInfo with associated timeout will be skipped and it's timeout is
// removed in the process.
//
// Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short
// gap (less than 500ms) would cause all timing out ServerSocketInfos be reregistered
// before expected timing.
// Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short gap
// (less than 500ms) would cause all timing out ServerSocketInfos be re-registered before
// expected timing.
sockets
.iter_mut()
// Take all timeout.
@ -481,13 +408,7 @@ impl Accept {
// the poll would need it mark which socket and when it's
// listener should be registered
info.timeout = Some(Instant::now() + Duration::from_millis(500));
// after the sleep a Timer interest is sent to Accept Poll
let waker = self.waker.clone();
System::current().arbiter().spawn(async move {
sleep(Duration::from_millis(510)).await;
waker.wake(WakerInterest::Timer);
});
self.set_timeout(TIMEOUT_DURATION_ON_ERROR);
return;
}
@ -526,67 +447,14 @@ impl Accept {
}
}
#[cfg(test)]
mod test {
use super::Availability;
fn single(aval: &mut Availability, idx: usize) {
aval.set_available(idx, true);
assert!(aval.available());
aval.set_available(idx, true);
aval.set_available(idx, false);
assert!(!aval.available());
aval.set_available(idx, false);
assert!(!aval.available());
}
fn multi(aval: &mut Availability, mut idx: Vec<usize>) {
idx.iter().for_each(|idx| aval.set_available(*idx, true));
assert!(aval.available());
while let Some(idx) = idx.pop() {
assert!(aval.available());
aval.set_available(idx, false);
}
assert!(!aval.available());
}
#[test]
fn availability() {
let mut aval = Availability::default();
single(&mut aval, 1);
single(&mut aval, 128);
single(&mut aval, 256);
single(&mut aval, 511);
let idx = (0..511).filter(|i| i % 3 == 0 && i % 5 == 0).collect();
multi(&mut aval, idx);
multi(&mut aval, (0..511).collect())
}
#[test]
#[should_panic]
fn overflow() {
let mut aval = Availability::default();
single(&mut aval, 512);
}
#[test]
fn pin_point() {
let mut aval = Availability::default();
aval.set_available(438, true);
aval.set_available(479, true);
assert_eq!(aval.0[3], 1 << (438 - 384) | 1 << (479 - 384));
}
/// This function defines errors that are per-connection; if we get this error from the `accept()`
/// system call it means the next connection might be ready to be accepted.
///
/// All other errors will incur a timeout before next `accept()` call is attempted. The timeout is
/// useful to handle resource exhaustion errors like `ENFILE` and `EMFILE`. Otherwise, it could
/// enter into a temporary spin loop.
fn connection_error(e: &io::Error) -> bool {
e.kind() == io::ErrorKind::ConnectionRefused
|| e.kind() == io::ErrorKind::ConnectionAborted
|| e.kind() == io::ErrorKind::ConnectionReset
}

View File

@ -0,0 +1,121 @@
use crate::worker::WorkerHandleAccept;
/// Array of u128 with every bit as marker for a worker handle's availability.
#[derive(Debug, Default)]
pub(crate) struct Availability([u128; 4]);
impl Availability {
/// Check if any worker handle is available
#[inline(always)]
pub(crate) fn available(&self) -> bool {
self.0.iter().any(|a| *a != 0)
}
/// Check if worker handle is available by index
#[inline(always)]
pub(crate) fn get_available(&self, idx: usize) -> bool {
let (offset, idx) = Self::offset(idx);
self.0[offset] & (1 << idx as u128) != 0
}
/// Set worker handle available state by index.
pub(crate) fn set_available(&mut self, idx: usize, avail: bool) {
let (offset, idx) = Self::offset(idx);
let off = 1 << idx as u128;
if avail {
self.0[offset] |= off;
} else {
self.0[offset] &= !off
}
}
/// Set all worker handle to available state.
/// This would result in a re-check on all workers' availability.
pub(crate) fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) {
handles.iter().for_each(|handle| {
self.set_available(handle.idx(), true);
})
}
/// Get offset and adjusted index of given worker handle index.
pub(crate) fn offset(idx: usize) -> (usize, usize) {
if idx < 128 {
(0, idx)
} else if idx < 128 * 2 {
(1, idx - 128)
} else if idx < 128 * 3 {
(2, idx - 128 * 2)
} else if idx < 128 * 4 {
(3, idx - 128 * 3)
} else {
panic!("Max WorkerHandle count is 512")
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn single(aval: &mut Availability, idx: usize) {
aval.set_available(idx, true);
assert!(aval.available());
aval.set_available(idx, true);
aval.set_available(idx, false);
assert!(!aval.available());
aval.set_available(idx, false);
assert!(!aval.available());
}
fn multi(aval: &mut Availability, mut idx: Vec<usize>) {
idx.iter().for_each(|idx| aval.set_available(*idx, true));
assert!(aval.available());
while let Some(idx) = idx.pop() {
assert!(aval.available());
aval.set_available(idx, false);
}
assert!(!aval.available());
}
#[test]
fn availability() {
let mut aval = Availability::default();
single(&mut aval, 1);
single(&mut aval, 128);
single(&mut aval, 256);
single(&mut aval, 511);
let idx = (0..511).filter(|i| i % 3 == 0 && i % 5 == 0).collect();
multi(&mut aval, idx);
multi(&mut aval, (0..511).collect())
}
#[test]
#[should_panic]
fn overflow() {
let mut aval = Availability::default();
single(&mut aval, 512);
}
#[test]
fn pin_point() {
let mut aval = Availability::default();
aval.set_available(438, true);
aval.set_available(479, true);
assert_eq!(aval.0[3], 1 << (438 - 384) | 1 << (479 - 384));
}
}

View File

@ -1,43 +1,32 @@
use std::{
future::Future,
io, mem,
pin::Pin,
task::{Context, Poll},
time::Duration,
use std::{io, time::Duration};
use actix_rt::net::TcpStream;
use log::{info, trace};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use crate::{
server::ServerCommand,
service::{InternalServiceFactory, ServiceFactory, StreamNewService},
socket::{
create_mio_tcp_listener, MioListener, MioTcpListener, StdSocketAddr, StdTcpListener,
ToSocketAddrs,
},
worker::ServerWorkerConfig,
Server,
};
use actix_rt::{self as rt, net::TcpStream, time::sleep, System};
use log::{error, info};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver},
oneshot,
};
use crate::accept::AcceptLoop;
use crate::join_all;
use crate::server::{Server, ServerCommand};
use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
use crate::signals::{Signal, Signals};
use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
use crate::socket::{MioTcpListener, MioTcpSocket};
use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer};
/// Server builder
/// [Server] builder.
pub struct ServerBuilder {
threads: usize,
token: usize,
backlog: u32,
handles: Vec<(usize, WorkerHandleServer)>,
services: Vec<Box<dyn InternalServiceFactory>>,
sockets: Vec<(usize, String, MioListener)>,
accept: AcceptLoop,
exit: bool,
no_signals: bool,
cmd: UnboundedReceiver<ServerCommand>,
server: Server,
notify: Vec<oneshot::Sender<()>>,
worker_config: ServerWorkerConfig,
pub(crate) threads: usize,
pub(crate) token: usize,
pub(crate) backlog: u32,
pub(crate) factories: Vec<Box<dyn InternalServiceFactory>>,
pub(crate) sockets: Vec<(usize, String, MioListener)>,
pub(crate) exit: bool,
pub(crate) listen_os_signals: bool,
pub(crate) cmd_tx: UnboundedSender<ServerCommand>,
pub(crate) cmd_rx: UnboundedReceiver<ServerCommand>,
pub(crate) worker_config: ServerWorkerConfig,
}
impl Default for ServerBuilder {
@ -49,30 +38,26 @@ impl Default for ServerBuilder {
impl ServerBuilder {
/// Create new Server builder instance
pub fn new() -> ServerBuilder {
let (tx, rx) = unbounded_channel();
let server = Server::new(tx);
let (cmd_tx, cmd_rx) = unbounded_channel();
ServerBuilder {
threads: num_cpus::get(),
token: 0,
handles: Vec::new(),
services: Vec::new(),
factories: Vec::new(),
sockets: Vec::new(),
accept: AcceptLoop::new(server.clone()),
backlog: 2048,
exit: false,
no_signals: false,
cmd: rx,
notify: Vec::new(),
server,
listen_os_signals: true,
cmd_tx,
cmd_rx,
worker_config: ServerWorkerConfig::default(),
}
}
/// Set number of workers to start.
///
/// By default server uses number of available logical cpu as workers
/// count. Workers must be greater than 0.
/// By default server uses number of available logical CPU as workers count. Workers must be
/// greater than 0.
pub fn workers(mut self, num: usize) -> Self {
assert_ne!(num, 0, "workers must be greater than 0");
self.threads = num;
@ -99,10 +84,9 @@ impl ServerBuilder {
/// Set the maximum number of pending connections.
///
/// This refers to the number of clients that can be waiting to be served.
/// Exceeding this number results in the client getting an error when
/// attempting to connect. It should only affect servers under significant
/// load.
/// This refers to the number of clients that can be waiting to be served. Exceeding this number
/// results in the client getting an error when attempting to connect. It should only affect
/// servers under significant load.
///
/// Generally set in the 64-2048 range. Default value is 2048.
///
@ -114,24 +98,30 @@ impl ServerBuilder {
/// Sets the maximum per-worker number of concurrent connections.
///
/// All socket listeners will stop accepting connections when this limit is
/// reached for each worker.
/// All socket listeners will stop accepting connections when this limit is reached for
/// each worker.
///
/// By default max connections is set to a 25k per worker.
pub fn maxconn(mut self, num: usize) -> Self {
pub fn max_concurrent_connections(mut self, num: usize) -> Self {
self.worker_config.max_concurrent_connections(num);
self
}
/// Stop Actix system.
#[doc(hidden)]
#[deprecated(since = "2.0.0", note = "Renamed to `max_concurrent_connections`.")]
pub fn maxconn(self, num: usize) -> Self {
self.max_concurrent_connections(num)
}
/// Stop Actix `System` after server shutdown.
pub fn system_exit(mut self) -> Self {
self.exit = true;
self
}
/// Disable signal handling.
/// Disable OS signal handling.
pub fn disable_signals(mut self) -> Self {
self.no_signals = true;
self.listen_os_signals = false;
self
}
@ -155,9 +145,11 @@ impl ServerBuilder {
{
let sockets = bind_addr(addr, self.backlog)?;
trace!("binding server to: {:?}", &sockets);
for lst in sockets {
let token = self.next_token();
self.services.push(StreamNewService::create(
self.factories.push(StreamNewService::create(
name.as_ref().to_string(),
token,
factory.clone(),
@ -166,11 +158,57 @@ impl ServerBuilder {
self.sockets
.push((token, name.as_ref().to_string(), MioListener::Tcp(lst)));
}
Ok(self)
}
/// Add new service to the server.
pub fn listen<F, N: AsRef<str>>(
mut self,
name: N,
lst: StdTcpListener,
factory: F,
) -> io::Result<Self>
where
F: ServiceFactory<TcpStream>,
{
lst.set_nonblocking(true)?;
let addr = lst.local_addr()?;
let token = self.next_token();
self.factories.push(StreamNewService::create(
name.as_ref().to_string(),
token,
factory,
addr,
));
self.sockets
.push((token, name.as_ref().to_string(), MioListener::from(lst)));
Ok(self)
}
/// Starts processing incoming connections and return server controller.
pub fn run(self) -> Server {
if self.sockets.is_empty() {
panic!("Server should have at least one bound socket");
} else {
info!("Starting {} workers", self.threads);
Server::new(self)
}
}
fn next_token(&mut self) -> usize {
let token = self.token;
self.token += 1;
token
}
}
#[cfg(unix)]
impl ServerBuilder {
/// Add new unix domain service to the server.
#[cfg(unix)]
pub fn bind_uds<F, U, N>(self, name: N, addr: U, factory: F) -> io::Result<Self>
where
F: ServiceFactory<actix_rt::net::UnixStream>,
@ -191,9 +229,8 @@ impl ServerBuilder {
}
/// Add new unix domain service to the server.
/// Useful when running as a systemd service and
/// a socket FD can be acquired using the systemd crate.
#[cfg(unix)]
///
/// Useful when running as a systemd service and a socket FD is acquired externally.
pub fn listen_uds<F, N: AsRef<str>>(
mut self,
name: N,
@ -207,7 +244,7 @@ impl ServerBuilder {
lst.set_nonblocking(true)?;
let token = self.next_token();
let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
self.services.push(StreamNewService::create(
self.factories.push(StreamNewService::create(
name.as_ref().to_string(),
token,
factory,
@ -217,219 +254,6 @@ impl ServerBuilder {
.push((token, name.as_ref().to_string(), MioListener::from(lst)));
Ok(self)
}
/// Add new service to the server.
pub fn listen<F, N: AsRef<str>>(
mut self,
name: N,
lst: StdTcpListener,
factory: F,
) -> io::Result<Self>
where
F: ServiceFactory<TcpStream>,
{
lst.set_nonblocking(true)?;
let addr = lst.local_addr()?;
let token = self.next_token();
self.services.push(StreamNewService::create(
name.as_ref().to_string(),
token,
factory,
addr,
));
self.sockets
.push((token, name.as_ref().to_string(), MioListener::from(lst)));
Ok(self)
}
/// Starts processing incoming connections and return server controller.
pub fn run(mut self) -> Server {
if self.sockets.is_empty() {
panic!("Server should have at least one bound socket");
} else {
info!("Starting {} workers", self.threads);
// start workers
let handles = (0..self.threads)
.map(|idx| {
let (handle_accept, handle_server) =
self.start_worker(idx, self.accept.waker_owned());
self.handles.push((idx, handle_server));
handle_accept
})
.collect();
// start accept thread
for sock in &self.sockets {
info!("Starting \"{}\" service on {}", sock.1, sock.2);
}
self.accept.start(
mem::take(&mut self.sockets)
.into_iter()
.map(|t| (t.0, t.2))
.collect(),
handles,
);
// handle signals
if !self.no_signals {
Signals::start(self.server.clone());
}
// start http server actor
let server = self.server.clone();
rt::spawn(self);
server
}
}
fn start_worker(
&self,
idx: usize,
waker_queue: WakerQueue,
) -> (WorkerHandleAccept, WorkerHandleServer) {
let services = self.services.iter().map(|v| v.clone_factory()).collect();
ServerWorker::start(idx, services, waker_queue, self.worker_config)
}
fn handle_cmd(&mut self, item: ServerCommand) {
match item {
ServerCommand::Pause(tx) => {
self.accept.wake(WakerInterest::Pause);
let _ = tx.send(());
}
ServerCommand::Resume(tx) => {
self.accept.wake(WakerInterest::Resume);
let _ = tx.send(());
}
ServerCommand::Signal(sig) => {
// Signals support
// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system
match sig {
Signal::Int => {
info!("SIGINT received, starting forced shutdown");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: false,
completion: None,
})
}
Signal::Term => {
info!("SIGTERM received, starting graceful shutdown");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: true,
completion: None,
})
}
Signal::Quit => {
info!("SIGQUIT received, starting forced shutdown");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: false,
completion: None,
})
}
}
}
ServerCommand::Notify(tx) => {
self.notify.push(tx);
}
ServerCommand::Stop {
graceful,
completion,
} => {
let exit = self.exit;
// stop accept thread
self.accept.wake(WakerInterest::Stop);
let notify = std::mem::take(&mut self.notify);
// stop workers
let stop = self
.handles
.iter()
.map(move |worker| worker.1.stop(graceful))
.collect();
rt::spawn(async move {
if graceful {
// wait for all workers to shut down
let _ = join_all(stop).await;
}
if let Some(tx) = completion {
let _ = tx.send(());
}
for tx in notify {
let _ = tx.send(());
}
if exit {
sleep(Duration::from_millis(300)).await;
System::current().stop();
}
});
}
ServerCommand::WorkerFaulted(idx) => {
let mut found = false;
for i in 0..self.handles.len() {
if self.handles[i].0 == idx {
self.handles.swap_remove(i);
found = true;
break;
}
}
if found {
error!("Worker has died {:?}, restarting", idx);
let mut new_idx = self.handles.len();
'found: loop {
for i in 0..self.handles.len() {
if self.handles[i].0 == new_idx {
new_idx += 1;
continue 'found;
}
}
break;
}
let (handle_accept, handle_server) =
self.start_worker(new_idx, self.accept.waker_owned());
self.handles.push((new_idx, handle_server));
self.accept.wake(WakerInterest::Worker(handle_accept));
}
}
}
}
fn next_token(&mut self) -> usize {
let token = self.token;
self.token += 1;
token
}
}
impl Future for ServerBuilder {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match Pin::new(&mut self.cmd).poll_recv(cx) {
Poll::Ready(Some(it)) => self.as_mut().get_mut().handle_cmd(it),
_ => return Poll::Pending,
}
}
}
}
pub(super) fn bind_addr<S: ToSocketAddrs>(
@ -437,39 +261,26 @@ pub(super) fn bind_addr<S: ToSocketAddrs>(
backlog: u32,
) -> io::Result<Vec<MioTcpListener>> {
let mut err = None;
let mut succ = false;
let mut success = false;
let mut sockets = Vec::new();
for addr in addr.to_socket_addrs()? {
match create_tcp_listener(addr, backlog) {
match create_mio_tcp_listener(addr, backlog) {
Ok(lst) => {
succ = true;
success = true;
sockets.push(lst);
}
Err(e) => err = Some(e),
}
}
if !succ {
if let Some(e) = err.take() {
Err(e)
} else {
Err(io::Error::new(
io::ErrorKind::Other,
"Can not bind to address.",
))
}
} else {
if success {
Ok(sockets)
} else if let Some(err) = err.take() {
Err(err)
} else {
Err(io::Error::new(
io::ErrorKind::Other,
"Can not bind to address.",
))
}
}
fn create_tcp_listener(addr: StdSocketAddr, backlog: u32) -> io::Result<MioTcpListener> {
let socket = match addr {
StdSocketAddr::V4(_) => MioTcpSocket::new_v4()?,
StdSocketAddr::V6(_) => MioTcpSocket::new_v6()?,
};
socket.set_reuseaddr(true)?;
socket.bind(addr)?;
socket.listen(backlog)
}

View File

@ -0,0 +1,55 @@
use std::future::Future;
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use crate::server::ServerCommand;
/// Server handle.
#[derive(Debug, Clone)]
pub struct ServerHandle {
cmd_tx: UnboundedSender<ServerCommand>,
}
impl ServerHandle {
pub(crate) fn new(cmd_tx: UnboundedSender<ServerCommand>) -> Self {
ServerHandle { cmd_tx }
}
pub(crate) fn worker_faulted(&self, idx: usize) {
let _ = self.cmd_tx.send(ServerCommand::WorkerFaulted(idx));
}
/// Pause accepting incoming connections.
///
/// May drop socket pending connection. All open connections remain active.
pub fn pause(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.cmd_tx.send(ServerCommand::Pause(tx));
async {
let _ = rx.await;
}
}
/// Resume accepting incoming connections.
pub fn resume(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.cmd_tx.send(ServerCommand::Resume(tx));
async {
let _ = rx.await;
}
}
/// Stop incoming connection processing, stop all workers and exit.
pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.cmd_tx.send(ServerCommand::Stop {
graceful,
completion: Some(tx),
});
async {
let _ = rx.await;
}
}
}

View File

@ -0,0 +1,78 @@
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use futures_core::future::BoxFuture;
// a poor man's join future. joined future is only used when starting/stopping the server.
// pin_project and pinned futures are overkill for this task.
pub(crate) struct JoinAll<T> {
fut: Vec<JoinFuture<T>>,
}
pub(crate) fn join_all<T>(fut: Vec<impl Future<Output = T> + Send + 'static>) -> JoinAll<T> {
let fut = fut
.into_iter()
.map(|f| JoinFuture::Future(Box::pin(f)))
.collect();
JoinAll { fut }
}
enum JoinFuture<T> {
Future(BoxFuture<'static, T>),
Result(Option<T>),
}
impl<T> Unpin for JoinAll<T> {}
impl<T> Future for JoinAll<T> {
type Output = Vec<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut ready = true;
let this = self.get_mut();
for fut in this.fut.iter_mut() {
if let JoinFuture::Future(f) = fut {
match f.as_mut().poll(cx) {
Poll::Ready(t) => {
*fut = JoinFuture::Result(Some(t));
}
Poll::Pending => ready = false,
}
}
}
if ready {
let mut res = Vec::new();
for fut in this.fut.iter_mut() {
if let JoinFuture::Result(f) = fut {
res.push(f.take().unwrap());
}
}
Poll::Ready(res)
} else {
Poll::Pending
}
}
}
#[cfg(test)]
mod test {
use super::*;
use actix_utils::future::ready;
#[actix_rt::test]
async fn test_join_all() {
let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))];
let mut res = join_all(futs).await.into_iter();
assert_eq!(Ok(1), res.next().unwrap());
assert_eq!(Err(3), res.next().unwrap());
assert_eq!(Ok(9), res.next().unwrap());
}
}

View File

@ -5,7 +5,10 @@
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
mod accept;
mod availability;
mod builder;
mod handle;
mod join_all;
mod server;
mod service;
mod signals;
@ -15,6 +18,7 @@ mod waker_queue;
mod worker;
pub use self::builder::ServerBuilder;
pub use self::handle::ServerHandle;
pub use self::server::Server;
pub use self::service::ServiceFactory;
pub use self::test_server::TestServer;
@ -22,82 +26,9 @@ pub use self::test_server::TestServer;
#[doc(hidden)]
pub use self::socket::FromStream;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Start server building process
#[doc(hidden)]
#[deprecated(since = "2.0.0", note = "Use `Server::build()`.")]
pub fn new() -> ServerBuilder {
ServerBuilder::default()
}
// a poor man's join future. joined future is only used when starting/stopping the server.
// pin_project and pinned futures are overkill for this task.
pub(crate) struct JoinAll<T> {
fut: Vec<JoinFuture<T>>,
}
pub(crate) fn join_all<T>(fut: Vec<impl Future<Output = T> + 'static>) -> JoinAll<T> {
let fut = fut
.into_iter()
.map(|f| JoinFuture::Future(Box::pin(f)))
.collect();
JoinAll { fut }
}
enum JoinFuture<T> {
Future(Pin<Box<dyn Future<Output = T>>>),
Result(Option<T>),
}
impl<T> Unpin for JoinAll<T> {}
impl<T> Future for JoinAll<T> {
type Output = Vec<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut ready = true;
let this = self.get_mut();
for fut in this.fut.iter_mut() {
if let JoinFuture::Future(f) = fut {
match f.as_mut().poll(cx) {
Poll::Ready(t) => {
*fut = JoinFuture::Result(Some(t));
}
Poll::Pending => ready = false,
}
}
}
if ready {
let mut res = Vec::new();
for fut in this.fut.iter_mut() {
if let JoinFuture::Result(f) = fut {
res.push(f.take().unwrap());
}
}
Poll::Ready(res)
} else {
Poll::Pending
}
}
}
#[cfg(test)]
mod test {
use super::*;
use actix_utils::future::ready;
#[actix_rt::test]
async fn test_join_all() {
let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))];
let mut res = join_all(futs).await.into_iter();
assert_eq!(Ok(1), res.next().unwrap());
assert_eq!(Err(3), res.next().unwrap());
assert_eq!(Ok(9), res.next().unwrap());
}
}

View File

@ -1,119 +1,359 @@
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{
future::Future,
io, mem,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot;
use actix_rt::{time::sleep, System};
use futures_core::future::BoxFuture;
use log::{error, info};
use tokio::sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
};
use crate::builder::ServerBuilder;
use crate::signals::Signal;
use crate::{
accept::Accept,
builder::ServerBuilder,
join_all::join_all,
service::InternalServiceFactory,
signals::{Signal, Signals},
waker_queue::{WakerInterest, WakerQueue},
worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer},
ServerHandle,
};
#[derive(Debug)]
pub(crate) enum ServerCommand {
/// TODO
WorkerFaulted(usize),
/// Contains return channel to notify caller of successful state change.
Pause(oneshot::Sender<()>),
/// Contains return channel to notify caller of successful state change.
Resume(oneshot::Sender<()>),
Signal(Signal),
/// TODO
Stop {
/// True if shut down should be graceful.
graceful: bool,
/// Return channel to notify caller that shutdown is complete.
completion: Option<oneshot::Sender<()>>,
},
/// Notify of server stop
Notify(oneshot::Sender<()>),
}
/// Server handle.
/// General purpose TCP server that runs services receiving Tokio `TcpStream`s.
///
/// Handles creating worker threads, restarting faulted workers, connection accepting, and
/// back-pressure logic.
///
/// Creates a worker per CPU core (or the number specified in [`ServerBuilder::workers`]) and
/// distributes connections with a round-robin strategy.
///
/// The [Server] must be awaited to process stop commands and listen for OS signals. It will resolve
/// when the server has fully shut down.
///
/// # Shutdown Signals
/// On UNIX systems, `SIGQUIT` will start a graceful shutdown and `SIGTERM` or `SIGINT` will start a
/// forced shutdown. On Windows, a CTRL-C signal will start a forced shutdown.
/// forced shutdown. On Windows, a Ctrl-C signal will start a forced shutdown.
///
/// A graceful shutdown will wait for all workers to stop first.
#[derive(Debug)]
pub struct Server(
UnboundedSender<ServerCommand>,
Option<oneshot::Receiver<()>>,
);
///
/// # Examples
/// The following is a TCP echo server. Test using `telnet 127.0.0.1 8080`.
///
/// ```no_run
/// use std::io;
///
/// use actix_rt::net::TcpStream;
/// use actix_server::Server;
/// use actix_service::{fn_service, ServiceFactoryExt as _};
/// use bytes::BytesMut;
/// use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
///
/// #[actix_rt::main]
/// async fn main() -> io::Result<()> {
/// let bind_addr = ("127.0.0.1", 8080);
///
/// Server::build()
/// .bind("echo", bind_addr, move || {
/// fn_service(move |mut stream: TcpStream| {
/// async move {
/// let mut size = 0;
/// let mut buf = BytesMut::new();
///
/// loop {
/// match stream.read_buf(&mut buf).await {
/// // end of stream; bail from loop
/// Ok(0) => break,
///
/// // write bytes back to stream
/// Ok(bytes_read) => {
/// stream.write_all(&buf[size..]).await.unwrap();
/// size += bytes_read;
/// }
///
/// Err(err) => {
/// eprintln!("Stream Error: {:?}", err);
/// return Err(());
/// }
/// }
/// }
///
/// Ok(())
/// }
/// })
/// .map_err(|err| eprintln!("Service Error: {:?}", err))
/// })?
/// .run()
/// .await
/// }
/// ```
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub enum Server {
Server(ServerInner),
Error(Option<io::Error>),
}
impl Server {
pub(crate) fn new(tx: UnboundedSender<ServerCommand>) -> Self {
Server(tx, None)
}
/// Start server building process
/// Create server build.
pub fn build() -> ServerBuilder {
ServerBuilder::default()
}
pub(crate) fn signal(&self, sig: Signal) {
let _ = self.0.send(ServerCommand::Signal(sig));
pub(crate) fn new(mut builder: ServerBuilder) -> Self {
let sockets = mem::take(&mut builder.sockets)
.into_iter()
.map(|t| (t.0, t.2))
.collect();
// Give log information on what runtime will be used.
let is_actix = actix_rt::System::try_current().is_some();
let is_tokio = tokio::runtime::Handle::try_current().is_ok();
match (is_actix, is_tokio) {
(false, true) => info!("Tokio runtime found. Starting in existing Tokio runtime"),
(true, _) => info!("Actix runtime found. Starting in Actix runtime"),
(_, _) => info!(
"Actix/Tokio runtime not found. Starting in newt Tokio current-thread runtime"
),
}
for (_, name, lst) in &builder.sockets {
info!(
r#"Starting service: "{}", workers: {}, listening on: {}"#,
name,
builder.threads,
lst.local_addr()
);
}
match Accept::start(sockets, &builder) {
Ok((waker_queue, worker_handles)) => {
// construct OS signals listener future
let signals = (builder.listen_os_signals).then(Signals::new);
Self::Server(ServerInner {
cmd_tx: builder.cmd_tx.clone(),
cmd_rx: builder.cmd_rx,
signals,
waker_queue,
worker_handles,
worker_config: builder.worker_config,
services: builder.factories,
exit: builder.exit,
stop_task: None,
})
}
Err(err) => Self::Error(Some(err)),
}
}
pub(crate) fn worker_faulted(&self, idx: usize) {
let _ = self.0.send(ServerCommand::WorkerFaulted(idx));
}
/// Pause accepting incoming connections
/// Get a handle for ServerFuture that can be used to change state of actix server.
///
/// If socket contains some pending connection, they might be dropped.
/// All opened connection remains active.
pub fn pause(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.send(ServerCommand::Pause(tx));
async {
let _ = rx.await;
/// See [ServerHandle](ServerHandle) for usage.
pub fn handle(&self) -> ServerHandle {
match self {
Server::Server(inner) => ServerHandle::new(inner.cmd_tx.clone()),
Server::Error(err) => {
// TODO: i don't think this is the best way to handle server startup fail
panic!(
"server handle can not be obtained because server failed to start up: {}",
err.as_ref().unwrap()
);
}
}
}
/// Resume accepting incoming connections
pub fn resume(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.send(ServerCommand::Resume(tx));
async {
let _ = rx.await;
}
}
/// Stop incoming connection processing, stop all workers and exit.
///
/// If server starts with `spawn()` method, then spawned thread get terminated.
pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.0.send(ServerCommand::Stop {
graceful,
completion: Some(tx),
});
async {
let _ = rx.await;
}
}
}
impl Clone for Server {
fn clone(&self) -> Self {
Self(self.0.clone(), None)
}
}
impl Future for Server {
type Output = io::Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().get_mut() {
Self::Error(err) => Poll::Ready(Err(err
.take()
.expect("Server future cannot be polled after error"))),
if this.1.is_none() {
let (tx, rx) = oneshot::channel();
if this.0.send(ServerCommand::Notify(tx)).is_err() {
return Poll::Ready(Ok(()));
Self::Server(inner) => {
// poll Signals
if let Some(ref mut signals) = inner.signals {
if let Poll::Ready(signal) = Pin::new(signals).poll(cx) {
inner.stop_task = inner.handle_signal(signal);
// drop signals listener
inner.signals = None;
}
}
// handle stop tasks and eager drain command channel
loop {
if let Some(ref mut fut) = inner.stop_task {
// only resolve stop task and exit
return fut.as_mut().poll(cx).map(|_| Ok(()));
}
match Pin::new(&mut inner.cmd_rx).poll_recv(cx) {
Poll::Ready(Some(cmd)) => {
// if stop task is required, set it and loop
inner.stop_task = inner.handle_cmd(cmd);
}
_ => return Poll::Pending,
}
}
}
}
}
}
pub struct ServerInner {
worker_handles: Vec<WorkerHandleServer>,
worker_config: ServerWorkerConfig,
services: Vec<Box<dyn InternalServiceFactory>>,
exit: bool,
cmd_tx: UnboundedSender<ServerCommand>,
cmd_rx: UnboundedReceiver<ServerCommand>,
signals: Option<Signals>,
waker_queue: WakerQueue,
stop_task: Option<BoxFuture<'static, ()>>,
}
impl ServerInner {
fn handle_cmd(&mut self, item: ServerCommand) -> Option<BoxFuture<'static, ()>> {
match item {
ServerCommand::Pause(tx) => {
self.waker_queue.wake(WakerInterest::Pause);
let _ = tx.send(());
None
}
ServerCommand::Resume(tx) => {
self.waker_queue.wake(WakerInterest::Resume);
let _ = tx.send(());
None
}
ServerCommand::Stop {
graceful,
completion,
} => {
let exit = self.exit;
// stop accept thread
self.waker_queue.wake(WakerInterest::Stop);
// stop workers
let workers_stop = self
.worker_handles
.iter()
.map(|worker| worker.stop(graceful))
.collect::<Vec<_>>();
Some(Box::pin(async move {
if graceful {
// wait for all workers to shut down
let _ = join_all(workers_stop).await;
}
if let Some(tx) = completion {
let _ = tx.send(());
}
if exit {
sleep(Duration::from_millis(300)).await;
System::try_current().as_ref().map(System::stop);
}
}))
}
ServerCommand::WorkerFaulted(idx) => {
// TODO: maybe just return with warning log if not found ?
assert!(self.worker_handles.iter().any(|wrk| wrk.idx == idx));
error!("Worker {} has died; restarting", idx);
let factories = self
.services
.iter()
.map(|service| service.clone_factory())
.collect();
match ServerWorker::start(
idx,
factories,
self.waker_queue.clone(),
self.worker_config,
) {
Ok((handle_accept, handle_server)) => {
*self
.worker_handles
.iter_mut()
.find(|wrk| wrk.idx == idx)
.unwrap() = handle_server;
self.waker_queue.wake(WakerInterest::Worker(handle_accept));
}
Err(err) => error!("can not restart worker {}: {}", idx, err),
};
None
}
}
}
fn handle_signal(&mut self, signal: Signal) -> Option<BoxFuture<'static, ()>> {
match signal {
Signal::Int => {
info!("SIGINT received; starting forced shutdown");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: false,
completion: None,
})
}
Signal::Term => {
info!("SIGTERM received; starting graceful shutdown");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: true,
completion: None,
})
}
Signal::Quit => {
info!("SIGQUIT received; starting forced shutdown");
self.exit = true;
self.handle_cmd(ServerCommand::Stop {
graceful: false,
completion: None,
})
}
this.1 = Some(rx);
}
match Pin::new(this.1.as_mut().unwrap()).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => Poll::Ready(Ok(())),
}
}
}

View File

@ -1,12 +1,16 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{
fmt,
future::Future,
pin::Pin,
task::{Context, Poll},
};
use crate::server::Server;
use log::trace;
/// Types of process signals.
#[allow(dead_code)]
#[derive(PartialEq, Clone, Copy, Debug)]
// #[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
#[allow(dead_code)] // variants are never constructed on non-unix
pub(crate) enum Signal {
/// `SIGINT`
Int,
@ -18,26 +22,35 @@ pub(crate) enum Signal {
Quit,
}
impl fmt::Display for Signal {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(match self {
Signal::Int => "SIGINT",
Signal::Term => "SIGTERM",
Signal::Quit => "SIGQUIT",
})
}
}
/// Process signal listener.
pub(crate) struct Signals {
srv: Server,
#[cfg(not(unix))]
signals: futures_core::future::LocalBoxFuture<'static, std::io::Result<()>>,
signals: futures_core::future::BoxFuture<'static, std::io::Result<()>>,
#[cfg(unix)]
signals: Vec<(Signal, actix_rt::signal::unix::Signal)>,
}
impl Signals {
/// Spawns a signal listening future that is able to send commands to the `Server`.
pub(crate) fn start(srv: Server) {
/// Constructs an OS signal listening future.
pub(crate) fn new() -> Self {
trace!("setting up OS signal listener");
#[cfg(not(unix))]
{
actix_rt::spawn(Signals {
srv,
Signals {
signals: Box::pin(actix_rt::signal::ctrl_c()),
});
}
}
#[cfg(unix)]
@ -66,33 +79,30 @@ impl Signals {
})
.collect::<Vec<_>>();
actix_rt::spawn(Signals { srv, signals });
Signals { signals }
}
}
}
impl Future for Signals {
type Output = ();
type Output = Signal;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
#[cfg(not(unix))]
match self.signals.as_mut().poll(cx) {
Poll::Ready(_) => {
self.srv.signal(Signal::Int);
Poll::Ready(())
}
Poll::Pending => Poll::Pending,
{
self.signals.as_mut().poll(cx).map(|_| Signal::Int)
}
#[cfg(unix)]
{
for (sig, fut) in self.signals.iter_mut() {
// TODO: match on if let Some ?
if Pin::new(fut).poll_recv(cx).is_ready() {
let sig = *sig;
self.srv.signal(sig);
return Poll::Ready(());
trace!("{} received", sig);
return Poll::Ready(*sig);
}
}
Poll::Pending
}
}

View File

@ -2,7 +2,7 @@ pub(crate) use std::net::{
SocketAddr as StdSocketAddr, TcpListener as StdTcpListener, ToSocketAddrs,
};
pub(crate) use mio::net::{TcpListener as MioTcpListener, TcpSocket as MioTcpSocket};
pub(crate) use mio::net::TcpListener as MioTcpListener;
#[cfg(unix)]
pub(crate) use {
mio::net::UnixListener as MioUnixListener,
@ -223,6 +223,22 @@ mod unix_impl {
}
}
pub(crate) fn create_mio_tcp_listener(
addr: StdSocketAddr,
backlog: u32,
) -> io::Result<MioTcpListener> {
use socket2::{Domain, Protocol, Socket, Type};
let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))?;
socket.set_reuse_address(true)?;
socket.set_nonblocking(true)?;
socket.bind(&addr.into())?;
socket.listen(backlog as i32)?;
Ok(MioTcpListener::from_std(StdTcpListener::from(socket)))
}
#[cfg(test)]
mod tests {
use super::*;
@ -234,11 +250,8 @@ mod tests {
assert_eq!(format!("{}", addr), "127.0.0.1:8080");
let addr: StdSocketAddr = "127.0.0.1:0".parse().unwrap();
let socket = MioTcpSocket::new_v4().unwrap();
socket.set_reuseaddr(true).unwrap();
socket.bind(addr).unwrap();
let tcp = socket.listen(128).unwrap();
let lst = MioListener::Tcp(tcp);
let lst = create_mio_tcp_listener(addr, 128).unwrap();
let lst = MioListener::Tcp(lst);
assert!(format!("{:?}", lst).contains("TcpListener"));
assert!(format!("{}", lst).contains("127.0.0.1"));
}

View File

@ -1,9 +1,8 @@
use std::sync::mpsc;
use std::{net, thread};
use std::{io, net, sync::mpsc, thread};
use actix_rt::{net::TcpStream, System};
use crate::{Server, ServerBuilder, ServiceFactory};
use crate::{Server, ServerBuilder, ServerHandle, ServiceFactory};
/// A testing server.
///
@ -34,7 +33,8 @@ pub struct TestServerRuntime {
addr: net::SocketAddr,
host: String,
port: u16,
system: System,
server_handle: ServerHandle,
thread_handle: Option<thread::JoinHandle<io::Result<()>>>,
}
impl TestServer {
@ -46,20 +46,22 @@ impl TestServer {
let (tx, rx) = mpsc::channel();
// run server in separate thread
thread::spawn(move || {
let sys = System::new();
factory(Server::build()).workers(1).disable_signals().run();
tx.send(System::current()).unwrap();
sys.run()
let thread_handle = thread::spawn(move || {
System::new().block_on(async {
let server = factory(Server::build()).workers(1).disable_signals().run();
tx.send(server.handle()).unwrap();
server.await
})
});
let system = rx.recv().unwrap();
let server_handle = rx.recv().unwrap();
TestServerRuntime {
system,
addr: "127.0.0.1:0".parse().unwrap(),
host: "127.0.0.1".to_string(),
port: 0,
server_handle,
thread_handle: Some(thread_handle),
}
}
@ -68,24 +70,25 @@ impl TestServer {
let (tx, rx) = mpsc::channel();
// run server in separate thread
thread::spawn(move || {
let thread_handle = thread::spawn(move || {
let sys = System::new();
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
let local_addr = tcp.local_addr().unwrap();
sys.block_on(async {
Server::build()
let server = Server::build()
.listen("test", tcp, factory)
.unwrap()
.workers(1)
.disable_signals()
.run();
tx.send((System::current(), local_addr)).unwrap();
});
sys.run()
tx.send((server.handle(), local_addr)).unwrap();
server.await
})
});
let (system, addr) = rx.recv().unwrap();
let (server_handle, addr) = rx.recv().unwrap();
let host = format!("{}", addr.ip());
let port = addr.port();
@ -94,18 +97,23 @@ impl TestServer {
addr,
host,
port,
system,
server_handle,
thread_handle: Some(thread_handle),
}
}
/// Get first available unused local address.
pub fn unused_addr() -> net::SocketAddr {
use socket2::{Domain, Protocol, Socket, Type};
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
let socket = mio::net::TcpSocket::new_v4().unwrap();
socket.bind(addr).unwrap();
socket.set_reuseaddr(true).unwrap();
let tcp = socket.listen(1024).unwrap();
tcp.local_addr().unwrap()
let socket =
Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP)).unwrap();
socket.set_reuse_address(true).unwrap();
socket.set_nonblocking(true).unwrap();
socket.bind(&addr.into()).unwrap();
socket.listen(1024).unwrap();
net::TcpListener::from(socket).local_addr().unwrap()
}
}
@ -127,7 +135,8 @@ impl TestServerRuntime {
/// Stop server.
fn stop(&mut self) {
self.system.stop();
let _ = self.server_handle.stop(false);
self.thread_handle.take().unwrap().join().unwrap().unwrap();
}
/// Connect to server, returning a Tokio `TcpStream`.
@ -141,3 +150,16 @@ impl Drop for TestServerRuntime {
self.stop()
}
}
#[cfg(test)]
mod tests {
use actix_service::fn_service;
use super::*;
#[tokio::test]
async fn plain_tokio_runtime() {
let srv = TestServer::with(|| fn_service(|_sock| async move { Ok::<_, ()>(()) }));
assert!(srv.connect().is_ok());
}
}

View File

@ -78,12 +78,7 @@ pub(crate) enum WakerInterest {
Pause,
Resume,
Stop,
/// `Timer` is an interest sent as a delayed future. When an error happens on accepting
/// connection `Accept` would deregister socket listener temporary and wake up the poll and
/// register them again after the delayed future resolve.
Timer,
/// `Worker` is an interest happen after a worker runs into faulted state(This is determined
/// by if work can be sent to it successfully).`Accept` would be waked up and add the new
/// `WorkerHandleAccept`.
/// `Worker` is an interest that is triggered after a worker faults. This is determined by
/// trying to send work to it. `Accept` would be waked up and add the new `WorkerHandleAccept`.
Worker(WorkerHandleAccept),
}

View File

@ -1,6 +1,6 @@
use std::{
future::Future,
mem,
io, mem,
pin::Pin,
rc::Rc,
sync::{
@ -14,7 +14,7 @@ use std::{
use actix_rt::{
spawn,
time::{sleep, Instant, Sleep},
Arbiter,
Arbiter, ArbiterHandle, System,
};
use futures_core::{future::LocalBoxFuture, ready};
use log::{error, info, trace};
@ -23,12 +23,13 @@ use tokio::sync::{
oneshot,
};
use crate::join_all;
use crate::service::{BoxedServerService, InternalServiceFactory};
use crate::socket::MioStream;
use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::{
service::{BoxedServerService, InternalServiceFactory},
socket::MioStream,
waker_queue::{WakerInterest, WakerQueue},
};
/// Stop worker message. Returns `true` on successful graceful shutdown.
/// Stop worker message. Returns `true` on successful graceful shutdown
/// and `false` if some connections still alive when shutdown execute.
pub(crate) struct Stop {
graceful: bool,
@ -41,19 +42,20 @@ pub(crate) struct Conn {
pub token: usize,
}
/// Create accept and server worker handles.
fn handle_pair(
idx: usize,
tx1: UnboundedSender<Conn>,
tx2: UnboundedSender<Stop>,
conn_tx: UnboundedSender<Conn>,
stop_tx: UnboundedSender<Stop>,
counter: Counter,
) -> (WorkerHandleAccept, WorkerHandleServer) {
let accept = WorkerHandleAccept {
idx,
tx: tx1,
conn_tx,
counter,
};
let server = WorkerHandleServer { idx, tx: tx2 };
let server = WorkerHandleServer { idx, stop_tx };
(accept, server)
}
@ -149,13 +151,13 @@ impl Drop for WorkerCounterGuard {
}
}
/// Handle to worker that can send connection message to worker and share the
/// availability of worker to other thread.
/// Handle to worker that can send connection message to worker and share the availability of worker
/// to other threads.
///
/// Held by [Accept](crate::accept::Accept).
pub(crate) struct WorkerHandleAccept {
idx: usize,
tx: UnboundedSender<Conn>,
conn_tx: UnboundedSender<Conn>,
counter: Counter,
}
@ -166,8 +168,8 @@ impl WorkerHandleAccept {
}
#[inline(always)]
pub(crate) fn send(&self, msg: Conn) -> Result<(), Conn> {
self.tx.send(msg).map_err(|msg| msg.0)
pub(crate) fn send(&self, conn: Conn) -> Result<(), Conn> {
self.conn_tx.send(conn).map_err(|msg| msg.0)
}
#[inline(always)]
@ -181,14 +183,14 @@ impl WorkerHandleAccept {
/// Held by [ServerBuilder](crate::builder::ServerBuilder).
#[derive(Debug)]
pub(crate) struct WorkerHandleServer {
idx: usize,
tx: UnboundedSender<Stop>,
pub(crate) idx: usize,
stop_tx: UnboundedSender<Stop>,
}
impl WorkerHandleServer {
pub(crate) fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.send(Stop { graceful, tx });
let _ = self.stop_tx.send(Stop { graceful, tx });
rx
}
}
@ -199,8 +201,8 @@ impl WorkerHandleServer {
pub(crate) struct ServerWorker {
// UnboundedReceiver<Conn> should always be the first field.
// It must be dropped as soon as ServerWorker dropping.
rx: UnboundedReceiver<Conn>,
rx2: UnboundedReceiver<Stop>,
conn_rx: UnboundedReceiver<Conn>,
stop_rx: UnboundedReceiver<Stop>,
counter: WorkerCounter,
services: Box<[WorkerService]>,
factories: Box<[Box<dyn InternalServiceFactory>]>,
@ -209,7 +211,7 @@ pub(crate) struct ServerWorker {
}
struct WorkerService {
factory: usize,
factory_idx: usize,
status: WorkerServiceStatus,
service: BoxedServerService,
}
@ -221,7 +223,7 @@ impl WorkerService {
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WorkerServiceStatus {
Available,
Unavailable,
@ -231,8 +233,14 @@ enum WorkerServiceStatus {
Stopped,
}
impl Default for WorkerServiceStatus {
fn default() -> Self {
Self::Unavailable
}
}
/// Config for worker behavior passed down from server builder.
#[derive(Copy, Clone)]
#[derive(Debug, Clone, Copy)]
pub(crate) struct ServerWorkerConfig {
shutdown_timeout: Duration,
max_blocking_threads: usize,
@ -271,82 +279,199 @@ impl ServerWorker {
factories: Vec<Box<dyn InternalServiceFactory>>,
waker_queue: WakerQueue,
config: ServerWorkerConfig,
) -> (WorkerHandleAccept, WorkerHandleServer) {
let (tx1, rx) = unbounded_channel();
let (tx2, rx2) = unbounded_channel();
) -> io::Result<(WorkerHandleAccept, WorkerHandleServer)> {
trace!("starting server worker {}", idx);
let (tx1, conn_rx) = unbounded_channel();
let (tx2, stop_rx) = unbounded_channel();
let counter = Counter::new(config.max_concurrent_connections);
let pair = handle_pair(idx, tx1, tx2, counter.clone());
let counter_clone = counter.clone();
// every worker runs in it's own arbiter.
// get actix system context if it is set
let actix_system = System::try_current();
// get tokio runtime handle if it is set
let tokio_handle = tokio::runtime::Handle::try_current().ok();
// service factories initialization channel
let (factory_tx, factory_rx) = std::sync::mpsc::sync_channel::<io::Result<()>>(1);
// outline of following code:
//
// if system exists
// if uring enabled
// start arbiter using uring method
// else
// start arbiter with regular tokio
// else
// if uring enabled
// start uring in spawned thread
// else
// start regular tokio in spawned thread
// every worker runs in it's own thread and tokio runtime.
// use a custom tokio runtime builder to change the settings of runtime.
#[cfg(all(target_os = "linux", feature = "io-uring"))]
let arbiter = {
// TODO: pass max blocking thread config when tokio-uring enable configuration
// on building runtime.
let _ = config.max_blocking_threads;
Arbiter::new()
};
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
let arbiter = Arbiter::with_tokio_rt(move || {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.max_blocking_threads(config.max_blocking_threads)
.build()
.unwrap()
});
match (actix_system, tokio_handle) {
(None, None) => {
panic!("No runtime detected. Start a Tokio (or Actix) runtime.");
}
arbiter.spawn(async move {
let fut = factories
.iter()
.enumerate()
.map(|(idx, factory)| {
let fut = factory.create();
async move { fut.await.map(|(t, s)| (idx, t, s)) }
})
.collect::<Vec<_>>();
// no actix system
(None, Some(rt_handle)) => {
std::thread::Builder::new()
.name(format!("actix-server worker {}", idx))
.spawn(move || {
let (worker_stopped_tx, worker_stopped_rx) = oneshot::channel();
// a second spawn to run !Send future tasks.
spawn(async move {
let res = join_all(fut)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>();
let services = match res {
Ok(res) => res
.into_iter()
.fold(Vec::new(), |mut services, (factory, token, service)| {
assert_eq!(token, services.len());
services.push(WorkerService {
factory,
service,
status: WorkerServiceStatus::Unavailable,
// local set for running service init futures and worker services
let ls = tokio::task::LocalSet::new();
// init services using existing Tokio runtime (so probably on main thread)
let services = rt_handle.block_on(ls.run_until(async {
let mut services = Vec::new();
for (idx, factory) in factories.iter().enumerate() {
match factory.create().await {
Ok((token, svc)) => services.push((idx, token, svc)),
Err(err) => {
error!("Can not start worker: {:?}", err);
return Err(io::Error::new(
io::ErrorKind::Other,
format!("can not start server service {}", idx),
));
}
}
}
Ok(services)
}));
let services = match services {
Ok(services) => {
factory_tx.send(Ok(())).unwrap();
services
}
Err(err) => {
factory_tx.send(Err(err)).unwrap();
return;
}
};
let worker_services = wrap_worker_services(services);
let worker_fut = async move {
// spawn to make sure ServerWorker runs as non boxed future.
spawn(async move {
ServerWorker {
conn_rx,
stop_rx,
services: worker_services.into_boxed_slice(),
counter: WorkerCounter::new(idx, waker_queue, counter),
factories: factories.into_boxed_slice(),
state: WorkerState::default(),
shutdown_timeout: config.shutdown_timeout,
}
.await;
// wake up outermost task waiting for shutdown
worker_stopped_tx.send(()).unwrap();
});
services
})
.into_boxed_slice(),
Err(e) => {
error!("Can not start worker: {:?}", e);
Arbiter::current().stop();
return;
}
worker_stopped_rx.await.unwrap();
};
#[cfg(all(target_os = "linux", feature = "io-uring"))]
{
// TODO: pass max blocking thread config when tokio-uring enable configuration
// on building runtime.
let _ = config.max_blocking_threads;
tokio_uring::start(worker_fut);
}
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
{
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.max_blocking_threads(config.max_blocking_threads)
.build()
.unwrap();
rt.block_on(ls.run_until(worker_fut));
}
})
.expect("cannot spawn server worker thread");
}
// with actix system
(Some(_sys), _) => {
#[cfg(all(target_os = "linux", feature = "io-uring"))]
let arbiter = {
// TODO: pass max blocking thread config when tokio-uring enable configuration
// on building runtime.
let _ = config.max_blocking_threads;
Arbiter::new()
};
// a third spawn to make sure ServerWorker runs as non boxed future.
spawn(ServerWorker {
rx,
rx2,
services,
counter: WorkerCounter::new(idx, waker_queue, counter_clone),
factories: factories.into_boxed_slice(),
state: Default::default(),
shutdown_timeout: config.shutdown_timeout,
});
});
});
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
let arbiter = {
Arbiter::with_tokio_rt(move || {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.max_blocking_threads(config.max_blocking_threads)
.build()
.unwrap()
})
};
handle_pair(idx, tx1, tx2, counter)
arbiter.spawn(async move {
// spawn_local to run !Send future tasks.
spawn(async move {
let mut services = Vec::new();
for (idx, factory) in factories.iter().enumerate() {
match factory.create().await {
Ok((token, svc)) => services.push((idx, token, svc)),
Err(err) => {
error!("Can not start worker: {:?}", err);
Arbiter::current().stop();
factory_tx
.send(Err(io::Error::new(
io::ErrorKind::Other,
format!("can not start server service {}", idx),
)))
.unwrap();
return;
}
}
}
factory_tx.send(Ok(())).unwrap();
let worker_services = wrap_worker_services(services);
// spawn to make sure ServerWorker runs as non boxed future.
spawn(ServerWorker {
conn_rx,
stop_rx,
services: worker_services.into_boxed_slice(),
counter: WorkerCounter::new(idx, waker_queue, counter),
factories: factories.into_boxed_slice(),
state: Default::default(),
shutdown_timeout: config.shutdown_timeout,
});
});
});
}
};
// wait for service factories initialization
factory_rx.recv().unwrap()?;
Ok(pair)
}
fn restart_service(&mut self, idx: usize, factory_id: usize) {
@ -384,7 +509,7 @@ impl ServerWorker {
if srv.status == WorkerServiceStatus::Unavailable {
trace!(
"Service {:?} is available",
self.factories[srv.factory].name(idx)
self.factories[srv.factory_idx].name(idx)
);
srv.status = WorkerServiceStatus::Available;
}
@ -395,7 +520,7 @@ impl ServerWorker {
if srv.status == WorkerServiceStatus::Available {
trace!(
"Service {:?} is unavailable",
self.factories[srv.factory].name(idx)
self.factories[srv.factory_idx].name(idx)
);
srv.status = WorkerServiceStatus::Unavailable;
}
@ -403,10 +528,10 @@ impl ServerWorker {
Poll::Ready(Err(_)) => {
error!(
"Service {:?} readiness check returned error, restarting",
self.factories[srv.factory].name(idx)
self.factories[srv.factory_idx].name(idx)
);
srv.status = WorkerServiceStatus::Failed;
return Err((idx, srv.factory));
return Err((idx, srv.factory_idx));
}
}
}
@ -437,7 +562,7 @@ struct Shutdown {
/// Start time of shutdown.
start_from: Instant,
/// Notify of the shutdown outcome (force/grace) to stop caller.
/// Notify caller of the shutdown outcome (graceful/force).
tx: oneshot::Sender<bool>,
}
@ -449,8 +574,7 @@ impl Default for WorkerState {
impl Drop for ServerWorker {
fn drop(&mut self) {
// Stop the Arbiter ServerWorker runs on on drop.
Arbiter::current().stop();
Arbiter::try_current().as_ref().map(ArbiterHandle::stop);
}
}
@ -461,15 +585,16 @@ impl Future for ServerWorker {
let this = self.as_mut().get_mut();
// `StopWorker` message handler
if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx)
if let Poll::Ready(Some(Stop { graceful, tx })) =
Pin::new(&mut this.stop_rx).poll_recv(cx)
{
let num = this.counter.total();
if num == 0 {
info!("Shutting down worker, 0 connections");
info!("Shutting down idle worker");
let _ = tx.send(true);
return Poll::Ready(());
} else if graceful {
info!("Graceful worker shutdown, {} connections", num);
info!("Graceful worker shutdown; finishing {} connections", num);
this.shutdown(false);
this.state = WorkerState::Shutdown(Shutdown {
@ -478,7 +603,7 @@ impl Future for ServerWorker {
tx,
});
} else {
info!("Force shutdown worker, {} connections", num);
info!("Force shutdown worker, closing {} connections", num);
this.shutdown(true);
let _ = tx.send(false);
@ -523,6 +648,14 @@ impl Future for ServerWorker {
self.poll(cx)
}
WorkerState::Shutdown(ref mut shutdown) => {
// drop all pending connections in rx channel.
while let Poll::Ready(Some(conn)) = Pin::new(&mut this.conn_rx).poll_recv(cx) {
// WorkerCounterGuard is needed as Accept thread has incremented counter.
// It's guard's job to decrement the counter together with drop of Conn.
let guard = this.counter.guard();
drop((conn, guard));
}
// wait for 1 second
ready!(shutdown.timer.as_mut().poll(cx));
@ -563,7 +696,7 @@ impl Future for ServerWorker {
}
// handle incoming io stream
match ready!(Pin::new(&mut this.rx).poll_recv(cx)) {
match ready!(Pin::new(&mut this.conn_rx).poll_recv(cx)) {
Some(msg) => {
let guard = this.counter.guard();
let _ = this.services[msg.token].service.call((guard, msg.io));
@ -574,3 +707,19 @@ impl Future for ServerWorker {
}
}
}
fn wrap_worker_services(
services: Vec<(usize, usize, BoxedServerService)>,
) -> Vec<WorkerService> {
services
.into_iter()
.fold(Vec::new(), |mut services, (idx, token, service)| {
assert_eq!(token, services.len());
services.push(WorkerService {
factory_idx: idx,
service,
status: WorkerServiceStatus::Unavailable,
});
services
})
}

View File

@ -1,20 +1,19 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc};
use std::{net, thread, time::Duration};
use std::{
net,
sync::{
atomic::{AtomicUsize, Ordering},
mpsc, Arc,
},
thread,
time::Duration,
};
use actix_rt::{net::TcpStream, time::sleep};
use actix_server::Server;
use actix_server::{Server, TestServer};
use actix_service::fn_service;
use actix_utils::future::ok;
use futures_util::future::lazy;
fn unused_addr() -> net::SocketAddr {
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
let socket = mio::net::TcpSocket::new_v4().unwrap();
socket.bind(addr).unwrap();
socket.set_reuseaddr(true).unwrap();
let tcp = socket.listen(32).unwrap();
tcp.local_addr().unwrap()
TestServer::unused_addr()
}
#[test]
@ -23,52 +22,94 @@ fn test_bind() {
let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || {
let sys = actix_rt::System::new();
let srv = sys.block_on(lazy(|_| {
Server::build()
actix_rt::System::new().block_on(async {
let srv = Server::build()
.workers(1)
.disable_signals()
.bind("test", addr, move || fn_service(|_| ok::<_, ()>(())))
.unwrap()
.run()
}));
.bind("test", addr, move || {
fn_service(|_| async { Ok::<_, ()>(()) })
})?
.run();
let _ = tx.send((srv, actix_rt::System::current()));
let _ = sys.run();
let _ = tx.send(srv.handle());
srv.await
})
});
let (_, sys) = rx.recv().unwrap();
let srv = rx.recv().unwrap();
thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok());
sys.stop();
let _ = h.join();
let _ = srv.stop(true);
h.join().unwrap().unwrap();
}
#[test]
fn plain_tokio_runtime() {
let addr = unused_addr();
let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async {
let srv = Server::build()
.workers(1)
.disable_signals()
.bind("test", addr, move || {
fn_service(|_| async { Ok::<_, ()>(()) })
})?
.run();
tx.send(srv.handle()).unwrap();
srv.await
})
});
let srv = rx.recv().unwrap();
thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok());
let _ = srv.stop(true);
h.join().unwrap().unwrap();
}
#[test]
fn test_listen() {
let addr = unused_addr();
let lst = net::TcpListener::bind(addr).unwrap();
let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || {
let sys = actix_rt::System::new();
let lst = net::TcpListener::bind(addr).unwrap();
sys.block_on(async {
Server::build()
actix_rt::System::new().block_on(async {
let srv = Server::build()
.disable_signals()
.workers(1)
.listen("test", lst, move || fn_service(|_| ok::<_, ()>(())))
.unwrap()
.listen("test", lst, move || {
fn_service(|_| async { Ok::<_, ()>(()) })
})?
.run();
let _ = tx.send(actix_rt::System::current());
});
let _ = sys.run();
let _ = tx.send(srv.handle());
srv.await
})
});
let sys = rx.recv().unwrap();
let srv = rx.recv().unwrap();
thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok());
sys.stop();
let _ = h.join();
let _ = srv.stop(true);
h.join().unwrap().unwrap();
}
#[test]
@ -84,9 +125,8 @@ fn test_start() {
let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || {
let sys = actix_rt::System::new();
let srv = sys.block_on(lazy(|_| {
Server::build()
actix_rt::System::new().block_on(async {
let srv = Server::build()
.backlog(100)
.disable_signals()
.bind("test", addr, move || {
@ -95,13 +135,13 @@ fn test_start() {
f.send(Bytes::from_static(b"test")).await.unwrap();
Ok::<_, ()>(())
})
})
.unwrap()
.run()
}));
})?
.run();
let _ = tx.send((srv, actix_rt::System::current()));
let _ = sys.run();
let _ = tx.send((srv.handle(), actix_rt::System::current()));
srv.await
})
});
let (srv, sys) = rx.recv().unwrap();
@ -134,12 +174,11 @@ fn test_start() {
// stop
let _ = srv.stop(false);
thread::sleep(Duration::from_millis(100));
assert!(net::TcpStream::connect(addr).is_err());
thread::sleep(Duration::from_millis(100));
sys.stop();
let _ = h.join();
h.join().unwrap().unwrap();
thread::sleep(Duration::from_secs(1));
assert!(net::TcpStream::connect(addr).is_err());
}
#[actix_rt::test]
@ -162,11 +201,11 @@ async fn test_max_concurrent_connections() {
let h = thread::spawn(move || {
actix_rt::System::new().block_on(async {
let server = Server::build()
let srv = Server::build()
// Set a relative higher backlog.
.backlog(12)
// max connection for a worker is 3.
.maxconn(max_conn)
.max_concurrent_connections(max_conn)
.workers(1)
.disable_signals()
.bind("test", addr, move || {
@ -183,9 +222,9 @@ async fn test_max_concurrent_connections() {
})?
.run();
let _ = tx.send((server.clone(), actix_rt::System::current()));
let _ = tx.send((srv.handle(), actix_rt::System::current()));
server.await
srv.await
})
});
@ -209,9 +248,8 @@ async fn test_max_concurrent_connections() {
}
srv.stop(false).await;
sys.stop();
let _ = h.join().unwrap();
h.join().unwrap().unwrap();
}
#[actix_rt::test]
@ -257,7 +295,7 @@ async fn test_service_restart() {
let h = thread::spawn(move || {
let num = num.clone();
actix_rt::System::new().block_on(async {
let server = Server::build()
let srv = Server::build()
.backlog(1)
.disable_signals()
.bind("addr1", addr1, move || {
@ -266,25 +304,23 @@ async fn test_service_restart() {
let num = num.clone();
async move { Ok::<_, ()>(TestService(num)) }
})
})
.unwrap()
})?
.bind("addr2", addr2, move || {
let num2 = num2.clone();
fn_factory(move || {
let num2 = num2.clone();
async move { Ok::<_, ()>(TestService(num2)) }
})
})
.unwrap()
})?
.workers(1)
.run();
let _ = tx.send((server.clone(), actix_rt::System::current()));
server.await
let _ = tx.send(srv.handle());
srv.await
})
});
let (server, sys) = rx.recv().unwrap();
let srv = rx.recv().unwrap();
for _ in 0..5 {
TcpStream::connect(addr1)
@ -306,12 +342,11 @@ async fn test_service_restart() {
assert!(num_clone.load(Ordering::SeqCst) > 5);
assert!(num2_clone.load(Ordering::SeqCst) > 5);
sys.stop();
let _ = server.stop(false);
let _ = h.join().unwrap();
let _ = srv.stop(false);
h.join().unwrap().unwrap();
}
#[ignore]
#[ignore] // non-deterministic on CI
#[actix_rt::test]
async fn worker_restart() {
use actix_service::{Service, ServiceFactory};
@ -378,19 +413,19 @@ async fn worker_restart() {
let h = thread::spawn(move || {
let counter = counter.clone();
actix_rt::System::new().block_on(async {
let server = Server::build()
let srv = Server::build()
.disable_signals()
.bind("addr", addr, move || TestServiceFactory(counter.clone()))
.unwrap()
.bind("addr", addr, move || TestServiceFactory(counter.clone()))?
.workers(2)
.run();
let _ = tx.send((server.clone(), actix_rt::System::current()));
server.await
let _ = tx.send(srv.handle());
srv.await
})
});
let (server, sys) = rx.recv().unwrap();
let srv = rx.recv().unwrap();
sleep(Duration::from_secs(3)).await;
@ -447,7 +482,6 @@ async fn worker_restart() {
assert_eq!("3", id);
stream.shutdown().await.unwrap();
sys.stop();
let _ = server.stop(false);
let _ = h.join().unwrap();
let _ = srv.stop(false);
h.join().unwrap().unwrap();
}

View File

@ -0,0 +1,33 @@
use std::{future::Future, sync::mpsc, time::Duration};
async fn oracle<F, Fut>(f: F) -> (u32, u32)
where
F: FnOnce() -> Fut + Clone + Send + 'static,
Fut: Future<Output = u32> + 'static,
{
let f1 = actix_rt::spawn(f.clone()());
let f2 = actix_rt::spawn(f());
(f1.await.unwrap(), f2.await.unwrap())
}
#[actix_rt::main]
async fn main() {
let (tx, rx) = mpsc::channel();
let (r1, r2) = oracle({
let tx = tx.clone();
|| async move {
tx.send(()).unwrap();
4 * 4
}
})
.await;
assert_eq!(r1, r2);
tx.send(()).unwrap();
rx.recv_timeout(Duration::from_millis(100)).unwrap();
rx.recv_timeout(Duration::from_millis(100)).unwrap();
}

View File

@ -52,9 +52,9 @@ pub fn fn_factory<F, Cfg, Srv, Req, Fut, Err>(
f: F,
) -> FnServiceNoConfig<F, Cfg, Srv, Req, Fut, Err>
where
Srv: Service<Req>,
F: Fn() -> Fut,
Fut: Future<Output = Result<Srv, Err>>,
Srv: Service<Req>,
{
FnServiceNoConfig::new(f)
}

View File

@ -1,7 +1,7 @@
/// An implementation of [`poll_ready`]() that always signals readiness.
///
/// This should only be used for basic leaf services that have no concept of un-readiness.
/// For wrapper or other serivice types, use [`forward_ready!`] for simple cases or write a bespoke
/// For wrapper or other service types, use [`forward_ready!`] for simple cases or write a bespoke
/// `poll_ready` implementation.
///
/// [`poll_ready`]: crate::Service::poll_ready

View File

@ -9,26 +9,25 @@ use pin_project_lite::pin_project;
use super::{Service, ServiceFactory};
/// Service for the `map_err` combinator, changing the type of a service's
/// error.
/// Service for the `map_err` combinator, changing the type of a service's error.
///
/// This is created by the `ServiceExt::map_err` method.
pub struct MapErr<S, Req, F, E> {
service: S,
f: F,
mapper: F,
_t: PhantomData<(E, Req)>,
}
impl<S, Req, F, E> MapErr<S, Req, F, E> {
/// Create new `MapErr` combinator
pub(crate) fn new(service: S, f: F) -> Self
pub(crate) fn new(service: S, mapper: F) -> Self
where
S: Service<Req>,
F: Fn(S::Error) -> E,
{
Self {
service,
f,
mapper,
_t: PhantomData,
}
}
@ -42,7 +41,7 @@ where
fn clone(&self) -> Self {
MapErr {
service: self.service.clone(),
f: self.f.clone(),
mapper: self.mapper.clone(),
_t: PhantomData,
}
}
@ -58,11 +57,11 @@ where
type Future = MapErrFuture<A, Req, F, E>;
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(ctx).map_err(&self.f)
self.service.poll_ready(ctx).map_err(&self.mapper)
}
fn call(&self, req: Req) -> Self::Future {
MapErrFuture::new(self.service.call(req), self.f.clone())
MapErrFuture::new(self.service.call(req), self.mapper.clone())
}
}
@ -105,23 +104,23 @@ where
/// service's error.
///
/// This is created by the `NewServiceExt::map_err` method.
pub struct MapErrServiceFactory<A, Req, F, E>
pub struct MapErrServiceFactory<SF, Req, F, E>
where
A: ServiceFactory<Req>,
F: Fn(A::Error) -> E + Clone,
SF: ServiceFactory<Req>,
F: Fn(SF::Error) -> E + Clone,
{
a: A,
a: SF,
f: F,
e: PhantomData<(E, Req)>,
}
impl<A, Req, F, E> MapErrServiceFactory<A, Req, F, E>
impl<SF, Req, F, E> MapErrServiceFactory<SF, Req, F, E>
where
A: ServiceFactory<Req>,
F: Fn(A::Error) -> E + Clone,
SF: ServiceFactory<Req>,
F: Fn(SF::Error) -> E + Clone,
{
/// Create new `MapErr` new service instance
pub(crate) fn new(a: A, f: F) -> Self {
pub(crate) fn new(a: SF, f: F) -> Self {
Self {
a,
f,
@ -130,10 +129,10 @@ where
}
}
impl<A, Req, F, E> Clone for MapErrServiceFactory<A, Req, F, E>
impl<SF, Req, F, E> Clone for MapErrServiceFactory<SF, Req, F, E>
where
A: ServiceFactory<Req> + Clone,
F: Fn(A::Error) -> E + Clone,
SF: ServiceFactory<Req> + Clone,
F: Fn(SF::Error) -> E + Clone,
{
fn clone(&self) -> Self {
Self {
@ -144,57 +143,57 @@ where
}
}
impl<A, Req, F, E> ServiceFactory<Req> for MapErrServiceFactory<A, Req, F, E>
impl<SF, Req, F, E> ServiceFactory<Req> for MapErrServiceFactory<SF, Req, F, E>
where
A: ServiceFactory<Req>,
F: Fn(A::Error) -> E + Clone,
SF: ServiceFactory<Req>,
F: Fn(SF::Error) -> E + Clone,
{
type Response = A::Response;
type Response = SF::Response;
type Error = E;
type Config = A::Config;
type Service = MapErr<A::Service, Req, F, E>;
type InitError = A::InitError;
type Future = MapErrServiceFuture<A, Req, F, E>;
type Config = SF::Config;
type Service = MapErr<SF::Service, Req, F, E>;
type InitError = SF::InitError;
type Future = MapErrServiceFuture<SF, Req, F, E>;
fn new_service(&self, cfg: A::Config) -> Self::Future {
fn new_service(&self, cfg: SF::Config) -> Self::Future {
MapErrServiceFuture::new(self.a.new_service(cfg), self.f.clone())
}
}
pin_project! {
pub struct MapErrServiceFuture<A, Req, F, E>
pub struct MapErrServiceFuture<SF, Req, F, E>
where
A: ServiceFactory<Req>,
F: Fn(A::Error) -> E,
SF: ServiceFactory<Req>,
F: Fn(SF::Error) -> E,
{
#[pin]
fut: A::Future,
f: F,
fut: SF::Future,
mapper: F,
}
}
impl<A, Req, F, E> MapErrServiceFuture<A, Req, F, E>
impl<SF, Req, F, E> MapErrServiceFuture<SF, Req, F, E>
where
A: ServiceFactory<Req>,
F: Fn(A::Error) -> E,
SF: ServiceFactory<Req>,
F: Fn(SF::Error) -> E,
{
fn new(fut: A::Future, f: F) -> Self {
MapErrServiceFuture { fut, f }
fn new(fut: SF::Future, mapper: F) -> Self {
MapErrServiceFuture { fut, mapper }
}
}
impl<A, Req, F, E> Future for MapErrServiceFuture<A, Req, F, E>
impl<SF, Req, F, E> Future for MapErrServiceFuture<SF, Req, F, E>
where
A: ServiceFactory<Req>,
F: Fn(A::Error) -> E + Clone,
SF: ServiceFactory<Req>,
F: Fn(SF::Error) -> E + Clone,
{
type Output = Result<MapErr<A::Service, Req, F, E>, A::InitError>;
type Output = Result<MapErr<SF::Service, Req, F, E>, SF::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if let Poll::Ready(svc) = this.fut.poll(cx)? {
Poll::Ready(Ok(MapErr::new(svc, this.f.clone())))
Poll::Ready(Ok(MapErr::new(svc, this.mapper.clone())))
} else {
Poll::Pending
}

View File

@ -3,14 +3,35 @@
## Unreleased - 2021-xx-xx
## 3.0.0-beta.8 - 2021-11-15
* Add `Connect::request` for getting a reference to the connection request. [#415]
[#415]: https://github.com/actix/actix-net/pull/415
## 3.0.0-beta.7 - 2021-10-20
* Add `webpki_roots_cert_store()` to get rustls compatible webpki roots cert store. [#401]
* Alias `connect::ssl` to `connect::tls`. [#401]
[#401]: https://github.com/actix/actix-net/pull/401
## 3.0.0-beta.6 - 2021-10-19
* Update `tokio-rustls` to `0.23` which uses `rustls` `0.20`. [#396]
* Removed a re-export of `Session` from `rustls` as it no longer exist. [#396]
* Minimum supported Rust version (MSRV) is now 1.52.
[#396]: https://github.com/actix/actix-net/pull/396
## 3.0.0-beta.5 - 2021-03-29
* Changed `connect::ssl::rustls::RustlsConnectorService` to return error when `DNSNameRef`
* Changed `connect::ssl::rustls::RustlsConnectorService` to return error when `DNSNameRef`
generation failed instead of panic. [#296]
* Remove `connect::ssl::openssl::OpensslConnectServiceFactory`. [#297]
* Remove `connect::ssl::openssl::OpensslConnectService`. [#297]
* Add `connect::ssl::native_tls` module for native tls support. [#295]
* Rename `accept::{nativetls => native_tls}`. [#295]
* Remove `connect::TcpConnectService` type. service caller expect a `TcpStream` should use
* Remove `connect::TcpConnectService` type. service caller expect a `TcpStream` should use
`connect::ConnectService` instead and call `Connection<T, TcpStream>::into_parts`. [#299]
[#295]: https://github.com/actix/actix-net/pull/295

View File

@ -1,12 +1,10 @@
[package]
name = "actix-tls"
version = "3.0.0-beta.5"
version = "3.0.0-beta.8"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "TLS acceptor and connector services for Actix ecosystem"
keywords = ["network", "tls", "ssl", "async", "transport"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-tls"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
edition = "2018"
@ -40,7 +38,7 @@ native-tls = ["tokio-native-tls"]
uri = ["http"]
[dependencies]
actix-codec = "0.4.0-beta.1"
actix-codec = "0.4.0"
actix-rt = { version = "2.2.0", default-features = false }
actix-service = "2.0.0"
actix-utils = "3.0.0"
@ -57,19 +55,20 @@ tls-openssl = { package = "openssl", version = "0.10.9", optional = true }
tokio-openssl = { version = "0.6", optional = true }
# rustls
tokio-rustls = { version = "0.22", optional = true }
webpki-roots = { version = "0.21", optional = true }
tokio-rustls = { version = "0.23", optional = true }
webpki-roots = { version = "0.22", optional = true }
# native-tls
tokio-native-tls = { version = "0.3", optional = true }
[dev-dependencies]
actix-rt = "2.2.0"
actix-server = "2.0.0-beta.6"
actix-server = "2.0.0-beta.9"
bytes = "1"
env_logger = "0.8"
env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
log = "0.4"
rustls-pemfile = "0.2.1"
trust-dns-resolver = "0.20.0"
[[example]]

View File

@ -35,25 +35,29 @@ use actix_service::ServiceFactoryExt as _;
use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream};
use futures_util::future::ok;
use log::info;
use rustls::{
internal::pemfile::certs, internal::pemfile::rsa_private_keys, NoClientAuth, ServerConfig,
};
use rustls::{server::ServerConfig, Certificate, PrivateKey};
use rustls_pemfile::{certs, rsa_private_keys};
#[actix_rt::main]
async fn main() -> io::Result<()> {
env::set_var("RUST_LOG", "info");
env_logger::init();
let mut tls_config = ServerConfig::new(NoClientAuth::new());
// Load TLS key and cert files
let cert_file = &mut BufReader::new(File::open("./examples/cert.pem").unwrap());
let key_file = &mut BufReader::new(File::open("./examples/key.pem").unwrap());
let cert_chain = certs(cert_file).unwrap();
let cert_chain = certs(cert_file)
.unwrap()
.into_iter()
.map(Certificate)
.collect();
let mut keys = rsa_private_keys(key_file).unwrap();
tls_config
.set_single_cert(cert_chain, keys.remove(0))
let tls_config = ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_single_cert(cert_chain, PrivateKey(keys.remove(0)))
.unwrap();
let tls_acceptor = RustlsAcceptor::new(tls_config);

View File

@ -19,7 +19,7 @@ use futures_core::future::LocalBoxFuture;
use pin_project_lite::pin_project;
use tokio_rustls::{Accept, TlsAcceptor};
pub use tokio_rustls::rustls::{ServerConfig, Session};
pub use tokio_rustls::rustls::ServerConfig;
use super::MAX_CONN_COUNTER;

View File

@ -63,16 +63,16 @@ impl From<Option<SocketAddr>> for ConnectAddrs {
/// Connection info.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Connect<T> {
pub(crate) req: T,
pub struct Connect<R> {
pub(crate) req: R,
pub(crate) port: u16,
pub(crate) addr: ConnectAddrs,
pub(crate) local_addr: Option<IpAddr>,
}
impl<T: Address> Connect<T> {
impl<R: Address> Connect<R> {
/// Create `Connect` instance by splitting the string by ':' and convert the second part to u16
pub fn new(req: T) -> Connect<T> {
pub fn new(req: R) -> Connect<R> {
let (_, port) = parse_host(req.hostname());
Connect {
@ -85,7 +85,7 @@ impl<T: Address> Connect<T> {
/// Create new `Connect` instance from host and address. Connector skips name resolution stage
/// for such connect messages.
pub fn with_addr(req: T, addr: SocketAddr) -> Connect<T> {
pub fn with_addr(req: R, addr: SocketAddr) -> Connect<R> {
Connect {
req,
port: 0,
@ -155,15 +155,20 @@ impl<T: Address> Connect<T> {
ConnectAddrs::Multi(addrs) => ConnectAddrsIter::MultiOwned(addrs.into_iter()),
}
}
/// Returns a reference to the connection request.
pub fn request(&self) -> &R {
&self.req
}
}
impl<T: Address> From<T> for Connect<T> {
fn from(addr: T) -> Self {
impl<R: Address> From<R> for Connect<R> {
fn from(addr: R) -> Self {
Connect::new(addr)
}
}
impl<T: Address> fmt::Display for Connect<T> {
impl<R: Address> fmt::Display for Connect<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}:{}", self.hostname(), self.port())
}
@ -347,4 +352,10 @@ mod tests {
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
)
}
#[test]
fn request_ref() {
let conn = Connect::new("hello");
assert_eq!(conn.request(), &"hello")
}
}

View File

@ -21,7 +21,9 @@ mod connector;
mod error;
mod resolve;
mod service;
pub mod ssl;
pub mod tls;
#[doc(hidden)]
pub use tls as ssl;
#[cfg(feature = "uri")]
mod uri;

View File

@ -1,4 +1,4 @@
//! SSL Services
//! TLS Services
#[cfg(feature = "openssl")]
pub mod openssl;

View File

@ -1,4 +1,5 @@
use std::{
convert::TryFrom,
future::Future,
io,
pin::Pin,
@ -6,7 +7,6 @@ use std::{
task::{Context, Poll},
};
pub use tokio_rustls::rustls::Session;
pub use tokio_rustls::{client::TlsStream, rustls::ClientConfig};
pub use webpki_roots::TLS_SERVER_ROOTS;
@ -14,11 +14,26 @@ use actix_rt::net::ActixStream;
use actix_service::{Service, ServiceFactory};
use futures_core::{future::LocalBoxFuture, ready};
use log::trace;
use tokio_rustls::webpki::DNSNameRef;
use tokio_rustls::rustls::{client::ServerName, OwnedTrustAnchor, RootCertStore};
use tokio_rustls::{Connect, TlsConnector};
use crate::connect::{Address, Connection};
/// Returns standard root certificates from `webpki-roots` crate as a rustls certificate store.
pub fn webpki_roots_cert_store() -> RootCertStore {
let mut root_certs = RootCertStore::empty();
for cert in TLS_SERVER_ROOTS.0 {
let cert = OwnedTrustAnchor::from_subject_spki_name_constraints(
cert.subject,
cert.spki,
cert.name_constraints,
);
let certs = vec![cert].into_iter();
root_certs.add_server_trust_anchors(certs);
}
root_certs
}
/// Rustls connector factory
pub struct RustlsConnector {
connector: Arc<ClientConfig>,
@ -89,7 +104,7 @@ where
trace!("SSL Handshake start for: {:?}", connection.host());
let (stream, connection) = connection.replace_io(());
match DNSNameRef::try_from_ascii_str(connection.host()) {
match ServerName::try_from(connection.host()) {
Ok(host) => RustlsConnectorServiceFuture::Future {
connect: TlsConnector::from(self.connector.clone()).connect(host, stream),
connection: Some(connection),

View File

@ -23,3 +23,4 @@ local-waker = "0.1"
[dev-dependencies]
actix-rt = "2.0.0"
futures-util = { version = "0.3.7", default-features = false }
static_assertions = "1.1"

View File

@ -103,10 +103,16 @@ pub fn err<T, E>(err: E) -> Ready<Result<T, E>> {
#[cfg(test)]
mod tests {
use std::rc::Rc;
use futures_util::task::noop_waker;
use static_assertions::{assert_impl_all, assert_not_impl_all};
use super::*;
assert_impl_all!(Ready<()>: Send, Sync, Clone);
assert_not_impl_all!(Ready<Rc<()>>: Send, Sync);
#[test]
#[should_panic]
fn multiple_poll_panics() {

1
clippy.toml Normal file
View File

@ -0,0 +1 @@
msrv = "1.48"