From a87591769ce8962f9d20d67e7e6e9f5beeba95a0 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 14 Nov 2019 10:53:59 +0600 Subject: [PATCH] migrate actix-connect to std::future --- actix-codec/Cargo.toml | 2 +- actix-connect/Cargo.toml | 25 +++-- actix-connect/src/connector.rs | 65 ++++++------ actix-connect/src/lib.rs | 28 ++--- actix-connect/src/resolver.rs | 60 ++++++----- actix-connect/src/service.rs | 152 ++++++++++++++++++---------- actix-connect/src/ssl/mod.rs | 12 +-- actix-connect/src/ssl/openssl.rs | 96 ++++++++++-------- actix-connect/tests/test_connect.rs | 66 +++++------- actix-rt/Cargo.toml | 2 +- actix-server/Cargo.toml | 6 +- actix-service/Cargo.toml | 2 +- actix-service/src/pipeline.rs | 2 +- actix-testing/Cargo.toml | 4 +- actix-utils/Cargo.toml | 6 +- 15 files changed, 297 insertions(+), 231 deletions(-) diff --git a/actix-codec/Cargo.toml b/actix-codec/Cargo.toml index ff672051..51a7b21b 100644 --- a/actix-codec/Cargo.toml +++ b/actix-codec/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-codec" -version = "0.1.2" +version = "0.2.0-alpha.1" authors = ["Nikolay Kim "] description = "Utilities for encoding and decoding frames" keywords = ["network", "framework", "async", "futures"] diff --git a/actix-connect/Cargo.toml b/actix-connect/Cargo.toml index 85b626ae..8999ee43 100644 --- a/actix-connect/Cargo.toml +++ b/actix-connect/Cargo.toml @@ -1,8 +1,8 @@ [package] name = "actix-connect" -version = "0.3.0" +version = "1.0.0-alpha.1" authors = ["Nikolay Kim "] -description = "Actix Connector - tcp connector service" +description = "Actix connect - tcp connector service" keywords = ["network", "framework", "async", "futures"] homepage = "https://actix.rs" repository = "https://github.com/actix/actix-net.git" @@ -14,7 +14,7 @@ edition = "2018" workspace = ".." [package.metadata.docs.rs] -features = ["ssl", "uri"] +features = ["openssl", "rustls", "uri"] [lib] name = "actix_connect" @@ -24,33 +24,35 @@ path = "src/lib.rs" default = ["uri"] # openssl -ssl = ["openssl", "tokio-openssl"] +openssl = ["open-ssl", "tokio-openssl"] #rustls -rust-tls = ["rustls", "tokio-rustls", "webpki"] +rustls = ["rust-tls", "tokio-rustls", "webpki"] # support http::Uri as connect address uri = ["http"] [dependencies] -actix-service = "0.4.0" -actix-codec = "0.1.2" -actix-utils = "0.4.0" -actix-rt = "0.2.5" +actix-service = "1.0.0-alpha.1" +actix-codec = "0.2.0-alpha.1" +actix-utils = "0.5.0-alpha.1" +actix-rt = "1.0.0-alpha.1" derive_more = "0.15" either = "1.5.2" futures = "0.3.1" +pin-project = "0.4.5" http = { version = "0.1.17", optional = true } log = "0.4" tokio-net = "=0.2.0-alpha.6" +tokio-executor = "=0.2.0-alpha.6" trust-dns-resolver = { version="0.18.0-alpha.1", default-features = false } # openssl -openssl = { version="0.10", optional = true } +open-ssl = { version="0.10", package = "openssl", optional = true } tokio-openssl = { version="0.3", optional = true } #rustls -rustls = { version = "0.16.0", optional = true } +rust-tls = { version = "0.16.0", package = "rustls", optional = true } tokio-rustls = { version = "0.10.0", optional = true } webpki = { version = "0.21", optional = true } @@ -58,3 +60,4 @@ webpki = { version = "0.21", optional = true } bytes = "0.4" actix-testing = { version="0.2.0" } actix-server-config = "0.2.0" +tokio = "0.2.0-alpha.6" diff --git a/actix-connect/src/connector.rs b/actix-connect/src/connector.rs index ecb61996..4b811bca 100644 --- a/actix-connect/src/connector.rs +++ b/actix-connect/src/connector.rs @@ -1,11 +1,15 @@ use std::collections::VecDeque; +use std::future::Future; +use std::io; use std::marker::PhantomData; use std::net::SocketAddr; +use std::pin::Pin; +use std::task::{Context, Poll}; -use actix_service::{NewService, Service}; -use futures::future::{err, ok, Either, FutureResult}; -use futures::{Async, Future, Poll}; -use tokio_tcp::{ConnectFuture, TcpStream}; +use actix_service::{Service, ServiceFactory}; +use futures::future::{err, ok, BoxFuture, Either, FutureExt, Ready}; +use pin_project::pin_project; +use tokio_net::tcp::TcpStream; use super::connect::{Address, Connect, Connection}; use super::error::ConnectError; @@ -37,14 +41,14 @@ impl Clone for TcpConnectorFactory { } } -impl NewService for TcpConnectorFactory { +impl ServiceFactory for TcpConnectorFactory { type Request = Connect; type Response = Connection; type Error = ConnectError; type Config = (); type Service = TcpConnector; type InitError = (); - type Future = FutureResult; + type Future = Ready>; fn new_service(&self, _: &()) -> Self::Future { ok(self.service()) @@ -71,10 +75,10 @@ impl Service for TcpConnector { type Request = Connect; type Response = Connection; type Error = ConnectError; - type Future = Either, FutureResult>; + type Future = Either, Ready>>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready(&mut self, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, req: Connect) -> Self::Future { @@ -82,21 +86,22 @@ impl Service for TcpConnector { let Connect { req, addr, .. } = req; if let Some(addr) = addr { - Either::A(TcpConnectorResponse::new(req, port, addr)) + Either::Left(TcpConnectorResponse::new(req, port, addr)) } else { error!("TCP connector: got unresolved address"); - Either::B(err(ConnectError::Unresolverd)) + Either::Right(err(ConnectError::Unresolverd)) } } } +#[pin_project] #[doc(hidden)] /// Tcp stream connector response future pub struct TcpConnectorResponse { req: Option, port: u16, addrs: Option>, - stream: Option, + stream: Option>>, } impl TcpConnectorResponse { @@ -116,7 +121,7 @@ impl TcpConnectorResponse { req: Some(req), port, addrs: None, - stream: Some(TcpStream::connect(&addr)), + stream: Some(TcpStream::connect(addr).boxed()), }, either::Either::Right(addrs) => TcpConnectorResponse { req: Some(req), @@ -129,40 +134,40 @@ impl TcpConnectorResponse { } impl Future for TcpConnectorResponse { - type Item = Connection; - type Error = ConnectError; + type Output = Result, ConnectError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.project(); - fn poll(&mut self) -> Poll { // connect loop { - if let Some(new) = self.stream.as_mut() { - match new.poll() { - Ok(Async::Ready(sock)) => { - let req = self.req.take().unwrap(); + if let Some(new) = this.stream.as_mut() { + match new.as_mut().poll(cx) { + Poll::Ready(Ok(sock)) => { + let req = this.req.take().unwrap(); trace!( "TCP connector - successfully connected to connecting to {:?} - {:?}", req.host(), sock.peer_addr() ); - return Ok(Async::Ready(Connection::new(sock, req))); + return Poll::Ready(Ok(Connection::new(sock, req))); } - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(err) => { + Poll::Pending => return Poll::Pending, + Poll::Ready(Err(err)) => { trace!( "TCP connector - failed to connect to connecting to {:?} port: {}", - self.req.as_ref().unwrap().host(), - self.port, + this.req.as_ref().unwrap().host(), + this.port, ); - if self.addrs.is_none() || self.addrs.as_ref().unwrap().is_empty() { - return Err(err.into()); + if this.addrs.is_none() || this.addrs.as_ref().unwrap().is_empty() { + return Poll::Ready(Err(err.into())); } } } } // try to connect - self.stream = Some(TcpStream::connect( - &self.addrs.as_mut().unwrap().pop_front().unwrap(), - )); + let addr = this.addrs.as_mut().unwrap().pop_front().unwrap(); + *this.stream = Some(TcpStream::connect(addr).boxed()); } } } diff --git a/actix-connect/src/lib.rs b/actix-connect/src/lib.rs index e79c9d1f..6663206a 100644 --- a/actix-connect/src/lib.rs +++ b/actix-connect/src/lib.rs @@ -31,12 +31,12 @@ pub use self::resolver::{Resolver, ResolverFactory}; pub use self::service::{ConnectService, ConnectServiceFactory, TcpConnectService}; use actix_rt::Arbiter; -use actix_service::{NewService, Service, ServiceExt}; -use tokio_tcp::TcpStream; +use actix_service::{pipeline, pipeline_factory, Service, ServiceFactory}; +use tokio_net::tcp::TcpStream; pub fn start_resolver(cfg: ResolverConfig, opts: ResolverOpts) -> AsyncResolver { let (resolver, bg) = AsyncResolver::new(cfg, opts); - tokio_current_thread::spawn(bg); + tokio_executor::current_thread::spawn(bg); resolver } @@ -55,7 +55,7 @@ pub(crate) fn get_default_resolver() -> AsyncResolver { }; let (resolver, bg) = AsyncResolver::new(cfg, opts); - tokio_current_thread::spawn(bg); + tokio_executor::current_thread::spawn(bg); Arbiter::set_item(DefaultResolver(resolver.clone())); resolver @@ -70,37 +70,37 @@ pub fn start_default_resolver() -> AsyncResolver { pub fn new_connector( resolver: AsyncResolver, ) -> impl Service, Response = Connection, Error = ConnectError> - + Clone { - Resolver::new(resolver).and_then(TcpConnector::new()) +{ + pipeline(Resolver::new(resolver)).and_then(TcpConnector::new()) } /// Create tcp connector service pub fn new_connector_factory( resolver: AsyncResolver, -) -> impl NewService< +) -> impl ServiceFactory< Config = (), Request = Connect, Response = Connection, Error = ConnectError, InitError = (), -> + Clone { - ResolverFactory::new(resolver).and_then(TcpConnectorFactory::new()) +> { + pipeline_factory(ResolverFactory::new(resolver)).and_then(TcpConnectorFactory::new()) } /// Create connector service with default parameters pub fn default_connector( ) -> impl Service, Response = Connection, Error = ConnectError> - + Clone { - Resolver::default().and_then(TcpConnector::new()) +{ + pipeline(Resolver::default()).and_then(TcpConnector::new()) } /// Create connector service factory with default parameters -pub fn default_connector_factory() -> impl NewService< +pub fn default_connector_factory() -> impl ServiceFactory< Config = (), Request = Connect, Response = Connection, Error = ConnectError, InitError = (), -> + Clone { - ResolverFactory::default().and_then(TcpConnectorFactory::new()) +> { + pipeline_factory(ResolverFactory::default()).and_then(TcpConnectorFactory::new()) } diff --git a/actix-connect/src/resolver.rs b/actix-connect/src/resolver.rs index 0d0234a2..56df979e 100644 --- a/actix-connect/src/resolver.rs +++ b/actix-connect/src/resolver.rs @@ -1,9 +1,12 @@ +use std::future::Future; use std::marker::PhantomData; use std::net::SocketAddr; +use std::pin::Pin; +use std::task::{Context, Poll}; -use actix_service::{NewService, Service}; -use futures::future::{ok, Either, FutureResult}; -use futures::{Async, Future, Poll}; +use actix_service::{Service, ServiceFactory}; +use futures::future::{ok, Either, Ready}; +use pin_project::pin_project; use trust_dns_resolver::lookup_ip::LookupIpFuture; use trust_dns_resolver::{AsyncResolver, Background}; @@ -52,14 +55,14 @@ impl Clone for ResolverFactory { } } -impl NewService for ResolverFactory { +impl ServiceFactory for ResolverFactory { type Request = Connect; type Response = Connect; type Error = ConnectError; type Config = (); type Service = Resolver; type InitError = (); - type Future = FutureResult; + type Future = Ready>; fn new_service(&self, _: &()) -> Self::Future { ok(self.service()) @@ -104,32 +107,34 @@ impl Service for Resolver { type Request = Connect; type Response = Connect; type Error = ConnectError; - type Future = Either, FutureResult, Self::Error>>; + type Future = Either, Ready, Self::Error>>>; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready(&mut self, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, mut req: Connect) -> Self::Future { if req.addr.is_some() { - Either::B(ok(req)) + Either::Right(ok(req)) } else if let Ok(ip) = req.host().parse() { req.addr = Some(either::Either::Left(SocketAddr::new(ip, req.port()))); - Either::B(ok(req)) + Either::Right(ok(req)) } else { trace!("DNS resolver: resolving host {:?}", req.host()); if self.resolver.is_none() { self.resolver = Some(get_default_resolver()); } - Either::A(ResolverFuture::new(req, self.resolver.as_ref().unwrap())) + Either::Left(ResolverFuture::new(req, self.resolver.as_ref().unwrap())) } } } +#[pin_project] #[doc(hidden)] /// Resolver future pub struct ResolverFuture { req: Option>, + #[pin] lookup: Background, } @@ -149,22 +154,15 @@ impl ResolverFuture { } impl Future for ResolverFuture { - type Item = Connect; - type Error = ConnectError; + type Output = Result, ConnectError>; - fn poll(&mut self) -> Poll { - match self.lookup.poll().map_err(|e| { - trace!( - "DNS resolver: failed to resolve host {:?} err: {}", - self.req.as_ref().unwrap().host(), - e - ); - e - })? { - Async::NotReady => Ok(Async::NotReady), - Async::Ready(ips) => { - let req = self.req.take().unwrap(); + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.project(); + match this.lookup.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(ips)) => { + let req = this.req.take().unwrap(); let port = req.port(); let req = req.set_addrs(ips.iter().map(|ip| SocketAddr::new(ip, port))); @@ -175,11 +173,19 @@ impl Future for ResolverFuture { ); if req.addr.is_none() { - Err(ConnectError::NoRecords) + Poll::Ready(Err(ConnectError::NoRecords)) } else { - Ok(Async::Ready(req)) + Poll::Ready(Ok(req)) } } + Poll::Ready(Err(e)) => { + trace!( + "DNS resolver: failed to resolve host {:?} err: {}", + this.req.as_ref().unwrap().host(), + e + ); + Poll::Ready(Err(e.into())) + } } } } diff --git a/actix-connect/src/service.rs b/actix-connect/src/service.rs index 8bd3cf0d..917c849d 100644 --- a/actix-connect/src/service.rs +++ b/actix-connect/src/service.rs @@ -1,7 +1,11 @@ -use actix_service::{NewService, Service}; -use futures::future::{ok, FutureResult}; -use futures::{try_ready, Async, Future, Poll}; -use tokio_tcp::TcpStream; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use actix_service::{Service, ServiceFactory}; +use either::Either; +use futures::future::{ok, Ready}; +use tokio_net::tcp::TcpStream; use trust_dns_resolver::AsyncResolver; use crate::connect::{Address, Connect, Connection}; @@ -14,7 +18,7 @@ pub struct ConnectServiceFactory { resolver: ResolverFactory, } -impl ConnectServiceFactory { +impl ConnectServiceFactory { /// Construct new ConnectService factory pub fn new() -> Self { ConnectServiceFactory { @@ -66,14 +70,14 @@ impl Clone for ConnectServiceFactory { } } -impl NewService for ConnectServiceFactory { +impl ServiceFactory for ConnectServiceFactory { type Request = Connect; type Response = Connection; type Error = ConnectError; type Config = (); type Service = ConnectService; type InitError = (); - type Future = FutureResult; + type Future = Ready>; fn new_service(&self, _: &()) -> Self::Future { ok(self.service()) @@ -86,47 +90,66 @@ pub struct ConnectService { resolver: Resolver, } -impl Service for ConnectService { +impl Service for ConnectService { type Request = Connect; type Response = Connection; type Error = ConnectError; type Future = ConnectServiceResponse; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready(&mut self, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, req: Connect) -> Self::Future { ConnectServiceResponse { - fut1: Some(self.resolver.call(req)), - fut2: None, + state: ConnectState::Resolve(self.resolver.call(req)), tcp: self.tcp.clone(), } } } -pub struct ConnectServiceResponse { - fut1: Option< as Service>::Future>, - fut2: Option< as Service>::Future>, +enum ConnectState { + Resolve( as Service>::Future), + Connect( as Service>::Future), +} + +impl ConnectState { + fn poll( + &mut self, + cx: &mut Context, + ) -> Either, ConnectError>>, Connect> { + match self { + ConnectState::Resolve(ref mut fut) => match Pin::new(fut).poll(cx) { + Poll::Pending => Either::Left(Poll::Pending), + Poll::Ready(Ok(res)) => Either::Right(res), + Poll::Ready(Err(err)) => Either::Left(Poll::Ready(Err(err))), + }, + ConnectState::Connect(ref mut fut) => Either::Left(Pin::new(fut).poll(cx)), + } + } +} + +pub struct ConnectServiceResponse { + state: ConnectState, tcp: TcpConnector, } -impl Future for ConnectServiceResponse { - type Item = Connection; - type Error = ConnectError; +impl Future for ConnectServiceResponse { + type Output = Result, ConnectError>; - fn poll(&mut self) -> Poll { - if let Some(ref mut fut) = self.fut1 { - let res = try_ready!(fut.poll()); - let _ = self.fut1.take(); - self.fut2 = Some(self.tcp.call(res)); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let res = match self.state.poll(cx) { + Either::Right(res) => { + self.state = ConnectState::Connect(self.tcp.call(res)); + self.state.poll(cx) + } + Either::Left(res) => return res, + }; + + match res { + Either::Left(res) => res, + Either::Right(_) => panic!(), } - - if let Some(ref mut fut) = self.fut2 { - return fut.poll(); - } - - Ok(Async::NotReady) } } @@ -136,48 +159,73 @@ pub struct TcpConnectService { resolver: Resolver, } -impl Service for TcpConnectService { +impl Service for TcpConnectService { type Request = Connect; type Response = TcpStream; type Error = ConnectError; type Future = TcpConnectServiceResponse; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready(&mut self, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, req: Connect) -> Self::Future { TcpConnectServiceResponse { - fut1: Some(self.resolver.call(req)), - fut2: None, + state: TcpConnectState::Resolve(self.resolver.call(req)), tcp: self.tcp.clone(), } } } -pub struct TcpConnectServiceResponse { - fut1: Option< as Service>::Future>, - fut2: Option< as Service>::Future>, +enum TcpConnectState { + Resolve( as Service>::Future), + Connect( as Service>::Future), +} + +impl TcpConnectState { + fn poll( + &mut self, + cx: &mut Context, + ) -> Either>, Connect> { + match self { + TcpConnectState::Resolve(ref mut fut) => match Pin::new(fut).poll(cx) { + Poll::Pending => (), + Poll::Ready(Ok(res)) => return Either::Right(res), + Poll::Ready(Err(err)) => return Either::Left(Poll::Ready(Err(err))), + }, + TcpConnectState::Connect(ref mut fut) => { + if let Poll::Ready(res) = Pin::new(fut).poll(cx) { + return match res { + Ok(conn) => Either::Left(Poll::Ready(Ok(conn.into_parts().0))), + Err(err) => Either::Left(Poll::Ready(Err(err))), + }; + } + } + } + Either::Left(Poll::Pending) + } +} + +pub struct TcpConnectServiceResponse { + state: TcpConnectState, tcp: TcpConnector, } -impl Future for TcpConnectServiceResponse { - type Item = TcpStream; - type Error = ConnectError; +impl Future for TcpConnectServiceResponse { + type Output = Result; - fn poll(&mut self) -> Poll { - if let Some(ref mut fut) = self.fut1 { - let res = try_ready!(fut.poll()); - let _ = self.fut1.take(); - self.fut2 = Some(self.tcp.call(res)); - } - - if let Some(ref mut fut) = self.fut2 { - if let Async::Ready(conn) = fut.poll()? { - return Ok(Async::Ready(conn.into_parts().0)); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let res = match self.state.poll(cx) { + Either::Right(res) => { + self.state = TcpConnectState::Connect(self.tcp.call(res)); + self.state.poll(cx) } - } + Either::Left(res) => return res, + }; - Ok(Async::NotReady) + match res { + Either::Left(res) => res, + Either::Right(_) => panic!(), + } } } diff --git a/actix-connect/src/ssl/mod.rs b/actix-connect/src/ssl/mod.rs index 6c02848c..5f1de69d 100644 --- a/actix-connect/src/ssl/mod.rs +++ b/actix-connect/src/ssl/mod.rs @@ -1,12 +1,12 @@ //! SSL Services -#[cfg(feature = "ssl")] +#[cfg(feature = "openssl")] mod openssl; -#[cfg(feature = "ssl")] +#[cfg(feature = "openssl")] pub use self::openssl::{ OpensslConnectService, OpensslConnectServiceFactory, OpensslConnector, }; -#[cfg(feature = "rust-tls")] -mod rustls; -#[cfg(feature = "rust-tls")] -pub use self::rustls::RustlsConnector; +// #[cfg(feature = "rustls")] +// mod rustls; +// #[cfg(feature = "rustls")] +// pub use self::rustls::RustlsConnector; diff --git a/actix-connect/src/ssl/openssl.rs b/actix-connect/src/ssl/openssl.rs index ff1f846d..bbe6099c 100644 --- a/actix-connect/src/ssl/openssl.rs +++ b/actix-connect/src/ssl/openssl.rs @@ -1,12 +1,16 @@ +use std::future::Future; use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::{fmt, io}; use actix_codec::{AsyncRead, AsyncWrite}; -use actix_service::{NewService, Service}; -use futures::{future::ok, future::FutureResult, try_ready, Async, Future, Poll}; -use openssl::ssl::{HandshakeError, SslConnector}; +use actix_service::{Service, ServiceFactory}; +use futures::{future::ok, future::Ready, ready}; +use open_ssl::ssl::{HandshakeError, SslConnector}; +use pin_project::pin_project; +use tokio_net::tcp::TcpStream; use tokio_openssl::{ConnectAsync, SslConnectorExt, SslStream}; -use tokio_tcp::TcpStream; use trust_dns_resolver::AsyncResolver; use crate::{ @@ -30,7 +34,7 @@ impl OpensslConnector { impl OpensslConnector where - T: Address, + T: Address + Unpin, U: AsyncRead + AsyncWrite + fmt::Debug, { pub fn service( @@ -56,7 +60,7 @@ impl Clone for OpensslConnector { } } -impl NewService for OpensslConnector +impl ServiceFactory for OpensslConnector where U: AsyncRead + AsyncWrite + fmt::Debug, { @@ -66,7 +70,7 @@ where type Config = (); type Service = OpensslConnectorService; type InitError = (); - type Future = FutureResult; + type Future = Ready>; fn new_service(&self, _: &()) -> Self::Future { ok(OpensslConnectorService { @@ -90,7 +94,7 @@ impl Clone for OpensslConnectorService { } } -impl Service for OpensslConnectorService +impl Service for OpensslConnectorService where U: AsyncRead + AsyncWrite + fmt::Debug, { @@ -99,8 +103,8 @@ where type Error = HandshakeError; type Future = ConnectAsyncExt; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready(&mut self, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, stream: Connection) -> Self::Future { @@ -113,29 +117,33 @@ where } } +#[pin_project] pub struct ConnectAsyncExt { + #[pin] fut: ConnectAsync, stream: Option>, } -impl Future for ConnectAsyncExt +impl Future for ConnectAsyncExt where U: AsyncRead + AsyncWrite + fmt::Debug, { - type Item = Connection>; - type Error = HandshakeError; + type Output = Result>, HandshakeError>; - fn poll(&mut self) -> Poll { - match self.fut.poll().map_err(|e| { - trace!("SSL Handshake error: {:?}", e); - e - })? { - Async::Ready(stream) => { - let s = self.stream.take().unwrap(); + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.project(); + + match this.fut.poll(cx) { + Poll::Ready(Ok(stream)) => { + let s = this.stream.take().unwrap(); trace!("SSL Handshake success: {:?}", s.host()); - Ok(Async::Ready(s.replace(stream).1)) + Poll::Ready(Ok(s.replace(stream).1)) } - Async::NotReady => Ok(Async::NotReady), + Poll::Ready(Err(e)) => { + trace!("SSL Handshake error: {:?}", e); + e + } + Poll::Pending => Poll::Pending, } } } @@ -145,7 +153,7 @@ pub struct OpensslConnectServiceFactory { openssl: OpensslConnector, } -impl OpensslConnectServiceFactory { +impl OpensslConnectServiceFactory { /// Construct new OpensslConnectService factory pub fn new(connector: SslConnector) -> Self { OpensslConnectServiceFactory { @@ -183,14 +191,14 @@ impl Clone for OpensslConnectServiceFactory { } } -impl NewService for OpensslConnectServiceFactory { +impl ServiceFactory for OpensslConnectServiceFactory { type Request = Connect; type Response = SslStream; type Error = ConnectError; type Config = (); type Service = OpensslConnectService; type InitError = (); - type Future = FutureResult; + type Future = Ready>; fn new_service(&self, _: &()) -> Self::Future { ok(self.service()) @@ -203,14 +211,14 @@ pub struct OpensslConnectService { openssl: OpensslConnectorService, } -impl Service for OpensslConnectService { +impl Service for OpensslConnectService { type Request = Connect; type Response = SslStream; type Error = ConnectError; type Future = OpensslConnectServiceResponse; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready(&mut self, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, req: Connect) -> Self::Future { @@ -222,30 +230,36 @@ impl Service for OpensslConnectService { } } -pub struct OpensslConnectServiceResponse { +pub struct OpensslConnectServiceResponse { fut1: Option< as Service>::Future>, fut2: Option< as Service>::Future>, openssl: OpensslConnectorService, } -impl Future for OpensslConnectServiceResponse { - type Item = SslStream; - type Error = ConnectError; +impl Future for OpensslConnectServiceResponse { + type Output = Result, ConnectError>; - fn poll(&mut self) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { if let Some(ref mut fut) = self.fut1 { - let res = try_ready!(fut.poll()); - let _ = self.fut1.take(); - self.fut2 = Some(self.openssl.call(res)); + match ready!(Pin::new(fut).poll(cx)) { + Ok(res) => { + let _ = self.fut1.take(); + self.fut2 = Some(self.openssl.call(res)); + } + Err(e) => return Poll::Ready(Err(e.into())), + } } if let Some(ref mut fut) = self.fut2 { - let connect = try_ready!(fut - .poll() - .map_err(|e| ConnectError::Io(io::Error::new(io::ErrorKind::Other, e)))); - Ok(Async::Ready(connect.into_parts().0)) + match ready!(Pin::new(fut).poll(cx)) { + Ok(connect) => Poll::Ready(Ok(connect.into_parts().0)), + Err(e) => Poll::Ready(Err(ConnectError::Io(io::Error::new( + io::ErrorKind::Other, + e, + )))), + } } else { - Ok(Async::NotReady) + Poll::Pending } } } diff --git a/actix-connect/tests/test_connect.rs b/actix-connect/tests/test_connect.rs index 471dd314..da16fbb0 100644 --- a/actix-connect/tests/test_connect.rs +++ b/actix-connect/tests/test_connect.rs @@ -1,13 +1,14 @@ +use std::io; + use actix_codec::{BytesCodec, Framed}; use actix_server_config::Io; -use actix_service::{service_fn, NewService, Service}; +use actix_service::{service_fn, Service, ServiceFactory}; use actix_testing::{self as test, TestServer}; use bytes::Bytes; -use futures::{future::lazy, Future, Sink}; -use http::{HttpTryFrom, Uri}; +use futures::SinkExt; use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; -use actix_connect::{default_connector, Connect}; +use actix_connect::Connect; #[cfg(feature = "ssl")] #[test] @@ -43,57 +44,46 @@ fn test_rustls_string() { assert_eq!(con.peer_addr().unwrap(), srv.addr()); } -#[test] -fn test_static_str() { +#[tokio::test] +async fn test_static_str() { let srv = TestServer::with(|| { - service_fn(|io: Io| { - Framed::new(io.into_parts().0, BytesCodec) - .send(Bytes::from_static(b"test")) - .then(|_| Ok::<_, ()>(())) + service_fn(|io: Io| { + async { + let mut framed = Framed::new(io.into_parts().0, BytesCodec); + framed.send(Bytes::from_static(b"test")).await?; + Ok::<_, io::Error>(()) + } }) }); - let resolver = test::block_on(lazy( - || Ok::<_, ()>(actix_connect::start_default_resolver()), - )) - .unwrap(); + let resolver = actix_connect::start_default_resolver(); + let mut conn = actix_connect::new_connector(resolver.clone()); - let mut conn = test::block_on(lazy(|| { - Ok::<_, ()>(actix_connect::new_connector(resolver.clone())) - })) - .unwrap(); - - let con = test::block_on(conn.call(Connect::with("10", srv.addr()))).unwrap(); + let con = conn.call(Connect::with("10", srv.addr())).await.unwrap(); assert_eq!(con.peer_addr().unwrap(), srv.addr()); let connect = Connect::new(srv.host().to_owned()); - let mut conn = - test::block_on(lazy(|| Ok::<_, ()>(actix_connect::new_connector(resolver)))).unwrap(); - let con = test::block_on(conn.call(connect)); + let mut conn = actix_connect::new_connector(resolver); + let con = conn.call(connect).await; assert!(con.is_err()); } #[test] fn test_new_service() { let srv = TestServer::with(|| { - service_fn(|io: Io| { - Framed::new(io.into_parts().0, BytesCodec) - .send(Bytes::from_static(b"test")) - .then(|_| Ok::<_, ()>(())) + service_fn(|io: Io| { + async { + let mut framed = Framed::new(io.into_parts().0, BytesCodec); + framed.send(Bytes::from_static(b"test")).await?; + Ok::<_, io::Error>(()) + } }) }); - let resolver = test::block_on(lazy(|| { - Ok::<_, ()>(actix_connect::start_resolver( - ResolverConfig::default(), - ResolverOpts::default(), - )) - })) - .unwrap(); - let factory = test::block_on(lazy(|| { - Ok::<_, ()>(actix_connect::new_connector_factory(resolver)) - })) - .unwrap(); + let resolver = test::block_on(async { + actix_connect::start_resolver(ResolverConfig::default(), ResolverOpts::default()) + }); + let factory = test::block_on(async { actix_connect::new_connector_factory(resolver) }); let mut conn = test::block_on(factory.new_service(&())).unwrap(); let con = test::block_on(conn.call(Connect::with("10", srv.addr()))).unwrap(); diff --git a/actix-rt/Cargo.toml b/actix-rt/Cargo.toml index a20684b7..f134bd84 100644 --- a/actix-rt/Cargo.toml +++ b/actix-rt/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-rt" -version = "0.2.5" +version = "1.0.0-alpha.1" authors = ["Nikolay Kim "] description = "Actix runtime" keywords = ["network", "framework", "async", "futures"] diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 2146afe0..a2b4f2d7 100644 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -30,8 +30,8 @@ rustls = ["rust-tls", "tokio-rustls", "webpki", "webpki-roots", "actix-server-co # uds = ["mio-uds", "tokio-uds", "actix-server-config/uds"] [dependencies] -actix-rt = "0.2.2" -actix-service = "0.4.1" +actix-rt = "1.0.0-alpha.1" +actix-service = "1.0.0-alpha.1" actix-server-config = "0.2.0" log = "0.4" @@ -67,5 +67,5 @@ webpki-roots = { version = "0.17", optional = true } [dev-dependencies] bytes = "0.4" -actix-codec = "0.1.2" +actix-codec = "0.2.0-alpha.1" env_logger = "0.6" diff --git a/actix-service/Cargo.toml b/actix-service/Cargo.toml index 00df1178..9741bbfc 100644 --- a/actix-service/Cargo.toml +++ b/actix-service/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-service" -version = "0.4.2" +version = "1.0.0-alpha.1" authors = ["Nikolay Kim "] description = "Actix Service" keywords = ["network", "framework", "async", "futures"] diff --git a/actix-service/src/pipeline.rs b/actix-service/src/pipeline.rs index 6eefdfd4..8c771aba 100644 --- a/actix-service/src/pipeline.rs +++ b/actix-service/src/pipeline.rs @@ -110,7 +110,7 @@ impl PipelineFactory { /// Call another service after call to this one has resolved successfully. pub fn and_then( self, - factory: U, + factory: F, ) -> PipelineFactory< impl ServiceFactory< Config = T::Config, diff --git a/actix-testing/Cargo.toml b/actix-testing/Cargo.toml index 4e139e33..84233bc0 100644 --- a/actix-testing/Cargo.toml +++ b/actix-testing/Cargo.toml @@ -17,10 +17,10 @@ name = "actix_testing" path = "src/lib.rs" [dependencies] -actix-rt = "0.2.5" +actix-rt = "1.0.0-alpha.1" actix-server = "0.7.0" actix-server-config = "0.2.0" -actix-service = "0.4.2" +actix-service = "1.0.0-alpha.1" log = "0.4" net2 = "0.2" diff --git a/actix-utils/Cargo.toml b/actix-utils/Cargo.toml index 9b91931b..67053e12 100644 --- a/actix-utils/Cargo.toml +++ b/actix-utils/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "actix-utils" -version = "0.5.0" +version = "0.5.0-alpha1" authors = ["Nikolay Kim "] description = "Actix utils - various actix net related services" keywords = ["network", "framework", "async", "futures"] @@ -18,8 +18,8 @@ name = "actix_utils" path = "src/lib.rs" [dependencies] -actix-service = "0.4.1" -actix-codec = "0.1.2" +actix-service = "1.0.0-alpha.1" +actix-codec = "0.2.0-alpha.1" bytes = "0.4" either = "1.5.2" futures = "0.3.1"