mirror of https://github.com/fafhrd91/actix-net
add test
This commit is contained in:
parent
462cc98c4a
commit
ac4b6dcd6a
|
@ -404,18 +404,16 @@ impl Future for ServerWorker {
|
|||
let factory_id = restart.factory_id;
|
||||
let token = restart.token;
|
||||
|
||||
let item = ready!(restart.fut.as_mut().poll(cx)).unwrap_or_else(|_| {
|
||||
let mut item = ready!(restart.fut.as_mut().poll(cx)).unwrap_or_else(|_| {
|
||||
panic!(
|
||||
"Can not restart {:?} service",
|
||||
this.factories[factory_id].name(token)
|
||||
)
|
||||
});
|
||||
|
||||
// Only interest in the first item?
|
||||
let (token, service) = item
|
||||
.into_iter()
|
||||
.next()
|
||||
.expect("No BoxedServerService. Restarting can not progress");
|
||||
// Token should have a matching index with returned item vec.
|
||||
debug_assert_eq!(item.get(token.0).unwrap().0, token);
|
||||
let (token, service) = item.remove(token.0);
|
||||
|
||||
trace!(
|
||||
"Service {:?} has been restarted",
|
||||
|
|
|
@ -264,3 +264,148 @@ async fn test_max_concurrent_connections() {
|
|||
sys.stop();
|
||||
let _ = h.join().unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_service_restart() {
|
||||
use std::net::Shutdown;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use actix_rt::net::TcpStream;
|
||||
use actix_service::{fn_factory, Service};
|
||||
use futures_core::future::LocalBoxFuture;
|
||||
|
||||
struct TestService(Arc<AtomicUsize>);
|
||||
|
||||
impl Service<TcpStream> for TestService {
|
||||
type Response = ();
|
||||
type Error = ();
|
||||
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||
|
||||
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
let TestService(ref counter) = self;
|
||||
let c = counter.fetch_add(1, Ordering::SeqCst);
|
||||
// Force the service to restart on first readiness check.
|
||||
if c > 0 {
|
||||
Poll::Ready(Ok(()))
|
||||
} else {
|
||||
Poll::Ready(Err(()))
|
||||
}
|
||||
}
|
||||
|
||||
fn call(&self, _: TcpStream) -> Self::Future {
|
||||
Box::pin(async { Ok(()) })
|
||||
}
|
||||
}
|
||||
|
||||
let addr1 = unused_addr();
|
||||
let addr2 = unused_addr();
|
||||
let (tx, rx) = mpsc::channel();
|
||||
let num = Arc::new(AtomicUsize::new(0));
|
||||
let num2 = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
let num_clone = num.clone();
|
||||
let num2_clone = num2.clone();
|
||||
|
||||
let h = thread::spawn(move || {
|
||||
actix_rt::System::new().block_on(async {
|
||||
let server = Server::build()
|
||||
.backlog(1)
|
||||
.disable_signals()
|
||||
.configure(move |cfg| {
|
||||
let num = num.clone();
|
||||
let num2 = num2.clone();
|
||||
cfg.bind("addr1", addr1)
|
||||
.unwrap()
|
||||
.bind("addr2", addr2)
|
||||
.unwrap()
|
||||
.apply(move |rt| {
|
||||
let num = num.clone();
|
||||
let num2 = num2.clone();
|
||||
rt.service(
|
||||
"addr1",
|
||||
fn_factory(move || {
|
||||
let num = num.clone();
|
||||
async move { Ok::<_, ()>(TestService(num)) }
|
||||
}),
|
||||
);
|
||||
rt.service(
|
||||
"addr2",
|
||||
fn_factory(move || {
|
||||
let num2 = num2.clone();
|
||||
async move { Ok::<_, ()>(TestService(num2)) }
|
||||
}),
|
||||
);
|
||||
})
|
||||
})
|
||||
.unwrap()
|
||||
.workers(1)
|
||||
.run();
|
||||
|
||||
let _ = tx.send((server.clone(), actix_rt::System::current()));
|
||||
server.await
|
||||
})
|
||||
});
|
||||
|
||||
let (server, sys) = rx.recv().unwrap();
|
||||
thread::sleep(time::Duration::from_millis(500));
|
||||
|
||||
for _ in 0..5 {
|
||||
let conn = net::TcpStream::connect(addr1).unwrap();
|
||||
conn.shutdown(Shutdown::Both).unwrap();
|
||||
let conn = net::TcpStream::connect(addr2).unwrap();
|
||||
conn.shutdown(Shutdown::Both).unwrap();
|
||||
}
|
||||
|
||||
thread::sleep(time::Duration::from_secs(1));
|
||||
|
||||
assert!(num_clone.load(Ordering::SeqCst) > 5);
|
||||
assert!(num2_clone.load(Ordering::SeqCst) > 5);
|
||||
|
||||
sys.stop();
|
||||
let _ = server.stop(false);
|
||||
let _ = h.join().unwrap();
|
||||
|
||||
let addr = unused_addr();
|
||||
let (tx, rx) = mpsc::channel();
|
||||
let num = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
let num_clone = num.clone();
|
||||
|
||||
let h = thread::spawn(move || {
|
||||
let num = num.clone();
|
||||
actix_rt::System::new().block_on(async {
|
||||
let server = Server::build()
|
||||
.backlog(1)
|
||||
.disable_signals()
|
||||
.bind("addr", addr, move || {
|
||||
let num = num.clone();
|
||||
fn_factory(move || {
|
||||
let num = num.clone();
|
||||
async move { Ok::<_, ()>(TestService(num)) }
|
||||
})
|
||||
})
|
||||
.unwrap()
|
||||
.workers(1)
|
||||
.run();
|
||||
|
||||
let _ = tx.send((server.clone(), actix_rt::System::current()));
|
||||
server.await
|
||||
})
|
||||
});
|
||||
|
||||
let (server, sys) = rx.recv().unwrap();
|
||||
thread::sleep(time::Duration::from_millis(500));
|
||||
|
||||
for _ in 0..5 {
|
||||
let conn = net::TcpStream::connect(addr).unwrap();
|
||||
conn.shutdown(Shutdown::Both).unwrap();
|
||||
}
|
||||
|
||||
thread::sleep(time::Duration::from_secs(1));
|
||||
|
||||
assert!(num_clone.load(Ordering::SeqCst) > 5);
|
||||
|
||||
sys.stop();
|
||||
let _ = server.stop(false);
|
||||
let _ = h.join().unwrap();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue