cog-ha-matter (ADR-116 P3): wire publisher::spawn into main.rs

P3 closes the publisher wiring loop. `main.rs` now:

  1. builds `PublisherInputs` from CLI args via the pure helper
     extracted last iter,
  2. opens a `broadcast::channel::<VitalsSnapshot>(256)`,
  3. calls `runtime::spawn_publisher(inputs, rx)` — a thin
     wrapper around ADR-115's `publisher::spawn` that owns the
     `Arc<MqttConfig>` wrap,
  4. holds the tx side so the channel stays open until P3.5
     wires the sensing-server bridge,
  5. awaits Ctrl-C or unexpected publisher exit (logged at WARN).

Two new tests:
  * `spawn_publisher_returns_live_handle_without_broker` — proves
    the wiring compiles and the rumqttc event loop survives an
    unreachable broker (it retries internally; we abort the handle
    inside 100 ms). Catches breakage from a future refactor that
    accidentally pre-validates host reachability.
  * `default_state_channel_capacity_is_reasonable` — locks the
    `DEFAULT_STATE_CHANNEL_CAPACITY = 256` default; a regression to
    e.g. 1 would surface here instead of as a dropped frame in
    production under bursty multi-Seed federation.

12/12 cog-ha-matter tests green (10 → 12).

ADR-116 phase table: P3 flipped from "in progress" to  wiring done,
with the P3.5 follow-up (sensing-server `/v1/snapshot` WS bridge)
explicitly named.

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
ruv 2026-05-23 17:59:02 -04:00
parent 5723f505b7
commit c9f005c360
3 changed files with 97 additions and 6 deletions

View File

@ -94,7 +94,7 @@ Ranked by build cost × user impact:
|---|---|---|
| **P1** | Research dossier ([`docs/research/ADR-116-ha-matter-cog-research.md`](../research/ADR-116-ha-matter-cog-research.md)) | ✅ **done** — 8 sections, 30+ citations, v1 scope ranked |
| **P2** | Cog crate scaffold (`v2/crates/cog-ha-matter/`) — Cargo.toml + `src/{lib,main,manifest}.rs`, workspace member, CLI args, `--print-manifest` flag, 2 manifest unit tests | ✅ **done**`cargo check` + `cargo test` green |
| **P3** | Wrap existing ADR-115 MQTT publisher as cog entry point | in progress — `runtime::build_publisher_inputs` extracted (pure helper, 8 unit tests) converts cog CLI → `MqttConfig + OwnedDiscoveryBuilder`. Next: `tokio::spawn(publisher::run(...))` in `main.rs` |
| **P3** | Wrap existing ADR-115 MQTT publisher as cog entry point | **wiring done**`main.rs` boots ADR-115's `publisher::spawn` via `runtime::spawn_publisher` thin wrapper, holds a long-lived `broadcast::Sender<VitalsSnapshot>`, awaits Ctrl-C. Live-handle test green without a broker. Next (P3.5): subscribe to sensing-server `/v1/snapshot` WS and republish into the channel. |
| **P4** | Seed-native enhancements (embedded broker, mDNS, witness) | pending |
| **P5** | RuVector-backed threshold learning (SONA adaptation) | pending |
| **P6** | Multi-Seed federation (cross-Seed dedup + witness) | pending |

View File

@ -8,7 +8,10 @@
use std::process::ExitCode;
use clap::Parser;
use tracing::info;
use cog_ha_matter::runtime;
use tokio::sync::broadcast;
use tracing::{info, warn};
use wifi_densepose_sensing_server::mqtt::state::VitalsSnapshot;
#[derive(Parser, Debug)]
#[command(
@ -87,8 +90,40 @@ async fn main() -> ExitCode {
return ExitCode::SUCCESS;
}
// P2 stops here — P3 will boot the ADR-115 MQTT publisher in a
// `tokio::spawn` and register the mDNS responder + control plane.
info!("scaffold-only — P3 wires the MQTT publisher next");
// P3: boot the ADR-115 publisher. The broadcast tx is held by
// main so the channel doesn't close before the sensing-server
// bridge (next iter) wires its VitalsSnapshot producer in.
let identity = runtime::CogIdentity::default_for_build();
let inputs = runtime::build_publisher_inputs(
&args.mqtt_host,
args.mqtt_port,
args.privacy_mode,
identity,
);
let (state_tx, state_rx) =
broadcast::channel::<VitalsSnapshot>(runtime::DEFAULT_STATE_CHANNEL_CAPACITY);
let publisher_handle = runtime::spawn_publisher(inputs, state_rx);
info!(
capacity = runtime::DEFAULT_STATE_CHANNEL_CAPACITY,
"publisher spawned — awaiting VitalsSnapshot bridge (P3.5)"
);
// P3.5 (next iter): subscribe to the sensing-server's
// `/v1/snapshot` WebSocket and republish into `state_tx`. Until
// that lands the cog connects to MQTT, advertises discovery,
// and just doesn't have any state to publish — exactly what an
// HA install with no nodes online looks like.
let _ = &state_tx;
// Wait on Ctrl-C so the cog runs as a long-lived daemon under
// the Seed's process supervisor.
tokio::select! {
_ = tokio::signal::ctrl_c() => {
info!("ctrl-c received — shutting down");
}
joined = publisher_handle => {
warn!(?joined, "publisher task exited unexpectedly");
}
}
ExitCode::SUCCESS
}

