Merge branch 'master' into ups/system-run-until-stop

This commit is contained in:
Rob Ede 2021-11-15 17:58:20 +00:00 committed by GitHub
commit a9eccaa26f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 361 additions and 223 deletions

View File

@ -196,13 +196,6 @@ jobs:
- name: Cache Dependencies - name: Cache Dependencies
uses: Swatinem/rust-cache@v1.3.0 uses: Swatinem/rust-cache@v1.3.0
- name: Install cargo-hack - name: doc tests io-uring
uses: actions-rs/cargo@v1 run: |
with: sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=nightly cargo ci-doctest"
command: install
args: cargo-hack
- name: doc tests
uses: actions-rs/cargo@v1
timeout-minutes: 40
with: { command: ci-doctest }

View File

@ -111,7 +111,7 @@
[#129]: https://github.com/actix/actix-net/issues/129 [#129]: https://github.com/actix/actix-net/issues/129
## 1.1.0 - 2020-04-08 (YANKED) ## 1.1.0 - 2020-04-08 _(YANKED)_
* Expose `System::is_set` to check if current system has ben started [#99] * Expose `System::is_set` to check if current system has ben started [#99]
* Add `Arbiter::is_running` to check if event loop is running [#124] * Add `Arbiter::is_running` to check if event loop is running [#124]
* Add `Arbiter::local_join` associated function * Add `Arbiter::local_join` associated function

View File

@ -1,12 +1,21 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
## 2.0.0-beta.9 - 2021-11-15
* Restore `Arbiter` support lost in `beta.8`. [#417]
[#417]: https://github.com/actix/actix-net/pull/417
## 2.0.0-beta.8 - 2021-11-05 _(YANKED)_
* Fix non-unix signal handler. [#410] * Fix non-unix signal handler. [#410]
[#410]: https://github.com/actix/actix-net/pull/410 [#410]: https://github.com/actix/actix-net/pull/410
## 2.0.0-beta.7 - 2021-11-05 ## 2.0.0-beta.7 - 2021-11-05 _(YANKED)_
* Server can be started in regular Tokio runtime. [#408] * Server can be started in regular Tokio runtime. [#408]
* Expose new `Server` type whose `Future` impl resolves when server stops. [#408] * Expose new `Server` type whose `Future` impl resolves when server stops. [#408]
* Rename `Server` to `ServerHandle`. [#407] * Rename `Server` to `ServerHandle`. [#407]

View File

@ -1,6 +1,6 @@
[package] [package]
name = "actix-server" name = "actix-server"
version = "2.0.0-beta.7" version = "2.0.0-beta.9"
authors = [ authors = [
"Nikolay Kim <fafhrd91@gmail.com>", "Nikolay Kim <fafhrd91@gmail.com>",
"fakeshadow <24548779@qq.com>", "fakeshadow <24548779@qq.com>",
@ -18,7 +18,7 @@ path = "src/lib.rs"
[features] [features]
default = [] default = []
io-uring = ["actix-rt/io-uring"] io-uring = ["tokio-uring", "actix-rt/io-uring"]
[dependencies] [dependencies]
actix-rt = { version = "2.4.0", default-features = false } actix-rt = { version = "2.4.0", default-features = false }
@ -26,11 +26,16 @@ actix-service = "2.0.0"
actix-utils = "3.0.0" actix-utils = "3.0.0"
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] }
log = "0.4" log = "0.4"
mio = { version = "0.7.6", features = ["os-poll", "net"] } mio = { version = "0.7.6", features = ["os-poll", "net"] }
num_cpus = "1.13" num_cpus = "1.13"
socket2 = "0.4.2"
tokio = { version = "1.5.1", features = ["sync"] } tokio = { version = "1.5.1", features = ["sync"] }
# runtime for io-uring feature
tokio-uring = { version = "0.1", optional = true }
[dev-dependencies] [dev-dependencies]
actix-codec = "0.4.0" actix-codec = "0.4.0"
actix-rt = "2.0.0" actix-rt = "2.0.0"

View File

@ -127,7 +127,7 @@ impl Accept {
let mut events = mio::Events::with_capacity(256); let mut events = mio::Events::with_capacity(256);
loop { loop {
if let Err(e) = self.poll.poll(&mut events, None) { if let Err(e) = self.poll.poll(&mut events, self.timeout) {
match e.kind() { match e.kind() {
io::ErrorKind::Interrupted => {} io::ErrorKind::Interrupted => {}
_ => panic!("Poll error: {}", e), _ => panic!("Poll error: {}", e),

View File

@ -8,7 +8,8 @@ use crate::{
server::ServerCommand, server::ServerCommand,
service::{InternalServiceFactory, ServiceFactory, StreamNewService}, service::{InternalServiceFactory, ServiceFactory, StreamNewService},
socket::{ socket::{
MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs, create_mio_tcp_listener, MioListener, MioTcpListener, StdSocketAddr, StdTcpListener,
ToSocketAddrs,
}, },
worker::ServerWorkerConfig, worker::ServerWorkerConfig,
Server, Server,
@ -112,7 +113,7 @@ impl ServerBuilder {
self.max_concurrent_connections(num) self.max_concurrent_connections(num)
} }
/// Stop Actix system. /// Stop Actix `System` after server shutdown.
pub fn system_exit(mut self) -> Self { pub fn system_exit(mut self) -> Self {
self.exit = true; self.exit = true;
self self
@ -263,7 +264,7 @@ pub(super) fn bind_addr<S: ToSocketAddrs>(
let mut success = false; let mut success = false;
let mut sockets = Vec::new(); let mut sockets = Vec::new();
for addr in addr.to_socket_addrs()? { for addr in addr.to_socket_addrs()? {
match create_tcp_listener(addr, backlog) { match create_mio_tcp_listener(addr, backlog) {
Ok(lst) => { Ok(lst) => {
success = true; success = true;
sockets.push(lst); sockets.push(lst);
@ -283,14 +284,3 @@ pub(super) fn bind_addr<S: ToSocketAddrs>(
)) ))
} }
} }
fn create_tcp_listener(addr: StdSocketAddr, backlog: u32) -> io::Result<MioTcpListener> {
let socket = match addr {
StdSocketAddr::V4(_) => MioTcpSocket::new_v4()?,
StdSocketAddr::V6(_) => MioTcpSocket::new_v6()?,
};
socket.set_reuseaddr(true)?;
socket.bind(addr)?;
socket.listen(backlog)
}

View File

@ -42,10 +42,12 @@ impl ServerHandle {
/// Stop incoming connection processing, stop all workers and exit. /// Stop incoming connection processing, stop all workers and exit.
pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> { pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let _ = self.cmd_tx.send(ServerCommand::Stop { let _ = self.cmd_tx.send(ServerCommand::Stop {
graceful, graceful,
completion: Some(tx), completion: Some(tx),
}); });
async { async {
let _ = rx.await; let _ = rx.await;
} }

View File

@ -4,7 +4,7 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
use futures_core::future::{BoxFuture, LocalBoxFuture}; use futures_core::future::BoxFuture;
// a poor man's join future. joined future is only used when starting/stopping the server. // a poor man's join future. joined future is only used when starting/stopping the server.
// pin_project and pinned futures are overkill for this task. // pin_project and pinned futures are overkill for this task.
@ -61,63 +61,6 @@ impl<T> Future for JoinAll<T> {
} }
} }
pub(crate) fn join_all_local<T>(
fut: Vec<impl Future<Output = T> + 'static>,
) -> JoinAllLocal<T> {
let fut = fut
.into_iter()
.map(|f| JoinLocalFuture::LocalFuture(Box::pin(f)))
.collect();
JoinAllLocal { fut }
}
// a poor man's join future. joined future is only used when starting/stopping the server.
// pin_project and pinned futures are overkill for this task.
pub(crate) struct JoinAllLocal<T> {
fut: Vec<JoinLocalFuture<T>>,
}
enum JoinLocalFuture<T> {
LocalFuture(LocalBoxFuture<'static, T>),
Result(Option<T>),
}
impl<T> Unpin for JoinAllLocal<T> {}
impl<T> Future for JoinAllLocal<T> {
type Output = Vec<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut ready = true;
let this = self.get_mut();
for fut in this.fut.iter_mut() {
if let JoinLocalFuture::LocalFuture(f) = fut {
match f.as_mut().poll(cx) {
Poll::Ready(t) => {
*fut = JoinLocalFuture::Result(Some(t));
}
Poll::Pending => ready = false,
}
}
}
if ready {
let mut res = Vec::new();
for fut in this.fut.iter_mut() {
if let JoinLocalFuture::Result(f) = fut {
res.push(f.take().unwrap());
}
}
Poll::Ready(res)
} else {
Poll::Pending
}
}
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
@ -132,13 +75,4 @@ mod test {
assert_eq!(Err(3), res.next().unwrap()); assert_eq!(Err(3), res.next().unwrap());
assert_eq!(Ok(9), res.next().unwrap()); assert_eq!(Ok(9), res.next().unwrap());
} }
#[actix_rt::test]
async fn test_join_all_local() {
let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))];
let mut res = join_all_local(futs).await.into_iter();
assert_eq!(Ok(1), res.next().unwrap());
assert_eq!(Err(3), res.next().unwrap());
assert_eq!(Ok(9), res.next().unwrap());
}
} }

View File

@ -132,12 +132,12 @@ impl Server {
.collect(); .collect();
// Give log information on what runtime will be used. // Give log information on what runtime will be used.
let is_tokio = tokio::runtime::Handle::try_current().is_ok();
let is_actix = actix_rt::System::try_current().is_some(); let is_actix = actix_rt::System::try_current().is_some();
let is_tokio = tokio::runtime::Handle::try_current().is_ok();
match (is_tokio, is_actix) { match (is_actix, is_tokio) {
(true, false) => info!("Tokio runtime found. Starting in existing Tokio runtime"), (false, true) => info!("Tokio runtime found. Starting in existing Tokio runtime"),
(_, true) => info!("Actix runtime found. Starting in Actix runtime"), (true, _) => info!("Actix runtime found. Starting in Actix runtime"),
(_, _) => info!( (_, _) => info!(
"Actix/Tokio runtime not found. Starting in newt Tokio current-thread runtime" "Actix/Tokio runtime not found. Starting in newt Tokio current-thread runtime"
), ),
@ -196,11 +196,11 @@ impl Future for Server {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.as_mut().get_mut() { match self.as_mut().get_mut() {
Server::Error(err) => Poll::Ready(Err(err Self::Error(err) => Poll::Ready(Err(err
.take() .take()
.expect("Server future cannot be polled after error"))), .expect("Server future cannot be polled after error"))),
Server::Server(inner) => { Self::Server(inner) => {
// poll Signals // poll Signals
if let Some(ref mut signals) = inner.signals { if let Some(ref mut signals) = inner.signals {
if let Poll::Ready(signal) = Pin::new(signals).poll(cx) { if let Poll::Ready(signal) = Pin::new(signals).poll(cx) {

View File

@ -2,7 +2,7 @@ pub(crate) use std::net::{
SocketAddr as StdSocketAddr, TcpListener as StdTcpListener, ToSocketAddrs, SocketAddr as StdSocketAddr, TcpListener as StdTcpListener, ToSocketAddrs,
}; };
pub(crate) use mio::net::{TcpListener as MioTcpListener, TcpSocket as MioTcpSocket}; pub(crate) use mio::net::TcpListener as MioTcpListener;
#[cfg(unix)] #[cfg(unix)]
pub(crate) use { pub(crate) use {
mio::net::UnixListener as MioUnixListener, mio::net::UnixListener as MioUnixListener,
@ -223,6 +223,22 @@ mod unix_impl {
} }
} }
pub(crate) fn create_mio_tcp_listener(
addr: StdSocketAddr,
backlog: u32,
) -> io::Result<MioTcpListener> {
use socket2::{Domain, Protocol, Socket, Type};
let socket = Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP))?;
socket.set_reuse_address(true)?;
socket.set_nonblocking(true)?;
socket.bind(&addr.into())?;
socket.listen(backlog as i32)?;
Ok(MioTcpListener::from_std(StdTcpListener::from(socket)))
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@ -234,11 +250,8 @@ mod tests {
assert_eq!(format!("{}", addr), "127.0.0.1:8080"); assert_eq!(format!("{}", addr), "127.0.0.1:8080");
let addr: StdSocketAddr = "127.0.0.1:0".parse().unwrap(); let addr: StdSocketAddr = "127.0.0.1:0".parse().unwrap();
let socket = MioTcpSocket::new_v4().unwrap(); let lst = create_mio_tcp_listener(addr, 128).unwrap();
socket.set_reuseaddr(true).unwrap(); let lst = MioListener::Tcp(lst);
socket.bind(addr).unwrap();
let tcp = socket.listen(128).unwrap();
let lst = MioListener::Tcp(tcp);
assert!(format!("{:?}", lst).contains("TcpListener")); assert!(format!("{:?}", lst).contains("TcpListener"));
assert!(format!("{}", lst).contains("127.0.0.1")); assert!(format!("{}", lst).contains("127.0.0.1"));
} }

View File

@ -147,3 +147,16 @@ impl Drop for TestServerRuntime {
self.stop() self.stop()
} }
} }
#[cfg(test)]
mod tests {
use actix_service::fn_service;
use super::*;
#[tokio::test]
async fn plain_tokio_runtime() {
let srv = TestServer::with(|| fn_service(|_sock| async move { Ok::<_, ()>(()) }));
assert!(srv.connect().is_ok());
}
}

View File

@ -24,7 +24,6 @@ use tokio::sync::{
}; };
use crate::{ use crate::{
join_all::join_all_local,
service::{BoxedServerService, InternalServiceFactory}, service::{BoxedServerService, InternalServiceFactory},
socket::MioStream, socket::MioStream,
waker_queue::{WakerInterest, WakerQueue}, waker_queue::{WakerInterest, WakerQueue},
@ -202,8 +201,8 @@ impl WorkerHandleServer {
pub(crate) struct ServerWorker { pub(crate) struct ServerWorker {
// UnboundedReceiver<Conn> should always be the first field. // UnboundedReceiver<Conn> should always be the first field.
// It must be dropped as soon as ServerWorker dropping. // It must be dropped as soon as ServerWorker dropping.
rx: UnboundedReceiver<Conn>, conn_rx: UnboundedReceiver<Conn>,
rx2: UnboundedReceiver<Stop>, stop_rx: UnboundedReceiver<Stop>,
counter: WorkerCounter, counter: WorkerCounter,
services: Box<[WorkerService]>, services: Box<[WorkerService]>,
factories: Box<[Box<dyn InternalServiceFactory>]>, factories: Box<[Box<dyn InternalServiceFactory>]>,
@ -212,7 +211,7 @@ pub(crate) struct ServerWorker {
} }
struct WorkerService { struct WorkerService {
factory: usize, factory_idx: usize,
status: WorkerServiceStatus, status: WorkerServiceStatus,
service: BoxedServerService, service: BoxedServerService,
} }
@ -234,6 +233,12 @@ enum WorkerServiceStatus {
Stopped, Stopped,
} }
impl Default for WorkerServiceStatus {
fn default() -> Self {
Self::Unavailable
}
}
/// Config for worker behavior passed down from server builder. /// Config for worker behavior passed down from server builder.
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
pub(crate) struct ServerWorkerConfig { pub(crate) struct ServerWorkerConfig {
@ -277,14 +282,131 @@ impl ServerWorker {
) -> io::Result<(WorkerHandleAccept, WorkerHandleServer)> { ) -> io::Result<(WorkerHandleAccept, WorkerHandleServer)> {
trace!("starting server worker {}", idx); trace!("starting server worker {}", idx);
let (tx1, rx) = unbounded_channel(); let (tx1, conn_rx) = unbounded_channel();
let (tx2, rx2) = unbounded_channel(); let (tx2, stop_rx) = unbounded_channel();
let counter = Counter::new(config.max_concurrent_connections); let counter = Counter::new(config.max_concurrent_connections);
let pair = handle_pair(idx, tx1, tx2, counter.clone());
let counter_clone = counter.clone(); // get actix system context if it is set
// every worker runs in it's own arbiter. let actix_system = System::try_current();
// get tokio runtime handle if it is set
let tokio_handle = tokio::runtime::Handle::try_current().ok();
// service factories initialization channel
let (factory_tx, factory_rx) = std::sync::mpsc::sync_channel::<io::Result<()>>(1);
// outline of following code:
//
// if system exists
// if uring enabled
// start arbiter using uring method
// else
// start arbiter with regular tokio
// else
// if uring enabled
// start uring in spawned thread
// else
// start regular tokio in spawned thread
// every worker runs in it's own thread and tokio runtime.
// use a custom tokio runtime builder to change the settings of runtime. // use a custom tokio runtime builder to change the settings of runtime.
match (actix_system, tokio_handle) {
(None, None) => {
panic!("No runtime detected. Start a Tokio (or Actix) runtime.");
}
// no actix system
(None, Some(rt_handle)) => {
std::thread::Builder::new()
.name(format!("actix-server worker {}", idx))
.spawn(move || {
let (worker_stopped_tx, worker_stopped_rx) = oneshot::channel();
// local set for running service init futures and worker services
let ls = tokio::task::LocalSet::new();
// init services using existing Tokio runtime (so probably on main thread)
let services = rt_handle.block_on(ls.run_until(async {
let mut services = Vec::new();
for (idx, factory) in factories.iter().enumerate() {
match factory.create().await {
Ok((token, svc)) => services.push((idx, token, svc)),
Err(err) => {
error!("Can not start worker: {:?}", err);
return Err(io::Error::new(
io::ErrorKind::Other,
format!("can not start server service {}", idx),
));
}
}
}
Ok(services)
}));
let services = match services {
Ok(services) => {
factory_tx.send(Ok(())).unwrap();
services
}
Err(err) => {
factory_tx.send(Err(err)).unwrap();
return;
}
};
let worker_services = wrap_worker_services(services);
let worker_fut = async move {
// spawn to make sure ServerWorker runs as non boxed future.
spawn(async move {
ServerWorker {
conn_rx,
stop_rx,
services: worker_services.into_boxed_slice(),
counter: WorkerCounter::new(idx, waker_queue, counter),
factories: factories.into_boxed_slice(),
state: WorkerState::default(),
shutdown_timeout: config.shutdown_timeout,
}
.await;
// wake up outermost task waiting for shutdown
worker_stopped_tx.send(()).unwrap();
});
worker_stopped_rx.await.unwrap();
};
#[cfg(all(target_os = "linux", feature = "io-uring"))]
{
// TODO: pass max blocking thread config when tokio-uring enable configuration
// on building runtime.
let _ = config.max_blocking_threads;
tokio_uring::start(worker_fut);
}
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
{
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.max_blocking_threads(config.max_blocking_threads)
.build()
.unwrap();
rt.block_on(ls.run_until(worker_fut));
}
})
.expect("cannot spawn server worker thread");
}
// with actix system
(Some(_sys), _) => {
#[cfg(all(target_os = "linux", feature = "io-uring"))] #[cfg(all(target_os = "linux", feature = "io-uring"))]
let arbiter = { let arbiter = {
// TODO: pass max blocking thread config when tokio-uring enable configuration // TODO: pass max blocking thread config when tokio-uring enable configuration
@ -293,89 +415,63 @@ impl ServerWorker {
Arbiter::new() Arbiter::new()
}; };
// get actix system context if it is set #[cfg(not(all(target_os = "linux", feature = "io-uring")))]
let sys = System::try_current(); let arbiter = {
Arbiter::with_tokio_rt(move || {
// service factories initialization channel tokio::runtime::Builder::new_current_thread()
let (factory_tx, factory_rx) = std::sync::mpsc::sync_channel(1);
std::thread::Builder::new()
.name(format!("actix-server worker {}", idx))
.spawn(move || {
// forward existing actix system context
if let Some(sys) = sys {
System::set_current(sys);
}
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.max_blocking_threads(config.max_blocking_threads) .max_blocking_threads(config.max_blocking_threads)
.build() .build()
.unwrap(); .unwrap()
rt.block_on(tokio::task::LocalSet::new().run_until(async move {
let fut = factories
.iter()
.enumerate()
.map(|(idx, factory)| {
let fut = factory.create();
async move { fut.await.map(|(t, s)| (idx, t, s)) }
}) })
.collect::<Vec<_>>();
// a second spawn to run !Send future tasks.
spawn(async move {
let res = join_all_local(fut)
.await
.into_iter()
.collect::<Result<Vec<_>, _>>();
let services = match res {
Ok(res) => res
.into_iter()
.fold(Vec::new(), |mut services, (factory, token, service)| {
assert_eq!(token, services.len());
services.push(WorkerService {
factory,
service,
status: WorkerServiceStatus::Unavailable,
});
services
})
.into_boxed_slice(),
Err(e) => {
error!("Can not start worker: {:?}", e);
Arbiter::try_current().as_ref().map(ArbiterHandle::stop);
return;
}
}; };
factory_tx.send(()).unwrap(); arbiter.spawn(async move {
// spawn_local to run !Send future tasks.
spawn(async move {
let mut services = Vec::new();
// a third spawn to make sure ServerWorker runs as non boxed future. for (idx, factory) in factories.iter().enumerate() {
match factory.create().await {
Ok((token, svc)) => services.push((idx, token, svc)),
Err(err) => {
error!("Can not start worker: {:?}", err);
Arbiter::current().stop();
factory_tx
.send(Err(io::Error::new(
io::ErrorKind::Other,
format!("can not start server service {}", idx),
)))
.unwrap();
return;
}
}
}
factory_tx.send(Ok(())).unwrap();
let worker_services = wrap_worker_services(services);
// spawn to make sure ServerWorker runs as non boxed future.
spawn(ServerWorker { spawn(ServerWorker {
rx, conn_rx,
rx2, stop_rx,
services, services: worker_services.into_boxed_slice(),
counter: WorkerCounter::new(idx, waker_queue, counter_clone), counter: WorkerCounter::new(idx, waker_queue, counter),
factories: factories.into_boxed_slice(), factories: factories.into_boxed_slice(),
state: Default::default(), state: Default::default(),
shutdown_timeout: config.shutdown_timeout, shutdown_timeout: config.shutdown_timeout,
}) });
.await });
.expect("task 3 panic"); });
}) }
.await };
.expect("task 2 panic");
}))
})
.expect("worker thread error/panic");
// wait for service factories initialization // wait for service factories initialization
factory_rx.recv().unwrap(); factory_rx.recv().unwrap()?;
Ok(handle_pair(idx, tx1, tx2, counter)) Ok(pair)
} }
fn restart_service(&mut self, idx: usize, factory_id: usize) { fn restart_service(&mut self, idx: usize, factory_id: usize) {
@ -413,7 +509,7 @@ impl ServerWorker {
if srv.status == WorkerServiceStatus::Unavailable { if srv.status == WorkerServiceStatus::Unavailable {
trace!( trace!(
"Service {:?} is available", "Service {:?} is available",
self.factories[srv.factory].name(idx) self.factories[srv.factory_idx].name(idx)
); );
srv.status = WorkerServiceStatus::Available; srv.status = WorkerServiceStatus::Available;
} }
@ -424,7 +520,7 @@ impl ServerWorker {
if srv.status == WorkerServiceStatus::Available { if srv.status == WorkerServiceStatus::Available {
trace!( trace!(
"Service {:?} is unavailable", "Service {:?} is unavailable",
self.factories[srv.factory].name(idx) self.factories[srv.factory_idx].name(idx)
); );
srv.status = WorkerServiceStatus::Unavailable; srv.status = WorkerServiceStatus::Unavailable;
} }
@ -432,10 +528,10 @@ impl ServerWorker {
Poll::Ready(Err(_)) => { Poll::Ready(Err(_)) => {
error!( error!(
"Service {:?} readiness check returned error, restarting", "Service {:?} readiness check returned error, restarting",
self.factories[srv.factory].name(idx) self.factories[srv.factory_idx].name(idx)
); );
srv.status = WorkerServiceStatus::Failed; srv.status = WorkerServiceStatus::Failed;
return Err((idx, srv.factory)); return Err((idx, srv.factory_idx));
} }
} }
} }
@ -478,7 +574,6 @@ impl Default for WorkerState {
impl Drop for ServerWorker { impl Drop for ServerWorker {
fn drop(&mut self) { fn drop(&mut self) {
trace!("stopping ServerWorker Arbiter");
Arbiter::try_current().as_ref().map(ArbiterHandle::stop); Arbiter::try_current().as_ref().map(ArbiterHandle::stop);
} }
} }
@ -490,7 +585,8 @@ impl Future for ServerWorker {
let this = self.as_mut().get_mut(); let this = self.as_mut().get_mut();
// `StopWorker` message handler // `StopWorker` message handler
if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx) if let Poll::Ready(Some(Stop { graceful, tx })) =
Pin::new(&mut this.stop_rx).poll_recv(cx)
{ {
let num = this.counter.total(); let num = this.counter.total();
if num == 0 { if num == 0 {
@ -552,6 +648,14 @@ impl Future for ServerWorker {
self.poll(cx) self.poll(cx)
} }
WorkerState::Shutdown(ref mut shutdown) => { WorkerState::Shutdown(ref mut shutdown) => {
// drop all pending connections in rx channel.
while let Poll::Ready(Some(conn)) = Pin::new(&mut this.conn_rx).poll_recv(cx) {
// WorkerCounterGuard is needed as Accept thread has incremented counter.
// It's guard's job to decrement the counter together with drop of Conn.
let guard = this.counter.guard();
drop((conn, guard));
}
// wait for 1 second // wait for 1 second
ready!(shutdown.timer.as_mut().poll(cx)); ready!(shutdown.timer.as_mut().poll(cx));
@ -592,7 +696,7 @@ impl Future for ServerWorker {
} }
// handle incoming io stream // handle incoming io stream
match ready!(Pin::new(&mut this.rx).poll_recv(cx)) { match ready!(Pin::new(&mut this.conn_rx).poll_recv(cx)) {
Some(msg) => { Some(msg) => {
let guard = this.counter.guard(); let guard = this.counter.guard();
let _ = this.services[msg.token].service.call((guard, msg.io)); let _ = this.services[msg.token].service.call((guard, msg.io));
@ -603,3 +707,19 @@ impl Future for ServerWorker {
} }
} }
} }
fn wrap_worker_services(
services: Vec<(usize, usize, BoxedServerService)>,
) -> Vec<WorkerService> {
services
.into_iter()
.fold(Vec::new(), |mut services, (idx, token, service)| {
assert_eq!(token, services.len());
services.push(WorkerService {
factory_idx: idx,
service,
status: WorkerServiceStatus::Unavailable,
});
services
})
}

View File

@ -5,14 +5,17 @@ use std::{net, thread, time::Duration};
use actix_rt::{net::TcpStream, time::sleep}; use actix_rt::{net::TcpStream, time::sleep};
use actix_server::Server; use actix_server::Server;
use actix_service::fn_service; use actix_service::fn_service;
use socket2::{Domain, Protocol, Socket, Type};
fn unused_addr() -> net::SocketAddr { fn unused_addr() -> net::SocketAddr {
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap(); let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
let socket = mio::net::TcpSocket::new_v4().unwrap(); let socket =
socket.bind(addr).unwrap(); Socket::new(Domain::for_address(addr), Type::STREAM, Some(Protocol::TCP)).unwrap();
socket.set_reuseaddr(true).unwrap(); socket.set_reuse_address(true).unwrap();
let tcp = socket.listen(32).unwrap(); socket.set_nonblocking(true).unwrap();
tcp.local_addr().unwrap() socket.bind(&addr.into()).unwrap();
socket.listen(32).unwrap();
net::TcpListener::from(socket).local_addr().unwrap()
} }
#[test] #[test]
@ -30,28 +33,63 @@ fn test_bind() {
})? })?
.run(); .run();
let _ = tx.send((srv.handle(), actix_rt::System::current())); let _ = tx.send(srv.handle());
srv.await srv.await
}) })
}); });
let (srv, sys) = rx.recv().unwrap(); let srv = rx.recv().unwrap();
thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok());
let _ = srv.stop(true);
h.join().unwrap().unwrap();
}
#[test]
fn plain_tokio_runtime() {
let addr = unused_addr();
let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async {
let srv = Server::build()
.workers(1)
.disable_signals()
.bind("test", addr, move || {
fn_service(|_| async { Ok::<_, ()>(()) })
})?
.run();
tx.send(srv.handle()).unwrap();
srv.await
})
});
let srv = rx.recv().unwrap();
thread::sleep(Duration::from_millis(500)); thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok()); assert!(net::TcpStream::connect(addr).is_ok());
let _ = srv.stop(true); let _ = srv.stop(true);
sys.stop();
h.join().unwrap().unwrap(); h.join().unwrap().unwrap();
} }
#[test] #[test]
fn test_listen() { fn test_listen() {
let addr = unused_addr(); let addr = unused_addr();
let lst = net::TcpListener::bind(addr).unwrap();
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
let h = thread::spawn(move || { let h = thread::spawn(move || {
let lst = net::TcpListener::bind(addr)?;
actix_rt::System::new().block_on(async { actix_rt::System::new().block_on(async {
let srv = Server::build() let srv = Server::build()
.disable_signals() .disable_signals()
@ -61,19 +99,18 @@ fn test_listen() {
})? })?
.run(); .run();
let _ = tx.send((srv.handle(), actix_rt::System::current())); let _ = tx.send(srv.handle());
srv.await srv.await
}) })
}); });
let (srv, sys) = rx.recv().unwrap(); let srv = rx.recv().unwrap();
thread::sleep(Duration::from_millis(500)); thread::sleep(Duration::from_millis(500));
assert!(net::TcpStream::connect(addr).is_ok()); assert!(net::TcpStream::connect(addr).is_ok());
let _ = srv.stop(true); let _ = srv.stop(true);
sys.stop();
h.join().unwrap().unwrap(); h.join().unwrap().unwrap();
} }
@ -280,12 +317,12 @@ async fn test_service_restart() {
.workers(1) .workers(1)
.run(); .run();
let _ = tx.send((srv.handle(), actix_rt::System::current())); let _ = tx.send(srv.handle());
srv.await srv.await
}) })
}); });
let (srv, sys) = rx.recv().unwrap(); let srv = rx.recv().unwrap();
for _ in 0..5 { for _ in 0..5 {
TcpStream::connect(addr1) TcpStream::connect(addr1)
@ -308,7 +345,6 @@ async fn test_service_restart() {
assert!(num2_clone.load(Ordering::SeqCst) > 5); assert!(num2_clone.load(Ordering::SeqCst) > 5);
let _ = srv.stop(false); let _ = srv.stop(false);
sys.stop();
h.join().unwrap().unwrap(); h.join().unwrap().unwrap();
} }
@ -385,13 +421,13 @@ async fn worker_restart() {
.workers(2) .workers(2)
.run(); .run();
let _ = tx.send((srv.handle(), actix_rt::System::current())); let _ = tx.send(srv.handle());
srv.await srv.await
}) })
}); });
let (srv, sys) = rx.recv().unwrap(); let srv = rx.recv().unwrap();
sleep(Duration::from_secs(3)).await; sleep(Duration::from_secs(3)).await;
@ -449,6 +485,5 @@ async fn worker_restart() {
stream.shutdown().await.unwrap(); stream.shutdown().await.unwrap();
let _ = srv.stop(false); let _ = srv.stop(false);
sys.stop();
h.join().unwrap().unwrap(); h.join().unwrap().unwrap();
} }

