use new codec signature in actix-utils

This commit is contained in:
Rob Ede 2020-05-18 01:50:47 +01:00
parent 52736c6cd8
commit 934a9867f7
No known key found for this signature in database
GPG Key ID: C2A3B36E841A91E6
2 changed files with 47 additions and 49 deletions

View File

@ -6,31 +6,28 @@ use std::{fmt, mem};
use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use actix_service::{IntoService, Service};
use futures_util::{future::Future, FutureExt, stream::Stream};
use futures_util::{future::Future, stream::Stream, FutureExt};
use log::debug;
use crate::mpsc;
type Request<U> = <U as Decoder>::Item;
type Response<U> = <U as Encoder>::Item;
/// Framed transport errors
pub enum DispatcherError<E, U: Encoder + Decoder> {
pub enum DispatcherError<E, U: Encoder<I> + Decoder, I> {
Service(E),
Encoder(<U as Encoder>::Error),
Encoder(<U as Encoder<I>>::Error),
Decoder(<U as Decoder>::Error),
}
impl<E, U: Encoder + Decoder> From<E> for DispatcherError<E, U> {
impl<E, U: Encoder<I> + Decoder, I> From<E> for DispatcherError<E, U, I> {
fn from(err: E) -> Self {
DispatcherError::Service(err)
}
}
impl<E, U: Encoder + Decoder> fmt::Debug for DispatcherError<E, U>
impl<E, U: Encoder<I> + Decoder, I> fmt::Debug for DispatcherError<E, U, I>
where
E: fmt::Debug,
<U as Encoder>::Error: fmt::Debug,
<U as Encoder<I>>::Error: fmt::Debug,
<U as Decoder>::Error: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
@ -42,10 +39,10 @@ where
}
}
impl<E, U: Encoder + Decoder> fmt::Display for DispatcherError<E, U>
impl<E, U: Encoder<I> + Decoder, I> fmt::Display for DispatcherError<E, U, I>
where
E: fmt::Display,
<U as Encoder>::Error: fmt::Debug,
<U as Encoder<I>>::Error: fmt::Debug,
<U as Decoder>::Error: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
@ -62,44 +59,44 @@ pub enum Message<T> {
Close,
}
/// FramedTransport - is a future that reads frames from Framed object
/// and pass then to the service.
/// Dispatcher is a future that reads frames from Framed object
/// and passes them to the service.
#[pin_project::pin_project]
pub struct Dispatcher<S, T, U>
pub struct Dispatcher<S, T, U, I>
where
S: Service<Request = Request<U>, Response = Response<U>>,
S: Service<Request = <U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Encoder + Decoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
U: Encoder<I> + Decoder,
// I: 'static,
<U as Encoder<I>>::Error: std::fmt::Debug,
{
service: S,
state: State<S, U>,
state: State<S, U, I>,
#[pin]
framed: Framed<T, U>,
rx: mpsc::Receiver<Result<Message<<U as Encoder>::Item>, S::Error>>,
tx: mpsc::Sender<Result<Message<<U as Encoder>::Item>, S::Error>>,
rx: mpsc::Receiver<Result<Message<I>, S::Error>>,
tx: mpsc::Sender<Result<Message<I>, S::Error>>,
}
enum State<S: Service, U: Encoder + Decoder> {
enum State<S: Service, U: Encoder<I> + Decoder, I> {
Processing,
Error(DispatcherError<S::Error, U>),
FramedError(DispatcherError<S::Error, U>),
Error(DispatcherError<S::Error, U, I>),
FramedError(DispatcherError<S::Error, U, I>),
FlushAndStop,
Stopping,
}
impl<S: Service, U: Encoder + Decoder> State<S, U> {
fn take_error(&mut self) -> DispatcherError<S::Error, U> {
impl<S: Service, U: Encoder<I> + Decoder, I> State<S, U, I> {
fn take_error(&mut self) -> DispatcherError<S::Error, U, I> {
match mem::replace(self, State::Processing) {
State::Error(err) => err,
_ => panic!(),
}
}
fn take_framed_error(&mut self) -> DispatcherError<S::Error, U> {
fn take_framed_error(&mut self) -> DispatcherError<S::Error, U, I> {
match mem::replace(self, State::Processing) {
State::FramedError(err) => err,
_ => panic!(),
@ -107,15 +104,16 @@ impl<S: Service, U: Encoder + Decoder> State<S, U> {
}
}
impl<S, T, U> Dispatcher<S, T, U>
impl<S, T, U, I> Dispatcher<S, T, U, I>
where
S: Service<Request = Request<U>, Response = Response<U>>,
S: Service<Request = <U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
U: Decoder + Encoder<I>,
// I: 'static,
<U as Decoder>::Error: std::fmt::Debug,
<U as Encoder<I>>::Error: std::fmt::Debug,
{
pub fn new<F: IntoService<S>>(framed: Framed<T, U>, service: F) -> Self {
let (tx, rx) = mpsc::channel();
@ -132,7 +130,7 @@ where
pub fn with_rx<F: IntoService<S>>(
framed: Framed<T, U>,
service: F,
rx: mpsc::Receiver<Result<Message<<U as Encoder>::Item>, S::Error>>,
rx: mpsc::Receiver<Result<Message<I>, S::Error>>,
) -> Self {
let tx = rx.sender();
Dispatcher {
@ -145,7 +143,7 @@ where
}
/// Get sink
pub fn get_sink(&self) -> mpsc::Sender<Result<Message<<U as Encoder>::Item>, S::Error>> {
pub fn get_sink(&self) -> mpsc::Sender<Result<Message<I>, S::Error>> {
self.tx.clone()
}
@ -172,13 +170,13 @@ where
fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool
where
S: Service<Request = Request<U>, Response = Response<U>>,
S: Service<Request = <U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
U: Decoder + Encoder<I>,
I: 'static,
<U as Encoder<I>>::Error: std::fmt::Debug,
{
loop {
let this = self.as_mut().project();
@ -214,13 +212,13 @@ where
/// write to framed object
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool
where
S: Service<Request = Request<U>, Response = Response<U>>,
S: Service<Request = <U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
U: Decoder + Encoder<I>,
I: 'static,
<U as Encoder<I>>::Error: std::fmt::Debug,
{
loop {
let mut this = self.as_mut().project();
@ -263,18 +261,18 @@ where
}
}
impl<S, T, U> Future for Dispatcher<S, T, U>
impl<S, T, U, I> Future for Dispatcher<S, T, U, I>
where
S: Service<Request = Request<U>, Response = Response<U>>,
S: Service<Request = <U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
U: Decoder + Encoder<I>,
I: 'static,
<U as Encoder<I>>::Error: std::fmt::Debug,
<U as Decoder>::Error: std::fmt::Debug,
{
type Output = Result<(), DispatcherError<S::Error, U>>;
type Output = Result<(), DispatcherError<S::Error, U, I>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {

View File

@ -1,5 +1,5 @@
//! Actix utils - various helper services
#![deny(rust_2018_idioms, warnings)]
#![deny(rust_2018_idioms)]
#![allow(clippy::type_complexity)]
mod cell;