Merge branch 'master' into refactor/ServerWorker_size

This commit is contained in:
fakeshadow 2021-04-12 16:16:21 -07:00 committed by GitHub
commit ea167d4378
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 126 additions and 105 deletions

View File

@ -12,7 +12,7 @@ use slab::Slab;
use crate::server::Server; use crate::server::Server;
use crate::socket::{MioListener, SocketAddr}; use crate::socket::{MioListener, SocketAddr};
use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
use crate::worker::{Conn, WorkerHandle}; use crate::worker::{Conn, WorkerHandleAccept};
use crate::Token; use crate::Token;
struct ServerSocketInfo { struct ServerSocketInfo {
@ -66,7 +66,7 @@ impl AcceptLoop {
pub(crate) fn start( pub(crate) fn start(
&mut self, &mut self,
socks: Vec<(Token, MioListener)>, socks: Vec<(Token, MioListener)>,
handles: Vec<WorkerHandle>, handles: Vec<WorkerHandleAccept>,
) { ) {
let srv = self.srv.take().expect("Can not re-use AcceptInfo"); let srv = self.srv.take().expect("Can not re-use AcceptInfo");
let poll = self.poll.take().unwrap(); let poll = self.poll.take().unwrap();
@ -80,7 +80,7 @@ impl AcceptLoop {
struct Accept { struct Accept {
poll: Poll, poll: Poll,
waker: WakerQueue, waker: WakerQueue,
handles: Vec<WorkerHandle>, handles: Vec<WorkerHandleAccept>,
srv: Server, srv: Server,
next: usize, next: usize,
backpressure: bool, backpressure: bool,
@ -105,7 +105,7 @@ impl Accept {
waker: WakerQueue, waker: WakerQueue,
socks: Vec<(Token, MioListener)>, socks: Vec<(Token, MioListener)>,
srv: Server, srv: Server,
handles: Vec<WorkerHandle>, handles: Vec<WorkerHandleAccept>,
) { ) {
// Accept runs in its own thread and would want to spawn additional futures to current // Accept runs in its own thread and would want to spawn additional futures to current
// actix system. // actix system.
@ -125,7 +125,7 @@ impl Accept {
poll: Poll, poll: Poll,
waker: WakerQueue, waker: WakerQueue,
socks: Vec<(Token, MioListener)>, socks: Vec<(Token, MioListener)>,
handles: Vec<WorkerHandle>, handles: Vec<WorkerHandleAccept>,
srv: Server, srv: Server,
) -> (Accept, Slab<ServerSocketInfo>) { ) -> (Accept, Slab<ServerSocketInfo>) {
let mut sockets = Slab::new(); let mut sockets = Slab::new();
@ -407,7 +407,7 @@ impl Accept {
} }
fn accept(&mut self, sockets: &mut Slab<ServerSocketInfo>, token: usize) { fn accept(&mut self, sockets: &mut Slab<ServerSocketInfo>, token: usize) {
while !self.backpressure { loop {
let info = sockets let info = sockets
.get_mut(token) .get_mut(token)
.expect("ServerSocketInfo is removed from Slab"); .expect("ServerSocketInfo is removed from Slab");

View File

@ -19,7 +19,10 @@ use crate::signals::{Signal, Signals};
use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
use crate::socket::{MioTcpListener, MioTcpSocket}; use crate::socket::{MioTcpListener, MioTcpSocket};
use crate::waker_queue::{WakerInterest, WakerQueue}; use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandle}; use crate::worker::{
ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandleAccept,
WorkerHandleServer,
};
use crate::{join_all, Token}; use crate::{join_all, Token};
/// Server builder /// Server builder
@ -27,7 +30,7 @@ pub struct ServerBuilder {
threads: usize, threads: usize,
token: Token, token: Token,
backlog: u32, backlog: u32,
handles: Vec<(usize, WorkerHandle)>, handles: Vec<(usize, WorkerHandleServer)>,
services: Vec<Box<dyn InternalServiceFactory>>, services: Vec<Box<dyn InternalServiceFactory>>,
sockets: Vec<(Token, String, MioListener)>, sockets: Vec<(Token, String, MioListener)>,
accept: AcceptLoop, accept: AcceptLoop,
@ -280,10 +283,11 @@ impl ServerBuilder {
// start workers // start workers
let handles = (0..self.threads) let handles = (0..self.threads)
.map(|idx| { .map(|idx| {
let handle = self.start_worker(idx, self.accept.waker_owned()); let (handle_accept, handle_server) =
self.handles.push((idx, handle.clone())); self.start_worker(idx, self.accept.waker_owned());
self.handles.push((idx, handle_server));
handle handle_accept
}) })
.collect(); .collect();
@ -311,7 +315,11 @@ impl ServerBuilder {
} }
} }
fn start_worker(&self, idx: usize, waker: WakerQueue) -> WorkerHandle { fn start_worker(
&self,
idx: usize,
waker: WakerQueue,
) -> (WorkerHandleAccept, WorkerHandleServer) {
let avail = WorkerAvailability::new(waker); let avail = WorkerAvailability::new(waker);
let services = self.services.iter().map(|v| v.clone_factory()).collect(); let services = self.services.iter().map(|v| v.clone_factory()).collect();
@ -437,9 +445,10 @@ impl ServerBuilder {
break; break;
} }
let handle = self.start_worker(new_idx, self.accept.waker_owned()); let (handle_accept, handle_server) =
self.handles.push((new_idx, handle.clone())); self.start_worker(new_idx, self.accept.waker_owned());
self.accept.wake(WakerInterest::Worker(handle)); self.handles.push((new_idx, handle_server));
self.accept.wake(WakerInterest::Worker(handle_accept));
} }
} }
} }

View File

@ -12,18 +12,7 @@ pub(crate) use {
use std::{fmt, io}; use std::{fmt, io};
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use mio::event::Source; use mio::{event::Source, Interest, Registry, Token};
use mio::net::TcpStream as MioTcpStream;
use mio::{Interest, Registry, Token};
#[cfg(windows)]
use std::os::windows::io::{FromRawSocket, IntoRawSocket};
#[cfg(unix)]
use {
actix_rt::net::UnixStream,
mio::net::{SocketAddr as MioSocketAddr, UnixStream as MioUnixStream},
std::os::unix::io::{FromRawFd, IntoRawFd},
};
pub(crate) enum MioListener { pub(crate) enum MioListener {
Tcp(MioTcpListener), Tcp(MioTcpListener),
@ -131,7 +120,7 @@ impl fmt::Display for MioListener {
pub(crate) enum SocketAddr { pub(crate) enum SocketAddr {
Tcp(StdSocketAddr), Tcp(StdSocketAddr),
#[cfg(unix)] #[cfg(unix)]
Uds(MioSocketAddr), Uds(mio::net::SocketAddr),
} }
impl fmt::Display for SocketAddr { impl fmt::Display for SocketAddr {
@ -156,9 +145,9 @@ impl fmt::Debug for SocketAddr {
#[derive(Debug)] #[derive(Debug)]
pub enum MioStream { pub enum MioStream {
Tcp(MioTcpStream), Tcp(mio::net::TcpStream),
#[cfg(unix)] #[cfg(unix)]
Uds(MioUnixStream), Uds(mio::net::UnixStream),
} }
/// helper trait for converting mio stream to tokio stream. /// helper trait for converting mio stream to tokio stream.
@ -166,47 +155,60 @@ pub trait FromStream: Sized {
fn from_mio(sock: MioStream) -> io::Result<Self>; fn from_mio(sock: MioStream) -> io::Result<Self>;
} }
// FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream
#[cfg(unix)]
impl FromStream for TcpStream {
fn from_mio(sock: MioStream) -> io::Result<Self> {
match sock {
MioStream::Tcp(mio) => {
let raw = IntoRawFd::into_raw_fd(mio);
// SAFETY: This is an in-place conversion from mio stream to tokio stream.
TcpStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) })
}
MioStream::Uds(_) => {
panic!("Should not happen, bug in server impl");
}
}
}
}
// FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream
#[cfg(windows)] #[cfg(windows)]
impl FromStream for TcpStream { mod win_impl {
fn from_mio(sock: MioStream) -> io::Result<Self> { use super::*;
match sock {
MioStream::Tcp(mio) => { use std::os::windows::io::{FromRawSocket, IntoRawSocket};
let raw = IntoRawSocket::into_raw_socket(mio);
// SAFETY: This is an in-place conversion from mio stream to tokio stream. // FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream
TcpStream::from_std(unsafe { FromRawSocket::from_raw_socket(raw) }) impl FromStream for TcpStream {
fn from_mio(sock: MioStream) -> io::Result<Self> {
match sock {
MioStream::Tcp(mio) => {
let raw = IntoRawSocket::into_raw_socket(mio);
// SAFETY: This is an in-place conversion from mio stream to tokio stream.
TcpStream::from_std(unsafe { FromRawSocket::from_raw_socket(raw) })
}
} }
} }
} }
} }
// FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream
#[cfg(unix)] #[cfg(unix)]
impl FromStream for UnixStream { mod unix_impl {
fn from_mio(sock: MioStream) -> io::Result<Self> { use super::*;
match sock {
MioStream::Tcp(_) => panic!("Should not happen, bug in server impl"), use std::os::unix::io::{FromRawFd, IntoRawFd};
MioStream::Uds(mio) => {
let raw = IntoRawFd::into_raw_fd(mio); use actix_rt::net::UnixStream;
// SAFETY: This is an in-place conversion from mio stream to tokio stream. // FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream
UnixStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) }) // FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream
impl FromStream for TcpStream {
fn from_mio(sock: MioStream) -> io::Result<Self> {
match sock {
MioStream::Tcp(mio) => {
let raw = IntoRawFd::into_raw_fd(mio);
// SAFETY: This is an in-place conversion from mio stream to tokio stream.
TcpStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) })
}
MioStream::Uds(_) => {
panic!("Should not happen, bug in server impl");
}
}
}
}
// FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream
impl FromStream for UnixStream {
fn from_mio(sock: MioStream) -> io::Result<Self> {
match sock {
MioStream::Tcp(_) => panic!("Should not happen, bug in server impl"),
MioStream::Uds(mio) => {
let raw = IntoRawFd::into_raw_fd(mio);
// SAFETY: This is an in-place conversion from mio stream to tokio stream.
UnixStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) })
}
} }
} }
} }

View File

@ -6,7 +6,7 @@ use std::{
use mio::{Registry, Token as MioToken, Waker}; use mio::{Registry, Token as MioToken, Waker};
use crate::worker::WorkerHandle; use crate::worker::WorkerHandleAccept;
/// Waker token for `mio::Poll` instance. /// Waker token for `mio::Poll` instance.
pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX); pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX);
@ -84,6 +84,6 @@ pub(crate) enum WakerInterest {
Timer, Timer,
/// `Worker` is an interest happen after a worker runs into faulted state(This is determined /// `Worker` is an interest happen after a worker runs into faulted state(This is determined
/// by if work can be sent to it successfully).`Accept` would be waked up and add the new /// by if work can be sent to it successfully).`Accept` would be waked up and add the new
/// `WorkerHandle`. /// `WorkerHandleAccept`.
Worker(WorkerHandle), Worker(WorkerHandleAccept),
} }

