feat(adr-106): built-in CSI keepalive via managed ping processes
Continuation of ADR-106 (max raw signal off sensors). Operator was running `ping -i 0.05 192.168.0.101 &` by hand to keep CSI callbacks firing on the sensors. Server now does this itself: * Track per-node source addresses in NODE_ADDRS, populated on every recv_from via a cheap magic-byte peek (works for 0xC5110001 raw, 0xC5110002 vitals, 0xC5110006 feature_state). * csi_keepalive_task spawns one `ping -i <interval> <ip>` child per discovered sensor, re-spawns if the child dies or the sensor IP changes. Default 25 pkt/s via --csi-keepalive-pps; 0 disables. Why ICMP, not UDP: tried a UDP-based keepalive (send tiny UDP packet to sensor's known src port). Sensor's closed-port UDP rejected before the CSI callback fired on its side. ICMP echo gets handled in the WiFi stack regardless of any user-space listener so CSI fires reliably. Verified live, no external `ping` running: keepalive: ping -i 0.040 192.168.0.101 for node 1 node 1: 55.6 Hz raw CSI (amp+phase populated) node 2: 55.6 Hz raw CSI (amp+phase populated) Combined with ADR-106 NodeInfo fields (phases, noise_floor_dbm, n_antennas, timestamp_us) this gives downstream consumers — UI, classifier, future ML model — the full complex CSI signal at high rate without any operator-side ritual.
This commit is contained in:
parent
4daa2c9bc2
commit
8489efe9ae
|
|
@ -764,6 +764,11 @@ struct Args {
|
||||||
#[arg(long, default_value = "5005")]
|
#[arg(long, default_value = "5005")]
|
||||||
udp_port: u16,
|
udp_port: u16,
|
||||||
|
|
||||||
|
/// ADR-106: keepalive packets/sec sent back to each sensor to drive
|
||||||
|
/// CSI callback rate (no FW change required). 0 disables.
|
||||||
|
#[arg(long, default_value = "20")]
|
||||||
|
csi_keepalive_pps: u32,
|
||||||
|
|
||||||
/// Path to UI static files
|
/// Path to UI static files
|
||||||
#[arg(long, default_value = "../../ui")]
|
#[arg(long, default_value = "../../ui")]
|
||||||
ui_path: PathBuf,
|
ui_path: PathBuf,
|
||||||
|
|
@ -4469,6 +4474,78 @@ async fn info_page() -> Html<String> {
|
||||||
|
|
||||||
// ── UDP receiver task ────────────────────────────────────────────────────────
|
// ── UDP receiver task ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/// ADR-106: stash per-node source addresses for the keepalive pinger.
|
||||||
|
/// Updated on every recv_from in the UDP receiver task; consumed by
|
||||||
|
/// `csi_keepalive_task` to send back small UDP packets that keep the
|
||||||
|
/// sensor's WiFi RX stack busy and therefore its CSI callback firing.
|
||||||
|
static NODE_ADDRS: OnceLock<Mutex<std::collections::HashMap<u8, std::net::SocketAddr>>> = OnceLock::new();
|
||||||
|
fn node_addrs_init() -> &'static Mutex<std::collections::HashMap<u8, std::net::SocketAddr>> {
|
||||||
|
NODE_ADDRS.get_or_init(|| Mutex::new(std::collections::HashMap::new()))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Drives CSI callback rate on each sensor by sending ICMP echo at
|
||||||
|
/// `pps` pkt/s. Each sensor's FW receives the ping → WiFi RX produces
|
||||||
|
/// a CSI frame → server sees raw CSI from it. No FW change needed.
|
||||||
|
///
|
||||||
|
/// Replaces the ad-hoc `ping -i 0.05 192.168.0.10x &` shell pattern
|
||||||
|
/// the operator was running by hand. Spawns one `ping` child process
|
||||||
|
/// per discovered sensor address (UDP keepalive via `send_to` does
|
||||||
|
/// not work — sensor drops closed-port UDP before CSI callback fires;
|
||||||
|
/// ICMP gets handled by the WiFi stack regardless of any user-space
|
||||||
|
/// listener).
|
||||||
|
async fn csi_keepalive_task(pps: u32) {
|
||||||
|
if pps == 0 {
|
||||||
|
info!("CSI keepalive disabled (--csi-keepalive-pps 0)");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let interval_sec = 1.0 / pps as f64;
|
||||||
|
info!("CSI keepalive: {pps} ICMP pkt/s/node (interval {interval_sec:.3}s)");
|
||||||
|
|
||||||
|
// node_id -> running child handle. We re-spawn if a child dies or
|
||||||
|
// if the sensor's address changes (DHCP rotation, etc.).
|
||||||
|
let mut children: std::collections::HashMap<u8, (std::net::IpAddr, tokio::process::Child)> =
|
||||||
|
std::collections::HashMap::new();
|
||||||
|
|
||||||
|
let ping_bin = if std::path::Path::new("/sbin/ping").exists() {
|
||||||
|
"/sbin/ping"
|
||||||
|
} else { "/usr/bin/ping" };
|
||||||
|
|
||||||
|
loop {
|
||||||
|
// Refresh known sensor addresses (no clones inside the lock).
|
||||||
|
let snapshot: Vec<(u8, std::net::IpAddr)> = {
|
||||||
|
let m = node_addrs_init().lock().unwrap();
|
||||||
|
m.iter().map(|(k, v)| (*k, v.ip())).collect()
|
||||||
|
};
|
||||||
|
|
||||||
|
// Re-spawn for any node whose ping died or whose IP changed.
|
||||||
|
for (nid, ip) in &snapshot {
|
||||||
|
let need_spawn = match children.get_mut(nid) {
|
||||||
|
None => true,
|
||||||
|
Some((prev_ip, child)) => {
|
||||||
|
if prev_ip != ip { true }
|
||||||
|
else { matches!(child.try_wait(), Ok(Some(_))) }
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if need_spawn {
|
||||||
|
let interval_str = format!("{interval_sec:.3}");
|
||||||
|
let ip_str = ip.to_string();
|
||||||
|
match tokio::process::Command::new(ping_bin)
|
||||||
|
.args(["-i", &interval_str, &ip_str])
|
||||||
|
.stdout(std::process::Stdio::null())
|
||||||
|
.stderr(std::process::Stdio::null())
|
||||||
|
.spawn() {
|
||||||
|
Ok(child) => {
|
||||||
|
info!("keepalive: ping -i {interval_str} {ip_str} for node {nid}");
|
||||||
|
children.insert(*nid, (*ip, child));
|
||||||
|
}
|
||||||
|
Err(e) => error!("keepalive: failed to spawn ping for node {nid}: {e}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn udp_receiver_task(state: SharedState, udp_port: u16) {
|
async fn udp_receiver_task(state: SharedState, udp_port: u16) {
|
||||||
let addr = format!("0.0.0.0:{udp_port}");
|
let addr = format!("0.0.0.0:{udp_port}");
|
||||||
let socket = match UdpSocket::bind(&addr).await {
|
let socket = match UdpSocket::bind(&addr).await {
|
||||||
|
|
@ -4486,6 +4563,27 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
|
||||||
loop {
|
loop {
|
||||||
match socket.recv_from(&mut buf).await {
|
match socket.recv_from(&mut buf).await {
|
||||||
Ok((len, src)) => {
|
Ok((len, src)) => {
|
||||||
|
// ADR-106: stash sender address by node_id (peeked from
|
||||||
|
// packet magic+payload) so the keepalive task can ping
|
||||||
|
// back. Both feature_state and raw CSI parsers expose
|
||||||
|
// node_id near the start; do a cheap peek before full
|
||||||
|
// parse. If we can't read node_id, we'll learn it on a
|
||||||
|
// later packet — keepalive simply won't fire for this
|
||||||
|
// source until then.
|
||||||
|
if len >= 5 {
|
||||||
|
let magic = u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]]);
|
||||||
|
let nid_peek = if matches!(magic, 0xC511_0001 | 0xC511_0002 | 0xC511_0006) {
|
||||||
|
Some(buf[4])
|
||||||
|
} else { None };
|
||||||
|
if let Some(nid) = nid_peek {
|
||||||
|
let mut m = node_addrs_init().lock().unwrap();
|
||||||
|
let prev = m.insert(nid, src);
|
||||||
|
if prev.is_none() {
|
||||||
|
info!("keepalive: learned address for node {nid} = {src}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ADR-081 feature_state packet (magic 0xC511_0006) — preferred upstream
|
// ADR-081 feature_state packet (magic 0xC511_0006) — preferred upstream
|
||||||
// payload from the firmware. Convert to Esp32VitalsPacket so the rest of
|
// payload from the firmware. Convert to Esp32VitalsPacket so the rest of
|
||||||
// the pipeline (rendering, sensing_update broadcast) handles it uniformly.
|
// the pipeline (rendering, sensing_update broadcast) handles it uniformly.
|
||||||
|
|
@ -5804,6 +5902,9 @@ async fn main() {
|
||||||
match source {
|
match source {
|
||||||
"esp32" => {
|
"esp32" => {
|
||||||
tokio::spawn(udp_receiver_task(state.clone(), args.udp_port));
|
tokio::spawn(udp_receiver_task(state.clone(), args.udp_port));
|
||||||
|
// ADR-106: drive CSI rate by pinging sensors back ourselves
|
||||||
|
// instead of relying on the operator's ad-hoc `ping -i 0.05 …`.
|
||||||
|
tokio::spawn(csi_keepalive_task(args.csi_keepalive_pps));
|
||||||
tokio::spawn(broadcast_tick_task(state.clone(), args.tick_ms));
|
tokio::spawn(broadcast_tick_task(state.clone(), args.tick_ms));
|
||||||
}
|
}
|
||||||
"wifi" => {
|
"wifi" => {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue