diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs
index 7f64095e..069a6af4 100644
--- a/actix-rt/src/arbiter.rs
+++ b/actix-rt/src/arbiter.rs
@@ -1,6 +1,7 @@
 use std::any::{Any, TypeId};
 use std::cell::{Cell, RefCell};
 use std::collections::HashMap;
+use std::future::Future;
 use std::pin::Pin;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::task::{Context, Poll};
@@ -9,7 +10,7 @@ use std::{fmt, thread};
 use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
 use futures_channel::oneshot::{channel, Canceled, Sender};
 use futures_util::{
-    future::{self, Future, FutureExt},
+    future::{self, FutureExt},
     stream::Stream,
 };
diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs
index 244d1251..e875a296 100644
--- a/actix-rt/src/builder.rs
+++ b/actix-rt/src/builder.rs
@@ -1,9 +1,10 @@
 use std::borrow::Cow;
+use std::future::Future;
 use std::io;
 
 use futures_channel::mpsc::unbounded;
 use futures_channel::oneshot::{channel, Receiver};
-use futures_util::future::{lazy, Future, FutureExt};
+use futures_util::future::{lazy, FutureExt};
 use tokio::task::LocalSet;
 
 use crate::arbiter::{Arbiter, SystemArbiter};
diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs
index e75076ae..371663b5 100644
--- a/actix-rt/src/lib.rs
+++ b/actix-rt/src/lib.rs
@@ -1,6 +1,5 @@
 //! A runtime implementation that runs everything on the current thread.
-#![deny(rust_2018_idioms, warnings)]
-#![allow(clippy::type_complexity)]
+#![forbid(unsafe_code)]
 
 #[cfg(not(test))] // Work around for rust-lang/rust#62127
 pub use actix_macros::{main, test};
@@ -25,7 +24,7 @@ pub use actix_threadpool as blocking;
 /// This function panics if actix system is not running.
 pub fn spawn<F>(f: F)
 where
-    F: futures_util::future::Future<Output = ()> + 'static,
+    F: std::future::Future<Output = ()> + 'static,
 {
     if !System::is_set() {
         panic!("System is not running");
diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md
index 61e571a4..c843736d 100644
--- a/actix-server/CHANGES.md
+++ b/actix-server/CHANGES.md
@@ -5,6 +5,7 @@
 * Use `concurrent-queue` to manage poll wakes instead of `futures::channel::mpsc`.
 * Remove `AcceptNotify` type and pass `WakerQueue` to `WorkerClient` for notify the `Accept` more directly.
 * Convert `mio::Stream` to `actix_rt::net::TcpStream`(`UnixStream` for uds) using `FromRawFd` and `IntoRawFd`(`IntoRawSocket` and `FromRawSocket` on windows).
+* Remove `AsyncRead` and `AsyncWrite` trait bound for `socket::FromStream` trait.
 
 ## 1.0.4 - 2020-09-12
 * Update actix-codec to 0.3.0.
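Note on the actix-rt hunks above: `spawn` now names `std::future::Future` directly, so callers no longer need a `futures_util` import just to satisfy the bound. A minimal sketch of what the new bound accepts, assuming the in-tree actix-rt 1.x API (`System::new` plus `SystemRunner::block_on`); the `assert_spawnable` helper is hypothetical, added only to make the bound visible:

```rust
use std::future::Future;

// Compile-time check: any `std` future with `Output = ()` satisfies the new
// `actix_rt::spawn` bound; `async` blocks already implement it.
fn assert_spawnable<F: Future<Output = ()> + 'static>(_: &F) {}

fn main() {
    actix_rt::System::new("demo").block_on(async {
        let fut = async { println!("runs on the current-thread runtime") };
        assert_spawnable(&fut);
        actix_rt::spawn(fut);
    });
}
```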
diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs
index a3dcad92..93c49ead 100644
--- a/actix-server/src/accept.rs
+++ b/actix-server/src/accept.rs
@@ -17,6 +17,7 @@ struct ServerSocketInfo {
     addr: SocketAddr,
     token: Token,
     sock: MioSocketListener,
+    // timeout is used to mark the time this socket should be reregistered after an error.
     timeout: Option<Instant>,
 }
 
@@ -34,10 +35,7 @@ pub(crate) struct AcceptLoop {
 
 impl AcceptLoop {
     pub fn new(srv: Server) -> Self {
-        // Create a poll instance.
         let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create mio::Poll: {}", e));
-
-        // construct a waker queue which would wake up poll with associate extra interest types.
        let waker = WakerQueue::with_capacity(poll.registry(), 128).unwrap();
 
         Self {
@@ -51,11 +49,11 @@ impl AcceptLoop {
         self.waker.clone()
     }
 
-    pub fn wake_accept(&self, i: WakerInterest) {
+    pub fn wake(&self, i: WakerInterest) {
         self.waker.wake(i);
     }
 
-    pub(crate) fn start_accept(
+    pub(crate) fn start(
         &mut self,
         socks: Vec<(Token, StdListener)>,
         workers: Vec<WorkerClient>,
@@ -101,9 +99,9 @@ impl Accept {
         srv: Server,
         workers: Vec<WorkerClient>,
     ) {
+        // Accept runs in its own thread and would want to spawn additional futures to the
+        // current actix system.
         let sys = System::current();
-
-        // start accept thread
         thread::Builder::new()
             .name("actix-server accept loop".to_owned())
             .spawn(move || {
@@ -121,14 +119,13 @@ impl Accept {
         socks: Vec<(Token, StdListener)>,
         workers: Vec<WorkerClient>,
         srv: Server,
-        // Accept and sockets info are separated so that we can borrow mut on both at the same time
     ) -> (Accept, Slab<ServerSocketInfo>) {
-        // Start accept
+
         let mut sockets = Slab::new();
         for (hnd_token, lst) in socks.into_iter() {
             let addr = lst.local_addr();
 
-            let mut server = lst
+            let mut sock = lst
                 .into_mio_listener()
                 .unwrap_or_else(|e| panic!("Can not set non_block on listener: {}", e));
             let entry = sockets.vacant_entry();
@@ -136,13 +133,13 @@ impl Accept {
 
             // Start listening for incoming connections
             poll.registry()
-                .register(&mut server, MioToken(token + DELTA), Interest::READABLE)
+                .register(&mut sock, MioToken(token + DELTA), Interest::READABLE)
                 .unwrap_or_else(|e| panic!("Can not register io: {}", e));
 
             entry.insert(ServerSocketInfo {
                 addr,
                 token: hnd_token,
-                sock: server,
+                sock,
                 timeout: None,
             });
         }
@@ -160,7 +157,6 @@ impl Accept {
     }
 
     fn poll_with(&mut self, mut sockets: Slab<ServerSocketInfo>) {
-        // Create storage for events
         let mut events = mio::Events::with_capacity(128);
 
         loop {
@@ -171,36 +167,40 @@ impl Accept {
             for event in events.iter() {
                 let token = event.token();
                 match token {
-                    // This is a loop because interests for command were a loop that would try to
-                    // drain the command channel.
+                    // This is a loop because, in the previous version, interests for commands
+                    // were drained in a loop from the command channel. It is not yet known
+                    // whether it is necessary/good practice to actively drain the waker queue.
                     WAKER_TOKEN => 'waker: loop {
                         match self.waker.pop() {
+                            // a worker notified that its availability has changed. we may want
+                            // to enter backpressure or recover from it.
                             Ok(WakerInterest::Notify) => {
-                                self.maybe_backpressure(&mut sockets, false)
+                                self.maybe_backpressure(&mut sockets, false);
                             }
                             Ok(WakerInterest::Pause) => {
-                                for (_, info) in sockets.iter_mut() {
-                                    if let Err(err) =
-                                        self.poll.registry().deregister(&mut info.sock)
-                                    {
+                                sockets.iter_mut().for_each(|(_, info)| {
+                                    if let Err(err) = self.deregister(info) {
                                         error!("Can not deregister server socket {}", err);
                                     } else {
                                         info!("Paused accepting connections on {}", info.addr);
                                     }
-                                }
+                                });
                             }
                             Ok(WakerInterest::Resume) => {
-                                for (token, info) in sockets.iter_mut() {
+                                sockets.iter_mut().for_each(|(token, info)| {
                                     self.register_logged(token, info);
-                                }
+                                });
                             }
                             Ok(WakerInterest::Stop) => {
-                                return self.deregister_all(&mut sockets)
+                                return self.deregister_all(&mut sockets);
                             }
+                            // a new worker thread was made and its client is added to Accept
                             Ok(WakerInterest::Worker(worker)) => {
+                                // maybe we want to recover from backpressure.
                                 self.maybe_backpressure(&mut sockets, false);
                                 self.workers.push(worker);
                             }
+                            // got a timer interest and it's time to try to register socket(s) again.
                             Ok(WakerInterest::Timer) => self.process_timer(&mut sockets),
                             Err(WakerQueueError::Empty) => break 'waker,
                             Err(WakerQueueError::Closed) => {
@@ -220,9 +220,10 @@ impl Accept {
         }
     }
 
-    fn process_timer(&mut self, sockets: &mut Slab<ServerSocketInfo>) {
+    fn process_timer(&self, sockets: &mut Slab<ServerSocketInfo>) {
         let now = Instant::now();
         for (token, info) in sockets.iter_mut() {
+            // only the sockets that have an associated timeout value were deregistered.
             if let Some(inst) = info.timeout.take() {
                 if now > inst {
                     self.register_logged(token, info);
@@ -270,9 +271,13 @@ impl Accept {
         }
     }
 
-    fn deregister_all(&mut self, sockets: &mut Slab<ServerSocketInfo>) {
+    fn deregister(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
+        self.poll.registry().deregister(&mut info.sock)
+    }
+
+    fn deregister_all(&self, sockets: &mut Slab<ServerSocketInfo>) {
         sockets.iter_mut().for_each(|(_, info)| {
-            let _ = self.poll.registry().deregister(&mut info.sock);
+            let _ = self.deregister(info);
         });
     }
@@ -299,6 +304,9 @@ impl Accept {
                     break;
                 }
                 Err(tmp) => {
+                    // the worker lost contact and could be gone. a message is sent to the
+                    // `ServerBuilder` future to notify it that a new worker should be made.
+                    // after that, remove the faulty worker and enter backpressure if necessary.
                     self.srv.worker_faulted(self.workers[self.next].idx);
                     msg = tmp;
                     self.workers.swap_remove(self.next);
@@ -322,6 +330,9 @@ impl Accept {
                         self.set_next();
                         return;
                     }
+                    // the worker lost contact and could be gone. a message is sent to the
+                    // `ServerBuilder` future to notify it that a new worker should be made.
+                    // after that, remove the faulty worker and enter backpressure if necessary.
                     Err(tmp) => {
                         self.srv.worker_faulted(self.workers[self.next].idx);
                         msg = tmp;
@@ -363,14 +374,17 @@ impl Accept {
                     Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
                     Err(ref e) if connection_error(e) => continue,
                     Err(e) => {
+                        // deregister the socket temporarily
                        error!("Error accepting connection: {}", e);
 
                         if let Err(err) = self.poll.registry().deregister(&mut info.sock) {
                             error!("Can not deregister server socket {}", err);
                         }
 
-                        // sleep after error
+                        // sleep after error. write the timeout to the socket info, as the poll
+                        // later needs it to mark which socket should be re-registered and when.
                         info.timeout = Some(Instant::now() + Duration::from_millis(500));
 
+                        // after the sleep a Timer interest is sent to the Accept poll
                         let waker = self.waker.clone();
                         System::current().arbiter().send(Box::pin(async move {
                             sleep_until(Instant::now() + Duration::from_millis(510)).await;
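The accept-loop changes above revolve around the `WakerQueue`: producers push a `WakerInterest` and wake the `mio` poll, and the loop drains the queue whenever it sees `WAKER_TOKEN`. A reduced standalone model of that pattern, assuming the `mio` 0.7 and `concurrent-queue` crates; the `Interest` enum and `WakerQueue` struct here are illustrative stand-ins, not the crate's types:

```rust
use std::sync::Arc;

use concurrent_queue::ConcurrentQueue;
use mio::{Events, Poll, Token, Waker};

const WAKER_TOKEN: Token = Token(0);

// Illustrative stand-in for the crate's WakerInterest.
enum Interest {
    Pause,
    Resume,
    Stop,
}

struct WakerQueue {
    waker: Waker,
    queue: Arc<ConcurrentQueue<Interest>>,
}

impl WakerQueue {
    fn wake(&self, interest: Interest) {
        // push first, then wake the poll, so the interest is visible on wakeup
        let _ = self.queue.push(interest);
        let _ = self.waker.wake();
    }
}

fn main() -> std::io::Result<()> {
    let mut poll = Poll::new()?;
    let waker = Waker::new(poll.registry(), WAKER_TOKEN)?;
    let queue = Arc::new(ConcurrentQueue::unbounded());
    let wq = WakerQueue { waker, queue: Arc::clone(&queue) };

    wq.wake(Interest::Resume);

    let mut events = Events::with_capacity(8);
    poll.poll(&mut events, None)?;
    for event in events.iter() {
        if event.token() == WAKER_TOKEN {
            // drain all pending interests, as the 'waker loop above does
            while let Ok(interest) = queue.pop() {
                match interest {
                    Interest::Pause => { /* deregister listeners */ }
                    Interest::Resume => { /* re-register listeners */ }
                    Interest::Stop => return Ok(()),
                }
            }
        }
    }
    Ok(())
}
```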
diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs
index d01eff44..e5551201 100644
--- a/actix-server/src/builder.rs
+++ b/actix-server/src/builder.rs
@@ -203,7 +203,7 @@ impl ServerBuilder {
         self.listen_uds(name, lst, factory)
     }
 
-    #[cfg(all(unix))]
+    #[cfg(unix)]
     /// Add new unix domain service to the server.
     /// Useful when running as a systemd service and
     /// a socket FD can be acquired using the systemd crate.
@@ -278,7 +278,7 @@ impl ServerBuilder {
             for sock in &self.sockets {
                 info!("Starting \"{}\" service on {}", sock.1, sock.2);
             }
-            self.accept.start_accept(
+            self.accept.start(
                 mem::take(&mut self.sockets)
                     .into_iter()
                     .map(|t| (t.0, t.2))
@@ -309,11 +309,11 @@ impl ServerBuilder {
     fn handle_cmd(&mut self, item: ServerCommand) {
         match item {
             ServerCommand::Pause(tx) => {
-                self.accept.wake_accept(WakerInterest::Pause);
+                self.accept.wake(WakerInterest::Pause);
                 let _ = tx.send(());
             }
             ServerCommand::Resume(tx) => {
-                self.accept.wake_accept(WakerInterest::Resume);
+                self.accept.wake(WakerInterest::Resume);
                 let _ = tx.send(());
             }
             ServerCommand::Signal(sig) => {
@@ -357,7 +357,7 @@ impl ServerBuilder {
                 let exit = self.exit;
 
                 // stop accept thread
-                self.accept.wake_accept(WakerInterest::Stop);
+                self.accept.wake(WakerInterest::Stop);
                 let notify = std::mem::take(&mut self.notify);
 
                 // stop workers
@@ -436,7 +436,7 @@ impl ServerBuilder {
 
             let worker = self.start_worker(new_idx, self.accept.waker_owned());
             self.workers.push((new_idx, worker.clone()));
-            self.accept.wake_accept(WakerInterest::Worker(worker));
+            self.accept.wake(WakerInterest::Worker(worker));
         }
     }
 }
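For context on the renamed `wake` calls above: pause/resume/stop arrive as `ServerCommand`s from the public `Server` handle (next file) and are forwarded to the accept thread as waker interests. A hypothetical caller-side sketch; `pause_briefly` is not part of the crate:

```rust
// Hypothetical caller: pausing and resuming accept through the public handle.
async fn pause_briefly(server: actix_server::Server) {
    server.pause().await; // ServerCommand::Pause -> WakerInterest::Pause
    // listeners are now deregistered; new connections queue in the listen backlog
    server.resume().await; // ServerCommand::Resume -> WakerInterest::Resume
}
```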
diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs
index b29a9e02..69f16628 100644
--- a/actix-server/src/server.rs
+++ b/actix-server/src/server.rs
@@ -5,7 +5,6 @@ use std::task::{Context, Poll};
 
 use futures_channel::mpsc::UnboundedSender;
 use futures_channel::oneshot;
-use futures_util::FutureExt;
 
 use crate::builder::ServerBuilder;
 use crate::signals::Signal;
@@ -56,14 +55,18 @@ impl Server {
     pub fn pause(&self) -> impl Future<Output = ()> {
         let (tx, rx) = oneshot::channel();
         let _ = self.0.unbounded_send(ServerCommand::Pause(tx));
-        rx.map(|_| ())
+        async {
+            let _ = rx.await;
+        }
     }
 
     /// Resume accepting incoming connections
     pub fn resume(&self) -> impl Future<Output = ()> {
         let (tx, rx) = oneshot::channel();
         let _ = self.0.unbounded_send(ServerCommand::Resume(tx));
-        rx.map(|_| ())
+        async {
+            let _ = rx.await;
+        }
     }
 
     /// Stop incoming connection processing, stop all workers and exit.
@@ -75,7 +78,9 @@ impl Server {
             graceful,
             completion: Some(tx),
         });
-        rx.map(|_| ())
+        async {
+            let _ = rx.await;
+        }
     }
 }
@@ -101,8 +106,7 @@ impl Future for Server {
 
         match Pin::new(this.1.as_mut().unwrap()).poll(cx) {
             Poll::Pending => Poll::Pending,
-            Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
-            Poll::Ready(Err(_)) => Poll::Ready(Ok(())),
+            Poll::Ready(_) => Poll::Ready(Ok(())),
         }
     }
 }
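The pattern replacing `rx.map(|_| ())` above: await the oneshot receiver inside a plain `async` block and discard the `Canceled` error, which drops the `FutureExt` import entirely. A minimal standalone sketch of the same shape; `ack` is a hypothetical helper, not part of the crate:

```rust
use std::future::Future;

use futures_channel::oneshot;

// The command carries `tx`; the returned future resolves once the receiver
// gets an ack, or immediately with `Canceled` if the other side was dropped.
fn ack() -> (oneshot::Sender<()>, impl Future<Output = ()>) {
    let (tx, rx) = oneshot::channel();
    let done = async {
        let _ = rx.await; // Ok(()) on ack, Err(Canceled) if the sender dropped
    };
    (tx, done)
}
```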
diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs
index 46d76761..435f3eab 100644
--- a/actix-server/src/service.rs
+++ b/actix-server/src/service.rs
@@ -77,7 +77,7 @@ where
     fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
         match req {
             ServerMessage::Connect(stream) => {
-                let stream = FromStream::from_mio_stream(stream).map_err(|e| {
+                let stream = FromStream::from_mio(stream).map_err(|e| {
                     error!("Can not convert to an async tcp stream: {}", e);
                 });
diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs
index 69974b37..e72bd185 100644
--- a/actix-server/src/socket.rs
+++ b/actix-server/src/socket.rs
@@ -9,7 +9,6 @@ use std::os::unix::{
 #[cfg(windows)]
 use std::os::windows::io::{FromRawSocket, IntoRawSocket};
 
-use actix_codec::{AsyncRead, AsyncWrite};
 use actix_rt::net::TcpStream;
 #[cfg(unix)]
 use actix_rt::net::UnixStream;
@@ -27,15 +26,17 @@ use mio::{Interest, Registry, Token};
 
 pub(crate) enum StdListener {
     Tcp(StdTcpListener),
-    #[cfg(all(unix))]
+    #[cfg(unix)]
     Uds(StdUnixListener),
 }
 
 pub(crate) enum SocketAddr {
     Tcp(StdTcpSocketAddr),
-    #[cfg(all(unix))]
+    #[cfg(unix)]
     Uds(StdUdsSocketAddr),
-    #[cfg(all(unix))]
+    // this is a workaround. mio would return different types of SocketAddr from the accept
+    // and local_addr methods.
+    #[cfg(unix)]
     UdsMio(MioSocketAddr),
 }
@@ -43,9 +44,9 @@ impl fmt::Display for SocketAddr {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match *self {
             SocketAddr::Tcp(ref addr) => write!(f, "{}", addr),
-            #[cfg(all(unix))]
+            #[cfg(unix)]
             SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
-            #[cfg(all(unix))]
+            #[cfg(unix)]
             SocketAddr::UdsMio(ref addr) => write!(f, "{:?}", addr),
         }
     }
@@ -55,9 +56,9 @@ impl fmt::Debug for SocketAddr {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match *self {
             SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr),
-            #[cfg(all(unix))]
+            #[cfg(unix)]
             SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
-            #[cfg(all(unix))]
+            #[cfg(unix)]
             SocketAddr::UdsMio(ref addr) => write!(f, "{:?}", addr),
         }
     }
@@ -67,7 +68,7 @@ impl fmt::Display for StdListener {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match *self {
             StdListener::Tcp(ref lst) => write!(f, "{}", lst.local_addr().ok().unwrap()),
-            #[cfg(all(unix))]
+            #[cfg(unix)]
             StdListener::Uds(ref lst) => write!(f, "{:?}", lst.local_addr().ok().unwrap()),
         }
     }
@@ -77,7 +78,7 @@ impl StdListener {
     pub(crate) fn local_addr(&self) -> SocketAddr {
         match self {
             StdListener::Tcp(lst) => SocketAddr::Tcp(lst.local_addr().unwrap()),
-            #[cfg(all(unix))]
+            #[cfg(unix)]
             StdListener::Uds(lst) => SocketAddr::Uds(lst.local_addr().unwrap()),
         }
     }
@@ -87,13 +88,13 @@ impl StdListener {
             StdListener::Tcp(lst) => {
                 // ToDo: is this non_blocking a good practice?
                 lst.set_nonblocking(true)?;
-                Ok(MioSocketListener::Tcp(mio::net::TcpListener::from_std(lst)))
+                Ok(MioSocketListener::Tcp(MioTcpListener::from_std(lst)))
             }
-            #[cfg(all(unix))]
+            #[cfg(unix)]
             StdListener::Uds(lst) => {
                 // ToDo: the same as above
                 lst.set_nonblocking(true)?;
-                Ok(MioSocketListener::Uds(mio::net::UnixListener::from_std(
+                Ok(MioSocketListener::Uds(MioUnixListener::from_std(
                     lst,
                 )))
             }
@@ -104,13 +105,13 @@
 #[derive(Debug)]
 pub enum MioStream {
     Tcp(MioTcpStream),
-    #[cfg(all(unix))]
+    #[cfg(unix)]
     Uds(MioUnixStream),
 }
 
 pub(crate) enum MioSocketListener {
     Tcp(MioTcpListener),
-    #[cfg(all(unix))]
+    #[cfg(unix)]
     Uds(MioUnixListener),
 }
@@ -120,7 +121,7 @@ impl MioSocketListener {
             MioSocketListener::Tcp(ref lst) => lst
                 .accept()
                 .map(|(stream, addr)| Some((MioStream::Tcp(stream), SocketAddr::Tcp(addr)))),
-            #[cfg(all(unix))]
+            #[cfg(unix)]
             MioSocketListener::Uds(ref lst) => lst
                 .accept()
                 .map(|(stream, addr)| Some((MioStream::Uds(stream), SocketAddr::UdsMio(addr)))),
@@ -137,7 +138,7 @@ impl Source for MioSocketListener {
     ) -> io::Result<()> {
         match *self {
             MioSocketListener::Tcp(ref mut lst) => lst.register(registry, token, interests),
-            #[cfg(all(unix))]
+            #[cfg(unix)]
             MioSocketListener::Uds(ref mut lst) => lst.register(registry, token, interests),
         }
     }
@@ -150,7 +151,7 @@ impl Source for MioSocketListener {
     ) -> io::Result<()> {
         match *self {
             MioSocketListener::Tcp(ref mut lst) => lst.reregister(registry, token, interests),
-            #[cfg(all(unix))]
+            #[cfg(unix)]
             MioSocketListener::Uds(ref mut lst) => lst.reregister(registry, token, interests),
         }
     }
 
     fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
         match *self {
             MioSocketListener::Tcp(ref mut lst) => lst.deregister(registry),
-            #[cfg(all(unix))]
+            #[cfg(unix)]
             MioSocketListener::Uds(ref mut lst) => {
                 let res = lst.deregister(registry);
@@ -175,17 +176,19 @@ impl Source for MioSocketListener {
 }
 
 /// helper trait for converting mio stream to tokio stream.
-pub trait FromStream: AsyncRead + AsyncWrite + Sized {
-    fn from_mio_stream(sock: MioStream) -> io::Result<Self>;
+pub trait FromStream: Sized {
+    fn from_mio(sock: MioStream) -> io::Result<Self>;
 }
 
 // ToDo: This is a workaround and we need an efficient way to convert between mio and tokio stream
 #[cfg(unix)]
 impl FromStream for TcpStream {
-    fn from_mio_stream(sock: MioStream) -> io::Result<Self> {
+    fn from_mio(sock: MioStream) -> io::Result<Self> {
         match sock {
             MioStream::Tcp(mio) => {
                 let raw = IntoRawFd::into_raw_fd(mio);
+                // # Safety:
+                // This is an in-place conversion from a mio stream to a tokio stream.
                 TcpStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) })
             }
             MioStream::Uds(_) => {
@@ -198,23 +201,28 @@ impl FromStream for TcpStream {
 // ToDo: This is a workaround and we need an efficient way to convert between mio and tokio stream
 #[cfg(windows)]
 impl FromStream for TcpStream {
-    fn from_mio_stream(sock: MioStream) -> io::Result<Self> {
+    fn from_mio(sock: MioStream) -> io::Result<Self> {
         match sock {
             MioStream::Tcp(mio) => {
                 let raw = IntoRawSocket::into_raw_socket(mio);
+                // # Safety:
+                // This is an in-place conversion from a mio stream to a tokio stream.
                 TcpStream::from_std(unsafe { FromRawSocket::from_raw_socket(raw) })
             }
         }
     }
 }
 
+// ToDo: This is a workaround and we need an efficient way to convert between mio and tokio stream
 #[cfg(unix)]
 impl FromStream for UnixStream {
-    fn from_mio_stream(sock: MioStream) -> io::Result<Self> {
+    fn from_mio(sock: MioStream) -> io::Result<Self> {
         match sock {
             MioStream::Tcp(_) => panic!("Should not happen, bug in server impl"),
             MioStream::Uds(mio) => {
                 let raw = IntoRawFd::into_raw_fd(mio);
+                // # Safety:
+                // This is an in-place conversion from a mio stream to a tokio stream.
                 UnixStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) })
             }
         }
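The `FromStream` impls above all follow one conversion recipe: move the raw fd (or socket, on windows) out of the mio stream, rebuild a `std` stream from it, and hand that to tokio's `from_std`. A standalone unix sketch, assuming a tokio-style `TcpStream::from_std` and a running runtime; `mio_to_tokio_tcp` is a hypothetical helper:

```rust
// Hypothetical helper mirroring the unix impls above. Both sides are already
// non-blocking, so `from_std` only has to register the fd with the reactor
// (it must be called inside a running runtime).
#[cfg(unix)]
fn mio_to_tokio_tcp(mio: mio::net::TcpStream) -> std::io::Result<tokio::net::TcpStream> {
    use std::os::unix::io::{FromRawFd, IntoRawFd};

    let raw = mio.into_raw_fd();
    // Safety: `into_raw_fd` released ownership of `raw`, so exactly one std
    // stream is rebuilt from it and no double-close can occur.
    let std_stream = unsafe { std::net::TcpStream::from_raw_fd(raw) };
    tokio::net::TcpStream::from_std(std_stream)
}
```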
diff --git a/actix-server/src/waker_queue.rs b/actix-server/src/waker_queue.rs
index eafa1b81..544e05fb 100644
--- a/actix-server/src/waker_queue.rs
+++ b/actix-server/src/waker_queue.rs
@@ -49,7 +49,7 @@ impl WakerQueue {
 ///
 /// *. These interests should not be confused with `mio::Interest` and mostly not I/O related
 pub(crate) enum WakerInterest {
-    /// Interest from `Worker` notifying `Accept` to run `backpressure` method
+    /// Interest from `Worker` notifying `Accept` to run `maybe_backpressure` method
     Notify,
     /// `Pause`, `Resume`, `Stop` Interest are from `ServerBuilder` future. It listens to
     /// `ServerCommand` and notify `Accept` to do exactly these tasks.
@@ -57,7 +57,7 @@ pub(crate) enum WakerInterest {
     Resume,
     Stop,
     /// `Timer` is an interest sent as a delayed future. When an error happens on accepting
-    /// connection the poll would deregister sockets temporary and wake up the poll and register
+    /// connection `Accept` would deregister sockets temporarily and wake up the poll and register
     /// them again after the delayed future resolve.
     Timer,
     /// `Worker` ins an interest happen after a worker runs into faulted state(This is determined by
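How the `Timer` interest documented above gets produced, matching the accept.rs hunk earlier: after an accept error the socket's timeout is recorded and a delayed future on the system arbiter wakes the poll slightly later. A sketch over the crate-internal `WakerQueue`/`WakerInterest` types; `schedule_timer` is a hypothetical helper:

```rust
use std::time::Duration;

use actix_rt::time::{sleep_until, Instant};
use actix_rt::System;

// `waker` is assumed to be a clone of the accept loop's WakerQueue.
fn schedule_timer(waker: WakerQueue) {
    System::current().arbiter().send(Box::pin(async move {
        // slightly longer than the 500ms timeout recorded on the socket info
        sleep_until(Instant::now() + Duration::from_millis(510)).await;
        waker.wake(WakerInterest::Timer);
    }));
}
```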
diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs
index 8fce670b..e122d87d 100644
--- a/actix-server/src/worker.rs
+++ b/actix-server/src/worker.rs
@@ -3,7 +3,7 @@ use std::pin::Pin;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::task::{Context, Poll};
-use std::time;
+use std::time::Duration;
 
 use actix_rt::time::{sleep_until, Instant, Sleep};
 use actix_rt::{spawn, Arbiter};
@@ -134,7 +134,7 @@ pub(crate) struct Worker {
     conns: Counter,
     factories: Vec<Box<dyn InternalServiceFactory>>,
     state: WorkerState,
-    shutdown_timeout: time::Duration,
+    shutdown_timeout: Duration,
 }
 
 struct WorkerService {
@@ -165,61 +165,58 @@ impl Worker {
         idx: usize,
         factories: Vec<Box<dyn InternalServiceFactory>>,
         availability: WorkerAvailability,
-        shutdown_timeout: time::Duration,
+        shutdown_timeout: Duration,
     ) -> WorkerClient {
         let (tx1, rx) = unbounded();
         let (tx2, rx2) = unbounded();
         let avail = availability.clone();
 
-        Arbiter::new().send(
-            async move {
-                availability.set(false);
-                let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker {
-                    rx,
-                    rx2,
-                    availability,
-                    factories,
-                    shutdown_timeout,
-                    services: Vec::new(),
-                    conns: conns.clone(),
-                    state: WorkerState::Unavailable(Vec::new()),
-                });
+        Arbiter::new().send(Box::pin(async move {
+            availability.set(false);
+            let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker {
+                rx,
+                rx2,
+                availability,
+                factories,
+                shutdown_timeout,
+                services: Vec::new(),
+                conns: conns.clone(),
+                state: WorkerState::Unavailable(Vec::new()),
+            });
 
-                let mut fut: Vec<MapOk<LocalBoxFuture<'static, _>, _>> = Vec::new();
-                for (idx, factory) in wrk.factories.iter().enumerate() {
-                    fut.push(factory.create().map_ok(move |r| {
-                        r.into_iter()
-                            .map(|(t, s): (Token, _)| (idx, t, s))
-                            .collect::<Vec<_>>()
-                    }));
-                }
+            let mut fut: Vec<MapOk<LocalBoxFuture<'static, _>, _>> = Vec::new();
+            for (idx, factory) in wrk.factories.iter().enumerate() {
+                fut.push(factory.create().map_ok(move |r| {
+                    r.into_iter()
+                        .map(|(t, s): (Token, _)| (idx, t, s))
+                        .collect::<Vec<_>>()
+                }));
+            }
 
-                spawn(async move {
-                    let res = join_all(fut).await;
-                    let res: Result<Vec<_>, _> = res.into_iter().collect();
-                    match res {
-                        Ok(services) => {
-                            for item in services {
-                                for (factory, token, service) in item {
-                                    assert_eq!(token.0, wrk.services.len());
-                                    wrk.services.push(WorkerService {
-                                        factory,
-                                        service,
-                                        status: WorkerServiceStatus::Unavailable,
-                                    });
-                                }
-                            }
-                        }
-                        Err(e) => {
-                            error!("Can not start worker: {:?}", e);
-                            Arbiter::current().stop();
-                        }
-                    }
-                    wrk.await
-                });
-            }
-            .boxed(),
-        );
+            spawn(async move {
+                let res = join_all(fut).await;
+                let res: Result<Vec<_>, _> = res.into_iter().collect();
+                match res {
+                    Ok(services) => {
+                        for item in services {
+                            for (factory, token, service) in item {
+                                assert_eq!(token.0, wrk.services.len());
+                                wrk.services.push(WorkerService {
+                                    factory,
+                                    service,
+                                    status: WorkerServiceStatus::Unavailable,
+                                });
+                            }
+                        }
+                    }
+                    Err(e) => {
+                        error!("Can not start worker: {:?}", e);
+                        Arbiter::current().stop();
+                    }
+                }
+                wrk.await
+            });
+        }));
 
         WorkerClient::new(idx, tx1, tx2, avail)
     }
@@ -330,7 +327,7 @@ impl Future for Worker {
                     if num != 0 {
                         info!("Graceful worker shutdown, {} connections", num);
                         self.state = WorkerState::Shutdown(
-                            sleep_until(Instant::now() + time::Duration::from_secs(1)),
+                            sleep_until(Instant::now() + Duration::from_secs(1)),
                             sleep_until(Instant::now() + self.shutdown_timeout),
                             Some(result),
                         );
@@ -437,7 +434,7 @@ impl Future for Worker {
 
                 // sleep for 1 second and then check again
                 if Pin::new(&mut *t1).poll(cx).is_ready() {
-                    *t1 = sleep_until(Instant::now() + time::Duration::from_secs(1));
+                    *t1 = sleep_until(Instant::now() + Duration::from_secs(1));
                     let _ = Pin::new(t1).poll(cx);
                 }
@@ -445,7 +442,7 @@ impl Future for Worker {
             }
             WorkerState::Available => {
                 loop {
-                    match Pin::new(&mut self.rx).poll_next(cx) {
+                    return match Pin::new(&mut self.rx).poll_next(cx) {
                         // handle incoming io stream
                         Poll::Ready(Some(WorkerCommand(msg))) => {
                             match self.check_readiness(cx) {
@@ -476,13 +473,13 @@ impl Future for Worker {
                                     );
                                 }
                             }
-                            return self.poll(cx);
+                            self.poll(cx)
                         }
                         Poll::Pending => {
                             self.state = WorkerState::Available;
-                            return Poll::Pending;
+                            Poll::Pending
                         }
-                        Poll::Ready(None) => return Poll::Ready(()),
+                        Poll::Ready(None) => Poll::Ready(()),
                     }
                 }
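The worker start-up above now hands `Arbiter::send` a `Box::pin(async move { ... })` instead of building the boxed future with `futures_util`'s `.boxed()`. A minimal sketch of that call shape, assuming `Arbiter::send` accepts a pinned, boxed `Future<Output = ()>` as the call sites above do:

```rust
use actix_rt::Arbiter;

// Requires a running System; `Arbiter::new` spawns a new thread whose local
// task set runs the future we send to it.
fn start_on_new_arbiter() {
    Arbiter::new().send(Box::pin(async {
        println!("worker initialized on its own arbiter");
    }));
}
```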