View File

@ -28,11 +28,9 @@ use crate::socket::MioStream;
use crate::waker_queue::{WakerInterest, WakerQueue}; use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::{join_all, Token}; use crate::{join_all, Token};
pub(crate) struct WorkerCommand(Conn); /// Stop worker message. Returns `true` on successful graceful shutdown.
/// and `false` if some connections are still alive when shutdown executes.
/// Stop worker message. Returns `true` on successful shutdown pub(crate) struct Stop {
/// and `false` if some connections are still alive.
pub(crate) struct StopCommand {
graceful: bool, graceful: bool,
tx: oneshot::Sender<bool>, tx: oneshot::Sender<bool>,
} }
@ -43,42 +41,55 @@ pub(crate) struct Conn {
pub token: Token, pub token: Token,
} }
// a handle to worker that can send message to worker and share the availability of worker to other fn handle_pair(
// thread. idx: usize,
#[derive(Clone)] tx1: UnboundedSender<Conn>,
pub(crate) struct WorkerHandle { tx2: UnboundedSender<Stop>,
avail: WorkerAvailability,
) -> (WorkerHandleAccept, WorkerHandleServer) {
let accept = WorkerHandleAccept {
idx,
tx: tx1,
avail,
};
let server = WorkerHandleServer { idx, tx: tx2 };
(accept, server)
}
/// Handle to worker that can send connection message to worker and share the
/// availability of worker to other thread.
///
/// Held by [Accept](crate::accept::Accept).
pub(crate) struct WorkerHandleAccept {
pub idx: usize, pub idx: usize,
tx1: UnboundedSender<WorkerCommand>, tx: UnboundedSender<Conn>,
tx2: UnboundedSender<StopCommand>,
avail: WorkerAvailability, avail: WorkerAvailability,
} }
impl WorkerHandle { impl WorkerHandleAccept {
pub fn new( pub(crate) fn send(&self, msg: Conn) -> Result<(), Conn> {
idx: usize, self.tx.send(msg).map_err(|msg| msg.0)
tx1: UnboundedSender<WorkerCommand>,
tx2: UnboundedSender<StopCommand>,
avail: WorkerAvailability,
) -> Self {
WorkerHandle {
idx,
tx1,
tx2,
avail,
}
} }
pub fn send(&self, msg: Conn) -> Result<(), Conn> { pub(crate) fn available(&self) -> bool {
self.tx1.send(WorkerCommand(msg)).map_err(|msg| msg.0 .0)
}
pub fn available(&self) -> bool {
self.avail.available() self.avail.available()
} }
}
pub fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> { /// Handle to worker than can send stop message to worker.
///
/// Held by [ServerBuilder](crate::builder::ServerBuilder).
pub(crate) struct WorkerHandleServer {
pub idx: usize,
tx: UnboundedSender<Stop>,
}
impl WorkerHandleServer {
pub(crate) fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let _ = self.tx2.send(StopCommand { graceful, tx }); let _ = self.tx.send(Stop { graceful, tx });
rx rx
} }
} }
@ -114,8 +125,8 @@ impl WorkerAvailability {
/// ///
/// Worker accepts Socket objects via unbounded channel and starts stream processing. /// Worker accepts Socket objects via unbounded channel and starts stream processing.
pub(crate) struct ServerWorker { pub(crate) struct ServerWorker {
rx: UnboundedReceiver<WorkerCommand>, rx: UnboundedReceiver<Conn>,
rx2: UnboundedReceiver<StopCommand>, rx2: UnboundedReceiver<Stop>,
services: Box<[WorkerService]>, services: Box<[WorkerService]>,
availability: WorkerAvailability, availability: WorkerAvailability,
conns: Counter, conns: Counter,
@ -187,7 +198,7 @@ impl ServerWorker {
factories: Vec<Box<dyn InternalServiceFactory>>, factories: Vec<Box<dyn InternalServiceFactory>>,
availability: WorkerAvailability, availability: WorkerAvailability,
config: ServerWorkerConfig, config: ServerWorkerConfig,
) -> WorkerHandle { ) -> (WorkerHandleAccept, WorkerHandleServer) {
assert!(!availability.available()); assert!(!availability.available());
let (tx1, rx) = unbounded_channel(); let (tx1, rx) = unbounded_channel();
@ -258,7 +269,7 @@ impl ServerWorker {
}); });
}); });
WorkerHandle::new(idx, tx1, tx2, avail) handle_pair(idx, tx1, tx2, avail)
} }
fn restart_service(&mut self, token: Token, factory_id: usize) { fn restart_service(&mut self, token: Token, factory_id: usize) {
@ -364,8 +375,7 @@ impl Future for ServerWorker {
let this = self.as_mut().get_mut(); let this = self.as_mut().get_mut();
// `StopWorker` message handler // `StopWorker` message handler
if let Poll::Ready(Some(StopCommand { graceful, tx })) = if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx)
Pin::new(&mut this.rx2).poll_recv(cx)
{ {
this.availability.set(false); this.availability.set(false);
let num = this.conns.total(); let num = this.conns.total();
@ -476,7 +486,7 @@ impl Future for ServerWorker {
match ready!(Pin::new(&mut this.rx).poll_recv(cx)) { match ready!(Pin::new(&mut this.rx).poll_recv(cx)) {
// handle incoming io stream // handle incoming io stream
Some(WorkerCommand(msg)) => { Some(msg) => {
let guard = this.conns.get(); let guard = this.conns.get();
let _ = this.services[msg.token.0].service.call((guard, msg.io)); let _ = this.services[msg.token.0].service.call((guard, msg.io));
} }