mirror of https://github.com/fafhrd91/actix-net
Remove restart_worker test
This commit is contained in:
parent
bd48908792
commit
0185d11646
|
@ -436,143 +436,3 @@ async fn test_service_restart() {
|
||||||
let _ = server.stop(false);
|
let _ = server.stop(false);
|
||||||
let _ = h.join().unwrap();
|
let _ = h.join().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[actix_rt::test]
|
|
||||||
async fn worker_restart() {
|
|
||||||
use actix_service::{Service, ServiceFactory};
|
|
||||||
use futures_core::future::LocalBoxFuture;
|
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
|
||||||
|
|
||||||
struct TestServiceFactory(Arc<AtomicUsize>);
|
|
||||||
|
|
||||||
impl ServiceFactory<TcpStream> for TestServiceFactory {
|
|
||||||
type Response = ();
|
|
||||||
type Error = ();
|
|
||||||
type Config = ();
|
|
||||||
type Service = TestService;
|
|
||||||
type InitError = ();
|
|
||||||
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
|
|
||||||
|
|
||||||
fn new_service(&self, _: Self::Config) -> Self::Future {
|
|
||||||
let counter = self.0.fetch_add(1, Ordering::Relaxed);
|
|
||||||
|
|
||||||
Box::pin(async move { Ok(TestService(counter)) })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
struct TestService(usize);
|
|
||||||
|
|
||||||
impl Service<TcpStream> for TestService {
|
|
||||||
type Response = ();
|
|
||||||
type Error = ();
|
|
||||||
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
|
|
||||||
|
|
||||||
actix_service::always_ready!();
|
|
||||||
|
|
||||||
fn call(&self, stream: TcpStream) -> Self::Future {
|
|
||||||
let counter = self.0;
|
|
||||||
|
|
||||||
let mut stream = stream.into_std().unwrap();
|
|
||||||
use std::io::Write;
|
|
||||||
let str = counter.to_string();
|
|
||||||
let buf = str.as_bytes();
|
|
||||||
|
|
||||||
let mut written = 0;
|
|
||||||
|
|
||||||
while written < buf.len() {
|
|
||||||
if let Ok(n) = stream.write(&buf[written..]) {
|
|
||||||
written += n;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
stream.flush().unwrap();
|
|
||||||
stream.shutdown(net::Shutdown::Write).unwrap();
|
|
||||||
|
|
||||||
// force worker 2 to restart service once.
|
|
||||||
if counter == 2 {
|
|
||||||
panic!("panic on purpose")
|
|
||||||
} else {
|
|
||||||
Box::pin(async { Ok(()) })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let addr = unused_addr();
|
|
||||||
let (tx, rx) = mpsc::channel();
|
|
||||||
|
|
||||||
let counter = Arc::new(AtomicUsize::new(1));
|
|
||||||
let h = thread::spawn(move || {
|
|
||||||
let counter = counter.clone();
|
|
||||||
actix_rt::System::new().block_on(async {
|
|
||||||
let server = Server::build()
|
|
||||||
.disable_signals()
|
|
||||||
.bind("addr", addr, move || TestServiceFactory(counter.clone()))
|
|
||||||
.unwrap()
|
|
||||||
.workers(2)
|
|
||||||
.run();
|
|
||||||
|
|
||||||
let _ = tx.send((server.clone(), actix_rt::System::current()));
|
|
||||||
server.await
|
|
||||||
})
|
|
||||||
});
|
|
||||||
|
|
||||||
let (server, sys) = rx.recv().unwrap();
|
|
||||||
|
|
||||||
sleep(Duration::from_secs(3)).await;
|
|
||||||
|
|
||||||
let mut buf = [0; 8];
|
|
||||||
|
|
||||||
// worker 1 would not restart and return it's id consistently.
|
|
||||||
let mut stream = TcpStream::connect(addr).await.unwrap();
|
|
||||||
let n = stream.read(&mut buf).await.unwrap();
|
|
||||||
let id = String::from_utf8_lossy(&buf[0..n]);
|
|
||||||
assert_eq!("1", id);
|
|
||||||
stream.shutdown().await.unwrap();
|
|
||||||
|
|
||||||
// worker 2 dead after return response.
|
|
||||||
let mut stream = TcpStream::connect(addr).await.unwrap();
|
|
||||||
let n = stream.read(&mut buf).await.unwrap();
|
|
||||||
let id = String::from_utf8_lossy(&buf[0..n]);
|
|
||||||
assert_eq!("2", id);
|
|
||||||
stream.shutdown().await.unwrap();
|
|
||||||
|
|
||||||
// request to worker 1
|
|
||||||
let mut stream = TcpStream::connect(addr).await.unwrap();
|
|
||||||
let n = stream.read(&mut buf).await.unwrap();
|
|
||||||
let id = String::from_utf8_lossy(&buf[0..n]);
|
|
||||||
assert_eq!("1", id);
|
|
||||||
stream.shutdown().await.unwrap();
|
|
||||||
|
|
||||||
// TODO: Remove sleep if it can pass CI.
|
|
||||||
sleep(Duration::from_secs(3)).await;
|
|
||||||
|
|
||||||
// worker 2 restarting and work goes to worker 1.
|
|
||||||
let mut stream = TcpStream::connect(addr).await.unwrap();
|
|
||||||
let n = stream.read(&mut buf).await.unwrap();
|
|
||||||
let id = String::from_utf8_lossy(&buf[0..n]);
|
|
||||||
assert_eq!("1", id);
|
|
||||||
stream.shutdown().await.unwrap();
|
|
||||||
|
|
||||||
// TODO: Remove sleep if it can pass CI.
|
|
||||||
sleep(Duration::from_secs(3)).await;
|
|
||||||
|
|
||||||
// worker 2 restarted but worker 1 was still the next to accept connection.
|
|
||||||
let mut stream = TcpStream::connect(addr).await.unwrap();
|
|
||||||
let n = stream.read(&mut buf).await.unwrap();
|
|
||||||
let id = String::from_utf8_lossy(&buf[0..n]);
|
|
||||||
assert_eq!("1", id);
|
|
||||||
stream.shutdown().await.unwrap();
|
|
||||||
|
|
||||||
// TODO: Remove sleep if it can pass CI.
|
|
||||||
sleep(Duration::from_secs(3)).await;
|
|
||||||
|
|
||||||
// worker 2 accept connection again but it's id is 3.
|
|
||||||
let mut stream = TcpStream::connect(addr).await.unwrap();
|
|
||||||
let n = stream.read(&mut buf).await.unwrap();
|
|
||||||
let id = String::from_utf8_lossy(&buf[0..n]);
|
|
||||||
assert_eq!("3", id);
|
|
||||||
stream.shutdown().await.unwrap();
|
|
||||||
|
|
||||||
sys.stop();
|
|
||||||
let _ = server.stop(false);
|
|
||||||
let _ = h.join().unwrap();
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in New Issue