update actix-utils

This commit is contained in:
Nikolay Kim 2019-11-14 04:55:23 +06:00
parent 0be859d440
commit eb07b5477b
26 changed files with 1044 additions and 760 deletions

View File

@ -28,8 +28,8 @@ pub struct Fuse<T, U>(pub T, pub U);
impl<T, U> Framed<T, U>
where
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
T: AsyncRead + AsyncWrite + Unpin,
U: Decoder + Encoder + Unpin,
{
/// Provides a `Stream` and `Sink` interface for reading and writing to this
/// `Io` object, using `Decode` and `Encode` to read and write the raw data.
@ -223,43 +223,65 @@ impl<T, U> Framed<T, U> {
}
}
impl<T, U> Framed<T, U>
where
T: AsyncRead + Unpin,
U: Decoder + Unpin,
{
pub fn poll_next_item(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<U::Item, U::Error>>> {
Pin::new(&mut self.inner).poll_next(cx)
}
}
impl<T, U> Stream for Framed<T, U>
where
T: AsyncRead,
U: Decoder,
T: AsyncRead + Unpin,
U: Decoder + Unpin,
{
type Item = Result<U::Item, U::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_next(cx) }
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.as_mut().inner).poll_next(cx)
}
}
impl<T, U> Sink<U::Item> for Framed<T, U>
where
T: AsyncWrite,
U: Encoder,
T: AsyncWrite + Unpin,
U: Encoder + Unpin,
U::Error: From<io::Error>,
{
type Error = U::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_ready(cx) }
fn poll_ready(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.as_mut().inner.get_mut()).poll_ready(cx)
}
fn start_send(self: Pin<&mut Self>, item: <U as Encoder>::Item) -> Result<(), Self::Error> {
unsafe {
self.map_unchecked_mut(|s| s.inner.get_mut())
.start_send(item)
}
fn start_send(
mut self: Pin<&mut Self>,
item: <U as Encoder>::Item,
) -> Result<(), Self::Error> {
Pin::new(&mut self.as_mut().inner.get_mut()).start_send(item)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_flush(cx) }
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.as_mut().inner.get_mut()).poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_close(cx) }
fn poll_close(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Pin::new(&mut self.as_mut().inner.get_mut()).poll_close(cx)
}
}

View File

@ -39,12 +39,11 @@ actix-utils = "0.4.0"
actix-rt = "0.2.5"
derive_more = "0.15"
either = "1.5.2"
futures = "0.1.25"
futures = "0.3.1"
http = { version = "0.1.17", optional = true }
log = "0.4"
tokio-tcp = "0.1.3"
tokio-current-thread = "0.1.5"
trust-dns-resolver = { version="0.11.0", default-features = false }
tokio-net = "=0.2.0-alpha.6"
trust-dns-resolver = { version="0.18.0-alpha.1", default-features = false }
# openssl
openssl = { version="0.10", optional = true }

View File

@ -1,6 +1,6 @@
[package]
name = "actix-ioframe"
version = "0.1.1"
version = "0.2.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix framed service"
keywords = ["network", "framework", "async", "futures"]
@ -22,8 +22,8 @@ actix-service = "0.4.1"
actix-codec = "0.1.2"
bytes = "0.4"
either = "1.5.2"
futures = "0.1.25"
tokio-current-thread = "0.1.4"
futures = "0.3.1"
tokio-executor = "=0.2.0-alpha.6"
log = "0.4"
[dev-dependencies]

View File

@ -4,7 +4,7 @@ use std::{net, thread, time};
use actix_codec::{BytesCodec, Framed};
use actix_server::{Io, Server, ServerConfig};
use actix_service::{service_fn, service_fn2, service_fn_config, IntoService};
use actix_service::{factory_fn_cfg, service_fn, service_fn2};
use bytes::Bytes;
use futures::{future::ok, SinkExt};
use net2::TcpBuilder;
@ -28,9 +28,9 @@ fn test_bind() {
let sys = actix_rt::System::new("test");
let srv = Server::build()
.bind("test", addr, move || {
service_fn_config(move |cfg: &ServerConfig| {
factory_fn_cfg(move |cfg: &ServerConfig| {
assert_eq!(cfg.local_addr(), addr);
ok::<_, ()>((|_| ok::<_, ()>(())).into_service())
ok::<_, ()>(service_fn2(|_| ok::<_, ()>(())))
})
})
.unwrap()
@ -76,9 +76,9 @@ fn test_listen() {
let lst = net::TcpListener::bind(addr).unwrap();
let srv = Server::build()
.listen("test", lst, move || {
service_fn_config(move |cfg: &ServerConfig| {
factory_fn_cfg(move |cfg: &ServerConfig| {
assert_eq!(cfg.local_addr(), addr);
ok::<_, ()>((|_| ok::<_, ()>(())).into_service())
ok::<_, ()>(service_fn2(|_| ok::<_, ()>(())))
})
})
.unwrap()
@ -105,7 +105,7 @@ fn test_start() {
let srv: Server = Server::build()
.backlog(100)
.bind("test", addr, move || {
service_fn_config(move |cfg: &ServerConfig| {
factory_fn_cfg(move |cfg: &ServerConfig| {
assert_eq!(cfg.local_addr(), addr);
let srv = service_fn2(|io: Io<TcpStream>| {

View File

@ -24,7 +24,7 @@ path = "src/lib.rs"
[dependencies]
futures = "0.3.1"
pin-project = "0.4.0-alpha.11"
pin-project = "0.4.5"
[dev-dependencies]
tokio = "0.2.0-alpha.5"

View File

@ -31,10 +31,11 @@ where
}
/// Create `ServiceFactory` for function that can produce services
pub fn service_fn_factory<S, F, Cfg, Fut, Err>(
pub fn factory_fn<S, F, Cfg, Fut, Err>(
f: F,
) -> impl ServiceFactory<
Config = Cfg,
Service = S,
Request = S::Request,
Response = S::Response,
Error = S::Error,
@ -50,10 +51,11 @@ where
}
/// Create `ServiceFactory` for function that can produce services with configuration
pub fn service_fn_config<F, Fut, Cfg, Srv, Err>(
pub fn factory_fn_cfg<F, Fut, Cfg, Srv, Err>(
f: F,
) -> impl ServiceFactory<
Config = Cfg,
Service = Srv,
Request = Srv::Request,
Response = Srv::Response,
Error = Srv::Error,

View File

@ -22,11 +22,11 @@ mod transform_err;
pub use self::apply::{apply_fn, apply_fn_factory};
pub use self::apply_cfg::{apply_cfg, apply_cfg_factory};
pub use self::fn_service::{service_fn, service_fn2, service_fn_config, service_fn_factory};
pub use self::fn_service::{factory_fn, factory_fn_cfg, service_fn, service_fn2};
pub use self::into::{into_factory, into_service, ServiceFactoryMapper, ServiceMapper};
pub use self::map_config::{map_config, unit_config, MappedConfig};
pub use self::pipeline::{pipeline, pipeline_factory, Pipeline, PipelineFactory};
pub use self::transform::{apply_transform, IntoTransform, Transform};
pub use self::transform::{apply, IntoTransform, Transform};
/// An asynchronous function from `Request` to a `Response`.
pub trait Service {

View File

@ -4,7 +4,7 @@ use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll};
use crate::transform_err::{TransformFromErr, TransformMapInitErr};
use crate::transform_err::TransformMapInitErr;
use crate::{IntoServiceFactory, Service, ServiceFactory};
use pin_project::pin_project;
@ -49,32 +49,6 @@ pub trait Transform<S> {
{
TransformMapInitErr::new(self, f)
}
/// Map this service's init error to any error implementing `From` for
/// this service`s `Error`.
///
/// Note that this function consumes the receiving transform and returns a
/// wrapped version of it.
fn from_err<E>(self) -> TransformFromErr<Self, S, E>
where
Self: Sized,
E: From<Self::InitError>,
{
TransformFromErr::new(self)
}
// /// Map this service's init error to service's init error
// /// if it is implementing `Into` to this service`s `InitError`.
// ///
// /// Note that this function consumes the receiving transform and returns a
// /// wrapped version of it.
// fn into_err<E>(self) -> TransformIntoErr<Self, S>
// where
// Self: Sized,
// Self::InitError: From<Self::InitError>,
// {
// TransformFromErr::new(self)
// }
}
impl<T, S> Transform<S> for Rc<T>
@ -127,10 +101,10 @@ where
}
}
/// Apply transform to service factory. Function returns
/// Apply transform to a service. Function returns
/// services factory that in initialization creates
/// service and applies transform to this service.
pub fn apply_transform<T, S, F, U>(
pub fn apply<T, S, F, U>(
t: F,
service: U,
) -> impl ServiceFactory<

View File

@ -89,79 +89,3 @@ where
this.fut.poll(cx).map_err(this.f)
}
}
/// Transform for the `from_err` combinator, changing the type of a new
/// transform's init error.
///
/// This is created by the `Transform::from_err` method.
pub struct TransformFromErr<T, S, E> {
t: T,
e: PhantomData<(S, E)>,
}
impl<T, S, E> TransformFromErr<T, S, E>
where
T: Transform<S>,
E: From<T::InitError>,
{
/// Create new `TransformFromErr` new transform instance
pub fn new(t: T) -> Self {
Self { t, e: PhantomData }
}
}
impl<T, S, E> Clone for TransformFromErr<T, S, E>
where
T: Clone,
{
fn clone(&self) -> Self {
Self {
t: self.t.clone(),
e: PhantomData,
}
}
}
impl<T, S, E> Transform<S> for TransformFromErr<T, S, E>
where
T: Transform<S>,
E: From<T::InitError>,
{
type Request = T::Request;
type Response = T::Response;
type Error = T::Error;
type Transform = T::Transform;
type InitError = E;
type Future = TransformFromErrFuture<T, S, E>;
fn new_transform(&self, service: S) -> Self::Future {
TransformFromErrFuture {
fut: self.t.new_transform(service),
_t: PhantomData,
}
}
}
#[pin_project]
pub struct TransformFromErrFuture<T, S, E>
where
T: Transform<S>,
E: From<T::InitError>,
{
#[pin]
fut: T::Future,
_t: PhantomData<E>,
}
impl<T, S, E> Future for TransformFromErrFuture<T, S, E>
where
T: Transform<S>,
E: From<T::InitError>,
{
type Output = Result<T::Transform, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().fut.poll(cx).map_err(E::from)
}
}

View File

@ -24,6 +24,6 @@ actix-service = "0.4.2"
log = "0.4"
net2 = "0.2"
futures = "0.1"
tokio-tcp = "0.1"
tokio-reactor = "0.1"
futures = "0.3.1"
tokio = "0.2.0-alpha.6"
tokio-net = { version = "0.2.0-alpha.6" }

View File

@ -3,12 +3,12 @@ use std::sync::mpsc;
use std::{net, thread};
use actix_rt::System;
use actix_server::{Server, ServerBuilder, StreamServiceFactory};
use actix_server::{Server, ServerBuilder, ServiceFactory};
pub use actix_server_config::{Io, ServerConfig};
use net2::TcpBuilder;
use tokio_reactor::Handle;
use tokio_tcp::TcpStream;
use tokio_net::driver::Handle;
use tokio_net::tcp::TcpStream;
mod rt;
pub use self::rt::*;
@ -75,7 +75,7 @@ impl TestServer {
}
/// Start new test server with application factory
pub fn with<F: StreamServiceFactory<TcpStream>>(factory: F) -> TestServerRuntime {
pub fn with<F: ServiceFactory<TcpStream>>(factory: F) -> TestServerRuntime {
let (tx, rx) = mpsc::channel();
// run server in separate thread

View File

@ -1,9 +1,11 @@
//! Various helpers for Actix applications to use during testing.
use std::cell::RefCell;
use std::future::Future;
use actix_rt::{System, SystemRunner};
use actix_service::Service;
use futures::future::{lazy, Future, IntoFuture};
use futures::future::{lazy, FutureExt};
// use futures_util::future::FutureExt;
thread_local! {
static RT: RefCell<Inner> = {
@ -35,11 +37,11 @@ impl Drop for Inner {
///
/// Note that this function is intended to be used only for testing purpose.
/// This function panics on nested call.
pub fn block_on<F>(f: F) -> Result<F::Item, F::Error>
pub fn block_on<F>(f: F) -> F::Output
where
F: IntoFuture,
F: Future,
{
RT.with(move |rt| rt.borrow_mut().get_mut().block_on(f.into_future()))
RT.with(move |rt| rt.borrow_mut().get_mut().block_on(f))
}
/// Runs the provided function, blocking the current thread until the result
@ -52,21 +54,21 @@ where
///
/// Note that this function is intended to be used only for testing purpose.
/// This function panics on nested call.
pub fn block_fn<F, R>(f: F) -> Result<R::Item, R::Error>
pub fn block_fn<F, R>(f: F) -> F::Output
where
F: FnOnce() -> R,
R: IntoFuture,
R: Future,
{
RT.with(move |rt| rt.borrow_mut().get_mut().block_on(lazy(f)))
RT.with(move |rt| rt.borrow_mut().get_mut().block_on(lazy(|_| f())))
}
/// Spawn future to the current test runtime.
pub fn spawn<F>(fut: F)
where
F: Future<Item = (), Error = ()> + 'static,
F: Future + 'static,
{
run_on(move || {
actix_rt::spawn(fut);
actix_rt::spawn(fut.map(|_| ()));
});
}
@ -78,12 +80,7 @@ pub fn run_on<F, R>(f: F) -> R
where
F: FnOnce() -> R,
{
RT.with(move |rt| {
rt.borrow_mut()
.get_mut()
.block_on(lazy(|| Ok::<_, ()>(f())))
})
.unwrap()
RT.with(move |rt| rt.borrow_mut().get_mut().block_on(lazy(|_| f())))
}
/// Calls service and waits for response future completion.

View File

@ -1,6 +1,6 @@
[package]
name = "actix-utils"
version = "0.4.7"
version = "0.5.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix utils - various actix net related services"
keywords = ["network", "framework", "async", "futures"]
@ -22,9 +22,10 @@ actix-service = "0.4.1"
actix-codec = "0.1.2"
bytes = "0.4"
either = "1.5.2"
futures = "0.1.25"
tokio-timer = "0.2.8"
tokio-current-thread = "0.1.4"
futures = "0.3.1"
pin-project = "0.4.5"
tokio-timer = "0.3.0-alpha.6"
tokio-executor = "=0.2.0-alpha.6"
log = "0.4"
[dev-dependencies]

View File

@ -1,7 +1,8 @@
use std::cell::Cell;
use std::rc::Rc;
use std::task;
use futures::task::AtomicTask;
use crate::task::LocalWaker;
#[derive(Clone)]
/// Simple counter with ability to notify task on reaching specific number
@ -12,7 +13,7 @@ pub struct Counter(Rc<CounterInner>);
struct CounterInner {
count: Cell<usize>,
capacity: usize,
task: AtomicTask,
task: LocalWaker,
}
impl Counter {
@ -21,7 +22,7 @@ impl Counter {
Counter(Rc::new(CounterInner {
capacity,
count: Cell::new(0),
task: AtomicTask::new(),
task: LocalWaker::new(),
}))
}
@ -32,8 +33,8 @@ impl Counter {
/// Check if counter is not at capacity. If counter at capacity
/// it registers notification for current task.
pub fn available(&self) -> bool {
self.0.available()
pub fn available(&self, cx: &mut task::Context) -> bool {
self.0.available(cx)
}
/// Get total number of acquired counts
@ -66,15 +67,15 @@ impl CounterInner {
let num = self.count.get();
self.count.set(num - 1);
if num == self.capacity {
self.task.notify();
self.task.wake();
}
}
fn available(&self) -> bool {
fn available(&self, cx: &mut task::Context) -> bool {
if self.count.get() < self.capacity {
true
} else {
self.task.register();
self.task.register(cx.waker());
false
}
}

View File

@ -1,6 +1,10 @@
//! Contains `Either` service and related types and functions.
use actix_service::{IntoNewService, NewService, Service};
use futures::{future, try_ready, Async, Future, IntoFuture, Poll};
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_service::{Service, ServiceFactory};
use futures::{future, ready, Future};
use pin_project::pin_project;
/// Combine two different service types into a single type.
///
@ -31,21 +35,21 @@ where
type Error = A::Error;
type Future = future::Either<A::Future, B::Future>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
let left = self.left.poll_ready()?;
let right = self.right.poll_ready()?;
fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
let left = self.left.poll_ready(cx)?;
let right = self.right.poll_ready(cx)?;
if left.is_ready() && right.is_ready() {
Ok(Async::Ready(()))
Poll::Ready(Ok(()))
} else {
Ok(Async::NotReady)
Poll::Pending
}
}
fn call(&mut self, req: either::Either<A::Request, B::Request>) -> Self::Future {
match req {
either::Either::Left(req) => future::Either::A(self.left.call(req)),
either::Either::Right(req) => future::Either::B(self.right.call(req)),
either::Either::Left(req) => future::Either::Left(self.left.call(req)),
either::Either::Right(req) => future::Either::Right(self.right.call(req)),
}
}
}
@ -57,29 +61,24 @@ pub struct Either<A, B> {
}
impl<A, B> Either<A, B> {
pub fn new<F1, F2>(srv_a: F1, srv_b: F2) -> Either<A, B>
pub fn new(left: A, right: B) -> Either<A, B>
where
A: NewService,
B: NewService<
A: ServiceFactory,
B: ServiceFactory<
Config = A::Config,
Response = A::Response,
Error = A::Error,
InitError = A::InitError,
>,
F1: IntoNewService<A>,
F2: IntoNewService<B>,
{
Either {
left: srv_a.into_new_service(),
right: srv_b.into_new_service(),
}
Either { left, right }
}
}
impl<A, B> NewService for Either<A, B>
impl<A, B> ServiceFactory for Either<A, B>
where
A: NewService,
B: NewService<
A: ServiceFactory,
B: ServiceFactory<
Config = A::Config,
Response = A::Response,
Error = A::Error,
@ -113,37 +112,41 @@ impl<A: Clone, B: Clone> Clone for Either<A, B> {
}
}
#[pin_project]
#[doc(hidden)]
pub struct EitherNewService<A: NewService, B: NewService> {
pub struct EitherNewService<A: ServiceFactory, B: ServiceFactory> {
left: Option<A::Service>,
right: Option<B::Service>,
left_fut: <A::Future as IntoFuture>::Future,
right_fut: <B::Future as IntoFuture>::Future,
#[pin]
left_fut: A::Future,
#[pin]
right_fut: B::Future,
}
impl<A, B> Future for EitherNewService<A, B>
where
A: NewService,
B: NewService<Response = A::Response, Error = A::Error, InitError = A::InitError>,
A: ServiceFactory,
B: ServiceFactory<Response = A::Response, Error = A::Error, InitError = A::InitError>,
{
type Item = EitherService<A::Service, B::Service>;
type Error = A::InitError;
type Output = Result<EitherService<A::Service, B::Service>, A::InitError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.left.is_none() {
self.left = Some(try_ready!(self.left_fut.poll()));
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.project();
if this.left.is_none() {
*this.left = Some(ready!(this.left_fut.poll(cx))?);
}
if self.right.is_none() {
self.right = Some(try_ready!(self.right_fut.poll()));
if this.right.is_none() {
*this.right = Some(ready!(this.right_fut.poll(cx))?);
}
if self.left.is_some() && self.right.is_some() {
Ok(Async::Ready(EitherService {
left: self.left.take().unwrap(),
right: self.right.take().unwrap(),
if this.left.is_some() && this.right.is_some() {
Poll::Ready(Ok(EitherService {
left: this.left.take().unwrap(),
right: this.right.take().unwrap(),
}))
} else {
Ok(Async::NotReady)
Poll::Pending
}
}
}

View File

@ -1,15 +1,19 @@
//! Framed dispatcher service and related utilities
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{fmt, mem};
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use actix_service::{IntoService, Service};
use futures::task::AtomicTask;
use futures::unsync::mpsc;
use futures::{Async, Future, Poll, Sink, Stream};
use futures::future::{ready, FutureExt};
use futures::{Future, Sink, Stream};
use log::debug;
use pin_project::pin_project;
use crate::cell::Cell;
use crate::mpsc;
use crate::task::LocalWaker;
type Request<U> = <U as Decoder>::Item;
type Response<U> = <U as Encoder>::Item;
@ -68,22 +72,26 @@ pub enum FramedMessage<T> {
Close,
}
type Rx<U> = Option<mpsc::Receiver<FramedMessage<<U as Encoder>::Item>>>;
type Inner<S: Service, U> = Cell<FramedTransportInner<<U as Encoder>::Item, S::Error>>;
/// FramedTransport - is a future that reads frames from Framed object
/// and pass then to the service.
#[pin_project]
pub struct FramedTransport<S, T, U>
where
S: Service<Request = Request<U>, Response = Response<U>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Encoder + Decoder,
T: AsyncRead + AsyncWrite + Unpin,
U: Encoder + Decoder + Unpin,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
service: S,
state: TransportState<S, U>,
framed: Framed<T, U>,
rx: Option<mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>>,
rx: Option<mpsc::Receiver<FramedMessage<<U as Encoder>::Item>>>,
inner: Cell<FramedTransportInner<<U as Encoder>::Item, S::Error>>,
}
@ -97,7 +105,7 @@ enum TransportState<S: Service, U: Encoder + Decoder> {
struct FramedTransportInner<I, E> {
buf: VecDeque<Result<I, E>>,
task: AtomicTask,
task: LocalWaker,
}
impl<S, T, U> FramedTransport<S, T, U>
@ -105,130 +113,8 @@ where
S: Service<Request = Request<U>, Response = Response<U>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
fn poll_read(&mut self) -> bool {
loop {
match self.service.poll_ready() {
Ok(Async::Ready(_)) => {
let item = match self.framed.poll() {
Ok(Async::Ready(Some(el))) => el,
Err(err) => {
self.state =
TransportState::FramedError(FramedTransportError::Decoder(err));
return true;
}
Ok(Async::NotReady) => return false,
Ok(Async::Ready(None)) => {
self.state = TransportState::Stopping;
return true;
}
};
let mut cell = self.inner.clone();
tokio_current_thread::spawn(self.service.call(item).then(move |item| {
let inner = cell.get_mut();
inner.buf.push_back(item);
inner.task.notify();
Ok(())
}));
}
Ok(Async::NotReady) => return false,
Err(err) => {
self.state = TransportState::Error(FramedTransportError::Service(err));
return true;
}
}
}
}
/// write to framed object
fn poll_write(&mut self) -> bool {
let inner = self.inner.get_mut();
let mut rx_done = self.rx.is_none();
let mut buf_empty = inner.buf.is_empty();
loop {
while !self.framed.is_write_buf_full() {
if !buf_empty {
match inner.buf.pop_front().unwrap() {
Ok(msg) => {
if let Err(err) = self.framed.force_send(msg) {
self.state = TransportState::FramedError(
FramedTransportError::Encoder(err),
);
return true;
}
buf_empty = inner.buf.is_empty();
}
Err(err) => {
self.state =
TransportState::Error(FramedTransportError::Service(err));
return true;
}
}
}
if !rx_done && self.rx.is_some() {
match self.rx.as_mut().unwrap().poll() {
Ok(Async::Ready(Some(FramedMessage::Message(msg)))) => {
if let Err(err) = self.framed.force_send(msg) {
self.state = TransportState::FramedError(
FramedTransportError::Encoder(err),
);
return true;
}
}
Ok(Async::Ready(Some(FramedMessage::Close))) => {
self.state = TransportState::FlushAndStop;
return true;
}
Ok(Async::Ready(None)) => {
rx_done = true;
let _ = self.rx.take();
}
Ok(Async::NotReady) => rx_done = true,
Err(_e) => {
rx_done = true;
let _ = self.rx.take();
}
}
}
if rx_done && buf_empty {
break;
}
}
if !self.framed.is_write_buf_empty() {
match self.framed.poll_complete() {
Ok(Async::NotReady) => break,
Err(err) => {
debug!("Error sending data: {:?}", err);
self.state =
TransportState::FramedError(FramedTransportError::Encoder(err));
return true;
}
Ok(Async::Ready(_)) => (),
}
} else {
break;
}
}
false
}
}
impl<S, T, U> FramedTransport<S, T, U>
where
S: Service<Request = Request<U>, Response = Response<U>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
T: AsyncRead + AsyncWrite + Unpin,
U: Decoder + Encoder + Unpin,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
@ -240,7 +126,7 @@ where
state: TransportState::Processing,
inner: Cell::new(FramedTransportInner {
buf: VecDeque::new(),
task: AtomicTask::new(),
task: LocalWaker::new(),
}),
}
}
@ -248,7 +134,7 @@ where
/// Get Sender
pub fn set_receiver(
mut self,
rx: mpsc::UnboundedReceiver<FramedMessage<<U as Encoder>::Item>>,
rx: mpsc::Receiver<FramedMessage<<U as Encoder>::Item>>,
) -> Self {
self.rx = Some(rx);
self
@ -283,51 +169,216 @@ where
S: Service<Request = Request<U>, Response = Response<U>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
T: AsyncRead + AsyncWrite + Unpin,
U: Decoder + Encoder + Unpin,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
type Item = ();
type Error = FramedTransportError<S::Error, U>;
type Output = Result<(), FramedTransportError<S::Error, U>>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.get_ref().task.register();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.inner.get_ref().task.register(cx.waker());
match mem::replace(&mut self.state, TransportState::Processing) {
TransportState::Processing => {
if self.poll_read() || self.poll_write() {
self.poll()
} else {
Ok(Async::NotReady)
}
let this = self.project();
poll(
cx,
this.service,
this.state,
this.framed,
this.rx,
this.inner,
)
}
}
fn poll<S, T, U>(
cx: &mut Context,
srv: &mut S,
state: &mut TransportState<S, U>,
framed: &mut Framed<T, U>,
rx: &mut Rx<U>,
inner: &mut Inner<S, U>,
) -> Poll<Result<(), FramedTransportError<S::Error, U>>>
where
S: Service<Request = Request<U>, Response = Response<U>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite + Unpin,
U: Decoder + Encoder + Unpin,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
match mem::replace(state, TransportState::Processing) {
TransportState::Processing => {
if poll_read(cx, srv, state, framed, inner)
|| poll_write(cx, state, framed, rx, inner)
{
poll(cx, srv, state, framed, rx, inner)
} else {
Poll::Pending
}
TransportState::Error(err) => {
if self.framed.is_write_buf_empty()
|| (self.poll_write() || self.framed.is_write_buf_empty())
{
Err(err)
} else {
self.state = TransportState::Error(err);
Ok(Async::NotReady)
}
}
TransportState::Error(err) => {
let is_empty = framed.is_write_buf_empty();
if is_empty || (poll_write(cx, state, framed, rx, inner) || is_empty) {
Poll::Ready(Err(err))
} else {
*state = TransportState::Error(err);
Poll::Pending
}
TransportState::FlushAndStop => {
if !self.framed.is_write_buf_empty() {
match self.framed.poll_complete() {
Err(err) => {
debug!("Error sending data: {:?}", err);
Ok(Async::Ready(()))
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(_)) => Ok(Async::Ready(())),
}
TransportState::FlushAndStop => {
if !framed.is_write_buf_empty() {
match Pin::new(framed).poll_flush(cx) {
Poll::Ready(Err(err)) => {
debug!("Error sending data: {:?}", err);
Poll::Ready(Ok(()))
}
} else {
Ok(Async::Ready(()))
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
}
} else {
Poll::Ready(Ok(()))
}
}
TransportState::FramedError(err) => Poll::Ready(Err(err)),
TransportState::Stopping => Poll::Ready(Ok(())),
}
}
fn poll_read<S, T, U>(
cx: &mut Context,
srv: &mut S,
state: &mut TransportState<S, U>,
framed: &mut Framed<T, U>,
inner: &mut Inner<S, U>,
) -> bool
where
S: Service<Request = Request<U>, Response = Response<U>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite + Unpin,
U: Decoder + Encoder + Unpin,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
loop {
match srv.poll_ready(cx) {
Poll::Ready(Ok(_)) => {
let item = match framed.poll_next_item(cx) {
Poll::Ready(Some(Ok(el))) => el,
Poll::Ready(Some(Err(err))) => {
*state =
TransportState::FramedError(FramedTransportError::Decoder(err));
return true;
}
Poll::Pending => return false,
Poll::Ready(None) => {
*state = TransportState::Stopping;
return true;
}
};
let mut cell = inner.clone();
let fut = srv.call(item).then(move |item| {
let inner = cell.get_mut();
inner.buf.push_back(item);
inner.task.wake();
ready(())
});
tokio_executor::current_thread::spawn(fut);
}
Poll::Pending => return false,
Poll::Ready(Err(err)) => {
*state = TransportState::Error(FramedTransportError::Service(err));
return true;
}
TransportState::FramedError(err) => Err(err),
TransportState::Stopping => Ok(Async::Ready(())),
}
}
}
/// write to framed object
fn poll_write<S, T, U>(
cx: &mut Context,
state: &mut TransportState<S, U>,
framed: &mut Framed<T, U>,
rx: &mut Rx<U>,
inner: &mut Inner<S, U>,
) -> bool
where
S: Service<Request = Request<U>, Response = Response<U>>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite + Unpin,
U: Decoder + Encoder + Unpin,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
{
// let this = self.project();
let inner = inner.get_mut();
let mut rx_done = rx.is_none();
let mut buf_empty = inner.buf.is_empty();
loop {
while !framed.is_write_buf_full() {
if !buf_empty {
match inner.buf.pop_front().unwrap() {
Ok(msg) => {
if let Err(err) = framed.force_send(msg) {
*state =
TransportState::FramedError(FramedTransportError::Encoder(err));
return true;
}
buf_empty = inner.buf.is_empty();
}
Err(err) => {
*state = TransportState::Error(FramedTransportError::Service(err));
return true;
}
}
}
if !rx_done && rx.is_some() {
match Pin::new(rx.as_mut().unwrap()).poll_next(cx) {
Poll::Ready(Some(FramedMessage::Message(msg))) => {
if let Err(err) = framed.force_send(msg) {
*state =
TransportState::FramedError(FramedTransportError::Encoder(err));
return true;
}
}
Poll::Ready(Some(FramedMessage::Close)) => {
*state = TransportState::FlushAndStop;
return true;
}
Poll::Ready(None) => {
rx_done = true;
let _ = rx.take();
}
Poll::Pending => rx_done = true,
}
}
if rx_done && buf_empty {
break;
}
}
if !framed.is_write_buf_empty() {
// match this.framed.poll_flush(cx) {
// Poll::Pending => break,
// Poll::Ready(Err(err)) => {
// debug!("Error sending data: {:?}", err);
// self.state =
// TransportState::FramedError(FramedTransportError::Encoder(err));
// return true;
// }
// Poll::Ready(Ok(_)) => (),
// }
} else {
break;
}
}
false
}

View File

@ -1,8 +1,11 @@
use std::convert::Infallible;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_service::{IntoService, Service, Transform};
use futures::future::{ok, FutureResult};
use futures::{Async, Future, Poll};
use futures::future::{ok, Ready};
use pin_project::pin_project;
use super::counter::{Counter, CounterGuard};
@ -32,7 +35,7 @@ impl<S: Service> Transform<S> for InFlight {
type Error = S::Error;
type InitError = Infallible;
type Transform = InFlightService<S>;
type Future = FutureResult<Self::Transform, Self::InitError>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ok(InFlightService::new(self.max_inflight, service))
@ -68,14 +71,14 @@ where
type Error = T::Error;
type Future = InFlightServiceResponse<T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
if let Async::NotReady = self.service.poll_ready()? {
Ok(Async::NotReady)
} else if !self.count.available() {
fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
if let Poll::Pending = self.service.poll_ready(cx)? {
Poll::Pending
} else if !self.count.available(cx) {
log::trace!("InFlight limit exceeded");
Ok(Async::NotReady)
Poll::Pending
} else {
Ok(Async::Ready(()))
Poll::Ready(Ok(()))
}
}
@ -87,31 +90,31 @@ where
}
}
#[pin_project]
#[doc(hidden)]
pub struct InFlightServiceResponse<T: Service> {
#[pin]
fut: T::Future,
_guard: CounterGuard,
}
impl<T: Service> Future for InFlightServiceResponse<T> {
type Item = T::Response;
type Error = T::Error;
type Output = Result<T::Response, T::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.fut.poll()
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.project().fut.poll(cx)
}
}
#[cfg(test)]
mod tests {
use futures::future::lazy;
use futures::{Async, Poll};
use std::task::{Context, Poll};
use std::time::Duration;
use super::*;
use actix_service::blank::{Blank, BlankNewService};
use actix_service::{NewService, Service, ServiceExt};
use actix_service::{apply, factory_fn, Service, ServiceFactory};
use futures::future::{lazy, ok, FutureExt, LocalBoxFuture};
struct SleepService(Duration);
@ -119,57 +122,49 @@ mod tests {
type Request = ();
type Response = ();
type Error = ();
type Future = Box<dyn Future<Item = (), Error = ()>>;
type Future = LocalBoxFuture<'static, Result<(), ()>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: ()) -> Self::Future {
Box::new(tokio_timer::sleep(self.0).map_err(|_| ()))
tokio_timer::delay_for(self.0)
.then(|_| ok::<_, ()>(()))
.boxed_local()
}
}
#[test]
fn test_transform() {
let wait_time = Duration::from_millis(50);
let _ = actix_rt::System::new("test").block_on(lazy(|| {
let mut srv =
Blank::new().and_then(InFlightService::new(1, SleepService(wait_time)));
assert_eq!(srv.poll_ready(), Ok(Async::Ready(())));
let _ = actix_rt::System::new("test").block_on(async {
let mut srv = InFlightService::new(1, SleepService(wait_time));
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
let mut res = srv.call(());
let _ = res.poll();
assert_eq!(srv.poll_ready(), Ok(Async::NotReady));
let res = srv.call(());
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
drop(res);
assert_eq!(srv.poll_ready(), Ok(Async::Ready(())));
Ok::<_, ()>(())
}));
let _ = res.await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
});
}
#[test]
fn test_newtransform() {
let wait_time = Duration::from_millis(50);
let _ = actix_rt::System::new("test").block_on(lazy(|| {
let srv =
BlankNewService::new().apply(InFlight::new(1), || Ok(SleepService(wait_time)));
if let Async::Ready(mut srv) = srv.new_service(&()).poll().unwrap() {
assert_eq!(srv.poll_ready(), Ok(Async::Ready(())));
actix_rt::System::new("test").block_on(async {
let srv = apply(InFlight::new(1), factory_fn(|| ok(SleepService(wait_time))));
let mut res = srv.call(());
let _ = res.poll();
assert_eq!(srv.poll_ready(), Ok(Async::NotReady));
let mut srv = srv.new_service(&()).await.unwrap();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
drop(res);
assert_eq!(srv.poll_ready(), Ok(Async::Ready(())));
} else {
panic!()
}
let res = srv.call(());
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending);
Ok::<_, ()>(())
}));
let _ = res.await;
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
});
}
}

View File

@ -1,11 +1,13 @@
use std::convert::Infallible;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use actix_service::{NewService, Service};
use futures::future::{ok, FutureResult};
use futures::{Async, Future, Poll};
use tokio_timer::Delay;
use actix_service::{Service, ServiceFactory};
use futures::future::{ok, Ready};
use tokio_timer::{delay, Delay};
use super::time::{LowResTime, LowResTimeService};
@ -44,7 +46,7 @@ where
}
}
impl<R, E, F> NewService for KeepAlive<R, E, F>
impl<R, E, F> ServiceFactory for KeepAlive<R, E, F>
where
F: Fn() -> E + Clone,
{
@ -54,7 +56,7 @@ where
type InitError = Infallible;
type Config = ();
type Service = KeepAliveService<R, E, F>;
type Future = FutureResult<Self::Service, Self::InitError>;
type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: &()) -> Self::Future {
ok(KeepAliveService::new(
@ -85,7 +87,7 @@ where
ka,
time,
expire,
delay: Delay::new(expire),
delay: delay(expire),
_t: PhantomData,
}
}
@ -98,22 +100,21 @@ where
type Request = R;
type Response = R;
type Error = E;
type Future = FutureResult<R, E>;
type Future = Ready<Result<R, E>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
match self.delay.poll() {
Ok(Async::Ready(_)) => {
fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
match Pin::new(&mut self.delay).poll(cx) {
Poll::Ready(_) => {
let now = self.time.now();
if self.expire <= now {
Err((self.f)())
Poll::Ready(Err((self.f)()))
} else {
self.delay.reset(self.expire);
let _ = self.delay.poll();
Ok(Async::Ready(()))
let _ = Pin::new(&mut self.delay).poll(cx);
Poll::Ready(Ok(()))
}
}
Ok(Async::NotReady) => Ok(Async::Ready(())),
Err(_e) => panic!(),
Poll::Pending => Poll::Ready(Ok(())),
}
}

View File

@ -6,7 +6,9 @@ pub mod either;
pub mod framed;
pub mod inflight;
pub mod keepalive;
pub mod mpsc;
pub mod oneshot;
pub mod order;
pub mod stream;
pub mod task;
pub mod time;
pub mod timeout;

203
actix-utils/src/mpsc.rs Normal file
View File

@ -0,0 +1,203 @@
//! A multi-producer, single-consumer, futures-aware, FIFO queue with back
//! pressure, for use communicating between tasks on the same thread.
//!
//! These queues are the same as those in `futures::sync`, except they're not
//! intended to be sent across threads.
use std::any::Any;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::error::Error;
use std::pin::Pin;
use std::rc::{Rc, Weak};
use std::task::{Context, Poll};
use std::{fmt, mem};
use futures::{Sink, Stream};
use crate::task::LocalWaker;
/// Creates a unbounded in-memory channel with buffered storage.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let shared = Rc::new(RefCell::new(Shared {
buffer: VecDeque::new(),
blocked_recv: LocalWaker::new(),
}));
let sender = Sender {
shared: Rc::downgrade(&shared),
};
let receiver = Receiver {
state: State::Open(shared),
};
(sender, receiver)
}
#[derive(Debug)]
struct Shared<T> {
buffer: VecDeque<T>,
blocked_recv: LocalWaker,
}
/// The transmission end of a channel.
///
/// This is created by the `channel` function.
#[derive(Debug)]
pub struct Sender<T> {
shared: Weak<RefCell<Shared<T>>>,
}
impl<T> Sender<T> {
/// Sends the provided message along this channel.
pub fn send(&self, item: T) -> Result<(), SendError<T>> {
let shared = match self.shared.upgrade() {
Some(shared) => shared,
None => return Err(SendError(item)), // receiver was dropped
};
let mut shared = shared.borrow_mut();
shared.buffer.push_back(item);
shared.blocked_recv.wake();
Ok(())
}
}
impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
Sender {
shared: self.shared.clone(),
}
}
}
impl<T> Sink<T> for Sender<T> {
type Error = SendError<T>;
fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), SendError<T>> {
self.send(item)
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), SendError<T>>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
let shared = match self.shared.upgrade() {
Some(shared) => shared,
None => return,
};
// The number of existing `Weak` indicates if we are possibly the last
// `Sender`. If we are the last, we possibly must notify a blocked
// `Receiver`. `self.shared` is always one of the `Weak` to this shared
// data. Therefore the smallest possible Rc::weak_count(&shared) is 1.
if Rc::weak_count(&shared) == 1 {
// Wake up receiver as its stream has ended
shared.borrow_mut().blocked_recv.wake();
}
}
}
/// The receiving end of a channel which implements the `Stream` trait.
///
/// This is created by the `channel` function.
#[derive(Debug)]
pub struct Receiver<T> {
state: State<T>,
}
impl<T> Unpin for Receiver<T> {}
/// Possible states of a receiver. We're either Open (can receive more messages)
/// or we're closed with a list of messages we have left to receive.
#[derive(Debug)]
enum State<T> {
Open(Rc<RefCell<Shared<T>>>),
Closed(VecDeque<T>),
}
impl<T> Receiver<T> {
/// Closes the receiving half
///
/// This prevents any further messages from being sent on the channel while
/// still enabling the receiver to drain messages that are buffered.
pub fn close(&mut self) {
let items = match self.state {
State::Open(ref state) => {
let mut state = state.borrow_mut();
let items = mem::replace(&mut state.buffer, VecDeque::new());
items
}
State::Closed(_) => return,
};
self.state = State::Closed(items);
}
}
impl<T> Stream for Receiver<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let me = match self.state {
State::Open(ref mut me) => me,
State::Closed(ref mut items) => return Poll::Ready(items.pop_front()),
};
if let Some(shared) = Rc::get_mut(me) {
// All senders have been dropped, so drain the buffer and end the
// stream.
return Poll::Ready(shared.borrow_mut().buffer.pop_front());
}
let mut shared = me.borrow_mut();
if let Some(msg) = shared.buffer.pop_front() {
Poll::Ready(Some(msg))
} else {
shared.blocked_recv.register(cx.waker());
Poll::Pending
}
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
self.close();
}
}
/// Error type for sending, used when the receiving end of a channel is
/// dropped
pub struct SendError<T>(T);
impl<T> fmt::Debug for SendError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_tuple("SendError").field(&"...").finish()
}
}
impl<T> fmt::Display for SendError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "send failed because receiver is gone")
}
}
impl<T: Any> Error for SendError<T> {
fn description(&self) -> &str {
"send failed because receiver is gone"
}
}
impl<T> SendError<T> {
/// Returns the message that was attempted to be sent but failed.
pub fn into_inner(self) -> T {
self.0
}
}

209
actix-utils/src/oneshot.rs Normal file
View File

@ -0,0 +1,209 @@
//! A one-shot, futures-aware channel
//!
//! This channel is similar to that in `sync::oneshot` but cannot be sent across
//! threads.
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::{Rc, Weak};
use std::task::{Context, Poll};
pub use futures::channel::oneshot::Canceled;
use crate::task::LocalWaker;
/// Creates a new futures-aware, one-shot channel.
///
/// This function is the same as `sync::oneshot::channel` except that the
/// returned values cannot be sent across threads.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Rc::new(RefCell::new(Inner {
value: None,
tx_task: LocalWaker::new(),
rx_task: LocalWaker::new(),
}));
let tx = Sender {
inner: Rc::downgrade(&inner),
};
let rx = Receiver {
state: State::Open(inner),
};
(tx, rx)
}
/// Represents the completion half of a oneshot through which the result of a
/// computation is signaled.
///
/// This is created by the `unsync::oneshot::channel` function and is equivalent
/// in functionality to `sync::oneshot::Sender` except that it cannot be sent
/// across threads.
#[derive(Debug)]
pub struct Sender<T> {
inner: Weak<RefCell<Inner<T>>>,
}
/// A future representing the completion of a computation happening elsewhere in
/// memory.
///
/// This is created by the `unsync::oneshot::channel` function and is equivalent
/// in functionality to `sync::oneshot::Receiver` except that it cannot be sent
/// across threads.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct Receiver<T> {
state: State<T>,
}
// The channels do not ever project Pin to the inner T
impl<T> Unpin for Receiver<T> {}
impl<T> Unpin for Sender<T> {}
#[derive(Debug)]
enum State<T> {
Open(Rc<RefCell<Inner<T>>>),
Closed(Option<T>),
}
#[derive(Debug)]
struct Inner<T> {
value: Option<T>,
tx_task: LocalWaker,
rx_task: LocalWaker,
}
impl<T> Sender<T> {
/// Completes this oneshot with a successful result.
///
/// This function will consume `self` and indicate to the other end, the
/// `Receiver`, that the error provided is the result of the computation this
/// represents.
///
/// If the value is successfully enqueued for the remote end to receive,
/// then `Ok(())` is returned. If the receiving end was deallocated before
/// this function was called, however, then `Err` is returned with the value
/// provided.
pub fn send(self, val: T) -> Result<(), T> {
if let Some(inner) = self.inner.upgrade() {
inner.borrow_mut().value = Some(val);
Ok(())
} else {
Err(val)
}
}
/// Polls this `Sender` half to detect whether the `Receiver` this has
/// paired with has gone away.
///
/// This function can be used to learn about when the `Receiver` (consumer)
/// half has gone away and nothing will be able to receive a message sent
/// from `complete`.
///
/// Like `Future::poll`, this function will panic if it's not called from
/// within the context of a task. In other words, this should only ever be
/// called from inside another future.
///
/// If `Ready` is returned then it means that the `Receiver` has disappeared
/// and the result this `Sender` would otherwise produce should no longer
/// be produced.
///
/// If `NotReady` is returned then the `Receiver` is still alive and may be
/// able to receive a message if sent. The current task, however, is
/// scheduled to receive a notification if the corresponding `Receiver` goes
/// away.
pub fn poll_canceled(&mut self, cx: &mut Context) -> Poll<()> {
match self.inner.upgrade() {
Some(inner) => {
inner.borrow_mut().tx_task.register(cx.waker());
Poll::Pending
}
None => Poll::Ready(()),
}
}
/// Tests to see whether this `Sender`'s corresponding `Receiver`
/// has gone away.
///
/// This function can be used to learn about when the `Receiver` (consumer)
/// half has gone away and nothing will be able to receive a message sent
/// from `send`.
///
/// Note that this function is intended to *not* be used in the context of a
/// future. If you're implementing a future you probably want to call the
/// `poll_cancel` function which will block the current task if the
/// cancellation hasn't happened yet. This can be useful when working on a
/// non-futures related thread, though, which would otherwise panic if
/// `poll_cancel` were called.
pub fn is_canceled(&self) -> bool {
!self.inner.upgrade().is_some()
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
let inner = match self.inner.upgrade() {
Some(inner) => inner,
None => return,
};
inner.borrow().rx_task.wake();
}
}
impl<T> Receiver<T> {
/// Gracefully close this receiver, preventing sending any future messages.
///
/// Any `send` operation which happens after this method returns is
/// guaranteed to fail. Once this method is called the normal `poll` method
/// can be used to determine whether a message was actually sent or not. If
/// `Canceled` is returned from `poll` then no message was sent.
pub fn close(&mut self) {
match self.state {
State::Open(ref inner) => {
let mut inner = inner.borrow_mut();
inner.tx_task.wake();
let value = inner.value.take();
drop(inner);
self.state = State::Closed(value);
}
State::Closed(_) => return,
};
}
}
impl<T> Future for Receiver<T> {
type Output = Result<T, Canceled>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let inner = match this.state {
State::Open(ref mut inner) => inner,
State::Closed(ref mut item) => match item.take() {
Some(item) => return Poll::Ready(Ok(item.into())),
None => return Poll::Ready(Err(Canceled)),
},
};
// If we've got a value, then skip the logic below as we're done.
if let Some(val) = inner.borrow_mut().value.take() {
return Poll::Ready(Ok(val));
}
// If we can get mutable access, then the sender has gone away. We
// didn't see a value above, so we're canceled. Otherwise we park
// our task and wait for a value to come in.
if Rc::get_mut(inner).is_some() {
Poll::Ready(Err(Canceled))
} else {
inner.borrow().rx_task.register(cx.waker());
Poll::Pending
}
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
self.close();
}
}

