From f895c7d18674e7a3498b75289749889437e3bb73 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Thu, 14 Nov 2019 17:29:05 +0600 Subject: [PATCH] migratte openssl and rustls connecttors --- actix-connect/Cargo.toml | 5 +-- actix-connect/src/ssl/mod.rs | 9 ++--- actix-connect/src/ssl/openssl.rs | 60 ++++++++++++++++++-------------- actix-connect/src/ssl/rustls.rs | 53 ++++++++++++++-------------- 4 files changed, 68 insertions(+), 59 deletions(-) diff --git a/actix-connect/Cargo.toml b/actix-connect/Cargo.toml index 2f33f742..4ede79b8 100644 --- a/actix-connect/Cargo.toml +++ b/actix-connect/Cargo.toml @@ -49,11 +49,12 @@ trust-dns-resolver = { version="0.18.0-alpha.1", default-features = false } # openssl open-ssl = { version="0.10", package = "openssl", optional = true } -tokio-openssl = { version="0.3", optional = true } +tokio-openssl = { version = "0.4.0-alpha.6", optional = true } #rustls rust-tls = { version = "0.16.0", package = "rustls", optional = true } -tokio-rustls = { version = "0.10.0", optional = true } +# tokio-rustls = { version = "0.10.0", optional = true } +tokio-rustls = { git = "https://github.com/quininer/tokio-rustls.git", branch = "tokio-0.2", optional = true } webpki = { version = "0.21", optional = true } [dev-dependencies] diff --git a/actix-connect/src/ssl/mod.rs b/actix-connect/src/ssl/mod.rs index 5f1de69d..8b3bd2e8 100644 --- a/actix-connect/src/ssl/mod.rs +++ b/actix-connect/src/ssl/mod.rs @@ -6,7 +6,8 @@ mod openssl; pub use self::openssl::{ OpensslConnectService, OpensslConnectServiceFactory, OpensslConnector, }; -// #[cfg(feature = "rustls")] -// mod rustls; -// #[cfg(feature = "rustls")] -// pub use self::rustls::RustlsConnector; + +#[cfg(feature = "rustls")] +mod rustls; +#[cfg(feature = "rustls")] +pub use self::rustls::RustlsConnector; diff --git a/actix-connect/src/ssl/openssl.rs b/actix-connect/src/ssl/openssl.rs index bbe6099c..491bcc1d 100644 --- a/actix-connect/src/ssl/openssl.rs +++ b/actix-connect/src/ssl/openssl.rs @@ -6,11 +6,11 @@ use std::{fmt, io}; use actix_codec::{AsyncRead, AsyncWrite}; use actix_service::{Service, ServiceFactory}; -use futures::{future::ok, future::Ready, ready}; -use open_ssl::ssl::{HandshakeError, SslConnector}; +use futures::future::{err, ok, Either, FutureExt, LocalBoxFuture, Ready}; +use open_ssl::ssl::SslConnector; use pin_project::pin_project; use tokio_net::tcp::TcpStream; -use tokio_openssl::{ConnectAsync, SslConnectorExt, SslStream}; +use tokio_openssl::{HandshakeError, SslStream}; use trust_dns_resolver::AsyncResolver; use crate::{ @@ -34,15 +34,15 @@ impl OpensslConnector { impl OpensslConnector where - T: Address + Unpin, - U: AsyncRead + AsyncWrite + fmt::Debug, + T: Address + Unpin + 'static, + U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static, { pub fn service( connector: SslConnector, ) -> impl Service< Request = Connection, Response = Connection>, - Error = HandshakeError, + Error = io::Error, > { OpensslConnectorService { connector: connector, @@ -60,13 +60,13 @@ impl Clone for OpensslConnector { } } -impl ServiceFactory for OpensslConnector +impl ServiceFactory for OpensslConnector where - U: AsyncRead + AsyncWrite + fmt::Debug, + U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static, { type Request = Connection; type Response = Connection>; - type Error = HandshakeError; + type Error = io::Error; type Config = (); type Service = OpensslConnectorService; type InitError = (); @@ -94,14 +94,14 @@ impl Clone for OpensslConnectorService { } } -impl Service for OpensslConnectorService +impl Service for OpensslConnectorService where - U: AsyncRead + AsyncWrite + fmt::Debug, + U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static, { type Request = Connection; type Response = Connection>; - type Error = HandshakeError; - type Future = ConnectAsyncExt; + type Error = io::Error; + type Future = Either, Ready>>; fn poll_ready(&mut self, _: &mut Context) -> Poll> { Poll::Ready(Ok(())) @@ -110,9 +110,16 @@ where fn call(&mut self, stream: Connection) -> Self::Future { trace!("SSL Handshake start for: {:?}", stream.host()); let (io, stream) = stream.replace(()); - ConnectAsyncExt { - fut: SslConnectorExt::connect_async(&self.connector, stream.host(), io), - stream: Some(stream), + let host = stream.host().to_string(); + + match self.connector.configure() { + Err(e) => Either::Right(err(io::Error::new(io::ErrorKind::Other, e))), + Ok(config) => Either::Left(ConnectAsyncExt { + fut: async move { tokio_openssl::connect(config, &host, io).await } + .boxed_local(), + stream: Some(stream), + _t: PhantomData, + }), } } } @@ -120,15 +127,16 @@ where #[pin_project] pub struct ConnectAsyncExt { #[pin] - fut: ConnectAsync, + fut: LocalBoxFuture<'static, Result, HandshakeError>>, stream: Option>, + _t: PhantomData, } impl Future for ConnectAsyncExt where - U: AsyncRead + AsyncWrite + fmt::Debug, + U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static, { - type Output = Result>, HandshakeError>; + type Output = Result>, io::Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { let this = self.project(); @@ -141,7 +149,7 @@ where } Poll::Ready(Err(e)) => { trace!("SSL Handshake error: {:?}", e); - e + Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, format!("{}", e)))) } Poll::Pending => Poll::Pending, } @@ -191,7 +199,7 @@ impl Clone for OpensslConnectServiceFactory { } } -impl ServiceFactory for OpensslConnectServiceFactory { +impl ServiceFactory for OpensslConnectServiceFactory { type Request = Connect; type Response = SslStream; type Error = ConnectError; @@ -211,7 +219,7 @@ pub struct OpensslConnectService { openssl: OpensslConnectorService, } -impl Service for OpensslConnectService { +impl Service for OpensslConnectService { type Request = Connect; type Response = SslStream; type Error = ConnectError; @@ -230,7 +238,7 @@ impl Service for OpensslConnectService { } } -pub struct OpensslConnectServiceResponse { +pub struct OpensslConnectServiceResponse { fut1: Option< as Service>::Future>, fut2: Option< as Service>::Future>, openssl: OpensslConnectorService, @@ -239,9 +247,9 @@ pub struct OpensslConnectServiceResponse { impl Future for OpensslConnectServiceResponse { type Output = Result, ConnectError>; - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { if let Some(ref mut fut) = self.fut1 { - match ready!(Pin::new(fut).poll(cx)) { + match futures::ready!(Pin::new(fut).poll(cx)) { Ok(res) => { let _ = self.fut1.take(); self.fut2 = Some(self.openssl.call(res)); @@ -251,7 +259,7 @@ impl Future for OpensslConnectServiceResponse { } if let Some(ref mut fut) = self.fut2 { - match ready!(Pin::new(fut).poll(cx)) { + match futures::ready!(Pin::new(fut).poll(cx)) { Ok(connect) => Poll::Ready(Ok(connect.into_parts().0)), Err(e) => Poll::Ready(Err(ConnectError::Io(io::Error::new( io::ErrorKind::Other, diff --git a/actix-connect/src/ssl/rustls.rs b/actix-connect/src/ssl/rustls.rs index 465ef1e4..c1145ae4 100644 --- a/actix-connect/src/ssl/rustls.rs +++ b/actix-connect/src/ssl/rustls.rs @@ -1,10 +1,13 @@ use std::fmt; +use std::future::Future; use std::marker::PhantomData; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; use actix_codec::{AsyncRead, AsyncWrite}; -use actix_service::{NewService, Service}; -use futures::{future::ok, future::FutureResult, Async, Future, Poll}; -use std::sync::Arc; +use actix_service::{Service, ServiceFactory}; +use futures::future::{ok, Ready}; use tokio_rustls::{client::TlsStream, rustls::ClientConfig, Connect, TlsConnector}; use webpki::DNSNameRef; @@ -27,8 +30,8 @@ impl RustlsConnector { impl RustlsConnector where - T: Address, - U: AsyncRead + AsyncWrite + fmt::Debug, + T: Address + Unpin, + U: AsyncRead + AsyncWrite + Unpin + fmt::Debug, { pub fn service( connector: Arc, @@ -53,9 +56,9 @@ impl Clone for RustlsConnector { } } -impl NewService for RustlsConnector +impl ServiceFactory for RustlsConnector where - U: AsyncRead + AsyncWrite + fmt::Debug, + U: AsyncRead + AsyncWrite + Unpin + fmt::Debug, { type Request = Connection; type Response = Connection>; @@ -63,7 +66,7 @@ where type Config = (); type Service = RustlsConnectorService; type InitError = (); - type Future = FutureResult; + type Future = Ready>; fn new_service(&self, _: &()) -> Self::Future { ok(RustlsConnectorService { @@ -78,17 +81,17 @@ pub struct RustlsConnectorService { _t: PhantomData<(T, U)>, } -impl Service for RustlsConnectorService +impl Service for RustlsConnectorService where - U: AsyncRead + AsyncWrite + fmt::Debug, + U: AsyncRead + AsyncWrite + Unpin + fmt::Debug, { type Request = Connection; type Response = Connection>; type Error = std::io::Error; type Future = ConnectAsyncExt; - fn poll_ready(&mut self) -> Poll<(), Self::Error> { - Ok(Async::Ready(())) + fn poll_ready(&mut self, _: &mut Context) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, stream: Connection) -> Self::Future { @@ -108,24 +111,20 @@ pub struct ConnectAsyncExt { stream: Option>, } -impl Future for ConnectAsyncExt +impl Future for ConnectAsyncExt where - U: AsyncRead + AsyncWrite + fmt::Debug, + U: AsyncRead + AsyncWrite + Unpin + fmt::Debug, { - type Item = Connection>; - type Error = std::io::Error; + type Output = Result>, std::io::Error>; - fn poll(&mut self) -> Poll { - match self.fut.poll().map_err(|e| { - trace!("SSL Handshake error: {:?}", e); - e - })? { - Async::Ready(stream) => { - let s = self.stream.take().unwrap(); + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let this = self.get_mut(); + Poll::Ready( + futures::ready!(Pin::new(&mut this.fut).poll(cx)).map(|stream| { + let s = this.stream.take().unwrap(); trace!("SSL Handshake success: {:?}", s.host()); - Ok(Async::Ready(s.replace(stream).1)) - } - Async::NotReady => Ok(Async::NotReady), - } + s.replace(stream).1 + }), + ) } }