diff --git a/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs b/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs index 2b17391e..803b0455 100644 --- a/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs +++ b/v2/crates/wifi-densepose-sensing-server/tests/mqtt_integration.rs @@ -96,17 +96,44 @@ fn make_builder(node: &str) -> OwnedDiscoveryBuilder { } async fn subscribe_client(port: u16, topics: &[&str]) -> (AsyncClient, EventLoop) { + // Per-call unique client_id so subscribers across tests don't take + // each other over. + let suffix: u64 = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.subsec_nanos() as u64) + .unwrap_or(0); let mut opts = MqttOptions::new( - format!("ruview-test-sub-{}", std::process::id()), + format!("ruview-test-sub-{}-{}", std::process::id(), suffix), "127.0.0.1", port, ); opts.set_keep_alive(Duration::from_secs(10)); opts.set_clean_session(true); - let (client, eventloop) = AsyncClient::new(opts, 256); + let (client, mut eventloop) = AsyncClient::new(opts, 256); for t in topics { client.subscribe(*t, QoS::AtLeastOnce).await.unwrap(); } + + // Drive the eventloop until we see the SubAck for our last subscribe. + // Without this the SUBSCRIBE packet is only queued in rumqttc's + // outbound channel; it doesn't reach the broker until something + // pumps the eventloop. The caller's `collect_published` does that, + // but by then the publisher may already have emitted state + // messages — including the retained ones that won't be re-sent. + let until = tokio::time::Instant::now() + Duration::from_secs(3); + while tokio::time::Instant::now() < until { + let remain = until - tokio::time::Instant::now(); + match timeout(remain, eventloop.poll()).await { + Ok(Ok(Event::Incoming(Packet::SubAck(_)))) => break, + Ok(Ok(_)) => continue, + Ok(Err(e)) => { + eprintln!("[subscribe_client] eventloop error before SubAck: {e}"); + break; + } + Err(_) => break, + } + } + (client, eventloop) }