From 934a9867f7898b1b15bc123268d516683b1a67eb Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Mon, 18 May 2020 01:50:47 +0100 Subject: [PATCH] use new codec signature in actix-utils --- actix-utils/src/framed.rs | 94 +++++++++++++++++++-------------------- actix-utils/src/lib.rs | 2 +- 2 files changed, 47 insertions(+), 49 deletions(-) diff --git a/actix-utils/src/framed.rs b/actix-utils/src/framed.rs index 1663db9d..09515956 100644 --- a/actix-utils/src/framed.rs +++ b/actix-utils/src/framed.rs @@ -6,31 +6,28 @@ use std::{fmt, mem}; use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed}; use actix_service::{IntoService, Service}; -use futures_util::{future::Future, FutureExt, stream::Stream}; +use futures_util::{future::Future, stream::Stream, FutureExt}; use log::debug; use crate::mpsc; -type Request = ::Item; -type Response = ::Item; - /// Framed transport errors -pub enum DispatcherError { +pub enum DispatcherError + Decoder, I> { Service(E), - Encoder(::Error), + Encoder(>::Error), Decoder(::Error), } -impl From for DispatcherError { +impl + Decoder, I> From for DispatcherError { fn from(err: E) -> Self { DispatcherError::Service(err) } } -impl fmt::Debug for DispatcherError +impl + Decoder, I> fmt::Debug for DispatcherError where E: fmt::Debug, - ::Error: fmt::Debug, + >::Error: fmt::Debug, ::Error: fmt::Debug, { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -42,10 +39,10 @@ where } } -impl fmt::Display for DispatcherError +impl + Decoder, I> fmt::Display for DispatcherError where E: fmt::Display, - ::Error: fmt::Debug, + >::Error: fmt::Debug, ::Error: fmt::Debug, { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -62,44 +59,44 @@ pub enum Message { Close, } -/// FramedTransport - is a future that reads frames from Framed object -/// and pass then to the service. +/// Dispatcher is a future that reads frames from Framed object +/// and passes them to the service. #[pin_project::pin_project] -pub struct Dispatcher +pub struct Dispatcher where - S: Service, Response = Response>, + S: Service::Item, Response = I>, S::Error: 'static, S::Future: 'static, T: AsyncRead + AsyncWrite, - U: Encoder + Decoder, - ::Item: 'static, - ::Error: std::fmt::Debug, + U: Encoder + Decoder, + // I: 'static, + >::Error: std::fmt::Debug, { service: S, - state: State, + state: State, #[pin] framed: Framed, - rx: mpsc::Receiver::Item>, S::Error>>, - tx: mpsc::Sender::Item>, S::Error>>, + rx: mpsc::Receiver, S::Error>>, + tx: mpsc::Sender, S::Error>>, } -enum State { +enum State + Decoder, I> { Processing, - Error(DispatcherError), - FramedError(DispatcherError), + Error(DispatcherError), + FramedError(DispatcherError), FlushAndStop, Stopping, } -impl State { - fn take_error(&mut self) -> DispatcherError { +impl + Decoder, I> State { + fn take_error(&mut self) -> DispatcherError { match mem::replace(self, State::Processing) { State::Error(err) => err, _ => panic!(), } } - fn take_framed_error(&mut self) -> DispatcherError { + fn take_framed_error(&mut self) -> DispatcherError { match mem::replace(self, State::Processing) { State::FramedError(err) => err, _ => panic!(), @@ -107,15 +104,16 @@ impl State { } } -impl Dispatcher +impl Dispatcher where - S: Service, Response = Response>, + S: Service::Item, Response = I>, S::Error: 'static, S::Future: 'static, T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, + U: Decoder + Encoder, + // I: 'static, + ::Error: std::fmt::Debug, + >::Error: std::fmt::Debug, { pub fn new>(framed: Framed, service: F) -> Self { let (tx, rx) = mpsc::channel(); @@ -132,7 +130,7 @@ where pub fn with_rx>( framed: Framed, service: F, - rx: mpsc::Receiver::Item>, S::Error>>, + rx: mpsc::Receiver, S::Error>>, ) -> Self { let tx = rx.sender(); Dispatcher { @@ -145,7 +143,7 @@ where } /// Get sink - pub fn get_sink(&self) -> mpsc::Sender::Item>, S::Error>> { + pub fn get_sink(&self) -> mpsc::Sender, S::Error>> { self.tx.clone() } @@ -172,13 +170,13 @@ where fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool where - S: Service, Response = Response>, + S: Service::Item, Response = I>, S::Error: 'static, S::Future: 'static, T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, + U: Decoder + Encoder, + I: 'static, + >::Error: std::fmt::Debug, { loop { let this = self.as_mut().project(); @@ -214,13 +212,13 @@ where /// write to framed object fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool where - S: Service, Response = Response>, + S: Service::Item, Response = I>, S::Error: 'static, S::Future: 'static, T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, + U: Decoder + Encoder, + I: 'static, + >::Error: std::fmt::Debug, { loop { let mut this = self.as_mut().project(); @@ -263,18 +261,18 @@ where } } -impl Future for Dispatcher +impl Future for Dispatcher where - S: Service, Response = Response>, + S: Service::Item, Response = I>, S::Error: 'static, S::Future: 'static, T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, - ::Item: 'static, - ::Error: std::fmt::Debug, + U: Decoder + Encoder, + I: 'static, + >::Error: std::fmt::Debug, ::Error: std::fmt::Debug, { - type Output = Result<(), DispatcherError>; + type Output = Result<(), DispatcherError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { diff --git a/actix-utils/src/lib.rs b/actix-utils/src/lib.rs index c4d56c56..3c66accc 100644 --- a/actix-utils/src/lib.rs +++ b/actix-utils/src/lib.rs @@ -1,5 +1,5 @@ //! Actix utils - various helper services -#![deny(rust_2018_idioms, warnings)] +#![deny(rust_2018_idioms)] #![allow(clippy::type_complexity)] mod cell;