//! Adds PROXY protocol v1 prelude to connections. #![allow(unused)] use std::{ io, mem, net::{IpAddr, Ipv4Addr, SocketAddr}, sync::{ atomic::{AtomicUsize, Ordering}, Arc, }, }; use actix_proxy_protocol::{tlv, v1, v2, AddressFamily, Command, TransportProtocol}; use actix_rt::net::TcpStream; use actix_server::Server; use actix_service::{fn_service, ServiceFactoryExt as _}; use arrayvec::ArrayVec; use bytes::BytesMut; use const_str::concat_bytes; use once_cell::sync::Lazy; use tokio::io::{ copy_bidirectional, AsyncBufReadExt as _, AsyncReadExt as _, AsyncWriteExt as _, BufReader, }; static UPSTREAM: Lazy = Lazy::new(|| SocketAddr::from(([127, 0, 0, 1], 8080))); /* NOTES: 108 byte buffer on receiver side is enough for any PROXY header after PROXY, receive until CRLF, *then* decode parts TLV = type-length-value TO DO: handle UNKNOWN transport v2 UNSPEC mode AF_UNIX socket */ fn extend_with_ip_bytes(buf: &mut Vec, ip: IpAddr) { match ip { IpAddr::V4(ip) => buf.extend_from_slice(&ip.octets()), IpAddr::V6(ip) => buf.extend_from_slice(&ip.octets()), } } async fn wrap_with_proxy_protocol_v1(mut stream: TcpStream) -> io::Result<()> { let mut upstream = TcpStream::connect(("127.0.0.1", 8080)).await?; tracing::info!( "PROXYv1 {} -> {}", stream.peer_addr().unwrap(), UPSTREAM.to_string(), ); let proxy_header = v1::Header::new( AddressFamily::Inet, SocketAddr::from(([127, 0, 0, 1], 8081)), *UPSTREAM, ); proxy_header.write_to_tokio(&mut upstream).await?; let (_bytes_read, _bytes_written) = copy_bidirectional(&mut stream, &mut upstream).await?; Ok(()) } async fn wrap_with_proxy_protocol_v2(mut stream: TcpStream) -> io::Result<()> { let mut upstream = TcpStream::connect(("127.0.0.1", 8080)).await?; tracing::info!( "PROXYv2 {} -> {}", stream.peer_addr().unwrap(), UPSTREAM.to_string(), ); let mut proxy_header = v2::Header::new_tcp_ipv4_proxy(([127, 0, 0, 1], 8082), *UPSTREAM); proxy_header.add_typed_tlv(tlv::UniqueId::new("4269")); // UNIQUE_ID proxy_header.add_typed_tlv(tlv::Noop::new("NOOP m8")); // NOOP proxy_header.add_typed_tlv(tlv::Authority::new("localhost")); // NOOP proxy_header.add_typed_tlv(tlv::Alpn::new("http/1.1")); // NOOP proxy_header.add_crc23c_checksum(); proxy_header.write_to_tokio(&mut upstream).await?; let (_bytes_read, _bytes_written) = copy_bidirectional(&mut stream, &mut upstream).await?; Ok(()) } async fn unwrap_proxy_protocol(mut stream: TcpStream) -> io::Result<()> { let mut upstream = TcpStream::connect(("127.0.0.1", 8080)).await?; tracing::info!( "PROXY unwrap {} -> {}", stream.peer_addr().unwrap(), UPSTREAM.to_string(), ); let mut header = [0; 12]; stream.peek(&mut header).await?; eprintln!("header: {}", String::from_utf8_lossy(&header)); if &header[..v1::SIGNATURE.len()] == v1::SIGNATURE.as_bytes() { tracing::info!("v1"); let mut stream = BufReader::new(stream); let mut buf = Vec::with_capacity(v1::MAX_HEADER_SIZE); let _len = stream.read_until(b'\n', &mut buf).await?; eprintln!("{}", String::from_utf8_lossy(&buf)); let (rest, header) = match v1::Header::try_from_bytes(&buf) { Ok((rest, header)) => (rest, header), Err(err) => { match err { nom::Err::Incomplete(needed) => todo!(), nom::Err::Error(err) => { eprintln!( "err {:?}, input: {}", err.code, String::from_utf8_lossy(err.input) ) } nom::Err::Failure(_) => todo!(), } return Ok(()); } }; eprintln!("{:02X?} - {:?}", rest, header); let (_bytes_read, _bytes_written) = copy_bidirectional(&mut stream, &mut upstream).await?; } else if header == v2::SIGNATURE { tracing::info!("v2"); let (_bytes_read, _bytes_written) = copy_bidirectional(&mut stream, &mut upstream).await?; } else { tracing::warn!("No proxy header; closing"); } Ok(()) } fn start_server() -> io::Result { tracing::info!("proxying to 127.0.0.1:8080"); Ok(Server::build() .bind("proxy-protocol-v1", ("127.0.0.1", 8081), move || { fn_service(wrap_with_proxy_protocol_v1) .map_err(|err| tracing::error!("service error: {err:?}")) })? .bind("proxy-protocol-v2", ("127.0.0.1", 8082), move || { fn_service(wrap_with_proxy_protocol_v2) .map_err(|err| tracing::error!("service error: {err:?}")) })? .bind("proxy-protocol-unwrap", ("127.0.0.1", 8083), move || { fn_service(unwrap_proxy_protocol) .map_err(|err| tracing::error!("service error: {err:?}")) })? .workers(2) .run()) } #[tokio::main] async fn main() -> io::Result<()> { tracing_subscriber::fmt::fmt().without_time().init(); start_server()?.await?; Ok(()) }