diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index bce22dbe..bd694e7c 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -22,10 +22,7 @@ use crate::signals::{Signal, Signals}; use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; use crate::socket::{MioTcpListener, MioTcpSocket}; use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::worker::{ - ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandleAccept, - WorkerHandleServer, -}; +use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer}; /// Server builder pub struct ServerBuilder { @@ -320,12 +317,11 @@ impl ServerBuilder { fn start_worker( &self, idx: usize, - waker: WakerQueue, + waker_queue: WakerQueue, ) -> (WorkerHandleAccept, WorkerHandleServer) { - let avail = WorkerAvailability::new(idx, waker); let services = self.services.iter().map(|v| v.clone_factory()).collect(); - ServerWorker::start(idx, services, avail, self.worker_config) + ServerWorker::start(idx, services, waker_queue, self.worker_config) } fn handle_cmd(&mut self, item: ServerCommand) { diff --git a/actix-server/src/config.rs b/actix-server/src/config.rs index 722ff1ea..111daf90 100644 --- a/actix-server/src/config.rs +++ b/actix-server/src/config.rs @@ -7,13 +7,16 @@ use actix_service::{ fn_service, IntoServiceFactory as IntoBaseServiceFactory, ServiceFactory as BaseServiceFactory, }; -use actix_utils::{counter::CounterGuard, future::ready}; +use actix_utils::future::ready; use futures_core::future::LocalBoxFuture; use log::error; -use crate::builder::bind_addr; -use crate::service::{BoxedServerService, InternalServiceFactory, StreamService}; -use crate::socket::{MioStream, MioTcpListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; +use crate::{ + builder::bind_addr, + service::{BoxedServerService, InternalServiceFactory, StreamService}, + socket::{MioStream, MioTcpListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}, + worker::WorkerCounterGuard, +}; pub struct ServiceConfig { pub(crate) services: Vec<(String, MioTcpListener)>, @@ -242,7 +245,7 @@ impl ServiceRuntime { type BoxedNewService = Box< dyn BaseServiceFactory< - (CounterGuard, MioStream), + (WorkerCounterGuard, MioStream), Response = (), Error = (), InitError = (), @@ -256,7 +259,7 @@ struct ServiceFactory { inner: T, } -impl BaseServiceFactory<(CounterGuard, MioStream)> for ServiceFactory +impl BaseServiceFactory<(WorkerCounterGuard, MioStream)> for ServiceFactory where T: BaseServiceFactory, T::Future: 'static, diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index 6faa3d00..bc436e75 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -3,14 +3,12 @@ use std::net::SocketAddr; use std::task::{Context, Poll}; use actix_service::{Service, ServiceFactory as BaseServiceFactory}; -use actix_utils::{ - counter::CounterGuard, - future::{ready, Ready}, -}; +use actix_utils::future::{ready, Ready}; use futures_core::future::LocalBoxFuture; use log::error; use crate::socket::{FromStream, MioStream}; +use crate::worker::WorkerCounterGuard; pub trait ServiceFactory: Send + Clone + 'static { type Factory: BaseServiceFactory; @@ -28,7 +26,7 @@ pub(crate) trait InternalServiceFactory: Send { pub(crate) type BoxedServerService = Box< dyn Service< - (CounterGuard, MioStream), + (WorkerCounterGuard, MioStream), Response = (), Error = (), Future = Ready>, @@ -49,7 +47,7 @@ impl StreamService { } } -impl Service<(CounterGuard, MioStream)> for StreamService +impl Service<(WorkerCounterGuard, MioStream)> for StreamService where S: Service, S::Future: 'static, @@ -64,7 +62,7 @@ where self.service.poll_ready(ctx).map_err(|_| ()) } - fn call(&self, (guard, req): (CounterGuard, MioStream)) -> Self::Future { + fn call(&self, (guard, req): (WorkerCounterGuard, MioStream)) -> Self::Future { ready(match FromStream::from_mio(req) { Ok(stream) => { let f = self.service.call(stream); diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 5f8a8cf5..8a67046b 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -2,8 +2,9 @@ use std::{ future::Future, mem, pin::Pin, + rc::Rc, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicUsize, Ordering}, Arc, }, task::{Context, Poll}, @@ -15,7 +16,6 @@ use actix_rt::{ time::{sleep, Instant, Sleep}, Arbiter, }; -use actix_utils::counter::Counter; use futures_core::{future::LocalBoxFuture, ready}; use log::{error, info, trace}; use tokio::sync::{ @@ -45,28 +45,104 @@ fn handle_pair( idx: usize, tx1: UnboundedSender, tx2: UnboundedSender, - avail: WorkerAvailability, + counter: Counter, ) -> (WorkerHandleAccept, WorkerHandleServer) { - let accept = WorkerHandleAccept { tx: tx1, avail }; + let accept = WorkerHandleAccept { + idx, + tx: tx1, + counter, + }; let server = WorkerHandleServer { idx, tx: tx2 }; (accept, server) } +#[derive(Clone)] +pub(crate) struct Counter { + counter: Arc, + limit: usize, +} + +impl Counter { + pub(crate) fn new(limit: usize) -> Self { + Self { + counter: Arc::new(AtomicUsize::new(1)), + limit, + } + } + + /// Increment counter it by 1 and return true when hitting limit + pub(crate) fn incr(&self) -> bool { + self.counter.fetch_add(1, Ordering::Relaxed) != self.limit + } + + /// Decrement counter it by 1 and return true when hitting limit + pub(crate) fn derc(&self) -> bool { + self.counter.fetch_sub(1, Ordering::Relaxed) == self.limit + } + + pub(crate) fn total(&self) -> usize { + self.counter.load(Ordering::SeqCst) - 1 + } +} + +pub(crate) struct WorkerCounter { + idx: usize, + inner: Rc<(WakerQueue, Counter)>, +} + +impl Clone for WorkerCounter { + fn clone(&self) -> Self { + Self { + idx: self.idx, + inner: self.inner.clone(), + } + } +} + +impl WorkerCounter { + pub(crate) fn new(idx: usize, waker_queue: WakerQueue, counter: Counter) -> Self { + Self { + idx, + inner: Rc::new((waker_queue, counter)), + } + } + + pub(crate) fn guard(&self) -> WorkerCounterGuard { + WorkerCounterGuard(self.clone()) + } + + fn total(&self) -> usize { + self.inner.1.total() + } +} + +pub(crate) struct WorkerCounterGuard(WorkerCounter); + +impl Drop for WorkerCounterGuard { + fn drop(&mut self) { + let (waker_queue, counter) = &*self.0.inner; + if counter.derc() { + waker_queue.wake(WakerInterest::WorkerAvailable(self.0.idx)); + } + } +} + /// Handle to worker that can send connection message to worker and share the /// availability of worker to other thread. /// /// Held by [Accept](crate::accept::Accept). pub(crate) struct WorkerHandleAccept { + idx: usize, tx: UnboundedSender, - avail: WorkerAvailability, + counter: Counter, } impl WorkerHandleAccept { #[inline(always)] pub(crate) fn idx(&self) -> usize { - self.avail.idx + self.idx } #[inline(always)] @@ -76,7 +152,7 @@ impl WorkerHandleAccept { #[inline(always)] pub(crate) fn available(&self) -> bool { - self.avail.available() + self.counter.incr() } } @@ -96,40 +172,6 @@ impl WorkerHandleServer { } } -#[derive(Clone)] -pub(crate) struct WorkerAvailability { - idx: usize, - waker: WakerQueue, - available: Arc, -} - -impl WorkerAvailability { - pub fn new(idx: usize, waker: WakerQueue) -> Self { - WorkerAvailability { - idx, - waker, - available: Arc::new(AtomicBool::new(false)), - } - } - - #[inline(always)] - pub fn available(&self) -> bool { - self.available.load(Ordering::Acquire) - } - - pub fn set(&self, val: bool) { - // Ordering: - // - // There could be multiple set calls happen in one ::poll. - // Order is important between them. - let old = self.available.swap(val, Ordering::AcqRel); - // Notify the accept on switched to available. - if !old && val { - self.waker.wake(WakerInterest::WorkerAvailable(self.idx)); - } - } -} - /// Service worker. /// /// Worker accepts Socket objects via unbounded channel and starts stream processing. @@ -138,9 +180,8 @@ pub(crate) struct ServerWorker { // It must be dropped as soon as ServerWorker dropping. rx: UnboundedReceiver, rx2: UnboundedReceiver, + counter: WorkerCounter, services: Box<[WorkerService]>, - availability: WorkerAvailability, - conns: Counter, factories: Box<[Box]>, state: WorkerState, shutdown_timeout: Duration, @@ -207,15 +248,15 @@ impl ServerWorker { pub(crate) fn start( idx: usize, factories: Vec>, - availability: WorkerAvailability, + waker_queue: WakerQueue, config: ServerWorkerConfig, ) -> (WorkerHandleAccept, WorkerHandleServer) { - assert!(!availability.available()); - let (tx1, rx) = unbounded_channel(); let (tx2, rx2) = unbounded_channel(); - let avail = availability.clone(); + let counter = Counter::new(config.max_concurrent_connections); + + let counter_clone = counter.clone(); // every worker runs in it's own arbiter. // use a custom tokio runtime builder to change the settings of runtime. Arbiter::with_tokio_rt(move || { @@ -271,8 +312,7 @@ impl ServerWorker { rx, rx2, services, - availability, - conns: Counter::new(config.max_concurrent_connections), + counter: WorkerCounter::new(idx, waker_queue, counter_clone), factories: factories.into_boxed_slice(), state: Default::default(), shutdown_timeout: config.shutdown_timeout, @@ -280,7 +320,7 @@ impl ServerWorker { }); }); - handle_pair(idx, tx1, tx2, avail) + handle_pair(idx, tx1, tx2, counter) } fn restart_service(&mut self, idx: usize, factory_id: usize) { @@ -308,7 +348,7 @@ impl ServerWorker { } fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result { - let mut ready = self.conns.available(cx); + let mut ready = true; for (idx, srv) in self.services.iter_mut().enumerate() { if srv.status == WorkerServiceStatus::Available || srv.status == WorkerServiceStatus::Unavailable @@ -381,10 +421,6 @@ impl Default for WorkerState { impl Drop for ServerWorker { fn drop(&mut self) { - // Set availability to true so if accept try to send connection to this worker - // it would find worker is gone and remove it. - // This is helpful when worker is dropped unexpected. - self.availability.set(true); // Stop the Arbiter ServerWorker runs on on drop. Arbiter::current().stop(); } @@ -399,8 +435,7 @@ impl Future for ServerWorker { // `StopWorker` message handler if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx) { - this.availability.set(false); - let num = this.conns.total(); + let num = this.counter.total(); if num == 0 { info!("Shutting down worker, 0 connections"); let _ = tx.send(true); @@ -427,7 +462,6 @@ impl Future for ServerWorker { WorkerState::Unavailable => match this.check_readiness(cx) { Ok(true) => { this.state = WorkerState::Available; - this.availability.set(true); self.poll(cx) } Ok(false) => Poll::Pending, @@ -468,7 +502,7 @@ impl Future for ServerWorker { // Wait for 1 second. ready!(shutdown.timer.as_mut().poll(cx)); - if this.conns.total() == 0 { + if this.counter.total() == 0 { // Graceful shutdown. if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) { let _ = shutdown.tx.send(true); @@ -493,13 +527,11 @@ impl Future for ServerWorker { Ok(true) => {} Ok(false) => { trace!("Worker is unavailable"); - this.availability.set(false); this.state = WorkerState::Unavailable; return self.poll(cx); } Err((token, idx)) => { this.restart_service(token, idx); - this.availability.set(false); return self.poll(cx); } } @@ -507,7 +539,7 @@ impl Future for ServerWorker { match ready!(Pin::new(&mut this.rx).poll_recv(cx)) { // handle incoming io stream Some(msg) => { - let guard = this.conns.get(); + let guard = this.counter.guard(); let _ = this.services[msg.token].service.call((guard, msg.io)); } None => return Poll::Ready(()),