get rtless working with manual localset

This commit is contained in:
Rob Ede 2021-11-02 23:12:16 +00:00
parent 81d7295486
commit c56265d9aa
6 changed files with 146 additions and 77 deletions

View File

@ -240,6 +240,15 @@ impl Arbiter {
})
}
/// Try to get current running arbiter handle.
///
/// Returns `None` if no Arbiter has been started.
///
/// Contrary to [`current`](Self::current), this never panics.
pub fn try_current() -> Option<ArbiterHandle> {
HANDLE.with(|cell| cell.borrow().clone())
}
/// Stop Arbiter from continuing it's event loop.
///
/// Returns true if stop message was sent successfully and false if the Arbiter has been dropped.

View File

@ -130,7 +130,7 @@ impl System {
///
/// Returns `None` if no System has been started.
///
/// Contrary to `current`, this never panics.
/// Contrary to [`current`](Self::current), this never panics.
pub fn try_current() -> Option<System> {
CURRENT.with(|cell| cell.borrow().clone())
}

View File

@ -10,7 +10,7 @@
//! the length of each line it echos and the total size of data sent when the connection is closed.
use std::{
env, io,
io,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
@ -25,10 +25,8 @@ use futures_util::future::ok;
use log::{error, info};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[actix_rt::main]
async fn main() -> io::Result<()> {
env::set_var("RUST_LOG", "info");
env_logger::init();
async fn run() -> io::Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let count = Arc::new(AtomicUsize::new(0));
@ -85,6 +83,25 @@ async fn main() -> io::Result<()> {
})
})?
.workers(1)
// .system_exit()
.run()
.await
}
fn main() -> io::Result<()> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
let ls = tokio::task::LocalSet::new();
rt.block_on(ls.run_until(run()))?;
Ok(())
}
// #[actix_rt::main]
// async fn main() -> io::Result<()> {
// run().await?;
// Ok(())
// }

View File

