diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index ef01f5a7..2db40b2b 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -10,7 +10,7 @@ use crate::server::{Server, ServerCommand}; use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; use crate::socket::{MioTcpListener, MioTcpSocket}; -use crate::worker::{self, ServerWorkerConfig}; +use crate::worker::ServerWorkerConfig; use crate::Token; /// Server builder @@ -100,8 +100,8 @@ impl ServerBuilder { /// reached for each worker. /// /// By default max connections is set to a 25k per worker. - pub fn maxconn(self, num: usize) -> Self { - worker::max_concurrent_connections(num); + pub fn maxconn(mut self, num: usize) -> Self { + self.worker_config.max_concurrent_connections(num); self } diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index 07128dda..9efb167f 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -210,11 +210,15 @@ impl Server { self.worker_config, ); - if let Ok(handle) = res { - self.handles.push((new_idx, handle.clone())); - self.waker_queue.wake(WakerInterest::Worker(handle)); + match res { + Ok(handle) => { + self.handles.push((new_idx, handle.clone())); + self.waker_queue.wake(WakerInterest::Worker(handle)); + } + Err(e) => error!("Can not start worker: {:?}", e), } } + None } } diff --git a/actix-server/src/spsc.rs b/actix-server/src/spsc.rs new file mode 100644 index 00000000..21c1c25c --- /dev/null +++ b/actix-server/src/spsc.rs @@ -0,0 +1,477 @@ +use std::{ + cell::{Cell, UnsafeCell}, + fmt, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; + +#[derive(Debug)] +#[repr(align(64))] +struct ProducerCacheline { + /// The bounded size as specified by the user. + capacity: usize, + + /// Index position of current tail + tail: AtomicUsize, + shadow_head: Cell, + /// Id == 0 : never connected + /// Id == usize::MAX: disconnected + consumer_id: AtomicUsize, +} + +#[derive(Debug)] +#[repr(align(64))] +struct ConsumerCacheline { + /// The bounded size as specified by the user. + capacity: usize, + + /// Index position of the current head + head: AtomicUsize, + shadow_tail: Cell, + /// Id == 0 : never connected + /// Id == usize::MAX: disconnected + producer_id: AtomicUsize, +} + +/// The internal memory buffer used by the queue. +/// +/// Buffer holds a pointer to allocated memory which represents the bounded +/// ring buffer, as well as a head and tail atomicUsize which the producer and +/// consumer use to track location in the ring. +#[repr(C)] +pub(crate) struct Buffer { + buffer_storage: Vec>, + + pcache: ProducerCacheline, + ccache: ConsumerCacheline, +} + +impl fmt::Debug for Buffer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let head = self.ccache.head.load(Ordering::Relaxed); + let tail = self.pcache.tail.load(Ordering::Relaxed); + let shead = self.pcache.shadow_head.get(); + let stail = self.ccache.shadow_tail.get(); + let id_to_str = |id| match id { + 0 => "not connected".into(), + usize::MAX => "disconnected".into(), + x => format!("{}", x), + }; + + let consumer_id = id_to_str(self.pcache.consumer_id.load(Ordering::Relaxed)); + let producer_id = id_to_str(self.ccache.producer_id.load(Ordering::Relaxed)); + + f.debug_struct("SPSC Buffer") + .field("capacity:", &self.ccache.capacity) + .field("consumer_head:", &head) + .field("shadow_head:", &shead) + .field("producer_tail:", &tail) + .field("shadow_tail:", &stail) + .field("consumer_id:", &consumer_id) + .field("producer_id:", &producer_id) + .finish() + } +} + +unsafe impl Sync for Buffer {} + +/// A handle to the queue which allows consuming values from the buffer +pub(crate) struct Consumer { + pub(crate) buffer: Arc>, +} + +impl Clone for Consumer { + fn clone(&self) -> Self { + Consumer { + buffer: self.buffer.clone(), + } + } +} + +/// A handle to the queue which allows adding values onto the buffer +pub(crate) struct Producer { + pub(crate) buffer: Arc>, +} + +impl Clone for Producer { + fn clone(&self) -> Self { + Producer { + buffer: self.buffer.clone(), + } + } +} + +impl fmt::Debug for Consumer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Consumer {:?}", self.buffer) + } +} + +impl fmt::Debug for Producer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Producer {:?}", self.buffer) + } +} + +unsafe impl Send for Consumer {} +unsafe impl Send for Producer {} + +impl Buffer { + /// Attempt to pop a value off the buffer. + /// + /// If the buffer is empty, this method will not block. Instead, it will + /// return `None` signifying the buffer was empty. The caller may then + /// decide what to do next (e.g. spin-wait, sleep, process something + /// else, etc) + fn try_pop(&self) -> Option { + let current_head = self.ccache.head.load(Ordering::Relaxed); + + if current_head == self.ccache.shadow_tail.get() { + self.ccache + .shadow_tail + .set(self.pcache.tail.load(Ordering::Acquire)); + if current_head == self.ccache.shadow_tail.get() { + return None; + } + } + + let resp = unsafe { + self.buffer_storage[current_head % self.ccache.capacity] + .get() + .read() + }; + self.ccache.head.store(current_head + 1, Ordering::Release); + Some(resp) + } + + /// Attempt to push a value onto the buffer. + /// + /// If the buffer is full, this method will not block. Instead, it will + /// return `Some(v)`, where `v` was the value attempting to be pushed + /// onto the buffer. If the value was successfully pushed onto the + /// buffer, `None` will be returned signifying success. + fn try_push(&self, v: T) -> Option { + if self.consumer_disconnected() { + return Some(v); + } + let current_tail = self.pcache.tail.load(Ordering::Relaxed); + + if self.pcache.shadow_head.get() + self.pcache.capacity <= current_tail { + self.pcache + .shadow_head + .set(self.ccache.head.load(Ordering::Acquire)); + if self.pcache.shadow_head.get() + self.pcache.capacity <= current_tail { + return Some(v); + } + } + + unsafe { + self.buffer_storage[current_tail % self.pcache.capacity] + .get() + .write(v); + } + self.pcache.tail.store(current_tail + 1, Ordering::Release); + None + } + + /// Disconnects the consumer, and returns whether or not it was already + /// disconnected + pub(crate) fn disconnect_consumer(&self) -> bool { + self.pcache.consumer_id.swap(usize::MAX, Ordering::Release) == usize::MAX + } + + /// Disconnects the consumer, and returns whether or not it was already + /// disconnected + pub(crate) fn disconnect_producer(&self) -> bool { + self.ccache.producer_id.swap(usize::MAX, Ordering::Release) == usize::MAX + } + + /// Disconnects the consumer, and returns whether or not it was already + /// disconnected + pub(crate) fn producer_disconnected(&self) -> bool { + self.ccache.producer_id.load(Ordering::Acquire) == usize::MAX + } + + /// Disconnects the consumer, and returns whether or not it was already + /// disconnected + pub(crate) fn consumer_disconnected(&self) -> bool { + self.pcache.consumer_id.load(Ordering::Acquire) == usize::MAX + } + + /// Returns the current size of the queue + /// + /// This value represents the current size of the queue. This value can be + /// from 0-`capacity` inclusive. + pub(crate) fn size(&self) -> usize { + self.pcache.tail.load(Ordering::Acquire) - self.ccache.head.load(Ordering::Acquire) + } +} + +/// Handles deallocation of heap memory when the buffer is dropped +impl Drop for Buffer { + fn drop(&mut self) { + // Pop the rest of the values off the queue. By moving them into this scope, + // we implicitly call their destructor + while self.try_pop().is_some() {} + // We don't want to run any destructors here, because we didn't run + // any of the constructors through the vector. And whatever object was + // in fact still alive we popped above. + unsafe { + self.buffer_storage.set_len(0); + } + } +} + +pub(crate) fn make(capacity: usize) -> (Producer, Consumer) { + inner_make(capacity, 0) +} + +fn inner_make(capacity: usize, initial_value: usize) -> (Producer, Consumer) { + let buffer_storage = allocate_buffer(capacity); + + let arc = Arc::new(Buffer { + buffer_storage, + ccache: ConsumerCacheline { + capacity, + + head: AtomicUsize::new(initial_value), + shadow_tail: Cell::new(initial_value), + producer_id: AtomicUsize::new(0), + }, + pcache: ProducerCacheline { + capacity, + + tail: AtomicUsize::new(initial_value), + shadow_head: Cell::new(initial_value), + consumer_id: AtomicUsize::new(0), + }, + }); + + ( + Producer { + buffer: arc.clone(), + }, + Consumer { buffer: arc }, + ) +} + +fn allocate_buffer(capacity: usize) -> Vec> { + let size = capacity.next_power_of_two(); + let mut vec = Vec::with_capacity(size); + unsafe { + vec.set_len(size); + } + vec +} + +pub(crate) trait BufferHalf { + type Item; + + fn buffer(&self) -> &Buffer; + fn connect(&self, id: usize); + fn peer_id(&self) -> usize; + + /// Returns the total capacity of this queue + /// + /// This value represents the total capacity of the queue when it is full. + /// It does not represent the current usage. For that, call `size()`. + fn capacity(&self) -> usize; + + /// Returns the current size of the queue + /// + /// This value represents the current size of the queue. This value can be + /// from 0-`capacity` inclusive. + fn size(&self) -> usize { + self.buffer().size() + } +} + +impl BufferHalf for Producer { + type Item = T; + fn buffer(&self) -> &Buffer { + &*self.buffer + } + + fn capacity(&self) -> usize { + (*self.buffer).pcache.capacity + } + + fn connect(&self, id: usize) { + assert_ne!(id, 0); + assert_ne!(id, usize::MAX); + (*self.buffer) + .ccache + .producer_id + .store(id, Ordering::Release); + } + + fn peer_id(&self) -> usize { + (*self.buffer).pcache.consumer_id.load(Ordering::Acquire) + } +} + +impl Producer { + /// Attempt to push a value onto the buffer. + /// + /// This method does not block. If the queue is not full, the value will be + /// added to the queue and the method will return `None`, signifying + /// success. If the queue is full, this method will return `Some(v)``, + /// where `v` is your original value. + pub(crate) fn try_push(&self, v: T) -> Option { + (*self.buffer).try_push(v) + } + + /// Disconnects the producer, signaling to the consumer that no new values + /// are going to be produced. + /// + /// Returns the buffer status before the disconnect + pub(crate) fn disconnect(&self) -> bool { + (*self.buffer).disconnect_producer() + } + + pub(crate) fn consumer_disconnected(&self) -> bool { + (*self.buffer).consumer_disconnected() + } + + /// Returns the available space in the queue + /// + /// This value represents the number of items that can be pushed onto the + /// queue before it becomes full. + pub(crate) fn free_space(&self) -> usize { + self.capacity() - self.size() + } +} + +impl BufferHalf for Consumer { + type Item = T; + fn buffer(&self) -> &Buffer { + &(*self.buffer) + } + + fn connect(&self, id: usize) { + assert_ne!(id, usize::MAX); + assert_ne!(id, 0); + (*self.buffer) + .pcache + .consumer_id + .store(id, Ordering::Release); + } + + fn peer_id(&self) -> usize { + (*self.buffer).ccache.producer_id.load(Ordering::Acquire) + } + + fn capacity(&self) -> usize { + (*self.buffer).ccache.capacity + } +} + +impl Consumer { + /// Disconnects the consumer, signaling to the producer that no new values + /// are going to be consumed. After this is done, any attempt on the + /// producer to try_push should fail + /// + /// Returns the buffer status before the disconnect + pub(crate) fn disconnect(&self) -> bool { + (*self.buffer).disconnect_consumer() + } + + pub(crate) fn producer_disconnected(&self) -> bool { + (*self.buffer).producer_disconnected() + } + + /// Attempt to pop a value off the queue. + /// + /// This method does not block. If the queue is empty, the method will + /// return `None`. If there is a value available, the method will + /// return `Some(v)`, where `v` is the value being popped off the queue. + pub(crate) fn try_pop(&self) -> Option { + (*self.buffer).try_pop() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::thread; + + #[test] + fn test_try_push() { + let (p, _) = super::make(10); + + for i in 0..10 { + p.try_push(i); + assert!(p.capacity() == 10); + assert!(p.size() == i + 1); + } + + match p.try_push(10) { + Some(v) => { + assert!(v == 10); + } + None => assert!(false, "Queue should not have accepted another write!"), + } + } + + #[test] + fn test_try_poll() { + let (p, c) = super::make(10); + + match c.try_pop() { + Some(_) => assert!(false, "Queue was empty but a value was read!"), + None => {} + } + + p.try_push(123); + + match c.try_pop() { + Some(v) => assert!(v == 123), + None => assert!(false, "Queue was not empty but poll() returned nothing!"), + } + + match c.try_pop() { + Some(_) => assert!(false, "Queue was empty but a value was read!"), + None => {} + } + } + + #[test] + fn test_threaded() { + let (p, c) = super::make(500); + + thread::spawn(move || { + for i in 0..100000 { + loop { + if let None = p.try_push(i) { + break; + } + } + } + }); + + for i in 0..100000 { + loop { + if let Some(t) = c.try_pop() { + assert!(t == i); + break; + } + } + } + } + + #[should_panic] + #[test] + fn test_wrap() { + let (p, c) = super::inner_make(10, usize::MAX - 1); + + for i in 0..10 { + assert_eq!(p.try_push(i).is_none(), true); + } + + for i in 0..10 { + assert_eq!(c.try_pop(), Some(i)); + } + } +} \ No newline at end of file diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 6229407d..1c05615f 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -1,21 +1,27 @@ -use std::future::Future; -use std::io; -use std::pin::Pin; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::Arc; -use std::task::{Context, Poll}; -use std::thread; -use std::time::Duration; +use std::{ + future::Future, + io, mem, + pin::Pin, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + task::{Context, Poll}, + thread, + time::Duration, +}; use actix_rt::{ - time::{sleep, Sleep}, + time::{sleep, Instant, Sleep}, System, }; use actix_utils::counter::Counter; use futures_core::{future::LocalBoxFuture, ready}; use log::{error, info, trace}; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; -use tokio::sync::oneshot; +use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + oneshot, +}; use crate::service::{BoxedServerService, InternalServiceFactory}; use crate::socket::MioStream; @@ -28,7 +34,7 @@ pub(crate) struct WorkerCommand(Conn); /// and `false` if some connections still alive. pub(crate) struct StopCommand { graceful: bool, - result: oneshot::Sender, + tx: oneshot::Sender, } #[derive(Debug)] @@ -37,27 +43,6 @@ pub(crate) struct Conn { pub token: Token, } -static MAX_CONNS: AtomicUsize = AtomicUsize::new(25600); - -/// Sets the maximum per-worker number of concurrent connections. -/// -/// All socket listeners will stop accepting connections when this limit is -/// reached for each worker. -/// -/// By default max connections is set to a 25k per worker. -pub fn max_concurrent_connections(num: usize) { - MAX_CONNS.store(num, Ordering::Relaxed); -} - -thread_local! { - static MAX_CONNS_COUNTER: Counter = - Counter::new(MAX_CONNS.load(Ordering::Relaxed)); -} - -pub(crate) fn num_connections() -> usize { - MAX_CONNS_COUNTER.with(|conns| conns.total()) -} - // a handle to worker that can send message to worker and share the availability of worker to other // thread. #[derive(Clone)] @@ -92,8 +77,8 @@ impl WorkerHandle { } pub fn stop(&self, graceful: bool) -> oneshot::Receiver { - let (result, rx) = oneshot::channel(); - let _ = self.tx2.send(StopCommand { graceful, result }); + let (tx, rx) = oneshot::channel(); + let _ = self.tx2.send(StopCommand { graceful, tx }); rx } } @@ -136,7 +121,7 @@ pub(crate) struct ServerWorker { conns: Counter, factories: Vec>, state: WorkerState, - config: ServerWorkerConfig, + shutdown_timeout: Duration, } struct WorkerService { @@ -167,6 +152,7 @@ enum WorkerServiceStatus { pub(crate) struct ServerWorkerConfig { shutdown_timeout: Duration, max_blocking_threads: usize, + max_concurrent_connections: usize, } impl Default for ServerWorkerConfig { @@ -176,6 +162,7 @@ impl Default for ServerWorkerConfig { Self { shutdown_timeout: Duration::from_secs(30), max_blocking_threads, + max_concurrent_connections: 25600, } } } @@ -185,6 +172,10 @@ impl ServerWorkerConfig { self.max_blocking_threads = num; } + pub(crate) fn max_concurrent_connections(&mut self, num: usize) { + self.max_concurrent_connections = num; + } + pub(crate) fn shutdown_timeout(&mut self, dur: Duration) { self.shutdown_timeout = dur; } @@ -217,16 +208,16 @@ impl ServerWorker { System::set_current(system); } - let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker { + let mut wrk = ServerWorker { rx, rx2, - availability, - factories, - config, services: Vec::new(), - conns: conns.clone(), - state: WorkerState::Unavailable, - }); + availability, + conns: Counter::new(config.max_concurrent_connections), + factories, + state: Default::default(), + shutdown_timeout: config.shutdown_timeout, + }; // use a custom tokio runtime builder to change the settings of runtime. let local = tokio::task::LocalSet::new(); @@ -276,11 +267,15 @@ impl ServerWorker { .unwrap_or_else(|| Ok(WorkerHandle::new(idx, tx1, tx2, avail))) } - fn restart_service(&mut self, token: Token, idx: usize) { - let factory = &self.factories[idx]; + fn restart_service(&mut self, token: Token, factory_id: usize) { + let factory = &self.factories[factory_id]; trace!("Service {:?} failed, restarting", factory.name(token)); self.services[token.0].status = WorkerServiceStatus::Restarting; - self.state = WorkerState::Restarting(idx, token, factory.create()); + self.state = WorkerState::Restarting(Restart { + factory_id, + token, + fut: factory.create(), + }); } fn shutdown(&mut self, force: bool) { @@ -346,66 +341,87 @@ impl ServerWorker { enum WorkerState { Available, Unavailable, - Restarting( - usize, - Token, - LocalBoxFuture<'static, Result, ()>>, - ), - Shutdown( - Pin>, - Pin>, - Option>, - ), + Restarting(Restart), + Shutdown(Shutdown), +} + +struct Restart { + factory_id: usize, + token: Token, + fut: LocalBoxFuture<'static, Result, ()>>, +} + +// Shutdown keep states necessary for server shutdown: +// Sleep for interval check the shutdown progress. +// Instant for the start time of shutdown. +// Sender for send back the shutdown outcome(force/grace) to StopCommand caller. +struct Shutdown { + timer: Pin>, + start_from: Instant, + tx: oneshot::Sender, +} + +impl Default for WorkerState { + fn default() -> Self { + Self::Unavailable + } } impl Future for ServerWorker { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.as_mut().get_mut(); + // `StopWorker` message handler - if let Poll::Ready(Some(StopCommand { graceful, result })) = - Pin::new(&mut self.rx2).poll_recv(cx) + if let Poll::Ready(Some(StopCommand { graceful, tx })) = + Pin::new(&mut this.rx2).poll_recv(cx) { - self.availability.set(false); - let num = num_connections(); + this.availability.set(false); + let num = this.conns.total(); if num == 0 { info!("Shutting down worker, 0 connections"); - let _ = result.send(true); + let _ = tx.send(true); return Poll::Ready(()); } else if graceful { - self.shutdown(false); info!("Graceful worker shutdown, {} connections", num); - self.state = WorkerState::Shutdown( - Box::pin(sleep(Duration::from_secs(1))), - Box::pin(sleep(self.config.shutdown_timeout)), - Some(result), - ); + this.shutdown(false); + + this.state = WorkerState::Shutdown(Shutdown { + timer: Box::pin(sleep(Duration::from_secs(1))), + start_from: Instant::now(), + tx, + }); } else { info!("Force shutdown worker, {} connections", num); - self.shutdown(true); - let _ = result.send(false); + this.shutdown(true); + + let _ = tx.send(false); return Poll::Ready(()); } } - match self.state { - WorkerState::Unavailable => match self.check_readiness(cx) { + match this.state { + WorkerState::Unavailable => match this.check_readiness(cx) { Ok(true) => { - self.state = WorkerState::Available; - self.availability.set(true); + this.state = WorkerState::Available; + this.availability.set(true); self.poll(cx) } Ok(false) => Poll::Pending, Err((token, idx)) => { - self.restart_service(token, idx); + this.restart_service(token, idx); self.poll(cx) } }, - WorkerState::Restarting(idx, token, ref mut fut) => { - let item = ready!(fut.as_mut().poll(cx)).unwrap_or_else(|_| { + WorkerState::Restarting(ref mut restart) => { + let factory_id = restart.factory_id; + let token = restart.token; + + let item = ready!(restart.fut.as_mut().poll(cx)).unwrap_or_else(|_| { panic!( "Can not restart {:?} service", - self.factories[idx].name(token) + this.factories[factory_id].name(token) ) }); @@ -417,58 +433,59 @@ impl Future for ServerWorker { trace!( "Service {:?} has been restarted", - self.factories[idx].name(token) + this.factories[factory_id].name(token) ); - self.services[token.0].created(service); - self.state = WorkerState::Unavailable; + this.services[token.0].created(service); + this.state = WorkerState::Unavailable; self.poll(cx) } - WorkerState::Shutdown(ref mut t1, ref mut t2, ref mut tx) => { - let num = num_connections(); - if num == 0 { - let _ = tx.take().unwrap().send(true); - return Poll::Ready(()); - } + WorkerState::Shutdown(ref mut shutdown) => { + // Wait for 1 second. + ready!(shutdown.timer.as_mut().poll(cx)); - // check graceful timeout - if Pin::new(t2).poll(cx).is_ready() { - let _ = tx.take().unwrap().send(false); - self.shutdown(true); - return Poll::Ready(()); + if this.conns.total() == 0 { + // Graceful shutdown. + if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) { + let _ = shutdown.tx.send(true); + } + Poll::Ready(()) + } else if shutdown.start_from.elapsed() >= this.shutdown_timeout { + // Timeout forceful shutdown. + if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) { + let _ = shutdown.tx.send(false); + } + Poll::Ready(()) + } else { + // Reset timer and wait for 1 second. + let time = Instant::now() + Duration::from_secs(1); + shutdown.timer.as_mut().reset(time); + shutdown.timer.as_mut().poll(cx) } - - // sleep for 1 second and then check again - if t1.as_mut().poll(cx).is_ready() { - *t1 = Box::pin(sleep(Duration::from_secs(1))); - let _ = t1.as_mut().poll(cx); - } - - Poll::Pending } // actively poll stream and handle worker command WorkerState::Available => loop { - match self.check_readiness(cx) { + match this.check_readiness(cx) { Ok(true) => {} Ok(false) => { trace!("Worker is unavailable"); - self.availability.set(false); - self.state = WorkerState::Unavailable; + this.availability.set(false); + this.state = WorkerState::Unavailable; return self.poll(cx); } Err((token, idx)) => { - self.restart_service(token, idx); - self.availability.set(false); + this.restart_service(token, idx); + this.availability.set(false); return self.poll(cx); } } - match ready!(Pin::new(&mut self.rx).poll_recv(cx)) { + match ready!(Pin::new(&mut this.rx).poll_recv(cx)) { // handle incoming io stream Some(WorkerCommand(msg)) => { - let guard = self.conns.get(); - let _ = self.services[msg.token.0].service.call((guard, msg.io)); + let guard = this.conns.get(); + let _ = this.services[msg.token.0].service.call((guard, msg.io)); } None => return Poll::Ready(()), };