fix(mqtt): one HA device per node — closes #898

After the #872 MQTT wiring, the JSON->VitalsSnapshot bridge hard-coded a
single node_id (the MQTT client id) and the publisher used one
OwnedDiscoveryBuilder, so every physical node collapsed into a single
Home-Assistant device (identifiers:["wifi_densepose_wifi-densepose-1"]),
contradicting the one-device-per-node docs.

- Bridge (main.rs): emit one VitalsSnapshot per node in the sensing
  update's nodes[] (each carries its own node_id + RSSI; shared aggregate
  presence/vitals), falling back to a single aggregate snapshot when
  there is no per-node data (wifi/simulate sources).
- Publisher (publisher.rs): add OwnedDiscoveryBuilder::for_node(), and
  publish discovery + availability lazily on first sight of each node_id,
  routing state to per-node topics. Heartbeat/refresh/offline-LWT iterate
  all known nodes. Result: N distinct HA devices, one per node.

3 new unit tests (distinct nodes -> distinct wifi_densepose_<node>
identifiers); full MQTT suite 71 passed, example builds.
This commit is contained in:
ruv 2026-06-02 09:43:28 +02:00
parent 9c9b137a54
commit 9ddcf0c9fc
3 changed files with 141 additions and 28 deletions

View File

@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
### Fixed ### Fixed
- **MQTT multi-node deployments now create one Home-Assistant device per node — closes #898.** After the #872 MQTT wiring landed, the JSON→`VitalsSnapshot` bridge hard-coded a single `node_id` (the MQTT client id) and the publisher used a single `OwnedDiscoveryBuilder`, so every physical node collapsed into one device (`identifiers:["wifi_densepose_wifi-densepose-1"]`), contradicting the "one device per node" docs. The bridge now emits one snapshot per node in the sensing update's `nodes[]` (each with its own `node_id` + RSSI, falling back to a single aggregate snapshot for wifi/simulate sources), and the publisher derives a per-node builder (`OwnedDiscoveryBuilder::for_node`) that publishes discovery + availability lazily on first sight of each `node_id` and routes state to per-node topics — yielding N distinct HA devices with per-node availability/LWT. Unit-tested (distinct nodes → distinct `wifi_densepose_<node>` identifiers); 71 MQTT tests pass.
- **Person count no longer pinned to 1 — addresses #803.** The aggregate occupancy reported by the sensing server was derived from `smoothed_person_score`, an EMA-smoothed *activity* score (amplitude variance / motion / spectral energy). That score saturates near a single occupant — one moving person maxes it out — so it cannot discriminate occupancy *count* and stayed clamped at 1 across S3/C6 and the Python/Docker/Rust servers. Meanwhile the count-aware per-node estimates the ESP32 paths already compute (firmware `n_persons`, and the DynamicMinCut `corr_persons`) were stashed in `NodeState::prev_person_count` and then **discarded** by the aggregator (same dead-wiring class as #872). The aggregator now takes `max(activity_count, node_max)` via a unit-tested `aggregate_person_count` helper, so a node positively estimating 23 occupants is surfaced instead of overwritten. The fix can only ever *raise* the count when a node reports more people, so the single-occupant case is provably never inflated (regression-guarded by test). **Second half:** the pure-CSI per-node path itself clamped its own estimate — the DynamicMinCut occupancy (`estimate_persons_from_correlation`, 03) was mapped to a score via `corr_persons / 3.0`, putting 2 people at 0.667, *just under* the 0.70 up-threshold of `score_to_person_count`, so the per-node count never climbed past 1 (so `node_max` was also stuck at 1 for CSI-only nodes). Replaced it with a threshold-aligned `corr_persons_to_score` mapping (1→0.40, 2→0.74, 3→0.96) whose steady state round-trips back to the same count through the EMA + hysteresis, while still gating transient noise. A convergence test replays the exact EMA loop to prove min-cut=2 now reports 2 (and documents that the old `/3.0` mapping reported 1). Full multi-person accuracy still depends on the underlying estimator quality; this removes the two server-side clamps that masked it. 586 sensing-server tests pass. - **Person count no longer pinned to 1 — addresses #803.** The aggregate occupancy reported by the sensing server was derived from `smoothed_person_score`, an EMA-smoothed *activity* score (amplitude variance / motion / spectral energy). That score saturates near a single occupant — one moving person maxes it out — so it cannot discriminate occupancy *count* and stayed clamped at 1 across S3/C6 and the Python/Docker/Rust servers. Meanwhile the count-aware per-node estimates the ESP32 paths already compute (firmware `n_persons`, and the DynamicMinCut `corr_persons`) were stashed in `NodeState::prev_person_count` and then **discarded** by the aggregator (same dead-wiring class as #872). The aggregator now takes `max(activity_count, node_max)` via a unit-tested `aggregate_person_count` helper, so a node positively estimating 23 occupants is surfaced instead of overwritten. The fix can only ever *raise* the count when a node reports more people, so the single-occupant case is provably never inflated (regression-guarded by test). **Second half:** the pure-CSI per-node path itself clamped its own estimate — the DynamicMinCut occupancy (`estimate_persons_from_correlation`, 03) was mapped to a score via `corr_persons / 3.0`, putting 2 people at 0.667, *just under* the 0.70 up-threshold of `score_to_person_count`, so the per-node count never climbed past 1 (so `node_max` was also stuck at 1 for CSI-only nodes). Replaced it with a threshold-aligned `corr_persons_to_score` mapping (1→0.40, 2→0.74, 3→0.96) whose steady state round-trips back to the same count through the EMA + hysteresis, while still gating transient noise. A convergence test replays the exact EMA loop to prove min-cut=2 now reports 2 (and documents that the old `/3.0` mapping reported 1). Full multi-person accuracy still depends on the underlying estimator quality; this removes the two server-side clamps that masked it. 586 sensing-server tests pass.
- **MQTT publisher now actually runs (`--mqtt`) — closes #872.** The `--mqtt*` flags were defined only in `cli::Args` (dead code, referenced nowhere) while the binary parses a *separate* `main::Args` with no mqtt fields, and `main.rs` never started the `mqtt::` publisher — so MQTT/Home-Assistant integration was completely unwired (`--mqtt` errored as an unexpected argument, and even with the Docker image's `--features mqtt` build the publisher never ran). Earlier attempts chased a Docker *rebuild*; the real cause was disconnected *code*. Extracted the flags into a shared `cli::MqttArgs` (`#[command(flatten)]` into both structs), spawn the publisher on `--mqtt`, and bridge the JSON sensing broadcast into the typed `VitalsSnapshot` stream with a defensive `serde_json::Value` mapping. Verified end-to-end against `mosquitto`: 20 HA auto-discovery entities + live state (presence/person-count/…). 577 (default) / 580 (`--features mqtt`) tests pass. - **MQTT publisher now actually runs (`--mqtt`) — closes #872.** The `--mqtt*` flags were defined only in `cli::Args` (dead code, referenced nowhere) while the binary parses a *separate* `main::Args` with no mqtt fields, and `main.rs` never started the `mqtt::` publisher — so MQTT/Home-Assistant integration was completely unwired (`--mqtt` errored as an unexpected argument, and even with the Docker image's `--features mqtt` build the publisher never ran). Earlier attempts chased a Docker *rebuild*; the real cause was disconnected *code*. Extracted the flags into a shared `cli::MqttArgs` (`#[command(flatten)]` into both structs), spawn the publisher on `--mqtt`, and bridge the JSON sensing broadcast into the typed `VitalsSnapshot` stream with a defensive `serde_json::Value` mapping. Verified end-to-end against `mosquitto`: 20 HA auto-discovery entities + live state (presence/person-count/…). 577 (default) / 580 (`--features mqtt`) tests pass.

