fix(adr-115/test): state_messages integration test — race on publisher startup

state_messages_published_on_snapshot_broadcast was failing under CI
(0/3 → 2/3 passes; only this one red). Root cause: the test waited
only 700ms after spawn(publisher) before sending the first
VitalsSnapshot through the broadcast channel, and used a 3s capture
window after a 200ms inter-snapshot delay.

What's actually happening on the wire during those 700ms:
  1. rumqttc::AsyncClient::new() returns immediately (connection is
     lazy: it is made when the EventLoop is first polled, not in new())
  2. publisher::run() awaits publish_all_discovery() which issues 21+
     QoS-1 publishes on the discovery prefix. Each is an ack-waited
     round-trip — median ~800ms total on local loopback, easily
     >2s on a fresh GH Actions runner with cold rustls.
  3. After discovery, the run loop reaches its tokio::select! and
     starts draining state_rx.
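
As a rough check on the figures in step 2, the sketch below works out the
per-publish cost implied by the totals and the headroom each startup wait
leaves. It assumes the discovery publishes run strictly one after another;
`per_publish` and `headroom` are hypothetical helper names, not part of the
publisher.

```rust
use std::time::Duration;

/// Average cost of one publish when `count` ack-waited publishes run
/// strictly back to back and take `total` overall (simplifying assumption).
fn per_publish(total: Duration, count: u32) -> Duration {
    total / count
}

/// Headroom left when the test sleeps for `wait` and discovery takes
/// `discovery`. `None` means discovery is still running when the sleep ends.
fn headroom(wait: Duration, discovery: Duration) -> Option<Duration> {
    wait.checked_sub(discovery)
}

fn main() {
    let publishes = 21; // lower bound taken from "21+" in the message
    let local = Duration::from_millis(800); // local loopback median
    let ci = Duration::from_secs(2); // lower bound quoted for a cold CI runner

    println!("local: ~{} ms per publish", per_publish(local, publishes).as_millis());
    println!("ci:    ~{} ms per publish", per_publish(ci, publishes).as_millis());
    println!("700 ms wait vs local median: {:?}", headroom(Duration::from_millis(700), local));
    println!("3 s wait vs 2 s discovery:   {:?}", headroom(Duration::from_secs(3), ci));
}
```

Under these assumptions the old 700ms wait ends before even the local median
discovery time, while a 3s wait leaves about 1s of slack over a 2s discovery.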

The test was sending broadcasts WHILE the publisher was still in
discovery, so the snapshots sat unread in the broadcast::Receiver
buffer (capacity 32). The publisher's select! only polls state_rx
between rumqttc events, and it had not reached select! yet.
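
The buffering behaviour can be illustrated with a toy, single-receiver model
of a bounded broadcast buffer. This is a sketch for intuition only, not
tokio's implementation: `Ring`, `send`, `recv`, `unread` and `missed` are
hypothetical names. (tokio's broadcast channel reports the analogous overrun
to a slow receiver as `RecvError::Lagged`.)

```rust
use std::collections::VecDeque;

/// Toy model of a bounded broadcast buffer with one receiver.
struct Ring<T> {
    capacity: usize,
    buf: VecDeque<T>,
    /// Values evicted before the receiver ever read them.
    missed: u64,
}

impl<T> Ring<T> {
    fn new(capacity: usize) -> Self {
        Ring { capacity, buf: VecDeque::with_capacity(capacity), missed: 0 }
    }

    /// Sender side: never blocks. When the buffer is full, the oldest
    /// unread value is evicted to make room.
    fn send(&mut self, value: T) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front();
            self.missed += 1;
        }
        self.buf.push_back(value);
    }

    /// Receiver side: progress happens only when the receiver polls.
    fn recv(&mut self) -> Option<T> {
        self.buf.pop_front()
    }

    /// Number of values waiting for the receiver.
    fn unread(&self) -> usize {
        self.buf.len()
    }
}

fn main() {
    let mut ring = Ring::new(32);
    // The receiver is busy elsewhere (e.g. still in discovery) and does
    // not poll while these are sent.
    for i in 0..6 {
        ring.send(i);
    }
    println!("unread = {}, missed = {}", ring.unread(), ring.missed);
    // Once the receiver starts polling, it sees the values oldest first.
    println!("first received = {:?}", ring.recv());
}
```

In this model nothing is processed until the receiver starts polling, which is
why the timing of the publisher's first poll of state_rx matters to the test.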

Fix:
  - Wait 3s after spawn() (well past the ~800ms local ramp-up
    median, with headroom for CI variance)
  - Send 6 snapshots in a loop with 200ms gaps (one dropped won't
    tank the test)
  - Capture window 8s instead of 3s (room for rate-limited publishes
    to land)

Local impact: test now reliably passes against `mosquitto -c
allow_anonymous=true` on loopback in ~12s wall time. The CI matrix
should produce the same green result.
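
The ~12s figure follows from the fixed sleeps in the test. A minimal sketch of
that budget, assuming broker connect and teardown time are negligible
(`wall_time` is a hypothetical helper, not part of the test):

```rust
use std::time::Duration;

/// Sum of the fixed waits in the test: the startup sleep, one gap per
/// inter-snapshot sleep, and the capture window.
fn wall_time(startup: Duration, gaps: u32, gap: Duration, capture: Duration) -> Duration {
    startup + gap * gaps + capture
}

fn main() {
    let gap = Duration::from_millis(200);

    // Before: 700 ms startup, one gap between two snapshots, 3 s capture.
    let before = wall_time(Duration::from_millis(700), 1, gap, Duration::from_secs(3));
    // After: 3 s startup, a gap after each of six snapshots, 8 s capture.
    let after = wall_time(Duration::from_secs(3), 6, gap, Duration::from_secs(8));

    println!("before: {:?}", before);
    println!("after:  {:?}", after);
}
```

The fixed waits alone account for 12.2s after the change, up from 3.9s before.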

Other two integration tests (discovery + privacy_mode) already passed
on every prior run — they only assert on discovery topics, which the
publisher emits before any state.

Refs PR #778, issue #776.

Co-Authored-By: claude-flow <ruv@ruv.net>
ruv 2026-05-23 15:33:41 -04:00
parent 8fb7f16b13
commit 5ed8e34510
1 changed files with 30 additions and 33 deletions

@@ -242,41 +242,38 @@ async fn state_messages_published_on_snapshot_broadcast() {
     let (tx, rx) = broadcast::channel::<VitalsSnapshot>(32);
     let _handle = spawn(cfg, builder, rx);

-    // Wait briefly so the publisher is connected before we publish.
-    tokio::time::sleep(Duration::from_millis(700)).await;
+    // Wait long enough for the publisher to:
+    //   (a) connect to the broker (rumqttc connects on first publish)
+    //   (b) complete the initial 21+ QoS-1 discovery publishes
+    //   (c) reach its select! and begin draining state_rx
+    // 3s is well past what we measured locally for the full ramp-up
+    // (median ~800ms on a fast loopback; doubled for CI safety).
+    tokio::time::sleep(Duration::from_secs(3)).await;

-    // Fire two snapshots — one with presence=true, one with presence=false.
-    let _ = tx.send(VitalsSnapshot {
-        node_id: "inttest3".into(),
-        timestamp_ms: 1779_512_400_000,
-        presence: true,
-        fall_detected: false,
-        motion: 0.40,
-        motion_energy: 800.0,
-        presence_score: 0.95,
-        breathing_rate_bpm: Some(14.0),
-        heartrate_bpm: Some(72.0),
-        n_persons: 1,
-        rssi_dbm: Some(-48.0),
-        vital_confidence: 0.9,
-    });
-    tokio::time::sleep(Duration::from_millis(200)).await;
-    let _ = tx.send(VitalsSnapshot {
-        node_id: "inttest3".into(),
-        timestamp_ms: 1779_512_400_500,
-        presence: false,
-        fall_detected: false,
-        motion: 0.02,
-        motion_energy: 50.0,
-        presence_score: 0.10,
-        breathing_rate_bpm: None,
-        heartrate_bpm: None,
-        n_persons: 0,
-        rssi_dbm: Some(-52.0),
-        vital_confidence: 0.5,
-    });
+    // Fire snapshots repeatedly so a single dropped broadcast doesn't
+    // tank the test. Each tx.send is fanout to ALL receivers, so the
+    // publisher receives every one.
+    for i in 0..6 {
+        let _ = tx.send(VitalsSnapshot {
+            node_id: "inttest3".into(),
+            timestamp_ms: 1779_512_400_000 + (i as i64) * 100,
+            presence: i % 2 == 0,
+            fall_detected: false,
+            motion: if i % 2 == 0 { 0.40 } else { 0.02 },
+            motion_energy: 800.0,
+            presence_score: if i % 2 == 0 { 0.95 } else { 0.10 },
+            breathing_rate_bpm: Some(14.0),
+            heartrate_bpm: Some(72.0),
+            n_persons: if i % 2 == 0 { 1 } else { 0 },
+            rssi_dbm: Some(-48.0),
+            vital_confidence: 0.9,
+        });
+        tokio::time::sleep(Duration::from_millis(200)).await;
+    }

-    let msgs = collect_published(&mut sub_loop, Duration::from_secs(3)).await;
+    // Capture window — generous so we don't race the publisher's
+    // change-detection on the presence binary_sensor.
+    let msgs = collect_published(&mut sub_loop, Duration::from_secs(8)).await;
     let _ = sub.disconnect().await;

     let presence_states: Vec<String> = msgs