From 4e13c8baeb26d3c5cbaae75ba002dccba37b861f Mon Sep 17 00:00:00 2001 From: Yesuah Date: Sat, 30 May 2026 10:45:50 -0500 Subject: [PATCH 1/3] MQTT publisher is fully implemented but publisher::spawn is never called from main.rs. args.mqtt is never read. Fix adds the channel, wires broadcast_tick_task, and spawns the publisher on startup. --- .../wifi-densepose-sensing-server/src/main.rs | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/v2/crates/wifi-densepose-sensing-server/src/main.rs b/v2/crates/wifi-densepose-sensing-server/src/main.rs index 03e459ea..cd73db22 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/main.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/main.rs @@ -894,6 +894,8 @@ struct AppStateInner { // (not replacing) the window-aggregated `tx` / `/ws/sensing` pipeline. intro: wifi_densepose_sensing_server::introspection::IntrospectionState, intro_tx: broadcast::Sender, + #[cfg(feature = "mqtt")] + vitals_tx: Option>, total_detections: u64, start_time: std::time::Instant, /// Vital sign detector (processes CSI frames to estimate HR/RR). @@ -5289,6 +5291,34 @@ async fn broadcast_tick_task(state: SharedState, tick_ms: u64) { let _ = s.tx.send(json); } } + // ADR-115: feed the MQTT publisher (compiled in only when + // --features mqtt is active; the field and this block are + // stripped entirely in the default feature set). + #[cfg(feature = "mqtt")] + if let Some(ref vtx) = s.vitals_tx { + if vtx.receiver_count() > 0 { + use wifi_densepose_sensing_server::mqtt::state::VitalsSnapshot; + let vs = &s.latest_vitals; + let snap = VitalsSnapshot { + node_id: "ruview".to_string(), + timestamp_ms: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as i64, + presence: update.classification.presence, + fall_detected: false, + motion: s.smoothed_motion, + motion_energy: update.features.motion_band_power, + presence_score: s.smoothed_person_score, + breathing_rate_bpm: vs.breathing_rate_bpm, + heartrate_bpm: vs.heart_rate_bpm, + n_persons: s.person_count() as u32, + rssi_dbm: update.nodes.first().map(|n| n.rssi_dbm), + vital_confidence: (vs.breathing_confidence + vs.heartbeat_confidence) / 2.0, + }; + let _ = vtx.send(snap); + } + } } } } @@ -5995,6 +6025,8 @@ async fn main() { tx, intro: wifi_densepose_sensing_server::introspection::IntrospectionState::new(), intro_tx, + #[cfg(feature = "mqtt")] + vitals_tx: None, total_detections: 0, start_time: std::time::Instant::now(), vital_detector: VitalSignDetector::new(vital_sample_rate), @@ -6092,6 +6124,47 @@ async fn main() { } } + // ADR-115: spawn the MQTT publisher when --mqtt / RUVIEW_MQTT=true is set. + // The publisher task owns connection lifecycle, HA discovery, and per-entity + // state publication. Errors during connect are retried internally by rumqttc. + #[cfg(feature = "mqtt")] + if args.mqtt { + use wifi_densepose_sensing_server::mqtt::{ + config::MqttConfig, + publisher::{spawn as mqtt_spawn, OwnedDiscoveryBuilder}, + state::VitalsSnapshot, + }; + let mqtt_cfg = MqttConfig::from_args(&args); + let can_start = match mqtt_cfg.validate() { + Ok(()) => true, + Err(ref e) if !e.is_fatal() => { + warn!("[mqtt] {e}"); + true + } + Err(e) => { + error!("[mqtt] invalid config — MQTT disabled: {e}"); + false + } + }; + if can_start { + let (vtx, vrx) = broadcast::channel::(16); + { + let mut s = state.write().await; + s.vitals_tx = Some(vtx); + } + let builder = OwnedDiscoveryBuilder { + discovery_prefix: mqtt_cfg.discovery_prefix.clone(), + node_id: mqtt_cfg.client_id.clone(), + node_friendly_name: None, + sw_version: env!("CARGO_PKG_VERSION").to_string(), + model: "WiFi-DensePose".to_string(), + via_device: None, + }; + mqtt_spawn(Arc::new(mqtt_cfg), builder, vrx); + info!("[mqtt] publisher task spawned — connecting to broker"); + } + } + // ADR-050: Parse bind address once, use for all listeners let bind_ip: std::net::IpAddr = args .bind_addr From bcb7b3dad9b9890ea3ba16dad909eca079fcd906 Mon Sep 17 00:00:00 2001 From: Yesuah Date: Sat, 30 May 2026 11:29:33 -0500 Subject: [PATCH 2/3] fix: add MQTT fields to local Args struct; build MqttConfig inline main.rs has its own Args struct separate from the library's cli::Args. MqttConfig::from_args() expects the library type so calling it with the local type caused a type mismatch. Fix adds all MQTT fields to the local Args and constructs MqttConfig inline in the spawn block. --- .../wifi-densepose-sensing-server/src/main.rs | 77 ++++++++++++++++++- 1 file changed, 75 insertions(+), 2 deletions(-) diff --git a/v2/crates/wifi-densepose-sensing-server/src/main.rs b/v2/crates/wifi-densepose-sensing-server/src/main.rs index cd73db22..5c9f2860 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/main.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/main.rs @@ -180,6 +180,46 @@ struct Args { #[arg(long)] calibrate: bool, + // ─── ADR-115 §3.8 — MQTT publisher (HA-DISCO) ────────────────────────── + #[arg(long, env = "RUVIEW_MQTT")] + mqtt: bool, + #[arg(long, env = "RUVIEW_MQTT_HOST", default_value = "localhost")] + mqtt_host: String, + #[arg(long, env = "RUVIEW_MQTT_PORT")] + mqtt_port: Option, + #[arg(long, env = "RUVIEW_MQTT_USERNAME")] + mqtt_username: Option, + #[arg(long, default_value = "MQTT_PASSWORD")] + mqtt_password_env: String, + #[arg(long, env = "RUVIEW_MQTT_CLIENT_ID")] + mqtt_client_id: Option, + #[arg(long, env = "RUVIEW_MQTT_PREFIX", default_value = "homeassistant")] + mqtt_prefix: String, + #[arg(long, env = "RUVIEW_MQTT_TLS")] + mqtt_tls: bool, + #[arg(long, value_name = "PATH")] + mqtt_ca_file: Option, + #[arg(long, value_name = "PATH")] + mqtt_client_cert: Option, + #[arg(long, value_name = "PATH")] + mqtt_client_key: Option, + #[arg(long, default_value = "600")] + mqtt_refresh_secs: u64, + #[arg(long, default_value = "0.2")] + mqtt_rate_vitals: f64, + #[arg(long, default_value = "1.0")] + mqtt_rate_motion: f64, + #[arg(long, default_value = "1.0")] + mqtt_rate_count: f64, + #[arg(long, default_value = "0.1")] + mqtt_rate_rssi: f64, + #[arg(long)] + mqtt_publish_pose: bool, + #[arg(long, default_value = "1.0")] + mqtt_rate_pose: f64, + #[arg(long, env = "RUVIEW_PRIVACY_MODE")] + privacy_mode: bool, + // --------------------------------------------------------------- // ADR-102: Edge Module Registry — surface the canonical Cognitum // cog catalog via `GET /api/v1/edge/registry`. @@ -6130,11 +6170,44 @@ async fn main() { #[cfg(feature = "mqtt")] if args.mqtt { use wifi_densepose_sensing_server::mqtt::{ - config::MqttConfig, + config::{MqttConfig, PublishRates, TlsConfig}, publisher::{spawn as mqtt_spawn, OwnedDiscoveryBuilder}, state::VitalsSnapshot, }; - let mqtt_cfg = MqttConfig::from_args(&args); + let password = std::env::var(&args.mqtt_password_env).ok(); + let port = args.mqtt_port.unwrap_or(if args.mqtt_tls { 8883 } else { 1883 }); + let tls = if !args.mqtt_tls { + TlsConfig::Off + } else { + match (args.mqtt_ca_file.as_ref(), args.mqtt_client_cert.as_ref(), args.mqtt_client_key.as_ref()) { + (Some(ca), Some(cert), Some(key)) => TlsConfig::MutualTls { + ca_file: ca.clone(), client_cert: cert.clone(), client_key: key.clone(), + }, + (Some(ca), None, None) => TlsConfig::PinnedCa { ca_file: ca.clone() }, + _ => TlsConfig::SystemTrust, + } + }; + let client_id = args.mqtt_client_id.clone() + .unwrap_or_else(|| format!("wifi-densepose-{}", std::process::id())); + let mqtt_cfg = MqttConfig { + host: args.mqtt_host.clone(), + port, + username: args.mqtt_username.clone(), + password, + client_id: client_id.clone(), + discovery_prefix: args.mqtt_prefix.clone(), + tls, + refresh_secs: args.mqtt_refresh_secs, + rates: PublishRates { + vitals_hz: args.mqtt_rate_vitals, + motion_hz: args.mqtt_rate_motion, + count_hz: args.mqtt_rate_count, + rssi_hz: args.mqtt_rate_rssi, + pose_hz: args.mqtt_rate_pose, + }, + publish_pose: args.mqtt_publish_pose, + privacy_mode: args.privacy_mode, + }; let can_start = match mqtt_cfg.validate() { Ok(()) => true, Err(ref e) if !e.is_fatal() => { From b35582d4e0aeed0c27396cc1ab7bb482d6cefe78 Mon Sep 17 00:00:00 2001 From: Yesuah Date: Sat, 30 May 2026 14:06:10 -0500 Subject: [PATCH 3/3] fix: send VitalsSnapshot from simulated_data_task for MQTT state topics broadcast_tick_task only runs for esp32 source. simulate source uses simulated_data_task which never sent to vitals_tx, so MQTT state topics never received messages and all entities stayed unknown in HA. --- .../wifi-densepose-sensing-server/src/main.rs | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/v2/crates/wifi-densepose-sensing-server/src/main.rs b/v2/crates/wifi-densepose-sensing-server/src/main.rs index 5c9f2860..1cf40637 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/main.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/main.rs @@ -5300,6 +5300,31 @@ async fn simulated_data_task(state: SharedState, tick_ms: u64) { if let Ok(json) = serde_json::to_string(&update) { let _ = s.tx.send(json); } + #[cfg(feature = "mqtt")] + if let Some(ref vtx) = s.vitals_tx { + if vtx.receiver_count() > 0 { + use wifi_densepose_sensing_server::mqtt::state::VitalsSnapshot; + let vs = &s.latest_vitals; + let snap = VitalsSnapshot { + node_id: "ruview".to_string(), + timestamp_ms: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as i64, + presence: update.classification.presence, + fall_detected: false, + motion: s.smoothed_motion, + motion_energy: update.features.motion_band_power, + presence_score: s.smoothed_person_score, + breathing_rate_bpm: vs.breathing_rate_bpm, + heartrate_bpm: vs.heart_rate_bpm, + n_persons: s.person_count() as u32, + rssi_dbm: update.nodes.first().map(|n| n.rssi_dbm), + vital_confidence: (vs.breathing_confidence + vs.heartbeat_confidence) / 2.0, + }; + let _ = vtx.send(snap); + } + } s.latest_update = Some(update); } }