temporary compat layer for tokio02 crates

This commit is contained in:
fakeshadow 2020-11-04 03:02:21 +08:00
parent a55c34c3f3
commit 1fae0037f5
3 changed files with 152 additions and 142 deletions

View File

@ -41,6 +41,7 @@ either = "1.5.3"
futures-util = { version = "0.3.4", default-features = false } futures-util = { version = "0.3.4", default-features = false }
http = { version = "0.2.0", optional = true } http = { version = "0.2.0", optional = true }
log = "0.4" log = "0.4"
tokio-compat-02 = "0.1.2"
trust-dns-proto = { version = "0.19", default-features = false, features = ["tokio-runtime"] } trust-dns-proto = { version = "0.19", default-features = false, features = ["tokio-runtime"] }
trust-dns-resolver = { version = "0.19", default-features = false, features = ["tokio-runtime", "system-config"] } trust-dns-resolver = { version = "0.19", default-features = false, features = ["tokio-runtime", "system-config"] }

View File

@ -43,12 +43,20 @@ pub async fn start_resolver(
cfg: ResolverConfig, cfg: ResolverConfig,
opts: ResolverOpts, opts: ResolverOpts,
) -> Result<AsyncResolver, ConnectError> { ) -> Result<AsyncResolver, ConnectError> {
// FIXME: remove compat layer
use tokio_compat_02::FutureExt;
async {
Ok(AsyncResolver::tokio(cfg, opts).await?) Ok(AsyncResolver::tokio(cfg, opts).await?)
}.compat().await
} }
struct DefaultResolver(AsyncResolver); struct DefaultResolver(AsyncResolver);
pub(crate) async fn get_default_resolver() -> Result<AsyncResolver, ConnectError> { pub(crate) async fn get_default_resolver() -> Result<AsyncResolver, ConnectError> {
// FIXME: remove compat layer
use tokio_compat_02::FutureExt;
async {
if Arbiter::contains_item::<DefaultResolver>() { if Arbiter::contains_item::<DefaultResolver>() {
Ok(Arbiter::get_item(|item: &DefaultResolver| item.0.clone())) Ok(Arbiter::get_item(|item: &DefaultResolver| item.0.clone()))
} else { } else {
@ -65,6 +73,7 @@ pub(crate) async fn get_default_resolver() -> Result<AsyncResolver, ConnectError
Arbiter::set_item(DefaultResolver(resolver.clone())); Arbiter::set_item(DefaultResolver(resolver.clone()));
Ok(resolver) Ok(resolver)
} }
}.compat().await
} }
pub async fn start_default_resolver() -> Result<AsyncResolver, ConnectError> { pub async fn start_default_resolver() -> Result<AsyncResolver, ConnectError> {

View File

@ -1,127 +1,127 @@
// use std::io; use std::io;
//
// use actix_codec::{BytesCodec, Framed}; use actix_codec::{BytesCodec, Framed};
// use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
// use actix_service::{fn_service, Service, ServiceFactory}; use actix_service::{fn_service, Service, ServiceFactory};
// use actix_testing::TestServer; use actix_testing::TestServer;
// use bytes::Bytes; use bytes::Bytes;
// use futures_util::sink::SinkExt; use futures_util::sink::SinkExt;
//
// use actix_connect::resolver::{ResolverConfig, ResolverOpts}; use actix_connect::resolver::{ResolverConfig, ResolverOpts};
// use actix_connect::Connect; use actix_connect::Connect;
//
// #[cfg(feature = "openssl")] #[cfg(feature = "openssl")]
// #[actix_rt::test] #[actix_rt::test]
// async fn test_string() { async fn test_string() {
// let srv = TestServer::with(|| { let srv = TestServer::with(|| {
// fn_service(|io: TcpStream| async { fn_service(|io: TcpStream| async {
// let mut framed = Framed::new(io, BytesCodec); let mut framed = Framed::new(io, BytesCodec);
// framed.send(Bytes::from_static(b"test")).await?; framed.send(Bytes::from_static(b"test")).await?;
// Ok::<_, io::Error>(()) Ok::<_, io::Error>(())
// }) })
// }); });
//
// let mut conn = actix_connect::default_connector(); let mut conn = actix_connect::default_connector();
// let addr = format!("localhost:{}", srv.port()); let addr = format!("localhost:{}", srv.port());
// let con = conn.call(addr.into()).await.unwrap(); let con = conn.call(addr.into()).await.unwrap();
// assert_eq!(con.peer_addr().unwrap(), srv.addr()); assert_eq!(con.peer_addr().unwrap(), srv.addr());
// } }
//
// #[cfg(feature = "rustls")] #[cfg(feature = "rustls")]
// #[actix_rt::test] #[actix_rt::test]
// async fn test_rustls_string() { async fn test_rustls_string() {
// let srv = TestServer::with(|| { let srv = TestServer::with(|| {
// fn_service(|io: TcpStream| async { fn_service(|io: TcpStream| async {
// let mut framed = Framed::new(io, BytesCodec); let mut framed = Framed::new(io, BytesCodec);
// framed.send(Bytes::from_static(b"test")).await?; framed.send(Bytes::from_static(b"test")).await?;
// Ok::<_, io::Error>(()) Ok::<_, io::Error>(())
// }) })
// }); });
//
// let mut conn = actix_connect::default_connector(); let mut conn = actix_connect::default_connector();
// let addr = format!("localhost:{}", srv.port()); let addr = format!("localhost:{}", srv.port());
// let con = conn.call(addr.into()).await.unwrap(); let con = conn.call(addr.into()).await.unwrap();
// assert_eq!(con.peer_addr().unwrap(), srv.addr()); assert_eq!(con.peer_addr().unwrap(), srv.addr());
// } }
//
// #[actix_rt::test] #[actix_rt::test]
// async fn test_static_str() { async fn test_static_str() {
// let srv = TestServer::with(|| { let srv = TestServer::with(|| {
// fn_service(|io: TcpStream| async { fn_service(|io: TcpStream| async {
// let mut framed = Framed::new(io, BytesCodec); let mut framed = Framed::new(io, BytesCodec);
// framed.send(Bytes::from_static(b"test")).await?; framed.send(Bytes::from_static(b"test")).await?;
// Ok::<_, io::Error>(()) Ok::<_, io::Error>(())
// }) })
// }); });
//
// let resolver = actix_connect::start_default_resolver().await.unwrap(); let resolver = actix_connect::start_default_resolver().await.unwrap();
// let mut conn = actix_connect::new_connector(resolver.clone()); let mut conn = actix_connect::new_connector(resolver.clone());
//
// let con = conn.call(Connect::with("10", srv.addr())).await.unwrap(); let con = conn.call(Connect::with("10", srv.addr())).await.unwrap();
// assert_eq!(con.peer_addr().unwrap(), srv.addr()); assert_eq!(con.peer_addr().unwrap(), srv.addr());
//
// let connect = Connect::new(srv.host().to_owned()); let connect = Connect::new(srv.host().to_owned());
// let mut conn = actix_connect::new_connector(resolver); let mut conn = actix_connect::new_connector(resolver);
// let con = conn.call(connect).await; let con = conn.call(connect).await;
// assert!(con.is_err()); assert!(con.is_err());
// } }
//
// #[actix_rt::test] #[actix_rt::test]
// async fn test_new_service() { async fn test_new_service() {
// let srv = TestServer::with(|| { let srv = TestServer::with(|| {
// fn_service(|io: TcpStream| async { fn_service(|io: TcpStream| async {
// let mut framed = Framed::new(io, BytesCodec); let mut framed = Framed::new(io, BytesCodec);
// framed.send(Bytes::from_static(b"test")).await?; framed.send(Bytes::from_static(b"test")).await?;
// Ok::<_, io::Error>(()) Ok::<_, io::Error>(())
// }) })
// }); });
//
// let resolver = let resolver =
// actix_connect::start_resolver(ResolverConfig::default(), ResolverOpts::default()) actix_connect::start_resolver(ResolverConfig::default(), ResolverOpts::default())
// .await .await
// .unwrap(); .unwrap();
//
// let factory = actix_connect::new_connector_factory(resolver); let factory = actix_connect::new_connector_factory(resolver);
//
// let mut conn = factory.new_service(()).await.unwrap(); let mut conn = factory.new_service(()).await.unwrap();
// let con = conn.call(Connect::with("10", srv.addr())).await.unwrap(); let con = conn.call(Connect::with("10", srv.addr())).await.unwrap();
// assert_eq!(con.peer_addr().unwrap(), srv.addr()); assert_eq!(con.peer_addr().unwrap(), srv.addr());
// } }
//
// #[cfg(all(feature = "openssl", feature = "uri"))] #[cfg(all(feature = "openssl", feature = "uri"))]
// #[actix_rt::test] #[actix_rt::test]
// async fn test_openssl_uri() { async fn test_openssl_uri() {
// use std::convert::TryFrom; use std::convert::TryFrom;
//
// let srv = TestServer::with(|| { let srv = TestServer::with(|| {
// fn_service(|io: TcpStream| async { fn_service(|io: TcpStream| async {
// let mut framed = Framed::new(io, BytesCodec); let mut framed = Framed::new(io, BytesCodec);
// framed.send(Bytes::from_static(b"test")).await?; framed.send(Bytes::from_static(b"test")).await?;
// Ok::<_, io::Error>(()) Ok::<_, io::Error>(())
// }) })
// }); });
//
// let mut conn = actix_connect::default_connector(); let mut conn = actix_connect::default_connector();
// let addr = http::Uri::try_from(format!("https://localhost:{}", srv.port())).unwrap(); let addr = http::Uri::try_from(format!("https://localhost:{}", srv.port())).unwrap();
// let con = conn.call(addr.into()).await.unwrap(); let con = conn.call(addr.into()).await.unwrap();
// assert_eq!(con.peer_addr().unwrap(), srv.addr()); assert_eq!(con.peer_addr().unwrap(), srv.addr());
// } }
//
// #[cfg(all(feature = "rustls", feature = "uri"))] #[cfg(all(feature = "rustls", feature = "uri"))]
// #[actix_rt::test] #[actix_rt::test]
// async fn test_rustls_uri() { async fn test_rustls_uri() {
// use std::convert::TryFrom; use std::convert::TryFrom;
//
// let srv = TestServer::with(|| { let srv = TestServer::with(|| {
// fn_service(|io: TcpStream| async { fn_service(|io: TcpStream| async {
// let mut framed = Framed::new(io, BytesCodec); let mut framed = Framed::new(io, BytesCodec);
// framed.send(Bytes::from_static(b"test")).await?; framed.send(Bytes::from_static(b"test")).await?;
// Ok::<_, io::Error>(()) Ok::<_, io::Error>(())
// }) })
// }); });
//
// let mut conn = actix_connect::default_connector(); let mut conn = actix_connect::default_connector();
// let addr = http::Uri::try_from(format!("https://localhost:{}", srv.port())).unwrap(); let addr = http::Uri::try_from(format!("https://localhost:{}", srv.port())).unwrap();
// let con = conn.call(addr.into()).await.unwrap(); let con = conn.call(addr.into()).await.unwrap();
// assert_eq!(con.peer_addr().unwrap(), srv.addr()); assert_eq!(con.peer_addr().unwrap(), srv.addr());
// } }