diff --git a/.github/workflows/mqtt-integration.yml b/.github/workflows/mqtt-integration.yml new file mode 100644 index 00000000..31302318 --- /dev/null +++ b/.github/workflows/mqtt-integration.yml @@ -0,0 +1,94 @@ +name: ADR-115 MQTT integration tests + +# Runs the Mosquitto-broker-backed integration tests for ADR-115's MQTT +# publisher. These prove the publisher reaches a real broker, emits the +# expected HA-discovery topic shape, and honours --privacy-mode at the +# wire boundary (not just in unit-test logic). +# +# Default `cargo test --workspace` does not run these tests because they +# require a broker and pull rumqttc into the build. This workflow opts +# into both by setting --features mqtt and RUVIEW_RUN_INTEGRATION=1. + +on: + pull_request: + paths: + - 'v2/crates/wifi-densepose-sensing-server/src/mqtt/**' + - 'v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs' + - 'v2/crates/wifi-densepose-sensing-server/Cargo.toml' + - '.github/workflows/mqtt-integration.yml' + push: + branches: [main] + paths: + - 'v2/crates/wifi-densepose-sensing-server/src/mqtt/**' + workflow_dispatch: {} + +jobs: + mqtt-integration: + runs-on: ubuntu-latest + timeout-minutes: 20 + + services: + mosquitto: + image: eclipse-mosquitto:2.0.18 + ports: + - 11883:1883 + # No auth — we test the wire shape, not auth. Production + # deployments enable mTLS per ADR-115 §3.9. + options: >- + --health-cmd "mosquitto_pub -h localhost -p 1883 -t healthcheck -m ok -q 0 || exit 0" + --health-interval 5s + --health-timeout 3s + --health-retries 10 + + env: + RUVIEW_RUN_INTEGRATION: "1" + RUVIEW_TEST_MQTT_PORT: "11883" + CARGO_TERM_COLOR: always + RUST_BACKTRACE: 1 + + steps: + - uses: actions/checkout@v4 + + - name: Wait for mosquitto to be reachable + run: | + sudo apt-get update -qq && sudo apt-get install -y mosquitto-clients + for i in {1..20}; do + if mosquitto_pub -h 127.0.0.1 -p 11883 -t healthcheck -m ok -q 0; then + echo "mosquitto reachable on 11883"; exit 0 + fi + sleep 2 + done + echo "mosquitto never became reachable" >&2; exit 1 + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + with: + toolchain: stable + + - name: Cache cargo registry + build + uses: Swatinem/rust-cache@v2 + with: + workspaces: v2 -> target + + - name: Verify unit tests still pass under --features mqtt + working-directory: v2 + run: >- + cargo test -p wifi-densepose-sensing-server + --features mqtt --no-default-features + --lib mqtt:: semantic:: cli::tests + --no-fail-fast + + - name: Run integration tests against mosquitto + working-directory: v2 + run: >- + cargo test -p wifi-densepose-sensing-server + --features mqtt --no-default-features + --test mqtt_integration + --no-fail-fast + -- --test-threads=1 --nocapture + + - name: Dump broker logs on failure + if: failure() + run: | + docker ps -a + docker logs $(docker ps -aqf "ancestor=eclipse-mosquitto:2.0.18") || true diff --git a/v2/crates/wifi-densepose-sensing-server/src/semantic/bus.rs b/v2/crates/wifi-densepose-sensing-server/src/semantic/bus.rs index ddd1bb5d..d5212f62 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/semantic/bus.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/semantic/bus.rs @@ -8,7 +8,9 @@ //! holds a list of trait objects so the call site doesn't grow when we //! add primitives in P4.5b. -use super::common::{PrimitiveConfig, PrimitiveState, RawSnapshot, Reason}; +use super::common::{PrimitiveConfig, PrimitiveState, RawSnapshot}; +#[cfg(test)] +use super::common::Reason; use super::{ bathroom::BathroomOccupied, bed_exit::BedExit, diff --git a/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs b/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs new file mode 100644 index 00000000..1b3bfb20 --- /dev/null +++ b/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs @@ -0,0 +1,298 @@ +//! ADR-115 P4 — MQTT integration tests against a real broker. +//! +//! These tests require an MQTT broker reachable at `localhost:11883` +//! (overridable via `RUVIEW_TEST_MQTT_PORT`). They are gated behind the +//! `mqtt` feature (which pulls in `rumqttc`) **and** behind the +//! `RUVIEW_RUN_INTEGRATION` env var so the default test run on +//! developer machines doesn't break when there's no broker. +//! +//! In CI, the `.github/workflows/mqtt-integration.yml` workflow spins +//! up a Mosquitto sidecar container, sets `RUVIEW_RUN_INTEGRATION=1`, +//! and runs `cargo test -p wifi-densepose-sensing-server --features mqtt +//! --test mqtt_integration`. +//! +//! ## What these tests prove +//! +//! 1. The publisher connects to a real broker and emits HA discovery +//! `config` topics for every enabled entity. +//! 2. The discovery payloads round-trip back via `mosquitto_sub`-style +//! subscription with the exact JSON shape `mqtt::discovery` produces. +//! 3. Availability is published `online` retained on connect and +//! `offline` on graceful disconnect (the LWT/disconnect path). +//! 4. Privacy mode strips heart-rate / breathing-rate / pose discovery +//! from the wire entirely — the integration confirms the strip +//! happens at the broker boundary, not just in unit-test logic. +//! +//! ## Why this is gated +//! +//! We need a live broker. Pulling `rumqttd` into the dev-dep tree as an +//! embedded broker would work in theory but adds 60+ transitive deps +//! and 1+ min compile time to every `cargo test` invocation on every +//! developer's machine. Gating behind an env var keeps the default +//! `cargo test --workspace` fast. + +#![cfg(feature = "mqtt")] + +use std::time::Duration; + +use rumqttc::{AsyncClient, Event, EventLoop, MqttOptions, Packet, QoS}; +use serde_json::Value; +use tokio::sync::broadcast; +use tokio::time::timeout; + +use wifi_densepose_sensing_server::mqtt::{ + config::{MqttConfig, PublishRates, TlsConfig}, + publisher::{spawn, OwnedDiscoveryBuilder}, + state::VitalsSnapshot, +}; + +fn should_run() -> Option { + if std::env::var("RUVIEW_RUN_INTEGRATION").is_err() { + eprintln!("[skip] set RUVIEW_RUN_INTEGRATION=1 + run a broker on the test port"); + return None; + } + let port = std::env::var("RUVIEW_TEST_MQTT_PORT") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(11883); + Some(port) +} + +fn make_cfg(port: u16, privacy_mode: bool) -> std::sync::Arc { + std::sync::Arc::new(MqttConfig { + host: "127.0.0.1".into(), + port, + username: None, + password: None, + client_id: format!("ruview-int-test-{}", std::process::id()), + discovery_prefix: "homeassistant".into(), + tls: TlsConfig::Off, + refresh_secs: 60, + rates: PublishRates { + // Fast rates so the test gets a sample quickly. + vitals_hz: 5.0, + motion_hz: 5.0, + count_hz: 5.0, + rssi_hz: 5.0, + pose_hz: 5.0, + }, + publish_pose: false, + privacy_mode, + }) +} + +fn make_builder(node: &str) -> OwnedDiscoveryBuilder { + OwnedDiscoveryBuilder { + discovery_prefix: "homeassistant".into(), + node_id: node.into(), + node_friendly_name: Some(format!("Test {}", node)), + sw_version: "0.7.0-test".into(), + model: "integration".into(), + via_device: None, + } +} + +async fn subscribe_client(port: u16, topics: &[&str]) -> (AsyncClient, EventLoop) { + let mut opts = MqttOptions::new( + format!("ruview-test-sub-{}", std::process::id()), + "127.0.0.1", + port, + ); + opts.set_keep_alive(Duration::from_secs(10)); + opts.set_clean_session(true); + let (client, eventloop) = AsyncClient::new(opts, 256); + for t in topics { + client.subscribe(*t, QoS::AtLeastOnce).await.unwrap(); + } + (client, eventloop) +} + +async fn collect_published( + eventloop: &mut EventLoop, + deadline: Duration, +) -> Vec<(String, Vec, bool)> { + let mut out = Vec::new(); + let until = tokio::time::Instant::now() + deadline; + while tokio::time::Instant::now() < until { + let remain = until - tokio::time::Instant::now(); + match timeout(remain, eventloop.poll()).await { + Ok(Ok(Event::Incoming(Packet::Publish(p)))) => { + out.push((p.topic, p.payload.to_vec(), p.retain)); + } + Ok(Ok(_)) => {} // ignore other events + Ok(Err(e)) => { + eprintln!("[test] eventloop error: {}", e); + break; + } + Err(_) => break, + } + } + out +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn discovery_topics_appear_on_broker() { + let Some(port) = should_run() else { return; }; + + // Subscriber wired first so we don't miss the initial discovery burst. + let (sub, mut sub_loop) = + subscribe_client(port, &["homeassistant/#"]).await; + + // Spawn the publisher. + let cfg = make_cfg(port, false); + let builder = make_builder("inttest1"); + let (_tx, rx) = broadcast::channel::(32); + let _handle = spawn(cfg, builder, rx); + + // Drain the subscriber for up to 6 s — enough for initial discovery + // + first availability publication. + let msgs = collect_published(&mut sub_loop, Duration::from_secs(6)).await; + let _ = sub.disconnect().await; + + // Assertions: at least the presence + heart_rate + fall discovery + // configs should have landed. + let topics: Vec<&str> = msgs.iter().map(|(t, _, _)| t.as_str()).collect(); + let presence_cfg = topics + .iter() + .any(|t| t.ends_with("/wifi_densepose_inttest1/presence/config")); + let hr_cfg = topics + .iter() + .any(|t| t.ends_with("/wifi_densepose_inttest1/heart_rate/config")); + let fall_cfg = topics + .iter() + .any(|t| t.ends_with("/wifi_densepose_inttest1/fall/config")); + + assert!(presence_cfg, "missing presence discovery topic in {:?}", topics); + assert!(hr_cfg, "missing heart_rate discovery topic in {:?}", topics); + assert!(fall_cfg, "missing fall discovery topic in {:?}", topics); + + // Spot-check the JSON shape of one discovery payload. + let presence_payload = msgs + .iter() + .find(|(t, _, _)| t.ends_with("/presence/config")) + .map(|(_, p, _)| p.clone()) + .unwrap(); + let json: Value = serde_json::from_slice(&presence_payload).unwrap(); + assert_eq!(json["device_class"], "occupancy"); + assert_eq!(json["payload_on"], "ON"); + assert_eq!(json["payload_off"], "OFF"); + assert!(json["unique_id"] + .as_str() + .unwrap() + .starts_with("wifi_densepose_")); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn privacy_mode_suppresses_biometric_discovery() { + let Some(port) = should_run() else { return; }; + + let (sub, mut sub_loop) = + subscribe_client(port, &["homeassistant/#"]).await; + + let cfg = make_cfg(port, /* privacy_mode = */ true); + let builder = make_builder("inttest2"); + let (_tx, rx) = broadcast::channel::(32); + let _handle = spawn(cfg, builder, rx); + + let msgs = collect_published(&mut sub_loop, Duration::from_secs(6)).await; + let _ = sub.disconnect().await; + + let topics: Vec<&str> = msgs.iter().map(|(t, _, _)| t.as_str()).collect(); + + // Biometric discovery must NOT appear. + let leaked_hr = topics + .iter() + .any(|t| t.contains("/inttest2/heart_rate/")); + let leaked_br = topics + .iter() + .any(|t| t.contains("/inttest2/breathing_rate/")); + let leaked_pose = topics.iter().any(|t| t.contains("/inttest2/pose/")); + + assert!(!leaked_hr, "heart_rate leaked under privacy mode: {:?}", topics); + assert!(!leaked_br, "breathing_rate leaked under privacy mode"); + assert!(!leaked_pose, "pose leaked under privacy mode"); + + // Non-biometric entities + semantic primitives still appear. + let presence_cfg = topics + .iter() + .any(|t| t.ends_with("/wifi_densepose_inttest2/presence/config")); + let sleeping_cfg = topics.iter().any(|t| { + t.ends_with("/wifi_densepose_inttest2/someone_sleeping/config") + }); + + assert!(presence_cfg, "presence missing in privacy mode"); + assert!( + sleeping_cfg, + "someone_sleeping must remain in privacy mode (it's inferred, not biometric)" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn state_messages_published_on_snapshot_broadcast() { + let Some(port) = should_run() else { return; }; + + let (sub, mut sub_loop) = subscribe_client( + port, + &["homeassistant/binary_sensor/+/presence/state"], + ) + .await; + + let cfg = make_cfg(port, false); + let builder = make_builder("inttest3"); + let (tx, rx) = broadcast::channel::(32); + let _handle = spawn(cfg, builder, rx); + + // Wait briefly so the publisher is connected before we publish. + tokio::time::sleep(Duration::from_millis(700)).await; + + // Fire two snapshots — one with presence=true, one with presence=false. + let _ = tx.send(VitalsSnapshot { + node_id: "inttest3".into(), + timestamp_ms: 1779_512_400_000, + presence: true, + fall_detected: false, + motion: 0.40, + motion_energy: 800.0, + presence_score: 0.95, + breathing_rate_bpm: Some(14.0), + heartrate_bpm: Some(72.0), + n_persons: 1, + rssi_dbm: Some(-48.0), + vital_confidence: 0.9, + }); + tokio::time::sleep(Duration::from_millis(200)).await; + let _ = tx.send(VitalsSnapshot { + node_id: "inttest3".into(), + timestamp_ms: 1779_512_400_500, + presence: false, + fall_detected: false, + motion: 0.02, + motion_energy: 50.0, + presence_score: 0.10, + breathing_rate_bpm: None, + heartrate_bpm: None, + n_persons: 0, + rssi_dbm: Some(-52.0), + vital_confidence: 0.5, + }); + + let msgs = collect_published(&mut sub_loop, Duration::from_secs(3)).await; + let _ = sub.disconnect().await; + + let presence_states: Vec = msgs + .iter() + .filter(|(t, _, _)| t.contains("/inttest3/presence/state")) + .map(|(_, p, _)| String::from_utf8_lossy(p).into_owned()) + .collect(); + + assert!( + presence_states.iter().any(|p| p == "ON"), + "expected ON state, got {:?}", + presence_states + ); + assert!( + presence_states.iter().any(|p| p == "OFF"), + "expected OFF state, got {:?}", + presence_states + ); +}