From e91d33ba87a2ee990539d61e66de3199b8dc3f36 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sat, 17 Apr 2021 19:13:20 +0800 Subject: [PATCH] Add conditional build on existing tokio runtime --- actix-server/src/worker.rs | 144 +++++++++++++++++++++++-------------- 1 file changed, 89 insertions(+), 55 deletions(-) diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 9f9bd48f..87628858 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -251,77 +251,111 @@ impl ServerWorker { { assert!(!availability.available()); - // Try to get actix system when have one. + // Try to get actix system. let system = System::try_current(); + // Try to get tokio runtime handle. + let rt_handle = tokio::runtime::Handle::try_current(); + let (tx1, rx) = unbounded_channel(); let (tx2, rx2) = unbounded_channel(); - // every worker runs in it's own thread. thread::Builder::new() .name(format!("actix-server-worker-{}", idx)) .spawn(move || { - // conditionally setup actix system. + // Conditionally setup actix system. if let Some(system) = system { System::set_current(system); } - let mut services = Vec::new(); + // Prepare service construct future. + let fut = async { + let mut services = Vec::new(); - // use a custom tokio runtime builder to change the settings of runtime. - let local = tokio::task::LocalSet::new(); - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .max_blocking_threads(config.max_blocking_threads) - .build(); + for (idx, factory) in factories.iter().enumerate() { + let service = factory.create().await.map_err(|_| { + io::Error::new(io::ErrorKind::Other, "Can not start worker service") + })?; - let res = rt.and_then(|rt| { - let fut = async { - for (idx, factory) in factories.iter().enumerate() { - let service = factory.create().await.map_err(|_| { - io::Error::new( - io::ErrorKind::Other, - "Can not start worker service", - ) - })?; - - for (token, service) in service { - assert_eq!(token.0, services.len()); - services.push(WorkerService { - factory: idx, - service, - status: WorkerServiceStatus::Unavailable, - }) - } + for (token, service) in service { + assert_eq!(token.0, services.len()); + services.push(WorkerService { + factory: idx, + service, + status: WorkerServiceStatus::Unavailable, + }) + } + } + Ok::<_, io::Error>(services) + }; + + // All future runs in a LocalSet for being able to run !Send future. + let local = tokio::task::LocalSet::new(); + + match rt_handle { + Ok(handle) => { + // Use existing tokio runtime with handle. + let res = handle.block_on(local.run_until(fut)); + + match res { + Ok(services) => { + f(None); + + let worker = ServerWorker { + rx, + rx2, + services: services.into_boxed_slice(), + availability, + conns: Counter::new(config.max_concurrent_connections), + factories, + state: Default::default(), + shutdown_timeout: config.shutdown_timeout, + }; + + let worker = local.spawn_local(worker); + + handle.block_on(local.run_until(async { + let _ = worker.await; + })); + } + Err(e) => f(Some(e)), + } + } + Err(_) => { + // No existing tokio runtime found. Start new runtime. + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .max_blocking_threads(config.max_blocking_threads) + .build(); + + let res = rt.and_then(|rt| { + local.block_on(&rt, fut).map(|services| (services, rt)) + }); + + match res { + Ok((services, rt)) => { + f(None); + + let worker = ServerWorker { + rx, + rx2, + services: services.into_boxed_slice(), + availability, + conns: Counter::new(config.max_concurrent_connections), + factories, + state: Default::default(), + shutdown_timeout: config.shutdown_timeout, + }; + + let handle = local.spawn_local(worker); + + local.block_on(&rt, async { + let _ = handle.await; + }); + } + Err(e) => f(Some(e)), } - Ok::<_, io::Error>(()) - }; - - local.block_on(&rt, fut).map(|_| rt) - }); - - match res { - Ok(rt) => { - f(None); - - let worker = ServerWorker { - rx, - rx2, - services: services.into_boxed_slice(), - availability, - conns: Counter::new(config.max_concurrent_connections), - factories, - state: Default::default(), - shutdown_timeout: config.shutdown_timeout, - }; - - let handle = local.spawn_local(worker); - - local.block_on(&rt, async { - let _ = handle.await; - }); } - Err(e) => f(Some(e)), } }) .map(|_| (tx1, tx2))