Remove Slab

fakeshadow 2021-04-17 11:07:11 +08:00
parent ed2c975f92
commit b0f02ae8fc
16 changed files with 370 additions and 231 deletions


@@ -22,14 +22,13 @@ default = []
 [dependencies]
 actix-rt = { version = "2.0.0", default-features = false }
-actix-service = "2.0.0-beta.5"
-actix-utils = "3.0.0-beta.4"
+actix-service = "2.0.0"
+actix-utils = "3.0.0"
 futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
 log = "0.4"
 mio = { version = "0.7.6", features = ["os-poll", "net"] }
 num_cpus = "1.13"
-slab = "0.4"
 tokio = { version = "1.2", features = ["sync"] }

 [dev-dependencies]


@@ -7,21 +7,17 @@ use actix_rt::{
 };
 use log::{error, info};
 use mio::{Interest, Poll, Token as MioToken};
-use slab::Slab;

 use crate::server::Server;
 use crate::socket::{MioListener, SocketAddr};
 use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
 use crate::worker::{Conn, WorkerHandleAccept};
-use crate::Token;

 struct ServerSocketInfo {
     /// Address of socket. Mainly used for logging.
     addr: SocketAddr,
-    /// Beware this is the crate token for identify socket and should not be confused
-    /// with `mio::Token`.
-    token: Token,
+    token: usize,

     lst: MioListener,
@@ -65,7 +61,7 @@ impl AcceptLoop {
     pub(crate) fn start(
         &mut self,
-        socks: Vec<(Token, MioListener)>,
+        socks: Vec<(usize, MioListener)>,
         handles: Vec<WorkerHandleAccept>,
     ) {
         let srv = self.srv.take().expect("Can not re-use AcceptInfo");
@@ -98,6 +94,7 @@ impl Default for Availability {
 impl Availability {
     /// Check if any worker handle is available
+    #[inline(always)]
     fn available(&self) -> bool {
         self.0.iter().any(|a| *a != 0)
     }
@@ -150,7 +147,7 @@ impl Accept {
     pub(crate) fn start(
         poll: Poll,
         waker: WakerQueue,
-        socks: Vec<(Token, MioListener)>,
+        socks: Vec<(usize, MioListener)>,
         srv: Server,
         handles: Vec<WorkerHandleAccept>,
     ) {
@@ -161,10 +158,10 @@ impl Accept {
             .name("actix-server accept loop".to_owned())
             .spawn(move || {
                 System::set_current(sys);
-                let (mut accept, sockets) =
+                let (mut accept, mut sockets) =
                     Accept::new_with_sockets(poll, waker, socks, handles, srv);
-                accept.poll_with(sockets);
+                accept.poll_with(&mut sockets);
             })
             .unwrap();
     }
@@ -172,29 +169,28 @@ impl Accept {
     fn new_with_sockets(
         poll: Poll,
         waker: WakerQueue,
-        socks: Vec<(Token, MioListener)>,
+        socks: Vec<(usize, MioListener)>,
         handles: Vec<WorkerHandleAccept>,
         srv: Server,
-    ) -> (Accept, Slab<ServerSocketInfo>) {
-        let mut sockets = Slab::new();
-        for (hnd_token, mut lst) in socks.into_iter() {
-            let addr = lst.local_addr();
-
-            let entry = sockets.vacant_entry();
-            let token = entry.key();
-
-            // Start listening for incoming connections
-            poll.registry()
-                .register(&mut lst, MioToken(token), Interest::READABLE)
-                .unwrap_or_else(|e| panic!("Can not register io: {}", e));
-
-            entry.insert(ServerSocketInfo {
-                addr,
-                token: hnd_token,
-                lst,
-                timeout: None,
-            });
-        }
+    ) -> (Accept, Vec<ServerSocketInfo>) {
+        let sockets = socks
+            .into_iter()
+            .map(|(token, mut lst)| {
+                let addr = lst.local_addr();
+
+                // Start listening for incoming connections
+                poll.registry()
+                    .register(&mut lst, MioToken(token), Interest::READABLE)
+                    .unwrap_or_else(|e| panic!("Can not register io: {}", e));
+
+                ServerSocketInfo {
+                    addr,
+                    token,
+                    lst,
+                    timeout: None,
+                }
+            })
+            .collect();

         let mut avail = Availability::default();
@@ -214,7 +210,7 @@ impl Accept {
         (accept, sockets)
     }

-    fn poll_with(&mut self, mut sockets: Slab<ServerSocketInfo>) {
+    fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) {
         let mut events = mio::Events::with_capacity(128);

         loop {
@@ -228,77 +224,89 @@
             for event in events.iter() {
                 let token = event.token();
                 match token {
-                    // This is a loop because interests for command from previous version was
-                    // a loop that would try to drain the command channel. It's yet unknown
-                    // if it's necessary/good practice to actively drain the waker queue.
-                    WAKER_TOKEN => 'waker: loop {
-                        // take guard with every iteration so no new interest can be added
-                        // until the current task is done.
-                        let mut guard = self.waker.guard();
-                        match guard.pop_front() {
-                            // worker notify it becomes available. we may want to recover
-                            // from backpressure.
-                            Some(WakerInterest::WorkerAvailable(idx)) => {
-                                drop(guard);
-                                self.maybe_backpressure(&mut sockets, false);
-                                self.avail.set_available(idx, true);
-                            }
-                            // a new worker thread is made and it's handle would be added to Accept
-                            Some(WakerInterest::Worker(handle)) => {
-                                drop(guard);
-                                // maybe we want to recover from a backpressure.
-                                self.maybe_backpressure(&mut sockets, false);
-                                self.avail.set_available(handle.idx(), true);
-                                self.handles.push(handle);
-                            }
-                            // got timer interest and it's time to try register socket(s) again
-                            Some(WakerInterest::Timer) => {
-                                drop(guard);
-                                self.process_timer(&mut sockets)
-                            }
-                            Some(WakerInterest::Pause) => {
-                                drop(guard);
-                                self.deregister_all(&mut sockets);
-                            }
-                            Some(WakerInterest::Resume) => {
-                                drop(guard);
-                                sockets.iter_mut().for_each(|(token, info)| {
-                                    self.register_logged(token, info);
-                                });
-                            }
-                            Some(WakerInterest::Stop) => {
-                                return self.deregister_all(&mut sockets);
-                            }
-                            // waker queue is drained
-                            None => {
-                                // Reset the WakerQueue before break so it does not grow infinitely
-                                WakerQueue::reset(&mut guard);
-                                break 'waker;
-                            }
-                        }
-                    },
+                    WAKER_TOKEN => {
+                        let exit = self.handle_waker(sockets);
+                        if exit {
+                            return;
+                        }
+                    }
                     _ => {
                         let token = usize::from(token);
-                        self.accept(&mut sockets, token);
+                        self.accept(sockets, token);
                     }
                 }
             }
         }
     }

-    fn process_timer(&self, sockets: &mut Slab<ServerSocketInfo>) {
+    fn handle_waker(&mut self, sockets: &mut [ServerSocketInfo]) -> bool {
+        // This is a loop because interests for command from previous version was
+        // a loop that would try to drain the command channel. It's yet unknown
+        // if it's necessary/good practice to actively drain the waker queue.
+        loop {
+            // take guard with every iteration so no new interest can be added
+            // until the current task is done.
+            let mut guard = self.waker.guard();
+            match guard.pop_front() {
+                // worker notify it becomes available. we may want to recover
+                // from backpressure.
+                Some(WakerInterest::WorkerAvailable(idx)) => {
+                    drop(guard);
+                    self.maybe_backpressure(sockets, false);
+                    self.avail.set_available(idx, true);
+                    self.accept_all(sockets);
+                }
+                // a new worker thread is made and it's handle would be added to Accept
+                Some(WakerInterest::Worker(handle)) => {
+                    drop(guard);
+                    // maybe we want to recover from a backpressure.
+                    self.maybe_backpressure(sockets, false);
+                    self.avail.set_available(handle.idx(), true);
+                    self.handles.push(handle);
+                    self.accept_all(sockets);
+                }
+                // got timer interest and it's time to try register socket(s) again
+                Some(WakerInterest::Timer) => {
+                    drop(guard);
+                    self.process_timer(sockets)
+                }
+                Some(WakerInterest::Pause) => {
+                    drop(guard);
+                    self.deregister_all(sockets);
+                }
+                Some(WakerInterest::Resume) => {
+                    drop(guard);
+                    sockets.iter_mut().for_each(|info| {
+                        self.register_logged(info);
+                    });
+                }
+                Some(WakerInterest::Stop) => {
+                    self.deregister_all(sockets);
+                    return true;
+                }
+                // waker queue is drained
+                None => {
+                    // Reset the WakerQueue before break so it does not grow infinitely
+                    WakerQueue::reset(&mut guard);
+                    return false;
+                }
+            }
+        }
+    }
+
+    fn process_timer(&self, sockets: &mut [ServerSocketInfo]) {
         let now = Instant::now();
         sockets
             .iter_mut()
             // Only sockets that had an associated timeout were deregistered.
-            .filter(|(_, info)| info.timeout.is_some())
-            .for_each(|(token, info)| {
+            .filter(|info| info.timeout.is_some())
+            .for_each(|info| {
                 let inst = info.timeout.take().unwrap();

                 if now < inst {
                     info.timeout = Some(inst);
                 } else if !self.backpressure {
-                    self.register_logged(token, info);
+                    self.register_logged(info);
                 }

                 // Drop the timeout if server is in backpressure and socket timeout is expired.
@@ -308,42 +316,38 @@
     }

     #[cfg(not(target_os = "windows"))]
-    fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> {
+    fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
+        let token = MioToken(info.token);
         self.poll
             .registry()
-            .register(&mut info.lst, MioToken(token), Interest::READABLE)
+            .register(&mut info.lst, token, Interest::READABLE)
     }

     #[cfg(target_os = "windows")]
-    fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> {
+    fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
         // On windows, calling register without deregister cause an error.
         // See https://github.com/actix/actix-web/issues/905
         // Calling reregister seems to fix the issue.
+        let token = MioToken(info.token);
         self.poll
             .registry()
-            .register(&mut info.lst, mio::Token(token), Interest::READABLE)
+            .register(&mut info.lst, token, Interest::READABLE)
             .or_else(|_| {
-                self.poll.registry().reregister(
-                    &mut info.lst,
-                    mio::Token(token),
-                    Interest::READABLE,
-                )
+                self.poll
+                    .registry()
+                    .reregister(&mut info.lst, token, Interest::READABLE)
             })
     }

-    fn register_logged(&self, token: usize, info: &mut ServerSocketInfo) {
-        match self.register(token, info) {
+    fn register_logged(&self, info: &mut ServerSocketInfo) {
+        match self.register(info) {
             Ok(_) => info!("Resume accepting connections on {}", info.addr),
             Err(e) => error!("Can not register server socket {}", e),
         }
     }

-    fn deregister(&self, info: &mut ServerSocketInfo) -> io::Result<()> {
-        self.poll.registry().deregister(&mut info.lst)
-    }
-
     fn deregister_logged(&self, info: &mut ServerSocketInfo) {
-        match self.deregister(info) {
+        match self.poll.registry().deregister(&mut info.lst) {
             Ok(_) => info!("Paused accepting connections on {}", info.addr),
             Err(e) => {
                 error!("Can not deregister server socket {}", e)
@@ -351,7 +355,7 @@ impl Accept {
         }
     }

-    fn deregister_all(&self, sockets: &mut Slab<ServerSocketInfo>) {
+    fn deregister_all(&self, sockets: &mut [ServerSocketInfo]) {
         // This is a best effort implementation with following limitation:
         //
         // Every ServerSocketInfo with associate timeout will be skipped and it's timeout
@@ -364,13 +368,13 @@
             .iter_mut()
             // Take all timeout.
             // This is to prevent Accept::process_timer method re-register a socket afterwards.
-            .map(|(_, info)| (info.timeout.take(), info))
+            .map(|info| (info.timeout.take(), info))
             // Socket info with a timeout is already deregistered so skip them.
             .filter(|(timeout, _)| timeout.is_none())
             .for_each(|(_, info)| self.deregister_logged(info));
     }

-    fn maybe_backpressure(&mut self, sockets: &mut Slab<ServerSocketInfo>, on: bool) {
+    fn maybe_backpressure(&mut self, sockets: &mut [ServerSocketInfo], on: bool) {
         // Only operate when server is in a different backpressure than the given flag.
         if self.backpressure != on {
             self.backpressure = on;
@@ -379,51 +383,21 @@
                 // Only operate on sockets without associated timeout.
                 // Sockets with it should be handled by `accept` and `process_timer` methods.
                 // They are already deregistered or need to be reregister in the future.
-                .filter(|(_, info)| info.timeout.is_none())
-                .for_each(|(token, info)| {
+                .filter(|info| info.timeout.is_none())
+                .for_each(|info| {
                     if on {
                         self.deregister_logged(info);
                     } else {
-                        self.register_logged(token, info);
+                        self.register_logged(info);
                     }
                 });
         }
     }

-    fn accept_one(&mut self, sockets: &mut Slab<ServerSocketInfo>, mut conn: Conn) {
-        if self.backpressure {
-            // send_connection would remove fault worker from handles.
-            // worst case here is conn get dropped after all handles are gone.
-            while let Err(c) = self.send_connection(sockets, conn) {
-                conn = c
-            }
-        } else {
-            while self.avail.available() {
-                let next = self.next();
-                let idx = next.idx();
-                if next.available() {
-                    self.avail.set_available(idx, true);
-                    match self.send_connection(sockets, conn) {
-                        Ok(_) => return,
-                        Err(c) => conn = c,
-                    }
-                } else {
-                    self.avail.set_available(idx, false);
-                    self.set_next();
-                }
-            }
-            // Sending Conn failed due to either all workers are in error or not available.
-            // Enter backpressure state and try again.
-            self.maybe_backpressure(sockets, true);
-            self.accept_one(sockets, conn);
-        }
-    }
-
     // Send connection to worker and handle error.
     fn send_connection(
         &mut self,
-        sockets: &mut Slab<ServerSocketInfo>,
+        sockets: &mut [ServerSocketInfo],
         conn: Conn,
     ) -> Result<(), Conn> {
         match self.next().send(conn) {
@@ -451,19 +425,45 @@
         }
     }

-    fn accept(&mut self, sockets: &mut Slab<ServerSocketInfo>, token: usize) {
-        loop {
-            let info = sockets
-                .get_mut(token)
-                .expect("ServerSocketInfo is removed from Slab");
+    fn send_conn(&mut self, sockets: &mut [ServerSocketInfo], mut conn: Conn) {
+        loop {
+            let next = self.next();
+            let idx = next.idx();
+
+            if next.available() {
+                self.avail.set_available(idx, true);
+                match self.send_connection(sockets, conn) {
+                    Ok(_) => return,
+                    Err(c) => conn = c,
+                }
+            } else {
+                self.avail.set_available(idx, false);
+                self.set_next();
+
+                if !self.avail.available() {
+                    break;
+                }
+            }
+        }
+
+        // Sending connection failed due to either all workers are in error or not available.
+        // Enter backpressure state and try again.
+        self.maybe_backpressure(sockets, true);
+
+        // Force send connection to worker regardless it's avail state.
+        // Worst case here is conn get dropped after all handles are gone.
+        while let Err(c) = self.send_connection(sockets, conn) {
+            conn = c
+        }
+    }
+
+    fn accept(&mut self, sockets: &mut [ServerSocketInfo], token: usize) {
+        while self.avail.available() {
+            let info = &mut sockets[token];

             match info.lst.accept() {
                 Ok(io) => {
-                    let msg = Conn {
-                        io,
-                        token: info.token,
-                    };
-                    self.accept_one(sockets, msg);
+                    let conn = Conn { io, token };
+                    self.send_conn(sockets, conn);
                 }
                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return,
                 Err(ref e) if connection_error(e) => continue,
@@ -491,11 +491,22 @@
         }
     }

+    fn accept_all(&mut self, sockets: &mut [ServerSocketInfo]) {
+        sockets
+            .iter_mut()
+            .map(|info| info.token)
+            .collect::<Vec<_>>()
+            .into_iter()
+            .for_each(|idx| self.accept(sockets, idx));
+    }
+
+    #[inline(always)]
     fn next(&self) -> &WorkerHandleAccept {
         &self.handles[self.next]
     }

     /// Set next worker handle that would accept connection.
+    #[inline(always)]
     fn set_next(&mut self) {
         self.next = (self.next + 1) % self.handles.len();
     }
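The refactor leans on one invariant: `ServerBuilder` hands out socket tokens sequentially from zero, so the `Vec<ServerSocketInfo>` built in `new_with_sockets` keeps every socket at the index named by its token, and plain slice indexing (`sockets[token]`) replaces the old `Slab` lookup. A minimal standalone sketch of that invariant (the `SocketInfo` struct here is a simplified stand-in, not the crate's real type):

```rust
// Sketch only: `SocketInfo` stands in for the crate's `ServerSocketInfo`.
struct SocketInfo {
    token: usize,
}

fn main() {
    // Tokens are handed out sequentially starting at 0 (see `next_token` in the
    // builder change below), so collecting in that order places entry `i` at index `i`.
    let sockets: Vec<SocketInfo> = (0..3).map(|token| SocketInfo { token }).collect();

    // Plain indexing now does what `Slab::get_mut(token)` used to do.
    for token in 0..sockets.len() {
        assert_eq!(sockets[token].token, token);
    }
}
```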


@@ -8,11 +8,14 @@ use std::{
 use actix_rt::{self as rt, net::TcpStream, time::sleep, System};
 use log::{error, info};
-use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
-use tokio::sync::oneshot;
+use tokio::sync::{
+    mpsc::{unbounded_channel, UnboundedReceiver},
+    oneshot,
+};

 use crate::accept::AcceptLoop;
 use crate::config::{ConfiguredService, ServiceConfig};
+use crate::join_all;
 use crate::server::{Server, ServerCommand};
 use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
 use crate::signals::{Signal, Signals};
@@ -23,16 +26,15 @@ use crate::worker::{
     ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandleAccept,
     WorkerHandleServer,
 };
-use crate::{join_all, Token};

 /// Server builder
 pub struct ServerBuilder {
     threads: usize,
-    token: Token,
+    token: usize,
     backlog: u32,
     handles: Vec<(usize, WorkerHandleServer)>,
     services: Vec<Box<dyn InternalServiceFactory>>,
-    sockets: Vec<(Token, String, MioListener)>,
+    sockets: Vec<(usize, String, MioListener)>,
     accept: AcceptLoop,
     exit: bool,
     no_signals: bool,
@@ -56,7 +58,7 @@ impl ServerBuilder {
         ServerBuilder {
             threads: num_cpus::get(),
-            token: Token::default(),
+            token: 0,
             handles: Vec::new(),
             services: Vec::new(),
             sockets: Vec::new(),
@@ -164,7 +166,7 @@ impl ServerBuilder {
         if let Some(apply) = cfg.apply {
             let mut srv = ConfiguredService::new(apply);
             for (name, lst) in cfg.services {
-                let token = self.token.next();
+                let token = self.next_token();
                 srv.stream(token, name.clone(), lst.local_addr()?);
                 self.sockets.push((token, name, MioListener::Tcp(lst)));
             }
@@ -184,7 +186,7 @@ impl ServerBuilder {
         let sockets = bind_addr(addr, self.backlog)?;
         for lst in sockets {
-            let token = self.token.next();
+            let token = self.next_token();
             self.services.push(StreamNewService::create(
                 name.as_ref().to_string(),
                 token,
@@ -233,7 +235,7 @@ impl ServerBuilder {
     {
         use std::net::{IpAddr, Ipv4Addr};
         lst.set_nonblocking(true)?;
-        let token = self.token.next();
+        let token = self.next_token();
         let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
         self.services.push(StreamNewService::create(
             name.as_ref().to_string(),
@@ -259,7 +261,7 @@ impl ServerBuilder {
         lst.set_nonblocking(true)?;
         let addr = lst.local_addr()?;
-        let token = self.token.next();
+        let token = self.next_token();
         self.services.push(StreamNewService::create(
             name.as_ref().to_string(),
             token,
@@ -437,6 +439,12 @@
             }
         }
     }
+
+    fn next_token(&mut self) -> usize {
+        let token = self.token;
+        self.token += 1;
+        token
+    }
 }

 impl Future for ServerBuilder {


@@ -14,7 +14,6 @@ use log::error;
 use crate::builder::bind_addr;
 use crate::service::{BoxedServerService, InternalServiceFactory, StreamService};
 use crate::socket::{MioStream, MioTcpListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
-use crate::Token;

 pub struct ServiceConfig {
     pub(crate) services: Vec<(String, MioTcpListener)>,
@@ -81,9 +80,9 @@
 pub(super) struct ConfiguredService {
     rt: Box<dyn ServiceRuntimeConfiguration>,
-    names: HashMap<Token, (String, StdSocketAddr)>,
-    topics: HashMap<String, Token>,
-    services: Vec<Token>,
+    names: HashMap<usize, (String, StdSocketAddr)>,
+    topics: HashMap<String, usize>,
+    services: Vec<usize>,
 }

 impl ConfiguredService {
@@ -96,7 +95,7 @@
         }
     }

-    pub(super) fn stream(&mut self, token: Token, name: String, addr: StdSocketAddr) {
+    pub(super) fn stream(&mut self, token: usize, name: String, addr: StdSocketAddr) {
         self.names.insert(token, (name.clone(), addr));
         self.topics.insert(name, token);
         self.services.push(token);
@@ -104,8 +103,8 @@
 }

 impl InternalServiceFactory for ConfiguredService {
-    fn name(&self, token: Token) -> &str {
-        &self.names[&token].0
+    fn name(&self, idx: usize) -> &str {
+        &self.names[&idx].0
     }

     fn clone_factory(&self) -> Box<dyn InternalServiceFactory> {
@@ -117,7 +116,7 @@ impl InternalServiceFactory for ConfiguredService {
         })
     }

-    fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
+    fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(usize, BoxedServerService)>, ()>> {
         // configure services
         let mut rt = ServiceRuntime::new(self.topics.clone());
         self.rt.configure(&mut rt);
@@ -185,13 +184,13 @@ fn not_configured(_: &mut ServiceRuntime) {
 }

 pub struct ServiceRuntime {
-    names: HashMap<String, Token>,
-    services: HashMap<Token, BoxedNewService>,
+    names: HashMap<String, usize>,
+    services: HashMap<usize, BoxedNewService>,
     onstart: Vec<LocalBoxFuture<'static, ()>>,
 }

 impl ServiceRuntime {
-    fn new(names: HashMap<String, Token>) -> Self {
+    fn new(names: HashMap<String, usize>) -> Self {
         ServiceRuntime {
             names,
             services: HashMap::new(),


@@ -28,28 +28,6 @@ use std::future::Future;
 use std::pin::Pin;
 use std::task::{Context, Poll};

-/// Socket ID token
-#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
-pub(crate) struct Token(usize);
-
-impl Default for Token {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl Token {
-    fn new() -> Self {
-        Self(0)
-    }
-
-    pub(crate) fn next(&mut self) -> Token {
-        let token = Token(self.0);
-        self.0 += 1;
-        token
-    }
-}
-
 /// Start server building process
 pub fn new() -> ServerBuilder {
     ServerBuilder::default()
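For reference, the removed `Token` was only a counter behind a newtype; the `next_token` helper added to `ServerBuilder` above reproduces its post-increment behaviour with a bare `usize`. A small equivalence sketch (standalone; the `Token` here is a local copy for comparison, not the deleted crate type):

```rust
// Standalone sketch: `Token` is a local copy of the deleted newtype, kept only
// to show that the plain counter produces the same sequence of ids.
#[derive(Default)]
struct Token(usize);

impl Token {
    fn next(&mut self) -> usize {
        let token = self.0;
        self.0 += 1;
        token
    }
}

fn main() {
    let mut old = Token::default();
    let mut new = 0usize;

    for _ in 0..3 {
        // Body of `ServerBuilder::next_token`, inlined.
        let next = new;
        new += 1;

        assert_eq!(old.next(), next);
    }
}
```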


@@ -11,7 +11,6 @@ use futures_core::future::LocalBoxFuture;
 use log::error;

 use crate::socket::{FromStream, MioStream};
-use crate::Token;

 pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
     type Factory: BaseServiceFactory<Stream, Config = ()>;
@@ -20,11 +19,11 @@ pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
 }

 pub(crate) trait InternalServiceFactory: Send {
-    fn name(&self, token: Token) -> &str;
+    fn name(&self, token: usize) -> &str;

     fn clone_factory(&self) -> Box<dyn InternalServiceFactory>;

-    fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>;
+    fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(usize, BoxedServerService)>, ()>>;
 }

 pub(crate) type BoxedServerService = Box<
@@ -86,7 +85,7 @@ where
 pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> {
     name: String,
     inner: F,
-    token: Token,
+    token: usize,
     addr: SocketAddr,
     _t: PhantomData<Io>,
 }
@@ -98,7 +97,7 @@ where
 {
     pub(crate) fn create(
         name: String,
-        token: Token,
+        token: usize,
         inner: F,
         addr: SocketAddr,
     ) -> Box<dyn InternalServiceFactory> {
@@ -117,7 +116,7 @@ where
     F: ServiceFactory<Io>,
     Io: FromStream + Send + 'static,
 {
-    fn name(&self, _: Token) -> &str {
+    fn name(&self, _: usize) -> &str {
         &self.name
     }
@@ -131,7 +130,7 @@ where
         })
     }

-    fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>> {
+    fn create(&self) -> LocalBoxFuture<'static, Result<Vec<(usize, BoxedServerService)>, ()>> {
         let token = self.token;
         let fut = self.inner.create().new_service(());
         Box::pin(async move {


@@ -23,10 +23,10 @@ use tokio::sync::{
     oneshot,
 };

+use crate::join_all;
 use crate::service::{BoxedServerService, InternalServiceFactory};
 use crate::socket::MioStream;
 use crate::waker_queue::{WakerInterest, WakerQueue};
-use crate::{join_all, Token};

 /// Stop worker message. Returns `true` on successful graceful shutdown.
 /// and `false` if some connections still alive when shutdown execute.
@@ -38,7 +38,7 @@ pub(crate) struct Stop {
 #[derive(Debug)]
 pub(crate) struct Conn {
     pub io: MioStream,
-    pub token: Token,
+    pub token: usize,
 }

 fn handle_pair(
@@ -249,8 +249,8 @@
                     Ok(res) => res
                         .into_iter()
                         .flatten()
-                        .fold(Vec::new(), |mut services, (factory, token, service)| {
-                            assert_eq!(token.0, services.len());
+                        .fold(Vec::new(), |mut services, (factory, idx, service)| {
+                            assert_eq!(idx, services.len());
                             services.push(WorkerService {
                                 factory,
                                 service,
@@ -283,13 +283,13 @@
         handle_pair(idx, tx1, tx2, avail)
     }

-    fn restart_service(&mut self, token: Token, factory_id: usize) {
+    fn restart_service(&mut self, idx: usize, factory_id: usize) {
         let factory = &self.factories[factory_id];
-        trace!("Service {:?} failed, restarting", factory.name(token));
-        self.services[token.0].status = WorkerServiceStatus::Restarting;
+        trace!("Service {:?} failed, restarting", factory.name(idx));
+        self.services[idx].status = WorkerServiceStatus::Restarting;
         self.state = WorkerState::Restarting(Restart {
             factory_id,
-            token,
+            idx,
             fut: factory.create(),
         });
     }
@@ -307,7 +307,7 @@
         });
     }

-    fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result<bool, (Token, usize)> {
+    fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result<bool, (usize, usize)> {
         let mut ready = self.conns.available(cx);
         for (idx, srv) in self.services.iter_mut().enumerate() {
             if srv.status == WorkerServiceStatus::Available
@@ -318,7 +318,7 @@
                     if srv.status == WorkerServiceStatus::Unavailable {
                         trace!(
                             "Service {:?} is available",
-                            self.factories[srv.factory].name(Token(idx))
+                            self.factories[srv.factory].name(idx)
                         );
                         srv.status = WorkerServiceStatus::Available;
                     }
@@ -329,7 +329,7 @@
                     if srv.status == WorkerServiceStatus::Available {
                         trace!(
                             "Service {:?} is unavailable",
-                            self.factories[srv.factory].name(Token(idx))
+                            self.factories[srv.factory].name(idx)
                         );
                         srv.status = WorkerServiceStatus::Unavailable;
                     }
@@ -337,10 +337,10 @@
                 Poll::Ready(Err(_)) => {
                     error!(
                         "Service {:?} readiness check returned error, restarting",
-                        self.factories[srv.factory].name(Token(idx))
+                        self.factories[srv.factory].name(idx)
                     );
                     srv.status = WorkerServiceStatus::Failed;
-                    return Err((Token(idx), srv.factory));
+                    return Err((idx, srv.factory));
                 }
             }
         }
@@ -359,8 +359,8 @@ enum WorkerState {
 struct Restart {
     factory_id: usize,
-    token: Token,
-    fut: LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>,
+    idx: usize,
+    fut: LocalBoxFuture<'static, Result<Vec<(usize, BoxedServerService)>, ()>>,
 }

 // Shutdown keep states necessary for server shutdown:
@@ -438,7 +438,7 @@ impl Future for ServerWorker {
             },
             WorkerState::Restarting(ref mut restart) => {
                 let factory_id = restart.factory_id;
-                let token = restart.token;
+                let token = restart.idx;

                 let service = ready!(restart.fut.as_mut().poll(cx))
                     .unwrap_or_else(|_| {
@@ -459,7 +459,7 @@
                     this.factories[factory_id].name(token)
                 );

-                this.services[token.0].created(service);
+                this.services[token].created(service);

                 this.state = WorkerState::Unavailable;

                 self.poll(cx)
@@ -508,7 +508,7 @@
                         // handle incoming io stream
                         Some(msg) => {
                             let guard = this.conns.get();
-                            let _ = this.services[msg.token.0].service.call((guard, msg.io));
+                            let _ = this.services[msg.token].service.call((guard, msg.io));
                         }
                         None => return Poll::Ready(()),
                     };
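The worker side relies on the same ordering: services come back from the factories tagged with their token, and the `fold` above asserts that each token equals the next vector index, which keeps `self.services[msg.token]` a direct index. A reduced sketch of that check (plain `&str` values stand in for the real boxed services):

```rust
// Reduced sketch of the ordering check in the worker's service setup;
// &str values stand in for the real boxed services.
fn main() {
    let created: Vec<(usize, &str)> = vec![(0, "svc-a"), (1, "svc-b"), (2, "svc-c")];

    let services = created
        .into_iter()
        .fold(Vec::new(), |mut services, (idx, service)| {
            // Same assertion the worker performs: token == position in the Vec.
            assert_eq!(idx, services.len());
            services.push(service);
            services
        });

    // A connection carrying token 1 dispatches by direct indexing.
    assert_eq!(services[1], "svc-b");
}
```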


@@ -436,3 +436,144 @@ async fn test_service_restart() {
     let _ = server.stop(false);
     let _ = h.join().unwrap();
 }
+
+#[ignore]
+#[actix_rt::test]
+async fn worker_restart() {
+    use actix_service::{Service, ServiceFactory};
+    use futures_core::future::LocalBoxFuture;
+    use tokio::io::{AsyncReadExt, AsyncWriteExt};
+
+    struct TestServiceFactory(Arc<AtomicUsize>);
+
+    impl ServiceFactory<TcpStream> for TestServiceFactory {
+        type Response = ();
+        type Error = ();
+        type Config = ();
+        type Service = TestService;
+        type InitError = ();
+        type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
+
+        fn new_service(&self, _: Self::Config) -> Self::Future {
+            let counter = self.0.fetch_add(1, Ordering::Relaxed);
+
+            Box::pin(async move { Ok(TestService(counter)) })
+        }
+    }
+
+    struct TestService(usize);
+
+    impl Service<TcpStream> for TestService {
+        type Response = ();
+        type Error = ();
+        type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
+
+        actix_service::always_ready!();
+
+        fn call(&self, stream: TcpStream) -> Self::Future {
+            let counter = self.0;
+
+            let mut stream = stream.into_std().unwrap();
+            use std::io::Write;
+            let str = counter.to_string();
+            let buf = str.as_bytes();
+
+            let mut written = 0;
+
+            while written < buf.len() {
+                if let Ok(n) = stream.write(&buf[written..]) {
+                    written += n;
+                }
+            }
+            stream.flush().unwrap();
+            stream.shutdown(net::Shutdown::Write).unwrap();
+
+            // force worker 2 to restart service once.
+            if counter == 2 {
+                panic!("panic on purpose")
+            } else {
+                Box::pin(async { Ok(()) })
+            }
+        }
+    }
+
+    let addr = unused_addr();
+    let (tx, rx) = mpsc::channel();
+
+    let counter = Arc::new(AtomicUsize::new(1));
+    let h = thread::spawn(move || {
+        let counter = counter.clone();
+        actix_rt::System::new().block_on(async {
+            let server = Server::build()
+                .disable_signals()
+                .bind("addr", addr, move || TestServiceFactory(counter.clone()))
+                .unwrap()
+                .workers(2)
+                .run();
+
+            let _ = tx.send((server.clone(), actix_rt::System::current()));
+
+            server.await
+        })
+    });
+
+    let (server, sys) = rx.recv().unwrap();
+
+    sleep(Duration::from_secs(3)).await;
+
+    let mut buf = [0; 8];
+
+    // worker 1 would not restart and return it's id consistently.
+    let mut stream = TcpStream::connect(addr).await.unwrap();
+    let n = stream.read(&mut buf).await.unwrap();
+    let id = String::from_utf8_lossy(&buf[0..n]);
+    assert_eq!("1", id);
+    stream.shutdown().await.unwrap();
+
+    // worker 2 dead after return response.
+    let mut stream = TcpStream::connect(addr).await.unwrap();
+    let n = stream.read(&mut buf).await.unwrap();
+    let id = String::from_utf8_lossy(&buf[0..n]);
+    assert_eq!("2", id);
+    stream.shutdown().await.unwrap();
+
+    // request to worker 1
+    let mut stream = TcpStream::connect(addr).await.unwrap();
+    let n = stream.read(&mut buf).await.unwrap();
+    let id = String::from_utf8_lossy(&buf[0..n]);
+    assert_eq!("1", id);
+    stream.shutdown().await.unwrap();
+
+    // TODO: Remove sleep if it can pass CI.
+    sleep(Duration::from_secs(3)).await;
+
+    // worker 2 restarting and work goes to worker 1.
+    let mut stream = TcpStream::connect(addr).await.unwrap();
+    let n = stream.read(&mut buf).await.unwrap();
+    let id = String::from_utf8_lossy(&buf[0..n]);
+    assert_eq!("1", id);
+    stream.shutdown().await.unwrap();
+
+    // TODO: Remove sleep if it can pass CI.
+    sleep(Duration::from_secs(3)).await;
+
+    // worker 2 restarted but worker 1 was still the next to accept connection.
+    let mut stream = TcpStream::connect(addr).await.unwrap();
+    let n = stream.read(&mut buf).await.unwrap();
+    let id = String::from_utf8_lossy(&buf[0..n]);
+    assert_eq!("1", id);
+    stream.shutdown().await.unwrap();
+
+    // TODO: Remove sleep if it can pass CI.
+    sleep(Duration::from_secs(3)).await;
+
+    // worker 2 accept connection again but it's id is 3.
+    let mut stream = TcpStream::connect(addr).await.unwrap();
+    let n = stream.read(&mut buf).await.unwrap();
+    let id = String::from_utf8_lossy(&buf[0..n]);
+    assert_eq!("3", id);
+    stream.shutdown().await.unwrap();
+
+    sys.stop();
+    let _ = server.stop(false);
+    let _ = h.join().unwrap();
+}
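Note that the new test carries `#[ignore]`, so a default `cargo test` run skips it; it only executes when requested explicitly, for example with `cargo test worker_restart -- --ignored` (assuming the default test harness).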


@@ -1,6 +1,9 @@
 # Changes

 ## Unreleased - 2021-xx-xx

+
+## 2.0.0 - 2021-04-16
 * Removed pipeline and related structs/functions. [#335]

 [#335]: https://github.com/actix/actix-net/pull/335


@@ -1,6 +1,6 @@
 [package]
 name = "actix-service"
-version = "2.0.0-beta.5"
+version = "2.0.0"
 authors = [
     "Nikolay Kim <fafhrd91@gmail.com>",
     "Rob Ede <robjtede@icloud.com>",
@@ -8,11 +8,8 @@ authors = [
 ]
 description = "Service trait and combinators for representing asynchronous request/response operations."
 keywords = ["network", "framework", "async", "futures", "service"]
-homepage = "https://actix.rs"
-repository = "https://github.com/actix/actix-net.git"
-documentation = "https://docs.rs/actix-service"
-readme = "README.md"
 categories = ["network-programming", "asynchronous"]
+repository = "https://github.com/actix/actix-net"
 license = "MIT OR Apache-2.0"
 edition = "2018"
@@ -27,5 +24,5 @@ pin-project-lite = "0.2"
 [dev-dependencies]
 actix-rt = "2.0.0"
-actix-utils = "3.0.0-beta.4"
+actix-utils = "3.0.0"
 futures-util = { version = "0.3.7", default-features = false }


@@ -3,10 +3,10 @@
 > Service trait and combinators for representing asynchronous request/response operations.

 [![crates.io](https://img.shields.io/crates/v/actix-service?label=latest)](https://crates.io/crates/actix-service)
-[![Documentation](https://docs.rs/actix-service/badge.svg?version=2.0.0-beta.5)](https://docs.rs/actix-service/2.0.0-beta.5)
+[![Documentation](https://docs.rs/actix-service/badge.svg?version=2.0.0)](https://docs.rs/actix-service/2.0.0)
 [![Version](https://img.shields.io/badge/rustc-1.46+-ab6000.svg)](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html)
 ![License](https://img.shields.io/crates/l/actix-service.svg)
-[![Dependency Status](https://deps.rs/crate/actix-service/2.0.0-beta.5/status.svg)](https://deps.rs/crate/actix-service/2.0.0-beta.5)
+[![Dependency Status](https://deps.rs/crate/actix-service/2.0.0/status.svg)](https://deps.rs/crate/actix-service/2.0.0)
 ![Download](https://img.shields.io/crates/d/actix-service.svg)
 [![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/NWpN5mmg3x)

@@ -42,8 +42,8 @@ uri = ["http"]
 [dependencies]
 actix-codec = "0.4.0-beta.1"
 actix-rt = { version = "2.2.0", default-features = false }
-actix-service = "2.0.0-beta.5"
-actix-utils = "3.0.0-beta.4"
+actix-service = "2.0.0"
+actix-utils = "3.0.0"
 derive_more = "0.99.5"
 futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }


@@ -16,8 +16,8 @@ name = "actix_tracing"
 path = "src/lib.rs"

 [dependencies]
-actix-service = "2.0.0-beta.5"
-actix-utils = "3.0.0-beta.4"
+actix-service = "2.0.0"
+actix-utils = "3.0.0"
 tracing = "0.1"
 tracing-futures = "0.2"


@@ -3,6 +3,10 @@
 ## Unreleased - 2021-xx-xx

+
+## 3.0.0 - 2021-04-16
+* No significant changes from `3.0.0-beta.4`.
+
 ## 3.0.0-beta.4 - 2021-04-01
 * Add `future::Either` type. [#305]


@@ -1,14 +1,14 @@
 [package]
 name = "actix-utils"
-version = "3.0.0-beta.4"
+version = "3.0.0"
 authors = [
     "Nikolay Kim <fafhrd91@gmail.com>",
     "Rob Ede <robjtede@icloud.com>",
 ]
-description = "Utilities for the Actix ecosystem"
+description = "Various utilities used in the Actix ecosystem"
 keywords = ["network", "framework", "async", "futures"]
-repository = "https://github.com/actix/actix-net.git"
 categories = ["network-programming", "asynchronous"]
+repository = "https://github.com/actix/actix-net"
 license = "MIT OR Apache-2.0"
 edition = "2018"


@@ -1,4 +1,4 @@
-//! Various utilities for the Actix ecosystem.
+//! Various utilities used in the Actix ecosystem.

 #![deny(rust_2018_idioms, nonstandard_style)]
 #![warn(missing_docs)]