View File

@ -1,14 +1,17 @@
use std::collections::VecDeque;
use std::convert::Infallible;
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use actix_service::{IntoService, Service, Transform};
use futures::future::{ok, FutureResult};
use futures::task::AtomicTask;
use futures::unsync::oneshot;
use futures::{Async, Future, Poll};
use futures::future::{ok, ready, FutureExt, Ready};
use crate::oneshot;
use crate::task::LocalWaker;
struct Record<I, E> {
rx: oneshot::Receiver<Result<I, E>>,
@ -93,7 +96,7 @@ where
type Error = InOrderError<S::Error>;
type InitError = Infallible;
type Transform = InOrderService<S>;
type Future = FutureResult<Self::Transform, Self::InitError>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ok(InOrderService::new(service))
@ -102,7 +105,7 @@ where
pub struct InOrderService<S: Service> {
service: S,
task: Rc<AtomicTask>,
task: Rc<LocalWaker>,
acks: VecDeque<Record<S::Response, S::Error>>,
}
@ -120,7 +123,7 @@ where
Self {
service: service.into_service(),
acks: VecDeque::new(),
task: Rc::new(AtomicTask::new()),
task: Rc::new(LocalWaker::new()),
}
}
}
@ -137,28 +140,30 @@ where
type Error = InOrderError<S::Error>;
type Future = InOrderServiceResponse<S>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
// poll_ready could be called from different task
self.task.register();
self.task.register(cx.waker());
// check acks
while !self.acks.is_empty() {
let rec = self.acks.front_mut().unwrap();
match rec.rx.poll() {
Ok(Async::Ready(res)) => {
match Pin::new(&mut rec.rx).poll(cx) {
Poll::Ready(Ok(res)) => {
let rec = self.acks.pop_front().unwrap();
let _ = rec.tx.send(res);
}
Ok(Async::NotReady) => break,
Err(oneshot::Canceled) => return Err(InOrderError::Disconnected),
Poll::Pending => break,
Poll::Ready(Err(oneshot::Canceled)) => {
return Poll::Ready(Err(InOrderError::Disconnected))
}
}
}
// check nested service
if let Async::NotReady = self.service.poll_ready().map_err(InOrderError::Service)? {
Ok(Async::NotReady)
if let Poll::Pending = self.service.poll_ready(cx).map_err(InOrderError::Service)? {
Poll::Pending
} else {
Ok(Async::Ready(()))
Poll::Ready(Ok(()))
}
}
@ -168,10 +173,10 @@ where
self.acks.push_back(Record { rx: rx1, tx: tx2 });
let task = self.task.clone();
tokio_current_thread::spawn(self.service.call(request).then(move |res| {
task.notify();
tokio_executor::current_thread::spawn(self.service.call(request).then(move |res| {
task.wake();
let _ = tx1.send(res);
Ok(())
ready(())
}));
InOrderServiceResponse { rx: rx2 }
@ -184,29 +189,29 @@ pub struct InOrderServiceResponse<S: Service> {
}
impl<S: Service> Future for InOrderServiceResponse<S> {
type Item = S::Response;
type Error = InOrderError<S::Error>;
type Output = Result<S::Response, InOrderError<S::Error>>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.rx.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(Ok(res))) => Ok(Async::Ready(res)),
Ok(Async::Ready(Err(e))) => Err(e.into()),
Err(oneshot::Canceled) => Err(InOrderError::Disconnected),
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match Pin::new(&mut self.rx).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(Ok(res))) => Poll::Ready(Ok(res)),
Poll::Ready(Ok(Err(e))) => Poll::Ready(Err(e.into())),
Poll::Ready(Err(_)) => Poll::Ready(Err(InOrderError::Disconnected)),
}
}
}
#[cfg(test)]
mod tests {
use futures::future::{lazy, Future};
use futures::{stream::futures_unordered, sync::oneshot, Async, Poll, Stream};
use std::task::{Context, Poll};
use std::time::Duration;
use super::*;
use actix_service::blank::Blank;
use actix_service::{Service, ServiceExt};
use actix_service::Service;
use futures::channel::oneshot;
use futures::future::{lazy, LocalBoxFuture};
use futures::stream::{futures_unordered::FuturesUnordered, StreamExt};
struct Srv;
@ -214,28 +219,14 @@ mod tests {
type Request = oneshot::Receiver<usize>;
type Response = usize;
type Error = ();
type Future = Box<dyn Future<Item = usize, Error = ()>>;
type Future = LocalBoxFuture<'static, Result<usize, ()>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: oneshot::Receiver<usize>) -> Self::Future {
Box::new(req.map_err(|_| ()))
}
}
struct SrvPoll<S: Service> {
s: S,
}
impl<S: Service> Future for SrvPoll<S> {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
let _ = self.s.poll_ready();
Ok(Async::NotReady)
req.map(|res| res.map_err(|_| ())).boxed_local()
}
}
@ -251,23 +242,26 @@ mod tests {
let rx2 = rx2;
let rx3 = rx3;
let tx_stop = tx_stop;
let _ = actix_rt::System::new("test").block_on(lazy(move || {
let mut srv = Blank::new().and_then(InOrderService::new(Srv));
let _ = actix_rt::System::new("test").block_on(async {
let mut srv = InOrderService::new(Srv);
let res1 = srv.call(rx1);
let res2 = srv.call(rx2);
let res3 = srv.call(rx3);
tokio_current_thread::spawn(SrvPoll { s: srv });
futures_unordered(vec![res1, res2, res3])
.collect()
.and_then(move |res: Vec<_>| {
assert_eq!(res, vec![1, 2, 3]);
let _ = tx_stop.send(());
actix_rt::System::current().stop();
Ok(())
})
}));
let _ = lazy(|cx| srv.poll_ready(cx)).await;
// dispatcher do this
tokio_timer::delay_for(Duration::from_millis(100)).await;
let _ = lazy(|cx| srv.poll_ready(cx)).await;
assert_eq!(res1.await.unwrap(), 1);
assert_eq!(res2.await.unwrap(), 2);
assert_eq!(res3.await.unwrap(), 3);
let _ = tx_stop.send(());
actix_rt::System::current().stop();
});
});
let _ = tx3.send(3);
@ -275,7 +269,7 @@ mod tests {
let _ = tx2.send(2);
let _ = tx1.send(1);
let _ = rx_stop.wait();
let _ = actix_rt::System::new("test").block_on(rx_stop);
let _ = h.join();
}
}

