mirror of https://github.com/fafhrd91/actix-net
remove either and futures-util. update lookup logic. rework connect mod. update doc
This commit is contained in:
parent
a068636e97
commit
8e30a1d643
|
@ -28,7 +28,7 @@ actix-rt = { version = "2.0.0-beta.2", default-features = false }
|
|||
actix-service = "2.0.0-beta.3"
|
||||
actix-utils = "3.0.0-beta.1"
|
||||
|
||||
futures-core = { version = "0.3.7", default-features = false }
|
||||
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
|
||||
log = "0.4"
|
||||
mio = { version = "0.7.6", features = ["os-poll", "net"] }
|
||||
num_cpus = "1.13"
|
||||
|
|
|
@ -2,9 +2,13 @@
|
|||
|
||||
## Unreleased - 2021-xx-xx
|
||||
* Remove `trust-dns-proto` and `trust-dns-resolver` [#248]
|
||||
* Use `tokio::net::lookup_host` for as simple and basic default resolver [#248]
|
||||
* Add `Resolve` trait for custom dns resolver. Add `Resolver::new_custom` function
|
||||
to construct custom resolvers.[#248]
|
||||
* Use `tokio::net::lookup_host` as simple and basic default resolver [#248]
|
||||
* Add `Resolve` trait for custom dns resolver. [#248]
|
||||
* Add `Resolver::new_custom` function to construct custom resolvers. [#248]
|
||||
* Export `webpki_roots::TLS_SERVER_ROOTS` in `actix_tls::connect` mod and remove
|
||||
the export from `actix_tls::accept` [#248]
|
||||
* Remove `ConnectTakeAddrsIter`. `Connect::take_addrs` would return
|
||||
`ConnectAddrsIter<'static>` as owned iterator. [#248]
|
||||
|
||||
[#248]: https://github.com/actix/actix-net/pull/248
|
||||
|
||||
|
|
|
@ -50,9 +50,7 @@ actix-service = "2.0.0-beta.3"
|
|||
actix-utils = "3.0.0-beta.1"
|
||||
|
||||
derive_more = "0.99.5"
|
||||
either = "1.6"
|
||||
futures-core = { version = "0.3.7", default-features = false }
|
||||
futures-util = { version = "0.3.7", default-features = false }
|
||||
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
|
||||
http = { version = "0.2.3", optional = true }
|
||||
log = "0.4"
|
||||
tokio = { version = "1", optional = true }
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use actix_codec::{AsyncRead, AsyncWrite};
|
||||
use actix_service::{Service, ServiceFactory};
|
||||
|
|
|
@ -14,7 +14,6 @@ use tokio_rustls::{Accept, TlsAcceptor};
|
|||
|
||||
pub use rustls::{ServerConfig, Session};
|
||||
pub use tokio_rustls::server::TlsStream;
|
||||
pub use webpki_roots::TLS_SERVER_ROOTS;
|
||||
|
||||
use super::MAX_CONN_COUNTER;
|
||||
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
use std::collections::{vec_deque, VecDeque};
|
||||
use std::fmt;
|
||||
use std::iter::{FromIterator, FusedIterator};
|
||||
use std::net::SocketAddr;
|
||||
|
||||
use either::Either;
|
||||
use std::{
|
||||
collections::{vec_deque, VecDeque},
|
||||
fmt,
|
||||
iter::{FromIterator, FusedIterator},
|
||||
net::SocketAddr,
|
||||
};
|
||||
|
||||
/// Connect request
|
||||
pub trait Address: Unpin + 'static {
|
||||
|
@ -39,7 +39,25 @@ impl Address for &'static str {
|
|||
pub struct Connect<T> {
|
||||
pub(crate) req: T,
|
||||
pub(crate) port: u16,
|
||||
pub(crate) addr: Option<Either<SocketAddr, VecDeque<SocketAddr>>>,
|
||||
pub(crate) addr: ConnectAddrs,
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq, Debug, Hash)]
|
||||
pub(crate) enum ConnectAddrs {
|
||||
One(Option<SocketAddr>),
|
||||
Multi(VecDeque<SocketAddr>),
|
||||
}
|
||||
|
||||
impl ConnectAddrs {
|
||||
pub(crate) fn is_none(&self) -> bool {
|
||||
matches!(*self, Self::One(None))
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ConnectAddrs {
|
||||
fn default() -> Self {
|
||||
Self::One(None)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Address> Connect<T> {
|
||||
|
@ -49,7 +67,7 @@ impl<T: Address> Connect<T> {
|
|||
Connect {
|
||||
req,
|
||||
port: port.unwrap_or(0),
|
||||
addr: None,
|
||||
addr: ConnectAddrs::One(None),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -59,7 +77,7 @@ impl<T: Address> Connect<T> {
|
|||
Connect {
|
||||
req,
|
||||
port: 0,
|
||||
addr: Some(Either::Left(addr)),
|
||||
addr: ConnectAddrs::One(Some(addr)),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -73,9 +91,7 @@ impl<T: Address> Connect<T> {
|
|||
|
||||
/// Use address.
|
||||
pub fn set_addr(mut self, addr: Option<SocketAddr>) -> Self {
|
||||
if let Some(addr) = addr {
|
||||
self.addr = Some(Either::Left(addr));
|
||||
}
|
||||
self.addr = ConnectAddrs::One(addr);
|
||||
self
|
||||
}
|
||||
|
||||
|
@ -86,9 +102,9 @@ impl<T: Address> Connect<T> {
|
|||
{
|
||||
let mut addrs = VecDeque::from_iter(addrs);
|
||||
self.addr = if addrs.len() < 2 {
|
||||
addrs.pop_front().map(Either::Left)
|
||||
ConnectAddrs::One(addrs.pop_front())
|
||||
} else {
|
||||
Some(Either::Right(addrs))
|
||||
ConnectAddrs::Multi(addrs)
|
||||
};
|
||||
self
|
||||
}
|
||||
|
@ -105,24 +121,18 @@ impl<T: Address> Connect<T> {
|
|||
|
||||
/// Pre-resolved addresses of the request.
|
||||
pub fn addrs(&self) -> ConnectAddrsIter<'_> {
|
||||
let inner = match self.addr {
|
||||
None => Either::Left(None),
|
||||
Some(Either::Left(addr)) => Either::Left(Some(addr)),
|
||||
Some(Either::Right(ref addrs)) => Either::Right(addrs.iter()),
|
||||
};
|
||||
|
||||
ConnectAddrsIter { inner }
|
||||
match self.addr {
|
||||
ConnectAddrs::One(addr) => ConnectAddrsIter::One(addr),
|
||||
ConnectAddrs::Multi(ref addrs) => ConnectAddrsIter::Multi(addrs.iter()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Takes pre-resolved addresses of the request.
|
||||
pub fn take_addrs(&mut self) -> ConnectTakeAddrsIter {
|
||||
let inner = match self.addr.take() {
|
||||
None => Either::Left(None),
|
||||
Some(Either::Left(addr)) => Either::Left(Some(addr)),
|
||||
Some(Either::Right(addrs)) => Either::Right(addrs.into_iter()),
|
||||
};
|
||||
|
||||
ConnectTakeAddrsIter { inner }
|
||||
pub fn take_addrs(&mut self) -> ConnectAddrsIter<'static> {
|
||||
match std::mem::take(&mut self.addr) {
|
||||
ConnectAddrs::One(addr) => ConnectAddrsIter::One(addr),
|
||||
ConnectAddrs::Multi(addrs) => ConnectAddrsIter::MultiOwned(addrs.into_iter()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -140,25 +150,29 @@ impl<T: Address> fmt::Display for Connect<T> {
|
|||
|
||||
/// Iterator over addresses in a [`Connect`] request.
|
||||
#[derive(Clone)]
|
||||
pub struct ConnectAddrsIter<'a> {
|
||||
inner: Either<Option<SocketAddr>, vec_deque::Iter<'a, SocketAddr>>,
|
||||
pub enum ConnectAddrsIter<'a> {
|
||||
One(Option<SocketAddr>),
|
||||
Multi(vec_deque::Iter<'a, SocketAddr>),
|
||||
MultiOwned(vec_deque::IntoIter<SocketAddr>),
|
||||
}
|
||||
|
||||
impl Iterator for ConnectAddrsIter<'_> {
|
||||
type Item = SocketAddr;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
match self.inner {
|
||||
Either::Left(ref mut opt) => opt.take(),
|
||||
Either::Right(ref mut iter) => iter.next().copied(),
|
||||
match *self {
|
||||
Self::One(ref mut addr) => addr.take(),
|
||||
Self::Multi(ref mut iter) => iter.next().copied(),
|
||||
Self::MultiOwned(ref mut iter) => iter.next(),
|
||||
}
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
match self.inner {
|
||||
Either::Left(Some(_)) => (1, Some(1)),
|
||||
Either::Left(None) => (0, Some(0)),
|
||||
Either::Right(ref iter) => iter.size_hint(),
|
||||
match *self {
|
||||
Self::One(None) => (0, Some(0)),
|
||||
Self::One(Some(_)) => (1, Some(1)),
|
||||
Self::Multi(ref iter) => iter.size_hint(),
|
||||
Self::MultiOwned(ref iter) => iter.size_hint(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -173,35 +187,6 @@ impl ExactSizeIterator for ConnectAddrsIter<'_> {}
|
|||
|
||||
impl FusedIterator for ConnectAddrsIter<'_> {}
|
||||
|
||||
/// Owned iterator over addresses in a [`Connect`] request.
|
||||
#[derive(Debug)]
|
||||
pub struct ConnectTakeAddrsIter {
|
||||
inner: Either<Option<SocketAddr>, vec_deque::IntoIter<SocketAddr>>,
|
||||
}
|
||||
|
||||
impl Iterator for ConnectTakeAddrsIter {
|
||||
type Item = SocketAddr;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
match self.inner {
|
||||
Either::Left(ref mut opt) => opt.take(),
|
||||
Either::Right(ref mut iter) => iter.next(),
|
||||
}
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
match self.inner {
|
||||
Either::Left(Some(_)) => (1, Some(1)),
|
||||
Either::Left(None) => (0, Some(0)),
|
||||
Either::Right(ref iter) => iter.size_hint(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ExactSizeIterator for ConnectTakeAddrsIter {}
|
||||
|
||||
impl FusedIterator for ConnectTakeAddrsIter {}
|
||||
|
||||
fn parse(host: &str) -> (&str, Option<u16>) {
|
||||
let mut parts_iter = host.splitn(2, ':');
|
||||
if let Some(host) = parts_iter.next() {
|
||||
|
|
|
@ -1,16 +1,18 @@
|
|||
use std::collections::VecDeque;
|
||||
use std::future::Future;
|
||||
use std::io;
|
||||
use std::net::SocketAddr;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
future::Future,
|
||||
io,
|
||||
net::SocketAddr,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use actix_rt::net::TcpStream;
|
||||
use actix_service::{Service, ServiceFactory};
|
||||
use futures_core::future::LocalBoxFuture;
|
||||
use log::{error, trace};
|
||||
|
||||
use super::connect::{Address, Connect, Connection};
|
||||
use super::connect::{Address, Connect, ConnectAddrs, Connection};
|
||||
use super::error::ConnectError;
|
||||
|
||||
/// TCP connector service factory
|
||||
|
@ -53,12 +55,7 @@ impl<T: Address> Service<Connect<T>> for TcpConnector {
|
|||
let port = req.port();
|
||||
let Connect { req, addr, .. } = req;
|
||||
|
||||
if let Some(addr) = addr {
|
||||
TcpConnectorResponse::new(req, port, addr)
|
||||
} else {
|
||||
error!("TCP connector: got unresolved address");
|
||||
TcpConnectorResponse::Error(Some(ConnectError::Unresolved))
|
||||
}
|
||||
TcpConnectorResponse::new(req, port, addr)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -75,11 +72,7 @@ pub enum TcpConnectorResponse<T> {
|
|||
}
|
||||
|
||||
impl<T: Address> TcpConnectorResponse<T> {
|
||||
pub fn new(
|
||||
req: T,
|
||||
port: u16,
|
||||
addr: either::Either<SocketAddr, VecDeque<SocketAddr>>,
|
||||
) -> TcpConnectorResponse<T> {
|
||||
pub(crate) fn new(req: T, port: u16, addr: ConnectAddrs) -> TcpConnectorResponse<T> {
|
||||
trace!(
|
||||
"TCP connector - connecting to {:?} port:{}",
|
||||
req.host(),
|
||||
|
@ -87,13 +80,19 @@ impl<T: Address> TcpConnectorResponse<T> {
|
|||
);
|
||||
|
||||
match addr {
|
||||
either::Either::Left(addr) => TcpConnectorResponse::Response {
|
||||
ConnectAddrs::One(None) => {
|
||||
error!("TCP connector: got unresolved address");
|
||||
TcpConnectorResponse::Error(Some(ConnectError::Unresolved))
|
||||
}
|
||||
ConnectAddrs::One(Some(addr)) => TcpConnectorResponse::Response {
|
||||
req: Some(req),
|
||||
port,
|
||||
addrs: None,
|
||||
stream: Some(Box::pin(TcpStream::connect(addr))),
|
||||
},
|
||||
either::Either::Right(addrs) => TcpConnectorResponse::Response {
|
||||
// when resolver returns multiple socket addr for request they would be popped from
|
||||
// front end of queue and returns with the first successful tcp connection.
|
||||
ConnectAddrs::Multi(addrs) => TcpConnectorResponse::Response {
|
||||
req: Some(req),
|
||||
port,
|
||||
addrs: Some(addrs),
|
||||
|
|
|
@ -4,6 +4,17 @@
|
|||
//!
|
||||
//! * `openssl` - enables TLS support via `openssl` crate
|
||||
//! * `rustls` - enables TLS support via `rustls` crate
|
||||
//!
|
||||
//! ## Workflow of connector service:
|
||||
//! - resolve [`Address`](self::connect::Address) with given [`Resolver`](self::resolve::Resolver)
|
||||
//! and collect [`SocketAddrs`](std::net::SocketAddr).
|
||||
//! - establish Tcp connection and return [`TcpStream`](tokio::net::TcpStream).
|
||||
//!
|
||||
//! ## Workflow of tls connector services:
|
||||
//! - Establish [`TcpStream`](tokio::net::TcpStream) with connector service.
|
||||
//! - Wrap around the stream and do connect handshake with remote address.
|
||||
//! - Return certain stream type impl [`AsyncRead`](tokio::io::AsyncRead) and
|
||||
//! [`AsyncWrite`](tokio::io::AsyncWrite)
|
||||
|
||||
mod connect;
|
||||
mod connector;
|
||||
|
|
|
@ -1,4 +1,10 @@
|
|||
use std::{net::SocketAddr, rc::Rc, task::Poll};
|
||||
use std::{
|
||||
future::Future,
|
||||
net::SocketAddr,
|
||||
pin::Pin,
|
||||
rc::Rc,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use actix_service::{Service, ServiceFactory};
|
||||
use futures_core::future::LocalBoxFuture;
|
||||
|
@ -89,11 +95,11 @@ pub enum Resolver {
|
|||
///
|
||||
/// // pass custom resolver to connector builder.
|
||||
/// // connector would then be usable as a service or awc's connector.
|
||||
/// let connector = actix_tls::connect::new_connector(resolver.clone());
|
||||
/// let connector = actix_tls::connect::new_connector::<&str>(resolver.clone());
|
||||
///
|
||||
/// // resolver can be passed to connector factory where returned service factory
|
||||
/// // can be used to construct new connector services.
|
||||
/// let factory = actix_tls::connect::new_connector_factory(resolver);
|
||||
/// let factory = actix_tls::connect::new_connector_factory::<&str>(resolver);
|
||||
///```
|
||||
pub trait Resolve {
|
||||
fn lookup<'a>(
|
||||
|
@ -115,22 +121,34 @@ impl Resolver {
|
|||
) -> Result<Vec<SocketAddr>, ConnectError> {
|
||||
match self {
|
||||
Self::Default => {
|
||||
let host = if req.host().contains(':') {
|
||||
req.host().to_string()
|
||||
let host = req.host();
|
||||
// TODO: Connect should always return host with port if possible.
|
||||
let host = if req
|
||||
.host()
|
||||
.splitn(2, ':')
|
||||
.last()
|
||||
.and_then(|p| p.parse::<u16>().ok())
|
||||
.map(|p| p == req.port())
|
||||
.unwrap_or(false)
|
||||
{
|
||||
host.to_string()
|
||||
} else {
|
||||
format!("{}:{}", req.host(), req.port())
|
||||
format!("{}:{}", host, req.port())
|
||||
};
|
||||
|
||||
let res = tokio::net::lookup_host(host).await.map_err(|e| {
|
||||
trace!(
|
||||
"DNS resolver: failed to resolve host {:?} err: {}",
|
||||
req.host(),
|
||||
e
|
||||
);
|
||||
ConnectError::Resolver(Box::new(e))
|
||||
})?;
|
||||
let res = tokio::net::lookup_host(host)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
trace!(
|
||||
"DNS resolver: failed to resolve host {:?} err: {}",
|
||||
req.host(),
|
||||
e
|
||||
);
|
||||
ConnectError::Resolver(Box::new(e))
|
||||
})?
|
||||
.collect();
|
||||
|
||||
Ok(res.collect())
|
||||
Ok(res)
|
||||
}
|
||||
Self::Custom(resolver) => resolver
|
||||
.lookup(req.host(), req.port())
|
||||
|
@ -143,21 +161,22 @@ impl Resolver {
|
|||
impl<T: Address> Service<Connect<T>> for Resolver {
|
||||
type Response = Connect<T>;
|
||||
type Error = ConnectError;
|
||||
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||
type Future = ResolverFuture<T>;
|
||||
|
||||
actix_service::always_ready!();
|
||||
|
||||
fn call(&mut self, mut req: Connect<T>) -> Self::Future {
|
||||
let resolver = self.clone();
|
||||
Box::pin(async move {
|
||||
if req.addr.is_some() {
|
||||
Ok(req)
|
||||
} else if let Ok(ip) = req.host().parse() {
|
||||
req.addr = Some(either::Either::Left(SocketAddr::new(ip, req.port())));
|
||||
Ok(req)
|
||||
} else {
|
||||
trace!("DNS resolver: resolving host {:?}", req.host());
|
||||
fn call(&mut self, req: Connect<T>) -> Self::Future {
|
||||
if !req.addr.is_none() {
|
||||
ResolverFuture::Connected(Some(req))
|
||||
} else if let Ok(ip) = req.host().parse() {
|
||||
let addr = SocketAddr::new(ip, req.port());
|
||||
let req = req.set_addr(Some(addr));
|
||||
ResolverFuture::Connected(Some(req))
|
||||
} else {
|
||||
trace!("DNS resolver: resolving host {:?}", req.host());
|
||||
|
||||
let resolver = self.clone();
|
||||
ResolverFuture::Lookup(Box::pin(async move {
|
||||
let addrs = resolver.lookup(&req).await?;
|
||||
|
||||
let req = req.set_addrs(addrs);
|
||||
|
@ -173,7 +192,25 @@ impl<T: Address> Service<Connect<T>> for Resolver {
|
|||
} else {
|
||||
Ok(req)
|
||||
}
|
||||
}
|
||||
})
|
||||
}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub enum ResolverFuture<T: Address> {
|
||||
Connected(Option<Connect<T>>),
|
||||
Lookup(LocalBoxFuture<'static, Result<Connect<T>, ConnectError>>),
|
||||
}
|
||||
|
||||
impl<T: Address> Future for ResolverFuture<T> {
|
||||
type Output = Result<Connect<T>, ConnectError>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
match self.get_mut() {
|
||||
Self::Connected(conn) => Poll::Ready(Ok(conn
|
||||
.take()
|
||||
.expect("ResolverFuture polled after finished"))),
|
||||
Self::Lookup(fut) => fut.as_mut().poll(cx),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,11 +1,12 @@
|
|||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use actix_rt::net::TcpStream;
|
||||
use actix_service::{Service, ServiceFactory};
|
||||
use either::Either;
|
||||
use futures_core::future::LocalBoxFuture;
|
||||
use futures_core::{future::LocalBoxFuture, ready};
|
||||
|
||||
use super::connect::{Address, Connect, Connection};
|
||||
use super::connector::{TcpConnector, TcpConnectorFactory};
|
||||
|
@ -81,36 +82,42 @@ impl<T: Address> Service<Connect<T>> for ConnectService {
|
|||
|
||||
fn call(&mut self, req: Connect<T>) -> Self::Future {
|
||||
ConnectServiceResponse {
|
||||
state: ConnectState::Resolve(self.resolver.call(req)),
|
||||
fut: ConnectFuture::Resolve(self.resolver.call(req)),
|
||||
tcp: self.tcp,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum ConnectState<T: Address> {
|
||||
// helper enum to generic over futures of resolve and connect phase.
|
||||
pub(crate) enum ConnectFuture<T: Address> {
|
||||
Resolve(<Resolver as Service<Connect<T>>>::Future),
|
||||
Connect(<TcpConnector as Service<Connect<T>>>::Future),
|
||||
}
|
||||
|
||||
impl<T: Address> ConnectState<T> {
|
||||
#[allow(clippy::type_complexity)]
|
||||
fn poll(
|
||||
// helper enum to contain the future output of ConnectFuture
|
||||
pub(crate) enum ConnectOutput<T: Address> {
|
||||
Resolved(Connect<T>),
|
||||
Connected(Connection<T, TcpStream>),
|
||||
}
|
||||
|
||||
impl<T: Address> ConnectFuture<T> {
|
||||
fn poll_connect(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Either<Poll<Result<Connection<T, TcpStream>, ConnectError>>, Connect<T>> {
|
||||
) -> Poll<Result<ConnectOutput<T>, ConnectError>> {
|
||||
match self {
|
||||
ConnectState::Resolve(ref mut fut) => match Pin::new(fut).poll(cx) {
|
||||
Poll::Pending => Either::Left(Poll::Pending),
|
||||
Poll::Ready(Ok(res)) => Either::Right(res),
|
||||
Poll::Ready(Err(err)) => Either::Left(Poll::Ready(Err(err))),
|
||||
},
|
||||
ConnectState::Connect(ref mut fut) => Either::Left(Pin::new(fut).poll(cx)),
|
||||
ConnectFuture::Resolve(ref mut fut) => {
|
||||
Pin::new(fut).poll(cx).map_ok(ConnectOutput::Resolved)
|
||||
}
|
||||
ConnectFuture::Connect(ref mut fut) => {
|
||||
Pin::new(fut).poll(cx).map_ok(ConnectOutput::Connected)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ConnectServiceResponse<T: Address> {
|
||||
state: ConnectState<T>,
|
||||
fut: ConnectFuture<T>,
|
||||
tcp: TcpConnector,
|
||||
}
|
||||
|
||||
|
@ -118,17 +125,13 @@ impl<T: Address> Future for ConnectServiceResponse<T> {
|
|||
type Output = Result<Connection<T, TcpStream>, ConnectError>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let res = match self.state.poll(cx) {
|
||||
Either::Right(res) => {
|
||||
self.state = ConnectState::Connect(self.tcp.call(res));
|
||||
self.state.poll(cx)
|
||||
loop {
|
||||
match ready!(self.fut.poll_connect(cx))? {
|
||||
ConnectOutput::Resolved(res) => {
|
||||
self.fut = ConnectFuture::Connect(self.tcp.call(res));
|
||||
}
|
||||
ConnectOutput::Connected(res) => return Poll::Ready(Ok(res)),
|
||||
}
|
||||
Either::Left(res) => return res,
|
||||
};
|
||||
|
||||
match res {
|
||||
Either::Left(res) => res,
|
||||
Either::Right(_) => panic!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -139,7 +142,7 @@ pub struct TcpConnectService {
|
|||
resolver: Resolver,
|
||||
}
|
||||
|
||||
impl<T: Address + 'static> Service<Connect<T>> for TcpConnectService {
|
||||
impl<T: Address> Service<Connect<T>> for TcpConnectService {
|
||||
type Response = TcpStream;
|
||||
type Error = ConnectError;
|
||||
type Future = TcpConnectServiceResponse<T>;
|
||||
|
@ -148,43 +151,14 @@ impl<T: Address + 'static> Service<Connect<T>> for TcpConnectService {
|
|||
|
||||
fn call(&mut self, req: Connect<T>) -> Self::Future {
|
||||
TcpConnectServiceResponse {
|
||||
state: TcpConnectState::Resolve(self.resolver.call(req)),
|
||||
fut: ConnectFuture::Resolve(self.resolver.call(req)),
|
||||
tcp: self.tcp,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum TcpConnectState<T: Address> {
|
||||
Resolve(<Resolver as Service<Connect<T>>>::Future),
|
||||
Connect(<TcpConnector as Service<Connect<T>>>::Future),
|
||||
}
|
||||
|
||||
impl<T: Address> TcpConnectState<T> {
|
||||
fn poll(
|
||||
&mut self,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Either<Poll<Result<TcpStream, ConnectError>>, Connect<T>> {
|
||||
match self {
|
||||
TcpConnectState::Resolve(ref mut fut) => match Pin::new(fut).poll(cx) {
|
||||
Poll::Pending => (),
|
||||
Poll::Ready(Ok(res)) => return Either::Right(res),
|
||||
Poll::Ready(Err(err)) => return Either::Left(Poll::Ready(Err(err))),
|
||||
},
|
||||
TcpConnectState::Connect(ref mut fut) => {
|
||||
if let Poll::Ready(res) = Pin::new(fut).poll(cx) {
|
||||
return match res {
|
||||
Ok(conn) => Either::Left(Poll::Ready(Ok(conn.into_parts().0))),
|
||||
Err(err) => Either::Left(Poll::Ready(Err(err))),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
Either::Left(Poll::Pending)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TcpConnectServiceResponse<T: Address> {
|
||||
state: TcpConnectState<T>,
|
||||
fut: ConnectFuture<T>,
|
||||
tcp: TcpConnector,
|
||||
}
|
||||
|
||||
|
@ -192,17 +166,13 @@ impl<T: Address> Future for TcpConnectServiceResponse<T> {
|
|||
type Output = Result<TcpStream, ConnectError>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let res = match self.state.poll(cx) {
|
||||
Either::Right(res) => {
|
||||
self.state = TcpConnectState::Connect(self.tcp.call(res));
|
||||
self.state.poll(cx)
|
||||
loop {
|
||||
match ready!(self.fut.poll_connect(cx))? {
|
||||
ConnectOutput::Resolved(res) => {
|
||||
self.fut = ConnectFuture::Connect(self.tcp.call(res));
|
||||
}
|
||||
ConnectOutput::Connected(conn) => return Poll::Ready(Ok(conn.into_parts().0)),
|
||||
}
|
||||
Either::Left(res) => return res,
|
||||
};
|
||||
|
||||
match res {
|
||||
Either::Left(res) => res,
|
||||
Either::Right(_) => panic!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,7 +2,6 @@ use std::{
|
|||
fmt,
|
||||
future::Future,
|
||||
io,
|
||||
marker::PhantomData,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
@ -11,7 +10,6 @@ use actix_codec::{AsyncRead, AsyncWrite};
|
|||
use actix_rt::net::TcpStream;
|
||||
use actix_service::{Service, ServiceFactory};
|
||||
use futures_core::{future::LocalBoxFuture, ready};
|
||||
use futures_util::future::{ready, Either, Ready};
|
||||
use log::trace;
|
||||
|
||||
pub use openssl::ssl::{Error as SslError, HandshakeError, SslConnector, SslMethod};
|
||||
|
@ -82,29 +80,27 @@ where
|
|||
{
|
||||
type Response = Connection<T, SslStream<U>>;
|
||||
type Error = io::Error;
|
||||
#[allow(clippy::type_complexity)]
|
||||
type Future = Either<ConnectAsyncExt<T, U>, Ready<Result<Self::Response, Self::Error>>>;
|
||||
type Future = ConnectAsyncExt<T, U>;
|
||||
|
||||
actix_service::always_ready!();
|
||||
|
||||
fn call(&mut self, stream: Connection<T, U>) -> Self::Future {
|
||||
trace!("SSL Handshake start for: {:?}", stream.host());
|
||||
let (io, stream) = stream.replace(());
|
||||
let host = stream.host().to_string();
|
||||
let host = stream.host();
|
||||
|
||||
match self.connector.configure() {
|
||||
Err(e) => Either::Right(ready(Err(io::Error::new(io::ErrorKind::Other, e)))),
|
||||
Ok(config) => {
|
||||
let ssl = config
|
||||
.into_ssl(&host)
|
||||
.expect("SSL connect configuration was invalid.");
|
||||
let config = self
|
||||
.connector
|
||||
.configure()
|
||||
.expect("SSL connect configuration was invalid.");
|
||||
|
||||
Either::Left(ConnectAsyncExt {
|
||||
io: Some(SslStream::new(ssl, io).unwrap()),
|
||||
stream: Some(stream),
|
||||
_t: PhantomData,
|
||||
})
|
||||
}
|
||||
let ssl = config
|
||||
.into_ssl(host)
|
||||
.expect("SSL connect configuration was invalid.");
|
||||
|
||||
ConnectAsyncExt {
|
||||
io: Some(SslStream::new(ssl, io).unwrap()),
|
||||
stream: Some(stream),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -112,7 +108,6 @@ where
|
|||
pub struct ConnectAsyncExt<T, U> {
|
||||
io: Option<SslStream<U>>,
|
||||
stream: Option<Connection<T, ()>>,
|
||||
_t: PhantomData<U>,
|
||||
}
|
||||
|
||||
impl<T: Address, U> Future for ConnectAsyncExt<T, U>
|
||||
|
|
|
@ -8,6 +8,7 @@ use std::{
|
|||
|
||||
pub use rustls::Session;
|
||||
pub use tokio_rustls::{client::TlsStream, rustls::ClientConfig};
|
||||
pub use webpki_roots::TLS_SERVER_ROOTS;
|
||||
|
||||
use actix_codec::{AsyncRead, AsyncWrite};
|
||||
use actix_service::{Service, ServiceFactory};
|
||||
|
|
|
@ -97,7 +97,7 @@ async fn test_custom_resolver() {
|
|||
|
||||
struct MyResolver {
|
||||
trust_dns: TokioAsyncResolver,
|
||||
};
|
||||
}
|
||||
|
||||
impl Resolve for MyResolver {
|
||||
fn lookup<'a>(
|
||||
|
|
Loading…
Reference in New Issue