mirror of https://github.com/fafhrd91/actix-net
remove trust dns
This commit is contained in:
parent
6112a47529
commit
aea6051f4b
|
@ -9,9 +9,6 @@ use std::{fmt, thread};
|
|||
|
||||
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
|
||||
use tokio::sync::oneshot::{channel, error::RecvError as Canceled, Sender};
|
||||
// use futures_util::stream::FuturesUnordered;
|
||||
// use tokio::task::JoinHandle;
|
||||
// use tokio::stream::StreamExt;
|
||||
use tokio::task::LocalSet;
|
||||
|
||||
use crate::runtime::Runtime;
|
||||
|
@ -19,12 +16,6 @@ use crate::system::System;
|
|||
|
||||
thread_local!(
|
||||
static ADDR: RefCell<Option<Arbiter>> = RefCell::new(None);
|
||||
// TODO: Commented out code are for Arbiter::local_join function.
|
||||
// It can be safely removed if this function is not used in actix-*.
|
||||
//
|
||||
// /// stores join handle for spawned async tasks.
|
||||
// static HANDLE: RefCell<FuturesUnordered<JoinHandle<()>>> =
|
||||
// RefCell::new(FuturesUnordered::new());
|
||||
static STORAGE: RefCell<HashMap<TypeId, Box<dyn Any>>> = RefCell::new(HashMap::new());
|
||||
);
|
||||
|
||||
|
@ -154,11 +145,6 @@ impl Arbiter {
|
|||
where
|
||||
F: Future<Output = ()> + 'static,
|
||||
{
|
||||
// HANDLE.with(|handle| {
|
||||
// let handle = handle.borrow();
|
||||
// handle.push(tokio::task::spawn_local(future));
|
||||
// });
|
||||
// let _ = tokio::task::spawn_local(CleanupPending);
|
||||
let _ = tokio::task::spawn_local(future);
|
||||
}
|
||||
|
||||
|
@ -277,32 +263,12 @@ impl Arbiter {
|
|||
|
||||
/// Returns a future that will be completed once all currently spawned futures
|
||||
/// have completed.
|
||||
#[deprecated(since = "1.2.0", note = "Arbiter::local_join function is removed.")]
|
||||
#[deprecated(since = "2.0.0", note = "Arbiter::local_join function is removed.")]
|
||||
pub async fn local_join() {
|
||||
// let handle = HANDLE.with(|fut| std::mem::take(&mut *fut.borrow_mut()));
|
||||
// async move {
|
||||
// handle.collect::<Vec<_>>().await;
|
||||
// }
|
||||
unimplemented!("Arbiter::local_join function is removed.")
|
||||
}
|
||||
}
|
||||
|
||||
// /// Future used for cleaning-up already finished `JoinHandle`s
|
||||
// /// from the `PENDING` list so the vector doesn't grow indefinitely
|
||||
// struct CleanupPending;
|
||||
//
|
||||
// impl Future for CleanupPending {
|
||||
// type Output = ();
|
||||
//
|
||||
// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
// HANDLE.with(move |handle| {
|
||||
// recycle_join_handle(&mut *handle.borrow_mut(), cx);
|
||||
// });
|
||||
//
|
||||
// Poll::Ready(())
|
||||
// }
|
||||
// }
|
||||
|
||||
struct ArbiterController {
|
||||
rx: UnboundedReceiver<ArbiterCommand>,
|
||||
}
|
||||
|
@ -330,11 +296,6 @@ impl Future for ArbiterController {
|
|||
Poll::Ready(Some(item)) => match item {
|
||||
ArbiterCommand::Stop => return Poll::Ready(()),
|
||||
ArbiterCommand::Execute(fut) => {
|
||||
// HANDLE.with(|handle| {
|
||||
// let mut handle = handle.borrow_mut();
|
||||
// handle.push(tokio::task::spawn_local(fut));
|
||||
// recycle_join_handle(&mut *handle, cx);
|
||||
// });
|
||||
tokio::task::spawn_local(fut);
|
||||
}
|
||||
ArbiterCommand::ExecuteFn(f) => {
|
||||
|
@ -347,20 +308,6 @@ impl Future for ArbiterController {
|
|||
}
|
||||
}
|
||||
|
||||
// fn recycle_join_handle(handle: &mut FuturesUnordered<JoinHandle<()>>, cx: &mut Context<'_>) {
|
||||
// let _ = Pin::new(&mut *handle).poll_next(cx);
|
||||
//
|
||||
// // Try to recycle more join handles and free up memory.
|
||||
// //
|
||||
// // this is a guess. The yield limit for FuturesUnordered is 32.
|
||||
// // So poll an extra 3 times would make the total poll below 128.
|
||||
// if handle.len() > 64 {
|
||||
// (0..3).for_each(|_| {
|
||||
// let _ = Pin::new(&mut *handle).poll_next(cx);
|
||||
// })
|
||||
// }
|
||||
// }
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum SystemCommand {
|
||||
Exit(i32),
|
||||
|
|
|
@ -66,7 +66,7 @@ pub mod time {
|
|||
pub use tokio::time::{timeout, Timeout};
|
||||
}
|
||||
|
||||
/// Blocking task management.
|
||||
/// Task management.
|
||||
pub mod task {
|
||||
pub use tokio::task::{spawn_blocking, yield_now, JoinHandle};
|
||||
}
|
||||
|
|
|
@ -62,43 +62,6 @@ fn join_another_arbiter() {
|
|||
);
|
||||
}
|
||||
|
||||
// #[test]
|
||||
// fn join_current_arbiter() {
|
||||
// let time = Duration::from_secs(2);
|
||||
//
|
||||
// let instant = Instant::now();
|
||||
// actix_rt::System::new("test_join_current_arbiter").block_on(async move {
|
||||
// actix_rt::spawn(async move {
|
||||
// tokio::time::delay_for(time).await;
|
||||
// actix_rt::Arbiter::current().stop();
|
||||
// });
|
||||
// actix_rt::Arbiter::local_join().await;
|
||||
// });
|
||||
// assert!(
|
||||
// instant.elapsed() >= time,
|
||||
// "Join on current arbiter should wait for all spawned futures"
|
||||
// );
|
||||
//
|
||||
// let large_timer = Duration::from_secs(20);
|
||||
// let instant = Instant::now();
|
||||
// actix_rt::System::new("test_join_current_arbiter").block_on(async move {
|
||||
// actix_rt::spawn(async move {
|
||||
// tokio::time::delay_for(time).await;
|
||||
// actix_rt::Arbiter::current().stop();
|
||||
// });
|
||||
// let f = actix_rt::Arbiter::local_join();
|
||||
// actix_rt::spawn(async move {
|
||||
// tokio::time::delay_for(large_timer).await;
|
||||
// actix_rt::Arbiter::current().stop();
|
||||
// });
|
||||
// f.await;
|
||||
// });
|
||||
// assert!(
|
||||
// instant.elapsed() < large_timer,
|
||||
// "local_join should await only for the already spawned futures"
|
||||
// );
|
||||
// }
|
||||
|
||||
#[test]
|
||||
fn non_static_block_on() {
|
||||
let string = String::from("test_str");
|
||||
|
|
|
@ -24,7 +24,7 @@ default = []
|
|||
|
||||
[dependencies]
|
||||
actix-codec = "0.4.0-beta.1"
|
||||
actix-rt = "2.0.0-beta.2"
|
||||
actix-rt = { version = "2.0.0-beta.2", default-features = false }
|
||||
actix-service = "2.0.0-beta.3"
|
||||
actix-utils = "3.0.0-beta.1"
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ default = ["accept", "connect", "uri"]
|
|||
accept = []
|
||||
|
||||
# enable connector services
|
||||
connect = ["trust-dns-proto/tokio-runtime", "trust-dns-resolver/tokio-runtime", "trust-dns-resolver/system-config"]
|
||||
connect = ["tokio/net"]
|
||||
|
||||
# use openssl impls
|
||||
openssl = ["tls-openssl", "tokio-openssl"]
|
||||
|
@ -45,19 +45,17 @@ uri = ["http"]
|
|||
|
||||
[dependencies]
|
||||
actix-codec = "0.4.0-beta.1"
|
||||
actix-rt = "2.0.0-beta.2"
|
||||
actix-rt = { version = "2.0.0-beta.2", default-features = false }
|
||||
actix-service = "2.0.0-beta.3"
|
||||
actix-utils = "3.0.0-beta.1"
|
||||
|
||||
derive_more = "0.99.5"
|
||||
either = "1.6"
|
||||
futures-core = { version = "0.3.7", default-features = false }
|
||||
futures-util = { version = "0.3.7", default-features = false }
|
||||
http = { version = "0.2.2", optional = true }
|
||||
http = { version = "0.2.3", optional = true }
|
||||
log = "0.4"
|
||||
|
||||
# resolver
|
||||
trust-dns-proto = { version = "0.20.0", default-features = false, optional = true }
|
||||
trust-dns-resolver = { version = "0.20.0", default-features = false, optional = true }
|
||||
tokio = { version = "1", optional = true }
|
||||
|
||||
# openssl
|
||||
tls-openssl = { package = "openssl", version = "0.10", optional = true }
|
||||
|
@ -76,6 +74,7 @@ tls-native-tls = { package = "native-tls", version = "0.2", optional = true }
|
|||
tokio-native-tls = { version = "0.3", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
actix-rt = "2.0.0-beta.2"
|
||||
actix-server = "2.0.0-beta.2"
|
||||
bytes = "1"
|
||||
env_logger = "0.8"
|
||||
|
|
|
@ -3,7 +3,7 @@ use std::task::{Context, Poll};
|
|||
use actix_codec::{AsyncRead, AsyncWrite};
|
||||
use actix_service::{Service, ServiceFactory};
|
||||
use actix_utils::counter::Counter;
|
||||
use futures_util::future::{ready, LocalBoxFuture, Ready};
|
||||
use futures_core::future::LocalBoxFuture;
|
||||
|
||||
pub use native_tls::Error;
|
||||
pub use tokio_native_tls::{TlsAcceptor, TlsStream};
|
||||
|
@ -44,15 +44,16 @@ where
|
|||
|
||||
type Service = NativeTlsAcceptorService;
|
||||
type InitError = ();
|
||||
type Future = Ready<Result<Self::Service, Self::InitError>>;
|
||||
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
|
||||
|
||||
fn new_service(&self, _: ()) -> Self::Future {
|
||||
MAX_CONN_COUNTER.with(|conns| {
|
||||
ready(Ok(NativeTlsAcceptorService {
|
||||
let res = MAX_CONN_COUNTER.with(|conns| {
|
||||
Ok(NativeTlsAcceptorService {
|
||||
acceptor: self.acceptor.clone(),
|
||||
conns: conns.clone(),
|
||||
}))
|
||||
})
|
||||
})
|
||||
});
|
||||
Box::pin(async { res })
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -5,10 +5,7 @@ use std::task::{Context, Poll};
|
|||
use actix_codec::{AsyncRead, AsyncWrite};
|
||||
use actix_service::{Service, ServiceFactory};
|
||||
use actix_utils::counter::{Counter, CounterGuard};
|
||||
use futures_util::{
|
||||
future::{ready, Ready},
|
||||
ready,
|
||||
};
|
||||
use futures_core::{future::LocalBoxFuture, ready};
|
||||
|
||||
pub use openssl::ssl::{
|
||||
AlpnError, Error as SslError, HandshakeError, Ssl, SslAcceptor, SslAcceptorBuilder,
|
||||
|
@ -50,15 +47,16 @@ where
|
|||
type Config = ();
|
||||
type Service = AcceptorService;
|
||||
type InitError = ();
|
||||
type Future = Ready<Result<Self::Service, Self::InitError>>;
|
||||
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
|
||||
|
||||
fn new_service(&self, _: ()) -> Self::Future {
|
||||
MAX_CONN_COUNTER.with(|conns| {
|
||||
ready(Ok(AcceptorService {
|
||||
let res = MAX_CONN_COUNTER.with(|conns| {
|
||||
Ok(AcceptorService {
|
||||
acceptor: self.acceptor.clone(),
|
||||
conns: conns.clone(),
|
||||
}))
|
||||
})
|
||||
})
|
||||
});
|
||||
Box::pin(async { res })
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,13 +1,15 @@
|
|||
use std::future::Future;
|
||||
use std::io;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use std::{
|
||||
future::Future,
|
||||
io,
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use actix_codec::{AsyncRead, AsyncWrite};
|
||||
use actix_service::{Service, ServiceFactory};
|
||||
use actix_utils::counter::{Counter, CounterGuard};
|
||||
use futures_util::future::{ready, Ready};
|
||||
use futures_core::future::LocalBoxFuture;
|
||||
use tokio_rustls::{Accept, TlsAcceptor};
|
||||
|
||||
pub use rustls::{ServerConfig, Session};
|
||||
|
@ -52,15 +54,16 @@ where
|
|||
|
||||
type Service = AcceptorService;
|
||||
type InitError = ();
|
||||
type Future = Ready<Result<Self::Service, Self::InitError>>;
|
||||
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
|
||||
|
||||
fn new_service(&self, _: ()) -> Self::Future {
|
||||
MAX_CONN_COUNTER.with(|conns| {
|
||||
ready(Ok(AcceptorService {
|
||||
let res = MAX_CONN_COUNTER.with(|conns| {
|
||||
Ok(AcceptorService {
|
||||
acceptor: self.config.clone().into(),
|
||||
conns: conns.clone(),
|
||||
}))
|
||||
})
|
||||
})
|
||||
});
|
||||
Box::pin(async { res })
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,76 +1,48 @@
|
|||
use std::collections::VecDeque;
|
||||
use std::future::Future;
|
||||
use std::io;
|
||||
use std::marker::PhantomData;
|
||||
use std::net::SocketAddr;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use actix_rt::net::TcpStream;
|
||||
use actix_service::{Service, ServiceFactory};
|
||||
use futures_util::future::{ready, Ready};
|
||||
use futures_core::future::LocalBoxFuture;
|
||||
use log::{error, trace};
|
||||
|
||||
use super::connect::{Address, Connect, Connection};
|
||||
use super::error::ConnectError;
|
||||
|
||||
/// TCP connector service factory
|
||||
#[derive(Debug)]
|
||||
pub struct TcpConnectorFactory<T>(PhantomData<T>);
|
||||
|
||||
impl<T> TcpConnectorFactory<T> {
|
||||
pub fn new() -> Self {
|
||||
TcpConnectorFactory(PhantomData)
|
||||
}
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub struct TcpConnectorFactory;
|
||||
|
||||
impl TcpConnectorFactory {
|
||||
/// Create TCP connector service
|
||||
pub fn service(&self) -> TcpConnector<T> {
|
||||
TcpConnector(PhantomData)
|
||||
pub fn service(&self) -> TcpConnector {
|
||||
TcpConnector
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Default for TcpConnectorFactory<T> {
|
||||
fn default() -> Self {
|
||||
TcpConnectorFactory(PhantomData)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Clone for TcpConnectorFactory<T> {
|
||||
fn clone(&self) -> Self {
|
||||
TcpConnectorFactory(PhantomData)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Address> ServiceFactory<Connect<T>> for TcpConnectorFactory<T> {
|
||||
impl<T: Address> ServiceFactory<Connect<T>> for TcpConnectorFactory {
|
||||
type Response = Connection<T, TcpStream>;
|
||||
type Error = ConnectError;
|
||||
type Config = ();
|
||||
type Service = TcpConnector<T>;
|
||||
type Service = TcpConnector;
|
||||
type InitError = ();
|
||||
type Future = Ready<Result<Self::Service, Self::InitError>>;
|
||||
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
|
||||
|
||||
fn new_service(&self, _: ()) -> Self::Future {
|
||||
ready(Ok(self.service()))
|
||||
let service = self.service();
|
||||
Box::pin(async move { Ok(service) })
|
||||
}
|
||||
}
|
||||
|
||||
/// TCP connector service
|
||||
#[derive(Default, Debug)]
|
||||
pub struct TcpConnector<T>(PhantomData<T>);
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub struct TcpConnector;
|
||||
|
||||
impl<T> TcpConnector<T> {
|
||||
pub fn new() -> Self {
|
||||
TcpConnector(PhantomData)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Clone for TcpConnector<T> {
|
||||
fn clone(&self) -> Self {
|
||||
TcpConnector(PhantomData)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Address> Service<Connect<T>> for TcpConnector<T> {
|
||||
impl<T: Address> Service<Connect<T>> for TcpConnector {
|
||||
type Response = Connection<T, TcpStream>;
|
||||
type Error = ConnectError;
|
||||
type Future = TcpConnectorResponse<T>;
|
||||
|
@ -90,8 +62,6 @@ impl<T: Address> Service<Connect<T>> for TcpConnector<T> {
|
|||
}
|
||||
}
|
||||
|
||||
type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
|
||||
|
||||
#[doc(hidden)]
|
||||
/// TCP stream connector response future
|
||||
pub enum TcpConnectorResponse<T> {
|
||||
|
@ -165,7 +135,7 @@ impl<T: Address> Future for TcpConnectorResponse<T> {
|
|||
port,
|
||||
);
|
||||
if addrs.is_none() || addrs.as_ref().unwrap().is_empty() {
|
||||
return Poll::Ready(Err(err.into()));
|
||||
return Poll::Ready(Err(ConnectError::Io(err)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,13 +1,12 @@
|
|||
use std::io;
|
||||
|
||||
use derive_more::{Display, From};
|
||||
use trust_dns_resolver::error::ResolveError;
|
||||
use derive_more::Display;
|
||||
|
||||
#[derive(Debug, From, Display)]
|
||||
#[derive(Debug, Display)]
|
||||
pub enum ConnectError {
|
||||
/// Failed to resolve the hostname
|
||||
#[display(fmt = "Failed resolving hostname: {}", _0)]
|
||||
Resolver(ResolveError),
|
||||
Resolver(Box<dyn std::error::Error>),
|
||||
|
||||
/// No dns records
|
||||
#[display(fmt = "No dns records found for the input")]
|
||||
|
|
|
@ -14,67 +14,26 @@ pub mod ssl;
|
|||
#[cfg(feature = "uri")]
|
||||
mod uri;
|
||||
|
||||
use actix_rt::{net::TcpStream, Arbiter};
|
||||
use actix_rt::net::TcpStream;
|
||||
use actix_service::{pipeline, pipeline_factory, Service, ServiceFactory};
|
||||
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
|
||||
use trust_dns_resolver::system_conf::read_system_conf;
|
||||
use trust_dns_resolver::TokioAsyncResolver as AsyncResolver;
|
||||
|
||||
pub mod resolver {
|
||||
pub use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
|
||||
pub use trust_dns_resolver::system_conf::read_system_conf;
|
||||
pub use trust_dns_resolver::{error::ResolveError, AsyncResolver};
|
||||
}
|
||||
|
||||
pub use self::connect::{Address, Connect, Connection};
|
||||
pub use self::connector::{TcpConnector, TcpConnectorFactory};
|
||||
pub use self::error::ConnectError;
|
||||
pub use self::resolve::{Resolver, ResolverFactory};
|
||||
pub use self::resolve::{Resolve, Resolver, ResolverFactory};
|
||||
pub use self::service::{ConnectService, ConnectServiceFactory, TcpConnectService};
|
||||
|
||||
pub async fn start_resolver(
|
||||
cfg: ResolverConfig,
|
||||
opts: ResolverOpts,
|
||||
) -> Result<AsyncResolver, ConnectError> {
|
||||
Ok(AsyncResolver::tokio(cfg, opts)?)
|
||||
}
|
||||
|
||||
struct DefaultResolver(AsyncResolver);
|
||||
|
||||
pub(crate) async fn get_default_resolver() -> Result<AsyncResolver, ConnectError> {
|
||||
if Arbiter::contains_item::<DefaultResolver>() {
|
||||
Ok(Arbiter::get_item(|item: &DefaultResolver| item.0.clone()))
|
||||
} else {
|
||||
let (cfg, opts) = match read_system_conf() {
|
||||
Ok((cfg, opts)) => (cfg, opts),
|
||||
Err(e) => {
|
||||
log::error!("TRust-DNS can not load system config: {}", e);
|
||||
(ResolverConfig::default(), ResolverOpts::default())
|
||||
}
|
||||
};
|
||||
|
||||
let resolver = AsyncResolver::tokio(cfg, opts)?;
|
||||
|
||||
Arbiter::set_item(DefaultResolver(resolver.clone()));
|
||||
Ok(resolver)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start_default_resolver() -> Result<AsyncResolver, ConnectError> {
|
||||
get_default_resolver().await
|
||||
}
|
||||
|
||||
/// Create TCP connector service.
|
||||
pub fn new_connector<T: Address + 'static>(
|
||||
resolver: AsyncResolver,
|
||||
resolver: Resolver,
|
||||
) -> impl Service<Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError> + Clone
|
||||
{
|
||||
pipeline(Resolver::new(resolver)).and_then(TcpConnector::new())
|
||||
pipeline(resolver).and_then(TcpConnector)
|
||||
}
|
||||
|
||||
/// Create TCP connector service factory.
|
||||
pub fn new_connector_factory<T: Address + 'static>(
|
||||
resolver: AsyncResolver,
|
||||
resolver: Resolver,
|
||||
) -> impl ServiceFactory<
|
||||
Connect<T>,
|
||||
Config = (),
|
||||
|
@ -82,14 +41,14 @@ pub fn new_connector_factory<T: Address + 'static>(
|
|||
Error = ConnectError,
|
||||
InitError = (),
|
||||
> + Clone {
|
||||
pipeline_factory(ResolverFactory::new(resolver)).and_then(TcpConnectorFactory::new())
|
||||
pipeline_factory(ResolverFactory::new(resolver)).and_then(TcpConnectorFactory)
|
||||
}
|
||||
|
||||
/// Create connector service with default parameters.
|
||||
pub fn default_connector<T: Address + 'static>(
|
||||
) -> impl Service<Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError> + Clone
|
||||
{
|
||||
pipeline(Resolver::default()).and_then(TcpConnector::new())
|
||||
new_connector(Resolver::Default)
|
||||
}
|
||||
|
||||
/// Create connector service factory with default parameters.
|
||||
|
@ -100,5 +59,5 @@ pub fn default_connector_factory<T: Address + 'static>() -> impl ServiceFactory<
|
|||
Error = ConnectError,
|
||||
InitError = (),
|
||||
> + Clone {
|
||||
pipeline_factory(ResolverFactory::default()).and_then(TcpConnectorFactory::new())
|
||||
new_connector_factory(Resolver::Default)
|
||||
}
|
||||
|
|
|
@ -1,184 +1,123 @@
|
|||
use std::future::Future;
|
||||
use std::marker::PhantomData;
|
||||
use std::net::SocketAddr;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::{
|
||||
net::{SocketAddr, ToSocketAddrs},
|
||||
rc::Rc,
|
||||
task::Poll,
|
||||
};
|
||||
|
||||
use actix_service::{Service, ServiceFactory};
|
||||
use futures_util::future::{ok, Either, Ready};
|
||||
use futures_core::future::LocalBoxFuture;
|
||||
use log::trace;
|
||||
use trust_dns_resolver::TokioAsyncResolver as AsyncResolver;
|
||||
use trust_dns_resolver::{error::ResolveError, lookup_ip::LookupIp};
|
||||
|
||||
use super::connect::{Address, Connect};
|
||||
use super::error::ConnectError;
|
||||
use super::get_default_resolver;
|
||||
|
||||
/// DNS Resolver Service factory
|
||||
pub struct ResolverFactory<T> {
|
||||
resolver: Option<AsyncResolver>,
|
||||
_t: PhantomData<T>,
|
||||
#[derive(Clone)]
|
||||
pub struct ResolverFactory {
|
||||
resolver: Resolver,
|
||||
}
|
||||
|
||||
impl<T> ResolverFactory<T> {
|
||||
/// Create new resolver instance with custom configuration and options.
|
||||
pub fn new(resolver: AsyncResolver) -> Self {
|
||||
ResolverFactory {
|
||||
resolver: Some(resolver),
|
||||
_t: PhantomData,
|
||||
}
|
||||
impl ResolverFactory {
|
||||
pub fn new(resolver: Resolver) -> Self {
|
||||
Self { resolver }
|
||||
}
|
||||
|
||||
pub fn service(&self) -> Resolver<T> {
|
||||
Resolver {
|
||||
resolver: self.resolver.clone(),
|
||||
_t: PhantomData,
|
||||
}
|
||||
pub fn service(&self) -> Resolver {
|
||||
self.resolver.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Default for ResolverFactory<T> {
|
||||
fn default() -> Self {
|
||||
ResolverFactory {
|
||||
resolver: None,
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Clone for ResolverFactory<T> {
|
||||
fn clone(&self) -> Self {
|
||||
ResolverFactory {
|
||||
resolver: self.resolver.clone(),
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Address> ServiceFactory<Connect<T>> for ResolverFactory<T> {
|
||||
impl<T: Address> ServiceFactory<Connect<T>> for ResolverFactory {
|
||||
type Response = Connect<T>;
|
||||
type Error = ConnectError;
|
||||
type Config = ();
|
||||
type Service = Resolver<T>;
|
||||
type Service = Resolver;
|
||||
type InitError = ();
|
||||
type Future = Ready<Result<Self::Service, Self::InitError>>;
|
||||
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
|
||||
|
||||
fn new_service(&self, _: ()) -> Self::Future {
|
||||
ok(self.service())
|
||||
let service = self.resolver.clone();
|
||||
Box::pin(async { Ok(service) })
|
||||
}
|
||||
}
|
||||
|
||||
/// DNS Resolver Service
|
||||
pub struct Resolver<T> {
|
||||
resolver: Option<AsyncResolver>,
|
||||
_t: PhantomData<T>,
|
||||
#[derive(Clone)]
|
||||
pub enum Resolver {
|
||||
Default,
|
||||
Custom(Rc<dyn Resolve>),
|
||||
}
|
||||
|
||||
impl<T> Resolver<T> {
|
||||
/// Create new resolver instance with custom configuration and options.
|
||||
pub fn new(resolver: AsyncResolver) -> Self {
|
||||
Resolver {
|
||||
resolver: Some(resolver),
|
||||
_t: PhantomData,
|
||||
/// trait for custom lookup with self defined resolver.
|
||||
pub trait Resolve {
|
||||
fn lookup(
|
||||
&self,
|
||||
addrs: Vec<SocketAddr>,
|
||||
) -> LocalBoxFuture<'_, Result<Vec<SocketAddr>, Box<dyn std::error::Error>>>;
|
||||
}
|
||||
|
||||
impl Resolver {
|
||||
/// Constructor for custom Resolve trait object and use it as resolver.
|
||||
pub fn new_custom(resolver: impl Resolve + 'static) -> Self {
|
||||
Self::Custom(Rc::new(resolver))
|
||||
}
|
||||
|
||||
async fn lookup<T: Address>(
|
||||
&self,
|
||||
req: &Connect<T>,
|
||||
host: String,
|
||||
) -> Result<Vec<SocketAddr>, ConnectError> {
|
||||
match self {
|
||||
Self::Default => {
|
||||
let res = tokio::net::lookup_host(host).await.map_err(|e| {
|
||||
trace!(
|
||||
"DNS resolver: failed to resolve host {:?} err: {}",
|
||||
req.host(),
|
||||
e
|
||||
);
|
||||
ConnectError::Resolver(Box::new(e))
|
||||
})?;
|
||||
|
||||
Ok(res.collect())
|
||||
}
|
||||
Self::Custom(resolver) => {
|
||||
let host = host
|
||||
.to_socket_addrs()
|
||||
.map_err(|e| ConnectError::Resolver(Box::new(e)))?
|
||||
.collect();
|
||||
resolver.lookup(host).await.map_err(ConnectError::Resolver)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Default for Resolver<T> {
|
||||
fn default() -> Self {
|
||||
Resolver {
|
||||
resolver: None,
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Clone for Resolver<T> {
|
||||
fn clone(&self) -> Self {
|
||||
Resolver {
|
||||
resolver: self.resolver.clone(),
|
||||
_t: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Address> Service<Connect<T>> for Resolver<T> {
|
||||
impl<T: Address> Service<Connect<T>> for Resolver {
|
||||
type Response = Connect<T>;
|
||||
type Error = ConnectError;
|
||||
#[allow(clippy::type_complexity)]
|
||||
type Future = Either<
|
||||
Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>,
|
||||
Ready<Result<Connect<T>, Self::Error>>,
|
||||
>;
|
||||
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
|
||||
|
||||
actix_service::always_ready!();
|
||||
|
||||
fn call(&mut self, mut req: Connect<T>) -> Self::Future {
|
||||
if req.addr.is_some() {
|
||||
Either::Right(ok(req))
|
||||
} else if let Ok(ip) = req.host().parse() {
|
||||
req.addr = Some(either::Either::Left(SocketAddr::new(ip, req.port())));
|
||||
Either::Right(ok(req))
|
||||
} else {
|
||||
let resolver = self.resolver.as_ref().map(AsyncResolver::clone);
|
||||
Either::Left(Box::pin(async move {
|
||||
let resolver = self.clone();
|
||||
Box::pin(async move {
|
||||
if req.addr.is_some() {
|
||||
Ok(req)
|
||||
} else if let Ok(ip) = req.host().parse() {
|
||||
req.addr = Some(either::Either::Left(SocketAddr::new(ip, req.port())));
|
||||
Ok(req)
|
||||
} else {
|
||||
trace!("DNS resolver: resolving host {:?}", req.host());
|
||||
let resolver = if let Some(resolver) = resolver {
|
||||
resolver
|
||||
|
||||
let host = if req.host().contains(':') {
|
||||
req.host().to_string()
|
||||
} else {
|
||||
get_default_resolver()
|
||||
.await
|
||||
.expect("Failed to get default resolver")
|
||||
format!("{}:{}", req.host(), req.port())
|
||||
};
|
||||
ResolverFuture::new(req, &resolver).await
|
||||
}))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type LookupIpFuture = Pin<Box<dyn Future<Output = Result<LookupIp, ResolveError>>>>;
|
||||
let addrs = resolver.lookup(&req, host).await?;
|
||||
|
||||
#[doc(hidden)]
|
||||
/// Resolver future
|
||||
pub struct ResolverFuture<T: Address> {
|
||||
req: Option<Connect<T>>,
|
||||
lookup: LookupIpFuture,
|
||||
}
|
||||
|
||||
impl<T: Address> ResolverFuture<T> {
|
||||
pub fn new(req: Connect<T>, resolver: &AsyncResolver) -> Self {
|
||||
let host = if let Some(host) = req.host().splitn(2, ':').next() {
|
||||
host
|
||||
} else {
|
||||
req.host()
|
||||
};
|
||||
|
||||
// Clone data to be moved to the lookup future
|
||||
let host_clone = host.to_owned();
|
||||
let resolver_clone = resolver.clone();
|
||||
|
||||
ResolverFuture {
|
||||
lookup: Box::pin(async move {
|
||||
let resolver = resolver_clone;
|
||||
resolver.lookup_ip(host_clone).await
|
||||
}),
|
||||
req: Some(req),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Address> Future for ResolverFuture<T> {
|
||||
type Output = Result<Connect<T>, ConnectError>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.get_mut();
|
||||
|
||||
match Pin::new(&mut this.lookup).poll(cx) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(Ok(ips)) => {
|
||||
let req = this.req.take().unwrap();
|
||||
let port = req.port();
|
||||
let req = req.set_addrs(ips.iter().map(|ip| SocketAddr::new(ip, port)));
|
||||
let req = req.set_addrs(addrs);
|
||||
|
||||
trace!(
|
||||
"DNS resolver: host {:?} resolved to {:?}",
|
||||
|
@ -187,19 +126,11 @@ impl<T: Address> Future for ResolverFuture<T> {
|
|||
);
|
||||
|
||||
if req.addr.is_none() {
|
||||
Poll::Ready(Err(ConnectError::NoRecords))
|
||||
Err(ConnectError::NoRecords)
|
||||
} else {
|
||||
Poll::Ready(Ok(req))
|
||||
Ok(req)
|
||||
}
|
||||
}
|
||||
Poll::Ready(Err(e)) => {
|
||||
trace!(
|
||||
"DNS resolver: failed to resolve host {:?} err: {}",
|
||||
this.req.as_ref().unwrap().host(),
|
||||
e
|
||||
);
|
||||
Poll::Ready(Err(e.into()))
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,38 +5,29 @@ use std::task::{Context, Poll};
|
|||
use actix_rt::net::TcpStream;
|
||||
use actix_service::{Service, ServiceFactory};
|
||||
use either::Either;
|
||||
use futures_util::future::{ok, Ready};
|
||||
use trust_dns_resolver::TokioAsyncResolver as AsyncResolver;
|
||||
use futures_core::future::LocalBoxFuture;
|
||||
|
||||
use super::connect::{Address, Connect, Connection};
|
||||
use super::connector::{TcpConnector, TcpConnectorFactory};
|
||||
use super::error::ConnectError;
|
||||
use super::resolve::{Resolver, ResolverFactory};
|
||||
|
||||
pub struct ConnectServiceFactory<T> {
|
||||
tcp: TcpConnectorFactory<T>,
|
||||
resolver: ResolverFactory<T>,
|
||||
pub struct ConnectServiceFactory {
|
||||
tcp: TcpConnectorFactory,
|
||||
resolver: ResolverFactory,
|
||||
}
|
||||
|
||||
impl<T> ConnectServiceFactory<T> {
|
||||
impl ConnectServiceFactory {
|
||||
/// Construct new ConnectService factory
|
||||
pub fn new() -> Self {
|
||||
pub fn new(resolver: Resolver) -> Self {
|
||||
ConnectServiceFactory {
|
||||
tcp: TcpConnectorFactory::default(),
|
||||
resolver: ResolverFactory::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct new connect service with custom dns resolver
|
||||
pub fn with_resolver(resolver: AsyncResolver) -> Self {
|
||||
ConnectServiceFactory {
|
||||
tcp: TcpConnectorFactory::default(),
|
||||
tcp: TcpConnectorFactory,
|
||||
resolver: ResolverFactory::new(resolver),
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct new service
|
||||
pub fn service(&self) -> ConnectService<T> {
|
||||
pub fn service(&self) -> ConnectService {
|
||||
ConnectService {
|
||||
tcp: self.tcp.service(),
|
||||
resolver: self.resolver.service(),
|
||||
|
@ -44,7 +35,7 @@ impl<T> ConnectServiceFactory<T> {
|
|||
}
|
||||
|
||||
/// Construct new tcp stream service
|
||||
pub fn tcp_service(&self) -> TcpConnectService<T> {
|
||||
pub fn tcp_service(&self) -> TcpConnectService {
|
||||
TcpConnectService {
|
||||
tcp: self.tcp.service(),
|
||||
resolver: self.resolver.service(),
|
||||
|
@ -52,44 +43,36 @@ impl<T> ConnectServiceFactory<T> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T> Default for ConnectServiceFactory<T> {
|
||||
fn default() -> Self {
|
||||
ConnectServiceFactory {
|
||||
tcp: TcpConnectorFactory::default(),
|
||||
resolver: ResolverFactory::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Clone for ConnectServiceFactory<T> {
|
||||
impl Clone for ConnectServiceFactory {
|
||||
fn clone(&self) -> Self {
|
||||
ConnectServiceFactory {
|
||||
tcp: self.tcp.clone(),
|
||||
tcp: self.tcp,
|
||||
resolver: self.resolver.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Address> ServiceFactory<Connect<T>> for ConnectServiceFactory<T> {
|
||||
impl<T: Address> ServiceFactory<Connect<T>> for ConnectServiceFactory {
|
||||
type Response = Connection<T, TcpStream>;
|
||||
type Error = ConnectError;
|
||||
type Config = ();
|
||||
type Service = ConnectService<T>;
|
||||
type Service = ConnectService;
|
||||
type InitError = ();
|
||||
type Future = Ready<Result<Self::Service, Self::InitError>>;
|
||||
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
|
||||
|
||||
fn new_service(&self, _: ()) -> Self::Future {
|
||||
ok(self.service())
|
||||
let service = self.service();
|
||||
Box::pin(async move { Ok(service) })
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ConnectService<T> {
|
||||
tcp: TcpConnector<T>,
|
||||
resolver: Resolver<T>,
|
||||
pub struct ConnectService {
|
||||
tcp: TcpConnector,
|
||||
resolver: Resolver,
|
||||
}
|
||||
|
||||
impl<T: Address> Service<Connect<T>> for ConnectService<T> {
|
||||
impl<T: Address> Service<Connect<T>> for ConnectService {
|
||||
type Response = Connection<T, TcpStream>;
|
||||
type Error = ConnectError;
|
||||
type Future = ConnectServiceResponse<T>;
|
||||
|
@ -99,14 +82,14 @@ impl<T: Address> Service<Connect<T>> for ConnectService<T> {
|
|||
fn call(&mut self, req: Connect<T>) -> Self::Future {
|
||||
ConnectServiceResponse {
|
||||
state: ConnectState::Resolve(self.resolver.call(req)),
|
||||
tcp: self.tcp.clone(),
|
||||
tcp: self.tcp,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum ConnectState<T: Address> {
|
||||
Resolve(<Resolver<T> as Service<Connect<T>>>::Future),
|
||||
Connect(<TcpConnector<T> as Service<Connect<T>>>::Future),
|
||||
Resolve(<Resolver as Service<Connect<T>>>::Future),
|
||||
Connect(<TcpConnector as Service<Connect<T>>>::Future),
|
||||
}
|
||||
|
||||
impl<T: Address> ConnectState<T> {
|
||||
|
@ -128,7 +111,7 @@ impl<T: Address> ConnectState<T> {
|
|||
|
||||
pub struct ConnectServiceResponse<T: Address> {
|
||||
state: ConnectState<T>,
|
||||
tcp: TcpConnector<T>,
|
||||
tcp: TcpConnector,
|
||||
}
|
||||
|
||||
impl<T: Address> Future for ConnectServiceResponse<T> {
|
||||
|
@ -151,12 +134,12 @@ impl<T: Address> Future for ConnectServiceResponse<T> {
|
|||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct TcpConnectService<T> {
|
||||
tcp: TcpConnector<T>,
|
||||
resolver: Resolver<T>,
|
||||
pub struct TcpConnectService {
|
||||
tcp: TcpConnector,
|
||||
resolver: Resolver,
|
||||
}
|
||||
|
||||
impl<T: Address + 'static> Service<Connect<T>> for TcpConnectService<T> {
|
||||
impl<T: Address + 'static> Service<Connect<T>> for TcpConnectService {
|
||||
type Response = TcpStream;
|
||||
type Error = ConnectError;
|
||||
type Future = TcpConnectServiceResponse<T>;
|
||||
|
@ -166,14 +149,14 @@ impl<T: Address + 'static> Service<Connect<T>> for TcpConnectService<T> {
|
|||
fn call(&mut self, req: Connect<T>) -> Self::Future {
|
||||
TcpConnectServiceResponse {
|
||||
state: TcpConnectState::Resolve(self.resolver.call(req)),
|
||||
tcp: self.tcp.clone(),
|
||||
tcp: self.tcp,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum TcpConnectState<T: Address> {
|
||||
Resolve(<Resolver<T> as Service<Connect<T>>>::Future),
|
||||
Connect(<TcpConnector<T> as Service<Connect<T>>>::Future),
|
||||
Resolve(<Resolver as Service<Connect<T>>>::Future),
|
||||
Connect(<TcpConnector as Service<Connect<T>>>::Future),
|
||||
}
|
||||
|
||||
impl<T: Address> TcpConnectState<T> {
|
||||
|
@ -202,7 +185,7 @@ impl<T: Address> TcpConnectState<T> {
|
|||
|
||||
pub struct TcpConnectServiceResponse<T: Address> {
|
||||
state: TcpConnectState<T>,
|
||||
tcp: TcpConnector<T>,
|
||||
tcp: TcpConnector,
|
||||
}
|
||||
|
||||
impl<T: Address> Future for TcpConnectServiceResponse<T> {
|
||||
|
|
|
@ -1,23 +1,25 @@
|
|||
use std::future::Future;
|
||||
use std::marker::PhantomData;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
use std::{fmt, io};
|
||||
use std::{
|
||||
fmt,
|
||||
future::Future,
|
||||
io,
|
||||
marker::PhantomData,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
use actix_codec::{AsyncRead, AsyncWrite};
|
||||
use actix_rt::net::TcpStream;
|
||||
use actix_service::{Service, ServiceFactory};
|
||||
use futures_util::{
|
||||
future::{ready, Either, Ready},
|
||||
ready,
|
||||
};
|
||||
use futures_core::{future::LocalBoxFuture, ready};
|
||||
use futures_util::future::{ready, Either, Ready};
|
||||
use log::trace;
|
||||
|
||||
pub use openssl::ssl::{Error as SslError, HandshakeError, SslConnector, SslMethod};
|
||||
pub use tokio_openssl::SslStream;
|
||||
use trust_dns_resolver::TokioAsyncResolver as AsyncResolver;
|
||||
|
||||
use crate::connect::resolve::Resolve;
|
||||
use crate::connect::{
|
||||
Address, Connect, ConnectError, ConnectService, ConnectServiceFactory, Connection,
|
||||
Address, Connect, ConnectError, ConnectService, ConnectServiceFactory, Connection, Resolver,
|
||||
};
|
||||
|
||||
/// OpenSSL connector factory
|
||||
|
@ -55,12 +57,11 @@ where
|
|||
type Config = ();
|
||||
type Service = OpensslConnectorService;
|
||||
type InitError = ();
|
||||
type Future = Ready<Result<Self::Service, Self::InitError>>;
|
||||
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
|
||||
|
||||
fn new_service(&self, _: ()) -> Self::Future {
|
||||
ready(Ok(OpensslConnectorService {
|
||||
connector: self.connector.clone(),
|
||||
}))
|
||||
let connector = self.connector.clone();
|
||||
Box::pin(async { Ok(OpensslConnectorService { connector }) })
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -139,30 +140,30 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
pub struct OpensslConnectServiceFactory<T> {
|
||||
tcp: ConnectServiceFactory<T>,
|
||||
pub struct OpensslConnectServiceFactory {
|
||||
tcp: ConnectServiceFactory,
|
||||
openssl: OpensslConnector,
|
||||
}
|
||||
|
||||
impl<T> OpensslConnectServiceFactory<T> {
|
||||
impl OpensslConnectServiceFactory {
|
||||
/// Construct new OpensslConnectService factory
|
||||
pub fn new(connector: SslConnector) -> Self {
|
||||
OpensslConnectServiceFactory {
|
||||
tcp: ConnectServiceFactory::default(),
|
||||
tcp: ConnectServiceFactory::new(Resolver::Default),
|
||||
openssl: OpensslConnector::new(connector),
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct new connect service with custom DNS resolver
|
||||
pub fn with_resolver(connector: SslConnector, resolver: AsyncResolver) -> Self {
|
||||
pub fn with_resolver(connector: SslConnector, resolver: impl Resolve + 'static) -> Self {
|
||||
OpensslConnectServiceFactory {
|
||||
tcp: ConnectServiceFactory::with_resolver(resolver),
|
||||
tcp: ConnectServiceFactory::new(Resolver::new_custom(resolver)),
|
||||
openssl: OpensslConnector::new(connector),
|
||||
}
|
||||
}
|
||||
|
||||
/// Construct OpenSSL connect service
|
||||
pub fn service(&self) -> OpensslConnectService<T> {
|
||||
pub fn service(&self) -> OpensslConnectService {
|
||||
OpensslConnectService {
|
||||
tcp: self.tcp.service(),
|
||||
openssl: OpensslConnectorService {
|
||||
|
@ -172,7 +173,7 @@ impl<T> OpensslConnectServiceFactory<T> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T> Clone for OpensslConnectServiceFactory<T> {
|
||||
impl Clone for OpensslConnectServiceFactory {
|
||||
fn clone(&self) -> Self {
|
||||
OpensslConnectServiceFactory {
|
||||
tcp: self.tcp.clone(),
|
||||
|
@ -181,26 +182,27 @@ impl<T> Clone for OpensslConnectServiceFactory<T> {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T: Address + 'static> ServiceFactory<Connect<T>> for OpensslConnectServiceFactory<T> {
|
||||
impl<T: Address + 'static> ServiceFactory<Connect<T>> for OpensslConnectServiceFactory {
|
||||
type Response = SslStream<TcpStream>;
|
||||
type Error = ConnectError;
|
||||
type Config = ();
|
||||
type Service = OpensslConnectService<T>;
|
||||
type Service = OpensslConnectService;
|
||||
type InitError = ();
|
||||
type Future = Ready<Result<Self::Service, Self::InitError>>;
|
||||
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
|
||||
|
||||
fn new_service(&self, _: ()) -> Self::Future {
|
||||
ready(Ok(self.service()))
|
||||
let service = self.service();
|
||||
Box::pin(async { Ok(service) })
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct OpensslConnectService<T> {
|
||||
tcp: ConnectService<T>,
|
||||
pub struct OpensslConnectService {
|
||||
tcp: ConnectService,
|
||||
openssl: OpensslConnectorService,
|
||||
}
|
||||
|
||||
impl<T: Address + 'static> Service<Connect<T>> for OpensslConnectService<T> {
|
||||
impl<T: Address + 'static> Service<Connect<T>> for OpensslConnectService {
|
||||
type Response = SslStream<TcpStream>;
|
||||
type Error = ConnectError;
|
||||
type Future = OpensslConnectServiceResponse<T>;
|
||||
|
@ -217,7 +219,7 @@ impl<T: Address + 'static> Service<Connect<T>> for OpensslConnectService<T> {
|
|||
}
|
||||
|
||||
pub struct OpensslConnectServiceResponse<T: Address + 'static> {
|
||||
fut1: Option<<ConnectService<T> as Service<Connect<T>>>::Future>,
|
||||
fut1: Option<<ConnectService as Service<Connect<T>>>::Future>,
|
||||
fut2: Option<<OpensslConnectorService as Service<Connection<T, TcpStream>>>::Future>,
|
||||
openssl: OpensslConnectorService,
|
||||
}
|
||||
|
|
|
@ -1,18 +1,17 @@
|
|||
use std::fmt;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use std::{
|
||||
fmt,
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
|
||||
pub use rustls::Session;
|
||||
pub use tokio_rustls::{client::TlsStream, rustls::ClientConfig};
|
||||
|
||||
use actix_codec::{AsyncRead, AsyncWrite};
|
||||
use actix_service::{Service, ServiceFactory};
|
||||
use futures_util::{
|
||||
future::{ready, Ready},
|
||||
ready,
|
||||
};
|
||||
use futures_core::{future::LocalBoxFuture, ready};
|
||||
use log::trace;
|
||||
use tokio_rustls::{Connect, TlsConnector};
|
||||
use webpki::DNSNameRef;
|
||||
|
@ -53,12 +52,11 @@ where
|
|||
type Config = ();
|
||||
type Service = RustlsConnectorService;
|
||||
type InitError = ();
|
||||
type Future = Ready<Result<Self::Service, Self::InitError>>;
|
||||
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
|
||||
|
||||
fn new_service(&self, _: ()) -> Self::Future {
|
||||
ready(Ok(RustlsConnectorService {
|
||||
connector: self.connector.clone(),
|
||||
}))
|
||||
let connector = self.connector.clone();
|
||||
Box::pin(async { Ok(RustlsConnectorService { connector }) })
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -7,11 +7,7 @@ use actix_service::{fn_service, Service, ServiceFactory};
|
|||
use bytes::Bytes;
|
||||
use futures_util::sink::SinkExt;
|
||||
|
||||
use actix_tls::connect::{
|
||||
self as actix_connect,
|
||||
resolver::{ResolverConfig, ResolverOpts},
|
||||
Connect,
|
||||
};
|
||||
use actix_tls::connect::{self as actix_connect, Connect};
|
||||
|
||||
#[cfg(all(feature = "connect", feature = "openssl"))]
|
||||
#[actix_rt::test]
|
||||
|
@ -57,14 +53,13 @@ async fn test_static_str() {
|
|||
})
|
||||
});
|
||||
|
||||
let resolver = actix_connect::start_default_resolver().await.unwrap();
|
||||
let mut conn = actix_connect::new_connector(resolver.clone());
|
||||
let mut conn = actix_connect::default_connector();
|
||||
|
||||
let con = conn.call(Connect::with("10", srv.addr())).await.unwrap();
|
||||
assert_eq!(con.peer_addr().unwrap(), srv.addr());
|
||||
|
||||
let connect = Connect::new(srv.host().to_owned());
|
||||
let mut conn = actix_connect::new_connector(resolver);
|
||||
let mut conn = actix_connect::default_connector();
|
||||
let con = conn.call(connect).await;
|
||||
assert!(con.is_err());
|
||||
}
|
||||
|
@ -79,12 +74,7 @@ async fn test_new_service() {
|
|||
})
|
||||
});
|
||||
|
||||
let resolver =
|
||||
actix_connect::start_resolver(ResolverConfig::default(), ResolverOpts::default())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let factory = actix_connect::new_connector_factory(resolver);
|
||||
let factory = actix_connect::default_connector_factory();
|
||||
|
||||
let mut conn = factory.new_service(()).await.unwrap();
|
||||
let con = conn.call(Connect::with("10", srv.addr())).await.unwrap();
|
||||
|
|
|
@ -17,7 +17,7 @@ path = "src/lib.rs"
|
|||
|
||||
[dependencies]
|
||||
actix-codec = "0.4.0-beta.1"
|
||||
actix-rt = "2.0.0-beta.2"
|
||||
actix-rt = { version = "2.0.0-beta.2", default-features = false }
|
||||
actix-service = "2.0.0-beta.3"
|
||||
|
||||
futures-core = { version = "0.3.7", default-features = false }
|
||||
|
|
Loading…
Reference in New Issue