Add conditional build on existing tokio runtime

This commit is contained in:
fakeshadow 2021-04-17 19:13:20 +08:00
parent 8b5dde997c
commit e91d33ba87
1 changed files with 89 additions and 55 deletions

View File

@ -251,38 +251,30 @@ impl ServerWorker {
{ {
assert!(!availability.available()); assert!(!availability.available());
// Try to get actix system when have one. // Try to get actix system.
let system = System::try_current(); let system = System::try_current();
// Try to get tokio runtime handle.
let rt_handle = tokio::runtime::Handle::try_current();
let (tx1, rx) = unbounded_channel(); let (tx1, rx) = unbounded_channel();
let (tx2, rx2) = unbounded_channel(); let (tx2, rx2) = unbounded_channel();
// every worker runs in it's own thread.
thread::Builder::new() thread::Builder::new()
.name(format!("actix-server-worker-{}", idx)) .name(format!("actix-server-worker-{}", idx))
.spawn(move || { .spawn(move || {
// conditionally setup actix system. // Conditionally setup actix system.
if let Some(system) = system { if let Some(system) = system {
System::set_current(system); System::set_current(system);
} }
// Prepare service construct future.
let fut = async {
let mut services = Vec::new(); let mut services = Vec::new();
// use a custom tokio runtime builder to change the settings of runtime.
let local = tokio::task::LocalSet::new();
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.max_blocking_threads(config.max_blocking_threads)
.build();
let res = rt.and_then(|rt| {
let fut = async {
for (idx, factory) in factories.iter().enumerate() { for (idx, factory) in factories.iter().enumerate() {
let service = factory.create().await.map_err(|_| { let service = factory.create().await.map_err(|_| {
io::Error::new( io::Error::new(io::ErrorKind::Other, "Can not start worker service")
io::ErrorKind::Other,
"Can not start worker service",
)
})?; })?;
for (token, service) in service { for (token, service) in service {
@ -294,14 +286,54 @@ impl ServerWorker {
}) })
} }
} }
Ok::<_, io::Error>(()) Ok::<_, io::Error>(services)
}; };
local.block_on(&rt, fut).map(|_| rt) // All future runs in a LocalSet for being able to run !Send future.
let local = tokio::task::LocalSet::new();
match rt_handle {
Ok(handle) => {
// Use existing tokio runtime with handle.
let res = handle.block_on(local.run_until(fut));
match res {
Ok(services) => {
f(None);
let worker = ServerWorker {
rx,
rx2,
services: services.into_boxed_slice(),
availability,
conns: Counter::new(config.max_concurrent_connections),
factories,
state: Default::default(),
shutdown_timeout: config.shutdown_timeout,
};
let worker = local.spawn_local(worker);
handle.block_on(local.run_until(async {
let _ = worker.await;
}));
}
Err(e) => f(Some(e)),
}
}
Err(_) => {
// No existing tokio runtime found. Start new runtime.
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.max_blocking_threads(config.max_blocking_threads)
.build();
let res = rt.and_then(|rt| {
local.block_on(&rt, fut).map(|services| (services, rt))
}); });
match res { match res {
Ok(rt) => { Ok((services, rt)) => {
f(None); f(None);
let worker = ServerWorker { let worker = ServerWorker {
@ -323,6 +355,8 @@ impl ServerWorker {
} }
Err(e) => f(Some(e)), Err(e) => f(Some(e)),
} }
}
}
}) })
.map(|_| (tx1, tx2)) .map(|_| (tx1, tx2))
.map_err(|e| io::Error::new(io::ErrorKind::Other, e)) .map_err(|e| io::Error::new(io::ErrorKind::Other, e))