diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 58471cf9..444e610d 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -16,8 +16,13 @@ edition = "2018" name = "actix_server" path = "src/lib.rs" -[features] -default = [] +[[example]] +name = "mio_tcp" +path = "examples/mio-tcp.rs" + +[[example]] +name = "tcp_echo" +path = "examples/tcp-echo.rs" [dependencies] actix-rt = { version = "2.0.0", default-features = false } @@ -38,3 +43,6 @@ bytes = "1" env_logger = "0.8" futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } tokio = { version = "1", features = ["io-util"] } + +[target.'cfg(_loom)'.dependencies] +loom = "0.4.1" diff --git a/actix-server/examples/mio-tcp.rs b/actix-server/examples/mio-tcp.rs new file mode 100644 index 00000000..f44d7c3a --- /dev/null +++ b/actix-server/examples/mio-tcp.rs @@ -0,0 +1,46 @@ +//! A Tcp Server using mio::net::TcpListener. +//! +//! actix-server is used to bridge `mio` and multiple tokio current-thread runtime +//! for a thread per core like server. +//! +//! Server would return "Hello World!" String. + +use std::{env, io}; + +use actix_rt::net::TcpStream; +use actix_server::ServerBuilder; +use actix_service::fn_service; +use tokio::io::AsyncWriteExt; + +use mio::net::TcpListener; + +// A dummy buffer always return hello world to client. +const BUF: &[u8] = b"HTTP/1.1 200 OK\r\n\ + content-length: 12\r\n\ + connection: close\r\n\ + date: Thu, 01 Jan 1970 12:34:56 UTC\r\n\ + \r\n\ + Hello World!"; + +#[actix_rt::main] +async fn main() -> io::Result<()> { + env::set_var("RUST_LOG", "info"); + env_logger::init(); + + let name = "hello_world"; + + let addr = "127.0.0.1:8080".parse().unwrap(); + + let lst = TcpListener::bind(addr)?; + + ServerBuilder::new() + .bind_acceptable(name, addr, lst, || fn_service(response)) + .run() + .await +} + +async fn response(mut stream: TcpStream) -> io::Result<()> { + stream.write(BUF).await?; + stream.flush().await?; + stream.shutdown().await +} diff --git a/actix-server/src/accept/acceptable.rs b/actix-server/src/accept/acceptable.rs new file mode 100644 index 00000000..e0371775 --- /dev/null +++ b/actix-server/src/accept/acceptable.rs @@ -0,0 +1,20 @@ +use std::{fmt, io}; + +use mio::{Registry, Token}; + +#[doc(hidden)] +/// Trait define IO source that can be managed by [super::Accept]. +pub trait Acceptable: fmt::Debug { + /// Type accepted from IO source. + type Connection: Send + 'static; + + fn accept(&mut self) -> io::Result>; + + /// Register IO source to Acceptor [Registry](mio::Registry). + /// Self must impl [Source](mio::event::Source) trait. + fn register(&mut self, registry: &Registry, token: Token) -> io::Result<()>; + + /// Deregister IO source to Acceptor [Registry](mio::Registry). + /// Self must impl [Source](mio::event::Source) trait. + fn deregister(&mut self, registry: &Registry) -> io::Result<()>; +} diff --git a/actix-server/src/accept/availability.rs b/actix-server/src/accept/availability.rs new file mode 100644 index 00000000..df6e9ac2 --- /dev/null +++ b/actix-server/src/accept/availability.rs @@ -0,0 +1,117 @@ +/// Array of u128 with every bit as marker for a worker handle's availability. 
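+///
+/// Four `u128` words give `4 * 128 = 512` bits, one per worker index: bit `idx % 128` of
+/// word `idx / 128` is set while the worker at `idx` can take new connections. `available()`
+/// is the cheap "any bit set" check the accept loop performs before every accept call.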
+pub(super) struct Availability([u128; 4]); + +impl Default for Availability { + fn default() -> Self { + Self([0; 4]) + } +} + +impl Availability { + /// Check if any worker handle is available + #[inline(always)] + pub(super) fn available(&self) -> bool { + self.0.iter().any(|a| *a != 0) + } + + /// Check if worker handle is available by index + #[inline(always)] + pub(super) fn get_available(&self, idx: usize) -> bool { + let (offset, idx) = Self::offset(idx); + + self.0[offset] & (1 << idx as u128) != 0 + } + + /// Set worker handle available state by index. + pub(super) fn set_available(&mut self, idx: usize, avail: bool) { + let (offset, idx) = Self::offset(idx); + + let off = 1 << idx as u128; + if avail { + self.0[offset] |= off; + } else { + self.0[offset] &= !off + } + } + + /// Get offset and adjusted index of given worker handle index. + #[inline(always)] + fn offset(idx: usize) -> (usize, usize) { + if idx < 128 { + (0, idx) + } else if idx < 128 * 2 { + (1, idx - 128) + } else if idx < 128 * 3 { + (2, idx - 128 * 2) + } else if idx < 128 * 4 { + (3, idx - 128 * 3) + } else { + panic!("Max WorkerHandle count is 512") + } + } +} + +#[cfg(test)] +mod test { + use super::Availability; + + fn single(aval: &mut Availability, idx: usize) { + aval.set_available(idx, true); + assert!(aval.available()); + + aval.set_available(idx, true); + + aval.set_available(idx, false); + assert!(!aval.available()); + + aval.set_available(idx, false); + assert!(!aval.available()); + } + + fn multi(aval: &mut Availability, mut idx: Vec) { + idx.iter().for_each(|idx| aval.set_available(*idx, true)); + + assert!(aval.available()); + + while let Some(idx) = idx.pop() { + assert!(aval.available()); + aval.set_available(idx, false); + } + + assert!(!aval.available()); + } + + #[test] + fn availability() { + let mut aval = Availability::default(); + + single(&mut aval, 1); + single(&mut aval, 128); + single(&mut aval, 256); + single(&mut aval, 511); + + let idx = (0..511).filter(|i| i % 3 == 0 && i % 5 == 0).collect(); + + multi(&mut aval, idx); + + multi(&mut aval, (0..511).collect()) + } + + #[test] + #[should_panic] + fn overflow() { + let mut aval = Availability::default(); + single(&mut aval, 512); + } + + #[test] + fn pin_point() { + let mut aval = Availability::default(); + + aval.set_available(438, true); + + aval.set_available(479, true); + + assert_eq!(aval.0[3], 1 << (438 - 384) | 1 << (479 - 384)); + } +} diff --git a/actix-server/src/accept.rs b/actix-server/src/accept/mod.rs similarity index 56% rename from actix-server/src/accept.rs rename to actix-server/src/accept/mod.rs index d9451d37..914bde44 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept/mod.rs @@ -1,41 +1,46 @@ -use std::time::Duration; -use std::{io, thread}; +#[cfg(not(_loom))] +use std::thread; + +#[cfg(_loom)] +use loom::thread; + +mod acceptable; +mod availability; + +pub use acceptable::Acceptable; + +use std::{io, time::Duration}; use actix_rt::{ time::{sleep, Instant}, System, }; use log::{error, info}; -use mio::{Interest, Poll, Token as MioToken}; +use mio::{Poll, Registry, Token}; +use tokio::sync::oneshot; + +use availability::Availability; use crate::server::Server; -use crate::socket::MioListener; use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; use crate::worker::{Conn, WorkerHandleAccept}; -struct ServerSocketInfo { - token: usize, - - lst: MioListener, - - /// Timeout is used to mark the deadline when this socket's listener should be registered again - /// after an error. 
- timeout: Option, -} - /// Accept loop would live with `ServerBuilder`. /// /// It's tasked with construct `Poll` instance and `WakerQueue` which would be distributed to /// `Accept` and `Worker`. /// /// It would also listen to `ServerCommand` and push interests to `WakerQueue`. -pub(crate) struct AcceptLoop { +pub(crate) struct AcceptLoop { srv: Option, poll: Option, - waker: WakerQueue, + waker: WakerQueue, } -impl AcceptLoop { +impl AcceptLoop +where + A: Acceptable + Send + 'static, +{ pub fn new(srv: Server) -> Self { let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e)); let waker = WakerQueue::new(poll.registry()) @@ -48,18 +53,18 @@ impl AcceptLoop { } } - pub(crate) fn waker_owned(&self) -> WakerQueue { + pub(crate) fn waker_owned(&self) -> WakerQueue { self.waker.clone() } - pub fn wake(&self, i: WakerInterest) { + pub fn wake(&self, i: WakerInterest) { self.waker.wake(i); } pub(crate) fn start( &mut self, - socks: Vec<(usize, MioListener)>, - handles: Vec, + socks: Vec<(usize, A)>, + handles: Vec>, ) { let srv = self.srv.take().expect("Can not re-use AcceptInfo"); let poll = self.poll.take().unwrap(); @@ -70,72 +75,42 @@ impl AcceptLoop { } /// poll instance of the server. -struct Accept { +struct Accept { poll: Poll, - waker: WakerQueue, - handles: Vec, + source: Box<[Source]>, + waker: WakerQueue, + handles: Vec>, srv: Server, next: usize, avail: Availability, paused: bool, } -/// Array of u128 with every bit as marker for a worker handle's availability. -struct Availability([u128; 4]); +struct Source { + token: usize, -impl Default for Availability { - fn default() -> Self { - Self([0; 4]) - } + acceptable: A, + + /// Timeout is used to mark the deadline when this socket's listener should be registered again + /// after an error. + timeout: Option, } -impl Availability { - /// Check if any worker handle is available +impl Source { #[inline(always)] - fn available(&self) -> bool { - self.0.iter().any(|a| *a != 0) - } - - /// Check if worker handle is available by index - #[inline(always)] - fn get_available(&self, idx: usize) -> bool { - let (offset, idx) = Self::offset(idx); - - self.0[offset] & (1 << idx as u128) != 0 - } - - /// Set worker handle available state by index. - fn set_available(&mut self, idx: usize, avail: bool) { - let (offset, idx) = Self::offset(idx); - - let off = 1 << idx as u128; - if avail { - self.0[offset] |= off; - } else { - self.0[offset] &= !off + fn register(&mut self, registry: &Registry) { + let token = Token(self.token); + match self.acceptable.register(registry, token) { + Ok(_) => info!("Start accepting connections on {:?}", &self.acceptable), + Err(e) => error!("Can not register {}", e), } } - /// Set all worker handle to available state. - /// This would result in a re-check on all workers' availability. - fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) { - handles.iter().for_each(|handle| { - self.set_available(handle.idx(), true); - }) - } - - /// Get offset and adjusted index of given worker handle index. 
- fn offset(idx: usize) -> (usize, usize) { - if idx < 128 { - (0, idx) - } else if idx < 128 * 2 { - (1, idx - 128) - } else if idx < 128 * 3 { - (2, idx - 128 * 2) - } else if idx < 128 * 4 { - (3, idx - 128 * 3) - } else { - panic!("Max WorkerHandle count is 512") + #[inline(always)] + fn deregister(&mut self, registry: &Registry) { + match self.acceptable.deregister(registry) { + Ok(_) => info!("Paused accepting connections on {:?}", &self.acceptable), + Err(e) => error!("Can not deregister {}", e), } } } @@ -153,71 +128,63 @@ fn connection_error(e: &io::Error) -> bool { || e.kind() == io::ErrorKind::ConnectionReset } -impl Accept { +impl Accept +where + A: Acceptable + Send + 'static, +{ pub(crate) fn start( poll: Poll, - waker: WakerQueue, - socks: Vec<(usize, MioListener)>, + waker: WakerQueue, + source: Vec<(usize, A)>, srv: Server, - handles: Vec, + handles: Vec>, ) { // Accept runs in its own thread and would want to spawn additional futures to current // actix system. let sys = System::current(); thread::Builder::new() - .name("actix-server accept loop".to_owned()) + .name("actix-server acceptor".to_owned()) .spawn(move || { System::set_current(sys); - let (mut accept, mut sockets) = - Accept::new_with_sockets(poll, waker, socks, handles, srv); + let source = source + .into_iter() + .map(|(token, mut lst)| { + // Start listening for incoming connections + lst.register(poll.registry(), Token(token)) + .unwrap_or_else(|e| panic!("Can not register io: {}", e)); - accept.poll_with(&mut sockets); + Source { + token, + acceptable: lst, + timeout: None, + } + }) + .collect(); + + let mut avail = Availability::default(); + + // Assume all handles are avail at construct time. + handles.iter().for_each(|handle| { + avail.set_available(handle.idx(), true); + }); + + let accept = Accept { + poll, + source, + waker, + handles, + srv, + next: 0, + avail, + paused: false, + }; + + accept.poll(); }) .unwrap(); } - fn new_with_sockets( - poll: Poll, - waker: WakerQueue, - socks: Vec<(usize, MioListener)>, - handles: Vec, - srv: Server, - ) -> (Accept, Vec) { - let sockets = socks - .into_iter() - .map(|(token, mut lst)| { - // Start listening for incoming connections - poll.registry() - .register(&mut lst, MioToken(token), Interest::READABLE) - .unwrap_or_else(|e| panic!("Can not register io: {}", e)); - - ServerSocketInfo { - token, - lst, - timeout: None, - } - }) - .collect(); - - let mut avail = Availability::default(); - - // Assume all handles are avail at construct time. - avail.set_available_all(&handles); - - let accept = Accept { - poll, - waker, - handles, - srv, - next: 0, - avail, - paused: false, - }; - - (accept, sockets) - } - - fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) { + fn poll(mut self) { let mut events = mio::Events::with_capacity(128); loop { @@ -232,7 +199,7 @@ impl Accept { let token = event.token(); match token { WAKER_TOKEN => { - let exit = self.handle_waker(sockets); + let exit = self.handle_waker(); if exit { info!("Accept is stopped."); return; @@ -240,14 +207,14 @@ impl Accept { } _ => { let token = usize::from(token); - self.accept(sockets, token); + self.accept(token); } } } } } - fn handle_waker(&mut self, sockets: &mut [ServerSocketInfo]) -> bool { + fn handle_waker(&mut self) -> bool { // This is a loop because interests for command from previous version was // a loop that would try to drain the command channel. It's yet unknown // if it's necessary/good practice to actively drain the waker queue. 
@@ -263,7 +230,7 @@ impl Accept { self.avail.set_available(idx, true); if !self.paused { - self.accept_all(sockets); + self.accept_all(); } } // a new worker thread is made and it's handle would be added to Accept @@ -274,14 +241,14 @@ impl Accept { self.handles.push(handle); if !self.paused { - self.accept_all(sockets); + self.accept_all(); } } // got timer interest and it's time to try register socket(s) again Some(WakerInterest::Timer) => { drop(guard); - self.process_timer(sockets) + self.process_timer() } Some(WakerInterest::Pause) => { drop(guard); @@ -289,7 +256,7 @@ impl Accept { if !self.paused { self.paused = true; - self.deregister_all(sockets); + self.deregister_all(); } } Some(WakerInterest::Resume) => { @@ -298,18 +265,29 @@ impl Accept { if self.paused { self.paused = false; - sockets.iter_mut().for_each(|info| { - self.register_logged(info); - }); + self.register_all(); - self.accept_all(sockets); + self.accept_all(); } } - Some(WakerInterest::Stop) => { + Some(WakerInterest::Stop(AcceptorStop { graceful, tx })) => { + drop(guard); + if !self.paused { - self.deregister_all(sockets); + self.deregister_all(); } + // Collect oneshot receiver if WorkerHandle::stop returns it. + let res = self + .handles + .iter() + .filter_map(|handle| handle.stop(graceful)) + .collect::>(); + + let _ = tx.send(res); + + // TODO: Should try to drain backlog? + return true; } // waker queue is drained @@ -323,88 +301,11 @@ impl Accept { } } - fn process_timer(&self, sockets: &mut [ServerSocketInfo]) { - let now = Instant::now(); - sockets - .iter_mut() - // Only sockets that had an associated timeout were deregistered. - .filter(|info| info.timeout.is_some()) - .for_each(|info| { - let inst = info.timeout.take().unwrap(); - - if now < inst { - info.timeout = Some(inst); - } else if !self.paused { - self.register_logged(info); - } - - // Drop the timeout if server is paused and socket timeout is expired. - // When server recovers from pause it will register all sockets without - // a timeout value so this socket register will be delayed till then. - }); - } - - #[cfg(not(target_os = "windows"))] - fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> { - let token = MioToken(info.token); - self.poll - .registry() - .register(&mut info.lst, token, Interest::READABLE) - } - - #[cfg(target_os = "windows")] - fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> { - // On windows, calling register without deregister cause an error. - // See https://github.com/actix/actix-web/issues/905 - // Calling reregister seems to fix the issue. 
- let token = MioToken(info.token); - self.poll - .registry() - .register(&mut info.lst, token, Interest::READABLE) - .or_else(|_| { - self.poll - .registry() - .reregister(&mut info.lst, token, Interest::READABLE) - }) - } - - fn register_logged(&self, info: &mut ServerSocketInfo) { - match self.register(info) { - Ok(_) => info!("Resume accepting connections on {}", info.lst.local_addr()), - Err(e) => error!("Can not register server socket {}", e), - } - } - - fn deregister_logged(&self, info: &mut ServerSocketInfo) { - match self.poll.registry().deregister(&mut info.lst) { - Ok(_) => info!("Paused accepting connections on {}", info.lst.local_addr()), - Err(e) => { - error!("Can not deregister server socket {}", e) - } - } - } - - fn deregister_all(&self, sockets: &mut [ServerSocketInfo]) { - // This is a best effort implementation with following limitation: - // - // Every ServerSocketInfo with associate timeout will be skipped and it's timeout - // is removed in the process. - // - // Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short - // gap (less than 500ms) would cause all timing out ServerSocketInfos be reregistered - // before expected timing. - sockets - .iter_mut() - // Take all timeout. - // This is to prevent Accept::process_timer method re-register a socket afterwards. - .map(|info| (info.timeout.take(), info)) - // Socket info with a timeout is already deregistered so skip them. - .filter(|(timeout, _)| timeout.is_none()) - .for_each(|(_, info)| self.deregister_logged(info)); - } - // Send connection to worker and handle error. - fn send_connection(&mut self, conn: Conn) -> Result<(), Conn> { + fn send_connection( + &mut self, + conn: Conn, + ) -> Result<(), Conn> { let next = self.next(); match next.send(conn) { Ok(_) => { @@ -436,7 +337,7 @@ impl Accept { } } - fn accept_one(&mut self, mut conn: Conn) { + fn accept_one(&mut self, mut conn: Conn) { loop { let next = self.next(); let idx = next.idx(); @@ -447,7 +348,6 @@ impl Accept { Err(c) => conn = c, } } else { - self.avail.set_available(idx, false); self.set_next(); if !self.avail.available() { @@ -460,27 +360,28 @@ impl Accept { } } - fn accept(&mut self, sockets: &mut [ServerSocketInfo], token: usize) { + fn accept(&mut self, token: usize) { while self.avail.available() { - let info = &mut sockets[token]; + let source = &mut self.source[token]; - match info.lst.accept() { - Ok(io) => { - let conn = Conn { io, token }; + match source.acceptable.accept() { + Ok(Some(io)) => { + let conn = Conn { token, io }; self.accept_one(conn); } + Ok(None) => continue, Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return, Err(ref e) if connection_error(e) => continue, Err(e) => { error!("Error accepting connection: {}", e); // deregister listener temporary - self.deregister_logged(info); + source.deregister(self.poll.registry()); // sleep after error. 
write the timeout to socket info as later // the poll would need it mark which socket and when it's // listener should be registered - info.timeout = Some(Instant::now() + Duration::from_millis(500)); + source.timeout = Some(Instant::now() + Duration::from_millis(500)); // after the sleep a Timer interest is sent to Accept Poll let waker = self.waker.clone(); @@ -495,17 +396,69 @@ impl Accept { } } - fn accept_all(&mut self, sockets: &mut [ServerSocketInfo]) { - sockets + fn accept_all(&mut self) { + self.source .iter_mut() .map(|info| info.token) .collect::>() .into_iter() - .for_each(|idx| self.accept(sockets, idx)) + .for_each(|idx| self.accept(idx)) + } + + fn process_timer(&mut self) { + let now = Instant::now(); + + let registry = self.poll.registry(); + let paused = self.paused; + + self.source + .iter_mut() + // Only sockets that had an associated timeout were deregistered. + .filter(|info| info.timeout.is_some()) + .for_each(|info| { + let inst = info.timeout.take().unwrap(); + + if now < inst { + info.timeout = Some(inst); + } else if !paused { + info.register(registry); + } + + // Drop the timeout if server is paused and socket timeout is expired. + // When server recovers from pause it will register all sockets without + // a timeout value so this socket register will be delayed till then. + }); + } + + fn register_all(&mut self) { + let registry = self.poll.registry(); + self.source.iter_mut().for_each(|info| { + info.register(registry); + }); + } + + fn deregister_all(&mut self) { + let registry = self.poll.registry(); + // This is a best effort implementation with following limitation: + // + // Every ServerSocketInfo with associate timeout will be skipped and it's timeout + // is removed in the process. + // + // Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short + // gap (less than 500ms) would cause all timing out ServerSocketInfos be reregistered + // before expected timing. + self.source + .iter_mut() + // Take all timeout. + // This is to prevent Accept::process_timer method re-register a socket afterwards. + .map(|info| (info.timeout.take(), info)) + // Socket info with a timeout is already deregistered so skip them. 
+ .filter(|(timeout, _)| timeout.is_none()) + .for_each(|(_, info)| info.deregister(registry)); } #[inline(always)] - fn next(&self) -> &WorkerHandleAccept { + fn next(&self) -> &WorkerHandleAccept { &self.handles[self.next] } @@ -526,67 +479,19 @@ impl Accept { } } -#[cfg(test)] -mod test { - use super::Availability; +pub(crate) struct AcceptorStop { + graceful: bool, + tx: oneshot::Sender>>, +} - fn single(aval: &mut Availability, idx: usize) { - aval.set_available(idx, true); - assert!(aval.available()); +impl AcceptorStop { + pub(crate) fn new( + graceful: bool, + ) -> (Self, oneshot::Receiver>>) { + let (tx, rx) = oneshot::channel(); - aval.set_available(idx, true); + let this = Self { graceful, tx }; - aval.set_available(idx, false); - assert!(!aval.available()); - - aval.set_available(idx, false); - assert!(!aval.available()); - } - - fn multi(aval: &mut Availability, mut idx: Vec) { - idx.iter().for_each(|idx| aval.set_available(*idx, true)); - - assert!(aval.available()); - - while let Some(idx) = idx.pop() { - assert!(aval.available()); - aval.set_available(idx, false); - } - - assert!(!aval.available()); - } - - #[test] - fn availability() { - let mut aval = Availability::default(); - - single(&mut aval, 1); - single(&mut aval, 128); - single(&mut aval, 256); - single(&mut aval, 511); - - let idx = (0..511).filter(|i| i % 3 == 0 && i % 5 == 0).collect(); - - multi(&mut aval, idx); - - multi(&mut aval, (0..511).collect()) - } - - #[test] - #[should_panic] - fn overflow() { - let mut aval = Availability::default(); - single(&mut aval, 512); - } - - #[test] - fn pin_point() { - let mut aval = Availability::default(); - - aval.set_available(438, true); - - aval.set_available(479, true); - - assert_eq!(aval.0[3], 1 << (438 - 384) | 1 << (479 - 384)); + (this, rx) } } diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index e84a887d..51803623 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -6,32 +6,32 @@ use std::{ time::Duration, }; -use actix_rt::{self as rt, net::TcpStream, time::sleep, System}; +use actix_rt::{net::TcpStream, time::sleep, System}; use log::{error, info}; use tokio::sync::{ mpsc::{unbounded_channel, UnboundedReceiver}, oneshot, }; -use crate::accept::AcceptLoop; -use crate::join_all; +use crate::accept::{AcceptLoop, Acceptable, AcceptorStop}; use crate::server::{Server, ServerCommand}; use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; use crate::signals::{Signal, Signals}; -use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; -use crate::socket::{MioTcpListener, MioTcpSocket}; -use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer}; +use crate::socket::{ + FromConnection, MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, + ToSocketAddrs, +}; +use crate::waker_queue::WakerInterest; +use crate::worker::{ServerWorkerConfig, Worker, WorkerHandleAccept}; /// Server builder -pub struct ServerBuilder { +pub struct ServerBuilder { threads: usize, token: usize, backlog: u32, - handles: Vec<(usize, WorkerHandleServer)>, - services: Vec>, - sockets: Vec<(usize, String, MioListener)>, - accept: AcceptLoop, + services: Vec>>, + sockets: Vec<(usize, String, A)>, + accept: AcceptLoop, exit: bool, no_signals: bool, cmd: UnboundedReceiver, @@ -46,16 +46,18 @@ impl Default for ServerBuilder { } } -impl ServerBuilder { +impl ServerBuilder +where + A: Acceptable + 
Send + Unpin + 'static, +{ /// Create new Server builder instance - pub fn new() -> ServerBuilder { + pub fn new() -> Self { let (tx, rx) = unbounded_channel(); let server = Server::new(tx); - ServerBuilder { + Self { threads: num_cpus::get(), token: 0, - handles: Vec::new(), services: Vec::new(), sockets: Vec::new(), accept: AcceptLoop::new(server.clone()), @@ -86,7 +88,7 @@ impl ServerBuilder { /// # Examples: /// ``` /// # use actix_server::ServerBuilder; - /// let builder = ServerBuilder::new() + /// let builder = ServerBuilder::default() /// .workers(4) // server has 4 worker thread. /// .worker_max_blocking_threads(4); // every worker has 4 max blocking threads. /// ``` @@ -147,154 +149,49 @@ impl ServerBuilder { self } - /// Add new service to the server. - pub fn bind>(mut self, name: N, addr: U, factory: F) -> io::Result - where - F: ServiceFactory, - U: ToSocketAddrs, - { - let sockets = bind_addr(addr, self.backlog)?; - - for lst in sockets { - let token = self.next_token(); - self.services.push(StreamNewService::create( - name.as_ref().to_string(), - token, - factory.clone(), - lst.local_addr()?, - )); - self.sockets - .push((token, name.as_ref().to_string(), MioListener::Tcp(lst))); - } - Ok(self) + fn next_token(&mut self) -> usize { + let token = self.token; + self.token += 1; + token } - /// Add new unix domain service to the server. - #[cfg(unix)] - pub fn bind_uds(self, name: N, addr: U, factory: F) -> io::Result - where - F: ServiceFactory, - N: AsRef, - U: AsRef, - { - // The path must not exist when we try to bind. - // Try to remove it to avoid bind error. - if let Err(e) = std::fs::remove_file(addr.as_ref()) { - // NotFound is expected and not an issue. Anything else is. - if e.kind() != std::io::ErrorKind::NotFound { - return Err(e); - } - } - - let lst = crate::socket::StdUnixListener::bind(addr)?; - self.listen_uds(name, lst, factory) - } - - /// Add new unix domain service to the server. - /// Useful when running as a systemd service and - /// a socket FD can be acquired using the systemd crate. - #[cfg(unix)] - pub fn listen_uds>( - mut self, - name: N, - lst: crate::socket::StdUnixListener, - factory: F, - ) -> io::Result - where - F: ServiceFactory, - { - use std::net::{IpAddr, Ipv4Addr}; - lst.set_nonblocking(true)?; - let token = self.next_token(); - let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); - self.services.push(StreamNewService::create( - name.as_ref().to_string(), - token, - factory, - addr, - )); - self.sockets - .push((token, name.as_ref().to_string(), MioListener::from(lst))); - Ok(self) - } - - /// Add new service to the server. - pub fn listen>( - mut self, - name: N, - lst: StdTcpListener, - factory: F, - ) -> io::Result - where - F: ServiceFactory, - { - lst.set_nonblocking(true)?; - let addr = lst.local_addr()?; - - let token = self.next_token(); - self.services.push(StreamNewService::create( - name.as_ref().to_string(), - token, - factory, - addr, - )); - - self.sockets - .push((token, name.as_ref().to_string(), MioListener::from(lst))); - - Ok(self) - } - - /// Starts processing incoming connections and return server controller. 
- pub fn run(mut self) -> Server { - if self.sockets.is_empty() { - panic!("Server should have at least one bound socket"); - } else { - info!("Starting {} workers", self.threads); - - // start workers - let handles = (0..self.threads) - .map(|idx| { - let (handle_accept, handle_server) = - self.start_worker(idx, self.accept.waker_owned()); - self.handles.push((idx, handle_server)); - - handle_accept - }) - .collect(); - - // start accept thread - for sock in &self.sockets { - info!("Starting \"{}\" service on {}", sock.1, sock.2); - } - self.accept.start( - mem::take(&mut self.sockets) - .into_iter() - .map(|t| (t.0, t.2)) - .collect(), - handles, - ); - - // handle signals - if !self.no_signals { - Signals::start(self.server.clone()); - } - - // start http server actor - let server = self.server.clone(); - rt::spawn(self); - server - } - } - - fn start_worker( - &self, - idx: usize, - waker_queue: WakerQueue, - ) -> (WorkerHandleAccept, WorkerHandleServer) { + fn start_worker(&self, idx: usize) -> WorkerHandleAccept { let services = self.services.iter().map(|v| v.clone_factory()).collect(); + let config = self.worker_config; + let waker_queue = self.accept.waker_owned(); + Worker::start(idx, services, waker_queue, config) + } - ServerWorker::start(idx, services, waker_queue, self.worker_config) + fn handle_signal(&mut self, sig: Signal) { + // Signals support + // Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system + match sig { + Signal::Int => { + info!("SIGINT received, exiting"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: false, + completion: None, + }) + } + Signal::Term => { + info!("SIGTERM received, stopping"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: true, + completion: None, + }) + } + Signal::Quit => { + info!("SIGQUIT received, exiting"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: false, + completion: None, + }) + } + _ => (), + } } fn handle_cmd(&mut self, item: ServerCommand) { @@ -307,37 +204,7 @@ impl ServerBuilder { self.accept.wake(WakerInterest::Resume); let _ = tx.send(()); } - ServerCommand::Signal(sig) => { - // Signals support - // Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system - match sig { - Signal::Int => { - info!("SIGINT received, exiting"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { - graceful: false, - completion: None, - }) - } - Signal::Term => { - info!("SIGTERM received, stopping"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { - graceful: true, - completion: None, - }) - } - Signal::Quit => { - info!("SIGQUIT received, exiting"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { - graceful: false, - completion: None, - }) - } - _ => (), - } - } + ServerCommand::Signal(sig) => self.handle_signal(sig), ServerCommand::Notify(tx) => { self.notify.push(tx); } @@ -348,19 +215,14 @@ impl ServerBuilder { let exit = self.exit; // stop accept thread - self.accept.wake(WakerInterest::Stop); + let (stop, rx) = AcceptorStop::new(graceful); + + self.accept.wake(WakerInterest::Stop(stop)); let notify = std::mem::take(&mut self.notify); - // stop workers - let stop = self - .handles - .iter() - .map(move |worker| worker.1.stop(graceful)) - .collect(); - - rt::spawn(async move { - if graceful { - let _ = join_all(stop).await; + actix_rt::spawn(async move { + for rx in rx.await.unwrap_or_else(|_| Vec::new()) { + let _ = rx.await; } if let Some(tx) = completion { @@ -377,52 +239,171 @@ impl ServerBuilder { 
}); } ServerCommand::WorkerFaulted(idx) => { - let mut found = false; - for i in 0..self.handles.len() { - if self.handles[i].0 == idx { - self.handles.swap_remove(i); - found = true; - break; - } - } + error!("Worker has died {:?}, restarting", idx); - if found { - error!("Worker has died {:?}, restarting", idx); - - let mut new_idx = self.handles.len(); - 'found: loop { - for i in 0..self.handles.len() { - if self.handles[i].0 == new_idx { - new_idx += 1; - continue 'found; - } - } - break; - } - - let (handle_accept, handle_server) = - self.start_worker(new_idx, self.accept.waker_owned()); - self.handles.push((new_idx, handle_server)); - self.accept.wake(WakerInterest::Worker(handle_accept)); - } + let handle = self.start_worker(idx); + self.accept.wake(WakerInterest::Worker(handle)); } } } - fn next_token(&mut self) -> usize { - let token = self.token; - self.token += 1; - token + /// Starts processing incoming connections and return server controller. + pub fn run(mut self) -> Server { + if self.sockets.is_empty() { + panic!("Server should have at least one bound socket"); + } else { + info!("Starting {} workers", self.threads); + + // start workers + let handles = (0..self.threads) + .map(|idx| self.start_worker(idx)) + .collect(); + + // start accept thread + for sock in &self.sockets { + info!("Starting \"{}\" service on {:?}", sock.1, sock.2); + } + self.accept.start( + mem::take(&mut self.sockets) + .into_iter() + .map(|t| (t.0, t.2)) + .collect(), + handles, + ); + + // handle signals + if !self.no_signals { + Signals::start(self.server.clone()); + } + + // start http server actor + let server = self.server.clone(); + actix_rt::spawn(self); + server + } + } + + #[doc(hidden)] + pub fn bind_acceptable( + mut self, + name: &str, + addr: StdSocketAddr, + lst: A, + factory: F, + ) -> Self + where + F: ServiceFactory, + Io: FromConnection + Send + 'static, + { + let token = self.next_token(); + self.services.push(StreamNewService::create( + name.to_string(), + token, + factory, + addr, + )); + + self.sockets.push((token, name.to_string(), lst)); + + self } } -impl Future for ServerBuilder { +impl ServerBuilder { + /// Add new service to the server. + pub fn bind(mut self, name: N, addr: U, factory: F) -> io::Result + where + F: ServiceFactory, + N: AsRef, + U: ToSocketAddrs, + { + let sockets = bind_addr(addr, self.backlog)?; + + for lst in sockets { + let addr = lst.local_addr()?; + let lst = MioListener::Tcp(lst); + + self = self.bind_acceptable(name.as_ref(), addr, lst, factory.clone()); + } + + Ok(self) + } + + /// Add new service to the server. + pub fn listen>( + self, + name: N, + lst: StdTcpListener, + factory: F, + ) -> io::Result + where + F: ServiceFactory, + { + lst.set_nonblocking(true)?; + + let addr = lst.local_addr()?; + let lst = MioListener::from(lst); + + Ok(self.bind_acceptable(name.as_ref(), addr, lst, factory)) + } +} + +#[cfg(unix)] +impl ServerBuilder { + /// Add new unix domain service to the server. + pub fn bind_uds(self, name: N, addr: U, factory: F) -> io::Result + where + F: ServiceFactory, + N: AsRef, + U: AsRef, + { + // The path must not exist when we try to bind. + // Try to remove it to avoid bind error. + if let Err(e) = std::fs::remove_file(addr.as_ref()) { + // NotFound is expected and not an issue. Anything else is. + if e.kind() != io::ErrorKind::NotFound { + return Err(e); + } + } + + let lst = crate::socket::StdUnixListener::bind(addr)?; + self.listen_uds(name, lst, factory) + } + + /// Add new unix domain service to the server. 
+ /// Useful when running as a systemd service and + /// a socket FD can be acquired using the systemd crate. + pub fn listen_uds( + self, + name: N, + lst: crate::socket::StdUnixListener, + factory: F, + ) -> io::Result + where + F: ServiceFactory, + N: AsRef, + { + lst.set_nonblocking(true)?; + + let addr = "127.0.0.1:8080".parse().unwrap(); + + let lst = MioListener::from(lst); + + Ok(self.bind_acceptable(name.as_ref(), addr, lst, factory)) + } +} + +impl Future for ServerBuilder +where + A: Acceptable + Send + Unpin + 'static, +{ type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); loop { - match Pin::new(&mut self.cmd).poll_recv(cx) { - Poll::Ready(Some(it)) => self.as_mut().get_mut().handle_cmd(it), + match Pin::new(&mut this.cmd).poll_recv(cx) { + Poll::Ready(Some(it)) => this.handle_cmd(it), _ => return Poll::Pending, } } diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index b2117191..3505ce46 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -14,13 +14,14 @@ mod test_server; mod waker_queue; mod worker; +pub use self::accept::Acceptable; pub use self::builder::ServerBuilder; pub use self::server::Server; pub use self::service::ServiceFactory; pub use self::test_server::TestServer; #[doc(hidden)] -pub use self::socket::FromStream; +pub use self::socket::FromConnection; use std::future::Future; use std::pin::Pin; diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index 28ffb4f1..7449ffb4 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -7,38 +7,42 @@ use actix_utils::future::{ready, Ready}; use futures_core::future::LocalBoxFuture; use log::error; -use crate::socket::{FromStream, MioStream}; +use crate::socket::{FromConnection, MioStream}; use crate::worker::WorkerCounterGuard; -pub trait ServiceFactory: Send + Clone + 'static { - type Factory: BaseServiceFactory; +pub trait ServiceFactory +where + Io: FromConnection, + Self: Send + Clone + 'static, +{ + type Factory: BaseServiceFactory; fn create(&self) -> Self::Factory; } -pub(crate) trait InternalServiceFactory: Send { +pub(crate) trait InternalServiceFactory: Send { fn name(&self, token: usize) -> &str; - fn clone_factory(&self) -> Box; + fn clone_factory(&self) -> Box>; - fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>; + fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>; } -pub(crate) type BoxedServerService = Box< +pub(crate) type BoxedServerService = Box< dyn Service< - (WorkerCounterGuard, MioStream), + (WorkerCounterGuard, C), Response = (), Error = (), Future = Ready>, >, >; -pub(crate) struct StreamService { +pub(crate) struct StreamService { service: S, - _phantom: PhantomData, + _phantom: PhantomData, } -impl StreamService { +impl StreamService { pub(crate) fn new(service: S) -> Self { StreamService { service, @@ -47,26 +51,28 @@ impl StreamService { } } -impl Service<(WorkerCounterGuard, MioStream)> for StreamService +impl Service<(WorkerCounterGuard, C)> for StreamService where - S: Service, + S: Service, S::Future: 'static, S::Error: 'static, - I: FromStream, + Io: FromConnection, + C: 'static, { type Response = (); type Error = (); type Future = Ready>; + #[inline] fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll> { self.service.poll_ready(ctx).map_err(|_| ()) } - fn call(&self, (guard, req): (WorkerCounterGuard, MioStream)) -> 
Self::Future { - ready(match FromStream::from_mio(req) { + fn call(&self, (guard, req): (WorkerCounterGuard, C)) -> Self::Future { + ready(match FromConnection::from_conn(req) { Ok(stream) => { let f = self.service.call(stream); - actix_rt::spawn(async move { + actix_rt::spawn(async { let _ = f.await; drop(guard); }); @@ -80,25 +86,30 @@ where } } -pub(crate) struct StreamNewService, Io: FromStream> { +pub(crate) struct StreamNewService +where + F: ServiceFactory, + Io: FromConnection + Send, +{ name: String, inner: F, token: usize, addr: SocketAddr, - _t: PhantomData, + _t: PhantomData<(Io, C)>, } -impl StreamNewService +impl StreamNewService where - F: ServiceFactory, - Io: FromStream + Send + 'static, + F: ServiceFactory, + Io: FromConnection + Send + 'static, + C: Send + 'static, { pub(crate) fn create( name: String, token: usize, inner: F, addr: SocketAddr, - ) -> Box { + ) -> Box> { Box::new(Self { name, token, @@ -109,16 +120,17 @@ where } } -impl InternalServiceFactory for StreamNewService +impl InternalServiceFactory for StreamNewService where - F: ServiceFactory, - Io: FromStream + Send + 'static, + F: ServiceFactory, + Io: FromConnection + Send + 'static, + C: Send + 'static, { fn name(&self, _: usize) -> &str { &self.name } - fn clone_factory(&self) -> Box { + fn clone_factory(&self) -> Box> { Box::new(Self { name: self.name.clone(), inner: self.inner.clone(), @@ -128,7 +140,7 @@ where }) } - fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>> { + fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>> { let token = self.token; let fut = self.inner.create().new_service(()); Box::pin(async move { @@ -143,11 +155,11 @@ where } } -impl ServiceFactory for F +impl ServiceFactory for F where F: Fn() -> T + Send + Clone + 'static, - T: BaseServiceFactory, - I: FromStream, + T: BaseServiceFactory, + Io: FromConnection, { type Factory = T; diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index cd7ccc1a..b0e2e85a 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -12,71 +12,73 @@ pub(crate) use { use std::{fmt, io}; use actix_rt::net::TcpStream; +use mio::net::TcpStream as MioTcpStream; use mio::{event::Source, Interest, Registry, Token}; -pub(crate) enum MioListener { +use crate::accept::Acceptable; + +/// impl Acceptable trait for [mio::net::TcpListener] so it can be managed by server and it's [mio::Poll] instance. 
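+///
+/// A successful `accept` yields exactly one connection; `WouldBlock` is passed through as an
+/// error so the accept loop knows the listener's backlog is drained.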
+impl Acceptable for MioTcpListener { + type Connection = MioTcpStream; + + fn accept(&mut self) -> io::Result> { + Self::accept(self).map(|stream| Some(stream.0)) + } + + fn register(&mut self, registry: &mio::Registry, token: mio::Token) -> io::Result<()> { + Source::register(self, registry, token, Interest::READABLE) + } + + fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> { + Source::deregister(self, registry) + } +} + +pub enum MioListener { Tcp(MioTcpListener), #[cfg(unix)] Uds(MioUnixListener), } -impl MioListener { - pub(crate) fn local_addr(&self) -> SocketAddr { - match *self { - MioListener::Tcp(ref lst) => lst - .local_addr() - .map(SocketAddr::Tcp) - .unwrap_or(SocketAddr::Unknown), - #[cfg(unix)] - MioListener::Uds(ref lst) => lst - .local_addr() - .map(SocketAddr::Uds) - .unwrap_or(SocketAddr::Unknown), - } - } - - pub(crate) fn accept(&self) -> io::Result { - match *self { - MioListener::Tcp(ref lst) => lst.accept().map(|(stream, _)| MioStream::Tcp(stream)), - #[cfg(unix)] - MioListener::Uds(ref lst) => lst.accept().map(|(stream, _)| MioStream::Uds(stream)), - } +impl From for MioListener { + fn from(lst: StdTcpListener) -> Self { + MioListener::Tcp(MioTcpListener::from_std(lst)) } } -impl Source for MioListener { - fn register( - &mut self, - registry: &Registry, - token: Token, - interests: Interest, - ) -> io::Result<()> { +impl Acceptable for MioListener { + type Connection = MioStream; + + fn accept(&mut self) -> io::Result> { match *self { - MioListener::Tcp(ref mut lst) => lst.register(registry, token, interests), + MioListener::Tcp(ref mut lst) => { + MioTcpListener::accept(lst).map(|stream| Some(MioStream::Tcp(stream.0))) + } #[cfg(unix)] - MioListener::Uds(ref mut lst) => lst.register(registry, token, interests), + MioListener::Uds(ref mut lst) => { + MioUnixListener::accept(lst).map(|stream| Some(MioStream::Uds(stream.0))) + } } } - fn reregister( - &mut self, - registry: &Registry, - token: Token, - interests: Interest, - ) -> io::Result<()> { + fn register(&mut self, registry: &Registry, token: Token) -> io::Result<()> { match *self { - MioListener::Tcp(ref mut lst) => lst.reregister(registry, token, interests), + MioListener::Tcp(ref mut lst) => { + Source::register(lst, registry, token, Interest::READABLE) + } #[cfg(unix)] - MioListener::Uds(ref mut lst) => lst.reregister(registry, token, interests), + MioListener::Uds(ref mut lst) => { + Source::register(lst, registry, token, Interest::READABLE) + } } } fn deregister(&mut self, registry: &Registry) -> io::Result<()> { match *self { - MioListener::Tcp(ref mut lst) => lst.deregister(registry), + MioListener::Tcp(ref mut lst) => Source::deregister(lst, registry), #[cfg(unix)] MioListener::Uds(ref mut lst) => { - let res = lst.deregister(registry); + let res = Source::deregister(lst, registry); // cleanup file path if let Ok(addr) = lst.local_addr() { @@ -90,19 +92,6 @@ impl Source for MioListener { } } -impl From for MioListener { - fn from(lst: StdTcpListener) -> Self { - MioListener::Tcp(MioTcpListener::from_std(lst)) - } -} - -#[cfg(unix)] -impl From for MioListener { - fn from(lst: StdUnixListener) -> Self { - MioListener::Uds(MioUnixListener::from_std(lst)) - } -} - impl fmt::Debug for MioListener { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { @@ -113,55 +102,16 @@ impl fmt::Debug for MioListener { } } -impl fmt::Display for MioListener { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - MioListener::Tcp(ref lst) => write!(f, 
"{:?}", lst), - #[cfg(unix)] - MioListener::Uds(ref lst) => write!(f, "{:?}", lst), - } - } -} - -pub(crate) enum SocketAddr { - Unknown, - Tcp(StdSocketAddr), - #[cfg(unix)] - Uds(mio::net::SocketAddr), -} - -impl fmt::Display for SocketAddr { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - Self::Unknown => write!(f, "Unknown SocketAddr"), - Self::Tcp(ref addr) => write!(f, "{}", addr), - #[cfg(unix)] - Self::Uds(ref addr) => write!(f, "{:?}", addr), - } - } -} - -impl fmt::Debug for SocketAddr { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - Self::Unknown => write!(f, "Unknown SocketAddr"), - Self::Tcp(ref addr) => write!(f, "{:?}", addr), - #[cfg(unix)] - Self::Uds(ref addr) => write!(f, "{:?}", addr), - } - } -} - #[derive(Debug)] pub enum MioStream { - Tcp(mio::net::TcpStream), + Tcp(MioTcpStream), #[cfg(unix)] Uds(mio::net::UnixStream), } /// helper trait for converting mio stream to tokio stream. -pub trait FromStream: Sized { - fn from_mio(sock: MioStream) -> io::Result; +pub trait FromConnection: Sized { + fn from_conn(conn: C) -> io::Result; } #[cfg(windows)] @@ -171,14 +121,18 @@ mod win_impl { use std::os::windows::io::{FromRawSocket, IntoRawSocket}; // FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream - impl FromStream for TcpStream { - fn from_mio(sock: MioStream) -> io::Result { - match sock { - MioStream::Tcp(mio) => { - let raw = IntoRawSocket::into_raw_socket(mio); - // SAFETY: This is a in place conversion from mio stream to tokio stream. - TcpStream::from_std(unsafe { FromRawSocket::from_raw_socket(raw) }) - } + impl FromConnection for TcpStream { + fn from_conn(conn: MioTcpStream) -> io::Result { + let raw = IntoRawSocket::into_raw_socket(mio); + // SAFETY: This is a in place conversion from mio stream to tokio stream. + TcpStream::from_std(unsafe { FromRawSocket::from_raw_socket(raw) }) + } + } + + impl FromConnection for TcpStream { + fn from_conn(stream: MioStream) -> io::Result { + match stream { + MioStream::Tcp(tcp) => FromConnection::from_conn(tcp), } } } @@ -191,71 +145,64 @@ mod unix_impl { use std::os::unix::io::{FromRawFd, IntoRawFd}; use actix_rt::net::UnixStream; + use mio::net::UnixStream as MioUnixStream; - // FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream - impl FromStream for TcpStream { - fn from_mio(sock: MioStream) -> io::Result { - match sock { - MioStream::Tcp(mio) => { - let raw = IntoRawFd::into_raw_fd(mio); - // SAFETY: This is a in place conversion from mio stream to tokio stream. - TcpStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) }) - } - MioStream::Uds(_) => { - panic!("Should not happen, bug in server impl"); - } - } + impl From for MioListener { + fn from(lst: StdUnixListener) -> Self { + MioListener::Uds(MioUnixListener::from_std(lst)) + } + } + + /// impl Acceptable trait for [mio::net::UnixListener] so it can be managed by server and it's [mio::Poll] instance. 
+ impl Acceptable for MioUnixListener { + type Connection = MioUnixStream; + + fn accept(&mut self) -> io::Result> { + Self::accept(self).map(|stream| Some(stream.0)) + } + + fn register(&mut self, registry: &mio::Registry, token: mio::Token) -> io::Result<()> { + Source::register(self, registry, token, Interest::READABLE) + } + + fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> { + Source::deregister(self, registry) } } // FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream - impl FromStream for UnixStream { - fn from_mio(sock: MioStream) -> io::Result { - match sock { - MioStream::Tcp(_) => panic!("Should not happen, bug in server impl"), - MioStream::Uds(mio) => { - let raw = IntoRawFd::into_raw_fd(mio); - // SAFETY: This is a in place conversion from mio stream to tokio stream. - UnixStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) }) - } + impl FromConnection for TcpStream { + fn from_conn(conn: MioTcpStream) -> io::Result { + let raw = IntoRawFd::into_raw_fd(conn); + // SAFETY: This is a in place conversion from mio stream to tokio stream. + TcpStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) }) + } + } + + // FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream + impl FromConnection for UnixStream { + fn from_conn(conn: MioUnixStream) -> io::Result { + let raw = IntoRawFd::into_raw_fd(conn); + // SAFETY: This is a in place conversion from mio stream to tokio stream. + UnixStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) }) + } + } + + impl FromConnection for TcpStream { + fn from_conn(stream: MioStream) -> io::Result { + match stream { + MioStream::Tcp(tcp) => FromConnection::from_conn(tcp), + MioStream::Uds(_) => unreachable!("UnixStream can not convert to TcpStream"), + } + } + } + + impl FromConnection for UnixStream { + fn from_conn(stream: MioStream) -> io::Result { + match stream { + MioStream::Tcp(_) => unreachable!("TcpStream can not convert to UnixStream"), + MioStream::Uds(uds) => FromConnection::from_conn(uds), } } } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn socket_addr() { - let addr = SocketAddr::Tcp("127.0.0.1:8080".parse().unwrap()); - assert!(format!("{:?}", addr).contains("127.0.0.1:8080")); - assert_eq!(format!("{}", addr), "127.0.0.1:8080"); - - let addr: StdSocketAddr = "127.0.0.1:0".parse().unwrap(); - let socket = MioTcpSocket::new_v4().unwrap(); - socket.set_reuseaddr(true).unwrap(); - socket.bind(addr).unwrap(); - let tcp = socket.listen(128).unwrap(); - let lst = MioListener::Tcp(tcp); - assert!(format!("{:?}", lst).contains("TcpListener")); - assert!(format!("{}", lst).contains("127.0.0.1")); - } - - #[test] - #[cfg(unix)] - fn uds() { - let _ = std::fs::remove_file("/tmp/sock.xxxxx"); - if let Ok(socket) = MioUnixListener::bind("/tmp/sock.xxxxx") { - let addr = socket.local_addr().expect("Couldn't get local address"); - let a = SocketAddr::Uds(addr); - assert!(format!("{:?}", a).contains("/tmp/sock.xxxxx")); - assert!(format!("{}", a).contains("/tmp/sock.xxxxx")); - - let lst = MioListener::Uds(socket); - assert!(format!("{:?}", lst).contains("/tmp/sock.xxxxx")); - assert!(format!("{}", lst).contains("/tmp/sock.xxxxx")); - } - } -} diff --git a/actix-server/src/waker_queue.rs b/actix-server/src/waker_queue.rs index 3f8669d4..3b88a61f 100644 --- a/actix-server/src/waker_queue.rs +++ b/actix-server/src/waker_queue.rs @@ -6,6 +6,7 @@ use std::{ use mio::{Registry, Token as MioToken, Waker}; +use 
crate::accept::AcceptorStop; use crate::worker::WorkerHandleAccept; /// Waker token for `mio::Poll` instance. @@ -13,23 +14,23 @@ pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX); /// `mio::Waker` with a queue for waking up the `Accept`'s `Poll` and contains the `WakerInterest` /// the `Poll` would want to look into. -pub(crate) struct WakerQueue(Arc<(Waker, Mutex>)>); +pub(crate) struct WakerQueue(Arc<(Waker, Mutex>>)>); -impl Clone for WakerQueue { +impl Clone for WakerQueue { fn clone(&self) -> Self { Self(self.0.clone()) } } -impl Deref for WakerQueue { - type Target = (Waker, Mutex>); +impl Deref for WakerQueue { + type Target = (Waker, Mutex>>); fn deref(&self) -> &Self::Target { self.0.deref() } } -impl WakerQueue { +impl WakerQueue { /// Construct a waker queue with given `Poll`'s `Registry` and capacity. /// /// A fixed `WAKER_TOKEN` is used to identify the wake interest and the `Poll` needs to match @@ -42,7 +43,7 @@ impl WakerQueue { } /// Push a new interest to the queue and wake up the accept poll afterwards. - pub(crate) fn wake(&self, interest: WakerInterest) { + pub(crate) fn wake(&self, interest: WakerInterest) { let (waker, queue) = self.deref(); queue @@ -56,20 +57,20 @@ impl WakerQueue { } /// Get a MutexGuard of the waker queue. - pub(crate) fn guard(&self) -> MutexGuard<'_, VecDeque> { + pub(crate) fn guard(&self) -> MutexGuard<'_, VecDeque>> { self.deref().1.lock().expect("Failed to lock WakerQueue") } /// Reset the waker queue so it does not grow infinitely. - pub(crate) fn reset(queue: &mut VecDeque) { - std::mem::swap(&mut VecDeque::::with_capacity(16), queue); + pub(crate) fn reset(queue: &mut VecDeque>) { + std::mem::swap(&mut VecDeque::>::with_capacity(16), queue); } } /// Types of interests we would look into when `Accept`'s `Poll` is waked up by waker. /// /// These interests should not be confused with `mio::Interest` and mostly not I/O related -pub(crate) enum WakerInterest { +pub(crate) enum WakerInterest { /// `WorkerAvailable` is an interest from `Worker` notifying `Accept` there is a worker /// available and can accept new tasks. WorkerAvailable(usize), @@ -77,7 +78,7 @@ pub(crate) enum WakerInterest { /// `ServerCommand` and notify `Accept` to do exactly these tasks. Pause, Resume, - Stop, + Stop(AcceptorStop), /// `Timer` is an interest sent as a delayed future. When an error happens on accepting /// connection `Accept` would deregister socket listener temporary and wake up the poll and /// register them again after the delayed future resolve. @@ -85,5 +86,5 @@ pub(crate) enum WakerInterest { /// `Worker` is an interest happen after a worker runs into faulted state(This is determined /// by if work can be sent to it successfully).`Accept` would be waked up and add the new /// `WorkerHandleAccept`. 
- Worker(WorkerHandleAccept), + Worker(WorkerHandleAccept), } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs deleted file mode 100644 index 17535c05..00000000 --- a/actix-server/src/worker.rs +++ /dev/null @@ -1,645 +0,0 @@ -use std::{ - future::Future, - mem, - pin::Pin, - rc::Rc, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, - task::{Context, Poll}, - time::Duration, -}; - -use actix_rt::{ - spawn, - time::{sleep, Instant, Sleep}, - Arbiter, -}; -use futures_core::{future::LocalBoxFuture, ready}; -use log::{error, info, trace}; -use tokio::sync::{ - mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, - oneshot, -}; - -use crate::join_all; -use crate::service::{BoxedServerService, InternalServiceFactory}; -use crate::socket::MioStream; -use crate::waker_queue::{WakerInterest, WakerQueue}; - -/// Stop worker message. Returns `true` on successful graceful shutdown. -/// and `false` if some connections still alive when shutdown execute. -pub(crate) struct Stop { - graceful: bool, - tx: oneshot::Sender, -} - -#[derive(Debug)] -pub(crate) struct Conn { - pub io: MioStream, - pub token: usize, -} - -fn handle_pair( - idx: usize, - tx1: UnboundedSender, - tx2: UnboundedSender, - counter: Counter, -) -> (WorkerHandleAccept, WorkerHandleServer) { - let accept = WorkerHandleAccept { - idx, - tx: tx1, - counter, - }; - - let server = WorkerHandleServer { idx, tx: tx2 }; - - (accept, server) -} - -/// counter: Arc field is owned by `Accept` thread and `ServerWorker` thread. -/// -/// `Accept` would increment the counter and `ServerWorker` would decrement it. -/// -/// # Atomic Ordering: -/// -/// `Accept` always look into it's cached `Availability` field for `ServerWorker` state. -/// It lazily increment counter after successful dispatching new work to `ServerWorker`. -/// On reaching counter limit `Accept` update it's cached `Availability` and mark worker as -/// unable to accept any work. -/// -/// `ServerWorker` always decrement the counter when every work received from `Accept` is done. -/// On reaching counter limit worker would use `mio::Waker` and `WakerQueue` to wake up `Accept` -/// and notify it to update cached `Availability` again to mark worker as able to accept work again. -/// -/// Hense a wake up would only happen after `Accept` increment it to limit. -/// And a decrement to limit always wake up `Accept`. -#[derive(Clone)] -pub(crate) struct Counter { - counter: Arc, - limit: usize, -} - -impl Counter { - pub(crate) fn new(limit: usize) -> Self { - Self { - counter: Arc::new(AtomicUsize::new(1)), - limit, - } - } - - /// Increment counter by 1 and return true when hitting limit - #[inline(always)] - pub(crate) fn inc(&self) -> bool { - self.counter.fetch_add(1, Ordering::Relaxed) != self.limit - } - - /// Decrement counter by 1 and return true if crossing limit. 
- #[inline(always)] - pub(crate) fn dec(&self) -> bool { - self.counter.fetch_sub(1, Ordering::Relaxed) == self.limit - } - - pub(crate) fn total(&self) -> usize { - self.counter.load(Ordering::SeqCst) - 1 - } -} - -pub(crate) struct WorkerCounter { - idx: usize, - inner: Rc<(WakerQueue, Counter)>, -} - -impl Clone for WorkerCounter { - fn clone(&self) -> Self { - Self { - idx: self.idx, - inner: self.inner.clone(), - } - } -} - -impl WorkerCounter { - pub(crate) fn new(idx: usize, waker_queue: WakerQueue, counter: Counter) -> Self { - Self { - idx, - inner: Rc::new((waker_queue, counter)), - } - } - - #[inline(always)] - pub(crate) fn guard(&self) -> WorkerCounterGuard { - WorkerCounterGuard(self.clone()) - } - - fn total(&self) -> usize { - self.inner.1.total() - } -} - -pub(crate) struct WorkerCounterGuard(WorkerCounter); - -impl Drop for WorkerCounterGuard { - fn drop(&mut self) { - let (waker_queue, counter) = &*self.0.inner; - if counter.dec() { - waker_queue.wake(WakerInterest::WorkerAvailable(self.0.idx)); - } - } -} - -/// Handle to worker that can send connection message to worker and share the -/// availability of worker to other thread. -/// -/// Held by [Accept](crate::accept::Accept). -pub(crate) struct WorkerHandleAccept { - idx: usize, - tx: UnboundedSender, - counter: Counter, -} - -impl WorkerHandleAccept { - #[inline(always)] - pub(crate) fn idx(&self) -> usize { - self.idx - } - - #[inline(always)] - pub(crate) fn send(&self, msg: Conn) -> Result<(), Conn> { - self.tx.send(msg).map_err(|msg| msg.0) - } - - #[inline(always)] - pub(crate) fn inc_counter(&self) -> bool { - self.counter.inc() - } -} - -/// Handle to worker than can send stop message to worker. -/// -/// Held by [ServerBuilder](crate::builder::ServerBuilder). -pub(crate) struct WorkerHandleServer { - pub idx: usize, - tx: UnboundedSender, -} - -impl WorkerHandleServer { - pub(crate) fn stop(&self, graceful: bool) -> oneshot::Receiver { - let (tx, rx) = oneshot::channel(); - let _ = self.tx.send(Stop { graceful, tx }); - rx - } -} - -/// Server worker. -/// -/// Worker accepts Socket objects via unbounded channel and starts stream processing. -pub(crate) struct ServerWorker { - worker: Worker, - state: WorkerState, -} - -impl ServerWorker { - fn new( - rx: UnboundedReceiver, - rx2: UnboundedReceiver, - counter: WorkerCounter, - services: Box<[WorkerService]>, - factories: Box<[Box]>, - shutdown_timeout: Duration, - ) -> Self { - Self { - worker: Worker { - rx, - rx2, - counter, - services, - factories, - shutdown_timeout, - }, - state: WorkerState::default(), - } - } - - pub(crate) fn start( - idx: usize, - factories: Vec>, - waker_queue: WakerQueue, - config: ServerWorkerConfig, - ) -> (WorkerHandleAccept, WorkerHandleServer) { - let (tx1, rx) = unbounded_channel(); - let (tx2, rx2) = unbounded_channel(); - - let counter = Counter::new(config.max_concurrent_connections); - - let counter_clone = counter.clone(); - // every worker runs in it's own arbiter. - // use a custom tokio runtime builder to change the settings of runtime. - Arbiter::with_tokio_rt(move || { - tokio::runtime::Builder::new_current_thread() - .enable_all() - .max_blocking_threads(config.max_blocking_threads) - .build() - .unwrap() - }) - .spawn(async move { - let fut = factories - .iter() - .enumerate() - .map(|(idx, factory)| { - let fut = factory.create(); - async move { fut.await.map(|(t, s)| (idx, t, s)) } - }) - .collect::>(); - - // a second spawn to run !Send future tasks. 
- spawn(async move { - let res = join_all(fut) - .await - .into_iter() - .collect::, _>>(); - let services = match res { - Ok(res) => res - .into_iter() - .fold(Vec::new(), |mut services, (factory, token, service)| { - assert_eq!(token, services.len()); - services.push(WorkerService { - factory, - service, - status: WorkerServiceStatus::Unavailable, - }); - services - }) - .into_boxed_slice(), - Err(e) => { - error!("Can not start worker: {:?}", e); - Arbiter::current().stop(); - return; - } - }; - - let counter = WorkerCounter::new(idx, waker_queue, counter_clone); - - // a third spawn to make sure ServerWorker runs as non boxed future. - spawn(ServerWorker::new( - rx, - rx2, - counter, - services, - factories.into_boxed_slice(), - config.shutdown_timeout, - )); - }); - }); - - handle_pair(idx, tx1, tx2, counter) - } -} - -struct Worker { - // UnboundedReceiver should always be the first field. - // It must be dropped as soon as ServerWorker dropping. - rx: UnboundedReceiver, - rx2: UnboundedReceiver, - counter: WorkerCounter, - services: Box<[WorkerService]>, - factories: Box<[Box]>, - shutdown_timeout: Duration, -} - -impl Worker { - /// `Conn` message and worker/service state switch handler - fn poll_running(&mut self, running: &mut Running, cx: &mut Context<'_>) -> Poll<()> { - // loop only exit on `Conn` channel shutdown or any poll method returns Pending state. - loop { - match *running { - Running::Unavailable => match self.poll_ready(cx) { - Ok(true) => { - *running = Running::Available; - } - Ok(false) => return Poll::Pending, - Err((token, idx)) => { - let restart = self.restart_service(token, idx); - *running = Running::Restart(restart); - } - }, - Running::Restart(Restart { - factory_id, - token, - ref mut fut, - }) => { - let (token_new, service) = - ready!(fut.as_mut().poll(cx)).unwrap_or_else(|_| { - // Restart failure would result in a panic drop of ServerWorker. - // This would prevent busy loop of poll_running. - panic!( - "Can not restart {:?} service", - self.factories[factory_id].name(token) - ) - }); - - assert_eq!(token, token_new); - - trace!( - "Service {:?} has been restarted", - self.factories[factory_id].name(token) - ); - - self.services[token].created(service); - *running = Running::Unavailable; - } - // actively poll Conn channel and handle MioStream. - Running::Available => loop { - match self.poll_ready(cx) { - Ok(true) => {} - Ok(false) => { - trace!("Worker is unavailable"); - *running = Running::Unavailable; - } - Err((token, idx)) => { - let restart = self.restart_service(token, idx); - *running = Running::Restart(restart); - } - } - - match ready!(Pin::new(&mut self.rx).poll_recv(cx)) { - Some(msg) => { - let guard = self.counter.guard(); - let _ = self.services[msg.token].service.call((guard, msg.io)); - } - None => return Poll::Ready(()), - }; - }, - } - } - } - - /// `Stop` message handler. - /// - /// Return Ready when worker should shutdown. 
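// Reviewer note (illustration only): the nested `spawn` above builds all
// services before the worker future starts, keeping the (factory index, token,
// service) triple aligned with the slice index. A simplified standalone sketch
// of that collect-and-fold step; `futures_util::future::join_all` stands in
// for the crate's local `join_all` helper and the types are demo stand-ins.

use futures_util::future::join_all;

#[derive(Debug)]
struct DemoService(usize);

#[actix_rt::main]
async fn main() {
    // pretend each factory yields `(token, service)` like the real factories
    let factories = vec!["a", "b", "c"];

    let futs = factories.iter().enumerate().map(|(idx, _)| async move {
        Ok::<_, ()>((idx, idx, DemoService(idx))) // (factory idx, token, service)
    });

    let res: Result<Vec<_>, ()> = join_all(futs).await.into_iter().collect();

    let services = res
        .expect("factories failed to start")
        .into_iter()
        .fold(Vec::new(), |mut services, (_factory, token, service)| {
            // tokens must arrive in order, matching their slot in the slice
            assert_eq!(token, services.len());
            services.push(service);
            services
        })
        .into_boxed_slice();

    println!("started {} services: {:?}", services.len(), services);
}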
- fn poll_stop(&mut self, cx: &mut Context<'_>) -> Poll> { - match ready!(Pin::new(&mut self.rx2).poll_recv(cx)) { - Some(Stop { graceful, tx }) => { - self.rx2.close(); - - let num = self.counter.total(); - if num == 0 { - info!("Shutting down worker, 0 connections"); - let _ = tx.send(true); - Poll::Ready(None) - } else if graceful { - info!("Graceful worker shutdown, {} connections", num); - self.shutdown(false); - - let shutdown = DelayShutdown { - timer: Box::pin(sleep(Duration::from_secs(1))), - start_from: Instant::now(), - tx, - }; - - Poll::Ready(Some(shutdown)) - } else { - info!("Force shutdown worker, {} connections", num); - self.shutdown(true); - - let _ = tx.send(false); - Poll::Ready(None) - } - } - None => Poll::Pending, - } - } - - /// Check readiness of services. - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Result { - let mut ready = true; - for (idx, srv) in self.services.iter_mut().enumerate() { - if srv.status == WorkerServiceStatus::Available - || srv.status == WorkerServiceStatus::Unavailable - { - match srv.service.poll_ready(cx) { - Poll::Ready(Ok(_)) => { - if srv.status == WorkerServiceStatus::Unavailable { - trace!( - "Service {:?} is available", - self.factories[srv.factory].name(idx) - ); - srv.status = WorkerServiceStatus::Available; - } - } - Poll::Pending => { - ready = false; - - if srv.status == WorkerServiceStatus::Available { - trace!( - "Service {:?} is unavailable", - self.factories[srv.factory].name(idx) - ); - srv.status = WorkerServiceStatus::Unavailable; - } - } - Poll::Ready(Err(_)) => { - error!( - "Service {:?} readiness check returned error, restarting", - self.factories[srv.factory].name(idx) - ); - srv.status = WorkerServiceStatus::Failed; - return Err((idx, srv.factory)); - } - } - } - } - - Ok(ready) - } - - /// Delay shutdown and drain all unhandled `Conn`. - fn poll_shutdown( - &mut self, - delay: &mut DelayShutdown, - running: &mut Running, - cx: &mut Context<'_>, - ) -> Poll { - if self.counter.total() == 0 { - // Graceful shutdown. - Poll::Ready(true) - } else if delay.start_from.elapsed() >= self.shutdown_timeout { - // Timeout forceful shutdown. - Poll::Ready(false) - } else { - // Poll Running state and try to drain all `Conn` from channel. - let _ = self.poll_running(running, cx); - - // Wait for 1 second. - ready!(delay.timer.as_mut().poll(cx)); - - // Reset timer and try again. 
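// Reviewer note (illustration only): the readiness pass above maps each
// `poll_ready` outcome onto a cached per-service status, with an error meaning
// "restart this service". A condensed standalone version of just that mapping;
// the enum and function are demo names, not crate items.

use std::task::Poll;

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum Status {
    Available,
    Unavailable,
    Failed,
}

/// Apply one readiness poll to the cached status.
/// `Ok(true)` = ready, `Ok(false)` = pending, `Err(())` = needs restart.
fn apply(status: &mut Status, poll: Poll<Result<(), ()>>) -> Result<bool, ()> {
    match poll {
        Poll::Ready(Ok(())) => {
            *status = Status::Available;
            Ok(true)
        }
        Poll::Pending => {
            *status = Status::Unavailable;
            Ok(false)
        }
        Poll::Ready(Err(())) => {
            *status = Status::Failed;
            Err(())
        }
    }
}

fn main() {
    let mut status = Status::Unavailable;

    assert_eq!(apply(&mut status, Poll::Ready(Ok(()))), Ok(true));
    assert_eq!(status, Status::Available);

    assert_eq!(apply(&mut status, Poll::Pending), Ok(false));
    assert_eq!(status, Status::Unavailable);

    assert!(apply(&mut status, Poll::Ready(Err(()))).is_err());
    assert_eq!(status, Status::Failed);
}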
- let time = Instant::now() + Duration::from_secs(1); - delay.timer.as_mut().reset(time); - - delay.timer.as_mut().poll(cx).map(|_| false) - } - } - - fn restart_service(&mut self, idx: usize, factory_id: usize) -> Restart { - let factory = &self.factories[factory_id]; - trace!("Service {:?} failed, restarting", factory.name(idx)); - self.services[idx].status = WorkerServiceStatus::Restarting; - Restart { - factory_id, - token: idx, - fut: factory.create(), - } - } - - fn shutdown(&mut self, force: bool) { - self.services - .iter_mut() - .filter(|srv| srv.status == WorkerServiceStatus::Available) - .for_each(|srv| { - srv.status = if force { - WorkerServiceStatus::Stopped - } else { - WorkerServiceStatus::Stopping - }; - }); - } -} - -struct WorkerService { - factory: usize, - status: WorkerServiceStatus, - service: BoxedServerService, -} - -impl WorkerService { - fn created(&mut self, service: BoxedServerService) { - self.service = service; - self.status = WorkerServiceStatus::Unavailable; - } -} - -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -enum WorkerServiceStatus { - Available, - Unavailable, - Failed, - Restarting, - Stopping, - Stopped, -} - -/// Config for worker behavior passed down from server builder. -#[derive(Copy, Clone)] -pub(crate) struct ServerWorkerConfig { - shutdown_timeout: Duration, - max_blocking_threads: usize, - max_concurrent_connections: usize, -} - -impl Default for ServerWorkerConfig { - fn default() -> Self { - // 512 is the default max blocking thread count of tokio runtime. - let max_blocking_threads = std::cmp::max(512 / num_cpus::get(), 1); - Self { - shutdown_timeout: Duration::from_secs(30), - max_blocking_threads, - max_concurrent_connections: 25600, - } - } -} - -impl ServerWorkerConfig { - pub(crate) fn max_blocking_threads(&mut self, num: usize) { - self.max_blocking_threads = num; - } - - pub(crate) fn max_concurrent_connections(&mut self, num: usize) { - self.max_concurrent_connections = num; - } - - pub(crate) fn shutdown_timeout(&mut self, dur: Duration) { - self.shutdown_timeout = dur; - } -} - -enum WorkerState { - Running(Running), - DelayShutdown(DelayShutdown, Running), -} - -impl Default for WorkerState { - fn default() -> Self { - Self::Running(Running::Unavailable) - } -} - -enum Running { - Available, - Unavailable, - Restart(Restart), -} - -struct Restart { - factory_id: usize, - token: usize, - fut: LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>, -} - -// Keep states necessary for delayed server shutdown: -// Sleep for interval check the shutdown progress. -// Instant for the start time of shutdown. -// Sender for send back the shutdown outcome(force/grace) to `Stop` caller. -struct DelayShutdown { - timer: Pin>, - start_from: Instant, - tx: oneshot::Sender, -} - -impl Drop for ServerWorker { - fn drop(&mut self) { - // Stop the Arbiter ServerWorker runs on on drop. - Arbiter::current().stop(); - } -} - -impl Future for ServerWorker { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().get_mut(); - - // poll Stop message first. - match this.worker.poll_stop(cx) { - Poll::Pending => {} - Poll::Ready(None) => return Poll::Ready(()), - Poll::Ready(Some(delay)) => { - // Take running state and pass it to DelayShutdown. - // During shutdown there could be unhandled `Conn` message left in channel. - // They should be drainned and worker would try to handle them all until - // delay shutdown timeout met. 
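// Reviewer note (illustration only): the shutdown delay above re-arms one
// pinned `Sleep` instead of allocating a fresh timer every tick. A minimal
// standalone future using the same reset pattern; the tick count and interval
// are made up for the demo.

use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};

use actix_rt::time::{sleep, Instant, Sleep};
use futures_core::ready;

struct Ticker {
    timer: Pin<Box<Sleep>>,
    remaining: u32,
}

impl Future for Ticker {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            // wait for the current tick to elapse
            ready!(self.timer.as_mut().poll(cx));

            if self.remaining == 0 {
                return Poll::Ready(());
            }
            self.remaining -= 1;

            // re-arm the same timer one second into the future
            let next = Instant::now() + Duration::from_secs(1);
            self.timer.as_mut().reset(next);
        }
    }
}

#[actix_rt::main]
async fn main() {
    Ticker {
        timer: Box::pin(sleep(Duration::from_secs(1))),
        remaining: 2,
    }
    .await;
    println!("ticker finished after three re-armed sleeps");
}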
-                this.state = match mem::take(&mut this.state) {
-                    WorkerState::Running(running) => WorkerState::DelayShutdown(delay, running),
-                    _ => unreachable!(
-                        "Duplicate worker::Stop message sent to one worker::ServerWorker."
-                    ),
-                };
-            }
-        }
-
-        match this.state {
-            WorkerState::Running(ref mut running) => this.worker.poll_running(running, cx),
-            WorkerState::DelayShutdown(ref mut delay, ref mut running) => {
-                let is_graceful = ready!(this.worker.poll_shutdown(delay, running, cx));
-
-                // Report back shutdown outcome to caller.
-                if let WorkerState::DelayShutdown(delay, _) = mem::take(&mut this.state) {
-                    let _ = delay.tx.send(is_graceful);
-                }
-
-                Poll::Ready(())
-            }
-        }
-    }
-}
diff --git a/actix-server/src/worker/config.rs b/actix-server/src/worker/config.rs
new file mode 100644
index 00000000..afefab69
--- /dev/null
+++ b/actix-server/src/worker/config.rs
@@ -0,0 +1,35 @@
+use std::time::Duration;
+
+/// Config for worker behavior passed down from server builder.
+#[derive(Copy, Clone)]
+pub(crate) struct ServerWorkerConfig {
+    pub(super) shutdown_timeout: Duration,
+    pub(super) max_blocking_threads: usize,
+    pub(super) max_concurrent_connections: usize,
+}
+
+impl Default for ServerWorkerConfig {
+    fn default() -> Self {
+        // 512 is the default max blocking thread count of the tokio runtime.
+        let max_blocking_threads = std::cmp::max(512 / num_cpus::get(), 1);
+        Self {
+            shutdown_timeout: Duration::from_secs(30),
+            max_blocking_threads,
+            max_concurrent_connections: 25600,
+        }
+    }
+}
+
+impl ServerWorkerConfig {
+    pub(crate) fn max_blocking_threads(&mut self, num: usize) {
+        self.max_blocking_threads = num;
+    }
+
+    pub(crate) fn max_concurrent_connections(&mut self, num: usize) {
+        self.max_concurrent_connections = num;
+    }
+
+    pub(crate) fn shutdown_timeout(&mut self, dur: Duration) {
+        self.shutdown_timeout = dur;
+    }
+}
diff --git a/actix-server/src/worker/counter.rs b/actix-server/src/worker/counter.rs
new file mode 100644
index 00000000..ed2ff1dc
--- /dev/null
+++ b/actix-server/src/worker/counter.rs
@@ -0,0 +1,106 @@
+use std::rc::Rc;
+
+#[cfg(_loom)]
+use loom::sync::{
+    atomic::{AtomicUsize, Ordering},
+    Arc,
+};
+
+#[cfg(not(_loom))]
+use std::sync::{
+    atomic::{AtomicUsize, Ordering},
+    Arc,
+};
+
+use crate::waker_queue::{WakerInterest, WakerQueue};
+
+/// The `counter` Arc field is shared by the `Accept` thread and the `ServerWorker` thread.
+///
+/// `Accept` increments the counter and `ServerWorker` decrements it.
+///
+/// # Atomic Ordering
+///
+/// `Accept` always looks at its cached `Availability` field for `ServerWorker` state.
+/// It lazily increments the counter after successfully dispatching new work to `ServerWorker`.
+/// On reaching the counter limit, `Accept` updates its cached `Availability` and marks the worker
+/// as unable to accept any work.
+///
+/// `ServerWorker` always decrements the counter when work received from `Accept` is done.
+/// On crossing the counter limit the worker uses `mio::Waker` and `WakerQueue` to wake up `Accept`
+/// and notify it to update the cached `Availability` again, marking the worker as able to accept work.
+///
+/// Hence a wake-up can only happen after `Accept` has incremented the counter to the limit.
+/// And a decrement back to the limit always wakes up `Accept`.
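// Reviewer note (illustration only): the `#[cfg(_loom)]` import swap above is
// what makes the counter testable under loom's model checker. A sketch of what
// such a test could look like; the test name, limit value, and the
// `RUSTFLAGS="--cfg _loom" cargo test` invocation are assumptions, not part of
// this patch.

#[cfg(_loom)]
#[test]
fn counter_inc_dec_under_loom() {
    use loom::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };
    use loom::thread;

    loom::model(|| {
        let limit = 2usize;
        let counter = Arc::new(AtomicUsize::new(1));

        // "Accept" side: increment for a dispatched connection
        let c = Arc::clone(&counter);
        let accept = thread::spawn(move || c.fetch_add(1, Ordering::Relaxed) != limit);

        // "Worker" side: decrement when a connection is done
        let freed = counter.fetch_sub(1, Ordering::Relaxed) == limit;

        // either interleaving is allowed; loom explores both and the counter
        // itself can never tear
        let still_available = accept.join().unwrap();
        let _ = (still_available, freed);
    });
}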
+#[derive(Clone)] +pub(crate) struct Counter { + counter: Arc, + limit: usize, +} + +impl Counter { + pub(crate) fn new(limit: usize) -> Self { + Self { + counter: Arc::new(AtomicUsize::new(1)), + limit, + } + } + + /// Increment counter by 1 and return true when hitting limit + #[inline(always)] + pub(crate) fn inc(&self) -> bool { + self.counter.fetch_add(1, Ordering::Relaxed) != self.limit + } + + /// Decrement counter by 1 and return true if crossing limit. + #[inline(always)] + pub(crate) fn dec(&self) -> bool { + self.counter.fetch_sub(1, Ordering::Relaxed) == self.limit + } + + pub(crate) fn total(&self) -> usize { + self.counter.load(Ordering::SeqCst) - 1 + } +} + +pub(super) struct WorkerCounter { + idx: usize, + inner: Rc<(WakerQueue, Counter)>, +} + +impl Clone for WorkerCounter { + fn clone(&self) -> Self { + Self { + idx: self.idx, + inner: self.inner.clone(), + } + } +} + +impl WorkerCounter { + pub(super) fn new(idx: usize, waker_queue: WakerQueue, counter: Counter) -> Self { + Self { + idx, + inner: Rc::new((waker_queue, counter)), + } + } + + #[inline(always)] + pub(super) fn guard(&self) -> WorkerCounterGuard { + WorkerCounterGuard(self.clone()) + } + + pub(super) fn total(&self) -> usize { + self.inner.1.total() + } +} + +pub(crate) struct WorkerCounterGuard(WorkerCounter); + +impl Drop for WorkerCounterGuard { + fn drop(&mut self) { + let (waker_queue, counter) = &*self.0.inner; + if counter.dec() { + waker_queue.wake(WakerInterest::WorkerAvailable(self.0.idx)); + } + } +} diff --git a/actix-server/src/worker/mod.rs b/actix-server/src/worker/mod.rs new file mode 100644 index 00000000..ab308bd8 --- /dev/null +++ b/actix-server/src/worker/mod.rs @@ -0,0 +1,495 @@ +mod config; +mod counter; + +pub(crate) use config::ServerWorkerConfig; +pub(crate) use counter::{Counter, WorkerCounterGuard}; + +use std::{ + future::Future, + mem, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +use actix_rt::{ + spawn, + time::{sleep, Instant, Sleep}, + Arbiter, +}; +use futures_core::{future::LocalBoxFuture, ready}; +use log::{error, info, trace}; +use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, + oneshot, +}; + +use counter::WorkerCounter; + +use crate::join_all; +use crate::service::{BoxedServerService, InternalServiceFactory}; +use crate::waker_queue::WakerQueue; + +pub(crate) enum WorkerMessage { + Conn(Conn), + Stop(Stop), +} + +/// Connection message. token field is used to find the corresponding Service from +/// Worker that can handle C type correctly. +#[derive(Debug)] +pub(crate) struct Conn { + pub token: usize, + pub io: C, +} + +/// Stop worker message. Returns `true` on successful graceful shutdown. +/// and `false` if some connections still alive when shutdown execute. +pub(crate) enum Stop { + Graceful(oneshot::Sender), + Force, +} + +// TOD: Remove default MioStream type. +/// Handle to worker that can send message to worker. 
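// Reviewer note (illustration only): `WorkerCounterGuard` above ties the
// decrement to `Drop`, so the wake-up can never be forgotten on an early
// return. A simplified standalone version of that RAII shape; `wakes` stands
// in for `WakerQueue::wake(WakerInterest::WorkerAvailable)` and all names are
// demo stand-ins.

use std::{
    cell::Cell,
    rc::Rc,
    sync::atomic::{AtomicUsize, Ordering},
};

struct Shared {
    counter: AtomicUsize, // starts at 1, like the crate's `Counter`
    limit: usize,
    wakes: Cell<usize>,
}

struct Guard(Rc<Shared>);

impl Drop for Guard {
    fn drop(&mut self) {
        // only the decrement that crosses the limit notifies Accept
        if self.0.counter.fetch_sub(1, Ordering::Relaxed) == self.0.limit {
            self.0.wakes.set(self.0.wakes.get() + 1);
        }
    }
}

fn main() {
    let shared = Rc::new(Shared {
        counter: AtomicUsize::new(1),
        limit: 2,
        wakes: Cell::new(0),
    });

    // Accept dispatches two connections, lazily incrementing after each send
    shared.counter.fetch_add(1, Ordering::Relaxed); // previous value 1: still available
    let a = Guard(Rc::clone(&shared));
    shared.counter.fetch_add(1, Ordering::Relaxed); // previous value 2 == limit: mark unavailable
    let b = Guard(Rc::clone(&shared));

    drop(a); // previous value 3: no wake yet
    drop(b); // previous value 2 == limit: wake Accept exactly once
    assert_eq!(shared.wakes.get(), 1);
}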
+pub(crate) struct WorkerHandleAccept { + idx: usize, + tx: UnboundedSender>, + counter: Counter, +} + +impl WorkerHandleAccept { + #[inline(always)] + pub(crate) fn idx(&self) -> usize { + self.idx + } + + #[inline(always)] + pub(crate) fn inc_counter(&self) -> bool { + self.counter.inc() + } + + #[inline(always)] + pub(crate) fn send(&self, conn: Conn) -> Result<(), Conn> { + self.tx + .send(WorkerMessage::Conn(conn)) + .map_err(|msg| match msg.0 { + WorkerMessage::Conn(conn) => conn, + _ => unreachable!(), + }) + } + + pub(crate) fn stop(&self, graceful: bool) -> Option> { + let (stop, rx) = if graceful { + let (tx, rx) = oneshot::channel(); + + (Stop::Graceful(tx), Some(rx)) + } else { + (Stop::Force, None) + }; + + let _ = self.tx.send(WorkerMessage::Stop(stop)); + + rx + } +} + +/// Server worker. +/// +/// Worker accepts Socket objects via unbounded channel and starts stream processing. +pub(crate) struct Worker { + worker: WorkerInner, + state: WorkerState, +} + +impl Worker +where + C: Send + 'static, +{ + fn new( + rx: UnboundedReceiver>, + counter: WorkerCounter, + services: Box<[WorkerService]>, + factories: Box<[Box>]>, + shutdown_timeout: Duration, + ) -> Self { + Self { + worker: WorkerInner { + rx, + counter, + services, + factories, + shutdown_timeout, + }, + state: WorkerState::default(), + } + } + + pub(crate) fn start( + idx: usize, + factories: Box<[Box>]>, + waker_queue: WakerQueue, + config: ServerWorkerConfig, + ) -> WorkerHandleAccept { + let (tx, rx) = unbounded_channel(); + + let counter = Counter::new(config.max_concurrent_connections); + + let counter_clone = counter.clone(); + // every worker runs in it's own arbiter. + // use a custom tokio runtime builder to change the settings of runtime. + Arbiter::with_tokio_rt(move || { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .max_blocking_threads(config.max_blocking_threads) + .build() + .unwrap() + }) + .spawn(async move { + let fut = factories + .iter() + .enumerate() + .map(|(idx, factory)| { + let fut = factory.create(); + async move { fut.await.map(|(t, s)| (idx, t, s)) } + }) + .collect::>(); + + // a second spawn to run !Send future tasks. + spawn(async move { + let res = join_all(fut) + .await + .into_iter() + .collect::, _>>(); + let services = match res { + Ok(res) => res + .into_iter() + .fold(Vec::new(), |mut services, (factory, token, service)| { + assert_eq!(token, services.len()); + services.push(WorkerService { + factory, + service, + status: WorkerServiceStatus::Unavailable, + }); + services + }) + .into_boxed_slice(), + Err(e) => { + error!("Can not start worker: {:?}", e); + Arbiter::current().stop(); + return; + } + }; + + let counter = WorkerCounter::new(idx, waker_queue, counter_clone); + + // a third spawn to make sure ServerWorker runs as non boxed future. + spawn(Worker::new( + rx, + counter, + services, + factories, + config.shutdown_timeout, + )); + }); + }); + + WorkerHandleAccept { idx, tx, counter } + } +} + +struct WorkerInner { + // UnboundedReceiver should always be the first field. + // It must be dropped as soon as ServerWorker dropping. + rx: UnboundedReceiver>, + counter: WorkerCounter, + services: Box<[WorkerService]>, + factories: Box<[Box>]>, + shutdown_timeout: Duration, +} + +impl WorkerInner { + /// `Conn` message and worker/service state switch handler + fn poll_running(&mut self, running: &mut Running, cx: &mut Context<'_>) -> Poll { + match *running { + // Actively poll Conn channel and handle MioStream. 
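// Reviewer note (illustration only): `send` above hands the `Conn` back to the
// caller when the worker's channel is gone, so `Accept` can retry on another
// worker. A minimal standalone sketch of that "give the message back on
// failure" shape; the types here are stand-ins, not the crate's handles.

use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};

#[derive(Debug, PartialEq)]
struct DemoConn {
    token: usize,
}

#[allow(dead_code)]
enum DemoMessage {
    Conn(DemoConn),
    Stop,
}

fn send_conn(tx: &UnboundedSender<DemoMessage>, conn: DemoConn) -> Result<(), DemoConn> {
    tx.send(DemoMessage::Conn(conn)).map_err(|err| match err.0 {
        DemoMessage::Conn(conn) => conn,
        // only `Conn` is ever sent through this helper
        DemoMessage::Stop => unreachable!(),
    })
}

fn main() {
    let (tx, rx) = unbounded_channel();

    // worker gone: dropping the receiver closes the channel
    drop(rx);

    // the connection comes back intact and can be re-dispatched elsewhere
    let returned = send_conn(&tx, DemoConn { token: 7 }).unwrap_err();
    assert_eq!(returned, DemoConn { token: 7 });
}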
+ Running::Available => loop { + match self.poll_ready(cx) { + Ok(true) => match ready!(Pin::new(&mut self.rx).poll_recv(cx)) { + Some(WorkerMessage::Conn(conn)) => { + let guard = self.counter.guard(); + let _ = self.services[conn.token].service.call((guard, conn.io)); + } + Some(WorkerMessage::Stop(stop)) => return Poll::Ready(stop), + None => return Poll::Ready(Stop::Force), + }, + Ok(false) => { + trace!("Worker is unavailable"); + return Poll::Pending; + } + Err((token, idx)) => { + let restart = self.restart_service(token, idx); + *running = Running::Restart(restart); + + return self.poll_running(running, cx); + } + } + }, + Running::Restart(Restart { + factory_id, + token, + ref mut fut, + }) => { + let name = self.factories[factory_id].name(token); + + let (token_new, service) = ready!(fut.as_mut().poll(cx)).unwrap_or_else(|_| { + // Restart failure would result in a panic drop of ServerWorker. + // This would prevent busy loop of poll_running. + panic!("Can not restart {:?} service", name) + }); + + assert_eq!(token, token_new); + + trace!("Service {:?} has been restarted", name); + + self.services[token].created(service); + *running = Running::Available; + + self.poll_running(running, cx) + } + } + } + + /// Delay shutdown and drain all unhandled `Conn`. + fn poll_shutdown( + &mut self, + delay: &mut DelayShutdown, + running: &mut Running, + cx: &mut Context<'_>, + ) -> Poll { + let num = self.counter.total(); + if num == 0 { + // Graceful shutdown. + info!("Graceful worker shutdown, 0 connections unhandled"); + Poll::Ready(true) + } else if delay.start_from.elapsed() >= self.shutdown_timeout { + // Timeout forceful shutdown. + info!( + "Graceful worker shutdown timeout, {} connections unhandled", + num + ); + Poll::Ready(false) + } else { + // Poll Running state and try to drain all `Conn` from channel. + let _ = self.poll_running(running, cx); + + // Wait for 1 second. + ready!(delay.timer.as_mut().poll(cx)); + + // Reset timer and try again. + let time = Instant::now() + Duration::from_secs(1); + delay.timer.as_mut().reset(time); + + delay.timer.as_mut().poll(cx).map(|_| false) + } + } + + /// Check readiness of services. + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Result { + let mut ready = true; + for (idx, srv) in self.services.iter_mut().enumerate() { + if srv.status == WorkerServiceStatus::Available + || srv.status == WorkerServiceStatus::Unavailable + { + match srv.service.poll_ready(cx) { + Poll::Ready(Ok(_)) => { + if srv.status == WorkerServiceStatus::Unavailable { + trace!( + "Service {:?} is available", + self.factories[srv.factory].name(idx) + ); + srv.status = WorkerServiceStatus::Available; + } + } + Poll::Pending => { + ready = false; + + if srv.status == WorkerServiceStatus::Available { + trace!( + "Service {:?} is unavailable", + self.factories[srv.factory].name(idx) + ); + srv.status = WorkerServiceStatus::Unavailable; + } + } + Poll::Ready(Err(_)) => { + error!( + "Service {:?} readiness check returned error, restarting", + self.factories[srv.factory].name(idx) + ); + srv.status = WorkerServiceStatus::Failed; + return Err((idx, srv.factory)); + } + } + } + } + + Ok(ready) + } + + /// `Stop` message handler. + /// + /// Return Some when worker should enter delay shutdown. + /// Return None when worker is ready to shutdown in place. 
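// Reviewer note (illustration only): `conn.token` above indexes straight into
// the boxed service slice, so each accepted IO lands on the service built for
// its listener. A tiny standalone sketch of that token routing; closures stand
// in for `BoxedServerService` and the addresses are made up.

type DemoService = Box<dyn Fn(&'static str) -> String>;

struct DemoConnMsg {
    token: usize,
    io: &'static str,
}

fn main() {
    // one service per bound listener, in token order
    let services: Box<[DemoService]> = vec![
        Box::new(|io| format!("http service handles {}", io)) as DemoService,
        Box::new(|io| format!("admin service handles {}", io)) as DemoService,
    ]
    .into_boxed_slice();

    let conns = vec![
        DemoConnMsg { token: 1, io: "127.0.0.1:52001" },
        DemoConnMsg { token: 0, io: "127.0.0.1:52002" },
    ];

    for conn in conns {
        // same shape as `self.services[conn.token].service.call((guard, conn.io))`
        println!("{}", (services[conn.token])(conn.io));
    }
}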
+ fn try_shutdown(&mut self, stop: Stop) -> Option { + let num = self.counter.total(); + + match stop { + Stop::Graceful(tx) => { + self.shutdown_service(false); + + let shutdown = DelayShutdown { + timer: Box::pin(sleep(Duration::from_secs(1))), + start_from: Instant::now(), + tx, + }; + + Some(shutdown) + } + Stop::Force => { + info!("Force worker shutdown, {} connections unhandled", num); + + self.shutdown_service(true); + + None + } + } + } + + fn restart_service(&mut self, idx: usize, factory_id: usize) -> Restart { + let factory = &self.factories[factory_id]; + trace!("Service {:?} failed, restarting", factory.name(idx)); + self.services[idx].status = WorkerServiceStatus::Restarting; + Restart { + factory_id, + token: idx, + fut: factory.create(), + } + } + + fn shutdown_service(&mut self, force: bool) { + self.services + .iter_mut() + .filter(|srv| srv.status == WorkerServiceStatus::Available) + .for_each(|srv| { + srv.status = if force { + WorkerServiceStatus::Stopped + } else { + WorkerServiceStatus::Stopping + }; + }); + } +} + +struct WorkerService { + factory: usize, + status: WorkerServiceStatus, + service: BoxedServerService, +} + +impl WorkerService { + fn created(&mut self, service: BoxedServerService) { + self.service = service; + self.status = WorkerServiceStatus::Unavailable; + } +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +enum WorkerServiceStatus { + Available, + Unavailable, + Failed, + Restarting, + Stopping, + Stopped, +} + +enum WorkerState { + Running(Running), + DelayShutdown(DelayShutdown, Running), +} + +impl Default for WorkerState { + fn default() -> Self { + Self::Running(Running::Available) + } +} + +enum Running { + Available, + Restart(Restart), +} + +struct Restart { + factory_id: usize, + token: usize, + fut: LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>, +} + +// Keep states necessary for delayed server shutdown: +// Sleep for interval check the shutdown progress. +// Instant for the start time of shutdown. +// Sender for send back the shutdown outcome(force/grace) to `Stop` caller. +struct DelayShutdown { + timer: Pin>, + start_from: Instant, + tx: oneshot::Sender, +} + +impl Drop for Worker { + fn drop(&mut self) { + // Stop the Arbiter ServerWorker runs on on drop. + Arbiter::current().stop(); + } +} + +impl Future for Worker { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.as_mut().get_mut(); + + match this.state { + WorkerState::Running(ref mut running) => { + let stop = ready!(this.worker.poll_running(running, cx)); + match this.worker.try_shutdown(stop) { + Some(delay) => { + // Take running state and pass it to DelayShutdown. + // During shutdown there could be unhandled `Conn` message left in channel. + // They should be drained and worker would try to handle them all until + // delay shutdown timeout met. + this.state = match mem::take(&mut this.state) { + WorkerState::Running(running) => { + WorkerState::DelayShutdown(delay, running) + } + _ => unreachable!("ServerWorker enter DelayShutdown already"), + }; + + self.poll(cx) + } + None => Poll::Ready(()), + } + } + WorkerState::DelayShutdown(ref mut delay, ref mut running) => { + let is_graceful = ready!(this.worker.poll_shutdown(delay, running, cx)); + + // Report back shutdown outcome to caller. + if let WorkerState::DelayShutdown(delay, _) = mem::take(&mut this.state) { + let _ = delay.tx.send(is_graceful); + } + + Poll::Ready(()) + } + } + } +}
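// Reviewer note (illustration only): with this refactor a graceful stop hands
// the caller a `oneshot::Receiver<bool>` while a force stop returns `None`.
// A standalone sketch of how a caller could branch on that; the handle and
// message types here are stand-ins, not the crate's `WorkerHandleAccept`.

use tokio::sync::{mpsc, oneshot};

enum DemoStop {
    Graceful(oneshot::Sender<bool>),
    Force,
}

struct DemoHandle {
    tx: mpsc::UnboundedSender<DemoStop>,
}

impl DemoHandle {
    fn stop(&self, graceful: bool) -> Option<oneshot::Receiver<bool>> {
        let (stop, rx) = if graceful {
            let (tx, rx) = oneshot::channel();
            (DemoStop::Graceful(tx), Some(rx))
        } else {
            (DemoStop::Force, None)
        };
        let _ = self.tx.send(stop);
        rx
    }
}

#[actix_rt::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    let handle = DemoHandle { tx };

    // worker side: answer the graceful request once connections are drained
    actix_rt::spawn(async move {
        if let Some(DemoStop::Graceful(tx)) = rx.recv().await {
            let _ = tx.send(true); // everything drained in time
        }
    });

    match handle.stop(true) {
        Some(rx) => {
            let clean = rx.await.unwrap_or(false);
            println!("worker stopped, graceful = {}", clean);
        }
        None => println!("force stop requested, not waiting"),
    }
}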