From 048314913cfd9bd22d40bddd2f84e6971fa7db5b Mon Sep 17 00:00:00 2001
From: George Hahn <george.hahn@smartthings.com>
Date: Thu, 23 May 2019 13:34:47 -0500
Subject: [PATCH] Enable System to be executed on an external CurrentThread
 runtime

---
 actix-rt/src/builder.rs | 68 ++++++++++++++++++++++++++++++++++++++++-
 actix-rt/src/system.rs  | 11 ++++++-
 2 files changed, 77 insertions(+), 2 deletions(-)

diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs
index 49e8c1f5..ef33528f 100644
--- a/actix-rt/src/builder.rs
+++ b/actix-rt/src/builder.rs
@@ -1,11 +1,12 @@
 use std::borrow::Cow;
 use std::io;
 
+use futures::future;
 use futures::future::{lazy, Future};
 use futures::sync::mpsc::unbounded;
 use futures::sync::oneshot::{channel, Receiver};
 
-use tokio_current_thread::CurrentThread;
+use tokio_current_thread::{CurrentThread, Handle};
 use tokio_reactor::Reactor;
 use tokio_timer::clock::Clock;
 use tokio_timer::timer::Timer;
@@ -69,6 +70,13 @@ impl Builder {
         self.create_runtime(|| {})
     }
 
+    /// Create new System that can run asynchronously.
+    ///
+    /// This method panics if it cannot start the system arbiter
+    pub fn build_async(self, executor: Handle) -> AsyncSystemRunner {
+        self.create_async_runtime(executor)
+    }
+
     /// This function will start tokio runtime and will finish once the
     /// `System::stop()` message get called.
     /// Function `f` get called within tokio runtime context.
@@ -79,6 +87,22 @@ impl Builder {
         self.create_runtime(f).run()
     }
 
+    fn create_async_runtime(self, executor: Handle) -> AsyncSystemRunner
+    {
+        let (stop_tx, stop) = channel();
+        let (sys_sender, sys_receiver) = unbounded();
+
+        let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic);
+
+        // system arbiter
+        let arb = SystemArbiter::new(stop_tx, sys_receiver);
+
+        // start the system arbiter
+        executor.spawn(arb).expect("could not start system arbiter");
+
+        AsyncSystemRunner { stop, system }
+    }
+
     fn create_runtime<F>(self, f: F) -> SystemRunner
     where
         F: FnOnce() + 'static,
@@ -127,6 +151,48 @@ impl Builder {
     }
 }
 
+#[derive(Debug)]
+pub struct AsyncSystemRunner {
+    stop: Receiver<i32>,
+    system: System,
+}
+
+impl AsyncSystemRunner {
+    /// This function will start event loop and returns a future that
+    /// resolves once the `System::stop()` function is called.
+    pub fn run_nonblocking(self) -> Box<Future<Item = (), Error = io::Error> + Send + 'static> {
+        let AsyncSystemRunner { stop, .. } = self;
+
+        // run loop
+        Box::new(future::ok(())
+            .and_then(|_| {
+                Arbiter::run_system();
+                future::ok(())
+            }).
+            and_then(|_| {
+                stop.then(|res| {
+                    match res {
+                        Ok(code) => {
+                            if code != 0 {
+                                Err(io::Error::new(
+                                    io::ErrorKind::Other,
+                                    format!("Non-zero exit code: {}", code),
+                                ))
+                            } else {
+                                Ok(())
+                            }
+                        }
+                        Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
+                    }
+                })
+            }).then(|result| {
+                Arbiter::stop_system();
+                result
+            })
+        )
+    }
+}
+
 /// Helper object that runs System's event loop
 #[must_use = "SystemRunner must be run"]
 #[derive(Debug)]
diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs
index aaf15c7c..7bead2a3 100644
--- a/actix-rt/src/system.rs
+++ b/actix-rt/src/system.rs
@@ -2,10 +2,11 @@ use std::cell::RefCell;
 use std::io;
 use std::sync::atomic::{AtomicUsize, Ordering};
 
+use tokio_current_thread::Handle;
 use futures::sync::mpsc::UnboundedSender;
 
 use crate::arbiter::{Arbiter, SystemCommand};
-use crate::builder::{Builder, SystemRunner};
+use crate::builder::{Builder, SystemRunner, AsyncSystemRunner};
 
 static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
 
@@ -55,6 +56,14 @@ impl System {
         Self::builder().name(name).build()
     }
 
+    #[allow(clippy::new_ret_no_self)]
+    /// Create new system.
+    ///
+    /// This method panics if it can not create tokio runtime
+    pub fn new_async<T: Into<String>>(name: T, executor: Handle) -> AsyncSystemRunner {
+        Self::builder().name(name).build_async(executor)
+    }
+
     /// Get current running system.
     pub fn current() -> System {
         CURRENT.with(|cell| match *cell.borrow() {