From d4b772d454e512d8591ce9f1822a552209577efb Mon Sep 17 00:00:00 2001
From: Nikolay Kim <fafhrd91@gmail.com>
Date: Mon, 24 Sep 2018 20:40:31 -0700
Subject: [PATCH] simplify FramedTransport

---
 src/framed.rs | 269 +++++++++++++-------------------------------------
 1 file changed, 68 insertions(+), 201 deletions(-)

diff --git a/src/framed.rs b/src/framed.rs
index 11d4f2ab..10ea0c36 100644
--- a/src/framed.rs
+++ b/src/framed.rs
@@ -1,9 +1,9 @@
 //! Framed dispatcher service and related utilities
-use std::fmt;
 use std::marker::PhantomData;
+use std::mem;
 
 use actix;
-use futures::future::{ok, Either, FutureResult, Join};
+use futures::future::{ok, Either, FutureResult};
 use futures::unsync::mpsc;
 use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};
 use tokio_codec::{Decoder, Encoder, Framed};
@@ -14,110 +14,97 @@ use service::{IntoNewService, IntoService, NewService, Service};
 type Request<U> = <U as Decoder>::Item;
 type Response<U> = <U as Encoder>::Item;
 
-pub struct FramedNewService<S, T, U, E> {
+pub struct FramedNewService<S, T, U> {
     factory: S,
-    error_handler: E,
     _t: PhantomData<(T, U)>,
 }
 
-impl<S, T, U> FramedNewService<S, T, U, DefaultErrorHandler<S, U, S::InitError>>
+impl<S, T, U> FramedNewService<S, T, U>
 where
     T: AsyncRead + AsyncWrite,
     U: Decoder + Encoder,
     S: NewService<Request = Request<U>, Response = Option<Response<U>>> + Clone,
     <<S as NewService>::Service as Service>::Future: 'static,
-    <<S as NewService>::Service as Service>::Error: fmt::Debug + 'static,
+    <<S as NewService>::Service as Service>::Error: 'static,
     <U as Encoder>::Item: 'static,
-    <U as Encoder>::Error: fmt::Debug + 'static,
-    <U as Encoder>::Error: fmt::Debug + 'static,
+    <U as Encoder>::Error: 'static,
 {
     pub fn new<F1: IntoNewService<S>>(factory: F1) -> Self {
         Self {
             factory: factory.into_new_service(),
-            error_handler: DefaultErrorHandler(PhantomData),
             _t: PhantomData,
         }
     }
 }
 
-impl<S, T, U, E> Clone for FramedNewService<S, T, U, E>
+impl<S, T, U> Clone for FramedNewService<S, T, U>
 where
     S: Clone,
-    E: Clone,
 {
     fn clone(&self) -> Self {
         Self {
             factory: self.factory.clone(),
-            error_handler: self.error_handler.clone(),
             _t: PhantomData,
         }
     }
 }
 
-impl<S, T, U, E> NewService for FramedNewService<S, T, U, E>
+impl<S, T, U> NewService for FramedNewService<S, T, U>
 where
     T: AsyncRead + AsyncWrite,
     U: Decoder + Encoder,
     S: NewService<Request = Request<U>, Response = Option<Response<U>>> + Clone,
-    E: NewService<Request = TransportError<S::Service, U>, InitError = S::InitError> + Clone,
     <<S as NewService>::Service as Service>::Future: 'static,
-    <<S as NewService>::Service as Service>::Error: fmt::Debug + 'static,
+    <<S as NewService>::Service as Service>::Error: 'static,
     <U as Encoder>::Item: 'static,
-    <U as Decoder>::Error: fmt::Debug + 'static,
-    <U as Encoder>::Error: fmt::Debug + 'static,
+    <U as Encoder>::Error: 'static,
 {
     type Request = Framed<T, U>;
-    type Response = FramedTransport<S::Service, T, U, E::Service>;
+    type Response = FramedTransport<S::Service, T, U>;
     type Error = S::InitError;
     type InitError = S::InitError;
-    type Service = FramedService<S, T, U, E>;
+    type Service = FramedService<S, T, U>;
     type Future = FutureResult<Self::Service, Self::InitError>;
 
     fn new_service(&self) -> Self::Future {
         ok(FramedService {
             factory: self.factory.clone(),
-            error_service: self.error_handler.clone(),
             _t: PhantomData,
         })
     }
 }
 
-pub struct FramedService<S, T, U, E> {
+pub struct FramedService<S, T, U> {
     factory: S,
-    error_service: E,
     _t: PhantomData<(T, U)>,
 }
 
-impl<S, T, U, E> Clone for FramedService<S, T, U, E>
+impl<S, T, U> Clone for FramedService<S, T, U>
 where
     S: Clone,
-    E: Clone,
 {
     fn clone(&self) -> Self {
         Self {
             factory: self.factory.clone(),
-            error_service: self.error_service.clone(),
             _t: PhantomData,
         }
     }
 }
 
-impl<S, T, U, E> Service for FramedService<S, T, U, E>
+impl<S, T, U> Service for FramedService<S, T, U>
 where
     T: AsyncRead + AsyncWrite,
     U: Decoder + Encoder,
     S: NewService<Request = Request<U>, Response = Option<Response<U>>>,
-    E: NewService<Request = TransportError<S::Service, U>, InitError = S::InitError>,
     <<S as NewService>::Service as Service>::Future: 'static,
-    <<S as NewService>::Service as Service>::Error: fmt::Debug + 'static,
+    <<S as NewService>::Service as Service>::Error: 'static,
     <U as Encoder>::Item: 'static,
-    <U as Decoder>::Error: fmt::Debug + 'static,
-    <U as Encoder>::Error: fmt::Debug + 'static,
+    <U as Encoder>::Error: 'static,
 {
     type Request = Framed<T, U>;
-    type Response = FramedTransport<S::Service, T, U, E::Service>;
+    type Response = FramedTransport<S::Service, T, U>;
     type Error = S::InitError;
-    type Future = FramedServiceResponseFuture<S, T, U, E>;
+    type Future = FramedServiceResponseFuture<S, T, U>;
 
     fn poll_ready(&mut self) -> Poll<(), Self::Error> {
         Ok(Async::Ready(()))
@@ -125,129 +112,69 @@ where
 
     fn call(&mut self, req: Self::Request) -> Self::Future {
         FramedServiceResponseFuture {
-            fut: self
-                .factory
-                .new_service()
-                .join(self.error_service.new_service()),
+            fut: self.factory.new_service(),
+
             framed: Some(req),
         }
     }
 }
 
 #[doc(hidden)]
-pub struct FramedServiceResponseFuture<S, T, U, E>
+pub struct FramedServiceResponseFuture<S, T, U>
 where
     T: AsyncRead + AsyncWrite,
     U: Decoder + Encoder,
     S: NewService<Request = Request<U>, Response = Option<Response<U>>>,
-    E: NewService<Request = TransportError<S::Service, U>, InitError = S::InitError>,
     <<S as NewService>::Service as Service>::Future: 'static,
-    <<S as NewService>::Service as Service>::Error: fmt::Debug + 'static,
+    <<S as NewService>::Service as Service>::Error: 'static,
     <U as Encoder>::Item: 'static,
-    <U as Decoder>::Error: fmt::Debug + 'static,
-    <U as Encoder>::Error: fmt::Debug + 'static,
+    <U as Encoder>::Error: 'static,
 {
-    fut: Join<S::Future, E::Future>,
+    fut: S::Future,
     framed: Option<Framed<T, U>>,
 }
 
-impl<S, T, U, E> Future for FramedServiceResponseFuture<S, T, U, E>
+impl<S, T, U> Future for FramedServiceResponseFuture<S, T, U>
 where
     T: AsyncRead + AsyncWrite,
     U: Decoder + Encoder,
     S: NewService<Request = Request<U>, Response = Option<Response<U>>>,
-    E: NewService<Request = TransportError<S::Service, U>, InitError = S::InitError>,
     <<S as NewService>::Service as Service>::Future: 'static,
-    <<S as NewService>::Service as Service>::Error: fmt::Debug + 'static,
+    <<S as NewService>::Service as Service>::Error: 'static,
     <U as Encoder>::Item: 'static,
-    <U as Decoder>::Error: fmt::Debug + 'static,
-    <U as Encoder>::Error: fmt::Debug + 'static,
+    <U as Encoder>::Error: 'static,
 {
-    type Item = FramedTransport<S::Service, T, U, E::Service>;
+    type Item = FramedTransport<S::Service, T, U>;
     type Error = S::InitError;
 
     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
         match self.fut.poll()? {
             Async::NotReady => Ok(Async::NotReady),
-            Async::Ready((service, error_service)) => {
-                Ok(Async::Ready(FramedTransport::with_error_service(
-                    self.framed.take().unwrap(),
-                    service,
-                    error_service,
-                )))
-            }
+            Async::Ready(service) => Ok(Async::Ready(FramedTransport::new(
+                self.framed.take().unwrap(),
+                service,
+            ))),
         }
     }
 }
 
-pub enum TransportError<S: Service, U: Encoder + Decoder> {
+/// Framed transport errors
+pub enum FramedTransportError<S: Service, U: Encoder + Decoder> {
     Decoder(<U as Decoder>::Error),
     Encoder(<U as Encoder>::Error),
     Service(S::Error),
 }
 
-/// Default error handling service
-pub struct DefaultErrorHandler<S, U, E>(PhantomData<(S, U, E)>);
-
-impl<S, U, E> Service for DefaultErrorHandler<S, U, E>
-where
-    S: Service,
-    U: Encoder + Decoder,
-    S::Error: fmt::Debug,
-    <U as Decoder>::Error: fmt::Debug,
-    <U as Encoder>::Error: fmt::Debug,
-{
-    type Request = TransportError<S, U>;
-    type Response = ();
-    type Error = ();
-    type Future = FutureResult<Self::Response, Self::Error>;
-
-    fn poll_ready(&mut self) -> Poll<(), Self::Error> {
-        Ok(Async::Ready(()))
-    }
-
-    fn call(&mut self, req: Self::Request) -> Self::Future {
-        match req {
-            TransportError::Service(err) => debug!("Service error: {:?}", err),
-            TransportError::Decoder(err) => trace!("Service decoder error: {:?}", err),
-            TransportError::Encoder(err) => trace!("Service encoder error: {:?}", err),
-        }
-        ok(())
-    }
-}
-
-impl<S, U, E> NewService for DefaultErrorHandler<S, U, E>
-where
-    S: Service,
-    U: Encoder + Decoder,
-    S::Error: fmt::Debug,
-    <U as Decoder>::Error: fmt::Debug,
-    <U as Encoder>::Error: fmt::Debug,
-{
-    type Request = TransportError<S, U>;
-    type Response = ();
-    type Error = ();
-    type InitError = E;
-    type Service = DefaultErrorHandler<S, U, ()>;
-    type Future = FutureResult<Self::Service, Self::InitError>;
-
-    fn new_service(&self) -> Self::Future {
-        ok(DefaultErrorHandler(PhantomData))
-    }
-}
-
 /// FramedTransport - is a future that reads frames from Framed object
 /// and pass then to the service.
-pub struct FramedTransport<S, T, U, E>
+pub struct FramedTransport<S, T, U>
 where
     S: Service,
     T: AsyncRead + AsyncWrite,
     U: Encoder + Decoder,
-    E: Service,
 {
     service: S,
-    error_service: E,
-    state: TransportState<E>,
+    state: TransportState<S, U>,
     framed: Framed<T, U>,
     request: Option<Request<U>>,
     response: Option<Response<U>>,
@@ -256,23 +183,21 @@ where
     flushed: bool,
 }
 
-enum TransportState<E: Service> {
+enum TransportState<S: Service, U: Encoder + Decoder> {
     Processing,
-    Error(E::Future),
-    EncoderError(E::Future),
-    SinkFlushing,
+    Error(FramedTransportError<S, U>),
+    EncoderError(FramedTransportError<S, U>),
     Stopping,
 }
 
-impl<S, T, U> FramedTransport<S, T, U, DefaultErrorHandler<S, U, ()>>
+impl<S, T, U> FramedTransport<S, T, U>
 where
     T: AsyncRead + AsyncWrite,
     U: Decoder + Encoder,
     S: Service<Request = Request<U>, Response = Option<Response<U>>>,
     S::Future: 'static,
-    S::Error: fmt::Debug + 'static,
-    <U as Encoder>::Error: fmt::Debug + 'static,
-    <U as Decoder>::Error: fmt::Debug + 'static,
+    S::Error: 'static,
+    <U as Encoder>::Error: 'static,
 {
     pub fn new<F: IntoService<S>>(framed: Framed<T, U>, service: F) -> Self {
         let (write_tx, write_rx) = mpsc::channel(16);
@@ -281,62 +206,24 @@ where
             write_rx,
             write_tx,
             service: service.into_service(),
-            error_service: DefaultErrorHandler(PhantomData),
             state: TransportState::Processing,
             request: None,
             response: None,
             flushed: true,
         }
     }
-
-    /// Set error handler service
-    pub fn error_handler<E>(self, handler: E) -> FramedTransport<S, T, U, E>
-    where
-        E: Service<Request = TransportError<S, U>>,
-    {
-        FramedTransport {
-            framed: self.framed,
-            request: self.request,
-            service: self.service,
-            write_rx: self.write_rx,
-            write_tx: self.write_tx,
-            response: self.response,
-            flushed: self.flushed,
-            state: TransportState::Processing,
-            error_service: handler,
-        }
-    }
 }
 
-impl<S, T, U, E> FramedTransport<S, T, U, E>
+impl<S, T, U> FramedTransport<S, T, U>
 where
     T: AsyncRead + AsyncWrite,
     U: Decoder + Encoder,
     S: Service<Request = Request<U>, Response = Option<Response<U>>>,
-    E: Service<Request = TransportError<S, U>>,
     S::Future: 'static,
-    S::Error: fmt::Debug + 'static,
+    S::Error: 'static,
     <U as Encoder>::Item: 'static,
-    <U as Encoder>::Error: fmt::Debug + 'static,
-    <U as Decoder>::Error: fmt::Debug + 'static,
+    <U as Encoder>::Error: 'static,
 {
-    pub fn with_error_service<F: IntoService<S>>(
-        framed: Framed<T, U>, service: F, error_service: E,
-    ) -> Self {
-        let (write_tx, write_rx) = mpsc::channel(16);
-        FramedTransport {
-            framed,
-            write_rx,
-            write_tx,
-            error_service,
-            service: service.into_service(),
-            state: TransportState::Processing,
-            request: None,
-            response: None,
-            flushed: true,
-        }
-    }
-
     fn poll_service(&mut self) -> bool {
         match self.service.poll_ready() {
             Ok(Async::Ready(_)) => {
@@ -365,9 +252,8 @@ where
                                 return false;
                             }
                             Err(err) => {
-                                self.state = TransportState::Error(
-                                    self.error_service.call(TransportError::Service(err)),
-                                );
+                                self.state =
+                                    TransportState::Error(FramedTransportError::Service(err));
                                 return true;
                             }
                         }
@@ -375,9 +261,8 @@ where
                     match self.framed.poll() {
                         Ok(Async::Ready(Some(el))) => item = Some(el),
                         Err(err) => {
-                            self.state = TransportState::Error(
-                                self.error_service.call(TransportError::Decoder(err)),
-                            );
+                            self.state =
+                                TransportState::Error(FramedTransportError::Decoder(err));
                             return true;
                         }
                         Ok(Async::NotReady) => return false,
@@ -390,9 +275,7 @@ where
             }
             Ok(Async::NotReady) => return false,
             Err(err) => {
-                self.state = TransportState::Error(
-                    self.error_service.call(TransportError::Service(err)),
-                );
+                self.state = TransportState::Error(FramedTransportError::Service(err));
                 return true;
             }
         }
@@ -408,10 +291,8 @@ where
                     Ok(AsyncSink::Ready) => None,
                     Ok(AsyncSink::NotReady(item)) => Some(item),
                     Err(err) => {
-                        trace!("Connection error: {:?}", err);
-                        self.state = TransportState::EncoderError(
-                            self.error_service.call(TransportError::Encoder(err)),
-                        );
+                        self.state =
+                            TransportState::EncoderError(FramedTransportError::Encoder(err));
                         return true;
                     }
                 }
@@ -427,10 +308,8 @@ where
                     }
                     Ok(Async::NotReady) => break,
                     Err(err) => {
-                        trace!("Connection flush error: {:?}", err);
-                        self.state = TransportState::EncoderError(
-                            self.error_service.call(TransportError::Encoder(err)),
-                        );
+                        self.state =
+                            TransportState::EncoderError(FramedTransportError::Encoder(err));
                         return true;
                     }
                 }
@@ -443,9 +322,8 @@ where
                         Ok(Async::Ready(Some(msg))) => match msg {
                             Ok(msg) => item = Some(msg),
                             Err(err) => {
-                                self.state = TransportState::Error(
-                                    self.error_service.call(TransportError::Service(err)),
-                                );
+                                self.state =
+                                    TransportState::Error(FramedTransportError::Service(err));
                                 return true;
                             }
                         },
@@ -466,23 +344,21 @@ where
     }
 }
 
