From d1f8cbc1a89f889e41e86a06f17deb40cda53797 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Sat, 30 Aug 2025 00:00:43 +0100 Subject: [PATCH] feat: PROXYv1 wrapper --- Cargo.lock | 54 +++++++++- actix-http/Cargo.toml | 13 +++ actix-http/examples/haproxy.rs | 45 ++++++++ actix-http/src/h1/dispatcher.rs | 2 + actix-http/src/h1/service.rs | 53 +++++++++ actix-http/src/service.rs | 125 ++++++++++++++++++++++ actix-web/src/middleware/authors-guide.md | 1 - 7 files changed, 287 insertions(+), 6 deletions(-) create mode 100644 actix-http/examples/haproxy.rs diff --git a/Cargo.lock b/Cargo.lock index efa4ba081..a7fa9458a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -75,6 +75,7 @@ version = "3.11.1" dependencies = [ "actix-codec", "actix-http-test", + "actix-proxy-protocol", "actix-rt", "actix-server", "actix-service", @@ -105,6 +106,7 @@ dependencies = [ "local-channel", "memchr", "mime", + "nom 8.0.0", "once_cell", "openssl", "percent-encoding", @@ -222,6 +224,27 @@ dependencies = [ "thiserror", ] +[[package]] +name = "actix-proxy-protocol" +version = "0.0.2" +dependencies = [ + "actix-rt", + "actix-service", + "actix-utils", + "arrayvec", + "bitflags 2.9.3", + "crc32fast", + "futures-core", + "futures-util", + "impl-more 0.3.1", + "itoa", + "nom 8.0.0", + "pin-project-lite", + "smallvec", + "tokio", + "tracing", +] + [[package]] name = "actix-router" version = "0.5.3" @@ -239,9 +262,9 @@ dependencies = [ [[package]] name = "actix-rt" -version = "2.10.0" +version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24eda4e2a6e042aa4e55ac438a2ae052d3b5da0ecf83d7411e1a368946925208" +checksum = "92589714878ca59a7626ea19734f0e07a6a875197eec751bb5d3f99e64998c63" dependencies = [ "actix-macros", "futures-core", @@ -315,7 +338,7 @@ dependencies = [ "futures-core", "http 0.2.12", "http 1.3.1", - "impl-more", + "impl-more 0.1.9", "openssl", "pin-project-lite", "rustls-native-certs", @@ -375,7 +398,7 @@ dependencies = [ "foldhash", "futures-core", "futures-util", - "impl-more", + "impl-more 0.1.9", "itoa", "language-tags", "log", @@ -573,6 +596,12 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "arrayvec" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" + [[package]] name = "assert_matches" version = "1.5.0" @@ -832,7 +861,7 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" dependencies = [ - "nom", + "nom 7.1.3", ] [[package]] @@ -1750,6 +1779,12 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8a5a9a0ff0086c7a148acb942baaabeadf9504d10400b5a05645853729b9cd2" +[[package]] +name = "impl-more" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35a84fd5aa25fae5c0f4a33d9cac2ca017fc622cbd089be2229993514990f870" + [[package]] name = "indexmap" version = "2.11.0" @@ -2069,6 +2104,15 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nom" +version = "8.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df9761775871bdef83bee530e60050f7e54b1105350d6884eb0fb4f46c2f9405" +dependencies = [ + "memchr", +] + [[package]] name = "num-conv" version = "0.1.0" diff --git a/actix-http/Cargo.toml b/actix-http/Cargo.toml index e28e6c400..1d4d7177f 100644 --- a/actix-http/Cargo.toml +++ b/actix-http/Cargo.toml @@ -60,6 +60,9 @@ http2 = ["dep:h2"] # WebSocket protocol implementation ws = ["dep:local-channel", "dep:base64", "dep:rand", "dep:sha1"] +# HAProxy PROXY protocol support +haproxy = ["dep:nom"] + # TLS via OpenSSL openssl = ["__tls", "actix-tls/accept", "actix-tls/openssl"] @@ -93,6 +96,8 @@ __tls = [] [dependencies] actix-codec = "0.5" +# actix-proxy-protocol = "0.0.2" +actix-proxy-protocol = { path = "../../actix-web-lab/actix-proxy-protocol" } actix-rt = { version = "2.2", default-features = false } actix-service = "2" actix-utils = "3" @@ -104,6 +109,7 @@ derive_more = { version = "2", features = ["as_ref", "deref", "deref_mut", "disp encoding_rs = "0.8" foldhash = "0.1" futures-core = { version = "0.3.17", default-features = false, features = ["alloc"] } +futures-util = { version = "0.3.17", default-features = false, features = ["alloc"] } http = "0.2.7" httparse = "1.5.1" httpdate = "1.0.1" @@ -126,6 +132,9 @@ local-channel = { version = "0.1", optional = true } rand = { version = "0.9", optional = true } sha1 = { version = "0.10", optional = true } +# proxy +nom = { version = "8", optional = true } + # openssl/rustls actix-tls = { version = "3.4", default-features = false, optional = true } @@ -165,6 +174,10 @@ workspace = true name = "ws" required-features = ["ws", "rustls-0_23"] +[[example]] +name = "haproxy" +required-features = ["http2", "haproxy"] + [[example]] name = "tls_rustls" required-features = ["http2", "rustls-0_23"] diff --git a/actix-http/examples/haproxy.rs b/actix-http/examples/haproxy.rs new file mode 100644 index 000000000..d8e7cb4d2 --- /dev/null +++ b/actix-http/examples/haproxy.rs @@ -0,0 +1,45 @@ +use std::{io, time::Duration}; + +use actix_http::{Error, HttpService, Request, Response, StatusCode}; +use actix_server::Server; +use bytes::BytesMut; +use futures_util::StreamExt as _; +use http::header::HeaderValue; +use tracing::info; + +#[actix_rt::main] +async fn main() -> io::Result<()> { + env_logger::init_from_env(env_logger::Env::new().default_filter_or("info")); + + Server::build() + .bind("echo", ("127.0.0.1", 8080), || { + HttpService::build() + .client_request_timeout(Duration::from_secs(20)) + .client_disconnect_timeout(Duration::from_secs(20)) + .finish(|mut req: Request| async move { + let mut body = BytesMut::new(); + while let Some(item) = req.payload().next().await { + body.extend_from_slice(&item?); + } + + info!("request body: {body:?}"); + + let res = Response::build(StatusCode::OK) + .insert_header(("x-head", HeaderValue::from_static("dummy value!"))) + .body(body); + + Ok::<_, Error>(res) + }) + .tcp_auto_h2c_proxy_protocol_v1() + })? + .workers(2) + .run() + .await +} + +static_assertions::assert_impl_all!( + tokio::io::BufReader: + tokio::io::AsyncRead, + tokio::io::AsyncWrite, + Unpin, +); diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index 3f0b78af4..caaf15fe1 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -993,6 +993,8 @@ where Poll::Ready(Ok(n)) => { this.flags.remove(Flags::FINISHED); + eprintln!("readbuf contents: {:?}", this.read_buf); + if n == 0 { return Ok(true); } diff --git a/actix-http/src/h1/service.rs b/actix-http/src/h1/service.rs index 2cf76edb2..7ef0d6277 100644 --- a/actix-http/src/h1/service.rs +++ b/actix-http/src/h1/service.rs @@ -91,6 +91,59 @@ where } } +#[cfg(feature = "haproxy")] +impl H1Service, S, B, X, U> +where + S: ServiceFactory, + S::Future: 'static, + S::Error: Into>, + S::InitError: fmt::Debug, + S::Response: Into>, + + B: MessageBody, + + X: ServiceFactory, + X::Future: 'static, + X::Error: Into>, + X::InitError: fmt::Debug, + + U: ServiceFactory< + ( + Request, + Framed, Codec>, + ), + Config = (), + Response = (), + >, + U::Future: 'static, + U::Error: fmt::Display + Into>, + U::InitError: fmt::Debug, +{ + /// Creates TCP stream service from HTTP service that consumes PROXY protocol v1 headers first. + /// + /// The connection info obtained from the PROXY header. + pub fn tcp_proxy_protocol_v1( + self, + ) -> impl ServiceFactory< + TcpStream, + Config = (), + Response = (), + Error = actix_proxy_protocol::v1::TlsError, + InitError = (), + > { + use actix_proxy_protocol::v1::{TlsError, TlsStream}; + + actix_proxy_protocol::v1::Acceptor::new() + .map_init_err(|_| unreachable!("TLS acceptor service factory does not error on init")) + .map_err(TlsError::into_service_error) + .map(|io: TlsStream| { + let peer_addr = io.0.get_ref().peer_addr().ok(); + (io, peer_addr) + }) + .and_then(self.map_err(TlsError::Service)) + } +} + #[cfg(feature = "openssl")] mod openssl { use actix_tls::accept::{ diff --git a/actix-http/src/service.rs b/actix-http/src/service.rs index 3be099d9f..318dc2d03 100644 --- a/actix-http/src/service.rs +++ b/actix-http/src/service.rs @@ -240,6 +240,131 @@ where } } +#[cfg(feature = "haproxy")] +impl HttpService, S, B, X, U> +where + S: ServiceFactory, + S::Future: 'static, + S::Error: Into> + 'static, + S::InitError: fmt::Debug, + S::Response: Into> + 'static, + >::Future: 'static, + + B: MessageBody + 'static, + + X: ServiceFactory, + X::Future: 'static, + X::Error: Into>, + X::InitError: fmt::Debug, + + U: ServiceFactory< + ( + Request, + Framed, h1::Codec>, + ), + Config = (), + Response = (), + >, + U::Future: 'static, + U::Error: fmt::Display + Into>, + U::InitError: fmt::Debug, +{ + /// Creates TCP stream service from HTTP service that consumes PROXY protocol v1 headers first. + /// + /// The connection info obtained from the PROXY header. + pub fn tcp_proxy_protocol_v1( + self, + ) -> impl ServiceFactory< + TcpStream, + Config = (), + Response = (), + Error = actix_proxy_protocol::v1::TlsError, + InitError = (), + > { + use actix_proxy_protocol::v1::{TlsError, TlsStream}; + + actix_proxy_protocol::v1::Acceptor::new() + .map_init_err(|_| unreachable!("TLS acceptor service factory does not error on init")) + .map_err(TlsError::into_service_error) + .map(|io: TlsStream| { + let peer_addr = io.0.get_ref().peer_addr().ok(); + (io, Protocol::Http1, peer_addr) + }) + .and_then(self.map_err(TlsError::Service)) + } +} + +#[cfg(feature = "haproxy")] +impl HttpService, S, B, X, U> +where + S: ServiceFactory, + S::Future: 'static, + S::Error: Into> + 'static, + S::InitError: fmt::Debug, + S::Response: Into> + 'static, + >::Future: 'static, + + B: MessageBody + 'static, + + X: ServiceFactory, + X::Future: 'static, + X::Error: Into>, + X::InitError: fmt::Debug, + + U: ServiceFactory< + ( + Request, + Framed, h1::Codec>, + ), + Config = (), + Response = (), + >, + U::Future: 'static, + U::Error: fmt::Display + Into>, + U::InitError: fmt::Debug, +{ + /// Creates TCP stream service from HTTP service that consumes PROXY protocol v1 headers first. + /// + /// The connection info obtained from the PROXY header. + pub fn tcp_auto_h2c_proxy_protocol_v1( + self, + ) -> impl ServiceFactory< + TcpStream, + Config = (), + Response = (), + Error = actix_proxy_protocol::v1::TlsError, + InitError = (), + > { + use actix_proxy_protocol::v1::{TlsError, TlsStream}; + + actix_proxy_protocol::v1::Acceptor::new() + .map_init_err(|_| unreachable!("TLS acceptor service factory does not error on init")) + .map_err(TlsError::into_service_error) + .and_then(fn_service(move |io: TlsStream| async move { + // subset of HTTP/2 preface defined by RFC 9113 ยง3.4 + // this subset was chosen to maximize likelihood that peeking only once will allow us to + // reliably determine version or else it should fallback to h1 and fail quickly if data + // on the wire is junk + const H2_PREFACE: &[u8] = b"PRI * HTTP/2"; + + let mut buf = [0; 12]; + + // TODO: cannot peak into a bufreader + io.0.get_ref().peek(&mut buf).await.map_err(TlsError::Tls)?; + + let proto = if buf == H2_PREFACE { + Protocol::Http2 + } else { + Protocol::Http1 + }; + + let peer_addr = io.0.get_ref().peer_addr().ok(); + Ok((io, proto, peer_addr)) + })) + .and_then(self.map_err(TlsError::Service)) + } +} + /// Configuration options used when accepting TLS connection. #[cfg(feature = "__tls")] #[derive(Debug, Default)] diff --git a/actix-web/src/middleware/authors-guide.md b/actix-web/src/middleware/authors-guide.md index edf7de385..e073caf73 100644 --- a/actix-web/src/middleware/authors-guide.md +++ b/actix-web/src/middleware/authors-guide.md @@ -16,7 +16,6 @@ Middleware is registered for each App, Scope, or Resource and executed in the re Actix Web's middleware system is built on two main traits: 1. `Transform`: The builder trait that creates the actual Service. It's responsible for: - - Creating new middleware instances - Assembling the middleware chain - Handling initialization errors