Rework waker to use std::task::Wake trait

fakeshadow 2021-04-23 21:09:00 +08:00
parent fe1a3a60e6
commit 9eb371101c
6 changed files with 170 additions and 206 deletions

actix-server/src/accept.rs

@ -1,16 +1,16 @@
use std::time::Duration;
use std::{io, thread};
use actix_rt::{
time::{sleep, Instant},
System,
use std::{
task::{self, Context},
time::Duration,
};
use actix_rt::{time::Instant, System};
use log::{error, info};
use mio::{Interest, Poll, Token as MioToken};
use crate::server::Server;
use crate::socket::{MioListener, SocketAddr};
use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
use crate::waker::{self, WakerInterest, WakerRx, WakerTx, WAKER_TOKEN};
use crate::worker::{Conn, WorkerHandleAccept};
struct ServerSocketInfo {
@ -28,35 +28,39 @@ struct ServerSocketInfo {
/// Accept loop would live with `ServerBuilder`.
///
/// It's tasked with constructing the `Poll` instance and `WakerQueue` which would be distributed to
/// `Accept` and `Worker`.
/// It's tasked with constructing the `Poll` instance and `WakerTx`
/// which would be distributed to `Worker`.
///
/// It would also listen to `ServerCommand` and push interests to `WakerQueue`.
/// `WakerRx` is passed to `Accept` for receiving `WakerInterest`.
///
/// It would also listen to `ServerCommand` and push `WakerInterest` to `Accept`.
pub(crate) struct AcceptLoop {
srv: Option<Server>,
poll: Option<Poll>,
waker: WakerQueue,
waker_tx: WakerTx,
waker_rx: Option<WakerRx>,
}
impl AcceptLoop {
pub fn new(srv: Server) -> Self {
let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e));
let waker = WakerQueue::new(poll.registry())
.unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e));
let (waker_tx, waker_rx) = waker::waker_channel();
Self {
srv: Some(srv),
poll: Some(poll),
waker,
waker_tx,
waker_rx: Some(waker_rx),
}
}
pub(crate) fn waker_owned(&self) -> WakerQueue {
self.waker.clone()
pub(crate) fn waker_tx(&self) -> WakerTx {
self.waker_tx.clone()
}
pub fn wake(&self, i: WakerInterest) {
self.waker.wake(i);
pub(crate) fn wake(&self, interest: WakerInterest) {
let _ = self.waker_tx.wake(interest);
}
pub(crate) fn start(
@ -66,16 +70,16 @@ impl AcceptLoop {
) {
let srv = self.srv.take().expect("Can not re-use AcceptInfo");
let poll = self.poll.take().unwrap();
let waker = self.waker.clone();
let wake_rx = self.waker_rx.take().unwrap();
Accept::start(poll, waker, socks, srv, handles);
Accept::start(poll, wake_rx, socks, srv, handles);
}
}
/// poll instance of the server.
struct Accept {
poll: Poll,
waker: WakerQueue,
waker_rx: WakerRx,
handles: Vec<WorkerHandleAccept>,
srv: Server,
next: usize,
@ -159,7 +163,7 @@ fn connection_error(e: &io::Error) -> bool {
impl Accept {
pub(crate) fn start(
poll: Poll,
waker: WakerQueue,
waker_rx: WakerRx,
socks: Vec<(usize, MioListener)>,
srv: Server,
handles: Vec<WorkerHandleAccept>,
@ -172,16 +176,20 @@ impl Accept {
.spawn(move || {
System::set_current(sys);
let (mut accept, mut sockets) =
Accept::new_with_sockets(poll, waker, socks, handles, srv);
Accept::new_with_sockets(poll, waker_rx, socks, handles, srv);
accept.poll_with(&mut sockets);
// Construct Context from waker.
let waker = waker::from_registry(accept.poll.registry()).unwrap().into();
let cx = &mut Context::from_waker(&waker);
accept.poll_with(&mut sockets, cx);
})
.unwrap();
}
fn new_with_sockets(
poll: Poll,
waker: WakerQueue,
waker_rx: WakerRx,
socks: Vec<(usize, MioListener)>,
handles: Vec<WorkerHandleAccept>,
srv: Server,
@ -212,7 +220,7 @@ impl Accept {
let accept = Accept {
poll,
waker,
waker_rx,
handles,
srv,
next: 0,
@ -223,9 +231,16 @@ impl Accept {
(accept, sockets)
}
fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) {
fn poll_with(&mut self, sockets: &mut [ServerSocketInfo], cx: &mut Context<'_>) {
let mut events = mio::Events::with_capacity(128);
// poll waker channel once and register the context/waker.
let exit = self.poll_waker(sockets, cx);
if exit {
info!("Accept is stopped.");
return;
}
loop {
if let Err(e) = self.poll.poll(&mut events, None) {
match e.kind() {
@ -238,7 +253,7 @@ impl Accept {
let token = event.token();
match token {
WAKER_TOKEN => {
let exit = self.handle_waker(sockets);
let exit = self.poll_waker(sockets, cx);
if exit {
info!("Accept is stopped.");
return;
@ -253,19 +268,13 @@ impl Accept {
}
}
fn handle_waker(&mut self, sockets: &mut [ServerSocketInfo]) -> bool {
fn poll_waker(&mut self, sockets: &mut [ServerSocketInfo], cx: &mut Context<'_>) -> bool {
// This is a loop because the command handling in the previous version was
// a loop that tried to drain the command channel. It's not yet known whether
// actively draining the waker queue is necessary or good practice.
loop {
// take a guard on every iteration so no new interest can be added
// until the current task is done.
let mut guard = self.waker.guard();
match guard.pop_front() {
// a worker notifies that it has become available.
Some(WakerInterest::WorkerAvailable(idx)) => {
drop(guard);
while let task::Poll::Ready(Some(msg)) = self.waker_rx.poll_recv(cx) {
match msg {
WakerInterest::WorkerAvailable(idx) => {
self.avail.set_available(idx, true);
if !self.paused {
@ -273,9 +282,7 @@ impl Accept {
}
}
// a new worker thread is made and its handle would be added to Accept
Some(WakerInterest::Worker(handle)) => {
drop(guard);
WakerInterest::Worker(handle) => {
self.avail.set_available(handle.idx(), true);
self.handles.push(handle);
@ -283,22 +290,14 @@ impl Accept {
self.accept_all(sockets);
}
}
// got a timer interest and it's time to try registering the socket(s) again
Some(WakerInterest::Timer) => {
drop(guard);
self.process_timer(sockets)
}
Some(WakerInterest::Pause) => {
drop(guard);
WakerInterest::Pause => {
if !self.paused {
self.paused = true;
self.deregister_all(sockets);
}
Some(WakerInterest::Resume) => {
drop(guard);
}
WakerInterest::Resume => {
if self.paused {
self.paused = false;
sockets.iter_mut().for_each(|info| {
@ -307,41 +306,18 @@ impl Accept {
self.accept_all(sockets);
}
Some(WakerInterest::Stop) => {
}
WakerInterest::Stop => {
if !self.paused {
self.deregister_all(sockets);
}
return true;
}
// waker queue is drained
None => {
// Reset the WakerQueue before breaking so it does not grow infinitely
WakerQueue::reset(&mut guard);
return false;
}
}
}
}
fn process_timer(&self, sockets: &mut [ServerSocketInfo]) {
let now = Instant::now();
sockets
.iter_mut()
// Only sockets that had an associated timeout were deregistered.
.filter(|info| info.timeout.is_some())
.for_each(|info| {
let inst = info.timeout.take().unwrap();
if now < inst {
info.timeout = Some(inst);
} else if !self.paused {
self.register_logged(info);
}
// Drop the timeout if the server is paused and the socket timeout has expired.
// When the server recovers from pause it will register all sockets without
// a timeout value, so this socket's registration will be delayed until then.
});
false
}
#[cfg(not(target_os = "windows"))]
@ -482,13 +458,6 @@ impl Accept {
// listener should be registered
info.timeout = Some(Instant::now() + Duration::from_millis(500));
// after the sleep a Timer interest is sent to Accept Poll
let waker = self.waker.clone();
System::current().arbiter().spawn(async move {
sleep(Duration::from_millis(510)).await;
waker.wake(WakerInterest::Timer);
});
return;
}
};
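The accept thread above never runs an async executor: it blocks inside `mio::Poll::poll` and relies on the `Context` built from the mio-backed waker being re-registered every time `poll_recv` returns `Pending`. The standalone sketch below reproduces just that mechanism; it is not the crate's code, the names `DemoWaker` and `WAKE` are made up, and it only assumes the `mio` and `tokio` crates as dependencies.

use std::{
    sync::Arc,
    task::{Context, Poll, Wake, Waker},
    thread,
    time::Duration,
};

use tokio::sync::mpsc::unbounded_channel;

// Adapter so a mio::Waker can be turned into a std::task::Waker via the Wake trait.
struct DemoWaker(mio::Waker);

impl Wake for DemoWaker {
    fn wake(self: Arc<Self>) {
        Wake::wake_by_ref(&self);
    }

    fn wake_by_ref(self: &Arc<Self>) {
        // Firing the mio waker makes the blocked Poll return an event carrying WAKE.
        self.0.wake().expect("failed to wake mio::Poll");
    }
}

fn main() -> std::io::Result<()> {
    const WAKE: mio::Token = mio::Token(usize::MAX);

    let mut poll = mio::Poll::new()?;
    let waker: Waker = Arc::new(DemoWaker(mio::Waker::new(poll.registry(), WAKE)?)).into();
    let mut cx = Context::from_waker(&waker);

    let (tx, mut rx) = unbounded_channel::<&'static str>();

    // First poll is Pending and stores the mio-backed waker inside the channel.
    assert!(matches!(rx.poll_recv(&mut cx), Poll::Pending));

    // A send from any thread now wakes the blocked mio::Poll through WAKE.
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        tx.send("hello").unwrap();
    });

    let mut events = mio::Events::with_capacity(8);
    let msg = loop {
        poll.poll(&mut events, None)?;
        // The wake-up arrives as an ordinary readiness event; drain the channel then.
        if let Poll::Ready(msg) = rx.poll_recv(&mut cx) {
            break msg;
        }
    };

    assert_eq!(msg, Some("hello"));
    println!("mio::Poll woken by a plain channel send: {:?}", msg);
    Ok(())
}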

actix-server/src/builder.rs

@ -21,7 +21,7 @@ use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
use crate::signals::{Signal, Signals};
use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
use crate::socket::{MioTcpListener, MioTcpSocket};
use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::waker::WakerInterest;
use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer};
/// Server builder
@ -282,8 +282,7 @@ impl ServerBuilder {
// start workers
let handles = (0..self.threads)
.map(|idx| {
let (handle_accept, handle_server) =
self.start_worker(idx, self.accept.waker_owned());
let (handle_accept, handle_server) = self.start_worker(idx);
self.handles.push((idx, handle_server));
handle_accept
@ -314,14 +313,10 @@ impl ServerBuilder {
}
}
fn start_worker(
&self,
idx: usize,
waker_queue: WakerQueue,
) -> (WorkerHandleAccept, WorkerHandleServer) {
fn start_worker(&self, idx: usize) -> (WorkerHandleAccept, WorkerHandleServer) {
let services = self.services.iter().map(|v| v.clone_factory()).collect();
ServerWorker::start(idx, services, waker_queue, self.worker_config)
let waker_tx = self.accept.waker_tx();
ServerWorker::start(idx, services, waker_tx, self.worker_config)
}
fn handle_cmd(&mut self, item: ServerCommand) {
@ -427,8 +422,7 @@ impl ServerBuilder {
break;
}
let (handle_accept, handle_server) =
self.start_worker(new_idx, self.accept.waker_owned());
let (handle_accept, handle_server) = self.start_worker(new_idx);
self.handles.push((new_idx, handle_server));
self.accept.wake(WakerInterest::Worker(handle_accept));
}

actix-server/src/lib.rs

@ -12,7 +12,7 @@ mod service;
mod signals;
mod socket;
mod test_server;
mod waker_queue;
mod waker;
mod worker;
pub use self::builder::ServerBuilder;

actix-server/src/waker.rs Normal file

@ -0,0 +1,90 @@
use std::{
ops::{Deref, DerefMut},
sync::Arc,
task::{Wake, Waker},
};
use mio::{Registry, Token};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use crate::worker::WorkerHandleAccept;
/// Waker token for `mio::Poll` instance.
pub(crate) const WAKER_TOKEN: Token = Token(usize::MAX);
/// Types of interests we would look into when `Accept`'s `Poll` is woken up by the waker.
///
/// These interests should not be confused with `mio::Interest` and are mostly not I/O related.
pub(crate) enum WakerInterest {
/// `WorkerAvailable` is an interest from `Worker` notifying `Accept` that a worker is
/// available and can accept new tasks.
WorkerAvailable(usize),
/// The `Pause`, `Resume` and `Stop` interests come from the `ServerBuilder` future, which
/// listens to `ServerCommand` and notifies `Accept` to perform exactly these tasks.
Pause,
Resume,
Stop,
/// `Worker` is an interest that happens after a worker runs into a faulted state (determined
/// by whether work can be sent to it successfully). `Accept` would be woken up to add the new
/// `WorkerHandleAccept`.
Worker(WorkerHandleAccept),
}
/// Wrapper type for `mio::Waker` in order to implement the `std::task::Wake` trait.
struct _Waker(mio::Waker);
impl Wake for _Waker {
fn wake(self: Arc<Self>) {
Wake::wake_by_ref(&self)
}
fn wake_by_ref(self: &Arc<Self>) {
self.0
.wake()
.unwrap_or_else(|e| panic!("Can not wake up Accept Poll: {}", e));
}
}
/// Wrapper type for tokio unbounded channel sender.
pub(crate) struct WakerTx(UnboundedSender<WakerInterest>);
impl Clone for WakerTx {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl WakerTx {
/// Send a `WakerInterest` through the channel and panic on error.
pub(crate) fn wake(&self, interest: WakerInterest) {
self.0
.send(interest)
.unwrap_or_else(|e| panic!("Can not send WakerInterest: {}", e));
}
}
/// Wrapper type for tokio unbounded channel receiver.
pub(crate) struct WakerRx(UnboundedReceiver<WakerInterest>);
impl Deref for WakerRx {
type Target = UnboundedReceiver<WakerInterest>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for WakerRx {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
pub(crate) fn from_registry(registry: &Registry) -> std::io::Result<Waker> {
mio::Waker::new(registry, WAKER_TOKEN).map(|waker| Arc::new(_Waker(waker)).into())
}
pub(crate) fn waker_channel() -> (WakerTx, WakerRx) {
let (tx, rx) = unbounded_channel();
(WakerTx(tx), WakerRx(rx))
}
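The helpers above are `pub(crate)`, so they can only be exercised from inside the crate. A hypothetical in-module test (not part of this commit; name and assertions are illustrative only) shows the intended contract: the receiver must first be polled with a `Context` built via `from_registry` so that a later `wake` call can reach the blocked `mio::Poll`, and interests simply round-trip through the channel.

#[cfg(test)]
mod tests {
    use std::task::{Context, Poll};

    use super::*;

    #[test]
    fn interest_round_trip() {
        let poll = mio::Poll::new().unwrap();

        // std::task::Waker backed by the mio registry, wrapped in a Context for poll_recv.
        let waker = from_registry(poll.registry()).unwrap();
        let cx = &mut Context::from_waker(&waker);

        let (tx, mut rx) = waker_channel();

        // Nothing sent yet: the receiver registers the waker and yields Pending.
        assert!(matches!(rx.poll_recv(cx), Poll::Pending));

        // Sending fires the registered waker (and therefore the mio::Poll) and
        // makes the interest available on the next poll.
        tx.wake(WakerInterest::Pause);
        assert!(matches!(rx.poll_recv(cx), Poll::Ready(Some(WakerInterest::Pause))));
    }
}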

actix-server/src/waker_queue.rs

@ -1,89 +0,0 @@
use std::{
collections::VecDeque,
ops::Deref,
sync::{Arc, Mutex, MutexGuard},
};
use mio::{Registry, Token as MioToken, Waker};
use crate::worker::WorkerHandleAccept;
/// Waker token for `mio::Poll` instance.
pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX);
/// `mio::Waker` with a queue for waking up the `Accept`'s `Poll`; it contains the `WakerInterest`s
/// the `Poll` would want to look into.
pub(crate) struct WakerQueue(Arc<(Waker, Mutex<VecDeque<WakerInterest>>)>);
impl Clone for WakerQueue {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl Deref for WakerQueue {
type Target = (Waker, Mutex<VecDeque<WakerInterest>>);
fn deref(&self) -> &Self::Target {
self.0.deref()
}
}
impl WakerQueue {
/// Construct a waker queue with the given `Poll`'s `Registry` and capacity.
///
/// A fixed `WAKER_TOKEN` is used to identify the wake interest and the `Poll` needs to match the
/// event's token for it to properly handle `WakerInterest`.
pub(crate) fn new(registry: &Registry) -> std::io::Result<Self> {
let waker = Waker::new(registry, WAKER_TOKEN)?;
let queue = Mutex::new(VecDeque::with_capacity(16));
Ok(Self(Arc::new((waker, queue))))
}
/// Push a new interest to the queue and wake up the accept poll afterwards.
pub(crate) fn wake(&self, interest: WakerInterest) {
let (waker, queue) = self.deref();
queue
.lock()
.expect("Failed to lock WakerQueue")
.push_back(interest);
waker
.wake()
.unwrap_or_else(|e| panic!("can not wake up Accept Poll: {}", e));
}
/// Get a MutexGuard of the waker queue.
pub(crate) fn guard(&self) -> MutexGuard<'_, VecDeque<WakerInterest>> {
self.deref().1.lock().expect("Failed to lock WakerQueue")
}
/// Reset the waker queue so it does not grow infinitely.
pub(crate) fn reset(queue: &mut VecDeque<WakerInterest>) {
std::mem::swap(&mut VecDeque::<WakerInterest>::with_capacity(16), queue);
}
}
/// Types of interests we would look into when `Accept`'s `Poll` is woken up by the waker.
///
/// These interests should not be confused with `mio::Interest` and are mostly not I/O related.
pub(crate) enum WakerInterest {
/// `WorkerAvailable` is an interest from `Worker` notifying `Accept` that a worker is
/// available and can accept new tasks.
WorkerAvailable(usize),
/// The `Pause`, `Resume` and `Stop` interests come from the `ServerBuilder` future, which
/// listens to `ServerCommand` and notifies `Accept` to perform exactly these tasks.
Pause,
Resume,
Stop,
/// `Timer` is an interest sent as a delayed future. When an error happens while accepting a
/// connection, `Accept` would deregister the socket listener temporarily and wake up the poll
/// to register it again after the delayed future resolves.
Timer,
/// `Worker` is an interest that happens after a worker runs into a faulted state (determined
/// by whether work can be sent to it successfully). `Accept` would be woken up to add the new
/// `WorkerHandleAccept`.
Worker(WorkerHandleAccept),
}

actix-server/src/worker.rs

@ -26,7 +26,7 @@ use tokio::sync::{
use crate::join_all;
use crate::service::{BoxedServerService, InternalServiceFactory};
use crate::socket::MioStream;
use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::waker::{WakerInterest, WakerTx};
/// Stop worker message. Returns `true` on successful graceful shutdown
/// and `false` if some connections are still alive when shutdown executes.
@ -91,7 +91,7 @@ impl Counter {
pub(crate) struct WorkerCounter {
idx: usize,
inner: Rc<(WakerQueue, Counter)>,
inner: Rc<(WakerTx, Counter)>,
}
impl Clone for WorkerCounter {
@ -104,10 +104,10 @@ impl Clone for WorkerCounter {
}
impl WorkerCounter {
pub(crate) fn new(idx: usize, waker_queue: WakerQueue, counter: Counter) -> Self {
pub(crate) fn new(idx: usize, waker_tx: WakerTx, counter: Counter) -> Self {
Self {
idx,
inner: Rc::new((waker_queue, counter)),
inner: Rc::new((waker_tx, counter)),
}
}
@ -125,9 +125,9 @@ pub(crate) struct WorkerCounterGuard(WorkerCounter);
impl Drop for WorkerCounterGuard {
fn drop(&mut self) {
let (waker_queue, counter) = &*self.0.inner;
let (waker_tx, counter) = &*self.0.inner;
if counter.derc() {
waker_queue.wake(WakerInterest::WorkerAvailable(self.0.idx));
waker_tx.wake(WakerInterest::WorkerAvailable(self.0.idx));
}
}
}
@ -251,7 +251,7 @@ impl ServerWorker {
pub(crate) fn start(
idx: usize,
factories: Vec<Box<dyn InternalServiceFactory>>,
waker_queue: WakerQueue,
waker_tx: WakerTx,
config: ServerWorkerConfig,
) -> (WorkerHandleAccept, WorkerHandleServer) {
let (tx1, rx) = unbounded_channel();
@ -315,7 +315,7 @@ impl ServerWorker {
rx,
rx2,
services,
counter: WorkerCounter::new(idx, waker_queue, counter_clone),
counter: WorkerCounter::new(idx, waker_tx, counter_clone),
factories: factories.into_boxed_slice(),
state: Default::default(),
shutdown_timeout: config.shutdown_timeout,
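The `WorkerCounterGuard::drop` change above is the only place a worker reports renewed availability, so the accept thread is not woken for every finished connection but only when capacity actually reappears. Below is a reduced, self-contained sketch of that RAII notification pattern; every name in it is made up and the crate's real `Counter` bookkeeping is more involved.

use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};

enum Interest {
    WorkerAvailable(usize),
}

// RAII guard: dropping it marks one unit of work as finished and, if the worker
// just came back under its limit, tells the accept side it can send work again.
struct AvailabilityGuard {
    idx: usize,
    active: Arc<AtomicUsize>,
    limit: usize,
    tx: UnboundedSender<Interest>,
}

impl Drop for AvailabilityGuard {
    fn drop(&mut self) {
        let prev = self.active.fetch_sub(1, Ordering::AcqRel);
        // Only the drop that crosses back under the limit notifies, so the accept
        // thread is not flooded with redundant wake-ups.
        if prev == self.limit {
            let _ = self.tx.send(Interest::WorkerAvailable(self.idx));
        }
    }
}

fn main() {
    let (tx, mut rx) = unbounded_channel();
    let active = Arc::new(AtomicUsize::new(0));
    let limit = 1;

    // A connection is accepted: the worker is now at its limit.
    active.fetch_add(1, Ordering::AcqRel);
    let guard = AvailabilityGuard {
        idx: 0,
        active: Arc::clone(&active),
        limit,
        tx,
    };

    // The connection finishes: the guard drop reports worker 0 as available again.
    drop(guard);
    assert!(matches!(rx.try_recv(), Ok(Interest::WorkerAvailable(0))));
}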