View File

@ -1,7 +1,7 @@
/// An implementation of [`poll_ready`]() that always signals readiness. /// An implementation of [`poll_ready`]() that always signals readiness.
/// ///
/// This should only be used for basic leaf services that have no concept of un-readiness. /// This should only be used for basic leaf services that have no concept of un-readiness.
/// For wrapper or other serivice types, use [`forward_ready!`] for simple cases or write a bespoke /// For wrapper or other service types, use [`forward_ready!`] for simple cases or write a bespoke
/// `poll_ready` implementation. /// `poll_ready` implementation.
/// ///
/// [`poll_ready`]: crate::Service::poll_ready /// [`poll_ready`]: crate::Service::poll_ready

View File

@ -3,6 +3,12 @@
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
## 3.0.0-beta.8 - 2021-11-15
* Add `Connect::request` for getting a reference to the connection request. [#415]
[#415]: https://github.com/actix/actix-net/pull/415
## 3.0.0-beta.7 - 2021-10-20 ## 3.0.0-beta.7 - 2021-10-20
* Add `webpki_roots_cert_store()` to get rustls compatible webpki roots cert store. [#401] * Add `webpki_roots_cert_store()` to get rustls compatible webpki roots cert store. [#401]
* Alias `connect::ssl` to `connect::tls`. [#401] * Alias `connect::ssl` to `connect::tls`. [#401]

View File

@ -1,6 +1,6 @@
[package] [package]
name = "actix-tls" name = "actix-tls"
version = "3.0.0-beta.7" version = "3.0.0-beta.8"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "TLS acceptor and connector services for Actix ecosystem" description = "TLS acceptor and connector services for Actix ecosystem"
keywords = ["network", "tls", "ssl", "async", "transport"] keywords = ["network", "tls", "ssl", "async", "transport"]
@ -62,7 +62,7 @@ tokio-native-tls = { version = "0.3", optional = true }
[dev-dependencies] [dev-dependencies]
actix-rt = "2.2.0" actix-rt = "2.2.0"
actix-server = "2.0.0-beta.7" actix-server = "2.0.0-beta.9"
bytes = "1" bytes = "1"
env_logger = "0.9" env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }

View File

@ -63,16 +63,16 @@ impl From<Option<SocketAddr>> for ConnectAddrs {
/// Connection info. /// Connection info.
#[derive(Debug, PartialEq, Eq, Hash)] #[derive(Debug, PartialEq, Eq, Hash)]
pub struct Connect<T> { pub struct Connect<R> {
pub(crate) req: T, pub(crate) req: R,
pub(crate) port: u16, pub(crate) port: u16,
pub(crate) addr: ConnectAddrs, pub(crate) addr: ConnectAddrs,
pub(crate) local_addr: Option<IpAddr>, pub(crate) local_addr: Option<IpAddr>,
} }
impl<T: Address> Connect<T> { impl<R: Address> Connect<R> {
/// Create `Connect` instance by splitting the string by ':' and convert the second part to u16 /// Create `Connect` instance by splitting the string by ':' and convert the second part to u16
pub fn new(req: T) -> Connect<T> { pub fn new(req: R) -> Connect<R> {
let (_, port) = parse_host(req.hostname()); let (_, port) = parse_host(req.hostname());
Connect { Connect {
@ -85,7 +85,7 @@ impl<T: Address> Connect<T> {
/// Create new `Connect` instance from host and address. Connector skips name resolution stage /// Create new `Connect` instance from host and address. Connector skips name resolution stage
/// for such connect messages. /// for such connect messages.
pub fn with_addr(req: T, addr: SocketAddr) -> Connect<T> { pub fn with_addr(req: R, addr: SocketAddr) -> Connect<R> {
Connect { Connect {
req, req,
port: 0, port: 0,
@ -155,15 +155,20 @@ impl<T: Address> Connect<T> {
ConnectAddrs::Multi(addrs) => ConnectAddrsIter::MultiOwned(addrs.into_iter()), ConnectAddrs::Multi(addrs) => ConnectAddrsIter::MultiOwned(addrs.into_iter()),
} }
} }
/// Returns a reference to the connection request.
pub fn request(&self) -> &R {
&self.req
}
} }
impl<T: Address> From<T> for Connect<T> { impl<R: Address> From<R> for Connect<R> {
fn from(addr: T) -> Self { fn from(addr: R) -> Self {
Connect::new(addr) Connect::new(addr)
} }
} }
impl<T: Address> fmt::Display for Connect<T> { impl<R: Address> fmt::Display for Connect<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}:{}", self.hostname(), self.port()) write!(f, "{}:{}", self.hostname(), self.port())
} }
@ -347,4 +352,10 @@ mod tests {
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)) IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))
) )
} }
#[test]
fn request_ref() {
let conn = Connect::new("hello");
assert_eq!(conn.request(), &"hello")
}
} }

View File

@ -23,3 +23,4 @@ local-waker = "0.1"
[dev-dependencies] [dev-dependencies]
actix-rt = "2.0.0" actix-rt = "2.0.0"
futures-util = { version = "0.3.7", default-features = false } futures-util = { version = "0.3.7", default-features = false }
static_assertions = "1.1"

View File

@ -103,10 +103,16 @@ pub fn err<T, E>(err: E) -> Ready<Result<T, E>> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::rc::Rc;
use futures_util::task::noop_waker; use futures_util::task::noop_waker;
use static_assertions::{assert_impl_all, assert_not_impl_all};
use super::*; use super::*;
assert_impl_all!(Ready<()>: Send, Sync, Clone);
assert_not_impl_all!(Ready<Rc<()>>: Send, Sync);
#[test] #[test]
#[should_panic] #[should_panic]
fn multiple_poll_panics() { fn multiple_poll_panics() {