more refactor for actix-server

This commit is contained in:
fakeshadow 2020-10-25 21:31:33 +08:00
parent f18f709ba7
commit 9f846643c1
18 changed files with 192 additions and 208 deletions

View File

@ -14,7 +14,7 @@ jobs:
fail-fast: false fail-fast: false
matrix: matrix:
version: version:
- 1.42.0 - 1.43.0
- stable - stable
- nightly - nightly

View File

@ -1,8 +1,9 @@
# Changes # Changes
## Unreleased - 2020-xx-xx ## Unreleased - 2020-xx-xx
* Update `tokio` dependency to 0.3.1 * Upgrade `pin-project` to `1.0`.
* Update `tokio-util` dependency to 0.4 * Update `tokio` dependency to 0.3.1.
* Update `tokio-util` dependency to 0.4.
## 0.3.0 - 2020-08-23 ## 0.3.0 - 2020-08-23
* No changes from beta 2. * No changes from beta 2.

View File

@ -23,6 +23,6 @@ bytes = "0.5"
futures-core = { version = "0.3.4", default-features = false } futures-core = { version = "0.3.4", default-features = false }
futures-sink = { version = "0.3.4", default-features = false } futures-sink = { version = "0.3.4", default-features = false }
log = "0.4" log = "0.4"
pin-project = "0.4.17" pin-project = "1.0.0"
tokio = "0.3.1" tokio = "0.3.1"
tokio-util = { version = "0.4.0", default-features = false, features = ["codec"] } tokio-util = { version = "0.4.0", default-features = false, features = ["codec"] }

View File

@ -7,21 +7,20 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::{fmt, thread}; use std::{fmt, thread};
use copyless::BoxHelper;
use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures_channel::oneshot::{channel, Canceled, Sender}; use futures_channel::oneshot::{channel, Canceled, Sender};
use futures_util::{ use futures_util::{
future::{self, FutureExt}, future::{self, FutureExt},
stream::Stream, stream::Stream,
}; };
use smallvec::SmallVec;
pub use tokio::task::JoinHandle;
use crate::runtime::Runtime; use crate::runtime::Runtime;
use crate::system::System; use crate::system::System;
use copyless::BoxHelper;
use smallvec::SmallVec;
pub use tokio::task::JoinHandle;
thread_local!( thread_local!(
static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None); static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None);
static RUNNING: Cell<bool> = Cell::new(false); static RUNNING: Cell<bool> = Cell::new(false);

View File

@ -1,11 +1,12 @@
# Changes # Changes
## Unreleased - 2020-xx-xx ## Unreleased - 2020-xx-xx
* Update `mio` dependency to 0.7.3 * Update `mio` dependency to 0.7.3.
* `ServerBuilder::backlog` would accept `u32` instead of `i32` * Remove `socket2` dependency.
* Use `concurrent-queue` to manage poll wakes instead of `futures::channel::mpsc`. * `ServerBuilder::backlog` would accept `u32` instead of `i32`.
* Remove `AcceptNotify` type and pass `WakerQueue` to `WorkerClient` for notify the `Accept` more directly. * Use `concurrent-queue` to manage poll wakes instead of `futures::channel::mpsc::unbounded`.
* Convert `mio::Stream` to `actix_rt::net::TcpStream`(`UnixStream` for uds) using `FromRawFd` and `IntoRawFd`(`IntoRawSocket` and `FromRawSocket` on windows). * Remove `AcceptNotify` type and pass `WakerQueue` to `Worker` for wake up the `Accept`'s `Poll`.
* Convert `mio::net::TcpStream` to `actix_rt::net::TcpStream`(`UnixStream` for uds) using `FromRawFd` and `IntoRawFd`(`FromRawSocket` and `IntoRawSocket` on windows).
* Remove `AsyncRead` and `AsyncWrite` trait bound for `socket::FromStream` trait. * Remove `AsyncRead` and `AsyncWrite` trait bound for `socket::FromStream` trait.
## 1.0.4 - 2020-09-12 ## 1.0.4 - 2020-09-12

View File

