diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 2ebed1ca..1d608719 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -9,7 +9,9 @@ use crate::builder::ServerBuilder; use crate::server::ServerHandle; use crate::socket::{MioListener, SocketAddr}; use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; -use crate::worker::{Conn, ServerWorker, WorkerAvailability, WorkerHandle}; +use crate::worker::{ + Conn, ServerWorker, WorkerAvailability, WorkerHandleAccept, WorkerHandleServer, +}; use crate::Token; const DUR_ON_ERR: Duration = Duration::from_millis(500); @@ -32,7 +34,7 @@ struct ServerSocketInfo { pub(crate) struct Accept { poll: Poll, waker_queue: WakerQueue, - handles: Vec, + handles: Vec, srv: ServerHandle, next: usize, backpressure: bool, @@ -58,7 +60,7 @@ impl Accept { pub(crate) fn start( sockets: Vec<(Token, MioListener)>, builder: &ServerBuilder, - ) -> io::Result<(WakerQueue, Vec<(usize, WorkerHandle)>)> { + ) -> io::Result<(WakerQueue, Vec<(usize, WorkerHandleServer)>)> { let server_handle = ServerHandle::new(builder.cmd_tx.clone()); // construct poll instance and it's waker @@ -66,15 +68,14 @@ impl Accept { let waker_queue = WakerQueue::new(poll.registry())?; // construct workers and collect handles. - let (handles, handles_clone) = (0..builder.threads) + let (handles_accept, handles_server) = (0..builder.threads) .map(|idx| { // start workers let availability = WorkerAvailability::new(waker_queue.clone()); let factories = builder.services.iter().map(|v| v.clone_factory()).collect(); - let handle = + let (handle_accept, handle_server) = ServerWorker::start(idx, factories, availability, builder.worker_config)?; - let handle_clone = (idx, handle.clone()); - Ok((handle, handle_clone)) + Ok((handle_accept, (idx, handle_server))) }) .collect::, io::Error>>()? .into_iter() @@ -82,8 +83,13 @@ impl Accept { let wake_queue_clone = waker_queue.clone(); - let (mut accept, sockets) = - Accept::new_with_sockets(poll, wake_queue_clone, sockets, handles, server_handle)?; + let (mut accept, sockets) = Accept::new_with_sockets( + poll, + wake_queue_clone, + sockets, + handles_accept, + server_handle, + )?; // Accept runs in its own thread. thread::Builder::new() @@ -92,14 +98,14 @@ impl Accept { .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; // return waker and worker handle clones to server builder. - Ok((waker_queue, handles_clone)) + Ok((waker_queue, handles_server)) } fn new_with_sockets( poll: Poll, waker_queue: WakerQueue, socks: Vec<(Token, MioListener)>, - handles: Vec, + handles: Vec, srv: ServerHandle, ) -> io::Result<(Accept, Slab)> { let mut sockets = Slab::new(); @@ -393,7 +399,7 @@ impl Accept { } fn accept(&mut self, sockets: &mut Slab, token: usize) { - while !self.backpressure { + loop { let info = sockets .get_mut(token) .expect("ServerSocketInfo is removed from Slab"); diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 2db40b2b..2791121d 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -8,8 +8,9 @@ use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use crate::config::{ConfiguredService, ServiceConfig}; use crate::server::{Server, ServerCommand}; use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; -use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; -use crate::socket::{MioTcpListener, MioTcpSocket}; +use crate::socket::{ + MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs, +}; use crate::worker::ServerWorkerConfig; use crate::Token; diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index e85ba1dd..f7140079 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -19,14 +19,14 @@ use crate::builder::ServerBuilder; use crate::service::InternalServiceFactory; use crate::signals::{Signal, Signals}; use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandle}; +use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandleServer}; /// When awaited or spawned would listen to signal and message from [ServerHandle](ServerHandle). #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Server { cmd_tx: UnboundedSender, cmd_rx: UnboundedReceiver, - handles: Vec<(usize, WorkerHandle)>, + handles: Vec<(usize, WorkerHandleServer)>, services: Vec>, notify: Vec>, exit: bool, @@ -216,9 +216,9 @@ impl Server { ); match res { - Ok(handle) => { - self.handles.push((new_idx, handle.clone())); - self.waker_queue.wake(WakerInterest::Worker(handle)); + Ok((handle_accept, handle_server)) => { + self.handles.push((new_idx, handle_server)); + self.waker_queue.wake(WakerInterest::Worker(handle_accept)); } Err(e) => error!("Can not start worker: {:?}", e), } diff --git a/actix-server/src/waker_queue.rs b/actix-server/src/waker_queue.rs index 9ac101b3..b8875078 100644 --- a/actix-server/src/waker_queue.rs +++ b/actix-server/src/waker_queue.rs @@ -6,7 +6,7 @@ use std::{ use mio::{Registry, Token as MioToken, Waker}; -use crate::worker::WorkerHandle; +use crate::worker::WorkerHandleAccept; /// Waker token for `mio::Poll` instance. pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX); @@ -80,6 +80,6 @@ pub(crate) enum WakerInterest { Stop, /// `Worker` is an interest happen after a worker runs into faulted state(This is determined /// by if work can be sent to it successfully).`Accept` would be waked up and add the new - /// `WorkerHandle`. - Worker(WorkerHandle), + /// `WorkerHandleAccept`. + Worker(WorkerHandleAccept), } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 1c3d78b5..fa5d22dc 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -28,11 +28,9 @@ use crate::socket::MioStream; use crate::waker_queue::{WakerInterest, WakerQueue}; use crate::Token; -pub(crate) struct WorkerCommand(Conn); - -/// Stop worker message. Returns `true` on successful shutdown -/// and `false` if some connections still alive. -pub(crate) struct StopCommand { +/// Stop worker message. Returns `true` on successful graceful shutdown. +/// and `false` if some connections still alive when shutdown execute. +struct Stop { graceful: bool, tx: oneshot::Sender, } @@ -43,42 +41,55 @@ pub(crate) struct Conn { pub token: Token, } -// a handle to worker that can send message to worker and share the availability of worker to other -// thread. -#[derive(Clone)] -pub(crate) struct WorkerHandle { +fn handle_pair( + idx: usize, + tx1: UnboundedSender, + tx2: UnboundedSender, + avail: WorkerAvailability, +) -> (WorkerHandleAccept, WorkerHandleServer) { + let accept = WorkerHandleAccept { + idx, + tx: tx1, + avail, + }; + + let server = WorkerHandleServer { idx, tx: tx2 }; + + (accept, server) +} + +/// Handle to worker that can send connection message to worker and share the +/// availability of worker to other thread. +/// +/// Held by [Accept](crate::accept::Accept). +pub(crate) struct WorkerHandleAccept { pub idx: usize, - tx1: UnboundedSender, - tx2: UnboundedSender, + tx: UnboundedSender, avail: WorkerAvailability, } -impl WorkerHandle { - pub fn new( - idx: usize, - tx1: UnboundedSender, - tx2: UnboundedSender, - avail: WorkerAvailability, - ) -> Self { - WorkerHandle { - idx, - tx1, - tx2, - avail, - } +impl WorkerHandleAccept { + pub(crate) fn send(&self, msg: Conn) -> Result<(), Conn> { + self.tx.send(msg).map_err(|msg| msg.0) } - pub fn send(&self, msg: Conn) -> Result<(), Conn> { - self.tx1.send(WorkerCommand(msg)).map_err(|msg| msg.0 .0) - } - - pub fn available(&self) -> bool { + pub(crate) fn available(&self) -> bool { self.avail.available() } +} - pub fn stop(&self, graceful: bool) -> oneshot::Receiver { +/// Handle to worker than can send stop message to worker. +/// +/// Held by [ServerBuilder](crate::builder::ServerBuilder). +pub(crate) struct WorkerHandleServer { + pub idx: usize, + tx: UnboundedSender, +} + +impl WorkerHandleServer { + pub(crate) fn stop(&self, graceful: bool) -> oneshot::Receiver { let (tx, rx) = oneshot::channel(); - let _ = self.tx2.send(StopCommand { graceful, tx }); + let _ = self.tx.send(Stop { graceful, tx }); rx } } @@ -114,8 +125,8 @@ impl WorkerAvailability { /// /// Worker accepts Socket objects via unbounded channel and starts stream processing. pub(crate) struct ServerWorker { - rx: UnboundedReceiver, - rx2: UnboundedReceiver, + rx: UnboundedReceiver, + rx2: UnboundedReceiver, services: Vec, availability: WorkerAvailability, conns: Counter, @@ -187,7 +198,7 @@ impl ServerWorker { factories: Vec>, availability: WorkerAvailability, config: ServerWorkerConfig, - ) -> io::Result { + ) -> io::Result<(WorkerHandleAccept, WorkerHandleServer)> { assert!(!availability.available()); let (tx1, rx) = unbounded_channel(); @@ -267,7 +278,7 @@ impl ServerWorker { .recv() .map_err(|e| io::Error::new(io::ErrorKind::Other, e))? .map(Err) - .unwrap_or_else(|| Ok(WorkerHandle::new(idx, tx1, tx2, avail))) + .unwrap_or_else(|| Ok(handle_pair(idx, tx1, tx2, avail))) } fn restart_service(&mut self, token: Token, factory_id: usize) { @@ -373,8 +384,7 @@ impl Future for ServerWorker { let this = self.as_mut().get_mut(); // `StopWorker` message handler - if let Poll::Ready(Some(StopCommand { graceful, tx })) = - Pin::new(&mut this.rx2).poll_recv(cx) + if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx) { this.availability.set(false); let num = this.conns.total(); @@ -483,7 +493,7 @@ impl Future for ServerWorker { match ready!(Pin::new(&mut this.rx).poll_recv(cx)) { // handle incoming io stream - Some(WorkerCommand(msg)) => { + Some(msg) => { let guard = this.conns.get(); let _ = this.services[msg.token.0].service.call((guard, msg.io)); }