From 9f846643c1849c32045035ec66f2ba6e436b82f6 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Sun, 25 Oct 2020 21:31:33 +0800 Subject: [PATCH] more refactor for actix-server --- .github/workflows/linux.yml | 2 +- actix-codec/CHANGES.md | 5 +- actix-codec/Cargo.toml | 2 +- actix-rt/src/arbiter.rs | 9 +- actix-server/CHANGES.md | 11 ++- actix-server/src/accept.rs | 98 +++++++++---------- actix-server/src/builder.rs | 34 +++---- actix-server/src/config.rs | 16 ++-- actix-server/src/lib.rs | 10 ++ actix-server/src/socket.rs | 2 +- actix-server/src/waker_queue.rs | 35 +++++-- actix-server/src/worker.rs | 162 +++++++++++++------------------- actix-service/CHANGES.md | 2 +- actix-service/Cargo.toml | 2 +- actix-tls/Cargo.toml | 2 +- actix-utils/CHANGES.md | 3 +- actix-utils/Cargo.toml | 2 +- string/CHANGES.md | 3 + 18 files changed, 192 insertions(+), 208 deletions(-) diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml index 3e77e990..8f63c710 100644 --- a/.github/workflows/linux.yml +++ b/.github/workflows/linux.yml @@ -14,7 +14,7 @@ jobs: fail-fast: false matrix: version: - - 1.42.0 + - 1.43.0 - stable - nightly diff --git a/actix-codec/CHANGES.md b/actix-codec/CHANGES.md index 6aa05bcf..c7f2d576 100644 --- a/actix-codec/CHANGES.md +++ b/actix-codec/CHANGES.md @@ -1,8 +1,9 @@ # Changes ## Unreleased - 2020-xx-xx -* Update `tokio` dependency to 0.3.1 -* Update `tokio-util` dependency to 0.4 +* Upgrade `pin-project` to `1.0`. +* Update `tokio` dependency to 0.3.1. +* Update `tokio-util` dependency to 0.4. ## 0.3.0 - 2020-08-23 * No changes from beta 2. diff --git a/actix-codec/Cargo.toml b/actix-codec/Cargo.toml index 7280a43c..1ae07dfb 100644 --- a/actix-codec/Cargo.toml +++ b/actix-codec/Cargo.toml @@ -23,6 +23,6 @@ bytes = "0.5" futures-core = { version = "0.3.4", default-features = false } futures-sink = { version = "0.3.4", default-features = false } log = "0.4" -pin-project = "0.4.17" +pin-project = "1.0.0" tokio = "0.3.1" tokio-util = { version = "0.4.0", default-features = false, features = ["codec"] } diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 069a6af4..e4bfa8a6 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -7,21 +7,20 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::{Context, Poll}; use std::{fmt, thread}; +use copyless::BoxHelper; use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures_channel::oneshot::{channel, Canceled, Sender}; use futures_util::{ future::{self, FutureExt}, stream::Stream, }; +use smallvec::SmallVec; + +pub use tokio::task::JoinHandle; use crate::runtime::Runtime; use crate::system::System; -use copyless::BoxHelper; - -use smallvec::SmallVec; -pub use tokio::task::JoinHandle; - thread_local!( static ADDR: RefCell> = RefCell::new(None); static RUNNING: Cell = Cell::new(false); diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 6d5c2b73..37763323 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -1,11 +1,12 @@ # Changes ## Unreleased - 2020-xx-xx -* Update `mio` dependency to 0.7.3 -* `ServerBuilder::backlog` would accept `u32` instead of `i32` -* Use `concurrent-queue` to manage poll wakes instead of `futures::channel::mpsc`. -* Remove `AcceptNotify` type and pass `WakerQueue` to `WorkerClient` for notify the `Accept` more directly. -* Convert `mio::Stream` to `actix_rt::net::TcpStream`(`UnixStream` for uds) using `FromRawFd` and `IntoRawFd`(`IntoRawSocket` and `FromRawSocket` on windows). +* Update `mio` dependency to 0.7.3. +* Remove `socket2` dependency. +* `ServerBuilder::backlog` would accept `u32` instead of `i32`. +* Use `concurrent-queue` to manage poll wakes instead of `futures::channel::mpsc::unbounded`. +* Remove `AcceptNotify` type and pass `WakerQueue` to `Worker` for wake up the `Accept`'s `Poll`. +* Convert `mio::net::TcpStream` to `actix_rt::net::TcpStream`(`UnixStream` for uds) using `FromRawFd` and `IntoRawFd`(`FromRawSocket` and `IntoRawSocket` on windows). * Remove `AsyncRead` and `AsyncWrite` trait bound for `socket::FromStream` trait. ## 1.0.4 - 2020-09-12 diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 0ee8c128..c40bcd07 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -61,13 +61,13 @@ impl AcceptLoop { pub(crate) fn start( &mut self, socks: Vec<(Token, MioListener)>, - workers: Vec, + handles: Vec, ) { let srv = self.srv.take().expect("Can not re-use AcceptInfo"); let poll = self.poll.take().unwrap(); let waker = self.waker.clone(); - Accept::start(poll, waker, socks, srv, workers); + Accept::start(poll, waker, socks, srv, handles); } } @@ -75,14 +75,12 @@ impl AcceptLoop { struct Accept { poll: Poll, waker: WakerQueue, - workers: Vec, + handles: Vec, srv: Server, next: usize, backpressure: bool, } -const DELTA: usize = 100; - /// This function defines errors that are per-connection. Which basically /// means that if we get this error from `accept()` system call it means /// next connection might be ready to be accepted. @@ -102,7 +100,7 @@ impl Accept { waker: WakerQueue, socks: Vec<(Token, MioListener)>, srv: Server, - workers: Vec, + handles: Vec, ) { // Accept runs in its own thread and would want to spawn additional futures to current // actix system. @@ -112,7 +110,7 @@ impl Accept { .spawn(move || { System::set_current(sys); let (mut accept, sockets) = - Accept::new_with_sockets(poll, waker, socks, workers, srv); + Accept::new_with_sockets(poll, waker, socks, handles, srv); accept.poll_with(sockets); }) .unwrap(); @@ -122,7 +120,7 @@ impl Accept { poll: Poll, waker: WakerQueue, socks: Vec<(Token, MioListener)>, - workers: Vec, + handles: Vec, srv: Server, ) -> (Accept, Slab) { let mut sockets = Slab::new(); @@ -134,7 +132,7 @@ impl Accept { // Start listening for incoming connections poll.registry() - .register(&mut lst, MioToken(token + DELTA), Interest::READABLE) + .register(&mut lst, MioToken(token), Interest::READABLE) .unwrap_or_else(|e| panic!("Can not register io: {}", e)); entry.insert(ServerSocketInfo { @@ -148,7 +146,7 @@ impl Accept { let accept = Accept { poll, waker, - workers, + handles, srv, next: 0, backpressure: false, @@ -173,11 +171,20 @@ impl Accept { // necessary/good practice to actively drain the waker queue. WAKER_TOKEN => 'waker: loop { match self.waker.pop() { - // worker notify it's availability has change. we maybe want to recover - // from backpressure. - Ok(WakerInterest::Notify) => { + // worker notify it becomes available. we may want to recover from + // backpressure. + Ok(WakerInterest::WorkerAvailable) => { self.maybe_backpressure(&mut sockets, false); } + // a new worker thread is made and it's handle would be added to Accept + Ok(WakerInterest::Worker(handle)) => { + // maybe we want to recover from a backpressure. + self.maybe_backpressure(&mut sockets, false); + self.handles.push(handle); + } + // got timer interest and it's time to try register socket(s) again. + Ok(WakerInterest::Timer) => self.process_timer(&mut sockets), + Err(WakerQueueError::Empty) => break 'waker, Ok(WakerInterest::Pause) => { sockets.iter_mut().for_each(|(_, info)| { match self.deregister(info) { @@ -196,29 +203,14 @@ impl Accept { self.register_logged(token, info); }); } - Ok(WakerInterest::Stop) => { - return self.deregister_all(&mut sockets); - } - // a new worker thread is made and it's handle would be added to Accept - Ok(WakerInterest::Worker(handle)) => { - // maybe we want to recover from a backpressure. - self.maybe_backpressure(&mut sockets, false); - self.workers.push(handle); - } - // got timer interest and it's time to try register socket(s) again. - Ok(WakerInterest::Timer) => self.process_timer(&mut sockets), - Err(WakerQueueError::Empty) => break 'waker, - Err(WakerQueueError::Closed) => { + Ok(WakerInterest::Stop) | Err(WakerQueueError::Closed) => { return self.deregister_all(&mut sockets); } } }, _ => { let token = usize::from(token); - if token < DELTA { - continue; - } - self.accept(&mut sockets, token - DELTA); + self.accept(&mut sockets, token); } } } @@ -241,11 +233,9 @@ impl Accept { #[cfg(not(target_os = "windows"))] fn register(&self, token: usize, info: &mut ServerSocketInfo) -> io::Result<()> { - self.poll.registry().register( - &mut info.lst, - MioToken(token + DELTA), - Interest::READABLE, - ) + self.poll + .registry() + .register(&mut info.lst, MioToken(token), Interest::READABLE) } #[cfg(target_os = "windows")] @@ -255,11 +245,11 @@ impl Accept { // Calling reregister seems to fix the issue. self.poll .registry() - .register(&mut info.lst, mio::Token(token + DELTA), Interest::READABLE) + .register(&mut info.lst, mio::Token(token), Interest::READABLE) .or_else(|_| { self.poll.registry().reregister( &mut info.lst, - mio::Token(token + DELTA), + mio::Token(token), Interest::READABLE, ) }) @@ -298,8 +288,8 @@ impl Accept { fn accept_one(&mut self, sockets: &mut Slab, mut msg: Conn) { if self.backpressure { - while !self.workers.is_empty() { - match self.workers[self.next].send(msg) { + while !self.handles.is_empty() { + match self.handles[self.next].send(msg) { Ok(_) => { self.set_next(); break; @@ -308,13 +298,13 @@ impl Accept { // worker lost contact and could be gone. a message is sent to // `ServerBuilder` future to notify it a new worker should be made. // after that remove the fault worker. - self.srv.worker_faulted(self.workers[self.next].idx); + self.srv.worker_faulted(self.handles[self.next].idx); msg = tmp; - self.workers.swap_remove(self.next); - if self.workers.is_empty() { + self.handles.swap_remove(self.next); + if self.handles.is_empty() { error!("No workers"); return; - } else if self.workers.len() <= self.next { + } else if self.handles.len() <= self.next { self.next = 0; } continue; @@ -323,10 +313,10 @@ impl Accept { } } else { let mut idx = 0; - while idx < self.workers.len() { + while idx < self.handles.len() { idx += 1; - if self.workers[self.next].available() { - match self.workers[self.next].send(msg) { + if self.handles[self.next].available() { + match self.handles[self.next].send(msg) { Ok(_) => { self.set_next(); return; @@ -335,14 +325,14 @@ impl Accept { // `ServerBuilder` future to notify it a new worker should be made. // after that remove the fault worker and enter backpressure if necessary. Err(tmp) => { - self.srv.worker_faulted(self.workers[self.next].idx); + self.srv.worker_faulted(self.handles[self.next].idx); msg = tmp; - self.workers.swap_remove(self.next); - if self.workers.is_empty() { + self.handles.swap_remove(self.next); + if self.handles.is_empty() { error!("No workers"); self.maybe_backpressure(sockets, true); return; - } else if self.workers.len() <= self.next { + } else if self.handles.len() <= self.next { self.next = 0; } continue; @@ -357,9 +347,9 @@ impl Accept { } } - // set next worker that would accept work. + // set next worker handle that would accept work. fn set_next(&mut self) { - self.next = (self.next + 1) % self.workers.len(); + self.next = (self.next + 1) % self.handles.len(); } fn accept(&mut self, sockets: &mut Slab, token: usize) { @@ -375,9 +365,9 @@ impl Accept { Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return, Err(ref e) if connection_error(e) => continue, Err(e) => { - // deregister socket listener temporary + // deregister listener temporary error!("Error accepting connection: {}", e); - if let Err(err) = self.poll.registry().deregister(&mut info.lst) { + if let Err(err) = self.deregister(info) { error!("Can not deregister server socket {}", err); } diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index ea7a4046..2c35864c 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -30,7 +30,7 @@ pub struct ServerBuilder { threads: usize, token: Token, backlog: u32, - workers: Vec<(usize, WorkerHandle)>, + handles: Vec<(usize, WorkerHandle)>, services: Vec>, sockets: Vec<(Token, String, MioListener)>, accept: AcceptLoop, @@ -56,8 +56,8 @@ impl ServerBuilder { ServerBuilder { threads: num_cpus::get(), - token: Token(0), - workers: Vec::new(), + token: Token::default(), + handles: Vec::new(), services: Vec::new(), sockets: Vec::new(), accept: AcceptLoop::new(server.clone()), @@ -264,12 +264,12 @@ impl ServerBuilder { info!("Starting {} workers", self.threads); // start workers - let workers = (0..self.threads) + let handles = (0..self.threads) .map(|idx| { - let worker = self.start_worker(idx, self.accept.waker_owned()); - self.workers.push((idx, worker.clone())); + let handle = self.start_worker(idx, self.accept.waker_owned()); + self.handles.push((idx, handle.clone())); - worker + handle }) .collect(); @@ -282,7 +282,7 @@ impl ServerBuilder { .into_iter() .map(|t| (t.0, t.2)) .collect(), - workers, + handles, ); // handle signals @@ -359,9 +359,9 @@ impl ServerBuilder { let notify = std::mem::take(&mut self.notify); // stop workers - if !self.workers.is_empty() && graceful { + if !self.handles.is_empty() && graceful { spawn( - self.workers + self.handles .iter() .map(move |worker| worker.1.stop(graceful)) .collect::>() @@ -403,9 +403,9 @@ impl ServerBuilder { } ServerCommand::WorkerFaulted(idx) => { let mut found = false; - for i in 0..self.workers.len() { - if self.workers[i].0 == idx { - self.workers.swap_remove(i); + for i in 0..self.handles.len() { + if self.handles[i].0 == idx { + self.handles.swap_remove(i); found = true; break; } @@ -414,10 +414,10 @@ impl ServerBuilder { if found { error!("Worker has died {:?}, restarting", idx); - let mut new_idx = self.workers.len(); + let mut new_idx = self.handles.len(); 'found: loop { - for i in 0..self.workers.len() { - if self.workers[i].0 == new_idx { + for i in 0..self.handles.len() { + if self.handles[i].0 == new_idx { new_idx += 1; continue 'found; } @@ -426,7 +426,7 @@ impl ServerBuilder { } let handle = self.start_worker(new_idx, self.accept.waker_owned()); - self.workers.push((new_idx, handle.clone())); + self.handles.push((new_idx, handle.clone())); self.accept.wake(WakerInterest::Worker(handle)); } } diff --git a/actix-server/src/config.rs b/actix-server/src/config.rs index 6087efa9..cda21b74 100644 --- a/actix-server/src/config.rs +++ b/actix-server/src/config.rs @@ -60,14 +60,6 @@ impl ServiceConfig { self._listen(name, MioTcpListener::from_std(lst)) } - fn _listen>(&mut self, name: N, lst: MioTcpListener) -> &mut Self { - if self.apply.is_none() { - self.apply = Some(Box::new(not_configured)); - } - self.services.push((name.as_ref().to_string(), lst)); - self - } - /// Register service configuration function. This function get called /// during worker runtime configuration. It get executed in worker thread. pub fn apply(&mut self, f: F) -> io::Result<()> @@ -77,6 +69,14 @@ impl ServiceConfig { self.apply = Some(Box::new(f)); Ok(()) } + + fn _listen>(&mut self, name: N, lst: MioTcpListener) -> &mut Self { + if self.apply.is_none() { + self.apply = Some(Box::new(not_configured)); + } + self.services.push((name.as_ref().to_string(), lst)); + self + } } pub(super) struct ConfiguredService { diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index b15ff26e..eea63cd2 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -22,7 +22,17 @@ pub use self::socket::FromStream; #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] pub(crate) struct Token(usize); +impl Default for Token { + fn default() -> Self { + Self::new() + } +} + impl Token { + fn new() -> Self { + Self(0) + } + pub(crate) fn next(&mut self) -> Token { let token = Token(self.0); self.0 += 1; diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index d9c61c75..6eb67d3c 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -235,7 +235,7 @@ mod tests { assert_eq!(format!("{}", addr), "127.0.0.1:8080"); let addr: StdSocketAddr = "127.0.0.1:0".parse().unwrap(); - let socket = mio::net::TcpSocket::new_v4().unwrap(); + let socket = MioTcpSocket::new_v4().unwrap(); socket.set_reuseaddr(true).unwrap(); socket.bind(addr).unwrap(); let tcp = socket.listen(128).unwrap(); diff --git a/actix-server/src/waker_queue.rs b/actix-server/src/waker_queue.rs index 3a1a5ee3..fadfee15 100644 --- a/actix-server/src/waker_queue.rs +++ b/actix-server/src/waker_queue.rs @@ -1,3 +1,4 @@ +use std::ops::Deref; use std::sync::Arc; use concurrent_queue::{ConcurrentQueue, PopError}; @@ -6,7 +7,7 @@ use mio::{Registry, Token as MioToken, Waker}; use crate::worker::WorkerHandle; /// waker token for `mio::Poll` instance -pub(crate) const WAKER_TOKEN: MioToken = MioToken(1); +pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX); /// `mio::Waker` with a queue for waking up the `Accept`'s `Poll` and contains the `WakerInterest` /// the `Poll` would want to look into. @@ -18,6 +19,14 @@ impl Clone for WakerQueue { } } +impl Deref for WakerQueue { + type Target = (Waker, ConcurrentQueue); + + fn deref(&self) -> &Self::Target { + self.0.deref() + } +} + impl WakerQueue { /// construct a waker queue with given `Poll`'s `Registry` and capacity. /// @@ -32,16 +41,21 @@ impl WakerQueue { /// push a new interest to the queue and wake up the accept poll afterwards. pub(crate) fn wake(&self, interest: WakerInterest) { - // ToDo: should we handle error here? - let r = (self.0).1.push(interest); - assert!(r.is_ok()); + let (waker, queue) = self.deref(); - (self.0).0.wake().expect("can not wake up Accept Poll"); + // ToDo: should we handle error here? + queue + .push(interest) + .unwrap_or_else(|e| panic!("WakerQueue overflow: {}", e)); + + waker + .wake() + .unwrap_or_else(|e| panic!("can not wake up Accept Poll: {}", e)); } /// pop an `WakerInterest` from the back of the queue. pub(crate) fn pop(&self) -> Result { - (self.0).1.pop() + self.deref().1.pop() } } @@ -49,8 +63,9 @@ impl WakerQueue { /// /// *. These interests should not be confused with `mio::Interest` and mostly not I/O related pub(crate) enum WakerInterest { - /// Interest from `Worker` notifying `Accept` to run `maybe_backpressure` method - Notify, + /// `WorkerAvailable` is an interest from `Worker` notifying `Accept` there is a worker + /// available and can accept new tasks. + WorkerAvailable, /// `Pause`, `Resume`, `Stop` Interest are from `ServerBuilder` future. It listens to /// `ServerCommand` and notify `Accept` to do exactly these tasks. Pause, @@ -60,8 +75,8 @@ pub(crate) enum WakerInterest { /// connection `Accept` would deregister socket listener temporary and wake up the poll and /// register them again after the delayed future resolve. Timer, - /// `Worker` ins an interest happen after a worker runs into faulted state(This is determined by - /// if work can be sent to it successfully).`Accept` would be waked up and add the new + /// `WorkerNew` is an interest happen after a worker runs into faulted state(This is determined + /// by if work can be sent to it successfully).`Accept` would be waked up and add the new /// `WorkerHandle`. Worker(WorkerHandle), } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 05e383f8..54158492 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -48,15 +48,15 @@ pub fn max_concurrent_connections(num: usize) { MAX_CONNS.store(num, Ordering::Relaxed); } -pub(crate) fn num_connections() -> usize { - MAX_CONNS_COUNTER.with(|conns| conns.total()) -} - thread_local! { static MAX_CONNS_COUNTER: Counter = Counter::new(MAX_CONNS.load(Ordering::Relaxed)); } +pub(crate) fn num_connections() -> usize { + MAX_CONNS_COUNTER.with(|conns| conns.total()) +} + // a handle to worker that can send message to worker and share the availability of worker to other // thread. #[derive(Clone)] @@ -119,8 +119,9 @@ impl WorkerAvailability { pub fn set(&self, val: bool) { let old = self.available.swap(val, Ordering::Release); + // notify the accept on switched to available. if !old && val { - self.waker.wake(WakerInterest::Notify); + self.waker.wake(WakerInterest::WorkerAvailable); } } } @@ -174,6 +175,7 @@ impl Worker { let (tx2, rx2) = unbounded(); let avail = availability.clone(); + // every worker runs in it's own arbiter. Arbiter::new().send(Box::pin(async move { availability.set(false); let mut wrk = MAX_CONNS_COUNTER.with(move |conns| Worker { @@ -184,7 +186,7 @@ impl Worker { shutdown_timeout, services: Vec::new(), conns: conns.clone(), - state: WorkerState::Unavailable(Vec::new()), + state: WorkerState::Unavailable, }); let fut = wrk @@ -253,7 +255,7 @@ impl Worker { fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result { let mut ready = self.conns.available(cx); let mut failed = None; - for (idx, srv) in &mut self.services.iter_mut().enumerate() { + for (idx, srv) in self.services.iter_mut().enumerate() { if srv.status == WorkerServiceStatus::Available || srv.status == WorkerServiceStatus::Unavailable { @@ -299,7 +301,7 @@ impl Worker { enum WorkerState { Available, - Unavailable(Vec), + Unavailable, Restarting( usize, Token, @@ -345,43 +347,24 @@ impl Future for Worker { } match self.state { - WorkerState::Unavailable(ref mut conns) => { - let conn = conns.pop(); - match self.check_readiness(cx) { - Ok(true) => { - // process requests from wait queue - if let Some(conn) = conn { - let guard = self.conns.get(); - let _ = self.services[conn.token.0] - .service - .call((Some(guard), ServerMessage::Connect(conn.io))); - } else { - self.state = WorkerState::Available; - self.availability.set(true); - } - self.poll(cx) - } - Ok(false) => { - // push connection back to queue - if let Some(conn) = conn { - if let WorkerState::Unavailable(ref mut conns) = self.state { - conns.push(conn); - } - } - Poll::Pending - } - Err((token, idx)) => { - trace!( - "Service {:?} failed, restarting", - self.factories[idx].name(token) - ); - self.services[token.0].status = WorkerServiceStatus::Restarting; - self.state = - WorkerState::Restarting(idx, token, self.factories[idx].create()); - self.poll(cx) - } + WorkerState::Unavailable => match self.check_readiness(cx) { + Ok(true) => { + self.state = WorkerState::Available; + self.availability.set(true); + self.poll(cx) } - } + Ok(false) => Poll::Pending, + Err((token, idx)) => { + trace!( + "Service {:?} failed, restarting", + self.factories[idx].name(token) + ); + self.services[token.0].status = WorkerServiceStatus::Restarting; + self.state = + WorkerState::Restarting(idx, token, self.factories[idx].create()); + self.poll(cx) + } + }, WorkerState::Restarting(idx, token, ref mut fut) => { match fut.as_mut().poll(cx) { Poll::Ready(Ok(item)) => { @@ -392,18 +375,9 @@ impl Future for Worker { self.factories[idx].name(token) ); self.services[token.0].created(service); - self.state = WorkerState::Unavailable(Vec::new()); + self.state = WorkerState::Unavailable; return self.poll(cx); } - // for (token, service) in item { - // trace!( - // "Service {:?} has been restarted", - // self.factories[idx].name(token) - // ); - // self.services[token.0].created(service); - // self.state = WorkerState::Unavailable(Vec::new()); - // return self.poll(cx); - // } } Poll::Ready(Err(_)) => { panic!( @@ -411,9 +385,7 @@ impl Future for Worker { self.factories[idx].name(token) ); } - Poll::Pending => { - return Poll::Pending; - } + Poll::Pending => return Poll::Pending, } self.poll(cx) } @@ -441,49 +413,41 @@ impl Future for Worker { Poll::Pending } - WorkerState::Available => { - loop { - return match Pin::new(&mut self.rx).poll_next(cx) { - // handle incoming io stream - Poll::Ready(Some(WorkerCommand(msg))) => { - match self.check_readiness(cx) { - Ok(true) => { - let guard = self.conns.get(); - let _ = self.services[msg.token.0] - .service - .call((Some(guard), ServerMessage::Connect(msg.io))); - continue; - } - Ok(false) => { - trace!("Worker is unavailable"); - self.availability.set(false); - self.state = WorkerState::Unavailable(vec![msg]); - } - Err((token, idx)) => { - trace!( - "Service {:?} failed, restarting", - self.factories[idx].name(token) - ); - self.availability.set(false); - self.services[token.0].status = - WorkerServiceStatus::Restarting; - self.state = WorkerState::Restarting( - idx, - token, - self.factories[idx].create(), - ); - } - } - self.poll(cx) - } - Poll::Pending => { - self.state = WorkerState::Available; - Poll::Pending - } - Poll::Ready(None) => Poll::Ready(()), - }; + // actively poll stream and handle worker command + WorkerState::Available => loop { + match self.check_readiness(cx) { + Ok(true) => (), + Ok(false) => { + trace!("Worker is unavailable"); + self.availability.set(false); + self.state = WorkerState::Unavailable; + return self.poll(cx); + } + Err((token, idx)) => { + trace!( + "Service {:?} failed, restarting", + self.factories[idx].name(token) + ); + self.availability.set(false); + self.services[token.0].status = WorkerServiceStatus::Restarting; + self.state = + WorkerState::Restarting(idx, token, self.factories[idx].create()); + return self.poll(cx); + } } - } + + match Pin::new(&mut self.rx).poll_next(cx) { + // handle incoming io stream + Poll::Ready(Some(WorkerCommand(msg))) => { + let guard = self.conns.get(); + let _ = self.services[msg.token.0] + .service + .call((Some(guard), ServerMessage::Connect(msg.io))); + } + Poll::Pending => return Poll::Pending, + Poll::Ready(None) => return Poll::Ready(()), + }; + }, } } } diff --git a/actix-service/CHANGES.md b/actix-service/CHANGES.md index 011ac65e..82481f46 100644 --- a/actix-service/CHANGES.md +++ b/actix-service/CHANGES.md @@ -1,7 +1,7 @@ # Changes ## Unreleased - 2020-xx-xx - +* Upgrade `pin-project` to `1.0`. ## 1.0.6 - 2020-08-09 diff --git a/actix-service/Cargo.toml b/actix-service/Cargo.toml index 7c73e9bf..1505873b 100644 --- a/actix-service/Cargo.toml +++ b/actix-service/Cargo.toml @@ -18,7 +18,7 @@ path = "src/lib.rs" [dependencies] futures-util = "0.3.1" -pin-project = "0.4.17" +pin-project = "1.0.0" [dev-dependencies] actix-rt = "1.0.0" diff --git a/actix-tls/Cargo.toml b/actix-tls/Cargo.toml index 6e6de84e..b68e135c 100644 --- a/actix-tls/Cargo.toml +++ b/actix-tls/Cargo.toml @@ -47,9 +47,9 @@ tokio-openssl = { version = "0.5.0", optional = true } # rustls rust-tls = { package = "rustls", version = "0.18.0", optional = true } +tokio-rustls = { version = "0.20.0", optional = true } webpki = { version = "0.21", optional = true } webpki-roots = { version = "0.20", optional = true } -tokio-rustls = { version = "0.20.0", optional = true } # native-tls native-tls = { version = "0.2", optional = true } diff --git a/actix-utils/CHANGES.md b/actix-utils/CHANGES.md index 032fa778..fb02ddbb 100644 --- a/actix-utils/CHANGES.md +++ b/actix-utils/CHANGES.md @@ -1,7 +1,8 @@ # Changes ## Unreleased - 2020-xx-xx -* Update `bytes` to 0.6 +* Upgrade `pin-project` to `1.0`. +* Update `bytes` dependency to 0.6. ## 2.0.0 - 2020-08-23 * No changes from beta 1. diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index e9e3d3ba..92087ed8 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -26,5 +26,5 @@ futures-channel = { version = "0.3.4", default-features = false } futures-sink = { version = "0.3.4", default-features = false } futures-util = { version = "0.3.4", default-features = false } log = "0.4" -pin-project = "0.4.17" +pin-project = "1.0.0" slab = "0.4" diff --git a/string/CHANGES.md b/string/CHANGES.md index 030c3cd5..bd357a45 100644 --- a/string/CHANGES.md +++ b/string/CHANGES.md @@ -1,5 +1,8 @@ # Changes +## Unreleased - 2020-xx-xx +* Update `bytes` dependency to 0.6 + ## [0.1.5] - 2020-03-30 * Serde support