diff --git a/actix-server/src/config.rs b/actix-server/src/config.rs
index 20270a2f..c5e63630 100644
--- a/actix-server/src/config.rs
+++ b/actix-server/src/config.rs
@@ -7,14 +7,14 @@ use actix_service::{
     fn_service, IntoServiceFactory as IntoBaseServiceFactory, ServiceFactory as BaseServiceFactory,
 };
 
-use actix_utils::counter::CounterGuard;
+use actix_utils::{counter::CounterGuard, future::ready};
 use futures_core::future::LocalBoxFuture;
 use log::error;
 
 use crate::builder::bind_addr;
 use crate::service::{BoxedServerService, InternalServiceFactory, StreamService};
 use crate::socket::{MioStream, MioTcpListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
-use crate::{ready, Token};
+use crate::Token;
 
 pub struct ServiceConfig {
     pub(crate) services: Vec<(String, MioTcpListener)>,
@@ -243,7 +243,7 @@ impl ServiceRuntime {
 
 type BoxedNewService = Box<
     dyn BaseServiceFactory<
-        (Option<CounterGuard>, MioStream),
+        (CounterGuard, MioStream),
         Response = (),
         Error = (),
         InitError = (),
@@ -257,7 +257,7 @@ struct ServiceFactory<T> {
     inner: T,
 }
 
-impl<T> BaseServiceFactory<(Option<CounterGuard>, MioStream)> for ServiceFactory<T>
+impl<T> BaseServiceFactory<(CounterGuard, MioStream)> for ServiceFactory<T>
 where
     T: BaseServiceFactory<TcpStream, Config = ()>,
     T::Future: 'static,
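For context on the `(Option<CounterGuard>, MioStream)` to `(CounterGuard, MioStream)` change running through this patch: `CounterGuard` is the drop-guard handed out by `actix_utils::counter::Counter`, and the worker only ever passed `Some(guard)`, so the `Option` wrapper carried no information. A minimal sketch of the guard semantics (assuming the actix-utils 3.x counter API, not the worker internals):

```rust
use actix_utils::counter::Counter;

fn main() {
    // One slot-counter per worker; `maxconn` decides its capacity.
    let counter = Counter::new(2);

    // `get()` claims a slot and returns a CounterGuard.
    let guard = counter.get();
    assert_eq!(counter.total(), 1);

    // The slot is released when the guard is dropped, e.g. once the
    // connection task spawned in StreamService::call completes.
    drop(guard);
    assert_eq!(counter.total(), 0);
}
```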
diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs
index 24129b5a..af9ab0b0 100644
--- a/actix-server/src/lib.rs
+++ b/actix-server/src/lib.rs
@@ -55,24 +55,6 @@ pub fn new() -> ServerBuilder {
     ServerBuilder::default()
 }
 
-// temporary Ready type for std::future::{ready, Ready}; Can be removed when MSRV surpass 1.48
-#[doc(hidden)]
-pub struct Ready<T>(Option<T>);
-
-pub(crate) fn ready<T>(t: T) -> Ready<T> {
-    Ready(Some(t))
-}
-
-impl<T> Unpin for Ready<T> {}
-
-impl<T> Future for Ready<T> {
-    type Output = T;
-
-    fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
-        Poll::Ready(self.get_mut().0.take().unwrap())
-    }
-}
-
 // a poor man's join future. joined future is only used when starting/stopping the server.
 // pin_project and pinned futures are overkill for this task.
 pub(crate) struct JoinAll<T> {
@@ -132,6 +114,8 @@ impl<T> Future for JoinAll<T> {
 mod test {
     use super::*;
 
+    use actix_utils::future::ready;
+
     #[actix_rt::test]
     async fn test_join_all() {
         let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))];
diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs
index 63d2c1f5..da57af67 100644
--- a/actix-server/src/service.rs
+++ b/actix-server/src/service.rs
@@ -3,12 +3,15 @@ use std::net::SocketAddr;
 use std::task::{Context, Poll};
 
 use actix_service::{Service, ServiceFactory as BaseServiceFactory};
-use actix_utils::counter::CounterGuard;
+use actix_utils::{
+    counter::CounterGuard,
+    future::{ready, Ready},
+};
 use futures_core::future::LocalBoxFuture;
 use log::error;
 
 use crate::socket::{FromStream, MioStream};
-use crate::{ready, Ready, Token};
+use crate::Token;
 
 pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
     type Factory: BaseServiceFactory<Stream, Config = ()>;
@@ -26,7 +29,7 @@ pub(crate) trait InternalServiceFactory: Send {
 
 pub(crate) type BoxedServerService = Box<
     dyn Service<
-        (Option<CounterGuard>, MioStream),
+        (CounterGuard, MioStream),
         Response = (),
         Error = (),
         Future = Ready<Result<(), ()>>,
     >,
 >;
@@ -47,7 +50,7 @@ impl<S, I> StreamService<S, I> {
     }
 }
 
-impl<S, I> Service<(Option<CounterGuard>, MioStream)> for StreamService<S, I>
+impl<S, I> Service<(CounterGuard, MioStream)> for StreamService<S, I>
 where
     S: Service<I>,
     S::Future: 'static,
@@ -62,7 +65,7 @@ where
         self.service.poll_ready(ctx).map_err(|_| ())
     }
 
-    fn call(&self, (guard, req): (Option<CounterGuard>, MioStream)) -> Self::Future {
+    fn call(&self, (guard, req): (CounterGuard, MioStream)) -> Self::Future {
         ready(match FromStream::from_mio(req) {
             Ok(stream) => {
                 let f = self.service.call(stream);
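The `crate::{ready, Ready}` shim removed above is replaced by the equivalents in `actix_utils::future`, which mirror `std::future::{ready, Ready}` (stable since Rust 1.48, which the crate's MSRV had not yet reached). A quick usage sketch, assuming actix-utils 3.x and actix-rt 2.x:

```rust
use actix_utils::future::{ready, Ready};

// An immediately-ready future, the same shape StreamService uses for
// its `Future = Ready<Result<(), ()>>` associated type.
fn respond() -> Ready<Result<(), ()>> {
    ready(Ok(()))
}

#[actix_rt::main]
async fn main() {
    assert_eq!(respond().await, Ok(()));
}
```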
diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs
index 5e843983..bd28ccda 100644
--- a/actix-server/src/worker.rs
+++ b/actix-server/src/worker.rs
@@ -262,6 +262,13 @@ impl ServerWorker {
         WorkerHandle::new(idx, tx1, tx2, avail)
     }
 
+    fn restart_service(&mut self, token: Token, idx: usize) {
+        let factory = &self.factories[idx];
+        trace!("Service {:?} failed, restarting", factory.name(token));
+        self.services[token.0].status = WorkerServiceStatus::Restarting;
+        self.state = WorkerState::Restarting(idx, token, factory.create());
+    }
+
     fn shutdown(&mut self, force: bool) {
         self.services
             .iter_mut()
@@ -376,38 +383,32 @@ impl Future for ServerWorker {
                 }
                 Ok(false) => Poll::Pending,
                 Err((token, idx)) => {
-                    trace!(
-                        "Service {:?} failed, restarting",
-                        self.factories[idx].name(token)
-                    );
-                    self.services[token.0].status = WorkerServiceStatus::Restarting;
-                    self.state =
-                        WorkerState::Restarting(idx, token, self.factories[idx].create());
+                    self.restart_service(token, idx);
                     self.poll(cx)
                 }
             },
             WorkerState::Restarting(idx, token, ref mut fut) => {
-                match fut.as_mut().poll(cx) {
-                    Poll::Ready(Ok(item)) => {
-                        // only interest in the first item?
-                        if let Some((token, service)) = item.into_iter().next() {
-                            trace!(
-                                "Service {:?} has been restarted",
-                                self.factories[idx].name(token)
-                            );
-                            self.services[token.0].created(service);
-                            self.state = WorkerState::Unavailable;
-                            return self.poll(cx);
-                        }
-                    }
-                    Poll::Ready(Err(_)) => {
-                        panic!(
-                            "Can not restart {:?} service",
-                            self.factories[idx].name(token)
-                        );
-                    }
-                    Poll::Pending => return Poll::Pending,
-                }
+                let item = ready!(fut.as_mut().poll(cx)).unwrap_or_else(|_| {
+                    panic!(
+                        "Can not restart {:?} service",
+                        self.factories[idx].name(token)
+                    )
+                });
+
+                // Only interested in the first item?
+                let (token, service) = item
+                    .into_iter()
+                    .next()
+                    .expect("No BoxedServerService. Restarting can not progress");
+
+                trace!(
+                    "Service {:?} has been restarted",
+                    self.factories[idx].name(token)
+                );
+
+                self.services[token.0].created(service);
+                self.state = WorkerState::Unavailable;
+                self.poll(cx)
             }
             WorkerState::Shutdown(ref mut t1, ref mut t2, ref mut tx) => {
@@ -437,7 +438,7 @@ impl Future for ServerWorker {
             // actively poll stream and handle worker command
             WorkerState::Available => loop {
                 match self.check_readiness(cx) {
-                    Ok(true) => (),
+                    Ok(true) => {}
                     Ok(false) => {
                         trace!("Worker is unavailable");
                         self.availability.set(false);
@@ -445,14 +446,8 @@ impl Future for ServerWorker {
                         return self.poll(cx);
                     }
                     Err((token, idx)) => {
-                        trace!(
-                            "Service {:?} failed, restarting",
-                            self.factories[idx].name(token)
-                        );
+                        self.restart_service(token, idx);
                         self.availability.set(false);
-                        self.services[token.0].status = WorkerServiceStatus::Restarting;
-                        self.state =
-                            WorkerState::Restarting(idx, token, self.factories[idx].create());
                         return self.poll(cx);
                     }
                 }
@@ -461,9 +456,7 @@ impl Future for ServerWorker {
                     // handle incoming io stream
                     Some(WorkerCommand(msg)) => {
                         let guard = self.conns.get();
-                        let _ = self.services[msg.token.0]
-                            .service
-                            .call((Some(guard), msg.io));
+                        let _ = self.services[msg.token.0].service.call((guard, msg.io));
                     }
                     None => return Poll::Ready(()),
                 };
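The rewritten `Restarting` arm flattens the old three-arm `match` with the `ready!` macro, which early-returns `Poll::Pending` and hands back the `Poll::Ready` value. A generic sketch of that pattern with hypothetical names (`poll_restart` is illustrative; `ready!` here is `futures_core::ready`, of which `std::task::ready!` is the later standard-library equivalent):

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures_core::ready;

// Hypothetical helper mirroring the worker's Restarting arm: Pending
// propagation is a single line and the Ready value binds directly.
fn poll_restart<F, T, E>(fut: Pin<&mut F>, cx: &mut Context<'_>) -> Poll<T>
where
    F: Future<Output = Result<T, E>>,
{
    // was: match fut.poll(cx) { Poll::Pending => return Poll::Pending, .. }
    let item = ready!(fut.poll(cx)).unwrap_or_else(|_| panic!("cannot restart service"));
    Poll::Ready(item)
}
```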
diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs
index 6d413eea..cd61df9f 100644
--- a/actix-server/tests/test_server.rs
+++ b/actix-server/tests/test_server.rs
@@ -1,4 +1,4 @@
-use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{mpsc, Arc};
 use std::{net, thread, time};
 
@@ -169,7 +169,7 @@ fn test_configure() {
             rt.service("addr1", fn_service(|_| ok::<_, ()>(())));
             rt.service("addr3", fn_service(|_| ok::<_, ()>(())));
             rt.on_start(lazy(move |_| {
-                let _ = num.fetch_add(1, Relaxed);
+                let _ = num.fetch_add(1, Ordering::Relaxed);
             }))
         })
     })
@@ -187,7 +187,80 @@ fn test_configure() {
     assert!(net::TcpStream::connect(addr1).is_ok());
     assert!(net::TcpStream::connect(addr2).is_ok());
     assert!(net::TcpStream::connect(addr3).is_ok());
-    assert_eq!(num.load(Relaxed), 1);
+    assert_eq!(num.load(Ordering::Relaxed), 1);
     sys.stop();
     let _ = h.join();
 }
+
+#[actix_rt::test]
+async fn test_max_concurrent_connections() {
+    // Note:
+    // A TCP listener accepts connections based on its backlog setting.
+    //
+    // This test, on the other hand, checks the limit on concurrent TCP streams
+    // that a worker thread will accept.
+
+    use actix_rt::net::TcpStream;
+    use tokio::io::AsyncWriteExt;
+
+    let addr = unused_addr();
+    let (tx, rx) = mpsc::channel();
+
+    let counter = Arc::new(AtomicUsize::new(0));
+    let counter_clone = counter.clone();
+
+    let max_conn = 3;
+
+    let h = thread::spawn(move || {
+        actix_rt::System::new().block_on(async {
+            let server = Server::build()
+                // Set a relatively higher backlog.
+                .backlog(12)
+                // Max connections for a worker is 3.
+                .maxconn(max_conn)
+                .workers(1)
+                .disable_signals()
+                .bind("test", addr, move || {
+                    let counter = counter.clone();
+                    fn_service(move |_io: TcpStream| {
+                        let counter = counter.clone();
+                        async move {
+                            counter.fetch_add(1, Ordering::SeqCst);
+                            actix_rt::time::sleep(time::Duration::from_secs(20)).await;
+                            counter.fetch_sub(1, Ordering::SeqCst);
+                            Ok::<(), ()>(())
+                        }
+                    })
+                })?
+                .run();
+
+            let _ = tx.send((server.clone(), actix_rt::System::current()));
+
+            server.await
+        })
+    });
+
+    let (srv, sys) = rx.recv().unwrap();
+
+    let mut conns = vec![];
+
+    for _ in 0..12 {
+        let conn = tokio::net::TcpStream::connect(addr).await.unwrap();
+        conns.push(conn);
+    }
+
+    actix_rt::time::sleep(time::Duration::from_secs(5)).await;
+
+    // The counter remains at 3 even with 12 successful connections;
+    // the other 9 wait in the backlog.
+    assert_eq!(max_conn, counter_clone.load(Ordering::SeqCst));
+
+    for mut conn in conns {
+        conn.shutdown().await.unwrap();
+    }
+
+    srv.stop(false).await;
+
+    sys.stop();
+    let _ = h.join().unwrap();
+}
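A note on the numbers behind the new test's assertion: `workers(1)` plus `maxconn(3)` caps in-flight streams at 3, while `backlog(12)` lets the kernel queue the remaining connects, so all 12 clients connect successfully yet the counter still reads 3. A hypothetical back-of-the-envelope check:

```rust
fn main() {
    let clients = 12;
    let max_conn = 3; // per-worker concurrency cap (`maxconn`)
    let backlog = 12; // listener accept-queue length (`backlog`)

    // Streams the fn_service handles concurrently: what the test's counter sees.
    let in_flight = clients.min(max_conn);
    assert_eq!(in_flight, 3);

    // The rest wait in the kernel's accept queue rather than being refused,
    // because the backlog is large enough to hold them.
    let queued = clients - in_flight;
    assert_eq!(queued, 9);
    assert!(queued <= backlog);
}
```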