rework address (now host) trait

This commit is contained in:
Rob Ede 2021-11-29 20:04:11 +00:00
parent 76c16a4e7b
commit 65474aad0d
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
20 changed files with 286 additions and 231 deletions

View File

@ -16,6 +16,7 @@
* Remove `connect::{new_connector, new_connector_factory, default_connector, default_connector_factory}` methods. [#422]
* Convert `connect::ResolverService` from enum to struct. [#422]
* Remove `connect::native_tls::Connector::service` method. [#422]
* Rename `connect::{Address => Host}` trait. [#422]
[#422]: https://github.com/actix/actix-net/pull/422

View File

@ -16,7 +16,10 @@ use actix_rt::{
time::timeout,
};
use actix_service::{Service, ServiceFactory};
use actix_utils::counter::Counter;
use actix_utils::{
counter::Counter,
future::{ready, Ready as FutReady},
};
use derive_more::{Deref, DerefMut, From};
use futures_core::future::LocalBoxFuture;
pub use tokio_native_tls::{native_tls::Error, TlsAcceptor};
@ -117,7 +120,7 @@ impl<IO: ActixStream + 'static> ServiceFactory<IO> for Acceptor {
type Config = ();
type Service = AcceptorService;
type InitError = ();
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
type Future = FutReady<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future {
let res = MAX_CONN_COUNTER.with(|conns| {
@ -128,7 +131,7 @@ impl<IO: ActixStream + 'static> ServiceFactory<IO> for Acceptor {
})
});
Box::pin(async { res })
ready(res)
}
}

View File

@ -17,9 +17,11 @@ use actix_rt::{
time::{sleep, Sleep},
};
use actix_service::{Service, ServiceFactory};
use actix_utils::counter::{Counter, CounterGuard};
use actix_utils::{
counter::{Counter, CounterGuard},
future::{ready, Ready as FutReady},
};
use derive_more::{Deref, DerefMut, From};
use futures_core::future::LocalBoxFuture;
pub use openssl::ssl::{
AlpnError, Error, HandshakeError, Ssl, SslAcceptor, SslAcceptorBuilder,
};
@ -122,7 +124,7 @@ impl<IO: ActixStream> ServiceFactory<IO> for Acceptor {
type Config = ();
type Service = AcceptorService;
type InitError = ();
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
type Future = FutReady<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future {
let res = MAX_CONN_COUNTER.with(|conns| {
@ -133,7 +135,7 @@ impl<IO: ActixStream> ServiceFactory<IO> for Acceptor {
})
});
Box::pin(async { res })
ready(res)
}
}

View File

@ -18,9 +18,11 @@ use actix_rt::{
time::{sleep, Sleep},
};
use actix_service::{Service, ServiceFactory};
use actix_utils::counter::{Counter, CounterGuard};
use actix_utils::{
counter::{Counter, CounterGuard},
future::{ready, Ready as FutReady},
};
use derive_more::{Deref, DerefMut, From};
use futures_core::future::LocalBoxFuture;
use pin_project_lite::pin_project;
pub use tokio_rustls::rustls::ServerConfig;
use tokio_rustls::{Accept, TlsAcceptor};
@ -120,7 +122,7 @@ impl<IO: ActixStream> ServiceFactory<IO> for Acceptor {
type Config = ();
type Service = AcceptorService;
type InitError = ();
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
type Future = FutReady<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future {
let res = MAX_CONN_COUNTER.with(|conns| {
@ -131,7 +133,7 @@ impl<IO: ActixStream> ServiceFactory<IO> for Acceptor {
})
});
Box::pin(async { res })
ready(res)
}
}

View File

@ -1,22 +0,0 @@
/// An interface for types where host parts (hostname and port) can be derived.
pub trait Address: Unpin + 'static {
/// Returns hostname part.
fn hostname(&self) -> &str;
/// Returns optional port part.
fn port(&self) -> Option<u16> {
None
}
}
impl Address for String {
fn hostname(&self) -> &str {
self
}
}
impl Address for &'static str {
fn hostname(&self) -> &str {
self
}
}

View File

@ -1,6 +1,6 @@
use derive_more::{Deref, DerefMut};
use super::Address;
use super::Host;
/// Wraps underlying I/O and the connection request that initiated it.
#[derive(Debug, Deref, DerefMut)]
@ -46,7 +46,7 @@ impl<R, IO> Connection<R, IO> {
}
}
impl<R: Address, IO> Connection<R, IO> {
impl<R: Host, IO> Connection<R, IO> {
/// Get hostname.
pub fn hostname(&self) -> &str {
self.req.hostname()

View File

@ -6,71 +6,50 @@ use std::{
use actix_rt::net::TcpStream;
use actix_service::{Service, ServiceFactory};
use futures_core::{future::LocalBoxFuture, ready};
use actix_utils::future::{ok, Ready};
use futures_core::ready;
use super::{
error::ConnectError,
resolver::{Resolver, ResolverService},
tcp::{TcpConnector, TcpConnectorService},
Address, Connection, ConnectionInfo,
Connection, ConnectionInfo, Host,
};
/// Combined resolver and TCP connector service factory.
///
/// Used to create [`ConnectService`]s which receive connection information, resolve DNS if
/// required, and return a TCP stream.
#[derive(Clone, Default)]
pub struct Connector {
tcp: TcpConnector,
resolver: Resolver,
}
impl Connector {
/// Constructs new connector factory.
/// Constructs new connector factory with the given resolver.
pub fn new(resolver: Resolver) -> Self {
Connector {
tcp: TcpConnector,
resolver,
}
Connector { resolver }
}
/// Build connector service.
pub fn service(&self) -> ConnectorService {
ConnectorService {
tcp: self.tcp.service(),
tcp: TcpConnector.service(),
resolver: self.resolver.service(),
}
}
}
impl Clone for Connector {
fn clone(&self) -> Self {
Connector {
tcp: self.tcp,
resolver: self.resolver.clone(),
}
}
}
impl Default for Connector {
fn default() -> Self {
Self {
tcp: TcpConnector,
resolver: Resolver::default(),
}
}
}
impl<R: Address> ServiceFactory<ConnectionInfo<R>> for Connector {
impl<R: Host> ServiceFactory<ConnectionInfo<R>> for Connector {
type Response = Connection<R, TcpStream>;
type Error = ConnectError;
type Config = ();
type Service = ConnectorService;
type InitError = ();
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future {
let service = self.service();
Box::pin(async { Ok(service) })
ok(self.service())
}
}
@ -84,7 +63,7 @@ pub struct ConnectorService {
resolver: ResolverService,
}
impl<R: Address> Service<ConnectionInfo<R>> for ConnectorService {
impl<R: Host> Service<ConnectionInfo<R>> for ConnectorService {
type Response = Connection<R, TcpStream>;
type Error = ConnectError;
type Future = ConnectServiceResponse<R>;
@ -100,18 +79,18 @@ impl<R: Address> Service<ConnectionInfo<R>> for ConnectorService {
}
// helper enum to generic over futures of resolve and connect phase.
pub(crate) enum ConnectFuture<R: Address> {
pub(crate) enum ConnectFuture<R: Host> {
Resolve(<ResolverService as Service<ConnectionInfo<R>>>::Future),
Connect(<TcpConnectorService as Service<ConnectionInfo<R>>>::Future),
}
/// Helper enum to contain the future output of `ConnectFuture`.
pub(crate) enum ConnectOutput<R: Address> {
pub(crate) enum ConnectOutput<R: Host> {
Resolved(ConnectionInfo<R>),
Connected(Connection<R, TcpStream>),
}
impl<R: Address> ConnectFuture<R> {
impl<R: Host> ConnectFuture<R> {
fn poll_connect(
&mut self,
cx: &mut Context<'_>,
@ -127,12 +106,12 @@ impl<R: Address> ConnectFuture<R> {
}
}
pub struct ConnectServiceResponse<R: Address> {
pub struct ConnectServiceResponse<R: Host> {
fut: ConnectFuture<R>,
tcp: TcpConnectorService,
}
impl<R: Address> Future for ConnectServiceResponse<R> {
impl<R: Host> Future for ConnectServiceResponse<R> {
type Output = Result<Connection<R, TcpStream>, ConnectError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View File

@ -0,0 +1,71 @@
//! The [`Host`] trait.
/// An interface for types where host parts (hostname and port) can be derived.
///
/// The [WHATWG URL Standard] defines the terminology used for this trait and its methods.
///
/// ```plain
/// +------------------------+
/// | host |
/// +-----------------+------+
/// | hostname | port |
/// | | |
/// | sub.example.com : 8080 |
/// +-----------------+------+
/// ```
///
/// [WHATWG URL Standard]: https://url.spec.whatwg.org/
pub trait Host: Unpin + 'static {
/// Extract hostname.
fn hostname(&self) -> &str;
/// Extract optional port.
fn port(&self) -> Option<u16> {
None
}
}
impl Host for String {
fn hostname(&self) -> &str {
self.split_once(':')
.map(|(hostname, _)| hostname)
.unwrap_or(self)
}
fn port(&self) -> Option<u16> {
self.split_once(':').and_then(|(_, port)| port.parse().ok())
}
}
impl Host for &'static str {
fn hostname(&self) -> &str {
self.split_once(':')
.map(|(hostname, _)| hostname)
.unwrap_or(self)
}
fn port(&self) -> Option<u16> {
self.split_once(':').and_then(|(_, port)| port.parse().ok())
}
}
#[cfg(test)]
mod tests {
use super::*;
macro_rules! assert_connection_info_eq {
($req:expr, $hostname:expr, $port:expr) => {{
assert_eq!($req.hostname(), $hostname);
assert_eq!($req.port(), $port);
}};
}
#[test]
fn host_parsing() {
assert_connection_info_eq!("example.com", "example.com", None);
assert_connection_info_eq!("example.com:8080", "example.com", Some(8080));
assert_connection_info_eq!("example:8080", "example", Some(8080));
assert_connection_info_eq!("example.com:false", "example.com", None);
assert_connection_info_eq!("example.com:false:false", "example.com", None);
}
}

View File

@ -10,7 +10,7 @@ use std::{
use super::{
connect_addrs::{ConnectAddrs, ConnectAddrsIter},
Address,
Host,
};
/// Connection request information.
@ -18,45 +18,47 @@ use super::{
/// May contain known/pre-resolved socket address(es) or a host that needs resolving with DNS.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct ConnectionInfo<R> {
pub(crate) req: R,
pub(crate) request: R,
pub(crate) port: u16,
pub(crate) addr: ConnectAddrs,
pub(crate) local_addr: Option<IpAddr>,
}
impl<R: Address> ConnectionInfo<R> {
/// Create `Connect` instance by splitting the host at ':' and convert the second part to u16.
// TODO: assess usage and find nicer API
pub fn new(req: R) -> ConnectionInfo<R> {
let (_, port) = parse_host(req.hostname());
impl<R: Host> ConnectionInfo<R> {
/// Constructs new connection info using a request.
pub fn new(request: R) -> ConnectionInfo<R> {
let port = request.port();
ConnectionInfo {
req,
request,
port: port.unwrap_or(0),
addr: ConnectAddrs::None,
local_addr: None,
}
}
/// Create new `Connect` instance from host and socket address.
/// Constructs new connection info from request and known socket address.
///
/// Since socket address is known, Connector will skip name resolution stage.
pub fn with_addr(req: R, addr: SocketAddr) -> ConnectionInfo<R> {
/// Since socket address is known, [`Connector`](super::Connector) will skip the DNS
/// resolution step.
pub fn with_addr(request: R, addr: SocketAddr) -> ConnectionInfo<R> {
ConnectionInfo {
req,
request,
port: 0,
addr: ConnectAddrs::One(addr),
local_addr: None,
}
}
/// Set port if address does not provide one.
/// Set connection port.
///
/// If request provided a port, this will override it.
pub fn set_port(mut self, port: u16) -> Self {
self.port = port;
self
}
/// Set connect address.
/// Set connection socket address.
pub fn set_addr(mut self, addr: impl Into<Option<SocketAddr>>) -> Self {
self.addr = ConnectAddrs::from(addr.into());
self
@ -76,36 +78,46 @@ impl<R: Address> ConnectionInfo<R> {
self
}
/// Set local_addr of connect.
/// Set local address to connection with.
///
/// Useful in situations where you know the IP address bound to a particular network interface
/// and want to make sure the socket is opened through that interface.
pub fn set_local_addr(mut self, addr: impl Into<IpAddr>) -> Self {
self.local_addr = Some(addr.into());
self
}
/// Get hostname.
/// Returns a reference to the connection request.
pub fn request(&self) -> &R {
&self.request
}
/// Returns request hostname.
pub fn hostname(&self) -> &str {
self.req.hostname()
self.request.hostname()
}
/// Get request port.
/// Returns request port.
pub fn port(&self) -> u16 {
self.req.port().unwrap_or(self.port)
self.request.port().unwrap_or(self.port)
}
/**
Get resolved request addresses.
# Examples
```
# use std::net::SocketAddr;
# use actix_tls::connect::ConnectionInfo;
let addr = SocketAddr::from(([127, 0, 0, 1], 4242));
let conn = ConnectionInfo::with_addr("localhost").set_addr(None);
let mut addrs = conn.addrs();
assert!(addrs.next().is_none());
```
*/
/// Get borrowed iterator of resolved request addresses.
///
/// # Examples
/// ```
/// # use std::net::SocketAddr;
/// # use actix_tls::connect::ConnectionInfo;
/// let addr = SocketAddr::from(([127, 0, 0, 1], 4242));
///
/// let conn = ConnectionInfo::new("localhost");
/// let mut addrs = conn.addrs();
/// assert!(addrs.next().is_none());
///
/// let conn = ConnectionInfo::with_addr("localhost", addr);
/// let mut addrs = conn.addrs();
/// assert_eq!(addrs.next().unwrap(), addr);
/// ```
pub fn addrs(
&self,
) -> impl Iterator<Item = SocketAddr>
@ -121,14 +133,22 @@ impl<R: Address> ConnectionInfo<R> {
}
}
/**
Take resolved request addresses.
# Examples
```
```
*/
/// Take owned iterator resolved request addresses.
///
/// # Examples
/// ```
/// # use std::net::SocketAddr;
/// # use actix_tls::connect::ConnectionInfo;
/// let addr = SocketAddr::from(([127, 0, 0, 1], 4242));
///
/// let mut conn = ConnectionInfo::new("localhost");
/// let mut addrs = conn.take_addrs();
/// assert!(addrs.next().is_none());
///
/// let mut conn = ConnectionInfo::with_addr("localhost", addr);
/// let mut addrs = conn.take_addrs();
/// assert_eq!(addrs.next().unwrap(), addr);
/// ```
pub fn take_addrs(
&mut self,
) -> impl Iterator<Item = SocketAddr>
@ -143,54 +163,26 @@ impl<R: Address> ConnectionInfo<R> {
ConnectAddrs::Multi(addrs) => ConnectAddrsIter::MultiOwned(addrs.into_iter()),
}
}
/// Returns a reference to the connection request.
pub fn request(&self) -> &R {
&self.req
}
}
impl<R: Address> From<R> for ConnectionInfo<R> {
impl<R: Host> From<R> for ConnectionInfo<R> {
fn from(addr: R) -> Self {
ConnectionInfo::new(addr)
}
}
impl<R: Address> fmt::Display for ConnectionInfo<R> {
impl<R: Host> fmt::Display for ConnectionInfo<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}:{}", self.hostname(), self.port())
}
}
fn parse_host(host: &str) -> (&str, Option<u16>) {
let mut parts_iter = host.splitn(2, ':');
match parts_iter.next() {
Some(hostname) => {
let port_str = parts_iter.next().unwrap_or("");
let port = port_str.parse::<u16>().ok();
(hostname, port)
}
None => (host, None),
}
}
#[cfg(test)]
mod tests {
use std::net::Ipv4Addr;
use super::*;
#[test]
fn test_host_parser() {
assert_eq!(parse_host("example.com"), ("example.com", None));
assert_eq!(parse_host("example.com:8080"), ("example.com", Some(8080)));
assert_eq!(parse_host("example:8080"), ("example", Some(8080)));
assert_eq!(parse_host("example.com:false"), ("example.com", None));
assert_eq!(parse_host("example.com:false:false"), ("example.com", None));
}
#[test]
fn test_addr_iter_multi() {
let localhost = SocketAddr::from((IpAddr::from(Ipv4Addr::LOCALHOST), 8080));

View File

@ -1,7 +1,7 @@
//! TCP and TLS connector services.
//!
//! # Stages of the TCP connector service:
//! 1. Resolve [`Address`] with given [`Resolver`] and collect list of socket addresses.
//! 1. Resolve [`Host`] (if needed) with given [`Resolver`] and collect list of socket addresses.
//! 1. Establish TCP connection and return [`TcpStream`].
//!
//! # Stages of TLS connector services:
@ -13,11 +13,11 @@
//! [`AsyncRead`]: actix_rt::net::AsyncRead
//! [`AsyncWrite`]: actix_rt::net::AsyncWrite
mod address;
mod connect_addrs;
mod connection;
mod connector;
mod error;
mod host;
mod info;
mod resolve;
mod resolver;
@ -39,10 +39,10 @@ pub mod rustls;
#[cfg_attr(docsrs, doc(cfg(feature = "native-tls")))]
pub mod native_tls;
pub use self::address::Address;
pub use self::connection::Connection;
pub use self::connector::{Connector, ConnectorService};
pub use self::error::ConnectError;
pub use self::host::Host;
pub use self::info::ConnectionInfo;
pub use self::resolve::Resolve;
pub use self::resolver::{Resolver, ResolverService};

View File

@ -14,7 +14,7 @@ use tokio_native_tls::{
TlsStream,
};
use crate::connect::{Address, Connection};
use crate::connect::{Connection, Host};
pub mod reexports {
//! Re-exports from `native-tls` that are useful for connectors.
@ -39,7 +39,7 @@ impl TlsConnector {
}
}
impl<R: Address, IO> ServiceFactory<Connection<R, IO>> for TlsConnector
impl<R: Host, IO> ServiceFactory<Connection<R, IO>> for TlsConnector
where
IO: ActixStream + 'static,
{
@ -59,7 +59,7 @@ where
/// As the factory and service share the same type and state.
impl<R, IO> Service<Connection<R, IO>> for TlsConnector
where
R: Address,
R: Host,
IO: ActixStream + 'static,
{
type Response = Connection<R, TlsStream<IO>>;

View File

@ -14,10 +14,10 @@ use actix_service::{Service, ServiceFactory};
use actix_utils::future::{ok, Ready};
use futures_core::ready;
use log::trace;
use openssl::ssl::{Error as SslError, HandshakeError, SslConnector, SslMethod};
use openssl::ssl::SslConnector;
use tokio_openssl::SslStream;
use crate::connect::{Address, Connection};
use crate::connect::{Connection, Host};
pub mod reexports {
//! Re-exports from `openssl` that are useful for connectors.
@ -52,7 +52,7 @@ impl Clone for Connector {
impl<R, IO> ServiceFactory<Connection<R, IO>> for Connector
where
R: Address,
R: Host,
IO: ActixStream + 'static,
{
type Response = Connection<R, SslStream<IO>>;
@ -84,7 +84,7 @@ impl Clone for ConnectorService {
impl<R, IO> Service<Connection<R, IO>> for ConnectorService
where
R: Address,
R: Host,
IO: ActixStream,
{
type Response = Connection<R, SslStream<IO>>;
@ -121,9 +121,9 @@ pub struct ConnectFut<R, IO> {
stream: Option<Connection<R, ()>>,
}
impl<R: Address, IO> Future for ConnectFut<R, IO>
impl<R: Host, IO> Future for ConnectFut<R, IO>
where
R: Address,
R: Host,
IO: ActixStream,
{
type Output = Result<Connection<R, SslStream<IO>>, io::Error>;

View File

@ -1,4 +1,4 @@
//! [`Resolve`] trait.
//! The [`Resolve`] trait.
use std::{error::Error as StdError, net::SocketAddr};
@ -6,7 +6,7 @@ use futures_core::future::LocalBoxFuture;
/// Custom async DNS resolvers.
///
/// # Usage
/// # Examples
/// ```
/// use std::net::SocketAddr;
///
@ -40,20 +40,17 @@ use futures_core::future::LocalBoxFuture;
/// }
/// }
///
/// let resolver = MyResolver {
/// let my_resolver = MyResolver {
/// trust_dns: TokioAsyncResolver::tokio_from_system_conf().unwrap(),
/// };
///
/// // construct custom resolver
/// let resolver = Resolver::new_custom(resolver);
///
/// // pass custom resolver to connector builder.
/// // connector would then be usable as a service or an `awc` connector.
/// let connector = actix_tls::connect::new_connector::<&str>(resolver.clone());
/// // wrap custom resolver
/// let resolver = Resolver::custom(my_resolver);
///
/// // resolver can be passed to connector factory where returned service factory
/// // can be used to construct new connector services.
/// let factory = actix_tls::connect::new_connector_factory::<&str>(resolver);
/// // can be used to construct new connector services for use in clients
/// let factory = actix_tls::connect::Connector::new(resolver);
/// let connector = factory.service();
/// ```
pub trait Resolve {
/// Given DNS lookup information, returns a future that completes with socket information.

View File

@ -14,7 +14,7 @@ use actix_utils::future::{ok, Ready};
use futures_core::{future::LocalBoxFuture, ready};
use log::trace;
use super::{Address, ConnectError, ConnectionInfo, Resolve};
use super::{ConnectError, ConnectionInfo, Host, Resolve};
/// DNS resolver service factory.
#[derive(Clone, Default)]
@ -36,7 +36,7 @@ impl Resolver {
}
}
impl<R: Address> ServiceFactory<ConnectionInfo<R>> for Resolver {
impl<R: Host> ServiceFactory<ConnectionInfo<R>> for Resolver {
type Response = ConnectionInfo<R>;
type Error = ConnectError;
type Config = ();
@ -81,7 +81,7 @@ impl ResolverService {
}
/// Resolve DNS with default resolver.
fn look_up<R: Address>(
fn look_up<R: Host>(
req: &ConnectionInfo<R>,
) -> JoinHandle<io::Result<IntoIter<SocketAddr>>> {
let host = req.hostname();
@ -109,32 +109,33 @@ impl ResolverService {
}
}
impl<R: Address> Service<ConnectionInfo<R>> for ResolverService {
impl<R: Host> Service<ConnectionInfo<R>> for ResolverService {
type Response = ConnectionInfo<R>;
type Error = ConnectError;
type Future = ResolverFuture<R>;
type Future = ResolverFut<R>;
actix_service::always_ready!();
fn call(&self, req: ConnectionInfo<R>) -> Self::Future {
if req.addr.is_some() {
ResolverFuture::Connected(Some(req))
ResolverFut::Connected(Some(req))
} else if let Ok(ip) = req.hostname().parse() {
let addr = SocketAddr::new(ip, req.port());
let req = req.set_addr(Some(addr));
ResolverFuture::Connected(Some(req))
ResolverFut::Connected(Some(req))
} else {
trace!("DNS resolver: resolving host {:?}", req.hostname());
match &self.kind {
ResolverKind::Default => {
let fut = Self::look_up(&req);
ResolverFuture::LookUp(fut, Some(req))
ResolverFut::LookUp(fut, Some(req))
}
ResolverKind::Custom(resolver) => {
let resolver = Rc::clone(resolver);
ResolverFuture::LookupCustom(Box::pin(async move {
ResolverFut::LookupCustom(Box::pin(async move {
let addrs = resolver
.lookup(req.hostname(), req.port())
.await
@ -154,7 +155,8 @@ impl<R: Address> Service<ConnectionInfo<R>> for ResolverService {
}
}
pub enum ResolverFuture<R: Address> {
/// Future for resolver service.
pub enum ResolverFut<R: Host> {
Connected(Option<ConnectionInfo<R>>),
LookUp(
JoinHandle<io::Result<IntoIter<SocketAddr>>>,
@ -163,7 +165,7 @@ pub enum ResolverFuture<R: Address> {
LookupCustom(LocalBoxFuture<'static, Result<ConnectionInfo<R>, ConnectError>>),
}
impl<R: Address> Future for ResolverFuture<R> {
impl<R: Host> Future for ResolverFut<R> {
type Output = Result<ConnectionInfo<R>, ConnectError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View File

@ -21,7 +21,7 @@ use tokio_rustls::{client::TlsStream, rustls::ClientConfig};
use tokio_rustls::{Connect as RustlsConnect, TlsConnector as RustlsTlsConnector};
use webpki_roots::TLS_SERVER_ROOTS;
use crate::connect::{Address, Connection};
use crate::connect::{Connection, Host};
pub mod reexports {
//! Re-exports from `rustls` and `webpki_roots` that are useful for connectors.
@ -66,7 +66,7 @@ impl Connector {
impl<R, IO> ServiceFactory<Connection<R, IO>> for Connector
where
R: Address,
R: Host,
IO: ActixStream + 'static,
{
type Response = Connection<R, TlsStream<IO>>;
@ -91,7 +91,7 @@ pub struct ConnectorService {
impl<R, IO> Service<Connection<R, IO>> for ConnectorService
where
R: Address,
R: Host,
IO: ActixStream,
{
type Response = Connection<R, TlsStream<IO>>;
@ -127,7 +127,7 @@ pub enum ConnectFut<R, IO> {
impl<R, IO> Future for ConnectFut<R, IO>
where
R: Address,
R: Host,
IO: ActixStream,
{
type Output = Result<Connection<R, TlsStream<IO>>, io::Error>;

View File

@ -13,12 +13,13 @@ use std::{
use actix_rt::net::{TcpSocket, TcpStream};
use actix_service::{Service, ServiceFactory};
use futures_core::{future::LocalBoxFuture, ready};
use actix_utils::future::{ok, Ready};
use futures_core::ready;
use log::{error, trace};
use tokio_util::sync::ReusableBoxFuture;
use super::{
connect_addrs::ConnectAddrs, error::ConnectError, Address, Connection, ConnectionInfo,
connect_addrs::ConnectAddrs, error::ConnectError, Connection, ConnectionInfo, Host,
};
/// TCP connector service factory.
@ -26,23 +27,22 @@ use super::{
pub struct TcpConnector;
impl TcpConnector {
/// Create TCP connector service
/// Returns a new TCP connector service.
pub fn service(&self) -> TcpConnectorService {
TcpConnectorService
}
}
impl<R: Address> ServiceFactory<ConnectionInfo<R>> for TcpConnector {
impl<R: Host> ServiceFactory<ConnectionInfo<R>> for TcpConnector {
type Response = Connection<R, TcpStream>;
type Error = ConnectError;
type Config = ();
type Service = TcpConnectorService;
type InitError = ();
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future {
let service = self.service();
Box::pin(async move { Ok(service) })
ok(self.service())
}
}
@ -50,7 +50,7 @@ impl<R: Address> ServiceFactory<ConnectionInfo<R>> for TcpConnector {
#[derive(Debug, Copy, Clone)]
pub struct TcpConnectorService;
impl<R: Address> Service<ConnectionInfo<R>> for TcpConnectorService {
impl<R: Host> Service<ConnectionInfo<R>> for TcpConnectorService {
type Response = Connection<R, TcpStream>;
type Error = ConnectError;
type Future = TcpConnectorFut<R>;
@ -59,8 +59,9 @@ impl<R: Address> Service<ConnectionInfo<R>> for TcpConnectorService {
fn call(&self, req: ConnectionInfo<R>) -> Self::Future {
let port = req.port();
let ConnectionInfo {
req,
request: req,
addr,
local_addr,
..
@ -84,7 +85,7 @@ pub enum TcpConnectorFut<R> {
Error(Option<ConnectError>),
}
impl<R: Address> TcpConnectorFut<R> {
impl<R: Host> TcpConnectorFut<R> {
pub(crate) fn new(
req: R,
port: u16,
@ -130,7 +131,7 @@ impl<R: Address> TcpConnectorFut<R> {
}
}
impl<R: Address> Future for TcpConnectorFut<R> {
impl<R: Host> Future for TcpConnectorFut<R> {
type Output = Result<Connection<R, TcpStream>, ConnectError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View File

@ -1,8 +1,8 @@
use http::Uri;
use super::Address;
use super::Host;
impl Address for Uri {
impl Host for Uri {
fn hostname(&self) -> &str {
self.host().unwrap_or("")
}
@ -35,9 +35,18 @@ fn scheme_to_port(scheme: Option<&str>) -> Option<u16> {
Some("mqtts") => Some(8883),
// File Transfer Protocol (FTP)
Some("ftp") => Some(1883),
Some("ftp") => Some(21),
Some("ftps") => Some(990),
// Redis
Some("redis") => Some(6379),
// MySQL
Some("mysql") => Some(3306),
// PostgreSQL
Some("postgres") => Some(5432),
_ => None,
}
}

View File

@ -7,13 +7,15 @@
feature = "openssl"
))]
extern crate tls_openssl as openssl;
use std::io::{BufReader, Write};
use actix_rt::net::TcpStream;
use actix_server::TestServer;
use actix_service::ServiceFactoryExt as _;
use actix_tls::accept::rustls::{Acceptor, TlsStream};
use actix_tls::connect::tls::openssl::SslConnector;
use actix_tls::connect::openssl::reexports::SslConnector;
use actix_utils::future::ok;
use rustls_pemfile::{certs, pkcs8_private_keys};
use tls_openssl::ssl::SslVerifyMode;
@ -53,13 +55,13 @@ fn rustls_server_config(cert: String, key: String) -> rustls::ServerConfig {
}
fn openssl_connector(cert: String, key: String) -> SslConnector {
use actix_tls::connect::tls::openssl::{SslConnector as OpensslConnector, SslMethod};
use tls_openssl::{pkey::PKey, x509::X509};
use actix_tls::connect::openssl::reexports::SslMethod;
use openssl::{pkey::PKey, x509::X509};
let cert = X509::from_pem(cert.as_bytes()).unwrap();
let key = PKey::private_key_from_pem(key.as_bytes()).unwrap();
let mut ssl = OpensslConnector::builder(SslMethod::tls()).unwrap();
let mut ssl = SslConnector::builder(SslMethod::tls()).unwrap();
ssl.set_verify(SslVerifyMode::NONE);
ssl.set_certificate(&cert).unwrap();
ssl.set_private_key(&key).unwrap();

View File

@ -12,7 +12,7 @@ use actix_service::{fn_service, Service, ServiceFactory};
use bytes::Bytes;
use futures_util::sink::SinkExt;
use actix_tls::connect::{self as actix_connect, ConnectionInfo};
use actix_tls::connect::{ConnectError, Connection, ConnectionInfo, Connector, Host};
#[cfg(feature = "openssl")]
#[actix_rt::test]
@ -25,9 +25,9 @@ async fn test_string() {
})
});
let conn = actix_connect::default_connector();
let connector = Connector::default().service();
let addr = format!("localhost:{}", srv.port());
let con = conn.call(addr.into()).await.unwrap();
let con = connector.call(addr.into()).await.unwrap();
assert_eq!(con.peer_addr().unwrap(), srv.addr());
}
@ -42,7 +42,7 @@ async fn test_rustls_string() {
})
});
let conn = actix_connect::default_connector();
let conn = Connector::default().service();
let addr = format!("localhost:{}", srv.port());
let con = conn.call(addr.into()).await.unwrap();
assert_eq!(con.peer_addr().unwrap(), srv.addr());
@ -58,23 +58,29 @@ async fn test_static_str() {
})
});
let conn = actix_connect::default_connector();
let info = ConnectionInfo::with_addr("10", srv.addr());
let connector = Connector::default().service();
let conn = connector.call(info).await.unwrap();
assert_eq!(conn.peer_addr().unwrap(), srv.addr());
let con = conn
.call(ConnectionInfo::with_addr("10", srv.addr()))
.await
.unwrap();
assert_eq!(con.peer_addr().unwrap(), srv.addr());
let connect = ConnectionInfo::new(srv.host().to_owned());
let conn = actix_connect::default_connector();
let con = conn.call(connect).await;
assert!(con.is_err());
let info = ConnectionInfo::new(srv.host().to_owned());
let connector = Connector::default().service();
let conn = connector.call(info).await;
assert!(conn.is_err());
}
#[actix_rt::test]
async fn test_new_service() {
async fn service_factory() {
pub fn default_connector_factory<T: Host + 'static>() -> impl ServiceFactory<
ConnectionInfo<T>,
Config = (),
Response = Connection<T, TcpStream>,
Error = ConnectError,
InitError = (),
> {
Connector::default()
}
let srv = TestServer::with(|| {
fn_service(|io: TcpStream| async {
let mut framed = Framed::new(io, BytesCodec);
@ -83,14 +89,11 @@ async fn test_new_service() {
})
});
let factory = actix_connect::default_connector_factory();
let conn = factory.new_service(()).await.unwrap();
let con = conn
.call(ConnectionInfo::with_addr("10", srv.addr()))
.await
.unwrap();
assert_eq!(con.peer_addr().unwrap(), srv.addr());
let info = ConnectionInfo::with_addr("10", srv.addr());
let factory = default_connector_factory();
let connector = factory.new_service(()).await.unwrap();
let con = connector.call(info).await;
assert_eq!(con.unwrap().peer_addr().unwrap(), srv.addr());
}
#[cfg(all(feature = "openssl", feature = "uri"))]
@ -106,9 +109,9 @@ async fn test_openssl_uri() {
})
});
let conn = actix_connect::default_connector();
let connector = Connector::default().service();
let addr = http::Uri::try_from(format!("https://localhost:{}", srv.port())).unwrap();
let con = conn.call(addr.into()).await.unwrap();
let con = connector.call(addr.into()).await.unwrap();
assert_eq!(con.peer_addr().unwrap(), srv.addr());
}
@ -125,7 +128,7 @@ async fn test_rustls_uri() {
})
});
let conn = actix_connect::default_connector();
let conn = Connector::default().service();
let addr = http::Uri::try_from(format!("https://localhost:{}", srv.port())).unwrap();
let con = conn.call(addr.into()).await.unwrap();
assert_eq!(con.peer_addr().unwrap(), srv.addr());
@ -141,7 +144,7 @@ async fn test_local_addr() {
})
});
let conn = actix_connect::default_connector();
let conn = Connector::default().service();
let local = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 3));
let (con, _) = conn

View File

@ -10,7 +10,9 @@ use actix_server::TestServer;
use actix_service::{fn_service, Service, ServiceFactory};
use futures_core::future::LocalBoxFuture;
use actix_tls::connect::{new_connector_factory, ConnectionInfo, Resolve, ResolverService};
use actix_tls::connect::{
ConnectError, Connection, ConnectionInfo, Connector, Host, Resolve, Resolver,
};
#[actix_rt::test]
async fn custom_resolver() {
@ -36,6 +38,18 @@ async fn custom_resolver() {
#[actix_rt::test]
async fn custom_resolver_connect() {
pub fn connector_factory<T: Host + 'static>(
resolver: Resolver,
) -> impl ServiceFactory<
ConnectionInfo<T>,
Config = (),
Response = Connection<T, TcpStream>,
Error = ConnectError,
InitError = (),
> {
Connector::new(resolver)
}
use trust_dns_resolver::TokioAsyncResolver;
let srv =
@ -68,8 +82,7 @@ async fn custom_resolver_connect() {
trust_dns: TokioAsyncResolver::tokio_from_system_conf().unwrap(),
};
let resolver = ResolverService::custom(resolver);
let factory = new_connector_factory(resolver);
let factory = connector_factory(Resolver::custom(resolver));
let conn = factory.new_service(()).await.unwrap();
let con = conn