remove Pin from Service::poll_ready(); simplify combinators api; make code compile

This commit is contained in:
Nikolay Kim 2019-11-11 01:09:05 +06:00
parent eba19a2da3
commit c5e8764508
27 changed files with 389 additions and 1386 deletions

View File

@ -4,7 +4,7 @@ use std::fmt;
use std::io::{self, Read, Write};
use bytes::BytesMut;
use futures::{Poll, Sink, Stream};
use futures::{Poll, Sink, Stream};
use tokio_codec::{Decoder, Encoder};
use tokio_io::{AsyncRead, AsyncWrite};
@ -223,19 +223,16 @@ impl<T, U> Framed<T, U> {
}
}
impl<T, U> Stream for Framed<T, U>
where
T: AsyncRead,
U: Decoder,
{
type Item = Result<U::Item,U::Error>;
type Item = Result<U::Item, U::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_next(cx )}
unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_next(cx) }
}
}
impl<T, U> Sink<U::Item> for Framed<T, U>
@ -247,19 +244,22 @@ where
type Error = U::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_ready(cx)}
unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_ready(cx) }
}
fn start_send(self: Pin<&mut Self>, item: <U as Encoder>::Item) -> Result<(), Self::Error> {
unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).start_send(item)}
unsafe {
self.map_unchecked_mut(|s| s.inner.get_mut())
.start_send(item)
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_flush(cx)}
unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_flush(cx) }
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_close(cx)}
unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_close(cx) }
}
}
@ -284,15 +284,17 @@ impl<T: Read, U> Read for Fuse<T, U> {
}
}
impl<T: AsyncRead, U> AsyncRead for Fuse<T, U> {
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.0.prepare_uninitialized_buffer(buf)
}
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_read(cx, buf)}
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_read(cx, buf) }
}
}
@ -306,23 +308,24 @@ impl<T: Write, U> Write for Fuse<T, U> {
}
}
impl<T: AsyncWrite, U> AsyncWrite for Fuse<T, U> {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_write(cx, buf)}
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_write(cx, buf) }
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_flush(cx)}
unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_flush(cx) }
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_shutdown(cx)}
unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_shutdown(cx) }
}
}
impl<T, U: Decoder> Decoder for Fuse<T, U> {
type Item = U::Item;
type Error = U::Error;

View File

@ -85,10 +85,10 @@ where
T: AsyncRead,
D: Decoder,
{
type Item = Result<D::Item,D::Error>;
type Item = Result<D::Item, D::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_next(cx )}
unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_next(cx) }
}
}
@ -99,21 +99,32 @@ where
type Error = T::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).poll_ready(cx)}
unsafe {
self.map_unchecked_mut(|s| &mut s.inner.inner.0)
.poll_ready(cx)
}
}
fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).start_send(item)}
unsafe {
self.map_unchecked_mut(|s| &mut s.inner.inner.0)
.start_send(item)
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).poll_flush(cx)}
unsafe {
self.map_unchecked_mut(|s| &mut s.inner.inner.0)
.poll_flush(cx)
}
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).poll_close(cx)}
unsafe {
self.map_unchecked_mut(|s| &mut s.inner.inner.0)
.poll_close(cx)
}
}
}
impl<T, D> fmt::Debug for FramedRead<T, D>
@ -178,7 +189,7 @@ impl<T> Stream for FramedRead2<T>
where
T: tokio_io::AsyncRead + Decoder,
{
type Item = Result<T::Item,T::Error>;
type Item = Result<T::Item, T::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = unsafe { self.get_unchecked_mut() };
@ -196,7 +207,6 @@ where
Ok(None) => return Poll::Ready(None),
Err(e) => return Poll::Ready(Some(Err(e))),
}
}
trace!("attempting to decode a frame");
@ -206,15 +216,12 @@ where
trace!("frame decoded from buffer");
return Poll::Ready(Some(Ok(frame)));
}
Err(e) => {
return Poll::Ready(Some(Err(e)))
}
Err(e) => return Poll::Ready(Some(Err(e))),
_ => {
// Need more data
}
}
this.is_readable = false;
}
@ -225,7 +232,6 @@ where
// get a spurious 0 that looks like EOF
this.buffer.reserve(1);
unsafe {
match Pin::new_unchecked(&mut this.inner).poll_read(cx, &mut this.buffer) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),

View File

@ -2,14 +2,14 @@ use std::fmt;
use std::io::{self, Read};
use bytes::BytesMut;
use futures::{ready,Poll, Sink, Stream};
use futures::{ready, Poll, Sink, Stream};
use log::trace;
use tokio_codec::{Decoder, Encoder};
use tokio_io::{AsyncRead, AsyncWrite};
use super::framed::Fuse;
use std::task::Context;
use std::pin::Pin;
use std::task::Context;
/// A `Sink` of frames encoded to an `AsyncWrite`.
pub struct FramedWrite<T, E> {
@ -105,19 +105,19 @@ where
type Error = E::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_ready(cx)}
unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_ready(cx) }
}
fn start_send(self: Pin<&mut Self>, item: <E as Encoder>::Item) -> Result<(), Self::Error> {
unsafe { self.map_unchecked_mut(|s| &mut s.inner).start_send(item)}
unsafe { self.map_unchecked_mut(|s| &mut s.inner).start_send(item) }
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_flush(cx)}
unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_flush(cx) }
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_close(cx)}
unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_close(cx) }
}
}
@ -128,7 +128,10 @@ where
type Item = T::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).poll_next(cx)}
unsafe {
self.map_unchecked_mut(|s| &mut s.inner.inner.0)
.poll_next(cx)
}
}
}
@ -230,7 +233,10 @@ where
{
type Error = T::Error;
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
let len = self.buffer.len();
if len >= self.high_watermark {
return Poll::Pending;
@ -259,7 +265,9 @@ where
while !this.buffer.is_empty() {
trace!("writing; remaining={}", this.buffer.len());
let n = ready!(unsafe {Pin::new_unchecked(&mut this.inner)}.poll_write(cx, &this.buffer))?;
let n = ready!(
unsafe { Pin::new_unchecked(&mut this.inner) }.poll_write(cx, &this.buffer)
)?;
if n == 0 {
return Poll::Ready(Err(io::Error::new(
@ -267,7 +275,7 @@ where
"failed to \
write frame to transport",
)
.into()))
.into()));
}
// TODO: Add a way to `bytes` to do this w/o returning the drained
@ -276,7 +284,7 @@ where
}
// Try flushing the underlying IO
ready!(unsafe {Pin::new_unchecked(&mut this.inner)}.poll_flush(cx))?;
ready!(unsafe { Pin::new_unchecked(&mut this.inner) }.poll_flush(cx))?;
trace!("framed transport flushed");
Poll::Ready(Ok(()))
@ -284,14 +292,14 @@ where
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut this = unsafe { self.get_unchecked_mut() };
ready!(unsafe {Pin::new_unchecked(&mut this).map_unchecked_mut(|s|*s)}.poll_flush(cx))?;
ready!(unsafe {Pin::new_unchecked(&mut this.inner)}.poll_shutdown(cx))?;
ready!(
unsafe { Pin::new_unchecked(&mut this).map_unchecked_mut(|s| *s) }.poll_flush(cx)
)?;
ready!(unsafe { Pin::new_unchecked(&mut this.inner) }.poll_shutdown(cx))?;
Poll::Ready(Ok(()))
}
/*
fn start_send(&mut self, item: T::Item) -> StartSend<T::Item, T::Error> {
// Check the buffer capacity
@ -368,7 +376,11 @@ impl<T: AsyncRead> AsyncRead for FramedWrite2<T> {
self.inner.prepare_uninitialized_buffer(buf)
}
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_read(cx, buf)}
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_read(cx, buf) }
}
}

View File

@ -70,7 +70,7 @@ pub fn start_default_resolver() -> AsyncResolver {
pub fn new_connector<T: Address>(
resolver: AsyncResolver,
) -> impl Service<Request = Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError>
+ Clone {
+ Clone {
Resolver::new(resolver).and_then(TcpConnector::new())
}
@ -90,7 +90,7 @@ pub fn new_connector_factory<T: Address>(
/// Create connector service with default parameters
pub fn default_connector<T: Address>(
) -> impl Service<Request = Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError>
+ Clone {
+ Clone {
Resolver::default().and_then(TcpConnector::new())
}

View File

@ -1,10 +1,10 @@
use std::any::{Any, TypeId};
use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{fmt, thread};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::{fmt, thread};
use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures::channel::oneshot::{channel, Canceled, Sender};
@ -27,7 +27,7 @@ pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);
pub(crate) enum ArbiterCommand {
Stop,
Execute(Box<dyn Future<Output=()> + Unpin + Send>),
Execute(Box<dyn Future<Output = ()> + Unpin + Send>),
ExecuteFn(Box<dyn FnExec>),
}
@ -133,7 +133,7 @@ impl Arbiter {
for fut in v.drain(..) {
// We pin the boxed future, so it can never again be moved.
let fut = unsafe { Pin::new_unchecked(fut) };
tokio_executor::current_thread::spawn( fut);
tokio_executor::current_thread::spawn(fut);
}
});
}
@ -146,8 +146,8 @@ impl Arbiter {
/// or Arbiter address, it is simply a helper for spawning futures on the current
/// thread.
pub fn spawn<F>(future: F)
where
F: Future<Output=()> + 'static,
where
F: Future<Output = ()> + 'static,
{
RUNNING.with(move |cell| {
if cell.get() {
@ -156,9 +156,7 @@ impl Arbiter {
} else {
// Box the future and push it to the queue, this results in double boxing
// because the executor boxes the future again, but works for now
Q.with(move |cell| {
cell.borrow_mut().push(Box::alloc().init(future))
});
Q.with(move |cell| cell.borrow_mut().push(Box::alloc().init(future)));
}
});
}
@ -167,17 +165,17 @@ impl Arbiter {
/// or Arbiter address, it is simply a helper for executing futures on the current
/// thread.
pub fn spawn_fn<F, R>(f: F)
where
F: FnOnce() -> R + 'static,
R: Future<Output=()> + 'static,
where
F: FnOnce() -> R + 'static,
R: Future<Output = ()> + 'static,
{
Arbiter::spawn(future::lazy(|_| f()).flatten())
}
/// Send a future to the Arbiter's thread, and spawn it.
pub fn send<F>(&self, future: F)
where
F: Future<Output=()> + Send + Unpin + 'static,
where
F: Future<Output = ()> + Send + Unpin + 'static,
{
let _ = self
.0
@ -187,8 +185,8 @@ impl Arbiter {
/// Send a function to the Arbiter's thread, and execute it. Any result from the function
/// is discarded.
pub fn exec_fn<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
where
F: FnOnce() + Send + 'static,
{
let _ = self
.0
@ -200,10 +198,10 @@ impl Arbiter {
/// Send a function to the Arbiter's thread. This function will be executed asynchronously.
/// A future is created, and when resolved will contain the result of the function sent
/// to the Arbiters thread.
pub fn exec<F, R>(&self, f: F) -> impl Future<Output=Result<R, Canceled>>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
pub fn exec<F, R>(&self, f: F) -> impl Future<Output = Result<R, Canceled>>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (tx, rx) = channel();
let _ = self
@ -230,8 +228,8 @@ impl Arbiter {
///
/// Panics is item is not inserted
pub fn get_item<T: 'static, F, R>(mut f: F) -> R
where
F: FnMut(&T) -> R,
where
F: FnMut(&T) -> R,
{
STORAGE.with(move |cell| {
let st = cell.borrow();
@ -247,8 +245,8 @@ impl Arbiter {
///
/// Panics is item is not inserted
pub fn get_mut_item<T: 'static, F, R>(mut f: F) -> R
where
F: FnMut(&mut T) -> R,
where
F: FnMut(&mut T) -> R,
{
STORAGE.with(move |cell| {
let mut st = cell.borrow_mut();
@ -285,28 +283,22 @@ impl Future for ArbiterController {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match Pin::new(&mut self.rx).poll_next(cx) {
Poll::Ready(None) => {
return Poll::Ready(())
},
Poll::Ready(Some(item)) => {
match item {
ArbiterCommand::Stop => {
if let Some(stop) = self.stop.take() {
let _ = stop.send(0);
};
return Poll::Ready(());
}
ArbiterCommand::Execute(fut) => {
spawn(fut);
}
ArbiterCommand::ExecuteFn(f) => {
f.call_box();
}
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(item)) => match item {
ArbiterCommand::Stop => {
if let Some(stop) = self.stop.take() {
let _ = stop.send(0);
};
return Poll::Ready(());
}
ArbiterCommand::Execute(fut) => {
spawn(fut);
}
ArbiterCommand::ExecuteFn(f) => {
f.call_box();
}
}
Poll::Pending => {
return Poll::Pending
},
Poll::Pending => return Poll::Pending,
}
}
}
@ -372,8 +364,8 @@ pub trait FnExec: Send + 'static {
}
impl<F> FnExec for F
where
F: FnOnce() + Send + 'static,
where
F: FnOnce() + Send + 'static,
{
#[allow(clippy::boxed_local)]
fn call_box(self: Box<Self>) {

View File

@ -1,14 +1,14 @@
use std::borrow::Cow;
use std::io;
use futures::{future, FutureExt};
use futures::future::{lazy, Future};
use futures::channel::mpsc::unbounded;
use futures::channel::oneshot::{channel, Receiver};
use futures::future::{lazy, Future};
use futures::{future, FutureExt};
use tokio::runtime::current_thread::Handle;
use tokio_timer::{timer::Timer, clock::Clock};
use tokio_net::driver::Reactor;
use tokio_timer::{clock::Clock, timer::Timer};
use crate::arbiter::{Arbiter, SystemArbiter};
use crate::runtime::Runtime;
@ -159,7 +159,7 @@ pub(crate) struct AsyncSystemRunner {
impl AsyncSystemRunner {
/// This function will start event loop and returns a future that
/// resolves once the `System::stop()` function is called.
pub(crate) fn run_nonblocking(self) -> impl Future<Output = Result<(),io::Error>> + Send {
pub(crate) fn run_nonblocking(self) -> impl Future<Output = Result<(), io::Error>> + Send {
let AsyncSystemRunner { stop, .. } = self;
// run loop
@ -182,7 +182,8 @@ impl AsyncSystemRunner {
Arbiter::stop_system();
return res;
}
}).flatten()
})
.flatten()
}
}

View File

@ -20,7 +20,7 @@ pub use actix_threadpool as blocking;
/// This function panics if actix system is not running.
pub fn spawn<F>(f: F)
where
F: futures::Future<Output = ()> + 'static,
F: futures::Future<Output = ()> + 'static,
{
if !System::is_set() {
panic!("System is not running");

View File

@ -3,8 +3,11 @@ use std::{fmt, io};
use futures::Future;
use tokio_executor::current_thread::{self, CurrentThread};
use tokio_timer::{timer::{self, Timer}, clock::Clock};
use tokio_net::driver::{Reactor, Handle as ReactorHandle};
use tokio_net::driver::{Handle as ReactorHandle, Reactor};
use tokio_timer::{
clock::Clock,
timer::{self, Timer},
};
use crate::builder::Builder;
@ -95,7 +98,7 @@ impl Runtime {
/// is currently at capacity and is unable to spawn a new future.
pub fn spawn<F>(&mut self, future: F) -> &mut Self
where
F: Future<Output = (),> + 'static,
F: Future<Output = ()> + 'static,
{
self.executor.spawn(future);
self
@ -149,7 +152,7 @@ impl Runtime {
// WARN: We do not enter the executor here, since in tokio 0.2 the executor is entered
// automatically inside its `block_on` and `run` methods
tokio_executor::with_default(&mut current_thread::TaskExecutor::current(),|| {
tokio_executor::with_default(&mut current_thread::TaskExecutor::current(), || {
tokio_timer::clock::with_default(clock, || {
let _reactor_guard = tokio_net::driver::set_default(reactor_handle);
let _timer_guard = tokio_timer::set_default(timer_handle);

View File

@ -28,4 +28,4 @@ pin-project = "0.4.0-alpha.11"
[dev-dependencies]
tokio = "0.2.0-alpha.5"
actix-rt = "0.2"
actix-rt = "0.2"

View File

@ -1,21 +1,18 @@
use futures::{Future, Poll};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use pin_project::pin_project;
use super::{IntoNewService, NewService, Service};
use crate::cell::Cell;
use pin_project::pin_project;
use std::pin::Pin;
use std::task::Context;
/// Service for the `and_then` combinator, chaining a computation onto the end
/// of another service which completes successfully.
///
/// This is created by the `ServiceExt::and_then` method.
#[pin_project]
pub struct AndThen<A, B> {
#[pin]
a: A,
#[pin]
b: Cell<B>,
}
@ -52,13 +49,9 @@ where
type Error = A::Error;
type Future = AndThenFuture<A, B>;
fn poll_ready(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let not_ready = !this.a.poll_ready(cx)?.is_ready();
if !this.b.get_pin().poll_ready(cx)?.is_ready() || not_ready {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let not_ready = !self.a.poll_ready(cx)?.is_ready();
if !self.b.get_mut().poll_ready(cx)?.is_ready() || not_ready {
Poll::Pending
} else {
Poll::Ready(Ok(()))

View File

@ -1,197 +0,0 @@
use std::rc::Rc;
use futures::{Future, Poll};
use crate::and_then::AndThen;
use crate::from_err::FromErr;
use crate::{NewService, Transform};
use pin_project::pin_project;
use std::pin::Pin;
use std::task::Context;
/// `Apply` new service combinator
pub struct AndThenTransform<T, A, B> {
a: A,
b: B,
t: Rc<T>,
}
impl<T, A, B> AndThenTransform<T, A, B>
where
A: NewService,
B: NewService<Config = A::Config, InitError = A::InitError>,
T: Transform<B::Service, Request = A::Response, InitError = A::InitError>,
T::Error: From<A::Error>,
{
/// Create new `ApplyNewService` new service instance
pub fn new(t: T, a: A, b: B) -> Self {
Self {
a,
b,
t: Rc::new(t),
}
}
}
impl<T, A, B> Clone for AndThenTransform<T, A, B>
where
A: Clone,
B: Clone,
{
fn clone(&self) -> Self {
Self {
a: self.a.clone(),
b: self.b.clone(),
t: self.t.clone(),
}
}
}
impl<T, A, B> NewService for AndThenTransform<T, A, B>
where
A: NewService,
B: NewService<Config = A::Config, InitError = A::InitError>,
T: Transform<B::Service, Request = A::Response, InitError = A::InitError>,
T::Error: From<A::Error>,
{
type Request = A::Request;
type Response = T::Response;
type Error = T::Error;
type Config = A::Config;
type InitError = T::InitError;
type Service = AndThen<FromErr<A::Service, T::Error>, T::Transform>;
type Future = AndThenTransformFuture<T, A, B>;
fn new_service(&self, cfg: &A::Config) -> Self::Future {
AndThenTransformFuture {
a: None,
t: None,
t_cell: self.t.clone(),
fut_a: self.a.new_service(cfg),
fut_b: self.b.new_service(cfg),
fut_t: None,
}
}
}
#[pin_project]
pub struct AndThenTransformFuture<T, A, B>
where
A: NewService,
B: NewService<InitError = A::InitError>,
T: Transform<B::Service, Request = A::Response, InitError = A::InitError>,
T::Error: From<A::Error>,
{
#[pin]
fut_a: A::Future,
#[pin]
fut_b: B::Future,
#[pin]
fut_t: Option<T::Future>,
a: Option<A::Service>,
t: Option<T::Transform>,
t_cell: Rc<T>,
}
impl<T, A, B> Future for AndThenTransformFuture<T, A, B>
where
A: NewService,
B: NewService<InitError = A::InitError>,
T: Transform<B::Service, Request = A::Response, InitError = A::InitError>,
T::Error: From<A::Error>,
{
type Output = Result<AndThen<FromErr<A::Service, T::Error>, T::Transform>, T::InitError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
if this.fut_t.is_none() {
if let Poll::Ready(svc) = this.fut_b.poll(cx)? {
this.fut_t.set(Some(this.t_cell.new_transform(svc)))
}
}
if this.a.is_none() {
if let Poll::Ready(svc) = this.fut_a.poll(cx)? {
*this.a = Some(svc)
}
}
if let Some(fut) = this.fut_t.as_pin_mut() {
if let Poll::Ready(transform) = fut.poll(cx)? {
*this.t = Some(transform)
}
}
if this.a.is_some() && this.t.is_some() {
Poll::Ready(Ok(AndThen::new(
FromErr::new(this.a.take().unwrap()),
this.t.take().unwrap(),
)))
} else {
Poll::Pending
}
}
}
#[cfg(test)]
mod tests {
use futures::future::{ok, ready, Ready};
use futures::{Future, FutureExt, Poll, TryFutureExt};
use crate::{IntoNewService, IntoService, NewService, Service, ServiceExt};
use std::pin::Pin;
use std::task::Context;
#[derive(Clone)]
struct Srv;
impl Service for Srv {
type Request = ();
type Response = ();
type Error = ();
type Future = Ready<Result<(), ()>>;
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Err(()))
}
fn call(&mut self, _: ()) -> Self::Future {
ok(())
}
}
#[tokio::test]
async fn test_apply() {
let blank = |req| ready(Ok(req));
let mut srv = blank
.into_service()
.apply_fn(Srv, |req: &'static str, srv: &mut Srv| {
srv.call(()).map_ok(move |res| (req, res))
});
let res = srv.call("srv").await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), (("srv", ())));
}
#[tokio::test]
async fn test_new_service() {
let blank = move || ok::<_, ()>((|req| ok(req)).into_service());
let new_srv = blank.into_new_service().apply(
|req: &'static str, srv: &mut Srv| srv.call(()).map_ok(move |res| (req, res)),
|| ok(Srv),
);
let mut srv = new_srv.new_service(&()).await.unwrap();
let res = srv.call("srv").await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), (("srv", ())));
}
}

View File

@ -1,336 +0,0 @@
use futures::{Future, Poll};
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::Context;
use super::{IntoNewService, IntoService, NewService, Service};
use crate::cell::Cell;
use crate::IntoFuture;
use pin_project::pin_project;
/// `Apply` service combinator
#[pin_project]
pub struct AndThenApply<A, B, F, Out>
where
A: Service,
B: Service<Error = A::Error>,
F: FnMut(A::Response, &mut B) -> Out,
Out: IntoFuture,
Out::Error: Into<A::Error>,
{
#[pin]
a: A,
#[pin]
b: Cell<B>,
#[pin]
f: Cell<F>,
r: PhantomData<(Out,)>,
}
impl<A, B, F, Out> AndThenApply<A, B, F, Out>
where
A: Service,
B: Service<Error = A::Error>,
F: FnMut(A::Response, &mut B) -> Out,
Out: IntoFuture,
Out::Error: Into<A::Error>,
{
/// Create new `Apply` combinator
pub fn new<A1: IntoService<A>, B1: IntoService<B>>(a: A1, b: B1, f: F) -> Self {
Self {
f: Cell::new(f),
a: a.into_service(),
b: Cell::new(b.into_service()),
r: PhantomData,
}
}
}
impl<A, B, F, Out> Clone for AndThenApply<A, B, F, Out>
where
A: Service + Clone,
B: Service<Error = A::Error>,
F: FnMut(A::Response, &mut B) -> Out,
Out: IntoFuture,
Out::Error: Into<A::Error>,
{
fn clone(&self) -> Self {
AndThenApply {
a: self.a.clone(),
b: self.b.clone(),
f: self.f.clone(),
r: PhantomData,
}
}
}
impl<A, B, F, Out> Service for AndThenApply<A, B, F, Out>
where
A: Service,
B: Service<Error = A::Error>,
F: FnMut(A::Response, &mut B) -> Out,
Out: IntoFuture,
Out::Error: Into<A::Error>,
{
type Request = A::Request;
type Response = Out::Item;
type Error = A::Error;
type Future = AndThenApplyFuture<A, B, F, Out>;
fn poll_ready(
mut self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let not_ready = !this.a.poll_ready(ctx)?.is_ready();
if !this.b.get_pin().poll_ready(ctx).is_ready() || not_ready {
Poll::Pending
} else {
Poll::Ready(Ok(()))
}
}
fn call(&mut self, req: A::Request) -> Self::Future {
AndThenApplyFuture {
b: self.b.clone(),
f: self.f.clone(),
fut_b: None,
fut_a: Some(self.a.call(req)),
}
}
}
#[pin_project]
pub struct AndThenApplyFuture<A, B, F, Out>
where
A: Service,
B: Service<Error = A::Error>,
F: FnMut(A::Response, &mut B) -> Out,
Out: IntoFuture,
Out::Error: Into<A::Error>,
{
b: Cell<B>,
f: Cell<F>,
#[pin]
fut_a: Option<A::Future>,
#[pin]
fut_b: Option<Out::Future>,
}
impl<A, B, F, Out> Future for AndThenApplyFuture<A, B, F, Out>
where
A: Service,
B: Service<Error = A::Error>,
F: FnMut(A::Response, &mut B) -> Out,
Out: IntoFuture,
Out::Error: Into<A::Error>,
{
type Output = Result<Out::Item, A::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
if let Some(fut) = this.fut_b.as_mut().as_pin_mut() {
return fut.poll(cx).map_err(|e| e.into());
}
match this
.fut_a
.as_mut()
.as_pin_mut()
.expect("Bug in actix-service")
.poll(cx)?
{
Poll::Ready(resp) => {
this.fut_a.set(None);
this.fut_b.set(Some(
(&mut *this.f.get_mut())(resp, this.b.get_mut()).into_future(),
));
}
Poll::Pending => return Poll::Pending,
}
}
}
}
/// `ApplyNewService` new service combinator
pub struct AndThenApplyNewService<A, B, F, Out> {
a: A,
b: B,
f: Cell<F>,
r: PhantomData<Out>,
}
impl<A, B, F, Out> AndThenApplyNewService<A, B, F, Out>
where
A: NewService,
B: NewService<Config = A::Config, Error = A::Error, InitError = A::InitError>,
F: FnMut(A::Response, &mut B::Service) -> Out,
Out: IntoFuture,
Out::Error: Into<A::Error>,
{
/// Create new `ApplyNewService` new service instance
pub fn new<A1: IntoNewService<A>, B1: IntoNewService<B>>(a: A1, b: B1, f: F) -> Self {
Self {
f: Cell::new(f),
a: a.into_new_service(),
b: b.into_new_service(),
r: PhantomData,
}
}
}
impl<A, B, F, Out> Clone for AndThenApplyNewService<A, B, F, Out>
where
A: Clone,
B: Clone,
{
fn clone(&self) -> Self {
Self {
a: self.a.clone(),
b: self.b.clone(),
f: self.f.clone(),
r: PhantomData,
}
}
}
impl<A, B, F, Out> NewService for AndThenApplyNewService<A, B, F, Out>
where
A: NewService,
B: NewService<Config = A::Config, Error = A::Error, InitError = A::InitError>,
F: FnMut(A::Response, &mut B::Service) -> Out,
Out: IntoFuture,
Out::Error: Into<A::Error>,
{
type Request = A::Request;
type Response = Out::Item;
type Error = A::Error;
type Service = AndThenApply<A::Service, B::Service, F, Out>;
type Config = A::Config;
type InitError = A::InitError;
type Future = AndThenApplyNewServiceFuture<A, B, F, Out>;
fn new_service(&self, cfg: &A::Config) -> Self::Future {
AndThenApplyNewServiceFuture {
a: None,
b: None,
f: self.f.clone(),
fut_a: self.a.new_service(cfg),
fut_b: self.b.new_service(cfg),
}
}
}
#[pin_project]
pub struct AndThenApplyNewServiceFuture<A, B, F, Out>
where
A: NewService,
B: NewService<Error = A::Error, InitError = A::InitError>,
F: FnMut(A::Response, &mut B::Service) -> Out,
Out: IntoFuture,
Out::Error: Into<A::Error>,
{
#[pin]
fut_b: B::Future,
#[pin]
fut_a: A::Future,
f: Cell<F>,
a: Option<A::Service>,
b: Option<B::Service>,
}
impl<A, B, F, Out> Future for AndThenApplyNewServiceFuture<A, B, F, Out>
where
A: NewService,
B: NewService<Error = A::Error, InitError = A::InitError>,
F: FnMut(A::Response, &mut B::Service) -> Out,
Out: IntoFuture,
Out::Error: Into<A::Error>,
{
type Output = Result<AndThenApply<A::Service, B::Service, F, Out>, A::InitError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if this.a.is_none() {
if let Poll::Ready(service) = this.fut_a.poll(cx)? {
*this.a = Some(service);
}
}
if this.b.is_none() {
if let Poll::Ready(service) = this.fut_b.poll(cx)? {
*this.b = Some(service);
}
}
if this.a.is_some() && this.b.is_some() {
Poll::Ready(Ok(AndThenApply {
f: this.f.clone(),
a: this.a.take().unwrap(),
b: Cell::new(this.b.take().unwrap()),
r: PhantomData,
}))
} else {
Poll::Pending
}
}
}
#[cfg(test)]
mod tests {
use futures::future::{ok, Ready};
use futures::{Future, Poll, TryFutureExt};
use crate::blank::{Blank, BlankNewService};
use crate::{NewService, Service, ServiceExt};
use std::pin::Pin;
use std::task::Context;
#[derive(Clone)]
struct Srv;
impl Service for Srv {
type Request = ();
type Response = ();
type Error = ();
type Future = Ready<Result<(), ()>>;
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: ()) -> Self::Future {
ok(())
}
}
#[tokio::test]
async fn test_call() {
let mut srv = Blank::new().apply_fn(Srv, |req: &'static str, srv| {
srv.call(()).map_ok(move |res| (req, res))
});
assert_eq!(srv.poll_once().await, Poll::Ready(Ok(())));
let res = srv.call("srv").await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), (("srv", ())));
}
#[tokio::test]
async fn test_new_service() {
let new_srv = BlankNewService::new_unit().apply_fn(
|| ok(Srv),
|req: &'static str, srv| srv.call(()).map_ok(move |res| (req, res)),
);
let mut srv = new_srv.new_service(&()).await.unwrap();
assert_eq!(srv.poll_once().await, Poll::Ready(Ok(())));
let res = srv.call("srv").await;
assert!(res.is_ok());
assert_eq!(res.unwrap(), (("srv", ())));
}
}

View File

@ -1,13 +1,11 @@
use std::marker::PhantomData;
use futures::{ready, Future, Poll};
use super::{IntoNewService, IntoService, NewService, Service};
use crate::IntoFuture;
use pin_project::pin_project;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::Context;
use std::task::{Context, Poll};
use super::IntoFuture;
use super::{IntoNewService, IntoService, NewService, Service};
/// Apply tranform function to a service
pub fn apply_fn<T, F, In, Out, U>(service: U, f: F) -> Apply<T, F, In, Out>
@ -89,11 +87,8 @@ where
type Error = Out::Error;
type Future = Out::Future;
fn poll_ready(
mut self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Poll::Ready(ready!(self.project().service.poll_ready(ctx)).map_err(|e| e.into()))
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(futures::ready!(self.service.poll_ready(ctx)).map_err(|e| e.into()))
}
fn call(&mut self, req: In) -> Self::Future {

View File

@ -1,14 +1,13 @@
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::future::Future;
use futures::{ready, Poll};
use futures::ready;
use pin_project::pin_project;
use crate::cell::Cell;
use crate::{IntoFuture, IntoService, NewService, Service};
use std::pin::Pin;
use std::task::Context;
use pin_project::pin_project;
/// Convert `Fn(&Config, &mut Service) -> Future<Service>` fn to a NewService
pub fn apply_cfg<F, C, T, R, S>(
@ -144,7 +143,7 @@ where
{
type Output = Result<S, R::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(Ok(ready!(self.project().fut.poll(cx))?.into_service()))
}
}
@ -226,7 +225,6 @@ where
{
cfg: C,
f: Cell<F>,
#[pin]
srv: Option<T::Service>,
#[pin]
srv_fut: Option<T::Future>,
@ -255,7 +253,7 @@ where
Poll::Pending => return Poll::Pending,
Poll::Ready(srv) => {
this.srv_fut.set(None);
this.srv.set(Some(srv));
*this.srv = Some(srv);
continue 'poll;
}
}
@ -263,13 +261,11 @@ where
if let Some(fut) = this.fut.as_mut().as_pin_mut() {
return Poll::Ready(Ok(ready!(fut.poll(cx))?.into_service()));
} else if let Some(mut srv) = this.srv.as_mut().as_pin_mut() {
match srv.as_mut().poll_ready(cx)? {
} else if let Some(ref mut srv) = this.srv {
match srv.poll_ready(cx)? {
Poll::Ready(_) => {
this.fut.set(Some(
this.f.get_mut()(&this.cfg, unsafe { Pin::get_unchecked_mut(srv) })
.into_future(),
));
this.fut
.set(Some(this.f.get_mut()(&this.cfg, srv).into_future()));
continue 'poll;
}
Poll::Pending => return Poll::Pending,

View File

@ -1,11 +1,13 @@
use std::pin::Pin;
#![allow(unused_imports, unused_variables, dead_code)]
use crate::{IntoFuture, NewService, Service, ServiceExt};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::{IntoFuture, NewService, Service};
use futures::future::FutureExt;
use futures::future::LocalBoxFuture;
use futures::future::{err, ok, Either, Ready};
use futures::{Future, Poll};
use std::task::Context;
pub type BoxedService<Req, Res, Err> = Box<
dyn Service<
@ -144,11 +146,8 @@ where
LocalBoxFuture<'static, Result<Res, Err>>,
>;
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
unimplemented!()
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(ctx)
}
fn call(&mut self, req: Self::Request) -> Self::Future {
@ -156,10 +155,6 @@ where
}
/*
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.0.poll_ready()
}
fn call(&mut self, req: Self::Request) -> Self::Future {
let mut fut = self.0.call(req);
match fut.poll() {

View File

@ -1,5 +1,4 @@
//! Custom cell impl
use std::pin::Pin;
//! Custom cell impl, internal use only
use std::{cell::UnsafeCell, fmt, rc::Rc};
pub(crate) struct Cell<T> {
@ -34,9 +33,6 @@ impl<T> Cell<T> {
pub(crate) fn get_mut(&mut self) -> &mut T {
unsafe { &mut *self.inner.as_ref().get() }
}
pub(crate) fn get_pin(self: Pin<&mut Self>) -> Pin<&mut T> {
unsafe { Pin::new_unchecked(&mut *Pin::get_unchecked_mut(self).inner.as_ref().get()) }
}
#[allow(clippy::mut_from_ref)]
pub(crate) unsafe fn get_mut_unsafe(&self) -> &mut T {

View File

@ -1,14 +1,13 @@
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::future::{ok, Ready};
use pin_project::pin_project;
use crate::IntoFuture;
use futures::future::{ok, Future, Ready};
use futures::{ready, Poll};
use crate::{IntoNewService, IntoService, NewService, Service};
use std::pin::Pin;
use std::task::Context;
use pin_project::pin_project;
/// Create `NewService` for function that can act as a Service
pub fn service_fn<F, Req, Out, Cfg>(f: F) -> NewServiceFn<F, Req, Out, Cfg>
@ -80,10 +79,7 @@ where
type Error = Out::Error;
type Future = Out::Future;
fn poll_ready(
self: Pin<&mut Self>,
_ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&mut self, _ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
@ -238,8 +234,10 @@ where
{
type Output = Result<S, R::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(Ok(ready!(self.project().fut.poll(cx))?.into_service()))
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(Ok(
futures::ready!(self.project().fut.poll(cx))?.into_service()
))
}
}

View File

@ -1,79 +0,0 @@
use std::marker::PhantomData;
use futures::future::{ok, Ready};
use crate::apply::Apply;
use crate::{IntoFuture, IntoTransform, Service, Transform};
/// Use function as transform service
pub fn transform_fn<F, S, In, Out, Err>(
f: F,
) -> impl Transform<S, Request = In, Response = Out::Item, Error = Out::Error, InitError = Err>
where
S: Service,
F: FnMut(In, &mut S) -> Out + Clone,
Out: IntoFuture,
Out::Error: From<S::Error>,
{
FnTransform::new(f)
}
pub struct FnTransform<F, S, In, Out, Err>
where
F: FnMut(In, &mut S) -> Out + Clone,
Out: IntoFuture,
{
f: F,
_t: PhantomData<(S, In, Out, Err)>,
}
impl<F, S, In, Out, Err> FnTransform<F, S, In, Out, Err>
where
F: FnMut(In, &mut S) -> Out + Clone,
Out: IntoFuture,
{
pub fn new(f: F) -> Self {
FnTransform { f, _t: PhantomData }
}
}
impl<F, S, In, Out, Err> Transform<S> for FnTransform<F, S, In, Out, Err>
where
S: Service,
F: FnMut(In, &mut S) -> Out + Clone,
Out: IntoFuture,
Out::Error: From<S::Error>,
{
type Request = In;
type Response = Out::Item;
type Error = Out::Error;
type Transform = Apply<S, F, In, Out>;
type InitError = Err;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ok(Apply::new(service, self.f.clone()))
}
}
impl<F, S, In, Out, Err> IntoTransform<FnTransform<F, S, In, Out, Err>, S> for F
where
S: Service,
F: FnMut(In, &mut S) -> Out + Clone,
Out: IntoFuture,
Out::Error: From<S::Error>,
{
fn into_transform(self) -> FnTransform<F, S, In, Out, Err> {
FnTransform::new(self)
}
}
impl<F, S, In, Out, Err> Clone for FnTransform<F, S, In, Out, Err>
where
F: FnMut(In, &mut S) -> Out + Clone,
Out: IntoFuture,
{
fn clone(&self) -> Self {
Self::new(self.f.clone())
}
}

View File

@ -1,243 +0,0 @@
use std::marker::PhantomData;
use futures::{Future, Poll};
use super::{NewService, Service};
use std::pin::Pin;
use std::task::Context;
use pin_project::pin_project;
/// Service for the `from_err` combinator, changing the error type of a service.
///
/// This is created by the `ServiceExt::from_err` method.
#[pin_project]
pub struct FromErr<A, E> {
#[pin]
service: A,
f: PhantomData<E>,
}
impl<A, E> FromErr<A, E> {
pub(crate) fn new(service: A) -> Self
where
A: Service,
E: From<A::Error>,
{
FromErr {
service,
f: PhantomData,
}
}
}
impl<A, E> Clone for FromErr<A, E>
where
A: Clone,
{
fn clone(&self) -> Self {
FromErr {
service: self.service.clone(),
f: PhantomData,
}
}
}
impl<A, E> Service for FromErr<A, E>
where
A: Service,
E: From<A::Error>,
{
type Request = A::Request;
type Response = A::Response;
type Error = E;
type Future = FromErrFuture<A, E>;
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.project().service.poll_ready(ctx).map_err(E::from)
}
fn call(&mut self, req: A::Request) -> Self::Future {
FromErrFuture {
fut: self.service.call(req),
f: PhantomData,
}
}
}
#[pin_project]
pub struct FromErrFuture<A: Service, E> {
#[pin]
fut: A::Future,
f: PhantomData<E>,
}
impl<A, E> Future for FromErrFuture<A, E>
where
A: Service,
E: From<A::Error>,
{
type Output = Result<A::Response, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().fut.poll(cx).map_err(E::from)
}
}
/// NewService for the `from_err` combinator, changing the type of a new
/// service's error.
///
/// This is created by the `NewServiceExt::from_err` method.
pub struct FromErrNewService<A, E> {
a: A,
e: PhantomData<E>,
}
impl<A, E> FromErrNewService<A, E> {
/// Create new `FromErr` new service instance
pub fn new(a: A) -> Self
where
A: NewService,
E: From<A::Error>,
{
Self { a, e: PhantomData }
}
}
impl<A, E> Clone for FromErrNewService<A, E>
where
A: Clone,
{
fn clone(&self) -> Self {
Self {
a: self.a.clone(),
e: PhantomData,
}
}
}
impl<A, E> NewService for FromErrNewService<A, E>
where
A: NewService,
E: From<A::Error>,
{
type Request = A::Request;
type Response = A::Response;
type Error = E;
type Config = A::Config;
type Service = FromErr<A::Service, E>;
type InitError = A::InitError;
type Future = FromErrNewServiceFuture<A, E>;
fn new_service(&self, cfg: &A::Config) -> Self::Future {
FromErrNewServiceFuture {
fut: self.a.new_service(cfg),
e: PhantomData,
}
}
}
#[pin_project]
pub struct FromErrNewServiceFuture<A, E>
where
A: NewService,
E: From<A::Error>,
{
#[pin]
fut: A::Future,
e: PhantomData<E>,
}
impl<A, E> Future for FromErrNewServiceFuture<A, E>
where
A: NewService,
E: From<A::Error>,
{
type Output = Result<FromErr<A::Service, E>, A::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Poll::Ready(svc) = self.project().fut.poll(cx)? {
Poll::Ready(Ok(FromErr::new(svc)))
} else {
Poll::Pending
}
}
/*
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Poll::Ready(service) = self.fut.poll()? {
Ok(Poll::Ready(FromErr::new(service)))
} else {
Ok(Poll::Pending)
}
}
*/
}
#[cfg(test)]
mod tests {
use futures::future::{err, Ready};
use super::*;
use crate::{IntoNewService, NewService, Service, ServiceExt};
use tokio::future::ok;
struct Srv;
impl Service for Srv {
type Request = ();
type Response = ();
type Error = ();
type Future = Ready<Result<(), ()>>;
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Err(()))
}
fn call(&mut self, _: ()) -> Self::Future {
err(())
}
}
#[derive(Debug, PartialEq)]
struct Error;
impl From<()> for Error {
fn from(_: ()) -> Self {
Error
}
}
#[tokio::test]
async fn test_poll_ready() {
let mut srv = Srv.from_err::<Error>();
let res = srv.poll_once().await;
assert_eq!(res, Poll::Ready(Err(Error)));
}
#[tokio::test]
async fn test_call() {
let mut srv = Srv.from_err::<Error>();
let res = srv.call(()).await;
assert!(res.is_err());
assert_eq!(res.err().unwrap(), Error);
}
#[tokio::test]
async fn test_new_service() {
let blank = || ok::<_, ()>(Srv);
let new_srv = blank.into_new_service().from_err::<Error>();
let mut srv = new_srv.new_service(&()).await.unwrap();
let res = srv.call(()).await;
assert!(res.is_err());
assert_eq!(res.err().unwrap(), Error);
}
}

