work around socket addr with additional enum variant

This commit is contained in:
fakeshadow 2020-10-19 15:20:01 +08:00
parent 6ee906335a
commit 869175e1c9
8 changed files with 46 additions and 36 deletions

View File

@ -7,7 +7,7 @@ use futures_core::{ready, Stream};
use futures_sink::Sink; use futures_sink::Sink;
use pin_project::pin_project; use pin_project::pin_project;
use crate::{AsyncRead, AsyncWrite, Decoder, Encoder}; use crate::{AsyncRead, AsyncWrite, Decoder, Encoder, ReadBuf};
/// Low-water mark /// Low-water mark
const LW: usize = 1024; const LW: usize = 1024;
@ -221,13 +221,13 @@ impl<T, U> Framed<T, U> {
this.read_buf.reserve(HW - remaining) this.read_buf.reserve(HW - remaining)
} }
// FixMe: This must be fixed as `poll_read_buf` is removed // FixMe: Is this the right way to do it for now?
// let cnt = match this.io.poll_read(cx, &mut this.read_buf) { let mut buf = ReadBuf::new(&mut this.read_buf);
// Poll::Pending => return Poll::Pending, let cnt = match this.io.poll_read(cx, &mut buf) {
// Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))), Poll::Pending => return Poll::Pending,
// Poll::Ready(Ok(cnt)) => cnt, Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e.into()))),
// }; Poll::Ready(Ok(())) => buf.filled().len(),
let cnt = 0; };
if cnt == 0 { if cnt == 0 {
this.flags.insert(Flags::EOF); this.flags.insert(Flags::EOF);

View File

@ -18,5 +18,5 @@ mod framed;
pub use self::bcodec::BytesCodec; pub use self::bcodec::BytesCodec;
pub use self::framed::{Framed, FramedParts}; pub use self::framed::{Framed, FramedParts};
pub use tokio::io::{AsyncRead, AsyncWrite}; pub use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
pub use tokio_util::codec::{Decoder, Encoder}; pub use tokio_util::codec::{Decoder, Encoder};

View File

@ -50,7 +50,7 @@ tokio-openssl = { version = "0.5.0", optional = true }
# rustls # rustls
rust-tls = { package = "rustls", version = "0.18.0", optional = true } rust-tls = { package = "rustls", version = "0.18.0", optional = true }
tokio-rustls = { version = "0.14.0", optional = true } tokio-rustls = { version = "0.20.0", optional = true }
webpki = { version = "0.21", optional = true } webpki = { version = "0.21", optional = true }
[dev-dependencies] [dev-dependencies]

View File

@ -18,10 +18,9 @@ use std::{env, io};
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use actix_server::Server; use actix_server::Server;
use actix_service::pipeline_factory; use actix_service::pipeline_factory;
use bytes::BytesMut;
use futures_util::future::ok; use futures_util::future::ok;
use log::{error, info}; use log::{error, info};
use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadBuf}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[actix_rt::main] #[actix_rt::main]
async fn main() -> io::Result<()> { async fn main() -> io::Result<()> {
@ -49,7 +48,8 @@ async fn main() -> io::Result<()> {
let num = num + 1; let num = num + 1;
let mut size = 0; let mut size = 0;
let mut buf = Vec::new(); // FixMe: BytesMut and Vec are not working?
let mut buf = [0; 1024];
loop { loop {
match stream.read(&mut buf).await { match stream.read(&mut buf).await {

View File

@ -130,7 +130,7 @@ impl Accept {
let addr = lst.local_addr(); let addr = lst.local_addr();
let mut server = lst let mut server = lst
.into_listener() .into_mio_listener()
.unwrap_or_else(|e| panic!("Can not set non_block on listener: {}", e)); .unwrap_or_else(|e| panic!("Can not set non_block on listener: {}", e));
let entry = sockets.vacant_entry(); let entry = sockets.vacant_entry();
let token = entry.key(); let token = entry.key();

View File

@ -1,14 +1,19 @@
use std::{fmt, io, net}; use std::net::{SocketAddr as StdTcpSocketAddr, TcpListener as StdTcpListener};
use std::{fmt, io};
#[cfg(unix)] #[cfg(unix)]
use std::os::unix::io::{FromRawFd, IntoRawFd}; use std::os::unix::{
io::{FromRawFd, IntoRawFd},
net::{SocketAddr as StdUdsSocketAddr, UnixListener as StdUnixListener},
};
#[cfg(windows)] #[cfg(windows)]
use std::os::windows::io::{FromRawSocket, IntoRawSocket}; use std::os::windows::io::{FromRawSocket, IntoRawSocket};
use mio::event::Source; use mio::event::Source;
use mio::net::{ use mio::net::{
TcpListener as MioTcpListener, TcpStream as MioTcpStream, UnixListener as MioUnixListener, SocketAddr as MioSocketAddr, TcpListener as MioTcpListener, TcpStream as MioTcpStream,
UnixStream as MioUnixStream, UnixListener as MioUnixListener, UnixStream as MioUnixStream,
}; };
use mio::{Interest, Registry, Token}; use mio::{Interest, Registry, Token};
@ -20,15 +25,17 @@ use actix_rt::net::{TcpStream, UnixStream};
/// `actix_rt::net::{TcpStream, UnixStream}`. /// `actix_rt::net::{TcpStream, UnixStream}`.
pub(crate) enum StdListener { pub(crate) enum StdListener {
Tcp(net::TcpListener), Tcp(StdTcpListener),
#[cfg(all(unix))] #[cfg(all(unix))]
Uds(std::os::unix::net::UnixListener), Uds(StdUnixListener),
} }
pub(crate) enum SocketAddr { pub(crate) enum SocketAddr {
Tcp(net::SocketAddr), Tcp(StdTcpSocketAddr),
#[cfg(all(unix))] #[cfg(all(unix))]
Uds(mio::net::SocketAddr), Uds(StdUdsSocketAddr),
#[cfg(all(unix))]
UdsMio(MioSocketAddr),
} }
impl fmt::Display for SocketAddr { impl fmt::Display for SocketAddr {
@ -37,6 +44,8 @@ impl fmt::Display for SocketAddr {
SocketAddr::Tcp(ref addr) => write!(f, "{}", addr), SocketAddr::Tcp(ref addr) => write!(f, "{}", addr),
#[cfg(all(unix))] #[cfg(all(unix))]
SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
#[cfg(all(unix))]
SocketAddr::UdsMio(ref addr) => write!(f, "{:?}", addr),
} }
} }
} }
@ -47,6 +56,8 @@ impl fmt::Debug for SocketAddr {
SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr), SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr),
#[cfg(all(unix))] #[cfg(all(unix))]
SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
#[cfg(all(unix))]
SocketAddr::UdsMio(ref addr) => write!(f, "{:?}", addr),
} }
} }
} }
@ -66,15 +77,11 @@ impl StdListener {
match self { match self {
StdListener::Tcp(lst) => SocketAddr::Tcp(lst.local_addr().unwrap()), StdListener::Tcp(lst) => SocketAddr::Tcp(lst.local_addr().unwrap()),
#[cfg(all(unix))] #[cfg(all(unix))]
StdListener::Uds(_lst) => { StdListener::Uds(lst) => SocketAddr::Uds(lst.local_addr().unwrap()),
// FixMe: How to get a SocketAddr?
unimplemented!()
// SocketAddr::Uds(lst.local_addr().unwrap())
}
} }
} }
pub(crate) fn into_listener(self) -> std::io::Result<MioSocketListener> { pub(crate) fn into_mio_listener(self) -> std::io::Result<MioSocketListener> {
match self { match self {
StdListener::Tcp(lst) => { StdListener::Tcp(lst) => {
// ToDo: is this non_blocking a good practice? // ToDo: is this non_blocking a good practice?
@ -115,7 +122,7 @@ impl MioSocketListener {
#[cfg(all(unix))] #[cfg(all(unix))]
MioSocketListener::Uds(ref lst) => lst MioSocketListener::Uds(ref lst) => lst
.accept() .accept()
.map(|(stream, addr)| Some((MioStream::Uds(stream), SocketAddr::Uds(addr)))), .map(|(stream, addr)| Some((MioStream::Uds(stream), SocketAddr::UdsMio(addr)))),
} }
} }
} }

View File

@ -22,7 +22,7 @@ impl WakerQueue {
/// construct a waker queue with given `Poll`'s `Registry` and capacity. /// construct a waker queue with given `Poll`'s `Registry` and capacity.
/// ///
/// A fixed `WAKER_TOKEN` is used to identify the wake interest and the `Poll` needs to match /// A fixed `WAKER_TOKEN` is used to identify the wake interest and the `Poll` needs to match
/// event for it to properly handle `WakerInterest`. /// event's token for it to properly handle `WakerInterest`.
pub(crate) fn with_capacity(registry: &Registry, cap: usize) -> std::io::Result<Self> { pub(crate) fn with_capacity(registry: &Registry, cap: usize) -> std::io::Result<Self> {
let waker = Waker::new(registry, WAKER_TOKEN)?; let waker = Waker::new(registry, WAKER_TOKEN)?;
let queue = ConcurrentQueue::bounded(cap); let queue = ConcurrentQueue::bounded(cap);
@ -45,11 +45,11 @@ impl WakerQueue {
} }
} }
/// the types of interests we would look into when `Accept`'s `Poll` is waked up by waker. /// types of interests we would look into when `Accept`'s `Poll` is waked up by waker.
/// ///
/// *. These interests should not confused with `mio::Interest` and mostly not I/O related /// *. These interests should not be confused with `mio::Interest` and mostly not I/O related
pub(crate) enum WakerInterest { pub(crate) enum WakerInterest {
/// Interest from `WorkerClient` notifying `Accept` to run `backpressure` method /// Interest from `Worker` notifying `Accept` to run `backpressure` method
Notify, Notify,
/// `Pause`, `Resume`, `Stop` Interest are from `ServerBuilder` future. It listens to /// `Pause`, `Resume`, `Stop` Interest are from `ServerBuilder` future. It listens to
/// `ServerCommand` and notify `Accept` to do exactly these tasks. /// `ServerCommand` and notify `Accept` to do exactly these tasks.
@ -57,9 +57,12 @@ pub(crate) enum WakerInterest {
Resume, Resume,
Stop, Stop,
/// `Timer` is an interest sent as a delayed future. When an error happens on accepting /// `Timer` is an interest sent as a delayed future. When an error happens on accepting
/// connection the poll would deregister the socket temporary and wake up the poll and register /// connection the poll would deregister sockets temporary and wake up the poll and register
/// again after the delayed future resolve. /// them again after the delayed future resolve.
Timer, Timer,
/// `Worker` ins an interest happen after a worker runs into faulted state(This is determined by
/// if work can be sent to it successfully).`Accept` would be waked up and add the new
/// `WorkerClient` to workers.
Worker(WorkerClient), Worker(WorkerClient),
} }

View File

@ -18,7 +18,7 @@ name = "actix_testing"
path = "src/lib.rs" path = "src/lib.rs"
[dependencies] [dependencies]
actix-rt = "1.0.0" actix-rt = "1.1.1"
actix-macros = "0.1.0" actix-macros = "0.1.0"
actix-server = "1.0.0" actix-server = "1.0.0"
actix-service = "1.0.0" actix-service = "1.0.0"