Compare commits

...

2 Commits

Author SHA1 Message Date
Rob Ede 22966602d3
Merge e67d4521e5 into b8ff2a47a6 2025-08-29 04:32:56 +01:00
Rob Ede e67d4521e5
parse v1 header from tcpstream 2025-08-29 04:32:49 +01:00
8 changed files with 262 additions and 37 deletions

View File

@ -2,9 +2,13 @@ version: "0.2"
words: words:
- actix - actix
- addrs - addrs
- ALPN
- arrayvec
- bitflags
- clippy - clippy
- deque - deque
- itertools - itertools
- itoa
- mptcp - mptcp
- MSRV - MSRV
- nonblocking - nonblocking
@ -13,6 +17,7 @@ words:
- rcgen - rcgen
- Rustls - Rustls
- rustup - rustup
- smallvec
- spki - spki
- uring - uring
- webpki - webpki

87
Cargo.lock generated
View File

@ -32,6 +32,33 @@ dependencies = [
"trybuild", "trybuild",
] ]
[[package]]
name = "actix-proxy-protocol"
version = "0.0.1"
dependencies = [
"actix-codec",
"actix-rt",
"actix-server",
"actix-service",
"actix-utils",
"arrayvec",
"bitflags 2.9.3",
"bytes",
"const-str",
"crc32fast",
"futures-core",
"futures-util",
"hex",
"itoa",
"nom 8.0.0",
"once_cell",
"pretty_assertions",
"smallvec",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]] [[package]]
name = "actix-rt" name = "actix-rt"
version = "2.10.0" version = "2.10.0"
@ -187,6 +214,12 @@ version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "862ed96ca487e809f1c8e5a8447f6ee2cf102f846893800b20cebdf541fc6bbd" checksum = "862ed96ca487e809f1c8e5a8447f6ee2cf102f846893800b20cebdf541fc6bbd"
[[package]]
name = "arrayvec"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
[[package]] [[package]]
name = "async-stream" name = "async-stream"
version = "0.3.6" version = "0.3.6"
@ -357,7 +390,7 @@ version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
dependencies = [ dependencies = [
"nom", "nom 7.1.3",
] ]
[[package]] [[package]]
@ -438,6 +471,12 @@ dependencies = [
"cc", "cc",
] ]
[[package]]
name = "const-str"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3618cccc083bb987a415d85c02ca6c9994ea5b44731ec28b9ecf09658655fba9"
[[package]] [[package]]
name = "core-foundation" name = "core-foundation"
version = "0.9.4" version = "0.9.4"
@ -454,6 +493,15 @@ version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "crc32fast"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511"
dependencies = [
"cfg-if",
]
[[package]] [[package]]
name = "criterion" name = "criterion"
version = "0.5.1" version = "0.5.1"
@ -536,6 +584,12 @@ dependencies = [
"powerfmt", "powerfmt",
] ]
[[package]]
name = "diff"
version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8"
[[package]] [[package]]
name = "displaydoc" name = "displaydoc"
version = "0.2.5" version = "0.2.5"
@ -764,6 +818,12 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c"
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]] [[package]]
name = "home" name = "home"
version = "0.5.11" version = "0.5.11"
@ -1198,6 +1258,15 @@ dependencies = [
"minimal-lexical", "minimal-lexical",
] ]
[[package]]
name = "nom"
version = "8.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df9761775871bdef83bee530e60050f7e54b1105350d6884eb0fb4f46c2f9405"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "nu-ansi-term" name = "nu-ansi-term"
version = "0.46.0" version = "0.46.0"
@ -1423,6 +1492,16 @@ dependencies = [
"zerocopy", "zerocopy",
] ]
[[package]]
name = "pretty_assertions"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ae130e2f271fbc2ac3a40fb1d07180839cdbbe443c7a27e1e3c13c5cac0116d"
dependencies = [
"diff",
"yansi",
]
[[package]] [[package]]
name = "pretty_env_logger" name = "pretty_env_logger"
version = "0.5.0" version = "0.5.0"
@ -2963,6 +3042,12 @@ version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea2f10b9bb0928dfb1b42b65e1f9e36f7f54dbdf08457afefb38afcdec4fa2bb" checksum = "ea2f10b9bb0928dfb1b42b65e1f9e36f7f54dbdf08457afefb38afcdec4fa2bb"
[[package]]
name = "yansi"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049"
[[package]] [[package]]
name = "yasna" name = "yasna"
version = "0.5.2" version = "0.5.2"

View File

@ -1,9 +1,7 @@
[package] [package]
name = "actix-proxy-protocol" name = "actix-proxy-protocol"
version = "0.0.1" version = "0.0.1"
authors = [ authors = ["Rob Ede <robjtede@icloud.com>"]
"Rob Ede <robjtede@icloud.com>",
]
description = "PROXY protocol utilities" description = "PROXY protocol utilities"
keywords = ["proxy", "protocol", "network", "haproxy", "tcp"] keywords = ["proxy", "protocol", "network", "haproxy", "tcp"]
categories = ["network-programming", "asynchronous"] categories = ["network-programming", "asynchronous"]
@ -23,6 +21,7 @@ crc32fast = "1"
futures-core = { version = "0.3.17", default-features = false, features = ["std"] } futures-core = { version = "0.3.17", default-features = false, features = ["std"] }
futures-util = { version = "0.3.17", default-features = false, features = ["std"] } futures-util = { version = "0.3.17", default-features = false, features = ["std"] }
itoa = "1" itoa = "1"
nom = "8"
smallvec = "1" smallvec = "1"
tokio = { version = "1.13.1", features = ["sync", "io-util"] } tokio = { version = "1.13.1", features = ["sync", "io-util"] }
tracing = { version = "0.1.30", default-features = false, features = ["log"] } tracing = { version = "0.1.30", default-features = false, features = ["log"] }
@ -31,11 +30,11 @@ tracing = { version = "0.1.30", default-features = false, features = ["log"] }
actix-codec = "0.5" actix-codec = "0.5"
actix-rt = "2.6" actix-rt = "2.6"
actix-server = "2" actix-server = "2"
bytes = "1" bytes = "1"
const-str = "0.5" const-str = "0.5"
env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["sink", "async-await-macro"] } futures-util = { version = "0.3.7", default-features = false, features = ["sink", "async-await-macro"] }
hex = "0.4"
once_cell = "1" once_cell = "1"
pretty_assertions = "1" pretty_assertions = "1"
tokio = { version = "1.13.1", features = ["io-util", "rt-multi-thread", "macros", "fs"] } tokio = { version = "1.13.1", features = ["io-util", "rt-multi-thread", "macros", "fs"] }
tracing-subscriber = "0.3"

View File