@ -61,13 +61,13 @@ impl AcceptLoop {
pub(crate) fn start( pub(crate) fn start(
&mut self, &mut self,
socks: Vec<(Token, MioListener)>, socks: Vec<(Token, MioListener)>,
workers: Vec<WorkerHandle>, handles: Vec<WorkerHandle>,
) { ) {
let srv = self.srv.take().expect("Can not re-use AcceptInfo"); let srv = self.srv.take().expect("Can not re-use AcceptInfo");
let poll = self.poll.take().unwrap(); let poll = self.poll.take().unwrap();
let waker = self.waker.clone(); let waker = self.waker.clone();
Accept::start(poll, waker, socks, srv, workers); Accept::start(poll, waker, socks, srv, handles);
} }
} }
@ -75,14 +75,12 @@ impl AcceptLoop {
struct Accept { struct Accept {
poll: Poll, poll: Poll,
waker: WakerQueue, waker: WakerQueue,
workers: Vec<WorkerHandle>, handles: Vec<WorkerHandle>,
srv: Server, srv: Server,
next: usize, next: usize,
backpressure: bool, backpressure: bool,
} }
const DELTA: usize = 100;
/// This function defines errors that are per-connection. Which basically /// This function defines errors that are per-connection. Which basically
/// means that if we get this error from `accept()` system call it means /// means that if we get this error from `accept()` system call it means
/// next connection might be ready to be accepted. /// next connection might be ready to be accepted.
@ -102,7 +100,7 @@ impl Accept {
waker: WakerQueue, waker: WakerQueue,
socks: Vec<(Token, MioListener)>, socks: Vec<(Token, MioListener)>,
srv: Server, srv: Server,
workers: Vec<WorkerHandle>, handles: Vec<WorkerHandle>,
) { ) {
// Accept runs in its own thread and would want to spawn additional futures to current // Accept runs in its own thread and would want to spawn additional futures to current
// actix system. // actix system.
@ -112,7 +110,7 @@ impl Accept {
.spawn(move || { .spawn(move || {
System::set_current(sys); System::set_current(sys);
let (mut accept, sockets) = let (mut accept, sockets) =
Accept::new_with_sockets(poll, waker, socks, workers, srv); Accept::new_with_sockets(poll, waker, socks, handles, srv);
accept.poll_with(sockets); accept.poll_with(sockets);
}) })
.unwrap(); .unwrap();
@ -122,7 +120,7 @@ impl Accept {
poll: Poll, poll: Poll,
waker: WakerQueue, waker: WakerQueue,
socks: Vec<(Token, MioListener)>, socks: Vec<(Token, MioListener)>,
workers: Vec<WorkerHandle>, handles: Vec<WorkerHandle>,
srv: Server, srv: Server,
) -> (Accept, Slab<ServerSocketInfo>) { ) -> (Accept, Slab<ServerSocketInfo>) {
let mut sockets = Slab::new(); let mut sockets = Slab::new();
@ -134,7 +132,7 @@ impl Accept {
// Start listening for incoming connections // Start listening for incoming connections
poll.registry() poll.registry()
.register(&mut lst, MioToken(token + DELTA), Interest::READABLE) .register(&mut lst, MioToken(token), Interest::READABLE)
.unwrap_or_else(|e| panic!("Can not register io: {}", e)); .unwrap_or_else(|e| panic!("Can not register io: {}", e));
entry.insert(ServerSocketInfo { entry.insert(ServerSocketInfo {
@ -148,7 +146,7 @@ impl Accept {
let accept = Accept { let accept = Accept {
poll, poll,
waker, waker,
workers, handles,
srv, srv,
next: 0, next: 0,
backpressure: false, backpressure: false,
@ -173,11 +171,20 @@ impl Accept {
// necessary/good practice to actively drain the waker queue. // necessary/good practice to actively drain the waker queue.
WAKER_TOKEN => 'waker: loop { WAKER_TOKEN => 'waker: loop {
match self.waker.pop() { match self.waker.pop() {
// worker notify it's availability has change. we maybe want to recover // worker notify it becomes available. we may want to recover from
// from backpressure. // backpressure.
Ok(WakerInterest::Notify) => { Ok(WakerInterest::WorkerAvailable) => {
self.maybe_backpressure(&mut sockets, false); self.maybe_backpressure(&mut sockets, false);
} }
// a new worker thread is made and it's handle would be added to Accept
Ok(WakerInterest::Worker(handle)) => {
// maybe we want to recover from a backpressure.
self.maybe_backpressure(&mut sockets, false);
self.handles.push(handle);
}
// got timer interest and it's time to try register socket(s) again.
Ok(WakerInterest::Timer) => self.process_timer(&mut sockets),
Err(WakerQueueError::Empty) => break 'waker,
Ok(WakerInterest::Pause) => { Ok(WakerInterest::Pause) => {
sockets.iter_mut().for_each(|(_, info)| { sockets.iter_mut().for_each(|(_, info)| {
match self.deregister(info) { match self.deregister(info) {
@ -196,29 +203,14 @@ impl Accept {
self.register_logged(token, info); self.register_logged(token, info);
}); });
} }
Ok(WakerInterest::Stop) => { Ok(WakerInterest::Stop) | Err(WakerQueueError::Closed) => {
return self.deregister_all(&mut sockets);
}
// a new worker thread is made and it's handle would be added to Accept
Ok(WakerInterest::Worker(handle)) => {
// maybe we want to recover from a backpressure.
self.maybe_backpressure(&mut sockets, false);
self.workers.push(handle);
}
// got timer interest and it's time to try register socket(s) again.
Ok(WakerInterest::Timer) => self.process_timer(&mut sockets),
Err(WakerQueueError::Empty) => break 'waker,
Err(WakerQueueError::Closed) => {
return self.deregister_all(&mut sockets); return self.deregister_all(&mut sockets);
} }
} }
}, },
_ => { _ => {
let token = usize::from(token); let token = usize::from(token);
if token < DELTA { self.accept(&mut sockets, token);
continue;
}
self.accept(&mut sockets, token - DELTA);
} }
} }
} }
@ -241,11 +233,9 @@ impl Accept {
#[cfg(not(target_os = "windows"))] #[cfg(not(target_os = "windows"))]
fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> { fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> {
self.poll.registry().register( self.poll
&mut info.lst, .registry()
MioToken(token + DELTA), .register(&mut info.lst, MioToken(token), Interest::READABLE)
Interest::READABLE,
)
} }
#[cfg(target_os = "windows")] #[cfg(target_os = "windows")]
@ -255,11 +245,11 @@ impl Accept {
// Calling reregister seems to fix the issue. // Calling reregister seems to fix the issue.
self.poll self.poll
.registry() .registry()
.register(&mut info.lst, mio::Token(token + DELTA), Interest::READABLE) .register(&mut info.lst, mio::Token(token), Interest::READABLE)
.or_else(|_| { .or_else(|_| {
self.poll.registry().reregister( self.poll.registry().reregister(
&mut info.lst, &mut info.lst,
mio::Token(token + DELTA), mio::Token(token),
Interest::READABLE, Interest::READABLE,
) )
}) })
@ -298,8 +288,8 @@ impl Accept {
fn accept_one(&mut self, sockets: &mut Slab<ServerSocketInfo>, mut msg: Conn) { fn accept_one(&mut self, sockets: &mut Slab<ServerSocketInfo>, mut msg: Conn) {
if self.backpressure { if self.backpressure {
while !self.workers.is_empty() { while !self.handles.is_empty() {
match self.workers[self.next].send(msg) { match self.handles[self.next].send(msg) {
Ok(_) => { Ok(_) => {
self.set_next(); self.set_next();
break; break;
@ -308,13 +298,13 @@ impl Accept {
// worker lost contact and could be gone. a message is sent to // worker lost contact and could be gone. a message is sent to
// `ServerBuilder` future to notify it a new worker should be made. // `ServerBuilder` future to notify it a new worker should be made.
// after that remove the fault worker. // after that remove the fault worker.
self.srv.worker_faulted(self.workers[self.next].idx); self.srv.worker_faulted(self.handles[self.next].idx);
msg = tmp; msg = tmp;
self.workers.swap_remove(self.next); self.handles.swap_remove(self.next);
if self.workers.is_empty() { if self.handles.is_empty() {
error!("No workers"); error!("No workers");
return; return;
} else if self.workers.len() <= self.next { } else if self.handles.len() <= self.next {
self.next = 0; self.next = 0;
} }
continue; continue;
@ -323,10 +313,10 @@ impl Accept {
} }
} else { } else {
let mut idx = 0; let mut idx = 0;
while idx < self.workers.len() { while idx < self.handles.len() {
idx += 1; idx += 1;
if self.workers[self.next].available() { if self.handles[self.next].available() {
match self.workers[self.next].send(msg) { match self.handles[self.next].send(msg) {
Ok(_) => { Ok(_) => {
self.set_next(); self.set_next();
return; return;
@ -335,14 +325,14 @@ impl Accept {
// `ServerBuilder` future to notify it a new worker should be made. // `ServerBuilder` future to notify it a new worker should be made.
// after that remove the fault worker and enter backpressure if necessary. // after that remove the fault worker and enter backpressure if necessary.
Err(tmp) => { Err(tmp) => {
self.srv.worker_faulted(self.workers[self.next].idx); self.srv.worker_faulted(self.handles[self.next].idx);
msg = tmp; msg = tmp;
self.workers.swap_remove(self.next); self.handles.swap_remove(self.next);
if self.workers.is_empty() { if self.handles.is_empty() {
error!("No workers"); error!("No workers");
self.maybe_backpressure(sockets, true); self.maybe_backpressure(sockets, true);
return; return;
} else if self.workers.len() <= self.next { } else if self.handles.len() <= self.next {
self.next = 0; self.next = 0;
} }
continue; continue;
@ -357,9 +347,9 @@ impl Accept {
} }
} }
// set next worker that would accept work. // set next worker handle that would accept work.
fn set_next(&mut self) { fn set_next(&mut self) {
self.next = (self.next + 1) % self.workers.len(); self.next = (self.next + 1) % self.handles.len();
} }
fn accept(&mut self, sockets: &mut Slab<ServerSocketInfo>, token: usize) { fn accept(&mut self, sockets: &mut Slab<ServerSocketInfo>, token: usize) {
@ -375,9 +365,9 @@ impl Accept {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return, Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
Err(ref e) if connection_error(e) => continue, Err(ref e) if connection_error(e) => continue,
Err(e) => { Err(e) => {
// deregister socket listener temporary // deregister listener temporary
error!("Error accepting connection: {}", e); error!("Error accepting connection: {}", e);
if let Err(err) = self.poll.registry().deregister(&mut info.lst) { if let Err(err) = self.deregister(info) {
error!("Can not deregister server socket {}", err); error!("Can not deregister server socket {}", err);
} }

View File

@ -30,7 +30,7 @@ pub struct ServerBuilder {
threads: usize, threads: usize,
token: Token, token: Token,
backlog: u32, backlog: u32,
workers: Vec<(usize, WorkerHandle)>, handles: Vec<(usize, WorkerHandle)>,
services: Vec<Box<dyn InternalServiceFactory>>, services: Vec<Box<dyn InternalServiceFactory>>,
sockets: Vec<(Token, String, MioListener)>, sockets: Vec<(Token, String, MioListener)>,
accept: AcceptLoop, accept: AcceptLoop,
@ -56,8 +56,8 @@ impl ServerBuilder {
ServerBuilder { ServerBuilder {
threads: num_cpus::get(), threads: num_cpus::get(),
token: Token(0), token: Token::default(),
workers: Vec::new(), handles: Vec::new(),
services: Vec::new(), services: Vec::new(),
sockets: Vec::new(), sockets: Vec::new(),
accept: AcceptLoop::new(server.clone()), accept: AcceptLoop::new(server.clone()),
@ -264,12 +264,12 @@ impl ServerBuilder {
info!("Starting {} workers", self.threads); info!("Starting {} workers", self.threads);
// start workers // start workers
let workers = (0..self.threads) let handles = (0..self.threads)
.map(|idx| { .map(|idx| {
let worker = self.start_worker(idx, self.accept.waker_owned()); let handle = self.start_worker(idx, self.accept.waker_owned());
self.workers.push((idx, worker.clone())); self.handles.push((idx, handle.clone()));
worker handle
}) })
.collect(); .collect();
@ -282,7 +282,7 @@ impl ServerBuilder {
.into_iter() .into_iter()
.map(|t| (t.0, t.2)) .map(|t| (t.0, t.2))
.collect(), .collect(),
workers, handles,
); );
// handle signals // handle signals
@ -359,9 +359,9 @@ impl ServerBuilder {
let notify = std::mem::take(&mut self.notify); let notify = std::mem::take(&mut self.notify);
// stop workers // stop workers
if !self.workers.is_empty() && graceful { if !self.handles.is_empty() && graceful {
spawn( spawn(
self.workers self.handles
.iter() .iter()
.map(move |worker| worker.1.stop(graceful)) .map(move |worker| worker.1.stop(graceful))
.collect::<FuturesUnordered<_>>() .collect::<FuturesUnordered<_>>()
@ -403,9 +403,9 @@ impl ServerBuilder {
} }
ServerCommand::WorkerFaulted(idx) => { ServerCommand::WorkerFaulted(idx) => {
let mut found = false; let mut found = false;
for i in 0..self.workers.len() { for i in 0..self.handles.len() {
if self.workers[i].0 == idx { if self.handles[i].0 == idx {
self.workers.swap_remove(i); self.handles.swap_remove(i);
found = true; found = true;
break; break;
} }
@ -414,10 +414,10 @@ impl ServerBuilder {
if found { if found {
error!("Worker has died {:?}, restarting", idx); error!("Worker has died {:?}, restarting", idx);
let mut new_idx = self.workers.len(); let mut new_idx = self.handles.len();
'found: loop { 'found: loop {
for i in 0..self.workers.len() { for i in 0..self.handles.len() {
if self.workers[i].0 == new_idx { if self.handles[i].0 == new_idx {
new_idx += 1; new_idx += 1;
continue 'found; continue 'found;
} }
@ -426,7 +426,7 @@ impl ServerBuilder {
} }
let handle = self.start_worker(new_idx, self.accept.waker_owned()); let handle = self.start_worker(new_idx, self.accept.waker_owned());
self.workers.push((new_idx, handle.clone())); self.handles.push((new_idx, handle.clone()));
self.accept.wake(WakerInterest::Worker(handle)); self.accept.wake(WakerInterest::Worker(handle));
} }
} }

View File

@ -60,14 +60,6 @@ impl ServiceConfig {
self._listen(name, MioTcpListener::from_std(lst)) self._listen(name, MioTcpListener::from_std(lst))
} }
fn _listen<N: AsRef<str>>(&mut self, name: N, lst: MioTcpListener) -> &mut Self {
if self.apply.is_none() {
self.apply = Some(Box::new(not_configured));
}
self.services.push((name.as_ref().to_string(), lst));
self
}
/// Register service configuration function. This function get called /// Register service configuration function. This function get called
/// during worker runtime configuration. It get executed in worker thread. /// during worker runtime configuration. It get executed in worker thread.
pub fn apply<F>(&mut self, f: F) -> io::Result<()> pub fn apply<F>(&mut self, f: F) -> io::Result<()>
@ -77,6 +69,14 @@ impl ServiceConfig {
self.apply = Some(Box::new(f)); self.apply = Some(Box::new(f));
Ok(()) Ok(())
} }
fn _listen<N: AsRef<str>>(&mut self, name: N, lst: MioTcpListener) -> &mut Self {
if self.apply.is_none() {
self.apply = Some(Box::new(not_configured));
}
self.services.push((name.as_ref().to_string(), lst));
self
}
} }
pub(super) struct ConfiguredService { pub(super) struct ConfiguredService {

View File

@ -22,7 +22,17 @@ pub use self::socket::FromStream;
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub(crate) struct Token(usize); pub(crate) struct Token(usize);
impl Default for Token {
fn default() -> Self {
Self::new()
}
}
impl Token { impl Token {
fn new() -> Self {
Self(0)
}
pub(crate) fn next(&mut self) -> Token { pub(crate) fn next(&mut self) -> Token {
let token = Token(self.0); let token = Token(self.0);
self.0 += 1; self.0 += 1;

View File

@ -235,7 +235,7 @@ mod tests {
assert_eq!(format!("{}", addr), "127.0.0.1:8080"); assert_eq!(format!("{}", addr), "127.0.0.1:8080");
let addr: StdSocketAddr = "127.0.0.1:0".parse().unwrap(); let addr: StdSocketAddr = "127.0.0.1:0".parse().unwrap();
let socket = mio::net::TcpSocket::new_v4().unwrap(); let socket = MioTcpSocket::new_v4().unwrap();
socket.set_reuseaddr(true).unwrap(); socket.set_reuseaddr(true).unwrap();
socket.bind(addr).unwrap(); socket.bind(addr).unwrap();
let tcp = socket.listen(128).unwrap(); let tcp = socket.listen(128).unwrap();

View File

@ -1,3 +1,4 @@
use std::ops::Deref;
use std::sync::Arc; use std::sync::Arc;
use concurrent_queue::{ConcurrentQueue, PopError}; use concurrent_queue::{ConcurrentQueue, PopError};
@ -6,7 +7,7 @@ use mio::{Registry, Token as MioToken, Waker};
use crate::worker::WorkerHandle; use crate::worker::WorkerHandle;
/// waker token for `mio::Poll` instance /// waker token for `mio::Poll` instance
pub(crate) const WAKER_TOKEN: MioToken = MioToken(1); pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX);
/// `mio::Waker` with a queue for waking up the `Accept`'s `Poll` and contains the `WakerInterest` /// `mio::Waker` with a queue for waking up the `Accept`'s `Poll` and contains the `WakerInterest`
/// the `Poll` would want to look into. /// the `Poll` would want to look into.
@ -18,6 +19,14 @@ impl Clone for WakerQueue {
} }
} }
impl Deref for WakerQueue {
type Target = (Waker, ConcurrentQueue<WakerInterest>);
fn deref(&self) -> &Self::Target {
self.0.deref()
}
}
impl WakerQueue { impl WakerQueue {
/// construct a waker queue with given `Poll`'s `Registry` and capacity. /// construct a waker queue with given `Poll`'s `Registry` and capacity.
/// ///
@ -32,16 +41,21 @@ impl WakerQueue {
/// push a new interest to the queue and wake up the accept poll afterwards. /// push a new interest to the queue and wake up the accept poll afterwards.
pub(crate) fn wake(&self, interest: WakerInterest) { pub(crate) fn wake(&self, interest: WakerInterest) {
// ToDo: should we handle error here? let (waker, queue) = self.deref();
let r = (self.0).1.push(interest);
assert!(r.is_ok());
(self.0).0.wake().expect("can not wake up Accept Poll"); // ToDo: should we handle error here?
queue
.push(interest)
.unwrap_or_else(|e| panic!("WakerQueue overflow: {}", e));
waker
.wake()
.unwrap_or_else(|e| panic!("can not wake up Accept Poll: {}", e));
} }
/// pop an `WakerInterest` from the back of the queue. /// pop an `WakerInterest` from the back of the queue.
pub(crate) fn pop(&self) -> Result<WakerInterest, WakerQueueError> { pub(crate) fn pop(&self) -> Result<WakerInterest, WakerQueueError> {
(self.0).1.pop() self.deref().1.pop()
} }
} }
@ -49,8 +63,9 @@ impl WakerQueue {
/// ///
/// *. These interests should not be confused with `mio::Interest` and mostly not I/O related /// *. These interests should not be confused with `mio::Interest` and mostly not I/O related
pub(crate) enum WakerInterest { pub(crate) enum WakerInterest {
/// Interest from `Worker` notifying `Accept` to run `maybe_backpressure` method /// `WorkerAvailable` is an interest from `Worker` notifying `Accept` there is a worker
Notify, /// available and can accept new tasks.
WorkerAvailable,
/// `Pause`, `Resume`, `Stop` Interest are from `ServerBuilder` future. It listens to /// `Pause`, `Resume`, `Stop` Interest are from `ServerBuilder` future. It listens to
/// `ServerCommand` and notify `Accept` to do exactly these tasks. /// `ServerCommand` and notify `Accept` to do exactly these tasks.
Pause, Pause,
@ -60,8 +75,8 @@ pub(crate) enum WakerInterest {
/// connection `Accept` would deregister socket listener temporary and wake up the poll and /// connection `Accept` would deregister socket listener temporary and wake up the poll and
/// register them again after the delayed future resolve. /// register them again after the delayed future resolve.
Timer, Timer,
/// `Worker` ins an interest happen after a worker runs into faulted state(This is determined by /// `WorkerNew` is an interest happen after a worker runs into faulted state(This is determined
/// if work can be sent to it successfully).`Accept` would be waked up and add the new /// by if work can be sent to it successfully).`Accept` would be waked up and add the new
/// `WorkerHandle`. /// `WorkerHandle`.
Worker(WorkerHandle), Worker(WorkerHandle),
} }

View File

@ -48,15 +48,15 @@ pub fn max_concurrent_connections(num: usize) {
MAX_CONNS.store(num, Ordering::Relaxed); MAX_CONNS.store(num, Ordering::Relaxed);
} }
pub(crate) fn num_connections() -> usize {
MAX_CONNS_COUNTER.with(|conns| conns.total())
}
thread_local! { thread_local! {
static MAX_CONNS_COUNTER: Counter = static MAX_CONNS_COUNTER: Counter =
Counter::new(MAX_CONNS.load(Ordering::Relaxed)); Counter::new(MAX_CONNS.load(Ordering::Relaxed));
} }
pub(crate) fn num_connections() -> usize {
MAX_CONNS_COUNTER.with(|conns| conns.total())
}
// a handle to worker that can send message to worker and share the availability of worker to other // a handle to worker that can send message to worker and share the availability of worker to other
// thread. // thread.
#[derive(Clone)] #[derive(Clone)]
@ -119,8 +119,9 @@ impl WorkerAvailability {
pub fn set(&self, val: bool) { pub fn set(&self, val: bool) {
let old = self.available.swap(val, Ordering::Release); let old = self.available.swap(val, Ordering::Release);
// notify the accept on switched to available.
if !old && val { if !old && val {
self.waker.wake(WakerInterest::Notify); self.waker.wake(WakerInterest::WorkerAvailable);
} }
} }
} }
@ -174,6 +175,7 @@ impl Worker {
let (tx2, rx2) = unbounded(); let (tx2, rx2) = unbounded();
let avail = availability.clone(); let avail = availability.clone();
// every worker runs in it's own arbiter.
Arbiter::new().send(Box::pin(async move { Arbiter::new().send(Box::pin(async move {
availability.set(false); availability.set(false);
let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker { let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker {
@ -184,7 +186,7 @@ impl Worker {
shutdown_timeout, shutdown_timeout,
services: Vec::new(), services: Vec::new(),
conns: conns.clone(), conns: conns.clone(),
state: WorkerState::Unavailable(Vec::new()), state: WorkerState::Unavailable,
}); });
let fut = wrk let fut = wrk
@ -253,7 +255,7 @@ impl Worker {
fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result<bool, (Token, usize)> { fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result<bool, (Token, usize)> {
let mut ready = self.conns.available(cx); let mut ready = self.conns.available(cx);
let mut failed = None; let mut failed = None;
for (idx, srv) in &mut self.services.iter_mut().enumerate() { for (idx, srv) in self.services.iter_mut().enumerate() {
if srv.status == WorkerServiceStatus::Available if srv.status == WorkerServiceStatus::Available
|| srv.status == WorkerServiceStatus::Unavailable || srv.status == WorkerServiceStatus::Unavailable
{ {
@ -299,7 +301,7 @@ impl Worker {
enum WorkerState { enum WorkerState {
Available, Available,
Unavailable(Vec<Conn>), Unavailable,
Restarting( Restarting(
usize, usize,
Token, Token,
@ -345,43 +347,24 @@ impl Future for Worker {
} }
match self.state { match self.state {
WorkerState::Unavailable(ref mut conns) => { WorkerState::Unavailable => match self.check_readiness(cx) {
let conn = conns.pop(); Ok(true) => {
match self.check_readiness(cx) { self.state = WorkerState::Available;
Ok(true) => { self.availability.set(true);
// process requests from wait queue self.poll(cx)
if let Some(conn) = conn {
let guard = self.conns.get();
let _ = self.services[conn.token.0]
.service
.call((Some(guard), ServerMessage::Connect(conn.io)));
} else {
self.state = WorkerState::Available;
self.availability.set(true);
}
self.poll(cx)
}
Ok(false) => {
// push connection back to queue
if let Some(conn) = conn {
if let WorkerState::Unavailable(ref mut conns) = self.state {
conns.push(conn);
}
}
Poll::Pending
}
Err((token, idx)) => {
trace!(
"Service {:?} failed, restarting",
self.factories[idx].name(token)
);
self.services[token.0].status = WorkerServiceStatus::Restarting;
self.state =
WorkerState::Restarting(idx, token, self.factories[idx].create());
self.poll(cx)
}
} }
} Ok(false) => Poll::Pending,
Err((token, idx)) => {
trace!(
"Service {:?} failed, restarting",
self.factories[idx].name(token)
);
self.services[token.0].status = WorkerServiceStatus::Restarting;
self.state =
WorkerState::Restarting(idx, token, self.factories[idx].create());
self.poll(cx)
}
},
WorkerState::Restarting(idx, token, ref mut fut) => { WorkerState::Restarting(idx, token, ref mut fut) => {
match fut.as_mut().poll(cx) { match fut.as_mut().poll(cx) {
Poll::Ready(Ok(item)) => { Poll::Ready(Ok(item)) => {
@ -392,18 +375,9 @@ impl Future for Worker {
self.factories[idx].name(token) self.factories[idx].name(token)
); );
self.services[token.0].created(service); self.services[token.0].created(service);
self.state = WorkerState::Unavailable(Vec::new()); self.state = WorkerState::Unavailable;
return self.poll(cx); return self.poll(cx);
} }
// for (token, service) in item {
// trace!(
// "Service {:?} has been restarted",
// self.factories[idx].name(token)
// );
// self.services[token.0].created(service);
// self.state = WorkerState::Unavailable(Vec::new());
// return self.poll(cx);
// }
} }
Poll::Ready(Err(_)) => { Poll::Ready(Err(_)) => {
panic!( panic!(
@ -411,9 +385,7 @@ impl Future for Worker {
self.factories[idx].name(token) self.factories[idx].name(token)
); );
} }
Poll::Pending => { Poll::Pending => return Poll::Pending,
return Poll::Pending;
}
} }
self.poll(cx) self.poll(cx)
} }
@ -441,49 +413,41 @@ impl Future for Worker {
Poll::Pending Poll::Pending
} }
WorkerState::Available => { // actively poll stream and handle worker command
loop { WorkerState::Available => loop {
return match Pin::new(&mut self.rx).poll_next(cx) { match self.check_readiness(cx) {
// handle incoming io stream Ok(true) => (),
Poll::Ready(Some(WorkerCommand(msg))) => { Ok(false) => {
match self.check_readiness(cx) { trace!("Worker is unavailable");
Ok(true) => { self.availability.set(false);
let guard = self.conns.get(); self.state = WorkerState::Unavailable;
let _ = self.services[msg.token.0] return self.poll(cx);
.service }
.call((Some(guard), ServerMessage::Connect(msg.io))); Err((token, idx)) => {
continue; trace!(
} "Service {:?} failed, restarting",
Ok(false) => { self.factories[idx].name(token)
trace!("Worker is unavailable"); );
self.availability.set(false); self.availability.set(false);
self.state = WorkerState::Unavailable(vec![msg]); self.services[token.0].status = WorkerServiceStatus::Restarting;
} self.state =
Err((token, idx)) => { WorkerState::Restarting(idx, token, self.factories[idx].create());
trace!( return self.poll(cx);
"Service {:?} failed, restarting", }
self.factories[idx].name(token)
);
self.availability.set(false);
self.services[token.0].status =
WorkerServiceStatus::Restarting;
self.state = WorkerState::Restarting(
idx,
token,
self.factories[idx].create(),
);
}
}
self.poll(cx)
}
Poll::Pending => {
self.state = WorkerState::Available;
Poll::Pending
}
Poll::Ready(None) => Poll::Ready(()),
};
} }
}
match Pin::new(&mut self.rx).poll_next(cx) {
// handle incoming io stream
Poll::Ready(Some(WorkerCommand(msg))) => {
let guard = self.conns.get();
let _ = self.services[msg.token.0]
.service
.call((Some(guard), ServerMessage::Connect(msg.io)));
}
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(()),
};
},
} }
} }
} }

View File

@ -1,7 +1,7 @@
# Changes # Changes
## Unreleased - 2020-xx-xx ## Unreleased - 2020-xx-xx
* Upgrade `pin-project` to `1.0`.
## 1.0.6 - 2020-08-09 ## 1.0.6 - 2020-08-09

View File

@ -18,7 +18,7 @@ path = "src/lib.rs"
[dependencies] [dependencies]
futures-util = "0.3.1" futures-util = "0.3.1"
pin-project = "0.4.17" pin-project = "1.0.0"
[dev-dependencies] [dev-dependencies]
actix-rt = "1.0.0" actix-rt = "1.0.0"

View File

@ -47,9 +47,9 @@ tokio-openssl = { version = "0.5.0", optional = true }
# rustls # rustls
rust-tls = { package = "rustls", version = "0.18.0", optional = true } rust-tls = { package = "rustls", version = "0.18.0", optional = true }
tokio-rustls = { version = "0.20.0", optional = true }
webpki = { version = "0.21", optional = true } webpki = { version = "0.21", optional = true }
webpki-roots = { version = "0.20", optional = true } webpki-roots = { version = "0.20", optional = true }
tokio-rustls = { version = "0.20.0", optional = true }
# native-tls # native-tls
native-tls = { version = "0.2", optional = true } native-tls = { version = "0.2", optional = true }

View File

@ -1,7 +1,8 @@
# Changes # Changes
## Unreleased - 2020-xx-xx ## Unreleased - 2020-xx-xx
* Update `bytes` to 0.6 * Upgrade `pin-project` to `1.0`.
* Update `bytes` dependency to 0.6.
## 2.0.0 - 2020-08-23 ## 2.0.0 - 2020-08-23
* No changes from beta 1. * No changes from beta 1.

View File

@ -26,5 +26,5 @@ futures-channel = { version = "0.3.4", default-features = false }
futures-sink = { version = "0.3.4", default-features = false } futures-sink = { version = "0.3.4", default-features = false }
futures-util = { version = "0.3.4", default-features = false } futures-util = { version = "0.3.4", default-features = false }
log = "0.4" log = "0.4"
pin-project = "0.4.17" pin-project = "1.0.0"
slab = "0.4" slab = "0.4"

View File

@ -1,5 +1,8 @@
# Changes # Changes
## Unreleased - 2020-xx-xx
* Update `bytes` dependency to 0.6
## [0.1.5] - 2020-03-30 ## [0.1.5] - 2020-03-30
* Serde support * Serde support