mirror of https://github.com/fafhrd91/actix-net
add hidden from tokio methods
This commit is contained in:
parent
ff5c7ae64c
commit
21ba890c0a
|
@ -30,3 +30,4 @@ tokio = { version = "1", features = ["rt", "net", "parking_lot", "signal", "sync
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tokio = { version = "1", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
|
hyper = { version = "0.14", default-features = false, features = ["server", "tcp", "http1"] }
|
||||||
|
|
|
@ -0,0 +1,28 @@
|
||||||
|
use hyper::service::{make_service_fn, service_fn};
|
||||||
|
use hyper::{Body, Request, Response, Server};
|
||||||
|
use std::convert::Infallible;
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
|
async fn handle(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
|
||||||
|
Ok(Response::new(Body::from("Hello World")))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
actix_rt::System::with_tokio_rt(|| {
|
||||||
|
tokio::runtime::Builder::new_multi_thread()
|
||||||
|
.enable_all()
|
||||||
|
.build()
|
||||||
|
.unwrap()
|
||||||
|
})
|
||||||
|
.block_on(async {
|
||||||
|
let make_service =
|
||||||
|
make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(handle)) });
|
||||||
|
|
||||||
|
let server =
|
||||||
|
Server::bind(&SocketAddr::from(([127, 0, 0, 1], 3000))).serve(make_service);
|
||||||
|
|
||||||
|
if let Err(e) = server.await {
|
||||||
|
eprintln!("server error: {}", e);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
|
@ -12,7 +12,7 @@ use futures_core::ready;
|
||||||
use tokio::{sync::mpsc, task::LocalSet};
|
use tokio::{sync::mpsc, task::LocalSet};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
runtime::Runtime,
|
runtime::{default_tokio_runtime, Runtime},
|
||||||
system::{System, SystemCommand},
|
system::{System, SystemCommand},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -94,12 +94,25 @@ pub struct Arbiter {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Arbiter {
|
impl Arbiter {
|
||||||
/// Spawn new Arbiter thread and start its event loop.
|
/// Spawn a new Arbiter thread and start its event loop.
|
||||||
///
|
///
|
||||||
/// # Panics
|
/// # Panics
|
||||||
/// Panics if a [System] is not registered on the current thread.
|
/// Panics if a [System] is not registered on the current thread.
|
||||||
#[allow(clippy::new_without_default)]
|
#[allow(clippy::new_without_default)]
|
||||||
pub fn new() -> Arbiter {
|
pub fn new() -> Arbiter {
|
||||||
|
Self::with_tokio_rt(|| {
|
||||||
|
default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
|
||||||
|
///
|
||||||
|
/// [tokio-runtime]: tokio::runtime::Runtime
|
||||||
|
#[doc(hidden)]
|
||||||
|
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
|
||||||
|
where
|
||||||
|
F: Fn() -> tokio::runtime::Runtime + Send + 'static,
|
||||||
|
{
|
||||||
let id = COUNT.fetch_add(1, Ordering::Relaxed);
|
let id = COUNT.fetch_add(1, Ordering::Relaxed);
|
||||||
let system_id = System::current().id();
|
let system_id = System::current().id();
|
||||||
let name = format!("actix-rt|system:{}|arbiter:{}", system_id, id);
|
let name = format!("actix-rt|system:{}|arbiter:{}", system_id, id);
|
||||||
|
@ -113,7 +126,7 @@ impl Arbiter {
|
||||||
.spawn({
|
.spawn({
|
||||||
let tx = tx.clone();
|
let tx = tx.clone();
|
||||||
move || {
|
move || {
|
||||||
let rt = Runtime::new().expect("Cannot create new Arbiter's Runtime.");
|
let rt = Runtime::from(runtime_factory());
|
||||||
let hnd = ArbiterHandle::new(tx);
|
let hnd = ArbiterHandle::new(tx);
|
||||||
|
|
||||||
System::set_current(sys);
|
System::set_current(sys);
|
||||||
|
|
|
@ -2,7 +2,7 @@ use std::{future::Future, io};
|
||||||
|
|
||||||
use tokio::task::{JoinHandle, LocalSet};
|
use tokio::task::{JoinHandle, LocalSet};
|
||||||
|
|
||||||
/// A single-threaded runtime based on Tokio's "current thread" runtime.
|
/// A Tokio-based runtime proxy.
|
||||||
///
|
///
|
||||||
/// All spawned futures will be executed on the current thread. Therefore, there is no `Send` bound
|
/// All spawned futures will be executed on the current thread. Therefore, there is no `Send` bound
|
||||||
/// on submitted futures.
|
/// on submitted futures.
|
||||||
|
@ -12,14 +12,18 @@ pub struct Runtime {
|
||||||
rt: tokio::runtime::Runtime,
|
rt: tokio::runtime::Runtime,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn default_tokio_runtime() -> io::Result<tokio::runtime::Runtime> {
|
||||||
|
tokio::runtime::Builder::new_current_thread()
|
||||||
|
.enable_io()
|
||||||
|
.enable_time()
|
||||||
|
.build()
|
||||||
|
}
|
||||||
|
|
||||||
impl Runtime {
|
impl Runtime {
|
||||||
/// Returns a new runtime initialized with default configuration values.
|
/// Returns a new runtime initialized with default configuration values.
|
||||||
#[allow(clippy::new_ret_no_self)]
|
#[allow(clippy::new_ret_no_self)]
|
||||||
pub fn new() -> io::Result<Runtime> {
|
pub fn new() -> io::Result<Self> {
|
||||||
let rt = tokio::runtime::Builder::new_current_thread()
|
let rt = default_tokio_runtime()?;
|
||||||
.enable_io()
|
|
||||||
.enable_time()
|
|
||||||
.build()?;
|
|
||||||
|
|
||||||
Ok(Runtime {
|
Ok(Runtime {
|
||||||
rt,
|
rt,
|
||||||
|
@ -81,3 +85,12 @@ impl Runtime {
|
||||||
self.local.block_on(&self.rt, f)
|
self.local.block_on(&self.rt, f)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<tokio::runtime::Runtime> for Runtime {
|
||||||
|
fn from(rt: tokio::runtime::Runtime) -> Self {
|
||||||
|
Self {
|
||||||
|
local: LocalSet::new(),
|
||||||
|
rt,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -11,7 +11,7 @@ use std::{
|
||||||
use futures_core::ready;
|
use futures_core::ready;
|
||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
|
|
||||||
use crate::{arbiter::ArbiterHandle, Arbiter, Runtime};
|
use crate::{arbiter::ArbiterHandle, runtime::default_tokio_runtime, Arbiter, Runtime};
|
||||||
|
|
||||||
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
|
static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);
|
||||||
|
|
||||||
|
@ -36,10 +36,24 @@ impl System {
|
||||||
/// Panics if underlying Tokio runtime can not be created.
|
/// Panics if underlying Tokio runtime can not be created.
|
||||||
#[allow(clippy::new_ret_no_self)]
|
#[allow(clippy::new_ret_no_self)]
|
||||||
pub fn new() -> SystemRunner {
|
pub fn new() -> SystemRunner {
|
||||||
|
Self::with_tokio_rt(|| {
|
||||||
|
default_tokio_runtime()
|
||||||
|
.expect("Default Actix (Tokio) runtime could not be created.")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure.
|
||||||
|
///
|
||||||
|
/// [tokio-runtime]: tokio::runtime::Runtime
|
||||||
|
#[doc(hidden)]
|
||||||
|
pub fn with_tokio_rt<F>(runtime_factory: F) -> SystemRunner
|
||||||
|
where
|
||||||
|
F: Fn() -> tokio::runtime::Runtime,
|
||||||
|
{
|
||||||
let (stop_tx, stop_rx) = oneshot::channel();
|
let (stop_tx, stop_rx) = oneshot::channel();
|
||||||
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
|
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
let rt = Runtime::new().expect("Actix (Tokio) runtime could not be created.");
|
let rt = Runtime::from(runtime_factory());
|
||||||
let sys_arbiter = Arbiter::in_new_system(rt.local_set());
|
let sys_arbiter = Arbiter::in_new_system(rt.local_set());
|
||||||
let system = System::construct(sys_tx, sys_arbiter.clone());
|
let system = System::construct(sys_tx, sys_arbiter.clone());
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,9 @@
|
||||||
use std::{
|
use std::{
|
||||||
sync::mpsc::channel,
|
sync::{
|
||||||
|
atomic::{AtomicBool, Ordering},
|
||||||
|
mpsc::channel,
|
||||||
|
Arc,
|
||||||
|
},
|
||||||
thread,
|
thread,
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
|
@ -200,3 +204,62 @@ fn system_stop_stops_arbiters() {
|
||||||
|
|
||||||
arb.join().unwrap();
|
arb.join().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn new_system_with_tokio() {
|
||||||
|
let (tx, rx) = channel();
|
||||||
|
|
||||||
|
let res = System::with_tokio_rt(move || {
|
||||||
|
tokio::runtime::Builder::new_multi_thread()
|
||||||
|
.enable_io()
|
||||||
|
.enable_time()
|
||||||
|
.thread_keep_alive(Duration::from_millis(1000))
|
||||||
|
.worker_threads(2)
|
||||||
|
.max_blocking_threads(2)
|
||||||
|
.on_thread_start(|| {})
|
||||||
|
.on_thread_stop(|| {})
|
||||||
|
.build()
|
||||||
|
.unwrap()
|
||||||
|
})
|
||||||
|
.block_on(async {
|
||||||
|
actix_rt::time::sleep(Duration::from_millis(1)).await;
|
||||||
|
|
||||||
|
tokio::task::spawn(async move {
|
||||||
|
tx.send(42).unwrap();
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
123usize
|
||||||
|
});
|
||||||
|
|
||||||
|
assert_eq!(res, 123);
|
||||||
|
assert_eq!(rx.recv().unwrap(), 42);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn new_arbiter_with_tokio() {
|
||||||
|
let _ = System::new();
|
||||||
|
|
||||||
|
let arb = Arbiter::with_tokio_rt(|| {
|
||||||
|
tokio::runtime::Builder::new_current_thread()
|
||||||
|
.enable_all()
|
||||||
|
.build()
|
||||||
|
.unwrap()
|
||||||
|
});
|
||||||
|
|
||||||
|
let counter = Arc::new(AtomicBool::new(true));
|
||||||
|
|
||||||
|
let counter1 = counter.clone();
|
||||||
|
let did_spawn = arb.spawn(async move {
|
||||||
|
actix_rt::time::sleep(Duration::from_millis(1)).await;
|
||||||
|
counter1.store(false, Ordering::SeqCst);
|
||||||
|
Arbiter::current().stop();
|
||||||
|
});
|
||||||
|
|
||||||
|
assert!(did_spawn);
|
||||||
|
|
||||||
|
arb.join().unwrap();
|
||||||
|
|
||||||
|
assert_eq!(false, counter.load(Ordering::SeqCst));
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue