diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 82c00ef5..39be07e3 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -1,10 +1,6 @@ -use std::time::Duration; +use std::time::{Duration, Instant}; use std::{io, thread}; -use actix_rt::{ - time::{sleep_until, Instant}, - System, -}; use log::{error, info}; use mio::{Interest, Poll, Token as MioToken}; use slab::Slab; @@ -15,6 +11,8 @@ use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; use crate::worker::{Conn, WorkerHandle}; use crate::Token; +const DUR_ON_ERR: Duration = Duration::from_millis(500); + struct ServerSocketInfo { // addr for socket. mainly used for logging. addr: SocketAddr, @@ -22,9 +20,9 @@ struct ServerSocketInfo { // mio::Token token: Token, lst: MioListener, - // timeout is used to mark the deadline when this socket's listener should be registered again + // mark the deadline when this socket's listener should be registered again // after an error. - timeout: Option, + timeout_deadline: Option, } /// Accept loop would live with `ServerBuilder`. @@ -81,6 +79,9 @@ struct Accept { srv: Server, next: usize, backpressure: bool, + // poll time duration. + // use the nearest timeout from socket timeout settings. + timeout: Option, } /// This function defines errors that are per-connection. Which basically @@ -104,13 +105,10 @@ impl Accept { srv: Server, handles: Vec, ) { - // Accept runs in its own thread and would want to spawn additional futures to current - // actix system. - let sys = System::current(); + // Accept runs in its own thread thread::Builder::new() .name("actix-server accept loop".to_owned()) .spawn(move || { - System::set_current(sys); let (mut accept, sockets) = Accept::new_with_sockets(poll, waker, socks, handles, srv); accept.poll_with(sockets); @@ -141,7 +139,7 @@ impl Accept { addr, token: hnd_token, lst, - timeout: None, + timeout_deadline: None, }); } @@ -152,6 +150,7 @@ impl Accept { srv, next: 0, backpressure: false, + timeout: None, }; (accept, sockets) @@ -162,9 +161,10 @@ impl Accept { loop { self.poll - .poll(&mut events, None) + .poll(&mut events, self.timeout) .unwrap_or_else(|e| panic!("Poll error: {}", e)); + // handle events for event in events.iter() { let token = event.token(); match token { @@ -190,12 +190,6 @@ impl Accept { self.maybe_backpressure(&mut sockets, false); self.handles.push(handle); } - // got timer interest and it's time to try register socket(s) - // again. - Some(WakerInterest::Timer) => { - drop(guard); - self.process_timer(&mut sockets) - } Some(WakerInterest::Pause) => { drop(guard); sockets.iter_mut().for_each(|(_, info)| { @@ -234,21 +228,43 @@ impl Accept { } } } + + // check for timeout and re-register sockets. + self.process_timeout(&mut sockets); } } - fn process_timer(&self, sockets: &mut Slab) { - let now = Instant::now(); - sockets.iter_mut().for_each(|(token, info)| { - // only the ServerSocketInfo have an associate timeout value was de registered. - if let Some(inst) = info.timeout.take() { - if now > inst { - self.register_logged(token, info); - } else { - info.timeout = Some(inst); + fn process_timeout(&mut self, sockets: &mut Slab) { + // take old timeout as it's no use after each iteration. + if self.timeout.take().is_some() { + let now = Instant::now(); + sockets.iter_mut().for_each(|(token, info)| { + // only the ServerSocketInfo have an associate timeout value was de registered. + if let Some(inst) = info.timeout_deadline { + // timeout expired register socket again. + if now >= inst { + info.timeout_deadline = None; + self.register_logged(token, info); + } else { + // still timed out. try set new timeout. + let dur = inst - now; + self.set_timeout(dur); + } + } + }); + } + } + + // update Accept timeout duration. would keep the smallest duration. + fn set_timeout(&mut self, dur: Duration) { + match self.timeout { + Some(timeout) => { + if timeout > dur { + self.timeout = Some(dur); } } - }); + None => self.timeout = Some(dur), + } } #[cfg(not(target_os = "windows"))] @@ -298,7 +314,7 @@ impl Accept { if !on { self.backpressure = false; for (token, info) in sockets.iter_mut() { - if info.timeout.is_some() { + if info.timeout_deadline.is_some() { // socket will attempt to re-register itself when its timeout completes continue; } @@ -396,17 +412,11 @@ impl Accept { error!("Can not deregister server socket {}", err); } - // sleep after error. write the timeout to socket info as later the poll - // would need it mark which socket and when it's listener should be - // registered. - info.timeout = Some(Instant::now() + Duration::from_millis(500)); - - // after the sleep a Timer interest is sent to Accept Poll - let waker = self.waker.clone(); - System::current().worker().spawn(async move { - sleep_until(Instant::now() + Duration::from_millis(510)).await; - waker.wake(WakerInterest::Timer); - }); + // sleep after error. write the timeout deadline to socket info + // as later the poll would need it mark which socket and when + // it's listener should be registered again. + info.timeout_deadline = Some(Instant::now() + DUR_ON_ERR); + self.set_timeout(DUR_ON_ERR); return; } diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 7290f9dd..a78e4175 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -19,7 +19,7 @@ use crate::signals::{Signal, Signals}; use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; use crate::socket::{MioTcpListener, MioTcpSocket}; use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::worker::{self, ServerWorker, WorkerAvailability, WorkerHandle}; +use crate::worker::{self, Worker, WorkerAvailability, WorkerHandle}; use crate::{join_all, Token}; /// Server builder @@ -297,7 +297,7 @@ impl ServerBuilder { let avail = WorkerAvailability::new(waker); let services = self.services.iter().map(|v| v.clone_factory()).collect(); - ServerWorker::start(idx, services, avail, self.shutdown_timeout) + Worker::start(idx, services, avail, self.shutdown_timeout) } fn handle_cmd(&mut self, item: ServerCommand) { diff --git a/actix-server/src/waker_queue.rs b/actix-server/src/waker_queue.rs index f92363b5..6b103689 100644 --- a/actix-server/src/waker_queue.rs +++ b/actix-server/src/waker_queue.rs @@ -78,10 +78,6 @@ pub(crate) enum WakerInterest { Pause, Resume, Stop, - /// `Timer` is an interest sent as a delayed future. When an error happens on accepting - /// connection `Accept` would deregister socket listener temporary and wake up the poll and - /// register them again after the delayed future resolve. - Timer, /// `Worker` is an interest happen after a worker runs into faulted state(This is determined /// by if work can be sent to it successfully).`Accept` would be waked up and add the new /// `WorkerHandle`. diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 6f15d044..4665612b 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -6,7 +6,6 @@ use std::task::{Context, Poll}; use std::time::Duration; use actix_rt::time::{sleep_until, Instant, Sleep}; -use actix_rt::{spawn, Worker as Arbiter}; use actix_utils::counter::Counter; use futures_core::future::LocalBoxFuture; use log::{error, info, trace}; @@ -122,10 +121,8 @@ impl WorkerAvailability { } } -/// Service worker. -/// /// Worker accepts Socket objects via unbounded channel and starts stream processing. -pub(crate) struct ServerWorker { +pub(crate) struct Worker { rx: UnboundedReceiver, rx2: UnboundedReceiver, services: Vec, @@ -159,7 +156,7 @@ enum WorkerServiceStatus { Stopped, } -impl ServerWorker { +impl Worker { pub(crate) fn start( idx: usize, factories: Vec>, @@ -170,10 +167,9 @@ impl ServerWorker { let (tx2, rx2) = unbounded_channel(); let avail = availability.clone(); - // every worker runs in it's own arbiter. - Arbiter::new().spawn(Box::pin(async move { - availability.set(false); - let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker { + // every worker runs in it's own thread. + std::thread::spawn(move || { + let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker { rx, rx2, availability, @@ -184,23 +180,26 @@ impl ServerWorker { state: WorkerState::Unavailable, }); - let fut = wrk - .factories - .iter() - .enumerate() - .map(|(idx, factory)| { - let fut = factory.create(); - async move { - fut.await.map(|r| { - r.into_iter().map(|(t, s)| (idx, t, s)).collect::>() - }) - } - }) - .collect::>(); + actix_rt::Runtime::new().unwrap().block_on(async move { + let fut = wrk + .factories + .iter() + .enumerate() + .map(|(idx, factory)| { + let fut = factory.create(); + async move { + fut.await.map(|r| { + r.into_iter().map(|(t, s)| (idx, t, s)).collect::>() + }) + } + }) + .collect::>(); - spawn(async move { - let res: Result, _> = join_all(fut).await.into_iter().collect(); - match res { + match join_all(fut) + .await + .into_iter() + .collect::, _>>() + { Ok(services) => { for item in services { for (factory, token, service) in item { @@ -215,12 +214,12 @@ impl ServerWorker { } Err(e) => { error!("Can not start worker: {:?}", e); - Arbiter::handle().stop(); } } + wrk.await - }); - })); + }) + }); WorkerHandle::new(idx, tx1, tx2, avail) } @@ -303,7 +302,7 @@ enum WorkerState { ), } -impl Future for ServerWorker { +impl Future for Worker { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -386,7 +385,6 @@ impl Future for ServerWorker { let num = num_connections(); if num == 0 { let _ = tx.take().unwrap().send(true); - Arbiter::handle().stop(); return Poll::Ready(()); } @@ -394,7 +392,6 @@ impl Future for ServerWorker { if Pin::new(t2).poll(cx).is_ready() { let _ = tx.take().unwrap().send(false); self.shutdown(true); - Arbiter::handle().stop(); return Poll::Ready(()); }