Reduce ServerWorker size

This commit is contained in:
fakeshadow 2021-04-07 10:01:01 +08:00
parent 5961eb892e
commit edbc7564b7
1 changed files with 38 additions and 34 deletions

View File

@ -116,10 +116,10 @@ impl WorkerAvailability {
pub(crate) struct ServerWorker {
rx: UnboundedReceiver<WorkerCommand>,
rx2: UnboundedReceiver<StopCommand>,
services: Vec<WorkerService>,
services: Box<[WorkerService]>,
availability: WorkerAvailability,
conns: Counter,
factories: Vec<Box<dyn InternalServiceFactory>>,
factories: Box<[Box<dyn InternalServiceFactory>]>,
state: WorkerState,
shutdown_timeout: Duration,
}
@ -188,6 +188,8 @@ impl ServerWorker {
availability: WorkerAvailability,
config: ServerWorkerConfig,
) -> WorkerHandle {
assert!(!availability.available());
let (tx1, rx) = unbounded_channel();
let (tx2, rx2) = unbounded_channel();
let avail = availability.clone();
@ -202,20 +204,7 @@ impl ServerWorker {
.unwrap()
})
.spawn(async move {
availability.set(false);
let mut wrk = ServerWorker {
rx,
rx2,
services: Vec::new(),
availability,
conns: Counter::new(config.max_concurrent_connections),
factories,
state: Default::default(),
shutdown_timeout: config.shutdown_timeout,
};
let fut = wrk
.factories
let fut = factories
.iter()
.enumerate()
.map(|(idx, factory)| {
@ -228,29 +217,44 @@ impl ServerWorker {
})
.collect::<Vec<_>>();
// a second spawn to make sure worker future runs as non boxed future.
// As Arbiter::spawn would box the future before send it to arbiter.
// a second spawn to run !Send future tasks.
spawn(async move {
let res: Result<Vec<_>, _> = join_all(fut).await.into_iter().collect();
match res {
Ok(services) => {
for item in services {
for (factory, token, service) in item {
assert_eq!(token.0, wrk.services.len());
wrk.services.push(WorkerService {
factory,
service,
status: WorkerServiceStatus::Unavailable,
});
}
}
}
let res = join_all(fut)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>();
let services = match res {
Ok(res) => res
.into_iter()
.flatten()
.fold(Vec::new(), |mut services, (factory, token, service)| {
assert_eq!(token.0, services.len());
services.push(WorkerService {
factory,
service,
status: WorkerServiceStatus::Unavailable,
});
services
})
.into_boxed_slice(),
Err(e) => {
error!("Can not start worker: {:?}", e);
Arbiter::current().stop();
return;
}
}
wrk.await
};
// a third spawn to make sure ServerWorker runs as non boxed future.
spawn(ServerWorker {
rx,
rx2,
services,
availability,
conns: Counter::new(config.max_concurrent_connections),
factories: factories.into_boxed_slice(),
state: Default::default(),
shutdown_timeout: config.shutdown_timeout,
});
});
});