add a non_blocking method for start worker

This commit is contained in:
fakeshadow 2021-04-13 09:10:00 +08:00
parent 42c53159da
commit 2a5fd067f4
1 changed files with 78 additions and 38 deletions

View File

@ -193,22 +193,67 @@ impl ServerWorkerConfig {
}
impl ServerWorker {
/// Start server worker in sync.
pub(crate) fn start(
idx: usize,
factories: Vec<Box<dyn InternalServiceFactory>>,
avail: WorkerAvailability,
config: ServerWorkerConfig,
) -> io::Result<(WorkerHandleAccept, WorkerHandleServer)> {
let (factory_tx, factory_rx) = std::sync::mpsc::sync_channel(1);
let (tx1, tx2) = Self::_start(idx, factories, avail.clone(), config, move |err| {
factory_tx.send(err).unwrap()
})?;
factory_rx
.recv()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?
.map(Err)
.unwrap_or_else(|| Ok(handle_pair(idx, tx1, tx2, avail)))
}
#[allow(dead_code)]
/// Start server worker in async.
pub(crate) fn start_non_blocking(
idx: usize,
factories: Vec<Box<dyn InternalServiceFactory>>,
avail: WorkerAvailability,
config: ServerWorkerConfig,
) -> impl Future<Output = io::Result<(WorkerHandleAccept, WorkerHandleServer)>> {
let (factory_tx, factory_rx) = oneshot::channel();
let res = Self::_start(idx, factories, avail.clone(), config, move |err| {
factory_tx.send(err).unwrap()
});
async move {
let (tx1, tx2) = res?;
factory_rx
.await
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?
.map(Err)
.unwrap_or_else(|| Ok(handle_pair(idx, tx1, tx2, avail)))
}
}
fn _start<F>(
idx: usize,
factories: Vec<Box<dyn InternalServiceFactory>>,
availability: WorkerAvailability,
config: ServerWorkerConfig,
) -> io::Result<(WorkerHandleAccept, WorkerHandleServer)> {
f: F,
) -> io::Result<(UnboundedSender<Conn>, UnboundedSender<Stop>)>
where
F: FnOnce(Option<io::Error>) + Send + 'static,
{
assert!(!availability.available());
let avail = availability.clone();
let (tx1, rx) = unbounded_channel();
let (tx2, rx2) = unbounded_channel();
// Try to get actix system when have one.
let system = System::try_current();
let (factory_tx, factory_rx) = std::sync::mpsc::sync_channel(1);
let (tx1, rx) = unbounded_channel();
let (tx2, rx2) = unbounded_channel();
// every worker runs in it's own thread.
thread::Builder::new()
@ -223,38 +268,38 @@ impl ServerWorker {
// use a custom tokio runtime builder to change the settings of runtime.
let local = tokio::task::LocalSet::new();
let res = tokio::runtime::Builder::new_current_thread()
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.max_blocking_threads(config.max_blocking_threads)
.build()
.and_then(|rt| {
local.block_on(&rt, async {
for (idx, factory) in factories.iter().enumerate() {
let service = factory.create().await.map_err(|_| {
io::Error::new(
io::ErrorKind::Other,
"Can not start worker service",
)
})?;
.build();
let res = rt.and_then(|rt| {
let fut = async {
for (idx, factory) in factories.iter().enumerate() {
let service = factory.create().await.map_err(|_| {
io::Error::new(
io::ErrorKind::Other,
"Can not start worker service",
)
})?;
for (token, service) in service {
assert_eq!(token.0, services.len());
services.push(WorkerService {
factory: idx,
service,
status: WorkerServiceStatus::Unavailable,
})
}
for (token, service) in service {
assert_eq!(token.0, services.len());
services.push(WorkerService {
factory: idx,
service,
status: WorkerServiceStatus::Unavailable,
})
}
Ok::<_, io::Error>(())
})?;
}
Ok::<_, io::Error>(())
};
Ok(rt)
});
local.block_on(&rt, fut).map(|_| rt)
});
match res {
Ok(rt) => {
factory_tx.send(None).unwrap();
f(None);
let worker = ServerWorker {
rx,
@ -269,16 +314,11 @@ impl ServerWorker {
local.block_on(&rt, async { worker.await });
}
Err(e) => factory_tx.send(Some(e)).unwrap(),
Err(e) => f(Some(e)),
}
})
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
factory_rx
.recv()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?
.map(Err)
.unwrap_or_else(|| Ok(handle_pair(idx, tx1, tx2, avail)))
.map(|_| (tx1, tx2))
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}
fn restart_service(&mut self, token: Token, factory_id: usize) {