diff --git a/actix-codec/src/framed.rs b/actix-codec/src/framed.rs index 458ac35a..edd68e4b 100644 --- a/actix-codec/src/framed.rs +++ b/actix-codec/src/framed.rs @@ -4,7 +4,7 @@ use std::fmt; use std::io::{self, Read, Write}; use bytes::BytesMut; -use futures::{Poll, Sink, Stream}; +use futures::{Poll, Sink, Stream}; use tokio_codec::{Decoder, Encoder}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -223,19 +223,16 @@ impl Framed { } } - impl Stream for Framed where T: AsyncRead, U: Decoder, { - type Item = Result; + type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_next(cx )} + unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_next(cx) } } - - } impl Sink for Framed @@ -247,19 +244,22 @@ where type Error = U::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_ready(cx)} + unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_ready(cx) } } fn start_send(self: Pin<&mut Self>, item: ::Item) -> Result<(), Self::Error> { - unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).start_send(item)} + unsafe { + self.map_unchecked_mut(|s| s.inner.get_mut()) + .start_send(item) + } } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_flush(cx)} + unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_flush(cx) } } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_close(cx)} + unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_close(cx) } } } @@ -284,15 +284,17 @@ impl Read for Fuse { } } - impl AsyncRead for Fuse { - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { self.0.prepare_uninitialized_buffer(buf) } - fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { - unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_read(cx, buf)} + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_read(cx, buf) } } } @@ -306,23 +308,24 @@ impl Write for Fuse { } } - impl AsyncWrite for Fuse { - - fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_write(cx, buf)} + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_write(cx, buf) } } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_flush(cx)} + unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_flush(cx) } } fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_shutdown(cx)} + unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_shutdown(cx) } } } - impl Decoder for Fuse { type Item = U::Item; type Error = U::Error; diff --git a/actix-codec/src/framed_read.rs b/actix-codec/src/framed_read.rs index 30e0cd93..9d33e262 100644 --- a/actix-codec/src/framed_read.rs +++ b/actix-codec/src/framed_read.rs @@ -85,10 +85,10 @@ where T: AsyncRead, D: Decoder, { - type Item = Result; + type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_next(cx )} + unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_next(cx) } } } @@ -99,21 +99,32 @@ where type Error = T::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).poll_ready(cx)} + unsafe { + self.map_unchecked_mut(|s| &mut s.inner.inner.0) + .poll_ready(cx) + } } fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { - unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).start_send(item)} + unsafe { + self.map_unchecked_mut(|s| &mut s.inner.inner.0) + .start_send(item) + } } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).poll_flush(cx)} + unsafe { + self.map_unchecked_mut(|s| &mut s.inner.inner.0) + .poll_flush(cx) + } } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).poll_close(cx)} + unsafe { + self.map_unchecked_mut(|s| &mut s.inner.inner.0) + .poll_close(cx) + } } - } impl fmt::Debug for FramedRead @@ -178,7 +189,7 @@ impl Stream for FramedRead2 where T: tokio_io::AsyncRead + Decoder, { - type Item = Result; + type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = unsafe { self.get_unchecked_mut() }; @@ -196,7 +207,6 @@ where Ok(None) => return Poll::Ready(None), Err(e) => return Poll::Ready(Some(Err(e))), } - } trace!("attempting to decode a frame"); @@ -206,15 +216,12 @@ where trace!("frame decoded from buffer"); return Poll::Ready(Some(Ok(frame))); } - Err(e) => { - return Poll::Ready(Some(Err(e))) - } + Err(e) => return Poll::Ready(Some(Err(e))), _ => { // Need more data } } - this.is_readable = false; } @@ -225,7 +232,6 @@ where // get a spurious 0 that looks like EOF this.buffer.reserve(1); unsafe { - match Pin::new_unchecked(&mut this.inner).poll_read(cx, &mut this.buffer) { Poll::Pending => return Poll::Pending, Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))), diff --git a/actix-codec/src/framed_write.rs b/actix-codec/src/framed_write.rs index b2dfaf8e..278b9995 100644 --- a/actix-codec/src/framed_write.rs +++ b/actix-codec/src/framed_write.rs @@ -2,14 +2,14 @@ use std::fmt; use std::io::{self, Read}; use bytes::BytesMut; -use futures::{ready,Poll, Sink, Stream}; +use futures::{ready, Poll, Sink, Stream}; use log::trace; use tokio_codec::{Decoder, Encoder}; use tokio_io::{AsyncRead, AsyncWrite}; use super::framed::Fuse; -use std::task::Context; use std::pin::Pin; +use std::task::Context; /// A `Sink` of frames encoded to an `AsyncWrite`. pub struct FramedWrite { @@ -105,19 +105,19 @@ where type Error = E::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_ready(cx)} + unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_ready(cx) } } fn start_send(self: Pin<&mut Self>, item: ::Item) -> Result<(), Self::Error> { - unsafe { self.map_unchecked_mut(|s| &mut s.inner).start_send(item)} + unsafe { self.map_unchecked_mut(|s| &mut s.inner).start_send(item) } } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_flush(cx)} + unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_flush(cx) } } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_close(cx)} + unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_close(cx) } } } @@ -128,7 +128,10 @@ where type Item = T::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).poll_next(cx)} + unsafe { + self.map_unchecked_mut(|s| &mut s.inner.inner.0) + .poll_next(cx) + } } } @@ -230,7 +233,10 @@ where { type Error = T::Error; - fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { let len = self.buffer.len(); if len >= self.high_watermark { return Poll::Pending; @@ -259,7 +265,9 @@ where while !this.buffer.is_empty() { trace!("writing; remaining={}", this.buffer.len()); - let n = ready!(unsafe {Pin::new_unchecked(&mut this.inner)}.poll_write(cx, &this.buffer))?; + let n = ready!( + unsafe { Pin::new_unchecked(&mut this.inner) }.poll_write(cx, &this.buffer) + )?; if n == 0 { return Poll::Ready(Err(io::Error::new( @@ -267,7 +275,7 @@ where "failed to \ write frame to transport", ) - .into())) + .into())); } // TODO: Add a way to `bytes` to do this w/o returning the drained @@ -276,7 +284,7 @@ where } // Try flushing the underlying IO - ready!(unsafe {Pin::new_unchecked(&mut this.inner)}.poll_flush(cx))?; + ready!(unsafe { Pin::new_unchecked(&mut this.inner) }.poll_flush(cx))?; trace!("framed transport flushed"); Poll::Ready(Ok(())) @@ -284,14 +292,14 @@ where fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = unsafe { self.get_unchecked_mut() }; - ready!(unsafe {Pin::new_unchecked(&mut this).map_unchecked_mut(|s|*s)}.poll_flush(cx))?; - ready!(unsafe {Pin::new_unchecked(&mut this.inner)}.poll_shutdown(cx))?; + ready!( + unsafe { Pin::new_unchecked(&mut this).map_unchecked_mut(|s| *s) }.poll_flush(cx) + )?; + ready!(unsafe { Pin::new_unchecked(&mut this.inner) }.poll_shutdown(cx))?; Poll::Ready(Ok(())) } - - /* fn start_send(&mut self, item: T::Item) -> StartSend { // Check the buffer capacity @@ -368,7 +376,11 @@ impl AsyncRead for FramedWrite2 { self.inner.prepare_uninitialized_buffer(buf) } - fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { - unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_read(cx, buf)} + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_read(cx, buf) } } } diff --git a/actix-connect/src/lib.rs b/actix-connect/src/lib.rs index dbeff07c..e79c9d1f 100644 --- a/actix-connect/src/lib.rs +++ b/actix-connect/src/lib.rs @@ -70,7 +70,7 @@ pub fn start_default_resolver() -> AsyncResolver { pub fn new_connector( resolver: AsyncResolver, ) -> impl Service, Response = Connection, Error = ConnectError> - + Clone { + + Clone { Resolver::new(resolver).and_then(TcpConnector::new()) } @@ -90,7 +90,7 @@ pub fn new_connector_factory( /// Create connector service with default parameters pub fn default_connector( ) -> impl Service, Response = Connection, Error = ConnectError> - + Clone { + + Clone { Resolver::default().and_then(TcpConnector::new()) } diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index cfbbf418..7f9f473d 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -1,10 +1,10 @@ use std::any::{Any, TypeId}; use std::cell::{Cell, RefCell}; use std::collections::HashMap; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::{fmt, thread}; use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::{Context, Poll}; +use std::{fmt, thread}; use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures::channel::oneshot::{channel, Canceled, Sender}; @@ -27,7 +27,7 @@ pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0); pub(crate) enum ArbiterCommand { Stop, - Execute(Box + Unpin + Send>), + Execute(Box + Unpin + Send>), ExecuteFn(Box), } @@ -133,7 +133,7 @@ impl Arbiter { for fut in v.drain(..) { // We pin the boxed future, so it can never again be moved. let fut = unsafe { Pin::new_unchecked(fut) }; - tokio_executor::current_thread::spawn( fut); + tokio_executor::current_thread::spawn(fut); } }); } @@ -146,8 +146,8 @@ impl Arbiter { /// or Arbiter address, it is simply a helper for spawning futures on the current /// thread. pub fn spawn(future: F) - where - F: Future + 'static, + where + F: Future + 'static, { RUNNING.with(move |cell| { if cell.get() { @@ -156,9 +156,7 @@ impl Arbiter { } else { // Box the future and push it to the queue, this results in double boxing // because the executor boxes the future again, but works for now - Q.with(move |cell| { - cell.borrow_mut().push(Box::alloc().init(future)) - }); + Q.with(move |cell| cell.borrow_mut().push(Box::alloc().init(future))); } }); } @@ -167,17 +165,17 @@ impl Arbiter { /// or Arbiter address, it is simply a helper for executing futures on the current /// thread. pub fn spawn_fn(f: F) - where - F: FnOnce() -> R + 'static, - R: Future + 'static, + where + F: FnOnce() -> R + 'static, + R: Future + 'static, { Arbiter::spawn(future::lazy(|_| f()).flatten()) } /// Send a future to the Arbiter's thread, and spawn it. pub fn send(&self, future: F) - where - F: Future + Send + Unpin + 'static, + where + F: Future + Send + Unpin + 'static, { let _ = self .0 @@ -187,8 +185,8 @@ impl Arbiter { /// Send a function to the Arbiter's thread, and execute it. Any result from the function /// is discarded. pub fn exec_fn(&self, f: F) - where - F: FnOnce() + Send + 'static, + where + F: FnOnce() + Send + 'static, { let _ = self .0 @@ -200,10 +198,10 @@ impl Arbiter { /// Send a function to the Arbiter's thread. This function will be executed asynchronously. /// A future is created, and when resolved will contain the result of the function sent /// to the Arbiters thread. - pub fn exec(&self, f: F) -> impl Future> - where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, + pub fn exec(&self, f: F) -> impl Future> + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, { let (tx, rx) = channel(); let _ = self @@ -230,8 +228,8 @@ impl Arbiter { /// /// Panics is item is not inserted pub fn get_item(mut f: F) -> R - where - F: FnMut(&T) -> R, + where + F: FnMut(&T) -> R, { STORAGE.with(move |cell| { let st = cell.borrow(); @@ -247,8 +245,8 @@ impl Arbiter { /// /// Panics is item is not inserted pub fn get_mut_item(mut f: F) -> R - where - F: FnMut(&mut T) -> R, + where + F: FnMut(&mut T) -> R, { STORAGE.with(move |cell| { let mut st = cell.borrow_mut(); @@ -285,28 +283,22 @@ impl Future for ArbiterController { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { match Pin::new(&mut self.rx).poll_next(cx) { - Poll::Ready(None) => { - return Poll::Ready(()) - }, - Poll::Ready(Some(item)) => { - match item { - ArbiterCommand::Stop => { - if let Some(stop) = self.stop.take() { - let _ = stop.send(0); - }; - return Poll::Ready(()); - } - ArbiterCommand::Execute(fut) => { - spawn(fut); - } - ArbiterCommand::ExecuteFn(f) => { - f.call_box(); - } + Poll::Ready(None) => return Poll::Ready(()), + Poll::Ready(Some(item)) => match item { + ArbiterCommand::Stop => { + if let Some(stop) = self.stop.take() { + let _ = stop.send(0); + }; + return Poll::Ready(()); + } + ArbiterCommand::Execute(fut) => { + spawn(fut); + } + ArbiterCommand::ExecuteFn(f) => { + f.call_box(); } - } - Poll::Pending => { - return Poll::Pending }, + Poll::Pending => return Poll::Pending, } } } @@ -372,8 +364,8 @@ pub trait FnExec: Send + 'static { } impl FnExec for F - where - F: FnOnce() + Send + 'static, +where + F: FnOnce() + Send + 'static, { #[allow(clippy::boxed_local)] fn call_box(self: Box) { diff --git a/actix-rt/src/builder.rs b/actix-rt/src/builder.rs index 1aa95045..1083b53a 100644 --- a/actix-rt/src/builder.rs +++ b/actix-rt/src/builder.rs @@ -1,14 +1,14 @@ use std::borrow::Cow; use std::io; -use futures::{future, FutureExt}; -use futures::future::{lazy, Future}; use futures::channel::mpsc::unbounded; use futures::channel::oneshot::{channel, Receiver}; +use futures::future::{lazy, Future}; +use futures::{future, FutureExt}; use tokio::runtime::current_thread::Handle; -use tokio_timer::{timer::Timer, clock::Clock}; use tokio_net::driver::Reactor; +use tokio_timer::{clock::Clock, timer::Timer}; use crate::arbiter::{Arbiter, SystemArbiter}; use crate::runtime::Runtime; @@ -159,7 +159,7 @@ pub(crate) struct AsyncSystemRunner { impl AsyncSystemRunner { /// This function will start event loop and returns a future that /// resolves once the `System::stop()` function is called. - pub(crate) fn run_nonblocking(self) -> impl Future> + Send { + pub(crate) fn run_nonblocking(self) -> impl Future> + Send { let AsyncSystemRunner { stop, .. } = self; // run loop @@ -182,7 +182,8 @@ impl AsyncSystemRunner { Arbiter::stop_system(); return res; } - }).flatten() + }) + .flatten() } } diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index 97c56102..eeb768b2 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -20,7 +20,7 @@ pub use actix_threadpool as blocking; /// This function panics if actix system is not running. pub fn spawn(f: F) where - F: futures::Future + 'static, + F: futures::Future + 'static, { if !System::is_set() { panic!("System is not running"); diff --git a/actix-rt/src/runtime.rs b/actix-rt/src/runtime.rs index 1a4e707d..7f42a541 100644 --- a/actix-rt/src/runtime.rs +++ b/actix-rt/src/runtime.rs @@ -3,8 +3,11 @@ use std::{fmt, io}; use futures::Future; use tokio_executor::current_thread::{self, CurrentThread}; -use tokio_timer::{timer::{self, Timer}, clock::Clock}; -use tokio_net::driver::{Reactor, Handle as ReactorHandle}; +use tokio_net::driver::{Handle as ReactorHandle, Reactor}; +use tokio_timer::{ + clock::Clock, + timer::{self, Timer}, +}; use crate::builder::Builder; @@ -95,7 +98,7 @@ impl Runtime { /// is currently at capacity and is unable to spawn a new future. pub fn spawn(&mut self, future: F) -> &mut Self where - F: Future + 'static, + F: Future + 'static, { self.executor.spawn(future); self @@ -149,7 +152,7 @@ impl Runtime { // WARN: We do not enter the executor here, since in tokio 0.2 the executor is entered // automatically inside its `block_on` and `run` methods - tokio_executor::with_default(&mut current_thread::TaskExecutor::current(),|| { + tokio_executor::with_default(&mut current_thread::TaskExecutor::current(), || { tokio_timer::clock::with_default(clock, || { let _reactor_guard = tokio_net::driver::set_default(reactor_handle); let _timer_guard = tokio_timer::set_default(timer_handle); diff --git a/actix-service/Cargo.toml b/actix-service/Cargo.toml index af76325a..ca5a7368 100644 --- a/actix-service/Cargo.toml +++ b/actix-service/Cargo.toml @@ -28,4 +28,4 @@ pin-project = "0.4.0-alpha.11" [dev-dependencies] tokio = "0.2.0-alpha.5" -actix-rt = "0.2" \ No newline at end of file +actix-rt = "0.2" diff --git a/actix-service/src/and_then.rs b/actix-service/src/and_then.rs index 44bb1a45..6d023f53 100644 --- a/actix-service/src/and_then.rs +++ b/actix-service/src/and_then.rs @@ -1,21 +1,18 @@ -use futures::{Future, Poll}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use pin_project::pin_project; use super::{IntoNewService, NewService, Service}; use crate::cell::Cell; -use pin_project::pin_project; -use std::pin::Pin; -use std::task::Context; - /// Service for the `and_then` combinator, chaining a computation onto the end /// of another service which completes successfully. /// /// This is created by the `ServiceExt::and_then` method. -#[pin_project] pub struct AndThen { - #[pin] a: A, - #[pin] b: Cell, } @@ -52,13 +49,9 @@ where type Error = A::Error; type Future = AndThenFuture; - fn poll_ready( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - let this = self.project(); - let not_ready = !this.a.poll_ready(cx)?.is_ready(); - if !this.b.get_pin().poll_ready(cx)?.is_ready() || not_ready { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + let not_ready = !self.a.poll_ready(cx)?.is_ready(); + if !self.b.get_mut().poll_ready(cx)?.is_ready() || not_ready { Poll::Pending } else { Poll::Ready(Ok(())) diff --git a/actix-service/src/and_then_apply.rs b/actix-service/src/and_then_apply.rs deleted file mode 100644 index f26b1517..00000000 --- a/actix-service/src/and_then_apply.rs +++ /dev/null @@ -1,197 +0,0 @@ -use std::rc::Rc; - -use futures::{Future, Poll}; - -use crate::and_then::AndThen; -use crate::from_err::FromErr; -use crate::{NewService, Transform}; - -use pin_project::pin_project; -use std::pin::Pin; -use std::task::Context; - -/// `Apply` new service combinator -pub struct AndThenTransform { - a: A, - b: B, - t: Rc, -} - -impl AndThenTransform -where - A: NewService, - B: NewService, - T: Transform, - T::Error: From, -{ - /// Create new `ApplyNewService` new service instance - pub fn new(t: T, a: A, b: B) -> Self { - Self { - a, - b, - t: Rc::new(t), - } - } -} - -impl Clone for AndThenTransform -where - A: Clone, - B: Clone, -{ - fn clone(&self) -> Self { - Self { - a: self.a.clone(), - b: self.b.clone(), - t: self.t.clone(), - } - } -} - -impl NewService for AndThenTransform -where - A: NewService, - B: NewService, - T: Transform, - T::Error: From, -{ - type Request = A::Request; - type Response = T::Response; - type Error = T::Error; - - type Config = A::Config; - type InitError = T::InitError; - type Service = AndThen, T::Transform>; - type Future = AndThenTransformFuture; - - fn new_service(&self, cfg: &A::Config) -> Self::Future { - AndThenTransformFuture { - a: None, - t: None, - t_cell: self.t.clone(), - fut_a: self.a.new_service(cfg), - fut_b: self.b.new_service(cfg), - fut_t: None, - } - } -} - -#[pin_project] -pub struct AndThenTransformFuture -where - A: NewService, - B: NewService, - T: Transform, - T::Error: From, -{ - #[pin] - fut_a: A::Future, - #[pin] - fut_b: B::Future, - #[pin] - fut_t: Option, - a: Option, - t: Option, - t_cell: Rc, -} - -impl Future for AndThenTransformFuture -where - A: NewService, - B: NewService, - T: Transform, - T::Error: From, -{ - type Output = Result, T::Transform>, T::InitError>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); - - if this.fut_t.is_none() { - if let Poll::Ready(svc) = this.fut_b.poll(cx)? { - this.fut_t.set(Some(this.t_cell.new_transform(svc))) - } - } - - if this.a.is_none() { - if let Poll::Ready(svc) = this.fut_a.poll(cx)? { - *this.a = Some(svc) - } - } - - if let Some(fut) = this.fut_t.as_pin_mut() { - if let Poll::Ready(transform) = fut.poll(cx)? { - *this.t = Some(transform) - } - } - - if this.a.is_some() && this.t.is_some() { - Poll::Ready(Ok(AndThen::new( - FromErr::new(this.a.take().unwrap()), - this.t.take().unwrap(), - ))) - } else { - Poll::Pending - } - } -} - -#[cfg(test)] -mod tests { - use futures::future::{ok, ready, Ready}; - use futures::{Future, FutureExt, Poll, TryFutureExt}; - - use crate::{IntoNewService, IntoService, NewService, Service, ServiceExt}; - use std::pin::Pin; - use std::task::Context; - - #[derive(Clone)] - struct Srv; - - impl Service for Srv { - type Request = (); - type Response = (); - type Error = (); - type Future = Ready>; - - fn poll_ready( - self: Pin<&mut Self>, - ctx: &mut Context<'_>, - ) -> Poll> { - Poll::Ready(Err(())) - } - - fn call(&mut self, _: ()) -> Self::Future { - ok(()) - } - } - - #[tokio::test] - async fn test_apply() { - let blank = |req| ready(Ok(req)); - - let mut srv = blank - .into_service() - .apply_fn(Srv, |req: &'static str, srv: &mut Srv| { - srv.call(()).map_ok(move |res| (req, res)) - }); - let res = srv.call("srv").await; - assert!(res.is_ok()); - assert_eq!(res.unwrap(), (("srv", ()))); - } - - #[tokio::test] - async fn test_new_service() { - let blank = move || ok::<_, ()>((|req| ok(req)).into_service()); - - let new_srv = blank.into_new_service().apply( - |req: &'static str, srv: &mut Srv| srv.call(()).map_ok(move |res| (req, res)), - || ok(Srv), - ); - let mut srv = new_srv.new_service(&()).await.unwrap(); - - let res = srv.call("srv").await; - assert!(res.is_ok()); - assert_eq!(res.unwrap(), (("srv", ()))); - } -} diff --git a/actix-service/src/and_then_apply_fn.rs b/actix-service/src/and_then_apply_fn.rs deleted file mode 100644 index 6b006c73..00000000 --- a/actix-service/src/and_then_apply_fn.rs +++ /dev/null @@ -1,336 +0,0 @@ -use futures::{Future, Poll}; -use std::marker::PhantomData; -use std::pin::Pin; -use std::task::Context; - -use super::{IntoNewService, IntoService, NewService, Service}; -use crate::cell::Cell; - -use crate::IntoFuture; -use pin_project::pin_project; - -/// `Apply` service combinator -#[pin_project] -pub struct AndThenApply -where - A: Service, - B: Service, - F: FnMut(A::Response, &mut B) -> Out, - Out: IntoFuture, - Out::Error: Into, -{ - #[pin] - a: A, - #[pin] - b: Cell, - #[pin] - f: Cell, - r: PhantomData<(Out,)>, -} - -impl AndThenApply -where - A: Service, - B: Service, - F: FnMut(A::Response, &mut B) -> Out, - Out: IntoFuture, - Out::Error: Into, -{ - /// Create new `Apply` combinator - pub fn new, B1: IntoService>(a: A1, b: B1, f: F) -> Self { - Self { - f: Cell::new(f), - a: a.into_service(), - b: Cell::new(b.into_service()), - r: PhantomData, - } - } -} - -impl Clone for AndThenApply -where - A: Service + Clone, - B: Service, - F: FnMut(A::Response, &mut B) -> Out, - Out: IntoFuture, - Out::Error: Into, -{ - fn clone(&self) -> Self { - AndThenApply { - a: self.a.clone(), - b: self.b.clone(), - f: self.f.clone(), - r: PhantomData, - } - } -} - -impl Service for AndThenApply -where - A: Service, - B: Service, - F: FnMut(A::Response, &mut B) -> Out, - Out: IntoFuture, - Out::Error: Into, -{ - type Request = A::Request; - type Response = Out::Item; - type Error = A::Error; - type Future = AndThenApplyFuture; - - fn poll_ready( - mut self: Pin<&mut Self>, - ctx: &mut Context<'_>, - ) -> Poll> { - let this = self.project(); - let not_ready = !this.a.poll_ready(ctx)?.is_ready(); - if !this.b.get_pin().poll_ready(ctx).is_ready() || not_ready { - Poll::Pending - } else { - Poll::Ready(Ok(())) - } - } - - fn call(&mut self, req: A::Request) -> Self::Future { - AndThenApplyFuture { - b: self.b.clone(), - f: self.f.clone(), - fut_b: None, - fut_a: Some(self.a.call(req)), - } - } -} - -#[pin_project] -pub struct AndThenApplyFuture -where - A: Service, - B: Service, - F: FnMut(A::Response, &mut B) -> Out, - Out: IntoFuture, - Out::Error: Into, -{ - b: Cell, - f: Cell, - #[pin] - fut_a: Option, - #[pin] - fut_b: Option, -} - -impl Future for AndThenApplyFuture -where - A: Service, - B: Service, - F: FnMut(A::Response, &mut B) -> Out, - Out: IntoFuture, - Out::Error: Into, -{ - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); - loop { - if let Some(fut) = this.fut_b.as_mut().as_pin_mut() { - return fut.poll(cx).map_err(|e| e.into()); - } - - match this - .fut_a - .as_mut() - .as_pin_mut() - .expect("Bug in actix-service") - .poll(cx)? - { - Poll::Ready(resp) => { - this.fut_a.set(None); - this.fut_b.set(Some( - (&mut *this.f.get_mut())(resp, this.b.get_mut()).into_future(), - )); - } - Poll::Pending => return Poll::Pending, - } - } - } -} - -/// `ApplyNewService` new service combinator -pub struct AndThenApplyNewService { - a: A, - b: B, - f: Cell, - r: PhantomData, -} - -impl AndThenApplyNewService -where - A: NewService, - B: NewService, - F: FnMut(A::Response, &mut B::Service) -> Out, - Out: IntoFuture, - Out::Error: Into, -{ - /// Create new `ApplyNewService` new service instance - pub fn new, B1: IntoNewService>(a: A1, b: B1, f: F) -> Self { - Self { - f: Cell::new(f), - a: a.into_new_service(), - b: b.into_new_service(), - r: PhantomData, - } - } -} - -impl Clone for AndThenApplyNewService -where - A: Clone, - B: Clone, -{ - fn clone(&self) -> Self { - Self { - a: self.a.clone(), - b: self.b.clone(), - f: self.f.clone(), - r: PhantomData, - } - } -} - -impl NewService for AndThenApplyNewService -where - A: NewService, - B: NewService, - F: FnMut(A::Response, &mut B::Service) -> Out, - Out: IntoFuture, - Out::Error: Into, -{ - type Request = A::Request; - type Response = Out::Item; - type Error = A::Error; - type Service = AndThenApply; - type Config = A::Config; - type InitError = A::InitError; - type Future = AndThenApplyNewServiceFuture; - - fn new_service(&self, cfg: &A::Config) -> Self::Future { - AndThenApplyNewServiceFuture { - a: None, - b: None, - f: self.f.clone(), - fut_a: self.a.new_service(cfg), - fut_b: self.b.new_service(cfg), - } - } -} - -#[pin_project] -pub struct AndThenApplyNewServiceFuture -where - A: NewService, - B: NewService, - F: FnMut(A::Response, &mut B::Service) -> Out, - Out: IntoFuture, - Out::Error: Into, -{ - #[pin] - fut_b: B::Future, - #[pin] - fut_a: A::Future, - f: Cell, - a: Option, - b: Option, -} - -impl Future for AndThenApplyNewServiceFuture -where - A: NewService, - B: NewService, - F: FnMut(A::Response, &mut B::Service) -> Out, - Out: IntoFuture, - Out::Error: Into, -{ - type Output = Result, A::InitError>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - - if this.a.is_none() { - if let Poll::Ready(service) = this.fut_a.poll(cx)? { - *this.a = Some(service); - } - } - - if this.b.is_none() { - if let Poll::Ready(service) = this.fut_b.poll(cx)? { - *this.b = Some(service); - } - } - - if this.a.is_some() && this.b.is_some() { - Poll::Ready(Ok(AndThenApply { - f: this.f.clone(), - a: this.a.take().unwrap(), - b: Cell::new(this.b.take().unwrap()), - r: PhantomData, - })) - } else { - Poll::Pending - } - } -} - -#[cfg(test)] -mod tests { - use futures::future::{ok, Ready}; - use futures::{Future, Poll, TryFutureExt}; - - use crate::blank::{Blank, BlankNewService}; - use crate::{NewService, Service, ServiceExt}; - use std::pin::Pin; - use std::task::Context; - - #[derive(Clone)] - struct Srv; - - impl Service for Srv { - type Request = (); - type Response = (); - type Error = (); - type Future = Ready>; - - fn poll_ready( - self: Pin<&mut Self>, - ctx: &mut Context<'_>, - ) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, _: ()) -> Self::Future { - ok(()) - } - } - - #[tokio::test] - async fn test_call() { - let mut srv = Blank::new().apply_fn(Srv, |req: &'static str, srv| { - srv.call(()).map_ok(move |res| (req, res)) - }); - assert_eq!(srv.poll_once().await, Poll::Ready(Ok(()))); - let res = srv.call("srv").await; - assert!(res.is_ok()); - assert_eq!(res.unwrap(), (("srv", ()))); - } - - #[tokio::test] - async fn test_new_service() { - let new_srv = BlankNewService::new_unit().apply_fn( - || ok(Srv), - |req: &'static str, srv| srv.call(()).map_ok(move |res| (req, res)), - ); - let mut srv = new_srv.new_service(&()).await.unwrap(); - assert_eq!(srv.poll_once().await, Poll::Ready(Ok(()))); - let res = srv.call("srv").await; - assert!(res.is_ok()); - assert_eq!(res.unwrap(), (("srv", ()))); - } -} diff --git a/actix-service/src/apply.rs b/actix-service/src/apply.rs index 9bb96773..4e5bce34 100644 --- a/actix-service/src/apply.rs +++ b/actix-service/src/apply.rs @@ -1,13 +1,11 @@ -use std::marker::PhantomData; - -use futures::{ready, Future, Poll}; - -use super::{IntoNewService, IntoService, NewService, Service}; - -use crate::IntoFuture; use pin_project::pin_project; +use std::future::Future; +use std::marker::PhantomData; use std::pin::Pin; -use std::task::Context; +use std::task::{Context, Poll}; + +use super::IntoFuture; +use super::{IntoNewService, IntoService, NewService, Service}; /// Apply tranform function to a service pub fn apply_fn(service: U, f: F) -> Apply @@ -89,11 +87,8 @@ where type Error = Out::Error; type Future = Out::Future; - fn poll_ready( - mut self: Pin<&mut Self>, - ctx: &mut Context<'_>, - ) -> Poll> { - Poll::Ready(ready!(self.project().service.poll_ready(ctx)).map_err(|e| e.into())) + fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll> { + Poll::Ready(futures::ready!(self.service.poll_ready(ctx)).map_err(|e| e.into())) } fn call(&mut self, req: In) -> Self::Future { diff --git a/actix-service/src/apply_cfg.rs b/actix-service/src/apply_cfg.rs index d0ce6fa1..09a8a6e9 100644 --- a/actix-service/src/apply_cfg.rs +++ b/actix-service/src/apply_cfg.rs @@ -1,14 +1,13 @@ +use std::future::Future; use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; -use futures::future::Future; -use futures::{ready, Poll}; +use futures::ready; +use pin_project::pin_project; use crate::cell::Cell; use crate::{IntoFuture, IntoService, NewService, Service}; -use std::pin::Pin; -use std::task::Context; - -use pin_project::pin_project; /// Convert `Fn(&Config, &mut Service) -> Future` fn to a NewService pub fn apply_cfg( @@ -144,7 +143,7 @@ where { type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { Poll::Ready(Ok(ready!(self.project().fut.poll(cx))?.into_service())) } } @@ -226,7 +225,6 @@ where { cfg: C, f: Cell, - #[pin] srv: Option, #[pin] srv_fut: Option, @@ -255,7 +253,7 @@ where Poll::Pending => return Poll::Pending, Poll::Ready(srv) => { this.srv_fut.set(None); - this.srv.set(Some(srv)); + *this.srv = Some(srv); continue 'poll; } } @@ -263,13 +261,11 @@ where if let Some(fut) = this.fut.as_mut().as_pin_mut() { return Poll::Ready(Ok(ready!(fut.poll(cx))?.into_service())); - } else if let Some(mut srv) = this.srv.as_mut().as_pin_mut() { - match srv.as_mut().poll_ready(cx)? { + } else if let Some(ref mut srv) = this.srv { + match srv.poll_ready(cx)? { Poll::Ready(_) => { - this.fut.set(Some( - this.f.get_mut()(&this.cfg, unsafe { Pin::get_unchecked_mut(srv) }) - .into_future(), - )); + this.fut + .set(Some(this.f.get_mut()(&this.cfg, srv).into_future())); continue 'poll; } Poll::Pending => return Poll::Pending, diff --git a/actix-service/src/boxed.rs b/actix-service/src/boxed.rs index b405ca26..d81f9722 100644 --- a/actix-service/src/boxed.rs +++ b/actix-service/src/boxed.rs @@ -1,11 +1,13 @@ -use std::pin::Pin; +#![allow(unused_imports, unused_variables, dead_code)] -use crate::{IntoFuture, NewService, Service, ServiceExt}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::{IntoFuture, NewService, Service}; use futures::future::FutureExt; use futures::future::LocalBoxFuture; use futures::future::{err, ok, Either, Ready}; -use futures::{Future, Poll}; -use std::task::Context; pub type BoxedService = Box< dyn Service< @@ -144,11 +146,8 @@ where LocalBoxFuture<'static, Result>, >; - fn poll_ready( - self: Pin<&mut Self>, - ctx: &mut Context<'_>, - ) -> Poll> { - unimplemented!() + fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll> { + self.0.poll_ready(ctx) } fn call(&mut self, req: Self::Request) -> Self::Future { @@ -156,10 +155,6 @@ where } /* - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.0.poll_ready() - } - fn call(&mut self, req: Self::Request) -> Self::Future { let mut fut = self.0.call(req); match fut.poll() { diff --git a/actix-service/src/cell.rs b/actix-service/src/cell.rs index e9bb1bcf..20be519f 100644 --- a/actix-service/src/cell.rs +++ b/actix-service/src/cell.rs @@ -1,5 +1,4 @@ -//! Custom cell impl -use std::pin::Pin; +//! Custom cell impl, internal use only use std::{cell::UnsafeCell, fmt, rc::Rc}; pub(crate) struct Cell { @@ -34,9 +33,6 @@ impl Cell { pub(crate) fn get_mut(&mut self) -> &mut T { unsafe { &mut *self.inner.as_ref().get() } } - pub(crate) fn get_pin(self: Pin<&mut Self>) -> Pin<&mut T> { - unsafe { Pin::new_unchecked(&mut *Pin::get_unchecked_mut(self).inner.as_ref().get()) } - } #[allow(clippy::mut_from_ref)] pub(crate) unsafe fn get_mut_unsafe(&self) -> &mut T { diff --git a/actix-service/src/fn_service.rs b/actix-service/src/fn_service.rs index 5deeec66..14b9e4a8 100644 --- a/actix-service/src/fn_service.rs +++ b/actix-service/src/fn_service.rs @@ -1,14 +1,13 @@ +use std::future::Future; use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures::future::{ok, Ready}; +use pin_project::pin_project; use crate::IntoFuture; -use futures::future::{ok, Future, Ready}; -use futures::{ready, Poll}; - use crate::{IntoNewService, IntoService, NewService, Service}; -use std::pin::Pin; -use std::task::Context; - -use pin_project::pin_project; /// Create `NewService` for function that can act as a Service pub fn service_fn(f: F) -> NewServiceFn @@ -80,10 +79,7 @@ where type Error = Out::Error; type Future = Out::Future; - fn poll_ready( - self: Pin<&mut Self>, - _ctx: &mut Context<'_>, - ) -> Poll> { + fn poll_ready(&mut self, _ctx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } @@ -238,8 +234,10 @@ where { type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Poll::Ready(Ok(ready!(self.project().fut.poll(cx))?.into_service())) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Poll::Ready(Ok( + futures::ready!(self.project().fut.poll(cx))?.into_service() + )) } } diff --git a/actix-service/src/fn_transform.rs b/actix-service/src/fn_transform.rs deleted file mode 100644 index ea132ada..00000000 --- a/actix-service/src/fn_transform.rs +++ /dev/null @@ -1,79 +0,0 @@ -use std::marker::PhantomData; - -use futures::future::{ok, Ready}; - -use crate::apply::Apply; -use crate::{IntoFuture, IntoTransform, Service, Transform}; - -/// Use function as transform service -pub fn transform_fn( - f: F, -) -> impl Transform -where - S: Service, - F: FnMut(In, &mut S) -> Out + Clone, - Out: IntoFuture, - Out::Error: From, -{ - FnTransform::new(f) -} - -pub struct FnTransform -where - F: FnMut(In, &mut S) -> Out + Clone, - Out: IntoFuture, -{ - f: F, - _t: PhantomData<(S, In, Out, Err)>, -} - -impl FnTransform -where - F: FnMut(In, &mut S) -> Out + Clone, - Out: IntoFuture, -{ - pub fn new(f: F) -> Self { - FnTransform { f, _t: PhantomData } - } -} - -impl Transform for FnTransform -where - S: Service, - F: FnMut(In, &mut S) -> Out + Clone, - Out: IntoFuture, - Out::Error: From, -{ - type Request = In; - type Response = Out::Item; - type Error = Out::Error; - type Transform = Apply; - type InitError = Err; - type Future = Ready>; - - fn new_transform(&self, service: S) -> Self::Future { - ok(Apply::new(service, self.f.clone())) - } -} - -impl IntoTransform, S> for F -where - S: Service, - F: FnMut(In, &mut S) -> Out + Clone, - Out: IntoFuture, - Out::Error: From, -{ - fn into_transform(self) -> FnTransform { - FnTransform::new(self) - } -} - -impl Clone for FnTransform -where - F: FnMut(In, &mut S) -> Out + Clone, - Out: IntoFuture, -{ - fn clone(&self) -> Self { - Self::new(self.f.clone()) - } -} diff --git a/actix-service/src/from_err.rs b/actix-service/src/from_err.rs deleted file mode 100644 index 72281972..00000000 --- a/actix-service/src/from_err.rs +++ /dev/null @@ -1,243 +0,0 @@ -use std::marker::PhantomData; - -use futures::{Future, Poll}; - -use super::{NewService, Service}; -use std::pin::Pin; -use std::task::Context; - -use pin_project::pin_project; - -/// Service for the `from_err` combinator, changing the error type of a service. -/// -/// This is created by the `ServiceExt::from_err` method. -#[pin_project] -pub struct FromErr { - #[pin] - service: A, - f: PhantomData, -} - -impl FromErr { - pub(crate) fn new(service: A) -> Self - where - A: Service, - E: From, - { - FromErr { - service, - f: PhantomData, - } - } -} - -impl Clone for FromErr -where - A: Clone, -{ - fn clone(&self) -> Self { - FromErr { - service: self.service.clone(), - f: PhantomData, - } - } -} - -impl Service for FromErr -where - A: Service, - E: From, -{ - type Request = A::Request; - type Response = A::Response; - type Error = E; - type Future = FromErrFuture; - - fn poll_ready( - self: Pin<&mut Self>, - ctx: &mut Context<'_>, - ) -> Poll> { - self.project().service.poll_ready(ctx).map_err(E::from) - } - - fn call(&mut self, req: A::Request) -> Self::Future { - FromErrFuture { - fut: self.service.call(req), - f: PhantomData, - } - } -} - -#[pin_project] -pub struct FromErrFuture { - #[pin] - fut: A::Future, - f: PhantomData, -} - -impl Future for FromErrFuture -where - A: Service, - E: From, -{ - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.project().fut.poll(cx).map_err(E::from) - } -} - -/// NewService for the `from_err` combinator, changing the type of a new -/// service's error. -/// -/// This is created by the `NewServiceExt::from_err` method. -pub struct FromErrNewService { - a: A, - e: PhantomData, -} - -impl FromErrNewService { - /// Create new `FromErr` new service instance - pub fn new(a: A) -> Self - where - A: NewService, - E: From, - { - Self { a, e: PhantomData } - } -} - -impl Clone for FromErrNewService -where - A: Clone, -{ - fn clone(&self) -> Self { - Self { - a: self.a.clone(), - e: PhantomData, - } - } -} - -impl NewService for FromErrNewService -where - A: NewService, - E: From, -{ - type Request = A::Request; - type Response = A::Response; - type Error = E; - - type Config = A::Config; - type Service = FromErr; - type InitError = A::InitError; - type Future = FromErrNewServiceFuture; - - fn new_service(&self, cfg: &A::Config) -> Self::Future { - FromErrNewServiceFuture { - fut: self.a.new_service(cfg), - e: PhantomData, - } - } -} - -#[pin_project] -pub struct FromErrNewServiceFuture -where - A: NewService, - E: From, -{ - #[pin] - fut: A::Future, - e: PhantomData, -} - -impl Future for FromErrNewServiceFuture -where - A: NewService, - E: From, -{ - type Output = Result, A::InitError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if let Poll::Ready(svc) = self.project().fut.poll(cx)? { - Poll::Ready(Ok(FromErr::new(svc))) - } else { - Poll::Pending - } - } - - /* - fn poll(&mut self) -> Poll { - if let Poll::Ready(service) = self.fut.poll()? { - Ok(Poll::Ready(FromErr::new(service))) - } else { - Ok(Poll::Pending) - } - } - */ -} - -#[cfg(test)] -mod tests { - use futures::future::{err, Ready}; - - use super::*; - use crate::{IntoNewService, NewService, Service, ServiceExt}; - use tokio::future::ok; - - struct Srv; - - impl Service for Srv { - type Request = (); - type Response = (); - type Error = (); - type Future = Ready>; - - fn poll_ready( - self: Pin<&mut Self>, - ctx: &mut Context<'_>, - ) -> Poll> { - Poll::Ready(Err(())) - } - - fn call(&mut self, _: ()) -> Self::Future { - err(()) - } - } - - #[derive(Debug, PartialEq)] - struct Error; - - impl From<()> for Error { - fn from(_: ()) -> Self { - Error - } - } - - #[tokio::test] - async fn test_poll_ready() { - let mut srv = Srv.from_err::(); - let res = srv.poll_once().await; - - assert_eq!(res, Poll::Ready(Err(Error))); - } - - #[tokio::test] - async fn test_call() { - let mut srv = Srv.from_err::(); - let res = srv.call(()).await; - assert!(res.is_err()); - assert_eq!(res.err().unwrap(), Error); - } - - #[tokio::test] - async fn test_new_service() { - let blank = || ok::<_, ()>(Srv); - let new_srv = blank.into_new_service().from_err::(); - let mut srv = new_srv.new_service(&()).await.unwrap(); - let res = srv.call(()).await; - assert!(res.is_err()); - assert_eq!(res.err().unwrap(), Error); - } -} diff --git a/actix-service/src/lib.rs b/actix-service/src/lib.rs index 185e79d4..3e6fa6a0 100644 --- a/actix-service/src/lib.rs +++ b/actix-service/src/lib.rs @@ -1,45 +1,33 @@ -use futures::future::{ready, LocalBoxFuture, Ready}; -use futures::{Future, Poll}; use std::cell::RefCell; -use std::pin::Pin; +use std::future::Future; use std::rc::Rc; use std::sync::Arc; -use std::task; -use std::task::Context; +use std::task::{self, Context, Poll}; mod cell; mod and_then; -mod and_then_apply; -mod and_then_apply_fn; mod apply; mod apply_cfg; -pub mod blank; pub mod boxed; mod fn_service; -mod fn_transform; -mod from_err; mod map; mod map_config; mod map_err; mod map_init_err; +mod pipeline; mod then; mod transform; mod transform_err; -pub use self::and_then::{AndThen, AndThenNewService}; - -use self::and_then_apply::AndThenTransform; -use self::and_then_apply_fn::{AndThenApply, AndThenApplyNewService}; -pub use self::apply::{apply_fn, new_apply_fn, Apply, ApplyNewService}; +pub use self::apply::{apply_fn, new_apply_fn}; pub use self::apply_cfg::{apply_cfg, new_apply_cfg}; pub use self::fn_service::{new_service_cfg, new_service_fn, service_fn, ServiceFn}; -pub use self::fn_transform::transform_fn; -pub use self::from_err::{FromErr, FromErrNewService}; pub use self::map::{Map, MapNewService}; pub use self::map_config::{MapConfig, MappedConfig, UnitConfig}; pub use self::map_err::{MapErr, MapErrNewService}; pub use self::map_init_err::MapInitErr; +pub use self::pipeline::{new_pipeline, pipeline, NewPipeline, Pipeline}; pub use self::then::{Then, ThenNewService}; pub use self::transform::{apply_transform, IntoTransform, Transform}; @@ -60,18 +48,6 @@ impl>, I, E> IntoFuture for F { } } -/* -impl IntoFuture for Result { - type Item = I; - type Error = E; - type Future = Ready; - - fn into_future(self) -> Self::Future { - ready(self) - } -} -*/ - /// An asynchronous function from `Request` to a `Response`. pub trait Service { /// Requests handled by the service. @@ -95,10 +71,7 @@ pub trait Service { /// This is a **best effort** implementation. False positives are permitted. /// It is permitted for the service to return `Ready` from a `poll_ready` /// call and the next invocation of `call` results in an error. - fn poll_ready( - self: Pin<&mut Self>, - ctx: &mut task::Context<'_>, - ) -> Poll>; + fn poll_ready(&mut self, ctx: &mut task::Context<'_>) -> Poll>; /// Process the request and return the response asynchronously. /// @@ -126,114 +99,17 @@ pub trait Service { } } - fn poll_once<'a>(&'a mut self) -> LocalBoxFuture<'a, Poll>> { - unsafe { - let mut this = Pin::new_unchecked(self); - Pin::new_unchecked(Box::new(futures::future::poll_fn(move |cx| { - let this = &mut this; - Poll::Ready(this.as_mut().poll_ready(cx)) - }))) - } - } + // fn poll_once<'a>(&'a mut self) -> LocalBoxFuture<'a, Poll>> { + // unsafe { + // let mut this = Pin::new_unchecked(self); + // Pin::new_unchecked(Box::new(futures::future::poll_fn(move |cx| { + // let this = &mut this; + // Poll::Ready(this.as_mut().poll_ready(cx)) + // }))) + // } + // } } -/// An extension trait for `Service`s that provides a variety of convenient -/// adapters -pub trait ServiceExt: Service { - /// Apply function to specified service and use it as a next service in - /// chain. - fn apply_fn(self, service: B1, f: F) -> AndThenApply - where - Self: Sized, - F: FnMut(Self::Response, &mut B) -> Out, - Out: IntoFuture, - Out::Error: Into, - B: Service, - B1: IntoService, - { - AndThenApply::new(self, service, f) - } - - /// Call another service after call to this one has resolved successfully. - /// - /// This function can be used to chain two services together and ensure that - /// the second service isn't called until call to the fist service have - /// finished. Result of the call to the first service is used as an - /// input parameter for the second service's call. - /// - /// Note that this function consumes the receiving service and returns a - /// wrapped version of it. - fn and_then(self, service: F) -> AndThen - where - Self: Sized, - F: IntoService, - B: Service, - { - AndThen::new(self, service.into_service()) - } - - /// Map this service's error to any error implementing `From` for - /// this service`s `Error`. - /// - /// Note that this function consumes the receiving service and returns a - /// wrapped version of it. - fn from_err(self) -> FromErr - where - Self: Sized, - E: From, - { - FromErr::new(self) - } - - /// Chain on a computation for when a call to the service finished, - /// passing the result of the call to the next service `B`. - /// - /// Note that this function consumes the receiving service and returns a - /// wrapped version of it. - fn then(self, service: B) -> Then - where - Self: Sized, - B: Service, Error = Self::Error>, - { - Then::new(self, service) - } - - /// Map this service's output to a different type, returning a new service - /// of the resulting type. - /// - /// This function is similar to the `Option::map` or `Iterator::map` where - /// it will change the type of the underlying service. - /// - /// Note that this function consumes the receiving service and returns a - /// wrapped version of it, similar to the existing `map` methods in the - /// standard library. - fn map(self, f: F) -> Map - where - Self: Sized, - F: FnMut(Self::Response) -> R, - { - Map::new(self, f) - } - - /// Map this service's error to a different error, returning a new service. - /// - /// This function is similar to the `Result::map_err` where it will change - /// the error type of the underlying service. This is useful for example to - /// ensure that services have the same error type. - /// - /// Note that this function consumes the receiving service and returns a - /// wrapped version of it. - fn map_err(self, f: F) -> MapErr - where - Self: Sized, - F: Fn(Self::Error) -> E, - { - MapErr::new(self, f) - } -} - -impl ServiceExt for T where T: Service {} - /// Creates new `Service` values. /// /// Acts as a service factory. This is useful for cases where new `Service` @@ -271,128 +147,6 @@ pub trait NewService { /// Create and return a new service value asynchronously. fn new_service(&self, cfg: &Self::Config) -> Self::Future; - - /// Apply transform service to specified service and use it as a next service in - /// chain. - fn apply(self, transform: T1, service: B1) -> AndThenTransform - where - Self: Sized, - T: Transform, - T::Error: From, - T1: IntoTransform, - B: NewService, - B1: IntoNewService, - { - AndThenTransform::new(transform.into_transform(), self, service.into_new_service()) - } - - /// Apply function to specified service and use it as a next service in - /// chain. - fn apply_fn(self, service: I, f: F) -> AndThenApplyNewService - where - Self: Sized, - B: NewService, - I: IntoNewService, - F: FnMut(Self::Response, &mut B::Service) -> Out, - Out: IntoFuture, - Out::Error: Into, - { - AndThenApplyNewService::new(self, service, f) - } - - /// Call another service after call to this one has resolved successfully. - fn and_then(self, new_service: F) -> AndThenNewService - where - Self: Sized, - F: IntoNewService, - B: NewService< - Config = Self::Config, - Request = Self::Response, - Error = Self::Error, - InitError = Self::InitError, - >, - { - AndThenNewService::new(self, new_service) - } - - /// `NewService` that create service to map this service's error - /// and new service's init error to any error - /// implementing `From` for this service`s `Error`. - /// - /// Note that this function consumes the receiving new service and returns a - /// wrapped version of it. - fn from_err(self) -> FromErrNewService - where - Self: Sized, - E: From, - { - FromErrNewService::new(self) - } - - /// Create `NewService` to chain on a computation for when a call to the - /// service finished, passing the result of the call to the next - /// service `B`. - /// - /// Note that this function consumes the receiving future and returns a - /// wrapped version of it. - fn then(self, new_service: F) -> ThenNewService - where - Self: Sized, - F: IntoNewService, - B: NewService< - Config = Self::Config, - Request = Result, - Error = Self::Error, - InitError = Self::InitError, - >, - { - ThenNewService::new(self, new_service) - } - - /// Map this service's output to a different type, returning a new service - /// of the resulting type. - fn map(self, f: F) -> MapNewService - where - Self: Sized, - F: FnMut(Self::Response) -> R, - { - MapNewService::new(self, f) - } - - /// Map this service's error to a different error, returning a new service. - fn map_err(self, f: F) -> MapErrNewService - where - Self: Sized, - F: Fn(Self::Error) -> E + Clone, - { - MapErrNewService::new(self, f) - } - - /// Map this factory's init error to a different error, returning a new service. - fn map_init_err(self, f: F) -> MapInitErr - where - Self: Sized, - F: Fn(Self::InitError) -> E, - { - MapInitErr::new(self, f) - } - - /// Map config to a different error, returning a new service. - fn map_config(self, f: F) -> MapConfig - where - Self: Sized, - F: Fn(&C) -> MappedConfig, - { - MapConfig::new(self, f) - } - - /// Replace config with unit - fn unit_config(self) -> UnitConfig - where - Self: NewService + Sized, - { - UnitConfig::new(self) - } } impl<'a, S> Service for &'a mut S @@ -404,11 +158,8 @@ where type Error = S::Error; type Future = S::Future; - fn poll_ready( - self: Pin<&mut Self>, - ctx: &mut Context<'_>, - ) -> Poll> { - unsafe { self.map_unchecked_mut(|s| &mut **s).poll_ready(ctx) } + fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll> { + (**self).poll_ready(ctx) } fn call(&mut self, request: Self::Request) -> S::Future { @@ -425,14 +176,8 @@ where type Error = S::Error; type Future = S::Future; - fn poll_ready( - mut self: Pin<&mut Self>, - ctx: &mut Context<'_>, - ) -> Poll> { - unsafe { - let p: &mut S = Pin::as_mut(&mut self).get_mut(); - Pin::new_unchecked(p).poll_ready(ctx) - } + fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll> { + (**self).poll_ready(ctx) } fn call(&mut self, request: Self::Request) -> S::Future { @@ -449,14 +194,8 @@ where type Error = S::Error; type Future = S::Future; - fn poll_ready( - self: Pin<&mut Self>, - ctx: &mut Context<'_>, - ) -> Poll> { - unsafe { - let r = self.get_unchecked_mut(); - Pin::new_unchecked(&mut (*(**r).borrow_mut())).poll_ready(ctx) - } + fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll> { + self.borrow_mut().poll_ready(ctx) } fn call(&mut self, request: Self::Request) -> S::Future { diff --git a/actix-service/src/map.rs b/actix-service/src/map.rs index a29690b0..3854595d 100644 --- a/actix-service/src/map.rs +++ b/actix-service/src/map.rs @@ -1,19 +1,16 @@ +use std::future::Future; use std::marker::PhantomData; - -use futures::{Future, Poll}; - -use super::{NewService, Service}; use std::pin::Pin; -use std::task::Context; +use std::task::{Context, Poll}; use pin_project::pin_project; +use super::{NewService, Service}; + /// Service for the `map` combinator, changing the type of a service's response. /// /// This is created by the `ServiceExt::map` method. -#[pin_project] pub struct Map { - #[pin] service: A, f: F, _t: PhantomData, @@ -58,11 +55,8 @@ where type Error = A::Error; type Future = MapFuture; - fn poll_ready( - self: Pin<&mut Self>, - ctx: &mut Context<'_>, - ) -> Poll> { - self.project().service.poll_ready(ctx) + fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll> { + self.service.poll_ready(ctx) } fn call(&mut self, req: A::Request) -> Self::Future { @@ -98,7 +92,7 @@ where { type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); match this.fut.poll(cx) { Poll::Ready(Ok(resp)) => Poll::Ready(Ok((this.f)(resp))), @@ -191,7 +185,7 @@ where { type Output = Result, A::InitError>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); if let Poll::Ready(svc) = this.fut.poll(cx)? { Poll::Ready(Ok(Map::new(svc, this.f.take().unwrap()))) diff --git a/actix-service/src/map_err.rs b/actix-service/src/map_err.rs index 0bbd5bf0..4630c1ed 100644 --- a/actix-service/src/map_err.rs +++ b/actix-service/src/map_err.rs @@ -1,20 +1,17 @@ +use std::future::Future; use std::marker::PhantomData; - -use futures::{Future, Poll}; - -use super::{NewService, Service}; +use std::pin::Pin; +use std::task::{Context, Poll}; use pin_project::pin_project; -use std::pin::Pin; -use std::task::Context; + +use super::{NewService, Service}; /// Service for the `map_err` combinator, changing the type of a service's /// error. /// /// This is created by the `ServiceExt::map_err` method. -#[pin_project] pub struct MapErr { - #[pin] service: A, f: F, _t: PhantomData, @@ -59,12 +56,8 @@ where type Error = E; type Future = MapErrFuture; - fn poll_ready( - mut self: Pin<&mut Self>, - ctx: &mut Context<'_>, - ) -> Poll> { - let mut this = self.project(); - this.service.poll_ready(ctx).map_err(this.f) + fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll> { + self.service.poll_ready(ctx).map_err(&self.f) } fn call(&mut self, req: A::Request) -> Self::Future { @@ -101,8 +94,7 @@ where type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); - this.fut.poll(cx).map_err(this.f) + self.project().fut.poll(cx).map_err(&self.f) } } @@ -196,7 +188,7 @@ where { type Output = Result, A::InitError>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); if let Poll::Ready(svc) = this.fut.poll(cx)? { Poll::Ready(Ok(MapErr::new(svc, this.f.clone()))) diff --git a/actix-service/src/map_init_err.rs b/actix-service/src/map_init_err.rs index c9225eb3..ad802172 100644 --- a/actix-service/src/map_init_err.rs +++ b/actix-service/src/map_init_err.rs @@ -1,12 +1,11 @@ +use std::future::Future; use std::marker::PhantomData; - -use futures::{Future, Poll}; - -use super::NewService; +use std::pin::Pin; +use std::task::{Context, Poll}; use pin_project::pin_project; -use std::pin::Pin; -use std::task::Context; + +use super::NewService; /// `MapInitErr` service combinator pub struct MapInitErr { @@ -91,7 +90,7 @@ where type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); + let this = self.project(); this.fut.poll(cx).map_err(this.f) } } diff --git a/actix-service/src/pipeline.rs b/actix-service/src/pipeline.rs new file mode 100644 index 00000000..b1a2b5cc --- /dev/null +++ b/actix-service/src/pipeline.rs @@ -0,0 +1,152 @@ +use std::task::{Context, Poll}; + +use crate::and_then::{AndThen, AndThenNewService}; +use crate::then::{Then, ThenNewService}; +use crate::{IntoNewService, IntoService, NewService, Service}; + +pub fn pipeline(service: F) -> Pipeline +where + F: IntoService, + T: Service, +{ + Pipeline { + service: service.into_service(), + } +} + +pub fn new_pipeline(new_service: F) -> NewPipeline +where + F: IntoNewService, + T: NewService, +{ + NewPipeline { + service: new_service.into_new_service(), + } +} + +/// Pipeline service +pub struct Pipeline { + service: T, +} + +impl Pipeline { + /// Call another service after call to this one has resolved successfully. + /// + /// This function can be used to chain two services together and ensure that + /// the second service isn't called until call to the fist service have + /// finished. Result of the call to the first service is used as an + /// input parameter for the second service's call. + /// + /// Note that this function consumes the receiving service and returns a + /// wrapped version of it. + pub fn and_then( + self, + service: F, + ) -> Pipeline> + where + Self: Sized, + F: IntoService, + U: Service, + { + Pipeline { + service: AndThen::new(self.service, service.into_service()), + } + } + + /// Chain on a computation for when a call to the service finished, + /// passing the result of the call to the next service `U`. + /// + /// Note that this function consumes the receiving pipeline and returns a + /// wrapped version of it. + pub fn then( + self, + service: F, + ) -> Pipeline> + where + Self: Sized, + F: IntoService, + U: Service, Error = T::Error>, + { + Pipeline { + service: Then::new(self.service, service.into_service()), + } + } +} + +impl Service for Pipeline { + type Request = T::Request; + type Response = T::Response; + type Error = T::Error; + type Future = T::Future; + + #[inline] + fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll> { + self.service.poll_ready(ctx) + } + + #[inline] + fn call(&mut self, req: T::Request) -> Self::Future { + self.service.call(req) + } +} + +/// Pipeline constructor +pub struct NewPipeline { + service: T, +} + +impl NewPipeline { + /// Call another service after call to this one has resolved successfully. + pub fn and_then(self, new_service: U) -> NewPipeline> + where + Self: Sized, + F: IntoNewService, + U: NewService< + Config = T::Config, + Request = T::Response, + Error = T::Error, + InitError = T::InitError, + >, + { + NewPipeline { + service: AndThenNewService::new(self.service, new_service.into_new_service()), + } + } + + /// Create `NewService` to chain on a computation for when a call to the + /// service finished, passing the result of the call to the next + /// service `U`. + /// + /// Note that this function consumes the receiving pipeline and returns a + /// wrapped version of it. + pub fn then(self, new_service: F) -> NewPipeline> + where + Self: Sized, + F: IntoNewService, + U: NewService< + Config = T::Config, + Request = Result, + Error = T::Error, + InitError = T::InitError, + >, + { + NewPipeline { + service: ThenNewService::new(self.service, new_service.into_new_service()), + } + } +} + +impl NewService for NewPipeline { + type Config = T::Config; + type Request = T::Request; + type Response = T::Response; + type Error = T::Error; + type Service = T::Service; + type InitError = T::InitError; + type Future = T::Future; + + #[inline] + fn new_service(&self, cfg: &T::Config) -> Self::Future { + self.service.new_service(cfg) + } +} diff --git a/actix-service/src/then.rs b/actix-service/src/then.rs index 7cfbfe88..e48bf218 100644 --- a/actix-service/src/then.rs +++ b/actix-service/src/then.rs @@ -1,21 +1,18 @@ -use futures::{Future, Poll}; +use std::future::Future; use std::pin::Pin; -use std::task::Context; +use std::task::{Context, Poll}; + +use pin_project::pin_project; use super::{IntoNewService, NewService, Service}; use crate::cell::Cell; -use pin_project::pin_project; - /// Service for the `then` combinator, chaining a computation onto the end of /// another service. /// /// This is created by the `ServiceExt::then` method. -#[pin_project] pub struct Then { - #[pin] a: A, - #[pin] b: Cell, } @@ -52,13 +49,9 @@ where type Error = B::Error; type Future = ThenFuture; - fn poll_ready( - self: Pin<&mut Self>, - ctx: &mut Context<'_>, - ) -> Poll> { - let this = self.project(); - let not_ready = !this.a.poll_ready(ctx)?.is_ready(); - if !this.b.get_pin().poll_ready(ctx)?.is_ready() || not_ready { + fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll> { + let not_ready = !self.a.poll_ready(ctx)?.is_ready(); + if !self.b.get_mut().poll_ready(ctx)?.is_ready() || not_ready { Poll::Pending } else { Poll::Ready(Ok(())) @@ -247,7 +240,7 @@ where { type Output = Result, A::InitError>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); if this.a.is_none() { if let Poll::Ready(service) = this.fut_a.poll(cx)? { diff --git a/actix-service/src/transform.rs b/actix-service/src/transform.rs index e4682b63..08ab8689 100644 --- a/actix-service/src/transform.rs +++ b/actix-service/src/transform.rs @@ -1,11 +1,11 @@ +use std::future::Future; +use std::pin::Pin; use std::rc::Rc; use std::sync::Arc; +use std::task::{Context, Poll}; use crate::transform_err::{TransformFromErr, TransformMapInitErr}; use crate::{IntoNewService, NewService, Service}; -use futures::{Future, Poll}; -use std::pin::Pin; -use std::task::Context; use pin_project::pin_project; @@ -221,7 +221,7 @@ where { type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); if this.fut_t.as_mut().as_pin_mut().is_none() { diff --git a/actix-service/src/transform_err.rs b/actix-service/src/transform_err.rs index a6940707..304fef8d 100644 --- a/actix-service/src/transform_err.rs +++ b/actix-service/src/transform_err.rs @@ -1,13 +1,12 @@ +use std::future::Future; use std::marker::PhantomData; - -use futures::{Future, Poll}; - -use super::Transform; use std::pin::Pin; -use std::task::Context; +use std::task::{Context, Poll}; use pin_project::pin_project; +use super::Transform; + /// Transform for the `map_err` combinator, changing the type of a new /// transform's init error. /// @@ -85,7 +84,7 @@ where { type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); this.fut.poll(cx).map_err(this.f) } @@ -162,7 +161,7 @@ where { type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.project().fut.poll(cx).map_err(E::from) } }