diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 53b60d1c..a564cfb0 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -2,8 +2,11 @@ ## Unreleased - 2021-xx-xx * Added ServerBuilder::on_stop to run an async closure before Server shutdown [#230] +* Remove `config` module. `ServiceConfig`, `ServiceRuntime` public types are removed due to this change. [#349] +* Remove `ServerBuilder::configure` [#349] [#230]: https://github.com/actix/actix-net/pull/230 +[#349]: https://github.com/actix/actix-net/pull/349 ## 2.0.0-beta.5 - 2021-04-20 diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 333e7549..58471cf9 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -28,7 +28,6 @@ futures-core = { version = "0.3.7", default-features = false, features = ["alloc log = "0.4" mio = { version = "0.7.6", features = ["os-poll", "net"] } num_cpus = "1.13" -slab = "0.4" tokio = { version = "1.2", features = ["sync"] } [dev-dependencies] diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 7aaa57d0..a14842cf 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -7,18 +7,14 @@ use actix_rt::{ }; use log::{error, info}; use mio::{Interest, Poll, Token as MioToken}; -use slab::Slab; use crate::server::Server; use crate::socket::MioListener; use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; use crate::worker::{Conn, WorkerHandleAccept}; -use crate::Token; struct ServerSocketInfo { - /// Beware this is the crate token for identify socket and should not be confused - /// with `mio::Token`. - token: Token, + token: usize, lst: MioListener, @@ -62,7 +58,7 @@ impl AcceptLoop { pub(crate) fn start( &mut self, - socks: Vec<(Token, MioListener)>, + socks: Vec<(usize, MioListener)>, handles: Vec, ) { let srv = self.srv.take().expect("Can not re-use AcceptInfo"); @@ -81,7 +77,7 @@ struct Accept { srv: Server, next: usize, avail: Availability, - backpressure: bool, + paused: bool, } /// Array of u128 with every bit as marker for a worker handle's availability. @@ -95,23 +91,22 @@ impl Default for Availability { impl Availability { /// Check if any worker handle is available + #[inline(always)] fn available(&self) -> bool { self.0.iter().any(|a| *a != 0) } + /// Check if worker handle is available by index + #[inline(always)] + fn get_available(&self, idx: usize) -> bool { + let (offset, idx) = Self::offset(idx); + + self.0[offset] & (1 << idx as u128) != 0 + } + /// Set worker handle available state by index. fn set_available(&mut self, idx: usize, avail: bool) { - let (offset, idx) = if idx < 128 { - (0, idx) - } else if idx < 128 * 2 { - (1, idx - 128) - } else if idx < 128 * 3 { - (2, idx - 128 * 2) - } else if idx < 128 * 4 { - (3, idx - 128 * 3) - } else { - panic!("Max WorkerHandle count is 512") - }; + let (offset, idx) = Self::offset(idx); let off = 1 << idx as u128; if avail { @@ -128,6 +123,21 @@ impl Availability { self.set_available(handle.idx(), true); }) } + + /// Get offset and adjusted index of given worker handle index. + fn offset(idx: usize) -> (usize, usize) { + if idx < 128 { + (0, idx) + } else if idx < 128 * 2 { + (1, idx - 128) + } else if idx < 128 * 3 { + (2, idx - 128 * 2) + } else if idx < 128 * 4 { + (3, idx - 128 * 3) + } else { + panic!("Max WorkerHandle count is 512") + } + } } /// This function defines errors that are per-connection. 
Which basically @@ -147,7 +157,7 @@ impl Accept { pub(crate) fn start( poll: Poll, waker: WakerQueue, - socks: Vec<(Token, MioListener)>, + socks: Vec<(usize, MioListener)>, srv: Server, handles: Vec, ) { @@ -158,10 +168,10 @@ impl Accept { .name("actix-server accept loop".to_owned()) .spawn(move || { System::set_current(sys); - let (mut accept, sockets) = + let (mut accept, mut sockets) = Accept::new_with_sockets(poll, waker, socks, handles, srv); - accept.poll_with(sockets); + accept.poll_with(&mut sockets); }) .unwrap(); } @@ -169,26 +179,25 @@ impl Accept { fn new_with_sockets( poll: Poll, waker: WakerQueue, - socks: Vec<(Token, MioListener)>, + socks: Vec<(usize, MioListener)>, handles: Vec, srv: Server, - ) -> (Accept, Slab) { - let mut sockets = Slab::new(); - for (hnd_token, mut lst) in socks.into_iter() { - let entry = sockets.vacant_entry(); - let token = entry.key(); + ) -> (Accept, Vec) { + let sockets = socks + .into_iter() + .map(|(token, mut lst)| { + // Start listening for incoming connections + poll.registry() + .register(&mut lst, MioToken(token), Interest::READABLE) + .unwrap_or_else(|e| panic!("Can not register io: {}", e)); - // Start listening for incoming connections - poll.registry() - .register(&mut lst, MioToken(token), Interest::READABLE) - .unwrap_or_else(|e| panic!("Can not register io: {}", e)); - - entry.insert(ServerSocketInfo { - token: hnd_token, - lst, - timeout: None, - }); - } + ServerSocketInfo { + token, + lst, + timeout: None, + } + }) + .collect(); let mut avail = Availability::default(); @@ -202,19 +211,19 @@ impl Accept { srv, next: 0, avail, - backpressure: false, + paused: false, }; (accept, sockets) } - fn poll_with(&mut self, mut sockets: Slab) { + fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) { let mut events = mio::Events::with_capacity(128); loop { if let Err(e) = self.poll.poll(&mut events, None) { match e.kind() { - std::io::ErrorKind::Interrupted => continue, + io::ErrorKind::Interrupted => continue, _ => panic!("Poll error: {}", e), } } @@ -222,122 +231,146 @@ impl Accept { for event in events.iter() { let token = event.token(); match token { - // This is a loop because interests for command from previous version was - // a loop that would try to drain the command channel. It's yet unknown - // if it's necessary/good practice to actively drain the waker queue. - WAKER_TOKEN => 'waker: loop { - // take guard with every iteration so no new interest can be added - // until the current task is done. - let mut guard = self.waker.guard(); - match guard.pop_front() { - // worker notify it becomes available. we may want to recover - // from backpressure. - Some(WakerInterest::WorkerAvailable(idx)) => { - drop(guard); - self.maybe_backpressure(&mut sockets, false); - self.avail.set_available(idx, true); - } - // a new worker thread is made and it's handle would be added to Accept - Some(WakerInterest::Worker(handle)) => { - drop(guard); - // maybe we want to recover from a backpressure. 
- self.maybe_backpressure(&mut sockets, false); - self.avail.set_available(handle.idx(), true); - self.handles.push(handle); - } - // got timer interest and it's time to try register socket(s) again - Some(WakerInterest::Timer) => { - drop(guard); - self.process_timer(&mut sockets) - } - Some(WakerInterest::Pause) => { - drop(guard); - self.deregister_all(&mut sockets); - } - Some(WakerInterest::Resume) => { - drop(guard); - sockets.iter_mut().for_each(|(token, info)| { - self.register_logged(token, info); - }); - } - Some(WakerInterest::Stop) => { - return self.deregister_all(&mut sockets); - } - // waker queue is drained - None => { - // Reset the WakerQueue before break so it does not grow infinitely - WakerQueue::reset(&mut guard); - break 'waker; - } + WAKER_TOKEN => { + let exit = self.handle_waker(sockets); + if exit { + info!("Accept is stopped."); + return; } - }, + } _ => { let token = usize::from(token); - self.accept(&mut sockets, token); + self.accept(sockets, token); } } } } } - fn process_timer(&self, sockets: &mut Slab) { + fn handle_waker(&mut self, sockets: &mut [ServerSocketInfo]) -> bool { + // This is a loop because interests for command from previous version was + // a loop that would try to drain the command channel. It's yet unknown + // if it's necessary/good practice to actively drain the waker queue. + loop { + // take guard with every iteration so no new interest can be added + // until the current task is done. + let mut guard = self.waker.guard(); + match guard.pop_front() { + // worker notify it becomes available. + Some(WakerInterest::WorkerAvailable(idx)) => { + drop(guard); + + self.avail.set_available(idx, true); + + if !self.paused { + self.accept_all(sockets); + } + } + // a new worker thread is made and it's handle would be added to Accept + Some(WakerInterest::Worker(handle)) => { + drop(guard); + + self.avail.set_available(handle.idx(), true); + self.handles.push(handle); + + if !self.paused { + self.accept_all(sockets); + } + } + // got timer interest and it's time to try register socket(s) again + Some(WakerInterest::Timer) => { + drop(guard); + + self.process_timer(sockets) + } + Some(WakerInterest::Pause) => { + drop(guard); + + self.paused = true; + + self.deregister_all(sockets); + } + Some(WakerInterest::Resume) => { + drop(guard); + + self.paused = false; + + sockets.iter_mut().for_each(|info| { + self.register_logged(info); + }); + + self.accept_all(sockets); + } + Some(WakerInterest::Stop) => { + self.deregister_all(sockets); + + return true; + } + // waker queue is drained + None => { + // Reset the WakerQueue before break so it does not grow infinitely + WakerQueue::reset(&mut guard); + + return false; + } + } + } + } + + fn process_timer(&self, sockets: &mut [ServerSocketInfo]) { let now = Instant::now(); sockets .iter_mut() // Only sockets that had an associated timeout were deregistered. - .filter(|(_, info)| info.timeout.is_some()) - .for_each(|(token, info)| { + .filter(|info| info.timeout.is_some()) + .for_each(|info| { let inst = info.timeout.take().unwrap(); if now < inst { info.timeout = Some(inst); - } else if !self.backpressure { - self.register_logged(token, info); + } else if !self.paused { + self.register_logged(info); } - // Drop the timeout if server is in backpressure and socket timeout is expired. - // When server recovers from backpressure it will register all sockets without + // Drop the timeout if server is paused and socket timeout is expired. 
+ // When server recovers from pause it will register all sockets without // a timeout value so this socket register will be delayed till then. }); } #[cfg(not(target_os = "windows"))] - fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> { + fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> { + let token = MioToken(info.token); self.poll .registry() - .register(&mut info.lst, MioToken(token), Interest::READABLE) + .register(&mut info.lst, token, Interest::READABLE) } #[cfg(target_os = "windows")] - fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> { + fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> { // On windows, calling register without deregister cause an error. // See https://github.com/actix/actix-web/issues/905 // Calling reregister seems to fix the issue. + let token = MioToken(info.token); self.poll .registry() - .register(&mut info.lst, mio::Token(token), Interest::READABLE) + .register(&mut info.lst, token, Interest::READABLE) .or_else(|_| { - self.poll.registry().reregister( - &mut info.lst, - mio::Token(token), - Interest::READABLE, - ) + self.poll + .registry() + .reregister(&mut info.lst, token, Interest::READABLE) }) } - fn register_logged(&self, token: usize, info: &mut ServerSocketInfo) { - match self.register(token, info) { + fn register_logged(&self, info: &mut ServerSocketInfo) { + match self.register(info) { Ok(_) => info!("Resume accepting connections on {}", info.lst.local_addr()), Err(e) => error!("Can not register server socket {}", e), } } - fn deregister(&self, info: &mut ServerSocketInfo) -> io::Result<()> { - self.poll.registry().deregister(&mut info.lst) - } - fn deregister_logged(&self, info: &mut ServerSocketInfo) { - match self.deregister(info) { + match self.poll.registry().deregister(&mut info.lst) { Ok(_) => info!("Paused accepting connections on {}", info.lst.local_addr()), Err(e) => { error!("Can not deregister server socket {}", e) @@ -345,7 +378,7 @@ impl Accept { } } - fn deregister_all(&self, sockets: &mut Slab) { + fn deregister_all(&self, sockets: &mut [ServerSocketInfo]) { // This is a best effort implementation with following limitation: // // Every ServerSocketInfo with associate timeout will be skipped and it's timeout @@ -358,70 +391,23 @@ impl Accept { .iter_mut() // Take all timeout. // This is to prevent Accept::process_timer method re-register a socket afterwards. - .map(|(_, info)| (info.timeout.take(), info)) + .map(|info| (info.timeout.take(), info)) // Socket info with a timeout is already deregistered so skip them. .filter(|(timeout, _)| timeout.is_none()) .for_each(|(_, info)| self.deregister_logged(info)); } - fn maybe_backpressure(&mut self, sockets: &mut Slab, on: bool) { - // Only operate when server is in a different backpressure than the given flag. - if self.backpressure != on { - self.backpressure = on; - sockets - .iter_mut() - // Only operate on sockets without associated timeout. - // Sockets with it should be handled by `accept` and `process_timer` methods. - // They are already deregistered or need to be reregister in the future. - .filter(|(_, info)| info.timeout.is_none()) - .for_each(|(token, info)| { - if on { - self.deregister_logged(info); - } else { - self.register_logged(token, info); - } - }); - } - } - - fn accept_one(&mut self, sockets: &mut Slab, mut conn: Conn) { - if self.backpressure { - // send_connection would remove fault worker from handles. 
- // worst case here is conn get dropped after all handles are gone. - while let Err(c) = self.send_connection(sockets, conn) { - conn = c - } - } else { - while self.avail.available() { - let next = self.next(); - let idx = next.idx(); - if next.available() { - self.avail.set_available(idx, true); - match self.send_connection(sockets, conn) { - Ok(_) => return, - Err(c) => conn = c, - } - } else { - self.avail.set_available(idx, false); - self.set_next(); - } - } - - // Sending Conn failed due to either all workers are in error or not available. - // Enter backpressure state and try again. - self.maybe_backpressure(sockets, true); - self.accept_one(sockets, conn); - } - } - // Send connection to worker and handle error. - fn send_connection( - &mut self, - sockets: &mut Slab<ServerSocketInfo>, - conn: Conn, - ) -> Result<(), Conn> { - match self.next().send(conn) { + fn send_connection(&mut self, conn: Conn) -> Result<(), Conn> { + let next = self.next(); + match next.send(conn) { Ok(_) => { + // Increment counter of WorkerHandle. + // Set worker to unavailable when it hits max (returns false). + if !next.inc_counter() { + let idx = next.idx(); + self.avail.set_available(idx, false); + } self.set_next(); Ok(()) } @@ -432,7 +418,6 @@ impl Accept { if self.handles.is_empty() { error!("No workers"); - self.maybe_backpressure(sockets, true); // All workers are gone and Conn is nowhere to be sent. // Treat this situation as Ok and drop Conn. return Ok(()); @@ -445,19 +430,38 @@ impl Accept { } } - fn accept(&mut self, sockets: &mut Slab<ServerSocketInfo>, token: usize) { + fn accept_one(&mut self, mut conn: Conn) { loop { - let info = sockets - .get_mut(token) - .expect("ServerSocketInfo is removed from Slab"); + let next = self.next(); + let idx = next.idx(); + + if self.avail.get_available(idx) { + match self.send_connection(conn) { + Ok(_) => return, + Err(c) => conn = c, + } + } else { + self.avail.set_available(idx, false); + self.set_next(); + + if !self.avail.available() { + while let Err(c) = self.send_connection(conn) { + conn = c; + } + return; + } + } + } + } + + fn accept(&mut self, sockets: &mut [ServerSocketInfo], token: usize) { + while self.avail.available() { + let info = &mut sockets[token]; match info.lst.accept() { Ok(io) => { - let msg = Conn { - io, - token: info.token, - }; - self.accept_one(sockets, msg); + let conn = Conn { io, token }; + self.accept_one(conn); } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return, Err(ref e) if connection_error(e) => continue, @@ -485,11 +489,22 @@ impl Accept { } } + fn accept_all(&mut self, sockets: &mut [ServerSocketInfo]) { + sockets + .iter_mut() + .map(|info| info.token) + .collect::<Vec<_>>() + .into_iter() + .for_each(|idx| self.accept(sockets, idx)) + } + + #[inline(always)] fn next(&self) -> &WorkerHandleAccept { &self.handles[self.next] } /// Set next worker handle that would accept connection. 
+ #[inline(always)] fn set_next(&mut self) { self.next = (self.next + 1) % self.handles.len(); } diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index a971894b..f0c2db51 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -9,31 +9,29 @@ use std::{ use actix_rt::{net::TcpStream, time::sleep, System}; use futures_core::future::BoxFuture; use log::{error, info}; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; -use tokio::sync::oneshot; +use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedReceiver}, + oneshot, +}; use crate::accept::AcceptLoop; -use crate::config::{ConfiguredService, ServiceConfig}; +use crate::join_all; use crate::server::{Server, ServerCommand}; use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; use crate::signals::{Signal, Signals}; use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; use crate::socket::{MioTcpListener, MioTcpSocket}; use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::worker::{ - ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandleAccept, - WorkerHandleServer, -}; -use crate::{join_all, Token}; +use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer}; /// Server builder pub struct ServerBuilder { threads: usize, - token: Token, + token: usize, backlog: u32, handles: Vec<(usize, WorkerHandleServer)>, services: Vec>, - sockets: Vec<(Token, String, MioListener)>, + sockets: Vec<(usize, String, MioListener)>, accept: AcceptLoop, exit: bool, no_signals: bool, @@ -58,7 +56,7 @@ impl ServerBuilder { ServerBuilder { threads: num_cpus::get(), - token: Token::default(), + token: 0, handles: Vec::new(), services: Vec::new(), sockets: Vec::new(), @@ -152,32 +150,6 @@ impl ServerBuilder { self } - /// Execute external configuration as part of the server building process. - /// - /// This function is useful for moving parts of configuration to a different module or - /// even library. - pub fn configure(mut self, f: F) -> io::Result - where - F: Fn(&mut ServiceConfig) -> io::Result<()>, - { - let mut cfg = ServiceConfig::new(self.threads, self.backlog); - - f(&mut cfg)?; - - if let Some(apply) = cfg.apply { - let mut srv = ConfiguredService::new(apply); - for (name, lst) in cfg.services { - let token = self.token.next(); - srv.stream(token, name.clone(), lst.local_addr()?); - self.sockets.push((token, name, MioListener::Tcp(lst))); - } - self.services.push(Box::new(srv)); - } - self.threads = cfg.threads; - - Ok(self) - } - /// Add new service to the server. 
pub fn bind>(mut self, name: N, addr: U, factory: F) -> io::Result where @@ -187,7 +159,7 @@ impl ServerBuilder { let sockets = bind_addr(addr, self.backlog)?; for lst in sockets { - let token = self.token.next(); + let token = self.next_token(); self.services.push(StreamNewService::create( name.as_ref().to_string(), token, @@ -236,7 +208,7 @@ impl ServerBuilder { { use std::net::{IpAddr, Ipv4Addr}; lst.set_nonblocking(true)?; - let token = self.token.next(); + let token = self.next_token(); let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); self.services.push(StreamNewService::create( name.as_ref().to_string(), @@ -262,7 +234,7 @@ impl ServerBuilder { lst.set_nonblocking(true)?; let addr = lst.local_addr()?; - let token = self.token.next(); + let token = self.next_token(); self.services.push(StreamNewService::create( name.as_ref().to_string(), token, @@ -338,12 +310,11 @@ impl ServerBuilder { fn start_worker( &self, idx: usize, - waker: WakerQueue, + waker_queue: WakerQueue, ) -> (WorkerHandleAccept, WorkerHandleServer) { - let avail = WorkerAvailability::new(idx, waker); let services = self.services.iter().map(|v| v.clone_factory()).collect(); - ServerWorker::start(idx, services, avail, self.worker_config) + ServerWorker::start(idx, services, waker_queue, self.worker_config) } fn handle_cmd(&mut self, item: ServerCommand) { @@ -465,6 +436,12 @@ impl ServerBuilder { } } } + + fn next_token(&mut self) -> usize { + let token = self.token; + self.token += 1; + token + } } impl Future for ServerBuilder { diff --git a/actix-server/src/config.rs b/actix-server/src/config.rs deleted file mode 100644 index c5e63630..00000000 --- a/actix-server/src/config.rs +++ /dev/null @@ -1,287 +0,0 @@ -use std::collections::HashMap; -use std::future::Future; -use std::{fmt, io}; - -use actix_rt::net::TcpStream; -use actix_service::{ - fn_service, IntoServiceFactory as IntoBaseServiceFactory, - ServiceFactory as BaseServiceFactory, -}; -use actix_utils::{counter::CounterGuard, future::ready}; -use futures_core::future::LocalBoxFuture; -use log::error; - -use crate::builder::bind_addr; -use crate::service::{BoxedServerService, InternalServiceFactory, StreamService}; -use crate::socket::{MioStream, MioTcpListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; -use crate::Token; - -pub struct ServiceConfig { - pub(crate) services: Vec<(String, MioTcpListener)>, - pub(crate) apply: Option>, - pub(crate) threads: usize, - pub(crate) backlog: u32, -} - -impl ServiceConfig { - pub(super) fn new(threads: usize, backlog: u32) -> ServiceConfig { - ServiceConfig { - threads, - backlog, - services: Vec::new(), - apply: None, - } - } - - /// Set number of workers to start. - /// - /// By default server uses number of available logical cpu as workers - /// count. - pub fn workers(&mut self, num: usize) { - self.threads = num; - } - - /// Add new service to server - pub fn bind>(&mut self, name: N, addr: U) -> io::Result<&mut Self> - where - U: ToSocketAddrs, - { - let sockets = bind_addr(addr, self.backlog)?; - - for lst in sockets { - self._listen(name.as_ref(), lst); - } - - Ok(self) - } - - /// Add new service to server - pub fn listen>(&mut self, name: N, lst: StdTcpListener) -> &mut Self { - self._listen(name, MioTcpListener::from_std(lst)) - } - - /// Register service configuration function. This function get called - /// during worker runtime configuration. It get executed in worker thread. 
- pub fn apply(&mut self, f: F) -> io::Result<()> - where - F: Fn(&mut ServiceRuntime) + Send + Clone + 'static, - { - self.apply = Some(Box::new(f)); - Ok(()) - } - - fn _listen>(&mut self, name: N, lst: MioTcpListener) -> &mut Self { - if self.apply.is_none() { - self.apply = Some(Box::new(not_configured)); - } - self.services.push((name.as_ref().to_string(), lst)); - self - } -} - -pub(super) struct ConfiguredService { - rt: Box, - names: HashMap, - topics: HashMap, - services: Vec, -} - -impl ConfiguredService { - pub(super) fn new(rt: Box) -> Self { - ConfiguredService { - rt, - names: HashMap::new(), - topics: HashMap::new(), - services: Vec::new(), - } - } - - pub(super) fn stream(&mut self, token: Token, name: String, addr: StdSocketAddr) { - self.names.insert(token, (name.clone(), addr)); - self.topics.insert(name, token); - self.services.push(token); - } -} - -impl InternalServiceFactory for ConfiguredService { - fn name(&self, token: Token) -> &str { - &self.names[&token].0 - } - - fn clone_factory(&self) -> Box { - Box::new(Self { - rt: self.rt.clone(), - names: self.names.clone(), - topics: self.topics.clone(), - services: self.services.clone(), - }) - } - - fn create(&self) -> LocalBoxFuture<'static, Result, ()>> { - // configure services - let mut rt = ServiceRuntime::new(self.topics.clone()); - self.rt.configure(&mut rt); - rt.validate(); - let mut names = self.names.clone(); - let tokens = self.services.clone(); - - // construct services - Box::pin(async move { - let mut services = rt.services; - // TODO: Proper error handling here - for f in rt.onstart.into_iter() { - f.await; - } - let mut res = vec![]; - for token in tokens { - if let Some(srv) = services.remove(&token) { - let newserv = srv.new_service(()); - match newserv.await { - Ok(serv) => { - res.push((token, serv)); - } - Err(_) => { - error!("Can not construct service"); - return Err(()); - } - } - } else { - let name = names.remove(&token).unwrap().0; - res.push(( - token, - Box::new(StreamService::new(fn_service(move |_: TcpStream| { - error!("Service {:?} is not configured", name); - ready::>(Ok(())) - }))), - )); - }; - } - Ok(res) - }) - } -} - -pub(super) trait ServiceRuntimeConfiguration: Send { - fn clone(&self) -> Box; - - fn configure(&self, rt: &mut ServiceRuntime); -} - -impl ServiceRuntimeConfiguration for F -where - F: Fn(&mut ServiceRuntime) + Send + Clone + 'static, -{ - fn clone(&self) -> Box { - Box::new(self.clone()) - } - - fn configure(&self, rt: &mut ServiceRuntime) { - (self)(rt) - } -} - -fn not_configured(_: &mut ServiceRuntime) { - error!("Service is not configured"); -} - -pub struct ServiceRuntime { - names: HashMap, - services: HashMap, - onstart: Vec>, -} - -impl ServiceRuntime { - fn new(names: HashMap) -> Self { - ServiceRuntime { - names, - services: HashMap::new(), - onstart: Vec::new(), - } - } - - fn validate(&self) { - for (name, token) in &self.names { - if !self.services.contains_key(&token) { - error!("Service {:?} is not configured", name); - } - } - } - - /// Register service. - /// - /// Name of the service must be registered during configuration stage with - /// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods. 
- pub fn service(&mut self, name: &str, service: F) - where - F: IntoBaseServiceFactory, - T: BaseServiceFactory + 'static, - T::Future: 'static, - T::Service: 'static, - T::InitError: fmt::Debug, - { - // let name = name.to_owned(); - if let Some(token) = self.names.get(name) { - self.services.insert( - *token, - Box::new(ServiceFactory { - inner: service.into_factory(), - }), - ); - } else { - panic!("Unknown service: {:?}", name); - } - } - - /// Execute future before services initialization. - pub fn on_start(&mut self, fut: F) - where - F: Future + 'static, - { - self.onstart.push(Box::pin(fut)) - } -} - -type BoxedNewService = Box< - dyn BaseServiceFactory< - (CounterGuard, MioStream), - Response = (), - Error = (), - InitError = (), - Config = (), - Service = BoxedServerService, - Future = LocalBoxFuture<'static, Result>, - >, ->; - -struct ServiceFactory { - inner: T, -} - -impl BaseServiceFactory<(CounterGuard, MioStream)> for ServiceFactory -where - T: BaseServiceFactory, - T::Future: 'static, - T::Service: 'static, - T::Error: 'static, - T::InitError: fmt::Debug + 'static, -{ - type Response = (); - type Error = (); - type Config = (); - type Service = BoxedServerService; - type InitError = (); - type Future = LocalBoxFuture<'static, Result>; - - fn new_service(&self, _: ()) -> Self::Future { - let fut = self.inner.new_service(()); - Box::pin(async move { - match fut.await { - Ok(s) => Ok(Box::new(StreamService::new(s)) as BoxedServerService), - Err(e) => { - error!("Can not construct service: {:?}", e); - Err(()) - } - } - }) - } -} diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index af9ab0b0..b2117191 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -6,7 +6,6 @@ mod accept; mod builder; -mod config; mod server; mod service; mod signals; @@ -16,7 +15,6 @@ mod waker_queue; mod worker; pub use self::builder::ServerBuilder; -pub use self::config::{ServiceConfig, ServiceRuntime}; pub use self::server::Server; pub use self::service::ServiceFactory; pub use self::test_server::TestServer; @@ -28,28 +26,6 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -/// Socket ID token -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] -pub(crate) struct Token(usize); - -impl Default for Token { - fn default() -> Self { - Self::new() - } -} - -impl Token { - fn new() -> Self { - Self(0) - } - - pub(crate) fn next(&mut self) -> Token { - let token = Token(self.0); - self.0 += 1; - token - } -} - /// Start server building process pub fn new() -> ServerBuilder { ServerBuilder::default() diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index da57af67..28ffb4f1 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -3,15 +3,12 @@ use std::net::SocketAddr; use std::task::{Context, Poll}; use actix_service::{Service, ServiceFactory as BaseServiceFactory}; -use actix_utils::{ - counter::CounterGuard, - future::{ready, Ready}, -}; +use actix_utils::future::{ready, Ready}; use futures_core::future::LocalBoxFuture; use log::error; use crate::socket::{FromStream, MioStream}; -use crate::Token; +use crate::worker::WorkerCounterGuard; pub trait ServiceFactory: Send + Clone + 'static { type Factory: BaseServiceFactory; @@ -20,16 +17,16 @@ pub trait ServiceFactory: Send + Clone + 'static { } pub(crate) trait InternalServiceFactory: Send { - fn name(&self, token: Token) -> &str; + fn name(&self, token: usize) -> &str; fn clone_factory(&self) -> Box; - fn create(&self) -> LocalBoxFuture<'static, Result, 
()>>; + fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>; } pub(crate) type BoxedServerService = Box< dyn Service< - (CounterGuard, MioStream), + (WorkerCounterGuard, MioStream), Response = (), Error = (), Future = Ready>, @@ -50,7 +47,7 @@ impl StreamService { } } -impl Service<(CounterGuard, MioStream)> for StreamService +impl Service<(WorkerCounterGuard, MioStream)> for StreamService where S: Service, S::Future: 'static, @@ -65,7 +62,7 @@ where self.service.poll_ready(ctx).map_err(|_| ()) } - fn call(&self, (guard, req): (CounterGuard, MioStream)) -> Self::Future { + fn call(&self, (guard, req): (WorkerCounterGuard, MioStream)) -> Self::Future { ready(match FromStream::from_mio(req) { Ok(stream) => { let f = self.service.call(stream); @@ -86,7 +83,7 @@ where pub(crate) struct StreamNewService, Io: FromStream> { name: String, inner: F, - token: Token, + token: usize, addr: SocketAddr, _t: PhantomData, } @@ -98,7 +95,7 @@ where { pub(crate) fn create( name: String, - token: Token, + token: usize, inner: F, addr: SocketAddr, ) -> Box { @@ -117,7 +114,7 @@ where F: ServiceFactory, Io: FromStream + Send + 'static, { - fn name(&self, _: Token) -> &str { + fn name(&self, _: usize) -> &str { &self.name } @@ -131,14 +128,14 @@ where }) } - fn create(&self) -> LocalBoxFuture<'static, Result, ()>> { + fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>> { let token = self.token; let fut = self.inner.create().new_service(()); Box::pin(async move { match fut.await { Ok(inner) => { let service = Box::new(StreamService::new(inner)) as _; - Ok(vec![(token, service)]) + Ok((token, service)) } Err(_) => Err(()), } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 7bc211b1..79f15b16 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -2,8 +2,9 @@ use std::{ future::Future, mem, pin::Pin, + rc::Rc, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicUsize, Ordering}, Arc, }, task::{Context, Poll}, @@ -15,7 +16,6 @@ use actix_rt::{ time::{sleep, Instant, Sleep}, Arbiter, }; -use actix_utils::counter::Counter; use futures_core::{future::LocalBoxFuture, ready}; use log::{error, info, trace}; use tokio::sync::{ @@ -23,10 +23,10 @@ use tokio::sync::{ oneshot, }; +use crate::join_all; use crate::service::{BoxedServerService, InternalServiceFactory}; use crate::socket::MioStream; use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::{join_all, Token}; /// Stop worker message. Returns `true` on successful graceful shutdown. /// and `false` if some connections still alive when shutdown execute. @@ -38,35 +38,131 @@ pub(crate) struct Stop { #[derive(Debug)] pub(crate) struct Conn { pub io: MioStream, - pub token: Token, + pub token: usize, } fn handle_pair( idx: usize, tx1: UnboundedSender, tx2: UnboundedSender, - avail: WorkerAvailability, + counter: Counter, ) -> (WorkerHandleAccept, WorkerHandleServer) { - let accept = WorkerHandleAccept { tx: tx1, avail }; + let accept = WorkerHandleAccept { + idx, + tx: tx1, + counter, + }; let server = WorkerHandleServer { idx, tx: tx2 }; (accept, server) } +/// counter: Arc field is owned by `Accept` thread and `ServerWorker` thread. +/// +/// `Accept` would increment the counter and `ServerWorker` would decrement it. +/// +/// # Atomic Ordering: +/// +/// `Accept` always look into it's cached `Availability` field for `ServerWorker` state. +/// It lazily increment counter after successful dispatching new work to `ServerWorker`. 
+/// On reaching the counter limit, `Accept` updates its cached `Availability` and marks the worker as +/// unable to accept any work. +/// +/// `ServerWorker` always decrements the counter when each piece of work received from `Accept` is done. +/// On reaching the counter limit the worker uses `mio::Waker` and `WakerQueue` to wake up `Accept` +/// and notify it to update its cached `Availability` again, marking the worker as able to accept work again. +/// +/// Hence a wake-up only happens after `Accept` has incremented the counter to the limit, +/// and a decrement from the limit always wakes up `Accept`. +#[derive(Clone)] +pub(crate) struct Counter { + counter: Arc<AtomicUsize>, + limit: usize, +} + +impl Counter { + pub(crate) fn new(limit: usize) -> Self { + Self { + counter: Arc::new(AtomicUsize::new(1)), + limit, + } + } + + /// Increment counter by 1 and return `false` when the limit has been reached. + #[inline(always)] + pub(crate) fn inc(&self) -> bool { + self.counter.fetch_add(1, Ordering::Relaxed) != self.limit + } + + /// Decrement counter by 1 and return `true` when dropping back below the limit. + #[inline(always)] + pub(crate) fn dec(&self) -> bool { + self.counter.fetch_sub(1, Ordering::Relaxed) == self.limit + } + + pub(crate) fn total(&self) -> usize { + self.counter.load(Ordering::SeqCst) - 1 + } +} + +pub(crate) struct WorkerCounter { + idx: usize, + inner: Rc<(WakerQueue, Counter)>, +} + +impl Clone for WorkerCounter { + fn clone(&self) -> Self { + Self { + idx: self.idx, + inner: self.inner.clone(), + } + } +} + +impl WorkerCounter { + pub(crate) fn new(idx: usize, waker_queue: WakerQueue, counter: Counter) -> Self { + Self { + idx, + inner: Rc::new((waker_queue, counter)), + } + } + + #[inline(always)] + pub(crate) fn guard(&self) -> WorkerCounterGuard { + WorkerCounterGuard(self.clone()) + } + + fn total(&self) -> usize { + self.inner.1.total() + } +} + +pub(crate) struct WorkerCounterGuard(WorkerCounter); + +impl Drop for WorkerCounterGuard { + fn drop(&mut self) { + let (waker_queue, counter) = &*self.0.inner; + if counter.dec() { + waker_queue.wake(WakerInterest::WorkerAvailable(self.0.idx)); + } + } +} + /// Handle to worker that can send connection message to worker and share the /// availability of worker to other thread. /// /// Held by [Accept](crate::accept::Accept). pub(crate) struct WorkerHandleAccept { + idx: usize, tx: UnboundedSender<Conn>, - avail: WorkerAvailability, + counter: Counter, } impl WorkerHandleAccept { #[inline(always)] pub(crate) fn idx(&self) -> usize { - self.avail.idx + self.idx } #[inline(always)] @@ -75,8 +171,8 @@ impl WorkerHandleAccept { } #[inline(always)] - pub(crate) fn available(&self) -> bool { - self.avail.available() + pub(crate) fn inc_counter(&self) -> bool { + self.counter.inc() } } @@ -96,40 +192,6 @@ impl WorkerHandleServer { } } -#[derive(Clone)] -pub(crate) struct WorkerAvailability { - idx: usize, - waker: WakerQueue, - available: Arc<AtomicBool>, -} - -impl WorkerAvailability { - pub fn new(idx: usize, waker: WakerQueue) -> Self { - WorkerAvailability { - idx, - waker, - available: Arc::new(AtomicBool::new(false)), - } - } - - #[inline(always)] - pub fn available(&self) -> bool { - self.available.load(Ordering::Acquire) - } - - pub fn set(&self, val: bool) { - // Ordering: - // - // There could be multiple set calls happen in one ::poll. - // Order is important between them. - let old = self.available.swap(val, Ordering::AcqRel); - // Notify the accept on switched to available. - if !old && val { - self.waker.wake(WakerInterest::WorkerAvailable(self.idx)); - } - } -} - /// Service worker. 
/// /// Worker accepts Socket objects via unbounded channel and starts stream processing. @@ -138,9 +200,8 @@ pub(crate) struct ServerWorker { // It must be dropped as soon as ServerWorker dropping. rx: UnboundedReceiver, rx2: UnboundedReceiver, + counter: WorkerCounter, services: Box<[WorkerService]>, - availability: WorkerAvailability, - conns: Counter, factories: Box<[Box]>, state: WorkerState, shutdown_timeout: Duration, @@ -207,15 +268,15 @@ impl ServerWorker { pub(crate) fn start( idx: usize, factories: Vec>, - availability: WorkerAvailability, + waker_queue: WakerQueue, config: ServerWorkerConfig, ) -> (WorkerHandleAccept, WorkerHandleServer) { - assert!(!availability.available()); - let (tx1, rx) = unbounded_channel(); let (tx2, rx2) = unbounded_channel(); - let avail = availability.clone(); + let counter = Counter::new(config.max_concurrent_connections); + + let counter_clone = counter.clone(); // every worker runs in it's own arbiter. // use a custom tokio runtime builder to change the settings of runtime. Arbiter::with_tokio_rt(move || { @@ -231,11 +292,7 @@ impl ServerWorker { .enumerate() .map(|(idx, factory)| { let fut = factory.create(); - async move { - fut.await.map(|r| { - r.into_iter().map(|(t, s)| (idx, t, s)).collect::>() - }) - } + async move { fut.await.map(|(t, s)| (idx, t, s)) } }) .collect::>(); @@ -248,9 +305,8 @@ impl ServerWorker { let services = match res { Ok(res) => res .into_iter() - .flatten() .fold(Vec::new(), |mut services, (factory, token, service)| { - assert_eq!(token.0, services.len()); + assert_eq!(token, services.len()); services.push(WorkerService { factory, service, @@ -271,8 +327,7 @@ impl ServerWorker { rx, rx2, services, - availability, - conns: Counter::new(config.max_concurrent_connections), + counter: WorkerCounter::new(idx, waker_queue, counter_clone), factories: factories.into_boxed_slice(), state: Default::default(), shutdown_timeout: config.shutdown_timeout, @@ -280,16 +335,16 @@ impl ServerWorker { }); }); - handle_pair(idx, tx1, tx2, avail) + handle_pair(idx, tx1, tx2, counter) } - fn restart_service(&mut self, token: Token, factory_id: usize) { + fn restart_service(&mut self, idx: usize, factory_id: usize) { let factory = &self.factories[factory_id]; - trace!("Service {:?} failed, restarting", factory.name(token)); - self.services[token.0].status = WorkerServiceStatus::Restarting; + trace!("Service {:?} failed, restarting", factory.name(idx)); + self.services[idx].status = WorkerServiceStatus::Restarting; self.state = WorkerState::Restarting(Restart { factory_id, - token, + token: idx, fut: factory.create(), }); } @@ -307,8 +362,8 @@ impl ServerWorker { }); } - fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result { - let mut ready = self.conns.available(cx); + fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result { + let mut ready = true; for (idx, srv) in self.services.iter_mut().enumerate() { if srv.status == WorkerServiceStatus::Available || srv.status == WorkerServiceStatus::Unavailable @@ -318,7 +373,7 @@ impl ServerWorker { if srv.status == WorkerServiceStatus::Unavailable { trace!( "Service {:?} is available", - self.factories[srv.factory].name(Token(idx)) + self.factories[srv.factory].name(idx) ); srv.status = WorkerServiceStatus::Available; } @@ -329,7 +384,7 @@ impl ServerWorker { if srv.status == WorkerServiceStatus::Available { trace!( "Service {:?} is unavailable", - self.factories[srv.factory].name(Token(idx)) + self.factories[srv.factory].name(idx) ); srv.status = WorkerServiceStatus::Unavailable; } @@ 
-337,10 +392,10 @@ impl ServerWorker { Poll::Ready(Err(_)) => { error!( "Service {:?} readiness check returned error, restarting", - self.factories[srv.factory].name(Token(idx)) + self.factories[srv.factory].name(idx) ); srv.status = WorkerServiceStatus::Failed; - return Err((Token(idx), srv.factory)); + return Err((idx, srv.factory)); } } } @@ -359,8 +414,8 @@ enum WorkerState { struct Restart { factory_id: usize, - token: Token, - fut: LocalBoxFuture<'static, Result, ()>>, + token: usize, + fut: LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>, } // Shutdown keep states necessary for server shutdown: @@ -381,10 +436,6 @@ impl Default for WorkerState { impl Drop for ServerWorker { fn drop(&mut self) { - // Set availability to true so if accept try to send connection to this worker - // it would find worker is gone and remove it. - // This is helpful when worker is dropped unexpected. - self.availability.set(true); // Stop the Arbiter ServerWorker runs on on drop. Arbiter::current().stop(); } @@ -399,8 +450,7 @@ impl Future for ServerWorker { // `StopWorker` message handler if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx) { - this.availability.set(false); - let num = this.conns.total(); + let num = this.counter.total(); if num == 0 { info!("Shutting down worker, 0 connections"); let _ = tx.send(true); @@ -427,7 +477,6 @@ impl Future for ServerWorker { WorkerState::Unavailable => match this.check_readiness(cx) { Ok(true) => { this.state = WorkerState::Available; - this.availability.set(true); self.poll(cx) } Ok(false) => Poll::Pending, @@ -440,26 +489,22 @@ impl Future for ServerWorker { let factory_id = restart.factory_id; let token = restart.token; - let service = ready!(restart.fut.as_mut().poll(cx)) + let (token_new, service) = ready!(restart.fut.as_mut().poll(cx)) .unwrap_or_else(|_| { panic!( "Can not restart {:?} service", this.factories[factory_id].name(token) ) - }) - .into_iter() - // Find the same token from vector. There should be only one - // So the first match would be enough. - .find(|(t, _)| *t == token) - .map(|(_, service)| service) - .expect("No BoxedServerService found"); + }); + + assert_eq!(token, token_new); trace!( "Service {:?} has been restarted", this.factories[factory_id].name(token) ); - this.services[token.0].created(service); + this.services[token].created(service); this.state = WorkerState::Unavailable; self.poll(cx) @@ -468,7 +513,7 @@ impl Future for ServerWorker { // Wait for 1 second. ready!(shutdown.timer.as_mut().poll(cx)); - if this.conns.total() == 0 { + if this.counter.total() == 0 { // Graceful shutdown. 
if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) { let _ = shutdown.tx.send(true); @@ -493,22 +538,20 @@ impl Future for ServerWorker { Ok(true) => {} Ok(false) => { trace!("Worker is unavailable"); - this.availability.set(false); this.state = WorkerState::Unavailable; return self.poll(cx); } Err((token, idx)) => { this.restart_service(token, idx); - this.availability.set(false); return self.poll(cx); } } + // handle incoming io stream match ready!(Pin::new(&mut this.rx).poll_recv(cx)) { - // handle incoming io stream Some(msg) => { - let guard = this.conns.get(); - let _ = this.services[msg.token.0].service.call((guard, msg.io)); + let guard = this.counter.guard(); + let _ = this.services[msg.token].service.call((guard, msg.io)); } None => return Poll::Ready(()), }; diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 09dbf676..31997676 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -142,57 +142,6 @@ fn test_start() { let _ = h.join(); } -#[test] -fn test_configure() { - let addr1 = unused_addr(); - let addr2 = unused_addr(); - let addr3 = unused_addr(); - let (tx, rx) = mpsc::channel(); - let num = Arc::new(AtomicUsize::new(0)); - let num2 = num.clone(); - - let h = thread::spawn(move || { - let num = num2.clone(); - let sys = actix_rt::System::new(); - let srv = sys.block_on(lazy(|_| { - Server::build() - .disable_signals() - .configure(move |cfg| { - let num = num.clone(); - let lst = net::TcpListener::bind(addr3).unwrap(); - cfg.bind("addr1", addr1) - .unwrap() - .bind("addr2", addr2) - .unwrap() - .listen("addr3", lst) - .apply(move |rt| { - let num = num.clone(); - rt.service("addr1", fn_service(|_| ok::<_, ()>(()))); - rt.service("addr3", fn_service(|_| ok::<_, ()>(()))); - rt.on_start(lazy(move |_| { - let _ = num.fetch_add(1, Ordering::Relaxed); - })) - }) - }) - .unwrap() - .workers(1) - .run() - })); - - let _ = tx.send((srv, actix_rt::System::current())); - let _ = sys.run(); - }); - let (_, sys) = rx.recv().unwrap(); - thread::sleep(Duration::from_millis(500)); - - assert!(net::TcpStream::connect(addr1).is_ok()); - assert!(net::TcpStream::connect(addr2).is_ok()); - assert!(net::TcpStream::connect(addr3).is_ok()); - assert_eq!(num.load(Ordering::Relaxed), 1); - sys.stop(); - let _ = h.join(); -} - #[actix_rt::test] async fn test_max_concurrent_connections() { // Note: @@ -305,81 +254,6 @@ async fn test_service_restart() { let num_clone = num.clone(); let num2_clone = num2.clone(); - let h = thread::spawn(move || { - actix_rt::System::new().block_on(async { - let server = Server::build() - .backlog(1) - .disable_signals() - .configure(move |cfg| { - let num = num.clone(); - let num2 = num2.clone(); - cfg.bind("addr1", addr1) - .unwrap() - .bind("addr2", addr2) - .unwrap() - .apply(move |rt| { - let num = num.clone(); - let num2 = num2.clone(); - rt.service( - "addr1", - fn_factory(move || { - let num = num.clone(); - async move { Ok::<_, ()>(TestService(num)) } - }), - ); - rt.service( - "addr2", - fn_factory(move || { - let num2 = num2.clone(); - async move { Ok::<_, ()>(TestService(num2)) } - }), - ); - }) - }) - .unwrap() - .workers(1) - .run(); - - let _ = tx.send((server.clone(), actix_rt::System::current())); - server.await - }) - }); - - let (server, sys) = rx.recv().unwrap(); - - for _ in 0..5 { - TcpStream::connect(addr1) - .await - .unwrap() - .shutdown() - .await - .unwrap(); - TcpStream::connect(addr2) - .await - .unwrap() - .shutdown() - .await - .unwrap(); - } - - 
sleep(Duration::from_secs(3)).await; - - assert!(num_clone.load(Ordering::SeqCst) > 5); - assert!(num2_clone.load(Ordering::SeqCst) > 5); - - sys.stop(); - let _ = server.stop(false); - let _ = h.join().unwrap(); - - let addr1 = unused_addr(); - let addr2 = unused_addr(); - let (tx, rx) = mpsc::channel(); - let num = Arc::new(AtomicUsize::new(0)); - let num2 = Arc::new(AtomicUsize::new(0)); - - let num_clone = num.clone(); - let num2_clone = num2.clone(); - let h = thread::spawn(move || { let num = num.clone(); actix_rt::System::new().block_on(async {