-impl<S, T, U, E> Future for FramedTransport<S, T, U, E>
+impl<S, T, U> Future for FramedTransport<S, T, U>
 where
     T: AsyncRead + AsyncWrite,
     U: Decoder + Encoder,
     S: Service<Request = Request<U>, Response = Option<Response<U>>>,
     S::Future: 'static,
-    S::Error: fmt::Debug + 'static,
-    E: Service<Request = TransportError<S, U>>,
+    S::Error: 'static,
     <U as Encoder>::Item: 'static,
-    <U as Encoder>::Error: fmt::Debug + 'static,
-    <U as Decoder>::Error: fmt::Debug + 'static,
+    <U as Encoder>::Error: 'static,
 {
     type Item = ();
-    type Error = S::Error;
+    type Error = FramedTransportError<S, U>;
 
     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
-        let state = match self.state {
+        match mem::replace(&mut self.state, TransportState::Processing) {
             TransportState::Processing => {
                 if self.poll_service() {
                     return self.poll();
@@ -492,27 +368,18 @@ where
                 }
                 return Ok(Async::NotReady);
             }
-            TransportState::Error(ref mut fut) => match fut.poll() {
-                Err(_) | Ok(Async::Ready(_)) => TransportState::SinkFlushing,
-                _ => return Ok(Async::NotReady),
-            },
-            TransportState::EncoderError(ref mut fut) => match fut.poll() {
-                Err(_) | Ok(Async::Ready(_)) => return Ok(Async::Ready(())),
-                _ => return Ok(Async::NotReady),
-            },
-            TransportState::SinkFlushing => {
+            TransportState::Error(err) => {
                 if self.poll_response() {
-                    return self.poll();
+                    return Err(err);
                 }
                 if self.flushed {
-                    return Ok(Async::Ready(()));
+                    return Err(err);
                 }
+                self.state = TransportState::Error(err);
                 return Ok(Async::NotReady);
             }
+            TransportState::EncoderError(err) => return Err(err),
             TransportState::Stopping => return Ok(Async::Ready(())),
-        };
-
-        self.state = state;
-        self.poll()
+        }
     }
 }