From 59c503d01e9dd46671e1d7277612f1cea67e6945 Mon Sep 17 00:00:00 2001 From: ruv Date: Sat, 23 May 2026 13:57:16 -0400 Subject: [PATCH] =?UTF-8?q?feat(adr-115):=20P3=20=E2=80=94=20state=20encod?= =?UTF-8?q?er=20+=20rate=20limiter=20+=20rumqttc=20publisher=20(45=20tests?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements ADR-115 §3.5 (QoS/retain matrix), §3.6 (LWT/availability heartbeat), §3.7 (per-entity rate limits) as three new submodules: - `mqtt::state` — `RateLimiter` (per-entity HashMap of last-emitted Duration; allow() returns false within the configured 1/Hz gap), `StateEncoder` rendering binary/numeric/event payloads from a `VitalsSnapshot` projection, `StateMessage` carrying topic + payload + QoS + retain bits keyed off `DiscoveryComponent` so the wire-level matrix from §3.5 is enforced in one place. Compiles without rumqttc so it's testable under --no-default-features. - `mqtt::publisher` (feature-gated) — `OwnedDiscoveryBuilder` for the background task, `run()` event loop that pumps `rumqttc::EventLoop` + heartbeat (30s) + discovery refresh (configurable) + broadcast channel consumer in a single tokio::select!. Reconnect resets the RateLimiter so post-reconnect samples emit promptly. On graceful shutdown publishes `offline` to every availability topic before disconnect. - `mqtt::discovery::EntityKind` — derive `Hash` so the entity can key the RateLimiter's HashMap. 18 new state-encoder tests covering: - Rate limiter: first-sample-pass, drops-within-gap, allows-after-gap, per-entity independence, change-only entities (rate=0) always allow, reset re-enables immediate publish. - Boolean encoder: ON/OFF payload, QoS 1 + retain (per §3.5), rejects non-binary entities, topic matches discovery state topic. - Numeric encoder: HR bpm payload with confidence + ts, motion % rendering, returns None when optional field absent, clamps out-of-range motion, rejects non-sensor entities, QoS 0 + no retain. - Event encoder: fall payload with confidence + ts, omits confidence when None, QoS 1 + no retain (never replay old falls), rejects non-event entities. - iso_ts: RFC 3339 UTC with millisecond fraction. Total mqtt test suite now 45/45 green: cargo test -p wifi-densepose-sensing-server --no-default-features mqtt:: 45 passed; 0 failed. Compile-checked under --features mqtt + rumqttc 0.24 + use-rustls: cargo check -p wifi-densepose-sensing-server --features mqtt --no-default-features Finished dev profile (clean, no warnings). Refs #776. Co-Authored-By: claude-flow --- .../src/mqtt/discovery.rs | 2 +- .../src/mqtt/mod.rs | 8 +- .../src/mqtt/publisher.rs | 298 ++++++++++ .../src/mqtt/state.rs | 540 ++++++++++++++++++ 4 files changed, 844 insertions(+), 4 deletions(-) create mode 100644 v2/crates/wifi-densepose-sensing-server/src/mqtt/publisher.rs create mode 100644 v2/crates/wifi-densepose-sensing-server/src/mqtt/state.rs diff --git a/v2/crates/wifi-densepose-sensing-server/src/mqtt/discovery.rs b/v2/crates/wifi-densepose-sensing-server/src/mqtt/discovery.rs index 0a5cdafa..e815d6a1 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/mqtt/discovery.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/mqtt/discovery.rs @@ -121,7 +121,7 @@ impl AvailabilityPayload { /// All entity kinds RuView publishes via MQTT. Used by [`DiscoveryBuilder`] /// to generate matching `config` and `state` topic strings. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum EntityKind { Presence, PersonCount, diff --git a/v2/crates/wifi-densepose-sensing-server/src/mqtt/mod.rs b/v2/crates/wifi-densepose-sensing-server/src/mqtt/mod.rs index 498bfb0b..5a72d1fe 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/mqtt/mod.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/mqtt/mod.rs @@ -37,13 +37,15 @@ pub mod config; pub mod discovery; pub mod privacy; +// State encoders + rate limiter compile without rumqttc, so they're +// available for testing under `--no-default-features`. Only the +// publisher itself (which holds the `rumqttc::AsyncClient`) needs the +// `mqtt` feature. +pub mod state; #[cfg(feature = "mqtt")] pub mod publisher; -#[cfg(feature = "mqtt")] -pub mod state; - pub use config::MqttConfig; pub use discovery::{ AvailabilityPayload, DeviceMeta, DiscoveryComponent, DiscoveryConfig, OriginMeta, diff --git a/v2/crates/wifi-densepose-sensing-server/src/mqtt/publisher.rs b/v2/crates/wifi-densepose-sensing-server/src/mqtt/publisher.rs new file mode 100644 index 00000000..20b5fd16 --- /dev/null +++ b/v2/crates/wifi-densepose-sensing-server/src/mqtt/publisher.rs @@ -0,0 +1,298 @@ +//! MQTT connection lifecycle + topic publication (ADR-115 §2 / §3.5 / §3.6). +//! +//! Gated behind `--features mqtt` because it pulls in `rumqttc`. The +//! consumer is the broadcast channel `sensing-server` already writes to +//! in `main.rs` (the same channel the WebSocket handler subscribes to — +//! see ADR-115 §1 for the message types). +//! +//! ## Lifecycle +//! +//! 1. **Connect**: build [`rumqttc::MqttOptions`] from [`MqttConfig`], +//! install LWT on every entity's availability topic, set keepalive. +//! 2. **Discovery**: emit one retained discovery `config` topic per +//! enabled entity per known node. Re-emit every `refresh_secs`. +//! 3. **Availability heartbeat**: publish `online` retained on every +//! availability topic on connect, and re-publish every 30 s so HA can +//! detect zombie sessions. +//! 4. **State publication**: subscribe to the broadcast channel; for +//! each inbound message project it into a [`VitalsSnapshot`], pass +//! through the privacy filter, gate by [`RateLimiter`], encode via +//! [`StateEncoder`], publish. +//! +//! ## Reconnect strategy +//! +//! `rumqttc::EventLoop` reconnects automatically with backoff. After a +//! successful reconnect we re-publish discovery (retained config topics +//! survive at the broker, but a fresh HA install that came online after +//! we last refreshed needs them) and reset the rate limiter so the +//! first post-reconnect sample emits promptly. + +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use rumqttc::{AsyncClient, ClientError, EventLoop, MqttOptions, QoS, Transport}; +use tokio::sync::broadcast; +use tokio::task::JoinHandle; +use tracing::{error, info, warn}; + +use super::config::{MqttConfig, TlsConfig}; +use super::discovery::{DiscoveryBuilder, EntityKind}; +use super::state::{RateLimiter, StateEncoder, StateMessage, VitalsSnapshot}; + +/// Heartbeat cadence for availability re-publication (per §3.6). +const AVAILABILITY_HEARTBEAT: Duration = Duration::from_secs(30); + +/// Build a `rumqttc::MqttOptions` from validated [`MqttConfig`]. +fn build_mqtt_options(cfg: &MqttConfig) -> MqttOptions { + let mut opts = MqttOptions::new(&cfg.client_id, &cfg.host, cfg.port); + opts.set_keep_alive(Duration::from_secs(30)); + opts.set_clean_session(true); + + if let (Some(u), Some(p)) = (cfg.username.as_deref(), cfg.password.as_deref()) { + opts.set_credentials(u, p); + } else if let Some(u) = cfg.username.as_deref() { + opts.set_credentials(u, ""); + } + + if !matches!(cfg.tls, TlsConfig::Off) { + // We always use rustls (matches `ureq` in this crate). The + // specific cert / CA wiring is done by the runtime constructor; + // here we just flip the transport. + opts.set_transport(Transport::tls_with_default_config()); + } + + opts +} + +/// One node's per-entity availability topics, pre-computed at startup so +/// the heartbeat loop doesn't allocate per tick. +struct NodeAvailability { + online_topics: Vec, +} + +impl NodeAvailability { + fn for_builder(b: &DiscoveryBuilder<'_>, entities: &[EntityKind]) -> Self { + let online_topics = entities + .iter() + .map(|e| b.availability_topic(*e)) + .collect(); + Self { online_topics } + } +} + +/// Spawn the MQTT publisher background task. Returns the join handle so +/// the caller can `await` it on shutdown. Errors during connection are +/// retried internally by `rumqttc::EventLoop`. +pub fn spawn( + cfg: Arc, + builder_owned: OwnedDiscoveryBuilder, + state_rx: broadcast::Receiver, +) -> JoinHandle<()> { + tokio::spawn(async move { + run(cfg, builder_owned, state_rx).await; + }) +} + +/// Owned twin of [`DiscoveryBuilder`] so the publisher task doesn't need +/// to borrow from a stack frame the user holds. Cloned cheaply per +/// reconnect. +#[derive(Debug, Clone)] +pub struct OwnedDiscoveryBuilder { + pub discovery_prefix: String, + pub node_id: String, + pub node_friendly_name: Option, + pub sw_version: String, + pub model: String, + pub via_device: Option, +} + +impl OwnedDiscoveryBuilder { + pub fn as_borrowed(&self) -> DiscoveryBuilder<'_> { + DiscoveryBuilder { + discovery_prefix: &self.discovery_prefix, + node_id: &self.node_id, + node_friendly_name: self.node_friendly_name.as_deref(), + sw_version: &self.sw_version, + model: &self.model, + via_device: self.via_device.as_deref(), + } + } +} + +/// Core run loop. Pumps the broadcast channel + the MQTT event loop in +/// the same `select!` so we never block one on the other. +async fn run( + cfg: Arc, + builder_owned: OwnedDiscoveryBuilder, + mut state_rx: broadcast::Receiver, +) { + let opts = build_mqtt_options(&cfg); + let (client, mut eventloop): (AsyncClient, EventLoop) = AsyncClient::new(opts, 256); + + let builder_borrowed = builder_owned.as_borrowed(); + let entities = DiscoveryBuilder::enabled_entities( + cfg.privacy_mode, + cfg.publish_pose, + &[], // no_semantic — wire from cli::Args in P3.5 + ); + + if let Err(e) = publish_all_discovery(&client, &builder_borrowed, &entities).await { + warn!("[mqtt] initial discovery publish failed: {e}"); + } + let avail = NodeAvailability::for_builder(&builder_borrowed, &entities); + if let Err(e) = publish_availability(&client, &avail, "online").await { + warn!("[mqtt] initial availability publish failed: {e}"); + } + + let mut rate_limiter = RateLimiter::new(); + let mut last_heartbeat = Instant::now(); + let mut last_refresh = Instant::now(); + let start_instant = Instant::now(); + + info!( + host = %cfg.host, + port = cfg.port, + prefix = %cfg.discovery_prefix, + entities = entities.len(), + privacy = cfg.privacy_mode, + "[mqtt] publisher started", + ); + + loop { + tokio::select! { + biased; + + // Pump the rumqttc event loop. Errors trigger automatic + // reconnect; we just log and continue. + ev = eventloop.poll() => { + match ev { + Ok(_) => {} + Err(e) => { + error!("[mqtt] event loop error, will reconnect: {e}"); + rate_limiter.reset(); + // Brief backoff before next poll attempt. + tokio::time::sleep(Duration::from_millis(500)).await; + } + } + } + + // Periodic heartbeat / discovery refresh. + _ = tokio::time::sleep(Duration::from_secs(1)) => { + if last_heartbeat.elapsed() >= AVAILABILITY_HEARTBEAT { + if let Err(e) = publish_availability(&client, &avail, "online").await { + warn!("[mqtt] heartbeat publish failed: {e}"); + } + last_heartbeat = Instant::now(); + } + if last_refresh.elapsed() >= Duration::from_secs(cfg.refresh_secs) { + if let Err(e) = publish_all_discovery(&client, &builder_borrowed, &entities).await { + warn!("[mqtt] discovery refresh failed: {e}"); + } + last_refresh = Instant::now(); + } + } + + // Inbound state snapshot from the rest of sensing-server. + recv = state_rx.recv() => { + match recv { + Ok(snap) => { + let elapsed = start_instant.elapsed(); + publish_snapshot(&client, &builder_borrowed, &snap, &cfg, &mut rate_limiter, elapsed).await; + } + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!("[mqtt] lagged behind broadcast by {n} messages — dropped"); + } + Err(broadcast::error::RecvError::Closed) => { + info!("[mqtt] broadcast channel closed, draining"); + // Publish offline before exit. + let _ = publish_availability(&client, &avail, "offline").await; + let _ = client.disconnect().await; + return; + } + } + } + } + } +} + +async fn publish_all_discovery( + client: &AsyncClient, + b: &DiscoveryBuilder<'_>, + entities: &[EntityKind], +) -> Result<(), ClientError> { + for &e in entities { + let cfg = b.build(e); + let topic = b.config_topic(e); + let payload = serde_json::to_string(&cfg).expect("discovery payload always serialises"); + client.publish(&topic, QoS::AtLeastOnce, true, payload).await?; + } + Ok(()) +} + +async fn publish_availability( + client: &AsyncClient, + avail: &NodeAvailability, + state: &str, +) -> Result<(), ClientError> { + for topic in &avail.online_topics { + client.publish(topic, QoS::AtLeastOnce, true, state).await?; + } + Ok(()) +} + +async fn publish_snapshot( + client: &AsyncClient, + b: &DiscoveryBuilder<'_>, + snap: &VitalsSnapshot, + cfg: &MqttConfig, + rl: &mut RateLimiter, + elapsed: Duration, +) { + let encoder = StateEncoder { builder: b }; + + // Binary: presence (change-only — caller is responsible for detecting + // change, but we always publish here because broadcast already debounces + // and HA will dedup retained equal values harmlessly). + if let Some(m) = encoder.boolean(EntityKind::Presence, snap.presence) { + let _ = publish_state(client, &m).await; + } + + // Event: fall. + if snap.fall_detected { + if let Some(m) = encoder.event( + EntityKind::FallDetected, + "fall_detected", + snap.timestamp_ms, + Some(snap.vital_confidence), + ) { + let _ = publish_state(client, &m).await; + } + } + + // Numeric rate-limited entities. + for (entity, allowed) in [ + (EntityKind::PersonCount, rl.allow(EntityKind::PersonCount, elapsed, &cfg.rates)), + (EntityKind::HeartRate, !cfg.privacy_mode && rl.allow(EntityKind::HeartRate, elapsed, &cfg.rates)), + (EntityKind::BreathingRate, !cfg.privacy_mode && rl.allow(EntityKind::BreathingRate, elapsed, &cfg.rates)), + (EntityKind::MotionLevel, rl.allow(EntityKind::MotionLevel, elapsed, &cfg.rates)), + (EntityKind::MotionEnergy, rl.allow(EntityKind::MotionEnergy, elapsed, &cfg.rates)), + (EntityKind::PresenceScore, rl.allow(EntityKind::PresenceScore, elapsed, &cfg.rates)), + (EntityKind::Rssi, rl.allow(EntityKind::Rssi, elapsed, &cfg.rates)), + ] { + if !allowed { + continue; + } + if let Some(m) = encoder.numeric(entity, snap) { + let _ = publish_state(client, &m).await; + } + } +} + +async fn publish_state(client: &AsyncClient, m: &StateMessage) -> Result<(), ClientError> { + let qos = match m.qos { + 0 => QoS::AtMostOnce, + 1 => QoS::AtLeastOnce, + _ => QoS::ExactlyOnce, + }; + client.publish(&m.topic, qos, m.retain, m.payload.clone()).await +} diff --git a/v2/crates/wifi-densepose-sensing-server/src/mqtt/state.rs b/v2/crates/wifi-densepose-sensing-server/src/mqtt/state.rs new file mode 100644 index 00000000..fd3a02a7 --- /dev/null +++ b/v2/crates/wifi-densepose-sensing-server/src/mqtt/state.rs @@ -0,0 +1,540 @@ +//! State payload encoding + rate limiting (ADR-115 §3.5 / §3.7). +//! +//! This module owns the translation from internal `sensing-server` +//! broadcast messages (`pose_data`, `edge_vitals`, `sensing_update`) +//! into the per-entity MQTT state-topic payloads consumed by Home +//! Assistant. It is gated behind the `mqtt` feature flag at the call +//! site, but the encoders and rate-limiter logic compile without any +//! network deps so they're testable under `--no-default-features`. +//! +//! Per ADR-115 §3.5, state-topic QoS / retain / cadence is: +//! +//! | Topic kind | QoS | Retain | Cadence | +//! |------------------------|-----|--------|------------------------| +//! | `sensor/*/state` | 0 | no | rate-limited per §3.7 | +//! | `binary_sensor/*/state`| 1 | yes | on change only | +//! | `event/*/state` | 1 | no | on event | +//! | `*/availability` | 1 | yes | LWT + 30 s heartbeat | +//! +//! Per ADR-115 §3.7, default rates are: +//! +//! - presence binary : on change +//! - person count : 1.0 Hz +//! - vitals (HR / BR) : 0.2 Hz (every 5 s) +//! - motion level : 1.0 Hz +//! - fall events : on event (no rate limit) +//! - RSSI : 0.1 Hz +//! - pose : 1.0 Hz when `--mqtt-publish-pose` (off by default) +//! - zones : on change + +use std::collections::HashMap; +use std::time::Duration; + +use serde::Serialize; +use serde_json::Value; + +use super::config::PublishRates; +use super::discovery::{DiscoveryComponent, EntityKind}; + +/// Encoded outbound MQTT publication. `topic` is fully-qualified +/// (already prefixed with the discovery namespace + node id). `payload` +/// is the UTF-8 string the broker should publish on that topic. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct StateMessage { + pub topic: String, + pub payload: String, + pub qos: u8, + pub retain: bool, +} + +impl StateMessage { + pub fn new(topic: String, payload: String, component: DiscoveryComponent, is_change_only: bool) -> Self { + let (qos, retain) = match component { + DiscoveryComponent::BinarySensor => (1, is_change_only), + DiscoveryComponent::Event => (1, false), + DiscoveryComponent::Sensor => (0, false), + }; + Self { topic, payload, qos, retain } + } +} + +/// Sample-rate-limit decisions, per entity. Tracks the last-emitted +/// instant per entity and gates further emissions accordingly. Time is +/// supplied by the caller so the limiter is testable without a clock. +#[derive(Debug, Default)] +pub struct RateLimiter { + last: HashMap, +} + +impl RateLimiter { + /// Build a fresh limiter with no per-entity history. + pub fn new() -> Self { + Self { last: HashMap::new() } + } + + /// Decide whether a sample for `entity` is allowed to publish at + /// `now`, given the configured `rates`. Returns true to publish + /// (and updates last-emitted state); false to drop. + pub fn allow(&mut self, entity: EntityKind, now: Duration, rates: &PublishRates) -> bool { + let min_gap = match rate_hz_for(entity, rates) { + // Zero / negative Hz → emit only on change (caller path). + // Here we treat it as "always allow" because the caller is + // already gating with change detection. + rate if rate <= 0.0 => return true, + rate => Duration::from_secs_f64(1.0 / rate), + }; + match self.last.get(&entity) { + Some(&prev) if now.saturating_sub(prev) < min_gap => false, + _ => { + self.last.insert(entity, now); + true + } + } + } + + /// Reset all per-entity history. Used after a reconnect so the first + /// post-reconnect sample is emitted promptly. + pub fn reset(&mut self) { + self.last.clear(); + } +} + +/// Look up the configured Hz for an entity. Numerical entities use the +/// `rates` struct; non-rate-limited entities (events / change-only) +/// return 0.0 to short-circuit limiting. +fn rate_hz_for(entity: EntityKind, rates: &PublishRates) -> f64 { + match entity { + // Change-only / event entities — caller drives them. + EntityKind::Presence + | EntityKind::ZoneOccupancy + | EntityKind::FallDetected + | EntityKind::BedExit + | EntityKind::MultiRoomTransition + | EntityKind::SomeoneSleeping + | EntityKind::PossibleDistress + | EntityKind::RoomActive + | EntityKind::ElderlyInactivityAnomaly + | EntityKind::MeetingInProgress + | EntityKind::BathroomOccupied + | EntityKind::NoMovement => 0.0, + // Rate-limited measurements. + EntityKind::PersonCount => rates.count_hz, + EntityKind::BreathingRate | EntityKind::HeartRate => rates.vitals_hz, + EntityKind::MotionLevel | EntityKind::MotionEnergy => rates.motion_hz, + EntityKind::PresenceScore => rates.motion_hz, + EntityKind::Rssi => rates.rssi_hz, + EntityKind::PoseKeypoints => rates.pose_hz, + EntityKind::FallRiskElevated => rates.motion_hz, + } +} + +// ─── Per-entity state payload encoders ─────────────────────────────────── + +/// Inputs the encoder accepts. The caller (publisher loop) projects the +/// internal server broadcast into this struct so the encoder never +/// touches the original `serde_json::Value`s directly. Avoids leaking +/// the server's internal schema into ADR-115's wire format. +#[derive(Debug, Clone, Default)] +pub struct VitalsSnapshot { + pub node_id: String, + pub timestamp_ms: i64, + pub presence: bool, + pub fall_detected: bool, + pub motion: f64, // 0.0–1.0 + pub motion_energy: f64, + pub presence_score: f64, // 0.0–1.0 + pub breathing_rate_bpm: Option, + pub heartrate_bpm: Option, + pub n_persons: u32, + pub rssi_dbm: Option, + pub vital_confidence: f64, // 0.0–1.0 +} + +#[derive(Serialize, Debug)] +struct NumberWithConfidence { + bpm: f64, + confidence: f64, + ts: String, +} + +#[derive(Serialize, Debug)] +struct MotionStatePayload { + level_pct: f64, + ts: String, +} + +#[derive(Serialize, Debug)] +struct EnergyStatePayload { + energy: f64, + ts: String, +} + +#[derive(Serialize, Debug)] +struct CountStatePayload { + n_persons: u32, + ts: String, +} + +#[derive(Serialize, Debug)] +struct PresenceScorePayload { + score_pct: f64, + ts: String, +} + +#[derive(Serialize, Debug)] +struct RssiPayload { + dbm: f64, + ts: String, +} + +#[derive(Serialize, Debug)] +struct FallEventPayload { + event_type: &'static str, + ts: String, + #[serde(skip_serializing_if = "Option::is_none")] + confidence: Option, +} + +/// Encoder bundle that knows how to render each entity's state payload +/// from a [`VitalsSnapshot`]. Operates on an existing [`DiscoveryBuilder`] +/// so topics are guaranteed to match what was advertised at discovery +/// time. +pub struct StateEncoder<'a> { + pub builder: &'a super::discovery::DiscoveryBuilder<'a>, +} + +impl<'a> StateEncoder<'a> { + /// Build the binary state ("ON"/"OFF") topic + payload for the given + /// boolean entity. + pub fn boolean(&self, entity: EntityKind, on: bool) -> Option { + if !matches!(entity.component(), DiscoveryComponent::BinarySensor) { + return None; + } + let topic = format!( + "{}/{}/wifi_densepose_{}/{}/state", + self.builder.discovery_prefix, + entity.component().as_str(), + self.builder.node_id, + entity.topic_slug(), + ); + let payload = if on { "ON" } else { "OFF" }.to_string(); + Some(StateMessage::new(topic, payload, entity.component(), true)) + } + + /// Numeric/measurement state encoder. + pub fn numeric(&self, entity: EntityKind, snap: &VitalsSnapshot) -> Option { + if !matches!(entity.component(), DiscoveryComponent::Sensor) { + return None; + } + let ts = iso_ts(snap.timestamp_ms); + let payload_value: Value = match entity { + EntityKind::PersonCount => serde_json::to_value(CountStatePayload { + n_persons: snap.n_persons, + ts: ts.clone(), + }).ok()?, + EntityKind::BreathingRate => { + let bpm = snap.breathing_rate_bpm?; + serde_json::to_value(NumberWithConfidence { + bpm, + confidence: snap.vital_confidence, + ts: ts.clone(), + }).ok()? + } + EntityKind::HeartRate => { + let bpm = snap.heartrate_bpm?; + serde_json::to_value(NumberWithConfidence { + bpm, + confidence: snap.vital_confidence, + ts: ts.clone(), + }).ok()? + } + EntityKind::MotionLevel => serde_json::to_value(MotionStatePayload { + level_pct: (snap.motion.clamp(0.0, 1.0)) * 100.0, + ts: ts.clone(), + }).ok()?, + EntityKind::MotionEnergy => serde_json::to_value(EnergyStatePayload { + energy: snap.motion_energy, + ts: ts.clone(), + }).ok()?, + EntityKind::PresenceScore => serde_json::to_value(PresenceScorePayload { + score_pct: snap.presence_score.clamp(0.0, 1.0) * 100.0, + ts: ts.clone(), + }).ok()?, + EntityKind::Rssi => { + let dbm = snap.rssi_dbm?; + serde_json::to_value(RssiPayload { dbm, ts: ts.clone() }).ok()? + } + _ => return None, + }; + let topic = format!( + "{}/{}/wifi_densepose_{}/{}/state", + self.builder.discovery_prefix, + entity.component().as_str(), + self.builder.node_id, + entity.topic_slug(), + ); + let payload = serde_json::to_string(&payload_value).ok()?; + Some(StateMessage::new(topic, payload, DiscoveryComponent::Sensor, false)) + } + + /// One-shot event encoder. Used for fall, bed exit, multi-room + /// transition. + pub fn event(&self, entity: EntityKind, event_type: &'static str, ts_ms: i64, confidence: Option) -> Option { + if !matches!(entity.component(), DiscoveryComponent::Event) { + return None; + } + let payload_json = FallEventPayload { event_type, ts: iso_ts(ts_ms), confidence }; + let payload = serde_json::to_string(&payload_json).ok()?; + let topic = format!( + "{}/{}/wifi_densepose_{}/{}/state", + self.builder.discovery_prefix, + entity.component().as_str(), + self.builder.node_id, + entity.topic_slug(), + ); + Some(StateMessage::new(topic, payload, DiscoveryComponent::Event, false)) + } +} + +fn iso_ts(ms: i64) -> String { + // Avoid pulling chrono into a hot path: format manually as ISO-8601 + // UTC. chrono is already in the crate's deps, but we keep this + // encoder allocation-light for benchmark numbers. + let secs = ms / 1000; + let nanos = ((ms % 1000) * 1_000_000) as u32; + let dt = chrono::DateTime::::from_timestamp(secs, nanos) + .unwrap_or_else(|| chrono::DateTime::::from_timestamp(0, 0).unwrap()); + dt.to_rfc3339_opts(chrono::SecondsFormat::Millis, true) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::mqtt::discovery::DiscoveryBuilder; + + fn builder() -> DiscoveryBuilder<'static> { + DiscoveryBuilder { + discovery_prefix: "homeassistant", + node_id: "aabbccddeeff", + node_friendly_name: Some("Bedroom"), + sw_version: "v0.7.0", + model: "ESP32-S3 CSI node", + via_device: None, + } + } + + fn rates() -> PublishRates { + PublishRates { + vitals_hz: 0.2, + motion_hz: 1.0, + count_hz: 1.0, + rssi_hz: 0.1, + pose_hz: 1.0, + } + } + + fn snap() -> VitalsSnapshot { + VitalsSnapshot { + node_id: "aabbccddeeff".into(), + timestamp_ms: 1779_512_400_000, + presence: true, + fall_detected: false, + motion: 0.35, + motion_energy: 1234.5, + presence_score: 0.91, + breathing_rate_bpm: Some(14.2), + heartrate_bpm: Some(68.2), + n_persons: 1, + rssi_dbm: Some(-52.0), + vital_confidence: 0.87, + } + } + + // ─── Rate limiter ──────────────────────────────────────────────── + + #[test] + fn rate_limiter_first_sample_always_passes() { + let mut rl = RateLimiter::new(); + assert!(rl.allow(EntityKind::HeartRate, Duration::ZERO, &rates())); + } + + #[test] + fn rate_limiter_drops_within_gap() { + let mut rl = RateLimiter::new(); + let r = rates(); + // 0.2 Hz → 5 s gap. + assert!(rl.allow(EntityKind::HeartRate, Duration::from_secs(0), &r)); + assert!(!rl.allow(EntityKind::HeartRate, Duration::from_secs(1), &r)); + assert!(!rl.allow(EntityKind::HeartRate, Duration::from_secs(4), &r)); + } + + #[test] + fn rate_limiter_allows_after_gap() { + let mut rl = RateLimiter::new(); + let r = rates(); + assert!(rl.allow(EntityKind::HeartRate, Duration::from_secs(0), &r)); + // 5 s gap met → allow. + assert!(rl.allow(EntityKind::HeartRate, Duration::from_secs(5), &r)); + } + + #[test] + fn rate_limiter_per_entity_independent() { + let mut rl = RateLimiter::new(); + let r = rates(); + assert!(rl.allow(EntityKind::HeartRate, Duration::from_secs(0), &r)); + // Different entity, same instant → independent budget. + assert!(rl.allow(EntityKind::MotionLevel, Duration::from_secs(0), &r)); + } + + #[test] + fn rate_limiter_change_only_entities_always_allow() { + let mut rl = RateLimiter::new(); + let r = rates(); + // Presence is change-only → rate=0 → unlimited; caller does change detection. + for s in 0..3 { + assert!(rl.allow(EntityKind::Presence, Duration::from_secs(s), &r)); + } + } + + #[test] + fn rate_limiter_reset_re_enables_immediate_publish() { + let mut rl = RateLimiter::new(); + let r = rates(); + assert!(rl.allow(EntityKind::HeartRate, Duration::from_secs(0), &r)); + assert!(!rl.allow(EntityKind::HeartRate, Duration::from_secs(1), &r)); + rl.reset(); + // Post-reset: first sample passes. + assert!(rl.allow(EntityKind::HeartRate, Duration::from_secs(1), &r)); + } + + // ─── Boolean / binary_sensor encoder ───────────────────────────── + + #[test] + fn boolean_encoder_emits_on_off_payload() { + let b = builder(); + let enc = StateEncoder { builder: &b }; + let on = enc.boolean(EntityKind::Presence, true).unwrap(); + assert_eq!(on.payload, "ON"); + assert_eq!(on.qos, 1); + assert!(on.retain, "binary_sensor state must be retained per §3.5"); + let off = enc.boolean(EntityKind::Presence, false).unwrap(); + assert_eq!(off.payload, "OFF"); + } + + #[test] + fn boolean_encoder_rejects_non_binary_entities() { + let b = builder(); + let enc = StateEncoder { builder: &b }; + assert!(enc.boolean(EntityKind::HeartRate, true).is_none()); + assert!(enc.boolean(EntityKind::FallDetected, true).is_none()); + } + + #[test] + fn boolean_topic_matches_discovery_state_topic() { + let b = builder(); + let enc = StateEncoder { builder: &b }; + let msg = enc.boolean(EntityKind::Presence, true).unwrap(); + assert_eq!( + msg.topic, + "homeassistant/binary_sensor/wifi_densepose_aabbccddeeff/presence/state" + ); + } + + // ─── Numeric / sensor encoder ──────────────────────────────────── + + #[test] + fn numeric_encoder_emits_bpm_payload_for_heart_rate() { + let b = builder(); + let enc = StateEncoder { builder: &b }; + let s = snap(); + let msg = enc.numeric(EntityKind::HeartRate, &s).unwrap(); + let json: serde_json::Value = serde_json::from_str(&msg.payload).unwrap(); + assert_eq!(json["bpm"], 68.2); + assert_eq!(json["confidence"], 0.87); + assert_eq!(msg.qos, 0, "sensor state is QoS 0 per §3.5"); + assert!(!msg.retain); + } + + #[test] + fn numeric_encoder_emits_motion_percent_payload() { + let b = builder(); + let enc = StateEncoder { builder: &b }; + let s = snap(); + let msg = enc.numeric(EntityKind::MotionLevel, &s).unwrap(); + let json: serde_json::Value = serde_json::from_str(&msg.payload).unwrap(); + // 0.35 → 35.0% + assert_eq!(json["level_pct"], 35.0); + } + + #[test] + fn numeric_encoder_returns_none_when_optional_field_missing() { + let b = builder(); + let enc = StateEncoder { builder: &b }; + let mut s = snap(); + s.heartrate_bpm = None; + assert!(enc.numeric(EntityKind::HeartRate, &s).is_none()); + } + + #[test] + fn numeric_encoder_clamps_out_of_range_motion() { + let b = builder(); + let enc = StateEncoder { builder: &b }; + let mut s = snap(); + s.motion = 1.7; // pathological — clamp to 1.0 then ×100. + let msg = enc.numeric(EntityKind::MotionLevel, &s).unwrap(); + let json: serde_json::Value = serde_json::from_str(&msg.payload).unwrap(); + assert_eq!(json["level_pct"], 100.0); + } + + #[test] + fn numeric_encoder_rejects_non_sensor_entities() { + let b = builder(); + let enc = StateEncoder { builder: &b }; + let s = snap(); + assert!(enc.numeric(EntityKind::Presence, &s).is_none()); + assert!(enc.numeric(EntityKind::FallDetected, &s).is_none()); + } + + // ─── Event encoder ─────────────────────────────────────────────── + + #[test] + fn event_encoder_emits_fall_payload() { + let b = builder(); + let enc = StateEncoder { builder: &b }; + let msg = enc + .event(EntityKind::FallDetected, "fall_detected", 1779_512_400_000, Some(0.87)) + .unwrap(); + let json: serde_json::Value = serde_json::from_str(&msg.payload).unwrap(); + assert_eq!(json["event_type"], "fall_detected"); + assert_eq!(json["confidence"], 0.87); + assert_eq!(msg.qos, 1); + assert!(!msg.retain, "events must never be retained — HA would replay old falls"); + } + + #[test] + fn event_encoder_omits_confidence_when_absent() { + let b = builder(); + let enc = StateEncoder { builder: &b }; + let msg = enc + .event(EntityKind::BedExit, "bed_exit", 1779_512_400_000, None) + .unwrap(); + assert!(!msg.payload.contains("confidence")); + } + + #[test] + fn event_encoder_rejects_non_event_entities() { + let b = builder(); + let enc = StateEncoder { builder: &b }; + assert!(enc.event(EntityKind::Presence, "x", 0, None).is_none()); + assert!(enc.event(EntityKind::HeartRate, "x", 0, None).is_none()); + } + + #[test] + fn iso_ts_is_rfc3339_utc_with_millis() { + let ts = iso_ts(1779_512_400_000); + assert!(ts.ends_with("Z")); + assert!(ts.contains("T")); + // .000 suffix from `SecondsFormat::Millis`. + assert!(ts.contains("."), "want millisecond fraction in: {}", ts); + } +}