View File

@ -6213,24 +6213,44 @@ async fn main() {
Some(_) => 1.0, Some(_) => 1.0,
None => 0.0, None => 0.0,
}; };
let snap = mqtt::state::VitalsSnapshot { let ts = (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64;
node_id: node_id.clone(), let conf = cls["confidence"].as_f64().unwrap_or(0.0);
timestamp_ms: (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64, let presence_score = if presence { conf.max(0.0) } else { 0.0 };
let breathing = vit["breathing_rate_bpm"].as_f64();
let hr = vit["heart_rate_bpm"].as_f64();
// #898: emit one snapshot per physical node so each
// surfaces as its own Home-Assistant device (with
// its own RSSI + availability). Falls back to a
// single aggregate snapshot when there is no
// per-node data (e.g. wifi / simulate sources).
let mk = |nid: String, rssi: Option<f64>| mqtt::state::VitalsSnapshot {
node_id: nid,
timestamp_ms: ts,
presence, presence,
motion, motion,
presence_score: if presence { presence_score,
cls["confidence"].as_f64().unwrap_or(1.0) breathing_rate_bpm: breathing,
} else { heartrate_bpm: hr,
0.0
},
breathing_rate_bpm: vit["breathing_rate_bpm"].as_f64(),
heartrate_bpm: vit["heart_rate_bpm"].as_f64(),
n_persons, n_persons,
rssi_dbm: v["nodes"][0]["rssi_dbm"].as_f64(), rssi_dbm: rssi,
vital_confidence: cls["confidence"].as_f64().unwrap_or(0.0), vital_confidence: conf,
..Default::default() ..Default::default()
}; };
let _ = vtx.send(snap); match v["nodes"].as_array() {
Some(arr) if !arr.is_empty() => {
for node in arr {
let n = node["node_id"].as_u64().unwrap_or(0);
let nid = format!("{node_id}-node{n}");
let _ = vtx.send(mk(nid, node["rssi_dbm"].as_f64()));
}
}
_ => {
let _ = vtx.send(mk(
node_id.clone(),
v["nodes"][0]["rssi_dbm"].as_f64(),
));
}
}
} }
}); });
tracing::info!("MQTT publisher started -> {host}:{port}"); tracing::info!("MQTT publisher started -> {host}:{port}");

