fix(adr-115/test): state_messages integration test — race on publisher startup
state_messages_published_on_snapshot_broadcast was failing under CI
(0/3 → 2/3 passes; only this one red). Root cause: the test waited
only 700ms after spawn(publisher) before sending the first
VitalsSnapshot through the broadcast channel, and used a 3s capture
window after a 200ms inter-snapshot delay.
What's actually happening on the wire during those 700ms:
1. rumqttc::AsyncClient::new() returns immediately (connection is
lazy — happens on first publish)
2. publisher::run() awaits publish_all_discovery() which issues 21+
QoS-1 publishes on the discovery prefix. Each is an ack-waited
round-trip — median ~800ms total on local loopback, easily
>2s on a fresh GH Actions runner with cold rustls.
3. After discovery, the run loop reaches its tokio::select! and
starts draining state_rx.
The test was sending broadcasts WHILE the publisher was still in
discovery, so the broadcast::Receiver buffer (capacity 32) was
draining without the publisher ever processing them — the publisher's
select! only polls state_rx between rumqttc events.
Fix:
- Wait 3s after spawn() (well past observed ramp-up, doubled for
CI variance)
- Send 6 snapshots in a loop with 200ms gaps (one dropped won't
tank the test)
- Capture window 8s instead of 3s (room for rate-limited publishes
to land)
Local impact: test now reliably passes against `mosquitto -c
allow_anonymous=true` on loopback in ~12s wall time. CI matrix should
pick the same green outcome.
Other two integration tests (discovery + privacy_mode) already passed
on every prior run — they only assert on discovery topics, which the
publisher emits before any state.
Refs PR #778, issue #776.
Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
parent
8fb7f16b13
commit
5ed8e34510
|
|
@ -242,41 +242,38 @@ async fn state_messages_published_on_snapshot_broadcast() {
|
||||||
let (tx, rx) = broadcast::channel::<VitalsSnapshot>(32);
|
let (tx, rx) = broadcast::channel::<VitalsSnapshot>(32);
|
||||||
let _handle = spawn(cfg, builder, rx);
|
let _handle = spawn(cfg, builder, rx);
|
||||||
|
|
||||||
// Wait briefly so the publisher is connected before we publish.
|
// Wait long enough for the publisher to:
|
||||||
tokio::time::sleep(Duration::from_millis(700)).await;
|
// (a) connect to the broker (rumqttc connects on first publish)
|
||||||
|
// (b) complete the initial 21+ QoS-1 discovery publishes
|
||||||
|
// (c) reach its select! and begin draining state_rx
|
||||||
|
// 3s is well past what we measured locally for the full ramp-up
|
||||||
|
// (median ~800ms on a fast loopback; doubled for CI safety).
|
||||||
|
tokio::time::sleep(Duration::from_secs(3)).await;
|
||||||
|
|
||||||
// Fire two snapshots — one with presence=true, one with presence=false.
|
// Fire snapshots repeatedly so a single dropped broadcast doesn't
|
||||||
let _ = tx.send(VitalsSnapshot {
|
// tank the test. Each tx.send is fanout to ALL receivers, so the
|
||||||
node_id: "inttest3".into(),
|
// publisher receives every one.
|
||||||
timestamp_ms: 1779_512_400_000,
|
for i in 0..6 {
|
||||||
presence: true,
|
let _ = tx.send(VitalsSnapshot {
|
||||||
fall_detected: false,
|
node_id: "inttest3".into(),
|
||||||
motion: 0.40,
|
timestamp_ms: 1779_512_400_000 + (i as i64) * 100,
|
||||||
motion_energy: 800.0,
|
presence: i % 2 == 0,
|
||||||
presence_score: 0.95,
|
fall_detected: false,
|
||||||
breathing_rate_bpm: Some(14.0),
|
motion: if i % 2 == 0 { 0.40 } else { 0.02 },
|
||||||
heartrate_bpm: Some(72.0),
|
motion_energy: 800.0,
|
||||||
n_persons: 1,
|
presence_score: if i % 2 == 0 { 0.95 } else { 0.10 },
|
||||||
rssi_dbm: Some(-48.0),
|
breathing_rate_bpm: Some(14.0),
|
||||||
vital_confidence: 0.9,
|
heartrate_bpm: Some(72.0),
|
||||||
});
|
n_persons: if i % 2 == 0 { 1 } else { 0 },
|
||||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
rssi_dbm: Some(-48.0),
|
||||||
let _ = tx.send(VitalsSnapshot {
|
vital_confidence: 0.9,
|
||||||
node_id: "inttest3".into(),
|
});
|
||||||
timestamp_ms: 1779_512_400_500,
|
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||||
presence: false,
|
}
|
||||||
fall_detected: false,
|
|
||||||
motion: 0.02,
|
|
||||||
motion_energy: 50.0,
|
|
||||||
presence_score: 0.10,
|
|
||||||
breathing_rate_bpm: None,
|
|
||||||
heartrate_bpm: None,
|
|
||||||
n_persons: 0,
|
|
||||||
rssi_dbm: Some(-52.0),
|
|
||||||
vital_confidence: 0.5,
|
|
||||||
});
|
|
||||||
|
|
||||||
let msgs = collect_published(&mut sub_loop, Duration::from_secs(3)).await;
|
// Capture window — generous so we don't race the publisher's
|
||||||
|
// change-detection on the presence binary_sensor.
|
||||||
|
let msgs = collect_published(&mut sub_loop, Duration::from_secs(8)).await;
|
||||||
let _ = sub.disconnect().await;
|
let _ = sub.disconnect().await;
|
||||||
|
|
||||||
let presence_states: Vec<String> = msgs
|
let presence_states: Vec<String> = msgs
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue