fix according to review

This commit is contained in:
fakeshadow 2020-10-31 19:09:13 +08:00
parent 6620a8fb4b
commit a55c34c3f3
7 changed files with 26 additions and 29 deletions

View File

@ -17,12 +17,10 @@ path = "src/lib.rs"
[dependencies] [dependencies]
bitflags = "1.2.1" bitflags = "1.2.1"
bytes = "0.6"
# ToDo: update to bytes 0.6 when tokio-util is bytes 0.6 compat
bytes = "0.5"
futures-core = { version = "0.3.4", default-features = false } futures-core = { version = "0.3.4", default-features = false }
futures-sink = { version = "0.3.4", default-features = false } futures-sink = { version = "0.3.4", default-features = false }
log = "0.4" log = "0.4"
pin-project = "1.0.0" pin-project = "1.0.0"
tokio = "0.3.1" tokio = "0.3.1"
tokio-util = { version = "0.4.0", default-features = false, features = ["codec"] } tokio-util = { version = "0.5.0", default-features = false, features = ["codec"] }

View File

@ -8,6 +8,7 @@ use futures_sink::Sink;
use pin_project::pin_project; use pin_project::pin_project;
use crate::{AsyncRead, AsyncWrite, Decoder, Encoder, ReadBuf}; use crate::{AsyncRead, AsyncWrite, Decoder, Encoder, ReadBuf};
use std::mem::MaybeUninit;
/// Low-water mark /// Low-water mark
const LW: usize = 1024; const LW: usize = 1024;
@ -221,21 +222,28 @@ impl<T, U> Framed<T, U> {
this.read_buf.reserve(HW - remaining) this.read_buf.reserve(HW - remaining)
} }
// FixMe: Is this the right way to do it for now? // FIXME: Is this the right way to do it for now? (line 225 - 244)
let mut read = ReadBuf::uninit(this.read_buf.bytes_mut()); let dst = this.read_buf.bytes_mut();
let cnt = match this.io.poll_read(cx, &mut read) { // SAFETY: `BufMut::bytes_mut` only return an empty slice when remaining is 0.
let dst = unsafe { &mut *(dst as *mut _ as *mut [MaybeUninit<u8>]) };
let mut buf = ReadBuf::uninit(dst);
let cnt = match this.io.poll_read(cx, &mut buf) {
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))), Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
Poll::Ready(Ok(())) => read.filled().len(), Poll::Ready(Ok(())) => buf.filled().len(),
}; };
if cnt == 0 { if cnt == 0 {
this.flags.insert(Flags::EOF); this.flags.insert(Flags::EOF);
} else { } else {
// SAFETY: This is guaranteed to be the number of initialized (and read)
// bytes due to the invariants provided by `ReadBuf::filled`.
unsafe { unsafe {
this.read_buf.advance_mut(cnt); this.read_buf.advance_mut(cnt);
} }
} }
this.flags.insert(Flags::READABLE); this.flags.insert(Flags::READABLE);
} }
} }

View File

@ -35,7 +35,7 @@ slab = "0.4"
[dev-dependencies] [dev-dependencies]
actix-testing = "1.0.0" actix-testing = "1.0.0"
bytes = "0.5" bytes = "0.6"
env_logger = "0.7" env_logger = "0.7"
futures-util = { version = "0.3.4", default-features = false, features = ["sink"] } futures-util = { version = "0.3.4", default-features = false, features = ["sink"] }
tokio = { version = "0.3.1", features = ["full"] } tokio = { version = "0.3.1", features = ["full"] }

View File

@ -52,8 +52,7 @@ async fn main() -> io::Result<()> {
let mut buf = BytesMut::new(); let mut buf = BytesMut::new();
loop { loop {
// ToDo: change to read_buf match stream.read_buf(&mut buf).await {
match stream.read(&mut buf).await {
// end of stream; bail from loop // end of stream; bail from loop
Ok(0) => break, Ok(0) => break,
@ -73,7 +72,7 @@ async fn main() -> io::Result<()> {
} }
// send data down service pipeline // send data down service pipeline
Ok((buf.len(), size)) Ok((buf.freeze(), size))
} }
}) })
.map_err(|err| error!("Service Error: {:?}", err)) .map_err(|err| error!("Service Error: {:?}", err))

View File

