diff --git a/actix-server/src/config.rs b/actix-server/src/config.rs index 28e4fdeb..c5e63630 100644 --- a/actix-server/src/config.rs +++ b/actix-server/src/config.rs @@ -243,7 +243,7 @@ impl ServiceRuntime { type BoxedNewService = Box< dyn BaseServiceFactory< - (Option, MioStream), + (CounterGuard, MioStream), Response = (), Error = (), InitError = (), @@ -257,7 +257,7 @@ struct ServiceFactory { inner: T, } -impl BaseServiceFactory<(Option, MioStream)> for ServiceFactory +impl BaseServiceFactory<(CounterGuard, MioStream)> for ServiceFactory where T: BaseServiceFactory, T::Future: 'static, diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index 835ee10b..da57af67 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -29,7 +29,7 @@ pub(crate) trait InternalServiceFactory: Send { pub(crate) type BoxedServerService = Box< dyn Service< - (Option, MioStream), + (CounterGuard, MioStream), Response = (), Error = (), Future = Ready>, @@ -50,7 +50,7 @@ impl StreamService { } } -impl Service<(Option, MioStream)> for StreamService +impl Service<(CounterGuard, MioStream)> for StreamService where S: Service, S::Future: 'static, @@ -65,7 +65,7 @@ where self.service.poll_ready(ctx).map_err(|_| ()) } - fn call(&self, (guard, req): (Option, MioStream)) -> Self::Future { + fn call(&self, (guard, req): (CounterGuard, MioStream)) -> Self::Future { ready(match FromStream::from_mio(req) { Ok(stream) => { let f = self.service.call(stream); diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 608e87a0..9409dfb4 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -272,6 +272,13 @@ impl ServerWorker { WorkerHandle::new(idx, tx1, tx2, avail) } + fn restart_service(&mut self, token: Token, idx: usize) { + let factory = &self.factories[idx]; + trace!("Service {:?} failed, restarting", factory.name(token)); + self.services[token.0].status = WorkerServiceStatus::Restarting; + self.state = WorkerState::Restarting(idx, token, factory.create()); + } + fn shutdown(&mut self, force: bool) { self.services .iter_mut() @@ -394,13 +401,7 @@ impl Future for ServerWorker { } Ok(false) => Poll::Pending, Err((token, idx)) => { - trace!( - "Service {:?} failed, restarting", - this.factories[idx].name(token) - ); - this.services[token.0].status = WorkerServiceStatus::Restarting; - this.state = - WorkerState::Restarting(idx, token, this.factories[idx].create()); + this.restart_service(token, idx); self.poll(cx) } }, @@ -456,7 +457,7 @@ impl Future for ServerWorker { // actively poll stream and handle worker command WorkerState::Available => loop { match this.check_readiness(cx) { - Ok(true) => (), + Ok(true) => {} Ok(false) => { trace!("Worker is unavailable"); this.availability.set(false); @@ -464,14 +465,8 @@ impl Future for ServerWorker { return self.poll(cx); } Err((token, idx)) => { - trace!( - "Service {:?} failed, restarting", - this.factories[idx].name(token) - ); + this.restart_service(token, idx); this.availability.set(false); - this.services[token.0].status = WorkerServiceStatus::Restarting; - this.state = - WorkerState::Restarting(idx, token, this.factories[idx].create()); return self.poll(cx); } } @@ -480,9 +475,7 @@ impl Future for ServerWorker { // handle incoming io stream Some(WorkerCommand(msg)) => { let guard = this.conns.get(); - let _ = this.services[msg.token.0] - .service - .call((Some(guard), msg.io)); + let _ = this.services[msg.token.0].service.call((guard, msg.io)); } None => return Poll::Ready(()), };