diff --git a/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs b/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs index e625efad..f71bb3d6 100644 --- a/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs +++ b/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs @@ -171,12 +171,28 @@ async fn discovery_topics_appear_on_broker() { // Spawn the publisher. let cfg = make_cfg(port, false, "discovery"); let builder = make_builder("inttest1"); - let (_tx, rx) = broadcast::channel::(32); + let (tx, rx) = broadcast::channel::(32); let _handle = spawn(cfg, builder, rx); + // #898: discovery is now published per-node the first time a snapshot for + // that node_id arrives (not eagerly at startup). Drive snapshots for + // "inttest1" throughout the window so its device's discovery lands — same + // pattern as state_messages_published_on_snapshot_broadcast. + let tx_bg = tx.clone(); + let drive = tokio::spawn(async move { + for _ in 0..60 { + let _ = tx_bg.send(VitalsSnapshot { + node_id: "inttest1".into(), + ..Default::default() + }); + tokio::time::sleep(Duration::from_millis(200)).await; + } + }); + // Drain the subscriber for up to 6 s — enough for initial discovery // + first availability publication. let msgs = collect_published(&mut sub_loop, Duration::from_secs(6)).await; + drive.abort(); let _ = sub.disconnect().await; // Assertions: at least the presence + heart_rate + fall discovery @@ -221,10 +237,23 @@ async fn privacy_mode_suppresses_biometric_discovery() { let cfg = make_cfg(port, /* privacy_mode = */ true, "privacy"); let builder = make_builder("inttest2"); - let (_tx, rx) = broadcast::channel::(32); + let (tx, rx) = broadcast::channel::(32); let _handle = spawn(cfg, builder, rx); + // #898: per-node discovery is triggered by a snapshot for that node_id. + let tx_bg = tx.clone(); + let drive = tokio::spawn(async move { + for _ in 0..60 { + let _ = tx_bg.send(VitalsSnapshot { + node_id: "inttest2".into(), + ..Default::default() + }); + tokio::time::sleep(Duration::from_millis(200)).await; + } + }); + let msgs = collect_published(&mut sub_loop, Duration::from_secs(6)).await; + drive.abort(); let _ = sub.disconnect().await; let topics: Vec<&str> = msgs.iter().map(|(t, _, _)| t.as_str()).collect();