From 5ed8e34510da48ec08f024f9022362ea6887d09f Mon Sep 17 00:00:00 2001 From: ruv Date: Sat, 23 May 2026 15:33:41 -0400 Subject: [PATCH] =?UTF-8?q?fix(adr-115/test):=20state=5Fmessages=20integra?= =?UTF-8?q?tion=20test=20=E2=80=94=20race=20on=20publisher=20startup?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit state_messages_published_on_snapshot_broadcast was failing under CI (0/3 → 2/3 passes; only this one red). Root cause: the test waited only 700ms after spawn(publisher) before sending the first VitalsSnapshot through the broadcast channel, and used a 3s capture window after a 200ms inter-snapshot delay. What's actually happening on the wire during those 700ms: 1. rumqttc::AsyncClient::new() returns immediately (connection is lazy — happens on first publish) 2. publisher::run() awaits publish_all_discovery() which issues 21+ QoS-1 publishes on the discovery prefix. Each is an ack-waited round-trip — median ~800ms total on local loopback, easily >2s on a fresh GH Actions runner with cold rustls. 3. After discovery, the run loop reaches its tokio::select! and starts draining state_rx. The test was sending broadcasts WHILE the publisher was still in discovery, so the broadcast::Receiver buffer (capacity 32) was draining without the publisher ever processing them — the publisher's select! only polls state_rx between rumqttc events. Fix: - Wait 3s after spawn() (well past observed ramp-up, doubled for CI variance) - Send 6 snapshots in a loop with 200ms gaps (one dropped won't tank the test) - Capture window 8s instead of 3s (room for rate-limited publishes to land) Local impact: test now reliably passes against `mosquitto -c allow_anonymous=true` on loopback in ~12s wall time. CI matrix should pick the same green outcome. Other two integration tests (discovery + privacy_mode) already passed on every prior run — they only assert on discovery topics, which the publisher emits before any state. Refs PR #778, issue #776. Co-Authored-By: claude-flow --- .../tests/mqtt_integration.rs | 63 +++++++++---------- 1 file changed, 30 insertions(+), 33 deletions(-) diff --git a/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs b/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs index 1b3bfb20..118d3e4f 100644 --- a/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs +++ b/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs @@ -242,41 +242,38 @@ async fn state_messages_published_on_snapshot_broadcast() { let (tx, rx) = broadcast::channel::(32); let _handle = spawn(cfg, builder, rx); - // Wait briefly so the publisher is connected before we publish. - tokio::time::sleep(Duration::from_millis(700)).await; + // Wait long enough for the publisher to: + // (a) connect to the broker (rumqttc connects on first publish) + // (b) complete the initial 21+ QoS-1 discovery publishes + // (c) reach its select! and begin draining state_rx + // 3s is well past what we measured locally for the full ramp-up + // (median ~800ms on a fast loopback; doubled for CI safety). + tokio::time::sleep(Duration::from_secs(3)).await; - // Fire two snapshots — one with presence=true, one with presence=false. - let _ = tx.send(VitalsSnapshot { - node_id: "inttest3".into(), - timestamp_ms: 1779_512_400_000, - presence: true, - fall_detected: false, - motion: 0.40, - motion_energy: 800.0, - presence_score: 0.95, - breathing_rate_bpm: Some(14.0), - heartrate_bpm: Some(72.0), - n_persons: 1, - rssi_dbm: Some(-48.0), - vital_confidence: 0.9, - }); - tokio::time::sleep(Duration::from_millis(200)).await; - let _ = tx.send(VitalsSnapshot { - node_id: "inttest3".into(), - timestamp_ms: 1779_512_400_500, - presence: false, - fall_detected: false, - motion: 0.02, - motion_energy: 50.0, - presence_score: 0.10, - breathing_rate_bpm: None, - heartrate_bpm: None, - n_persons: 0, - rssi_dbm: Some(-52.0), - vital_confidence: 0.5, - }); + // Fire snapshots repeatedly so a single dropped broadcast doesn't + // tank the test. Each tx.send is fanout to ALL receivers, so the + // publisher receives every one. + for i in 0..6 { + let _ = tx.send(VitalsSnapshot { + node_id: "inttest3".into(), + timestamp_ms: 1779_512_400_000 + (i as i64) * 100, + presence: i % 2 == 0, + fall_detected: false, + motion: if i % 2 == 0 { 0.40 } else { 0.02 }, + motion_energy: 800.0, + presence_score: if i % 2 == 0 { 0.95 } else { 0.10 }, + breathing_rate_bpm: Some(14.0), + heartrate_bpm: Some(72.0), + n_persons: if i % 2 == 0 { 1 } else { 0 }, + rssi_dbm: Some(-48.0), + vital_confidence: 0.9, + }); + tokio::time::sleep(Duration::from_millis(200)).await; + } - let msgs = collect_published(&mut sub_loop, Duration::from_secs(3)).await; + // Capture window — generous so we don't race the publisher's + // change-detection on the presence binary_sensor. + let msgs = collect_published(&mut sub_loop, Duration::from_secs(8)).await; let _ = sub.disconnect().await; let presence_states: Vec = msgs