Do not panic in ServerWorker::start method

This commit is contained in:
fakeshadow 2021-04-05 02:18:47 +08:00
parent d533ca38b0
commit eae7afecf3
4 changed files with 66 additions and 134 deletions

View File

@ -72,10 +72,12 @@ impl Accept {
let availability = WorkerAvailability::new(waker_queue.clone());
let factories = builder.services.iter().map(|v| v.clone_factory()).collect();
let handle =
ServerWorker::start(idx, factories, availability, builder.worker_config);
ServerWorker::start(idx, factories, availability, builder.worker_config)?;
let handle_clone = (idx, handle.clone());
(handle, handle_clone)
Ok((handle, handle_clone))
})
.collect::<Result<Vec<_>, io::Error>>()?
.into_iter()
.unzip();
let wake_queue_clone = waker_queue.clone();

View File

@ -24,10 +24,6 @@ pub use self::test_server::TestServer;
#[doc(hidden)]
pub use self::socket::FromStream;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Socket ID token
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub(crate) struct Token(usize);
@ -54,74 +50,3 @@ impl Token {
pub fn new() -> ServerBuilder {
ServerBuilder::default()
}
// a poor man's join future. joined future is only used when starting/stopping the server.
// pin_project and pinned futures are overkill for this task.
pub(crate) struct JoinAll<T> {
fut: Vec<JoinFuture<T>>,
}
pub(crate) fn join_all<T>(fut: Vec<impl Future<Output = T> + 'static>) -> JoinAll<T> {
let fut = fut
.into_iter()
.map(|f| JoinFuture::Future(Box::pin(f)))
.collect();
JoinAll { fut }
}
enum JoinFuture<T> {
Future(Pin<Box<dyn Future<Output = T>>>),
Result(Option<T>),
}
impl<T> Unpin for JoinAll<T> {}
impl<T> Future for JoinAll<T> {
type Output = Vec<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut ready = true;
let this = self.get_mut();
for fut in this.fut.iter_mut() {
if let JoinFuture::Future(f) = fut {
match f.as_mut().poll(cx) {
Poll::Ready(t) => {
*fut = JoinFuture::Result(Some(t));
}
Poll::Pending => ready = false,
}
}
}
if ready {
let mut res = Vec::new();
for fut in this.fut.iter_mut() {
if let JoinFuture::Result(f) = fut {
res.push(f.take().unwrap());
}
}
Poll::Ready(res)
} else {
Poll::Pending
}
}
}
#[cfg(test)]
mod test {
use super::*;
use actix_utils::future::ready;
#[actix_rt::test]
async fn test_join_all() {
let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))];
let mut res = join_all(futs).await.into_iter();
assert_eq!(Ok(1), res.next().unwrap());
assert_eq!(Err(3), res.next().unwrap());
assert_eq!(Ok(9), res.next().unwrap());
}
}

View File

@ -203,15 +203,17 @@ impl Server {
let availability = WorkerAvailability::new(self.waker_queue.clone());
let factories = self.services.iter().map(|v| v.clone_factory()).collect();
let handle = ServerWorker::start(
let res = ServerWorker::start(
new_idx,
factories,
availability,
self.worker_config,
);
self.handles.push((new_idx, handle.clone()));
self.waker_queue.wake(WakerInterest::Worker(handle));
if let Ok(handle) = res {
self.handles.push((new_idx, handle.clone()));
self.waker_queue.wake(WakerInterest::Worker(handle));
}
}
None
}

View File

@ -1,8 +1,10 @@
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::thread;
use std::time::Duration;
use actix_rt::{
@ -18,7 +20,7 @@ use tokio::sync::oneshot;
use crate::service::{BoxedServerService, InternalServiceFactory};
use crate::socket::MioStream;
use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::{join_all, Token};
use crate::Token;
pub(crate) struct WorkerCommand(Conn);
@ -194,7 +196,7 @@ impl ServerWorker {
factories: Vec<Box<dyn InternalServiceFactory>>,
availability: WorkerAvailability,
config: ServerWorkerConfig,
) -> WorkerHandle {
) -> io::Result<WorkerHandle> {
let (tx1, rx) = unbounded_channel();
let (tx2, rx2) = unbounded_channel();
let avail = availability.clone();
@ -204,23 +206,17 @@ impl ServerWorker {
// Try to get actix system when have one.
let system = System::try_current();
let (factory_tx, factory_rx) = std::sync::mpsc::sync_channel(1);
// every worker runs in it's own thread.
// use a custom tokio runtime builder to change the settings of runtime.
std::thread::spawn(move || {
// conditionally setup actix system.
if let Some(system) = system {
System::set_current(system);
}
thread::Builder::new()
.name(format!("actix-server-worker-{}", idx))
.spawn(move || {
// conditionally setup actix system.
if let Some(system) = system {
System::set_current(system);
}
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.max_blocking_threads(config.max_blocking_threads)
.build()
.unwrap();
let local = tokio::task::LocalSet::new();
rt.block_on(local.run_until(async move {
let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker {
rx,
rx2,
@ -231,46 +227,53 @@ impl ServerWorker {
conns: conns.clone(),
state: WorkerState::Unavailable,
});
let fut = wrk
.factories
.iter()
.enumerate()
.map(|(idx, factory)| {
let fut = factory.create();
async move {
fut.await.map(|r| {
r.into_iter().map(|(t, s)| (idx, t, s)).collect::<Vec<_>>()
})
}
})
.collect::<Vec<_>>();
let res = join_all(fut)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>();
match res {
Ok(services) => {
for item in services {
for (factory, token, service) in item {
assert_eq!(token.0, wrk.services.len());
wrk.services.push(WorkerService {
factory,
service,
status: WorkerServiceStatus::Unavailable,
});
// use a custom tokio runtime builder to change the settings of runtime.
let local = tokio::task::LocalSet::new();
let res = tokio::runtime::Builder::new_current_thread()
.enable_all()
.max_blocking_threads(config.max_blocking_threads)
.build()
.and_then(|rt| {
local.block_on(&rt, async {
for (idx, factory) in wrk.factories.iter().enumerate() {
let service = factory.create().await.map_err(|_| {
io::Error::new(
io::ErrorKind::Other,
"Can not start worker service",
)
})?;
for (token, service) in service {
assert_eq!(token.0, wrk.services.len());
wrk.services.push(WorkerService {
factory: idx,
service,
status: WorkerServiceStatus::Unavailable,
})
}
}
}
}
Err(e) => {
error!("Can not start worker: {:?}", e);
}
}
wrk.await
}))
});
Ok::<_, io::Error>(())
})?;
WorkerHandle::new(idx, tx1, tx2, avail)
Ok(rt)
});
match res {
Ok(rt) => {
factory_tx.send(None).unwrap();
local.block_on(&rt, wrk)
}
Err(e) => factory_tx.send(Some(e)).unwrap(),
}
})
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
factory_rx
.recv()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?
.map(Err)
.unwrap_or_else(|| Ok(WorkerHandle::new(idx, tx1, tx2, avail)))
}
fn restart_service(&mut self, token: Token, idx: usize) {