add more comments

fakeshadow 2020-10-21 12:07:35 +08:00
parent 0df09dc81d
commit 8582aaaeb6
11 changed files with 150 additions and 125 deletions

View File

@@ -1,6 +1,7 @@
 use std::any::{Any, TypeId};
 use std::cell::{Cell, RefCell};
 use std::collections::HashMap;
+use std::future::Future;
 use std::pin::Pin;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::task::{Context, Poll};
@@ -9,7 +10,7 @@ use std::{fmt, thread};
 use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
 use futures_channel::oneshot::{channel, Canceled, Sender};
 use futures_util::{
-    future::{self, Future, FutureExt},
+    future::{self, FutureExt},
     stream::Stream,
 };

View File

@@ -1,9 +1,10 @@
 use std::borrow::Cow;
+use std::future::Future;
 use std::io;

 use futures_channel::mpsc::unbounded;
 use futures_channel::oneshot::{channel, Receiver};
-use futures_util::future::{lazy, Future, FutureExt};
+use futures_util::future::{lazy, FutureExt};
 use tokio::task::LocalSet;

 use crate::arbiter::{Arbiter, SystemArbiter};

View File

@@ -1,6 +1,5 @@
 //! A runtime implementation that runs everything on the current thread.
-#![deny(rust_2018_idioms, warnings)]
-#![allow(clippy::type_complexity)]
+#![forbid(unsafe_code)]

 #[cfg(not(test))] // Work around for rust-lang/rust#62127
 pub use actix_macros::{main, test};
@@ -25,7 +24,7 @@ pub use actix_threadpool as blocking;
 /// This function panics if actix system is not running.
 pub fn spawn<F>(f: F)
 where
-    F: futures_util::future::Future<Output = ()> + 'static,
+    F: std::future::Future<Output = ()> + 'static,
 {
     if !System::is_set() {
         panic!("System is not running");

View File

@@ -5,6 +5,7 @@
 * Use `concurrent-queue` to manage poll wakes instead of `futures::channel::mpsc`.
 * Remove `AcceptNotify` type and pass `WakerQueue` to `WorkerClient` for notify the `Accept` more directly.
 * Convert `mio::Stream` to `actix_rt::net::TcpStream`(`UnixStream` for uds) using `FromRawFd` and `IntoRawFd`(`IntoRawSocket` and `FromRawSocket` on windows).
+* Remove `AsyncRead` and `AsyncWrite` trait bound for `socket::FromStream` trait.

 ## 1.0.4 - 2020-09-12
 * Update actix-codec to 0.3.0.
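The third bullet in the unreleased section describes the raw-fd round trip used to turn an accepted mio stream into a tokio-backed `actix_rt::net::TcpStream`. A minimal Unix-only sketch of that conversion, mirroring the `FromStream` impls further down in this commit (the free function name `mio_to_tokio_tcp` is ours; on Windows `IntoRawSocket`/`FromRawSocket` would be used instead):

    use std::io;
    use std::os::unix::io::{FromRawFd, IntoRawFd};

    fn mio_to_tokio_tcp(stream: mio::net::TcpStream) -> io::Result<actix_rt::net::TcpStream> {
        // take ownership of the file descriptor away from the mio stream
        let raw = IntoRawFd::into_raw_fd(stream);
        // rebuild a std TcpStream around the fd; sound because `into_raw_fd`
        // left us as the sole owner of `raw`, and the fd is already non-blocking
        let std_stream = unsafe { std::net::TcpStream::from_raw_fd(raw) };
        // hand the std stream to the tokio-based actix-rt type
        actix_rt::net::TcpStream::from_std(std_stream)
    }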

View File

@@ -17,6 +17,7 @@ struct ServerSocketInfo {
     addr: SocketAddr,
     token: Token,
     sock: MioSocketListener,
+    // timeout is used to mark the time this socket should be re-registered after an error.
    timeout: Option<Instant>,
 }
@@ -34,10 +35,7 @@ pub(crate) struct AcceptLoop {
 impl AcceptLoop {
     pub fn new(srv: Server) -> Self {
-        // Create a poll instance.
         let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create mio::Poll: {}", e));
-        // construct a waker queue which would wake up poll with associate extra interest types.
         let waker = WakerQueue::with_capacity(poll.registry(), 128).unwrap();

         Self {
@@ -51,11 +49,11 @@ impl AcceptLoop {
         self.waker.clone()
     }

-    pub fn wake_accept(&self, i: WakerInterest) {
+    pub fn wake(&self, i: WakerInterest) {
         self.waker.wake(i);
     }

-    pub(crate) fn start_accept(
+    pub(crate) fn start(
         &mut self,
         socks: Vec<(Token, StdListener)>,
         workers: Vec<WorkerClient>,
@@ -101,9 +99,9 @@ impl Accept {
         srv: Server,
         workers: Vec<WorkerClient>,
     ) {
+        // Accept runs in its own thread and would want to spawn additional futures to the
+        // current actix system.
         let sys = System::current();

-        // start accept thread
         thread::Builder::new()
             .name("actix-server accept loop".to_owned())
             .spawn(move || {
@@ -121,14 +119,13 @@ impl Accept {
         socks: Vec<(Token, StdListener)>,
         workers: Vec<WorkerClient>,
         srv: Server,
+        // Accept and sockets info are separated so that we can borrow mut on both at the same time.
     ) -> (Accept, Slab<ServerSocketInfo>) {
-        // Start accept
         let mut sockets = Slab::new();
         for (hnd_token, lst) in socks.into_iter() {
             let addr = lst.local_addr();
-            let mut server = lst
+            let mut sock = lst
                 .into_mio_listener()
                 .unwrap_or_else(|e| panic!("Can not set non_block on listener: {}", e));
             let entry = sockets.vacant_entry();
@@ -136,13 +133,13 @@ impl Accept {
             // Start listening for incoming connections
             poll.registry()
-                .register(&mut server, MioToken(token + DELTA), Interest::READABLE)
+                .register(&mut sock, MioToken(token + DELTA), Interest::READABLE)
                 .unwrap_or_else(|e| panic!("Can not register io: {}", e));

             entry.insert(ServerSocketInfo {
                 addr,
                 token: hnd_token,
-                sock: server,
+                sock,
                 timeout: None,
             });
         }
@@ -160,7 +157,6 @@ impl Accept {
     }

     fn poll_with(&mut self, mut sockets: Slab<ServerSocketInfo>) {
-        // Create storage for events
         let mut events = mio::Events::with_capacity(128);

         loop {
@@ -171,36 +167,40 @@ impl Accept {
             for event in events.iter() {
                 let token = event.token();
                 match token {
-                    // This is a loop because interests for command were a loop that would try to
-                    // drain the command channel.
+                    // This is a loop because in a previous version the command interests were
+                    // drained from a command channel in a loop. It's not yet clear whether
+                    // actively draining the waker queue is necessary or good practice.
                    WAKER_TOKEN => 'waker: loop {
                         match self.waker.pop() {
+                            // a worker notified that its availability has changed. we may want
+                            // to enter backpressure or recover from it.
                             Ok(WakerInterest::Notify) => {
-                                self.maybe_backpressure(&mut sockets, false)
+                                self.maybe_backpressure(&mut sockets, false);
                             }
                             Ok(WakerInterest::Pause) => {
-                                for (_, info) in sockets.iter_mut() {
-                                    if let Err(err) =
-                                        self.poll.registry().deregister(&mut info.sock)
-                                    {
+                                sockets.iter_mut().for_each(|(_, info)| {
+                                    if let Err(err) = self.deregister(info) {
                                         error!("Can not deregister server socket {}", err);
                                     } else {
                                         info!("Paused accepting connections on {}", info.addr);
                                     }
-                                }
+                                });
                             }
                             Ok(WakerInterest::Resume) => {
-                                for (token, info) in sockets.iter_mut() {
+                                sockets.iter_mut().for_each(|(token, info)| {
                                     self.register_logged(token, info);
-                                }
+                                });
                             }
                             Ok(WakerInterest::Stop) => {
-                                return self.deregister_all(&mut sockets)
+                                return self.deregister_all(&mut sockets);
                             }
+                            // a new worker thread has been made and its client will be added to Accept.
                             Ok(WakerInterest::Worker(worker)) => {
+                                // we may want to recover from backpressure.
                                 self.maybe_backpressure(&mut sockets, false);
                                 self.workers.push(worker);
                             }
+                            // got a timer interest; time to try registering the socket(s) again.
                             Ok(WakerInterest::Timer) => self.process_timer(&mut sockets),
                             Err(WakerQueueError::Empty) => break 'waker,
                             Err(WakerQueueError::Closed) => {
@@ -220,9 +220,10 @@ impl Accept {
             }
         }
     }

-    fn process_timer(&mut self, sockets: &mut Slab<ServerSocketInfo>) {
+    fn process_timer(&self, sockets: &mut Slab<ServerSocketInfo>) {
         let now = Instant::now();
         for (token, info) in sockets.iter_mut() {
+            // only sockets that have an associated timeout value were deregistered.
             if let Some(inst) = info.timeout.take() {
                 if now > inst {
                     self.register_logged(token, info);
@@ -270,9 +271,13 @@ impl Accept {
         }
     }

-    fn deregister_all(&mut self, sockets: &mut Slab<ServerSocketInfo>) {
+    fn deregister(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
+        self.poll.registry().deregister(&mut info.sock)
+    }
+
+    fn deregister_all(&self, sockets: &mut Slab<ServerSocketInfo>) {
         sockets.iter_mut().for_each(|(_, info)| {
-            let _ = self.poll.registry().deregister(&mut info.sock);
+            let _ = self.deregister(info);
         });
     }
@@ -299,6 +304,9 @@ impl Accept {
                     break;
                 }
                 Err(tmp) => {
+                    // the worker lost contact and could be gone. a message is sent to the
+                    // `ServerBuilder` future to notify it that a new worker should be made.
+                    // after that, remove the faulted worker and enter backpressure if necessary.
                     self.srv.worker_faulted(self.workers[self.next].idx);
                     msg = tmp;
                     self.workers.swap_remove(self.next);
@@ -322,6 +330,9 @@ impl Accept {
                         self.set_next();
                         return;
                     }
+                    // the worker lost contact and could be gone. a message is sent to the
+                    // `ServerBuilder` future to notify it that a new worker should be made.
+                    // after that, remove the faulted worker and enter backpressure if necessary.
                     Err(tmp) => {
                         self.srv.worker_faulted(self.workers[self.next].idx);
                         msg = tmp;
@@ -363,14 +374,17 @@ impl Accept {
                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
                 Err(ref e) if connection_error(e) => continue,
                 Err(e) => {
+                    // deregister the socket temporarily.
                     error!("Error accepting connection: {}", e);
                     if let Err(err) = self.poll.registry().deregister(&mut info.sock) {
                         error!("Can not deregister server socket {}", err);
                     }

-                    // sleep after error
+                    // sleep after an error. write the timeout to the socket info, since the poll
+                    // will later need it to mark which socket should be registered again and when.
                     info.timeout = Some(Instant::now() + Duration::from_millis(500));

+                    // after the sleep a Timer interest is sent to the Accept poll.
                     let waker = self.waker.clone();
                     System::current().arbiter().send(Box::pin(async move {
                         sleep_until(Instant::now() + Duration::from_millis(510)).await;
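Taken together, the comments added in this file describe a small backoff protocol for accept errors: deregister the socket, stamp a timeout on its `ServerSocketInfo`, and re-register once a delayed `Timer` interest wakes the poll. A condensed, self-contained sketch of that flow (ours, not the crate's code), with `Waker`, `Interest` and `Socket` standing in for `WakerQueue`, `WakerInterest` and `ServerSocketInfo`:

    use std::time::{Duration, Instant};

    enum Interest {
        Timer,
    }

    struct Socket {
        timeout: Option<Instant>,
    }

    struct Waker;

    impl Waker {
        fn wake(&self, _interest: Interest) {
            // in the crate this pushes onto the WakerQueue and wakes the mio poll,
            // so the accept thread sees WAKER_TOKEN on its next poll
        }
    }

    // on an accept error: deregister the socket (omitted here), remember when to
    // come back, and arrange for a Timer interest after a slightly longer delay
    fn on_accept_error(info: &mut Socket, waker: &Waker) {
        info.timeout = Some(Instant::now() + Duration::from_millis(500));
        // the crate sends a delayed future to the System arbiter that sleeps
        // ~510ms and then wakes the accept poll:
        waker.wake(Interest::Timer);
    }

    // on Timer: re-register every socket whose timeout has elapsed
    fn process_timer(info: &mut Socket) {
        if let Some(deadline) = info.timeout.take() {
            if Instant::now() > deadline {
                // register_logged(token, info) in the crate
            } else {
                info.timeout = Some(deadline); // not due yet, keep waiting
            }
        }
    }

    fn main() {
        let waker = Waker;
        let mut info = Socket { timeout: None };
        on_accept_error(&mut info, &waker);
        process_timer(&mut info);
    }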

View File

@@ -203,7 +203,7 @@ impl ServerBuilder {
         self.listen_uds(name, lst, factory)
     }

-    #[cfg(all(unix))]
+    #[cfg(unix)]
     /// Add new unix domain service to the server.
     /// Useful when running as a systemd service and
     /// a socket FD can be acquired using the systemd crate.
@@ -278,7 +278,7 @@ impl ServerBuilder {
             for sock in &self.sockets {
                 info!("Starting \"{}\" service on {}", sock.1, sock.2);
             }
-            self.accept.start_accept(
+            self.accept.start(
                 mem::take(&mut self.sockets)
                     .into_iter()
                     .map(|t| (t.0, t.2))
@@ -309,11 +309,11 @@ impl ServerBuilder {
     fn handle_cmd(&mut self, item: ServerCommand) {
         match item {
             ServerCommand::Pause(tx) => {
-                self.accept.wake_accept(WakerInterest::Pause);
+                self.accept.wake(WakerInterest::Pause);
                 let _ = tx.send(());
             }
             ServerCommand::Resume(tx) => {
-                self.accept.wake_accept(WakerInterest::Resume);
+                self.accept.wake(WakerInterest::Resume);
                 let _ = tx.send(());
             }
             ServerCommand::Signal(sig) => {
@@ -357,7 +357,7 @@ impl ServerBuilder {
                 let exit = self.exit;

                 // stop accept thread
-                self.accept.wake_accept(WakerInterest::Stop);
+                self.accept.wake(WakerInterest::Stop);
                 let notify = std::mem::take(&mut self.notify);

                 // stop workers
@@ -436,7 +436,7 @@ impl ServerBuilder {
             let worker = self.start_worker(new_idx, self.accept.waker_owned());
             self.workers.push((new_idx, worker.clone()));
-            self.accept.wake_accept(WakerInterest::Worker(worker));
+            self.accept.wake(WakerInterest::Worker(worker));
         }
     }
 }

View File

@@ -5,7 +5,6 @@ use std::task::{Context, Poll};
 use futures_channel::mpsc::UnboundedSender;
 use futures_channel::oneshot;
-use futures_util::FutureExt;

 use crate::builder::ServerBuilder;
 use crate::signals::Signal;
@@ -56,14 +55,18 @@ impl Server {
     pub fn pause(&self) -> impl Future<Output = ()> {
         let (tx, rx) = oneshot::channel();
         let _ = self.0.unbounded_send(ServerCommand::Pause(tx));
-        rx.map(|_| ())
+        async {
+            let _ = rx.await;
+        }
     }

     /// Resume accepting incoming connections
     pub fn resume(&self) -> impl Future<Output = ()> {
         let (tx, rx) = oneshot::channel();
         let _ = self.0.unbounded_send(ServerCommand::Resume(tx));
-        rx.map(|_| ())
+        async {
+            let _ = rx.await;
+        }
     }

     /// Stop incoming connection processing, stop all workers and exit.
@@ -75,7 +78,9 @@ impl Server {
             graceful,
             completion: Some(tx),
         });
-        rx.map(|_| ())
+        async {
+            let _ = rx.await;
+        }
     }
 }
@@ -101,8 +106,7 @@ impl Future for Server {
         match Pin::new(this.1.as_mut().unwrap()).poll(cx) {
             Poll::Pending => Poll::Pending,
-            Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
-            Poll::Ready(Err(_)) => Poll::Ready(Ok(())),
+            Poll::Ready(_) => Poll::Ready(Ok(())),
         }
     }
 }
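The `rx.map(|_| ())` to `async { let _ = rx.await; }` rewrite above is behavior-preserving: both resolve to `()` whether the oneshot sender fires or is dropped (`Canceled`), and the async-block form lets this module drop its `futures_util::FutureExt` import. A small self-contained sketch of the pattern, using only crates already present in this diff (the function name `wait_for_ack` is ours):

    use std::future::Future;

    use futures_channel::oneshot;

    fn wait_for_ack(rx: oneshot::Receiver<()>) -> impl Future<Output = ()> {
        async {
            // `rx.await` yields `Result<(), Canceled>`; either way the caller only
            // cares that the command was handled (or that the server is gone)
            let _ = rx.await;
        }
    }

    #[actix_rt::main]
    async fn main() {
        let (tx, rx) = oneshot::channel::<()>();
        let _ = tx.send(());
        wait_for_ack(rx).await;
    }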

View File

@@ -77,7 +77,7 @@ where
     fn call(&mut self, (guard, req): (Option<CounterGuard>, ServerMessage)) -> Self::Future {
         match req {
             ServerMessage::Connect(stream) => {
-                let stream = FromStream::from_mio_stream(stream).map_err(|e| {
+                let stream = FromStream::from_mio(stream).map_err(|e| {
                     error!("Can not convert to an async tcp stream: {}", e);
                 });

View File

@@ -9,7 +9,6 @@ use std::os::unix::{
 #[cfg(windows)]
 use std::os::windows::io::{FromRawSocket, IntoRawSocket};

-use actix_codec::{AsyncRead, AsyncWrite};
 use actix_rt::net::TcpStream;
 #[cfg(unix)]
 use actix_rt::net::UnixStream;
@@ -27,15 +26,17 @@ use mio::{Interest, Registry, Token};
 pub(crate) enum StdListener {
     Tcp(StdTcpListener),
-    #[cfg(all(unix))]
+    #[cfg(unix)]
     Uds(StdUnixListener),
 }

 pub(crate) enum SocketAddr {
     Tcp(StdTcpSocketAddr),
-    #[cfg(all(unix))]
+    #[cfg(unix)]
     Uds(StdUdsSocketAddr),
-    #[cfg(all(unix))]
+    // this is a workaround: mio returns different types of SocketAddr from the accept and
+    // local_addr methods.
+    #[cfg(unix)]
     UdsMio(MioSocketAddr),
 }
@@ -43,9 +44,9 @@ impl fmt::Display for SocketAddr {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match *self {
             SocketAddr::Tcp(ref addr) => write!(f, "{}", addr),
-            #[cfg(all(unix))]
+            #[cfg(unix)]
             SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
-            #[cfg(all(unix))]
+            #[cfg(unix)]
             SocketAddr::UdsMio(ref addr) => write!(f, "{:?}", addr),
         }
     }
@@ -55,9 +56,9 @@ impl fmt::Debug for SocketAddr {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match *self {
             SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr),
-            #[cfg(all(unix))]
+            #[cfg(unix)]
             SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
-            #[cfg(all(unix))]
+            #[cfg(unix)]
             SocketAddr::UdsMio(ref addr) => write!(f, "{:?}", addr),
         }
     }
@@ -67,7 +68,7 @@ impl fmt::Display for StdListener {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match *self {
             StdListener::Tcp(ref lst) => write!(f, "{}", lst.local_addr().ok().unwrap()),
-            #[cfg(all(unix))]
+            #[cfg(unix)]
             StdListener::Uds(ref lst) => write!(f, "{:?}", lst.local_addr().ok().unwrap()),
         }
     }
@@ -77,7 +78,7 @@ impl StdListener {
     pub(crate) fn local_addr(&self) -> SocketAddr {
         match self {
             StdListener::Tcp(lst) => SocketAddr::Tcp(lst.local_addr().unwrap()),
-            #[cfg(all(unix))]
+            #[cfg(unix)]
             StdListener::Uds(lst) => SocketAddr::Uds(lst.local_addr().unwrap()),
         }
     }
@@ -87,13 +88,13 @@ impl StdListener {
             StdListener::Tcp(lst) => {
                 // ToDo: is this non_blocking a good practice?
                 lst.set_nonblocking(true)?;
-                Ok(MioSocketListener::Tcp(mio::net::TcpListener::from_std(lst)))
+                Ok(MioSocketListener::Tcp(MioTcpListener::from_std(lst)))
             }
-            #[cfg(all(unix))]
+            #[cfg(unix)]
             StdListener::Uds(lst) => {
                 // ToDo: the same as above
                 lst.set_nonblocking(true)?;
-                Ok(MioSocketListener::Uds(mio::net::UnixListener::from_std(
+                Ok(MioSocketListener::Uds(MioUnixListener::from_std(
                     lst,
                 )))
             }
@@ -104,13 +105,13 @@ impl StdListener {
 #[derive(Debug)]
 pub enum MioStream {
     Tcp(MioTcpStream),
-    #[cfg(all(unix))]
+    #[cfg(unix)]
     Uds(MioUnixStream),
 }

 pub(crate) enum MioSocketListener {
     Tcp(MioTcpListener),
-    #[cfg(all(unix))]
+    #[cfg(unix)]
     Uds(MioUnixListener),
 }
@@ -120,7 +121,7 @@ impl MioSocketListener {
             MioSocketListener::Tcp(ref lst) => lst
                 .accept()
                 .map(|(stream, addr)| Some((MioStream::Tcp(stream), SocketAddr::Tcp(addr)))),
-            #[cfg(all(unix))]
+            #[cfg(unix)]
             MioSocketListener::Uds(ref lst) => lst
                 .accept()
                 .map(|(stream, addr)| Some((MioStream::Uds(stream), SocketAddr::UdsMio(addr)))),
@@ -137,7 +138,7 @@ impl Source for MioSocketListener {
     ) -> io::Result<()> {
         match *self {
             MioSocketListener::Tcp(ref mut lst) => lst.register(registry, token, interests),
-            #[cfg(all(unix))]
+            #[cfg(unix)]
             MioSocketListener::Uds(ref mut lst) => lst.register(registry, token, interests),
         }
     }
@@ -150,7 +151,7 @@ impl Source for MioSocketListener {
     ) -> io::Result<()> {
         match *self {
             MioSocketListener::Tcp(ref mut lst) => lst.reregister(registry, token, interests),
-            #[cfg(all(unix))]
+            #[cfg(unix)]
             MioSocketListener::Uds(ref mut lst) => lst.reregister(registry, token, interests),
         }
     }
@@ -158,7 +159,7 @@ impl Source for MioSocketListener {
     fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
         match *self {
             MioSocketListener::Tcp(ref mut lst) => lst.deregister(registry),
-            #[cfg(all(unix))]
+            #[cfg(unix)]
             MioSocketListener::Uds(ref mut lst) => {
                 let res = lst.deregister(registry);
@@ -175,17 +176,19 @@ impl Source for MioSocketListener {
 }

 /// helper trait for converting mio stream to tokio stream.
-pub trait FromStream: AsyncRead + AsyncWrite + Sized {
-    fn from_mio_stream(sock: MioStream) -> io::Result<Self>;
+pub trait FromStream: Sized {
+    fn from_mio(sock: MioStream) -> io::Result<Self>;
 }

 // ToDo: This is a workaround and we need an efficient way to convert between mio and tokio stream
 #[cfg(unix)]
 impl FromStream for TcpStream {
-    fn from_mio_stream(sock: MioStream) -> io::Result<Self> {
+    fn from_mio(sock: MioStream) -> io::Result<Self> {
         match sock {
             MioStream::Tcp(mio) => {
                 let raw = IntoRawFd::into_raw_fd(mio);
+                // # Safety:
+                // This is an in-place conversion from a mio stream to a tokio stream.
                 TcpStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) })
             }
             MioStream::Uds(_) => {
@@ -198,23 +201,28 @@ impl FromStream for TcpStream {
 // ToDo: This is a workaround and we need an efficient way to convert between mio and tokio stream
 #[cfg(windows)]
 impl FromStream for TcpStream {
-    fn from_mio_stream(sock: MioStream) -> io::Result<Self> {
+    fn from_mio(sock: MioStream) -> io::Result<Self> {
         match sock {
             MioStream::Tcp(mio) => {
                 let raw = IntoRawSocket::into_raw_socket(mio);
+                // # Safety:
+                // This is an in-place conversion from a mio stream to a tokio stream.
                 TcpStream::from_std(unsafe { FromRawSocket::from_raw_socket(raw) })
             }
         }
     }
 }

+// ToDo: This is a workaround and we need an efficient way to convert between mio and tokio stream
 #[cfg(unix)]
 impl FromStream for UnixStream {
-    fn from_mio_stream(sock: MioStream) -> io::Result<Self> {
+    fn from_mio(sock: MioStream) -> io::Result<Self> {
         match sock {
             MioStream::Tcp(_) => panic!("Should not happen, bug in server impl"),
             MioStream::Uds(mio) => {
                 let raw = IntoRawFd::into_raw_fd(mio);
+                // # Safety:
+                // This is an in-place conversion from a mio stream to a tokio stream.
                 UnixStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) })
             }
         }

View File

@@ -49,7 +49,7 @@ impl WakerQueue {
 ///
 /// *. These interests should not be confused with `mio::Interest` and mostly not I/O related
 pub(crate) enum WakerInterest {
-    /// Interest from `Worker` notifying `Accept` to run `backpressure` method
+    /// Interest from `Worker` notifying `Accept` to run `maybe_backpressure` method
     Notify,
     /// `Pause`, `Resume`, `Stop` Interest are from `ServerBuilder` future. It listens to
     /// `ServerCommand` and notify `Accept` to do exactly these tasks.
@@ -57,7 +57,7 @@ pub(crate) enum WakerInterest {
     Resume,
     Stop,
     /// `Timer` is an interest sent as a delayed future. When an error happens on accepting
-    /// connection the poll would deregister sockets temporary and wake up the poll and register
+    /// connections `Accept` would deregister sockets temporarily, wake up the poll and register
     /// them again after the delayed future resolve.
     Timer,
     /// `Worker` ins an interest happen after a worker runs into faulted state(This is determined by
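For readers new to this module, the doc comments above describe a simple producer/consumer protocol: any thread can push a `WakerInterest` and wake the accept poll, and the accept loop drains the queue when its poll returns with the waker token. A simplified model of that protocol (ours, using a mutex-guarded `VecDeque`; the crate builds on `concurrent-queue` plus a registered poll waker, per the changelog above):

    use std::collections::VecDeque;
    use std::sync::{Arc, Mutex};

    #[derive(Debug)]
    enum Interest {
        Pause,
        Resume,
    }

    #[derive(Clone)]
    struct WakerQueue(Arc<Mutex<VecDeque<Interest>>>);

    impl WakerQueue {
        fn wake(&self, interest: Interest) {
            self.0.lock().unwrap().push_back(interest);
            // the crate additionally triggers a poll waker here so the accept
            // thread's blocking poll call returns with the waker token
        }

        fn pop(&self) -> Option<Interest> {
            self.0.lock().unwrap().pop_front()
        }
    }

    fn main() {
        let queue = WakerQueue(Arc::new(Mutex::new(VecDeque::new())));

        // producer side: ServerBuilder, workers, delayed timers, ...
        queue.wake(Interest::Pause);
        queue.wake(Interest::Resume);

        // consumer side: the accept loop drains whatever accumulated, then
        // goes back to polling for socket events
        while let Some(interest) = queue.pop() {
            println!("handling {:?}", interest);
        }
    }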

View File

@@ -3,7 +3,7 @@ use std::pin::Pin;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::task::{Context, Poll};
-use std::time;
+use std::time::Duration;

 use actix_rt::time::{sleep_until, Instant, Sleep};
 use actix_rt::{spawn, Arbiter};
@@ -134,7 +134,7 @@ pub(crate) struct Worker {
     conns: Counter,
     factories: Vec<Box<dyn InternalServiceFactory>>,
     state: WorkerState,
-    shutdown_timeout: time::Duration,
+    shutdown_timeout: Duration,
 }

 struct WorkerService {
@@ -165,61 +165,58 @@ impl Worker {
         idx: usize,
         factories: Vec<Box<dyn InternalServiceFactory>>,
         availability: WorkerAvailability,
-        shutdown_timeout: time::Duration,
+        shutdown_timeout: Duration,
     ) -> WorkerClient {
         let (tx1, rx) = unbounded();
         let (tx2, rx2) = unbounded();
         let avail = availability.clone();

-        Arbiter::new().send(
-            async move {
+        Arbiter::new().send(Box::pin(async move {
             availability.set(false);
             let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker {
                 rx,
                 rx2,
                 availability,
                 factories,
                 shutdown_timeout,
                 services: Vec::new(),
                 conns: conns.clone(),
                 state: WorkerState::Unavailable(Vec::new()),
             });

             let mut fut: Vec<MapOk<LocalBoxFuture<'static, _>, _>> = Vec::new();
             for (idx, factory) in wrk.factories.iter().enumerate() {
                 fut.push(factory.create().map_ok(move |r| {
                     r.into_iter()
                         .map(|(t, s): (Token, _)| (idx, t, s))
                         .collect::<Vec<_>>()
                 }));
             }

             spawn(async move {
                 let res = join_all(fut).await;
                 let res: Result<Vec<_>, _> = res.into_iter().collect();
                 match res {
                     Ok(services) => {
                         for item in services {
                             for (factory, token, service) in item {
                                 assert_eq!(token.0, wrk.services.len());
                                 wrk.services.push(WorkerService {
                                     factory,
                                     service,
                                     status: WorkerServiceStatus::Unavailable,
                                 });
                             }
                         }
                     }
                     Err(e) => {
                         error!("Can not start worker: {:?}", e);
                         Arbiter::current().stop();
                     }
                 }
                 wrk.await
             });
-            }
-            .boxed(),
-        );
+        }));

         WorkerClient::new(idx, tx1, tx2, avail)
     }
@@ -330,7 +327,7 @@ impl Future for Worker {
                     if num != 0 {
                         info!("Graceful worker shutdown, {} connections", num);
                         self.state = WorkerState::Shutdown(
-                            sleep_until(Instant::now() + time::Duration::from_secs(1)),
+                            sleep_until(Instant::now() + Duration::from_secs(1)),
                             sleep_until(Instant::now() + self.shutdown_timeout),
                             Some(result),
                         );
@@ -437,7 +434,7 @@ impl Future for Worker {
                 // sleep for 1 second and then check again
                 if Pin::new(&mut *t1).poll(cx).is_ready() {
-                    *t1 = sleep_until(Instant::now() + time::Duration::from_secs(1));
+                    *t1 = sleep_until(Instant::now() + Duration::from_secs(1));
                     let _ = Pin::new(t1).poll(cx);
                 }
@@ -445,7 +442,7 @@ impl Future for Worker {
             }
             WorkerState::Available => {
                 loop {
-                    match Pin::new(&mut self.rx).poll_next(cx) {
+                    return match Pin::new(&mut self.rx).poll_next(cx) {
                         // handle incoming io stream
                         Poll::Ready(Some(WorkerCommand(msg))) => {
                             match self.check_readiness(cx) {
@@ -476,13 +473,13 @@ impl Future for Worker {
                                     );
                                 }
                             }
-                            return self.poll(cx);
+                            self.poll(cx)
                         }
                         Poll::Pending => {
                             self.state = WorkerState::Available;
-                            return Poll::Pending;
+                            Poll::Pending
                         }
-                        Poll::Ready(None) => return Poll::Ready(()),
+                        Poll::Ready(None) => Poll::Ready(()),
                     }
                 }
             }