fix(adr-115/test): drive subscriber eventloop until SubAck before returning

Third cause of the state_messages_published_on_snapshot_broadcast
failure (after timing fix in 5ed8e3451 and client_id fix in
2aeed32a7): the subscriber's eventloop was NEVER polled between
`client.subscribe(...).await` and `collect_published(eventloop, ...)`,
so the SUBSCRIBE packet was only queued in rumqttc's outbound channel
— it didn't reach the broker until collect_published began polling.

By that time the publisher had already emitted all 6 state messages.
The retained ones (binary_sensor presence with retain=true) should
have been redelivered on the late subscribe, but only the LAST one
would land — yet CI was reporting `got []` (zero messages).

Theory: the broker may not redeliver retained messages reliably when
the subscribe arrives during the publisher's burst, OR the test's
collect_published timing budget runs out before redelivery completes.

Fix: drain the subscriber's eventloop inside `subscribe_client` until
we see the SubAck for our subscribe. That guarantees the subscription
is active at the broker BEFORE the function returns, so non-retained
publishes from the publisher's send loop arrive normally.

Also made the subscriber client_id include a per-call nanosecond
suffix so subscribers in back-to-back tests can't collide on a shared
ID (paranoia, complementary to the publisher-side fix from
2aeed32a7).

Refs PR #778, issue #776.

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
ruv 2026-05-23 15:52:52 -04:00
parent 2aeed32a72
commit 967cede74d
1 changed files with 29 additions and 2 deletions

View File

@ -96,17 +96,44 @@ fn make_builder(node: &str) -> OwnedDiscoveryBuilder {
}
async fn subscribe_client(port: u16, topics: &[&str]) -> (AsyncClient, EventLoop) {
// Per-call unique client_id so subscribers across tests don't take
// each other over.
let suffix: u64 = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.subsec_nanos() as u64)
.unwrap_or(0);
let mut opts = MqttOptions::new(
format!("ruview-test-sub-{}", std::process::id()),
format!("ruview-test-sub-{}-{}", std::process::id(), suffix),
"127.0.0.1",
port,
);
opts.set_keep_alive(Duration::from_secs(10));
opts.set_clean_session(true);
let (client, eventloop) = AsyncClient::new(opts, 256);
let (client, mut eventloop) = AsyncClient::new(opts, 256);
for t in topics {
client.subscribe(*t, QoS::AtLeastOnce).await.unwrap();
}
// Drive the eventloop until we see the SubAck for our last subscribe.
// Without this the SUBSCRIBE packet is only queued in rumqttc's
// outbound channel; it doesn't reach the broker until something
// pumps the eventloop. The caller's `collect_published` does that,
// but by then the publisher may already have emitted state
// messages — including the retained ones that won't be re-sent.
let until = tokio::time::Instant::now() + Duration::from_secs(3);
while tokio::time::Instant::now() < until {
let remain = until - tokio::time::Instant::now();
match timeout(remain, eventloop.poll()).await {
Ok(Ok(Event::Incoming(Packet::SubAck(_)))) => break,
Ok(Ok(_)) => continue,
Ok(Err(e)) => {
eprintln!("[subscribe_client] eventloop error before SubAck: {e}");
break;
}
Err(_) => break,
}
}
(client, eventloop)
}