refactor(mqtt): address review feedback on per-node fan-out
- main.rs: parse node_id as number-or-string; clamp person_count to u32 - publisher.rs: document unbounded node-map (no eviction); tidy lazy register
This commit is contained in:
parent
3ca1043fec
commit
0de296f806
|
|
@ -6209,9 +6209,12 @@ async fn main() {
|
|||
};
|
||||
let ts_ms = (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64;
|
||||
for node in arr {
|
||||
let id = match node["node_id"].as_u64() {
|
||||
Some(n) => n.to_string(),
|
||||
None => continue,
|
||||
// node_id may arrive as a JSON number (REST shape)
|
||||
// or as a string; accept either, skip anything else.
|
||||
let id = match &node["node_id"] {
|
||||
serde_json::Value::Number(n) => n.to_string(),
|
||||
serde_json::Value::String(s) if !s.is_empty() => s.clone(),
|
||||
_ => continue,
|
||||
};
|
||||
let motion = match node["motion_level"].as_str() {
|
||||
Some("absent") | Some("none") | Some("still")
|
||||
|
|
@ -6226,8 +6229,15 @@ async fn main() {
|
|||
presence,
|
||||
motion,
|
||||
presence_score: if presence { 1.0 } else { 0.0 },
|
||||
n_persons: node["person_count"].as_u64().unwrap_or(0) as u32,
|
||||
// person_count is u64 in the REST shape; clamp into u32.
|
||||
n_persons: node["person_count"]
|
||||
.as_u64()
|
||||
.unwrap_or(0)
|
||||
.min(u32::MAX as u64) as u32,
|
||||
rssi_dbm: node["rssi_dbm"].as_f64(),
|
||||
// Remaining fields (motion_energy, breathing/heart rate,
|
||||
// vital_confidence) are not part of the per-node summary
|
||||
// emitted on this broadcast; Default (None/0.0) is correct.
|
||||
..Default::default()
|
||||
};
|
||||
let _ = vtx.send(snap);
|
||||
|
|
|
|||
|
|
@ -181,6 +181,12 @@ async fn run(
|
|||
// One discovery identity + availability + rate limiter PER node id seen on
|
||||
// the stream. Nodes are discovered lazily: the first snapshot from a new
|
||||
// node id triggers its retained discovery + availability publish.
|
||||
//
|
||||
// NOTE: entries are never evicted — a node that goes silent keeps its HA
|
||||
// device marked `online` (the broadcast carries no per-node "gone" signal).
|
||||
// Bounded in practice by the deployment's physical node count. A future
|
||||
// improvement could track last-seen per node and flip availability to
|
||||
// `offline` after a timeout; intentionally out of scope for this fix.
|
||||
let mut nodes: HashMap<String, NodeEntry> = HashMap::new();
|
||||
let mut last_heartbeat = Instant::now();
|
||||
let mut last_refresh = Instant::now();
|
||||
|
|
@ -248,20 +254,22 @@ async fn run(
|
|||
Ok(snap) => {
|
||||
let elapsed = start_instant.elapsed();
|
||||
|
||||
// Lazily register a brand-new node: publish its retained
|
||||
// discovery + availability exactly once, when first seen.
|
||||
// Lazily register a brand-new node the first time its id is
|
||||
// seen: publish retained discovery + availability once, then
|
||||
// insert. `contains_key` guards a single insert; the publish
|
||||
// borrows `client`/`entities` (not `nodes`), so there is no
|
||||
// borrow conflict with the `get_mut` below.
|
||||
if !nodes.contains_key(&snap.node_id) {
|
||||
let nb = builder_owned.for_node(&snap.node_id, None);
|
||||
let borrowed = nb.as_borrowed();
|
||||
let entry = NodeEntry::new(nb, &entities);
|
||||
if let Err(e) =
|
||||
publish_all_discovery(&client, &borrowed, &entities).await
|
||||
publish_all_discovery(&client, &entry.builder.as_borrowed(), &entities).await
|
||||
{
|
||||
warn!(
|
||||
"[mqtt] discovery publish failed for node {}: {e}",
|
||||
snap.node_id
|
||||
);
|
||||
}
|
||||
let entry = NodeEntry::new(nb, &entities);
|
||||
if let Err(e) =
|
||||
publish_availability(&client, &entry.avail, "online").await
|
||||
{
|
||||
|
|
@ -274,6 +282,8 @@ async fn run(
|
|||
nodes.insert(snap.node_id.clone(), entry);
|
||||
}
|
||||
|
||||
// Route the snapshot to its node's builder + rate limiter.
|
||||
// Always present here (just inserted above if it was new).
|
||||
if let Some(entry) = nodes.get_mut(&snap.node_id) {
|
||||
let b = entry.builder.as_borrowed();
|
||||
publish_snapshot(
|
||||
|
|
|
|||
Loading…
Reference in New Issue