feat(adr-106): built-in CSI keepalive via managed ping processes

Continuation of ADR-106 (max raw signal off sensors).

Operator was running `ping -i 0.05 192.168.0.101 &` by hand to keep CSI
callbacks firing on the sensors. Server now does this itself:

* Track per-node source addresses in NODE_ADDRS, populated on every
  recv_from via a cheap magic-byte peek (works for 0xC5110001 raw,
  0xC5110002 vitals, 0xC5110006 feature_state).
* csi_keepalive_task spawns one `ping -i <interval> <ip>` child per
  discovered sensor, re-spawns if the child dies or the sensor IP
  changes. Default 25 pkt/s via --csi-keepalive-pps; 0 disables.

Why ICMP, not UDP: tried a UDP-based keepalive (send tiny UDP packet
to sensor's known src port). Sensor's closed-port UDP rejected before
the CSI callback fired on its side. ICMP echo gets handled in the
WiFi stack regardless of any user-space listener so CSI fires reliably.

Verified live, no external `ping` running:
  keepalive: ping -i 0.040 192.168.0.101 for node 1
  node 1: 55.6 Hz raw CSI (amp+phase populated)
  node 2: 55.6 Hz raw CSI (amp+phase populated)

Combined with ADR-106 NodeInfo fields (phases, noise_floor_dbm,
n_antennas, timestamp_us) this gives downstream consumers — UI,
classifier, future ML model — the full complex CSI signal at high
rate without any operator-side ritual.
This commit is contained in:
arsen 2026-05-17 11:57:23 +07:00
parent 4daa2c9bc2
commit 8489efe9ae
1 changed files with 101 additions and 0 deletions

View File

@ -764,6 +764,11 @@ struct Args {
#[arg(long, default_value = "5005")]
udp_port: u16,
/// ADR-106: keepalive packets/sec sent back to each sensor to drive
/// CSI callback rate (no FW change required). 0 disables.
#[arg(long, default_value = "20")]
csi_keepalive_pps: u32,
/// Path to UI static files
#[arg(long, default_value = "../../ui")]
ui_path: PathBuf,
@ -4469,6 +4474,78 @@ async fn info_page() -> Html<String> {
// ── UDP receiver task ────────────────────────────────────────────────────────
/// ADR-106: stash per-node source addresses for the keepalive pinger.
/// Updated on every recv_from in the UDP receiver task; consumed by
/// `csi_keepalive_task` to send back small UDP packets that keep the
/// sensor's WiFi RX stack busy and therefore its CSI callback firing.
static NODE_ADDRS: OnceLock<Mutex<std::collections::HashMap<u8, std::net::SocketAddr>>> = OnceLock::new();
fn node_addrs_init() -> &'static Mutex<std::collections::HashMap<u8, std::net::SocketAddr>> {
NODE_ADDRS.get_or_init(|| Mutex::new(std::collections::HashMap::new()))
}
/// Drives CSI callback rate on each sensor by sending ICMP echo at
/// `pps` pkt/s. Each sensor's FW receives the ping → WiFi RX produces
/// a CSI frame → server sees raw CSI from it. No FW change needed.
///
/// Replaces the ad-hoc `ping -i 0.05 192.168.0.10x &` shell pattern
/// the operator was running by hand. Spawns one `ping` child process
/// per discovered sensor address (UDP keepalive via `send_to` does
/// not work — sensor drops closed-port UDP before CSI callback fires;
/// ICMP gets handled by the WiFi stack regardless of any user-space
/// listener).
async fn csi_keepalive_task(pps: u32) {
if pps == 0 {
info!("CSI keepalive disabled (--csi-keepalive-pps 0)");
return;
}
let interval_sec = 1.0 / pps as f64;
info!("CSI keepalive: {pps} ICMP pkt/s/node (interval {interval_sec:.3}s)");
// node_id -> running child handle. We re-spawn if a child dies or
// if the sensor's address changes (DHCP rotation, etc.).
let mut children: std::collections::HashMap<u8, (std::net::IpAddr, tokio::process::Child)> =
std::collections::HashMap::new();
let ping_bin = if std::path::Path::new("/sbin/ping").exists() {
"/sbin/ping"
} else { "/usr/bin/ping" };
loop {
// Refresh known sensor addresses (no clones inside the lock).
let snapshot: Vec<(u8, std::net::IpAddr)> = {
let m = node_addrs_init().lock().unwrap();
m.iter().map(|(k, v)| (*k, v.ip())).collect()
};
// Re-spawn for any node whose ping died or whose IP changed.
for (nid, ip) in &snapshot {
let need_spawn = match children.get_mut(nid) {
None => true,
Some((prev_ip, child)) => {
if prev_ip != ip { true }
else { matches!(child.try_wait(), Ok(Some(_))) }
}
};
if need_spawn {
let interval_str = format!("{interval_sec:.3}");
let ip_str = ip.to_string();
match tokio::process::Command::new(ping_bin)
.args(["-i", &interval_str, &ip_str])
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.spawn() {
Ok(child) => {
info!("keepalive: ping -i {interval_str} {ip_str} for node {nid}");
children.insert(*nid, (*ip, child));
}
Err(e) => error!("keepalive: failed to spawn ping for node {nid}: {e}"),
}
}
}
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
}
}
async fn udp_receiver_task(state: SharedState, udp_port: u16) {
let addr = format!("0.0.0.0:{udp_port}");
let socket = match UdpSocket::bind(&addr).await {
@ -4486,6 +4563,27 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
loop {
match socket.recv_from(&mut buf).await {
Ok((len, src)) => {
// ADR-106: stash sender address by node_id (peeked from
// packet magic+payload) so the keepalive task can ping
// back. Both feature_state and raw CSI parsers expose
// node_id near the start; do a cheap peek before full
// parse. If we can't read node_id, we'll learn it on a
// later packet — keepalive simply won't fire for this
// source until then.
if len >= 5 {
let magic = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]);
let nid_peek = if matches!(magic, 0xC511_0001 | 0xC511_0002 | 0xC511_0006) {
Some(buf[4])
} else { None };
if let Some(nid) = nid_peek {
let mut m = node_addrs_init().lock().unwrap();
let prev = m.insert(nid, src);
if prev.is_none() {
info!("keepalive: learned address for node {nid} = {src}");
}
}
}
// ADR-081 feature_state packet (magic 0xC511_0006) — preferred upstream
// payload from the firmware. Convert to Esp32VitalsPacket so the rest of
// the pipeline (rendering, sensing_update broadcast) handles it uniformly.
@ -5804,6 +5902,9 @@ async fn main() {
match source {
"esp32" => {
tokio::spawn(udp_receiver_task(state.clone(), args.udp_port));
// ADR-106: drive CSI rate by pinging sensors back ourselves
// instead of relying on the operator's ad-hoc `ping -i 0.05 …`.
tokio::spawn(csi_keepalive_task(args.csi_keepalive_pps));
tokio::spawn(broadcast_tick_task(state.clone(), args.tick_ms));
}
"wifi" => {