Merge branch 'master' into refactor/simplify_server_future

This commit is contained in:
fakeshadow 2021-04-02 05:54:38 -07:00 committed by GitHub
commit 47cc858ab4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 129 additions and 39 deletions

View File

@ -23,7 +23,7 @@ default = []
[dependencies]
actix-rt = { version = "2.0.0", default-features = false }
actix-service = "2.0.0-beta.5"
actix-utils = "3.0.0-beta.2"
actix-utils = "3.0.0-beta.4"
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
log = "0.4"

View File

@ -10,7 +10,7 @@ use actix_rt::{
System,
};
use actix_utils::counter::Counter;
use futures_core::future::LocalBoxFuture;
use futures_core::{future::LocalBoxFuture, ready};
use log::{error, info, trace};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot;
@ -274,20 +274,17 @@ impl ServerWorker {
}
fn shutdown(&mut self, force: bool) {
if force {
self.services.iter_mut().for_each(|srv| {
if srv.status == WorkerServiceStatus::Available {
srv.status = WorkerServiceStatus::Stopped;
}
});
self.services
.iter_mut()
.filter(|srv| srv.status == WorkerServiceStatus::Available)
.for_each(|srv| {
srv.status = if force {
WorkerServiceStatus::Stopped
} else {
self.services.iter_mut().for_each(move |srv| {
if srv.status == WorkerServiceStatus::Available {
srv.status = WorkerServiceStatus::Stopping;
}
WorkerServiceStatus::Stopping
};
});
}
}
fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result<bool, (Token, usize)> {
let mut ready = self.conns.available(cx);
@ -367,18 +364,12 @@ impl Future for ServerWorker {
return Poll::Ready(());
} else if graceful {
self.shutdown(false);
let num = num_connections();
if num != 0 {
info!("Graceful worker shutdown, {} connections", num);
self.state = WorkerState::Shutdown(
Box::pin(sleep(Duration::from_secs(1))),
Box::pin(sleep(self.config.shutdown_timeout)),
Some(result),
);
} else {
let _ = result.send(true);
return Poll::Ready(());
}
} else {
info!("Force shutdown worker, {} connections", num);
self.shutdown(true);
@ -475,16 +466,15 @@ impl Future for ServerWorker {
}
}
match Pin::new(&mut self.rx).poll_recv(cx) {
match ready!(Pin::new(&mut self.rx).poll_recv(cx)) {
// handle incoming io stream
Poll::Ready(Some(WorkerCommand(msg))) => {
Some(WorkerCommand(msg)) => {
let guard = self.conns.get();
let _ = self.services[msg.token.0]
.service
.call((Some(guard), msg.io));
}
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => return Poll::Ready(()),
None => return Poll::Ready(()),
};
},
}

View File

@ -43,7 +43,7 @@ uri = ["http"]
actix-codec = "0.4.0-beta.1"
actix-rt = { version = "2.2.0", default-features = false }
actix-service = "2.0.0-beta.5"
actix-utils = "3.0.0-beta.2"
actix-utils = "3.0.0-beta.4"
derive_more = "0.99.5"
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }

View File

@ -17,8 +17,8 @@ path = "src/lib.rs"
[dependencies]
actix-service = "2.0.0-beta.5"
actix-utils = "3.0.0-beta.4"
futures-util = { version = "0.3.7", default-features = false }
tracing = "0.1"
tracing-futures = "0.2"

View File

@ -9,7 +9,7 @@ use core::marker::PhantomData;
use actix_service::{
apply, ApplyTransform, IntoServiceFactory, Service, ServiceFactory, Transform,
};
use futures_util::future::{ok, Either, Ready};
use actix_utils::future::{ok, Either, Ready};
use tracing_futures::{Instrument, Instrumented};
/// A `Service` implementation that automatically enters/exits tracing spans
@ -48,9 +48,9 @@ where
.clone()
.map(|span| tracing::span!(parent: &span, tracing::Level::INFO, "future"))
{
Either::Right(fut.instrument(span))
Either::right(fut.instrument(span))
} else {
Either::Left(fut)
Either::left(fut)
}
}
}

View File

@ -3,6 +3,12 @@
## Unreleased - 2021-xx-xx
## 3.0.0-beta.4 - 2021-04-01
* Add `future::Either` type. [#305]
[#305]: https://github.com/actix/actix-net/pull/305
## 3.0.0-beta.3 - 2021-04-01
* Moved `mpsc` to own crate `local-channel`. [#301]
* Moved `task::LocalWaker` to own crate `local-waker`. [#301]

View File

@ -1,6 +1,6 @@
[package]
name = "actix-utils"
version = "3.0.0-beta.3"
version = "3.0.0-beta.4"
authors = [
"Nikolay Kim <fafhrd91@gmail.com>",
"Rob Ede <robjtede@icloud.com>",
@ -17,6 +17,7 @@ name = "actix_utils"
path = "src/lib.rs"
[dependencies]
pin-project-lite = "0.2"
local-waker = "0.1"
[dev-dependencies]

View File

@ -0,0 +1,91 @@
//! A symmetric either future.
use core::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use pin_project_lite::pin_project;
pin_project! {
/// Combines two different futures that have the same output type.
///
/// Construct variants with [`Either::left`] and [`Either::right`].
///
/// # Examples
/// ```
/// use actix_utils::future::{ready, Ready, Either};
///
/// # async fn run() {
/// let res = Either::<_, Ready<usize>>::left(ready(42));
/// assert_eq!(res.await, 42);
///
/// let res = Either::<Ready<usize>, _>::right(ready(43));
/// assert_eq!(res.await, 43);
/// # }
/// ```
#[project = EitherProj]
#[derive(Debug, Clone)]
pub enum Either<L, R> {
/// A value of type `L`.
#[allow(missing_docs)]
Left { #[pin] value: L },
/// A value of type `R`.
#[allow(missing_docs)]
Right { #[pin] value: R },
}
}
impl<L, R> Either<L, R> {
/// Creates new `Either` using left variant.
pub fn left(value: L) -> Either<L, R> {
Either::Left { value }
}
/// Creates new `Either` using right variant.
pub fn right(value: R) -> Either<L, R> {
Either::Right { value }
}
}
impl<T> Either<T, T> {
/// Unwraps into inner value when left and right have a common type.
pub fn into_inner(self) -> T {
match self {
Either::Left { value } => value,
Either::Right { value } => value,
}
}
}
impl<L, R> Future for Either<L, R>
where
L: Future,
R: Future<Output = L::Output>,
{
type Output = L::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project() {
EitherProj::Left { value } => value.poll(cx),
EitherProj::Right { value } => value.poll(cx),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::future::{ready, Ready};
#[actix_rt::test]
async fn test_either() {
let res = Either::<_, Ready<usize>>::left(ready(42));
assert_eq!(res.await, 42);
let res = Either::<Ready<usize>, _>::right(ready(43));
assert_eq!(res.await, 43);
}
}

View File

@ -1,7 +1,9 @@
//! Asynchronous values.
mod either;
mod poll_fn;
mod ready;
pub use self::either::Either;
pub use self::poll_fn::{poll_fn, PollFn};
pub use self::ready::{err, ok, ready, Ready};

View File

@ -7,7 +7,7 @@ use core::{
task::{Context, Poll},
};
/// Create a future driven by the provided function that receives a task context.
/// Creates a future driven by the provided function that receives a task context.
pub fn poll_fn<F, T>(f: F) -> PollFn<F>
where
F: FnMut(&mut Context<'_>) -> Poll<T>,

View File

@ -69,7 +69,7 @@ pub fn ready<T>(val: T) -> Ready<T> {
Ready { val: Some(val) }
}
/// Create a future that is immediately ready with a success value.
/// Creates a future that is immediately ready with a success value.
///
/// # Examples
/// ```no_run
@ -84,7 +84,7 @@ pub fn ok<T, E>(val: T) -> Ready<Result<T, E>> {
Ready { val: Some(Ok(val)) }
}
/// Create a future that is immediately ready with an error value.
/// Creates a future that is immediately ready with an error value.
///
/// # Examples
/// ```no_run