merge master

This commit is contained in:
fakeshadow 2021-04-10 15:14:26 +08:00
commit bf74ffe015
5 changed files with 77 additions and 60 deletions

View File

@ -9,7 +9,9 @@ use crate::builder::ServerBuilder;
use crate::server::ServerHandle;
use crate::socket::{MioListener, SocketAddr};
use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
use crate::worker::{Conn, ServerWorker, WorkerAvailability, WorkerHandle};
use crate::worker::{
Conn, ServerWorker, WorkerAvailability, WorkerHandleAccept, WorkerHandleServer,
};
use crate::Token;
const DUR_ON_ERR: Duration = Duration::from_millis(500);
@ -32,7 +34,7 @@ struct ServerSocketInfo {
pub(crate) struct Accept {
poll: Poll,
waker_queue: WakerQueue,
handles: Vec<WorkerHandle>,
handles: Vec<WorkerHandleAccept>,
srv: ServerHandle,
next: usize,
backpressure: bool,
@ -58,7 +60,7 @@ impl Accept {
pub(crate) fn start(
sockets: Vec<(Token, MioListener)>,
builder: &ServerBuilder,
) -> io::Result<(WakerQueue, Vec<(usize, WorkerHandle)>)> {
) -> io::Result<(WakerQueue, Vec<(usize, WorkerHandleServer)>)> {
let server_handle = ServerHandle::new(builder.cmd_tx.clone());
// construct poll instance and it's waker
@ -66,15 +68,14 @@ impl Accept {
let waker_queue = WakerQueue::new(poll.registry())?;
// construct workers and collect handles.
let (handles, handles_clone) = (0..builder.threads)
let (handles_accept, handles_server) = (0..builder.threads)
.map(|idx| {
// start workers
let availability = WorkerAvailability::new(waker_queue.clone());
let factories = builder.services.iter().map(|v| v.clone_factory()).collect();
let handle =
let (handle_accept, handle_server) =
ServerWorker::start(idx, factories, availability, builder.worker_config)?;
let handle_clone = (idx, handle.clone());
Ok((handle, handle_clone))
Ok((handle_accept, (idx, handle_server)))
})
.collect::<Result<Vec<_>, io::Error>>()?
.into_iter()
@ -82,8 +83,13 @@ impl Accept {
let wake_queue_clone = waker_queue.clone();
let (mut accept, sockets) =
Accept::new_with_sockets(poll, wake_queue_clone, sockets, handles, server_handle)?;
let (mut accept, sockets) = Accept::new_with_sockets(
poll,
wake_queue_clone,
sockets,
handles_accept,
server_handle,
)?;
// Accept runs in its own thread.
thread::Builder::new()
@ -92,14 +98,14 @@ impl Accept {
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
// return waker and worker handle clones to server builder.
Ok((waker_queue, handles_clone))
Ok((waker_queue, handles_server))
}
fn new_with_sockets(
poll: Poll,
waker_queue: WakerQueue,
socks: Vec<(Token, MioListener)>,
handles: Vec<WorkerHandle>,
handles: Vec<WorkerHandleAccept>,
srv: ServerHandle,
) -> io::Result<(Accept, Slab<ServerSocketInfo>)> {
let mut sockets = Slab::new();
@ -393,7 +399,7 @@ impl Accept {
}
fn accept(&mut self, sockets: &mut Slab<ServerSocketInfo>, token: usize) {
while !self.backpressure {
loop {
let info = sockets
.get_mut(token)
.expect("ServerSocketInfo is removed from Slab");

View File

@ -8,8 +8,9 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use crate::config::{ConfiguredService, ServiceConfig};
use crate::server::{Server, ServerCommand};
use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
use crate::socket::{MioTcpListener, MioTcpSocket};
use crate::socket::{
MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs,
};
use crate::worker::ServerWorkerConfig;
use crate::Token;

View File

@ -19,14 +19,14 @@ use crate::builder::ServerBuilder;
use crate::service::InternalServiceFactory;
use crate::signals::{Signal, Signals};
use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandle};
use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandleServer};
/// When awaited or spawned would listen to signal and message from [ServerHandle](ServerHandle).
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Server {
cmd_tx: UnboundedSender<ServerCommand>,
cmd_rx: UnboundedReceiver<ServerCommand>,
handles: Vec<(usize, WorkerHandle)>,
handles: Vec<(usize, WorkerHandleServer)>,
services: Vec<Box<dyn InternalServiceFactory>>,
notify: Vec<oneshot::Sender<()>>,
exit: bool,
@ -216,9 +216,9 @@ impl Server {
);
match res {
Ok(handle) => {
self.handles.push((new_idx, handle.clone()));
self.waker_queue.wake(WakerInterest::Worker(handle));
Ok((handle_accept, handle_server)) => {
self.handles.push((new_idx, handle_server));
self.waker_queue.wake(WakerInterest::Worker(handle_accept));
}
Err(e) => error!("Can not start worker: {:?}", e),
}

View File

@ -6,7 +6,7 @@ use std::{
use mio::{Registry, Token as MioToken, Waker};
use crate::worker::WorkerHandle;
use crate::worker::WorkerHandleAccept;
/// Waker token for `mio::Poll` instance.
pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX);
@ -80,6 +80,6 @@ pub(crate) enum WakerInterest {
Stop,
/// `Worker` is an interest happen after a worker runs into faulted state(This is determined
/// by if work can be sent to it successfully).`Accept` would be waked up and add the new
/// `WorkerHandle`.
Worker(WorkerHandle),
/// `WorkerHandleAccept`.
Worker(WorkerHandleAccept),
}

View File

@ -28,11 +28,9 @@ use crate::socket::MioStream;
use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::Token;
pub(crate) struct WorkerCommand(Conn);
/// Stop worker message. Returns `true` on successful shutdown
/// and `false` if some connections still alive.
pub(crate) struct StopCommand {
/// Stop worker message. Returns `true` on successful graceful shutdown.
/// and `false` if some connections still alive when shutdown execute.
struct Stop {
graceful: bool,
tx: oneshot::Sender<bool>,
}
@ -43,42 +41,55 @@ pub(crate) struct Conn {
pub token: Token,
}
// a handle to worker that can send message to worker and share the availability of worker to other
// thread.
#[derive(Clone)]
pub(crate) struct WorkerHandle {
fn handle_pair(
idx: usize,
tx1: UnboundedSender<Conn>,
tx2: UnboundedSender<Stop>,
avail: WorkerAvailability,
) -> (WorkerHandleAccept, WorkerHandleServer) {
let accept = WorkerHandleAccept {
idx,
tx: tx1,
avail,
};
let server = WorkerHandleServer { idx, tx: tx2 };
(accept, server)
}
/// Handle to worker that can send connection message to worker and share the
/// availability of worker to other thread.
///
/// Held by [Accept](crate::accept::Accept).
pub(crate) struct WorkerHandleAccept {
pub idx: usize,
tx1: UnboundedSender<WorkerCommand>,
tx2: UnboundedSender<StopCommand>,
tx: UnboundedSender<Conn>,
avail: WorkerAvailability,
}
impl WorkerHandle {
pub fn new(
idx: usize,
tx1: UnboundedSender<WorkerCommand>,
tx2: UnboundedSender<StopCommand>,
avail: WorkerAvailability,
) -> Self {
WorkerHandle {
idx,
tx1,
tx2,
avail,
}
impl WorkerHandleAccept {
pub(crate) fn send(&self, msg: Conn) -> Result<(), Conn> {
self.tx.send(msg).map_err(|msg| msg.0)
}
pub fn send(&self, msg: Conn) -> Result<(), Conn> {
self.tx1.send(WorkerCommand(msg)).map_err(|msg| msg.0 .0)
}
pub fn available(&self) -> bool {
pub(crate) fn available(&self) -> bool {
self.avail.available()
}
}
pub fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
/// Handle to worker than can send stop message to worker.
///
/// Held by [ServerBuilder](crate::builder::ServerBuilder).
pub(crate) struct WorkerHandleServer {
pub idx: usize,
tx: UnboundedSender<Stop>,
}
impl WorkerHandleServer {
pub(crate) fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
let (tx, rx) = oneshot::channel();
let _ = self.tx2.send(StopCommand { graceful, tx });
let _ = self.tx.send(Stop { graceful, tx });
rx
}
}
@ -114,8 +125,8 @@ impl WorkerAvailability {
///
/// Worker accepts Socket objects via unbounded channel and starts stream processing.
pub(crate) struct ServerWorker {
rx: UnboundedReceiver<WorkerCommand>,
rx2: UnboundedReceiver<StopCommand>,
rx: UnboundedReceiver<Conn>,
rx2: UnboundedReceiver<Stop>,
services: Vec<WorkerService>,
availability: WorkerAvailability,
conns: Counter,
@ -187,7 +198,7 @@ impl ServerWorker {
factories: Vec<Box<dyn InternalServiceFactory>>,
availability: WorkerAvailability,
config: ServerWorkerConfig,
) -> io::Result<WorkerHandle> {
) -> io::Result<(WorkerHandleAccept, WorkerHandleServer)> {
assert!(!availability.available());
let (tx1, rx) = unbounded_channel();
@ -267,7 +278,7 @@ impl ServerWorker {
.recv()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?
.map(Err)
.unwrap_or_else(|| Ok(WorkerHandle::new(idx, tx1, tx2, avail)))
.unwrap_or_else(|| Ok(handle_pair(idx, tx1, tx2, avail)))
}
fn restart_service(&mut self, token: Token, factory_id: usize) {
@ -373,8 +384,7 @@ impl Future for ServerWorker {
let this = self.as_mut().get_mut();
// `StopWorker` message handler
if let Poll::Ready(Some(StopCommand { graceful, tx })) =
Pin::new(&mut this.rx2).poll_recv(cx)
if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx)
{
this.availability.set(false);
let num = this.conns.total();
@ -483,7 +493,7 @@ impl Future for ServerWorker {
match ready!(Pin::new(&mut this.rx).poll_recv(cx)) {
// handle incoming io stream
Some(WorkerCommand(msg)) => {
Some(msg) => {
let guard = this.conns.get();
let _ = this.services[msg.token.0].service.call((guard, msg.io));
}