From 27edf153dcb33c6e99abee3d461f33e8a7fcb252 Mon Sep 17 00:00:00 2001 From: ruv Date: Tue, 2 Jun 2026 10:29:17 +0200 Subject: [PATCH] =?UTF-8?q?test(mqtt):=20drive=20per-node=20snapshots=20in?= =?UTF-8?q?=20discovery=20integration=20tests=20=E2=80=94=20#898?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After the per-node discovery change, discovery configs are published the first time a snapshot for a node_id arrives (not eagerly at startup). The two discovery integration tests (discovery_topics_appear_on_broker, privacy_mode_suppresses_biometric_discovery) spawned the publisher with an empty broadcast channel and never sent a snapshot, so they collected [] and failed ("missing presence discovery topic in []"). Drive snapshots for the test node_id throughout the capture window (same pattern as state_messages_published_on_snapshot_broadcast) so the per-node device's discovery lands. Verified against a local mosquitto: 3 passed. --- .../tests/mqtt_integration.rs | 33 +++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs b/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs index e625efad..f71bb3d6 100644 --- a/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs +++ b/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs @@ -171,12 +171,28 @@ async fn discovery_topics_appear_on_broker() { // Spawn the publisher. let cfg = make_cfg(port, false, "discovery"); let builder = make_builder("inttest1"); - let (_tx, rx) = broadcast::channel::(32); + let (tx, rx) = broadcast::channel::(32); let _handle = spawn(cfg, builder, rx); + // #898: discovery is now published per-node the first time a snapshot for + // that node_id arrives (not eagerly at startup). Drive snapshots for + // "inttest1" throughout the window so its device's discovery lands — same + // pattern as state_messages_published_on_snapshot_broadcast. + let tx_bg = tx.clone(); + let drive = tokio::spawn(async move { + for _ in 0..60 { + let _ = tx_bg.send(VitalsSnapshot { + node_id: "inttest1".into(), + ..Default::default() + }); + tokio::time::sleep(Duration::from_millis(200)).await; + } + }); + // Drain the subscriber for up to 6 s — enough for initial discovery // + first availability publication. let msgs = collect_published(&mut sub_loop, Duration::from_secs(6)).await; + drive.abort(); let _ = sub.disconnect().await; // Assertions: at least the presence + heart_rate + fall discovery @@ -221,10 +237,23 @@ async fn privacy_mode_suppresses_biometric_discovery() { let cfg = make_cfg(port, /* privacy_mode = */ true, "privacy"); let builder = make_builder("inttest2"); - let (_tx, rx) = broadcast::channel::(32); + let (tx, rx) = broadcast::channel::(32); let _handle = spawn(cfg, builder, rx); + // #898: per-node discovery is triggered by a snapshot for that node_id. + let tx_bg = tx.clone(); + let drive = tokio::spawn(async move { + for _ in 0..60 { + let _ = tx_bg.send(VitalsSnapshot { + node_id: "inttest2".into(), + ..Default::default() + }); + tokio::time::sleep(Duration::from_millis(200)).await; + } + }); + let msgs = collect_published(&mut sub_loop, Duration::from_secs(6)).await; + drive.abort(); let _ = sub.disconnect().await; let topics: Vec<&str> = msgs.iter().map(|(t, _, _)| t.as_str()).collect();