run test servers in io-uring rt friendly way

This commit is contained in:
Rob Ede 2021-11-21 23:11:14 +00:00
parent 452331b745
commit 1f28bea3a5
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
2 changed files with 169 additions and 143 deletions

View File

@ -66,12 +66,13 @@ pub async fn test_server_with_addr<F: ServiceFactory<TcpStream>>(
// run server in separate thread // run server in separate thread
thread::spawn(move || { thread::spawn(move || {
let sys = System::new(); System::new().block_on(async move {
let local_addr = tcp.local_addr().unwrap(); let local_addr = tcp.local_addr().unwrap();
let srv = Server::build() let srv = Server::build()
.workers(1) .workers(1)
.disable_signals() .disable_signals()
.system_exit()
.listen("test", tcp, factory) .listen("test", tcp, factory)
.expect("test server could not be created"); .expect("test server could not be created");
@ -81,10 +82,8 @@ pub async fn test_server_with_addr<F: ServiceFactory<TcpStream>>(
.unwrap(); .unwrap();
// drive server loop // drive server loop
sys.block_on(srv).unwrap(); srv.await.unwrap();
});
// start system event loop
sys.run().unwrap();
// notify TestServer that server and system have shut down // notify TestServer that server and system have shut down
// all thread managed resources should be dropped at this point // all thread managed resources should be dropped at this point

View File

@ -146,19 +146,23 @@ where
// run server in separate orphaned thread // run server in separate orphaned thread
thread::spawn(move || { thread::spawn(move || {
let sys = rt::System::new(); rt::System::new().block_on(async move {
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
let local_addr = tcp.local_addr().unwrap(); let local_addr = tcp.local_addr().unwrap();
let factory = factory.clone(); let factory = factory.clone();
let srv_cfg = cfg.clone(); let srv_cfg = cfg.clone();
let timeout = cfg.client_timeout; let timeout = cfg.client_timeout;
let builder = Server::build().workers(1).disable_signals().system_exit(); let builder = Server::build().workers(1).disable_signals().system_exit();
let srv = match srv_cfg.stream { let srv = match srv_cfg.stream {
StreamType::Tcp => match srv_cfg.tp { StreamType::Tcp => match srv_cfg.tp {
HttpVer::Http1 => builder.listen("test", tcp, move || { HttpVer::Http1 => builder.listen("test", tcp, move || {
let app_cfg = let app_cfg = AppConfig::__priv_test_new(
AppConfig::__priv_test_new(false, local_addr.to_string(), local_addr); false,
local_addr.to_string(),
local_addr,
);
let fac = factory() let fac = factory()
.into_factory() .into_factory()
@ -170,8 +174,11 @@ where
.tcp() .tcp()
}), }),
HttpVer::Http2 => builder.listen("test", tcp, move || { HttpVer::Http2 => builder.listen("test", tcp, move || {
let app_cfg = let app_cfg = AppConfig::__priv_test_new(
AppConfig::__priv_test_new(false, local_addr.to_string(), local_addr); false,
local_addr.to_string(),
local_addr,
);
let fac = factory() let fac = factory()
.into_factory() .into_factory()
@ -183,8 +190,11 @@ where
.tcp() .tcp()
}), }),
HttpVer::Both => builder.listen("test", tcp, move || { HttpVer::Both => builder.listen("test", tcp, move || {
let app_cfg = let app_cfg = AppConfig::__priv_test_new(
AppConfig::__priv_test_new(false, local_addr.to_string(), local_addr); false,
local_addr.to_string(),
local_addr,
);
let fac = factory() let fac = factory()
.into_factory() .into_factory()
@ -199,8 +209,11 @@ where
#[cfg(feature = "openssl")] #[cfg(feature = "openssl")]
StreamType::Openssl(acceptor) => match cfg.tp { StreamType::Openssl(acceptor) => match cfg.tp {
HttpVer::Http1 => builder.listen("test", tcp, move || { HttpVer::Http1 => builder.listen("test", tcp, move || {
let app_cfg = let app_cfg = AppConfig::__priv_test_new(
AppConfig::__priv_test_new(false, local_addr.to_string(), local_addr); false,
local_addr.to_string(),
local_addr,
);
let fac = factory() let fac = factory()
.into_factory() .into_factory()
@ -212,8 +225,11 @@ where
.openssl(acceptor.clone()) .openssl(acceptor.clone())
}), }),
HttpVer::Http2 => builder.listen("test", tcp, move || { HttpVer::Http2 => builder.listen("test", tcp, move || {
let app_cfg = let app_cfg = AppConfig::__priv_test_new(
AppConfig::__priv_test_new(false, local_addr.to_string(), local_addr); false,
local_addr.to_string(),
local_addr,
);
let fac = factory() let fac = factory()
.into_factory() .into_factory()
@ -225,8 +241,11 @@ where
.openssl(acceptor.clone()) .openssl(acceptor.clone())
}), }),
HttpVer::Both => builder.listen("test", tcp, move || { HttpVer::Both => builder.listen("test", tcp, move || {
let app_cfg = let app_cfg = AppConfig::__priv_test_new(
AppConfig::__priv_test_new(false, local_addr.to_string(), local_addr); false,
local_addr.to_string(),
local_addr,
);
let fac = factory() let fac = factory()
.into_factory() .into_factory()
@ -241,8 +260,11 @@ where
#[cfg(feature = "rustls")] #[cfg(feature = "rustls")]
StreamType::Rustls(config) => match cfg.tp { StreamType::Rustls(config) => match cfg.tp {
HttpVer::Http1 => builder.listen("test", tcp, move || { HttpVer::Http1 => builder.listen("test", tcp, move || {
let app_cfg = let app_cfg = AppConfig::__priv_test_new(
AppConfig::__priv_test_new(false, local_addr.to_string(), local_addr); false,
local_addr.to_string(),
local_addr,
);
let fac = factory() let fac = factory()
.into_factory() .into_factory()
@ -254,8 +276,11 @@ where
.rustls(config.clone()) .rustls(config.clone())
}), }),
HttpVer::Http2 => builder.listen("test", tcp, move || { HttpVer::Http2 => builder.listen("test", tcp, move || {
let app_cfg = let app_cfg = AppConfig::__priv_test_new(
AppConfig::__priv_test_new(false, local_addr.to_string(), local_addr); false,
local_addr.to_string(),
local_addr,
);
let fac = factory() let fac = factory()
.into_factory() .into_factory()
@ -267,8 +292,11 @@ where
.rustls(config.clone()) .rustls(config.clone())
}), }),
HttpVer::Both => builder.listen("test", tcp, move || { HttpVer::Both => builder.listen("test", tcp, move || {
let app_cfg = let app_cfg = AppConfig::__priv_test_new(
AppConfig::__priv_test_new(false, local_addr.to_string(), local_addr); false,
local_addr.to_string(),
local_addr,
);
let fac = factory() let fac = factory()
.into_factory() .into_factory()
@ -289,13 +317,12 @@ where
.unwrap(); .unwrap();
// drive server loop // drive server loop
sys.block_on(srv).unwrap(); srv.await.unwrap();
// start system event loop
sys.run().unwrap();
// notify TestServer that server and system have shut down // notify TestServer that server and system have shut down
// all thread managed resources should be dropped at this point // all thread managed resources should be dropped at this point
});
let _ = thread_stop_tx.send(()); let _ = thread_stop_tx.send(());
}); });