@ -39,9 +39,9 @@ pub(crate) struct AcceptLoop {
impl AcceptLoop { impl AcceptLoop {
pub fn new(srv: Server) -> Self { pub fn new(srv: Server) -> Self {
let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create mio::Poll: {}", e)); let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e));
let waker = WakerQueue::with_capacity(poll.registry(), 128) let waker = WakerQueue::with_capacity(poll.registry(), 128)
.unwrap_or_else(|e| panic!("Can not create mio::Waker: {}", e)); .unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e));
Self { Self {
srv: Some(srv), srv: Some(srv),

View File

@ -3,7 +3,6 @@ pub(crate) use std::net::{
}; };
pub(crate) use mio::net::{TcpListener as MioTcpListener, TcpSocket as MioTcpSocket}; pub(crate) use mio::net::{TcpListener as MioTcpListener, TcpSocket as MioTcpSocket};
#[cfg(unix)] #[cfg(unix)]
pub(crate) use { pub(crate) use {
mio::net::UnixListener as MioUnixListener, mio::net::UnixListener as MioUnixListener,
@ -26,10 +25,6 @@ use {
std::os::unix::io::{FromRawFd, IntoRawFd}, std::os::unix::io::{FromRawFd, IntoRawFd},
}; };
/// socket module contains a unified wrapper for Tcp/Uds listener/SocketAddr/Stream and necessary
/// trait impl for registering the listener to mio::Poll and convert stream to
/// `actix_rt::net::{TcpStream, UnixStream}`.
pub(crate) enum MioListener { pub(crate) enum MioListener {
Tcp(MioTcpListener), Tcp(MioTcpListener),
#[cfg(unix)] #[cfg(unix)]
@ -175,15 +170,14 @@ pub trait FromStream: Sized {
fn from_mio(sock: MioStream) -> io::Result<Self>; fn from_mio(sock: MioStream) -> io::Result<Self>;
} }
// ToDo: This is a workaround and we need an efficient way to convert between mio and tokio stream // FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream
#[cfg(unix)] #[cfg(unix)]
impl FromStream for TcpStream { impl FromStream for TcpStream {
fn from_mio(sock: MioStream) -> io::Result<Self> { fn from_mio(sock: MioStream) -> io::Result<Self> {
match sock { match sock {
MioStream::Tcp(mio) => { MioStream::Tcp(mio) => {
let raw = IntoRawFd::into_raw_fd(mio); let raw = IntoRawFd::into_raw_fd(mio);
// # Safety: // SAFETY: This is a in place conversion from mio stream to tokio stream.
// This is a in place conversion from mio stream to tokio stream.
TcpStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) }) TcpStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) })
} }
MioStream::Uds(_) => { MioStream::Uds(_) => {
@ -193,22 +187,21 @@ impl FromStream for TcpStream {
} }
} }
// ToDo: This is a workaround and we need an efficient way to convert between mio and tokio stream // FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream
#[cfg(windows)] #[cfg(windows)]
impl FromStream for TcpStream { impl FromStream for TcpStream {
fn from_mio(sock: MioStream) -> io::Result<Self> { fn from_mio(sock: MioStream) -> io::Result<Self> {
match sock { match sock {
MioStream::Tcp(mio) => { MioStream::Tcp(mio) => {
let raw = IntoRawSocket::into_raw_socket(mio); let raw = IntoRawSocket::into_raw_socket(mio);
// # Safety: // SAFETY: This is a in place conversion from mio stream to tokio stream.
// This is a in place conversion from mio stream to tokio stream.
TcpStream::from_std(unsafe { FromRawSocket::from_raw_socket(raw) }) TcpStream::from_std(unsafe { FromRawSocket::from_raw_socket(raw) })
} }
} }
} }
} }
// ToDo: This is a workaround and we need an efficient way to convert between mio and tokio stream // FIXME: This is a workaround and we need an efficient way to convert between mio and tokio stream
#[cfg(unix)] #[cfg(unix)]
impl FromStream for UnixStream { impl FromStream for UnixStream {
fn from_mio(sock: MioStream) -> io::Result<Self> { fn from_mio(sock: MioStream) -> io::Result<Self> {
@ -216,8 +209,7 @@ impl FromStream for UnixStream {
MioStream::Tcp(_) => panic!("Should not happen, bug in server impl"), MioStream::Tcp(_) => panic!("Should not happen, bug in server impl"),
MioStream::Uds(mio) => { MioStream::Uds(mio) => {
let raw = IntoRawFd::into_raw_fd(mio); let raw = IntoRawFd::into_raw_fd(mio);
// # Safety: // SAFETY: This is a in place conversion from mio stream to tokio stream.
// This is a in place conversion from mio stream to tokio stream.
UnixStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) }) UnixStream::from_std(unsafe { FromRawFd::from_raw_fd(raw) })
} }
} }

View File

@ -43,7 +43,7 @@ impl WakerQueue {
pub(crate) fn wake(&self, interest: WakerInterest) { pub(crate) fn wake(&self, interest: WakerInterest) {
let (waker, queue) = self.deref(); let (waker, queue) = self.deref();
// ToDo: should we handle error here? // FIXME: should we handle error here?
queue queue
.push(interest) .push(interest)
.unwrap_or_else(|e| panic!("WakerQueue overflow: {}", e)); .unwrap_or_else(|e| panic!("WakerQueue overflow: {}", e));