Migrate actix-service to std::future,

This is a squash of ~8 commits, since it included a lot of experimentation. To see the commits,
look into the semtexzv/std-future-service-tmp branch.
This commit is contained in:
Michal Hornický 2019-10-01 09:54:02 +02:00
parent aa9bbe2114
commit b957640618
19 changed files with 813 additions and 490 deletions

View File

@ -23,7 +23,9 @@ name = "actix_service"
path = "src/lib.rs"
[dependencies]
futures = "0.1.25"
futures = { package = "futures-preview", version = "0.3.0-alpha.18" }
pin-project = "0.4.0-alpha.11"
[dev-dependencies]
tokio = "0.2.0-alpha.4"
actix-rt = "0.2"

View File

@ -1,14 +1,21 @@
use futures::{Async, Future, Poll};
use futures::{Future, Poll};
use super::{IntoNewService, NewService, Service};
use crate::cell::Cell;
use pin_project::pin_project;
use std::pin::Pin;
use std::task::Context;
/// Service for the `and_then` combinator, chaining a computation onto the end
/// of another service which completes successfully.
///
/// This is created by the `ServiceExt::and_then` method.
#[pin_project]
pub struct AndThen<A, B> {
#[pin]
a: A,
#[pin]
b: Cell<B>,
}
@ -45,12 +52,16 @@ where
type Error = A::Error;
type Future = AndThenFuture<A, B>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
let not_ready = self.a.poll_ready()?.is_not_ready();
if self.b.get_mut().poll_ready()?.is_not_ready() || not_ready {
Ok(Async::NotReady)
fn poll_ready(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let not_ready = !this.a.poll_ready(cx)?.is_ready();
if !this.b.get_pin().poll_ready(cx)?.is_ready() || not_ready {
Poll::Pending
} else {
Ok(Async::Ready(()))
Poll::Ready(Ok(()))
}
}
@ -59,13 +70,16 @@ where
}
}
#[pin_project]
pub struct AndThenFuture<A, B>
where
A: Service,
B: Service<Request = A::Response, Error = A::Error>,
{
b: Cell<B>,
#[pin]
fut_b: Option<B::Future>,
#[pin]
fut_a: Option<A::Future>,
}
@ -88,22 +102,33 @@ where
A: Service,
B: Service<Request = A::Response, Error = A::Error>,
{
type Item = B::Response;
type Error = A::Error;
type Output = Result<B::Response, A::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.fut_b {
return fut.poll();
}
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
match self.fut_a.as_mut().expect("Bug in actix-service").poll() {
Ok(Async::Ready(resp)) => {
let _ = self.fut_a.take();
self.fut_b = Some(self.b.get_mut().call(resp));
self.poll()
loop {
let mut fut_a = this.fut_a.as_mut();
let mut fut_b = this.fut_b.as_mut();
if let Some(fut) = fut_b.as_mut().as_pin_mut() {
return fut.poll(cx);
}
match fut_a
.as_mut()
.as_pin_mut()
.expect("Bug in actix-service")
.poll(cx)
{
Poll::Ready(Ok(resp)) => {
fut_a.set(None);
let new_fut = this.b.get_mut().call(resp);
fut_b.set(Some(new_fut));
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => return Poll::Pending,
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => Err(err),
}
}
}
@ -174,13 +199,17 @@ where
}
}
#[pin_project]
pub struct AndThenNewServiceFuture<A, B>
where
A: NewService,
B: NewService<Request = A::Response>,
{
#[pin]
fut_b: B::Future,
#[pin]
fut_a: A::Future,
a: Option<A::Service>,
b: Option<B::Service>,
}
@ -205,37 +234,35 @@ where
A: NewService,
B: NewService<Request = A::Response, Error = A::Error, InitError = A::InitError>,
{
type Item = AndThen<A::Service, B::Service>;
type Error = A::InitError;
type Output = Result<AndThen<A::Service, B::Service>, A::InitError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.a.is_none() {
if let Async::Ready(service) = self.fut_a.poll()? {
self.a = Some(service);
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if this.a.is_none() {
if let Poll::Ready(service) = this.fut_a.poll(cx)? {
*this.a = Some(service);
}
}
if self.b.is_none() {
if let Async::Ready(service) = self.fut_b.poll()? {
self.b = Some(service);
if this.b.is_none() {
if let Poll::Ready(service) = this.fut_b.poll(cx)? {
*this.b = Some(service);
}
}
if self.a.is_some() && self.b.is_some() {
Ok(Async::Ready(AndThen::new(
self.a.take().unwrap(),
self.b.take().unwrap(),
if this.a.is_some() && this.b.is_some() {
Poll::Ready(Ok(AndThen::new(
this.a.take().unwrap(),
this.b.take().unwrap(),
)))
} else {
Ok(Async::NotReady)
Poll::Pending
}
}
}
#[cfg(test)]
mod tests {
use futures::future::{ok, FutureResult};
use futures::{Async, Poll};
use futures::future::{ok, poll_fn, ready, Ready};
use futures::Poll;
use std::cell::Cell;
use std::rc::Rc;
@ -243,15 +270,19 @@ mod tests {
use crate::{NewService, Service, ServiceExt};
struct Srv1(Rc<Cell<usize>>);
impl Service for Srv1 {
type Request = &'static str;
type Response = &'static str;
type Error = ();
type Future = FutureResult<Self::Response, ()>;
type Future = Ready<Result<Self::Response, ()>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
fn poll_ready(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.0.set(self.0.get() + 1);
Ok(Async::Ready(()))
Poll::Ready(Ok(()))
}
fn call(&mut self, req: &'static str) -> Self::Future {
@ -266,11 +297,14 @@ mod tests {
type Request = &'static str;
type Response = (&'static str, &'static str);
type Error = ();
type Future = FutureResult<Self::Response, ()>;
type Future = Ready<Result<Self::Response, ()>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
fn poll_ready(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.0.set(self.0.get() + 1);
Ok(Async::Ready(()))
Poll::Ready(Ok(()))
}
fn call(&mut self, req: &'static str) -> Self::Future {
@ -278,39 +312,35 @@ mod tests {
}
}
#[test]
fn test_poll_ready() {
#[tokio::test]
async fn test_poll_ready() {
let cnt = Rc::new(Cell::new(0));
let mut srv = Srv1(cnt.clone()).and_then(Srv2(cnt.clone()));
let res = srv.poll_ready();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(()));
let res = srv.poll_once().await;
assert_eq!(res, Poll::Ready(Ok(())));
assert_eq!(cnt.get(), 2);
}
#[test]
fn test_call() {
#[tokio::test]
async fn test_call() {
let cnt = Rc::new(Cell::new(0));
let mut srv = Srv1(cnt.clone()).and_then(Srv2(cnt));
let res = srv.call("srv1").poll();
let res = srv.call("srv1").await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(("srv1", "srv2")));
assert_eq!(res.unwrap(), (("srv1", "srv2")));
}
#[test]
fn test_new_service() {
#[tokio::test]
async fn test_new_service() {
let cnt = Rc::new(Cell::new(0));
let cnt2 = cnt.clone();
let blank = move || Ok::<_, ()>(Srv1(cnt2.clone()));
let blank = move || ready(Ok::<_, ()>(Srv1(cnt2.clone())));
let new_srv = blank
.into_new_service()
.and_then(move || Ok(Srv2(cnt.clone())));
if let Async::Ready(mut srv) = new_srv.new_service(&()).poll().unwrap() {
let res = srv.call("srv1").poll();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(("srv1", "srv2")));
} else {
panic!()
}
.and_then(move || ready(Ok(Srv2(cnt.clone()))));
let mut srv = new_srv.new_service(&()).await.unwrap();
let res = srv.call("srv1").await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), ("srv1", "srv2"));
}
}

View File

@ -1,11 +1,15 @@
use std::rc::Rc;
use futures::{Async, Future, Poll};
use futures::{Future, Poll};
use crate::and_then::AndThen;
use crate::from_err::FromErr;
use crate::{NewService, Transform};
use pin_project::pin_project;
use std::pin::Pin;
use std::task::Context;
/// `Apply` new service combinator
pub struct AndThenTransform<T, A, B> {
a: A,
@ -72,6 +76,7 @@ where
}
}
#[pin_project]
pub struct AndThenTransformFuture<T, A, B>
where
A: NewService,
@ -79,8 +84,11 @@ where
T: Transform<B::Service, Request = A::Response, InitError = A::InitError>,
T::Error: From<A::Error>,
{
#[pin]
fut_a: A::Future,
#[pin]
fut_b: B::Future,
#[pin]
fut_t: Option<T::Future>,
a: Option<A::Service>,
t: Option<T::Transform>,
@ -94,56 +102,63 @@ where
T: Transform<B::Service, Request = A::Response, InitError = A::InitError>,
T::Error: From<A::Error>,
{
type Item = AndThen<FromErr<A::Service, T::Error>, T::Transform>;
type Error = T::InitError;
type Output = Result<AndThen<FromErr<A::Service, T::Error>, T::Transform>, T::InitError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.fut_t.is_none() {
if let Async::Ready(service) = self.fut_b.poll()? {
self.fut_t = Some(self.t_cell.new_transform(service));
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
if this.fut_t.is_none() {
if let Poll::Ready(svc) = this.fut_b.poll(cx)? {
this.fut_t.set(Some(this.t_cell.new_transform(svc)))
}
}
if self.a.is_none() {
if let Async::Ready(service) = self.fut_a.poll()? {
self.a = Some(service);
if this.a.is_none() {
if let Poll::Ready(svc) = this.fut_a.poll(cx)? {
*this.a = Some(svc)
}
}
if let Some(ref mut fut) = self.fut_t {
if let Async::Ready(transform) = fut.poll()? {
self.t = Some(transform);
if let Some(fut) = this.fut_t.as_pin_mut() {
if let Poll::Ready(transform) = fut.poll(cx)? {
*this.t = Some(transform)
}
}
if self.a.is_some() && self.t.is_some() {
Ok(Async::Ready(AndThen::new(
FromErr::new(self.a.take().unwrap()),
self.t.take().unwrap(),
if this.a.is_some() && this.t.is_some() {
Poll::Ready(Ok(AndThen::new(
FromErr::new(this.a.take().unwrap()),
this.t.take().unwrap(),
)))
} else {
Ok(Async::NotReady)
Poll::Pending
}
}
}
#[cfg(test)]
mod tests {
use futures::future::{ok, FutureResult};
use futures::{Async, Future, Poll};
use futures::future::{ok, ready, Ready};
use futures::{Future, FutureExt, Poll, TryFutureExt};
use crate::{IntoNewService, IntoService, NewService, Service, ServiceExt};
use std::pin::Pin;
use std::task::Context;
#[derive(Clone)]
struct Srv;
impl Service for Srv {
type Request = ();
type Response = ();
type Error = ();
type Future = FutureResult<(), ()>;
type Future = Ready<Result<(), ()>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Err(()))
}
fn call(&mut self, _: ()) -> Self::Future {
@ -151,36 +166,32 @@ mod tests {
}
}
#[test]
fn test_apply() {
let blank = |req| Ok(req);
#[tokio::test]
async fn test_apply() {
let blank = |req| ready(Ok(req));
let mut srv = blank
.into_service()
.apply_fn(Srv, |req: &'static str, srv: &mut Srv| {
srv.call(()).map(move |res| (req, res))
srv.call(()).map_ok(move |res| (req, res))
});
assert!(srv.poll_ready().is_ok());
let res = srv.call("srv").poll();
let res = srv.call("srv").await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(("srv", ())));
assert_eq!(res.unwrap(), (("srv", ())));
}
#[test]
fn test_new_service() {
let blank = || Ok::<_, ()>((|req| Ok(req)).into_service());
#[tokio::test]
async fn test_new_service() {
let blank = move || ok::<_, ()>((|req| ok(req)).into_service());
let new_srv = blank.into_new_service().apply(
|req: &'static str, srv: &mut Srv| srv.call(()).map(move |res| (req, res)),
|| Ok(Srv),
|req: &'static str, srv: &mut Srv| srv.call(()).map_ok(move |res| (req, res)),
|| ok(Srv),
);
if let Async::Ready(mut srv) = new_srv.new_service(&()).poll().unwrap() {
assert!(srv.poll_ready().is_ok());
let res = srv.call("srv").poll();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(("srv", ())));
} else {
panic!()
}
let mut srv = new_srv.new_service(&()).await.unwrap();
let res = srv.call("srv").await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), (("srv", ())));
}
}

View File

@ -1,11 +1,16 @@
use futures::{Future, Poll};
use std::marker::PhantomData;
use futures::{Async, Future, IntoFuture, Poll};
use std::pin::Pin;
use std::task::Context;
use super::{IntoNewService, IntoService, NewService, Service};
use crate::cell::Cell;
use crate::IntoFuture;
use pin_project::pin_project;
/// `Apply` service combinator
#[pin_project]
pub struct AndThenApply<A, B, F, Out>
where
A: Service,
@ -14,8 +19,11 @@ where
Out: IntoFuture,
Out::Error: Into<A::Error>,
{
#[pin]
a: A,
#[pin]
b: Cell<B>,
#[pin]
f: Cell<F>,
r: PhantomData<(Out,)>,
}
@ -70,12 +78,16 @@ where
type Error = A::Error;
type Future = AndThenApplyFuture<A, B, F, Out>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
let not_ready = self.a.poll_ready()?.is_not_ready();
if self.b.get_mut().poll_ready()?.is_not_ready() || not_ready {
Ok(Async::NotReady)
fn poll_ready(
mut self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let not_ready = !this.a.poll_ready(ctx)?.is_ready();
if !this.b.get_pin().poll_ready(ctx).is_ready() || not_ready {
Poll::Pending
} else {
Ok(Async::Ready(()))
Poll::Ready(Ok(()))
}
}
@ -89,6 +101,7 @@ where
}
}
#[pin_project]
pub struct AndThenApplyFuture<A, B, F, Out>
where
A: Service,
@ -99,7 +112,9 @@ where
{
b: Cell<B>,
f: Cell<F>,
#[pin]
fut_a: Option<A::Future>,
#[pin]
fut_b: Option<Out::Future>,
}
@ -111,23 +126,30 @@ where
Out: IntoFuture,
Out::Error: Into<A::Error>,
{
type Item = Out::Item;
type Error = A::Error;
type Output = Result<Out::Item, A::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.fut_b {
return fut.poll().map_err(|e| e.into());
}
match self.fut_a.as_mut().expect("Bug in actix-service").poll() {
Ok(Async::Ready(resp)) => {
let _ = self.fut_a.take();
self.fut_b =
Some((&mut *self.f.get_mut())(resp, self.b.get_mut()).into_future());
self.poll()
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
if let Some(fut) = this.fut_b.as_mut().as_pin_mut() {
return fut.poll(cx).map_err(|e| e.into());
}
match this
.fut_a
.as_mut()
.as_pin_mut()
.expect("Bug in actix-service")
.poll(cx)?
{
Poll::Ready(resp) => {
this.fut_a.set(None);
this.fut_b.set(Some(
(&mut *this.f.get_mut())(resp, this.b.get_mut()).into_future(),
));
}
Poll::Pending => return Poll::Pending,
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => Err(err),
}
}
}
@ -195,12 +217,13 @@ where
a: None,
b: None,
f: self.f.clone(),
fut_a: self.a.new_service(cfg).into_future(),
fut_b: self.b.new_service(cfg).into_future(),
fut_a: self.a.new_service(cfg),
fut_b: self.b.new_service(cfg),
}
}
}
#[pin_project]
pub struct AndThenApplyNewServiceFuture<A, B, F, Out>
where
A: NewService,
@ -209,7 +232,9 @@ where
Out: IntoFuture,
Out::Error: Into<A::Error>,
{
#[pin]
fut_b: B::Future,
#[pin]
fut_a: A::Future,
f: Cell<F>,
a: Option<A::Service>,
@ -224,53 +249,60 @@ where
Out: IntoFuture,
Out::Error: Into<A::Error>,
{
type Item = AndThenApply<A::Service, B::Service, F, Out>;
type Error = A::InitError;
type Output = Result<AndThenApply<A::Service, B::Service, F, Out>, A::InitError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.a.is_none() {
if let Async::Ready(service) = self.fut_a.poll()? {
self.a = Some(service);
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if this.a.is_none() {
if let Poll::Ready(service) = this.fut_a.poll(cx)? {
*this.a = Some(service);
}
}
if self.b.is_none() {
if let Async::Ready(service) = self.fut_b.poll()? {
self.b = Some(service);
if this.b.is_none() {
if let Poll::Ready(service) = this.fut_b.poll(cx)? {
*this.b = Some(service);
}
}
if self.a.is_some() && self.b.is_some() {
Ok(Async::Ready(AndThenApply {
f: self.f.clone(),
a: self.a.take().unwrap(),
b: Cell::new(self.b.take().unwrap()),
if this.a.is_some() && this.b.is_some() {
Poll::Ready(Ok(AndThenApply {
f: this.f.clone(),
a: this.a.take().unwrap(),
b: Cell::new(this.b.take().unwrap()),
r: PhantomData,
}))
} else {
Ok(Async::NotReady)
Poll::Pending
}
}
}
#[cfg(test)]
mod tests {
use futures::future::{ok, FutureResult};
use futures::{Async, Future, Poll};
use futures::future::{ok, Ready};
use futures::{Future, Poll, TryFutureExt};
use crate::blank::{Blank, BlankNewService};
use crate::{NewService, Service, ServiceExt};
use std::pin::Pin;
use std::task::Context;
#[derive(Clone)]
struct Srv;
impl Service for Srv {
type Request = ();
type Response = ();
type Error = ();
type Future = FutureResult<(), ()>;
type Future = Ready<Result<(), ()>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: ()) -> Self::Future {
@ -278,30 +310,27 @@ mod tests {
}
}
#[test]
fn test_call() {
#[tokio::test]
async fn test_call() {
let mut srv = Blank::new().apply_fn(Srv, |req: &'static str, srv| {
srv.call(()).map(move |res| (req, res))
srv.call(()).map_ok(move |res| (req, res))
});
assert!(srv.poll_ready().is_ok());
let res = srv.call("srv").poll();
assert_eq!(srv.poll_once().await, Poll::Ready(Ok(())));
let res = srv.call("srv").await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(("srv", ())));
assert_eq!(res.unwrap(), (("srv", ())));
}
#[test]
fn test_new_service() {
#[tokio::test]
async fn test_new_service() {
let new_srv = BlankNewService::new_unit().apply_fn(
|| Ok(Srv),
|req: &'static str, srv| srv.call(()).map(move |res| (req, res)),
|| ok(Srv),
|req: &'static str, srv| srv.call(()).map_ok(move |res| (req, res)),
);
if let Async::Ready(mut srv) = new_srv.new_service(&()).poll().unwrap() {
assert!(srv.poll_ready().is_ok());
let res = srv.call("srv").poll();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(("srv", ())));
} else {
panic!()
}
let mut srv = new_srv.new_service(&()).await.unwrap();
assert_eq!(srv.poll_once().await, Poll::Ready(Ok(())));
let res = srv.call("srv").await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), (("srv", ())));
}
}

View File

@ -1,9 +1,14 @@
use std::marker::PhantomData;
use futures::{Async, Future, IntoFuture, Poll};
use futures::{ready, Future, Poll};
use super::{IntoNewService, IntoService, NewService, Service};
use crate::IntoFuture;
use pin_project::pin_project;
use std::pin::Pin;
use std::task::Context;
/// Apply tranform function to a service
pub fn apply_fn<T, F, In, Out, U>(service: U, f: F) -> Apply<T, F, In, Out>
where
@ -30,10 +35,12 @@ where
#[doc(hidden)]
/// `Apply` service combinator
#[pin_project]
pub struct Apply<T, F, In, Out>
where
T: Service,
{
#[pin]
service: T,
f: F,
r: PhantomData<(In, Out)>,
@ -82,8 +89,11 @@ where
type Error = Out::Error;
type Future = Out::Future;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready().map_err(|e| e.into())
fn poll_ready(
mut self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Poll::Ready(ready!(self.project().service.poll_ready(ctx)).map_err(|e| e.into()))
}
fn call(&mut self, req: In) -> Self::Future {
@ -154,12 +164,14 @@ where
}
}
#[pin_project]
pub struct ApplyNewServiceFuture<T, F, In, Out>
where
T: NewService,
F: FnMut(In, &mut T::Service) -> Out + Clone,
Out: IntoFuture,
{
#[pin]
fut: T::Future,
f: Option<F>,
r: PhantomData<(In, Out)>,
@ -187,36 +199,40 @@ where
Out: IntoFuture,
Out::Error: From<T::Error>,
{
type Item = Apply<T::Service, F, In, Out>;
type Error = T::InitError;
type Output = Result<Apply<T::Service, F, In, Out>, T::InitError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Async::Ready(service) = self.fut.poll()? {
Ok(Async::Ready(Apply::new(service, self.f.take().unwrap())))
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if let Poll::Ready(svc) = this.fut.poll(cx)? {
Poll::Ready(Ok(Apply::new(svc, this.f.take().unwrap())))
} else {
Ok(Async::NotReady)
Poll::Pending
}
}
}
#[cfg(test)]
mod tests {
use futures::future::{ok, FutureResult};
use futures::{Async, Future, Poll};
use futures::future::{ok, Ready};
use futures::{Future, Poll, TryFutureExt};
use super::*;
use crate::{IntoService, NewService, Service, ServiceExt};
#[derive(Clone)]
struct Srv;
impl Service for Srv {
type Request = ();
type Response = ();
type Error = ();
type Future = FutureResult<(), ()>;
type Future = Ready<Result<(), ()>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: ()) -> Self::Future {
@ -224,34 +240,33 @@ mod tests {
}
}
#[test]
fn test_call() {
let blank = |req| Ok(req);
#[tokio::test]
async fn test_call() {
let blank = |req| ok(req);
let mut srv = blank
.into_service()
.apply_fn(Srv, |req: &'static str, srv| {
srv.call(()).map(move |res| (req, res))
srv.call(()).map_ok(move |res| (req, res))
});
assert!(srv.poll_ready().is_ok());
let res = srv.call("srv").poll();
assert_eq!(srv.poll_once().await, Poll::Ready(Ok(())));
let res = srv.call("srv").await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(("srv", ())));
assert_eq!(res.unwrap(), (("srv", ())));
}
#[test]
fn test_new_service() {
#[tokio::test]
async fn test_new_service() {
let new_srv = ApplyNewService::new(
|| Ok::<_, ()>(Srv),
|req: &'static str, srv| srv.call(()).map(move |res| (req, res)),
|| ok::<_, ()>(Srv),
|req: &'static str, srv| srv.call(()).map_ok(move |res| (req, res)),
);
if let Async::Ready(mut srv) = new_srv.new_service(&()).poll().unwrap() {
assert!(srv.poll_ready().is_ok());
let res = srv.call("srv").poll();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(("srv", ())));
} else {
panic!()
}
let mut srv = new_srv.new_service(&()).await.unwrap();
assert_eq!(srv.poll_once().await, Poll::Ready(Ok(())));
let res = srv.call("srv").await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), (("srv", ())));
}
}

View File

@ -1,10 +1,14 @@
use std::marker::PhantomData;
use futures::future::Future;
use futures::{try_ready, Async, IntoFuture, Poll};
use futures::{ready, Poll};
use crate::cell::Cell;
use crate::{IntoService, NewService, Service};
use crate::{IntoFuture, IntoService, NewService, Service};
use std::pin::Pin;
use std::task::Context;
use pin_project::pin_project;
/// Convert `Fn(&Config, &mut Service) -> Future<Service>` fn to a NewService
pub fn apply_cfg<F, C, T, R, S>(
@ -61,7 +65,8 @@ where
}
}
/// Convert `Fn(&Config) -> Future<Service>` fn to NewService
/// Convert `Fn(&Config) -> Future<Service>` fn to NewService\
#[pin_project]
struct ApplyConfigService<F, C, T, R, S>
where
F: FnMut(&C, &mut T) -> R,
@ -71,6 +76,7 @@ where
S: Service,
{
f: Cell<F>,
#[pin]
srv: Cell<T>,
_t: PhantomData<(C, R, S)>,
}
@ -118,12 +124,14 @@ where
}
}
#[pin_project]
struct FnNewServiceConfigFut<R, S>
where
R: IntoFuture,
R::Item: IntoService<S>,
S: Service,
{
#[pin]
fut: R::Future,
_t: PhantomData<(S,)>,
}
@ -134,11 +142,10 @@ where
R::Item: IntoService<S>,
S: Service,
{
type Item = S;
type Error = R::Error;
type Output = Result<S, R::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Ok(Async::Ready(try_ready!(self.fut.poll()).into_service()))
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(Ok(ready!(self.project().fut.poll(cx))?.into_service()))
}
}
@ -206,6 +213,7 @@ where
}
}
#[pin_project]
struct ApplyConfigNewServiceFut<F, C, T, R, S>
where
C: Clone,
@ -218,8 +226,11 @@ where
{
cfg: C,
f: Cell<F>,
#[pin]
srv: Option<T::Service>,
#[pin]
srv_fut: Option<T::Future>,
#[pin]
fut: Option<R::Future>,
_t: PhantomData<(S,)>,
}
@ -234,33 +245,38 @@ where
R::Item: IntoService<S>,
S: Service,
{
type Item = S;
type Error = R::Error;
type Output = Result<S, R::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.srv_fut {
match fut.poll()? {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(srv) => {
let _ = self.srv_fut.take();
self.srv = Some(srv);
return self.poll();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
'poll: loop {
if let Some(fut) = this.srv_fut.as_mut().as_pin_mut() {
match fut.poll(cx)? {
Poll::Pending => return Poll::Pending,
Poll::Ready(srv) => {
this.srv_fut.set(None);
this.srv.set(Some(srv));
continue 'poll;
}
}
}
}
if let Some(ref mut fut) = self.fut {
Ok(Async::Ready(try_ready!(fut.poll()).into_service()))
} else if let Some(ref mut srv) = self.srv {
match srv.poll_ready()? {
Async::NotReady => Ok(Async::NotReady),
Async::Ready(_) => {
self.fut = Some(self.f.get_mut()(&self.cfg, srv).into_future());
return self.poll();
if let Some(fut) = this.fut.as_mut().as_pin_mut() {
return Poll::Ready(Ok(ready!(fut.poll(cx))?.into_service()));
} else if let Some(mut srv) = this.srv.as_mut().as_pin_mut() {
match srv.as_mut().poll_ready(cx)? {
Poll::Ready(_) => {
this.fut.set(Some(
this.f.get_mut()(&this.cfg, unsafe { Pin::get_unchecked_mut(srv) })
.into_future(),
));
continue 'poll;
}
Poll::Pending => return Poll::Pending,
}
} else {
return Poll::Pending;
}
} else {
Ok(Async::NotReady)
}
}
}

View File

@ -1,9 +1,11 @@
use std::marker::PhantomData;
use futures::future::{ok, FutureResult};
use futures::{Async, Poll};
use futures::future::{ok, Ready};
use futures::Poll;
use super::{NewService, Service};
use std::pin::Pin;
use std::task::Context;
/// Empty service
#[derive(Clone)]
@ -34,10 +36,13 @@ impl<R, E> Service for Blank<R, E> {
type Request = R;
type Response = R;
type Error = E;
type Future = FutureResult<R, E>;
type Future = Ready<Result<R, E>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
fn poll_ready(
self: Pin<&mut Self>,
_ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: R) -> Self::Future {
@ -76,7 +81,7 @@ impl<R, E1, E2> NewService for BlankNewService<R, E1, E2> {
type Config = ();
type Service = Blank<R, E1>;
type InitError = E2;
type Future = FutureResult<Self::Service, Self::InitError>;
type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: &()) -> Self::Future {
ok(Blank::default())

View File

@ -1,7 +1,11 @@
use futures::future::{err, ok, Either, FutureResult};
use futures::{Async, Future, IntoFuture, Poll};
use std::pin::Pin;
use crate::{NewService, Service};
use crate::{IntoFuture, NewService, Service, ServiceExt};
use futures::future::FutureExt;
use futures::future::LocalBoxFuture;
use futures::future::{err, ok, Either, Ready};
use futures::{Future, Poll};
use std::task::Context;
pub type BoxedService<Req, Res, Err> = Box<
dyn Service<
@ -13,7 +17,7 @@ pub type BoxedService<Req, Res, Err> = Box<
>;
pub type BoxedServiceResponse<Res, Err> =
Either<FutureResult<Res, Err>, Box<dyn Future<Item = Res, Error = Err>>>;
Either<Ready<Result<Res, Err>>, LocalBoxFuture<'static, Result<Res, Err>>>;
pub struct BoxedNewService<C, Req, Res, Err, InitErr>(Inner<C, Req, Res, Err, InitErr>);
@ -53,7 +57,7 @@ type Inner<C, Req, Res, Err, InitErr> = Box<
Error = Err,
InitError = InitErr,
Service = BoxedService<Req, Res, Err>,
Future = Box<dyn Future<Item = BoxedService<Req, Res, Err>, Error = InitErr>>,
Future = LocalBoxFuture<'static, Result<BoxedService<Req, Res, Err>, InitErr>>,
>,
>;
@ -70,7 +74,8 @@ where
type InitError = InitErr;
type Config = C;
type Service = BoxedService<Req, Res, Err>;
type Future = Box<dyn Future<Item = Self::Service, Error = Self::InitError>>;
type Future = LocalBoxFuture<'static, Result<Self::Service, InitErr>>;
fn new_service(&self, cfg: &C) -> Self::Future {
self.0.new_service(cfg)
@ -99,15 +104,18 @@ where
type InitError = InitErr;
type Config = C;
type Service = BoxedService<Req, Res, Err>;
type Future = Box<dyn Future<Item = Self::Service, Error = Self::InitError>>;
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
fn new_service(&self, cfg: &C) -> Self::Future {
Box::new(
/* TODO: Figure out what the hell is hapenning here
Box::new(
self.service
.new_service(cfg)
.into_future()
.map(ServiceWrapper::boxed),
)
*/
unimplemented!()
}
}
@ -132,10 +140,22 @@ where
type Response = Res;
type Error = Err;
type Future = Either<
FutureResult<Self::Response, Self::Error>,
Box<dyn Future<Item = Self::Response, Error = Self::Error>>,
Ready<Result<Self::Response, Self::Error>>,
LocalBoxFuture<'static, Result<Res, Err>>,
>;
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
unimplemented!()
}
fn call(&mut self, req: Self::Request) -> Self::Future {
unimplemented!()
}
/*
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.0.poll_ready()
}
@ -148,4 +168,5 @@ where
Ok(Async::NotReady) => Either::B(Box::new(fut)),
}
}
*/
}

View File

@ -1,4 +1,5 @@
//! Custom cell impl
use std::pin::Pin;
use std::{cell::UnsafeCell, fmt, rc::Rc};
pub(crate) struct Cell<T> {
@ -33,6 +34,9 @@ impl<T> Cell<T> {
pub(crate) fn get_mut(&mut self) -> &mut T {
unsafe { &mut *self.inner.as_ref().get() }
}
pub(crate) fn get_pin(self: Pin<&mut Self>) -> Pin<&mut T> {
unsafe { Pin::new_unchecked(&mut *Pin::get_unchecked_mut(self).inner.as_ref().get()) }
}
#[allow(clippy::mut_from_ref)]
pub(crate) unsafe fn get_mut_unsafe(&self) -> &mut T {

View File

@ -1,9 +1,14 @@
use std::marker::PhantomData;
use futures::future::{ok, Future, FutureResult};
use futures::{try_ready, Async, IntoFuture, Poll};
use crate::IntoFuture;
use futures::future::{ok, Future, Ready};
use futures::{ready, Poll};
use crate::{IntoNewService, IntoService, NewService, Service};
use std::pin::Pin;
use std::task::Context;
use pin_project::pin_project;
/// Create `NewService` for function that can act as a Service
pub fn service_fn<F, Req, Out, Cfg>(f: F) -> NewServiceFn<F, Req, Out, Cfg>
@ -75,8 +80,11 @@ where
type Error = Out::Error;
type Future = Out::Future;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
fn poll_ready(
self: Pin<&mut Self>,
_ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Req) -> Self::Future {
@ -135,7 +143,7 @@ where
type Config = Cfg;
type Service = ServiceFn<F, Req, Out>;
type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>;
type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: &Cfg) -> Self::Future {
ok(ServiceFn::new(self.f.clone()))
@ -210,12 +218,14 @@ where
}
}
#[pin_project]
pub struct FnNewServiceConfigFut<R, S, E>
where
R: IntoFuture<Error = E>,
R::Item: IntoService<S>,
S: Service,
{
#[pin]
fut: R::Future,
_t: PhantomData<(S,)>,
}
@ -226,11 +236,10 @@ where
R::Item: IntoService<S>,
S: Service,
{
type Item = S;
type Error = R::Error;
type Output = Result<S, R::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Ok(Async::Ready(try_ready!(self.fut.poll()).into_service()))
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(Ok(ready!(self.project().fut.poll(cx))?.into_service()))
}
}

View File

@ -1,10 +1,9 @@
use std::marker::PhantomData;
use futures::future::{ok, FutureResult};
use futures::IntoFuture;
use futures::future::{ok, Ready};
use crate::apply::Apply;
use crate::{IntoTransform, Service, Transform};
use crate::{IntoFuture, IntoTransform, Service, Transform};
/// Use function as transform service
pub fn transform_fn<F, S, In, Out, Err>(
@ -50,7 +49,7 @@ where
type Error = Out::Error;
type Transform = Apply<S, F, In, Out>;
type InitError = Err;
type Future = FutureResult<Self::Transform, Self::InitError>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ok(Apply::new(service, self.f.clone()))

View File

@ -1,13 +1,19 @@
use std::marker::PhantomData;
use futures::{Async, Future, Poll};
use futures::{Future, Poll};
use super::{NewService, Service};
use std::pin::Pin;
use std::task::Context;
use pin_project::pin_project;
/// Service for the `from_err` combinator, changing the error type of a service.
///
/// This is created by the `ServiceExt::from_err` method.
#[pin_project]
pub struct FromErr<A, E> {
#[pin]
service: A,
f: PhantomData<E>,
}
@ -47,8 +53,11 @@ where
type Error = E;
type Future = FromErrFuture<A, E>;
fn poll_ready(&mut self) -> Poll<(), E> {
self.service.poll_ready().map_err(E::from)
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.project().service.poll_ready(ctx).map_err(E::from)
}
fn call(&mut self, req: A::Request) -> Self::Future {
@ -59,7 +68,9 @@ where
}
}
#[pin_project]
pub struct FromErrFuture<A: Service, E> {
#[pin]
fut: A::Future,
f: PhantomData<E>,
}
@ -69,11 +80,10 @@ where
A: Service,
E: From<A::Error>,
{
type Item = A::Response;
type Error = E;
type Output = Result<A::Response, E>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.fut.poll().map_err(E::from)
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().fut.poll(cx).map_err(E::from)
}
}
@ -131,11 +141,13 @@ where
}
}
#[pin_project]
pub struct FromErrNewServiceFuture<A, E>
where
A: NewService,
E: From<A::Error>,
{
#[pin]
fut: A::Future,
e: PhantomData<E>,
}
@ -145,34 +157,48 @@ where
A: NewService,
E: From<A::Error>,
{
type Item = FromErr<A::Service, E>;
type Error = A::InitError;
type Output = Result<FromErr<A::Service, E>, A::InitError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Async::Ready(service) = self.fut.poll()? {
Ok(Async::Ready(FromErr::new(service)))
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Poll::Ready(svc) = self.project().fut.poll(cx)? {
Poll::Ready(Ok(FromErr::new(svc)))
} else {
Ok(Async::NotReady)
Poll::Pending
}
}
/*
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Poll::Ready(service) = self.fut.poll()? {
Ok(Poll::Ready(FromErr::new(service)))
} else {
Ok(Poll::Pending)
}
}
*/
}
#[cfg(test)]
mod tests {
use futures::future::{err, FutureResult};
use futures::future::{err, Ready};
use super::*;
use crate::{IntoNewService, NewService, Service, ServiceExt};
use tokio::future::ok;
struct Srv;
impl Service for Srv {
type Request = ();
type Response = ();
type Error = ();
type Future = FutureResult<(), ()>;
type Future = Ready<Result<(), ()>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Err(())
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Err(()))
}
fn call(&mut self, _: ()) -> Self::Future {
@ -189,32 +215,29 @@ mod tests {
}
}
#[test]
fn test_poll_ready() {
#[tokio::test]
async fn test_poll_ready() {
let mut srv = Srv.from_err::<Error>();
let res = srv.poll_ready();
let res = srv.poll_once().await;
assert_eq!(res, Poll::Ready(Err(Error)));
}
#[tokio::test]
async fn test_call() {
let mut srv = Srv.from_err::<Error>();
let res = srv.call(()).await;
assert!(res.is_err());
assert_eq!(res.err().unwrap(), Error);
}
#[test]
fn test_call() {
let mut srv = Srv.from_err::<Error>();
let res = srv.call(()).poll();
assert!(res.is_err());
assert_eq!(res.err().unwrap(), Error);
}
#[test]
fn test_new_service() {
let blank = || Ok::<_, ()>(Srv);
#[tokio::test]
async fn test_new_service() {
let blank = || ok::<_, ()>(Srv);
let new_srv = blank.into_new_service().from_err::<Error>();
if let Async::Ready(mut srv) = new_srv.new_service(&()).poll().unwrap() {
let res = srv.call(()).poll();
assert!(res.is_err());
assert_eq!(res.err().unwrap(), Error);
} else {
panic!()
}
let mut srv = new_srv.new_service(&()).await.unwrap();
let res = srv.call(()).await;
assert!(res.is_err());
assert_eq!(res.err().unwrap(), Error);
}
}

View File

@ -1,8 +1,13 @@
use futures::future::{ready, LocalBoxFuture, Ready};
use futures::{Future, Poll};
use std::cell::RefCell;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task;
use std::task::Context;
use futures::{Future, IntoFuture, Poll};
mod cell;
mod and_then;
mod and_then_apply;
@ -11,7 +16,6 @@ mod apply;
mod apply_cfg;
pub mod blank;
pub mod boxed;
mod cell;
mod fn_service;
mod fn_transform;
mod from_err;
@ -24,6 +28,9 @@ mod transform;
mod transform_err;
pub use self::and_then::{AndThen, AndThenNewService};
use self::and_then_apply::AndThenTransform;
use self::and_then_apply_fn::{AndThenApply, AndThenApplyNewService};
pub use self::apply::{apply_fn, new_apply_fn, Apply, ApplyNewService};
pub use self::apply_cfg::{apply_cfg, new_apply_cfg};
pub use self::fn_service::{new_service_cfg, new_service_fn, service_fn, ServiceFn};
@ -36,8 +43,34 @@ pub use self::map_init_err::MapInitErr;
pub use self::then::{Then, ThenNewService};
pub use self::transform::{apply_transform, IntoTransform, Transform};
use self::and_then_apply::AndThenTransform;
use self::and_then_apply_fn::{AndThenApply, AndThenApplyNewService};
pub trait IntoFuture {
type Item;
type Error;
type Future: Future<Output = Result<Self::Item, Self::Error>>;
fn into_future(self) -> Self::Future;
}
impl<F: Future<Output = Result<I, E>>, I, E> IntoFuture for F {
type Item = I;
type Error = E;
type Future = F;
fn into_future(self) -> Self::Future {
self
}
}
/*
impl <I,E> IntoFuture for Result<I,E> {
type Item = I;
type Error = E;
type Future = Ready<Self>;
fn into_future(self) -> Self::Future {
ready(self)
}
}
*/
/// An asynchronous function from `Request` to a `Response`.
pub trait Service {
@ -51,7 +84,7 @@ pub trait Service {
type Error;
/// The future response value.
type Future: Future<Item = Self::Response, Error = Self::Error>;
type Future: Future<Output = Result<Self::Response, Self::Error>>;
/// Returns `Ready` when the service is able to process requests.
///
@ -62,7 +95,10 @@ pub trait Service {
/// This is a **best effort** implementation. False positives are permitted.
/// It is permitted for the service to return `Ready` from a `poll_ready`
/// call and the next invocation of `call` results in an error.
fn poll_ready(&mut self) -> Poll<(), Self::Error>;
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>>;
/// Process the request and return the response asynchronously.
///
@ -74,6 +110,31 @@ pub trait Service {
/// Calling `call` without calling `poll_ready` is permitted. The
/// implementation must be resilient to this fact.
fn call(&mut self, req: Self::Request) -> Self::Future;
#[cfg(test)]
fn poll_test(&mut self) -> Poll<Result<(), Self::Error>> {
// kinda stupid method, but works for our test purposes
unsafe {
let mut this = Pin::new_unchecked(self);
tokio::runtime::current_thread::Builder::new()
.build()
.unwrap()
.block_on(futures::future::poll_fn(move |cx| {
let this = &mut this;
Poll::Ready(this.as_mut().poll_ready(cx))
}))
}
}
fn poll_once<'a>(&'a mut self) -> LocalBoxFuture<'a, Poll<Result<(), Self::Error>>> {
unsafe {
let mut this = Pin::new_unchecked(self);
Pin::new_unchecked(Box::new(futures::future::poll_fn(move |cx| {
let this = &mut this;
Poll::Ready(this.as_mut().poll_ready(cx))
})))
}
}
}
/// An extension trait for `Service`s that provides a variety of convenient
@ -206,7 +267,7 @@ pub trait NewService {
type InitError;
/// The future of the `Service` instance.
type Future: Future<Item = Self::Service, Error = Self::InitError>;
type Future: Future<Output = Result<Self::Service, Self::InitError>>;
/// Create and return a new service value asynchronously.
fn new_service(&self, cfg: &Self::Config) -> Self::Future;
@ -343,8 +404,11 @@ where
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self) -> Poll<(), S::Error> {
(**self).poll_ready()
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
unsafe { self.map_unchecked_mut(|s| &mut **s).poll_ready(ctx) }
}
fn call(&mut self, request: Self::Request) -> S::Future {
@ -361,8 +425,14 @@ where
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self) -> Poll<(), S::Error> {
(**self).poll_ready()
fn poll_ready(
mut self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), S::Error>> {
unsafe {
let p: &mut S = Pin::as_mut(&mut self).get_mut();
Pin::new_unchecked(p).poll_ready(ctx)
}
}
fn call(&mut self, request: Self::Request) -> S::Future {
@ -379,12 +449,18 @@ where
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self) -> Poll<(), S::Error> {
self.borrow_mut().poll_ready()
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
unsafe {
let r = self.get_unchecked_mut();
Pin::new_unchecked(&mut (*(**r).borrow_mut())).poll_ready(ctx)
}
}
fn call(&mut self, request: Self::Request) -> S::Future {
self.borrow_mut().call(request)
(&mut (**self).borrow_mut()).call(request)
}
}

View File

@ -1,13 +1,19 @@
use std::marker::PhantomData;
use futures::{Async, Future, Poll};
use futures::{Future, Poll};
use super::{NewService, Service};
use std::pin::Pin;
use std::task::Context;
use pin_project::pin_project;
/// Service for the `map` combinator, changing the type of a service's response.
///
/// This is created by the `ServiceExt::map` method.
#[pin_project]
pub struct Map<A, F, Response> {
#[pin]
service: A,
f: F,
_t: PhantomData<Response>,
@ -52,8 +58,11 @@ where
type Error = A::Error;
type Future = MapFuture<A, F, Response>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready()
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.project().service.poll_ready(ctx)
}
fn call(&mut self, req: A::Request) -> Self::Future {
@ -61,12 +70,14 @@ where
}
}
#[pin_project]
pub struct MapFuture<A, F, Response>
where
A: Service,
F: FnMut(A::Response) -> Response,
{
f: F,
#[pin]
fut: A::Future,
}
@ -85,13 +96,14 @@ where
A: Service,
F: FnMut(A::Response) -> Response,
{
type Item = Response;
type Error = A::Error;
type Output = Result<Response, A::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.fut.poll()? {
Async::Ready(resp) => Ok(Async::Ready((self.f)(resp))),
Async::NotReady => Ok(Async::NotReady),
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match this.fut.poll(cx) {
Poll::Ready(Ok(resp)) => Poll::Ready(Ok((this.f)(resp))),
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
}
}
}
@ -151,11 +163,13 @@ where
}
}
#[pin_project]
pub struct MapNewServiceFuture<A, F, Res>
where
A: NewService,
F: FnMut(A::Response) -> Res,
{
#[pin]
fut: A::Future,
f: Option<F>,
}
@ -175,34 +189,38 @@ where
A: NewService,
F: FnMut(A::Response) -> Res,
{
type Item = Map<A::Service, F, Res>;
type Error = A::InitError;
type Output = Result<Map<A::Service, F, Res>, A::InitError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Async::Ready(service) = self.fut.poll()? {
Ok(Async::Ready(Map::new(service, self.f.take().unwrap())))
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if let Poll::Ready(svc) = this.fut.poll(cx)? {
Poll::Ready(Ok(Map::new(svc, this.f.take().unwrap())))
} else {
Ok(Async::NotReady)
Poll::Pending
}
}
}
#[cfg(test)]
mod tests {
use futures::future::{ok, FutureResult};
use futures::future::{ok, Ready};
use super::*;
use crate::{IntoNewService, Service, ServiceExt};
struct Srv;
impl Service for Srv {
type Request = ();
type Response = ();
type Error = ();
type Future = FutureResult<(), ()>;
type Future = Ready<Result<(), ()>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: ()) -> Self::Future {
@ -210,32 +228,28 @@ mod tests {
}
}
#[test]
fn test_poll_ready() {
#[tokio::test]
async fn test_poll_ready() {
let mut srv = Srv.map(|_| "ok");
let res = srv.poll_ready();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(()));
let res = srv.poll_once().await;
assert_eq!(res, Poll::Ready(Ok(())));
}
#[test]
fn test_call() {
#[tokio::test]
async fn test_call() {
let mut srv = Srv.map(|_| "ok");
let res = srv.call(()).poll();
let res = srv.call(()).await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready("ok"));
assert_eq!(res.unwrap(), "ok");
}
#[test]
fn test_new_service() {
let blank = || Ok::<_, ()>(Srv);
#[tokio::test]
async fn test_new_service() {
let blank = || ok::<_, ()>(Srv);
let new_srv = blank.into_new_service().map(|_| "ok");
if let Async::Ready(mut srv) = new_srv.new_service(&()).poll().unwrap() {
let res = srv.call(()).poll();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready("ok"));
} else {
panic!()
}
let mut srv = new_srv.new_service(&()).await.unwrap();
let res = srv.call(()).await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), ("ok"));
}
}

View File

@ -1,14 +1,20 @@
use std::marker::PhantomData;
use futures::{Async, Future, Poll};
use futures::{Future, Poll};
use super::{NewService, Service};
use pin_project::pin_project;
use std::pin::Pin;
use std::task::Context;
/// Service for the `map_err` combinator, changing the type of a service's
/// error.
///
/// This is created by the `ServiceExt::map_err` method.
#[pin_project]
pub struct MapErr<A, F, E> {
#[pin]
service: A,
f: F,
_t: PhantomData<E>,
@ -53,8 +59,12 @@ where
type Error = E;
type Future = MapErrFuture<A, F, E>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.service.poll_ready().map_err(&self.f)
fn poll_ready(
mut self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
let mut this = self.project();
this.service.poll_ready(ctx).map_err(this.f)
}
fn call(&mut self, req: A::Request) -> Self::Future {
@ -62,12 +72,14 @@ where
}
}
#[pin_project]
pub struct MapErrFuture<A, F, E>
where
A: Service,
F: Fn(A::Error) -> E,
{
f: F,
#[pin]
fut: A::Future,
}
@ -86,11 +98,11 @@ where
A: Service,
F: Fn(A::Error) -> E,
{
type Item = A::Response;
type Error = E;
type Output = Result<A::Response, E>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.fut.poll().map_err(&self.f)
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
this.fut.poll(cx).map_err(this.f)
}
}
@ -156,11 +168,13 @@ where
}
}
#[pin_project]
pub struct MapErrNewServiceFuture<A, F, E>
where
A: NewService,
F: Fn(A::Error) -> E,
{
#[pin]
fut: A::Future,
f: F,
}
@ -180,24 +194,25 @@ where
A: NewService,
F: Fn(A::Error) -> E + Clone,
{
type Item = MapErr<A::Service, F, E>;
type Error = A::InitError;
type Output = Result<MapErr<A::Service, F, E>, A::InitError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Async::Ready(service) = self.fut.poll()? {
Ok(Async::Ready(MapErr::new(service, self.f.clone())))
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if let Poll::Ready(svc) = this.fut.poll(cx)? {
Poll::Ready(Ok(MapErr::new(svc, this.f.clone())))
} else {
Ok(Async::NotReady)
Poll::Pending
}
}
}
#[cfg(test)]
mod tests {
use futures::future::{err, FutureResult};
use futures::future::{err, Ready};
use super::*;
use crate::{IntoNewService, NewService, Service, ServiceExt};
use tokio::future::ok;
struct Srv;
@ -205,10 +220,13 @@ mod tests {
type Request = ();
type Response = ();
type Error = ();
type Future = FutureResult<(), ()>;
type Future = Ready<Result<(), ()>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Err(())
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Err(()))
}
fn call(&mut self, _: ()) -> Self::Future {
@ -216,32 +234,33 @@ mod tests {
}
}
#[test]
fn test_poll_ready() {
#[tokio::test]
async fn test_poll_ready() {
let mut srv = Srv.map_err(|_| "error");
let res = srv.poll_ready();
assert!(res.is_err());
assert_eq!(res.err().unwrap(), "error");
}
#[test]
fn test_call() {
let mut srv = Srv.map_err(|_| "error");
let res = srv.call(()).poll();
assert!(res.is_err());
assert_eq!(res.err().unwrap(), "error");
}
#[test]
fn test_new_service() {
let blank = || Ok::<_, ()>(Srv);
let new_srv = blank.into_new_service().map_err(|_| "error");
if let Async::Ready(mut srv) = new_srv.new_service(&()).poll().unwrap() {
let res = srv.call(()).poll();
let res = srv.poll_once().await;
if let Poll::Ready(res) = res {
assert!(res.is_err());
assert_eq!(res.err().unwrap(), "error");
} else {
panic!()
panic!("Should be ready");
}
}
#[tokio::test]
async fn test_call() {
let mut srv = Srv.map_err(|_| "error");
let res = srv.call(()).await;
assert!(res.is_err());
assert_eq!(res.err().unwrap(), "error");
}
#[tokio::test]
async fn test_new_service() {
let blank = || ok::<_, ()>(Srv);
let new_srv = blank.into_new_service().map_err(|_| "error");
let mut srv = new_srv.new_service(&()).await.unwrap();
let res = srv.call(()).await;
assert!(res.is_err());
assert_eq!(res.err().unwrap(), "error");
}
}

View File

@ -4,6 +4,10 @@ use futures::{Future, Poll};
use super::NewService;
use pin_project::pin_project;
use std::pin::Pin;
use std::task::Context;
/// `MapInitErr` service combinator
pub struct MapInitErr<A, F, E> {
a: A,
@ -58,13 +62,14 @@ where
MapInitErrFuture::new(self.a.new_service(cfg), self.f.clone())
}
}
#[pin_project]
pub struct MapInitErrFuture<A, F, E>
where
A: NewService,
F: Fn(A::InitError) -> E,
{
f: F,
#[pin]
fut: A::Future,
}
@ -83,10 +88,10 @@ where
A: NewService,
F: Fn(A::InitError) -> E,
{
type Item = A::Service;
type Error = E;
type Output = Result<A::Service, E>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.fut.poll().map_err(&self.f)
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
this.fut.poll(cx).map_err(this.f)
}
}

View File

@ -1,14 +1,21 @@
use futures::{Async, Future, Poll};
use futures::{Future, Poll};
use std::pin::Pin;
use std::task::Context;
use super::{IntoNewService, NewService, Service};
use crate::cell::Cell;
use pin_project::pin_project;
/// Service for the `then` combinator, chaining a computation onto the end of
/// another service.
///
/// This is created by the `ServiceExt::then` method.
#[pin_project]
pub struct Then<A, B> {
#[pin]
a: A,
#[pin]
b: Cell<B>,
}
@ -45,12 +52,16 @@ where
type Error = B::Error;
type Future = ThenFuture<A, B>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
let not_ready = self.a.poll_ready()?.is_not_ready();
if self.b.get_mut().poll_ready()?.is_not_ready() || not_ready {
Ok(Async::NotReady)
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let not_ready = !this.a.poll_ready(ctx)?.is_ready();
if !this.b.get_pin().poll_ready(ctx)?.is_ready() || not_ready {
Poll::Pending
} else {
Ok(Async::Ready(()))
Poll::Ready(Ok(()))
}
}
@ -59,13 +70,16 @@ where
}
}
#[pin_project]
pub struct ThenFuture<A, B>
where
A: Service,
B: Service<Request = Result<A::Response, A::Error>>,
{
b: Cell<B>,
#[pin]
fut_b: Option<B::Future>,
#[pin]
fut_a: Option<A::Future>,
}
@ -88,26 +102,33 @@ where
A: Service,
B: Service<Request = Result<A::Response, A::Error>>,
{
type Item = B::Response;
type Error = B::Error;
type Output = Result<B::Response, B::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Some(ref mut fut) = self.fut_b {
return fut.poll();
}
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
match self.fut_a.as_mut().expect("bug in actix-service").poll() {
Ok(Async::Ready(resp)) => {
let _ = self.fut_a.take();
self.fut_b = Some(self.b.get_mut().call(Ok(resp)));
self.poll()
loop {
let mut fut_a = this.fut_a.as_mut();
let mut fut_b = this.fut_b.as_mut();
if let Some(fut) = fut_b.as_mut().as_pin_mut() {
return fut.poll(cx);
}
Err(err) => {
let _ = self.fut_a.take();
self.fut_b = Some(self.b.get_mut().call(Err(err)));
self.poll()
match fut_a
.as_mut()
.as_pin_mut()
.expect("Bug in actix-service")
.poll(cx)
{
Poll::Ready(r) => {
fut_a.set(None);
let new_fut = this.b.get_mut().call(r);
fut_b.set(Some(new_fut));
}
Poll::Pending => return Poll::Pending,
}
Ok(Async::NotReady) => Ok(Async::NotReady),
}
}
}
@ -175,6 +196,7 @@ where
}
}
#[pin_project]
pub struct ThenNewServiceFuture<A, B>
where
A: NewService,
@ -185,7 +207,9 @@ where
InitError = A::InitError,
>,
{
#[pin]
fut_b: B::Future,
#[pin]
fut_a: A::Future,
a: Option<A::Service>,
b: Option<B::Service>,
@ -221,53 +245,59 @@ where
InitError = A::InitError,
>,
{
type Item = Then<A::Service, B::Service>;
type Error = A::InitError;
type Output = Result<Then<A::Service, B::Service>, A::InitError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.a.is_none() {
if let Async::Ready(service) = self.fut_a.poll()? {
self.a = Some(service);
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if this.a.is_none() {
if let Poll::Ready(service) = this.fut_a.poll(cx)? {
*this.a = Some(service);
}
}
if self.b.is_none() {
if let Async::Ready(service) = self.fut_b.poll()? {
self.b = Some(service);
if this.b.is_none() {
if let Poll::Ready(service) = this.fut_b.poll(cx)? {
*this.b = Some(service);
}
}
if self.a.is_some() && self.b.is_some() {
Ok(Async::Ready(Then::new(
self.a.take().unwrap(),
self.b.take().unwrap(),
if this.a.is_some() && this.b.is_some() {
Poll::Ready(Ok(Then::new(
this.a.take().unwrap(),
this.b.take().unwrap(),
)))
} else {
Ok(Async::NotReady)
Poll::Pending
}
}
}
#[cfg(test)]
mod tests {
use futures::future::{err, ok, FutureResult};
use futures::{Async, Future, Poll};
use futures::future::{err, ok, ready, Ready};
use futures::{Future, Poll};
use std::cell::Cell;
use std::rc::Rc;
use crate::{IntoNewService, NewService, Service, ServiceExt};
use std::pin::Pin;
use std::task::Context;
#[derive(Clone)]
struct Srv1(Rc<Cell<usize>>);
impl Service for Srv1 {
type Request = Result<&'static str, &'static str>;
type Response = &'static str;
type Error = ();
type Future = FutureResult<Self::Response, Self::Error>;
type Future = Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.0.set(self.0.get() + 1);
Ok(Async::Ready(()))
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
let mut this = self.get_mut();
this.0.set(this.0.get() + 1);
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Result<&'static str, &'static str>) -> Self::Future {
@ -284,11 +314,15 @@ mod tests {
type Request = Result<&'static str, ()>;
type Response = (&'static str, &'static str);
type Error = ();
type Future = FutureResult<Self::Response, ()>;
type Future = Ready<Result<Self::Response, ()>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.0.set(self.0.get() + 1);
Ok(Async::Ready(()))
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
let mut this = self.get_mut();
this.0.set(this.0.get() + 1);
Poll::Ready(Err(()))
}
fn call(&mut self, req: Result<&'static str, ()>) -> Self::Future {
@ -299,46 +333,44 @@ mod tests {
}
}
#[test]
fn test_poll_ready() {
#[tokio::test]
async fn test_poll_ready() {
let cnt = Rc::new(Cell::new(0));
let mut srv = Srv1(cnt.clone()).then(Srv2(cnt.clone()));
let res = srv.poll_ready();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(()));
let res = srv.poll_once().await;
assert_eq!(res, Poll::Ready(Err(())));
assert_eq!(cnt.get(), 2);
}
#[test]
fn test_call() {
#[tokio::test]
async fn test_call() {
let cnt = Rc::new(Cell::new(0));
let mut srv = Srv1(cnt.clone()).then(Srv2(cnt)).clone();
let res = srv.call(Ok("srv1")).poll();
let res = srv.call(Ok("srv1")).await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(("srv1", "ok")));
assert_eq!(res.unwrap(), (("srv1", "ok")));
let res = srv.call(Err("srv")).poll();
let res = srv.call(Err("srv")).await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(("srv2", "err")));
assert_eq!(res.unwrap(), (("srv2", "err")));
}
#[test]
fn test_new_service() {
#[tokio::test]
async fn test_new_service() {
let cnt = Rc::new(Cell::new(0));
let cnt2 = cnt.clone();
let blank = move || Ok::<_, ()>(Srv1(cnt2.clone()));
let new_srv = blank.into_new_service().then(move || Ok(Srv2(cnt.clone())));
if let Async::Ready(mut srv) = new_srv.clone().new_service(&()).poll().unwrap() {
let res = srv.call(Ok("srv1")).poll();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(("srv1", "ok")));
let blank = move || ready(Ok::<_, ()>(Srv1(cnt2.clone())));
let new_srv = blank
.into_new_service()
.then(move || ready(Ok(Srv2(cnt.clone()))));
let mut srv = new_srv.clone().new_service(&()).await.unwrap();
let res = srv.call(Ok("srv1")).await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), (("srv1", "ok")));
let res = srv.call(Err("srv")).poll();
assert!(res.is_ok());
assert_eq!(res.unwrap(), Async::Ready(("srv2", "err")));
} else {
panic!()
}
let res = srv.call(Err("srv")).await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), (("srv2", "err")));
}
}

View File

@ -1,10 +1,13 @@
use std::rc::Rc;
use std::sync::Arc;
use futures::{Async, Future, IntoFuture, Poll};
use crate::transform_err::{TransformFromErr, TransformMapInitErr};
use crate::{IntoNewService, NewService, Service};
use futures::{Future, Poll};
use std::pin::Pin;
use std::task::Context;
use pin_project::pin_project;
/// The `Transform` trait defines the interface of a Service factory. `Transform`
/// is often implemented for middleware, defining how to construct a
@ -32,7 +35,7 @@ pub trait Transform<S> {
type InitError;
/// The future response value.
type Future: Future<Item = Self::Transform, Error = Self::InitError>;
type Future: Future<Output = Result<Self::Transform, Self::InitError>>;
/// Creates and returns a new Service component, asynchronously
fn new_transform(&self, service: S) -> Self::Future;
@ -193,19 +196,21 @@ where
fn new_service(&self, cfg: &S::Config) -> Self::Future {
ApplyTransformFuture {
t_cell: self.t.clone(),
fut_a: self.s.new_service(cfg).into_future(),
fut_a: self.s.new_service(cfg),
fut_t: None,
}
}
}
#[pin_project]
pub struct ApplyTransformFuture<T, S>
where
S: NewService,
T: Transform<S::Service, InitError = S::InitError>,
{
#[pin]
fut_a: S::Future,
fut_t: Option<<T::Future as IntoFuture>::Future>,
#[pin]
fut_t: Option<T::Future>,
t_cell: Rc<T>,
}
@ -214,19 +219,21 @@ where
S: NewService,
T: Transform<S::Service, InitError = S::InitError>,
{
type Item = T::Transform;
type Error = T::InitError;
type Output = Result<T::Transform, T::InitError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.fut_t.is_none() {
if let Async::Ready(service) = self.fut_a.poll()? {
self.fut_t = Some(self.t_cell.new_transform(service).into_future());
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
if this.fut_t.as_mut().as_pin_mut().is_none() {
if let Poll::Ready(service) = this.fut_a.poll(cx)? {
this.fut_t.set(Some(this.t_cell.new_transform(service)));
}
}
if let Some(ref mut fut) = self.fut_t {
fut.poll()
if let Some(fut) = this.fut_t.as_mut().as_pin_mut() {
fut.poll(cx)
} else {
Ok(Async::NotReady)
Poll::Pending
}
}
}

View File

@ -3,6 +3,10 @@ use std::marker::PhantomData;
use futures::{Future, Poll};
use super::Transform;
use std::pin::Pin;
use std::task::Context;
use pin_project::pin_project;
/// Transform for the `map_err` combinator, changing the type of a new
/// transform's init error.
@ -63,12 +67,13 @@ where
}
}
}
#[pin_project]
pub struct TransformMapInitErrFuture<T, S, F, E>
where
T: Transform<S>,
F: Fn(T::InitError) -> E,
{
#[pin]
fut: T::Future,
f: F,
}
@ -78,11 +83,11 @@ where
T: Transform<S>,
F: Fn(T::InitError) -> E + Clone,
{
type Item = T::Transform;
type Error = E;
type Output = Result<T::Transform, E>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.fut.poll().map_err(&self.f)
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
this.fut.poll(cx).map_err(this.f)
}
}
@ -139,11 +144,13 @@ where
}
}
#[pin_project]
pub struct TransformFromErrFuture<T, S, E>
where
T: Transform<S>,
E: From<T::InitError>,
{
#[pin]
fut: T::Future,
_t: PhantomData<E>,
}
@ -153,10 +160,9 @@ where
T: Transform<S>,
E: From<T::InitError>,
{
type Item = T::Transform;
type Error = E;
type Output = Result<T::Transform, E>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.fut.poll().map_err(E::from)
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().fut.poll(cx).map_err(E::from)
}
}