From f1685d82534e41a36ccff783e6af631d81db5247 Mon Sep 17 00:00:00 2001
From: Jonathas-Conceicao <jadoliveira@inf.ufpel.edu.br>
Date: Thu, 6 Feb 2020 00:36:00 -0300
Subject: [PATCH] Add Arbiter::local_join associated function

Arbiter::local_join function can be used to await for futures spawned
on current arbiter.

Signed-off-by: Jonathas-Conceicao <jadoliveira@inf.ufpel.edu.br>
---
 actix-rt/CHANGES.md            |  2 ++
 actix-rt/src/arbiter.rs        | 20 ++++++++++++++++--
 actix-rt/tests/wait_spawned.rs | 37 ++++++++++++++++++++++++++++++++++
 3 files changed, 57 insertions(+), 2 deletions(-)

diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md
index a7792455..a95057c2 100644
--- a/actix-rt/CHANGES.md
+++ b/actix-rt/CHANGES.md
@@ -4,6 +4,8 @@
 
 - Expose `System::is_set` to check if current system is running
 
+- Add `Arbiter::local_join` associated function to get be able to `await` for spawned futures
+
 ## [1.0.0] - 2019-12-11
 
 * Update dependencies
diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs
index 16534476..08d3089c 100644
--- a/actix-rt/src/arbiter.rs
+++ b/actix-rt/src/arbiter.rs
@@ -15,10 +15,13 @@ use crate::system::System;
 
 use copyless::BoxHelper;
 
+pub use tokio::task::JoinHandle;
+
 thread_local!(
     static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None);
     static RUNNING: Cell<bool> = Cell::new(false);
     static Q: RefCell<Vec<Pin<Box<dyn Future<Output = ()>>>>> = RefCell::new(Vec::new());
+    static PENDING: RefCell<Vec<JoinHandle<()>>> = RefCell::new(Vec::new());
     static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
 );
 
@@ -170,7 +173,9 @@ impl Arbiter {
         RUNNING.with(move |cell| {
             if cell.get() {
                 // Spawn the future on running executor
-                tokio::task::spawn_local(future);
+                PENDING.with(move |cell| {
+                    cell.borrow_mut().push(tokio::task::spawn_local(future));
+                })
             } else {
                 // Box the future and push it to the queue, this results in double boxing
                 // because the executor boxes the future again, but works for now
@@ -294,6 +299,15 @@ impl Arbiter {
             Ok(())
         }
     }
+
+    /// Returns a future that will be completed once all currently spawned futures
+    /// have completed.
+    pub fn local_join() -> impl Future<Output = ()> {
+        PENDING.with(move |cell| {
+            let current = cell.replace(Vec::new());
+            future::join_all(current).map(|_| ())
+        })
+    }
 }
 
 struct ArbiterController {
@@ -329,7 +343,9 @@ impl Future for ArbiterController {
                         return Poll::Ready(());
                     }
                     ArbiterCommand::Execute(fut) => {
-                        tokio::task::spawn_local(fut);
+                        PENDING.with(move |cell| {
+                            cell.borrow_mut().push(tokio::task::spawn_local(fut));
+                        });
                     }
                     ArbiterCommand::ExecuteFn(f) => {
                         f.call_box();
diff --git a/actix-rt/tests/wait_spawned.rs b/actix-rt/tests/wait_spawned.rs
index e3296e89..af5d0224 100644
--- a/actix-rt/tests/wait_spawned.rs
+++ b/actix-rt/tests/wait_spawned.rs
@@ -61,3 +61,40 @@ fn join_another_arbiter() {
         "Premature stop of arbiter should conclude regardless of it's current state"
     );
 }
+
+#[test]
+fn join_current_arbiter() {
+    let time = Duration::from_secs(2);
+
+    let instant = Instant::now();
+    actix_rt::System::new("test_join_current_arbiter").block_on(async move {
+        actix_rt::spawn(async move {
+            tokio::time::delay_for(time).await;
+            actix_rt::Arbiter::current().stop();
+        });
+        actix_rt::Arbiter::local_join().await;
+    });
+    assert!(
+        instant.elapsed() >= time,
+        "Join on current arbiter should wait for all spawned futures"
+    );
+
+    let large_timer = Duration::from_secs(20);
+    let instant = Instant::now();
+    actix_rt::System::new("test_join_current_arbiter").block_on(async move {
+        actix_rt::spawn(async move {
+            tokio::time::delay_for(time).await;
+            actix_rt::Arbiter::current().stop();
+        });
+        let f = actix_rt::Arbiter::local_join();
+        actix_rt::spawn(async move {
+            tokio::time::delay_for(large_timer).await;
+            actix_rt::Arbiter::current().stop();
+        });
+        f.await;
+    });
+    assert!(
+        instant.elapsed() < large_timer,
+        "local_join should await only for the already spawned futures"
+    );
+}