View File

@ -1,151 +0,0 @@
use std::marker::PhantomData;
use std::rc::Rc;
use actix_service::{IntoService, NewService, Service};
use futures::unsync::mpsc;
use futures::{Async, Future, Poll, Stream};
type Request<T> = Result<<T as IntoStream>::Item, <T as IntoStream>::Error>;
pub trait IntoStream {
type Item;
type Error;
type Stream: Stream<Item = Self::Item, Error = Self::Error>;
fn into_stream(self) -> Self::Stream;
}
impl<T> IntoStream for T
where
T: Stream,
{
type Item = T::Item;
type Error = T::Error;
type Stream = T;
fn into_stream(self) -> Self::Stream {
self
}
}
pub struct StreamService<S, T: NewService, E> {
factory: Rc<T>,
config: T::Config,
_t: PhantomData<(S, E)>,
}
impl<S, T, E> Service for StreamService<S, T, E>
where
S: IntoStream + 'static,
T: NewService<Request = Request<S>, Response = (), Error = E, InitError = E>,
T::Future: 'static,
T::Service: 'static,
<T::Service as Service>::Future: 'static,
{
type Request = S;
type Response = ();
type Error = E;
type Future = Box<dyn Future<Item = (), Error = E>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
}
fn call(&mut self, req: S) -> Self::Future {
Box::new(
self.factory
.new_service(&self.config)
.and_then(move |srv| StreamDispatcher::new(req, srv)),
)
}
}
pub struct StreamDispatcher<S, T>
where
S: IntoStream + 'static,
T: Service<Request = Request<S>, Response = ()> + 'static,
T::Future: 'static,
{
stream: S,
service: T,
err_rx: mpsc::UnboundedReceiver<T::Error>,
err_tx: mpsc::UnboundedSender<T::Error>,
}
impl<S, T> StreamDispatcher<S, T>
where
S: Stream,
T: Service<Request = Request<S>, Response = ()>,
T::Future: 'static,
{
pub fn new<F1, F2>(stream: F1, service: F2) -> Self
where
F1: IntoStream<Stream = S, Item = S::Item, Error = S::Error>,
F2: IntoService<T>,
{
let (err_tx, err_rx) = mpsc::unbounded();
StreamDispatcher {
err_rx,
err_tx,
stream: stream.into_stream(),
service: service.into_service(),
}
}
}
impl<S, T> Future for StreamDispatcher<S, T>
where
S: Stream,
T: Service<Request = Request<S>, Response = ()>,
T::Future: 'static,
{
type Item = ();
type Error = T::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Ok(Async::Ready(Some(e))) = self.err_rx.poll() {
return Err(e);
}
loop {
match self.service.poll_ready()? {
Async::Ready(_) => match self.stream.poll() {
Ok(Async::Ready(Some(item))) => {
tokio_current_thread::spawn(StreamDispatcherService {
fut: self.service.call(Ok(item)),
stop: self.err_tx.clone(),
})
}
Err(err) => tokio_current_thread::spawn(StreamDispatcherService {
fut: self.service.call(Err(err)),
stop: self.err_tx.clone(),
}),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
},
Async::NotReady => return Ok(Async::NotReady),
}
}
}
}
struct StreamDispatcherService<F: Future> {
fut: F,
stop: mpsc::UnboundedSender<F::Error>,
}
impl<F: Future> Future for StreamDispatcherService<F> {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.fut.poll() {
Ok(Async::Ready(_)) => Ok(Async::Ready(())),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => {
let _ = self.stop.unbounded_send(e);
Ok(Async::Ready(()))
}
}
}
}

