diff --git a/.cspell.yml b/.cspell.yml index 3aa9eea1..26044022 100644 --- a/.cspell.yml +++ b/.cspell.yml @@ -2,9 +2,13 @@ version: "0.2" words: - actix - addrs + - ALPN + - arrayvec + - bitflags - clippy - deque - itertools + - itoa - mptcp - MSRV - nonblocking @@ -13,6 +17,7 @@ words: - rcgen - Rustls - rustup + - smallvec - spki - uring - webpki diff --git a/Cargo.lock b/Cargo.lock index 2b79b44a..9de63203 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -32,6 +32,33 @@ dependencies = [ "trybuild", ] +[[package]] +name = "actix-proxy-protocol" +version = "0.0.1" +dependencies = [ + "actix-codec", + "actix-rt", + "actix-server", + "actix-service", + "actix-utils", + "arrayvec", + "bitflags 2.9.3", + "bytes", + "const-str", + "crc32fast", + "futures-core", + "futures-util", + "hex", + "itoa", + "nom 8.0.0", + "once_cell", + "pretty_assertions", + "smallvec", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "actix-rt" version = "2.10.0" @@ -187,6 +214,12 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "862ed96ca487e809f1c8e5a8447f6ee2cf102f846893800b20cebdf541fc6bbd" +[[package]] +name = "arrayvec" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" + [[package]] name = "async-stream" version = "0.3.6" @@ -357,7 +390,7 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" dependencies = [ - "nom", + "nom 7.1.3", ] [[package]] @@ -438,6 +471,12 @@ dependencies = [ "cc", ] +[[package]] +name = "const-str" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3618cccc083bb987a415d85c02ca6c9994ea5b44731ec28b9ecf09658655fba9" + [[package]] name = "core-foundation" version = "0.9.4" @@ -454,6 +493,15 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "crc32fast" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511" +dependencies = [ + "cfg-if", +] + [[package]] name = "criterion" version = "0.5.1" @@ -536,6 +584,12 @@ dependencies = [ "powerfmt", ] +[[package]] +name = "diff" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8" + [[package]] name = "displaydoc" version = "0.2.5" @@ -764,6 +818,12 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "home" version = "0.5.11" @@ -1198,6 +1258,15 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nom" +version = "8.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df9761775871bdef83bee530e60050f7e54b1105350d6884eb0fb4f46c2f9405" +dependencies = [ + "memchr", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -1423,6 +1492,16 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "pretty_assertions" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ae130e2f271fbc2ac3a40fb1d07180839cdbbe443c7a27e1e3c13c5cac0116d" +dependencies = [ + "diff", + "yansi", +] + [[package]] name = "pretty_env_logger" version = "0.5.0" @@ -2963,6 +3042,12 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea2f10b9bb0928dfb1b42b65e1f9e36f7f54dbdf08457afefb38afcdec4fa2bb" +[[package]] +name = "yansi" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfe53a6657fd280eaa890a3bc59152892ffa3e30101319d168b781ed6529b049" + [[package]] name = "yasna" version = "0.5.2" diff --git a/actix-proxy-protocol/Cargo.toml b/actix-proxy-protocol/Cargo.toml index 452041cf..c64f031c 100755 --- a/actix-proxy-protocol/Cargo.toml +++ b/actix-proxy-protocol/Cargo.toml @@ -1,9 +1,7 @@ [package] name = "actix-proxy-protocol" version = "0.0.1" -authors = [ - "Rob Ede ", -] +authors = ["Rob Ede "] description = "PROXY protocol utilities" keywords = ["proxy", "protocol", "network", "haproxy", "tcp"] categories = ["network-programming", "asynchronous"] @@ -23,6 +21,7 @@ crc32fast = "1" futures-core = { version = "0.3.17", default-features = false, features = ["std"] } futures-util = { version = "0.3.17", default-features = false, features = ["std"] } itoa = "1" +nom = "8" smallvec = "1" tokio = { version = "1.13.1", features = ["sync", "io-util"] } tracing = { version = "0.1.30", default-features = false, features = ["log"] } @@ -31,11 +30,11 @@ tracing = { version = "0.1.30", default-features = false, features = ["log"] } actix-codec = "0.5" actix-rt = "2.6" actix-server = "2" - bytes = "1" const-str = "0.5" -env_logger = "0.9" futures-util = { version = "0.3.7", default-features = false, features = ["sink", "async-await-macro"] } +hex = "0.4" once_cell = "1" pretty_assertions = "1" tokio = { version = "1.13.1", features = ["io-util", "rt-multi-thread", "macros", "fs"] } +tracing-subscriber = "0.3" diff --git a/actix-proxy-protocol/examples/proxy-server.rs b/actix-proxy-protocol/examples/proxy-server.rs index 92062b6b..e26c518f 100644 --- a/actix-proxy-protocol/examples/proxy-server.rs +++ b/actix-proxy-protocol/examples/proxy-server.rs @@ -15,10 +15,13 @@ use actix_proxy_protocol::{tlv, v1, v2, AddressFamily, Command, TransportProtoco use actix_rt::net::TcpStream; use actix_server::Server; use actix_service::{fn_service, ServiceFactoryExt as _}; +use arrayvec::ArrayVec; use bytes::BytesMut; use const_str::concat_bytes; use once_cell::sync::Lazy; -use tokio::io::{copy_bidirectional, AsyncReadExt as _, AsyncWriteExt as _}; +use tokio::io::{ + copy_bidirectional, AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader, +}; static UPSTREAM: Lazy = Lazy::new(|| SocketAddr::from(([127, 0, 0, 1], 8080))); @@ -47,7 +50,7 @@ async fn wrap_with_proxy_protocol_v1(mut stream: TcpStream) -> io::Result<()> { tracing::info!( "PROXYv1 {} -> {}", stream.peer_addr().unwrap(), - UPSTREAM.to_string() + UPSTREAM.to_string(), ); let proxy_header = v1::Header::new( @@ -69,7 +72,7 @@ async fn wrap_with_proxy_protocol_v2(mut stream: TcpStream) -> io::Result<()> { tracing::info!( "PROXYv2 {} -> {}", stream.peer_addr().unwrap(), - UPSTREAM.to_string() + UPSTREAM.to_string(), ); let mut proxy_header = v2::Header::new_tcp_ipv4_proxy(([127, 0, 0, 1], 8082), *UPSTREAM); @@ -87,19 +90,74 @@ async fn wrap_with_proxy_protocol_v2(mut stream: TcpStream) -> io::Result<()> { Ok(()) } +async fn unwrap_proxy_protocol(mut stream: TcpStream) -> io::Result<()> { + let mut upstream = TcpStream::connect(("127.0.0.1", 8080)).await?; + + tracing::info!( + "PROXY unwrap {} -> {}", + stream.peer_addr().unwrap(), + UPSTREAM.to_string(), + ); + + let mut header = [0; 12]; + stream.peek(&mut header).await?; + + eprintln!("header: {}", String::from_utf8_lossy(&header)); + + if &header[..v1::SIGNATURE.len()] == v1::SIGNATURE.as_bytes() { + tracing::info!("v1"); + + let mut stream = BufReader::new(stream); + let mut buf = Vec::with_capacity(v1::MAX_HEADER_SIZE); + let _len = stream.read_until(b'\n', &mut buf).await?; + + eprintln!("{}", String::from_utf8_lossy(&buf)); + + let (rest, header) = match v1::Header::try_from_bytes(&buf) { + Ok((rest, header)) => (rest, header), + Err(err) => { + match err { + nom::Err::Incomplete(needed) => todo!(), + nom::Err::Error(err) => { + eprintln!( + "err {:?}, input: {}", + err.code, + String::from_utf8_lossy(err.input) + ) + } + nom::Err::Failure(_) => todo!(), + } + return Ok(()); + } + }; + eprintln!("{:02X?} - {:?}", rest, header); + + let (_bytes_read, _bytes_written) = copy_bidirectional(&mut stream, &mut upstream).await?; + } else if header == v2::SIGNATURE { + tracing::info!("v2"); + let (_bytes_read, _bytes_written) = copy_bidirectional(&mut stream, &mut upstream).await?; + } else { + tracing::warn!("No proxy header; closing"); + } + + Ok(()) +} + fn start_server() -> io::Result { - let addr = ("127.0.0.1", 8082); - tracing::info!("starting proxy server on port: {}", &addr.0); tracing::info!("proxying to 127.0.0.1:8080"); Ok(Server::build() .bind("proxy-protocol-v1", ("127.0.0.1", 8081), move || { fn_service(wrap_with_proxy_protocol_v1) - .map_err(|err| tracing::error!("service error: {:?}", err)) + .map_err(|err| tracing::error!("service error: {err:?}")) })? - .bind("proxy-protocol-v2", addr, move || { + .bind("proxy-protocol-v2", ("127.0.0.1", 8082), move || { fn_service(wrap_with_proxy_protocol_v2) - .map_err(|err| tracing::error!("service error: {:?}", err)) + .map_err(|err| tracing::error!("service error: {err:?}")) + })? + .bind("proxy-protocol-unwrap", ("127.0.0.1", 8083), move || { + fn_service(unwrap_proxy_protocol) + .map_err(|err| tracing::error!("service error: {err:?}")) })? .workers(2) .run()) @@ -107,7 +165,9 @@ fn start_server() -> io::Result { #[tokio::main] async fn main() -> io::Result<()> { - env_logger::init_from_env(env_logger::Env::default().default_filter_or("info")); + tracing_subscriber::fmt::fmt().without_time().init(); + start_server()?.await?; + Ok(()) } diff --git a/actix-proxy-protocol/src/lib.rs b/actix-proxy-protocol/src/lib.rs index e79aade7..32bc075c 100644 --- a/actix-proxy-protocol/src/lib.rs +++ b/actix-proxy-protocol/src/lib.rs @@ -1,21 +1,9 @@ //! PROXY protocol. -#![deny(rust_2018_idioms, nonstandard_style)] -#![warn(future_incompatible)] -// #![warn(missing_docs)] -#![allow(unused)] +#![expect(dead_code)] #![doc(html_logo_url = "https://actix.rs/img/logo.png")] #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] -use std::{ - convert::TryFrom as _, - fmt, io, - net::{IpAddr, SocketAddr}, -}; - -use arrayvec::{ArrayString, ArrayVec}; -use tokio::io::{AsyncWrite, AsyncWriteExt as _}; - pub mod tlv; pub mod v1; pub mod v2; @@ -107,7 +95,7 @@ pub enum AddressFamily { } impl AddressFamily { - fn v1_str(&self) -> &'static str { + pub(crate) fn v1_str(&self) -> &'static str { match self { AddressFamily::Inet => "TCP4", AddressFamily::Inet6 => "TCP6", diff --git a/actix-proxy-protocol/src/tlv.rs b/actix-proxy-protocol/src/tlv.rs index 31ea98d9..04cd7173 100644 --- a/actix-proxy-protocol/src/tlv.rs +++ b/actix-proxy-protocol/src/tlv.rs @@ -217,7 +217,7 @@ bitflags::bitflags! { /// TLS (SSL). /// -/// Heckin broken atm. +/// Very broken atm. #[derive(Debug, Clone, PartialEq, Eq)] pub struct Ssl { /// The field is made of a bit field indicating which element is present. @@ -239,7 +239,7 @@ pub struct Ssl { impl Tlv for Ssl { const TYPE: u8 = PP2_TYPE_SSL; - fn try_from_value(value: &[u8]) -> Option { + fn try_from_value(_value: &[u8]) -> Option { /// The PP2_CLIENT_SSL flag indicates that the client connected over SSL/TLS. When /// this field is present, the US-ASCII string representation of the TLS version is /// appended at the end of the field in the TLV format using the type diff --git a/actix-proxy-protocol/src/v1.rs b/actix-proxy-protocol/src/v1.rs index a4ec1aa3..3cb29019 100644 --- a/actix-proxy-protocol/src/v1.rs +++ b/actix-proxy-protocol/src/v1.rs @@ -1,11 +1,13 @@ use std::{fmt, io, net::SocketAddr}; use arrayvec::ArrayVec; +use nom::{IResult, Parser as _}; use tokio::io::{AsyncWrite, AsyncWriteExt as _}; use crate::AddressFamily; -pub(crate) const SIGNATURE: &str = "PROXY"; +pub const SIGNATURE: &str = "PROXY"; +pub const MAX_HEADER_SIZE: usize = 107; #[derive(Debug, Clone)] pub struct Header { @@ -33,15 +35,19 @@ impl Header { } pub fn write_to(&self, wrt: &mut impl io::Write) -> io::Result<()> { - write!(wrt, "{}", self) + write!(wrt, "{self}") } pub async fn write_to_tokio(&self, wrt: &mut (impl AsyncWrite + Unpin)) -> io::Result<()> { // max length of a V1 header is 107 bytes - let mut buf = ArrayVec::<_, 107>::new(); + let mut buf = ArrayVec::<_, MAX_HEADER_SIZE>::new(); self.write_to(&mut buf)?; wrt.write_all(&buf).await } + + pub fn try_from_bytes(slice: &[u8]) -> IResult<&[u8], Self> { + parsing::parse(slice) + } } impl fmt::Display for Header { @@ -58,3 +64,85 @@ impl fmt::Display for Header { ) } } + +mod parsing { + use std::{ + net::{Ipv4Addr, SocketAddrV4}, + str::{self, FromStr}, + }; + + use nom::{ + branch::alt, + bytes::complete::{tag, take_while}, + character::complete::char, + combinator::{map, map_res}, + IResult, + }; + + use super::*; + + /// Parses a number from serialized representation (as bytes). + fn parse_number(input: &[u8]) -> IResult<&[u8], T> { + map_res(take_while(|c: u8| c.is_ascii_digit()), |s: &[u8]| { + let s = str::from_utf8(s).map_err(|_| "utf8 error")?; + let val = s.parse::().map_err(|_| "u8 parse error")?; + Ok::<_, Box>(val) + }) + .parse(input) + } + + /// Parses an address family. + fn parse_address_family(input: &[u8]) -> IResult<&[u8], AddressFamily> { + map_res(alt((tag("TCP4"), tag("TCP6"))), |af: &[u8]| match af { + b"TCP4" => Ok(AddressFamily::Inet), + b"TCP6" => Ok(AddressFamily::Inet6), + _ => Err(io::Error::new( + io::ErrorKind::InvalidData, + "invalid address family", + )), + }) + .parse(input) + } + + /// Parses an IPv4 address from serialized representation (as bytes). + fn parse_ipv4(input: &[u8]) -> IResult<&[u8], Ipv4Addr> { + map( + ( + parse_number::, + char('.'), + parse_number::, + char('.'), + parse_number::, + char('.'), + parse_number::, + ), + |(a, _, b, _, c, _, d)| Ipv4Addr::new(a, b, c, d), + ) + .parse(input) + } + + /// Parses an IPv4 address from ASCII bytes. + pub(super) fn parse(input: &[u8]) -> IResult<&[u8], Header> { + map( + ( + tag(SIGNATURE), + char(' '), + parse_address_family, + char(' '), + parse_ipv4, + char(' '), + parse_ipv4, + char(' '), + parse_number::, + char(' '), + parse_number::, + ), + |(_, _, af, _, src_ip, _, dst_ip, _, src_port, _, dst_port)| Header { + af, + src: SocketAddr::V4(SocketAddrV4::new(src_ip, src_port)), + dst: SocketAddr::V4(SocketAddrV4::new(dst_ip, dst_port)), + }, + ) + .parse(input) + } +} diff --git a/actix-proxy-protocol/src/v2.rs b/actix-proxy-protocol/src/v2.rs index ac92c61e..f5681e16 100644 --- a/actix-proxy-protocol/src/v2.rs +++ b/actix-proxy-protocol/src/v2.rs @@ -3,7 +3,7 @@ use std::{ net::{IpAddr, SocketAddr}, }; -use smallvec::{smallvec, SmallVec, ToSmallVec as _}; +use smallvec::{SmallVec, ToSmallVec as _}; use tokio::io::{AsyncWrite, AsyncWriteExt as _}; use crate::{ @@ -11,7 +11,7 @@ use crate::{ AddressFamily, Command, TransportProtocol, Version, }; -pub(crate) const SIGNATURE: [u8; 12] = [ +pub const SIGNATURE: [u8; 12] = [ 0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A, ]; @@ -192,7 +192,7 @@ impl Header { let mut buf = Vec::new(); this.write_to(&mut buf).unwrap(); - let mut crc_calc = crc32fast::hash(&buf); + let crc_calc = crc32fast::hash(&buf); Some(crc_sent.checksum == crc_calc) }