@ -15,10 +15,13 @@ use actix_proxy_protocol::{tlv, v1, v2, AddressFamily, Command, TransportProtoco
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use actix_server::Server; use actix_server::Server;
use actix_service::{fn_service, ServiceFactoryExt as _}; use actix_service::{fn_service, ServiceFactoryExt as _};
use arrayvec::ArrayVec;
use bytes::BytesMut; use bytes::BytesMut;
use const_str::concat_bytes; use const_str::concat_bytes;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use tokio::io::{copy_bidirectional, AsyncReadExt as _, AsyncWriteExt as _}; use tokio::io::{
copy_bidirectional, AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader,
};
static UPSTREAM: Lazy<SocketAddr> = Lazy::new(|| SocketAddr::from(([127, 0, 0, 1], 8080))); static UPSTREAM: Lazy<SocketAddr> = Lazy::new(|| SocketAddr::from(([127, 0, 0, 1], 8080)));
@ -47,7 +50,7 @@ async fn wrap_with_proxy_protocol_v1(mut stream: TcpStream) -> io::Result<()> {
tracing::info!( tracing::info!(
"PROXYv1 {} -> {}", "PROXYv1 {} -> {}",
stream.peer_addr().unwrap(), stream.peer_addr().unwrap(),
UPSTREAM.to_string() UPSTREAM.to_string(),
); );
let proxy_header = v1::Header::new( let proxy_header = v1::Header::new(
@ -69,7 +72,7 @@ async fn wrap_with_proxy_protocol_v2(mut stream: TcpStream) -> io::Result<()> {
tracing::info!( tracing::info!(
"PROXYv2 {} -> {}", "PROXYv2 {} -> {}",
stream.peer_addr().unwrap(), stream.peer_addr().unwrap(),
UPSTREAM.to_string() UPSTREAM.to_string(),
); );
let mut proxy_header = v2::Header::new_tcp_ipv4_proxy(([127, 0, 0, 1], 8082), *UPSTREAM); let mut proxy_header = v2::Header::new_tcp_ipv4_proxy(([127, 0, 0, 1], 8082), *UPSTREAM);
@ -87,19 +90,74 @@ async fn wrap_with_proxy_protocol_v2(mut stream: TcpStream) -> io::Result<()> {
Ok(()) Ok(())
} }
async fn unwrap_proxy_protocol(mut stream: TcpStream) -> io::Result<()> {
let mut upstream = TcpStream::connect(("127.0.0.1", 8080)).await?;
tracing::info!(
"PROXY unwrap {} -> {}",
stream.peer_addr().unwrap(),
UPSTREAM.to_string(),
);
let mut header = [0; 12];
stream.peek(&mut header).await?;
eprintln!("header: {}", String::from_utf8_lossy(&header));
if &header[..v1::SIGNATURE.len()] == v1::SIGNATURE.as_bytes() {
tracing::info!("v1");
let mut stream = BufReader::new(stream);
let mut buf = Vec::with_capacity(v1::MAX_HEADER_SIZE);
let _len = stream.read_until(b'\n', &mut buf).await?;
eprintln!("{}", String::from_utf8_lossy(&buf));
let (rest, header) = match v1::Header::try_from_bytes(&buf) {
Ok((rest, header)) => (rest, header),
Err(err) => {
match err {
nom::Err::Incomplete(needed) => todo!(),
nom::Err::Error(err) => {
eprintln!(
"err {:?}, input: {}",
err.code,
String::from_utf8_lossy(err.input)
)
}
nom::Err::Failure(_) => todo!(),
}
return Ok(());
}
};
eprintln!("{:02X?} - {:?}", rest, header);
let (_bytes_read, _bytes_written) = copy_bidirectional(&mut stream, &mut upstream).await?;
} else if header == v2::SIGNATURE {
tracing::info!("v2");
let (_bytes_read, _bytes_written) = copy_bidirectional(&mut stream, &mut upstream).await?;
} else {
tracing::warn!("No proxy header; closing");
}
Ok(())
}
fn start_server() -> io::Result<Server> { fn start_server() -> io::Result<Server> {
let addr = ("127.0.0.1", 8082);
tracing::info!("starting proxy server on port: {}", &addr.0);
tracing::info!("proxying to 127.0.0.1:8080"); tracing::info!("proxying to 127.0.0.1:8080");
Ok(Server::build() Ok(Server::build()
.bind("proxy-protocol-v1", ("127.0.0.1", 8081), move || { .bind("proxy-protocol-v1", ("127.0.0.1", 8081), move || {
fn_service(wrap_with_proxy_protocol_v1) fn_service(wrap_with_proxy_protocol_v1)
.map_err(|err| tracing::error!("service error: {:?}", err)) .map_err(|err| tracing::error!("service error: {err:?}"))
})? })?
.bind("proxy-protocol-v2", addr, move || { .bind("proxy-protocol-v2", ("127.0.0.1", 8082), move || {
fn_service(wrap_with_proxy_protocol_v2) fn_service(wrap_with_proxy_protocol_v2)
.map_err(|err| tracing::error!("service error: {:?}", err)) .map_err(|err| tracing::error!("service error: {err:?}"))
})?
.bind("proxy-protocol-unwrap", ("127.0.0.1", 8083), move || {
fn_service(unwrap_proxy_protocol)
.map_err(|err| tracing::error!("service error: {err:?}"))
})? })?
.workers(2) .workers(2)
.run()) .run())
@ -107,7 +165,9 @@ fn start_server() -> io::Result<Server> {
#[tokio::main] #[tokio::main]
async fn main() -> io::Result<()> { async fn main() -> io::Result<()> {
env_logger::init_from_env(env_logger::Env::default().default_filter_or("info")); tracing_subscriber::fmt::fmt().without_time().init();
start_server()?.await?; start_server()?.await?;
Ok(()) Ok(())
} }

View File

@ -1,21 +1,9 @@
//! PROXY protocol. //! PROXY protocol.
#![deny(rust_2018_idioms, nonstandard_style)] #![expect(dead_code)]
#![warn(future_incompatible)]
// #![warn(missing_docs)]
#![allow(unused)]
#![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
use std::{
convert::TryFrom as _,
fmt, io,
net::{IpAddr, SocketAddr},
};
use arrayvec::{ArrayString, ArrayVec};
use tokio::io::{AsyncWrite, AsyncWriteExt as _};
pub mod tlv; pub mod tlv;
pub mod v1; pub mod v1;
pub mod v2; pub mod v2;
@ -107,7 +95,7 @@ pub enum AddressFamily {
} }
impl AddressFamily { impl AddressFamily {
fn v1_str(&self) -> &'static str { pub(crate) fn v1_str(&self) -> &'static str {
match self { match self {
AddressFamily::Inet => "TCP4", AddressFamily::Inet => "TCP4",
AddressFamily::Inet6 => "TCP6", AddressFamily::Inet6 => "TCP6",

View File

@ -217,7 +217,7 @@ bitflags::bitflags! {
/// TLS (SSL). /// TLS (SSL).
/// ///
/// Heckin broken atm. /// Very broken atm.
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct Ssl { pub struct Ssl {
/// The <client> field is made of a bit field indicating which element is present. /// The <client> field is made of a bit field indicating which element is present.
@ -239,7 +239,7 @@ pub struct Ssl {
impl Tlv for Ssl { impl Tlv for Ssl {
const TYPE: u8 = PP2_TYPE_SSL; const TYPE: u8 = PP2_TYPE_SSL;
fn try_from_value(value: &[u8]) -> Option<Self> { fn try_from_value(_value: &[u8]) -> Option<Self> {
/// The PP2_CLIENT_SSL flag indicates that the client connected over SSL/TLS. When /// The PP2_CLIENT_SSL flag indicates that the client connected over SSL/TLS. When
/// this field is present, the US-ASCII string representation of the TLS version is /// this field is present, the US-ASCII string representation of the TLS version is
/// appended at the end of the field in the TLV format using the type /// appended at the end of the field in the TLV format using the type

View File

@ -1,11 +1,13 @@
use std::{fmt, io, net::SocketAddr}; use std::{fmt, io, net::SocketAddr};
use arrayvec::ArrayVec; use arrayvec::ArrayVec;
use nom::{IResult, Parser as _};
use tokio::io::{AsyncWrite, AsyncWriteExt as _}; use tokio::io::{AsyncWrite, AsyncWriteExt as _};
use crate::AddressFamily; use crate::AddressFamily;
pub(crate) const SIGNATURE: &str = "PROXY"; pub const SIGNATURE: &str = "PROXY";
pub const MAX_HEADER_SIZE: usize = 107;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Header { pub struct Header {
@ -33,15 +35,19 @@ impl Header {
} }
pub fn write_to(&self, wrt: &mut impl io::Write) -> io::Result<()> { pub fn write_to(&self, wrt: &mut impl io::Write) -> io::Result<()> {
write!(wrt, "{}", self) write!(wrt, "{self}")
} }
pub async fn write_to_tokio(&self, wrt: &mut (impl AsyncWrite + Unpin)) -> io::Result<()> { pub async fn write_to_tokio(&self, wrt: &mut (impl AsyncWrite + Unpin)) -> io::Result<()> {
// max length of a V1 header is 107 bytes // max length of a V1 header is 107 bytes
let mut buf = ArrayVec::<_, 107>::new(); let mut buf = ArrayVec::<_, MAX_HEADER_SIZE>::new();
self.write_to(&mut buf)?; self.write_to(&mut buf)?;
wrt.write_all(&buf).await wrt.write_all(&buf).await
} }
pub fn try_from_bytes(slice: &[u8]) -> IResult<&[u8], Self> {
parsing::parse(slice)
}
} }
impl fmt::Display for Header { impl fmt::Display for Header {
@ -58,3 +64,85 @@ impl fmt::Display for Header {
) )
} }
} }
mod parsing {
use std::{
net::{Ipv4Addr, SocketAddrV4},
str::{self, FromStr},
};
use nom::{
branch::alt,
bytes::complete::{tag, take_while},
character::complete::char,
combinator::{map, map_res},
IResult,
};
use super::*;
/// Parses a number from serialized representation (as bytes).
fn parse_number<T: FromStr>(input: &[u8]) -> IResult<&[u8], T> {
map_res(take_while(|c: u8| c.is_ascii_digit()), |s: &[u8]| {
let s = str::from_utf8(s).map_err(|_| "utf8 error")?;
let val = s.parse::<T>().map_err(|_| "u8 parse error")?;
Ok::<_, Box<dyn std::error::Error>>(val)
})
.parse(input)
}
/// Parses an address family.
fn parse_address_family(input: &[u8]) -> IResult<&[u8], AddressFamily> {
map_res(alt((tag("TCP4"), tag("TCP6"))), |af: &[u8]| match af {
b"TCP4" => Ok(AddressFamily::Inet),
b"TCP6" => Ok(AddressFamily::Inet6),
_ => Err(io::Error::new(
io::ErrorKind::InvalidData,
"invalid address family",
)),
})
.parse(input)
}
/// Parses an IPv4 address from serialized representation (as bytes).
fn parse_ipv4(input: &[u8]) -> IResult<&[u8], Ipv4Addr> {
map(
(
parse_number::<u8>,
char('.'),
parse_number::<u8>,
char('.'),
parse_number::<u8>,
char('.'),
parse_number::<u8>,
),
|(a, _, b, _, c, _, d)| Ipv4Addr::new(a, b, c, d),
)
.parse(input)
}
/// Parses an IPv4 address from ASCII bytes.
pub(super) fn parse(input: &[u8]) -> IResult<&[u8], Header> {
map(
(
tag(SIGNATURE),
char(' '),
parse_address_family,
char(' '),
parse_ipv4,
char(' '),
parse_ipv4,
char(' '),
parse_number::<u16>,
char(' '),
parse_number::<u16>,
),
|(_, _, af, _, src_ip, _, dst_ip, _, src_port, _, dst_port)| Header {
af,
src: SocketAddr::V4(SocketAddrV4::new(src_ip, src_port)),
dst: SocketAddr::V4(SocketAddrV4::new(dst_ip, dst_port)),
},
)
.parse(input)
}
}

View File

@ -3,7 +3,7 @@ use std::{
net::{IpAddr, SocketAddr}, net::{IpAddr, SocketAddr},
}; };
use smallvec::{smallvec, SmallVec, ToSmallVec as _}; use smallvec::{SmallVec, ToSmallVec as _};
use tokio::io::{AsyncWrite, AsyncWriteExt as _}; use tokio::io::{AsyncWrite, AsyncWriteExt as _};
use crate::{ use crate::{
@ -11,7 +11,7 @@ use crate::{
AddressFamily, Command, TransportProtocol, Version, AddressFamily, Command, TransportProtocol, Version,
}; };
pub(crate) const SIGNATURE: [u8; 12] = [ pub const SIGNATURE: [u8; 12] = [
0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, 0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A,
]; ];
@ -192,7 +192,7 @@ impl Header {
let mut buf = Vec::new(); let mut buf = Vec::new();
this.write_to(&mut buf).unwrap(); this.write_to(&mut buf).unwrap();
let mut crc_calc = crc32fast::hash(&buf); let crc_calc = crc32fast::hash(&buf);
Some(crc_sent.checksum == crc_calc) Some(crc_sent.checksum == crc_calc)
} }