diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 84100b56..5f7ae931 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -72,10 +72,12 @@ impl Accept { let availability = WorkerAvailability::new(waker_queue.clone()); let factories = builder.services.iter().map(|v| v.clone_factory()).collect(); let handle = - ServerWorker::start(idx, factories, availability, builder.worker_config); + ServerWorker::start(idx, factories, availability, builder.worker_config)?; let handle_clone = (idx, handle.clone()); - (handle, handle_clone) + Ok((handle, handle_clone)) }) + .collect::, io::Error>>()? + .into_iter() .unzip(); let wake_queue_clone = waker_queue.clone(); diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index 639e2ecd..0a7f92b4 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -24,10 +24,6 @@ pub use self::test_server::TestServer; #[doc(hidden)] pub use self::socket::FromStream; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - /// Socket ID token #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] pub(crate) struct Token(usize); @@ -54,74 +50,3 @@ impl Token { pub fn new() -> ServerBuilder { ServerBuilder::default() } - -// a poor man's join future. joined future is only used when starting/stopping the server. -// pin_project and pinned futures are overkill for this task. -pub(crate) struct JoinAll { - fut: Vec>, -} - -pub(crate) fn join_all(fut: Vec + 'static>) -> JoinAll { - let fut = fut - .into_iter() - .map(|f| JoinFuture::Future(Box::pin(f))) - .collect(); - - JoinAll { fut } -} - -enum JoinFuture { - Future(Pin>>), - Result(Option), -} - -impl Unpin for JoinAll {} - -impl Future for JoinAll { - type Output = Vec; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut ready = true; - - let this = self.get_mut(); - for fut in this.fut.iter_mut() { - if let JoinFuture::Future(f) = fut { - match f.as_mut().poll(cx) { - Poll::Ready(t) => { - *fut = JoinFuture::Result(Some(t)); - } - Poll::Pending => ready = false, - } - } - } - - if ready { - let mut res = Vec::new(); - for fut in this.fut.iter_mut() { - if let JoinFuture::Result(f) = fut { - res.push(f.take().unwrap()); - } - } - - Poll::Ready(res) - } else { - Poll::Pending - } - } -} - -#[cfg(test)] -mod test { - use super::*; - - use actix_utils::future::ready; - - #[actix_rt::test] - async fn test_join_all() { - let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))]; - let mut res = join_all(futs).await.into_iter(); - assert_eq!(Ok(1), res.next().unwrap()); - assert_eq!(Err(3), res.next().unwrap()); - assert_eq!(Ok(9), res.next().unwrap()); - } -} diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index 38d218b8..07128dda 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -203,15 +203,17 @@ impl Server { let availability = WorkerAvailability::new(self.waker_queue.clone()); let factories = self.services.iter().map(|v| v.clone_factory()).collect(); - let handle = ServerWorker::start( + let res = ServerWorker::start( new_idx, factories, availability, self.worker_config, ); - self.handles.push((new_idx, handle.clone())); - self.waker_queue.wake(WakerInterest::Worker(handle)); + if let Ok(handle) = res { + self.handles.push((new_idx, handle.clone())); + self.waker_queue.wake(WakerInterest::Worker(handle)); + } } None } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 09b88983..6229407d 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -1,8 +1,10 @@ use std::future::Future; +use std::io; use std::pin::Pin; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; +use std::thread; use std::time::Duration; use actix_rt::{ @@ -18,7 +20,7 @@ use tokio::sync::oneshot; use crate::service::{BoxedServerService, InternalServiceFactory}; use crate::socket::MioStream; use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::{join_all, Token}; +use crate::Token; pub(crate) struct WorkerCommand(Conn); @@ -194,7 +196,7 @@ impl ServerWorker { factories: Vec>, availability: WorkerAvailability, config: ServerWorkerConfig, - ) -> WorkerHandle { + ) -> io::Result { let (tx1, rx) = unbounded_channel(); let (tx2, rx2) = unbounded_channel(); let avail = availability.clone(); @@ -204,23 +206,17 @@ impl ServerWorker { // Try to get actix system when have one. let system = System::try_current(); + let (factory_tx, factory_rx) = std::sync::mpsc::sync_channel(1); + // every worker runs in it's own thread. - // use a custom tokio runtime builder to change the settings of runtime. - std::thread::spawn(move || { - // conditionally setup actix system. - if let Some(system) = system { - System::set_current(system); - } + thread::Builder::new() + .name(format!("actix-server-worker-{}", idx)) + .spawn(move || { + // conditionally setup actix system. + if let Some(system) = system { + System::set_current(system); + } - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .max_blocking_threads(config.max_blocking_threads) - .build() - .unwrap(); - - let local = tokio::task::LocalSet::new(); - - rt.block_on(local.run_until(async move { let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker { rx, rx2, @@ -231,46 +227,53 @@ impl ServerWorker { conns: conns.clone(), state: WorkerState::Unavailable, }); - let fut = wrk - .factories - .iter() - .enumerate() - .map(|(idx, factory)| { - let fut = factory.create(); - async move { - fut.await.map(|r| { - r.into_iter().map(|(t, s)| (idx, t, s)).collect::>() - }) - } - }) - .collect::>(); - let res = join_all(fut) - .await - .into_iter() - .collect::, _>>(); - match res { - Ok(services) => { - for item in services { - for (factory, token, service) in item { - assert_eq!(token.0, wrk.services.len()); - wrk.services.push(WorkerService { - factory, - service, - status: WorkerServiceStatus::Unavailable, - }); + // use a custom tokio runtime builder to change the settings of runtime. + let local = tokio::task::LocalSet::new(); + let res = tokio::runtime::Builder::new_current_thread() + .enable_all() + .max_blocking_threads(config.max_blocking_threads) + .build() + .and_then(|rt| { + local.block_on(&rt, async { + for (idx, factory) in wrk.factories.iter().enumerate() { + let service = factory.create().await.map_err(|_| { + io::Error::new( + io::ErrorKind::Other, + "Can not start worker service", + ) + })?; + + for (token, service) in service { + assert_eq!(token.0, wrk.services.len()); + wrk.services.push(WorkerService { + factory: idx, + service, + status: WorkerServiceStatus::Unavailable, + }) + } } - } - } - Err(e) => { - error!("Can not start worker: {:?}", e); - } - } - wrk.await - })) - }); + Ok::<_, io::Error>(()) + })?; - WorkerHandle::new(idx, tx1, tx2, avail) + Ok(rt) + }); + + match res { + Ok(rt) => { + factory_tx.send(None).unwrap(); + local.block_on(&rt, wrk) + } + Err(e) => factory_tx.send(Some(e)).unwrap(), + } + }) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + + factory_rx + .recv() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))? + .map(Err) + .unwrap_or_else(|| Ok(WorkerHandle::new(idx, tx1, tx2, avail))) } fn restart_service(&mut self, token: Token, idx: usize) {