Merge pull request #904 from ruvnet/fix/898-mqtt-per-node-devices
fix(mqtt): one Home-Assistant device per node — closes #898
This commit is contained in:
commit
9df908d898
|
|
@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
|
- **MQTT multi-node deployments now create one Home-Assistant device per node — closes #898.** After the #872 MQTT wiring landed, the JSON→`VitalsSnapshot` bridge hard-coded a single `node_id` (the MQTT client id) and the publisher used a single `OwnedDiscoveryBuilder`, so every physical node collapsed into one device (`identifiers:["wifi_densepose_wifi-densepose-1"]`), contradicting the "one device per node" docs. The bridge now emits one snapshot per node in the sensing update's `nodes[]` (each with its own `node_id` + RSSI, falling back to a single aggregate snapshot for wifi/simulate sources), and the publisher derives a per-node builder (`OwnedDiscoveryBuilder::for_node`) that publishes discovery + availability lazily on first sight of each `node_id` and routes state to per-node topics — yielding N distinct HA devices with per-node availability/LWT. Unit-tested (distinct nodes → distinct `wifi_densepose_<node>` identifiers); 71 MQTT tests pass.
|
||||||
- **Person count no longer pinned to 1 — addresses #803.** The aggregate occupancy reported by the sensing server was derived from `smoothed_person_score`, an EMA-smoothed *activity* score (amplitude variance / motion / spectral energy). That score saturates near a single occupant — one moving person maxes it out — so it cannot discriminate occupancy *count* and stayed clamped at 1 across S3/C6 and the Python/Docker/Rust servers. Meanwhile the count-aware per-node estimates the ESP32 paths already compute (firmware `n_persons`, and the DynamicMinCut `corr_persons`) were stashed in `NodeState::prev_person_count` and then **discarded** by the aggregator (same dead-wiring class as #872). The aggregator now takes `max(activity_count, node_max)` via a unit-tested `aggregate_person_count` helper, so a node positively estimating 2–3 occupants is surfaced instead of overwritten. The fix can only ever *raise* the count when a node reports more people, so the single-occupant case is provably never inflated (regression-guarded by test). **Second half:** the pure-CSI per-node path itself clamped its own estimate — the DynamicMinCut occupancy (`estimate_persons_from_correlation`, 0–3) was mapped to a score via `corr_persons / 3.0`, putting 2 people at 0.667, *just under* the 0.70 up-threshold of `score_to_person_count`, so the per-node count never climbed past 1 (so `node_max` was also stuck at 1 for CSI-only nodes). Replaced it with a threshold-aligned `corr_persons_to_score` mapping (1→0.40, 2→0.74, 3→0.96) whose steady state round-trips back to the same count through the EMA + hysteresis, while still gating transient noise. A convergence test replays the exact EMA loop to prove min-cut=2 now reports 2 (and documents that the old `/3.0` mapping reported 1). Full multi-person accuracy still depends on the underlying estimator quality; this removes the two server-side clamps that masked it. 586 sensing-server tests pass.
|
- **Person count no longer pinned to 1 — addresses #803.** The aggregate occupancy reported by the sensing server was derived from `smoothed_person_score`, an EMA-smoothed *activity* score (amplitude variance / motion / spectral energy). That score saturates near a single occupant — one moving person maxes it out — so it cannot discriminate occupancy *count* and stayed clamped at 1 across S3/C6 and the Python/Docker/Rust servers. Meanwhile the count-aware per-node estimates the ESP32 paths already compute (firmware `n_persons`, and the DynamicMinCut `corr_persons`) were stashed in `NodeState::prev_person_count` and then **discarded** by the aggregator (same dead-wiring class as #872). The aggregator now takes `max(activity_count, node_max)` via a unit-tested `aggregate_person_count` helper, so a node positively estimating 2–3 occupants is surfaced instead of overwritten. The fix can only ever *raise* the count when a node reports more people, so the single-occupant case is provably never inflated (regression-guarded by test). **Second half:** the pure-CSI per-node path itself clamped its own estimate — the DynamicMinCut occupancy (`estimate_persons_from_correlation`, 0–3) was mapped to a score via `corr_persons / 3.0`, putting 2 people at 0.667, *just under* the 0.70 up-threshold of `score_to_person_count`, so the per-node count never climbed past 1 (so `node_max` was also stuck at 1 for CSI-only nodes). Replaced it with a threshold-aligned `corr_persons_to_score` mapping (1→0.40, 2→0.74, 3→0.96) whose steady state round-trips back to the same count through the EMA + hysteresis, while still gating transient noise. A convergence test replays the exact EMA loop to prove min-cut=2 now reports 2 (and documents that the old `/3.0` mapping reported 1). Full multi-person accuracy still depends on the underlying estimator quality; this removes the two server-side clamps that masked it. 586 sensing-server tests pass.
|
||||||
- **MQTT publisher now actually runs (`--mqtt`) — closes #872.** The `--mqtt*` flags were defined only in `cli::Args` (dead code, referenced nowhere) while the binary parses a *separate* `main::Args` with no mqtt fields, and `main.rs` never started the `mqtt::` publisher — so MQTT/Home-Assistant integration was completely unwired (`--mqtt` errored as an unexpected argument, and even with the Docker image's `--features mqtt` build the publisher never ran). Earlier attempts chased a Docker *rebuild*; the real cause was disconnected *code*. Extracted the flags into a shared `cli::MqttArgs` (`#[command(flatten)]` into both structs), spawn the publisher on `--mqtt`, and bridge the JSON sensing broadcast into the typed `VitalsSnapshot` stream with a defensive `serde_json::Value` mapping. Verified end-to-end against `mosquitto`: 20 HA auto-discovery entities + live state (presence/person-count/…). 577 (default) / 580 (`--features mqtt`) tests pass.
|
- **MQTT publisher now actually runs (`--mqtt`) — closes #872.** The `--mqtt*` flags were defined only in `cli::Args` (dead code, referenced nowhere) while the binary parses a *separate* `main::Args` with no mqtt fields, and `main.rs` never started the `mqtt::` publisher — so MQTT/Home-Assistant integration was completely unwired (`--mqtt` errored as an unexpected argument, and even with the Docker image's `--features mqtt` build the publisher never ran). Earlier attempts chased a Docker *rebuild*; the real cause was disconnected *code*. Extracted the flags into a shared `cli::MqttArgs` (`#[command(flatten)]` into both structs), spawn the publisher on `--mqtt`, and bridge the JSON sensing broadcast into the typed `VitalsSnapshot` stream with a defensive `serde_json::Value` mapping. Verified end-to-end against `mosquitto`: 20 HA auto-discovery entities + live state (presence/person-count/…). 577 (default) / 580 (`--features mqtt`) tests pass.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -6213,24 +6213,44 @@ async fn main() {
|
||||||
Some(_) => 1.0,
|
Some(_) => 1.0,
|
||||||
None => 0.0,
|
None => 0.0,
|
||||||
};
|
};
|
||||||
let snap = mqtt::state::VitalsSnapshot {
|
let ts = (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64;
|
||||||
node_id: node_id.clone(),
|
let conf = cls["confidence"].as_f64().unwrap_or(0.0);
|
||||||
timestamp_ms: (v["timestamp"].as_f64().unwrap_or(0.0) * 1000.0) as i64,
|
let presence_score = if presence { conf.max(0.0) } else { 0.0 };
|
||||||
|
let breathing = vit["breathing_rate_bpm"].as_f64();
|
||||||
|
let hr = vit["heart_rate_bpm"].as_f64();
|
||||||
|
// #898: emit one snapshot per physical node so each
|
||||||
|
// surfaces as its own Home-Assistant device (with
|
||||||
|
// its own RSSI + availability). Falls back to a
|
||||||
|
// single aggregate snapshot when there is no
|
||||||
|
// per-node data (e.g. wifi / simulate sources).
|
||||||
|
let mk = |nid: String, rssi: Option<f64>| mqtt::state::VitalsSnapshot {
|
||||||
|
node_id: nid,
|
||||||
|
timestamp_ms: ts,
|
||||||
presence,
|
presence,
|
||||||
motion,
|
motion,
|
||||||
presence_score: if presence {
|
presence_score,
|
||||||
cls["confidence"].as_f64().unwrap_or(1.0)
|
breathing_rate_bpm: breathing,
|
||||||
} else {
|
heartrate_bpm: hr,
|
||||||
0.0
|
|
||||||
},
|
|
||||||
breathing_rate_bpm: vit["breathing_rate_bpm"].as_f64(),
|
|
||||||
heartrate_bpm: vit["heart_rate_bpm"].as_f64(),
|
|
||||||
n_persons,
|
n_persons,
|
||||||
rssi_dbm: v["nodes"][0]["rssi_dbm"].as_f64(),
|
rssi_dbm: rssi,
|
||||||
vital_confidence: cls["confidence"].as_f64().unwrap_or(0.0),
|
vital_confidence: conf,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
let _ = vtx.send(snap);
|
match v["nodes"].as_array() {
|
||||||
|
Some(arr) if !arr.is_empty() => {
|
||||||
|
for node in arr {
|
||||||
|
let n = node["node_id"].as_u64().unwrap_or(0);
|
||||||
|
let nid = format!("{node_id}-node{n}");
|
||||||
|
let _ = vtx.send(mk(nid, node["rssi_dbm"].as_f64()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
let _ = vtx.send(mk(
|
||||||
|
node_id.clone(),
|
||||||
|
v["nodes"][0]["rssi_dbm"].as_f64(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
tracing::info!("MQTT publisher started -> {host}:{port}");
|
tracing::info!("MQTT publisher started -> {host}:{port}");
|
||||||
|
|
|
||||||
|
|
@ -117,6 +117,23 @@ impl OwnedDiscoveryBuilder {
|
||||||
via_device: self.via_device.as_deref(),
|
via_device: self.via_device.as_deref(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Derive a per-node builder from this base (issue #898). Each physical
|
||||||
|
/// RuView node must surface as its own Home-Assistant device — the base
|
||||||
|
/// builder's `node_id` (the MQTT client id) is replaced with the actual
|
||||||
|
/// node id, giving a distinct `wifi_densepose_<node>` device identifier
|
||||||
|
/// and a per-node friendly name, instead of collapsing every node into a
|
||||||
|
/// single hard-coded device.
|
||||||
|
pub fn for_node(&self, node_id: &str) -> OwnedDiscoveryBuilder {
|
||||||
|
OwnedDiscoveryBuilder {
|
||||||
|
discovery_prefix: self.discovery_prefix.clone(),
|
||||||
|
node_id: node_id.to_string(),
|
||||||
|
node_friendly_name: Some(format!("RuView node {node_id}")),
|
||||||
|
sw_version: self.sw_version.clone(),
|
||||||
|
model: self.model.clone(),
|
||||||
|
via_device: self.via_device.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Core run loop. Pumps the broadcast channel + the MQTT event loop in
|
/// Core run loop. Pumps the broadcast channel + the MQTT event loop in
|
||||||
|
|
@ -129,20 +146,19 @@ async fn run(
|
||||||
let opts = build_mqtt_options(&cfg);
|
let opts = build_mqtt_options(&cfg);
|
||||||
let (client, mut eventloop): (AsyncClient, EventLoop) = AsyncClient::new(opts, 256);
|
let (client, mut eventloop): (AsyncClient, EventLoop) = AsyncClient::new(opts, 256);
|
||||||
|
|
||||||
let builder_borrowed = builder_owned.as_borrowed();
|
|
||||||
let entities = DiscoveryBuilder::enabled_entities(
|
let entities = DiscoveryBuilder::enabled_entities(
|
||||||
cfg.privacy_mode,
|
cfg.privacy_mode,
|
||||||
cfg.publish_pose,
|
cfg.publish_pose,
|
||||||
&[], // no_semantic — wire from cli::Args in P3.5
|
&[], // no_semantic — wire from cli::Args in P3.5
|
||||||
);
|
);
|
||||||
|
|
||||||
if let Err(e) = publish_all_discovery(&client, &builder_borrowed, &entities).await {
|
// #898: one Home-Assistant device per node. Discovery + availability are
|
||||||
warn!("[mqtt] initial discovery publish failed: {e}");
|
// published lazily the first time a snapshot for a given node_id arrives;
|
||||||
}
|
// each node's builder + availability are retained here for heartbeats and
|
||||||
let avail = NodeAvailability::for_builder(&builder_borrowed, &entities);
|
// the offline LWT. (Previously a single hard-coded builder collapsed every
|
||||||
if let Err(e) = publish_availability(&client, &avail, "online").await {
|
// node into one device.)
|
||||||
warn!("[mqtt] initial availability publish failed: {e}");
|
let mut nodes: std::collections::HashMap<String, (OwnedDiscoveryBuilder, NodeAvailability)> =
|
||||||
}
|
std::collections::HashMap::new();
|
||||||
|
|
||||||
let mut rate_limiter = RateLimiter::new();
|
let mut rate_limiter = RateLimiter::new();
|
||||||
let mut last_heartbeat = Instant::now();
|
let mut last_heartbeat = Instant::now();
|
||||||
|
|
@ -179,14 +195,20 @@ async fn run(
|
||||||
// Periodic heartbeat / discovery refresh.
|
// Periodic heartbeat / discovery refresh.
|
||||||
_ = tokio::time::sleep(Duration::from_secs(1)) => {
|
_ = tokio::time::sleep(Duration::from_secs(1)) => {
|
||||||
if last_heartbeat.elapsed() >= AVAILABILITY_HEARTBEAT {
|
if last_heartbeat.elapsed() >= AVAILABILITY_HEARTBEAT {
|
||||||
if let Err(e) = publish_availability(&client, &avail, "online").await {
|
for (_, na) in nodes.values() {
|
||||||
warn!("[mqtt] heartbeat publish failed: {e}");
|
if let Err(e) = publish_availability(&client, na, "online").await {
|
||||||
|
warn!("[mqtt] heartbeat publish failed: {e}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
last_heartbeat = Instant::now();
|
last_heartbeat = Instant::now();
|
||||||
}
|
}
|
||||||
if last_refresh.elapsed() >= Duration::from_secs(cfg.refresh_secs) {
|
if last_refresh.elapsed() >= Duration::from_secs(cfg.refresh_secs) {
|
||||||
if let Err(e) = publish_all_discovery(&client, &builder_borrowed, &entities).await {
|
for (nb, _) in nodes.values() {
|
||||||
warn!("[mqtt] discovery refresh failed: {e}");
|
if let Err(e) =
|
||||||
|
publish_all_discovery(&client, &nb.as_borrowed(), &entities).await
|
||||||
|
{
|
||||||
|
warn!("[mqtt] discovery refresh failed: {e}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
last_refresh = Instant::now();
|
last_refresh = Instant::now();
|
||||||
}
|
}
|
||||||
|
|
@ -197,18 +219,39 @@ async fn run(
|
||||||
match recv {
|
match recv {
|
||||||
Ok(snap) => {
|
Ok(snap) => {
|
||||||
let elapsed = start_instant.elapsed();
|
let elapsed = start_instant.elapsed();
|
||||||
publish_snapshot(&client, &builder_borrowed, &snap, &cfg, &mut rate_limiter, elapsed).await;
|
// #898: on first sight of a node_id, publish that
|
||||||
|
// node's discovery + availability; then route its
|
||||||
|
// state to per-node topics.
|
||||||
|
if !nodes.contains_key(&snap.node_id) {
|
||||||
|
let nb = builder_owned.for_node(&snap.node_id);
|
||||||
|
let borrowed = nb.as_borrowed();
|
||||||
|
if let Err(e) =
|
||||||
|
publish_all_discovery(&client, &borrowed, &entities).await
|
||||||
|
{
|
||||||
|
warn!("[mqtt] node {} discovery failed: {e}", snap.node_id);
|
||||||
|
}
|
||||||
|
let na = NodeAvailability::for_builder(&borrowed, &entities);
|
||||||
|
if let Err(e) = publish_availability(&client, &na, "online").await {
|
||||||
|
warn!("[mqtt] node {} availability failed: {e}", snap.node_id);
|
||||||
|
}
|
||||||
|
nodes.insert(snap.node_id.clone(), (nb, na));
|
||||||
|
}
|
||||||
|
let borrowed = nodes[&snap.node_id].0.as_borrowed();
|
||||||
|
publish_snapshot(&client, &borrowed, &snap, &cfg, &mut rate_limiter, elapsed).await;
|
||||||
}
|
}
|
||||||
Err(broadcast::error::RecvError::Lagged(n)) => {
|
Err(broadcast::error::RecvError::Lagged(n)) => {
|
||||||
warn!("[mqtt] lagged behind broadcast by {n} messages — dropped");
|
warn!("[mqtt] lagged behind broadcast by {n} messages — dropped");
|
||||||
}
|
}
|
||||||
Err(broadcast::error::RecvError::Closed) => {
|
Err(broadcast::error::RecvError::Closed) => {
|
||||||
info!("[mqtt] broadcast channel closed, draining");
|
info!("[mqtt] broadcast channel closed, draining");
|
||||||
// Publish offline before exit.
|
// Publish offline for every known node before exit.
|
||||||
let _ = publish_availability(&client, &avail, "offline").await;
|
for (_, na) in nodes.values() {
|
||||||
|
let _ = publish_availability(&client, na, "offline").await;
|
||||||
|
}
|
||||||
let _ = client.disconnect().await;
|
let _ = client.disconnect().await;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -296,3 +339,52 @@ async fn publish_state(client: &AsyncClient, m: &StateMessage) -> Result<(), Cli
|
||||||
};
|
};
|
||||||
client.publish(&m.topic, qos, m.retain, m.payload.clone()).await
|
client.publish(&m.topic, qos, m.retain, m.payload.clone()).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod per_node_device_tests {
|
||||||
|
//! Issue #898 — each physical node must surface as its own Home-Assistant
|
||||||
|
//! device, not collapse into one hard-coded device.
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
fn base() -> OwnedDiscoveryBuilder {
|
||||||
|
OwnedDiscoveryBuilder {
|
||||||
|
discovery_prefix: "homeassistant".into(),
|
||||||
|
node_id: "wifi-densepose-1".into(),
|
||||||
|
node_friendly_name: Some("RuView".into()),
|
||||||
|
sw_version: "0.0.0".into(),
|
||||||
|
model: "test".into(),
|
||||||
|
via_device: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn device_identifiers(b: &OwnedDiscoveryBuilder) -> Vec<String> {
|
||||||
|
b.as_borrowed().build(EntityKind::Presence).device.identifiers
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn for_node_overrides_node_id_and_friendly_name() {
|
||||||
|
let n = base().for_node("node-A");
|
||||||
|
assert_eq!(n.node_id, "node-A");
|
||||||
|
assert_eq!(n.node_friendly_name.as_deref(), Some("RuView node node-A"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn distinct_nodes_yield_distinct_ha_device_identifiers() {
|
||||||
|
let b = base();
|
||||||
|
let a = device_identifiers(&b.for_node("node-A"));
|
||||||
|
let c = device_identifiers(&b.for_node("node-B"));
|
||||||
|
assert_eq!(a, vec!["wifi_densepose_node-A".to_string()]);
|
||||||
|
assert_eq!(c, vec!["wifi_densepose_node-B".to_string()]);
|
||||||
|
assert_ne!(a, c, "#898: two nodes must not collapse into one device");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn single_node_keeps_a_stable_identity() {
|
||||||
|
// Two snapshots from the same node map to the same device.
|
||||||
|
let b = base();
|
||||||
|
assert_eq!(
|
||||||
|
device_identifiers(&b.for_node("node-7")),
|
||||||
|
device_identifiers(&b.for_node("node-7"))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -171,12 +171,28 @@ async fn discovery_topics_appear_on_broker() {
|
||||||
// Spawn the publisher.
|
// Spawn the publisher.
|
||||||
let cfg = make_cfg(port, false, "discovery");
|
let cfg = make_cfg(port, false, "discovery");
|
||||||
let builder = make_builder("inttest1");
|
let builder = make_builder("inttest1");
|
||||||
let (_tx, rx) = broadcast::channel::<VitalsSnapshot>(32);
|
let (tx, rx) = broadcast::channel::<VitalsSnapshot>(32);
|
||||||
let _handle = spawn(cfg, builder, rx);
|
let _handle = spawn(cfg, builder, rx);
|
||||||
|
|
||||||
|
// #898: discovery is now published per-node the first time a snapshot for
|
||||||
|
// that node_id arrives (not eagerly at startup). Drive snapshots for
|
||||||
|
// "inttest1" throughout the window so its device's discovery lands — same
|
||||||
|
// pattern as state_messages_published_on_snapshot_broadcast.
|
||||||
|
let tx_bg = tx.clone();
|
||||||
|
let drive = tokio::spawn(async move {
|
||||||
|
for _ in 0..60 {
|
||||||
|
let _ = tx_bg.send(VitalsSnapshot {
|
||||||
|
node_id: "inttest1".into(),
|
||||||
|
..Default::default()
|
||||||
|
});
|
||||||
|
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
// Drain the subscriber for up to 6 s — enough for initial discovery
|
// Drain the subscriber for up to 6 s — enough for initial discovery
|
||||||
// + first availability publication.
|
// + first availability publication.
|
||||||
let msgs = collect_published(&mut sub_loop, Duration::from_secs(6)).await;
|
let msgs = collect_published(&mut sub_loop, Duration::from_secs(6)).await;
|
||||||
|
drive.abort();
|
||||||
let _ = sub.disconnect().await;
|
let _ = sub.disconnect().await;
|
||||||
|
|
||||||
// Assertions: at least the presence + heart_rate + fall discovery
|
// Assertions: at least the presence + heart_rate + fall discovery
|
||||||
|
|
@ -221,10 +237,23 @@ async fn privacy_mode_suppresses_biometric_discovery() {
|
||||||
|
|
||||||
let cfg = make_cfg(port, /* privacy_mode = */ true, "privacy");
|
let cfg = make_cfg(port, /* privacy_mode = */ true, "privacy");
|
||||||
let builder = make_builder("inttest2");
|
let builder = make_builder("inttest2");
|
||||||
let (_tx, rx) = broadcast::channel::<VitalsSnapshot>(32);
|
let (tx, rx) = broadcast::channel::<VitalsSnapshot>(32);
|
||||||
let _handle = spawn(cfg, builder, rx);
|
let _handle = spawn(cfg, builder, rx);
|
||||||
|
|
||||||
|
// #898: per-node discovery is triggered by a snapshot for that node_id.
|
||||||
|
let tx_bg = tx.clone();
|
||||||
|
let drive = tokio::spawn(async move {
|
||||||
|
for _ in 0..60 {
|
||||||
|
let _ = tx_bg.send(VitalsSnapshot {
|
||||||
|
node_id: "inttest2".into(),
|
||||||
|
..Default::default()
|
||||||
|
});
|
||||||
|
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
let msgs = collect_published(&mut sub_loop, Duration::from_secs(6)).await;
|
let msgs = collect_published(&mut sub_loop, Duration::from_secs(6)).await;
|
||||||
|
drive.abort();
|
||||||
let _ = sub.disconnect().await;
|
let _ = sub.disconnect().await;
|
||||||
|
|
||||||
let topics: Vec<&str> = msgs.iter().map(|(t, _, _)| t.as_str()).collect();
|
let topics: Vec<&str> = msgs.iter().map(|(t, _, _)| t.as_str()).collect();
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue