mirror of https://github.com/fafhrd91/actix-net
update tokio-openssl and tokio-rustls
This commit is contained in:
parent
0b2d2df3c7
commit
e4359f7589
|
@ -1,8 +1,9 @@
|
|||
# Changes
|
||||
|
||||
## Unreleased - 2020-xx-xx
|
||||
* Update `tokio-openssl` dependency to 0.5
|
||||
* Update `tokio-rustls` dependency to 0.20
|
||||
* Update `tokio-openssl` to `0.6.0`
|
||||
* Update `tokio-rustls` to `0.22`
|
||||
* Update `rustls` to `0.19.0`
|
||||
|
||||
## 2.0.0 - 2020-09-02
|
||||
- No significant changes from `2.0.0-alpha.4`.
|
||||
|
|
|
@ -43,16 +43,16 @@ futures-util = { version = "0.3.4", default-features = false }
|
|||
http = { version = "0.2.2", optional = true }
|
||||
log = "0.4"
|
||||
# FIXME: Use release version
|
||||
trust-dns-proto = { git = "https://github.com/bluejekyll/trust-dns", branch = "main", default-features = false, features = ["tokio-runtime"] }
|
||||
trust-dns-resolver = { git = "https://github.com/bluejekyll/trust-dns", branch = "main", default-features = false, features = ["tokio-runtime", "system-config"] }
|
||||
trust-dns-proto = { git = "https://github.com/bluejekyll/trust-dns", branch = "main" }
|
||||
trust-dns-resolver = { git = "https://github.com/bluejekyll/trust-dns", branch = "main" }
|
||||
|
||||
# openssl
|
||||
open-ssl = { package = "openssl", version = "0.10", optional = true }
|
||||
tokio-openssl = { version = "0.5.0", optional = true }
|
||||
tokio-openssl = { version = "0.6.0", optional = true }
|
||||
|
||||
# rustls
|
||||
rust-tls = { package = "rustls", version = "0.18.0", optional = true }
|
||||
tokio-rustls = { version = "0.20.0", optional = true }
|
||||
rust-tls = { package = "rustls", version = "0.19.0", optional = true }
|
||||
tokio-rustls = { version = "0.22.0", optional = true }
|
||||
webpki = { version = "0.21", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
|
|
|
@ -5,7 +5,7 @@ use std::task::{Context, Poll};
|
|||
use std::{fmt, io};
|
||||
|
||||
pub use open_ssl::ssl::{Error as SslError, SslConnector, SslMethod};
|
||||
pub use tokio_openssl::{HandshakeError, SslStream};
|
||||
pub use tokio_openssl::SslStream;
|
||||
|
||||
use actix_codec::{AsyncRead, AsyncWrite};
|
||||
use actix_rt::net::TcpStream;
|
||||
|
@ -112,8 +112,13 @@ where
|
|||
match self.connector.configure() {
|
||||
Err(e) => Either::Right(err(io::Error::new(io::ErrorKind::Other, e))),
|
||||
Ok(config) => Either::Left(ConnectAsyncExt {
|
||||
fut: async move { tokio_openssl::connect(config, &host, io).await }
|
||||
.boxed_local(),
|
||||
// TODO: unbox this future.
|
||||
fut: Box::pin(async move {
|
||||
let ssl = config.into_ssl(host.as_str())?;
|
||||
let mut io = tokio_openssl::SslStream::new(ssl, io)?;
|
||||
Pin::new(&mut io).connect().await?;
|
||||
Ok(io)
|
||||
}),
|
||||
stream: Some(stream),
|
||||
_t: PhantomData,
|
||||
}),
|
||||
|
@ -122,7 +127,7 @@ where
|
|||
}
|
||||
|
||||
pub struct ConnectAsyncExt<T, U> {
|
||||
fut: LocalBoxFuture<'static, Result<SslStream<U>, HandshakeError<U>>>,
|
||||
fut: LocalBoxFuture<'static, Result<SslStream<U>, SslError>>,
|
||||
stream: Option<Connection<T, ()>>,
|
||||
_t: PhantomData<U>,
|
||||
}
|
||||
|
|
|
@ -2,8 +2,9 @@
|
|||
|
||||
## Unreleased - 2020-xx-xx
|
||||
* move from `tokio-tls` to `tokio-native-tls` for native-tls feature.
|
||||
* Update `tokio-openssl` dependency to 0.5.0
|
||||
* Update `tokio-rustls` dependency to 0.20.0
|
||||
* Update `tokio-openssl` to `0.6.0`
|
||||
* Update `tokio-rustls` to `0.22.0`
|
||||
* Update `rust-tls` to `0.19.0`
|
||||
|
||||
## 2.0.0 - 2020-09-03
|
||||
* `nativetls::NativeTlsAcceptor` is renamed to `nativetls::Acceptor`.
|
||||
|
|
|
@ -43,11 +43,11 @@ futures-util = { version = "0.3.4", default-features = false }
|
|||
|
||||
# openssl
|
||||
open-ssl = { package = "openssl", version = "0.10", optional = true }
|
||||
tokio-openssl = { version = "0.5.0", optional = true }
|
||||
tokio-openssl = { version = "0.6.0", optional = true }
|
||||
|
||||
# rustls
|
||||
rust-tls = { package = "rustls", version = "0.18.0", optional = true }
|
||||
tokio-rustls = { version = "0.20.0", optional = true }
|
||||
rust-tls = { package = "rustls", version = "0.19.0", optional = true }
|
||||
tokio-rustls = { version = "0.22.0", optional = true }
|
||||
webpki = { version = "0.21", optional = true }
|
||||
webpki-roots = { version = "0.20", optional = true }
|
||||
|
||||
|
|
|
@ -6,10 +6,11 @@ use std::task::{Context, Poll};
|
|||
use actix_codec::{AsyncRead, AsyncWrite};
|
||||
use actix_service::{Service, ServiceFactory};
|
||||
use actix_utils::counter::{Counter, CounterGuard};
|
||||
use futures_util::future::{ok, FutureExt, LocalBoxFuture, Ready};
|
||||
use futures_util::future::{ready, Ready};
|
||||
use futures_util::ready;
|
||||
|
||||
pub use open_ssl::ssl::{AlpnError, SslAcceptor, SslAcceptorBuilder};
|
||||
pub use tokio_openssl::{HandshakeError, SslStream};
|
||||
pub use open_ssl::ssl::{AlpnError, Error, Ssl, SslAcceptor};
|
||||
pub use tokio_openssl::SslStream;
|
||||
|
||||
use crate::MAX_CONN_COUNTER;
|
||||
|
||||
|
@ -45,7 +46,7 @@ impl<T: AsyncRead + AsyncWrite> Clone for Acceptor<T> {
|
|||
impl<T: AsyncRead + AsyncWrite + Unpin + 'static> ServiceFactory for Acceptor<T> {
|
||||
type Request = T;
|
||||
type Response = SslStream<T>;
|
||||
type Error = HandshakeError<T>;
|
||||
type Error = Error;
|
||||
type Config = ();
|
||||
type Service = AcceptorService<T>;
|
||||
type InitError = ();
|
||||
|
@ -53,11 +54,11 @@ impl<T: AsyncRead + AsyncWrite + Unpin + 'static> ServiceFactory for Acceptor<T>
|
|||
|
||||
fn new_service(&self, _: ()) -> Self::Future {
|
||||
MAX_CONN_COUNTER.with(|conns| {
|
||||
ok(AcceptorService {
|
||||
ready(Ok(AcceptorService {
|
||||
acceptor: self.acceptor.clone(),
|
||||
conns: conns.clone(),
|
||||
io: PhantomData,
|
||||
})
|
||||
}))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -71,7 +72,7 @@ pub struct AcceptorService<T> {
|
|||
impl<T: AsyncRead + AsyncWrite + Unpin + 'static> Service for AcceptorService<T> {
|
||||
type Request = T;
|
||||
type Response = SslStream<T>;
|
||||
type Error = HandshakeError<T>;
|
||||
type Error = Error;
|
||||
type Future = AcceptorServiceResponse<T>;
|
||||
|
||||
fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
|
@ -83,31 +84,52 @@ impl<T: AsyncRead + AsyncWrite + Unpin + 'static> Service for AcceptorService<T>
|
|||
}
|
||||
|
||||
fn call(&mut self, req: Self::Request) -> Self::Future {
|
||||
let acc = self.acceptor.clone();
|
||||
AcceptorServiceResponse {
|
||||
_guard: self.conns.get(),
|
||||
fut: async move {
|
||||
let acc = acc;
|
||||
tokio_openssl::accept(&acc, req).await
|
||||
}
|
||||
.boxed_local(),
|
||||
}
|
||||
let guard = self.conns.get();
|
||||
let stream = self.ssl_stream(req);
|
||||
AcceptorServiceResponse::Init(Some(stream), Some(guard))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct AcceptorServiceResponse<T>
|
||||
impl<T: AsyncRead + AsyncWrite + Unpin + 'static> AcceptorService<T> {
|
||||
// construct a new SslStream.
|
||||
// At this point the SslStream does not perform any IO.
|
||||
// The handshake would happen later in AcceptorServiceResponse
|
||||
fn ssl_stream(&self, stream: T) -> Result<SslStream<T>, Error> {
|
||||
let ssl = Ssl::new(self.acceptor.context())?;
|
||||
let stream = SslStream::new(ssl, stream)?;
|
||||
Ok(stream)
|
||||
}
|
||||
}
|
||||
|
||||
pub enum AcceptorServiceResponse<T>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite,
|
||||
{
|
||||
fut: LocalBoxFuture<'static, Result<SslStream<T>, HandshakeError<T>>>,
|
||||
_guard: CounterGuard,
|
||||
Init(Option<Result<SslStream<T>, Error>>, Option<CounterGuard>),
|
||||
Accept(Option<SslStream<T>>, Option<CounterGuard>),
|
||||
}
|
||||
|
||||
impl<T: AsyncRead + AsyncWrite + Unpin> Future for AcceptorServiceResponse<T> {
|
||||
type Output = Result<SslStream<T>, HandshakeError<T>>;
|
||||
type Output = Result<SslStream<T>, Error>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let io = futures_util::ready!(Pin::new(&mut self.fut).poll(cx))?;
|
||||
Poll::Ready(Ok(io))
|
||||
loop {
|
||||
match self.as_mut().get_mut() {
|
||||
AcceptorServiceResponse::Init(res, guard) => {
|
||||
let guard = guard.take();
|
||||
let stream = res.take().unwrap()?;
|
||||
let state = AcceptorServiceResponse::Accept(Some(stream), guard);
|
||||
self.as_mut().set(state);
|
||||
}
|
||||
AcceptorServiceResponse::Accept(stream, guard) => {
|
||||
ready!(Pin::new(stream.as_mut().unwrap()).poll_accept(cx))?;
|
||||
// drop counter guard a little early as the accept has finished
|
||||
guard.take();
|
||||
|
||||
let stream = stream.take().unwrap();
|
||||
return Poll::Ready(Ok(stream));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue