From f21eaa954fe756f22ae0b9483281f27a5aacf179 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Wed, 31 Mar 2021 22:55:33 -0700 Subject: [PATCH 1/6] Reduce size of Conn by removing unused addr field (#304) --- actix-server/src/accept.rs | 3 +-- actix-server/src/socket.rs | 10 +++------- actix-server/src/worker.rs | 3 +-- 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index f5484434..2b9c7206 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -391,11 +391,10 @@ impl Accept { .expect("ServerSocketInfo is removed from Slab"); match info.lst.accept() { - Ok((io, addr)) => { + Ok(io) => { let msg = Conn { io, token: info.token, - peer: Some(addr), }; self.accept_one(sockets, msg); } diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index baf02cbe..0625cfda 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -40,15 +40,11 @@ impl MioListener { } } - pub(crate) fn accept(&self) -> io::Result<(MioStream, SocketAddr)> { + pub(crate) fn accept(&self) -> io::Result { match *self { - MioListener::Tcp(ref lst) => lst - .accept() - .map(|(stream, addr)| (MioStream::Tcp(stream), SocketAddr::Tcp(addr))), + MioListener::Tcp(ref lst) => lst.accept().map(|(stream, _)| MioStream::Tcp(stream)), #[cfg(unix)] - MioListener::Uds(ref lst) => lst - .accept() - .map(|(stream, addr)| (MioStream::Uds(stream), SocketAddr::Uds(addr))), + MioListener::Uds(ref lst) => lst.accept().map(|(stream, _)| MioStream::Uds(stream)), } } } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index aa6d31fc..63c45757 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -14,7 +14,7 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot; use crate::service::{BoxedServerService, InternalServiceFactory}; -use crate::socket::{MioStream, SocketAddr}; +use crate::socket::MioStream; use crate::waker_queue::{WakerInterest, WakerQueue}; use crate::{join_all, Token}; @@ -31,7 +31,6 @@ pub(crate) struct StopCommand { pub(crate) struct Conn { pub io: MioStream, pub token: Token, - pub peer: Option, } static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600); From ee3a548a8568e939021cc8c73e77638678d8c4ef Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Wed, 31 Mar 2021 23:45:49 -0700 Subject: [PATCH 2/6] Refactor Accept::accept_one (#303) --- actix-server/src/accept.rs | 94 +++++++++++++++++++------------------- 1 file changed, 48 insertions(+), 46 deletions(-) diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 2b9c7206..34bb029d 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -318,72 +318,74 @@ impl Accept { } } - fn accept_one(&mut self, sockets: &mut Slab, mut msg: Conn) { + fn accept_one(&mut self, sockets: &mut Slab, mut conn: Conn) { if self.backpressure { + // send_connection would remove fault worker from handles. + // worst case here is conn get dropped after all handles are gone. while !self.handles.is_empty() { - match self.handles[self.next].send(msg) { - Ok(_) => { - self.set_next(); - break; - } - Err(tmp) => { - // worker lost contact and could be gone. a message is sent to - // `ServerBuilder` future to notify it a new worker should be made - // after that remove the fault worker - self.srv.worker_faulted(self.handles[self.next].idx); - msg = tmp; - self.handles.swap_remove(self.next); - if self.handles.is_empty() { - error!("No workers"); - return; - } else if self.handles.len() <= self.next { - self.next = 0; - } - continue; - } + match self.send_connection(sockets, conn) { + Ok(_) => return, + Err(c) => conn = c, } } } else { + // Do one round and try to send conn to all workers until it succeed. + // Start from self.next. let mut idx = 0; while idx < self.handles.len() { idx += 1; if self.handles[self.next].available() { - match self.handles[self.next].send(msg) { - Ok(_) => { - self.set_next(); - return; - } - // worker lost contact and could be gone. a message is sent to - // `ServerBuilder` future to notify it a new worker should be made. - // after that remove the fault worker and enter backpressure if necessary. - Err(tmp) => { - self.srv.worker_faulted(self.handles[self.next].idx); - msg = tmp; - self.handles.swap_remove(self.next); - if self.handles.is_empty() { - error!("No workers"); - self.maybe_backpressure(sockets, true); - return; - } else if self.handles.len() <= self.next { - self.next = 0; - } - continue; - } + match self.send_connection(sockets, conn) { + Ok(_) => return, + Err(c) => conn = c, } + } else { + self.set_next(); } - self.set_next(); } - // enable backpressure + // Sending Conn failed due to either all workers are in error or not available. + // Enter backpressure state and try again. self.maybe_backpressure(sockets, true); - self.accept_one(sockets, msg); + self.accept_one(sockets, conn); } } - // set next worker handle that would accept work. + // Set next worker handle that would accept work. fn set_next(&mut self) { self.next = (self.next + 1) % self.handles.len(); } + // Send connection to worker and handle error. + fn send_connection( + &mut self, + sockets: &mut Slab, + conn: Conn, + ) -> Result<(), Conn> { + match self.handles[self.next].send(conn) { + Ok(_) => { + self.set_next(); + Ok(()) + } + Err(conn) => { + // worker lost contact and could be gone. a message is sent to + // `ServerBuilder` future to notify it a new worker should be made. + // after that remove the fault worker and enter backpressure if necessary. + self.srv.worker_faulted(self.handles[self.next].idx); + self.handles.swap_remove(self.next); + if self.handles.is_empty() { + error!("No workers"); + self.maybe_backpressure(sockets, true); + // All workers are gone and Conn is nowhere to be sent. + // Treat this situation as Ok and drop Conn. + return Ok(()); + } else if self.handles.len() <= self.next { + self.next = 0; + } + Err(conn) + } + } + } + fn accept(&mut self, sockets: &mut Slab, token: usize) { loop { let info = sockets From 2c5c9167a58ee81bcdfb4e7ad91c555c2bffce37 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Thu, 1 Apr 2021 00:25:24 -0700 Subject: [PATCH 3/6] =?UTF-8?q?Fix=20bug=20where=20timed=20out=20socket=20?= =?UTF-8?q?would=20register=20itself=20when=20server=20in=20b=E2=80=A6=20(?= =?UTF-8?q?#302)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Rob Ede --- actix-server/src/accept.rs | 56 +++++++++++++++++++++----------------- 1 file changed, 31 insertions(+), 25 deletions(-) diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 34bb029d..2750d1c5 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -187,21 +187,19 @@ impl Accept { let mut guard = self.waker.guard(); match guard.pop_front() { // worker notify it becomes available. we may want to recover - // from backpressure. + // from backpressure. Some(WakerInterest::WorkerAvailable) => { drop(guard); self.maybe_backpressure(&mut sockets, false); } - // a new worker thread is made and it's handle would be added - // to Accept + // a new worker thread is made and it's handle would be added to Accept Some(WakerInterest::Worker(handle)) => { drop(guard); // maybe we want to recover from a backpressure. self.maybe_backpressure(&mut sockets, false); self.handles.push(handle); } - // got timer interest and it's time to try register socket(s) - // again. + // got timer interest and it's time to try register socket(s) again Some(WakerInterest::Timer) => { drop(guard); self.process_timer(&mut sockets) @@ -238,16 +236,23 @@ impl Accept { fn process_timer(&self, sockets: &mut Slab) { let now = Instant::now(); - sockets.iter_mut().for_each(|(token, info)| { - // only the ServerSocketInfo have an associate timeout value was de registered. - if let Some(inst) = info.timeout.take() { - if now > inst { - self.register_logged(token, info); - } else { + sockets + .iter_mut() + // Only sockets that had an associated timeout were deregistered. + .filter(|(_, info)| info.timeout.is_some()) + .for_each(|(token, info)| { + let inst = info.timeout.take().unwrap(); + + if now < inst { info.timeout = Some(inst); + } else if !self.backpressure { + self.register_logged(token, info); } - } - }); + + // Drop the timeout if server is in backpressure and socket timeout is expired. + // When server recovers from backpressure it will register all sockets without + // a timeout value so this socket register will be delayed till then. + }); } #[cfg(not(target_os = "windows"))] @@ -301,20 +306,21 @@ impl Accept { } fn maybe_backpressure(&mut self, sockets: &mut Slab, on: bool) { - if self.backpressure { - if !on { + // Only operate when server is in a different backpressure than the given flag. + if self.backpressure != on { + if on { + self.backpressure = true; + // TODO: figure out if timing out sockets can be safely de-registered twice. + self.deregister_all(sockets); + } else { self.backpressure = false; - for (token, info) in sockets.iter_mut() { - if info.timeout.is_some() { - // socket will attempt to re-register itself when its timeout completes - continue; - } - self.register_logged(token, info); - } + sockets + .iter_mut() + // Only operate on sockets without associated timeout. + // Sockets with it will attempt to re-register when their timeout expires. + .filter(|(_, info)| info.timeout.is_none()) + .for_each(|(token, info)| self.register_logged(token, info)); } - } else if on { - self.backpressure = true; - self.deregister_all(sockets); } } From b09e7cd417b3b453fd2fb31ebe26cfe9c2191e75 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Thu, 1 Apr 2021 09:01:56 +0100 Subject: [PATCH 4/6] fix local waker metadata --- .cargo/config.toml | 2 +- local-channel/Cargo.toml | 7 +++---- local-waker/CHANGES.md | 4 ++++ local-waker/Cargo.toml | 1 - 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/.cargo/config.toml b/.cargo/config.toml index 77788410..40fe3e57 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,3 +1,3 @@ [alias] +chk = "hack check --workspace --all-features --tests --examples" lint = "hack --clean-per-run clippy --workspace --tests --examples" -chk = "hack check --workspace --tests --examples" diff --git a/local-channel/Cargo.toml b/local-channel/Cargo.toml index a9d3691e..0ffd3597 100644 --- a/local-channel/Cargo.toml +++ b/local-channel/Cargo.toml @@ -1,16 +1,15 @@ [package] name = "local-channel" -version = "0.1.1" +version = "0.1.2" description = "A non-threadsafe multi-producer, single-consumer, futures-aware, FIFO queue" authors = [ "Nikolay Kim ", "Rob Ede ", ] -edition = "2018" -license = "MIT OR Apache-2.0" repository = "https://github.com/actix/actix-net.git" -documentation = "https://docs.rs/actix-server" keywords = ["channel", "local", "futures"] +license = "MIT OR Apache-2.0" +edition = "2018" [dependencies] futures-core = { version = "0.3.7", default-features = false } diff --git a/local-waker/CHANGES.md b/local-waker/CHANGES.md index edb5aa3e..5caf69ca 100644 --- a/local-waker/CHANGES.md +++ b/local-waker/CHANGES.md @@ -3,5 +3,9 @@ ## Unreleased - 2021-xx-xx +## 0.1.2 - 2021-04-01 +* Fix crate metadata. + + ## 0.1.1 - 2021-03-29 * Move `LocalWaker` to it's own crate. diff --git a/local-waker/Cargo.toml b/local-waker/Cargo.toml index df1f9ab8..af512966 100644 --- a/local-waker/Cargo.toml +++ b/local-waker/Cargo.toml @@ -8,7 +8,6 @@ authors = [ ] keywords = ["waker", "local", "futures", "no-std"] repository = "https://github.com/actix/actix-net.git" -documentation = "https://docs.rs/local-waker" categories = ["asynchronous", "no-std"] license = "MIT OR Apache-2.0" edition = "2018" From 4eebdf4070d44709768263db3d10e0f6b3013992 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Thu, 1 Apr 2021 09:31:42 +0100 Subject: [PATCH 5/6] prepare actix-utils release 3.0.0-beta.3 --- actix-utils/CHANGES.md | 3 ++ actix-utils/Cargo.toml | 11 ++++--- actix-utils/src/counter.rs | 67 ++++++++++++++++++++++---------------- actix-utils/src/lib.rs | 4 +-- 4 files changed, 50 insertions(+), 35 deletions(-) diff --git a/actix-utils/CHANGES.md b/actix-utils/CHANGES.md index c911a211..57ab7add 100644 --- a/actix-utils/CHANGES.md +++ b/actix-utils/CHANGES.md @@ -1,6 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx + + +## 3.0.0-beta.3 - 2021-04-01 * Moved `mpsc` to own crate `local-channel`. [#301] * Moved `task::LocalWaker` to own crate `local-waker`. [#301] * Remove `timeout` module. [#301] diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index 02bc3114..019d6d98 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -1,12 +1,13 @@ [package] name = "actix-utils" -version = "3.0.0-beta.2" -authors = ["Nikolay Kim "] -description = "Various network related services and utilities for the Actix ecosystem" +version = "3.0.0-beta.3" +authors = [ + "Nikolay Kim ", + "Rob Ede ", +] +description = "Utilities for the Actix ecosystem" keywords = ["network", "framework", "async", "futures"] -homepage = "https://actix.rs" repository = "https://github.com/actix/actix-net.git" -documentation = "https://docs.rs/actix-utils" categories = ["network-programming", "asynchronous"] license = "MIT OR Apache-2.0" edition = "2018" diff --git a/actix-utils/src/counter.rs b/actix-utils/src/counter.rs index c0926b73..7a87fa3d 100644 --- a/actix-utils/src/counter.rs +++ b/actix-utils/src/counter.rs @@ -1,24 +1,18 @@ //! Task-notifying counter. -use core::{cell::Cell, task}; +use core::{cell::Cell, fmt, task}; use std::rc::Rc; use local_waker::LocalWaker; -#[derive(Clone)] /// Simple counter with ability to notify task on reaching specific number /// /// Counter could be cloned, total n-count is shared across all clones. +#[derive(Debug, Clone)] pub struct Counter(Rc); -struct CounterInner { - count: Cell, - capacity: usize, - task: LocalWaker, -} - impl Counter { - /// Create `Counter` instance and set max value. + /// Create `Counter` instance with max value. pub fn new(capacity: usize) -> Self { Counter(Rc::new(CounterInner { capacity, @@ -27,38 +21,26 @@ impl Counter { })) } - /// Get counter guard. + /// Create new counter guard, incrementing the counter. pub fn get(&self) -> CounterGuard { CounterGuard::new(self.0.clone()) } - /// Check if counter is not at capacity. If counter at capacity - /// it registers notification for current task. + /// Notify current task and return true if counter is at capacity. pub fn available(&self, cx: &mut task::Context<'_>) -> bool { self.0.available(cx) } - /// Get total number of acquired counts + /// Get total number of acquired guards. pub fn total(&self) -> usize { self.0.count.get() } } -pub struct CounterGuard(Rc); - -impl CounterGuard { - fn new(inner: Rc) -> Self { - inner.inc(); - CounterGuard(inner) - } -} - -impl Unpin for CounterGuard {} - -impl Drop for CounterGuard { - fn drop(&mut self) { - self.0.dec(); - } +struct CounterInner { + count: Cell, + capacity: usize, + task: LocalWaker, } impl CounterInner { @@ -83,3 +65,32 @@ impl CounterInner { } } } + +impl fmt::Debug for CounterInner { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Counter") + .field("count", &self.count.get()) + .field("capacity", &self.capacity) + .field("task", &self.task) + .finish() + } +} + +/// An RAII structure that keeps the underlying counter incremented until this guard is dropped. +#[derive(Debug)] +pub struct CounterGuard(Rc); + +impl CounterGuard { + fn new(inner: Rc) -> Self { + inner.inc(); + CounterGuard(inner) + } +} + +impl Unpin for CounterGuard {} + +impl Drop for CounterGuard { + fn drop(&mut self) { + self.0.dec(); + } +} diff --git a/actix-utils/src/lib.rs b/actix-utils/src/lib.rs index d0e057ff..f94147ec 100644 --- a/actix-utils/src/lib.rs +++ b/actix-utils/src/lib.rs @@ -1,7 +1,7 @@ -//! Various network related services and utilities for the Actix ecosystem. +//! Various utilities for the Actix ecosystem. #![deny(rust_2018_idioms, nonstandard_style)] -#![allow(clippy::type_complexity)] +#![warn(missing_docs)] #![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] From b068ea16f89afecf6becf56178098742342e67ab Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Thu, 1 Apr 2021 09:36:07 +0100 Subject: [PATCH 6/6] prepare server release 2.0.0-beta.4 --- actix-server/CHANGES.md | 5 ++++- actix-server/Cargo.toml | 3 +-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index aaa38911..60d7ce37 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -1,7 +1,10 @@ # Changes ## Unreleased - 2021-xx-xx -* Prevent panic when shutdown_timeout is very large. [f9262db] + + +## 2.0.0-beta.4 - 2021-04-01 +* Prevent panic when `shutdown_timeout` is very large. [f9262db] [f9262db]: https://github.com/actix/actix-net/commit/f9262db diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 620cbf51..e557dbd2 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-server" -version = "2.0.0-beta.3" +version = "2.0.0-beta.4" authors = [ "Nikolay Kim ", "fakeshadow <24548779@qq.com>", @@ -9,7 +9,6 @@ description = "General purpose TCP server built for the Actix ecosystem" keywords = ["network", "framework", "async", "futures"] homepage = "https://actix.rs" repository = "https://github.com/actix/actix-net.git" -documentation = "https://docs.rs/actix-server" categories = ["network-programming", "asynchronous"] license = "MIT OR Apache-2.0" edition = "2018"