update openssl impl

This commit is contained in:
Nikolay Kim 2019-11-14 05:37:58 +06:00
parent 210f183aa0
commit b12b3b12a9
5 changed files with 45 additions and 64 deletions

View File

@ -36,7 +36,7 @@ actix-server-config = "0.2.0"
log = "0.4"
num_cpus = "1.0"
pin-project = "0.4.5"
mio = "0.6.19"
net2 = "0.2"
futures = "0.3.1"

View File

@ -13,10 +13,10 @@ mod nativetls;
#[cfg(feature = "nativetls")]
pub use self::nativetls::NativeTlsAcceptor;
#[cfg(feature = "rustls")]
mod rustls;
#[cfg(feature = "rustls")]
pub use self::rustls::RustlsAcceptor;
//#[cfg(feature = "rustls")]
//mod rustls;
//#[cfg(feature = "rustls")]
//pub use self::rustls::RustlsAcceptor;
/// Sets the maximum per-worker concurrent ssl connection establish process.
///

View File

@ -3,10 +3,7 @@ use std::marker::PhantomData;
use std::task::{Context, Poll};
use actix_service::{Service, ServiceFactory};
use futures::{
future::{self, LocalBoxFuture},
FutureExt as _, TryFutureExt as _,
};
use futures::future::{self, FutureExt as _, LocalBoxFuture, TryFutureExt as _};
use native_tls::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_tls::{TlsAcceptor, TlsStream};

View File

@ -1,18 +1,18 @@
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_service::{NewService, Service};
use futures::{future::ok, future::Ready, Future, FutureExt, Poll};
use openssl::ssl::SslAcceptor;
use actix_service::{Service, ServiceFactory};
use futures::future::{ok, FutureExt, LocalBoxFuture, Ready};
use open_ssl::ssl::SslAcceptor;
use pin_project::pin_project;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_openssl::{HandshakeError, SslStream};
use crate::counter::{Counter, CounterGuard};
use crate::ssl::MAX_CONN_COUNTER;
use crate::{Io, Protocol, ServerConfig};
use futures::future::LocalBoxFuture;
use std::io;
use std::pin::Pin;
use std::task::Context;
/// Support `SSL` connections via openssl package
///
@ -41,7 +41,7 @@ impl<T: AsyncRead + AsyncWrite, P> Clone for OpensslAcceptor<T, P> {
}
}
impl<T: AsyncRead + AsyncWrite + Unpin + 'static, P> NewService for OpensslAcceptor<T, P> {
impl<T: AsyncRead + AsyncWrite + Unpin + 'static, P> ServiceFactory for OpensslAcceptor<T, P> {
type Request = Io<T, P>;
type Response = Io<SslStream<T>, P>;
type Error = HandshakeError<T>;
@ -75,22 +75,13 @@ impl<T: AsyncRead + AsyncWrite + Unpin + 'static, P> Service for OpensslAcceptor
type Error = HandshakeError<T>;
type Future = OpensslAcceptorServiceFut<T, P>;
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
unimplemented!()
}
/*
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
if self.conns.available() {
Ok(Async::Ready(()))
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.conns.available(ctx) {
Poll::Ready(Ok(()))
} else {
Ok(Async::NotReady)
Poll::Pending
}
}
*/
fn call(&mut self, req: Self::Request) -> Self::Future {
let (io, params, _) = req.into_parts();
@ -107,10 +98,12 @@ impl<T: AsyncRead + AsyncWrite + Unpin + 'static, P> Service for OpensslAcceptor
}
}
#[pin_project]
pub struct OpensslAcceptorServiceFut<T, P>
where
T: AsyncRead + AsyncWrite,
{
#[pin]
fut: LocalBoxFuture<'static, Result<SslStream<T>, HandshakeError<T>>>,
params: Option<P>,
_guard: CounterGuard,
@ -120,16 +113,10 @@ impl<T: AsyncRead + AsyncWrite, P> Future for OpensslAcceptorServiceFut<T, P> {
type Output = Result<Io<SslStream<T>, P>, HandshakeError<T>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
unimplemented!()
}
let this = self.project();
/*
type Item = Io<SslStream<T>, P>;
type Error = HandshakeError<T>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let io = futures::ready!(self.fut.poll())?;
let proto = if let Some(protos) = io.get_ref().ssl().selected_alpn_protocol() {
let io = futures::ready!(this.fut.poll(cx))?;
let proto = if let Some(protos) = io.ssl().selected_alpn_protocol() {
const H2: &[u8] = b"\x02h2";
const HTTP10: &[u8] = b"\x08http/1.0";
const HTTP11: &[u8] = b"\x08http/1.1";
@ -146,11 +133,7 @@ impl<T: AsyncRead + AsyncWrite, P> Future for OpensslAcceptorServiceFut<T, P> {
} else {
Protocol::Unknown
};
Ok(Async::Ready(Io::from_parts(
io,
self.params.take().unwrap(),
proto,
)))
Poll::Ready(Ok(Io::from_parts(io, this.params.take().unwrap(), proto)))
}
*/
}

View File

@ -1,18 +1,20 @@
use std::future::Future;
use std::io;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use actix_service::{NewService, Service};
use futures::{future::ok, future::FutureResult, Async, Future, Poll};
use rustls::ServerConfig;
use actix_service::{Service, ServiceFactory};
use futures::future::{ok, Ready};
use pin_project::pin_project;
use rust_tls::ServerConfig;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_rustls::{server::TlsStream, Accept, TlsAcceptor};
use crate::counter::{Counter, CounterGuard};
use crate::ssl::MAX_CONN_COUNTER;
use crate::{Io, Protocol, ServerConfig as SrvConfig};
use std::pin::Pin;
use std::task::Context;
/// Support `SSL` connections via rustls package
///
@ -41,7 +43,7 @@ impl<T, P> Clone for RustlsAcceptor<T, P> {
}
}
impl<T: AsyncRead + AsyncWrite, P> NewService for RustlsAcceptor<T, P> {
impl<T: AsyncRead + AsyncWrite, P> ServiceFactory for RustlsAcceptor<T, P> {
type Request = Io<T, P>;
type Response = Io<TlsStream<T>, P>;
type Error = io::Error;
@ -49,7 +51,7 @@ impl<T: AsyncRead + AsyncWrite, P> NewService for RustlsAcceptor<T, P> {
type Config = SrvConfig;
type Service = RustlsAcceptorService<T, P>;
type InitError = ();
type Future = FutureResult<Self::Service, Self::InitError>;
type Future = Ready<Result<Self::Service, Self::InitError>>;
fn new_service(&self, cfg: &SrvConfig) -> Self::Future {
cfg.set_secure();
@ -76,14 +78,11 @@ impl<T: AsyncRead + AsyncWrite, P> Service for RustlsAcceptorService<T, P> {
type Error = io::Error;
type Future = RustlsAcceptorServiceFut<T, P>;
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.conns.available(cx) {
Ok(Async::Ready(()))
Poll::Ready(Ok(()))
} else {
Ok(Async::NotReady)
Poll::Pending
}
}
@ -97,24 +96,26 @@ impl<T: AsyncRead + AsyncWrite, P> Service for RustlsAcceptorService<T, P> {
}
}
#[pin_project]
pub struct RustlsAcceptorServiceFut<T, P>
where
T: AsyncRead + AsyncWrite,
{
#[pin]
fut: Accept<T>,
params: Option<P>,
_guard: CounterGuard,
}
impl<T: AsyncRead + AsyncWrite, P> Future for RustlsAcceptorServiceFut<T, P> {
type Item = Io<TlsStream<T>, P>;
type Error = io::Error;
type Output = Result<Io<TlsStream<T>, P>, io::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let io = futures::try_ready!(self.fut.poll());
Ok(Async::Ready(Io::from_parts(
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.project();
let io = futures::ready!(this.fut.poll(cx));
Poll::Ready(Ok(Io::from_parts(
io,
self.params.take().unwrap(),
this.params.take().unwrap(),
Protocol::Unknown,
)))
}