merge master

This commit is contained in:
fakeshadow 2021-04-01 18:30:23 +08:00
commit d22ab417da
13 changed files with 149 additions and 130 deletions

View File

@@ -1,3 +1,3 @@
[alias]
chk = "hack check --workspace --all-features --tests --examples"
lint = "hack --clean-per-run clippy --workspace --tests --examples"
chk = "hack check --workspace --tests --examples"

View File

@@ -1,7 +1,10 @@
# Changes
## Unreleased - 2021-xx-xx
* Prevent panic when shutdown_timeout is very large. [f9262db]
## 2.0.0-beta.4 - 2021-04-01
* Prevent panic when `shutdown_timeout` is very large. [f9262db]
[f9262db]: https://github.com/actix/actix-net/commit/f9262db
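
The entry above only references the fix by commit hash. As a loose illustration of the bug class (hypothetical code, not the actual f9262db patch), the sketch below shows how converting a very large user-supplied timeout to milliseconds can overflow and panic, and how saturating arithmetic avoids it:

```rust
use std::time::Duration;

/// Hypothetical guard (not the actual f9262db patch): convert a user-supplied
/// timeout in seconds to a Duration without the overflow panic that a plain
/// `secs * 1000` would hit for very large values.
fn shutdown_deadline(timeout_secs: u64) -> Duration {
    Duration::from_millis(timeout_secs.saturating_mul(1000))
}

fn main() {
    assert_eq!(shutdown_deadline(30), Duration::from_secs(30));
    // A very large value saturates instead of panicking.
    let _ = shutdown_deadline(u64::MAX);
}
```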

View File

@@ -1,6 +1,6 @@
[package]
name = "actix-server"
version = "2.0.0-beta.3"
version = "2.0.0-beta.4"
authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"fakeshadow <24548779@qq.com>",
@@ -9,7 +9,6 @@ description = "General purpose TCP server built for the Actix ecosystem"
keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-server"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
edition = "2018"

View File

@@ -142,9 +142,7 @@ impl Accept {
self.process_timeout(&mut sockets);
continue;
}
_ => {
panic!("Poll error: {}", e);
}
_ => panic!("Poll error: {}", e),
}
}
@@ -160,13 +158,12 @@ impl Accept {
let mut guard = self.waker_queue.guard();
match guard.pop_front() {
// a worker notifies that it has become available; we may want to recover
// from backpressure.
Some(WakerInterest::WorkerAvailable) => {
drop(guard);
self.maybe_backpressure(&mut sockets, false);
}
// a new worker thread is made and it's handle would be added
// to Accept
// a new worker thread is made and its handle will be added to Accept
Some(WakerInterest::Worker(handle)) => {
drop(guard);
// maybe we want to recover from a backpressure.
@@ -207,23 +204,29 @@ impl Accept {
}
fn process_timeout(&mut self, sockets: &mut Slab<ServerSocketInfo>) {
// take old timeout as it's no use after each iteration.
// Take old timeout as it's no use after each iteration.
if self.timeout.take().is_some() {
let now = Instant::now();
sockets.iter_mut().for_each(|(token, info)| {
// only ServerSocketInfo entries that have an associated timeout value were deregistered.
if let Some(inst) = info.timeout_deadline {
// timeout expired register socket again.
if now >= inst {
info.timeout_deadline = None;
self.register_logged(token, info);
} else {
sockets
.iter_mut()
// Only sockets that had an associated timeout were deregistered.
.filter(|(_, info)| info.timeout_deadline.is_some())
.for_each(|(token, info)| {
let inst = info.timeout_deadline.take().unwrap();
if now < inst {
// still timed out; try to set a new timeout.
let dur = inst - now;
self.set_timeout(dur);
} else if !self.backpressure {
// timeout expired; register the socket again.
self.register_logged(token, info);
}
}
});
// Drop the timeout if the server is under backpressure and the socket's timeout has expired.
// When the server recovers from backpressure it registers all sockets that have no
// timeout value, so this socket's registration is delayed until then.
});
}
}
@@ -290,89 +293,92 @@ impl Accept {
}
fn maybe_backpressure(&mut self, sockets: &mut Slab<ServerSocketInfo>, on: bool) {
if self.backpressure {
if !on {
// Only operate when the server's backpressure state differs from the given flag.
if self.backpressure != on {
if on {
self.backpressure = true;
// TODO: figure out if timing out sockets can be safely de-registered twice.
self.deregister_all(sockets);
} else {
self.backpressure = false;
for (token, info) in sockets.iter_mut() {
if info.timeout_deadline.is_some() {
// socket will attempt to re-register itself when its timeout completes
continue;
}
self.register_logged(token, info);
}
sockets
.iter_mut()
// Only operate on sockets without an associated timeout.
// Sockets with one will attempt to re-register when their timeout expires.
.filter(|(_, info)| info.timeout_deadline.is_none())
.for_each(|(token, info)| self.register_logged(token, info));
}
} else if on {
self.backpressure = true;
self.deregister_all(sockets);
}
}
fn accept_one(&mut self, sockets: &mut Slab<ServerSocketInfo>, mut msg: Conn) {
fn accept_one(&mut self, sockets: &mut Slab<ServerSocketInfo>, mut conn: Conn) {
if self.backpressure {
// send_connection would remove the faulted worker from handles.
// worst case here is that the conn gets dropped after all handles are gone.
while !self.handles.is_empty() {
match self.handles[self.next].send(msg) {
Ok(_) => {
self.set_next();
break;
}
Err(tmp) => {
// worker lost contact and could be gone. a message is sent to
// `ServerBuilder` future to notify it a new worker should be made
// after that remove the fault worker
self.srv.worker_faulted(self.handles[self.next].idx);
msg = tmp;
self.handles.swap_remove(self.next);
if self.handles.is_empty() {
error!("No workers");
return;
} else if self.handles.len() <= self.next {
self.next = 0;
}
continue;
}
match self.send_connection(sockets, conn) {
Ok(_) => return,
Err(c) => conn = c,
}
}
} else {
// Do one round, trying to send conn to each worker until it succeeds,
// starting from self.next.
let mut idx = 0;
while idx < self.handles.len() {
idx += 1;
if self.handles[self.next].available() {
match self.handles[self.next].send(msg) {
Ok(_) => {
self.set_next();
return;
}
// worker lost contact and could be gone. a message is sent to
// `ServerBuilder` future to notify it a new worker should be made.
// after that remove the fault worker and enter backpressure if necessary.
Err(tmp) => {
self.srv.worker_faulted(self.handles[self.next].idx);
msg = tmp;
self.handles.swap_remove(self.next);
if self.handles.is_empty() {
error!("No workers");
self.maybe_backpressure(sockets, true);
return;
} else if self.handles.len() <= self.next {
self.next = 0;
}
continue;
}
match self.send_connection(sockets, conn) {
Ok(_) => return,
Err(c) => conn = c,
}
} else {
self.set_next();
}
self.set_next();
}
// enable backpressure
// Sending the Conn failed because all workers are either in error or unavailable.
// Enter backpressure state and try again.
self.maybe_backpressure(sockets, true);
self.accept_one(sockets, msg);
self.accept_one(sockets, conn);
}
}
// set next worker handle that would accept work.
// Set next worker handle that would accept work.
fn set_next(&mut self) {
self.next = (self.next + 1) % self.handles.len();
}
// Send connection to worker and handle error.
fn send_connection(
&mut self,
sockets: &mut Slab<ServerSocketInfo>,
conn: Conn,
) -> Result<(), Conn> {
match self.handles[self.next].send(conn) {
Ok(_) => {
self.set_next();
Ok(())
}
Err(conn) => {
// worker lost contact and could be gone. a message is sent to the
// `ServerBuilder` future to notify it that a new worker should be made.
// after that, remove the faulted worker and enter backpressure if necessary.
self.srv.worker_faulted(self.handles[self.next].idx);
self.handles.swap_remove(self.next);
if self.handles.is_empty() {
error!("No workers");
self.maybe_backpressure(sockets, true);
// All workers are gone and there is nowhere to send the Conn.
// Treat this situation as Ok and drop the Conn.
return Ok(());
} else if self.handles.len() <= self.next {
self.next = 0;
}
Err(conn)
}
}
}
fn accept(&mut self, sockets: &mut Slab<ServerSocketInfo>, token: usize) {
loop {
let info = sockets
@@ -380,11 +386,10 @@ impl Accept {
.expect("ServerSocketInfo is removed from Slab");
match info.lst.accept() {
Ok((io, addr)) => {
Ok(io) => {
let msg = Conn {
io,
token: info.token,
peer: Some(addr),
};
self.accept_one(sockets, msg);
}
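
Taken together, the new `accept_one`/`send_connection` pair implements: under backpressure, hand the conn to the next worker regardless of availability; otherwise try each worker once in round-robin order, dropping faulted workers as they are found, and enter backpressure (retrying once) when nobody takes it. The toy model below restates that flow with hypothetical stand-in types so it can be read without the diff noise; it is not the actual actix-server code.

```rust
/// Toy stand-in for a worker handle (hypothetical type, not actix-server's).
struct Worker {
    available: bool,
    healthy: bool,
}

struct Accept {
    handles: Vec<Worker>,
    next: usize,
    backpressure: bool,
}

impl Accept {
    fn set_next(&mut self) {
        self.next = (self.next + 1) % self.handles.len();
    }

    /// Mirrors `send_connection`: Ok(()) when the conn was handed off (or had to
    /// be dropped because no workers remain); Err(conn) hands it back for a retry.
    fn send_connection(&mut self, conn: u32) -> Result<(), u32> {
        if self.handles[self.next].healthy {
            self.set_next();
            return Ok(());
        }
        // Faulted worker: remove it, then either give up (no workers left) or retry.
        self.handles.swap_remove(self.next);
        if self.handles.is_empty() {
            self.backpressure = true;
            Ok(()) // nowhere to send the conn; it is dropped
        } else {
            if self.handles.len() <= self.next {
                self.next = 0;
            }
            Err(conn)
        }
    }

    fn accept_one(&mut self, mut conn: u32) {
        if self.backpressure {
            // Under backpressure: ignore availability and keep trying until the conn
            // is sent or every worker has been removed as faulted.
            while !self.handles.is_empty() {
                match self.send_connection(conn) {
                    Ok(_) => return,
                    Err(c) => conn = c,
                }
            }
        } else {
            // One round-robin pass over the workers, starting from self.next.
            let mut idx = 0;
            while idx < self.handles.len() {
                idx += 1;
                if self.handles[self.next].available {
                    match self.send_connection(conn) {
                        Ok(_) => return,
                        Err(c) => conn = c,
                    }
                } else {
                    self.set_next();
                }
            }
            // Nobody took it: enter backpressure and try again.
            self.backpressure = true;
            self.accept_one(conn);
        }
    }
}

fn main() {
    let mut accept = Accept {
        handles: vec![Worker { available: false, healthy: true }],
        next: 0,
        backpressure: false,
    };
    accept.accept_one(1);
    assert!(accept.backpressure); // no worker was available, so backpressure kicked in
}
```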

View File

@@ -40,15 +40,11 @@ impl MioListener {
}
}
pub(crate) fn accept(&self) -> io::Result<(MioStream, SocketAddr)> {
pub(crate) fn accept(&self) -> io::Result<MioStream> {
match *self {
MioListener::Tcp(ref lst) => lst
.accept()
.map(|(stream, addr)| (MioStream::Tcp(stream), SocketAddr::Tcp(addr))),
MioListener::Tcp(ref lst) => lst.accept().map(|(stream, _)| MioStream::Tcp(stream)),
#[cfg(unix)]
MioListener::Uds(ref lst) => lst
.accept()
.map(|(stream, addr)| (MioStream::Uds(stream), SocketAddr::Uds(addr))),
MioListener::Uds(ref lst) => lst.accept().map(|(stream, _)| MioStream::Uds(stream)),
}
}
}

View File

@@ -16,7 +16,7 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;
use crate::service::{BoxedServerService, InternalServiceFactory};
use crate::socket::{MioStream, SocketAddr};
use crate::socket::MioStream;
use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::{join_all, Token};
@@ -33,7 +33,6 @@ pub(crate) struct StopCommand {
pub(crate) struct Conn {
pub io: MioStream,
pub token: Token,
pub peer: Option<SocketAddr>,
}
static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600);

View File

@@ -1,6 +1,9 @@
# Changes
## Unreleased - 2021-xx-xx
## 3.0.0-beta.3 - 2021-04-01
* Moved `mpsc` to its own crate, `local-channel`. [#301]
* Moved `task::LocalWaker` to its own crate, `local-waker`. [#301]
* Removed `timeout` module. [#301]
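
For downstream code, the two moves are essentially import-path changes. A rough before/after sketch; the old `actix_utils::mpsc` and `actix_utils::task` paths are assumed from 3.0.0-beta.2 and are not shown in this diff:

```rust
// Before (actix-utils 3.0.0-beta.2) — old paths assumed, not shown in this diff:
//     use actix_utils::mpsc::channel;
//     use actix_utils::task::LocalWaker;

// After: the same types now live in their own crates.
use local_channel::mpsc::channel;
use local_waker::LocalWaker;

fn main() {
    // Single-threaded (non-Send) channel; the API itself is unchanged by the move.
    let (tx, rx) = channel::<u32>();
    let _ = tx.send(42);

    let waker = LocalWaker::new();

    // Only constructing things here; this sketch is about import paths.
    drop((rx, waker));
}
```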

View File

@@ -1,12 +1,13 @@
[package]
name = "actix-utils"
version = "3.0.0-beta.2"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Various network related services and utilities for the Actix ecosystem"
version = "3.0.0-beta.3"
authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"Rob Ede <robjtede@icloud.com>",
]
description = "Utilities for the Actix ecosystem"
keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-utils"
categories = ["network-programming", "asynchronous"]
license = "MIT OR Apache-2.0"
edition = "2018"

View File

@@ -1,24 +1,18 @@
//! Task-notifying counter.
use core::{cell::Cell, task};
use core::{cell::Cell, fmt, task};
use std::rc::Rc;
use local_waker::LocalWaker;
#[derive(Clone)]
/// Simple counter with the ability to notify a task on reaching a specific number.
///
/// The counter can be cloned; the total count is shared across all clones.
#[derive(Debug, Clone)]
pub struct Counter(Rc<CounterInner>);
struct CounterInner {
count: Cell<usize>,
capacity: usize,
task: LocalWaker,
}
impl Counter {
/// Create `Counter` instance and set max value.
/// Create a `Counter` instance with the given max value.
pub fn new(capacity: usize) -> Self {
Counter(Rc::new(CounterInner {
capacity,
@@ -27,38 +21,26 @@ impl Counter {
}))
}
/// Get counter guard.
/// Create a new counter guard, incrementing the counter.
pub fn get(&self) -> CounterGuard {
CounterGuard::new(self.0.clone())
}
/// Check if counter is not at capacity. If counter at capacity
/// it registers notification for current task.
/// Returns `true` if the counter is below capacity; otherwise registers the current task to be notified and returns `false`.
pub fn available(&self, cx: &mut task::Context<'_>) -> bool {
self.0.available(cx)
}
/// Get total number of acquired counts
/// Get total number of acquired guards.
pub fn total(&self) -> usize {
self.0.count.get()
}
}
pub struct CounterGuard(Rc<CounterInner>);
impl CounterGuard {
fn new(inner: Rc<CounterInner>) -> Self {
inner.inc();
CounterGuard(inner)
}
}
impl Unpin for CounterGuard {}
impl Drop for CounterGuard {
fn drop(&mut self) {
self.0.dec();
}
struct CounterInner {
count: Cell<usize>,
capacity: usize,
task: LocalWaker,
}
impl CounterInner {
@@ -83,3 +65,32 @@ impl CounterInner {
}
}
}
impl fmt::Debug for CounterInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Counter")
.field("count", &self.count.get())
.field("capacity", &self.capacity)
.field("task", &self.task)
.finish()
}
}
/// An RAII structure that keeps the underlying counter incremented until this guard is dropped.
#[derive(Debug)]
pub struct CounterGuard(Rc<CounterInner>);
impl CounterGuard {
fn new(inner: Rc<CounterInner>) -> Self {
inner.inc();
CounterGuard(inner)
}
}
impl Unpin for CounterGuard {}
impl Drop for CounterGuard {
fn drop(&mut self) {
self.0.dec();
}
}
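
Based only on the API visible above (`new`, `get`, `available`, `total`, and the guard's drop behaviour), here is a short usage sketch; the limiter helper and its poll context are hypothetical, not part of this diff:

```rust
use std::task::Context;

use actix_utils::counter::{Counter, CounterGuard};

/// Hypothetical helper for a `poll_ready`-style method: take a guard while below
/// capacity; otherwise `available` has already registered the task for wakeup.
fn try_acquire(counter: &Counter, cx: &mut Context<'_>) -> Option<CounterGuard> {
    if counter.available(cx) {
        // Increments the count now; decrements again when the guard is dropped.
        Some(counter.get())
    } else {
        None
    }
}

fn main() {
    let counter = Counter::new(2);
    let g1 = counter.get();
    let _g2 = counter.get();
    assert_eq!(counter.total(), 2);
    drop(g1);
    assert_eq!(counter.total(), 1);
    let _ = try_acquire; // referenced so the sketch compiles without dead-code warnings
}
```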

View File

@@ -1,7 +1,7 @@
//! Various network related services and utilities for the Actix ecosystem.
//! Various utilities for the Actix ecosystem.
#![deny(rust_2018_idioms, nonstandard_style)]
#![allow(clippy::type_complexity)]
#![warn(missing_docs)]
#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]

View File

@@ -1,16 +1,15 @@
[package]
name = "local-channel"
version = "0.1.1"
version = "0.1.2"
description = "A non-threadsafe multi-producer, single-consumer, futures-aware, FIFO queue"
authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"Rob Ede <robjtede@icloud.com>",
]
edition = "2018"
license = "MIT OR Apache-2.0"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-server"
keywords = ["channel", "local", "futures"]
license = "MIT OR Apache-2.0"
edition = "2018"
[dependencies]
futures-core = { version = "0.3.7", default-features = false }

View File

@@ -3,5 +3,9 @@
## Unreleased - 2021-xx-xx
## 0.1.2 - 2021-04-01
* Fix crate metadata.
## 0.1.1 - 2021-03-29
* Move `LocalWaker` to its own crate.

View File

@@ -8,7 +8,6 @@ authors = [
]
keywords = ["waker", "local", "futures", "no-std"]
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/local-waker"
categories = ["asynchronous", "no-std"]
license = "MIT OR Apache-2.0"
edition = "2018"