mirror of https://github.com/fafhrd91/actix-net
remove socket dep.fix build for actix-tls
This commit is contained in:
parent
30d8971209
commit
f18f709ba7
|
@ -1,6 +1,8 @@
|
|||
# Changes
|
||||
|
||||
## Unreleased - 2020-xx-xx
|
||||
* Update `tokio` dependency to 0.3.1
|
||||
* Update `tokio-util` dependency to 0.4
|
||||
|
||||
## 0.3.0 - 2020-08-23
|
||||
* No changes from beta 2.
|
||||
|
|
|
@ -17,10 +17,12 @@ path = "src/lib.rs"
|
|||
|
||||
[dependencies]
|
||||
bitflags = "1.2.1"
|
||||
bytes = "0.5.2"
|
||||
|
||||
# ToDo: update to bytes 0.6 when tokio-util is bytes 0.6 compat
|
||||
bytes = "0.5"
|
||||
futures-core = { version = "0.3.4", default-features = false }
|
||||
futures-sink = { version = "0.3.4", default-features = false }
|
||||
log = "0.4"
|
||||
pin-project = "0.4.17"
|
||||
tokio = "0.3.0"
|
||||
tokio = "0.3.1"
|
||||
tokio-util = { version = "0.4.0", default-features = false, features = ["codec"] }
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
# Changes
|
||||
|
||||
## Unreleased - 2020-xx-xx
|
||||
|
||||
* Update `tokio-openssl` dependency to 0.5
|
||||
* Update `tokio-rustls` dependency to 0.20
|
||||
|
||||
## 2.0.0 - 2020-09-02
|
||||
- No significant changes from `2.0.0-alpha.4`.
|
||||
|
|
|
@ -54,5 +54,5 @@ tokio-rustls = { version = "0.20.0", optional = true }
|
|||
webpki = { version = "0.21", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
bytes = "0.5.3"
|
||||
bytes = "0.6"
|
||||
actix-testing = "1.0.0"
|
||||
|
|
|
@ -7,7 +7,8 @@
|
|||
* Add `System::attach_to_tokio` method. [#173]
|
||||
|
||||
### Changed
|
||||
* update tokio to 0.3
|
||||
* Update `tokio` dependency to 0.3.1
|
||||
* Rename `time` module `delay_for` to `sleep`, `delay_until` to `sleep_until`, `Delay` to `Sleep` to keep inline with tokio.
|
||||
* Remove `'static` lifetime requirement for `Runtime::block_on` and `SystemRunner::block_on`. These methods would accept a &Self when calling.
|
||||
* Remove `'static` lifetime requirement for `System::run`
|
||||
|
||||
|
|
|
@ -22,7 +22,7 @@ futures-channel = { version = "0.3.4", default-features = false }
|
|||
futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] }
|
||||
copyless = "0.1.4"
|
||||
smallvec = "1"
|
||||
tokio = { version = "0.3.0", features = ["rt", "net", "signal", "stream", "time"] }
|
||||
tokio = { version = "0.3.1", features = ["rt", "net", "signal", "stream", "time"] }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { version = "0.3.0", features = ["full"] }
|
||||
tokio = { version = "0.3.1", features = ["full"] }
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
# Changes
|
||||
|
||||
## Unreleased - 2020-xx-xx
|
||||
* Update mio to 0.7.3
|
||||
* Update `mio` dependency to 0.7.3
|
||||
* `ServerBuilder::backlog` would accept `u32` instead of `i32`
|
||||
* Use `concurrent-queue` to manage poll wakes instead of `futures::channel::mpsc`.
|
||||
* Remove `AcceptNotify` type and pass `WakerQueue` to `WorkerClient` for notify the `Accept` more directly.
|
||||
* Convert `mio::Stream` to `actix_rt::net::TcpStream`(`UnixStream` for uds) using `FromRawFd` and `IntoRawFd`(`IntoRawSocket` and `FromRawSocket` on windows).
|
||||
|
|
|
@ -32,11 +32,10 @@ log = "0.4"
|
|||
mio = { version = "0.7.3", features = [ "os-poll", "tcp", "uds"] }
|
||||
num_cpus = "1.13"
|
||||
slab = "0.4"
|
||||
socket2 = "0.3"
|
||||
|
||||
[dev-dependencies]
|
||||
actix-testing = "1.0.0"
|
||||
bytes = "0.5"
|
||||
env_logger = "0.7"
|
||||
futures-util = { version = "0.3.4", default-features = false, features = ["sink"] }
|
||||
tokio = { version = "0.3.0", features = ["full"] }
|
||||
tokio = { version = "0.3.1", features = ["full"] }
|
||||
|
|
|
@ -18,6 +18,7 @@ use std::{env, io};
|
|||
use actix_rt::net::TcpStream;
|
||||
use actix_server::Server;
|
||||
use actix_service::pipeline_factory;
|
||||
use bytes::BytesMut;
|
||||
use futures_util::future::ok;
|
||||
use log::{error, info};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
|
@ -48,10 +49,10 @@ async fn main() -> io::Result<()> {
|
|||
let num = num + 1;
|
||||
|
||||
let mut size = 0;
|
||||
// FixMe: BytesMut and Vec are not working?
|
||||
let mut buf = [0; 1024];
|
||||
let mut buf = BytesMut::new();
|
||||
|
||||
loop {
|
||||
// ToDo: change to read_buf
|
||||
match stream.read(&mut buf).await {
|
||||
// end of stream; bail from loop
|
||||
Ok(0) => break,
|
||||
|
|
|
@ -8,7 +8,7 @@ use mio::{Interest, Poll, Token as MioToken};
|
|||
use slab::Slab;
|
||||
|
||||
use crate::server::Server;
|
||||
use crate::socket::{MioSocketListener, SocketAddr, StdListener};
|
||||
use crate::socket::{MioListener, SocketAddr};
|
||||
use crate::waker_queue::{WakerInterest, WakerQueue, WakerQueueError, WAKER_TOKEN};
|
||||
use crate::worker::{Conn, WorkerHandle};
|
||||
use crate::Token;
|
||||
|
@ -19,7 +19,7 @@ struct ServerSocketInfo {
|
|||
// be ware this is the crate token for identify socket and should not be confused with
|
||||
// mio::Token
|
||||
token: Token,
|
||||
lst: MioSocketListener,
|
||||
lst: MioListener,
|
||||
// timeout is used to mark the deadline when this socket's listener should be registered again
|
||||
// after an error.
|
||||
timeout: Option<Instant>,
|
||||
|
@ -40,7 +40,8 @@ pub(crate) struct AcceptLoop {
|
|||
impl AcceptLoop {
|
||||
pub fn new(srv: Server) -> Self {
|
||||
let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create mio::Poll: {}", e));
|
||||
let waker = WakerQueue::with_capacity(poll.registry(), 128).unwrap();
|
||||
let waker = WakerQueue::with_capacity(poll.registry(), 128)
|
||||
.unwrap_or_else(|e| panic!("Can not create mio::Waker: {}", e));
|
||||
|
||||
Self {
|
||||
srv: Some(srv),
|
||||
|
@ -59,7 +60,7 @@ impl AcceptLoop {
|
|||
|
||||
pub(crate) fn start(
|
||||
&mut self,
|
||||
socks: Vec<(Token, StdListener)>,
|
||||
socks: Vec<(Token, MioListener)>,
|
||||
workers: Vec<WorkerHandle>,
|
||||
) {
|
||||
let srv = self.srv.take().expect("Can not re-use AcceptInfo");
|
||||
|
@ -99,7 +100,7 @@ impl Accept {
|
|||
pub(crate) fn start(
|
||||
poll: Poll,
|
||||
waker: WakerQueue,
|
||||
socks: Vec<(Token, StdListener)>,
|
||||
socks: Vec<(Token, MioListener)>,
|
||||
srv: Server,
|
||||
workers: Vec<WorkerHandle>,
|
||||
) {
|
||||
|
@ -120,29 +121,26 @@ impl Accept {
|
|||
fn new_with_sockets(
|
||||
poll: Poll,
|
||||
waker: WakerQueue,
|
||||
socks: Vec<(Token, StdListener)>,
|
||||
socks: Vec<(Token, MioListener)>,
|
||||
workers: Vec<WorkerHandle>,
|
||||
srv: Server,
|
||||
) -> (Accept, Slab<ServerSocketInfo>) {
|
||||
let mut sockets = Slab::new();
|
||||
for (hnd_token, lst) in socks.into_iter() {
|
||||
for (hnd_token, mut lst) in socks.into_iter() {
|
||||
let addr = lst.local_addr();
|
||||
|
||||
let mut sock = lst
|
||||
.into_mio_listener()
|
||||
.unwrap_or_else(|e| panic!("Can not set non_block on listener: {}", e));
|
||||
let entry = sockets.vacant_entry();
|
||||
let token = entry.key();
|
||||
|
||||
// Start listening for incoming connections
|
||||
poll.registry()
|
||||
.register(&mut sock, MioToken(token + DELTA), Interest::READABLE)
|
||||
.register(&mut lst, MioToken(token + DELTA), Interest::READABLE)
|
||||
.unwrap_or_else(|e| panic!("Can not register io: {}", e));
|
||||
|
||||
entry.insert(ServerSocketInfo {
|
||||
addr,
|
||||
token: hnd_token,
|
||||
lst: sock,
|
||||
lst,
|
||||
timeout: None,
|
||||
});
|
||||
}
|
||||
|
@ -175,17 +173,21 @@ impl Accept {
|
|||
// necessary/good practice to actively drain the waker queue.
|
||||
WAKER_TOKEN => 'waker: loop {
|
||||
match self.waker.pop() {
|
||||
// worker notify it's availability has change. we maybe want to enter
|
||||
// backpressure or recover from one.
|
||||
// worker notify it's availability has change. we maybe want to recover
|
||||
// from backpressure.
|
||||
Ok(WakerInterest::Notify) => {
|
||||
self.maybe_backpressure(&mut sockets, false);
|
||||
}
|
||||
Ok(WakerInterest::Pause) => {
|
||||
sockets.iter_mut().for_each(|(_, info)| {
|
||||
if let Err(err) = self.deregister(info) {
|
||||
error!("Can not deregister server socket {}", err);
|
||||
} else {
|
||||
info!("Paused accepting connections on {}", info.addr);
|
||||
match self.deregister(info) {
|
||||
Ok(_) => info!(
|
||||
"Paused accepting connections on {}",
|
||||
info.addr
|
||||
),
|
||||
Err(e) => {
|
||||
error!("Can not deregister server socket {}", e)
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -253,11 +255,7 @@ impl Accept {
|
|||
// Calling reregister seems to fix the issue.
|
||||
self.poll
|
||||
.registry()
|
||||
.register(
|
||||
&mut info.lst,
|
||||
mio::Token(token + DELTA),
|
||||
Interest::READABLE,
|
||||
)
|
||||
.register(&mut info.lst, mio::Token(token + DELTA), Interest::READABLE)
|
||||
.or_else(|_| {
|
||||
self.poll.registry().reregister(
|
||||
&mut info.lst,
|
||||
|
|
|
@ -2,7 +2,7 @@ use std::future::Future;
|
|||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::time::Duration;
|
||||
use std::{io, mem, net};
|
||||
use std::{io, mem};
|
||||
|
||||
use actix_rt::net::TcpStream;
|
||||
use actix_rt::time::{sleep_until, Instant};
|
||||
|
@ -13,14 +13,14 @@ use futures_util::future::ready;
|
|||
use futures_util::stream::FuturesUnordered;
|
||||
use futures_util::{ready, stream::Stream, FutureExt, StreamExt};
|
||||
use log::{error, info};
|
||||
use socket2::{Domain, Protocol, Socket, Type};
|
||||
|
||||
use crate::accept::AcceptLoop;
|
||||
use crate::config::{ConfiguredService, ServiceConfig};
|
||||
use crate::server::{Server, ServerCommand};
|
||||
use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
|
||||
use crate::signals::{Signal, Signals};
|
||||
use crate::socket::StdListener;
|
||||
use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
|
||||
use crate::socket::{MioTcpListener, MioTcpSocket};
|
||||
use crate::waker_queue::{WakerInterest, WakerQueue};
|
||||
use crate::worker::{self, Worker, WorkerAvailability, WorkerHandle};
|
||||
use crate::Token;
|
||||
|
@ -29,10 +29,10 @@ use crate::Token;
|
|||
pub struct ServerBuilder {
|
||||
threads: usize,
|
||||
token: Token,
|
||||
backlog: i32,
|
||||
backlog: u32,
|
||||
workers: Vec<(usize, WorkerHandle)>,
|
||||
services: Vec<Box<dyn InternalServiceFactory>>,
|
||||
sockets: Vec<(Token, String, StdListener)>,
|
||||
sockets: Vec<(Token, String, MioListener)>,
|
||||
accept: AcceptLoop,
|
||||
exit: bool,
|
||||
shutdown_timeout: Duration,
|
||||
|
@ -91,7 +91,7 @@ impl ServerBuilder {
|
|||
/// Generally set in the 64-2048 range. Default value is 2048.
|
||||
///
|
||||
/// This method should be called before `bind()` method call.
|
||||
pub fn backlog(mut self, num: i32) -> Self {
|
||||
pub fn backlog(mut self, num: u32) -> Self {
|
||||
self.backlog = num;
|
||||
self
|
||||
}
|
||||
|
@ -149,7 +149,7 @@ impl ServerBuilder {
|
|||
for (name, lst) in cfg.services {
|
||||
let token = self.token.next();
|
||||
srv.stream(token, name.clone(), lst.local_addr()?);
|
||||
self.sockets.push((token, name, StdListener::Tcp(lst)));
|
||||
self.sockets.push((token, name, MioListener::Tcp(lst)));
|
||||
}
|
||||
self.services.push(Box::new(srv));
|
||||
}
|
||||
|
@ -162,7 +162,7 @@ impl ServerBuilder {
|
|||
pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
|
||||
where
|
||||
F: ServiceFactory<TcpStream>,
|
||||
U: net::ToSocketAddrs,
|
||||
U: ToSocketAddrs,
|
||||
{
|
||||
let sockets = bind_addr(addr, self.backlog)?;
|
||||
|
||||
|
@ -175,12 +175,12 @@ impl ServerBuilder {
|
|||
lst.local_addr()?,
|
||||
));
|
||||
self.sockets
|
||||
.push((token, name.as_ref().to_string(), StdListener::Tcp(lst)));
|
||||
.push((token, name.as_ref().to_string(), MioListener::Tcp(lst)));
|
||||
}
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
#[cfg(all(unix))]
|
||||
#[cfg(unix)]
|
||||
/// Add new unix domain service to the server.
|
||||
pub fn bind_uds<F, U, N>(self, name: N, addr: U, factory: F) -> io::Result<Self>
|
||||
where
|
||||
|
@ -188,8 +188,6 @@ impl ServerBuilder {
|
|||
N: AsRef<str>,
|
||||
U: AsRef<std::path::Path>,
|
||||
{
|
||||
use std::os::unix::net::UnixListener;
|
||||
|
||||
// The path must not exist when we try to bind.
|
||||
// Try to remove it to avoid bind error.
|
||||
if let Err(e) = std::fs::remove_file(addr.as_ref()) {
|
||||
|
@ -199,7 +197,7 @@ impl ServerBuilder {
|
|||
}
|
||||
}
|
||||
|
||||
let lst = UnixListener::bind(addr)?;
|
||||
let lst = crate::socket::StdUnixListener::bind(addr)?;
|
||||
self.listen_uds(name, lst, factory)
|
||||
}
|
||||
|
||||
|
@ -210,15 +208,15 @@ impl ServerBuilder {
|
|||
pub fn listen_uds<F, N: AsRef<str>>(
|
||||
mut self,
|
||||
name: N,
|
||||
lst: std::os::unix::net::UnixListener,
|
||||
lst: crate::socket::StdUnixListener,
|
||||
factory: F,
|
||||
) -> io::Result<Self>
|
||||
where
|
||||
F: ServiceFactory<actix_rt::net::UnixStream>,
|
||||
{
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||
use std::net::{IpAddr, Ipv4Addr};
|
||||
let token = self.token.next();
|
||||
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
|
||||
let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
|
||||
self.services.push(StreamNewService::create(
|
||||
name.as_ref().to_string(),
|
||||
token,
|
||||
|
@ -226,7 +224,7 @@ impl ServerBuilder {
|
|||
addr,
|
||||
));
|
||||
self.sockets
|
||||
.push((token, name.as_ref().to_string(), StdListener::Uds(lst)));
|
||||
.push((token, name.as_ref().to_string(), MioListener::from(lst)));
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
|
@ -234,7 +232,7 @@ impl ServerBuilder {
|
|||
pub fn listen<F, N: AsRef<str>>(
|
||||
mut self,
|
||||
name: N,
|
||||
lst: net::TcpListener,
|
||||
lst: StdTcpListener,
|
||||
factory: F,
|
||||
) -> io::Result<Self>
|
||||
where
|
||||
|
@ -247,8 +245,9 @@ impl ServerBuilder {
|
|||
factory,
|
||||
lst.local_addr()?,
|
||||
));
|
||||
|
||||
self.sockets
|
||||
.push((token, name.as_ref().to_string(), StdListener::Tcp(lst)));
|
||||
.push((token, name.as_ref().to_string(), MioListener::from(lst)));
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
|
@ -450,10 +449,10 @@ impl Future for ServerBuilder {
|
|||
}
|
||||
}
|
||||
|
||||
pub(super) fn bind_addr<S: net::ToSocketAddrs>(
|
||||
pub(super) fn bind_addr<S: ToSocketAddrs>(
|
||||
addr: S,
|
||||
backlog: i32,
|
||||
) -> io::Result<Vec<net::TcpListener>> {
|
||||
backlog: u32,
|
||||
) -> io::Result<Vec<MioTcpListener>> {
|
||||
let mut err = None;
|
||||
let mut succ = false;
|
||||
let mut sockets = Vec::new();
|
||||
|
@ -481,14 +480,13 @@ pub(super) fn bind_addr<S: net::ToSocketAddrs>(
|
|||
}
|
||||
}
|
||||
|
||||
fn create_tcp_listener(addr: net::SocketAddr, backlog: i32) -> io::Result<net::TcpListener> {
|
||||
let domain = match addr {
|
||||
net::SocketAddr::V4(_) => Domain::ipv4(),
|
||||
net::SocketAddr::V6(_) => Domain::ipv6(),
|
||||
fn create_tcp_listener(addr: StdSocketAddr, backlog: u32) -> io::Result<MioTcpListener> {
|
||||
let socket = match addr {
|
||||
StdSocketAddr::V4(_) => MioTcpSocket::new_v4()?,
|
||||
StdSocketAddr::V6(_) => MioTcpSocket::new_v6()?,
|
||||
};
|
||||
let socket = Socket::new(domain, Type::stream(), Some(Protocol::tcp()))?;
|
||||
socket.set_reuse_address(true)?;
|
||||
socket.bind(&addr.into())?;
|
||||
socket.listen(backlog)?;
|
||||
Ok(socket.into_tcp_listener())
|
||||
|
||||
socket.set_reuseaddr(true)?;
|
||||
socket.bind(addr)?;
|
||||
socket.listen(backlog)
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use std::collections::HashMap;
|
||||
use std::future::Future;
|
||||
use std::{fmt, io, net};
|
||||
use std::{fmt, io};
|
||||
|
||||
use actix_rt::net::TcpStream;
|
||||
use actix_service as actix;
|
||||
|
@ -12,18 +12,19 @@ use crate::builder::bind_addr;
|
|||
use crate::service::{
|
||||
BoxedServerService, InternalServiceFactory, ServerMessage, StreamService,
|
||||
};
|
||||
use crate::socket::{MioTcpListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
|
||||
use crate::LocalBoxFuture;
|
||||
use crate::Token;
|
||||
|
||||
pub struct ServiceConfig {
|
||||
pub(crate) services: Vec<(String, net::TcpListener)>,
|
||||
pub(crate) services: Vec<(String, MioTcpListener)>,
|
||||
pub(crate) apply: Option<Box<dyn ServiceRuntimeConfiguration>>,
|
||||
pub(crate) threads: usize,
|
||||
pub(crate) backlog: i32,
|
||||
pub(crate) backlog: u32,
|
||||
}
|
||||
|
||||
impl ServiceConfig {
|
||||
pub(super) fn new(threads: usize, backlog: i32) -> ServiceConfig {
|
||||
pub(super) fn new(threads: usize, backlog: u32) -> ServiceConfig {
|
||||
ServiceConfig {
|
||||
threads,
|
||||
backlog,
|
||||
|
@ -43,19 +44,23 @@ impl ServiceConfig {
|
|||
/// Add new service to server
|
||||
pub fn bind<U, N: AsRef<str>>(&mut self, name: N, addr: U) -> io::Result<&mut Self>
|
||||
where
|
||||
U: net::ToSocketAddrs,
|
||||
U: ToSocketAddrs,
|
||||
{
|
||||
let sockets = bind_addr(addr, self.backlog)?;
|
||||
|
||||
for lst in sockets {
|
||||
self.listen(name.as_ref(), lst);
|
||||
self._listen(name.as_ref(), lst);
|
||||
}
|
||||
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
/// Add new service to server
|
||||
pub fn listen<N: AsRef<str>>(&mut self, name: N, lst: net::TcpListener) -> &mut Self {
|
||||
pub fn listen<N: AsRef<str>>(&mut self, name: N, lst: StdTcpListener) -> &mut Self {
|
||||
self._listen(name, MioTcpListener::from_std(lst))
|
||||
}
|
||||
|
||||
fn _listen<N: AsRef<str>>(&mut self, name: N, lst: MioTcpListener) -> &mut Self {
|
||||
if self.apply.is_none() {
|
||||
self.apply = Some(Box::new(not_configured));
|
||||
}
|
||||
|
@ -76,7 +81,7 @@ impl ServiceConfig {
|
|||
|
||||
pub(super) struct ConfiguredService {
|
||||
rt: Box<dyn ServiceRuntimeConfiguration>,
|
||||
names: HashMap<Token, (String, net::SocketAddr)>,
|
||||
names: HashMap<Token, (String, StdSocketAddr)>,
|
||||
topics: HashMap<String, Token>,
|
||||
services: Vec<Token>,
|
||||
}
|
||||
|
@ -91,7 +96,7 @@ impl ConfiguredService {
|
|||
}
|
||||
}
|
||||
|
||||
pub(super) fn stream(&mut self, token: Token, name: String, addr: net::SocketAddr) {
|
||||
pub(super) fn stream(&mut self, token: Token, name: String, addr: StdSocketAddr) {
|
||||
self.names.insert(token, (name.clone(), addr));
|
||||
self.topics.insert(name, token);
|
||||
self.services.push(token);
|
||||
|
|
|
@ -1,143 +1,64 @@
|
|||
use std::net::{SocketAddr as StdTcpSocketAddr, TcpListener as StdTcpListener};
|
||||
pub(crate) use std::net::{
|
||||
SocketAddr as StdSocketAddr, TcpListener as StdTcpListener, ToSocketAddrs,
|
||||
};
|
||||
|
||||
pub(crate) use mio::net::{TcpListener as MioTcpListener, TcpSocket as MioTcpSocket};
|
||||
|
||||
#[cfg(unix)]
|
||||
pub(crate) use {
|
||||
mio::net::UnixListener as MioUnixListener,
|
||||
std::os::unix::net::UnixListener as StdUnixListener,
|
||||
};
|
||||
|
||||
use std::{fmt, io};
|
||||
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::{
|
||||
io::{FromRawFd, IntoRawFd},
|
||||
net::{SocketAddr as StdUdsSocketAddr, UnixListener as StdUnixListener},
|
||||
};
|
||||
use actix_rt::net::TcpStream;
|
||||
use mio::event::Source;
|
||||
use mio::net::TcpStream as MioTcpStream;
|
||||
use mio::{Interest, Registry, Token};
|
||||
|
||||
#[cfg(windows)]
|
||||
use std::os::windows::io::{FromRawSocket, IntoRawSocket};
|
||||
|
||||
use actix_rt::net::TcpStream;
|
||||
#[cfg(unix)]
|
||||
use actix_rt::net::UnixStream;
|
||||
use mio::event::Source;
|
||||
#[cfg(unix)]
|
||||
use mio::net::{
|
||||
SocketAddr as MioSocketAddr, UnixListener as MioUnixListener, UnixStream as MioUnixStream,
|
||||
use {
|
||||
actix_rt::net::UnixStream,
|
||||
mio::net::{SocketAddr as MioSocketAddr, UnixStream as MioUnixStream},
|
||||
std::os::unix::io::{FromRawFd, IntoRawFd},
|
||||
};
|
||||
use mio::net::{TcpListener as MioTcpListener, TcpStream as MioTcpStream};
|
||||
use mio::{Interest, Registry, Token};
|
||||
|
||||
/// socket module contains a unified wrapper for Tcp/Uds listener/SocketAddr/Stream and necessary
|
||||
/// trait impl for registering the listener to mio::Poll and convert stream to
|
||||
/// `actix_rt::net::{TcpStream, UnixStream}`.
|
||||
|
||||
pub(crate) enum StdListener {
|
||||
Tcp(StdTcpListener),
|
||||
#[cfg(unix)]
|
||||
Uds(StdUnixListener),
|
||||
}
|
||||
|
||||
impl fmt::Debug for StdListener {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match *self {
|
||||
StdListener::Tcp(ref lst) => write!(f, "{:?}", lst),
|
||||
#[cfg(all(unix))]
|
||||
StdListener::Uds(ref lst) => write!(f, "{:?}", lst),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for StdListener {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match *self {
|
||||
StdListener::Tcp(ref lst) => write!(f, "{}", lst.local_addr().ok().unwrap()),
|
||||
#[cfg(unix)]
|
||||
StdListener::Uds(ref lst) => write!(f, "{:?}", lst.local_addr().ok().unwrap()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) enum SocketAddr {
|
||||
Tcp(StdTcpSocketAddr),
|
||||
#[cfg(unix)]
|
||||
Uds(StdUdsSocketAddr),
|
||||
// this is a work around. mio would return different types of SocketAddr between accept and
|
||||
// local_addr methods.
|
||||
#[cfg(unix)]
|
||||
UdsMio(MioSocketAddr),
|
||||
}
|
||||
|
||||
impl fmt::Display for SocketAddr {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match *self {
|
||||
SocketAddr::Tcp(ref addr) => write!(f, "{}", addr),
|
||||
#[cfg(unix)]
|
||||
SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
|
||||
#[cfg(unix)]
|
||||
SocketAddr::UdsMio(ref addr) => write!(f, "{:?}", addr),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for SocketAddr {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match *self {
|
||||
SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr),
|
||||
#[cfg(unix)]
|
||||
SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
|
||||
#[cfg(unix)]
|
||||
SocketAddr::UdsMio(ref addr) => write!(f, "{:?}", addr),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl StdListener {
|
||||
pub(crate) fn local_addr(&self) -> SocketAddr {
|
||||
match self {
|
||||
StdListener::Tcp(lst) => SocketAddr::Tcp(lst.local_addr().unwrap()),
|
||||
#[cfg(unix)]
|
||||
StdListener::Uds(lst) => SocketAddr::Uds(lst.local_addr().unwrap()),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn into_mio_listener(self) -> std::io::Result<MioSocketListener> {
|
||||
match self {
|
||||
StdListener::Tcp(lst) => {
|
||||
// ToDo: is this non_blocking a good practice?
|
||||
lst.set_nonblocking(true)?;
|
||||
Ok(MioSocketListener::Tcp(MioTcpListener::from_std(lst)))
|
||||
}
|
||||
#[cfg(unix)]
|
||||
StdListener::Uds(lst) => {
|
||||
// ToDo: the same as above
|
||||
lst.set_nonblocking(true)?;
|
||||
Ok(MioSocketListener::Uds(MioUnixListener::from_std(lst)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum MioStream {
|
||||
Tcp(MioTcpStream),
|
||||
#[cfg(unix)]
|
||||
Uds(MioUnixStream),
|
||||
}
|
||||
|
||||
pub(crate) enum MioSocketListener {
|
||||
pub(crate) enum MioListener {
|
||||
Tcp(MioTcpListener),
|
||||
#[cfg(unix)]
|
||||
Uds(MioUnixListener),
|
||||
}
|
||||
|
||||
impl MioSocketListener {
|
||||
impl MioListener {
|
||||
pub(crate) fn local_addr(&self) -> SocketAddr {
|
||||
match *self {
|
||||
MioListener::Tcp(ref lst) => SocketAddr::Tcp(lst.local_addr().unwrap()),
|
||||
#[cfg(unix)]
|
||||
MioListener::Uds(ref lst) => SocketAddr::Uds(lst.local_addr().unwrap()),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn accept(&self) -> io::Result<Option<(MioStream, SocketAddr)>> {
|
||||
match *self {
|
||||
MioSocketListener::Tcp(ref lst) => lst
|
||||
MioListener::Tcp(ref lst) => lst
|
||||
.accept()
|
||||
.map(|(stream, addr)| Some((MioStream::Tcp(stream), SocketAddr::Tcp(addr)))),
|
||||
#[cfg(unix)]
|
||||
MioSocketListener::Uds(ref lst) => lst
|
||||
MioListener::Uds(ref lst) => lst
|
||||
.accept()
|
||||
.map(|(stream, addr)| Some((MioStream::Uds(stream), SocketAddr::UdsMio(addr)))),
|
||||
.map(|(stream, addr)| Some((MioStream::Uds(stream), SocketAddr::Uds(addr)))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Source for MioSocketListener {
|
||||
impl Source for MioListener {
|
||||
fn register(
|
||||
&mut self,
|
||||
registry: &Registry,
|
||||
|
@ -145,9 +66,9 @@ impl Source for MioSocketListener {
|
|||
interests: Interest,
|
||||
) -> io::Result<()> {
|
||||
match *self {
|
||||
MioSocketListener::Tcp(ref mut lst) => lst.register(registry, token, interests),
|
||||
MioListener::Tcp(ref mut lst) => lst.register(registry, token, interests),
|
||||
#[cfg(unix)]
|
||||
MioSocketListener::Uds(ref mut lst) => lst.register(registry, token, interests),
|
||||
MioListener::Uds(ref mut lst) => lst.register(registry, token, interests),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -158,17 +79,17 @@ impl Source for MioSocketListener {
|
|||
interests: Interest,
|
||||
) -> io::Result<()> {
|
||||
match *self {
|
||||
MioSocketListener::Tcp(ref mut lst) => lst.reregister(registry, token, interests),
|
||||
MioListener::Tcp(ref mut lst) => lst.reregister(registry, token, interests),
|
||||
#[cfg(unix)]
|
||||
MioSocketListener::Uds(ref mut lst) => lst.reregister(registry, token, interests),
|
||||
MioListener::Uds(ref mut lst) => lst.reregister(registry, token, interests),
|
||||
}
|
||||
}
|
||||
|
||||
fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
|
||||
match *self {
|
||||
MioSocketListener::Tcp(ref mut lst) => lst.deregister(registry),
|
||||
MioListener::Tcp(ref mut lst) => lst.deregister(registry),
|
||||
#[cfg(unix)]
|
||||
MioSocketListener::Uds(ref mut lst) => {
|
||||
MioListener::Uds(ref mut lst) => {
|
||||
let res = lst.deregister(registry);
|
||||
|
||||
// cleanup file path
|
||||
|
@ -183,6 +104,72 @@ impl Source for MioSocketListener {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<StdTcpListener> for MioListener {
|
||||
fn from(lst: StdTcpListener) -> Self {
|
||||
MioListener::Tcp(MioTcpListener::from_std(lst))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
impl From<StdUnixListener> for MioListener {
|
||||
fn from(lst: StdUnixListener) -> Self {
|
||||
MioListener::Uds(MioUnixListener::from_std(lst))
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for MioListener {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match *self {
|
||||
MioListener::Tcp(ref lst) => write!(f, "{:?}", lst),
|
||||
#[cfg(all(unix))]
|
||||
MioListener::Uds(ref lst) => write!(f, "{:?}", lst),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for MioListener {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match *self {
|
||||
MioListener::Tcp(ref lst) => write!(f, "{}", lst.local_addr().ok().unwrap()),
|
||||
#[cfg(unix)]
|
||||
MioListener::Uds(ref lst) => write!(f, "{:?}", lst.local_addr().ok().unwrap()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) enum SocketAddr {
|
||||
Tcp(StdSocketAddr),
|
||||
#[cfg(unix)]
|
||||
Uds(MioSocketAddr),
|
||||
}
|
||||
|
||||
impl fmt::Display for SocketAddr {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match *self {
|
||||
SocketAddr::Tcp(ref addr) => write!(f, "{}", addr),
|
||||
#[cfg(unix)]
|
||||
SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for SocketAddr {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match *self {
|
||||
SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr),
|
||||
#[cfg(unix)]
|
||||
SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum MioStream {
|
||||
Tcp(MioTcpStream),
|
||||
#[cfg(unix)]
|
||||
Uds(MioUnixStream),
|
||||
}
|
||||
|
||||
/// helper trait for converting mio stream to tokio stream.
|
||||
pub trait FromStream: Sized {
|
||||
fn from_mio(sock: MioStream) -> io::Result<Self>;
|
||||
|
@ -243,18 +230,16 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn socket_addr() {
|
||||
use socket2::{Domain, SockAddr, Socket, Type};
|
||||
|
||||
let addr = SocketAddr::Tcp("127.0.0.1:8080".parse().unwrap());
|
||||
assert!(format!("{:?}", addr).contains("127.0.0.1:8080"));
|
||||
assert_eq!(format!("{}", addr), "127.0.0.1:8080");
|
||||
|
||||
let addr: StdTcpSocketAddr = "127.0.0.1:0".parse().unwrap();
|
||||
let socket = Socket::new(Domain::ipv4(), Type::stream(), None).unwrap();
|
||||
socket.set_reuse_address(true).unwrap();
|
||||
socket.bind(&SockAddr::from(addr)).unwrap();
|
||||
let tcp = socket.into_tcp_listener();
|
||||
let lst = StdListener::Tcp(tcp);
|
||||
let addr: StdSocketAddr = "127.0.0.1:0".parse().unwrap();
|
||||
let socket = mio::net::TcpSocket::new_v4().unwrap();
|
||||
socket.set_reuseaddr(true).unwrap();
|
||||
socket.bind(addr).unwrap();
|
||||
let tcp = socket.listen(128).unwrap();
|
||||
let lst = MioListener::Tcp(tcp);
|
||||
assert!(format!("{:?}", lst).contains("TcpListener"));
|
||||
assert!(format!("{}", lst).contains("127.0.0.1"));
|
||||
}
|
||||
|
@ -263,13 +248,13 @@ mod tests {
|
|||
#[cfg(unix)]
|
||||
fn uds() {
|
||||
let _ = std::fs::remove_file("/tmp/sock.xxxxx");
|
||||
if let Ok(socket) = StdUnixListener::bind("/tmp/sock.xxxxx") {
|
||||
if let Ok(socket) = MioUnixListener::bind("/tmp/sock.xxxxx") {
|
||||
let addr = socket.local_addr().expect("Couldn't get local address");
|
||||
let a = SocketAddr::Uds(addr);
|
||||
assert!(format!("{:?}", a).contains("/tmp/sock.xxxxx"));
|
||||
assert!(format!("{}", a).contains("/tmp/sock.xxxxx"));
|
||||
|
||||
let lst = StdListener::Uds(socket);
|
||||
let lst = MioListener::Uds(socket);
|
||||
assert!(format!("{:?}", lst).contains("/tmp/sock.xxxxx"));
|
||||
assert!(format!("{}", lst).contains("/tmp/sock.xxxxx"));
|
||||
}
|
||||
|
|
|
@ -199,8 +199,7 @@ impl Worker {
|
|||
.collect::<Vec<_>>();
|
||||
|
||||
spawn(async move {
|
||||
let res = join_all(fut).await;
|
||||
let res: Result<Vec<_>, _> = res.into_iter().collect();
|
||||
let res: Result<Vec<_>, _> = join_all(fut).await.into_iter().collect();
|
||||
match res {
|
||||
Ok(services) => {
|
||||
for item in services {
|
||||
|
|
|
@ -5,14 +5,13 @@ use std::{net, thread, time};
|
|||
use actix_server::Server;
|
||||
use actix_service::fn_service;
|
||||
use futures_util::future::{lazy, ok};
|
||||
use socket2::{Domain, Protocol, Socket, Type};
|
||||
|
||||
fn unused_addr() -> net::SocketAddr {
|
||||
let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
|
||||
let socket = Socket::new(Domain::ipv4(), Type::stream(), Some(Protocol::tcp())).unwrap();
|
||||
socket.bind(&addr.into()).unwrap();
|
||||
socket.set_reuse_address(true).unwrap();
|
||||
let tcp = socket.into_tcp_listener();
|
||||
let socket = mio::net::TcpSocket::new_v4().unwrap();
|
||||
socket.bind(addr).unwrap();
|
||||
socket.set_reuseaddr(true).unwrap();
|
||||
let tcp = socket.listen(32).unwrap();
|
||||
tcp.local_addr().unwrap()
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
# Changes
|
||||
|
||||
## Unreleased - 2020-xx-xx
|
||||
|
||||
* move from `tokio-tls` to `tokio-native-tls` for native-tls feature.
|
||||
* Update `tokio-openssl` dependency to 0.5.0
|
||||
* Update `tokio-rustls` dependency to 0.20.0
|
||||
|
||||
## 2.0.0 - 2020-09-03
|
||||
* `nativetls::NativeTlsAcceptor` is renamed to `nativetls::Acceptor`.
|
||||
|
|
|
@ -32,7 +32,7 @@ openssl = ["open-ssl", "tokio-openssl"]
|
|||
rustls = ["rust-tls", "webpki", "webpki-roots", "tokio-rustls"]
|
||||
|
||||
# nativetls
|
||||
nativetls = ["native-tls", "tokio-tls"]
|
||||
nativetls = ["native-tls", "tokio-native-tls"]
|
||||
|
||||
[dependencies]
|
||||
actix-service = "1.0.0"
|
||||
|
@ -43,20 +43,20 @@ futures-util = { version = "0.3.4", default-features = false }
|
|||
|
||||
# openssl
|
||||
open-ssl = { package = "openssl", version = "0.10", optional = true }
|
||||
tokio-openssl = { version = "0.4.0", optional = true }
|
||||
tokio-openssl = { version = "0.5.0", optional = true }
|
||||
|
||||
# rustls
|
||||
rust-tls = { package = "rustls", version = "0.18.0", optional = true }
|
||||
webpki = { version = "0.21", optional = true }
|
||||
webpki-roots = { version = "0.20", optional = true }
|
||||
tokio-rustls = { version = "0.14.0", optional = true }
|
||||
tokio-rustls = { version = "0.20.0", optional = true }
|
||||
|
||||
# native-tls
|
||||
native-tls = { version = "0.2", optional = true }
|
||||
tokio-tls = { version = "0.3", optional = true }
|
||||
tokio-native-tls = { version = "0.2.0", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
bytes = "0.5"
|
||||
bytes = "0.6"
|
||||
log = "0.4"
|
||||
env_logger = "0.7"
|
||||
actix-testing = "1.0.0"
|
||||
|
|
|
@ -7,7 +7,7 @@ use actix_utils::counter::Counter;
|
|||
use futures_util::future::{self, FutureExt, LocalBoxFuture, TryFutureExt};
|
||||
|
||||
pub use native_tls::Error;
|
||||
pub use tokio_tls::{TlsAcceptor, TlsStream};
|
||||
pub use tokio_native_tls::{TlsAcceptor, TlsStream};
|
||||
|
||||
use crate::MAX_CONN_COUNTER;
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
# Changes
|
||||
|
||||
## Unreleased - 2020-xx-xx
|
||||
* Update `bytes` to 0.6
|
||||
|
||||
## 2.0.0 - 2020-08-23
|
||||
* No changes from beta 1.
|
||||
|
|
|
@ -20,7 +20,7 @@ actix-codec = "0.3.0"
|
|||
actix-rt = "1.1.1"
|
||||
actix-service = "1.0.6"
|
||||
bitflags = "1.2.1"
|
||||
bytes = "0.5.3"
|
||||
bytes = "0.6"
|
||||
either = "1.5.3"
|
||||
futures-channel = { version = "0.3.4", default-features = false }
|
||||
futures-sink = { version = "0.3.4", default-features = false }
|
||||
|
|
|
@ -15,7 +15,7 @@ name = "bytestring"
|
|||
path = "src/lib.rs"
|
||||
|
||||
[dependencies]
|
||||
bytes = "0.5.3"
|
||||
bytes = "0.6"
|
||||
serde = { version = "1.0", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
|
|
Loading…
Reference in New Issue