diff --git a/actix-server/src/config.rs b/actix-server/src/config.rs index 28e4fdeb..c5e63630 100644 --- a/actix-server/src/config.rs +++ b/actix-server/src/config.rs @@ -243,7 +243,7 @@ impl ServiceRuntime { type BoxedNewService = Box< dyn BaseServiceFactory< - (Option, MioStream), + (CounterGuard, MioStream), Response = (), Error = (), InitError = (), @@ -257,7 +257,7 @@ struct ServiceFactory { inner: T, } -impl BaseServiceFactory<(Option, MioStream)> for ServiceFactory +impl BaseServiceFactory<(CounterGuard, MioStream)> for ServiceFactory where T: BaseServiceFactory, T::Future: 'static, diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index 835ee10b..da57af67 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -29,7 +29,7 @@ pub(crate) trait InternalServiceFactory: Send { pub(crate) type BoxedServerService = Box< dyn Service< - (Option, MioStream), + (CounterGuard, MioStream), Response = (), Error = (), Future = Ready>, @@ -50,7 +50,7 @@ impl StreamService { } } -impl Service<(Option, MioStream)> for StreamService +impl Service<(CounterGuard, MioStream)> for StreamService where S: Service, S::Future: 'static, @@ -65,7 +65,7 @@ where self.service.poll_ready(ctx).map_err(|_| ()) } - fn call(&self, (guard, req): (Option, MioStream)) -> Self::Future { + fn call(&self, (guard, req): (CounterGuard, MioStream)) -> Self::Future { ready(match FromStream::from_mio(req) { Ok(stream) => { let f = self.service.call(stream); diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 56fb5e1a..09b88983 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -273,6 +273,13 @@ impl ServerWorker { WorkerHandle::new(idx, tx1, tx2, avail) } + fn restart_service(&mut self, token: Token, idx: usize) { + let factory = &self.factories[idx]; + trace!("Service {:?} failed, restarting", factory.name(token)); + self.services[token.0].status = WorkerServiceStatus::Restarting; + self.state = WorkerState::Restarting(idx, token, factory.create()); + } + fn shutdown(&mut self, force: bool) { self.services .iter_mut() @@ -387,13 +394,7 @@ impl Future for ServerWorker { } Ok(false) => Poll::Pending, Err((token, idx)) => { - trace!( - "Service {:?} failed, restarting", - self.factories[idx].name(token) - ); - self.services[token.0].status = WorkerServiceStatus::Restarting; - self.state = - WorkerState::Restarting(idx, token, self.factories[idx].create()); + self.restart_service(token, idx); self.poll(cx) } }, @@ -446,7 +447,7 @@ impl Future for ServerWorker { // actively poll stream and handle worker command WorkerState::Available => loop { match self.check_readiness(cx) { - Ok(true) => (), + Ok(true) => {} Ok(false) => { trace!("Worker is unavailable"); self.availability.set(false); @@ -454,14 +455,8 @@ impl Future for ServerWorker { return self.poll(cx); } Err((token, idx)) => { - trace!( - "Service {:?} failed, restarting", - self.factories[idx].name(token) - ); + self.restart_service(token, idx); self.availability.set(false); - self.services[token.0].status = WorkerServiceStatus::Restarting; - self.state = - WorkerState::Restarting(idx, token, self.factories[idx].create()); return self.poll(cx); } } @@ -470,9 +465,7 @@ impl Future for ServerWorker { // handle incoming io stream Some(WorkerCommand(msg)) => { let guard = self.conns.get(); - let _ = self.services[msg.token.0] - .service - .call((Some(guard), msg.io)); + let _ = self.services[msg.token.0].service.call((guard, msg.io)); } None => return Poll::Ready(()), };