diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index ab0ae707..3c4f839e 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -1,16 +1,16 @@ -use std::time::Duration; use std::{io, thread}; - -use actix_rt::{ - time::{sleep, Instant}, - System, +use std::{ + task::{self, Context}, + time::Duration, }; + +use actix_rt::{time::Instant, System}; use log::{error, info}; use mio::{Interest, Poll, Token as MioToken}; use crate::server::Server; use crate::socket::{MioListener, SocketAddr}; -use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; +use crate::waker::{self, WakerInterest, WakerRx, WakerTx, WAKER_TOKEN}; use crate::worker::{Conn, WorkerHandleAccept}; struct ServerSocketInfo { @@ -28,35 +28,39 @@ struct ServerSocketInfo { /// Accept loop would live with `ServerBuilder`. /// -/// It's tasked with construct `Poll` instance and `WakerQueue` which would be distributed to -/// `Accept` and `Worker`. +/// It's tasked with construct `Poll` instance and `WakerTx` +/// which would be distributed to `Worker`. /// -/// It would also listen to `ServerCommand` and push interests to `WakerQueue`. +/// `WakerRx` is passed to `Accept` for recieving `WakerInterest`. +/// +/// It would also listen to `ServerCommand` and push `WakerInterest` to `Accept`. pub(crate) struct AcceptLoop { srv: Option, poll: Option, - waker: WakerQueue, + waker_tx: WakerTx, + waker_rx: Option, } impl AcceptLoop { pub fn new(srv: Server) -> Self { let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e)); - let waker = WakerQueue::new(poll.registry()) - .unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e)); + + let (waker_tx, waker_rx) = waker::waker_channel(); Self { srv: Some(srv), poll: Some(poll), - waker, + waker_tx, + waker_rx: Some(waker_rx), } } - pub(crate) fn waker_owned(&self) -> WakerQueue { - self.waker.clone() + pub(crate) fn waker_tx(&self) -> WakerTx { + self.waker_tx.clone() } - pub fn wake(&self, i: WakerInterest) { - self.waker.wake(i); + pub(crate) fn wake(&self, interest: WakerInterest) { + let _ = self.waker_tx.wake(interest); } pub(crate) fn start( @@ -66,16 +70,16 @@ impl AcceptLoop { ) { let srv = self.srv.take().expect("Can not re-use AcceptInfo"); let poll = self.poll.take().unwrap(); - let waker = self.waker.clone(); + let wake_rx = self.waker_rx.take().unwrap(); - Accept::start(poll, waker, socks, srv, handles); + Accept::start(poll, wake_rx, socks, srv, handles); } } /// poll instance of the server. struct Accept { poll: Poll, - waker: WakerQueue, + waker_rx: WakerRx, handles: Vec, srv: Server, next: usize, @@ -159,7 +163,7 @@ fn connection_error(e: &io::Error) -> bool { impl Accept { pub(crate) fn start( poll: Poll, - waker: WakerQueue, + waker_rx: WakerRx, socks: Vec<(usize, MioListener)>, srv: Server, handles: Vec, @@ -172,16 +176,20 @@ impl Accept { .spawn(move || { System::set_current(sys); let (mut accept, mut sockets) = - Accept::new_with_sockets(poll, waker, socks, handles, srv); + Accept::new_with_sockets(poll, waker_rx, socks, handles, srv); - accept.poll_with(&mut sockets); + // Construct Context from waker. + let waker = waker::from_registry(accept.poll.registry()).unwrap().into(); + let cx = &mut Context::from_waker(&waker); + + accept.poll_with(&mut sockets, cx); }) .unwrap(); } fn new_with_sockets( poll: Poll, - waker: WakerQueue, + waker_rx: WakerRx, socks: Vec<(usize, MioListener)>, handles: Vec, srv: Server, @@ -212,7 +220,7 @@ impl Accept { let accept = Accept { poll, - waker, + waker_rx, handles, srv, next: 0, @@ -223,9 +231,16 @@ impl Accept { (accept, sockets) } - fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) { + fn poll_with(&mut self, sockets: &mut [ServerSocketInfo], cx: &mut Context<'_>) { let mut events = mio::Events::with_capacity(128); + // poll waker channel once and register the context/waker. + let exit = self.poll_waker(sockets, cx); + if exit { + info!("Accept is stopped."); + return; + } + loop { if let Err(e) = self.poll.poll(&mut events, None) { match e.kind() { @@ -238,7 +253,7 @@ impl Accept { let token = event.token(); match token { WAKER_TOKEN => { - let exit = self.handle_waker(sockets); + let exit = self.poll_waker(sockets, cx); if exit { info!("Accept is stopped."); return; @@ -253,19 +268,13 @@ impl Accept { } } - fn handle_waker(&mut self, sockets: &mut [ServerSocketInfo]) -> bool { + fn poll_waker(&mut self, sockets: &mut [ServerSocketInfo], cx: &mut Context<'_>) -> bool { // This is a loop because interests for command from previous version was // a loop that would try to drain the command channel. It's yet unknown // if it's necessary/good practice to actively drain the waker queue. - loop { - // take guard with every iteration so no new interest can be added - // until the current task is done. - let mut guard = self.waker.guard(); - match guard.pop_front() { - // worker notify it becomes available. - Some(WakerInterest::WorkerAvailable(idx)) => { - drop(guard); - + while let task::Poll::Ready(Some(msg)) = self.waker_rx.poll_recv(cx) { + match msg { + WakerInterest::WorkerAvailable(idx) => { self.avail.set_available(idx, true); if !self.paused { @@ -273,9 +282,7 @@ impl Accept { } } // a new worker thread is made and it's handle would be added to Accept - Some(WakerInterest::Worker(handle)) => { - drop(guard); - + WakerInterest::Worker(handle) => { self.avail.set_available(handle.idx(), true); self.handles.push(handle); @@ -283,65 +290,34 @@ impl Accept { self.accept_all(sockets); } } - // got timer interest and it's time to try register socket(s) again - Some(WakerInterest::Timer) => { - drop(guard); - - self.process_timer(sockets) + WakerInterest::Pause => { + if !self.paused { + self.paused = true; + self.deregister_all(sockets); + } } - Some(WakerInterest::Pause) => { - drop(guard); + WakerInterest::Resume => { + if self.paused { + self.paused = false; - self.paused = true; + sockets.iter_mut().for_each(|info| { + self.register_logged(info); + }); - self.deregister_all(sockets); + self.accept_all(sockets); + } } - Some(WakerInterest::Resume) => { - drop(guard); - - self.paused = false; - - sockets.iter_mut().for_each(|info| { - self.register_logged(info); - }); - - self.accept_all(sockets); - } - Some(WakerInterest::Stop) => { - self.deregister_all(sockets); + WakerInterest::Stop => { + if !self.paused { + self.deregister_all(sockets); + } return true; } - // waker queue is drained - None => { - // Reset the WakerQueue before break so it does not grow infinitely - WakerQueue::reset(&mut guard); - - return false; - } } } - } - fn process_timer(&self, sockets: &mut [ServerSocketInfo]) { - let now = Instant::now(); - sockets - .iter_mut() - // Only sockets that had an associated timeout were deregistered. - .filter(|info| info.timeout.is_some()) - .for_each(|info| { - let inst = info.timeout.take().unwrap(); - - if now < inst { - info.timeout = Some(inst); - } else if !self.paused { - self.register_logged(info); - } - - // Drop the timeout if server is paused and socket timeout is expired. - // When server recovers from pause it will register all sockets without - // a timeout value so this socket register will be delayed till then. - }); + false } #[cfg(not(target_os = "windows"))] @@ -482,13 +458,6 @@ impl Accept { // listener should be registered info.timeout = Some(Instant::now() + Duration::from_millis(500)); - // after the sleep a Timer interest is sent to Accept Poll - let waker = self.waker.clone(); - System::current().arbiter().spawn(async move { - sleep(Duration::from_millis(510)).await; - waker.wake(WakerInterest::Timer); - }); - return; } }; diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index bd694e7c..f9abb330 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -21,7 +21,7 @@ use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; use crate::signals::{Signal, Signals}; use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; use crate::socket::{MioTcpListener, MioTcpSocket}; -use crate::waker_queue::{WakerInterest, WakerQueue}; +use crate::waker::WakerInterest; use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer}; /// Server builder @@ -282,8 +282,7 @@ impl ServerBuilder { // start workers let handles = (0..self.threads) .map(|idx| { - let (handle_accept, handle_server) = - self.start_worker(idx, self.accept.waker_owned()); + let (handle_accept, handle_server) = self.start_worker(idx); self.handles.push((idx, handle_server)); handle_accept @@ -314,14 +313,10 @@ impl ServerBuilder { } } - fn start_worker( - &self, - idx: usize, - waker_queue: WakerQueue, - ) -> (WorkerHandleAccept, WorkerHandleServer) { + fn start_worker(&self, idx: usize) -> (WorkerHandleAccept, WorkerHandleServer) { let services = self.services.iter().map(|v| v.clone_factory()).collect(); - - ServerWorker::start(idx, services, waker_queue, self.worker_config) + let waker_tx = self.accept.waker_tx(); + ServerWorker::start(idx, services, waker_tx, self.worker_config) } fn handle_cmd(&mut self, item: ServerCommand) { @@ -427,8 +422,7 @@ impl ServerBuilder { break; } - let (handle_accept, handle_server) = - self.start_worker(new_idx, self.accept.waker_owned()); + let (handle_accept, handle_server) = self.start_worker(new_idx); self.handles.push((new_idx, handle_server)); self.accept.wake(WakerInterest::Worker(handle_accept)); } diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index ba7d0c29..b3b6fb5d 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -12,7 +12,7 @@ mod service; mod signals; mod socket; mod test_server; -mod waker_queue; +mod waker; mod worker; pub use self::builder::ServerBuilder; diff --git a/actix-server/src/waker.rs b/actix-server/src/waker.rs new file mode 100644 index 00000000..db4cf627 --- /dev/null +++ b/actix-server/src/waker.rs @@ -0,0 +1,90 @@ +use std::{ + ops::{Deref, DerefMut}, + sync::Arc, + task::{Wake, Waker}, +}; + +use mio::{Registry, Token}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; + +use crate::worker::WorkerHandleAccept; + +/// Waker token for `mio::Poll` instance. +pub(crate) const WAKER_TOKEN: Token = Token(usize::MAX); + +/// Types of interests we would look into when `Accept`'s `Poll` is waked up by waker. +/// +/// These interests should not be confused with `mio::Interest` and mostly not I/O related +pub(crate) enum WakerInterest { + /// `WorkerAvailable` is an interest from `Worker` notifying `Accept` there is a worker + /// available and can accept new tasks. + WorkerAvailable(usize), + /// `Pause`, `Resume`, `Stop` Interest are from `ServerBuilder` future. It listens to + /// `ServerCommand` and notify `Accept` to do exactly these tasks. + Pause, + Resume, + Stop, + /// `Worker` is an interest happen after a worker runs into faulted state(This is determined + /// by if work can be sent to it successfully).`Accept` would be waked up and add the new + /// `WorkerHandleAccept`. + Worker(WorkerHandleAccept), +} +/// Wrapper type for mio::Waker in order to impl std::task::Wake trait. +struct _Waker(mio::Waker); + +impl Wake for _Waker { + fn wake(self: Arc) { + Wake::wake_by_ref(&self) + } + + fn wake_by_ref(self: &Arc) { + self.0 + .wake() + .unwrap_or_else(|e| panic!("Can not wake up Accept Poll: {}", e)); + } +} + +/// Wrapper type for tokio unbounded channel sender. +pub(crate) struct WakerTx(UnboundedSender); + +impl Clone for WakerTx { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl WakerTx { + /// Send WakerInterest through channel and panic on error. + pub(crate) fn wake(&self, interest: WakerInterest) { + self.0 + .send(interest) + .unwrap_or_else(|e| panic!("Can not send WakerInterest: {}", e)); + } +} + +/// Wrapper type for tokio unbounded channel receiver. +pub(crate) struct WakerRx(UnboundedReceiver); + +impl Deref for WakerRx { + type Target = UnboundedReceiver; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for WakerRx { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +pub(crate) fn from_registry(registry: &Registry) -> std::io::Result { + mio::Waker::new(registry, WAKER_TOKEN).map(|waker| Arc::new(_Waker(waker)).into()) +} + +pub(crate) fn waker_channel() -> (WakerTx, WakerRx) { + let (tx, rx) = unbounded_channel(); + + (WakerTx(tx), WakerRx(rx)) +} diff --git a/actix-server/src/waker_queue.rs b/actix-server/src/waker_queue.rs deleted file mode 100644 index 3f8669d4..00000000 --- a/actix-server/src/waker_queue.rs +++ /dev/null @@ -1,89 +0,0 @@ -use std::{ - collections::VecDeque, - ops::Deref, - sync::{Arc, Mutex, MutexGuard}, -}; - -use mio::{Registry, Token as MioToken, Waker}; - -use crate::worker::WorkerHandleAccept; - -/// Waker token for `mio::Poll` instance. -pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX); - -/// `mio::Waker` with a queue for waking up the `Accept`'s `Poll` and contains the `WakerInterest` -/// the `Poll` would want to look into. -pub(crate) struct WakerQueue(Arc<(Waker, Mutex>)>); - -impl Clone for WakerQueue { - fn clone(&self) -> Self { - Self(self.0.clone()) - } -} - -impl Deref for WakerQueue { - type Target = (Waker, Mutex>); - - fn deref(&self) -> &Self::Target { - self.0.deref() - } -} - -impl WakerQueue { - /// Construct a waker queue with given `Poll`'s `Registry` and capacity. - /// - /// A fixed `WAKER_TOKEN` is used to identify the wake interest and the `Poll` needs to match - /// event's token for it to properly handle `WakerInterest`. - pub(crate) fn new(registry: &Registry) -> std::io::Result { - let waker = Waker::new(registry, WAKER_TOKEN)?; - let queue = Mutex::new(VecDeque::with_capacity(16)); - - Ok(Self(Arc::new((waker, queue)))) - } - - /// Push a new interest to the queue and wake up the accept poll afterwards. - pub(crate) fn wake(&self, interest: WakerInterest) { - let (waker, queue) = self.deref(); - - queue - .lock() - .expect("Failed to lock WakerQueue") - .push_back(interest); - - waker - .wake() - .unwrap_or_else(|e| panic!("can not wake up Accept Poll: {}", e)); - } - - /// Get a MutexGuard of the waker queue. - pub(crate) fn guard(&self) -> MutexGuard<'_, VecDeque> { - self.deref().1.lock().expect("Failed to lock WakerQueue") - } - - /// Reset the waker queue so it does not grow infinitely. - pub(crate) fn reset(queue: &mut VecDeque) { - std::mem::swap(&mut VecDeque::::with_capacity(16), queue); - } -} - -/// Types of interests we would look into when `Accept`'s `Poll` is waked up by waker. -/// -/// These interests should not be confused with `mio::Interest` and mostly not I/O related -pub(crate) enum WakerInterest { - /// `WorkerAvailable` is an interest from `Worker` notifying `Accept` there is a worker - /// available and can accept new tasks. - WorkerAvailable(usize), - /// `Pause`, `Resume`, `Stop` Interest are from `ServerBuilder` future. It listens to - /// `ServerCommand` and notify `Accept` to do exactly these tasks. - Pause, - Resume, - Stop, - /// `Timer` is an interest sent as a delayed future. When an error happens on accepting - /// connection `Accept` would deregister socket listener temporary and wake up the poll and - /// register them again after the delayed future resolve. - Timer, - /// `Worker` is an interest happen after a worker runs into faulted state(This is determined - /// by if work can be sent to it successfully).`Accept` would be waked up and add the new - /// `WorkerHandleAccept`. - Worker(WorkerHandleAccept), -} diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index df8bc723..9eb8d129 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -26,7 +26,7 @@ use tokio::sync::{ use crate::join_all; use crate::service::{BoxedServerService, InternalServiceFactory}; use crate::socket::MioStream; -use crate::waker_queue::{WakerInterest, WakerQueue}; +use crate::waker::{WakerInterest, WakerTx}; /// Stop worker message. Returns `true` on successful graceful shutdown. /// and `false` if some connections still alive when shutdown execute. @@ -91,7 +91,7 @@ impl Counter { pub(crate) struct WorkerCounter { idx: usize, - inner: Rc<(WakerQueue, Counter)>, + inner: Rc<(WakerTx, Counter)>, } impl Clone for WorkerCounter { @@ -104,10 +104,10 @@ impl Clone for WorkerCounter { } impl WorkerCounter { - pub(crate) fn new(idx: usize, waker_queue: WakerQueue, counter: Counter) -> Self { + pub(crate) fn new(idx: usize, waker_tx: WakerTx, counter: Counter) -> Self { Self { idx, - inner: Rc::new((waker_queue, counter)), + inner: Rc::new((waker_tx, counter)), } } @@ -125,9 +125,9 @@ pub(crate) struct WorkerCounterGuard(WorkerCounter); impl Drop for WorkerCounterGuard { fn drop(&mut self) { - let (waker_queue, counter) = &*self.0.inner; + let (waker_tx, counter) = &*self.0.inner; if counter.derc() { - waker_queue.wake(WakerInterest::WorkerAvailable(self.0.idx)); + waker_tx.wake(WakerInterest::WorkerAvailable(self.0.idx)); } } } @@ -251,7 +251,7 @@ impl ServerWorker { pub(crate) fn start( idx: usize, factories: Vec>, - waker_queue: WakerQueue, + waker_tx: WakerTx, config: ServerWorkerConfig, ) -> (WorkerHandleAccept, WorkerHandleServer) { let (tx1, rx) = unbounded_channel(); @@ -315,7 +315,7 @@ impl ServerWorker { rx, rx2, services, - counter: WorkerCounter::new(idx, waker_queue, counter_clone), + counter: WorkerCounter::new(idx, waker_tx, counter_clone), factories: factories.into_boxed_slice(), state: Default::default(), shutdown_timeout: config.shutdown_timeout,