fix(adr-115/test): drive state-test snapshots throughout capture window

Iter 46 — second attempt at fixing state_messages_published_on_snapshot_broadcast
on CI. The iter-45 SubAck fix proved necessary but not sufficient;
the test still returned an empty Vec for presence states.

Root cause analysis: the test was front-loading 6 snapshots over 1.2 s
after a 3 s warm-up sleep, then capturing for 8 s. That schedule
assumes:
  - mosquitto sidecar is ready
  - cargo build cache is warm
  - rumqttc connect + 21 QoS-1 discovery publishes complete in <3 s
  - the publisher's select! starts draining state_rx in <3 s

On the CI runner those assumptions break. The publisher takes >3 s to
finish discovery, so all 6 state publishes either land in the rumqttc
outbound channel before the broker is reachable OR are emitted while
the subscriber's reception path has stalled.

Fix: drive snapshots in a background task THROUGHOUT the capture
window instead of front-loading them. 40 snapshots × 300 ms = 12 s
of steady-state ON/OFF traffic across a 14 s capture window. Even if
the first 3-5 s of publishes are missed during slow publisher
bootstrap, plenty of ON and OFF messages arrive afterward.

This also makes the test more representative of real HA workloads
(steady stream of vitals, not a burst then silence).

Local cargo test --features mqtt --no-default-features --test
mqtt_integration --no-run → compiles green.

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
ruv 2026-05-23 16:02:31 -04:00
parent 636ca7b52f
commit 26e00e6910
1 changed files with 33 additions and 31 deletions

View File

@ -274,38 +274,40 @@ async fn state_messages_published_on_snapshot_broadcast() {
let (tx, rx) = broadcast::channel::<VitalsSnapshot>(32);
let _handle = spawn(cfg, builder, rx);
// Wait long enough for the publisher to:
// (a) connect to the broker (rumqttc connects on first publish)
// (b) complete the initial 21+ QoS-1 discovery publishes
// (c) reach its select! and begin draining state_rx
// 3s is well past what we measured locally for the full ramp-up
// (median ~800ms on a fast loopback; doubled for CI safety).
tokio::time::sleep(Duration::from_secs(3)).await;
// Iter 46 — instead of front-loading 6 snapshots and hoping the
// publisher's startup beats them, drive snapshots in a background
// task THROUGHOUT the capture window. CI runners can be slow to
// boot the publisher (mosquitto sidecar + cold cargo cache + slow
// QoS-1 discovery publishes), so a "publisher must be ready by
// t=3s" assumption is fragile. Steady-state ON/OFF traffic for the
// full 14 s window guarantees both states appear in the capture
// even if the first 3-5 s of publishes are missed.
let tx_bg = tx.clone();
let drive = tokio::spawn(async move {
// Brief warm-up before first publish.
tokio::time::sleep(Duration::from_secs(1)).await;
for i in 0..40 {
let _ = tx_bg.send(VitalsSnapshot {
node_id: "inttest3".into(),
timestamp_ms: 1779_512_400_000 + (i as i64) * 300,
presence: i % 2 == 0,
fall_detected: false,
motion: if i % 2 == 0 { 0.40 } else { 0.02 },
motion_energy: 800.0,
presence_score: if i % 2 == 0 { 0.95 } else { 0.10 },
breathing_rate_bpm: Some(14.0),
heartrate_bpm: Some(72.0),
n_persons: if i % 2 == 0 { 1 } else { 0 },
rssi_dbm: Some(-48.0),
vital_confidence: 0.9,
});
tokio::time::sleep(Duration::from_millis(300)).await;
}
});
// Fire snapshots repeatedly so a single dropped broadcast doesn't
// tank the test. Each tx.send is fanout to ALL receivers, so the
// publisher receives every one.
for i in 0..6 {
let _ = tx.send(VitalsSnapshot {
node_id: "inttest3".into(),
timestamp_ms: 1779_512_400_000 + (i as i64) * 100,
presence: i % 2 == 0,
fall_detected: false,
motion: if i % 2 == 0 { 0.40 } else { 0.02 },
motion_energy: 800.0,
presence_score: if i % 2 == 0 { 0.95 } else { 0.10 },
breathing_rate_bpm: Some(14.0),
heartrate_bpm: Some(72.0),
n_persons: if i % 2 == 0 { 1 } else { 0 },
rssi_dbm: Some(-48.0),
vital_confidence: 0.9,
});
tokio::time::sleep(Duration::from_millis(200)).await;
}
// Capture window — generous so we don't race the publisher's
// change-detection on the presence binary_sensor.
let msgs = collect_published(&mut sub_loop, Duration::from_secs(8)).await;
// 14 s window covers warm-up + 12 s of steady-state ON/OFF traffic.
let msgs = collect_published(&mut sub_loop, Duration::from_secs(14)).await;
drive.abort();
let _ = sub.disconnect().await;
// Diagnostic: dump every captured topic so we can see what (if