From b12662a54d64be1a22b67820412f63b713d73856 Mon Sep 17 00:00:00 2001 From: rUv Date: Tue, 2 Jun 2026 19:26:01 +0200 Subject: [PATCH] fix(mqtt): per-node HA devices use each node's own presence/motion (#872) (#918) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The MQTT bridge fanned out one Home-Assistant device per node (#898) but applied the *room-level aggregate* classification to every node — so in a multi-node setup a node in an empty corner inherited another node's "present", and `motion_level: "absent"` was mis-mapped to full motion (the aggregate match fell through `Some(_) => 1.0`). Each node in the sensing broadcast's `nodes` array already carries its own `classification` (`motion_level`/`presence`/`confidence`, see PerNodeFeatureInfo) and RSSI. Now each per-node snapshot reads that node's own classification, deferring to the room aggregate only for fields a node omits. Vitals (breathing/heart rate) and person count stay room-level. Extracted the JSON→VitalsSnapshot mapping into a pure, testable function (`vitals_snapshots_from_sensing_json`) and added 4 unit tests covering per-node divergence, partial-field fallback, the no-nodes aggregate path, and the absent→zero-motion fix. Supersedes #899, which targeted the right bug but read non-existent fields (`node["motion_level"]` / `node["status"]` instead of the nested `node["classification"]` + `stale`). Verified: builds with `--features mqtt`; new tests pass; full crate unit suite 432 + 114 passed, 0 failed. --- .../wifi-densepose-sensing-server/src/main.rs | 248 ++++++++++++++---- 1 file changed, 198 insertions(+), 50 deletions(-) diff --git a/v2/crates/wifi-densepose-sensing-server/src/main.rs b/v2/crates/wifi-densepose-sensing-server/src/main.rs index c986824f..5051fd83 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/main.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/main.rs @@ -5476,6 +5476,100 @@ async fn broadcast_tick_task(state: SharedState, tick_ms: u64) { } } +/// Map one sensing-broadcast JSON document into the `VitalsSnapshot`(s) to +/// publish over MQTT (issues #872/#898). +/// +/// Multi-node sources carry a `nodes` array where **each node has its own +/// `classification`** (`motion_level`, `presence`, `confidence`) and RSSI — so +/// each node must surface its *own* presence/motion, not the room-level +/// aggregate. Previously the bridge applied the aggregate `classification` to +/// every per-node Home-Assistant device, so a node in an empty corner inherited +/// another node's "present" (and `motion_level: "absent"` was mis-mapped to full +/// motion). Vitals (breathing / heart rate) and the person count are room-level +/// and shared across the per-node devices. Falls back to a single aggregate +/// snapshot when there is no per-node data (e.g. wifi / simulate sources). +#[cfg(feature = "mqtt")] +fn vitals_snapshots_from_sensing_json( + v: &serde_json::Value, + base_id: &str, +) -> Vec { + use wifi_densepose_sensing_server::mqtt::state::VitalsSnapshot; + + // motion_level string -> motion scalar. "absent"/"none"/"still"/"idle"/"" + // are non-moving; anything else (walking, …) is motion. `fallback` is used + // when the field is absent so a partial per-node payload defers to the + // room aggregate rather than silently reading 0. + fn motion_of(level: Option<&str>, fallback: f64) -> f64 { + match level { + Some("none") | Some("still") | Some("idle") | Some("absent") | Some("") => 0.0, + Some(_) => 1.0, + None => fallback, + } + } + + let ts = (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64; + let vit = &v["vital_signs"]; + let breathing = vit["breathing_rate_bpm"].as_f64(); + let hr = vit["heart_rate_bpm"].as_f64(); + let n_persons = v["persons"] + .as_array() + .map(|a| a.len() as u32) + .or_else(|| v["estimated_persons"].as_u64().map(|x| x as u32)) + .unwrap_or(0); + + // Room-level aggregate: the no-nodes fallback, and the per-node default for + // any field a node omits. + let acls = &v["classification"]; + let agg_presence = acls["presence"].as_bool().unwrap_or(false); + let agg_motion = motion_of(acls["motion_level"].as_str(), 0.0); + let agg_conf = acls["confidence"].as_f64().unwrap_or(0.0); + + let mk = |node_id: String, presence: bool, motion: f64, conf: f64, rssi: Option| { + VitalsSnapshot { + node_id, + timestamp_ms: ts, + presence, + motion, + presence_score: if presence { conf.max(0.0) } else { 0.0 }, + breathing_rate_bpm: breathing, + heartrate_bpm: hr, + n_persons, + rssi_dbm: rssi, + vital_confidence: conf, + ..Default::default() + } + }; + + match v["nodes"].as_array() { + Some(arr) if !arr.is_empty() => arr + .iter() + .map(|node| { + let n = node["node_id"].as_u64().unwrap_or(0); + // Each node carries its OWN classification — use it, deferring to + // the room aggregate only for fields the node omits. + let ncls = &node["classification"]; + let presence = ncls["presence"].as_bool().unwrap_or(agg_presence); + let motion = motion_of(ncls["motion_level"].as_str(), agg_motion); + let conf = ncls["confidence"].as_f64().unwrap_or(agg_conf); + mk( + format!("{base_id}-node{n}"), + presence, + motion, + conf, + node["rssi_dbm"].as_f64(), + ) + }) + .collect(), + _ => vec![mk( + base_id.to_string(), + agg_presence, + agg_motion, + agg_conf, + v["nodes"][0]["rssi_dbm"].as_f64(), + )], + } +} + // ── Main ───────────────────────────────────────────────────────────────────── /// If `--ui-path` points nowhere (wrong cwd), try common repo layouts relative to cwd. @@ -6200,56 +6294,13 @@ async fn main() { let Ok(v) = serde_json::from_str::(&json) else { continue; }; - let cls = &v["classification"]; - let vit = &v["vital_signs"]; - let presence = cls["presence"].as_bool().unwrap_or(false); - let n_persons = v["persons"] - .as_array() - .map(|a| a.len() as u32) - .or_else(|| v["estimated_persons"].as_u64().map(|x| x as u32)) - .unwrap_or(0); - let motion = match cls["motion_level"].as_str() { - Some("none") | Some("still") | Some("idle") | Some("") => 0.0, - Some(_) => 1.0, - None => 0.0, - }; - let ts = (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64; - let conf = cls["confidence"].as_f64().unwrap_or(0.0); - let presence_score = if presence { conf.max(0.0) } else { 0.0 }; - let breathing = vit["breathing_rate_bpm"].as_f64(); - let hr = vit["heart_rate_bpm"].as_f64(); - // #898: emit one snapshot per physical node so each - // surfaces as its own Home-Assistant device (with - // its own RSSI + availability). Falls back to a - // single aggregate snapshot when there is no - // per-node data (e.g. wifi / simulate sources). - let mk = |nid: String, rssi: Option| mqtt::state::VitalsSnapshot { - node_id: nid, - timestamp_ms: ts, - presence, - motion, - presence_score, - breathing_rate_bpm: breathing, - heartrate_bpm: hr, - n_persons, - rssi_dbm: rssi, - vital_confidence: conf, - ..Default::default() - }; - match v["nodes"].as_array() { - Some(arr) if !arr.is_empty() => { - for node in arr { - let n = node["node_id"].as_u64().unwrap_or(0); - let nid = format!("{node_id}-node{n}"); - let _ = vtx.send(mk(nid, node["rssi_dbm"].as_f64())); - } - } - _ => { - let _ = vtx.send(mk( - node_id.clone(), - v["nodes"][0]["rssi_dbm"].as_f64(), - )); - } + // #898/#872: emit one snapshot per physical node so + // each surfaces as its own Home-Assistant device with + // its *own* presence/motion/RSSI (see + // vitals_snapshots_from_sensing_json). Falls back to a + // single aggregate snapshot for per-node-less sources. + for snap in vitals_snapshots_from_sensing_json(&v, &node_id) { + let _ = vtx.send(snap); } } }); @@ -7068,3 +7119,100 @@ mod rolling_p95_tests { assert_eq!(p.len(), 1); } } + +#[cfg(all(test, feature = "mqtt"))] +mod mqtt_bridge_tests { + use super::vitals_snapshots_from_sensing_json; + use serde_json::json; + + /// Regression for the per-node presence bug (#872/#898): each node must + /// surface its OWN classification, not the room-level aggregate. Node 1 is + /// present+moving; node 2 is absent — node 2 must NOT inherit node 1's + /// "present". + #[test] + fn per_node_presence_uses_each_nodes_own_classification() { + let v = json!({ + "timestamp": 1.0, + "classification": { "presence": true, "motion_level": "walking", "confidence": 0.9 }, + "vital_signs": { "breathing_rate_bpm": 14.0, "heart_rate_bpm": 60.0 }, + "persons": [{}, {}], + "nodes": [ + { "node_id": 1, "rssi_dbm": -40.0, + "classification": { "presence": true, "motion_level": "walking", "confidence": 0.8 } }, + { "node_id": 2, "rssi_dbm": -70.0, + "classification": { "presence": false, "motion_level": "absent", "confidence": 0.1 } } + ] + }); + let snaps = vitals_snapshots_from_sensing_json(&v, "ruview"); + assert_eq!(snaps.len(), 2, "one snapshot per node"); + + let n1 = snaps.iter().find(|s| s.node_id == "ruview-node1").unwrap(); + let n2 = snaps.iter().find(|s| s.node_id == "ruview-node2").unwrap(); + + assert!(n1.presence && n1.motion > 0.0, "node1 present + moving"); + assert!( + !n2.presence && n2.motion == 0.0, + "node2 must be absent — not inherit the room aggregate" + ); + // Per-node RSSI preserved. + assert_eq!(n1.rssi_dbm, Some(-40.0)); + assert_eq!(n2.rssi_dbm, Some(-70.0)); + // Vitals + person count are room-level, shared across node devices. + assert_eq!(n1.n_persons, 2); + assert_eq!(n2.n_persons, 2); + assert_eq!(n1.breathing_rate_bpm, Some(14.0)); + assert_eq!(n2.heartrate_bpm, Some(60.0)); + // presence_score is gated on presence. + assert!(n1.presence_score > 0.0); + assert_eq!(n2.presence_score, 0.0); + } + + /// A node that omits a classification field defers to the room aggregate + /// rather than silently reading false/0. + #[test] + fn per_node_missing_fields_fall_back_to_aggregate() { + let v = json!({ + "timestamp": 1.0, + "classification": { "presence": true, "motion_level": "still", "confidence": 0.7 }, + "vital_signs": {}, + "nodes": [ { "node_id": 3, "rssi_dbm": -55.0 } ] // no per-node classification + }); + let snaps = vitals_snapshots_from_sensing_json(&v, "n"); + assert_eq!(snaps.len(), 1); + assert_eq!(snaps[0].node_id, "n-node3"); + assert!(snaps[0].presence, "defers to aggregate presence"); + assert_eq!(snaps[0].motion, 0.0, "aggregate 'still' => no motion"); + } + + /// No `nodes` array (wifi / simulate sources): single aggregate snapshot + /// keyed by the base id. + #[test] + fn falls_back_to_single_aggregate_when_no_nodes() { + let v = json!({ + "timestamp": 2.0, + "classification": { "presence": true, "motion_level": "idle", "confidence": 0.6 }, + "vital_signs": { "breathing_rate_bpm": 12.0 }, + "persons": [{}] + }); + let snaps = vitals_snapshots_from_sensing_json(&v, "ruview"); + assert_eq!(snaps.len(), 1); + assert_eq!(snaps[0].node_id, "ruview"); + assert!(snaps[0].presence); + assert_eq!(snaps[0].motion, 0.0, "idle => no motion"); + assert_eq!(snaps[0].n_persons, 1); + } + + /// `motion_level: "absent"` must map to zero motion (the old aggregate + /// match fell through to `Some(_) => 1.0`, treating absent as full motion). + #[test] + fn absent_motion_level_is_zero_motion() { + let v = json!({ + "timestamp": 0.0, + "classification": { "presence": false, "motion_level": "absent", "confidence": 0.0 }, + "vital_signs": {} + }); + let snaps = vitals_snapshots_from_sensing_json(&v, "x"); + assert_eq!(snaps[0].motion, 0.0); + assert!(!snaps[0].presence); + } +}