diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index 051e16a8..57710a7f 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -17,10 +17,10 @@ path = "src/lib.rs" [dependencies] actix-macros = "0.1.0" -futures-channel = "0.3.4" -futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] } + +futures-channel = "0.3.7" tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] } [dev-dependencies] -futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] } +futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] } tokio = { version = "0.2.6", features = ["full"] } diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 4e0cb8e0..3fe81b99 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -9,18 +9,23 @@ use std::{fmt, thread}; use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures_channel::oneshot::{channel, Canceled, Sender}; -use futures_util::stream::FuturesUnordered; -use tokio::stream::{Stream, StreamExt}; -use tokio::task::{JoinHandle, LocalSet}; +// use futures_util::stream::FuturesUnordered; +// use tokio::task::JoinHandle; +// use tokio::stream::StreamExt; +use tokio::stream::Stream; +use tokio::task::LocalSet; use crate::runtime::Runtime; use crate::system::System; thread_local!( static ADDR: RefCell> = RefCell::new(None); - /// stores join handle for spawned async tasks. - static HANDLE: RefCell>> = - RefCell::new(FuturesUnordered::new()); + // TODO: Commented out code are for Arbiter::local_join function. + // It can be safely removed if this function is not used in actix-*. + // + // /// stores join handle for spawned async tasks. + // static HANDLE: RefCell>> = + // RefCell::new(FuturesUnordered::new()); static STORAGE: RefCell>> = RefCell::new(HashMap::new()); ); @@ -150,11 +155,12 @@ impl Arbiter { where F: Future + 'static, { - HANDLE.with(|handle| { - let handle = handle.borrow(); - handle.push(tokio::task::spawn_local(future)); - }); - let _ = tokio::task::spawn_local(CleanupPending); + // HANDLE.with(|handle| { + // let handle = handle.borrow(); + // handle.push(tokio::task::spawn_local(future)); + // }); + // let _ = tokio::task::spawn_local(CleanupPending); + let _ = tokio::task::spawn_local(future); } /// Executes a future on the current thread. This does not create a new Arbiter @@ -274,29 +280,31 @@ impl Arbiter { /// Returns a future that will be completed once all currently spawned futures /// have completed. - pub fn local_join() -> impl Future { - let handle = HANDLE.with(|fut| std::mem::take(&mut *fut.borrow_mut())); - async move { - handle.collect::>().await; - } + #[deprecated(since = "1.2.0", note = "Arbiter::local_join function is removed.")] + pub async fn local_join() { + // let handle = HANDLE.with(|fut| std::mem::take(&mut *fut.borrow_mut())); + // async move { + // handle.collect::>().await; + // } + unimplemented!("Arbiter::local_join function is removed.") } } -/// Future used for cleaning-up already finished `JoinHandle`s -/// from the `PENDING` list so the vector doesn't grow indefinitely -struct CleanupPending; - -impl Future for CleanupPending { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - HANDLE.with(move |handle| { - recycle_join_handle(&mut *handle.borrow_mut(), cx); - }); - - Poll::Ready(()) - } -} +// /// Future used for cleaning-up already finished `JoinHandle`s +// /// from the `PENDING` list so the vector doesn't grow indefinitely +// struct CleanupPending; +// +// impl Future for CleanupPending { +// type Output = (); +// +// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { +// HANDLE.with(move |handle| { +// recycle_join_handle(&mut *handle.borrow_mut(), cx); +// }); +// +// Poll::Ready(()) +// } +// } struct ArbiterController { rx: UnboundedReceiver, @@ -325,11 +333,12 @@ impl Future for ArbiterController { Poll::Ready(Some(item)) => match item { ArbiterCommand::Stop => return Poll::Ready(()), ArbiterCommand::Execute(fut) => { - HANDLE.with(|handle| { - let mut handle = handle.borrow_mut(); - handle.push(tokio::task::spawn_local(fut)); - recycle_join_handle(&mut *handle, cx); - }); + // HANDLE.with(|handle| { + // let mut handle = handle.borrow_mut(); + // handle.push(tokio::task::spawn_local(fut)); + // recycle_join_handle(&mut *handle, cx); + // }); + tokio::task::spawn_local(fut); } ArbiterCommand::ExecuteFn(f) => { f.call_box(); @@ -341,19 +350,19 @@ impl Future for ArbiterController { } } -fn recycle_join_handle(handle: &mut FuturesUnordered>, cx: &mut Context<'_>) { - let _ = Pin::new(&mut *handle).poll_next(cx); - - // Try to recycle more join handles and free up memory. - // - // this is a guess. The yield limit for FuturesUnordered is 32. - // So poll an extra 3 times would make the total poll below 128. - if handle.len() > 64 { - (0..3).for_each(|_| { - let _ = Pin::new(&mut *handle).poll_next(cx); - }) - } -} +// fn recycle_join_handle(handle: &mut FuturesUnordered>, cx: &mut Context<'_>) { +// let _ = Pin::new(&mut *handle).poll_next(cx); +// +// // Try to recycle more join handles and free up memory. +// // +// // this is a guess. The yield limit for FuturesUnordered is 32. +// // So poll an extra 3 times would make the total poll below 128. +// if handle.len() > 64 { +// (0..3).for_each(|_| { +// let _ = Pin::new(&mut *handle).poll_next(cx); +// }) +// } +// } #[derive(Debug)] pub(crate) enum SystemCommand { diff --git a/actix-rt/tests/integration_tests.rs b/actix-rt/tests/integration_tests.rs index 1244e2ed..b3265476 100644 --- a/actix-rt/tests/integration_tests.rs +++ b/actix-rt/tests/integration_tests.rs @@ -62,42 +62,42 @@ fn join_another_arbiter() { ); } -#[test] -fn join_current_arbiter() { - let time = Duration::from_secs(2); - - let instant = Instant::now(); - actix_rt::System::new("test_join_current_arbiter").block_on(async move { - actix_rt::spawn(async move { - tokio::time::delay_for(time).await; - actix_rt::Arbiter::current().stop(); - }); - actix_rt::Arbiter::local_join().await; - }); - assert!( - instant.elapsed() >= time, - "Join on current arbiter should wait for all spawned futures" - ); - - let large_timer = Duration::from_secs(20); - let instant = Instant::now(); - actix_rt::System::new("test_join_current_arbiter").block_on(async move { - actix_rt::spawn(async move { - tokio::time::delay_for(time).await; - actix_rt::Arbiter::current().stop(); - }); - let f = actix_rt::Arbiter::local_join(); - actix_rt::spawn(async move { - tokio::time::delay_for(large_timer).await; - actix_rt::Arbiter::current().stop(); - }); - f.await; - }); - assert!( - instant.elapsed() < large_timer, - "local_join should await only for the already spawned futures" - ); -} +// #[test] +// fn join_current_arbiter() { +// let time = Duration::from_secs(2); +// +// let instant = Instant::now(); +// actix_rt::System::new("test_join_current_arbiter").block_on(async move { +// actix_rt::spawn(async move { +// tokio::time::delay_for(time).await; +// actix_rt::Arbiter::current().stop(); +// }); +// actix_rt::Arbiter::local_join().await; +// }); +// assert!( +// instant.elapsed() >= time, +// "Join on current arbiter should wait for all spawned futures" +// ); +// +// let large_timer = Duration::from_secs(20); +// let instant = Instant::now(); +// actix_rt::System::new("test_join_current_arbiter").block_on(async move { +// actix_rt::spawn(async move { +// tokio::time::delay_for(time).await; +// actix_rt::Arbiter::current().stop(); +// }); +// let f = actix_rt::Arbiter::local_join(); +// actix_rt::spawn(async move { +// tokio::time::delay_for(large_timer).await; +// actix_rt::Arbiter::current().stop(); +// }); +// f.await; +// }); +// assert!( +// instant.elapsed() < large_timer, +// "local_join should await only for the already spawned futures" +// ); +// } #[test] fn non_static_block_on() {