diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index a20df0c1..cc9f8190 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -436,3 +436,144 @@ async fn test_service_restart() { let _ = server.stop(false); let _ = h.join().unwrap(); } + +#[ignore] +#[actix_rt::test] +async fn worker_restart() { + use actix_service::{Service, ServiceFactory}; + use futures_core::future::LocalBoxFuture; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + struct TestServiceFactory(Arc); + + impl ServiceFactory for TestServiceFactory { + type Response = (); + type Error = (); + type Config = (); + type Service = TestService; + type InitError = (); + type Future = LocalBoxFuture<'static, Result>; + + fn new_service(&self, _: Self::Config) -> Self::Future { + let counter = self.0.fetch_add(1, Ordering::Relaxed); + + Box::pin(async move { Ok(TestService(counter)) }) + } + } + + struct TestService(usize); + + impl Service for TestService { + type Response = (); + type Error = (); + type Future = LocalBoxFuture<'static, Result>; + + actix_service::always_ready!(); + + fn call(&self, stream: TcpStream) -> Self::Future { + let counter = self.0; + + let mut stream = stream.into_std().unwrap(); + use std::io::Write; + let str = counter.to_string(); + let buf = str.as_bytes(); + + let mut written = 0; + + while written < buf.len() { + if let Ok(n) = stream.write(&buf[written..]) { + written += n; + } + } + stream.flush().unwrap(); + stream.shutdown(net::Shutdown::Write).unwrap(); + + // force worker 2 to restart service once. + if counter == 2 { + panic!("panic on purpose") + } else { + Box::pin(async { Ok(()) }) + } + } + } + + let addr = unused_addr(); + let (tx, rx) = mpsc::channel(); + + let counter = Arc::new(AtomicUsize::new(1)); + let h = thread::spawn(move || { + let counter = counter.clone(); + actix_rt::System::new().block_on(async { + let server = Server::build() + .disable_signals() + .bind("addr", addr, move || TestServiceFactory(counter.clone())) + .unwrap() + .workers(2) + .run(); + + let _ = tx.send((server.clone(), actix_rt::System::current())); + server.await + }) + }); + + let (server, sys) = rx.recv().unwrap(); + + sleep(Duration::from_secs(3)).await; + + let mut buf = [0; 8]; + + // worker 1 would not restart and return it's id consistently. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("1", id); + stream.shutdown().await.unwrap(); + + // worker 2 dead after return response. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("2", id); + stream.shutdown().await.unwrap(); + + // request to worker 1 + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("1", id); + stream.shutdown().await.unwrap(); + + // TODO: Remove sleep if it can pass CI. + sleep(Duration::from_secs(3)).await; + + // worker 2 restarting and work goes to worker 1. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("1", id); + stream.shutdown().await.unwrap(); + + // TODO: Remove sleep if it can pass CI. + sleep(Duration::from_secs(3)).await; + + // worker 2 restarted but worker 1 was still the next to accept connection. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("1", id); + stream.shutdown().await.unwrap(); + + // TODO: Remove sleep if it can pass CI. + sleep(Duration::from_secs(3)).await; + + // worker 2 accept connection again but it's id is 3. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("3", id); + stream.shutdown().await.unwrap(); + + sys.stop(); + let _ = server.stop(false); + let _ = h.join().unwrap(); +}