diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 2a1d9316..29fdff02 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -193,22 +193,67 @@ impl ServerWorkerConfig { } impl ServerWorker { + /// Start server worker in sync. pub(crate) fn start( + idx: usize, + factories: Vec>, + avail: WorkerAvailability, + config: ServerWorkerConfig, + ) -> io::Result<(WorkerHandleAccept, WorkerHandleServer)> { + let (factory_tx, factory_rx) = std::sync::mpsc::sync_channel(1); + + let (tx1, tx2) = Self::_start(idx, factories, avail.clone(), config, move |err| { + factory_tx.send(err).unwrap() + })?; + + factory_rx + .recv() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))? + .map(Err) + .unwrap_or_else(|| Ok(handle_pair(idx, tx1, tx2, avail))) + } + + #[allow(dead_code)] + /// Start server worker in async. + pub(crate) fn start_non_blocking( + idx: usize, + factories: Vec>, + avail: WorkerAvailability, + config: ServerWorkerConfig, + ) -> impl Future> { + let (factory_tx, factory_rx) = oneshot::channel(); + + let res = Self::_start(idx, factories, avail.clone(), config, move |err| { + factory_tx.send(err).unwrap() + }); + + async move { + let (tx1, tx2) = res?; + factory_rx + .await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))? + .map(Err) + .unwrap_or_else(|| Ok(handle_pair(idx, tx1, tx2, avail))) + } + } + + fn _start( idx: usize, factories: Vec>, availability: WorkerAvailability, config: ServerWorkerConfig, - ) -> io::Result<(WorkerHandleAccept, WorkerHandleServer)> { + f: F, + ) -> io::Result<(UnboundedSender, UnboundedSender)> + where + F: FnOnce(Option) + Send + 'static, + { assert!(!availability.available()); - let avail = availability.clone(); - - let (tx1, rx) = unbounded_channel(); - let (tx2, rx2) = unbounded_channel(); // Try to get actix system when have one. let system = System::try_current(); - let (factory_tx, factory_rx) = std::sync::mpsc::sync_channel(1); + let (tx1, rx) = unbounded_channel(); + let (tx2, rx2) = unbounded_channel(); // every worker runs in it's own thread. thread::Builder::new() @@ -223,38 +268,38 @@ impl ServerWorker { // use a custom tokio runtime builder to change the settings of runtime. let local = tokio::task::LocalSet::new(); - let res = tokio::runtime::Builder::new_current_thread() + let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .max_blocking_threads(config.max_blocking_threads) - .build() - .and_then(|rt| { - local.block_on(&rt, async { - for (idx, factory) in factories.iter().enumerate() { - let service = factory.create().await.map_err(|_| { - io::Error::new( - io::ErrorKind::Other, - "Can not start worker service", - ) - })?; + .build(); + let res = rt.and_then(|rt| { + let fut = async { + for (idx, factory) in factories.iter().enumerate() { + let service = factory.create().await.map_err(|_| { + io::Error::new( + io::ErrorKind::Other, + "Can not start worker service", + ) + })?; - for (token, service) in service { - assert_eq!(token.0, services.len()); - services.push(WorkerService { - factory: idx, - service, - status: WorkerServiceStatus::Unavailable, - }) - } + for (token, service) in service { + assert_eq!(token.0, services.len()); + services.push(WorkerService { + factory: idx, + service, + status: WorkerServiceStatus::Unavailable, + }) } - Ok::<_, io::Error>(()) - })?; + } + Ok::<_, io::Error>(()) + }; - Ok(rt) - }); + local.block_on(&rt, fut).map(|_| rt) + }); match res { Ok(rt) => { - factory_tx.send(None).unwrap(); + f(None); let worker = ServerWorker { rx, @@ -269,16 +314,11 @@ impl ServerWorker { local.block_on(&rt, async { worker.await }); } - Err(e) => factory_tx.send(Some(e)).unwrap(), + Err(e) => f(Some(e)), } }) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - - factory_rx - .recv() - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))? - .map(Err) - .unwrap_or_else(|| Ok(handle_pair(idx, tx1, tx2, avail))) + .map(|_| (tx1, tx2)) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) } fn restart_service(&mut self, token: Token, factory_id: usize) {