mirror of https://github.com/fafhrd91/actix-net
checkpoint based off fake's earlier work
This commit is contained in:
parent
9662a6e51c
commit
7b99e85797
|
@ -88,15 +88,9 @@ async fn run() -> io::Result<()> {
|
|||
.await
|
||||
}
|
||||
|
||||
fn main() -> io::Result<()> {
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let ls = tokio::task::LocalSet::new();
|
||||
rt.block_on(ls.run_until(run()))?;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> io::Result<()> {
|
||||
run().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
|
@ -1,17 +1,20 @@
|
|||
use std::time::Duration;
|
||||
use std::{io, thread};
|
||||
|
||||
use actix_rt::{
|
||||
time::{sleep, Instant},
|
||||
System,
|
||||
};
|
||||
use actix_rt::time::Instant;
|
||||
use actix_rt::{time::sleep, System};
|
||||
use log::{debug, error, info};
|
||||
use mio::{Interest, Poll, Token as MioToken};
|
||||
|
||||
use crate::server::ServerHandle;
|
||||
use crate::socket::MioListener;
|
||||
use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
|
||||
use crate::worker::{Conn, WorkerHandleAccept};
|
||||
use crate::worker::ServerWorker;
|
||||
use crate::{
|
||||
availability::Availability,
|
||||
server::ServerHandle,
|
||||
socket::MioListener,
|
||||
waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN},
|
||||
worker::{Conn, WorkerHandleAccept, WorkerHandleServer},
|
||||
ServerBuilder,
|
||||
};
|
||||
|
||||
struct ServerSocketInfo {
|
||||
token: usize,
|
||||
|
@ -20,59 +23,13 @@ struct ServerSocketInfo {
|
|||
|
||||
/// Timeout is used to mark the deadline when this socket's listener should be registered again
|
||||
/// after an error.
|
||||
timeout: Option<Instant>,
|
||||
}
|
||||
|
||||
/// Accept loop would live with `ServerBuilder`.
|
||||
///
|
||||
/// It's tasked with construct `Poll` instance and `WakerQueue` which would be distributed to
|
||||
/// `Accept` and `Worker`.
|
||||
///
|
||||
/// It would also listen to `ServerCommand` and push interests to `WakerQueue`.
|
||||
pub(crate) struct AcceptLoop {
|
||||
srv: Option<ServerHandle>,
|
||||
poll: Option<Poll>,
|
||||
waker: WakerQueue,
|
||||
}
|
||||
|
||||
impl AcceptLoop {
|
||||
pub fn new(srv: ServerHandle) -> Self {
|
||||
let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e));
|
||||
let waker = WakerQueue::new(poll.registry())
|
||||
.unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e));
|
||||
|
||||
Self {
|
||||
srv: Some(srv),
|
||||
poll: Some(poll),
|
||||
waker,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn waker_owned(&self) -> WakerQueue {
|
||||
self.waker.clone()
|
||||
}
|
||||
|
||||
pub fn wake(&self, i: WakerInterest) {
|
||||
self.waker.wake(i);
|
||||
}
|
||||
|
||||
pub(crate) fn start(
|
||||
&mut self,
|
||||
socks: Vec<(usize, MioListener)>,
|
||||
handles: Vec<WorkerHandleAccept>,
|
||||
) {
|
||||
let srv = self.srv.take().expect("Can not re-use AcceptInfo");
|
||||
let poll = self.poll.take().unwrap();
|
||||
let waker = self.waker.clone();
|
||||
|
||||
Accept::start(poll, waker, socks, srv, handles).expect("accept failed to start");
|
||||
}
|
||||
timeout: Option<actix_rt::time::Instant>,
|
||||
}
|
||||
|
||||
/// poll instance of the server.
|
||||
struct Accept {
|
||||
pub(crate) struct Accept {
|
||||
poll: Poll,
|
||||
waker: WakerQueue,
|
||||
waker_queue: WakerQueue,
|
||||
handles: Vec<WorkerHandleAccept>,
|
||||
srv: ServerHandle,
|
||||
next: usize,
|
||||
|
@ -80,111 +37,58 @@ struct Accept {
|
|||
paused: bool,
|
||||
}
|
||||
|
||||
/// Array of u128 with every bit as marker for a worker handle's availability.
|
||||
#[derive(Debug, Default)]
|
||||
struct Availability([u128; 4]);
|
||||
|
||||
impl Availability {
|
||||
/// Check if any worker handle is available
|
||||
#[inline(always)]
|
||||
fn available(&self) -> bool {
|
||||
self.0.iter().any(|a| *a != 0)
|
||||
}
|
||||
|
||||
/// Check if worker handle is available by index
|
||||
#[inline(always)]
|
||||
fn get_available(&self, idx: usize) -> bool {
|
||||
let (offset, idx) = Self::offset(idx);
|
||||
|
||||
self.0[offset] & (1 << idx as u128) != 0
|
||||
}
|
||||
|
||||
/// Set worker handle available state by index.
|
||||
fn set_available(&mut self, idx: usize, avail: bool) {
|
||||
let (offset, idx) = Self::offset(idx);
|
||||
|
||||
let off = 1 << idx as u128;
|
||||
if avail {
|
||||
self.0[offset] |= off;
|
||||
} else {
|
||||
self.0[offset] &= !off
|
||||
}
|
||||
}
|
||||
|
||||
/// Set all worker handle to available state.
|
||||
/// This would result in a re-check on all workers' availability.
|
||||
fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) {
|
||||
handles.iter().for_each(|handle| {
|
||||
self.set_available(handle.idx(), true);
|
||||
})
|
||||
}
|
||||
|
||||
/// Get offset and adjusted index of given worker handle index.
|
||||
fn offset(idx: usize) -> (usize, usize) {
|
||||
if idx < 128 {
|
||||
(0, idx)
|
||||
} else if idx < 128 * 2 {
|
||||
(1, idx - 128)
|
||||
} else if idx < 128 * 3 {
|
||||
(2, idx - 128 * 2)
|
||||
} else if idx < 128 * 4 {
|
||||
(3, idx - 128 * 3)
|
||||
} else {
|
||||
panic!("Max WorkerHandle count is 512")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// This function defines errors that are per-connection. Which basically
|
||||
/// means that if we get this error from `accept()` system call it means
|
||||
/// next connection might be ready to be accepted.
|
||||
///
|
||||
/// All other errors will incur a timeout before next `accept()` is performed.
|
||||
/// The timeout is useful to handle resource exhaustion errors like ENFILE
|
||||
/// and EMFILE. Otherwise, could enter into tight loop.
|
||||
fn connection_error(e: &io::Error) -> bool {
|
||||
e.kind() == io::ErrorKind::ConnectionRefused
|
||||
|| e.kind() == io::ErrorKind::ConnectionAborted
|
||||
|| e.kind() == io::ErrorKind::ConnectionReset
|
||||
}
|
||||
|
||||
impl Accept {
|
||||
pub(crate) fn start(
|
||||
poll: Poll,
|
||||
waker: WakerQueue,
|
||||
socks: Vec<(usize, MioListener)>,
|
||||
srv: ServerHandle,
|
||||
handles: Vec<WorkerHandleAccept>,
|
||||
) -> io::Result<()> {
|
||||
// Accept runs in its own thread and might spawn additional futures to current system
|
||||
let sys = System::try_current();
|
||||
sockets: Vec<(usize, MioListener)>,
|
||||
builder: &ServerBuilder,
|
||||
) -> io::Result<(WakerQueue, Vec<WorkerHandleServer>)> {
|
||||
let handle_server = ServerHandle::new(builder.cmd_tx.clone());
|
||||
|
||||
let (mut accept, mut sockets) =
|
||||
Accept::new_with_sockets(poll, waker, socks, handles, srv)?;
|
||||
// construct poll instance and its waker
|
||||
let poll = Poll::new()?;
|
||||
let waker_queue = WakerQueue::new(poll.registry())?;
|
||||
|
||||
// start workers and collect handles
|
||||
let (handles_accept, handles_server) = (0..builder.threads)
|
||||
.map(|idx| {
|
||||
// clone service factories
|
||||
let factories = builder
|
||||
.factories
|
||||
.iter()
|
||||
.map(|f| f.clone_factory())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// start worker using service factories
|
||||
ServerWorker::start(idx, factories, waker_queue.clone(), builder.worker_config)
|
||||
})
|
||||
.collect::<io::Result<Vec<_>>>()?
|
||||
.into_iter()
|
||||
.unzip();
|
||||
|
||||
let (mut accept, mut sockets) = Accept::new_with_sockets(
|
||||
poll,
|
||||
waker_queue.clone(),
|
||||
sockets,
|
||||
handles_accept,
|
||||
handle_server,
|
||||
)?;
|
||||
|
||||
thread::Builder::new()
|
||||
.name("actix-server accept loop".to_owned())
|
||||
.spawn(move || {
|
||||
// forward existing actix system context
|
||||
if let Some(sys) = sys {
|
||||
System::set_current(sys);
|
||||
}
|
||||
.name("actix-server acceptor".to_owned())
|
||||
.spawn(move || accept.poll_with(&mut sockets))
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
|
||||
|
||||
accept.poll_with(&mut sockets);
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
Ok(())
|
||||
Ok((waker_queue, handles_server))
|
||||
}
|
||||
|
||||
fn new_with_sockets(
|
||||
poll: Poll,
|
||||
waker: WakerQueue,
|
||||
socks: Vec<(usize, MioListener)>,
|
||||
handles: Vec<WorkerHandleAccept>,
|
||||
srv: ServerHandle,
|
||||
waker_queue: WakerQueue,
|
||||
sockets: Vec<(usize, MioListener)>,
|
||||
accept_handles: Vec<WorkerHandleAccept>,
|
||||
server_handle: ServerHandle,
|
||||
) -> io::Result<(Accept, Box<[ServerSocketInfo]>)> {
|
||||
let sockets = socks
|
||||
let sockets = sockets
|
||||
.into_iter()
|
||||
.map(|(token, mut lst)| {
|
||||
// Start listening for incoming connections
|
||||
|
@ -202,13 +106,13 @@ impl Accept {
|
|||
let mut avail = Availability::default();
|
||||
|
||||
// Assume all handles are avail at construct time.
|
||||
avail.set_available_all(&handles);
|
||||
avail.set_available_all(&accept_handles);
|
||||
|
||||
let accept = Accept {
|
||||
poll,
|
||||
waker,
|
||||
handles,
|
||||
srv,
|
||||
waker_queue,
|
||||
handles: accept_handles,
|
||||
srv: server_handle,
|
||||
next: 0,
|
||||
avail,
|
||||
paused: false,
|
||||
|
@ -217,8 +121,9 @@ impl Accept {
|
|||
Ok((accept, sockets))
|
||||
}
|
||||
|
||||
/// blocking wait for readiness events triggered by mio
|
||||
fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) {
|
||||
let mut events = mio::Events::with_capacity(128);
|
||||
let mut events = mio::Events::with_capacity(256);
|
||||
|
||||
loop {
|
||||
if let Err(e) = self.poll.poll(&mut events, None) {
|
||||
|
@ -254,7 +159,7 @@ impl Accept {
|
|||
loop {
|
||||
// take guard with every iteration so no new interest can be added
|
||||
// until the current task is done.
|
||||
let mut guard = self.waker.guard();
|
||||
let mut guard = self.waker_queue.guard();
|
||||
match guard.pop_front() {
|
||||
// worker notify it becomes available.
|
||||
Some(WakerInterest::WorkerAvailable(idx)) => {
|
||||
|
@ -325,6 +230,7 @@ impl Accept {
|
|||
|
||||
fn process_timer(&self, sockets: &mut [ServerSocketInfo]) {
|
||||
let now = Instant::now();
|
||||
|
||||
sockets
|
||||
.iter_mut()
|
||||
// Only sockets that had an associated timeout were deregistered.
|
||||
|
@ -387,12 +293,12 @@ impl Accept {
|
|||
fn deregister_all(&self, sockets: &mut [ServerSocketInfo]) {
|
||||
// This is a best effort implementation with following limitation:
|
||||
//
|
||||
// Every ServerSocketInfo with associate timeout will be skipped and it's timeout
|
||||
// is removed in the process.
|
||||
// Every ServerSocketInfo with associated timeout will be skipped and it's timeout is
|
||||
// removed in the process.
|
||||
//
|
||||
// Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short
|
||||
// gap (less than 500ms) would cause all timing out ServerSocketInfos be reregistered
|
||||
// before expected timing.
|
||||
// Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short gap
|
||||
// (less than 500ms) would cause all timing out ServerSocketInfos be re-registered before
|
||||
// expected timing.
|
||||
sockets
|
||||
.iter_mut()
|
||||
// Take all timeout.
|
||||
|
@ -483,7 +389,7 @@ impl Accept {
|
|||
info.timeout = Some(Instant::now() + Duration::from_millis(500));
|
||||
|
||||
// after the sleep a Timer interest is sent to Accept Poll
|
||||
let waker = self.waker.clone();
|
||||
let waker = self.waker_queue.clone();
|
||||
|
||||
match System::try_current() {
|
||||
Some(sys) => {
|
||||
|
@ -539,67 +445,14 @@ impl Accept {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::Availability;
|
||||
|
||||
fn single(aval: &mut Availability, idx: usize) {
|
||||
aval.set_available(idx, true);
|
||||
assert!(aval.available());
|
||||
|
||||
aval.set_available(idx, true);
|
||||
|
||||
aval.set_available(idx, false);
|
||||
assert!(!aval.available());
|
||||
|
||||
aval.set_available(idx, false);
|
||||
assert!(!aval.available());
|
||||
}
|
||||
|
||||
fn multi(aval: &mut Availability, mut idx: Vec<usize>) {
|
||||
idx.iter().for_each(|idx| aval.set_available(*idx, true));
|
||||
|
||||
assert!(aval.available());
|
||||
|
||||
while let Some(idx) = idx.pop() {
|
||||
assert!(aval.available());
|
||||
aval.set_available(idx, false);
|
||||
}
|
||||
|
||||
assert!(!aval.available());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn availability() {
|
||||
let mut aval = Availability::default();
|
||||
|
||||
single(&mut aval, 1);
|
||||
single(&mut aval, 128);
|
||||
single(&mut aval, 256);
|
||||
single(&mut aval, 511);
|
||||
|
||||
let idx = (0..511).filter(|i| i % 3 == 0 && i % 5 == 0).collect();
|
||||
|
||||
multi(&mut aval, idx);
|
||||
|
||||
multi(&mut aval, (0..511).collect())
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn overflow() {
|
||||
let mut aval = Availability::default();
|
||||
single(&mut aval, 512);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pin_point() {
|
||||
let mut aval = Availability::default();
|
||||
|
||||
aval.set_available(438, true);
|
||||
|
||||
aval.set_available(479, true);
|
||||
|
||||
assert_eq!(aval.0[3], 1 << (438 - 384) | 1 << (479 - 384));
|
||||
}
|
||||
/// This function defines errors that are per-connection; if we get this error from the `accept()`
|
||||
/// system call it means the next connection might be ready to be accepted.
|
||||
///
|
||||
/// All other errors will incur a timeout before next `accept()` call is attempted. The timeout is
|
||||
/// useful to handle resource exhaustion errors like `ENFILE` and `EMFILE`. Otherwise, it could
|
||||
/// enter into a temporary spin loop.
|
||||
fn connection_error(e: &io::Error) -> bool {
|
||||
e.kind() == io::ErrorKind::ConnectionRefused
|
||||
|| e.kind() == io::ErrorKind::ConnectionAborted
|
||||
|| e.kind() == io::ErrorKind::ConnectionReset
|
||||
}
|
||||
|
|
|
@ -1,43 +1,31 @@
|
|||
use std::{
|
||||
future::Future,
|
||||
io, mem,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
time::Duration,
|
||||
};
|
||||
use std::{io, time::Duration};
|
||||
|
||||
use actix_rt::{self as rt, net::TcpStream, time::sleep, System};
|
||||
use log::{error, info, trace};
|
||||
use tokio::sync::{
|
||||
mpsc::{unbounded_channel, UnboundedReceiver},
|
||||
oneshot,
|
||||
};
|
||||
use actix_rt::net::TcpStream;
|
||||
use log::trace;
|
||||
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
|
||||
|
||||
use crate::accept::AcceptLoop;
|
||||
use crate::join_all;
|
||||
use crate::server::{ServerCommand, ServerHandle};
|
||||
use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
|
||||
use crate::signals::{Signal, Signals};
|
||||
use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
|
||||
use crate::socket::{MioTcpListener, MioTcpSocket};
|
||||
use crate::waker_queue::{WakerInterest, WakerQueue};
|
||||
use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer};
|
||||
use crate::{
|
||||
server::{ServerCommand, ServerHandle},
|
||||
service::{InternalServiceFactory, ServiceFactory, StreamNewService},
|
||||
socket::{
|
||||
MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs,
|
||||
},
|
||||
worker::ServerWorkerConfig,
|
||||
Server,
|
||||
};
|
||||
|
||||
/// Server builder
|
||||
pub struct ServerBuilder {
|
||||
threads: usize,
|
||||
token: usize,
|
||||
backlog: u32,
|
||||
handles: Vec<(usize, WorkerHandleServer)>,
|
||||
services: Vec<Box<dyn InternalServiceFactory>>,
|
||||
sockets: Vec<(usize, String, MioListener)>,
|
||||
accept: AcceptLoop,
|
||||
exit: bool,
|
||||
no_signals: bool,
|
||||
cmd: UnboundedReceiver<ServerCommand>,
|
||||
server: ServerHandle,
|
||||
notify: Vec<oneshot::Sender<()>>,
|
||||
worker_config: ServerWorkerConfig,
|
||||
pub(super) threads: usize,
|
||||
pub(super) token: usize,
|
||||
pub(super) backlog: u32,
|
||||
pub(super) factories: Vec<Box<dyn InternalServiceFactory>>,
|
||||
pub(super) sockets: Vec<(usize, String, MioListener)>,
|
||||
pub(super) exit: bool,
|
||||
pub(super) listen_os_signals: bool,
|
||||
pub(super) cmd_tx: UnboundedSender<ServerCommand>,
|
||||
pub(super) cmd_rx: UnboundedReceiver<ServerCommand>,
|
||||
pub(super) worker_config: ServerWorkerConfig,
|
||||
}
|
||||
|
||||
impl Default for ServerBuilder {
|
||||
|
@ -50,21 +38,18 @@ impl ServerBuilder {
|
|||
/// Create new Server builder instance
|
||||
pub fn new() -> ServerBuilder {
|
||||
let (tx, rx) = unbounded_channel();
|
||||
let server = ServerHandle::new(tx);
|
||||
let _server = ServerHandle::new(tx.clone());
|
||||
|
||||
ServerBuilder {
|
||||
threads: num_cpus::get(),
|
||||
token: 0,
|
||||
handles: Vec::new(),
|
||||
services: Vec::new(),
|
||||
factories: Vec::new(),
|
||||
sockets: Vec::new(),
|
||||
accept: AcceptLoop::new(server.clone()),
|
||||
backlog: 2048,
|
||||
exit: false,
|
||||
no_signals: false,
|
||||
cmd: rx,
|
||||
notify: Vec::new(),
|
||||
server,
|
||||
listen_os_signals: true,
|
||||
cmd_tx: tx,
|
||||
cmd_rx: rx,
|
||||
worker_config: ServerWorkerConfig::default(),
|
||||
}
|
||||
}
|
||||
|
@ -128,15 +113,16 @@ impl ServerBuilder {
|
|||
self.max_concurrent_connections(num)
|
||||
}
|
||||
|
||||
// TODO: wtf is this for
|
||||
/// Stop Actix system.
|
||||
pub fn system_exit(mut self) -> Self {
|
||||
self.exit = true;
|
||||
self
|
||||
}
|
||||
|
||||
/// Disable signal handling.
|
||||
/// Disable OS signal handling.
|
||||
pub fn disable_signals(mut self) -> Self {
|
||||
self.no_signals = true;
|
||||
self.listen_os_signals = false;
|
||||
self
|
||||
}
|
||||
|
||||
|
@ -164,7 +150,7 @@ impl ServerBuilder {
|
|||
|
||||
for lst in sockets {
|
||||
let token = self.next_token();
|
||||
self.services.push(StreamNewService::create(
|
||||
self.factories.push(StreamNewService::create(
|
||||
name.as_ref().to_string(),
|
||||
token,
|
||||
factory.clone(),
|
||||
|
@ -215,7 +201,7 @@ impl ServerBuilder {
|
|||
lst.set_nonblocking(true)?;
|
||||
let token = self.next_token();
|
||||
let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
|
||||
self.services.push(StreamNewService::create(
|
||||
self.factories.push(StreamNewService::create(
|
||||
name.as_ref().to_string(),
|
||||
token,
|
||||
factory,
|
||||
|
@ -240,7 +226,7 @@ impl ServerBuilder {
|
|||
let addr = lst.local_addr()?;
|
||||
|
||||
let token = self.next_token();
|
||||
self.services.push(StreamNewService::create(
|
||||
self.factories.push(StreamNewService::create(
|
||||
name.as_ref().to_string(),
|
||||
token,
|
||||
factory,
|
||||
|
@ -254,177 +240,11 @@ impl ServerBuilder {
|
|||
}
|
||||
|
||||
/// Starts processing incoming connections and return server controller.
|
||||
pub fn run(mut self) -> ServerHandle {
|
||||
pub fn run(self) -> Server {
|
||||
if self.sockets.is_empty() {
|
||||
panic!("Server should have at least one bound socket");
|
||||
} else {
|
||||
trace!("start running server");
|
||||
|
||||
for (_, name, lst) in &self.sockets {
|
||||
info!(
|
||||
r#"Starting service: "{}", workers: {}, listening on: {}"#,
|
||||
name,
|
||||
self.threads,
|
||||
lst.local_addr()
|
||||
);
|
||||
}
|
||||
|
||||
trace!("run server");
|
||||
|
||||
// start workers
|
||||
let handles = (0..self.threads)
|
||||
.map(|idx| {
|
||||
let (handle_accept, handle_server) =
|
||||
self.start_worker(idx, self.accept.waker_owned());
|
||||
self.handles.push((idx, handle_server));
|
||||
|
||||
handle_accept
|
||||
})
|
||||
.collect();
|
||||
|
||||
// start accept thread
|
||||
self.accept.start(
|
||||
mem::take(&mut self.sockets)
|
||||
.into_iter()
|
||||
.map(|t| (t.0, t.2))
|
||||
.collect(),
|
||||
handles,
|
||||
);
|
||||
|
||||
// handle signals
|
||||
if !self.no_signals {
|
||||
Signals::start(self.server.clone());
|
||||
}
|
||||
|
||||
// start http server actor
|
||||
let server = self.server.clone();
|
||||
rt::spawn(self);
|
||||
server
|
||||
}
|
||||
}
|
||||
|
||||
fn start_worker(
|
||||
&self,
|
||||
idx: usize,
|
||||
waker_queue: WakerQueue,
|
||||
) -> (WorkerHandleAccept, WorkerHandleServer) {
|
||||
trace!("start server worker {}", idx);
|
||||
let services = self.services.iter().map(|v| v.clone_factory()).collect();
|
||||
ServerWorker::start(idx, services, waker_queue, self.worker_config)
|
||||
}
|
||||
|
||||
fn handle_cmd(&mut self, item: ServerCommand) {
|
||||
match item {
|
||||
ServerCommand::Pause(tx) => {
|
||||
self.accept.wake(WakerInterest::Pause);
|
||||
let _ = tx.send(());
|
||||
}
|
||||
ServerCommand::Resume(tx) => {
|
||||
self.accept.wake(WakerInterest::Resume);
|
||||
let _ = tx.send(());
|
||||
}
|
||||
ServerCommand::Signal(sig) => {
|
||||
// Signals support
|
||||
// Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system
|
||||
match sig {
|
||||
Signal::Int => {
|
||||
info!("SIGINT received; starting forced shutdown");
|
||||
self.exit = true;
|
||||
self.handle_cmd(ServerCommand::Stop {
|
||||
graceful: false,
|
||||
completion: None,
|
||||
})
|
||||
}
|
||||
|
||||
Signal::Term => {
|
||||
info!("SIGTERM received; starting graceful shutdown");
|
||||
self.exit = true;
|
||||
self.handle_cmd(ServerCommand::Stop {
|
||||
graceful: true,
|
||||
completion: None,
|
||||
})
|
||||
}
|
||||
|
||||
Signal::Quit => {
|
||||
info!("SIGQUIT received; starting forced shutdown");
|
||||
self.exit = true;
|
||||
self.handle_cmd(ServerCommand::Stop {
|
||||
graceful: false,
|
||||
completion: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
ServerCommand::Notify(tx) => {
|
||||
self.notify.push(tx);
|
||||
}
|
||||
ServerCommand::Stop {
|
||||
graceful,
|
||||
completion,
|
||||
} => {
|
||||
let exit = self.exit;
|
||||
|
||||
// stop accept thread
|
||||
self.accept.wake(WakerInterest::Stop);
|
||||
let notify = std::mem::take(&mut self.notify);
|
||||
|
||||
// stop workers
|
||||
let stop = self
|
||||
.handles
|
||||
.iter()
|
||||
.map(move |worker| worker.1.stop(graceful))
|
||||
.collect();
|
||||
|
||||
rt::spawn(async move {
|
||||
if graceful {
|
||||
// wait for all workers to shut down
|
||||
let _ = join_all(stop).await;
|
||||
}
|
||||
|
||||
if let Some(tx) = completion {
|
||||
let _ = tx.send(());
|
||||
}
|
||||
|
||||
for tx in notify {
|
||||
let _ = tx.send(());
|
||||
}
|
||||
|
||||
if exit {
|
||||
sleep(Duration::from_millis(300)).await;
|
||||
System::try_current().as_ref().map(System::stop);
|
||||
}
|
||||
});
|
||||
}
|
||||
ServerCommand::WorkerFaulted(idx) => {
|
||||
let mut found = false;
|
||||
for i in 0..self.handles.len() {
|
||||
if self.handles[i].0 == idx {
|
||||
self.handles.swap_remove(i);
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if found {
|
||||
error!("Worker {} has died; restarting", idx);
|
||||
|
||||
let mut new_idx = self.handles.len();
|
||||
'found: loop {
|
||||
for i in 0..self.handles.len() {
|
||||
if self.handles[i].0 == new_idx {
|
||||
new_idx += 1;
|
||||
continue 'found;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
let (handle_accept, handle_server) =
|
||||
self.start_worker(new_idx, self.accept.waker_owned());
|
||||
self.handles.push((new_idx, handle_server));
|
||||
self.accept.wake(WakerInterest::Worker(handle_accept));
|
||||
}
|
||||
}
|
||||
Server::new(self)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -435,19 +255,6 @@ impl ServerBuilder {
|
|||
}
|
||||
}
|
||||
|
||||
impl Future for ServerBuilder {
|
||||
type Output = ();
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
loop {
|
||||
match Pin::new(&mut self.cmd).poll_recv(cx) {
|
||||
Poll::Ready(Some(it)) => self.as_mut().get_mut().handle_cmd(it),
|
||||
_ => return Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn bind_addr<S: ToSocketAddrs>(
|
||||
addr: S,
|
||||
backlog: u32,
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
|
||||
|
||||
mod accept;
|
||||
mod availability;
|
||||
mod builder;
|
||||
mod server;
|
||||
mod service;
|
||||
|
|
|
@ -1,64 +1,170 @@
|
|||
use std::future::Future;
|
||||
use std::io;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::Duration;
|
||||
use std::{io, mem};
|
||||
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use actix_rt::time::sleep;
|
||||
use actix_rt::System;
|
||||
use futures_core::future::LocalBoxFuture;
|
||||
use log::{error, info, trace};
|
||||
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
use crate::accept::Accept;
|
||||
use crate::builder::ServerBuilder;
|
||||
use crate::signals::Signal;
|
||||
use crate::join_all;
|
||||
use crate::service::InternalServiceFactory;
|
||||
use crate::signals::{Signal, Signals};
|
||||
use crate::waker_queue::{WakerInterest, WakerQueue};
|
||||
use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum ServerCommand {
|
||||
WorkerFaulted(usize),
|
||||
Pause(oneshot::Sender<()>),
|
||||
Resume(oneshot::Sender<()>),
|
||||
Signal(Signal),
|
||||
Stop {
|
||||
/// True if shut down should be graceful.
|
||||
graceful: bool,
|
||||
completion: Option<oneshot::Sender<()>>,
|
||||
},
|
||||
/// Notify of server stop
|
||||
Notify(oneshot::Sender<()>),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[non_exhaustive]
|
||||
pub struct Server;
|
||||
// TODO: docs + must use
|
||||
|
||||
impl Server {
|
||||
/// Start server building process.
|
||||
pub fn build() -> ServerBuilder {
|
||||
ServerBuilder::default()
|
||||
}
|
||||
}
|
||||
|
||||
/// Server handle.
|
||||
/// Server
|
||||
///
|
||||
/// # Shutdown Signals
|
||||
/// On UNIX systems, `SIGQUIT` will start a graceful shutdown and `SIGTERM` or `SIGINT` will start a
|
||||
/// forced shutdown. On Windows, a CTRL-C signal will start a forced shutdown.
|
||||
///
|
||||
/// A graceful shutdown will wait for all workers to stop first.
|
||||
#[derive(Debug)]
|
||||
pub struct ServerHandle(
|
||||
UnboundedSender<ServerCommand>,
|
||||
Option<oneshot::Receiver<()>>,
|
||||
);
|
||||
#[must_use = "futures do nothing unless you `.await` or poll them"]
|
||||
pub enum Server {
|
||||
Server(ServerInner),
|
||||
Error(Option<io::Error>),
|
||||
}
|
||||
|
||||
impl ServerHandle {
|
||||
pub(crate) fn new(tx: UnboundedSender<ServerCommand>) -> Self {
|
||||
ServerHandle(tx, None)
|
||||
impl Server {
|
||||
/// Start server building process.
|
||||
pub fn build() -> ServerBuilder {
|
||||
ServerBuilder::default()
|
||||
}
|
||||
|
||||
pub(crate) fn signal(&self, sig: Signal) {
|
||||
let _ = self.0.send(ServerCommand::Signal(sig));
|
||||
pub(crate) fn new(mut builder: ServerBuilder) -> Self {
|
||||
trace!("start running server");
|
||||
|
||||
let sockets = mem::take(&mut builder.sockets)
|
||||
.into_iter()
|
||||
.map(|t| (t.0, t.2))
|
||||
.collect();
|
||||
|
||||
// Give log information on what runtime will be used.
|
||||
let is_tokio = tokio::runtime::Handle::try_current().is_ok();
|
||||
let is_actix = actix_rt::System::try_current().is_some();
|
||||
|
||||
match (is_tokio, is_actix) {
|
||||
(true, false) => info!("Tokio runtime found. Starting in existing Tokio runtime"),
|
||||
(_, true) => info!("Actix runtime found. Starting in Actix runtime"),
|
||||
(_, _) => info!(
|
||||
"Actix/Tokio runtime not found. Starting in newt Tokio current-thread runtime"
|
||||
),
|
||||
}
|
||||
|
||||
for (_, name, lst) in &builder.sockets {
|
||||
info!(
|
||||
r#"Starting service: "{}", workers: {}, listening on: {}"#,
|
||||
name,
|
||||
builder.threads,
|
||||
lst.local_addr()
|
||||
);
|
||||
}
|
||||
|
||||
trace!("run server");
|
||||
|
||||
match Accept::start(sockets, &builder) {
|
||||
Ok((waker_queue, worker_handles)) => {
|
||||
// construct OS signals listener future
|
||||
let signals = (!builder.listen_os_signals).then(Signals::new);
|
||||
|
||||
Self::Server(ServerInner {
|
||||
cmd_tx: builder.cmd_tx.clone(),
|
||||
cmd_rx: builder.cmd_rx,
|
||||
signals,
|
||||
waker_queue,
|
||||
worker_handles,
|
||||
worker_config: builder.worker_config,
|
||||
services: builder.factories,
|
||||
exit: builder.exit,
|
||||
stop_task: None,
|
||||
})
|
||||
}
|
||||
|
||||
Err(err) => Self::Error(Some(err)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handle(&self) -> ServerHandle {
|
||||
match self {
|
||||
Server::Server(inner) => ServerHandle::new(inner.cmd_tx.clone()),
|
||||
Server::Error(err) => {
|
||||
// TODO: i don't think this is the best way to handle server startup fail
|
||||
panic!(
|
||||
"server handle can not be obtained because server failed to start up: {:?}",
|
||||
err
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for Server {
|
||||
type Output = io::Result<()>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
match self.as_mut().get_mut() {
|
||||
Server::Error(err) => Poll::Ready(Err(err
|
||||
.take()
|
||||
.expect("Server future cannot be polled after error"))),
|
||||
|
||||
Server::Server(inner) => {
|
||||
// poll Signals
|
||||
if let Some(ref mut signals) = inner.signals {
|
||||
if let Poll::Ready(signal) = Pin::new(signals).poll(cx) {
|
||||
inner.stop_task = inner.handle_signal(signal);
|
||||
// drop signals listener
|
||||
inner.signals = None;
|
||||
}
|
||||
}
|
||||
|
||||
// eager drain command channel and handle command
|
||||
loop {
|
||||
match Pin::new(&mut inner.cmd_rx).poll_recv(cx) {
|
||||
Poll::Ready(Some(cmd)) => {
|
||||
inner.stop_task = inner.handle_cmd(cmd);
|
||||
}
|
||||
_ => return Poll::Pending,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Server handle.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ServerHandle {
|
||||
tx_cmd: UnboundedSender<ServerCommand>,
|
||||
}
|
||||
|
||||
impl ServerHandle {
|
||||
pub(crate) fn new(tx_cmd: UnboundedSender<ServerCommand>) -> Self {
|
||||
ServerHandle { tx_cmd }
|
||||
}
|
||||
|
||||
pub(crate) fn worker_faulted(&self, idx: usize) {
|
||||
let _ = self.0.send(ServerCommand::WorkerFaulted(idx));
|
||||
let _ = self.tx_cmd.send(ServerCommand::WorkerFaulted(idx));
|
||||
}
|
||||
|
||||
/// Pause accepting incoming connections
|
||||
|
@ -67,7 +173,7 @@ impl ServerHandle {
|
|||
/// All opened connection remains active.
|
||||
pub fn pause(&self) -> impl Future<Output = ()> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let _ = self.0.send(ServerCommand::Pause(tx));
|
||||
let _ = self.tx_cmd.send(ServerCommand::Pause(tx));
|
||||
async {
|
||||
let _ = rx.await;
|
||||
}
|
||||
|
@ -76,7 +182,7 @@ impl ServerHandle {
|
|||
/// Resume accepting incoming connections
|
||||
pub fn resume(&self) -> impl Future<Output = ()> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let _ = self.0.send(ServerCommand::Resume(tx));
|
||||
let _ = self.tx_cmd.send(ServerCommand::Resume(tx));
|
||||
async {
|
||||
let _ = rx.await;
|
||||
}
|
||||
|
@ -87,7 +193,7 @@ impl ServerHandle {
|
|||
/// If server starts with `spawn()` method, then spawned thread get terminated.
|
||||
pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let _ = self.0.send(ServerCommand::Stop {
|
||||
let _ = self.tx_cmd.send(ServerCommand::Stop {
|
||||
graceful,
|
||||
completion: Some(tx),
|
||||
});
|
||||
|
@ -97,29 +203,129 @@ impl ServerHandle {
|
|||
}
|
||||
}
|
||||
|
||||
impl Clone for ServerHandle {
|
||||
fn clone(&self) -> Self {
|
||||
Self(self.0.clone(), None)
|
||||
}
|
||||
pub struct ServerInner {
|
||||
worker_handles: Vec<WorkerHandleServer>,
|
||||
worker_config: ServerWorkerConfig,
|
||||
services: Vec<Box<dyn InternalServiceFactory>>,
|
||||
exit: bool,
|
||||
cmd_tx: UnboundedSender<ServerCommand>,
|
||||
cmd_rx: UnboundedReceiver<ServerCommand>,
|
||||
signals: Option<Signals>,
|
||||
waker_queue: WakerQueue,
|
||||
stop_task: Option<LocalBoxFuture<'static, ()>>,
|
||||
}
|
||||
|
||||
impl Future for ServerHandle {
|
||||
type Output = io::Result<()>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.get_mut();
|
||||
|
||||
if this.1.is_none() {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
if this.0.send(ServerCommand::Notify(tx)).is_err() {
|
||||
return Poll::Ready(Ok(()));
|
||||
impl ServerInner {
|
||||
fn handle_cmd(&mut self, item: ServerCommand) -> Option<LocalBoxFuture<'static, ()>> {
|
||||
match item {
|
||||
ServerCommand::Pause(tx) => {
|
||||
self.waker_queue.wake(WakerInterest::Pause);
|
||||
let _ = tx.send(());
|
||||
None
|
||||
}
|
||||
this.1 = Some(rx);
|
||||
}
|
||||
|
||||
match Pin::new(this.1.as_mut().unwrap()).poll(cx) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(_) => Poll::Ready(Ok(())),
|
||||
ServerCommand::Resume(tx) => {
|
||||
self.waker_queue.wake(WakerInterest::Resume);
|
||||
let _ = tx.send(());
|
||||
None
|
||||
}
|
||||
|
||||
ServerCommand::Stop {
|
||||
graceful,
|
||||
completion,
|
||||
} => {
|
||||
let exit = self.exit;
|
||||
|
||||
// stop accept thread
|
||||
self.waker_queue.wake(WakerInterest::Stop);
|
||||
|
||||
// stop workers
|
||||
let stop = self
|
||||
.worker_handles
|
||||
.iter()
|
||||
.map(|worker| worker.stop(graceful))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
Some(Box::pin(async move {
|
||||
if graceful {
|
||||
// wait for all workers to shut down
|
||||
let _ = join_all(stop).await;
|
||||
}
|
||||
|
||||
if let Some(tx) = completion {
|
||||
let _ = tx.send(());
|
||||
}
|
||||
|
||||
if exit {
|
||||
sleep(Duration::from_millis(300)).await;
|
||||
System::try_current().as_ref().map(System::stop);
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
ServerCommand::WorkerFaulted(idx) => {
|
||||
// TODO: maybe just return if not found ?
|
||||
assert!(self.worker_handles.iter().any(|wrk| wrk.idx == idx));
|
||||
|
||||
error!("Worker {} has died; restarting", idx);
|
||||
|
||||
let factories = self
|
||||
.services
|
||||
.iter()
|
||||
.map(|service| service.clone_factory())
|
||||
.collect();
|
||||
|
||||
match ServerWorker::start(
|
||||
idx,
|
||||
factories,
|
||||
self.waker_queue.clone(),
|
||||
self.worker_config,
|
||||
) {
|
||||
Ok((handle_accept, handle_server)) => {
|
||||
*self
|
||||
.worker_handles
|
||||
.iter_mut()
|
||||
.find(|wrk| wrk.idx == idx)
|
||||
.unwrap() = handle_server;
|
||||
|
||||
self.waker_queue.wake(WakerInterest::Worker(handle_accept));
|
||||
}
|
||||
Err(_) => todo!(),
|
||||
};
|
||||
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_signal(&mut self, signal: Signal) -> Option<LocalBoxFuture<'static, ()>> {
|
||||
match signal {
|
||||
Signal::Int => {
|
||||
info!("SIGINT received; starting forced shutdown");
|
||||
self.exit = true;
|
||||
self.handle_cmd(ServerCommand::Stop {
|
||||
graceful: false,
|
||||
completion: None,
|
||||
})
|
||||
}
|
||||
|
||||
Signal::Term => {
|
||||
info!("SIGTERM received; starting graceful shutdown");
|
||||
self.exit = true;
|
||||
self.handle_cmd(ServerCommand::Stop {
|
||||
graceful: true,
|
||||
completion: None,
|
||||
})
|
||||
}
|
||||
|
||||
Signal::Quit => {
|
||||
info!("SIGQUIT received; starting forced shutdown");
|
||||
self.exit = true;
|
||||
self.handle_cmd(ServerCommand::Stop {
|
||||
graceful: false,
|
||||
completion: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,11 +2,9 @@ use std::future::Future;
|
|||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use crate::server::ServerHandle;
|
||||
|
||||
/// Types of process signals.
|
||||
#[allow(dead_code)]
|
||||
#[derive(PartialEq, Clone, Copy, Debug)]
|
||||
// #[allow(dead_code)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq)]
|
||||
pub(crate) enum Signal {
|
||||
/// `SIGINT`
|
||||
Int,
|
||||
|
@ -20,8 +18,6 @@ pub(crate) enum Signal {
|
|||
|
||||
/// Process signal listener.
|
||||
pub(crate) struct Signals {
|
||||
srv: ServerHandle,
|
||||
|
||||
#[cfg(not(unix))]
|
||||
signals: futures_core::future::LocalBoxFuture<'static, std::io::Result<()>>,
|
||||
|
||||
|
@ -30,14 +26,13 @@ pub(crate) struct Signals {
|
|||
}
|
||||
|
||||
impl Signals {
|
||||
/// Spawns a signal listening future that is able to send commands to the `Server`.
|
||||
pub(crate) fn start(srv: ServerHandle) {
|
||||
/// Constructs an OS signal listening future.
|
||||
pub(crate) fn new() -> Self {
|
||||
#[cfg(not(unix))]
|
||||
{
|
||||
actix_rt::spawn(Signals {
|
||||
srv,
|
||||
Signals {
|
||||
signals: Box::pin(actix_rt::signal::ctrl_c()),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
|
@ -66,33 +61,29 @@ impl Signals {
|
|||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
actix_rt::spawn(Signals { srv, signals });
|
||||
Signals { signals }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Future for Signals {
|
||||
type Output = ();
|
||||
type Output = Signal;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
#[cfg(not(unix))]
|
||||
match self.signals.as_mut().poll(cx) {
|
||||
Poll::Ready(_) => {
|
||||
self.srv.signal(Signal::Int);
|
||||
Poll::Ready(())
|
||||
}
|
||||
Poll::Pending => Poll::Pending,
|
||||
{
|
||||
self.signals.as_mut().poll(cx).map(|_| Signal::Int)
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
{
|
||||
for (sig, fut) in self.signals.iter_mut() {
|
||||
// TODO: match on if let Some ?
|
||||
if Pin::new(fut).poll_recv(cx).is_ready() {
|
||||
let sig = *sig;
|
||||
self.srv.signal(sig);
|
||||
return Poll::Ready(());
|
||||
return Poll::Ready(*sig);
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use std::{
|
||||
future::Future,
|
||||
mem,
|
||||
io, mem,
|
||||
pin::Pin,
|
||||
rc::Rc,
|
||||
sync::{
|
||||
|
@ -43,19 +43,20 @@ pub(crate) struct Conn {
|
|||
pub token: usize,
|
||||
}
|
||||
|
||||
///
|
||||
fn handle_pair(
|
||||
idx: usize,
|
||||
tx1: UnboundedSender<Conn>,
|
||||
tx2: UnboundedSender<Stop>,
|
||||
tx_conn: UnboundedSender<Conn>,
|
||||
tx_stop: UnboundedSender<Stop>,
|
||||
counter: Counter,
|
||||
) -> (WorkerHandleAccept, WorkerHandleServer) {
|
||||
let accept = WorkerHandleAccept {
|
||||
idx,
|
||||
tx: tx1,
|
||||
tx_conn,
|
||||
counter,
|
||||
};
|
||||
|
||||
let server = WorkerHandleServer { idx, tx: tx2 };
|
||||
let server = WorkerHandleServer { idx, tx_stop };
|
||||
|
||||
(accept, server)
|
||||
}
|
||||
|
@ -151,13 +152,13 @@ impl Drop for WorkerCounterGuard {
|
|||
}
|
||||
}
|
||||
|
||||
/// Handle to worker that can send connection message to worker and share the
|
||||
/// availability of worker to other thread.
|
||||
/// Handle to worker that can send connection message to worker and share the availability of worker
|
||||
/// to other threads.
|
||||
///
|
||||
/// Held by [Accept](crate::accept::Accept).
|
||||
pub(crate) struct WorkerHandleAccept {
|
||||
idx: usize,
|
||||
tx: UnboundedSender<Conn>,
|
||||
tx_conn: UnboundedSender<Conn>,
|
||||
counter: Counter,
|
||||
}
|
||||
|
||||
|
@ -168,8 +169,8 @@ impl WorkerHandleAccept {
|
|||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub(crate) fn send(&self, msg: Conn) -> Result<(), Conn> {
|
||||
self.tx.send(msg).map_err(|msg| msg.0)
|
||||
pub(crate) fn send(&self, conn: Conn) -> Result<(), Conn> {
|
||||
self.tx_conn.send(conn).map_err(|msg| msg.0)
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
|
@ -183,15 +184,14 @@ impl WorkerHandleAccept {
|
|||
/// Held by [ServerBuilder](crate::builder::ServerBuilder).
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct WorkerHandleServer {
|
||||
#[allow(dead_code)]
|
||||
idx: usize,
|
||||
tx: UnboundedSender<Stop>,
|
||||
pub(crate) idx: usize,
|
||||
tx_stop: UnboundedSender<Stop>,
|
||||
}
|
||||
|
||||
impl WorkerHandleServer {
|
||||
pub(crate) fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
let _ = self.tx.send(Stop { graceful, tx });
|
||||
let _ = self.tx_stop.send(Stop { graceful, tx });
|
||||
rx
|
||||
}
|
||||
}
|
||||
|
@ -274,7 +274,7 @@ impl ServerWorker {
|
|||
factories: Vec<Box<dyn InternalServiceFactory>>,
|
||||
waker_queue: WakerQueue,
|
||||
config: ServerWorkerConfig,
|
||||
) -> (WorkerHandleAccept, WorkerHandleServer) {
|
||||
) -> io::Result<(WorkerHandleAccept, WorkerHandleServer)> {
|
||||
trace!("starting server worker {}", idx);
|
||||
|
||||
let (tx1, rx) = unbounded_channel();
|
||||
|
@ -296,6 +296,8 @@ impl ServerWorker {
|
|||
// get actix system context if it is set
|
||||
let sys = System::try_current();
|
||||
|
||||
// TODO: wait for server startup with sync channel
|
||||
|
||||
std::thread::Builder::new()
|
||||
.name("eofibef".to_owned())
|
||||
.spawn(move || {
|
||||
|
@ -339,6 +341,7 @@ impl ServerWorker {
|
|||
services
|
||||
})
|
||||
.into_boxed_slice(),
|
||||
|
||||
Err(e) => {
|
||||
error!("Can not start worker: {:?}", e);
|
||||
Arbiter::try_current().as_ref().map(ArbiterHandle::stop);
|
||||
|
@ -365,7 +368,7 @@ impl ServerWorker {
|
|||
})
|
||||
.expect("worker thread error/panic");
|
||||
|
||||
handle_pair(idx, tx1, tx2, counter)
|
||||
Ok(handle_pair(idx, tx1, tx2, counter))
|
||||
}
|
||||
|
||||
fn restart_service(&mut self, idx: usize, factory_id: usize) {
|
||||
|
|
|
@ -30,7 +30,7 @@ fn test_bind() {
|
|||
})?
|
||||
.run();
|
||||
|
||||
let _ = tx.send((srv.clone(), actix_rt::System::current()));
|
||||
let _ = tx.send((srv.handle(), actix_rt::System::current()));
|
||||
|
||||
srv.await
|
||||
})
|
||||
|
@ -61,7 +61,7 @@ fn test_listen() {
|
|||
})?
|
||||
.run();
|
||||
|
||||
let _ = tx.send((srv.clone(), actix_rt::System::current()));
|
||||
let _ = tx.send((srv.handle(), actix_rt::System::current()));
|
||||
|
||||
srv.await
|
||||
})
|
||||
|
@ -103,7 +103,7 @@ fn test_start() {
|
|||
})?
|
||||
.run();
|
||||
|
||||
let _ = tx.send((srv.clone(), actix_rt::System::current()));
|
||||
let _ = tx.send((srv.handle(), actix_rt::System::current()));
|
||||
|
||||
srv.await
|
||||
})
|
||||
|
@ -166,7 +166,7 @@ async fn test_max_concurrent_connections() {
|
|||
|
||||
let h = thread::spawn(move || {
|
||||
actix_rt::System::new().block_on(async {
|
||||
let server = Server::build()
|
||||
let srv = Server::build()
|
||||
// Set a relative higher backlog.
|
||||
.backlog(12)
|
||||
// max connection for a worker is 3.
|
||||
|
@ -187,9 +187,9 @@ async fn test_max_concurrent_connections() {
|
|||
})?
|
||||
.run();
|
||||
|
||||
let _ = tx.send((server.clone(), actix_rt::System::current()));
|
||||
let _ = tx.send((srv.handle(), actix_rt::System::current()));
|
||||
|
||||
server.await
|
||||
srv.await
|
||||
})
|
||||
});
|
||||
|
||||
|
@ -260,7 +260,7 @@ async fn test_service_restart() {
|
|||
let h = thread::spawn(move || {
|
||||
let num = num.clone();
|
||||
actix_rt::System::new().block_on(async {
|
||||
let server = Server::build()
|
||||
let srv = Server::build()
|
||||
.backlog(1)
|
||||
.disable_signals()
|
||||
.bind("addr1", addr1, move || {
|
||||
|
@ -280,12 +280,12 @@ async fn test_service_restart() {
|
|||
.workers(1)
|
||||
.run();
|
||||
|
||||
let _ = tx.send((server.clone(), actix_rt::System::current()));
|
||||
server.await
|
||||
let _ = tx.send((srv.handle(), actix_rt::System::current()));
|
||||
srv.await
|
||||
})
|
||||
});
|
||||
|
||||
let (server, sys) = rx.recv().unwrap();
|
||||
let (srv, sys) = rx.recv().unwrap();
|
||||
|
||||
for _ in 0..5 {
|
||||
TcpStream::connect(addr1)
|
||||
|
@ -307,7 +307,7 @@ async fn test_service_restart() {
|
|||
assert!(num_clone.load(Ordering::SeqCst) > 5);
|
||||
assert!(num2_clone.load(Ordering::SeqCst) > 5);
|
||||
|
||||
let _ = server.stop(false);
|
||||
let _ = srv.stop(false);
|
||||
sys.stop();
|
||||
h.join().unwrap().unwrap();
|
||||
}
|
||||
|
@ -379,19 +379,19 @@ async fn worker_restart() {
|
|||
let h = thread::spawn(move || {
|
||||
let counter = counter.clone();
|
||||
actix_rt::System::new().block_on(async {
|
||||
let server = Server::build()
|
||||
let srv = Server::build()
|
||||
.disable_signals()
|
||||
.bind("addr", addr, move || TestServiceFactory(counter.clone()))?
|
||||
.workers(2)
|
||||
.run();
|
||||
|
||||
let _ = tx.send((server.clone(), actix_rt::System::current()));
|
||||
let _ = tx.send((srv.handle(), actix_rt::System::current()));
|
||||
|
||||
server.await
|
||||
srv.await
|
||||
})
|
||||
});
|
||||
|
||||
let (server, sys) = rx.recv().unwrap();
|
||||
let (srv, sys) = rx.recv().unwrap();
|
||||
|
||||
sleep(Duration::from_secs(3)).await;
|
||||
|
||||
|
@ -448,7 +448,7 @@ async fn worker_restart() {
|
|||
assert_eq!("3", id);
|
||||
stream.shutdown().await.unwrap();
|
||||
|
||||
let _ = server.stop(false);
|
||||
let _ = srv.stop(false);
|
||||
sys.stop();
|
||||
h.join().unwrap().unwrap();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue