migrate actix-connect to std::future

This commit is contained in:
Nikolay Kim 2019-11-14 10:53:59 +06:00
parent b12b3b12a9
commit a87591769c
15 changed files with 297 additions and 231 deletions

View File

@ -1,6 +1,6 @@
[package] [package]
name = "actix-codec" name = "actix-codec"
version = "0.1.2" version = "0.2.0-alpha.1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Utilities for encoding and decoding frames" description = "Utilities for encoding and decoding frames"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]

View File

@ -1,8 +1,8 @@
[package] [package]
name = "actix-connect" name = "actix-connect"
version = "0.3.0" version = "1.0.0-alpha.1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix Connector - tcp connector service" description = "Actix connect - tcp connector service"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
homepage = "https://actix.rs" homepage = "https://actix.rs"
repository = "https://github.com/actix/actix-net.git" repository = "https://github.com/actix/actix-net.git"
@ -14,7 +14,7 @@ edition = "2018"
workspace = ".." workspace = ".."
[package.metadata.docs.rs] [package.metadata.docs.rs]
features = ["ssl", "uri"] features = ["openssl", "rustls", "uri"]
[lib] [lib]
name = "actix_connect" name = "actix_connect"
@ -24,33 +24,35 @@ path = "src/lib.rs"
default = ["uri"] default = ["uri"]
# openssl # openssl
ssl = ["openssl", "tokio-openssl"] openssl = ["open-ssl", "tokio-openssl"]
#rustls #rustls
rust-tls = ["rustls", "tokio-rustls", "webpki"] rustls = ["rust-tls", "tokio-rustls", "webpki"]
# support http::Uri as connect address # support http::Uri as connect address
uri = ["http"] uri = ["http"]
[dependencies] [dependencies]
actix-service = "0.4.0" actix-service = "1.0.0-alpha.1"
actix-codec = "0.1.2" actix-codec = "0.2.0-alpha.1"
actix-utils = "0.4.0" actix-utils = "0.5.0-alpha.1"
actix-rt = "0.2.5" actix-rt = "1.0.0-alpha.1"
derive_more = "0.15" derive_more = "0.15"
either = "1.5.2" either = "1.5.2"
futures = "0.3.1" futures = "0.3.1"
pin-project = "0.4.5"
http = { version = "0.1.17", optional = true } http = { version = "0.1.17", optional = true }
log = "0.4" log = "0.4"
tokio-net = "=0.2.0-alpha.6" tokio-net = "=0.2.0-alpha.6"
tokio-executor = "=0.2.0-alpha.6"
trust-dns-resolver = { version="0.18.0-alpha.1", default-features = false } trust-dns-resolver = { version="0.18.0-alpha.1", default-features = false }
# openssl # openssl
openssl = { version="0.10", optional = true } open-ssl = { version="0.10", package = "openssl", optional = true }
tokio-openssl = { version="0.3", optional = true } tokio-openssl = { version="0.3", optional = true }
#rustls #rustls
rustls = { version = "0.16.0", optional = true } rust-tls = { version = "0.16.0", package = "rustls", optional = true }
tokio-rustls = { version = "0.10.0", optional = true } tokio-rustls = { version = "0.10.0", optional = true }
webpki = { version = "0.21", optional = true } webpki = { version = "0.21", optional = true }
@ -58,3 +60,4 @@ webpki = { version = "0.21", optional = true }
bytes = "0.4" bytes = "0.4"
actix-testing = { version="0.2.0" } actix-testing = { version="0.2.0" }
actix-server-config = "0.2.0" actix-server-config = "0.2.0"
tokio = "0.2.0-alpha.6"

View File

@ -1,11 +1,15 @@
use std::collections::VecDeque; use std::collections::VecDeque;
use std::future::Future;
use std::io;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_service::{NewService, Service}; use actix_service::{Service, ServiceFactory};
use futures::future::{err, ok, Either, FutureResult}; use futures::future::{err, ok, BoxFuture, Either, FutureExt, Ready};
use futures::{Async, Future, Poll}; use pin_project::pin_project;
use tokio_tcp::{ConnectFuture, TcpStream}; use tokio_net::tcp::TcpStream;
use super::connect::{Address, Connect, Connection}; use super::connect::{Address, Connect, Connection};
use super::error::ConnectError; use super::error::ConnectError;
@ -37,14 +41,14 @@ impl<T> Clone for TcpConnectorFactory<T> {
} }
} }
impl<T: Address> NewService for TcpConnectorFactory<T> { impl<T: Address> ServiceFactory for TcpConnectorFactory<T> {
type Request = Connect<T>; type Request = Connect<T>;
type Response = Connection<T, TcpStream>; type Response = Connection<T, TcpStream>;
type Error = ConnectError; type Error = ConnectError;
type Config = (); type Config = ();
type Service = TcpConnector<T>; type Service = TcpConnector<T>;
type InitError = (); type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>; type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: &()) -> Self::Future { fn new_service(&self, _: &()) -> Self::Future {
ok(self.service()) ok(self.service())
@ -71,10 +75,10 @@ impl<T: Address> Service for TcpConnector<T> {
type Request = Connect<T>; type Request = Connect<T>;
type Response = Connection<T, TcpStream>; type Response = Connection<T, TcpStream>;
type Error = ConnectError; type Error = ConnectError;
type Future = Either<TcpConnectorResponse<T>, FutureResult<Self::Response, Self::Error>>; type Future = Either<TcpConnectorResponse<T>, Ready<Result<Self::Response, Self::Error>>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Ok(Async::Ready(())) Poll::Ready(Ok(()))
} }
fn call(&mut self, req: Connect<T>) -> Self::Future { fn call(&mut self, req: Connect<T>) -> Self::Future {
@ -82,21 +86,22 @@ impl<T: Address> Service for TcpConnector<T> {
let Connect { req, addr, .. } = req; let Connect { req, addr, .. } = req;
if let Some(addr) = addr { if let Some(addr) = addr {
Either::A(TcpConnectorResponse::new(req, port, addr)) Either::Left(TcpConnectorResponse::new(req, port, addr))
} else { } else {
error!("TCP connector: got unresolved address"); error!("TCP connector: got unresolved address");
Either::B(err(ConnectError::Unresolverd)) Either::Right(err(ConnectError::Unresolverd))
} }
} }
} }
#[pin_project]
#[doc(hidden)] #[doc(hidden)]
/// Tcp stream connector response future /// Tcp stream connector response future
pub struct TcpConnectorResponse<T> { pub struct TcpConnectorResponse<T> {
req: Option<T>, req: Option<T>,
port: u16, port: u16,
addrs: Option<VecDeque<SocketAddr>>, addrs: Option<VecDeque<SocketAddr>>,
stream: Option<ConnectFuture>, stream: Option<BoxFuture<'static, Result<TcpStream, io::Error>>>,
} }
impl<T: Address> TcpConnectorResponse<T> { impl<T: Address> TcpConnectorResponse<T> {
@ -116,7 +121,7 @@ impl<T: Address> TcpConnectorResponse<T> {
req: Some(req), req: Some(req),
port, port,
addrs: None, addrs: None,
stream: Some(TcpStream::connect(&addr)), stream: Some(TcpStream::connect(addr).boxed()),
}, },
either::Either::Right(addrs) => TcpConnectorResponse { either::Either::Right(addrs) => TcpConnectorResponse {
req: Some(req), req: Some(req),
@ -129,40 +134,40 @@ impl<T: Address> TcpConnectorResponse<T> {
} }
impl<T: Address> Future for TcpConnectorResponse<T> { impl<T: Address> Future for TcpConnectorResponse<T> {
type Item = Connection<T, TcpStream>; type Output = Result<Connection<T, TcpStream>, ConnectError>;
type Error = ConnectError;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.project();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// connect // connect
loop { loop {
if let Some(new) = self.stream.as_mut() { if let Some(new) = this.stream.as_mut() {
match new.poll() { match new.as_mut().poll(cx) {
Ok(Async::Ready(sock)) => { Poll::Ready(Ok(sock)) => {
let req = self.req.take().unwrap(); let req = this.req.take().unwrap();
trace!( trace!(
"TCP connector - successfully connected to connecting to {:?} - {:?}", "TCP connector - successfully connected to connecting to {:?} - {:?}",
req.host(), sock.peer_addr() req.host(), sock.peer_addr()
); );
return Ok(Async::Ready(Connection::new(sock, req))); return Poll::Ready(Ok(Connection::new(sock, req)));
} }
Ok(Async::NotReady) => return Ok(Async::NotReady), Poll::Pending => return Poll::Pending,
Err(err) => { Poll::Ready(Err(err)) => {
trace!( trace!(
"TCP connector - failed to connect to connecting to {:?} port: {}", "TCP connector - failed to connect to connecting to {:?} port: {}",
self.req.as_ref().unwrap().host(), this.req.as_ref().unwrap().host(),
self.port, this.port,
); );
if self.addrs.is_none() || self.addrs.as_ref().unwrap().is_empty() { if this.addrs.is_none() || this.addrs.as_ref().unwrap().is_empty() {
return Err(err.into()); return Poll::Ready(Err(err.into()));
} }
} }
} }
} }
// try to connect // try to connect
self.stream = Some(TcpStream::connect( let addr = this.addrs.as_mut().unwrap().pop_front().unwrap();
&self.addrs.as_mut().unwrap().pop_front().unwrap(), *this.stream = Some(TcpStream::connect(addr).boxed());
));
} }
} }
} }

View File

@ -31,12 +31,12 @@ pub use self::resolver::{Resolver, ResolverFactory};
pub use self::service::{ConnectService, ConnectServiceFactory, TcpConnectService}; pub use self::service::{ConnectService, ConnectServiceFactory, TcpConnectService};
use actix_rt::Arbiter; use actix_rt::Arbiter;
use actix_service::{NewService, Service, ServiceExt}; use actix_service::{pipeline, pipeline_factory, Service, ServiceFactory};
use tokio_tcp::TcpStream; use tokio_net::tcp::TcpStream;
pub fn start_resolver(cfg: ResolverConfig, opts: ResolverOpts) -> AsyncResolver { pub fn start_resolver(cfg: ResolverConfig, opts: ResolverOpts) -> AsyncResolver {
let (resolver, bg) = AsyncResolver::new(cfg, opts); let (resolver, bg) = AsyncResolver::new(cfg, opts);
tokio_current_thread::spawn(bg); tokio_executor::current_thread::spawn(bg);
resolver resolver
} }
@ -55,7 +55,7 @@ pub(crate) fn get_default_resolver() -> AsyncResolver {
}; };
let (resolver, bg) = AsyncResolver::new(cfg, opts); let (resolver, bg) = AsyncResolver::new(cfg, opts);
tokio_current_thread::spawn(bg); tokio_executor::current_thread::spawn(bg);
Arbiter::set_item(DefaultResolver(resolver.clone())); Arbiter::set_item(DefaultResolver(resolver.clone()));
resolver resolver
@ -70,37 +70,37 @@ pub fn start_default_resolver() -> AsyncResolver {
pub fn new_connector<T: Address>( pub fn new_connector<T: Address>(
resolver: AsyncResolver, resolver: AsyncResolver,
) -> impl Service<Request = Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError> ) -> impl Service<Request = Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError>
+ Clone { {
Resolver::new(resolver).and_then(TcpConnector::new()) pipeline(Resolver::new(resolver)).and_then(TcpConnector::new())
} }
/// Create tcp connector service /// Create tcp connector service
pub fn new_connector_factory<T: Address>( pub fn new_connector_factory<T: Address>(
resolver: AsyncResolver, resolver: AsyncResolver,
) -> impl NewService< ) -> impl ServiceFactory<
Config = (), Config = (),
Request = Connect<T>, Request = Connect<T>,
Response = Connection<T, TcpStream>, Response = Connection<T, TcpStream>,
Error = ConnectError, Error = ConnectError,
InitError = (), InitError = (),
> + Clone { > {
ResolverFactory::new(resolver).and_then(TcpConnectorFactory::new()) pipeline_factory(ResolverFactory::new(resolver)).and_then(TcpConnectorFactory::new())
} }
/// Create connector service with default parameters /// Create connector service with default parameters
pub fn default_connector<T: Address>( pub fn default_connector<T: Address>(
) -> impl Service<Request = Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError> ) -> impl Service<Request = Connect<T>, Response = Connection<T, TcpStream>, Error = ConnectError>
+ Clone { {
Resolver::default().and_then(TcpConnector::new()) pipeline(Resolver::default()).and_then(TcpConnector::new())
} }
/// Create connector service factory with default parameters /// Create connector service factory with default parameters
pub fn default_connector_factory<T: Address>() -> impl NewService< pub fn default_connector_factory<T: Address>() -> impl ServiceFactory<
Config = (), Config = (),
Request = Connect<T>, Request = Connect<T>,
Response = Connection<T, TcpStream>, Response = Connection<T, TcpStream>,
Error = ConnectError, Error = ConnectError,
InitError = (), InitError = (),
> + Clone { > {
ResolverFactory::default().and_then(TcpConnectorFactory::new()) pipeline_factory(ResolverFactory::default()).and_then(TcpConnectorFactory::new())
} }

View File

@ -1,9 +1,12 @@
use std::future::Future;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_service::{NewService, Service}; use actix_service::{Service, ServiceFactory};
use futures::future::{ok, Either, FutureResult}; use futures::future::{ok, Either, Ready};
use futures::{Async, Future, Poll}; use pin_project::pin_project;
use trust_dns_resolver::lookup_ip::LookupIpFuture; use trust_dns_resolver::lookup_ip::LookupIpFuture;
use trust_dns_resolver::{AsyncResolver, Background}; use trust_dns_resolver::{AsyncResolver, Background};
@ -52,14 +55,14 @@ impl<T> Clone for ResolverFactory<T> {
} }
} }
impl<T: Address> NewService for ResolverFactory<T> { impl<T: Address> ServiceFactory for ResolverFactory<T> {
type Request = Connect<T>; type Request = Connect<T>;
type Response = Connect<T>; type Response = Connect<T>;
type Error = ConnectError; type Error = ConnectError;
type Config = (); type Config = ();
type Service = Resolver<T>; type Service = Resolver<T>;
type InitError = (); type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>; type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: &()) -> Self::Future { fn new_service(&self, _: &()) -> Self::Future {
ok(self.service()) ok(self.service())
@ -104,32 +107,34 @@ impl<T: Address> Service for Resolver<T> {
type Request = Connect<T>; type Request = Connect<T>;
type Response = Connect<T>; type Response = Connect<T>;
type Error = ConnectError; type Error = ConnectError;
type Future = Either<ResolverFuture<T>, FutureResult<Connect<T>, Self::Error>>; type Future = Either<ResolverFuture<T>, Ready<Result<Connect<T>, Self::Error>>>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Ok(Async::Ready(())) Poll::Ready(Ok(()))
} }
fn call(&mut self, mut req: Connect<T>) -> Self::Future { fn call(&mut self, mut req: Connect<T>) -> Self::Future {
if req.addr.is_some() { if req.addr.is_some() {
Either::B(ok(req)) Either::Right(ok(req))
} else if let Ok(ip) = req.host().parse() { } else if let Ok(ip) = req.host().parse() {
req.addr = Some(either::Either::Left(SocketAddr::new(ip, req.port()))); req.addr = Some(either::Either::Left(SocketAddr::new(ip, req.port())));
Either::B(ok(req)) Either::Right(ok(req))
} else { } else {
trace!("DNS resolver: resolving host {:?}", req.host()); trace!("DNS resolver: resolving host {:?}", req.host());
if self.resolver.is_none() { if self.resolver.is_none() {
self.resolver = Some(get_default_resolver()); self.resolver = Some(get_default_resolver());
} }
Either::A(ResolverFuture::new(req, self.resolver.as_ref().unwrap())) Either::Left(ResolverFuture::new(req, self.resolver.as_ref().unwrap()))
} }
} }
} }
#[pin_project]
#[doc(hidden)] #[doc(hidden)]
/// Resolver future /// Resolver future
pub struct ResolverFuture<T: Address> { pub struct ResolverFuture<T: Address> {
req: Option<Connect<T>>, req: Option<Connect<T>>,
#[pin]
lookup: Background<LookupIpFuture>, lookup: Background<LookupIpFuture>,
} }
@ -149,22 +154,15 @@ impl<T: Address> ResolverFuture<T> {
} }
impl<T: Address> Future for ResolverFuture<T> { impl<T: Address> Future for ResolverFuture<T> {
type Item = Connect<T>; type Output = Result<Connect<T>, ConnectError>;
type Error = ConnectError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match self.lookup.poll().map_err(|e| { let this = self.project();
trace!(
"DNS resolver: failed to resolve host {:?} err: {}",
self.req.as_ref().unwrap().host(),
e
);
e
})? {
Async::NotReady => Ok(Async::NotReady),
Async::Ready(ips) => {
let req = self.req.take().unwrap();
match this.lookup.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(ips)) => {
let req = this.req.take().unwrap();
let port = req.port(); let port = req.port();
let req = req.set_addrs(ips.iter().map(|ip| SocketAddr::new(ip, port))); let req = req.set_addrs(ips.iter().map(|ip| SocketAddr::new(ip, port)));
@ -175,11 +173,19 @@ impl<T: Address> Future for ResolverFuture<T> {
); );
if req.addr.is_none() { if req.addr.is_none() {
Err(ConnectError::NoRecords) Poll::Ready(Err(ConnectError::NoRecords))
} else { } else {
Ok(Async::Ready(req)) Poll::Ready(Ok(req))
} }
} }
Poll::Ready(Err(e)) => {
trace!(
"DNS resolver: failed to resolve host {:?} err: {}",
this.req.as_ref().unwrap().host(),
e
);
Poll::Ready(Err(e.into()))
}
} }
} }
} }

View File

@ -1,7 +1,11 @@
use actix_service::{NewService, Service}; use std::future::Future;
use futures::future::{ok, FutureResult}; use std::pin::Pin;
use futures::{try_ready, Async, Future, Poll}; use std::task::{Context, Poll};
use tokio_tcp::TcpStream;
use actix_service::{Service, ServiceFactory};
use either::Either;
use futures::future::{ok, Ready};
use tokio_net::tcp::TcpStream;
use trust_dns_resolver::AsyncResolver; use trust_dns_resolver::AsyncResolver;
use crate::connect::{Address, Connect, Connection}; use crate::connect::{Address, Connect, Connection};
@ -14,7 +18,7 @@ pub struct ConnectServiceFactory<T> {
resolver: ResolverFactory<T>, resolver: ResolverFactory<T>,
} }
impl<T> ConnectServiceFactory<T> { impl<T: Unpin> ConnectServiceFactory<T> {
/// Construct new ConnectService factory /// Construct new ConnectService factory
pub fn new() -> Self { pub fn new() -> Self {
ConnectServiceFactory { ConnectServiceFactory {
@ -66,14 +70,14 @@ impl<T> Clone for ConnectServiceFactory<T> {
} }
} }
impl<T: Address> NewService for ConnectServiceFactory<T> { impl<T: Address + Unpin> ServiceFactory for ConnectServiceFactory<T> {
type Request = Connect<T>; type Request = Connect<T>;
type Response = Connection<T, TcpStream>; type Response = Connection<T, TcpStream>;
type Error = ConnectError; type Error = ConnectError;
type Config = (); type Config = ();
type Service = ConnectService<T>; type Service = ConnectService<T>;
type InitError = (); type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>; type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: &()) -> Self::Future { fn new_service(&self, _: &()) -> Self::Future {
ok(self.service()) ok(self.service())
@ -86,47 +90,66 @@ pub struct ConnectService<T> {
resolver: Resolver<T>, resolver: Resolver<T>,
} }
impl<T: Address> Service for ConnectService<T> { impl<T: Address + Unpin> Service for ConnectService<T> {
type Request = Connect<T>; type Request = Connect<T>;
type Response = Connection<T, TcpStream>; type Response = Connection<T, TcpStream>;
type Error = ConnectError; type Error = ConnectError;
type Future = ConnectServiceResponse<T>; type Future = ConnectServiceResponse<T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Ok(Async::Ready(())) Poll::Ready(Ok(()))
} }
fn call(&mut self, req: Connect<T>) -> Self::Future { fn call(&mut self, req: Connect<T>) -> Self::Future {
ConnectServiceResponse { ConnectServiceResponse {
fut1: Some(self.resolver.call(req)), state: ConnectState::Resolve(self.resolver.call(req)),
fut2: None,
tcp: self.tcp.clone(), tcp: self.tcp.clone(),
} }
} }
} }
pub struct ConnectServiceResponse<T: Address> { enum ConnectState<T: Address + Unpin> {
fut1: Option<<Resolver<T> as Service>::Future>, Resolve(<Resolver<T> as Service>::Future),
fut2: Option<<TcpConnector<T> as Service>::Future>, Connect(<TcpConnector<T> as Service>::Future),
}
impl<T: Address + Unpin> ConnectState<T> {
fn poll(
&mut self,
cx: &mut Context,
) -> Either<Poll<Result<Connection<T, TcpStream>, ConnectError>>, Connect<T>> {
match self {
ConnectState::Resolve(ref mut fut) => match Pin::new(fut).poll(cx) {
Poll::Pending => Either::Left(Poll::Pending),
Poll::Ready(Ok(res)) => Either::Right(res),
Poll::Ready(Err(err)) => Either::Left(Poll::Ready(Err(err))),
},
ConnectState::Connect(ref mut fut) => Either::Left(Pin::new(fut).poll(cx)),
}
}
}
pub struct ConnectServiceResponse<T: Address + Unpin> {
state: ConnectState<T>,
tcp: TcpConnector<T>, tcp: TcpConnector<T>,
} }
impl<T: Address> Future for ConnectServiceResponse<T> { impl<T: Address + Unpin> Future for ConnectServiceResponse<T> {
type Item = Connection<T, TcpStream>; type Output = Result<Connection<T, TcpStream>, ConnectError>;
type Error = ConnectError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
if let Some(ref mut fut) = self.fut1 { let res = match self.state.poll(cx) {
let res = try_ready!(fut.poll()); Either::Right(res) => {
let _ = self.fut1.take(); self.state = ConnectState::Connect(self.tcp.call(res));
self.fut2 = Some(self.tcp.call(res)); self.state.poll(cx)
}
Either::Left(res) => return res,
};
match res {
Either::Left(res) => res,
Either::Right(_) => panic!(),
} }
if let Some(ref mut fut) = self.fut2 {
return fut.poll();
}
Ok(Async::NotReady)
} }
} }
@ -136,48 +159,73 @@ pub struct TcpConnectService<T> {
resolver: Resolver<T>, resolver: Resolver<T>,
} }
impl<T: Address> Service for TcpConnectService<T> { impl<T: Address + Unpin + 'static> Service for TcpConnectService<T> {
type Request = Connect<T>; type Request = Connect<T>;
type Response = TcpStream; type Response = TcpStream;
type Error = ConnectError; type Error = ConnectError;
type Future = TcpConnectServiceResponse<T>; type Future = TcpConnectServiceResponse<T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Ok(Async::Ready(())) Poll::Ready(Ok(()))
} }
fn call(&mut self, req: Connect<T>) -> Self::Future { fn call(&mut self, req: Connect<T>) -> Self::Future {
TcpConnectServiceResponse { TcpConnectServiceResponse {
fut1: Some(self.resolver.call(req)), state: TcpConnectState::Resolve(self.resolver.call(req)),
fut2: None,
tcp: self.tcp.clone(), tcp: self.tcp.clone(),
} }
} }
} }
pub struct TcpConnectServiceResponse<T: Address> { enum TcpConnectState<T: Address + Unpin> {
fut1: Option<<Resolver<T> as Service>::Future>, Resolve(<Resolver<T> as Service>::Future),
fut2: Option<<TcpConnector<T> as Service>::Future>, Connect(<TcpConnector<T> as Service>::Future),
}
impl<T: Address + Unpin> TcpConnectState<T> {
fn poll(
&mut self,
cx: &mut Context,
) -> Either<Poll<Result<TcpStream, ConnectError>>, Connect<T>> {
match self {
TcpConnectState::Resolve(ref mut fut) => match Pin::new(fut).poll(cx) {
Poll::Pending => (),
Poll::Ready(Ok(res)) => return Either::Right(res),
Poll::Ready(Err(err)) => return Either::Left(Poll::Ready(Err(err))),
},
TcpConnectState::Connect(ref mut fut) => {
if let Poll::Ready(res) = Pin::new(fut).poll(cx) {
return match res {
Ok(conn) => Either::Left(Poll::Ready(Ok(conn.into_parts().0))),
Err(err) => Either::Left(Poll::Ready(Err(err))),
};
}
}
}
Either::Left(Poll::Pending)
}
}
pub struct TcpConnectServiceResponse<T: Address + Unpin> {
state: TcpConnectState<T>,
tcp: TcpConnector<T>, tcp: TcpConnector<T>,
} }
impl<T: Address> Future for TcpConnectServiceResponse<T> { impl<T: Address + Unpin> Future for TcpConnectServiceResponse<T> {
type Item = TcpStream; type Output = Result<TcpStream, ConnectError>;
type Error = ConnectError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
if let Some(ref mut fut) = self.fut1 { let res = match self.state.poll(cx) {
let res = try_ready!(fut.poll()); Either::Right(res) => {
let _ = self.fut1.take(); self.state = TcpConnectState::Connect(self.tcp.call(res));
self.fut2 = Some(self.tcp.call(res)); self.state.poll(cx)
}
if let Some(ref mut fut) = self.fut2 {
if let Async::Ready(conn) = fut.poll()? {
return Ok(Async::Ready(conn.into_parts().0));
} }
} Either::Left(res) => return res,
};
Ok(Async::NotReady) match res {
Either::Left(res) => res,
Either::Right(_) => panic!(),
}
} }
} }

View File

@ -1,12 +1,12 @@
//! SSL Services //! SSL Services
#[cfg(feature = "ssl")] #[cfg(feature = "openssl")]
mod openssl; mod openssl;
#[cfg(feature = "ssl")] #[cfg(feature = "openssl")]
pub use self::openssl::{ pub use self::openssl::{
OpensslConnectService, OpensslConnectServiceFactory, OpensslConnector, OpensslConnectService, OpensslConnectServiceFactory, OpensslConnector,
}; };
#[cfg(feature = "rust-tls")] // #[cfg(feature = "rustls")]
mod rustls; // mod rustls;
#[cfg(feature = "rust-tls")] // #[cfg(feature = "rustls")]
pub use self::rustls::RustlsConnector; // pub use self::rustls::RustlsConnector;

View File

@ -1,12 +1,16 @@
use std::future::Future;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{fmt, io}; use std::{fmt, io};
use actix_codec::{AsyncRead, AsyncWrite}; use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::{NewService, Service}; use actix_service::{Service, ServiceFactory};
use futures::{future::ok, future::FutureResult, try_ready, Async, Future, Poll}; use futures::{future::ok, future::Ready, ready};
use openssl::ssl::{HandshakeError, SslConnector}; use open_ssl::ssl::{HandshakeError, SslConnector};
use pin_project::pin_project;
use tokio_net::tcp::TcpStream;
use tokio_openssl::{ConnectAsync, SslConnectorExt, SslStream}; use tokio_openssl::{ConnectAsync, SslConnectorExt, SslStream};
use tokio_tcp::TcpStream;
use trust_dns_resolver::AsyncResolver; use trust_dns_resolver::AsyncResolver;
use crate::{ use crate::{
@ -30,7 +34,7 @@ impl<T, U> OpensslConnector<T, U> {
impl<T, U> OpensslConnector<T, U> impl<T, U> OpensslConnector<T, U>
where where
T: Address, T: Address + Unpin,
U: AsyncRead + AsyncWrite + fmt::Debug, U: AsyncRead + AsyncWrite + fmt::Debug,
{ {
pub fn service( pub fn service(
@ -56,7 +60,7 @@ impl<T, U> Clone for OpensslConnector<T, U> {
} }
} }
impl<T: Address, U> NewService for OpensslConnector<T, U> impl<T: Address + Unpin, U> ServiceFactory for OpensslConnector<T, U>
where where
U: AsyncRead + AsyncWrite + fmt::Debug, U: AsyncRead + AsyncWrite + fmt::Debug,
{ {
@ -66,7 +70,7 @@ where
type Config = (); type Config = ();
type Service = OpensslConnectorService<T, U>; type Service = OpensslConnectorService<T, U>;
type InitError = (); type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>; type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: &()) -> Self::Future { fn new_service(&self, _: &()) -> Self::Future {
ok(OpensslConnectorService { ok(OpensslConnectorService {
@ -90,7 +94,7 @@ impl<T, U> Clone for OpensslConnectorService<T, U> {
} }
} }
impl<T: Address, U> Service for OpensslConnectorService<T, U> impl<T: Address + Unpin, U> Service for OpensslConnectorService<T, U>
where where
U: AsyncRead + AsyncWrite + fmt::Debug, U: AsyncRead + AsyncWrite + fmt::Debug,
{ {
@ -99,8 +103,8 @@ where
type Error = HandshakeError<U>; type Error = HandshakeError<U>;
type Future = ConnectAsyncExt<T, U>; type Future = ConnectAsyncExt<T, U>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Ok(Async::Ready(())) Poll::Ready(Ok(()))
} }
fn call(&mut self, stream: Connection<T, U>) -> Self::Future { fn call(&mut self, stream: Connection<T, U>) -> Self::Future {
@ -113,29 +117,33 @@ where
} }
} }
#[pin_project]
pub struct ConnectAsyncExt<T, U> { pub struct ConnectAsyncExt<T, U> {
#[pin]
fut: ConnectAsync<U>, fut: ConnectAsync<U>,
stream: Option<Connection<T, ()>>, stream: Option<Connection<T, ()>>,
} }
impl<T: Address, U> Future for ConnectAsyncExt<T, U> impl<T: Address + Unpin, U> Future for ConnectAsyncExt<T, U>
where where
U: AsyncRead + AsyncWrite + fmt::Debug, U: AsyncRead + AsyncWrite + fmt::Debug,
{ {
type Item = Connection<T, SslStream<U>>; type Output = Result<Connection<T, SslStream<U>>, HandshakeError<U>>;
type Error = HandshakeError<U>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match self.fut.poll().map_err(|e| { let this = self.project();
trace!("SSL Handshake error: {:?}", e);
e match this.fut.poll(cx) {
})? { Poll::Ready(Ok(stream)) => {
Async::Ready(stream) => { let s = this.stream.take().unwrap();
let s = self.stream.take().unwrap();
trace!("SSL Handshake success: {:?}", s.host()); trace!("SSL Handshake success: {:?}", s.host());
Ok(Async::Ready(s.replace(stream).1)) Poll::Ready(Ok(s.replace(stream).1))
} }
Async::NotReady => Ok(Async::NotReady), Poll::Ready(Err(e)) => {
trace!("SSL Handshake error: {:?}", e);
e
}
Poll::Pending => Poll::Pending,
} }
} }
} }
@ -145,7 +153,7 @@ pub struct OpensslConnectServiceFactory<T> {
openssl: OpensslConnector<T, TcpStream>, openssl: OpensslConnector<T, TcpStream>,
} }
impl<T> OpensslConnectServiceFactory<T> { impl<T: Unpin> OpensslConnectServiceFactory<T> {
/// Construct new OpensslConnectService factory /// Construct new OpensslConnectService factory
pub fn new(connector: SslConnector) -> Self { pub fn new(connector: SslConnector) -> Self {
OpensslConnectServiceFactory { OpensslConnectServiceFactory {
@ -183,14 +191,14 @@ impl<T> Clone for OpensslConnectServiceFactory<T> {
} }
} }
impl<T: Address> NewService for OpensslConnectServiceFactory<T> { impl<T: Address + Unpin> ServiceFactory for OpensslConnectServiceFactory<T> {
type Request = Connect<T>; type Request = Connect<T>;
type Response = SslStream<TcpStream>; type Response = SslStream<TcpStream>;
type Error = ConnectError; type Error = ConnectError;
type Config = (); type Config = ();
type Service = OpensslConnectService<T>; type Service = OpensslConnectService<T>;
type InitError = (); type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>; type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: &()) -> Self::Future { fn new_service(&self, _: &()) -> Self::Future {
ok(self.service()) ok(self.service())
@ -203,14 +211,14 @@ pub struct OpensslConnectService<T> {
openssl: OpensslConnectorService<T, TcpStream>, openssl: OpensslConnectorService<T, TcpStream>,
} }
impl<T: Address> Service for OpensslConnectService<T> { impl<T: Address + Unpin> Service for OpensslConnectService<T> {
type Request = Connect<T>; type Request = Connect<T>;
type Response = SslStream<TcpStream>; type Response = SslStream<TcpStream>;
type Error = ConnectError; type Error = ConnectError;
type Future = OpensslConnectServiceResponse<T>; type Future = OpensslConnectServiceResponse<T>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> { fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Ok(Async::Ready(())) Poll::Ready(Ok(()))
} }
fn call(&mut self, req: Connect<T>) -> Self::Future { fn call(&mut self, req: Connect<T>) -> Self::Future {
@ -222,30 +230,36 @@ impl<T: Address> Service for OpensslConnectService<T> {
} }
} }
pub struct OpensslConnectServiceResponse<T: Address> { pub struct OpensslConnectServiceResponse<T: Address + Unpin> {
fut1: Option<<ConnectService<T> as Service>::Future>, fut1: Option<<ConnectService<T> as Service>::Future>,
fut2: Option<<OpensslConnectorService<T, TcpStream> as Service>::Future>, fut2: Option<<OpensslConnectorService<T, TcpStream> as Service>::Future>,
openssl: OpensslConnectorService<T, TcpStream>, openssl: OpensslConnectorService<T, TcpStream>,
} }
impl<T: Address> Future for OpensslConnectServiceResponse<T> { impl<T: Address + Unpin> Future for OpensslConnectServiceResponse<T> {
type Item = SslStream<TcpStream>; type Output = Result<SslStream<TcpStream>, ConnectError>;
type Error = ConnectError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
if let Some(ref mut fut) = self.fut1 { if let Some(ref mut fut) = self.fut1 {
let res = try_ready!(fut.poll()); match ready!(Pin::new(fut).poll(cx)) {
let _ = self.fut1.take(); Ok(res) => {
self.fut2 = Some(self.openssl.call(res)); let _ = self.fut1.take();
self.fut2 = Some(self.openssl.call(res));
}
Err(e) => return Poll::Ready(Err(e.into())),
}
} }
if let Some(ref mut fut) = self.fut2 { if let Some(ref mut fut) = self.fut2 {
let connect = try_ready!(fut match ready!(Pin::new(fut).poll(cx)) {
.poll() Ok(connect) => Poll::Ready(Ok(connect.into_parts().0)),
.map_err(|e| ConnectError::Io(io::Error::new(io::ErrorKind::Other, e)))); Err(e) => Poll::Ready(Err(ConnectError::Io(io::Error::new(
Ok(Async::Ready(connect.into_parts().0)) io::ErrorKind::Other,
e,
)))),
}
} else { } else {
Ok(Async::NotReady) Poll::Pending
} }
} }
} }

View File

@ -1,13 +1,14 @@
use std::io;
use actix_codec::{BytesCodec, Framed}; use actix_codec::{BytesCodec, Framed};
use actix_server_config::Io; use actix_server_config::Io;
use actix_service::{service_fn, NewService, Service}; use actix_service::{service_fn, Service, ServiceFactory};
use actix_testing::{self as test, TestServer}; use actix_testing::{self as test, TestServer};
use bytes::Bytes; use bytes::Bytes;
use futures::{future::lazy, Future, Sink}; use futures::SinkExt;
use http::{HttpTryFrom, Uri};
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
use actix_connect::{default_connector, Connect}; use actix_connect::Connect;
#[cfg(feature = "ssl")] #[cfg(feature = "ssl")]
#[test] #[test]
@ -43,57 +44,46 @@ fn test_rustls_string() {
assert_eq!(con.peer_addr().unwrap(), srv.addr()); assert_eq!(con.peer_addr().unwrap(), srv.addr());
} }
#[test] #[tokio::test]
fn test_static_str() { async fn test_static_str() {
let srv = TestServer::with(|| { let srv = TestServer::with(|| {
service_fn(|io: Io<tokio_tcp::TcpStream>| { service_fn(|io: Io<tokio_net::tcp::TcpStream>| {
Framed::new(io.into_parts().0, BytesCodec) async {
.send(Bytes::from_static(b"test")) let mut framed = Framed::new(io.into_parts().0, BytesCodec);
.then(|_| Ok::<_, ()>(())) framed.send(Bytes::from_static(b"test")).await?;
Ok::<_, io::Error>(())
}
}) })
}); });
let resolver = test::block_on(lazy( let resolver = actix_connect::start_default_resolver();
|| Ok::<_, ()>(actix_connect::start_default_resolver()), let mut conn = actix_connect::new_connector(resolver.clone());
))
.unwrap();
let mut conn = test::block_on(lazy(|| { let con = conn.call(Connect::with("10", srv.addr())).await.unwrap();
Ok::<_, ()>(actix_connect::new_connector(resolver.clone()))
}))
.unwrap();
let con = test::block_on(conn.call(Connect::with("10", srv.addr()))).unwrap();
assert_eq!(con.peer_addr().unwrap(), srv.addr()); assert_eq!(con.peer_addr().unwrap(), srv.addr());
let connect = Connect::new(srv.host().to_owned()); let connect = Connect::new(srv.host().to_owned());
let mut conn = let mut conn = actix_connect::new_connector(resolver);
test::block_on(lazy(|| Ok::<_, ()>(actix_connect::new_connector(resolver)))).unwrap(); let con = conn.call(connect).await;
let con = test::block_on(conn.call(connect));
assert!(con.is_err()); assert!(con.is_err());
} }
#[test] #[test]
fn test_new_service() { fn test_new_service() {
let srv = TestServer::with(|| { let srv = TestServer::with(|| {
service_fn(|io: Io<tokio_tcp::TcpStream>| { service_fn(|io: Io<tokio_net::tcp::TcpStream>| {
Framed::new(io.into_parts().0, BytesCodec) async {
.send(Bytes::from_static(b"test")) let mut framed = Framed::new(io.into_parts().0, BytesCodec);
.then(|_| Ok::<_, ()>(())) framed.send(Bytes::from_static(b"test")).await?;
Ok::<_, io::Error>(())
}
}) })
}); });
let resolver = test::block_on(lazy(|| { let resolver = test::block_on(async {
Ok::<_, ()>(actix_connect::start_resolver( actix_connect::start_resolver(ResolverConfig::default(), ResolverOpts::default())
ResolverConfig::default(), });
ResolverOpts::default(), let factory = test::block_on(async { actix_connect::new_connector_factory(resolver) });
))
}))
.unwrap();
let factory = test::block_on(lazy(|| {
Ok::<_, ()>(actix_connect::new_connector_factory(resolver))
}))
.unwrap();
let mut conn = test::block_on(factory.new_service(&())).unwrap(); let mut conn = test::block_on(factory.new_service(&())).unwrap();
let con = test::block_on(conn.call(Connect::with("10", srv.addr()))).unwrap(); let con = test::block_on(conn.call(Connect::with("10", srv.addr()))).unwrap();

View File

@ -1,6 +1,6 @@
[package] [package]
name = "actix-rt" name = "actix-rt"
version = "0.2.5" version = "1.0.0-alpha.1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix runtime" description = "Actix runtime"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]

View File

@ -30,8 +30,8 @@ rustls = ["rust-tls", "tokio-rustls", "webpki", "webpki-roots", "actix-server-co
# uds = ["mio-uds", "tokio-uds", "actix-server-config/uds"] # uds = ["mio-uds", "tokio-uds", "actix-server-config/uds"]
[dependencies] [dependencies]
actix-rt = "0.2.2" actix-rt = "1.0.0-alpha.1"
actix-service = "0.4.1" actix-service = "1.0.0-alpha.1"
actix-server-config = "0.2.0" actix-server-config = "0.2.0"
log = "0.4" log = "0.4"
@ -67,5 +67,5 @@ webpki-roots = { version = "0.17", optional = true }
[dev-dependencies] [dev-dependencies]
bytes = "0.4" bytes = "0.4"
actix-codec = "0.1.2" actix-codec = "0.2.0-alpha.1"
env_logger = "0.6" env_logger = "0.6"

View File

@ -1,6 +1,6 @@
[package] [package]
name = "actix-service" name = "actix-service"
version = "0.4.2" version = "1.0.0-alpha.1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix Service" description = "Actix Service"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]

View File

@ -110,7 +110,7 @@ impl<T: ServiceFactory> PipelineFactory<T> {
/// Call another service after call to this one has resolved successfully. /// Call another service after call to this one has resolved successfully.
pub fn and_then<F, U>( pub fn and_then<F, U>(
self, self,
factory: U, factory: F,
) -> PipelineFactory< ) -> PipelineFactory<
impl ServiceFactory< impl ServiceFactory<
Config = T::Config, Config = T::Config,

View File

@ -17,10 +17,10 @@ name = "actix_testing"
path = "src/lib.rs" path = "src/lib.rs"
[dependencies] [dependencies]
actix-rt = "0.2.5" actix-rt = "1.0.0-alpha.1"
actix-server = "0.7.0" actix-server = "0.7.0"
actix-server-config = "0.2.0" actix-server-config = "0.2.0"
actix-service = "0.4.2" actix-service = "1.0.0-alpha.1"
log = "0.4" log = "0.4"
net2 = "0.2" net2 = "0.2"

View File

@ -1,6 +1,6 @@
[package] [package]
name = "actix-utils" name = "actix-utils"
version = "0.5.0" version = "0.5.0-alpha1"
authors = ["Nikolay Kim <fafhrd91@gmail.com>"] authors = ["Nikolay Kim <fafhrd91@gmail.com>"]
description = "Actix utils - various actix net related services" description = "Actix utils - various actix net related services"
keywords = ["network", "framework", "async", "futures"] keywords = ["network", "framework", "async", "futures"]
@ -18,8 +18,8 @@ name = "actix_utils"
path = "src/lib.rs" path = "src/lib.rs"
[dependencies] [dependencies]
actix-service = "0.4.1" actix-service = "1.0.0-alpha.1"
actix-codec = "0.1.2" actix-codec = "0.2.0-alpha.1"
bytes = "0.4" bytes = "0.4"
either = "1.5.2" either = "1.5.2"
futures = "0.3.1" futures = "0.3.1"