diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 8b6f92e2..18aa97d4 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -23,11 +23,10 @@ actix-service = "2.0.0-beta.5" actix-utils = "3.0.0-beta.4" futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } -futures-util = { version = "0.3.7", default-features = false } log = "0.4" -mio = { version = "0.7.11", features = ["os-poll", "net", "os-util"] } +mio = { version = "0.7.11", features = ["os-poll", "net"] } num_cpus = "1.13" -rtrb = "" +rtrb = "0.1" slab = "0.4" tokio = { version = "1.2", features = ["sync"] } diff --git a/actix-server/src/spsc.rs b/actix-server/src/spsc.rs index fe6656ac..6de5a76b 100644 --- a/actix-server/src/spsc.rs +++ b/actix-server/src/spsc.rs @@ -3,7 +3,7 @@ use std::{ task::{Context, Poll}, }; -use futures_util::task::AtomicWaker; +use futures_core::task::__internal::AtomicWaker; use rtrb::{Consumer, Producer, PushError, RingBuffer}; pub fn channel(cap: usize) -> (Sender, Receiver) { @@ -52,3 +52,23 @@ impl Receiver { } } } + +#[cfg(test)] +mod test { + use super::*; + + use actix_utils::future::poll_fn; + + #[actix_rt::test] + async fn test_send() { + let (mut tx, mut rx) = channel::(1); + + let res = poll_fn(|cx| Poll::Ready(rx.poll_recv_unpin(cx))).await; + assert!(res.is_pending()); + + tx.send(996).unwrap(); + + let res = poll_fn(|cx| rx.poll_recv_unpin(cx)).await; + assert_eq!(res, 996); + } +} diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 816aff33..e2f158e9 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -22,7 +22,7 @@ use tokio::sync::oneshot; use crate::service::{BoxedServerService, InternalServiceFactory}; use crate::socket::MioStream; -use crate::spsc::{channel, Receiver, Sender}; +use crate::spsc; use crate::waker_queue::{WakerInterest, WakerQueue}; use crate::{join_all, Token}; @@ -41,17 +41,17 @@ pub(crate) struct Conn { fn handle_pair( idx: usize, - tx1: Sender, - tx2: Sender, + tx_accept: spsc::Sender, + tx_server: spsc::Sender, avail: WorkerAvailability, ) -> (WorkerHandleAccept, WorkerHandleServer) { let accept = WorkerHandleAccept { idx, - tx: tx1, + tx: tx_accept, avail, }; - let server = WorkerHandleServer { idx, tx: tx2 }; + let server = WorkerHandleServer { idx, tx: tx_server }; (accept, server) } @@ -62,7 +62,7 @@ fn handle_pair( /// Held by [Accept](crate::accept::Accept). pub(crate) struct WorkerHandleAccept { pub idx: usize, - tx: Sender, + tx: spsc::Sender, avail: WorkerAvailability, } @@ -81,7 +81,7 @@ impl WorkerHandleAccept { /// Held by [ServerBuilder](crate::builder::ServerBuilder). pub(crate) struct WorkerHandleServer { pub idx: usize, - tx: Sender, + tx: spsc::Sender, } impl WorkerHandleServer { @@ -123,8 +123,8 @@ impl WorkerAvailability { /// /// Worker accepts Socket objects via unbounded channel and starts stream processing. pub(crate) struct ServerWorker { - rx: Receiver, - rx2: Receiver, + rx_accept: spsc::Receiver, + rx_server: spsc::Receiver, services: Vec, availability: WorkerAvailability, conns: Counter, @@ -198,8 +198,8 @@ impl ServerWorker { availability: WorkerAvailability, config: ServerWorkerConfig, ) -> (WorkerHandleAccept, WorkerHandleServer) { - let (tx1, rx) = channel(backlog as _); - let (tx2, rx2) = channel(1); + let (tx_accept, rx_accept) = spsc::channel(backlog as _); + let (tx_server, rx_server) = spsc::channel(1); let avail = availability.clone(); // every worker runs in it's own arbiter. @@ -214,8 +214,8 @@ impl ServerWorker { .spawn(async move { availability.set(false); let mut wrk = ServerWorker { - rx, - rx2, + rx_accept, + rx_server, services: Vec::new(), availability, conns: Counter::new(config.max_concurrent_connections), @@ -264,7 +264,7 @@ impl ServerWorker { }); }); - handle_pair(idx, tx1, tx2, avail) + handle_pair(idx, tx_accept, tx_server, avail) } fn restart_service(&mut self, token: Token, factory_id: usize) { @@ -370,7 +370,7 @@ impl Future for ServerWorker { let this = self.as_mut().get_mut(); // `StopWorker` message handler - if let Poll::Ready(Stop { graceful, tx }) = this.rx2.poll_recv_unpin(cx) { + if let Poll::Ready(Stop { graceful, tx }) = this.rx_server.poll_recv_unpin(cx) { this.availability.set(false); let num = this.conns.total(); if num == 0 { @@ -465,7 +465,7 @@ impl Future for ServerWorker { WorkerState::Available => loop { match this.check_readiness(cx) { Ok(true) => { - let msg = ready!(this.rx.poll_recv_unpin(cx)); + let msg = ready!(this.rx_accept.poll_recv_unpin(cx)); // handle incoming io stream let guard = this.conns.get(); let _ = this.services[msg.token.0].service.call((guard, msg.io));