fix(sensing-server): wire MQTT publisher into the binary — closes #872

#872 reported '--mqtt: unexpected argument' on the Docker image; prior
attempts chased a Docker *rebuild*, but the real cause was disconnected
*code*: the --mqtt* flags lived only in cli::Args (dead code — referenced
nowhere), while the binary parses a separate main::Args with no mqtt fields,
and main.rs never declared/started the mqtt:: publisher. So MQTT was fully
unwired: flags didn't parse, and the publisher never ran.

Fix:
- Extract the mqtt + privacy flags into a shared
  (#[derive(clap::Args)]); retarget mqtt::config::{from_args,build_tls} to it.
- #[command(flatten)] MqttArgs into the binary's main::Args (using the *lib*
  crate's type so it matches from_args), so --mqtt* now parse.
- Spawn the publisher on --mqtt: build MqttConfig, validate, and bridge the
  existing JSON sensing broadcast into the typed VitalsSnapshot stream the
  publisher consumes (defensive serde_json::Value mapping — absent fields
  default, never wrong values). #[cfg(feature=mqtt)]-gated; without the
  feature --mqtt WARNs and no-ops (documented contract). Fix the
  mqtt_publisher example for the new signature.

Verified end-to-end against local mosquitto: publisher connects and emits
20 HA auto-discovery entities + live state (presence ON, person_count, …).
Tests: 577 pass default / 580 pass --features mqtt / 0 fail; both configs
build.

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
ruv 2026-05-31 09:39:21 -04:00
parent edbe57378a
commit a3f80b0cda
4 changed files with 188 additions and 6 deletions

View File

@ -47,7 +47,7 @@ use tokio::sync::broadcast;
#[cfg(feature = "mqtt")]
use tracing::info;
#[cfg(feature = "mqtt")]
use wifi_densepose_sensing_server::cli::Args;
use wifi_densepose_sensing_server::cli::MqttArgs;
#[cfg(feature = "mqtt")]
use wifi_densepose_sensing_server::mqtt::{
config::MqttConfig,
@ -61,7 +61,15 @@ use wifi_densepose_sensing_server::mqtt::{
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
let args = Args::parse();
let args = {
use clap::Parser;
#[derive(Parser)]
struct W {
#[command(flatten)]
m: MqttArgs,
}
W::parse().m
};
if !args.mqtt {
eprintln!("This example requires --mqtt. Aborting.");

View File

@ -3,6 +3,89 @@
use clap::Parser;
use std::path::PathBuf;
/// MQTT publisher (HA auto-discovery) + privacy-mode flags, shared via
/// `#[command(flatten)]` by both `cli::Args` and the binary's `main::Args`
/// so the `--mqtt*` flags reach the actual `Args::parse()` the server uses
/// (the publisher in `mqtt::` is keyed off this group). ADR-115 §3.8/§3.10.
#[derive(clap::Args, Debug, Clone)]
pub struct MqttArgs {
/// Enable MQTT publisher with HA auto-discovery
#[arg(long, env = "RUVIEW_MQTT")]
pub mqtt: bool,
/// MQTT broker host
#[arg(long, env = "RUVIEW_MQTT_HOST", default_value = "localhost")]
pub mqtt_host: String,
/// MQTT broker port (defaults: 1883 plain / 8883 with TLS)
#[arg(long, env = "RUVIEW_MQTT_PORT")]
pub mqtt_port: Option<u16>,
/// MQTT username
#[arg(long, env = "RUVIEW_MQTT_USERNAME")]
pub mqtt_username: Option<String>,
/// Environment variable holding the MQTT password
#[arg(long, default_value = "MQTT_PASSWORD")]
pub mqtt_password_env: String,
/// MQTT client ID (default: wifi-densepose-<pid>)
#[arg(long, env = "RUVIEW_MQTT_CLIENT_ID")]
pub mqtt_client_id: Option<String>,
/// Discovery topic prefix (ADR-115 §9.2 — accepted: `homeassistant`)
#[arg(long, env = "RUVIEW_MQTT_PREFIX", default_value = "homeassistant")]
pub mqtt_prefix: String,
/// Enable TLS to the broker
#[arg(long, env = "RUVIEW_MQTT_TLS")]
pub mqtt_tls: bool,
/// CA bundle for TLS
#[arg(long, value_name = "PATH")]
pub mqtt_ca_file: Option<PathBuf>,
/// Client certificate for mTLS
#[arg(long, value_name = "PATH")]
pub mqtt_client_cert: Option<PathBuf>,
/// Client key for mTLS
#[arg(long, value_name = "PATH")]
pub mqtt_client_key: Option<PathBuf>,
/// Discovery refresh interval (seconds)
#[arg(long, default_value = "600")]
pub mqtt_refresh_secs: u64,
/// Vitals publish rate (Hz) — HR/BR
#[arg(long, default_value = "0.2")]
pub mqtt_rate_vitals: f64,
/// Motion publish rate (Hz)
#[arg(long, default_value = "1.0")]
pub mqtt_rate_motion: f64,
/// Person count publish rate (Hz)
#[arg(long, default_value = "1.0")]
pub mqtt_rate_count: f64,
/// RSSI publish rate (Hz)
#[arg(long, default_value = "0.1")]
pub mqtt_rate_rssi: f64,
/// Publish pose keypoints over MQTT (off by default for bandwidth)
#[arg(long)]
pub mqtt_publish_pose: bool,
/// Pose publish rate (Hz) when --mqtt-publish-pose is set
#[arg(long, default_value = "1.0")]
pub mqtt_rate_pose: f64,
/// Strip biometrics (HR/BR/pose) before any MQTT/Matter publish (ADR-115 §3.10).
#[arg(long, env = "RUVIEW_PRIVACY_MODE")]
pub privacy_mode: bool,
}
/// CLI arguments for the sensing server.
#[derive(Parser, Debug)]
#[command(name = "sensing-server", about = "WiFi-DensePose sensing server")]

View File

@ -108,6 +108,13 @@ struct Args {
#[arg(long)]
disable_host_validation: bool,
/// MQTT publisher (HA auto-discovery) + privacy-mode flags (ADR-115).
/// Flattened so `--mqtt*` reach the binary's parser and the publisher
/// in `mqtt::` is actually started (fixes #872). Uses the *lib* crate's
/// `MqttArgs` type so it's compatible with `mqtt::config::from_args`.
#[command(flatten)]
mqtt_opts: wifi_densepose_sensing_server::cli::MqttArgs,
/// Data source: auto, wifi, esp32, simulate
#[arg(long, default_value = "auto")]
source: String,
@ -5985,6 +5992,84 @@ async fn main() {
// consumed by `/ws/introspection`. Same ring size as `tx` (256) — slow
// clients drop oldest, identical backpressure shape.
let (intro_tx, _) = broadcast::channel::<String>(256);
// #872: actually start the MQTT publisher when `--mqtt` is set. The publisher
// (mqtt::) consumes a typed VitalsSnapshot stream; we bridge the existing JSON
// sensing broadcast into it with a defensive serde_json::Value mapping (absent
// fields default — never publish wrong values). Gated on the `mqtt` feature
// (the Docker image is built `--features mqtt`); without it `--mqtt` WARNs and
// no-ops, matching the documented contract.
if args.mqtt_opts.mqtt {
#[cfg(feature = "mqtt")]
{
use wifi_densepose_sensing_server::mqtt;
let mcfg = std::sync::Arc::new(mqtt::config::MqttConfig::from_args(&args.mqtt_opts));
match mcfg.validate() {
Ok(()) => {
let node_id = mcfg.client_id.clone();
let builder = mqtt::publisher::OwnedDiscoveryBuilder {
discovery_prefix: mcfg.discovery_prefix.clone(),
node_id: node_id.clone(),
node_friendly_name: Some("RuView".to_string()),
sw_version: env!("CARGO_PKG_VERSION").to_string(),
model: "RuView WiFi Sensing".to_string(),
via_device: None,
};
let (vtx, vrx) = broadcast::channel::<mqtt::state::VitalsSnapshot>(64);
let (host, port) = (mcfg.host.clone(), mcfg.port);
mqtt::publisher::spawn(mcfg, builder, vrx);
let mut jrx = tx.subscribe();
tokio::spawn(async move {
while let Ok(json) = jrx.recv().await {
let Ok(v) = serde_json::from_str::<serde_json::Value>(&json) else {
continue;
};
let cls = &v["classification"];
let vit = &v["vital_signs"];
let presence = cls["presence"].as_bool().unwrap_or(false);
let n_persons = v["persons"]
.as_array()
.map(|a| a.len() as u32)
.or_else(|| v["estimated_persons"].as_u64().map(|x| x as u32))
.unwrap_or(0);
let motion = match cls["motion_level"].as_str() {
Some("none") | Some("still") | Some("idle") | Some("") => 0.0,
Some(_) => 1.0,
None => 0.0,
};
let snap = mqtt::state::VitalsSnapshot {
node_id: node_id.clone(),
timestamp_ms: (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64,
presence,
motion,
presence_score: if presence {
cls["confidence"].as_f64().unwrap_or(1.0)
} else {
0.0
},
breathing_rate_bpm: vit["breathing_rate_bpm"].as_f64(),
heartrate_bpm: vit["heart_rate_bpm"].as_f64(),
n_persons,
rssi_dbm: v["nodes"][0]["rssi_dbm"].as_f64(),
vital_confidence: cls["confidence"].as_f64().unwrap_or(0.0),
..Default::default()
};
let _ = vtx.send(snap);
}
});
tracing::info!("MQTT publisher started -> {host}:{port}");
}
Err(e) => tracing::error!("MQTT config invalid: {e}; publisher not started"),
}
}
#[cfg(not(feature = "mqtt"))]
tracing::warn!(
"--mqtt set but this binary was built without the `mqtt` feature; the publisher is a \
no-op. Use the official Docker image (built `--features mqtt`) or rebuild with \
`cargo build -p wifi-densepose-sensing-server --features mqtt`."
);
}
let state: SharedState = Arc::new(RwLock::new(AppStateInner {
latest_update: None,
rssi_history: VecDeque::new(),

View File

@ -63,7 +63,7 @@ impl MqttConfig {
/// `hostname()` via the `gethostname` crate if `mqtt_client_id` was
/// not supplied — we don't add a dep here, we let the publisher
/// supply the default lazily.
pub fn from_args(args: &crate::cli::Args) -> Self {
pub fn from_args(args: &crate::cli::MqttArgs) -> Self {
let password = std::env::var(&args.mqtt_password_env).ok();
let port = args.mqtt_port.unwrap_or(if args.mqtt_tls { 8883 } else { 1883 });
let tls = build_tls(args);
@ -135,7 +135,7 @@ impl MqttConfig {
}
}
fn build_tls(args: &crate::cli::Args) -> TlsConfig {
fn build_tls(args: &crate::cli::MqttArgs) -> TlsConfig {
if !args.mqtt_tls {
return TlsConfig::Off;
}
@ -186,8 +186,14 @@ mod tests {
use super::*;
use clap::Parser;
fn parse(args: &[&str]) -> crate::cli::Args {
crate::cli::Args::parse_from(std::iter::once("sensing-server").chain(args.iter().copied()))
fn parse(args: &[&str]) -> crate::cli::MqttArgs {
use clap::Parser;
#[derive(Parser)]
struct W {
#[command(flatten)]
m: crate::cli::MqttArgs,
}
W::parse_from(std::iter::once("sensing-server").chain(args.iter().copied())).m
}
#[test]