migratte openssl and rustls connecttors

This commit is contained in:
Nikolay Kim 2019-11-14 17:29:05 +06:00
parent b6f9a78011
commit f895c7d186
4 changed files with 68 additions and 59 deletions

View File

@ -49,11 +49,12 @@ trust-dns-resolver = { version="0.18.0-alpha.1", default-features = false }
# openssl
open-ssl = { version="0.10", package = "openssl", optional = true }
tokio-openssl = { version="0.3", optional = true }
tokio-openssl = { version = "0.4.0-alpha.6", optional = true }
#rustls
rust-tls = { version = "0.16.0", package = "rustls", optional = true }
tokio-rustls = { version = "0.10.0", optional = true }
# tokio-rustls = { version = "0.10.0", optional = true }
tokio-rustls = { git = "https://github.com/quininer/tokio-rustls.git", branch = "tokio-0.2", optional = true }
webpki = { version = "0.21", optional = true }
[dev-dependencies]

View File

@ -6,7 +6,8 @@ mod openssl;
pub use self::openssl::{
OpensslConnectService, OpensslConnectServiceFactory, OpensslConnector,
};
// #[cfg(feature = "rustls")]
// mod rustls;
// #[cfg(feature = "rustls")]
// pub use self::rustls::RustlsConnector;
#[cfg(feature = "rustls")]
mod rustls;
#[cfg(feature = "rustls")]
pub use self::rustls::RustlsConnector;

View File

