From ac4b6dcd6a6c65491e3c81879071a61971010f89 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Tue, 6 Apr 2021 01:14:57 +0800 Subject: [PATCH] add test --- actix-server/src/worker.rs | 10 +-- actix-server/tests/test_server.rs | 145 ++++++++++++++++++++++++++++++ 2 files changed, 149 insertions(+), 6 deletions(-) diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 1dbaeebd..2a52afee 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -404,18 +404,16 @@ impl Future for ServerWorker { let factory_id = restart.factory_id; let token = restart.token; - let item = ready!(restart.fut.as_mut().poll(cx)).unwrap_or_else(|_| { + let mut item = ready!(restart.fut.as_mut().poll(cx)).unwrap_or_else(|_| { panic!( "Can not restart {:?} service", this.factories[factory_id].name(token) ) }); - // Only interest in the first item? - let (token, service) = item - .into_iter() - .next() - .expect("No BoxedServerService. Restarting can not progress"); + // Token should have a matching index with returned item vec. + debug_assert_eq!(item.get(token.0).unwrap().0, token); + let (token, service) = item.remove(token.0); trace!( "Service {:?} has been restarted", diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index cd61df9f..d7176d85 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -264,3 +264,148 @@ async fn test_max_concurrent_connections() { sys.stop(); let _ = h.join().unwrap(); } + +#[test] +fn test_service_restart() { + use std::net::Shutdown; + use std::task::{Context, Poll}; + + use actix_rt::net::TcpStream; + use actix_service::{fn_factory, Service}; + use futures_core::future::LocalBoxFuture; + + struct TestService(Arc); + + impl Service for TestService { + type Response = (); + type Error = (); + type Future = LocalBoxFuture<'static, Result>; + + fn poll_ready(&self, _: &mut Context<'_>) -> Poll> { + let TestService(ref counter) = self; + let c = counter.fetch_add(1, Ordering::SeqCst); + // Force the service to restart on first readiness check. + if c > 0 { + Poll::Ready(Ok(())) + } else { + Poll::Ready(Err(())) + } + } + + fn call(&self, _: TcpStream) -> Self::Future { + Box::pin(async { Ok(()) }) + } + } + + let addr1 = unused_addr(); + let addr2 = unused_addr(); + let (tx, rx) = mpsc::channel(); + let num = Arc::new(AtomicUsize::new(0)); + let num2 = Arc::new(AtomicUsize::new(0)); + + let num_clone = num.clone(); + let num2_clone = num2.clone(); + + let h = thread::spawn(move || { + actix_rt::System::new().block_on(async { + let server = Server::build() + .backlog(1) + .disable_signals() + .configure(move |cfg| { + let num = num.clone(); + let num2 = num2.clone(); + cfg.bind("addr1", addr1) + .unwrap() + .bind("addr2", addr2) + .unwrap() + .apply(move |rt| { + let num = num.clone(); + let num2 = num2.clone(); + rt.service( + "addr1", + fn_factory(move || { + let num = num.clone(); + async move { Ok::<_, ()>(TestService(num)) } + }), + ); + rt.service( + "addr2", + fn_factory(move || { + let num2 = num2.clone(); + async move { Ok::<_, ()>(TestService(num2)) } + }), + ); + }) + }) + .unwrap() + .workers(1) + .run(); + + let _ = tx.send((server.clone(), actix_rt::System::current())); + server.await + }) + }); + + let (server, sys) = rx.recv().unwrap(); + thread::sleep(time::Duration::from_millis(500)); + + for _ in 0..5 { + let conn = net::TcpStream::connect(addr1).unwrap(); + conn.shutdown(Shutdown::Both).unwrap(); + let conn = net::TcpStream::connect(addr2).unwrap(); + conn.shutdown(Shutdown::Both).unwrap(); + } + + thread::sleep(time::Duration::from_secs(1)); + + assert!(num_clone.load(Ordering::SeqCst) > 5); + assert!(num2_clone.load(Ordering::SeqCst) > 5); + + sys.stop(); + let _ = server.stop(false); + let _ = h.join().unwrap(); + + let addr = unused_addr(); + let (tx, rx) = mpsc::channel(); + let num = Arc::new(AtomicUsize::new(0)); + + let num_clone = num.clone(); + + let h = thread::spawn(move || { + let num = num.clone(); + actix_rt::System::new().block_on(async { + let server = Server::build() + .backlog(1) + .disable_signals() + .bind("addr", addr, move || { + let num = num.clone(); + fn_factory(move || { + let num = num.clone(); + async move { Ok::<_, ()>(TestService(num)) } + }) + }) + .unwrap() + .workers(1) + .run(); + + let _ = tx.send((server.clone(), actix_rt::System::current())); + server.await + }) + }); + + let (server, sys) = rx.recv().unwrap(); + thread::sleep(time::Duration::from_millis(500)); + + for _ in 0..5 { + let conn = net::TcpStream::connect(addr).unwrap(); + conn.shutdown(Shutdown::Both).unwrap(); + } + + thread::sleep(time::Duration::from_secs(1)); + + assert!(num_clone.load(Ordering::SeqCst) > 5); + + sys.stop(); + let _ = server.stop(false); + let _ = h.join().unwrap(); +}