diff --git a/actix-codec/src/framed.rs b/actix-codec/src/framed.rs index 37beaac9..9ab9969a 100644 --- a/actix-codec/src/framed.rs +++ b/actix-codec/src/framed.rs @@ -7,7 +7,7 @@ use futures_core::{ready, Stream}; use futures_sink::Sink; use pin_project::pin_project; -use crate::{AsyncRead, AsyncWrite, Decoder, Encoder}; +use crate::{AsyncRead, AsyncWrite, Decoder, Encoder, ReadBuf}; /// Low-water mark const LW: usize = 1024; @@ -221,13 +221,13 @@ impl Framed { this.read_buf.reserve(HW - remaining) } - // FixMe: This must be fixed as `poll_read_buf` is removed - // let cnt = match this.io.poll_read(cx, &mut this.read_buf) { - // Poll::Pending => return Poll::Pending, - // Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))), - // Poll::Ready(Ok(cnt)) => cnt, - // }; - let cnt = 0; + // FixMe: Is this the right way to do it for now? + let mut buf = ReadBuf::new(&mut this.read_buf); + let cnt = match this.io.poll_read(cx, &mut buf) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))), + Poll::Ready(Ok(())) => buf.filled().len(), + }; if cnt == 0 { this.flags.insert(Flags::EOF); diff --git a/actix-codec/src/lib.rs b/actix-codec/src/lib.rs index d972763e..b5f32066 100644 --- a/actix-codec/src/lib.rs +++ b/actix-codec/src/lib.rs @@ -18,5 +18,5 @@ mod framed; pub use self::bcodec::BytesCodec; pub use self::framed::{Framed, FramedParts}; -pub use tokio::io::{AsyncRead, AsyncWrite}; +pub use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; pub use tokio_util::codec::{Decoder, Encoder}; diff --git a/actix-connect/Cargo.toml b/actix-connect/Cargo.toml index c423981f..425e3605 100644 --- a/actix-connect/Cargo.toml +++ b/actix-connect/Cargo.toml @@ -50,7 +50,7 @@ tokio-openssl = { version = "0.5.0", optional = true } # rustls rust-tls = { package = "rustls", version = "0.18.0", optional = true } -tokio-rustls = { version = "0.14.0", optional = true } +tokio-rustls = { version = "0.20.0", optional = true } webpki = { version = "0.21", optional = true } [dev-dependencies] diff --git a/actix-server/examples/basic.rs b/actix-server/examples/basic.rs index 9a442f99..b60a6a05 100644 --- a/actix-server/examples/basic.rs +++ b/actix-server/examples/basic.rs @@ -18,10 +18,9 @@ use std::{env, io}; use actix_rt::net::TcpStream; use actix_server::Server; use actix_service::pipeline_factory; -use bytes::BytesMut; use futures_util::future::ok; use log::{error, info}; -use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadBuf}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; #[actix_rt::main] async fn main() -> io::Result<()> { @@ -49,7 +48,8 @@ async fn main() -> io::Result<()> { let num = num + 1; let mut size = 0; - let mut buf = Vec::new(); + // FixMe: BytesMut and Vec are not working? + let mut buf = [0; 1024]; loop { match stream.read(&mut buf).await { diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index a8710c4e..a78e3ea6 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -130,7 +130,7 @@ impl Accept { let addr = lst.local_addr(); let mut server = lst - .into_listener() + .into_mio_listener() .unwrap_or_else(|e| panic!("Can not set non_block on listener: {}", e)); let entry = sockets.vacant_entry(); let token = entry.key(); diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index afbf4637..40850aa1 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -1,14 +1,19 @@ -use std::{fmt, io, net}; +use std::net::{SocketAddr as StdTcpSocketAddr, TcpListener as StdTcpListener}; +use std::{fmt, io}; #[cfg(unix)] -use std::os::unix::io::{FromRawFd, IntoRawFd}; +use std::os::unix::{ + io::{FromRawFd, IntoRawFd}, + net::{SocketAddr as StdUdsSocketAddr, UnixListener as StdUnixListener}, +}; + #[cfg(windows)] use std::os::windows::io::{FromRawSocket, IntoRawSocket}; use mio::event::Source; use mio::net::{ - TcpListener as MioTcpListener, TcpStream as MioTcpStream, UnixListener as MioUnixListener, - UnixStream as MioUnixStream, + SocketAddr as MioSocketAddr, TcpListener as MioTcpListener, TcpStream as MioTcpStream, + UnixListener as MioUnixListener, UnixStream as MioUnixStream, }; use mio::{Interest, Registry, Token}; @@ -20,15 +25,17 @@ use actix_rt::net::{TcpStream, UnixStream}; /// `actix_rt::net::{TcpStream, UnixStream}`. pub(crate) enum StdListener { - Tcp(net::TcpListener), + Tcp(StdTcpListener), #[cfg(all(unix))] - Uds(std::os::unix::net::UnixListener), + Uds(StdUnixListener), } pub(crate) enum SocketAddr { - Tcp(net::SocketAddr), + Tcp(StdTcpSocketAddr), #[cfg(all(unix))] - Uds(mio::net::SocketAddr), + Uds(StdUdsSocketAddr), + #[cfg(all(unix))] + UdsMio(MioSocketAddr), } impl fmt::Display for SocketAddr { @@ -37,6 +44,8 @@ impl fmt::Display for SocketAddr { SocketAddr::Tcp(ref addr) => write!(f, "{}", addr), #[cfg(all(unix))] SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), + #[cfg(all(unix))] + SocketAddr::UdsMio(ref addr) => write!(f, "{:?}", addr), } } } @@ -47,6 +56,8 @@ impl fmt::Debug for SocketAddr { SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr), #[cfg(all(unix))] SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), + #[cfg(all(unix))] + SocketAddr::UdsMio(ref addr) => write!(f, "{:?}", addr), } } } @@ -66,15 +77,11 @@ impl StdListener { match self { StdListener::Tcp(lst) => SocketAddr::Tcp(lst.local_addr().unwrap()), #[cfg(all(unix))] - StdListener::Uds(_lst) => { - // FixMe: How to get a SocketAddr? - unimplemented!() - // SocketAddr::Uds(lst.local_addr().unwrap()) - } + StdListener::Uds(lst) => SocketAddr::Uds(lst.local_addr().unwrap()), } } - pub(crate) fn into_listener(self) -> std::io::Result { + pub(crate) fn into_mio_listener(self) -> std::io::Result { match self { StdListener::Tcp(lst) => { // ToDo: is this non_blocking a good practice? @@ -115,7 +122,7 @@ impl MioSocketListener { #[cfg(all(unix))] MioSocketListener::Uds(ref lst) => lst .accept() - .map(|(stream, addr)| Some((MioStream::Uds(stream), SocketAddr::Uds(addr)))), + .map(|(stream, addr)| Some((MioStream::Uds(stream), SocketAddr::UdsMio(addr)))), } } } diff --git a/actix-server/src/waker_queue.rs b/actix-server/src/waker_queue.rs index 06a2155e..eafa1b81 100644 --- a/actix-server/src/waker_queue.rs +++ b/actix-server/src/waker_queue.rs @@ -22,7 +22,7 @@ impl WakerQueue { /// construct a waker queue with given `Poll`'s `Registry` and capacity. /// /// A fixed `WAKER_TOKEN` is used to identify the wake interest and the `Poll` needs to match - /// event for it to properly handle `WakerInterest`. + /// event's token for it to properly handle `WakerInterest`. pub(crate) fn with_capacity(registry: &Registry, cap: usize) -> std::io::Result { let waker = Waker::new(registry, WAKER_TOKEN)?; let queue = ConcurrentQueue::bounded(cap); @@ -45,11 +45,11 @@ impl WakerQueue { } } -/// the types of interests we would look into when `Accept`'s `Poll` is waked up by waker. +/// types of interests we would look into when `Accept`'s `Poll` is waked up by waker. /// -/// *. These interests should not confused with `mio::Interest` and mostly not I/O related +/// *. These interests should not be confused with `mio::Interest` and mostly not I/O related pub(crate) enum WakerInterest { - /// Interest from `WorkerClient` notifying `Accept` to run `backpressure` method + /// Interest from `Worker` notifying `Accept` to run `backpressure` method Notify, /// `Pause`, `Resume`, `Stop` Interest are from `ServerBuilder` future. It listens to /// `ServerCommand` and notify `Accept` to do exactly these tasks. @@ -57,9 +57,12 @@ pub(crate) enum WakerInterest { Resume, Stop, /// `Timer` is an interest sent as a delayed future. When an error happens on accepting - /// connection the poll would deregister the socket temporary and wake up the poll and register - /// again after the delayed future resolve. + /// connection the poll would deregister sockets temporary and wake up the poll and register + /// them again after the delayed future resolve. Timer, + /// `Worker` ins an interest happen after a worker runs into faulted state(This is determined by + /// if work can be sent to it successfully).`Accept` would be waked up and add the new + /// `WorkerClient` to workers. Worker(WorkerClient), } diff --git a/actix-testing/Cargo.toml b/actix-testing/Cargo.toml index 430a12b6..957f59c3 100644 --- a/actix-testing/Cargo.toml +++ b/actix-testing/Cargo.toml @@ -18,7 +18,7 @@ name = "actix_testing" path = "src/lib.rs" [dependencies] -actix-rt = "1.0.0" +actix-rt = "1.1.1" actix-macros = "0.1.0" actix-server = "1.0.0" actix-service = "1.0.0"