mirror of https://github.com/fafhrd91/actix-net

commit c353af7965: Merge branch 'master' into master
actix-codec/CHANGES.md
@@ -3,6 +3,10 @@
 ## Unreleased - 2021-xx-xx
 
 
+## 0.4.0 - 2021-04-20
+* No significant changes since v0.4.0-beta.1.
+
+
 ## 0.4.0-beta.1 - 2020-12-28
 * Replace `pin-project` with `pin-project-lite`. [#237]
 * Upgrade `tokio` dependency to `1`. [#237]
@@ -23,28 +27,28 @@
 ## 0.3.0-beta.1 - 2020-08-19
 * Use `.advance()` instead of `.split_to()`.
 * Upgrade `tokio-util` to `0.3`.
-* Improve `BytesCodec` `.encode()` performance
-* Simplify `BytesCodec` `.decode()`
+* Improve `BytesCodec::encode()` performance.
+* Simplify `BytesCodec::decode()`.
 * Rename methods on `Framed` to better describe their use.
 * Add method on `Framed` to get a pinned reference to the underlying I/O.
 * Add method on `Framed` check emptiness of read buffer.
 
 
 ## 0.2.0 - 2019-12-10
-* Use specific futures dependencies
+* Use specific futures dependencies.
 
 
 ## 0.2.0-alpha.4
-* Fix buffer remaining capacity calculation
+* Fix buffer remaining capacity calculation.
 
 
 ## 0.2.0-alpha.3
-* Use tokio 0.2
-* Fix low/high watermark for write/read buffers
+* Use tokio 0.2.
+* Fix low/high watermark for write/read buffers.
 
 
 ## 0.2.0-alpha.2
-* Migrated to `std::future`
+* Migrated to `std::future`.
 
 
 ## 0.1.2 - 2019-03-27
@@ -56,4 +60,4 @@
 
 
 ## 0.1.0 - 2018-12-09
-* Move codec to separate crate
+* Move codec to separate crate.
actix-codec/Cargo.toml
@@ -1,12 +1,10 @@
 [package]
 name = "actix-codec"
-version = "0.4.0-beta.1"
+version = "0.4.0"
 authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
 description = "Codec utilities for working with framed protocols"
 keywords = ["network", "framework", "async", "futures"]
-homepage = "https://actix.rs"
-repository = "https://github.com/actix/actix-net.git"
-documentation = "https://docs.rs/actix-codec"
+repository = "https://github.com/actix/actix-net"
 categories = ["network-programming", "asynchronous"]
 license = "MIT OR Apache-2.0"
 edition = "2018"
actix-server/CHANGES.md
@@ -1,6 +1,13 @@
 # Changes
 
 ## Unreleased - 2021-xx-xx
+* Remove `config` module. `ServiceConfig`, `ServiceRuntime` public types are removed due to this change. [#349]
+* Remove `ServerBuilder::configure` [#349]
+
+[#349]: https://github.com/actix/actix-net/pull/349
+
+
+## 2.0.0-beta.5 - 2021-04-20
 * Server shutdown would notify all workers to exit regardless if shutdown is graceful.
   This would make all worker shutdown immediately in force shutdown case. [#333]
 
actix-server/Cargo.toml
@@ -1,14 +1,13 @@
 [package]
 name = "actix-server"
-version = "2.0.0-beta.4"
+version = "2.0.0-beta.5"
 authors = [
     "Nikolay Kim <fafhrd91@gmail.com>",
     "fakeshadow <24548779@qq.com>",
 ]
 description = "General purpose TCP server built for the Actix ecosystem"
 keywords = ["network", "framework", "async", "futures"]
-homepage = "https://actix.rs"
-repository = "https://github.com/actix/actix-net.git"
+repository = "https://github.com/actix/actix-net"
 categories = ["network-programming", "asynchronous"]
 license = "MIT OR Apache-2.0"
 edition = "2018"
@@ -29,7 +28,6 @@ futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
 log = "0.4"
 mio = { version = "0.7.6", features = ["os-poll", "net"] }
 num_cpus = "1.13"
-slab = "0.4"
 tokio = { version = "1.2", features = ["sync"] }
 
 [dev-dependencies]
actix-server/src/accept.rs
@@ -7,21 +7,14 @@ use actix_rt::{
 };
 use log::{error, info};
 use mio::{Interest, Poll, Token as MioToken};
-use slab::Slab;
 
 use crate::server::Server;
-use crate::socket::{MioListener, SocketAddr};
+use crate::socket::MioListener;
 use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
 use crate::worker::{Conn, WorkerHandleAccept};
-use crate::Token;
 
 struct ServerSocketInfo {
-    /// Address of socket. Mainly used for logging.
-    addr: SocketAddr,
-
-    /// Beware this is the crate token for identify socket and should not be confused
-    /// with `mio::Token`.
-    token: Token,
-
+    token: usize,
+
     lst: MioListener,
 
@@ -65,7 +58,7 @@ impl AcceptLoop {
 
     pub(crate) fn start(
         &mut self,
-        socks: Vec<(Token, MioListener)>,
+        socks: Vec<(usize, MioListener)>,
         handles: Vec<WorkerHandleAccept>,
     ) {
         let srv = self.srv.take().expect("Can not re-use AcceptInfo");
@@ -84,7 +77,7 @@ struct Accept {
     srv: Server,
     next: usize,
     avail: Availability,
-    backpressure: bool,
+    paused: bool,
 }
 
 /// Array of u128 with every bit as marker for a worker handle's availability.
@@ -98,23 +91,22 @@ impl Default for Availability {
 
 impl Availability {
     /// Check if any worker handle is available
+    #[inline(always)]
     fn available(&self) -> bool {
         self.0.iter().any(|a| *a != 0)
     }
 
+    /// Check if worker handle is available by index
+    #[inline(always)]
+    fn get_available(&self, idx: usize) -> bool {
+        let (offset, idx) = Self::offset(idx);
+
+        self.0[offset] & (1 << idx as u128) != 0
+    }
+
     /// Set worker handle available state by index.
     fn set_available(&mut self, idx: usize, avail: bool) {
-        let (offset, idx) = if idx < 128 {
-            (0, idx)
-        } else if idx < 128 * 2 {
-            (1, idx - 128)
-        } else if idx < 128 * 3 {
-            (2, idx - 128 * 2)
-        } else if idx < 128 * 4 {
-            (3, idx - 128 * 3)
-        } else {
-            panic!("Max WorkerHandle count is 512")
-        };
+        let (offset, idx) = Self::offset(idx);
 
         let off = 1 << idx as u128;
         if avail {
@@ -131,6 +123,21 @@ impl Availability {
             self.set_available(handle.idx(), true);
         })
     }
+
+    /// Get offset and adjusted index of given worker handle index.
+    fn offset(idx: usize) -> (usize, usize) {
+        if idx < 128 {
+            (0, idx)
+        } else if idx < 128 * 2 {
+            (1, idx - 128)
+        } else if idx < 128 * 3 {
+            (2, idx - 128 * 2)
+        } else if idx < 128 * 4 {
+            (3, idx - 128 * 3)
+        } else {
+            panic!("Max WorkerHandle count is 512")
+        }
+    }
 }
 
 /// This function defines errors that are per-connection. Which basically
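A note on the `Availability` hunks above: readiness is packed into a `[u128; 4]` bit array, one bit per worker, so up to 4 * 128 = 512 workers are tracked and `available()` is only four integer comparisons. The sketch below restates that bookkeeping in isolation; it is illustrative only, and its `offset` uses equivalent div/mod arithmetic instead of the crate's unrolled if/else chain.

```rust
/// Readiness of up to 4 * 128 = 512 workers, one bit each.
#[derive(Default)]
struct Availability([u128; 4]);

impl Availability {
    /// True if any worker bit is set.
    fn available(&self) -> bool {
        self.0.iter().any(|chunk| *chunk != 0)
    }

    /// True if worker `idx` has its bit set.
    fn get_available(&self, idx: usize) -> bool {
        let (offset, bit) = Self::offset(idx);
        self.0[offset] & (1 << bit) != 0
    }

    /// Set or clear the bit for worker `idx`.
    fn set_available(&mut self, idx: usize, avail: bool) {
        let (offset, bit) = Self::offset(idx);
        if avail {
            self.0[offset] |= 1 << bit;
        } else {
            self.0[offset] &= !(1 << bit);
        }
    }

    /// Map a worker index to (array slot, bit position inside that u128).
    fn offset(idx: usize) -> (usize, usize) {
        assert!(idx < 512, "Max WorkerHandle count is 512");
        (idx / 128, idx % 128)
    }
}

fn main() {
    let mut avail = Availability::default();
    assert!(!avail.available());

    // Worker 200 lands in slot 1 (200 / 128), bit 72 (200 % 128).
    avail.set_available(200, true);
    assert!(avail.available());
    assert!(avail.get_available(200));

    avail.set_available(200, false);
    assert!(!avail.available());
}
```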
@@ -150,7 +157,7 @@ impl Accept {
     pub(crate) fn start(
         poll: Poll,
         waker: WakerQueue,
-        socks: Vec<(Token, MioListener)>,
+        socks: Vec<(usize, MioListener)>,
         srv: Server,
         handles: Vec<WorkerHandleAccept>,
     ) {
@@ -161,10 +168,10 @@ impl Accept {
             .name("actix-server accept loop".to_owned())
             .spawn(move || {
                 System::set_current(sys);
-                let (mut accept, sockets) =
+                let (mut accept, mut sockets) =
                     Accept::new_with_sockets(poll, waker, socks, handles, srv);
 
-                accept.poll_with(sockets);
+                accept.poll_with(&mut sockets);
             })
             .unwrap();
     }
@@ -172,29 +179,25 @@ impl Accept {
     fn new_with_sockets(
         poll: Poll,
         waker: WakerQueue,
-        socks: Vec<(Token, MioListener)>,
+        socks: Vec<(usize, MioListener)>,
         handles: Vec<WorkerHandleAccept>,
         srv: Server,
-    ) -> (Accept, Slab<ServerSocketInfo>) {
-        let mut sockets = Slab::new();
-        for (hnd_token, mut lst) in socks.into_iter() {
-            let addr = lst.local_addr();
-
-            let entry = sockets.vacant_entry();
-            let token = entry.key();
-
-            // Start listening for incoming connections
-            poll.registry()
-                .register(&mut lst, MioToken(token), Interest::READABLE)
-                .unwrap_or_else(|e| panic!("Can not register io: {}", e));
-
-            entry.insert(ServerSocketInfo {
-                addr,
-                token: hnd_token,
-                lst,
-                timeout: None,
-            });
-        }
+    ) -> (Accept, Vec<ServerSocketInfo>) {
+        let sockets = socks
+            .into_iter()
+            .map(|(token, mut lst)| {
+                // Start listening for incoming connections
+                poll.registry()
+                    .register(&mut lst, MioToken(token), Interest::READABLE)
+                    .unwrap_or_else(|e| panic!("Can not register io: {}", e));
+
+                ServerSocketInfo {
+                    token,
+                    lst,
+                    timeout: None,
+                }
+            })
+            .collect();
 
         let mut avail = Availability::default();
 
@@ -208,19 +211,19 @@ impl Accept {
             srv,
             next: 0,
             avail,
-            backpressure: false,
+            paused: false,
         };
 
         (accept, sockets)
     }
 
-    fn poll_with(&mut self, mut sockets: Slab<ServerSocketInfo>) {
+    fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) {
         let mut events = mio::Events::with_capacity(128);
 
         loop {
            if let Err(e) = self.poll.poll(&mut events, None) {
                match e.kind() {
-                    std::io::ErrorKind::Interrupted => continue,
+                    io::ErrorKind::Interrupted => {}
                    _ => panic!("Poll error: {}", e),
                }
            }
@@ -228,130 +231,160 @@ impl Accept {
             for event in events.iter() {
                 let token = event.token();
                 match token {
+                    WAKER_TOKEN => {
+                        let exit = self.handle_waker(sockets);
+                        if exit {
+                            info!("Accept is stopped.");
+                            return;
+                        }
+                    }
+                    _ => {
+                        let token = usize::from(token);
+                        self.accept(sockets, token);
+                    }
+                }
+            }
+        }
+    }
+
+    fn handle_waker(&mut self, sockets: &mut [ServerSocketInfo]) -> bool {
         // This is a loop because interests for command from previous version was
         // a loop that would try to drain the command channel. It's yet unknown
         // if it's necessary/good practice to actively drain the waker queue.
-                    WAKER_TOKEN => 'waker: loop {
+        loop {
             // take guard with every iteration so no new interest can be added
             // until the current task is done.
             let mut guard = self.waker.guard();
             match guard.pop_front() {
-                // worker notify it becomes available. we may want to recover
-                // from backpressure.
+                // worker notify it becomes available.
                 Some(WakerInterest::WorkerAvailable(idx)) => {
                     drop(guard);
-                    self.maybe_backpressure(&mut sockets, false);
                     self.avail.set_available(idx, true);
+
+                    if !self.paused {
+                        self.accept_all(sockets);
+                    }
                 }
                 // a new worker thread is made and it's handle would be added to Accept
                 Some(WakerInterest::Worker(handle)) => {
                     drop(guard);
-                    // maybe we want to recover from a backpressure.
-                    self.maybe_backpressure(&mut sockets, false);
                     self.avail.set_available(handle.idx(), true);
                     self.handles.push(handle);
+
+                    if !self.paused {
+                        self.accept_all(sockets);
+                    }
                 }
                 // got timer interest and it's time to try register socket(s) again
                 Some(WakerInterest::Timer) => {
                     drop(guard);
-                    self.process_timer(&mut sockets)
+
+                    self.process_timer(sockets)
                 }
                 Some(WakerInterest::Pause) => {
                     drop(guard);
-                    self.deregister_all(&mut sockets);
+
+                    if !self.paused {
+                        self.paused = true;
+
+                        self.deregister_all(sockets);
+                    }
                 }
                 Some(WakerInterest::Resume) => {
                     drop(guard);
-                    sockets.iter_mut().for_each(|(token, info)| {
-                        self.register_logged(token, info);
-                    });
+
+                    if self.paused {
+                        self.paused = false;
+
+                        sockets.iter_mut().for_each(|info| {
+                            self.register_logged(info);
+                        });
+
+                        self.accept_all(sockets);
+                    }
                 }
                 Some(WakerInterest::Stop) => {
-                    return self.deregister_all(&mut sockets);
+                    if !self.paused {
+                        self.deregister_all(sockets);
+                    }
+
+                    return true;
                 }
                 // waker queue is drained
                 None => {
                     // Reset the WakerQueue before break so it does not grow infinitely
                     WakerQueue::reset(&mut guard);
-                    break 'waker;
+
+                    return false;
                 }
-            },
-            _ => {
-                let token = usize::from(token);
-                self.accept(&mut sockets, token);
-            }
             }
         }
     }
 
-    fn process_timer(&self, sockets: &mut Slab<ServerSocketInfo>) {
+    fn process_timer(&self, sockets: &mut [ServerSocketInfo]) {
         let now = Instant::now();
         sockets
             .iter_mut()
             // Only sockets that had an associated timeout were deregistered.
-            .filter(|(_, info)| info.timeout.is_some())
-            .for_each(|(token, info)| {
+            .filter(|info| info.timeout.is_some())
+            .for_each(|info| {
                 let inst = info.timeout.take().unwrap();
 
                 if now < inst {
                     info.timeout = Some(inst);
-                } else if !self.backpressure {
-                    self.register_logged(token, info);
+                } else if !self.paused {
+                    self.register_logged(info);
                 }
 
-                // Drop the timeout if server is in backpressure and socket timeout is expired.
-                // When server recovers from backpressure it will register all sockets without
+                // Drop the timeout if server is paused and socket timeout is expired.
+                // When server recovers from pause it will register all sockets without
                 // a timeout value so this socket register will be delayed till then.
             });
     }
 
     #[cfg(not(target_os = "windows"))]
-    fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> {
+    fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
+        let token = MioToken(info.token);
         self.poll
             .registry()
-            .register(&mut info.lst, MioToken(token), Interest::READABLE)
+            .register(&mut info.lst, token, Interest::READABLE)
     }
 
     #[cfg(target_os = "windows")]
-    fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> {
+    fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
         // On windows, calling register without deregister cause an error.
         // See https://github.com/actix/actix-web/issues/905
         // Calling reregister seems to fix the issue.
+        let token = MioToken(info.token);
         self.poll
             .registry()
-            .register(&mut info.lst, mio::Token(token), Interest::READABLE)
+            .register(&mut info.lst, token, Interest::READABLE)
             .or_else(|_| {
-                self.poll.registry().reregister(
-                    &mut info.lst,
-                    mio::Token(token),
-                    Interest::READABLE,
-                )
+                self.poll
+                    .registry()
+                    .reregister(&mut info.lst, token, Interest::READABLE)
             })
     }
 
-    fn register_logged(&self, token: usize, info: &mut ServerSocketInfo) {
-        match self.register(token, info) {
-            Ok(_) => info!("Resume accepting connections on {}", info.addr),
+    fn register_logged(&self, info: &mut ServerSocketInfo) {
+        match self.register(info) {
+            Ok(_) => info!("Resume accepting connections on {}", info.lst.local_addr()),
             Err(e) => error!("Can not register server socket {}", e),
         }
     }
 
-    fn deregister(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
-        self.poll.registry().deregister(&mut info.lst)
-    }
-
     fn deregister_logged(&self, info: &mut ServerSocketInfo) {
-        match self.deregister(info) {
-            Ok(_) => info!("Paused accepting connections on {}", info.addr),
+        match self.poll.registry().deregister(&mut info.lst) {
+            Ok(_) => info!("Paused accepting connections on {}", info.lst.local_addr()),
             Err(e) => {
                 error!("Can not deregister server socket {}", e)
             }
         }
     }
 
-    fn deregister_all(&self, sockets: &mut Slab<ServerSocketInfo>) {
+    fn deregister_all(&self, sockets: &mut [ServerSocketInfo]) {
         // This is a best effort implementation with following limitation:
         //
         // Every ServerSocketInfo with associate timeout will be skipped and it's timeout
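With this refactor every waker interest is handled in one place: `handle_waker` returns `true` only for `Stop`, and the new `paused` flag makes repeated `Pause`/`Resume` commands no-ops. A self-contained sketch of just that control flow, with socket (de)registration stubbed out as prints; the `Interest` enum here is a simplified stand-in, not the crate's `WakerInterest`:

```rust
/// Simplified stand-ins for the crate's waker interests.
enum Interest {
    Pause,
    Resume,
    Stop,
}

/// Minimal model of the accept loop's pause/stop handling.
struct Accept {
    paused: bool,
}

impl Accept {
    /// Returns true when the loop should exit (mirrors handle_waker's contract).
    fn handle(&mut self, interest: Interest) -> bool {
        match interest {
            Interest::Pause => {
                // Only deregister sockets on the first Pause; a repeat is a no-op.
                if !self.paused {
                    self.paused = true;
                    println!("deregister all sockets");
                }
                false
            }
            Interest::Resume => {
                // Only re-register and drain pending connections if actually paused.
                if self.paused {
                    self.paused = false;
                    println!("register all sockets and accept pending connections");
                }
                false
            }
            Interest::Stop => {
                // Sockets may already be deregistered by a previous Pause.
                if !self.paused {
                    println!("deregister all sockets");
                }
                true
            }
        }
    }
}

fn main() {
    let mut accept = Accept { paused: false };
    assert!(!accept.handle(Interest::Pause));
    assert!(!accept.handle(Interest::Pause)); // idempotent
    assert!(!accept.handle(Interest::Resume));
    assert!(accept.handle(Interest::Stop)); // exit the loop
}
```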
@@ -364,70 +397,23 @@ impl Accept {
             .iter_mut()
             // Take all timeout.
             // This is to prevent Accept::process_timer method re-register a socket afterwards.
-            .map(|(_, info)| (info.timeout.take(), info))
+            .map(|info| (info.timeout.take(), info))
             // Socket info with a timeout is already deregistered so skip them.
             .filter(|(timeout, _)| timeout.is_none())
             .for_each(|(_, info)| self.deregister_logged(info));
     }
 
-    fn maybe_backpressure(&mut self, sockets: &mut Slab<ServerSocketInfo>, on: bool) {
-        // Only operate when server is in a different backpressure than the given flag.
-        if self.backpressure != on {
-            self.backpressure = on;
-            sockets
-                .iter_mut()
-                // Only operate on sockets without associated timeout.
-                // Sockets with it should be handled by `accept` and `process_timer` methods.
-                // They are already deregistered or need to be reregister in the future.
-                .filter(|(_, info)| info.timeout.is_none())
-                .for_each(|(token, info)| {
-                    if on {
-                        self.deregister_logged(info);
-                    } else {
-                        self.register_logged(token, info);
-                    }
-                });
-        }
-    }
-
-    fn accept_one(&mut self, sockets: &mut Slab<ServerSocketInfo>, mut conn: Conn) {
-        if self.backpressure {
-            // send_connection would remove fault worker from handles.
-            // worst case here is conn get dropped after all handles are gone.
-            while let Err(c) = self.send_connection(sockets, conn) {
-                conn = c
-            }
-        } else {
-            while self.avail.available() {
-                let next = self.next();
-                let idx = next.idx();
-                if next.available() {
-                    self.avail.set_available(idx, true);
-                    match self.send_connection(sockets, conn) {
-                        Ok(_) => return,
-                        Err(c) => conn = c,
-                    }
-                } else {
-                    self.avail.set_available(idx, false);
-                    self.set_next();
-                }
-            }
-
-            // Sending Conn failed due to either all workers are in error or not available.
-            // Enter backpressure state and try again.
-            self.maybe_backpressure(sockets, true);
-            self.accept_one(sockets, conn);
-        }
-    }
-
     // Send connection to worker and handle error.
-    fn send_connection(
-        &mut self,
-        sockets: &mut Slab<ServerSocketInfo>,
-        conn: Conn,
-    ) -> Result<(), Conn> {
-        match self.next().send(conn) {
+    fn send_connection(&mut self, conn: Conn) -> Result<(), Conn> {
+        let next = self.next();
+        match next.send(conn) {
             Ok(_) => {
+                // Increment counter of WorkerHandle.
+                // Set worker to unavailable with it hit max (Return false).
+                if !next.inc_counter() {
+                    let idx = next.idx();
+                    self.avail.set_available(idx, false);
+                }
                 self.set_next();
                 Ok(())
             }
@@ -438,7 +424,6 @@ impl Accept {
 
         if self.handles.is_empty() {
             error!("No workers");
-            self.maybe_backpressure(sockets, true);
             // All workers are gone and Conn is nowhere to be sent.
             // Treat this situation as Ok and drop Conn.
             return Ok(());
@@ -451,19 +436,38 @@ impl Accept {
         }
     }
 
-    fn accept(&mut self, sockets: &mut Slab<ServerSocketInfo>, token: usize) {
+    fn accept_one(&mut self, mut conn: Conn) {
         loop {
-            let info = sockets
-                .get_mut(token)
-                .expect("ServerSocketInfo is removed from Slab");
+            let next = self.next();
+            let idx = next.idx();
+
+            if self.avail.get_available(idx) {
+                match self.send_connection(conn) {
+                    Ok(_) => return,
+                    Err(c) => conn = c,
+                }
+            } else {
+                self.avail.set_available(idx, false);
+                self.set_next();
+
+                if !self.avail.available() {
+                    while let Err(c) = self.send_connection(conn) {
+                        conn = c;
+                    }
+                    return;
+                }
+            }
+        }
+    }
+
+    fn accept(&mut self, sockets: &mut [ServerSocketInfo], token: usize) {
+        while self.avail.available() {
+            let info = &mut sockets[token];
 
             match info.lst.accept() {
                 Ok(io) => {
-                    let msg = Conn {
-                        io,
-                        token: info.token,
-                    };
-                    self.accept_one(sockets, msg);
+                    let conn = Conn { io, token };
+                    self.accept_one(conn);
                 }
                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
                 Err(ref e) if connection_error(e) => continue,
@@ -491,11 +495,22 @@ impl Accept {
         }
     }
 
+    fn accept_all(&mut self, sockets: &mut [ServerSocketInfo]) {
+        sockets
+            .iter_mut()
+            .map(|info| info.token)
+            .collect::<Vec<_>>()
+            .into_iter()
+            .for_each(|idx| self.accept(sockets, idx))
+    }
+
+    #[inline(always)]
     fn next(&self) -> &WorkerHandleAccept {
         &self.handles[self.next]
     }
 
     /// Set next worker handle that would accept connection.
+    #[inline(always)]
     fn set_next(&mut self) {
         self.next = (self.next + 1) % self.handles.len();
    }
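One detail in the new `accept_all` above: it first collects the socket tokens into a `Vec` because `self.accept(sockets, idx)` needs the whole slice again while `iter_mut()` would still be borrowing it. A small sketch of this collect-then-reborrow pattern, with hypothetical `Item`/`bump` names used purely for illustration:

```rust
struct Item {
    token: usize,
    hits: u32,
}

fn bump(items: &mut [Item], token: usize) {
    items[token].hits += 1;
}

fn bump_all(items: &mut [Item]) {
    // items.iter_mut().for_each(|i| bump(items, i.token)); // two overlapping
    // mutable borrows of `items`: this would not compile.
    items
        .iter_mut()
        .map(|i| i.token)
        .collect::<Vec<_>>() // first borrow ends here
        .into_iter()
        .for_each(|t| bump(items, t)); // fresh mutable borrow per call
}

fn main() {
    let mut items = vec![Item { token: 0, hits: 0 }, Item { token: 1, hits: 0 }];
    bump_all(&mut items);
    assert!(items.iter().all(|i| i.hits == 1));
}
```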
actix-server/src/builder.rs
@@ -8,31 +8,29 @@ use std::{
 
 use actix_rt::{self as rt, net::TcpStream, time::sleep, System};
 use log::{error, info};
-use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
-use tokio::sync::oneshot;
+use tokio::sync::{
+    mpsc::{unbounded_channel, UnboundedReceiver},
+    oneshot,
+};
 
 use crate::accept::AcceptLoop;
-use crate::config::{ConfiguredService, ServiceConfig};
+use crate::join_all;
 use crate::server::{Server, ServerCommand};
 use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
 use crate::signals::{Signal, Signals};
 use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
 use crate::socket::{MioTcpListener, MioTcpSocket};
 use crate::waker_queue::{WakerInterest, WakerQueue};
-use crate::worker::{
-    ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandleAccept,
-    WorkerHandleServer,
-};
-use crate::{join_all, Token};
+use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer};
 
 /// Server builder
 pub struct ServerBuilder {
     threads: usize,
-    token: Token,
+    token: usize,
     backlog: u32,
     handles: Vec<(usize, WorkerHandleServer)>,
     services: Vec<Box<dyn InternalServiceFactory>>,
-    sockets: Vec<(Token, String, MioListener)>,
+    sockets: Vec<(usize, String, MioListener)>,
     accept: AcceptLoop,
     exit: bool,
     no_signals: bool,
@@ -56,7 +54,7 @@ impl ServerBuilder {
 
         ServerBuilder {
             threads: num_cpus::get(),
-            token: Token::default(),
+            token: 0,
             handles: Vec::new(),
             services: Vec::new(),
             sockets: Vec::new(),
@@ -149,32 +147,6 @@ impl ServerBuilder {
         self
     }
 
-    /// Execute external configuration as part of the server building process.
-    ///
-    /// This function is useful for moving parts of configuration to a different module or
-    /// even library.
-    pub fn configure<F>(mut self, f: F) -> io::Result<ServerBuilder>
-    where
-        F: Fn(&mut ServiceConfig) -> io::Result<()>,
-    {
-        let mut cfg = ServiceConfig::new(self.threads, self.backlog);
-
-        f(&mut cfg)?;
-
-        if let Some(apply) = cfg.apply {
-            let mut srv = ConfiguredService::new(apply);
-            for (name, lst) in cfg.services {
-                let token = self.token.next();
-                srv.stream(token, name.clone(), lst.local_addr()?);
-                self.sockets.push((token, name, MioListener::Tcp(lst)));
-            }
-            self.services.push(Box::new(srv));
-        }
-        self.threads = cfg.threads;
-
-        Ok(self)
-    }
-
     /// Add new service to the server.
     pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
     where
@@ -184,7 +156,7 @@ impl ServerBuilder {
         let sockets = bind_addr(addr, self.backlog)?;
 
         for lst in sockets {
-            let token = self.token.next();
+            let token = self.next_token();
             self.services.push(StreamNewService::create(
                 name.as_ref().to_string(),
                 token,
@@ -233,7 +205,7 @@ impl ServerBuilder {
     {
         use std::net::{IpAddr, Ipv4Addr};
         lst.set_nonblocking(true)?;
-        let token = self.token.next();
+        let token = self.next_token();
         let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
         self.services.push(StreamNewService::create(
             name.as_ref().to_string(),
@@ -259,7 +231,7 @@ impl ServerBuilder {
         lst.set_nonblocking(true)?;
         let addr = lst.local_addr()?;
 
-        let token = self.token.next();
+        let token = self.next_token();
         self.services.push(StreamNewService::create(
             name.as_ref().to_string(),
             token,
@@ -318,12 +290,11 @@ impl ServerBuilder {
     fn start_worker(
         &self,
         idx: usize,
-        waker: WakerQueue,
+        waker_queue: WakerQueue,
     ) -> (WorkerHandleAccept, WorkerHandleServer) {
-        let avail = WorkerAvailability::new(idx, waker);
         let services = self.services.iter().map(|v| v.clone_factory()).collect();
 
-        ServerWorker::start(idx, services, avail, self.worker_config)
+        ServerWorker::start(idx, services, waker_queue, self.worker_config)
     }
 
     fn handle_cmd(&mut self, item: ServerCommand) {
@@ -437,6 +408,12 @@ impl ServerBuilder {
             }
         }
     }
+
+    fn next_token(&mut self) -> usize {
+        let token = self.token;
+        self.token += 1;
+        token
+    }
 }
 
 impl Future for ServerBuilder {
actix-server/src/config.rs (deleted)
@@ -1,287 +0,0 @@
-use std::collections::HashMap;
-use std::future::Future;
-use std::{fmt, io};
-
-use actix_rt::net::TcpStream;
-use actix_service::{
-    fn_service, IntoServiceFactory as IntoBaseServiceFactory,
-    ServiceFactory as BaseServiceFactory,
-};
-use actix_utils::{counter::CounterGuard, future::ready};
-use futures_core::future::LocalBoxFuture;
-use log::error;
-
-use crate::builder::bind_addr;
-use crate::service::{BoxedServerService, InternalServiceFactory, StreamService};
-use crate::socket::{MioStream, MioTcpListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
-use crate::Token;
-
-pub struct ServiceConfig {
-    pub(crate) services: Vec<(String, MioTcpListener)>,
-    pub(crate) apply: Option<Box<dyn ServiceRuntimeConfiguration>>,
-    pub(crate) threads: usize,
-    pub(crate) backlog: u32,
-}
-
-impl ServiceConfig {
-    pub(super) fn new(threads: usize, backlog: u32) -> ServiceConfig {
-        ServiceConfig {
-            threads,
-            backlog,
-            services: Vec::new(),
-            apply: None,
-        }
-    }
-
-    /// Set number of workers to start.
-    ///
-    /// By default server uses number of available logical cpu as workers
-    /// count.
-    pub fn workers(&mut self, num: usize) {
-        self.threads = num;
-    }
-
-    /// Add new service to server
-    pub fn bind<U, N: AsRef<str>>(&mut self, name: N, addr: U) -> io::Result<&mut Self>
-    where
-        U: ToSocketAddrs,
-    {
-        let sockets = bind_addr(addr, self.backlog)?;
-
-        for lst in sockets {
-            self._listen(name.as_ref(), lst);
-        }
-
-        Ok(self)
-    }
-
-    /// Add new service to server
-    pub fn listen<N: AsRef<str>>(&mut self, name: N, lst: StdTcpListener) -> &mut Self {
-        self._listen(name, MioTcpListener::from_std(lst))
-    }
-
-    /// Register service configuration function. This function get called
-    /// during worker runtime configuration. It get executed in worker thread.
-    pub fn apply<F>(&mut self, f: F) -> io::Result<()>
-    where
-        F: Fn(&mut ServiceRuntime) + Send + Clone + 'static,
-    {
-        self.apply = Some(Box::new(f));
-        Ok(())
-    }
-
-    fn _listen<N: AsRef<str>>(&mut self, name: N, lst: MioTcpListener) -> &mut Self {
-        if self.apply.is_none() {
-            self.apply = Some(Box::new(not_configured));
-        }
-        self.services.push((name.as_ref().to_string(), lst));
-        self
-    }
-}
-
-pub(super) struct ConfiguredService {
-    rt: Box<dyn ServiceRuntimeConfiguration>,
-    names: HashMap<Token, (String, StdSocketAddr)>,
-    topics: HashMap<String, Token>,
-    services: Vec<Token>,
-}
-
-impl ConfiguredService {
-    pub(super) fn new(rt: Box<dyn ServiceRuntimeConfiguration>) -> Self {
-        ConfiguredService {
-            rt,
-            names: HashMap::new(),
-            topics: HashMap::new(),
-            services: Vec::new(),
-        }
-    }
-
-    pub(super) fn stream(&mut self, token: Token, name: String, addr: StdSocketAddr) {
-        self.names.insert(token, (name.clone(), addr));
-        self.topics.insert(name, token);
-        self.services.push(token);
-    }
-}
-
-impl InternalServiceFactory for ConfiguredService {
-    fn name(&self, token: Token) -> &str {
-        &self.names[&token].0
-    }
-
-    fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
-        Box::new(Self {
-            rt: self.rt.clone(),
-            names: self.names.clone(),
-            topics: self.topics.clone(),
-            services: self.services.clone(),
-        })
-    }
-
-    fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
-        // configure services
-        let mut rt = ServiceRuntime::new(self.topics.clone());
-        self.rt.configure(&mut rt);
-        rt.validate();
-        let mut names = self.names.clone();
-        let tokens = self.services.clone();
-
-        // construct services
-        Box::pin(async move {
-            let mut services = rt.services;
-            // TODO: Proper error handling here
-            for f in rt.onstart.into_iter() {
-                f.await;
-            }
-            let mut res = vec![];
-            for token in tokens {
-                if let Some(srv) = services.remove(&token) {
-                    let newserv = srv.new_service(());
-                    match newserv.await {
-                        Ok(serv) => {
-                            res.push((token, serv));
-                        }
-                        Err(_) => {
-                            error!("Can not construct service");
-                            return Err(());
-                        }
-                    }
-                } else {
-                    let name = names.remove(&token).unwrap().0;
-                    res.push((
-                        token,
-                        Box::new(StreamService::new(fn_service(move |_: TcpStream| {
-                            error!("Service {:?} is not configured", name);
-                            ready::<Result<_, ()>>(Ok(()))
-                        }))),
-                    ));
-                };
-            }
-            Ok(res)
-        })
-    }
-}
-
-pub(super) trait ServiceRuntimeConfiguration: Send {
-    fn clone(&self) -> Box<dyn ServiceRuntimeConfiguration>;
-
-    fn configure(&self, rt: &mut ServiceRuntime);
-}
-
-impl<F> ServiceRuntimeConfiguration for F
-where
-    F: Fn(&mut ServiceRuntime) + Send + Clone + 'static,
-{
-    fn clone(&self) -> Box<dyn ServiceRuntimeConfiguration> {
-        Box::new(self.clone())
-    }
-
-    fn configure(&self, rt: &mut ServiceRuntime) {
-        (self)(rt)
-    }
-}
-
-fn not_configured(_: &mut ServiceRuntime) {
-    error!("Service is not configured");
-}
-
-pub struct ServiceRuntime {
-    names: HashMap<String, Token>,
-    services: HashMap<Token, BoxedNewService>,
-    onstart: Vec<LocalBoxFuture<'static, ()>>,
-}
-
-impl ServiceRuntime {
-    fn new(names: HashMap<String, Token>) -> Self {
-        ServiceRuntime {
-            names,
-            services: HashMap::new(),
-            onstart: Vec::new(),
-        }
-    }
-
-    fn validate(&self) {
-        for (name, token) in &self.names {
-            if !self.services.contains_key(&token) {
-                error!("Service {:?} is not configured", name);
-            }
-        }
-    }
-
-    /// Register service.
-    ///
-    /// Name of the service must be registered during configuration stage with
-    /// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods.
-    pub fn service<T, F>(&mut self, name: &str, service: F)
-    where
-        F: IntoBaseServiceFactory<T, TcpStream>,
-        T: BaseServiceFactory<TcpStream, Config = ()> + 'static,
-        T::Future: 'static,
-        T::Service: 'static,
-        T::InitError: fmt::Debug,
-    {
-        // let name = name.to_owned();
-        if let Some(token) = self.names.get(name) {
-            self.services.insert(
-                *token,
-                Box::new(ServiceFactory {
-                    inner: service.into_factory(),
-                }),
-            );
-        } else {
-            panic!("Unknown service: {:?}", name);
-        }
-    }
-
-    /// Execute future before services initialization.
-    pub fn on_start<F>(&mut self, fut: F)
-    where
-        F: Future<Output = ()> + 'static,
-    {
-        self.onstart.push(Box::pin(fut))
-    }
-}
-
-type BoxedNewService = Box<
-    dyn BaseServiceFactory<
-        (CounterGuard, MioStream),
-        Response = (),
-        Error = (),
-        InitError = (),
-        Config = (),
-        Service = BoxedServerService,
-        Future = LocalBoxFuture<'static, Result<BoxedServerService, ()>>,
-    >,
->;
-
-struct ServiceFactory<T> {
-    inner: T,
-}
-
-impl<T> BaseServiceFactory<(CounterGuard, MioStream)> for ServiceFactory<T>
-where
-    T: BaseServiceFactory<TcpStream, Config = ()>,
-    T::Future: 'static,
-    T::Service: 'static,
-    T::Error: 'static,
-    T::InitError: fmt::Debug + 'static,
-{
-    type Response = ();
-    type Error = ();
-    type Config = ();
-    type Service = BoxedServerService;
-    type InitError = ();
-    type Future = LocalBoxFuture<'static, Result<BoxedServerService, ()>>;
-
-    fn new_service(&self, _: ()) -> Self::Future {
-        let fut = self.inner.new_service(());
-        Box::pin(async move {
-            match fut.await {
-                Ok(s) => Ok(Box::new(StreamService::new(s)) as BoxedServerService),
-                Err(e) => {
-                    error!("Can not construct service: {:?}", e);
-                    Err(())
-                }
-            }
-        })
-    }
-}
actix-server/src/lib.rs
@@ -6,7 +6,6 @@
 
 mod accept;
 mod builder;
-mod config;
 mod server;
 mod service;
 mod signals;
@@ -16,7 +15,6 @@ mod waker_queue;
 mod worker;
 
 pub use self::builder::ServerBuilder;
-pub use self::config::{ServiceConfig, ServiceRuntime};
 pub use self::server::Server;
 pub use self::service::ServiceFactory;
 pub use self::test_server::TestServer;
@@ -28,28 +26,6 @@ use std::future::Future;
 use std::pin::Pin;
 use std::task::{Context, Poll};
 
-/// Socket ID token
-#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
-pub(crate) struct Token(usize);
-
-impl Default for Token {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl Token {
-    fn new() -> Self {
-        Self(0)
-    }
-
-    pub(crate) fn next(&mut self) -> Token {
-        let token = Token(self.0);
-        self.0 += 1;
-        token
-    }
-}
-
 /// Start server building process
 pub fn new() -> ServerBuilder {
     ServerBuilder::default()
actix-server/src/service.rs
@@ -3,15 +3,12 @@ use std::net::SocketAddr;
 use std::task::{Context, Poll};
 
 use actix_service::{Service, ServiceFactory as BaseServiceFactory};
-use actix_utils::{
-    counter::CounterGuard,
-    future::{ready, Ready},
-};
+use actix_utils::future::{ready, Ready};
 use futures_core::future::LocalBoxFuture;
 use log::error;
 
 use crate::socket::{FromStream, MioStream};
-use crate::Token;
+use crate::worker::WorkerCounterGuard;
 
 pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
     type Factory: BaseServiceFactory<Stream, Config = ()>;
@@ -20,16 +17,16 @@ pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
 }
 
 pub(crate) trait InternalServiceFactory: Send {
-    fn name(&self, token: Token) -> &str;
+    fn name(&self, token: usize) -> &str;
 
     fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;
 
-    fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>;
+    fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>;
 }
 
 pub(crate) type BoxedServerService = Box<
     dyn Service<
-        (CounterGuard, MioStream),
+        (WorkerCounterGuard, MioStream),
         Response = (),
         Error = (),
         Future = Ready<Result<(), ()>>,
@@ -50,7 +47,7 @@ impl<S, I> StreamService<S, I> {
     }
 }
 
-impl<S, I> Service<(CounterGuard, MioStream)> for StreamService<S, I>
+impl<S, I> Service<(WorkerCounterGuard, MioStream)> for StreamService<S, I>
 where
     S: Service<I>,
     S::Future: 'static,
@@ -65,7 +62,7 @@ where
         self.service.poll_ready(ctx).map_err(|_| ())
     }
 
-    fn call(&self, (guard, req): (CounterGuard, MioStream)) -> Self::Future {
+    fn call(&self, (guard, req): (WorkerCounterGuard, MioStream)) -> Self::Future {
         ready(match FromStream::from_mio(req) {
             Ok(stream) => {
                 let f = self.service.call(stream);
@@ -86,7 +83,7 @@ where
 pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> {
     name: String,
     inner: F,
-    token: Token,
+    token: usize,
     addr: SocketAddr,
     _t: PhantomData<Io>,
 }
@@ -98,7 +95,7 @@ where
 {
     pub(crate) fn create(
         name: String,
-        token: Token,
+        token: usize,
         inner: F,
         addr: SocketAddr,
     ) -> Box<dyn InternalServiceFactory> {
@@ -117,7 +114,7 @@ where
     F: ServiceFactory<Io>,
     Io: FromStream + Send + 'static,
 {
-    fn name(&self, _: Token) -> &str {
+    fn name(&self, _: usize) -> &str {
         &self.name
     }
 
@@ -131,14 +128,14 @@ where
         })
     }
 
-    fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
+    fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>> {
         let token = self.token;
         let fut = self.inner.create().new_service(());
         Box::pin(async move {
             match fut.await {
                 Ok(inner) => {
                     let service = Box::new(StreamService::new(inner)) as _;
-                    Ok(vec![(token, service)])
+                    Ok((token, service))
                 }
                 Err(_) => Err(()),
             }
actix-server/src/socket.rs
@@ -23,9 +23,15 @@ pub(crate) enum MioListener {
 impl MioListener {
     pub(crate) fn local_addr(&self) -> SocketAddr {
         match *self {
-            MioListener::Tcp(ref lst) => SocketAddr::Tcp(lst.local_addr().unwrap()),
+            MioListener::Tcp(ref lst) => lst
+                .local_addr()
+                .map(SocketAddr::Tcp)
+                .unwrap_or(SocketAddr::Unknown),
             #[cfg(unix)]
-            MioListener::Uds(ref lst) => SocketAddr::Uds(lst.local_addr().unwrap()),
+            MioListener::Uds(ref lst) => lst
+                .local_addr()
+                .map(SocketAddr::Uds)
+                .unwrap_or(SocketAddr::Unknown),
         }
     }
 
@@ -110,14 +116,15 @@ impl fmt::Debug for MioListener {
 impl fmt::Display for MioListener {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match *self {
-            MioListener::Tcp(ref lst) => write!(f, "{}", lst.local_addr().ok().unwrap()),
+            MioListener::Tcp(ref lst) => write!(f, "{:?}", lst),
             #[cfg(unix)]
-            MioListener::Uds(ref lst) => write!(f, "{:?}", lst.local_addr().ok().unwrap()),
+            MioListener::Uds(ref lst) => write!(f, "{:?}", lst),
         }
     }
 }
 
 pub(crate) enum SocketAddr {
+    Unknown,
     Tcp(StdSocketAddr),
     #[cfg(unix)]
     Uds(mio::net::SocketAddr),
@@ -126,9 +133,10 @@ pub(crate) enum SocketAddr {
 impl fmt::Display for SocketAddr {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match *self {
-            SocketAddr::Tcp(ref addr) => write!(f, "{}", addr),
+            Self::Unknown => write!(f, "Unknown SocketAddr"),
+            Self::Tcp(ref addr) => write!(f, "{}", addr),
             #[cfg(unix)]
-            SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
+            Self::Uds(ref addr) => write!(f, "{:?}", addr),
         }
     }
 }
@@ -136,9 +144,10 @@ impl fmt::Display for SocketAddr {
 impl fmt::Debug for SocketAddr {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match *self {
-            SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr),
+            Self::Unknown => write!(f, "Unknown SocketAddr"),
+            Self::Tcp(ref addr) => write!(f, "{:?}", addr),
             #[cfg(unix)]
-            SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
+            Self::Uds(ref addr) => write!(f, "{:?}", addr),
         }
     }
 }
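The new `SocketAddr::Unknown` variant above lets `local_addr()` degrade to a printable placeholder instead of panicking on `unwrap()`, which is acceptable because the address is only used for logging. A minimal sketch of the same fallback shape, using `std::net::TcpListener` in place of the crate's mio listener types:

```rust
use std::fmt;

// Addresses used only for logging fall back to Unknown instead of unwrapping.
enum SocketAddr {
    Unknown,
    Tcp(std::net::SocketAddr),
}

impl fmt::Display for SocketAddr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            Self::Unknown => write!(f, "Unknown SocketAddr"),
            Self::Tcp(ref addr) => write!(f, "{}", addr),
        }
    }
}

fn local_addr(lst: &std::net::TcpListener) -> SocketAddr {
    lst.local_addr().map(SocketAddr::Tcp).unwrap_or(SocketAddr::Unknown)
}

fn main() -> std::io::Result<()> {
    let lst = std::net::TcpListener::bind("127.0.0.1:0")?;
    println!("listening on {}", local_addr(&lst));
    Ok(())
}
```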
@@ -2,8 +2,9 @@ use std::{
     future::Future,
     mem,
     pin::Pin,
+    rc::Rc,
     sync::{
-        atomic::{AtomicBool, Ordering},
+        atomic::{AtomicUsize, Ordering},
         Arc,
     },
     task::{Context, Poll},

@@ -15,7 +16,6 @@ use actix_rt::{
     time::{sleep, Instant, Sleep},
     Arbiter,
 };
-use actix_utils::counter::Counter;
 use futures_core::{future::LocalBoxFuture, ready};
 use log::{error, info, trace};
 use tokio::sync::{

@@ -23,10 +23,10 @@ use tokio::sync::{
     oneshot,
 };

+use crate::join_all;
 use crate::service::{BoxedServerService, InternalServiceFactory};
 use crate::socket::MioStream;
 use crate::waker_queue::{WakerInterest, WakerQueue};
-use crate::{join_all, Token};

 /// Stop worker message. Returns `true` on successful graceful shutdown
 /// and `false` if some connections are still alive when shutdown executes.
@@ -38,35 +38,131 @@ pub(crate) struct Stop {
 #[derive(Debug)]
 pub(crate) struct Conn {
     pub io: MioStream,
-    pub token: Token,
+    pub token: usize,
 }

 fn handle_pair(
     idx: usize,
     tx1: UnboundedSender<Conn>,
     tx2: UnboundedSender<Stop>,
-    avail: WorkerAvailability,
+    counter: Counter,
 ) -> (WorkerHandleAccept, WorkerHandleServer) {
-    let accept = WorkerHandleAccept { tx: tx1, avail };
+    let accept = WorkerHandleAccept {
+        idx,
+        tx: tx1,
+        counter,
+    };

     let server = WorkerHandleServer { idx, tx: tx2 };

     (accept, server)
 }

+/// The `counter: Arc<AtomicUsize>` field is shared between the `Accept` thread and the
+/// `ServerWorker` thread.
+///
+/// `Accept` increments the counter and `ServerWorker` decrements it.
+///
+/// # Atomic Ordering
+///
+/// `Accept` always looks at its cached `Availability` field for `ServerWorker` state.
+/// It lazily increments the counter after successfully dispatching new work to `ServerWorker`.
+/// On reaching the counter limit, `Accept` updates its cached `Availability` and marks the
+/// worker as unable to accept any work.
+///
+/// `ServerWorker` decrements the counter whenever a piece of work received from `Accept` is
+/// done. On crossing back below the limit, the worker uses `mio::Waker` and `WakerQueue` to
+/// wake up `Accept` and notify it to update its cached `Availability`, marking the worker as
+/// able to accept work again.
+///
+/// Hence a wake-up can only happen after `Accept` has incremented the counter to the limit,
+/// and a decrement back across the limit always wakes up `Accept`.
+#[derive(Clone)]
+pub(crate) struct Counter {
+    counter: Arc<AtomicUsize>,
+    limit: usize,
+}
+
+impl Counter {
+    pub(crate) fn new(limit: usize) -> Self {
+        Self {
+            counter: Arc::new(AtomicUsize::new(1)),
+            limit,
+        }
+    }
+
+    /// Increment counter by 1, returning false when the limit is hit.
+    #[inline(always)]
+    pub(crate) fn inc(&self) -> bool {
+        self.counter.fetch_add(1, Ordering::Relaxed) != self.limit
+    }
+
+    /// Decrement counter by 1, returning true when crossing back below the limit.
+    #[inline(always)]
+    pub(crate) fn dec(&self) -> bool {
+        self.counter.fetch_sub(1, Ordering::Relaxed) == self.limit
+    }
+
+    pub(crate) fn total(&self) -> usize {
+        self.counter.load(Ordering::SeqCst) - 1
+    }
+}
+
+pub(crate) struct WorkerCounter {
+    idx: usize,
+    inner: Rc<(WakerQueue, Counter)>,
+}
+
+impl Clone for WorkerCounter {
+    fn clone(&self) -> Self {
+        Self {
+            idx: self.idx,
+            inner: self.inner.clone(),
+        }
+    }
+}
+
+impl WorkerCounter {
+    pub(crate) fn new(idx: usize, waker_queue: WakerQueue, counter: Counter) -> Self {
+        Self {
+            idx,
+            inner: Rc::new((waker_queue, counter)),
+        }
+    }
+
+    #[inline(always)]
+    pub(crate) fn guard(&self) -> WorkerCounterGuard {
+        WorkerCounterGuard(self.clone())
+    }
+
+    fn total(&self) -> usize {
+        self.inner.1.total()
+    }
+}
+
+pub(crate) struct WorkerCounterGuard(WorkerCounter);
+
+impl Drop for WorkerCounterGuard {
+    fn drop(&mut self) {
+        let (waker_queue, counter) = &*self.0.inner;
+        if counter.dec() {
+            waker_queue.wake(WakerInterest::WorkerAvailable(self.0.idx));
+        }
+    }
+}

 /// Handle to worker that can send connection message to worker and share the
 /// availability of worker to other thread.
 ///
 /// Held by [Accept](crate::accept::Accept).
 pub(crate) struct WorkerHandleAccept {
+    idx: usize,
     tx: UnboundedSender<Conn>,
-    avail: WorkerAvailability,
+    counter: Counter,
 }

 impl WorkerHandleAccept {
     #[inline(always)]
     pub(crate) fn idx(&self) -> usize {
-        self.avail.idx
+        self.idx
     }

     #[inline(always)]
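The counter arithmetic above is easy to misread: the value starts at 1, and `fetch_add`/`fetch_sub` return the *previous* value, so `inc()` reports whether the limit had not yet been reached and `dec()` fires exactly once when the count crosses back over the limit. A standalone sketch that exercises those edge cases (the `main` trace is illustrative, not part of the crate):

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    #[derive(Clone)]
    struct Counter {
        counter: Arc<AtomicUsize>,
        limit: usize,
    }

    impl Counter {
        fn new(limit: usize) -> Self {
            // Starts at 1 so total() can subtract 1 and report 0 items in flight.
            Self { counter: Arc::new(AtomicUsize::new(1)), limit }
        }

        // Previous value != limit: we were still below the limit before this add.
        fn inc(&self) -> bool {
            self.counter.fetch_add(1, Ordering::Relaxed) != self.limit
        }

        // Previous value == limit: this subtract crosses back below the limit.
        fn dec(&self) -> bool {
            self.counter.fetch_sub(1, Ordering::Relaxed) == self.limit
        }

        fn total(&self) -> usize {
            self.counter.load(Ordering::SeqCst) - 1
        }
    }

    fn main() {
        let c = Counter::new(2); // allow 2 items in flight
        assert!(c.inc());        // prev 1 != 2: still below limit (counter: 2)
        assert!(!c.inc());       // prev 2 == 2: limit hit, stop dispatching (counter: 3)
        assert_eq!(c.total(), 2);
        assert!(!c.dec());       // prev 3 != 2: no signal yet (counter: 2)
        assert!(c.dec());        // prev 2 == 2: crossed back, wake the dispatcher (counter: 1)
        assert_eq!(c.total(), 0);
    }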
@@ -75,8 +171,8 @@ impl WorkerHandleAccept {
     }

     #[inline(always)]
-    pub(crate) fn available(&self) -> bool {
-        self.avail.available()
+    pub(crate) fn inc_counter(&self) -> bool {
+        self.counter.inc()
     }
 }

@@ -96,40 +192,6 @@ impl WorkerHandleServer {
     }
 }

-#[derive(Clone)]
-pub(crate) struct WorkerAvailability {
-    idx: usize,
-    waker: WakerQueue,
-    available: Arc<AtomicBool>,
-}
-
-impl WorkerAvailability {
-    pub fn new(idx: usize, waker: WakerQueue) -> Self {
-        WorkerAvailability {
-            idx,
-            waker,
-            available: Arc::new(AtomicBool::new(false)),
-        }
-    }
-
-    #[inline(always)]
-    pub fn available(&self) -> bool {
-        self.available.load(Ordering::Acquire)
-    }
-
-    pub fn set(&self, val: bool) {
-        // Ordering:
-        //
-        // There could be multiple set calls happen in one <ServerWorker as Future>::poll.
-        // Order is important between them.
-        let old = self.available.swap(val, Ordering::AcqRel);
-        // Notify the accept on switched to available.
-        if !old && val {
-            self.waker.wake(WakerInterest::WorkerAvailable(self.idx));
-        }
-    }
-}
 /// Service worker.
 ///
 /// Worker accepts Socket objects via unbounded channel and starts stream processing.

@@ -138,9 +200,8 @@ pub(crate) struct ServerWorker {
     // It must be dropped as soon as ServerWorker is dropped.
     rx: UnboundedReceiver<Conn>,
     rx2: UnboundedReceiver<Stop>,
+    counter: WorkerCounter,
     services: Box<[WorkerService]>,
-    availability: WorkerAvailability,
-    conns: Counter,
     factories: Box<[Box<dyn InternalServiceFactory>]>,
     state: WorkerState,
     shutdown_timeout: Duration,
@@ -207,15 +268,15 @@ impl ServerWorker {
     pub(crate) fn start(
         idx: usize,
         factories: Vec<Box<dyn InternalServiceFactory>>,
-        availability: WorkerAvailability,
+        waker_queue: WakerQueue,
         config: ServerWorkerConfig,
     ) -> (WorkerHandleAccept, WorkerHandleServer) {
-        assert!(!availability.available());
-
         let (tx1, rx) = unbounded_channel();
         let (tx2, rx2) = unbounded_channel();
-        let avail = availability.clone();
+
+        let counter = Counter::new(config.max_concurrent_connections);
+
+        let counter_clone = counter.clone();
         // every worker runs in its own arbiter.
         // use a custom tokio runtime builder to change the settings of runtime.
         Arbiter::with_tokio_rt(move || {

@@ -231,11 +292,7 @@ impl ServerWorker {
             .enumerate()
             .map(|(idx, factory)| {
                 let fut = factory.create();
-                async move {
-                    fut.await.map(|r| {
-                        r.into_iter().map(|(t, s)| (idx, t, s)).collect::<Vec<_>>()
-                    })
-                }
+                async move { fut.await.map(|(t, s)| (idx, t, s)) }
             })
             .collect::<Vec<_>>();
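Each service factory now resolves to a single `(token, service)` pair instead of a `Vec`, which is what lets the collection step in the next hunk drop its `.flatten()`. A toy model of the same index-tagging-then-join step, using the `futures` crate's `join_all` in place of the crate-private helper (all names here are illustrative):

    use futures::{executor::block_on, future::join_all};

    fn main() {
        // Stand-ins for service factories; each "creates" exactly one service.
        let factories = vec!["srv-a", "srv-b"];

        let futs = factories
            .into_iter()
            .enumerate()
            .map(|(idx, name)| async move {
                // factory.create() analogue: one (token, service) pair,
                // tagged with the factory index.
                Ok::<_, ()>((idx, idx, name))
            })
            .collect::<Vec<_>>();

        let res: Result<Vec<_>, ()> = block_on(join_all(futs)).into_iter().collect();

        for (pos, (_factory_idx, token, _service)) in res.unwrap().into_iter().enumerate() {
            // Mirrors the assert in the next hunk: tokens line up with creation order.
            assert_eq!(token, pos);
        }
    }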
@@ -248,9 +305,8 @@ impl ServerWorker {
             let services = match res {
                 Ok(res) => res
                     .into_iter()
-                    .flatten()
                     .fold(Vec::new(), |mut services, (factory, token, service)| {
-                        assert_eq!(token.0, services.len());
+                        assert_eq!(token, services.len());
                         services.push(WorkerService {
                             factory,
                             service,

@@ -271,8 +327,7 @@ impl ServerWorker {
             rx,
             rx2,
             services,
-            availability,
-            conns: Counter::new(config.max_concurrent_connections),
+            counter: WorkerCounter::new(idx, waker_queue, counter_clone),
             factories: factories.into_boxed_slice(),
             state: Default::default(),
             shutdown_timeout: config.shutdown_timeout,

@@ -280,16 +335,16 @@ impl ServerWorker {
             });
         });

-        handle_pair(idx, tx1, tx2, avail)
+        handle_pair(idx, tx1, tx2, counter)
     }

-    fn restart_service(&mut self, token: Token, factory_id: usize) {
+    fn restart_service(&mut self, idx: usize, factory_id: usize) {
         let factory = &self.factories[factory_id];
-        trace!("Service {:?} failed, restarting", factory.name(token));
-        self.services[token.0].status = WorkerServiceStatus::Restarting;
+        trace!("Service {:?} failed, restarting", factory.name(idx));
+        self.services[idx].status = WorkerServiceStatus::Restarting;
         self.state = WorkerState::Restarting(Restart {
             factory_id,
-            token,
+            token: idx,
             fut: factory.create(),
         });
     }
@@ -307,8 +362,8 @@ impl ServerWorker {
         });
     }

-    fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result<bool, (Token, usize)> {
-        let mut ready = self.conns.available(cx);
+    fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result<bool, (usize, usize)> {
+        let mut ready = true;
         for (idx, srv) in self.services.iter_mut().enumerate() {
             if srv.status == WorkerServiceStatus::Available
                 || srv.status == WorkerServiceStatus::Unavailable

@@ -318,7 +373,7 @@ impl ServerWorker {
                 if srv.status == WorkerServiceStatus::Unavailable {
                     trace!(
                         "Service {:?} is available",
-                        self.factories[srv.factory].name(Token(idx))
+                        self.factories[srv.factory].name(idx)
                     );
                     srv.status = WorkerServiceStatus::Available;
                 }

@@ -329,7 +384,7 @@ impl ServerWorker {
                 if srv.status == WorkerServiceStatus::Available {
                     trace!(
                         "Service {:?} is unavailable",
-                        self.factories[srv.factory].name(Token(idx))
+                        self.factories[srv.factory].name(idx)
                     );
                     srv.status = WorkerServiceStatus::Unavailable;
                 }

@@ -337,10 +392,10 @@ impl ServerWorker {
                 Poll::Ready(Err(_)) => {
                     error!(
                         "Service {:?} readiness check returned error, restarting",
-                        self.factories[srv.factory].name(Token(idx))
+                        self.factories[srv.factory].name(idx)
                     );
                     srv.status = WorkerServiceStatus::Failed;
-                    return Err((Token(idx), srv.factory));
+                    return Err((idx, srv.factory));
                 }
             }
         }

@@ -359,8 +414,8 @@ enum WorkerState {

 struct Restart {
     factory_id: usize,
-    token: Token,
-    fut: LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>,
+    token: usize,
+    fut: LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>>,
 }

 // Shutdown keeps the states necessary for server shutdown:

@@ -381,10 +436,6 @@ impl Default for WorkerState {

 impl Drop for ServerWorker {
     fn drop(&mut self) {
-        // Set availability to true so if accept try to send connection to this worker
-        // it would find worker is gone and remove it.
-        // This is helpful when worker is dropped unexpected.
-        self.availability.set(true);
         // Stop the Arbiter that ServerWorker runs on, on drop.
         Arbiter::current().stop();
     }

@@ -399,8 +450,7 @@ impl Future for ServerWorker {
         // `StopWorker` message handler
         if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx)
         {
-            this.availability.set(false);
-            let num = this.conns.total();
+            let num = this.counter.total();
             if num == 0 {
                 info!("Shutting down worker, 0 connections");
                 let _ = tx.send(true);

@@ -427,7 +477,6 @@ impl Future for ServerWorker {
             WorkerState::Unavailable => match this.check_readiness(cx) {
                 Ok(true) => {
                     this.state = WorkerState::Available;
-                    this.availability.set(true);
                     self.poll(cx)
                 }
                 Ok(false) => Poll::Pending,

@@ -440,26 +489,22 @@ impl Future for ServerWorker {
                 let factory_id = restart.factory_id;
                 let token = restart.token;

-                let service = ready!(restart.fut.as_mut().poll(cx))
+                let (token_new, service) = ready!(restart.fut.as_mut().poll(cx))
                     .unwrap_or_else(|_| {
                         panic!(
                             "Can not restart {:?} service",
                             this.factories[factory_id].name(token)
                         )
-                    })
-                    .into_iter()
-                    // Find the same token from vector. There should be only one
-                    // So the first match would be enough.
-                    .find(|(t, _)| *t == token)
-                    .map(|(_, service)| service)
-                    .expect("No BoxedServerService found");
+                    });
+
+                assert_eq!(token, token_new);

                 trace!(
                     "Service {:?} has been restarted",
                     this.factories[factory_id].name(token)
                 );

-                this.services[token.0].created(service);
+                this.services[token].created(service);
                 this.state = WorkerState::Unavailable;

                 self.poll(cx)

@@ -468,7 +513,7 @@ impl Future for ServerWorker {
                 // Wait for 1 second.
                 ready!(shutdown.timer.as_mut().poll(cx));

-                if this.conns.total() == 0 {
+                if this.counter.total() == 0 {
                     // Graceful shutdown.
                     if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) {
                         let _ = shutdown.tx.send(true);

@@ -493,22 +538,20 @@ impl Future for ServerWorker {
                     Ok(true) => {}
                     Ok(false) => {
                         trace!("Worker is unavailable");
-                        this.availability.set(false);
                         this.state = WorkerState::Unavailable;
                         return self.poll(cx);
                     }
                     Err((token, idx)) => {
                         this.restart_service(token, idx);
-                        this.availability.set(false);
                         return self.poll(cx);
                     }
                 }

-                match ready!(Pin::new(&mut this.rx).poll_recv(cx)) {
                 // handle incoming io stream
+                match ready!(Pin::new(&mut this.rx).poll_recv(cx)) {
                     Some(msg) => {
-                        let guard = this.conns.get();
-                        let _ = this.services[msg.token.0].service.call((guard, msg.io));
+                        let guard = this.counter.guard();
+                        let _ = this.services[msg.token].service.call((guard, msg.io));
                     }
                     None => return Poll::Ready(()),
                 };
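The last hunk shows how the guard is consumed: every accepted stream is handed to the service together with a `WorkerCounterGuard`, so the slot is released (and `Accept` possibly woken) whenever the service future completes or is dropped, with no explicit bookkeeping at the call site. A reduced sketch of that RAII hand-off, with hypothetical `Guard`/`call` stand-ins:

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    // Reduced stand-in for WorkerCounterGuard: decrements the shared counter on drop.
    struct Guard(Arc<AtomicUsize>);

    impl Drop for Guard {
        fn drop(&mut self) {
            self.0.fetch_sub(1, Ordering::Relaxed);
        }
    }

    // Mirrors service.call((guard, msg.io)): the service owns the guard for as
    // long as it owns the connection, so finishing (or unwinding) frees a slot.
    fn call(work: (Guard, &str)) {
        let (_guard, io) = work;
        println!("processing {}", io);
    } // _guard dropped here

    fn main() {
        let counter = Arc::new(AtomicUsize::new(1)); // starts at 1, as in the diff
        counter.fetch_add(1, Ordering::Relaxed);     // Accept dispatched one connection
        call((Guard(counter.clone()), "conn-1"));
        assert_eq!(counter.load(Ordering::SeqCst), 1); // slot released automatically
    }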
@@ -142,57 +142,6 @@ fn test_start() {
     let _ = h.join();
 }

-#[test]
-fn test_configure() {
-    let addr1 = unused_addr();
-    let addr2 = unused_addr();
-    let addr3 = unused_addr();
-    let (tx, rx) = mpsc::channel();
-    let num = Arc::new(AtomicUsize::new(0));
-    let num2 = num.clone();
-
-    let h = thread::spawn(move || {
-        let num = num2.clone();
-        let sys = actix_rt::System::new();
-        let srv = sys.block_on(lazy(|_| {
-            Server::build()
-                .disable_signals()
-                .configure(move |cfg| {
-                    let num = num.clone();
-                    let lst = net::TcpListener::bind(addr3).unwrap();
-                    cfg.bind("addr1", addr1)
-                        .unwrap()
-                        .bind("addr2", addr2)
-                        .unwrap()
-                        .listen("addr3", lst)
-                        .apply(move |rt| {
-                            let num = num.clone();
-                            rt.service("addr1", fn_service(|_| ok::<_, ()>(())));
-                            rt.service("addr3", fn_service(|_| ok::<_, ()>(())));
-                            rt.on_start(lazy(move |_| {
-                                let _ = num.fetch_add(1, Ordering::Relaxed);
-                            }))
-                        })
-                })
-                .unwrap()
-                .workers(1)
-                .run()
-        }));
-
-        let _ = tx.send((srv, actix_rt::System::current()));
-        let _ = sys.run();
-    });
-    let (_, sys) = rx.recv().unwrap();
-    thread::sleep(Duration::from_millis(500));
-
-    assert!(net::TcpStream::connect(addr1).is_ok());
-    assert!(net::TcpStream::connect(addr2).is_ok());
-    assert!(net::TcpStream::connect(addr3).is_ok());
-    assert_eq!(num.load(Ordering::Relaxed), 1);
-    sys.stop();
-    let _ = h.join();
-}

 #[actix_rt::test]
 async fn test_max_concurrent_connections() {
     // Note:

@@ -305,81 +254,6 @@ async fn test_service_restart() {
     let num_clone = num.clone();
     let num2_clone = num2.clone();

-    let h = thread::spawn(move || {
-        actix_rt::System::new().block_on(async {
-            let server = Server::build()
-                .backlog(1)
-                .disable_signals()
-                .configure(move |cfg| {
-                    let num = num.clone();
-                    let num2 = num2.clone();
-                    cfg.bind("addr1", addr1)
-                        .unwrap()
-                        .bind("addr2", addr2)
-                        .unwrap()
-                        .apply(move |rt| {
-                            let num = num.clone();
-                            let num2 = num2.clone();
-                            rt.service(
-                                "addr1",
-                                fn_factory(move || {
-                                    let num = num.clone();
-                                    async move { Ok::<_, ()>(TestService(num)) }
-                                }),
-                            );
-                            rt.service(
-                                "addr2",
-                                fn_factory(move || {
-                                    let num2 = num2.clone();
-                                    async move { Ok::<_, ()>(TestService(num2)) }
-                                }),
-                            );
-                        })
-                })
-                .unwrap()
-                .workers(1)
-                .run();
-
-            let _ = tx.send((server.clone(), actix_rt::System::current()));
-            server.await
-        })
-    });
-
-    let (server, sys) = rx.recv().unwrap();
-
-    for _ in 0..5 {
-        TcpStream::connect(addr1)
-            .await
-            .unwrap()
-            .shutdown()
-            .await
-            .unwrap();
-        TcpStream::connect(addr2)
-            .await
-            .unwrap()
-            .shutdown()
-            .await
-            .unwrap();
-    }
-
-    sleep(Duration::from_secs(3)).await;
-
-    assert!(num_clone.load(Ordering::SeqCst) > 5);
-    assert!(num2_clone.load(Ordering::SeqCst) > 5);
-
-    sys.stop();
-    let _ = server.stop(false);
-    let _ = h.join().unwrap();
-
-    let addr1 = unused_addr();
-    let addr2 = unused_addr();
-    let (tx, rx) = mpsc::channel();
-    let num = Arc::new(AtomicUsize::new(0));
-    let num2 = Arc::new(AtomicUsize::new(0));
-
-    let num_clone = num.clone();
-    let num2_clone = num2.clone();

     let h = thread::spawn(move || {
         let num = num.clone();
         actix_rt::System::new().block_on(async {
@@ -41,7 +41,7 @@ where
 ///
 ///     actix_service::forward_ready!(service);
 ///
-///     fn call(&self, req: S::Request) -> Self::Future {
+///     fn call(&self, req: Req) -> Self::Future {
 ///         TimeoutServiceResponse {
 ///             fut: self.service.call(req),
 ///             sleep: Sleep::new(clock::now() + self.timeout),
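This doc-example fix reflects that `Service` in actix-service 2.x is generic over the request type (`Service<Req>`) rather than carrying an associated `Request` type. A trimmed sketch of a wrapper written against that trait shape (assuming the 2.x signatures; the `Timeout` wrapper here is illustrative, not the crate's example verbatim):

    use actix_service::Service;

    struct Timeout<S> {
        service: S,
    }

    impl<S, Req> Service<Req> for Timeout<S>
    where
        S: Service<Req>,
    {
        type Response = S::Response;
        type Error = S::Error;
        type Future = S::Future;

        actix_service::forward_ready!(service);

        // The request parameter is the generic `Req`, not an associated
        // `S::Request` -- exactly what the doc example above corrects.
        fn call(&self, req: Req) -> Self::Future {
            self.service.call(req)
        }
    }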
@@ -64,7 +64,7 @@ tokio-native-tls = { version = "0.3", optional = true }

 [dev-dependencies]
 actix-rt = "2.2.0"
-actix-server = "2.0.0-beta.3"
+actix-server = "2.0.0-beta.5"
 bytes = "1"
 env_logger = "0.8"
 futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }