merge master

This commit is contained in:
fakeshadow 2021-04-04 23:44:41 +08:00
commit f1ad7957cf
3 changed files with 16 additions and 23 deletions

View File

@ -243,7 +243,7 @@ impl ServiceRuntime {
type BoxedNewService = Box<
dyn BaseServiceFactory<
(Option<CounterGuard>, MioStream),
(CounterGuard, MioStream),
Response = (),
Error = (),
InitError = (),
@ -257,7 +257,7 @@ struct ServiceFactory<T> {
inner: T,
}
impl<T> BaseServiceFactory<(Option<CounterGuard>, MioStream)> for ServiceFactory<T>
impl<T> BaseServiceFactory<(CounterGuard, MioStream)> for ServiceFactory<T>
where
T: BaseServiceFactory<TcpStream, Config = ()>,
T::Future: 'static,

View File

@ -29,7 +29,7 @@ pub(crate) trait InternalServiceFactory: Send {
pub(crate) type BoxedServerService = Box<
dyn Service<
(Option<CounterGuard>, MioStream),
(CounterGuard, MioStream),
Response = (),
Error = (),
Future = Ready<Result<(), ()>>,
@ -50,7 +50,7 @@ impl<S, I> StreamService<S, I> {
}
}
impl<S, I> Service<(Option<CounterGuard>, MioStream)> for StreamService<S, I>
impl<S, I> Service<(CounterGuard, MioStream)> for StreamService<S, I>
where
S: Service<I>,
S::Future: 'static,
@ -65,7 +65,7 @@ where
self.service.poll_ready(ctx).map_err(|_| ())
}
fn call(&self, (guard, req): (Option<CounterGuard>, MioStream)) -> Self::Future {
fn call(&self, (guard, req): (CounterGuard, MioStream)) -> Self::Future {
ready(match FromStream::from_mio(req) {
Ok(stream) => {
let f = self.service.call(stream);

View File

@ -272,6 +272,13 @@ impl ServerWorker {
WorkerHandle::new(idx, tx1, tx2, avail)
}
fn restart_service(&mut self, token: Token, idx: usize) {
let factory = &self.factories[idx];
trace!("Service {:?} failed, restarting", factory.name(token));
self.services[token.0].status = WorkerServiceStatus::Restarting;
self.state = WorkerState::Restarting(idx, token, factory.create());
}
fn shutdown(&mut self, force: bool) {
self.services
.iter_mut()
@ -394,13 +401,7 @@ impl Future for ServerWorker {
}
Ok(false) => Poll::Pending,
Err((token, idx)) => {
trace!(
"Service {:?} failed, restarting",
this.factories[idx].name(token)
);
this.services[token.0].status = WorkerServiceStatus::Restarting;
this.state =
WorkerState::Restarting(idx, token, this.factories[idx].create());
this.restart_service(token, idx);
self.poll(cx)
}
},
@ -456,7 +457,7 @@ impl Future for ServerWorker {
// actively poll stream and handle worker command
WorkerState::Available => loop {
match this.check_readiness(cx) {
Ok(true) => (),
Ok(true) => {}
Ok(false) => {
trace!("Worker is unavailable");
this.availability.set(false);
@ -464,14 +465,8 @@ impl Future for ServerWorker {
return self.poll(cx);
}
Err((token, idx)) => {
trace!(
"Service {:?} failed, restarting",
this.factories[idx].name(token)
);
this.restart_service(token, idx);
this.availability.set(false);
this.services[token.0].status = WorkerServiceStatus::Restarting;
this.state =
WorkerState::Restarting(idx, token, this.factories[idx].create());
return self.poll(cx);
}
}
@ -480,9 +475,7 @@ impl Future for ServerWorker {
// handle incoming io stream
Some(WorkerCommand(msg)) => {
let guard = this.conns.get();
let _ = this.services[msg.token.0]
.service
.call((Some(guard), msg.io));
let _ = this.services[msg.token.0].service.call((guard, msg.io));
}
None => return Poll::Ready(()),
};