@ -6,11 +6,11 @@ use std::{fmt, io};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::{Service, ServiceFactory};
use futures::{future::ok, future::Ready, ready};
use open_ssl::ssl::{HandshakeError, SslConnector};
use futures::future::{err, ok, Either, FutureExt, LocalBoxFuture, Ready};
use open_ssl::ssl::SslConnector;
use pin_project::pin_project;
use tokio_net::tcp::TcpStream;
use tokio_openssl::{ConnectAsync, SslConnectorExt, SslStream};
use tokio_openssl::{HandshakeError, SslStream};
use trust_dns_resolver::AsyncResolver;
use crate::{
@ -34,15 +34,15 @@ impl<T, U> OpensslConnector<T, U> {
impl<T, U> OpensslConnector<T, U>
where
T: Address + Unpin,
U: AsyncRead + AsyncWrite + fmt::Debug,
T: Address + Unpin + 'static,
U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static,
{
pub fn service(
connector: SslConnector,
) -> impl Service<
Request = Connection<T, U>,
Response = Connection<T, SslStream<U>>,
Error = HandshakeError<U>,
Error = io::Error,
> {
OpensslConnectorService {
connector: connector,
@ -60,13 +60,13 @@ impl<T, U> Clone for OpensslConnector<T, U> {
}
}
impl<T: Address + Unpin, U> ServiceFactory for OpensslConnector<T, U>
impl<T: Address + Unpin + 'static, U> ServiceFactory for OpensslConnector<T, U>
where
U: AsyncRead + AsyncWrite + fmt::Debug,
U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static,
{
type Request = Connection<T, U>;
type Response = Connection<T, SslStream<U>>;
type Error = HandshakeError<U>;
type Error = io::Error;
type Config = ();
type Service = OpensslConnectorService<T, U>;
type InitError = ();
@ -94,14 +94,14 @@ impl<T, U> Clone for OpensslConnectorService<T, U> {
}
}
impl<T: Address + Unpin, U> Service for OpensslConnectorService<T, U>
impl<T: Address + Unpin + 'static, U> Service for OpensslConnectorService<T, U>
where
U: AsyncRead + AsyncWrite + fmt::Debug,
U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static,
{
type Request = Connection<T, U>;
type Response = Connection<T, SslStream<U>>;
type Error = HandshakeError<U>;
type Future = ConnectAsyncExt<T, U>;
type Error = io::Error;
type Future = Either<ConnectAsyncExt<T, U>, Ready<Result<Self::Response, Self::Error>>>;
fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
@ -110,9 +110,16 @@ where
fn call(&mut self, stream: Connection<T, U>) -> Self::Future {
trace!("SSL Handshake start for: {:?}", stream.host());
let (io, stream) = stream.replace(());
ConnectAsyncExt {
fut: SslConnectorExt::connect_async(&self.connector, stream.host(), io),
stream: Some(stream),
let host = stream.host().to_string();
match self.connector.configure() {
Err(e) => Either::Right(err(io::Error::new(io::ErrorKind::Other, e))),
Ok(config) => Either::Left(ConnectAsyncExt {
fut: async move { tokio_openssl::connect(config, &host, io).await }
.boxed_local(),
stream: Some(stream),
_t: PhantomData,
}),
}
}
}
@ -120,15 +127,16 @@ where
#[pin_project]
pub struct ConnectAsyncExt<T, U> {
#[pin]
fut: ConnectAsync<U>,
fut: LocalBoxFuture<'static, Result<SslStream<U>, HandshakeError<U>>>,
stream: Option<Connection<T, ()>>,
_t: PhantomData<U>,
}
impl<T: Address + Unpin, U> Future for ConnectAsyncExt<T, U>
where
U: AsyncRead + AsyncWrite + fmt::Debug,
U: AsyncRead + AsyncWrite + Unpin + fmt::Debug + 'static,
{
type Output = Result<Connection<T, SslStream<U>>, HandshakeError<U>>;
type Output = Result<Connection<T, SslStream<U>>, io::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.project();
@ -141,7 +149,7 @@ where
}
Poll::Ready(Err(e)) => {
trace!("SSL Handshake error: {:?}", e);
e
Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, format!("{}", e))))
}
Poll::Pending => Poll::Pending,
}
@ -191,7 +199,7 @@ impl<T> Clone for OpensslConnectServiceFactory<T> {
}
}
impl<T: Address + Unpin> ServiceFactory for OpensslConnectServiceFactory<T> {
impl<T: Address + Unpin + 'static> ServiceFactory for OpensslConnectServiceFactory<T> {
type Request = Connect<T>;
type Response = SslStream<TcpStream>;
type Error = ConnectError;
@ -211,7 +219,7 @@ pub struct OpensslConnectService<T> {
openssl: OpensslConnectorService<T, TcpStream>,
}
impl<T: Address + Unpin> Service for OpensslConnectService<T> {
impl<T: Address + Unpin + 'static> Service for OpensslConnectService<T> {
type Request = Connect<T>;
type Response = SslStream<TcpStream>;
type Error = ConnectError;
@ -230,7 +238,7 @@ impl<T: Address + Unpin> Service for OpensslConnectService<T> {
}
}
pub struct OpensslConnectServiceResponse<T: Address + Unpin> {
pub struct OpensslConnectServiceResponse<T: Address + Unpin + 'static> {
fut1: Option<<ConnectService<T> as Service>::Future>,
fut2: Option<<OpensslConnectorService<T, TcpStream> as Service>::Future>,
openssl: OpensslConnectorService<T, TcpStream>,
@ -239,9 +247,9 @@ pub struct OpensslConnectServiceResponse<T: Address + Unpin> {
impl<T: Address + Unpin> Future for OpensslConnectServiceResponse<T> {
type Output = Result<SslStream<TcpStream>, ConnectError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
if let Some(ref mut fut) = self.fut1 {
match ready!(Pin::new(fut).poll(cx)) {
match futures::ready!(Pin::new(fut).poll(cx)) {
Ok(res) => {
let _ = self.fut1.take();
self.fut2 = Some(self.openssl.call(res));
@ -251,7 +259,7 @@ impl<T: Address + Unpin> Future for OpensslConnectServiceResponse<T> {
}
if let Some(ref mut fut) = self.fut2 {
match ready!(Pin::new(fut).poll(cx)) {
match futures::ready!(Pin::new(fut).poll(cx)) {
Ok(connect) => Poll::Ready(Ok(connect.into_parts().0)),
Err(e) => Poll::Ready(Err(ConnectError::Io(io::Error::new(
io::ErrorKind::Other,

View File

@ -1,10 +1,13 @@
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use actix_codec::{AsyncRead, AsyncWrite};
use actix_service::{NewService, Service};
use futures::{future::ok, future::FutureResult, Async, Future, Poll};
use std::sync::Arc;
use actix_service::{Service, ServiceFactory};
use futures::future::{ok, Ready};
use tokio_rustls::{client::TlsStream, rustls::ClientConfig, Connect, TlsConnector};
use webpki::DNSNameRef;
@ -27,8 +30,8 @@ impl<T, U> RustlsConnector<T, U> {
impl<T, U> RustlsConnector<T, U>
where
T: Address,
U: AsyncRead + AsyncWrite + fmt::Debug,
T: Address + Unpin,
U: AsyncRead + AsyncWrite + Unpin + fmt::Debug,
{
pub fn service(
connector: Arc<ClientConfig>,
@ -53,9 +56,9 @@ impl<T, U> Clone for RustlsConnector<T, U> {
}
}
impl<T: Address, U> NewService for RustlsConnector<T, U>
impl<T: Address + Unpin, U> ServiceFactory for RustlsConnector<T, U>
where
U: AsyncRead + AsyncWrite + fmt::Debug,
U: AsyncRead + AsyncWrite + Unpin + fmt::Debug,
{
type Request = Connection<T, U>;
type Response = Connection<T, TlsStream<U>>;
@ -63,7 +66,7 @@ where
type Config = ();
type Service = RustlsConnectorService<T, U>;
type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>;
type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: &()) -> Self::Future {
ok(RustlsConnectorService {
@ -78,17 +81,17 @@ pub struct RustlsConnectorService<T, U> {
_t: PhantomData<(T, U)>,
}
impl<T: Address, U> Service for RustlsConnectorService<T, U>
impl<T: Address + Unpin, U> Service for RustlsConnectorService<T, U>
where
U: AsyncRead + AsyncWrite + fmt::Debug,
U: AsyncRead + AsyncWrite + Unpin + fmt::Debug,
{
type Request = Connection<T, U>;
type Response = Connection<T, TlsStream<U>>;
type Error = std::io::Error;
type Future = ConnectAsyncExt<T, U>;
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
Ok(Async::Ready(()))
fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, stream: Connection<T, U>) -> Self::Future {
@ -108,24 +111,20 @@ pub struct ConnectAsyncExt<T, U> {
stream: Option<Connection<T, ()>>,
}
impl<T: Address, U> Future for ConnectAsyncExt<T, U>
impl<T: Address + Unpin, U> Future for ConnectAsyncExt<T, U>
where
U: AsyncRead + AsyncWrite + fmt::Debug,
U: AsyncRead + AsyncWrite + Unpin + fmt::Debug,
{
type Item = Connection<T, TlsStream<U>>;
type Error = std::io::Error;
type Output = Result<Connection<T, TlsStream<U>>, std::io::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.fut.poll().map_err(|e| {
trace!("SSL Handshake error: {:?}", e);
e
})? {
Async::Ready(stream) => {
let s = self.stream.take().unwrap();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.get_mut();
Poll::Ready(
futures::ready!(Pin::new(&mut this.fut).poll(cx)).map(|stream| {
let s = this.stream.take().unwrap();
trace!("SSL Handshake success: {:?}", s.host());
Ok(Async::Ready(s.replace(stream).1))
}
Async::NotReady => Ok(Async::NotReady),
}
s.replace(stream).1
}),
)
}
}