diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 97084f05..ec336cbf 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -240,6 +240,15 @@ impl Arbiter { }) } + /// Try to get current running arbiter handle. + /// + /// Returns `None` if no Arbiter has been started. + /// + /// Contrary to [`current`](Self::current), this never panics. + pub fn try_current() -> Option { + HANDLE.with(|cell| cell.borrow().clone()) + } + /// Stop Arbiter from continuing it's event loop. /// /// Returns true if stop message was sent successfully and false if the Arbiter has been dropped. diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 23d692a4..ebe0b347 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -130,7 +130,7 @@ impl System { /// /// Returns `None` if no System has been started. /// - /// Contrary to `current`, this never panics. + /// Contrary to [`current`](Self::current), this never panics. pub fn try_current() -> Option { CURRENT.with(|cell| cell.borrow().clone()) } diff --git a/actix-server/examples/tcp-echo.rs b/actix-server/examples/tcp-echo.rs index 8b038da4..5e2b99ad 100644 --- a/actix-server/examples/tcp-echo.rs +++ b/actix-server/examples/tcp-echo.rs @@ -10,7 +10,7 @@ //! the length of each line it echos and the total size of data sent when the connection is closed. use std::{ - env, io, + io, sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -25,10 +25,8 @@ use futures_util::future::ok; use log::{error, info}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -#[actix_rt::main] -async fn main() -> io::Result<()> { - env::set_var("RUST_LOG", "info"); - env_logger::init(); +async fn run() -> io::Result<()> { + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); let count = Arc::new(AtomicUsize::new(0)); @@ -85,6 +83,25 @@ async fn main() -> io::Result<()> { }) })? .workers(1) + // .system_exit() .run() .await } + +fn main() -> io::Result<()> { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + let ls = tokio::task::LocalSet::new(); + rt.block_on(ls.run_until(run()))?; + + Ok(()) +} + +// #[actix_rt::main] +// async fn main() -> io::Result<()> { +// run().await?; +// Ok(()) +// } diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index a872853c..bb75a1be 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -156,13 +156,17 @@ impl Accept { srv: ServerHandle, handles: Vec, ) { - // Accept runs in its own thread and would want to spawn additional futures to current - // actix system. - let sys = System::current(); + // Accept runs in its own thread and might spawn additional futures to current system + let sys = System::try_current(); + thread::Builder::new() .name("actix-server accept loop".to_owned()) .spawn(move || { - System::set_current(sys); + // forward existing actix system context + if let Some(sys) = sys { + System::set_current(sys); + } + let (mut accept, mut sockets) = Accept::new_with_sockets(poll, waker, socks, handles, srv); @@ -479,10 +483,23 @@ impl Accept { // after the sleep a Timer interest is sent to Accept Poll let waker = self.waker.clone(); - System::current().arbiter().spawn(async move { - sleep(Duration::from_millis(510)).await; - waker.wake(WakerInterest::Timer); - }); + + match System::try_current() { + Some(sys) => { + sys.arbiter().spawn(async move { + sleep(Duration::from_millis(510)).await; + waker.wake(WakerInterest::Timer); + }); + } + + None => { + let rt = tokio::runtime::Handle::current(); + rt.spawn(async move { + sleep(Duration::from_millis(510)).await; + waker.wake(WakerInterest::Timer); + }); + } + } return; } diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 7e3df9d8..36c79655 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -7,7 +7,7 @@ use std::{ }; use actix_rt::{self as rt, net::TcpStream, time::sleep, System}; -use log::{error, info}; +use log::{error, info, trace}; use tokio::sync::{ mpsc::{unbounded_channel, UnboundedReceiver}, oneshot, @@ -160,6 +160,8 @@ impl ServerBuilder { { let sockets = bind_addr(addr, self.backlog)?; + trace!("binding server to: {:?}", &sockets); + for lst in sockets { let token = self.next_token(); self.services.push(StreamNewService::create( @@ -171,6 +173,7 @@ impl ServerBuilder { self.sockets .push((token, name.as_ref().to_string(), MioListener::Tcp(lst))); } + Ok(self) } @@ -255,6 +258,8 @@ impl ServerBuilder { if self.sockets.is_empty() { panic!("Server should have at least one bound socket"); } else { + trace!("start running server"); + for (_, name, lst) in &self.sockets { info!( r#"Starting service: "{}", workers: {}, listening on: {}"#, @@ -264,6 +269,8 @@ impl ServerBuilder { ); } + trace!("run server"); + // start workers let handles = (0..self.threads) .map(|idx| { @@ -301,8 +308,8 @@ impl ServerBuilder { idx: usize, waker_queue: WakerQueue, ) -> (WorkerHandleAccept, WorkerHandleServer) { + trace!("start server worker {}", idx); let services = self.services.iter().map(|v| v.clone_factory()).collect(); - ServerWorker::start(idx, services, waker_queue, self.worker_config) } @@ -384,7 +391,7 @@ impl ServerBuilder { if exit { sleep(Duration::from_millis(300)).await; - System::current().stop(); + System::try_current().as_ref().map(System::stop); } }); } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index f8550e18..39ae7914 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -14,7 +14,7 @@ use std::{ use actix_rt::{ spawn, time::{sleep, Instant, Sleep}, - Arbiter, + Arbiter, ArbiterHandle, System, }; use futures_core::{future::LocalBoxFuture, ready}; use log::{error, info, trace}; @@ -23,12 +23,14 @@ use tokio::sync::{ oneshot, }; -use crate::join_all; -use crate::service::{BoxedServerService, InternalServiceFactory}; -use crate::socket::MioStream; -use crate::waker_queue::{WakerInterest, WakerQueue}; +use crate::{ + join_all, + service::{BoxedServerService, InternalServiceFactory}, + socket::MioStream, + waker_queue::{WakerInterest, WakerQueue}, +}; -/// Stop worker message. Returns `true` on successful graceful shutdown. +/// Stop worker message. Returns `true` on successful graceful shutdown /// and `false` if some connections still alive when shutdown execute. pub(crate) struct Stop { graceful: bool, @@ -273,6 +275,8 @@ impl ServerWorker { waker_queue: WakerQueue, config: ServerWorkerConfig, ) -> (WorkerHandleAccept, WorkerHandleServer) { + trace!("starting server worker {}", idx); + let (tx1, rx) = unbounded_channel(); let (tx2, rx2) = unbounded_channel(); @@ -289,63 +293,77 @@ impl ServerWorker { Arbiter::new() }; - #[cfg(not(all(target_os = "linux", feature = "io-uring")))] - let arbiter = Arbiter::with_tokio_rt(move || { - tokio::runtime::Builder::new_current_thread() - .enable_all() - .max_blocking_threads(config.max_blocking_threads) - .build() - .unwrap() - }); + // get actix system context if it is set + let sys = System::try_current(); - arbiter.spawn(async move { - let fut = factories - .iter() - .enumerate() - .map(|(idx, factory)| { - let fut = factory.create(); - async move { fut.await.map(|(t, s)| (idx, t, s)) } - }) - .collect::>(); + std::thread::Builder::new() + .name("eofibef".to_owned()) + .spawn(move || { + // forward existing actix system context + if let Some(sys) = sys { + System::set_current(sys); + } - // a second spawn to run !Send future tasks. - spawn(async move { - let res = join_all(fut) - .await - .into_iter() - .collect::, _>>(); - let services = match res { - Ok(res) => res - .into_iter() - .fold(Vec::new(), |mut services, (factory, token, service)| { - assert_eq!(token, services.len()); - services.push(WorkerService { - factory, - service, - status: WorkerServiceStatus::Unavailable, - }); - services + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .max_blocking_threads(config.max_blocking_threads) + .build() + .unwrap(); + + rt.block_on(tokio::task::LocalSet::new().run_until(async move { + let fut = factories + .iter() + .enumerate() + .map(|(idx, factory)| { + let fut = factory.create(); + async move { fut.await.map(|(t, s)| (idx, t, s)) } }) - .into_boxed_slice(), - Err(e) => { - error!("Can not start worker: {:?}", e); - Arbiter::current().stop(); - return; - } - }; + .collect::>(); - // a third spawn to make sure ServerWorker runs as non boxed future. - spawn(ServerWorker { - rx, - rx2, - services, - counter: WorkerCounter::new(idx, waker_queue, counter_clone), - factories: factories.into_boxed_slice(), - state: Default::default(), - shutdown_timeout: config.shutdown_timeout, - }); - }); - }); + // a second spawn to run !Send future tasks. + spawn(async move { + let res = join_all(fut) + .await + .into_iter() + .collect::, _>>(); + let services = match res { + Ok(res) => res + .into_iter() + .fold(Vec::new(), |mut services, (factory, token, service)| { + assert_eq!(token, services.len()); + services.push(WorkerService { + factory, + service, + status: WorkerServiceStatus::Unavailable, + }); + services + }) + .into_boxed_slice(), + Err(e) => { + error!("Can not start worker: {:?}", e); + Arbiter::try_current().as_ref().map(ArbiterHandle::stop); + return; + } + }; + + // a third spawn to make sure ServerWorker runs as non boxed future. + spawn(ServerWorker { + rx, + rx2, + services, + counter: WorkerCounter::new(idx, waker_queue, counter_clone), + factories: factories.into_boxed_slice(), + state: Default::default(), + shutdown_timeout: config.shutdown_timeout, + }) + .await + .expect("task 3 panic"); + }) + .await + .expect("task 2 panic"); + })) + }) + .expect("worker thread error/panic"); handle_pair(idx, tx1, tx2, counter) } @@ -450,8 +468,9 @@ impl Default for WorkerState { impl Drop for ServerWorker { fn drop(&mut self) { + trace!("dropping ServerWorker"); // Stop the Arbiter ServerWorker runs on on drop. - Arbiter::current().stop(); + Arbiter::try_current().as_ref().map(ArbiterHandle::stop); } }