From 4e13c8baeb26d3c5cbaae75ba002dccba37b861f Mon Sep 17 00:00:00 2001 From: Yesuah Date: Sat, 30 May 2026 10:45:50 -0500 Subject: [PATCH] MQTT publisher is fully implemented but publisher::spawn is never called from main.rs. args.mqtt is never read. Fix adds the channel, wires broadcast_tick_task, and spawns the publisher on startup. --- .../wifi-densepose-sensing-server/src/main.rs | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/v2/crates/wifi-densepose-sensing-server/src/main.rs b/v2/crates/wifi-densepose-sensing-server/src/main.rs index 03e459ea..cd73db22 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/main.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/main.rs @@ -894,6 +894,8 @@ struct AppStateInner { // (not replacing) the window-aggregated `tx` / `/ws/sensing` pipeline. intro: wifi_densepose_sensing_server::introspection::IntrospectionState, intro_tx: broadcast::Sender, + #[cfg(feature = "mqtt")] + vitals_tx: Option>, total_detections: u64, start_time: std::time::Instant, /// Vital sign detector (processes CSI frames to estimate HR/RR). @@ -5289,6 +5291,34 @@ async fn broadcast_tick_task(state: SharedState, tick_ms: u64) { let _ = s.tx.send(json); } } + // ADR-115: feed the MQTT publisher (compiled in only when + // --features mqtt is active; the field and this block are + // stripped entirely in the default feature set). + #[cfg(feature = "mqtt")] + if let Some(ref vtx) = s.vitals_tx { + if vtx.receiver_count() > 0 { + use wifi_densepose_sensing_server::mqtt::state::VitalsSnapshot; + let vs = &s.latest_vitals; + let snap = VitalsSnapshot { + node_id: "ruview".to_string(), + timestamp_ms: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as i64, + presence: update.classification.presence, + fall_detected: false, + motion: s.smoothed_motion, + motion_energy: update.features.motion_band_power, + presence_score: s.smoothed_person_score, + breathing_rate_bpm: vs.breathing_rate_bpm, + heartrate_bpm: vs.heart_rate_bpm, + n_persons: s.person_count() as u32, + rssi_dbm: update.nodes.first().map(|n| n.rssi_dbm), + vital_confidence: (vs.breathing_confidence + vs.heartbeat_confidence) / 2.0, + }; + let _ = vtx.send(snap); + } + } } } } @@ -5995,6 +6025,8 @@ async fn main() { tx, intro: wifi_densepose_sensing_server::introspection::IntrospectionState::new(), intro_tx, + #[cfg(feature = "mqtt")] + vitals_tx: None, total_detections: 0, start_time: std::time::Instant::now(), vital_detector: VitalSignDetector::new(vital_sample_rate), @@ -6092,6 +6124,47 @@ async fn main() { } } + // ADR-115: spawn the MQTT publisher when --mqtt / RUVIEW_MQTT=true is set. + // The publisher task owns connection lifecycle, HA discovery, and per-entity + // state publication. Errors during connect are retried internally by rumqttc. + #[cfg(feature = "mqtt")] + if args.mqtt { + use wifi_densepose_sensing_server::mqtt::{ + config::MqttConfig, + publisher::{spawn as mqtt_spawn, OwnedDiscoveryBuilder}, + state::VitalsSnapshot, + }; + let mqtt_cfg = MqttConfig::from_args(&args); + let can_start = match mqtt_cfg.validate() { + Ok(()) => true, + Err(ref e) if !e.is_fatal() => { + warn!("[mqtt] {e}"); + true + } + Err(e) => { + error!("[mqtt] invalid config — MQTT disabled: {e}"); + false + } + }; + if can_start { + let (vtx, vrx) = broadcast::channel::(16); + { + let mut s = state.write().await; + s.vitals_tx = Some(vtx); + } + let builder = OwnedDiscoveryBuilder { + discovery_prefix: mqtt_cfg.discovery_prefix.clone(), + node_id: mqtt_cfg.client_id.clone(), + node_friendly_name: None, + sw_version: env!("CARGO_PKG_VERSION").to_string(), + model: "WiFi-DensePose".to_string(), + via_device: None, + }; + mqtt_spawn(Arc::new(mqtt_cfg), builder, vrx); + info!("[mqtt] publisher task spawned — connecting to broker"); + } + } + // ADR-050: Parse bind address once, use for all listeners let bind_ip: std::net::IpAddr = args .bind_addr