diff --git a/actix-codec/src/framed.rs b/actix-codec/src/framed.rs index 39704e96..0346605e 100644 --- a/actix-codec/src/framed.rs +++ b/actix-codec/src/framed.rs @@ -28,8 +28,8 @@ pub struct Fuse(pub T, pub U); impl Framed where - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, + T: AsyncRead + AsyncWrite + Unpin, + U: Decoder + Encoder + Unpin, { /// Provides a `Stream` and `Sink` interface for reading and writing to this /// `Io` object, using `Decode` and `Encode` to read and write the raw data. @@ -223,43 +223,65 @@ impl Framed { } } +impl Framed +where + T: AsyncRead + Unpin, + U: Decoder + Unpin, +{ + pub fn poll_next_item( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + Pin::new(&mut self.inner).poll_next(cx) + } +} + impl Stream for Framed where - T: AsyncRead, - U: Decoder, + T: AsyncRead + Unpin, + U: Decoder + Unpin, { type Item = Result; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_next(cx) } + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.as_mut().inner).poll_next(cx) } } impl Sink for Framed where - T: AsyncWrite, - U: Encoder, + T: AsyncWrite + Unpin, + U: Encoder + Unpin, U::Error: From, { type Error = U::Error; - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_ready(cx) } + fn poll_ready( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.as_mut().inner.get_mut()).poll_ready(cx) } - fn start_send(self: Pin<&mut Self>, item: ::Item) -> Result<(), Self::Error> { - unsafe { - self.map_unchecked_mut(|s| s.inner.get_mut()) - .start_send(item) - } + fn start_send( + mut self: Pin<&mut Self>, + item: ::Item, + ) -> Result<(), Self::Error> { + Pin::new(&mut self.as_mut().inner.get_mut()).start_send(item) } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_flush(cx) } + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.as_mut().inner.get_mut()).poll_flush(cx) } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_close(cx) } + fn poll_close( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.as_mut().inner.get_mut()).poll_close(cx) } } diff --git a/actix-connect/Cargo.toml b/actix-connect/Cargo.toml index 0fa5c903..85b626ae 100644 --- a/actix-connect/Cargo.toml +++ b/actix-connect/Cargo.toml @@ -39,12 +39,11 @@ actix-utils = "0.4.0" actix-rt = "0.2.5" derive_more = "0.15" either = "1.5.2" -futures = "0.1.25" +futures = "0.3.1" http = { version = "0.1.17", optional = true } log = "0.4" -tokio-tcp = "0.1.3" -tokio-current-thread = "0.1.5" -trust-dns-resolver = { version="0.11.0", default-features = false } +tokio-net = "=0.2.0-alpha.6" +trust-dns-resolver = { version="0.18.0-alpha.1", default-features = false } # openssl openssl = { version="0.10", optional = true } diff --git a/actix-ioframe/Cargo.toml b/actix-ioframe/Cargo.toml index 97e32152..09cbac6d 100644 --- a/actix-ioframe/Cargo.toml +++ b/actix-ioframe/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-ioframe" -version = "0.1.1" +version = "0.2.0" authors = ["Nikolay Kim "] description = "Actix framed service" keywords = ["network", "framework", "async", "futures"] @@ -22,8 +22,8 @@ actix-service = "0.4.1" actix-codec = "0.1.2" bytes = "0.4" either = "1.5.2" -futures = "0.1.25" -tokio-current-thread = "0.1.4" +futures = "0.3.1" +tokio-executor = "=0.2.0-alpha.6" log = "0.4" [dev-dependencies] diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 7147a642..22f4a4ab 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -4,7 +4,7 @@ use std::{net, thread, time}; use actix_codec::{BytesCodec, Framed}; use actix_server::{Io, Server, ServerConfig}; -use actix_service::{service_fn, service_fn2, service_fn_config, IntoService}; +use actix_service::{factory_fn_cfg, service_fn, service_fn2}; use bytes::Bytes; use futures::{future::ok, SinkExt}; use net2::TcpBuilder; @@ -28,9 +28,9 @@ fn test_bind() { let sys = actix_rt::System::new("test"); let srv = Server::build() .bind("test", addr, move || { - service_fn_config(move |cfg: &ServerConfig| { + factory_fn_cfg(move |cfg: &ServerConfig| { assert_eq!(cfg.local_addr(), addr); - ok::<_, ()>((|_| ok::<_, ()>(())).into_service()) + ok::<_, ()>(service_fn2(|_| ok::<_, ()>(()))) }) }) .unwrap() @@ -76,9 +76,9 @@ fn test_listen() { let lst = net::TcpListener::bind(addr).unwrap(); let srv = Server::build() .listen("test", lst, move || { - service_fn_config(move |cfg: &ServerConfig| { + factory_fn_cfg(move |cfg: &ServerConfig| { assert_eq!(cfg.local_addr(), addr); - ok::<_, ()>((|_| ok::<_, ()>(())).into_service()) + ok::<_, ()>(service_fn2(|_| ok::<_, ()>(()))) }) }) .unwrap() @@ -105,7 +105,7 @@ fn test_start() { let srv: Server = Server::build() .backlog(100) .bind("test", addr, move || { - service_fn_config(move |cfg: &ServerConfig| { + factory_fn_cfg(move |cfg: &ServerConfig| { assert_eq!(cfg.local_addr(), addr); let srv = service_fn2(|io: Io| { diff --git a/actix-service/Cargo.toml b/actix-service/Cargo.toml index ca5a7368..00df1178 100644 --- a/actix-service/Cargo.toml +++ b/actix-service/Cargo.toml @@ -24,7 +24,7 @@ path = "src/lib.rs" [dependencies] futures = "0.3.1" -pin-project = "0.4.0-alpha.11" +pin-project = "0.4.5" [dev-dependencies] tokio = "0.2.0-alpha.5" diff --git a/actix-service/src/fn_service.rs b/actix-service/src/fn_service.rs index f7da4407..42c9a5b5 100644 --- a/actix-service/src/fn_service.rs +++ b/actix-service/src/fn_service.rs @@ -31,10 +31,11 @@ where } /// Create `ServiceFactory` for function that can produce services -pub fn service_fn_factory( +pub fn factory_fn( f: F, ) -> impl ServiceFactory< Config = Cfg, + Service = S, Request = S::Request, Response = S::Response, Error = S::Error, @@ -50,10 +51,11 @@ where } /// Create `ServiceFactory` for function that can produce services with configuration -pub fn service_fn_config( +pub fn factory_fn_cfg( f: F, ) -> impl ServiceFactory< Config = Cfg, + Service = Srv, Request = Srv::Request, Response = Srv::Response, Error = Srv::Error, diff --git a/actix-service/src/lib.rs b/actix-service/src/lib.rs index c32f46a4..7661915b 100644 --- a/actix-service/src/lib.rs +++ b/actix-service/src/lib.rs @@ -22,11 +22,11 @@ mod transform_err; pub use self::apply::{apply_fn, apply_fn_factory}; pub use self::apply_cfg::{apply_cfg, apply_cfg_factory}; -pub use self::fn_service::{service_fn, service_fn2, service_fn_config, service_fn_factory}; +pub use self::fn_service::{factory_fn, factory_fn_cfg, service_fn, service_fn2}; pub use self::into::{into_factory, into_service, ServiceFactoryMapper, ServiceMapper}; pub use self::map_config::{map_config, unit_config, MappedConfig}; pub use self::pipeline::{pipeline, pipeline_factory, Pipeline, PipelineFactory}; -pub use self::transform::{apply_transform, IntoTransform, Transform}; +pub use self::transform::{apply, IntoTransform, Transform}; /// An asynchronous function from `Request` to a `Response`. pub trait Service { diff --git a/actix-service/src/transform.rs b/actix-service/src/transform.rs index 9bed3eb8..3dc398f2 100644 --- a/actix-service/src/transform.rs +++ b/actix-service/src/transform.rs @@ -4,7 +4,7 @@ use std::rc::Rc; use std::sync::Arc; use std::task::{Context, Poll}; -use crate::transform_err::{TransformFromErr, TransformMapInitErr}; +use crate::transform_err::TransformMapInitErr; use crate::{IntoServiceFactory, Service, ServiceFactory}; use pin_project::pin_project; @@ -49,32 +49,6 @@ pub trait Transform { { TransformMapInitErr::new(self, f) } - - /// Map this service's init error to any error implementing `From` for - /// this service`s `Error`. - /// - /// Note that this function consumes the receiving transform and returns a - /// wrapped version of it. - fn from_err(self) -> TransformFromErr - where - Self: Sized, - E: From, - { - TransformFromErr::new(self) - } - - // /// Map this service's init error to service's init error - // /// if it is implementing `Into` to this service`s `InitError`. - // /// - // /// Note that this function consumes the receiving transform and returns a - // /// wrapped version of it. - // fn into_err(self) -> TransformIntoErr - // where - // Self: Sized, - // Self::InitError: From, - // { - // TransformFromErr::new(self) - // } } impl Transform for Rc @@ -127,10 +101,10 @@ where } } -/// Apply transform to service factory. Function returns +/// Apply transform to a service. Function returns /// services factory that in initialization creates /// service and applies transform to this service. -pub fn apply_transform( +pub fn apply( t: F, service: U, ) -> impl ServiceFactory< diff --git a/actix-service/src/transform_err.rs b/actix-service/src/transform_err.rs index 4242353d..d41de357 100644 --- a/actix-service/src/transform_err.rs +++ b/actix-service/src/transform_err.rs @@ -89,79 +89,3 @@ where this.fut.poll(cx).map_err(this.f) } } - -/// Transform for the `from_err` combinator, changing the type of a new -/// transform's init error. -/// -/// This is created by the `Transform::from_err` method. -pub struct TransformFromErr { - t: T, - e: PhantomData<(S, E)>, -} - -impl TransformFromErr -where - T: Transform, - E: From, -{ - /// Create new `TransformFromErr` new transform instance - pub fn new(t: T) -> Self { - Self { t, e: PhantomData } - } -} - -impl Clone for TransformFromErr -where - T: Clone, -{ - fn clone(&self) -> Self { - Self { - t: self.t.clone(), - e: PhantomData, - } - } -} - -impl Transform for TransformFromErr -where - T: Transform, - E: From, -{ - type Request = T::Request; - type Response = T::Response; - type Error = T::Error; - type Transform = T::Transform; - - type InitError = E; - type Future = TransformFromErrFuture; - - fn new_transform(&self, service: S) -> Self::Future { - TransformFromErrFuture { - fut: self.t.new_transform(service), - _t: PhantomData, - } - } -} - -#[pin_project] -pub struct TransformFromErrFuture -where - T: Transform, - E: From, -{ - #[pin] - fut: T::Future, - _t: PhantomData, -} - -impl Future for TransformFromErrFuture -where - T: Transform, - E: From, -{ - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().fut.poll(cx).map_err(E::from) - } -} diff --git a/actix-testing/Cargo.toml b/actix-testing/Cargo.toml index eb9ac455..4e139e33 100644 --- a/actix-testing/Cargo.toml +++ b/actix-testing/Cargo.toml @@ -24,6 +24,6 @@ actix-service = "0.4.2" log = "0.4" net2 = "0.2" -futures = "0.1" -tokio-tcp = "0.1" -tokio-reactor = "0.1" +futures = "0.3.1" +tokio = "0.2.0-alpha.6" +tokio-net = { version = "0.2.0-alpha.6" } diff --git a/actix-testing/src/lib.rs b/actix-testing/src/lib.rs index f3607f92..e1c8eb53 100644 --- a/actix-testing/src/lib.rs +++ b/actix-testing/src/lib.rs @@ -3,12 +3,12 @@ use std::sync::mpsc; use std::{net, thread}; use actix_rt::System; -use actix_server::{Server, ServerBuilder, StreamServiceFactory}; +use actix_server::{Server, ServerBuilder, ServiceFactory}; pub use actix_server_config::{Io, ServerConfig}; use net2::TcpBuilder; -use tokio_reactor::Handle; -use tokio_tcp::TcpStream; +use tokio_net::driver::Handle; +use tokio_net::tcp::TcpStream; mod rt; pub use self::rt::*; @@ -75,7 +75,7 @@ impl TestServer { } /// Start new test server with application factory - pub fn with>(factory: F) -> TestServerRuntime { + pub fn with>(factory: F) -> TestServerRuntime { let (tx, rx) = mpsc::channel(); // run server in separate thread diff --git a/actix-testing/src/rt.rs b/actix-testing/src/rt.rs index 488f3d65..8a135a66 100644 --- a/actix-testing/src/rt.rs +++ b/actix-testing/src/rt.rs @@ -1,9 +1,11 @@ //! Various helpers for Actix applications to use during testing. use std::cell::RefCell; +use std::future::Future; use actix_rt::{System, SystemRunner}; use actix_service::Service; -use futures::future::{lazy, Future, IntoFuture}; +use futures::future::{lazy, FutureExt}; +// use futures_util::future::FutureExt; thread_local! { static RT: RefCell = { @@ -35,11 +37,11 @@ impl Drop for Inner { /// /// Note that this function is intended to be used only for testing purpose. /// This function panics on nested call. -pub fn block_on(f: F) -> Result +pub fn block_on(f: F) -> F::Output where - F: IntoFuture, + F: Future, { - RT.with(move |rt| rt.borrow_mut().get_mut().block_on(f.into_future())) + RT.with(move |rt| rt.borrow_mut().get_mut().block_on(f)) } /// Runs the provided function, blocking the current thread until the result @@ -52,21 +54,21 @@ where /// /// Note that this function is intended to be used only for testing purpose. /// This function panics on nested call. -pub fn block_fn(f: F) -> Result +pub fn block_fn(f: F) -> F::Output where F: FnOnce() -> R, - R: IntoFuture, + R: Future, { - RT.with(move |rt| rt.borrow_mut().get_mut().block_on(lazy(f))) + RT.with(move |rt| rt.borrow_mut().get_mut().block_on(lazy(|_| f()))) } /// Spawn future to the current test runtime. pub fn spawn(fut: F) where - F: Future + 'static, + F: Future + 'static, { run_on(move || { - actix_rt::spawn(fut); + actix_rt::spawn(fut.map(|_| ())); }); } @@ -78,12 +80,7 @@ pub fn run_on(f: F) -> R where F: FnOnce() -> R, { - RT.with(move |rt| { - rt.borrow_mut() - .get_mut() - .block_on(lazy(|| Ok::<_, ()>(f()))) - }) - .unwrap() + RT.with(move |rt| rt.borrow_mut().get_mut().block_on(lazy(|_| f()))) } /// Calls service and waits for response future completion. diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index ee3c1dba..9b91931b 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-utils" -version = "0.4.7" +version = "0.5.0" authors = ["Nikolay Kim "] description = "Actix utils - various actix net related services" keywords = ["network", "framework", "async", "futures"] @@ -22,9 +22,10 @@ actix-service = "0.4.1" actix-codec = "0.1.2" bytes = "0.4" either = "1.5.2" -futures = "0.1.25" -tokio-timer = "0.2.8" -tokio-current-thread = "0.1.4" +futures = "0.3.1" +pin-project = "0.4.5" +tokio-timer = "0.3.0-alpha.6" +tokio-executor = "=0.2.0-alpha.6" log = "0.4" [dev-dependencies] diff --git a/actix-utils/src/counter.rs b/actix-utils/src/counter.rs index 2f355094..07d62454 100644 --- a/actix-utils/src/counter.rs +++ b/actix-utils/src/counter.rs @@ -1,7 +1,8 @@ use std::cell::Cell; use std::rc::Rc; +use std::task; -use futures::task::AtomicTask; +use crate::task::LocalWaker; #[derive(Clone)] /// Simple counter with ability to notify task on reaching specific number @@ -12,7 +13,7 @@ pub struct Counter(Rc); struct CounterInner { count: Cell, capacity: usize, - task: AtomicTask, + task: LocalWaker, } impl Counter { @@ -21,7 +22,7 @@ impl Counter { Counter(Rc::new(CounterInner { capacity, count: Cell::new(0), - task: AtomicTask::new(), + task: LocalWaker::new(), })) } @@ -32,8 +33,8 @@ impl Counter { /// Check if counter is not at capacity. If counter at capacity /// it registers notification for current task. - pub fn available(&self) -> bool { - self.0.available() + pub fn available(&self, cx: &mut task::Context) -> bool { + self.0.available(cx) } /// Get total number of acquired counts @@ -66,15 +67,15 @@ impl CounterInner { let num = self.count.get(); self.count.set(num - 1); if num == self.capacity { - self.task.notify(); + self.task.wake(); } } - fn available(&self) -> bool { + fn available(&self, cx: &mut task::Context) -> bool { if self.count.get() < self.capacity { true } else { - self.task.register(); + self.task.register(cx.waker()); false } } diff --git a/actix-utils/src/either.rs b/actix-utils/src/either.rs index 6bd4e59e..2bc07611 100644 --- a/actix-utils/src/either.rs +++ b/actix-utils/src/either.rs @@ -1,6 +1,10 @@ //! Contains `Either` service and related types and functions. -use actix_service::{IntoNewService, NewService, Service}; -use futures::{future, try_ready, Async, Future, IntoFuture, Poll}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use actix_service::{Service, ServiceFactory}; +use futures::{future, ready, Future}; +use pin_project::pin_project; /// Combine two different service types into a single type. /// @@ -31,21 +35,21 @@ where type Error = A::Error; type Future = future::Either; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - let left = self.left.poll_ready()?; - let right = self.right.poll_ready()?; + fn poll_ready(&mut self, cx: &mut Context) -> Poll> { + let left = self.left.poll_ready(cx)?; + let right = self.right.poll_ready(cx)?; if left.is_ready() && right.is_ready() { - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } else { - Ok(Async::NotReady) + Poll::Pending } } fn call(&mut self, req: either::Either) -> Self::Future { match req { - either::Either::Left(req) => future::Either::A(self.left.call(req)), - either::Either::Right(req) => future::Either::B(self.right.call(req)), + either::Either::Left(req) => future::Either::Left(self.left.call(req)), + either::Either::Right(req) => future::Either::Right(self.right.call(req)), } } } @@ -57,29 +61,24 @@ pub struct Either { } impl Either { - pub fn new(srv_a: F1, srv_b: F2) -> Either + pub fn new(left: A, right: B) -> Either where - A: NewService, - B: NewService< + A: ServiceFactory, + B: ServiceFactory< Config = A::Config, Response = A::Response, Error = A::Error, InitError = A::InitError, >, - F1: IntoNewService, - F2: IntoNewService, { - Either { - left: srv_a.into_new_service(), - right: srv_b.into_new_service(), - } + Either { left, right } } } -impl NewService for Either +impl ServiceFactory for Either where - A: NewService, - B: NewService< + A: ServiceFactory, + B: ServiceFactory< Config = A::Config, Response = A::Response, Error = A::Error, @@ -113,37 +112,41 @@ impl Clone for Either { } } +#[pin_project] #[doc(hidden)] -pub struct EitherNewService { +pub struct EitherNewService { left: Option, right: Option, - left_fut: ::Future, - right_fut: ::Future, + #[pin] + left_fut: A::Future, + #[pin] + right_fut: B::Future, } impl Future for EitherNewService where - A: NewService, - B: NewService, + A: ServiceFactory, + B: ServiceFactory, { - type Item = EitherService; - type Error = A::InitError; + type Output = Result, A::InitError>; - fn poll(&mut self) -> Poll { - if self.left.is_none() { - self.left = Some(try_ready!(self.left_fut.poll())); + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.project(); + + if this.left.is_none() { + *this.left = Some(ready!(this.left_fut.poll(cx))?); } - if self.right.is_none() { - self.right = Some(try_ready!(self.right_fut.poll())); + if this.right.is_none() { + *this.right = Some(ready!(this.right_fut.poll(cx))?); } - if self.left.is_some() && self.right.is_some() { - Ok(Async::Ready(EitherService { - left: self.left.take().unwrap(), - right: self.right.take().unwrap(), + if this.left.is_some() && this.right.is_some() { + Poll::Ready(Ok(EitherService { + left: this.left.take().unwrap(), + right: this.right.take().unwrap(), })) } else { - Ok(Async::NotReady) + Poll::Pending } } } diff --git a/actix-utils/src/framed.rs b/actix-utils/src/framed.rs index 2fa2d8e9..daa40bb8 100644 --- a/actix-utils/src/framed.rs +++ b/actix-utils/src/framed.rs @@ -1,15 +1,19 @@ //! Framed dispatcher service and related utilities use std::collections::VecDeque; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::{fmt, mem}; use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; use actix_service::{IntoService, Service}; -use futures::task::AtomicTask; -use futures::unsync::mpsc; -use futures::{Async, Future, Poll, Sink, Stream}; +use futures::future::{ready, FutureExt}; +use futures::{Future, Sink, Stream}; use log::debug; +use pin_project::pin_project; use crate::cell::Cell; +use crate::mpsc; +use crate::task::LocalWaker; type Request = ::Item; type Response = ::Item; @@ -68,22 +72,26 @@ pub enum FramedMessage { Close, } +type Rx = Option::Item>>>; +type Inner = Cell::Item, S::Error>>; + /// FramedTransport - is a future that reads frames from Framed object /// and pass then to the service. +#[pin_project] pub struct FramedTransport where S: Service, Response = Response>, S::Error: 'static, S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Encoder + Decoder, + T: AsyncRead + AsyncWrite + Unpin, + U: Encoder + Decoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, { service: S, state: TransportState, framed: Framed, - rx: Option::Item>>>, + rx: Option::Item>>>, inner: Cell::Item, S::Error>>, } @@ -97,7 +105,7 @@ enum TransportState { struct FramedTransportInner { buf: VecDeque>, - task: AtomicTask, + task: LocalWaker, } impl FramedTransport @@ -105,130 +113,8 @@ where S: Service, Response = Response>, S::Error: 'static, S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, -{ - fn poll_read(&mut self) -> bool { - loop { - match self.service.poll_ready() { - Ok(Async::Ready(_)) => { - let item = match self.framed.poll() { - Ok(Async::Ready(Some(el))) => el, - Err(err) => { - self.state = - TransportState::FramedError(FramedTransportError::Decoder(err)); - return true; - } - Ok(Async::NotReady) => return false, - Ok(Async::Ready(None)) => { - self.state = TransportState::Stopping; - return true; - } - }; - - let mut cell = self.inner.clone(); - tokio_current_thread::spawn(self.service.call(item).then(move |item| { - let inner = cell.get_mut(); - inner.buf.push_back(item); - inner.task.notify(); - Ok(()) - })); - } - Ok(Async::NotReady) => return false, - Err(err) => { - self.state = TransportState::Error(FramedTransportError::Service(err)); - return true; - } - } - } - } - - /// write to framed object - fn poll_write(&mut self) -> bool { - let inner = self.inner.get_mut(); - let mut rx_done = self.rx.is_none(); - let mut buf_empty = inner.buf.is_empty(); - loop { - while !self.framed.is_write_buf_full() { - if !buf_empty { - match inner.buf.pop_front().unwrap() { - Ok(msg) => { - if let Err(err) = self.framed.force_send(msg) { - self.state = TransportState::FramedError( - FramedTransportError::Encoder(err), - ); - return true; - } - buf_empty = inner.buf.is_empty(); - } - Err(err) => { - self.state = - TransportState::Error(FramedTransportError::Service(err)); - return true; - } - } - } - - if !rx_done && self.rx.is_some() { - match self.rx.as_mut().unwrap().poll() { - Ok(Async::Ready(Some(FramedMessage::Message(msg)))) => { - if let Err(err) = self.framed.force_send(msg) { - self.state = TransportState::FramedError( - FramedTransportError::Encoder(err), - ); - return true; - } - } - Ok(Async::Ready(Some(FramedMessage::Close))) => { - self.state = TransportState::FlushAndStop; - return true; - } - Ok(Async::Ready(None)) => { - rx_done = true; - let _ = self.rx.take(); - } - Ok(Async::NotReady) => rx_done = true, - Err(_e) => { - rx_done = true; - let _ = self.rx.take(); - } - } - } - - if rx_done && buf_empty { - break; - } - } - - if !self.framed.is_write_buf_empty() { - match self.framed.poll_complete() { - Ok(Async::NotReady) => break, - Err(err) => { - debug!("Error sending data: {:?}", err); - self.state = - TransportState::FramedError(FramedTransportError::Encoder(err)); - return true; - } - Ok(Async::Ready(_)) => (), - } - } else { - break; - } - } - - false - } -} - -impl FramedTransport -where - S: Service, Response = Response>, - S::Error: 'static, - S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, + T: AsyncRead + AsyncWrite + Unpin, + U: Decoder + Encoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, { @@ -240,7 +126,7 @@ where state: TransportState::Processing, inner: Cell::new(FramedTransportInner { buf: VecDeque::new(), - task: AtomicTask::new(), + task: LocalWaker::new(), }), } } @@ -248,7 +134,7 @@ where /// Get Sender pub fn set_receiver( mut self, - rx: mpsc::UnboundedReceiver::Item>>, + rx: mpsc::Receiver::Item>>, ) -> Self { self.rx = Some(rx); self @@ -283,51 +169,216 @@ where S: Service, Response = Response>, S::Error: 'static, S::Future: 'static, - T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, + T: AsyncRead + AsyncWrite + Unpin, + U: Decoder + Encoder + Unpin, ::Item: 'static, ::Error: std::fmt::Debug, { - type Item = (); - type Error = FramedTransportError; + type Output = Result<(), FramedTransportError>; - fn poll(&mut self) -> Poll { - self.inner.get_ref().task.register(); + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + self.inner.get_ref().task.register(cx.waker()); - match mem::replace(&mut self.state, TransportState::Processing) { - TransportState::Processing => { - if self.poll_read() || self.poll_write() { - self.poll() - } else { - Ok(Async::NotReady) - } + let this = self.project(); + poll( + cx, + this.service, + this.state, + this.framed, + this.rx, + this.inner, + ) + } +} + +fn poll( + cx: &mut Context, + srv: &mut S, + state: &mut TransportState, + framed: &mut Framed, + rx: &mut Rx, + inner: &mut Inner, +) -> Poll>> +where + S: Service, Response = Response>, + S::Error: 'static, + S::Future: 'static, + T: AsyncRead + AsyncWrite + Unpin, + U: Decoder + Encoder + Unpin, + ::Item: 'static, + ::Error: std::fmt::Debug, +{ + match mem::replace(state, TransportState::Processing) { + TransportState::Processing => { + if poll_read(cx, srv, state, framed, inner) + || poll_write(cx, state, framed, rx, inner) + { + poll(cx, srv, state, framed, rx, inner) + } else { + Poll::Pending } - TransportState::Error(err) => { - if self.framed.is_write_buf_empty() - || (self.poll_write() || self.framed.is_write_buf_empty()) - { - Err(err) - } else { - self.state = TransportState::Error(err); - Ok(Async::NotReady) - } + } + TransportState::Error(err) => { + let is_empty = framed.is_write_buf_empty(); + if is_empty || (poll_write(cx, state, framed, rx, inner) || is_empty) { + Poll::Ready(Err(err)) + } else { + *state = TransportState::Error(err); + Poll::Pending } - TransportState::FlushAndStop => { - if !self.framed.is_write_buf_empty() { - match self.framed.poll_complete() { - Err(err) => { - debug!("Error sending data: {:?}", err); - Ok(Async::Ready(())) - } - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(_)) => Ok(Async::Ready(())), + } + TransportState::FlushAndStop => { + if !framed.is_write_buf_empty() { + match Pin::new(framed).poll_flush(cx) { + Poll::Ready(Err(err)) => { + debug!("Error sending data: {:?}", err); + Poll::Ready(Ok(())) } - } else { - Ok(Async::Ready(())) + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), } + } else { + Poll::Ready(Ok(())) + } + } + TransportState::FramedError(err) => Poll::Ready(Err(err)), + TransportState::Stopping => Poll::Ready(Ok(())), + } +} + +fn poll_read( + cx: &mut Context, + srv: &mut S, + state: &mut TransportState, + framed: &mut Framed, + inner: &mut Inner, +) -> bool +where + S: Service, Response = Response>, + S::Error: 'static, + S::Future: 'static, + T: AsyncRead + AsyncWrite + Unpin, + U: Decoder + Encoder + Unpin, + ::Item: 'static, + ::Error: std::fmt::Debug, +{ + loop { + match srv.poll_ready(cx) { + Poll::Ready(Ok(_)) => { + let item = match framed.poll_next_item(cx) { + Poll::Ready(Some(Ok(el))) => el, + Poll::Ready(Some(Err(err))) => { + *state = + TransportState::FramedError(FramedTransportError::Decoder(err)); + return true; + } + Poll::Pending => return false, + Poll::Ready(None) => { + *state = TransportState::Stopping; + return true; + } + }; + + let mut cell = inner.clone(); + let fut = srv.call(item).then(move |item| { + let inner = cell.get_mut(); + inner.buf.push_back(item); + inner.task.wake(); + ready(()) + }); + tokio_executor::current_thread::spawn(fut); + } + Poll::Pending => return false, + Poll::Ready(Err(err)) => { + *state = TransportState::Error(FramedTransportError::Service(err)); + return true; } - TransportState::FramedError(err) => Err(err), - TransportState::Stopping => Ok(Async::Ready(())), } } } + +/// write to framed object +fn poll_write( + cx: &mut Context, + state: &mut TransportState, + framed: &mut Framed, + rx: &mut Rx, + inner: &mut Inner, +) -> bool +where + S: Service, Response = Response>, + S::Error: 'static, + S::Future: 'static, + T: AsyncRead + AsyncWrite + Unpin, + U: Decoder + Encoder + Unpin, + ::Item: 'static, + ::Error: std::fmt::Debug, +{ + // let this = self.project(); + + let inner = inner.get_mut(); + let mut rx_done = rx.is_none(); + let mut buf_empty = inner.buf.is_empty(); + loop { + while !framed.is_write_buf_full() { + if !buf_empty { + match inner.buf.pop_front().unwrap() { + Ok(msg) => { + if let Err(err) = framed.force_send(msg) { + *state = + TransportState::FramedError(FramedTransportError::Encoder(err)); + return true; + } + buf_empty = inner.buf.is_empty(); + } + Err(err) => { + *state = TransportState::Error(FramedTransportError::Service(err)); + return true; + } + } + } + + if !rx_done && rx.is_some() { + match Pin::new(rx.as_mut().unwrap()).poll_next(cx) { + Poll::Ready(Some(FramedMessage::Message(msg))) => { + if let Err(err) = framed.force_send(msg) { + *state = + TransportState::FramedError(FramedTransportError::Encoder(err)); + return true; + } + } + Poll::Ready(Some(FramedMessage::Close)) => { + *state = TransportState::FlushAndStop; + return true; + } + Poll::Ready(None) => { + rx_done = true; + let _ = rx.take(); + } + Poll::Pending => rx_done = true, + } + } + + if rx_done && buf_empty { + break; + } + } + + if !framed.is_write_buf_empty() { + // match this.framed.poll_flush(cx) { + // Poll::Pending => break, + // Poll::Ready(Err(err)) => { + // debug!("Error sending data: {:?}", err); + // self.state = + // TransportState::FramedError(FramedTransportError::Encoder(err)); + // return true; + // } + // Poll::Ready(Ok(_)) => (), + // } + } else { + break; + } + } + + false +} diff --git a/actix-utils/src/inflight.rs b/actix-utils/src/inflight.rs index 03d90873..35cd904d 100644 --- a/actix-utils/src/inflight.rs +++ b/actix-utils/src/inflight.rs @@ -1,8 +1,11 @@ use std::convert::Infallible; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; use actix_service::{IntoService, Service, Transform}; -use futures::future::{ok, FutureResult}; -use futures::{Async, Future, Poll}; +use futures::future::{ok, Ready}; +use pin_project::pin_project; use super::counter::{Counter, CounterGuard}; @@ -32,7 +35,7 @@ impl Transform for InFlight { type Error = S::Error; type InitError = Infallible; type Transform = InFlightService; - type Future = FutureResult; + type Future = Ready>; fn new_transform(&self, service: S) -> Self::Future { ok(InFlightService::new(self.max_inflight, service)) @@ -68,14 +71,14 @@ where type Error = T::Error; type Future = InFlightServiceResponse; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - if let Async::NotReady = self.service.poll_ready()? { - Ok(Async::NotReady) - } else if !self.count.available() { + fn poll_ready(&mut self, cx: &mut Context) -> Poll> { + if let Poll::Pending = self.service.poll_ready(cx)? { + Poll::Pending + } else if !self.count.available(cx) { log::trace!("InFlight limit exceeded"); - Ok(Async::NotReady) + Poll::Pending } else { - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } } @@ -87,31 +90,31 @@ where } } +#[pin_project] #[doc(hidden)] pub struct InFlightServiceResponse { + #[pin] fut: T::Future, _guard: CounterGuard, } impl Future for InFlightServiceResponse { - type Item = T::Response; - type Error = T::Error; + type Output = Result; - fn poll(&mut self) -> Poll { - self.fut.poll() + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + self.project().fut.poll(cx) } } #[cfg(test)] mod tests { - use futures::future::lazy; - use futures::{Async, Poll}; + use std::task::{Context, Poll}; use std::time::Duration; use super::*; - use actix_service::blank::{Blank, BlankNewService}; - use actix_service::{NewService, Service, ServiceExt}; + use actix_service::{apply, factory_fn, Service, ServiceFactory}; + use futures::future::{lazy, ok, FutureExt, LocalBoxFuture}; struct SleepService(Duration); @@ -119,57 +122,49 @@ mod tests { type Request = (); type Response = (); type Error = (); - type Future = Box>; + type Future = LocalBoxFuture<'static, Result<(), ()>>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready(&mut self, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, _: ()) -> Self::Future { - Box::new(tokio_timer::sleep(self.0).map_err(|_| ())) + tokio_timer::delay_for(self.0) + .then(|_| ok::<_, ()>(())) + .boxed_local() } } #[test] fn test_transform() { let wait_time = Duration::from_millis(50); - let _ = actix_rt::System::new("test").block_on(lazy(|| { - let mut srv = - Blank::new().and_then(InFlightService::new(1, SleepService(wait_time))); - assert_eq!(srv.poll_ready(), Ok(Async::Ready(()))); + let _ = actix_rt::System::new("test").block_on(async { + let mut srv = InFlightService::new(1, SleepService(wait_time)); + assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - let mut res = srv.call(()); - let _ = res.poll(); - assert_eq!(srv.poll_ready(), Ok(Async::NotReady)); + let res = srv.call(()); + assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending); - drop(res); - assert_eq!(srv.poll_ready(), Ok(Async::Ready(()))); - - Ok::<_, ()>(()) - })); + let _ = res.await; + assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); + }); } #[test] fn test_newtransform() { let wait_time = Duration::from_millis(50); - let _ = actix_rt::System::new("test").block_on(lazy(|| { - let srv = - BlankNewService::new().apply(InFlight::new(1), || Ok(SleepService(wait_time))); - if let Async::Ready(mut srv) = srv.new_service(&()).poll().unwrap() { - assert_eq!(srv.poll_ready(), Ok(Async::Ready(()))); + actix_rt::System::new("test").block_on(async { + let srv = apply(InFlight::new(1), factory_fn(|| ok(SleepService(wait_time)))); - let mut res = srv.call(()); - let _ = res.poll(); - assert_eq!(srv.poll_ready(), Ok(Async::NotReady)); + let mut srv = srv.new_service(&()).await.unwrap(); + assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); - drop(res); - assert_eq!(srv.poll_ready(), Ok(Async::Ready(()))); - } else { - panic!() - } + let res = srv.call(()); + assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Pending); - Ok::<_, ()>(()) - })); + let _ = res.await; + assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(()))); + }); } } diff --git a/actix-utils/src/keepalive.rs b/actix-utils/src/keepalive.rs index 05436d7d..6f24dbf6 100644 --- a/actix-utils/src/keepalive.rs +++ b/actix-utils/src/keepalive.rs @@ -1,11 +1,13 @@ use std::convert::Infallible; +use std::future::Future; use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::time::{Duration, Instant}; -use actix_service::{NewService, Service}; -use futures::future::{ok, FutureResult}; -use futures::{Async, Future, Poll}; -use tokio_timer::Delay; +use actix_service::{Service, ServiceFactory}; +use futures::future::{ok, Ready}; +use tokio_timer::{delay, Delay}; use super::time::{LowResTime, LowResTimeService}; @@ -44,7 +46,7 @@ where } } -impl NewService for KeepAlive +impl ServiceFactory for KeepAlive where F: Fn() -> E + Clone, { @@ -54,7 +56,7 @@ where type InitError = Infallible; type Config = (); type Service = KeepAliveService; - type Future = FutureResult; + type Future = Ready>; fn new_service(&self, _: &()) -> Self::Future { ok(KeepAliveService::new( @@ -85,7 +87,7 @@ where ka, time, expire, - delay: Delay::new(expire), + delay: delay(expire), _t: PhantomData, } } @@ -98,22 +100,21 @@ where type Request = R; type Response = R; type Error = E; - type Future = FutureResult; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - match self.delay.poll() { - Ok(Async::Ready(_)) => { + fn poll_ready(&mut self, cx: &mut Context) -> Poll> { + match Pin::new(&mut self.delay).poll(cx) { + Poll::Ready(_) => { let now = self.time.now(); if self.expire <= now { - Err((self.f)()) + Poll::Ready(Err((self.f)())) } else { self.delay.reset(self.expire); - let _ = self.delay.poll(); - Ok(Async::Ready(())) + let _ = Pin::new(&mut self.delay).poll(cx); + Poll::Ready(Ok(())) } } - Ok(Async::NotReady) => Ok(Async::Ready(())), - Err(_e) => panic!(), + Poll::Pending => Poll::Ready(Ok(())), } } diff --git a/actix-utils/src/lib.rs b/actix-utils/src/lib.rs index 9d1a6b48..727362bf 100644 --- a/actix-utils/src/lib.rs +++ b/actix-utils/src/lib.rs @@ -6,7 +6,9 @@ pub mod either; pub mod framed; pub mod inflight; pub mod keepalive; +pub mod mpsc; +pub mod oneshot; pub mod order; -pub mod stream; +pub mod task; pub mod time; pub mod timeout; diff --git a/actix-utils/src/mpsc.rs b/actix-utils/src/mpsc.rs new file mode 100644 index 00000000..fe88ebc8 --- /dev/null +++ b/actix-utils/src/mpsc.rs @@ -0,0 +1,203 @@ +//! A multi-producer, single-consumer, futures-aware, FIFO queue with back +//! pressure, for use communicating between tasks on the same thread. +//! +//! These queues are the same as those in `futures::sync`, except they're not +//! intended to be sent across threads. + +use std::any::Any; +use std::cell::RefCell; +use std::collections::VecDeque; +use std::error::Error; +use std::pin::Pin; +use std::rc::{Rc, Weak}; +use std::task::{Context, Poll}; +use std::{fmt, mem}; + +use futures::{Sink, Stream}; + +use crate::task::LocalWaker; + +/// Creates a unbounded in-memory channel with buffered storage. +pub fn channel() -> (Sender, Receiver) { + let shared = Rc::new(RefCell::new(Shared { + buffer: VecDeque::new(), + blocked_recv: LocalWaker::new(), + })); + let sender = Sender { + shared: Rc::downgrade(&shared), + }; + let receiver = Receiver { + state: State::Open(shared), + }; + (sender, receiver) +} + +#[derive(Debug)] +struct Shared { + buffer: VecDeque, + blocked_recv: LocalWaker, +} + +/// The transmission end of a channel. +/// +/// This is created by the `channel` function. +#[derive(Debug)] +pub struct Sender { + shared: Weak>>, +} + +impl Sender { + /// Sends the provided message along this channel. + pub fn send(&self, item: T) -> Result<(), SendError> { + let shared = match self.shared.upgrade() { + Some(shared) => shared, + None => return Err(SendError(item)), // receiver was dropped + }; + let mut shared = shared.borrow_mut(); + + shared.buffer.push_back(item); + shared.blocked_recv.wake(); + Ok(()) + } +} + +impl Clone for Sender { + fn clone(&self) -> Self { + Sender { + shared: self.shared.clone(), + } + } +} + +impl Sink for Sender { + type Error = SendError; + + fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), SendError> { + self.send(item) + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll>> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } +} + +impl Drop for Sender { + fn drop(&mut self) { + let shared = match self.shared.upgrade() { + Some(shared) => shared, + None => return, + }; + // The number of existing `Weak` indicates if we are possibly the last + // `Sender`. If we are the last, we possibly must notify a blocked + // `Receiver`. `self.shared` is always one of the `Weak` to this shared + // data. Therefore the smallest possible Rc::weak_count(&shared) is 1. + if Rc::weak_count(&shared) == 1 { + // Wake up receiver as its stream has ended + shared.borrow_mut().blocked_recv.wake(); + } + } +} + +/// The receiving end of a channel which implements the `Stream` trait. +/// +/// This is created by the `channel` function. +#[derive(Debug)] +pub struct Receiver { + state: State, +} + +impl Unpin for Receiver {} + +/// Possible states of a receiver. We're either Open (can receive more messages) +/// or we're closed with a list of messages we have left to receive. +#[derive(Debug)] +enum State { + Open(Rc>>), + Closed(VecDeque), +} + +impl Receiver { + /// Closes the receiving half + /// + /// This prevents any further messages from being sent on the channel while + /// still enabling the receiver to drain messages that are buffered. + pub fn close(&mut self) { + let items = match self.state { + State::Open(ref state) => { + let mut state = state.borrow_mut(); + let items = mem::replace(&mut state.buffer, VecDeque::new()); + items + } + State::Closed(_) => return, + }; + self.state = State::Closed(items); + } +} + +impl Stream for Receiver { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let me = match self.state { + State::Open(ref mut me) => me, + State::Closed(ref mut items) => return Poll::Ready(items.pop_front()), + }; + + if let Some(shared) = Rc::get_mut(me) { + // All senders have been dropped, so drain the buffer and end the + // stream. + return Poll::Ready(shared.borrow_mut().buffer.pop_front()); + } + + let mut shared = me.borrow_mut(); + if let Some(msg) = shared.buffer.pop_front() { + Poll::Ready(Some(msg)) + } else { + shared.blocked_recv.register(cx.waker()); + Poll::Pending + } + } +} + +impl Drop for Receiver { + fn drop(&mut self) { + self.close(); + } +} + +/// Error type for sending, used when the receiving end of a channel is +/// dropped +pub struct SendError(T); + +impl fmt::Debug for SendError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_tuple("SendError").field(&"...").finish() + } +} + +impl fmt::Display for SendError { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "send failed because receiver is gone") + } +} + +impl Error for SendError { + fn description(&self) -> &str { + "send failed because receiver is gone" + } +} + +impl SendError { + /// Returns the message that was attempted to be sent but failed. + pub fn into_inner(self) -> T { + self.0 + } +} diff --git a/actix-utils/src/oneshot.rs b/actix-utils/src/oneshot.rs new file mode 100644 index 00000000..08e4841b --- /dev/null +++ b/actix-utils/src/oneshot.rs @@ -0,0 +1,209 @@ +//! A one-shot, futures-aware channel +//! +//! This channel is similar to that in `sync::oneshot` but cannot be sent across +//! threads. + +use std::cell::RefCell; +use std::future::Future; +use std::pin::Pin; +use std::rc::{Rc, Weak}; +use std::task::{Context, Poll}; + +pub use futures::channel::oneshot::Canceled; + +use crate::task::LocalWaker; + +/// Creates a new futures-aware, one-shot channel. +/// +/// This function is the same as `sync::oneshot::channel` except that the +/// returned values cannot be sent across threads. +pub fn channel() -> (Sender, Receiver) { + let inner = Rc::new(RefCell::new(Inner { + value: None, + tx_task: LocalWaker::new(), + rx_task: LocalWaker::new(), + })); + let tx = Sender { + inner: Rc::downgrade(&inner), + }; + let rx = Receiver { + state: State::Open(inner), + }; + (tx, rx) +} + +/// Represents the completion half of a oneshot through which the result of a +/// computation is signaled. +/// +/// This is created by the `unsync::oneshot::channel` function and is equivalent +/// in functionality to `sync::oneshot::Sender` except that it cannot be sent +/// across threads. +#[derive(Debug)] +pub struct Sender { + inner: Weak>>, +} + +/// A future representing the completion of a computation happening elsewhere in +/// memory. +/// +/// This is created by the `unsync::oneshot::channel` function and is equivalent +/// in functionality to `sync::oneshot::Receiver` except that it cannot be sent +/// across threads. +#[derive(Debug)] +#[must_use = "futures do nothing unless polled"] +pub struct Receiver { + state: State, +} + +// The channels do not ever project Pin to the inner T +impl Unpin for Receiver {} +impl Unpin for Sender {} + +#[derive(Debug)] +enum State { + Open(Rc>>), + Closed(Option), +} + +#[derive(Debug)] +struct Inner { + value: Option, + tx_task: LocalWaker, + rx_task: LocalWaker, +} + +impl Sender { + /// Completes this oneshot with a successful result. + /// + /// This function will consume `self` and indicate to the other end, the + /// `Receiver`, that the error provided is the result of the computation this + /// represents. + /// + /// If the value is successfully enqueued for the remote end to receive, + /// then `Ok(())` is returned. If the receiving end was deallocated before + /// this function was called, however, then `Err` is returned with the value + /// provided. + pub fn send(self, val: T) -> Result<(), T> { + if let Some(inner) = self.inner.upgrade() { + inner.borrow_mut().value = Some(val); + Ok(()) + } else { + Err(val) + } + } + + /// Polls this `Sender` half to detect whether the `Receiver` this has + /// paired with has gone away. + /// + /// This function can be used to learn about when the `Receiver` (consumer) + /// half has gone away and nothing will be able to receive a message sent + /// from `complete`. + /// + /// Like `Future::poll`, this function will panic if it's not called from + /// within the context of a task. In other words, this should only ever be + /// called from inside another future. + /// + /// If `Ready` is returned then it means that the `Receiver` has disappeared + /// and the result this `Sender` would otherwise produce should no longer + /// be produced. + /// + /// If `NotReady` is returned then the `Receiver` is still alive and may be + /// able to receive a message if sent. The current task, however, is + /// scheduled to receive a notification if the corresponding `Receiver` goes + /// away. + pub fn poll_canceled(&mut self, cx: &mut Context) -> Poll<()> { + match self.inner.upgrade() { + Some(inner) => { + inner.borrow_mut().tx_task.register(cx.waker()); + Poll::Pending + } + None => Poll::Ready(()), + } + } + + /// Tests to see whether this `Sender`'s corresponding `Receiver` + /// has gone away. + /// + /// This function can be used to learn about when the `Receiver` (consumer) + /// half has gone away and nothing will be able to receive a message sent + /// from `send`. + /// + /// Note that this function is intended to *not* be used in the context of a + /// future. If you're implementing a future you probably want to call the + /// `poll_cancel` function which will block the current task if the + /// cancellation hasn't happened yet. This can be useful when working on a + /// non-futures related thread, though, which would otherwise panic if + /// `poll_cancel` were called. + pub fn is_canceled(&self) -> bool { + !self.inner.upgrade().is_some() + } +} + +impl Drop for Sender { + fn drop(&mut self) { + let inner = match self.inner.upgrade() { + Some(inner) => inner, + None => return, + }; + inner.borrow().rx_task.wake(); + } +} + +impl Receiver { + /// Gracefully close this receiver, preventing sending any future messages. + /// + /// Any `send` operation which happens after this method returns is + /// guaranteed to fail. Once this method is called the normal `poll` method + /// can be used to determine whether a message was actually sent or not. If + /// `Canceled` is returned from `poll` then no message was sent. + pub fn close(&mut self) { + match self.state { + State::Open(ref inner) => { + let mut inner = inner.borrow_mut(); + inner.tx_task.wake(); + let value = inner.value.take(); + drop(inner); + + self.state = State::Closed(value); + } + State::Closed(_) => return, + }; + } +} + +impl Future for Receiver { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + let inner = match this.state { + State::Open(ref mut inner) => inner, + State::Closed(ref mut item) => match item.take() { + Some(item) => return Poll::Ready(Ok(item.into())), + None => return Poll::Ready(Err(Canceled)), + }, + }; + + // If we've got a value, then skip the logic below as we're done. + if let Some(val) = inner.borrow_mut().value.take() { + return Poll::Ready(Ok(val)); + } + + // If we can get mutable access, then the sender has gone away. We + // didn't see a value above, so we're canceled. Otherwise we park + // our task and wait for a value to come in. + if Rc::get_mut(inner).is_some() { + Poll::Ready(Err(Canceled)) + } else { + inner.borrow().rx_task.register(cx.waker()); + Poll::Pending + } + } +} + +impl Drop for Receiver { + fn drop(&mut self) { + self.close(); + } +} diff --git a/actix-utils/src/order.rs b/actix-utils/src/order.rs index fa7bb750..88f2b77a 100644 --- a/actix-utils/src/order.rs +++ b/actix-utils/src/order.rs @@ -1,14 +1,17 @@ use std::collections::VecDeque; use std::convert::Infallible; use std::fmt; +use std::future::Future; use std::marker::PhantomData; +use std::pin::Pin; use std::rc::Rc; +use std::task::{Context, Poll}; use actix_service::{IntoService, Service, Transform}; -use futures::future::{ok, FutureResult}; -use futures::task::AtomicTask; -use futures::unsync::oneshot; -use futures::{Async, Future, Poll}; +use futures::future::{ok, ready, FutureExt, Ready}; + +use crate::oneshot; +use crate::task::LocalWaker; struct Record { rx: oneshot::Receiver>, @@ -93,7 +96,7 @@ where type Error = InOrderError; type InitError = Infallible; type Transform = InOrderService; - type Future = FutureResult; + type Future = Ready>; fn new_transform(&self, service: S) -> Self::Future { ok(InOrderService::new(service)) @@ -102,7 +105,7 @@ where pub struct InOrderService { service: S, - task: Rc, + task: Rc, acks: VecDeque>, } @@ -120,7 +123,7 @@ where Self { service: service.into_service(), acks: VecDeque::new(), - task: Rc::new(AtomicTask::new()), + task: Rc::new(LocalWaker::new()), } } } @@ -137,28 +140,30 @@ where type Error = InOrderError; type Future = InOrderServiceResponse; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { + fn poll_ready(&mut self, cx: &mut Context) -> Poll> { // poll_ready could be called from different task - self.task.register(); + self.task.register(cx.waker()); // check acks while !self.acks.is_empty() { let rec = self.acks.front_mut().unwrap(); - match rec.rx.poll() { - Ok(Async::Ready(res)) => { + match Pin::new(&mut rec.rx).poll(cx) { + Poll::Ready(Ok(res)) => { let rec = self.acks.pop_front().unwrap(); let _ = rec.tx.send(res); } - Ok(Async::NotReady) => break, - Err(oneshot::Canceled) => return Err(InOrderError::Disconnected), + Poll::Pending => break, + Poll::Ready(Err(oneshot::Canceled)) => { + return Poll::Ready(Err(InOrderError::Disconnected)) + } } } // check nested service - if let Async::NotReady = self.service.poll_ready().map_err(InOrderError::Service)? { - Ok(Async::NotReady) + if let Poll::Pending = self.service.poll_ready(cx).map_err(InOrderError::Service)? { + Poll::Pending } else { - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } } @@ -168,10 +173,10 @@ where self.acks.push_back(Record { rx: rx1, tx: tx2 }); let task = self.task.clone(); - tokio_current_thread::spawn(self.service.call(request).then(move |res| { - task.notify(); + tokio_executor::current_thread::spawn(self.service.call(request).then(move |res| { + task.wake(); let _ = tx1.send(res); - Ok(()) + ready(()) })); InOrderServiceResponse { rx: rx2 } @@ -184,29 +189,29 @@ pub struct InOrderServiceResponse { } impl Future for InOrderServiceResponse { - type Item = S::Response; - type Error = InOrderError; + type Output = Result>; - fn poll(&mut self) -> Poll { - match self.rx.poll() { - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(Ok(res))) => Ok(Async::Ready(res)), - Ok(Async::Ready(Err(e))) => Err(e.into()), - Err(oneshot::Canceled) => Err(InOrderError::Disconnected), + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + match Pin::new(&mut self.rx).poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(Ok(res))) => Poll::Ready(Ok(res)), + Poll::Ready(Ok(Err(e))) => Poll::Ready(Err(e.into())), + Poll::Ready(Err(_)) => Poll::Ready(Err(InOrderError::Disconnected)), } } } #[cfg(test)] mod tests { - use futures::future::{lazy, Future}; - use futures::{stream::futures_unordered, sync::oneshot, Async, Poll, Stream}; + use std::task::{Context, Poll}; use std::time::Duration; use super::*; - use actix_service::blank::Blank; - use actix_service::{Service, ServiceExt}; + use actix_service::Service; + use futures::channel::oneshot; + use futures::future::{lazy, LocalBoxFuture}; + use futures::stream::{futures_unordered::FuturesUnordered, StreamExt}; struct Srv; @@ -214,28 +219,14 @@ mod tests { type Request = oneshot::Receiver; type Response = usize; type Error = (); - type Future = Box>; + type Future = LocalBoxFuture<'static, Result>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready(&mut self, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, req: oneshot::Receiver) -> Self::Future { - Box::new(req.map_err(|_| ())) - } - } - - struct SrvPoll { - s: S, - } - - impl Future for SrvPoll { - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll<(), ()> { - let _ = self.s.poll_ready(); - Ok(Async::NotReady) + req.map(|res| res.map_err(|_| ())).boxed_local() } } @@ -251,23 +242,26 @@ mod tests { let rx2 = rx2; let rx3 = rx3; let tx_stop = tx_stop; - let _ = actix_rt::System::new("test").block_on(lazy(move || { - let mut srv = Blank::new().and_then(InOrderService::new(Srv)); + let _ = actix_rt::System::new("test").block_on(async { + let mut srv = InOrderService::new(Srv); let res1 = srv.call(rx1); let res2 = srv.call(rx2); let res3 = srv.call(rx3); - tokio_current_thread::spawn(SrvPoll { s: srv }); - futures_unordered(vec![res1, res2, res3]) - .collect() - .and_then(move |res: Vec<_>| { - assert_eq!(res, vec![1, 2, 3]); - let _ = tx_stop.send(()); - actix_rt::System::current().stop(); - Ok(()) - }) - })); + let _ = lazy(|cx| srv.poll_ready(cx)).await; + + // dispatcher do this + tokio_timer::delay_for(Duration::from_millis(100)).await; + let _ = lazy(|cx| srv.poll_ready(cx)).await; + + assert_eq!(res1.await.unwrap(), 1); + assert_eq!(res2.await.unwrap(), 2); + assert_eq!(res3.await.unwrap(), 3); + + let _ = tx_stop.send(()); + actix_rt::System::current().stop(); + }); }); let _ = tx3.send(3); @@ -275,7 +269,7 @@ mod tests { let _ = tx2.send(2); let _ = tx1.send(1); - let _ = rx_stop.wait(); + let _ = actix_rt::System::new("test").block_on(rx_stop); let _ = h.join(); } } diff --git a/actix-utils/src/stream.rs b/actix-utils/src/stream.rs deleted file mode 100644 index e8ac8783..00000000 --- a/actix-utils/src/stream.rs +++ /dev/null @@ -1,151 +0,0 @@ -use std::marker::PhantomData; -use std::rc::Rc; - -use actix_service::{IntoService, NewService, Service}; -use futures::unsync::mpsc; -use futures::{Async, Future, Poll, Stream}; - -type Request = Result<::Item, ::Error>; - -pub trait IntoStream { - type Item; - type Error; - type Stream: Stream; - - fn into_stream(self) -> Self::Stream; -} - -impl IntoStream for T -where - T: Stream, -{ - type Item = T::Item; - type Error = T::Error; - type Stream = T; - - fn into_stream(self) -> Self::Stream { - self - } -} - -pub struct StreamService { - factory: Rc, - config: T::Config, - _t: PhantomData<(S, E)>, -} - -impl Service for StreamService -where - S: IntoStream + 'static, - T: NewService, Response = (), Error = E, InitError = E>, - T::Future: 'static, - T::Service: 'static, - ::Future: 'static, -{ - type Request = S; - type Response = (); - type Error = E; - type Future = Box>; - - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) - } - - fn call(&mut self, req: S) -> Self::Future { - Box::new( - self.factory - .new_service(&self.config) - .and_then(move |srv| StreamDispatcher::new(req, srv)), - ) - } -} - -pub struct StreamDispatcher -where - S: IntoStream + 'static, - T: Service, Response = ()> + 'static, - T::Future: 'static, -{ - stream: S, - service: T, - err_rx: mpsc::UnboundedReceiver, - err_tx: mpsc::UnboundedSender, -} - -impl StreamDispatcher -where - S: Stream, - T: Service, Response = ()>, - T::Future: 'static, -{ - pub fn new(stream: F1, service: F2) -> Self - where - F1: IntoStream, - F2: IntoService, - { - let (err_tx, err_rx) = mpsc::unbounded(); - StreamDispatcher { - err_rx, - err_tx, - stream: stream.into_stream(), - service: service.into_service(), - } - } -} - -impl Future for StreamDispatcher -where - S: Stream, - T: Service, Response = ()>, - T::Future: 'static, -{ - type Item = (); - type Error = T::Error; - - fn poll(&mut self) -> Poll { - if let Ok(Async::Ready(Some(e))) = self.err_rx.poll() { - return Err(e); - } - - loop { - match self.service.poll_ready()? { - Async::Ready(_) => match self.stream.poll() { - Ok(Async::Ready(Some(item))) => { - tokio_current_thread::spawn(StreamDispatcherService { - fut: self.service.call(Ok(item)), - stop: self.err_tx.clone(), - }) - } - Err(err) => tokio_current_thread::spawn(StreamDispatcherService { - fut: self.service.call(Err(err)), - stop: self.err_tx.clone(), - }), - Ok(Async::NotReady) => return Ok(Async::NotReady), - Ok(Async::Ready(None)) => return Ok(Async::Ready(())), - }, - Async::NotReady => return Ok(Async::NotReady), - } - } - } -} - -struct StreamDispatcherService { - fut: F, - stop: mpsc::UnboundedSender, -} - -impl Future for StreamDispatcherService { - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll { - match self.fut.poll() { - Ok(Async::Ready(_)) => Ok(Async::Ready(())), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(e) => { - let _ = self.stop.unbounded_send(e); - Ok(Async::Ready(())) - } - } - } -} diff --git a/actix-utils/src/task.rs b/actix-utils/src/task.rs new file mode 100644 index 00000000..1370a362 --- /dev/null +++ b/actix-utils/src/task.rs @@ -0,0 +1,69 @@ +use std::cell::UnsafeCell; +use std::marker::PhantomData; +use std::task::Waker; +use std::{fmt, rc}; + +/// A synchronization primitive for task wakeup. +/// +/// Sometimes the task interested in a given event will change over time. +/// An `LocalWaker` can coordinate concurrent notifications with the consumer +/// potentially "updating" the underlying task to wake up. This is useful in +/// scenarios where a computation completes in another task and wants to +/// notify the consumer, but the consumer is in the process of being migrated to +/// a new logical task. +/// +/// Consumers should call `register` before checking the result of a computation +/// and producers should call `wake` after producing the computation (this +/// differs from the usual `thread::park` pattern). It is also permitted for +/// `wake` to be called **before** `register`. This results in a no-op. +/// +/// A single `AtomicWaker` may be reused for any number of calls to `register` or +/// `wake`. +pub struct LocalWaker { + waker: UnsafeCell>, + _t: PhantomData>, +} + +impl LocalWaker { + /// Create an `LocalWaker`. + pub fn new() -> Self { + LocalWaker { + waker: UnsafeCell::new(None), + _t: PhantomData, + } + } + + #[inline] + /// Registers the waker to be notified on calls to `wake`. + pub fn register(&self, waker: &Waker) { + unsafe { + let w = self.waker.get(); + if (*w).is_none() { + *w = Some(waker.clone()) + } + } + } + + #[inline] + /// Calls `wake` on the last `Waker` passed to `register`. + /// + /// If `register` has not been called yet, then this does nothing. + pub fn wake(&self) { + if let Some(waker) = self.take() { + waker.wake(); + } + } + + /// Returns the last `Waker` passed to `register`, so that the user can wake it. + /// + /// If a waker has not been registered, this returns `None`. + pub fn take(&self) -> Option { + unsafe { (*self.waker.get()).take() } + } +} + +impl fmt::Debug for LocalWaker { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "LocalWaker") + } +} diff --git a/actix-utils/src/time.rs b/actix-utils/src/time.rs index 0138ec3e..2c50d9dc 100644 --- a/actix-utils/src/time.rs +++ b/actix-utils/src/time.rs @@ -1,10 +1,10 @@ use std::convert::Infallible; +use std::task::{Context, Poll}; use std::time::{self, Duration, Instant}; -use actix_service::{NewService, Service}; -use futures::future::{ok, FutureResult}; -use futures::{Async, Future, Poll}; -use tokio_timer::sleep; +use actix_service::{Service, ServiceFactory}; +use futures::future::{ok, ready, FutureExt, Ready}; +use tokio_timer::delay_for; use super::cell::Cell; @@ -42,14 +42,14 @@ impl Default for LowResTime { } } -impl NewService for LowResTime { +impl ServiceFactory for LowResTime { type Request = (); type Response = Instant; type Error = Infallible; type InitError = Infallible; type Config = (); type Service = LowResTimeService; - type Future = FutureResult; + type Future = Ready>; fn new_service(&self, _: &()) -> Self::Future { ok(self.timer()) @@ -79,12 +79,10 @@ impl LowResTimeService { b.resolution }; - tokio_current_thread::spawn(sleep(interval).map_err(|_| panic!()).and_then( - move |_| { - inner.get_mut().current.take(); - Ok(()) - }, - )); + tokio_executor::current_thread::spawn(delay_for(interval).then(move |_| { + inner.get_mut().current.take(); + ready(()) + })); now } } @@ -94,10 +92,10 @@ impl Service for LowResTimeService { type Request = (); type Response = Instant; type Error = Infallible; - type Future = FutureResult; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, _: ()) -> Self::Future { @@ -146,12 +144,10 @@ impl SystemTimeService { b.resolution }; - tokio_current_thread::spawn(sleep(interval).map_err(|_| panic!()).and_then( - move |_| { - inner.get_mut().current.take(); - Ok(()) - }, - )); + tokio_executor::current_thread::spawn(delay_for(interval).then(move |_| { + inner.get_mut().current.take(); + ready(()) + })); now } } @@ -160,7 +156,6 @@ impl SystemTimeService { #[cfg(test)] mod tests { use super::*; - use futures::future; use std::time::{Duration, SystemTime}; /// State Under Test: Two calls of `SystemTimeService::now()` return the same value if they are done within resolution interval of `SystemTimeService`. @@ -170,13 +165,11 @@ mod tests { fn system_time_service_time_does_not_immediately_change() { let resolution = Duration::from_millis(50); - let _ = actix_rt::System::new("test").block_on(future::lazy(|| { + let _ = actix_rt::System::new("test").block_on(async { let time_service = SystemTimeService::with(resolution); assert_eq!(time_service.now(), time_service.now()); - - Ok::<(), ()>(()) - })); + }); } /// State Under Test: Two calls of `LowResTimeService::now()` return the same value if they are done within resolution interval of `SystemTimeService`. @@ -186,13 +179,11 @@ mod tests { fn lowres_time_service_time_does_not_immediately_change() { let resolution = Duration::from_millis(50); - let _ = actix_rt::System::new("test").block_on(future::lazy(|| { + let _ = actix_rt::System::new("test").block_on(async { let time_service = LowResTimeService::with(resolution); assert_eq!(time_service.now(), time_service.now()); - - Ok::<(), ()>(()) - })); + }); } /// State Under Test: `SystemTimeService::now()` updates returned value every resolution period. @@ -204,7 +195,7 @@ mod tests { let resolution = Duration::from_millis(100); let wait_time = Duration::from_millis(150); - let _ = actix_rt::System::new("test").block_on(future::lazy(|| { + actix_rt::System::new("test").block_on(async { let time_service = SystemTimeService::with(resolution); let first_time = time_service @@ -212,17 +203,15 @@ mod tests { .duration_since(SystemTime::UNIX_EPOCH) .unwrap(); - sleep(wait_time).then(move |_| { - let second_time = time_service - .now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap(); + delay_for(wait_time).await; - assert!(second_time - first_time >= wait_time); + let second_time = time_service + .now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap(); - Ok::<(), ()>(()) - }) - })); + assert!(second_time - first_time >= wait_time); + }); } /// State Under Test: `LowResTimeService::now()` updates returned value every resolution period. @@ -234,18 +223,15 @@ mod tests { let resolution = Duration::from_millis(100); let wait_time = Duration::from_millis(150); - let _ = actix_rt::System::new("test").block_on(future::lazy(|| { + let _ = actix_rt::System::new("test").block_on(async { let time_service = LowResTimeService::with(resolution); let first_time = time_service.now(); - sleep(wait_time).then(move |_| { - let second_time = time_service.now(); + delay_for(wait_time).await; - assert!(second_time - first_time >= wait_time); - - Ok::<(), ()>(()) - }) - })); + let second_time = time_service.now(); + assert!(second_time - first_time >= wait_time); + }); } } diff --git a/actix-utils/src/timeout.rs b/actix-utils/src/timeout.rs index e05788f7..2e8939cb 100644 --- a/actix-utils/src/timeout.rs +++ b/actix-utils/src/timeout.rs @@ -2,19 +2,21 @@ //! //! If the response does not complete within the specified timeout, the response //! will be aborted. -use std::fmt; +use std::future::Future; use std::marker::PhantomData; -use std::time::Duration; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::{fmt, time}; use actix_service::{IntoService, Service, Transform}; -use futures::future::{ok, FutureResult}; -use futures::{Async, Future, Poll}; -use tokio_timer::{clock, Delay}; +use futures::future::{ok, Ready}; +use pin_project::pin_project; +use tokio_timer::{clock, delay, Delay}; /// Applies a timeout to requests. #[derive(Debug)] pub struct Timeout { - timeout: Duration, + timeout: time::Duration, _t: PhantomData, } @@ -66,7 +68,7 @@ impl PartialEq for TimeoutError { } impl Timeout { - pub fn new(timeout: Duration) -> Self { + pub fn new(timeout: time::Duration) -> Self { Timeout { timeout, _t: PhantomData, @@ -89,7 +91,7 @@ where type Error = TimeoutError; type InitError = E; type Transform = TimeoutService; - type Future = FutureResult; + type Future = Ready>; fn new_transform(&self, service: S) -> Self::Future { ok(TimeoutService { @@ -103,14 +105,14 @@ where #[derive(Debug, Clone)] pub struct TimeoutService { service: S, - timeout: Duration, + timeout: time::Duration, } impl TimeoutService where S: Service, { - pub fn new(timeout: Duration, service: U) -> Self + pub fn new(timeout: time::Duration, service: U) -> Self where U: IntoService, { @@ -130,21 +132,23 @@ where type Error = TimeoutError; type Future = TimeoutServiceResponse; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.service.poll_ready().map_err(TimeoutError::Service) + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.service.poll_ready(cx).map_err(TimeoutError::Service) } fn call(&mut self, request: S::Request) -> Self::Future { TimeoutServiceResponse { fut: self.service.call(request), - sleep: Delay::new(clock::now() + self.timeout), + sleep: delay(clock::now() + self.timeout), } } } /// `TimeoutService` response future +#[pin_project] #[derive(Debug)] pub struct TimeoutServiceResponse { + #[pin] fut: T::Future, sleep: Delay, } @@ -153,36 +157,34 @@ impl Future for TimeoutServiceResponse where T: Service, { - type Item = T::Response; - type Error = TimeoutError; + type Output = Result>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); - fn poll(&mut self) -> Poll { // First, try polling the future - match self.fut.poll() { - Ok(Async::Ready(v)) => return Ok(Async::Ready(v)), - Ok(Async::NotReady) => {} - Err(e) => return Err(TimeoutError::Service(e)), + match this.fut.poll(cx) { + Poll::Ready(Ok(v)) => return Poll::Ready(Ok(v)), + Poll::Ready(Err(e)) => return Poll::Ready(Err(TimeoutError::Service(e))), + Poll::Pending => {} } // Now check the sleep - match self.sleep.poll() { - Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(_)) => Err(TimeoutError::Timeout), - Err(_) => Err(TimeoutError::Timeout), + match Pin::new(&mut this.sleep).poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(_) => Poll::Ready(Err(TimeoutError::Timeout)), } } } #[cfg(test)] mod tests { - use futures::future::lazy; - use futures::{Async, Poll}; - + use std::task::{Context, Poll}; use std::time::Duration; use super::*; - use actix_service::blank::{Blank, BlankNewService}; - use actix_service::{NewService, Service, ServiceExt}; + use actix_service::{apply, factory_fn, Service, ServiceFactory}; + use futures::future::{ok, FutureExt, LocalBoxFuture}; struct SleepService(Duration); @@ -190,14 +192,16 @@ mod tests { type Request = (); type Response = (); type Error = (); - type Future = Box>; + type Future = LocalBoxFuture<'static, Result<(), ()>>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready(&mut self, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, _: ()) -> Self::Future { - Box::new(tokio_timer::sleep(self.0).map_err(|_| ())) + tokio_timer::delay_for(self.0) + .then(|_| ok::<_, ()>(())) + .boxed_local() } } @@ -206,11 +210,10 @@ mod tests { let resolution = Duration::from_millis(100); let wait_time = Duration::from_millis(50); - let res = actix_rt::System::new("test").block_on(lazy(|| { - let mut timeout = Blank::default() - .and_then(TimeoutService::new(resolution, SleepService(wait_time))); - timeout.call(()) - })); + let res = actix_rt::System::new("test").block_on(async { + let mut timeout = TimeoutService::new(resolution, SleepService(wait_time)); + timeout.call(()).await + }); assert_eq!(res, Ok(())); } @@ -219,11 +222,10 @@ mod tests { let resolution = Duration::from_millis(100); let wait_time = Duration::from_millis(150); - let res = actix_rt::System::new("test").block_on(lazy(|| { - let mut timeout = Blank::default() - .and_then(TimeoutService::new(resolution, SleepService(wait_time))); - timeout.call(()) - })); + let res = actix_rt::System::new("test").block_on(async { + let mut timeout = TimeoutService::new(resolution, SleepService(wait_time)); + timeout.call(()).await + }); assert_eq!(res, Err(TimeoutError::Timeout)); } @@ -232,15 +234,15 @@ mod tests { let resolution = Duration::from_millis(100); let wait_time = Duration::from_millis(150); - let res = actix_rt::System::new("test").block_on(lazy(|| { - let timeout = BlankNewService::<(), (), ()>::default() - .apply(Timeout::new(resolution), || Ok(SleepService(wait_time))); - if let Async::Ready(mut to) = timeout.new_service(&()).poll().unwrap() { - to.call(()) - } else { - panic!() - } - })); + let res = actix_rt::System::new("test").block_on(async { + let timeout = apply( + Timeout::new(resolution), + factory_fn(|| ok::<_, ()>(SleepService(wait_time))), + ); + let mut srv = timeout.new_service(&()).await.unwrap(); + + srv.call(()).await + }); assert_eq!(res, Err(TimeoutError::Timeout)); } }