From 10096175386e2c3a2c022bad0ab963001aa6f8c0 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Fri, 15 Jan 2021 20:12:20 +0800 Subject: [PATCH] revert actix_server::worker changes --- actix-server/src/worker.rs | 119 ++++++++++++++++++------------------- 1 file changed, 58 insertions(+), 61 deletions(-) diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index eb18590a..91e98fc2 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -6,9 +6,9 @@ use std::task::{Context, Poll}; use std::time::Duration; use actix_rt::time::{sleep_until, Instant, Sleep}; +use actix_rt::{spawn, Arbiter}; use actix_utils::counter::Counter; use futures_core::future::LocalBoxFuture; -use futures_core::ready; use log::{error, info, trace}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot; @@ -172,63 +172,56 @@ impl Worker { let avail = availability.clone(); // every worker runs in it's own arbiter. - std::thread::spawn(move || { - tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap() - .block_on(tokio::task::LocalSet::new().run_until(async { - availability.set(false); - let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker { - rx, - rx2, - availability, - factories, - shutdown_timeout, - services: Vec::new(), - conns: conns.clone(), - state: WorkerState::Unavailable, - }); + Arbiter::new().send(Box::pin(async move { + availability.set(false); + let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker { + rx, + rx2, + availability, + factories, + shutdown_timeout, + services: Vec::new(), + conns: conns.clone(), + state: WorkerState::Unavailable, + }); - let fut = wrk - .factories - .iter() - .enumerate() - .map(|(idx, factory)| { - let fut = factory.create(); - async move { - fut.await.map(|res| { - res.into_iter() - .map(|(t, s)| (idx, t, s)) - .collect::>() - }) - } + let fut = wrk + .factories + .iter() + .enumerate() + .map(|(idx, factory)| { + let fut = factory.create(); + async move { + fut.await.map(|r| { + r.into_iter().map(|(t, s)| (idx, t, s)).collect::>() }) - .collect::>(); - - let res = join_all(fut) - .await - .into_iter() - .collect::, _>>(); - - match res { - Ok(res) => { - res.into_iter() - .flatten() - .for_each(|(factory, token, service)| { - assert_eq!(token.0, wrk.services.len()); - wrk.services.push(WorkerService { - factory, - service, - status: WorkerServiceStatus::Unavailable, - }); - }); - wrk.await - } - Err(e) => error!("Can not start worker: {:?}", e), } - })); - }); + }) + .collect::>(); + + spawn(async move { + let res: Result, _> = join_all(fut).await.into_iter().collect(); + match res { + Ok(services) => { + for item in services { + for (factory, token, service) in item { + assert_eq!(token.0, wrk.services.len()); + wrk.services.push(WorkerService { + factory, + service, + status: WorkerServiceStatus::Unavailable, + }); + } + } + } + Err(e) => { + error!("Can not start worker: {:?}", e); + Arbiter::current().stop(); + } + } + wrk.await + }); + })); WorkerHandle::new(idx, tx1, tx2, avail) } @@ -367,8 +360,8 @@ impl Future for Worker { } }, WorkerState::Restarting(idx, token, ref mut fut) => { - match ready!(fut.as_mut().poll(cx)) { - Ok(item) => { + match fut.as_mut().poll(cx) { + Poll::Ready(Ok(item)) => { // only interest in the first item? if let Some((token, service)) = item.into_iter().next() { trace!( @@ -380,12 +373,13 @@ impl Future for Worker { return self.poll(cx); } } - Err(_) => { + Poll::Ready(Err(_)) => { panic!( "Can not restart {:?} service", self.factories[idx].name(token) ); } + Poll::Pending => return Poll::Pending, } self.poll(cx) } @@ -393,6 +387,7 @@ impl Future for Worker { let num = num_connections(); if num == 0 { let _ = tx.take().unwrap().send(true); + Arbiter::current().stop(); return Poll::Ready(()); } @@ -400,6 +395,7 @@ impl Future for Worker { if Pin::new(t2).poll(cx).is_ready() { let _ = tx.take().unwrap().send(false); self.shutdown(true); + Arbiter::current().stop(); return Poll::Ready(()); } @@ -434,15 +430,16 @@ impl Future for Worker { } } - match ready!(Pin::new(&mut self.rx).poll_recv(cx)) { + match Pin::new(&mut self.rx).poll_recv(cx) { // handle incoming io stream - Some(WorkerCommand(msg)) => { + Poll::Ready(Some(WorkerCommand(msg))) => { let guard = self.conns.get(); let _ = self.services[msg.token.0] .service .call((Some(guard), msg.io)); } - None => return Poll::Ready(()), + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(()), }; }, }