upgrade tokio-util to 0.3 and alter actix-codex

This commit is contained in:
Rob Ede 2020-05-18 01:50:12 +01:00
parent 61176f6410
commit 52736c6cd8
No known key found for this signature in database
GPG Key ID: C2A3B36E841A91E6
4 changed files with 26 additions and 33 deletions

View File

@ -21,7 +21,7 @@ bitflags = "1.2.1"
bytes = "0.5.2"
futures-core = { version = "0.3.4", default-features = false }
futures-sink = { version = "0.3.4", default-features = false }
tokio = { version = "0.2.4", default-features=false }
tokio-util = { version = "0.2.0", default-features=false, features=["codec"] }
tokio = { version = "0.2.5", default-features = false }
tokio-util = { version = "0.3.1", default-features = false, features = ["codec"] }
log = "0.4"
pin-project = "0.4.17"

View File

@ -9,8 +9,7 @@ use super::{Decoder, Encoder};
#[derive(Debug, Copy, Clone)]
pub struct BytesCodec;
impl Encoder for BytesCodec {
type Item = Bytes;
impl Encoder<Bytes> for BytesCodec {
type Error = io::Error;
fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> {

View File

@ -9,7 +9,9 @@ use pin_project::pin_project;
use crate::{AsyncRead, AsyncWrite, Decoder, Encoder};
/// Low-water mark
const LW: usize = 1024;
/// High-water mark
const HW: usize = 8 * 1024;
bitflags::bitflags! {
@ -34,7 +36,7 @@ pub struct Framed<T, U> {
impl<T, U> Framed<T, U>
where
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
U: Decoder,
{
/// Provides a `Stream` and `Sink` interface for reading and writing to this
/// `Io` object, using `Decode` and `Encode` to read and write the raw data.
@ -129,7 +131,7 @@ impl<T, U> Framed<T, U> {
}
/// Consume the `Frame`, returning `Frame` with different codec.
pub fn into_framed<U2>(self, codec: U2) -> Framed<T, U2> {
pub fn into_framed<U2, I2>(self, codec: U2) -> Framed<T, U2> {
Framed {
codec,
io: self.io,
@ -140,7 +142,7 @@ impl<T, U> Framed<T, U> {
}
/// Consume the `Frame`, returning `Frame` with different io.
pub fn map_io<F, T2>(self, f: F) -> Framed<T2, U>
pub fn map_io<F, T2, I2>(self, f: F) -> Framed<T2, U>
where
F: Fn(T) -> T2,
{
@ -154,7 +156,7 @@ impl<T, U> Framed<T, U> {
}
/// Consume the `Frame`, returning `Frame` with different codec.
pub fn map_codec<F, U2>(self, f: F) -> Framed<T, U2>
pub fn map_codec<F, U2, I2>(self, f: F) -> Framed<T, U2>
where
F: Fn(U) -> U2,
{
@ -186,10 +188,10 @@ impl<T, U> Framed<T, U> {
impl<T, U> Framed<T, U> {
/// Serialize item and Write to the inner buffer
pub fn write(mut self: Pin<&mut Self>, item: <U as Encoder>::Item) -> Result<(), <U as Encoder>::Error>
pub fn write<I>(mut self: Pin<&mut Self>, item: I) -> Result<(), <U as Encoder<I>>::Error>
where
T: AsyncWrite,
U: Encoder,
U: Encoder<I>,
{
let this = self.as_mut().project();
let remaining = this.write_buf.capacity() - this.write_buf.len();
@ -209,7 +211,10 @@ impl<T, U> Framed<T, U> {
}
/// Try to read underlying I/O stream and decode item.
pub fn next_item(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<U::Item, U::Error>>>
pub fn next_item(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<<U as Decoder>::Item, U::Error>>>
where
T: AsyncRead,
U: Decoder,
@ -266,10 +271,10 @@ impl<T, U> Framed<T, U> {
}
/// Flush write buffer to underlying I/O stream.
pub fn flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), U::Error>>
pub fn flush<I>(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), U::Error>>
where
T: AsyncWrite,
U: Encoder,
U: Encoder<I>,
{
let mut this = self.as_mut().project();
log::trace!("flushing framed transport");
@ -277,9 +282,7 @@ impl<T, U> Framed<T, U> {
while !this.write_buf.is_empty() {
log::trace!("writing; remaining={}", this.write_buf.len());
let n = ready!(
this.io.as_mut().poll_write(cx, this.write_buf)
)?;
let n = ready!(this.io.as_mut().poll_write(cx, this.write_buf))?;
if n == 0 {
return Poll::Ready(Err(io::Error::new(
@ -301,10 +304,10 @@ impl<T, U> Framed<T, U> {
}
/// Flush write buffer and shutdown underlying I/O stream.
pub fn close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), U::Error>>
pub fn close<I>(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), U::Error>>
where
T: AsyncWrite,
U: Encoder,
U: Encoder<I>,
{
let mut this = self.as_mut().project();
ready!(this.io.as_mut().poll_flush(cx))?;
@ -325,10 +328,10 @@ where
}
}
impl<T, U> Sink<U::Item> for Framed<T, U>
impl<T, U, I> Sink<I> for Framed<T, U>
where
T: AsyncWrite,
U: Encoder,
U: Encoder<I>,
U::Error: From<io::Error>,
{
type Error = U::Error;
@ -341,24 +344,15 @@ where
}
}
fn start_send(
self: Pin<&mut Self>,
item: <U as Encoder>::Item,
) -> Result<(), Self::Error> {
fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
self.write(item)
}
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.flush(cx)
}
fn poll_close(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.close(cx)
}
}

View File

@ -8,7 +8,7 @@
//! [`AsyncWrite`]: AsyncWrite
//! [`Sink`]: futures_sink::Sink
//! [`Stream`]: futures_core::Stream
#![deny(rust_2018_idioms, warnings)]
#![deny(rust_2018_idioms)]
mod bcodec;
mod framed;