Add Acceptable trait so actix-server can manage a generic IO source

fakeshadow 2021-05-03 13:07:46 +08:00
parent 62c971e5aa
commit 2a1300ffbf
14 changed files with 1410 additions and 1381 deletions

View File

@ -16,8 +16,13 @@ edition = "2018"
name = "actix_server"
path = "src/lib.rs"
[features]
default = []
[[example]]
name = "mio_tcp"
path = "examples/mio-tcp.rs"
[[example]]
name = "tcp_echo"
path = "examples/tcp-echo.rs"
[dependencies]
actix-rt = { version = "2.0.0", default-features = false }
@ -38,3 +43,6 @@ bytes = "1"
env_logger = "0.8"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
tokio = { version = "1", features = ["io-util"] }
[target.'cfg(_loom)'.dependencies]
loom = "0.4.1"

View File

@ -0,0 +1,46 @@
//! A Tcp Server using mio::net::TcpListener.
//!
//! actix-server is used to bridge `mio` and multiple Tokio current-thread runtimes
//! for a thread-per-core style server.
//!
//! The server responds with a "Hello World!" string.
use std::{env, io};
use actix_rt::net::TcpStream;
use actix_server::ServerBuilder;
use actix_service::fn_service;
use tokio::io::AsyncWriteExt;
use mio::net::TcpListener;
// A dummy buffer that always returns "Hello World!" to the client.
const BUF: &[u8] = b"HTTP/1.1 200 OK\r\n\
content-length: 12\r\n\
connection: close\r\n\
date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\
\r\n\
Hello World!";
#[actix_rt::main]
async fn main() -> io::Result<()> {
env::set_var("RUST_LOG", "info");
env_logger::init();
let name = "hello_world";
let addr = "127.0.0.1:8080".parse().unwrap();
let lst = TcpListener::bind(addr)?;
ServerBuilder::new()
.bind_acceptable(name, addr, lst, || fn_service(response))
.run()
.await
}
async fn response(mut stream: TcpStream) -> io::Result<()> {
stream.write_all(BUF).await?;
stream.flush().await?;
stream.shutdown().await
}

View File

@ -0,0 +1,20 @@
use std::{fmt, io};
use mio::{Registry, Token};
#[doc(hidden)]
/// Trait defining an IO source that can be managed by [super::Accept].
pub trait Acceptable: fmt::Debug {
/// Type accepted from IO source.
type Connection: Send + 'static;
fn accept(&mut self) -> io::Result<Option<Self::Connection>>;
/// Register the IO source with the acceptor's [Registry](mio::Registry).
/// Self must implement the [Source](mio::event::Source) trait.
fn register(&mut self, registry: &Registry, token: Token) -> io::Result<()>;
/// Deregister the IO source from the acceptor's [Registry](mio::Registry).
/// Self must implement the [Source](mio::event::Source) trait.
fn deregister(&mut self, registry: &Registry) -> io::Result<()>;
}
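As a rough sketch of what a downstream implementation could look like (not part of this commit; `CountingListener` is a hypothetical type and mio's `Source`/`Interest` APIs are assumed):

use std::io;
use mio::{event::Source, net::TcpListener, Interest, Registry, Token};

// Hypothetical wrapper around a mio listener that counts accepted connections.
#[derive(Debug)]
struct CountingListener {
    lst: TcpListener,
    accepted: usize,
}

impl Acceptable for CountingListener {
    type Connection = mio::net::TcpStream;

    fn accept(&mut self) -> io::Result<Option<Self::Connection>> {
        // WouldBlock and other transient errors are handled by the Accept loop.
        let (stream, _addr) = self.lst.accept()?;
        self.accepted += 1;
        Ok(Some(stream))
    }

    fn register(&mut self, registry: &Registry, token: Token) -> io::Result<()> {
        Source::register(&mut self.lst, registry, token, Interest::READABLE)
    }

    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
        Source::deregister(&mut self.lst, registry)
    }
}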

View File

@ -0,0 +1,117 @@
/// Array of `u128`s where every bit marks the availability of one worker handle.
pub(super) struct Availability([u128; 4]);
impl Default for Availability {
fn default() -> Self {
Self([0; 4])
}
}
impl Availability {
/// Check if any worker handle is available
#[inline(always)]
pub(super) fn available(&self) -> bool {
self.0.iter().any(|a| *a != 0)
}
/// Check if worker handle is available by index
#[inline(always)]
pub(super) fn get_available(&self, idx: usize) -> bool {
let (offset, idx) = Self::offset(idx);
self.0[offset] & (1 << idx as u128) != 0
}
/// Set worker handle available state by index.
pub(super) fn set_available(&mut self, idx: usize, avail: bool) {
let (offset, idx) = Self::offset(idx);
let off = 1 << idx as u128;
if avail {
self.0[offset] |= off;
} else {
self.0[offset] &= !off
}
}
/// Get offset and adjusted index of given worker handle index.
#[inline(always)]
fn offset(idx: usize) -> (usize, usize) {
if idx < 128 {
(0, idx)
} else if idx < 128 * 2 {
(1, idx - 128)
} else if idx < 128 * 3 {
(2, idx - 128 * 2)
} else if idx < 128 * 4 {
(3, idx - 128 * 3)
} else {
panic!("Max WorkerHandle count is 512")
}
}
}
#[cfg(test)]
mod test {
use super::Availability;
fn single(aval: &mut Availability, idx: usize) {
aval.set_available(idx, true);
assert!(aval.available());
aval.set_available(idx, true);
aval.set_available(idx, false);
assert!(!aval.available());
aval.set_available(idx, false);
assert!(!aval.available());
}
fn multi(aval: &mut Availability, mut idx: Vec<usize>) {
idx.iter().for_each(|idx| aval.set_available(*idx, true));
assert!(aval.available());
while let Some(idx) = idx.pop() {
assert!(aval.available());
aval.set_available(idx, false);
}
assert!(!aval.available());
}
#[test]
fn availability() {
let mut aval = Availability::default();
single(&mut aval, 1);
single(&mut aval, 128);
single(&mut aval, 256);
single(&mut aval, 511);
let idx = (0..511).filter(|i| i % 3 == 0 && i % 5 == 0).collect();
multi(&mut aval, idx);
multi(&mut aval, (0..511).collect())
}
#[test]
#[should_panic]
fn overflow() {
let mut aval = Availability::default();
single(&mut aval, 512);
}
#[test]
fn pin_point() {
let mut aval = Availability::default();
aval.set_available(438, true);
aval.set_available(479, true);
assert_eq!(aval.0[3], 1 << (438 - 384) | 1 << (479 - 384));
}
}
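For orientation only, the same bit layout can be sketched standalone (offset = idx / 128, bit = idx % 128, for at most 512 workers); this does not use the crate-private type:

fn offset(idx: usize) -> (usize, usize) {
    assert!(idx < 512, "max worker handle count is 512");
    (idx / 128, idx % 128)
}

fn main() {
    let mut slots = [0u128; 4];
    // Worker index 300 lands in the third u128 (offset 2) at bit 300 - 256 = 44.
    let (word, bit) = offset(300);
    assert_eq!((word, bit), (2, 44));
    slots[word] |= 1u128 << bit;               // mark worker 300 available
    assert!(slots.iter().any(|w| *w != 0));    // at least one worker is available
    slots[word] &= !(1u128 << bit);            // mark it unavailable again
    assert!(slots.iter().all(|w| *w == 0));
}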

View File

@ -1,41 +1,46 @@
use std::time::Duration;
use std::{io, thread};
#[cfg(not(_loom))]
use std::thread;
#[cfg(_loom)]
use loom::thread;
mod acceptable;
mod availability;
pub use acceptable::Acceptable;
use std::{io, time::Duration};
use actix_rt::{
time::{sleep, Instant},
System,
};
use log::{error, info};
use mio::{Interest, Poll, Token as MioToken};
use mio::{Poll, Registry, Token};
use tokio::sync::oneshot;
use availability::Availability;
use crate::server::Server;
use crate::socket::MioListener;
use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
use crate::worker::{Conn, WorkerHandleAccept};
struct ServerSocketInfo {
token: usize,
lst: MioListener,
/// Timeout is used to mark the deadline when this socket's listener should be registered again
/// after an error.
timeout: Option<Instant>,
}
/// Accept loop lives with `ServerBuilder`.
///
/// It is tasked with constructing the `Poll` instance and the `WakerQueue`, which are distributed
/// to `Accept` and `Worker`.
///
/// It also listens for `ServerCommand` and pushes interests to the `WakerQueue`.
pub(crate) struct AcceptLoop {
pub(crate) struct AcceptLoop<A: Acceptable> {
srv: Option<Server>,
poll: Option<Poll>,
waker: WakerQueue,
waker: WakerQueue<A::Connection>,
}
impl AcceptLoop {
impl<A> AcceptLoop<A>
where
A: Acceptable + Send + 'static,
{
pub fn new(srv: Server) -> Self {
let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e));
let waker = WakerQueue::new(poll.registry())
@ -48,18 +53,18 @@ impl AcceptLoop {
}
}
pub(crate) fn waker_owned(&self) -> WakerQueue {
pub(crate) fn waker_owned(&self) -> WakerQueue<A::Connection> {
self.waker.clone()
}
pub fn wake(&self, i: WakerInterest) {
pub fn wake(&self, i: WakerInterest<A::Connection>) {
self.waker.wake(i);
}
pub(crate) fn start(
&mut self,
socks: Vec<(usize, MioListener)>,
handles: Vec<WorkerHandleAccept>,
socks: Vec<(usize, A)>,
handles: Vec<WorkerHandleAccept<A::Connection>>,
) {
let srv = self.srv.take().expect("Can not re-use AcceptInfo");
let poll = self.poll.take().unwrap();
@ -70,72 +75,42 @@ impl AcceptLoop {
}
/// poll instance of the server.
struct Accept {
struct Accept<A: Acceptable> {
poll: Poll,
waker: WakerQueue,
handles: Vec<WorkerHandleAccept>,
source: Box<[Source<A>]>,
waker: WakerQueue<A::Connection>,
handles: Vec<WorkerHandleAccept<A::Connection>>,
srv: Server,
next: usize,
avail: Availability,
paused: bool,
}
/// Array of u128 with every bit as marker for a worker handle's availability.
struct Availability([u128; 4]);
struct Source<A: Acceptable> {
token: usize,
impl Default for Availability {
fn default() -> Self {
Self([0; 4])
}
acceptable: A,
/// Timeout is used to mark the deadline when this socket's listener should be registered again
/// after an error.
timeout: Option<Instant>,
}
impl Availability {
/// Check if any worker handle is available
impl<A: Acceptable> Source<A> {
#[inline(always)]
fn available(&self) -> bool {
self.0.iter().any(|a| *a != 0)
fn register(&mut self, registry: &Registry) {
let token = Token(self.token);
match self.acceptable.register(registry, token) {
Ok(_) => info!("Start accepting connections on {:?}", &self.acceptable),
Err(e) => error!("Can not register {}", e),
}
}
/// Check if worker handle is available by index
#[inline(always)]
fn get_available(&self, idx: usize) -> bool {
let (offset, idx) = Self::offset(idx);
self.0[offset] & (1 << idx as u128) != 0
}
/// Set worker handle available state by index.
fn set_available(&mut self, idx: usize, avail: bool) {
let (offset, idx) = Self::offset(idx);
let off = 1 << idx as u128;
if avail {
self.0[offset] |= off;
} else {
self.0[offset] &= !off
}
}
/// Set all worker handle to available state.
/// This would result in a re-check on all workers' availability.
fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) {
handles.iter().for_each(|handle| {
self.set_available(handle.idx(), true);
})
}
/// Get offset and adjusted index of given worker handle index.
fn offset(idx: usize) -> (usize, usize) {
if idx < 128 {
(0, idx)
} else if idx < 128 * 2 {
(1, idx - 128)
} else if idx < 128 * 3 {
(2, idx - 128 * 2)
} else if idx < 128 * 4 {
(3, idx - 128 * 3)
} else {
panic!("Max WorkerHandle count is 512")
fn deregister(&mut self, registry: &Registry) {
match self.acceptable.deregister(registry) {
Ok(_) => info!("Paused accepting connections on {:?}", &self.acceptable),
Err(e) => error!("Can not deregister {}", e),
}
}
}
@ -153,47 +128,34 @@ fn connection_error(e: &io::Error) -> bool {
|| e.kind() == io::ErrorKind::ConnectionReset
}
impl Accept {
impl<A> Accept<A>
where
A: Acceptable + Send + 'static,
{
pub(crate) fn start(
poll: Poll,
waker: WakerQueue,
socks: Vec<(usize, MioListener)>,
waker: WakerQueue<A::Connection>,
source: Vec<(usize, A)>,
srv: Server,
handles: Vec<WorkerHandleAccept>,
handles: Vec<WorkerHandleAccept<A::Connection>>,
) {
// Accept runs in its own thread and would want to spawn additional futures to current
// actix system.
let sys = System::current();
thread::Builder::new()
.name("actix-server accept loop".to_owned())
.name("actix-server acceptor".to_owned())
.spawn(move || {
System::set_current(sys);
let (mut accept, mut sockets) =
Accept::new_with_sockets(poll, waker, socks, handles, srv);
accept.poll_with(&mut sockets);
})
.unwrap();
}
fn new_with_sockets(
poll: Poll,
waker: WakerQueue,
socks: Vec<(usize, MioListener)>,
handles: Vec<WorkerHandleAccept>,
srv: Server,
) -> (Accept, Vec<ServerSocketInfo>) {
let sockets = socks
let source = source
.into_iter()
.map(|(token, mut lst)| {
// Start listening for incoming connections
poll.registry()
.register(&mut lst, MioToken(token), Interest::READABLE)
lst.register(poll.registry(), Token(token))
.unwrap_or_else(|e| panic!("Can not register io: {}", e));
ServerSocketInfo {
Source {
token,
lst,
acceptable: lst,
timeout: None,
}
})
@ -202,10 +164,13 @@ impl Accept {
let mut avail = Availability::default();
// Assume all handles are available at construct time.
avail.set_available_all(&handles);
handles.iter().for_each(|handle| {
avail.set_available(handle.idx(), true);
});
let accept = Accept {
poll,
source,
waker,
handles,
srv,
@ -214,10 +179,12 @@ impl Accept {
paused: false,
};
(accept, sockets)
accept.poll();
})
.unwrap();
}
fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) {
fn poll(mut self) {
let mut events = mio::Events::with_capacity(128);
loop {
@ -232,7 +199,7 @@ impl Accept {
let token = event.token();
match token {
WAKER_TOKEN => {
let exit = self.handle_waker(sockets);
let exit = self.handle_waker();
if exit {
info!("Accept is stopped.");
return;
@ -240,14 +207,14 @@ impl Accept {
}
_ => {
let token = usize::from(token);
self.accept(sockets, token);
self.accept(token);
}
}
}
}
}
fn handle_waker(&mut self, sockets: &mut [ServerSocketInfo]) -> bool {
fn handle_waker(&mut self) -> bool {
// This is a loop because the command interest handling in the previous version was a loop
// that tried to drain the command channel. It is still unknown whether actively draining
// the waker queue is necessary or good practice.
@ -263,7 +230,7 @@ impl Accept {
self.avail.set_available(idx, true);
if !self.paused {
self.accept_all(sockets);
self.accept_all();
}
}
// a new worker thread has been made and its handle should be added to Accept
@ -274,14 +241,14 @@ impl Accept {
self.handles.push(handle);
if !self.paused {
self.accept_all(sockets);
self.accept_all();
}
}
// got timer interest and it's time to try to register the socket(s) again
Some(WakerInterest::Timer) => {
drop(guard);
self.process_timer(sockets)
self.process_timer()
}
Some(WakerInterest::Pause) => {
drop(guard);
@ -289,7 +256,7 @@ impl Accept {
if !self.paused {
self.paused = true;
self.deregister_all(sockets);
self.deregister_all();
}
}
Some(WakerInterest::Resume) => {
@ -298,18 +265,29 @@ impl Accept {
if self.paused {
self.paused = false;
sockets.iter_mut().for_each(|info| {
self.register_logged(info);
});
self.register_all();
self.accept_all(sockets);
self.accept_all();
}
}
Some(WakerInterest::Stop) => {
Some(WakerInterest::Stop(AcceptorStop { graceful, tx })) => {
drop(guard);
if !self.paused {
self.deregister_all(sockets);
self.deregister_all();
}
// Collect the oneshot receivers that WorkerHandle::stop hands back (when it returns one).
let res = self
.handles
.iter()
.filter_map(|handle| handle.stop(graceful))
.collect::<Vec<_>>();
let _ = tx.send(res);
// TODO: Should try to drain backlog?
return true;
}
// waker queue is drained
@ -323,88 +301,11 @@ impl Accept {
}
}
fn process_timer(&self, sockets: &mut [ServerSocketInfo]) {
let now = Instant::now();
sockets
.iter_mut()
// Only sockets that had an associated timeout were deregistered.
.filter(|info| info.timeout.is_some())
.for_each(|info| {
let inst = info.timeout.take().unwrap();
if now < inst {
info.timeout = Some(inst);
} else if !self.paused {
self.register_logged(info);
}
// Drop the timeout if server is paused and socket timeout is expired.
// When server recovers from pause it will register all sockets without
// a timeout value so this socket register will be delayed till then.
});
}
#[cfg(not(target_os = "windows"))]
fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
let token = MioToken(info.token);
self.poll
.registry()
.register(&mut info.lst, token, Interest::READABLE)
}
#[cfg(target_os = "windows")]
fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
// On windows, calling register without deregister cause an error.
// See https://github.com/actix/actix-web/issues/905
// Calling reregister seems to fix the issue.
let token = MioToken(info.token);
self.poll
.registry()
.register(&mut info.lst, token, Interest::READABLE)
.or_else(|_| {
self.poll
.registry()
.reregister(&mut info.lst, token, Interest::READABLE)
})
}
fn register_logged(&self, info: &mut ServerSocketInfo) {
match self.register(info) {
Ok(_) => info!("Resume accepting connections on {}", info.lst.local_addr()),
Err(e) => error!("Can not register server socket {}", e),
}
}
fn deregister_logged(&self, info: &mut ServerSocketInfo) {
match self.poll.registry().deregister(&mut info.lst) {
Ok(_) => info!("Paused accepting connections on {}", info.lst.local_addr()),
Err(e) => {
error!("Can not deregister server socket {}", e)
}
}
}
fn deregister_all(&self, sockets: &mut [ServerSocketInfo]) {
// This is a best effort implementation with following limitation:
//
// Every ServerSocketInfo with associate timeout will be skipped and it's timeout
// is removed in the process.
//
// Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short
// gap (less than 500ms) would cause all timing out ServerSocketInfos be reregistered
// before expected timing.
sockets
.iter_mut()
// Take all timeout.
// This is to prevent Accept::process_timer method re-register a socket afterwards.
.map(|info| (info.timeout.take(), info))
// Socket info with a timeout is already deregistered so skip them.
.filter(|(timeout, _)| timeout.is_none())
.for_each(|(_, info)| self.deregister_logged(info));
}
// Send connection to worker and handle error.
fn send_connection(&mut self, conn: Conn) -> Result<(), Conn> {
fn send_connection(
&mut self,
conn: Conn<A::Connection>,
) -> Result<(), Conn<A::Connection>> {
let next = self.next();
match next.send(conn) {
Ok(_) => {
@ -436,7 +337,7 @@ impl Accept {
}
}
fn accept_one(&mut self, mut conn: Conn) {
fn accept_one(&mut self, mut conn: Conn<A::Connection>) {
loop {
let next = self.next();
let idx = next.idx();
@ -447,7 +348,6 @@ impl Accept {
Err(c) => conn = c,
}
} else {
self.avail.set_available(idx, false);
self.set_next();
if !self.avail.available() {
@ -460,27 +360,28 @@ impl Accept {
}
}
fn accept(&mut self, sockets: &mut [ServerSocketInfo], token: usize) {
fn accept(&mut self, token: usize) {
while self.avail.available() {
let info = &mut sockets[token];
let source = &mut self.source[token];
match info.lst.accept() {
Ok(io) => {
let conn = Conn { io, token };
match source.acceptable.accept() {
Ok(Some(io)) => {
let conn = Conn { token, io };
self.accept_one(conn);
}
Ok(None) => continue,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
Err(ref e) if connection_error(e) => continue,
Err(e) => {
error!("Error accepting connection: {}", e);
// deregister the listener temporarily
self.deregister_logged(info);
source.deregister(self.poll.registry());
// sleep after the error and record the timeout on the source, so the poll
// loop later knows which socket should be registered again and when
info.timeout = Some(Instant::now() + Duration::from_millis(500));
source.timeout = Some(Instant::now() + Duration::from_millis(500));
// after the sleep a Timer interest is sent to Accept Poll
let waker = self.waker.clone();
@ -495,17 +396,69 @@ impl Accept {
}
}
fn accept_all(&mut self, sockets: &mut [ServerSocketInfo]) {
sockets
fn accept_all(&mut self) {
self.source
.iter_mut()
.map(|info| info.token)
.collect::<Vec<_>>()
.into_iter()
.for_each(|idx| self.accept(sockets, idx))
.for_each(|idx| self.accept(idx))
}
fn process_timer(&mut self) {
let now = Instant::now();
let registry = self.poll.registry();
let paused = self.paused;
self.source
.iter_mut()
// Only sockets that had an associated timeout were deregistered.
.filter(|info| info.timeout.is_some())
.for_each(|info| {
let inst = info.timeout.take().unwrap();
if now < inst {
info.timeout = Some(inst);
} else if !paused {
info.register(registry);
}
// Drop the timeout if the server is paused and the socket's timeout has expired.
// When the server recovers from pause it will register all sockets without
// a timeout value, so this socket's registration is delayed until then.
});
}
fn register_all(&mut self) {
let registry = self.poll.registry();
self.source.iter_mut().for_each(|info| {
info.register(registry);
});
}
fn deregister_all(&mut self) {
let registry = self.poll.registry();
// This is a best-effort implementation with the following limitation:
//
// Every Source with an associated timeout is skipped and its timeout
// is removed in the process.
//
// Therefore a WakerInterest::Pause followed by a WakerInterest::Resume within a very short
// gap (less than 500ms) would cause all timed-out Sources to be re-registered
// before the expected time.
self.source
.iter_mut()
// Take all timeouts.
// This prevents Accept::process_timer from re-registering a socket afterwards.
.map(|info| (info.timeout.take(), info))
// A source with a timeout is already deregistered, so skip it.
.filter(|(timeout, _)| timeout.is_none())
.for_each(|(_, info)| info.deregister(registry));
}
#[inline(always)]
fn next(&self) -> &WorkerHandleAccept {
fn next(&self) -> &WorkerHandleAccept<A::Connection> {
&self.handles[self.next]
}
@ -526,67 +479,19 @@ impl Accept {
}
}
#[cfg(test)]
mod test {
use super::Availability;
pub(crate) struct AcceptorStop {
graceful: bool,
tx: oneshot::Sender<Vec<oneshot::Receiver<bool>>>,
}
fn single(aval: &mut Availability, idx: usize) {
aval.set_available(idx, true);
assert!(aval.available());
impl AcceptorStop {
pub(crate) fn new(
graceful: bool,
) -> (Self, oneshot::Receiver<Vec<oneshot::Receiver<bool>>>) {
let (tx, rx) = oneshot::channel();
aval.set_available(idx, true);
let this = Self { graceful, tx };
aval.set_available(idx, false);
assert!(!aval.available());
aval.set_available(idx, false);
assert!(!aval.available());
}
fn multi(aval: &mut Availability, mut idx: Vec<usize>) {
idx.iter().for_each(|idx| aval.set_available(*idx, true));
assert!(aval.available());
while let Some(idx) = idx.pop() {
assert!(aval.available());
aval.set_available(idx, false);
}
assert!(!aval.available());
}
#[test]
fn availability() {
let mut aval = Availability::default();
single(&mut aval, 1);
single(&mut aval, 128);
single(&mut aval, 256);
single(&mut aval, 511);
let idx = (0..511).filter(|i| i % 3 == 0 && i % 5 == 0).collect();
multi(&mut aval, idx);
multi(&mut aval, (0..511).collect())
}
#[test]
#[should_panic]
fn overflow() {
let mut aval = Availability::default();
single(&mut aval, 512);
}
#[test]
fn pin_point() {
let mut aval = Availability::default();
aval.set_available(438, true);
aval.set_available(479, true);
assert_eq!(aval.0[3], 1 << (438 - 384) | 1 << (479 - 384));
(this, rx)
}
}
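For readers unfamiliar with the stop handshake above, a minimal standalone sketch of the same pattern follows: the builder receives one oneshot carrying the per-worker shutdown receivers that the accept thread collected, then awaits each of them. All names and the tokio harness here are illustrative, not the crate's API:

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    // Outer channel carrying the per-worker shutdown receivers back to the caller.
    let (tx, rx) = oneshot::channel::<Vec<oneshot::Receiver<bool>>>();

    // Pretend the accept thread asked two workers to stop and collected their receivers.
    let receivers: Vec<_> = (0..2)
        .map(|_| {
            let (worker_tx, worker_rx) = oneshot::channel();
            // Each "worker" reports whether it shut down gracefully.
            tokio::spawn(async move {
                let _ = worker_tx.send(true);
            });
            worker_rx
        })
        .collect();
    let _ = tx.send(receivers);

    // The builder side awaits the outer channel, then each worker's answer.
    for worker_rx in rx.await.unwrap_or_default() {
        let _ = worker_rx.await;
    }
}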

View File

@ -6,32 +6,32 @@ use std::{
time::Duration,
};
use actix_rt::{self as rt, net::TcpStream, time::sleep, System};
use actix_rt::{net::TcpStream, time::sleep, System};
use log::{error, info};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver},
oneshot,
};
use crate::accept::AcceptLoop;
use crate::join_all;
use crate::accept::{AcceptLoop, Acceptable, AcceptorStop};
use crate::server::{Server, ServerCommand};
use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
use crate::signals::{Signal, Signals};
use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
use crate::socket::{MioTcpListener, MioTcpSocket};
use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer};
use crate::socket::{
FromConnection, MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener,
ToSocketAddrs,
};
use crate::waker_queue::WakerInterest;
use crate::worker::{ServerWorkerConfig, Worker, WorkerHandleAccept};
/// Server builder
pub struct ServerBuilder {
pub struct ServerBuilder<A: Acceptable = MioListener> {
threads: usize,
token: usize,
backlog: u32,
handles: Vec<(usize, WorkerHandleServer)>,
services: Vec<Box<dyn InternalServiceFactory>>,
sockets: Vec<(usize, String, MioListener)>,
accept: AcceptLoop,
services: Vec<Box<dyn InternalServiceFactory<A::Connection>>>,
sockets: Vec<(usize, String, A)>,
accept: AcceptLoop<A>,
exit: bool,
no_signals: bool,
cmd: UnboundedReceiver<ServerCommand>,
@ -46,16 +46,18 @@ impl Default for ServerBuilder {
}
}
impl ServerBuilder {
impl<A> ServerBuilder<A>
where
A: Acceptable + Send + Unpin + 'static,
{
/// Create new Server builder instance
pub fn new() -> ServerBuilder {
pub fn new() -> Self {
let (tx, rx) = unbounded_channel();
let server = Server::new(tx);
ServerBuilder {
Self {
threads: num_cpus::get(),
token: 0,
handles: Vec::new(),
services: Vec::new(),
sockets: Vec::new(),
accept: AcceptLoop::new(server.clone()),
@ -86,7 +88,7 @@ impl ServerBuilder {
/// # Examples:
/// ```
/// # use actix_server::ServerBuilder;
/// let builder = ServerBuilder::new()
/// let builder = ServerBuilder::default()
/// .workers(4) // server has 4 worker thread.
/// .worker_max_blocking_threads(4); // every worker has 4 max blocking threads.
/// ```
@ -147,167 +149,20 @@ impl ServerBuilder {
self
}
/// Add new service to the server.
pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
where
F: ServiceFactory<TcpStream>,
U: ToSocketAddrs,
{
let sockets = bind_addr(addr, self.backlog)?;
for lst in sockets {
let token = self.next_token();
self.services.push(StreamNewService::create(
name.as_ref().to_string(),
token,
factory.clone(),
lst.local_addr()?,
));
self.sockets
.push((token, name.as_ref().to_string(), MioListener::Tcp(lst)));
}
Ok(self)
fn next_token(&mut self) -> usize {
let token = self.token;
self.token += 1;
token
}
/// Add new unix domain service to the server.
#[cfg(unix)]
pub fn bind_uds<F, U, N>(self, name: N, addr: U, factory: F) -> io::Result<Self>
where
F: ServiceFactory<actix_rt::net::UnixStream>,
N: AsRef<str>,
U: AsRef<std::path::Path>,
{
// The path must not exist when we try to bind.
// Try to remove it to avoid bind error.
if let Err(e) = std::fs::remove_file(addr.as_ref()) {
// NotFound is expected and not an issue. Anything else is.
if e.kind() != std::io::ErrorKind::NotFound {
return Err(e);
}
}
let lst = crate::socket::StdUnixListener::bind(addr)?;
self.listen_uds(name, lst, factory)
}
/// Add new unix domain service to the server.
/// Useful when running as a systemd service and
/// a socket FD can be acquired using the systemd crate.
#[cfg(unix)]
pub fn listen_uds<F, N: AsRef<str>>(
mut self,
name: N,
lst: crate::socket::StdUnixListener,
factory: F,
) -> io::Result<Self>
where
F: ServiceFactory<actix_rt::net::UnixStream>,
{
use std::net::{IpAddr, Ipv4Addr};
lst.set_nonblocking(true)?;
let token = self.next_token();
let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
self.services.push(StreamNewService::create(
name.as_ref().to_string(),
token,
factory,
addr,
));
self.sockets
.push((token, name.as_ref().to_string(), MioListener::from(lst)));
Ok(self)
}
/// Add new service to the server.
pub fn listen<F, N: AsRef<str>>(
mut self,
name: N,
lst: StdTcpListener,
factory: F,
) -> io::Result<Self>
where
F: ServiceFactory<TcpStream>,
{
lst.set_nonblocking(true)?;
let addr = lst.local_addr()?;
let token = self.next_token();
self.services.push(StreamNewService::create(
name.as_ref().to_string(),
token,
factory,
addr,
));
self.sockets
.push((token, name.as_ref().to_string(), MioListener::from(lst)));
Ok(self)
}
/// Starts processing incoming connections and return server controller.
pub fn run(mut self) -> Server {
if self.sockets.is_empty() {
panic!("Server should have at least one bound socket");
} else {
info!("Starting {} workers", self.threads);
// start workers
let handles = (0..self.threads)
.map(|idx| {
let (handle_accept, handle_server) =
self.start_worker(idx, self.accept.waker_owned());
self.handles.push((idx, handle_server));
handle_accept
})
.collect();
// start accept thread
for sock in &self.sockets {
info!("Starting \"{}\" service on {}", sock.1, sock.2);
}
self.accept.start(
mem::take(&mut self.sockets)
.into_iter()
.map(|t| (t.0, t.2))
.collect(),
handles,
);
// handle signals
if !self.no_signals {
Signals::start(self.server.clone());
}
// start http server actor
let server = self.server.clone();
rt::spawn(self);
server
}
}
fn start_worker(
&self,
idx: usize,
waker_queue: WakerQueue,
) -> (WorkerHandleAccept, WorkerHandleServer) {
fn start_worker(&self, idx: usize) -> WorkerHandleAccept<A::Connection> {
let services = self.services.iter().map(|v| v.clone_factory()).collect();
ServerWorker::start(idx, services, waker_queue, self.worker_config)
let config = self.worker_config;
let waker_queue = self.accept.waker_owned();
Worker::start(idx, services, waker_queue, config)
}
fn handle_cmd(&mut self, item: ServerCommand) {
match item {
ServerCommand::Pause(tx) => {
self.accept.wake(WakerInterest::Pause);
let _ = tx.send(());
}
ServerCommand::Resume(tx) => {
self.accept.wake(WakerInterest::Resume);
let _ = tx.send(());
}
ServerCommand::Signal(sig) => {
fn handle_signal(&mut self, sig: Signal) {
// Signals support
// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system
match sig {
@ -338,6 +193,18 @@ impl ServerBuilder {
_ => (),
}
}
fn handle_cmd(&mut self, item: ServerCommand) {
match item {
ServerCommand::Pause(tx) => {
self.accept.wake(WakerInterest::Pause);
let _ = tx.send(());
}
ServerCommand::Resume(tx) => {
self.accept.wake(WakerInterest::Resume);
let _ = tx.send(());
}
ServerCommand::Signal(sig) => self.handle_signal(sig),
ServerCommand::Notify(tx) => {
self.notify.push(tx);
}
@ -348,19 +215,14 @@ impl ServerBuilder {
let exit = self.exit;
// stop accept thread
self.accept.wake(WakerInterest::Stop);
let (stop, rx) = AcceptorStop::new(graceful);
self.accept.wake(WakerInterest::Stop(stop));
let notify = std::mem::take(&mut self.notify);
// stop workers
let stop = self
.handles
.iter()
.map(move |worker| worker.1.stop(graceful))
.collect();
rt::spawn(async move {
if graceful {
let _ = join_all(stop).await;
actix_rt::spawn(async move {
for rx in rx.await.unwrap_or_else(|_| Vec::new()) {
let _ = rx.await;
}
if let Some(tx) = completion {
@ -377,52 +239,171 @@ impl ServerBuilder {
});
}
ServerCommand::WorkerFaulted(idx) => {
let mut found = false;
for i in 0..self.handles.len() {
if self.handles[i].0 == idx {
self.handles.swap_remove(i);
found = true;
break;
}
}
if found {
error!("Worker has died {:?}, restarting", idx);
let mut new_idx = self.handles.len();
'found: loop {
for i in 0..self.handles.len() {
if self.handles[i].0 == new_idx {
new_idx += 1;
continue 'found;
}
}
break;
}
let (handle_accept, handle_server) =
self.start_worker(new_idx, self.accept.waker_owned());
self.handles.push((new_idx, handle_server));
self.accept.wake(WakerInterest::Worker(handle_accept));
}
let handle = self.start_worker(idx);
self.accept.wake(WakerInterest::Worker(handle));
}
}
}
fn next_token(&mut self) -> usize {
let token = self.token;
self.token += 1;
token
/// Starts processing incoming connections and returns the server controller.
pub fn run(mut self) -> Server {
if self.sockets.is_empty() {
panic!("Server should have at least one bound socket");
} else {
info!("Starting {} workers", self.threads);
// start workers
let handles = (0..self.threads)
.map(|idx| self.start_worker(idx))
.collect();
// start accept thread
for sock in &self.sockets {
info!("Starting \"{}\" service on {:?}", sock.1, sock.2);
}
self.accept.start(
mem::take(&mut self.sockets)
.into_iter()
.map(|t| (t.0, t.2))
.collect(),
handles,
);
// handle signals
if !self.no_signals {
Signals::start(self.server.clone());
}
// start http server actor
let server = self.server.clone();
actix_rt::spawn(self);
server
}
}
#[doc(hidden)]
pub fn bind_acceptable<F, Io>(
mut self,
name: &str,
addr: StdSocketAddr,
lst: A,
factory: F,
) -> Self
where
F: ServiceFactory<Io, A::Connection>,
Io: FromConnection<A::Connection> + Send + 'static,
{
let token = self.next_token();
self.services.push(StreamNewService::create(
name.to_string(),
token,
factory,
addr,
));
self.sockets.push((token, name.to_string(), lst));
self
}
}
impl Future for ServerBuilder {
impl ServerBuilder {
/// Add new service to the server.
pub fn bind<F, U, N>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
where
F: ServiceFactory<TcpStream>,
N: AsRef<str>,
U: ToSocketAddrs,
{
let sockets = bind_addr(addr, self.backlog)?;
for lst in sockets {
let addr = lst.local_addr()?;
let lst = MioListener::Tcp(lst);
self = self.bind_acceptable(name.as_ref(), addr, lst, factory.clone());
}
Ok(self)
}
/// Add new service to the server.
pub fn listen<F, N: AsRef<str>>(
self,
name: N,
lst: StdTcpListener,
factory: F,
) -> io::Result<Self>
where
F: ServiceFactory<TcpStream>,
{
lst.set_nonblocking(true)?;
let addr = lst.local_addr()?;
let lst = MioListener::from(lst);
Ok(self.bind_acceptable(name.as_ref(), addr, lst, factory))
}
}
#[cfg(unix)]
impl ServerBuilder {
/// Add new unix domain service to the server.
pub fn bind_uds<F, U, N>(self, name: N, addr: U, factory: F) -> io::Result<Self>
where
F: ServiceFactory<actix_rt::net::UnixStream>,
N: AsRef<str>,
U: AsRef<std::path::Path>,
{
// The path must not exist when we try to bind.
// Try to remove it to avoid bind error.
if let Err(e) = std::fs::remove_file(addr.as_ref()) {
// NotFound is expected and not an issue. Anything else is.
if e.kind() != io::ErrorKind::NotFound {
return Err(e);
}
}
let lst = crate::socket::StdUnixListener::bind(addr)?;
self.listen_uds(name, lst, factory)
}
/// Add new unix domain service to the server.
/// Useful when running as a systemd service and
/// a socket FD can be acquired using the systemd crate.
pub fn listen_uds<F, N>(
self,
name: N,
lst: crate::socket::StdUnixListener,
factory: F,
) -> io::Result<Self>
where
F: ServiceFactory<actix_rt::net::UnixStream>,
N: AsRef<str>,
{
lst.set_nonblocking(true)?;
let addr = "127.0.0.1:8080".parse().unwrap();
let lst = MioListener::from(lst);
Ok(self.bind_acceptable(name.as_ref(), addr, lst, factory))
}
}
impl<A> Future for ServerBuilder<A>
where
A: Acceptable + Send + Unpin + 'static,
{
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
loop {
match Pin::new(&mut self.cmd).poll_recv(cx) {
Poll::Ready(Some(it)) => self.as_mut().get_mut().handle_cmd(it),
match Pin::new(&mut this.cmd).poll_recv(cx) {
Poll::Ready(Some(it)) => this.handle_cmd(it),
_ => return Poll::Pending,
}
}

View File

@ -14,13 +14,14 @@ mod test_server;
mod waker_queue;
mod worker;
pub use self::accept::Acceptable;
pub use self::builder::ServerBuilder;
pub use self::server::Server;
pub use self::service::ServiceFactory;
pub use self::test_server::TestServer;
#[doc(hidden)]
pub use self::socket::FromStream;
pub use self::socket::FromConnection;
use std::future::Future;
use std::pin::Pin;

View File

@ -7,38 +7,42 @@ use actix_utils::future::{ready, Ready};
use futures_core::future::LocalBoxFuture;
use log::error;
use crate::socket::{FromStream, MioStream};
use crate::socket::{FromConnection, MioStream};
use crate::worker::WorkerCounterGuard;
pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
type Factory: BaseServiceFactory<Stream, Config = ()>;
pub trait ServiceFactory<Io, C = MioStream>
where
Io: FromConnection<C>,
Self: Send + Clone + 'static,
{
type Factory: BaseServiceFactory<Io, Config = ()>;
fn create(&self) -> Self::Factory;
}
pub(crate) trait InternalServiceFactory: Send {
pub(crate) trait InternalServiceFactory<C>: Send {
fn name(&self, token: usize) -> &str;
fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;
fn clone_factory(&self) -> Box<dyn InternalServiceFactory<C>>;
fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>;
fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService<C>), ()>>;
}
pub(crate) type BoxedServerService = Box<
pub(crate) type BoxedServerService<C> = Box<
dyn Service<
(WorkerCounterGuard, MioStream),
(WorkerCounterGuard<C>, C),
Response = (),
Error = (),
Future = Ready<Result<(), ()>>,
>,
>;
pub(crate) struct StreamService<S, I> {
pub(crate) struct StreamService<S, Io> {
service: S,
_phantom: PhantomData<I>,
_phantom: PhantomData<Io>,
}
impl<S, I> StreamService<S, I> {
impl<S, Io> StreamService<S, Io> {
pub(crate) fn new(service: S) -> Self {
StreamService {
service,
@ -47,26 +51,28 @@ impl<S, I> StreamService<S, I> {
}
}
impl<S, I> Service<(WorkerCounterGuard, MioStream)> for StreamService<S, I>
impl<S, Io, C> Service<(WorkerCounterGuard<C>, C)> for StreamService<S, Io>
where
S: Service<I>,
S: Service<Io>,
S::Future: 'static,
S::Error: 'static,
I: FromStream,
Io: FromConnection<C>,
C: 'static,
{
type Response = ();
type Error = ();
type Future = Ready<Result<(), ()>>;
#[inline]
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(ctx).map_err(|_| ())
}
fn call(&self, (guard, req): (WorkerCounterGuard, MioStream)) -> Self::Future {
ready(match FromStream::from_mio(req) {
fn call(&self, (guard, req): (WorkerCounterGuard<C>, C)) -> Self::Future {
ready(match FromConnection::from_conn(req) {
Ok(stream) => {
let f = self.service.call(stream);
actix_rt::spawn(async move {
actix_rt::spawn(async {
let _ = f.await;
drop(guard);
});
@ -80,25 +86,30 @@ where
}
}
pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> {
pub(crate) struct StreamNewService<F, Io, C>
where
F: ServiceFactory<Io, C>,
Io: FromConnection<C> + Send,
{
name: String,
inner: F,
token: usize,
addr: SocketAddr,
_t: PhantomData<Io>,
_t: PhantomData<(Io, C)>,
}
impl<F, Io> StreamNewService<F, Io>
impl<F, Io, C> StreamNewService<F, Io, C>
where
F: ServiceFactory<Io>,
Io: FromStream + Send + 'static,
F: ServiceFactory<Io, C>,
Io: FromConnection<C> + Send + 'static,
C: Send + 'static,
{
pub(crate) fn create(
name: String,
token: usize,
inner: F,
addr: SocketAddr,
) -> Box<dyn InternalServiceFactory> {
) -> Box<dyn InternalServiceFactory<C>> {
Box::new(Self {
name,
token,
@ -109,16 +120,17 @@ where
}
}
impl<F, Io> InternalServiceFactory for StreamNewService<F, Io>
impl<F, Io, C> InternalServiceFactory<C> for StreamNewService<F, Io, C>
where
F: ServiceFactory<Io>,
Io: FromStream + Send + 'static,
F: ServiceFactory<Io, C>,
Io: FromConnection<C> + Send + 'static,
C: Send + 'static,
{
fn name(&self, _: usize) -> &str {
&self.name
}
fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
fn clone_factory(&self) -> Box<dyn InternalServiceFactory<C>> {
Box::new(Self {
name: self.name.clone(),
inner: self.inner.clone(),
@ -128,7 +140,7 @@ where
})
}
fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>> {
fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService<C>), ()>> {
let token = self.token;
let fut = self.inner.create().new_service(());
Box::pin(async move {
@ -143,11 +155,11 @@ where
}
}
impl<F, T, I> ServiceFactory<I> for F
impl<F, T, Io, C> ServiceFactory<Io, C> for F
where
F: Fn() -> T + Send + Clone + 'static,
T: BaseServiceFactory<I, Config = ()>,
I: FromStream,
T: BaseServiceFactory<Io, Config = ()>,
Io: FromConnection<C>,
{
type Factory = T;

View File

@ -12,71 +12,73 @@ pub(crate) use {
use std::{fmt, io};
use actix_rt::net::TcpStream;
use mio::net::TcpStream as MioTcpStream;
use mio::{event::Source, Interest, Registry, Token};
pub(crate) enum MioListener {
use crate::accept::Acceptable;
/// Implement the Acceptable trait for [mio::net::TcpListener] so it can be managed by the server and its [mio::Poll] instance.
impl Acceptable for MioTcpListener {
type Connection = MioTcpStream;
fn accept(&mut self) -> io::Result<Option<Self::Connection>> {
Self::accept(self).map(|stream| Some(stream.0))
}
fn register(&mut self, registry: &mio::Registry, token: mio::Token) -> io::Result<()> {
Source::register(self, registry, token, Interest::READABLE)
}
fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> {
Source::deregister(self, registry)
}
}
pub enum MioListener {
Tcp(MioTcpListener),
#[cfg(unix)]
Uds(MioUnixListener),
}
impl MioListener {
pub(crate) fn local_addr(&self) -> SocketAddr {
match *self {
MioListener::Tcp(ref lst) => lst
.local_addr()
.map(SocketAddr::Tcp)
.unwrap_or(SocketAddr::Unknown),
#[cfg(unix)]
MioListener::Uds(ref lst) => lst
.local_addr()
.map(SocketAddr::Uds)
.unwrap_or(SocketAddr::Unknown),
}
}
pub(crate) fn accept(&self) -> io::Result<MioStream> {
match *self {
MioListener::Tcp(ref lst) => lst.accept().map(|(stream, _)| MioStream::Tcp(stream)),
#[cfg(unix)]
MioListener::Uds(ref lst) => lst.accept().map(|(stream, _)| MioStream::Uds(stream)),
}
impl From<StdTcpListener> for MioListener {
fn from(lst: StdTcpListener) -> Self {
MioListener::Tcp(MioTcpListener::from_std(lst))
}
}
impl Source for MioListener {
fn register(
&mut self,
registry: &Registry,
token: Token,
interests: Interest,
) -> io::Result<()> {
impl Acceptable for MioListener {
type Connection = MioStream;
fn accept(&mut self) -> io::Result<Option<Self::Connection>> {
match *self {
MioListener::Tcp(ref mut lst) => lst.register(registry, token, interests),
MioListener::Tcp(ref mut lst) => {
MioTcpListener::accept(lst).map(|stream| Some(MioStream::Tcp(stream.0)))
}
#[cfg(unix)]
MioListener::Uds(ref mut lst) => lst.register(registry, token, interests),
MioListener::Uds(ref mut lst) => {
MioUnixListener::accept(lst).map(|stream| Some(MioStream::Uds(stream.0)))
}
}
}
fn reregister(
&mut self,
registry: &Registry,
token: Token,
interests: Interest,
) -> io::Result<()> {
fn register(&mut self, registry: &Registry, token: Token) -> io::Result<()> {
match *self {
MioListener::Tcp(ref mut lst) => lst.reregister(registry, token, interests),
MioListener::Tcp(ref mut lst) => {
Source::register(lst, registry, token, Interest::READABLE)
}
#[cfg(unix)]
MioListener::Uds(ref mut lst) => lst.reregister(registry, token, interests),
MioListener::Uds(ref mut lst) => {
Source::register(lst, registry, token, Interest::READABLE)
}
}
}
fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
match *self {
MioListener::Tcp(ref mut lst) => lst.deregister(registry),
MioListener::Tcp(ref mut lst) => Source::deregister(lst, registry),
#[cfg(unix)]
MioListener::Uds(ref mut lst) => {
let res = lst.deregister(registry);
let res = Source::deregister(lst, registry);
// cleanup file path
if let Ok(addr) = lst.local_addr() {
@ -90,19 +92,6 @@ impl Source for MioListener {
}
}
impl From<StdTcpListener> for MioListener {
fn from(lst: StdTcpListener) -> Self {
MioListener::Tcp(MioTcpListener::from_std(lst))
}
}
#[cfg(unix)]
impl From<StdUnixListener> for MioListener {
fn from(lst: StdUnixListener) -> Self {
MioListener::Uds(MioUnixListener::from_std(lst))
}
}
impl fmt::Debug for MioListener {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
@ -113,55 +102,16 @@ impl fmt::Debug for MioListener {
}
}
impl fmt::Display for MioListener {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
MioListener::Tcp(ref lst) => write!(f, "{:?}", lst),
#[cfg(unix)]
MioListener::Uds(ref lst) => write!(f, "{:?}", lst),
}
}
}
pub(crate) enum SocketAddr {
Unknown,
Tcp(StdSocketAddr),
#[cfg(unix)]
Uds(mio::net::SocketAddr),
}
impl fmt::Display for SocketAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
Self::Unknown => write!(f, "Unknown SocketAddr"),
Self::Tcp(ref addr) => write!(f, "{}", addr),
#[cfg(unix)]
Self::Uds(ref addr) => write!(f, "{:?}", addr),
}
}
}
impl fmt::Debug for SocketAddr {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
Self::Unknown => write!(f, "Unknown SocketAddr"),
Self::Tcp(ref addr) => write!(f, "{:?}", addr),
#[cfg(unix)]
Self::Uds(ref addr) => write!(f, "{:?}", addr),
}
}
}
#[derive(Debug)]
pub enum MioStream {
Tcp(mio::net::TcpStream),
Tcp(MioTcpStream),
#[cfg(unix)]
Uds(mio::net::UnixStream),
}
/// Helper trait for converting an accepted connection (e.g. a mio stream) into the IO type (e.g. a tokio stream) handed to services.
pub trait FromStream: Sized {
fn from_mio(sock: MioStream) -> io::Result<Self>;
pub trait FromConnection<C>: Sized {
fn from_conn(conn: C) -> io::Result<Self>;
}
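As a hypothetical illustration (not part of this commit), a custom connection type produced by an `Acceptable` implementation can be adapted to the IO type a service expects by implementing this trait; `MyConn` and `MyIo` are made-up names and the std stream is assumed to be non-blocking already:

// Hypothetical connection type yielded by a custom `Acceptable::accept`.
#[derive(Debug)]
struct MyConn(std::net::TcpStream);

// Hypothetical IO type handed to the service.
struct MyIo(tokio::net::TcpStream);

impl FromConnection<MyConn> for MyIo {
    fn from_conn(conn: MyConn) -> io::Result<Self> {
        // Requires a tokio reactor on the current thread, which the worker runtime provides.
        tokio::net::TcpStream::from_std(conn.0).map(MyIo)
    }
}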
#[cfg(windows)]
@ -171,15 +121,19 @@ mod win_impl {
use std::os::windows::io::{FromRawSocket, IntoRawSocket};
// FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream
impl FromStream for TcpStream {
fn from_mio(sock: MioStream) -> io::Result<Self> {
match sock {
MioStream::Tcp(mio) => {
impl FromConnection<MioTcpStream> for TcpStream {
fn from_conn(conn: MioTcpStream) -> io::Result<Self> {
let raw = IntoRawSocket::into_raw_socket(mio);
// SAFETY: This is an in-place conversion from a mio stream to a tokio stream.
TcpStream::from_std(unsafe { FromRawSocket::from_raw_socket(raw) })
}
}
impl FromConnection<MioStream> for TcpStream {
fn from_conn(stream: MioStream) -> io::Result<Self> {
match stream {
MioStream::Tcp(tcp) => FromConnection::from_conn(tcp),
}
}
}
}
@ -191,71 +145,64 @@ mod unix_impl {
use std::os::unix::io::{FromRawFd, IntoRawFd};
use actix_rt::net::UnixStream;
use mio::net::UnixStream as MioUnixStream;
impl From<StdUnixListener> for MioListener {
fn from(lst: StdUnixListener) -> Self {
MioListener::Uds(MioUnixListener::from_std(lst))
}
}
/// Implement the Acceptable trait for [mio::net::UnixListener] so it can be managed by the server and its [mio::Poll] instance.
impl Acceptable for MioUnixListener {
type Connection = MioUnixStream;
fn accept(&mut self) -> io::Result<Option<Self::Connection>> {
Self::accept(self).map(|stream| Some(stream.0))
}
fn register(&mut self, registry: &mio::Registry, token: mio::Token) -> io::Result<()> {
Source::register(self, registry, token, Interest::READABLE)
}
fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> {
Source::deregister(self, registry)
}
}
// FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream
impl FromStream for TcpStream {
fn from_mio(sock: MioStream) -> io::Result<Self> {
match sock {
MioStream::Tcp(mio) => {
let raw = IntoRawFd::into_raw_fd(mio);
impl FromConnection<MioTcpStream> for TcpStream {
fn from_conn(conn: MioTcpStream) -> io::Result<Self> {
let raw = IntoRawFd::into_raw_fd(conn);
// SAFETY: This is an in-place conversion from a mio stream to a tokio stream.
TcpStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) })
}
MioStream::Uds(_) => {
panic!("Should not happen, bug in server impl");
}
}
}
}
// FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream
impl FromStream for UnixStream {
fn from_mio(sock: MioStream) -> io::Result<Self> {
match sock {
MioStream::Tcp(_) => panic!("Should not happen, bug in server impl"),
MioStream::Uds(mio) => {
let raw = IntoRawFd::into_raw_fd(mio);
impl FromConnection<MioUnixStream> for UnixStream {
fn from_conn(conn: MioUnixStream) -> io::Result<Self> {
let raw = IntoRawFd::into_raw_fd(conn);
// SAFETY: This is an in-place conversion from a mio stream to a tokio stream.
UnixStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) })
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn socket_addr() {
let addr = SocketAddr::Tcp("127.0.0.1:8080".parse().unwrap());
assert!(format!("{:?}", addr).contains("127.0.0.1:8080"));
assert_eq!(format!("{}", addr), "127.0.0.1:8080");
let addr: StdSocketAddr = "127.0.0.1:0".parse().unwrap();
let socket = MioTcpSocket::new_v4().unwrap();
socket.set_reuseaddr(true).unwrap();
socket.bind(addr).unwrap();
let tcp = socket.listen(128).unwrap();
let lst = MioListener::Tcp(tcp);
assert!(format!("{:?}", lst).contains("TcpListener"));
assert!(format!("{}", lst).contains("127.0.0.1"));
}
#[test]
#[cfg(unix)]
fn uds() {
let _ = std::fs::remove_file("/tmp/sock.xxxxx");
if let Ok(socket) = MioUnixListener::bind("/tmp/sock.xxxxx") {
let addr = socket.local_addr().expect("Couldn't get local address");
let a = SocketAddr::Uds(addr);
assert!(format!("{:?}", a).contains("/tmp/sock.xxxxx"));
assert!(format!("{}", a).contains("/tmp/sock.xxxxx"));
let lst = MioListener::Uds(socket);
assert!(format!("{:?}", lst).contains("/tmp/sock.xxxxx"));
assert!(format!("{}", lst).contains("/tmp/sock.xxxxx"));
impl FromConnection<MioStream> for TcpStream {
fn from_conn(stream: MioStream) -> io::Result<Self> {
match stream {
MioStream::Tcp(tcp) => FromConnection::from_conn(tcp),
MioStream::Uds(_) => unreachable!("UnixStream can not convert to TcpStream"),
}
}
}
impl FromConnection<MioStream> for UnixStream {
fn from_conn(stream: MioStream) -> io::Result<Self> {
match stream {
MioStream::Tcp(_) => unreachable!("TcpStream can not convert to UnixStream"),
MioStream::Uds(uds) => FromConnection::from_conn(uds),
}
}
}
}

View File

@ -6,6 +6,7 @@ use std::{
use mio::{Registry, Token as MioToken, Waker};
use crate::accept::AcceptorStop;
use crate::worker::WorkerHandleAccept;
/// Waker token for `mio::Poll` instance.
@ -13,23 +14,23 @@ pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX);
/// A `mio::Waker` with a queue, used to wake up `Accept`'s `Poll`; it holds the `WakerInterest`s
/// that the `Poll` should look into.
pub(crate) struct WakerQueue(Arc<(Waker, Mutex<VecDeque<WakerInterest>>)>);
pub(crate) struct WakerQueue<C>(Arc<(Waker, Mutex<VecDeque<WakerInterest<C>>>)>);
impl Clone for WakerQueue {
impl<C> Clone for WakerQueue<C> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl Deref for WakerQueue {
type Target = (Waker, Mutex<VecDeque<WakerInterest>>);
impl<C> Deref for WakerQueue<C> {
type Target = (Waker, Mutex<VecDeque<WakerInterest<C>>>);
fn deref(&self) -> &Self::Target {
self.0.deref()
}
}
impl WakerQueue {
impl<C> WakerQueue<C> {
/// Construct a waker queue with given `Poll`'s `Registry` and capacity.
///
/// A fixed `WAKER_TOKEN` is used to identify the wake interest and the `Poll` needs to match
@ -42,7 +43,7 @@ impl WakerQueue {
}
/// Push a new interest to the queue and wake up the accept poll afterwards.
pub(crate) fn wake(&self, interest: WakerInterest) {
pub(crate) fn wake(&self, interest: WakerInterest<C>) {
let (waker, queue) = self.deref();
queue
@ -56,20 +57,20 @@ impl WakerQueue {
}
/// Get a MutexGuard of the waker queue.
pub(crate) fn guard(&self) -> MutexGuard<'_, VecDeque<WakerInterest>> {
pub(crate) fn guard(&self) -> MutexGuard<'_, VecDeque<WakerInterest<C>>> {
self.deref().1.lock().expect("Failed to lock WakerQueue")
}
/// Reset the waker queue so it does not grow infinitely.
pub(crate) fn reset(queue: &mut VecDeque<WakerInterest>) {
std::mem::swap(&mut VecDeque::<WakerInterest>::with_capacity(16), queue);
pub(crate) fn reset(queue: &mut VecDeque<WakerInterest<C>>) {
std::mem::swap(&mut VecDeque::<WakerInterest<C>>::with_capacity(16), queue);
}
}
/// Types of interests we look into when `Accept`'s `Poll` is woken up by the waker.
///
/// These interests should not be confused with `mio::Interest`; they are mostly not I/O related.
pub(crate) enum WakerInterest {
pub(crate) enum WakerInterest<C> {
/// `WorkerAvailable` is an interest from `Worker` notifying `Accept` that there is a worker
/// available that can accept new tasks.
WorkerAvailable(usize),
@ -77,7 +78,7 @@ pub(crate) enum WakerInterest {
/// `ServerCommand` and notify `Accept` to do exactly these tasks.
Pause,
Resume,
Stop,
Stop(AcceptorStop),
/// `Timer` is an interest sent as a delayed future. When an error happens while accepting a
/// connection, `Accept` temporarily deregisters the socket listener, then wakes up the poll and
/// registers it again after the delayed future resolves.
@ -85,5 +86,5 @@ pub(crate) enum WakerInterest {
/// `Worker` is an interest that happens after a worker runs into a faulted state (determined
/// by whether work can be sent to it successfully). `Accept` is woken up to add the new
/// `WorkerHandleAccept`.
Worker(WorkerHandleAccept),
Worker(WorkerHandleAccept<C>),
}
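For context, a minimal standalone sketch of the waker-plus-queue pattern this module wraps (assuming mio with poll/waker support enabled; a `&str` stands in for `WakerInterest`):

use std::{collections::VecDeque, io, sync::{Arc, Mutex}, thread, time::Duration};
use mio::{Events, Poll, Token, Waker};

const WAKER_TOKEN: Token = Token(usize::MAX);

fn main() -> io::Result<()> {
    let mut poll = Poll::new()?;
    let waker = Arc::new(Waker::new(poll.registry(), WAKER_TOKEN)?);
    let queue: Arc<Mutex<VecDeque<&'static str>>> = Arc::new(Mutex::new(VecDeque::new()));

    // Producer thread: push an interest, then wake the poll loop.
    let (w, q) = (waker.clone(), queue.clone());
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(100));
        q.lock().unwrap().push_back("Pause");
        w.wake().expect("failed to wake poll");
    });

    // Poll loop: on the waker token, drain the queue and handle the interests.
    let mut events = Events::with_capacity(8);
    loop {
        poll.poll(&mut events, None)?;
        for event in events.iter() {
            if event.token() == WAKER_TOKEN {
                while let Some(interest) = queue.lock().unwrap().pop_front() {
                    println!("got interest: {}", interest);
                }
                return Ok(());
            }
        }
    }
}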

View File

@ -1,645 +0,0 @@
use std::{
future::Future,
mem,
pin::Pin,
rc::Rc,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll},
time::Duration,
};
use actix_rt::{
spawn,
time::{sleep, Instant, Sleep},
Arbiter,
};
use futures_core::{future::LocalBoxFuture, ready};
use log::{error, info, trace};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot,
};
use crate::join_all;
use crate::service::{BoxedServerService, InternalServiceFactory};
use crate::socket::MioStream;
use crate::waker_queue::{WakerInterest, WakerQueue};
/// Stop worker message. Returns `true` on successful graceful shutdown
/// and `false` if some connections are still alive when shutdown executes.
pub(crate) struct Stop {
graceful: bool,
tx: oneshot::Sender<bool>,
}
#[derive(Debug)]
pub(crate) struct Conn {
pub io: MioStream,
pub token: usize,
}
fn handle_pair(
idx: usize,
tx1: UnboundedSender<Conn>,
tx2: UnboundedSender<Stop>,
counter: Counter,
) -> (WorkerHandleAccept, WorkerHandleServer) {
let accept = WorkerHandleAccept {
idx,
tx: tx1,
counter,
};
let server = WorkerHandleServer { idx, tx: tx2 };
(accept, server)
}
/// counter: Arc<AtomicUsize> field is owned by `Accept` thread and `ServerWorker` thread.
///
/// `Accept` would increment the counter and `ServerWorker` would decrement it.
///
/// # Atomic Ordering:
///
/// `Accept` always looks into its cached `Availability` field for the `ServerWorker` state.
/// It lazily increments the counter after successfully dispatching new work to `ServerWorker`.
/// On reaching the counter limit, `Accept` updates its cached `Availability` and marks the worker
/// as unable to accept any work.
///
/// `ServerWorker` always decrements the counter when each piece of work received from `Accept` is done.
/// On reaching the counter limit, the worker uses `mio::Waker` and `WakerQueue` to wake up `Accept`
/// and notify it to update the cached `Availability` again, marking the worker as able to accept work.
///
/// Hence a wake-up only happens after `Accept` increments the counter to the limit,
/// and a decrement from the limit always wakes up `Accept`.
#[derive(Clone)]
pub(crate) struct Counter {
counter: Arc<AtomicUsize>,
limit: usize,
}
impl Counter {
pub(crate) fn new(limit: usize) -> Self {
Self {
counter: Arc::new(AtomicUsize::new(1)),
limit,
}
}
/// Increment counter by 1 and return true when hitting limit
#[inline(always)]
pub(crate) fn inc(&self) -> bool {
self.counter.fetch_add(1, Ordering::Relaxed) != self.limit
}
/// Decrement counter by 1 and return true if crossing limit.
#[inline(always)]
pub(crate) fn dec(&self) -> bool {
self.counter.fetch_sub(1, Ordering::Relaxed) == self.limit
}
pub(crate) fn total(&self) -> usize {
self.counter.load(Ordering::SeqCst) - 1
}
}
pub(crate) struct WorkerCounter {
idx: usize,
inner: Rc<(WakerQueue, Counter)>,
}
impl Clone for WorkerCounter {
fn clone(&self) -> Self {
Self {
idx: self.idx,
inner: self.inner.clone(),
}
}
}
impl WorkerCounter {
pub(crate) fn new(idx: usize, waker_queue: WakerQueue, counter: Counter) -> Self {
Self {
idx,
inner: Rc::new((waker_queue, counter)),
}
}
#[inline(always)]
pub(crate) fn guard(&self) -> WorkerCounterGuard {
WorkerCounterGuard(self.clone())
}
fn total(&self) -> usize {
self.inner.1.total()
}
}
pub(crate) struct WorkerCounterGuard(WorkerCounter);
impl Drop for WorkerCounterGuard {
fn drop(&mut self) {
let (waker_queue, counter) = &*self.0.inner;
if counter.dec() {
waker_queue.wake(WakerInterest::WorkerAvailable(self.0.idx));
}
}
}
/// Handle to a worker that can send connection messages to the worker and share the
/// worker's availability with other threads.
///
/// Held by [Accept](crate::accept::Accept).
pub(crate) struct WorkerHandleAccept {
idx: usize,
tx: UnboundedSender<Conn>,
counter: Counter,
}
impl WorkerHandleAccept {
#[inline(always)]
pub(crate) fn idx(&self) -> usize {
self.idx
}
#[inline(always)]
pub(crate) fn send(&self, msg: Conn) -> Result<(), Conn> {
self.tx.send(msg).map_err(|msg| msg.0)
}
#[inline(always)]
pub(crate) fn inc_counter(&self) -> bool {
self.counter.inc()
}
}
/// Handle to worker that can send stop message to worker.
///
/// Held by [ServerBuilder](crate::builder::ServerBuilder).
pub(crate) struct WorkerHandleServer {
pub idx: usize,
tx: UnboundedSender<Stop>,
}
impl WorkerHandleServer {
pub(crate) fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
let (tx, rx) = oneshot::channel();
let _ = self.tx.send(Stop { graceful, tx });
rx
}
}
/// Server worker.
///
/// Worker accepts Socket objects via unbounded channel and starts stream processing.
pub(crate) struct ServerWorker {
worker: Worker,
state: WorkerState,
}
impl ServerWorker {
fn new(
rx: UnboundedReceiver<Conn>,
rx2: UnboundedReceiver<Stop>,
counter: WorkerCounter,
services: Box<[WorkerService]>,
factories: Box<[Box<dyn InternalServiceFactory>]>,
shutdown_timeout: Duration,
) -> Self {
Self {
worker: Worker {
rx,
rx2,
counter,
services,
factories,
shutdown_timeout,
},
state: WorkerState::default(),
}
}
pub(crate) fn start(
idx: usize,
factories: Vec<Box<dyn InternalServiceFactory>>,
waker_queue: WakerQueue,
config: ServerWorkerConfig,
) -> (WorkerHandleAccept, WorkerHandleServer) {
let (tx1, rx) = unbounded_channel();
let (tx2, rx2) = unbounded_channel();
let counter = Counter::new(config.max_concurrent_connections);
let counter_clone = counter.clone();
// every worker runs in its own arbiter.
// use a custom tokio runtime builder to change the settings of the runtime.
Arbiter::with_tokio_rt(move || {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.max_blocking_threads(config.max_blocking_threads)
.build()
.unwrap()
})
.spawn(async move {
let fut = factories
.iter()
.enumerate()
.map(|(idx, factory)| {
let fut = factory.create();
async move { fut.await.map(|(t, s)| (idx, t, s)) }
})
.collect::<Vec<_>>();
// a second spawn to run !Send future tasks.
spawn(async move {
let res = join_all(fut)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>();
let services = match res {
Ok(res) => res
.into_iter()
.fold(Vec::new(), |mut services, (factory, token, service)| {
assert_eq!(token, services.len());
services.push(WorkerService {
factory,
service,
status: WorkerServiceStatus::Unavailable,
});
services
})
.into_boxed_slice(),
Err(e) => {
error!("Can not start worker: {:?}", e);
Arbiter::current().stop();
return;
}
};
let counter = WorkerCounter::new(idx, waker_queue, counter_clone);
// a third spawn to make sure ServerWorker runs as non boxed future.
spawn(ServerWorker::new(
rx,
rx2,
counter,
services,
factories.into_boxed_slice(),
config.shutdown_timeout,
));
});
});
handle_pair(idx, tx1, tx2, counter)
}
}
struct Worker {
// UnboundedReceiver<Conn> should always be the first field.
// It must be dropped as soon as ServerWorker is dropped.
rx: UnboundedReceiver<Conn>,
rx2: UnboundedReceiver<Stop>,
counter: WorkerCounter,
services: Box<[WorkerService]>,
factories: Box<[Box<dyn InternalServiceFactory>]>,
shutdown_timeout: Duration,
}
impl Worker {
/// `Conn` message and worker/service state switch handler
fn poll_running(&mut self, running: &mut Running, cx: &mut Context<'_>) -> Poll<()> {
        // The loop only exits when the `Conn` channel shuts down or a poll method returns Pending.
loop {
match *running {
Running::Unavailable => match self.poll_ready(cx) {
Ok(true) => {
*running = Running::Available;
}
Ok(false) => return Poll::Pending,
Err((token, idx)) => {
let restart = self.restart_service(token, idx);
*running = Running::Restart(restart);
}
},
Running::Restart(Restart {
factory_id,
token,
ref mut fut,
}) => {
let (token_new, service) =
ready!(fut.as_mut().poll(cx)).unwrap_or_else(|_| {
                            // A restart failure results in a panic that drops ServerWorker,
                            // which prevents poll_running from busy looping.
panic!(
"Can not restart {:?} service",
self.factories[factory_id].name(token)
)
});
assert_eq!(token, token_new);
trace!(
"Service {:?} has been restarted",
self.factories[factory_id].name(token)
);
self.services[token].created(service);
*running = Running::Unavailable;
}
// actively poll Conn channel and handle MioStream.
Running::Available => loop {
match self.poll_ready(cx) {
Ok(true) => {}
Ok(false) => {
trace!("Worker is unavailable");
*running = Running::Unavailable;
}
Err((token, idx)) => {
let restart = self.restart_service(token, idx);
*running = Running::Restart(restart);
}
}
match ready!(Pin::new(&mut self.rx).poll_recv(cx)) {
Some(msg) => {
let guard = self.counter.guard();
let _ = self.services[msg.token].service.call((guard, msg.io));
}
None => return Poll::Ready(()),
};
},
}
}
}
/// `Stop` message handler.
///
    /// Returns `Ready` when the worker should shut down.
fn poll_stop(&mut self, cx: &mut Context<'_>) -> Poll<Option<DelayShutdown>> {
match ready!(Pin::new(&mut self.rx2).poll_recv(cx)) {
Some(Stop { graceful, tx }) => {
self.rx2.close();
let num = self.counter.total();
if num == 0 {
info!("Shutting down worker, 0 connections");
let _ = tx.send(true);
Poll::Ready(None)
} else if graceful {
info!("Graceful worker shutdown, {} connections", num);
self.shutdown(false);
let shutdown = DelayShutdown {
timer: Box::pin(sleep(Duration::from_secs(1))),
start_from: Instant::now(),
tx,
};
Poll::Ready(Some(shutdown))
} else {
info!("Force shutdown worker, {} connections", num);
self.shutdown(true);
let _ = tx.send(false);
Poll::Ready(None)
}
}
None => Poll::Pending,
}
}
/// Check readiness of services.
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Result<bool, (usize, usize)> {
let mut ready = true;
for (idx, srv) in self.services.iter_mut().enumerate() {
if srv.status == WorkerServiceStatus::Available
|| srv.status == WorkerServiceStatus::Unavailable
{
match srv.service.poll_ready(cx) {
Poll::Ready(Ok(_)) => {
if srv.status == WorkerServiceStatus::Unavailable {
trace!(
"Service {:?} is available",
self.factories[srv.factory].name(idx)
);
srv.status = WorkerServiceStatus::Available;
}
}
Poll::Pending => {
ready = false;
if srv.status == WorkerServiceStatus::Available {
trace!(
"Service {:?} is unavailable",
self.factories[srv.factory].name(idx)
);
srv.status = WorkerServiceStatus::Unavailable;
}
}
Poll::Ready(Err(_)) => {
error!(
"Service {:?} readiness check returned error, restarting",
self.factories[srv.factory].name(idx)
);
srv.status = WorkerServiceStatus::Failed;
return Err((idx, srv.factory));
}
}
}
}
Ok(ready)
}
/// Delay shutdown and drain all unhandled `Conn`.
fn poll_shutdown(
&mut self,
delay: &mut DelayShutdown,
running: &mut Running,
cx: &mut Context<'_>,
) -> Poll<bool> {
if self.counter.total() == 0 {
// Graceful shutdown.
Poll::Ready(true)
} else if delay.start_from.elapsed() >= self.shutdown_timeout {
// Timeout forceful shutdown.
Poll::Ready(false)
} else {
// Poll Running state and try to drain all `Conn` from channel.
let _ = self.poll_running(running, cx);
// Wait for 1 second.
ready!(delay.timer.as_mut().poll(cx));
// Reset timer and try again.
let time = Instant::now() + Duration::from_secs(1);
delay.timer.as_mut().reset(time);
delay.timer.as_mut().poll(cx).map(|_| false)
}
}
fn restart_service(&mut self, idx: usize, factory_id: usize) -> Restart {
let factory = &self.factories[factory_id];
trace!("Service {:?} failed, restarting", factory.name(idx));
self.services[idx].status = WorkerServiceStatus::Restarting;
Restart {
factory_id,
token: idx,
fut: factory.create(),
}
}
fn shutdown(&mut self, force: bool) {
self.services
.iter_mut()
.filter(|srv| srv.status == WorkerServiceStatus::Available)
.for_each(|srv| {
srv.status = if force {
WorkerServiceStatus::Stopped
} else {
WorkerServiceStatus::Stopping
};
});
}
}
struct WorkerService {
factory: usize,
status: WorkerServiceStatus,
service: BoxedServerService,
}
impl WorkerService {
fn created(&mut self, service: BoxedServerService) {
self.service = service;
self.status = WorkerServiceStatus::Unavailable;
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum WorkerServiceStatus {
Available,
Unavailable,
Failed,
Restarting,
Stopping,
Stopped,
}
/// Config for worker behavior passed down from server builder.
#[derive(Copy, Clone)]
pub(crate) struct ServerWorkerConfig {
shutdown_timeout: Duration,
max_blocking_threads: usize,
max_concurrent_connections: usize,
}
impl Default for ServerWorkerConfig {
fn default() -> Self {
// 512 is the default max blocking thread count of tokio runtime.
let max_blocking_threads = std::cmp::max(512 / num_cpus::get(), 1);
Self {
shutdown_timeout: Duration::from_secs(30),
max_blocking_threads,
max_concurrent_connections: 25600,
}
}
}
impl ServerWorkerConfig {
pub(crate) fn max_blocking_threads(&mut self, num: usize) {
self.max_blocking_threads = num;
}
pub(crate) fn max_concurrent_connections(&mut self, num: usize) {
self.max_concurrent_connections = num;
}
pub(crate) fn shutdown_timeout(&mut self, dur: Duration) {
self.shutdown_timeout = dur;
}
}
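// Illustrative sketch (not part of this commit): how builder-level settings might be
// forwarded into this config. The function name is assumed; the setters are the ones
// defined above.
fn tune_config_sketch() -> ServerWorkerConfig {
    let mut config = ServerWorkerConfig::default();
    config.max_blocking_threads(8);
    config.max_concurrent_connections(10_000);
    config.shutdown_timeout(Duration::from_secs(60));
    config
}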
enum WorkerState {
Running(Running),
DelayShutdown(DelayShutdown, Running),
}
impl Default for WorkerState {
fn default() -> Self {
Self::Running(Running::Unavailable)
}
}
enum Running {
Available,
Unavailable,
Restart(Restart),
}
struct Restart {
factory_id: usize,
token: usize,
fut: LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>,
}
// State needed for delayed worker shutdown:
// - Sleep timer used to periodically check shutdown progress.
// - Instant recording when the shutdown started.
// - Sender used to report the shutdown outcome (forced/graceful) back to the `Stop` caller.
struct DelayShutdown {
timer: Pin<Box<Sleep>>,
start_from: Instant,
tx: oneshot::Sender<bool>,
}
impl Drop for ServerWorker {
fn drop(&mut self) {
        // Stop the Arbiter that ServerWorker runs on when it is dropped.
Arbiter::current().stop();
}
}
impl Future for ServerWorker {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().get_mut();
// poll Stop message first.
match this.worker.poll_stop(cx) {
Poll::Pending => {}
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(delay)) => {
                // Take the running state and pass it to DelayShutdown.
                // During shutdown there may be unhandled `Conn` messages left in the channel.
                // They are drained and the worker keeps handling them until the
                // delayed-shutdown timeout is reached.
this.state = match mem::take(&mut this.state) {
WorkerState::Running(running) => WorkerState::DelayShutdown(delay, running),
_ => unreachable!(
"Duplicate worker::Stop message sent to one worker::ServerWorker."
),
};
}
}
match this.state {
WorkerState::Running(ref mut running) => this.worker.poll_running(running, cx),
WorkerState::DelayShutdown(ref mut delay, ref mut running) => {
let is_graceful = ready!(this.worker.poll_shutdown(delay, running, cx));
// Report back shutdown outcome to caller.
if let WorkerState::DelayShutdown(delay, _) = mem::take(&mut this.state) {
let _ = delay.tx.send(is_graceful);
}
Poll::Ready(())
}
}
}
}

View File

@ -0,0 +1,35 @@
use std::time::Duration;
/// Config for worker behavior passed down from server builder.
#[derive(Copy, Clone)]
pub(crate) struct ServerWorkerConfig {
pub(super) shutdown_timeout: Duration,
pub(super) max_blocking_threads: usize,
pub(super) max_concurrent_connections: usize,
}
impl Default for ServerWorkerConfig {
fn default() -> Self {
// 512 is the default max blocking thread count of tokio runtime.
let max_blocking_threads = std::cmp::max(512 / num_cpus::get(), 1);
Self {
shutdown_timeout: Duration::from_secs(30),
max_blocking_threads,
max_concurrent_connections: 25600,
}
}
}
impl ServerWorkerConfig {
pub(crate) fn max_blocking_threads(&mut self, num: usize) {
self.max_blocking_threads = num;
}
pub(crate) fn max_concurrent_connections(&mut self, num: usize) {
self.max_concurrent_connections = num;
}
pub(crate) fn shutdown_timeout(&mut self, dur: Duration) {
self.shutdown_timeout = dur;
}
}
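// Illustrative sketch (not part of this commit): forwarding builder-level settings into
// this standalone config module. The function name is assumed; the setters are the ones
// defined above.
fn tune_config_sketch() -> ServerWorkerConfig {
    let mut config = ServerWorkerConfig::default();
    config.max_blocking_threads(8);
    config.max_concurrent_connections(10_000);
    config.shutdown_timeout(Duration::from_secs(60));
    config
}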

View File

@ -0,0 +1,106 @@
use std::rc::Rc;
#[cfg(_loom)]
use loom::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
#[cfg(not(_loom))]
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use crate::waker_queue::{WakerInterest, WakerQueue};
/// The `counter: Arc<AtomicUsize>` field is shared by the `Accept` thread and the `ServerWorker` thread.
///
/// `Accept` increments the counter and `ServerWorker` decrements it.
///
/// # Atomic Ordering:
///
/// `Accept` always looks at its cached `Availability` field for the `ServerWorker` state.
/// It lazily increments the counter after successfully dispatching new work to `ServerWorker`.
/// On reaching the counter limit, `Accept` updates its cached `Availability` and marks the worker
/// as unable to accept any work.
///
/// `ServerWorker` decrements the counter whenever work received from `Accept` is done.
/// On crossing the counter limit, the worker uses `mio::Waker` and the `WakerQueue` to wake up
/// `Accept` and notify it to update its cached `Availability`, marking the worker as able to
/// accept work again.
///
/// Hence a wake-up only happens after `Accept` has incremented the counter to the limit,
/// and a decrement from the limit always wakes up `Accept`.
#[derive(Clone)]
pub(crate) struct Counter {
counter: Arc<AtomicUsize>,
limit: usize,
}
impl Counter {
pub(crate) fn new(limit: usize) -> Self {
Self {
counter: Arc::new(AtomicUsize::new(1)),
limit,
}
}
    /// Increment counter by 1, returning `false` when the limit is hit.
#[inline(always)]
pub(crate) fn inc(&self) -> bool {
self.counter.fetch_add(1, Ordering::Relaxed) != self.limit
}
/// Decrement counter by 1 and return true if crossing limit.
#[inline(always)]
pub(crate) fn dec(&self) -> bool {
self.counter.fetch_sub(1, Ordering::Relaxed) == self.limit
}
pub(crate) fn total(&self) -> usize {
self.counter.load(Ordering::SeqCst) - 1
}
}
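// Illustrative sketch (not part of this commit): tracing the raw counter with a limit
// of 2. The internal value starts at 1, which is why `total()` subtracts 1. The function
// name is assumed.
fn counter_trace_sketch() {
    let counter = Counter::new(2);
    assert_eq!(counter.total(), 0);
    assert!(counter.inc()); // 1 -> 2: still under the limit
    assert!(!counter.inc()); // 2 -> 3: limit hit; Accept marks this worker unavailable
    assert_eq!(counter.total(), 2);
    assert!(!counter.dec()); // 3 -> 2: no wake-up
    assert!(counter.dec()); // 2 -> 1: WorkerCounterGuard::drop would wake Accept
    assert_eq!(counter.total(), 0);
}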
pub(super) struct WorkerCounter<C> {
idx: usize,
inner: Rc<(WakerQueue<C>, Counter)>,
}
impl<C> Clone for WorkerCounter<C> {
fn clone(&self) -> Self {
Self {
idx: self.idx,
inner: self.inner.clone(),
}
}
}
impl<C> WorkerCounter<C> {
pub(super) fn new(idx: usize, waker_queue: WakerQueue<C>, counter: Counter) -> Self {
Self {
idx,
inner: Rc::new((waker_queue, counter)),
}
}
#[inline(always)]
pub(super) fn guard(&self) -> WorkerCounterGuard<C> {
WorkerCounterGuard(self.clone())
}
pub(super) fn total(&self) -> usize {
self.inner.1.total()
}
}
pub(crate) struct WorkerCounterGuard<C>(WorkerCounter<C>);
impl<C> Drop for WorkerCounterGuard<C> {
fn drop(&mut self) {
let (waker_queue, counter) = &*self.0.inner;
if counter.dec() {
waker_queue.wake(WakerInterest::WorkerAvailable(self.0.idx));
}
}
}
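// Illustrative sketch (not part of this commit): the guard travels with the accepted
// connection into the service call (see `service.call((guard, conn.io))` in the worker
// module); dropping it decrements the shared counter and, when `Counter::dec` returns
// true, wakes Accept. The function name is assumed.
fn guard_lifecycle_sketch<C>(counter: &WorkerCounter<C>) {
    let guard = counter.guard();
    // ... the connection would be processed while the guard is alive ...
    drop(guard); // decrement happens here, possibly waking Accept
}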

View File

@ -0,0 +1,495 @@
mod config;
mod counter;
pub(crate) use config::ServerWorkerConfig;
pub(crate) use counter::{Counter, WorkerCounterGuard};
use std::{
future::Future,
mem,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use actix_rt::{
spawn,
time::{sleep, Instant, Sleep},
Arbiter,
};
use futures_core::{future::LocalBoxFuture, ready};
use log::{error, info, trace};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
oneshot,
};
use counter::WorkerCounter;
use crate::join_all;
use crate::service::{BoxedServerService, InternalServiceFactory};
use crate::waker_queue::WakerQueue;
pub(crate) enum WorkerMessage<C> {
Conn(Conn<C>),
Stop(Stop),
}
/// Connection message. The `token` field is used to find the corresponding service in the
/// worker that can handle the `C` type correctly.
#[derive(Debug)]
pub(crate) struct Conn<C> {
pub token: usize,
pub io: C,
}
/// Stop worker message. The sender receives `true` on a successful graceful shutdown
/// and `false` if some connections were still alive when the shutdown executed.
pub(crate) enum Stop {
Graceful(oneshot::Sender<bool>),
Force,
}
// TODO: Remove default MioStream type.
/// Handle to worker that can send message to worker.
pub(crate) struct WorkerHandleAccept<C> {
idx: usize,
tx: UnboundedSender<WorkerMessage<C>>,
counter: Counter,
}
impl<C> WorkerHandleAccept<C> {
#[inline(always)]
pub(crate) fn idx(&self) -> usize {
self.idx
}
#[inline(always)]
pub(crate) fn inc_counter(&self) -> bool {
self.counter.inc()
}
#[inline(always)]
pub(crate) fn send(&self, conn: Conn<C>) -> Result<(), Conn<C>> {
self.tx
.send(WorkerMessage::Conn(conn))
.map_err(|msg| match msg.0 {
WorkerMessage::Conn(conn) => conn,
_ => unreachable!(),
})
}
pub(crate) fn stop(&self, graceful: bool) -> Option<oneshot::Receiver<bool>> {
let (stop, rx) = if graceful {
let (tx, rx) = oneshot::channel();
(Stop::Graceful(tx), Some(rx))
} else {
(Stop::Force, None)
};
let _ = self.tx.send(WorkerMessage::Stop(stop));
rx
}
}
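// Illustrative sketch (not part of this commit): in this version the stop message travels
// over the same channel as connections, and only a graceful stop yields a receiver to
// await. The function name is assumed.
async fn stop_sketch<C>(handle: &WorkerHandleAccept<C>, graceful: bool) -> bool {
    match handle.stop(graceful) {
        // Graceful stop: wait for the worker to report whether shutdown finished in time.
        Some(rx) => rx.await.unwrap_or(false),
        // Force stop returns no receiver; there is nothing to wait for.
        None => false,
    }
}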
/// Server worker.
///
/// Worker accepts Socket objects via unbounded channel and starts stream processing.
pub(crate) struct Worker<C> {
worker: WorkerInner<C>,
state: WorkerState<C>,
}
impl<C> Worker<C>
where
C: Send + 'static,
{
fn new(
rx: UnboundedReceiver<WorkerMessage<C>>,
counter: WorkerCounter<C>,
services: Box<[WorkerService<C>]>,
factories: Box<[Box<dyn InternalServiceFactory<C>>]>,
shutdown_timeout: Duration,
) -> Self {
Self {
worker: WorkerInner {
rx,
counter,
services,
factories,
shutdown_timeout,
},
state: WorkerState::default(),
}
}
pub(crate) fn start(
idx: usize,
factories: Box<[Box<dyn InternalServiceFactory<C>>]>,
waker_queue: WakerQueue<C>,
config: ServerWorkerConfig,
) -> WorkerHandleAccept<C> {
let (tx, rx) = unbounded_channel();
let counter = Counter::new(config.max_concurrent_connections);
let counter_clone = counter.clone();
        // every worker runs in its own arbiter.
        // use a custom tokio runtime builder to change the runtime settings.
Arbiter::with_tokio_rt(move || {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.max_blocking_threads(config.max_blocking_threads)
.build()
.unwrap()
})
.spawn(async move {
let fut = factories
.iter()
.enumerate()
.map(|(idx, factory)| {
let fut = factory.create();
async move { fut.await.map(|(t, s)| (idx, t, s)) }
})
.collect::<Vec<_>>();
// a second spawn to run !Send future tasks.
spawn(async move {
let res = join_all(fut)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>();
let services = match res {
Ok(res) => res
.into_iter()
.fold(Vec::new(), |mut services, (factory, token, service)| {
assert_eq!(token, services.len());
services.push(WorkerService {
factory,
service,
status: WorkerServiceStatus::Unavailable,
});
services
})
.into_boxed_slice(),
Err(e) => {
error!("Can not start worker: {:?}", e);
Arbiter::current().stop();
return;
}
};
let counter = WorkerCounter::new(idx, waker_queue, counter_clone);
                // a third spawn to make sure Worker runs as a non-boxed future.
spawn(Worker::new(
rx,
counter,
services,
factories,
config.shutdown_timeout,
));
});
});
WorkerHandleAccept { idx, tx, counter }
}
}
struct WorkerInner<C> {
    // UnboundedReceiver<Conn> should always be the first field.
    // It must be dropped as soon as the worker is dropped.
rx: UnboundedReceiver<WorkerMessage<C>>,
counter: WorkerCounter<C>,
services: Box<[WorkerService<C>]>,
factories: Box<[Box<dyn InternalServiceFactory<C>>]>,
shutdown_timeout: Duration,
}
impl<C> WorkerInner<C> {
/// `Conn` message and worker/service state switch handler
fn poll_running(&mut self, running: &mut Running<C>, cx: &mut Context<'_>) -> Poll<Stop> {
match *running {
            // Actively poll the Conn channel and handle incoming connections.
Running::Available => loop {
match self.poll_ready(cx) {
Ok(true) => match ready!(Pin::new(&mut self.rx).poll_recv(cx)) {
Some(WorkerMessage::Conn(conn)) => {
let guard = self.counter.guard();
let _ = self.services[conn.token].service.call((guard, conn.io));
}
Some(WorkerMessage::Stop(stop)) => return Poll::Ready(stop),
None => return Poll::Ready(Stop::Force),
},
Ok(false) => {
trace!("Worker is unavailable");
return Poll::Pending;
}
Err((token, idx)) => {
let restart = self.restart_service(token, idx);
*running = Running::Restart(restart);
return self.poll_running(running, cx);
}
}
},
Running::Restart(Restart {
factory_id,
token,
ref mut fut,
}) => {
let name = self.factories[factory_id].name(token);
let (token_new, service) = ready!(fut.as_mut().poll(cx)).unwrap_or_else(|_| {
                    // A restart failure results in a panic that drops the worker,
                    // which prevents poll_running from busy looping.
panic!("Can not restart {:?} service", name)
});
assert_eq!(token, token_new);
trace!("Service {:?} has been restarted", name);
self.services[token].created(service);
*running = Running::Available;
self.poll_running(running, cx)
}
}
}
/// Delay shutdown and drain all unhandled `Conn`.
fn poll_shutdown(
&mut self,
delay: &mut DelayShutdown,
running: &mut Running<C>,
cx: &mut Context<'_>,
) -> Poll<bool> {
let num = self.counter.total();
if num == 0 {
// Graceful shutdown.
info!("Graceful worker shutdown, 0 connections unhandled");
Poll::Ready(true)
} else if delay.start_from.elapsed() >= self.shutdown_timeout {
// Timeout forceful shutdown.
info!(
"Graceful worker shutdown timeout, {} connections unhandled",
num
);
Poll::Ready(false)
} else {
// Poll Running state and try to drain all `Conn` from channel.
let _ = self.poll_running(running, cx);
// Wait for 1 second.
ready!(delay.timer.as_mut().poll(cx));
// Reset timer and try again.
let time = Instant::now() + Duration::from_secs(1);
delay.timer.as_mut().reset(time);
delay.timer.as_mut().poll(cx).map(|_| false)
}
}
/// Check readiness of services.
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Result<bool, (usize, usize)> {
let mut ready = true;
for (idx, srv) in self.services.iter_mut().enumerate() {
if srv.status == WorkerServiceStatus::Available
|| srv.status == WorkerServiceStatus::Unavailable
{
match srv.service.poll_ready(cx) {
Poll::Ready(Ok(_)) => {
if srv.status == WorkerServiceStatus::Unavailable {
trace!(
"Service {:?} is available",
self.factories[srv.factory].name(idx)
);
srv.status = WorkerServiceStatus::Available;
}
}
Poll::Pending => {
ready = false;
if srv.status == WorkerServiceStatus::Available {
trace!(
"Service {:?} is unavailable",
self.factories[srv.factory].name(idx)
);
srv.status = WorkerServiceStatus::Unavailable;
}
}
Poll::Ready(Err(_)) => {
error!(
"Service {:?} readiness check returned error, restarting",
self.factories[srv.factory].name(idx)
);
srv.status = WorkerServiceStatus::Failed;
return Err((idx, srv.factory));
}
}
}
}
Ok(ready)
}
/// `Stop` message handler.
///
    /// Returns `Some(DelayShutdown)` when the worker should enter delayed shutdown.
    /// Returns `None` when the worker can shut down in place.
fn try_shutdown(&mut self, stop: Stop) -> Option<DelayShutdown> {
let num = self.counter.total();
match stop {
Stop::Graceful(tx) => {
self.shutdown_service(false);
let shutdown = DelayShutdown {
timer: Box::pin(sleep(Duration::from_secs(1))),
start_from: Instant::now(),
tx,
};
Some(shutdown)
}
Stop::Force => {
info!("Force worker shutdown, {} connections unhandled", num);
self.shutdown_service(true);
None
}
}
}
fn restart_service(&mut self, idx: usize, factory_id: usize) -> Restart<C> {
let factory = &self.factories[factory_id];
trace!("Service {:?} failed, restarting", factory.name(idx));
self.services[idx].status = WorkerServiceStatus::Restarting;
Restart {
factory_id,
token: idx,
fut: factory.create(),
}
}
fn shutdown_service(&mut self, force: bool) {
self.services
.iter_mut()
.filter(|srv| srv.status == WorkerServiceStatus::Available)
.for_each(|srv| {
srv.status = if force {
WorkerServiceStatus::Stopped
} else {
WorkerServiceStatus::Stopping
};
});
}
}
struct WorkerService<C> {
factory: usize,
status: WorkerServiceStatus,
service: BoxedServerService<C>,
}
impl<C> WorkerService<C> {
fn created(&mut self, service: BoxedServerService<C>) {
self.service = service;
self.status = WorkerServiceStatus::Unavailable;
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum WorkerServiceStatus {
Available,
Unavailable,
Failed,
Restarting,
Stopping,
Stopped,
}
enum WorkerState<C> {
Running(Running<C>),
DelayShutdown(DelayShutdown, Running<C>),
}
impl<C> Default for WorkerState<C> {
fn default() -> Self {
Self::Running(Running::Available)
}
}
enum Running<C> {
Available,
Restart(Restart<C>),
}
struct Restart<C> {
factory_id: usize,
token: usize,
fut: LocalBoxFuture<'static, Result<(usize, BoxedServerService<C>), ()>>,
}
// State needed for delayed worker shutdown:
// - Sleep timer used to periodically check shutdown progress.
// - Instant recording when the shutdown started.
// - Sender used to report the shutdown outcome (forced/graceful) back to the `Stop` caller.
struct DelayShutdown {
timer: Pin<Box<Sleep>>,
start_from: Instant,
tx: oneshot::Sender<bool>,
}
impl<C> Drop for Worker<C> {
fn drop(&mut self) {
        // Stop the Arbiter that the worker runs on when it is dropped.
Arbiter::current().stop();
}
}
impl<C> Future for Worker<C> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().get_mut();
match this.state {
WorkerState::Running(ref mut running) => {
let stop = ready!(this.worker.poll_running(running, cx));
match this.worker.try_shutdown(stop) {
Some(delay) => {
                        // Take the running state and pass it to DelayShutdown.
                        // During shutdown there may be unhandled `Conn` messages left in the channel.
                        // They are drained and the worker keeps handling them until the
                        // delayed-shutdown timeout is reached.
this.state = match mem::take(&mut this.state) {
WorkerState::Running(running) => {
WorkerState::DelayShutdown(delay, running)
}
                            _ => unreachable!("Worker already entered DelayShutdown state."),
};
self.poll(cx)
}
None => Poll::Ready(()),
}
}
WorkerState::DelayShutdown(ref mut delay, ref mut running) => {
let is_graceful = ready!(this.worker.poll_shutdown(delay, running, cx));
// Report back shutdown outcome to caller.
if let WorkerState::DelayShutdown(delay, _) = mem::take(&mut this.state) {
let _ = delay.tx.send(is_graceful);
}
Poll::Ready(())
}
}
}
}