Fix mistake

This commit is contained in:
SleeplessOne1917 2024-05-02 01:12:28 -04:00
parent b88733d84f
commit 6f50d110bf
2 changed files with 155 additions and 117 deletions

View File

@ -38,6 +38,7 @@ pub(crate) static MAX_CONN: AtomicUsize = AtomicUsize::new(256);
feature = "rustls-0_20", feature = "rustls-0_20",
feature = "rustls-0_21", feature = "rustls-0_21",
feature = "rustls-0_22", feature = "rustls-0_22",
feature = "rustls-0_23",
feature = "native-tls", feature = "native-tls",
))] ))]
pub(crate) const DEFAULT_TLS_HANDSHAKE_TIMEOUT: std::time::Duration = pub(crate) const DEFAULT_TLS_HANDSHAKE_TIMEOUT: std::time::Duration =

View File

@ -1,161 +1,198 @@
//! Rustls based connector service. //! `rustls` v0.23 based TLS connection acceptor service.
//! //!
//! See [`TlsConnector`] for main connector service factory docs. //! See [`Acceptor`] for main service factory docs.
use std::{ use std::{
convert::Infallible,
future::Future, future::Future,
io, io::{self, IoSlice},
pin::Pin, pin::Pin,
sync::Arc, sync::Arc,
task::{Context, Poll}, task::{Context, Poll},
time::Duration,
}; };
use actix_rt::net::ActixStream; use actix_rt::{
use actix_service::{Service, ServiceFactory}; net::{ActixStream, Ready},
use actix_utils::future::{ok, Ready}; time::{sleep, Sleep},
use futures_core::ready;
use rustls_pki_types_1::ServerName;
use tokio_rustls::{
client::TlsStream as AsyncTlsStream, rustls::ClientConfig, Connect as RustlsConnect,
TlsConnector as RustlsTlsConnector,
}; };
use actix_service::{Service, ServiceFactory};
use actix_utils::{
counter::{Counter, CounterGuard},
future::{ready, Ready as FutReady},
};
use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_rustls::{Accept, TlsAcceptor};
use tokio_rustls_026 as tokio_rustls; use tokio_rustls_026 as tokio_rustls;
use crate::connect::{Connection, Host}; use super::{TlsError, DEFAULT_TLS_HANDSHAKE_TIMEOUT, MAX_CONN_COUNTER};
pub mod reexports { pub mod reexports {
//! Re-exports from the `rustls` v0.23 ecosystem that are useful for connectors. //! Re-exports from `rustls` that are useful for acceptors.
pub use tokio_rustls_026::{client::TlsStream as AsyncTlsStream, rustls::ClientConfig}; pub use tokio_rustls_026::rustls::ServerConfig;
#[cfg(feature = "rustls-0_23-webpki-roots")]
pub use webpki_roots_026::TLS_SERVER_ROOTS;
} }
/// Returns root certificates via `rustls-native-certs` crate as a rustls certificate store. /// Wraps a `rustls` based async TLS stream in order to implement [`ActixStream`].
/// pub struct TlsStream<IO>(tokio_rustls::server::TlsStream<IO>);
/// See [`rustls_native_certs::load_native_certs()`] for more info on behavior and errors.
#[cfg(feature = "rustls-0_23-native-roots")]
pub fn native_roots_cert_store() -> io::Result<tokio_rustls::rustls::RootCertStore> {
let mut root_certs = tokio_rustls::rustls::RootCertStore::empty();
for cert in rustls_native_certs_07::load_native_certs()? { impl_more::impl_from!(<IO> in tokio_rustls::server::TlsStream<IO> => TlsStream<IO>);
root_certs.add(cert).unwrap(); impl_more::impl_deref_and_mut!(<IO> in TlsStream<IO> => tokio_rustls::server::TlsStream<IO>);
}
Ok(root_certs) impl<IO: ActixStream> AsyncRead for TlsStream<IO> {
} fn poll_read(
self: Pin<&mut Self>,
/// Returns standard root certificates from `webpki-roots` crate as a rustls certificate store. cx: &mut Context<'_>,
#[cfg(feature = "rustls-0_23-webpki-roots")] buf: &mut ReadBuf<'_>,
pub fn webpki_roots_cert_store() -> tokio_rustls::rustls::RootCertStore { ) -> Poll<io::Result<()>> {
let mut root_certs = tokio_rustls::rustls::RootCertStore::empty(); Pin::new(&mut **self.get_mut()).poll_read(cx, buf)
root_certs.extend(webpki_roots_026::TLS_SERVER_ROOTS.to_owned());
root_certs
}
/// Connector service factory using `rustls`.
#[derive(Clone)]
pub struct TlsConnector {
connector: Arc<ClientConfig>,
}
impl TlsConnector {
/// Constructs new connector service factory from a `rustls` client configuration.
pub fn new(connector: Arc<ClientConfig>) -> Self {
TlsConnector { connector }
}
/// Constructs new connector service from a `rustls` client configuration.
pub fn service(connector: Arc<ClientConfig>) -> TlsConnectorService {
TlsConnectorService { connector }
} }
} }
impl<R, IO> ServiceFactory<Connection<R, IO>> for TlsConnector impl<IO: ActixStream> AsyncWrite for TlsStream<IO> {
where fn poll_write(
R: Host, self: Pin<&mut Self>,
IO: ActixStream + 'static, cx: &mut Context<'_>,
{ buf: &[u8],
type Response = Connection<R, AsyncTlsStream<IO>>; ) -> Poll<io::Result<usize>> {
type Error = io::Error; Pin::new(&mut **self.get_mut()).poll_write(cx, buf)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut **self.get_mut()).poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut **self.get_mut()).poll_shutdown(cx)
}
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
Pin::new(&mut **self.get_mut()).poll_write_vectored(cx, bufs)
}
fn is_write_vectored(&self) -> bool {
(**self).is_write_vectored()
}
}
impl<IO: ActixStream> ActixStream for TlsStream<IO> {
fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
IO::poll_read_ready((**self).get_ref().0, cx)
}
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<Ready>> {
IO::poll_write_ready((**self).get_ref().0, cx)
}
}
/// Accept TLS connections via the `rustls` crate.
pub struct Acceptor {
config: Arc<reexports::ServerConfig>,
handshake_timeout: Duration,
}
impl Acceptor {
/// Constructs `rustls` based acceptor service factory.
pub fn new(config: reexports::ServerConfig) -> Self {
Acceptor {
config: Arc::new(config),
handshake_timeout: DEFAULT_TLS_HANDSHAKE_TIMEOUT,
}
}
/// Limit the amount of time that the acceptor will wait for a TLS handshake to complete.
///
/// Default timeout is 3 seconds.
pub fn set_handshake_timeout(&mut self, handshake_timeout: Duration) -> &mut Self {
self.handshake_timeout = handshake_timeout;
self
}
}
impl Clone for Acceptor {
fn clone(&self) -> Self {
Self {
config: self.config.clone(),
handshake_timeout: self.handshake_timeout,
}
}
}
impl<IO: ActixStream> ServiceFactory<IO> for Acceptor {
type Response = TlsStream<IO>;
type Error = TlsError<io::Error, Infallible>;
type Config = (); type Config = ();
type Service = TlsConnectorService; type Service = AcceptorService;
type InitError = (); type InitError = ();
type Future = Ready<Result<Self::Service, Self::InitError>>; type Future = FutReady<Result<Self::Service, Self::InitError>>;
fn new_service(&self, _: ()) -> Self::Future { fn new_service(&self, _: ()) -> Self::Future {
ok(TlsConnectorService { let res = MAX_CONN_COUNTER.with(|conns| {
connector: self.connector.clone(), Ok(AcceptorService {
}) acceptor: self.config.clone().into(),
conns: conns.clone(),
handshake_timeout: self.handshake_timeout,
})
});
ready(res)
} }
} }
/// Connector service using `rustls`. /// Rustls based acceptor service.
#[derive(Clone)] pub struct AcceptorService {
pub struct TlsConnectorService { acceptor: TlsAcceptor,
connector: Arc<ClientConfig>, conns: Counter,
handshake_timeout: Duration,
} }
impl<R, IO> Service<Connection<R, IO>> for TlsConnectorService impl<IO: ActixStream> Service<IO> for AcceptorService {
where type Response = TlsStream<IO>;
R: Host, type Error = TlsError<io::Error, Infallible>;
IO: ActixStream, type Future = AcceptFut<IO>;
{
type Response = Connection<R, AsyncTlsStream<IO>>;
type Error = io::Error;
type Future = ConnectFut<R, IO>;
actix_service::always_ready!(); fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if self.conns.available(cx) {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
fn call(&self, connection: Connection<R, IO>) -> Self::Future { fn call(&self, req: IO) -> Self::Future {
tracing::trace!("TLS handshake start for: {:?}", connection.hostname()); AcceptFut {
let (stream, conn) = connection.replace_io(()); fut: self.acceptor.accept(req),
timeout: sleep(self.handshake_timeout),
match ServerName::try_from(conn.hostname()) { _guard: self.conns.get(),
Ok(host) => ConnectFut::Future {
connect: RustlsTlsConnector::from(Arc::clone(&self.connector))
.connect(host.to_owned(), stream),
connection: Some(conn),
},
Err(_) => ConnectFut::InvalidServerName,
} }
} }
} }
/// Connect future for Rustls service. pin_project! {
#[doc(hidden)] /// Accept future for Rustls service.
#[allow(clippy::large_enum_variant)] #[doc(hidden)]
pub enum ConnectFut<R, IO> { pub struct AcceptFut<IO: ActixStream> {
InvalidServerName, fut: Accept<IO>,
Future { #[pin]
connect: RustlsConnect<IO>, timeout: Sleep,
connection: Option<Connection<R, ()>>, _guard: CounterGuard,
}, }
} }
impl<R, IO> Future for ConnectFut<R, IO> impl<IO: ActixStream> Future for AcceptFut<IO> {
where type Output = Result<TlsStream<IO>, TlsError<io::Error, Infallible>>;
R: Host,
IO: ActixStream,
{
type Output = io::Result<Connection<R, AsyncTlsStream<IO>>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.get_mut() { let mut this = self.project();
Self::InvalidServerName => Poll::Ready(Err(io::Error::new( match Pin::new(&mut this.fut).poll(cx) {
io::ErrorKind::InvalidInput, Poll::Ready(Ok(stream)) => Poll::Ready(Ok(TlsStream(stream))),
"connection parameters specified invalid server name", Poll::Ready(Err(err)) => Poll::Ready(Err(TlsError::Tls(err))),
))), Poll::Pending => this.timeout.poll(cx).map(|_| Err(TlsError::Timeout)),
Self::Future {
connect,
connection,
} => {
let stream = ready!(Pin::new(connect).poll(cx))?;
let connection = connection.take().unwrap();
tracing::trace!("TLS handshake success: {:?}", connection.hostname());
Poll::Ready(Ok(connection.replace_io(stream).1))
}
} }
} }
} }