feat(adr-118/p5.7): publish_discovery bootstrap helper (193/193 GREEN)
Iter 27. The free function that closes the discovery ↔ state loop on
the publishing side. Mirrors publish_event from iter 22 but for the
HA-DISCO config payloads from iter 26.
Added (in src/ha_discovery.rs, gated on `feature = "std"`):
- publish_discovery<P: Publish>(publisher, node_id, class) -> Result<usize, P::Error>
Renders the per-class discovery payloads (iter 26) and forwards
each through publisher.publish(). Returns the count or short-
circuits on first error.
Docstring documents the canonical bootstrap pattern: separate
retain-true publisher for discovery, retain-false publisher for state,
both sharing the same broker connection if desired.
- pub use publish_discovery from lib.rs
tests/ha_discovery_publish.rs (6 named tests, all green):
publish_discovery_returns_six_for_anonymous_class
publish_discovery_returns_five_for_restricted_class
(no identity_risk in captured topics)
publish_discovery_returns_zero_for_raw_and_derived
(HA-DISCO + class gating composition: raw / derived never
advertised to HA)
publish_discovery_topics_are_homeassistant_config_format
publish_discovery_short_circuits_on_publisher_error
(FailingPub fails on 4th publish; first 3 messages land, then error)
bootstrap_pattern_publishes_discovery_then_state_through_shared_publisher
*** End-to-end bootstrap proof: one Arc<Mutex<CapturePublisher>>
used for both discovery (publish_discovery) and state
(BfldPipelineHandle::spawn + send). Asserts:
- 6 + 5 = 11 messages captured in order
- First 6 topics are homeassistant/.../config
- Next 5 topics are ruview/<node>/bfld/.../state
Validates the iter-25 Arc<Mutex<P>> Publish adapter + iter-26
discovery + iter-27 bootstrap helper compose correctly. ***
ACs progressed:
- ADR-122 §2.1 — bootstrap surface complete. Operator writes one
publish_discovery call at startup, then BfldPipelineHandle::send for
every frame. HA finds the device on first restart after discovery
was retained on the broker.
- ADR-122 AC1 (six entities per node) — discovery and state phases
share the same six-entity definition; the bootstrap test proves they
reach the broker in the documented order.
Test config:
- cargo test --no-default-features → 72 passed (publish_discovery cfg-out)
- cargo test → 193 passed (187 + 6)
Out of scope (next iter target):
- GitHub Actions workflow with mosquitto Docker service. Without this
the iter-24 live integration test stays in skip mode in CI; with it,
every PR would prove the full publish_discovery + handle stack works
end-to-end against a real broker.
- HA blueprint shipping (ADR-122 §2.6): three operator-ready YAML
blueprints (presence-driven lighting / motion-aware HVAC / identity-
risk anomaly notification) packaged in cog-ha-matter/blueprints/.
Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
parent
05609ef51c
commit
d356e1d5fd
|
|
@ -13,9 +13,48 @@
|
|||
|
||||
#![cfg(feature = "std")]
|
||||
|
||||
use crate::mqtt_topics::TopicMessage;
|
||||
use crate::mqtt_topics::{Publish, TopicMessage};
|
||||
use crate::PrivacyClass;
|
||||
|
||||
/// Bootstrap helper: render the per-node HA-DISCO config payloads and forward
|
||||
/// each through `publisher`. Returns the count published, or short-circuits
|
||||
/// on the first publisher error.
|
||||
///
|
||||
/// Typical bootstrap pattern combining iter 25's `Arc<Mutex<P>>` adapter and
|
||||
/// iter 23's retain-aware `RumqttPublisher`:
|
||||
///
|
||||
/// ```ignore
|
||||
/// use std::sync::{Arc, Mutex};
|
||||
/// use wifi_densepose_bfld::{
|
||||
/// publish_discovery, BfldConfig, BfldPipeline, BfldPipelineHandle,
|
||||
/// PrivacyClass, RumqttPublisher,
|
||||
/// };
|
||||
/// use rumqttc::MqttOptions;
|
||||
///
|
||||
/// let opts = MqttOptions::new("seed-01", "broker.local", 1883);
|
||||
/// let (retained_pub, _conn) = RumqttPublisher::connect(opts.clone(), 64);
|
||||
/// let mut retained_pub = retained_pub.with_retain(true);
|
||||
/// publish_discovery(&mut retained_pub, "seed-01", PrivacyClass::Anonymous)?;
|
||||
///
|
||||
/// let (state_pub, _conn) = RumqttPublisher::connect(opts, 64);
|
||||
/// let pipeline = BfldPipeline::new(BfldConfig::new("seed-01"));
|
||||
/// let handle = BfldPipelineHandle::spawn(pipeline, state_pub);
|
||||
/// // handle.send(...) from now on
|
||||
/// # Ok::<(), rumqttc::ClientError>(())
|
||||
/// ```
|
||||
pub fn publish_discovery<P: Publish>(
|
||||
publisher: &mut P,
|
||||
node_id: &str,
|
||||
class: PrivacyClass,
|
||||
) -> Result<usize, P::Error> {
|
||||
let mut count = 0;
|
||||
for msg in render_discovery_payloads(node_id, class) {
|
||||
publisher.publish(&msg)?;
|
||||
count += 1;
|
||||
}
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
/// Render every HA-DISCO config message for the given node at `class`. Returns
|
||||
/// an empty `Vec` for classes < `Anonymous` (HA doesn't see raw / derived).
|
||||
#[must_use]
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ pub use emitter::{BfldEmitter, SensingInputs};
|
|||
#[cfg(feature = "std")]
|
||||
pub use event::BfldEvent;
|
||||
#[cfg(feature = "std")]
|
||||
pub use ha_discovery::render_discovery_payloads;
|
||||
pub use ha_discovery::{publish_discovery, render_discovery_payloads};
|
||||
#[cfg(feature = "std")]
|
||||
pub use mqtt_topics::{publish_event, render_events, CapturePublisher, Publish, TopicMessage};
|
||||
#[cfg(feature = "mqtt")]
|
||||
|
|
|
|||
|
|
@ -0,0 +1,139 @@
|
|||
//! Acceptance tests for `publish_discovery` bootstrap helper. ADR-122 §2.1.
|
||||
|
||||
#![cfg(feature = "std")]
|
||||
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
use wifi_densepose_bfld::{
|
||||
publish_discovery, BfldConfig, BfldPipeline, BfldPipelineHandle, CapturePublisher,
|
||||
IdentityEmbedding, PipelineInput, PrivacyClass, Publish, SensingInputs, TopicMessage,
|
||||
EMBEDDING_DIM,
|
||||
};
|
||||
|
||||
#[test]
|
||||
fn publish_discovery_returns_six_for_anonymous_class() {
|
||||
let mut p = CapturePublisher::default();
|
||||
let count = publish_discovery(&mut p, "seed-01", PrivacyClass::Anonymous).unwrap();
|
||||
assert_eq!(count, 6);
|
||||
assert_eq!(p.published.len(), 6);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn publish_discovery_returns_five_for_restricted_class() {
|
||||
let mut p = CapturePublisher::default();
|
||||
let count = publish_discovery(&mut p, "seed-01", PrivacyClass::Restricted).unwrap();
|
||||
assert_eq!(count, 5);
|
||||
assert!(
|
||||
!p.published
|
||||
.iter()
|
||||
.any(|m| m.topic.contains("identity_risk")),
|
||||
"Restricted must not publish identity_risk discovery",
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn publish_discovery_returns_zero_for_raw_and_derived() {
|
||||
for class in [PrivacyClass::Raw, PrivacyClass::Derived] {
|
||||
let mut p = CapturePublisher::default();
|
||||
let count = publish_discovery(&mut p, "seed-01", class).unwrap();
|
||||
assert_eq!(count, 0);
|
||||
assert!(p.published.is_empty());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn publish_discovery_topics_are_homeassistant_config_format() {
|
||||
let mut p = CapturePublisher::default();
|
||||
publish_discovery(&mut p, "seed-99", PrivacyClass::Anonymous).unwrap();
|
||||
for msg in &p.published {
|
||||
assert!(msg.topic.starts_with("homeassistant/"));
|
||||
assert!(msg.topic.ends_with("/config"));
|
||||
assert!(msg.topic.contains("seed-99_bfld_"));
|
||||
}
|
||||
}
|
||||
|
||||
// --- error propagation --------------------------------------------------
|
||||
|
||||
struct FailingPub {
|
||||
sent: usize,
|
||||
fails_after: usize,
|
||||
}
|
||||
impl Publish for FailingPub {
|
||||
type Error = &'static str;
|
||||
fn publish(&mut self, _msg: &TopicMessage) -> Result<(), Self::Error> {
|
||||
if self.sent >= self.fails_after {
|
||||
return Err("broker offline");
|
||||
}
|
||||
self.sent += 1;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn publish_discovery_short_circuits_on_publisher_error() {
|
||||
let mut p = FailingPub {
|
||||
sent: 0,
|
||||
fails_after: 3,
|
||||
};
|
||||
let result = publish_discovery(&mut p, "seed-01", PrivacyClass::Anonymous);
|
||||
assert_eq!(result, Err("broker offline"));
|
||||
assert_eq!(p.sent, 3, "exactly 3 messages should land before the error");
|
||||
}
|
||||
|
||||
// --- bootstrap pattern integration with BfldPipelineHandle --------------
|
||||
|
||||
fn sample_input() -> PipelineInput {
|
||||
PipelineInput {
|
||||
inputs: SensingInputs {
|
||||
timestamp_ns: 0,
|
||||
presence: true,
|
||||
motion: 0.4,
|
||||
person_count: 1,
|
||||
sensing_confidence: 0.9,
|
||||
sep: 0.2,
|
||||
stab: 0.2,
|
||||
consist: 0.2,
|
||||
risk_conf: 0.2,
|
||||
rf_signature_hash: None,
|
||||
},
|
||||
embedding: Some(IdentityEmbedding::from_raw([0.05; EMBEDDING_DIM])),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bootstrap_pattern_publishes_discovery_then_state_through_shared_publisher() {
|
||||
// Single Arc<Mutex<CapturePublisher>> shared between discovery bootstrap
|
||||
// and the iter-25 worker handle. After both phases, the publisher's
|
||||
// captured log holds discovery first, state second.
|
||||
let pub_arc = Arc::new(Mutex::new(CapturePublisher::default()));
|
||||
|
||||
// Phase 1: discovery (would be retained=true with a real broker).
|
||||
let count = publish_discovery(&mut pub_arc.clone(), "seed-01", PrivacyClass::Anonymous)
|
||||
.expect("discovery publish");
|
||||
assert_eq!(count, 6);
|
||||
|
||||
// Phase 2: spawn the handle with the same publisher. Pipeline emit drives
|
||||
// 5 state messages (Anonymous + no zone).
|
||||
let pipeline = BfldPipeline::new(BfldConfig::new("seed-01"));
|
||||
let handle = BfldPipelineHandle::spawn(pipeline, pub_arc.clone());
|
||||
handle.send(sample_input()).expect("send");
|
||||
thread::sleep(Duration::from_millis(50));
|
||||
handle.shutdown();
|
||||
|
||||
let log = pub_arc.lock().unwrap();
|
||||
assert_eq!(
|
||||
log.published.len(),
|
||||
6 + 5,
|
||||
"6 discovery + 5 state messages should be in the log",
|
||||
);
|
||||
|
||||
// First 6 are discovery (homeassistant/...), next 5 are state (ruview/...).
|
||||
for msg in log.published.iter().take(6) {
|
||||
assert!(msg.topic.starts_with("homeassistant/"), "got {}", msg.topic);
|
||||
}
|
||||
for msg in log.published.iter().skip(6) {
|
||||
assert!(msg.topic.starts_with("ruview/"), "got {}", msg.topic);
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue