From dca7cad09d2cccb514b29608728312fd815bc2f8 Mon Sep 17 00:00:00 2001 From: Martell Malone <martellmalone@gmail.com> Date: Wed, 27 Nov 2019 23:41:49 -0800 Subject: [PATCH] WIP: Update actix-rt to tokio 0.2 --- actix-rt/src/arbiter.rs | 14 ++- actix-rt/src/builder.rs | 242 ---------------------------------------- actix-rt/src/lib.rs | 25 +---- actix-rt/src/mod.rs | 15 +-- actix-rt/src/runtime.rs | 161 -------------------------- actix-rt/src/system.rs | 46 +++++--- 6 files changed, 50 insertions(+), 453 deletions(-) delete mode 100644 actix-rt/src/builder.rs delete mode 100644 actix-rt/src/runtime.rs diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 0b0ae659..19cdab18 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -9,9 +9,8 @@ use std::{fmt, thread}; use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures::channel::oneshot::{channel, Canceled, Sender}; use futures::{future, Future, FutureExt, Stream}; -use tokio_executor::current_thread::spawn; +use tokio::runtime::{Builder, Runtime}; -use crate::builder::Builder; use crate::system::System; use copyless::BoxHelper; @@ -101,7 +100,7 @@ impl Arbiter { let handle = thread::Builder::new() .name(name.clone()) .spawn(move || { - let mut rt = Builder::new().build_rt().expect("Can not create Runtime"); + let mut rt = Builder::new().build().expect("Can not create Runtime"); let arb = Arbiter::with_sender(arb_tx); let (stop, stop_rx) = channel(); @@ -150,7 +149,8 @@ impl Arbiter { for fut in v.drain(..) { // We pin the boxed future, so it can never again be moved. let fut = unsafe { Pin::new_unchecked(fut) }; - tokio_executor::current_thread::spawn(fut); + // FIXME + //tokio_executor::current_thread::spawn(fut); } }); } @@ -169,7 +169,8 @@ impl Arbiter { RUNNING.with(move |cell| { if cell.get() { // Spawn the future on running executor - spawn(future); + // FIXME + //spawn(future); } else { // Box the future and push it to the queue, this results in double boxing // because the executor boxes the future again, but works for now @@ -325,7 +326,8 @@ impl Future for ArbiterController { return Poll::Ready(()); } ArbiterCommand::Execute(fut) => { - spawn(fut); + // FIXME + //spawn(fut); } ArbiterCommand::ExecuteFn(f) => { f.call_box(); diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs deleted file mode 100644 index c9e4a250..00000000 --- a/actix-rt/src/builder.rs +++ /dev/null @@ -1,242 +0,0 @@ -use std::borrow::Cow; -use std::io; - -use futures::channel::mpsc::unbounded; -use futures::channel::oneshot::{channel, Receiver}; -use futures::future::{lazy, Future, FutureExt}; - -use tokio::runtime::current_thread::Handle; -use tokio_executor::current_thread::CurrentThread; -use tokio::net::driver::Reactor; -use tokio::time::{clock::Clock, timer::Timer}; - -use crate::arbiter::{Arbiter, SystemArbiter}; -use crate::runtime::Runtime; -use crate::system::System; - -/// Builder struct for a actix runtime. -/// -/// Either use `Builder::build` to create a system and start actors. -/// Alternatively, use `Builder::run` to start the tokio runtime and -/// run a function in its context. -pub struct Builder { - /// Name of the System. Defaults to "actix" if unset. - name: Cow<'static, str>, - - /// The clock to use - clock: Clock, - - /// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false. - stop_on_panic: bool, -} - -impl Builder { - pub(crate) fn new() -> Self { - Builder { - name: Cow::Borrowed("actix"), - clock: Clock::new(), - stop_on_panic: false, - } - } - - /// Sets the name of the System. - pub fn name<T: Into<String>>(mut self, name: T) -> Self { - self.name = Cow::Owned(name.into()); - self - } - - /// Set the Clock instance that will be used by this System. - /// - /// Defaults to the system clock. - pub fn clock(mut self, clock: Clock) -> Self { - self.clock = clock; - self - } - - /// Sets the option 'stop_on_panic' which controls whether the System is stopped when an - /// uncaught panic is thrown from a worker thread. - /// - /// Defaults to false. - pub fn stop_on_panic(mut self, stop_on_panic: bool) -> Self { - self.stop_on_panic = stop_on_panic; - self - } - - /// Create new System. - /// - /// This method panics if it can not create tokio runtime - pub fn build(self) -> SystemRunner { - self.create_runtime(|| {}) - } - - /// Create new System that can run asynchronously. - /// - /// This method panics if it cannot start the system arbiter - pub(crate) fn build_async(self, executor: Handle) -> AsyncSystemRunner { - self.create_async_runtime(executor) - } - - /// This function will start tokio runtime and will finish once the - /// `System::stop()` message get called. - /// Function `f` get called within tokio runtime context. - pub fn run<F>(self, f: F) -> io::Result<()> - where - F: FnOnce() + 'static, - { - self.create_runtime(f).run() - } - - fn create_async_runtime(self, executor: Handle) -> AsyncSystemRunner { - let (stop_tx, stop) = channel(); - let (sys_sender, sys_receiver) = unbounded(); - - let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic); - - // system arbiter - let arb = SystemArbiter::new(stop_tx, sys_receiver); - - // start the system arbiter - executor.spawn(arb).expect("could not start system arbiter"); - - AsyncSystemRunner { stop, system } - } - - fn create_runtime<F>(self, f: F) -> SystemRunner - where - F: FnOnce() + 'static, - { - let (stop_tx, stop) = channel(); - let (sys_sender, sys_receiver) = unbounded(); - - let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic); - - // system arbiter - let arb = SystemArbiter::new(stop_tx, sys_receiver); - - let mut rt = self.build_rt().unwrap(); - rt.spawn(arb); - - // init system arbiter and run configuration method - let _ = rt.block_on(lazy(move |_| { - f(); - Ok::<_, ()>(()) - })); - - SystemRunner { rt, stop, system } - } - - pub(crate) fn build_rt(&self) -> io::Result<Runtime> { - // We need a reactor to receive events about IO objects from kernel - let reactor = Reactor::new()?; - let reactor_handle = reactor.handle(); - - // Place a timer wheel on top of the reactor. If there are no timeouts to fire, it'll let the - // reactor pick up some new external events. - let timer = Timer::new_with_now(reactor, self.clock.clone()); - let timer_handle = timer.handle(); - - // And now put a single-threaded executor on top of the timer. When there are no futures ready - // to do something, it'll let the timer or the reactor to generate some new stimuli for the - // futures to continue in their life. - let executor = CurrentThread::new_with_park(timer); - - Ok(Runtime::new2( - reactor_handle, - timer_handle, - self.clock.clone(), - executor, - )) - } -} - -#[derive(Debug)] -pub(crate) struct AsyncSystemRunner { - stop: Receiver<i32>, - system: System, -} - -impl AsyncSystemRunner { - /// This function will start event loop and returns a future that - /// resolves once the `System::stop()` function is called. - pub(crate) fn run_nonblocking(self) -> impl Future<Output = Result<(), io::Error>> + Send { - let AsyncSystemRunner { stop, .. } = self; - - // run loop - lazy(|_| { - Arbiter::run_system(); - async { - let res = match stop.await { - Ok(code) => { - if code != 0 { - Err(io::Error::new( - io::ErrorKind::Other, - format!("Non-zero exit code: {}", code), - )) - } else { - Ok(()) - } - } - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), - }; - Arbiter::stop_system(); - return res; - } - }) - .flatten() - } -} - -/// Helper object that runs System's event loop -#[must_use = "SystemRunner must be run"] -#[derive(Debug)] -pub struct SystemRunner { - rt: Runtime, - stop: Receiver<i32>, - system: System, -} - -impl SystemRunner { - /// This function will start event loop and will finish once the - /// `System::stop()` function is called. - pub fn run(self) -> io::Result<()> { - let SystemRunner { mut rt, stop, .. } = self; - - // run loop - let _ = rt.block_on(async { - Arbiter::run_system(); - Ok::<_, ()>(()) - }); - let result = match rt.block_on(stop) { - Ok(code) => { - if code != 0 { - Err(io::Error::new( - io::ErrorKind::Other, - format!("Non-zero exit code: {}", code), - )) - } else { - Ok(()) - } - } - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), - }; - Arbiter::stop_system(); - result - } - - /// Execute a future and wait for result. - pub fn block_on<F, O>(&mut self, fut: F) -> O - where - F: Future<Output = O>, - { - self.rt.block_on(async { - Arbiter::run_system(); - }); - - let res = self.rt.block_on(fut); - self.rt.block_on(async { - Arbiter::stop_system(); - }); - - res - } -} diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index c1faaca9..ca959928 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -4,13 +4,9 @@ pub use actix_macros::{main, test}; mod arbiter; -mod builder; -mod runtime; mod system; pub use self::arbiter::Arbiter; -pub use self::builder::{Builder, SystemRunner}; -pub use self::runtime::Runtime; pub use self::system::System; #[doc(hidden)] @@ -34,21 +30,8 @@ where /// Utilities for tracking time. pub mod time { - use std::time::{Duration, Instant}; - - pub use tokio_timer::Interval; - pub use tokio_timer::{delay, delay_for, Delay}; - pub use tokio_timer::{timeout, Timeout}; - - /// Creates new `Interval` that yields with interval of `duration`. The first - /// tick completes immediately. - pub fn interval(duration: Duration) -> Interval { - Interval::new(Instant::now(), duration) - } - - /// Creates new `Interval` that yields with interval of `period` with the - /// first tick completing at `at`. - pub fn interval_at(start: Instant, duration: Duration) -> Interval { - Interval::new(start, duration) - } + pub use tokio::time::{Duration, Instant}; + pub use tokio::time::{delay_for, delay_until, Delay}; + pub use tokio::time::{interval, interval_at, Interval}; + pub use tokio::time::{timeout, Timeout}; } diff --git a/actix-rt/src/mod.rs b/actix-rt/src/mod.rs index dca41711..673181c3 100644 --- a/actix-rt/src/mod.rs +++ b/actix-rt/src/mod.rs @@ -66,15 +66,8 @@ //! [executor]: https://tokio.rs/docs/getting-started/runtime-model/#executors //! [timer]: ../../timer/index.html -mod builder; -mod runtime; - -pub use self::builder::Builder; -pub use self::runtime::{Runtime, Handle}; -pub use tokio_current_thread::spawn; -pub use tokio_current_thread::TaskExecutor; - use futures::Future; +use tokio::runtime; /// Run the provided future to completion using a runtime running on the current thread. /// @@ -85,7 +78,11 @@ pub fn block_on_all<F>(future: F) -> Result<F::Item, F::Error> where F: Future, { - let mut r = Runtime::new().expect("failed to start runtime on current thread"); + let mut r = runtime::Builder::new() + .basic_scheduler() + .enable_io() + .build() + .expect("failed to start runtime on current thread"); let v = r.block_on(future)?; r.run().expect("failed to resolve remaining futures"); Ok(v) diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs deleted file mode 100644 index 3788cb81..00000000 --- a/actix-rt/src/runtime.rs +++ /dev/null @@ -1,161 +0,0 @@ -use std::error::Error; -use std::{fmt, io}; - -use futures::Future; -use tokio_executor::current_thread::{self, CurrentThread}; -use tokio::net::driver::{Handle as ReactorHandle, Reactor}; -use tokio_timer::{ - clock::Clock, - timer::{self, Timer}, -}; - -use crate::builder::Builder; - -/// Single-threaded runtime provides a way to start reactor -/// and executor on the current thread. -/// -/// See [module level][mod] documentation for more details. -/// -/// [mod]: index.html -#[derive(Debug)] -pub struct Runtime { - reactor_handle: ReactorHandle, - timer_handle: timer::Handle, - clock: Clock, - executor: CurrentThread<Timer<Reactor>>, -} - -/// Error returned by the `run` function. -#[derive(Debug)] -pub struct RunError { - inner: current_thread::RunError, -} - -impl fmt::Display for RunError { - fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - write!(fmt, "{}", self.inner) - } -} - -impl Error for RunError { - fn description(&self) -> &str { - self.inner.description() - } - fn cause(&self) -> Option<&dyn Error> { - self.inner.source() - } -} - -impl Runtime { - #[allow(clippy::new_ret_no_self)] - /// Returns a new runtime initialized with default configuration values. - pub fn new() -> io::Result<Runtime> { - Builder::new().build_rt() - } - - pub(super) fn new2( - reactor_handle: ReactorHandle, - timer_handle: timer::Handle, - clock: Clock, - executor: CurrentThread<Timer<Reactor>>, - ) -> Runtime { - Runtime { - reactor_handle, - timer_handle, - clock, - executor, - } - } - - /// Spawn a future onto the single-threaded Tokio runtime. - /// - /// See [module level][mod] documentation for more details. - /// - /// [mod]: index.html - /// - /// # Examples - /// - /// ```rust - /// # use futures::{future, Future, Stream}; - /// use actix_rt::Runtime; - /// - /// # fn dox() { - /// // Create the runtime - /// let mut rt = Runtime::new().unwrap(); - /// - /// // Spawn a future onto the runtime - /// rt.spawn(future::lazy(|_| { - /// println!("running on the runtime"); - /// })); - /// # } - /// # pub fn main() {} - /// ``` - /// - /// # Panics - /// - /// This function panics if the spawn fails. Failure occurs if the executor - /// is currently at capacity and is unable to spawn a new future. - pub fn spawn<F>(&mut self, future: F) -> &mut Self - where - F: Future<Output = ()> + 'static, - { - self.executor.spawn(future); - self - } - - /// Runs the provided future, blocking the current thread until the future - /// completes. - /// - /// This function can be used to synchronously block the current thread - /// until the provided `future` has resolved either successfully or with an - /// error. The result of the future is then returned from this function - /// call. - /// - /// Note that this function will **also** execute any spawned futures on the - /// current thread, but will **not** block until these other spawned futures - /// have completed. Once the function returns, any uncompleted futures - /// remain pending in the `Runtime` instance. These futures will not run - /// until `block_on` or `run` is called again. - /// - /// The caller is responsible for ensuring that other spawned futures - /// complete execution by calling `block_on` or `run`. - pub fn block_on<F>(&mut self, f: F) -> F::Output - where - F: Future, - { - self.enter(|executor| { - // Run the provided future - executor.block_on(f) - }) - } - - /// Run the executor to completion, blocking the thread until **all** - /// spawned futures have completed. - pub fn run(&mut self) -> Result<(), RunError> { - self.enter(|executor| executor.run()) - .map_err(|e| RunError { inner: e }) - } - - fn enter<F, R>(&mut self, f: F) -> R - where - F: FnOnce(&mut CurrentThread<Timer<Reactor>>) -> R, - { - let Runtime { - ref reactor_handle, - ref timer_handle, - ref clock, - ref mut executor, - .. - } = *self; - - // WARN: We do not enter the executor here, since in tokio 0.2 the executor is entered - // automatically inside its `block_on` and `run` methods - tokio_executor::with_default(&mut current_thread::TaskExecutor::current(), || { - tokio_timer::clock::with_default(clock, || { - let _reactor_guard = tokio::net::driver::set_default(reactor_handle); - let _timer_guard = tokio_timer::set_default(timer_handle); - f(executor) - }) - }) - } -} diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 7f643095..de076f94 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -4,10 +4,10 @@ use std::io; use std::sync::atomic::{AtomicUsize, Ordering}; use futures::channel::mpsc::UnboundedSender; -use tokio::runtime::current_thread::Handle; +use tokio::runtime::{Builder, Runtime}; use crate::arbiter::{Arbiter, SystemCommand}; -use crate::builder::{Builder, SystemRunner}; + static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); @@ -53,23 +53,23 @@ impl System { /// Create new system. /// /// This method panics if it can not create tokio runtime - pub fn new<T: Into<String>>(name: T) -> SystemRunner { - Self::builder().name(name).build() + pub fn new<T: Into<String>>(name: T) -> Runtime { + Self::builder().thread_name(name).build().unwrap() } #[allow(clippy::new_ret_no_self)] /// Create new system using provided CurrentThread Handle. /// /// This method panics if it can not spawn system arbiter - pub fn run_in_executor<T: Into<String>>( - name: T, - executor: Handle, - ) -> impl Future<Output = Result<(), io::Error>> + Send { - Self::builder() - .name(name) - .build_async(executor) - .run_nonblocking() - } + // pub fn run_in_executor<T: Into<String>>( + // name: T, + // executor: Handle, + // ) -> impl Future<Output = Result<(), io::Error>> + Send { + // Self::builder() + // .name(name) + // .build_async(executor) + // .run_nonblocking() + // } /// Get current running system. pub fn current() -> System { @@ -140,6 +140,24 @@ impl System { where F: FnOnce() + 'static, { - Self::builder().run(f) + // let (stop_tx, stop) = channel(); + // let (sys_sender, sys_receiver) = unbounded(); + + // let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic); + + // // system arbiter + // let arb = SystemArbiter::new(stop_tx, sys_receiver); + + // let mut rt = self.build_rt().unwrap(); + // rt.spawn(arb); + + // // init system arbiter and run configuration method + // let _ = rt.block_on(lazy(move |_| { + // f(); + // Ok::<_, ()>(()) + // })); + // FIXME + Ok(()) + //Self::builder().run(f) } }