diff --git a/v2/crates/wifi-densepose-sensing-server/src/main.rs b/v2/crates/wifi-densepose-sensing-server/src/main.rs index d22a1047..f978e054 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/main.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/main.rs @@ -764,6 +764,11 @@ struct Args { #[arg(long, default_value = "5005")] udp_port: u16, + /// ADR-106: keepalive packets/sec sent back to each sensor to drive + /// CSI callback rate (no FW change required). 0 disables. + #[arg(long, default_value = "20")] + csi_keepalive_pps: u32, + /// Path to UI static files #[arg(long, default_value = "../../ui")] ui_path: PathBuf, @@ -4469,6 +4474,78 @@ async fn info_page() -> Html { // ── UDP receiver task ──────────────────────────────────────────────────────── +/// ADR-106: stash per-node source addresses for the keepalive pinger. +/// Updated on every recv_from in the UDP receiver task; consumed by +/// `csi_keepalive_task` to send back small UDP packets that keep the +/// sensor's WiFi RX stack busy and therefore its CSI callback firing. +static NODE_ADDRS: OnceLock>> = OnceLock::new(); +fn node_addrs_init() -> &'static Mutex> { + NODE_ADDRS.get_or_init(|| Mutex::new(std::collections::HashMap::new())) +} + +/// Drives CSI callback rate on each sensor by sending ICMP echo at +/// `pps` pkt/s. Each sensor's FW receives the ping → WiFi RX produces +/// a CSI frame → server sees raw CSI from it. No FW change needed. +/// +/// Replaces the ad-hoc `ping -i 0.05 192.168.0.10x &` shell pattern +/// the operator was running by hand. Spawns one `ping` child process +/// per discovered sensor address (UDP keepalive via `send_to` does +/// not work — sensor drops closed-port UDP before CSI callback fires; +/// ICMP gets handled by the WiFi stack regardless of any user-space +/// listener). +async fn csi_keepalive_task(pps: u32) { + if pps == 0 { + info!("CSI keepalive disabled (--csi-keepalive-pps 0)"); + return; + } + let interval_sec = 1.0 / pps as f64; + info!("CSI keepalive: {pps} ICMP pkt/s/node (interval {interval_sec:.3}s)"); + + // node_id -> running child handle. We re-spawn if a child dies or + // if the sensor's address changes (DHCP rotation, etc.). + let mut children: std::collections::HashMap = + std::collections::HashMap::new(); + + let ping_bin = if std::path::Path::new("/sbin/ping").exists() { + "/sbin/ping" + } else { "/usr/bin/ping" }; + + loop { + // Refresh known sensor addresses (no clones inside the lock). + let snapshot: Vec<(u8, std::net::IpAddr)> = { + let m = node_addrs_init().lock().unwrap(); + m.iter().map(|(k, v)| (*k, v.ip())).collect() + }; + + // Re-spawn for any node whose ping died or whose IP changed. + for (nid, ip) in &snapshot { + let need_spawn = match children.get_mut(nid) { + None => true, + Some((prev_ip, child)) => { + if prev_ip != ip { true } + else { matches!(child.try_wait(), Ok(Some(_))) } + } + }; + if need_spawn { + let interval_str = format!("{interval_sec:.3}"); + let ip_str = ip.to_string(); + match tokio::process::Command::new(ping_bin) + .args(["-i", &interval_str, &ip_str]) + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::null()) + .spawn() { + Ok(child) => { + info!("keepalive: ping -i {interval_str} {ip_str} for node {nid}"); + children.insert(*nid, (*ip, child)); + } + Err(e) => error!("keepalive: failed to spawn ping for node {nid}: {e}"), + } + } + } + tokio::time::sleep(std::time::Duration::from_secs(2)).await; + } +} + async fn udp_receiver_task(state: SharedState, udp_port: u16) { let addr = format!("0.0.0.0:{udp_port}"); let socket = match UdpSocket::bind(&addr).await { @@ -4486,6 +4563,27 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { loop { match socket.recv_from(&mut buf).await { Ok((len, src)) => { + // ADR-106: stash sender address by node_id (peeked from + // packet magic+payload) so the keepalive task can ping + // back. Both feature_state and raw CSI parsers expose + // node_id near the start; do a cheap peek before full + // parse. If we can't read node_id, we'll learn it on a + // later packet — keepalive simply won't fire for this + // source until then. + if len >= 5 { + let magic = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]); + let nid_peek = if matches!(magic, 0xC511_0001 | 0xC511_0002 | 0xC511_0006) { + Some(buf[4]) + } else { None }; + if let Some(nid) = nid_peek { + let mut m = node_addrs_init().lock().unwrap(); + let prev = m.insert(nid, src); + if prev.is_none() { + info!("keepalive: learned address for node {nid} = {src}"); + } + } + } + // ADR-081 feature_state packet (magic 0xC511_0006) — preferred upstream // payload from the firmware. Convert to Esp32VitalsPacket so the rest of // the pipeline (rendering, sensing_update broadcast) handles it uniformly. @@ -5804,6 +5902,9 @@ async fn main() { match source { "esp32" => { tokio::spawn(udp_receiver_task(state.clone(), args.udp_port)); + // ADR-106: drive CSI rate by pinging sensors back ourselves + // instead of relying on the operator's ad-hoc `ping -i 0.05 …`. + tokio::spawn(csi_keepalive_task(args.csi_keepalive_pps)); tokio::spawn(broadcast_tick_task(state.clone(), args.tick_ms)); } "wifi" => {