From 859f45868d8551381d8877a89f394e3bfcb7ba43 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Fri, 9 Apr 2021 21:04:41 +0100 Subject: [PATCH 1/3] Revert "do no drain backlog on backpressure" (#324) This reverts commit d4829b046defc6466045aeeff3c18f5b495b579a. --- actix-server/src/accept.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 2463ce47..5b6345cc 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -407,7 +407,7 @@ impl Accept { } fn accept(&mut self, sockets: &mut Slab, token: usize) { - while !self.backpressure { + loop { let info = sockets .get_mut(token) .expect("ServerSocketInfo is removed from Slab"); From 0a11cf5cba713d56b449658117a41b9c68e714ce Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Fri, 9 Apr 2021 17:03:28 -0700 Subject: [PATCH 2/3] Separate WorkerHandle to two parts (#323) --- actix-server/src/accept.rs | 10 ++-- actix-server/src/builder.rs | 27 +++++++---- actix-server/src/waker_queue.rs | 6 +-- actix-server/src/worker.rs | 86 ++++++++++++++++++--------------- 4 files changed, 74 insertions(+), 55 deletions(-) diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 5b6345cc..5b9f99c7 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -12,7 +12,7 @@ use slab::Slab; use crate::server::Server; use crate::socket::{MioListener, SocketAddr}; use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; -use crate::worker::{Conn, WorkerHandle}; +use crate::worker::{Conn, WorkerHandleAccept}; use crate::Token; struct ServerSocketInfo { @@ -66,7 +66,7 @@ impl AcceptLoop { pub(crate) fn start( &mut self, socks: Vec<(Token, MioListener)>, - handles: Vec, + handles: Vec, ) { let srv = self.srv.take().expect("Can not re-use AcceptInfo"); let poll = self.poll.take().unwrap(); @@ -80,7 +80,7 @@ impl AcceptLoop { struct Accept { poll: Poll, waker: WakerQueue, - handles: Vec, + handles: Vec, srv: Server, next: usize, backpressure: bool, @@ -105,7 +105,7 @@ impl Accept { waker: WakerQueue, socks: Vec<(Token, MioListener)>, srv: Server, - handles: Vec, + handles: Vec, ) { // Accept runs in its own thread and would want to spawn additional futures to current // actix system. @@ -125,7 +125,7 @@ impl Accept { poll: Poll, waker: WakerQueue, socks: Vec<(Token, MioListener)>, - handles: Vec, + handles: Vec, srv: Server, ) -> (Accept, Slab) { let mut sockets = Slab::new(); diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index fdb02205..6019ff16 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -19,7 +19,10 @@ use crate::signals::{Signal, Signals}; use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; use crate::socket::{MioTcpListener, MioTcpSocket}; use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandle}; +use crate::worker::{ + ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandleAccept, + WorkerHandleServer, +}; use crate::{join_all, Token}; /// Server builder @@ -27,7 +30,7 @@ pub struct ServerBuilder { threads: usize, token: Token, backlog: u32, - handles: Vec<(usize, WorkerHandle)>, + handles: Vec<(usize, WorkerHandleServer)>, services: Vec>, sockets: Vec<(Token, String, MioListener)>, accept: AcceptLoop, @@ -280,10 +283,11 @@ impl ServerBuilder { // start workers let handles = (0..self.threads) .map(|idx| { - let handle = self.start_worker(idx, self.accept.waker_owned()); - self.handles.push((idx, handle.clone())); + let (handle_accept, handle_server) = + self.start_worker(idx, self.accept.waker_owned()); + self.handles.push((idx, handle_server)); - handle + handle_accept }) .collect(); @@ -311,7 +315,11 @@ impl ServerBuilder { } } - fn start_worker(&self, idx: usize, waker: WakerQueue) -> WorkerHandle { + fn start_worker( + &self, + idx: usize, + waker: WakerQueue, + ) -> (WorkerHandleAccept, WorkerHandleServer) { let avail = WorkerAvailability::new(waker); let services = self.services.iter().map(|v| v.clone_factory()).collect(); @@ -437,9 +445,10 @@ impl ServerBuilder { break; } - let handle = self.start_worker(new_idx, self.accept.waker_owned()); - self.handles.push((new_idx, handle.clone())); - self.accept.wake(WakerInterest::Worker(handle)); + let (handle_accept, handle_server) = + self.start_worker(new_idx, self.accept.waker_owned()); + self.handles.push((new_idx, handle_server)); + self.accept.wake(WakerInterest::Worker(handle_accept)); } } } diff --git a/actix-server/src/waker_queue.rs b/actix-server/src/waker_queue.rs index e38a9782..8aa493aa 100644 --- a/actix-server/src/waker_queue.rs +++ b/actix-server/src/waker_queue.rs @@ -6,7 +6,7 @@ use std::{ use mio::{Registry, Token as MioToken, Waker}; -use crate::worker::WorkerHandle; +use crate::worker::WorkerHandleAccept; /// Waker token for `mio::Poll` instance. pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX); @@ -84,6 +84,6 @@ pub(crate) enum WakerInterest { Timer, /// `Worker` is an interest happen after a worker runs into faulted state(This is determined /// by if work can be sent to it successfully).`Accept` would be waked up and add the new - /// `WorkerHandle`. - Worker(WorkerHandle), + /// `WorkerHandleAccept`. + Worker(WorkerHandleAccept), } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index c3074646..8e122623 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -28,11 +28,9 @@ use crate::socket::MioStream; use crate::waker_queue::{WakerInterest, WakerQueue}; use crate::{join_all, Token}; -pub(crate) struct WorkerCommand(Conn); - -/// Stop worker message. Returns `true` on successful shutdown -/// and `false` if some connections still alive. -pub(crate) struct StopCommand { +/// Stop worker message. Returns `true` on successful graceful shutdown. +/// and `false` if some connections still alive when shutdown execute. +pub(crate) struct Stop { graceful: bool, tx: oneshot::Sender, } @@ -43,42 +41,55 @@ pub(crate) struct Conn { pub token: Token, } -// a handle to worker that can send message to worker and share the availability of worker to other -// thread. -#[derive(Clone)] -pub(crate) struct WorkerHandle { +fn handle_pair( + idx: usize, + tx1: UnboundedSender, + tx2: UnboundedSender, + avail: WorkerAvailability, +) -> (WorkerHandleAccept, WorkerHandleServer) { + let accept = WorkerHandleAccept { + idx, + tx: tx1, + avail, + }; + + let server = WorkerHandleServer { idx, tx: tx2 }; + + (accept, server) +} + +/// Handle to worker that can send connection message to worker and share the +/// availability of worker to other thread. +/// +/// Held by [Accept](crate::accept::Accept). +pub(crate) struct WorkerHandleAccept { pub idx: usize, - tx1: UnboundedSender, - tx2: UnboundedSender, + tx: UnboundedSender, avail: WorkerAvailability, } -impl WorkerHandle { - pub fn new( - idx: usize, - tx1: UnboundedSender, - tx2: UnboundedSender, - avail: WorkerAvailability, - ) -> Self { - WorkerHandle { - idx, - tx1, - tx2, - avail, - } +impl WorkerHandleAccept { + pub(crate) fn send(&self, msg: Conn) -> Result<(), Conn> { + self.tx.send(msg).map_err(|msg| msg.0) } - pub fn send(&self, msg: Conn) -> Result<(), Conn> { - self.tx1.send(WorkerCommand(msg)).map_err(|msg| msg.0 .0) - } - - pub fn available(&self) -> bool { + pub(crate) fn available(&self) -> bool { self.avail.available() } +} - pub fn stop(&self, graceful: bool) -> oneshot::Receiver { +/// Handle to worker than can send stop message to worker. +/// +/// Held by [ServerBuilder](crate::builder::ServerBuilder). +pub(crate) struct WorkerHandleServer { + pub idx: usize, + tx: UnboundedSender, +} + +impl WorkerHandleServer { + pub(crate) fn stop(&self, graceful: bool) -> oneshot::Receiver { let (tx, rx) = oneshot::channel(); - let _ = self.tx2.send(StopCommand { graceful, tx }); + let _ = self.tx.send(Stop { graceful, tx }); rx } } @@ -114,8 +125,8 @@ impl WorkerAvailability { /// /// Worker accepts Socket objects via unbounded channel and starts stream processing. pub(crate) struct ServerWorker { - rx: UnboundedReceiver, - rx2: UnboundedReceiver, + rx: UnboundedReceiver, + rx2: UnboundedReceiver, services: Vec, availability: WorkerAvailability, conns: Counter, @@ -187,7 +198,7 @@ impl ServerWorker { factories: Vec>, availability: WorkerAvailability, config: ServerWorkerConfig, - ) -> WorkerHandle { + ) -> (WorkerHandleAccept, WorkerHandleServer) { let (tx1, rx) = unbounded_channel(); let (tx2, rx2) = unbounded_channel(); let avail = availability.clone(); @@ -254,7 +265,7 @@ impl ServerWorker { }); }); - WorkerHandle::new(idx, tx1, tx2, avail) + handle_pair(idx, tx1, tx2, avail) } fn restart_service(&mut self, token: Token, factory_id: usize) { @@ -360,8 +371,7 @@ impl Future for ServerWorker { let this = self.as_mut().get_mut(); // `StopWorker` message handler - if let Poll::Ready(Some(StopCommand { graceful, tx })) = - Pin::new(&mut this.rx2).poll_recv(cx) + if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx) { this.availability.set(false); let num = this.conns.total(); @@ -472,7 +482,7 @@ impl Future for ServerWorker { match ready!(Pin::new(&mut this.rx).poll_recv(cx)) { // handle incoming io stream - Some(WorkerCommand(msg)) => { + Some(msg) => { let guard = this.conns.get(); let _ = this.services[msg.token.0].service.call((guard, msg.io)); } From ddce2d6d12bb98a0fbf585a242089fa416fb78b3 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sat, 10 Apr 2021 08:05:50 -0700 Subject: [PATCH 3/3] Reduce cfg flags in actix_server::socket (#325) --- actix-server/src/socket.rs | 100 +++++++++++++++++++------------------ 1 file changed, 51 insertions(+), 49 deletions(-) diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index 0625cfda..948b5f1f 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -12,18 +12,7 @@ pub(crate) use { use std::{fmt, io}; use actix_rt::net::TcpStream; -use mio::event::Source; -use mio::net::TcpStream as MioTcpStream; -use mio::{Interest, Registry, Token}; - -#[cfg(windows)] -use std::os::windows::io::{FromRawSocket, IntoRawSocket}; -#[cfg(unix)] -use { - actix_rt::net::UnixStream, - mio::net::{SocketAddr as MioSocketAddr, UnixStream as MioUnixStream}, - std::os::unix::io::{FromRawFd, IntoRawFd}, -}; +use mio::{event::Source, Interest, Registry, Token}; pub(crate) enum MioListener { Tcp(MioTcpListener), @@ -131,7 +120,7 @@ impl fmt::Display for MioListener { pub(crate) enum SocketAddr { Tcp(StdSocketAddr), #[cfg(unix)] - Uds(MioSocketAddr), + Uds(mio::net::SocketAddr), } impl fmt::Display for SocketAddr { @@ -156,9 +145,9 @@ impl fmt::Debug for SocketAddr { #[derive(Debug)] pub enum MioStream { - Tcp(MioTcpStream), + Tcp(mio::net::TcpStream), #[cfg(unix)] - Uds(MioUnixStream), + Uds(mio::net::UnixStream), } /// helper trait for converting mio stream to tokio stream. @@ -166,47 +155,60 @@ pub trait FromStream: Sized { fn from_mio(sock: MioStream) -> io::Result; } -// FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream -#[cfg(unix)] -impl FromStream for TcpStream { - fn from_mio(sock: MioStream) -> io::Result { - match sock { - MioStream::Tcp(mio) => { - let raw = IntoRawFd::into_raw_fd(mio); - // SAFETY: This is a in place conversion from mio stream to tokio stream. - TcpStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) }) - } - MioStream::Uds(_) => { - panic!("Should not happen, bug in server impl"); - } - } - } -} - -// FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream #[cfg(windows)] -impl FromStream for TcpStream { - fn from_mio(sock: MioStream) -> io::Result { - match sock { - MioStream::Tcp(mio) => { - let raw = IntoRawSocket::into_raw_socket(mio); - // SAFETY: This is a in place conversion from mio stream to tokio stream. - TcpStream::from_std(unsafe { FromRawSocket::from_raw_socket(raw) }) +mod win_impl { + use super::*; + + use std::os::windows::io::{FromRawSocket, IntoRawSocket}; + + // FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream + impl FromStream for TcpStream { + fn from_mio(sock: MioStream) -> io::Result { + match sock { + MioStream::Tcp(mio) => { + let raw = IntoRawSocket::into_raw_socket(mio); + // SAFETY: This is a in place conversion from mio stream to tokio stream. + TcpStream::from_std(unsafe { FromRawSocket::from_raw_socket(raw) }) + } } } } } -// FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream #[cfg(unix)] -impl FromStream for UnixStream { - fn from_mio(sock: MioStream) -> io::Result { - match sock { - MioStream::Tcp(_) => panic!("Should not happen, bug in server impl"), - MioStream::Uds(mio) => { - let raw = IntoRawFd::into_raw_fd(mio); - // SAFETY: This is a in place conversion from mio stream to tokio stream. - UnixStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) }) +mod unix_impl { + use super::*; + + use std::os::unix::io::{FromRawFd, IntoRawFd}; + + use actix_rt::net::UnixStream; + + // FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream + impl FromStream for TcpStream { + fn from_mio(sock: MioStream) -> io::Result { + match sock { + MioStream::Tcp(mio) => { + let raw = IntoRawFd::into_raw_fd(mio); + // SAFETY: This is a in place conversion from mio stream to tokio stream. + TcpStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) }) + } + MioStream::Uds(_) => { + panic!("Should not happen, bug in server impl"); + } + } + } + } + + // FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream + impl FromStream for UnixStream { + fn from_mio(sock: MioStream) -> io::Result { + match sock { + MioStream::Tcp(_) => panic!("Should not happen, bug in server impl"), + MioStream::Uds(mio) => { + let raw = IntoRawFd::into_raw_fd(mio); + // SAFETY: This is a in place conversion from mio stream to tokio stream. + UnixStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) }) + } } } }