299 lines
10 KiB
Rust
299 lines
10 KiB
Rust
//! ADR-115 P4 — MQTT integration tests against a real broker.
|
|
//!
|
|
//! These tests require an MQTT broker reachable at `localhost:11883`
|
|
//! (overridable via `RUVIEW_TEST_MQTT_PORT`). They are gated behind the
|
|
//! `mqtt` feature (which pulls in `rumqttc`) **and** behind the
|
|
//! `RUVIEW_RUN_INTEGRATION` env var so the default test run on
|
|
//! developer machines doesn't break when there's no broker.
|
|
//!
|
|
//! In CI, the `.github/workflows/mqtt-integration.yml` workflow spins
|
|
//! up a Mosquitto sidecar container, sets `RUVIEW_RUN_INTEGRATION=1`,
|
|
//! and runs `cargo test -p wifi-densepose-sensing-server --features mqtt
|
|
//! --test mqtt_integration`.
|
|
//!
|
|
//! ## What these tests prove
|
|
//!
|
|
//! 1. The publisher connects to a real broker and emits HA discovery
|
|
//! `config` topics for every enabled entity.
|
|
//! 2. The discovery payloads round-trip back via `mosquitto_sub`-style
|
|
//! subscription with the exact JSON shape `mqtt::discovery` produces.
|
|
//! 3. Availability is published `online` retained on connect and
|
|
//! `offline` on graceful disconnect (the LWT/disconnect path).
|
|
//! 4. Privacy mode strips heart-rate / breathing-rate / pose discovery
|
|
//! from the wire entirely — the integration confirms the strip
|
|
//! happens at the broker boundary, not just in unit-test logic.
|
|
//!
|
|
//! ## Why this is gated
|
|
//!
|
|
//! We need a live broker. Pulling `rumqttd` into the dev-dep tree as an
|
|
//! embedded broker would work in theory but adds 60+ transitive deps
|
|
//! and 1+ min compile time to every `cargo test` invocation on every
|
|
//! developer's machine. Gating behind an env var keeps the default
|
|
//! `cargo test --workspace` fast.
|
|
|
|
#![cfg(feature = "mqtt")]
|
|
|
|
use std::time::Duration;
|
|
|
|
use rumqttc::{AsyncClient, Event, EventLoop, MqttOptions, Packet, QoS};
|
|
use serde_json::Value;
|
|
use tokio::sync::broadcast;
|
|
use tokio::time::timeout;
|
|
|
|
use wifi_densepose_sensing_server::mqtt::{
|
|
config::{MqttConfig, PublishRates, TlsConfig},
|
|
publisher::{spawn, OwnedDiscoveryBuilder},
|
|
state::VitalsSnapshot,
|
|
};
|
|
|
|
fn should_run() -> Option<u16> {
|
|
if std::env::var("RUVIEW_RUN_INTEGRATION").is_err() {
|
|
eprintln!("[skip] set RUVIEW_RUN_INTEGRATION=1 + run a broker on the test port");
|
|
return None;
|
|
}
|
|
let port = std::env::var("RUVIEW_TEST_MQTT_PORT")
|
|
.ok()
|
|
.and_then(|s| s.parse().ok())
|
|
.unwrap_or(11883);
|
|
Some(port)
|
|
}
|
|
|
|
fn make_cfg(port: u16, privacy_mode: bool) -> std::sync::Arc<MqttConfig> {
|
|
std::sync::Arc::new(MqttConfig {
|
|
host: "127.0.0.1".into(),
|
|
port,
|
|
username: None,
|
|
password: None,
|
|
client_id: format!("ruview-int-test-{}", std::process::id()),
|
|
discovery_prefix: "homeassistant".into(),
|
|
tls: TlsConfig::Off,
|
|
refresh_secs: 60,
|
|
rates: PublishRates {
|
|
// Fast rates so the test gets a sample quickly.
|
|
vitals_hz: 5.0,
|
|
motion_hz: 5.0,
|
|
count_hz: 5.0,
|
|
rssi_hz: 5.0,
|
|
pose_hz: 5.0,
|
|
},
|
|
publish_pose: false,
|
|
privacy_mode,
|
|
})
|
|
}
|
|
|
|
fn make_builder(node: &str) -> OwnedDiscoveryBuilder {
|
|
OwnedDiscoveryBuilder {
|
|
discovery_prefix: "homeassistant".into(),
|
|
node_id: node.into(),
|
|
node_friendly_name: Some(format!("Test {}", node)),
|
|
sw_version: "0.7.0-test".into(),
|
|
model: "integration".into(),
|
|
via_device: None,
|
|
}
|
|
}
|
|
|
|
async fn subscribe_client(port: u16, topics: &[&str]) -> (AsyncClient, EventLoop) {
|
|
let mut opts = MqttOptions::new(
|
|
format!("ruview-test-sub-{}", std::process::id()),
|
|
"127.0.0.1",
|
|
port,
|
|
);
|
|
opts.set_keep_alive(Duration::from_secs(10));
|
|
opts.set_clean_session(true);
|
|
let (client, eventloop) = AsyncClient::new(opts, 256);
|
|
for t in topics {
|
|
client.subscribe(*t, QoS::AtLeastOnce).await.unwrap();
|
|
}
|
|
(client, eventloop)
|
|
}
|
|
|
|
async fn collect_published(
|
|
eventloop: &mut EventLoop,
|
|
deadline: Duration,
|
|
) -> Vec<(String, Vec<u8>, bool)> {
|
|
let mut out = Vec::new();
|
|
let until = tokio::time::Instant::now() + deadline;
|
|
while tokio::time::Instant::now() < until {
|
|
let remain = until - tokio::time::Instant::now();
|
|
match timeout(remain, eventloop.poll()).await {
|
|
Ok(Ok(Event::Incoming(Packet::Publish(p)))) => {
|
|
out.push((p.topic, p.payload.to_vec(), p.retain));
|
|
}
|
|
Ok(Ok(_)) => {} // ignore other events
|
|
Ok(Err(e)) => {
|
|
eprintln!("[test] eventloop error: {}", e);
|
|
break;
|
|
}
|
|
Err(_) => break,
|
|
}
|
|
}
|
|
out
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn discovery_topics_appear_on_broker() {
|
|
let Some(port) = should_run() else { return; };
|
|
|
|
// Subscriber wired first so we don't miss the initial discovery burst.
|
|
let (sub, mut sub_loop) =
|
|
subscribe_client(port, &["homeassistant/#"]).await;
|
|
|
|
// Spawn the publisher.
|
|
let cfg = make_cfg(port, false);
|
|
let builder = make_builder("inttest1");
|
|
let (_tx, rx) = broadcast::channel::<VitalsSnapshot>(32);
|
|
let _handle = spawn(cfg, builder, rx);
|
|
|
|
// Drain the subscriber for up to 6 s — enough for initial discovery
|
|
// + first availability publication.
|
|
let msgs = collect_published(&mut sub_loop, Duration::from_secs(6)).await;
|
|
let _ = sub.disconnect().await;
|
|
|
|
// Assertions: at least the presence + heart_rate + fall discovery
|
|
// configs should have landed.
|
|
let topics: Vec<&str> = msgs.iter().map(|(t, _, _)| t.as_str()).collect();
|
|
let presence_cfg = topics
|
|
.iter()
|
|
.any(|t| t.ends_with("/wifi_densepose_inttest1/presence/config"));
|
|
let hr_cfg = topics
|
|
.iter()
|
|
.any(|t| t.ends_with("/wifi_densepose_inttest1/heart_rate/config"));
|
|
let fall_cfg = topics
|
|
.iter()
|
|
.any(|t| t.ends_with("/wifi_densepose_inttest1/fall/config"));
|
|
|
|
assert!(presence_cfg, "missing presence discovery topic in {:?}", topics);
|
|
assert!(hr_cfg, "missing heart_rate discovery topic in {:?}", topics);
|
|
assert!(fall_cfg, "missing fall discovery topic in {:?}", topics);
|
|
|
|
// Spot-check the JSON shape of one discovery payload.
|
|
let presence_payload = msgs
|
|
.iter()
|
|
.find(|(t, _, _)| t.ends_with("/presence/config"))
|
|
.map(|(_, p, _)| p.clone())
|
|
.unwrap();
|
|
let json: Value = serde_json::from_slice(&presence_payload).unwrap();
|
|
assert_eq!(json["device_class"], "occupancy");
|
|
assert_eq!(json["payload_on"], "ON");
|
|
assert_eq!(json["payload_off"], "OFF");
|
|
assert!(json["unique_id"]
|
|
.as_str()
|
|
.unwrap()
|
|
.starts_with("wifi_densepose_"));
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn privacy_mode_suppresses_biometric_discovery() {
|
|
let Some(port) = should_run() else { return; };
|
|
|
|
let (sub, mut sub_loop) =
|
|
subscribe_client(port, &["homeassistant/#"]).await;
|
|
|
|
let cfg = make_cfg(port, /* privacy_mode = */ true);
|
|
let builder = make_builder("inttest2");
|
|
let (_tx, rx) = broadcast::channel::<VitalsSnapshot>(32);
|
|
let _handle = spawn(cfg, builder, rx);
|
|
|
|
let msgs = collect_published(&mut sub_loop, Duration::from_secs(6)).await;
|
|
let _ = sub.disconnect().await;
|
|
|
|
let topics: Vec<&str> = msgs.iter().map(|(t, _, _)| t.as_str()).collect();
|
|
|
|
// Biometric discovery must NOT appear.
|
|
let leaked_hr = topics
|
|
.iter()
|
|
.any(|t| t.contains("/inttest2/heart_rate/"));
|
|
let leaked_br = topics
|
|
.iter()
|
|
.any(|t| t.contains("/inttest2/breathing_rate/"));
|
|
let leaked_pose = topics.iter().any(|t| t.contains("/inttest2/pose/"));
|
|
|
|
assert!(!leaked_hr, "heart_rate leaked under privacy mode: {:?}", topics);
|
|
assert!(!leaked_br, "breathing_rate leaked under privacy mode");
|
|
assert!(!leaked_pose, "pose leaked under privacy mode");
|
|
|
|
// Non-biometric entities + semantic primitives still appear.
|
|
let presence_cfg = topics
|
|
.iter()
|
|
.any(|t| t.ends_with("/wifi_densepose_inttest2/presence/config"));
|
|
let sleeping_cfg = topics.iter().any(|t| {
|
|
t.ends_with("/wifi_densepose_inttest2/someone_sleeping/config")
|
|
});
|
|
|
|
assert!(presence_cfg, "presence missing in privacy mode");
|
|
assert!(
|
|
sleeping_cfg,
|
|
"someone_sleeping must remain in privacy mode (it's inferred, not biometric)"
|
|
);
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn state_messages_published_on_snapshot_broadcast() {
|
|
let Some(port) = should_run() else { return; };
|
|
|
|
let (sub, mut sub_loop) = subscribe_client(
|
|
port,
|
|
&["homeassistant/binary_sensor/+/presence/state"],
|
|
)
|
|
.await;
|
|
|
|
let cfg = make_cfg(port, false);
|
|
let builder = make_builder("inttest3");
|
|
let (tx, rx) = broadcast::channel::<VitalsSnapshot>(32);
|
|
let _handle = spawn(cfg, builder, rx);
|
|
|
|
// Wait briefly so the publisher is connected before we publish.
|
|
tokio::time::sleep(Duration::from_millis(700)).await;
|
|
|
|
// Fire two snapshots — one with presence=true, one with presence=false.
|
|
let _ = tx.send(VitalsSnapshot {
|
|
node_id: "inttest3".into(),
|
|
timestamp_ms: 1779_512_400_000,
|
|
presence: true,
|
|
fall_detected: false,
|
|
motion: 0.40,
|
|
motion_energy: 800.0,
|
|
presence_score: 0.95,
|
|
breathing_rate_bpm: Some(14.0),
|
|
heartrate_bpm: Some(72.0),
|
|
n_persons: 1,
|
|
rssi_dbm: Some(-48.0),
|
|
vital_confidence: 0.9,
|
|
});
|
|
tokio::time::sleep(Duration::from_millis(200)).await;
|
|
let _ = tx.send(VitalsSnapshot {
|
|
node_id: "inttest3".into(),
|
|
timestamp_ms: 1779_512_400_500,
|
|
presence: false,
|
|
fall_detected: false,
|
|
motion: 0.02,
|
|
motion_energy: 50.0,
|
|
presence_score: 0.10,
|
|
breathing_rate_bpm: None,
|
|
heartrate_bpm: None,
|
|
n_persons: 0,
|
|
rssi_dbm: Some(-52.0),
|
|
vital_confidence: 0.5,
|
|
});
|
|
|
|
let msgs = collect_published(&mut sub_loop, Duration::from_secs(3)).await;
|
|
let _ = sub.disconnect().await;
|
|
|
|
let presence_states: Vec<String> = msgs
|
|
.iter()
|
|
.filter(|(t, _, _)| t.contains("/inttest3/presence/state"))
|
|
.map(|(_, p, _)| String::from_utf8_lossy(p).into_owned())
|
|
.collect();
|
|
|
|
assert!(
|
|
presence_states.iter().any(|p| p == "ON"),
|
|
"expected ON state, got {:?}",
|
|
presence_states
|
|
);
|
|
assert!(
|
|
presence_states.iter().any(|p| p == "OFF"),
|
|
"expected OFF state, got {:?}",
|
|
presence_states
|
|
);
|
|
}
|