diff --git a/v2/crates/wifi-densepose-sensing-server/src/lib.rs b/v2/crates/wifi-densepose-sensing-server/src/lib.rs index 807565c1..5e11d6f6 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/lib.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/lib.rs @@ -8,6 +8,7 @@ //! - Real-time CSI introspection / low-latency tap (`introspection`, ADR-099) pub mod bearer_auth; +pub mod cli; pub mod dataset; pub mod edge_registry; #[allow(dead_code)] @@ -15,6 +16,7 @@ pub mod embedding; pub mod graph_transformer; pub mod host_validation; pub mod introspection; +pub mod mqtt; pub mod path_safety; pub mod rvf_container; pub mod rvf_pipeline; diff --git a/v2/crates/wifi-densepose-sensing-server/src/mqtt/config.rs b/v2/crates/wifi-densepose-sensing-server/src/mqtt/config.rs new file mode 100644 index 00000000..6d0cca53 --- /dev/null +++ b/v2/crates/wifi-densepose-sensing-server/src/mqtt/config.rs @@ -0,0 +1,293 @@ +//! Runtime configuration for the MQTT publisher, built from CLI args. + +use std::path::PathBuf; + +/// All knobs the MQTT publisher needs. Built by [`MqttConfig::from_args`] +/// after [`crate::cli::Args`] parsing. +#[derive(Debug, Clone)] +pub struct MqttConfig { + pub host: String, + pub port: u16, + pub username: Option, + pub password: Option, + pub client_id: String, + pub discovery_prefix: String, + pub tls: TlsConfig, + pub refresh_secs: u64, + pub rates: PublishRates, + pub publish_pose: bool, + pub privacy_mode: bool, +} + +/// TLS settings for the MQTT publisher. +/// +/// `None` means plaintext. `Some(TlsBundle::SystemTrust)` means encrypt +/// against the system trust store. `Some(TlsBundle::PinnedCa { ... })` +/// means encrypt against a specific CA (the typical Cognitum Seed mTLS +/// recipe). +#[derive(Debug, Clone)] +pub enum TlsConfig { + Off, + SystemTrust, + PinnedCa { ca_file: PathBuf }, + MutualTls { ca_file: PathBuf, client_cert: PathBuf, client_key: PathBuf }, +} + +/// Per-entity publish rates (Hz). Zero means "publish on change only". +#[derive(Debug, Clone, Copy)] +pub struct PublishRates { + pub vitals_hz: f64, + pub motion_hz: f64, + pub count_hz: f64, + pub rssi_hz: f64, + pub pose_hz: f64, +} + +impl Default for PublishRates { + fn default() -> Self { + Self { + vitals_hz: 0.2, + motion_hz: 1.0, + count_hz: 1.0, + rssi_hz: 0.1, + pose_hz: 1.0, + } + } +} + +impl MqttConfig { + /// Build an [`MqttConfig`] from parsed [`crate::cli::Args`]. + /// + /// Reads `mqtt_password_env` to resolve the broker password from the + /// environment so secrets never appear on the command line. Reads + /// `hostname()` via the `gethostname` crate if `mqtt_client_id` was + /// not supplied — we don't add a dep here, we let the publisher + /// supply the default lazily. + pub fn from_args(args: &crate::cli::Args) -> Self { + let password = std::env::var(&args.mqtt_password_env).ok(); + let port = args.mqtt_port.unwrap_or(if args.mqtt_tls { 8883 } else { 1883 }); + let tls = build_tls(args); + let client_id = args + .mqtt_client_id + .clone() + .unwrap_or_else(|| { + // Avoid a `gethostname` dep in P1 — fallback only. + format!("wifi-densepose-{}", std::process::id()) + }); + + Self { + host: args.mqtt_host.clone(), + port, + username: args.mqtt_username.clone(), + password, + client_id, + discovery_prefix: args.mqtt_prefix.clone(), + tls, + refresh_secs: args.mqtt_refresh_secs, + rates: PublishRates { + vitals_hz: args.mqtt_rate_vitals, + motion_hz: args.mqtt_rate_motion, + count_hz: args.mqtt_rate_count, + rssi_hz: args.mqtt_rate_rssi, + pose_hz: args.mqtt_rate_pose, + }, + publish_pose: args.mqtt_publish_pose, + privacy_mode: args.privacy_mode, + } + } + + /// True iff this config is safe to start. Pre-flight validation that + /// runs before any network I/O so users get a clean error instead of + /// a connect failure 30 s later. + pub fn validate(&self) -> Result<(), MqttConfigError> { + if self.host.is_empty() { + return Err(MqttConfigError::EmptyHost); + } + if self.port == 0 { + return Err(MqttConfigError::InvalidPort(self.port)); + } + if self.refresh_secs == 0 { + return Err(MqttConfigError::RefreshTooSmall); + } + for rate in [ + self.rates.vitals_hz, + self.rates.motion_hz, + self.rates.count_hz, + self.rates.rssi_hz, + self.rates.pose_hz, + ] { + if !rate.is_finite() || rate < 0.0 { + return Err(MqttConfigError::InvalidRate(rate)); + } + } + if !self.host.eq_ignore_ascii_case("localhost") + && !self.host.starts_with("127.") + && !self.host.starts_with("::1") + && matches!(self.tls, TlsConfig::Off) + { + // Per ADR-115 §3.9 / §9.5 — WARN now, hard-fail at v0.8.0. + // We return a non-fatal advisory; the caller decides. + return Err(MqttConfigError::PlaintextOnPublicHost { + host: self.host.clone(), + }); + } + Ok(()) + } +} + +fn build_tls(args: &crate::cli::Args) -> TlsConfig { + if !args.mqtt_tls { + return TlsConfig::Off; + } + match ( + args.mqtt_ca_file.as_ref(), + args.mqtt_client_cert.as_ref(), + args.mqtt_client_key.as_ref(), + ) { + (Some(ca), Some(cert), Some(key)) => TlsConfig::MutualTls { + ca_file: ca.clone(), + client_cert: cert.clone(), + client_key: key.clone(), + }, + (Some(ca), None, None) => TlsConfig::PinnedCa { ca_file: ca.clone() }, + _ => TlsConfig::SystemTrust, + } +} + +/// Pre-flight validation errors. +#[derive(Debug, thiserror::Error)] +pub enum MqttConfigError { + #[error("MQTT broker host is empty")] + EmptyHost, + #[error("invalid MQTT broker port: {0}")] + InvalidPort(u16), + #[error("--mqtt-refresh-secs must be >= 1")] + RefreshTooSmall, + #[error("invalid MQTT publish rate: {0} Hz")] + InvalidRate(f64), + #[error( + "plaintext MQTT on non-localhost broker {host} is deprecated and will hard-fail in v0.8.0 \ + (ADR-115 §3.9). Add --mqtt-tls to encrypt." + )] + PlaintextOnPublicHost { host: String }, +} + +impl MqttConfigError { + /// True for errors that block startup. False for advisories the user + /// can override (used for the v0.7.0 → v0.8.0 deprecation curve on + /// plaintext). + pub fn is_fatal(&self) -> bool { + !matches!(self, MqttConfigError::PlaintextOnPublicHost { .. }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use clap::Parser; + + fn parse(args: &[&str]) -> crate::cli::Args { + crate::cli::Args::parse_from(std::iter::once("sensing-server").chain(args.iter().copied())) + } + + #[test] + fn from_args_defaults_localhost_1883() { + let cfg = MqttConfig::from_args(&parse(&[])); + assert_eq!(cfg.host, "localhost"); + assert_eq!(cfg.port, 1883); + assert_eq!(cfg.discovery_prefix, "homeassistant"); + assert!(matches!(cfg.tls, TlsConfig::Off)); + assert_eq!(cfg.refresh_secs, 600); + assert_eq!(cfg.rates.vitals_hz, 0.2); + assert!(!cfg.publish_pose); + assert!(!cfg.privacy_mode); + } + + #[test] + fn tls_flag_bumps_port_to_8883() { + let cfg = MqttConfig::from_args(&parse(&["--mqtt-tls"])); + assert_eq!(cfg.port, 8883); + assert!(matches!(cfg.tls, TlsConfig::SystemTrust)); + } + + #[test] + fn explicit_port_overrides_default() { + let cfg = MqttConfig::from_args(&parse(&["--mqtt-port", "8884"])); + assert_eq!(cfg.port, 8884); + } + + #[test] + fn mtls_when_full_triplet_supplied() { + let cfg = MqttConfig::from_args(&parse(&[ + "--mqtt-tls", + "--mqtt-ca-file", "/etc/ca.pem", + "--mqtt-client-cert", "/etc/client.pem", + "--mqtt-client-key", "/etc/client.key", + ])); + assert!(matches!(cfg.tls, TlsConfig::MutualTls { .. })); + } + + #[test] + fn validate_rejects_empty_host() { + let mut cfg = MqttConfig::from_args(&parse(&[])); + cfg.host = String::new(); + let err = cfg.validate().unwrap_err(); + assert!(matches!(err, MqttConfigError::EmptyHost)); + assert!(err.is_fatal()); + } + + #[test] + fn validate_rejects_zero_port() { + let mut cfg = MqttConfig::from_args(&parse(&[])); + cfg.port = 0; + assert!(matches!(cfg.validate(), Err(MqttConfigError::InvalidPort(0)))); + } + + #[test] + fn validate_localhost_plaintext_ok() { + let cfg = MqttConfig::from_args(&parse(&[])); + // localhost + plaintext is fine — no advisory. + assert!(cfg.validate().is_ok()); + } + + #[test] + fn validate_plaintext_public_advises_but_not_fatal() { + let cfg = MqttConfig::from_args(&parse(&["--mqtt-host", "broker.example.com"])); + let err = cfg.validate().unwrap_err(); + assert!(matches!(err, MqttConfigError::PlaintextOnPublicHost { .. })); + assert!(!err.is_fatal(), "v0.7.0 should warn, not block (ADR-115 §3.9)"); + } + + #[test] + fn validate_public_tls_ok() { + let cfg = MqttConfig::from_args(&parse(&[ + "--mqtt-host", "broker.example.com", + "--mqtt-tls", + ])); + assert!(cfg.validate().is_ok()); + } + + #[test] + fn validate_rejects_negative_rate() { + let mut cfg = MqttConfig::from_args(&parse(&[])); + cfg.rates.vitals_hz = -1.0; + assert!(matches!(cfg.validate(), Err(MqttConfigError::InvalidRate(_)))); + } + + #[test] + fn validate_rejects_nan_rate() { + let mut cfg = MqttConfig::from_args(&parse(&[])); + cfg.rates.motion_hz = f64::NAN; + assert!(matches!(cfg.validate(), Err(MqttConfigError::InvalidRate(_)))); + } + + #[test] + fn password_env_resolution() { + std::env::set_var("RUVIEW_TEST_MQTT_PW", "s3cret"); + let cfg = MqttConfig::from_args(&parse(&[ + "--mqtt-password-env", "RUVIEW_TEST_MQTT_PW", + ])); + assert_eq!(cfg.password.as_deref(), Some("s3cret")); + std::env::remove_var("RUVIEW_TEST_MQTT_PW"); + } +} diff --git a/v2/crates/wifi-densepose-sensing-server/src/mqtt/discovery.rs b/v2/crates/wifi-densepose-sensing-server/src/mqtt/discovery.rs new file mode 100644 index 00000000..0a5cdafa --- /dev/null +++ b/v2/crates/wifi-densepose-sensing-server/src/mqtt/discovery.rs @@ -0,0 +1,651 @@ +//! HA MQTT auto-discovery payload generators. +//! +//! Per ADR-115 §3.1 — §3.4 each RuView node becomes one HA `device` and +//! each capability (presence, person count, heart rate, breathing rate, +//! motion, fall, RSSI, zone occupancy, pose) becomes one entity on that +//! device. This module owns the JSON-serializable structures HA expects +//! on the `homeassistant////config` topic. +//! +//! The structures are `Serialize`-only; we never need to parse them +//! back. Field names match Home Assistant's published MQTT-discovery +//! schema (https://www.home-assistant.io/integrations/mqtt/#mqtt-discovery) +//! pinned to the version the project tests against (v2025.5 as of this +//! ADR; bump in `docs/integrations/home-assistant.md` when the test +//! matrix moves). + +use serde::Serialize; + +use super::{MANUFACTURER, ORIGIN_NAME, SUPPORT_URL}; + +/// HA component kinds we publish today. Strings match the HA URL slug. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum DiscoveryComponent { + BinarySensor, + Sensor, + Event, +} + +impl DiscoveryComponent { + pub fn as_str(self) -> &'static str { + match self { + DiscoveryComponent::BinarySensor => "binary_sensor", + DiscoveryComponent::Sensor => "sensor", + DiscoveryComponent::Event => "event", + } + } +} + +/// Top-level HA discovery payload. Serialised to JSON and published +/// retained, QoS 1 on `////config`. +/// +/// We only model the fields ADR-115 §3.3 examples touch. HA's schema has +/// many more optional fields; we add them on a per-entity-need basis to +/// keep payloads small (some retained brokers cap message size). +#[derive(Debug, Clone, Serialize)] +pub struct DiscoveryConfig { + pub name: String, + pub unique_id: String, + pub object_id: String, + pub state_topic: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub availability_topic: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub payload_available: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub payload_not_available: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub payload_on: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub payload_off: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub device_class: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub state_class: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub unit_of_measurement: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub icon: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub value_template: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub json_attributes_topic: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub event_types: Option>, + pub qos: u8, + pub device: DeviceMeta, + pub origin: OriginMeta, +} + +/// HA `device` block. Multiple entities pointing at the same +/// `identifiers` are grouped into one device card in the HA UI. +#[derive(Debug, Clone, Serialize)] +pub struct DeviceMeta { + pub identifiers: Vec, + pub name: String, + pub manufacturer: String, + pub model: String, + pub sw_version: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub via_device: Option, +} + +/// HA `origin` block. Tells HA users which software emitted the entities. +#[derive(Debug, Clone, Serialize)] +pub struct OriginMeta { + pub name: String, + pub sw_version: String, + pub support_url: String, +} + +/// Per-entity availability payload. Used as MQTT LWT so the broker +/// publishes `offline` automatically if our connection drops. +#[derive(Debug, Clone)] +pub struct AvailabilityPayload { + pub topic: String, + pub online: &'static str, + pub offline: &'static str, +} + +impl AvailabilityPayload { + pub fn for_entity(prefix: &str, component: DiscoveryComponent, node_id: &str, entity: &str) -> Self { + Self { + topic: format!( + "{prefix}/{}/wifi_densepose_{node_id}/{entity}/availability", + component.as_str() + ), + online: "online", + offline: "offline", + } + } +} + +/// All entity kinds RuView publishes via MQTT. Used by [`DiscoveryBuilder`] +/// to generate matching `config` and `state` topic strings. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum EntityKind { + Presence, + PersonCount, + BreathingRate, + HeartRate, + MotionLevel, + MotionEnergy, + FallDetected, + PresenceScore, + Rssi, + ZoneOccupancy, + PoseKeypoints, + // Semantic primitives (ADR-115 §3.12). + SomeoneSleeping, + PossibleDistress, + RoomActive, + ElderlyInactivityAnomaly, + MeetingInProgress, + BathroomOccupied, + FallRiskElevated, + BedExit, + NoMovement, + MultiRoomTransition, +} + +impl EntityKind { + pub fn topic_slug(self) -> &'static str { + match self { + EntityKind::Presence => "presence", + EntityKind::PersonCount => "person_count", + EntityKind::BreathingRate => "breathing_rate", + EntityKind::HeartRate => "heart_rate", + EntityKind::MotionLevel => "motion_level", + EntityKind::MotionEnergy => "motion_energy", + EntityKind::FallDetected => "fall", + EntityKind::PresenceScore => "presence_score", + EntityKind::Rssi => "rssi", + EntityKind::ZoneOccupancy => "zone_occupancy", + EntityKind::PoseKeypoints => "pose", + EntityKind::SomeoneSleeping => "someone_sleeping", + EntityKind::PossibleDistress => "possible_distress", + EntityKind::RoomActive => "room_active", + EntityKind::ElderlyInactivityAnomaly => "elderly_inactivity_anomaly", + EntityKind::MeetingInProgress => "meeting_in_progress", + EntityKind::BathroomOccupied => "bathroom_occupied", + EntityKind::FallRiskElevated => "fall_risk_elevated", + EntityKind::BedExit => "bed_exit", + EntityKind::NoMovement => "no_movement", + EntityKind::MultiRoomTransition => "multi_room_transition", + } + } + + pub fn component(self) -> DiscoveryComponent { + match self { + // Boolean states → binary_sensor. + EntityKind::Presence + | EntityKind::ZoneOccupancy + | EntityKind::SomeoneSleeping + | EntityKind::PossibleDistress + | EntityKind::RoomActive + | EntityKind::ElderlyInactivityAnomaly + | EntityKind::MeetingInProgress + | EntityKind::BathroomOccupied + | EntityKind::NoMovement => DiscoveryComponent::BinarySensor, + // One-shot triggers → event. + EntityKind::FallDetected + | EntityKind::BedExit + | EntityKind::MultiRoomTransition => DiscoveryComponent::Event, + // Numeric measurements → sensor. + EntityKind::PersonCount + | EntityKind::BreathingRate + | EntityKind::HeartRate + | EntityKind::MotionLevel + | EntityKind::MotionEnergy + | EntityKind::PresenceScore + | EntityKind::Rssi + | EntityKind::PoseKeypoints + | EntityKind::FallRiskElevated => DiscoveryComponent::Sensor, + } + } + + /// True iff this entity carries biometric data that `--privacy-mode` + /// must suppress per ADR-115 §3.10 and §3.12.3. Semantic primitives + /// stay published even in privacy mode because they're inferred + /// states, not raw values. + pub fn is_biometric(self) -> bool { + matches!( + self, + EntityKind::BreathingRate | EntityKind::HeartRate | EntityKind::PoseKeypoints + ) + } + + /// Human-readable HA entity name shown in the UI. + pub fn display_name(self) -> &'static str { + match self { + EntityKind::Presence => "Presence", + EntityKind::PersonCount => "Person count", + EntityKind::BreathingRate => "Breathing rate", + EntityKind::HeartRate => "Heart rate", + EntityKind::MotionLevel => "Motion level", + EntityKind::MotionEnergy => "Motion energy", + EntityKind::FallDetected => "Fall detected", + EntityKind::PresenceScore => "Presence score", + EntityKind::Rssi => "Signal strength", + EntityKind::ZoneOccupancy => "Zone occupancy", + EntityKind::PoseKeypoints => "Pose", + EntityKind::SomeoneSleeping => "Someone sleeping", + EntityKind::PossibleDistress => "Possible distress", + EntityKind::RoomActive => "Room active", + EntityKind::ElderlyInactivityAnomaly => "Elderly inactivity anomaly", + EntityKind::MeetingInProgress => "Meeting in progress", + EntityKind::BathroomOccupied => "Bathroom occupied", + EntityKind::FallRiskElevated => "Fall risk elevated", + EntityKind::BedExit => "Bed exit", + EntityKind::NoMovement => "No movement", + EntityKind::MultiRoomTransition => "Room transition", + } + } +} + +/// Builds HA discovery payloads for a specific RuView node. +pub struct DiscoveryBuilder<'a> { + pub discovery_prefix: &'a str, + pub node_id: &'a str, + pub node_friendly_name: Option<&'a str>, + pub sw_version: &'a str, + pub model: &'a str, + pub via_device: Option<&'a str>, +} + +impl<'a> DiscoveryBuilder<'a> { + fn unique_id(&self, entity: EntityKind) -> String { + format!("wifi_densepose_{}_{}", self.node_id, entity.topic_slug()) + } + + fn state_topic(&self, entity: EntityKind) -> String { + format!( + "{}/{}/wifi_densepose_{}/{}/state", + self.discovery_prefix, + entity.component().as_str(), + self.node_id, + entity.topic_slug(), + ) + } + + pub fn config_topic(&self, entity: EntityKind) -> String { + format!( + "{}/{}/wifi_densepose_{}/{}/config", + self.discovery_prefix, + entity.component().as_str(), + self.node_id, + entity.topic_slug(), + ) + } + + pub fn availability_topic(&self, entity: EntityKind) -> String { + format!( + "{}/{}/wifi_densepose_{}/{}/availability", + self.discovery_prefix, + entity.component().as_str(), + self.node_id, + entity.topic_slug(), + ) + } + + fn device(&self) -> DeviceMeta { + let display = self + .node_friendly_name + .map(|n| n.to_string()) + .unwrap_or_else(|| format!("RuView node {}", self.node_id)); + DeviceMeta { + identifiers: vec![format!("wifi_densepose_{}", self.node_id)], + name: display, + manufacturer: MANUFACTURER.to_string(), + model: self.model.to_string(), + sw_version: self.sw_version.to_string(), + via_device: self.via_device.map(|s| s.to_string()), + } + } + + fn origin(&self) -> OriginMeta { + OriginMeta { + name: ORIGIN_NAME.to_string(), + sw_version: env!("CARGO_PKG_VERSION").to_string(), + support_url: SUPPORT_URL.to_string(), + } + } + + /// Build a discovery config payload for one entity on this node. + pub fn build(&self, entity: EntityKind) -> DiscoveryConfig { + let component = entity.component(); + let mut cfg = DiscoveryConfig { + name: entity.display_name().to_string(), + unique_id: self.unique_id(entity), + object_id: self.unique_id(entity), + state_topic: self.state_topic(entity), + availability_topic: Some(self.availability_topic(entity)), + payload_available: Some("online".into()), + payload_not_available: Some("offline".into()), + payload_on: None, + payload_off: None, + device_class: None, + state_class: None, + unit_of_measurement: None, + icon: None, + value_template: None, + json_attributes_topic: None, + event_types: None, + qos: match component { + DiscoveryComponent::BinarySensor | DiscoveryComponent::Event => 1, + DiscoveryComponent::Sensor => 0, + }, + device: self.device(), + origin: self.origin(), + }; + + match entity { + EntityKind::Presence + | EntityKind::ZoneOccupancy + | EntityKind::SomeoneSleeping + | EntityKind::RoomActive + | EntityKind::MeetingInProgress + | EntityKind::BathroomOccupied => { + cfg.payload_on = Some("ON".into()); + cfg.payload_off = Some("OFF".into()); + cfg.device_class = Some("occupancy".into()); + cfg.icon = Some(match entity { + EntityKind::SomeoneSleeping => "mdi:sleep", + EntityKind::MeetingInProgress => "mdi:account-group", + EntityKind::BathroomOccupied => "mdi:shower", + EntityKind::RoomActive => "mdi:home-account", + EntityKind::ZoneOccupancy => "mdi:map-marker", + _ => "mdi:motion-sensor", + }.into()); + } + EntityKind::PossibleDistress + | EntityKind::ElderlyInactivityAnomaly + | EntityKind::NoMovement => { + cfg.payload_on = Some("ON".into()); + cfg.payload_off = Some("OFF".into()); + cfg.device_class = Some("problem".into()); + cfg.icon = Some("mdi:alert-octagon".into()); + } + EntityKind::FallDetected => { + cfg.event_types = Some(vec!["fall_detected".into()]); + cfg.icon = Some("mdi:human-fall".into()); + } + EntityKind::BedExit => { + cfg.event_types = Some(vec!["bed_exit".into()]); + cfg.icon = Some("mdi:bed-empty".into()); + } + EntityKind::MultiRoomTransition => { + cfg.event_types = Some(vec!["transition".into()]); + cfg.icon = Some("mdi:transit-transfer".into()); + } + EntityKind::PersonCount => { + cfg.state_class = Some("measurement".into()); + cfg.unit_of_measurement = Some("persons".into()); + cfg.icon = Some("mdi:account-group".into()); + cfg.value_template = Some("{{ value_json.n_persons }}".into()); + } + EntityKind::BreathingRate => { + cfg.state_class = Some("measurement".into()); + cfg.unit_of_measurement = Some("bpm".into()); + cfg.icon = Some("mdi:lungs".into()); + cfg.value_template = Some("{{ value_json.bpm }}".into()); + cfg.json_attributes_topic = Some(cfg.state_topic.clone()); + } + EntityKind::HeartRate => { + cfg.state_class = Some("measurement".into()); + cfg.unit_of_measurement = Some("bpm".into()); + cfg.icon = Some("mdi:heart-pulse".into()); + cfg.value_template = Some("{{ value_json.bpm }}".into()); + cfg.json_attributes_topic = Some(cfg.state_topic.clone()); + } + EntityKind::MotionLevel => { + cfg.state_class = Some("measurement".into()); + cfg.unit_of_measurement = Some("%".into()); + cfg.icon = Some("mdi:run".into()); + cfg.value_template = Some("{{ value_json.level_pct }}".into()); + } + EntityKind::MotionEnergy => { + cfg.state_class = Some("measurement".into()); + cfg.icon = Some("mdi:waveform".into()); + cfg.value_template = Some("{{ value_json.energy }}".into()); + } + EntityKind::PresenceScore => { + cfg.state_class = Some("measurement".into()); + cfg.unit_of_measurement = Some("%".into()); + cfg.icon = Some("mdi:gauge".into()); + cfg.value_template = Some("{{ value_json.score_pct }}".into()); + } + EntityKind::Rssi => { + cfg.state_class = Some("measurement".into()); + cfg.device_class = Some("signal_strength".into()); + cfg.unit_of_measurement = Some("dBm".into()); + cfg.icon = Some("mdi:wifi".into()); + cfg.value_template = Some("{{ value_json.dbm }}".into()); + } + EntityKind::PoseKeypoints => { + cfg.icon = Some("mdi:human".into()); + cfg.json_attributes_topic = Some(cfg.state_topic.clone()); + cfg.value_template = Some("{{ value_json.n_keypoints }}".into()); + } + EntityKind::FallRiskElevated => { + cfg.state_class = Some("measurement".into()); + cfg.unit_of_measurement = Some("score".into()); + cfg.icon = Some("mdi:human-fall".into()); + cfg.value_template = Some("{{ value_json.score }}".into()); + } + } + + cfg + } + + /// All entity kinds this builder will publish, given a `privacy_mode` + /// flag and a `publish_pose` flag. Used by the publisher to drive the + /// discovery-emission loop. + pub fn enabled_entities(privacy_mode: bool, publish_pose: bool, semantic_disabled: &[String]) -> Vec { + let all = [ + EntityKind::Presence, + EntityKind::PersonCount, + EntityKind::BreathingRate, + EntityKind::HeartRate, + EntityKind::MotionLevel, + EntityKind::MotionEnergy, + EntityKind::FallDetected, + EntityKind::PresenceScore, + EntityKind::Rssi, + EntityKind::ZoneOccupancy, + EntityKind::PoseKeypoints, + EntityKind::SomeoneSleeping, + EntityKind::PossibleDistress, + EntityKind::RoomActive, + EntityKind::ElderlyInactivityAnomaly, + EntityKind::MeetingInProgress, + EntityKind::BathroomOccupied, + EntityKind::FallRiskElevated, + EntityKind::BedExit, + EntityKind::NoMovement, + EntityKind::MultiRoomTransition, + ]; + + all.into_iter() + .filter(|e| { + if privacy_mode && e.is_biometric() { + return false; + } + if *e == EntityKind::PoseKeypoints && !publish_pose { + return false; + } + if let Some(slug) = semantic_slug_for(*e) { + if semantic_disabled.iter().any(|d| d == slug) { + return false; + } + } + true + }) + .collect() + } +} + +/// For an entity kind, return the `--no-semantic ` slug it +/// would be disabled by, or `None` if it's not a semantic primitive. +fn semantic_slug_for(e: EntityKind) -> Option<&'static str> { + Some(match e { + EntityKind::SomeoneSleeping => "sleeping", + EntityKind::PossibleDistress => "distress", + EntityKind::RoomActive => "room_active", + EntityKind::ElderlyInactivityAnomaly => "elderly_anomaly", + EntityKind::MeetingInProgress => "meeting", + EntityKind::BathroomOccupied => "bathroom", + EntityKind::FallRiskElevated => "fall_risk", + EntityKind::BedExit => "bed_exit", + EntityKind::NoMovement => "no_movement", + EntityKind::MultiRoomTransition => "multi_room", + _ => return None, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::Value; + + fn builder() -> DiscoveryBuilder<'static> { + DiscoveryBuilder { + discovery_prefix: "homeassistant", + node_id: "aabbccddeeff", + node_friendly_name: Some("Bedroom"), + sw_version: "v0.7.0", + model: "ESP32-S3 CSI node", + via_device: Some("cognitum_seed_1"), + } + } + + #[test] + fn presence_discovery_payload_shape() { + let b = builder(); + let cfg = b.build(EntityKind::Presence); + let j: Value = serde_json::to_value(&cfg).unwrap(); + assert_eq!(j["name"], "Presence"); + assert_eq!(j["unique_id"], "wifi_densepose_aabbccddeeff_presence"); + assert_eq!(j["device_class"], "occupancy"); + assert_eq!(j["payload_on"], "ON"); + assert_eq!(j["payload_off"], "OFF"); + assert_eq!(j["qos"], 1); + assert_eq!( + j["state_topic"], + "homeassistant/binary_sensor/wifi_densepose_aabbccddeeff/presence/state" + ); + assert_eq!(j["device"]["identifiers"][0], "wifi_densepose_aabbccddeeff"); + assert_eq!(j["device"]["name"], "Bedroom"); + assert_eq!(j["device"]["via_device"], "cognitum_seed_1"); + assert_eq!(j["origin"]["name"], "wifi-densepose-sensing-server"); + } + + #[test] + fn heart_rate_discovery_payload_shape() { + let b = builder(); + let cfg = b.build(EntityKind::HeartRate); + let j: Value = serde_json::to_value(&cfg).unwrap(); + assert_eq!(j["unit_of_measurement"], "bpm"); + assert_eq!(j["state_class"], "measurement"); + assert_eq!(j["value_template"], "{{ value_json.bpm }}"); + assert_eq!(j["qos"], 0); + assert!(j["json_attributes_topic"].as_str().unwrap().ends_with("/state")); + } + + #[test] + fn fall_event_payload_uses_event_component_and_types() { + let b = builder(); + let cfg = b.build(EntityKind::FallDetected); + let j: Value = serde_json::to_value(&cfg).unwrap(); + assert!(j["state_topic"].as_str().unwrap().contains("/event/")); + assert_eq!(j["event_types"][0], "fall_detected"); + assert_eq!(j["qos"], 1); + } + + #[test] + fn semantic_primitive_uses_problem_class_for_distress() { + let b = builder(); + let cfg = b.build(EntityKind::PossibleDistress); + let j: Value = serde_json::to_value(&cfg).unwrap(); + assert_eq!(j["device_class"], "problem"); + assert_eq!(j["payload_on"], "ON"); + assert_eq!(j["payload_off"], "OFF"); + } + + #[test] + fn enabled_entities_default_excludes_pose_and_includes_all_others() { + let entities = DiscoveryBuilder::enabled_entities(false, false, &[]); + assert!(!entities.contains(&EntityKind::PoseKeypoints)); + assert!(entities.contains(&EntityKind::Presence)); + assert!(entities.contains(&EntityKind::HeartRate)); + assert!(entities.contains(&EntityKind::SomeoneSleeping)); + } + + #[test] + fn privacy_mode_strips_biometrics() { + let entities = DiscoveryBuilder::enabled_entities(true, true, &[]); + for e in &entities { + assert!(!e.is_biometric(), "biometric {:?} leaked with privacy_mode", e); + } + // Semantic primitives must remain available (ADR-115 §3.12.3). + assert!(entities.contains(&EntityKind::SomeoneSleeping)); + assert!(entities.contains(&EntityKind::BathroomOccupied)); + } + + #[test] + fn no_semantic_disables_specific_primitive() { + let disabled = vec!["distress".to_string(), "sleeping".to_string()]; + let entities = DiscoveryBuilder::enabled_entities(false, false, &disabled); + assert!(!entities.contains(&EntityKind::PossibleDistress)); + assert!(!entities.contains(&EntityKind::SomeoneSleeping)); + // Raw signals untouched. + assert!(entities.contains(&EntityKind::Presence)); + } + + #[test] + fn topic_components_match_entity_kind() { + // binary_sensor for booleans. + assert_eq!(EntityKind::Presence.component(), DiscoveryComponent::BinarySensor); + assert_eq!(EntityKind::SomeoneSleeping.component(), DiscoveryComponent::BinarySensor); + // event for one-shots. + assert_eq!(EntityKind::FallDetected.component(), DiscoveryComponent::Event); + assert_eq!(EntityKind::BedExit.component(), DiscoveryComponent::Event); + // sensor for measurements. + assert_eq!(EntityKind::HeartRate.component(), DiscoveryComponent::Sensor); + assert_eq!(EntityKind::Rssi.component(), DiscoveryComponent::Sensor); + } + + #[test] + fn discovery_config_serialises_without_null_fields() { + let b = builder(); + let cfg = b.build(EntityKind::Presence); + let j = serde_json::to_string(&cfg).unwrap(); + // skip_serializing_if = "Option::is_none" must hide unused fields + // so retained payloads stay compact on small brokers. + assert!(!j.contains("\"event_types\":null")); + assert!(!j.contains("\"unit_of_measurement\":null")); + assert!(!j.contains("\"value_template\":null")); + } + + #[test] + fn availability_topic_matches_state_topic_path() { + let b = builder(); + let state = format!( + "homeassistant/binary_sensor/wifi_densepose_aabbccddeeff/presence/state" + ); + let avail = b.availability_topic(EntityKind::Presence); + // Must differ only in suffix. + assert_eq!( + state.trim_end_matches("/state"), + avail.trim_end_matches("/availability"), + ); + } + + #[test] + fn unique_id_uses_namespaced_node_prefix() { + let b = builder(); + let cfg = b.build(EntityKind::Rssi); + assert!(cfg.unique_id.starts_with("wifi_densepose_")); + // ADR-115 §7 — namespace prevents collision with other HA devices. + assert!(cfg.unique_id.contains(b.node_id)); + } +} diff --git a/v2/crates/wifi-densepose-sensing-server/src/mqtt/mod.rs b/v2/crates/wifi-densepose-sensing-server/src/mqtt/mod.rs new file mode 100644 index 00000000..498bfb0b --- /dev/null +++ b/v2/crates/wifi-densepose-sensing-server/src/mqtt/mod.rs @@ -0,0 +1,70 @@ +//! ADR-115 §2 — MQTT auto-discovery publisher (HA-DISCO). +//! +//! This module implements the dual-protocol Home Assistant integration's +//! primary path: MQTT + HA auto-discovery. It owns the full lifecycle: +//! +//! 1. Connect to a user-supplied broker with optional TLS / mTLS. +//! 2. Publish HA discovery `config` topics (retained) on connect and at +//! a refresh interval, so HA auto-creates one device + N entities per +//! RuView node. +//! 3. Translate `sensing-server` broadcast messages (`edge_vitals`, +//! `pose_data`, `sensing_update`) into per-entity state messages with +//! rate limits. +//! 4. Maintain a `availability` topic per entity with LWT for offline +//! detection. +//! +//! The module is gated behind the `mqtt` Cargo feature so the default +//! `sensing-server` binary stays small for users who don't need HA +//! integration. CLI flags parse unconditionally; the publisher is a +//! no-op without the feature. +//! +//! ## Layout +//! +//! - [`discovery`] — HA discovery payload generators per entity type +//! - [`state`] — per-entity state-message encoders + rate limiter +//! - [`publisher`] — connection lifecycle + topic publication +//! - [`privacy`] — biometric stripping per `--privacy-mode` +//! - [`config`] — `MqttConfig` struct fed by [`crate::cli::Args`] +//! +//! ## Cross-protocol coupling +//! +//! The semantic inference layer (ADR-115 §3.12, future `crate::semantic`) +//! emits primitive state changes onto a `tokio::broadcast` channel that +//! this module also subscribes to. Same channel is consumed by the Matter +//! Bridge (ADR-115 §3.11, future `crate::matter`), so adding a new +//! semantic primitive automatically flows to all surfaces. + +pub mod config; +pub mod discovery; +pub mod privacy; + +#[cfg(feature = "mqtt")] +pub mod publisher; + +#[cfg(feature = "mqtt")] +pub mod state; + +pub use config::MqttConfig; +pub use discovery::{ + AvailabilityPayload, DeviceMeta, DiscoveryComponent, DiscoveryConfig, OriginMeta, +}; + +/// Stable origin string written into every HA discovery payload's `origin` +/// block so HA users can see which RuView version emitted the entities. +pub const ORIGIN_NAME: &str = "wifi-densepose-sensing-server"; + +/// Stable manufacturer string written into every HA discovery payload's +/// `device` block. +pub const MANUFACTURER: &str = "ruvnet"; + +/// Stable `support_url` written into every HA discovery payload's `origin` +/// block. Resolves to the HACS Python integration's follow-on repository +/// per ADR-115 §9.3. +pub const SUPPORT_URL: &str = "https://github.com/ruvnet/hass-wifi-densepose"; + +/// Stable HA discovery topic prefix default. Maintainer-accepted in +/// ADR-115 §9.2 — ship Home Assistant's own default rather than a +/// RuView-namespaced one, so the integration is plug-and-play with a +/// stock Mosquitto add-on. Operators with custom HA setups can override +/// via `--mqtt-prefix`. +pub const DEFAULT_DISCOVERY_PREFIX: &str = "homeassistant"; diff --git a/v2/crates/wifi-densepose-sensing-server/src/mqtt/privacy.rs b/v2/crates/wifi-densepose-sensing-server/src/mqtt/privacy.rs new file mode 100644 index 00000000..e9fe877f --- /dev/null +++ b/v2/crates/wifi-densepose-sensing-server/src/mqtt/privacy.rs @@ -0,0 +1,103 @@ +//! Privacy-mode filter for outbound MQTT (and Matter) state messages. +//! +//! Implements the ADR-106 primitive-isolation contract at the integration +//! boundary, gated by [`crate::cli::Args::privacy_mode`]. When the flag is +//! set, biometric channels (HR, BR, raw pose keypoints) are stripped +//! from every outbound message *and* their entities are never discovered +//! by Home Assistant — `discovery.rs::DiscoveryBuilder::enabled_entities` +//! returns the filtered set. +//! +//! Semantic primitives (someone-sleeping, possible-distress, etc) stay +//! enabled in privacy mode because they're inferred *states*, not raw +//! biometric values. The inference runs server-side and only the boolean +//! / numeric state crosses the wire. This is the key design choice that +//! makes ADR-115 §3.12 enterprise- and healthcare-deployable. + +use super::discovery::EntityKind; + +/// Decision for one outbound publication. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PublishDecision { + /// Send as-is. + Publish, + /// Drop silently (entity is suppressed by privacy mode). + Suppress, +} + +/// Decide whether an entity may be published given a privacy-mode flag. +/// +/// Discovery and state share the same filter so an HA controller can't +/// learn from the absence of state that the entity might exist with +/// different filters in place — if it's stripped, it's stripped at every +/// layer. +pub fn decide(entity: EntityKind, privacy_mode: bool) -> PublishDecision { + if privacy_mode && entity.is_biometric() { + PublishDecision::Suppress + } else { + PublishDecision::Publish + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn privacy_off_publishes_everything() { + for e in [ + EntityKind::Presence, + EntityKind::HeartRate, + EntityKind::BreathingRate, + EntityKind::PoseKeypoints, + EntityKind::SomeoneSleeping, + EntityKind::PossibleDistress, + EntityKind::FallDetected, + ] { + assert_eq!(decide(e, false), PublishDecision::Publish); + } + } + + #[test] + fn privacy_on_suppresses_biometrics_only() { + // HR / BR / pose keypoints → suppressed. + assert_eq!(decide(EntityKind::HeartRate, true), PublishDecision::Suppress); + assert_eq!(decide(EntityKind::BreathingRate, true), PublishDecision::Suppress); + assert_eq!(decide(EntityKind::PoseKeypoints, true), PublishDecision::Suppress); + } + + #[test] + fn privacy_on_keeps_non_biometric_signals() { + for e in [ + EntityKind::Presence, + EntityKind::PersonCount, + EntityKind::MotionLevel, + EntityKind::Rssi, + EntityKind::ZoneOccupancy, + EntityKind::FallDetected, + EntityKind::PresenceScore, + ] { + assert_eq!(decide(e, true), PublishDecision::Publish, "{:?} should not be suppressed", e); + } + } + + #[test] + fn privacy_on_keeps_semantic_primitives() { + // Per ADR-115 §3.12.3 — semantic primitives are *inferred* states, + // not raw biometrics, so they remain available in privacy mode. + // This is the core privacy win of HA-MIND. + for e in [ + EntityKind::SomeoneSleeping, + EntityKind::PossibleDistress, + EntityKind::RoomActive, + EntityKind::ElderlyInactivityAnomaly, + EntityKind::MeetingInProgress, + EntityKind::BathroomOccupied, + EntityKind::FallRiskElevated, + EntityKind::BedExit, + EntityKind::NoMovement, + EntityKind::MultiRoomTransition, + ] { + assert_eq!(decide(e, true), PublishDecision::Publish, "{:?} should not be suppressed", e); + } + } +}