mirror of https://github.com/fafhrd91/actix-net
Rework counter
This commit is contained in:
parent
6af7173fa1
commit
0bad2258b5
|
@ -22,10 +22,7 @@ use crate::signals::{Signal, Signals};
|
|||
use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
|
||||
use crate::socket::{MioTcpListener, MioTcpSocket};
|
||||
use crate::waker_queue::{WakerInterest, WakerQueue};
|
||||
use crate::worker::{
|
||||
ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandleAccept,
|
||||
WorkerHandleServer,
|
||||
};
|
||||
use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer};
|
||||
|
||||
/// Server builder
|
||||
pub struct ServerBuilder {
|
||||
|
@ -320,12 +317,11 @@ impl ServerBuilder {
|
|||
fn start_worker(
|
||||
&self,
|
||||
idx: usize,
|
||||
waker: WakerQueue,
|
||||
waker_queue: WakerQueue,
|
||||
) -> (WorkerHandleAccept, WorkerHandleServer) {
|
||||
let avail = WorkerAvailability::new(idx, waker);
|
||||
let services = self.services.iter().map(|v| v.clone_factory()).collect();
|
||||
|
||||
ServerWorker::start(idx, services, avail, self.worker_config)
|
||||
ServerWorker::start(idx, services, waker_queue, self.worker_config)
|
||||
}
|
||||
|
||||
fn handle_cmd(&mut self, item: ServerCommand) {
|
||||
|
|
|
@ -7,13 +7,16 @@ use actix_service::{
|
|||
fn_service, IntoServiceFactory as IntoBaseServiceFactory,
|
||||
ServiceFactory as BaseServiceFactory,
|
||||
};
|
||||
use actix_utils::{counter::CounterGuard, future::ready};
|
||||
use actix_utils::future::ready;
|
||||
use futures_core::future::LocalBoxFuture;
|
||||
use log::error;
|
||||
|
||||
use crate::builder::bind_addr;
|
||||
use crate::service::{BoxedServerService, InternalServiceFactory, StreamService};
|
||||
use crate::socket::{MioStream, MioTcpListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
|
||||
use crate::{
|
||||
builder::bind_addr,
|
||||
service::{BoxedServerService, InternalServiceFactory, StreamService},
|
||||
socket::{MioStream, MioTcpListener, StdSocketAddr, StdTcpListener, ToSocketAddrs},
|
||||
worker::WorkerCounterGuard,
|
||||
};
|
||||
|
||||
pub struct ServiceConfig {
|
||||
pub(crate) services: Vec<(String, MioTcpListener)>,
|
||||
|
@ -242,7 +245,7 @@ impl ServiceRuntime {
|
|||
|
||||
type BoxedNewService = Box<
|
||||
dyn BaseServiceFactory<
|
||||
(CounterGuard, MioStream),
|
||||
(WorkerCounterGuard, MioStream),
|
||||
Response = (),
|
||||
Error = (),
|
||||
InitError = (),
|
||||
|
@ -256,7 +259,7 @@ struct ServiceFactory<T> {
|
|||
inner: T,
|
||||
}
|
||||
|
||||
impl<T> BaseServiceFactory<(CounterGuard, MioStream)> for ServiceFactory<T>
|
||||
impl<T> BaseServiceFactory<(WorkerCounterGuard, MioStream)> for ServiceFactory<T>
|
||||
where
|
||||
T: BaseServiceFactory<TcpStream, Config = ()>,
|
||||
T::Future: 'static,
|
||||
|
|
|
@ -3,14 +3,12 @@ use std::net::SocketAddr;
|
|||
use std::task::{Context, Poll};
|
||||
|
||||
use actix_service::{Service, ServiceFactory as BaseServiceFactory};
|
||||
use actix_utils::{
|
||||
counter::CounterGuard,
|
||||
future::{ready, Ready},
|
||||
};
|
||||
use actix_utils::future::{ready, Ready};
|
||||
use futures_core::future::LocalBoxFuture;
|
||||
use log::error;
|
||||
|
||||
use crate::socket::{FromStream, MioStream};
|
||||
use crate::worker::WorkerCounterGuard;
|
||||
|
||||
pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
|
||||
type Factory: BaseServiceFactory<Stream, Config = ()>;
|
||||
|
@ -28,7 +26,7 @@ pub(crate) trait InternalServiceFactory: Send {
|
|||
|
||||
pub(crate) type BoxedServerService = Box<
|
||||
dyn Service<
|
||||
(CounterGuard, MioStream),
|
||||
(WorkerCounterGuard, MioStream),
|
||||
Response = (),
|
||||
Error = (),
|
||||
Future = Ready<Result<(), ()>>,
|
||||
|
@ -49,7 +47,7 @@ impl<S, I> StreamService<S, I> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<S, I> Service<(CounterGuard, MioStream)> for StreamService<S, I>
|
||||
impl<S, I> Service<(WorkerCounterGuard, MioStream)> for StreamService<S, I>
|
||||
where
|
||||
S: Service<I>,
|
||||
S::Future: 'static,
|
||||
|
@ -64,7 +62,7 @@ where
|
|||
self.service.poll_ready(ctx).map_err(|_| ())
|
||||
}
|
||||
|
||||
fn call(&self, (guard, req): (CounterGuard, MioStream)) -> Self::Future {
|
||||
fn call(&self, (guard, req): (WorkerCounterGuard, MioStream)) -> Self::Future {
|
||||
ready(match FromStream::from_mio(req) {
|
||||
Ok(stream) => {
|
||||
let f = self.service.call(stream);
|
||||
|
|
|
@ -2,8 +2,9 @@ use std::{
|
|||
future::Future,
|
||||
mem,
|
||||
pin::Pin,
|
||||
rc::Rc,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc,
|
||||
},
|
||||
task::{Context, Poll},
|
||||
|
@ -15,7 +16,6 @@ use actix_rt::{
|
|||
time::{sleep, Instant, Sleep},
|
||||
Arbiter,
|
||||
};
|
||||
use actix_utils::counter::Counter;
|
||||
use futures_core::{future::LocalBoxFuture, ready};
|
||||
use log::{error, info, trace};
|
||||
use tokio::sync::{
|
||||
|
@ -45,28 +45,104 @@ fn handle_pair(
|
|||
idx: usize,
|
||||
tx1: UnboundedSender<Conn>,
|
||||
tx2: UnboundedSender<Stop>,
|
||||
avail: WorkerAvailability,
|
||||
counter: Counter,
|
||||
) -> (WorkerHandleAccept, WorkerHandleServer) {
|
||||
let accept = WorkerHandleAccept { tx: tx1, avail };
|
||||
let accept = WorkerHandleAccept {
|
||||
idx,
|
||||
tx: tx1,
|
||||
counter,
|
||||
};
|
||||
|
||||
let server = WorkerHandleServer { idx, tx: tx2 };
|
||||
|
||||
(accept, server)
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct Counter {
|
||||
counter: Arc<AtomicUsize>,
|
||||
limit: usize,
|
||||
}
|
||||
|
||||
impl Counter {
|
||||
pub(crate) fn new(limit: usize) -> Self {
|
||||
Self {
|
||||
counter: Arc::new(AtomicUsize::new(1)),
|
||||
limit,
|
||||
}
|
||||
}
|
||||
|
||||
/// Increment counter it by 1 and return true when hitting limit
|
||||
pub(crate) fn incr(&self) -> bool {
|
||||
self.counter.fetch_add(1, Ordering::Relaxed) != self.limit
|
||||
}
|
||||
|
||||
/// Decrement counter it by 1 and return true when hitting limit
|
||||
pub(crate) fn derc(&self) -> bool {
|
||||
self.counter.fetch_sub(1, Ordering::Relaxed) == self.limit
|
||||
}
|
||||
|
||||
pub(crate) fn total(&self) -> usize {
|
||||
self.counter.load(Ordering::SeqCst) - 1
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct WorkerCounter {
|
||||
idx: usize,
|
||||
inner: Rc<(WakerQueue, Counter)>,
|
||||
}
|
||||
|
||||
impl Clone for WorkerCounter {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
idx: self.idx,
|
||||
inner: self.inner.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl WorkerCounter {
|
||||
pub(crate) fn new(idx: usize, waker_queue: WakerQueue, counter: Counter) -> Self {
|
||||
Self {
|
||||
idx,
|
||||
inner: Rc::new((waker_queue, counter)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn guard(&self) -> WorkerCounterGuard {
|
||||
WorkerCounterGuard(self.clone())
|
||||
}
|
||||
|
||||
fn total(&self) -> usize {
|
||||
self.inner.1.total()
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct WorkerCounterGuard(WorkerCounter);
|
||||
|
||||
impl Drop for WorkerCounterGuard {
|
||||
fn drop(&mut self) {
|
||||
let (waker_queue, counter) = &*self.0.inner;
|
||||
if counter.derc() {
|
||||
waker_queue.wake(WakerInterest::WorkerAvailable(self.0.idx));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle to worker that can send connection message to worker and share the
|
||||
/// availability of worker to other thread.
|
||||
///
|
||||
/// Held by [Accept](crate::accept::Accept).
|
||||
pub(crate) struct WorkerHandleAccept {
|
||||
idx: usize,
|
||||
tx: UnboundedSender<Conn>,
|
||||
avail: WorkerAvailability,
|
||||
counter: Counter,
|
||||
}
|
||||
|
||||
impl WorkerHandleAccept {
|
||||
#[inline(always)]
|
||||
pub(crate) fn idx(&self) -> usize {
|
||||
self.avail.idx
|
||||
self.idx
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
|
@ -76,7 +152,7 @@ impl WorkerHandleAccept {
|
|||
|
||||
#[inline(always)]
|
||||
pub(crate) fn available(&self) -> bool {
|
||||
self.avail.available()
|
||||
self.counter.incr()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -96,40 +172,6 @@ impl WorkerHandleServer {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct WorkerAvailability {
|
||||
idx: usize,
|
||||
waker: WakerQueue,
|
||||
available: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl WorkerAvailability {
|
||||
pub fn new(idx: usize, waker: WakerQueue) -> Self {
|
||||
WorkerAvailability {
|
||||
idx,
|
||||
waker,
|
||||
available: Arc::new(AtomicBool::new(false)),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn available(&self) -> bool {
|
||||
self.available.load(Ordering::Acquire)
|
||||
}
|
||||
|
||||
pub fn set(&self, val: bool) {
|
||||
// Ordering:
|
||||
//
|
||||
// There could be multiple set calls happen in one <ServerWorker as Future>::poll.
|
||||
// Order is important between them.
|
||||
let old = self.available.swap(val, Ordering::AcqRel);
|
||||
// Notify the accept on switched to available.
|
||||
if !old && val {
|
||||
self.waker.wake(WakerInterest::WorkerAvailable(self.idx));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Service worker.
|
||||
///
|
||||
/// Worker accepts Socket objects via unbounded channel and starts stream processing.
|
||||
|
@ -138,9 +180,8 @@ pub(crate) struct ServerWorker {
|
|||
// It must be dropped as soon as ServerWorker dropping.
|
||||
rx: UnboundedReceiver<Conn>,
|
||||
rx2: UnboundedReceiver<Stop>,
|
||||
counter: WorkerCounter,
|
||||
services: Box<[WorkerService]>,
|
||||
availability: WorkerAvailability,
|
||||
conns: Counter,
|
||||
factories: Box<[Box<dyn InternalServiceFactory>]>,
|
||||
state: WorkerState,
|
||||
shutdown_timeout: Duration,
|
||||
|
@ -207,15 +248,15 @@ impl ServerWorker {
|
|||
pub(crate) fn start(
|
||||
idx: usize,
|
||||
factories: Vec<Box<dyn InternalServiceFactory>>,
|
||||
availability: WorkerAvailability,
|
||||
waker_queue: WakerQueue,
|
||||
config: ServerWorkerConfig,
|
||||
) -> (WorkerHandleAccept, WorkerHandleServer) {
|
||||
assert!(!availability.available());
|
||||
|
||||
let (tx1, rx) = unbounded_channel();
|
||||
let (tx2, rx2) = unbounded_channel();
|
||||
let avail = availability.clone();
|
||||
|
||||
let counter = Counter::new(config.max_concurrent_connections);
|
||||
|
||||
let counter_clone = counter.clone();
|
||||
// every worker runs in it's own arbiter.
|
||||
// use a custom tokio runtime builder to change the settings of runtime.
|
||||
Arbiter::with_tokio_rt(move || {
|
||||
|
@ -271,8 +312,7 @@ impl ServerWorker {
|
|||
rx,
|
||||
rx2,
|
||||
services,
|
||||
availability,
|
||||
conns: Counter::new(config.max_concurrent_connections),
|
||||
counter: WorkerCounter::new(idx, waker_queue, counter_clone),
|
||||
factories: factories.into_boxed_slice(),
|
||||
state: Default::default(),
|
||||
shutdown_timeout: config.shutdown_timeout,
|
||||
|
@ -280,7 +320,7 @@ impl ServerWorker {
|
|||
});
|
||||
});
|
||||
|
||||
handle_pair(idx, tx1, tx2, avail)
|
||||
handle_pair(idx, tx1, tx2, counter)
|
||||
}
|
||||
|
||||
fn restart_service(&mut self, idx: usize, factory_id: usize) {
|
||||
|
@ -308,7 +348,7 @@ impl ServerWorker {
|
|||
}
|
||||
|
||||
fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result<bool, (usize, usize)> {
|
||||
let mut ready = self.conns.available(cx);
|
||||
let mut ready = true;
|
||||
for (idx, srv) in self.services.iter_mut().enumerate() {
|
||||
if srv.status == WorkerServiceStatus::Available
|
||||
|| srv.status == WorkerServiceStatus::Unavailable
|
||||
|
@ -381,10 +421,6 @@ impl Default for WorkerState {
|
|||
|
||||
impl Drop for ServerWorker {
|
||||
fn drop(&mut self) {
|
||||
// Set availability to true so if accept try to send connection to this worker
|
||||
// it would find worker is gone and remove it.
|
||||
// This is helpful when worker is dropped unexpected.
|
||||
self.availability.set(true);
|
||||
// Stop the Arbiter ServerWorker runs on on drop.
|
||||
Arbiter::current().stop();
|
||||
}
|
||||
|
@ -399,8 +435,7 @@ impl Future for ServerWorker {
|
|||
// `StopWorker` message handler
|
||||
if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx)
|
||||
{
|
||||
this.availability.set(false);
|
||||
let num = this.conns.total();
|
||||
let num = this.counter.total();
|
||||
if num == 0 {
|
||||
info!("Shutting down worker, 0 connections");
|
||||
let _ = tx.send(true);
|
||||
|
@ -427,7 +462,6 @@ impl Future for ServerWorker {
|
|||
WorkerState::Unavailable => match this.check_readiness(cx) {
|
||||
Ok(true) => {
|
||||
this.state = WorkerState::Available;
|
||||
this.availability.set(true);
|
||||
self.poll(cx)
|
||||
}
|
||||
Ok(false) => Poll::Pending,
|
||||
|
@ -468,7 +502,7 @@ impl Future for ServerWorker {
|
|||
// Wait for 1 second.
|
||||
ready!(shutdown.timer.as_mut().poll(cx));
|
||||
|
||||
if this.conns.total() == 0 {
|
||||
if this.counter.total() == 0 {
|
||||
// Graceful shutdown.
|
||||
if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) {
|
||||
let _ = shutdown.tx.send(true);
|
||||
|
@ -493,13 +527,11 @@ impl Future for ServerWorker {
|
|||
Ok(true) => {}
|
||||
Ok(false) => {
|
||||
trace!("Worker is unavailable");
|
||||
this.availability.set(false);
|
||||
this.state = WorkerState::Unavailable;
|
||||
return self.poll(cx);
|
||||
}
|
||||
Err((token, idx)) => {
|
||||
this.restart_service(token, idx);
|
||||
this.availability.set(false);
|
||||
return self.poll(cx);
|
||||
}
|
||||
}
|
||||
|
@ -507,7 +539,7 @@ impl Future for ServerWorker {
|
|||
match ready!(Pin::new(&mut this.rx).poll_recv(cx)) {
|
||||
// handle incoming io stream
|
||||
Some(msg) => {
|
||||
let guard = this.conns.get();
|
||||
let guard = this.counter.guard();
|
||||
let _ = this.services[msg.token].service.call((guard, msg.io));
|
||||
}
|
||||
None => return Poll::Ready(()),
|
||||
|
|
Loading…
Reference in New Issue