add spsc channel

This commit is contained in:
fakeshadow 2021-04-10 14:48:30 +08:00
parent 0a11cf5cba
commit 311354bfab
7 changed files with 87 additions and 43 deletions

View File

@ -17,18 +17,17 @@ edition = "2018"
name = "actix_server" name = "actix_server"
path = "src/lib.rs" path = "src/lib.rs"
[features]
default = []
[dependencies] [dependencies]
actix-rt = { version = "2.0.0", default-features = false } actix-rt = { version = "2.0.0", default-features = false }
actix-service = "2.0.0-beta.5" actix-service = "2.0.0-beta.5"
actix-utils = "3.0.0-beta.4" actix-utils = "3.0.0-beta.4"
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
futures-util = { version = "0.3.7", default-features = false }
log = "0.4" log = "0.4"
mio = { version = "0.7.6", features = ["os-poll", "net"] } mio = { version = "0.7.11", features = ["os-poll", "net", "os-util"] }
num_cpus = "1.13" num_cpus = "1.13"
rtrb = ""
slab = "0.4" slab = "0.4"
tokio = { version = "1.2", features = ["sync"] } tokio = { version = "1.2", features = ["sync"] }

View File

@ -408,9 +408,7 @@ impl Accept {
fn accept(&mut self, sockets: &mut Slab<ServerSocketInfo>, token: usize) { fn accept(&mut self, sockets: &mut Slab<ServerSocketInfo>, token: usize) {
loop { loop {
let info = sockets let info = &mut sockets[token];
.get_mut(token)
.expect("ServerSocketInfo is removed from Slab");
match info.lst.accept() { match info.lst.accept() {
Ok(io) => { Ok(io) => {

View File

@ -323,7 +323,7 @@ impl ServerBuilder {
let avail = WorkerAvailability::new(waker); let avail = WorkerAvailability::new(waker);
let services = self.services.iter().map(|v| v.clone_factory()).collect(); let services = self.services.iter().map(|v| v.clone_factory()).collect();
ServerWorker::start(idx, services, avail, self.worker_config) ServerWorker::start(idx, self.backlog, services, avail, self.worker_config)
} }
fn handle_cmd(&mut self, item: ServerCommand) { fn handle_cmd(&mut self, item: ServerCommand) {
@ -384,7 +384,7 @@ impl ServerBuilder {
if !self.handles.is_empty() && graceful { if !self.handles.is_empty() && graceful {
let iter = self let iter = self
.handles .handles
.iter() .iter_mut()
.map(move |worker| worker.1.stop(graceful)) .map(move |worker| worker.1.stop(graceful))
.collect(); .collect();

View File

@ -11,6 +11,7 @@ mod server;
mod service; mod service;
mod signals; mod signals;
mod socket; mod socket;
mod spsc;
mod test_server; mod test_server;
mod waker_queue; mod waker_queue;
mod worker; mod worker;

View File

@ -13,7 +13,6 @@ use std::{fmt, io};
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use mio::event::Source; use mio::event::Source;
use mio::net::TcpStream as MioTcpStream;
use mio::{Interest, Registry, Token}; use mio::{Interest, Registry, Token};
#[cfg(windows)] #[cfg(windows)]
@ -21,7 +20,6 @@ use std::os::windows::io::{FromRawSocket, IntoRawSocket};
#[cfg(unix)] #[cfg(unix)]
use { use {
actix_rt::net::UnixStream, actix_rt::net::UnixStream,
mio::net::{SocketAddr as MioSocketAddr, UnixStream as MioUnixStream},
std::os::unix::io::{FromRawFd, IntoRawFd}, std::os::unix::io::{FromRawFd, IntoRawFd},
}; };
@ -131,7 +129,7 @@ impl fmt::Display for MioListener {
pub(crate) enum SocketAddr { pub(crate) enum SocketAddr {
Tcp(StdSocketAddr), Tcp(StdSocketAddr),
#[cfg(unix)] #[cfg(unix)]
Uds(MioSocketAddr), Uds(mio::net::SocketAddr),
} }
impl fmt::Display for SocketAddr { impl fmt::Display for SocketAddr {
@ -156,9 +154,9 @@ impl fmt::Debug for SocketAddr {
#[derive(Debug)] #[derive(Debug)]
pub enum MioStream { pub enum MioStream {
Tcp(MioTcpStream), Tcp(mio::net::TcpStream),
#[cfg(unix)] #[cfg(unix)]
Uds(MioUnixStream), Uds(mio::net::UnixStream),
} }
/// helper trait for converting mio stream to tokio stream. /// helper trait for converting mio stream to tokio stream.

54
actix-server/src/spsc.rs Normal file
View File

@ -0,0 +1,54 @@
use std::{
sync::Arc,
task::{Context, Poll},
};
use futures_util::task::AtomicWaker;
use rtrb::{Consumer, Producer, PushError, RingBuffer};
pub fn channel<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
let (tx, rx) = RingBuffer::new(cap).split();
let waker = Arc::new(AtomicWaker::new());
let sender = Sender {
tx,
waker: waker.clone(),
};
let receiver = Receiver { rx, waker };
(sender, receiver)
}
pub struct Sender<T> {
tx: Producer<T>,
waker: Arc<AtomicWaker>,
}
impl<T> Sender<T> {
pub fn send(&mut self, item: T) -> Result<(), T> {
match self.tx.push(item) {
Ok(_) => {
self.waker.wake();
Ok(())
}
Err(PushError::Full(item)) => Err(item),
}
}
}
pub struct Receiver<T> {
rx: Consumer<T>,
waker: Arc<AtomicWaker>,
}
impl<T> Receiver<T> {
pub fn poll_recv_unpin(&mut self, cx: &mut Context<'_>) -> Poll<T> {
match self.rx.pop() {
Ok(item) => Poll::Ready(item),
Err(_) => {
self.waker.register(cx.waker());
Poll::Pending
}
}
}
}

View File

@ -18,19 +18,17 @@ use actix_rt::{
use actix_utils::counter::Counter; use actix_utils::counter::Counter;
use futures_core::{future::LocalBoxFuture, ready}; use futures_core::{future::LocalBoxFuture, ready};
use log::{error, info, trace}; use log::{error, info, trace};
use tokio::sync::{ use tokio::sync::oneshot;
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot,
};
use crate::service::{BoxedServerService, InternalServiceFactory}; use crate::service::{BoxedServerService, InternalServiceFactory};
use crate::socket::MioStream; use crate::socket::MioStream;
use crate::spsc::{channel, Receiver, Sender};
use crate::waker_queue::{WakerInterest, WakerQueue}; use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::{join_all, Token}; use crate::{join_all, Token};
/// Stop worker message. Returns `true` on successful graceful shutdown. /// Stop worker message. Returns `true` on successful graceful shutdown.
/// and `false` if some connections still alive when shutdown execute. /// and `false` if some connections still alive when shutdown execute.
pub(crate) struct Stop { struct Stop {
graceful: bool, graceful: bool,
tx: oneshot::Sender<bool>, tx: oneshot::Sender<bool>,
} }
@ -43,8 +41,8 @@ pub(crate) struct Conn {
fn handle_pair( fn handle_pair(
idx: usize, idx: usize,
tx1: UnboundedSender<Conn>, tx1: Sender<Conn>,
tx2: UnboundedSender<Stop>, tx2: Sender<Stop>,
avail: WorkerAvailability, avail: WorkerAvailability,
) -> (WorkerHandleAccept, WorkerHandleServer) { ) -> (WorkerHandleAccept, WorkerHandleServer) {
let accept = WorkerHandleAccept { let accept = WorkerHandleAccept {
@ -64,13 +62,13 @@ fn handle_pair(
/// Held by [Accept](crate::accept::Accept). /// Held by [Accept](crate::accept::Accept).
pub(crate) struct WorkerHandleAccept { pub(crate) struct WorkerHandleAccept {
pub idx: usize, pub idx: usize,
tx: UnboundedSender<Conn>, tx: Sender<Conn>,
avail: WorkerAvailability, avail: WorkerAvailability,
} }
impl WorkerHandleAccept { impl WorkerHandleAccept {
pub(crate) fn send(&self, msg: Conn) -> Result<(), Conn> { pub(crate) fn send(&mut self, msg: Conn) -> Result<(), Conn> {
self.tx.send(msg).map_err(|msg| msg.0) self.tx.send(msg)
} }
pub(crate) fn available(&self) -> bool { pub(crate) fn available(&self) -> bool {
@ -83,11 +81,11 @@ impl WorkerHandleAccept {
/// Held by [ServerBuilder](crate::builder::ServerBuilder). /// Held by [ServerBuilder](crate::builder::ServerBuilder).
pub(crate) struct WorkerHandleServer { pub(crate) struct WorkerHandleServer {
pub idx: usize, pub idx: usize,
tx: UnboundedSender<Stop>, tx: Sender<Stop>,
} }
impl WorkerHandleServer { impl WorkerHandleServer {
pub(crate) fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> { pub(crate) fn stop(&mut self, graceful: bool) -> oneshot::Receiver<bool> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let _ = self.tx.send(Stop { graceful, tx }); let _ = self.tx.send(Stop { graceful, tx });
rx rx
@ -125,8 +123,8 @@ impl WorkerAvailability {
/// ///
/// Worker accepts Socket objects via unbounded channel and starts stream processing. /// Worker accepts Socket objects via unbounded channel and starts stream processing.
pub(crate) struct ServerWorker { pub(crate) struct ServerWorker {
rx: UnboundedReceiver<Conn>, rx: Receiver<Conn>,
rx2: UnboundedReceiver<Stop>, rx2: Receiver<Stop>,
services: Vec<WorkerService>, services: Vec<WorkerService>,
availability: WorkerAvailability, availability: WorkerAvailability,
conns: Counter, conns: Counter,
@ -195,12 +193,13 @@ impl ServerWorkerConfig {
impl ServerWorker { impl ServerWorker {
pub(crate) fn start( pub(crate) fn start(
idx: usize, idx: usize,
backlog: u32,
factories: Vec<Box<dyn InternalServiceFactory>>, factories: Vec<Box<dyn InternalServiceFactory>>,
availability: WorkerAvailability, availability: WorkerAvailability,
config: ServerWorkerConfig, config: ServerWorkerConfig,
) -> (WorkerHandleAccept, WorkerHandleServer) { ) -> (WorkerHandleAccept, WorkerHandleServer) {
let (tx1, rx) = unbounded_channel(); let (tx1, rx) = channel(backlog as _);
let (tx2, rx2) = unbounded_channel(); let (tx2, rx2) = channel(1);
let avail = availability.clone(); let avail = availability.clone();
// every worker runs in it's own arbiter. // every worker runs in it's own arbiter.
@ -351,7 +350,7 @@ struct Restart {
// Shutdown keep states necessary for server shutdown: // Shutdown keep states necessary for server shutdown:
// Sleep for interval check the shutdown progress. // Sleep for interval check the shutdown progress.
// Instant for the start time of shutdown. // Instant for the start time of shutdown.
// Sender for send back the shutdown outcome(force/grace) to StopCommand caller. // Sender for send back the shutdown outcome(force/grace) to Stop caller.
struct Shutdown { struct Shutdown {
timer: Pin<Box<Sleep>>, timer: Pin<Box<Sleep>>,
start_from: Instant, start_from: Instant,
@ -371,8 +370,7 @@ impl Future for ServerWorker {
let this = self.as_mut().get_mut(); let this = self.as_mut().get_mut();
// `StopWorker` message handler // `StopWorker` message handler
if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx) if let Poll::Ready(Stop { graceful, tx }) = this.rx2.poll_recv_unpin(cx) {
{
this.availability.set(false); this.availability.set(false);
let num = this.conns.total(); let num = this.conns.total();
if num == 0 { if num == 0 {
@ -466,7 +464,12 @@ impl Future for ServerWorker {
// actively poll stream and handle worker command // actively poll stream and handle worker command
WorkerState::Available => loop { WorkerState::Available => loop {
match this.check_readiness(cx) { match this.check_readiness(cx) {
Ok(true) => {} Ok(true) => {
let msg = ready!(this.rx.poll_recv_unpin(cx));
// handle incoming io stream
let guard = this.conns.get();
let _ = this.services[msg.token.0].service.call((guard, msg.io));
}
Ok(false) => { Ok(false) => {
trace!("Worker is unavailable"); trace!("Worker is unavailable");
this.availability.set(false); this.availability.set(false);
@ -479,15 +482,6 @@ impl Future for ServerWorker {
return self.poll(cx); return self.poll(cx);
} }
} }
match ready!(Pin::new(&mut this.rx).poll_recv(cx)) {
// handle incoming io stream
Some(msg) => {
let guard = this.conns.get();
let _ = this.services[msg.token.0].service.call((guard, msg.io));
}
None => return Poll::Ready(()),
};
}, },
} }
} }