From 39d1f282f77d4b5abce06a18de6999a87f458678 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sat, 3 Apr 2021 11:01:00 -0700 Subject: [PATCH 1/2] add test for max concurrent connections (#311) --- actix-server/tests/test_server.rs | 79 +++++++++++++++++++++++++++++-- 1 file changed, 76 insertions(+), 3 deletions(-) diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 6d413eea..cd61df9f 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -1,4 +1,4 @@ -use std::sync::atomic::{AtomicUsize, Ordering::Relaxed}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{mpsc, Arc}; use std::{net, thread, time}; @@ -169,7 +169,7 @@ fn test_configure() { rt.service("addr1", fn_service(|_| ok::<_, ()>(()))); rt.service("addr3", fn_service(|_| ok::<_, ()>(()))); rt.on_start(lazy(move |_| { - let _ = num.fetch_add(1, Relaxed); + let _ = num.fetch_add(1, Ordering::Relaxed); })) }) }) @@ -187,7 +187,80 @@ fn test_configure() { assert!(net::TcpStream::connect(addr1).is_ok()); assert!(net::TcpStream::connect(addr2).is_ok()); assert!(net::TcpStream::connect(addr3).is_ok()); - assert_eq!(num.load(Relaxed), 1); + assert_eq!(num.load(Ordering::Relaxed), 1); sys.stop(); let _ = h.join(); } + +#[actix_rt::test] +async fn test_max_concurrent_connections() { + // Note: + // A tcp listener would accept connects based on it's backlog setting. + // + // The limit test on the other hand is only for concurrent tcp stream limiting a work + // thread accept. + + use actix_rt::net::TcpStream; + use tokio::io::AsyncWriteExt; + + let addr = unused_addr(); + let (tx, rx) = mpsc::channel(); + + let counter = Arc::new(AtomicUsize::new(0)); + let counter_clone = counter.clone(); + + let max_conn = 3; + + let h = thread::spawn(move || { + actix_rt::System::new().block_on(async { + let server = Server::build() + // Set a relative higher backlog. + .backlog(12) + // max connection for a worker is 3. + .maxconn(max_conn) + .workers(1) + .disable_signals() + .bind("test", addr, move || { + let counter = counter.clone(); + fn_service(move |_io: TcpStream| { + let counter = counter.clone(); + async move { + counter.fetch_add(1, Ordering::SeqCst); + actix_rt::time::sleep(time::Duration::from_secs(20)).await; + counter.fetch_sub(1, Ordering::SeqCst); + Ok::<(), ()>(()) + } + }) + })? + .run(); + + let _ = tx.send((server.clone(), actix_rt::System::current())); + + server.await + }) + }); + + let (srv, sys) = rx.recv().unwrap(); + + let mut conns = vec![]; + + for _ in 0..12 { + let conn = tokio::net::TcpStream::connect(addr).await.unwrap(); + conns.push(conn); + } + + actix_rt::time::sleep(time::Duration::from_secs(5)).await; + + // counter would remain at 3 even with 12 successful connection. + // and 9 of them remain in backlog. + assert_eq!(max_conn, counter_clone.load(Ordering::SeqCst)); + + for mut conn in conns { + conn.shutdown().await.unwrap(); + } + + srv.stop(false).await; + + sys.stop(); + let _ = h.join().unwrap(); +} From fd3e5fba027c83fe4cbc05ddaa1eb9cb5d6eba0b Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sat, 3 Apr 2021 11:40:12 -0700 Subject: [PATCH 2/2] Refactor actix_server WorkerState::Restarting enum variant. (#306) Co-authored-by: Rob Ede --- actix-server/src/config.rs | 4 ++-- actix-server/src/lib.rs | 20 ++---------------- actix-server/src/service.rs | 7 +++++-- actix-server/src/worker.rs | 42 ++++++++++++++++++------------------- 4 files changed, 30 insertions(+), 43 deletions(-) diff --git a/actix-server/src/config.rs b/actix-server/src/config.rs index 20270a2f..28e4fdeb 100644 --- a/actix-server/src/config.rs +++ b/actix-server/src/config.rs @@ -7,14 +7,14 @@ use actix_service::{ fn_service, IntoServiceFactory as IntoBaseServiceFactory, ServiceFactory as BaseServiceFactory, }; -use actix_utils::counter::CounterGuard; +use actix_utils::{counter::CounterGuard, future::ready}; use futures_core::future::LocalBoxFuture; use log::error; use crate::builder::bind_addr; use crate::service::{BoxedServerService, InternalServiceFactory, StreamService}; use crate::socket::{MioStream, MioTcpListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; -use crate::{ready, Token}; +use crate::Token; pub struct ServiceConfig { pub(crate) services: Vec<(String, MioTcpListener)>, diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index 24129b5a..af9ab0b0 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -55,24 +55,6 @@ pub fn new() -> ServerBuilder { ServerBuilder::default() } -// temporary Ready type for std::future::{ready, Ready}; Can be removed when MSRV surpass 1.48 -#[doc(hidden)] -pub struct Ready(Option); - -pub(crate) fn ready(t: T) -> Ready { - Ready(Some(t)) -} - -impl Unpin for Ready {} - -impl Future for Ready { - type Output = T; - - fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { - Poll::Ready(self.get_mut().0.take().unwrap()) - } -} - // a poor man's join future. joined future is only used when starting/stopping the server. // pin_project and pinned futures are overkill for this task. pub(crate) struct JoinAll { @@ -132,6 +114,8 @@ impl Future for JoinAll { mod test { use super::*; + use actix_utils::future::ready; + #[actix_rt::test] async fn test_join_all() { let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))]; diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index 63d2c1f5..835ee10b 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -3,12 +3,15 @@ use std::net::SocketAddr; use std::task::{Context, Poll}; use actix_service::{Service, ServiceFactory as BaseServiceFactory}; -use actix_utils::counter::CounterGuard; +use actix_utils::{ + counter::CounterGuard, + future::{ready, Ready}, +}; use futures_core::future::LocalBoxFuture; use log::error; use crate::socket::{FromStream, MioStream}; -use crate::{ready, Ready, Token}; +use crate::Token; pub trait ServiceFactory: Send + Clone + 'static { type Factory: BaseServiceFactory; diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 5e843983..366dab0b 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -387,27 +387,27 @@ impl Future for ServerWorker { } }, WorkerState::Restarting(idx, token, ref mut fut) => { - match fut.as_mut().poll(cx) { - Poll::Ready(Ok(item)) => { - // only interest in the first item? - if let Some((token, service)) = item.into_iter().next() { - trace!( - "Service {:?} has been restarted", - self.factories[idx].name(token) - ); - self.services[token.0].created(service); - self.state = WorkerState::Unavailable; - return self.poll(cx); - } - } - Poll::Ready(Err(_)) => { - panic!( - "Can not restart {:?} service", - self.factories[idx].name(token) - ); - } - Poll::Pending => return Poll::Pending, - } + let item = ready!(fut.as_mut().poll(cx)).unwrap_or_else(|_| { + panic!( + "Can not restart {:?} service", + self.factories[idx].name(token) + ) + }); + + // Only interest in the first item? + let (token, service) = item + .into_iter() + .next() + .expect("No BoxedServerService. Restarting can not progress"); + + trace!( + "Service {:?} has been restarted", + self.factories[idx].name(token) + ); + + self.services[token.0].created(service); + self.state = WorkerState::Unavailable; + self.poll(cx) } WorkerState::Shutdown(ref mut t1, ref mut t2, ref mut tx) => {