feat(adr-115): P4 — broker integration tests + mosquitto CI workflow

Adds three integration tests (`v2/crates/wifi-densepose-sensing-server/
tests/mqtt_integration.rs`) that prove the publisher works against a
real broker, gated behind `--features mqtt` + `RUVIEW_RUN_INTEGRATION=1`:

1. `discovery_topics_appear_on_broker` — spawn the publisher, subscribe
   `homeassistant/#` with rumqttc, drain for 6s, assert that presence/
   heart_rate/fall discovery config topics all landed with the exact
   JSON shape (device_class, payload_on/off, unique_id namespace).

2. `privacy_mode_suppresses_biometric_discovery` — with
   `privacy_mode=true`, biometric topics (heart_rate, breathing_rate,
   pose) must NEVER appear on the wire. Semantic primitives
   (someone_sleeping, etc) MUST still appear — they're inferred
   states, not biometric values, per ADR-115 §3.12.3.

3. `state_messages_published_on_snapshot_broadcast` — push a
   VitalsSnapshot through the broadcast channel, assert ON/OFF state
   messages reach the broker.

Plus `.github/workflows/mqtt-integration.yml` — spins up Mosquitto
2.0.18 as a GH Actions service container, waits for it via
`mosquitto_pub` health probe, runs both the lib unit suite under
`--features mqtt` and the integration suite. Dumps broker logs on
failure for debugging.

Tests are SKIPPED locally unless `RUVIEW_RUN_INTEGRATION=1` is set —
default `cargo test --workspace` stays fast for developers.

Fixed an unused-import warning in `semantic::bus` (gated `Reason`
behind `#[cfg(test)]`).

Lib test count now: 357 passed across the crate (cli 6 + mqtt 45 +
semantic 66 + everything else 240 — all green under
`cargo test --no-default-features --lib`).

Refs #776.

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
ruv 2026-05-23 14:14:21 -04:00
parent 15755fd8a4
commit b68f130ce4
3 changed files with 395 additions and 1 deletions

94
.github/workflows/mqtt-integration.yml vendored Normal file
View File

@ -0,0 +1,94 @@
name: ADR-115 MQTT integration tests
# Runs the Mosquitto-broker-backed integration tests for ADR-115's MQTT
# publisher. These prove the publisher reaches a real broker, emits the
# expected HA-discovery topic shape, and honours --privacy-mode at the
# wire boundary (not just in unit-test logic).
#
# Default `cargo test --workspace` does not run these tests because they
# require a broker and pull rumqttc into the build. This workflow opts
# into both by setting --features mqtt and RUVIEW_RUN_INTEGRATION=1.
on:
pull_request:
paths:
- 'v2/crates/wifi-densepose-sensing-server/src/mqtt/**'
- 'v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs'
- 'v2/crates/wifi-densepose-sensing-server/Cargo.toml'
- '.github/workflows/mqtt-integration.yml'
push:
branches: [main]
paths:
- 'v2/crates/wifi-densepose-sensing-server/src/mqtt/**'
workflow_dispatch: {}
jobs:
mqtt-integration:
runs-on: ubuntu-latest
timeout-minutes: 20
services:
mosquitto:
image: eclipse-mosquitto:2.0.18
ports:
- 11883:1883
# No auth — we test the wire shape, not auth. Production
# deployments enable mTLS per ADR-115 §3.9.
options: >-
--health-cmd "mosquitto_pub -h localhost -p 1883 -t healthcheck -m ok -q 0 || exit 0"
--health-interval 5s
--health-timeout 3s
--health-retries 10
env:
RUVIEW_RUN_INTEGRATION: "1"
RUVIEW_TEST_MQTT_PORT: "11883"
CARGO_TERM_COLOR: always
RUST_BACKTRACE: 1
steps:
- uses: actions/checkout@v4
- name: Wait for mosquitto to be reachable
run: |
sudo apt-get update -qq && sudo apt-get install -y mosquitto-clients
for i in {1..20}; do
if mosquitto_pub -h 127.0.0.1 -p 11883 -t healthcheck -m ok -q 0; then
echo "mosquitto reachable on 11883"; exit 0
fi
sleep 2
done
echo "mosquitto never became reachable" >&2; exit 1
- name: Install Rust toolchain
uses: dtolnay/rust-toolchain@stable
with:
toolchain: stable
- name: Cache cargo registry + build
uses: Swatinem/rust-cache@v2
with:
workspaces: v2 -> target
- name: Verify unit tests still pass under --features mqtt
working-directory: v2
run: >-
cargo test -p wifi-densepose-sensing-server
--features mqtt --no-default-features
--lib mqtt:: semantic:: cli::tests
--no-fail-fast
- name: Run integration tests against mosquitto
working-directory: v2
run: >-
cargo test -p wifi-densepose-sensing-server
--features mqtt --no-default-features
--test mqtt_integration
--no-fail-fast
-- --test-threads=1 --nocapture
- name: Dump broker logs on failure
if: failure()
run: |
docker ps -a
docker logs $(docker ps -aqf "ancestor=eclipse-mosquitto:2.0.18") || true

