diff --git a/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs b/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs index 50975617..4edb8a7c 100644 --- a/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs +++ b/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs @@ -274,38 +274,40 @@ async fn state_messages_published_on_snapshot_broadcast() { let (tx, rx) = broadcast::channel::(32); let _handle = spawn(cfg, builder, rx); - // Wait long enough for the publisher to: - // (a) connect to the broker (rumqttc connects on first publish) - // (b) complete the initial 21+ QoS-1 discovery publishes - // (c) reach its select! and begin draining state_rx - // 3s is well past what we measured locally for the full ramp-up - // (median ~800ms on a fast loopback; doubled for CI safety). - tokio::time::sleep(Duration::from_secs(3)).await; + // Iter 46 — instead of front-loading 6 snapshots and hoping the + // publisher's startup beats them, drive snapshots in a background + // task THROUGHOUT the capture window. CI runners can be slow to + // boot the publisher (mosquitto sidecar + cold cargo cache + slow + // QoS-1 discovery publishes), so a "publisher must be ready by + // t=3s" assumption is fragile. Steady-state ON/OFF traffic for the + // full 14 s window guarantees both states appear in the capture + // even if the first 3-5 s of publishes are missed. + let tx_bg = tx.clone(); + let drive = tokio::spawn(async move { + // Brief warm-up before first publish. + tokio::time::sleep(Duration::from_secs(1)).await; + for i in 0..40 { + let _ = tx_bg.send(VitalsSnapshot { + node_id: "inttest3".into(), + timestamp_ms: 1779_512_400_000 + (i as i64) * 300, + presence: i % 2 == 0, + fall_detected: false, + motion: if i % 2 == 0 { 0.40 } else { 0.02 }, + motion_energy: 800.0, + presence_score: if i % 2 == 0 { 0.95 } else { 0.10 }, + breathing_rate_bpm: Some(14.0), + heartrate_bpm: Some(72.0), + n_persons: if i % 2 == 0 { 1 } else { 0 }, + rssi_dbm: Some(-48.0), + vital_confidence: 0.9, + }); + tokio::time::sleep(Duration::from_millis(300)).await; + } + }); - // Fire snapshots repeatedly so a single dropped broadcast doesn't - // tank the test. Each tx.send is fanout to ALL receivers, so the - // publisher receives every one. - for i in 0..6 { - let _ = tx.send(VitalsSnapshot { - node_id: "inttest3".into(), - timestamp_ms: 1779_512_400_000 + (i as i64) * 100, - presence: i % 2 == 0, - fall_detected: false, - motion: if i % 2 == 0 { 0.40 } else { 0.02 }, - motion_energy: 800.0, - presence_score: if i % 2 == 0 { 0.95 } else { 0.10 }, - breathing_rate_bpm: Some(14.0), - heartrate_bpm: Some(72.0), - n_persons: if i % 2 == 0 { 1 } else { 0 }, - rssi_dbm: Some(-48.0), - vital_confidence: 0.9, - }); - tokio::time::sleep(Duration::from_millis(200)).await; - } - - // Capture window — generous so we don't race the publisher's - // change-detection on the presence binary_sensor. - let msgs = collect_published(&mut sub_loop, Duration::from_secs(8)).await; + // 14 s window covers warm-up + 12 s of steady-state ON/OFF traffic. + let msgs = collect_published(&mut sub_loop, Duration::from_secs(14)).await; + drive.abort(); let _ = sub.disconnect().await; // Diagnostic: dump every captured topic so we can see what (if