diff --git a/v2/crates/wifi-densepose-sensing-server/src/main.rs b/v2/crates/wifi-densepose-sensing-server/src/main.rs index c3337cc5..b29d376e 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/main.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/main.rs @@ -6209,9 +6209,12 @@ async fn main() { }; let ts_ms = (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64; for node in arr { - let id = match node["node_id"].as_u64() { - Some(n) => n.to_string(), - None => continue, + // node_id may arrive as a JSON number (REST shape) + // or as a string; accept either, skip anything else. + let id = match &node["node_id"] { + serde_json::Value::Number(n) => n.to_string(), + serde_json::Value::String(s) if !s.is_empty() => s.clone(), + _ => continue, }; let motion = match node["motion_level"].as_str() { Some("absent") | Some("none") | Some("still") @@ -6226,8 +6229,15 @@ async fn main() { presence, motion, presence_score: if presence { 1.0 } else { 0.0 }, - n_persons: node["person_count"].as_u64().unwrap_or(0) as u32, + // person_count is u64 in the REST shape; clamp into u32. + n_persons: node["person_count"] + .as_u64() + .unwrap_or(0) + .min(u32::MAX as u64) as u32, rssi_dbm: node["rssi_dbm"].as_f64(), + // Remaining fields (motion_energy, breathing/heart rate, + // vital_confidence) are not part of the per-node summary + // emitted on this broadcast; Default (None/0.0) is correct. ..Default::default() }; let _ = vtx.send(snap); diff --git a/v2/crates/wifi-densepose-sensing-server/src/mqtt/publisher.rs b/v2/crates/wifi-densepose-sensing-server/src/mqtt/publisher.rs index f99da615..319abe3a 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/mqtt/publisher.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/mqtt/publisher.rs @@ -181,6 +181,12 @@ async fn run( // One discovery identity + availability + rate limiter PER node id seen on // the stream. Nodes are discovered lazily: the first snapshot from a new // node id triggers its retained discovery + availability publish. + // + // NOTE: entries are never evicted — a node that goes silent keeps its HA + // device marked `online` (the broadcast carries no per-node "gone" signal). + // Bounded in practice by the deployment's physical node count. A future + // improvement could track last-seen per node and flip availability to + // `offline` after a timeout; intentionally out of scope for this fix. let mut nodes: HashMap = HashMap::new(); let mut last_heartbeat = Instant::now(); let mut last_refresh = Instant::now(); @@ -248,20 +254,22 @@ async fn run( Ok(snap) => { let elapsed = start_instant.elapsed(); - // Lazily register a brand-new node: publish its retained - // discovery + availability exactly once, when first seen. + // Lazily register a brand-new node the first time its id is + // seen: publish retained discovery + availability once, then + // insert. `contains_key` guards a single insert; the publish + // borrows `client`/`entities` (not `nodes`), so there is no + // borrow conflict with the `get_mut` below. if !nodes.contains_key(&snap.node_id) { let nb = builder_owned.for_node(&snap.node_id, None); - let borrowed = nb.as_borrowed(); + let entry = NodeEntry::new(nb, &entities); if let Err(e) = - publish_all_discovery(&client, &borrowed, &entities).await + publish_all_discovery(&client, &entry.builder.as_borrowed(), &entities).await { warn!( "[mqtt] discovery publish failed for node {}: {e}", snap.node_id ); } - let entry = NodeEntry::new(nb, &entities); if let Err(e) = publish_availability(&client, &entry.avail, "online").await { @@ -274,6 +282,8 @@ async fn run( nodes.insert(snap.node_id.clone(), entry); } + // Route the snapshot to its node's builder + rate limiter. + // Always present here (just inserted above if it was new). if let Some(entry) = nodes.get_mut(&snap.node_id) { let b = entry.builder.as_borrowed(); publish_snapshot(