View File

@ -1,45 +1,33 @@
use futures::future::{ready, LocalBoxFuture, Ready};
use futures::{Future, Poll};
use std::cell::RefCell;
use std::pin::Pin;
use std::future::Future;
use std::rc::Rc;
use std::sync::Arc;
use std::task;
use std::task::Context;
use std::task::{self, Context, Poll};
mod cell;
mod and_then;
mod and_then_apply;
mod and_then_apply_fn;
mod apply;
mod apply_cfg;
pub mod blank;
pub mod boxed;
mod fn_service;
mod fn_transform;
mod from_err;
mod map;
mod map_config;
mod map_err;
mod map_init_err;
mod pipeline;
mod then;
mod transform;
mod transform_err;
pub use self::and_then::{AndThen, AndThenNewService};
use self::and_then_apply::AndThenTransform;
use self::and_then_apply_fn::{AndThenApply, AndThenApplyNewService};
pub use self::apply::{apply_fn, new_apply_fn, Apply, ApplyNewService};
pub use self::apply::{apply_fn, new_apply_fn};
pub use self::apply_cfg::{apply_cfg, new_apply_cfg};
pub use self::fn_service::{new_service_cfg, new_service_fn, service_fn, ServiceFn};
pub use self::fn_transform::transform_fn;
pub use self::from_err::{FromErr, FromErrNewService};
pub use self::map::{Map, MapNewService};
pub use self::map_config::{MapConfig, MappedConfig, UnitConfig};
pub use self::map_err::{MapErr, MapErrNewService};
pub use self::map_init_err::MapInitErr;
pub use self::pipeline::{new_pipeline, pipeline, NewPipeline, Pipeline};
pub use self::then::{Then, ThenNewService};
pub use self::transform::{apply_transform, IntoTransform, Transform};
@ -60,18 +48,6 @@ impl<F: Future<Output = Result<I, E>>, I, E> IntoFuture for F {
}
}
/*
impl <I,E> IntoFuture for Result<I,E> {
type Item = I;
type Error = E;
type Future = Ready<Self>;
fn into_future(self) -> Self::Future {
ready(self)
}
}
*/
/// An asynchronous function from `Request` to a `Response`.
pub trait Service {
/// Requests handled by the service.
@ -95,10 +71,7 @@ pub trait Service {
/// This is a **best effort** implementation. False positives are permitted.
/// It is permitted for the service to return `Ready` from a `poll_ready`
/// call and the next invocation of `call` results in an error.
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>>;
fn poll_ready(&mut self, ctx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>>;
/// Process the request and return the response asynchronously.
///
@ -126,114 +99,17 @@ pub trait Service {
}
}
fn poll_once<'a>(&'a mut self) -> LocalBoxFuture<'a, Poll<Result<(), Self::Error>>> {
unsafe {
let mut this = Pin::new_unchecked(self);
Pin::new_unchecked(Box::new(futures::future::poll_fn(move |cx| {
let this = &mut this;
Poll::Ready(this.as_mut().poll_ready(cx))
})))
}
}
// fn poll_once<'a>(&'a mut self) -> LocalBoxFuture<'a, Poll<Result<(), Self::Error>>> {
// unsafe {
// let mut this = Pin::new_unchecked(self);
// Pin::new_unchecked(Box::new(futures::future::poll_fn(move |cx| {
// let this = &mut this;
// Poll::Ready(this.as_mut().poll_ready(cx))
// })))
// }
// }
}
/// An extension trait for `Service`s that provides a variety of convenient
/// adapters
pub trait ServiceExt: Service {
/// Apply function to specified service and use it as a next service in
/// chain.
fn apply_fn<F, B, B1, Out>(self, service: B1, f: F) -> AndThenApply<Self, B, F, Out>
where
Self: Sized,
F: FnMut(Self::Response, &mut B) -> Out,
Out: IntoFuture,
Out::Error: Into<Self::Error>,
B: Service<Error = Self::Error>,
B1: IntoService<B>,
{
AndThenApply::new(self, service, f)
}
/// Call another service after call to this one has resolved successfully.
///
/// This function can be used to chain two services together and ensure that
/// the second service isn't called until call to the fist service have
/// finished. Result of the call to the first service is used as an
/// input parameter for the second service's call.
///
/// Note that this function consumes the receiving service and returns a
/// wrapped version of it.
fn and_then<F, B>(self, service: F) -> AndThen<Self, B>
where
Self: Sized,
F: IntoService<B>,
B: Service<Request = Self::Response, Error = Self::Error>,
{
AndThen::new(self, service.into_service())
}
/// Map this service's error to any error implementing `From` for
/// this service`s `Error`.
///
/// Note that this function consumes the receiving service and returns a
/// wrapped version of it.
fn from_err<E>(self) -> FromErr<Self, E>
where
Self: Sized,
E: From<Self::Error>,
{
FromErr::new(self)
}
/// Chain on a computation for when a call to the service finished,
/// passing the result of the call to the next service `B`.
///
/// Note that this function consumes the receiving service and returns a
/// wrapped version of it.
fn then<B>(self, service: B) -> Then<Self, B>
where
Self: Sized,
B: Service<Request = Result<Self::Response, Self::Error>, Error = Self::Error>,
{
Then::new(self, service)
}
/// Map this service's output to a different type, returning a new service
/// of the resulting type.
///
/// This function is similar to the `Option::map` or `Iterator::map` where
/// it will change the type of the underlying service.
///
/// Note that this function consumes the receiving service and returns a
/// wrapped version of it, similar to the existing `map` methods in the
/// standard library.
fn map<F, R>(self, f: F) -> Map<Self, F, R>
where
Self: Sized,
F: FnMut(Self::Response) -> R,
{
Map::new(self, f)
}
/// Map this service's error to a different error, returning a new service.
///
/// This function is similar to the `Result::map_err` where it will change
/// the error type of the underlying service. This is useful for example to
/// ensure that services have the same error type.
///
/// Note that this function consumes the receiving service and returns a
/// wrapped version of it.
fn map_err<F, E>(self, f: F) -> MapErr<Self, F, E>
where
Self: Sized,
F: Fn(Self::Error) -> E,
{
MapErr::new(self, f)
}
}
impl<T: ?Sized> ServiceExt for T where T: Service {}
/// Creates new `Service` values.
///
/// Acts as a service factory. This is useful for cases where new `Service`
@ -271,128 +147,6 @@ pub trait NewService {
/// Create and return a new service value asynchronously.
fn new_service(&self, cfg: &Self::Config) -> Self::Future;
/// Apply transform service to specified service and use it as a next service in
/// chain.
fn apply<T, T1, B, B1>(self, transform: T1, service: B1) -> AndThenTransform<T, Self, B>
where
Self: Sized,
T: Transform<B::Service, Request = Self::Response, InitError = Self::InitError>,
T::Error: From<Self::Error>,
T1: IntoTransform<T, B::Service>,
B: NewService<Config = Self::Config, InitError = Self::InitError>,
B1: IntoNewService<B>,
{
AndThenTransform::new(transform.into_transform(), self, service.into_new_service())
}
/// Apply function to specified service and use it as a next service in
/// chain.
fn apply_fn<B, I, F, Out>(self, service: I, f: F) -> AndThenApplyNewService<Self, B, F, Out>
where
Self: Sized,
B: NewService<Config = Self::Config, Error = Self::Error, InitError = Self::InitError>,
I: IntoNewService<B>,
F: FnMut(Self::Response, &mut B::Service) -> Out,
Out: IntoFuture,
Out::Error: Into<Self::Error>,
{
AndThenApplyNewService::new(self, service, f)
}
/// Call another service after call to this one has resolved successfully.
fn and_then<F, B>(self, new_service: F) -> AndThenNewService<Self, B>
where
Self: Sized,
F: IntoNewService<B>,
B: NewService<
Config = Self::Config,
Request = Self::Response,
Error = Self::Error,
InitError = Self::InitError,
>,
{
AndThenNewService::new(self, new_service)
}
/// `NewService` that create service to map this service's error
/// and new service's init error to any error
/// implementing `From` for this service`s `Error`.
///
/// Note that this function consumes the receiving new service and returns a
/// wrapped version of it.
fn from_err<E>(self) -> FromErrNewService<Self, E>
where
Self: Sized,
E: From<Self::Error>,
{
FromErrNewService::new(self)
}
/// Create `NewService` to chain on a computation for when a call to the
/// service finished, passing the result of the call to the next
/// service `B`.
///
/// Note that this function consumes the receiving future and returns a
/// wrapped version of it.
fn then<F, B>(self, new_service: F) -> ThenNewService<Self, B>
where
Self: Sized,
F: IntoNewService<B>,
B: NewService<
Config = Self::Config,
Request = Result<Self::Response, Self::Error>,
Error = Self::Error,
InitError = Self::InitError,
>,
{
ThenNewService::new(self, new_service)
}
/// Map this service's output to a different type, returning a new service
/// of the resulting type.
fn map<F, R>(self, f: F) -> MapNewService<Self, F, R>
where
Self: Sized,
F: FnMut(Self::Response) -> R,
{
MapNewService::new(self, f)
}
/// Map this service's error to a different error, returning a new service.
fn map_err<F, E>(self, f: F) -> MapErrNewService<Self, F, E>
where
Self: Sized,
F: Fn(Self::Error) -> E + Clone,
{
MapErrNewService::new(self, f)
}
/// Map this factory's init error to a different error, returning a new service.
fn map_init_err<F, E>(self, f: F) -> MapInitErr<Self, F, E>
where
Self: Sized,
F: Fn(Self::InitError) -> E,
{
MapInitErr::new(self, f)
}
/// Map config to a different error, returning a new service.
fn map_config<F, C>(self, f: F) -> MapConfig<Self, F, C>
where
Self: Sized,
F: Fn(&C) -> MappedConfig<Self::Config>,
{
MapConfig::new(self, f)
}
/// Replace config with unit
fn unit_config<C>(self) -> UnitConfig<Self, C>
where
Self: NewService<Config = ()> + Sized,
{
UnitConfig::new(self)
}
}
impl<'a, S> Service for &'a mut S
@ -404,11 +158,8 @@ where
type Error = S::Error;
type Future = S::Future;
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
unsafe { self.map_unchecked_mut(|s| &mut **s).poll_ready(ctx) }
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
(**self).poll_ready(ctx)
}
fn call(&mut self, request: Self::Request) -> S::Future {
@ -425,14 +176,8 @@ where
type Error = S::Error;
type Future = S::Future;
fn poll_ready(
mut self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), S::Error>> {
unsafe {
let p: &mut S = Pin::as_mut(&mut self).get_mut();
Pin::new_unchecked(p).poll_ready(ctx)
}
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), S::Error>> {
(**self).poll_ready(ctx)
}
fn call(&mut self, request: Self::Request) -> S::Future {
@ -449,14 +194,8 @@ where
type Error = S::Error;
type Future = S::Future;
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
unsafe {
let r = self.get_unchecked_mut();
Pin::new_unchecked(&mut (*(**r).borrow_mut())).poll_ready(ctx)
}
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.borrow_mut().poll_ready(ctx)
}
fn call(&mut self, request: Self::Request) -> S::Future {

View File

@ -1,19 +1,16 @@
use std::future::Future;
use std::marker::PhantomData;
use futures::{Future, Poll};
use super::{NewService, Service};
use std::pin::Pin;
use std::task::Context;
use std::task::{Context, Poll};
use pin_project::pin_project;
use super::{NewService, Service};
/// Service for the `map` combinator, changing the type of a service's response.
///
/// This is created by the `ServiceExt::map` method.
#[pin_project]
pub struct Map<A, F, Response> {
#[pin]
service: A,
f: F,
_t: PhantomData<Response>,
@ -58,11 +55,8 @@ where
type Error = A::Error;
type Future = MapFuture<A, F, Response>;
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.project().service.poll_ready(ctx)
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(ctx)
}
fn call(&mut self, req: A::Request) -> Self::Future {
@ -98,7 +92,7 @@ where
{
type Output = Result<Response, A::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match this.fut.poll(cx) {
Poll::Ready(Ok(resp)) => Poll::Ready(Ok((this.f)(resp))),
@ -191,7 +185,7 @@ where
{
type Output = Result<Map<A::Service, F, Res>, A::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if let Poll::Ready(svc) = this.fut.poll(cx)? {
Poll::Ready(Ok(Map::new(svc, this.f.take().unwrap())))

View File

@ -1,20 +1,17 @@
use std::future::Future;
use std::marker::PhantomData;
use futures::{Future, Poll};
use super::{NewService, Service};
use std::pin::Pin;
use std::task::{Context, Poll};
use pin_project::pin_project;
use std::pin::Pin;
use std::task::Context;
use super::{NewService, Service};
/// Service for the `map_err` combinator, changing the type of a service's
/// error.
///
/// This is created by the `ServiceExt::map_err` method.
#[pin_project]
pub struct MapErr<A, F, E> {
#[pin]
service: A,
f: F,
_t: PhantomData<E>,
@ -59,12 +56,8 @@ where
type Error = E;
type Future = MapErrFuture<A, F, E>;
fn poll_ready(
mut self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
let mut this = self.project();
this.service.poll_ready(ctx).map_err(this.f)
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(ctx).map_err(&self.f)
}
fn call(&mut self, req: A::Request) -> Self::Future {
@ -101,8 +94,7 @@ where
type Output = Result<A::Response, E>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
this.fut.poll(cx).map_err(this.f)
self.project().fut.poll(cx).map_err(&self.f)
}
}
@ -196,7 +188,7 @@ where
{
type Output = Result<MapErr<A::Service, F, E>, A::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if let Poll::Ready(svc) = this.fut.poll(cx)? {
Poll::Ready(Ok(MapErr::new(svc, this.f.clone())))

View File

@ -1,12 +1,11 @@
use std::future::Future;
use std::marker::PhantomData;
use futures::{Future, Poll};
use super::NewService;
use std::pin::Pin;
use std::task::{Context, Poll};
use pin_project::pin_project;
use std::pin::Pin;
use std::task::Context;
use super::NewService;
/// `MapInitErr` service combinator
pub struct MapInitErr<A, F, E> {
@ -91,7 +90,7 @@ where
type Output = Result<A::Service, E>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
let this = self.project();
this.fut.poll(cx).map_err(this.f)
}
}

View File

@ -0,0 +1,152 @@
use std::task::{Context, Poll};
use crate::and_then::{AndThen, AndThenNewService};
use crate::then::{Then, ThenNewService};
use crate::{IntoNewService, IntoService, NewService, Service};
pub fn pipeline<F, T>(service: F) -> Pipeline<T>
where
F: IntoService<T>,
T: Service,
{
Pipeline {
service: service.into_service(),
}
}
pub fn new_pipeline<F, T>(new_service: F) -> NewPipeline<T>
where
F: IntoNewService<T>,
T: NewService,
{
NewPipeline {
service: new_service.into_new_service(),
}
}
/// Pipeline service
pub struct Pipeline<T> {
service: T,
}
impl<T: Service> Pipeline<T> {
/// Call another service after call to this one has resolved successfully.
///
/// This function can be used to chain two services together and ensure that
/// the second service isn't called until call to the fist service have
/// finished. Result of the call to the first service is used as an
/// input parameter for the second service's call.
///
/// Note that this function consumes the receiving service and returns a
/// wrapped version of it.
pub fn and_then<F, U>(
self,
service: F,
) -> Pipeline<impl Service<Request = T::Request, Response = U::Response, Error = T::Error>>
where
Self: Sized,
F: IntoService<U>,
U: Service<Request = T::Response, Error = T::Error>,
{
Pipeline {
service: AndThen::new(self.service, service.into_service()),
}
}
/// Chain on a computation for when a call to the service finished,
/// passing the result of the call to the next service `U`.
///
/// Note that this function consumes the receiving pipeline and returns a
/// wrapped version of it.
pub fn then<F, U>(
self,
service: F,
) -> Pipeline<impl Service<Request = T::Request, Response = U::Response, Error = T::Error>>
where
Self: Sized,
F: IntoService<U>,
U: Service<Request = Result<T::Response, T::Error>, Error = T::Error>,
{
Pipeline {
service: Then::new(self.service, service.into_service()),
}
}
}
impl<T: Service> Service for Pipeline<T> {
type Request = T::Request;
type Response = T::Response;
type Error = T::Error;
type Future = T::Future;
#[inline]
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), T::Error>> {
self.service.poll_ready(ctx)
}
#[inline]
fn call(&mut self, req: T::Request) -> Self::Future {
self.service.call(req)
}
}
/// Pipeline constructor
pub struct NewPipeline<T> {
service: T,
}
impl<T: NewService> NewPipeline<T> {
/// Call another service after call to this one has resolved successfully.
pub fn and_then<F, U>(self, new_service: U) -> NewPipeline<AndThenNewService<T, U>>
where
Self: Sized,
F: IntoNewService<U>,
U: NewService<
Config = T::Config,
Request = T::Response,
Error = T::Error,
InitError = T::InitError,
>,
{
NewPipeline {
service: AndThenNewService::new(self.service, new_service.into_new_service()),
}
}
/// Create `NewService` to chain on a computation for when a call to the
/// service finished, passing the result of the call to the next
/// service `U`.
///
/// Note that this function consumes the receiving pipeline and returns a
/// wrapped version of it.
pub fn then<F, U>(self, new_service: F) -> NewPipeline<ThenNewService<T, U>>
where
Self: Sized,
F: IntoNewService<U>,
U: NewService<
Config = T::Config,
Request = Result<T::Response, T::Error>,
Error = T::Error,
InitError = T::InitError,
>,
{
NewPipeline {
service: ThenNewService::new(self.service, new_service.into_new_service()),
}
}
}
impl<T: NewService> NewService for NewPipeline<T> {
type Config = T::Config;
type Request = T::Request;
type Response = T::Response;
type Error = T::Error;
type Service = T::Service;
type InitError = T::InitError;
type Future = T::Future;
#[inline]
fn new_service(&self, cfg: &T::Config) -> Self::Future {
self.service.new_service(cfg)
}
}

View File

@ -1,21 +1,18 @@
use futures::{Future, Poll};
use std::future::Future;
use std::pin::Pin;
use std::task::Context;
use std::task::{Context, Poll};
use pin_project::pin_project;
use super::{IntoNewService, NewService, Service};
use crate::cell::Cell;
use pin_project::pin_project;
/// Service for the `then` combinator, chaining a computation onto the end of
/// another service.
///
/// This is created by the `ServiceExt::then` method.
#[pin_project]
pub struct Then<A, B> {
#[pin]
a: A,
#[pin]
b: Cell<B>,
}
@ -52,13 +49,9 @@ where
type Error = B::Error;
type Future = ThenFuture<A, B>;
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
let this = self.project();
let not_ready = !this.a.poll_ready(ctx)?.is_ready();
if !this.b.get_pin().poll_ready(ctx)?.is_ready() || not_ready {
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let not_ready = !self.a.poll_ready(ctx)?.is_ready();
if !self.b.get_mut().poll_ready(ctx)?.is_ready() || not_ready {
Poll::Pending
} else {
Poll::Ready(Ok(()))
@ -247,7 +240,7 @@ where
{
type Output = Result<Then<A::Service, B::Service>, A::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if this.a.is_none() {
if let Poll::Ready(service) = this.fut_a.poll(cx)? {

View File

@ -1,11 +1,11 @@
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll};
use crate::transform_err::{TransformFromErr, TransformMapInitErr};
use crate::{IntoNewService, NewService, Service};
use futures::{Future, Poll};
use std::pin::Pin;
use std::task::Context;
use pin_project::pin_project;
@ -221,7 +221,7 @@ where
{
type Output = Result<T::Transform, T::InitError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
if this.fut_t.as_mut().as_pin_mut().is_none() {

View File

@ -1,13 +1,12 @@
use std::future::Future;
use std::marker::PhantomData;
use futures::{Future, Poll};
use super::Transform;
use std::pin::Pin;
use std::task::Context;
use std::task::{Context, Poll};
use pin_project::pin_project;
use super::Transform;
/// Transform for the `map_err` combinator, changing the type of a new
/// transform's init error.
///
@ -85,7 +84,7 @@ where
{
type Output = Result<T::Transform, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
this.fut.poll(cx).map_err(this.f)
}
@ -162,7 +161,7 @@ where
{
type Output = Result<T::Transform, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().fut.poll(cx).map_err(E::from)
}
}