diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 7aae7cd2..95b40b25 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -9,9 +9,6 @@ use std::{fmt, thread}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot::{channel, error::RecvError as Canceled, Sender}; -// use futures_util::stream::FuturesUnordered; -// use tokio::task::JoinHandle; -// use tokio::stream::StreamExt; use tokio::task::LocalSet; use crate::runtime::Runtime; @@ -19,12 +16,6 @@ use crate::system::System; thread_local!( static ADDR: RefCell> = RefCell::new(None); - // TODO: Commented out code are for Arbiter::local_join function. - // It can be safely removed if this function is not used in actix-*. - // - // /// stores join handle for spawned async tasks. - // static HANDLE: RefCell>> = - // RefCell::new(FuturesUnordered::new()); static STORAGE: RefCell>> = RefCell::new(HashMap::new()); ); @@ -154,11 +145,6 @@ impl Arbiter { where F: Future + 'static, { - // HANDLE.with(|handle| { - // let handle = handle.borrow(); - // handle.push(tokio::task::spawn_local(future)); - // }); - // let _ = tokio::task::spawn_local(CleanupPending); let _ = tokio::task::spawn_local(future); } @@ -277,32 +263,12 @@ impl Arbiter { /// Returns a future that will be completed once all currently spawned futures /// have completed. - #[deprecated(since = "1.2.0", note = "Arbiter::local_join function is removed.")] + #[deprecated(since = "2.0.0", note = "Arbiter::local_join function is removed.")] pub async fn local_join() { - // let handle = HANDLE.with(|fut| std::mem::take(&mut *fut.borrow_mut())); - // async move { - // handle.collect::>().await; - // } unimplemented!("Arbiter::local_join function is removed.") } } -// /// Future used for cleaning-up already finished `JoinHandle`s -// /// from the `PENDING` list so the vector doesn't grow indefinitely -// struct CleanupPending; -// -// impl Future for CleanupPending { -// type Output = (); -// -// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { -// HANDLE.with(move |handle| { -// recycle_join_handle(&mut *handle.borrow_mut(), cx); -// }); -// -// Poll::Ready(()) -// } -// } - struct ArbiterController { rx: UnboundedReceiver, } @@ -330,11 +296,6 @@ impl Future for ArbiterController { Poll::Ready(Some(item)) => match item { ArbiterCommand::Stop => return Poll::Ready(()), ArbiterCommand::Execute(fut) => { - // HANDLE.with(|handle| { - // let mut handle = handle.borrow_mut(); - // handle.push(tokio::task::spawn_local(fut)); - // recycle_join_handle(&mut *handle, cx); - // }); tokio::task::spawn_local(fut); } ArbiterCommand::ExecuteFn(f) => { @@ -347,20 +308,6 @@ impl Future for ArbiterController { } } -// fn recycle_join_handle(handle: &mut FuturesUnordered>, cx: &mut Context<'_>) { -// let _ = Pin::new(&mut *handle).poll_next(cx); -// -// // Try to recycle more join handles and free up memory. -// // -// // this is a guess. The yield limit for FuturesUnordered is 32. -// // So poll an extra 3 times would make the total poll below 128. -// if handle.len() > 64 { -// (0..3).for_each(|_| { -// let _ = Pin::new(&mut *handle).poll_next(cx); -// }) -// } -// } - #[derive(Debug)] pub(crate) enum SystemCommand { Exit(i32), diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index 30fa2d78..6e9d0464 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -66,7 +66,7 @@ pub mod time { pub use tokio::time::{timeout, Timeout}; } -/// Blocking task management. +/// Task management. pub mod task { pub use tokio::task::{spawn_blocking, yield_now, JoinHandle}; } diff --git a/actix-rt/tests/integration_tests.rs b/actix-rt/tests/integration_tests.rs index 12ceb4ef..f338602d 100644 --- a/actix-rt/tests/integration_tests.rs +++ b/actix-rt/tests/integration_tests.rs @@ -62,43 +62,6 @@ fn join_another_arbiter() { ); } -// #[test] -// fn join_current_arbiter() { -// let time = Duration::from_secs(2); -// -// let instant = Instant::now(); -// actix_rt::System::new("test_join_current_arbiter").block_on(async move { -// actix_rt::spawn(async move { -// tokio::time::delay_for(time).await; -// actix_rt::Arbiter::current().stop(); -// }); -// actix_rt::Arbiter::local_join().await; -// }); -// assert!( -// instant.elapsed() >= time, -// "Join on current arbiter should wait for all spawned futures" -// ); -// -// let large_timer = Duration::from_secs(20); -// let instant = Instant::now(); -// actix_rt::System::new("test_join_current_arbiter").block_on(async move { -// actix_rt::spawn(async move { -// tokio::time::delay_for(time).await; -// actix_rt::Arbiter::current().stop(); -// }); -// let f = actix_rt::Arbiter::local_join(); -// actix_rt::spawn(async move { -// tokio::time::delay_for(large_timer).await; -// actix_rt::Arbiter::current().stop(); -// }); -// f.await; -// }); -// assert!( -// instant.elapsed() < large_timer, -// "local_join should await only for the already spawned futures" -// ); -// } - #[test] fn non_static_block_on() { let string = String::from("test_str"); diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index ead85de0..33bf3b6a 100644 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -24,7 +24,7 @@ default = [] [dependencies] actix-codec = "0.4.0-beta.1" -actix-rt = "2.0.0-beta.2" +actix-rt = { version = "2.0.0-beta.2", default-features = false } actix-service = "2.0.0-beta.3" actix-utils = "3.0.0-beta.1" diff --git a/actix-tls/Cargo.toml b/actix-tls/Cargo.toml index 8f23d60c..9004ded8 100644 --- a/actix-tls/Cargo.toml +++ b/actix-tls/Cargo.toml @@ -29,7 +29,7 @@ default = ["accept", "connect", "uri"] accept = [] # enable connector services -connect = ["trust-dns-proto/tokio-runtime", "trust-dns-resolver/tokio-runtime", "trust-dns-resolver/system-config"] +connect = ["tokio/net"] # use openssl impls openssl = ["tls-openssl", "tokio-openssl"] @@ -45,19 +45,17 @@ uri = ["http"] [dependencies] actix-codec = "0.4.0-beta.1" -actix-rt = "2.0.0-beta.2" +actix-rt = { version = "2.0.0-beta.2", default-features = false } actix-service = "2.0.0-beta.3" actix-utils = "3.0.0-beta.1" derive_more = "0.99.5" either = "1.6" +futures-core = { version = "0.3.7", default-features = false } futures-util = { version = "0.3.7", default-features = false } -http = { version = "0.2.2", optional = true } +http = { version = "0.2.3", optional = true } log = "0.4" - -# resolver -trust-dns-proto = { version = "0.20.0", default-features = false, optional = true } -trust-dns-resolver = { version = "0.20.0", default-features = false, optional = true } +tokio = { version = "1", optional = true } # openssl tls-openssl = { package = "openssl", version = "0.10", optional = true } @@ -76,6 +74,7 @@ tls-native-tls = { package = "native-tls", version = "0.2", optional = true } tokio-native-tls = { version = "0.3", optional = true } [dev-dependencies] +actix-rt = "2.0.0-beta.2" actix-server = "2.0.0-beta.2" bytes = "1" env_logger = "0.8" diff --git a/actix-tls/src/accept/nativetls.rs b/actix-tls/src/accept/nativetls.rs index 5d80ce8b..9b2aeefb 100644 --- a/actix-tls/src/accept/nativetls.rs +++ b/actix-tls/src/accept/nativetls.rs @@ -3,7 +3,7 @@ use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite}; use actix_service::{Service, ServiceFactory}; use actix_utils::counter::Counter; -use futures_util::future::{ready, LocalBoxFuture, Ready}; +use futures_core::future::LocalBoxFuture; pub use native_tls::Error; pub use tokio_native_tls::{TlsAcceptor, TlsStream}; @@ -44,15 +44,16 @@ where type Service = NativeTlsAcceptorService; type InitError = (); - type Future = Ready>; + type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { - MAX_CONN_COUNTER.with(|conns| { - ready(Ok(NativeTlsAcceptorService { + let res = MAX_CONN_COUNTER.with(|conns| { + Ok(NativeTlsAcceptorService { acceptor: self.acceptor.clone(), conns: conns.clone(), - })) - }) + }) + }); + Box::pin(async { res }) } } diff --git a/actix-tls/src/accept/openssl.rs b/actix-tls/src/accept/openssl.rs index efda5c38..19f14cb3 100644 --- a/actix-tls/src/accept/openssl.rs +++ b/actix-tls/src/accept/openssl.rs @@ -5,10 +5,7 @@ use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite}; use actix_service::{Service, ServiceFactory}; use actix_utils::counter::{Counter, CounterGuard}; -use futures_util::{ - future::{ready, Ready}, - ready, -}; +use futures_core::{future::LocalBoxFuture, ready}; pub use openssl::ssl::{ AlpnError, Error as SslError, HandshakeError, Ssl, SslAcceptor, SslAcceptorBuilder, @@ -50,15 +47,16 @@ where type Config = (); type Service = AcceptorService; type InitError = (); - type Future = Ready>; + type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { - MAX_CONN_COUNTER.with(|conns| { - ready(Ok(AcceptorService { + let res = MAX_CONN_COUNTER.with(|conns| { + Ok(AcceptorService { acceptor: self.acceptor.clone(), conns: conns.clone(), - })) - }) + }) + }); + Box::pin(async { res }) } } diff --git a/actix-tls/src/accept/rustls.rs b/actix-tls/src/accept/rustls.rs index a6686f44..480c20c2 100644 --- a/actix-tls/src/accept/rustls.rs +++ b/actix-tls/src/accept/rustls.rs @@ -1,13 +1,15 @@ -use std::future::Future; -use std::io; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; +use std::{ + future::Future, + io, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; use actix_codec::{AsyncRead, AsyncWrite}; use actix_service::{Service, ServiceFactory}; use actix_utils::counter::{Counter, CounterGuard}; -use futures_util::future::{ready, Ready}; +use futures_core::future::LocalBoxFuture; use tokio_rustls::{Accept, TlsAcceptor}; pub use rustls::{ServerConfig, Session}; @@ -52,15 +54,16 @@ where type Service = AcceptorService; type InitError = (); - type Future = Ready>; + type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { - MAX_CONN_COUNTER.with(|conns| { - ready(Ok(AcceptorService { + let res = MAX_CONN_COUNTER.with(|conns| { + Ok(AcceptorService { acceptor: self.config.clone().into(), conns: conns.clone(), - })) - }) + }) + }); + Box::pin(async { res }) } } diff --git a/actix-tls/src/connect/connector.rs b/actix-tls/src/connect/connector.rs index a0a6b8b5..ea683121 100644 --- a/actix-tls/src/connect/connector.rs +++ b/actix-tls/src/connect/connector.rs @@ -1,76 +1,48 @@ use std::collections::VecDeque; use std::future::Future; use std::io; -use std::marker::PhantomData; use std::net::SocketAddr; use std::pin::Pin; use std::task::{Context, Poll}; use actix_rt::net::TcpStream; use actix_service::{Service, ServiceFactory}; -use futures_util::future::{ready, Ready}; +use futures_core::future::LocalBoxFuture; use log::{error, trace}; use super::connect::{Address, Connect, Connection}; use super::error::ConnectError; /// TCP connector service factory -#[derive(Debug)] -pub struct TcpConnectorFactory(PhantomData); - -impl TcpConnectorFactory { - pub fn new() -> Self { - TcpConnectorFactory(PhantomData) - } +#[derive(Copy, Clone, Debug)] +pub struct TcpConnectorFactory; +impl TcpConnectorFactory { /// Create TCP connector service - pub fn service(&self) -> TcpConnector { - TcpConnector(PhantomData) + pub fn service(&self) -> TcpConnector { + TcpConnector } } -impl Default for TcpConnectorFactory { - fn default() -> Self { - TcpConnectorFactory(PhantomData) - } -} - -impl Clone for TcpConnectorFactory { - fn clone(&self) -> Self { - TcpConnectorFactory(PhantomData) - } -} - -impl ServiceFactory> for TcpConnectorFactory { +impl ServiceFactory> for TcpConnectorFactory { type Response = Connection; type Error = ConnectError; type Config = (); - type Service = TcpConnector; + type Service = TcpConnector; type InitError = (); - type Future = Ready>; + type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { - ready(Ok(self.service())) + let service = self.service(); + Box::pin(async move { Ok(service) }) } } /// TCP connector service -#[derive(Default, Debug)] -pub struct TcpConnector(PhantomData); +#[derive(Copy, Clone, Debug)] +pub struct TcpConnector; -impl TcpConnector { - pub fn new() -> Self { - TcpConnector(PhantomData) - } -} - -impl Clone for TcpConnector { - fn clone(&self) -> Self { - TcpConnector(PhantomData) - } -} - -impl Service> for TcpConnector { +impl Service> for TcpConnector { type Response = Connection; type Error = ConnectError; type Future = TcpConnectorResponse; @@ -90,8 +62,6 @@ impl Service> for TcpConnector { } } -type LocalBoxFuture<'a, T> = Pin + 'a>>; - #[doc(hidden)] /// TCP stream connector response future pub enum TcpConnectorResponse { @@ -165,7 +135,7 @@ impl Future for TcpConnectorResponse { port, ); if addrs.is_none() || addrs.as_ref().unwrap().is_empty() { - return Poll::Ready(Err(err.into())); + return Poll::Ready(Err(ConnectError::Io(err))); } } } diff --git a/actix-tls/src/connect/error.rs b/actix-tls/src/connect/error.rs index 84b363dc..5d8cb9db 100644 --- a/actix-tls/src/connect/error.rs +++ b/actix-tls/src/connect/error.rs @@ -1,13 +1,12 @@ use std::io; -use derive_more::{Display, From}; -use trust_dns_resolver::error::ResolveError; +use derive_more::Display; -#[derive(Debug, From, Display)] +#[derive(Debug, Display)] pub enum ConnectError { /// Failed to resolve the hostname #[display(fmt = "Failed resolving hostname: {}", _0)] - Resolver(ResolveError), + Resolver(Box), /// No dns records #[display(fmt = "No dns records found for the input")] diff --git a/actix-tls/src/connect/mod.rs b/actix-tls/src/connect/mod.rs index 75312c59..a9c675f6 100644 --- a/actix-tls/src/connect/mod.rs +++ b/actix-tls/src/connect/mod.rs @@ -14,67 +14,26 @@ pub mod ssl; #[cfg(feature = "uri")] mod uri; -use actix_rt::{net::TcpStream, Arbiter}; +use actix_rt::net::TcpStream; use actix_service::{pipeline, pipeline_factory, Service, ServiceFactory}; -use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; -use trust_dns_resolver::system_conf::read_system_conf; -use trust_dns_resolver::TokioAsyncResolver as AsyncResolver; - -pub mod resolver { - pub use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; - pub use trust_dns_resolver::system_conf::read_system_conf; - pub use trust_dns_resolver::{error::ResolveError, AsyncResolver}; -} pub use self::connect::{Address, Connect, Connection}; pub use self::connector::{TcpConnector, TcpConnectorFactory}; pub use self::error::ConnectError; -pub use self::resolve::{Resolver, ResolverFactory}; +pub use self::resolve::{Resolve, Resolver, ResolverFactory}; pub use self::service::{ConnectService, ConnectServiceFactory, TcpConnectService}; -pub async fn start_resolver( - cfg: ResolverConfig, - opts: ResolverOpts, -) -> Result { - Ok(AsyncResolver::tokio(cfg, opts)?) -} - -struct DefaultResolver(AsyncResolver); - -pub(crate) async fn get_default_resolver() -> Result { - if Arbiter::contains_item::() { - Ok(Arbiter::get_item(|item: &DefaultResolver| item.0.clone())) - } else { - let (cfg, opts) = match read_system_conf() { - Ok((cfg, opts)) => (cfg, opts), - Err(e) => { - log::error!("TRust-DNS can not load system config: {}", e); - (ResolverConfig::default(), ResolverOpts::default()) - } - }; - - let resolver = AsyncResolver::tokio(cfg, opts)?; - - Arbiter::set_item(DefaultResolver(resolver.clone())); - Ok(resolver) - } -} - -pub async fn start_default_resolver() -> Result { - get_default_resolver().await -} - /// Create TCP connector service. pub fn new_connector( - resolver: AsyncResolver, + resolver: Resolver, ) -> impl Service, Response = Connection, Error = ConnectError> + Clone { - pipeline(Resolver::new(resolver)).and_then(TcpConnector::new()) + pipeline(resolver).and_then(TcpConnector) } /// Create TCP connector service factory. pub fn new_connector_factory( - resolver: AsyncResolver, + resolver: Resolver, ) -> impl ServiceFactory< Connect, Config = (), @@ -82,14 +41,14 @@ pub fn new_connector_factory( Error = ConnectError, InitError = (), > + Clone { - pipeline_factory(ResolverFactory::new(resolver)).and_then(TcpConnectorFactory::new()) + pipeline_factory(ResolverFactory::new(resolver)).and_then(TcpConnectorFactory) } /// Create connector service with default parameters. pub fn default_connector( ) -> impl Service, Response = Connection, Error = ConnectError> + Clone { - pipeline(Resolver::default()).and_then(TcpConnector::new()) + new_connector(Resolver::Default) } /// Create connector service factory with default parameters. @@ -100,5 +59,5 @@ pub fn default_connector_factory() -> impl ServiceFactory< Error = ConnectError, InitError = (), > + Clone { - pipeline_factory(ResolverFactory::default()).and_then(TcpConnectorFactory::new()) + new_connector_factory(Resolver::Default) } diff --git a/actix-tls/src/connect/resolve.rs b/actix-tls/src/connect/resolve.rs index 61535faa..6d31f461 100644 --- a/actix-tls/src/connect/resolve.rs +++ b/actix-tls/src/connect/resolve.rs @@ -1,184 +1,123 @@ -use std::future::Future; -use std::marker::PhantomData; -use std::net::SocketAddr; -use std::pin::Pin; -use std::task::{Context, Poll}; +use std::{ + net::{SocketAddr, ToSocketAddrs}, + rc::Rc, + task::Poll, +}; use actix_service::{Service, ServiceFactory}; -use futures_util::future::{ok, Either, Ready}; +use futures_core::future::LocalBoxFuture; use log::trace; -use trust_dns_resolver::TokioAsyncResolver as AsyncResolver; -use trust_dns_resolver::{error::ResolveError, lookup_ip::LookupIp}; use super::connect::{Address, Connect}; use super::error::ConnectError; -use super::get_default_resolver; /// DNS Resolver Service factory -pub struct ResolverFactory { - resolver: Option, - _t: PhantomData, +#[derive(Clone)] +pub struct ResolverFactory { + resolver: Resolver, } -impl ResolverFactory { - /// Create new resolver instance with custom configuration and options. - pub fn new(resolver: AsyncResolver) -> Self { - ResolverFactory { - resolver: Some(resolver), - _t: PhantomData, - } +impl ResolverFactory { + pub fn new(resolver: Resolver) -> Self { + Self { resolver } } - pub fn service(&self) -> Resolver { - Resolver { - resolver: self.resolver.clone(), - _t: PhantomData, - } + pub fn service(&self) -> Resolver { + self.resolver.clone() } } -impl Default for ResolverFactory { - fn default() -> Self { - ResolverFactory { - resolver: None, - _t: PhantomData, - } - } -} - -impl Clone for ResolverFactory { - fn clone(&self) -> Self { - ResolverFactory { - resolver: self.resolver.clone(), - _t: PhantomData, - } - } -} - -impl ServiceFactory> for ResolverFactory { +impl ServiceFactory> for ResolverFactory { type Response = Connect; type Error = ConnectError; type Config = (); - type Service = Resolver; + type Service = Resolver; type InitError = (); - type Future = Ready>; + type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { - ok(self.service()) + let service = self.resolver.clone(); + Box::pin(async { Ok(service) }) } } /// DNS Resolver Service -pub struct Resolver { - resolver: Option, - _t: PhantomData, +#[derive(Clone)] +pub enum Resolver { + Default, + Custom(Rc), } -impl Resolver { - /// Create new resolver instance with custom configuration and options. - pub fn new(resolver: AsyncResolver) -> Self { - Resolver { - resolver: Some(resolver), - _t: PhantomData, +/// trait for custom lookup with self defined resolver. +pub trait Resolve { + fn lookup( + &self, + addrs: Vec, + ) -> LocalBoxFuture<'_, Result, Box>>; +} + +impl Resolver { + /// Constructor for custom Resolve trait object and use it as resolver. + pub fn new_custom(resolver: impl Resolve + 'static) -> Self { + Self::Custom(Rc::new(resolver)) + } + + async fn lookup( + &self, + req: &Connect, + host: String, + ) -> Result, ConnectError> { + match self { + Self::Default => { + let res = tokio::net::lookup_host(host).await.map_err(|e| { + trace!( + "DNS resolver: failed to resolve host {:?} err: {}", + req.host(), + e + ); + ConnectError::Resolver(Box::new(e)) + })?; + + Ok(res.collect()) + } + Self::Custom(resolver) => { + let host = host + .to_socket_addrs() + .map_err(|e| ConnectError::Resolver(Box::new(e)))? + .collect(); + resolver.lookup(host).await.map_err(ConnectError::Resolver) + } } } } -impl Default for Resolver { - fn default() -> Self { - Resolver { - resolver: None, - _t: PhantomData, - } - } -} - -impl Clone for Resolver { - fn clone(&self) -> Self { - Resolver { - resolver: self.resolver.clone(), - _t: PhantomData, - } - } -} - -impl Service> for Resolver { +impl Service> for Resolver { type Response = Connect; type Error = ConnectError; - #[allow(clippy::type_complexity)] - type Future = Either< - Pin>>>, - Ready, Self::Error>>, - >; + type Future = LocalBoxFuture<'static, Result>; actix_service::always_ready!(); fn call(&mut self, mut req: Connect) -> Self::Future { - if req.addr.is_some() { - Either::Right(ok(req)) - } else if let Ok(ip) = req.host().parse() { - req.addr = Some(either::Either::Left(SocketAddr::new(ip, req.port()))); - Either::Right(ok(req)) - } else { - let resolver = self.resolver.as_ref().map(AsyncResolver::clone); - Either::Left(Box::pin(async move { + let resolver = self.clone(); + Box::pin(async move { + if req.addr.is_some() { + Ok(req) + } else if let Ok(ip) = req.host().parse() { + req.addr = Some(either::Either::Left(SocketAddr::new(ip, req.port()))); + Ok(req) + } else { trace!("DNS resolver: resolving host {:?}", req.host()); - let resolver = if let Some(resolver) = resolver { - resolver + + let host = if req.host().contains(':') { + req.host().to_string() } else { - get_default_resolver() - .await - .expect("Failed to get default resolver") + format!("{}:{}", req.host(), req.port()) }; - ResolverFuture::new(req, &resolver).await - })) - } - } -} -type LookupIpFuture = Pin>>>; + let addrs = resolver.lookup(&req, host).await?; -#[doc(hidden)] -/// Resolver future -pub struct ResolverFuture { - req: Option>, - lookup: LookupIpFuture, -} - -impl ResolverFuture { - pub fn new(req: Connect, resolver: &AsyncResolver) -> Self { - let host = if let Some(host) = req.host().splitn(2, ':').next() { - host - } else { - req.host() - }; - - // Clone data to be moved to the lookup future - let host_clone = host.to_owned(); - let resolver_clone = resolver.clone(); - - ResolverFuture { - lookup: Box::pin(async move { - let resolver = resolver_clone; - resolver.lookup_ip(host_clone).await - }), - req: Some(req), - } - } -} - -impl Future for ResolverFuture { - type Output = Result, ConnectError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - match Pin::new(&mut this.lookup).poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Ok(ips)) => { - let req = this.req.take().unwrap(); - let port = req.port(); - let req = req.set_addrs(ips.iter().map(|ip| SocketAddr::new(ip, port))); + let req = req.set_addrs(addrs); trace!( "DNS resolver: host {:?} resolved to {:?}", @@ -187,19 +126,11 @@ impl Future for ResolverFuture { ); if req.addr.is_none() { - Poll::Ready(Err(ConnectError::NoRecords)) + Err(ConnectError::NoRecords) } else { - Poll::Ready(Ok(req)) + Ok(req) } } - Poll::Ready(Err(e)) => { - trace!( - "DNS resolver: failed to resolve host {:?} err: {}", - this.req.as_ref().unwrap().host(), - e - ); - Poll::Ready(Err(e.into())) - } - } + }) } } diff --git a/actix-tls/src/connect/service.rs b/actix-tls/src/connect/service.rs index 59fe20cc..9c8d0736 100644 --- a/actix-tls/src/connect/service.rs +++ b/actix-tls/src/connect/service.rs @@ -5,38 +5,29 @@ use std::task::{Context, Poll}; use actix_rt::net::TcpStream; use actix_service::{Service, ServiceFactory}; use either::Either; -use futures_util::future::{ok, Ready}; -use trust_dns_resolver::TokioAsyncResolver as AsyncResolver; +use futures_core::future::LocalBoxFuture; use super::connect::{Address, Connect, Connection}; use super::connector::{TcpConnector, TcpConnectorFactory}; use super::error::ConnectError; use super::resolve::{Resolver, ResolverFactory}; -pub struct ConnectServiceFactory { - tcp: TcpConnectorFactory, - resolver: ResolverFactory, +pub struct ConnectServiceFactory { + tcp: TcpConnectorFactory, + resolver: ResolverFactory, } -impl ConnectServiceFactory { +impl ConnectServiceFactory { /// Construct new ConnectService factory - pub fn new() -> Self { + pub fn new(resolver: Resolver) -> Self { ConnectServiceFactory { - tcp: TcpConnectorFactory::default(), - resolver: ResolverFactory::default(), - } - } - - /// Construct new connect service with custom dns resolver - pub fn with_resolver(resolver: AsyncResolver) -> Self { - ConnectServiceFactory { - tcp: TcpConnectorFactory::default(), + tcp: TcpConnectorFactory, resolver: ResolverFactory::new(resolver), } } /// Construct new service - pub fn service(&self) -> ConnectService { + pub fn service(&self) -> ConnectService { ConnectService { tcp: self.tcp.service(), resolver: self.resolver.service(), @@ -44,7 +35,7 @@ impl ConnectServiceFactory { } /// Construct new tcp stream service - pub fn tcp_service(&self) -> TcpConnectService { + pub fn tcp_service(&self) -> TcpConnectService { TcpConnectService { tcp: self.tcp.service(), resolver: self.resolver.service(), @@ -52,44 +43,36 @@ impl ConnectServiceFactory { } } -impl Default for ConnectServiceFactory { - fn default() -> Self { - ConnectServiceFactory { - tcp: TcpConnectorFactory::default(), - resolver: ResolverFactory::default(), - } - } -} - -impl Clone for ConnectServiceFactory { +impl Clone for ConnectServiceFactory { fn clone(&self) -> Self { ConnectServiceFactory { - tcp: self.tcp.clone(), + tcp: self.tcp, resolver: self.resolver.clone(), } } } -impl ServiceFactory> for ConnectServiceFactory { +impl ServiceFactory> for ConnectServiceFactory { type Response = Connection; type Error = ConnectError; type Config = (); - type Service = ConnectService; + type Service = ConnectService; type InitError = (); - type Future = Ready>; + type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { - ok(self.service()) + let service = self.service(); + Box::pin(async move { Ok(service) }) } } #[derive(Clone)] -pub struct ConnectService { - tcp: TcpConnector, - resolver: Resolver, +pub struct ConnectService { + tcp: TcpConnector, + resolver: Resolver, } -impl Service> for ConnectService { +impl Service> for ConnectService { type Response = Connection; type Error = ConnectError; type Future = ConnectServiceResponse; @@ -99,14 +82,14 @@ impl Service> for ConnectService { fn call(&mut self, req: Connect) -> Self::Future { ConnectServiceResponse { state: ConnectState::Resolve(self.resolver.call(req)), - tcp: self.tcp.clone(), + tcp: self.tcp, } } } enum ConnectState { - Resolve( as Service>>::Future), - Connect( as Service>>::Future), + Resolve(>>::Future), + Connect(>>::Future), } impl ConnectState { @@ -128,7 +111,7 @@ impl ConnectState { pub struct ConnectServiceResponse { state: ConnectState, - tcp: TcpConnector, + tcp: TcpConnector, } impl Future for ConnectServiceResponse { @@ -151,12 +134,12 @@ impl Future for ConnectServiceResponse { } #[derive(Clone)] -pub struct TcpConnectService { - tcp: TcpConnector, - resolver: Resolver, +pub struct TcpConnectService { + tcp: TcpConnector, + resolver: Resolver, } -impl Service> for TcpConnectService { +impl Service> for TcpConnectService { type Response = TcpStream; type Error = ConnectError; type Future = TcpConnectServiceResponse; @@ -166,14 +149,14 @@ impl Service> for TcpConnectService { fn call(&mut self, req: Connect) -> Self::Future { TcpConnectServiceResponse { state: TcpConnectState::Resolve(self.resolver.call(req)), - tcp: self.tcp.clone(), + tcp: self.tcp, } } } enum TcpConnectState { - Resolve( as Service>>::Future), - Connect( as Service>>::Future), + Resolve(>>::Future), + Connect(>>::Future), } impl TcpConnectState { @@ -202,7 +185,7 @@ impl TcpConnectState { pub struct TcpConnectServiceResponse { state: TcpConnectState, - tcp: TcpConnector, + tcp: TcpConnector, } impl Future for TcpConnectServiceResponse { diff --git a/actix-tls/src/connect/ssl/openssl.rs b/actix-tls/src/connect/ssl/openssl.rs index 5193ce37..033bb77d 100644 --- a/actix-tls/src/connect/ssl/openssl.rs +++ b/actix-tls/src/connect/ssl/openssl.rs @@ -1,23 +1,25 @@ -use std::future::Future; -use std::marker::PhantomData; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::{fmt, io}; +use std::{ + fmt, + future::Future, + io, + marker::PhantomData, + pin::Pin, + task::{Context, Poll}, +}; use actix_codec::{AsyncRead, AsyncWrite}; use actix_rt::net::TcpStream; use actix_service::{Service, ServiceFactory}; -use futures_util::{ - future::{ready, Either, Ready}, - ready, -}; +use futures_core::{future::LocalBoxFuture, ready}; +use futures_util::future::{ready, Either, Ready}; use log::trace; + pub use openssl::ssl::{Error as SslError, HandshakeError, SslConnector, SslMethod}; pub use tokio_openssl::SslStream; -use trust_dns_resolver::TokioAsyncResolver as AsyncResolver; +use crate::connect::resolve::Resolve; use crate::connect::{ - Address, Connect, ConnectError, ConnectService, ConnectServiceFactory, Connection, + Address, Connect, ConnectError, ConnectService, ConnectServiceFactory, Connection, Resolver, }; /// OpenSSL connector factory @@ -55,12 +57,11 @@ where type Config = (); type Service = OpensslConnectorService; type InitError = (); - type Future = Ready>; + type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { - ready(Ok(OpensslConnectorService { - connector: self.connector.clone(), - })) + let connector = self.connector.clone(); + Box::pin(async { Ok(OpensslConnectorService { connector }) }) } } @@ -139,30 +140,30 @@ where } } -pub struct OpensslConnectServiceFactory { - tcp: ConnectServiceFactory, +pub struct OpensslConnectServiceFactory { + tcp: ConnectServiceFactory, openssl: OpensslConnector, } -impl OpensslConnectServiceFactory { +impl OpensslConnectServiceFactory { /// Construct new OpensslConnectService factory pub fn new(connector: SslConnector) -> Self { OpensslConnectServiceFactory { - tcp: ConnectServiceFactory::default(), + tcp: ConnectServiceFactory::new(Resolver::Default), openssl: OpensslConnector::new(connector), } } /// Construct new connect service with custom DNS resolver - pub fn with_resolver(connector: SslConnector, resolver: AsyncResolver) -> Self { + pub fn with_resolver(connector: SslConnector, resolver: impl Resolve + 'static) -> Self { OpensslConnectServiceFactory { - tcp: ConnectServiceFactory::with_resolver(resolver), + tcp: ConnectServiceFactory::new(Resolver::new_custom(resolver)), openssl: OpensslConnector::new(connector), } } /// Construct OpenSSL connect service - pub fn service(&self) -> OpensslConnectService { + pub fn service(&self) -> OpensslConnectService { OpensslConnectService { tcp: self.tcp.service(), openssl: OpensslConnectorService { @@ -172,7 +173,7 @@ impl OpensslConnectServiceFactory { } } -impl Clone for OpensslConnectServiceFactory { +impl Clone for OpensslConnectServiceFactory { fn clone(&self) -> Self { OpensslConnectServiceFactory { tcp: self.tcp.clone(), @@ -181,26 +182,27 @@ impl Clone for OpensslConnectServiceFactory { } } -impl ServiceFactory> for OpensslConnectServiceFactory { +impl ServiceFactory> for OpensslConnectServiceFactory { type Response = SslStream; type Error = ConnectError; type Config = (); - type Service = OpensslConnectService; + type Service = OpensslConnectService; type InitError = (); - type Future = Ready>; + type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { - ready(Ok(self.service())) + let service = self.service(); + Box::pin(async { Ok(service) }) } } #[derive(Clone)] -pub struct OpensslConnectService { - tcp: ConnectService, +pub struct OpensslConnectService { + tcp: ConnectService, openssl: OpensslConnectorService, } -impl Service> for OpensslConnectService { +impl Service> for OpensslConnectService { type Response = SslStream; type Error = ConnectError; type Future = OpensslConnectServiceResponse; @@ -217,7 +219,7 @@ impl Service> for OpensslConnectService { } pub struct OpensslConnectServiceResponse { - fut1: Option< as Service>>::Future>, + fut1: Option<>>::Future>, fut2: Option<>>::Future>, openssl: OpensslConnectorService, } diff --git a/actix-tls/src/connect/ssl/rustls.rs b/actix-tls/src/connect/ssl/rustls.rs index 390ba413..5eb5bee6 100644 --- a/actix-tls/src/connect/ssl/rustls.rs +++ b/actix-tls/src/connect/ssl/rustls.rs @@ -1,18 +1,17 @@ -use std::fmt; -use std::future::Future; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; +use std::{ + fmt, + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; pub use rustls::Session; pub use tokio_rustls::{client::TlsStream, rustls::ClientConfig}; use actix_codec::{AsyncRead, AsyncWrite}; use actix_service::{Service, ServiceFactory}; -use futures_util::{ - future::{ready, Ready}, - ready, -}; +use futures_core::{future::LocalBoxFuture, ready}; use log::trace; use tokio_rustls::{Connect, TlsConnector}; use webpki::DNSNameRef; @@ -53,12 +52,11 @@ where type Config = (); type Service = RustlsConnectorService; type InitError = (); - type Future = Ready>; + type Future = LocalBoxFuture<'static, Result>; fn new_service(&self, _: ()) -> Self::Future { - ready(Ok(RustlsConnectorService { - connector: self.connector.clone(), - })) + let connector = self.connector.clone(); + Box::pin(async { Ok(RustlsConnectorService { connector }) }) } } diff --git a/actix-tls/tests/test_connect.rs b/actix-tls/tests/test_connect.rs index aa773c7f..7a0074a8 100644 --- a/actix-tls/tests/test_connect.rs +++ b/actix-tls/tests/test_connect.rs @@ -7,11 +7,7 @@ use actix_service::{fn_service, Service, ServiceFactory}; use bytes::Bytes; use futures_util::sink::SinkExt; -use actix_tls::connect::{ - self as actix_connect, - resolver::{ResolverConfig, ResolverOpts}, - Connect, -}; +use actix_tls::connect::{self as actix_connect, Connect}; #[cfg(all(feature = "connect", feature = "openssl"))] #[actix_rt::test] @@ -57,14 +53,13 @@ async fn test_static_str() { }) }); - let resolver = actix_connect::start_default_resolver().await.unwrap(); - let mut conn = actix_connect::new_connector(resolver.clone()); + let mut conn = actix_connect::default_connector(); let con = conn.call(Connect::with("10", srv.addr())).await.unwrap(); assert_eq!(con.peer_addr().unwrap(), srv.addr()); let connect = Connect::new(srv.host().to_owned()); - let mut conn = actix_connect::new_connector(resolver); + let mut conn = actix_connect::default_connector(); let con = conn.call(connect).await; assert!(con.is_err()); } @@ -79,12 +74,7 @@ async fn test_new_service() { }) }); - let resolver = - actix_connect::start_resolver(ResolverConfig::default(), ResolverOpts::default()) - .await - .unwrap(); - - let factory = actix_connect::new_connector_factory(resolver); + let factory = actix_connect::default_connector_factory(); let mut conn = factory.new_service(()).await.unwrap(); let con = conn.call(Connect::with("10", srv.addr())).await.unwrap(); diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index f038414c..c82cf79e 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -17,7 +17,7 @@ path = "src/lib.rs" [dependencies] actix-codec = "0.4.0-beta.1" -actix-rt = "2.0.0-beta.2" +actix-rt = { version = "2.0.0-beta.2", default-features = false } actix-service = "2.0.0-beta.3" futures-core = { version = "0.3.7", default-features = false }