document acceptors

This commit is contained in:
Rob Ede 2021-11-29 13:50:55 +00:00
parent 5600a2df7a
commit 1256af5671
No known key found for this signature in database
GPG Key ID: 97C636207D3EF933
14 changed files with 241 additions and 214 deletions

View File

@ -1,9 +1,13 @@
# Changes
## Unreleased - 2021-xx-xx
* `impl Default` for `connect::Resolver`. [#???]
* Implement `Default` for `connect::Resolver`. [#422]
* Derive `Debug` for `connect::Connection`. [#422]
* Remove redundant `connect::Connection::from_parts` method. [#422]
* Rename TLS acceptor service future types and hide from docs. [#422]
* Implement `Error` for `ConnectError`. [#422]
[#???]: https://github.com/actix/actix-net/pull/???
[#422]: https://github.com/actix/actix-net/pull/422
## 3.0.0-beta.9 - 2021-11-22
@ -44,7 +48,7 @@
* Remove `connect::ssl::openssl::OpensslConnectService`. [#297]
* Add `connect::ssl::native_tls` module for native tls support. [#295]
* Rename `accept::{nativetls => native_tls}`. [#295]
* Remove `connect::TcpConnectService` type. service caller expect a `TcpStream` should use
* Remove `connect::TcpConnectService` type. Service caller expecting a `TcpStream` should use
`connect::ConnectService` instead and call `Connection<T, TcpStream>::into_parts`. [#299]
[#295]: https://github.com/actix/actix-net/pull/295

View File

@ -20,7 +20,7 @@ name = "actix_tls"
path = "src/lib.rs"
[features]
default = ["accept", "connect", "uri"]
default = ["accept", "connect", "uri", "rustls", "openssl", "native-tls"]
# enable acceptor services
accept = []

View File

@ -1,3 +1,5 @@
//! Native-TLS based acceptor service.
use std::{
convert::Infallible,
io::{self, IoSlice},
@ -20,30 +22,30 @@ pub use tokio_native_tls::{native_tls::Error, TlsAcceptor};
use super::{TlsError, DEFAULT_TLS_HANDSHAKE_TIMEOUT, MAX_CONN_COUNTER};
/// Wrapper type for `tokio_native_tls::TlsStream` in order to impl `ActixStream` trait.
pub struct TlsStream<T>(tokio_native_tls::TlsStream<T>);
/// Wraps a [`tokio_native_tls::TlsStream`] in order to impl [`ActixStream`] trait.
pub struct TlsStream<IO>(tokio_native_tls::TlsStream<IO>);
impl<T> From<tokio_native_tls::TlsStream<T>> for TlsStream<T> {
fn from(stream: tokio_native_tls::TlsStream<T>) -> Self {
impl<IO> From<tokio_native_tls::TlsStream<IO>> for TlsStream<IO> {
fn from(stream: tokio_native_tls::TlsStream<IO>) -> Self {
Self(stream)
}
}
impl<T: ActixStream> Deref for TlsStream<T> {
type Target = tokio_native_tls::TlsStream<T>;
impl<IO: ActixStream> Deref for TlsStream<IO> {
type Target = tokio_native_tls::TlsStream<IO>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T: ActixStream> DerefMut for TlsStream<T> {
impl<IO: ActixStream> DerefMut for TlsStream<IO> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<T: ActixStream> AsyncRead for TlsStream<T> {
impl<IO: ActixStream> AsyncRead for TlsStream<IO> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -53,7 +55,7 @@ impl<T: ActixStream> AsyncRead for TlsStream<T> {
}
}
impl<T: ActixStream> AsyncWrite for TlsStream<T> {
impl<IO: ActixStream> AsyncWrite for TlsStream<IO> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -83,13 +85,13 @@ impl<T: ActixStream> AsyncWrite for TlsStream<T> {
}
}
impl<T: ActixStream> ActixStream for TlsStream<T> {
impl<IO: ActixStream> ActixStream for TlsStream<IO> {
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
T::poll_read_ready((&**self).get_ref().get_ref().get_ref(), cx)
IO::poll_read_ready((&**self).get_ref().get_ref().get_ref(), cx)
}
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
T::poll_write_ready((&**self).get_ref().get_ref().get_ref(), cx)
IO::poll_write_ready((&**self).get_ref().get_ref().get_ref(), cx)
}
}
@ -130,8 +132,8 @@ impl Clone for Acceptor {
}
}
impl<T: ActixStream + 'static> ServiceFactory<T> for Acceptor {
type Response = TlsStream<T>;
impl<IO: ActixStream + 'static> ServiceFactory<IO> for Acceptor {
type Response = TlsStream<IO>;
type Error = TlsError<Error, Infallible>;
type Config = ();
type Service = NativeTlsAcceptorService;
@ -151,14 +153,15 @@ impl<T: ActixStream + 'static> ServiceFactory<T> for Acceptor {
}
}
/// Native-TLS based acceptor service.
pub struct NativeTlsAcceptorService {
acceptor: TlsAcceptor,
conns: Counter,
handshake_timeout: Duration,
}
impl<T: ActixStream + 'static> Service<T> for NativeTlsAcceptorService {
type Response = TlsStream<T>;
impl<IO: ActixStream + 'static> Service<IO> for NativeTlsAcceptorService {
type Response = TlsStream<IO>;
type Error = TlsError<Error, Infallible>;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
@ -170,7 +173,7 @@ impl<T: ActixStream + 'static> Service<T> for NativeTlsAcceptorService {
}
}
fn call(&self, io: T) -> Self::Future {
fn call(&self, io: IO) -> Self::Future {
let guard = self.conns.get();
let acceptor = self.acceptor.clone();

View File

@ -1,3 +1,5 @@
//! OpenSSL based acceptor service.
use std::{
convert::Infallible,
future::Future,
@ -16,7 +18,6 @@ use actix_rt::{
use actix_service::{Service, ServiceFactory};
use actix_utils::counter::{Counter, CounterGuard};
use futures_core::future::LocalBoxFuture;
pub use openssl::ssl::{
AlpnError, Error as SslError, HandshakeError, Ssl, SslAcceptor, SslAcceptorBuilder,
};
@ -24,30 +25,30 @@ use pin_project_lite::pin_project;
use super::{TlsError, DEFAULT_TLS_HANDSHAKE_TIMEOUT, MAX_CONN_COUNTER};
/// Wrapper type for `tokio_openssl::SslStream` in order to impl `ActixStream` trait.
pub struct TlsStream<T>(tokio_openssl::SslStream<T>);
/// Wraps a [`tokio_openssl::SslStream`] in order to impl [`ActixStream`] trait.
pub struct TlsStream<IO>(tokio_openssl::SslStream<IO>);
impl<T> From<tokio_openssl::SslStream<T>> for TlsStream<T> {
fn from(stream: tokio_openssl::SslStream<T>) -> Self {
impl<IO> From<tokio_openssl::SslStream<IO>> for TlsStream<IO> {
fn from(stream: tokio_openssl::SslStream<IO>) -> Self {
Self(stream)
}
}
impl<T> Deref for TlsStream<T> {
type Target = tokio_openssl::SslStream<T>;
impl<IO> Deref for TlsStream<IO> {
type Target = tokio_openssl::SslStream<IO>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> DerefMut for TlsStream<T> {
impl<IO> DerefMut for TlsStream<IO> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<T: ActixStream> AsyncRead for TlsStream<T> {
impl<IO: ActixStream> AsyncRead for TlsStream<IO> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -57,7 +58,7 @@ impl<T: ActixStream> AsyncRead for TlsStream<T> {
}
}
impl<T: ActixStream> AsyncWrite for TlsStream<T> {
impl<IO: ActixStream> AsyncWrite for TlsStream<IO> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -87,13 +88,13 @@ impl<T: ActixStream> AsyncWrite for TlsStream<T> {
}
}
impl<T: ActixStream> ActixStream for TlsStream<T> {
impl<IO: ActixStream> ActixStream for TlsStream<IO> {
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
T::poll_read_ready((&**self).get_ref(), cx)
IO::poll_read_ready((&**self).get_ref(), cx)
}
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
T::poll_write_ready((&**self).get_ref(), cx)
IO::poll_write_ready((&**self).get_ref(), cx)
}
}
@ -134,8 +135,8 @@ impl Clone for Acceptor {
}
}
impl<T: ActixStream> ServiceFactory<T> for Acceptor {
type Response = TlsStream<T>;
impl<IO: ActixStream> ServiceFactory<IO> for Acceptor {
type Response = TlsStream<IO>;
type Error = TlsError<SslError, Infallible>;
type Config = ();
type Service = AcceptorService;
@ -155,16 +156,17 @@ impl<T: ActixStream> ServiceFactory<T> for Acceptor {
}
}
/// OpenSSL based acceptor service.
pub struct AcceptorService {
acceptor: SslAcceptor,
conns: Counter,
handshake_timeout: Duration,
}
impl<T: ActixStream> Service<T> for AcceptorService {
type Response = TlsStream<T>;
impl<IO: ActixStream> Service<IO> for AcceptorService {
type Response = TlsStream<IO>;
type Error = TlsError<SslError, Infallible>;
type Future = AcceptorServiceResponse<T>;
type Future = AcceptFut<IO>;
fn poll_ready(&self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.conns.available(ctx) {
@ -174,11 +176,11 @@ impl<T: ActixStream> Service<T> for AcceptorService {
}
}
fn call(&self, io: T) -> Self::Future {
fn call(&self, io: IO) -> Self::Future {
let ssl_ctx = self.acceptor.context();
let ssl = Ssl::new(ssl_ctx).expect("Provided SSL acceptor was invalid.");
AcceptorServiceResponse {
AcceptFut {
_guard: self.conns.get(),
timeout: sleep(self.handshake_timeout),
stream: Some(tokio_openssl::SslStream::new(ssl, io).unwrap()),
@ -187,16 +189,18 @@ impl<T: ActixStream> Service<T> for AcceptorService {
}
pin_project! {
pub struct AcceptorServiceResponse<T: ActixStream> {
stream: Option<tokio_openssl::SslStream<T>>,
/// Accept future for Rustls service.
#[doc(hidden)]
pub struct AcceptFut<IO: ActixStream> {
stream: Option<tokio_openssl::SslStream<IO>>,
#[pin]
timeout: Sleep,
_guard: CounterGuard,
}
}
impl<T: ActixStream> Future for AcceptorServiceResponse<T> {
type Output = Result<TlsStream<T>, TlsError<SslError, Infallible>>;
impl<IO: ActixStream> Future for AcceptFut<IO> {
type Output = Result<TlsStream<IO>, TlsError<SslError, Infallible>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();

View File

@ -1,3 +1,5 @@
//! Rustls based acceptor service.
use std::{
convert::Infallible,
future::Future,
@ -18,36 +20,35 @@ use actix_service::{Service, ServiceFactory};
use actix_utils::counter::{Counter, CounterGuard};
use futures_core::future::LocalBoxFuture;
use pin_project_lite::pin_project;
use tokio_rustls::{Accept, TlsAcceptor};
pub use tokio_rustls::rustls::ServerConfig;
use tokio_rustls::{Accept, TlsAcceptor};
use super::{TlsError, DEFAULT_TLS_HANDSHAKE_TIMEOUT, MAX_CONN_COUNTER};
/// Wrapper type for `tokio_openssl::SslStream` in order to impl `ActixStream` trait.
pub struct TlsStream<T>(tokio_rustls::server::TlsStream<T>);
/// Wraps a [`tokio_rustls::server::TlsStream`] in order to impl [`ActixStream`] trait.
pub struct TlsStream<IO>(tokio_rustls::server::TlsStream<IO>);
impl<T> From<tokio_rustls::server::TlsStream<T>> for TlsStream<T> {
fn from(stream: tokio_rustls::server::TlsStream<T>) -> Self {
impl<IO> From<tokio_rustls::server::TlsStream<IO>> for TlsStream<IO> {
fn from(stream: tokio_rustls::server::TlsStream<IO>) -> Self {
Self(stream)
}
}
impl<T> Deref for TlsStream<T> {
type Target = tokio_rustls::server::TlsStream<T>;
impl<IO> Deref for TlsStream<IO> {
type Target = tokio_rustls::server::TlsStream<IO>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T> DerefMut for TlsStream<T> {
impl<IO> DerefMut for TlsStream<IO> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<T: ActixStream> AsyncRead for TlsStream<T> {
impl<IO: ActixStream> AsyncRead for TlsStream<IO> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -57,7 +58,7 @@ impl<T: ActixStream> AsyncRead for TlsStream<T> {
}
}
impl<T: ActixStream> AsyncWrite for TlsStream<T> {
impl<IO: ActixStream> AsyncWrite for TlsStream<IO> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -87,13 +88,13 @@ impl<T: ActixStream> AsyncWrite for TlsStream<T> {
}
}
impl<T: ActixStream> ActixStream for TlsStream<T> {
impl<IO: ActixStream> ActixStream for TlsStream<IO> {
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
T::poll_read_ready((&**self).get_ref().0, cx)
IO::poll_read_ready((&**self).get_ref().0, cx)
}
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
T::poll_write_ready((&**self).get_ref().0, cx)
IO::poll_write_ready((&**self).get_ref().0, cx)
}
}
@ -134,8 +135,8 @@ impl Clone for Acceptor {
}
}
impl<T: ActixStream> ServiceFactory<T> for Acceptor {
type Response = TlsStream<T>;
impl<IO: ActixStream> ServiceFactory<IO> for Acceptor {
type Response = TlsStream<IO>;
type Error = TlsError<io::Error, Infallible>;
type Config = ();
type Service = AcceptorService;
@ -155,17 +156,17 @@ impl<T: ActixStream> ServiceFactory<T> for Acceptor {
}
}
/// Rustls based `Acceptor` service
/// Rustls based acceptor service.
pub struct AcceptorService {
acceptor: TlsAcceptor,
conns: Counter,
handshake_timeout: Duration,
}
impl<T: ActixStream> Service<T> for AcceptorService {
type Response = TlsStream<T>;
impl<IO: ActixStream> Service<IO> for AcceptorService {
type Response = TlsStream<IO>;
type Error = TlsError<io::Error, Infallible>;
type Future = AcceptorServiceFut<T>;
type Future = AcceptFut<IO>;
fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.conns.available(cx) {
@ -175,8 +176,8 @@ impl<T: ActixStream> Service<T> for AcceptorService {
}
}
fn call(&self, req: T) -> Self::Future {
AcceptorServiceFut {
fn call(&self, req: IO) -> Self::Future {
AcceptFut {
fut: self.acceptor.accept(req),
timeout: sleep(self.handshake_timeout),
_guard: self.conns.get(),
@ -185,16 +186,18 @@ impl<T: ActixStream> Service<T> for AcceptorService {
}
pin_project! {
pub struct AcceptorServiceFut<T: ActixStream> {
fut: Accept<T>,
/// Accept future for Rustls service.
#[doc(hidden)]
pub struct AcceptFut<IO: ActixStream> {
fut: Accept<IO>,
#[pin]
timeout: Sleep,
_guard: CounterGuard,
}
}
impl<T: ActixStream> Future for AcceptorServiceFut<T> {
type Output = Result<TlsStream<T>, TlsError<io::Error, Infallible>>;
impl<IO: ActixStream> Future for AcceptFut<IO> {
type Output = Result<TlsStream<IO>, TlsError<io::Error, Infallible>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

View File

@ -4,6 +4,7 @@ use std::{
iter::{self, FromIterator as _},
mem,
net::{IpAddr, SocketAddr},
ops,
};
/// Parse a host into parts (hostname and port).
@ -218,71 +219,68 @@ impl iter::ExactSizeIterator for ConnectAddrsIter<'_> {}
impl iter::FusedIterator for ConnectAddrsIter<'_> {}
pub struct Connection<T, U> {
io: U,
req: T,
/// Holds underlying I/O and original connection request.
#[derive(Debug)]
pub struct Connection<R, IO> {
req: R,
io: IO,
}
impl<T, U> Connection<T, U> {
pub fn new(io: U, req: T) -> Self {
impl<R, IO> Connection<R, IO> {
/// Construct new `Connection` from
pub fn new(io: IO, req: R) -> Self {
Self { io, req }
}
}
impl<T, U> Connection<T, U> {
/// Reconstruct from a parts.
pub fn from_parts(io: U, req: T) -> Self {
Self { io, req }
}
/// Deconstruct into a parts.
pub fn into_parts(self) -> (U, T) {
impl<R, IO> Connection<R, IO> {
/// Deconstructs into parts.
pub fn into_parts(self) -> (IO, R) {
(self.io, self.req)
}
/// Replace inclosed object, return new Stream and old object
pub fn replace_io<Y>(self, io: Y) -> (U, Connection<T, Y>) {
/// Replaces underlying IO, returning old UI and new `Connection`.
pub fn replace_io<IO2>(self, io: IO2) -> (IO, Connection<R, IO2>) {
(self.io, Connection { io, req: self.req })
}
/// Returns a shared reference to the underlying stream.
pub fn io_ref(&self) -> &U {
/// Returns a shared reference to the underlying IO.
pub fn io_ref(&self) -> &IO {
&self.io
}
/// Returns a mutable reference to the underlying stream.
pub fn io_mut(&mut self) -> &mut U {
/// Returns a mutable reference to the underlying IO.
pub fn io_mut(&mut self) -> &mut IO {
&mut self.io
}
/// Returns a reference to the connection request.
pub fn request(&self) -> &R {
&self.req
}
}
impl<T: Address, U> Connection<T, U> {
impl<R: Address, IO> Connection<R, IO> {
/// Get hostname.
pub fn host(&self) -> &str {
self.req.hostname()
}
}
impl<T, U> std::ops::Deref for Connection<T, U> {
type Target = U;
impl<R, IO> ops::Deref for Connection<R, IO> {
type Target = IO;
fn deref(&self) -> &U {
fn deref(&self) -> &IO {
&self.io
}
}
impl<T, U> std::ops::DerefMut for Connection<T, U> {
fn deref_mut(&mut self) -> &mut U {
impl<R, IO> ops::DerefMut for Connection<R, IO> {
fn deref_mut(&mut self) -> &mut IO {
&mut self.io
}
}
impl<T, U: fmt::Debug> fmt::Debug for Connection<T, U> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Stream {{{:?}}}", self.io)
}
}
fn parse_host(host: &str) -> (&str, Option<u16>) {
let mut parts_iter = host.splitn(2, ':');

View File

@ -13,8 +13,10 @@ use futures_core::{future::LocalBoxFuture, ready};
use log::{error, trace};
use tokio_util::sync::ReusableBoxFuture;
use super::connect::{Address, Connect, ConnectAddrs, Connection};
use super::error::ConnectError;
use super::{
connect::{Address, Connect, ConnectAddrs, Connection},
error::ConnectError,
};
/// TCP connector service factory
#[derive(Debug, Copy, Clone)]
@ -27,8 +29,8 @@ impl TcpConnectorFactory {
}
}
impl<T: Address> ServiceFactory<Connect<T>> for TcpConnectorFactory {
type Response = Connection<T, TcpStream>;
impl<R: Address> ServiceFactory<Connect<R>> for TcpConnectorFactory {
type Response = Connection<R, TcpStream>;
type Error = ConnectError;
type Config = ();
type Service = TcpConnector;
@ -41,18 +43,18 @@ impl<T: Address> ServiceFactory<Connect<T>> for TcpConnectorFactory {
}
}
/// TCP connector service
/// TCP connector service.
#[derive(Debug, Copy, Clone)]
pub struct TcpConnector;
impl<T: Address> Service<Connect<T>> for TcpConnector {
type Response = Connection<T, TcpStream>;
impl<R: Address> Service<Connect<R>> for TcpConnector {
type Response = Connection<R, TcpStream>;
type Error = ConnectError;
type Future = TcpConnectorResponse<T>;
type Future = TcpConnectorResponse<R>;
actix_service::always_ready!();
fn call(&self, req: Connect<T>) -> Self::Future {
fn call(&self, req: Connect<R>) -> Self::Future {
let port = req.port();
let Connect {
req,
@ -66,9 +68,9 @@ impl<T: Address> Service<Connect<T>> for TcpConnector {
}
/// TCP stream connector response future
pub enum TcpConnectorResponse<T> {
pub enum TcpConnectorResponse<R> {
Response {
req: Option<T>,
req: Option<R>,
port: u16,
local_addr: Option<IpAddr>,
addrs: Option<VecDeque<SocketAddr>>,
@ -77,13 +79,13 @@ pub enum TcpConnectorResponse<T> {
Error(Option<ConnectError>),
}
impl<T: Address> TcpConnectorResponse<T> {
impl<R: Address> TcpConnectorResponse<R> {
pub(crate) fn new(
req: T,
req: R,
port: u16,
local_addr: Option<IpAddr>,
addr: ConnectAddrs,
) -> TcpConnectorResponse<T> {
) -> TcpConnectorResponse<R> {
if addr.is_none() {
error!("TCP connector: unresolved connection address");
return TcpConnectorResponse::Error(Some(ConnectError::Unresolved));
@ -123,8 +125,8 @@ impl<T: Address> TcpConnectorResponse<T> {
}
}
impl<T: Address> Future for TcpConnectorResponse<T> {
type Output = Result<Connection<T, TcpStream>, ConnectError>;
impl<R: Address> Future for TcpConnectorResponse<R> {
type Output = Result<Connection<R, TcpStream>, ConnectError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.get_mut() {

View File

@ -1,15 +1,16 @@
use std::io;
use std::{error::Error, io};
use derive_more::Display;
/// Errors that can result from using a connector service.
#[derive(Debug, Display)]
pub enum ConnectError {
/// Failed to resolve the hostname
#[display(fmt = "Failed resolving hostname: {}", _0)]
#[display(fmt = "Failed resolving hostname")]
Resolver(Box<dyn std::error::Error>),
/// No dns records
#[display(fmt = "No dns records found for the input")]
/// No DNS records
#[display(fmt = "No DNS records found for the input")]
NoRecords,
/// Invalid input
@ -23,3 +24,13 @@ pub enum ConnectError {
#[display(fmt = "{}", _0)]
Io(io::Error),
}
impl Error for ConnectError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
Self::Resolver(err) => Some(&**err),
Self::Io(err) => Some(err),
Self::NoRecords | Self::InvalidInput | Self::Unresolved => None,
}
}
}

View File

@ -34,8 +34,8 @@ impl ResolverFactory {
}
}
impl<T: Address> ServiceFactory<Connect<T>> for ResolverFactory {
type Response = Connect<T>;
impl<R: Address> ServiceFactory<Connect<R>> for ResolverFactory {
type Response = Connect<R>;
type Error = ConnectError;
type Config = ();
type Service = Resolver;
@ -92,7 +92,7 @@ impl<T: Address> ServiceFactory<Connect<T>> for ResolverFactory {
/// let resolver = Resolver::new_custom(resolver);
///
/// // pass custom resolver to connector builder.
/// // connector would then be usable as a service or `awc`'s connector.
/// // connector would then be usable as a service or an `awc` connector.
/// let connector = actix_tls::connect::new_connector::<&str>(resolver.clone());
///
/// // resolver can be passed to connector factory where returned service factory
@ -100,7 +100,7 @@ impl<T: Address> ServiceFactory<Connect<T>> for ResolverFactory {
/// let factory = actix_tls::connect::new_connector_factory::<&str>(resolver);
/// ```
pub trait Resolve {
/// Given DNS lookup information, returns a futures that completes with socket information.
/// Given DNS lookup information, returns a future that completes with socket information.
fn lookup<'a>(
&'a self,
host: &'a str,
@ -132,8 +132,8 @@ impl Resolver {
Self::Custom(Rc::new(resolver))
}
// look up with default resolver
fn look_up<T: Address>(req: &Connect<T>) -> JoinHandle<io::Result<IntoIter<SocketAddr>>> {
/// Resolve DNS with default resolver.
fn look_up<R: Address>(req: &Connect<R>) -> JoinHandle<io::Result<IntoIter<SocketAddr>>> {
let host = req.hostname();
// TODO: Connect should always return host(name?) with port if possible; basically try to
// reduce ability to create conflicting lookup info by having port in host string being
@ -153,19 +153,20 @@ impl Resolver {
format!("{}:{}", host, req.port())
};
// run blocking DNS lookup in thread pool
// run blocking DNS lookup in thread pool since DNS lookups can take upwards of seconds on
// some platforms if conditions are poor and OS-level cache is not populated
spawn_blocking(move || std::net::ToSocketAddrs::to_socket_addrs(&host))
}
}
impl<T: Address> Service<Connect<T>> for Resolver {
type Response = Connect<T>;
impl<R: Address> Service<Connect<R>> for Resolver {
type Response = Connect<R>;
type Error = ConnectError;
type Future = ResolverFuture<T>;
type Future = ResolverFuture<R>;
actix_service::always_ready!();
fn call(&self, req: Connect<T>) -> Self::Future {
fn call(&self, req: Connect<R>) -> Self::Future {
if req.addr.is_some() {
ResolverFuture::Connected(Some(req))
} else if let Ok(ip) = req.hostname().parse() {
@ -203,17 +204,17 @@ impl<T: Address> Service<Connect<T>> for Resolver {
}
}
pub enum ResolverFuture<T: Address> {
Connected(Option<Connect<T>>),
pub enum ResolverFuture<R: Address> {
Connected(Option<Connect<R>>),
LookUp(
JoinHandle<io::Result<IntoIter<SocketAddr>>>,
Option<Connect<T>>,
Option<Connect<R>>,
),
LookupCustom(LocalBoxFuture<'static, Result<Connect<T>, ConnectError>>),
LookupCustom(LocalBoxFuture<'static, Result<Connect<R>, ConnectError>>),
}
impl<T: Address> Future for ResolverFuture<T> {
type Output = Result<Connect<T>, ConnectError>;
impl<R: Address> Future for ResolverFuture<R> {
type Output = Result<Connect<R>, ConnectError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.get_mut() {

View File

@ -19,7 +19,7 @@ pub struct ConnectServiceFactory {
}
impl ConnectServiceFactory {
/// Construct new ConnectService factory
/// Constructs new ConnectService factory.
pub fn new(resolver: Resolver) -> Self {
ConnectServiceFactory {
tcp: TcpConnectorFactory,
@ -27,7 +27,7 @@ impl ConnectServiceFactory {
}
}
/// Construct new service
/// Constructs new service.
pub fn service(&self) -> ConnectService {
ConnectService {
tcp: self.tcp.service(),
@ -45,8 +45,8 @@ impl Clone for ConnectServiceFactory {
}
}
impl<T: Address> ServiceFactory<Connect<T>> for ConnectServiceFactory {
type Response = Connection<T, TcpStream>;
impl<R: Address> ServiceFactory<Connect<R>> for ConnectServiceFactory {
type Response = Connection<R, TcpStream>;
type Error = ConnectError;
type Config = ();
type Service = ConnectService;
@ -65,14 +65,14 @@ pub struct ConnectService {
resolver: Resolver,
}
impl<T: Address> Service<Connect<T>> for ConnectService {
type Response = Connection<T, TcpStream>;
impl<R: Address> Service<Connect<R>> for ConnectService {
type Response = Connection<R, TcpStream>;
type Error = ConnectError;
type Future = ConnectServiceResponse<T>;
type Future = ConnectServiceResponse<R>;
actix_service::always_ready!();
fn call(&self, req: Connect<T>) -> Self::Future {
fn call(&self, req: Connect<R>) -> Self::Future {
ConnectServiceResponse {
fut: ConnectFuture::Resolve(self.resolver.call(req)),
tcp: self.tcp,
@ -81,22 +81,22 @@ impl<T: Address> Service<Connect<T>> for ConnectService {
}
// helper enum to generic over futures of resolve and connect phase.
pub(crate) enum ConnectFuture<T: Address> {
Resolve(<Resolver as Service<Connect<T>>>::Future),
Connect(<TcpConnector as Service<Connect<T>>>::Future),
pub(crate) enum ConnectFuture<R: Address> {
Resolve(<Resolver as Service<Connect<R>>>::Future),
Connect(<TcpConnector as Service<Connect<R>>>::Future),
}
// helper enum to contain the future output of ConnectFuture
pub(crate) enum ConnectOutput<T: Address> {
Resolved(Connect<T>),
Connected(Connection<T, TcpStream>),
/// Helper enum to contain the future output of `ConnectFuture`.
pub(crate) enum ConnectOutput<R: Address> {
Resolved(Connect<R>),
Connected(Connection<R, TcpStream>),
}
impl<T: Address> ConnectFuture<T> {
impl<R: Address> ConnectFuture<R> {
fn poll_connect(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Result<ConnectOutput<T>, ConnectError>> {
) -> Poll<Result<ConnectOutput<R>, ConnectError>> {
match self {
ConnectFuture::Resolve(ref mut fut) => {
Pin::new(fut).poll(cx).map_ok(ConnectOutput::Resolved)
@ -108,13 +108,13 @@ impl<T: Address> ConnectFuture<T> {
}
}
pub struct ConnectServiceResponse<T: Address> {
fut: ConnectFuture<T>,
pub struct ConnectServiceResponse<R: Address> {
fut: ConnectFuture<R>,
tcp: TcpConnector,
}
impl<T: Address> Future for ConnectServiceResponse<T> {
type Output = Result<Connection<T, TcpStream>, ConnectError>;
impl<R: Address> Future for ConnectServiceResponse<R> {
type Output = Result<Connection<R, TcpStream>, ConnectError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {

View File

@ -4,20 +4,20 @@ use actix_service::{Service, ServiceFactory};
use super::{Address, Connect, ConnectError, ConnectServiceFactory, Connection, Resolver};
/// Create TCP connector service.
pub fn new_connector<T: Address + 'static>(
pub fn new_connector<R: Address + 'static>(
resolver: Resolver,
) -> impl Service<Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError> + Clone
) -> impl Service<Connect<R>, Response = Connection<R, TcpStream>, Error = ConnectError> + Clone
{
ConnectServiceFactory::new(resolver).service()
}
/// Create TCP connector service factory.
pub fn new_connector_factory<T: Address + 'static>(
pub fn new_connector_factory<R: Address + 'static>(
resolver: Resolver,
) -> impl ServiceFactory<
Connect<T>,
Connect<R>,
Config = (),
Response = Connection<T, TcpStream>,
Response = Connection<R, TcpStream>,
Error = ConnectError,
InitError = (),
> + Clone {
@ -25,17 +25,17 @@ pub fn new_connector_factory<T: Address + 'static>(
}
/// Create TCP connector service with default parameters.
pub fn default_connector<T: Address + 'static>(
) -> impl Service<Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError> + Clone
pub fn default_connector<R: Address + 'static>(
) -> impl Service<Connect<R>, Response = Connection<R, TcpStream>, Error = ConnectError> + Clone
{
new_connector(Resolver::Default)
}
/// Create TCP connector service factory with default parameters.
pub fn default_connector_factory<T: Address + 'static>() -> impl ServiceFactory<
Connect<T>,
pub fn default_connector_factory<R: Address + 'static>() -> impl ServiceFactory<
Connect<R>,
Config = (),
Response = Connection<T, TcpStream>,
Response = Connection<R, TcpStream>,
Error = ConnectError,
InitError = (),
> + Clone {

View File

@ -37,11 +37,11 @@ impl Clone for NativetlsConnector {
}
}
impl<T: Address, U> ServiceFactory<Connection<T, U>> for NativetlsConnector
impl<R: Address, IO> ServiceFactory<Connection<R, IO>> for NativetlsConnector
where
U: ActixStream + 'static,
IO: ActixStream + 'static,
{
type Response = Connection<T, TlsStream<U>>;
type Response = Connection<R, TlsStream<IO>>;
type Error = io::Error;
type Config = ();
type Service = Self;
@ -56,20 +56,21 @@ where
// NativetlsConnector is both it's ServiceFactory and Service impl type.
// As the factory and service share the same type and state.
impl<T, U> Service<Connection<T, U>> for NativetlsConnector
impl<R, IO> Service<Connection<R, IO>> for NativetlsConnector
where
T: Address,
U: ActixStream + 'static,
R: Address,
IO: ActixStream + 'static,
{
type Response = Connection<T, TlsStream<U>>;
type Response = Connection<R, TlsStream<IO>>;
type Error = io::Error;
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
actix_service::always_ready!();
fn call(&self, stream: Connection<T, U>) -> Self::Future {
fn call(&self, stream: Connection<R, IO>) -> Self::Future {
let (io, stream) = stream.replace_io(());
let connector = self.connector.clone();
Box::pin(async move {
trace!("SSL Handshake start for: {:?}", stream.host());
connector

View File

@ -38,12 +38,12 @@ impl Clone for OpensslConnector {
}
}
impl<T, U> ServiceFactory<Connection<T, U>> for OpensslConnector
impl<R, IO> ServiceFactory<Connection<R, IO>> for OpensslConnector
where
T: Address,
U: ActixStream + 'static,
R: Address,
IO: ActixStream + 'static,
{
type Response = Connection<T, SslStream<U>>;
type Response = Connection<R, SslStream<IO>>;
type Error = io::Error;
type Config = ();
type Service = OpensslConnectorService;
@ -68,18 +68,18 @@ impl Clone for OpensslConnectorService {
}
}
impl<T, U> Service<Connection<T, U>> for OpensslConnectorService
impl<R, IO> Service<Connection<R, IO>> for OpensslConnectorService
where
T: Address,
U: ActixStream,
R: Address,
IO: ActixStream,
{
type Response = Connection<T, SslStream<U>>;
type Response = Connection<R, SslStream<IO>>;
type Error = io::Error;
type Future = ConnectAsyncExt<T, U>;
type Future = ConnectAsyncExt<R, IO>;
actix_service::always_ready!();
fn call(&self, stream: Connection<T, U>) -> Self::Future {
fn call(&self, stream: Connection<R, IO>) -> Self::Future {
trace!("SSL Handshake start for: {:?}", stream.host());
let (io, stream) = stream.replace_io(());
let host = stream.host();
@ -100,17 +100,17 @@ where
}
}
pub struct ConnectAsyncExt<T, U> {
io: Option<SslStream<U>>,
stream: Option<Connection<T, ()>>,
pub struct ConnectAsyncExt<R, IO> {
io: Option<SslStream<IO>>,
stream: Option<Connection<R, ()>>,
}
impl<T: Address, U> Future for ConnectAsyncExt<T, U>
impl<R: Address, IO> Future for ConnectAsyncExt<R, IO>
where
T: Address,
U: ActixStream,
R: Address,
IO: ActixStream,
{
type Output = Result<Connection<T, SslStream<U>>, io::Error>;
type Output = Result<Connection<R, SslStream<IO>>, io::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

View File

@ -15,7 +15,7 @@ use actix_service::{Service, ServiceFactory};
use futures_core::{future::LocalBoxFuture, ready};
use log::trace;
use tokio_rustls::rustls::{client::ServerName, OwnedTrustAnchor, RootCertStore};
use tokio_rustls::{Connect, TlsConnector};
use tokio_rustls::{Connect as RustlsConnect, TlsConnector as RustlsTlsConnector};
use crate::connect::{Address, Connection};
@ -59,12 +59,12 @@ impl Clone for RustlsConnector {
}
}
impl<T, U> ServiceFactory<Connection<T, U>> for RustlsConnector
impl<R, IO> ServiceFactory<Connection<R, IO>> for RustlsConnector
where
T: Address,
U: ActixStream + 'static,
R: Address,
IO: ActixStream + 'static,
{
type Response = Connection<T, TlsStream<U>>;
type Response = Connection<R, TlsStream<IO>>;
type Error = io::Error;
type Config = ();
type Service = RustlsConnectorService;
@ -89,24 +89,24 @@ impl Clone for RustlsConnectorService {
}
}
impl<T, U> Service<Connection<T, U>> for RustlsConnectorService
impl<R, IO> Service<Connection<R, IO>> for RustlsConnectorService
where
T: Address,
U: ActixStream,
R: Address,
IO: ActixStream,
{
type Response = Connection<T, TlsStream<U>>;
type Response = Connection<R, TlsStream<IO>>;
type Error = io::Error;
type Future = RustlsConnectorServiceFuture<T, U>;
type Future = RustlsConnectorServiceFuture<R, IO>;
actix_service::always_ready!();
fn call(&self, connection: Connection<T, U>) -> Self::Future {
fn call(&self, connection: Connection<R, IO>) -> Self::Future {
trace!("SSL Handshake start for: {:?}", connection.host());
let (stream, connection) = connection.replace_io(());
match ServerName::try_from(connection.host()) {
Ok(host) => RustlsConnectorServiceFuture::Future {
connect: TlsConnector::from(self.connector.clone()).connect(host, stream),
connect: RustlsTlsConnector::from(self.connector.clone()).connect(host, stream),
connection: Some(connection),
},
Err(_) => RustlsConnectorServiceFuture::InvalidDns,
@ -114,21 +114,21 @@ where
}
}
pub enum RustlsConnectorServiceFuture<T, U> {
pub enum RustlsConnectorServiceFuture<R, IO> {
/// See issue <https://github.com/briansmith/webpki/issues/54>
InvalidDns,
Future {
connect: Connect<U>,
connection: Option<Connection<T, ()>>,
connect: RustlsConnect<IO>,
connection: Option<Connection<R, ()>>,
},
}
impl<T, U> Future for RustlsConnectorServiceFuture<T, U>
impl<R, IO> Future for RustlsConnectorServiceFuture<R, IO>
where
T: Address,
U: ActixStream,
R: Address,
IO: ActixStream,
{
type Output = Result<Connection<T, TlsStream<U>>, io::Error>;
type Output = Result<Connection<R, TlsStream<IO>>, io::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.get_mut() {