From c33fb4ef9aba131c13382b71565e66d4f2761f82 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Wed, 3 Nov 2021 14:55:24 +0000 Subject: [PATCH] fix join_all --- actix-server/src/join_all.rs | 72 +++++++++++++++++++++++++++++++++++- actix-server/src/server.rs | 10 ++--- actix-server/src/worker.rs | 5 ++- 3 files changed, 77 insertions(+), 10 deletions(-) diff --git a/actix-server/src/join_all.rs b/actix-server/src/join_all.rs index aa47d72d..ae68871c 100644 --- a/actix-server/src/join_all.rs +++ b/actix-server/src/join_all.rs @@ -4,13 +4,15 @@ use std::{ task::{Context, Poll}, }; +use futures_core::future::{BoxFuture, LocalBoxFuture}; + // a poor man's join future. joined future is only used when starting/stopping the server. // pin_project and pinned futures are overkill for this task. pub(crate) struct JoinAll { fut: Vec>, } -pub(crate) fn join_all(fut: Vec + 'static>) -> JoinAll { +pub(crate) fn join_all(fut: Vec + Send + 'static>) -> JoinAll { let fut = fut .into_iter() .map(|f| JoinFuture::Future(Box::pin(f))) @@ -20,7 +22,7 @@ pub(crate) fn join_all(fut: Vec + 'static>) -> JoinAl } enum JoinFuture { - Future(Pin>>), + Future(BoxFuture<'static, T>), Result(Option), } @@ -59,6 +61,63 @@ impl Future for JoinAll { } } +pub(crate) fn join_all_local( + fut: Vec + 'static>, +) -> JoinAllLocal { + let fut = fut + .into_iter() + .map(|f| JoinLocalFuture::LocalFuture(Box::pin(f))) + .collect(); + + JoinAllLocal { fut } +} + +// a poor man's join future. joined future is only used when starting/stopping the server. +// pin_project and pinned futures are overkill for this task. +pub(crate) struct JoinAllLocal { + fut: Vec>, +} + +enum JoinLocalFuture { + LocalFuture(LocalBoxFuture<'static, T>), + Result(Option), +} + +impl Unpin for JoinAllLocal {} + +impl Future for JoinAllLocal { + type Output = Vec; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut ready = true; + + let this = self.get_mut(); + for fut in this.fut.iter_mut() { + if let JoinLocalFuture::LocalFuture(f) = fut { + match f.as_mut().poll(cx) { + Poll::Ready(t) => { + *fut = JoinLocalFuture::Result(Some(t)); + } + Poll::Pending => ready = false, + } + } + } + + if ready { + let mut res = Vec::new(); + for fut in this.fut.iter_mut() { + if let JoinLocalFuture::Result(f) = fut { + res.push(f.take().unwrap()); + } + } + + Poll::Ready(res) + } else { + Poll::Pending + } + } +} + #[cfg(test)] mod test { use super::*; @@ -73,4 +132,13 @@ mod test { assert_eq!(Err(3), res.next().unwrap()); assert_eq!(Ok(9), res.next().unwrap()); } + + #[actix_rt::test] + async fn test_join_all_local() { + let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))]; + let mut res = join_all_local(futs).await.into_iter(); + assert_eq!(Ok(1), res.next().unwrap()); + assert_eq!(Err(3), res.next().unwrap()); + assert_eq!(Ok(9), res.next().unwrap()); + } } diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index 4122de6b..e75cc537 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -7,7 +7,7 @@ use std::{ }; use actix_rt::{time::sleep, System}; -use futures_core::future::LocalBoxFuture; +use futures_core::future::BoxFuture; use log::{error, info, trace}; use tokio::sync::{ mpsc::{UnboundedReceiver, UnboundedSender}, @@ -45,8 +45,6 @@ pub(crate) enum ServerCommand { }, } -// TODO: docs + must use - /// Server /// /// # Shutdown Signals @@ -232,11 +230,11 @@ pub struct ServerInner { cmd_rx: UnboundedReceiver, signals: Option, waker_queue: WakerQueue, - stop_task: Option>, + stop_task: Option>, } impl ServerInner { - fn handle_cmd(&mut self, item: ServerCommand) -> Option> { + fn handle_cmd(&mut self, item: ServerCommand) -> Option> { match item { ServerCommand::Pause(tx) => { self.waker_queue.wake(WakerInterest::Pause); @@ -319,7 +317,7 @@ impl ServerInner { } } - fn handle_signal(&mut self, signal: Signal) -> Option> { + fn handle_signal(&mut self, signal: Signal) -> Option> { match signal { Signal::Int => { info!("SIGINT received; starting forced shutdown"); diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 1c9d6135..0fedd8b9 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -24,7 +24,7 @@ use tokio::sync::{ }; use crate::{ - join_all::join_all, + join_all::join_all_local, service::{BoxedServerService, InternalServiceFactory}, socket::MioStream, waker_queue::{WakerInterest, WakerQueue}, @@ -324,10 +324,11 @@ impl ServerWorker { // a second spawn to run !Send future tasks. spawn(async move { - let res = join_all(fut) + let res = join_all_local(fut) .await .into_iter() .collect::, _>>(); + let services = match res { Ok(res) => res .into_iter()