diff --git a/actix-codec/Cargo.toml b/actix-codec/Cargo.toml index 1ae07dfb..e9672bf9 100644 --- a/actix-codec/Cargo.toml +++ b/actix-codec/Cargo.toml @@ -17,12 +17,10 @@ path = "src/lib.rs" [dependencies] bitflags = "1.2.1" - -# ToDo: update to bytes 0.6 when tokio-util is bytes 0.6 compat -bytes = "0.5" +bytes = "0.6" futures-core = { version = "0.3.4", default-features = false } futures-sink = { version = "0.3.4", default-features = false } log = "0.4" pin-project = "1.0.0" tokio = "0.3.1" -tokio-util = { version = "0.4.0", default-features = false, features = ["codec"] } +tokio-util = { version = "0.5.0", default-features = false, features = ["codec"] } diff --git a/actix-codec/src/framed.rs b/actix-codec/src/framed.rs index 758cdeed..7bfc2d9a 100644 --- a/actix-codec/src/framed.rs +++ b/actix-codec/src/framed.rs @@ -8,6 +8,7 @@ use futures_sink::Sink; use pin_project::pin_project; use crate::{AsyncRead, AsyncWrite, Decoder, Encoder, ReadBuf}; +use std::mem::MaybeUninit; /// Low-water mark const LW: usize = 1024; @@ -221,21 +222,28 @@ impl Framed { this.read_buf.reserve(HW - remaining) } - // FixMe: Is this the right way to do it for now? - let mut read = ReadBuf::uninit(this.read_buf.bytes_mut()); - let cnt = match this.io.poll_read(cx, &mut read) { + // FIXME: Is this the right way to do it for now? (line 225 - 244) + let dst = this.read_buf.bytes_mut(); + // SAFETY: `BufMut::bytes_mut` only return an empty slice when remaining is 0. + let dst = unsafe { &mut *(dst as *mut _ as *mut [MaybeUninit]) }; + let mut buf = ReadBuf::uninit(dst); + + let cnt = match this.io.poll_read(cx, &mut buf) { Poll::Pending => return Poll::Pending, Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))), - Poll::Ready(Ok(())) => read.filled().len(), + Poll::Ready(Ok(())) => buf.filled().len(), }; if cnt == 0 { this.flags.insert(Flags::EOF); } else { + // SAFETY: This is guaranteed to be the number of initialized (and read) + // bytes due to the invariants provided by `ReadBuf::filled`. unsafe { this.read_buf.advance_mut(cnt); } } + this.flags.insert(Flags::READABLE); } } diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index e2e74893..1e47f92e 100644 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -35,7 +35,7 @@ slab = "0.4" [dev-dependencies] actix-testing = "1.0.0" -bytes = "0.5" +bytes = "0.6" env_logger = "0.7" futures-util = { version = "0.3.4", default-features = false, features = ["sink"] } tokio = { version = "0.3.1", features = ["full"] } diff --git a/actix-server/examples/basic.rs b/actix-server/examples/basic.rs index 3f4d87cf..45e473a9 100644 --- a/actix-server/examples/basic.rs +++ b/actix-server/examples/basic.rs @@ -52,8 +52,7 @@ async fn main() -> io::Result<()> { let mut buf = BytesMut::new(); loop { - // ToDo: change to read_buf - match stream.read(&mut buf).await { + match stream.read_buf(&mut buf).await { // end of stream; bail from loop Ok(0) => break, @@ -73,7 +72,7 @@ async fn main() -> io::Result<()> { } // send data down service pipeline - Ok((buf.len(), size)) + Ok((buf.freeze(), size)) } }) .map_err(|err| error!("Service Error: {:?}", err)) diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index c40bcd07..1f084559 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -39,9 +39,9 @@ pub(crate) struct AcceptLoop { impl AcceptLoop { pub fn new(srv: Server) -> Self { - let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create mio::Poll: {}", e)); + let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e)); let waker = WakerQueue::with_capacity(poll.registry(), 128) - .unwrap_or_else(|e| panic!("Can not create mio::Waker: {}", e)); + .unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e)); Self { srv: Some(srv), diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index 6eb67d3c..416e253b 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -3,7 +3,6 @@ pub(crate) use std::net::{ }; pub(crate) use mio::net::{TcpListener as MioTcpListener, TcpSocket as MioTcpSocket}; - #[cfg(unix)] pub(crate) use { mio::net::UnixListener as MioUnixListener, @@ -26,10 +25,6 @@ use { std::os::unix::io::{FromRawFd, IntoRawFd}, }; -/// socket module contains a unified wrapper for Tcp/Uds listener/SocketAddr/Stream and necessary -/// trait impl for registering the listener to mio::Poll and convert stream to -/// `actix_rt::net::{TcpStream, UnixStream}`. - pub(crate) enum MioListener { Tcp(MioTcpListener), #[cfg(unix)] @@ -175,15 +170,14 @@ pub trait FromStream: Sized { fn from_mio(sock: MioStream) -> io::Result; } -// ToDo: This is a workaround and we need an efficient way to convert between mio and tokio stream +// FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream #[cfg(unix)] impl FromStream for TcpStream { fn from_mio(sock: MioStream) -> io::Result { match sock { MioStream::Tcp(mio) => { let raw = IntoRawFd::into_raw_fd(mio); - // # Safety: - // This is a in place conversion from mio stream to tokio stream. + // SAFETY: This is a in place conversion from mio stream to tokio stream. TcpStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) }) } MioStream::Uds(_) => { @@ -193,22 +187,21 @@ impl FromStream for TcpStream { } } -// ToDo: This is a workaround and we need an efficient way to convert between mio and tokio stream +// FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream #[cfg(windows)] impl FromStream for TcpStream { fn from_mio(sock: MioStream) -> io::Result { match sock { MioStream::Tcp(mio) => { let raw = IntoRawSocket::into_raw_socket(mio); - // # Safety: - // This is a in place conversion from mio stream to tokio stream. + // SAFETY: This is a in place conversion from mio stream to tokio stream. TcpStream::from_std(unsafe { FromRawSocket::from_raw_socket(raw) }) } } } } -// ToDo: This is a workaround and we need an efficient way to convert between mio and tokio stream +// FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream #[cfg(unix)] impl FromStream for UnixStream { fn from_mio(sock: MioStream) -> io::Result { @@ -216,8 +209,7 @@ impl FromStream for UnixStream { MioStream::Tcp(_) => panic!("Should not happen, bug in server impl"), MioStream::Uds(mio) => { let raw = IntoRawFd::into_raw_fd(mio); - // # Safety: - // This is a in place conversion from mio stream to tokio stream. + // SAFETY: This is a in place conversion from mio stream to tokio stream. UnixStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) }) } } diff --git a/actix-server/src/waker_queue.rs b/actix-server/src/waker_queue.rs index fadfee15..105ef246 100644 --- a/actix-server/src/waker_queue.rs +++ b/actix-server/src/waker_queue.rs @@ -43,7 +43,7 @@ impl WakerQueue { pub(crate) fn wake(&self, interest: WakerInterest) { let (waker, queue) = self.deref(); - // ToDo: should we handle error here? + // FIXME: should we handle error here? queue .push(interest) .unwrap_or_else(|e| panic!("WakerQueue overflow: {}", e));