From 161d1ee94b5dca1e2a9cfc99d5dcd4bdcfb31072 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sun, 7 Nov 2021 21:00:19 +0800 Subject: [PATCH 1/9] fix accept timeout and worker graceful shutdown (#412) --- actix-server/src/accept.rs | 2 +- actix-server/src/worker.rs | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index bdeb6004..cd37460b 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -127,7 +127,7 @@ impl Accept { let mut events = mio::Events::with_capacity(256); loop { - if let Err(e) = self.poll.poll(&mut events, None) { + if let Err(e) = self.poll.poll(&mut events, self.timeout) { match e.kind() { io::ErrorKind::Interrupted => {} _ => panic!("Poll error: {}", e), diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index af14ab4b..2d104b8d 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -552,6 +552,14 @@ impl Future for ServerWorker { self.poll(cx) } WorkerState::Shutdown(ref mut shutdown) => { + // drop all pending connections in rx channel. + while let Poll::Ready(Some(conn)) = Pin::new(&mut this.rx).poll_recv(cx) { + // WorkerCounterGuard is needed as Accept thread has incremented counter. + // It's guard's job to decrement the counter together with drop of Conn. + let guard = this.counter.guard(); + drop((conn, guard)); + } + // wait for 1 second ready!(shutdown.timer.as_mut().poll(cx)); From 3f49d8ab546789da46f1c8d12b8706e6b93d4262 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sun, 7 Nov 2021 22:18:23 +0800 Subject: [PATCH 2/9] remove usage of mio::net::TcpSocket (#413) --- actix-server/Cargo.toml | 1 + actix-server/src/builder.rs | 16 +++------------- actix-server/src/socket.rs | 25 +++++++++++++++++++------ actix-server/tests/test_server.rs | 13 ++++++++----- 4 files changed, 31 insertions(+), 24 deletions(-) diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index d58aaa39..b6665b29 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -29,6 +29,7 @@ futures-core = { version = "0.3.7", default-features = false, features = ["alloc log = "0.4" mio = { version = "0.7.6", features = ["os-poll", "net"] } num_cpus = "1.13" +socket2 = "0.4.2" tokio = { version = "1.5.1", features = ["sync"] } [dev-dependencies] diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 0d4abe78..dbf00303 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -8,7 +8,8 @@ use crate::{ server::ServerCommand, service::{InternalServiceFactory, ServiceFactory, StreamNewService}, socket::{ - MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs, + create_mio_tcp_listener, MioListener, MioTcpListener, StdSocketAddr, StdTcpListener, + ToSocketAddrs, }, worker::ServerWorkerConfig, Server, @@ -263,7 +264,7 @@ pub(super) fn bind_addr( let mut success = false; let mut sockets = Vec::new(); for addr in addr.to_socket_addrs()? { - match create_tcp_listener(addr, backlog) { + match create_mio_tcp_listener(addr, backlog) { Ok(lst) => { success = true; sockets.push(lst); @@ -283,14 +284,3 @@ pub(super) fn bind_addr( )) } } - -fn create_tcp_listener(addr: StdSocketAddr, backlog: u32) -> io::Result { - let socket = match addr { - StdSocketAddr::V4(_) => MioTcpSocket::new_v4()?, - StdSocketAddr::V6(_) => MioTcpSocket::new_v6()?, - }; - - socket.set_reuseaddr(true)?; - socket.bind(addr)?; - socket.listen(backlog) -} diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index cd7ccc1a..6f641d73 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -2,7 +2,7 @@ pub(crate) use std::net::{ SocketAddr as StdSocketAddr, TcpListener as StdTcpListener, ToSocketAddrs, }; -pub(crate) use mio::net::{TcpListener as MioTcpListener, TcpSocket as MioTcpSocket}; +pub(crate) use mio::net::TcpListener as MioTcpListener; #[cfg(unix)] pub(crate) use { mio::net::UnixListener as MioUnixListener, @@ -223,6 +223,22 @@ mod unix_impl { } } +pub(crate) fn create_mio_tcp_listener( + addr: StdSocketAddr, + backlog: u32, +) -> io::Result { + use socket2::{Domain, Protocol, Socket, Type}; + + let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))?; + + socket.set_reuse_address(true)?; + socket.set_nonblocking(true)?; + socket.bind(&addr.into())?; + socket.listen(backlog as i32)?; + + Ok(MioTcpListener::from_std(StdTcpListener::from(socket))) +} + #[cfg(test)] mod tests { use super::*; @@ -234,11 +250,8 @@ mod tests { assert_eq!(format!("{}", addr), "127.0.0.1:8080"); let addr: StdSocketAddr = "127.0.0.1:0".parse().unwrap(); - let socket = MioTcpSocket::new_v4().unwrap(); - socket.set_reuseaddr(true).unwrap(); - socket.bind(addr).unwrap(); - let tcp = socket.listen(128).unwrap(); - let lst = MioListener::Tcp(tcp); + let lst = create_mio_tcp_listener(addr, 128).unwrap(); + let lst = MioListener::Tcp(lst); assert!(format!("{:?}", lst).contains("TcpListener")); assert!(format!("{}", lst).contains("127.0.0.1")); } diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 0506586e..0a8cd2ae 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -5,14 +5,17 @@ use std::{net, thread, time::Duration}; use actix_rt::{net::TcpStream, time::sleep}; use actix_server::Server; use actix_service::fn_service; +use socket2::{Domain, Protocol, Socket, Type}; fn unused_addr() -> net::SocketAddr { let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap(); - let socket = mio::net::TcpSocket::new_v4().unwrap(); - socket.bind(addr).unwrap(); - socket.set_reuseaddr(true).unwrap(); - let tcp = socket.listen(32).unwrap(); - tcp.local_addr().unwrap() + let socket = + Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP)).unwrap(); + socket.set_reuse_address(true).unwrap(); + socket.set_nonblocking(true).unwrap(); + socket.bind(&addr.into()).unwrap(); + socket.listen(32).unwrap(); + net::TcpListener::from(socket).local_addr().unwrap() } #[test] From 365892901052485b01c205748caccddcf34b0d9f Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sun, 7 Nov 2021 23:43:59 +0800 Subject: [PATCH 3/9] fix io-uring feature for actix-server (#414) Co-authored-by: Rob Ede --- .github/workflows/ci.yml | 13 +++---------- actix-server/Cargo.toml | 5 ++++- actix-server/src/worker.rs | 40 ++++++++++++++++++++++---------------- 3 files changed, 30 insertions(+), 28 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a1353f60..0533b8af 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -196,13 +196,6 @@ jobs: - name: Cache Dependencies uses: Swatinem/rust-cache@v1.3.0 - - name: Install cargo-hack - uses: actions-rs/cargo@v1 - with: - command: install - args: cargo-hack - - - name: doc tests - uses: actions-rs/cargo@v1 - timeout-minutes: 40 - with: { command: ci-doctest } + - name: doc tests io-uring + run: | + sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=nightly cargo ci-doctest" diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index b6665b29..5e700ae0 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -18,7 +18,7 @@ path = "src/lib.rs" [features] default = [] -io-uring = ["actix-rt/io-uring"] +io-uring = ["tokio-uring"] [dependencies] actix-rt = { version = "2.4.0", default-features = false } @@ -32,6 +32,9 @@ num_cpus = "1.13" socket2 = "0.4.2" tokio = { version = "1.5.1", features = ["sync"] } +# runtime for io-uring feature +tokio-uring = { version = "0.1", optional = true } + [dev-dependencies] actix-codec = "0.4.0" actix-rt = "2.0.0" diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 2d104b8d..02f68294 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -283,15 +283,6 @@ impl ServerWorker { let counter = Counter::new(config.max_concurrent_connections); let counter_clone = counter.clone(); - // every worker runs in it's own arbiter. - // use a custom tokio runtime builder to change the settings of runtime. - #[cfg(all(target_os = "linux", feature = "io-uring"))] - let arbiter = { - // TODO: pass max blocking thread config when tokio-uring enable configuration - // on building runtime. - let _ = config.max_blocking_threads; - Arbiter::new() - }; // get actix system context if it is set let sys = System::try_current(); @@ -299,6 +290,8 @@ impl ServerWorker { // service factories initialization channel let (factory_tx, factory_rx) = std::sync::mpsc::sync_channel(1); + // every worker runs in it's own thread and tokio runtime. + // use a custom tokio runtime builder to change the settings of runtime. std::thread::Builder::new() .name(format!("actix-server worker {}", idx)) .spawn(move || { @@ -307,13 +300,7 @@ impl ServerWorker { System::set_current(sys); } - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .max_blocking_threads(config.max_blocking_threads) - .build() - .unwrap(); - - rt.block_on(tokio::task::LocalSet::new().run_until(async move { + let worker_fut = async move { let fut = factories .iter() .enumerate() @@ -368,7 +355,26 @@ impl ServerWorker { }) .await .expect("task 2 panic"); - })) + }; + + #[cfg(all(target_os = "linux", feature = "io-uring"))] + { + // TODO: pass max blocking thread config when tokio-uring enable configuration + // on building runtime. + let _ = config.max_blocking_threads; + tokio_uring::start(worker_fut) + } + + #[cfg(not(all(target_os = "linux", feature = "io-uring")))] + { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .max_blocking_threads(config.max_blocking_threads) + .build() + .unwrap(); + + rt.block_on(tokio::task::LocalSet::new().run_until(worker_fut)) + } }) .expect("worker thread error/panic"); From ed987eef06176262c2a9cfdfb6f6a50b23b923bb Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Fri, 5 Nov 2021 02:18:20 +0000 Subject: [PATCH 4/9] prepare actix-server release 2.0.0-beta.8 --- actix-server/CHANGES.md | 3 +++ actix-server/Cargo.toml | 2 +- actix-tls/Cargo.toml | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 098c37fb..50dd65e9 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -1,6 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx + + +## 2.0.0-beta.8 - 2021-11-05 * Fix non-unix signal handler. [#410] [#410]: https://github.com/actix/actix-net/pull/410 diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 5e700ae0..91096cd6 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-server" -version = "2.0.0-beta.7" +version = "2.0.0-beta.8" authors = [ "Nikolay Kim ", "fakeshadow <24548779@qq.com>", diff --git a/actix-tls/Cargo.toml b/actix-tls/Cargo.toml index 91b323b8..b7903fc1 100755 --- a/actix-tls/Cargo.toml +++ b/actix-tls/Cargo.toml @@ -62,7 +62,7 @@ tokio-native-tls = { version = "0.3", optional = true } [dev-dependencies] actix-rt = "2.2.0" -actix-server = "2.0.0-beta.7" +actix-server = "2.0.0-beta.8" bytes = "1" env_logger = "0.9" futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } From 38caa8f088a9ce3a21cdc6e2b68e9d3aa9e06cf0 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Sun, 14 Nov 2021 19:45:15 +0000 Subject: [PATCH 5/9] Fix server arbiter support (#417) --- actix-server/Cargo.toml | 3 +- actix-server/src/join_all.rs | 68 +------- actix-server/src/server.rs | 8 +- actix-server/src/test_server.rs | 13 ++ actix-server/src/worker.rs | 280 ++++++++++++++++++++---------- actix-server/tests/test_server.rs | 58 +++++-- actix-service/src/macros.rs | 2 +- actix-utils/Cargo.toml | 1 + actix-utils/src/future/ready.rs | 6 + 9 files changed, 266 insertions(+), 173 deletions(-) diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 91096cd6..66e77c2f 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -18,7 +18,7 @@ path = "src/lib.rs" [features] default = [] -io-uring = ["tokio-uring"] +io-uring = ["tokio-uring", "actix-rt/io-uring"] [dependencies] actix-rt = { version = "2.4.0", default-features = false } @@ -26,6 +26,7 @@ actix-service = "2.0.0" actix-utils = "3.0.0" futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } +futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] } log = "0.4" mio = { version = "0.7.6", features = ["os-poll", "net"] } num_cpus = "1.13" diff --git a/actix-server/src/join_all.rs b/actix-server/src/join_all.rs index ae68871c..bdef62ef 100644 --- a/actix-server/src/join_all.rs +++ b/actix-server/src/join_all.rs @@ -4,7 +4,7 @@ use std::{ task::{Context, Poll}, }; -use futures_core::future::{BoxFuture, LocalBoxFuture}; +use futures_core::future::BoxFuture; // a poor man's join future. joined future is only used when starting/stopping the server. // pin_project and pinned futures are overkill for this task. @@ -61,63 +61,6 @@ impl Future for JoinAll { } } -pub(crate) fn join_all_local( - fut: Vec + 'static>, -) -> JoinAllLocal { - let fut = fut - .into_iter() - .map(|f| JoinLocalFuture::LocalFuture(Box::pin(f))) - .collect(); - - JoinAllLocal { fut } -} - -// a poor man's join future. joined future is only used when starting/stopping the server. -// pin_project and pinned futures are overkill for this task. -pub(crate) struct JoinAllLocal { - fut: Vec>, -} - -enum JoinLocalFuture { - LocalFuture(LocalBoxFuture<'static, T>), - Result(Option), -} - -impl Unpin for JoinAllLocal {} - -impl Future for JoinAllLocal { - type Output = Vec; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut ready = true; - - let this = self.get_mut(); - for fut in this.fut.iter_mut() { - if let JoinLocalFuture::LocalFuture(f) = fut { - match f.as_mut().poll(cx) { - Poll::Ready(t) => { - *fut = JoinLocalFuture::Result(Some(t)); - } - Poll::Pending => ready = false, - } - } - } - - if ready { - let mut res = Vec::new(); - for fut in this.fut.iter_mut() { - if let JoinLocalFuture::Result(f) = fut { - res.push(f.take().unwrap()); - } - } - - Poll::Ready(res) - } else { - Poll::Pending - } - } -} - #[cfg(test)] mod test { use super::*; @@ -132,13 +75,4 @@ mod test { assert_eq!(Err(3), res.next().unwrap()); assert_eq!(Ok(9), res.next().unwrap()); } - - #[actix_rt::test] - async fn test_join_all_local() { - let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))]; - let mut res = join_all_local(futs).await.into_iter(); - assert_eq!(Ok(1), res.next().unwrap()); - assert_eq!(Err(3), res.next().unwrap()); - assert_eq!(Ok(9), res.next().unwrap()); - } } diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index f1edcb23..9611062a 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -132,12 +132,12 @@ impl Server { .collect(); // Give log information on what runtime will be used. - let is_tokio = tokio::runtime::Handle::try_current().is_ok(); let is_actix = actix_rt::System::try_current().is_some(); + let is_tokio = tokio::runtime::Handle::try_current().is_ok(); - match (is_tokio, is_actix) { - (true, false) => info!("Tokio runtime found. Starting in existing Tokio runtime"), - (_, true) => info!("Actix runtime found. Starting in Actix runtime"), + match (is_actix, is_tokio) { + (false, true) => info!("Tokio runtime found. Starting in existing Tokio runtime"), + (true, _) => info!("Actix runtime found. Starting in Actix runtime"), (_, _) => info!( "Actix/Tokio runtime not found. Starting in newt Tokio current-thread runtime" ), diff --git a/actix-server/src/test_server.rs b/actix-server/src/test_server.rs index 7cf0d0a6..a7914372 100644 --- a/actix-server/src/test_server.rs +++ b/actix-server/src/test_server.rs @@ -147,3 +147,16 @@ impl Drop for TestServerRuntime { self.stop() } } + +#[cfg(test)] +mod tests { + use actix_service::fn_service; + + use super::*; + + #[tokio::test] + async fn plain_tokio_runtime() { + let srv = TestServer::with(|| fn_service(|_sock| async move { Ok::<_, ()>(()) })); + assert!(srv.connect().is_ok()); + } +} diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 02f68294..0822ab7c 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -24,7 +24,6 @@ use tokio::sync::{ }; use crate::{ - join_all::join_all_local, service::{BoxedServerService, InternalServiceFactory}, socket::MioStream, waker_queue::{WakerInterest, WakerQueue}, @@ -202,8 +201,8 @@ impl WorkerHandleServer { pub(crate) struct ServerWorker { // UnboundedReceiver should always be the first field. // It must be dropped as soon as ServerWorker dropping. - rx: UnboundedReceiver, - rx2: UnboundedReceiver, + conn_rx: UnboundedReceiver, + stop_rx: UnboundedReceiver, counter: WorkerCounter, services: Box<[WorkerService]>, factories: Box<[Box]>, @@ -212,7 +211,7 @@ pub(crate) struct ServerWorker { } struct WorkerService { - factory: usize, + factory_idx: usize, status: WorkerServiceStatus, service: BoxedServerService, } @@ -234,6 +233,12 @@ enum WorkerServiceStatus { Stopped, } +impl Default for WorkerServiceStatus { + fn default() -> Self { + Self::Unavailable + } +} + /// Config for worker behavior passed down from server builder. #[derive(Debug, Clone, Copy)] pub(crate) struct ServerWorkerConfig { @@ -277,111 +282,196 @@ impl ServerWorker { ) -> io::Result<(WorkerHandleAccept, WorkerHandleServer)> { trace!("starting server worker {}", idx); - let (tx1, rx) = unbounded_channel(); - let (tx2, rx2) = unbounded_channel(); + let (tx1, conn_rx) = unbounded_channel(); + let (tx2, stop_rx) = unbounded_channel(); let counter = Counter::new(config.max_concurrent_connections); - - let counter_clone = counter.clone(); + let pair = handle_pair(idx, tx1, tx2, counter.clone()); // get actix system context if it is set - let sys = System::try_current(); + let actix_system = System::try_current(); + + // get tokio runtime handle if it is set + let tokio_handle = tokio::runtime::Handle::try_current().ok(); // service factories initialization channel - let (factory_tx, factory_rx) = std::sync::mpsc::sync_channel(1); + let (factory_tx, factory_rx) = std::sync::mpsc::sync_channel::>(1); + + // outline of following code: + // + // if system exists + // if uring enabled + // start arbiter using uring method + // else + // start arbiter with regular tokio + // else + // if uring enabled + // start uring in spawned thread + // else + // start regular tokio in spawned thread // every worker runs in it's own thread and tokio runtime. // use a custom tokio runtime builder to change the settings of runtime. - std::thread::Builder::new() - .name(format!("actix-server worker {}", idx)) - .spawn(move || { - // forward existing actix system context - if let Some(sys) = sys { - System::set_current(sys); - } - let worker_fut = async move { - let fut = factories - .iter() - .enumerate() - .map(|(idx, factory)| { - let fut = factory.create(); - async move { fut.await.map(|(t, s)| (idx, t, s)) } - }) - .collect::>(); + match (actix_system, tokio_handle) { + (None, None) => { + panic!("No runtime detected. Start a Tokio (or Actix) runtime."); + } - // a second spawn to run !Send future tasks. - spawn(async move { - let res = join_all_local(fut) - .await - .into_iter() - .collect::, _>>(); + // no actix system + (None, Some(rt_handle)) => { + std::thread::Builder::new() + .name(format!("actix-server worker {}", idx)) + .spawn(move || { + let (worker_stopped_tx, worker_stopped_rx) = oneshot::channel(); - let services = match res { - Ok(res) => res - .into_iter() - .fold(Vec::new(), |mut services, (factory, token, service)| { - assert_eq!(token, services.len()); - services.push(WorkerService { - factory, - service, - status: WorkerServiceStatus::Unavailable, - }); - services - }) - .into_boxed_slice(), + // local set for running service init futures and worker services + let ls = tokio::task::LocalSet::new(); - Err(e) => { - error!("Can not start worker: {:?}", e); - Arbiter::try_current().as_ref().map(ArbiterHandle::stop); + // init services using existing Tokio runtime (so probably on main thread) + let services = rt_handle.block_on(ls.run_until(async { + let mut services = Vec::new(); + + for (idx, factory) in factories.iter().enumerate() { + match factory.create().await { + Ok((token, svc)) => services.push((idx, token, svc)), + + Err(err) => { + error!("Can not start worker: {:?}", err); + return Err(io::Error::new( + io::ErrorKind::Other, + format!("can not start server service {}", idx), + )); + } + } + } + + Ok(services) + })); + + let services = match services { + Ok(services) => { + factory_tx.send(Ok(())).unwrap(); + services + } + Err(err) => { + factory_tx.send(Err(err)).unwrap(); return; } }; - factory_tx.send(()).unwrap(); + let worker_services = wrap_worker_services(services); - // a third spawn to make sure ServerWorker runs as non boxed future. - spawn(ServerWorker { - rx, - rx2, - services, - counter: WorkerCounter::new(idx, waker_queue, counter_clone), - factories: factories.into_boxed_slice(), - state: Default::default(), - shutdown_timeout: config.shutdown_timeout, - }) - .await - .expect("task 3 panic"); + let worker_fut = async move { + // spawn to make sure ServerWorker runs as non boxed future. + spawn(async move { + ServerWorker { + conn_rx, + stop_rx, + services: worker_services.into_boxed_slice(), + counter: WorkerCounter::new(idx, waker_queue, counter), + factories: factories.into_boxed_slice(), + state: WorkerState::default(), + shutdown_timeout: config.shutdown_timeout, + } + .await; + + // wake up outermost task waiting for shutdown + worker_stopped_tx.send(()).unwrap(); + }); + + worker_stopped_rx.await.unwrap(); + }; + + #[cfg(all(target_os = "linux", feature = "io-uring"))] + { + // TODO: pass max blocking thread config when tokio-uring enable configuration + // on building runtime. + let _ = config.max_blocking_threads; + tokio_uring::start(worker_fut); + } + + #[cfg(not(all(target_os = "linux", feature = "io-uring")))] + { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .max_blocking_threads(config.max_blocking_threads) + .build() + .unwrap(); + + rt.block_on(ls.run_until(worker_fut)); + } }) - .await - .expect("task 2 panic"); - }; + .expect("cannot spawn server worker thread"); + } + // with actix system + (Some(_sys), _) => { #[cfg(all(target_os = "linux", feature = "io-uring"))] - { + let arbiter = { // TODO: pass max blocking thread config when tokio-uring enable configuration // on building runtime. let _ = config.max_blocking_threads; - tokio_uring::start(worker_fut) - } + Arbiter::new() + }; #[cfg(not(all(target_os = "linux", feature = "io-uring")))] - { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .max_blocking_threads(config.max_blocking_threads) - .build() - .unwrap(); + let arbiter = { + Arbiter::with_tokio_rt(move || { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .max_blocking_threads(config.max_blocking_threads) + .build() + .unwrap() + }) + }; - rt.block_on(tokio::task::LocalSet::new().run_until(worker_fut)) - } - }) - .expect("worker thread error/panic"); + arbiter.spawn(async move { + // spawn_local to run !Send future tasks. + spawn(async move { + let mut services = Vec::new(); + + for (idx, factory) in factories.iter().enumerate() { + match factory.create().await { + Ok((token, svc)) => services.push((idx, token, svc)), + + Err(err) => { + error!("Can not start worker: {:?}", err); + Arbiter::current().stop(); + factory_tx + .send(Err(io::Error::new( + io::ErrorKind::Other, + format!("can not start server service {}", idx), + ))) + .unwrap(); + return; + } + } + } + + factory_tx.send(Ok(())).unwrap(); + + let worker_services = wrap_worker_services(services); + + // spawn to make sure ServerWorker runs as non boxed future. + spawn(ServerWorker { + conn_rx, + stop_rx, + services: worker_services.into_boxed_slice(), + counter: WorkerCounter::new(idx, waker_queue, counter), + factories: factories.into_boxed_slice(), + state: Default::default(), + shutdown_timeout: config.shutdown_timeout, + }); + }); + }); + } + }; // wait for service factories initialization - factory_rx.recv().unwrap(); + factory_rx.recv().unwrap()?; - Ok(handle_pair(idx, tx1, tx2, counter)) + Ok(pair) } fn restart_service(&mut self, idx: usize, factory_id: usize) { @@ -419,7 +509,7 @@ impl ServerWorker { if srv.status == WorkerServiceStatus::Unavailable { trace!( "Service {:?} is available", - self.factories[srv.factory].name(idx) + self.factories[srv.factory_idx].name(idx) ); srv.status = WorkerServiceStatus::Available; } @@ -430,7 +520,7 @@ impl ServerWorker { if srv.status == WorkerServiceStatus::Available { trace!( "Service {:?} is unavailable", - self.factories[srv.factory].name(idx) + self.factories[srv.factory_idx].name(idx) ); srv.status = WorkerServiceStatus::Unavailable; } @@ -438,10 +528,10 @@ impl ServerWorker { Poll::Ready(Err(_)) => { error!( "Service {:?} readiness check returned error, restarting", - self.factories[srv.factory].name(idx) + self.factories[srv.factory_idx].name(idx) ); srv.status = WorkerServiceStatus::Failed; - return Err((idx, srv.factory)); + return Err((idx, srv.factory_idx)); } } } @@ -484,7 +574,6 @@ impl Default for WorkerState { impl Drop for ServerWorker { fn drop(&mut self) { - trace!("stopping ServerWorker Arbiter"); Arbiter::try_current().as_ref().map(ArbiterHandle::stop); } } @@ -496,7 +585,8 @@ impl Future for ServerWorker { let this = self.as_mut().get_mut(); // `StopWorker` message handler - if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx) + if let Poll::Ready(Some(Stop { graceful, tx })) = + Pin::new(&mut this.stop_rx).poll_recv(cx) { let num = this.counter.total(); if num == 0 { @@ -559,7 +649,7 @@ impl Future for ServerWorker { } WorkerState::Shutdown(ref mut shutdown) => { // drop all pending connections in rx channel. - while let Poll::Ready(Some(conn)) = Pin::new(&mut this.rx).poll_recv(cx) { + while let Poll::Ready(Some(conn)) = Pin::new(&mut this.conn_rx).poll_recv(cx) { // WorkerCounterGuard is needed as Accept thread has incremented counter. // It's guard's job to decrement the counter together with drop of Conn. let guard = this.counter.guard(); @@ -606,7 +696,7 @@ impl Future for ServerWorker { } // handle incoming io stream - match ready!(Pin::new(&mut this.rx).poll_recv(cx)) { + match ready!(Pin::new(&mut this.conn_rx).poll_recv(cx)) { Some(msg) => { let guard = this.counter.guard(); let _ = this.services[msg.token].service.call((guard, msg.io)); @@ -617,3 +707,19 @@ impl Future for ServerWorker { } } } + +fn wrap_worker_services( + services: Vec<(usize, usize, BoxedServerService)>, +) -> Vec { + services + .into_iter() + .fold(Vec::new(), |mut services, (idx, token, service)| { + assert_eq!(token, services.len()); + services.push(WorkerService { + factory_idx: idx, + service, + status: WorkerServiceStatus::Unavailable, + }); + services + }) +} diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 0a8cd2ae..9a14e78a 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -33,28 +33,63 @@ fn test_bind() { })? .run(); - let _ = tx.send((srv.handle(), actix_rt::System::current())); + let _ = tx.send(srv.handle()); srv.await }) }); - let (srv, sys) = rx.recv().unwrap(); + let srv = rx.recv().unwrap(); + + thread::sleep(Duration::from_millis(500)); + assert!(net::TcpStream::connect(addr).is_ok()); + + let _ = srv.stop(true); + h.join().unwrap().unwrap(); +} + +#[test] +fn plain_tokio_runtime() { + let addr = unused_addr(); + let (tx, rx) = mpsc::channel(); + + let h = thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + rt.block_on(async { + let srv = Server::build() + .workers(1) + .disable_signals() + .bind("test", addr, move || { + fn_service(|_| async { Ok::<_, ()>(()) }) + })? + .run(); + + tx.send(srv.handle()).unwrap(); + + srv.await + }) + }); + + let srv = rx.recv().unwrap(); thread::sleep(Duration::from_millis(500)); assert!(net::TcpStream::connect(addr).is_ok()); let _ = srv.stop(true); - sys.stop(); h.join().unwrap().unwrap(); } #[test] fn test_listen() { let addr = unused_addr(); + let lst = net::TcpListener::bind(addr).unwrap(); + let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let lst = net::TcpListener::bind(addr)?; actix_rt::System::new().block_on(async { let srv = Server::build() .disable_signals() @@ -64,19 +99,18 @@ fn test_listen() { })? .run(); - let _ = tx.send((srv.handle(), actix_rt::System::current())); + let _ = tx.send(srv.handle()); srv.await }) }); - let (srv, sys) = rx.recv().unwrap(); + let srv = rx.recv().unwrap(); thread::sleep(Duration::from_millis(500)); assert!(net::TcpStream::connect(addr).is_ok()); let _ = srv.stop(true); - sys.stop(); h.join().unwrap().unwrap(); } @@ -283,12 +317,12 @@ async fn test_service_restart() { .workers(1) .run(); - let _ = tx.send((srv.handle(), actix_rt::System::current())); + let _ = tx.send(srv.handle()); srv.await }) }); - let (srv, sys) = rx.recv().unwrap(); + let srv = rx.recv().unwrap(); for _ in 0..5 { TcpStream::connect(addr1) @@ -311,7 +345,6 @@ async fn test_service_restart() { assert!(num2_clone.load(Ordering::SeqCst) > 5); let _ = srv.stop(false); - sys.stop(); h.join().unwrap().unwrap(); } @@ -388,13 +421,13 @@ async fn worker_restart() { .workers(2) .run(); - let _ = tx.send((srv.handle(), actix_rt::System::current())); + let _ = tx.send(srv.handle()); srv.await }) }); - let (srv, sys) = rx.recv().unwrap(); + let srv = rx.recv().unwrap(); sleep(Duration::from_secs(3)).await; @@ -452,6 +485,5 @@ async fn worker_restart() { stream.shutdown().await.unwrap(); let _ = srv.stop(false); - sys.stop(); h.join().unwrap().unwrap(); } diff --git a/actix-service/src/macros.rs b/actix-service/src/macros.rs index 6cf3ef08..503cf116 100644 --- a/actix-service/src/macros.rs +++ b/actix-service/src/macros.rs @@ -1,7 +1,7 @@ /// An implementation of [`poll_ready`]() that always signals readiness. /// /// This should only be used for basic leaf services that have no concept of un-readiness. -/// For wrapper or other serivice types, use [`forward_ready!`] for simple cases or write a bespoke +/// For wrapper or other service types, use [`forward_ready!`] for simple cases or write a bespoke /// `poll_ready` implementation. /// /// [`poll_ready`]: crate::Service::poll_ready diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index a94706a2..ed858378 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -23,3 +23,4 @@ local-waker = "0.1" [dev-dependencies] actix-rt = "2.0.0" futures-util = { version = "0.3.7", default-features = false } +static_assertions = "1.1" diff --git a/actix-utils/src/future/ready.rs b/actix-utils/src/future/ready.rs index 4a01ada3..678d6304 100644 --- a/actix-utils/src/future/ready.rs +++ b/actix-utils/src/future/ready.rs @@ -103,10 +103,16 @@ pub fn err(err: E) -> Ready> { #[cfg(test)] mod tests { + use std::rc::Rc; + use futures_util::task::noop_waker; + use static_assertions::{assert_impl_all, assert_not_impl_all}; use super::*; + assert_impl_all!(Ready<()>: Send, Sync, Clone); + assert_not_impl_all!(Ready>: Send, Sync); + #[test] #[should_panic] fn multiple_poll_panics() { From 58a67ade328a1eb8a5d869b41c2f03c4f562a241 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Mon, 15 Nov 2021 02:33:13 +0000 Subject: [PATCH 6/9] improve docs of system_exit --- actix-server/src/builder.rs | 2 +- actix-server/src/handle.rs | 2 ++ actix-server/src/server.rs | 4 ++-- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index dbf00303..4f4d6e26 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -113,7 +113,7 @@ impl ServerBuilder { self.max_concurrent_connections(num) } - /// Stop Actix system. + /// Stop Actix `System` after server shutdown. pub fn system_exit(mut self) -> Self { self.exit = true; self diff --git a/actix-server/src/handle.rs b/actix-server/src/handle.rs index 49d8eb01..53f00bee 100644 --- a/actix-server/src/handle.rs +++ b/actix-server/src/handle.rs @@ -42,10 +42,12 @@ impl ServerHandle { /// Stop incoming connection processing, stop all workers and exit. pub fn stop(&self, graceful: bool) -> impl Future { let (tx, rx) = oneshot::channel(); + let _ = self.cmd_tx.send(ServerCommand::Stop { graceful, completion: Some(tx), }); + async { let _ = rx.await; } diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index 9611062a..08036eeb 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -196,11 +196,11 @@ impl Future for Server { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.as_mut().get_mut() { - Server::Error(err) => Poll::Ready(Err(err + Self::Error(err) => Poll::Ready(Err(err .take() .expect("Server future cannot be polled after error"))), - Server::Server(inner) => { + Self::Server(inner) => { // poll Signals if let Some(ref mut signals) = inner.signals { if let Poll::Ready(signal) = Pin::new(signals).poll(cx) { From 443a328fb44a996349c4e3979bf0acf3ddd65b8d Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Mon, 15 Nov 2021 02:39:55 +0000 Subject: [PATCH 7/9] prepare actix-server release 2.0.0-beta.9 --- actix-rt/CHANGES.md | 2 +- actix-server/CHANGES.md | 10 ++++++++-- actix-server/Cargo.toml | 2 +- actix-tls/Cargo.toml | 2 +- 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 4a0c5cfb..efb78d53 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -108,7 +108,7 @@ [#129]: https://github.com/actix/actix-net/issues/129 -## 1.1.0 - 2020-04-08 (YANKED) +## 1.1.0 - 2020-04-08 _(YANKED)_ * Expose `System::is_set` to check if current system has ben started [#99] * Add `Arbiter::is_running` to check if event loop is running [#124] * Add `Arbiter::local_join` associated function diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 50dd65e9..51da40f2 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -3,13 +3,19 @@ ## Unreleased - 2021-xx-xx -## 2.0.0-beta.8 - 2021-11-05 +## 2.0.0-beta.9 - 2021-11-15 +* Restore `Arbiter` support lost in `beta.8`. [#417] + +[#417]: https://github.com/actix/actix-net/pull/417 + + +## 2.0.0-beta.8 - 2021-11-05 _(YANKED)_ * Fix non-unix signal handler. [#410] [#410]: https://github.com/actix/actix-net/pull/410 -## 2.0.0-beta.7 - 2021-11-05 +## 2.0.0-beta.7 - 2021-11-05 _(YANKED)_ * Server can be started in regular Tokio runtime. [#408] * Expose new `Server` type whose `Future` impl resolves when server stops. [#408] * Rename `Server` to `ServerHandle`. [#407] diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 66e77c2f..0c369c81 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-server" -version = "2.0.0-beta.8" +version = "2.0.0-beta.9" authors = [ "Nikolay Kim ", "fakeshadow <24548779@qq.com>", diff --git a/actix-tls/Cargo.toml b/actix-tls/Cargo.toml index b7903fc1..313e642d 100755 --- a/actix-tls/Cargo.toml +++ b/actix-tls/Cargo.toml @@ -62,7 +62,7 @@ tokio-native-tls = { version = "0.3", optional = true } [dev-dependencies] actix-rt = "2.2.0" -actix-server = "2.0.0-beta.8" +actix-server = "2.0.0-beta.9" bytes = "1" env_logger = "0.9" futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } From 0b0cbd53886c031891618b39b9b49d3fc7febead Mon Sep 17 00:00:00 2001 From: Alexander Polakov Date: Mon, 15 Nov 2021 13:39:42 +0300 Subject: [PATCH 8/9] actix-tls: allow getting uri from Connect (#415) Co-authored-by: Rob Ede --- actix-tls/CHANGES.md | 3 +++ actix-tls/src/connect/connect.rs | 27 +++++++++++++++++++-------- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/actix-tls/CHANGES.md b/actix-tls/CHANGES.md index d3d1f761..ad991d42 100644 --- a/actix-tls/CHANGES.md +++ b/actix-tls/CHANGES.md @@ -1,6 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx +* Add `Connect::request` for getting a reference to the connection request. [#415] + +[#415]: https://github.com/actix/actix-net/pull/415 ## 3.0.0-beta.7 - 2021-10-20 diff --git a/actix-tls/src/connect/connect.rs b/actix-tls/src/connect/connect.rs index 730486cf..65d9e05e 100755 --- a/actix-tls/src/connect/connect.rs +++ b/actix-tls/src/connect/connect.rs @@ -63,16 +63,16 @@ impl From> for ConnectAddrs { /// Connection info. #[derive(Debug, PartialEq, Eq, Hash)] -pub struct Connect { - pub(crate) req: T, +pub struct Connect { + pub(crate) req: R, pub(crate) port: u16, pub(crate) addr: ConnectAddrs, pub(crate) local_addr: Option, } -impl Connect { +impl Connect { /// Create `Connect` instance by splitting the string by ':' and convert the second part to u16 - pub fn new(req: T) -> Connect { + pub fn new(req: R) -> Connect { let (_, port) = parse_host(req.hostname()); Connect { @@ -85,7 +85,7 @@ impl Connect { /// Create new `Connect` instance from host and address. Connector skips name resolution stage /// for such connect messages. - pub fn with_addr(req: T, addr: SocketAddr) -> Connect { + pub fn with_addr(req: R, addr: SocketAddr) -> Connect { Connect { req, port: 0, @@ -155,15 +155,20 @@ impl Connect { ConnectAddrs::Multi(addrs) => ConnectAddrsIter::MultiOwned(addrs.into_iter()), } } + + /// Returns a reference to the connection request. + pub fn request(&self) -> &R { + &self.req + } } -impl From for Connect { - fn from(addr: T) -> Self { +impl From for Connect { + fn from(addr: R) -> Self { Connect::new(addr) } } -impl fmt::Display for Connect { +impl fmt::Display for Connect { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}:{}", self.hostname(), self.port()) } @@ -347,4 +352,10 @@ mod tests { IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)) ) } + + #[test] + fn request_ref() { + let conn = Connect::new("hello"); + assert_eq!(conn.request(), &"hello") + } } From 54d1d9e520b575b58b104b0fceaf103727c9d542 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Mon, 15 Nov 2021 17:55:23 +0000 Subject: [PATCH 9/9] prepare actix-tls release 3.0.0-beta.8 --- actix-tls/CHANGES.md | 3 +++ actix-tls/Cargo.toml | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/actix-tls/CHANGES.md b/actix-tls/CHANGES.md index ad991d42..ae52fd0f 100644 --- a/actix-tls/CHANGES.md +++ b/actix-tls/CHANGES.md @@ -1,6 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx + + +## 3.0.0-beta.8 - 2021-11-15 * Add `Connect::request` for getting a reference to the connection request. [#415] [#415]: https://github.com/actix/actix-net/pull/415 diff --git a/actix-tls/Cargo.toml b/actix-tls/Cargo.toml index 313e642d..9abe10af 100755 --- a/actix-tls/Cargo.toml +++ b/actix-tls/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-tls" -version = "3.0.0-beta.7" +version = "3.0.0-beta.8" authors = ["Nikolay Kim "] description = "TLS acceptor and connector services for Actix ecosystem" keywords = ["network", "tls", "ssl", "async", "transport"]