diff --git a/.cargo/config.toml b/.cargo/config.toml index 03a995c7..1c282baf 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,22 +1,26 @@ [alias] -lint = "clippy --workspace --all-features --tests --examples --bins -- -Dclippy::todo" +lint = "clippy --workspace --tests --examples --bins -- -Dclippy::todo" +lint-all = "clippy --workspace --all-features --tests --examples --bins -- -Dclippy::todo" ci-doctest = "test --workspace --all-features --doc --no-fail-fast -- --nocapture" # just check the library (without dev deps) ci-check-min = "hack --workspace check --no-default-features" -ci-check-lib = "hack --workspace --feature-powerset --exclude-features io-uring check" +ci-check-lib = "hack --workspace --feature-powerset --exclude-features=io-uring check" ci-check-lib-linux = "hack --workspace --feature-powerset check" # check everything -ci-check = "hack --workspace --feature-powerset --exclude-features io-uring check --tests --examples" +ci-check = "hack --workspace --feature-powerset --exclude-features=io-uring check --tests --examples" ci-check-linux = "hack --workspace --feature-powerset check --tests --examples" # tests avoiding io-uring feature ci-test = "hack test --workspace --exclude=actix-rt --exclude=actix-server --all-features --lib --tests --no-fail-fast -- --nocapture" -ci-test-rt = " hack --feature-powerset --exclude-features io-uring test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture" -ci-test-server = "hack --feature-powerset --exclude-features io-uring test --package=actix-server --lib --tests --no-fail-fast -- --nocapture" +ci-test-rt = " hack --feature-powerset --exclude-features=io-uring test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture" +ci-test-server = "hack --feature-powerset --exclude-features=io-uring test --package=actix-server --lib --tests --no-fail-fast -- --nocapture" # test with io-uring feature ci-test-rt-linux = " hack --feature-powerset test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture" ci-test-server-linux = "hack --feature-powerset test --package=actix-server --lib --tests --no-fail-fast -- --nocapture" + +# test lower msrv +ci-test-lower-msrv = "hack --workspace --exclude=actix-server --exclude=actix-tls --feature-powerset test --lib --tests --no-fail-fast -- --nocapture" diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 45841fb8..0533b8af 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -18,7 +18,7 @@ jobs: - { name: Windows (MinGW), os: windows-latest, triple: x86_64-pc-windows-gnu } - { name: Windows (32-bit), os: windows-latest, triple: i686-pc-windows-msvc } version: - - 1.46.0 # MSRV + - 1.52.0 # MSRV for -server and -tls - stable - nightly @@ -64,8 +64,7 @@ jobs: # - name: Generate Cargo.lock # uses: actions-rs/cargo@v1 - # with: - # command: generate-lockfile + # with: { command: generate-lockfile } # - name: Cache Dependencies # uses: Swatinem/rust-cache@v1.2.0 @@ -113,30 +112,69 @@ jobs: - name: tests if: matrix.target.os == 'ubuntu-latest' run: | - cargo ci-test - cargo ci-test-rt-linux - cargo ci-test-server-linux - - - name: Generate coverage file - if: > - matrix.target.os == 'ubuntu-latest' - && matrix.version == 'stable' - && github.ref == 'refs/heads/master' - run: | - cargo install cargo-tarpaulin - cargo tarpaulin --out Xml --verbose - - name: Upload to Codecov - if: > - matrix.target.os == 'ubuntu-latest' - && matrix.version == 'stable' - && github.ref == 'refs/heads/master' - uses: codecov/codecov-action@v1 - with: { file: cobertura.xml } + sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test-rt-linux && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test-server-linux" - name: Clear the cargo caches run: | cargo install cargo-cache --version 0.6.2 --no-default-features --features ci-autoclean cargo-cache + + build_and_test_lower_msrv: + name: Linux / 1.46 (lower MSRV) + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - name: Install 1.46.0 # MSRV for all but -server and -tls + uses: actions-rs/toolchain@v1 + with: + toolchain: 1.46.0-x86_64-unknown-linux-gnu + profile: minimal + override: true + + - name: Install cargo-hack + uses: actions-rs/cargo@v1 + with: + command: install + args: cargo-hack + + - name: tests + run: | + sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=1.46 cargo ci-test-lower-msrv" + + - name: Clear the cargo caches + run: | + cargo install cargo-cache --version 0.6.2 --no-default-features --features ci-autoclean + cargo-cache + + coverage: + name: coverage + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - name: Install Rust (nightly) + uses: actions-rs/toolchain@v1 + with: + toolchain: stable-x86_64-unknown-linux-gnu + profile: minimal + override: true + + - name: Generate Cargo.lock + uses: actions-rs/cargo@v1 + with: { command: generate-lockfile } + - name: Cache Dependencies + uses: Swatinem/rust-cache@v1.3.0 + + - name: Generate coverage file + if: github.ref == 'refs/heads/master' + run: | + cargo install cargo-tarpaulin + cargo tarpaulin --out Xml --verbose + - name: Upload to Codecov + if: github.ref == 'refs/heads/master' + uses: codecov/codecov-action@v1 + with: { file: cobertura.xml } rustdoc: name: rustdoc @@ -158,13 +196,6 @@ jobs: - name: Cache Dependencies uses: Swatinem/rust-cache@v1.3.0 - - name: Install cargo-hack - uses: actions-rs/cargo@v1 - with: - command: install - args: cargo-hack - - - name: doc tests - uses: actions-rs/cargo@v1 - timeout-minutes: 40 - with: { command: ci-doctest } + - name: doc tests io-uring + run: | + sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=nightly cargo ci-doctest" diff --git a/actix-codec/CHANGES.md b/actix-codec/CHANGES.md index fd893454..19e17927 100644 --- a/actix-codec/CHANGES.md +++ b/actix-codec/CHANGES.md @@ -3,6 +3,14 @@ ## Unreleased - 2021-xx-xx +## 0.4.1 - 2021-11-05 +* Added `LinesCodec.` [#338] +* `Framed::poll_ready` flushes when the buffer is full. [#409] + +[#338]: https://github.com/actix/actix-net/pull/338 +[#409]: https://github.com/actix/actix-net/pull/409 + + ## 0.4.0 - 2021-04-20 * No significant changes since v0.4.0-beta.1. diff --git a/actix-codec/Cargo.toml b/actix-codec/Cargo.toml index 7bf1c941..b6a9d298 100644 --- a/actix-codec/Cargo.toml +++ b/actix-codec/Cargo.toml @@ -1,7 +1,10 @@ [package] name = "actix-codec" -version = "0.4.0" -authors = ["Nikolay Kim "] +version = "0.4.1" +authors = [ + "Nikolay Kim ", + "Rob Ede ", +] description = "Codec utilities for working with framed protocols" keywords = ["network", "framework", "async", "futures"] repository = "https://github.com/actix/actix-net" @@ -19,6 +22,15 @@ bytes = "1" futures-core = { version = "0.3.7", default-features = false } futures-sink = { version = "0.3.7", default-features = false } log = "0.4" +memchr = "2.3" pin-project-lite = "0.2" tokio = "1.5.1" tokio-util = { version = "0.6", features = ["codec", "io"] } + +[dev-dependencies] +criterion = { version = "0.3", features = ["html_reports"] } +tokio-test = "0.4.2" + +[[bench]] +name = "lines" +harness = false diff --git a/actix-codec/benches/lines.rs b/actix-codec/benches/lines.rs new file mode 100644 index 00000000..e32b8365 --- /dev/null +++ b/actix-codec/benches/lines.rs @@ -0,0 +1,57 @@ +use bytes::BytesMut; +use criterion::{criterion_group, criterion_main, Criterion}; + +const INPUT: &[u8] = include_bytes!("./lorem.txt"); + +fn bench_lines_codec(c: &mut Criterion) { + let mut decode_group = c.benchmark_group("lines decode"); + + decode_group.bench_function("actix", |b| { + b.iter(|| { + use actix_codec::Decoder as _; + + let mut codec = actix_codec::LinesCodec::default(); + let mut buf = BytesMut::from(INPUT); + while let Ok(Some(_bytes)) = codec.decode_eof(&mut buf) {} + }); + }); + + decode_group.bench_function("tokio", |b| { + b.iter(|| { + use tokio_util::codec::Decoder as _; + + let mut codec = tokio_util::codec::LinesCodec::new(); + let mut buf = BytesMut::from(INPUT); + while let Ok(Some(_bytes)) = codec.decode_eof(&mut buf) {} + }); + }); + + decode_group.finish(); + + let mut encode_group = c.benchmark_group("lines encode"); + + encode_group.bench_function("actix", |b| { + b.iter(|| { + use actix_codec::Encoder as _; + + let mut codec = actix_codec::LinesCodec::default(); + let mut buf = BytesMut::new(); + codec.encode("123", &mut buf).unwrap(); + }); + }); + + encode_group.bench_function("tokio", |b| { + b.iter(|| { + use tokio_util::codec::Encoder as _; + + let mut codec = tokio_util::codec::LinesCodec::new(); + let mut buf = BytesMut::new(); + codec.encode("123", &mut buf).unwrap(); + }); + }); + + encode_group.finish(); +} + +criterion_group!(benches, bench_lines_codec); +criterion_main!(benches); diff --git a/actix-codec/benches/lorem.txt b/actix-codec/benches/lorem.txt new file mode 100644 index 00000000..108b3c46 --- /dev/null +++ b/actix-codec/benches/lorem.txt @@ -0,0 +1,5 @@ +Lorem ipsum dolor sit amet, consectetur adipiscing elit. In tortor quam, pulvinar sit amet vestibulum eget, tincidunt non urna. Sed eu sem in felis malesuada venenatis. Suspendisse volutpat aliquet nisi, in condimentum nibh convallis id. Quisque gravida felis scelerisque ipsum aliquam consequat. Praesent libero odio, malesuada vitae odio quis, aliquam aliquet enim. In fringilla ut turpis nec pharetra. Duis eu posuere metus. Sed a aliquet massa. Mauris non tempus mi, quis mattis libero. Vivamus ornare ex at semper cursus. Vestibulum sed facilisis erat, aliquet mollis est. In interdum, magna iaculis ultricies elementum, mi ante vestibulum mauris, nec viverra turpis lorem quis ante. Proin in auctor erat. Vivamus dictum congue massa, fermentum bibendum leo pretium quis. Integer dapibus sodales ligula, sit amet imperdiet felis suscipit eu. Phasellus non ornare enim. +Nam feugiat neque sit amet hendrerit rhoncus. Nunc suscipit molestie vehicula. Aenean vulputate porttitor augue, sit amet molestie dolor volutpat vitae. Nulla vitae condimentum eros. Aliquam tristique purus at metus lacinia egestas. Cras euismod lorem eu orci lobortis, sed tincidunt nisl laoreet. Ut suscipit fermentum mi, et euismod tortor. Pellentesque vitae tempor quam, sed dignissim mi. Suspendisse luctus lacus vitae ligula blandit vehicula. Quisque interdum iaculis tincidunt. Nunc elementum mi vitae tempor placerat. Suspendisse potenti. Donec blandit laoreet ipsum, quis rhoncus velit vulputate sed. +Aliquam suscipit lectus eros, at maximus dolor efficitur quis. Integer blandit tortor orci, nec mattis nunc eleifend ac. Mauris pharetra vel quam quis lacinia. Duis lobortis condimentum nunc ut facilisis. Praesent arcu nisi, porta sit amet viverra sit amet, pellentesque ut nisi. Nunc gravida tortor eu ligula tempus, in interdum magna pretium. Fusce eu ornare sapien. Nullam pellentesque cursus eros. Nam orci massa, faucibus eget leo eget, elementum vulputate erat. Fusce vehicula augue et dui hendrerit vulputate. Mauris neque lacus, porttitor ut condimentum id, efficitur ac neque. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia curae; Donec accumsan, lectus fermentum elementum tristique, ipsum tortor mollis ante, non lacinia nibh ex quis sapien. +Donec pharetra, elit eget rutrum luctus, urna ligula facilisis lorem, sit amet rhoncus ante est eu mi. Vestibulum vestibulum ultricies interdum. Nulla tincidunt ante non hendrerit venenatis. Curabitur vestibulum turpis erat, id efficitur quam venenatis eu. Fusce nulla sem, dapibus vel quam feugiat, ornare fermentum ligula. Praesent tempus tincidunt mauris, non pellentesque felis varius in. Aenean eu arcu ligula. Morbi dapibus maximus nulla a pharetra. Fusce leo metus, luctus ut cursus non, sollicitudin non lectus. Integer pellentesque eleifend erat, vel gravida purus tempus a. Mauris id vestibulum quam. Nunc vitae ullamcorper metus, pharetra placerat enim. Fusce in ultrices nisl. Curabitur justo mauris, dignissim in aliquam sit amet, sollicitudin ut risus. Cras tempor rutrum justo, non tincidunt est maximus at. +Aliquam ac velit tincidunt, ullamcorper velit sit amet, pulvinar nisi. Nullam rhoncus rhoncus egestas. Cras ac luctus nisi. Mauris sit amet risus at magna volutpat ultrices quis ac dui. Aliquam condimentum tellus purus, vel sagittis odio vulputate at. Sed ut finibus tellus. Aliquam tincidunt vehicula diam. diff --git a/actix-codec/src/bcodec.rs b/actix-codec/src/bcodec.rs index b06279ea..ca015b33 100644 --- a/actix-codec/src/bcodec.rs +++ b/actix-codec/src/bcodec.rs @@ -1,11 +1,10 @@ -use bytes::{Buf, Bytes, BytesMut}; use std::io; +use bytes::{Buf, Bytes, BytesMut}; + use super::{Decoder, Encoder}; -/// Bytes codec. -/// -/// Reads/Writes chunks of bytes from a stream. +/// Bytes codec. Reads/writes chunks of bytes from a stream. #[derive(Debug, Copy, Clone)] pub struct BytesCodec; diff --git a/actix-codec/src/framed.rs b/actix-codec/src/framed.rs index a3a13f4d..04b8a8f5 100644 --- a/actix-codec/src/framed.rs +++ b/actix-codec/src/framed.rs @@ -178,7 +178,7 @@ impl Framed { U: Decoder, { loop { - let mut this = self.as_mut().project(); + let this = self.as_mut().project(); // Repeatedly call `decode` or `decode_eof` as long as it is "readable". Readable is // defined as not having returned `None`. If the upstream has returned EOF, and the // decoder is no longer readable, it can be assumed that the decoder will never become @@ -186,7 +186,7 @@ impl Framed { if this.flags.contains(Flags::READABLE) { if this.flags.contains(Flags::EOF) { - match this.codec.decode_eof(&mut this.read_buf) { + match this.codec.decode_eof(this.read_buf) { Ok(Some(frame)) => return Poll::Ready(Some(Ok(frame))), Ok(None) => return Poll::Ready(None), Err(e) => return Poll::Ready(Some(Err(e))), @@ -195,7 +195,7 @@ impl Framed { log::trace!("attempting to decode a frame"); - match this.codec.decode(&mut this.read_buf) { + match this.codec.decode(this.read_buf) { Ok(Some(frame)) => { log::trace!("frame decoded from buffer"); return Poll::Ready(Some(Ok(frame))); @@ -300,11 +300,11 @@ where { type Error = U::Error; - fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if self.is_write_ready() { Poll::Ready(Ok(())) } else { - Poll::Pending + self.flush(cx) } } diff --git a/actix-codec/src/lib.rs b/actix-codec/src/lib.rs index c7713bfe..5842fa7b 100644 --- a/actix-codec/src/lib.rs +++ b/actix-codec/src/lib.rs @@ -14,9 +14,11 @@ mod bcodec; mod framed; +mod lines; pub use self::bcodec::BytesCodec; pub use self::framed::{Framed, FramedParts}; +pub use self::lines::LinesCodec; pub use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; pub use tokio_util::codec::{Decoder, Encoder}; diff --git a/actix-codec/src/lines.rs b/actix-codec/src/lines.rs new file mode 100644 index 00000000..af399e8f --- /dev/null +++ b/actix-codec/src/lines.rs @@ -0,0 +1,158 @@ +use std::io; + +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use memchr::memchr; + +use super::{Decoder, Encoder}; + +/// Lines codec. Reads/writes line delimited strings. +/// +/// Will split input up by LF or CRLF delimiters. I.e. carriage return characters at the end of +/// lines are not preserved. +#[derive(Debug, Copy, Clone, Default)] +#[non_exhaustive] +pub struct LinesCodec; + +impl> Encoder for LinesCodec { + type Error = io::Error; + + #[inline] + fn encode(&mut self, item: T, dst: &mut BytesMut) -> Result<(), Self::Error> { + let item = item.as_ref(); + dst.reserve(item.len() + 1); + dst.put_slice(item.as_bytes()); + dst.put_u8(b'\n'); + Ok(()) + } +} + +impl Decoder for LinesCodec { + type Item = String; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + if src.is_empty() { + return Ok(None); + } + + let len = match memchr(b'\n', src) { + Some(n) => n, + None => { + return Ok(None); + } + }; + + // split up to new line char + let mut buf = src.split_to(len); + debug_assert_eq!(len, buf.len()); + + // remove new line char from source + src.advance(1); + + match buf.last() { + // remove carriage returns at the end of buf + Some(b'\r') => buf.truncate(len - 1), + + // line is empty + None => return Ok(Some(String::new())), + + _ => {} + } + + try_into_utf8(buf.freeze()) + } + + fn decode_eof(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + match self.decode(src)? { + Some(frame) => Ok(Some(frame)), + None if src.is_empty() => Ok(None), + None => { + let buf = match src.last() { + // if last line ends in a CR then take everything up to it + Some(b'\r') => src.split_to(src.len() - 1), + + // take all bytes from source + _ => src.split(), + }; + + if buf.is_empty() { + return Ok(None); + } + + try_into_utf8(buf.freeze()) + } + } + } +} + +// Attempts to convert bytes into a `String`. +fn try_into_utf8(buf: Bytes) -> io::Result> { + String::from_utf8(buf.to_vec()) + .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err)) + .map(Some) +} + +#[cfg(test)] +mod tests { + use bytes::BufMut as _; + + use super::*; + + #[test] + fn lines_decoder() { + let mut codec = LinesCodec::default(); + let mut buf = BytesMut::from("\nline 1\nline 2\r\nline 3\n\r\n\r"); + + assert_eq!("", codec.decode(&mut buf).unwrap().unwrap()); + assert_eq!("line 1", codec.decode(&mut buf).unwrap().unwrap()); + assert_eq!("line 2", codec.decode(&mut buf).unwrap().unwrap()); + assert_eq!("line 3", codec.decode(&mut buf).unwrap().unwrap()); + assert_eq!("", codec.decode(&mut buf).unwrap().unwrap()); + assert!(codec.decode(&mut buf).unwrap().is_none()); + assert!(codec.decode_eof(&mut buf).unwrap().is_none()); + + buf.put_slice(b"k"); + assert!(codec.decode(&mut buf).unwrap().is_none()); + assert_eq!("\rk", codec.decode_eof(&mut buf).unwrap().unwrap()); + + assert!(codec.decode(&mut buf).unwrap().is_none()); + assert!(codec.decode_eof(&mut buf).unwrap().is_none()); + } + + #[test] + fn lines_encoder() { + let mut codec = LinesCodec::default(); + + let mut buf = BytesMut::new(); + + codec.encode("", &mut buf).unwrap(); + assert_eq!(&buf[..], b"\n"); + + codec.encode("test", &mut buf).unwrap(); + assert_eq!(&buf[..], b"\ntest\n"); + + codec.encode("a\nb", &mut buf).unwrap(); + assert_eq!(&buf[..], b"\ntest\na\nb\n"); + } + + #[test] + fn lines_encoder_no_overflow() { + let mut codec = LinesCodec::default(); + + let mut buf = BytesMut::new(); + codec.encode("1234567", &mut buf).unwrap(); + assert_eq!(&buf[..], b"1234567\n"); + + let mut buf = BytesMut::new(); + codec.encode("12345678", &mut buf).unwrap(); + assert_eq!(&buf[..], b"12345678\n"); + + let mut buf = BytesMut::new(); + codec.encode("123456789111213", &mut buf).unwrap(); + assert_eq!(&buf[..], b"123456789111213\n"); + + let mut buf = BytesMut::new(); + codec.encode("1234567891112131", &mut buf).unwrap(); + assert_eq!(&buf[..], b"1234567891112131\n"); + } +} diff --git a/actix-codec/tests/test_framed_sink.rs b/actix-codec/tests/test_framed_sink.rs new file mode 100644 index 00000000..aca4760d --- /dev/null +++ b/actix-codec/tests/test_framed_sink.rs @@ -0,0 +1,221 @@ +use actix_codec::*; +use bytes::Buf; +use bytes::{BufMut, BytesMut}; +use futures_sink::Sink; +use std::collections::VecDeque; +use std::io::{self, Write}; +use std::pin::Pin; +use std::task::Poll::{Pending, Ready}; +use std::task::{Context, Poll}; +use tokio_test::{assert_ready, task}; + +macro_rules! bilateral { + ($($x:expr,)*) => {{ + let mut v = VecDeque::new(); + v.extend(vec![$($x),*]); + Bilateral { calls: v } + }}; +} + +macro_rules! assert_ready { + ($e:expr) => {{ + use core::task::Poll::*; + match $e { + Ready(v) => v, + Pending => panic!("pending"), + } + }}; + ($e:expr, $($msg:tt),+) => {{ + use core::task::Poll::*; + match $e { + Ready(v) => v, + Pending => { + let msg = format_args!($($msg),+); + panic!("pending; {}", msg) + } + } + }}; +} + +#[derive(Debug)] +pub struct Bilateral { + pub calls: VecDeque>>, +} + +impl Write for Bilateral { + fn write(&mut self, src: &[u8]) -> io::Result { + match self.calls.pop_front() { + Some(Ok(data)) => { + assert!(src.len() >= data.len()); + assert_eq!(&data[..], &src[..data.len()]); + Ok(data.len()) + } + Some(Err(e)) => Err(e), + None => panic!("unexpected write; {:?}", src), + } + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl AsyncWrite for Bilateral { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + match Pin::get_mut(self).write(buf) { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Pending, + other => Ready(other), + } + } + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + match Pin::get_mut(self).flush() { + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Pending, + other => Ready(other), + } + } + fn poll_shutdown( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + unimplemented!() + } +} + +impl AsyncRead for Bilateral { + fn poll_read( + mut self: Pin<&mut Self>, + _: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + use io::ErrorKind::WouldBlock; + + match self.calls.pop_front() { + Some(Ok(data)) => { + debug_assert!(buf.remaining() >= data.len()); + buf.put_slice(&data); + Ready(Ok(())) + } + Some(Err(ref e)) if e.kind() == WouldBlock => Pending, + Some(Err(e)) => Ready(Err(e)), + None => Ready(Ok(())), + } + } +} + +pub struct U32; + +impl Encoder for U32 { + type Error = io::Error; + + fn encode(&mut self, item: u32, dst: &mut BytesMut) -> io::Result<()> { + // Reserve space + dst.reserve(4); + dst.put_u32(item); + Ok(()) + } +} + +impl Decoder for U32 { + type Item = u32; + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> io::Result> { + if buf.len() < 4 { + return Ok(None); + } + + let n = buf.split_to(4).get_u32(); + Ok(Some(n)) + } +} + +#[test] +fn test_write_hits_highwater_mark() { + // see here for what this test is based on: + // https://github.com/tokio-rs/tokio/blob/75c07770bfbfea4e5fd914af819c741ed9c3fc36/tokio-util/tests/framed_write.rs#L69 + + const ITER: usize = 2 * 1024; + + let mut bi = bilateral! { + Err(io::Error::new(io::ErrorKind::WouldBlock, "not ready")), + Ok(b"".to_vec()), + }; + + for i in 0..=ITER { + let mut b = BytesMut::with_capacity(4); + b.put_u32(i as u32); + + // Append to the end + match bi.calls.back_mut().unwrap() { + Ok(ref mut data) => { + // Write in 2kb chunks + if data.len() < ITER { + data.extend_from_slice(&b[..]); + continue; + } // else fall through and create a new buffer + } + _ => unreachable!(), + } + + // Push a new new chunk + bi.calls.push_back(Ok(b[..].to_vec())); + } + + assert_eq!(bi.calls.len(), 6); + let mut framed = Framed::new(bi, U32); + // Send 8KB. This fills up FramedWrite2 buffer + let mut task = task::spawn(()); + task.enter(|cx, _| { + // Send 8KB. This fills up Framed buffer + for i in 0..ITER { + { + #[allow(unused_mut)] + let mut framed = Pin::new(&mut framed); + assert!(assert_ready!(framed.poll_ready(cx)).is_ok()); + } + + #[allow(unused_mut)] + let mut framed = Pin::new(&mut framed); + // write the buffer + assert!(framed.start_send(i as u32).is_ok()); + } + + { + #[allow(unused_mut)] + let mut framed = Pin::new(&mut framed); + + // Now we poll_ready which forces a flush. The bilateral pops the front message + // and decides to block. + assert!(framed.poll_ready(cx).is_pending()); + } + + { + #[allow(unused_mut)] + let mut framed = Pin::new(&mut framed); + // We poll again, forcing another flush, which this time succeeds + // The whole 8KB buffer is flushed + assert!(assert_ready!(framed.poll_ready(cx)).is_ok()); + } + + { + #[allow(unused_mut)] + let mut framed = Pin::new(&mut framed); + // Send more data. This matches the final message expected by the bilateral + assert!(framed.start_send(ITER as u32).is_ok()); + } + + { + #[allow(unused_mut)] + let mut framed = Pin::new(&mut framed); + // Flush the rest of the buffer + assert!(assert_ready!(framed.poll_flush(cx)).is_ok()); + } + + // Ensure the mock is empty + assert_eq!(0, Pin::new(&framed).get_ref().io_ref().calls.len()); + }); +} diff --git a/actix-macros/CHANGES.md b/actix-macros/CHANGES.md index 0509eb35..8cb03dd0 100644 --- a/actix-macros/CHANGES.md +++ b/actix-macros/CHANGES.md @@ -3,6 +3,12 @@ ## Unreleased - 2021-xx-xx +## 0.2.3 - 2021-10-19 +* Fix test macro in presence of other imports named "test". [#399] + +[#399]: https://github.com/actix/actix-net/pull/399 + + ## 0.2.2 - 2021-10-14 * Improve error recovery potential when macro input is invalid. [#391] * Allow custom `System`s on test macro. [#391] diff --git a/actix-macros/Cargo.toml b/actix-macros/Cargo.toml index ea1b51d0..75ac8b72 100644 --- a/actix-macros/Cargo.toml +++ b/actix-macros/Cargo.toml @@ -1,9 +1,10 @@ [package] name = "actix-macros" -version = "0.2.2" +version = "0.2.3" authors = [ "Nikolay Kim ", "Ibraheem Ahmed ", + "Rob Ede ", ] description = "Macros for Actix system and runtime" repository = "https://github.com/actix/actix-net.git" @@ -22,4 +23,5 @@ syn = { version = "^1", features = ["full"] } actix-rt = "2.0.0" futures-util = { version = "0.3.7", default-features = false } +rustversion = "1" trybuild = "1" diff --git a/actix-macros/src/lib.rs b/actix-macros/src/lib.rs index 4be79178..be69b7b0 100644 --- a/actix-macros/src/lib.rs +++ b/actix-macros/src/lib.rs @@ -139,9 +139,9 @@ pub fn test(args: TokenStream, item: TokenStream) -> TokenStream { sig.asyncness = None; let missing_test_attr = if has_test_attr { - quote!() + quote! {} } else { - quote!(#[test]) + quote! { #[::core::prelude::v1::test] } }; let mut system = syn::parse_str::("::actix_rt::System").unwrap(); @@ -198,5 +198,5 @@ pub fn test(args: TokenStream, item: TokenStream) -> TokenStream { fn input_and_compile_error(mut item: TokenStream, err: syn::Error) -> TokenStream { let compile_err = TokenStream::from(err.to_compile_error()); item.extend(compile_err); - return item; + item } diff --git a/actix-macros/tests/trybuild.rs b/actix-macros/tests/trybuild.rs index c7f4a5ca..2af99636 100644 --- a/actix-macros/tests/trybuild.rs +++ b/actix-macros/tests/trybuild.rs @@ -1,3 +1,4 @@ +#[rustversion::stable(1.46)] // MSRV #[test] fn compile_macros() { let t = trybuild::TestCases::new(); diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 642cf27a..bee05bdf 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -1,6 +1,16 @@ # Changes ## Unreleased - 2021-xx-xx +* Add `System::run_with_code` to allow retrieving the exit code on stop. [#411] + +[#411]: https://github.com/actix/actix-net/pull/411 + +## 2.4.0 - 2021-11-05 +* Add `Arbiter::try_current` for situations where thread may or may not have Arbiter context. [#408] +* Start io-uring with `System::new` when feature is enabled. [#395] + +[#395]: https://github.com/actix/actix-net/pull/395 +[#408]: https://github.com/actix/actix-net/pull/408 ## 2.3.0 - 2021-10-11 @@ -100,7 +110,7 @@ [#129]: https://github.com/actix/actix-net/issues/129 -## 1.1.0 - 2020-04-08 (YANKED) +## 1.1.0 - 2020-04-08 _(YANKED)_ * Expose `System::is_set` to check if current system has ben started [#99] * Add `Arbiter::is_running` to check if event loop is running [#124] * Add `Arbiter::local_join` associated function diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index 942d54aa..2eccf329 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-rt" -version = "2.3.0" +version = "2.4.0" authors = [ "Nikolay Kim ", "Rob Ede ", @@ -23,7 +23,7 @@ macros = ["actix-macros"] io-uring = ["tokio-uring"] [dependencies] -actix-macros = { version = "0.2.0", optional = true } +actix-macros = { version = "0.2.3", optional = true } futures-core = { version = "0.3", default-features = false } tokio = { version = "1.5.1", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] } diff --git a/actix-rt/README.md b/actix-rt/README.md index eb1d1b6f..b1bf2658 100644 --- a/actix-rt/README.md +++ b/actix-rt/README.md @@ -3,11 +3,11 @@ > Tokio-based single-threaded async runtime for the Actix ecosystem. [![crates.io](https://img.shields.io/crates/v/actix-rt?label=latest)](https://crates.io/crates/actix-rt) -[![Documentation](https://docs.rs/actix-rt/badge.svg?version=2.3.0)](https://docs.rs/actix-rt/2.3.0) +[![Documentation](https://docs.rs/actix-rt/badge.svg?version=2.4.0)](https://docs.rs/actix-rt/2.4.0) [![Version](https://img.shields.io/badge/rustc-1.46+-ab6000.svg)](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html) ![MIT or Apache 2.0 licensed](https://img.shields.io/crates/l/actix-rt.svg)
-[![dependency status](https://deps.rs/crate/actix-rt/2.3.0/status.svg)](https://deps.rs/crate/actix-rt/2.3.0) +[![dependency status](https://deps.rs/crate/actix-rt/2.4.0/status.svg)](https://deps.rs/crate/actix-rt/2.4.0) ![Download](https://img.shields.io/crates/d/actix-rt.svg) [![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/WghFtEH6Hb) diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 97084f05..43b1bdc3 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -240,6 +240,15 @@ impl Arbiter { }) } + /// Try to get current running arbiter handle. + /// + /// Returns `None` if no Arbiter has been started. + /// + /// Unlike [`current`](Self::current), this never panics. + pub fn try_current() -> Option { + HANDLE.with(|cell| cell.borrow().clone()) + } + /// Stop Arbiter from continuing it's event loop. /// /// Returns true if stop message was sent successfully and false if the Arbiter has been dropped. diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index e078dd06..590335c1 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -15,7 +15,7 @@ //! blocking task thread-pool using [`task::spawn_blocking`]. //! //! # Examples -//! ``` +//! ```no_run //! use std::sync::mpsc; //! use actix_rt::{Arbiter, System}; //! diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 4f262ede..395c4f96 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -11,7 +11,7 @@ use std::{ use futures_core::ready; use tokio::sync::{mpsc, oneshot}; -use crate::{arbiter::ArbiterHandle, runtime::default_tokio_runtime, Arbiter, Runtime}; +use crate::{arbiter::ArbiterHandle, Arbiter}; static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); @@ -29,6 +29,7 @@ pub struct System { arbiter_handle: ArbiterHandle, } +#[cfg(not(feature = "io-uring"))] impl System { /// Create a new system. /// @@ -37,7 +38,7 @@ impl System { #[allow(clippy::new_ret_no_self)] pub fn new() -> SystemRunner { Self::with_tokio_rt(|| { - default_tokio_runtime() + crate::runtime::default_tokio_runtime() .expect("Default Actix (Tokio) runtime could not be created.") }) } @@ -53,7 +54,7 @@ impl System { let (stop_tx, stop_rx) = oneshot::channel(); let (sys_tx, sys_rx) = mpsc::unbounded_channel(); - let rt = Runtime::from(runtime_factory()); + let rt = crate::runtime::Runtime::from(runtime_factory()); let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() }); let system = System::construct(sys_tx, sys_arbiter.clone()); @@ -72,7 +73,32 @@ impl System { system, } } +} +#[cfg(feature = "io-uring")] +impl System { + /// Create a new system. + /// + /// # Panics + /// Panics if underlying Tokio runtime can not be created. + #[allow(clippy::new_ret_no_self)] + pub fn new() -> SystemRunner { + SystemRunner + } + + /// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure. + /// + /// [tokio-runtime]: tokio::runtime::Runtime + #[doc(hidden)] + pub fn with_tokio_rt(_: F) -> SystemRunner + where + F: Fn() -> tokio::runtime::Runtime, + { + unimplemented!("System::with_tokio_rt is not implemented yet") + } +} + +impl System { /// Constructs new system and registers it on the current thread. pub(crate) fn construct( sys_tx: mpsc::UnboundedSender, @@ -104,7 +130,7 @@ impl System { /// /// Returns `None` if no System has been started. /// - /// Contrary to `current`, this never panics. + /// Unlike [`current`](Self::current), this never panics. pub fn try_current() -> Option { CURRENT.with(|cell| cell.borrow().clone()) } @@ -150,34 +176,38 @@ impl System { } /// Runner that keeps a [System]'s event loop alive until stop message is received. +#[cfg(not(feature = "io-uring"))] #[must_use = "A SystemRunner does nothing unless `run` is called."] #[derive(Debug)] pub struct SystemRunner { - rt: Runtime, + rt: crate::runtime::Runtime, stop_rx: oneshot::Receiver, + #[allow(dead_code)] system: System, } +#[cfg(not(feature = "io-uring"))] impl SystemRunner { /// Starts event loop and will return once [System] is [stopped](System::stop). pub fn run(self) -> io::Result<()> { + let exit_code = self.run_with_code()?; + + match exit_code { + 0 => Ok(()), + nonzero => Err(io::Error::new( + io::ErrorKind::Other, + format!("Non-zero exit code: {}", nonzero), + )), + } + } + + /// Runs the event loop until [stopped](System::stop_with_code), returning the exit code. + pub fn run_with_code(self) -> io::Result { let SystemRunner { rt, stop_rx, .. } = self; // run loop - match rt.block_on(stop_rx) { - Ok(code) => { - if code != 0 { - Err(io::Error::new( - io::ErrorKind::Other, - format!("Non-zero exit code: {}", code), - )) - } else { - Ok(()) - } - } - - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), - } + rt.block_on(stop_rx) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err)) } /// Runs the provided future, blocking the current thread until the future completes. @@ -187,6 +217,52 @@ impl SystemRunner { } } +/// Runner that keeps a [System]'s event loop alive until stop message is received. +#[cfg(feature = "io-uring")] +#[must_use = "A SystemRunner does nothing unless `run` is called."] +#[derive(Debug)] +pub struct SystemRunner; + +#[cfg(feature = "io-uring")] +impl SystemRunner { + /// Starts event loop and will return once [System] is [stopped](System::stop). + pub fn run(self) -> io::Result<()> { + unimplemented!("SystemRunner::run is not implemented for io-uring feature yet"); + } + + /// Runs the event loop until [stopped](System::stop_with_code), returning the exit code. + pub fn run_with_code(self) -> io::Result { + unimplemented!( + "SystemRunner::run_with_code is not implemented for io-uring feature yet" + ); + } + + /// Runs the provided future, blocking the current thread until the future completes. + #[inline] + pub fn block_on(&self, fut: F) -> F::Output { + tokio_uring::start(async move { + let (stop_tx, stop_rx) = oneshot::channel(); + let (sys_tx, sys_rx) = mpsc::unbounded_channel(); + + let sys_arbiter = Arbiter::in_new_system(); + let system = System::construct(sys_tx, sys_arbiter.clone()); + + system + .tx() + .send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter)) + .unwrap(); + + // init background system arbiter + let sys_ctrl = SystemController::new(sys_rx, stop_tx); + tokio_uring::spawn(sys_ctrl); + + let res = fut.await; + drop(stop_rx); + res + }) + } +} + #[derive(Debug)] pub(crate) enum SystemCommand { Exit(i32), diff --git a/actix-rt/tests/test-macro-import-conflict.rs b/actix-rt/tests/test-macro-import-conflict.rs new file mode 100644 index 00000000..3760d34c --- /dev/null +++ b/actix-rt/tests/test-macro-import-conflict.rs @@ -0,0 +1,17 @@ +//! Checks that test macro does not cause problems in the presence of imports named "test" that +//! could be either a module with test items or the "test with runtime" macro itself. +//! +//! Before actix/actix-net#399 was implemented, this macro was running twice. The first run output +//! `#[test]` and it got run again and since it was in scope. +//! +//! Prevented by using the fully-qualified test marker (`#[::core::prelude::v1::test]`). + +#![cfg(feature = "macros")] + +use actix_rt::time as test; + +#[actix_rt::test] +async fn test_naming_conflict() { + use test as time; + time::sleep(std::time::Duration::from_millis(2)).await; +} diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs index 5fe1e894..551a395d 100644 --- a/actix-rt/tests/tests.rs +++ b/actix-rt/tests/tests.rs @@ -1,12 +1,15 @@ use std::{ future::Future, - sync::mpsc::channel, - thread, time::{Duration, Instant}, }; use actix_rt::{task::JoinError, Arbiter, System}; -use tokio::sync::oneshot; + +#[cfg(not(feature = "io-uring"))] +use { + std::{sync::mpsc::channel, thread}, + tokio::sync::oneshot, +}; #[test] fn await_for_timer() { @@ -21,6 +24,15 @@ fn await_for_timer() { ); } +#[cfg(not(feature = "io-uring"))] +#[test] +fn run_with_code() { + let sys = System::new(); + System::current().stop_with_code(42); + let exit_code = sys.run_with_code().expect("system stop should not error"); + assert_eq!(exit_code, 42); +} + #[test] fn join_another_arbiter() { let time = Duration::from_secs(1); @@ -96,13 +108,17 @@ fn wait_for_spawns() { let handle = rt.spawn(async { println!("running on the runtime"); - // assertion panic is caught at task boundary - assert_eq!(1, 2); + // panic is caught at task boundary + panic!("intentional test panic"); }); assert!(rt.block_on(handle).is_err()); } +// Temporary disabled tests for io-uring feature. +// They should be enabled when possible. + +#[cfg(not(feature = "io-uring"))] #[test] fn arbiter_spawn_fn_runs() { let _ = System::new(); @@ -119,6 +135,7 @@ fn arbiter_spawn_fn_runs() { arbiter.join().unwrap(); } +#[cfg(not(feature = "io-uring"))] #[test] fn arbiter_handle_spawn_fn_runs() { let sys = System::new(); @@ -141,6 +158,7 @@ fn arbiter_handle_spawn_fn_runs() { sys.run().unwrap(); } +#[cfg(not(feature = "io-uring"))] #[test] fn arbiter_drop_no_panic_fn() { let _ = System::new(); @@ -152,6 +170,7 @@ fn arbiter_drop_no_panic_fn() { arbiter.join().unwrap(); } +#[cfg(not(feature = "io-uring"))] #[test] fn arbiter_drop_no_panic_fut() { let _ = System::new(); @@ -163,18 +182,7 @@ fn arbiter_drop_no_panic_fut() { arbiter.join().unwrap(); } -#[test] -#[should_panic] -fn no_system_current_panic() { - System::current(); -} - -#[test] -#[should_panic] -fn no_system_arbiter_new_panic() { - Arbiter::new(); -} - +#[cfg(not(feature = "io-uring"))] #[test] fn system_arbiter_spawn() { let runner = System::new(); @@ -205,6 +213,7 @@ fn system_arbiter_spawn() { thread.join().unwrap(); } +#[cfg(not(feature = "io-uring"))] #[test] fn system_stop_stops_arbiters() { let sys = System::new(); @@ -293,6 +302,18 @@ fn new_arbiter_with_tokio() { assert!(!counter.load(Ordering::SeqCst)); } +#[test] +#[should_panic] +fn no_system_current_panic() { + System::current(); +} + +#[test] +#[should_panic] +fn no_system_arbiter_new_panic() { + Arbiter::new(); +} + #[test] fn try_current_no_system() { assert!(System::try_current().is_none()) @@ -330,28 +351,27 @@ fn spawn_local() { #[cfg(all(target_os = "linux", feature = "io-uring"))] #[test] fn tokio_uring_arbiter() { - let system = System::new(); - let (tx, rx) = std::sync::mpsc::channel(); + System::new().block_on(async { + let (tx, rx) = std::sync::mpsc::channel(); - Arbiter::new().spawn(async move { - let handle = actix_rt::spawn(async move { - let f = tokio_uring::fs::File::create("test.txt").await.unwrap(); - let buf = b"Hello World!"; + Arbiter::new().spawn(async move { + let handle = actix_rt::spawn(async move { + let f = tokio_uring::fs::File::create("test.txt").await.unwrap(); + let buf = b"Hello World!"; - let (res, _) = f.write_at(&buf[..], 0).await; - assert!(res.is_ok()); + let (res, _) = f.write_at(&buf[..], 0).await; + assert!(res.is_ok()); - f.sync_all().await.unwrap(); - f.close().await.unwrap(); + f.sync_all().await.unwrap(); + f.close().await.unwrap(); - std::fs::remove_file("test.txt").unwrap(); + std::fs::remove_file("test.txt").unwrap(); + }); + + handle.await.unwrap(); + tx.send(true).unwrap(); }); - handle.await.unwrap(); - tx.send(true).unwrap(); - }); - - assert!(rx.recv().unwrap()); - - drop(system); + assert!(rx.recv().unwrap()); + }) } diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 54096eca..51da40f2 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -3,6 +3,31 @@ ## Unreleased - 2021-xx-xx +## 2.0.0-beta.9 - 2021-11-15 +* Restore `Arbiter` support lost in `beta.8`. [#417] + +[#417]: https://github.com/actix/actix-net/pull/417 + + +## 2.0.0-beta.8 - 2021-11-05 _(YANKED)_ +* Fix non-unix signal handler. [#410] + +[#410]: https://github.com/actix/actix-net/pull/410 + + +## 2.0.0-beta.7 - 2021-11-05 _(YANKED)_ +* Server can be started in regular Tokio runtime. [#408] +* Expose new `Server` type whose `Future` impl resolves when server stops. [#408] +* Rename `Server` to `ServerHandle`. [#407] +* Add `Server::handle` to obtain handle to server. [#408] +* Rename `ServerBuilder::{maxconn => max_concurrent_connections}`. [#407] +* Deprecate crate-level `new` shortcut for server builder. [#408] +* Minimum supported Rust version (MSRV) is now 1.52. + +[#407]: https://github.com/actix/actix-net/pull/407 +[#408]: https://github.com/actix/actix-net/pull/408 + + ## 2.0.0-beta.6 - 2021-10-11 * Add experimental (semver-exempt) `io-uring` feature for enabling async file I/O on linux. [#374] * Server no long listens to `SIGHUP` signal. Previously, the received was not used but did block diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 8fd3112b..fecc20c0 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-server" -version = "2.0.0-beta.6" +version = "2.0.0-beta.9" authors = [ "Nikolay Kim ", "fakeshadow <24548779@qq.com>", @@ -18,24 +18,29 @@ path = "src/lib.rs" [features] default = [] -io-uring = ["actix-rt/io-uring"] +io-uring = ["tokio-uring", "actix-rt/io-uring"] [dependencies] -actix-rt = { version = "2.0.0", default-features = false } +actix-rt = { version = "2.4.0", default-features = false } actix-service = "2.0.0" actix-utils = "3.0.0" futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } +futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] } log = "0.4" -mio = { version = "0.7.6", features = ["os-poll", "net"] } +mio = { version = "0.8", features = ["os-poll", "net"] } num_cpus = "1.13" +socket2 = "0.4.2" tokio = { version = "1.5.1", features = ["sync"] } +# runtime for io-uring feature +tokio-uring = { version = "0.1", optional = true } + [dev-dependencies] -actix-codec = "0.4.0-beta.1" +actix-codec = "0.4.0" actix-rt = "2.0.0" bytes = "1" env_logger = "0.9" futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } -tokio = { version = "1.5.1", features = ["io-util"] } +tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] } diff --git a/actix-server/examples/tcp-echo.rs b/actix-server/examples/tcp-echo.rs index 8b038da4..930ebf0a 100644 --- a/actix-server/examples/tcp-echo.rs +++ b/actix-server/examples/tcp-echo.rs @@ -10,7 +10,7 @@ //! the length of each line it echos and the total size of data sent when the connection is closed. use std::{ - env, io, + io, sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -23,12 +23,10 @@ use actix_service::{fn_service, ServiceFactoryExt as _}; use bytes::BytesMut; use futures_util::future::ok; use log::{error, info}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; -#[actix_rt::main] -async fn main() -> io::Result<()> { - env::set_var("RUST_LOG", "info"); - env_logger::init(); +async fn run() -> io::Result<()> { + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); let count = Arc::new(AtomicUsize::new(0)); @@ -88,3 +86,16 @@ async fn main() -> io::Result<()> { .run() .await } + +#[tokio::main] +async fn main() -> io::Result<()> { + run().await?; + Ok(()) +} + +// alternatively: +// #[actix_rt::main] +// async fn main() -> io::Result<()> { +// run().await?; +// Ok(()) +// } diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index d9451d37..cd37460b 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -1,17 +1,18 @@ -use std::time::Duration; -use std::{io, thread}; +use std::{io, thread, time::Duration}; -use actix_rt::{ - time::{sleep, Instant}, - System, -}; -use log::{error, info}; +use actix_rt::time::Instant; +use log::{debug, error, info}; use mio::{Interest, Poll, Token as MioToken}; -use crate::server::Server; -use crate::socket::MioListener; -use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; -use crate::worker::{Conn, WorkerHandleAccept}; +use crate::{ + availability::Availability, + socket::MioListener, + waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}, + worker::{Conn, ServerWorker, WorkerHandleAccept, WorkerHandleServer}, + ServerBuilder, ServerHandle, +}; + +const TIMEOUT_DURATION_ON_ERROR: Duration = Duration::from_millis(510); struct ServerSocketInfo { token: usize, @@ -20,208 +21,113 @@ struct ServerSocketInfo { /// Timeout is used to mark the deadline when this socket's listener should be registered again /// after an error. - timeout: Option, -} - -/// Accept loop would live with `ServerBuilder`. -/// -/// It's tasked with construct `Poll` instance and `WakerQueue` which would be distributed to -/// `Accept` and `Worker`. -/// -/// It would also listen to `ServerCommand` and push interests to `WakerQueue`. -pub(crate) struct AcceptLoop { - srv: Option, - poll: Option, - waker: WakerQueue, -} - -impl AcceptLoop { - pub fn new(srv: Server) -> Self { - let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e)); - let waker = WakerQueue::new(poll.registry()) - .unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e)); - - Self { - srv: Some(srv), - poll: Some(poll), - waker, - } - } - - pub(crate) fn waker_owned(&self) -> WakerQueue { - self.waker.clone() - } - - pub fn wake(&self, i: WakerInterest) { - self.waker.wake(i); - } - - pub(crate) fn start( - &mut self, - socks: Vec<(usize, MioListener)>, - handles: Vec, - ) { - let srv = self.srv.take().expect("Can not re-use AcceptInfo"); - let poll = self.poll.take().unwrap(); - let waker = self.waker.clone(); - - Accept::start(poll, waker, socks, srv, handles); - } + timeout: Option, } /// poll instance of the server. -struct Accept { +pub(crate) struct Accept { poll: Poll, - waker: WakerQueue, + waker_queue: WakerQueue, handles: Vec, - srv: Server, + srv: ServerHandle, next: usize, avail: Availability, + /// use the smallest duration from sockets timeout. + timeout: Option, paused: bool, } -/// Array of u128 with every bit as marker for a worker handle's availability. -struct Availability([u128; 4]); - -impl Default for Availability { - fn default() -> Self { - Self([0; 4]) - } -} - -impl Availability { - /// Check if any worker handle is available - #[inline(always)] - fn available(&self) -> bool { - self.0.iter().any(|a| *a != 0) - } - - /// Check if worker handle is available by index - #[inline(always)] - fn get_available(&self, idx: usize) -> bool { - let (offset, idx) = Self::offset(idx); - - self.0[offset] & (1 << idx as u128) != 0 - } - - /// Set worker handle available state by index. - fn set_available(&mut self, idx: usize, avail: bool) { - let (offset, idx) = Self::offset(idx); - - let off = 1 << idx as u128; - if avail { - self.0[offset] |= off; - } else { - self.0[offset] &= !off - } - } - - /// Set all worker handle to available state. - /// This would result in a re-check on all workers' availability. - fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) { - handles.iter().for_each(|handle| { - self.set_available(handle.idx(), true); - }) - } - - /// Get offset and adjusted index of given worker handle index. - fn offset(idx: usize) -> (usize, usize) { - if idx < 128 { - (0, idx) - } else if idx < 128 * 2 { - (1, idx - 128) - } else if idx < 128 * 3 { - (2, idx - 128 * 2) - } else if idx < 128 * 4 { - (3, idx - 128 * 3) - } else { - panic!("Max WorkerHandle count is 512") - } - } -} - -/// This function defines errors that are per-connection. Which basically -/// means that if we get this error from `accept()` system call it means -/// next connection might be ready to be accepted. -/// -/// All other errors will incur a timeout before next `accept()` is performed. -/// The timeout is useful to handle resource exhaustion errors like ENFILE -/// and EMFILE. Otherwise, could enter into tight loop. -fn connection_error(e: &io::Error) -> bool { - e.kind() == io::ErrorKind::ConnectionRefused - || e.kind() == io::ErrorKind::ConnectionAborted - || e.kind() == io::ErrorKind::ConnectionReset -} - impl Accept { pub(crate) fn start( - poll: Poll, - waker: WakerQueue, - socks: Vec<(usize, MioListener)>, - srv: Server, - handles: Vec, - ) { - // Accept runs in its own thread and would want to spawn additional futures to current - // actix system. - let sys = System::current(); - thread::Builder::new() - .name("actix-server accept loop".to_owned()) - .spawn(move || { - System::set_current(sys); - let (mut accept, mut sockets) = - Accept::new_with_sockets(poll, waker, socks, handles, srv); + sockets: Vec<(usize, MioListener)>, + builder: &ServerBuilder, + ) -> io::Result<(WakerQueue, Vec)> { + let handle_server = ServerHandle::new(builder.cmd_tx.clone()); - accept.poll_with(&mut sockets); + // construct poll instance and its waker + let poll = Poll::new()?; + let waker_queue = WakerQueue::new(poll.registry())?; + + // start workers and collect handles + let (handles_accept, handles_server) = (0..builder.threads) + .map(|idx| { + // clone service factories + let factories = builder + .factories + .iter() + .map(|f| f.clone_factory()) + .collect::>(); + + // start worker using service factories + ServerWorker::start(idx, factories, waker_queue.clone(), builder.worker_config) }) - .unwrap(); + .collect::>>()? + .into_iter() + .unzip(); + + let (mut accept, mut sockets) = Accept::new_with_sockets( + poll, + waker_queue.clone(), + sockets, + handles_accept, + handle_server, + )?; + + thread::Builder::new() + .name("actix-server acceptor".to_owned()) + .spawn(move || accept.poll_with(&mut sockets)) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + + Ok((waker_queue, handles_server)) } fn new_with_sockets( poll: Poll, - waker: WakerQueue, - socks: Vec<(usize, MioListener)>, - handles: Vec, - srv: Server, - ) -> (Accept, Vec) { - let sockets = socks + waker_queue: WakerQueue, + sockets: Vec<(usize, MioListener)>, + accept_handles: Vec, + server_handle: ServerHandle, + ) -> io::Result<(Accept, Box<[ServerSocketInfo]>)> { + let sockets = sockets .into_iter() .map(|(token, mut lst)| { // Start listening for incoming connections poll.registry() - .register(&mut lst, MioToken(token), Interest::READABLE) - .unwrap_or_else(|e| panic!("Can not register io: {}", e)); + .register(&mut lst, MioToken(token), Interest::READABLE)?; - ServerSocketInfo { + Ok(ServerSocketInfo { token, lst, timeout: None, - } + }) }) - .collect(); + .collect::>()?; let mut avail = Availability::default(); // Assume all handles are avail at construct time. - avail.set_available_all(&handles); + avail.set_available_all(&accept_handles); let accept = Accept { poll, - waker, - handles, - srv, + waker_queue, + handles: accept_handles, + srv: server_handle, next: 0, avail, + timeout: None, paused: false, }; - (accept, sockets) + Ok((accept, sockets)) } + /// blocking wait for readiness events triggered by mio fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) { - let mut events = mio::Events::with_capacity(128); + let mut events = mio::Events::with_capacity(256); loop { - if let Err(e) = self.poll.poll(&mut events, None) { + if let Err(e) = self.poll.poll(&mut events, self.timeout) { match e.kind() { io::ErrorKind::Interrupted => {} _ => panic!("Poll error: {}", e), @@ -234,7 +140,7 @@ impl Accept { WAKER_TOKEN => { let exit = self.handle_waker(sockets); if exit { - info!("Accept is stopped."); + info!("Accept thread stopped"); return; } } @@ -244,6 +150,9 @@ impl Accept { } } } + + // check for timeout and re-register sockets + self.process_timeout(sockets); } } @@ -254,7 +163,7 @@ impl Accept { loop { // take guard with every iteration so no new interest can be added // until the current task is done. - let mut guard = self.waker.guard(); + let mut guard = self.waker_queue.guard(); match guard.pop_front() { // worker notify it becomes available. Some(WakerInterest::WorkerAvailable(idx)) => { @@ -266,6 +175,7 @@ impl Accept { self.accept_all(sockets); } } + // a new worker thread is made and it's handle would be added to Accept Some(WakerInterest::Worker(handle)) => { drop(guard); @@ -277,12 +187,7 @@ impl Accept { self.accept_all(sockets); } } - // got timer interest and it's time to try register socket(s) again - Some(WakerInterest::Timer) => { - drop(guard); - self.process_timer(sockets) - } Some(WakerInterest::Pause) => { drop(guard); @@ -292,6 +197,7 @@ impl Accept { self.deregister_all(sockets); } } + Some(WakerInterest::Resume) => { drop(guard); @@ -305,6 +211,7 @@ impl Accept { self.accept_all(sockets); } } + Some(WakerInterest::Stop) => { if !self.paused { self.deregister_all(sockets); @@ -312,6 +219,7 @@ impl Accept { return true; } + // waker queue is drained None => { // Reset the WakerQueue before break so it does not grow infinitely @@ -323,25 +231,44 @@ impl Accept { } } - fn process_timer(&self, sockets: &mut [ServerSocketInfo]) { - let now = Instant::now(); - sockets - .iter_mut() - // Only sockets that had an associated timeout were deregistered. - .filter(|info| info.timeout.is_some()) - .for_each(|info| { - let inst = info.timeout.take().unwrap(); + fn process_timeout(&mut self, sockets: &mut [ServerSocketInfo]) { + // always remove old timeouts + if self.timeout.take().is_some() { + let now = Instant::now(); - if now < inst { - info.timeout = Some(inst); - } else if !self.paused { - self.register_logged(info); + sockets + .iter_mut() + // Only sockets that had an associated timeout were deregistered. + .filter(|info| info.timeout.is_some()) + .for_each(|info| { + let inst = info.timeout.take().unwrap(); + + if now < inst { + // still timed out; try to set new timeout + info.timeout = Some(inst); + self.set_timeout(inst - now); + } else if !self.paused { + // timeout expired; register socket again + self.register_logged(info); + } + + // Drop the timeout if server is paused and socket timeout is expired. + // When server recovers from pause it will register all sockets without + // a timeout value so this socket register will be delayed till then. + }); + } + } + + /// Update accept timeout with `duration` if it is shorter than current timeout. + fn set_timeout(&mut self, duration: Duration) { + match self.timeout { + Some(ref mut timeout) => { + if *timeout > duration { + *timeout = duration; } - - // Drop the timeout if server is paused and socket timeout is expired. - // When server recovers from pause it will register all sockets without - // a timeout value so this socket register will be delayed till then. - }); + } + None => self.timeout = Some(duration), + } } #[cfg(not(target_os = "windows"))] @@ -370,14 +297,14 @@ impl Accept { fn register_logged(&self, info: &mut ServerSocketInfo) { match self.register(info) { - Ok(_) => info!("Resume accepting connections on {}", info.lst.local_addr()), + Ok(_) => debug!("Resume accepting connections on {}", info.lst.local_addr()), Err(e) => error!("Can not register server socket {}", e), } } fn deregister_logged(&self, info: &mut ServerSocketInfo) { match self.poll.registry().deregister(&mut info.lst) { - Ok(_) => info!("Paused accepting connections on {}", info.lst.local_addr()), + Ok(_) => debug!("Paused accepting connections on {}", info.lst.local_addr()), Err(e) => { error!("Can not deregister server socket {}", e) } @@ -387,12 +314,12 @@ impl Accept { fn deregister_all(&self, sockets: &mut [ServerSocketInfo]) { // This is a best effort implementation with following limitation: // - // Every ServerSocketInfo with associate timeout will be skipped and it's timeout - // is removed in the process. + // Every ServerSocketInfo with associated timeout will be skipped and it's timeout is + // removed in the process. // - // Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short - // gap (less than 500ms) would cause all timing out ServerSocketInfos be reregistered - // before expected timing. + // Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short gap + // (less than 500ms) would cause all timing out ServerSocketInfos be re-registered before + // expected timing. sockets .iter_mut() // Take all timeout. @@ -481,13 +408,7 @@ impl Accept { // the poll would need it mark which socket and when it's // listener should be registered info.timeout = Some(Instant::now() + Duration::from_millis(500)); - - // after the sleep a Timer interest is sent to Accept Poll - let waker = self.waker.clone(); - System::current().arbiter().spawn(async move { - sleep(Duration::from_millis(510)).await; - waker.wake(WakerInterest::Timer); - }); + self.set_timeout(TIMEOUT_DURATION_ON_ERROR); return; } @@ -526,67 +447,14 @@ impl Accept { } } -#[cfg(test)] -mod test { - use super::Availability; - - fn single(aval: &mut Availability, idx: usize) { - aval.set_available(idx, true); - assert!(aval.available()); - - aval.set_available(idx, true); - - aval.set_available(idx, false); - assert!(!aval.available()); - - aval.set_available(idx, false); - assert!(!aval.available()); - } - - fn multi(aval: &mut Availability, mut idx: Vec) { - idx.iter().for_each(|idx| aval.set_available(*idx, true)); - - assert!(aval.available()); - - while let Some(idx) = idx.pop() { - assert!(aval.available()); - aval.set_available(idx, false); - } - - assert!(!aval.available()); - } - - #[test] - fn availability() { - let mut aval = Availability::default(); - - single(&mut aval, 1); - single(&mut aval, 128); - single(&mut aval, 256); - single(&mut aval, 511); - - let idx = (0..511).filter(|i| i % 3 == 0 && i % 5 == 0).collect(); - - multi(&mut aval, idx); - - multi(&mut aval, (0..511).collect()) - } - - #[test] - #[should_panic] - fn overflow() { - let mut aval = Availability::default(); - single(&mut aval, 512); - } - - #[test] - fn pin_point() { - let mut aval = Availability::default(); - - aval.set_available(438, true); - - aval.set_available(479, true); - - assert_eq!(aval.0[3], 1 << (438 - 384) | 1 << (479 - 384)); - } +/// This function defines errors that are per-connection; if we get this error from the `accept()` +/// system call it means the next connection might be ready to be accepted. +/// +/// All other errors will incur a timeout before next `accept()` call is attempted. The timeout is +/// useful to handle resource exhaustion errors like `ENFILE` and `EMFILE`. Otherwise, it could +/// enter into a temporary spin loop. +fn connection_error(e: &io::Error) -> bool { + e.kind() == io::ErrorKind::ConnectionRefused + || e.kind() == io::ErrorKind::ConnectionAborted + || e.kind() == io::ErrorKind::ConnectionReset } diff --git a/actix-server/src/availability.rs b/actix-server/src/availability.rs new file mode 100644 index 00000000..801b08f2 --- /dev/null +++ b/actix-server/src/availability.rs @@ -0,0 +1,121 @@ +use crate::worker::WorkerHandleAccept; + +/// Array of u128 with every bit as marker for a worker handle's availability. +#[derive(Debug, Default)] +pub(crate) struct Availability([u128; 4]); + +impl Availability { + /// Check if any worker handle is available + #[inline(always)] + pub(crate) fn available(&self) -> bool { + self.0.iter().any(|a| *a != 0) + } + + /// Check if worker handle is available by index + #[inline(always)] + pub(crate) fn get_available(&self, idx: usize) -> bool { + let (offset, idx) = Self::offset(idx); + + self.0[offset] & (1 << idx as u128) != 0 + } + + /// Set worker handle available state by index. + pub(crate) fn set_available(&mut self, idx: usize, avail: bool) { + let (offset, idx) = Self::offset(idx); + + let off = 1 << idx as u128; + if avail { + self.0[offset] |= off; + } else { + self.0[offset] &= !off + } + } + + /// Set all worker handle to available state. + /// This would result in a re-check on all workers' availability. + pub(crate) fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) { + handles.iter().for_each(|handle| { + self.set_available(handle.idx(), true); + }) + } + + /// Get offset and adjusted index of given worker handle index. + pub(crate) fn offset(idx: usize) -> (usize, usize) { + if idx < 128 { + (0, idx) + } else if idx < 128 * 2 { + (1, idx - 128) + } else if idx < 128 * 3 { + (2, idx - 128 * 2) + } else if idx < 128 * 4 { + (3, idx - 128 * 3) + } else { + panic!("Max WorkerHandle count is 512") + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn single(aval: &mut Availability, idx: usize) { + aval.set_available(idx, true); + assert!(aval.available()); + + aval.set_available(idx, true); + + aval.set_available(idx, false); + assert!(!aval.available()); + + aval.set_available(idx, false); + assert!(!aval.available()); + } + + fn multi(aval: &mut Availability, mut idx: Vec) { + idx.iter().for_each(|idx| aval.set_available(*idx, true)); + + assert!(aval.available()); + + while let Some(idx) = idx.pop() { + assert!(aval.available()); + aval.set_available(idx, false); + } + + assert!(!aval.available()); + } + + #[test] + fn availability() { + let mut aval = Availability::default(); + + single(&mut aval, 1); + single(&mut aval, 128); + single(&mut aval, 256); + single(&mut aval, 511); + + let idx = (0..511).filter(|i| i % 3 == 0 && i % 5 == 0).collect(); + + multi(&mut aval, idx); + + multi(&mut aval, (0..511).collect()) + } + + #[test] + #[should_panic] + fn overflow() { + let mut aval = Availability::default(); + single(&mut aval, 512); + } + + #[test] + fn pin_point() { + let mut aval = Availability::default(); + + aval.set_available(438, true); + + aval.set_available(479, true); + + assert_eq!(aval.0[3], 1 << (438 - 384) | 1 << (479 - 384)); + } +} diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 871abb5b..4f4d6e26 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -1,43 +1,32 @@ -use std::{ - future::Future, - io, mem, - pin::Pin, - task::{Context, Poll}, - time::Duration, +use std::{io, time::Duration}; + +use actix_rt::net::TcpStream; +use log::{info, trace}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; + +use crate::{ + server::ServerCommand, + service::{InternalServiceFactory, ServiceFactory, StreamNewService}, + socket::{ + create_mio_tcp_listener, MioListener, MioTcpListener, StdSocketAddr, StdTcpListener, + ToSocketAddrs, + }, + worker::ServerWorkerConfig, + Server, }; -use actix_rt::{self as rt, net::TcpStream, time::sleep, System}; -use log::{error, info}; -use tokio::sync::{ - mpsc::{unbounded_channel, UnboundedReceiver}, - oneshot, -}; - -use crate::accept::AcceptLoop; -use crate::join_all; -use crate::server::{Server, ServerCommand}; -use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; -use crate::signals::{Signal, Signals}; -use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; -use crate::socket::{MioTcpListener, MioTcpSocket}; -use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer}; - -/// Server builder +/// [Server] builder. pub struct ServerBuilder { - threads: usize, - token: usize, - backlog: u32, - handles: Vec<(usize, WorkerHandleServer)>, - services: Vec>, - sockets: Vec<(usize, String, MioListener)>, - accept: AcceptLoop, - exit: bool, - no_signals: bool, - cmd: UnboundedReceiver, - server: Server, - notify: Vec>, - worker_config: ServerWorkerConfig, + pub(crate) threads: usize, + pub(crate) token: usize, + pub(crate) backlog: u32, + pub(crate) factories: Vec>, + pub(crate) sockets: Vec<(usize, String, MioListener)>, + pub(crate) exit: bool, + pub(crate) listen_os_signals: bool, + pub(crate) cmd_tx: UnboundedSender, + pub(crate) cmd_rx: UnboundedReceiver, + pub(crate) worker_config: ServerWorkerConfig, } impl Default for ServerBuilder { @@ -49,30 +38,26 @@ impl Default for ServerBuilder { impl ServerBuilder { /// Create new Server builder instance pub fn new() -> ServerBuilder { - let (tx, rx) = unbounded_channel(); - let server = Server::new(tx); + let (cmd_tx, cmd_rx) = unbounded_channel(); ServerBuilder { threads: num_cpus::get(), token: 0, - handles: Vec::new(), - services: Vec::new(), + factories: Vec::new(), sockets: Vec::new(), - accept: AcceptLoop::new(server.clone()), backlog: 2048, exit: false, - no_signals: false, - cmd: rx, - notify: Vec::new(), - server, + listen_os_signals: true, + cmd_tx, + cmd_rx, worker_config: ServerWorkerConfig::default(), } } /// Set number of workers to start. /// - /// By default server uses number of available logical cpu as workers - /// count. Workers must be greater than 0. + /// By default server uses number of available logical CPU as workers count. Workers must be + /// greater than 0. pub fn workers(mut self, num: usize) -> Self { assert_ne!(num, 0, "workers must be greater than 0"); self.threads = num; @@ -99,10 +84,9 @@ impl ServerBuilder { /// Set the maximum number of pending connections. /// - /// This refers to the number of clients that can be waiting to be served. - /// Exceeding this number results in the client getting an error when - /// attempting to connect. It should only affect servers under significant - /// load. + /// This refers to the number of clients that can be waiting to be served. Exceeding this number + /// results in the client getting an error when attempting to connect. It should only affect + /// servers under significant load. /// /// Generally set in the 64-2048 range. Default value is 2048. /// @@ -114,24 +98,30 @@ impl ServerBuilder { /// Sets the maximum per-worker number of concurrent connections. /// - /// All socket listeners will stop accepting connections when this limit is - /// reached for each worker. + /// All socket listeners will stop accepting connections when this limit is reached for + /// each worker. /// /// By default max connections is set to a 25k per worker. - pub fn maxconn(mut self, num: usize) -> Self { + pub fn max_concurrent_connections(mut self, num: usize) -> Self { self.worker_config.max_concurrent_connections(num); self } - /// Stop Actix system. + #[doc(hidden)] + #[deprecated(since = "2.0.0", note = "Renamed to `max_concurrent_connections`.")] + pub fn maxconn(self, num: usize) -> Self { + self.max_concurrent_connections(num) + } + + /// Stop Actix `System` after server shutdown. pub fn system_exit(mut self) -> Self { self.exit = true; self } - /// Disable signal handling. + /// Disable OS signal handling. pub fn disable_signals(mut self) -> Self { - self.no_signals = true; + self.listen_os_signals = false; self } @@ -155,9 +145,11 @@ impl ServerBuilder { { let sockets = bind_addr(addr, self.backlog)?; + trace!("binding server to: {:?}", &sockets); + for lst in sockets { let token = self.next_token(); - self.services.push(StreamNewService::create( + self.factories.push(StreamNewService::create( name.as_ref().to_string(), token, factory.clone(), @@ -166,11 +158,57 @@ impl ServerBuilder { self.sockets .push((token, name.as_ref().to_string(), MioListener::Tcp(lst))); } + Ok(self) } + /// Add new service to the server. + pub fn listen>( + mut self, + name: N, + lst: StdTcpListener, + factory: F, + ) -> io::Result + where + F: ServiceFactory, + { + lst.set_nonblocking(true)?; + let addr = lst.local_addr()?; + + let token = self.next_token(); + self.factories.push(StreamNewService::create( + name.as_ref().to_string(), + token, + factory, + addr, + )); + + self.sockets + .push((token, name.as_ref().to_string(), MioListener::from(lst))); + + Ok(self) + } + + /// Starts processing incoming connections and return server controller. + pub fn run(self) -> Server { + if self.sockets.is_empty() { + panic!("Server should have at least one bound socket"); + } else { + info!("Starting {} workers", self.threads); + Server::new(self) + } + } + + fn next_token(&mut self) -> usize { + let token = self.token; + self.token += 1; + token + } +} + +#[cfg(unix)] +impl ServerBuilder { /// Add new unix domain service to the server. - #[cfg(unix)] pub fn bind_uds(self, name: N, addr: U, factory: F) -> io::Result where F: ServiceFactory, @@ -191,9 +229,8 @@ impl ServerBuilder { } /// Add new unix domain service to the server. - /// Useful when running as a systemd service and - /// a socket FD can be acquired using the systemd crate. - #[cfg(unix)] + /// + /// Useful when running as a systemd service and a socket FD is acquired externally. pub fn listen_uds>( mut self, name: N, @@ -207,7 +244,7 @@ impl ServerBuilder { lst.set_nonblocking(true)?; let token = self.next_token(); let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); - self.services.push(StreamNewService::create( + self.factories.push(StreamNewService::create( name.as_ref().to_string(), token, factory, @@ -217,219 +254,6 @@ impl ServerBuilder { .push((token, name.as_ref().to_string(), MioListener::from(lst))); Ok(self) } - - /// Add new service to the server. - pub fn listen>( - mut self, - name: N, - lst: StdTcpListener, - factory: F, - ) -> io::Result - where - F: ServiceFactory, - { - lst.set_nonblocking(true)?; - let addr = lst.local_addr()?; - - let token = self.next_token(); - self.services.push(StreamNewService::create( - name.as_ref().to_string(), - token, - factory, - addr, - )); - - self.sockets - .push((token, name.as_ref().to_string(), MioListener::from(lst))); - - Ok(self) - } - - /// Starts processing incoming connections and return server controller. - pub fn run(mut self) -> Server { - if self.sockets.is_empty() { - panic!("Server should have at least one bound socket"); - } else { - info!("Starting {} workers", self.threads); - - // start workers - let handles = (0..self.threads) - .map(|idx| { - let (handle_accept, handle_server) = - self.start_worker(idx, self.accept.waker_owned()); - self.handles.push((idx, handle_server)); - - handle_accept - }) - .collect(); - - // start accept thread - for sock in &self.sockets { - info!("Starting \"{}\" service on {}", sock.1, sock.2); - } - self.accept.start( - mem::take(&mut self.sockets) - .into_iter() - .map(|t| (t.0, t.2)) - .collect(), - handles, - ); - - // handle signals - if !self.no_signals { - Signals::start(self.server.clone()); - } - - // start http server actor - let server = self.server.clone(); - rt::spawn(self); - server - } - } - - fn start_worker( - &self, - idx: usize, - waker_queue: WakerQueue, - ) -> (WorkerHandleAccept, WorkerHandleServer) { - let services = self.services.iter().map(|v| v.clone_factory()).collect(); - - ServerWorker::start(idx, services, waker_queue, self.worker_config) - } - - fn handle_cmd(&mut self, item: ServerCommand) { - match item { - ServerCommand::Pause(tx) => { - self.accept.wake(WakerInterest::Pause); - let _ = tx.send(()); - } - ServerCommand::Resume(tx) => { - self.accept.wake(WakerInterest::Resume); - let _ = tx.send(()); - } - ServerCommand::Signal(sig) => { - // Signals support - // Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system - match sig { - Signal::Int => { - info!("SIGINT received, starting forced shutdown"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { - graceful: false, - completion: None, - }) - } - - Signal::Term => { - info!("SIGTERM received, starting graceful shutdown"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { - graceful: true, - completion: None, - }) - } - - Signal::Quit => { - info!("SIGQUIT received, starting forced shutdown"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { - graceful: false, - completion: None, - }) - } - } - } - ServerCommand::Notify(tx) => { - self.notify.push(tx); - } - ServerCommand::Stop { - graceful, - completion, - } => { - let exit = self.exit; - - // stop accept thread - self.accept.wake(WakerInterest::Stop); - let notify = std::mem::take(&mut self.notify); - - // stop workers - let stop = self - .handles - .iter() - .map(move |worker| worker.1.stop(graceful)) - .collect(); - - rt::spawn(async move { - if graceful { - // wait for all workers to shut down - let _ = join_all(stop).await; - } - - if let Some(tx) = completion { - let _ = tx.send(()); - } - - for tx in notify { - let _ = tx.send(()); - } - - if exit { - sleep(Duration::from_millis(300)).await; - System::current().stop(); - } - }); - } - ServerCommand::WorkerFaulted(idx) => { - let mut found = false; - for i in 0..self.handles.len() { - if self.handles[i].0 == idx { - self.handles.swap_remove(i); - found = true; - break; - } - } - - if found { - error!("Worker has died {:?}, restarting", idx); - - let mut new_idx = self.handles.len(); - 'found: loop { - for i in 0..self.handles.len() { - if self.handles[i].0 == new_idx { - new_idx += 1; - continue 'found; - } - } - break; - } - - let (handle_accept, handle_server) = - self.start_worker(new_idx, self.accept.waker_owned()); - self.handles.push((new_idx, handle_server)); - self.accept.wake(WakerInterest::Worker(handle_accept)); - } - } - } - } - - fn next_token(&mut self) -> usize { - let token = self.token; - self.token += 1; - token - } -} - -impl Future for ServerBuilder { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - match Pin::new(&mut self.cmd).poll_recv(cx) { - Poll::Ready(Some(it)) => self.as_mut().get_mut().handle_cmd(it), - _ => return Poll::Pending, - } - } - } } pub(super) fn bind_addr( @@ -437,39 +261,26 @@ pub(super) fn bind_addr( backlog: u32, ) -> io::Result> { let mut err = None; - let mut succ = false; + let mut success = false; let mut sockets = Vec::new(); for addr in addr.to_socket_addrs()? { - match create_tcp_listener(addr, backlog) { + match create_mio_tcp_listener(addr, backlog) { Ok(lst) => { - succ = true; + success = true; sockets.push(lst); } Err(e) => err = Some(e), } } - if !succ { - if let Some(e) = err.take() { - Err(e) - } else { - Err(io::Error::new( - io::ErrorKind::Other, - "Can not bind to address.", - )) - } - } else { + if success { Ok(sockets) + } else if let Some(err) = err.take() { + Err(err) + } else { + Err(io::Error::new( + io::ErrorKind::Other, + "Can not bind to address.", + )) } } - -fn create_tcp_listener(addr: StdSocketAddr, backlog: u32) -> io::Result { - let socket = match addr { - StdSocketAddr::V4(_) => MioTcpSocket::new_v4()?, - StdSocketAddr::V6(_) => MioTcpSocket::new_v6()?, - }; - - socket.set_reuseaddr(true)?; - socket.bind(addr)?; - socket.listen(backlog) -} diff --git a/actix-server/src/handle.rs b/actix-server/src/handle.rs new file mode 100644 index 00000000..53f00bee --- /dev/null +++ b/actix-server/src/handle.rs @@ -0,0 +1,55 @@ +use std::future::Future; + +use tokio::sync::{mpsc::UnboundedSender, oneshot}; + +use crate::server::ServerCommand; + +/// Server handle. +#[derive(Debug, Clone)] +pub struct ServerHandle { + cmd_tx: UnboundedSender, +} + +impl ServerHandle { + pub(crate) fn new(cmd_tx: UnboundedSender) -> Self { + ServerHandle { cmd_tx } + } + + pub(crate) fn worker_faulted(&self, idx: usize) { + let _ = self.cmd_tx.send(ServerCommand::WorkerFaulted(idx)); + } + + /// Pause accepting incoming connections. + /// + /// May drop socket pending connection. All open connections remain active. + pub fn pause(&self) -> impl Future { + let (tx, rx) = oneshot::channel(); + let _ = self.cmd_tx.send(ServerCommand::Pause(tx)); + async { + let _ = rx.await; + } + } + + /// Resume accepting incoming connections. + pub fn resume(&self) -> impl Future { + let (tx, rx) = oneshot::channel(); + let _ = self.cmd_tx.send(ServerCommand::Resume(tx)); + async { + let _ = rx.await; + } + } + + /// Stop incoming connection processing, stop all workers and exit. + pub fn stop(&self, graceful: bool) -> impl Future { + let (tx, rx) = oneshot::channel(); + + let _ = self.cmd_tx.send(ServerCommand::Stop { + graceful, + completion: Some(tx), + }); + + async { + let _ = rx.await; + } + } +} diff --git a/actix-server/src/join_all.rs b/actix-server/src/join_all.rs new file mode 100644 index 00000000..bdef62ef --- /dev/null +++ b/actix-server/src/join_all.rs @@ -0,0 +1,78 @@ +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use futures_core::future::BoxFuture; + +// a poor man's join future. joined future is only used when starting/stopping the server. +// pin_project and pinned futures are overkill for this task. +pub(crate) struct JoinAll { + fut: Vec>, +} + +pub(crate) fn join_all(fut: Vec + Send + 'static>) -> JoinAll { + let fut = fut + .into_iter() + .map(|f| JoinFuture::Future(Box::pin(f))) + .collect(); + + JoinAll { fut } +} + +enum JoinFuture { + Future(BoxFuture<'static, T>), + Result(Option), +} + +impl Unpin for JoinAll {} + +impl Future for JoinAll { + type Output = Vec; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut ready = true; + + let this = self.get_mut(); + for fut in this.fut.iter_mut() { + if let JoinFuture::Future(f) = fut { + match f.as_mut().poll(cx) { + Poll::Ready(t) => { + *fut = JoinFuture::Result(Some(t)); + } + Poll::Pending => ready = false, + } + } + } + + if ready { + let mut res = Vec::new(); + for fut in this.fut.iter_mut() { + if let JoinFuture::Result(f) = fut { + res.push(f.take().unwrap()); + } + } + + Poll::Ready(res) + } else { + Poll::Pending + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + use actix_utils::future::ready; + + #[actix_rt::test] + async fn test_join_all() { + let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))]; + let mut res = join_all(futs).await.into_iter(); + assert_eq!(Ok(1), res.next().unwrap()); + assert_eq!(Err(3), res.next().unwrap()); + assert_eq!(Ok(9), res.next().unwrap()); + } +} diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index b2117191..6ac8ba7e 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -5,7 +5,10 @@ #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] mod accept; +mod availability; mod builder; +mod handle; +mod join_all; mod server; mod service; mod signals; @@ -15,6 +18,7 @@ mod waker_queue; mod worker; pub use self::builder::ServerBuilder; +pub use self::handle::ServerHandle; pub use self::server::Server; pub use self::service::ServiceFactory; pub use self::test_server::TestServer; @@ -22,82 +26,9 @@ pub use self::test_server::TestServer; #[doc(hidden)] pub use self::socket::FromStream; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - /// Start server building process +#[doc(hidden)] +#[deprecated(since = "2.0.0", note = "Use `Server::build()`.")] pub fn new() -> ServerBuilder { ServerBuilder::default() } - -// a poor man's join future. joined future is only used when starting/stopping the server. -// pin_project and pinned futures are overkill for this task. -pub(crate) struct JoinAll { - fut: Vec>, -} - -pub(crate) fn join_all(fut: Vec + 'static>) -> JoinAll { - let fut = fut - .into_iter() - .map(|f| JoinFuture::Future(Box::pin(f))) - .collect(); - - JoinAll { fut } -} - -enum JoinFuture { - Future(Pin>>), - Result(Option), -} - -impl Unpin for JoinAll {} - -impl Future for JoinAll { - type Output = Vec; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut ready = true; - - let this = self.get_mut(); - for fut in this.fut.iter_mut() { - if let JoinFuture::Future(f) = fut { - match f.as_mut().poll(cx) { - Poll::Ready(t) => { - *fut = JoinFuture::Result(Some(t)); - } - Poll::Pending => ready = false, - } - } - } - - if ready { - let mut res = Vec::new(); - for fut in this.fut.iter_mut() { - if let JoinFuture::Result(f) = fut { - res.push(f.take().unwrap()); - } - } - - Poll::Ready(res) - } else { - Poll::Pending - } - } -} - -#[cfg(test)] -mod test { - use super::*; - - use actix_utils::future::ready; - - #[actix_rt::test] - async fn test_join_all() { - let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))]; - let mut res = join_all(futs).await.into_iter(); - assert_eq!(Ok(1), res.next().unwrap()); - assert_eq!(Err(3), res.next().unwrap()); - assert_eq!(Ok(9), res.next().unwrap()); - } -} diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index f0dfca0b..08036eeb 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -1,119 +1,359 @@ -use std::future::Future; -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; +use std::{ + future::Future, + io, mem, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; -use tokio::sync::mpsc::UnboundedSender; -use tokio::sync::oneshot; +use actix_rt::{time::sleep, System}; +use futures_core::future::BoxFuture; +use log::{error, info}; +use tokio::sync::{ + mpsc::{UnboundedReceiver, UnboundedSender}, + oneshot, +}; -use crate::builder::ServerBuilder; -use crate::signals::Signal; +use crate::{ + accept::Accept, + builder::ServerBuilder, + join_all::join_all, + service::InternalServiceFactory, + signals::{Signal, Signals}, + waker_queue::{WakerInterest, WakerQueue}, + worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer}, + ServerHandle, +}; #[derive(Debug)] pub(crate) enum ServerCommand { + /// TODO WorkerFaulted(usize), + + /// Contains return channel to notify caller of successful state change. Pause(oneshot::Sender<()>), + + /// Contains return channel to notify caller of successful state change. Resume(oneshot::Sender<()>), - Signal(Signal), + + /// TODO Stop { /// True if shut down should be graceful. graceful: bool, + + /// Return channel to notify caller that shutdown is complete. completion: Option>, }, - /// Notify of server stop - Notify(oneshot::Sender<()>), } -/// Server handle. +/// General purpose TCP server that runs services receiving Tokio `TcpStream`s. +/// +/// Handles creating worker threads, restarting faulted workers, connection accepting, and +/// back-pressure logic. +/// +/// Creates a worker per CPU core (or the number specified in [`ServerBuilder::workers`]) and +/// distributes connections with a round-robin strategy. +/// +/// The [Server] must be awaited to process stop commands and listen for OS signals. It will resolve +/// when the server has fully shut down. /// /// # Shutdown Signals /// On UNIX systems, `SIGQUIT` will start a graceful shutdown and `SIGTERM` or `SIGINT` will start a -/// forced shutdown. On Windows, a CTRL-C signal will start a forced shutdown. +/// forced shutdown. On Windows, a Ctrl-C signal will start a forced shutdown. /// /// A graceful shutdown will wait for all workers to stop first. -#[derive(Debug)] -pub struct Server( - UnboundedSender, - Option>, -); +/// +/// # Examples +/// The following is a TCP echo server. Test using `telnet 127.0.0.1 8080`. +/// +/// ```no_run +/// use std::io; +/// +/// use actix_rt::net::TcpStream; +/// use actix_server::Server; +/// use actix_service::{fn_service, ServiceFactoryExt as _}; +/// use bytes::BytesMut; +/// use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; +/// +/// #[actix_rt::main] +/// async fn main() -> io::Result<()> { +/// let bind_addr = ("127.0.0.1", 8080); +/// +/// Server::build() +/// .bind("echo", bind_addr, move || { +/// fn_service(move |mut stream: TcpStream| { +/// async move { +/// let mut size = 0; +/// let mut buf = BytesMut::new(); +/// +/// loop { +/// match stream.read_buf(&mut buf).await { +/// // end of stream; bail from loop +/// Ok(0) => break, +/// +/// // write bytes back to stream +/// Ok(bytes_read) => { +/// stream.write_all(&buf[size..]).await.unwrap(); +/// size += bytes_read; +/// } +/// +/// Err(err) => { +/// eprintln!("Stream Error: {:?}", err); +/// return Err(()); +/// } +/// } +/// } +/// +/// Ok(()) +/// } +/// }) +/// .map_err(|err| eprintln!("Service Error: {:?}", err)) +/// })? +/// .run() +/// .await +/// } +/// ``` +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub enum Server { + Server(ServerInner), + Error(Option), +} impl Server { - pub(crate) fn new(tx: UnboundedSender) -> Self { - Server(tx, None) - } - - /// Start server building process + /// Create server build. pub fn build() -> ServerBuilder { ServerBuilder::default() } - pub(crate) fn signal(&self, sig: Signal) { - let _ = self.0.send(ServerCommand::Signal(sig)); + pub(crate) fn new(mut builder: ServerBuilder) -> Self { + let sockets = mem::take(&mut builder.sockets) + .into_iter() + .map(|t| (t.0, t.2)) + .collect(); + + // Give log information on what runtime will be used. + let is_actix = actix_rt::System::try_current().is_some(); + let is_tokio = tokio::runtime::Handle::try_current().is_ok(); + + match (is_actix, is_tokio) { + (false, true) => info!("Tokio runtime found. Starting in existing Tokio runtime"), + (true, _) => info!("Actix runtime found. Starting in Actix runtime"), + (_, _) => info!( + "Actix/Tokio runtime not found. Starting in newt Tokio current-thread runtime" + ), + } + + for (_, name, lst) in &builder.sockets { + info!( + r#"Starting service: "{}", workers: {}, listening on: {}"#, + name, + builder.threads, + lst.local_addr() + ); + } + + match Accept::start(sockets, &builder) { + Ok((waker_queue, worker_handles)) => { + // construct OS signals listener future + let signals = (builder.listen_os_signals).then(Signals::new); + + Self::Server(ServerInner { + cmd_tx: builder.cmd_tx.clone(), + cmd_rx: builder.cmd_rx, + signals, + waker_queue, + worker_handles, + worker_config: builder.worker_config, + services: builder.factories, + exit: builder.exit, + stop_task: None, + }) + } + + Err(err) => Self::Error(Some(err)), + } } - pub(crate) fn worker_faulted(&self, idx: usize) { - let _ = self.0.send(ServerCommand::WorkerFaulted(idx)); - } - - /// Pause accepting incoming connections + /// Get a handle for ServerFuture that can be used to change state of actix server. /// - /// If socket contains some pending connection, they might be dropped. - /// All opened connection remains active. - pub fn pause(&self) -> impl Future { - let (tx, rx) = oneshot::channel(); - let _ = self.0.send(ServerCommand::Pause(tx)); - async { - let _ = rx.await; + /// See [ServerHandle](ServerHandle) for usage. + pub fn handle(&self) -> ServerHandle { + match self { + Server::Server(inner) => ServerHandle::new(inner.cmd_tx.clone()), + Server::Error(err) => { + // TODO: i don't think this is the best way to handle server startup fail + panic!( + "server handle can not be obtained because server failed to start up: {}", + err.as_ref().unwrap() + ); + } } } - - /// Resume accepting incoming connections - pub fn resume(&self) -> impl Future { - let (tx, rx) = oneshot::channel(); - let _ = self.0.send(ServerCommand::Resume(tx)); - async { - let _ = rx.await; - } - } - - /// Stop incoming connection processing, stop all workers and exit. - /// - /// If server starts with `spawn()` method, then spawned thread get terminated. - pub fn stop(&self, graceful: bool) -> impl Future { - let (tx, rx) = oneshot::channel(); - let _ = self.0.send(ServerCommand::Stop { - graceful, - completion: Some(tx), - }); - async { - let _ = rx.await; - } - } -} - -impl Clone for Server { - fn clone(&self) -> Self { - Self(self.0.clone(), None) - } } impl Future for Server { type Output = io::Result<()>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.as_mut().get_mut() { + Self::Error(err) => Poll::Ready(Err(err + .take() + .expect("Server future cannot be polled after error"))), - if this.1.is_none() { - let (tx, rx) = oneshot::channel(); - if this.0.send(ServerCommand::Notify(tx)).is_err() { - return Poll::Ready(Ok(())); + Self::Server(inner) => { + // poll Signals + if let Some(ref mut signals) = inner.signals { + if let Poll::Ready(signal) = Pin::new(signals).poll(cx) { + inner.stop_task = inner.handle_signal(signal); + // drop signals listener + inner.signals = None; + } + } + + // handle stop tasks and eager drain command channel + loop { + if let Some(ref mut fut) = inner.stop_task { + // only resolve stop task and exit + return fut.as_mut().poll(cx).map(|_| Ok(())); + } + + match Pin::new(&mut inner.cmd_rx).poll_recv(cx) { + Poll::Ready(Some(cmd)) => { + // if stop task is required, set it and loop + inner.stop_task = inner.handle_cmd(cmd); + } + _ => return Poll::Pending, + } + } + } + } + } +} + +pub struct ServerInner { + worker_handles: Vec, + worker_config: ServerWorkerConfig, + services: Vec>, + exit: bool, + cmd_tx: UnboundedSender, + cmd_rx: UnboundedReceiver, + signals: Option, + waker_queue: WakerQueue, + stop_task: Option>, +} + +impl ServerInner { + fn handle_cmd(&mut self, item: ServerCommand) -> Option> { + match item { + ServerCommand::Pause(tx) => { + self.waker_queue.wake(WakerInterest::Pause); + let _ = tx.send(()); + None + } + + ServerCommand::Resume(tx) => { + self.waker_queue.wake(WakerInterest::Resume); + let _ = tx.send(()); + None + } + + ServerCommand::Stop { + graceful, + completion, + } => { + let exit = self.exit; + + // stop accept thread + self.waker_queue.wake(WakerInterest::Stop); + + // stop workers + let workers_stop = self + .worker_handles + .iter() + .map(|worker| worker.stop(graceful)) + .collect::>(); + + Some(Box::pin(async move { + if graceful { + // wait for all workers to shut down + let _ = join_all(workers_stop).await; + } + + if let Some(tx) = completion { + let _ = tx.send(()); + } + + if exit { + sleep(Duration::from_millis(300)).await; + System::try_current().as_ref().map(System::stop); + } + })) + } + + ServerCommand::WorkerFaulted(idx) => { + // TODO: maybe just return with warning log if not found ? + assert!(self.worker_handles.iter().any(|wrk| wrk.idx == idx)); + + error!("Worker {} has died; restarting", idx); + + let factories = self + .services + .iter() + .map(|service| service.clone_factory()) + .collect(); + + match ServerWorker::start( + idx, + factories, + self.waker_queue.clone(), + self.worker_config, + ) { + Ok((handle_accept, handle_server)) => { + *self + .worker_handles + .iter_mut() + .find(|wrk| wrk.idx == idx) + .unwrap() = handle_server; + + self.waker_queue.wake(WakerInterest::Worker(handle_accept)); + } + + Err(err) => error!("can not restart worker {}: {}", idx, err), + }; + + None + } + } + } + + fn handle_signal(&mut self, signal: Signal) -> Option> { + match signal { + Signal::Int => { + info!("SIGINT received; starting forced shutdown"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: false, + completion: None, + }) + } + + Signal::Term => { + info!("SIGTERM received; starting graceful shutdown"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: true, + completion: None, + }) + } + + Signal::Quit => { + info!("SIGQUIT received; starting forced shutdown"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: false, + completion: None, + }) } - this.1 = Some(rx); - } - - match Pin::new(this.1.as_mut().unwrap()).poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(_) => Poll::Ready(Ok(())), } } } diff --git a/actix-server/src/signals.rs b/actix-server/src/signals.rs index c9cdb45e..0822a433 100644 --- a/actix-server/src/signals.rs +++ b/actix-server/src/signals.rs @@ -1,12 +1,16 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; +use std::{ + fmt, + future::Future, + pin::Pin, + task::{Context, Poll}, +}; -use crate::server::Server; +use log::trace; /// Types of process signals. -#[allow(dead_code)] -#[derive(PartialEq, Clone, Copy, Debug)] +// #[allow(dead_code)] +#[derive(Debug, Clone, Copy, PartialEq)] +#[allow(dead_code)] // variants are never constructed on non-unix pub(crate) enum Signal { /// `SIGINT` Int, @@ -18,26 +22,35 @@ pub(crate) enum Signal { Quit, } +impl fmt::Display for Signal { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(match self { + Signal::Int => "SIGINT", + Signal::Term => "SIGTERM", + Signal::Quit => "SIGQUIT", + }) + } +} + /// Process signal listener. pub(crate) struct Signals { - srv: Server, - #[cfg(not(unix))] - signals: futures_core::future::LocalBoxFuture<'static, std::io::Result<()>>, + signals: futures_core::future::BoxFuture<'static, std::io::Result<()>>, #[cfg(unix)] signals: Vec<(Signal, actix_rt::signal::unix::Signal)>, } impl Signals { - /// Spawns a signal listening future that is able to send commands to the `Server`. - pub(crate) fn start(srv: Server) { + /// Constructs an OS signal listening future. + pub(crate) fn new() -> Self { + trace!("setting up OS signal listener"); + #[cfg(not(unix))] { - actix_rt::spawn(Signals { - srv, + Signals { signals: Box::pin(actix_rt::signal::ctrl_c()), - }); + } } #[cfg(unix)] @@ -66,33 +79,30 @@ impl Signals { }) .collect::>(); - actix_rt::spawn(Signals { srv, signals }); + Signals { signals } } } } impl Future for Signals { - type Output = (); + type Output = Signal; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { #[cfg(not(unix))] - match self.signals.as_mut().poll(cx) { - Poll::Ready(_) => { - self.srv.signal(Signal::Int); - Poll::Ready(()) - } - Poll::Pending => Poll::Pending, + { + self.signals.as_mut().poll(cx).map(|_| Signal::Int) } #[cfg(unix)] { for (sig, fut) in self.signals.iter_mut() { + // TODO: match on if let Some ? if Pin::new(fut).poll_recv(cx).is_ready() { - let sig = *sig; - self.srv.signal(sig); - return Poll::Ready(()); + trace!("{} received", sig); + return Poll::Ready(*sig); } } + Poll::Pending } } diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index cd7ccc1a..6f641d73 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -2,7 +2,7 @@ pub(crate) use std::net::{ SocketAddr as StdSocketAddr, TcpListener as StdTcpListener, ToSocketAddrs, }; -pub(crate) use mio::net::{TcpListener as MioTcpListener, TcpSocket as MioTcpSocket}; +pub(crate) use mio::net::TcpListener as MioTcpListener; #[cfg(unix)] pub(crate) use { mio::net::UnixListener as MioUnixListener, @@ -223,6 +223,22 @@ mod unix_impl { } } +pub(crate) fn create_mio_tcp_listener( + addr: StdSocketAddr, + backlog: u32, +) -> io::Result { + use socket2::{Domain, Protocol, Socket, Type}; + + let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))?; + + socket.set_reuse_address(true)?; + socket.set_nonblocking(true)?; + socket.bind(&addr.into())?; + socket.listen(backlog as i32)?; + + Ok(MioTcpListener::from_std(StdTcpListener::from(socket))) +} + #[cfg(test)] mod tests { use super::*; @@ -234,11 +250,8 @@ mod tests { assert_eq!(format!("{}", addr), "127.0.0.1:8080"); let addr: StdSocketAddr = "127.0.0.1:0".parse().unwrap(); - let socket = MioTcpSocket::new_v4().unwrap(); - socket.set_reuseaddr(true).unwrap(); - socket.bind(addr).unwrap(); - let tcp = socket.listen(128).unwrap(); - let lst = MioListener::Tcp(tcp); + let lst = create_mio_tcp_listener(addr, 128).unwrap(); + let lst = MioListener::Tcp(lst); assert!(format!("{:?}", lst).contains("TcpListener")); assert!(format!("{}", lst).contains("127.0.0.1")); } diff --git a/actix-server/src/test_server.rs b/actix-server/src/test_server.rs index ad6ee8ee..fc3bcbe3 100644 --- a/actix-server/src/test_server.rs +++ b/actix-server/src/test_server.rs @@ -1,9 +1,8 @@ -use std::sync::mpsc; -use std::{net, thread}; +use std::{io, net, sync::mpsc, thread}; use actix_rt::{net::TcpStream, System}; -use crate::{Server, ServerBuilder, ServiceFactory}; +use crate::{Server, ServerBuilder, ServerHandle, ServiceFactory}; /// A testing server. /// @@ -34,7 +33,8 @@ pub struct TestServerRuntime { addr: net::SocketAddr, host: String, port: u16, - system: System, + server_handle: ServerHandle, + thread_handle: Option>>, } impl TestServer { @@ -46,20 +46,22 @@ impl TestServer { let (tx, rx) = mpsc::channel(); // run server in separate thread - thread::spawn(move || { - let sys = System::new(); - factory(Server::build()).workers(1).disable_signals().run(); - - tx.send(System::current()).unwrap(); - sys.run() + let thread_handle = thread::spawn(move || { + System::new().block_on(async { + let server = factory(Server::build()).workers(1).disable_signals().run(); + tx.send(server.handle()).unwrap(); + server.await + }) }); - let system = rx.recv().unwrap(); + + let server_handle = rx.recv().unwrap(); TestServerRuntime { - system, addr: "127.0.0.1:0".parse().unwrap(), host: "127.0.0.1".to_string(), port: 0, + server_handle, + thread_handle: Some(thread_handle), } } @@ -68,24 +70,25 @@ impl TestServer { let (tx, rx) = mpsc::channel(); // run server in separate thread - thread::spawn(move || { + let thread_handle = thread::spawn(move || { let sys = System::new(); let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let local_addr = tcp.local_addr().unwrap(); sys.block_on(async { - Server::build() + let server = Server::build() .listen("test", tcp, factory) .unwrap() .workers(1) .disable_signals() .run(); - tx.send((System::current(), local_addr)).unwrap(); - }); - sys.run() + + tx.send((server.handle(), local_addr)).unwrap(); + server.await + }) }); - let (system, addr) = rx.recv().unwrap(); + let (server_handle, addr) = rx.recv().unwrap(); let host = format!("{}", addr.ip()); let port = addr.port(); @@ -94,18 +97,23 @@ impl TestServer { addr, host, port, - system, + server_handle, + thread_handle: Some(thread_handle), } } /// Get first available unused local address. pub fn unused_addr() -> net::SocketAddr { + use socket2::{Domain, Protocol, Socket, Type}; + let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap(); - let socket = mio::net::TcpSocket::new_v4().unwrap(); - socket.bind(addr).unwrap(); - socket.set_reuseaddr(true).unwrap(); - let tcp = socket.listen(1024).unwrap(); - tcp.local_addr().unwrap() + let socket = + Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP)).unwrap(); + socket.set_reuse_address(true).unwrap(); + socket.set_nonblocking(true).unwrap(); + socket.bind(&addr.into()).unwrap(); + socket.listen(1024).unwrap(); + net::TcpListener::from(socket).local_addr().unwrap() } } @@ -127,7 +135,8 @@ impl TestServerRuntime { /// Stop server. fn stop(&mut self) { - self.system.stop(); + let _ = self.server_handle.stop(false); + self.thread_handle.take().unwrap().join().unwrap().unwrap(); } /// Connect to server, returning a Tokio `TcpStream`. @@ -141,3 +150,16 @@ impl Drop for TestServerRuntime { self.stop() } } + +#[cfg(test)] +mod tests { + use actix_service::fn_service; + + use super::*; + + #[tokio::test] + async fn plain_tokio_runtime() { + let srv = TestServer::with(|| fn_service(|_sock| async move { Ok::<_, ()>(()) })); + assert!(srv.connect().is_ok()); + } +} diff --git a/actix-server/src/waker_queue.rs b/actix-server/src/waker_queue.rs index 3f8669d4..a7280901 100644 --- a/actix-server/src/waker_queue.rs +++ b/actix-server/src/waker_queue.rs @@ -78,12 +78,7 @@ pub(crate) enum WakerInterest { Pause, Resume, Stop, - /// `Timer` is an interest sent as a delayed future. When an error happens on accepting - /// connection `Accept` would deregister socket listener temporary and wake up the poll and - /// register them again after the delayed future resolve. - Timer, - /// `Worker` is an interest happen after a worker runs into faulted state(This is determined - /// by if work can be sent to it successfully).`Accept` would be waked up and add the new - /// `WorkerHandleAccept`. + /// `Worker` is an interest that is triggered after a worker faults. This is determined by + /// trying to send work to it. `Accept` would be waked up and add the new `WorkerHandleAccept`. Worker(WorkerHandleAccept), } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index b99b2da2..0822ab7c 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -1,6 +1,6 @@ use std::{ future::Future, - mem, + io, mem, pin::Pin, rc::Rc, sync::{ @@ -14,7 +14,7 @@ use std::{ use actix_rt::{ spawn, time::{sleep, Instant, Sleep}, - Arbiter, + Arbiter, ArbiterHandle, System, }; use futures_core::{future::LocalBoxFuture, ready}; use log::{error, info, trace}; @@ -23,12 +23,13 @@ use tokio::sync::{ oneshot, }; -use crate::join_all; -use crate::service::{BoxedServerService, InternalServiceFactory}; -use crate::socket::MioStream; -use crate::waker_queue::{WakerInterest, WakerQueue}; +use crate::{ + service::{BoxedServerService, InternalServiceFactory}, + socket::MioStream, + waker_queue::{WakerInterest, WakerQueue}, +}; -/// Stop worker message. Returns `true` on successful graceful shutdown. +/// Stop worker message. Returns `true` on successful graceful shutdown /// and `false` if some connections still alive when shutdown execute. pub(crate) struct Stop { graceful: bool, @@ -41,19 +42,20 @@ pub(crate) struct Conn { pub token: usize, } +/// Create accept and server worker handles. fn handle_pair( idx: usize, - tx1: UnboundedSender, - tx2: UnboundedSender, + conn_tx: UnboundedSender, + stop_tx: UnboundedSender, counter: Counter, ) -> (WorkerHandleAccept, WorkerHandleServer) { let accept = WorkerHandleAccept { idx, - tx: tx1, + conn_tx, counter, }; - let server = WorkerHandleServer { idx, tx: tx2 }; + let server = WorkerHandleServer { idx, stop_tx }; (accept, server) } @@ -149,13 +151,13 @@ impl Drop for WorkerCounterGuard { } } -/// Handle to worker that can send connection message to worker and share the -/// availability of worker to other thread. +/// Handle to worker that can send connection message to worker and share the availability of worker +/// to other threads. /// /// Held by [Accept](crate::accept::Accept). pub(crate) struct WorkerHandleAccept { idx: usize, - tx: UnboundedSender, + conn_tx: UnboundedSender, counter: Counter, } @@ -166,8 +168,8 @@ impl WorkerHandleAccept { } #[inline(always)] - pub(crate) fn send(&self, msg: Conn) -> Result<(), Conn> { - self.tx.send(msg).map_err(|msg| msg.0) + pub(crate) fn send(&self, conn: Conn) -> Result<(), Conn> { + self.conn_tx.send(conn).map_err(|msg| msg.0) } #[inline(always)] @@ -181,14 +183,14 @@ impl WorkerHandleAccept { /// Held by [ServerBuilder](crate::builder::ServerBuilder). #[derive(Debug)] pub(crate) struct WorkerHandleServer { - idx: usize, - tx: UnboundedSender, + pub(crate) idx: usize, + stop_tx: UnboundedSender, } impl WorkerHandleServer { pub(crate) fn stop(&self, graceful: bool) -> oneshot::Receiver { let (tx, rx) = oneshot::channel(); - let _ = self.tx.send(Stop { graceful, tx }); + let _ = self.stop_tx.send(Stop { graceful, tx }); rx } } @@ -199,8 +201,8 @@ impl WorkerHandleServer { pub(crate) struct ServerWorker { // UnboundedReceiver should always be the first field. // It must be dropped as soon as ServerWorker dropping. - rx: UnboundedReceiver, - rx2: UnboundedReceiver, + conn_rx: UnboundedReceiver, + stop_rx: UnboundedReceiver, counter: WorkerCounter, services: Box<[WorkerService]>, factories: Box<[Box]>, @@ -209,7 +211,7 @@ pub(crate) struct ServerWorker { } struct WorkerService { - factory: usize, + factory_idx: usize, status: WorkerServiceStatus, service: BoxedServerService, } @@ -221,7 +223,7 @@ impl WorkerService { } } -#[derive(Copy, Clone, Debug, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] enum WorkerServiceStatus { Available, Unavailable, @@ -231,8 +233,14 @@ enum WorkerServiceStatus { Stopped, } +impl Default for WorkerServiceStatus { + fn default() -> Self { + Self::Unavailable + } +} + /// Config for worker behavior passed down from server builder. -#[derive(Copy, Clone)] +#[derive(Debug, Clone, Copy)] pub(crate) struct ServerWorkerConfig { shutdown_timeout: Duration, max_blocking_threads: usize, @@ -271,82 +279,199 @@ impl ServerWorker { factories: Vec>, waker_queue: WakerQueue, config: ServerWorkerConfig, - ) -> (WorkerHandleAccept, WorkerHandleServer) { - let (tx1, rx) = unbounded_channel(); - let (tx2, rx2) = unbounded_channel(); + ) -> io::Result<(WorkerHandleAccept, WorkerHandleServer)> { + trace!("starting server worker {}", idx); + + let (tx1, conn_rx) = unbounded_channel(); + let (tx2, stop_rx) = unbounded_channel(); let counter = Counter::new(config.max_concurrent_connections); + let pair = handle_pair(idx, tx1, tx2, counter.clone()); - let counter_clone = counter.clone(); - // every worker runs in it's own arbiter. + // get actix system context if it is set + let actix_system = System::try_current(); + + // get tokio runtime handle if it is set + let tokio_handle = tokio::runtime::Handle::try_current().ok(); + + // service factories initialization channel + let (factory_tx, factory_rx) = std::sync::mpsc::sync_channel::>(1); + + // outline of following code: + // + // if system exists + // if uring enabled + // start arbiter using uring method + // else + // start arbiter with regular tokio + // else + // if uring enabled + // start uring in spawned thread + // else + // start regular tokio in spawned thread + + // every worker runs in it's own thread and tokio runtime. // use a custom tokio runtime builder to change the settings of runtime. - #[cfg(all(target_os = "linux", feature = "io-uring"))] - let arbiter = { - // TODO: pass max blocking thread config when tokio-uring enable configuration - // on building runtime. - let _ = config.max_blocking_threads; - Arbiter::new() - }; - #[cfg(not(all(target_os = "linux", feature = "io-uring")))] - let arbiter = Arbiter::with_tokio_rt(move || { - tokio::runtime::Builder::new_current_thread() - .enable_all() - .max_blocking_threads(config.max_blocking_threads) - .build() - .unwrap() - }); + match (actix_system, tokio_handle) { + (None, None) => { + panic!("No runtime detected. Start a Tokio (or Actix) runtime."); + } - arbiter.spawn(async move { - let fut = factories - .iter() - .enumerate() - .map(|(idx, factory)| { - let fut = factory.create(); - async move { fut.await.map(|(t, s)| (idx, t, s)) } - }) - .collect::>(); + // no actix system + (None, Some(rt_handle)) => { + std::thread::Builder::new() + .name(format!("actix-server worker {}", idx)) + .spawn(move || { + let (worker_stopped_tx, worker_stopped_rx) = oneshot::channel(); - // a second spawn to run !Send future tasks. - spawn(async move { - let res = join_all(fut) - .await - .into_iter() - .collect::, _>>(); - let services = match res { - Ok(res) => res - .into_iter() - .fold(Vec::new(), |mut services, (factory, token, service)| { - assert_eq!(token, services.len()); - services.push(WorkerService { - factory, - service, - status: WorkerServiceStatus::Unavailable, + // local set for running service init futures and worker services + let ls = tokio::task::LocalSet::new(); + + // init services using existing Tokio runtime (so probably on main thread) + let services = rt_handle.block_on(ls.run_until(async { + let mut services = Vec::new(); + + for (idx, factory) in factories.iter().enumerate() { + match factory.create().await { + Ok((token, svc)) => services.push((idx, token, svc)), + + Err(err) => { + error!("Can not start worker: {:?}", err); + return Err(io::Error::new( + io::ErrorKind::Other, + format!("can not start server service {}", idx), + )); + } + } + } + + Ok(services) + })); + + let services = match services { + Ok(services) => { + factory_tx.send(Ok(())).unwrap(); + services + } + Err(err) => { + factory_tx.send(Err(err)).unwrap(); + return; + } + }; + + let worker_services = wrap_worker_services(services); + + let worker_fut = async move { + // spawn to make sure ServerWorker runs as non boxed future. + spawn(async move { + ServerWorker { + conn_rx, + stop_rx, + services: worker_services.into_boxed_slice(), + counter: WorkerCounter::new(idx, waker_queue, counter), + factories: factories.into_boxed_slice(), + state: WorkerState::default(), + shutdown_timeout: config.shutdown_timeout, + } + .await; + + // wake up outermost task waiting for shutdown + worker_stopped_tx.send(()).unwrap(); }); - services - }) - .into_boxed_slice(), - Err(e) => { - error!("Can not start worker: {:?}", e); - Arbiter::current().stop(); - return; - } + + worker_stopped_rx.await.unwrap(); + }; + + #[cfg(all(target_os = "linux", feature = "io-uring"))] + { + // TODO: pass max blocking thread config when tokio-uring enable configuration + // on building runtime. + let _ = config.max_blocking_threads; + tokio_uring::start(worker_fut); + } + + #[cfg(not(all(target_os = "linux", feature = "io-uring")))] + { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .max_blocking_threads(config.max_blocking_threads) + .build() + .unwrap(); + + rt.block_on(ls.run_until(worker_fut)); + } + }) + .expect("cannot spawn server worker thread"); + } + + // with actix system + (Some(_sys), _) => { + #[cfg(all(target_os = "linux", feature = "io-uring"))] + let arbiter = { + // TODO: pass max blocking thread config when tokio-uring enable configuration + // on building runtime. + let _ = config.max_blocking_threads; + Arbiter::new() }; - // a third spawn to make sure ServerWorker runs as non boxed future. - spawn(ServerWorker { - rx, - rx2, - services, - counter: WorkerCounter::new(idx, waker_queue, counter_clone), - factories: factories.into_boxed_slice(), - state: Default::default(), - shutdown_timeout: config.shutdown_timeout, - }); - }); - }); + #[cfg(not(all(target_os = "linux", feature = "io-uring")))] + let arbiter = { + Arbiter::with_tokio_rt(move || { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .max_blocking_threads(config.max_blocking_threads) + .build() + .unwrap() + }) + }; - handle_pair(idx, tx1, tx2, counter) + arbiter.spawn(async move { + // spawn_local to run !Send future tasks. + spawn(async move { + let mut services = Vec::new(); + + for (idx, factory) in factories.iter().enumerate() { + match factory.create().await { + Ok((token, svc)) => services.push((idx, token, svc)), + + Err(err) => { + error!("Can not start worker: {:?}", err); + Arbiter::current().stop(); + factory_tx + .send(Err(io::Error::new( + io::ErrorKind::Other, + format!("can not start server service {}", idx), + ))) + .unwrap(); + return; + } + } + } + + factory_tx.send(Ok(())).unwrap(); + + let worker_services = wrap_worker_services(services); + + // spawn to make sure ServerWorker runs as non boxed future. + spawn(ServerWorker { + conn_rx, + stop_rx, + services: worker_services.into_boxed_slice(), + counter: WorkerCounter::new(idx, waker_queue, counter), + factories: factories.into_boxed_slice(), + state: Default::default(), + shutdown_timeout: config.shutdown_timeout, + }); + }); + }); + } + }; + + // wait for service factories initialization + factory_rx.recv().unwrap()?; + + Ok(pair) } fn restart_service(&mut self, idx: usize, factory_id: usize) { @@ -384,7 +509,7 @@ impl ServerWorker { if srv.status == WorkerServiceStatus::Unavailable { trace!( "Service {:?} is available", - self.factories[srv.factory].name(idx) + self.factories[srv.factory_idx].name(idx) ); srv.status = WorkerServiceStatus::Available; } @@ -395,7 +520,7 @@ impl ServerWorker { if srv.status == WorkerServiceStatus::Available { trace!( "Service {:?} is unavailable", - self.factories[srv.factory].name(idx) + self.factories[srv.factory_idx].name(idx) ); srv.status = WorkerServiceStatus::Unavailable; } @@ -403,10 +528,10 @@ impl ServerWorker { Poll::Ready(Err(_)) => { error!( "Service {:?} readiness check returned error, restarting", - self.factories[srv.factory].name(idx) + self.factories[srv.factory_idx].name(idx) ); srv.status = WorkerServiceStatus::Failed; - return Err((idx, srv.factory)); + return Err((idx, srv.factory_idx)); } } } @@ -437,7 +562,7 @@ struct Shutdown { /// Start time of shutdown. start_from: Instant, - /// Notify of the shutdown outcome (force/grace) to stop caller. + /// Notify caller of the shutdown outcome (graceful/force). tx: oneshot::Sender, } @@ -449,8 +574,7 @@ impl Default for WorkerState { impl Drop for ServerWorker { fn drop(&mut self) { - // Stop the Arbiter ServerWorker runs on on drop. - Arbiter::current().stop(); + Arbiter::try_current().as_ref().map(ArbiterHandle::stop); } } @@ -461,15 +585,16 @@ impl Future for ServerWorker { let this = self.as_mut().get_mut(); // `StopWorker` message handler - if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx) + if let Poll::Ready(Some(Stop { graceful, tx })) = + Pin::new(&mut this.stop_rx).poll_recv(cx) { let num = this.counter.total(); if num == 0 { - info!("Shutting down worker, 0 connections"); + info!("Shutting down idle worker"); let _ = tx.send(true); return Poll::Ready(()); } else if graceful { - info!("Graceful worker shutdown, {} connections", num); + info!("Graceful worker shutdown; finishing {} connections", num); this.shutdown(false); this.state = WorkerState::Shutdown(Shutdown { @@ -478,7 +603,7 @@ impl Future for ServerWorker { tx, }); } else { - info!("Force shutdown worker, {} connections", num); + info!("Force shutdown worker, closing {} connections", num); this.shutdown(true); let _ = tx.send(false); @@ -523,6 +648,14 @@ impl Future for ServerWorker { self.poll(cx) } WorkerState::Shutdown(ref mut shutdown) => { + // drop all pending connections in rx channel. + while let Poll::Ready(Some(conn)) = Pin::new(&mut this.conn_rx).poll_recv(cx) { + // WorkerCounterGuard is needed as Accept thread has incremented counter. + // It's guard's job to decrement the counter together with drop of Conn. + let guard = this.counter.guard(); + drop((conn, guard)); + } + // wait for 1 second ready!(shutdown.timer.as_mut().poll(cx)); @@ -563,7 +696,7 @@ impl Future for ServerWorker { } // handle incoming io stream - match ready!(Pin::new(&mut this.rx).poll_recv(cx)) { + match ready!(Pin::new(&mut this.conn_rx).poll_recv(cx)) { Some(msg) => { let guard = this.counter.guard(); let _ = this.services[msg.token].service.call((guard, msg.io)); @@ -574,3 +707,19 @@ impl Future for ServerWorker { } } } + +fn wrap_worker_services( + services: Vec<(usize, usize, BoxedServerService)>, +) -> Vec { + services + .into_iter() + .fold(Vec::new(), |mut services, (idx, token, service)| { + assert_eq!(token, services.len()); + services.push(WorkerService { + factory_idx: idx, + service, + status: WorkerServiceStatus::Unavailable, + }); + services + }) +} diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 78894816..89e3916a 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -1,20 +1,19 @@ -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{mpsc, Arc}; -use std::{net, thread, time::Duration}; +use std::{ + net, + sync::{ + atomic::{AtomicUsize, Ordering}, + mpsc, Arc, + }, + thread, + time::Duration, +}; use actix_rt::{net::TcpStream, time::sleep}; -use actix_server::Server; +use actix_server::{Server, TestServer}; use actix_service::fn_service; -use actix_utils::future::ok; -use futures_util::future::lazy; fn unused_addr() -> net::SocketAddr { - let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap(); - let socket = mio::net::TcpSocket::new_v4().unwrap(); - socket.bind(addr).unwrap(); - socket.set_reuseaddr(true).unwrap(); - let tcp = socket.listen(32).unwrap(); - tcp.local_addr().unwrap() + TestServer::unused_addr() } #[test] @@ -23,52 +22,94 @@ fn test_bind() { let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let sys = actix_rt::System::new(); - let srv = sys.block_on(lazy(|_| { - Server::build() + actix_rt::System::new().block_on(async { + let srv = Server::build() .workers(1) .disable_signals() - .bind("test", addr, move || fn_service(|_| ok::<_, ()>(()))) - .unwrap() - .run() - })); + .bind("test", addr, move || { + fn_service(|_| async { Ok::<_, ()>(()) }) + })? + .run(); - let _ = tx.send((srv, actix_rt::System::current())); - let _ = sys.run(); + let _ = tx.send(srv.handle()); + + srv.await + }) }); - let (_, sys) = rx.recv().unwrap(); + let srv = rx.recv().unwrap(); thread::sleep(Duration::from_millis(500)); assert!(net::TcpStream::connect(addr).is_ok()); - sys.stop(); - let _ = h.join(); + + let _ = srv.stop(true); + h.join().unwrap().unwrap(); +} + +#[test] +fn plain_tokio_runtime() { + let addr = unused_addr(); + let (tx, rx) = mpsc::channel(); + + let h = thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + rt.block_on(async { + let srv = Server::build() + .workers(1) + .disable_signals() + .bind("test", addr, move || { + fn_service(|_| async { Ok::<_, ()>(()) }) + })? + .run(); + + tx.send(srv.handle()).unwrap(); + + srv.await + }) + }); + + let srv = rx.recv().unwrap(); + + thread::sleep(Duration::from_millis(500)); + assert!(net::TcpStream::connect(addr).is_ok()); + + let _ = srv.stop(true); + h.join().unwrap().unwrap(); } #[test] fn test_listen() { let addr = unused_addr(); + let lst = net::TcpListener::bind(addr).unwrap(); + let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let sys = actix_rt::System::new(); - let lst = net::TcpListener::bind(addr).unwrap(); - sys.block_on(async { - Server::build() + actix_rt::System::new().block_on(async { + let srv = Server::build() .disable_signals() .workers(1) - .listen("test", lst, move || fn_service(|_| ok::<_, ()>(()))) - .unwrap() + .listen("test", lst, move || { + fn_service(|_| async { Ok::<_, ()>(()) }) + })? .run(); - let _ = tx.send(actix_rt::System::current()); - }); - let _ = sys.run(); + + let _ = tx.send(srv.handle()); + + srv.await + }) }); - let sys = rx.recv().unwrap(); + + let srv = rx.recv().unwrap(); thread::sleep(Duration::from_millis(500)); assert!(net::TcpStream::connect(addr).is_ok()); - sys.stop(); - let _ = h.join(); + + let _ = srv.stop(true); + h.join().unwrap().unwrap(); } #[test] @@ -84,9 +125,8 @@ fn test_start() { let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let sys = actix_rt::System::new(); - let srv = sys.block_on(lazy(|_| { - Server::build() + actix_rt::System::new().block_on(async { + let srv = Server::build() .backlog(100) .disable_signals() .bind("test", addr, move || { @@ -95,13 +135,13 @@ fn test_start() { f.send(Bytes::from_static(b"test")).await.unwrap(); Ok::<_, ()>(()) }) - }) - .unwrap() - .run() - })); + })? + .run(); - let _ = tx.send((srv, actix_rt::System::current())); - let _ = sys.run(); + let _ = tx.send((srv.handle(), actix_rt::System::current())); + + srv.await + }) }); let (srv, sys) = rx.recv().unwrap(); @@ -134,12 +174,11 @@ fn test_start() { // stop let _ = srv.stop(false); - thread::sleep(Duration::from_millis(100)); - assert!(net::TcpStream::connect(addr).is_err()); - - thread::sleep(Duration::from_millis(100)); sys.stop(); - let _ = h.join(); + h.join().unwrap().unwrap(); + + thread::sleep(Duration::from_secs(1)); + assert!(net::TcpStream::connect(addr).is_err()); } #[actix_rt::test] @@ -162,11 +201,11 @@ async fn test_max_concurrent_connections() { let h = thread::spawn(move || { actix_rt::System::new().block_on(async { - let server = Server::build() + let srv = Server::build() // Set a relative higher backlog. .backlog(12) // max connection for a worker is 3. - .maxconn(max_conn) + .max_concurrent_connections(max_conn) .workers(1) .disable_signals() .bind("test", addr, move || { @@ -183,9 +222,9 @@ async fn test_max_concurrent_connections() { })? .run(); - let _ = tx.send((server.clone(), actix_rt::System::current())); + let _ = tx.send((srv.handle(), actix_rt::System::current())); - server.await + srv.await }) }); @@ -209,9 +248,8 @@ async fn test_max_concurrent_connections() { } srv.stop(false).await; - sys.stop(); - let _ = h.join().unwrap(); + h.join().unwrap().unwrap(); } #[actix_rt::test] @@ -257,7 +295,7 @@ async fn test_service_restart() { let h = thread::spawn(move || { let num = num.clone(); actix_rt::System::new().block_on(async { - let server = Server::build() + let srv = Server::build() .backlog(1) .disable_signals() .bind("addr1", addr1, move || { @@ -266,25 +304,23 @@ async fn test_service_restart() { let num = num.clone(); async move { Ok::<_, ()>(TestService(num)) } }) - }) - .unwrap() + })? .bind("addr2", addr2, move || { let num2 = num2.clone(); fn_factory(move || { let num2 = num2.clone(); async move { Ok::<_, ()>(TestService(num2)) } }) - }) - .unwrap() + })? .workers(1) .run(); - let _ = tx.send((server.clone(), actix_rt::System::current())); - server.await + let _ = tx.send(srv.handle()); + srv.await }) }); - let (server, sys) = rx.recv().unwrap(); + let srv = rx.recv().unwrap(); for _ in 0..5 { TcpStream::connect(addr1) @@ -306,12 +342,11 @@ async fn test_service_restart() { assert!(num_clone.load(Ordering::SeqCst) > 5); assert!(num2_clone.load(Ordering::SeqCst) > 5); - sys.stop(); - let _ = server.stop(false); - let _ = h.join().unwrap(); + let _ = srv.stop(false); + h.join().unwrap().unwrap(); } -#[ignore] +#[ignore] // non-deterministic on CI #[actix_rt::test] async fn worker_restart() { use actix_service::{Service, ServiceFactory}; @@ -378,19 +413,19 @@ async fn worker_restart() { let h = thread::spawn(move || { let counter = counter.clone(); actix_rt::System::new().block_on(async { - let server = Server::build() + let srv = Server::build() .disable_signals() - .bind("addr", addr, move || TestServiceFactory(counter.clone())) - .unwrap() + .bind("addr", addr, move || TestServiceFactory(counter.clone()))? .workers(2) .run(); - let _ = tx.send((server.clone(), actix_rt::System::current())); - server.await + let _ = tx.send(srv.handle()); + + srv.await }) }); - let (server, sys) = rx.recv().unwrap(); + let srv = rx.recv().unwrap(); sleep(Duration::from_secs(3)).await; @@ -447,7 +482,6 @@ async fn worker_restart() { assert_eq!("3", id); stream.shutdown().await.unwrap(); - sys.stop(); - let _ = server.stop(false); - let _ = h.join().unwrap(); + let _ = srv.stop(false); + h.join().unwrap().unwrap(); } diff --git a/actix-service/examples/clone.rs b/actix-service/examples/clone.rs new file mode 100644 index 00000000..1f61a648 --- /dev/null +++ b/actix-service/examples/clone.rs @@ -0,0 +1,33 @@ +use std::{future::Future, sync::mpsc, time::Duration}; + +async fn oracle(f: F) -> (u32, u32) +where + F: FnOnce() -> Fut + Clone + Send + 'static, + Fut: Future + 'static, +{ + let f1 = actix_rt::spawn(f.clone()()); + let f2 = actix_rt::spawn(f()); + + (f1.await.unwrap(), f2.await.unwrap()) +} + +#[actix_rt::main] +async fn main() { + let (tx, rx) = mpsc::channel(); + + let (r1, r2) = oracle({ + let tx = tx.clone(); + + || async move { + tx.send(()).unwrap(); + 4 * 4 + } + }) + .await; + assert_eq!(r1, r2); + + tx.send(()).unwrap(); + + rx.recv_timeout(Duration::from_millis(100)).unwrap(); + rx.recv_timeout(Duration::from_millis(100)).unwrap(); +} diff --git a/actix-service/src/fn_service.rs b/actix-service/src/fn_service.rs index 8c1a6f51..f83ef81f 100644 --- a/actix-service/src/fn_service.rs +++ b/actix-service/src/fn_service.rs @@ -52,9 +52,9 @@ pub fn fn_factory( f: F, ) -> FnServiceNoConfig where - Srv: Service, F: Fn() -> Fut, Fut: Future>, + Srv: Service, { FnServiceNoConfig::new(f) } diff --git a/actix-service/src/macros.rs b/actix-service/src/macros.rs index 6cf3ef08..503cf116 100644 --- a/actix-service/src/macros.rs +++ b/actix-service/src/macros.rs @@ -1,7 +1,7 @@ /// An implementation of [`poll_ready`]() that always signals readiness. /// /// This should only be used for basic leaf services that have no concept of un-readiness. -/// For wrapper or other serivice types, use [`forward_ready!`] for simple cases or write a bespoke +/// For wrapper or other service types, use [`forward_ready!`] for simple cases or write a bespoke /// `poll_ready` implementation. /// /// [`poll_ready`]: crate::Service::poll_ready diff --git a/actix-service/src/map_err.rs b/actix-service/src/map_err.rs index 7b1ac2ab..aad0f421 100644 --- a/actix-service/src/map_err.rs +++ b/actix-service/src/map_err.rs @@ -9,26 +9,25 @@ use pin_project_lite::pin_project; use super::{Service, ServiceFactory}; -/// Service for the `map_err` combinator, changing the type of a service's -/// error. +/// Service for the `map_err` combinator, changing the type of a service's error. /// /// This is created by the `ServiceExt::map_err` method. pub struct MapErr { service: S, - f: F, + mapper: F, _t: PhantomData<(E, Req)>, } impl MapErr { /// Create new `MapErr` combinator - pub(crate) fn new(service: S, f: F) -> Self + pub(crate) fn new(service: S, mapper: F) -> Self where S: Service, F: Fn(S::Error) -> E, { Self { service, - f, + mapper, _t: PhantomData, } } @@ -42,7 +41,7 @@ where fn clone(&self) -> Self { MapErr { service: self.service.clone(), - f: self.f.clone(), + mapper: self.mapper.clone(), _t: PhantomData, } } @@ -58,11 +57,11 @@ where type Future = MapErrFuture; fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll> { - self.service.poll_ready(ctx).map_err(&self.f) + self.service.poll_ready(ctx).map_err(&self.mapper) } fn call(&self, req: Req) -> Self::Future { - MapErrFuture::new(self.service.call(req), self.f.clone()) + MapErrFuture::new(self.service.call(req), self.mapper.clone()) } } @@ -105,23 +104,23 @@ where /// service's error. /// /// This is created by the `NewServiceExt::map_err` method. -pub struct MapErrServiceFactory +pub struct MapErrServiceFactory where - A: ServiceFactory, - F: Fn(A::Error) -> E + Clone, + SF: ServiceFactory, + F: Fn(SF::Error) -> E + Clone, { - a: A, + a: SF, f: F, e: PhantomData<(E, Req)>, } -impl MapErrServiceFactory +impl MapErrServiceFactory where - A: ServiceFactory, - F: Fn(A::Error) -> E + Clone, + SF: ServiceFactory, + F: Fn(SF::Error) -> E + Clone, { /// Create new `MapErr` new service instance - pub(crate) fn new(a: A, f: F) -> Self { + pub(crate) fn new(a: SF, f: F) -> Self { Self { a, f, @@ -130,10 +129,10 @@ where } } -impl Clone for MapErrServiceFactory +impl Clone for MapErrServiceFactory where - A: ServiceFactory + Clone, - F: Fn(A::Error) -> E + Clone, + SF: ServiceFactory + Clone, + F: Fn(SF::Error) -> E + Clone, { fn clone(&self) -> Self { Self { @@ -144,57 +143,57 @@ where } } -impl ServiceFactory for MapErrServiceFactory +impl ServiceFactory for MapErrServiceFactory where - A: ServiceFactory, - F: Fn(A::Error) -> E + Clone, + SF: ServiceFactory, + F: Fn(SF::Error) -> E + Clone, { - type Response = A::Response; + type Response = SF::Response; type Error = E; - type Config = A::Config; - type Service = MapErr; - type InitError = A::InitError; - type Future = MapErrServiceFuture; + type Config = SF::Config; + type Service = MapErr; + type InitError = SF::InitError; + type Future = MapErrServiceFuture; - fn new_service(&self, cfg: A::Config) -> Self::Future { + fn new_service(&self, cfg: SF::Config) -> Self::Future { MapErrServiceFuture::new(self.a.new_service(cfg), self.f.clone()) } } pin_project! { - pub struct MapErrServiceFuture + pub struct MapErrServiceFuture where - A: ServiceFactory, - F: Fn(A::Error) -> E, + SF: ServiceFactory, + F: Fn(SF::Error) -> E, { #[pin] - fut: A::Future, - f: F, + fut: SF::Future, + mapper: F, } } -impl MapErrServiceFuture +impl MapErrServiceFuture where - A: ServiceFactory, - F: Fn(A::Error) -> E, + SF: ServiceFactory, + F: Fn(SF::Error) -> E, { - fn new(fut: A::Future, f: F) -> Self { - MapErrServiceFuture { fut, f } + fn new(fut: SF::Future, mapper: F) -> Self { + MapErrServiceFuture { fut, mapper } } } -impl Future for MapErrServiceFuture +impl Future for MapErrServiceFuture where - A: ServiceFactory, - F: Fn(A::Error) -> E + Clone, + SF: ServiceFactory, + F: Fn(SF::Error) -> E + Clone, { - type Output = Result, A::InitError>; + type Output = Result, SF::InitError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); if let Poll::Ready(svc) = this.fut.poll(cx)? { - Poll::Ready(Ok(MapErr::new(svc, this.f.clone()))) + Poll::Ready(Ok(MapErr::new(svc, this.mapper.clone()))) } else { Poll::Pending } diff --git a/actix-tls/CHANGES.md b/actix-tls/CHANGES.md index 28dc612a..ae52fd0f 100644 --- a/actix-tls/CHANGES.md +++ b/actix-tls/CHANGES.md @@ -3,14 +3,35 @@ ## Unreleased - 2021-xx-xx +## 3.0.0-beta.8 - 2021-11-15 +* Add `Connect::request` for getting a reference to the connection request. [#415] + +[#415]: https://github.com/actix/actix-net/pull/415 + + +## 3.0.0-beta.7 - 2021-10-20 +* Add `webpki_roots_cert_store()` to get rustls compatible webpki roots cert store. [#401] +* Alias `connect::ssl` to `connect::tls`. [#401] + +[#401]: https://github.com/actix/actix-net/pull/401 + + +## 3.0.0-beta.6 - 2021-10-19 +* Update `tokio-rustls` to `0.23` which uses `rustls` `0.20`. [#396] +* Removed a re-export of `Session` from `rustls` as it no longer exist. [#396] +* Minimum supported Rust version (MSRV) is now 1.52. + +[#396]: https://github.com/actix/actix-net/pull/396 + + ## 3.0.0-beta.5 - 2021-03-29 -* Changed `connect::ssl::rustls::RustlsConnectorService` to return error when `DNSNameRef` +* Changed `connect::ssl::rustls::RustlsConnectorService` to return error when `DNSNameRef` generation failed instead of panic. [#296] * Remove `connect::ssl::openssl::OpensslConnectServiceFactory`. [#297] * Remove `connect::ssl::openssl::OpensslConnectService`. [#297] * Add `connect::ssl::native_tls` module for native tls support. [#295] * Rename `accept::{nativetls => native_tls}`. [#295] -* Remove `connect::TcpConnectService` type. service caller expect a `TcpStream` should use +* Remove `connect::TcpConnectService` type. service caller expect a `TcpStream` should use `connect::ConnectService` instead and call `Connection::into_parts`. [#299] [#295]: https://github.com/actix/actix-net/pull/295 diff --git a/actix-tls/Cargo.toml b/actix-tls/Cargo.toml index df97b83b..626a2674 100755 --- a/actix-tls/Cargo.toml +++ b/actix-tls/Cargo.toml @@ -1,12 +1,10 @@ [package] name = "actix-tls" -version = "3.0.0-beta.5" +version = "3.0.0-beta.8" authors = ["Nikolay Kim "] description = "TLS acceptor and connector services for Actix ecosystem" keywords = ["network", "tls", "ssl", "async", "transport"] -homepage = "https://actix.rs" repository = "https://github.com/actix/actix-net.git" -documentation = "https://docs.rs/actix-tls" categories = ["network-programming", "asynchronous"] license = "MIT OR Apache-2.0" edition = "2018" @@ -40,7 +38,7 @@ native-tls = ["tokio-native-tls"] uri = ["http"] [dependencies] -actix-codec = "0.4.0-beta.1" +actix-codec = "0.4.0" actix-rt = { version = "2.2.0", default-features = false } actix-service = "2.0.0" actix-utils = "3.0.0" @@ -57,19 +55,20 @@ tls-openssl = { package = "openssl", version = "0.10.9", optional = true } tokio-openssl = { version = "0.6", optional = true } # rustls -tokio-rustls = { version = "0.22", optional = true } -webpki-roots = { version = "0.21", optional = true } +tokio-rustls = { version = "0.23", optional = true } +webpki-roots = { version = "0.22", optional = true } # native-tls tokio-native-tls = { version = "0.3", optional = true } [dev-dependencies] actix-rt = "2.2.0" -actix-server = "2.0.0-beta.6" +actix-server = "2.0.0-beta.9" bytes = "1" -env_logger = "0.8" +env_logger = "0.9" futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } log = "0.4" +rustls-pemfile = "0.2.1" trust-dns-resolver = "0.20.0" [[example]] diff --git a/actix-tls/examples/tcp-rustls.rs b/actix-tls/examples/tcp-rustls.rs index 687c1f86..f347e164 100644 --- a/actix-tls/examples/tcp-rustls.rs +++ b/actix-tls/examples/tcp-rustls.rs @@ -35,25 +35,29 @@ use actix_service::ServiceFactoryExt as _; use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream}; use futures_util::future::ok; use log::info; -use rustls::{ - internal::pemfile::certs, internal::pemfile::rsa_private_keys, NoClientAuth, ServerConfig, -}; +use rustls::{server::ServerConfig, Certificate, PrivateKey}; +use rustls_pemfile::{certs, rsa_private_keys}; #[actix_rt::main] async fn main() -> io::Result<()> { env::set_var("RUST_LOG", "info"); env_logger::init(); - let mut tls_config = ServerConfig::new(NoClientAuth::new()); - // Load TLS key and cert files let cert_file = &mut BufReader::new(File::open("./examples/cert.pem").unwrap()); let key_file = &mut BufReader::new(File::open("./examples/key.pem").unwrap()); - let cert_chain = certs(cert_file).unwrap(); + let cert_chain = certs(cert_file) + .unwrap() + .into_iter() + .map(Certificate) + .collect(); let mut keys = rsa_private_keys(key_file).unwrap(); - tls_config - .set_single_cert(cert_chain, keys.remove(0)) + + let tls_config = ServerConfig::builder() + .with_safe_defaults() + .with_no_client_auth() + .with_single_cert(cert_chain, PrivateKey(keys.remove(0))) .unwrap(); let tls_acceptor = RustlsAcceptor::new(tls_config); diff --git a/actix-tls/src/accept/rustls.rs b/actix-tls/src/accept/rustls.rs index a5a0b461..047791d3 100644 --- a/actix-tls/src/accept/rustls.rs +++ b/actix-tls/src/accept/rustls.rs @@ -19,7 +19,7 @@ use futures_core::future::LocalBoxFuture; use pin_project_lite::pin_project; use tokio_rustls::{Accept, TlsAcceptor}; -pub use tokio_rustls::rustls::{ServerConfig, Session}; +pub use tokio_rustls::rustls::ServerConfig; use super::MAX_CONN_COUNTER; diff --git a/actix-tls/src/connect/connect.rs b/actix-tls/src/connect/connect.rs index 730486cf..65d9e05e 100755 --- a/actix-tls/src/connect/connect.rs +++ b/actix-tls/src/connect/connect.rs @@ -63,16 +63,16 @@ impl From> for ConnectAddrs { /// Connection info. #[derive(Debug, PartialEq, Eq, Hash)] -pub struct Connect { - pub(crate) req: T, +pub struct Connect { + pub(crate) req: R, pub(crate) port: u16, pub(crate) addr: ConnectAddrs, pub(crate) local_addr: Option, } -impl Connect { +impl Connect { /// Create `Connect` instance by splitting the string by ':' and convert the second part to u16 - pub fn new(req: T) -> Connect { + pub fn new(req: R) -> Connect { let (_, port) = parse_host(req.hostname()); Connect { @@ -85,7 +85,7 @@ impl Connect { /// Create new `Connect` instance from host and address. Connector skips name resolution stage /// for such connect messages. - pub fn with_addr(req: T, addr: SocketAddr) -> Connect { + pub fn with_addr(req: R, addr: SocketAddr) -> Connect { Connect { req, port: 0, @@ -155,15 +155,20 @@ impl Connect { ConnectAddrs::Multi(addrs) => ConnectAddrsIter::MultiOwned(addrs.into_iter()), } } + + /// Returns a reference to the connection request. + pub fn request(&self) -> &R { + &self.req + } } -impl From for Connect { - fn from(addr: T) -> Self { +impl From for Connect { + fn from(addr: R) -> Self { Connect::new(addr) } } -impl fmt::Display for Connect { +impl fmt::Display for Connect { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}:{}", self.hostname(), self.port()) } @@ -347,4 +352,10 @@ mod tests { IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)) ) } + + #[test] + fn request_ref() { + let conn = Connect::new("hello"); + assert_eq!(conn.request(), &"hello") + } } diff --git a/actix-tls/src/connect/mod.rs b/actix-tls/src/connect/mod.rs index ad4f40a3..60bb3344 100644 --- a/actix-tls/src/connect/mod.rs +++ b/actix-tls/src/connect/mod.rs @@ -21,7 +21,9 @@ mod connector; mod error; mod resolve; mod service; -pub mod ssl; +pub mod tls; +#[doc(hidden)] +pub use tls as ssl; #[cfg(feature = "uri")] mod uri; diff --git a/actix-tls/src/connect/ssl/mod.rs b/actix-tls/src/connect/tls/mod.rs similarity index 89% rename from actix-tls/src/connect/ssl/mod.rs rename to actix-tls/src/connect/tls/mod.rs index 6e0e8aac..7f48d06c 100644 --- a/actix-tls/src/connect/ssl/mod.rs +++ b/actix-tls/src/connect/tls/mod.rs @@ -1,4 +1,4 @@ -//! SSL Services +//! TLS Services #[cfg(feature = "openssl")] pub mod openssl; diff --git a/actix-tls/src/connect/ssl/native_tls.rs b/actix-tls/src/connect/tls/native_tls.rs similarity index 100% rename from actix-tls/src/connect/ssl/native_tls.rs rename to actix-tls/src/connect/tls/native_tls.rs diff --git a/actix-tls/src/connect/ssl/openssl.rs b/actix-tls/src/connect/tls/openssl.rs similarity index 100% rename from actix-tls/src/connect/ssl/openssl.rs rename to actix-tls/src/connect/tls/openssl.rs diff --git a/actix-tls/src/connect/ssl/rustls.rs b/actix-tls/src/connect/tls/rustls.rs similarity index 84% rename from actix-tls/src/connect/ssl/rustls.rs rename to actix-tls/src/connect/tls/rustls.rs index ee8ad02d..5abc7673 100755 --- a/actix-tls/src/connect/ssl/rustls.rs +++ b/actix-tls/src/connect/tls/rustls.rs @@ -1,4 +1,5 @@ use std::{ + convert::TryFrom, future::Future, io, pin::Pin, @@ -6,7 +7,6 @@ use std::{ task::{Context, Poll}, }; -pub use tokio_rustls::rustls::Session; pub use tokio_rustls::{client::TlsStream, rustls::ClientConfig}; pub use webpki_roots::TLS_SERVER_ROOTS; @@ -14,11 +14,26 @@ use actix_rt::net::ActixStream; use actix_service::{Service, ServiceFactory}; use futures_core::{future::LocalBoxFuture, ready}; use log::trace; -use tokio_rustls::webpki::DNSNameRef; +use tokio_rustls::rustls::{client::ServerName, OwnedTrustAnchor, RootCertStore}; use tokio_rustls::{Connect, TlsConnector}; use crate::connect::{Address, Connection}; +/// Returns standard root certificates from `webpki-roots` crate as a rustls certificate store. +pub fn webpki_roots_cert_store() -> RootCertStore { + let mut root_certs = RootCertStore::empty(); + for cert in TLS_SERVER_ROOTS.0 { + let cert = OwnedTrustAnchor::from_subject_spki_name_constraints( + cert.subject, + cert.spki, + cert.name_constraints, + ); + let certs = vec![cert].into_iter(); + root_certs.add_server_trust_anchors(certs); + } + root_certs +} + /// Rustls connector factory pub struct RustlsConnector { connector: Arc, @@ -89,7 +104,7 @@ where trace!("SSL Handshake start for: {:?}", connection.host()); let (stream, connection) = connection.replace_io(()); - match DNSNameRef::try_from_ascii_str(connection.host()) { + match ServerName::try_from(connection.host()) { Ok(host) => RustlsConnectorServiceFuture::Future { connect: TlsConnector::from(self.connector.clone()).connect(host, stream), connection: Some(connection), diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index a94706a2..ed858378 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -23,3 +23,4 @@ local-waker = "0.1" [dev-dependencies] actix-rt = "2.0.0" futures-util = { version = "0.3.7", default-features = false } +static_assertions = "1.1" diff --git a/actix-utils/src/future/ready.rs b/actix-utils/src/future/ready.rs index 4a01ada3..678d6304 100644 --- a/actix-utils/src/future/ready.rs +++ b/actix-utils/src/future/ready.rs @@ -103,10 +103,16 @@ pub fn err(err: E) -> Ready> { #[cfg(test)] mod tests { + use std::rc::Rc; + use futures_util::task::noop_waker; + use static_assertions::{assert_impl_all, assert_not_impl_all}; use super::*; + assert_impl_all!(Ready<()>: Send, Sync, Clone); + assert_not_impl_all!(Ready>: Send, Sync); + #[test] #[should_panic] fn multiple_poll_panics() { diff --git a/clippy.toml b/clippy.toml new file mode 100644 index 00000000..f691ea3d --- /dev/null +++ b/clippy.toml @@ -0,0 +1 @@ +msrv = "1.48"