diff --git a/actix-codec/Cargo.toml b/actix-codec/Cargo.toml index 376e4b31..8a115060 100644 --- a/actix-codec/Cargo.toml +++ b/actix-codec/Cargo.toml @@ -19,7 +19,7 @@ path = "src/lib.rs" [dependencies] bytes = "0.4.12" -futures = "0.1.24" -tokio-io = "0.1.12" -tokio-codec = "0.1.1" -log = "0.4" \ No newline at end of file +futures = { package = "futures-preview", version = "0.3.0-alpha.18" } +tokio-io = "0.2.0-alpha.4" +tokio-codec = "0.2.0-alpha.4" +log = "0.4" diff --git a/actix-codec/src/framed.rs b/actix-codec/src/framed.rs index 7fbb4f27..796b3e1a 100644 --- a/actix-codec/src/framed.rs +++ b/actix-codec/src/framed.rs @@ -4,12 +4,14 @@ use std::fmt; use std::io::{self, Read, Write}; use bytes::BytesMut; -use futures::{Poll, Sink, StartSend, Stream}; +use futures::{Poll, Sink, Stream}; use tokio_codec::{Decoder, Encoder}; use tokio_io::{AsyncRead, AsyncWrite}; use super::framed_read::{framed_read2, framed_read2_with_buffer, FramedRead2}; use super::framed_write::{framed_write2, framed_write2_with_buffer, FramedWrite2}; +use std::pin::Pin; +use std::task::Context; const LW: usize = 1024; const HW: usize = 8 * 1024; @@ -226,36 +228,35 @@ where T: AsyncRead, U: Decoder, { - type Item = U::Item; - type Error = U::Error; + type Item = Result; - fn poll(&mut self) -> Poll, Self::Error> { - self.inner.poll() + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_next(cx )} } } -impl Sink for Framed +impl Sink for Framed where T: AsyncWrite, U: Encoder, U::Error: From, { - type SinkItem = U::Item; - type SinkError = U::Error; + type Error = U::Error; - fn start_send( - &mut self, - item: Self::SinkItem, - ) -> StartSend { - self.inner.get_mut().start_send(item) + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_ready(cx)} } - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - self.inner.get_mut().poll_complete() + fn start_send(self: Pin<&mut Self>, item: ::Item) -> Result<(), Self::Error> { + unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).start_send(item)} } - fn close(&mut self) -> Poll<(), Self::SinkError> { - self.inner.get_mut().close() + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_flush(cx)} + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| s.inner.get_mut()).poll_close(cx)} } } @@ -284,6 +285,10 @@ impl AsyncRead for Fuse { unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { self.0.prepare_uninitialized_buffer(buf) } + + fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_read(cx, buf)} + } } impl Write for Fuse { @@ -297,8 +302,16 @@ impl Write for Fuse { } impl AsyncWrite for Fuse { - fn shutdown(&mut self) -> Poll<(), io::Error> { - self.0.shutdown() + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_write(cx, buf)} + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_flush(cx)} + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.0).poll_shutdown(cx)} } } diff --git a/actix-codec/src/framed_read.rs b/actix-codec/src/framed_read.rs index f54b2c23..46a91c64 100644 --- a/actix-codec/src/framed_read.rs +++ b/actix-codec/src/framed_read.rs @@ -1,7 +1,9 @@ use std::fmt; +use std::pin::Pin; +use std::task::Context; use bytes::BytesMut; -use futures::{try_ready, Async, Poll, Sink, StartSend, Stream}; +use futures::{ready, Poll, Sink, Stream}; use log::trace; use tokio_codec::Decoder; use tokio_io::AsyncRead; @@ -83,34 +85,33 @@ where T: AsyncRead, D: Decoder, { - type Item = D::Item; - type Error = D::Error; + type Item = Result; - fn poll(&mut self) -> Poll, Self::Error> { - self.inner.poll() + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_next(cx )} } } -impl Sink for FramedRead +impl Sink for FramedRead where - T: Sink, + T: Sink, { - type SinkItem = T::SinkItem; - type SinkError = T::SinkError; + type Error = T::Error; - fn start_send( - &mut self, - item: Self::SinkItem, - ) -> StartSend { - self.inner.inner.0.start_send(item) + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).poll_ready(cx)} } - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - self.inner.inner.0.poll_complete() + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).start_send(item)} } - fn close(&mut self) -> Poll<(), Self::SinkError> { - self.inner.inner.0.close() + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).poll_flush(cx)} + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner.inner.0).poll_close(cx)} } } @@ -176,43 +177,43 @@ impl Stream for FramedRead2 where T: AsyncRead + Decoder, { - type Item = T::Item; - type Error = T::Error; + type Item = Result; - fn poll(&mut self) -> Poll, Self::Error> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = unsafe { self.get_unchecked_mut() }; loop { // Repeatedly call `decode` or `decode_eof` as long as it is // "readable". Readable is defined as not having returned `None`. If // the upstream has returned EOF, and the decoder is no longer // readable, it can be assumed that the decoder will never become // readable again, at which point the stream is terminated. - if self.is_readable { - if self.eof { - let frame = self.inner.decode_eof(&mut self.buffer)?; - return Ok(Async::Ready(frame)); + if this.is_readable { + if this.eof { + let frame= this.inner.decode_eof(&mut this.buffer)?; + return Poll::Ready(Ok(frame).transpose()); } trace!("attempting to decode a frame"); - if let Some(frame) = self.inner.decode(&mut self.buffer)? { + if let Some(frame) = this.inner.decode(&mut this.buffer)? { trace!("frame decoded from buffer"); - return Ok(Async::Ready(Some(frame))); + return Poll::Ready(Some(Ok(frame))); } - self.is_readable = false; + this.is_readable = false; } - assert!(!self.eof); + assert!(!this.eof); // Otherwise, try to read more data and try again. Make sure we've // got room for at least one byte to read to ensure that we don't // get a spurious 0 that looks like EOF - self.buffer.reserve(1); - if 0 == try_ready!(self.inner.read_buf(&mut self.buffer)) { - self.eof = true; + this.buffer.reserve(1); + if 0 == ready!( unsafe { Pin::new_unchecked(&mut this.inner)}.poll_read(cx, &mut this.buffer))? { + this.eof = true; } - self.is_readable = true; + this.is_readable = true; } } } diff --git a/actix-codec/src/framed_write.rs b/actix-codec/src/framed_write.rs index 6b5c2b19..12fe13d3 100644 --- a/actix-codec/src/framed_write.rs +++ b/actix-codec/src/framed_write.rs @@ -1,8 +1,10 @@ use std::fmt; use std::io::{self, Read}; +use std::task::Context; +use std::pin::Pin; use bytes::BytesMut; -use futures::{try_ready, Async, AsyncSink, Poll, Sink, StartSend, Stream}; +use futures::{ready, Poll, Sink, Stream}; use log::trace; use tokio_codec::{Decoder, Encoder}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -95,24 +97,27 @@ where } } -impl Sink for FramedWrite +impl Sink for FramedWrite where T: AsyncWrite, E: Encoder, { - type SinkItem = E::Item; - type SinkError = E::Error; + type Error = E::Error; - fn start_send(&mut self, item: E::Item) -> StartSend { - self.inner.start_send(item) + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe {self.map_unchecked_mut(|s| &mut s.inner)}.poll_ready(cx) } - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - self.inner.poll_complete() + fn start_send(self: Pin<&mut Self>, item: ::Item) -> Result<(), Self::Error> { + unsafe {self.map_unchecked_mut(|s| &mut s.inner)}.start_send(item) } - fn close(&mut self) -> Poll<(), Self::SinkError> { - Ok(self.inner.close()?) + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe {self.map_unchecked_mut(|s| &mut s.inner)}.poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe {self.map_unchecked_mut(|s| &mut s.inner)}.poll_close(cx) } } @@ -121,10 +126,9 @@ where T: Stream, { type Item = T::Item; - type Error = T::Error; - fn poll(&mut self) -> Poll, Self::Error> { - self.inner.inner.0.poll() + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe {self.map_unchecked_mut(|s| &mut s.inner.inner.0)}.poll_next(cx) } } @@ -220,60 +224,70 @@ where } } -impl Sink for FramedWrite2 +impl Sink for FramedWrite2 where T: AsyncWrite + Encoder, { - type SinkItem = T::Item; - type SinkError = T::Error; + type Error = T::Error; - fn start_send(&mut self, item: T::Item) -> StartSend { - // Check the buffer capacity + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { let len = self.buffer.len(); if len >= self.high_watermark { - return Ok(AsyncSink::NotReady(item)); + return Poll::Pending; + } else { + return Poll::Ready(Ok(())); } - if len < self.low_watermark { - self.buffer.reserve(self.high_watermark - len) - } - - self.inner.encode(item, &mut self.buffer)?; - - Ok(AsyncSink::Ready) } - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + fn start_send(self: Pin<&mut Self>, item: ::Item) -> Result<(), Self::Error> { + let this = unsafe { self.get_unchecked_mut() }; + // Check the buffer capacity + let len = this.buffer.len(); + if len < this.low_watermark { + this.buffer.reserve(this.high_watermark - len) + } + + this.inner.encode(item, &mut this.buffer)?; + + Ok(()) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = unsafe { self.get_unchecked_mut() }; trace!("flushing framed transport"); - while !self.buffer.is_empty() { - trace!("writing; remaining={}", self.buffer.len()); + while !this.buffer.is_empty() { + trace!("writing; remaining={}", this.buffer.len()); - let n = try_ready!(self.inner.poll_write(&self.buffer)); + let n = ready!(unsafe {Pin::new_unchecked(&mut this.inner)}.poll_write(cx, &this.buffer))?; if n == 0 { - return Err(io::Error::new( + return Poll::Ready(Err(io::Error::new( io::ErrorKind::WriteZero, "failed to \ write frame to transport", ) - .into()); + .into())) } // TODO: Add a way to `bytes` to do this w/o returning the drained // data. - let _ = self.buffer.split_to(n); + let _ = this.buffer.split_to(n); } // Try flushing the underlying IO - try_ready!(self.inner.poll_flush()); + ready!(unsafe {Pin::new_unchecked(&mut this.inner)}.poll_flush(cx))?; trace!("framed transport flushed"); - Ok(Async::Ready(())) + Poll::Ready(Ok(())) } - fn close(&mut self) -> Poll<(), Self::SinkError> { - try_ready!(self.poll_complete()); - Ok(self.inner.shutdown()?) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = unsafe { self.get_unchecked_mut() }; + ready!(unsafe {Pin::new_unchecked(&mut this).map_unchecked_mut(|s|*s)}.poll_flush(cx))?; + ready!(unsafe {Pin::new_unchecked(&mut this.inner)}.poll_shutdown(cx))?; + + Poll::Ready(Ok(())) } } @@ -300,4 +314,8 @@ impl AsyncRead for FramedWrite2 { unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { self.inner.prepare_uninitialized_buffer(buf) } + + fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + unsafe { self.map_unchecked_mut(|s| &mut s.inner).poll_read(cx, buf)} + } } diff --git a/actix-codec/src/lib.rs b/actix-codec/src/lib.rs index 77102a8c..8de83c88 100644 --- a/actix-codec/src/lib.rs +++ b/actix-codec/src/lib.rs @@ -21,4 +21,5 @@ pub use self::framed_read::FramedRead; pub use self::framed_write::FramedWrite; pub use tokio_codec::{Decoder, Encoder}; +// TODO: Possibly migrate to futures' definition AsyncRead pub use tokio_io::{AsyncRead, AsyncWrite};