diff --git a/v2/crates/wifi-densepose-sensing-server/src/main.rs b/v2/crates/wifi-densepose-sensing-server/src/main.rs index ae59d797..1fd37ce9 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/main.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/main.rs @@ -187,6 +187,46 @@ struct Args { #[arg(long)] calibrate: bool, + // ─── ADR-115 §3.8 — MQTT publisher (HA-DISCO) ────────────────────────── + #[arg(long, env = "RUVIEW_MQTT")] + mqtt: bool, + #[arg(long, env = "RUVIEW_MQTT_HOST", default_value = "localhost")] + mqtt_host: String, + #[arg(long, env = "RUVIEW_MQTT_PORT")] + mqtt_port: Option, + #[arg(long, env = "RUVIEW_MQTT_USERNAME")] + mqtt_username: Option, + #[arg(long, default_value = "MQTT_PASSWORD")] + mqtt_password_env: String, + #[arg(long, env = "RUVIEW_MQTT_CLIENT_ID")] + mqtt_client_id: Option, + #[arg(long, env = "RUVIEW_MQTT_PREFIX", default_value = "homeassistant")] + mqtt_prefix: String, + #[arg(long, env = "RUVIEW_MQTT_TLS")] + mqtt_tls: bool, + #[arg(long, value_name = "PATH")] + mqtt_ca_file: Option, + #[arg(long, value_name = "PATH")] + mqtt_client_cert: Option, + #[arg(long, value_name = "PATH")] + mqtt_client_key: Option, + #[arg(long, default_value = "600")] + mqtt_refresh_secs: u64, + #[arg(long, default_value = "0.2")] + mqtt_rate_vitals: f64, + #[arg(long, default_value = "1.0")] + mqtt_rate_motion: f64, + #[arg(long, default_value = "1.0")] + mqtt_rate_count: f64, + #[arg(long, default_value = "0.1")] + mqtt_rate_rssi: f64, + #[arg(long)] + mqtt_publish_pose: bool, + #[arg(long, default_value = "1.0")] + mqtt_rate_pose: f64, + #[arg(long, env = "RUVIEW_PRIVACY_MODE")] + privacy_mode: bool, + // --------------------------------------------------------------- // ADR-102: Edge Module Registry — surface the canonical Cognitum // cog catalog via `GET /api/v1/edge/registry`. @@ -901,6 +941,8 @@ struct AppStateInner { // (not replacing) the window-aggregated `tx` / `/ws/sensing` pipeline. intro: wifi_densepose_sensing_server::introspection::IntrospectionState, intro_tx: broadcast::Sender, + #[cfg(feature = "mqtt")] + vitals_tx: Option>, total_detections: u64, start_time: std::time::Instant, /// Vital sign detector (processes CSI frames to estimate HR/RR). @@ -5661,6 +5703,31 @@ async fn simulated_data_task(state: SharedState, tick_ms: u64) { if let Ok(json) = serde_json::to_string(&update) { let _ = s.tx.send(json); } + #[cfg(feature = "mqtt")] + if let Some(ref vtx) = s.vitals_tx { + if vtx.receiver_count() > 0 { + use wifi_densepose_sensing_server::mqtt::state::VitalsSnapshot; + let vs = &s.latest_vitals; + let snap = VitalsSnapshot { + node_id: "ruview".to_string(), + timestamp_ms: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as i64, + presence: update.classification.presence, + fall_detected: false, + motion: s.smoothed_motion, + motion_energy: update.features.motion_band_power, + presence_score: s.smoothed_person_score, + breathing_rate_bpm: vs.breathing_rate_bpm, + heartrate_bpm: vs.heart_rate_bpm, + n_persons: s.person_count() as u32, + rssi_dbm: update.nodes.first().map(|n| n.rssi_dbm), + vital_confidence: (vs.breathing_confidence + vs.heartbeat_confidence) / 2.0, + }; + let _ = vtx.send(snap); + } + } s.latest_update = Some(update); } } @@ -5692,6 +5759,34 @@ async fn broadcast_tick_task(state: SharedState, tick_ms: u64) { let _ = s.tx.send(json); } } + // ADR-115: feed the MQTT publisher (compiled in only when + // --features mqtt is active; the field and this block are + // stripped entirely in the default feature set). + #[cfg(feature = "mqtt")] + if let Some(ref vtx) = s.vitals_tx { + if vtx.receiver_count() > 0 { + use wifi_densepose_sensing_server::mqtt::state::VitalsSnapshot; + let vs = &s.latest_vitals; + let snap = VitalsSnapshot { + node_id: "ruview".to_string(), + timestamp_ms: std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as i64, + presence: update.classification.presence, + fall_detected: false, + motion: s.smoothed_motion, + motion_energy: update.features.motion_band_power, + presence_score: s.smoothed_person_score, + breathing_rate_bpm: vs.breathing_rate_bpm, + heartrate_bpm: vs.heart_rate_bpm, + n_persons: s.person_count() as u32, + rssi_dbm: update.nodes.first().map(|n| n.rssi_dbm), + vital_confidence: (vs.breathing_confidence + vs.heartbeat_confidence) / 2.0, + }; + let _ = vtx.send(snap); + } + } } } } @@ -6630,6 +6725,8 @@ async fn main() { tx, intro: wifi_densepose_sensing_server::introspection::IntrospectionState::new(), intro_tx, + #[cfg(feature = "mqtt")] + vitals_tx: None, total_detections: 0, start_time: std::time::Instant::now(), vital_detector: VitalSignDetector::new(vital_sample_rate), @@ -6727,6 +6824,80 @@ async fn main() { } } + // ADR-115: spawn the MQTT publisher when --mqtt / RUVIEW_MQTT=true is set. + // The publisher task owns connection lifecycle, HA discovery, and per-entity + // state publication. Errors during connect are retried internally by rumqttc. + #[cfg(feature = "mqtt")] + if args.mqtt { + use wifi_densepose_sensing_server::mqtt::{ + config::{MqttConfig, PublishRates, TlsConfig}, + publisher::{spawn as mqtt_spawn, OwnedDiscoveryBuilder}, + state::VitalsSnapshot, + }; + let password = std::env::var(&args.mqtt_password_env).ok(); + let port = args.mqtt_port.unwrap_or(if args.mqtt_tls { 8883 } else { 1883 }); + let tls = if !args.mqtt_tls { + TlsConfig::Off + } else { + match (args.mqtt_ca_file.as_ref(), args.mqtt_client_cert.as_ref(), args.mqtt_client_key.as_ref()) { + (Some(ca), Some(cert), Some(key)) => TlsConfig::MutualTls { + ca_file: ca.clone(), client_cert: cert.clone(), client_key: key.clone(), + }, + (Some(ca), None, None) => TlsConfig::PinnedCa { ca_file: ca.clone() }, + _ => TlsConfig::SystemTrust, + } + }; + let client_id = args.mqtt_client_id.clone() + .unwrap_or_else(|| format!("wifi-densepose-{}", std::process::id())); + let mqtt_cfg = MqttConfig { + host: args.mqtt_host.clone(), + port, + username: args.mqtt_username.clone(), + password, + client_id: client_id.clone(), + discovery_prefix: args.mqtt_prefix.clone(), + tls, + refresh_secs: args.mqtt_refresh_secs, + rates: PublishRates { + vitals_hz: args.mqtt_rate_vitals, + motion_hz: args.mqtt_rate_motion, + count_hz: args.mqtt_rate_count, + rssi_hz: args.mqtt_rate_rssi, + pose_hz: args.mqtt_rate_pose, + }, + publish_pose: args.mqtt_publish_pose, + privacy_mode: args.privacy_mode, + }; + let can_start = match mqtt_cfg.validate() { + Ok(()) => true, + Err(ref e) if !e.is_fatal() => { + warn!("[mqtt] {e}"); + true + } + Err(e) => { + error!("[mqtt] invalid config — MQTT disabled: {e}"); + false + } + }; + if can_start { + let (vtx, vrx) = broadcast::channel::(16); + { + let mut s = state.write().await; + s.vitals_tx = Some(vtx); + } + let builder = OwnedDiscoveryBuilder { + discovery_prefix: mqtt_cfg.discovery_prefix.clone(), + node_id: mqtt_cfg.client_id.clone(), + node_friendly_name: None, + sw_version: env!("CARGO_PKG_VERSION").to_string(), + model: "WiFi-DensePose".to_string(), + via_device: None, + }; + mqtt_spawn(Arc::new(mqtt_cfg), builder, vrx); + info!("[mqtt] publisher task spawned — connecting to broker"); + } + } + // ADR-050: Parse bind address once, use for all listeners let bind_ip: std::net::IpAddr = args .bind_addr