diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index c20bb4f5..fdb02205 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -19,7 +19,7 @@ use crate::signals::{Signal, Signals}; use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; use crate::socket::{MioTcpListener, MioTcpSocket}; use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::worker::{self, ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandle}; +use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandle}; use crate::{join_all, Token}; /// Server builder @@ -117,8 +117,8 @@ impl ServerBuilder { /// reached for each worker. /// /// By default max connections is set to a 25k per worker. - pub fn maxconn(self, num: usize) -> Self { - worker::max_concurrent_connections(num); + pub fn maxconn(mut self, num: usize) -> Self { + self.worker_config.max_concurrent_connections(num); self } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index bd28ccda..c01273b2 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -1,6 +1,6 @@ use std::future::Future; use std::pin::Pin; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; @@ -33,27 +33,6 @@ pub(crate) struct Conn { pub token: Token, } -static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600); - -/// Sets the maximum per-worker number of concurrent connections. -/// -/// All socket listeners will stop accepting connections when this limit is -/// reached for each worker. -/// -/// By default max connections is set to a 25k per worker. -pub fn max_concurrent_connections(num: usize) { - MAX_CONNS.store(num, Ordering::Relaxed); -} - -thread_local! { - static MAX_CONNS_COUNTER: Counter = - Counter::new(MAX_CONNS.load(Ordering::Relaxed)); -} - -pub(crate) fn num_connections() -> usize { - MAX_CONNS_COUNTER.with(|conns| conns.total()) -} - // a handle to worker that can send message to worker and share the availability of worker to other // thread. #[derive(Clone)] @@ -163,6 +142,7 @@ enum WorkerServiceStatus { pub(crate) struct ServerWorkerConfig { shutdown_timeout: Duration, max_blocking_threads: usize, + max_concurrent_connections: usize, } impl Default for ServerWorkerConfig { @@ -172,6 +152,7 @@ impl Default for ServerWorkerConfig { Self { shutdown_timeout: Duration::from_secs(30), max_blocking_threads, + max_concurrent_connections: 25600, } } } @@ -181,6 +162,10 @@ impl ServerWorkerConfig { self.max_blocking_threads = num; } + pub(crate) fn max_concurrent_connections(&mut self, num: usize) { + self.max_concurrent_connections = num; + } + pub(crate) fn shutdown_timeout(&mut self, dur: Duration) { self.shutdown_timeout = dur; } @@ -208,16 +193,16 @@ impl ServerWorker { }) .spawn(async move { availability.set(false); - let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker { + let mut wrk = ServerWorker { rx, rx2, availability, factories, config, services: Vec::new(), - conns: conns.clone(), + conns: Counter::new(config.max_concurrent_connections), state: WorkerState::Unavailable, - }); + }; let fut = wrk .factories @@ -348,37 +333,38 @@ impl Future for ServerWorker { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.as_mut().get_mut(); // `StopWorker` message handler if let Poll::Ready(Some(StopCommand { graceful, result })) = - Pin::new(&mut self.rx2).poll_recv(cx) + Pin::new(&mut this.rx2).poll_recv(cx) { - self.availability.set(false); - let num = num_connections(); + this.availability.set(false); + let num = this.conns.total(); if num == 0 { info!("Shutting down worker, 0 connections"); let _ = result.send(true); return Poll::Ready(()); } else if graceful { - self.shutdown(false); + this.shutdown(false); info!("Graceful worker shutdown, {} connections", num); - self.state = WorkerState::Shutdown( + this.state = WorkerState::Shutdown( Box::pin(sleep(Duration::from_secs(1))), - Box::pin(sleep(self.config.shutdown_timeout)), + Box::pin(sleep(this.config.shutdown_timeout)), Some(result), ); } else { info!("Force shutdown worker, {} connections", num); - self.shutdown(true); + this.shutdown(true); let _ = result.send(false); return Poll::Ready(()); } } - match self.state { - WorkerState::Unavailable => match self.check_readiness(cx) { + match this.state { + WorkerState::Unavailable => match this.check_readiness(cx) { Ok(true) => { - self.state = WorkerState::Available; - self.availability.set(true); + this.state = WorkerState::Available; + this.availability.set(true); self.poll(cx) } Ok(false) => Poll::Pending, @@ -391,7 +377,7 @@ impl Future for ServerWorker { let item = ready!(fut.as_mut().poll(cx)).unwrap_or_else(|_| { panic!( "Can not restart {:?} service", - self.factories[idx].name(token) + this.factories[idx].name(token) ) }); @@ -403,17 +389,16 @@ impl Future for ServerWorker { trace!( "Service {:?} has been restarted", - self.factories[idx].name(token) + this.factories[idx].name(token) ); - self.services[token.0].created(service); - self.state = WorkerState::Unavailable; + this.services[token.0].created(service); + this.state = WorkerState::Unavailable; self.poll(cx) } WorkerState::Shutdown(ref mut t1, ref mut t2, ref mut tx) => { - let num = num_connections(); - if num == 0 { + if this.conns.total() == 0 { let _ = tx.take().unwrap().send(true); Arbiter::current().stop(); return Poll::Ready(()); @@ -422,7 +407,7 @@ impl Future for ServerWorker { // check graceful timeout if Pin::new(t2).poll(cx).is_ready() { let _ = tx.take().unwrap().send(false); - self.shutdown(true); + this.shutdown(true); Arbiter::current().stop(); return Poll::Ready(()); } @@ -437,26 +422,26 @@ impl Future for ServerWorker { } // actively poll stream and handle worker command WorkerState::Available => loop { - match self.check_readiness(cx) { + match this.check_readiness(cx) { Ok(true) => {} Ok(false) => { trace!("Worker is unavailable"); - self.availability.set(false); - self.state = WorkerState::Unavailable; + this.availability.set(false); + this.state = WorkerState::Unavailable; return self.poll(cx); } Err((token, idx)) => { - self.restart_service(token, idx); - self.availability.set(false); + this.restart_service(token, idx); + this.availability.set(false); return self.poll(cx); } } - match ready!(Pin::new(&mut self.rx).poll_recv(cx)) { + match ready!(Pin::new(&mut this.rx).poll_recv(cx)) { // handle incoming io stream Some(WorkerCommand(msg)) => { - let guard = self.conns.get(); - let _ = self.services[msg.token.0].service.call((guard, msg.io)); + let guard = this.conns.get(); + let _ = this.services[msg.token.0].service.call((guard, msg.io)); } None => return Poll::Ready(()), };