View File

@ -117,6 +117,23 @@ impl OwnedDiscoveryBuilder {
via_device: self.via_device.as_deref(), via_device: self.via_device.as_deref(),
} }
} }
/// Derive a per-node builder from this base (issue #898). Each physical
/// RuView node must surface as its own Home-Assistant device — the base
/// builder's `node_id` (the MQTT client id) is replaced with the actual
/// node id, giving a distinct `wifi_densepose_<node>` device identifier
/// and a per-node friendly name, instead of collapsing every node into a
/// single hard-coded device.
pub fn for_node(&self, node_id: &str) -> OwnedDiscoveryBuilder {
OwnedDiscoveryBuilder {
discovery_prefix: self.discovery_prefix.clone(),
node_id: node_id.to_string(),
node_friendly_name: Some(format!("RuView node {node_id}")),
sw_version: self.sw_version.clone(),
model: self.model.clone(),
via_device: self.via_device.clone(),
}
}
} }
/// Core run loop. Pumps the broadcast channel + the MQTT event loop in /// Core run loop. Pumps the broadcast channel + the MQTT event loop in
@ -129,20 +146,19 @@ async fn run(
let opts = build_mqtt_options(&cfg); let opts = build_mqtt_options(&cfg);
let (client, mut eventloop): (AsyncClient, EventLoop) = AsyncClient::new(opts, 256); let (client, mut eventloop): (AsyncClient, EventLoop) = AsyncClient::new(opts, 256);
let builder_borrowed = builder_owned.as_borrowed();
let entities = DiscoveryBuilder::enabled_entities( let entities = DiscoveryBuilder::enabled_entities(
cfg.privacy_mode, cfg.privacy_mode,
cfg.publish_pose, cfg.publish_pose,
&[], // no_semantic — wire from cli::Args in P3.5 &[], // no_semantic — wire from cli::Args in P3.5
); );
if let Err(e) = publish_all_discovery(&client, &builder_borrowed, &entities).await { // #898: one Home-Assistant device per node. Discovery + availability are
warn!("[mqtt] initial discovery publish failed: {e}"); // published lazily the first time a snapshot for a given node_id arrives;
} // each node's builder + availability are retained here for heartbeats and
let avail = NodeAvailability::for_builder(&builder_borrowed, &entities); // the offline LWT. (Previously a single hard-coded builder collapsed every
if let Err(e) = publish_availability(&client, &avail, "online").await { // node into one device.)
warn!("[mqtt] initial availability publish failed: {e}"); let mut nodes: std::collections::HashMap<String, (OwnedDiscoveryBuilder, NodeAvailability)> =
} std::collections::HashMap::new();
let mut rate_limiter = RateLimiter::new(); let mut rate_limiter = RateLimiter::new();
let mut last_heartbeat = Instant::now(); let mut last_heartbeat = Instant::now();
@ -179,14 +195,20 @@ async fn run(
// Periodic heartbeat / discovery refresh. // Periodic heartbeat / discovery refresh.
_ = tokio::time::sleep(Duration::from_secs(1)) => { _ = tokio::time::sleep(Duration::from_secs(1)) => {
if last_heartbeat.elapsed() >= AVAILABILITY_HEARTBEAT { if last_heartbeat.elapsed() >= AVAILABILITY_HEARTBEAT {
if let Err(e) = publish_availability(&client, &avail, "online").await { for (_, na) in nodes.values() {
warn!("[mqtt] heartbeat publish failed: {e}"); if let Err(e) = publish_availability(&client, na, "online").await {
warn!("[mqtt] heartbeat publish failed: {e}");
}
} }
last_heartbeat = Instant::now(); last_heartbeat = Instant::now();
} }
if last_refresh.elapsed() >= Duration::from_secs(cfg.refresh_secs) { if last_refresh.elapsed() >= Duration::from_secs(cfg.refresh_secs) {
if let Err(e) = publish_all_discovery(&client, &builder_borrowed, &entities).await { for (nb, _) in nodes.values() {
warn!("[mqtt] discovery refresh failed: {e}"); if let Err(e) =
publish_all_discovery(&client, &nb.as_borrowed(), &entities).await
{
warn!("[mqtt] discovery refresh failed: {e}");
}
} }
last_refresh = Instant::now(); last_refresh = Instant::now();
} }
@ -197,18 +219,39 @@ async fn run(
match recv { match recv {
Ok(snap) => { Ok(snap) => {
let elapsed = start_instant.elapsed(); let elapsed = start_instant.elapsed();
publish_snapshot(&client, &builder_borrowed, &snap, &cfg, &mut rate_limiter, elapsed).await; // #898: on first sight of a node_id, publish that
// node's discovery + availability; then route its
// state to per-node topics.
if !nodes.contains_key(&snap.node_id) {
let nb = builder_owned.for_node(&snap.node_id);
let borrowed = nb.as_borrowed();
if let Err(e) =
publish_all_discovery(&client, &borrowed, &entities).await
{
warn!("[mqtt] node {} discovery failed: {e}", snap.node_id);
}
let na = NodeAvailability::for_builder(&borrowed, &entities);
if let Err(e) = publish_availability(&client, &na, "online").await {
warn!("[mqtt] node {} availability failed: {e}", snap.node_id);
}
nodes.insert(snap.node_id.clone(), (nb, na));
}
let borrowed = nodes[&snap.node_id].0.as_borrowed();
publish_snapshot(&client, &borrowed, &snap, &cfg, &mut rate_limiter, elapsed).await;
} }
Err(broadcast::error::RecvError::Lagged(n)) => { Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("[mqtt] lagged behind broadcast by {n} messages — dropped"); warn!("[mqtt] lagged behind broadcast by {n} messages — dropped");
} }
Err(broadcast::error::RecvError::Closed) => { Err(broadcast::error::RecvError::Closed) => {
info!("[mqtt] broadcast channel closed, draining"); info!("[mqtt] broadcast channel closed, draining");
// Publish offline before exit. // Publish offline for every known node before exit.
let _ = publish_availability(&client, &avail, "offline").await; for (_, na) in nodes.values() {
let _ = publish_availability(&client, na, "offline").await;
}
let _ = client.disconnect().await; let _ = client.disconnect().await;
return; return;
} }
} }
} }
} }
@ -296,3 +339,52 @@ async fn publish_state(client: &AsyncClient, m: &StateMessage) -> Result<(), Cli
}; };
client.publish(&m.topic, qos, m.retain, m.payload.clone()).await client.publish(&m.topic, qos, m.retain, m.payload.clone()).await
} }
#[cfg(test)]
mod per_node_device_tests {
//! Issue #898 — each physical node must surface as its own Home-Assistant
//! device, not collapse into one hard-coded device.
use super::*;
fn base() -> OwnedDiscoveryBuilder {
OwnedDiscoveryBuilder {
discovery_prefix: "homeassistant".into(),
node_id: "wifi-densepose-1".into(),
node_friendly_name: Some("RuView".into()),
sw_version: "0.0.0".into(),
model: "test".into(),
via_device: None,
}
}
fn device_identifiers(b: &OwnedDiscoveryBuilder) -> Vec<String> {
b.as_borrowed().build(EntityKind::Presence).device.identifiers
}
#[test]
fn for_node_overrides_node_id_and_friendly_name() {
let n = base().for_node("node-A");
assert_eq!(n.node_id, "node-A");
assert_eq!(n.node_friendly_name.as_deref(), Some("RuView node node-A"));
}
#[test]
fn distinct_nodes_yield_distinct_ha_device_identifiers() {
let b = base();
let a = device_identifiers(&b.for_node("node-A"));
let c = device_identifiers(&b.for_node("node-B"));
assert_eq!(a, vec!["wifi_densepose_node-A".to_string()]);
assert_eq!(c, vec!["wifi_densepose_node-B".to_string()]);
assert_ne!(a, c, "#898: two nodes must not collapse into one device");
}
#[test]
fn single_node_keeps_a_stable_identity() {
// Two snapshots from the same node map to the same device.
let b = base();
assert_eq!(
device_identifiers(&b.for_node("node-7")),
device_identifiers(&b.for_node("node-7"))
);
}
}