Merge branch 'master' into refactor/simplify_server_future

This commit is contained in:
fakeshadow 2021-04-17 04:18:11 -07:00 committed by GitHub
commit d38c088139
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 69 additions and 58 deletions

View File

@ -186,7 +186,7 @@
same "printed page" as the copyright notice for easier same "printed page" as the copyright notice for easier
identification within third-party archives. identification within third-party archives.
Copyright 2017-NOW Nikolay Kim Copyright 2017-NOW Actix Team
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.

View File

@ -1,4 +1,4 @@
Copyright (c) 2017 Nikolay Kim Copyright (c) 2017-NOW Actix Team
Permission is hereby granted, free of charge, to any Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated person obtaining a copy of this software and associated

View File

@ -7,7 +7,7 @@
//! [`Sink`]: futures_sink::Sink //! [`Sink`]: futures_sink::Sink
//! [`Stream`]: futures_core::Stream //! [`Stream`]: futures_core::Stream
#![deny(rust_2018_idioms, nonstandard_style)] #![deny(rust_2018_idioms, nonstandard_style, future_incompatible)]
#![warn(missing_docs)] #![warn(missing_docs)]
#![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")]

View File

@ -581,10 +581,7 @@ impl ResourceDef {
mut for_prefix: bool, mut for_prefix: bool,
) -> (String, Vec<PatternElement>, bool, usize) { ) -> (String, Vec<PatternElement>, bool, usize) {
if pattern.find('{').is_none() { if pattern.find('{').is_none() {
// TODO: MSRV: 1.45 return if let Some(path) = pattern.strip_suffix('*') {
#[allow(clippy::manual_strip)]
return if pattern.ends_with('*') {
let path = &pattern[..pattern.len() - 1];
let re = String::from("^") + path + "(.*)"; let re = String::from("^") + path + "(.*)";
(re, vec![PatternElement::Str(String::from(path))], true, 0) (re, vec![PatternElement::Str(String::from(path))], true, 0)
} else { } else {

View File

@ -22,8 +22,8 @@ default = []
[dependencies] [dependencies]
actix-rt = { version = "2.0.0", default-features = false } actix-rt = { version = "2.0.0", default-features = false }
actix-service = "2.0.0-beta.5" actix-service = "2.0.0"
actix-utils = "3.0.0-beta.4" actix-utils = "3.0.0"
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
log = "0.4" log = "0.4"

View File

@ -73,14 +73,11 @@ impl Availability {
panic!("Max WorkerHandle count is 512") panic!("Max WorkerHandle count is 512")
}; };
let off = 1 << idx as u128;
if avail { if avail {
self.0[offset] |= 1 << idx as u128; self.0[offset] |= off;
} else { } else {
let shift = 1 << idx as u128; self.0[offset] &= !off
debug_assert_ne!(self.0[offset] & shift, 0);
self.0[offset] ^= shift;
} }
} }
@ -88,7 +85,7 @@ impl Availability {
/// This would result in a re-check on all workers' availability. /// This would result in a re-check on all workers' availability.
fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) { fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) {
handles.iter().for_each(|handle| { handles.iter().for_each(|handle| {
self.set_available(handle.idx, true); self.set_available(handle.idx(), true);
}) })
} }
} }
@ -238,12 +235,10 @@ impl Accept {
match guard.pop_front() { match guard.pop_front() {
// worker notify it becomes available. we may want to recover // worker notify it becomes available. we may want to recover
// from backpressure. // from backpressure.
Some(WakerInterest::WorkerAvailable) => { Some(WakerInterest::WorkerAvailable(idx)) => {
drop(guard); drop(guard);
self.maybe_backpressure(sockets, false); self.maybe_backpressure(sockets, false);
self.avail.set_available(idx, true);
// Assume all worker are avail as no worker index returned.
self.avail.set_available_all(&self.handles);
} }
// a new worker thread is made and it's handle would be added to Accept // a new worker thread is made and it's handle would be added to Accept
Some(WakerInterest::Worker(handle)) => { Some(WakerInterest::Worker(handle)) => {
@ -406,7 +401,7 @@ impl Accept {
} else { } else {
while self.avail.available() { while self.avail.available() {
let next = self.next(); let next = self.next();
let idx = next.idx; let idx = next.idx();
if next.available() { if next.available() {
self.avail.set_available(idx, true); self.avail.set_available(idx, true);
match self.send_connection(sockets, conn) { match self.send_connection(sockets, conn) {
@ -503,7 +498,7 @@ impl Accept {
/// Remove next worker handle that fail to accept connection. /// Remove next worker handle that fail to accept connection.
fn remove_next(&mut self) { fn remove_next(&mut self) {
let handle = self.handles.swap_remove(self.next); let handle = self.handles.swap_remove(self.next);
let idx = handle.idx; let idx = handle.idx();
// A message is sent to `ServerBuilder` future to notify it a new worker // A message is sent to `ServerBuilder` future to notify it a new worker
// should be made. // should be made.
self.srv.worker_faulted(idx); self.srv.worker_faulted(idx);
@ -523,6 +518,9 @@ mod test {
aval.set_available(idx, false); aval.set_available(idx, false);
assert!(!aval.available()); assert!(!aval.available());
aval.set_available(idx, false);
assert!(!aval.available());
} }
fn multi(aval: &mut Availability, mut idx: Vec<usize>) { fn multi(aval: &mut Availability, mut idx: Vec<usize>) {
@ -561,13 +559,6 @@ mod test {
single(&mut aval, 512); single(&mut aval, 512);
} }
#[test]
#[should_panic]
fn double_set_unavailable() {
let mut aval = Availability::default();
aval.set_available(233, false);
}
#[test] #[test]
fn pin_point() { fn pin_point() {
let mut aval = Availability::default(); let mut aval = Availability::default();

View File

@ -72,7 +72,7 @@ impl WakerQueue {
pub(crate) enum WakerInterest { pub(crate) enum WakerInterest {
/// `WorkerAvailable` is an interest from `Worker` notifying `Accept` there is a worker /// `WorkerAvailable` is an interest from `Worker` notifying `Accept` there is a worker
/// available and can accept new tasks. /// available and can accept new tasks.
WorkerAvailable, WorkerAvailable(usize),
/// `Pause`, `Resume`, `Stop` Interest are from `ServerBuilder` future. It listens to /// `Pause`, `Resume`, `Stop` Interest are from `ServerBuilder` future. It listens to
/// `ServerCommand` and notify `Accept` to do exactly these tasks. /// `ServerCommand` and notify `Accept` to do exactly these tasks.
Pause, Pause,

View File

@ -47,11 +47,7 @@ fn handle_pair(
tx2: UnboundedSender<Stop>, tx2: UnboundedSender<Stop>,
avail: WorkerAvailability, avail: WorkerAvailability,
) -> (WorkerHandleAccept, WorkerHandleServer) { ) -> (WorkerHandleAccept, WorkerHandleServer) {
let accept = WorkerHandleAccept { let accept = WorkerHandleAccept { tx: tx1, avail };
idx,
tx: tx1,
avail,
};
let server = WorkerHandleServer { idx, tx: tx2 }; let server = WorkerHandleServer { idx, tx: tx2 };
@ -63,16 +59,22 @@ fn handle_pair(
/// ///
/// Held by [Accept](crate::accept::Accept). /// Held by [Accept](crate::accept::Accept).
pub(crate) struct WorkerHandleAccept { pub(crate) struct WorkerHandleAccept {
pub idx: usize,
tx: UnboundedSender<Conn>, tx: UnboundedSender<Conn>,
avail: WorkerAvailability, avail: WorkerAvailability,
} }
impl WorkerHandleAccept { impl WorkerHandleAccept {
#[inline(always)]
pub(crate) fn idx(&self) -> usize {
self.avail.idx
}
#[inline(always)]
pub(crate) fn send(&self, msg: Conn) -> Result<(), Conn> { pub(crate) fn send(&self, msg: Conn) -> Result<(), Conn> {
self.tx.send(msg).map_err(|msg| msg.0) self.tx.send(msg).map_err(|msg| msg.0)
} }
#[inline(always)]
pub(crate) fn available(&self) -> bool { pub(crate) fn available(&self) -> bool {
self.avail.available() self.avail.available()
} }
@ -96,27 +98,34 @@ impl WorkerHandleServer {
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct WorkerAvailability { pub(crate) struct WorkerAvailability {
idx: usize,
waker: WakerQueue, waker: WakerQueue,
available: Arc<AtomicBool>, available: Arc<AtomicBool>,
} }
impl WorkerAvailability { impl WorkerAvailability {
pub fn new(waker: WakerQueue) -> Self { pub fn new(idx: usize, waker: WakerQueue) -> Self {
WorkerAvailability { WorkerAvailability {
idx,
waker, waker,
available: Arc::new(AtomicBool::new(false)), available: Arc::new(AtomicBool::new(false)),
} }
} }
#[inline(always)]
pub fn available(&self) -> bool { pub fn available(&self) -> bool {
self.available.load(Ordering::Acquire) self.available.load(Ordering::Acquire)
} }
pub fn set(&self, val: bool) { pub fn set(&self, val: bool) {
let old = self.available.swap(val, Ordering::Release); // Ordering:
// notify the accept on switched to available. //
// There could be multiple set calls happen in one <ServerWorker as Future>::poll.
// Order is important between them.
let old = self.available.swap(val, Ordering::AcqRel);
// Notify the accept on switched to available.
if !old && val { if !old && val {
self.waker.wake(WakerInterest::WorkerAvailable); self.waker.wake(WakerInterest::WorkerAvailable(self.idx));
} }
} }
} }
@ -458,6 +467,15 @@ impl Default for WorkerState {
} }
} }
impl Drop for ServerWorker {
fn drop(&mut self) {
// Set availability to true so if accept try to send connection to this worker
// it would find worker is gone and remove it.
// This is helpful when worker is dropped unexpected.
self.availability.set(true);
}
}
impl Future for ServerWorker { impl Future for ServerWorker {
type Output = (); type Output = ();

View File

@ -440,6 +440,7 @@ async fn test_service_restart() {
let _ = h.join().unwrap(); let _ = h.join().unwrap();
} }
#[ignore]
#[actix_rt::test] #[actix_rt::test]
async fn worker_restart() { async fn worker_restart() {
use actix_service::{Service, ServiceFactory}; use actix_service::{Service, ServiceFactory};

View File

@ -1,6 +1,9 @@
# Changes # Changes
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
## 2.0.0 - 2021-04-16
* Removed pipeline and related structs/functions. [#335] * Removed pipeline and related structs/functions. [#335]
[#335]: https://github.com/actix/actix-net/pull/335 [#335]: https://github.com/actix/actix-net/pull/335

View File

@ -1,6 +1,6 @@
[package] [package]
name = "actix-service" name = "actix-service"
version = "2.0.0-beta.5" version = "2.0.0"
authors = [ authors = [
"Nikolay Kim <fafhrd91@gmail.com>", "Nikolay Kim <fafhrd91@gmail.com>",
"Rob Ede <robjtede@icloud.com>", "Rob Ede <robjtede@icloud.com>",
@ -8,11 +8,8 @@ authors = [
] ]
description = "Service trait and combinators for representing asynchronous request/response operations." description = "Service trait and combinators for representing asynchronous request/response operations."
keywords = ["network", "framework", "async", "futures", "service"] keywords = ["network", "framework", "async", "futures", "service"]
homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git"
documentation = "https://docs.rs/actix-service"
readme = "README.md"
categories = ["network-programming", "asynchronous"] categories = ["network-programming", "asynchronous"]
repository = "https://github.com/actix/actix-net"
license = "MIT OR Apache-2.0" license = "MIT OR Apache-2.0"
edition = "2018" edition = "2018"
@ -27,5 +24,5 @@ pin-project-lite = "0.2"
[dev-dependencies] [dev-dependencies]
actix-rt = "2.0.0" actix-rt = "2.0.0"
actix-utils = "3.0.0-beta.4" actix-utils = "3.0.0"
futures-util = { version = "0.3.7", default-features = false } futures-util = { version = "0.3.7", default-features = false }

View File

@ -3,10 +3,10 @@
> Service trait and combinators for representing asynchronous request/response operations. > Service trait and combinators for representing asynchronous request/response operations.
[![crates.io](https://img.shields.io/crates/v/actix-service?label=latest)](https://crates.io/crates/actix-service) [![crates.io](https://img.shields.io/crates/v/actix-service?label=latest)](https://crates.io/crates/actix-service)
[![Documentation](https://docs.rs/actix-service/badge.svg?version=2.0.0-beta.5)](https://docs.rs/actix-service/2.0.0-beta.5) [![Documentation](https://docs.rs/actix-service/badge.svg?version=2.0.0)](https://docs.rs/actix-service/2.0.0)
[![Version](https://img.shields.io/badge/rustc-1.46+-ab6000.svg)](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html) [![Version](https://img.shields.io/badge/rustc-1.46+-ab6000.svg)](https://blog.rust-lang.org/2020/03/12/Rust-1.46.html)
![License](https://img.shields.io/crates/l/actix-service.svg) ![License](https://img.shields.io/crates/l/actix-service.svg)
[![Dependency Status](https://deps.rs/crate/actix-service/2.0.0-beta.5/status.svg)](https://deps.rs/crate/actix-service/2.0.0-beta.5) [![Dependency Status](https://deps.rs/crate/actix-service/2.0.0/status.svg)](https://deps.rs/crate/actix-service/2.0.0)
![Download](https://img.shields.io/crates/d/actix-service.svg) ![Download](https://img.shields.io/crates/d/actix-service.svg)
[![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/NWpN5mmg3x) [![Chat on Discord](https://img.shields.io/discord/771444961383153695?label=chat&logo=discord)](https://discord.gg/NWpN5mmg3x)

View File

@ -1,7 +1,7 @@
//! See [`Service`] docs for information on this crate's foundational trait. //! See [`Service`] docs for information on this crate's foundational trait.
#![no_std] #![no_std]
#![deny(rust_2018_idioms, nonstandard_style)] #![deny(rust_2018_idioms, nonstandard_style, future_incompatible)]
#![warn(missing_docs)] #![warn(missing_docs)]
#![allow(clippy::type_complexity)] #![allow(clippy::type_complexity)]
#![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_logo_url = "https://actix.rs/img/logo.png")]

View File

@ -42,8 +42,8 @@ uri = ["http"]
[dependencies] [dependencies]
actix-codec = "0.4.0-beta.1" actix-codec = "0.4.0-beta.1"
actix-rt = { version = "2.2.0", default-features = false } actix-rt = { version = "2.2.0", default-features = false }
actix-service = "2.0.0-beta.5" actix-service = "2.0.0"
actix-utils = "3.0.0-beta.4" actix-utils = "3.0.0"
derive_more = "0.99.5" derive_more = "0.99.5"
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }

View File

@ -56,7 +56,7 @@ pub enum Resolver {
/// An interface for custom async DNS resolvers. /// An interface for custom async DNS resolvers.
/// ///
/// # Usage /// # Usage
/// ```rust /// ```
/// use std::net::SocketAddr; /// use std::net::SocketAddr;
/// ///
/// use actix_tls::connect::{Resolve, Resolver}; /// use actix_tls::connect::{Resolve, Resolver};

View File

@ -16,8 +16,8 @@ name = "actix_tracing"
path = "src/lib.rs" path = "src/lib.rs"
[dependencies] [dependencies]
actix-service = "2.0.0-beta.5" actix-service = "2.0.0"
actix-utils = "3.0.0-beta.4" actix-utils = "3.0.0"
tracing = "0.1" tracing = "0.1"
tracing-futures = "0.2" tracing-futures = "0.2"

View File

@ -3,6 +3,10 @@
## Unreleased - 2021-xx-xx ## Unreleased - 2021-xx-xx
## 3.0.0 - 2021-04-16
* No significant changes from `3.0.0-beta.4`.
## 3.0.0-beta.4 - 2021-04-01 ## 3.0.0-beta.4 - 2021-04-01
* Add `future::Either` type. [#305] * Add `future::Either` type. [#305]

View File

@ -1,14 +1,14 @@
[package] [package]
name = "actix-utils" name = "actix-utils"
version = "3.0.0-beta.4" version = "3.0.0"
authors = [ authors = [
"Nikolay Kim <fafhrd91@gmail.com>", "Nikolay Kim <fafhrd91@gmail.com>",
"Rob Ede <robjtede@icloud.com>", "Rob Ede <robjtede@icloud.com>",
] ]
description = "Utilities for the Actix ecosystem" description = "Various utilities used in the Actix ecosystem"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
repository = "https://github.com/actix/actix-net.git"
categories = ["network-programming", "asynchronous"] categories = ["network-programming", "asynchronous"]
repository = "https://github.com/actix/actix-net"
license = "MIT OR Apache-2.0" license = "MIT OR Apache-2.0"
edition = "2018" edition = "2018"

View File

@ -1,4 +1,4 @@
//! Various utilities for the Actix ecosystem. //! Various utilities used in the Actix ecosystem.
#![deny(rust_2018_idioms, nonstandard_style)] #![deny(rust_2018_idioms, nonstandard_style)]
#![warn(missing_docs)] #![warn(missing_docs)]