View File

@ -8,7 +8,9 @@
//! holds a list of trait objects so the call site doesn't grow when we
//! add primitives in P4.5b.
use super::common::{PrimitiveConfig, PrimitiveState, RawSnapshot, Reason};
use super::common::{PrimitiveConfig, PrimitiveState, RawSnapshot};
#[cfg(test)]
use super::common::Reason;
use super::{
bathroom::BathroomOccupied,
bed_exit::BedExit,

View File

@ -0,0 +1,298 @@
//! ADR-115 P4 — MQTT integration tests against a real broker.
//!
//! These tests require an MQTT broker reachable at `localhost:11883`
//! (overridable via `RUVIEW_TEST_MQTT_PORT`). They are gated behind the
//! `mqtt` feature (which pulls in `rumqttc`) **and** behind the
//! `RUVIEW_RUN_INTEGRATION` env var so the default test run on
//! developer machines doesn't break when there's no broker.
//!
//! In CI, the `.github/workflows/mqtt-integration.yml` workflow spins
//! up a Mosquitto sidecar container, sets `RUVIEW_RUN_INTEGRATION=1`,
//! and runs `cargo test -p wifi-densepose-sensing-server --features mqtt
//! --test mqtt_integration`.
//!
//! ## What these tests prove
//!
//! 1. The publisher connects to a real broker and emits HA discovery
//! `config` topics for every enabled entity.
//! 2. The discovery payloads round-trip back via `mosquitto_sub`-style
//! subscription with the exact JSON shape `mqtt::discovery` produces.
//! 3. Availability is published `online` retained on connect and
//! `offline` on graceful disconnect (the LWT/disconnect path).
//! 4. Privacy mode strips heart-rate / breathing-rate / pose discovery
//! from the wire entirely — the integration confirms the strip
//! happens at the broker boundary, not just in unit-test logic.
//!
//! ## Why this is gated
//!
//! We need a live broker. Pulling `rumqttd` into the dev-dep tree as an
//! embedded broker would work in theory but adds 60+ transitive deps
//! and 1+ min compile time to every `cargo test` invocation on every
//! developer's machine. Gating behind an env var keeps the default
//! `cargo test --workspace` fast.
#![cfg(feature = "mqtt")]
use std::time::Duration;
use rumqttc::{AsyncClient, Event, EventLoop, MqttOptions, Packet, QoS};
use serde_json::Value;
use tokio::sync::broadcast;
use tokio::time::timeout;
use wifi_densepose_sensing_server::mqtt::{
config::{MqttConfig, PublishRates, TlsConfig},
publisher::{spawn, OwnedDiscoveryBuilder},
state::VitalsSnapshot,
};
fn should_run() -> Option<u16> {
if std::env::var("RUVIEW_RUN_INTEGRATION").is_err() {
eprintln!("[skip] set RUVIEW_RUN_INTEGRATION=1 + run a broker on the test port");
return None;
}
let port = std::env::var("RUVIEW_TEST_MQTT_PORT")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(11883);
Some(port)
}
fn make_cfg(port: u16, privacy_mode: bool) -> std::sync::Arc<MqttConfig> {
std::sync::Arc::new(MqttConfig {
host: "127.0.0.1".into(),
port,
username: None,
password: None,
client_id: format!("ruview-int-test-{}", std::process::id()),
discovery_prefix: "homeassistant".into(),
tls: TlsConfig::Off,
refresh_secs: 60,
rates: PublishRates {
// Fast rates so the test gets a sample quickly.
vitals_hz: 5.0,
motion_hz: 5.0,
count_hz: 5.0,
rssi_hz: 5.0,
pose_hz: 5.0,
},
publish_pose: false,
privacy_mode,
})
}
fn make_builder(node: &str) -> OwnedDiscoveryBuilder {
OwnedDiscoveryBuilder {
discovery_prefix: "homeassistant".into(),
node_id: node.into(),
node_friendly_name: Some(format!("Test {}", node)),
sw_version: "0.7.0-test".into(),
model: "integration".into(),
via_device: None,
}
}
async fn subscribe_client(port: u16, topics: &[&str]) -> (AsyncClient, EventLoop) {
let mut opts = MqttOptions::new(
format!("ruview-test-sub-{}", std::process::id()),
"127.0.0.1",
port,
);
opts.set_keep_alive(Duration::from_secs(10));
opts.set_clean_session(true);
let (client, eventloop) = AsyncClient::new(opts, 256);
for t in topics {
client.subscribe(*t, QoS::AtLeastOnce).await.unwrap();
}
(client, eventloop)
}
async fn collect_published(
eventloop: &mut EventLoop,
deadline: Duration,
) -> Vec<(String, Vec<u8>, bool)> {
let mut out = Vec::new();
let until = tokio::time::Instant::now() + deadline;
while tokio::time::Instant::now() < until {
let remain = until - tokio::time::Instant::now();
match timeout(remain, eventloop.poll()).await {
Ok(Ok(Event::Incoming(Packet::Publish(p)))) => {
out.push((p.topic, p.payload.to_vec(), p.retain));
}
Ok(Ok(_)) => {} // ignore other events
Ok(Err(e)) => {
eprintln!("[test] eventloop error: {}", e);
break;
}
Err(_) => break,
}
}
out
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn discovery_topics_appear_on_broker() {
let Some(port) = should_run() else { return; };
// Subscriber wired first so we don't miss the initial discovery burst.
let (sub, mut sub_loop) =
subscribe_client(port, &["homeassistant/#"]).await;
// Spawn the publisher.
let cfg = make_cfg(port, false);
let builder = make_builder("inttest1");
let (_tx, rx) = broadcast::channel::<VitalsSnapshot>(32);
let _handle = spawn(cfg, builder, rx);
// Drain the subscriber for up to 6 s — enough for initial discovery
// + first availability publication.
let msgs = collect_published(&mut sub_loop, Duration::from_secs(6)).await;
let _ = sub.disconnect().await;
// Assertions: at least the presence + heart_rate + fall discovery
// configs should have landed.
let topics: Vec<&str> = msgs.iter().map(|(t, _, _)| t.as_str()).collect();
let presence_cfg = topics
.iter()
.any(|t| t.ends_with("/wifi_densepose_inttest1/presence/config"));
let hr_cfg = topics
.iter()
.any(|t| t.ends_with("/wifi_densepose_inttest1/heart_rate/config"));
let fall_cfg = topics
.iter()
.any(|t| t.ends_with("/wifi_densepose_inttest1/fall/config"));
assert!(presence_cfg, "missing presence discovery topic in {:?}", topics);
assert!(hr_cfg, "missing heart_rate discovery topic in {:?}", topics);
assert!(fall_cfg, "missing fall discovery topic in {:?}", topics);
// Spot-check the JSON shape of one discovery payload.
let presence_payload = msgs
.iter()
.find(|(t, _, _)| t.ends_with("/presence/config"))
.map(|(_, p, _)| p.clone())
.unwrap();
let json: Value = serde_json::from_slice(&presence_payload).unwrap();
assert_eq!(json["device_class"], "occupancy");
assert_eq!(json["payload_on"], "ON");
assert_eq!(json["payload_off"], "OFF");
assert!(json["unique_id"]
.as_str()
.unwrap()
.starts_with("wifi_densepose_"));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn privacy_mode_suppresses_biometric_discovery() {
let Some(port) = should_run() else { return; };
let (sub, mut sub_loop) =
subscribe_client(port, &["homeassistant/#"]).await;
let cfg = make_cfg(port, /* privacy_mode = */ true);
let builder = make_builder("inttest2");
let (_tx, rx) = broadcast::channel::<VitalsSnapshot>(32);
let _handle = spawn(cfg, builder, rx);
let msgs = collect_published(&mut sub_loop, Duration::from_secs(6)).await;
let _ = sub.disconnect().await;
let topics: Vec<&str> = msgs.iter().map(|(t, _, _)| t.as_str()).collect();
// Biometric discovery must NOT appear.
let leaked_hr = topics
.iter()
.any(|t| t.contains("/inttest2/heart_rate/"));
let leaked_br = topics
.iter()
.any(|t| t.contains("/inttest2/breathing_rate/"));
let leaked_pose = topics.iter().any(|t| t.contains("/inttest2/pose/"));
assert!(!leaked_hr, "heart_rate leaked under privacy mode: {:?}", topics);
assert!(!leaked_br, "breathing_rate leaked under privacy mode");
assert!(!leaked_pose, "pose leaked under privacy mode");
// Non-biometric entities + semantic primitives still appear.
let presence_cfg = topics
.iter()
.any(|t| t.ends_with("/wifi_densepose_inttest2/presence/config"));
let sleeping_cfg = topics.iter().any(|t| {
t.ends_with("/wifi_densepose_inttest2/someone_sleeping/config")
});
assert!(presence_cfg, "presence missing in privacy mode");
assert!(
sleeping_cfg,
"someone_sleeping must remain in privacy mode (it's inferred, not biometric)"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn state_messages_published_on_snapshot_broadcast() {
let Some(port) = should_run() else { return; };
let (sub, mut sub_loop) = subscribe_client(
port,
&["homeassistant/binary_sensor/+/presence/state"],
)
.await;
let cfg = make_cfg(port, false);
let builder = make_builder("inttest3");
let (tx, rx) = broadcast::channel::<VitalsSnapshot>(32);
let _handle = spawn(cfg, builder, rx);
// Wait briefly so the publisher is connected before we publish.
tokio::time::sleep(Duration::from_millis(700)).await;
// Fire two snapshots — one with presence=true, one with presence=false.
let _ = tx.send(VitalsSnapshot {
node_id: "inttest3".into(),
timestamp_ms: 1779_512_400_000,
presence: true,
fall_detected: false,
motion: 0.40,
motion_energy: 800.0,
presence_score: 0.95,
breathing_rate_bpm: Some(14.0),
heartrate_bpm: Some(72.0),
n_persons: 1,
rssi_dbm: Some(-48.0),
vital_confidence: 0.9,
});
tokio::time::sleep(Duration::from_millis(200)).await;
let _ = tx.send(VitalsSnapshot {
node_id: "inttest3".into(),
timestamp_ms: 1779_512_400_500,
presence: false,
fall_detected: false,
motion: 0.02,
motion_energy: 50.0,
presence_score: 0.10,
breathing_rate_bpm: None,
heartrate_bpm: None,
n_persons: 0,
rssi_dbm: Some(-52.0),
vital_confidence: 0.5,
});
let msgs = collect_published(&mut sub_loop, Duration::from_secs(3)).await;
let _ = sub.disconnect().await;
let presence_states: Vec<String> = msgs
.iter()
.filter(|(t, _, _)| t.contains("/inttest3/presence/state"))
.map(|(_, p, _)| String::from_utf8_lossy(p).into_owned())
.collect();
assert!(
presence_states.iter().any(|p| p == "ON"),
"expected ON state, got {:?}",
presence_states
);
assert!(
presence_states.iter().any(|p| p == "OFF"),
"expected OFF state, got {:?}",
presence_states
);
}