From 05689b86d9c7679862c9470ed9008fd61a2f19e4 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sun, 4 Apr 2021 02:53:06 -0700 Subject: [PATCH 1/2] Remove Option wrapper for CounterGuard (#313) --- actix-server/src/config.rs | 4 ++-- actix-server/src/service.rs | 6 +++--- actix-server/src/worker.rs | 4 +--- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/actix-server/src/config.rs b/actix-server/src/config.rs index 28e4fdeb..c5e63630 100644 --- a/actix-server/src/config.rs +++ b/actix-server/src/config.rs @@ -243,7 +243,7 @@ impl ServiceRuntime { type BoxedNewService = Box< dyn BaseServiceFactory< - (Option, MioStream), + (CounterGuard, MioStream), Response = (), Error = (), InitError = (), @@ -257,7 +257,7 @@ struct ServiceFactory { inner: T, } -impl BaseServiceFactory<(Option, MioStream)> for ServiceFactory +impl BaseServiceFactory<(CounterGuard, MioStream)> for ServiceFactory where T: BaseServiceFactory, T::Future: 'static, diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index 835ee10b..da57af67 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -29,7 +29,7 @@ pub(crate) trait InternalServiceFactory: Send { pub(crate) type BoxedServerService = Box< dyn Service< - (Option, MioStream), + (CounterGuard, MioStream), Response = (), Error = (), Future = Ready>, @@ -50,7 +50,7 @@ impl StreamService { } } -impl Service<(Option, MioStream)> for StreamService +impl Service<(CounterGuard, MioStream)> for StreamService where S: Service, S::Future: 'static, @@ -65,7 +65,7 @@ where self.service.poll_ready(ctx).map_err(|_| ()) } - fn call(&self, (guard, req): (Option, MioStream)) -> Self::Future { + fn call(&self, (guard, req): (CounterGuard, MioStream)) -> Self::Future { ready(match FromStream::from_mio(req) { Ok(stream) => { let f = self.service.call(stream); diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 366dab0b..f289f2d2 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -461,9 +461,7 @@ impl Future for ServerWorker { // handle incoming io stream Some(WorkerCommand(msg)) => { let guard = self.conns.get(); - let _ = self.services[msg.token.0] - .service - .call((Some(guard), msg.io)); + let _ = self.services[msg.token.0].service.call((guard, msg.io)); } None => return Poll::Ready(()), }; From 8079c50ddb3d2cc9fccd56a1c53105cacf55b821 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sun, 4 Apr 2021 05:22:34 -0700 Subject: [PATCH 2/2] Add ServerWorker::restart_service method (#314) Co-authored-by: Rob Ede --- actix-server/src/worker.rs | 25 ++++++++++--------------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index f289f2d2..bd28ccda 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -262,6 +262,13 @@ impl ServerWorker { WorkerHandle::new(idx, tx1, tx2, avail) } + fn restart_service(&mut self, token: Token, idx: usize) { + let factory = &self.factories[idx]; + trace!("Service {:?} failed, restarting", factory.name(token)); + self.services[token.0].status = WorkerServiceStatus::Restarting; + self.state = WorkerState::Restarting(idx, token, factory.create()); + } + fn shutdown(&mut self, force: bool) { self.services .iter_mut() @@ -376,13 +383,7 @@ impl Future for ServerWorker { } Ok(false) => Poll::Pending, Err((token, idx)) => { - trace!( - "Service {:?} failed, restarting", - self.factories[idx].name(token) - ); - self.services[token.0].status = WorkerServiceStatus::Restarting; - self.state = - WorkerState::Restarting(idx, token, self.factories[idx].create()); + self.restart_service(token, idx); self.poll(cx) } }, @@ -437,7 +438,7 @@ impl Future for ServerWorker { // actively poll stream and handle worker command WorkerState::Available => loop { match self.check_readiness(cx) { - Ok(true) => (), + Ok(true) => {} Ok(false) => { trace!("Worker is unavailable"); self.availability.set(false); @@ -445,14 +446,8 @@ impl Future for ServerWorker { return self.poll(cx); } Err((token, idx)) => { - trace!( - "Service {:?} failed, restarting", - self.factories[idx].name(token) - ); + self.restart_service(token, idx); self.availability.set(false); - self.services[token.0].status = WorkerServiceStatus::Restarting; - self.state = - WorkerState::Restarting(idx, token, self.factories[idx].create()); return self.poll(cx); } }