diff --git a/v2/crates/wifi-densepose-sensing-server/src/main.rs b/v2/crates/wifi-densepose-sensing-server/src/main.rs index 7fd8f4ec..c3337cc5 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/main.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/main.rs @@ -6182,11 +6182,12 @@ async fn main() { let mcfg = std::sync::Arc::new(mqtt::config::MqttConfig::from_args(&args.mqtt_opts)); match mcfg.validate() { Ok(()) => { - let node_id = mcfg.client_id.clone(); + // Template builder only: the real node_id is set per-node by the + // publisher from each snapshot's node_id. let builder = mqtt::publisher::OwnedDiscoveryBuilder { discovery_prefix: mcfg.discovery_prefix.clone(), - node_id: node_id.clone(), - node_friendly_name: Some("RuView".to_string()), + node_id: "template".to_string(), + node_friendly_name: None, sw_version: env!("CARGO_PKG_VERSION").to_string(), model: "RuView WiFi Sensing".to_string(), via_device: None, @@ -6200,37 +6201,37 @@ async fn main() { let Ok(v) = serde_json::from_str::(&json) else { continue; }; - let cls = &v["classification"]; - let vit = &v["vital_signs"]; - let presence = cls["presence"].as_bool().unwrap_or(false); - let n_persons = v["persons"] - .as_array() - .map(|a| a.len() as u32) - .or_else(|| v["estimated_persons"].as_u64().map(|x| x as u32)) - .unwrap_or(0); - let motion = match cls["motion_level"].as_str() { - Some("none") | Some("still") | Some("idle") | Some("") => 0.0, - Some(_) => 1.0, - None => 0.0, + // Per-node fan-out: the sensing broadcast carries a + // `nodes` array (same shape as REST /api/v1/nodes). + // Emit one VitalsSnapshot per node, preserving ids. + let Some(arr) = v["nodes"].as_array() else { + continue; }; - let snap = mqtt::state::VitalsSnapshot { - node_id: node_id.clone(), - timestamp_ms: (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64, - presence, - motion, - presence_score: if presence { - cls["confidence"].as_f64().unwrap_or(1.0) - } else { - 0.0 - }, - breathing_rate_bpm: vit["breathing_rate_bpm"].as_f64(), - heartrate_bpm: vit["heart_rate_bpm"].as_f64(), - n_persons, - rssi_dbm: v["nodes"][0]["rssi_dbm"].as_f64(), - vital_confidence: cls["confidence"].as_f64().unwrap_or(0.0), - ..Default::default() - }; - let _ = vtx.send(snap); + let ts_ms = (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64; + for node in arr { + let id = match node["node_id"].as_u64() { + Some(n) => n.to_string(), + None => continue, + }; + let motion = match node["motion_level"].as_str() { + Some("absent") | Some("none") | Some("still") + | Some("idle") | Some("") | None => 0.0, + Some(_) => 1.0, + }; + let presence = motion > 0.0 + && node["status"].as_str() == Some("active"); + let snap = mqtt::state::VitalsSnapshot { + node_id: id, + timestamp_ms: ts_ms, + presence, + motion, + presence_score: if presence { 1.0 } else { 0.0 }, + n_persons: node["person_count"].as_u64().unwrap_or(0) as u32, + rssi_dbm: node["rssi_dbm"].as_f64(), + ..Default::default() + }; + let _ = vtx.send(snap); + } } }); tracing::info!("MQTT publisher started -> {host}:{port}"); diff --git a/v2/crates/wifi-densepose-sensing-server/src/mqtt/publisher.rs b/v2/crates/wifi-densepose-sensing-server/src/mqtt/publisher.rs index 20b5fd16..f99da615 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/mqtt/publisher.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/mqtt/publisher.rs @@ -19,6 +19,13 @@ //! through the privacy filter, gate by [`RateLimiter`], encode via //! [`StateEncoder`], publish. //! +//! ## Per-node discovery +//! +//! Each inbound [`VitalsSnapshot`] carries its own `node_id`. The publisher +//! keeps a map of per-node discovery identities and registers a new HA +//! device the first time a node id is seen on the stream, so N physical +//! nodes surface as N Home-Assistant devices (one per room). +//! //! ## Reconnect strategy //! //! `rumqttc::EventLoop` reconnects automatically with backoff. After a @@ -27,6 +34,7 @@ //! we last refreshed needs them) and reset the rate limiter so the //! first post-reconnect sample emits promptly. +use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -80,6 +88,26 @@ impl NodeAvailability { } } +/// Per-node runtime state owned by the publisher loop: the node's discovery +/// identity, its pre-computed availability topics, and an independent rate +/// limiter so one chatty room can't starve another's publish budget. +struct NodeEntry { + builder: OwnedDiscoveryBuilder, + avail: NodeAvailability, + rate_limiter: RateLimiter, +} + +impl NodeEntry { + fn new(builder: OwnedDiscoveryBuilder, entities: &[EntityKind]) -> Self { + let avail = NodeAvailability::for_builder(&builder.as_borrowed(), entities); + Self { + builder, + avail, + rate_limiter: RateLimiter::new(), + } + } +} + /// Spawn the MQTT publisher background task. Returns the join handle so /// the caller can `await` it on shutdown. Errors during connection are /// retried internally by `rumqttc::EventLoop`. @@ -117,6 +145,21 @@ impl OwnedDiscoveryBuilder { via_device: self.via_device.as_deref(), } } + + /// Clone this template for a concrete node id. The HA device identity and + /// every topic derive from `node_id`, so changing it yields a distinct + /// device. If `friendly` is `Some`, it overrides the device name; + /// otherwise `DiscoveryBuilder::device` falls back to "RuView node ". + fn for_node(&self, node_id: &str, friendly: Option<&str>) -> OwnedDiscoveryBuilder { + let mut b = self.clone(); + b.node_id = node_id.to_string(); + if let Some(name) = friendly { + b.node_friendly_name = Some(name.to_string()); + } else { + b.node_friendly_name = None; + } + b + } } /// Core run loop. Pumps the broadcast channel + the MQTT event loop in @@ -129,22 +172,16 @@ async fn run( let opts = build_mqtt_options(&cfg); let (client, mut eventloop): (AsyncClient, EventLoop) = AsyncClient::new(opts, 256); - let builder_borrowed = builder_owned.as_borrowed(); let entities = DiscoveryBuilder::enabled_entities( cfg.privacy_mode, cfg.publish_pose, &[], // no_semantic — wire from cli::Args in P3.5 ); - if let Err(e) = publish_all_discovery(&client, &builder_borrowed, &entities).await { - warn!("[mqtt] initial discovery publish failed: {e}"); - } - let avail = NodeAvailability::for_builder(&builder_borrowed, &entities); - if let Err(e) = publish_availability(&client, &avail, "online").await { - warn!("[mqtt] initial availability publish failed: {e}"); - } - - let mut rate_limiter = RateLimiter::new(); + // One discovery identity + availability + rate limiter PER node id seen on + // the stream. Nodes are discovered lazily: the first snapshot from a new + // node id triggers its retained discovery + availability publish. + let mut nodes: HashMap = HashMap::new(); let mut last_heartbeat = Instant::now(); let mut last_refresh = Instant::now(); let start_instant = Instant::now(); @@ -155,7 +192,7 @@ async fn run( prefix = %cfg.discovery_prefix, entities = entities.len(), privacy = cfg.privacy_mode, - "[mqtt] publisher started", + "[mqtt] publisher started (per-node discovery)", ); loop { @@ -169,7 +206,14 @@ async fn run( Ok(_) => {} Err(e) => { error!("[mqtt] event loop error, will reconnect: {e}"); - rate_limiter.reset(); + for n in nodes.values_mut() { + n.rate_limiter.reset(); + } + // Force re-publish of discovery for all known nodes on + // the next refresh tick (HA may have restarted). + last_refresh = Instant::now() + .checked_sub(Duration::from_secs(cfg.refresh_secs)) + .unwrap_or_else(Instant::now); // Brief backoff before next poll attempt. tokio::time::sleep(Duration::from_millis(500)).await; } @@ -179,14 +223,20 @@ async fn run( // Periodic heartbeat / discovery refresh. _ = tokio::time::sleep(Duration::from_secs(1)) => { if last_heartbeat.elapsed() >= AVAILABILITY_HEARTBEAT { - if let Err(e) = publish_availability(&client, &avail, "online").await { - warn!("[mqtt] heartbeat publish failed: {e}"); + for n in nodes.values() { + if let Err(e) = publish_availability(&client, &n.avail, "online").await { + warn!("[mqtt] heartbeat publish failed: {e}"); + } } last_heartbeat = Instant::now(); } if last_refresh.elapsed() >= Duration::from_secs(cfg.refresh_secs) { - if let Err(e) = publish_all_discovery(&client, &builder_borrowed, &entities).await { - warn!("[mqtt] discovery refresh failed: {e}"); + for n in nodes.values() { + if let Err(e) = + publish_all_discovery(&client, &n.builder.as_borrowed(), &entities).await + { + warn!("[mqtt] discovery refresh failed: {e}"); + } } last_refresh = Instant::now(); } @@ -197,15 +247,55 @@ async fn run( match recv { Ok(snap) => { let elapsed = start_instant.elapsed(); - publish_snapshot(&client, &builder_borrowed, &snap, &cfg, &mut rate_limiter, elapsed).await; + + // Lazily register a brand-new node: publish its retained + // discovery + availability exactly once, when first seen. + if !nodes.contains_key(&snap.node_id) { + let nb = builder_owned.for_node(&snap.node_id, None); + let borrowed = nb.as_borrowed(); + if let Err(e) = + publish_all_discovery(&client, &borrowed, &entities).await + { + warn!( + "[mqtt] discovery publish failed for node {}: {e}", + snap.node_id + ); + } + let entry = NodeEntry::new(nb, &entities); + if let Err(e) = + publish_availability(&client, &entry.avail, "online").await + { + warn!( + "[mqtt] availability publish failed for node {}: {e}", + snap.node_id + ); + } + info!("[mqtt] registered HA device for node {}", snap.node_id); + nodes.insert(snap.node_id.clone(), entry); + } + + if let Some(entry) = nodes.get_mut(&snap.node_id) { + let b = entry.builder.as_borrowed(); + publish_snapshot( + &client, + &b, + &snap, + &cfg, + &mut entry.rate_limiter, + elapsed, + ) + .await; + } } Err(broadcast::error::RecvError::Lagged(n)) => { warn!("[mqtt] lagged behind broadcast by {n} messages — dropped"); } Err(broadcast::error::RecvError::Closed) => { info!("[mqtt] broadcast channel closed, draining"); - // Publish offline before exit. - let _ = publish_availability(&client, &avail, "offline").await; + // Publish offline for every known node before exit. + for n in nodes.values() { + let _ = publish_availability(&client, &n.avail, "offline").await; + } let _ = client.disconnect().await; return; }