View File

@ -19,9 +19,13 @@
//! the cog's tests and the `--print-manifest` path can exercise the
//! builder without pulling in the rumqttc event loop.
use std::sync::Arc;
use tokio::{sync::broadcast, task::JoinHandle};
use wifi_densepose_sensing_server::mqtt::{
config::{MqttConfig, PublishRates, TlsConfig},
publisher::OwnedDiscoveryBuilder,
publisher::{self, OwnedDiscoveryBuilder},
state::VitalsSnapshot,
DEFAULT_DISCOVERY_PREFIX, MANUFACTURER,
};
@ -100,6 +104,31 @@ pub fn build_publisher_inputs(
PublisherInputs { config, discovery }
}
/// Default broadcast-channel capacity for the cog's VitalsSnapshot
/// stream. Matches the sensing-server's own default so the cog
/// doesn't bottleneck the publisher under bursty loads (multi-Seed
/// federation, mesh re-sync events).
pub const DEFAULT_STATE_CHANNEL_CAPACITY: usize = 256;
/// Spawn the ADR-115 MQTT publisher with the cog's typed inputs.
///
/// Thin wrapper around [`publisher::spawn`] that:
/// 1. wraps `inputs.config` in `Arc` (publisher requires shared
/// ownership across reconnects),
/// 2. moves `inputs.discovery` into the spawn (publisher clones it
/// per reconnect; `OwnedDiscoveryBuilder` is `Clone`),
/// 3. hands the broadcast receiver across without an intermediate.
///
/// Returning the `JoinHandle` lets `main.rs` await it on shutdown
/// (or `abort()` it from a control-plane handler).
pub fn spawn_publisher(
inputs: PublisherInputs,
state_rx: broadcast::Receiver<VitalsSnapshot>,
) -> JoinHandle<()> {
let PublisherInputs { config, discovery } = inputs;
publisher::spawn(Arc::new(config), discovery, state_rx)
}
#[cfg(test)]
mod tests {
use super::*;
@ -174,6 +203,33 @@ mod tests {
assert!(matches!(out.config.tls, TlsConfig::Off));
}
#[tokio::test]
async fn spawn_publisher_returns_live_handle_without_broker() {
// No real broker on this port — rumqttc retries internally so
// the spawned task stays alive. We just prove the wiring
// compiles + the JoinHandle is not pre-finished. Aborting
// immediately keeps the test under 100 ms.
let inputs = build_publisher_inputs("127.0.0.1", 1, false, id());
let (tx, rx) = broadcast::channel::<VitalsSnapshot>(DEFAULT_STATE_CHANNEL_CAPACITY);
let handle = spawn_publisher(inputs, rx);
// Task is still running (not pre-finished by config validation).
assert!(!handle.is_finished());
// Keep `tx` alive past the handle abort so the receiver side
// doesn't panic on drop before the task notices the channel
// closed.
handle.abort();
let _ = handle.await; // joined, may be Err(Cancelled) — OK.
drop(tx);
}
#[test]
fn default_state_channel_capacity_is_reasonable() {
// Lock the default so a regression to e.g. 1 surfaces a named
// test. Multi-Seed federation needs headroom for bursty
// mesh re-sync events.
assert!(DEFAULT_STATE_CHANNEL_CAPACITY >= 64);
}
#[test]
fn default_identity_carries_pkg_version_and_pid() {
let identity = CogIdentity::default_for_build();