diff --git a/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs b/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs index 1b3bfb20..118d3e4f 100644 --- a/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs +++ b/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs @@ -242,41 +242,38 @@ async fn state_messages_published_on_snapshot_broadcast() { let (tx, rx) = broadcast::channel::(32); let _handle = spawn(cfg, builder, rx); - // Wait briefly so the publisher is connected before we publish. - tokio::time::sleep(Duration::from_millis(700)).await; + // Wait long enough for the publisher to: + // (a) connect to the broker (rumqttc connects on first publish) + // (b) complete the initial 21+ QoS-1 discovery publishes + // (c) reach its select! and begin draining state_rx + // 3s is well past what we measured locally for the full ramp-up + // (median ~800ms on a fast loopback; doubled for CI safety). + tokio::time::sleep(Duration::from_secs(3)).await; - // Fire two snapshots — one with presence=true, one with presence=false. - let _ = tx.send(VitalsSnapshot { - node_id: "inttest3".into(), - timestamp_ms: 1779_512_400_000, - presence: true, - fall_detected: false, - motion: 0.40, - motion_energy: 800.0, - presence_score: 0.95, - breathing_rate_bpm: Some(14.0), - heartrate_bpm: Some(72.0), - n_persons: 1, - rssi_dbm: Some(-48.0), - vital_confidence: 0.9, - }); - tokio::time::sleep(Duration::from_millis(200)).await; - let _ = tx.send(VitalsSnapshot { - node_id: "inttest3".into(), - timestamp_ms: 1779_512_400_500, - presence: false, - fall_detected: false, - motion: 0.02, - motion_energy: 50.0, - presence_score: 0.10, - breathing_rate_bpm: None, - heartrate_bpm: None, - n_persons: 0, - rssi_dbm: Some(-52.0), - vital_confidence: 0.5, - }); + // Fire snapshots repeatedly so a single dropped broadcast doesn't + // tank the test. Each tx.send is fanout to ALL receivers, so the + // publisher receives every one. + for i in 0..6 { + let _ = tx.send(VitalsSnapshot { + node_id: "inttest3".into(), + timestamp_ms: 1779_512_400_000 + (i as i64) * 100, + presence: i % 2 == 0, + fall_detected: false, + motion: if i % 2 == 0 { 0.40 } else { 0.02 }, + motion_energy: 800.0, + presence_score: if i % 2 == 0 { 0.95 } else { 0.10 }, + breathing_rate_bpm: Some(14.0), + heartrate_bpm: Some(72.0), + n_persons: if i % 2 == 0 { 1 } else { 0 }, + rssi_dbm: Some(-48.0), + vital_confidence: 0.9, + }); + tokio::time::sleep(Duration::from_millis(200)).await; + } - let msgs = collect_published(&mut sub_loop, Duration::from_secs(3)).await; + // Capture window — generous so we don't race the publisher's + // change-detection on the presence binary_sensor. + let msgs = collect_published(&mut sub_loop, Duration::from_secs(8)).await; let _ = sub.disconnect().await; let presence_states: Vec = msgs