deprecated Arbiter::local_join. Commented related code for future removal

This commit is contained in:
fakeshadow 2020-12-27 06:15:45 +08:00
parent d64bc403a8
commit ccf0f08538
3 changed files with 97 additions and 88 deletions

View File

@ -17,10 +17,10 @@ path = "src/lib.rs"
[dependencies] [dependencies]
actix-macros = "0.1.0" actix-macros = "0.1.0"
futures-channel = "0.3.4"
futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] } futures-channel = "0.3.7"
tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] } tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] }
[dev-dependencies] [dev-dependencies]
futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] } futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] }
tokio = { version = "0.2.6", features = ["full"] } tokio = { version = "0.2.6", features = ["full"] }

View File

@ -9,18 +9,23 @@ use std::{fmt, thread};
use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures_channel::oneshot::{channel, Canceled, Sender}; use futures_channel::oneshot::{channel, Canceled, Sender};
use futures_util::stream::FuturesUnordered; // use futures_util::stream::FuturesUnordered;
use tokio::stream::{Stream, StreamExt}; // use tokio::task::JoinHandle;
use tokio::task::{JoinHandle, LocalSet}; // use tokio::stream::StreamExt;
use tokio::stream::Stream;
use tokio::task::LocalSet;
use crate::runtime::Runtime; use crate::runtime::Runtime;
use crate::system::System; use crate::system::System;
thread_local!( thread_local!(
static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None); static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None);
/// stores join handle for spawned async tasks. // TODO: Commented out code are for Arbiter::local_join function.
static HANDLE: RefCell<FuturesUnordered<JoinHandle<()>>> = // It can be safely removed if this function is not used in actix-*.
RefCell::new(FuturesUnordered::new()); //
// /// stores join handle for spawned async tasks.
// static HANDLE: RefCell<FuturesUnordered<JoinHandle<()>>> =
// RefCell::new(FuturesUnordered::new());
static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new()); static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
); );
@ -150,11 +155,12 @@ impl Arbiter {
where where
F: Future<Output = ()> + 'static, F: Future<Output = ()> + 'static,
{ {
HANDLE.with(|handle| { // HANDLE.with(|handle| {
let handle = handle.borrow(); // let handle = handle.borrow();
handle.push(tokio::task::spawn_local(future)); // handle.push(tokio::task::spawn_local(future));
}); // });
let _ = tokio::task::spawn_local(CleanupPending); // let _ = tokio::task::spawn_local(CleanupPending);
let _ = tokio::task::spawn_local(future);
} }
/// Executes a future on the current thread. This does not create a new Arbiter /// Executes a future on the current thread. This does not create a new Arbiter
@ -274,29 +280,31 @@ impl Arbiter {
/// Returns a future that will be completed once all currently spawned futures /// Returns a future that will be completed once all currently spawned futures
/// have completed. /// have completed.
pub fn local_join() -> impl Future<Output = ()> { #[deprecated(since = "1.2.0", note = "Arbiter::local_join function is removed.")]
let handle = HANDLE.with(|fut| std::mem::take(&mut *fut.borrow_mut())); pub async fn local_join() {
async move { // let handle = HANDLE.with(|fut| std::mem::take(&mut *fut.borrow_mut()));
handle.collect::<Vec<_>>().await; // async move {
} // handle.collect::<Vec<_>>().await;
// }
unimplemented!("Arbiter::local_join function is removed.")
} }
} }
/// Future used for cleaning-up already finished `JoinHandle`s // /// Future used for cleaning-up already finished `JoinHandle`s
/// from the `PENDING` list so the vector doesn't grow indefinitely // /// from the `PENDING` list so the vector doesn't grow indefinitely
struct CleanupPending; // struct CleanupPending;
//
impl Future for CleanupPending { // impl Future for CleanupPending {
type Output = (); // type Output = ();
//
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
HANDLE.with(move |handle| { // HANDLE.with(move |handle| {
recycle_join_handle(&mut *handle.borrow_mut(), cx); // recycle_join_handle(&mut *handle.borrow_mut(), cx);
}); // });
//
Poll::Ready(()) // Poll::Ready(())
} // }
} // }
struct ArbiterController { struct ArbiterController {
rx: UnboundedReceiver<ArbiterCommand>, rx: UnboundedReceiver<ArbiterCommand>,
@ -325,11 +333,12 @@ impl Future for ArbiterController {
Poll::Ready(Some(item)) => match item { Poll::Ready(Some(item)) => match item {
ArbiterCommand::Stop => return Poll::Ready(()), ArbiterCommand::Stop => return Poll::Ready(()),
ArbiterCommand::Execute(fut) => { ArbiterCommand::Execute(fut) => {
HANDLE.with(|handle| { // HANDLE.with(|handle| {
let mut handle = handle.borrow_mut(); // let mut handle = handle.borrow_mut();
handle.push(tokio::task::spawn_local(fut)); // handle.push(tokio::task::spawn_local(fut));
recycle_join_handle(&mut *handle, cx); // recycle_join_handle(&mut *handle, cx);
}); // });
tokio::task::spawn_local(fut);
} }
ArbiterCommand::ExecuteFn(f) => { ArbiterCommand::ExecuteFn(f) => {
f.call_box(); f.call_box();
@ -341,19 +350,19 @@ impl Future for ArbiterController {
} }
} }
fn recycle_join_handle(handle: &mut FuturesUnordered<JoinHandle<()>>, cx: &mut Context<'_>) { // fn recycle_join_handle(handle: &mut FuturesUnordered<JoinHandle<()>>, cx: &mut Context<'_>) {
let _ = Pin::new(&mut *handle).poll_next(cx); // let _ = Pin::new(&mut *handle).poll_next(cx);
//
// Try to recycle more join handles and free up memory. // // Try to recycle more join handles and free up memory.
// // //
// this is a guess. The yield limit for FuturesUnordered is 32. // // this is a guess. The yield limit for FuturesUnordered is 32.
// So poll an extra 3 times would make the total poll below 128. // // So poll an extra 3 times would make the total poll below 128.
if handle.len() > 64 { // if handle.len() > 64 {
(0..3).for_each(|_| { // (0..3).for_each(|_| {
let _ = Pin::new(&mut *handle).poll_next(cx); // let _ = Pin::new(&mut *handle).poll_next(cx);
}) // })
} // }
} // }
#[derive(Debug)] #[derive(Debug)]
pub(crate) enum SystemCommand { pub(crate) enum SystemCommand {

View File

@ -62,42 +62,42 @@ fn join_another_arbiter() {
); );
} }
#[test] // #[test]
fn join_current_arbiter() { // fn join_current_arbiter() {
let time = Duration::from_secs(2); // let time = Duration::from_secs(2);
//
let instant = Instant::now(); // let instant = Instant::now();
actix_rt::System::new("test_join_current_arbiter").block_on(async move { // actix_rt::System::new("test_join_current_arbiter").block_on(async move {
actix_rt::spawn(async move { // actix_rt::spawn(async move {
tokio::time::delay_for(time).await; // tokio::time::delay_for(time).await;
actix_rt::Arbiter::current().stop(); // actix_rt::Arbiter::current().stop();
}); // });
actix_rt::Arbiter::local_join().await; // actix_rt::Arbiter::local_join().await;
}); // });
assert!( // assert!(
instant.elapsed() >= time, // instant.elapsed() >= time,
"Join on current arbiter should wait for all spawned futures" // "Join on current arbiter should wait for all spawned futures"
); // );
//
let large_timer = Duration::from_secs(20); // let large_timer = Duration::from_secs(20);
let instant = Instant::now(); // let instant = Instant::now();
actix_rt::System::new("test_join_current_arbiter").block_on(async move { // actix_rt::System::new("test_join_current_arbiter").block_on(async move {
actix_rt::spawn(async move { // actix_rt::spawn(async move {
tokio::time::delay_for(time).await; // tokio::time::delay_for(time).await;
actix_rt::Arbiter::current().stop(); // actix_rt::Arbiter::current().stop();
}); // });
let f = actix_rt::Arbiter::local_join(); // let f = actix_rt::Arbiter::local_join();
actix_rt::spawn(async move { // actix_rt::spawn(async move {
tokio::time::delay_for(large_timer).await; // tokio::time::delay_for(large_timer).await;
actix_rt::Arbiter::current().stop(); // actix_rt::Arbiter::current().stop();
}); // });
f.await; // f.await;
}); // });
assert!( // assert!(
instant.elapsed() < large_timer, // instant.elapsed() < large_timer,
"local_join should await only for the already spawned futures" // "local_join should await only for the already spawned futures"
); // );
} // }
#[test] #[test]
fn non_static_block_on() { fn non_static_block_on() {