From c47924393439501dc4bc6c71aeb27d3b61f2e9f8 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Thu, 29 Oct 2020 00:15:17 +0800 Subject: [PATCH] remove RUNNING Q PENDING thread locals from actix-rt --- actix-rt/Cargo.toml | 4 +- actix-rt/src/arbiter.rs | 170 +++++++--------------------- actix-rt/src/builder.rs | 63 +++++------ actix-rt/src/lib.rs | 15 ++- actix-rt/src/runtime.rs | 11 +- actix-rt/tests/integration_tests.rs | 51 --------- actix-server/tests/test_server.rs | 107 +++++++++-------- actix-testing/src/lib.rs | 15 ++- 8 files changed, 152 insertions(+), 284 deletions(-) diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index b7d272cd..50da68b8 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -19,10 +19,8 @@ path = "src/lib.rs" actix-macros = "0.1.0" actix-threadpool = "0.3" futures-channel = { version = "0.3.4", default-features = false } -futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] } -copyless = "0.1.4" -smallvec = "1" tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] } [dev-dependencies] +futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] } tokio = { version = "0.2.6", features = ["full"] } diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index eff10ca3..1b94df2e 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -1,6 +1,7 @@ use std::any::{Any, TypeId}; -use std::cell::{Cell, RefCell}; +use std::cell::RefCell; use std::collections::HashMap; +use std::future::Future; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::{Context, Poll}; @@ -8,24 +9,14 @@ use std::{fmt, thread}; use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures_channel::oneshot::{channel, Canceled, Sender}; -use futures_util::{ - future::{self, Future, FutureExt}, - stream::Stream, -}; +use tokio::stream::Stream; +use tokio::task::LocalSet; use crate::runtime::Runtime; use crate::system::System; -use copyless::BoxHelper; - -use smallvec::SmallVec; -pub use tokio::task::JoinHandle; - thread_local!( static ADDR: RefCell> = RefCell::new(None); - static RUNNING: Cell = Cell::new(false); - static Q: RefCell>>>> = RefCell::new(Vec::new()); - static PENDING: RefCell; 8]>> = RefCell::new(SmallVec::new()); static STORAGE: RefCell>> = RefCell::new(HashMap::new()); ); @@ -69,14 +60,14 @@ impl Default for Arbiter { } impl Arbiter { - pub(crate) fn new_system() -> Self { + pub(crate) fn new_system(local: &LocalSet) -> Self { let (tx, rx) = unbounded(); let arb = Arbiter::with_sender(tx); ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); - RUNNING.with(|cell| cell.set(false)); STORAGE.with(|cell| cell.borrow_mut().clear()); - Arbiter::spawn(ArbiterController { stop: None, rx }); + + local.spawn_local(ArbiterController { rx }); arb } @@ -91,8 +82,9 @@ impl Arbiter { } /// Check if current arbiter is running. + #[deprecated(note = "Thread local variables for running state of Arbiter is removed")] pub fn is_running() -> bool { - RUNNING.with(|cell| cell.get()) + false } /// Stop arbiter from continuing it's event loop. @@ -106,72 +98,47 @@ impl Arbiter { let id = COUNT.fetch_add(1, Ordering::Relaxed); let name = format!("actix-rt:worker:{}", id); let sys = System::current(); - let (arb_tx, arb_rx) = unbounded(); - let arb_tx2 = arb_tx.clone(); + let (tx, rx) = unbounded(); let handle = thread::Builder::new() .name(name.clone()) - .spawn(move || { - let mut rt = Runtime::new().expect("Can not create Runtime"); - let arb = Arbiter::with_sender(arb_tx); + .spawn({ + let tx = tx.clone(); + move || { + let mut rt = Runtime::new().expect("Can not create Runtime"); + let arb = Arbiter::with_sender(tx); - let (stop, stop_rx) = channel(); - RUNNING.with(|cell| cell.set(true)); - STORAGE.with(|cell| cell.borrow_mut().clear()); + STORAGE.with(|cell| cell.borrow_mut().clear()); - System::set_current(sys); + System::set_current(sys); - // start arbiter controller - rt.spawn(ArbiterController { - stop: Some(stop), - rx: arb_rx, - }); - ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); + ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); - // register arbiter - let _ = System::current() - .sys() - .unbounded_send(SystemCommand::RegisterArbiter(id, arb)); + // register arbiter + let _ = System::current() + .sys() + .unbounded_send(SystemCommand::RegisterArbiter(id, arb)); - // run loop - let _ = match rt.block_on(stop_rx) { - Ok(code) => code, - Err(_) => 1, - }; + // start arbiter controller + // run loop + rt.block_on(ArbiterController { rx }); - // unregister arbiter - let _ = System::current() - .sys() - .unbounded_send(SystemCommand::UnregisterArbiter(id)); + // unregister arbiter + let _ = System::current() + .sys() + .unbounded_send(SystemCommand::UnregisterArbiter(id)); + } }) .unwrap_or_else(|err| { panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err) }); Arbiter { - sender: arb_tx2, + sender: tx, thread_handle: Some(handle), } } - pub(crate) fn run_system(rt: Option<&Runtime>) { - RUNNING.with(|cell| cell.set(true)); - Q.with(|cell| { - let mut v = cell.borrow_mut(); - for fut in v.drain(..) { - if let Some(rt) = rt { - rt.spawn(fut); - } else { - tokio::task::spawn_local(fut); - } - } - }); - } - - pub(crate) fn stop_system() { - RUNNING.with(|cell| cell.set(false)); - } - /// Spawn a future on the current thread. This does not create a new Arbiter /// or Arbiter address, it is simply a helper for spawning futures on the current /// thread. @@ -179,26 +146,7 @@ impl Arbiter { where F: Future + 'static, { - RUNNING.with(move |cell| { - if cell.get() { - // Spawn the future on running executor - let len = PENDING.with(move |cell| { - let mut p = cell.borrow_mut(); - p.push(tokio::task::spawn_local(future)); - p.len() - }); - if len > 7 { - // Before reaching the inline size - tokio::task::spawn_local(CleanupPending); - } - } else { - // Box the future and push it to the queue, this results in double boxing - // because the executor boxes the future again, but works for now - Q.with(move |cell| { - cell.borrow_mut().push(Pin::from(Box::alloc().init(future))) - }); - } - }); + tokio::task::spawn_local(future); } /// Executes a future on the current thread. This does not create a new Arbiter @@ -209,7 +157,9 @@ impl Arbiter { F: FnOnce() -> R + 'static, R: Future + 'static, { - Arbiter::spawn(future::lazy(|_| f()).flatten()) + Arbiter::spawn(async { + f(); + }) } /// Send a future to the Arbiter's thread, and spawn it. @@ -316,40 +266,13 @@ impl Arbiter { /// Returns a future that will be completed once all currently spawned futures /// have completed. - pub fn local_join() -> impl Future { - PENDING.with(move |cell| { - let current = cell.replace(SmallVec::new()); - future::join_all(current).map(|_| ()) - }) - } -} - -/// Future used for cleaning-up already finished `JoinHandle`s -/// from the `PENDING` list so the vector doesn't grow indefinitely -struct CleanupPending; - -impl Future for CleanupPending { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - PENDING.with(move |cell| { - let mut pending = cell.borrow_mut(); - let mut i = 0; - while i != pending.len() { - if let Poll::Ready(_) = Pin::new(&mut pending[i]).poll(cx) { - pending.remove(i); - } else { - i += 1; - } - } - }); - - Poll::Ready(()) + #[deprecated(note = "local_join has been removed")] + pub async fn local_join() { + unimplemented!() } } struct ArbiterController { - stop: Option>, rx: UnboundedReceiver, } @@ -374,22 +297,9 @@ impl Future for ArbiterController { match Pin::new(&mut self.rx).poll_next(cx) { Poll::Ready(None) => return Poll::Ready(()), Poll::Ready(Some(item)) => match item { - ArbiterCommand::Stop => { - if let Some(stop) = self.stop.take() { - let _ = stop.send(0); - }; - return Poll::Ready(()); - } + ArbiterCommand::Stop => return Poll::Ready(()), ArbiterCommand::Execute(fut) => { - let len = PENDING.with(move |cell| { - let mut p = cell.borrow_mut(); - p.push(tokio::task::spawn_local(fut)); - p.len() - }); - if len > 7 { - // Before reaching the inline size - tokio::task::spawn_local(CleanupPending); - } + tokio::task::spawn_local(fut); } ArbiterCommand::ExecuteFn(f) => { f.call_box(); diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index f4d9b1bf..d0e2a627 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -1,9 +1,9 @@ use std::borrow::Cow; +use std::future::Future; use std::io; use futures_channel::mpsc::unbounded; use futures_channel::oneshot::{channel, Receiver}; -use futures_util::future::{lazy, Future, FutureExt}; use tokio::task::LocalSet; use crate::arbiter::{Arbiter, SystemArbiter}; @@ -74,7 +74,8 @@ impl Builder { let (stop_tx, stop) = channel(); let (sys_sender, sys_receiver) = unbounded(); - let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic); + let system = + System::construct(sys_sender, Arbiter::new_system(local), self.stop_on_panic); // system arbiter let arb = SystemArbiter::new(stop_tx, sys_receiver); @@ -92,16 +93,21 @@ impl Builder { let (stop_tx, stop) = channel(); let (sys_sender, sys_receiver) = unbounded(); - let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic); + let mut rt = Runtime::new().unwrap(); + + let system = System::construct( + sys_sender, + Arbiter::new_system(rt.local()), + self.stop_on_panic, + ); // system arbiter let arb = SystemArbiter::new(stop_tx, sys_receiver); - let mut rt = Runtime::new().unwrap(); rt.spawn(arb); // init system arbiter and run configuration method - rt.block_on(lazy(move |_| f())); + rt.block_on(async { f() }); SystemRunner { rt, stop, system } } @@ -120,27 +126,21 @@ impl AsyncSystemRunner { let AsyncSystemRunner { stop, .. } = self; // run loop - lazy(|_| { - Arbiter::run_system(None); - async { - let res = match stop.await { - Ok(code) => { - if code != 0 { - Err(io::Error::new( - io::ErrorKind::Other, - format!("Non-zero exit code: {}", code), - )) - } else { - Ok(()) - } + async { + match stop.await { + Ok(code) => { + if code != 0 { + Err(io::Error::new( + io::ErrorKind::Other, + format!("Non-zero exit code: {}", code), + )) + } else { + Ok(()) } - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), - }; - Arbiter::stop_system(); - res + } + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), } - }) - .flatten() + } } } @@ -160,8 +160,7 @@ impl SystemRunner { let SystemRunner { mut rt, stop, .. } = self; // run loop - Arbiter::run_system(Some(&rt)); - let result = match rt.block_on(stop) { + match rt.block_on(stop) { Ok(code) => { if code != 0 { Err(io::Error::new( @@ -173,19 +172,15 @@ impl SystemRunner { } } Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), - }; - Arbiter::stop_system(); - result + } } /// Execute a future and wait for result. + #[inline] pub fn block_on(&mut self, fut: F) -> O where - F: Future + 'static, + F: Future, { - Arbiter::run_system(Some(&self.rt)); - let res = self.rt.block_on(fut); - Arbiter::stop_system(); - res + self.rt.block_on(fut) } } diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index aef78f12..d3b64187 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -2,6 +2,8 @@ #![deny(rust_2018_idioms, warnings)] #![allow(clippy::type_complexity)] +use std::future::Future; + #[cfg(not(test))] // Work around for rust-lang/rust#62127 pub use actix_macros::{main, test}; @@ -23,15 +25,12 @@ pub use actix_threadpool as blocking; /// # Panics /// /// This function panics if actix system is not running. +#[inline] pub fn spawn(f: F) where - F: futures_util::future::Future + 'static, + F: Future + 'static, { - if !System::is_set() { - panic!("System is not running"); - } - - Arbiter::spawn(f); + Arbiter::spawn(f) } /// Asynchronous signal handling @@ -64,3 +63,7 @@ pub mod time { pub use tokio::time::{interval, interval_at, Interval}; pub use tokio::time::{timeout, Timeout}; } + +pub mod task { + pub use tokio::task::yield_now; +} diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index bbafb6f0..1e782e0b 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -24,10 +24,11 @@ impl Runtime { .basic_scheduler() .build()?; - Ok(Runtime { - rt, - local: LocalSet::new(), - }) + Ok(Runtime { rt, local: LocalSet::new() }) + } + + pub(super) fn local(&self) -> &LocalSet { + &self.local } /// Spawn a future onto the single-threaded runtime. @@ -84,7 +85,7 @@ impl Runtime { /// complete execution by calling `block_on` or `run`. pub fn block_on(&mut self, f: F) -> F::Output where - F: Future + 'static, + F: Future, { self.local.block_on(&mut self.rt, f) } diff --git a/actix-rt/tests/integration_tests.rs b/actix-rt/tests/integration_tests.rs index 8e775bab..e3296e89 100644 --- a/actix-rt/tests/integration_tests.rs +++ b/actix-rt/tests/integration_tests.rs @@ -1,19 +1,5 @@ use std::time::{Duration, Instant}; -#[test] -fn start_and_stop() { - actix_rt::System::new("start_and_stop").block_on(async move { - assert!( - actix_rt::Arbiter::is_running(), - "System doesn't seem to have started" - ); - }); - assert!( - !actix_rt::Arbiter::is_running(), - "System doesn't seem to have stopped" - ); -} - #[test] fn await_for_timer() { let time = Duration::from_secs(2); @@ -75,40 +61,3 @@ fn join_another_arbiter() { "Premature stop of arbiter should conclude regardless of it's current state" ); } - -#[test] -fn join_current_arbiter() { - let time = Duration::from_secs(2); - - let instant = Instant::now(); - actix_rt::System::new("test_join_current_arbiter").block_on(async move { - actix_rt::spawn(async move { - tokio::time::delay_for(time).await; - actix_rt::Arbiter::current().stop(); - }); - actix_rt::Arbiter::local_join().await; - }); - assert!( - instant.elapsed() >= time, - "Join on current arbiter should wait for all spawned futures" - ); - - let large_timer = Duration::from_secs(20); - let instant = Instant::now(); - actix_rt::System::new("test_join_current_arbiter").block_on(async move { - actix_rt::spawn(async move { - tokio::time::delay_for(time).await; - actix_rt::Arbiter::current().stop(); - }); - let f = actix_rt::Arbiter::local_join(); - actix_rt::spawn(async move { - tokio::time::delay_for(large_timer).await; - actix_rt::Arbiter::current().stop(); - }); - f.await; - }); - assert!( - instant.elapsed() < large_timer, - "local_join should await only for the already spawned futures" - ); -} diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index ce309c94..e62953c7 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -22,13 +22,16 @@ fn test_bind() { let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let sys = actix_rt::System::new("test"); - let srv = Server::build() - .workers(1) - .disable_signals() - .bind("test", addr, move || fn_service(|_| ok::<_, ()>(()))) - .unwrap() - .start(); + let mut sys = actix_rt::System::new("test"); + + let srv = sys.block_on(async { + Server::build() + .workers(1) + .disable_signals() + .bind("test", addr, move || fn_service(|_| ok::<_, ()>(()))) + .unwrap() + .start() + }); let _ = tx.send((srv, actix_rt::System::current())); let _ = sys.run(); }); @@ -46,14 +49,16 @@ fn test_listen() { let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let sys = actix_rt::System::new("test"); + let mut sys = actix_rt::System::new("test"); let lst = net::TcpListener::bind(addr).unwrap(); - Server::build() - .disable_signals() - .workers(1) - .listen("test", lst, move || fn_service(|_| ok::<_, ()>(()))) - .unwrap() - .start(); + sys.block_on(async { + Server::build() + .disable_signals() + .workers(1) + .listen("test", lst, move || fn_service(|_| ok::<_, ()>(()))) + .unwrap() + .start() + }); let _ = tx.send(actix_rt::System::current()); let _ = sys.run(); }); @@ -78,19 +83,21 @@ fn test_start() { let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let sys = actix_rt::System::new("test"); - let srv: Server = Server::build() - .backlog(100) - .disable_signals() - .bind("test", addr, move || { - fn_service(|io: TcpStream| async move { - let mut f = Framed::new(io, BytesCodec); - f.send(Bytes::from_static(b"test")).await.unwrap(); - Ok::<_, ()>(()) + let mut sys = actix_rt::System::new("test"); + let srv = sys.block_on(async { + Server::build() + .backlog(100) + .disable_signals() + .bind("test", addr, move || { + fn_service(|io: TcpStream| async move { + let mut f = Framed::new(io, BytesCodec); + f.send(Bytes::from_static(b"test")).await.unwrap(); + Ok::<_, ()>(()) + }) }) - }) - .unwrap() - .start(); + .unwrap() + .start() + }); let _ = tx.send((srv, actix_rt::System::current())); let _ = sys.run(); @@ -144,29 +151,31 @@ fn test_configure() { let h = thread::spawn(move || { let num = num2.clone(); - let sys = actix_rt::System::new("test"); - let srv = Server::build() - .disable_signals() - .configure(move |cfg| { - let num = num.clone(); - let lst = net::TcpListener::bind(addr3).unwrap(); - cfg.bind("addr1", addr1) - .unwrap() - .bind("addr2", addr2) - .unwrap() - .listen("addr3", lst) - .apply(move |rt| { - let num = num.clone(); - rt.service("addr1", fn_service(|_| ok::<_, ()>(()))); - rt.service("addr3", fn_service(|_| ok::<_, ()>(()))); - rt.on_start(lazy(move |_| { - let _ = num.fetch_add(1, Relaxed); - })) - }) - }) - .unwrap() - .workers(1) - .start(); + let mut sys = actix_rt::System::new("test"); + let srv = sys.block_on(async { + Server::build() + .disable_signals() + .configure(move |cfg| { + let num = num.clone(); + let lst = net::TcpListener::bind(addr3).unwrap(); + cfg.bind("addr1", addr1) + .unwrap() + .bind("addr2", addr2) + .unwrap() + .listen("addr3", lst) + .apply(move |rt| { + let num = num.clone(); + rt.service("addr1", fn_service(|_| ok::<_, ()>(()))); + rt.service("addr3", fn_service(|_| ok::<_, ()>(()))); + rt.on_start(lazy(move |_| { + let _ = num.fetch_add(1, Relaxed); + })) + }) + }) + .unwrap() + .workers(1) + .start() + }); let _ = tx.send((srv, actix_rt::System::current())); let _ = sys.run(); }); diff --git a/actix-testing/src/lib.rs b/actix-testing/src/lib.rs index 3ec81061..4bf7015a 100644 --- a/actix-testing/src/lib.rs +++ b/actix-testing/src/lib.rs @@ -80,15 +80,18 @@ impl TestServer { // run server in separate thread thread::spawn(move || { - let sys = System::new("actix-test-server"); + let mut sys = System::new("actix-test-server"); let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let local_addr = tcp.local_addr().unwrap(); - Server::build() - .listen("test", tcp, factory)? - .workers(1) - .disable_signals() - .start(); + sys.block_on(async { + Server::build() + .listen("test", tcp, factory) + .unwrap() + .workers(1) + .disable_signals() + .start() + }); tx.send((System::current(), local_addr)).unwrap(); sys.run()