From 1efafd4d02080fa81ff876d41e28fcbfa6e2310e Mon Sep 17 00:00:00 2001 From: Maksym Vorobiov Date: Thu, 5 Mar 2020 18:26:20 +0200 Subject: [PATCH] convert AndThen to use RefCell --- actix-service/benches/and_then.rs | 3 +- actix-service/src/and_then.rs | 72 ++++++++++++++++++++++++++++--- actix-service/src/axcell.rs | 48 +++++++++++++++++++++ actix-service/src/lib.rs | 1 + 4 files changed, 116 insertions(+), 8 deletions(-) create mode 100644 actix-service/src/axcell.rs diff --git a/actix-service/benches/and_then.rs b/actix-service/benches/and_then.rs index 1094fdaf..58c00b6c 100644 --- a/actix-service/benches/and_then.rs +++ b/actix-service/benches/and_then.rs @@ -10,7 +10,7 @@ use actix_service::IntoService; use std::future::Future; use std::pin::Pin; use futures_util::future::TryFutureExt; -use actix_service::boxed::BoxFuture; +use actix_service::{boxed::BoxFuture, pipeline}; /* @@ -319,6 +319,7 @@ pub fn service_benches() { bench_async_service(&mut criterion, AndThenUC::new(svc1.into_service(), svc2.into_service()), "AndThen with UnsafeCell"); bench_async_service(&mut criterion, AndThenRC::new(svc1.into_service(), svc2.into_service()), "AndThen with RefCell"); bench_async_service(&mut criterion, AndThenRCFuture::new(svc1.into_service(), svc2.into_service()), "AndThen with RefCell via future::and_then"); + bench_async_service(&mut criterion, pipeline(svc1.into_service()).and_then(svc2.into_service()), "Pipeline::and_then based RC"); } criterion_main!(service_benches); diff --git a/actix-service/src/and_then.rs b/actix-service/src/and_then.rs index 76ed35e9..20ea1612 100644 --- a/actix-service/src/and_then.rs +++ b/actix-service/src/and_then.rs @@ -4,13 +4,13 @@ use std::rc::Rc; use std::task::{Context, Poll}; use super::{Service, ServiceFactory}; -use crate::cell::Cell; +use crate::axcell::AXCell; /// Service for the `and_then` combinator, chaining a computation onto the end /// of another service which completes successfully. /// /// This is created by the `ServiceExt::and_then` method. -pub(crate) struct AndThenService(Cell<(A, B)>); +pub(crate) struct AndThenService(AXCell<(A, B)>); impl AndThenService { /// Create new `AndThen` combinator @@ -19,7 +19,7 @@ impl AndThenService { A: Service, B: Service, { - Self(Cell::new((a, b))) + Self(AXCell::new((a, b))) } } @@ -40,7 +40,7 @@ where type Future = AndThenServiceResponse; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - let srv = self.0.get_mut(); + let mut srv = self.0.get_mut(); let not_ready = !srv.0.poll_ready(cx)?.is_ready(); if !srv.1.poll_ready(cx)?.is_ready() || not_ready { Poll::Pending @@ -50,8 +50,9 @@ where } fn call(&mut self, req: A::Request) -> Self::Future { + let fut = self.0.get_mut().0.call(req); AndThenServiceResponse { - state: State::A(self.0.get_mut().0.call(req), Some(self.0.clone())), + state: State::A(fut, Some(self.0.clone())), } } } @@ -72,7 +73,7 @@ where A: Service, B: Service, { - A(#[pin] A::Future, Option>), + A(#[pin] A::Future, Option>), B(#[pin] B::Future), Empty, } @@ -257,9 +258,11 @@ mod tests { use std::rc::Rc; use std::task::{Context, Poll}; - use futures_util::future::{lazy, ok, ready, Ready}; + use futures_util::future::{lazy, ok, ready, Ready, join_all, Future, FutureExt}; + use actix_rt::time::delay_for; use crate::{fn_factory, pipeline, pipeline_factory, Service, ServiceFactory}; + use std::pin::Pin; struct Srv1(Rc>); @@ -298,6 +301,32 @@ mod tests { } } + #[derive(Clone)] + struct SrvSlow(Rc>); + + impl Service for SrvSlow { + type Request = &'static str; + type Response = &'static str; + type Error = (); + type Future = Pin>>>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + self.0.set(self.0.get() + 1); + Poll::Ready(Ok(())) + } + + fn call(&mut self, _: &'static str) -> Self::Future { + let cnt = self.0.clone(); + Box::pin( + delay_for(std::time::Duration::from_millis(10)) + .then(|()| async move { + cnt.set(cnt.get() + 1); + Ok("slow") + }) + ) + } + } + #[actix_rt::test] async fn test_poll_ready() { let cnt = Rc::new(Cell::new(0)); @@ -329,4 +358,33 @@ mod tests { assert!(res.is_ok()); assert_eq!(res.unwrap(), ("srv1", "srv2")); } + + #[actix_rt::test] + async fn test_concurrency_slow_first() { + let cnt = Rc::new(Cell::new(0)); + let mut srv = pipeline(SrvSlow(cnt.clone())).and_then(Srv1(cnt.clone())); + let futures = (1u16..=1000).map(|_| srv.call("req")); + let res = join_all(futures).await; + assert!(res.iter().all(|r| r.is_ok())); + assert_eq!(cnt.get(), 1000); + } + + #[actix_rt::test] + async fn test_concurrency_slow_last() { + let cnt = Rc::new(Cell::new(0)); + let mut srv = pipeline(Srv1(cnt.clone())).and_then(SrvSlow(cnt.clone())); + let futures = (1u16..=1000).map(|_| srv.call("req")); + let res = join_all(futures).await; + assert!(res.iter().all(|r| r.is_ok())); + assert_eq!(cnt.get(), 1000); + } + + #[actix_rt::test] + async fn test_concurrency_fast() { + let cnt = Rc::new(Cell::new(0)); + let mut srv = pipeline(Srv1(cnt.clone())).and_then(Srv2(cnt.clone())); + let futures = (1u16..=1000).map(|_| srv.call("req")); + let res = join_all(futures).await; + assert!(res.iter().all(|r| r.is_ok())); + } } diff --git a/actix-service/src/axcell.rs b/actix-service/src/axcell.rs new file mode 100644 index 00000000..0e2f286f --- /dev/null +++ b/actix-service/src/axcell.rs @@ -0,0 +1,48 @@ +//! Custom cell impl, internal use only +use std::task::{Context, Poll}; +use std::{cell::{RefCell, RefMut}, fmt, rc::Rc}; + +pub(crate) struct AXCell { + inner: Rc>, +} + +impl Clone for AXCell { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl fmt::Debug for AXCell { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.inner.fmt(f) + } +} + +impl AXCell { + pub(crate) fn new(inner: T) -> Self { + Self { + inner: Rc::new(RefCell::new(inner)), + } + } + + pub(crate) fn get_mut(&mut self) -> RefMut<'_, T> { + self.inner.borrow_mut() + } +} + +impl crate::Service for AXCell { + type Request = T::Request; + type Response = T::Response; + type Error = T::Error; + type Future = T::Future; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.get_mut().poll_ready(cx) + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + self.get_mut().call(req) + } +} diff --git a/actix-service/src/lib.rs b/actix-service/src/lib.rs index 90d4c790..04a72814 100644 --- a/actix-service/src/lib.rs +++ b/actix-service/src/lib.rs @@ -13,6 +13,7 @@ mod apply; mod apply_cfg; pub mod boxed; mod cell; +mod axcell; mod fn_service; mod map; mod map_config;