revert actix_server::worker changes

This commit is contained in:
fakeshadow 2021-01-15 20:12:20 +08:00
parent 409c63f948
commit 1009617538
1 changed files with 58 additions and 61 deletions

View File

@ -6,9 +6,9 @@ use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
use actix_rt::time::{sleep_until, Instant, Sleep}; use actix_rt::time::{sleep_until, Instant, Sleep};
use actix_rt::{spawn, Arbiter};
use actix_utils::counter::Counter; use actix_utils::counter::Counter;
use futures_core::future::LocalBoxFuture; use futures_core::future::LocalBoxFuture;
use futures_core::ready;
use log::{error, info, trace}; use log::{error, info, trace};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot; use tokio::sync::oneshot;
@ -172,63 +172,56 @@ impl Worker {
let avail = availability.clone(); let avail = availability.clone();
// every worker runs in it's own arbiter. // every worker runs in it's own arbiter.
std::thread::spawn(move || { Arbiter::new().send(Box::pin(async move {
tokio::runtime::Builder::new_current_thread() availability.set(false);
.enable_all() let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker {
.build() rx,
.unwrap() rx2,
.block_on(tokio::task::LocalSet::new().run_until(async { availability,
availability.set(false); factories,
let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker { shutdown_timeout,
rx, services: Vec::new(),
rx2, conns: conns.clone(),
availability, state: WorkerState::Unavailable,
factories, });
shutdown_timeout,
services: Vec::new(),
conns: conns.clone(),
state: WorkerState::Unavailable,
});
let fut = wrk let fut = wrk
.factories .factories
.iter() .iter()
.enumerate() .enumerate()
.map(|(idx, factory)| { .map(|(idx, factory)| {
let fut = factory.create(); let fut = factory.create();
async move { async move {
fut.await.map(|res| { fut.await.map(|r| {
res.into_iter() r.into_iter().map(|(t, s)| (idx, t, s)).collect::<Vec<_>>()
.map(|(t, s)| (idx, t, s))
.collect::<Vec<_>>()
})
}
}) })
.collect::<Vec<_>>();
let res = join_all(fut)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>();
match res {
Ok(res) => {
res.into_iter()
.flatten()
.for_each(|(factory, token, service)| {
assert_eq!(token.0, wrk.services.len());
wrk.services.push(WorkerService {
factory,
service,
status: WorkerServiceStatus::Unavailable,
});
});
wrk.await
}
Err(e) => error!("Can not start worker: {:?}", e),
} }
})); })
}); .collect::<Vec<_>>();
spawn(async move {
let res: Result<Vec<_>, _> = join_all(fut).await.into_iter().collect();
match res {
Ok(services) => {
for item in services {
for (factory, token, service) in item {
assert_eq!(token.0, wrk.services.len());
wrk.services.push(WorkerService {
factory,
service,
status: WorkerServiceStatus::Unavailable,
});
}
}
}
Err(e) => {
error!("Can not start worker: {:?}", e);
Arbiter::current().stop();
}
}
wrk.await
});
}));
WorkerHandle::new(idx, tx1, tx2, avail) WorkerHandle::new(idx, tx1, tx2, avail)
} }
@ -367,8 +360,8 @@ impl Future for Worker {
} }
}, },
WorkerState::Restarting(idx, token, ref mut fut) => { WorkerState::Restarting(idx, token, ref mut fut) => {
match ready!(fut.as_mut().poll(cx)) { match fut.as_mut().poll(cx) {
Ok(item) => { Poll::Ready(Ok(item)) => {
// only interest in the first item? // only interest in the first item?
if let Some((token, service)) = item.into_iter().next() { if let Some((token, service)) = item.into_iter().next() {
trace!( trace!(
@ -380,12 +373,13 @@ impl Future for Worker {
return self.poll(cx); return self.poll(cx);
} }
} }
Err(_) => { Poll::Ready(Err(_)) => {
panic!( panic!(
"Can not restart {:?} service", "Can not restart {:?} service",
self.factories[idx].name(token) self.factories[idx].name(token)
); );
} }
Poll::Pending => return Poll::Pending,
} }
self.poll(cx) self.poll(cx)
} }
@ -393,6 +387,7 @@ impl Future for Worker {
let num = num_connections(); let num = num_connections();
if num == 0 { if num == 0 {
let _ = tx.take().unwrap().send(true); let _ = tx.take().unwrap().send(true);
Arbiter::current().stop();
return Poll::Ready(()); return Poll::Ready(());
} }
@ -400,6 +395,7 @@ impl Future for Worker {
if Pin::new(t2).poll(cx).is_ready() { if Pin::new(t2).poll(cx).is_ready() {
let _ = tx.take().unwrap().send(false); let _ = tx.take().unwrap().send(false);
self.shutdown(true); self.shutdown(true);
Arbiter::current().stop();
return Poll::Ready(()); return Poll::Ready(());
} }
@ -434,15 +430,16 @@ impl Future for Worker {
} }
} }
match ready!(Pin::new(&mut self.rx).poll_recv(cx)) { match Pin::new(&mut self.rx).poll_recv(cx) {
// handle incoming io stream // handle incoming io stream
Some(WorkerCommand(msg)) => { Poll::Ready(Some(WorkerCommand(msg))) => {
let guard = self.conns.get(); let guard = self.conns.get();
let _ = self.services[msg.token.0] let _ = self.services[msg.token.0]
.service .service
.call((Some(guard), msg.io)); .call((Some(guard), msg.io));
} }
None => return Poll::Ready(()), Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(()),
}; };
}, },
} }