mirror of https://github.com/fafhrd91/actix-net
cleanup and rustfmt
This commit is contained in:
parent
1e82dad4f0
commit
0394b49841
|
@ -17,7 +17,7 @@ fn main() {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
}
|
||||||
// you can remove this two lines if you don't have tokio_uring feature enabled.
|
// you can remove this two lines if you don't have tokio_uring feature enabled.
|
||||||
// Note: tokio_uring is single thread and can't be multithreaded,
|
// Note: tokio_uring is single thread and can't be multithreaded,
|
||||||
// unless you spawn multiple of them
|
// unless you spawn multiple of them
|
||||||
#[cfg(feature = "io-uring")]
|
#[cfg(feature = "io-uring")]
|
||||||
tokio_uring::Runtime::new(&tokio_uring::builder()).unwrap()
|
tokio_uring::Runtime::new(&tokio_uring::builder()).unwrap()
|
||||||
|
|
|
@ -95,7 +95,6 @@ impl Arbiter {
|
||||||
///
|
///
|
||||||
/// # Panics
|
/// # Panics
|
||||||
/// Panics if a [System] is not registered on the current thread.
|
/// Panics if a [System] is not registered on the current thread.
|
||||||
//#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
|
|
||||||
#[allow(clippy::new_without_default)]
|
#[allow(clippy::new_without_default)]
|
||||||
pub fn new() -> Arbiter {
|
pub fn new() -> Arbiter {
|
||||||
Self::with_tokio_rt(|| {
|
Self::with_tokio_rt(|| {
|
||||||
|
@ -107,7 +106,6 @@ impl Arbiter {
|
||||||
/// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
|
/// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
|
||||||
///
|
///
|
||||||
/// [tokio-runtime]: tokio::runtime::Runtime
|
/// [tokio-runtime]: tokio::runtime::Runtime
|
||||||
//#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
|
|
||||||
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
|
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
|
||||||
where
|
where
|
||||||
F: Fn() -> crate::runtime::GlobalRuntime + Send + 'static,
|
F: Fn() -> crate::runtime::GlobalRuntime + Send + 'static,
|
||||||
|
@ -158,58 +156,6 @@ impl Arbiter {
|
||||||
Arbiter { tx, thread_handle }
|
Arbiter { tx, thread_handle }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Spawn a new Arbiter thread and start its event loop with `tokio-uring` runtime.
|
|
||||||
///
|
|
||||||
/// # Panics
|
|
||||||
/// Panics if a [System] is not registered on the current thread.
|
|
||||||
/*#[cfg(all(target_os = "linux", feature = "io-uring"))]
|
|
||||||
#[allow(clippy::new_without_default)]
|
|
||||||
pub fn new() -> Arbiter {
|
|
||||||
let sys = System::current();
|
|
||||||
let system_id = sys.id();
|
|
||||||
let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);
|
|
||||||
|
|
||||||
let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
|
|
||||||
let (tx, rx) = mpsc::unbounded_channel();
|
|
||||||
|
|
||||||
let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
|
|
||||||
|
|
||||||
let thread_handle = thread::Builder::new()
|
|
||||||
.name(name.clone())
|
|
||||||
.spawn({
|
|
||||||
let tx = tx.clone();
|
|
||||||
move || {
|
|
||||||
let hnd = ArbiterHandle::new(tx);
|
|
||||||
|
|
||||||
System::set_current(sys);
|
|
||||||
|
|
||||||
HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));
|
|
||||||
|
|
||||||
// register arbiter
|
|
||||||
let _ = System::current()
|
|
||||||
.tx()
|
|
||||||
.send(SystemCommand::RegisterArbiter(arb_id, hnd));
|
|
||||||
|
|
||||||
ready_tx.send(()).unwrap();
|
|
||||||
|
|
||||||
// run arbiter event processing loop
|
|
||||||
tokio_uring::start(ArbiterRunner { rx });
|
|
||||||
|
|
||||||
// deregister arbiter
|
|
||||||
let _ = System::current()
|
|
||||||
.tx()
|
|
||||||
.send(SystemCommand::DeregisterArbiter(arb_id));
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.unwrap_or_else(|err| {
|
|
||||||
panic!("Cannot spawn Arbiter's thread: {:?}. {:?}", &name, err)
|
|
||||||
});
|
|
||||||
|
|
||||||
ready_rx.recv().unwrap();
|
|
||||||
|
|
||||||
Arbiter { tx, thread_handle }
|
|
||||||
}*/
|
|
||||||
|
|
||||||
/// Sets up an Arbiter runner in a new System using the environment's local set.
|
/// Sets up an Arbiter runner in a new System using the environment's local set.
|
||||||
pub(crate) fn in_new_system() -> ArbiterHandle {
|
pub(crate) fn in_new_system() -> ArbiterHandle {
|
||||||
let (tx, rx) = mpsc::unbounded_channel();
|
let (tx, rx) = mpsc::unbounded_channel();
|
||||||
|
|
|
@ -6,7 +6,7 @@ use tokio::task::{JoinHandle, LocalSet};
|
||||||
///
|
///
|
||||||
/// All spawned futures will be executed on the current thread. Therefore, there is no `Send` bound
|
/// All spawned futures will be executed on the current thread. Therefore, there is no `Send` bound
|
||||||
/// on submitted futures.
|
/// on submitted futures.
|
||||||
#[cfg_attr(not(all(target_os = "linux", feature = "io-uring")),derive(Debug))]
|
#[cfg_attr(not(all(target_os = "linux", feature = "io-uring")), derive(Debug))]
|
||||||
pub struct Runtime {
|
pub struct Runtime {
|
||||||
local: LocalSet,
|
local: LocalSet,
|
||||||
rt: GlobalRuntime,
|
rt: GlobalRuntime,
|
||||||
|
|
|
@ -29,7 +29,6 @@ pub struct System {
|
||||||
arbiter_handle: ArbiterHandle,
|
arbiter_handle: ArbiterHandle,
|
||||||
}
|
}
|
||||||
|
|
||||||
//#[cfg(not(feature = "io-uring"))]
|
|
||||||
impl System {
|
impl System {
|
||||||
/// Create a new system.
|
/// Create a new system.
|
||||||
///
|
///
|
||||||
|
@ -193,7 +192,6 @@ impl System {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Runner that keeps a [System]'s event loop alive until stop message is received.
|
/// Runner that keeps a [System]'s event loop alive until stop message is received.
|
||||||
//#[cfg(not(feature = "io-uring"))]
|
|
||||||
#[must_use = "A SystemRunner does nothing unless `run` is called."]
|
#[must_use = "A SystemRunner does nothing unless `run` is called."]
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct SystemRunner {
|
pub struct SystemRunner {
|
||||||
|
@ -201,7 +199,6 @@ pub struct SystemRunner {
|
||||||
stop_rx: oneshot::Receiver<i32>,
|
stop_rx: oneshot::Receiver<i32>,
|
||||||
}
|
}
|
||||||
|
|
||||||
//#[cfg(not(feature = "io-uring"))]
|
|
||||||
impl SystemRunner {
|
impl SystemRunner {
|
||||||
/// Starts event loop and will return once [System] is [stopped](System::stop).
|
/// Starts event loop and will return once [System] is [stopped](System::stop).
|
||||||
pub fn run(self) -> io::Result<()> {
|
pub fn run(self) -> io::Result<()> {
|
||||||
|
@ -232,55 +229,6 @@ impl SystemRunner {
|
||||||
self.rt.block_on(fut)
|
self.rt.block_on(fut)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/*
|
|
||||||
/// Runner that keeps a [System]'s event loop alive until stop message is received.
|
|
||||||
#[cfg(feature = "io-uring")]
|
|
||||||
#[must_use = "A SystemRunner does nothing unless `run` is called."]
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct SystemRunner {
|
|
||||||
rt: crate::runtime::Runtime,
|
|
||||||
stop_rx: oneshot::Receiver<i32>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(feature = "io-uring")]
|
|
||||||
impl SystemRunner {
|
|
||||||
/// Starts event loop and will return once [System] is [stopped](System::stop).
|
|
||||||
pub fn run(self) -> io::Result<()> {
|
|
||||||
unimplemented!("SystemRunner::run is not implemented for io-uring feature yet");
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Runs the event loop until [stopped](System::stop_with_code), returning the exit code.
|
|
||||||
pub fn run_with_code(self) -> io::Result<i32> {
|
|
||||||
unimplemented!(
|
|
||||||
"SystemRunner::run_with_code is not implemented for io-uring feature yet"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Runs the provided future, blocking the current thread until the future completes.
|
|
||||||
#[inline]
|
|
||||||
pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
|
|
||||||
tokio_uring::start(async move {
|
|
||||||
let (stop_tx, stop_rx) = oneshot::channel();
|
|
||||||
let (sys_tx, sys_rx) = mpsc::unbounded_channel();
|
|
||||||
|
|
||||||
let sys_arbiter = Arbiter::in_new_system();
|
|
||||||
let system = System::construct(sys_tx, sys_arbiter.clone());
|
|
||||||
|
|
||||||
system
|
|
||||||
.tx()
|
|
||||||
.send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter))
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
// init background system arbiter
|
|
||||||
let sys_ctrl = SystemController::new(sys_rx, stop_tx);
|
|
||||||
tokio_uring::spawn(sys_ctrl);
|
|
||||||
|
|
||||||
let res = fut.await;
|
|
||||||
drop(stop_rx);
|
|
||||||
res
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}*/
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(crate) enum SystemCommand {
|
pub(crate) enum SystemCommand {
|
||||||
|
|
|
@ -337,9 +337,8 @@ fn new_arbiter_with_tokio() {
|
||||||
|
|
||||||
let _ = System::new();
|
let _ = System::new();
|
||||||
|
|
||||||
let arb = Arbiter::with_tokio_rt(|| {
|
let arb =
|
||||||
tokio_uring::Runtime::new(&tokio_uring::builder()).unwrap()
|
Arbiter::with_tokio_rt(|| tokio_uring::Runtime::new(&tokio_uring::builder()).unwrap());
|
||||||
});
|
|
||||||
|
|
||||||
let counter = Arc::new(AtomicBool::new(true));
|
let counter = Arc::new(AtomicBool::new(true));
|
||||||
|
|
||||||
|
|
|
@ -84,13 +84,13 @@ impl ServerBuilder {
|
||||||
self.worker_config.max_blocking_threads(num);
|
self.worker_config.max_blocking_threads(num);
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(all(target_os = "linux", feature = "io-uring"))]
|
#[cfg(all(target_os = "linux", feature = "io-uring"))]
|
||||||
/// Set max number of submission queue and completion queue AKA. ring size
|
/// Set max number of submission queue and completion queue AKA. ring size
|
||||||
/// for each worker's ring.
|
/// for each worker's ring.
|
||||||
///
|
///
|
||||||
/// Max ring size is defined here: [io_uring.c](https://github.com/torvalds/linux/blob/f339c2597ebb00e738f2b6328c14804ed19f5d57/io_uring/io_uring.c#L99)
|
/// Max ring size is defined here: [io_uring.c](https://github.com/torvalds/linux/blob/f339c2597ebb00e738f2b6328c14804ed19f5d57/io_uring/io_uring.c#L99)
|
||||||
///
|
///
|
||||||
/// # Examples:
|
/// # Examples:
|
||||||
/// ```
|
/// ```
|
||||||
/// # use actix_server::ServerBuilder;
|
/// # use actix_server::ServerBuilder;
|
||||||
|
@ -98,13 +98,13 @@ impl ServerBuilder {
|
||||||
/// .workers(4) // server has 4 worker thread.
|
/// .workers(4) // server has 4 worker thread.
|
||||||
/// .worker_max_blocking_threads(512); // every worker has 512 sq & cq.
|
/// .worker_max_blocking_threads(512); // every worker has 512 sq & cq.
|
||||||
/// ```
|
/// ```
|
||||||
///
|
///
|
||||||
/// See [tokio_uring::Builder::entries] for behavior reference.
|
/// See [tokio_uring::Builder::entries] for behavior reference.
|
||||||
pub fn worker_max_blocking_threads(mut self, num: usize) -> Self {
|
pub fn worker_max_blocking_threads(mut self, num: usize) -> Self {
|
||||||
self.worker_config.max_blocking_threads(num);
|
self.worker_config.max_blocking_threads(num);
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set the maximum number of pending connections.
|
/// Set the maximum number of pending connections.
|
||||||
///
|
///
|
||||||
/// This refers to the number of clients that can be waiting to be served. Exceeding this number
|
/// This refers to the number of clients that can be waiting to be served. Exceeding this number
|
||||||
|
|
|
@ -252,7 +252,7 @@ impl Default for ServerWorkerConfig {
|
||||||
// 512 is the default max blocking thread count of tokio runtime.
|
// 512 is the default max blocking thread count of tokio runtime.
|
||||||
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
|
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
|
||||||
let max_blocking_threads = std::cmp::max(512 / num_cpus::get_physical(), 1);
|
let max_blocking_threads = std::cmp::max(512 / num_cpus::get_physical(), 1);
|
||||||
// 256 is default sq & cq size used by tokio_uring
|
// 256 is default sq & cq size used by tokio_uring
|
||||||
#[cfg(all(target_os = "linux", feature = "io-uring"))]
|
#[cfg(all(target_os = "linux", feature = "io-uring"))]
|
||||||
let max_blocking_threads = 256;
|
let max_blocking_threads = 256;
|
||||||
Self {
|
Self {
|
||||||
|
@ -389,9 +389,10 @@ impl ServerWorker {
|
||||||
|
|
||||||
#[cfg(all(target_os = "linux", feature = "io-uring"))]
|
#[cfg(all(target_os = "linux", feature = "io-uring"))]
|
||||||
{
|
{
|
||||||
// passing `max_blocking_threads` as submission queue & completion queue
|
// passing `max_blocking_threads` as submission queue & completion queue
|
||||||
// should be useful than let it sit here
|
// should be useful than let it sit here
|
||||||
let queue_size = config.max_blocking_threads.clamp(1, u32::MAX as usize) as u32;
|
let queue_size =
|
||||||
|
config.max_blocking_threads.clamp(1, u32::MAX as usize) as u32;
|
||||||
let mut builder = tokio_uring::builder();
|
let mut builder = tokio_uring::builder();
|
||||||
builder.entries(queue_size);
|
builder.entries(queue_size);
|
||||||
builder.start(worker_fut);
|
builder.start(worker_fut);
|
||||||
|
|
Loading…
Reference in New Issue