From b0f02ae8fc9968fae8f0581432236e0a9ddfefa2 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sat, 17 Apr 2021 11:07:11 +0800 Subject: [PATCH] Remove Slab --- actix-server/Cargo.toml | 5 +- actix-server/src/accept.rs | 299 ++++++++++++++++-------------- actix-server/src/builder.rs | 28 ++- actix-server/src/config.rs | 21 +-- actix-server/src/lib.rs | 22 --- actix-server/src/service.rs | 13 +- actix-server/src/worker.rs | 36 ++-- actix-server/tests/test_server.rs | 141 ++++++++++++++ actix-service/CHANGES.md | 3 + actix-service/Cargo.toml | 9 +- actix-service/README.md | 4 +- actix-tls/Cargo.toml | 4 +- actix-tracing/Cargo.toml | 4 +- actix-utils/CHANGES.md | 4 + actix-utils/Cargo.toml | 6 +- actix-utils/src/lib.rs | 2 +- 16 files changed, 370 insertions(+), 231 deletions(-) diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 80b44c6d..72651993 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -22,14 +22,13 @@ default = [] [dependencies] actix-rt = { version = "2.0.0", default-features = false } -actix-service = "2.0.0-beta.5" -actix-utils = "3.0.0-beta.4" +actix-service = "2.0.0" +actix-utils = "3.0.0" futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } log = "0.4" mio = { version = "0.7.6", features = ["os-poll", "net"] } num_cpus = "1.13" -slab = "0.4" tokio = { version = "1.2", features = ["sync"] } [dev-dependencies] diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 23ba616c..c5215d3a 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -7,21 +7,17 @@ use actix_rt::{ }; use log::{error, info}; use mio::{Interest, Poll, Token as MioToken}; -use slab::Slab; use crate::server::Server; use crate::socket::{MioListener, SocketAddr}; use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; use crate::worker::{Conn, WorkerHandleAccept}; -use crate::Token; struct ServerSocketInfo { /// Address of socket. Mainly used for logging. addr: SocketAddr, - /// Beware this is the crate token for identify socket and should not be confused - /// with `mio::Token`. - token: Token, + token: usize, lst: MioListener, @@ -65,7 +61,7 @@ impl AcceptLoop { pub(crate) fn start( &mut self, - socks: Vec<(Token, MioListener)>, + socks: Vec<(usize, MioListener)>, handles: Vec, ) { let srv = self.srv.take().expect("Can not re-use AcceptInfo"); @@ -98,6 +94,7 @@ impl Default for Availability { impl Availability { /// Check if any worker handle is available + #[inline(always)] fn available(&self) -> bool { self.0.iter().any(|a| *a != 0) } @@ -150,7 +147,7 @@ impl Accept { pub(crate) fn start( poll: Poll, waker: WakerQueue, - socks: Vec<(Token, MioListener)>, + socks: Vec<(usize, MioListener)>, srv: Server, handles: Vec, ) { @@ -161,10 +158,10 @@ impl Accept { .name("actix-server accept loop".to_owned()) .spawn(move || { System::set_current(sys); - let (mut accept, sockets) = + let (mut accept, mut sockets) = Accept::new_with_sockets(poll, waker, socks, handles, srv); - accept.poll_with(sockets); + accept.poll_with(&mut sockets); }) .unwrap(); } @@ -172,29 +169,28 @@ impl Accept { fn new_with_sockets( poll: Poll, waker: WakerQueue, - socks: Vec<(Token, MioListener)>, + socks: Vec<(usize, MioListener)>, handles: Vec, srv: Server, - ) -> (Accept, Slab) { - let mut sockets = Slab::new(); - for (hnd_token, mut lst) in socks.into_iter() { - let addr = lst.local_addr(); + ) -> (Accept, Vec) { + let sockets = socks + .into_iter() + .map(|(token, mut lst)| { + let addr = lst.local_addr(); - let entry = sockets.vacant_entry(); - let token = entry.key(); + // Start listening for incoming connections + poll.registry() + .register(&mut lst, MioToken(token), Interest::READABLE) + .unwrap_or_else(|e| panic!("Can not register io: {}", e)); - // Start listening for incoming connections - poll.registry() - .register(&mut lst, MioToken(token), Interest::READABLE) - .unwrap_or_else(|e| panic!("Can not register io: {}", e)); - - entry.insert(ServerSocketInfo { - addr, - token: hnd_token, - lst, - timeout: None, - }); - } + ServerSocketInfo { + addr, + token, + lst, + timeout: None, + } + }) + .collect(); let mut avail = Availability::default(); @@ -214,7 +210,7 @@ impl Accept { (accept, sockets) } - fn poll_with(&mut self, mut sockets: Slab) { + fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) { let mut events = mio::Events::with_capacity(128); loop { @@ -228,77 +224,89 @@ impl Accept { for event in events.iter() { let token = event.token(); match token { - // This is a loop because interests for command from previous version was - // a loop that would try to drain the command channel. It's yet unknown - // if it's necessary/good practice to actively drain the waker queue. - WAKER_TOKEN => 'waker: loop { - // take guard with every iteration so no new interest can be added - // until the current task is done. - let mut guard = self.waker.guard(); - match guard.pop_front() { - // worker notify it becomes available. we may want to recover - // from backpressure. - Some(WakerInterest::WorkerAvailable(idx)) => { - drop(guard); - self.maybe_backpressure(&mut sockets, false); - self.avail.set_available(idx, true); - } - // a new worker thread is made and it's handle would be added to Accept - Some(WakerInterest::Worker(handle)) => { - drop(guard); - // maybe we want to recover from a backpressure. - self.maybe_backpressure(&mut sockets, false); - self.avail.set_available(handle.idx(), true); - self.handles.push(handle); - } - // got timer interest and it's time to try register socket(s) again - Some(WakerInterest::Timer) => { - drop(guard); - self.process_timer(&mut sockets) - } - Some(WakerInterest::Pause) => { - drop(guard); - self.deregister_all(&mut sockets); - } - Some(WakerInterest::Resume) => { - drop(guard); - sockets.iter_mut().for_each(|(token, info)| { - self.register_logged(token, info); - }); - } - Some(WakerInterest::Stop) => { - return self.deregister_all(&mut sockets); - } - // waker queue is drained - None => { - // Reset the WakerQueue before break so it does not grow infinitely - WakerQueue::reset(&mut guard); - break 'waker; - } + WAKER_TOKEN => { + let exit = self.handle_waker(sockets); + if exit { + return; } - }, + } _ => { let token = usize::from(token); - self.accept(&mut sockets, token); + self.accept(sockets, token); } } } } } - fn process_timer(&self, sockets: &mut Slab) { + fn handle_waker(&mut self, sockets: &mut [ServerSocketInfo]) -> bool { + // This is a loop because interests for command from previous version was + // a loop that would try to drain the command channel. It's yet unknown + // if it's necessary/good practice to actively drain the waker queue. + loop { + // take guard with every iteration so no new interest can be added + // until the current task is done. + let mut guard = self.waker.guard(); + match guard.pop_front() { + // worker notify it becomes available. we may want to recover + // from backpressure. + Some(WakerInterest::WorkerAvailable(idx)) => { + drop(guard); + self.maybe_backpressure(sockets, false); + self.avail.set_available(idx, true); + self.accept_all(sockets); + } + // a new worker thread is made and it's handle would be added to Accept + Some(WakerInterest::Worker(handle)) => { + drop(guard); + // maybe we want to recover from a backpressure. + self.maybe_backpressure(sockets, false); + self.avail.set_available(handle.idx(), true); + self.handles.push(handle); + self.accept_all(sockets); + } + // got timer interest and it's time to try register socket(s) again + Some(WakerInterest::Timer) => { + drop(guard); + self.process_timer(sockets) + } + Some(WakerInterest::Pause) => { + drop(guard); + self.deregister_all(sockets); + } + Some(WakerInterest::Resume) => { + drop(guard); + sockets.iter_mut().for_each(|info| { + self.register_logged(info); + }); + } + Some(WakerInterest::Stop) => { + self.deregister_all(sockets); + return true; + } + // waker queue is drained + None => { + // Reset the WakerQueue before break so it does not grow infinitely + WakerQueue::reset(&mut guard); + return false; + } + } + } + } + + fn process_timer(&self, sockets: &mut [ServerSocketInfo]) { let now = Instant::now(); sockets .iter_mut() // Only sockets that had an associated timeout were deregistered. - .filter(|(_, info)| info.timeout.is_some()) - .for_each(|(token, info)| { + .filter(|info| info.timeout.is_some()) + .for_each(|info| { let inst = info.timeout.take().unwrap(); if now < inst { info.timeout = Some(inst); } else if !self.backpressure { - self.register_logged(token, info); + self.register_logged(info); } // Drop the timeout if server is in backpressure and socket timeout is expired. @@ -308,42 +316,38 @@ impl Accept { } #[cfg(not(target_os = "windows"))] - fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> { + fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> { + let token = MioToken(info.token); self.poll .registry() - .register(&mut info.lst, MioToken(token), Interest::READABLE) + .register(&mut info.lst, token, Interest::READABLE) } #[cfg(target_os = "windows")] - fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> { + fn register(&self, info: &mut ServerSocketInfo) -> io::Result<()> { // On windows, calling register without deregister cause an error. // See https://github.com/actix/actix-web/issues/905 // Calling reregister seems to fix the issue. + let token = MioToken(info.token); self.poll .registry() - .register(&mut info.lst, mio::Token(token), Interest::READABLE) + .register(&mut info.lst, token, Interest::READABLE) .or_else(|_| { - self.poll.registry().reregister( - &mut info.lst, - mio::Token(token), - Interest::READABLE, - ) + self.poll + .registry() + .reregister(&mut info.lst, token, Interest::READABLE) }) } - fn register_logged(&self, token: usize, info: &mut ServerSocketInfo) { - match self.register(token, info) { + fn register_logged(&self, info: &mut ServerSocketInfo) { + match self.register(info) { Ok(_) => info!("Resume accepting connections on {}", info.addr), Err(e) => error!("Can not register server socket {}", e), } } - fn deregister(&self, info: &mut ServerSocketInfo) -> io::Result<()> { - self.poll.registry().deregister(&mut info.lst) - } - fn deregister_logged(&self, info: &mut ServerSocketInfo) { - match self.deregister(info) { + match self.poll.registry().deregister(&mut info.lst) { Ok(_) => info!("Paused accepting connections on {}", info.addr), Err(e) => { error!("Can not deregister server socket {}", e) @@ -351,7 +355,7 @@ impl Accept { } } - fn deregister_all(&self, sockets: &mut Slab) { + fn deregister_all(&self, sockets: &mut [ServerSocketInfo]) { // This is a best effort implementation with following limitation: // // Every ServerSocketInfo with associate timeout will be skipped and it's timeout @@ -364,13 +368,13 @@ impl Accept { .iter_mut() // Take all timeout. // This is to prevent Accept::process_timer method re-register a socket afterwards. - .map(|(_, info)| (info.timeout.take(), info)) + .map(|info| (info.timeout.take(), info)) // Socket info with a timeout is already deregistered so skip them. .filter(|(timeout, _)| timeout.is_none()) .for_each(|(_, info)| self.deregister_logged(info)); } - fn maybe_backpressure(&mut self, sockets: &mut Slab, on: bool) { + fn maybe_backpressure(&mut self, sockets: &mut [ServerSocketInfo], on: bool) { // Only operate when server is in a different backpressure than the given flag. if self.backpressure != on { self.backpressure = on; @@ -379,51 +383,21 @@ impl Accept { // Only operate on sockets without associated timeout. // Sockets with it should be handled by `accept` and `process_timer` methods. // They are already deregistered or need to be reregister in the future. - .filter(|(_, info)| info.timeout.is_none()) - .for_each(|(token, info)| { + .filter(|info| info.timeout.is_none()) + .for_each(|info| { if on { self.deregister_logged(info); } else { - self.register_logged(token, info); + self.register_logged(info); } }); } } - fn accept_one(&mut self, sockets: &mut Slab, mut conn: Conn) { - if self.backpressure { - // send_connection would remove fault worker from handles. - // worst case here is conn get dropped after all handles are gone. - while let Err(c) = self.send_connection(sockets, conn) { - conn = c - } - } else { - while self.avail.available() { - let next = self.next(); - let idx = next.idx(); - if next.available() { - self.avail.set_available(idx, true); - match self.send_connection(sockets, conn) { - Ok(_) => return, - Err(c) => conn = c, - } - } else { - self.avail.set_available(idx, false); - self.set_next(); - } - } - - // Sending Conn failed due to either all workers are in error or not available. - // Enter backpressure state and try again. - self.maybe_backpressure(sockets, true); - self.accept_one(sockets, conn); - } - } - // Send connection to worker and handle error. fn send_connection( &mut self, - sockets: &mut Slab, + sockets: &mut [ServerSocketInfo], conn: Conn, ) -> Result<(), Conn> { match self.next().send(conn) { @@ -451,19 +425,45 @@ impl Accept { } } - fn accept(&mut self, sockets: &mut Slab, token: usize) { + fn send_conn(&mut self, sockets: &mut [ServerSocketInfo], mut conn: Conn) { loop { - let info = sockets - .get_mut(token) - .expect("ServerSocketInfo is removed from Slab"); + let next = self.next(); + let idx = next.idx(); + if next.available() { + self.avail.set_available(idx, true); + match self.send_connection(sockets, conn) { + Ok(_) => return, + Err(c) => conn = c, + } + } else { + self.avail.set_available(idx, false); + self.set_next(); + + if !self.avail.available() { + break; + } + } + } + + // Sending connection failed due to either all workers are in error or not available. + // Enter backpressure state and try again. + self.maybe_backpressure(sockets, true); + + // Force send connection to worker regardless it's avail state. + // Worst case here is conn get dropped after all handles are gone. + while let Err(c) = self.send_connection(sockets, conn) { + conn = c + } + } + + fn accept(&mut self, sockets: &mut [ServerSocketInfo], token: usize) { + while self.avail.available() { + let info = &mut sockets[token]; match info.lst.accept() { Ok(io) => { - let msg = Conn { - io, - token: info.token, - }; - self.accept_one(sockets, msg); + let conn = Conn { io, token }; + self.send_conn(sockets, conn); } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return, Err(ref e) if connection_error(e) => continue, @@ -491,11 +491,22 @@ impl Accept { } } + fn accept_all(&mut self, sockets: &mut [ServerSocketInfo]) { + sockets + .iter_mut() + .map(|info| info.token) + .collect::>() + .into_iter() + .for_each(|idx| self.accept(sockets, idx)); + } + + #[inline(always)] fn next(&self) -> &WorkerHandleAccept { &self.handles[self.next] } /// Set next worker handle that would accept connection. + #[inline(always)] fn set_next(&mut self) { self.next = (self.next + 1) % self.handles.len(); } diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 66aba10c..bce22dbe 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -8,11 +8,14 @@ use std::{ use actix_rt::{self as rt, net::TcpStream, time::sleep, System}; use log::{error, info}; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver}; -use tokio::sync::oneshot; +use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedReceiver}, + oneshot, +}; use crate::accept::AcceptLoop; use crate::config::{ConfiguredService, ServiceConfig}; +use crate::join_all; use crate::server::{Server, ServerCommand}; use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; use crate::signals::{Signal, Signals}; @@ -23,16 +26,15 @@ use crate::worker::{ ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandleAccept, WorkerHandleServer, }; -use crate::{join_all, Token}; /// Server builder pub struct ServerBuilder { threads: usize, - token: Token, + token: usize, backlog: u32, handles: Vec<(usize, WorkerHandleServer)>, services: Vec>, - sockets: Vec<(Token, String, MioListener)>, + sockets: Vec<(usize, String, MioListener)>, accept: AcceptLoop, exit: bool, no_signals: bool, @@ -56,7 +58,7 @@ impl ServerBuilder { ServerBuilder { threads: num_cpus::get(), - token: Token::default(), + token: 0, handles: Vec::new(), services: Vec::new(), sockets: Vec::new(), @@ -164,7 +166,7 @@ impl ServerBuilder { if let Some(apply) = cfg.apply { let mut srv = ConfiguredService::new(apply); for (name, lst) in cfg.services { - let token = self.token.next(); + let token = self.next_token(); srv.stream(token, name.clone(), lst.local_addr()?); self.sockets.push((token, name, MioListener::Tcp(lst))); } @@ -184,7 +186,7 @@ impl ServerBuilder { let sockets = bind_addr(addr, self.backlog)?; for lst in sockets { - let token = self.token.next(); + let token = self.next_token(); self.services.push(StreamNewService::create( name.as_ref().to_string(), token, @@ -233,7 +235,7 @@ impl ServerBuilder { { use std::net::{IpAddr, Ipv4Addr}; lst.set_nonblocking(true)?; - let token = self.token.next(); + let token = self.next_token(); let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); self.services.push(StreamNewService::create( name.as_ref().to_string(), @@ -259,7 +261,7 @@ impl ServerBuilder { lst.set_nonblocking(true)?; let addr = lst.local_addr()?; - let token = self.token.next(); + let token = self.next_token(); self.services.push(StreamNewService::create( name.as_ref().to_string(), token, @@ -437,6 +439,12 @@ impl ServerBuilder { } } } + + fn next_token(&mut self) -> usize { + let token = self.token; + self.token += 1; + token + } } impl Future for ServerBuilder { diff --git a/actix-server/src/config.rs b/actix-server/src/config.rs index c5e63630..722ff1ea 100644 --- a/actix-server/src/config.rs +++ b/actix-server/src/config.rs @@ -14,7 +14,6 @@ use log::error; use crate::builder::bind_addr; use crate::service::{BoxedServerService, InternalServiceFactory, StreamService}; use crate::socket::{MioStream, MioTcpListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; -use crate::Token; pub struct ServiceConfig { pub(crate) services: Vec<(String, MioTcpListener)>, @@ -81,9 +80,9 @@ impl ServiceConfig { pub(super) struct ConfiguredService { rt: Box, - names: HashMap, - topics: HashMap, - services: Vec, + names: HashMap, + topics: HashMap, + services: Vec, } impl ConfiguredService { @@ -96,7 +95,7 @@ impl ConfiguredService { } } - pub(super) fn stream(&mut self, token: Token, name: String, addr: StdSocketAddr) { + pub(super) fn stream(&mut self, token: usize, name: String, addr: StdSocketAddr) { self.names.insert(token, (name.clone(), addr)); self.topics.insert(name, token); self.services.push(token); @@ -104,8 +103,8 @@ impl ConfiguredService { } impl InternalServiceFactory for ConfiguredService { - fn name(&self, token: Token) -> &str { - &self.names[&token].0 + fn name(&self, idx: usize) -> &str { + &self.names[&idx].0 } fn clone_factory(&self) -> Box { @@ -117,7 +116,7 @@ impl InternalServiceFactory for ConfiguredService { }) } - fn create(&self) -> LocalBoxFuture<'static, Result, ()>> { + fn create(&self) -> LocalBoxFuture<'static, Result, ()>> { // configure services let mut rt = ServiceRuntime::new(self.topics.clone()); self.rt.configure(&mut rt); @@ -185,13 +184,13 @@ fn not_configured(_: &mut ServiceRuntime) { } pub struct ServiceRuntime { - names: HashMap, - services: HashMap, + names: HashMap, + services: HashMap, onstart: Vec>, } impl ServiceRuntime { - fn new(names: HashMap) -> Self { + fn new(names: HashMap) -> Self { ServiceRuntime { names, services: HashMap::new(), diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index af9ab0b0..ba7d0c29 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -28,28 +28,6 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -/// Socket ID token -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] -pub(crate) struct Token(usize); - -impl Default for Token { - fn default() -> Self { - Self::new() - } -} - -impl Token { - fn new() -> Self { - Self(0) - } - - pub(crate) fn next(&mut self) -> Token { - let token = Token(self.0); - self.0 += 1; - token - } -} - /// Start server building process pub fn new() -> ServerBuilder { ServerBuilder::default() diff --git a/actix-server/src/service.rs b/actix-server/src/service.rs index da57af67..6faa3d00 100644 --- a/actix-server/src/service.rs +++ b/actix-server/src/service.rs @@ -11,7 +11,6 @@ use futures_core::future::LocalBoxFuture; use log::error; use crate::socket::{FromStream, MioStream}; -use crate::Token; pub trait ServiceFactory: Send + Clone + 'static { type Factory: BaseServiceFactory; @@ -20,11 +19,11 @@ pub trait ServiceFactory: Send + Clone + 'static { } pub(crate) trait InternalServiceFactory: Send { - fn name(&self, token: Token) -> &str; + fn name(&self, token: usize) -> &str; fn clone_factory(&self) -> Box; - fn create(&self) -> LocalBoxFuture<'static, Result, ()>>; + fn create(&self) -> LocalBoxFuture<'static, Result, ()>>; } pub(crate) type BoxedServerService = Box< @@ -86,7 +85,7 @@ where pub(crate) struct StreamNewService, Io: FromStream> { name: String, inner: F, - token: Token, + token: usize, addr: SocketAddr, _t: PhantomData, } @@ -98,7 +97,7 @@ where { pub(crate) fn create( name: String, - token: Token, + token: usize, inner: F, addr: SocketAddr, ) -> Box { @@ -117,7 +116,7 @@ where F: ServiceFactory, Io: FromStream + Send + 'static, { - fn name(&self, _: Token) -> &str { + fn name(&self, _: usize) -> &str { &self.name } @@ -131,7 +130,7 @@ where }) } - fn create(&self) -> LocalBoxFuture<'static, Result, ()>> { + fn create(&self) -> LocalBoxFuture<'static, Result, ()>> { let token = self.token; let fut = self.inner.create().new_service(()); Box::pin(async move { diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 7bc211b1..5f8a8cf5 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -23,10 +23,10 @@ use tokio::sync::{ oneshot, }; +use crate::join_all; use crate::service::{BoxedServerService, InternalServiceFactory}; use crate::socket::MioStream; use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::{join_all, Token}; /// Stop worker message. Returns `true` on successful graceful shutdown. /// and `false` if some connections still alive when shutdown execute. @@ -38,7 +38,7 @@ pub(crate) struct Stop { #[derive(Debug)] pub(crate) struct Conn { pub io: MioStream, - pub token: Token, + pub token: usize, } fn handle_pair( @@ -249,8 +249,8 @@ impl ServerWorker { Ok(res) => res .into_iter() .flatten() - .fold(Vec::new(), |mut services, (factory, token, service)| { - assert_eq!(token.0, services.len()); + .fold(Vec::new(), |mut services, (factory, idx, service)| { + assert_eq!(idx, services.len()); services.push(WorkerService { factory, service, @@ -283,13 +283,13 @@ impl ServerWorker { handle_pair(idx, tx1, tx2, avail) } - fn restart_service(&mut self, token: Token, factory_id: usize) { + fn restart_service(&mut self, idx: usize, factory_id: usize) { let factory = &self.factories[factory_id]; - trace!("Service {:?} failed, restarting", factory.name(token)); - self.services[token.0].status = WorkerServiceStatus::Restarting; + trace!("Service {:?} failed, restarting", factory.name(idx)); + self.services[idx].status = WorkerServiceStatus::Restarting; self.state = WorkerState::Restarting(Restart { factory_id, - token, + idx, fut: factory.create(), }); } @@ -307,7 +307,7 @@ impl ServerWorker { }); } - fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result { + fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result { let mut ready = self.conns.available(cx); for (idx, srv) in self.services.iter_mut().enumerate() { if srv.status == WorkerServiceStatus::Available @@ -318,7 +318,7 @@ impl ServerWorker { if srv.status == WorkerServiceStatus::Unavailable { trace!( "Service {:?} is available", - self.factories[srv.factory].name(Token(idx)) + self.factories[srv.factory].name(idx) ); srv.status = WorkerServiceStatus::Available; } @@ -329,7 +329,7 @@ impl ServerWorker { if srv.status == WorkerServiceStatus::Available { trace!( "Service {:?} is unavailable", - self.factories[srv.factory].name(Token(idx)) + self.factories[srv.factory].name(idx) ); srv.status = WorkerServiceStatus::Unavailable; } @@ -337,10 +337,10 @@ impl ServerWorker { Poll::Ready(Err(_)) => { error!( "Service {:?} readiness check returned error, restarting", - self.factories[srv.factory].name(Token(idx)) + self.factories[srv.factory].name(idx) ); srv.status = WorkerServiceStatus::Failed; - return Err((Token(idx), srv.factory)); + return Err((idx, srv.factory)); } } } @@ -359,8 +359,8 @@ enum WorkerState { struct Restart { factory_id: usize, - token: Token, - fut: LocalBoxFuture<'static, Result, ()>>, + idx: usize, + fut: LocalBoxFuture<'static, Result, ()>>, } // Shutdown keep states necessary for server shutdown: @@ -438,7 +438,7 @@ impl Future for ServerWorker { }, WorkerState::Restarting(ref mut restart) => { let factory_id = restart.factory_id; - let token = restart.token; + let token = restart.idx; let service = ready!(restart.fut.as_mut().poll(cx)) .unwrap_or_else(|_| { @@ -459,7 +459,7 @@ impl Future for ServerWorker { this.factories[factory_id].name(token) ); - this.services[token.0].created(service); + this.services[token].created(service); this.state = WorkerState::Unavailable; self.poll(cx) @@ -508,7 +508,7 @@ impl Future for ServerWorker { // handle incoming io stream Some(msg) => { let guard = this.conns.get(); - let _ = this.services[msg.token.0].service.call((guard, msg.io)); + let _ = this.services[msg.token].service.call((guard, msg.io)); } None => return Poll::Ready(()), }; diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index a20df0c1..cc9f8190 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -436,3 +436,144 @@ async fn test_service_restart() { let _ = server.stop(false); let _ = h.join().unwrap(); } + +#[ignore] +#[actix_rt::test] +async fn worker_restart() { + use actix_service::{Service, ServiceFactory}; + use futures_core::future::LocalBoxFuture; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + struct TestServiceFactory(Arc); + + impl ServiceFactory for TestServiceFactory { + type Response = (); + type Error = (); + type Config = (); + type Service = TestService; + type InitError = (); + type Future = LocalBoxFuture<'static, Result>; + + fn new_service(&self, _: Self::Config) -> Self::Future { + let counter = self.0.fetch_add(1, Ordering::Relaxed); + + Box::pin(async move { Ok(TestService(counter)) }) + } + } + + struct TestService(usize); + + impl Service for TestService { + type Response = (); + type Error = (); + type Future = LocalBoxFuture<'static, Result>; + + actix_service::always_ready!(); + + fn call(&self, stream: TcpStream) -> Self::Future { + let counter = self.0; + + let mut stream = stream.into_std().unwrap(); + use std::io::Write; + let str = counter.to_string(); + let buf = str.as_bytes(); + + let mut written = 0; + + while written < buf.len() { + if let Ok(n) = stream.write(&buf[written..]) { + written += n; + } + } + stream.flush().unwrap(); + stream.shutdown(net::Shutdown::Write).unwrap(); + + // force worker 2 to restart service once. + if counter == 2 { + panic!("panic on purpose") + } else { + Box::pin(async { Ok(()) }) + } + } + } + + let addr = unused_addr(); + let (tx, rx) = mpsc::channel(); + + let counter = Arc::new(AtomicUsize::new(1)); + let h = thread::spawn(move || { + let counter = counter.clone(); + actix_rt::System::new().block_on(async { + let server = Server::build() + .disable_signals() + .bind("addr", addr, move || TestServiceFactory(counter.clone())) + .unwrap() + .workers(2) + .run(); + + let _ = tx.send((server.clone(), actix_rt::System::current())); + server.await + }) + }); + + let (server, sys) = rx.recv().unwrap(); + + sleep(Duration::from_secs(3)).await; + + let mut buf = [0; 8]; + + // worker 1 would not restart and return it's id consistently. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("1", id); + stream.shutdown().await.unwrap(); + + // worker 2 dead after return response. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("2", id); + stream.shutdown().await.unwrap(); + + // request to worker 1 + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("1", id); + stream.shutdown().await.unwrap(); + + // TODO: Remove sleep if it can pass CI. + sleep(Duration::from_secs(3)).await; + + // worker 2 restarting and work goes to worker 1. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("1", id); + stream.shutdown().await.unwrap(); + + // TODO: Remove sleep if it can pass CI. + sleep(Duration::from_secs(3)).await; + + // worker 2 restarted but worker 1 was still the next to accept connection. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("1", id); + stream.shutdown().await.unwrap(); + + // TODO: Remove sleep if it can pass CI. + sleep(Duration::from_secs(3)).await; + + // worker 2 accept connection again but it's id is 3. + let mut stream = TcpStream::connect(addr).await.unwrap(); + let n = stream.read(&mut buf).await.unwrap(); + let id = String::from_utf8_lossy(&buf[0..n]); + assert_eq!("3", id); + stream.shutdown().await.unwrap(); + + sys.stop(); + let _ = server.stop(false); + let _ = h.join().unwrap(); +} diff --git a/actix-service/CHANGES.md b/actix-service/CHANGES.md index c99cc2eb..a0130dbc 100644 --- a/actix-service/CHANGES.md +++ b/actix-service/CHANGES.md @@ -1,6 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx + + +## 2.0.0 - 2021-04-16 * Removed pipeline and related structs/functions. [#335] [#335]: https://github.com/actix/actix-net/pull/335 diff --git a/actix-service/Cargo.toml b/actix-service/Cargo.toml index 1c82f703..7865cd86 100644 --- a/actix-service/Cargo.toml +++ b/actix-service/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-service" -version = "2.0.0-beta.5" +version = "2.0.0" authors = [ "Nikolay Kim ", "Rob Ede ", @@ -8,11 +8,8 @@ authors = [ ] description = "Service trait and combinators for representing asynchronous request/response operations." keywords = ["network", "framework", "async", "futures", "service"] -homepage = "https://actix.rs" -repository = "https://github.com/actix/actix-net.git" -documentation = "https://docs.rs/actix-service" -readme = "README.md" categories = ["network-programming", "asynchronous"] +repository = "https://github.com/actix/actix-net" license = "MIT OR Apache-2.0" edition = "2018" @@ -27,5 +24,5 @@ pin-project-lite = "0.2" [dev-dependencies] actix-rt = "2.0.0" -actix-utils = "3.0.0-beta.4" +actix-utils = "3.0.0" futures-util = { version = "0.3.7", default-features = false } diff --git a/actix-service/README.md b/actix-service/README.md index 54171274..913ac199 100644 --- a/actix-service/README.md +++ b/actix-service/README.md @@ -3,10 +3,10 @@ > Service trait and combinators for representing asynchronous request/response operations. [![crates.io](https://img.shields.io/crates/v/actix-service?label=latest)](https://crates.io/crates/actix-service) -[![Documentation](https://docs.rs/actix-service/badge.svg?version=2.0.0-beta.5)](https://docs.rs/actix-service/2.0.0-beta.5) +[![Documentation](https://docs.rs/actix-service/badge.svg?version=2.0.0)](https://docs.rs/actix-service/2.0.0) [![Version](https://img.shields.io/badge/rustc-1.46+-ab6000.svg)](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html) ![License](https://img.shields.io/crates/l/actix-service.svg) -[![Dependency Status](https://deps.rs/crate/actix-service/2.0.0-beta.5/status.svg)](https://deps.rs/crate/actix-service/2.0.0-beta.5) +[![Dependency Status](https://deps.rs/crate/actix-service/2.0.0/status.svg)](https://deps.rs/crate/actix-service/2.0.0) ![Download](https://img.shields.io/crates/d/actix-service.svg) [![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/NWpN5mmg3x) diff --git a/actix-tls/Cargo.toml b/actix-tls/Cargo.toml index 7fbd94b0..73395a14 100755 --- a/actix-tls/Cargo.toml +++ b/actix-tls/Cargo.toml @@ -42,8 +42,8 @@ uri = ["http"] [dependencies] actix-codec = "0.4.0-beta.1" actix-rt = { version = "2.2.0", default-features = false } -actix-service = "2.0.0-beta.5" -actix-utils = "3.0.0-beta.4" +actix-service = "2.0.0" +actix-utils = "3.0.0" derive_more = "0.99.5" futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } diff --git a/actix-tracing/Cargo.toml b/actix-tracing/Cargo.toml index ec2e4a7c..2ed2c434 100644 --- a/actix-tracing/Cargo.toml +++ b/actix-tracing/Cargo.toml @@ -16,8 +16,8 @@ name = "actix_tracing" path = "src/lib.rs" [dependencies] -actix-service = "2.0.0-beta.5" -actix-utils = "3.0.0-beta.4" +actix-service = "2.0.0" +actix-utils = "3.0.0" tracing = "0.1" tracing-futures = "0.2" diff --git a/actix-utils/CHANGES.md b/actix-utils/CHANGES.md index d14446de..79f171b4 100644 --- a/actix-utils/CHANGES.md +++ b/actix-utils/CHANGES.md @@ -3,6 +3,10 @@ ## Unreleased - 2021-xx-xx +## 3.0.0 - 2021-04-16 +* No significant changes from `3.0.0-beta.4`. + + ## 3.0.0-beta.4 - 2021-04-01 * Add `future::Either` type. [#305] diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index 8b593697..a94706a2 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -1,14 +1,14 @@ [package] name = "actix-utils" -version = "3.0.0-beta.4" +version = "3.0.0" authors = [ "Nikolay Kim ", "Rob Ede ", ] -description = "Utilities for the Actix ecosystem" +description = "Various utilities used in the Actix ecosystem" keywords = ["network", "framework", "async", "futures"] -repository = "https://github.com/actix/actix-net.git" categories = ["network-programming", "asynchronous"] +repository = "https://github.com/actix/actix-net" license = "MIT OR Apache-2.0" edition = "2018" diff --git a/actix-utils/src/lib.rs b/actix-utils/src/lib.rs index f94147ec..6d431d52 100644 --- a/actix-utils/src/lib.rs +++ b/actix-utils/src/lib.rs @@ -1,4 +1,4 @@ -//! Various utilities for the Actix ecosystem. +//! Various utilities used in the Actix ecosystem. #![deny(rust_2018_idioms, nonstandard_style)] #![warn(missing_docs)]