fix(mqtt): publish one HA device per node id (refs #872)
This commit is contained in:
parent
f850d46e9a
commit
4bd0b24f0b
|
|
@ -6182,11 +6182,12 @@ async fn main() {
|
|||
let mcfg = std::sync::Arc::new(mqtt::config::MqttConfig::from_args(&args.mqtt_opts));
|
||||
match mcfg.validate() {
|
||||
Ok(()) => {
|
||||
let node_id = mcfg.client_id.clone();
|
||||
// Template builder only: the real node_id is set per-node by the
|
||||
// publisher from each snapshot's node_id.
|
||||
let builder = mqtt::publisher::OwnedDiscoveryBuilder {
|
||||
discovery_prefix: mcfg.discovery_prefix.clone(),
|
||||
node_id: node_id.clone(),
|
||||
node_friendly_name: Some("RuView".to_string()),
|
||||
node_id: "template".to_string(),
|
||||
node_friendly_name: None,
|
||||
sw_version: env!("CARGO_PKG_VERSION").to_string(),
|
||||
model: "RuView WiFi Sensing".to_string(),
|
||||
via_device: None,
|
||||
|
|
@ -6200,37 +6201,37 @@ async fn main() {
|
|||
let Ok(v) = serde_json::from_str::<serde_json::Value>(&json) else {
|
||||
continue;
|
||||
};
|
||||
let cls = &v["classification"];
|
||||
let vit = &v["vital_signs"];
|
||||
let presence = cls["presence"].as_bool().unwrap_or(false);
|
||||
let n_persons = v["persons"]
|
||||
.as_array()
|
||||
.map(|a| a.len() as u32)
|
||||
.or_else(|| v["estimated_persons"].as_u64().map(|x| x as u32))
|
||||
.unwrap_or(0);
|
||||
let motion = match cls["motion_level"].as_str() {
|
||||
Some("none") | Some("still") | Some("idle") | Some("") => 0.0,
|
||||
Some(_) => 1.0,
|
||||
None => 0.0,
|
||||
// Per-node fan-out: the sensing broadcast carries a
|
||||
// `nodes` array (same shape as REST /api/v1/nodes).
|
||||
// Emit one VitalsSnapshot per node, preserving ids.
|
||||
let Some(arr) = v["nodes"].as_array() else {
|
||||
continue;
|
||||
};
|
||||
let snap = mqtt::state::VitalsSnapshot {
|
||||
node_id: node_id.clone(),
|
||||
timestamp_ms: (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64,
|
||||
presence,
|
||||
motion,
|
||||
presence_score: if presence {
|
||||
cls["confidence"].as_f64().unwrap_or(1.0)
|
||||
} else {
|
||||
0.0
|
||||
},
|
||||
breathing_rate_bpm: vit["breathing_rate_bpm"].as_f64(),
|
||||
heartrate_bpm: vit["heart_rate_bpm"].as_f64(),
|
||||
n_persons,
|
||||
rssi_dbm: v["nodes"][0]["rssi_dbm"].as_f64(),
|
||||
vital_confidence: cls["confidence"].as_f64().unwrap_or(0.0),
|
||||
..Default::default()
|
||||
};
|
||||
let _ = vtx.send(snap);
|
||||
let ts_ms = (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64;
|
||||
for node in arr {
|
||||
let id = match node["node_id"].as_u64() {
|
||||
Some(n) => n.to_string(),
|
||||
None => continue,
|
||||
};
|
||||
let motion = match node["motion_level"].as_str() {
|
||||
Some("absent") | Some("none") | Some("still")
|
||||
| Some("idle") | Some("") | None => 0.0,
|
||||
Some(_) => 1.0,
|
||||
};
|
||||
let presence = motion > 0.0
|
||||
&& node["status"].as_str() == Some("active");
|
||||
let snap = mqtt::state::VitalsSnapshot {
|
||||
node_id: id,
|
||||
timestamp_ms: ts_ms,
|
||||
presence,
|
||||
motion,
|
||||
presence_score: if presence { 1.0 } else { 0.0 },
|
||||
n_persons: node["person_count"].as_u64().unwrap_or(0) as u32,
|
||||
rssi_dbm: node["rssi_dbm"].as_f64(),
|
||||
..Default::default()
|
||||
};
|
||||
let _ = vtx.send(snap);
|
||||
}
|
||||
}
|
||||
});
|
||||
tracing::info!("MQTT publisher started -> {host}:{port}");
|
||||
|
|
|
|||
|
|
@ -19,6 +19,13 @@
|
|||
//! through the privacy filter, gate by [`RateLimiter`], encode via
|
||||
//! [`StateEncoder`], publish.
|
||||
//!
|
||||
//! ## Per-node discovery
|
||||
//!
|
||||
//! Each inbound [`VitalsSnapshot`] carries its own `node_id`. The publisher
|
||||
//! keeps a map of per-node discovery identities and registers a new HA
|
||||
//! device the first time a node id is seen on the stream, so N physical
|
||||
//! nodes surface as N Home-Assistant devices (one per room).
|
||||
//!
|
||||
//! ## Reconnect strategy
|
||||
//!
|
||||
//! `rumqttc::EventLoop` reconnects automatically with backoff. After a
|
||||
|
|
@ -27,6 +34,7 @@
|
|||
//! we last refreshed needs them) and reset the rate limiter so the
|
||||
//! first post-reconnect sample emits promptly.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
|
|
@ -80,6 +88,26 @@ impl NodeAvailability {
|
|||
}
|
||||
}
|
||||
|
||||
/// Per-node runtime state owned by the publisher loop: the node's discovery
|
||||
/// identity, its pre-computed availability topics, and an independent rate
|
||||
/// limiter so one chatty room can't starve another's publish budget.
|
||||
struct NodeEntry {
|
||||
builder: OwnedDiscoveryBuilder,
|
||||
avail: NodeAvailability,
|
||||
rate_limiter: RateLimiter,
|
||||
}
|
||||
|
||||
impl NodeEntry {
|
||||
fn new(builder: OwnedDiscoveryBuilder, entities: &[EntityKind]) -> Self {
|
||||
let avail = NodeAvailability::for_builder(&builder.as_borrowed(), entities);
|
||||
Self {
|
||||
builder,
|
||||
avail,
|
||||
rate_limiter: RateLimiter::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn the MQTT publisher background task. Returns the join handle so
|
||||
/// the caller can `await` it on shutdown. Errors during connection are
|
||||
/// retried internally by `rumqttc::EventLoop`.
|
||||
|
|
@ -117,6 +145,21 @@ impl OwnedDiscoveryBuilder {
|
|||
via_device: self.via_device.as_deref(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Clone this template for a concrete node id. The HA device identity and
|
||||
/// every topic derive from `node_id`, so changing it yields a distinct
|
||||
/// device. If `friendly` is `Some`, it overrides the device name;
|
||||
/// otherwise `DiscoveryBuilder::device` falls back to "RuView node <id>".
|
||||
fn for_node(&self, node_id: &str, friendly: Option<&str>) -> OwnedDiscoveryBuilder {
|
||||
let mut b = self.clone();
|
||||
b.node_id = node_id.to_string();
|
||||
if let Some(name) = friendly {
|
||||
b.node_friendly_name = Some(name.to_string());
|
||||
} else {
|
||||
b.node_friendly_name = None;
|
||||
}
|
||||
b
|
||||
}
|
||||
}
|
||||
|
||||
/// Core run loop. Pumps the broadcast channel + the MQTT event loop in
|
||||
|
|
@ -129,22 +172,16 @@ async fn run(
|
|||
let opts = build_mqtt_options(&cfg);
|
||||
let (client, mut eventloop): (AsyncClient, EventLoop) = AsyncClient::new(opts, 256);
|
||||
|
||||
let builder_borrowed = builder_owned.as_borrowed();
|
||||
let entities = DiscoveryBuilder::enabled_entities(
|
||||
cfg.privacy_mode,
|
||||
cfg.publish_pose,
|
||||
&[], // no_semantic — wire from cli::Args in P3.5
|
||||
);
|
||||
|
||||
if let Err(e) = publish_all_discovery(&client, &builder_borrowed, &entities).await {
|
||||
warn!("[mqtt] initial discovery publish failed: {e}");
|
||||
}
|
||||
let avail = NodeAvailability::for_builder(&builder_borrowed, &entities);
|
||||
if let Err(e) = publish_availability(&client, &avail, "online").await {
|
||||
warn!("[mqtt] initial availability publish failed: {e}");
|
||||
}
|
||||
|
||||
let mut rate_limiter = RateLimiter::new();
|
||||
// One discovery identity + availability + rate limiter PER node id seen on
|
||||
// the stream. Nodes are discovered lazily: the first snapshot from a new
|
||||
// node id triggers its retained discovery + availability publish.
|
||||
let mut nodes: HashMap<String, NodeEntry> = HashMap::new();
|
||||
let mut last_heartbeat = Instant::now();
|
||||
let mut last_refresh = Instant::now();
|
||||
let start_instant = Instant::now();
|
||||
|
|
@ -155,7 +192,7 @@ async fn run(
|
|||
prefix = %cfg.discovery_prefix,
|
||||
entities = entities.len(),
|
||||
privacy = cfg.privacy_mode,
|
||||
"[mqtt] publisher started",
|
||||
"[mqtt] publisher started (per-node discovery)",
|
||||
);
|
||||
|
||||
loop {
|
||||
|
|
@ -169,7 +206,14 @@ async fn run(
|
|||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("[mqtt] event loop error, will reconnect: {e}");
|
||||
rate_limiter.reset();
|
||||
for n in nodes.values_mut() {
|
||||
n.rate_limiter.reset();
|
||||
}
|
||||
// Force re-publish of discovery for all known nodes on
|
||||
// the next refresh tick (HA may have restarted).
|
||||
last_refresh = Instant::now()
|
||||
.checked_sub(Duration::from_secs(cfg.refresh_secs))
|
||||
.unwrap_or_else(Instant::now);
|
||||
// Brief backoff before next poll attempt.
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
}
|
||||
|
|
@ -179,14 +223,20 @@ async fn run(
|
|||
// Periodic heartbeat / discovery refresh.
|
||||
_ = tokio::time::sleep(Duration::from_secs(1)) => {
|
||||
if last_heartbeat.elapsed() >= AVAILABILITY_HEARTBEAT {
|
||||
if let Err(e) = publish_availability(&client, &avail, "online").await {
|
||||
warn!("[mqtt] heartbeat publish failed: {e}");
|
||||
for n in nodes.values() {
|
||||
if let Err(e) = publish_availability(&client, &n.avail, "online").await {
|
||||
warn!("[mqtt] heartbeat publish failed: {e}");
|
||||
}
|
||||
}
|
||||
last_heartbeat = Instant::now();
|
||||
}
|
||||
if last_refresh.elapsed() >= Duration::from_secs(cfg.refresh_secs) {
|
||||
if let Err(e) = publish_all_discovery(&client, &builder_borrowed, &entities).await {
|
||||
warn!("[mqtt] discovery refresh failed: {e}");
|
||||
for n in nodes.values() {
|
||||
if let Err(e) =
|
||||
publish_all_discovery(&client, &n.builder.as_borrowed(), &entities).await
|
||||
{
|
||||
warn!("[mqtt] discovery refresh failed: {e}");
|
||||
}
|
||||
}
|
||||
last_refresh = Instant::now();
|
||||
}
|
||||
|
|
@ -197,15 +247,55 @@ async fn run(
|
|||
match recv {
|
||||
Ok(snap) => {
|
||||
let elapsed = start_instant.elapsed();
|
||||
publish_snapshot(&client, &builder_borrowed, &snap, &cfg, &mut rate_limiter, elapsed).await;
|
||||
|
||||
// Lazily register a brand-new node: publish its retained
|
||||
// discovery + availability exactly once, when first seen.
|
||||
if !nodes.contains_key(&snap.node_id) {
|
||||
let nb = builder_owned.for_node(&snap.node_id, None);
|
||||
let borrowed = nb.as_borrowed();
|
||||
if let Err(e) =
|
||||
publish_all_discovery(&client, &borrowed, &entities).await
|
||||
{
|
||||
warn!(
|
||||
"[mqtt] discovery publish failed for node {}: {e}",
|
||||
snap.node_id
|
||||
);
|
||||
}
|
||||
let entry = NodeEntry::new(nb, &entities);
|
||||
if let Err(e) =
|
||||
publish_availability(&client, &entry.avail, "online").await
|
||||
{
|
||||
warn!(
|
||||
"[mqtt] availability publish failed for node {}: {e}",
|
||||
snap.node_id
|
||||
);
|
||||
}
|
||||
info!("[mqtt] registered HA device for node {}", snap.node_id);
|
||||
nodes.insert(snap.node_id.clone(), entry);
|
||||
}
|
||||
|
||||
if let Some(entry) = nodes.get_mut(&snap.node_id) {
|
||||
let b = entry.builder.as_borrowed();
|
||||
publish_snapshot(
|
||||
&client,
|
||||
&b,
|
||||
&snap,
|
||||
&cfg,
|
||||
&mut entry.rate_limiter,
|
||||
elapsed,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
Err(broadcast::error::RecvError::Lagged(n)) => {
|
||||
warn!("[mqtt] lagged behind broadcast by {n} messages — dropped");
|
||||
}
|
||||
Err(broadcast::error::RecvError::Closed) => {
|
||||
info!("[mqtt] broadcast channel closed, draining");
|
||||
// Publish offline before exit.
|
||||
let _ = publish_availability(&client, &avail, "offline").await;
|
||||
// Publish offline for every known node before exit.
|
||||
for n in nodes.values() {
|
||||
let _ = publish_availability(&client, &n.avail, "offline").await;
|
||||
}
|
||||
let _ = client.disconnect().await;
|
||||
return;
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue