diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 72651993..4bff3b5c 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -27,7 +27,6 @@ actix-utils = "3.0.0" futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } log = "0.4" -mio = { version = "0.7.6", features = ["os-poll", "net"] } num_cpus = "1.13" tokio = { version = "1.2", features = ["sync"] } diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index ab0ae707..f48e6ee9 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -2,165 +2,45 @@ use std::time::Duration; use std::{io, thread}; use actix_rt::{ - time::{sleep, Instant}, + time::{sleep, Instant, Sleep}, System, }; -use log::{error, info}; -use mio::{Interest, Poll, Token as MioToken}; +use log::error; +use tokio::sync::mpsc::UnboundedReceiver; use crate::server::Server; -use crate::socket::{MioListener, SocketAddr}; -use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; +use crate::socket::Listener; use crate::worker::{Conn, WorkerHandleAccept}; struct ServerSocketInfo { - /// Address of socket. Mainly used for logging. - addr: SocketAddr, - token: usize, - - lst: MioListener, - - /// Timeout is used to mark the deadline when this socket's listener should be registered again - /// after an error. - timeout: Option, -} - -/// Accept loop would live with `ServerBuilder`. -/// -/// It's tasked with construct `Poll` instance and `WakerQueue` which would be distributed to -/// `Accept` and `Worker`. -/// -/// It would also listen to `ServerCommand` and push interests to `WakerQueue`. -pub(crate) struct AcceptLoop { - srv: Option, - poll: Option, - waker: WakerQueue, -} - -impl AcceptLoop { - pub fn new(srv: Server) -> Self { - let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e)); - let waker = WakerQueue::new(poll.registry()) - .unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e)); - - Self { - srv: Some(srv), - poll: Some(poll), - waker, - } - } - - pub(crate) fn waker_owned(&self) -> WakerQueue { - self.waker.clone() - } - - pub fn wake(&self, i: WakerInterest) { - self.waker.wake(i); - } - - pub(crate) fn start( - &mut self, - socks: Vec<(usize, MioListener)>, - handles: Vec, - ) { - let srv = self.srv.take().expect("Can not re-use AcceptInfo"); - let poll = self.poll.take().unwrap(); - let waker = self.waker.clone(); - - Accept::start(poll, waker, socks, srv, handles); - } + lst: Listener, } /// poll instance of the server. -struct Accept { - poll: Poll, - waker: WakerQueue, +pub(crate) struct Accept { handles: Vec, + sockets: Box<[ServerSocketInfo]>, + rx: UnboundedReceiver, srv: Server, next: usize, avail: Availability, paused: bool, + timeout: Pin>, } -/// Array of u128 with every bit as marker for a worker handle's availability. -struct Availability([u128; 4]); - -impl Default for Availability { - fn default() -> Self { - Self([0; 4]) - } -} - -impl Availability { - /// Check if any worker handle is available - #[inline(always)] - fn available(&self) -> bool { - self.0.iter().any(|a| *a != 0) - } - - /// Check if worker handle is available by index - #[inline(always)] - fn get_available(&self, idx: usize) -> bool { - let (offset, idx) = Self::offset(idx); - - self.0[offset] & (1 << idx as u128) != 0 - } - - /// Set worker handle available state by index. - fn set_available(&mut self, idx: usize, avail: bool) { - let (offset, idx) = Self::offset(idx); - - let off = 1 << idx as u128; - if avail { - self.0[offset] |= off; - } else { - self.0[offset] &= !off - } - } - - /// Set all worker handle to available state. - /// This would result in a re-check on all workers' availability. - fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) { - handles.iter().for_each(|handle| { - self.set_available(handle.idx(), true); - }) - } - - /// Get offset and adjusted index of given worker handle index. - fn offset(idx: usize) -> (usize, usize) { - if idx < 128 { - (0, idx) - } else if idx < 128 * 2 { - (1, idx - 128) - } else if idx < 128 * 3 { - (2, idx - 128 * 2) - } else if idx < 128 * 4 { - (3, idx - 128 * 3) - } else { - panic!("Max WorkerHandle count is 512") - } - } -} - -/// This function defines errors that are per-connection. Which basically -/// means that if we get this error from `accept()` system call it means -/// next connection might be ready to be accepted. -/// -/// All other errors will incur a timeout before next `accept()` is performed. -/// The timeout is useful to handle resource exhaustion errors like ENFILE -/// and EMFILE. Otherwise, could enter into tight loop. -fn connection_error(e: &io::Error) -> bool { - e.kind() == io::ErrorKind::ConnectionRefused - || e.kind() == io::ErrorKind::ConnectionAborted - || e.kind() == io::ErrorKind::ConnectionReset +pub(crate) enum Interest { + Pause, + Resume, + Stop, + WorkerIndex(usize), + Worker(WorkerHandleAccept), } impl Accept { pub(crate) fn start( - poll: Poll, - waker: WakerQueue, - socks: Vec<(usize, MioListener)>, + socks: Vec<(usize, Listener)>, + rx: UnboundedReceiver, srv: Server, handles: Vec, ) { @@ -171,38 +51,28 @@ impl Accept { .name("actix-server accept loop".to_owned()) .spawn(move || { System::set_current(sys); - let (mut accept, mut sockets) = - Accept::new_with_sockets(poll, waker, socks, handles, srv); - accept.poll_with(&mut sockets); + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + let accept = Self::new(socks, rx, srv, handles); + accept.await + }); }) .unwrap(); } - fn new_with_sockets( - poll: Poll, - waker: WakerQueue, - socks: Vec<(usize, MioListener)>, - handles: Vec, + fn new( + socks: Vec<(usize, Listener)>, + rx: UnboundedReceiver, srv: Server, - ) -> (Accept, Vec) { + handles: Vec, + ) -> Self { let sockets = socks .into_iter() - .map(|(token, mut lst)| { - let addr = lst.local_addr(); - - // Start listening for incoming connections - poll.registry() - .register(&mut lst, MioToken(token), Interest::READABLE) - .unwrap_or_else(|e| panic!("Can not register io: {}", e)); - - ServerSocketInfo { - addr, - token, - lst, - timeout: None, - } - }) + .map(|(token, lst)| ServerSocketInfo { token, lst }) .collect(); let mut avail = Availability::default(); @@ -210,199 +80,18 @@ impl Accept { // Assume all handles are avail at construct time. avail.set_available_all(&handles); - let accept = Accept { - poll, - waker, + Accept { handles, + sockets, + rx, srv, next: 0, avail, paused: false, - }; - - (accept, sockets) - } - - fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) { - let mut events = mio::Events::with_capacity(128); - - loop { - if let Err(e) = self.poll.poll(&mut events, None) { - match e.kind() { - io::ErrorKind::Interrupted => continue, - _ => panic!("Poll error: {}", e), - } - } - - for event in events.iter() { - let token = event.token(); - match token { - WAKER_TOKEN => { - let exit = self.handle_waker(sockets); - if exit { - info!("Accept is stopped."); - return; - } - } - _ => { - let token = usize::from(token); - self.accept(sockets, token); - } - } - } + timeout: Box::pin(sleep(Duration::from_millis(500))), } } - fn handle_waker(&mut self, sockets: &mut [ServerSocketInfo]) -> bool { - // This is a loop because interests for command from previous version was - // a loop that would try to drain the command channel. It's yet unknown - // if it's necessary/good practice to actively drain the waker queue. - loop { - // take guard with every iteration so no new interest can be added - // until the current task is done. - let mut guard = self.waker.guard(); - match guard.pop_front() { - // worker notify it becomes available. - Some(WakerInterest::WorkerAvailable(idx)) => { - drop(guard); - - self.avail.set_available(idx, true); - - if !self.paused { - self.accept_all(sockets); - } - } - // a new worker thread is made and it's handle would be added to Accept - Some(WakerInterest::Worker(handle)) => { - drop(guard); - - self.avail.set_available(handle.idx(), true); - self.handles.push(handle); - - if !self.paused { - self.accept_all(sockets); - } - } - // got timer interest and it's time to try register socket(s) again - Some(WakerInterest::Timer) => { - drop(guard); - - self.process_timer(sockets) - } - Some(WakerInterest::Pause) => { - drop(guard); - - self.paused = true; - - self.deregister_all(sockets); - } - Some(WakerInterest::Resume) => { - drop(guard); - - self.paused = false; - - sockets.iter_mut().for_each(|info| { - self.register_logged(info); - }); - - self.accept_all(sockets); - } - Some(WakerInterest::Stop) => { - self.deregister_all(sockets); - - return true; - } - // waker queue is drained - None => { - // Reset the WakerQueue before break so it does not grow infinitely - WakerQueue::reset(&mut guard); - - return false; - } - } - } - } - - fn process_timer(&self, sockets: &mut [ServerSocketInfo]) { - let now = Instant::now(); - sockets - .iter_mut() - // Only sockets that had an associated timeout were deregistered. - .filter(|info| info.timeout.is_some()) - .for_each(|info| { - let inst = info.timeout.take().unwrap(); - - if now < inst { - info.timeout = Some(inst); - } else if !self.paused { - self.register_logged(info); - } - - // Drop the timeout if server is paused and socket timeout is expired. - // When server recovers from pause it will register all sockets without - // a timeout value so this socket register will be delayed till then. - }); - } - - #[cfg(not(target_os = "windows"))] - fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> { - let token = MioToken(info.token); - self.poll - .registry() - .register(&mut info.lst, token, Interest::READABLE) - } - - #[cfg(target_os = "windows")] - fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> { - // On windows, calling register without deregister cause an error. - // See https://github.com/actix/actix-web/issues/905 - // Calling reregister seems to fix the issue. - let token = MioToken(info.token); - self.poll - .registry() - .register(&mut info.lst, token, Interest::READABLE) - .or_else(|_| { - self.poll - .registry() - .reregister(&mut info.lst, token, Interest::READABLE) - }) - } - - fn register_logged(&self, info: &mut ServerSocketInfo) { - match self.register(info) { - Ok(_) => info!("Resume accepting connections on {}", info.addr), - Err(e) => error!("Can not register server socket {}", e), - } - } - - fn deregister_logged(&self, info: &mut ServerSocketInfo) { - match self.poll.registry().deregister(&mut info.lst) { - Ok(_) => info!("Paused accepting connections on {}", info.addr), - Err(e) => { - error!("Can not deregister server socket {}", e) - } - } - } - - fn deregister_all(&self, sockets: &mut [ServerSocketInfo]) { - // This is a best effort implementation with following limitation: - // - // Every ServerSocketInfo with associate timeout will be skipped and it's timeout - // is removed in the process. - // - // Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short - // gap (less than 500ms) would cause all timing out ServerSocketInfos be reregistered - // before expected timing. - sockets - .iter_mut() - // Take all timeout. - // This is to prevent Accept::process_timer method re-register a socket afterwards. - .map(|info| (info.timeout.take(), info)) - // Socket info with a timeout is already deregistered so skip them. - .filter(|(timeout, _)| timeout.is_none()) - .for_each(|(_, info)| self.deregister_logged(info)); - } - // Send connection to worker and handle error. fn send_connection(&mut self, conn: Conn) -> Result<(), Conn> { let next = self.next(); @@ -460,50 +149,6 @@ impl Accept { } } - fn accept(&mut self, sockets: &mut [ServerSocketInfo], token: usize) { - while self.avail.available() { - let info = &mut sockets[token]; - - match info.lst.accept() { - Ok(io) => { - let conn = Conn { io, token }; - self.accept_one(conn); - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return, - Err(ref e) if connection_error(e) => continue, - Err(e) => { - error!("Error accepting connection: {}", e); - - // deregister listener temporary - self.deregister_logged(info); - - // sleep after error. write the timeout to socket info as later - // the poll would need it mark which socket and when it's - // listener should be registered - info.timeout = Some(Instant::now() + Duration::from_millis(500)); - - // after the sleep a Timer interest is sent to Accept Poll - let waker = self.waker.clone(); - System::current().arbiter().spawn(async move { - sleep(Duration::from_millis(510)).await; - waker.wake(WakerInterest::Timer); - }); - - return; - } - }; - } - } - - fn accept_all(&mut self, sockets: &mut [ServerSocketInfo]) { - sockets - .iter_mut() - .map(|info| info.token) - .collect::>() - .into_iter() - .for_each(|idx| self.accept(sockets, idx)) - } - #[inline(always)] fn next(&self) -> &WorkerHandleAccept { &self.handles[self.next] @@ -526,6 +171,149 @@ impl Accept { } } +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +impl Future for Accept { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + while let Poll::Ready(Some(interest)) = this.rx.poll_recv(cx) { + match interest { + Interest::WorkerIndex(idx) => { + this.avail.set_available(idx, true); + } + Interest::Worker(handle) => { + this.avail.set_available(handle.idx(), true); + this.handles.push(handle); + } + Interest::Pause => { + this.paused = true; + break; + } + Interest::Resume => this.paused = false, + Interest::Stop => return Poll::Ready(()), + } + } + + if this.paused { + return Poll::Pending; + } + + let len = this.sockets.len(); + let mut idx = 0; + while idx < len { + 'socket: loop { + if !this.avail.available() { + return Poll::Pending; + } + + let socket = &mut this.sockets[idx]; + + match socket.lst.poll_accept(cx) { + Poll::Ready(Ok(io)) => { + let conn = Conn { + io, + token: socket.token, + }; + this.accept_one(conn); + } + Poll::Ready(Err(ref e)) if connection_error(e) => continue 'socket, + Poll::Ready(Err(ref e)) => { + error!("Error accepting connection: {}", e); + + let deadline = Instant::now() + Duration::from_millis(500); + this.timeout.as_mut().reset(deadline); + let _ = this.timeout.as_mut().poll(cx); + + break 'socket; + } + Poll::Pending => break 'socket, + }; + } + idx += 1; + } + + Poll::Pending + } +} + +/// This function defines errors that are per-connection. Which basically +/// means that if we get this error from `accept()` system call it means +/// next connection might be ready to be accepted. +/// +/// All other errors will incur a timeout before next `accept()` is performed. +/// The timeout is useful to handle resource exhaustion errors like ENFILE +/// and EMFILE. Otherwise, could enter into tight loop. +fn connection_error(e: &io::Error) -> bool { + e.kind() == io::ErrorKind::ConnectionRefused + || e.kind() == io::ErrorKind::ConnectionAborted + || e.kind() == io::ErrorKind::ConnectionReset +} + +/// Array of u128 with every bit as marker for a worker handle's availability. +struct Availability([u128; 4]); + +impl Default for Availability { + fn default() -> Self { + Self([0; 4]) + } +} + +impl Availability { + /// Check if any worker handle is available + #[inline(always)] + fn available(&self) -> bool { + self.0.iter().any(|a| *a != 0) + } + + /// Check if worker handle is available by index + #[inline(always)] + fn get_available(&self, idx: usize) -> bool { + let (offset, idx) = Self::offset(idx); + + self.0[offset] & (1 << idx as u128) != 0 + } + + /// Set worker handle available state by index. + fn set_available(&mut self, idx: usize, avail: bool) { + let (offset, idx) = Self::offset(idx); + + let off = 1 << idx as u128; + if avail { + self.0[offset] |= off; + } else { + self.0[offset] &= !off + } + } + + /// Set all worker handle to available state. + /// This would result in a re-check on all workers' availability. + fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) { + handles.iter().for_each(|handle| { + self.set_available(handle.idx(), true); + }) + } + + /// Get offset and adjusted index of given worker handle index. + fn offset(idx: usize) -> (usize, usize) { + if idx < 128 { + (0, idx) + } else if idx < 128 * 2 { + (1, idx - 128) + } else if idx < 128 * 3 { + (2, idx - 128 * 2) + } else if idx < 128 * 4 { + (3, idx - 128 * 3) + } else { + panic!("Max WorkerHandle count is 512") + } + } +} + #[cfg(test)] mod test { use super::Availability; diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index bd694e7c..31dda10a 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -9,19 +9,18 @@ use std::{ use actix_rt::{self as rt, net::TcpStream, time::sleep, System}; use log::{error, info}; use tokio::sync::{ - mpsc::{unbounded_channel, UnboundedReceiver}, + mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, oneshot, }; -use crate::accept::AcceptLoop; +use crate::accept::{Accept, Interest}; use crate::config::{ConfiguredService, ServiceConfig}; use crate::join_all; use crate::server::{Server, ServerCommand}; use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; use crate::signals::{Signal, Signals}; -use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; -use crate::socket::{MioTcpListener, MioTcpSocket}; -use crate::waker_queue::{WakerInterest, WakerQueue}; +use crate::socket::{Listener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; +use crate::socket::{TcpListener, TcpSocket}; use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer}; /// Server builder @@ -31,8 +30,8 @@ pub struct ServerBuilder { backlog: u32, handles: Vec<(usize, WorkerHandleServer)>, services: Vec>, - sockets: Vec<(usize, String, MioListener)>, - accept: AcceptLoop, + sockets: Vec<(usize, String, Listener)>, + accept: Option>, exit: bool, no_signals: bool, cmd: UnboundedReceiver, @@ -59,7 +58,7 @@ impl ServerBuilder { handles: Vec::new(), services: Vec::new(), sockets: Vec::new(), - accept: AcceptLoop::new(server.clone()), + accept: None, backlog: 2048, exit: false, no_signals: false, @@ -165,7 +164,7 @@ impl ServerBuilder { for (name, lst) in cfg.services { let token = self.next_token(); srv.stream(token, name.clone(), lst.local_addr()?); - self.sockets.push((token, name, MioListener::Tcp(lst))); + self.sockets.push((token, name, Listener::Tcp(lst))); } self.services.push(Box::new(srv)); } @@ -191,7 +190,7 @@ impl ServerBuilder { lst.local_addr()?, )); self.sockets - .push((token, name.as_ref().to_string(), MioListener::Tcp(lst))); + .push((token, name.as_ref().to_string(), Listener::Tcp(lst))); } Ok(self) } @@ -241,7 +240,7 @@ impl ServerBuilder { addr, )); self.sockets - .push((token, name.as_ref().to_string(), MioListener::from(lst))); + .push((token, name.as_ref().to_string(), Listener::from(lst))); Ok(self) } @@ -267,7 +266,7 @@ impl ServerBuilder { )); self.sockets - .push((token, name.as_ref().to_string(), MioListener::from(lst))); + .push((token, name.as_ref().to_string(), Listener::from(lst))); Ok(self) } @@ -279,11 +278,14 @@ impl ServerBuilder { } else { info!("Starting {} workers", self.threads); + let (tx, rx) = unbounded_channel(); + + self.accept = Some(tx); + // start workers let handles = (0..self.threads) .map(|idx| { - let (handle_accept, handle_server) = - self.start_worker(idx, self.accept.waker_owned()); + let (handle_accept, handle_server) = self.start_worker(idx); self.handles.push((idx, handle_server)); handle_accept @@ -294,13 +296,13 @@ impl ServerBuilder { for sock in &self.sockets { info!("Starting \"{}\" service on {}", sock.1, sock.2); } - self.accept.start( - mem::take(&mut self.sockets) - .into_iter() - .map(|t| (t.0, t.2)) - .collect(), - handles, - ); + + let sockets = mem::take(&mut self.sockets) + .into_iter() + .map(|t| (t.0, t.2)) + .collect(); + + Accept::start(sockets, rx, self.server.clone(), handles); // handle signals if !self.no_signals { @@ -314,24 +316,20 @@ impl ServerBuilder { } } - fn start_worker( - &self, - idx: usize, - waker_queue: WakerQueue, - ) -> (WorkerHandleAccept, WorkerHandleServer) { + fn start_worker(&self, idx: usize) -> (WorkerHandleAccept, WorkerHandleServer) { let services = self.services.iter().map(|v| v.clone_factory()).collect(); - - ServerWorker::start(idx, services, waker_queue, self.worker_config) + let accept_tx = self.accept.as_ref().cloned().unwrap(); + ServerWorker::start(idx, services, accept_tx, self.worker_config) } fn handle_cmd(&mut self, item: ServerCommand) { match item { ServerCommand::Pause(tx) => { - self.accept.wake(WakerInterest::Pause); + let _ = self.accept.as_ref().unwrap().send(Interest::Pause); let _ = tx.send(()); } ServerCommand::Resume(tx) => { - self.accept.wake(WakerInterest::Resume); + let _ = self.accept.as_ref().unwrap().send(Interest::Resume); let _ = tx.send(()); } ServerCommand::Signal(sig) => { @@ -375,7 +373,7 @@ impl ServerBuilder { let exit = self.exit; // stop accept thread - self.accept.wake(WakerInterest::Stop); + let _ = self.accept.as_ref().unwrap().send(Interest::Stop); let notify = std::mem::take(&mut self.notify); // stop workers @@ -427,10 +425,13 @@ impl ServerBuilder { break; } - let (handle_accept, handle_server) = - self.start_worker(new_idx, self.accept.waker_owned()); + let (handle_accept, handle_server) = self.start_worker(new_idx); self.handles.push((new_idx, handle_server)); - self.accept.wake(WakerInterest::Worker(handle_accept)); + let _ = self + .accept + .as_ref() + .unwrap() + .send(Interest::Worker(handle_accept)); } } } @@ -459,7 +460,7 @@ impl Future for ServerBuilder { pub(super) fn bind_addr( addr: S, backlog: u32, -) -> io::Result> { +) -> io::Result> { let mut err = None; let mut succ = false; let mut sockets = Vec::new(); @@ -487,10 +488,10 @@ pub(super) fn bind_addr( } } -fn create_tcp_listener(addr: StdSocketAddr, backlog: u32) -> io::Result { +fn create_tcp_listener(addr: StdSocketAddr, backlog: u32) -> io::Result { let socket = match addr { - StdSocketAddr::V4(_) => MioTcpSocket::new_v4()?, - StdSocketAddr::V6(_) => MioTcpSocket::new_v6()?, + StdSocketAddr::V4(_) => TcpSocket::new_v4()?, + StdSocketAddr::V6(_) => TcpSocket::new_v6()?, }; socket.set_reuseaddr(true)?; diff --git a/actix-server/src/config.rs b/actix-server/src/config.rs index 111daf90..2a28efb5 100644 --- a/actix-server/src/config.rs +++ b/actix-server/src/config.rs @@ -14,12 +14,12 @@ use log::error; use crate::{ builder::bind_addr, service::{BoxedServerService, InternalServiceFactory, StreamService}, - socket::{MioStream, MioTcpListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}, + socket::{StdSocketAddr, StdTcpListener, Stream, TcpListener, ToSocketAddrs}, worker::WorkerCounterGuard, }; pub struct ServiceConfig { - pub(crate) services: Vec<(String, MioTcpListener)>, + pub(crate) services: Vec<(String, TcpListener)>, pub(crate) apply: Option>, pub(crate) threads: usize, pub(crate) backlog: u32, @@ -59,7 +59,8 @@ impl ServiceConfig { /// Add new service to server pub fn listen>(&mut self, name: N, lst: StdTcpListener) -> &mut Self { - self._listen(name, MioTcpListener::from_std(lst)) + // TODO: Handle unwrap + self._listen(name, TcpListener::from_std(lst).unwrap()) } /// Register service configuration function. This function get called @@ -72,7 +73,7 @@ impl ServiceConfig { Ok(()) } - fn _listen>(&mut self, name: N, lst: MioTcpListener) -> &mut Self { + fn _listen>(&mut self, name: N, lst: TcpListener) -> &mut Self { if self.apply.is_none() { self.apply = Some(Box::new(not_configured)); } @@ -245,7 +246,7 @@ impl ServiceRuntime { type BoxedNewService = Box< dyn BaseServiceFactory< - (WorkerCounterGuard, MioStream), + (WorkerCounterGuard, Stream), Response = (), Error = (), InitError = (), @@ -259,7 +260,7 @@ struct ServiceFactory { inner: T, } -impl BaseServiceFactory<(WorkerCounterGuard, MioStream)> for ServiceFactory +impl BaseServiceFactory<(WorkerCounterGuard, Stream)> for ServiceFactory where T: BaseServiceFactory, T::Future: 'static, diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index ba7d0c29..33c43a91 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -12,7 +12,6 @@ mod service; mod signals; mod socket; mod test_server; -mod waker_queue; mod worker; pub use self::builder::ServerBuilder; diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index bc436e75..c6b2fcc4 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -7,7 +7,7 @@ use actix_utils::future::{ready, Ready}; use futures_core::future::LocalBoxFuture; use log::error; -use crate::socket::{FromStream, MioStream}; +use crate::socket::{FromStream, Stream}; use crate::worker::WorkerCounterGuard; pub trait ServiceFactory: Send + Clone + 'static { @@ -26,7 +26,7 @@ pub(crate) trait InternalServiceFactory: Send { pub(crate) type BoxedServerService = Box< dyn Service< - (WorkerCounterGuard, MioStream), + (WorkerCounterGuard, Stream), Response = (), Error = (), Future = Ready>, @@ -47,7 +47,7 @@ impl StreamService { } } -impl Service<(WorkerCounterGuard, MioStream)> for StreamService +impl Service<(WorkerCounterGuard, Stream)> for StreamService where S: Service, S::Future: 'static, @@ -62,7 +62,7 @@ where self.service.poll_ready(ctx).map_err(|_| ()) } - fn call(&self, (guard, req): (WorkerCounterGuard, MioStream)) -> Self::Future { + fn call(&self, (guard, req): (WorkerCounterGuard, Stream)) -> Self::Future { ready(match FromStream::from_mio(req) { Ok(stream) => { let f = self.service.call(stream); diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index 948b5f1f..ab6fad3e 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -2,157 +2,85 @@ pub(crate) use std::net::{ SocketAddr as StdSocketAddr, TcpListener as StdTcpListener, ToSocketAddrs, }; -pub(crate) use mio::net::{TcpListener as MioTcpListener, TcpSocket as MioTcpSocket}; +pub(crate) use actix_rt::net::{TcpListener, TcpSocket}; #[cfg(unix)] pub(crate) use { - mio::net::UnixListener as MioUnixListener, - std::os::unix::net::UnixListener as StdUnixListener, + actix_rt::net::UnixListener, std::os::unix::net::UnixListener as StdUnixListener, }; -use std::{fmt, io}; +use std::{ + fmt, io, + task::{Context, Poll}, +}; use actix_rt::net::TcpStream; -use mio::{event::Source, Interest, Registry, Token}; -pub(crate) enum MioListener { - Tcp(MioTcpListener), +pub(crate) enum Listener { + Tcp(TcpListener), #[cfg(unix)] - Uds(MioUnixListener), + Uds(tokio::net::UnixListener), } -impl MioListener { - pub(crate) fn local_addr(&self) -> SocketAddr { +impl Listener { + pub(crate) fn poll_accept(&self, cx: &mut Context<'_>) -> Poll> { match *self { - MioListener::Tcp(ref lst) => SocketAddr::Tcp(lst.local_addr().unwrap()), + Self::Tcp(ref lst) => lst + .poll_accept(cx) + .map_ok(|(stream, _)| Stream::Tcp(stream)), #[cfg(unix)] - MioListener::Uds(ref lst) => SocketAddr::Uds(lst.local_addr().unwrap()), - } - } - - pub(crate) fn accept(&self) -> io::Result { - match *self { - MioListener::Tcp(ref lst) => lst.accept().map(|(stream, _)| MioStream::Tcp(stream)), - #[cfg(unix)] - MioListener::Uds(ref lst) => lst.accept().map(|(stream, _)| MioStream::Uds(stream)), + Self::Uds(ref lst) => lst + .poll_accept(cx) + .map_ok(|(stream, _)| Stream::Uds(stream)), } } } -impl Source for MioListener { - fn register( - &mut self, - registry: &Registry, - token: Token, - interests: Interest, - ) -> io::Result<()> { - match *self { - MioListener::Tcp(ref mut lst) => lst.register(registry, token, interests), - #[cfg(unix)] - MioListener::Uds(ref mut lst) => lst.register(registry, token, interests), - } - } - - fn reregister( - &mut self, - registry: &Registry, - token: Token, - interests: Interest, - ) -> io::Result<()> { - match *self { - MioListener::Tcp(ref mut lst) => lst.reregister(registry, token, interests), - #[cfg(unix)] - MioListener::Uds(ref mut lst) => lst.reregister(registry, token, interests), - } - } - - fn deregister(&mut self, registry: &Registry) -> io::Result<()> { - match *self { - MioListener::Tcp(ref mut lst) => lst.deregister(registry), - #[cfg(unix)] - MioListener::Uds(ref mut lst) => { - let res = lst.deregister(registry); - - // cleanup file path - if let Ok(addr) = lst.local_addr() { - if let Some(path) = addr.as_pathname() { - let _ = std::fs::remove_file(path); - } - } - res - } - } - } -} - -impl From for MioListener { +// TODO: use TryFrom +impl From for Listener { fn from(lst: StdTcpListener) -> Self { - MioListener::Tcp(MioTcpListener::from_std(lst)) + let lst = TcpListener::from_std(lst).unwrap(); + Listener::Tcp(lst) } } #[cfg(unix)] -impl From for MioListener { +impl From for Listener { fn from(lst: StdUnixListener) -> Self { - MioListener::Uds(MioUnixListener::from_std(lst)) + let lst = UnixListener::from_std(lst).unwrap(); + Listener::Uds(lst) } } -impl fmt::Debug for MioListener { +impl fmt::Debug for Listener { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { - MioListener::Tcp(ref lst) => write!(f, "{:?}", lst), + Listener::Tcp(ref lst) => write!(f, "{:?}", lst), #[cfg(all(unix))] - MioListener::Uds(ref lst) => write!(f, "{:?}", lst), + Listener::Uds(ref lst) => write!(f, "{:?}", lst), } } } -impl fmt::Display for MioListener { +impl fmt::Display for Listener { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { - MioListener::Tcp(ref lst) => write!(f, "{}", lst.local_addr().ok().unwrap()), + Listener::Tcp(ref lst) => write!(f, "{}", lst.local_addr().ok().unwrap()), #[cfg(unix)] - MioListener::Uds(ref lst) => write!(f, "{:?}", lst.local_addr().ok().unwrap()), - } - } -} - -pub(crate) enum SocketAddr { - Tcp(StdSocketAddr), - #[cfg(unix)] - Uds(mio::net::SocketAddr), -} - -impl fmt::Display for SocketAddr { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - SocketAddr::Tcp(ref addr) => write!(f, "{}", addr), - #[cfg(unix)] - SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), - } - } -} - -impl fmt::Debug for SocketAddr { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr), - #[cfg(unix)] - SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), + Listener::Uds(ref lst) => write!(f, "{:?}", lst.local_addr().ok().unwrap()), } } } #[derive(Debug)] -pub enum MioStream { - Tcp(mio::net::TcpStream), +pub enum Stream { + Tcp(actix_rt::net::TcpStream), #[cfg(unix)] - Uds(mio::net::UnixStream), + Uds(actix_rt::net::UnixStream), } /// helper trait for converting mio stream to tokio stream. pub trait FromStream: Sized { - fn from_mio(sock: MioStream) -> io::Result; + fn from_mio(stream: Stream) -> io::Result; } #[cfg(windows)] @@ -163,13 +91,9 @@ mod win_impl { // FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream impl FromStream for TcpStream { - fn from_mio(sock: MioStream) -> io::Result { - match sock { - MioStream::Tcp(mio) => { - let raw = IntoRawSocket::into_raw_socket(mio); - // SAFETY: This is a in place conversion from mio stream to tokio stream. - TcpStream::from_std(unsafe { FromRawSocket::from_raw_socket(raw) }) - } + fn from_mio(stream: MioStream) -> io::Result { + match stream { + MioStream::Tcp(stream) => Ok(stream), } } } @@ -179,20 +103,12 @@ mod win_impl { mod unix_impl { use super::*; - use std::os::unix::io::{FromRawFd, IntoRawFd}; - - use actix_rt::net::UnixStream; - // FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream impl FromStream for TcpStream { - fn from_mio(sock: MioStream) -> io::Result { - match sock { - MioStream::Tcp(mio) => { - let raw = IntoRawFd::into_raw_fd(mio); - // SAFETY: This is a in place conversion from mio stream to tokio stream. - TcpStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) }) - } - MioStream::Uds(_) => { + fn from_mio(stream: Stream) -> io::Result { + match stream { + Stream::Tcp(stream) => Ok(stream), + Stream::Uds(_) => { panic!("Should not happen, bug in server impl"); } } @@ -200,53 +116,49 @@ mod unix_impl { } // FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream - impl FromStream for UnixStream { - fn from_mio(sock: MioStream) -> io::Result { - match sock { - MioStream::Tcp(_) => panic!("Should not happen, bug in server impl"), - MioStream::Uds(mio) => { - let raw = IntoRawFd::into_raw_fd(mio); - // SAFETY: This is a in place conversion from mio stream to tokio stream. - UnixStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) }) - } + impl FromStream for actix_rt::net::UnixStream { + fn from_mio(stream: Stream) -> io::Result { + match stream { + Stream::Tcp(_) => panic!("Should not happen, bug in server impl"), + Stream::Uds(stream) => Ok(stream), } } } } -#[cfg(test)] -mod tests { - use super::*; +// #[cfg(test)] +// mod tests { +// use super::*; - #[test] - fn socket_addr() { - let addr = SocketAddr::Tcp("127.0.0.1:8080".parse().unwrap()); - assert!(format!("{:?}", addr).contains("127.0.0.1:8080")); - assert_eq!(format!("{}", addr), "127.0.0.1:8080"); +// #[test] +// fn socket_addr() { +// let addr = SocketAddr::Tcp("127.0.0.1:8080".parse().unwrap()); +// assert!(format!("{:?}", addr).contains("127.0.0.1:8080")); +// assert_eq!(format!("{}", addr), "127.0.0.1:8080"); - let addr: StdSocketAddr = "127.0.0.1:0".parse().unwrap(); - let socket = MioTcpSocket::new_v4().unwrap(); - socket.set_reuseaddr(true).unwrap(); - socket.bind(addr).unwrap(); - let tcp = socket.listen(128).unwrap(); - let lst = MioListener::Tcp(tcp); - assert!(format!("{:?}", lst).contains("TcpListener")); - assert!(format!("{}", lst).contains("127.0.0.1")); - } +// let addr: StdSocketAddr = "127.0.0.1:0".parse().unwrap(); +// let socket = MioTcpSocket::new_v4().unwrap(); +// socket.set_reuseaddr(true).unwrap(); +// socket.bind(addr).unwrap(); +// let tcp = socket.listen(128).unwrap(); +// let lst = Listener::Tcp(tcp); +// assert!(format!("{:?}", lst).contains("TcpListener")); +// assert!(format!("{}", lst).contains("127.0.0.1")); +// } - #[test] - #[cfg(unix)] - fn uds() { - let _ = std::fs::remove_file("/tmp/sock.xxxxx"); - if let Ok(socket) = MioUnixListener::bind("/tmp/sock.xxxxx") { - let addr = socket.local_addr().expect("Couldn't get local address"); - let a = SocketAddr::Uds(addr); - assert!(format!("{:?}", a).contains("/tmp/sock.xxxxx")); - assert!(format!("{}", a).contains("/tmp/sock.xxxxx")); +// #[test] +// #[cfg(unix)] +// fn uds() { +// let _ = std::fs::remove_file("/tmp/sock.xxxxx"); +// if let Ok(socket) = MioUnixListener::bind("/tmp/sock.xxxxx") { +// let addr = socket.local_addr().expect("Couldn't get local address"); +// let a = SocketAddr::Uds(addr); +// assert!(format!("{:?}", a).contains("/tmp/sock.xxxxx")); +// assert!(format!("{}", a).contains("/tmp/sock.xxxxx")); - let lst = MioListener::Uds(socket); - assert!(format!("{:?}", lst).contains("/tmp/sock.xxxxx")); - assert!(format!("{}", lst).contains("/tmp/sock.xxxxx")); - } - } -} +// let lst = Listener::Uds(socket); +// assert!(format!("{:?}", lst).contains("/tmp/sock.xxxxx")); +// assert!(format!("{}", lst).contains("/tmp/sock.xxxxx")); +// } +// } +// } diff --git a/actix-server/src/test_server.rs b/actix-server/src/test_server.rs index 0611cf4b..aeb4c962 100644 --- a/actix-server/src/test_server.rs +++ b/actix-server/src/test_server.rs @@ -102,7 +102,7 @@ impl TestServer { /// Get first available unused local address pub fn unused_addr() -> net::SocketAddr { let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap(); - let socket = mio::net::TcpSocket::new_v4().unwrap(); + let socket = actix_rt::net::TcpSocket::new_v4().unwrap(); socket.bind(addr).unwrap(); socket.set_reuseaddr(true).unwrap(); let tcp = socket.listen(1024).unwrap(); diff --git a/actix-server/src/waker_queue.rs b/actix-server/src/waker_queue.rs deleted file mode 100644 index 3f8669d4..00000000 --- a/actix-server/src/waker_queue.rs +++ /dev/null @@ -1,89 +0,0 @@ -use std::{ - collections::VecDeque, - ops::Deref, - sync::{Arc, Mutex, MutexGuard}, -}; - -use mio::{Registry, Token as MioToken, Waker}; - -use crate::worker::WorkerHandleAccept; - -/// Waker token for `mio::Poll` instance. -pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX); - -/// `mio::Waker` with a queue for waking up the `Accept`'s `Poll` and contains the `WakerInterest` -/// the `Poll` would want to look into. -pub(crate) struct WakerQueue(Arc<(Waker, Mutex>)>); - -impl Clone for WakerQueue { - fn clone(&self) -> Self { - Self(self.0.clone()) - } -} - -impl Deref for WakerQueue { - type Target = (Waker, Mutex>); - - fn deref(&self) -> &Self::Target { - self.0.deref() - } -} - -impl WakerQueue { - /// Construct a waker queue with given `Poll`'s `Registry` and capacity. - /// - /// A fixed `WAKER_TOKEN` is used to identify the wake interest and the `Poll` needs to match - /// event's token for it to properly handle `WakerInterest`. - pub(crate) fn new(registry: &Registry) -> std::io::Result { - let waker = Waker::new(registry, WAKER_TOKEN)?; - let queue = Mutex::new(VecDeque::with_capacity(16)); - - Ok(Self(Arc::new((waker, queue)))) - } - - /// Push a new interest to the queue and wake up the accept poll afterwards. - pub(crate) fn wake(&self, interest: WakerInterest) { - let (waker, queue) = self.deref(); - - queue - .lock() - .expect("Failed to lock WakerQueue") - .push_back(interest); - - waker - .wake() - .unwrap_or_else(|e| panic!("can not wake up Accept Poll: {}", e)); - } - - /// Get a MutexGuard of the waker queue. - pub(crate) fn guard(&self) -> MutexGuard<'_, VecDeque> { - self.deref().1.lock().expect("Failed to lock WakerQueue") - } - - /// Reset the waker queue so it does not grow infinitely. - pub(crate) fn reset(queue: &mut VecDeque) { - std::mem::swap(&mut VecDeque::::with_capacity(16), queue); - } -} - -/// Types of interests we would look into when `Accept`'s `Poll` is waked up by waker. -/// -/// These interests should not be confused with `mio::Interest` and mostly not I/O related -pub(crate) enum WakerInterest { - /// `WorkerAvailable` is an interest from `Worker` notifying `Accept` there is a worker - /// available and can accept new tasks. - WorkerAvailable(usize), - /// `Pause`, `Resume`, `Stop` Interest are from `ServerBuilder` future. It listens to - /// `ServerCommand` and notify `Accept` to do exactly these tasks. - Pause, - Resume, - Stop, - /// `Timer` is an interest sent as a delayed future. When an error happens on accepting - /// connection `Accept` would deregister socket listener temporary and wake up the poll and - /// register them again after the delayed future resolve. - Timer, - /// `Worker` is an interest happen after a worker runs into faulted state(This is determined - /// by if work can be sent to it successfully).`Accept` would be waked up and add the new - /// `WorkerHandleAccept`. - Worker(WorkerHandleAccept), -} diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index df8bc723..da9b65be 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -23,10 +23,9 @@ use tokio::sync::{ oneshot, }; -use crate::join_all; use crate::service::{BoxedServerService, InternalServiceFactory}; -use crate::socket::MioStream; -use crate::waker_queue::{WakerInterest, WakerQueue}; +use crate::socket::Stream; +use crate::{accept::Interest, join_all}; /// Stop worker message. Returns `true` on successful graceful shutdown. /// and `false` if some connections still alive when shutdown execute. @@ -37,7 +36,7 @@ pub(crate) struct Stop { #[derive(Debug)] pub(crate) struct Conn { - pub io: MioStream, + pub io: Stream, pub token: usize, } @@ -91,7 +90,7 @@ impl Counter { pub(crate) struct WorkerCounter { idx: usize, - inner: Rc<(WakerQueue, Counter)>, + inner: Rc<(UnboundedSender, Counter)>, } impl Clone for WorkerCounter { @@ -104,10 +103,14 @@ impl Clone for WorkerCounter { } impl WorkerCounter { - pub(crate) fn new(idx: usize, waker_queue: WakerQueue, counter: Counter) -> Self { + pub(crate) fn new( + idx: usize, + accept_tx: UnboundedSender, + counter: Counter, + ) -> Self { Self { idx, - inner: Rc::new((waker_queue, counter)), + inner: Rc::new((accept_tx, counter)), } } @@ -125,9 +128,9 @@ pub(crate) struct WorkerCounterGuard(WorkerCounter); impl Drop for WorkerCounterGuard { fn drop(&mut self) { - let (waker_queue, counter) = &*self.0.inner; + let (accept_tx, counter) = &*self.0.inner; if counter.derc() { - waker_queue.wake(WakerInterest::WorkerAvailable(self.0.idx)); + let _ = accept_tx.send(Interest::WorkerIndex(self.0.idx)); } } } @@ -251,7 +254,7 @@ impl ServerWorker { pub(crate) fn start( idx: usize, factories: Vec>, - waker_queue: WakerQueue, + accept_tx: UnboundedSender, config: ServerWorkerConfig, ) -> (WorkerHandleAccept, WorkerHandleServer) { let (tx1, rx) = unbounded_channel(); @@ -315,7 +318,7 @@ impl ServerWorker { rx, rx2, services, - counter: WorkerCounter::new(idx, waker_queue, counter_clone), + counter: WorkerCounter::new(idx, accept_tx, counter_clone), factories: factories.into_boxed_slice(), state: Default::default(), shutdown_timeout: config.shutdown_timeout, diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index cc9f8190..1b54b3ca 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -10,65 +10,44 @@ use futures_util::future::lazy; fn unused_addr() -> net::SocketAddr { let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap(); - let socket = mio::net::TcpSocket::new_v4().unwrap(); + let socket = actix_rt::net::TcpSocket::new_v4().unwrap(); socket.bind(addr).unwrap(); socket.set_reuseaddr(true).unwrap(); let tcp = socket.listen(32).unwrap(); tcp.local_addr().unwrap() } -#[test] -fn test_bind() { +#[actix_rt::test] +async fn test_bind() { let addr = unused_addr(); - let (tx, rx) = mpsc::channel(); + let srv = Server::build() + .workers(1) + .disable_signals() + .bind("test", addr, move || fn_service(|_| ok::<_, ()>(()))) + .unwrap() + .run(); - let h = thread::spawn(move || { - let sys = actix_rt::System::new(); - let srv = sys.block_on(lazy(|_| { - Server::build() - .workers(1) - .disable_signals() - .bind("test", addr, move || fn_service(|_| ok::<_, ()>(()))) - .unwrap() - .run() - })); - - let _ = tx.send((srv, actix_rt::System::current())); - let _ = sys.run(); - }); - let (_, sys) = rx.recv().unwrap(); - - thread::sleep(Duration::from_millis(500)); + actix_rt::time::sleep(Duration::from_millis(500)).await; assert!(net::TcpStream::connect(addr).is_ok()); - sys.stop(); - let _ = h.join(); + srv.stop(true).await; } -#[test] -fn test_listen() { +#[actix_rt::test] +async fn test_listen() { let addr = unused_addr(); - let (tx, rx) = mpsc::channel(); - let h = thread::spawn(move || { - let sys = actix_rt::System::new(); - let lst = net::TcpListener::bind(addr).unwrap(); - sys.block_on(async { - Server::build() - .disable_signals() - .workers(1) - .listen("test", lst, move || fn_service(|_| ok::<_, ()>(()))) - .unwrap() - .run(); - let _ = tx.send(actix_rt::System::current()); - }); - let _ = sys.run(); - }); - let sys = rx.recv().unwrap(); + let lst = net::TcpListener::bind(addr).unwrap(); + let server = Server::build() + .disable_signals() + .workers(1) + .listen("test", lst, move || fn_service(|_| ok::<_, ()>(()))) + .unwrap() + .run(); - thread::sleep(Duration::from_millis(500)); + actix_rt::time::sleep(Duration::from_millis(500)).await; assert!(net::TcpStream::connect(addr).is_ok()); - sys.stop(); - let _ = h.join(); + + server.stop(true).await; } #[test] @@ -80,13 +59,13 @@ fn test_start() { use bytes::Bytes; use futures_util::sink::SinkExt; - let addr = unused_addr(); let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let sys = actix_rt::System::new(); - let srv = sys.block_on(lazy(|_| { - Server::build() + actix_rt::System::new().block_on(async { + let addr = unused_addr(); + + let srv = Server::build() .backlog(100) .disable_signals() .bind("test", addr, move || { @@ -97,14 +76,15 @@ fn test_start() { }) }) .unwrap() - .run() - })); + .run(); - let _ = tx.send((srv, actix_rt::System::current())); - let _ = sys.run(); + let _ = tx.send((srv.clone(), addr)); + + let _ = srv.await; + }); }); - let (srv, sys) = rx.recv().unwrap(); + let (srv, addr) = rx.recv().unwrap(); let mut buf = [1u8; 4]; let mut conn = net::TcpStream::connect(addr).unwrap(); @@ -137,25 +117,22 @@ fn test_start() { thread::sleep(Duration::from_millis(100)); assert!(net::TcpStream::connect(addr).is_err()); - thread::sleep(Duration::from_millis(100)); - sys.stop(); let _ = h.join(); } #[test] fn test_configure() { - let addr1 = unused_addr(); - let addr2 = unused_addr(); - let addr3 = unused_addr(); let (tx, rx) = mpsc::channel(); let num = Arc::new(AtomicUsize::new(0)); let num2 = num.clone(); let h = thread::spawn(move || { let num = num2.clone(); - let sys = actix_rt::System::new(); - let srv = sys.block_on(lazy(|_| { - Server::build() + actix_rt::System::new().block_on(async { + let addr1 = unused_addr(); + let addr2 = unused_addr(); + let addr3 = unused_addr(); + let srv = Server::build() .disable_signals() .configure(move |cfg| { let num = num.clone(); @@ -176,20 +153,20 @@ fn test_configure() { }) .unwrap() .workers(1) - .run() - })); + .run(); + let _ = tx.send((srv.clone(), addr1, addr2, addr3)); - let _ = tx.send((srv, actix_rt::System::current())); - let _ = sys.run(); + let _ = srv.await; + }); }); - let (_, sys) = rx.recv().unwrap(); + let (srv, addr1, addr2, addr3) = rx.recv().unwrap(); thread::sleep(Duration::from_millis(500)); assert!(net::TcpStream::connect(addr1).is_ok()); assert!(net::TcpStream::connect(addr2).is_ok()); assert!(net::TcpStream::connect(addr3).is_ok()); assert_eq!(num.load(Ordering::Relaxed), 1); - sys.stop(); + let _ = srv.stop(false); let _ = h.join(); } @@ -234,13 +211,13 @@ async fn test_max_concurrent_connections() { })? .run(); - let _ = tx.send((server.clone(), actix_rt::System::current())); + let _ = tx.send(server.clone()); server.await }) }); - let (srv, sys) = rx.recv().unwrap(); + let srv = rx.recv().unwrap(); let mut conns = vec![]; @@ -261,7 +238,6 @@ async fn test_max_concurrent_connections() { srv.stop(false).await; - sys.stop(); let _ = h.join().unwrap(); }