diff --git a/actix-codec/Cargo.toml b/actix-codec/Cargo.toml index 1d9a669b..8b14dd70 100644 --- a/actix-codec/Cargo.toml +++ b/actix-codec/Cargo.toml @@ -21,7 +21,7 @@ bitflags = "1.2.1" bytes = "0.5.2" futures-core = { version = "0.3.4", default-features = false } futures-sink = { version = "0.3.4", default-features = false } -tokio = { version = "0.2.4", default-features=false } -tokio-util = { version = "0.2.0", default-features=false, features=["codec"] } +tokio = { version = "0.2.5", default-features = false } +tokio-util = { version = "0.3.1", default-features = false, features = ["codec"] } log = "0.4" pin-project = "0.4.17" diff --git a/actix-codec/src/bcodec.rs b/actix-codec/src/bcodec.rs index c71c0fa4..4d92ee69 100644 --- a/actix-codec/src/bcodec.rs +++ b/actix-codec/src/bcodec.rs @@ -9,8 +9,7 @@ use super::{Decoder, Encoder}; #[derive(Debug, Copy, Clone)] pub struct BytesCodec; -impl Encoder for BytesCodec { - type Item = Bytes; +impl Encoder for BytesCodec { type Error = io::Error; fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> { diff --git a/actix-codec/src/framed.rs b/actix-codec/src/framed.rs index 685d87c9..cba5dd3c 100644 --- a/actix-codec/src/framed.rs +++ b/actix-codec/src/framed.rs @@ -9,7 +9,9 @@ use pin_project::pin_project; use crate::{AsyncRead, AsyncWrite, Decoder, Encoder}; +/// Low-water mark const LW: usize = 1024; +/// High-water mark const HW: usize = 8 * 1024; bitflags::bitflags! { @@ -34,7 +36,7 @@ pub struct Framed { impl Framed where T: AsyncRead + AsyncWrite, - U: Decoder + Encoder, + U: Decoder, { /// Provides a `Stream` and `Sink` interface for reading and writing to this /// `Io` object, using `Decode` and `Encode` to read and write the raw data. @@ -129,7 +131,7 @@ impl Framed { } /// Consume the `Frame`, returning `Frame` with different codec. - pub fn into_framed(self, codec: U2) -> Framed { + pub fn into_framed(self, codec: U2) -> Framed { Framed { codec, io: self.io, @@ -140,7 +142,7 @@ impl Framed { } /// Consume the `Frame`, returning `Frame` with different io. - pub fn map_io(self, f: F) -> Framed + pub fn map_io(self, f: F) -> Framed where F: Fn(T) -> T2, { @@ -154,7 +156,7 @@ impl Framed { } /// Consume the `Frame`, returning `Frame` with different codec. - pub fn map_codec(self, f: F) -> Framed + pub fn map_codec(self, f: F) -> Framed where F: Fn(U) -> U2, { @@ -186,10 +188,10 @@ impl Framed { impl Framed { /// Serialize item and Write to the inner buffer - pub fn write(mut self: Pin<&mut Self>, item: ::Item) -> Result<(), ::Error> + pub fn write(mut self: Pin<&mut Self>, item: I) -> Result<(), >::Error> where T: AsyncWrite, - U: Encoder, + U: Encoder, { let this = self.as_mut().project(); let remaining = this.write_buf.capacity() - this.write_buf.len(); @@ -209,7 +211,10 @@ impl Framed { } /// Try to read underlying I/O stream and decode item. - pub fn next_item(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>> + pub fn next_item( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll::Item, U::Error>>> where T: AsyncRead, U: Decoder, @@ -266,10 +271,10 @@ impl Framed { } /// Flush write buffer to underlying I/O stream. - pub fn flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> + pub fn flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> where T: AsyncWrite, - U: Encoder, + U: Encoder, { let mut this = self.as_mut().project(); log::trace!("flushing framed transport"); @@ -277,9 +282,7 @@ impl Framed { while !this.write_buf.is_empty() { log::trace!("writing; remaining={}", this.write_buf.len()); - let n = ready!( - this.io.as_mut().poll_write(cx, this.write_buf) - )?; + let n = ready!(this.io.as_mut().poll_write(cx, this.write_buf))?; if n == 0 { return Poll::Ready(Err(io::Error::new( @@ -301,10 +304,10 @@ impl Framed { } /// Flush write buffer and shutdown underlying I/O stream. - pub fn close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> + pub fn close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> where T: AsyncWrite, - U: Encoder, + U: Encoder, { let mut this = self.as_mut().project(); ready!(this.io.as_mut().poll_flush(cx))?; @@ -325,10 +328,10 @@ where } } -impl Sink for Framed +impl Sink for Framed where T: AsyncWrite, - U: Encoder, + U: Encoder, U::Error: From, { type Error = U::Error; @@ -341,24 +344,15 @@ where } } - fn start_send( - self: Pin<&mut Self>, - item: ::Item, - ) -> Result<(), Self::Error> { + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { self.write(item) } - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.flush(cx) } - fn poll_close( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.close(cx) } } diff --git a/actix-codec/src/lib.rs b/actix-codec/src/lib.rs index 7c38bdf7..3035be13 100644 --- a/actix-codec/src/lib.rs +++ b/actix-codec/src/lib.rs @@ -8,7 +8,7 @@ //! [`AsyncWrite`]: AsyncWrite //! [`Sink`]: futures_sink::Sink //! [`Stream`]: futures_core::Stream -#![deny(rust_2018_idioms, warnings)] +#![deny(rust_2018_idioms)] mod bcodec; mod framed;