merge branch master

fakeshadow 2021-02-04 23:25:45 +08:00
commit da9c40b04f
5 changed files with 97 additions and 25 deletions

View File

@@ -2,8 +2,12 @@
 ## Unreleased - 2021-xx-xx

 * Hidden `ServerBuilder::start` method has been removed. Use `ServerBuilder::run`. [#246]
+* Add retry for EINTR (`io::ErrorKind::Interrupted`) in `Accept`'s poll loop. [#264]
+* Add `ServerBuilder::worker_max_blocking_threads` for customizing the blocking thread pool. [#265]

 [#246]: https://github.com/actix/actix-net/pull/246
+[#264]: https://github.com/actix/actix-net/pull/264
+[#265]: https://github.com/actix/actix-net/pull/265

 ## 2.0.0-beta.2 - 2021-01-03

View File

@@ -36,7 +36,7 @@ slab = "0.4"
 tokio = { version = "1", features = ["sync"] }

 [dev-dependencies]
-actix-rt = "2.0.0-beta.2"
+actix-rt = "2.0.0"
 bytes = "1"
 env_logger = "0.8"
 futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }

View File

@@ -128,9 +128,16 @@ impl Accept {
         let mut events = mio::Events::with_capacity(128);

         loop {
-            self.poll
-                .poll(&mut events, None)
-                .unwrap_or_else(|e| panic!("Poll error: {}", e));
+            if let Err(e) = self.poll.poll(&mut events, None) {
+                match e.kind() {
+                    std::io::ErrorKind::Interrupted => {
+                        continue;
+                    }
+                    _ => {
+                        panic!("Poll error: {}", e);
+                    }
+                }
+            }

             for event in events.iter() {
                 let token = event.token();
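The change above makes the accept loop retry when `poll` fails with `io::ErrorKind::Interrupted` (EINTR) rather than panicking, since an interrupted syscall is safe to repeat. The same pattern applies to any blocking I/O call; below is a minimal std-only sketch using a hypothetical `read_retrying` helper that is not part of actix-server:

```rust
use std::io::{self, Read};

/// Read into `buf`, retrying when the underlying syscall is interrupted
/// by a signal (EINTR) instead of surfacing it as a hard error.
fn read_retrying<R: Read>(src: &mut R, buf: &mut [u8]) -> io::Result<usize> {
    loop {
        match src.read(buf) {
            // EINTR shows up as ErrorKind::Interrupted; the call can simply be repeated.
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            other => return other,
        }
    }
}

fn main() -> io::Result<()> {
    // io::empty() never blocks, so this returns 0 bytes immediately.
    let mut buf = [0u8; 16];
    let n = read_retrying(&mut io::empty(), &mut buf)?;
    assert_eq!(n, 0);
    Ok(())
}
```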

View File

@@ -20,7 +20,7 @@ use crate::signals::{Signal, Signals};
 use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
 use crate::socket::{MioTcpListener, MioTcpSocket};
 use crate::waker_queue::{WakerInterest, WakerQueue};
-use crate::worker::{self, ServerWorker, WorkerAvailability, WorkerHandle};
+use crate::worker::{self, ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandle};
 use crate::Token;

 /// Server builder
@@ -31,10 +31,10 @@ pub struct ServerBuilder {
     services: Vec<Box<dyn InternalServiceFactory>>,
     sockets: Vec<(Token, String, MioListener)>,
     exit: bool,
-    shutdown_timeout: Duration,
     no_signals: bool,
     cmd: UnboundedReceiver<ServerCommand>,
     server: Server,
+    worker_config: ServerWorkerConfig,
 }

 impl Default for ServerBuilder {
@@ -56,10 +56,10 @@ impl ServerBuilder {
             sockets: Vec::new(),
             backlog: 2048,
             exit: false,
-            shutdown_timeout: Duration::from_secs(30),
             no_signals: false,
             cmd: rx,
             server,
+            worker_config: ServerWorkerConfig::default(),
         }
     }
@@ -73,6 +73,24 @@ impl ServerBuilder {
         self
     }

+    /// Set the max number of threads for each worker's blocking task thread pool.
+    ///
+    /// One thread pool is set up **per worker**; it is not shared across workers.
+    ///
+    /// # Examples:
+    /// ```
+    /// # use actix_server::ServerBuilder;
+    /// let builder = ServerBuilder::new()
+    ///     .workers(4) // server has 4 worker threads.
+    ///     .worker_max_blocking_threads(4); // each worker has at most 4 blocking threads.
+    /// ```
+    ///
+    /// See [tokio::runtime::Builder::max_blocking_threads] for reference on its behavior.
+    pub fn worker_max_blocking_threads(mut self, num: usize) -> Self {
+        self.worker_config.max_blocking_threads(num);
+        self
+    }
+
     /// Set the maximum number of pending connections.
     ///
     /// This refers to the number of clients that can be waiting to be served.
@@ -119,7 +137,8 @@ impl ServerBuilder {
     ///
     /// By default, the shutdown timeout is set to 30 seconds.
     pub fn shutdown_timeout(mut self, sec: u64) -> Self {
-        self.shutdown_timeout = Duration::from_secs(sec);
+        self.worker_config
+            .shutdown_timeout(Duration::from_secs(sec));
         self
     }
@@ -274,14 +293,14 @@ impl ServerBuilder {
             (0..self.threads)
                 .map(|idx| {
                     // start workers
-                    let avail = WorkerAvailability::new(waker.clone());
-                    let services =
+                    let availability = WorkerAvailability::new(waker.clone());
+                    let factories =
                         self.services.iter().map(|v| v.clone_factory()).collect();
                     let handle = ServerWorker::start(
                         idx,
-                        services,
-                        avail,
-                        self.shutdown_timeout,
+                        factories,
+                        availability,
+                        self.worker_config,
                     );
                     handles.push((idx, handle.clone()));
                     handle
@@ -303,7 +322,7 @@ impl ServerBuilder {
             services: self.services,
             notify: Vec::new(),
             exit: self.exit,
-            shutdown_timeout: self.shutdown_timeout,
+            worker_config: self.worker_config,
             signals,
             on_stop_task: None,
             waker_queue,
@@ -324,7 +343,7 @@ struct ServerFuture {
     services: Vec<Box<dyn InternalServiceFactory>>,
     notify: Vec<oneshot::Sender<()>>,
     exit: bool,
-    shutdown_timeout: Duration,
+    worker_config: ServerWorkerConfig,
     signals: Option<Signals>,
     on_stop_task: Option<BoxFuture<'static, ()>>,
     waker_queue: WakerQueue,
@@ -452,10 +471,14 @@ impl ServerFuture {
                     break;
                }

-                let avail = WorkerAvailability::new(self.waker_queue.clone());
-                let services = self.services.iter().map(|v| v.clone_factory()).collect();
-                let handle =
-                    ServerWorker::start(new_idx, services, avail, self.shutdown_timeout);
+                let availability = WorkerAvailability::new(self.waker_queue.clone());
+                let factories = self.services.iter().map(|v| v.clone_factory()).collect();
+                let handle = ServerWorker::start(
+                    new_idx,
+                    factories,
+                    availability,
+                    self.worker_config,
+                );

                 self.handles.push((new_idx, handle.clone()));
                 self.waker_queue.wake(WakerInterest::Worker(handle));
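Taken together, the builder changes in this file add one user-facing setter. A minimal usage sketch, assuming the `actix-server` crate from this branch; binding sockets and calling `run()` are omitted:

```rust
use actix_server::ServerBuilder;

fn main() {
    // Sketch only: a real server would also bind listeners and call `.run()`.
    let _builder = ServerBuilder::new()
        .workers(4)                     // 4 worker threads, one arbiter each
        .worker_max_blocking_threads(8) // each worker may spawn up to 8 blocking threads
        .shutdown_timeout(60);          // graceful shutdown window, in seconds
}
```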

View File

@@ -133,7 +133,7 @@ pub(crate) struct ServerWorker {
     conns: Counter,
     factories: Vec<Box<dyn InternalServiceFactory>>,
     state: WorkerState,
-    shutdown_timeout: Duration,
+    config: ServerWorkerConfig,
 }

 struct WorkerService {
@@ -159,26 +159,62 @@ enum WorkerServiceStatus {
     Stopped,
 }

+/// Config for worker behavior, passed down from the server builder.
+#[derive(Copy, Clone)]
+pub(crate) struct ServerWorkerConfig {
+    shutdown_timeout: Duration,
+    max_blocking_threads: usize,
+}
+
+impl Default for ServerWorkerConfig {
+    fn default() -> Self {
+        // 512 is the default max blocking thread count of a tokio runtime.
+        let max_blocking_threads = std::cmp::max(512 / num_cpus::get(), 1);
+        Self {
+            shutdown_timeout: Duration::from_secs(30),
+            max_blocking_threads,
+        }
+    }
+}
+
+impl ServerWorkerConfig {
+    pub(crate) fn max_blocking_threads(&mut self, num: usize) {
+        self.max_blocking_threads = num;
+    }
+
+    pub(crate) fn shutdown_timeout(&mut self, dur: Duration) {
+        self.shutdown_timeout = dur;
+    }
+}
+
 impl ServerWorker {
     pub(crate) fn start(
         idx: usize,
         factories: Vec<Box<dyn InternalServiceFactory>>,
         availability: WorkerAvailability,
-        shutdown_timeout: Duration,
+        config: ServerWorkerConfig,
     ) -> WorkerHandle {
         let (tx1, rx) = unbounded_channel();
         let (tx2, rx2) = unbounded_channel();
         let avail = availability.clone();

         // every worker runs in its own arbiter.
-        Arbiter::new().spawn(Box::pin(async move {
+        // use a custom tokio runtime builder to change the settings of the runtime.
+        Arbiter::with_tokio_rt(move || {
+            tokio::runtime::Builder::new_current_thread()
+                .enable_all()
+                .max_blocking_threads(config.max_blocking_threads)
+                .build()
+                .unwrap()
+        })
+        .spawn(async move {
             availability.set(false);
             let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker {
                 rx,
                 rx2,
                 availability,
                 factories,
-                shutdown_timeout,
+                config,
                 services: Vec::new(),
                 conns: conns.clone(),
                 state: WorkerState::Unavailable,
@@ -198,6 +234,8 @@ impl ServerWorker {
                 })
                 .collect::<Vec<_>>();

+            // a second spawn to make sure the worker future runs as a non-boxed future,
+            // as Arbiter::spawn would box the future before sending it to the arbiter.
             spawn(async move {
                 let res: Result<Vec<_>, _> = join_all(fut).await.into_iter().collect();
                 match res {
@@ -220,7 +258,7 @@ impl ServerWorker {
                }
                wrk.await
            });
-        }));
+        });

        WorkerHandle::new(idx, tx1, tx2, avail)
    }
@@ -324,7 +362,7 @@ impl Future for ServerWorker {
                    info!("Graceful worker shutdown, {} connections", num);
                    self.state = WorkerState::Shutdown(
                        Box::pin(sleep_until(Instant::now() + Duration::from_secs(1))),
-                        Box::pin(sleep_until(Instant::now() + self.shutdown_timeout)),
+                        Box::pin(sleep_until(Instant::now() + self.config.shutdown_timeout)),
                        Some(result),
                    );
                } else {
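Each worker arbiter now builds its own single-threaded tokio runtime with a bounded blocking pool instead of relying on the arbiter's default runtime. Outside of actix, the equivalent tokio-only setup looks roughly like the sketch below; the cap of 4 is an arbitrary example value and a `tokio` dependency with the `rt` feature is assumed:

```rust
fn main() {
    // Mirror of the per-worker runtime built above: current-thread executor,
    // all available drivers enabled, and a capped pool for spawn_blocking tasks.
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .max_blocking_threads(4)
        .build()
        .expect("failed to build runtime");

    rt.block_on(async {
        // Blocking work is offloaded to the (capped) blocking thread pool.
        let answer = tokio::task::spawn_blocking(|| 40 + 2)
            .await
            .expect("blocking task panicked");
        assert_eq!(answer, 42);
    });
}
```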