Compare commits

...

2 Commits

Author SHA1 Message Date
Rob Ede 22966602d3
Merge e67d4521e5 into b8ff2a47a6 2025-08-29 04:32:56 +01:00
Rob Ede e67d4521e5
parse v1 header from tcpstream 2025-08-29 04:32:49 +01:00
8 changed files with 262 additions and 37 deletions

View File

@ -2,9 +2,13 @@ version: "0.2"
words:
- actix
- addrs
- ALPN
- arrayvec
- bitflags
- clippy
- deque
- itertools
- itoa
- mptcp
- MSRV
- nonblocking
@ -13,6 +17,7 @@ words:
- rcgen
- Rustls
- rustup
- smallvec
- spki
- uring
- webpki

87
Cargo.lock generated
View File

@ -32,6 +32,33 @@ dependencies = [
"trybuild",
]
[[package]]
name = "actix-proxy-protocol"
version = "0.0.1"
dependencies = [
"actix-codec",
"actix-rt",
"actix-server",
"actix-service",
"actix-utils",
"arrayvec",
"bitflags 2.9.3",
"bytes",
"const-str",
"crc32fast",
"futures-core",
"futures-util",
"hex",
"itoa",
"nom 8.0.0",
"once_cell",
"pretty_assertions",
"smallvec",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "actix-rt"
version = "2.10.0"
@ -187,6 +214,12 @@ version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "862ed96ca487e809f1c8e5a8447f6ee2cf102f846893800b20cebdf541fc6bbd"
[[package]]
name = "arrayvec"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
[[package]]
name = "async-stream"
version = "0.3.6"
@ -357,7 +390,7 @@ version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766"
dependencies = [
"nom",
"nom 7.1.3",
]
[[package]]
@ -438,6 +471,12 @@ dependencies = [
"cc",
]
[[package]]
name = "const-str"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3618cccc083bb987a415d85c02ca6c9994ea5b44731ec28b9ecf09658655fba9"
[[package]]
name = "core-foundation"
version = "0.9.4"
@ -454,6 +493,15 @@ version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
[[package]]
name = "crc32fast"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511"
dependencies = [
"cfg-if",
]
[[package]]
name = "criterion"
version = "0.5.1"
@ -536,6 +584,12 @@ dependencies = [
"powerfmt",
]
[[package]]
name = "diff"
version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8"
[[package]]
name = "displaydoc"
version = "0.2.5"
@ -764,6 +818,12 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c"
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "home"
version = "0.5.11"
@ -1198,6 +1258,15 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "nom"
version = "8.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df9761775871bdef83bee530e60050f7e54b1105350d6884eb0fb4f46c2f9405"
dependencies = [
"memchr",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
@ -1423,6 +1492,16 @@ dependencies = [
"zerocopy",
]
[[package]]
name = "pretty_assertions"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ae130e2f271fbc2ac3a40fb1d07180839cdbbe443c7a27e1e3c13c5cac0116d"
dependencies = [
"diff",
"yansi",
]
[[package]]
name = "pretty_env_logger"
version = "0.5.0"
@ -2963,6 +3042,12 @@ version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea2f10b9bb0928dfb1b42b65e1f9e36f7f54dbdf08457afefb38afcdec4fa2bb"
[[package]]
name = "yansi"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049"
[[package]]
name = "yasna"
version = "0.5.2"

View File

@ -1,9 +1,7 @@
[package]
name = "actix-proxy-protocol"
version = "0.0.1"
authors = [
"Rob Ede <robjtede@icloud.com>",
]
authors = ["Rob Ede <robjtede@icloud.com>"]
description = "PROXY protocol utilities"
keywords = ["proxy", "protocol", "network", "haproxy", "tcp"]
categories = ["network-programming", "asynchronous"]
@ -23,6 +21,7 @@ crc32fast = "1"
futures-core = { version = "0.3.17", default-features = false, features = ["std"] }
futures-util = { version = "0.3.17", default-features = false, features = ["std"] }
itoa = "1"
nom = "8"
smallvec = "1"
tokio = { version = "1.13.1", features = ["sync", "io-util"] }
tracing = { version = "0.1.30", default-features = false, features = ["log"] }
@ -31,11 +30,11 @@ tracing = { version = "0.1.30", default-features = false, features = ["log"] }
actix-codec = "0.5"
actix-rt = "2.6"
actix-server = "2"
bytes = "1"
const-str = "0.5"
env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["sink", "async-await-macro"] }
hex = "0.4"
once_cell = "1"
pretty_assertions = "1"
tokio = { version = "1.13.1", features = ["io-util", "rt-multi-thread", "macros", "fs"] }
tracing-subscriber = "0.3"

View File

@ -15,10 +15,13 @@ use actix_proxy_protocol::{tlv, v1, v2, AddressFamily, Command, TransportProtoco
use actix_rt::net::TcpStream;
use actix_server::Server;
use actix_service::{fn_service, ServiceFactoryExt as _};
use arrayvec::ArrayVec;
use bytes::BytesMut;
use const_str::concat_bytes;
use once_cell::sync::Lazy;
use tokio::io::{copy_bidirectional, AsyncReadExt as _, AsyncWriteExt as _};
use tokio::io::{
copy_bidirectional, AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader,
};
static UPSTREAM: Lazy<SocketAddr> = Lazy::new(|| SocketAddr::from(([127, 0, 0, 1], 8080)));
@ -47,7 +50,7 @@ async fn wrap_with_proxy_protocol_v1(mut stream: TcpStream) -> io::Result<()> {
tracing::info!(
"PROXYv1 {} -> {}",
stream.peer_addr().unwrap(),
UPSTREAM.to_string()
UPSTREAM.to_string(),
);
let proxy_header = v1::Header::new(
@ -69,7 +72,7 @@ async fn wrap_with_proxy_protocol_v2(mut stream: TcpStream) -> io::Result<()> {
tracing::info!(
"PROXYv2 {} -> {}",
stream.peer_addr().unwrap(),
UPSTREAM.to_string()
UPSTREAM.to_string(),
);
let mut proxy_header = v2::Header::new_tcp_ipv4_proxy(([127, 0, 0, 1], 8082), *UPSTREAM);
@ -87,19 +90,74 @@ async fn wrap_with_proxy_protocol_v2(mut stream: TcpStream) -> io::Result<()> {
Ok(())
}
async fn unwrap_proxy_protocol(mut stream: TcpStream) -> io::Result<()> {
let mut upstream = TcpStream::connect(("127.0.0.1", 8080)).await?;
tracing::info!(
"PROXY unwrap {} -> {}",
stream.peer_addr().unwrap(),
UPSTREAM.to_string(),
);
let mut header = [0; 12];
stream.peek(&mut header).await?;
eprintln!("header: {}", String::from_utf8_lossy(&header));
if &header[..v1::SIGNATURE.len()] == v1::SIGNATURE.as_bytes() {
tracing::info!("v1");
let mut stream = BufReader::new(stream);
let mut buf = Vec::with_capacity(v1::MAX_HEADER_SIZE);
let _len = stream.read_until(b'\n', &mut buf).await?;
eprintln!("{}", String::from_utf8_lossy(&buf));
let (rest, header) = match v1::Header::try_from_bytes(&buf) {
Ok((rest, header)) => (rest, header),
Err(err) => {
match err {
nom::Err::Incomplete(needed) => todo!(),
nom::Err::Error(err) => {
eprintln!(
"err {:?}, input: {}",
err.code,
String::from_utf8_lossy(err.input)
)
}
nom::Err::Failure(_) => todo!(),
}
return Ok(());
}
};
eprintln!("{:02X?} - {:?}", rest, header);
let (_bytes_read, _bytes_written) = copy_bidirectional(&mut stream, &mut upstream).await?;
} else if header == v2::SIGNATURE {
tracing::info!("v2");
let (_bytes_read, _bytes_written) = copy_bidirectional(&mut stream, &mut upstream).await?;
} else {
tracing::warn!("No proxy header; closing");
}
Ok(())
}
fn start_server() -> io::Result<Server> {
let addr = ("127.0.0.1", 8082);
tracing::info!("starting proxy server on port: {}", &addr.0);
tracing::info!("proxying to 127.0.0.1:8080");
Ok(Server::build()
.bind("proxy-protocol-v1", ("127.0.0.1", 8081), move || {
fn_service(wrap_with_proxy_protocol_v1)
.map_err(|err| tracing::error!("service error: {:?}", err))
.map_err(|err| tracing::error!("service error: {err:?}"))
})?
.bind("proxy-protocol-v2", addr, move || {
.bind("proxy-protocol-v2", ("127.0.0.1", 8082), move || {
fn_service(wrap_with_proxy_protocol_v2)
.map_err(|err| tracing::error!("service error: {:?}", err))
.map_err(|err| tracing::error!("service error: {err:?}"))
})?
.bind("proxy-protocol-unwrap", ("127.0.0.1", 8083), move || {
fn_service(unwrap_proxy_protocol)
.map_err(|err| tracing::error!("service error: {err:?}"))
})?
.workers(2)
.run())
@ -107,7 +165,9 @@ fn start_server() -> io::Result<Server> {
#[tokio::main]
async fn main() -> io::Result<()> {
env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));
tracing_subscriber::fmt::fmt().without_time().init();
start_server()?.await?;
Ok(())
}

View File

@ -1,21 +1,9 @@
//! PROXY protocol.
#![deny(rust_2018_idioms, nonstandard_style)]
#![warn(future_incompatible)]
// #![warn(missing_docs)]
#![allow(unused)]
#![expect(dead_code)]
#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
use std::{
convert::TryFrom as _,
fmt, io,
net::{IpAddr, SocketAddr},
};
use arrayvec::{ArrayString, ArrayVec};
use tokio::io::{AsyncWrite, AsyncWriteExt as _};
pub mod tlv;
pub mod v1;
pub mod v2;
@ -107,7 +95,7 @@ pub enum AddressFamily {
}
impl AddressFamily {
fn v1_str(&self) -> &'static str {
pub(crate) fn v1_str(&self) -> &'static str {
match self {
AddressFamily::Inet => "TCP4",
AddressFamily::Inet6 => "TCP6",

View File

@ -217,7 +217,7 @@ bitflags::bitflags! {
/// TLS (SSL).
///
/// Heckin broken atm.
/// Very broken atm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Ssl {
/// The <client> field is made of a bit field indicating which element is present.
@ -239,7 +239,7 @@ pub struct Ssl {
impl Tlv for Ssl {
const TYPE: u8 = PP2_TYPE_SSL;
fn try_from_value(value: &[u8]) -> Option<Self> {
fn try_from_value(_value: &[u8]) -> Option<Self> {
/// The PP2_CLIENT_SSL flag indicates that the client connected over SSL/TLS. When
/// this field is present, the US-ASCII string representation of the TLS version is
/// appended at the end of the field in the TLV format using the type

View File

@ -1,11 +1,13 @@
use std::{fmt, io, net::SocketAddr};
use arrayvec::ArrayVec;
use nom::{IResult, Parser as _};
use tokio::io::{AsyncWrite, AsyncWriteExt as _};
use crate::AddressFamily;
pub(crate) const SIGNATURE: &str = "PROXY";
pub const SIGNATURE: &str = "PROXY";
pub const MAX_HEADER_SIZE: usize = 107;
#[derive(Debug, Clone)]
pub struct Header {
@ -33,15 +35,19 @@ impl Header {
}
pub fn write_to(&self, wrt: &mut impl io::Write) -> io::Result<()> {
write!(wrt, "{}", self)
write!(wrt, "{self}")
}
pub async fn write_to_tokio(&self, wrt: &mut (impl AsyncWrite + Unpin)) -> io::Result<()> {
// max length of a V1 header is 107 bytes
let mut buf = ArrayVec::<_, 107>::new();
let mut buf = ArrayVec::<_, MAX_HEADER_SIZE>::new();
self.write_to(&mut buf)?;
wrt.write_all(&buf).await
}
pub fn try_from_bytes(slice: &[u8]) -> IResult<&[u8], Self> {
parsing::parse(slice)
}
}
impl fmt::Display for Header {
@ -58,3 +64,85 @@ impl fmt::Display for Header {
)
}
}
mod parsing {
use std::{
net::{Ipv4Addr, SocketAddrV4},
str::{self, FromStr},
};
use nom::{
branch::alt,
bytes::complete::{tag, take_while},
character::complete::char,
combinator::{map, map_res},
IResult,
};
use super::*;
/// Parses a number from serialized representation (as bytes).
fn parse_number<T: FromStr>(input: &[u8]) -> IResult<&[u8], T> {
map_res(take_while(|c: u8| c.is_ascii_digit()), |s: &[u8]| {
let s = str::from_utf8(s).map_err(|_| "utf8 error")?;
let val = s.parse::<T>().map_err(|_| "u8 parse error")?;
Ok::<_, Box<dyn std::error::Error>>(val)
})
.parse(input)
}
/// Parses an address family.
fn parse_address_family(input: &[u8]) -> IResult<&[u8], AddressFamily> {
map_res(alt((tag("TCP4"), tag("TCP6"))), |af: &[u8]| match af {
b"TCP4" => Ok(AddressFamily::Inet),
b"TCP6" => Ok(AddressFamily::Inet6),
_ => Err(io::Error::new(
io::ErrorKind::InvalidData,
"invalid address family",
)),
})
.parse(input)
}
/// Parses an IPv4 address from serialized representation (as bytes).
fn parse_ipv4(input: &[u8]) -> IResult<&[u8], Ipv4Addr> {
map(
(
parse_number::<u8>,
char('.'),
parse_number::<u8>,
char('.'),
parse_number::<u8>,
char('.'),
parse_number::<u8>,
),
|(a, _, b, _, c, _, d)| Ipv4Addr::new(a, b, c, d),
)
.parse(input)
}
/// Parses an IPv4 address from ASCII bytes.
pub(super) fn parse(input: &[u8]) -> IResult<&[u8], Header> {
map(
(
tag(SIGNATURE),
char(' '),
parse_address_family,
char(' '),
parse_ipv4,
char(' '),
parse_ipv4,
char(' '),
parse_number::<u16>,
char(' '),
parse_number::<u16>,
),
|(_, _, af, _, src_ip, _, dst_ip, _, src_port, _, dst_port)| Header {
af,
src: SocketAddr::V4(SocketAddrV4::new(src_ip, src_port)),
dst: SocketAddr::V4(SocketAddrV4::new(dst_ip, dst_port)),
},
)
.parse(input)
}
}

View File

@ -3,7 +3,7 @@ use std::{
net::{IpAddr, SocketAddr},
};
use smallvec::{smallvec, SmallVec, ToSmallVec as _};
use smallvec::{SmallVec, ToSmallVec as _};
use tokio::io::{AsyncWrite, AsyncWriteExt as _};
use crate::{
@ -11,7 +11,7 @@ use crate::{
AddressFamily, Command, TransportProtocol, Version,
};
pub(crate) const SIGNATURE: [u8; 12] = [
pub const SIGNATURE: [u8; 12] = [
0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A,
];
@ -192,7 +192,7 @@ impl Header {
let mut buf = Vec::new();
this.write_to(&mut buf).unwrap();
let mut crc_calc = crc32fast::hash(&buf);
let crc_calc = crc32fast::hash(&buf);
Some(crc_sent.checksum == crc_calc)
}