diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index de9f9223..a96c03af 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -23,7 +23,7 @@ default = [] [dependencies] actix-rt = { version = "2.0.0", default-features = false } actix-service = "2.0.0-beta.5" -actix-utils = "3.0.0-beta.2" +actix-utils = "3.0.0-beta.4" futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } log = "0.4" diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 7e913446..8fac9e51 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -10,7 +10,7 @@ use actix_rt::{ System, }; use actix_utils::counter::Counter; -use futures_core::future::LocalBoxFuture; +use futures_core::{future::LocalBoxFuture, ready}; use log::{error, info, trace}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot; @@ -274,19 +274,16 @@ impl ServerWorker { } fn shutdown(&mut self, force: bool) { - if force { - self.services.iter_mut().for_each(|srv| { - if srv.status == WorkerServiceStatus::Available { - srv.status = WorkerServiceStatus::Stopped; - } + self.services + .iter_mut() + .filter(|srv| srv.status == WorkerServiceStatus::Available) + .for_each(|srv| { + srv.status = if force { + WorkerServiceStatus::Stopped + } else { + WorkerServiceStatus::Stopping + }; }); - } else { - self.services.iter_mut().for_each(move |srv| { - if srv.status == WorkerServiceStatus::Available { - srv.status = WorkerServiceStatus::Stopping; - } - }); - } } fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result { @@ -367,18 +364,12 @@ impl Future for ServerWorker { return Poll::Ready(()); } else if graceful { self.shutdown(false); - let num = num_connections(); - if num != 0 { - info!("Graceful worker shutdown, {} connections", num); - self.state = WorkerState::Shutdown( - Box::pin(sleep(Duration::from_secs(1))), - Box::pin(sleep(self.config.shutdown_timeout)), - Some(result), - ); - } else { - let _ = result.send(true); - return Poll::Ready(()); - } + info!("Graceful worker shutdown, {} connections", num); + self.state = WorkerState::Shutdown( + Box::pin(sleep(Duration::from_secs(1))), + Box::pin(sleep(self.config.shutdown_timeout)), + Some(result), + ); } else { info!("Force shutdown worker, {} connections", num); self.shutdown(true); @@ -475,16 +466,15 @@ impl Future for ServerWorker { } } - match Pin::new(&mut self.rx).poll_recv(cx) { + match ready!(Pin::new(&mut self.rx).poll_recv(cx)) { // handle incoming io stream - Poll::Ready(Some(WorkerCommand(msg))) => { + Some(WorkerCommand(msg)) => { let guard = self.conns.get(); let _ = self.services[msg.token.0] .service .call((Some(guard), msg.io)); } - Poll::Pending => return Poll::Pending, - Poll::Ready(None) => return Poll::Ready(()), + None => return Poll::Ready(()), }; }, } diff --git a/actix-tls/Cargo.toml b/actix-tls/Cargo.toml index 3f767a28..7fbd94b0 100755 --- a/actix-tls/Cargo.toml +++ b/actix-tls/Cargo.toml @@ -43,7 +43,7 @@ uri = ["http"] actix-codec = "0.4.0-beta.1" actix-rt = { version = "2.2.0", default-features = false } actix-service = "2.0.0-beta.5" -actix-utils = "3.0.0-beta.2" +actix-utils = "3.0.0-beta.4" derive_more = "0.99.5" futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] } diff --git a/actix-tracing/Cargo.toml b/actix-tracing/Cargo.toml index 992edbf4..ec2e4a7c 100644 --- a/actix-tracing/Cargo.toml +++ b/actix-tracing/Cargo.toml @@ -17,8 +17,8 @@ path = "src/lib.rs" [dependencies] actix-service = "2.0.0-beta.5" +actix-utils = "3.0.0-beta.4" -futures-util = { version = "0.3.7", default-features = false } tracing = "0.1" tracing-futures = "0.2" diff --git a/actix-tracing/src/lib.rs b/actix-tracing/src/lib.rs index 89e93be1..fcc1488d 100644 --- a/actix-tracing/src/lib.rs +++ b/actix-tracing/src/lib.rs @@ -9,7 +9,7 @@ use core::marker::PhantomData; use actix_service::{ apply, ApplyTransform, IntoServiceFactory, Service, ServiceFactory, Transform, }; -use futures_util::future::{ok, Either, Ready}; +use actix_utils::future::{ok, Either, Ready}; use tracing_futures::{Instrument, Instrumented}; /// A `Service` implementation that automatically enters/exits tracing spans @@ -48,9 +48,9 @@ where .clone() .map(|span| tracing::span!(parent: &span, tracing::Level::INFO, "future")) { - Either::Right(fut.instrument(span)) + Either::right(fut.instrument(span)) } else { - Either::Left(fut) + Either::left(fut) } } } diff --git a/actix-utils/CHANGES.md b/actix-utils/CHANGES.md index 57ab7add..d14446de 100644 --- a/actix-utils/CHANGES.md +++ b/actix-utils/CHANGES.md @@ -3,6 +3,12 @@ ## Unreleased - 2021-xx-xx +## 3.0.0-beta.4 - 2021-04-01 +* Add `future::Either` type. [#305] + +[#305]: https://github.com/actix/actix-net/pull/305 + + ## 3.0.0-beta.3 - 2021-04-01 * Moved `mpsc` to own crate `local-channel`. [#301] * Moved `task::LocalWaker` to own crate `local-waker`. [#301] diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index 019d6d98..8b593697 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-utils" -version = "3.0.0-beta.3" +version = "3.0.0-beta.4" authors = [ "Nikolay Kim ", "Rob Ede ", @@ -17,6 +17,7 @@ name = "actix_utils" path = "src/lib.rs" [dependencies] +pin-project-lite = "0.2" local-waker = "0.1" [dev-dependencies] diff --git a/actix-utils/src/future/either.rs b/actix-utils/src/future/either.rs new file mode 100644 index 00000000..77b2118d --- /dev/null +++ b/actix-utils/src/future/either.rs @@ -0,0 +1,91 @@ +//! A symmetric either future. + +use core::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use pin_project_lite::pin_project; + +pin_project! { + /// Combines two different futures that have the same output type. + /// + /// Construct variants with [`Either::left`] and [`Either::right`]. + /// + /// # Examples + /// ``` + /// use actix_utils::future::{ready, Ready, Either}; + /// + /// # async fn run() { + /// let res = Either::<_, Ready>::left(ready(42)); + /// assert_eq!(res.await, 42); + /// + /// let res = Either::, _>::right(ready(43)); + /// assert_eq!(res.await, 43); + /// # } + /// ``` + #[project = EitherProj] + #[derive(Debug, Clone)] + pub enum Either { + /// A value of type `L`. + #[allow(missing_docs)] + Left { #[pin] value: L }, + + /// A value of type `R`. + #[allow(missing_docs)] + Right { #[pin] value: R }, + } +} + +impl Either { + /// Creates new `Either` using left variant. + pub fn left(value: L) -> Either { + Either::Left { value } + } + + /// Creates new `Either` using right variant. + pub fn right(value: R) -> Either { + Either::Right { value } + } +} + +impl Either { + /// Unwraps into inner value when left and right have a common type. + pub fn into_inner(self) -> T { + match self { + Either::Left { value } => value, + Either::Right { value } => value, + } + } +} + +impl Future for Either +where + L: Future, + R: Future, +{ + type Output = L::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.project() { + EitherProj::Left { value } => value.poll(cx), + EitherProj::Right { value } => value.poll(cx), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::future::{ready, Ready}; + + #[actix_rt::test] + async fn test_either() { + let res = Either::<_, Ready>::left(ready(42)); + assert_eq!(res.await, 42); + + let res = Either::, _>::right(ready(43)); + assert_eq!(res.await, 43); + } +} diff --git a/actix-utils/src/future/mod.rs b/actix-utils/src/future/mod.rs index 0ad84ec7..be3807bf 100644 --- a/actix-utils/src/future/mod.rs +++ b/actix-utils/src/future/mod.rs @@ -1,7 +1,9 @@ //! Asynchronous values. +mod either; mod poll_fn; mod ready; +pub use self::either::Either; pub use self::poll_fn::{poll_fn, PollFn}; pub use self::ready::{err, ok, ready, Ready}; diff --git a/actix-utils/src/future/poll_fn.rs b/actix-utils/src/future/poll_fn.rs index 2e5285d8..5e911bf5 100644 --- a/actix-utils/src/future/poll_fn.rs +++ b/actix-utils/src/future/poll_fn.rs @@ -7,7 +7,7 @@ use core::{ task::{Context, Poll}, }; -/// Create a future driven by the provided function that receives a task context. +/// Creates a future driven by the provided function that receives a task context. pub fn poll_fn(f: F) -> PollFn where F: FnMut(&mut Context<'_>) -> Poll, diff --git a/actix-utils/src/future/ready.rs b/actix-utils/src/future/ready.rs index be2ee146..4a01ada3 100644 --- a/actix-utils/src/future/ready.rs +++ b/actix-utils/src/future/ready.rs @@ -69,7 +69,7 @@ pub fn ready(val: T) -> Ready { Ready { val: Some(val) } } -/// Create a future that is immediately ready with a success value. +/// Creates a future that is immediately ready with a success value. /// /// # Examples /// ```no_run @@ -84,7 +84,7 @@ pub fn ok(val: T) -> Ready> { Ready { val: Some(Ok(val)) } } -/// Create a future that is immediately ready with an error value. +/// Creates a future that is immediately ready with an error value. /// /// # Examples /// ```no_run