diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md
index 70728064..be59f125 100644
--- a/actix-server/CHANGES.md
+++ b/actix-server/CHANGES.md
@@ -2,8 +2,12 @@
 
 ## Unreleased - 2021-xx-xx
 * Hidden `ServerBuilder::start` method has been removed. Use `ServerBuilder::run`. [#246]
+* Add retry for EINTR (`io::ErrorKind::Interrupted`) in `Accept`'s poll loop. [#264]
+* Add `ServerBuilder::worker_max_blocking_threads` for customizing the per-worker blocking thread pool. [#265]
 
 [#246]: https://github.com/actix/actix-net/pull/246
+[#264]: https://github.com/actix/actix-net/pull/264
+[#265]: https://github.com/actix/actix-net/pull/265
 
 
 ## 2.0.0-beta.2 - 2021-01-03
diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml
index 845dc03e..db9d4d8b 100755
--- a/actix-server/Cargo.toml
+++ b/actix-server/Cargo.toml
@@ -36,7 +36,7 @@ slab = "0.4"
 tokio = { version = "1", features = ["sync"] }
 
 [dev-dependencies]
-actix-rt = "2.0.0-beta.2"
+actix-rt = "2.0.0"
 bytes = "1"
 env_logger = "0.8"
 futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs
index ac960ae8..170ef1dc 100644
--- a/actix-server/src/accept.rs
+++ b/actix-server/src/accept.rs
@@ -128,9 +128,16 @@ impl Accept {
         let mut events = mio::Events::with_capacity(128);
 
         loop {
-            self.poll
-                .poll(&mut events, None)
-                .unwrap_or_else(|e| panic!("Poll error: {}", e));
+            if let Err(e) = self.poll.poll(&mut events, None) {
+                match e.kind() {
+                    std::io::ErrorKind::Interrupted => {
+                        continue;
+                    }
+                    _ => {
+                        panic!("Poll error: {}", e);
+                    }
+                }
+            }
 
             for event in events.iter() {
                 let token = event.token();
diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs
index fdaf94f4..429be852 100644
--- a/actix-server/src/builder.rs
+++ b/actix-server/src/builder.rs
@@ -20,7 +20,7 @@ use crate::signals::{Signal, Signals};
 use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
 use crate::socket::{MioTcpListener, MioTcpSocket};
 use crate::waker_queue::{WakerInterest, WakerQueue};
-use crate::worker::{self, ServerWorker, WorkerAvailability, WorkerHandle};
+use crate::worker::{self, ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandle};
 use crate::Token;
 
 /// Server builder
@@ -31,10 +31,10 @@ pub struct ServerBuilder {
     services: Vec<Box<dyn InternalServiceFactory>>,
     sockets: Vec<(Token, String, MioListener)>,
     exit: bool,
-    shutdown_timeout: Duration,
     no_signals: bool,
     cmd: UnboundedReceiver<ServerCommand>,
     server: Server,
+    worker_config: ServerWorkerConfig,
 }
 
 impl Default for ServerBuilder {
@@ -56,10 +56,10 @@ impl ServerBuilder {
             sockets: Vec::new(),
             backlog: 2048,
             exit: false,
-            shutdown_timeout: Duration::from_secs(30),
             no_signals: false,
             cmd: rx,
             server,
+            worker_config: ServerWorkerConfig::default(),
         }
     }
 
@@ -73,6 +73,24 @@ impl ServerBuilder {
         self
     }
 
+    /// Set the maximum number of threads for each worker's blocking task thread pool.
+    ///
+    /// One thread pool is set up **per worker**; it is not shared across workers.
+    ///
+    /// # Examples
+    /// ```
+    /// # use actix_server::ServerBuilder;
+    /// let builder = ServerBuilder::new()
+    ///     .workers(4) // server has 4 worker threads.
+    ///     .worker_max_blocking_threads(4); // every worker has at most 4 blocking threads.
+    /// ```
+    ///
+    /// See [tokio::runtime::Builder::max_blocking_threads] for behavior reference.
+    pub fn worker_max_blocking_threads(mut self, num: usize) -> Self {
+        self.worker_config.max_blocking_threads(num);
+        self
+    }
+
     /// Set the maximum number of pending connections.
     ///
     /// This refers to the number of clients that can be waiting to be served.
@@ -119,7 +137,8 @@ impl ServerBuilder {
     ///
     /// By default shutdown timeout sets to 30 seconds.
     pub fn shutdown_timeout(mut self, sec: u64) -> Self {
-        self.shutdown_timeout = Duration::from_secs(sec);
+        self.worker_config
+            .shutdown_timeout(Duration::from_secs(sec));
         self
     }
 
@@ -274,14 +293,14 @@ impl ServerBuilder {
             (0..self.threads)
                 .map(|idx| {
                     // start workers
-                    let avail = WorkerAvailability::new(waker.clone());
-                    let services =
+                    let availability = WorkerAvailability::new(waker.clone());
+                    let factories =
                         self.services.iter().map(|v| v.clone_factory()).collect();
                     let handle = ServerWorker::start(
                         idx,
-                        services,
-                        avail,
-                        self.shutdown_timeout,
+                        factories,
+                        availability,
+                        self.worker_config,
                     );
                     handles.push((idx, handle.clone()));
                     handle
@@ -303,7 +322,7 @@ impl ServerBuilder {
             services: self.services,
             notify: Vec::new(),
             exit: self.exit,
-            shutdown_timeout: self.shutdown_timeout,
+            worker_config: self.worker_config,
             signals,
             on_stop_task: None,
             waker_queue,
@@ -324,7 +343,7 @@ struct ServerFuture {
     services: Vec<Box<dyn InternalServiceFactory>>,
     notify: Vec<oneshot::Sender<()>>,
     exit: bool,
-    shutdown_timeout: Duration,
+    worker_config: ServerWorkerConfig,
     signals: Option<Signals>,
     on_stop_task: Option<oneshot::Sender<()>>,
     waker_queue: WakerQueue,
@@ -452,10 +471,14 @@ impl ServerFuture {
                        break;
                    }
 
-                   let avail = WorkerAvailability::new(self.waker_queue.clone());
-                   let services = self.services.iter().map(|v| v.clone_factory()).collect();
-                   let handle =
-                       ServerWorker::start(new_idx, services, avail, self.shutdown_timeout);
+                   let availability = WorkerAvailability::new(self.waker_queue.clone());
+                   let factories = self.services.iter().map(|v| v.clone_factory()).collect();
+                   let handle = ServerWorker::start(
+                       new_idx,
+                       factories,
+                       availability,
+                       self.worker_config,
+                   );
 
                    self.handles.push((new_idx, handle.clone()));
                    self.waker_queue.wake(WakerInterest::Worker(handle));
diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs
index 25a0429c..defc7306 100644
--- a/actix-server/src/worker.rs
+++ b/actix-server/src/worker.rs
@@ -133,7 +133,7 @@ pub(crate) struct ServerWorker {
     conns: Counter,
     factories: Vec<Box<dyn InternalServiceFactory>>,
     state: WorkerState,
-    shutdown_timeout: Duration,
+    config: ServerWorkerConfig,
 }
 
 struct WorkerService {
@@ -159,26 +159,62 @@ enum WorkerServiceStatus {
     Stopped,
 }
 
+/// Config for worker behavior, passed down from the server builder.
+#[derive(Copy, Clone)]
+pub(crate) struct ServerWorkerConfig {
+    shutdown_timeout: Duration,
+    max_blocking_threads: usize,
+}
+
+impl Default for ServerWorkerConfig {
+    fn default() -> Self {
+        // 512 is the default max blocking thread count of a tokio runtime.
+        let max_blocking_threads = std::cmp::max(512 / num_cpus::get(), 1);
+        Self {
+            shutdown_timeout: Duration::from_secs(30),
+            max_blocking_threads,
+        }
+    }
+}
+
+impl ServerWorkerConfig {
+    pub(crate) fn max_blocking_threads(&mut self, num: usize) {
+        self.max_blocking_threads = num;
+    }
+
+    pub(crate) fn shutdown_timeout(&mut self, dur: Duration) {
+        self.shutdown_timeout = dur;
+    }
+}
+
 impl ServerWorker {
     pub(crate) fn start(
         idx: usize,
         factories: Vec<Box<dyn InternalServiceFactory>>,
         availability: WorkerAvailability,
-        shutdown_timeout: Duration,
+        config: ServerWorkerConfig,
     ) -> WorkerHandle {
         let (tx1, rx) = unbounded_channel();
         let (tx2, rx2) = unbounded_channel();
         let avail = availability.clone();
 
         // every worker runs in it's own arbiter.
-        Arbiter::new().spawn(Box::pin(async move {
+        // use a custom tokio runtime builder to change the runtime's settings.
+        Arbiter::with_tokio_rt(move || {
+            tokio::runtime::Builder::new_current_thread()
+                .enable_all()
+                .max_blocking_threads(config.max_blocking_threads)
+                .build()
+                .unwrap()
+        })
+        .spawn(async move {
             availability.set(false);
             let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker {
                 rx,
                 rx2,
                 availability,
                 factories,
-                shutdown_timeout,
+                config,
                 services: Vec::new(),
                 conns: conns.clone(),
                 state: WorkerState::Unavailable,
@@ -198,6 +234,8 @@ impl ServerWorker {
                 })
                 .collect::<Vec<_>>();
 
+            // A second spawn to make sure the worker future runs as a non-boxed future,
+            // as Arbiter::spawn would box the future before sending it to the arbiter.
             spawn(async move {
                 let res: Result<Vec<_>, _> = join_all(fut).await.into_iter().collect();
                 match res {
@@ -220,7 +258,7 @@ impl ServerWorker {
                 }
                 wrk.await
             });
-        }));
+        });
 
         WorkerHandle::new(idx, tx1, tx2, avail)
     }
@@ -324,7 +362,7 @@ impl Future for ServerWorker {
                     info!("Graceful worker shutdown, {} connections", num);
                     self.state = WorkerState::Shutdown(
                         Box::pin(sleep_until(Instant::now() + Duration::from_secs(1))),
-                        Box::pin(sleep_until(Instant::now() + self.shutdown_timeout)),
+                        Box::pin(sleep_until(Instant::now() + self.config.shutdown_timeout)),
                         Some(result),
                     );
                 } else {
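
A minimal usage sketch (not part of the patch) showing how the builder options touched by this diff fit together. It assumes actix-server 2.x with actix-rt 2 and actix-service 2; the bind name, address, and placeholder service are illustrative only.

```rust
use std::io;

use actix_rt::net::TcpStream;
use actix_server::Server;
use actix_service::fn_service;

#[actix_rt::main]
async fn main() -> io::Result<()> {
    Server::build()
        // 4 worker threads; each worker runs its own single-threaded runtime.
        .workers(4)
        // At most 2 blocking threads per worker
        // (forwarded to ServerWorkerConfig::max_blocking_threads).
        .worker_max_blocking_threads(2)
        // Graceful shutdown timeout in seconds
        // (forwarded to ServerWorkerConfig::shutdown_timeout).
        .shutdown_timeout(60)
        .bind("example", ("127.0.0.1", 8080), || {
            // Placeholder service: accept the connection and immediately finish.
            fn_service(|_stream: TcpStream| async { Ok::<_, ()>(()) })
        })?
        .run()
        .await
}
```

With this configuration each of the four workers gets its own blocking thread pool capped at two threads, since the pool is created per worker rather than shared.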