diff --git a/.cargo/config.toml b/.cargo/config.toml index 77788410..40fe3e57 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,3 +1,3 @@ [alias] +chk = "hack check --workspace --all-features --tests --examples" lint = "hack --clean-per-run clippy --workspace --tests --examples" -chk = "hack check --workspace --tests --examples" diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index aaa38911..60d7ce37 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -1,7 +1,10 @@ # Changes ## Unreleased - 2021-xx-xx -* Prevent panic when shutdown_timeout is very large. [f9262db] + + +## 2.0.0-beta.4 - 2021-04-01 +* Prevent panic when `shutdown_timeout` is very large. [f9262db] [f9262db]: https://github.com/actix/actix-net/commit/f9262db diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 2d54124b..de9f9223 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-server" -version = "2.0.0-beta.3" +version = "2.0.0-beta.4" authors = [ "Nikolay Kim ", "fakeshadow <24548779@qq.com>", @@ -9,7 +9,6 @@ description = "General purpose TCP server built for the Actix ecosystem" keywords = ["network", "framework", "async", "futures"] homepage = "https://actix.rs" repository = "https://github.com/actix/actix-net.git" -documentation = "https://docs.rs/actix-server" categories = ["network-programming", "asynchronous"] license = "MIT OR Apache-2.0" edition = "2018" diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 6d76ef67..f3fcb5a8 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -142,9 +142,7 @@ impl Accept { self.process_timeout(&mut sockets); continue; } - _ => { - panic!("Poll error: {}", e); - } + _ => panic!("Poll error: {}", e), } } @@ -160,13 +158,12 @@ impl Accept { let mut guard = self.waker_queue.guard(); match guard.pop_front() { // worker notify it becomes available. we may want to recover - // from backpressure. + // from backpressure. Some(WakerInterest::WorkerAvailable) => { drop(guard); self.maybe_backpressure(&mut sockets, false); } - // a new worker thread is made and it's handle would be added - // to Accept + // a new worker thread is made and it's handle would be added to Accept Some(WakerInterest::Worker(handle)) => { drop(guard); // maybe we want to recover from a backpressure. @@ -207,23 +204,29 @@ impl Accept { } fn process_timeout(&mut self, sockets: &mut Slab) { - // take old timeout as it's no use after each iteration. + // Take old timeout as it's no use after each iteration. if self.timeout.take().is_some() { let now = Instant::now(); - sockets.iter_mut().for_each(|(token, info)| { - // only the ServerSocketInfo have an associate timeout value was de registered. - if let Some(inst) = info.timeout_deadline { - // timeout expired register socket again. - if now >= inst { - info.timeout_deadline = None; - self.register_logged(token, info); - } else { + sockets + .iter_mut() + // Only sockets that had an associated timeout were deregistered. + .filter(|(_, info)| info.timeout_deadline.is_some()) + .for_each(|(token, info)| { + let inst = info.timeout_deadline.take().unwrap(); + + if now < inst { // still timed out. try set new timeout. let dur = inst - now; self.set_timeout(dur); + } else if !self.backpressure { + // timeout expired register socket again. + self.register_logged(token, info); } - } - }); + + // Drop the timeout if server is in backpressure and socket timeout is expired. + // When server recovers from backpressure it will register all sockets without + // a timeout value so this socket register will be delayed till then. + }); } } @@ -290,89 +293,92 @@ impl Accept { } fn maybe_backpressure(&mut self, sockets: &mut Slab, on: bool) { - if self.backpressure { - if !on { + // Only operate when server is in a different backpressure than the given flag. + if self.backpressure != on { + if on { + self.backpressure = true; + // TODO: figure out if timing out sockets can be safely de-registered twice. + self.deregister_all(sockets); + } else { self.backpressure = false; - for (token, info) in sockets.iter_mut() { - if info.timeout_deadline.is_some() { - // socket will attempt to re-register itself when its timeout completes - continue; - } - self.register_logged(token, info); - } + sockets + .iter_mut() + // Only operate on sockets without associated timeout. + // Sockets with it will attempt to re-register when their timeout expires. + .filter(|(_, info)| info.timeout_deadline.is_none()) + .for_each(|(token, info)| self.register_logged(token, info)); } - } else if on { - self.backpressure = true; - self.deregister_all(sockets); } } - fn accept_one(&mut self, sockets: &mut Slab, mut msg: Conn) { + fn accept_one(&mut self, sockets: &mut Slab, mut conn: Conn) { if self.backpressure { + // send_connection would remove fault worker from handles. + // worst case here is conn get dropped after all handles are gone. while !self.handles.is_empty() { - match self.handles[self.next].send(msg) { - Ok(_) => { - self.set_next(); - break; - } - Err(tmp) => { - // worker lost contact and could be gone. a message is sent to - // `ServerBuilder` future to notify it a new worker should be made - // after that remove the fault worker - self.srv.worker_faulted(self.handles[self.next].idx); - msg = tmp; - self.handles.swap_remove(self.next); - if self.handles.is_empty() { - error!("No workers"); - return; - } else if self.handles.len() <= self.next { - self.next = 0; - } - continue; - } + match self.send_connection(sockets, conn) { + Ok(_) => return, + Err(c) => conn = c, } } } else { + // Do one round and try to send conn to all workers until it succeed. + // Start from self.next. let mut idx = 0; while idx < self.handles.len() { idx += 1; if self.handles[self.next].available() { - match self.handles[self.next].send(msg) { - Ok(_) => { - self.set_next(); - return; - } - // worker lost contact and could be gone. a message is sent to - // `ServerBuilder` future to notify it a new worker should be made. - // after that remove the fault worker and enter backpressure if necessary. - Err(tmp) => { - self.srv.worker_faulted(self.handles[self.next].idx); - msg = tmp; - self.handles.swap_remove(self.next); - if self.handles.is_empty() { - error!("No workers"); - self.maybe_backpressure(sockets, true); - return; - } else if self.handles.len() <= self.next { - self.next = 0; - } - continue; - } + match self.send_connection(sockets, conn) { + Ok(_) => return, + Err(c) => conn = c, } + } else { + self.set_next(); } - self.set_next(); } - // enable backpressure + // Sending Conn failed due to either all workers are in error or not available. + // Enter backpressure state and try again. self.maybe_backpressure(sockets, true); - self.accept_one(sockets, msg); + self.accept_one(sockets, conn); } } - // set next worker handle that would accept work. + // Set next worker handle that would accept work. fn set_next(&mut self) { self.next = (self.next + 1) % self.handles.len(); } + // Send connection to worker and handle error. + fn send_connection( + &mut self, + sockets: &mut Slab, + conn: Conn, + ) -> Result<(), Conn> { + match self.handles[self.next].send(conn) { + Ok(_) => { + self.set_next(); + Ok(()) + } + Err(conn) => { + // worker lost contact and could be gone. a message is sent to + // `ServerBuilder` future to notify it a new worker should be made. + // after that remove the fault worker and enter backpressure if necessary. + self.srv.worker_faulted(self.handles[self.next].idx); + self.handles.swap_remove(self.next); + if self.handles.is_empty() { + error!("No workers"); + self.maybe_backpressure(sockets, true); + // All workers are gone and Conn is nowhere to be sent. + // Treat this situation as Ok and drop Conn. + return Ok(()); + } else if self.handles.len() <= self.next { + self.next = 0; + } + Err(conn) + } + } + } + fn accept(&mut self, sockets: &mut Slab, token: usize) { loop { let info = sockets @@ -380,11 +386,10 @@ impl Accept { .expect("ServerSocketInfo is removed from Slab"); match info.lst.accept() { - Ok((io, addr)) => { + Ok(io) => { let msg = Conn { io, token: info.token, - peer: Some(addr), }; self.accept_one(sockets, msg); } diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index baf02cbe..0625cfda 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -40,15 +40,11 @@ impl MioListener { } } - pub(crate) fn accept(&self) -> io::Result<(MioStream, SocketAddr)> { + pub(crate) fn accept(&self) -> io::Result { match *self { - MioListener::Tcp(ref lst) => lst - .accept() - .map(|(stream, addr)| (MioStream::Tcp(stream), SocketAddr::Tcp(addr))), + MioListener::Tcp(ref lst) => lst.accept().map(|(stream, _)| MioStream::Tcp(stream)), #[cfg(unix)] - MioListener::Uds(ref lst) => lst - .accept() - .map(|(stream, addr)| (MioStream::Uds(stream), SocketAddr::Uds(addr))), + MioListener::Uds(ref lst) => lst.accept().map(|(stream, _)| MioStream::Uds(stream)), } } } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 899e5fde..7e913446 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -16,7 +16,7 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot; use crate::service::{BoxedServerService, InternalServiceFactory}; -use crate::socket::{MioStream, SocketAddr}; +use crate::socket::MioStream; use crate::waker_queue::{WakerInterest, WakerQueue}; use crate::{join_all, Token}; @@ -33,7 +33,6 @@ pub(crate) struct StopCommand { pub(crate) struct Conn { pub io: MioStream, pub token: Token, - pub peer: Option, } static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600); diff --git a/actix-utils/CHANGES.md b/actix-utils/CHANGES.md index c911a211..57ab7add 100644 --- a/actix-utils/CHANGES.md +++ b/actix-utils/CHANGES.md @@ -1,6 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx + + +## 3.0.0-beta.3 - 2021-04-01 * Moved `mpsc` to own crate `local-channel`. [#301] * Moved `task::LocalWaker` to own crate `local-waker`. [#301] * Remove `timeout` module. [#301] diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index 02bc3114..019d6d98 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -1,12 +1,13 @@ [package] name = "actix-utils" -version = "3.0.0-beta.2" -authors = ["Nikolay Kim "] -description = "Various network related services and utilities for the Actix ecosystem" +version = "3.0.0-beta.3" +authors = [ + "Nikolay Kim ", + "Rob Ede ", +] +description = "Utilities for the Actix ecosystem" keywords = ["network", "framework", "async", "futures"] -homepage = "https://actix.rs" repository = "https://github.com/actix/actix-net.git" -documentation = "https://docs.rs/actix-utils" categories = ["network-programming", "asynchronous"] license = "MIT OR Apache-2.0" edition = "2018" diff --git a/actix-utils/src/counter.rs b/actix-utils/src/counter.rs index c0926b73..7a87fa3d 100644 --- a/actix-utils/src/counter.rs +++ b/actix-utils/src/counter.rs @@ -1,24 +1,18 @@ //! Task-notifying counter. -use core::{cell::Cell, task}; +use core::{cell::Cell, fmt, task}; use std::rc::Rc; use local_waker::LocalWaker; -#[derive(Clone)] /// Simple counter with ability to notify task on reaching specific number /// /// Counter could be cloned, total n-count is shared across all clones. +#[derive(Debug, Clone)] pub struct Counter(Rc); -struct CounterInner { - count: Cell, - capacity: usize, - task: LocalWaker, -} - impl Counter { - /// Create `Counter` instance and set max value. + /// Create `Counter` instance with max value. pub fn new(capacity: usize) -> Self { Counter(Rc::new(CounterInner { capacity, @@ -27,38 +21,26 @@ impl Counter { })) } - /// Get counter guard. + /// Create new counter guard, incrementing the counter. pub fn get(&self) -> CounterGuard { CounterGuard::new(self.0.clone()) } - /// Check if counter is not at capacity. If counter at capacity - /// it registers notification for current task. + /// Notify current task and return true if counter is at capacity. pub fn available(&self, cx: &mut task::Context<'_>) -> bool { self.0.available(cx) } - /// Get total number of acquired counts + /// Get total number of acquired guards. pub fn total(&self) -> usize { self.0.count.get() } } -pub struct CounterGuard(Rc); - -impl CounterGuard { - fn new(inner: Rc) -> Self { - inner.inc(); - CounterGuard(inner) - } -} - -impl Unpin for CounterGuard {} - -impl Drop for CounterGuard { - fn drop(&mut self) { - self.0.dec(); - } +struct CounterInner { + count: Cell, + capacity: usize, + task: LocalWaker, } impl CounterInner { @@ -83,3 +65,32 @@ impl CounterInner { } } } + +impl fmt::Debug for CounterInner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Counter") + .field("count", &self.count.get()) + .field("capacity", &self.capacity) + .field("task", &self.task) + .finish() + } +} + +/// An RAII structure that keeps the underlying counter incremented until this guard is dropped. +#[derive(Debug)] +pub struct CounterGuard(Rc); + +impl CounterGuard { + fn new(inner: Rc) -> Self { + inner.inc(); + CounterGuard(inner) + } +} + +impl Unpin for CounterGuard {} + +impl Drop for CounterGuard { + fn drop(&mut self) { + self.0.dec(); + } +} diff --git a/actix-utils/src/lib.rs b/actix-utils/src/lib.rs index d0e057ff..f94147ec 100644 --- a/actix-utils/src/lib.rs +++ b/actix-utils/src/lib.rs @@ -1,7 +1,7 @@ -//! Various network related services and utilities for the Actix ecosystem. +//! Various utilities for the Actix ecosystem. #![deny(rust_2018_idioms, nonstandard_style)] -#![allow(clippy::type_complexity)] +#![warn(missing_docs)] #![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] diff --git a/local-channel/Cargo.toml b/local-channel/Cargo.toml index a9d3691e..0ffd3597 100644 --- a/local-channel/Cargo.toml +++ b/local-channel/Cargo.toml @@ -1,16 +1,15 @@ [package] name = "local-channel" -version = "0.1.1" +version = "0.1.2" description = "A non-threadsafe multi-producer, single-consumer, futures-aware, FIFO queue" authors = [ "Nikolay Kim ", "Rob Ede ", ] -edition = "2018" -license = "MIT OR Apache-2.0" repository = "https://github.com/actix/actix-net.git" -documentation = "https://docs.rs/actix-server" keywords = ["channel", "local", "futures"] +license = "MIT OR Apache-2.0" +edition = "2018" [dependencies] futures-core = { version = "0.3.7", default-features = false } diff --git a/local-waker/CHANGES.md b/local-waker/CHANGES.md index edb5aa3e..5caf69ca 100644 --- a/local-waker/CHANGES.md +++ b/local-waker/CHANGES.md @@ -3,5 +3,9 @@ ## Unreleased - 2021-xx-xx +## 0.1.2 - 2021-04-01 +* Fix crate metadata. + + ## 0.1.1 - 2021-03-29 * Move `LocalWaker` to it's own crate. diff --git a/local-waker/Cargo.toml b/local-waker/Cargo.toml index df1f9ab8..af512966 100644 --- a/local-waker/Cargo.toml +++ b/local-waker/Cargo.toml @@ -8,7 +8,6 @@ authors = [ ] keywords = ["waker", "local", "futures", "no-std"] repository = "https://github.com/actix/actix-net.git" -documentation = "https://docs.rs/local-waker" categories = ["asynchronous", "no-std"] license = "MIT OR Apache-2.0" edition = "2018"