diff --git a/actix-server/examples/tcp-echo.rs b/actix-server/examples/tcp-echo.rs index ad18a1ac..76918967 100644 --- a/actix-server/examples/tcp-echo.rs +++ b/actix-server/examples/tcp-echo.rs @@ -16,7 +16,7 @@ use std::sync::{ use std::{env, io}; use actix_rt::net::TcpStream; -use actix_server::Server; +use actix_server::ServerHandle; use actix_service::pipeline_factory; use bytes::BytesMut; use futures_util::future::ok; @@ -36,7 +36,7 @@ async fn main() -> io::Result<()> { // Bind socket address and start worker(s). By default, the server uses the number of available // logical CPU cores as the worker count. For this reason, the closure passed to bind needs // to return a service *factory*; so it can be created once per worker. - Server::build() + ServerHandle::build() .bind("echo", addr, move || { let count = Arc::clone(&count); let num2 = Arc::clone(&count); diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 3a7e72bf..6d76ef67 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -5,11 +5,11 @@ use log::{error, info}; use mio::{Interest, Poll, Token as MioToken}; use slab::Slab; -use crate::server_handle::Server; +use crate::server_handle::ServerHandle; use crate::socket::{MioListener, SocketAddr}; use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; -use crate::worker::{Conn, WorkerHandle}; -use crate::Token; +use crate::worker::{Conn, ServerWorker, WorkerAvailability, WorkerHandle}; +use crate::{ServerBuilder, Token}; const DUR_ON_ERR: Duration = Duration::from_millis(500); @@ -32,7 +32,7 @@ pub(crate) struct Accept { poll: Poll, waker_queue: WakerQueue, handles: Vec, - srv: Server, + srv: ServerHandle, next: usize, backpressure: bool, // poll time duration. @@ -54,40 +54,42 @@ fn connection_error(e: &io::Error) -> bool { } impl Accept { - pub(crate) fn start( + pub(crate) fn start( sockets: Vec<(Token, MioListener)>, - server_handle: Server, - worker_factory: F, - ) -> WakerQueue - where - F: FnOnce(&WakerQueue) -> Vec, - { + builder: &ServerBuilder, + ) -> io::Result<(WakerQueue, Vec<(usize, WorkerHandle)>)> { + let server_handle = ServerHandle::new(builder.cmd_tx.clone()); + // construct poll instance and it's waker - let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e)); - let waker_queue = WakerQueue::new(poll.registry()) - .unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e)); - let waker_clone = waker_queue.clone(); + let poll = Poll::new()?; + let waker_queue = WakerQueue::new(poll.registry())?; // construct workers and collect handles. - let handles = worker_factory(&waker_queue); + let (handles, handles_clone) = (0..builder.threads) + .map(|idx| { + // start workers + let availability = WorkerAvailability::new(waker_queue.clone()); + let factories = builder.services.iter().map(|v| v.clone_factory()).collect(); + let handle = + ServerWorker::start(idx, factories, availability, builder.worker_config); + let handle_clone = (idx, handle.clone()); + (handle, handle_clone) + }) + .unzip(); + + let wake_queue_clone = waker_queue.clone(); + + let (mut accept, sockets) = + Accept::new_with_sockets(poll, wake_queue_clone, sockets, handles, server_handle)?; // Accept runs in its own thread. thread::Builder::new() .name("actix-server acceptor".to_owned()) - .spawn(move || { - let (mut accept, sockets) = Accept::new_with_sockets( - poll, - waker_queue, - sockets, - handles, - server_handle, - ); - accept.poll_with(sockets); - }) + .spawn(move || accept.poll_with(sockets)) .unwrap(); - // return waker to server builder. - waker_clone + // return waker and worker handle clones to server builder. + Ok((waker_queue, handles_clone)) } fn new_with_sockets( @@ -95,8 +97,8 @@ impl Accept { waker_queue: WakerQueue, socks: Vec<(Token, MioListener)>, handles: Vec, - srv: Server, - ) -> (Accept, Slab) { + srv: ServerHandle, + ) -> io::Result<(Accept, Slab)> { let mut sockets = Slab::new(); for (hnd_token, mut lst) in socks.into_iter() { let addr = lst.local_addr(); @@ -106,8 +108,7 @@ impl Accept { // Start listening for incoming connections poll.registry() - .register(&mut lst, MioToken(token), Interest::READABLE) - .unwrap_or_else(|e| panic!("Can not register io: {}", e)); + .register(&mut lst, MioToken(token), Interest::READABLE)?; entry.insert(ServerSocketInfo { addr, @@ -127,7 +128,7 @@ impl Accept { timeout: None, }; - (accept, sockets) + Ok((accept, sockets)) } fn poll_with(&mut self, mut sockets: Slab) { diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index f9ba7fc9..2da34703 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -14,7 +14,7 @@ use tokio::sync::oneshot; use crate::accept::Accept; use crate::config::{ConfiguredService, ServiceConfig}; -use crate::server_handle::{Server, ServerCommand}; +use crate::server_handle::{ServerCommand, ServerHandle}; use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; use crate::signals::{Signal, Signals}; use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; @@ -25,16 +25,16 @@ use crate::Token; /// Server builder pub struct ServerBuilder { - threads: usize, + pub(super) threads: usize, token: Token, backlog: u32, - services: Vec>, + pub(super) services: Vec>, sockets: Vec<(Token, String, MioListener)>, exit: bool, no_signals: bool, - cmd_tx: UnboundedSender, + pub(super) cmd_tx: UnboundedSender, cmd_rx: UnboundedReceiver, - worker_config: ServerWorkerConfig, + pub(super) worker_config: ServerWorkerConfig, } impl Default for ServerBuilder { @@ -264,7 +264,7 @@ impl ServerBuilder { } /// Starts processing incoming connections and return server controller. - pub fn run(mut self) -> ServerFuture { + pub fn run(mut self) -> Server { if self.sockets.is_empty() { panic!("Server should have at least one bound socket"); } else { @@ -278,33 +278,10 @@ impl ServerBuilder { }) .collect(); - // collect worker handles on start. - let mut handles = Vec::new(); - - // start accept thread. return waker_queue for wake up it. - let waker_queue = Accept::start( - sockets, - Server::new(self.cmd_tx.clone()), - // closure for construct worker and return it's handler. - |waker| { - (0..self.threads) - .map(|idx| { - // start workers - let availability = WorkerAvailability::new(waker.clone()); - let factories = - self.services.iter().map(|v| v.clone_factory()).collect(); - let handle = ServerWorker::start( - idx, - factories, - availability, - self.worker_config, - ); - handles.push((idx, handle.clone())); - handle - }) - .collect() - }, - ); + // start accept thread. return waker_queue and worker handles. + let (waker_queue, handles) = Accept::start(sockets, &self) + // TODO: include error to Server type and poll return it in Future. + .unwrap_or_else(|e| panic!("Can not start Accept: {}", e)); // construct signals future. let signals = if !self.no_signals { @@ -313,7 +290,7 @@ impl ServerBuilder { None }; - ServerFuture { + Server { cmd_tx: self.cmd_tx, cmd_rx: self.cmd_rx, handles, @@ -331,7 +308,7 @@ impl ServerBuilder { /// When awaited or spawned would listen to signal and message from [ServerHandle](crate::server::ServerHandle). #[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct ServerFuture { +pub struct Server { cmd_tx: UnboundedSender, cmd_rx: UnboundedReceiver, handles: Vec<(usize, WorkerHandle)>, @@ -344,12 +321,12 @@ pub struct ServerFuture { waker_queue: WakerQueue, } -impl ServerFuture { +impl Server { /// Obtain a Handle for ServerFuture that can be used to change state of actix server. /// /// See [ServerHandle](crate::server::ServerHandle) for usage. - pub fn handle(&self) -> Server { - Server::new(self.cmd_tx.clone()) + pub fn handle(&self) -> ServerHandle { + ServerHandle::new(self.cmd_tx.clone()) } fn handle_cmd(&mut self, item: ServerCommand) -> Option> { @@ -493,7 +470,7 @@ impl ServerFuture { } } -impl Future for ServerFuture { +impl Future for Server { type Output = io::Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index 6be373bd..8a87b4b3 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -15,9 +15,9 @@ mod test_server; mod waker_queue; mod worker; -pub use self::builder::{ServerBuilder, ServerFuture}; +pub use self::builder::{Server, ServerBuilder}; pub use self::config::{ServiceConfig, ServiceRuntime}; -pub use self::server_handle::Server; +pub use self::server_handle::ServerHandle; pub use self::service::ServiceFactory; pub use self::test_server::TestServer; diff --git a/actix-server/src/server_handle.rs b/actix-server/src/server_handle.rs index 74228b3f..679dc1a1 100644 --- a/actix-server/src/server_handle.rs +++ b/actix-server/src/server_handle.rs @@ -25,14 +25,14 @@ pub(crate) enum ServerCommand { } #[derive(Debug)] -pub struct Server( +pub struct ServerHandle( UnboundedSender, Option>, ); -impl Server { +impl ServerHandle { pub(crate) fn new(tx: UnboundedSender) -> Self { - Server(tx, None) + ServerHandle(tx, None) } /// Start server building process @@ -80,13 +80,13 @@ impl Server { } } -impl Clone for Server { +impl Clone for ServerHandle { fn clone(&self) -> Self { Self(self.0.clone(), None) } } -impl Future for Server { +impl Future for ServerHandle { type Output = io::Result<()>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/actix-server/src/test_server.rs b/actix-server/src/test_server.rs index 35707293..683f4b97 100644 --- a/actix-server/src/test_server.rs +++ b/actix-server/src/test_server.rs @@ -3,7 +3,7 @@ use std::{net, thread}; use actix_rt::{net::TcpStream, System}; -use crate::{Server, ServerBuilder, ServiceFactory}; +use crate::{ServerBuilder, ServerHandle, ServiceFactory}; /// The `TestServer` type. /// @@ -49,7 +49,10 @@ impl TestServer { // run server in separate thread thread::spawn(move || { System::new().block_on(async { - let server = factory(Server::build()).workers(1).disable_signals().run(); + let server = factory(ServerHandle::build()) + .workers(1) + .disable_signals() + .run(); tx.send(System::current()).unwrap(); server.await @@ -76,7 +79,7 @@ impl TestServer { let local_addr = tcp.local_addr().unwrap(); sys.block_on(async { - let server = Server::build() + let server = ServerHandle::build() .listen("test", tcp, factory) .unwrap() .workers(1) diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index a0a5b27b..4a090ed5 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicUsize, Ordering::Relaxed}; use std::sync::{mpsc, Arc}; use std::{net, thread, time}; -use actix_server::Server; +use actix_server::ServerHandle; use actix_service::fn_service; use actix_utils::future::ok; use futures_util::future::lazy; @@ -24,7 +24,7 @@ fn test_bind() { let h = thread::spawn(move || { let system = actix_rt::System::new(); system.block_on(async { - let server = Server::build() + let server = ServerHandle::build() .workers(1) .disable_signals() .bind("test", addr, move || fn_service(|_| ok::<_, ()>(()))) @@ -55,7 +55,7 @@ fn test_listen() { let sys = actix_rt::System::new(); let lst = net::TcpListener::bind(addr).unwrap(); sys.block_on(async { - let server = Server::build() + let server = ServerHandle::build() .disable_signals() .workers(1) .listen("test", lst, move || fn_service(|_| ok::<_, ()>(()))) @@ -92,7 +92,7 @@ fn test_start() { let h = thread::spawn(move || { actix_rt::System::new().block_on(async { - let server = Server::build() + let server = ServerHandle::build() .backlog(100) .disable_signals() .bind("test", addr, move || { @@ -160,7 +160,7 @@ fn test_configure() { let h = thread::spawn(move || { let num = num2.clone(); actix_rt::System::new().block_on(async { - let server = Server::build() + let server = ServerHandle::build() .disable_signals() .configure(move |cfg| { let num = num.clone(); diff --git a/actix-tls/examples/tcp-rustls.rs b/actix-tls/examples/tcp-rustls.rs index d0c20428..52d483fb 100644 --- a/actix-tls/examples/tcp-rustls.rs +++ b/actix-tls/examples/tcp-rustls.rs @@ -30,7 +30,7 @@ use std::{ }; use actix_rt::net::TcpStream; -use actix_server::Server; +use actix_server::ServerHandle; use actix_service::pipeline_factory; use actix_tls::accept::rustls::{Acceptor as RustlsAcceptor, TlsStream}; use futures_util::future::ok; @@ -68,7 +68,7 @@ async fn main() -> io::Result<()> { let addr = ("127.0.0.1", 8443); info!("starting server on port: {}", &addr.0); - Server::build() + ServerHandle::build() .bind("tls-example", addr, move || { let count = Arc::clone(&count);