convert AndThen to use RefCell

This commit is contained in:
Maksym Vorobiov 2020-03-05 18:26:20 +02:00
parent 693d5132a9
commit 1efafd4d02
4 changed files with 116 additions and 8 deletions

View File

@ -10,7 +10,7 @@ use actix_service::IntoService;
use std::future::Future;
use std::pin::Pin;
use futures_util::future::TryFutureExt;
use actix_service::boxed::BoxFuture;
use actix_service::{boxed::BoxFuture, pipeline};
/*
@ -319,6 +319,7 @@ pub fn service_benches() {
bench_async_service(&mut criterion, AndThenUC::new(svc1.into_service(), svc2.into_service()), "AndThen with UnsafeCell");
bench_async_service(&mut criterion, AndThenRC::new(svc1.into_service(), svc2.into_service()), "AndThen with RefCell");
bench_async_service(&mut criterion, AndThenRCFuture::new(svc1.into_service(), svc2.into_service()), "AndThen with RefCell via future::and_then");
bench_async_service(&mut criterion, pipeline(svc1.into_service()).and_then(svc2.into_service()), "Pipeline::and_then based RC<RefCell>");
}
criterion_main!(service_benches);

View File

@ -4,13 +4,13 @@ use std::rc::Rc;
use std::task::{Context, Poll};
use super::{Service, ServiceFactory};
use crate::cell::Cell;
use crate::axcell::AXCell;
/// Service for the `and_then` combinator, chaining a computation onto the end
/// of another service which completes successfully.
///
/// This is created by the `ServiceExt::and_then` method.
pub(crate) struct AndThenService<A, B>(Cell<(A, B)>);
pub(crate) struct AndThenService<A, B>(AXCell<(A, B)>);
impl<A, B> AndThenService<A, B> {
/// Create new `AndThen` combinator
@ -19,7 +19,7 @@ impl<A, B> AndThenService<A, B> {
A: Service,
B: Service<Request = A::Response, Error = A::Error>,
{
Self(Cell::new((a, b)))
Self(AXCell::new((a, b)))
}
}
@ -40,7 +40,7 @@ where
type Future = AndThenServiceResponse<A, B>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let srv = self.0.get_mut();
let mut srv = self.0.get_mut();
let not_ready = !srv.0.poll_ready(cx)?.is_ready();
if !srv.1.poll_ready(cx)?.is_ready() || not_ready {
Poll::Pending
@ -50,8 +50,9 @@ where
}
fn call(&mut self, req: A::Request) -> Self::Future {
let fut = self.0.get_mut().0.call(req);
AndThenServiceResponse {
state: State::A(self.0.get_mut().0.call(req), Some(self.0.clone())),
state: State::A(fut, Some(self.0.clone())),
}
}
}
@ -72,7 +73,7 @@ where
A: Service,
B: Service<Request = A::Response, Error = A::Error>,
{
A(#[pin] A::Future, Option<Cell<(A, B)>>),
A(#[pin] A::Future, Option<AXCell<(A, B)>>),
B(#[pin] B::Future),
Empty,
}
@ -257,9 +258,11 @@ mod tests {
use std::rc::Rc;
use std::task::{Context, Poll};
use futures_util::future::{lazy, ok, ready, Ready};
use futures_util::future::{lazy, ok, ready, Ready, join_all, Future, FutureExt};
use actix_rt::time::delay_for;
use crate::{fn_factory, pipeline, pipeline_factory, Service, ServiceFactory};
use std::pin::Pin;
struct Srv1(Rc<Cell<usize>>);
@ -298,6 +301,32 @@ mod tests {
}
}
#[derive(Clone)]
struct SrvSlow(Rc<Cell<usize>>);
impl Service for SrvSlow {
type Request = &'static str;
type Response = &'static str;
type Error = ();
type Future = Pin<Box<dyn Future<Output=Result<Self::Response, Self::Error>>>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.set(self.0.get() + 1);
Poll::Ready(Ok(()))
}
fn call(&mut self, _: &'static str) -> Self::Future {
let cnt = self.0.clone();
Box::pin(
delay_for(std::time::Duration::from_millis(10))
.then(|()| async move {
cnt.set(cnt.get() + 1);
Ok("slow")
})
)
}
}
#[actix_rt::test]
async fn test_poll_ready() {
let cnt = Rc::new(Cell::new(0));
@ -329,4 +358,33 @@ mod tests {
assert!(res.is_ok());
assert_eq!(res.unwrap(), ("srv1", "srv2"));
}
#[actix_rt::test]
async fn test_concurrency_slow_first() {
let cnt = Rc::new(Cell::new(0));
let mut srv = pipeline(SrvSlow(cnt.clone())).and_then(Srv1(cnt.clone()));
let futures = (1u16..=1000).map(|_| srv.call("req"));
let res = join_all(futures).await;
assert!(res.iter().all(|r| r.is_ok()));
assert_eq!(cnt.get(), 1000);
}
#[actix_rt::test]
async fn test_concurrency_slow_last() {
let cnt = Rc::new(Cell::new(0));
let mut srv = pipeline(Srv1(cnt.clone())).and_then(SrvSlow(cnt.clone()));
let futures = (1u16..=1000).map(|_| srv.call("req"));
let res = join_all(futures).await;
assert!(res.iter().all(|r| r.is_ok()));
assert_eq!(cnt.get(), 1000);
}
#[actix_rt::test]
async fn test_concurrency_fast() {
let cnt = Rc::new(Cell::new(0));
let mut srv = pipeline(Srv1(cnt.clone())).and_then(Srv2(cnt.clone()));
let futures = (1u16..=1000).map(|_| srv.call("req"));
let res = join_all(futures).await;
assert!(res.iter().all(|r| r.is_ok()));
}
}

View File

@ -0,0 +1,48 @@
//! Custom cell impl, internal use only
use std::task::{Context, Poll};
use std::{cell::{RefCell, RefMut}, fmt, rc::Rc};
pub(crate) struct AXCell<T> {
inner: Rc<RefCell<T>>,
}
impl<T> Clone for AXCell<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T: fmt::Debug> fmt::Debug for AXCell<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.inner.fmt(f)
}
}
impl<T> AXCell<T> {
pub(crate) fn new(inner: T) -> Self {
Self {
inner: Rc::new(RefCell::new(inner)),
}
}
pub(crate) fn get_mut(&mut self) -> RefMut<'_, T> {
self.inner.borrow_mut()
}
}
impl<T: crate::Service> crate::Service for AXCell<T> {
type Request = T::Request;
type Response = T::Response;
type Error = T::Error;
type Future = T::Future;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.get_mut().poll_ready(cx)
}
fn call(&mut self, req: Self::Request) -> Self::Future {
self.get_mut().call(req)
}
}

View File

@ -13,6 +13,7 @@ mod apply;
mod apply_cfg;
pub mod boxed;
mod cell;
mod axcell;
mod fn_service;
mod map;
mod map_config;