From 967cede74d47e2b9e80ad6973841c51a9cab9fd7 Mon Sep 17 00:00:00 2001 From: ruv Date: Sat, 23 May 2026 15:52:52 -0400 Subject: [PATCH] fix(adr-115/test): drive subscriber eventloop until SubAck before returning MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Third cause of the state_messages_published_on_snapshot_broadcast failure (after timing fix in 5ed8e3451 and client_id fix in 2aeed32a7): the subscriber's eventloop was NEVER polled between `client.subscribe(...).await` and `collect_published(eventloop, ...)`, so the SUBSCRIBE packet was only queued in rumqttc's outbound channel — it didn't reach the broker until collect_published began polling. By that time the publisher had already emitted all 6 state messages. The retained ones (binary_sensor presence with retain=true) should have been redelivered on the late subscribe, but only the LAST one would land — yet CI was reporting `got []` (zero messages). Theory: the broker may not redeliver retained messages reliably when the subscribe arrives during the publisher's burst, OR the test's collect_published timing budget runs out before redelivery completes. Fix: drain the subscriber's eventloop inside `subscribe_client` until we see a SubAck (or until a 3-second budget expires or the eventloop returns an error, in which case the function returns anyway). On the SubAck path the subscription is active at the broker BEFORE the function returns, so non-retained publishes from the publisher's send loop arrive normally. Also made the subscriber client_id include a per-call suffix taken from the sub-second nanoseconds of the system clock, so subscribers in back-to-back tests are very unlikely to collide on a shared ID (paranoia, complementary to the publisher-side fix from 2aeed32a7). Refs PR #778, issue #776. 
Co-Authored-By: claude-flow --- .../tests/mqtt_integration.rs | 31 +++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs b/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs index 2b17391e..803b0455 100644 --- a/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs +++ b/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs @@ -96,17 +96,44 @@ fn make_builder(node: &str) -> OwnedDiscoveryBuilder { } async fn subscribe_client(port: u16, topics: &[&str]) -> (AsyncClient, EventLoop) { + // Per-call unique client_id so subscribers across tests don't take + // each other over. + let suffix: u64 = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.subsec_nanos() as u64) + .unwrap_or(0); let mut opts = MqttOptions::new( - format!("ruview-test-sub-{}", std::process::id()), + format!("ruview-test-sub-{}-{}", std::process::id(), suffix), "127.0.0.1", port, ); opts.set_keep_alive(Duration::from_secs(10)); opts.set_clean_session(true); - let (client, eventloop) = AsyncClient::new(opts, 256); + let (client, mut eventloop) = AsyncClient::new(opts, 256); for t in topics { client.subscribe(*t, QoS::AtLeastOnce).await.unwrap(); } + + // Drive the eventloop until the first SubAck arrives, giving up after + // 3 s or on an eventloop error (the function then returns anyway). + // Without this the SUBSCRIBE packet is only queued in rumqttc's + // outbound channel; it doesn't reach the broker until something + // pumps the eventloop. The caller's `collect_published` does that, but by then the publisher may already have emitted state messages. + // NOTE(review): the loop stops at the FIRST SubAck; with several topics later SubAcks may still be pending — confirm that is sufficient. 
+ let until = tokio::time::Instant::now() + Duration::from_secs(3); + while tokio::time::Instant::now() < until { + let remain = until - tokio::time::Instant::now(); + match timeout(remain, eventloop.poll()).await { + Ok(Ok(Event::Incoming(Packet::SubAck(_)))) => break, + Ok(Ok(_)) => continue, + Ok(Err(e)) => { + eprintln!("[subscribe_client] eventloop error before SubAck: {e}"); + break; + } + Err(_) => break, + } + } + (client, eventloop) }