From 311354bfabd719448e0efb215cbf1f3e022d7967 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sat, 10 Apr 2021 14:48:30 +0800 Subject: [PATCH] add spsc channel --- actix-server/Cargo.toml | 7 +++-- actix-server/src/accept.rs | 4 +-- actix-server/src/builder.rs | 4 +-- actix-server/src/lib.rs | 1 + actix-server/src/socket.rs | 8 +++--- actix-server/src/spsc.rs | 54 +++++++++++++++++++++++++++++++++++++ actix-server/src/worker.rs | 52 ++++++++++++++++------------------- 7 files changed, 87 insertions(+), 43 deletions(-) create mode 100644 actix-server/src/spsc.rs diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 80b44c6d..8b6f92e2 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -17,18 +17,17 @@ edition = "2018" name = "actix_server" path = "src/lib.rs" -[features] -default = [] - [dependencies] actix-rt = { version = "2.0.0", default-features = false } actix-service = "2.0.0-beta.5" actix-utils = "3.0.0-beta.4" futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } +futures-util = { version = "0.3.7", default-features = false } log = "0.4" -mio = { version = "0.7.6", features = ["os-poll", "net"] } +mio = { version = "0.7.11", features = ["os-poll", "net", "os-util"] } num_cpus = "1.13" +rtrb = "" slab = "0.4" tokio = { version = "1.2", features = ["sync"] } diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 5b9f99c7..aa788d59 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -408,9 +408,7 @@ impl Accept { fn accept(&mut self, sockets: &mut Slab, token: usize) { loop { - let info = sockets - .get_mut(token) - .expect("ServerSocketInfo is removed from Slab"); + let info = &mut sockets[token]; match info.lst.accept() { Ok(io) => { diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 6019ff16..67c3df74 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -323,7 +323,7 @@ impl ServerBuilder { let avail = WorkerAvailability::new(waker); let services = self.services.iter().map(|v| v.clone_factory()).collect(); - ServerWorker::start(idx, services, avail, self.worker_config) + ServerWorker::start(idx, self.backlog, services, avail, self.worker_config) } fn handle_cmd(&mut self, item: ServerCommand) { @@ -384,7 +384,7 @@ impl ServerBuilder { if !self.handles.is_empty() && graceful { let iter = self .handles - .iter() + .iter_mut() .map(move |worker| worker.1.stop(graceful)) .collect(); diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index af9ab0b0..580a8cd9 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -11,6 +11,7 @@ mod server; mod service; mod signals; mod socket; +mod spsc; mod test_server; mod waker_queue; mod worker; diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index 0625cfda..c8a353b6 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -13,7 +13,6 @@ use std::{fmt, io}; use actix_rt::net::TcpStream; use mio::event::Source; -use mio::net::TcpStream as MioTcpStream; use mio::{Interest, Registry, Token}; #[cfg(windows)] @@ -21,7 +20,6 @@ use std::os::windows::io::{FromRawSocket, IntoRawSocket}; #[cfg(unix)] use { actix_rt::net::UnixStream, - mio::net::{SocketAddr as MioSocketAddr, UnixStream as MioUnixStream}, std::os::unix::io::{FromRawFd, IntoRawFd}, }; @@ -131,7 +129,7 @@ impl fmt::Display for MioListener { pub(crate) enum SocketAddr { Tcp(StdSocketAddr), #[cfg(unix)] - Uds(MioSocketAddr), + Uds(mio::net::SocketAddr), } impl fmt::Display for SocketAddr { @@ -156,9 +154,9 @@ impl fmt::Debug for SocketAddr { #[derive(Debug)] pub enum MioStream { - Tcp(MioTcpStream), + Tcp(mio::net::TcpStream), #[cfg(unix)] - Uds(MioUnixStream), + Uds(mio::net::UnixStream), } /// helper trait for converting mio stream to tokio stream. diff --git a/actix-server/src/spsc.rs b/actix-server/src/spsc.rs new file mode 100644 index 00000000..fe6656ac --- /dev/null +++ b/actix-server/src/spsc.rs @@ -0,0 +1,54 @@ +use std::{ + sync::Arc, + task::{Context, Poll}, +}; + +use futures_util::task::AtomicWaker; +use rtrb::{Consumer, Producer, PushError, RingBuffer}; + +pub fn channel(cap: usize) -> (Sender, Receiver) { + let (tx, rx) = RingBuffer::new(cap).split(); + let waker = Arc::new(AtomicWaker::new()); + let sender = Sender { + tx, + waker: waker.clone(), + }; + + let receiver = Receiver { rx, waker }; + + (sender, receiver) +} + +pub struct Sender { + tx: Producer, + waker: Arc, +} + +impl Sender { + pub fn send(&mut self, item: T) -> Result<(), T> { + match self.tx.push(item) { + Ok(_) => { + self.waker.wake(); + Ok(()) + } + Err(PushError::Full(item)) => Err(item), + } + } +} + +pub struct Receiver { + rx: Consumer, + waker: Arc, +} + +impl Receiver { + pub fn poll_recv_unpin(&mut self, cx: &mut Context<'_>) -> Poll { + match self.rx.pop() { + Ok(item) => Poll::Ready(item), + Err(_) => { + self.waker.register(cx.waker()); + Poll::Pending + } + } + } +} diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 8e122623..816aff33 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -18,19 +18,17 @@ use actix_rt::{ use actix_utils::counter::Counter; use futures_core::{future::LocalBoxFuture, ready}; use log::{error, info, trace}; -use tokio::sync::{ - mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, - oneshot, -}; +use tokio::sync::oneshot; use crate::service::{BoxedServerService, InternalServiceFactory}; use crate::socket::MioStream; +use crate::spsc::{channel, Receiver, Sender}; use crate::waker_queue::{WakerInterest, WakerQueue}; use crate::{join_all, Token}; /// Stop worker message. Returns `true` on successful graceful shutdown. /// and `false` if some connections still alive when shutdown execute. -pub(crate) struct Stop { +struct Stop { graceful: bool, tx: oneshot::Sender, } @@ -43,8 +41,8 @@ pub(crate) struct Conn { fn handle_pair( idx: usize, - tx1: UnboundedSender, - tx2: UnboundedSender, + tx1: Sender, + tx2: Sender, avail: WorkerAvailability, ) -> (WorkerHandleAccept, WorkerHandleServer) { let accept = WorkerHandleAccept { @@ -64,13 +62,13 @@ fn handle_pair( /// Held by [Accept](crate::accept::Accept). pub(crate) struct WorkerHandleAccept { pub idx: usize, - tx: UnboundedSender, + tx: Sender, avail: WorkerAvailability, } impl WorkerHandleAccept { - pub(crate) fn send(&self, msg: Conn) -> Result<(), Conn> { - self.tx.send(msg).map_err(|msg| msg.0) + pub(crate) fn send(&mut self, msg: Conn) -> Result<(), Conn> { + self.tx.send(msg) } pub(crate) fn available(&self) -> bool { @@ -83,11 +81,11 @@ impl WorkerHandleAccept { /// Held by [ServerBuilder](crate::builder::ServerBuilder). pub(crate) struct WorkerHandleServer { pub idx: usize, - tx: UnboundedSender, + tx: Sender, } impl WorkerHandleServer { - pub(crate) fn stop(&self, graceful: bool) -> oneshot::Receiver { + pub(crate) fn stop(&mut self, graceful: bool) -> oneshot::Receiver { let (tx, rx) = oneshot::channel(); let _ = self.tx.send(Stop { graceful, tx }); rx @@ -125,8 +123,8 @@ impl WorkerAvailability { /// /// Worker accepts Socket objects via unbounded channel and starts stream processing. pub(crate) struct ServerWorker { - rx: UnboundedReceiver, - rx2: UnboundedReceiver, + rx: Receiver, + rx2: Receiver, services: Vec, availability: WorkerAvailability, conns: Counter, @@ -195,12 +193,13 @@ impl ServerWorkerConfig { impl ServerWorker { pub(crate) fn start( idx: usize, + backlog: u32, factories: Vec>, availability: WorkerAvailability, config: ServerWorkerConfig, ) -> (WorkerHandleAccept, WorkerHandleServer) { - let (tx1, rx) = unbounded_channel(); - let (tx2, rx2) = unbounded_channel(); + let (tx1, rx) = channel(backlog as _); + let (tx2, rx2) = channel(1); let avail = availability.clone(); // every worker runs in it's own arbiter. @@ -351,7 +350,7 @@ struct Restart { // Shutdown keep states necessary for server shutdown: // Sleep for interval check the shutdown progress. // Instant for the start time of shutdown. -// Sender for send back the shutdown outcome(force/grace) to StopCommand caller. +// Sender for send back the shutdown outcome(force/grace) to Stop caller. struct Shutdown { timer: Pin>, start_from: Instant, @@ -371,8 +370,7 @@ impl Future for ServerWorker { let this = self.as_mut().get_mut(); // `StopWorker` message handler - if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx) - { + if let Poll::Ready(Stop { graceful, tx }) = this.rx2.poll_recv_unpin(cx) { this.availability.set(false); let num = this.conns.total(); if num == 0 { @@ -466,7 +464,12 @@ impl Future for ServerWorker { // actively poll stream and handle worker command WorkerState::Available => loop { match this.check_readiness(cx) { - Ok(true) => {} + Ok(true) => { + let msg = ready!(this.rx.poll_recv_unpin(cx)); + // handle incoming io stream + let guard = this.conns.get(); + let _ = this.services[msg.token.0].service.call((guard, msg.io)); + } Ok(false) => { trace!("Worker is unavailable"); this.availability.set(false); @@ -479,15 +482,6 @@ impl Future for ServerWorker { return self.poll(cx); } } - - match ready!(Pin::new(&mut this.rx).poll_recv(cx)) { - // handle incoming io stream - Some(msg) => { - let guard = this.conns.get(); - let _ = this.services[msg.token.0].service.call((guard, msg.io)); - } - None => return Poll::Ready(()), - }; }, } }