diff --git a/actix-service/Cargo.toml b/actix-service/Cargo.toml index 8bdf6bcf..c4908713 100644 --- a/actix-service/Cargo.toml +++ b/actix-service/Cargo.toml @@ -23,7 +23,9 @@ name = "actix_service" path = "src/lib.rs" [dependencies] -futures = "0.1.25" +futures = { package = "futures-preview", version = "0.3.0-alpha.18" } +pin-project = "0.4.0-alpha.11" [dev-dependencies] +tokio = "0.2.0-alpha.4" actix-rt = "0.2" \ No newline at end of file diff --git a/actix-service/src/and_then.rs b/actix-service/src/and_then.rs index 208c7b09..44bb1a45 100644 --- a/actix-service/src/and_then.rs +++ b/actix-service/src/and_then.rs @@ -1,14 +1,21 @@ -use futures::{Async, Future, Poll}; +use futures::{Future, Poll}; use super::{IntoNewService, NewService, Service}; use crate::cell::Cell; +use pin_project::pin_project; +use std::pin::Pin; +use std::task::Context; + /// Service for the `and_then` combinator, chaining a computation onto the end /// of another service which completes successfully. /// /// This is created by the `ServiceExt::and_then` method. +#[pin_project] pub struct AndThen { + #[pin] a: A, + #[pin] b: Cell, } @@ -45,12 +52,16 @@ where type Error = A::Error; type Future = AndThenFuture; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - let not_ready = self.a.poll_ready()?.is_not_ready(); - if self.b.get_mut().poll_ready()?.is_not_ready() || not_ready { - Ok(Async::NotReady) + fn poll_ready( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.project(); + let not_ready = !this.a.poll_ready(cx)?.is_ready(); + if !this.b.get_pin().poll_ready(cx)?.is_ready() || not_ready { + Poll::Pending } else { - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } } @@ -59,13 +70,16 @@ where } } +#[pin_project] pub struct AndThenFuture where A: Service, B: Service, { b: Cell, + #[pin] fut_b: Option, + #[pin] fut_a: Option, } @@ -88,22 +102,33 @@ where A: Service, B: Service, { - type Item = B::Response; - type Error = A::Error; + type Output = Result; - fn poll(&mut self) -> Poll { - if let Some(ref mut fut) = self.fut_b { - return fut.poll(); - } + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); - match self.fut_a.as_mut().expect("Bug in actix-service").poll() { - Ok(Async::Ready(resp)) => { - let _ = self.fut_a.take(); - self.fut_b = Some(self.b.get_mut().call(resp)); - self.poll() + loop { + let mut fut_a = this.fut_a.as_mut(); + let mut fut_b = this.fut_b.as_mut(); + + if let Some(fut) = fut_b.as_mut().as_pin_mut() { + return fut.poll(cx); + } + + match fut_a + .as_mut() + .as_pin_mut() + .expect("Bug in actix-service") + .poll(cx) + { + Poll::Ready(Ok(resp)) => { + fut_a.set(None); + let new_fut = this.b.get_mut().call(resp); + fut_b.set(Some(new_fut)); + } + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Pending => return Poll::Pending, } - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(err) => Err(err), } } } @@ -174,13 +199,17 @@ where } } +#[pin_project] pub struct AndThenNewServiceFuture where A: NewService, B: NewService, { + #[pin] fut_b: B::Future, + #[pin] fut_a: A::Future, + a: Option, b: Option, } @@ -205,37 +234,35 @@ where A: NewService, B: NewService, { - type Item = AndThen; - type Error = A::InitError; + type Output = Result, A::InitError>; - fn poll(&mut self) -> Poll { - if self.a.is_none() { - if let Async::Ready(service) = self.fut_a.poll()? { - self.a = Some(service); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + if this.a.is_none() { + if let Poll::Ready(service) = this.fut_a.poll(cx)? { + *this.a = Some(service); } } - - if self.b.is_none() { - if let Async::Ready(service) = self.fut_b.poll()? { - self.b = Some(service); + if this.b.is_none() { + if let Poll::Ready(service) = this.fut_b.poll(cx)? { + *this.b = Some(service); } } - - if self.a.is_some() && self.b.is_some() { - Ok(Async::Ready(AndThen::new( - self.a.take().unwrap(), - self.b.take().unwrap(), + if this.a.is_some() && this.b.is_some() { + Poll::Ready(Ok(AndThen::new( + this.a.take().unwrap(), + this.b.take().unwrap(), ))) } else { - Ok(Async::NotReady) + Poll::Pending } } } #[cfg(test)] mod tests { - use futures::future::{ok, FutureResult}; - use futures::{Async, Poll}; + use futures::future::{ok, poll_fn, ready, Ready}; + use futures::Poll; use std::cell::Cell; use std::rc::Rc; @@ -243,15 +270,19 @@ mod tests { use crate::{NewService, Service, ServiceExt}; struct Srv1(Rc>); + impl Service for Srv1 { type Request = &'static str; type Response = &'static str; type Error = (); - type Future = FutureResult; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { + fn poll_ready( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { self.0.set(self.0.get() + 1); - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } fn call(&mut self, req: &'static str) -> Self::Future { @@ -266,11 +297,14 @@ mod tests { type Request = &'static str; type Response = (&'static str, &'static str); type Error = (); - type Future = FutureResult; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { + fn poll_ready( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { self.0.set(self.0.get() + 1); - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } fn call(&mut self, req: &'static str) -> Self::Future { @@ -278,39 +312,35 @@ mod tests { } } - #[test] - fn test_poll_ready() { + #[tokio::test] + async fn test_poll_ready() { let cnt = Rc::new(Cell::new(0)); let mut srv = Srv1(cnt.clone()).and_then(Srv2(cnt.clone())); - let res = srv.poll_ready(); - assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(())); + let res = srv.poll_once().await; + assert_eq!(res, Poll::Ready(Ok(()))); assert_eq!(cnt.get(), 2); } - #[test] - fn test_call() { + #[tokio::test] + async fn test_call() { let cnt = Rc::new(Cell::new(0)); let mut srv = Srv1(cnt.clone()).and_then(Srv2(cnt)); - let res = srv.call("srv1").poll(); + let res = srv.call("srv1").await; assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(("srv1", "srv2"))); + assert_eq!(res.unwrap(), (("srv1", "srv2"))); } - #[test] - fn test_new_service() { + #[tokio::test] + async fn test_new_service() { let cnt = Rc::new(Cell::new(0)); let cnt2 = cnt.clone(); - let blank = move || Ok::<_, ()>(Srv1(cnt2.clone())); + let blank = move || ready(Ok::<_, ()>(Srv1(cnt2.clone()))); let new_srv = blank .into_new_service() - .and_then(move || Ok(Srv2(cnt.clone()))); - if let Async::Ready(mut srv) = new_srv.new_service(&()).poll().unwrap() { - let res = srv.call("srv1").poll(); - assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(("srv1", "srv2"))); - } else { - panic!() - } + .and_then(move || ready(Ok(Srv2(cnt.clone())))); + let mut srv = new_srv.new_service(&()).await.unwrap(); + let res = srv.call("srv1").await; + assert!(res.is_ok()); + assert_eq!(res.unwrap(), ("srv1", "srv2")); } } diff --git a/actix-service/src/and_then_apply.rs b/actix-service/src/and_then_apply.rs index a8ce6991..f26b1517 100644 --- a/actix-service/src/and_then_apply.rs +++ b/actix-service/src/and_then_apply.rs @@ -1,11 +1,15 @@ use std::rc::Rc; -use futures::{Async, Future, Poll}; +use futures::{Future, Poll}; use crate::and_then::AndThen; use crate::from_err::FromErr; use crate::{NewService, Transform}; +use pin_project::pin_project; +use std::pin::Pin; +use std::task::Context; + /// `Apply` new service combinator pub struct AndThenTransform { a: A, @@ -72,6 +76,7 @@ where } } +#[pin_project] pub struct AndThenTransformFuture where A: NewService, @@ -79,8 +84,11 @@ where T: Transform, T::Error: From, { + #[pin] fut_a: A::Future, + #[pin] fut_b: B::Future, + #[pin] fut_t: Option, a: Option, t: Option, @@ -94,56 +102,63 @@ where T: Transform, T::Error: From, { - type Item = AndThen, T::Transform>; - type Error = T::InitError; + type Output = Result, T::Transform>, T::InitError>; - fn poll(&mut self) -> Poll { - if self.fut_t.is_none() { - if let Async::Ready(service) = self.fut_b.poll()? { - self.fut_t = Some(self.t_cell.new_transform(service)); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + if this.fut_t.is_none() { + if let Poll::Ready(svc) = this.fut_b.poll(cx)? { + this.fut_t.set(Some(this.t_cell.new_transform(svc))) } } - if self.a.is_none() { - if let Async::Ready(service) = self.fut_a.poll()? { - self.a = Some(service); + if this.a.is_none() { + if let Poll::Ready(svc) = this.fut_a.poll(cx)? { + *this.a = Some(svc) } } - if let Some(ref mut fut) = self.fut_t { - if let Async::Ready(transform) = fut.poll()? { - self.t = Some(transform); + if let Some(fut) = this.fut_t.as_pin_mut() { + if let Poll::Ready(transform) = fut.poll(cx)? { + *this.t = Some(transform) } } - if self.a.is_some() && self.t.is_some() { - Ok(Async::Ready(AndThen::new( - FromErr::new(self.a.take().unwrap()), - self.t.take().unwrap(), + if this.a.is_some() && this.t.is_some() { + Poll::Ready(Ok(AndThen::new( + FromErr::new(this.a.take().unwrap()), + this.t.take().unwrap(), ))) } else { - Ok(Async::NotReady) + Poll::Pending } } } #[cfg(test)] mod tests { - use futures::future::{ok, FutureResult}; - use futures::{Async, Future, Poll}; + use futures::future::{ok, ready, Ready}; + use futures::{Future, FutureExt, Poll, TryFutureExt}; use crate::{IntoNewService, IntoService, NewService, Service, ServiceExt}; + use std::pin::Pin; + use std::task::Context; #[derive(Clone)] struct Srv; + impl Service for Srv { type Request = (); type Response = (); type Error = (); - type Future = FutureResult<(), ()>; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Err(())) } fn call(&mut self, _: ()) -> Self::Future { @@ -151,36 +166,32 @@ mod tests { } } - #[test] - fn test_apply() { - let blank = |req| Ok(req); + #[tokio::test] + async fn test_apply() { + let blank = |req| ready(Ok(req)); let mut srv = blank .into_service() .apply_fn(Srv, |req: &'static str, srv: &mut Srv| { - srv.call(()).map(move |res| (req, res)) + srv.call(()).map_ok(move |res| (req, res)) }); - assert!(srv.poll_ready().is_ok()); - let res = srv.call("srv").poll(); + let res = srv.call("srv").await; assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(("srv", ()))); + assert_eq!(res.unwrap(), (("srv", ()))); } - #[test] - fn test_new_service() { - let blank = || Ok::<_, ()>((|req| Ok(req)).into_service()); + #[tokio::test] + async fn test_new_service() { + let blank = move || ok::<_, ()>((|req| ok(req)).into_service()); let new_srv = blank.into_new_service().apply( - |req: &'static str, srv: &mut Srv| srv.call(()).map(move |res| (req, res)), - || Ok(Srv), + |req: &'static str, srv: &mut Srv| srv.call(()).map_ok(move |res| (req, res)), + || ok(Srv), ); - if let Async::Ready(mut srv) = new_srv.new_service(&()).poll().unwrap() { - assert!(srv.poll_ready().is_ok()); - let res = srv.call("srv").poll(); - assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(("srv", ()))); - } else { - panic!() - } + let mut srv = new_srv.new_service(&()).await.unwrap(); + + let res = srv.call("srv").await; + assert!(res.is_ok()); + assert_eq!(res.unwrap(), (("srv", ()))); } } diff --git a/actix-service/src/and_then_apply_fn.rs b/actix-service/src/and_then_apply_fn.rs index 5fd846b7..6b006c73 100644 --- a/actix-service/src/and_then_apply_fn.rs +++ b/actix-service/src/and_then_apply_fn.rs @@ -1,11 +1,16 @@ +use futures::{Future, Poll}; use std::marker::PhantomData; - -use futures::{Async, Future, IntoFuture, Poll}; +use std::pin::Pin; +use std::task::Context; use super::{IntoNewService, IntoService, NewService, Service}; use crate::cell::Cell; +use crate::IntoFuture; +use pin_project::pin_project; + /// `Apply` service combinator +#[pin_project] pub struct AndThenApply where A: Service, @@ -14,8 +19,11 @@ where Out: IntoFuture, Out::Error: Into, { + #[pin] a: A, + #[pin] b: Cell, + #[pin] f: Cell, r: PhantomData<(Out,)>, } @@ -70,12 +78,16 @@ where type Error = A::Error; type Future = AndThenApplyFuture; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - let not_ready = self.a.poll_ready()?.is_not_ready(); - if self.b.get_mut().poll_ready()?.is_not_ready() || not_ready { - Ok(Async::NotReady) + fn poll_ready( + mut self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + let this = self.project(); + let not_ready = !this.a.poll_ready(ctx)?.is_ready(); + if !this.b.get_pin().poll_ready(ctx).is_ready() || not_ready { + Poll::Pending } else { - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } } @@ -89,6 +101,7 @@ where } } +#[pin_project] pub struct AndThenApplyFuture where A: Service, @@ -99,7 +112,9 @@ where { b: Cell, f: Cell, + #[pin] fut_a: Option, + #[pin] fut_b: Option, } @@ -111,23 +126,30 @@ where Out: IntoFuture, Out::Error: Into, { - type Item = Out::Item; - type Error = A::Error; + type Output = Result; - fn poll(&mut self) -> Poll { - if let Some(ref mut fut) = self.fut_b { - return fut.poll().map_err(|e| e.into()); - } - - match self.fut_a.as_mut().expect("Bug in actix-service").poll() { - Ok(Async::Ready(resp)) => { - let _ = self.fut_a.take(); - self.fut_b = - Some((&mut *self.f.get_mut())(resp, self.b.get_mut()).into_future()); - self.poll() + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + loop { + if let Some(fut) = this.fut_b.as_mut().as_pin_mut() { + return fut.poll(cx).map_err(|e| e.into()); + } + + match this + .fut_a + .as_mut() + .as_pin_mut() + .expect("Bug in actix-service") + .poll(cx)? + { + Poll::Ready(resp) => { + this.fut_a.set(None); + this.fut_b.set(Some( + (&mut *this.f.get_mut())(resp, this.b.get_mut()).into_future(), + )); + } + Poll::Pending => return Poll::Pending, } - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(err) => Err(err), } } } @@ -195,12 +217,13 @@ where a: None, b: None, f: self.f.clone(), - fut_a: self.a.new_service(cfg).into_future(), - fut_b: self.b.new_service(cfg).into_future(), + fut_a: self.a.new_service(cfg), + fut_b: self.b.new_service(cfg), } } } +#[pin_project] pub struct AndThenApplyNewServiceFuture where A: NewService, @@ -209,7 +232,9 @@ where Out: IntoFuture, Out::Error: Into, { + #[pin] fut_b: B::Future, + #[pin] fut_a: A::Future, f: Cell, a: Option, @@ -224,53 +249,60 @@ where Out: IntoFuture, Out::Error: Into, { - type Item = AndThenApply; - type Error = A::InitError; + type Output = Result, A::InitError>; - fn poll(&mut self) -> Poll { - if self.a.is_none() { - if let Async::Ready(service) = self.fut_a.poll()? { - self.a = Some(service); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + if this.a.is_none() { + if let Poll::Ready(service) = this.fut_a.poll(cx)? { + *this.a = Some(service); } } - if self.b.is_none() { - if let Async::Ready(service) = self.fut_b.poll()? { - self.b = Some(service); + if this.b.is_none() { + if let Poll::Ready(service) = this.fut_b.poll(cx)? { + *this.b = Some(service); } } - if self.a.is_some() && self.b.is_some() { - Ok(Async::Ready(AndThenApply { - f: self.f.clone(), - a: self.a.take().unwrap(), - b: Cell::new(self.b.take().unwrap()), + if this.a.is_some() && this.b.is_some() { + Poll::Ready(Ok(AndThenApply { + f: this.f.clone(), + a: this.a.take().unwrap(), + b: Cell::new(this.b.take().unwrap()), r: PhantomData, })) } else { - Ok(Async::NotReady) + Poll::Pending } } } #[cfg(test)] mod tests { - use futures::future::{ok, FutureResult}; - use futures::{Async, Future, Poll}; + use futures::future::{ok, Ready}; + use futures::{Future, Poll, TryFutureExt}; use crate::blank::{Blank, BlankNewService}; use crate::{NewService, Service, ServiceExt}; + use std::pin::Pin; + use std::task::Context; #[derive(Clone)] struct Srv; + impl Service for Srv { type Request = (); type Response = (); type Error = (); - type Future = FutureResult<(), ()>; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, _: ()) -> Self::Future { @@ -278,30 +310,27 @@ mod tests { } } - #[test] - fn test_call() { + #[tokio::test] + async fn test_call() { let mut srv = Blank::new().apply_fn(Srv, |req: &'static str, srv| { - srv.call(()).map(move |res| (req, res)) + srv.call(()).map_ok(move |res| (req, res)) }); - assert!(srv.poll_ready().is_ok()); - let res = srv.call("srv").poll(); + assert_eq!(srv.poll_once().await, Poll::Ready(Ok(()))); + let res = srv.call("srv").await; assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(("srv", ()))); + assert_eq!(res.unwrap(), (("srv", ()))); } - #[test] - fn test_new_service() { + #[tokio::test] + async fn test_new_service() { let new_srv = BlankNewService::new_unit().apply_fn( - || Ok(Srv), - |req: &'static str, srv| srv.call(()).map(move |res| (req, res)), + || ok(Srv), + |req: &'static str, srv| srv.call(()).map_ok(move |res| (req, res)), ); - if let Async::Ready(mut srv) = new_srv.new_service(&()).poll().unwrap() { - assert!(srv.poll_ready().is_ok()); - let res = srv.call("srv").poll(); - assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(("srv", ()))); - } else { - panic!() - } + let mut srv = new_srv.new_service(&()).await.unwrap(); + assert_eq!(srv.poll_once().await, Poll::Ready(Ok(()))); + let res = srv.call("srv").await; + assert!(res.is_ok()); + assert_eq!(res.unwrap(), (("srv", ()))); } } diff --git a/actix-service/src/apply.rs b/actix-service/src/apply.rs index d5f84463..9bb96773 100644 --- a/actix-service/src/apply.rs +++ b/actix-service/src/apply.rs @@ -1,9 +1,14 @@ use std::marker::PhantomData; -use futures::{Async, Future, IntoFuture, Poll}; +use futures::{ready, Future, Poll}; use super::{IntoNewService, IntoService, NewService, Service}; +use crate::IntoFuture; +use pin_project::pin_project; +use std::pin::Pin; +use std::task::Context; + /// Apply tranform function to a service pub fn apply_fn(service: U, f: F) -> Apply where @@ -30,10 +35,12 @@ where #[doc(hidden)] /// `Apply` service combinator +#[pin_project] pub struct Apply where T: Service, { + #[pin] service: T, f: F, r: PhantomData<(In, Out)>, @@ -82,8 +89,11 @@ where type Error = Out::Error; type Future = Out::Future; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.service.poll_ready().map_err(|e| e.into()) + fn poll_ready( + mut self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(ready!(self.project().service.poll_ready(ctx)).map_err(|e| e.into())) } fn call(&mut self, req: In) -> Self::Future { @@ -154,12 +164,14 @@ where } } +#[pin_project] pub struct ApplyNewServiceFuture where T: NewService, F: FnMut(In, &mut T::Service) -> Out + Clone, Out: IntoFuture, { + #[pin] fut: T::Future, f: Option, r: PhantomData<(In, Out)>, @@ -187,36 +199,40 @@ where Out: IntoFuture, Out::Error: From, { - type Item = Apply; - type Error = T::InitError; + type Output = Result, T::InitError>; - fn poll(&mut self) -> Poll { - if let Async::Ready(service) = self.fut.poll()? { - Ok(Async::Ready(Apply::new(service, self.f.take().unwrap()))) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + if let Poll::Ready(svc) = this.fut.poll(cx)? { + Poll::Ready(Ok(Apply::new(svc, this.f.take().unwrap()))) } else { - Ok(Async::NotReady) + Poll::Pending } } } #[cfg(test)] mod tests { - use futures::future::{ok, FutureResult}; - use futures::{Async, Future, Poll}; + use futures::future::{ok, Ready}; + use futures::{Future, Poll, TryFutureExt}; use super::*; use crate::{IntoService, NewService, Service, ServiceExt}; #[derive(Clone)] struct Srv; + impl Service for Srv { type Request = (); type Response = (); type Error = (); - type Future = FutureResult<(), ()>; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, _: ()) -> Self::Future { @@ -224,34 +240,33 @@ mod tests { } } - #[test] - fn test_call() { - let blank = |req| Ok(req); + #[tokio::test] + async fn test_call() { + let blank = |req| ok(req); let mut srv = blank .into_service() .apply_fn(Srv, |req: &'static str, srv| { - srv.call(()).map(move |res| (req, res)) + srv.call(()).map_ok(move |res| (req, res)) }); - assert!(srv.poll_ready().is_ok()); - let res = srv.call("srv").poll(); + assert_eq!(srv.poll_once().await, Poll::Ready(Ok(()))); + let res = srv.call("srv").await; assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(("srv", ()))); + assert_eq!(res.unwrap(), (("srv", ()))); } - #[test] - fn test_new_service() { + #[tokio::test] + async fn test_new_service() { let new_srv = ApplyNewService::new( - || Ok::<_, ()>(Srv), - |req: &'static str, srv| srv.call(()).map(move |res| (req, res)), + || ok::<_, ()>(Srv), + |req: &'static str, srv| srv.call(()).map_ok(move |res| (req, res)), ); - if let Async::Ready(mut srv) = new_srv.new_service(&()).poll().unwrap() { - assert!(srv.poll_ready().is_ok()); - let res = srv.call("srv").poll(); - assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(("srv", ()))); - } else { - panic!() - } + + let mut srv = new_srv.new_service(&()).await.unwrap(); + + assert_eq!(srv.poll_once().await, Poll::Ready(Ok(()))); + let res = srv.call("srv").await; + assert!(res.is_ok()); + assert_eq!(res.unwrap(), (("srv", ()))); } } diff --git a/actix-service/src/apply_cfg.rs b/actix-service/src/apply_cfg.rs index a63de936..d0ce6fa1 100644 --- a/actix-service/src/apply_cfg.rs +++ b/actix-service/src/apply_cfg.rs @@ -1,10 +1,14 @@ use std::marker::PhantomData; use futures::future::Future; -use futures::{try_ready, Async, IntoFuture, Poll}; +use futures::{ready, Poll}; use crate::cell::Cell; -use crate::{IntoService, NewService, Service}; +use crate::{IntoFuture, IntoService, NewService, Service}; +use std::pin::Pin; +use std::task::Context; + +use pin_project::pin_project; /// Convert `Fn(&Config, &mut Service) -> Future` fn to a NewService pub fn apply_cfg( @@ -61,7 +65,8 @@ where } } -/// Convert `Fn(&Config) -> Future` fn to NewService +/// Convert `Fn(&Config) -> Future` fn to NewService\ +#[pin_project] struct ApplyConfigService where F: FnMut(&C, &mut T) -> R, @@ -71,6 +76,7 @@ where S: Service, { f: Cell, + #[pin] srv: Cell, _t: PhantomData<(C, R, S)>, } @@ -118,12 +124,14 @@ where } } +#[pin_project] struct FnNewServiceConfigFut where R: IntoFuture, R::Item: IntoService, S: Service, { + #[pin] fut: R::Future, _t: PhantomData<(S,)>, } @@ -134,11 +142,10 @@ where R::Item: IntoService, S: Service, { - type Item = S; - type Error = R::Error; + type Output = Result; - fn poll(&mut self) -> Poll { - Ok(Async::Ready(try_ready!(self.fut.poll()).into_service())) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Poll::Ready(Ok(ready!(self.project().fut.poll(cx))?.into_service())) } } @@ -206,6 +213,7 @@ where } } +#[pin_project] struct ApplyConfigNewServiceFut where C: Clone, @@ -218,8 +226,11 @@ where { cfg: C, f: Cell, + #[pin] srv: Option, + #[pin] srv_fut: Option, + #[pin] fut: Option, _t: PhantomData<(S,)>, } @@ -234,33 +245,38 @@ where R::Item: IntoService, S: Service, { - type Item = S; - type Error = R::Error; + type Output = Result; - fn poll(&mut self) -> Poll { - if let Some(ref mut fut) = self.srv_fut { - match fut.poll()? { - Async::NotReady => return Ok(Async::NotReady), - Async::Ready(srv) => { - let _ = self.srv_fut.take(); - self.srv = Some(srv); - return self.poll(); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + 'poll: loop { + if let Some(fut) = this.srv_fut.as_mut().as_pin_mut() { + match fut.poll(cx)? { + Poll::Pending => return Poll::Pending, + Poll::Ready(srv) => { + this.srv_fut.set(None); + this.srv.set(Some(srv)); + continue 'poll; + } } } - } - if let Some(ref mut fut) = self.fut { - Ok(Async::Ready(try_ready!(fut.poll()).into_service())) - } else if let Some(ref mut srv) = self.srv { - match srv.poll_ready()? { - Async::NotReady => Ok(Async::NotReady), - Async::Ready(_) => { - self.fut = Some(self.f.get_mut()(&self.cfg, srv).into_future()); - return self.poll(); + if let Some(fut) = this.fut.as_mut().as_pin_mut() { + return Poll::Ready(Ok(ready!(fut.poll(cx))?.into_service())); + } else if let Some(mut srv) = this.srv.as_mut().as_pin_mut() { + match srv.as_mut().poll_ready(cx)? { + Poll::Ready(_) => { + this.fut.set(Some( + this.f.get_mut()(&this.cfg, unsafe { Pin::get_unchecked_mut(srv) }) + .into_future(), + )); + continue 'poll; + } + Poll::Pending => return Poll::Pending, } + } else { + return Poll::Pending; } - } else { - Ok(Async::NotReady) } } } diff --git a/actix-service/src/blank.rs b/actix-service/src/blank.rs index d02f75f8..e213c4c0 100644 --- a/actix-service/src/blank.rs +++ b/actix-service/src/blank.rs @@ -1,9 +1,11 @@ use std::marker::PhantomData; -use futures::future::{ok, FutureResult}; -use futures::{Async, Poll}; +use futures::future::{ok, Ready}; +use futures::Poll; use super::{NewService, Service}; +use std::pin::Pin; +use std::task::Context; /// Empty service #[derive(Clone)] @@ -34,10 +36,13 @@ impl Service for Blank { type Request = R; type Response = R; type Error = E; - type Future = FutureResult; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready( + self: Pin<&mut Self>, + _ctx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, req: R) -> Self::Future { @@ -76,7 +81,7 @@ impl NewService for BlankNewService { type Config = (); type Service = Blank; type InitError = E2; - type Future = FutureResult; + type Future = Ready>; fn new_service(&self, _: &()) -> Self::Future { ok(Blank::default()) diff --git a/actix-service/src/boxed.rs b/actix-service/src/boxed.rs index 16e37fa2..b405ca26 100644 --- a/actix-service/src/boxed.rs +++ b/actix-service/src/boxed.rs @@ -1,7 +1,11 @@ -use futures::future::{err, ok, Either, FutureResult}; -use futures::{Async, Future, IntoFuture, Poll}; +use std::pin::Pin; -use crate::{NewService, Service}; +use crate::{IntoFuture, NewService, Service, ServiceExt}; +use futures::future::FutureExt; +use futures::future::LocalBoxFuture; +use futures::future::{err, ok, Either, Ready}; +use futures::{Future, Poll}; +use std::task::Context; pub type BoxedService = Box< dyn Service< @@ -13,7 +17,7 @@ pub type BoxedService = Box< >; pub type BoxedServiceResponse = - Either, Box>>; + Either>, LocalBoxFuture<'static, Result>>; pub struct BoxedNewService(Inner); @@ -53,7 +57,7 @@ type Inner = Box< Error = Err, InitError = InitErr, Service = BoxedService, - Future = Box, Error = InitErr>>, + Future = LocalBoxFuture<'static, Result, InitErr>>, >, >; @@ -70,7 +74,8 @@ where type InitError = InitErr; type Config = C; type Service = BoxedService; - type Future = Box>; + + type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, cfg: &C) -> Self::Future { self.0.new_service(cfg) @@ -99,15 +104,18 @@ where type InitError = InitErr; type Config = C; type Service = BoxedService; - type Future = Box>; + type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, cfg: &C) -> Self::Future { - Box::new( + /* TODO: Figure out what the hell is hapenning here + Box::new( self.service .new_service(cfg) .into_future() .map(ServiceWrapper::boxed), ) + */ + unimplemented!() } } @@ -132,10 +140,22 @@ where type Response = Res; type Error = Err; type Future = Either< - FutureResult, - Box>, + Ready>, + LocalBoxFuture<'static, Result>, >; + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + unimplemented!() + } + + fn call(&mut self, req: Self::Request) -> Self::Future { + unimplemented!() + } + + /* fn poll_ready(&mut self) -> Poll<(), Self::Error> { self.0.poll_ready() } @@ -148,4 +168,5 @@ where Ok(Async::NotReady) => Either::B(Box::new(fut)), } } + */ } diff --git a/actix-service/src/cell.rs b/actix-service/src/cell.rs index 2dc6d6a6..e9bb1bcf 100644 --- a/actix-service/src/cell.rs +++ b/actix-service/src/cell.rs @@ -1,4 +1,5 @@ //! Custom cell impl +use std::pin::Pin; use std::{cell::UnsafeCell, fmt, rc::Rc}; pub(crate) struct Cell { @@ -33,6 +34,9 @@ impl Cell { pub(crate) fn get_mut(&mut self) -> &mut T { unsafe { &mut *self.inner.as_ref().get() } } + pub(crate) fn get_pin(self: Pin<&mut Self>) -> Pin<&mut T> { + unsafe { Pin::new_unchecked(&mut *Pin::get_unchecked_mut(self).inner.as_ref().get()) } + } #[allow(clippy::mut_from_ref)] pub(crate) unsafe fn get_mut_unsafe(&self) -> &mut T { diff --git a/actix-service/src/fn_service.rs b/actix-service/src/fn_service.rs index 8adcfd55..5deeec66 100644 --- a/actix-service/src/fn_service.rs +++ b/actix-service/src/fn_service.rs @@ -1,9 +1,14 @@ use std::marker::PhantomData; -use futures::future::{ok, Future, FutureResult}; -use futures::{try_ready, Async, IntoFuture, Poll}; +use crate::IntoFuture; +use futures::future::{ok, Future, Ready}; +use futures::{ready, Poll}; use crate::{IntoNewService, IntoService, NewService, Service}; +use std::pin::Pin; +use std::task::Context; + +use pin_project::pin_project; /// Create `NewService` for function that can act as a Service pub fn service_fn(f: F) -> NewServiceFn @@ -75,8 +80,11 @@ where type Error = Out::Error; type Future = Out::Future; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready( + self: Pin<&mut Self>, + _ctx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, req: Req) -> Self::Future { @@ -135,7 +143,7 @@ where type Config = Cfg; type Service = ServiceFn; type InitError = (); - type Future = FutureResult; + type Future = Ready>; fn new_service(&self, _: &Cfg) -> Self::Future { ok(ServiceFn::new(self.f.clone())) @@ -210,12 +218,14 @@ where } } +#[pin_project] pub struct FnNewServiceConfigFut where R: IntoFuture, R::Item: IntoService, S: Service, { + #[pin] fut: R::Future, _t: PhantomData<(S,)>, } @@ -226,11 +236,10 @@ where R::Item: IntoService, S: Service, { - type Item = S; - type Error = R::Error; + type Output = Result; - fn poll(&mut self) -> Poll { - Ok(Async::Ready(try_ready!(self.fut.poll()).into_service())) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Poll::Ready(Ok(ready!(self.project().fut.poll(cx))?.into_service())) } } diff --git a/actix-service/src/fn_transform.rs b/actix-service/src/fn_transform.rs index 2e4be2d5..ea132ada 100644 --- a/actix-service/src/fn_transform.rs +++ b/actix-service/src/fn_transform.rs @@ -1,10 +1,9 @@ use std::marker::PhantomData; -use futures::future::{ok, FutureResult}; -use futures::IntoFuture; +use futures::future::{ok, Ready}; use crate::apply::Apply; -use crate::{IntoTransform, Service, Transform}; +use crate::{IntoFuture, IntoTransform, Service, Transform}; /// Use function as transform service pub fn transform_fn( @@ -50,7 +49,7 @@ where type Error = Out::Error; type Transform = Apply; type InitError = Err; - type Future = FutureResult; + type Future = Ready>; fn new_transform(&self, service: S) -> Self::Future { ok(Apply::new(service, self.f.clone())) diff --git a/actix-service/src/from_err.rs b/actix-service/src/from_err.rs index 5d37d725..72281972 100644 --- a/actix-service/src/from_err.rs +++ b/actix-service/src/from_err.rs @@ -1,13 +1,19 @@ use std::marker::PhantomData; -use futures::{Async, Future, Poll}; +use futures::{Future, Poll}; use super::{NewService, Service}; +use std::pin::Pin; +use std::task::Context; + +use pin_project::pin_project; /// Service for the `from_err` combinator, changing the error type of a service. /// /// This is created by the `ServiceExt::from_err` method. +#[pin_project] pub struct FromErr { + #[pin] service: A, f: PhantomData, } @@ -47,8 +53,11 @@ where type Error = E; type Future = FromErrFuture; - fn poll_ready(&mut self) -> Poll<(), E> { - self.service.poll_ready().map_err(E::from) + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + self.project().service.poll_ready(ctx).map_err(E::from) } fn call(&mut self, req: A::Request) -> Self::Future { @@ -59,7 +68,9 @@ where } } +#[pin_project] pub struct FromErrFuture { + #[pin] fut: A::Future, f: PhantomData, } @@ -69,11 +80,10 @@ where A: Service, E: From, { - type Item = A::Response; - type Error = E; + type Output = Result; - fn poll(&mut self) -> Poll { - self.fut.poll().map_err(E::from) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().fut.poll(cx).map_err(E::from) } } @@ -131,11 +141,13 @@ where } } +#[pin_project] pub struct FromErrNewServiceFuture where A: NewService, E: From, { + #[pin] fut: A::Future, e: PhantomData, } @@ -145,34 +157,48 @@ where A: NewService, E: From, { - type Item = FromErr; - type Error = A::InitError; + type Output = Result, A::InitError>; - fn poll(&mut self) -> Poll { - if let Async::Ready(service) = self.fut.poll()? { - Ok(Async::Ready(FromErr::new(service))) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if let Poll::Ready(svc) = self.project().fut.poll(cx)? { + Poll::Ready(Ok(FromErr::new(svc))) } else { - Ok(Async::NotReady) + Poll::Pending } } + + /* + fn poll(&mut self) -> Poll { + if let Poll::Ready(service) = self.fut.poll()? { + Ok(Poll::Ready(FromErr::new(service))) + } else { + Ok(Poll::Pending) + } + } + */ } #[cfg(test)] mod tests { - use futures::future::{err, FutureResult}; + use futures::future::{err, Ready}; use super::*; use crate::{IntoNewService, NewService, Service, ServiceExt}; + use tokio::future::ok; struct Srv; + impl Service for Srv { type Request = (); type Response = (); type Error = (); - type Future = FutureResult<(), ()>; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Err(()) + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Err(())) } fn call(&mut self, _: ()) -> Self::Future { @@ -189,32 +215,29 @@ mod tests { } } - #[test] - fn test_poll_ready() { + #[tokio::test] + async fn test_poll_ready() { let mut srv = Srv.from_err::(); - let res = srv.poll_ready(); + let res = srv.poll_once().await; + + assert_eq!(res, Poll::Ready(Err(Error))); + } + + #[tokio::test] + async fn test_call() { + let mut srv = Srv.from_err::(); + let res = srv.call(()).await; assert!(res.is_err()); assert_eq!(res.err().unwrap(), Error); } - #[test] - fn test_call() { - let mut srv = Srv.from_err::(); - let res = srv.call(()).poll(); - assert!(res.is_err()); - assert_eq!(res.err().unwrap(), Error); - } - - #[test] - fn test_new_service() { - let blank = || Ok::<_, ()>(Srv); + #[tokio::test] + async fn test_new_service() { + let blank = || ok::<_, ()>(Srv); let new_srv = blank.into_new_service().from_err::(); - if let Async::Ready(mut srv) = new_srv.new_service(&()).poll().unwrap() { - let res = srv.call(()).poll(); - assert!(res.is_err()); - assert_eq!(res.err().unwrap(), Error); - } else { - panic!() - } + let mut srv = new_srv.new_service(&()).await.unwrap(); + let res = srv.call(()).await; + assert!(res.is_err()); + assert_eq!(res.err().unwrap(), Error); } } diff --git a/actix-service/src/lib.rs b/actix-service/src/lib.rs index ee327da0..185e79d4 100644 --- a/actix-service/src/lib.rs +++ b/actix-service/src/lib.rs @@ -1,8 +1,13 @@ +use futures::future::{ready, LocalBoxFuture, Ready}; +use futures::{Future, Poll}; use std::cell::RefCell; +use std::pin::Pin; use std::rc::Rc; use std::sync::Arc; +use std::task; +use std::task::Context; -use futures::{Future, IntoFuture, Poll}; +mod cell; mod and_then; mod and_then_apply; @@ -11,7 +16,6 @@ mod apply; mod apply_cfg; pub mod blank; pub mod boxed; -mod cell; mod fn_service; mod fn_transform; mod from_err; @@ -24,6 +28,9 @@ mod transform; mod transform_err; pub use self::and_then::{AndThen, AndThenNewService}; + +use self::and_then_apply::AndThenTransform; +use self::and_then_apply_fn::{AndThenApply, AndThenApplyNewService}; pub use self::apply::{apply_fn, new_apply_fn, Apply, ApplyNewService}; pub use self::apply_cfg::{apply_cfg, new_apply_cfg}; pub use self::fn_service::{new_service_cfg, new_service_fn, service_fn, ServiceFn}; @@ -36,8 +43,34 @@ pub use self::map_init_err::MapInitErr; pub use self::then::{Then, ThenNewService}; pub use self::transform::{apply_transform, IntoTransform, Transform}; -use self::and_then_apply::AndThenTransform; -use self::and_then_apply_fn::{AndThenApply, AndThenApplyNewService}; +pub trait IntoFuture { + type Item; + type Error; + type Future: Future>; + fn into_future(self) -> Self::Future; +} + +impl>, I, E> IntoFuture for F { + type Item = I; + type Error = E; + type Future = F; + + fn into_future(self) -> Self::Future { + self + } +} + +/* +impl IntoFuture for Result { + type Item = I; + type Error = E; + type Future = Ready; + + fn into_future(self) -> Self::Future { + ready(self) + } +} +*/ /// An asynchronous function from `Request` to a `Response`. pub trait Service { @@ -51,7 +84,7 @@ pub trait Service { type Error; /// The future response value. - type Future: Future; + type Future: Future>; /// Returns `Ready` when the service is able to process requests. /// @@ -62,7 +95,10 @@ pub trait Service { /// This is a **best effort** implementation. False positives are permitted. /// It is permitted for the service to return `Ready` from a `poll_ready` /// call and the next invocation of `call` results in an error. - fn poll_ready(&mut self) -> Poll<(), Self::Error>; + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut task::Context<'_>, + ) -> Poll>; /// Process the request and return the response asynchronously. /// @@ -74,6 +110,31 @@ pub trait Service { /// Calling `call` without calling `poll_ready` is permitted. The /// implementation must be resilient to this fact. fn call(&mut self, req: Self::Request) -> Self::Future; + + #[cfg(test)] + fn poll_test(&mut self) -> Poll> { + // kinda stupid method, but works for our test purposes + unsafe { + let mut this = Pin::new_unchecked(self); + tokio::runtime::current_thread::Builder::new() + .build() + .unwrap() + .block_on(futures::future::poll_fn(move |cx| { + let this = &mut this; + Poll::Ready(this.as_mut().poll_ready(cx)) + })) + } + } + + fn poll_once<'a>(&'a mut self) -> LocalBoxFuture<'a, Poll>> { + unsafe { + let mut this = Pin::new_unchecked(self); + Pin::new_unchecked(Box::new(futures::future::poll_fn(move |cx| { + let this = &mut this; + Poll::Ready(this.as_mut().poll_ready(cx)) + }))) + } + } } /// An extension trait for `Service`s that provides a variety of convenient @@ -206,7 +267,7 @@ pub trait NewService { type InitError; /// The future of the `Service` instance. - type Future: Future; + type Future: Future>; /// Create and return a new service value asynchronously. fn new_service(&self, cfg: &Self::Config) -> Self::Future; @@ -343,8 +404,11 @@ where type Error = S::Error; type Future = S::Future; - fn poll_ready(&mut self) -> Poll<(), S::Error> { - (**self).poll_ready() + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut **s).poll_ready(ctx) } } fn call(&mut self, request: Self::Request) -> S::Future { @@ -361,8 +425,14 @@ where type Error = S::Error; type Future = S::Future; - fn poll_ready(&mut self) -> Poll<(), S::Error> { - (**self).poll_ready() + fn poll_ready( + mut self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + unsafe { + let p: &mut S = Pin::as_mut(&mut self).get_mut(); + Pin::new_unchecked(p).poll_ready(ctx) + } } fn call(&mut self, request: Self::Request) -> S::Future { @@ -379,12 +449,18 @@ where type Error = S::Error; type Future = S::Future; - fn poll_ready(&mut self) -> Poll<(), S::Error> { - self.borrow_mut().poll_ready() + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + unsafe { + let r = self.get_unchecked_mut(); + Pin::new_unchecked(&mut (*(**r).borrow_mut())).poll_ready(ctx) + } } fn call(&mut self, request: Self::Request) -> S::Future { - self.borrow_mut().call(request) + (&mut (**self).borrow_mut()).call(request) } } diff --git a/actix-service/src/map.rs b/actix-service/src/map.rs index ac3654b0..a29690b0 100644 --- a/actix-service/src/map.rs +++ b/actix-service/src/map.rs @@ -1,13 +1,19 @@ use std::marker::PhantomData; -use futures::{Async, Future, Poll}; +use futures::{Future, Poll}; use super::{NewService, Service}; +use std::pin::Pin; +use std::task::Context; + +use pin_project::pin_project; /// Service for the `map` combinator, changing the type of a service's response. /// /// This is created by the `ServiceExt::map` method. +#[pin_project] pub struct Map { + #[pin] service: A, f: F, _t: PhantomData, @@ -52,8 +58,11 @@ where type Error = A::Error; type Future = MapFuture; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.service.poll_ready() + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + self.project().service.poll_ready(ctx) } fn call(&mut self, req: A::Request) -> Self::Future { @@ -61,12 +70,14 @@ where } } +#[pin_project] pub struct MapFuture where A: Service, F: FnMut(A::Response) -> Response, { f: F, + #[pin] fut: A::Future, } @@ -85,13 +96,14 @@ where A: Service, F: FnMut(A::Response) -> Response, { - type Item = Response; - type Error = A::Error; + type Output = Result; - fn poll(&mut self) -> Poll { - match self.fut.poll()? { - Async::Ready(resp) => Ok(Async::Ready((self.f)(resp))), - Async::NotReady => Ok(Async::NotReady), + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + match this.fut.poll(cx) { + Poll::Ready(Ok(resp)) => Poll::Ready(Ok((this.f)(resp))), + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Pending => Poll::Pending, } } } @@ -151,11 +163,13 @@ where } } +#[pin_project] pub struct MapNewServiceFuture where A: NewService, F: FnMut(A::Response) -> Res, { + #[pin] fut: A::Future, f: Option, } @@ -175,34 +189,38 @@ where A: NewService, F: FnMut(A::Response) -> Res, { - type Item = Map; - type Error = A::InitError; + type Output = Result, A::InitError>; - fn poll(&mut self) -> Poll { - if let Async::Ready(service) = self.fut.poll()? { - Ok(Async::Ready(Map::new(service, self.f.take().unwrap()))) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + if let Poll::Ready(svc) = this.fut.poll(cx)? { + Poll::Ready(Ok(Map::new(svc, this.f.take().unwrap()))) } else { - Ok(Async::NotReady) + Poll::Pending } } } #[cfg(test)] mod tests { - use futures::future::{ok, FutureResult}; + use futures::future::{ok, Ready}; use super::*; use crate::{IntoNewService, Service, ServiceExt}; struct Srv; + impl Service for Srv { type Request = (); type Response = (); type Error = (); - type Future = FutureResult<(), ()>; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, _: ()) -> Self::Future { @@ -210,32 +228,28 @@ mod tests { } } - #[test] - fn test_poll_ready() { + #[tokio::test] + async fn test_poll_ready() { let mut srv = Srv.map(|_| "ok"); - let res = srv.poll_ready(); - assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(())); + let res = srv.poll_once().await; + assert_eq!(res, Poll::Ready(Ok(()))); } - #[test] - fn test_call() { + #[tokio::test] + async fn test_call() { let mut srv = Srv.map(|_| "ok"); - let res = srv.call(()).poll(); + let res = srv.call(()).await; assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready("ok")); + assert_eq!(res.unwrap(), "ok"); } - #[test] - fn test_new_service() { - let blank = || Ok::<_, ()>(Srv); + #[tokio::test] + async fn test_new_service() { + let blank = || ok::<_, ()>(Srv); let new_srv = blank.into_new_service().map(|_| "ok"); - if let Async::Ready(mut srv) = new_srv.new_service(&()).poll().unwrap() { - let res = srv.call(()).poll(); - assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready("ok")); - } else { - panic!() - } + let mut srv = new_srv.new_service(&()).await.unwrap(); + let res = srv.call(()).await; + assert!(res.is_ok()); + assert_eq!(res.unwrap(), ("ok")); } } diff --git a/actix-service/src/map_err.rs b/actix-service/src/map_err.rs index 47ce11fc..0bbd5bf0 100644 --- a/actix-service/src/map_err.rs +++ b/actix-service/src/map_err.rs @@ -1,14 +1,20 @@ use std::marker::PhantomData; -use futures::{Async, Future, Poll}; +use futures::{Future, Poll}; use super::{NewService, Service}; +use pin_project::pin_project; +use std::pin::Pin; +use std::task::Context; + /// Service for the `map_err` combinator, changing the type of a service's /// error. /// /// This is created by the `ServiceExt::map_err` method. +#[pin_project] pub struct MapErr { + #[pin] service: A, f: F, _t: PhantomData, @@ -53,8 +59,12 @@ where type Error = E; type Future = MapErrFuture; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.service.poll_ready().map_err(&self.f) + fn poll_ready( + mut self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + let mut this = self.project(); + this.service.poll_ready(ctx).map_err(this.f) } fn call(&mut self, req: A::Request) -> Self::Future { @@ -62,12 +72,14 @@ where } } +#[pin_project] pub struct MapErrFuture where A: Service, F: Fn(A::Error) -> E, { f: F, + #[pin] fut: A::Future, } @@ -86,11 +98,11 @@ where A: Service, F: Fn(A::Error) -> E, { - type Item = A::Response; - type Error = E; + type Output = Result; - fn poll(&mut self) -> Poll { - self.fut.poll().map_err(&self.f) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + this.fut.poll(cx).map_err(this.f) } } @@ -156,11 +168,13 @@ where } } +#[pin_project] pub struct MapErrNewServiceFuture where A: NewService, F: Fn(A::Error) -> E, { + #[pin] fut: A::Future, f: F, } @@ -180,24 +194,25 @@ where A: NewService, F: Fn(A::Error) -> E + Clone, { - type Item = MapErr; - type Error = A::InitError; + type Output = Result, A::InitError>; - fn poll(&mut self) -> Poll { - if let Async::Ready(service) = self.fut.poll()? { - Ok(Async::Ready(MapErr::new(service, self.f.clone()))) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + if let Poll::Ready(svc) = this.fut.poll(cx)? { + Poll::Ready(Ok(MapErr::new(svc, this.f.clone()))) } else { - Ok(Async::NotReady) + Poll::Pending } } } #[cfg(test)] mod tests { - use futures::future::{err, FutureResult}; + use futures::future::{err, Ready}; use super::*; use crate::{IntoNewService, NewService, Service, ServiceExt}; + use tokio::future::ok; struct Srv; @@ -205,10 +220,13 @@ mod tests { type Request = (); type Response = (); type Error = (); - type Future = FutureResult<(), ()>; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Err(()) + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Err(())) } fn call(&mut self, _: ()) -> Self::Future { @@ -216,32 +234,33 @@ mod tests { } } - #[test] - fn test_poll_ready() { + #[tokio::test] + async fn test_poll_ready() { let mut srv = Srv.map_err(|_| "error"); - let res = srv.poll_ready(); - assert!(res.is_err()); - assert_eq!(res.err().unwrap(), "error"); - } - - #[test] - fn test_call() { - let mut srv = Srv.map_err(|_| "error"); - let res = srv.call(()).poll(); - assert!(res.is_err()); - assert_eq!(res.err().unwrap(), "error"); - } - - #[test] - fn test_new_service() { - let blank = || Ok::<_, ()>(Srv); - let new_srv = blank.into_new_service().map_err(|_| "error"); - if let Async::Ready(mut srv) = new_srv.new_service(&()).poll().unwrap() { - let res = srv.call(()).poll(); + let res = srv.poll_once().await; + if let Poll::Ready(res) = res { assert!(res.is_err()); assert_eq!(res.err().unwrap(), "error"); } else { - panic!() + panic!("Should be ready"); } } + + #[tokio::test] + async fn test_call() { + let mut srv = Srv.map_err(|_| "error"); + let res = srv.call(()).await; + assert!(res.is_err()); + assert_eq!(res.err().unwrap(), "error"); + } + + #[tokio::test] + async fn test_new_service() { + let blank = || ok::<_, ()>(Srv); + let new_srv = blank.into_new_service().map_err(|_| "error"); + let mut srv = new_srv.new_service(&()).await.unwrap(); + let res = srv.call(()).await; + assert!(res.is_err()); + assert_eq!(res.err().unwrap(), "error"); + } } diff --git a/actix-service/src/map_init_err.rs b/actix-service/src/map_init_err.rs index 4866370a..c9225eb3 100644 --- a/actix-service/src/map_init_err.rs +++ b/actix-service/src/map_init_err.rs @@ -4,6 +4,10 @@ use futures::{Future, Poll}; use super::NewService; +use pin_project::pin_project; +use std::pin::Pin; +use std::task::Context; + /// `MapInitErr` service combinator pub struct MapInitErr { a: A, @@ -58,13 +62,14 @@ where MapInitErrFuture::new(self.a.new_service(cfg), self.f.clone()) } } - +#[pin_project] pub struct MapInitErrFuture where A: NewService, F: Fn(A::InitError) -> E, { f: F, + #[pin] fut: A::Future, } @@ -83,10 +88,10 @@ where A: NewService, F: Fn(A::InitError) -> E, { - type Item = A::Service; - type Error = E; + type Output = Result; - fn poll(&mut self) -> Poll { - self.fut.poll().map_err(&self.f) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + this.fut.poll(cx).map_err(this.f) } } diff --git a/actix-service/src/then.rs b/actix-service/src/then.rs index 56fae3a1..7cfbfe88 100644 --- a/actix-service/src/then.rs +++ b/actix-service/src/then.rs @@ -1,14 +1,21 @@ -use futures::{Async, Future, Poll}; +use futures::{Future, Poll}; +use std::pin::Pin; +use std::task::Context; use super::{IntoNewService, NewService, Service}; use crate::cell::Cell; +use pin_project::pin_project; + /// Service for the `then` combinator, chaining a computation onto the end of /// another service. /// /// This is created by the `ServiceExt::then` method. +#[pin_project] pub struct Then { + #[pin] a: A, + #[pin] b: Cell, } @@ -45,12 +52,16 @@ where type Error = B::Error; type Future = ThenFuture; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - let not_ready = self.a.poll_ready()?.is_not_ready(); - if self.b.get_mut().poll_ready()?.is_not_ready() || not_ready { - Ok(Async::NotReady) + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + let this = self.project(); + let not_ready = !this.a.poll_ready(ctx)?.is_ready(); + if !this.b.get_pin().poll_ready(ctx)?.is_ready() || not_ready { + Poll::Pending } else { - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } } @@ -59,13 +70,16 @@ where } } +#[pin_project] pub struct ThenFuture where A: Service, B: Service>, { b: Cell, + #[pin] fut_b: Option, + #[pin] fut_a: Option, } @@ -88,26 +102,33 @@ where A: Service, B: Service>, { - type Item = B::Response; - type Error = B::Error; + type Output = Result; - fn poll(&mut self) -> Poll { - if let Some(ref mut fut) = self.fut_b { - return fut.poll(); - } + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); - match self.fut_a.as_mut().expect("bug in actix-service").poll() { - Ok(Async::Ready(resp)) => { - let _ = self.fut_a.take(); - self.fut_b = Some(self.b.get_mut().call(Ok(resp))); - self.poll() + loop { + let mut fut_a = this.fut_a.as_mut(); + let mut fut_b = this.fut_b.as_mut(); + + if let Some(fut) = fut_b.as_mut().as_pin_mut() { + return fut.poll(cx); } - Err(err) => { - let _ = self.fut_a.take(); - self.fut_b = Some(self.b.get_mut().call(Err(err))); - self.poll() + + match fut_a + .as_mut() + .as_pin_mut() + .expect("Bug in actix-service") + .poll(cx) + { + Poll::Ready(r) => { + fut_a.set(None); + let new_fut = this.b.get_mut().call(r); + fut_b.set(Some(new_fut)); + } + + Poll::Pending => return Poll::Pending, } - Ok(Async::NotReady) => Ok(Async::NotReady), } } } @@ -175,6 +196,7 @@ where } } +#[pin_project] pub struct ThenNewServiceFuture where A: NewService, @@ -185,7 +207,9 @@ where InitError = A::InitError, >, { + #[pin] fut_b: B::Future, + #[pin] fut_a: A::Future, a: Option, b: Option, @@ -221,53 +245,59 @@ where InitError = A::InitError, >, { - type Item = Then; - type Error = A::InitError; + type Output = Result, A::InitError>; - fn poll(&mut self) -> Poll { - if self.a.is_none() { - if let Async::Ready(service) = self.fut_a.poll()? { - self.a = Some(service); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + if this.a.is_none() { + if let Poll::Ready(service) = this.fut_a.poll(cx)? { + *this.a = Some(service); } } - - if self.b.is_none() { - if let Async::Ready(service) = self.fut_b.poll()? { - self.b = Some(service); + if this.b.is_none() { + if let Poll::Ready(service) = this.fut_b.poll(cx)? { + *this.b = Some(service); } } - - if self.a.is_some() && self.b.is_some() { - Ok(Async::Ready(Then::new( - self.a.take().unwrap(), - self.b.take().unwrap(), + if this.a.is_some() && this.b.is_some() { + Poll::Ready(Ok(Then::new( + this.a.take().unwrap(), + this.b.take().unwrap(), ))) } else { - Ok(Async::NotReady) + Poll::Pending } } } #[cfg(test)] mod tests { - use futures::future::{err, ok, FutureResult}; - use futures::{Async, Future, Poll}; + use futures::future::{err, ok, ready, Ready}; + use futures::{Future, Poll}; use std::cell::Cell; use std::rc::Rc; use crate::{IntoNewService, NewService, Service, ServiceExt}; + use std::pin::Pin; + use std::task::Context; #[derive(Clone)] struct Srv1(Rc>); + impl Service for Srv1 { type Request = Result<&'static str, &'static str>; type Response = &'static str; type Error = (); - type Future = FutureResult; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.0.set(self.0.get() + 1); - Ok(Async::Ready(())) + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + let mut this = self.get_mut(); + + this.0.set(this.0.get() + 1); + Poll::Ready(Ok(())) } fn call(&mut self, req: Result<&'static str, &'static str>) -> Self::Future { @@ -284,11 +314,15 @@ mod tests { type Request = Result<&'static str, ()>; type Response = (&'static str, &'static str); type Error = (); - type Future = FutureResult; + type Future = Ready>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.0.set(self.0.get() + 1); - Ok(Async::Ready(())) + fn poll_ready( + self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll> { + let mut this = self.get_mut(); + this.0.set(this.0.get() + 1); + Poll::Ready(Err(())) } fn call(&mut self, req: Result<&'static str, ()>) -> Self::Future { @@ -299,46 +333,44 @@ mod tests { } } - #[test] - fn test_poll_ready() { + #[tokio::test] + async fn test_poll_ready() { let cnt = Rc::new(Cell::new(0)); let mut srv = Srv1(cnt.clone()).then(Srv2(cnt.clone())); - let res = srv.poll_ready(); - assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(())); + let res = srv.poll_once().await; + assert_eq!(res, Poll::Ready(Err(()))); assert_eq!(cnt.get(), 2); } - #[test] - fn test_call() { + #[tokio::test] + async fn test_call() { let cnt = Rc::new(Cell::new(0)); let mut srv = Srv1(cnt.clone()).then(Srv2(cnt)).clone(); - let res = srv.call(Ok("srv1")).poll(); + let res = srv.call(Ok("srv1")).await; assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(("srv1", "ok"))); + assert_eq!(res.unwrap(), (("srv1", "ok"))); - let res = srv.call(Err("srv")).poll(); + let res = srv.call(Err("srv")).await; assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(("srv2", "err"))); + assert_eq!(res.unwrap(), (("srv2", "err"))); } - #[test] - fn test_new_service() { + #[tokio::test] + async fn test_new_service() { let cnt = Rc::new(Cell::new(0)); let cnt2 = cnt.clone(); - let blank = move || Ok::<_, ()>(Srv1(cnt2.clone())); - let new_srv = blank.into_new_service().then(move || Ok(Srv2(cnt.clone()))); - if let Async::Ready(mut srv) = new_srv.clone().new_service(&()).poll().unwrap() { - let res = srv.call(Ok("srv1")).poll(); - assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(("srv1", "ok"))); + let blank = move || ready(Ok::<_, ()>(Srv1(cnt2.clone()))); + let new_srv = blank + .into_new_service() + .then(move || ready(Ok(Srv2(cnt.clone())))); + let mut srv = new_srv.clone().new_service(&()).await.unwrap(); + let res = srv.call(Ok("srv1")).await; + assert!(res.is_ok()); + assert_eq!(res.unwrap(), (("srv1", "ok"))); - let res = srv.call(Err("srv")).poll(); - assert!(res.is_ok()); - assert_eq!(res.unwrap(), Async::Ready(("srv2", "err"))); - } else { - panic!() - } + let res = srv.call(Err("srv")).await; + assert!(res.is_ok()); + assert_eq!(res.unwrap(), (("srv2", "err"))); } } diff --git a/actix-service/src/transform.rs b/actix-service/src/transform.rs index 0e93bf72..e4682b63 100644 --- a/actix-service/src/transform.rs +++ b/actix-service/src/transform.rs @@ -1,10 +1,13 @@ use std::rc::Rc; use std::sync::Arc; -use futures::{Async, Future, IntoFuture, Poll}; - use crate::transform_err::{TransformFromErr, TransformMapInitErr}; use crate::{IntoNewService, NewService, Service}; +use futures::{Future, Poll}; +use std::pin::Pin; +use std::task::Context; + +use pin_project::pin_project; /// The `Transform` trait defines the interface of a Service factory. `Transform` /// is often implemented for middleware, defining how to construct a @@ -32,7 +35,7 @@ pub trait Transform { type InitError; /// The future response value. - type Future: Future; + type Future: Future>; /// Creates and returns a new Service component, asynchronously fn new_transform(&self, service: S) -> Self::Future; @@ -193,19 +196,21 @@ where fn new_service(&self, cfg: &S::Config) -> Self::Future { ApplyTransformFuture { t_cell: self.t.clone(), - fut_a: self.s.new_service(cfg).into_future(), + fut_a: self.s.new_service(cfg), fut_t: None, } } } - +#[pin_project] pub struct ApplyTransformFuture where S: NewService, T: Transform, { + #[pin] fut_a: S::Future, - fut_t: Option<::Future>, + #[pin] + fut_t: Option, t_cell: Rc, } @@ -214,19 +219,21 @@ where S: NewService, T: Transform, { - type Item = T::Transform; - type Error = T::InitError; + type Output = Result; - fn poll(&mut self) -> Poll { - if self.fut_t.is_none() { - if let Async::Ready(service) = self.fut_a.poll()? { - self.fut_t = Some(self.t_cell.new_transform(service).into_future()); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + if this.fut_t.as_mut().as_pin_mut().is_none() { + if let Poll::Ready(service) = this.fut_a.poll(cx)? { + this.fut_t.set(Some(this.t_cell.new_transform(service))); } } - if let Some(ref mut fut) = self.fut_t { - fut.poll() + + if let Some(fut) = this.fut_t.as_mut().as_pin_mut() { + fut.poll(cx) } else { - Ok(Async::NotReady) + Poll::Pending } } } diff --git a/actix-service/src/transform_err.rs b/actix-service/src/transform_err.rs index dcc3c245..a6940707 100644 --- a/actix-service/src/transform_err.rs +++ b/actix-service/src/transform_err.rs @@ -3,6 +3,10 @@ use std::marker::PhantomData; use futures::{Future, Poll}; use super::Transform; +use std::pin::Pin; +use std::task::Context; + +use pin_project::pin_project; /// Transform for the `map_err` combinator, changing the type of a new /// transform's init error. @@ -63,12 +67,13 @@ where } } } - +#[pin_project] pub struct TransformMapInitErrFuture where T: Transform, F: Fn(T::InitError) -> E, { + #[pin] fut: T::Future, f: F, } @@ -78,11 +83,11 @@ where T: Transform, F: Fn(T::InitError) -> E + Clone, { - type Item = T::Transform; - type Error = E; + type Output = Result; - fn poll(&mut self) -> Poll { - self.fut.poll().map_err(&self.f) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + this.fut.poll(cx).map_err(this.f) } } @@ -139,11 +144,13 @@ where } } +#[pin_project] pub struct TransformFromErrFuture where T: Transform, E: From, { + #[pin] fut: T::Future, _t: PhantomData, } @@ -153,10 +160,9 @@ where T: Transform, E: From, { - type Item = T::Transform; - type Error = E; + type Output = Result; - fn poll(&mut self) -> Poll { - self.fut.poll().map_err(E::from) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().fut.poll(cx).map_err(E::from) } }