@ -156,13 +156,17 @@ impl Accept {
srv: ServerHandle,
handles: Vec<WorkerHandleAccept>,
) {
// Accept runs in its own thread and would want to spawn additional futures to current
// actix system.
let sys = System::current();
// Accept runs in its own thread and might spawn additional futures to current system
let sys = System::try_current();
thread::Builder::new()
.name("actix-server accept loop".to_owned())
.spawn(move || {
System::set_current(sys);
// forward existing actix system context
if let Some(sys) = sys {
System::set_current(sys);
}
let (mut accept, mut sockets) =
Accept::new_with_sockets(poll, waker, socks, handles, srv);
@ -479,10 +483,23 @@ impl Accept {
// after the sleep a Timer interest is sent to Accept Poll
let waker = self.waker.clone();
System::current().arbiter().spawn(async move {
sleep(Duration::from_millis(510)).await;
waker.wake(WakerInterest::Timer);
});
match System::try_current() {
Some(sys) => {
sys.arbiter().spawn(async move {
sleep(Duration::from_millis(510)).await;
waker.wake(WakerInterest::Timer);
});
}
None => {
let rt = tokio::runtime::Handle::current();
rt.spawn(async move {
sleep(Duration::from_millis(510)).await;
waker.wake(WakerInterest::Timer);
});
}
}
return;
}

View File

@ -7,7 +7,7 @@ use std::{
};
use actix_rt::{self as rt, net::TcpStream, time::sleep, System};
use log::{error, info};
use log::{error, info, trace};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver},
oneshot,
@ -160,6 +160,8 @@ impl ServerBuilder {
{
let sockets = bind_addr(addr, self.backlog)?;
trace!("binding server to: {:?}", &sockets);
for lst in sockets {
let token = self.next_token();
self.services.push(StreamNewService::create(
@ -171,6 +173,7 @@ impl ServerBuilder {
self.sockets
.push((token, name.as_ref().to_string(), MioListener::Tcp(lst)));
}
Ok(self)
}
@ -255,6 +258,8 @@ impl ServerBuilder {
if self.sockets.is_empty() {
panic!("Server should have at least one bound socket");
} else {
trace!("start running server");
for (_, name, lst) in &self.sockets {
info!(
r#"Starting service: "{}", workers: {}, listening on: {}"#,
@ -264,6 +269,8 @@ impl ServerBuilder {
);
}
trace!("run server");
// start workers
let handles = (0..self.threads)
.map(|idx| {
@ -301,8 +308,8 @@ impl ServerBuilder {
idx: usize,
waker_queue: WakerQueue,
) -> (WorkerHandleAccept, WorkerHandleServer) {
trace!("start server worker {}", idx);
let services = self.services.iter().map(|v| v.clone_factory()).collect();
ServerWorker::start(idx, services, waker_queue, self.worker_config)
}
@ -384,7 +391,7 @@ impl ServerBuilder {
if exit {
sleep(Duration::from_millis(300)).await;
System::current().stop();
System::try_current().as_ref().map(System::stop);
}
});
}

View File

@ -14,7 +14,7 @@ use std::{
use actix_rt::{
spawn,
time::{sleep, Instant, Sleep},
Arbiter,
Arbiter, ArbiterHandle, System,
};
use futures_core::{future::LocalBoxFuture, ready};
use log::{error, info, trace};
@ -23,12 +23,14 @@ use tokio::sync::{
oneshot,
};
use crate::join_all;
use crate::service::{BoxedServerService, InternalServiceFactory};
use crate::socket::MioStream;
use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::{
join_all,
service::{BoxedServerService, InternalServiceFactory},
socket::MioStream,
waker_queue::{WakerInterest, WakerQueue},
};
/// Stop worker message. Returns `true` on successful graceful shutdown.
/// Stop worker message. Returns `true` on successful graceful shutdown
/// and `false` if some connections still alive when shutdown execute.
pub(crate) struct Stop {
graceful: bool,
@ -273,6 +275,8 @@ impl ServerWorker {
waker_queue: WakerQueue,
config: ServerWorkerConfig,
) -> (WorkerHandleAccept, WorkerHandleServer) {
trace!("starting server worker {}", idx);
let (tx1, rx) = unbounded_channel();
let (tx2, rx2) = unbounded_channel();
@ -289,63 +293,77 @@ impl ServerWorker {
Arbiter::new()
};
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
let arbiter = Arbiter::with_tokio_rt(move || {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.max_blocking_threads(config.max_blocking_threads)
.build()
.unwrap()
});
// get actix system context if it is set
let sys = System::try_current();
arbiter.spawn(async move {
let fut = factories
.iter()
.enumerate()
.map(|(idx, factory)| {
let fut = factory.create();
async move { fut.await.map(|(t, s)| (idx, t, s)) }
})
.collect::<Vec<_>>();
std::thread::Builder::new()
.name("eofibef".to_owned())
.spawn(move || {
// forward existing actix system context
if let Some(sys) = sys {
System::set_current(sys);
}
// a second spawn to run !Send future tasks.
spawn(async move {
let res = join_all(fut)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>();
let services = match res {
Ok(res) => res
.into_iter()
.fold(Vec::new(), |mut services, (factory, token, service)| {
assert_eq!(token, services.len());
services.push(WorkerService {
factory,
service,
status: WorkerServiceStatus::Unavailable,
});
services
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.max_blocking_threads(config.max_blocking_threads)
.build()
.unwrap();
rt.block_on(tokio::task::LocalSet::new().run_until(async move {
let fut = factories
.iter()
.enumerate()
.map(|(idx, factory)| {
let fut = factory.create();
async move { fut.await.map(|(t, s)| (idx, t, s)) }
})
.into_boxed_slice(),
Err(e) => {
error!("Can not start worker: {:?}", e);
Arbiter::current().stop();
return;
}
};
.collect::<Vec<_>>();
// a third spawn to make sure ServerWorker runs as non boxed future.
spawn(ServerWorker {
rx,
rx2,
services,
counter: WorkerCounter::new(idx, waker_queue, counter_clone),
factories: factories.into_boxed_slice(),
state: Default::default(),
shutdown_timeout: config.shutdown_timeout,
});
});
});
// a second spawn to run !Send future tasks.
spawn(async move {
let res = join_all(fut)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>();
let services = match res {
Ok(res) => res
.into_iter()
.fold(Vec::new(), |mut services, (factory, token, service)| {
assert_eq!(token, services.len());
services.push(WorkerService {
factory,
service,
status: WorkerServiceStatus::Unavailable,
});
services
})
.into_boxed_slice(),
Err(e) => {
error!("Can not start worker: {:?}", e);
Arbiter::try_current().as_ref().map(ArbiterHandle::stop);
return;
}
};
// a third spawn to make sure ServerWorker runs as non boxed future.
spawn(ServerWorker {
rx,
rx2,
services,
counter: WorkerCounter::new(idx, waker_queue, counter_clone),
factories: factories.into_boxed_slice(),
state: Default::default(),
shutdown_timeout: config.shutdown_timeout,
})
.await
.expect("task 3 panic");
})
.await
.expect("task 2 panic");
}))
})
.expect("worker thread error/panic");
handle_pair(idx, tx1, tx2, counter)
}
@ -450,8 +468,9 @@ impl Default for WorkerState {
impl Drop for ServerWorker {
fn drop(&mut self) {
trace!("dropping ServerWorker");
// Stop the Arbiter ServerWorker runs on on drop.
Arbiter::current().stop();
Arbiter::try_current().as_ref().map(ArbiterHandle::stop);
}
}