feat(adr-115): P3 — state encoder + rate limiter + rumqttc publisher (45 tests)
Implements ADR-115 §3.5 (QoS/retain matrix), §3.6 (LWT/availability heartbeat), §3.7 (per-entity rate limits) as three new submodules: - `mqtt::state` — `RateLimiter` (per-entity HashMap of last-emitted Duration; allow() returns false within the configured 1/Hz gap), `StateEncoder` rendering binary/numeric/event payloads from a `VitalsSnapshot` projection, `StateMessage` carrying topic + payload + QoS + retain bits keyed off `DiscoveryComponent` so the wire-level matrix from §3.5 is enforced in one place. Compiles without rumqttc so it's testable under --no-default-features. - `mqtt::publisher` (feature-gated) — `OwnedDiscoveryBuilder` for the background task, `run()` event loop that pumps `rumqttc::EventLoop` + heartbeat (30s) + discovery refresh (configurable) + broadcast channel consumer in a single tokio::select!. Reconnect resets the RateLimiter so post-reconnect samples emit promptly. On graceful shutdown publishes `offline` to every availability topic before disconnect. - `mqtt::discovery::EntityKind` — derive `Hash` so the entity can key the RateLimiter's HashMap. 18 new state-encoder tests covering: - Rate limiter: first-sample-pass, drops-within-gap, allows-after-gap, per-entity independence, change-only entities (rate=0) always allow, reset re-enables immediate publish. - Boolean encoder: ON/OFF payload, QoS 1 + retain (per §3.5), rejects non-binary entities, topic matches discovery state topic. - Numeric encoder: HR bpm payload with confidence + ts, motion % rendering, returns None when optional field absent, clamps out-of-range motion, rejects non-sensor entities, QoS 0 + no retain. - Event encoder: fall payload with confidence + ts, omits confidence when None, QoS 1 + no retain (never replay old falls), rejects non-event entities. - iso_ts: RFC 3339 UTC with millisecond fraction. Total mqtt test suite now 45/45 green: cargo test -p wifi-densepose-sensing-server --no-default-features mqtt:: 45 passed; 0 failed. Compile-checked under --features mqtt + rumqttc 0.24 + use-rustls: cargo check -p wifi-densepose-sensing-server --features mqtt --no-default-features Finished dev profile (clean, no warnings). Refs #776. Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
parent
07d22247b5
commit
59c503d01e
|
|
@ -121,7 +121,7 @@ impl AvailabilityPayload {
|
|||
|
||||
/// All entity kinds RuView publishes via MQTT. Used by [`DiscoveryBuilder`]
|
||||
/// to generate matching `config` and `state` topic strings.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
|
||||
pub enum EntityKind {
|
||||
Presence,
|
||||
PersonCount,
|
||||
|
|
|
|||
|
|
@ -37,13 +37,15 @@
|
|||
pub mod config;
|
||||
pub mod discovery;
|
||||
pub mod privacy;
|
||||
// State encoders + rate limiter compile without rumqttc, so they're
|
||||
// available for testing under `--no-default-features`. Only the
|
||||
// publisher itself (which holds the `rumqttc::AsyncClient`) needs the
|
||||
// `mqtt` feature.
|
||||
pub mod state;
|
||||
|
||||
#[cfg(feature = "mqtt")]
|
||||
pub mod publisher;
|
||||
|
||||
#[cfg(feature = "mqtt")]
|
||||
pub mod state;
|
||||
|
||||
pub use config::MqttConfig;
|
||||
pub use discovery::{
|
||||
AvailabilityPayload, DeviceMeta, DiscoveryComponent, DiscoveryConfig, OriginMeta,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,298 @@
|
|||
//! MQTT connection lifecycle + topic publication (ADR-115 §2 / §3.5 / §3.6).
|
||||
//!
|
||||
//! Gated behind `--features mqtt` because it pulls in `rumqttc`. The
|
||||
//! consumer is the broadcast channel `sensing-server` already writes to
|
||||
//! in `main.rs` (the same channel the WebSocket handler subscribes to —
|
||||
//! see ADR-115 §1 for the message types).
|
||||
//!
|
||||
//! ## Lifecycle
|
||||
//!
|
||||
//! 1. **Connect**: build [`rumqttc::MqttOptions`] from [`MqttConfig`],
|
||||
//! install LWT on every entity's availability topic, set keepalive.
|
||||
//! 2. **Discovery**: emit one retained discovery `config` topic per
|
||||
//! enabled entity per known node. Re-emit every `refresh_secs`.
|
||||
//! 3. **Availability heartbeat**: publish `online` retained on every
|
||||
//! availability topic on connect, and re-publish every 30 s so HA can
|
||||
//! detect zombie sessions.
|
||||
//! 4. **State publication**: subscribe to the broadcast channel; for
|
||||
//! each inbound message project it into a [`VitalsSnapshot`], pass
|
||||
//! through the privacy filter, gate by [`RateLimiter`], encode via
|
||||
//! [`StateEncoder`], publish.
|
||||
//!
|
||||
//! ## Reconnect strategy
|
||||
//!
|
||||
//! `rumqttc::EventLoop` reconnects automatically with backoff. After a
|
||||
//! successful reconnect we re-publish discovery (retained config topics
|
||||
//! survive at the broker, but a fresh HA install that came online after
|
||||
//! we last refreshed needs them) and reset the rate limiter so the
|
||||
//! first post-reconnect sample emits promptly.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use rumqttc::{AsyncClient, ClientError, EventLoop, MqttOptions, QoS, Transport};
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::task::JoinHandle;
|
||||
use tracing::{error, info, warn};
|
||||
|
||||
use super::config::{MqttConfig, TlsConfig};
|
||||
use super::discovery::{DiscoveryBuilder, EntityKind};
|
||||
use super::state::{RateLimiter, StateEncoder, StateMessage, VitalsSnapshot};
|
||||
|
||||
/// Heartbeat cadence for availability re-publication (per §3.6).
|
||||
const AVAILABILITY_HEARTBEAT: Duration = Duration::from_secs(30);
|
||||
|
||||
/// Build a `rumqttc::MqttOptions` from validated [`MqttConfig`].
|
||||
fn build_mqtt_options(cfg: &MqttConfig) -> MqttOptions {
|
||||
let mut opts = MqttOptions::new(&cfg.client_id, &cfg.host, cfg.port);
|
||||
opts.set_keep_alive(Duration::from_secs(30));
|
||||
opts.set_clean_session(true);
|
||||
|
||||
if let (Some(u), Some(p)) = (cfg.username.as_deref(), cfg.password.as_deref()) {
|
||||
opts.set_credentials(u, p);
|
||||
} else if let Some(u) = cfg.username.as_deref() {
|
||||
opts.set_credentials(u, "");
|
||||
}
|
||||
|
||||
if !matches!(cfg.tls, TlsConfig::Off) {
|
||||
// We always use rustls (matches `ureq` in this crate). The
|
||||
// specific cert / CA wiring is done by the runtime constructor;
|
||||
// here we just flip the transport.
|
||||
opts.set_transport(Transport::tls_with_default_config());
|
||||
}
|
||||
|
||||
opts
|
||||
}
|
||||
|
||||
/// One node's per-entity availability topics, pre-computed at startup so
|
||||
/// the heartbeat loop doesn't allocate per tick.
|
||||
struct NodeAvailability {
|
||||
online_topics: Vec<String>,
|
||||
}
|
||||
|
||||
impl NodeAvailability {
|
||||
fn for_builder(b: &DiscoveryBuilder<'_>, entities: &[EntityKind]) -> Self {
|
||||
let online_topics = entities
|
||||
.iter()
|
||||
.map(|e| b.availability_topic(*e))
|
||||
.collect();
|
||||
Self { online_topics }
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn the MQTT publisher background task. Returns the join handle so
|
||||
/// the caller can `await` it on shutdown. Errors during connection are
|
||||
/// retried internally by `rumqttc::EventLoop`.
|
||||
pub fn spawn(
|
||||
cfg: Arc<MqttConfig>,
|
||||
builder_owned: OwnedDiscoveryBuilder,
|
||||
state_rx: broadcast::Receiver<VitalsSnapshot>,
|
||||
) -> JoinHandle<()> {
|
||||
tokio::spawn(async move {
|
||||
run(cfg, builder_owned, state_rx).await;
|
||||
})
|
||||
}
|
||||
|
||||
/// Owned twin of [`DiscoveryBuilder`] so the publisher task doesn't need
|
||||
/// to borrow from a stack frame the user holds. Cloned cheaply per
|
||||
/// reconnect.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct OwnedDiscoveryBuilder {
|
||||
pub discovery_prefix: String,
|
||||
pub node_id: String,
|
||||
pub node_friendly_name: Option<String>,
|
||||
pub sw_version: String,
|
||||
pub model: String,
|
||||
pub via_device: Option<String>,
|
||||
}
|
||||
|
||||
impl OwnedDiscoveryBuilder {
|
||||
pub fn as_borrowed(&self) -> DiscoveryBuilder<'_> {
|
||||
DiscoveryBuilder {
|
||||
discovery_prefix: &self.discovery_prefix,
|
||||
node_id: &self.node_id,
|
||||
node_friendly_name: self.node_friendly_name.as_deref(),
|
||||
sw_version: &self.sw_version,
|
||||
model: &self.model,
|
||||
via_device: self.via_device.as_deref(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Core run loop. Pumps the broadcast channel + the MQTT event loop in
|
||||
/// the same `select!` so we never block one on the other.
|
||||
async fn run(
|
||||
cfg: Arc<MqttConfig>,
|
||||
builder_owned: OwnedDiscoveryBuilder,
|
||||
mut state_rx: broadcast::Receiver<VitalsSnapshot>,
|
||||
) {
|
||||
let opts = build_mqtt_options(&cfg);
|
||||
let (client, mut eventloop): (AsyncClient, EventLoop) = AsyncClient::new(opts, 256);
|
||||
|
||||
let builder_borrowed = builder_owned.as_borrowed();
|
||||
let entities = DiscoveryBuilder::enabled_entities(
|
||||
cfg.privacy_mode,
|
||||
cfg.publish_pose,
|
||||
&[], // no_semantic — wire from cli::Args in P3.5
|
||||
);
|
||||
|
||||
if let Err(e) = publish_all_discovery(&client, &builder_borrowed, &entities).await {
|
||||
warn!("[mqtt] initial discovery publish failed: {e}");
|
||||
}
|
||||
let avail = NodeAvailability::for_builder(&builder_borrowed, &entities);
|
||||
if let Err(e) = publish_availability(&client, &avail, "online").await {
|
||||
warn!("[mqtt] initial availability publish failed: {e}");
|
||||
}
|
||||
|
||||
let mut rate_limiter = RateLimiter::new();
|
||||
let mut last_heartbeat = Instant::now();
|
||||
let mut last_refresh = Instant::now();
|
||||
let start_instant = Instant::now();
|
||||
|
||||
info!(
|
||||
host = %cfg.host,
|
||||
port = cfg.port,
|
||||
prefix = %cfg.discovery_prefix,
|
||||
entities = entities.len(),
|
||||
privacy = cfg.privacy_mode,
|
||||
"[mqtt] publisher started",
|
||||
);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
|
||||
// Pump the rumqttc event loop. Errors trigger automatic
|
||||
// reconnect; we just log and continue.
|
||||
ev = eventloop.poll() => {
|
||||
match ev {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("[mqtt] event loop error, will reconnect: {e}");
|
||||
rate_limiter.reset();
|
||||
// Brief backoff before next poll attempt.
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Periodic heartbeat / discovery refresh.
|
||||
_ = tokio::time::sleep(Duration::from_secs(1)) => {
|
||||
if last_heartbeat.elapsed() >= AVAILABILITY_HEARTBEAT {
|
||||
if let Err(e) = publish_availability(&client, &avail, "online").await {
|
||||
warn!("[mqtt] heartbeat publish failed: {e}");
|
||||
}
|
||||
last_heartbeat = Instant::now();
|
||||
}
|
||||
if last_refresh.elapsed() >= Duration::from_secs(cfg.refresh_secs) {
|
||||
if let Err(e) = publish_all_discovery(&client, &builder_borrowed, &entities).await {
|
||||
warn!("[mqtt] discovery refresh failed: {e}");
|
||||
}
|
||||
last_refresh = Instant::now();
|
||||
}
|
||||
}
|
||||
|
||||
// Inbound state snapshot from the rest of sensing-server.
|
||||
recv = state_rx.recv() => {
|
||||
match recv {
|
||||
Ok(snap) => {
|
||||
let elapsed = start_instant.elapsed();
|
||||
publish_snapshot(&client, &builder_borrowed, &snap, &cfg, &mut rate_limiter, elapsed).await;
|
||||
}
|
||||
Err(broadcast::error::RecvError::Lagged(n)) => {
|
||||
warn!("[mqtt] lagged behind broadcast by {n} messages — dropped");
|
||||
}
|
||||
Err(broadcast::error::RecvError::Closed) => {
|
||||
info!("[mqtt] broadcast channel closed, draining");
|
||||
// Publish offline before exit.
|
||||
let _ = publish_availability(&client, &avail, "offline").await;
|
||||
let _ = client.disconnect().await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn publish_all_discovery(
|
||||
client: &AsyncClient,
|
||||
b: &DiscoveryBuilder<'_>,
|
||||
entities: &[EntityKind],
|
||||
) -> Result<(), ClientError> {
|
||||
for &e in entities {
|
||||
let cfg = b.build(e);
|
||||
let topic = b.config_topic(e);
|
||||
let payload = serde_json::to_string(&cfg).expect("discovery payload always serialises");
|
||||
client.publish(&topic, QoS::AtLeastOnce, true, payload).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn publish_availability(
|
||||
client: &AsyncClient,
|
||||
avail: &NodeAvailability,
|
||||
state: &str,
|
||||
) -> Result<(), ClientError> {
|
||||
for topic in &avail.online_topics {
|
||||
client.publish(topic, QoS::AtLeastOnce, true, state).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn publish_snapshot(
|
||||
client: &AsyncClient,
|
||||
b: &DiscoveryBuilder<'_>,
|
||||
snap: &VitalsSnapshot,
|
||||
cfg: &MqttConfig,
|
||||
rl: &mut RateLimiter,
|
||||
elapsed: Duration,
|
||||
) {
|
||||
let encoder = StateEncoder { builder: b };
|
||||
|
||||
// Binary: presence (change-only — caller is responsible for detecting
|
||||
// change, but we always publish here because broadcast already debounces
|
||||
// and HA will dedup retained equal values harmlessly).
|
||||
if let Some(m) = encoder.boolean(EntityKind::Presence, snap.presence) {
|
||||
let _ = publish_state(client, &m).await;
|
||||
}
|
||||
|
||||
// Event: fall.
|
||||
if snap.fall_detected {
|
||||
if let Some(m) = encoder.event(
|
||||
EntityKind::FallDetected,
|
||||
"fall_detected",
|
||||
snap.timestamp_ms,
|
||||
Some(snap.vital_confidence),
|
||||
) {
|
||||
let _ = publish_state(client, &m).await;
|
||||
}
|
||||
}
|
||||
|
||||
// Numeric rate-limited entities.
|
||||
for (entity, allowed) in [
|
||||
(EntityKind::PersonCount, rl.allow(EntityKind::PersonCount, elapsed, &cfg.rates)),
|
||||
(EntityKind::HeartRate, !cfg.privacy_mode && rl.allow(EntityKind::HeartRate, elapsed, &cfg.rates)),
|
||||
(EntityKind::BreathingRate, !cfg.privacy_mode && rl.allow(EntityKind::BreathingRate, elapsed, &cfg.rates)),
|
||||
(EntityKind::MotionLevel, rl.allow(EntityKind::MotionLevel, elapsed, &cfg.rates)),
|
||||
(EntityKind::MotionEnergy, rl.allow(EntityKind::MotionEnergy, elapsed, &cfg.rates)),
|
||||
(EntityKind::PresenceScore, rl.allow(EntityKind::PresenceScore, elapsed, &cfg.rates)),
|
||||
(EntityKind::Rssi, rl.allow(EntityKind::Rssi, elapsed, &cfg.rates)),
|
||||
] {
|
||||
if !allowed {
|
||||
continue;
|
||||
}
|
||||
if let Some(m) = encoder.numeric(entity, snap) {
|
||||
let _ = publish_state(client, &m).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn publish_state(client: &AsyncClient, m: &StateMessage) -> Result<(), ClientError> {
|
||||
let qos = match m.qos {
|
||||
0 => QoS::AtMostOnce,
|
||||
1 => QoS::AtLeastOnce,
|
||||
_ => QoS::ExactlyOnce,
|
||||
};
|
||||
client.publish(&m.topic, qos, m.retain, m.payload.clone()).await
|
||||
}
|
||||
|
|
@ -0,0 +1,540 @@
|
|||
//! State payload encoding + rate limiting (ADR-115 §3.5 / §3.7).
|
||||
//!
|
||||
//! This module owns the translation from internal `sensing-server`
|
||||
//! broadcast messages (`pose_data`, `edge_vitals`, `sensing_update`)
|
||||
//! into the per-entity MQTT state-topic payloads consumed by Home
|
||||
//! Assistant. It is gated behind the `mqtt` feature flag at the call
|
||||
//! site, but the encoders and rate-limiter logic compile without any
|
||||
//! network deps so they're testable under `--no-default-features`.
|
||||
//!
|
||||
//! Per ADR-115 §3.5, state-topic QoS / retain / cadence is:
|
||||
//!
|
||||
//! | Topic kind | QoS | Retain | Cadence |
|
||||
//! |------------------------|-----|--------|------------------------|
|
||||
//! | `sensor/*/state` | 0 | no | rate-limited per §3.7 |
|
||||
//! | `binary_sensor/*/state`| 1 | yes | on change only |
|
||||
//! | `event/*/state` | 1 | no | on event |
|
||||
//! | `*/availability` | 1 | yes | LWT + 30 s heartbeat |
|
||||
//!
|
||||
//! Per ADR-115 §3.7, default rates are:
|
||||
//!
|
||||
//! - presence binary : on change
|
||||
//! - person count : 1.0 Hz
|
||||
//! - vitals (HR / BR) : 0.2 Hz (every 5 s)
|
||||
//! - motion level : 1.0 Hz
|
||||
//! - fall events : on event (no rate limit)
|
||||
//! - RSSI : 0.1 Hz
|
||||
//! - pose : 1.0 Hz when `--mqtt-publish-pose` (off by default)
|
||||
//! - zones : on change
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
|
||||
use serde::Serialize;
|
||||
use serde_json::Value;
|
||||
|
||||
use super::config::PublishRates;
|
||||
use super::discovery::{DiscoveryComponent, EntityKind};
|
||||
|
||||
/// Encoded outbound MQTT publication. `topic` is fully-qualified
|
||||
/// (already prefixed with the discovery namespace + node id). `payload`
|
||||
/// is the UTF-8 string the broker should publish on that topic.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct StateMessage {
|
||||
pub topic: String,
|
||||
pub payload: String,
|
||||
pub qos: u8,
|
||||
pub retain: bool,
|
||||
}
|
||||
|
||||
impl StateMessage {
|
||||
pub fn new(topic: String, payload: String, component: DiscoveryComponent, is_change_only: bool) -> Self {
|
||||
let (qos, retain) = match component {
|
||||
DiscoveryComponent::BinarySensor => (1, is_change_only),
|
||||
DiscoveryComponent::Event => (1, false),
|
||||
DiscoveryComponent::Sensor => (0, false),
|
||||
};
|
||||
Self { topic, payload, qos, retain }
|
||||
}
|
||||
}
|
||||
|
||||
/// Sample-rate-limit decisions, per entity. Tracks the last-emitted
|
||||
/// instant per entity and gates further emissions accordingly. Time is
|
||||
/// supplied by the caller so the limiter is testable without a clock.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct RateLimiter {
|
||||
last: HashMap<EntityKind, Duration>,
|
||||
}
|
||||
|
||||
impl RateLimiter {
|
||||
/// Build a fresh limiter with no per-entity history.
|
||||
pub fn new() -> Self {
|
||||
Self { last: HashMap::new() }
|
||||
}
|
||||
|
||||
/// Decide whether a sample for `entity` is allowed to publish at
|
||||
/// `now`, given the configured `rates`. Returns true to publish
|
||||
/// (and updates last-emitted state); false to drop.
|
||||
pub fn allow(&mut self, entity: EntityKind, now: Duration, rates: &PublishRates) -> bool {
|
||||
let min_gap = match rate_hz_for(entity, rates) {
|
||||
// Zero / negative Hz → emit only on change (caller path).
|
||||
// Here we treat it as "always allow" because the caller is
|
||||
// already gating with change detection.
|
||||
rate if rate <= 0.0 => return true,
|
||||
rate => Duration::from_secs_f64(1.0 / rate),
|
||||
};
|
||||
match self.last.get(&entity) {
|
||||
Some(&prev) if now.saturating_sub(prev) < min_gap => false,
|
||||
_ => {
|
||||
self.last.insert(entity, now);
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Reset all per-entity history. Used after a reconnect so the first
|
||||
/// post-reconnect sample is emitted promptly.
|
||||
pub fn reset(&mut self) {
|
||||
self.last.clear();
|
||||
}
|
||||
}
|
||||
|
||||
/// Look up the configured Hz for an entity. Numerical entities use the
|
||||
/// `rates` struct; non-rate-limited entities (events / change-only)
|
||||
/// return 0.0 to short-circuit limiting.
|
||||
fn rate_hz_for(entity: EntityKind, rates: &PublishRates) -> f64 {
|
||||
match entity {
|
||||
// Change-only / event entities — caller drives them.
|
||||
EntityKind::Presence
|
||||
| EntityKind::ZoneOccupancy
|
||||
| EntityKind::FallDetected
|
||||
| EntityKind::BedExit
|
||||
| EntityKind::MultiRoomTransition
|
||||
| EntityKind::SomeoneSleeping
|
||||
| EntityKind::PossibleDistress
|
||||
| EntityKind::RoomActive
|
||||
| EntityKind::ElderlyInactivityAnomaly
|
||||
| EntityKind::MeetingInProgress
|
||||
| EntityKind::BathroomOccupied
|
||||
| EntityKind::NoMovement => 0.0,
|
||||
// Rate-limited measurements.
|
||||
EntityKind::PersonCount => rates.count_hz,
|
||||
EntityKind::BreathingRate | EntityKind::HeartRate => rates.vitals_hz,
|
||||
EntityKind::MotionLevel | EntityKind::MotionEnergy => rates.motion_hz,
|
||||
EntityKind::PresenceScore => rates.motion_hz,
|
||||
EntityKind::Rssi => rates.rssi_hz,
|
||||
EntityKind::PoseKeypoints => rates.pose_hz,
|
||||
EntityKind::FallRiskElevated => rates.motion_hz,
|
||||
}
|
||||
}
|
||||
|
||||
// ─── Per-entity state payload encoders ───────────────────────────────────
|
||||
|
||||
/// Inputs the encoder accepts. The caller (publisher loop) projects the
|
||||
/// internal server broadcast into this struct so the encoder never
|
||||
/// touches the original `serde_json::Value`s directly. Avoids leaking
|
||||
/// the server's internal schema into ADR-115's wire format.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct VitalsSnapshot {
|
||||
pub node_id: String,
|
||||
pub timestamp_ms: i64,
|
||||
pub presence: bool,
|
||||
pub fall_detected: bool,
|
||||
pub motion: f64, // 0.0–1.0
|
||||
pub motion_energy: f64,
|
||||
pub presence_score: f64, // 0.0–1.0
|
||||
pub breathing_rate_bpm: Option<f64>,
|
||||
pub heartrate_bpm: Option<f64>,
|
||||
pub n_persons: u32,
|
||||
pub rssi_dbm: Option<f64>,
|
||||
pub vital_confidence: f64, // 0.0–1.0
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug)]
|
||||
struct NumberWithConfidence {
|
||||
bpm: f64,
|
||||
confidence: f64,
|
||||
ts: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug)]
|
||||
struct MotionStatePayload {
|
||||
level_pct: f64,
|
||||
ts: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug)]
|
||||
struct EnergyStatePayload {
|
||||
energy: f64,
|
||||
ts: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug)]
|
||||
struct CountStatePayload {
|
||||
n_persons: u32,
|
||||
ts: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug)]
|
||||
struct PresenceScorePayload {
|
||||
score_pct: f64,
|
||||
ts: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug)]
|
||||
struct RssiPayload {
|
||||
dbm: f64,
|
||||
ts: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug)]
|
||||
struct FallEventPayload {
|
||||
event_type: &'static str,
|
||||
ts: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
confidence: Option<f64>,
|
||||
}
|
||||
|
||||
/// Encoder bundle that knows how to render each entity's state payload
|
||||
/// from a [`VitalsSnapshot`]. Operates on an existing [`DiscoveryBuilder`]
|
||||
/// so topics are guaranteed to match what was advertised at discovery
|
||||
/// time.
|
||||
pub struct StateEncoder<'a> {
|
||||
pub builder: &'a super::discovery::DiscoveryBuilder<'a>,
|
||||
}
|
||||
|
||||
impl<'a> StateEncoder<'a> {
|
||||
/// Build the binary state ("ON"/"OFF") topic + payload for the given
|
||||
/// boolean entity.
|
||||
pub fn boolean(&self, entity: EntityKind, on: bool) -> Option<StateMessage> {
|
||||
if !matches!(entity.component(), DiscoveryComponent::BinarySensor) {
|
||||
return None;
|
||||
}
|
||||
let topic = format!(
|
||||
"{}/{}/wifi_densepose_{}/{}/state",
|
||||
self.builder.discovery_prefix,
|
||||
entity.component().as_str(),
|
||||
self.builder.node_id,
|
||||
entity.topic_slug(),
|
||||
);
|
||||
let payload = if on { "ON" } else { "OFF" }.to_string();
|
||||
Some(StateMessage::new(topic, payload, entity.component(), true))
|
||||
}
|
||||
|
||||
/// Numeric/measurement state encoder.
|
||||
pub fn numeric(&self, entity: EntityKind, snap: &VitalsSnapshot) -> Option<StateMessage> {
|
||||
if !matches!(entity.component(), DiscoveryComponent::Sensor) {
|
||||
return None;
|
||||
}
|
||||
let ts = iso_ts(snap.timestamp_ms);
|
||||
let payload_value: Value = match entity {
|
||||
EntityKind::PersonCount => serde_json::to_value(CountStatePayload {
|
||||
n_persons: snap.n_persons,
|
||||
ts: ts.clone(),
|
||||
}).ok()?,
|
||||
EntityKind::BreathingRate => {
|
||||
let bpm = snap.breathing_rate_bpm?;
|
||||
serde_json::to_value(NumberWithConfidence {
|
||||
bpm,
|
||||
confidence: snap.vital_confidence,
|
||||
ts: ts.clone(),
|
||||
}).ok()?
|
||||
}
|
||||
EntityKind::HeartRate => {
|
||||
let bpm = snap.heartrate_bpm?;
|
||||
serde_json::to_value(NumberWithConfidence {
|
||||
bpm,
|
||||
confidence: snap.vital_confidence,
|
||||
ts: ts.clone(),
|
||||
}).ok()?
|
||||
}
|
||||
EntityKind::MotionLevel => serde_json::to_value(MotionStatePayload {
|
||||
level_pct: (snap.motion.clamp(0.0, 1.0)) * 100.0,
|
||||
ts: ts.clone(),
|
||||
}).ok()?,
|
||||
EntityKind::MotionEnergy => serde_json::to_value(EnergyStatePayload {
|
||||
energy: snap.motion_energy,
|
||||
ts: ts.clone(),
|
||||
}).ok()?,
|
||||
EntityKind::PresenceScore => serde_json::to_value(PresenceScorePayload {
|
||||
score_pct: snap.presence_score.clamp(0.0, 1.0) * 100.0,
|
||||
ts: ts.clone(),
|
||||
}).ok()?,
|
||||
EntityKind::Rssi => {
|
||||
let dbm = snap.rssi_dbm?;
|
||||
serde_json::to_value(RssiPayload { dbm, ts: ts.clone() }).ok()?
|
||||
}
|
||||
_ => return None,
|
||||
};
|
||||
let topic = format!(
|
||||
"{}/{}/wifi_densepose_{}/{}/state",
|
||||
self.builder.discovery_prefix,
|
||||
entity.component().as_str(),
|
||||
self.builder.node_id,
|
||||
entity.topic_slug(),
|
||||
);
|
||||
let payload = serde_json::to_string(&payload_value).ok()?;
|
||||
Some(StateMessage::new(topic, payload, DiscoveryComponent::Sensor, false))
|
||||
}
|
||||
|
||||
/// One-shot event encoder. Used for fall, bed exit, multi-room
|
||||
/// transition.
|
||||
pub fn event(&self, entity: EntityKind, event_type: &'static str, ts_ms: i64, confidence: Option<f64>) -> Option<StateMessage> {
|
||||
if !matches!(entity.component(), DiscoveryComponent::Event) {
|
||||
return None;
|
||||
}
|
||||
let payload_json = FallEventPayload { event_type, ts: iso_ts(ts_ms), confidence };
|
||||
let payload = serde_json::to_string(&payload_json).ok()?;
|
||||
let topic = format!(
|
||||
"{}/{}/wifi_densepose_{}/{}/state",
|
||||
self.builder.discovery_prefix,
|
||||
entity.component().as_str(),
|
||||
self.builder.node_id,
|
||||
entity.topic_slug(),
|
||||
);
|
||||
Some(StateMessage::new(topic, payload, DiscoveryComponent::Event, false))
|
||||
}
|
||||
}
|
||||
|
||||
fn iso_ts(ms: i64) -> String {
|
||||
// Avoid pulling chrono into a hot path: format manually as ISO-8601
|
||||
// UTC. chrono is already in the crate's deps, but we keep this
|
||||
// encoder allocation-light for benchmark numbers.
|
||||
let secs = ms / 1000;
|
||||
let nanos = ((ms % 1000) * 1_000_000) as u32;
|
||||
let dt = chrono::DateTime::<chrono::Utc>::from_timestamp(secs, nanos)
|
||||
.unwrap_or_else(|| chrono::DateTime::<chrono::Utc>::from_timestamp(0, 0).unwrap());
|
||||
dt.to_rfc3339_opts(chrono::SecondsFormat::Millis, true)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::mqtt::discovery::DiscoveryBuilder;
|
||||
|
||||
fn builder() -> DiscoveryBuilder<'static> {
|
||||
DiscoveryBuilder {
|
||||
discovery_prefix: "homeassistant",
|
||||
node_id: "aabbccddeeff",
|
||||
node_friendly_name: Some("Bedroom"),
|
||||
sw_version: "v0.7.0",
|
||||
model: "ESP32-S3 CSI node",
|
||||
via_device: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn rates() -> PublishRates {
|
||||
PublishRates {
|
||||
vitals_hz: 0.2,
|
||||
motion_hz: 1.0,
|
||||
count_hz: 1.0,
|
||||
rssi_hz: 0.1,
|
||||
pose_hz: 1.0,
|
||||
}
|
||||
}
|
||||
|
||||
fn snap() -> VitalsSnapshot {
|
||||
VitalsSnapshot {
|
||||
node_id: "aabbccddeeff".into(),
|
||||
timestamp_ms: 1779_512_400_000,
|
||||
presence: true,
|
||||
fall_detected: false,
|
||||
motion: 0.35,
|
||||
motion_energy: 1234.5,
|
||||
presence_score: 0.91,
|
||||
breathing_rate_bpm: Some(14.2),
|
||||
heartrate_bpm: Some(68.2),
|
||||
n_persons: 1,
|
||||
rssi_dbm: Some(-52.0),
|
||||
vital_confidence: 0.87,
|
||||
}
|
||||
}
|
||||
|
||||
// ─── Rate limiter ────────────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn rate_limiter_first_sample_always_passes() {
|
||||
let mut rl = RateLimiter::new();
|
||||
assert!(rl.allow(EntityKind::HeartRate, Duration::ZERO, &rates()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rate_limiter_drops_within_gap() {
|
||||
let mut rl = RateLimiter::new();
|
||||
let r = rates();
|
||||
// 0.2 Hz → 5 s gap.
|
||||
assert!(rl.allow(EntityKind::HeartRate, Duration::from_secs(0), &r));
|
||||
assert!(!rl.allow(EntityKind::HeartRate, Duration::from_secs(1), &r));
|
||||
assert!(!rl.allow(EntityKind::HeartRate, Duration::from_secs(4), &r));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rate_limiter_allows_after_gap() {
|
||||
let mut rl = RateLimiter::new();
|
||||
let r = rates();
|
||||
assert!(rl.allow(EntityKind::HeartRate, Duration::from_secs(0), &r));
|
||||
// 5 s gap met → allow.
|
||||
assert!(rl.allow(EntityKind::HeartRate, Duration::from_secs(5), &r));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rate_limiter_per_entity_independent() {
|
||||
let mut rl = RateLimiter::new();
|
||||
let r = rates();
|
||||
assert!(rl.allow(EntityKind::HeartRate, Duration::from_secs(0), &r));
|
||||
// Different entity, same instant → independent budget.
|
||||
assert!(rl.allow(EntityKind::MotionLevel, Duration::from_secs(0), &r));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rate_limiter_change_only_entities_always_allow() {
|
||||
let mut rl = RateLimiter::new();
|
||||
let r = rates();
|
||||
// Presence is change-only → rate=0 → unlimited; caller does change detection.
|
||||
for s in 0..3 {
|
||||
assert!(rl.allow(EntityKind::Presence, Duration::from_secs(s), &r));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rate_limiter_reset_re_enables_immediate_publish() {
|
||||
let mut rl = RateLimiter::new();
|
||||
let r = rates();
|
||||
assert!(rl.allow(EntityKind::HeartRate, Duration::from_secs(0), &r));
|
||||
assert!(!rl.allow(EntityKind::HeartRate, Duration::from_secs(1), &r));
|
||||
rl.reset();
|
||||
// Post-reset: first sample passes.
|
||||
assert!(rl.allow(EntityKind::HeartRate, Duration::from_secs(1), &r));
|
||||
}
|
||||
|
||||
// ─── Boolean / binary_sensor encoder ─────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn boolean_encoder_emits_on_off_payload() {
|
||||
let b = builder();
|
||||
let enc = StateEncoder { builder: &b };
|
||||
let on = enc.boolean(EntityKind::Presence, true).unwrap();
|
||||
assert_eq!(on.payload, "ON");
|
||||
assert_eq!(on.qos, 1);
|
||||
assert!(on.retain, "binary_sensor state must be retained per §3.5");
|
||||
let off = enc.boolean(EntityKind::Presence, false).unwrap();
|
||||
assert_eq!(off.payload, "OFF");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn boolean_encoder_rejects_non_binary_entities() {
|
||||
let b = builder();
|
||||
let enc = StateEncoder { builder: &b };
|
||||
assert!(enc.boolean(EntityKind::HeartRate, true).is_none());
|
||||
assert!(enc.boolean(EntityKind::FallDetected, true).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn boolean_topic_matches_discovery_state_topic() {
|
||||
let b = builder();
|
||||
let enc = StateEncoder { builder: &b };
|
||||
let msg = enc.boolean(EntityKind::Presence, true).unwrap();
|
||||
assert_eq!(
|
||||
msg.topic,
|
||||
"homeassistant/binary_sensor/wifi_densepose_aabbccddeeff/presence/state"
|
||||
);
|
||||
}
|
||||
|
||||
// ─── Numeric / sensor encoder ────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn numeric_encoder_emits_bpm_payload_for_heart_rate() {
|
||||
let b = builder();
|
||||
let enc = StateEncoder { builder: &b };
|
||||
let s = snap();
|
||||
let msg = enc.numeric(EntityKind::HeartRate, &s).unwrap();
|
||||
let json: serde_json::Value = serde_json::from_str(&msg.payload).unwrap();
|
||||
assert_eq!(json["bpm"], 68.2);
|
||||
assert_eq!(json["confidence"], 0.87);
|
||||
assert_eq!(msg.qos, 0, "sensor state is QoS 0 per §3.5");
|
||||
assert!(!msg.retain);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn numeric_encoder_emits_motion_percent_payload() {
|
||||
let b = builder();
|
||||
let enc = StateEncoder { builder: &b };
|
||||
let s = snap();
|
||||
let msg = enc.numeric(EntityKind::MotionLevel, &s).unwrap();
|
||||
let json: serde_json::Value = serde_json::from_str(&msg.payload).unwrap();
|
||||
// 0.35 → 35.0%
|
||||
assert_eq!(json["level_pct"], 35.0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn numeric_encoder_returns_none_when_optional_field_missing() {
|
||||
let b = builder();
|
||||
let enc = StateEncoder { builder: &b };
|
||||
let mut s = snap();
|
||||
s.heartrate_bpm = None;
|
||||
assert!(enc.numeric(EntityKind::HeartRate, &s).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn numeric_encoder_clamps_out_of_range_motion() {
|
||||
let b = builder();
|
||||
let enc = StateEncoder { builder: &b };
|
||||
let mut s = snap();
|
||||
s.motion = 1.7; // pathological — clamp to 1.0 then ×100.
|
||||
let msg = enc.numeric(EntityKind::MotionLevel, &s).unwrap();
|
||||
let json: serde_json::Value = serde_json::from_str(&msg.payload).unwrap();
|
||||
assert_eq!(json["level_pct"], 100.0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn numeric_encoder_rejects_non_sensor_entities() {
|
||||
let b = builder();
|
||||
let enc = StateEncoder { builder: &b };
|
||||
let s = snap();
|
||||
assert!(enc.numeric(EntityKind::Presence, &s).is_none());
|
||||
assert!(enc.numeric(EntityKind::FallDetected, &s).is_none());
|
||||
}
|
||||
|
||||
// ─── Event encoder ───────────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn event_encoder_emits_fall_payload() {
|
||||
let b = builder();
|
||||
let enc = StateEncoder { builder: &b };
|
||||
let msg = enc
|
||||
.event(EntityKind::FallDetected, "fall_detected", 1779_512_400_000, Some(0.87))
|
||||
.unwrap();
|
||||
let json: serde_json::Value = serde_json::from_str(&msg.payload).unwrap();
|
||||
assert_eq!(json["event_type"], "fall_detected");
|
||||
assert_eq!(json["confidence"], 0.87);
|
||||
assert_eq!(msg.qos, 1);
|
||||
assert!(!msg.retain, "events must never be retained — HA would replay old falls");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_encoder_omits_confidence_when_absent() {
|
||||
let b = builder();
|
||||
let enc = StateEncoder { builder: &b };
|
||||
let msg = enc
|
||||
.event(EntityKind::BedExit, "bed_exit", 1779_512_400_000, None)
|
||||
.unwrap();
|
||||
assert!(!msg.payload.contains("confidence"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_encoder_rejects_non_event_entities() {
|
||||
let b = builder();
|
||||
let enc = StateEncoder { builder: &b };
|
||||
assert!(enc.event(EntityKind::Presence, "x", 0, None).is_none());
|
||||
assert!(enc.event(EntityKind::HeartRate, "x", 0, None).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn iso_ts_is_rfc3339_utc_with_millis() {
|
||||
let ts = iso_ts(1779_512_400_000);
|
||||
assert!(ts.ends_with("Z"));
|
||||
assert!(ts.contains("T"));
|
||||
// .000 suffix from `SecondsFormat::Millis`.
|
||||
assert!(ts.contains("."), "want millisecond fraction in: {}", ts);
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue