feat: PROXYv1 wrapper

This commit is contained in:
Rob Ede 2025-08-30 00:00:43 +01:00
parent 98d7d0b46b
commit d1f8cbc1a8
No known key found for this signature in database
GPG Key ID: F5E3FCAA33CBF062
7 changed files with 287 additions and 6 deletions

54
Cargo.lock generated
View File

@ -75,6 +75,7 @@ version = "3.11.1"
dependencies = [
"actix-codec",
"actix-http-test",
"actix-proxy-protocol",
"actix-rt",
"actix-server",
"actix-service",
@ -105,6 +106,7 @@ dependencies = [
"local-channel",
"memchr",
"mime",
"nom 8.0.0",
"once_cell",
"openssl",
"percent-encoding",
@ -222,6 +224,27 @@ dependencies = [
"thiserror",
]
[[package]]
name = "actix-proxy-protocol"
version = "0.0.2"
dependencies = [
"actix-rt",
"actix-service",
"actix-utils",
"arrayvec",
"bitflags 2.9.3",
"crc32fast",
"futures-core",
"futures-util",
"impl-more 0.3.1",
"itoa",
"nom 8.0.0",
"pin-project-lite",
"smallvec",
"tokio",
"tracing",
]
[[package]]
name = "actix-router"
version = "0.5.3"
@ -239,9 +262,9 @@ dependencies = [
[[package]]
name = "actix-rt"
version = "2.10.0"
version = "2.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24eda4e2a6e042aa4e55ac438a2ae052d3b5da0ecf83d7411e1a368946925208"
checksum = "92589714878ca59a7626ea19734f0e07a6a875197eec751bb5d3f99e64998c63"
dependencies = [
"actix-macros",
"futures-core",
@ -315,7 +338,7 @@ dependencies = [
"futures-core",
"http 0.2.12",
"http 1.3.1",
"impl-more",
"impl-more 0.1.9",
"openssl",
"pin-project-lite",
"rustls-native-certs",
@ -375,7 +398,7 @@ dependencies = [
"foldhash",
"futures-core",
"futures-util",
"impl-more",
"impl-more 0.1.9",
"itoa",
"language-tags",
"log",
@ -573,6 +596,12 @@ dependencies = [
"windows-sys 0.60.2",
]
[[package]]
name = "arrayvec"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
[[package]]
name = "assert_matches"
version = "1.5.0"
@ -832,7 +861,7 @@ version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
dependencies = [
"nom",
"nom 7.1.3",
]
[[package]]
@ -1750,6 +1779,12 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8a5a9a0ff0086c7a148acb942baaabeadf9504d10400b5a05645853729b9cd2"
[[package]]
name = "impl-more"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35a84fd5aa25fae5c0f4a33d9cac2ca017fc622cbd089be2229993514990f870"
[[package]]
name = "indexmap"
version = "2.11.0"
@ -2069,6 +2104,15 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "nom"
version = "8.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df9761775871bdef83bee530e60050f7e54b1105350d6884eb0fb4f46c2f9405"
dependencies = [
"memchr",
]
[[package]]
name = "num-conv"
version = "0.1.0"

View File

@ -60,6 +60,9 @@ http2 = ["dep:h2"]
# WebSocket protocol implementation
ws = ["dep:local-channel", "dep:base64", "dep:rand", "dep:sha1"]
# HAProxy PROXY protocol support
haproxy = ["dep:nom"]
# TLS via OpenSSL
openssl = ["__tls", "actix-tls/accept", "actix-tls/openssl"]
@ -93,6 +96,8 @@ __tls = []
[dependencies]
actix-codec = "0.5"
# actix-proxy-protocol = "0.0.2"
actix-proxy-protocol = { path = "../../actix-web-lab/actix-proxy-protocol" }
actix-rt = { version = "2.2", default-features = false }
actix-service = "2"
actix-utils = "3"
@ -104,6 +109,7 @@ derive_more = { version = "2", features = ["as_ref", "deref", "deref_mut", "disp
encoding_rs = "0.8"
foldhash = "0.1"
futures-core = { version = "0.3.17", default-features = false, features = ["alloc"] }
futures-util = { version = "0.3.17", default-features = false, features = ["alloc"] }
http = "0.2.7"
httparse = "1.5.1"
httpdate = "1.0.1"
@ -126,6 +132,9 @@ local-channel = { version = "0.1", optional = true }
rand = { version = "0.9", optional = true }
sha1 = { version = "0.10", optional = true }
# proxy
nom = { version = "8", optional = true }
# openssl/rustls
actix-tls = { version = "3.4", default-features = false, optional = true }
@ -165,6 +174,10 @@ workspace = true
name = "ws"
required-features = ["ws", "rustls-0_23"]
[[example]]
name = "haproxy"
required-features = ["http2", "haproxy"]
[[example]]
name = "tls_rustls"
required-features = ["http2", "rustls-0_23"]

View File

@ -0,0 +1,45 @@
use std::{io, time::Duration};
use actix_http::{Error, HttpService, Request, Response, StatusCode};
use actix_server::Server;
use bytes::BytesMut;
use futures_util::StreamExt as _;
use http::header::HeaderValue;
use tracing::info;
#[actix_rt::main]
async fn main() -> io::Result<()> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
Server::build()
.bind("echo", ("127.0.0.1", 8080), || {
HttpService::build()
.client_request_timeout(Duration::from_secs(20))
.client_disconnect_timeout(Duration::from_secs(20))
.finish(|mut req: Request| async move {
let mut body = BytesMut::new();
while let Some(item) = req.payload().next().await {
body.extend_from_slice(&item?);
}
info!("request body: {body:?}");
let res = Response::build(StatusCode::OK)
.insert_header(("x-head", HeaderValue::from_static("dummy value!")))
.body(body);
Ok::<_, Error>(res)
})
.tcp_auto_h2c_proxy_protocol_v1()
})?
.workers(2)
.run()
.await
}
static_assertions::assert_impl_all!(
tokio::io::BufReader<tokio::net::TcpStream>:
tokio::io::AsyncRead,
tokio::io::AsyncWrite,
Unpin,
);

View File

@ -993,6 +993,8 @@ where
Poll::Ready(Ok(n)) => {
this.flags.remove(Flags::FINISHED);
eprintln!("readbuf contents: {:?}", this.read_buf);
if n == 0 {
return Ok(true);
}

View File

@ -91,6 +91,59 @@ where
}
}
#[cfg(feature = "haproxy")]
impl<S, B, X, U> H1Service<actix_proxy_protocol::v1::TlsStream<TcpStream>, S, B, X, U>
where
S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Response<BoxBody>>,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>>,
B: MessageBody,
X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static,
X::Error: Into<Response<BoxBody>>,
X::InitError: fmt::Debug,
U: ServiceFactory<
(
Request,
Framed<actix_proxy_protocol::v1::TlsStream<TcpStream>, Codec>,
),
Config = (),
Response = (),
>,
U::Future: 'static,
U::Error: fmt::Display + Into<Response<BoxBody>>,
U::InitError: fmt::Debug,
{
/// Creates TCP stream service from HTTP service that consumes PROXY protocol v1 headers first.
///
/// The connection info obtained from the PROXY header.
pub fn tcp_proxy_protocol_v1(
self,
) -> impl ServiceFactory<
TcpStream,
Config = (),
Response = (),
Error = actix_proxy_protocol::v1::TlsError<std::io::Error, DispatchError>,
InitError = (),
> {
use actix_proxy_protocol::v1::{TlsError, TlsStream};
actix_proxy_protocol::v1::Acceptor::new()
.map_init_err(|_| unreachable!("TLS acceptor service factory does not error on init"))
.map_err(TlsError::into_service_error)
.map(|io: TlsStream<TcpStream>| {
let peer_addr = io.0.get_ref().peer_addr().ok();
(io, peer_addr)
})
.and_then(self.map_err(TlsError::Service))
}
}
#[cfg(feature = "openssl")]
mod openssl {
use actix_tls::accept::{

View File

@ -240,6 +240,131 @@ where
}
}
#[cfg(feature = "haproxy")]
impl<S, B, X, U> HttpService<actix_proxy_protocol::v1::TlsStream<TcpStream>, S, B, X, U>
where
S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Response<BoxBody>> + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
B: MessageBody + 'static,
X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static,
X::Error: Into<Response<BoxBody>>,
X::InitError: fmt::Debug,
U: ServiceFactory<
(
Request,
Framed<actix_proxy_protocol::v1::TlsStream<TcpStream>, h1::Codec>,
),
Config = (),
Response = (),
>,
U::Future: 'static,
U::Error: fmt::Display + Into<Response<BoxBody>>,
U::InitError: fmt::Debug,
{
/// Creates TCP stream service from HTTP service that consumes PROXY protocol v1 headers first.
///
/// The connection info obtained from the PROXY header.
pub fn tcp_proxy_protocol_v1(
self,
) -> impl ServiceFactory<
TcpStream,
Config = (),
Response = (),
Error = actix_proxy_protocol::v1::TlsError<std::io::Error, DispatchError>,
InitError = (),
> {
use actix_proxy_protocol::v1::{TlsError, TlsStream};
actix_proxy_protocol::v1::Acceptor::new()
.map_init_err(|_| unreachable!("TLS acceptor service factory does not error on init"))
.map_err(TlsError::into_service_error)
.map(|io: TlsStream<TcpStream>| {
let peer_addr = io.0.get_ref().peer_addr().ok();
(io, Protocol::Http1, peer_addr)
})
.and_then(self.map_err(TlsError::Service))
}
}
#[cfg(feature = "haproxy")]
impl<S, B, X, U> HttpService<actix_proxy_protocol::v1::TlsStream<TcpStream>, S, B, X, U>
where
S: ServiceFactory<Request, Config = ()>,
S::Future: 'static,
S::Error: Into<Response<BoxBody>> + 'static,
S::InitError: fmt::Debug,
S::Response: Into<Response<B>> + 'static,
<S::Service as Service<Request>>::Future: 'static,
B: MessageBody + 'static,
X: ServiceFactory<Request, Config = (), Response = Request>,
X::Future: 'static,
X::Error: Into<Response<BoxBody>>,
X::InitError: fmt::Debug,
U: ServiceFactory<
(
Request,
Framed<actix_proxy_protocol::v1::TlsStream<TcpStream>, h1::Codec>,
),
Config = (),
Response = (),
>,
U::Future: 'static,
U::Error: fmt::Display + Into<Response<BoxBody>>,
U::InitError: fmt::Debug,
{
/// Creates TCP stream service from HTTP service that consumes PROXY protocol v1 headers first.
///
/// The connection info obtained from the PROXY header.
pub fn tcp_auto_h2c_proxy_protocol_v1(
self,
) -> impl ServiceFactory<
TcpStream,
Config = (),
Response = (),
Error = actix_proxy_protocol::v1::TlsError<std::io::Error, DispatchError>,
InitError = (),
> {
use actix_proxy_protocol::v1::{TlsError, TlsStream};
actix_proxy_protocol::v1::Acceptor::new()
.map_init_err(|_| unreachable!("TLS acceptor service factory does not error on init"))
.map_err(TlsError::into_service_error)
.and_then(fn_service(move |io: TlsStream<TcpStream>| async move {
// subset of HTTP/2 preface defined by RFC 9113 §3.4
// this subset was chosen to maximize likelihood that peeking only once will allow us to
// reliably determine version or else it should fallback to h1 and fail quickly if data
// on the wire is junk
const H2_PREFACE: &[u8] = b"PRI * HTTP/2";
let mut buf = [0; 12];
// TODO: cannot peak into a bufreader
io.0.get_ref().peek(&mut buf).await.map_err(TlsError::Tls)?;
let proto = if buf == H2_PREFACE {
Protocol::Http2
} else {
Protocol::Http1
};
let peer_addr = io.0.get_ref().peer_addr().ok();
Ok((io, proto, peer_addr))
}))
.and_then(self.map_err(TlsError::Service))
}
}
/// Configuration options used when accepting TLS connection.
#[cfg(feature = "__tls")]
#[derive(Debug, Default)]

View File

@ -16,7 +16,6 @@ Middleware is registered for each App, Scope, or Resource and executed in the re
Actix Web's middleware system is built on two main traits:
1. `Transform<S, Req>`: The builder trait that creates the actual Service. It's responsible for:
- Creating new middleware instances
- Assembling the middleware chain
- Handling initialization errors