Remove MAX_CONN

This commit is contained in:
fakeshadow 2021-04-04 21:10:13 +08:00
parent 8079c50ddb
commit 3775fb6f2d
2 changed files with 39 additions and 54 deletions

View File

@ -19,7 +19,7 @@ use crate::signals::{Signal, Signals};
use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
use crate::socket::{MioTcpListener, MioTcpSocket}; use crate::socket::{MioTcpListener, MioTcpSocket};
use crate::waker_queue::{WakerInterest, WakerQueue}; use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::worker::{self, ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandle}; use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandle};
use crate::{join_all, Token}; use crate::{join_all, Token};
/// Server builder /// Server builder
@ -117,8 +117,8 @@ impl ServerBuilder {
/// reached for each worker. /// reached for each worker.
/// ///
/// By default max connections is set to a 25k per worker. /// By default max connections is set to a 25k per worker.
pub fn maxconn(self, num: usize) -> Self { pub fn maxconn(mut self, num: usize) -> Self {
worker::max_concurrent_connections(num); self.worker_config.max_concurrent_connections(num);
self self
} }

View File

@ -1,6 +1,6 @@
use std::future::Future; use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
@ -33,27 +33,6 @@ pub(crate) struct Conn {
pub token: Token, pub token: Token,
} }
static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600);
/// Sets the maximum per-worker number of concurrent connections.
///
/// All socket listeners will stop accepting connections when this limit is
/// reached for each worker.
///
/// By default max connections is set to a 25k per worker.
pub fn max_concurrent_connections(num: usize) {
MAX_CONNS.store(num, Ordering::Relaxed);
}
thread_local! {
static MAX_CONNS_COUNTER: Counter =
Counter::new(MAX_CONNS.load(Ordering::Relaxed));
}
pub(crate) fn num_connections() -> usize {
MAX_CONNS_COUNTER.with(|conns| conns.total())
}
// a handle to worker that can send message to worker and share the availability of worker to other // a handle to worker that can send message to worker and share the availability of worker to other
// thread. // thread.
#[derive(Clone)] #[derive(Clone)]
@ -163,6 +142,7 @@ enum WorkerServiceStatus {
pub(crate) struct ServerWorkerConfig { pub(crate) struct ServerWorkerConfig {
shutdown_timeout: Duration, shutdown_timeout: Duration,
max_blocking_threads: usize, max_blocking_threads: usize,
max_concurrent_connections: usize,
} }
impl Default for ServerWorkerConfig { impl Default for ServerWorkerConfig {
@ -172,6 +152,7 @@ impl Default for ServerWorkerConfig {
Self { Self {
shutdown_timeout: Duration::from_secs(30), shutdown_timeout: Duration::from_secs(30),
max_blocking_threads, max_blocking_threads,
max_concurrent_connections: 25600,
} }
} }
} }
@ -181,6 +162,10 @@ impl ServerWorkerConfig {
self.max_blocking_threads = num; self.max_blocking_threads = num;
} }
pub(crate) fn max_concurrent_connections(&mut self, num: usize) {
self.max_concurrent_connections = num;
}
pub(crate) fn shutdown_timeout(&mut self, dur: Duration) { pub(crate) fn shutdown_timeout(&mut self, dur: Duration) {
self.shutdown_timeout = dur; self.shutdown_timeout = dur;
} }
@ -208,16 +193,16 @@ impl ServerWorker {
}) })
.spawn(async move { .spawn(async move {
availability.set(false); availability.set(false);
let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker { let mut wrk = ServerWorker {
rx, rx,
rx2, rx2,
availability, availability,
factories, factories,
config, config,
services: Vec::new(), services: Vec::new(),
conns: conns.clone(), conns: Counter::new(config.max_concurrent_connections),
state: WorkerState::Unavailable, state: WorkerState::Unavailable,
}); };
let fut = wrk let fut = wrk
.factories .factories
@ -348,37 +333,38 @@ impl Future for ServerWorker {
type Output = (); type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().get_mut();
// `StopWorker` message handler // `StopWorker` message handler
if let Poll::Ready(Some(StopCommand { graceful, result })) = if let Poll::Ready(Some(StopCommand { graceful, result })) =
Pin::new(&mut self.rx2).poll_recv(cx) Pin::new(&mut this.rx2).poll_recv(cx)
{ {
self.availability.set(false); this.availability.set(false);
let num = num_connections(); let num = this.conns.total();
if num == 0 { if num == 0 {
info!("Shutting down worker, 0 connections"); info!("Shutting down worker, 0 connections");
let _ = result.send(true); let _ = result.send(true);
return Poll::Ready(()); return Poll::Ready(());
} else if graceful { } else if graceful {
self.shutdown(false); this.shutdown(false);
info!("Graceful worker shutdown, {} connections", num); info!("Graceful worker shutdown, {} connections", num);
self.state = WorkerState::Shutdown( this.state = WorkerState::Shutdown(
Box::pin(sleep(Duration::from_secs(1))), Box::pin(sleep(Duration::from_secs(1))),
Box::pin(sleep(self.config.shutdown_timeout)), Box::pin(sleep(this.config.shutdown_timeout)),
Some(result), Some(result),
); );
} else { } else {
info!("Force shutdown worker, {} connections", num); info!("Force shutdown worker, {} connections", num);
self.shutdown(true); this.shutdown(true);
let _ = result.send(false); let _ = result.send(false);
return Poll::Ready(()); return Poll::Ready(());
} }
} }
match self.state { match this.state {
WorkerState::Unavailable => match self.check_readiness(cx) { WorkerState::Unavailable => match this.check_readiness(cx) {
Ok(true) => { Ok(true) => {
self.state = WorkerState::Available; this.state = WorkerState::Available;
self.availability.set(true); this.availability.set(true);
self.poll(cx) self.poll(cx)
} }
Ok(false) => Poll::Pending, Ok(false) => Poll::Pending,
@ -391,7 +377,7 @@ impl Future for ServerWorker {
let item = ready!(fut.as_mut().poll(cx)).unwrap_or_else(|_| { let item = ready!(fut.as_mut().poll(cx)).unwrap_or_else(|_| {
panic!( panic!(
"Can not restart {:?} service", "Can not restart {:?} service",
self.factories[idx].name(token) this.factories[idx].name(token)
) )
}); });
@ -403,17 +389,16 @@ impl Future for ServerWorker {
trace!( trace!(
"Service {:?} has been restarted", "Service {:?} has been restarted",
self.factories[idx].name(token) this.factories[idx].name(token)
); );
self.services[token.0].created(service); this.services[token.0].created(service);
self.state = WorkerState::Unavailable; this.state = WorkerState::Unavailable;
self.poll(cx) self.poll(cx)
} }
WorkerState::Shutdown(ref mut t1, ref mut t2, ref mut tx) => { WorkerState::Shutdown(ref mut t1, ref mut t2, ref mut tx) => {
let num = num_connections(); if this.conns.total() == 0 {
if num == 0 {
let _ = tx.take().unwrap().send(true); let _ = tx.take().unwrap().send(true);
Arbiter::current().stop(); Arbiter::current().stop();
return Poll::Ready(()); return Poll::Ready(());
@ -422,7 +407,7 @@ impl Future for ServerWorker {
// check graceful timeout // check graceful timeout
if Pin::new(t2).poll(cx).is_ready() { if Pin::new(t2).poll(cx).is_ready() {
let _ = tx.take().unwrap().send(false); let _ = tx.take().unwrap().send(false);
self.shutdown(true); this.shutdown(true);
Arbiter::current().stop(); Arbiter::current().stop();
return Poll::Ready(()); return Poll::Ready(());
} }
@ -437,26 +422,26 @@ impl Future for ServerWorker {
} }
// actively poll stream and handle worker command // actively poll stream and handle worker command
WorkerState::Available => loop { WorkerState::Available => loop {
match self.check_readiness(cx) { match this.check_readiness(cx) {
Ok(true) => {} Ok(true) => {}
Ok(false) => { Ok(false) => {
trace!("Worker is unavailable"); trace!("Worker is unavailable");
self.availability.set(false); this.availability.set(false);
self.state = WorkerState::Unavailable; this.state = WorkerState::Unavailable;
return self.poll(cx); return self.poll(cx);
} }
Err((token, idx)) => { Err((token, idx)) => {
self.restart_service(token, idx); this.restart_service(token, idx);
self.availability.set(false); this.availability.set(false);
return self.poll(cx); return self.poll(cx);
} }
} }
match ready!(Pin::new(&mut self.rx).poll_recv(cx)) { match ready!(Pin::new(&mut this.rx).poll_recv(cx)) {
// handle incoming io stream // handle incoming io stream
Some(WorkerCommand(msg)) => { Some(WorkerCommand(msg)) => {
let guard = self.conns.get(); let guard = this.conns.get();
let _ = self.services[msg.token.0].service.call((guard, msg.io)); let _ = this.services[msg.token.0].service.call((guard, msg.io));
} }
None => return Poll::Ready(()), None => return Poll::Ready(()),
}; };