From 9ddcf0c9fccde0ac356964bace76f1e6c2c2c1ae Mon Sep 17 00:00:00 2001 From: ruv Date: Tue, 2 Jun 2026 09:43:28 +0200 Subject: [PATCH 1/2] =?UTF-8?q?fix(mqtt):=20one=20HA=20device=20per=20node?= =?UTF-8?q?=20=E2=80=94=20closes=20#898?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After the #872 MQTT wiring, the JSON->VitalsSnapshot bridge hard-coded a single node_id (the MQTT client id) and the publisher used one OwnedDiscoveryBuilder, so every physical node collapsed into a single Home-Assistant device (identifiers:["wifi_densepose_wifi-densepose-1"]), contradicting the one-device-per-node docs. - Bridge (main.rs): emit one VitalsSnapshot per node in the sensing update's nodes[] (each carries its own node_id + RSSI; shared aggregate presence/vitals), falling back to a single aggregate snapshot when there is no per-node data (wifi/simulate sources). - Publisher (publisher.rs): add OwnedDiscoveryBuilder::for_node(), and publish discovery + availability lazily on first sight of each node_id, routing state to per-node topics. Heartbeat/refresh/offline-LWT iterate all known nodes. Result: N distinct HA devices, one per node. 3 new unit tests (distinct nodes -> distinct wifi_densepose_<node_id> identifiers); full MQTT suite 71 passed, example builds.
--- CHANGELOG.md | 1 + .../wifi-densepose-sensing-server/src/main.rs | 46 +++++-- .../src/mqtt/publisher.rs | 122 +++++++++++++++--- 3 files changed, 141 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 269cd681..5f1d9063 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Fixed +- **MQTT multi-node deployments now create one Home-Assistant device per node — closes #898.** After the #872 MQTT wiring landed, the JSON→`VitalsSnapshot` bridge hard-coded a single `node_id` (the MQTT client id) and the publisher used a single `OwnedDiscoveryBuilder`, so every physical node collapsed into one device (`identifiers:["wifi_densepose_wifi-densepose-1"]`), contradicting the "one device per node" docs. The bridge now emits one snapshot per node in the sensing update's `nodes[]` (each with its own `node_id` + RSSI, falling back to a single aggregate snapshot for wifi/simulate sources), and the publisher derives a per-node builder (`OwnedDiscoveryBuilder::for_node`) that publishes discovery + availability lazily on first sight of each `node_id` and routes state to per-node topics — yielding N distinct HA devices with per-node availability/LWT. Unit-tested (distinct nodes → distinct `wifi_densepose_<node_id>` identifiers); 71 MQTT tests pass. - **Person count no longer pinned to 1 — addresses #803.** The aggregate occupancy reported by the sensing server was derived from `smoothed_person_score`, an EMA-smoothed *activity* score (amplitude variance / motion / spectral energy). That score saturates near a single occupant — one moving person maxes it out — so it cannot discriminate occupancy *count* and stayed clamped at 1 across S3/C6 and the Python/Docker/Rust servers.
Meanwhile the count-aware per-node estimates the ESP32 paths already compute (firmware `n_persons`, and the DynamicMinCut `corr_persons`) were stashed in `NodeState::prev_person_count` and then **discarded** by the aggregator (same dead-wiring class as #872). The aggregator now takes `max(activity_count, node_max)` via a unit-tested `aggregate_person_count` helper, so a node positively estimating 2–3 occupants is surfaced instead of overwritten. The fix can only ever *raise* the count when a node reports more people, so the single-occupant case is provably never inflated (regression-guarded by test). **Second half:** the pure-CSI per-node path itself clamped its own estimate — the DynamicMinCut occupancy (`estimate_persons_from_correlation`, 0–3) was mapped to a score via `corr_persons / 3.0`, putting 2 people at 0.667, *just under* the 0.70 up-threshold of `score_to_person_count`, so the per-node count never climbed past 1 (so `node_max` was also stuck at 1 for CSI-only nodes). Replaced it with a threshold-aligned `corr_persons_to_score` mapping (1→0.40, 2→0.74, 3→0.96) whose steady state round-trips back to the same count through the EMA + hysteresis, while still gating transient noise. A convergence test replays the exact EMA loop to prove min-cut=2 now reports 2 (and documents that the old `/3.0` mapping reported 1). Full multi-person accuracy still depends on the underlying estimator quality; this removes the two server-side clamps that masked it. 586 sensing-server tests pass. - **MQTT publisher now actually runs (`--mqtt`) — closes #872.** The `--mqtt*` flags were defined only in `cli::Args` (dead code, referenced nowhere) while the binary parses a *separate* `main::Args` with no mqtt fields, and `main.rs` never started the `mqtt::` publisher — so MQTT/Home-Assistant integration was completely unwired (`--mqtt` errored as an unexpected argument, and even with the Docker image's `--features mqtt` build the publisher never ran). 
Earlier attempts chased a Docker *rebuild*; the real cause was disconnected *code*. Extracted the flags into a shared `cli::MqttArgs` (`#[command(flatten)]` into both structs), spawn the publisher on `--mqtt`, and bridge the JSON sensing broadcast into the typed `VitalsSnapshot` stream with a defensive `serde_json::Value` mapping. Verified end-to-end against `mosquitto`: 20 HA auto-discovery entities + live state (presence/person-count/…). 577 (default) / 580 (`--features mqtt`) tests pass. diff --git a/v2/crates/wifi-densepose-sensing-server/src/main.rs b/v2/crates/wifi-densepose-sensing-server/src/main.rs index 7fd8f4ec..c986824f 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/main.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/main.rs @@ -6213,24 +6213,44 @@ async fn main() { Some(_) => 1.0, None => 0.0, }; - let snap = mqtt::state::VitalsSnapshot { - node_id: node_id.clone(), - timestamp_ms: (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64, + let ts = (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64; + let conf = cls["confidence"].as_f64().unwrap_or(0.0); + let presence_score = if presence { conf.max(0.0) } else { 0.0 }; + let breathing = vit["breathing_rate_bpm"].as_f64(); + let hr = vit["heart_rate_bpm"].as_f64(); + // #898: emit one snapshot per physical node so each + // surfaces as its own Home-Assistant device (with + // its own RSSI + availability). Falls back to a + // single aggregate snapshot when there is no + // per-node data (e.g. wifi / simulate sources). 
+ let mk = |nid: String, rssi: Option<f64>| mqtt::state::VitalsSnapshot { + node_id: nid, + timestamp_ms: ts, presence, motion, - presence_score: if presence { - cls["confidence"].as_f64().unwrap_or(1.0) - } else { - 0.0 - }, - breathing_rate_bpm: vit["breathing_rate_bpm"].as_f64(), - heartrate_bpm: vit["heart_rate_bpm"].as_f64(), + presence_score, + breathing_rate_bpm: breathing, + heartrate_bpm: hr, n_persons, - rssi_dbm: v["nodes"][0]["rssi_dbm"].as_f64(), - vital_confidence: cls["confidence"].as_f64().unwrap_or(0.0), + rssi_dbm: rssi, + vital_confidence: conf, ..Default::default() }; - let _ = vtx.send(snap); + match v["nodes"].as_array() { + Some(arr) if !arr.is_empty() => { + for node in arr { + let n = node["node_id"].as_u64().unwrap_or(0); + let nid = format!("{node_id}-node{n}"); + let _ = vtx.send(mk(nid, node["rssi_dbm"].as_f64())); + } + } + _ => { + let _ = vtx.send(mk( + node_id.clone(), + v["nodes"][0]["rssi_dbm"].as_f64(), + )); + } + } } }); tracing::info!("MQTT publisher started -> {host}:{port}"); diff --git a/v2/crates/wifi-densepose-sensing-server/src/mqtt/publisher.rs b/v2/crates/wifi-densepose-sensing-server/src/mqtt/publisher.rs index 20b5fd16..ad9177f3 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/mqtt/publisher.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/mqtt/publisher.rs @@ -117,6 +117,23 @@ impl OwnedDiscoveryBuilder { via_device: self.via_device.as_deref(), } } + + /// Derive a per-node builder from this base (issue #898). Each physical + /// RuView node must surface as its own Home-Assistant device — the base + /// builder's `node_id` (the MQTT client id) is replaced with the actual + /// node id, giving a distinct `wifi_densepose_<node_id>` device identifier + /// and a per-node friendly name, instead of collapsing every node into a + /// single hard-coded device.
+ pub fn for_node(&self, node_id: &str) -> OwnedDiscoveryBuilder { + OwnedDiscoveryBuilder { + discovery_prefix: self.discovery_prefix.clone(), + node_id: node_id.to_string(), + node_friendly_name: Some(format!("RuView node {node_id}")), + sw_version: self.sw_version.clone(), + model: self.model.clone(), + via_device: self.via_device.clone(), + } + } } /// Core run loop. Pumps the broadcast channel + the MQTT event loop in @@ -129,20 +146,19 @@ async fn run( let opts = build_mqtt_options(&cfg); let (client, mut eventloop): (AsyncClient, EventLoop) = AsyncClient::new(opts, 256); - let builder_borrowed = builder_owned.as_borrowed(); let entities = DiscoveryBuilder::enabled_entities( cfg.privacy_mode, cfg.publish_pose, &[], // no_semantic — wire from cli::Args in P3.5 ); - if let Err(e) = publish_all_discovery(&client, &builder_borrowed, &entities).await { - warn!("[mqtt] initial discovery publish failed: {e}"); - } - let avail = NodeAvailability::for_builder(&builder_borrowed, &entities); - if let Err(e) = publish_availability(&client, &avail, "online").await { - warn!("[mqtt] initial availability publish failed: {e}"); - } + // #898: one Home-Assistant device per node. Discovery + availability are + // published lazily the first time a snapshot for a given node_id arrives; + // each node's builder + availability are retained here for heartbeats and + // the offline LWT. (Previously a single hard-coded builder collapsed every + // node into one device.) + let mut nodes: std::collections::HashMap<String, (OwnedDiscoveryBuilder, NodeAvailability)> = + std::collections::HashMap::new(); let mut rate_limiter = RateLimiter::new(); let mut last_heartbeat = Instant::now(); @@ -179,14 +195,20 @@ async fn run( // Periodic heartbeat / discovery refresh.
_ = tokio::time::sleep(Duration::from_secs(1)) => { if last_heartbeat.elapsed() >= AVAILABILITY_HEARTBEAT { - if let Err(e) = publish_availability(&client, &avail, "online").await { - warn!("[mqtt] heartbeat publish failed: {e}"); + for (_, na) in nodes.values() { + if let Err(e) = publish_availability(&client, na, "online").await { + warn!("[mqtt] heartbeat publish failed: {e}"); + } } last_heartbeat = Instant::now(); } if last_refresh.elapsed() >= Duration::from_secs(cfg.refresh_secs) { - if let Err(e) = publish_all_discovery(&client, &builder_borrowed, &entities).await { - warn!("[mqtt] discovery refresh failed: {e}"); + for (nb, _) in nodes.values() { + if let Err(e) = + publish_all_discovery(&client, &nb.as_borrowed(), &entities).await + { + warn!("[mqtt] discovery refresh failed: {e}"); + } } last_refresh = Instant::now(); } @@ -197,18 +219,39 @@ async fn run( match recv { Ok(snap) => { let elapsed = start_instant.elapsed(); - publish_snapshot(&client, &builder_borrowed, &snap, &cfg, &mut rate_limiter, elapsed).await; + // #898: on first sight of a node_id, publish that + // node's discovery + availability; then route its + // state to per-node topics. 
+ if !nodes.contains_key(&snap.node_id) { + let nb = builder_owned.for_node(&snap.node_id); + let borrowed = nb.as_borrowed(); + if let Err(e) = + publish_all_discovery(&client, &borrowed, &entities).await + { + warn!("[mqtt] node {} discovery failed: {e}", snap.node_id); + } + let na = NodeAvailability::for_builder(&borrowed, &entities); + if let Err(e) = publish_availability(&client, &na, "online").await { + warn!("[mqtt] node {} availability failed: {e}", snap.node_id); + } + nodes.insert(snap.node_id.clone(), (nb, na)); + } + let borrowed = nodes[&snap.node_id].0.as_borrowed(); + publish_snapshot(&client, &borrowed, &snap, &cfg, &mut rate_limiter, elapsed).await; } Err(broadcast::error::RecvError::Lagged(n)) => { warn!("[mqtt] lagged behind broadcast by {n} messages — dropped"); } Err(broadcast::error::RecvError::Closed) => { info!("[mqtt] broadcast channel closed, draining"); - // Publish offline before exit. - let _ = publish_availability(&client, &avail, "offline").await; + // Publish offline for every known node before exit. + for (_, na) in nodes.values() { + let _ = publish_availability(&client, na, "offline").await; + } let _ = client.disconnect().await; return; } + } } } @@ -296,3 +339,52 @@ async fn publish_state(client: &AsyncClient, m: &StateMessage) -> Result<(), Cli }; client.publish(&m.topic, qos, m.retain, m.payload.clone()).await } + +#[cfg(test)] +mod per_node_device_tests { + //! Issue #898 — each physical node must surface as its own Home-Assistant + //! device, not collapse into one hard-coded device. 
+ use super::*; + + fn base() -> OwnedDiscoveryBuilder { + OwnedDiscoveryBuilder { + discovery_prefix: "homeassistant".into(), + node_id: "wifi-densepose-1".into(), + node_friendly_name: Some("RuView".into()), + sw_version: "0.0.0".into(), + model: "test".into(), + via_device: None, + } + } + + fn device_identifiers(b: &OwnedDiscoveryBuilder) -> Vec<String> { + b.as_borrowed().build(EntityKind::Presence).device.identifiers + } + + #[test] + fn for_node_overrides_node_id_and_friendly_name() { + let n = base().for_node("node-A"); + assert_eq!(n.node_id, "node-A"); + assert_eq!(n.node_friendly_name.as_deref(), Some("RuView node node-A")); + } + + #[test] + fn distinct_nodes_yield_distinct_ha_device_identifiers() { + let b = base(); + let a = device_identifiers(&b.for_node("node-A")); + let c = device_identifiers(&b.for_node("node-B")); + assert_eq!(a, vec!["wifi_densepose_node-A".to_string()]); + assert_eq!(c, vec!["wifi_densepose_node-B".to_string()]); + assert_ne!(a, c, "#898: two nodes must not collapse into one device"); + } + + #[test] + fn single_node_keeps_a_stable_identity() { + // Two snapshots from the same node map to the same device. + let b = base(); + assert_eq!( + device_identifiers(&b.for_node("node-7")), + device_identifiers(&b.for_node("node-7")) + ); + } +} From 27edf153dcb33c6e99abee3d461f33e8a7fcb252 Mon Sep 17 00:00:00 2001 From: ruv Date: Tue, 2 Jun 2026 10:29:17 +0200 Subject: [PATCH 2/2] =?UTF-8?q?test(mqtt):=20drive=20per-node=20snapshots?= =?UTF-8?q?=20in=20discovery=20integration=20tests=20=E2=80=94=20#898?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After the per-node discovery change, discovery configs are published the first time a snapshot for a node_id arrives (not eagerly at startup).
The two discovery integration tests (discovery_topics_appear_on_broker, privacy_mode_suppresses_biometric_discovery) spawned the publisher with an empty broadcast channel and never sent a snapshot, so they collected [] and failed ("missing presence discovery topic in []"). Drive snapshots for the test node_id throughout the capture window (same pattern as state_messages_published_on_snapshot_broadcast) so the per-node device's discovery lands. Verified against a local mosquitto: 3 passed. --- .../tests/mqtt_integration.rs | 33 +++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs b/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs index e625efad..f71bb3d6 100644 --- a/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs +++ b/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs @@ -171,12 +171,28 @@ async fn discovery_topics_appear_on_broker() { // Spawn the publisher. let cfg = make_cfg(port, false, "discovery"); let builder = make_builder("inttest1"); - let (_tx, rx) = broadcast::channel::<VitalsSnapshot>(32); + let (tx, rx) = broadcast::channel::<VitalsSnapshot>(32); let _handle = spawn(cfg, builder, rx); + // #898: discovery is now published per-node the first time a snapshot for + // that node_id arrives (not eagerly at startup). Drive snapshots for + // "inttest1" throughout the window so its device's discovery lands — same + // pattern as state_messages_published_on_snapshot_broadcast. + let tx_bg = tx.clone(); + let drive = tokio::spawn(async move { + for _ in 0..60 { + let _ = tx_bg.send(VitalsSnapshot { + node_id: "inttest1".into(), + ..Default::default() + }); + tokio::time::sleep(Duration::from_millis(200)).await; + } + }); + // Drain the subscriber for up to 6 s — enough for initial discovery // + first availability publication.
let msgs = collect_published(&mut sub_loop, Duration::from_secs(6)).await; + drive.abort(); let _ = sub.disconnect().await; // Assertions: at least the presence + heart_rate + fall discovery @@ -221,10 +237,23 @@ async fn privacy_mode_suppresses_biometric_discovery() { let cfg = make_cfg(port, /* privacy_mode = */ true, "privacy"); let builder = make_builder("inttest2"); - let (_tx, rx) = broadcast::channel::<VitalsSnapshot>(32); + let (tx, rx) = broadcast::channel::<VitalsSnapshot>(32); let _handle = spawn(cfg, builder, rx); + // #898: per-node discovery is triggered by a snapshot for that node_id. + let tx_bg = tx.clone(); + let drive = tokio::spawn(async move { + for _ in 0..60 { + let _ = tx_bg.send(VitalsSnapshot { + node_id: "inttest2".into(), + ..Default::default() + }); + tokio::time::sleep(Duration::from_millis(200)).await; + } + }); + let msgs = collect_published(&mut sub_loop, Duration::from_secs(6)).await; + drive.abort(); let _ = sub.disconnect().await; let topics: Vec<&str> = msgs.iter().map(|(t, _, _)| t.as_str()).collect();