69
actix-utils/src/task.rs Normal file
View File

@ -0,0 +1,69 @@
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::task::Waker;
use std::{fmt, rc};
/// A synchronization primitive for task wakeup.
///
/// Sometimes the task interested in a given event will change over time.
/// An `LocalWaker` can coordinate concurrent notifications with the consumer
/// potentially "updating" the underlying task to wake up. This is useful in
/// scenarios where a computation completes in another task and wants to
/// notify the consumer, but the consumer is in the process of being migrated to
/// a new logical task.
///
/// Consumers should call `register` before checking the result of a computation
/// and producers should call `wake` after producing the computation (this
/// differs from the usual `thread::park` pattern). It is also permitted for
/// `wake` to be called **before** `register`. This results in a no-op.
///
/// A single `AtomicWaker` may be reused for any number of calls to `register` or
/// `wake`.
pub struct LocalWaker {
waker: UnsafeCell<Option<Waker>>,
_t: PhantomData<rc::Rc<()>>,
}
impl LocalWaker {
/// Create an `LocalWaker`.
pub fn new() -> Self {
LocalWaker {
waker: UnsafeCell::new(None),
_t: PhantomData,
}
}
#[inline]
/// Registers the waker to be notified on calls to `wake`.
pub fn register(&self, waker: &Waker) {
unsafe {
let w = self.waker.get();
if (*w).is_none() {
*w = Some(waker.clone())
}
}
}
#[inline]
/// Calls `wake` on the last `Waker` passed to `register`.
///
/// If `register` has not been called yet, then this does nothing.
pub fn wake(&self) {
if let Some(waker) = self.take() {
waker.wake();
}
}
/// Returns the last `Waker` passed to `register`, so that the user can wake it.
///
/// If a waker has not been registered, this returns `None`.
pub fn take(&self) -> Option<Waker> {
unsafe { (*self.waker.get()).take() }
}
}
impl fmt::Debug for LocalWaker {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "LocalWaker")
}
}

View File

@ -1,10 +1,10 @@
use std::convert::Infallible;
use std::task::{Context, Poll};
use std::time::{self, Duration, Instant};
use actix_service::{NewService, Service};
use futures::future::{ok, FutureResult};
use futures::{Async, Future, Poll};
use tokio_timer::sleep;
use actix_service::{Service, ServiceFactory};
use futures::future::{ok, ready, FutureExt, Ready};
use tokio_timer::delay_for;
use super::cell::Cell;
@ -42,14 +42,14 @@ impl Default for LowResTime {
}
}
impl NewService for LowResTime {
impl ServiceFactory for LowResTime {
type Request = ();
type Response = Instant;
type Error = Infallible;
type InitError = Infallible;
type Config = ();
type Service = LowResTimeService;
type Future = FutureResult<Self::Service, Self::InitError>;
type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: &()) -> Self::Future {
ok(self.timer())
@ -79,12 +79,10 @@ impl LowResTimeService {
b.resolution
};
tokio_current_thread::spawn(sleep(interval).map_err(|_| panic!()).and_then(
move |_| {
inner.get_mut().current.take();
Ok(())
},
));
tokio_executor::current_thread::spawn(delay_for(interval).then(move |_| {
inner.get_mut().current.take();
ready(())
}));
now
}
}
@ -94,10 +92,10 @@ impl Service for LowResTimeService {
type Request = ();
type Response = Instant;
type Error = Infallible;
type Future = FutureResult<Self::Response, Self::Error>;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: ()) -> Self::Future {
@ -146,12 +144,10 @@ impl SystemTimeService {
b.resolution
};
tokio_current_thread::spawn(sleep(interval).map_err(|_| panic!()).and_then(
move |_| {
inner.get_mut().current.take();
Ok(())
},
));
tokio_executor::current_thread::spawn(delay_for(interval).then(move |_| {
inner.get_mut().current.take();
ready(())
}));
now
}
}
@ -160,7 +156,6 @@ impl SystemTimeService {
#[cfg(test)]
mod tests {
use super::*;
use futures::future;
use std::time::{Duration, SystemTime};
/// State Under Test: Two calls of `SystemTimeService::now()` return the same value if they are done within resolution interval of `SystemTimeService`.
@ -170,13 +165,11 @@ mod tests {
fn system_time_service_time_does_not_immediately_change() {
let resolution = Duration::from_millis(50);
let _ = actix_rt::System::new("test").block_on(future::lazy(|| {
let _ = actix_rt::System::new("test").block_on(async {
let time_service = SystemTimeService::with(resolution);
assert_eq!(time_service.now(), time_service.now());
Ok::<(), ()>(())
}));
});
}
/// State Under Test: Two calls of `LowResTimeService::now()` return the same value if they are done within resolution interval of `SystemTimeService`.
@ -186,13 +179,11 @@ mod tests {
fn lowres_time_service_time_does_not_immediately_change() {
let resolution = Duration::from_millis(50);
let _ = actix_rt::System::new("test").block_on(future::lazy(|| {
let _ = actix_rt::System::new("test").block_on(async {
let time_service = LowResTimeService::with(resolution);
assert_eq!(time_service.now(), time_service.now());
Ok::<(), ()>(())
}));
});
}
/// State Under Test: `SystemTimeService::now()` updates returned value every resolution period.
@ -204,7 +195,7 @@ mod tests {
let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(150);
let _ = actix_rt::System::new("test").block_on(future::lazy(|| {
actix_rt::System::new("test").block_on(async {
let time_service = SystemTimeService::with(resolution);
let first_time = time_service
@ -212,17 +203,15 @@ mod tests {
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
sleep(wait_time).then(move |_| {
let second_time = time_service
.now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
delay_for(wait_time).await;
assert!(second_time - first_time >= wait_time);
let second_time = time_service
.now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
Ok::<(), ()>(())
})
}));
assert!(second_time - first_time >= wait_time);
});
}
/// State Under Test: `LowResTimeService::now()` updates returned value every resolution period.
@ -234,18 +223,15 @@ mod tests {
let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(150);
let _ = actix_rt::System::new("test").block_on(future::lazy(|| {
let _ = actix_rt::System::new("test").block_on(async {
let time_service = LowResTimeService::with(resolution);
let first_time = time_service.now();
sleep(wait_time).then(move |_| {
let second_time = time_service.now();
delay_for(wait_time).await;
assert!(second_time - first_time >= wait_time);
Ok::<(), ()>(())
})
}));
let second_time = time_service.now();
assert!(second_time - first_time >= wait_time);
});
}
}

View File

@ -2,19 +2,21 @@
//!
//! If the response does not complete within the specified timeout, the response
//! will be aborted.
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::time::Duration;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{fmt, time};
use actix_service::{IntoService, Service, Transform};
use futures::future::{ok, FutureResult};
use futures::{Async, Future, Poll};
use tokio_timer::{clock, Delay};
use futures::future::{ok, Ready};
use pin_project::pin_project;
use tokio_timer::{clock, delay, Delay};
/// Applies a timeout to requests.
#[derive(Debug)]
pub struct Timeout<E = ()> {
timeout: Duration,
timeout: time::Duration,
_t: PhantomData<E>,
}
@ -66,7 +68,7 @@ impl<E: PartialEq> PartialEq for TimeoutError<E> {
}
impl<E> Timeout<E> {
pub fn new(timeout: Duration) -> Self {
pub fn new(timeout: time::Duration) -> Self {
Timeout {
timeout,
_t: PhantomData,
@ -89,7 +91,7 @@ where
type Error = TimeoutError<S::Error>;
type InitError = E;
type Transform = TimeoutService<S>;
type Future = FutureResult<Self::Transform, Self::InitError>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ok(TimeoutService {
@ -103,14 +105,14 @@ where
#[derive(Debug, Clone)]
pub struct TimeoutService<S> {
service: S,
timeout: Duration,
timeout: time::Duration,
}
impl<S> TimeoutService<S>
where
S: Service,
{
pub fn new<U>(timeout: Duration, service: U) -> Self
pub fn new<U>(timeout: time::Duration, service: U) -> Self
where
U: IntoService<S>,
{
@ -130,21 +132,23 @@ where
type Error = TimeoutError<S::Error>;
type Future = TimeoutServiceResponse<S>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready().map_err(TimeoutError::Service)
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx).map_err(TimeoutError::Service)
}
fn call(&mut self, request: S::Request) -> Self::Future {
TimeoutServiceResponse {
fut: self.service.call(request),
sleep: Delay::new(clock::now() + self.timeout),
sleep: delay(clock::now() + self.timeout),
}
}
}
/// `TimeoutService` response future
#[pin_project]
#[derive(Debug)]
pub struct TimeoutServiceResponse<T: Service> {
#[pin]
fut: T::Future,
sleep: Delay,
}
@ -153,36 +157,34 @@ impl<T> Future for TimeoutServiceResponse<T>
where
T: Service,
{
type Item = T::Response;
type Error = TimeoutError<T::Error>;
type Output = Result<T::Response, TimeoutError<T::Error>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// First, try polling the future
match self.fut.poll() {
Ok(Async::Ready(v)) => return Ok(Async::Ready(v)),
Ok(Async::NotReady) => {}
Err(e) => return Err(TimeoutError::Service(e)),
match this.fut.poll(cx) {
Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)),
Poll::Ready(Err(e)) => return Poll::Ready(Err(TimeoutError::Service(e))),
Poll::Pending => {}
}
// Now check the sleep
match self.sleep.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(_)) => Err(TimeoutError::Timeout),
Err(_) => Err(TimeoutError::Timeout),
match Pin::new(&mut this.sleep).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => Poll::Ready(Err(TimeoutError::Timeout)),
}
}
}
#[cfg(test)]
mod tests {
use futures::future::lazy;
use futures::{Async, Poll};
use std::task::{Context, Poll};
use std::time::Duration;
use super::*;
use actix_service::blank::{Blank, BlankNewService};
use actix_service::{NewService, Service, ServiceExt};
use actix_service::{apply, factory_fn, Service, ServiceFactory};
use futures::future::{ok, FutureExt, LocalBoxFuture};
struct SleepService(Duration);
@ -190,14 +192,16 @@ mod tests {
type Request = ();
type Response = ();
type Error = ();
type Future = Box<dyn Future<Item = (), Error = ()>>;
type Future = LocalBoxFuture<'static, Result<(), ()>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: ()) -> Self::Future {
Box::new(tokio_timer::sleep(self.0).map_err(|_| ()))
tokio_timer::delay_for(self.0)
.then(|_| ok::<_, ()>(()))
.boxed_local()
}
}
@ -206,11 +210,10 @@ mod tests {
let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(50);
let res = actix_rt::System::new("test").block_on(lazy(|| {
let mut timeout = Blank::default()
.and_then(TimeoutService::new(resolution, SleepService(wait_time)));
timeout.call(())
}));
let res = actix_rt::System::new("test").block_on(async {
let mut timeout = TimeoutService::new(resolution, SleepService(wait_time));
timeout.call(()).await
});
assert_eq!(res, Ok(()));
}
@ -219,11 +222,10 @@ mod tests {
let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(150);
let res = actix_rt::System::new("test").block_on(lazy(|| {
let mut timeout = Blank::default()
.and_then(TimeoutService::new(resolution, SleepService(wait_time)));
timeout.call(())
}));
let res = actix_rt::System::new("test").block_on(async {
let mut timeout = TimeoutService::new(resolution, SleepService(wait_time));
timeout.call(()).await
});
assert_eq!(res, Err(TimeoutError::Timeout));
}
@ -232,15 +234,15 @@ mod tests {
let resolution = Duration::from_millis(100);
let wait_time = Duration::from_millis(150);
let res = actix_rt::System::new("test").block_on(lazy(|| {
let timeout = BlankNewService::<(), (), ()>::default()
.apply(Timeout::new(resolution), || Ok(SleepService(wait_time)));
if let Async::Ready(mut to) = timeout.new_service(&()).poll().unwrap() {
to.call(())
} else {
panic!()
}
}));
let res = actix_rt::System::new("test").block_on(async {
let timeout = apply(
Timeout::new(resolution),
factory_fn(|| ok::<_, ()>(SleepService(wait_time))),
);
let mut srv = timeout.new_service(&()).await.unwrap();
srv.call(()).await
});
assert_eq!(res, Err(TimeoutError::Timeout));
}
}