align accept timeout logic with fake's impl

Rob Ede 2021-11-03 03:42:19 +00:00
parent 7b99e85797
commit d89c7e2de4
10 changed files with 224 additions and 201 deletions

View File

@ -38,4 +38,4 @@ actix-rt = "2.0.0"
bytes = "1"
env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
tokio = { version = "1.5.1", features = ["io-util"] }
tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] }

View File

@ -94,6 +94,7 @@ async fn main() -> io::Result<()> {
Ok(())
}
// alternatively:
// #[actix_rt::main]
// async fn main() -> io::Result<()> {
// run().await?;
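Spelled out, that commented alternative presumably expands to something like the following sketch. It assumes the example wraps its body in a `run()` helper returning `io::Result<()>`, which this excerpt only hints at:

    // sketch of the commented-out alternative entry point; assumes a
    // `run()` helper that builds and awaits the server as in the example
    #[actix_rt::main]
    async fn main() -> io::Result<()> {
        run().await?;
        Ok(())
    }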

View File

@ -1,21 +1,20 @@
use std::time::Duration;
use std::{io, thread};
use std::{io, thread, time::Duration};
use actix_rt::time::Instant;
use actix_rt::{time::sleep, System};
use log::{debug, error, info};
use mio::{Interest, Poll, Token as MioToken};
use crate::worker::ServerWorker;
use crate::{
availability::Availability,
server::ServerHandle,
socket::MioListener,
waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN},
worker::{Conn, WorkerHandleAccept, WorkerHandleServer},
worker::{Conn, ServerWorker, WorkerHandleAccept, WorkerHandleServer},
ServerBuilder,
};
const TIMEOUT_DURATION_ON_ERROR: Duration = Duration::from_millis(510);
struct ServerSocketInfo {
token: usize,
@ -34,6 +33,8 @@ pub(crate) struct Accept {
srv: ServerHandle,
next: usize,
avail: Availability,
/// Use the smallest duration from the sockets' timeouts.
timeout: Option<Duration>,
paused: bool,
}
@ -115,6 +116,7 @@ impl Accept {
srv: server_handle,
next: 0,
avail,
timeout: None,
paused: false,
};
@ -149,6 +151,9 @@ impl Accept {
}
}
}
// check for timeout and re-register sockets
self.process_timeout(sockets);
}
}
@ -171,6 +176,7 @@ impl Accept {
self.accept_all(sockets);
}
}
// a new worker thread is made and its handle will be added to Accept
Some(WakerInterest::Worker(handle)) => {
drop(guard);
@ -182,12 +188,7 @@ impl Accept {
self.accept_all(sockets);
}
}
// got timer interest and it's time to try register socket(s) again
Some(WakerInterest::Timer) => {
drop(guard);
self.process_timer(sockets)
}
Some(WakerInterest::Pause) => {
drop(guard);
@ -197,6 +198,7 @@ impl Accept {
self.deregister_all(sockets);
}
}
Some(WakerInterest::Resume) => {
drop(guard);
@ -210,6 +212,7 @@ impl Accept {
self.accept_all(sockets);
}
}
Some(WakerInterest::Stop) => {
if !self.paused {
self.deregister_all(sockets);
@ -217,6 +220,7 @@ impl Accept {
return true;
}
// waker queue is drained
None => {
// Reset the WakerQueue before break so it does not grow infinitely
@ -228,26 +232,44 @@ impl Accept {
}
}
fn process_timer(&self, sockets: &mut [ServerSocketInfo]) {
let now = Instant::now();
sockets
.iter_mut()
// Only sockets that had an associated timeout were deregistered.
.filter(|info| info.timeout.is_some())
.for_each(|info| {
let inst = info.timeout.take().unwrap();
if now < inst {
info.timeout = Some(inst);
} else if !self.paused {
self.register_logged(info);
}
// Drop the timeout if server is paused and socket timeout is expired.
// When server recovers from pause it will register all sockets without
// a timeout value so this socket register will be delayed till then.
});
}
fn process_timeout(&mut self, sockets: &mut [ServerSocketInfo]) {
// always remove old timeouts
if self.timeout.take().is_some() {
let now = Instant::now();
sockets
.iter_mut()
// Only sockets that had an associated timeout were deregistered.
.filter(|info| info.timeout.is_some())
.for_each(|info| {
let inst = info.timeout.take().unwrap();
if now < inst {
// still timed out; try to set new timeout
info.timeout = Some(inst);
self.set_timeout(inst - now);
} else if !self.paused {
// timeout expired; register socket again
self.register_logged(info);
}
// Drop the timeout if server is paused and socket timeout is expired.
// When server recovers from pause it will register all sockets without
// a timeout value so this socket register will be delayed till then.
});
}
}
/// Update accept timeout with `duration` if it is shorter than current timeout.
fn set_timeout(&mut self, duration: Duration) {
match self.timeout {
Some(ref mut timeout) => {
if *timeout > duration {
*timeout = duration;
}
}
None => self.timeout = Some(duration),
}
}
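For orientation, here is a minimal sketch of how a coalesced timeout like this is typically consumed. It assumes the surrounding accept loop passes `self.timeout` to `mio::Poll::poll`; that part of the loop is outside this excerpt:

    // sketch, not part of this diff: with a single coalesced timeout, mio's
    // poll call wakes the accept loop once the earliest socket deadline is
    // due, replacing the old spawned-sleep + WakerInterest::Timer round trip
    loop {
        if let Err(err) = self.poll.poll(&mut events, self.timeout) {
            match err.kind() {
                io::ErrorKind::Interrupted => continue,
                _ => panic!("Poll error: {}", err),
            }
        }
        // ... drain waker queue and accept new connections ...
        self.process_timeout(&mut sockets);
    }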
#[cfg(not(target_os = "windows"))]
@ -387,26 +409,7 @@ impl Accept {
// the poll would need it to mark which socket and when its
// listener should be registered
info.timeout = Some(Instant::now() + Duration::from_millis(500));
// after the sleep a Timer interest is sent to Accept Poll
let waker = self.waker_queue.clone();
match System::try_current() {
Some(sys) => {
sys.arbiter().spawn(async move {
sleep(Duration::from_millis(510)).await;
waker.wake(WakerInterest::Timer);
});
}
None => {
let rt = tokio::runtime::Handle::current();
rt.spawn(async move {
sleep(Duration::from_millis(510)).await;
waker.wake(WakerInterest::Timer);
});
}
}
self.set_timeout(TIMEOUT_DURATION_ON_ERROR);
return;
}
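Worth spelling out the relationship between the two constants: each errored socket above is deregistered with a 500 ms deadline, while `TIMEOUT_DURATION_ON_ERROR` is 510 ms. The extra 10 ms presumably guarantees that when the coalesced timeout fires, the socket's own deadline has already expired, so `process_timeout` re-registers it on the first pass. The deleted code achieved the same effect by sleeping 510 ms in a spawned task before sending `WakerInterest::Timer`.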

View File

@ -1,11 +1,11 @@
use std::{io, time::Duration};
use actix_rt::net::TcpStream;
use log::trace;
use log::{info, trace};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use crate::{
server::{ServerCommand, ServerHandle},
server::ServerCommand,
service::{InternalServiceFactory, ServiceFactory, StreamNewService},
socket::{
MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs,
@ -14,18 +14,18 @@ use crate::{
Server,
};
/// Server builder
/// Server builder.
pub struct ServerBuilder {
pub(super) threads: usize,
pub(super) token: usize,
pub(super) backlog: u32,
pub(super) factories: Vec<Box<dyn InternalServiceFactory>>,
pub(super) sockets: Vec<(usize, String, MioListener)>,
pub(super) exit: bool,
pub(super) listen_os_signals: bool,
pub(super) cmd_tx: UnboundedSender<ServerCommand>,
pub(super) cmd_rx: UnboundedReceiver<ServerCommand>,
pub(super) worker_config: ServerWorkerConfig,
pub(crate) threads: usize,
pub(crate) token: usize,
pub(crate) backlog: u32,
pub(crate) factories: Vec<Box<dyn InternalServiceFactory>>,
pub(crate) sockets: Vec<(usize, String, MioListener)>,
pub(crate) exit: bool,
pub(crate) listen_os_signals: bool,
pub(crate) cmd_tx: UnboundedSender<ServerCommand>,
pub(crate) cmd_rx: UnboundedReceiver<ServerCommand>,
pub(crate) worker_config: ServerWorkerConfig,
}
impl Default for ServerBuilder {
@ -37,8 +37,7 @@ impl Default for ServerBuilder {
impl ServerBuilder {
/// Create new Server builder instance
pub fn new() -> ServerBuilder {
let (tx, rx) = unbounded_channel();
let _server = ServerHandle::new(tx.clone());
let (cmd_tx, cmd_rx) = unbounded_channel();
ServerBuilder {
threads: num_cpus::get(),
@ -48,8 +47,8 @@ impl ServerBuilder {
backlog: 2048,
exit: false,
listen_os_signals: true,
cmd_tx: tx,
cmd_rx: rx,
cmd_tx,
cmd_rx,
worker_config: ServerWorkerConfig::default(),
}
}
@ -244,6 +243,7 @@ impl ServerBuilder {
if self.sockets.is_empty() {
panic!("Server should have at least one bound socket");
} else {
info!("Starting {} workers", self.threads);
Server::new(self)
}
}

View File

@ -0,0 +1,76 @@
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
// a poor man's join future. joined future is only used when starting/stopping the server.
// pin_project and pinned futures are overkill for this task.
pub(crate) struct JoinAll<T> {
fut: Vec<JoinFuture<T>>,
}
pub(crate) fn join_all<T>(fut: Vec<impl Future<Output = T> + 'static>) -> JoinAll<T> {
let fut = fut
.into_iter()
.map(|f| JoinFuture::Future(Box::pin(f)))
.collect();
JoinAll { fut }
}
enum JoinFuture<T> {
Future(Pin<Box<dyn Future<Output = T>>>),
Result(Option<T>),
}
impl<T> Unpin for JoinAll<T> {}
impl<T> Future for JoinAll<T> {
type Output = Vec<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut ready = true;
let this = self.get_mut();
for fut in this.fut.iter_mut() {
if let JoinFuture::Future(f) = fut {
match f.as_mut().poll(cx) {
Poll::Ready(t) => {
*fut = JoinFuture::Result(Some(t));
}
Poll::Pending => ready = false,
}
}
}
if ready {
let mut res = Vec::new();
for fut in this.fut.iter_mut() {
if let JoinFuture::Result(f) = fut {
res.push(f.take().unwrap());
}
}
Poll::Ready(res)
} else {
Poll::Pending
}
}
}
#[cfg(test)]
mod test {
use super::*;
use actix_utils::future::ready;
#[actix_rt::test]
async fn test_join_all() {
let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))];
let mut res = join_all(futs).await.into_iter();
assert_eq!(Ok(1), res.next().unwrap());
assert_eq!(Err(3), res.next().unwrap());
assert_eq!(Ok(9), res.next().unwrap());
}
}
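A note on the design: because every future is boxed up front as `Pin<Box<dyn Future<Output = T>>>`, `JoinAll` holds no self-referential data, so the blanket `impl Unpin` is sound and `poll` can use `get_mut` directly. A hypothetical call site (names invented for illustration; the real callers are the server/worker shutdown paths):

    // hypothetical: await all worker stop futures together during shutdown
    let stop_futs: Vec<_> = handles.into_iter().map(|h| h.stop(graceful)).collect();
    let results = join_all(stop_futs).await;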

View File

@ -7,6 +7,7 @@
mod accept;
mod availability;
mod builder;
mod join_all;
mod server;
mod service;
mod signals;
@ -22,83 +23,3 @@ pub use self::test_server::TestServer;
#[doc(hidden)]
pub use self::socket::FromStream;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Start server building process
pub fn new() -> ServerBuilder {
ServerBuilder::default()
}
// a poor man's join future. joined future is only used when starting/stopping the server.
// pin_project and pinned futures are overkill for this task.
pub(crate) struct JoinAll<T> {
fut: Vec<JoinFuture<T>>,
}
pub(crate) fn join_all<T>(fut: Vec<impl Future<Output = T> + 'static>) -> JoinAll<T> {
let fut = fut
.into_iter()
.map(|f| JoinFuture::Future(Box::pin(f)))
.collect();
JoinAll { fut }
}
enum JoinFuture<T> {
Future(Pin<Box<dyn Future<Output = T>>>),
Result(Option<T>),
}
impl<T> Unpin for JoinAll<T> {}
impl<T> Future for JoinAll<T> {
type Output = Vec<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut ready = true;
let this = self.get_mut();
for fut in this.fut.iter_mut() {
if let JoinFuture::Future(f) = fut {
match f.as_mut().poll(cx) {
Poll::Ready(t) => {
*fut = JoinFuture::Result(Some(t));
}
Poll::Pending => ready = false,
}
}
}
if ready {
let mut res = Vec::new();
for fut in this.fut.iter_mut() {
if let JoinFuture::Result(f) = fut {
res.push(f.take().unwrap());
}
}
Poll::Ready(res)
} else {
Poll::Pending
}
}
}
#[cfg(test)]
mod test {
use super::*;
use actix_utils::future::ready;
#[actix_rt::test]
async fn test_join_all() {
let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))];
let mut res = join_all(futs).await.into_iter();
assert_eq!(Ok(1), res.next().unwrap());
assert_eq!(Err(3), res.next().unwrap());
assert_eq!(Ok(9), res.next().unwrap());
}
}

View File

@ -1,32 +1,46 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use std::{io, mem};
use std::{
future::Future,
io, mem,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use actix_rt::time::sleep;
use actix_rt::System;
use actix_rt::{time::sleep, System};
use futures_core::future::LocalBoxFuture;
use log::{error, info, trace};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;
use tokio::sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
};
use crate::accept::Accept;
use crate::builder::ServerBuilder;
use crate::join_all;
use crate::service::InternalServiceFactory;
use crate::signals::{Signal, Signals};
use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer};
use crate::{
accept::Accept,
builder::ServerBuilder,
join_all::join_all,
service::InternalServiceFactory,
signals::{Signal, Signals},
waker_queue::{WakerInterest, WakerQueue},
worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer},
};
#[derive(Debug)]
pub(crate) enum ServerCommand {
/// TODO
WorkerFaulted(usize),
/// Contains return channel to notify caller of successful state change.
Pause(oneshot::Sender<()>),
/// Contains return channel to notify caller of successful state change.
Resume(oneshot::Sender<()>),
/// TODO
Stop {
/// True if shut down should be graceful.
graceful: bool,
/// Return channel to notify caller that shutdown is complete.
completion: Option<oneshot::Sender<()>>,
},
}
@ -105,14 +119,17 @@ impl Server {
}
}
/// Get a handle for ServerFuture that can be used to change the state of the actix server.
///
/// See [ServerHandle](ServerHandle) for usage.
pub fn handle(&self) -> ServerHandle {
match self {
Server::Server(inner) => ServerHandle::new(inner.cmd_tx.clone()),
Server::Error(err) => {
// TODO: i don't think this is the best way to handle server startup fail
panic!(
"server handle can not be obtained because server failed to start up: {:?}",
err
"server handle can not be obtained because server failed to start up: {}",
err.as_ref().unwrap()
);
}
}
@ -138,10 +155,16 @@ impl Future for Server {
}
}
// eager drain command channel and handle command
// handle stop tasks and eager drain command channel
loop {
if let Some(ref mut fut) = inner.stop_task {
// only resolve stop task and exit
return fut.as_mut().poll(cx).map(|_| Ok(()));
}
match Pin::new(&mut inner.cmd_rx).poll_recv(cx) {
Poll::Ready(Some(cmd)) => {
// if stop task is required, set it and loop
inner.stop_task = inner.handle_cmd(cmd);
}
_ => return Poll::Pending,
@ -167,10 +190,9 @@ impl ServerHandle {
let _ = self.tx_cmd.send(ServerCommand::WorkerFaulted(idx));
}
/// Pause accepting incoming connections
/// Pause accepting incoming connections.
///
/// If socket contains some pending connection, they might be dropped.
/// All opened connection remains active.
/// May drop pending connections on the socket. All open connections remain active.
pub fn pause(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.tx_cmd.send(ServerCommand::Pause(tx));
@ -179,7 +201,7 @@ impl ServerHandle {
}
}
/// Resume accepting incoming connections
/// Resume accepting incoming connections.
pub fn resume(&self) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.tx_cmd.send(ServerCommand::Resume(tx));
@ -189,8 +211,6 @@ impl ServerHandle {
}
/// Stop incoming connection processing, stop all workers and exit.
///
/// If server starts with `spawn()` method, then spawned thread get terminated.
pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel();
let _ = self.tx_cmd.send(ServerCommand::Stop {
@ -264,7 +284,7 @@ impl ServerInner {
}
ServerCommand::WorkerFaulted(idx) => {
// TODO: maybe just return if not found ?
// TODO: maybe just return with warning log if not found ?
assert!(self.worker_handles.iter().any(|wrk| wrk.idx == idx));
error!("Worker {} has died; restarting", idx);
@ -290,7 +310,8 @@ impl ServerInner {
self.waker_queue.wake(WakerInterest::Worker(handle_accept));
}
Err(_) => todo!(),
Err(err) => error!("can not restart worker {}: {}", idx, err),
};
None
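Taken together with the handle methods above, a usage sketch consistent with the signatures shown in this diff (service binding elided; `bind` details are not part of this excerpt):

    // sketch: drive server state changes from a ServerHandle
    let server = Server::build()
        // ... bind services here ...
        .workers(2)
        .disable_signals()
        .run();
    let handle = server.handle();
    actix_rt::spawn(async move {
        handle.pause().await;    // stop accepting; open connections stay alive
        handle.resume().await;   // accept again
        handle.stop(true).await; // graceful shutdown
    });
    server.await?;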

View File

@ -1,9 +1,9 @@
use std::sync::mpsc;
use std::{net, thread};
use std::{io, net, thread};
use actix_rt::{net::TcpStream, System};
use crate::{Server, ServerBuilder, ServiceFactory};
use crate::{Server, ServerBuilder, ServerHandle, ServiceFactory};
/// A testing server.
///
@ -34,7 +34,8 @@ pub struct TestServerRuntime {
addr: net::SocketAddr,
host: String,
port: u16,
system: System,
server_handle: ServerHandle,
thread_handle: Option<thread::JoinHandle<io::Result<()>>>,
}
impl TestServer {
@ -46,20 +47,22 @@ impl TestServer {
let (tx, rx) = mpsc::channel();
// run server in separate thread
thread::spawn(move || {
let sys = System::new();
factory(Server::build()).workers(1).disable_signals().run();
tx.send(System::current()).unwrap();
sys.run()
let thread_handle = thread::spawn(move || {
System::new().block_on(async {
let server = factory(Server::build()).workers(1).disable_signals().run();
tx.send(server.handle()).unwrap();
server.await
})
});
let system = rx.recv().unwrap();
let server_handle = rx.recv().unwrap();
TestServerRuntime {
system,
addr: "127.0.0.1:0".parse().unwrap(),
host: "127.0.0.1".to_string(),
port: 0,
server_handle,
thread_handle: Some(thread_handle),
}
}
@ -68,24 +71,25 @@ impl TestServer {
let (tx, rx) = mpsc::channel();
// run server in separate thread
thread::spawn(move || {
let thread_handle = thread::spawn(move || {
let sys = System::new();
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
let local_addr = tcp.local_addr().unwrap();
sys.block_on(async {
Server::build()
let server = Server::build()
.listen("test", tcp, factory)
.unwrap()
.workers(1)
.disable_signals()
.run();
tx.send((System::current(), local_addr)).unwrap();
});
sys.run()
tx.send((server.handle(), local_addr)).unwrap();
server.await
})
});
let (system, addr) = rx.recv().unwrap();
let (server_handle, addr) = rx.recv().unwrap();
let host = format!("{}", addr.ip());
let port = addr.port();
@ -94,7 +98,8 @@ impl TestServer {
addr,
host,
port,
system,
server_handle,
thread_handle: Some(thread_handle),
}
}
@ -127,7 +132,8 @@ impl TestServerRuntime {
/// Stop server.
fn stop(&mut self) {
self.system.stop();
let _ = self.server_handle.stop(false);
self.thread_handle.take().unwrap().join().unwrap().unwrap();
}
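The shape of this change: `TestServerRuntime` previously held a `System`, and `stop()` simply stopped the runtime; it now holds a `ServerHandle` plus the server thread's `JoinHandle`, so `stop()` sends a non-graceful stop command, joins the thread, and unwraps the `io::Result` returned by the server future, surfacing server errors in tests instead of discarding them.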
/// Connect to server, returning a Tokio `TcpStream`.

View File

@ -78,12 +78,7 @@ pub(crate) enum WakerInterest {
Pause,
Resume,
Stop,
/// `Timer` is an interest sent as a delayed future. When an error happens on accepting
/// connection `Accept` would deregister socket listener temporary and wake up the poll and
/// register them again after the delayed future resolve.
Timer,
/// `Worker` is an interest happen after a worker runs into faulted state(This is determined
/// by if work can be sent to it successfully).`Accept` would be waked up and add the new
/// `WorkerHandleAccept`.
/// `Worker` is an interest that is triggered after a worker faults. This is determined by
/// trying to send work to it. `Accept` will be woken up and add the new `WorkerHandleAccept`.
Worker(WorkerHandleAccept),
}

View File

@ -24,7 +24,7 @@ use tokio::sync::{
};
use crate::{
join_all,
join_all::join_all,
service::{BoxedServerService, InternalServiceFactory},
socket::MioStream,
waker_queue::{WakerInterest, WakerQueue},
@ -224,7 +224,7 @@ impl WorkerService {
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum WorkerServiceStatus {
Available,
Unavailable,
@ -235,7 +235,7 @@ enum WorkerServiceStatus {
}
/// Config for worker behavior passed down from server builder.
#[derive(Copy, Clone)]
#[derive(Debug, Clone, Copy)]
pub(crate) struct ServerWorkerConfig {
shutdown_timeout: Duration,
max_blocking_threads: usize,