diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 46a0ad1d..a4c403f1 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -38,4 +38,4 @@ actix-rt = "2.0.0" bytes = "1" env_logger = "0.9" futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } -tokio = { version = "1.5.1", features = ["io-util"] } +tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] } diff --git a/actix-server/examples/tcp-echo.rs b/actix-server/examples/tcp-echo.rs index 3529fab8..01f58479 100644 --- a/actix-server/examples/tcp-echo.rs +++ b/actix-server/examples/tcp-echo.rs @@ -94,6 +94,7 @@ async fn main() -> io::Result<()> { Ok(()) } +// alternatively: // #[actix_rt::main] // async fn main() -> io::Result<()> { // run().await?; diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 1f2ecafa..3bca4305 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -1,21 +1,20 @@ -use std::time::Duration; -use std::{io, thread}; +use std::{io, thread, time::Duration}; use actix_rt::time::Instant; -use actix_rt::{time::sleep, System}; use log::{debug, error, info}; use mio::{Interest, Poll, Token as MioToken}; -use crate::worker::ServerWorker; use crate::{ availability::Availability, server::ServerHandle, socket::MioListener, waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}, - worker::{Conn, WorkerHandleAccept, WorkerHandleServer}, + worker::{Conn, ServerWorker, WorkerHandleAccept, WorkerHandleServer}, ServerBuilder, }; +const TIMEOUT_DURATION_ON_ERROR: Duration = Duration::from_millis(510); + struct ServerSocketInfo { token: usize, @@ -34,6 +33,8 @@ pub(crate) struct Accept { srv: ServerHandle, next: usize, avail: Availability, + /// use the smallest duration from sockets timeout. + timeout: Option, paused: bool, } @@ -115,6 +116,7 @@ impl Accept { srv: server_handle, next: 0, avail, + timeout: None, paused: false, }; @@ -149,6 +151,9 @@ impl Accept { } } } + + // check for timeout and re-register sockets + self.process_timeout(sockets); } } @@ -171,6 +176,7 @@ impl Accept { self.accept_all(sockets); } } + // a new worker thread is made and it's handle would be added to Accept Some(WakerInterest::Worker(handle)) => { drop(guard); @@ -182,12 +188,7 @@ impl Accept { self.accept_all(sockets); } } - // got timer interest and it's time to try register socket(s) again - Some(WakerInterest::Timer) => { - drop(guard); - self.process_timer(sockets) - } Some(WakerInterest::Pause) => { drop(guard); @@ -197,6 +198,7 @@ impl Accept { self.deregister_all(sockets); } } + Some(WakerInterest::Resume) => { drop(guard); @@ -210,6 +212,7 @@ impl Accept { self.accept_all(sockets); } } + Some(WakerInterest::Stop) => { if !self.paused { self.deregister_all(sockets); @@ -217,6 +220,7 @@ impl Accept { return true; } + // waker queue is drained None => { // Reset the WakerQueue before break so it does not grow infinitely @@ -228,26 +232,44 @@ impl Accept { } } - fn process_timer(&self, sockets: &mut [ServerSocketInfo]) { - let now = Instant::now(); + fn process_timeout(&mut self, sockets: &mut [ServerSocketInfo]) { + // always remove old timeouts + if self.timeout.take().is_some() { + let now = Instant::now(); - sockets - .iter_mut() - // Only sockets that had an associated timeout were deregistered. - .filter(|info| info.timeout.is_some()) - .for_each(|info| { - let inst = info.timeout.take().unwrap(); + sockets + .iter_mut() + // Only sockets that had an associated timeout were deregistered. + .filter(|info| info.timeout.is_some()) + .for_each(|info| { + let inst = info.timeout.take().unwrap(); - if now < inst { - info.timeout = Some(inst); - } else if !self.paused { - self.register_logged(info); + if now < inst { + // still timed out; try to set new timeout + info.timeout = Some(inst); + self.set_timeout(inst - now); + } else if !self.paused { + // timeout expired; register socket again + self.register_logged(info); + } + + // Drop the timeout if server is paused and socket timeout is expired. + // When server recovers from pause it will register all sockets without + // a timeout value so this socket register will be delayed till then. + }); + } + } + + /// Update accept timeout with `duration` if it is shorter than current timeout. + fn set_timeout(&mut self, duration: Duration) { + match self.timeout { + Some(ref mut timeout) => { + if *timeout > duration { + *timeout = duration; } - - // Drop the timeout if server is paused and socket timeout is expired. - // When server recovers from pause it will register all sockets without - // a timeout value so this socket register will be delayed till then. - }); + } + None => self.timeout = Some(duration), + } } #[cfg(not(target_os = "windows"))] @@ -387,26 +409,7 @@ impl Accept { // the poll would need it mark which socket and when it's // listener should be registered info.timeout = Some(Instant::now() + Duration::from_millis(500)); - - // after the sleep a Timer interest is sent to Accept Poll - let waker = self.waker_queue.clone(); - - match System::try_current() { - Some(sys) => { - sys.arbiter().spawn(async move { - sleep(Duration::from_millis(510)).await; - waker.wake(WakerInterest::Timer); - }); - } - - None => { - let rt = tokio::runtime::Handle::current(); - rt.spawn(async move { - sleep(Duration::from_millis(510)).await; - waker.wake(WakerInterest::Timer); - }); - } - } + self.set_timeout(TIMEOUT_DURATION_ON_ERROR); return; } diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 62ae57e4..34288d11 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -1,11 +1,11 @@ use std::{io, time::Duration}; use actix_rt::net::TcpStream; -use log::trace; +use log::{info, trace}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use crate::{ - server::{ServerCommand, ServerHandle}, + server::ServerCommand, service::{InternalServiceFactory, ServiceFactory, StreamNewService}, socket::{ MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs, @@ -14,18 +14,18 @@ use crate::{ Server, }; -/// Server builder +/// Server builder. pub struct ServerBuilder { - pub(super) threads: usize, - pub(super) token: usize, - pub(super) backlog: u32, - pub(super) factories: Vec>, - pub(super) sockets: Vec<(usize, String, MioListener)>, - pub(super) exit: bool, - pub(super) listen_os_signals: bool, - pub(super) cmd_tx: UnboundedSender, - pub(super) cmd_rx: UnboundedReceiver, - pub(super) worker_config: ServerWorkerConfig, + pub(crate) threads: usize, + pub(crate) token: usize, + pub(crate) backlog: u32, + pub(crate) factories: Vec>, + pub(crate) sockets: Vec<(usize, String, MioListener)>, + pub(crate) exit: bool, + pub(crate) listen_os_signals: bool, + pub(crate) cmd_tx: UnboundedSender, + pub(crate) cmd_rx: UnboundedReceiver, + pub(crate) worker_config: ServerWorkerConfig, } impl Default for ServerBuilder { @@ -37,8 +37,7 @@ impl Default for ServerBuilder { impl ServerBuilder { /// Create new Server builder instance pub fn new() -> ServerBuilder { - let (tx, rx) = unbounded_channel(); - let _server = ServerHandle::new(tx.clone()); + let (cmd_tx, cmd_rx) = unbounded_channel(); ServerBuilder { threads: num_cpus::get(), @@ -48,8 +47,8 @@ impl ServerBuilder { backlog: 2048, exit: false, listen_os_signals: true, - cmd_tx: tx, - cmd_rx: rx, + cmd_tx, + cmd_rx, worker_config: ServerWorkerConfig::default(), } } @@ -244,6 +243,7 @@ impl ServerBuilder { if self.sockets.is_empty() { panic!("Server should have at least one bound socket"); } else { + info!("Starting {} workers", self.threads); Server::new(self) } } diff --git a/actix-server/src/join_all.rs b/actix-server/src/join_all.rs new file mode 100644 index 00000000..aa47d72d --- /dev/null +++ b/actix-server/src/join_all.rs @@ -0,0 +1,76 @@ +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +// a poor man's join future. joined future is only used when starting/stopping the server. +// pin_project and pinned futures are overkill for this task. +pub(crate) struct JoinAll { + fut: Vec>, +} + +pub(crate) fn join_all(fut: Vec + 'static>) -> JoinAll { + let fut = fut + .into_iter() + .map(|f| JoinFuture::Future(Box::pin(f))) + .collect(); + + JoinAll { fut } +} + +enum JoinFuture { + Future(Pin>>), + Result(Option), +} + +impl Unpin for JoinAll {} + +impl Future for JoinAll { + type Output = Vec; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut ready = true; + + let this = self.get_mut(); + for fut in this.fut.iter_mut() { + if let JoinFuture::Future(f) = fut { + match f.as_mut().poll(cx) { + Poll::Ready(t) => { + *fut = JoinFuture::Result(Some(t)); + } + Poll::Pending => ready = false, + } + } + } + + if ready { + let mut res = Vec::new(); + for fut in this.fut.iter_mut() { + if let JoinFuture::Result(f) = fut { + res.push(f.take().unwrap()); + } + } + + Poll::Ready(res) + } else { + Poll::Pending + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + use actix_utils::future::ready; + + #[actix_rt::test] + async fn test_join_all() { + let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))]; + let mut res = join_all(futs).await.into_iter(); + assert_eq!(Ok(1), res.next().unwrap()); + assert_eq!(Err(3), res.next().unwrap()); + assert_eq!(Ok(9), res.next().unwrap()); + } +} diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index d0c6de3e..7862b810 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -7,6 +7,7 @@ mod accept; mod availability; mod builder; +mod join_all; mod server; mod service; mod signals; @@ -22,83 +23,3 @@ pub use self::test_server::TestServer; #[doc(hidden)] pub use self::socket::FromStream; - -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - -/// Start server building process -pub fn new() -> ServerBuilder { - ServerBuilder::default() -} - -// a poor man's join future. joined future is only used when starting/stopping the server. -// pin_project and pinned futures are overkill for this task. -pub(crate) struct JoinAll { - fut: Vec>, -} - -pub(crate) fn join_all(fut: Vec + 'static>) -> JoinAll { - let fut = fut - .into_iter() - .map(|f| JoinFuture::Future(Box::pin(f))) - .collect(); - - JoinAll { fut } -} - -enum JoinFuture { - Future(Pin>>), - Result(Option), -} - -impl Unpin for JoinAll {} - -impl Future for JoinAll { - type Output = Vec; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut ready = true; - - let this = self.get_mut(); - for fut in this.fut.iter_mut() { - if let JoinFuture::Future(f) = fut { - match f.as_mut().poll(cx) { - Poll::Ready(t) => { - *fut = JoinFuture::Result(Some(t)); - } - Poll::Pending => ready = false, - } - } - } - - if ready { - let mut res = Vec::new(); - for fut in this.fut.iter_mut() { - if let JoinFuture::Result(f) = fut { - res.push(f.take().unwrap()); - } - } - - Poll::Ready(res) - } else { - Poll::Pending - } - } -} - -#[cfg(test)] -mod test { - use super::*; - - use actix_utils::future::ready; - - #[actix_rt::test] - async fn test_join_all() { - let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))]; - let mut res = join_all(futs).await.into_iter(); - assert_eq!(Ok(1), res.next().unwrap()); - assert_eq!(Err(3), res.next().unwrap()); - assert_eq!(Ok(9), res.next().unwrap()); - } -} diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index 633bf1c0..3b37248e 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -1,32 +1,46 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::Duration; -use std::{io, mem}; +use std::{ + future::Future, + io, mem, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; -use actix_rt::time::sleep; -use actix_rt::System; +use actix_rt::{time::sleep, System}; use futures_core::future::LocalBoxFuture; use log::{error, info, trace}; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; -use tokio::sync::oneshot; +use tokio::sync::{ + mpsc::{UnboundedReceiver, UnboundedSender}, + oneshot, +}; -use crate::accept::Accept; -use crate::builder::ServerBuilder; -use crate::join_all; -use crate::service::InternalServiceFactory; -use crate::signals::{Signal, Signals}; -use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer}; +use crate::{ + accept::Accept, + builder::ServerBuilder, + join_all::join_all, + service::InternalServiceFactory, + signals::{Signal, Signals}, + waker_queue::{WakerInterest, WakerQueue}, + worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer}, +}; #[derive(Debug)] pub(crate) enum ServerCommand { + /// TODO WorkerFaulted(usize), + + /// Contains return channel to notify caller of successful state change. Pause(oneshot::Sender<()>), + + /// Contains return channel to notify caller of successful state change. Resume(oneshot::Sender<()>), + + /// TODO Stop { /// True if shut down should be graceful. graceful: bool, + + /// Return channel to notify caller that shutdown is complete. completion: Option>, }, } @@ -105,14 +119,17 @@ impl Server { } } + /// Get a handle for ServerFuture that can be used to change state of actix server. + /// + /// See [ServerHandle](ServerHandle) for usage. pub fn handle(&self) -> ServerHandle { match self { Server::Server(inner) => ServerHandle::new(inner.cmd_tx.clone()), Server::Error(err) => { // TODO: i don't think this is the best way to handle server startup fail panic!( - "server handle can not be obtained because server failed to start up: {:?}", - err + "server handle can not be obtained because server failed to start up: {}", + err.as_ref().unwrap() ); } } @@ -138,10 +155,16 @@ impl Future for Server { } } - // eager drain command channel and handle command + // handle stop tasks and eager drain command channel loop { + if let Some(ref mut fut) = inner.stop_task { + // only resolve stop task and exit + return fut.as_mut().poll(cx).map(|_| Ok(())); + } + match Pin::new(&mut inner.cmd_rx).poll_recv(cx) { Poll::Ready(Some(cmd)) => { + // if stop task is required, set it and loop inner.stop_task = inner.handle_cmd(cmd); } _ => return Poll::Pending, @@ -167,10 +190,9 @@ impl ServerHandle { let _ = self.tx_cmd.send(ServerCommand::WorkerFaulted(idx)); } - /// Pause accepting incoming connections + /// Pause accepting incoming connections. /// - /// If socket contains some pending connection, they might be dropped. - /// All opened connection remains active. + /// May drop socket pending connection. All open connections remain active. pub fn pause(&self) -> impl Future { let (tx, rx) = oneshot::channel(); let _ = self.tx_cmd.send(ServerCommand::Pause(tx)); @@ -179,7 +201,7 @@ impl ServerHandle { } } - /// Resume accepting incoming connections + /// Resume accepting incoming connections. pub fn resume(&self) -> impl Future { let (tx, rx) = oneshot::channel(); let _ = self.tx_cmd.send(ServerCommand::Resume(tx)); @@ -189,8 +211,6 @@ impl ServerHandle { } /// Stop incoming connection processing, stop all workers and exit. - /// - /// If server starts with `spawn()` method, then spawned thread get terminated. pub fn stop(&self, graceful: bool) -> impl Future { let (tx, rx) = oneshot::channel(); let _ = self.tx_cmd.send(ServerCommand::Stop { @@ -264,7 +284,7 @@ impl ServerInner { } ServerCommand::WorkerFaulted(idx) => { - // TODO: maybe just return if not found ? + // TODO: maybe just return with warning log if not found ? assert!(self.worker_handles.iter().any(|wrk| wrk.idx == idx)); error!("Worker {} has died; restarting", idx); @@ -290,7 +310,8 @@ impl ServerInner { self.waker_queue.wake(WakerInterest::Worker(handle_accept)); } - Err(_) => todo!(), + + Err(err) => error!("can not restart worker {}: {}", idx, err), }; None diff --git a/actix-server/src/test_server.rs b/actix-server/src/test_server.rs index ad6ee8ee..7cf0d0a6 100644 --- a/actix-server/src/test_server.rs +++ b/actix-server/src/test_server.rs @@ -1,9 +1,9 @@ use std::sync::mpsc; -use std::{net, thread}; +use std::{io, net, thread}; use actix_rt::{net::TcpStream, System}; -use crate::{Server, ServerBuilder, ServiceFactory}; +use crate::{Server, ServerBuilder, ServerHandle, ServiceFactory}; /// A testing server. /// @@ -34,7 +34,8 @@ pub struct TestServerRuntime { addr: net::SocketAddr, host: String, port: u16, - system: System, + server_handle: ServerHandle, + thread_handle: Option>>, } impl TestServer { @@ -46,20 +47,22 @@ impl TestServer { let (tx, rx) = mpsc::channel(); // run server in separate thread - thread::spawn(move || { - let sys = System::new(); - factory(Server::build()).workers(1).disable_signals().run(); - - tx.send(System::current()).unwrap(); - sys.run() + let thread_handle = thread::spawn(move || { + System::new().block_on(async { + let server = factory(Server::build()).workers(1).disable_signals().run(); + tx.send(server.handle()).unwrap(); + server.await + }) }); - let system = rx.recv().unwrap(); + + let server_handle = rx.recv().unwrap(); TestServerRuntime { - system, addr: "127.0.0.1:0".parse().unwrap(), host: "127.0.0.1".to_string(), port: 0, + server_handle, + thread_handle: Some(thread_handle), } } @@ -68,24 +71,25 @@ impl TestServer { let (tx, rx) = mpsc::channel(); // run server in separate thread - thread::spawn(move || { + let thread_handle = thread::spawn(move || { let sys = System::new(); let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let local_addr = tcp.local_addr().unwrap(); sys.block_on(async { - Server::build() + let server = Server::build() .listen("test", tcp, factory) .unwrap() .workers(1) .disable_signals() .run(); - tx.send((System::current(), local_addr)).unwrap(); - }); - sys.run() + + tx.send((server.handle(), local_addr)).unwrap(); + server.await + }) }); - let (system, addr) = rx.recv().unwrap(); + let (server_handle, addr) = rx.recv().unwrap(); let host = format!("{}", addr.ip()); let port = addr.port(); @@ -94,7 +98,8 @@ impl TestServer { addr, host, port, - system, + server_handle, + thread_handle: Some(thread_handle), } } @@ -127,7 +132,8 @@ impl TestServerRuntime { /// Stop server. fn stop(&mut self) { - self.system.stop(); + let _ = self.server_handle.stop(false); + self.thread_handle.take().unwrap().join().unwrap().unwrap(); } /// Connect to server, returning a Tokio `TcpStream`. diff --git a/actix-server/src/waker_queue.rs b/actix-server/src/waker_queue.rs index 3f8669d4..a7280901 100644 --- a/actix-server/src/waker_queue.rs +++ b/actix-server/src/waker_queue.rs @@ -78,12 +78,7 @@ pub(crate) enum WakerInterest { Pause, Resume, Stop, - /// `Timer` is an interest sent as a delayed future. When an error happens on accepting - /// connection `Accept` would deregister socket listener temporary and wake up the poll and - /// register them again after the delayed future resolve. - Timer, - /// `Worker` is an interest happen after a worker runs into faulted state(This is determined - /// by if work can be sent to it successfully).`Accept` would be waked up and add the new - /// `WorkerHandleAccept`. + /// `Worker` is an interest that is triggered after a worker faults. This is determined by + /// trying to send work to it. `Accept` would be waked up and add the new `WorkerHandleAccept`. Worker(WorkerHandleAccept), } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 24f79e60..1c9d6135 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -24,7 +24,7 @@ use tokio::sync::{ }; use crate::{ - join_all, + join_all::join_all, service::{BoxedServerService, InternalServiceFactory}, socket::MioStream, waker_queue::{WakerInterest, WakerQueue}, @@ -224,7 +224,7 @@ impl WorkerService { } } -#[derive(Copy, Clone, Debug, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] enum WorkerServiceStatus { Available, Unavailable, @@ -235,7 +235,7 @@ enum WorkerServiceStatus { } /// Config for worker behavior passed down from server builder. -#[derive(Copy, Clone)] +#[derive(Debug, Clone, Copy)] pub(crate) struct ServerWorkerConfig { shutdown_timeout: Duration, max_blocking_threads: usize,