Remove direct mio usage from actix-server

fakeshadow 2021-04-21 19:00:59 +08:00
parent de149527ff
commit 0a1671387c
11 changed files with 367 additions and 777 deletions
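In short, the commit swaps actix-server's hand-rolled `mio::Poll` event loop (plus its `WakerQueue` wake-up mechanism) for a dedicated thread running a current-thread tokio runtime: the accept loop becomes a plain `Future`, and control messages (pause/resume/stop, new worker handles) arrive over a `tokio::sync::mpsc` unbounded channel. A minimal, self-contained sketch of that shape follows; it uses `tokio::select!` where the commit hand-implements `Future::poll`, and all names are illustrative rather than actix-server's API.

// Sketch only: a dedicated accept thread running a current-thread tokio
// runtime, controlled over an unbounded mpsc channel. Assumes tokio with
// the "rt", "net", "sync" and "macros" features; `Command` is illustrative.
use tokio::net::TcpListener;
use tokio::sync::mpsc;

enum Command {
    Pause,
    Resume,
    Stop,
}

fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();

    let handle = std::thread::spawn(move || {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap()
            .block_on(async move {
                let lst = TcpListener::bind("127.0.0.1:0").await.unwrap();
                let mut paused = false;

                loop {
                    tokio::select! {
                        // Drain control messages before accepting more work.
                        biased;
                        cmd = rx.recv() => match cmd {
                            Some(Command::Pause) => paused = true,
                            Some(Command::Resume) => paused = false,
                            Some(Command::Stop) | None => break,
                        },
                        accepted = lst.accept(), if !paused => {
                            if let Ok((_stream, peer)) = accepted {
                                println!("accepted connection from {}", peer);
                            }
                        }
                    }
                }
            });
    });

    tx.send(Command::Stop).ok();
    handle.join().unwrap();
}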


@@ -27,7 +27,6 @@ actix-utils = "3.0.0"
 futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
 log = "0.4"
-mio = { version = "0.7.6", features = ["os-poll", "net"] }
 num_cpus = "1.13"
 tokio = { version = "1.2", features = ["sync"] }


@@ -2,165 +2,45 @@ use std::time::Duration;
 use std::{io, thread};
 
 use actix_rt::{
-    time::{sleep, Instant},
+    time::{sleep, Instant, Sleep},
     System,
 };
-use log::{error, info};
-use mio::{Interest, Poll, Token as MioToken};
+use log::error;
+use tokio::sync::mpsc::UnboundedReceiver;
 
 use crate::server::Server;
-use crate::socket::{MioListener, SocketAddr};
-use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
+use crate::socket::Listener;
 use crate::worker::{Conn, WorkerHandleAccept};
 
 struct ServerSocketInfo {
-    /// Address of socket. Mainly used for logging.
-    addr: SocketAddr,
     token: usize,
-    lst: MioListener,
-    /// Timeout is used to mark the deadline when this socket's listener should be registered again
-    /// after an error.
-    timeout: Option<Instant>,
-}
-
-/// Accept loop would live with `ServerBuilder`.
-///
-/// It's tasked with construct `Poll` instance and `WakerQueue` which would be distributed to
-/// `Accept` and `Worker`.
-///
-/// It would also listen to `ServerCommand` and push interests to `WakerQueue`.
-pub(crate) struct AcceptLoop {
-    srv: Option<Server>,
-    poll: Option<Poll>,
-    waker: WakerQueue,
-}
-
-impl AcceptLoop {
-    pub fn new(srv: Server) -> Self {
-        let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e));
-        let waker = WakerQueue::new(poll.registry())
-            .unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e));
-
-        Self {
-            srv: Some(srv),
-            poll: Some(poll),
-            waker,
-        }
-    }
-
-    pub(crate) fn waker_owned(&self) -> WakerQueue {
-        self.waker.clone()
-    }
-
-    pub fn wake(&self, i: WakerInterest) {
-        self.waker.wake(i);
-    }
-
-    pub(crate) fn start(
-        &mut self,
-        socks: Vec<(usize, MioListener)>,
-        handles: Vec<WorkerHandleAccept>,
-    ) {
-        let srv = self.srv.take().expect("Can not re-use AcceptInfo");
-        let poll = self.poll.take().unwrap();
-        let waker = self.waker.clone();
-
-        Accept::start(poll, waker, socks, srv, handles);
-    }
+    lst: Listener,
 }
 
 /// poll instance of the server.
-struct Accept {
-    poll: Poll,
-    waker: WakerQueue,
+pub(crate) struct Accept {
     handles: Vec<WorkerHandleAccept>,
+    sockets: Box<[ServerSocketInfo]>,
+    rx: UnboundedReceiver<Interest>,
     srv: Server,
     next: usize,
     avail: Availability,
     paused: bool,
+    timeout: Pin<Box<Sleep>>,
 }
 
-/// Array of u128 with every bit as marker for a worker handle's availability.
-struct Availability([u128; 4]);
-
-impl Default for Availability {
-    fn default() -> Self {
-        Self([0; 4])
-    }
-}
-
-impl Availability {
-    /// Check if any worker handle is available
-    #[inline(always)]
-    fn available(&self) -> bool {
-        self.0.iter().any(|a| *a != 0)
-    }
-
-    /// Check if worker handle is available by index
-    #[inline(always)]
-    fn get_available(&self, idx: usize) -> bool {
-        let (offset, idx) = Self::offset(idx);
-
-        self.0[offset] & (1 << idx as u128) != 0
-    }
-
-    /// Set worker handle available state by index.
-    fn set_available(&mut self, idx: usize, avail: bool) {
-        let (offset, idx) = Self::offset(idx);
-
-        let off = 1 << idx as u128;
-        if avail {
-            self.0[offset] |= off;
-        } else {
-            self.0[offset] &= !off
-        }
-    }
-
-    /// Set all worker handle to available state.
-    /// This would result in a re-check on all workers' availability.
-    fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) {
-        handles.iter().for_each(|handle| {
-            self.set_available(handle.idx(), true);
-        })
-    }
-
-    /// Get offset and adjusted index of given worker handle index.
-    fn offset(idx: usize) -> (usize, usize) {
-        if idx < 128 {
-            (0, idx)
-        } else if idx < 128 * 2 {
-            (1, idx - 128)
-        } else if idx < 128 * 3 {
-            (2, idx - 128 * 2)
-        } else if idx < 128 * 4 {
-            (3, idx - 128 * 3)
-        } else {
-            panic!("Max WorkerHandle count is 512")
-        }
-    }
-}
-
-/// This function defines errors that are per-connection. Which basically
-/// means that if we get this error from `accept()` system call it means
-/// next connection might be ready to be accepted.
-///
-/// All other errors will incur a timeout before next `accept()` is performed.
-/// The timeout is useful to handle resource exhaustion errors like ENFILE
-/// and EMFILE. Otherwise, could enter into tight loop.
-fn connection_error(e: &io::Error) -> bool {
-    e.kind() == io::ErrorKind::ConnectionRefused
-        || e.kind() == io::ErrorKind::ConnectionAborted
-        || e.kind() == io::ErrorKind::ConnectionReset
-}
+pub(crate) enum Interest {
+    Pause,
+    Resume,
+    Stop,
+    WorkerIndex(usize),
+    Worker(WorkerHandleAccept),
+}
 
 impl Accept {
     pub(crate) fn start(
-        poll: Poll,
-        waker: WakerQueue,
-        socks: Vec<(usize, MioListener)>,
+        socks: Vec<(usize, Listener)>,
+        rx: UnboundedReceiver<Interest>,
         srv: Server,
         handles: Vec<WorkerHandleAccept>,
     ) {
@@ -171,38 +51,28 @@ impl Accept {
             .name("actix-server accept loop".to_owned())
             .spawn(move || {
                 System::set_current(sys);
-                let (mut accept, mut sockets) =
-                    Accept::new_with_sockets(poll, waker, socks, handles, srv);
 
-                accept.poll_with(&mut sockets);
+                tokio::runtime::Builder::new_current_thread()
+                    .enable_all()
+                    .build()
+                    .unwrap()
+                    .block_on(async {
+                        let accept = Self::new(socks, rx, srv, handles);
+                        accept.await
+                    });
             })
             .unwrap();
     }
 
-    fn new_with_sockets(
-        poll: Poll,
-        waker: WakerQueue,
-        socks: Vec<(usize, MioListener)>,
-        handles: Vec<WorkerHandleAccept>,
-        srv: Server,
-    ) -> (Accept, Vec<ServerSocketInfo>) {
+    fn new(
+        socks: Vec<(usize, Listener)>,
+        rx: UnboundedReceiver<Interest>,
+        srv: Server,
+        handles: Vec<WorkerHandleAccept>,
+    ) -> Self {
         let sockets = socks
            .into_iter()
-            .map(|(token, mut lst)| {
-                let addr = lst.local_addr();
-
-                // Start listening for incoming connections
-                poll.registry()
-                    .register(&mut lst, MioToken(token), Interest::READABLE)
-                    .unwrap_or_else(|e| panic!("Can not register io: {}", e));
-
-                ServerSocketInfo {
-                    addr,
-                    token,
-                    lst,
-                    timeout: None,
-                }
-            })
+            .map(|(token, lst)| ServerSocketInfo { token, lst })
            .collect();
 
         let mut avail = Availability::default();
@@ -210,197 +80,16 @@ impl Accept {
         // Assume all handles are avail at construct time.
         avail.set_available_all(&handles);
 
-        let accept = Accept {
-            poll,
-            waker,
+        Accept {
             handles,
+            sockets,
+            rx,
             srv,
             next: 0,
             avail,
             paused: false,
-        };
-
-        (accept, sockets)
+            timeout: Box::pin(sleep(Duration::from_millis(500))),
+        }
     }
 
-    fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) {
-        let mut events = mio::Events::with_capacity(128);
-
-        loop {
-            if let Err(e) = self.poll.poll(&mut events, None) {
-                match e.kind() {
-                    io::ErrorKind::Interrupted => continue,
-                    _ => panic!("Poll error: {}", e),
-                }
-            }
-
-            for event in events.iter() {
-                let token = event.token();
-                match token {
-                    WAKER_TOKEN => {
-                        let exit = self.handle_waker(sockets);
-                        if exit {
-                            info!("Accept is stopped.");
-                            return;
-                        }
-                    }
-                    _ => {
-                        let token = usize::from(token);
-                        self.accept(sockets, token);
-                    }
-                }
-            }
-        }
-    }
-
-    fn handle_waker(&mut self, sockets: &mut [ServerSocketInfo]) -> bool {
-        // This is a loop because interests for command from previous version was
-        // a loop that would try to drain the command channel. It's yet unknown
-        // if it's necessary/good practice to actively drain the waker queue.
-        loop {
-            // take guard with every iteration so no new interest can be added
-            // until the current task is done.
-            let mut guard = self.waker.guard();
-            match guard.pop_front() {
-                // worker notify it becomes available.
-                Some(WakerInterest::WorkerAvailable(idx)) => {
-                    drop(guard);
-
-                    self.avail.set_available(idx, true);
-
-                    if !self.paused {
-                        self.accept_all(sockets);
-                    }
-                }
-                // a new worker thread is made and it's handle would be added to Accept
-                Some(WakerInterest::Worker(handle)) => {
-                    drop(guard);
-
-                    self.avail.set_available(handle.idx(), true);
-                    self.handles.push(handle);
-
-                    if !self.paused {
-                        self.accept_all(sockets);
-                    }
-                }
-                // got timer interest and it's time to try register socket(s) again
-                Some(WakerInterest::Timer) => {
-                    drop(guard);
-
-                    self.process_timer(sockets)
-                }
-                Some(WakerInterest::Pause) => {
-                    drop(guard);
-
-                    self.paused = true;
-
-                    self.deregister_all(sockets);
-                }
-                Some(WakerInterest::Resume) => {
-                    drop(guard);
-
-                    self.paused = false;
-
-                    sockets.iter_mut().for_each(|info| {
-                        self.register_logged(info);
-                    });
-
-                    self.accept_all(sockets);
-                }
-                Some(WakerInterest::Stop) => {
-                    self.deregister_all(sockets);
-
-                    return true;
-                }
-                // waker queue is drained
-                None => {
-                    // Reset the WakerQueue before break so it does not grow infinitely
-                    WakerQueue::reset(&mut guard);
-
-                    return false;
-                }
-            }
-        }
-    }
-
-    fn process_timer(&self, sockets: &mut [ServerSocketInfo]) {
-        let now = Instant::now();
-        sockets
-            .iter_mut()
-            // Only sockets that had an associated timeout were deregistered.
-            .filter(|info| info.timeout.is_some())
-            .for_each(|info| {
-                let inst = info.timeout.take().unwrap();
-
-                if now < inst {
-                    info.timeout = Some(inst);
-                } else if !self.paused {
-                    self.register_logged(info);
-                }
-
-                // Drop the timeout if server is paused and socket timeout is expired.
-                // When server recovers from pause it will register all sockets without
-                // a timeout value so this socket register will be delayed till then.
-            });
-    }
-
-    #[cfg(not(target_os = "windows"))]
-    fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
-        let token = MioToken(info.token);
-        self.poll
-            .registry()
-            .register(&mut info.lst, token, Interest::READABLE)
-    }
-
-    #[cfg(target_os = "windows")]
-    fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
-        // On windows, calling register without deregister cause an error.
-        // See https://github.com/actix/actix-web/issues/905
-        // Calling reregister seems to fix the issue.
-        let token = MioToken(info.token);
-        self.poll
-            .registry()
-            .register(&mut info.lst, token, Interest::READABLE)
-            .or_else(|_| {
-                self.poll
-                    .registry()
-                    .reregister(&mut info.lst, token, Interest::READABLE)
-            })
-    }
-
-    fn register_logged(&self, info: &mut ServerSocketInfo) {
-        match self.register(info) {
-            Ok(_) => info!("Resume accepting connections on {}", info.addr),
-            Err(e) => error!("Can not register server socket {}", e),
-        }
-    }
-
-    fn deregister_logged(&self, info: &mut ServerSocketInfo) {
-        match self.poll.registry().deregister(&mut info.lst) {
-            Ok(_) => info!("Paused accepting connections on {}", info.addr),
-            Err(e) => {
-                error!("Can not deregister server socket {}", e)
-            }
-        }
-    }
-
-    fn deregister_all(&self, sockets: &mut [ServerSocketInfo]) {
-        // This is a best effort implementation with following limitation:
-        //
-        // Every ServerSocketInfo with associate timeout will be skipped and it's timeout
-        // is removed in the process.
-        //
-        // Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short
-        // gap (less than 500ms) would cause all timing out ServerSocketInfos be reregistered
-        // before expected timing.
-        sockets
-            .iter_mut()
-            // Take all timeout.
-            // This is to prevent Accept::process_timer method re-register a socket afterwards.
-            .map(|info| (info.timeout.take(), info))
-            // Socket info with a timeout is already deregistered so skip them.
-            .filter(|(timeout, _)| timeout.is_none())
-            .for_each(|(_, info)| self.deregister_logged(info));
-    }
 
     // Send connection to worker and handle error.
@@ -460,50 +149,6 @@ impl Accept {
         }
     }
 
-    fn accept(&mut self, sockets: &mut [ServerSocketInfo], token: usize) {
-        while self.avail.available() {
-            let info = &mut sockets[token];
-
-            match info.lst.accept() {
-                Ok(io) => {
-                    let conn = Conn { io, token };
-                    self.accept_one(conn);
-                }
-                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
-                Err(ref e) if connection_error(e) => continue,
-                Err(e) => {
-                    error!("Error accepting connection: {}", e);
-
-                    // deregister listener temporary
-                    self.deregister_logged(info);
-
-                    // sleep after error. write the timeout to socket info as later
-                    // the poll would need it mark which socket and when it's
-                    // listener should be registered
-                    info.timeout = Some(Instant::now() + Duration::from_millis(500));
-
-                    // after the sleep a Timer interest is sent to Accept Poll
-                    let waker = self.waker.clone();
-                    System::current().arbiter().spawn(async move {
-                        sleep(Duration::from_millis(510)).await;
-                        waker.wake(WakerInterest::Timer);
-                    });
-
-                    return;
-                }
-            };
-        }
-    }
-
-    fn accept_all(&mut self, sockets: &mut [ServerSocketInfo]) {
-        sockets
-            .iter_mut()
-            .map(|info| info.token)
-            .collect::<Vec<_>>()
-            .into_iter()
-            .for_each(|idx| self.accept(sockets, idx))
-    }
-
     #[inline(always)]
     fn next(&self) -> &WorkerHandleAccept {
         &self.handles[self.next]
@@ -526,6 +171,149 @@ impl Accept {
     }
 }
 
+use std::future::Future;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+impl Future for Accept {
+    type Output = ();
+
+    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        let this = self.get_mut();
+
+        while let Poll::Ready(Some(interest)) = this.rx.poll_recv(cx) {
+            match interest {
+                Interest::WorkerIndex(idx) => {
+                    this.avail.set_available(idx, true);
+                }
+                Interest::Worker(handle) => {
+                    this.avail.set_available(handle.idx(), true);
+                    this.handles.push(handle);
+                }
+                Interest::Pause => {
+                    this.paused = true;
+                    break;
+                }
+                Interest::Resume => this.paused = false,
+                Interest::Stop => return Poll::Ready(()),
+            }
+        }
+
+        if this.paused {
+            return Poll::Pending;
+        }
+
+        let len = this.sockets.len();
+        let mut idx = 0;
+        while idx < len {
+            'socket: loop {
+                if !this.avail.available() {
+                    return Poll::Pending;
+                }
+
+                let socket = &mut this.sockets[idx];
+
+                match socket.lst.poll_accept(cx) {
+                    Poll::Ready(Ok(io)) => {
+                        let conn = Conn {
+                            io,
+                            token: socket.token,
+                        };
+                        this.accept_one(conn);
+                    }
+                    Poll::Ready(Err(ref e)) if connection_error(e) => continue 'socket,
+                    Poll::Ready(Err(ref e)) => {
+                        error!("Error accepting connection: {}", e);
+
+                        let deadline = Instant::now() + Duration::from_millis(500);
+                        this.timeout.as_mut().reset(deadline);
+                        let _ = this.timeout.as_mut().poll(cx);
+
+                        break 'socket;
+                    }
+                    Poll::Pending => break 'socket,
+                };
+            }
+
+            idx += 1;
+        }
+
+        Poll::Pending
+    }
+}
+
+/// This function defines errors that are per-connection. Which basically
+/// means that if we get this error from `accept()` system call it means
+/// next connection might be ready to be accepted.
+///
+/// All other errors will incur a timeout before next `accept()` is performed.
+/// The timeout is useful to handle resource exhaustion errors like ENFILE
+/// and EMFILE. Otherwise, could enter into tight loop.
+fn connection_error(e: &io::Error) -> bool {
+    e.kind() == io::ErrorKind::ConnectionRefused
+        || e.kind() == io::ErrorKind::ConnectionAborted
+        || e.kind() == io::ErrorKind::ConnectionReset
+}
+
+/// Array of u128 with every bit as marker for a worker handle's availability.
+struct Availability([u128; 4]);
+
+impl Default for Availability {
+    fn default() -> Self {
+        Self([0; 4])
+    }
+}
+
+impl Availability {
+    /// Check if any worker handle is available
+    #[inline(always)]
+    fn available(&self) -> bool {
+        self.0.iter().any(|a| *a != 0)
+    }
+
+    /// Check if worker handle is available by index
+    #[inline(always)]
+    fn get_available(&self, idx: usize) -> bool {
+        let (offset, idx) = Self::offset(idx);
+
+        self.0[offset] & (1 << idx as u128) != 0
+    }
+
+    /// Set worker handle available state by index.
+    fn set_available(&mut self, idx: usize, avail: bool) {
+        let (offset, idx) = Self::offset(idx);
+
+        let off = 1 << idx as u128;
+        if avail {
+            self.0[offset] |= off;
+        } else {
+            self.0[offset] &= !off
+        }
+    }
+
+    /// Set all worker handle to available state.
+    /// This would result in a re-check on all workers' availability.
+    fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) {
+        handles.iter().for_each(|handle| {
+            self.set_available(handle.idx(), true);
+        })
+    }
+
+    /// Get offset and adjusted index of given worker handle index.
+    fn offset(idx: usize) -> (usize, usize) {
+        if idx < 128 {
+            (0, idx)
+        } else if idx < 128 * 2 {
+            (1, idx - 128)
+        } else if idx < 128 * 3 {
+            (2, idx - 128 * 2)
+        } else if idx < 128 * 4 {
+            (3, idx - 128 * 3)
+        } else {
+            panic!("Max WorkerHandle count is 512")
+        }
+    }
+}
+
 #[cfg(test)]
 mod test {
     use super::Availability;
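One subtle piece of the new `impl Future for Accept` above is the error backoff: after a non-connection accept error it resets the shared `Sleep`, then polls it once so the task's waker is registered with the timer driver; the runtime re-polls the whole future when the deadline fires, replacing the old `WakerInterest::Timer` message. Below is a standalone sketch of just that trick, with illustrative names and tokio's time feature assumed.

// Sketch: arm a Sleep inside Future::poll and poll it once so the timer
// wakes this task again after the backoff. `Backoff` is illustrative.
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

use tokio::time::{sleep, Instant, Sleep};

struct Backoff {
    timeout: Pin<Box<Sleep>>,
    armed: bool,
}

impl Future for Backoff {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if !self.armed {
            // Simulate "an accept error happened": arm a 500ms backoff.
            let deadline = Instant::now() + Duration::from_millis(500);
            self.timeout.as_mut().reset(deadline);
            self.armed = true;
            // Polling the Sleep registers cx.waker() with the timer driver.
            let _ = self.timeout.as_mut().poll(cx);
            return Poll::Pending;
        }
        // Re-polled only after the timer fires and wakes the task.
        self.timeout.as_mut().poll(cx)
    }
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let start = Instant::now();
    Backoff {
        timeout: Box::pin(sleep(Duration::from_millis(0))),
        armed: false,
    }
    .await;
    assert!(start.elapsed() >= Duration::from_millis(500));
}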

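For reference, the `Availability` type the accept loop consults is a plain bitset: four `u128` words give one bit per worker, capping workers at 512. A trimmed, runnable sketch of the same logic follows; the `idx / 128` and `idx % 128` arithmetic is equivalent to the chained-if `offset()` above for indices below 512.

// Standalone sketch of the Availability bitmask, reduced to the two
// operations the accept loop relies on.
struct Availability([u128; 4]);

impl Availability {
    /// Any worker available at all?
    fn available(&self) -> bool {
        self.0.iter().any(|a| *a != 0)
    }

    /// Flip one worker's bit on or off.
    fn set_available(&mut self, idx: usize, avail: bool) {
        let (offset, bit) = (idx / 128, idx % 128); // same mapping as offset()
        let mask = 1u128 << bit;
        if avail {
            self.0[offset] |= mask;
        } else {
            self.0[offset] &= !mask;
        }
    }
}

fn main() {
    let mut avail = Availability([0; 4]);
    assert!(!avail.available());
    avail.set_available(200, true); // lands in the second u128
    assert!(avail.available());
    avail.set_available(200, false);
    assert!(!avail.available());
}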

@@ -9,19 +9,18 @@ use std::{
 use actix_rt::{self as rt, net::TcpStream, time::sleep, System};
 use log::{error, info};
 use tokio::sync::{
-    mpsc::{unbounded_channel, UnboundedReceiver},
+    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
     oneshot,
 };
 
-use crate::accept::AcceptLoop;
+use crate::accept::{Accept, Interest};
 use crate::config::{ConfiguredService, ServiceConfig};
 use crate::join_all;
 use crate::server::{Server, ServerCommand};
 use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
 use crate::signals::{Signal, Signals};
-use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
-use crate::socket::{MioTcpListener, MioTcpSocket};
-use crate::waker_queue::{WakerInterest, WakerQueue};
+use crate::socket::{Listener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
+use crate::socket::{TcpListener, TcpSocket};
 use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer};
 
 /// Server builder
@@ -31,8 +30,8 @@ pub struct ServerBuilder {
     backlog: u32,
     handles: Vec<(usize, WorkerHandleServer)>,
     services: Vec<Box<dyn InternalServiceFactory>>,
-    sockets: Vec<(usize, String, MioListener)>,
-    accept: AcceptLoop,
+    sockets: Vec<(usize, String, Listener)>,
+    accept: Option<UnboundedSender<Interest>>,
     exit: bool,
     no_signals: bool,
     cmd: UnboundedReceiver<ServerCommand>,
@@ -59,7 +58,7 @@ impl ServerBuilder {
             handles: Vec::new(),
             services: Vec::new(),
             sockets: Vec::new(),
-            accept: AcceptLoop::new(server.clone()),
+            accept: None,
             backlog: 2048,
             exit: false,
             no_signals: false,
@@ -165,7 +164,7 @@ impl ServerBuilder {
         for (name, lst) in cfg.services {
             let token = self.next_token();
             srv.stream(token, name.clone(), lst.local_addr()?);
-            self.sockets.push((token, name, MioListener::Tcp(lst)));
+            self.sockets.push((token, name, Listener::Tcp(lst)));
         }
         self.services.push(Box::new(srv));
     }
@@ -191,7 +190,7 @@ impl ServerBuilder {
                 lst.local_addr()?,
             ));
             self.sockets
-                .push((token, name.as_ref().to_string(), MioListener::Tcp(lst)));
+                .push((token, name.as_ref().to_string(), Listener::Tcp(lst)));
         }
         Ok(self)
     }
@@ -241,7 +240,7 @@ impl ServerBuilder {
             addr,
         ));
         self.sockets
-            .push((token, name.as_ref().to_string(), MioListener::from(lst)));
+            .push((token, name.as_ref().to_string(), Listener::from(lst)));
         Ok(self)
     }
@@ -267,7 +266,7 @@ impl ServerBuilder {
         ));
         self.sockets
-            .push((token, name.as_ref().to_string(), MioListener::from(lst)));
+            .push((token, name.as_ref().to_string(), Listener::from(lst)));
         Ok(self)
     }
@@ -279,11 +278,14 @@ impl ServerBuilder {
         } else {
             info!("Starting {} workers", self.threads);
 
+            let (tx, rx) = unbounded_channel();
+            self.accept = Some(tx);
+
             // start workers
             let handles = (0..self.threads)
                 .map(|idx| {
-                    let (handle_accept, handle_server) =
-                        self.start_worker(idx, self.accept.waker_owned());
+                    let (handle_accept, handle_server) = self.start_worker(idx);
                    self.handles.push((idx, handle_server));
 
                    handle_accept
@@ -294,13 +296,13 @@ impl ServerBuilder {
             for sock in &self.sockets {
                 info!("Starting \"{}\" service on {}", sock.1, sock.2);
             }
-            self.accept.start(
-                mem::take(&mut self.sockets)
-                    .into_iter()
-                    .map(|t| (t.0, t.2))
-                    .collect(),
-                handles,
-            );
+
+            let sockets = mem::take(&mut self.sockets)
+                .into_iter()
+                .map(|t| (t.0, t.2))
+                .collect();
+
+            Accept::start(sockets, rx, self.server.clone(), handles);
 
             // handle signals
             if !self.no_signals {
@@ -314,24 +316,20 @@ impl ServerBuilder {
         }
     }
 
-    fn start_worker(
-        &self,
-        idx: usize,
-        waker_queue: WakerQueue,
-    ) -> (WorkerHandleAccept, WorkerHandleServer) {
+    fn start_worker(&self, idx: usize) -> (WorkerHandleAccept, WorkerHandleServer) {
         let services = self.services.iter().map(|v| v.clone_factory()).collect();
+        let accept_tx = self.accept.as_ref().cloned().unwrap();
 
-        ServerWorker::start(idx, services, waker_queue, self.worker_config)
+        ServerWorker::start(idx, services, accept_tx, self.worker_config)
     }
 
     fn handle_cmd(&mut self, item: ServerCommand) {
         match item {
             ServerCommand::Pause(tx) => {
-                self.accept.wake(WakerInterest::Pause);
+                let _ = self.accept.as_ref().unwrap().send(Interest::Pause);
                 let _ = tx.send(());
             }
             ServerCommand::Resume(tx) => {
-                self.accept.wake(WakerInterest::Resume);
+                let _ = self.accept.as_ref().unwrap().send(Interest::Resume);
                 let _ = tx.send(());
             }
             ServerCommand::Signal(sig) => {
@@ -375,7 +373,7 @@ impl ServerBuilder {
                 let exit = self.exit;
 
                 // stop accept thread
-                self.accept.wake(WakerInterest::Stop);
+                let _ = self.accept.as_ref().unwrap().send(Interest::Stop);
                 let notify = std::mem::take(&mut self.notify);
 
                 // stop workers
@@ -427,10 +425,13 @@ impl ServerBuilder {
                     break;
                 }
 
-                let (handle_accept, handle_server) =
-                    self.start_worker(new_idx, self.accept.waker_owned());
+                let (handle_accept, handle_server) = self.start_worker(new_idx);
                 self.handles.push((new_idx, handle_server));
-                self.accept.wake(WakerInterest::Worker(handle_accept));
+                let _ = self
+                    .accept
+                    .as_ref()
+                    .unwrap()
+                    .send(Interest::Worker(handle_accept));
             }
         }
     }
@@ -459,7 +460,7 @@ impl Future for ServerBuilder {
 pub(super) fn bind_addr<S: ToSocketAddrs>(
     addr: S,
     backlog: u32,
-) -> io::Result<Vec<MioTcpListener>> {
+) -> io::Result<Vec<TcpListener>> {
     let mut err = None;
     let mut succ = false;
     let mut sockets = Vec::new();
@@ -487,10 +488,10 @@ pub(super) fn bind_addr<S: ToSocketAddrs>(
     }
 }
 
-fn create_tcp_listener(addr: StdSocketAddr, backlog: u32) -> io::Result<MioTcpListener> {
+fn create_tcp_listener(addr: StdSocketAddr, backlog: u32) -> io::Result<TcpListener> {
     let socket = match addr {
-        StdSocketAddr::V4(_) => MioTcpSocket::new_v4()?,
-        StdSocketAddr::V6(_) => MioTcpSocket::new_v6()?,
+        StdSocketAddr::V4(_) => TcpSocket::new_v4()?,
+        StdSocketAddr::V6(_) => TcpSocket::new_v6()?,
     };
 
     socket.set_reuseaddr(true)?;

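A small pattern worth noting in the builder changes above: every command is sent with `let _ = ...send(...)`. An `UnboundedSender::send` only fails when the receiving accept loop has already shut down, at which point there is nothing left to notify, so ignoring the error is deliberate. A tiny demonstration, with an illustrative `Interest` enum:

// Demonstrates why dropping the send error is fine: it can only fail once
// the receiver (here standing in for the accept loop) is gone.
use tokio::sync::mpsc::unbounded_channel;

enum Interest {
    Pause,
}

fn main() {
    let (tx, rx) = unbounded_channel::<Interest>();
    drop(rx); // simulate the accept loop having already stopped

    // send() now returns Err; no one is left to react, so discard it.
    assert!(tx.send(Interest::Pause).is_err());
}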

@@ -14,12 +14,12 @@ use log::error;
 use crate::{
     builder::bind_addr,
     service::{BoxedServerService, InternalServiceFactory, StreamService},
-    socket::{MioStream, MioTcpListener, StdSocketAddr, StdTcpListener, ToSocketAddrs},
+    socket::{StdSocketAddr, StdTcpListener, Stream, TcpListener, ToSocketAddrs},
     worker::WorkerCounterGuard,
 };
 
 pub struct ServiceConfig {
-    pub(crate) services: Vec<(String, MioTcpListener)>,
+    pub(crate) services: Vec<(String, TcpListener)>,
     pub(crate) apply: Option<Box<dyn ServiceRuntimeConfiguration>>,
     pub(crate) threads: usize,
     pub(crate) backlog: u32,
@@ -59,7 +59,8 @@ impl ServiceConfig {
     /// Add new service to server
     pub fn listen<N: AsRef<str>>(&mut self, name: N, lst: StdTcpListener) -> &mut Self {
-        self._listen(name, MioTcpListener::from_std(lst))
+        // TODO: Handle unwrap
+        self._listen(name, TcpListener::from_std(lst).unwrap())
     }
 
     /// Register service configuration function. This function get called
@@ -72,7 +73,7 @@ impl ServiceConfig {
         Ok(())
     }
 
-    fn _listen<N: AsRef<str>>(&mut self, name: N, lst: MioTcpListener) -> &mut Self {
+    fn _listen<N: AsRef<str>>(&mut self, name: N, lst: TcpListener) -> &mut Self {
         if self.apply.is_none() {
             self.apply = Some(Box::new(not_configured));
         }
@@ -245,7 +246,7 @@ impl ServiceRuntime {
 type BoxedNewService = Box<
     dyn BaseServiceFactory<
-        (WorkerCounterGuard, MioStream),
+        (WorkerCounterGuard, Stream),
         Response = (),
         Error = (),
         InitError = (),
@@ -259,7 +260,7 @@ struct ServiceFactory<T> {
     inner: T,
 }
 
-impl<T> BaseServiceFactory<(WorkerCounterGuard, MioStream)> for ServiceFactory<T>
+impl<T> BaseServiceFactory<(WorkerCounterGuard, Stream)> for ServiceFactory<T>
 where
     T: BaseServiceFactory<TcpStream, Config = ()>,
     T::Future: 'static,


@@ -12,7 +12,6 @@ mod service;
 mod signals;
 mod socket;
 mod test_server;
-mod waker_queue;
 mod worker;
 
 pub use self::builder::ServerBuilder;


@@ -7,7 +7,7 @@ use actix_utils::future::{ready, Ready};
 use futures_core::future::LocalBoxFuture;
 use log::error;
 
-use crate::socket::{FromStream, MioStream};
+use crate::socket::{FromStream, Stream};
 use crate::worker::WorkerCounterGuard;
 
 pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
@@ -26,7 +26,7 @@ pub(crate) trait InternalServiceFactory: Send {
 pub(crate) type BoxedServerService = Box<
     dyn Service<
-        (WorkerCounterGuard, MioStream),
+        (WorkerCounterGuard, Stream),
         Response = (),
         Error = (),
         Future = Ready<Result<(), ()>>,
@@ -47,7 +47,7 @@ impl<S, I> StreamService<S, I> {
     }
 }
 
-impl<S, I> Service<(WorkerCounterGuard, MioStream)> for StreamService<S, I>
+impl<S, I> Service<(WorkerCounterGuard, Stream)> for StreamService<S, I>
 where
     S: Service<I>,
     S::Future: 'static,
@@ -62,7 +62,7 @@ where
         self.service.poll_ready(ctx).map_err(|_| ())
     }
 
-    fn call(&self, (guard, req): (WorkerCounterGuard, MioStream)) -> Self::Future {
+    fn call(&self, (guard, req): (WorkerCounterGuard, Stream)) -> Self::Future {
         ready(match FromStream::from_mio(req) {
             Ok(stream) => {
                 let f = self.service.call(stream);


@@ -2,157 +2,85 @@ pub(crate) use std::net::{
     SocketAddr as StdSocketAddr, TcpListener as StdTcpListener, ToSocketAddrs,
 };
 
-pub(crate) use mio::net::{TcpListener as MioTcpListener, TcpSocket as MioTcpSocket};
+pub(crate) use actix_rt::net::{TcpListener, TcpSocket};
 #[cfg(unix)]
 pub(crate) use {
-    mio::net::UnixListener as MioUnixListener,
-    std::os::unix::net::UnixListener as StdUnixListener,
+    actix_rt::net::UnixListener, std::os::unix::net::UnixListener as StdUnixListener,
 };
 
-use std::{fmt, io};
+use std::{
+    fmt, io,
+    task::{Context, Poll},
+};
 
 use actix_rt::net::TcpStream;
-use mio::{event::Source, Interest, Registry, Token};
 
-pub(crate) enum MioListener {
-    Tcp(MioTcpListener),
+pub(crate) enum Listener {
+    Tcp(TcpListener),
     #[cfg(unix)]
-    Uds(MioUnixListener),
+    Uds(tokio::net::UnixListener),
 }
 
-impl MioListener {
-    pub(crate) fn local_addr(&self) -> SocketAddr {
+impl Listener {
+    pub(crate) fn poll_accept(&self, cx: &mut Context<'_>) -> Poll<io::Result<Stream>> {
         match *self {
-            MioListener::Tcp(ref lst) => SocketAddr::Tcp(lst.local_addr().unwrap()),
+            Self::Tcp(ref lst) => lst
+                .poll_accept(cx)
+                .map_ok(|(stream, _)| Stream::Tcp(stream)),
             #[cfg(unix)]
-            MioListener::Uds(ref lst) => SocketAddr::Uds(lst.local_addr().unwrap()),
-        }
-    }
-
-    pub(crate) fn accept(&self) -> io::Result<MioStream> {
-        match *self {
-            MioListener::Tcp(ref lst) => lst.accept().map(|(stream, _)| MioStream::Tcp(stream)),
-            #[cfg(unix)]
-            MioListener::Uds(ref lst) => lst.accept().map(|(stream, _)| MioStream::Uds(stream)),
+            Self::Uds(ref lst) => lst
+                .poll_accept(cx)
+                .map_ok(|(stream, _)| Stream::Uds(stream)),
         }
     }
 }
 
-impl Source for MioListener {
-    fn register(
-        &mut self,
-        registry: &Registry,
-        token: Token,
-        interests: Interest,
-    ) -> io::Result<()> {
-        match *self {
-            MioListener::Tcp(ref mut lst) => lst.register(registry, token, interests),
-            #[cfg(unix)]
-            MioListener::Uds(ref mut lst) => lst.register(registry, token, interests),
-        }
-    }
-
-    fn reregister(
-        &mut self,
-        registry: &Registry,
-        token: Token,
-        interests: Interest,
-    ) -> io::Result<()> {
-        match *self {
-            MioListener::Tcp(ref mut lst) => lst.reregister(registry, token, interests),
-            #[cfg(unix)]
-            MioListener::Uds(ref mut lst) => lst.reregister(registry, token, interests),
-        }
-    }
-
-    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
-        match *self {
-            MioListener::Tcp(ref mut lst) => lst.deregister(registry),
-            #[cfg(unix)]
-            MioListener::Uds(ref mut lst) => {
-                let res = lst.deregister(registry);
-
-                // cleanup file path
-                if let Ok(addr) = lst.local_addr() {
-                    if let Some(path) = addr.as_pathname() {
-                        let _ = std::fs::remove_file(path);
-                    }
-                }
-                res
-            }
-        }
-    }
-}
-
-impl From<StdTcpListener> for MioListener {
+// TODO: use TryFrom
+impl From<StdTcpListener> for Listener {
     fn from(lst: StdTcpListener) -> Self {
-        MioListener::Tcp(MioTcpListener::from_std(lst))
+        let lst = TcpListener::from_std(lst).unwrap();
+        Listener::Tcp(lst)
     }
 }
 
 #[cfg(unix)]
-impl From<StdUnixListener> for MioListener {
+impl From<StdUnixListener> for Listener {
     fn from(lst: StdUnixListener) -> Self {
-        MioListener::Uds(MioUnixListener::from_std(lst))
+        let lst = UnixListener::from_std(lst).unwrap();
+        Listener::Uds(lst)
     }
 }
 
-impl fmt::Debug for MioListener {
+impl fmt::Debug for Listener {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match *self {
-            MioListener::Tcp(ref lst) => write!(f, "{:?}", lst),
+            Listener::Tcp(ref lst) => write!(f, "{:?}", lst),
             #[cfg(all(unix))]
-            MioListener::Uds(ref lst) => write!(f, "{:?}", lst),
+            Listener::Uds(ref lst) => write!(f, "{:?}", lst),
         }
     }
 }
 
-impl fmt::Display for MioListener {
+impl fmt::Display for Listener {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match *self {
-            MioListener::Tcp(ref lst) => write!(f, "{}", lst.local_addr().ok().unwrap()),
+            Listener::Tcp(ref lst) => write!(f, "{}", lst.local_addr().ok().unwrap()),
             #[cfg(unix)]
-            MioListener::Uds(ref lst) => write!(f, "{:?}", lst.local_addr().ok().unwrap()),
+            Listener::Uds(ref lst) => write!(f, "{:?}", lst.local_addr().ok().unwrap()),
         }
     }
 }
 
-pub(crate) enum SocketAddr {
-    Tcp(StdSocketAddr),
-    #[cfg(unix)]
-    Uds(mio::net::SocketAddr),
-}
-
-impl fmt::Display for SocketAddr {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        match *self {
-            SocketAddr::Tcp(ref addr) => write!(f, "{}", addr),
-            #[cfg(unix)]
-            SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
-        }
-    }
-}
-
-impl fmt::Debug for SocketAddr {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        match *self {
-            SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr),
-            #[cfg(unix)]
-            SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
-        }
-    }
-}
-
 #[derive(Debug)]
-pub enum MioStream {
-    Tcp(mio::net::TcpStream),
+pub enum Stream {
+    Tcp(actix_rt::net::TcpStream),
     #[cfg(unix)]
-    Uds(mio::net::UnixStream),
+    Uds(actix_rt::net::UnixStream),
 }
 
 /// helper trait for converting mio stream to tokio stream.
 pub trait FromStream: Sized {
-    fn from_mio(sock: MioStream) -> io::Result<Self>;
+    fn from_mio(stream: Stream) -> io::Result<Self>;
 }
 
 #[cfg(windows)]
@@ -163,13 +91,9 @@ mod win_impl {
     // FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream
     impl FromStream for TcpStream {
-        fn from_mio(sock: MioStream) -> io::Result<Self> {
-            match sock {
-                MioStream::Tcp(mio) => {
-                    let raw = IntoRawSocket::into_raw_socket(mio);
-                    // SAFETY: This is a in place conversion from mio stream to tokio stream.
-                    TcpStream::from_std(unsafe { FromRawSocket::from_raw_socket(raw) })
-                }
+        fn from_mio(stream: MioStream) -> io::Result<Self> {
+            match stream {
+                MioStream::Tcp(stream) => Ok(stream),
             }
         }
     }
@@ -179,20 +103,12 @@ mod win_impl {
 mod unix_impl {
     use super::*;
 
-    use std::os::unix::io::{FromRawFd, IntoRawFd};
-
-    use actix_rt::net::UnixStream;
-
     // FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream
     impl FromStream for TcpStream {
-        fn from_mio(sock: MioStream) -> io::Result<Self> {
-            match sock {
-                MioStream::Tcp(mio) => {
-                    let raw = IntoRawFd::into_raw_fd(mio);
-                    // SAFETY: This is a in place conversion from mio stream to tokio stream.
-                    TcpStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) })
-                }
-                MioStream::Uds(_) => {
+        fn from_mio(stream: Stream) -> io::Result<Self> {
+            match stream {
+                Stream::Tcp(stream) => Ok(stream),
+                Stream::Uds(_) => {
                     panic!("Should not happen, bug in server impl");
                 }
             }
@@ -200,53 +116,49 @@ mod unix_impl {
     }
 
     // FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream
-    impl FromStream for UnixStream {
-        fn from_mio(sock: MioStream) -> io::Result<Self> {
-            match sock {
-                MioStream::Tcp(_) => panic!("Should not happen, bug in server impl"),
-                MioStream::Uds(mio) => {
-                    let raw = IntoRawFd::into_raw_fd(mio);
-                    // SAFETY: This is a in place conversion from mio stream to tokio stream.
-                    UnixStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) })
-                }
+    impl FromStream for actix_rt::net::UnixStream {
+        fn from_mio(stream: Stream) -> io::Result<Self> {
+            match stream {
+                Stream::Tcp(_) => panic!("Should not happen, bug in server impl"),
+                Stream::Uds(stream) => Ok(stream),
             }
         }
     }
 }
 
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn socket_addr() {
-        let addr = SocketAddr::Tcp("127.0.0.1:8080".parse().unwrap());
-        assert!(format!("{:?}", addr).contains("127.0.0.1:8080"));
-        assert_eq!(format!("{}", addr), "127.0.0.1:8080");
-
-        let addr: StdSocketAddr = "127.0.0.1:0".parse().unwrap();
-        let socket = MioTcpSocket::new_v4().unwrap();
-        socket.set_reuseaddr(true).unwrap();
-        socket.bind(addr).unwrap();
-        let tcp = socket.listen(128).unwrap();
-        let lst = MioListener::Tcp(tcp);
-        assert!(format!("{:?}", lst).contains("TcpListener"));
-        assert!(format!("{}", lst).contains("127.0.0.1"));
-    }
-
-    #[test]
-    #[cfg(unix)]
-    fn uds() {
-        let _ = std::fs::remove_file("/tmp/sock.xxxxx");
-        if let Ok(socket) = MioUnixListener::bind("/tmp/sock.xxxxx") {
-            let addr = socket.local_addr().expect("Couldn't get local address");
-            let a = SocketAddr::Uds(addr);
-            assert!(format!("{:?}", a).contains("/tmp/sock.xxxxx"));
-            assert!(format!("{}", a).contains("/tmp/sock.xxxxx"));
-
-            let lst = MioListener::Uds(socket);
-            assert!(format!("{:?}", lst).contains("/tmp/sock.xxxxx"));
-            assert!(format!("{}", lst).contains("/tmp/sock.xxxxx"));
-        }
-    }
-}
+// #[cfg(test)]
+// mod tests {
+//     use super::*;
+
+//     #[test]
+//     fn socket_addr() {
+//         let addr = SocketAddr::Tcp("127.0.0.1:8080".parse().unwrap());
+//         assert!(format!("{:?}", addr).contains("127.0.0.1:8080"));
+//         assert_eq!(format!("{}", addr), "127.0.0.1:8080");
+
+//         let addr: StdSocketAddr = "127.0.0.1:0".parse().unwrap();
+//         let socket = MioTcpSocket::new_v4().unwrap();
+//         socket.set_reuseaddr(true).unwrap();
+//         socket.bind(addr).unwrap();
+//         let tcp = socket.listen(128).unwrap();
+//         let lst = Listener::Tcp(tcp);
+//         assert!(format!("{:?}", lst).contains("TcpListener"));
+//         assert!(format!("{}", lst).contains("127.0.0.1"));
+//     }
+
+//     #[test]
+//     #[cfg(unix)]
+//     fn uds() {
+//         let _ = std::fs::remove_file("/tmp/sock.xxxxx");
+//         if let Ok(socket) = MioUnixListener::bind("/tmp/sock.xxxxx") {
+//             let addr = socket.local_addr().expect("Couldn't get local address");
+//             let a = SocketAddr::Uds(addr);
+//             assert!(format!("{:?}", a).contains("/tmp/sock.xxxxx"));
+//             assert!(format!("{}", a).contains("/tmp/sock.xxxxx"));
+
+//             let lst = Listener::Uds(socket);
+//             assert!(format!("{:?}", lst).contains("/tmp/sock.xxxxx"));
+//             assert!(format!("{}", lst).contains("/tmp/sock.xxxxx"));
+//         }
+//     }
+// }

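One caveat behind the `TcpListener::from_std(lst).unwrap()` conversions above (and the `// TODO: use TryFrom` / `// TODO: Handle unwrap` notes): tokio's `from_std` requires the std listener to already be in non-blocking mode and must run inside a runtime context. A hedged sketch of the conversion done defensively; `into_tokio` is an illustrative helper, not part of this crate:

// Sketch: converting a std listener for tokio. from_std neither sets
// non-blocking mode nor works outside a reactor, so handle both explicitly.
use std::net::TcpListener as StdTcpListener;
use tokio::net::TcpListener;

fn into_tokio(lst: StdTcpListener) -> std::io::Result<TcpListener> {
    // Required: tokio expects the fd to already be non-blocking.
    lst.set_nonblocking(true)?;
    TcpListener::from_std(lst)
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> std::io::Result<()> {
    let std_lst = StdTcpListener::bind("127.0.0.1:0")?;
    let lst = into_tokio(std_lst)?; // must run inside the runtime
    println!("listening on {}", lst.local_addr()?);
    Ok(())
}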

@@ -102,7 +102,7 @@ impl TestServer {
     /// Get first available unused local address
     pub fn unused_addr() -> net::SocketAddr {
         let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
-        let socket = mio::net::TcpSocket::new_v4().unwrap();
+        let socket = actix_rt::net::TcpSocket::new_v4().unwrap();
         socket.bind(addr).unwrap();
         socket.set_reuseaddr(true).unwrap();
         let tcp = socket.listen(1024).unwrap();


@ -1,89 +0,0 @@
use std::{
collections::VecDeque,
ops::Deref,
sync::{Arc, Mutex, MutexGuard},
};
use mio::{Registry, Token as MioToken, Waker};
use crate::worker::WorkerHandleAccept;
/// Waker token for `mio::Poll` instance.
pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX);
/// `mio::Waker` with a queue for waking up the `Accept`'s `Poll` and contains the `WakerInterest`
/// the `Poll` would want to look into.
pub(crate) struct WakerQueue(Arc<(Waker, Mutex<VecDeque<WakerInterest>>)>);
impl Clone for WakerQueue {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl Deref for WakerQueue {
type Target = (Waker, Mutex<VecDeque<WakerInterest>>);
fn deref(&self) -> &Self::Target {
self.0.deref()
}
}
impl WakerQueue {
/// Construct a waker queue with given `Poll`'s `Registry` and capacity.
///
/// A fixed `WAKER_TOKEN` is used to identify the wake interest and the `Poll` needs to match
/// event's token for it to properly handle `WakerInterest`.
pub(crate) fn new(registry: &Registry) -> std::io::Result<Self> {
let waker = Waker::new(registry, WAKER_TOKEN)?;
let queue = Mutex::new(VecDeque::with_capacity(16));
Ok(Self(Arc::new((waker, queue))))
}
/// Push a new interest to the queue and wake up the accept poll afterwards.
pub(crate) fn wake(&self, interest: WakerInterest) {
let (waker, queue) = self.deref();
queue
.lock()
.expect("Failed to lock WakerQueue")
.push_back(interest);
waker
.wake()
.unwrap_or_else(|e| panic!("can not wake up Accept Poll: {}", e));
}
/// Get a MutexGuard of the waker queue.
pub(crate) fn guard(&self) -> MutexGuard<'_, VecDeque<WakerInterest>> {
self.deref().1.lock().expect("Failed to lock WakerQueue")
}
/// Reset the waker queue so it does not grow infinitely.
pub(crate) fn reset(queue: &mut VecDeque<WakerInterest>) {
std::mem::swap(&mut VecDeque::<WakerInterest>::with_capacity(16), queue);
}
}
/// Types of interests we would look into when `Accept`'s `Poll` is waked up by waker.
///
/// These interests should not be confused with `mio::Interest` and mostly not I/O related
pub(crate) enum WakerInterest {
/// `WorkerAvailable` is an interest from `Worker` notifying `Accept` there is a worker
/// available and can accept new tasks.
WorkerAvailable(usize),
/// `Pause`, `Resume`, `Stop` Interest are from `ServerBuilder` future. It listens to
/// `ServerCommand` and notify `Accept` to do exactly these tasks.
Pause,
Resume,
Stop,
/// `Timer` is an interest sent as a delayed future. When an error happens on accepting
/// connection `Accept` would deregister socket listener temporary and wake up the poll and
/// register them again after the delayed future resolve.
Timer,
/// `Worker` is an interest happen after a worker runs into faulted state(This is determined
/// by if work can be sent to it successfully).`Accept` would be waked up and add the new
/// `WorkerHandleAccept`.
Worker(WorkerHandleAccept),
}

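The deleted `WakerQueue` bolted a `Mutex<VecDeque>` (storage) onto a `mio::Waker` (wake-up). A tokio unbounded channel provides both in one type, which is what the new code relies on. A minimal sketch of draining such a queue to exhaustion, analogous to the old `guard()`/`pop_front()` loop:

// Sketch: a tokio unbounded channel as queue + waker in one. Messages sent
// before the receiver looks are buffered; once all senders are dropped and
// the buffer is drained, recv() yields None, ending the loop.
use tokio::sync::mpsc::unbounded_channel;

fn main() {
    let (tx, mut rx) = unbounded_channel();
    tx.send("pause").unwrap();
    tx.send("resume").unwrap();
    drop(tx);

    tokio::runtime::Builder::new_current_thread()
        .build()
        .unwrap()
        .block_on(async {
            while let Some(interest) = rx.recv().await {
                println!("handle interest: {}", interest);
            }
        });
}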

@@ -23,10 +23,9 @@ use tokio::sync::{
     oneshot,
 };
 
-use crate::join_all;
 use crate::service::{BoxedServerService, InternalServiceFactory};
-use crate::socket::MioStream;
-use crate::waker_queue::{WakerInterest, WakerQueue};
+use crate::socket::Stream;
+use crate::{accept::Interest, join_all};
 
 /// Stop worker message. Returns `true` on successful graceful shutdown.
 /// and `false` if some connections still alive when shutdown execute.
@@ -37,7 +36,7 @@ pub(crate) struct Stop {
 #[derive(Debug)]
 pub(crate) struct Conn {
-    pub io: MioStream,
+    pub io: Stream,
     pub token: usize,
 }
@@ -91,7 +90,7 @@ impl Counter {
 pub(crate) struct WorkerCounter {
     idx: usize,
-    inner: Rc<(WakerQueue, Counter)>,
+    inner: Rc<(UnboundedSender<Interest>, Counter)>,
 }
 
 impl Clone for WorkerCounter {
@@ -104,10 +103,14 @@ impl Clone for WorkerCounter {
 }
 
 impl WorkerCounter {
-    pub(crate) fn new(idx: usize, waker_queue: WakerQueue, counter: Counter) -> Self {
+    pub(crate) fn new(
+        idx: usize,
+        accept_tx: UnboundedSender<Interest>,
+        counter: Counter,
+    ) -> Self {
         Self {
             idx,
-            inner: Rc::new((waker_queue, counter)),
+            inner: Rc::new((accept_tx, counter)),
         }
     }
@@ -125,9 +128,9 @@ pub(crate) struct WorkerCounterGuard(WorkerCounter);
 impl Drop for WorkerCounterGuard {
     fn drop(&mut self) {
-        let (waker_queue, counter) = &*self.0.inner;
+        let (accept_tx, counter) = &*self.0.inner;
         if counter.derc() {
-            waker_queue.wake(WakerInterest::WorkerAvailable(self.0.idx));
+            let _ = accept_tx.send(Interest::WorkerIndex(self.0.idx));
         }
     }
 }
@@ -251,7 +254,7 @@ impl ServerWorker {
     pub(crate) fn start(
         idx: usize,
         factories: Vec<Box<dyn InternalServiceFactory>>,
-        waker_queue: WakerQueue,
+        accept_tx: UnboundedSender<Interest>,
         config: ServerWorkerConfig,
     ) -> (WorkerHandleAccept, WorkerHandleServer) {
         let (tx1, rx) = unbounded_channel();
@@ -315,7 +318,7 @@ impl ServerWorker {
             rx,
             rx2,
             services,
-            counter: WorkerCounter::new(idx, waker_queue, counter_clone),
+            counter: WorkerCounter::new(idx, accept_tx, counter_clone),
             factories: factories.into_boxed_slice(),
             state: Default::default(),
             shutdown_timeout: config.shutdown_timeout,

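The `WorkerCounterGuard` change above keeps the same idea as before: when a guard drops (a connection finishes), the worker signals the accept loop that it has capacity again, now through the channel instead of the waker queue. A self-contained sketch of that drop-to-notify pattern, with illustrative names:

// Sketch: a guard that reports spare capacity on drop. `Interest` and
// `CapacityGuard` are illustrative, not actix-server's types.
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};

enum Interest {
    WorkerIndex(usize),
}

struct CapacityGuard {
    idx: usize,
    tx: UnboundedSender<Interest>,
}

impl Drop for CapacityGuard {
    fn drop(&mut self) {
        // Ignore errors: at shutdown the accept loop may already be gone.
        let _ = self.tx.send(Interest::WorkerIndex(self.idx));
    }
}

fn main() {
    let (tx, mut rx) = unbounded_channel();
    {
        let _guard = CapacityGuard { idx: 3, tx };
        // ... handle a connection ...
    } // guard drops here and notifies the accept loop

    let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
    let msg = rt.block_on(rx.recv());
    assert!(matches!(msg, Some(Interest::WorkerIndex(3))));
}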

@@ -10,65 +10,44 @@ use futures_util::future::lazy;
 fn unused_addr() -> net::SocketAddr {
     let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
-    let socket = mio::net::TcpSocket::new_v4().unwrap();
+    let socket = actix_rt::net::TcpSocket::new_v4().unwrap();
     socket.bind(addr).unwrap();
     socket.set_reuseaddr(true).unwrap();
     let tcp = socket.listen(32).unwrap();
     tcp.local_addr().unwrap()
 }
 
-#[test]
-fn test_bind() {
+#[actix_rt::test]
+async fn test_bind() {
     let addr = unused_addr();
 
-    let (tx, rx) = mpsc::channel();
-    let h = thread::spawn(move || {
-        let sys = actix_rt::System::new();
-        let srv = sys.block_on(lazy(|_| {
-            Server::build()
+    let srv = Server::build()
         .workers(1)
         .disable_signals()
         .bind("test", addr, move || fn_service(|_| ok::<_, ()>(())))
         .unwrap()
-        .run()
-        }));
-
-        let _ = tx.send((srv, actix_rt::System::current()));
-        let _ = sys.run();
-    });
-    let (_, sys) = rx.recv().unwrap();
-
-    thread::sleep(Duration::from_millis(500));
+        .run();
+
+    actix_rt::time::sleep(Duration::from_millis(500)).await;
     assert!(net::TcpStream::connect(addr).is_ok());
-    sys.stop();
-    let _ = h.join();
+    srv.stop(true).await;
 }
 
-#[test]
-fn test_listen() {
+#[actix_rt::test]
+async fn test_listen() {
     let addr = unused_addr();
-    let (tx, rx) = mpsc::channel();
-
-    let h = thread::spawn(move || {
-        let sys = actix_rt::System::new();
     let lst = net::TcpListener::bind(addr).unwrap();
-        sys.block_on(async {
-            Server::build()
+
+    let server = Server::build()
         .disable_signals()
         .workers(1)
         .listen("test", lst, move || fn_service(|_| ok::<_, ()>(())))
         .unwrap()
         .run();
-            let _ = tx.send(actix_rt::System::current());
-        });
-        let _ = sys.run();
-    });
-    let sys = rx.recv().unwrap();
 
-    thread::sleep(Duration::from_millis(500));
+    actix_rt::time::sleep(Duration::from_millis(500)).await;
     assert!(net::TcpStream::connect(addr).is_ok());
-    sys.stop();
-    let _ = h.join();
+
+    server.stop(true).await;
 }
 
 #[test]
@@ -80,13 +59,13 @@ fn test_start() {
     use bytes::Bytes;
     use futures_util::sink::SinkExt;
 
-    let addr = unused_addr();
     let (tx, rx) = mpsc::channel();
 
     let h = thread::spawn(move || {
-        let sys = actix_rt::System::new();
-        let srv = sys.block_on(lazy(|_| {
-            Server::build()
+        actix_rt::System::new().block_on(async {
+            let addr = unused_addr();
+
+            let srv = Server::build()
                 .backlog(100)
                 .disable_signals()
                 .bind("test", addr, move || {
@@ -97,14 +76,15 @@ fn test_start() {
                 })
             })
             .unwrap()
-            .run()
-        }));
-
-        let _ = tx.send((srv, actix_rt::System::current()));
-        let _ = sys.run();
+            .run();
+
+            let _ = tx.send((srv.clone(), addr));
+
+            let _ = srv.await;
+        });
     });
 
-    let (srv, sys) = rx.recv().unwrap();
+    let (srv, addr) = rx.recv().unwrap();
 
     let mut buf = [1u8; 4];
     let mut conn = net::TcpStream::connect(addr).unwrap();
@@ -137,25 +117,22 @@ fn test_start() {
     thread::sleep(Duration::from_millis(100));
     assert!(net::TcpStream::connect(addr).is_err());
 
-    thread::sleep(Duration::from_millis(100));
-    sys.stop();
     let _ = h.join();
 }
 
 #[test]
 fn test_configure() {
-    let addr1 = unused_addr();
-    let addr2 = unused_addr();
-    let addr3 = unused_addr();
     let (tx, rx) = mpsc::channel();
 
     let num = Arc::new(AtomicUsize::new(0));
     let num2 = num.clone();
 
     let h = thread::spawn(move || {
         let num = num2.clone();
-        let sys = actix_rt::System::new();
-        let srv = sys.block_on(lazy(|_| {
-            Server::build()
+        actix_rt::System::new().block_on(async {
+            let addr1 = unused_addr();
+            let addr2 = unused_addr();
+            let addr3 = unused_addr();
+
+            let srv = Server::build()
                 .disable_signals()
                 .configure(move |cfg| {
                     let num = num.clone();
@@ -176,20 +153,20 @@ fn test_configure() {
                 })
                 .unwrap()
                 .workers(1)
-                .run()
-        }));
-
-        let _ = tx.send((srv, actix_rt::System::current()));
-        let _ = sys.run();
+                .run();
+
+            let _ = tx.send((srv.clone(), addr1, addr2, addr3));
+
+            let _ = srv.await;
+        });
     });
-    let (_, sys) = rx.recv().unwrap();
+
+    let (srv, addr1, addr2, addr3) = rx.recv().unwrap();
 
     thread::sleep(Duration::from_millis(500));
 
     assert!(net::TcpStream::connect(addr1).is_ok());
     assert!(net::TcpStream::connect(addr2).is_ok());
     assert!(net::TcpStream::connect(addr3).is_ok());
     assert_eq!(num.load(Ordering::Relaxed), 1);
-    sys.stop();
+    let _ = srv.stop(false);
     let _ = h.join();
 }
 
@@ -234,13 +211,13 @@ async fn test_max_concurrent_connections() {
             })?
             .run();
 
-        let _ = tx.send((server.clone(), actix_rt::System::current()));
+        let _ = tx.send(server.clone());
 
         server.await
         })
     });
 
-    let (srv, sys) = rx.recv().unwrap();
+    let srv = rx.recv().unwrap();
 
     let mut conns = vec![];
 
@@ -261,7 +238,6 @@ async fn test_max_concurrent_connections() {
 
     srv.stop(false).await;
 
-    sys.stop();
     let _ = h.join().unwrap();
 }
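The test rewrites above lean on `#[actix_rt::test]`, which builds a fresh single-threaded `System` around the async test body and removes the manual `thread::spawn` + `System::new` + `block_on` scaffolding. Its minimal form, as a sketch:

// Sketch: #[actix_rt::test] gives each test its own System and runtime.
#[cfg(test)]
mod example {
    #[actix_rt::test]
    async fn sleeps_on_the_test_runtime() {
        actix_rt::time::sleep(std::time::Duration::from_millis(1)).await;
    }
}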