update other crates

This commit is contained in:
Rob Ede 2020-12-25 02:05:31 +00:00
parent 0d19050269
commit 6e19dde3e7
No known key found for this signature in database
GPG Key ID: C2A3B36E841A91E6
10 changed files with 113 additions and 104 deletions

View File

@ -40,8 +40,7 @@ impl<T> Clone for TcpConnectorFactory<T> {
} }
} }
impl<T: Address> ServiceFactory for TcpConnectorFactory<T> { impl<T: Address> ServiceFactory<Connect<T>> for TcpConnectorFactory<T> {
type Request = Connect<T>;
type Response = Connection<T, TcpStream>; type Response = Connection<T, TcpStream>;
type Error = ConnectError; type Error = ConnectError;
type Config = (); type Config = ();
@ -70,8 +69,7 @@ impl<T> Clone for TcpConnector<T> {
} }
} }
impl<T: Address> Service for TcpConnector<T> { impl<T: Address> Service<Connect<T>> for TcpConnector<T> {
type Request = Connect<T>;
type Response = Connection<T, TcpStream>; type Response = Connection<T, TcpStream>;
type Error = ConnectError; type Error = ConnectError;
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]

View File

@ -76,8 +76,8 @@ pub async fn start_default_resolver() -> Result<AsyncResolver, ConnectError> {
/// Create TCP connector service. /// Create TCP connector service.
pub fn new_connector<T: Address + 'static>( pub fn new_connector<T: Address + 'static>(
resolver: AsyncResolver, resolver: AsyncResolver,
) -> impl Service<Request = Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError> ) -> impl Service<Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError> + Clone
+ Clone { {
pipeline(Resolver::new(resolver)).and_then(TcpConnector::new()) pipeline(Resolver::new(resolver)).and_then(TcpConnector::new())
} }
@ -85,8 +85,8 @@ pub fn new_connector<T: Address + 'static>(
pub fn new_connector_factory<T: Address + 'static>( pub fn new_connector_factory<T: Address + 'static>(
resolver: AsyncResolver, resolver: AsyncResolver,
) -> impl ServiceFactory< ) -> impl ServiceFactory<
Connect<T>,
Config = (), Config = (),
Request = Connect<T>,
Response = Connection<T, TcpStream>, Response = Connection<T, TcpStream>,
Error = ConnectError, Error = ConnectError,
InitError = (), InitError = (),
@ -96,15 +96,15 @@ pub fn new_connector_factory<T: Address + 'static>(
/// Create connector service with default parameters. /// Create connector service with default parameters.
pub fn default_connector<T: Address + 'static>( pub fn default_connector<T: Address + 'static>(
) -> impl Service<Request = Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError> ) -> impl Service<Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError> + Clone
+ Clone { {
pipeline(Resolver::default()).and_then(TcpConnector::new()) pipeline(Resolver::default()).and_then(TcpConnector::new())
} }
/// Create connector service factory with default parameters. /// Create connector service factory with default parameters.
pub fn default_connector_factory<T: Address + 'static>() -> impl ServiceFactory< pub fn default_connector_factory<T: Address + 'static>() -> impl ServiceFactory<
Connect<T>,
Config = (), Config = (),
Request = Connect<T>,
Response = Connection<T, TcpStream>, Response = Connection<T, TcpStream>,
Error = ConnectError, Error = ConnectError,
InitError = (), InitError = (),

View File

@ -54,8 +54,7 @@ impl<T> Clone for ResolverFactory<T> {
} }
} }
impl<T: Address> ServiceFactory for ResolverFactory<T> { impl<T: Address> ServiceFactory<Connect<T>> for ResolverFactory<T> {
type Request = Connect<T>;
type Response = Connect<T>; type Response = Connect<T>;
type Error = ConnectError; type Error = ConnectError;
type Config = (); type Config = ();
@ -102,8 +101,7 @@ impl<T> Clone for Resolver<T> {
} }
} }
impl<T: Address> Service for Resolver<T> { impl<T: Address> Service<Connect<T>> for Resolver<T> {
type Request = Connect<T>;
type Response = Connect<T>; type Response = Connect<T>;
type Error = ConnectError; type Error = ConnectError;
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]

View File

@ -70,8 +70,7 @@ impl<T> Clone for ConnectServiceFactory<T> {
} }
} }
impl<T: Address> ServiceFactory for ConnectServiceFactory<T> { impl<T: Address> ServiceFactory<Connect<T>> for ConnectServiceFactory<T> {
type Request = Connect<T>;
type Response = Connection<T, TcpStream>; type Response = Connection<T, TcpStream>;
type Error = ConnectError; type Error = ConnectError;
type Config = (); type Config = ();
@ -90,8 +89,7 @@ pub struct ConnectService<T> {
resolver: Resolver<T>, resolver: Resolver<T>,
} }
impl<T: Address> Service for ConnectService<T> { impl<T: Address> Service<Connect<T>> for ConnectService<T> {
type Request = Connect<T>;
type Response = Connection<T, TcpStream>; type Response = Connection<T, TcpStream>;
type Error = ConnectError; type Error = ConnectError;
type Future = ConnectServiceResponse<T>; type Future = ConnectServiceResponse<T>;
@ -109,8 +107,8 @@ impl<T: Address> Service for ConnectService<T> {
} }
enum ConnectState<T: Address> { enum ConnectState<T: Address> {
Resolve(<Resolver<T> as Service>::Future), Resolve(<Resolver<T> as Service<Connect<T>>>::Future),
Connect(<TcpConnector<T> as Service>::Future), Connect(<TcpConnector<T> as Service<Connect<T>>>::Future),
} }
impl<T: Address> ConnectState<T> { impl<T: Address> ConnectState<T> {
@ -160,8 +158,7 @@ pub struct TcpConnectService<T> {
resolver: Resolver<T>, resolver: Resolver<T>,
} }
impl<T: Address + 'static> Service for TcpConnectService<T> { impl<T: Address + 'static> Service<Connect<T>> for TcpConnectService<T> {
type Request = Connect<T>;
type Response = TcpStream; type Response = TcpStream;
type Error = ConnectError; type Error = ConnectError;
type Future = TcpConnectServiceResponse<T>; type Future = TcpConnectServiceResponse<T>;
@ -179,8 +176,8 @@ impl<T: Address + 'static> Service for TcpConnectService<T> {
} }
enum TcpConnectState<T: Address> { enum TcpConnectState<T: Address> {
Resolve(<Resolver<T> as Service>::Future), Resolve(<Resolver<T> as Service<Connect<T>>>::Future),
Connect(<TcpConnector<T> as Service>::Future), Connect(<TcpConnector<T> as Service<Connect<T>>>::Future),
} }
impl<T: Address> TcpConnectState<T> { impl<T: Address> TcpConnectState<T> {

View File

@ -2,7 +2,10 @@ use std::collections::HashMap;
use std::{fmt, io, net}; use std::{fmt, io, net};
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use actix_service as actix; use actix_service::{
fn_service, IntoServiceFactory as IntoBaseServiceFactory,
ServiceFactory as BaseServiceFactory,
};
use actix_utils::counter::CounterGuard; use actix_utils::counter::CounterGuard;
use futures_util::future::{ok, Future, FutureExt, LocalBoxFuture}; use futures_util::future::{ok, Future, FutureExt, LocalBoxFuture};
use log::error; use log::error;
@ -141,12 +144,10 @@ impl InternalServiceFactory for ConfiguredService {
let name = names.remove(&token).unwrap().0; let name = names.remove(&token).unwrap().0;
res.push(( res.push((
token, token,
Box::new(StreamService::new(actix::fn_service( Box::new(StreamService::new(fn_service(move |_: TcpStream| {
move |_: TcpStream| { error!("Service {:?} is not configured", name);
error!("Service {:?} is not configured", name); ok::<_, ()>(())
ok::<_, ()>(()) }))),
},
))),
)); ));
}; };
} }
@ -208,8 +209,8 @@ impl ServiceRuntime {
/// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods. /// *ServiceConfig::bind()* or *ServiceConfig::listen()* methods.
pub fn service<T, F>(&mut self, name: &str, service: F) pub fn service<T, F>(&mut self, name: &str, service: F)
where where
F: actix::IntoServiceFactory<T>, F: IntoBaseServiceFactory<T, TcpStream>,
T: actix::ServiceFactory<Config = (), Request = TcpStream> + 'static, T: BaseServiceFactory<TcpStream, Config = ()> + 'static,
T::Future: 'static, T::Future: 'static,
T::Service: 'static, T::Service: 'static,
T::InitError: fmt::Debug, T::InitError: fmt::Debug,
@ -237,8 +238,8 @@ impl ServiceRuntime {
} }
type BoxedNewService = Box< type BoxedNewService = Box<
dyn actix::ServiceFactory< dyn BaseServiceFactory<
Request = (Option<CounterGuard>, StdStream), (Option<CounterGuard>, StdStream),
Response = (), Response = (),
Error = (), Error = (),
InitError = (), InitError = (),
@ -252,15 +253,14 @@ struct ServiceFactory<T> {
inner: T, inner: T,
} }
impl<T> actix::ServiceFactory for ServiceFactory<T> impl<T> BaseServiceFactory<(Option<CounterGuard>, StdStream)> for ServiceFactory<T>
where where
T: actix::ServiceFactory<Config = (), Request = TcpStream>, T: BaseServiceFactory<TcpStream, Config = ()>,
T::Future: 'static, T::Future: 'static,
T::Service: 'static, T::Service: 'static,
T::Error: 'static, T::Error: 'static,
T::InitError: fmt::Debug + 'static, T::InitError: fmt::Debug + 'static,
{ {
type Request = (Option<CounterGuard>, StdStream);
type Response = (); type Response = ();
type Error = (); type Error = ();
type Config = (); type Config = ();

View File

@ -3,7 +3,7 @@ use std::net::SocketAddr;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use actix_rt::spawn; use actix_rt::spawn;
use actix_service::{self as actix, Service, ServiceFactory as ActixServiceFactory}; use actix_service::{Service, ServiceFactory as BaseServiceFactory};
use actix_utils::counter::CounterGuard; use actix_utils::counter::CounterGuard;
use futures_util::future::{err, ok, LocalBoxFuture, Ready}; use futures_util::future::{err, ok, LocalBoxFuture, Ready};
use futures_util::{FutureExt, TryFutureExt}; use futures_util::{FutureExt, TryFutureExt};
@ -13,7 +13,7 @@ use super::Token;
use crate::socket::{FromStream, StdStream}; use crate::socket::{FromStream, StdStream};
pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static { pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
type Factory: actix::ServiceFactory<Config = (), Request = Stream>; type Factory: BaseServiceFactory<Stream, Config = ()>;
fn create(&self) -> Self::Factory; fn create(&self) -> Self::Factory;
} }
@ -28,31 +28,34 @@ pub(crate) trait InternalServiceFactory: Send {
pub(crate) type BoxedServerService = Box< pub(crate) type BoxedServerService = Box<
dyn Service< dyn Service<
Request = (Option<CounterGuard>, StdStream), (Option<CounterGuard>, StdStream),
Response = (), Response = (),
Error = (), Error = (),
Future = Ready<Result<(), ()>>, Future = Ready<Result<(), ()>>,
>, >,
>; >;
pub(crate) struct StreamService<T> { pub(crate) struct StreamService<S, I> {
service: T, service: S,
_phantom: PhantomData<I>,
} }
impl<T> StreamService<T> { impl<S, I> StreamService<S, I> {
pub(crate) fn new(service: T) -> Self { pub(crate) fn new(service: S) -> Self {
StreamService { service } StreamService {
service,
_phantom: PhantomData,
}
} }
} }
impl<T, I> Service for StreamService<T> impl<S, I> Service<(Option<CounterGuard>, StdStream)> for StreamService<S, I>
where where
T: Service<Request = I>, S: Service<I>,
T::Future: 'static, S::Future: 'static,
T::Error: 'static, S::Error: 'static,
I: FromStream, I: FromStream,
{ {
type Request = (Option<CounterGuard>, StdStream);
type Response = (); type Response = ();
type Error = (); type Error = ();
type Future = Ready<Result<(), ()>>; type Future = Ready<Result<(), ()>>;
@ -144,7 +147,7 @@ where
impl<F, T, I> ServiceFactory<I> for F impl<F, T, I> ServiceFactory<I> for F
where where
F: Fn() -> T + Send + Clone + 'static, F: Fn() -> T + Send + Clone + 'static,
T: actix::ServiceFactory<Config = (), Request = I>, T: BaseServiceFactory<I, Config = ()>,
I: FromStream, I: FromStream,
{ {
type Factory = T; type Factory = T;

View File

@ -27,12 +27,11 @@ impl<S, F> TracingService<S, F> {
} }
} }
impl<S, F> Service for TracingService<S, F> impl<S, Req, F> Service<Req> for TracingService<S, F>
where where
S: Service, S: Service<Req>,
F: Fn(&S::Request) -> Option<tracing::Span>, F: Fn(&Req) -> Option<tracing::Span>,
{ {
type Request = S::Request;
type Response = S::Response; type Response = S::Response;
type Error = S::Error; type Error = S::Error;
type Future = Either<S::Future, Instrumented<S::Future>>; type Future = Either<S::Future, Instrumented<S::Future>>;
@ -41,7 +40,7 @@ where
self.inner.poll_ready(ctx) self.inner.poll_ready(ctx)
} }
fn call(&mut self, req: Self::Request) -> Self::Future { fn call(&mut self, req: Req) -> Self::Future {
let span = (self.make_span)(&req); let span = (self.make_span)(&req);
let _enter = span.as_ref().map(|s| s.enter()); let _enter = span.as_ref().map(|s| s.enter());
@ -74,18 +73,12 @@ impl<S, U, F> TracingTransform<S, U, F> {
} }
} }
impl<S, U, F> Transform<S> for TracingTransform<S, U, F> impl<S, Req, U, F> Transform<S, Req> for TracingTransform<S, U, F>
where where
S: Service, S: Service<Req>,
U: ServiceFactory< U: ServiceFactory<Req, Response = S::Response, Error = S::Error, Service = S>,
Request = S::Request, F: Fn(&Req) -> Option<tracing::Span> + Clone,
Response = S::Response,
Error = S::Error,
Service = S,
>,
F: Fn(&S::Request) -> Option<tracing::Span> + Clone,
{ {
type Request = S::Request;
type Response = S::Response; type Response = S::Response;
type Error = S::Error; type Error = S::Error;
type Transform = TracingService<S, F>; type Transform = TracingService<S, F>;
@ -110,14 +103,14 @@ where
/// |req: &Request| Some(span!(Level::INFO, "request", req.id)) /// |req: &Request| Some(span!(Level::INFO, "request", req.id))
/// ); /// );
/// ``` /// ```
pub fn trace<S, U, F>( pub fn trace<S, Req, U, F>(
service_factory: U, service_factory: U,
make_span: F, make_span: F,
) -> ApplyTransform<TracingTransform<S::Service, S, F>, S> ) -> ApplyTransform<TracingTransform<S::Service, S, F>, S, Req>
where where
S: ServiceFactory, S: ServiceFactory<Req>,
F: Fn(&S::Request) -> Option<tracing::Span> + Clone, F: Fn(&Req) -> Option<tracing::Span> + Clone,
U: IntoServiceFactory<S>, U: IntoServiceFactory<S, Req>,
{ {
apply( apply(
TracingTransform::new(make_span), TracingTransform::new(make_span),

View File

@ -1,6 +1,6 @@
[package] [package]
name = "actix-utils" name = "actix-utils"
version = "3.0.0" version = "2.0.0"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Various network related services and utilities for the Actix ecosystem." description = "Various network related services and utilities for the Actix ecosystem."
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]

View File

@ -1,4 +1,4 @@
//! Framed dispatcher service and related utilities //! Framed dispatcher service and related utilities.
#![allow(type_alias_bounds)] #![allow(type_alias_bounds)]
@ -11,6 +11,7 @@ use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use actix_service::{IntoService, Service}; use actix_service::{IntoService, Service};
use futures_core::stream::Stream; use futures_core::stream::Stream;
use log::debug; use log::debug;
use pin_project_lite::pin_project;
use crate::mpsc; use crate::mpsc;
@ -62,12 +63,12 @@ pub enum Message<T> {
Close, Close,
} }
pin_project_lite::pin_project! { pin_project! {
/// Dispatcher is a future that reads frames from Framed object /// Dispatcher is a future that reads frames from Framed object
/// and passes them to the service. /// and passes them to the service.
pub struct Dispatcher<S, T, U, I> pub struct Dispatcher<S, T, U, I>
where where
S: Service<Request = <U as Decoder>::Item, Response = I>, S: Service<<U as Decoder>::Item, Response = I>,
S::Error: 'static, S::Error: 'static,
S::Future: 'static, S::Future: 'static,
T: AsyncRead, T: AsyncRead,
@ -86,7 +87,11 @@ pin_project_lite::pin_project! {
} }
} }
enum State<S: Service, U: Encoder<I> + Decoder, I> { enum State<S, U, I>
where
S: Service<<U as Decoder>::Item>,
U: Encoder<I> + Decoder,
{
Processing, Processing,
Error(DispatcherError<S::Error, U, I>), Error(DispatcherError<S::Error, U, I>),
FramedError(DispatcherError<S::Error, U, I>), FramedError(DispatcherError<S::Error, U, I>),
@ -94,7 +99,11 @@ enum State<S: Service, U: Encoder<I> + Decoder, I> {
Stopping, Stopping,
} }
impl<S: Service, U: Encoder<I> + Decoder, I> State<S, U, I> { impl<S, U, I> State<S, U, I>
where
S: Service<<U as Decoder>::Item>,
U: Encoder<I> + Decoder,
{
fn take_error(&mut self) -> DispatcherError<S::Error, U, I> { fn take_error(&mut self) -> DispatcherError<S::Error, U, I> {
match mem::replace(self, State::Processing) { match mem::replace(self, State::Processing) {
State::Error(err) => err, State::Error(err) => err,
@ -112,7 +121,7 @@ impl<S: Service, U: Encoder<I> + Decoder, I> State<S, U, I> {
impl<S, T, U, I> Dispatcher<S, T, U, I> impl<S, T, U, I> Dispatcher<S, T, U, I>
where where
S: Service<Request = <U as Decoder>::Item, Response = I>, S: Service<<U as Decoder>::Item, Response = I>,
S::Error: 'static, S::Error: 'static,
S::Future: 'static, S::Future: 'static,
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
@ -121,7 +130,10 @@ where
<U as Decoder>::Error: fmt::Debug, <U as Decoder>::Error: fmt::Debug,
<U as Encoder<I>>::Error: fmt::Debug, <U as Encoder<I>>::Error: fmt::Debug,
{ {
pub fn new<F: IntoService<S>>(framed: Framed<T, U>, service: F) -> Self { pub fn new<F>(framed: Framed<T, U>, service: F) -> Self
where
F: IntoService<S, <U as Decoder>::Item>,
{
let (tx, rx) = mpsc::channel(); let (tx, rx) = mpsc::channel();
Dispatcher { Dispatcher {
framed, framed,
@ -133,11 +145,14 @@ where
} }
/// Construct new `Dispatcher` instance with customer `mpsc::Receiver` /// Construct new `Dispatcher` instance with customer `mpsc::Receiver`
pub fn with_rx<F: IntoService<S>>( pub fn with_rx<F>(
framed: Framed<T, U>, framed: Framed<T, U>,
service: F, service: F,
rx: mpsc::Receiver<Result<Message<I>, S::Error>>, rx: mpsc::Receiver<Result<Message<I>, S::Error>>,
) -> Self { ) -> Self
where
F: IntoService<S, <U as Decoder>::Item>,
{
let tx = rx.sender(); let tx = rx.sender();
Dispatcher { Dispatcher {
framed, framed,
@ -176,7 +191,7 @@ where
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool
where where
S: Service<Request = <U as Decoder>::Item, Response = I>, S: Service<<U as Decoder>::Item, Response = I>,
S::Error: 'static, S::Error: 'static,
S::Future: 'static, S::Future: 'static,
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
@ -220,7 +235,7 @@ where
/// write to framed object /// write to framed object
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool
where where
S: Service<Request = <U as Decoder>::Item, Response = I>, S: Service<<U as Decoder>::Item, Response = I>,
S::Error: 'static, S::Error: 'static,
S::Future: 'static, S::Future: 'static,
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,
@ -271,7 +286,7 @@ where
impl<S, T, U, I> Future for Dispatcher<S, T, U, I> impl<S, T, U, I> Future for Dispatcher<S, T, U, I>
where where
S: Service<Request = <U as Decoder>::Item, Response = I>, S: Service<<U as Decoder>::Item, Response = I>,
S::Error: 'static, S::Error: 'static,
S::Future: 'static, S::Future: 'static,
T: AsyncRead + AsyncWrite, T: AsyncRead + AsyncWrite,

View File

@ -2,6 +2,7 @@
//! //!
//! If the response does not complete within the specified timeout, the response //! If the response does not complete within the specified timeout, the response
//! will be aborted. //! will be aborted.
use core::future::Future; use core::future::Future;
use core::marker::PhantomData; use core::marker::PhantomData;
use core::pin::Pin; use core::pin::Pin;
@ -10,6 +11,7 @@ use core::{fmt, time};
use actix_rt::time::{delay_for, Delay}; use actix_rt::time::{delay_for, Delay};
use actix_service::{IntoService, Service, Transform}; use actix_service::{IntoService, Service, Transform};
use pin_project_lite::pin_project;
/// Applies a timeout to requests. /// Applies a timeout to requests.
#[derive(Debug)] #[derive(Debug)]
@ -77,21 +79,21 @@ impl<E> Clone for Timeout<E> {
} }
} }
impl<S, E> Transform<S> for Timeout<E> impl<S, E, Req> Transform<S, Req> for Timeout<E>
where where
S: Service, S: Service<Req>,
{ {
type Request = S::Request;
type Response = S::Response; type Response = S::Response;
type Error = TimeoutError<S::Error>; type Error = TimeoutError<S::Error>;
type Transform = TimeoutService<S>;
type InitError = E; type InitError = E;
type Transform = TimeoutService<S, Req>;
type Future = TimeoutFuture<Self::Transform, Self::InitError>; type Future = TimeoutFuture<Self::Transform, Self::InitError>;
fn new_transform(&self, service: S) -> Self::Future { fn new_transform(&self, service: S) -> Self::Future {
let service = TimeoutService { let service = TimeoutService {
service, service,
timeout: self.timeout, timeout: self.timeout,
_phantom: PhantomData,
}; };
TimeoutFuture { TimeoutFuture {
@ -118,40 +120,41 @@ impl<T, E> Future for TimeoutFuture<T, E> {
/// Applies a timeout to requests. /// Applies a timeout to requests.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct TimeoutService<S> { pub struct TimeoutService<S, Req> {
service: S, service: S,
timeout: time::Duration, timeout: time::Duration,
_phantom: PhantomData<Req>,
} }
impl<S> TimeoutService<S> impl<S, Req> TimeoutService<S, Req>
where where
S: Service, S: Service<Req>,
{ {
pub fn new<U>(timeout: time::Duration, service: U) -> Self pub fn new<U>(timeout: time::Duration, service: U) -> Self
where where
U: IntoService<S>, U: IntoService<S, Req>,
{ {
TimeoutService { TimeoutService {
timeout, timeout,
service: service.into_service(), service: service.into_service(),
_phantom: PhantomData,
} }
} }
} }
impl<S> Service for TimeoutService<S> impl<S, Req> Service<Req> for TimeoutService<S, Req>
where where
S: Service, S: Service<Req>,
{ {
type Request = S::Request;
type Response = S::Response; type Response = S::Response;
type Error = TimeoutError<S::Error>; type Error = TimeoutError<S::Error>;
type Future = TimeoutServiceResponse<S>; type Future = TimeoutServiceResponse<S, Req>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx).map_err(TimeoutError::Service) self.service.poll_ready(cx).map_err(TimeoutError::Service)
} }
fn call(&mut self, request: S::Request) -> Self::Future { fn call(&mut self, request: Req) -> Self::Future {
TimeoutServiceResponse { TimeoutServiceResponse {
fut: self.service.call(request), fut: self.service.call(request),
sleep: delay_for(self.timeout), sleep: delay_for(self.timeout),
@ -159,19 +162,22 @@ where
} }
} }
pin_project_lite::pin_project! { pin_project! {
/// `TimeoutService` response future /// `TimeoutService` response future
#[derive(Debug)] #[derive(Debug)]
pub struct TimeoutServiceResponse<T: Service> { pub struct TimeoutServiceResponse<T, Req>
where
T: Service<Req>
{
#[pin] #[pin]
fut: T::Future, fut: T::Future,
sleep: Delay, sleep: Delay,
} }
} }
impl<T> Future for TimeoutServiceResponse<T> impl<T, Req> Future for TimeoutServiceResponse<T, Req>
where where
T: Service, T: Service<Req>,
{ {
type Output = Result<T::Response, TimeoutError<T::Error>>; type Output = Result<T::Response, TimeoutError<T::Error>>;
@ -204,8 +210,7 @@ mod tests {
struct SleepService(Duration); struct SleepService(Duration);
impl Service for SleepService { impl Service<()> for SleepService {
type Request = ();
type Response = (); type Response = ();
type Error = (); type Error = ();
type Future = LocalBoxFuture<'static, Result<(), ()>>; type Future = LocalBoxFuture<'static, Result<(), ()>>;