diff --git a/actix-codec/CHANGES.md b/actix-codec/CHANGES.md index f6102cbf..fd893454 100644 --- a/actix-codec/CHANGES.md +++ b/actix-codec/CHANGES.md @@ -3,6 +3,10 @@ ## Unreleased - 2021-xx-xx +## 0.4.0 - 2021-04-20 +* No significant changes since v0.4.0-beta.1. + + ## 0.4.0-beta.1 - 2020-12-28 * Replace `pin-project` with `pin-project-lite`. [#237] * Upgrade `tokio` dependency to `1`. [#237] @@ -23,28 +27,28 @@ ## 0.3.0-beta.1 - 2020-08-19 * Use `.advance()` instead of `.split_to()`. * Upgrade `tokio-util` to `0.3`. -* Improve `BytesCodec` `.encode()` performance -* Simplify `BytesCodec` `.decode()` +* Improve `BytesCodec::encode()` performance. +* Simplify `BytesCodec::decode()`. * Rename methods on `Framed` to better describe their use. * Add method on `Framed` to get a pinned reference to the underlying I/O. * Add method on `Framed` check emptiness of read buffer. ## 0.2.0 - 2019-12-10 -* Use specific futures dependencies +* Use specific futures dependencies. ## 0.2.0-alpha.4 -* Fix buffer remaining capacity calculation +* Fix buffer remaining capacity calculation. ## 0.2.0-alpha.3 -* Use tokio 0.2 -* Fix low/high watermark for write/read buffers +* Use tokio 0.2. +* Fix low/high watermark for write/read buffers. ## 0.2.0-alpha.2 -* Migrated to `std::future` +* Migrated to `std::future`. ## 0.1.2 - 2019-03-27 @@ -56,4 +60,4 @@ ## 0.1.0 - 2018-12-09 -* Move codec to separate crate +* Move codec to separate crate. diff --git a/actix-codec/Cargo.toml b/actix-codec/Cargo.toml index 95a24764..815f1039 100644 --- a/actix-codec/Cargo.toml +++ b/actix-codec/Cargo.toml @@ -1,12 +1,10 @@ [package] name = "actix-codec" -version = "0.4.0-beta.1" +version = "0.4.0" authors = ["Nikolay Kim "] description = "Codec utilities for working with framed protocols" keywords = ["network", "framework", "async", "futures"] -homepage = "https://actix.rs" -repository = "https://github.com/actix/actix-net.git" -documentation = "https://docs.rs/actix-codec" +repository = "https://github.com/actix/actix-net" categories = ["network-programming", "asynchronous"] license = "MIT OR Apache-2.0" edition = "2018" diff --git a/actix-router/CHANGES.md b/actix-router/CHANGES.md index 4c19aedc..d3f15405 100644 --- a/actix-router/CHANGES.md +++ b/actix-router/CHANGES.md @@ -3,6 +3,21 @@ ## Unreleased - 2021-xx-xx +## 0.4.0 - 2021-06-06 +* When matching path parameters, `%25` is now kept in the percent-encoded form; no longer decoded to `%`. [#357] +* Path tail patterns now match new lines (`\n`) in request URL. [#360] +* Fixed a safety bug where `Path` could return a malformed string after percent decoding. [#359] +* Methods `Path::{add, add_static}` now take `impl Into>`. [#345] + +[#345]: https://github.com/actix/actix-net/pull/345 +[#357]: https://github.com/actix/actix-net/pull/357 +[#359]: https://github.com/actix/actix-net/pull/359 +[#360]: https://github.com/actix/actix-net/pull/360 + +## 0.3.0 - 2019-12-31 +* Version was yanked previously. See https://crates.io/crates/actix-router/0.3.0 + + ## 0.2.7 - 2021-02-06 * Add `Router::recognize_checked` [#247] diff --git a/actix-router/Cargo.toml b/actix-router/Cargo.toml index f55c2c38..5f0c22e2 100644 --- a/actix-router/Cargo.toml +++ b/actix-router/Cargo.toml @@ -1,12 +1,10 @@ [package] name = "actix-router" -version = "0.2.7" +version = "0.4.0" authors = ["Nikolay Kim "] description = "Resource path matching library" keywords = ["actix", "router", "routing"] -homepage = "https://actix.rs" -repository = "https://github.com/actix/actix-net.git" -documentation = "https://docs.rs/actix-router" +repository = "https://github.com/actix/actix-net" license = "MIT OR Apache-2.0" edition = "2018" diff --git a/actix-router/src/path.rs b/actix-router/src/path.rs index 76fd97da..6e4e2fdf 100644 --- a/actix-router/src/path.rs +++ b/actix-router/src/path.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::ops::Index; use serde::de; @@ -5,12 +6,18 @@ use serde::de; use crate::de::PathDeserializer; use crate::{Resource, ResourcePath}; -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] pub(crate) enum PathItem { - Static(&'static str), + Static(Cow<'static, str>), Segment(u16, u16), } +impl Default for PathItem { + fn default() -> Self { + Self::Static(Cow::Borrowed("")) + } +} + /// Resource path match information /// /// If resource path contains variable patterns, `Path` stores them. @@ -18,7 +25,7 @@ pub(crate) enum PathItem { pub struct Path { path: T, pub(crate) skip: u16, - pub(crate) segments: Vec<(&'static str, PathItem)>, + pub(crate) segments: Vec<(Cow<'static, str>, PathItem)>, } impl Default for Path { @@ -95,18 +102,24 @@ impl Path { self.skip += n; } - pub(crate) fn add(&mut self, name: &'static str, value: PathItem) { + pub(crate) fn add(&mut self, name: impl Into>, value: PathItem) { match value { - PathItem::Static(s) => self.segments.push((name, PathItem::Static(s))), - PathItem::Segment(begin, end) => self - .segments - .push((name, PathItem::Segment(self.skip + begin, self.skip + end))), + PathItem::Static(s) => self.segments.push((name.into(), PathItem::Static(s))), + PathItem::Segment(begin, end) => self.segments.push(( + name.into(), + PathItem::Segment(self.skip + begin, self.skip + end), + )), } } #[doc(hidden)] - pub fn add_static(&mut self, name: &'static str, value: &'static str) { - self.segments.push((name, PathItem::Static(value))); + pub fn add_static( + &mut self, + name: impl Into>, + value: impl Into>, + ) { + self.segments + .push((name.into(), PathItem::Static(value.into()))); } /// Check if there are any matched patterns diff --git a/actix-router/src/resource.rs b/actix-router/src/resource.rs index 32162c53..d0caaebf 100644 --- a/actix-router/src/resource.rs +++ b/actix-router/src/resource.rs @@ -1,6 +1,7 @@ use std::cmp::min; use std::collections::HashMap; use std::hash::{Hash, Hasher}; +use std::mem; use regex::{escape, Regex, RegexSet}; @@ -9,6 +10,11 @@ use crate::{IntoPattern, Resource, ResourcePath}; const MAX_DYNAMIC_SEGMENTS: usize = 16; +/// Regex flags to allow '.' in regex to match '\n' +/// +/// See the docs under: https://docs.rs/regex/1.5.4/regex/#grouping-and-flags +const REGEX_FLAGS: &str = "(?s-m)"; + /// ResourceDef describes an entry in resources table /// /// Resource definition can contain only 16 dynamic segments @@ -272,15 +278,12 @@ impl ResourceDef { true } PatternType::Dynamic(ref re, ref names, len) => { - let mut idx = 0; let mut pos = 0; - let mut segments: [PathItem; MAX_DYNAMIC_SEGMENTS] = - [PathItem::Static(""); MAX_DYNAMIC_SEGMENTS]; + let mut segments: [PathItem; MAX_DYNAMIC_SEGMENTS] = Default::default(); if let Some(captures) = re.captures(path.path()) { for (no, name) in names.iter().enumerate() { if let Some(m) = captures.name(&name) { - idx += 1; pos = m.end(); segments[no] = PathItem::Segment(m.start() as u16, m.end() as u16); } else { @@ -294,8 +297,8 @@ impl ResourceDef { } else { return false; } - for idx in 0..idx { - path.add(names[idx], segments[idx]); + for i in 0..names.len() { + path.add(names[i], mem::take(&mut segments[i])); } path.skip((pos + len) as u16); true @@ -303,15 +306,12 @@ impl ResourceDef { PatternType::DynamicSet(ref re, ref params) => { if let Some(idx) = re.matches(path.path()).into_iter().next() { let (ref pattern, ref names, len) = params[idx]; - let mut idx = 0; let mut pos = 0; - let mut segments: [PathItem; MAX_DYNAMIC_SEGMENTS] = - [PathItem::Static(""); MAX_DYNAMIC_SEGMENTS]; + let mut segments: [PathItem; MAX_DYNAMIC_SEGMENTS] = Default::default(); if let Some(captures) = pattern.captures(path.path()) { for (no, name) in names.iter().enumerate() { if let Some(m) = captures.name(&name) { - idx += 1; pos = m.end(); segments[no] = PathItem::Segment(m.start() as u16, m.end() as u16); @@ -326,8 +326,8 @@ impl ResourceDef { } else { return false; } - for idx in 0..idx { - path.add(names[idx], segments[idx]); + for i in 0..names.len() { + path.add(names[i], mem::take(&mut segments[i])); } path.skip((pos + len) as u16); true @@ -385,15 +385,12 @@ impl ResourceDef { true } PatternType::Dynamic(ref re, ref names, len) => { - let mut idx = 0; let mut pos = 0; - let mut segments: [PathItem; MAX_DYNAMIC_SEGMENTS] = - [PathItem::Static(""); MAX_DYNAMIC_SEGMENTS]; + let mut segments: [PathItem; MAX_DYNAMIC_SEGMENTS] = Default::default(); if let Some(captures) = re.captures(res.resource_path().path()) { for (no, name) in names.iter().enumerate() { if let Some(m) = captures.name(&name) { - idx += 1; pos = m.end(); segments[no] = PathItem::Segment(m.start() as u16, m.end() as u16); } else { @@ -413,8 +410,8 @@ impl ResourceDef { } let path = res.resource_path(); - for idx in 0..idx { - path.add(names[idx], segments[idx]); + for i in 0..names.len() { + path.add(names[i], mem::take(&mut segments[i])); } path.skip((pos + len) as u16); true @@ -423,15 +420,12 @@ impl ResourceDef { let path = res.resource_path().path(); if let Some(idx) = re.matches(path).into_iter().next() { let (ref pattern, ref names, len) = params[idx]; - let mut idx = 0; let mut pos = 0; - let mut segments: [PathItem; MAX_DYNAMIC_SEGMENTS] = - [PathItem::Static(""); MAX_DYNAMIC_SEGMENTS]; + let mut segments: [PathItem; MAX_DYNAMIC_SEGMENTS] = Default::default(); if let Some(captures) = pattern.captures(path) { for (no, name) in names.iter().enumerate() { if let Some(m) = captures.name(&name) { - idx += 1; pos = m.end(); segments[no] = PathItem::Segment(m.start() as u16, m.end() as u16); @@ -452,8 +446,8 @@ impl ResourceDef { } let path = res.resource_path(); - for idx in 0..idx { - path.add(names[idx], segments[idx]); + for i in 0..names.len() { + path.add(names[i], mem::take(&mut segments[i])); } path.skip((pos + len) as u16); true @@ -582,7 +576,7 @@ impl ResourceDef { ) -> (String, Vec, bool, usize) { if pattern.find('{').is_none() { return if let Some(path) = pattern.strip_suffix('*') { - let re = String::from("^") + path + "(.*)"; + let re = format!("{}^{}(.*)", REGEX_FLAGS, path); (re, vec![PatternElement::Str(String::from(path))], true, 0) } else { ( @@ -595,7 +589,7 @@ impl ResourceDef { } let mut elements = Vec::new(); - let mut re = String::from("^"); + let mut re = format!("{}^", REGEX_FLAGS); let mut dyn_elements = 0; while let Some(idx) = pattern.find('{') { @@ -828,6 +822,32 @@ mod tests { assert!(re.is_match("/user/2345/sdg")); } + #[test] + fn test_newline() { + let re = ResourceDef::new("/user/a\nb"); + assert!(re.is_match("/user/a\nb")); + assert!(!re.is_match("/user/a\nb/profile")); + + let re = ResourceDef::new("/a{x}b/test/a{y}b"); + let mut path = Path::new("/a\nb/test/a\nb"); + assert!(re.match_path(&mut path)); + assert_eq!(path.get("x").unwrap(), "\n"); + assert_eq!(path.get("y").unwrap(), "\n"); + + let re = ResourceDef::new("/user/*"); + assert!(re.is_match("/user/a\nb/")); + + let re = ResourceDef::new("/user/{id}*"); + let mut path = Path::new("/user/a\nb/a\nb"); + assert!(re.match_path(&mut path)); + assert_eq!(path.get("id").unwrap(), "a\nb/a\nb"); + + let re = ResourceDef::new("/user/{id:.*}"); + let mut path = Path::new("/user/a\nb/a\nb"); + assert!(re.match_path(&mut path)); + assert_eq!(path.get("id").unwrap(), "a\nb/a\nb"); + } + #[cfg(feature = "http")] #[test] fn test_parse_urlencoded_param() { diff --git a/actix-router/src/url.rs b/actix-router/src/url.rs index f669da99..130ac76f 100644 --- a/actix-router/src/url.rs +++ b/actix-router/src/url.rs @@ -31,7 +31,7 @@ fn set_bit(array: &mut [u8], ch: u8) { } thread_local! { - static DEFAULT_QUOTER: Quoter = Quoter::new(b"@:", b"/+"); + static DEFAULT_QUOTER: Quoter = Quoter::new(b"@:", b"%/+"); } #[derive(Default, Clone, Debug)] @@ -170,11 +170,7 @@ impl Quoter { idx += 1; } - cloned.map(|data| { - // SAFETY: we get data from http::Uri, which does UTF-8 checks already - // this code only decodes valid pct encoded values - unsafe { String::from_utf8_unchecked(data) } - }) + cloned.map(|data| String::from_utf8_lossy(&data).into_owned()) } } @@ -204,24 +200,69 @@ mod tests { use super::*; use crate::{Path, ResourceDef}; + const PROTECTED: &[u8] = b"%/+"; + + fn match_url(pattern: &'static str, url: impl AsRef) -> Path { + let re = ResourceDef::new(pattern); + let uri = Uri::try_from(url.as_ref()).unwrap(); + let mut path = Path::new(Url::new(uri)); + assert!(re.match_path(&mut path)); + path + } + + fn percent_encode(data: &[u8]) -> String { + data.into_iter().map(|c| format!("%{:02X}", c)).collect() + } + #[test] fn test_parse_url() { - let re = ResourceDef::new("/user/{id}/test"); + let re = "/user/{id}/test"; - let url = Uri::try_from("/user/2345/test").unwrap(); - let mut path = Path::new(Url::new(url)); - assert!(re.match_path(&mut path)); + let path = match_url(re, "/user/2345/test"); assert_eq!(path.get("id").unwrap(), "2345"); - let url = Uri::try_from("/user/qwe%25/test").unwrap(); - let mut path = Path::new(Url::new(url)); - assert!(re.match_path(&mut path)); - assert_eq!(path.get("id").unwrap(), "qwe%"); + // "%25" should never be decoded into '%' to gurantee the output is a valid + // percent-encoded format + let path = match_url(re, "/user/qwe%25/test"); + assert_eq!(path.get("id").unwrap(), "qwe%25"); - let url = Uri::try_from("/user/qwe%25rty/test").unwrap(); - let mut path = Path::new(Url::new(url)); - assert!(re.match_path(&mut path)); - assert_eq!(path.get("id").unwrap(), "qwe%rty"); + let path = match_url(re, "/user/qwe%25rty/test"); + assert_eq!(path.get("id").unwrap(), "qwe%25rty"); + } + + #[test] + fn test_protected_chars() { + let encoded = percent_encode(PROTECTED); + let path = match_url("/user/{id}/test", format!("/user/{}/test", encoded)); + assert_eq!(path.get("id").unwrap(), &encoded); + } + + #[test] + fn test_non_protecteed_ascii() { + let nonprotected_ascii = ('\u{0}'..='\u{7F}') + .filter(|&c| c.is_ascii() && !PROTECTED.contains(&(c as u8))) + .collect::(); + let encoded = percent_encode(nonprotected_ascii.as_bytes()); + let path = match_url("/user/{id}/test", format!("/user/{}/test", encoded)); + assert_eq!(path.get("id").unwrap(), &nonprotected_ascii); + } + + #[test] + fn test_valid_utf8_multibyte() { + let test = ('\u{FF00}'..='\u{FFFF}').collect::(); + let encoded = percent_encode(test.as_bytes()); + let path = match_url("/a/{id}/b", format!("/a/{}/b", &encoded)); + assert_eq!(path.get("id").unwrap(), &test); + } + + #[test] + fn test_invalid_utf8() { + let invalid_utf8 = percent_encode((0x80..=0xff).collect::>().as_slice()); + let uri = Uri::try_from(format!("/{}", invalid_utf8)).unwrap(); + let path = Path::new(Url::new(uri)); + + // We should always get a valid utf8 string + assert!(String::from_utf8(path.path().as_bytes().to_owned()).is_ok()); } #[test] diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 7ccada0e..28c7b206 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -1,6 +1,13 @@ # Changes ## Unreleased - 2021-xx-xx +* Remove `config` module. `ServiceConfig`, `ServiceRuntime` public types are removed due to this change. [#349] +* Remove `ServerBuilder::configure` [#349] + +[#349]: https://github.com/actix/actix-net/pull/349 + + +## 2.0.0-beta.5 - 2021-04-20 * Server shutdown would notify all workers to exit regardless if shutdown is graceful. This would make all worker shutdown immediately in force shutdown case. [#333] diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index a1cf520a..58471cf9 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -1,14 +1,13 @@ [package] name = "actix-server" -version = "2.0.0-beta.4" +version = "2.0.0-beta.5" authors = [ "Nikolay Kim ", "fakeshadow <24548779@qq.com>", ] description = "General purpose TCP server built for the Actix ecosystem" keywords = ["network", "framework", "async", "futures"] -homepage = "https://actix.rs" -repository = "https://github.com/actix/actix-net.git" +repository = "https://github.com/actix/actix-net" categories = ["network-programming", "asynchronous"] license = "MIT OR Apache-2.0" edition = "2018" @@ -29,7 +28,6 @@ futures-core = { version = "0.3.7", default-features = false, features = ["alloc log = "0.4" mio = { version = "0.7.6", features = ["os-poll", "net"] } num_cpus = "1.13" -slab = "0.4" tokio = { version = "1.2", features = ["sync"] } [dev-dependencies] diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 23ba616c..d9451d37 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -7,21 +7,14 @@ use actix_rt::{ }; use log::{error, info}; use mio::{Interest, Poll, Token as MioToken}; -use slab::Slab; use crate::server::Server; -use crate::socket::{MioListener, SocketAddr}; +use crate::socket::MioListener; use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; use crate::worker::{Conn, WorkerHandleAccept}; -use crate::Token; struct ServerSocketInfo { - /// Address of socket. Mainly used for logging. - addr: SocketAddr, - - /// Beware this is the crate token for identify socket and should not be confused - /// with `mio::Token`. - token: Token, + token: usize, lst: MioListener, @@ -65,7 +58,7 @@ impl AcceptLoop { pub(crate) fn start( &mut self, - socks: Vec<(Token, MioListener)>, + socks: Vec<(usize, MioListener)>, handles: Vec, ) { let srv = self.srv.take().expect("Can not re-use AcceptInfo"); @@ -84,7 +77,7 @@ struct Accept { srv: Server, next: usize, avail: Availability, - backpressure: bool, + paused: bool, } /// Array of u128 with every bit as marker for a worker handle's availability. @@ -98,23 +91,22 @@ impl Default for Availability { impl Availability { /// Check if any worker handle is available + #[inline(always)] fn available(&self) -> bool { self.0.iter().any(|a| *a != 0) } + /// Check if worker handle is available by index + #[inline(always)] + fn get_available(&self, idx: usize) -> bool { + let (offset, idx) = Self::offset(idx); + + self.0[offset] & (1 << idx as u128) != 0 + } + /// Set worker handle available state by index. fn set_available(&mut self, idx: usize, avail: bool) { - let (offset, idx) = if idx < 128 { - (0, idx) - } else if idx < 128 * 2 { - (1, idx - 128) - } else if idx < 128 * 3 { - (2, idx - 128 * 2) - } else if idx < 128 * 4 { - (3, idx - 128 * 3) - } else { - panic!("Max WorkerHandle count is 512") - }; + let (offset, idx) = Self::offset(idx); let off = 1 << idx as u128; if avail { @@ -131,6 +123,21 @@ impl Availability { self.set_available(handle.idx(), true); }) } + + /// Get offset and adjusted index of given worker handle index. + fn offset(idx: usize) -> (usize, usize) { + if idx < 128 { + (0, idx) + } else if idx < 128 * 2 { + (1, idx - 128) + } else if idx < 128 * 3 { + (2, idx - 128 * 2) + } else if idx < 128 * 4 { + (3, idx - 128 * 3) + } else { + panic!("Max WorkerHandle count is 512") + } + } } /// This function defines errors that are per-connection. Which basically @@ -150,7 +157,7 @@ impl Accept { pub(crate) fn start( poll: Poll, waker: WakerQueue, - socks: Vec<(Token, MioListener)>, + socks: Vec<(usize, MioListener)>, srv: Server, handles: Vec, ) { @@ -161,10 +168,10 @@ impl Accept { .name("actix-server accept loop".to_owned()) .spawn(move || { System::set_current(sys); - let (mut accept, sockets) = + let (mut accept, mut sockets) = Accept::new_with_sockets(poll, waker, socks, handles, srv); - accept.poll_with(sockets); + accept.poll_with(&mut sockets); }) .unwrap(); } @@ -172,29 +179,25 @@ impl Accept { fn new_with_sockets( poll: Poll, waker: WakerQueue, - socks: Vec<(Token, MioListener)>, + socks: Vec<(usize, MioListener)>, handles: Vec, srv: Server, - ) -> (Accept, Slab) { - let mut sockets = Slab::new(); - for (hnd_token, mut lst) in socks.into_iter() { - let addr = lst.local_addr(); + ) -> (Accept, Vec) { + let sockets = socks + .into_iter() + .map(|(token, mut lst)| { + // Start listening for incoming connections + poll.registry() + .register(&mut lst, MioToken(token), Interest::READABLE) + .unwrap_or_else(|e| panic!("Can not register io: {}", e)); - let entry = sockets.vacant_entry(); - let token = entry.key(); - - // Start listening for incoming connections - poll.registry() - .register(&mut lst, MioToken(token), Interest::READABLE) - .unwrap_or_else(|e| panic!("Can not register io: {}", e)); - - entry.insert(ServerSocketInfo { - addr, - token: hnd_token, - lst, - timeout: None, - }); - } + ServerSocketInfo { + token, + lst, + timeout: None, + } + }) + .collect(); let mut avail = Availability::default(); @@ -208,19 +211,19 @@ impl Accept { srv, next: 0, avail, - backpressure: false, + paused: false, }; (accept, sockets) } - fn poll_with(&mut self, mut sockets: Slab) { + fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) { let mut events = mio::Events::with_capacity(128); loop { if let Err(e) = self.poll.poll(&mut events, None) { match e.kind() { - std::io::ErrorKind::Interrupted => continue, + io::ErrorKind::Interrupted => {} _ => panic!("Poll error: {}", e), } } @@ -228,130 +231,160 @@ impl Accept { for event in events.iter() { let token = event.token(); match token { - // This is a loop because interests for command from previous version was - // a loop that would try to drain the command channel. It's yet unknown - // if it's necessary/good practice to actively drain the waker queue. - WAKER_TOKEN => 'waker: loop { - // take guard with every iteration so no new interest can be added - // until the current task is done. - let mut guard = self.waker.guard(); - match guard.pop_front() { - // worker notify it becomes available. we may want to recover - // from backpressure. - Some(WakerInterest::WorkerAvailable(idx)) => { - drop(guard); - self.maybe_backpressure(&mut sockets, false); - self.avail.set_available(idx, true); - } - // a new worker thread is made and it's handle would be added to Accept - Some(WakerInterest::Worker(handle)) => { - drop(guard); - // maybe we want to recover from a backpressure. - self.maybe_backpressure(&mut sockets, false); - self.avail.set_available(handle.idx(), true); - self.handles.push(handle); - } - // got timer interest and it's time to try register socket(s) again - Some(WakerInterest::Timer) => { - drop(guard); - self.process_timer(&mut sockets) - } - Some(WakerInterest::Pause) => { - drop(guard); - self.deregister_all(&mut sockets); - } - Some(WakerInterest::Resume) => { - drop(guard); - sockets.iter_mut().for_each(|(token, info)| { - self.register_logged(token, info); - }); - } - Some(WakerInterest::Stop) => { - return self.deregister_all(&mut sockets); - } - // waker queue is drained - None => { - // Reset the WakerQueue before break so it does not grow infinitely - WakerQueue::reset(&mut guard); - break 'waker; - } + WAKER_TOKEN => { + let exit = self.handle_waker(sockets); + if exit { + info!("Accept is stopped."); + return; } - }, + } _ => { let token = usize::from(token); - self.accept(&mut sockets, token); + self.accept(sockets, token); } } } } } - fn process_timer(&self, sockets: &mut Slab) { + fn handle_waker(&mut self, sockets: &mut [ServerSocketInfo]) -> bool { + // This is a loop because interests for command from previous version was + // a loop that would try to drain the command channel. It's yet unknown + // if it's necessary/good practice to actively drain the waker queue. + loop { + // take guard with every iteration so no new interest can be added + // until the current task is done. + let mut guard = self.waker.guard(); + match guard.pop_front() { + // worker notify it becomes available. + Some(WakerInterest::WorkerAvailable(idx)) => { + drop(guard); + + self.avail.set_available(idx, true); + + if !self.paused { + self.accept_all(sockets); + } + } + // a new worker thread is made and it's handle would be added to Accept + Some(WakerInterest::Worker(handle)) => { + drop(guard); + + self.avail.set_available(handle.idx(), true); + self.handles.push(handle); + + if !self.paused { + self.accept_all(sockets); + } + } + // got timer interest and it's time to try register socket(s) again + Some(WakerInterest::Timer) => { + drop(guard); + + self.process_timer(sockets) + } + Some(WakerInterest::Pause) => { + drop(guard); + + if !self.paused { + self.paused = true; + + self.deregister_all(sockets); + } + } + Some(WakerInterest::Resume) => { + drop(guard); + + if self.paused { + self.paused = false; + + sockets.iter_mut().for_each(|info| { + self.register_logged(info); + }); + + self.accept_all(sockets); + } + } + Some(WakerInterest::Stop) => { + if !self.paused { + self.deregister_all(sockets); + } + + return true; + } + // waker queue is drained + None => { + // Reset the WakerQueue before break so it does not grow infinitely + WakerQueue::reset(&mut guard); + + return false; + } + } + } + } + + fn process_timer(&self, sockets: &mut [ServerSocketInfo]) { let now = Instant::now(); sockets .iter_mut() // Only sockets that had an associated timeout were deregistered. - .filter(|(_, info)| info.timeout.is_some()) - .for_each(|(token, info)| { + .filter(|info| info.timeout.is_some()) + .for_each(|info| { let inst = info.timeout.take().unwrap(); if now < inst { info.timeout = Some(inst); - } else if !self.backpressure { - self.register_logged(token, info); + } else if !self.paused { + self.register_logged(info); } - // Drop the timeout if server is in backpressure and socket timeout is expired. - // When server recovers from backpressure it will register all sockets without + // Drop the timeout if server is paused and socket timeout is expired. + // When server recovers from pause it will register all sockets without // a timeout value so this socket register will be delayed till then. }); } #[cfg(not(target_os = "windows"))] - fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> { + fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> { + let token = MioToken(info.token); self.poll .registry() - .register(&mut info.lst, MioToken(token), Interest::READABLE) + .register(&mut info.lst, token, Interest::READABLE) } #[cfg(target_os = "windows")] - fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> { + fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> { // On windows, calling register without deregister cause an error. // See https://github.com/actix/actix-web/issues/905 // Calling reregister seems to fix the issue. + let token = MioToken(info.token); self.poll .registry() - .register(&mut info.lst, mio::Token(token), Interest::READABLE) + .register(&mut info.lst, token, Interest::READABLE) .or_else(|_| { - self.poll.registry().reregister( - &mut info.lst, - mio::Token(token), - Interest::READABLE, - ) + self.poll + .registry() + .reregister(&mut info.lst, token, Interest::READABLE) }) } - fn register_logged(&self, token: usize, info: &mut ServerSocketInfo) { - match self.register(token, info) { - Ok(_) => info!("Resume accepting connections on {}", info.addr), + fn register_logged(&self, info: &mut ServerSocketInfo) { + match self.register(info) { + Ok(_) => info!("Resume accepting connections on {}", info.lst.local_addr()), Err(e) => error!("Can not register server socket {}", e), } } - fn deregister(&self, info: &mut ServerSocketInfo) -> io::Result<()> { - self.poll.registry().deregister(&mut info.lst) - } - fn deregister_logged(&self, info: &mut ServerSocketInfo) { - match self.deregister(info) { - Ok(_) => info!("Paused accepting connections on {}", info.addr), + match self.poll.registry().deregister(&mut info.lst) { + Ok(_) => info!("Paused accepting connections on {}", info.lst.local_addr()), Err(e) => { error!("Can not deregister server socket {}", e) } } } - fn deregister_all(&self, sockets: &mut Slab) { + fn deregister_all(&self, sockets: &mut [ServerSocketInfo]) { // This is a best effort implementation with following limitation: // // Every ServerSocketInfo with associate timeout will be skipped and it's timeout @@ -364,70 +397,23 @@ impl Accept { .iter_mut() // Take all timeout. // This is to prevent Accept::process_timer method re-register a socket afterwards. - .map(|(_, info)| (info.timeout.take(), info)) + .map(|info| (info.timeout.take(), info)) // Socket info with a timeout is already deregistered so skip them. .filter(|(timeout, _)| timeout.is_none()) .for_each(|(_, info)| self.deregister_logged(info)); } - fn maybe_backpressure(&mut self, sockets: &mut Slab, on: bool) { - // Only operate when server is in a different backpressure than the given flag. - if self.backpressure != on { - self.backpressure = on; - sockets - .iter_mut() - // Only operate on sockets without associated timeout. - // Sockets with it should be handled by `accept` and `process_timer` methods. - // They are already deregistered or need to be reregister in the future. - .filter(|(_, info)| info.timeout.is_none()) - .for_each(|(token, info)| { - if on { - self.deregister_logged(info); - } else { - self.register_logged(token, info); - } - }); - } - } - - fn accept_one(&mut self, sockets: &mut Slab, mut conn: Conn) { - if self.backpressure { - // send_connection would remove fault worker from handles. - // worst case here is conn get dropped after all handles are gone. - while let Err(c) = self.send_connection(sockets, conn) { - conn = c - } - } else { - while self.avail.available() { - let next = self.next(); - let idx = next.idx(); - if next.available() { - self.avail.set_available(idx, true); - match self.send_connection(sockets, conn) { - Ok(_) => return, - Err(c) => conn = c, - } - } else { - self.avail.set_available(idx, false); - self.set_next(); - } - } - - // Sending Conn failed due to either all workers are in error or not available. - // Enter backpressure state and try again. - self.maybe_backpressure(sockets, true); - self.accept_one(sockets, conn); - } - } - // Send connection to worker and handle error. - fn send_connection( - &mut self, - sockets: &mut Slab, - conn: Conn, - ) -> Result<(), Conn> { - match self.next().send(conn) { + fn send_connection(&mut self, conn: Conn) -> Result<(), Conn> { + let next = self.next(); + match next.send(conn) { Ok(_) => { + // Increment counter of WorkerHandle. + // Set worker to unavailable with it hit max (Return false). + if !next.inc_counter() { + let idx = next.idx(); + self.avail.set_available(idx, false); + } self.set_next(); Ok(()) } @@ -438,7 +424,6 @@ impl Accept { if self.handles.is_empty() { error!("No workers"); - self.maybe_backpressure(sockets, true); // All workers are gone and Conn is nowhere to be sent. // Treat this situation as Ok and drop Conn. return Ok(()); @@ -451,19 +436,38 @@ impl Accept { } } - fn accept(&mut self, sockets: &mut Slab, token: usize) { + fn accept_one(&mut self, mut conn: Conn) { loop { - let info = sockets - .get_mut(token) - .expect("ServerSocketInfo is removed from Slab"); + let next = self.next(); + let idx = next.idx(); + + if self.avail.get_available(idx) { + match self.send_connection(conn) { + Ok(_) => return, + Err(c) => conn = c, + } + } else { + self.avail.set_available(idx, false); + self.set_next(); + + if !self.avail.available() { + while let Err(c) = self.send_connection(conn) { + conn = c; + } + return; + } + } + } + } + + fn accept(&mut self, sockets: &mut [ServerSocketInfo], token: usize) { + while self.avail.available() { + let info = &mut sockets[token]; match info.lst.accept() { Ok(io) => { - let msg = Conn { - io, - token: info.token, - }; - self.accept_one(sockets, msg); + let conn = Conn { io, token }; + self.accept_one(conn); } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return, Err(ref e) if connection_error(e) => continue, @@ -491,11 +495,22 @@ impl Accept { } } + fn accept_all(&mut self, sockets: &mut [ServerSocketInfo]) { + sockets + .iter_mut() + .map(|info| info.token) + .collect::>() + .into_iter() + .for_each(|idx| self.accept(sockets, idx)) + } + + #[inline(always)] fn next(&self) -> &WorkerHandleAccept { &self.handles[self.next] } /// Set next worker handle that would accept connection. + #[inline(always)] fn set_next(&mut self) { self.next = (self.next + 1) % self.handles.len(); } diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 66aba10c..e84a887d 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -8,31 +8,29 @@ use std::{ use actix_rt::{self as rt, net::TcpStream, time::sleep, System}; use log::{error, info}; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; -use tokio::sync::oneshot; +use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedReceiver}, + oneshot, +}; use crate::accept::AcceptLoop; -use crate::config::{ConfiguredService, ServiceConfig}; +use crate::join_all; use crate::server::{Server, ServerCommand}; use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; use crate::signals::{Signal, Signals}; use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; use crate::socket::{MioTcpListener, MioTcpSocket}; use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::worker::{ - ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandleAccept, - WorkerHandleServer, -}; -use crate::{join_all, Token}; +use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer}; /// Server builder pub struct ServerBuilder { threads: usize, - token: Token, + token: usize, backlog: u32, handles: Vec<(usize, WorkerHandleServer)>, services: Vec>, - sockets: Vec<(Token, String, MioListener)>, + sockets: Vec<(usize, String, MioListener)>, accept: AcceptLoop, exit: bool, no_signals: bool, @@ -56,7 +54,7 @@ impl ServerBuilder { ServerBuilder { threads: num_cpus::get(), - token: Token::default(), + token: 0, handles: Vec::new(), services: Vec::new(), sockets: Vec::new(), @@ -149,32 +147,6 @@ impl ServerBuilder { self } - /// Execute external configuration as part of the server building process. - /// - /// This function is useful for moving parts of configuration to a different module or - /// even library. - pub fn configure(mut self, f: F) -> io::Result - where - F: Fn(&mut ServiceConfig) -> io::Result<()>, - { - let mut cfg = ServiceConfig::new(self.threads, self.backlog); - - f(&mut cfg)?; - - if let Some(apply) = cfg.apply { - let mut srv = ConfiguredService::new(apply); - for (name, lst) in cfg.services { - let token = self.token.next(); - srv.stream(token, name.clone(), lst.local_addr()?); - self.sockets.push((token, name, MioListener::Tcp(lst))); - } - self.services.push(Box::new(srv)); - } - self.threads = cfg.threads; - - Ok(self) - } - /// Add new service to the server. pub fn bind>(mut self, name: N, addr: U, factory: F) -> io::Result where @@ -184,7 +156,7 @@ impl ServerBuilder { let sockets = bind_addr(addr, self.backlog)?; for lst in sockets { - let token = self.token.next(); + let token = self.next_token(); self.services.push(StreamNewService::create( name.as_ref().to_string(), token, @@ -233,7 +205,7 @@ impl ServerBuilder { { use std::net::{IpAddr, Ipv4Addr}; lst.set_nonblocking(true)?; - let token = self.token.next(); + let token = self.next_token(); let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); self.services.push(StreamNewService::create( name.as_ref().to_string(), @@ -259,7 +231,7 @@ impl ServerBuilder { lst.set_nonblocking(true)?; let addr = lst.local_addr()?; - let token = self.token.next(); + let token = self.next_token(); self.services.push(StreamNewService::create( name.as_ref().to_string(), token, @@ -318,12 +290,11 @@ impl ServerBuilder { fn start_worker( &self, idx: usize, - waker: WakerQueue, + waker_queue: WakerQueue, ) -> (WorkerHandleAccept, WorkerHandleServer) { - let avail = WorkerAvailability::new(idx, waker); let services = self.services.iter().map(|v| v.clone_factory()).collect(); - ServerWorker::start(idx, services, avail, self.worker_config) + ServerWorker::start(idx, services, waker_queue, self.worker_config) } fn handle_cmd(&mut self, item: ServerCommand) { @@ -437,6 +408,12 @@ impl ServerBuilder { } } } + + fn next_token(&mut self) -> usize { + let token = self.token; + self.token += 1; + token + } } impl Future for ServerBuilder { diff --git a/actix-server/src/config.rs b/actix-server/src/config.rs deleted file mode 100644 index c5e63630..00000000 --- a/actix-server/src/config.rs +++ /dev/null @@ -1,287 +0,0 @@ -use std::collections::HashMap; -use std::future::Future; -use std::{fmt, io}; - -use actix_rt::net::TcpStream; -use actix_service::{ - fn_service, IntoServiceFactory as IntoBaseServiceFactory, - ServiceFactory as BaseServiceFactory, -}; -use actix_utils::{counter::CounterGuard, future::ready}; -use futures_core::future::LocalBoxFuture; -use log::error; - -use crate::builder::bind_addr; -use crate::service::{BoxedServerService, InternalServiceFactory, StreamService}; -use crate::socket::{MioStream, MioTcpListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; -use crate::Token; - -pub struct ServiceConfig { - pub(crate) services: Vec<(String, MioTcpListener)>, - pub(crate) apply: Option>, - pub(crate) threads: usize, - pub(crate) backlog: u32, -} - -impl ServiceConfig { - pub(super) fn new(threads: usize, backlog: u32) -> ServiceConfig { - ServiceConfig { - threads, - backlog, - services: Vec::new(), - apply: None, - } - } - - /// Set number of workers to start. - /// - /// By default server uses number of available logical cpu as workers - /// count. - pub fn workers(&mut self, num: usize) { - self.threads = num; - } - - /// Add new service to server - pub fn bind>(&mut self, name: N, addr: U) -> io::Result<&mut Self> - where - U: ToSocketAddrs, - { - let sockets = bind_addr(addr, self.backlog)?; - - for lst in sockets { - self._listen(name.as_ref(), lst); - } - - Ok(self) - } - - /// Add new service to server - pub fn listen>(&mut self, name: N, lst: StdTcpListener) -> &mut Self { - self._listen(name, MioTcpListener::from_std(lst)) - } - - /// Register service configuration function. This function get called - /// during worker runtime configuration. It get executed in worker thread. - pub fn apply(&mut self, f: F) -> io::Result<()> - where - F: Fn(&mut ServiceRuntime) + Send + Clone + 'static, - { - self.apply = Some(Box::new(f)); - Ok(()) - } - - fn _listen>(&mut self, name: N, lst: MioTcpListener) -> &mut Self { - if self.apply.is_none() { - self.apply = Some(Box::new(not_configured)); - } - self.services.push((name.as_ref().to_string(), lst)); - self - } -} - -pub(super) struct ConfiguredService { - rt: Box, - names: HashMap, - topics: HashMap, - services: Vec, -} - -impl ConfiguredService { - pub(super) fn new(rt: Box) -> Self { - ConfiguredService { - rt, - names: HashMap::new(), - topics: HashMap::new(), - services: Vec::new(), - } - } - - pub(super) fn stream(&mut self, token: Token, name: String, addr: StdSocketAddr) { - self.names.insert(token, (name.clone(), addr)); - self.topics.insert(name, token); - self.services.push(token); - } -} - -impl InternalServiceFactory for ConfiguredService { - fn name(&self, token: Token) -> &str { - &self.names[&token].0 - } - - fn clone_factory(&self) -> Box { - Box::new(Self { - rt: self.rt.clone(), - names: self.names.clone(), - topics: self.topics.clone(), - services: self.services.clone(), - }) - } - - fn create(&self) -> LocalBoxFuture<'static, Result, ()>> { - // configure services - let mut rt = ServiceRuntime::new(self.topics.clone()); - self.rt.configure(&mut rt); - rt.validate(); - let mut names = self.names.clone(); - let tokens = self.services.clone(); - - // construct services - Box::pin(async move { - let mut services = rt.services; - // TODO: Proper error handling here - for f in rt.onstart.into_iter() { - f.await; - } - let mut res = vec![]; - for token in tokens { - if let Some(srv) = services.remove(&token) { - let newserv = srv.new_service(()); - match newserv.await { - Ok(serv) => { - res.push((token, serv)); - } - Err(_) => { - error!("Can not construct service"); - return Err(()); - } - } - } else { - let name = names.remove(&token).unwrap().0; - res.push(( - token, - Box::new(StreamService::new(fn_service(move |_: TcpStream| { - error!("Service {:?} is not configured", name); - ready::>(Ok(())) - }))), - )); - }; - } - Ok(res) - }) - } -} - -pub(super) trait ServiceRuntimeConfiguration: Send { - fn clone(&self) -> Box; - - fn configure(&self, rt: &mut ServiceRuntime); -} - -impl ServiceRuntimeConfiguration for F -where - F: Fn(&mut ServiceRuntime) + Send + Clone + 'static, -{ - fn clone(&self) -> Box { - Box::new(self.clone()) - } - - fn configure(&self, rt: &mut ServiceRuntime) { - (self)(rt) - } -} - -fn not_configured(_: &mut ServiceRuntime) { - error!("Service is not configured"); -} - -pub struct ServiceRuntime { - names: HashMap, - services: HashMap, - onstart: Vec>, -} - -impl ServiceRuntime { - fn new(names: HashMap) -> Self { - ServiceRuntime { - names, - services: HashMap::new(), - onstart: Vec::new(), - } - } - - fn validate(&self) { - for (name, token) in &self.names { - if !self.services.contains_key(&token) { - error!("Service {:?} is not configured", name); - } - } - } - - /// Register service. - /// - /// Name of the service must be registered during configuration stage with - /// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods. - pub fn service(&mut self, name: &str, service: F) - where - F: IntoBaseServiceFactory, - T: BaseServiceFactory + 'static, - T::Future: 'static, - T::Service: 'static, - T::InitError: fmt::Debug, - { - // let name = name.to_owned(); - if let Some(token) = self.names.get(name) { - self.services.insert( - *token, - Box::new(ServiceFactory { - inner: service.into_factory(), - }), - ); - } else { - panic!("Unknown service: {:?}", name); - } - } - - /// Execute future before services initialization. - pub fn on_start(&mut self, fut: F) - where - F: Future + 'static, - { - self.onstart.push(Box::pin(fut)) - } -} - -type BoxedNewService = Box< - dyn BaseServiceFactory< - (CounterGuard, MioStream), - Response = (), - Error = (), - InitError = (), - Config = (), - Service = BoxedServerService, - Future = LocalBoxFuture<'static, Result>, - >, ->; - -struct ServiceFactory { - inner: T, -} - -impl BaseServiceFactory<(CounterGuard, MioStream)> for ServiceFactory -where - T: BaseServiceFactory, - T::Future: 'static, - T::Service: 'static, - T::Error: 'static, - T::InitError: fmt::Debug + 'static, -{ - type Response = (); - type Error = (); - type Config = (); - type Service = BoxedServerService; - type InitError = (); - type Future = LocalBoxFuture<'static, Result>; - - fn new_service(&self, _: ()) -> Self::Future { - let fut = self.inner.new_service(()); - Box::pin(async move { - match fut.await { - Ok(s) => Ok(Box::new(StreamService::new(s)) as BoxedServerService), - Err(e) => { - error!("Can not construct service: {:?}", e); - Err(()) - } - } - }) - } -} diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index af9ab0b0..b2117191 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -6,7 +6,6 @@ mod accept; mod builder; -mod config; mod server; mod service; mod signals; @@ -16,7 +15,6 @@ mod waker_queue; mod worker; pub use self::builder::ServerBuilder; -pub use self::config::{ServiceConfig, ServiceRuntime}; pub use self::server::Server; pub use self::service::ServiceFactory; pub use self::test_server::TestServer; @@ -28,28 +26,6 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -/// Socket ID token -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] -pub(crate) struct Token(usize); - -impl Default for Token { - fn default() -> Self { - Self::new() - } -} - -impl Token { - fn new() -> Self { - Self(0) - } - - pub(crate) fn next(&mut self) -> Token { - let token = Token(self.0); - self.0 += 1; - token - } -} - /// Start server building process pub fn new() -> ServerBuilder { ServerBuilder::default() diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index da57af67..28ffb4f1 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -3,15 +3,12 @@ use std::net::SocketAddr; use std::task::{Context, Poll}; use actix_service::{Service, ServiceFactory as BaseServiceFactory}; -use actix_utils::{ - counter::CounterGuard, - future::{ready, Ready}, -}; +use actix_utils::future::{ready, Ready}; use futures_core::future::LocalBoxFuture; use log::error; use crate::socket::{FromStream, MioStream}; -use crate::Token; +use crate::worker::WorkerCounterGuard; pub trait ServiceFactory: Send + Clone + 'static { type Factory: BaseServiceFactory; @@ -20,16 +17,16 @@ pub trait ServiceFactory: Send + Clone + 'static { } pub(crate) trait InternalServiceFactory: Send { - fn name(&self, token: Token) -> &str; + fn name(&self, token: usize) -> &str; fn clone_factory(&self) -> Box; - fn create(&self) -> LocalBoxFuture<'static, Result, ()>>; + fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>; } pub(crate) type BoxedServerService = Box< dyn Service< - (CounterGuard, MioStream), + (WorkerCounterGuard, MioStream), Response = (), Error = (), Future = Ready>, @@ -50,7 +47,7 @@ impl StreamService { } } -impl Service<(CounterGuard, MioStream)> for StreamService +impl Service<(WorkerCounterGuard, MioStream)> for StreamService where S: Service, S::Future: 'static, @@ -65,7 +62,7 @@ where self.service.poll_ready(ctx).map_err(|_| ()) } - fn call(&self, (guard, req): (CounterGuard, MioStream)) -> Self::Future { + fn call(&self, (guard, req): (WorkerCounterGuard, MioStream)) -> Self::Future { ready(match FromStream::from_mio(req) { Ok(stream) => { let f = self.service.call(stream); @@ -86,7 +83,7 @@ where pub(crate) struct StreamNewService, Io: FromStream> { name: String, inner: F, - token: Token, + token: usize, addr: SocketAddr, _t: PhantomData, } @@ -98,7 +95,7 @@ where { pub(crate) fn create( name: String, - token: Token, + token: usize, inner: F, addr: SocketAddr, ) -> Box { @@ -117,7 +114,7 @@ where F: ServiceFactory, Io: FromStream + Send + 'static, { - fn name(&self, _: Token) -> &str { + fn name(&self, _: usize) -> &str { &self.name } @@ -131,14 +128,14 @@ where }) } - fn create(&self) -> LocalBoxFuture<'static, Result, ()>> { + fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>> { let token = self.token; let fut = self.inner.create().new_service(()); Box::pin(async move { match fut.await { Ok(inner) => { let service = Box::new(StreamService::new(inner)) as _; - Ok(vec![(token, service)]) + Ok((token, service)) } Err(_) => Err(()), } diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index 948b5f1f..cd7ccc1a 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -23,9 +23,15 @@ pub(crate) enum MioListener { impl MioListener { pub(crate) fn local_addr(&self) -> SocketAddr { match *self { - MioListener::Tcp(ref lst) => SocketAddr::Tcp(lst.local_addr().unwrap()), + MioListener::Tcp(ref lst) => lst + .local_addr() + .map(SocketAddr::Tcp) + .unwrap_or(SocketAddr::Unknown), #[cfg(unix)] - MioListener::Uds(ref lst) => SocketAddr::Uds(lst.local_addr().unwrap()), + MioListener::Uds(ref lst) => lst + .local_addr() + .map(SocketAddr::Uds) + .unwrap_or(SocketAddr::Unknown), } } @@ -110,14 +116,15 @@ impl fmt::Debug for MioListener { impl fmt::Display for MioListener { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { - MioListener::Tcp(ref lst) => write!(f, "{}", lst.local_addr().ok().unwrap()), + MioListener::Tcp(ref lst) => write!(f, "{:?}", lst), #[cfg(unix)] - MioListener::Uds(ref lst) => write!(f, "{:?}", lst.local_addr().ok().unwrap()), + MioListener::Uds(ref lst) => write!(f, "{:?}", lst), } } } pub(crate) enum SocketAddr { + Unknown, Tcp(StdSocketAddr), #[cfg(unix)] Uds(mio::net::SocketAddr), @@ -126,9 +133,10 @@ pub(crate) enum SocketAddr { impl fmt::Display for SocketAddr { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { - SocketAddr::Tcp(ref addr) => write!(f, "{}", addr), + Self::Unknown => write!(f, "Unknown SocketAddr"), + Self::Tcp(ref addr) => write!(f, "{}", addr), #[cfg(unix)] - SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), + Self::Uds(ref addr) => write!(f, "{:?}", addr), } } } @@ -136,9 +144,10 @@ impl fmt::Display for SocketAddr { impl fmt::Debug for SocketAddr { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { - SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr), + Self::Unknown => write!(f, "Unknown SocketAddr"), + Self::Tcp(ref addr) => write!(f, "{:?}", addr), #[cfg(unix)] - SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), + Self::Uds(ref addr) => write!(f, "{:?}", addr), } } } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 7bc211b1..79f15b16 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -2,8 +2,9 @@ use std::{ future::Future, mem, pin::Pin, + rc::Rc, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicUsize, Ordering}, Arc, }, task::{Context, Poll}, @@ -15,7 +16,6 @@ use actix_rt::{ time::{sleep, Instant, Sleep}, Arbiter, }; -use actix_utils::counter::Counter; use futures_core::{future::LocalBoxFuture, ready}; use log::{error, info, trace}; use tokio::sync::{ @@ -23,10 +23,10 @@ use tokio::sync::{ oneshot, }; +use crate::join_all; use crate::service::{BoxedServerService, InternalServiceFactory}; use crate::socket::MioStream; use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::{join_all, Token}; /// Stop worker message. Returns `true` on successful graceful shutdown. /// and `false` if some connections still alive when shutdown execute. @@ -38,35 +38,131 @@ pub(crate) struct Stop { #[derive(Debug)] pub(crate) struct Conn { pub io: MioStream, - pub token: Token, + pub token: usize, } fn handle_pair( idx: usize, tx1: UnboundedSender, tx2: UnboundedSender, - avail: WorkerAvailability, + counter: Counter, ) -> (WorkerHandleAccept, WorkerHandleServer) { - let accept = WorkerHandleAccept { tx: tx1, avail }; + let accept = WorkerHandleAccept { + idx, + tx: tx1, + counter, + }; let server = WorkerHandleServer { idx, tx: tx2 }; (accept, server) } +/// counter: Arc field is owned by `Accept` thread and `ServerWorker` thread. +/// +/// `Accept` would increment the counter and `ServerWorker` would decrement it. +/// +/// # Atomic Ordering: +/// +/// `Accept` always look into it's cached `Availability` field for `ServerWorker` state. +/// It lazily increment counter after successful dispatching new work to `ServerWorker`. +/// On reaching counter limit `Accept` update it's cached `Availability` and mark worker as +/// unable to accept any work. +/// +/// `ServerWorker` always decrement the counter when every work received from `Accept` is done. +/// On reaching counter limit worker would use `mio::Waker` and `WakerQueue` to wake up `Accept` +/// and notify it to update cached `Availability` again to mark worker as able to accept work again. +/// +/// Hense a wake up would only happen after `Accept` increment it to limit. +/// And a decrement to limit always wake up `Accept`. +#[derive(Clone)] +pub(crate) struct Counter { + counter: Arc, + limit: usize, +} + +impl Counter { + pub(crate) fn new(limit: usize) -> Self { + Self { + counter: Arc::new(AtomicUsize::new(1)), + limit, + } + } + + /// Increment counter by 1 and return true when hitting limit + #[inline(always)] + pub(crate) fn inc(&self) -> bool { + self.counter.fetch_add(1, Ordering::Relaxed) != self.limit + } + + /// Decrement counter by 1 and return true if crossing limit. + #[inline(always)] + pub(crate) fn dec(&self) -> bool { + self.counter.fetch_sub(1, Ordering::Relaxed) == self.limit + } + + pub(crate) fn total(&self) -> usize { + self.counter.load(Ordering::SeqCst) - 1 + } +} + +pub(crate) struct WorkerCounter { + idx: usize, + inner: Rc<(WakerQueue, Counter)>, +} + +impl Clone for WorkerCounter { + fn clone(&self) -> Self { + Self { + idx: self.idx, + inner: self.inner.clone(), + } + } +} + +impl WorkerCounter { + pub(crate) fn new(idx: usize, waker_queue: WakerQueue, counter: Counter) -> Self { + Self { + idx, + inner: Rc::new((waker_queue, counter)), + } + } + + #[inline(always)] + pub(crate) fn guard(&self) -> WorkerCounterGuard { + WorkerCounterGuard(self.clone()) + } + + fn total(&self) -> usize { + self.inner.1.total() + } +} + +pub(crate) struct WorkerCounterGuard(WorkerCounter); + +impl Drop for WorkerCounterGuard { + fn drop(&mut self) { + let (waker_queue, counter) = &*self.0.inner; + if counter.dec() { + waker_queue.wake(WakerInterest::WorkerAvailable(self.0.idx)); + } + } +} + /// Handle to worker that can send connection message to worker and share the /// availability of worker to other thread. /// /// Held by [Accept](crate::accept::Accept). pub(crate) struct WorkerHandleAccept { + idx: usize, tx: UnboundedSender, - avail: WorkerAvailability, + counter: Counter, } impl WorkerHandleAccept { #[inline(always)] pub(crate) fn idx(&self) -> usize { - self.avail.idx + self.idx } #[inline(always)] @@ -75,8 +171,8 @@ impl WorkerHandleAccept { } #[inline(always)] - pub(crate) fn available(&self) -> bool { - self.avail.available() + pub(crate) fn inc_counter(&self) -> bool { + self.counter.inc() } } @@ -96,40 +192,6 @@ impl WorkerHandleServer { } } -#[derive(Clone)] -pub(crate) struct WorkerAvailability { - idx: usize, - waker: WakerQueue, - available: Arc, -} - -impl WorkerAvailability { - pub fn new(idx: usize, waker: WakerQueue) -> Self { - WorkerAvailability { - idx, - waker, - available: Arc::new(AtomicBool::new(false)), - } - } - - #[inline(always)] - pub fn available(&self) -> bool { - self.available.load(Ordering::Acquire) - } - - pub fn set(&self, val: bool) { - // Ordering: - // - // There could be multiple set calls happen in one ::poll. - // Order is important between them. - let old = self.available.swap(val, Ordering::AcqRel); - // Notify the accept on switched to available. - if !old && val { - self.waker.wake(WakerInterest::WorkerAvailable(self.idx)); - } - } -} - /// Service worker. /// /// Worker accepts Socket objects via unbounded channel and starts stream processing. @@ -138,9 +200,8 @@ pub(crate) struct ServerWorker { // It must be dropped as soon as ServerWorker dropping. rx: UnboundedReceiver, rx2: UnboundedReceiver, + counter: WorkerCounter, services: Box<[WorkerService]>, - availability: WorkerAvailability, - conns: Counter, factories: Box<[Box]>, state: WorkerState, shutdown_timeout: Duration, @@ -207,15 +268,15 @@ impl ServerWorker { pub(crate) fn start( idx: usize, factories: Vec>, - availability: WorkerAvailability, + waker_queue: WakerQueue, config: ServerWorkerConfig, ) -> (WorkerHandleAccept, WorkerHandleServer) { - assert!(!availability.available()); - let (tx1, rx) = unbounded_channel(); let (tx2, rx2) = unbounded_channel(); - let avail = availability.clone(); + let counter = Counter::new(config.max_concurrent_connections); + + let counter_clone = counter.clone(); // every worker runs in it's own arbiter. // use a custom tokio runtime builder to change the settings of runtime. Arbiter::with_tokio_rt(move || { @@ -231,11 +292,7 @@ impl ServerWorker { .enumerate() .map(|(idx, factory)| { let fut = factory.create(); - async move { - fut.await.map(|r| { - r.into_iter().map(|(t, s)| (idx, t, s)).collect::>() - }) - } + async move { fut.await.map(|(t, s)| (idx, t, s)) } }) .collect::>(); @@ -248,9 +305,8 @@ impl ServerWorker { let services = match res { Ok(res) => res .into_iter() - .flatten() .fold(Vec::new(), |mut services, (factory, token, service)| { - assert_eq!(token.0, services.len()); + assert_eq!(token, services.len()); services.push(WorkerService { factory, service, @@ -271,8 +327,7 @@ impl ServerWorker { rx, rx2, services, - availability, - conns: Counter::new(config.max_concurrent_connections), + counter: WorkerCounter::new(idx, waker_queue, counter_clone), factories: factories.into_boxed_slice(), state: Default::default(), shutdown_timeout: config.shutdown_timeout, @@ -280,16 +335,16 @@ impl ServerWorker { }); }); - handle_pair(idx, tx1, tx2, avail) + handle_pair(idx, tx1, tx2, counter) } - fn restart_service(&mut self, token: Token, factory_id: usize) { + fn restart_service(&mut self, idx: usize, factory_id: usize) { let factory = &self.factories[factory_id]; - trace!("Service {:?} failed, restarting", factory.name(token)); - self.services[token.0].status = WorkerServiceStatus::Restarting; + trace!("Service {:?} failed, restarting", factory.name(idx)); + self.services[idx].status = WorkerServiceStatus::Restarting; self.state = WorkerState::Restarting(Restart { factory_id, - token, + token: idx, fut: factory.create(), }); } @@ -307,8 +362,8 @@ impl ServerWorker { }); } - fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result { - let mut ready = self.conns.available(cx); + fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result { + let mut ready = true; for (idx, srv) in self.services.iter_mut().enumerate() { if srv.status == WorkerServiceStatus::Available || srv.status == WorkerServiceStatus::Unavailable @@ -318,7 +373,7 @@ impl ServerWorker { if srv.status == WorkerServiceStatus::Unavailable { trace!( "Service {:?} is available", - self.factories[srv.factory].name(Token(idx)) + self.factories[srv.factory].name(idx) ); srv.status = WorkerServiceStatus::Available; } @@ -329,7 +384,7 @@ impl ServerWorker { if srv.status == WorkerServiceStatus::Available { trace!( "Service {:?} is unavailable", - self.factories[srv.factory].name(Token(idx)) + self.factories[srv.factory].name(idx) ); srv.status = WorkerServiceStatus::Unavailable; } @@ -337,10 +392,10 @@ impl ServerWorker { Poll::Ready(Err(_)) => { error!( "Service {:?} readiness check returned error, restarting", - self.factories[srv.factory].name(Token(idx)) + self.factories[srv.factory].name(idx) ); srv.status = WorkerServiceStatus::Failed; - return Err((Token(idx), srv.factory)); + return Err((idx, srv.factory)); } } } @@ -359,8 +414,8 @@ enum WorkerState { struct Restart { factory_id: usize, - token: Token, - fut: LocalBoxFuture<'static, Result, ()>>, + token: usize, + fut: LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>, } // Shutdown keep states necessary for server shutdown: @@ -381,10 +436,6 @@ impl Default for WorkerState { impl Drop for ServerWorker { fn drop(&mut self) { - // Set availability to true so if accept try to send connection to this worker - // it would find worker is gone and remove it. - // This is helpful when worker is dropped unexpected. - self.availability.set(true); // Stop the Arbiter ServerWorker runs on on drop. Arbiter::current().stop(); } @@ -399,8 +450,7 @@ impl Future for ServerWorker { // `StopWorker` message handler if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx) { - this.availability.set(false); - let num = this.conns.total(); + let num = this.counter.total(); if num == 0 { info!("Shutting down worker, 0 connections"); let _ = tx.send(true); @@ -427,7 +477,6 @@ impl Future for ServerWorker { WorkerState::Unavailable => match this.check_readiness(cx) { Ok(true) => { this.state = WorkerState::Available; - this.availability.set(true); self.poll(cx) } Ok(false) => Poll::Pending, @@ -440,26 +489,22 @@ impl Future for ServerWorker { let factory_id = restart.factory_id; let token = restart.token; - let service = ready!(restart.fut.as_mut().poll(cx)) + let (token_new, service) = ready!(restart.fut.as_mut().poll(cx)) .unwrap_or_else(|_| { panic!( "Can not restart {:?} service", this.factories[factory_id].name(token) ) - }) - .into_iter() - // Find the same token from vector. There should be only one - // So the first match would be enough. - .find(|(t, _)| *t == token) - .map(|(_, service)| service) - .expect("No BoxedServerService found"); + }); + + assert_eq!(token, token_new); trace!( "Service {:?} has been restarted", this.factories[factory_id].name(token) ); - this.services[token.0].created(service); + this.services[token].created(service); this.state = WorkerState::Unavailable; self.poll(cx) @@ -468,7 +513,7 @@ impl Future for ServerWorker { // Wait for 1 second. ready!(shutdown.timer.as_mut().poll(cx)); - if this.conns.total() == 0 { + if this.counter.total() == 0 { // Graceful shutdown. if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) { let _ = shutdown.tx.send(true); @@ -493,22 +538,20 @@ impl Future for ServerWorker { Ok(true) => {} Ok(false) => { trace!("Worker is unavailable"); - this.availability.set(false); this.state = WorkerState::Unavailable; return self.poll(cx); } Err((token, idx)) => { this.restart_service(token, idx); - this.availability.set(false); return self.poll(cx); } } + // handle incoming io stream match ready!(Pin::new(&mut this.rx).poll_recv(cx)) { - // handle incoming io stream Some(msg) => { - let guard = this.conns.get(); - let _ = this.services[msg.token.0].service.call((guard, msg.io)); + let guard = this.counter.guard(); + let _ = this.services[msg.token].service.call((guard, msg.io)); } None => return Poll::Ready(()), }; diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index cc9f8190..78894816 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -142,57 +142,6 @@ fn test_start() { let _ = h.join(); } -#[test] -fn test_configure() { - let addr1 = unused_addr(); - let addr2 = unused_addr(); - let addr3 = unused_addr(); - let (tx, rx) = mpsc::channel(); - let num = Arc::new(AtomicUsize::new(0)); - let num2 = num.clone(); - - let h = thread::spawn(move || { - let num = num2.clone(); - let sys = actix_rt::System::new(); - let srv = sys.block_on(lazy(|_| { - Server::build() - .disable_signals() - .configure(move |cfg| { - let num = num.clone(); - let lst = net::TcpListener::bind(addr3).unwrap(); - cfg.bind("addr1", addr1) - .unwrap() - .bind("addr2", addr2) - .unwrap() - .listen("addr3", lst) - .apply(move |rt| { - let num = num.clone(); - rt.service("addr1", fn_service(|_| ok::<_, ()>(()))); - rt.service("addr3", fn_service(|_| ok::<_, ()>(()))); - rt.on_start(lazy(move |_| { - let _ = num.fetch_add(1, Ordering::Relaxed); - })) - }) - }) - .unwrap() - .workers(1) - .run() - })); - - let _ = tx.send((srv, actix_rt::System::current())); - let _ = sys.run(); - }); - let (_, sys) = rx.recv().unwrap(); - thread::sleep(Duration::from_millis(500)); - - assert!(net::TcpStream::connect(addr1).is_ok()); - assert!(net::TcpStream::connect(addr2).is_ok()); - assert!(net::TcpStream::connect(addr3).is_ok()); - assert_eq!(num.load(Ordering::Relaxed), 1); - sys.stop(); - let _ = h.join(); -} - #[actix_rt::test] async fn test_max_concurrent_connections() { // Note: @@ -305,81 +254,6 @@ async fn test_service_restart() { let num_clone = num.clone(); let num2_clone = num2.clone(); - let h = thread::spawn(move || { - actix_rt::System::new().block_on(async { - let server = Server::build() - .backlog(1) - .disable_signals() - .configure(move |cfg| { - let num = num.clone(); - let num2 = num2.clone(); - cfg.bind("addr1", addr1) - .unwrap() - .bind("addr2", addr2) - .unwrap() - .apply(move |rt| { - let num = num.clone(); - let num2 = num2.clone(); - rt.service( - "addr1", - fn_factory(move || { - let num = num.clone(); - async move { Ok::<_, ()>(TestService(num)) } - }), - ); - rt.service( - "addr2", - fn_factory(move || { - let num2 = num2.clone(); - async move { Ok::<_, ()>(TestService(num2)) } - }), - ); - }) - }) - .unwrap() - .workers(1) - .run(); - - let _ = tx.send((server.clone(), actix_rt::System::current())); - server.await - }) - }); - - let (server, sys) = rx.recv().unwrap(); - - for _ in 0..5 { - TcpStream::connect(addr1) - .await - .unwrap() - .shutdown() - .await - .unwrap(); - TcpStream::connect(addr2) - .await - .unwrap() - .shutdown() - .await - .unwrap(); - } - - sleep(Duration::from_secs(3)).await; - - assert!(num_clone.load(Ordering::SeqCst) > 5); - assert!(num2_clone.load(Ordering::SeqCst) > 5); - - sys.stop(); - let _ = server.stop(false); - let _ = h.join().unwrap(); - - let addr1 = unused_addr(); - let addr2 = unused_addr(); - let (tx, rx) = mpsc::channel(); - let num = Arc::new(AtomicUsize::new(0)); - let num2 = Arc::new(AtomicUsize::new(0)); - - let num_clone = num.clone(); - let num2_clone = num2.clone(); - let h = thread::spawn(move || { let num = num.clone(); actix_rt::System::new().block_on(async { diff --git a/actix-service/src/transform.rs b/actix-service/src/transform.rs index 00b686f9..e39f3058 100644 --- a/actix-service/src/transform.rs +++ b/actix-service/src/transform.rs @@ -41,7 +41,7 @@ where /// /// actix_service::forward_ready!(service); /// -/// fn call(&self, req: S::Request) -> Self::Future { +/// fn call(&self, req: Req) -> Self::Future { /// TimeoutServiceResponse { /// fut: self.service.call(req), /// sleep: Sleep::new(clock::now() + self.timeout), diff --git a/actix-tls/Cargo.toml b/actix-tls/Cargo.toml index 73395a14..9fa260c7 100755 --- a/actix-tls/Cargo.toml +++ b/actix-tls/Cargo.toml @@ -64,7 +64,7 @@ tokio-native-tls = { version = "0.3", optional = true } [dev-dependencies] actix-rt = "2.2.0" -actix-server = "2.0.0-beta.3" +actix-server = "2.0.0-beta.5" bytes = "1" env_logger = "0.8" futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }