diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 20f147d8..f3413b17 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -60,19 +60,19 @@ impl Default for Arbiter { } impl Arbiter { - pub(crate) fn new_system(exec: &mut E::Executor) -> Self { + pub(crate) fn new_system(exec: &mut Exec::Executor) -> Self { let (tx, rx) = unbounded(); let arb = Arbiter::with_sender(tx); ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone())); STORAGE.with(|cell| cell.borrow_mut().clear()); - let controller: ArbiterController = ArbiterController { + let controller = ArbiterController:: { rx, _exec: Default::default(), }; - E::spawn_ref(exec, controller); + Exec::spawn_on(exec, controller); arb } @@ -285,7 +285,7 @@ impl Drop for ArbiterController { } } -impl Future for ArbiterController { +impl Future for ArbiterController { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -295,7 +295,7 @@ impl Future for ArbiterController { Poll::Ready(Some(item)) => match item { ArbiterCommand::Stop => return Poll::Ready(()), ArbiterCommand::Execute(fut) => { - E::spawn(fut); + Exec::spawn(fut); } ArbiterCommand::ExecuteFn(f) => { f.call_box(); diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index c1a273f4..46aef974 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -15,21 +15,21 @@ use crate::system::System; /// Either use `Builder::build` to create a system and start actors. /// Alternatively, use `Builder::run` to start the tokio runtime and /// run a function in its context. -pub struct Builder { +pub struct Builder { /// Name of the System. Defaults to "actix" if unset. name: Cow<'static, str>, /// Whether the Arbiter will stop the whole System on uncaught panic. Defaults to false. stop_on_panic: bool, - exec: PhantomData, + _exec: PhantomData, } -impl Builder { - pub(crate) fn new() -> Builder { +impl Builder { + pub(crate) fn new() -> Self { Builder { name: Cow::Borrowed("actix"), stop_on_panic: false, - exec: PhantomData, + _exec: PhantomData, } } @@ -51,7 +51,7 @@ impl Builder { /// Create new System. /// /// This method panics if it can not create tokio runtime - pub fn build(self) -> SystemRunner { + pub fn build(self) -> SystemRunner { self.create_runtime(|| {}) } @@ -65,8 +65,8 @@ impl Builder { self.create_runtime(f).run() } - /// Create runtime with a given instance of type that impl `ExecFactory::Executor` trait. - pub fn create_with_runtime(self, mut rt: E::Executor, f: F) -> SystemRunner + /// Create runtime with a given instance of `ExecFactory::Executor` type. + pub fn create_with_runtime(self, mut rt: Exec::Executor, f: F) -> SystemRunner where F: FnOnce() + 'static, { @@ -75,26 +75,26 @@ impl Builder { let system = System::construct( sys_sender, - Arbiter::new_system::(&mut rt), + Arbiter::new_system::(&mut rt), self.stop_on_panic, ); // system arbiter let arb = SystemArbiter::new(stop_tx, sys_receiver); - E::spawn_ref(&mut rt, arb); + Exec::spawn_on(&mut rt, arb); // init system arbiter and run configuration method - E::block_on(&mut rt, async { f() }); + Exec::block_on(&mut rt, async { f() }); SystemRunner { rt, stop, system } } - fn create_runtime(self, f: F) -> SystemRunner + fn create_runtime(self, f: F) -> SystemRunner where F: FnOnce() + 'static, { - let rt = E::build().unwrap(); + let rt = Exec::build().unwrap(); self.create_with_runtime(rt, f) } } @@ -102,20 +102,20 @@ impl Builder { /// Helper object that runs System's event loop #[must_use = "SystemRunner must be run"] #[derive(Debug)] -pub struct SystemRunner { - rt: E::Executor, +pub struct SystemRunner { + rt: Exec::Executor, stop: Receiver, system: System, } -impl SystemRunner { +impl SystemRunner { /// This function will start event loop and will finish once the /// `System::stop()` function is called. pub fn run(self) -> io::Result<()> { let SystemRunner { mut rt, stop, .. } = self; // run loop - match E::block_on(&mut rt, stop) { + match Exec::block_on(&mut rt, stop) { Ok(code) => { if code != 0 { Err(io::Error::new( @@ -130,11 +130,12 @@ impl SystemRunner { } } + /// Spawn a future on the system arbiter. pub fn spawn(&mut self, fut: F) where F: Future + 'static, { - E::spawn_ref(&mut self.rt, fut); + Exec::spawn_on(&mut self.rt, fut); } /// Execute a future and wait for result. @@ -142,6 +143,6 @@ impl SystemRunner { where F: Future, { - E::block_on(&mut self.rt, fut) + Exec::block_on(&mut self.rt, fut) } } diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index a79ec46b..fcc96e86 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -32,41 +32,23 @@ pub trait ExecFactory: Sized + Send + Sync + Unpin + 'static { /// complete execution by calling `block_on` or `run`. fn block_on(exec: &mut Self::Executor, f: F) -> F::Output; - /// Spawn a future onto the single-threaded runtime without reference it. + /// Spawn a future onto an executor without reference it. /// /// See [module level][mod] documentation for more details. /// /// [mod]: index.html /// - /// # Examples - /// - /// ```rust,ignore - /// # use futures::{future, Future, Stream}; - /// use actix_rt::Runtime; - /// - /// # fn dox() { - /// // Create the runtime - /// let mut rt = Runtime::new().unwrap(); - /// - /// // Spawn a future onto the runtime - /// rt.spawn(future::lazy(|_| { - /// println!("running on the runtime"); - /// })); - /// # } - /// # pub fn main() {} - /// ``` - /// /// # Panics /// /// This function panics if the spawn fails. Failure occurs if the executor /// is currently at capacity and is unable to spawn a new future. fn spawn + 'static>(f: F); - /// Spawn a future onto the single-threaded runtime reference. Useful when you have direct + /// Spawn a future onto an executor reference. Useful when you have direct /// access to executor. /// - /// *. `spawn_ref` is preferred when you can choose between it and `spawn`. - fn spawn_ref + 'static>(exec: &mut Self::Executor, f: F); + /// *. `spawn_on` is preferred when you can choose between it and `spawn`. + fn spawn_on + 'static>(exec: &mut Self::Executor, f: F); /// Get a timeout sleep future with given duration. fn sleep(dur: Duration) -> Self::Sleep; @@ -105,7 +87,7 @@ impl ExecFactory for ActixExec { tokio::task::spawn_local(f); } - fn spawn_ref + 'static>(exec: &mut Self::Executor, f: F) { + fn spawn_on + 'static>(exec: &mut Self::Executor, f: F) { exec.1.spawn_local(f); } diff --git a/actix-rt/tests/integration_tests.rs b/actix-rt/tests/integration_tests.rs index dcf70f25..e7e072ac 100644 --- a/actix-rt/tests/integration_tests.rs +++ b/actix-rt/tests/integration_tests.rs @@ -92,7 +92,7 @@ impl ExecFactory for TokioCompatExec { .unwrap(); } - fn spawn_ref + 'static>(exec: &mut Self::Executor, f: F) { + fn spawn_on + 'static>(exec: &mut Self::Executor, f: F) { exec.spawn_std(f); } @@ -103,27 +103,26 @@ impl ExecFactory for TokioCompatExec { #[test] fn tokio_compat() -> std::io::Result<()> { - // manually construct a compat executor. - let rt = TokioCompatExec::build()?; + let exec = TokioCompatExec::build()?; - // do some work with rt and pass it to builder actix_rt::System::builder::() - .create_with_runtime(rt, || {}) + .create_with_runtime(exec, || {}) .block_on(async { let (tx, rx) = tokio::sync::oneshot::channel(); - tokio_01::spawn(futures_01::lazy(|| { - tx.send(251).unwrap(); - Ok(()) - })); - + use futures_01::Future; + tokio_01::spawn( + tokio_01::timer::Delay::new(Instant::now() + Duration::from_millis(1)) + .map_err(|e| panic!("tokio 0.1 timer error: {}", e)) + .map(|_| tx.send(251).unwrap()), + ); + TokioCompatExec::sleep(Duration::from_millis(1)).await; assert_eq!(251, rx.await.unwrap()); }); - // let the system construct the executor and block on it directly. actix_rt::System::new_with::("compat").block_on(async { let (tx, rx) = tokio::sync::oneshot::channel(); tokio::spawn(async move { - tokio::time::delay_for(Duration::from_secs(1)).await; + TokioCompatExec::sleep(Duration::from_millis(1)).await; tx.send(996).unwrap(); }); assert_eq!(996, rx.await.unwrap()); diff --git a/actix-server/examples/custom_executor.rs b/actix-server/examples/custom_executor.rs index 2aac5690..6f4f5ee0 100644 --- a/actix-server/examples/custom_executor.rs +++ b/actix-server/examples/custom_executor.rs @@ -107,7 +107,7 @@ impl FromStream for AsyncStdTcpStream { } } -// impl trait for custom executor so server can/block_on spawn tasks +// impl trait for custom executor so server can block_on/spawn tasks impl ExecFactory for AsyncStdExec { type Executor = (); type Sleep = Pin + Send + 'static>>; @@ -124,7 +124,7 @@ impl ExecFactory for AsyncStdExec { async_std::task::spawn_local(f); } - fn spawn_ref + 'static>(_: &mut Self::Executor, f: F) { + fn spawn_on + 'static>(_: &mut Self::Executor, f: F) { async_std::task::spawn_local(f); } diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 03b15b54..5df4a9e7 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -44,7 +44,7 @@ pub struct ServerBuilder { impl Default for ServerBuilder { fn default() -> Self { - Self::new() + ServerBuilder::new() } } @@ -52,16 +52,8 @@ impl ServerBuilder where Exec: ExecFactory, { - /// Create new Server builder instance with default tokio executor. - pub fn new() -> Self { - ServerBuilder::::new_with() - } - /// Create new Server builder instance with a generic executor. - pub fn new_with() -> ServerBuilder - where - E: ExecFactory, - { + pub fn new() -> Self { let (tx, rx) = unbounded(); let server = Server::new(tx); diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index 54930529..eeef8a77 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -44,7 +44,7 @@ impl Server { /// Start server building process with a custom executor pub fn build_with() -> ServerBuilder { - ServerBuilder::::new_with() + ServerBuilder::::new() } pub(crate) fn signal(&self, sig: Signal) {