diff --git a/v2/crates/wifi-densepose-sensing-server/examples/mqtt_publisher.rs b/v2/crates/wifi-densepose-sensing-server/examples/mqtt_publisher.rs index 1e81c5f9..b1a9ee0d 100644 --- a/v2/crates/wifi-densepose-sensing-server/examples/mqtt_publisher.rs +++ b/v2/crates/wifi-densepose-sensing-server/examples/mqtt_publisher.rs @@ -47,7 +47,7 @@ use tokio::sync::broadcast; #[cfg(feature = "mqtt")] use tracing::info; #[cfg(feature = "mqtt")] -use wifi_densepose_sensing_server::cli::Args; +use wifi_densepose_sensing_server::cli::MqttArgs; #[cfg(feature = "mqtt")] use wifi_densepose_sensing_server::mqtt::{ config::MqttConfig, @@ -61,7 +61,15 @@ use wifi_densepose_sensing_server::mqtt::{ async fn main() -> Result<(), Box> { tracing_subscriber::fmt::init(); - let args = Args::parse(); + let args = { + use clap::Parser; + #[derive(Parser)] + struct W { + #[command(flatten)] + m: MqttArgs, + } + W::parse().m + }; if !args.mqtt { eprintln!("This example requires --mqtt. Aborting."); diff --git a/v2/crates/wifi-densepose-sensing-server/src/cli.rs b/v2/crates/wifi-densepose-sensing-server/src/cli.rs index 0a773626..7ec447bd 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/cli.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/cli.rs @@ -3,6 +3,89 @@ use clap::Parser; use std::path::PathBuf; +/// MQTT publisher (HA auto-discovery) + privacy-mode flags, shared via +/// `#[command(flatten)]` by both `cli::Args` and the binary's `main::Args` +/// so the `--mqtt*` flags reach the actual `Args::parse()` the server uses +/// (the publisher in `mqtt::` is keyed off this group). ADR-115 §3.8/§3.10. +#[derive(clap::Args, Debug, Clone)] +pub struct MqttArgs { + /// Enable MQTT publisher with HA auto-discovery + #[arg(long, env = "RUVIEW_MQTT")] + pub mqtt: bool, + + /// MQTT broker host + #[arg(long, env = "RUVIEW_MQTT_HOST", default_value = "localhost")] + pub mqtt_host: String, + + /// MQTT broker port (defaults: 1883 plain / 8883 with TLS) + #[arg(long, env = "RUVIEW_MQTT_PORT")] + pub mqtt_port: Option, + + /// MQTT username + #[arg(long, env = "RUVIEW_MQTT_USERNAME")] + pub mqtt_username: Option, + + /// Environment variable holding the MQTT password + #[arg(long, default_value = "MQTT_PASSWORD")] + pub mqtt_password_env: String, + + /// MQTT client ID (default: wifi-densepose-) + #[arg(long, env = "RUVIEW_MQTT_CLIENT_ID")] + pub mqtt_client_id: Option, + + /// Discovery topic prefix (ADR-115 §9.2 — accepted: `homeassistant`) + #[arg(long, env = "RUVIEW_MQTT_PREFIX", default_value = "homeassistant")] + pub mqtt_prefix: String, + + /// Enable TLS to the broker + #[arg(long, env = "RUVIEW_MQTT_TLS")] + pub mqtt_tls: bool, + + /// CA bundle for TLS + #[arg(long, value_name = "PATH")] + pub mqtt_ca_file: Option, + + /// Client certificate for mTLS + #[arg(long, value_name = "PATH")] + pub mqtt_client_cert: Option, + + /// Client key for mTLS + #[arg(long, value_name = "PATH")] + pub mqtt_client_key: Option, + + /// Discovery refresh interval (seconds) + #[arg(long, default_value = "600")] + pub mqtt_refresh_secs: u64, + + /// Vitals publish rate (Hz) — HR/BR + #[arg(long, default_value = "0.2")] + pub mqtt_rate_vitals: f64, + + /// Motion publish rate (Hz) + #[arg(long, default_value = "1.0")] + pub mqtt_rate_motion: f64, + + /// Person count publish rate (Hz) + #[arg(long, default_value = "1.0")] + pub mqtt_rate_count: f64, + + /// RSSI publish rate (Hz) + #[arg(long, default_value = "0.1")] + pub mqtt_rate_rssi: f64, + + /// Publish pose keypoints over MQTT (off by default for bandwidth) + #[arg(long)] + pub mqtt_publish_pose: bool, + + /// Pose publish rate (Hz) when --mqtt-publish-pose is set + #[arg(long, default_value = "1.0")] + pub mqtt_rate_pose: f64, + + /// Strip biometrics (HR/BR/pose) before any MQTT/Matter publish (ADR-115 §3.10). + #[arg(long, env = "RUVIEW_PRIVACY_MODE")] + pub privacy_mode: bool, +} + /// CLI arguments for the sensing server. #[derive(Parser, Debug)] #[command(name = "sensing-server", about = "WiFi-DensePose sensing server")] diff --git a/v2/crates/wifi-densepose-sensing-server/src/main.rs b/v2/crates/wifi-densepose-sensing-server/src/main.rs index 03e459ea..041ee55f 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/main.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/main.rs @@ -108,6 +108,13 @@ struct Args { #[arg(long)] disable_host_validation: bool, + /// MQTT publisher (HA auto-discovery) + privacy-mode flags (ADR-115). + /// Flattened so `--mqtt*` reach the binary's parser and the publisher + /// in `mqtt::` is actually started (fixes #872). Uses the *lib* crate's + /// `MqttArgs` type so it's compatible with `mqtt::config::from_args`. + #[command(flatten)] + mqtt_opts: wifi_densepose_sensing_server::cli::MqttArgs, + /// Data source: auto, wifi, esp32, simulate #[arg(long, default_value = "auto")] source: String, @@ -5985,6 +5992,84 @@ async fn main() { // consumed by `/ws/introspection`. Same ring size as `tx` (256) — slow // clients drop oldest, identical backpressure shape. let (intro_tx, _) = broadcast::channel::(256); + + // #872: actually start the MQTT publisher when `--mqtt` is set. The publisher + // (mqtt::) consumes a typed VitalsSnapshot stream; we bridge the existing JSON + // sensing broadcast into it with a defensive serde_json::Value mapping (absent + // fields default — never publish wrong values). Gated on the `mqtt` feature + // (the Docker image is built `--features mqtt`); without it `--mqtt` WARNs and + // no-ops, matching the documented contract. + if args.mqtt_opts.mqtt { + #[cfg(feature = "mqtt")] + { + use wifi_densepose_sensing_server::mqtt; + let mcfg = std::sync::Arc::new(mqtt::config::MqttConfig::from_args(&args.mqtt_opts)); + match mcfg.validate() { + Ok(()) => { + let node_id = mcfg.client_id.clone(); + let builder = mqtt::publisher::OwnedDiscoveryBuilder { + discovery_prefix: mcfg.discovery_prefix.clone(), + node_id: node_id.clone(), + node_friendly_name: Some("RuView".to_string()), + sw_version: env!("CARGO_PKG_VERSION").to_string(), + model: "RuView WiFi Sensing".to_string(), + via_device: None, + }; + let (vtx, vrx) = broadcast::channel::(64); + let (host, port) = (mcfg.host.clone(), mcfg.port); + mqtt::publisher::spawn(mcfg, builder, vrx); + let mut jrx = tx.subscribe(); + tokio::spawn(async move { + while let Ok(json) = jrx.recv().await { + let Ok(v) = serde_json::from_str::(&json) else { + continue; + }; + let cls = &v["classification"]; + let vit = &v["vital_signs"]; + let presence = cls["presence"].as_bool().unwrap_or(false); + let n_persons = v["persons"] + .as_array() + .map(|a| a.len() as u32) + .or_else(|| v["estimated_persons"].as_u64().map(|x| x as u32)) + .unwrap_or(0); + let motion = match cls["motion_level"].as_str() { + Some("none") | Some("still") | Some("idle") | Some("") => 0.0, + Some(_) => 1.0, + None => 0.0, + }; + let snap = mqtt::state::VitalsSnapshot { + node_id: node_id.clone(), + timestamp_ms: (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64, + presence, + motion, + presence_score: if presence { + cls["confidence"].as_f64().unwrap_or(1.0) + } else { + 0.0 + }, + breathing_rate_bpm: vit["breathing_rate_bpm"].as_f64(), + heartrate_bpm: vit["heart_rate_bpm"].as_f64(), + n_persons, + rssi_dbm: v["nodes"][0]["rssi_dbm"].as_f64(), + vital_confidence: cls["confidence"].as_f64().unwrap_or(0.0), + ..Default::default() + }; + let _ = vtx.send(snap); + } + }); + tracing::info!("MQTT publisher started -> {host}:{port}"); + } + Err(e) => tracing::error!("MQTT config invalid: {e}; publisher not started"), + } + } + #[cfg(not(feature = "mqtt"))] + tracing::warn!( + "--mqtt set but this binary was built without the `mqtt` feature; the publisher is a \ + no-op. Use the official Docker image (built `--features mqtt`) or rebuild with \ + `cargo build -p wifi-densepose-sensing-server --features mqtt`." + ); + } + let state: SharedState = Arc::new(RwLock::new(AppStateInner { latest_update: None, rssi_history: VecDeque::new(), diff --git a/v2/crates/wifi-densepose-sensing-server/src/mqtt/config.rs b/v2/crates/wifi-densepose-sensing-server/src/mqtt/config.rs index 6d0cca53..a430125b 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/mqtt/config.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/mqtt/config.rs @@ -63,7 +63,7 @@ impl MqttConfig { /// `hostname()` via the `gethostname` crate if `mqtt_client_id` was /// not supplied — we don't add a dep here, we let the publisher /// supply the default lazily. - pub fn from_args(args: &crate::cli::Args) -> Self { + pub fn from_args(args: &crate::cli::MqttArgs) -> Self { let password = std::env::var(&args.mqtt_password_env).ok(); let port = args.mqtt_port.unwrap_or(if args.mqtt_tls { 8883 } else { 1883 }); let tls = build_tls(args); @@ -135,7 +135,7 @@ impl MqttConfig { } } -fn build_tls(args: &crate::cli::Args) -> TlsConfig { +fn build_tls(args: &crate::cli::MqttArgs) -> TlsConfig { if !args.mqtt_tls { return TlsConfig::Off; } @@ -186,8 +186,14 @@ mod tests { use super::*; use clap::Parser; - fn parse(args: &[&str]) -> crate::cli::Args { - crate::cli::Args::parse_from(std::iter::once("sensing-server").chain(args.iter().copied())) + fn parse(args: &[&str]) -> crate::cli::MqttArgs { + use clap::Parser; + #[derive(Parser)] + struct W { + #[command(flatten)] + m: crate::cli::MqttArgs, + } + W::parse_from(std::iter::once("sensing-server").chain(args.iter().copied())).m } #[test]