diff --git a/actix-codec/CHANGES.md b/actix-codec/CHANGES.md index c8b7fbe3..6aa05bcf 100644 --- a/actix-codec/CHANGES.md +++ b/actix-codec/CHANGES.md @@ -1,6 +1,8 @@ # Changes ## Unreleased - 2020-xx-xx +* Update `tokio` dependency to 0.3.1 +* Update `tokio-util` dependency to 0.4 ## 0.3.0 - 2020-08-23 * No changes from beta 2. diff --git a/actix-codec/Cargo.toml b/actix-codec/Cargo.toml index f6407de2..7280a43c 100644 --- a/actix-codec/Cargo.toml +++ b/actix-codec/Cargo.toml @@ -17,10 +17,12 @@ path = "src/lib.rs" [dependencies] bitflags = "1.2.1" -bytes = "0.5.2" + +# ToDo: update to bytes 0.6 when tokio-util is bytes 0.6 compat +bytes = "0.5" futures-core = { version = "0.3.4", default-features = false } futures-sink = { version = "0.3.4", default-features = false } log = "0.4" pin-project = "0.4.17" -tokio = "0.3.0" +tokio = "0.3.1" tokio-util = { version = "0.4.0", default-features = false, features = ["codec"] } diff --git a/actix-connect/CHANGES.md b/actix-connect/CHANGES.md index 6f18b5b4..f9a78289 100644 --- a/actix-connect/CHANGES.md +++ b/actix-connect/CHANGES.md @@ -1,7 +1,8 @@ # Changes ## Unreleased - 2020-xx-xx - +* Update `tokio-openssl` dependency to 0.5 +* Update `tokio-rustls` dependency to 0.20 ## 2.0.0 - 2020-09-02 - No significant changes from `2.0.0-alpha.4`. diff --git a/actix-connect/Cargo.toml b/actix-connect/Cargo.toml index 425e3605..b60ceeb9 100644 --- a/actix-connect/Cargo.toml +++ b/actix-connect/Cargo.toml @@ -54,5 +54,5 @@ tokio-rustls = { version = "0.20.0", optional = true } webpki = { version = "0.21", optional = true } [dev-dependencies] -bytes = "0.5.3" +bytes = "0.6" actix-testing = "1.0.0" diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 232a85d1..8fd1a32c 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -7,7 +7,8 @@ * Add `System::attach_to_tokio` method. [#173] ### Changed -* update tokio to 0.3 +* Update `tokio` dependency to 0.3.1 +* Rename `time` module `delay_for` to `sleep`, `delay_until` to `sleep_until`, `Delay` to `Sleep` to keep inline with tokio. * Remove `'static` lifetime requirement for `Runtime::block_on` and `SystemRunner::block_on`. These methods would accept a &Self when calling. * Remove `'static` lifetime requirement for `System::run` diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index 4eced1a4..75a12590 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -22,7 +22,7 @@ futures-channel = { version = "0.3.4", default-features = false } futures-util = { version = "0.3.4", default-features = false, features = ["alloc"] } copyless = "0.1.4" smallvec = "1" -tokio = { version = "0.3.0", features = ["rt", "net", "signal", "stream", "time"] } +tokio = { version = "0.3.1", features = ["rt", "net", "signal", "stream", "time"] } [dev-dependencies] -tokio = { version = "0.3.0", features = ["full"] } +tokio = { version = "0.3.1", features = ["full"] } diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index c843736d..6d5c2b73 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -1,7 +1,8 @@ # Changes ## Unreleased - 2020-xx-xx -* Update mio to 0.7.3 +* Update `mio` dependency to 0.7.3 +* `ServerBuilder::backlog` would accept `u32` instead of `i32` * Use `concurrent-queue` to manage poll wakes instead of `futures::channel::mpsc`. * Remove `AcceptNotify` type and pass `WakerQueue` to `WorkerClient` for notify the `Accept` more directly. * Convert `mio::Stream` to `actix_rt::net::TcpStream`(`UnixStream` for uds) using `FromRawFd` and `IntoRawFd`(`IntoRawSocket` and `FromRawSocket` on windows). diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index cf5e87eb..e2e74893 100644 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -32,11 +32,10 @@ log = "0.4" mio = { version = "0.7.3", features = [ "os-poll", "tcp", "uds"] } num_cpus = "1.13" slab = "0.4" -socket2 = "0.3" [dev-dependencies] actix-testing = "1.0.0" bytes = "0.5" env_logger = "0.7" futures-util = { version = "0.3.4", default-features = false, features = ["sink"] } -tokio = { version = "0.3.0", features = ["full"] } +tokio = { version = "0.3.1", features = ["full"] } diff --git a/actix-server/examples/basic.rs b/actix-server/examples/basic.rs index b60a6a05..3f4d87cf 100644 --- a/actix-server/examples/basic.rs +++ b/actix-server/examples/basic.rs @@ -18,6 +18,7 @@ use std::{env, io}; use actix_rt::net::TcpStream; use actix_server::Server; use actix_service::pipeline_factory; +use bytes::BytesMut; use futures_util::future::ok; use log::{error, info}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -48,10 +49,10 @@ async fn main() -> io::Result<()> { let num = num + 1; let mut size = 0; - // FixMe: BytesMut and Vec are not working? - let mut buf = [0; 1024]; + let mut buf = BytesMut::new(); loop { + // ToDo: change to read_buf match stream.read(&mut buf).await { // end of stream; bail from loop Ok(0) => break, diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 690950df..0ee8c128 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -8,7 +8,7 @@ use mio::{Interest, Poll, Token as MioToken}; use slab::Slab; use crate::server::Server; -use crate::socket::{MioSocketListener, SocketAddr, StdListener}; +use crate::socket::{MioListener, SocketAddr}; use crate::waker_queue::{WakerInterest, WakerQueue, WakerQueueError, WAKER_TOKEN}; use crate::worker::{Conn, WorkerHandle}; use crate::Token; @@ -19,7 +19,7 @@ struct ServerSocketInfo { // be ware this is the crate token for identify socket and should not be confused with // mio::Token token: Token, - lst: MioSocketListener, + lst: MioListener, // timeout is used to mark the deadline when this socket's listener should be registered again // after an error. timeout: Option, @@ -40,7 +40,8 @@ pub(crate) struct AcceptLoop { impl AcceptLoop { pub fn new(srv: Server) -> Self { let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create mio::Poll: {}", e)); - let waker = WakerQueue::with_capacity(poll.registry(), 128).unwrap(); + let waker = WakerQueue::with_capacity(poll.registry(), 128) + .unwrap_or_else(|e| panic!("Can not create mio::Waker: {}", e)); Self { srv: Some(srv), @@ -59,7 +60,7 @@ impl AcceptLoop { pub(crate) fn start( &mut self, - socks: Vec<(Token, StdListener)>, + socks: Vec<(Token, MioListener)>, workers: Vec, ) { let srv = self.srv.take().expect("Can not re-use AcceptInfo"); @@ -99,7 +100,7 @@ impl Accept { pub(crate) fn start( poll: Poll, waker: WakerQueue, - socks: Vec<(Token, StdListener)>, + socks: Vec<(Token, MioListener)>, srv: Server, workers: Vec, ) { @@ -120,29 +121,26 @@ impl Accept { fn new_with_sockets( poll: Poll, waker: WakerQueue, - socks: Vec<(Token, StdListener)>, + socks: Vec<(Token, MioListener)>, workers: Vec, srv: Server, ) -> (Accept, Slab) { let mut sockets = Slab::new(); - for (hnd_token, lst) in socks.into_iter() { + for (hnd_token, mut lst) in socks.into_iter() { let addr = lst.local_addr(); - let mut sock = lst - .into_mio_listener() - .unwrap_or_else(|e| panic!("Can not set non_block on listener: {}", e)); let entry = sockets.vacant_entry(); let token = entry.key(); // Start listening for incoming connections poll.registry() - .register(&mut sock, MioToken(token + DELTA), Interest::READABLE) + .register(&mut lst, MioToken(token + DELTA), Interest::READABLE) .unwrap_or_else(|e| panic!("Can not register io: {}", e)); entry.insert(ServerSocketInfo { addr, token: hnd_token, - lst: sock, + lst, timeout: None, }); } @@ -175,17 +173,21 @@ impl Accept { // necessary/good practice to actively drain the waker queue. WAKER_TOKEN => 'waker: loop { match self.waker.pop() { - // worker notify it's availability has change. we maybe want to enter - // backpressure or recover from one. + // worker notify it's availability has change. we maybe want to recover + // from backpressure. Ok(WakerInterest::Notify) => { self.maybe_backpressure(&mut sockets, false); } Ok(WakerInterest::Pause) => { sockets.iter_mut().for_each(|(_, info)| { - if let Err(err) = self.deregister(info) { - error!("Can not deregister server socket {}", err); - } else { - info!("Paused accepting connections on {}", info.addr); + match self.deregister(info) { + Ok(_) => info!( + "Paused accepting connections on {}", + info.addr + ), + Err(e) => { + error!("Can not deregister server socket {}", e) + } } }); } @@ -253,11 +255,7 @@ impl Accept { // Calling reregister seems to fix the issue. self.poll .registry() - .register( - &mut info.lst, - mio::Token(token + DELTA), - Interest::READABLE, - ) + .register(&mut info.lst, mio::Token(token + DELTA), Interest::READABLE) .or_else(|_| { self.poll.registry().reregister( &mut info.lst, diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index f2f9aebb..ea7a4046 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -2,7 +2,7 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; -use std::{io, mem, net}; +use std::{io, mem}; use actix_rt::net::TcpStream; use actix_rt::time::{sleep_until, Instant}; @@ -13,14 +13,14 @@ use futures_util::future::ready; use futures_util::stream::FuturesUnordered; use futures_util::{ready, stream::Stream, FutureExt, StreamExt}; use log::{error, info}; -use socket2::{Domain, Protocol, Socket, Type}; use crate::accept::AcceptLoop; use crate::config::{ConfiguredService, ServiceConfig}; use crate::server::{Server, ServerCommand}; use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; use crate::signals::{Signal, Signals}; -use crate::socket::StdListener; +use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; +use crate::socket::{MioTcpListener, MioTcpSocket}; use crate::waker_queue::{WakerInterest, WakerQueue}; use crate::worker::{self, Worker, WorkerAvailability, WorkerHandle}; use crate::Token; @@ -29,10 +29,10 @@ use crate::Token; pub struct ServerBuilder { threads: usize, token: Token, - backlog: i32, + backlog: u32, workers: Vec<(usize, WorkerHandle)>, services: Vec>, - sockets: Vec<(Token, String, StdListener)>, + sockets: Vec<(Token, String, MioListener)>, accept: AcceptLoop, exit: bool, shutdown_timeout: Duration, @@ -91,7 +91,7 @@ impl ServerBuilder { /// Generally set in the 64-2048 range. Default value is 2048. /// /// This method should be called before `bind()` method call. - pub fn backlog(mut self, num: i32) -> Self { + pub fn backlog(mut self, num: u32) -> Self { self.backlog = num; self } @@ -149,7 +149,7 @@ impl ServerBuilder { for (name, lst) in cfg.services { let token = self.token.next(); srv.stream(token, name.clone(), lst.local_addr()?); - self.sockets.push((token, name, StdListener::Tcp(lst))); + self.sockets.push((token, name, MioListener::Tcp(lst))); } self.services.push(Box::new(srv)); } @@ -162,7 +162,7 @@ impl ServerBuilder { pub fn bind>(mut self, name: N, addr: U, factory: F) -> io::Result where F: ServiceFactory, - U: net::ToSocketAddrs, + U: ToSocketAddrs, { let sockets = bind_addr(addr, self.backlog)?; @@ -175,12 +175,12 @@ impl ServerBuilder { lst.local_addr()?, )); self.sockets - .push((token, name.as_ref().to_string(), StdListener::Tcp(lst))); + .push((token, name.as_ref().to_string(), MioListener::Tcp(lst))); } Ok(self) } - #[cfg(all(unix))] + #[cfg(unix)] /// Add new unix domain service to the server. pub fn bind_uds(self, name: N, addr: U, factory: F) -> io::Result where @@ -188,8 +188,6 @@ impl ServerBuilder { N: AsRef, U: AsRef, { - use std::os::unix::net::UnixListener; - // The path must not exist when we try to bind. // Try to remove it to avoid bind error. if let Err(e) = std::fs::remove_file(addr.as_ref()) { @@ -199,7 +197,7 @@ impl ServerBuilder { } } - let lst = UnixListener::bind(addr)?; + let lst = crate::socket::StdUnixListener::bind(addr)?; self.listen_uds(name, lst, factory) } @@ -210,15 +208,15 @@ impl ServerBuilder { pub fn listen_uds>( mut self, name: N, - lst: std::os::unix::net::UnixListener, + lst: crate::socket::StdUnixListener, factory: F, ) -> io::Result where F: ServiceFactory, { - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::net::{IpAddr, Ipv4Addr}; let token = self.token.next(); - let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); + let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); self.services.push(StreamNewService::create( name.as_ref().to_string(), token, @@ -226,7 +224,7 @@ impl ServerBuilder { addr, )); self.sockets - .push((token, name.as_ref().to_string(), StdListener::Uds(lst))); + .push((token, name.as_ref().to_string(), MioListener::from(lst))); Ok(self) } @@ -234,7 +232,7 @@ impl ServerBuilder { pub fn listen>( mut self, name: N, - lst: net::TcpListener, + lst: StdTcpListener, factory: F, ) -> io::Result where @@ -247,8 +245,9 @@ impl ServerBuilder { factory, lst.local_addr()?, )); + self.sockets - .push((token, name.as_ref().to_string(), StdListener::Tcp(lst))); + .push((token, name.as_ref().to_string(), MioListener::from(lst))); Ok(self) } @@ -450,10 +449,10 @@ impl Future for ServerBuilder { } } -pub(super) fn bind_addr( +pub(super) fn bind_addr( addr: S, - backlog: i32, -) -> io::Result> { + backlog: u32, +) -> io::Result> { let mut err = None; let mut succ = false; let mut sockets = Vec::new(); @@ -481,14 +480,13 @@ pub(super) fn bind_addr( } } -fn create_tcp_listener(addr: net::SocketAddr, backlog: i32) -> io::Result { - let domain = match addr { - net::SocketAddr::V4(_) => Domain::ipv4(), - net::SocketAddr::V6(_) => Domain::ipv6(), +fn create_tcp_listener(addr: StdSocketAddr, backlog: u32) -> io::Result { + let socket = match addr { + StdSocketAddr::V4(_) => MioTcpSocket::new_v4()?, + StdSocketAddr::V6(_) => MioTcpSocket::new_v6()?, }; - let socket = Socket::new(domain, Type::stream(), Some(Protocol::tcp()))?; - socket.set_reuse_address(true)?; - socket.bind(&addr.into())?; - socket.listen(backlog)?; - Ok(socket.into_tcp_listener()) + + socket.set_reuseaddr(true)?; + socket.bind(addr)?; + socket.listen(backlog) } diff --git a/actix-server/src/config.rs b/actix-server/src/config.rs index 1c10e51f..6087efa9 100644 --- a/actix-server/src/config.rs +++ b/actix-server/src/config.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; use std::future::Future; -use std::{fmt, io, net}; +use std::{fmt, io}; use actix_rt::net::TcpStream; use actix_service as actix; @@ -12,18 +12,19 @@ use crate::builder::bind_addr; use crate::service::{ BoxedServerService, InternalServiceFactory, ServerMessage, StreamService, }; +use crate::socket::{MioTcpListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; use crate::LocalBoxFuture; use crate::Token; pub struct ServiceConfig { - pub(crate) services: Vec<(String, net::TcpListener)>, + pub(crate) services: Vec<(String, MioTcpListener)>, pub(crate) apply: Option>, pub(crate) threads: usize, - pub(crate) backlog: i32, + pub(crate) backlog: u32, } impl ServiceConfig { - pub(super) fn new(threads: usize, backlog: i32) -> ServiceConfig { + pub(super) fn new(threads: usize, backlog: u32) -> ServiceConfig { ServiceConfig { threads, backlog, @@ -43,19 +44,23 @@ impl ServiceConfig { /// Add new service to server pub fn bind>(&mut self, name: N, addr: U) -> io::Result<&mut Self> where - U: net::ToSocketAddrs, + U: ToSocketAddrs, { let sockets = bind_addr(addr, self.backlog)?; for lst in sockets { - self.listen(name.as_ref(), lst); + self._listen(name.as_ref(), lst); } Ok(self) } /// Add new service to server - pub fn listen>(&mut self, name: N, lst: net::TcpListener) -> &mut Self { + pub fn listen>(&mut self, name: N, lst: StdTcpListener) -> &mut Self { + self._listen(name, MioTcpListener::from_std(lst)) + } + + fn _listen>(&mut self, name: N, lst: MioTcpListener) -> &mut Self { if self.apply.is_none() { self.apply = Some(Box::new(not_configured)); } @@ -76,7 +81,7 @@ impl ServiceConfig { pub(super) struct ConfiguredService { rt: Box, - names: HashMap, + names: HashMap, topics: HashMap, services: Vec, } @@ -91,7 +96,7 @@ impl ConfiguredService { } } - pub(super) fn stream(&mut self, token: Token, name: String, addr: net::SocketAddr) { + pub(super) fn stream(&mut self, token: Token, name: String, addr: StdSocketAddr) { self.names.insert(token, (name.clone(), addr)); self.topics.insert(name, token); self.services.push(token); diff --git a/actix-server/src/socket.rs b/actix-server/src/socket.rs index 640288f4..d9c61c75 100644 --- a/actix-server/src/socket.rs +++ b/actix-server/src/socket.rs @@ -1,143 +1,64 @@ -use std::net::{SocketAddr as StdTcpSocketAddr, TcpListener as StdTcpListener}; +pub(crate) use std::net::{ + SocketAddr as StdSocketAddr, TcpListener as StdTcpListener, ToSocketAddrs, +}; + +pub(crate) use mio::net::{TcpListener as MioTcpListener, TcpSocket as MioTcpSocket}; + +#[cfg(unix)] +pub(crate) use { + mio::net::UnixListener as MioUnixListener, + std::os::unix::net::UnixListener as StdUnixListener, +}; + use std::{fmt, io}; -#[cfg(unix)] -use std::os::unix::{ - io::{FromRawFd, IntoRawFd}, - net::{SocketAddr as StdUdsSocketAddr, UnixListener as StdUnixListener}, -}; +use actix_rt::net::TcpStream; +use mio::event::Source; +use mio::net::TcpStream as MioTcpStream; +use mio::{Interest, Registry, Token}; + #[cfg(windows)] use std::os::windows::io::{FromRawSocket, IntoRawSocket}; - -use actix_rt::net::TcpStream; #[cfg(unix)] -use actix_rt::net::UnixStream; -use mio::event::Source; -#[cfg(unix)] -use mio::net::{ - SocketAddr as MioSocketAddr, UnixListener as MioUnixListener, UnixStream as MioUnixStream, +use { + actix_rt::net::UnixStream, + mio::net::{SocketAddr as MioSocketAddr, UnixStream as MioUnixStream}, + std::os::unix::io::{FromRawFd, IntoRawFd}, }; -use mio::net::{TcpListener as MioTcpListener, TcpStream as MioTcpStream}; -use mio::{Interest, Registry, Token}; /// socket module contains a unified wrapper for Tcp/Uds listener/SocketAddr/Stream and necessary /// trait impl for registering the listener to mio::Poll and convert stream to /// `actix_rt::net::{TcpStream, UnixStream}`. -pub(crate) enum StdListener { - Tcp(StdTcpListener), - #[cfg(unix)] - Uds(StdUnixListener), -} - -impl fmt::Debug for StdListener { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - StdListener::Tcp(ref lst) => write!(f, "{:?}", lst), - #[cfg(all(unix))] - StdListener::Uds(ref lst) => write!(f, "{:?}", lst), - } - } -} - -impl fmt::Display for StdListener { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - StdListener::Tcp(ref lst) => write!(f, "{}", lst.local_addr().ok().unwrap()), - #[cfg(unix)] - StdListener::Uds(ref lst) => write!(f, "{:?}", lst.local_addr().ok().unwrap()), - } - } -} - -pub(crate) enum SocketAddr { - Tcp(StdTcpSocketAddr), - #[cfg(unix)] - Uds(StdUdsSocketAddr), - // this is a work around. mio would return different types of SocketAddr between accept and - // local_addr methods. - #[cfg(unix)] - UdsMio(MioSocketAddr), -} - -impl fmt::Display for SocketAddr { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - SocketAddr::Tcp(ref addr) => write!(f, "{}", addr), - #[cfg(unix)] - SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), - #[cfg(unix)] - SocketAddr::UdsMio(ref addr) => write!(f, "{:?}", addr), - } - } -} - -impl fmt::Debug for SocketAddr { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match *self { - SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr), - #[cfg(unix)] - SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), - #[cfg(unix)] - SocketAddr::UdsMio(ref addr) => write!(f, "{:?}", addr), - } - } -} - -impl StdListener { - pub(crate) fn local_addr(&self) -> SocketAddr { - match self { - StdListener::Tcp(lst) => SocketAddr::Tcp(lst.local_addr().unwrap()), - #[cfg(unix)] - StdListener::Uds(lst) => SocketAddr::Uds(lst.local_addr().unwrap()), - } - } - - pub(crate) fn into_mio_listener(self) -> std::io::Result { - match self { - StdListener::Tcp(lst) => { - // ToDo: is this non_blocking a good practice? - lst.set_nonblocking(true)?; - Ok(MioSocketListener::Tcp(MioTcpListener::from_std(lst))) - } - #[cfg(unix)] - StdListener::Uds(lst) => { - // ToDo: the same as above - lst.set_nonblocking(true)?; - Ok(MioSocketListener::Uds(MioUnixListener::from_std(lst))) - } - } - } -} - -#[derive(Debug)] -pub enum MioStream { - Tcp(MioTcpStream), - #[cfg(unix)] - Uds(MioUnixStream), -} - -pub(crate) enum MioSocketListener { +pub(crate) enum MioListener { Tcp(MioTcpListener), #[cfg(unix)] Uds(MioUnixListener), } -impl MioSocketListener { +impl MioListener { + pub(crate) fn local_addr(&self) -> SocketAddr { + match *self { + MioListener::Tcp(ref lst) => SocketAddr::Tcp(lst.local_addr().unwrap()), + #[cfg(unix)] + MioListener::Uds(ref lst) => SocketAddr::Uds(lst.local_addr().unwrap()), + } + } + pub(crate) fn accept(&self) -> io::Result> { match *self { - MioSocketListener::Tcp(ref lst) => lst + MioListener::Tcp(ref lst) => lst .accept() .map(|(stream, addr)| Some((MioStream::Tcp(stream), SocketAddr::Tcp(addr)))), #[cfg(unix)] - MioSocketListener::Uds(ref lst) => lst + MioListener::Uds(ref lst) => lst .accept() - .map(|(stream, addr)| Some((MioStream::Uds(stream), SocketAddr::UdsMio(addr)))), + .map(|(stream, addr)| Some((MioStream::Uds(stream), SocketAddr::Uds(addr)))), } } } -impl Source for MioSocketListener { +impl Source for MioListener { fn register( &mut self, registry: &Registry, @@ -145,9 +66,9 @@ impl Source for MioSocketListener { interests: Interest, ) -> io::Result<()> { match *self { - MioSocketListener::Tcp(ref mut lst) => lst.register(registry, token, interests), + MioListener::Tcp(ref mut lst) => lst.register(registry, token, interests), #[cfg(unix)] - MioSocketListener::Uds(ref mut lst) => lst.register(registry, token, interests), + MioListener::Uds(ref mut lst) => lst.register(registry, token, interests), } } @@ -158,17 +79,17 @@ impl Source for MioSocketListener { interests: Interest, ) -> io::Result<()> { match *self { - MioSocketListener::Tcp(ref mut lst) => lst.reregister(registry, token, interests), + MioListener::Tcp(ref mut lst) => lst.reregister(registry, token, interests), #[cfg(unix)] - MioSocketListener::Uds(ref mut lst) => lst.reregister(registry, token, interests), + MioListener::Uds(ref mut lst) => lst.reregister(registry, token, interests), } } fn deregister(&mut self, registry: &Registry) -> io::Result<()> { match *self { - MioSocketListener::Tcp(ref mut lst) => lst.deregister(registry), + MioListener::Tcp(ref mut lst) => lst.deregister(registry), #[cfg(unix)] - MioSocketListener::Uds(ref mut lst) => { + MioListener::Uds(ref mut lst) => { let res = lst.deregister(registry); // cleanup file path @@ -183,6 +104,72 @@ impl Source for MioSocketListener { } } +impl From for MioListener { + fn from(lst: StdTcpListener) -> Self { + MioListener::Tcp(MioTcpListener::from_std(lst)) + } +} + +#[cfg(unix)] +impl From for MioListener { + fn from(lst: StdUnixListener) -> Self { + MioListener::Uds(MioUnixListener::from_std(lst)) + } +} + +impl fmt::Debug for MioListener { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + MioListener::Tcp(ref lst) => write!(f, "{:?}", lst), + #[cfg(all(unix))] + MioListener::Uds(ref lst) => write!(f, "{:?}", lst), + } + } +} + +impl fmt::Display for MioListener { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + MioListener::Tcp(ref lst) => write!(f, "{}", lst.local_addr().ok().unwrap()), + #[cfg(unix)] + MioListener::Uds(ref lst) => write!(f, "{:?}", lst.local_addr().ok().unwrap()), + } + } +} + +pub(crate) enum SocketAddr { + Tcp(StdSocketAddr), + #[cfg(unix)] + Uds(MioSocketAddr), +} + +impl fmt::Display for SocketAddr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + SocketAddr::Tcp(ref addr) => write!(f, "{}", addr), + #[cfg(unix)] + SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), + } + } +} + +impl fmt::Debug for SocketAddr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + SocketAddr::Tcp(ref addr) => write!(f, "{:?}", addr), + #[cfg(unix)] + SocketAddr::Uds(ref addr) => write!(f, "{:?}", addr), + } + } +} + +#[derive(Debug)] +pub enum MioStream { + Tcp(MioTcpStream), + #[cfg(unix)] + Uds(MioUnixStream), +} + /// helper trait for converting mio stream to tokio stream. pub trait FromStream: Sized { fn from_mio(sock: MioStream) -> io::Result; @@ -243,18 +230,16 @@ mod tests { #[test] fn socket_addr() { - use socket2::{Domain, SockAddr, Socket, Type}; - let addr = SocketAddr::Tcp("127.0.0.1:8080".parse().unwrap()); assert!(format!("{:?}", addr).contains("127.0.0.1:8080")); assert_eq!(format!("{}", addr), "127.0.0.1:8080"); - let addr: StdTcpSocketAddr = "127.0.0.1:0".parse().unwrap(); - let socket = Socket::new(Domain::ipv4(), Type::stream(), None).unwrap(); - socket.set_reuse_address(true).unwrap(); - socket.bind(&SockAddr::from(addr)).unwrap(); - let tcp = socket.into_tcp_listener(); - let lst = StdListener::Tcp(tcp); + let addr: StdSocketAddr = "127.0.0.1:0".parse().unwrap(); + let socket = mio::net::TcpSocket::new_v4().unwrap(); + socket.set_reuseaddr(true).unwrap(); + socket.bind(addr).unwrap(); + let tcp = socket.listen(128).unwrap(); + let lst = MioListener::Tcp(tcp); assert!(format!("{:?}", lst).contains("TcpListener")); assert!(format!("{}", lst).contains("127.0.0.1")); } @@ -263,13 +248,13 @@ mod tests { #[cfg(unix)] fn uds() { let _ = std::fs::remove_file("/tmp/sock.xxxxx"); - if let Ok(socket) = StdUnixListener::bind("/tmp/sock.xxxxx") { + if let Ok(socket) = MioUnixListener::bind("/tmp/sock.xxxxx") { let addr = socket.local_addr().expect("Couldn't get local address"); let a = SocketAddr::Uds(addr); assert!(format!("{:?}", a).contains("/tmp/sock.xxxxx")); assert!(format!("{}", a).contains("/tmp/sock.xxxxx")); - let lst = StdListener::Uds(socket); + let lst = MioListener::Uds(socket); assert!(format!("{:?}", lst).contains("/tmp/sock.xxxxx")); assert!(format!("{}", lst).contains("/tmp/sock.xxxxx")); } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index ec88e0e5..05e383f8 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -199,8 +199,7 @@ impl Worker { .collect::>(); spawn(async move { - let res = join_all(fut).await; - let res: Result, _> = res.into_iter().collect(); + let res: Result, _> = join_all(fut).await.into_iter().collect(); match res { Ok(services) => { for item in services { diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index de4e2262..b3ade4e1 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -5,14 +5,13 @@ use std::{net, thread, time}; use actix_server::Server; use actix_service::fn_service; use futures_util::future::{lazy, ok}; -use socket2::{Domain, Protocol, Socket, Type}; fn unused_addr() -> net::SocketAddr { let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap(); - let socket = Socket::new(Domain::ipv4(), Type::stream(), Some(Protocol::tcp())).unwrap(); - socket.bind(&addr.into()).unwrap(); - socket.set_reuse_address(true).unwrap(); - let tcp = socket.into_tcp_listener(); + let socket = mio::net::TcpSocket::new_v4().unwrap(); + socket.bind(addr).unwrap(); + socket.set_reuseaddr(true).unwrap(); + let tcp = socket.listen(32).unwrap(); tcp.local_addr().unwrap() } diff --git a/actix-tls/CHANGES.md b/actix-tls/CHANGES.md index 592b96b0..99795c91 100644 --- a/actix-tls/CHANGES.md +++ b/actix-tls/CHANGES.md @@ -1,7 +1,9 @@ # Changes ## Unreleased - 2020-xx-xx - +* move from `tokio-tls` to `tokio-native-tls` for native-tls feature. +* Update `tokio-openssl` dependency to 0.5.0 +* Update `tokio-rustls` dependency to 0.20.0 ## 2.0.0 - 2020-09-03 * `nativetls::NativeTlsAcceptor` is renamed to `nativetls::Acceptor`. diff --git a/actix-tls/Cargo.toml b/actix-tls/Cargo.toml index 801f6144..6e6de84e 100644 --- a/actix-tls/Cargo.toml +++ b/actix-tls/Cargo.toml @@ -32,7 +32,7 @@ openssl = ["open-ssl", "tokio-openssl"] rustls = ["rust-tls", "webpki", "webpki-roots", "tokio-rustls"] # nativetls -nativetls = ["native-tls", "tokio-tls"] +nativetls = ["native-tls", "tokio-native-tls"] [dependencies] actix-service = "1.0.0" @@ -43,20 +43,20 @@ futures-util = { version = "0.3.4", default-features = false } # openssl open-ssl = { package = "openssl", version = "0.10", optional = true } -tokio-openssl = { version = "0.4.0", optional = true } +tokio-openssl = { version = "0.5.0", optional = true } # rustls rust-tls = { package = "rustls", version = "0.18.0", optional = true } webpki = { version = "0.21", optional = true } webpki-roots = { version = "0.20", optional = true } -tokio-rustls = { version = "0.14.0", optional = true } +tokio-rustls = { version = "0.20.0", optional = true } # native-tls native-tls = { version = "0.2", optional = true } -tokio-tls = { version = "0.3", optional = true } +tokio-native-tls = { version = "0.2.0", optional = true } [dev-dependencies] -bytes = "0.5" +bytes = "0.6" log = "0.4" env_logger = "0.7" actix-testing = "1.0.0" diff --git a/actix-tls/src/nativetls.rs b/actix-tls/src/nativetls.rs index fa66bff6..eb7e268b 100644 --- a/actix-tls/src/nativetls.rs +++ b/actix-tls/src/nativetls.rs @@ -7,7 +7,7 @@ use actix_utils::counter::Counter; use futures_util::future::{self, FutureExt, LocalBoxFuture, TryFutureExt}; pub use native_tls::Error; -pub use tokio_tls::{TlsAcceptor, TlsStream}; +pub use tokio_native_tls::{TlsAcceptor, TlsStream}; use crate::MAX_CONN_COUNTER; diff --git a/actix-utils/CHANGES.md b/actix-utils/CHANGES.md index fe7aca31..032fa778 100644 --- a/actix-utils/CHANGES.md +++ b/actix-utils/CHANGES.md @@ -1,6 +1,7 @@ # Changes ## Unreleased - 2020-xx-xx +* Update `bytes` to 0.6 ## 2.0.0 - 2020-08-23 * No changes from beta 1. diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index e253545a..e9e3d3ba 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -20,7 +20,7 @@ actix-codec = "0.3.0" actix-rt = "1.1.1" actix-service = "1.0.6" bitflags = "1.2.1" -bytes = "0.5.3" +bytes = "0.6" either = "1.5.3" futures-channel = { version = "0.3.4", default-features = false } futures-sink = { version = "0.3.4", default-features = false } diff --git a/string/Cargo.toml b/string/Cargo.toml index 3653fe58..99085072 100644 --- a/string/Cargo.toml +++ b/string/Cargo.toml @@ -15,7 +15,7 @@ name = "bytestring" path = "src/lib.rs" [dependencies] -bytes = "0.5.3" +bytes = "0.6" serde = { version = "1.0", optional = true } [dev-dependencies]