mirror of https://github.com/fafhrd91/actix-net
remove RUNNING Q PENDING thread locals from actix-rt
This commit is contained in:
parent
704af672b9
commit
c479243934
|
@ -19,10 +19,8 @@ path = "src/lib.rs"
|
||||||
actix-macros = "0.1.0"
|
actix-macros = "0.1.0"
|
||||||
actix-threadpool = "0.3"
|
actix-threadpool = "0.3"
|
||||||
futures-channel = { version = "0.3.4", default-features = false }
|
futures-channel = { version = "0.3.4", default-features = false }
|
||||||
futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] }
|
|
||||||
copyless = "0.1.4"
|
|
||||||
smallvec = "1"
|
|
||||||
tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] }
|
tokio = { version = "0.2.6", default-features = false, features = ["rt-core", "rt-util", "io-driver", "tcp", "uds", "udp", "time", "signal", "stream"] }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] }
|
||||||
tokio = { version = "0.2.6", features = ["full"] }
|
tokio = { version = "0.2.6", features = ["full"] }
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
use std::any::{Any, TypeId};
|
use std::any::{Any, TypeId};
|
||||||
use std::cell::{Cell, RefCell};
|
use std::cell::RefCell;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
use std::future::Future;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
@ -8,24 +9,14 @@ use std::{fmt, thread};
|
||||||
|
|
||||||
use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
|
use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
|
||||||
use futures_channel::oneshot::{channel, Canceled, Sender};
|
use futures_channel::oneshot::{channel, Canceled, Sender};
|
||||||
use futures_util::{
|
use tokio::stream::Stream;
|
||||||
future::{self, Future, FutureExt},
|
use tokio::task::LocalSet;
|
||||||
stream::Stream,
|
|
||||||
};
|
|
||||||
|
|
||||||
use crate::runtime::Runtime;
|
use crate::runtime::Runtime;
|
||||||
use crate::system::System;
|
use crate::system::System;
|
||||||
|
|
||||||
use copyless::BoxHelper;
|
|
||||||
|
|
||||||
use smallvec::SmallVec;
|
|
||||||
pub use tokio::task::JoinHandle;
|
|
||||||
|
|
||||||
thread_local!(
|
thread_local!(
|
||||||
static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None);
|
static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None);
|
||||||
static RUNNING: Cell<bool> = Cell::new(false);
|
|
||||||
static Q: RefCell<Vec<Pin<Box<dyn Future<Output = ()>>>>> = RefCell::new(Vec::new());
|
|
||||||
static PENDING: RefCell<SmallVec<[JoinHandle<()>; 8]>> = RefCell::new(SmallVec::new());
|
|
||||||
static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
|
static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -69,14 +60,14 @@ impl Default for Arbiter {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Arbiter {
|
impl Arbiter {
|
||||||
pub(crate) fn new_system() -> Self {
|
pub(crate) fn new_system(local: &LocalSet) -> Self {
|
||||||
let (tx, rx) = unbounded();
|
let (tx, rx) = unbounded();
|
||||||
|
|
||||||
let arb = Arbiter::with_sender(tx);
|
let arb = Arbiter::with_sender(tx);
|
||||||
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
|
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
|
||||||
RUNNING.with(|cell| cell.set(false));
|
|
||||||
STORAGE.with(|cell| cell.borrow_mut().clear());
|
STORAGE.with(|cell| cell.borrow_mut().clear());
|
||||||
Arbiter::spawn(ArbiterController { stop: None, rx });
|
|
||||||
|
local.spawn_local(ArbiterController { rx });
|
||||||
|
|
||||||
arb
|
arb
|
||||||
}
|
}
|
||||||
|
@ -91,8 +82,9 @@ impl Arbiter {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if current arbiter is running.
|
/// Check if current arbiter is running.
|
||||||
|
#[deprecated(note = "Thread local variables for running state of Arbiter is removed")]
|
||||||
pub fn is_running() -> bool {
|
pub fn is_running() -> bool {
|
||||||
RUNNING.with(|cell| cell.get())
|
false
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Stop arbiter from continuing it's event loop.
|
/// Stop arbiter from continuing it's event loop.
|
||||||
|
@ -106,72 +98,47 @@ impl Arbiter {
|
||||||
let id = COUNT.fetch_add(1, Ordering::Relaxed);
|
let id = COUNT.fetch_add(1, Ordering::Relaxed);
|
||||||
let name = format!("actix-rt:worker:{}", id);
|
let name = format!("actix-rt:worker:{}", id);
|
||||||
let sys = System::current();
|
let sys = System::current();
|
||||||
let (arb_tx, arb_rx) = unbounded();
|
let (tx, rx) = unbounded();
|
||||||
let arb_tx2 = arb_tx.clone();
|
|
||||||
|
|
||||||
let handle = thread::Builder::new()
|
let handle = thread::Builder::new()
|
||||||
.name(name.clone())
|
.name(name.clone())
|
||||||
.spawn(move || {
|
.spawn({
|
||||||
let mut rt = Runtime::new().expect("Can not create Runtime");
|
let tx = tx.clone();
|
||||||
let arb = Arbiter::with_sender(arb_tx);
|
move || {
|
||||||
|
let mut rt = Runtime::new().expect("Can not create Runtime");
|
||||||
|
let arb = Arbiter::with_sender(tx);
|
||||||
|
|
||||||
let (stop, stop_rx) = channel();
|
STORAGE.with(|cell| cell.borrow_mut().clear());
|
||||||
RUNNING.with(|cell| cell.set(true));
|
|
||||||
STORAGE.with(|cell| cell.borrow_mut().clear());
|
|
||||||
|
|
||||||
System::set_current(sys);
|
System::set_current(sys);
|
||||||
|
|
||||||
// start arbiter controller
|
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
|
||||||
rt.spawn(ArbiterController {
|
|
||||||
stop: Some(stop),
|
|
||||||
rx: arb_rx,
|
|
||||||
});
|
|
||||||
ADDR.with(|cell| *cell.borrow_mut() = Some(arb.clone()));
|
|
||||||
|
|
||||||
// register arbiter
|
// register arbiter
|
||||||
let _ = System::current()
|
let _ = System::current()
|
||||||
.sys()
|
.sys()
|
||||||
.unbounded_send(SystemCommand::RegisterArbiter(id, arb));
|
.unbounded_send(SystemCommand::RegisterArbiter(id, arb));
|
||||||
|
|
||||||
// run loop
|
// start arbiter controller
|
||||||
let _ = match rt.block_on(stop_rx) {
|
// run loop
|
||||||
Ok(code) => code,
|
rt.block_on(ArbiterController { rx });
|
||||||
Err(_) => 1,
|
|
||||||
};
|
|
||||||
|
|
||||||
// unregister arbiter
|
// unregister arbiter
|
||||||
let _ = System::current()
|
let _ = System::current()
|
||||||
.sys()
|
.sys()
|
||||||
.unbounded_send(SystemCommand::UnregisterArbiter(id));
|
.unbounded_send(SystemCommand::UnregisterArbiter(id));
|
||||||
|
}
|
||||||
})
|
})
|
||||||
.unwrap_or_else(|err| {
|
.unwrap_or_else(|err| {
|
||||||
panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err)
|
panic!("Cannot spawn an arbiter's thread {:?}: {:?}", &name, err)
|
||||||
});
|
});
|
||||||
|
|
||||||
Arbiter {
|
Arbiter {
|
||||||
sender: arb_tx2,
|
sender: tx,
|
||||||
thread_handle: Some(handle),
|
thread_handle: Some(handle),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn run_system(rt: Option<&Runtime>) {
|
|
||||||
RUNNING.with(|cell| cell.set(true));
|
|
||||||
Q.with(|cell| {
|
|
||||||
let mut v = cell.borrow_mut();
|
|
||||||
for fut in v.drain(..) {
|
|
||||||
if let Some(rt) = rt {
|
|
||||||
rt.spawn(fut);
|
|
||||||
} else {
|
|
||||||
tokio::task::spawn_local(fut);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn stop_system() {
|
|
||||||
RUNNING.with(|cell| cell.set(false));
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Spawn a future on the current thread. This does not create a new Arbiter
|
/// Spawn a future on the current thread. This does not create a new Arbiter
|
||||||
/// or Arbiter address, it is simply a helper for spawning futures on the current
|
/// or Arbiter address, it is simply a helper for spawning futures on the current
|
||||||
/// thread.
|
/// thread.
|
||||||
|
@ -179,26 +146,7 @@ impl Arbiter {
|
||||||
where
|
where
|
||||||
F: Future<Output = ()> + 'static,
|
F: Future<Output = ()> + 'static,
|
||||||
{
|
{
|
||||||
RUNNING.with(move |cell| {
|
tokio::task::spawn_local(future);
|
||||||
if cell.get() {
|
|
||||||
// Spawn the future on running executor
|
|
||||||
let len = PENDING.with(move |cell| {
|
|
||||||
let mut p = cell.borrow_mut();
|
|
||||||
p.push(tokio::task::spawn_local(future));
|
|
||||||
p.len()
|
|
||||||
});
|
|
||||||
if len > 7 {
|
|
||||||
// Before reaching the inline size
|
|
||||||
tokio::task::spawn_local(CleanupPending);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Box the future and push it to the queue, this results in double boxing
|
|
||||||
// because the executor boxes the future again, but works for now
|
|
||||||
Q.with(move |cell| {
|
|
||||||
cell.borrow_mut().push(Pin::from(Box::alloc().init(future)))
|
|
||||||
});
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Executes a future on the current thread. This does not create a new Arbiter
|
/// Executes a future on the current thread. This does not create a new Arbiter
|
||||||
|
@ -209,7 +157,9 @@ impl Arbiter {
|
||||||
F: FnOnce() -> R + 'static,
|
F: FnOnce() -> R + 'static,
|
||||||
R: Future<Output = ()> + 'static,
|
R: Future<Output = ()> + 'static,
|
||||||
{
|
{
|
||||||
Arbiter::spawn(future::lazy(|_| f()).flatten())
|
Arbiter::spawn(async {
|
||||||
|
f();
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send a future to the Arbiter's thread, and spawn it.
|
/// Send a future to the Arbiter's thread, and spawn it.
|
||||||
|
@ -316,40 +266,13 @@ impl Arbiter {
|
||||||
|
|
||||||
/// Returns a future that will be completed once all currently spawned futures
|
/// Returns a future that will be completed once all currently spawned futures
|
||||||
/// have completed.
|
/// have completed.
|
||||||
pub fn local_join() -> impl Future<Output = ()> {
|
#[deprecated(note = "local_join has been removed")]
|
||||||
PENDING.with(move |cell| {
|
pub async fn local_join() {
|
||||||
let current = cell.replace(SmallVec::new());
|
unimplemented!()
|
||||||
future::join_all(current).map(|_| ())
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Future used for cleaning-up already finished `JoinHandle`s
|
|
||||||
/// from the `PENDING` list so the vector doesn't grow indefinitely
|
|
||||||
struct CleanupPending;
|
|
||||||
|
|
||||||
impl Future for CleanupPending {
|
|
||||||
type Output = ();
|
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
||||||
PENDING.with(move |cell| {
|
|
||||||
let mut pending = cell.borrow_mut();
|
|
||||||
let mut i = 0;
|
|
||||||
while i != pending.len() {
|
|
||||||
if let Poll::Ready(_) = Pin::new(&mut pending[i]).poll(cx) {
|
|
||||||
pending.remove(i);
|
|
||||||
} else {
|
|
||||||
i += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Poll::Ready(())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct ArbiterController {
|
struct ArbiterController {
|
||||||
stop: Option<Sender<i32>>,
|
|
||||||
rx: UnboundedReceiver<ArbiterCommand>,
|
rx: UnboundedReceiver<ArbiterCommand>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -374,22 +297,9 @@ impl Future for ArbiterController {
|
||||||
match Pin::new(&mut self.rx).poll_next(cx) {
|
match Pin::new(&mut self.rx).poll_next(cx) {
|
||||||
Poll::Ready(None) => return Poll::Ready(()),
|
Poll::Ready(None) => return Poll::Ready(()),
|
||||||
Poll::Ready(Some(item)) => match item {
|
Poll::Ready(Some(item)) => match item {
|
||||||
ArbiterCommand::Stop => {
|
ArbiterCommand::Stop => return Poll::Ready(()),
|
||||||
if let Some(stop) = self.stop.take() {
|
|
||||||
let _ = stop.send(0);
|
|
||||||
};
|
|
||||||
return Poll::Ready(());
|
|
||||||
}
|
|
||||||
ArbiterCommand::Execute(fut) => {
|
ArbiterCommand::Execute(fut) => {
|
||||||
let len = PENDING.with(move |cell| {
|
tokio::task::spawn_local(fut);
|
||||||
let mut p = cell.borrow_mut();
|
|
||||||
p.push(tokio::task::spawn_local(fut));
|
|
||||||
p.len()
|
|
||||||
});
|
|
||||||
if len > 7 {
|
|
||||||
// Before reaching the inline size
|
|
||||||
tokio::task::spawn_local(CleanupPending);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
ArbiterCommand::ExecuteFn(f) => {
|
ArbiterCommand::ExecuteFn(f) => {
|
||||||
f.call_box();
|
f.call_box();
|
||||||
|
|
|
@ -1,9 +1,9 @@
|
||||||
use std::borrow::Cow;
|
use std::borrow::Cow;
|
||||||
|
use std::future::Future;
|
||||||
use std::io;
|
use std::io;
|
||||||
|
|
||||||
use futures_channel::mpsc::unbounded;
|
use futures_channel::mpsc::unbounded;
|
||||||
use futures_channel::oneshot::{channel, Receiver};
|
use futures_channel::oneshot::{channel, Receiver};
|
||||||
use futures_util::future::{lazy, Future, FutureExt};
|
|
||||||
use tokio::task::LocalSet;
|
use tokio::task::LocalSet;
|
||||||
|
|
||||||
use crate::arbiter::{Arbiter, SystemArbiter};
|
use crate::arbiter::{Arbiter, SystemArbiter};
|
||||||
|
@ -74,7 +74,8 @@ impl Builder {
|
||||||
let (stop_tx, stop) = channel();
|
let (stop_tx, stop) = channel();
|
||||||
let (sys_sender, sys_receiver) = unbounded();
|
let (sys_sender, sys_receiver) = unbounded();
|
||||||
|
|
||||||
let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic);
|
let system =
|
||||||
|
System::construct(sys_sender, Arbiter::new_system(local), self.stop_on_panic);
|
||||||
|
|
||||||
// system arbiter
|
// system arbiter
|
||||||
let arb = SystemArbiter::new(stop_tx, sys_receiver);
|
let arb = SystemArbiter::new(stop_tx, sys_receiver);
|
||||||
|
@ -92,16 +93,21 @@ impl Builder {
|
||||||
let (stop_tx, stop) = channel();
|
let (stop_tx, stop) = channel();
|
||||||
let (sys_sender, sys_receiver) = unbounded();
|
let (sys_sender, sys_receiver) = unbounded();
|
||||||
|
|
||||||
let system = System::construct(sys_sender, Arbiter::new_system(), self.stop_on_panic);
|
let mut rt = Runtime::new().unwrap();
|
||||||
|
|
||||||
|
let system = System::construct(
|
||||||
|
sys_sender,
|
||||||
|
Arbiter::new_system(rt.local()),
|
||||||
|
self.stop_on_panic,
|
||||||
|
);
|
||||||
|
|
||||||
// system arbiter
|
// system arbiter
|
||||||
let arb = SystemArbiter::new(stop_tx, sys_receiver);
|
let arb = SystemArbiter::new(stop_tx, sys_receiver);
|
||||||
|
|
||||||
let mut rt = Runtime::new().unwrap();
|
|
||||||
rt.spawn(arb);
|
rt.spawn(arb);
|
||||||
|
|
||||||
// init system arbiter and run configuration method
|
// init system arbiter and run configuration method
|
||||||
rt.block_on(lazy(move |_| f()));
|
rt.block_on(async { f() });
|
||||||
|
|
||||||
SystemRunner { rt, stop, system }
|
SystemRunner { rt, stop, system }
|
||||||
}
|
}
|
||||||
|
@ -120,27 +126,21 @@ impl AsyncSystemRunner {
|
||||||
let AsyncSystemRunner { stop, .. } = self;
|
let AsyncSystemRunner { stop, .. } = self;
|
||||||
|
|
||||||
// run loop
|
// run loop
|
||||||
lazy(|_| {
|
async {
|
||||||
Arbiter::run_system(None);
|
match stop.await {
|
||||||
async {
|
Ok(code) => {
|
||||||
let res = match stop.await {
|
if code != 0 {
|
||||||
Ok(code) => {
|
Err(io::Error::new(
|
||||||
if code != 0 {
|
io::ErrorKind::Other,
|
||||||
Err(io::Error::new(
|
format!("Non-zero exit code: {}", code),
|
||||||
io::ErrorKind::Other,
|
))
|
||||||
format!("Non-zero exit code: {}", code),
|
} else {
|
||||||
))
|
Ok(())
|
||||||
} else {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
|
}
|
||||||
};
|
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
|
||||||
Arbiter::stop_system();
|
|
||||||
res
|
|
||||||
}
|
}
|
||||||
})
|
}
|
||||||
.flatten()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,8 +160,7 @@ impl SystemRunner {
|
||||||
let SystemRunner { mut rt, stop, .. } = self;
|
let SystemRunner { mut rt, stop, .. } = self;
|
||||||
|
|
||||||
// run loop
|
// run loop
|
||||||
Arbiter::run_system(Some(&rt));
|
match rt.block_on(stop) {
|
||||||
let result = match rt.block_on(stop) {
|
|
||||||
Ok(code) => {
|
Ok(code) => {
|
||||||
if code != 0 {
|
if code != 0 {
|
||||||
Err(io::Error::new(
|
Err(io::Error::new(
|
||||||
|
@ -173,19 +172,15 @@ impl SystemRunner {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
|
Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
|
||||||
};
|
}
|
||||||
Arbiter::stop_system();
|
|
||||||
result
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Execute a future and wait for result.
|
/// Execute a future and wait for result.
|
||||||
|
#[inline]
|
||||||
pub fn block_on<F, O>(&mut self, fut: F) -> O
|
pub fn block_on<F, O>(&mut self, fut: F) -> O
|
||||||
where
|
where
|
||||||
F: Future<Output = O> + 'static,
|
F: Future<Output = O>,
|
||||||
{
|
{
|
||||||
Arbiter::run_system(Some(&self.rt));
|
self.rt.block_on(fut)
|
||||||
let res = self.rt.block_on(fut);
|
|
||||||
Arbiter::stop_system();
|
|
||||||
res
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,8 @@
|
||||||
#![deny(rust_2018_idioms, warnings)]
|
#![deny(rust_2018_idioms, warnings)]
|
||||||
#![allow(clippy::type_complexity)]
|
#![allow(clippy::type_complexity)]
|
||||||
|
|
||||||
|
use std::future::Future;
|
||||||
|
|
||||||
#[cfg(not(test))] // Work around for rust-lang/rust#62127
|
#[cfg(not(test))] // Work around for rust-lang/rust#62127
|
||||||
pub use actix_macros::{main, test};
|
pub use actix_macros::{main, test};
|
||||||
|
|
||||||
|
@ -23,15 +25,12 @@ pub use actix_threadpool as blocking;
|
||||||
/// # Panics
|
/// # Panics
|
||||||
///
|
///
|
||||||
/// This function panics if actix system is not running.
|
/// This function panics if actix system is not running.
|
||||||
|
#[inline]
|
||||||
pub fn spawn<F>(f: F)
|
pub fn spawn<F>(f: F)
|
||||||
where
|
where
|
||||||
F: futures_util::future::Future<Output = ()> + 'static,
|
F: Future<Output = ()> + 'static,
|
||||||
{
|
{
|
||||||
if !System::is_set() {
|
Arbiter::spawn(f)
|
||||||
panic!("System is not running");
|
|
||||||
}
|
|
||||||
|
|
||||||
Arbiter::spawn(f);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Asynchronous signal handling
|
/// Asynchronous signal handling
|
||||||
|
@ -64,3 +63,7 @@ pub mod time {
|
||||||
pub use tokio::time::{interval, interval_at, Interval};
|
pub use tokio::time::{interval, interval_at, Interval};
|
||||||
pub use tokio::time::{timeout, Timeout};
|
pub use tokio::time::{timeout, Timeout};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub mod task {
|
||||||
|
pub use tokio::task::yield_now;
|
||||||
|
}
|
||||||
|
|
|
@ -24,10 +24,11 @@ impl Runtime {
|
||||||
.basic_scheduler()
|
.basic_scheduler()
|
||||||
.build()?;
|
.build()?;
|
||||||
|
|
||||||
Ok(Runtime {
|
Ok(Runtime { rt, local: LocalSet::new() })
|
||||||
rt,
|
}
|
||||||
local: LocalSet::new(),
|
|
||||||
})
|
pub(super) fn local(&self) -> &LocalSet {
|
||||||
|
&self.local
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Spawn a future onto the single-threaded runtime.
|
/// Spawn a future onto the single-threaded runtime.
|
||||||
|
@ -84,7 +85,7 @@ impl Runtime {
|
||||||
/// complete execution by calling `block_on` or `run`.
|
/// complete execution by calling `block_on` or `run`.
|
||||||
pub fn block_on<F>(&mut self, f: F) -> F::Output
|
pub fn block_on<F>(&mut self, f: F) -> F::Output
|
||||||
where
|
where
|
||||||
F: Future + 'static,
|
F: Future,
|
||||||
{
|
{
|
||||||
self.local.block_on(&mut self.rt, f)
|
self.local.block_on(&mut self.rt, f)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,19 +1,5 @@
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn start_and_stop() {
|
|
||||||
actix_rt::System::new("start_and_stop").block_on(async move {
|
|
||||||
assert!(
|
|
||||||
actix_rt::Arbiter::is_running(),
|
|
||||||
"System doesn't seem to have started"
|
|
||||||
);
|
|
||||||
});
|
|
||||||
assert!(
|
|
||||||
!actix_rt::Arbiter::is_running(),
|
|
||||||
"System doesn't seem to have stopped"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn await_for_timer() {
|
fn await_for_timer() {
|
||||||
let time = Duration::from_secs(2);
|
let time = Duration::from_secs(2);
|
||||||
|
@ -75,40 +61,3 @@ fn join_another_arbiter() {
|
||||||
"Premature stop of arbiter should conclude regardless of it's current state"
|
"Premature stop of arbiter should conclude regardless of it's current state"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn join_current_arbiter() {
|
|
||||||
let time = Duration::from_secs(2);
|
|
||||||
|
|
||||||
let instant = Instant::now();
|
|
||||||
actix_rt::System::new("test_join_current_arbiter").block_on(async move {
|
|
||||||
actix_rt::spawn(async move {
|
|
||||||
tokio::time::delay_for(time).await;
|
|
||||||
actix_rt::Arbiter::current().stop();
|
|
||||||
});
|
|
||||||
actix_rt::Arbiter::local_join().await;
|
|
||||||
});
|
|
||||||
assert!(
|
|
||||||
instant.elapsed() >= time,
|
|
||||||
"Join on current arbiter should wait for all spawned futures"
|
|
||||||
);
|
|
||||||
|
|
||||||
let large_timer = Duration::from_secs(20);
|
|
||||||
let instant = Instant::now();
|
|
||||||
actix_rt::System::new("test_join_current_arbiter").block_on(async move {
|
|
||||||
actix_rt::spawn(async move {
|
|
||||||
tokio::time::delay_for(time).await;
|
|
||||||
actix_rt::Arbiter::current().stop();
|
|
||||||
});
|
|
||||||
let f = actix_rt::Arbiter::local_join();
|
|
||||||
actix_rt::spawn(async move {
|
|
||||||
tokio::time::delay_for(large_timer).await;
|
|
||||||
actix_rt::Arbiter::current().stop();
|
|
||||||
});
|
|
||||||
f.await;
|
|
||||||
});
|
|
||||||
assert!(
|
|
||||||
instant.elapsed() < large_timer,
|
|
||||||
"local_join should await only for the already spawned futures"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
|
@ -22,13 +22,16 @@ fn test_bind() {
|
||||||
let (tx, rx) = mpsc::channel();
|
let (tx, rx) = mpsc::channel();
|
||||||
|
|
||||||
let h = thread::spawn(move || {
|
let h = thread::spawn(move || {
|
||||||
let sys = actix_rt::System::new("test");
|
let mut sys = actix_rt::System::new("test");
|
||||||
let srv = Server::build()
|
|
||||||
.workers(1)
|
let srv = sys.block_on(async {
|
||||||
.disable_signals()
|
Server::build()
|
||||||
.bind("test", addr, move || fn_service(|_| ok::<_, ()>(())))
|
.workers(1)
|
||||||
.unwrap()
|
.disable_signals()
|
||||||
.start();
|
.bind("test", addr, move || fn_service(|_| ok::<_, ()>(())))
|
||||||
|
.unwrap()
|
||||||
|
.start()
|
||||||
|
});
|
||||||
let _ = tx.send((srv, actix_rt::System::current()));
|
let _ = tx.send((srv, actix_rt::System::current()));
|
||||||
let _ = sys.run();
|
let _ = sys.run();
|
||||||
});
|
});
|
||||||
|
@ -46,14 +49,16 @@ fn test_listen() {
|
||||||
let (tx, rx) = mpsc::channel();
|
let (tx, rx) = mpsc::channel();
|
||||||
|
|
||||||
let h = thread::spawn(move || {
|
let h = thread::spawn(move || {
|
||||||
let sys = actix_rt::System::new("test");
|
let mut sys = actix_rt::System::new("test");
|
||||||
let lst = net::TcpListener::bind(addr).unwrap();
|
let lst = net::TcpListener::bind(addr).unwrap();
|
||||||
Server::build()
|
sys.block_on(async {
|
||||||
.disable_signals()
|
Server::build()
|
||||||
.workers(1)
|
.disable_signals()
|
||||||
.listen("test", lst, move || fn_service(|_| ok::<_, ()>(())))
|
.workers(1)
|
||||||
.unwrap()
|
.listen("test", lst, move || fn_service(|_| ok::<_, ()>(())))
|
||||||
.start();
|
.unwrap()
|
||||||
|
.start()
|
||||||
|
});
|
||||||
let _ = tx.send(actix_rt::System::current());
|
let _ = tx.send(actix_rt::System::current());
|
||||||
let _ = sys.run();
|
let _ = sys.run();
|
||||||
});
|
});
|
||||||
|
@ -78,19 +83,21 @@ fn test_start() {
|
||||||
let (tx, rx) = mpsc::channel();
|
let (tx, rx) = mpsc::channel();
|
||||||
|
|
||||||
let h = thread::spawn(move || {
|
let h = thread::spawn(move || {
|
||||||
let sys = actix_rt::System::new("test");
|
let mut sys = actix_rt::System::new("test");
|
||||||
let srv: Server = Server::build()
|
let srv = sys.block_on(async {
|
||||||
.backlog(100)
|
Server::build()
|
||||||
.disable_signals()
|
.backlog(100)
|
||||||
.bind("test", addr, move || {
|
.disable_signals()
|
||||||
fn_service(|io: TcpStream| async move {
|
.bind("test", addr, move || {
|
||||||
let mut f = Framed::new(io, BytesCodec);
|
fn_service(|io: TcpStream| async move {
|
||||||
f.send(Bytes::from_static(b"test")).await.unwrap();
|
let mut f = Framed::new(io, BytesCodec);
|
||||||
Ok::<_, ()>(())
|
f.send(Bytes::from_static(b"test")).await.unwrap();
|
||||||
|
Ok::<_, ()>(())
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
.unwrap()
|
||||||
.unwrap()
|
.start()
|
||||||
.start();
|
});
|
||||||
|
|
||||||
let _ = tx.send((srv, actix_rt::System::current()));
|
let _ = tx.send((srv, actix_rt::System::current()));
|
||||||
let _ = sys.run();
|
let _ = sys.run();
|
||||||
|
@ -144,29 +151,31 @@ fn test_configure() {
|
||||||
|
|
||||||
let h = thread::spawn(move || {
|
let h = thread::spawn(move || {
|
||||||
let num = num2.clone();
|
let num = num2.clone();
|
||||||
let sys = actix_rt::System::new("test");
|
let mut sys = actix_rt::System::new("test");
|
||||||
let srv = Server::build()
|
let srv = sys.block_on(async {
|
||||||
.disable_signals()
|
Server::build()
|
||||||
.configure(move |cfg| {
|
.disable_signals()
|
||||||
let num = num.clone();
|
.configure(move |cfg| {
|
||||||
let lst = net::TcpListener::bind(addr3).unwrap();
|
let num = num.clone();
|
||||||
cfg.bind("addr1", addr1)
|
let lst = net::TcpListener::bind(addr3).unwrap();
|
||||||
.unwrap()
|
cfg.bind("addr1", addr1)
|
||||||
.bind("addr2", addr2)
|
.unwrap()
|
||||||
.unwrap()
|
.bind("addr2", addr2)
|
||||||
.listen("addr3", lst)
|
.unwrap()
|
||||||
.apply(move |rt| {
|
.listen("addr3", lst)
|
||||||
let num = num.clone();
|
.apply(move |rt| {
|
||||||
rt.service("addr1", fn_service(|_| ok::<_, ()>(())));
|
let num = num.clone();
|
||||||
rt.service("addr3", fn_service(|_| ok::<_, ()>(())));
|
rt.service("addr1", fn_service(|_| ok::<_, ()>(())));
|
||||||
rt.on_start(lazy(move |_| {
|
rt.service("addr3", fn_service(|_| ok::<_, ()>(())));
|
||||||
let _ = num.fetch_add(1, Relaxed);
|
rt.on_start(lazy(move |_| {
|
||||||
}))
|
let _ = num.fetch_add(1, Relaxed);
|
||||||
})
|
}))
|
||||||
})
|
})
|
||||||
.unwrap()
|
})
|
||||||
.workers(1)
|
.unwrap()
|
||||||
.start();
|
.workers(1)
|
||||||
|
.start()
|
||||||
|
});
|
||||||
let _ = tx.send((srv, actix_rt::System::current()));
|
let _ = tx.send((srv, actix_rt::System::current()));
|
||||||
let _ = sys.run();
|
let _ = sys.run();
|
||||||
});
|
});
|
||||||
|
|
|
@ -80,15 +80,18 @@ impl TestServer {
|
||||||
|
|
||||||
// run server in separate thread
|
// run server in separate thread
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
let sys = System::new("actix-test-server");
|
let mut sys = System::new("actix-test-server");
|
||||||
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
|
let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
|
||||||
let local_addr = tcp.local_addr().unwrap();
|
let local_addr = tcp.local_addr().unwrap();
|
||||||
|
|
||||||
Server::build()
|
sys.block_on(async {
|
||||||
.listen("test", tcp, factory)?
|
Server::build()
|
||||||
.workers(1)
|
.listen("test", tcp, factory)
|
||||||
.disable_signals()
|
.unwrap()
|
||||||
.start();
|
.workers(1)
|
||||||
|
.disable_signals()
|
||||||
|
.start()
|
||||||
|
});
|
||||||
|
|
||||||
tx.send((System::current(), local_addr)).unwrap();
|
tx.send((System::current(), local_addr)).unwrap();
|
||||||
sys.run()
|
sys.run()
|
||||||
|
|
Loading…
Reference in New Issue