From f674efff9df348027a255e824e260ca861158a76 Mon Sep 17 00:00:00 2001 From: ruv Date: Sun, 24 May 2026 16:47:11 -0400 Subject: [PATCH] =?UTF-8?q?feat(adr-118/p5.1):=20MQTT=20topic=20router=20(?= =?UTF-8?q?BfldEvent=20=E2=86=92=20Vec)=20=E2=80=94=20162/16?= =?UTF-8?q?2=20GREEN?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Iter 21. Lands ADR-122 §2.2 topic shape + class-gated routing as a pure function. No broker dep yet — that lands in iter 22 with tokio + rumqttc behind an `mqtt` feature. This iter is the routing policy, separated for testability. Added (gated on `feature = "std"`): - src/mqtt_topics.rs: * TopicMessage { topic: String, payload: String } * TopicMessage::ruview_topic(node, entity) builds the canonical `ruview//bfld//state` shape * render_events(&BfldEvent) -> Vec: class < Anonymous (0/1): returns empty (raw/derived are local only) class >= Anonymous (2/3): emits presence + motion + person_count + confidence, plus zone_activity if zone_id set class == Anonymous (2) ONLY: also emits identity_risk class == Restricted (3): identity_risk is suppressed even with score - pub use render_events, TopicMessage from lib.rs Payload encoding: - presence: "true" | "false" - motion: "{:.6}" — fixed-precision decimal in [0.0, 1.0] - person_count: bare integer string - confidence: "{:.6}" - zone_activity: JSON-string with quotes — "\"living_room\"" - identity_risk: "{:.6}" tests/mqtt_topic_routing.rs (10 named tests, all green): topic_format_is_ruview_node_bfld_entity_state anonymous_class_publishes_six_topics_with_zone (6 = presence/motion/count/conf/zone/identity_risk) anonymous_class_without_zone_omits_zone_activity_topic (5 topics) restricted_class_omits_identity_risk_topic (class 3 → 5 topics, no risk) raw_and_derived_classes_publish_nothing *** structural enforcement of "raw stays local" at the topic layer *** presence_payload_is_lowercase_json_bool motion_payload_is_fixed_precision_decimal person_count_payload_is_bare_integer zone_payload_is_json_string_with_quotes identity_risk_payload_is_fixed_precision_decimal ACs progressed: - ADR-122 §2.2 topic shape now matches the documented format byte-for-byte. - ADR-122 AC4 — per-class topic gating: classes 2 / 3 publish disjoint sets, with identity_risk uniquely guarded. - ADR-118 invariant I1 reaching the public surface — Raw frames produce zero topic messages, so even a buggy publisher loop cannot leak them. Test config: - cargo test --no-default-features → 72 passed (mqtt_topics cfg-out) - cargo test → 162 passed (152 + 10) Out of scope (next iter target): - tokio + rumqttc behind a new `mqtt` feature gate - BfldPipelineHandle: Arc> + a tokio task that pumps inbound SensingInputs, runs render_events on each emitted BfldEvent, and calls client.publish() for each TopicMessage - mosquitto integration test pattern (cf. feedback_mqtt_integration_test_patterns memory: per-test client_id, pump until SubAck, wait for publisher discovery) Co-Authored-By: claude-flow --- v2/crates/wifi-densepose-bfld/src/lib.rs | 4 + .../wifi-densepose-bfld/src/mqtt_topics.rs | 102 +++++++++++++ .../tests/mqtt_topic_routing.rs | 138 ++++++++++++++++++ 3 files changed, 244 insertions(+) create mode 100644 v2/crates/wifi-densepose-bfld/src/mqtt_topics.rs create mode 100644 v2/crates/wifi-densepose-bfld/tests/mqtt_topic_routing.rs diff --git a/v2/crates/wifi-densepose-bfld/src/lib.rs b/v2/crates/wifi-densepose-bfld/src/lib.rs index 56d8eaf3..68fd1a2a 100644 --- a/v2/crates/wifi-densepose-bfld/src/lib.rs +++ b/v2/crates/wifi-densepose-bfld/src/lib.rs @@ -22,6 +22,8 @@ pub mod emitter; pub mod event; pub mod frame; #[cfg(feature = "std")] +pub mod mqtt_topics; +#[cfg(feature = "std")] pub mod identity_features; pub mod identity_risk; #[cfg(feature = "std")] @@ -38,6 +40,8 @@ pub use coherence_gate::{CoherenceGate, MatchOutcome, NullOracle, SoulMatchOracl pub use emitter::{BfldEmitter, SensingInputs}; #[cfg(feature = "std")] pub use event::BfldEvent; +#[cfg(feature = "std")] +pub use mqtt_topics::{render_events, TopicMessage}; pub use embedding::{IdentityEmbedding, EMBEDDING_DIM}; pub use embedding_ring::{EmbeddingRing, RING_CAPACITY}; #[cfg(feature = "std")] diff --git a/v2/crates/wifi-densepose-bfld/src/mqtt_topics.rs b/v2/crates/wifi-densepose-bfld/src/mqtt_topics.rs new file mode 100644 index 00000000..cf7c751d --- /dev/null +++ b/v2/crates/wifi-densepose-bfld/src/mqtt_topics.rs @@ -0,0 +1,102 @@ +//! MQTT topic router. ADR-122 §2.2. +//! +//! Pure-function module that maps a [`BfldEvent`] into a list of per-entity +//! MQTT topic + payload pairs. No broker dependency lives here — the actual +//! `publish` call is a thin wrapper around `Client::publish(topic, payload)` +//! once a broker integration lands (deferred to a follow-up iter). +//! +//! Topic shape (ADR-122 §2.2): +//! +//! ```text +//! ruview//bfld/presence/state # class >= 2 +//! ruview//bfld/motion/state # class >= 2 +//! ruview//bfld/person_count/state # class >= 2 +//! ruview//bfld/zone_activity/state # class >= 2 (when zone_id set) +//! ruview//bfld/confidence/state # class >= 2 +//! ruview//bfld/identity_risk/state # class == 2 only +//! ``` +//! +//! `raw` (class-1) and `availability` topics are intentionally not yet emitted +//! by this router; they belong to the broker-connection lifecycle, not to the +//! per-event publish loop. + +#![cfg(feature = "std")] + +use crate::{BfldEvent, PrivacyClass}; + +/// Per-topic MQTT message ready to feed into `Client::publish(topic, payload)`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TopicMessage { + /// Full MQTT topic, e.g. `ruview/seed-01/bfld/presence/state`. + pub topic: String, + /// UTF-8 payload bytes — single JSON scalar (`true`, `0.72`, `"living_room"`) + /// or a compact JSON object for diagnostics. + pub payload: String, +} + +impl TopicMessage { + /// Build a topic of the form `ruview//bfld//state`. + #[must_use] + pub fn ruview_topic(node_id: &str, entity: &str) -> String { + let mut s = String::with_capacity(7 + node_id.len() + 6 + entity.len() + 6); + s.push_str("ruview/"); + s.push_str(node_id); + s.push_str("/bfld/"); + s.push_str(entity); + s.push_str("/state"); + s + } +} + +/// Render an event into the per-entity MQTT messages it should publish. Returns +/// an empty vec for events that fail the class gate (e.g., raw class 0). +#[must_use] +pub fn render_events(event: &BfldEvent) -> Vec { + let class_byte = event.privacy_class.as_u8(); + if class_byte < PrivacyClass::Anonymous.as_u8() { + // Raw + Derived stay local — never published on the public topic tree. + return Vec::new(); + } + + let mut out = Vec::with_capacity(6); + let node = &event.node_id; + + out.push(TopicMessage { + topic: TopicMessage::ruview_topic(node, "presence"), + payload: if event.presence { "true".into() } else { "false".into() }, + }); + out.push(TopicMessage { + topic: TopicMessage::ruview_topic(node, "motion"), + payload: format!("{:.6}", event.motion), + }); + out.push(TopicMessage { + topic: TopicMessage::ruview_topic(node, "person_count"), + payload: format!("{}", event.person_count), + }); + out.push(TopicMessage { + topic: TopicMessage::ruview_topic(node, "confidence"), + payload: format!("{:.6}", event.confidence), + }); + + if let Some(zone) = &event.zone_id { + // Emit a JSON string so consumers can distinguish "no zone" (omitted) + // from "single-zone deployment" (always the same zone string). + out.push(TopicMessage { + topic: TopicMessage::ruview_topic(node, "zone_activity"), + payload: format!("\"{zone}\""), + }); + } + + // Identity risk is only published at exactly class 2 (Anonymous). Class 3 + // (Restricted) computes the score internally but never emits it. + if class_byte == PrivacyClass::Anonymous.as_u8() { + if let Some(score) = event.identity_risk_score { + out.push(TopicMessage { + topic: TopicMessage::ruview_topic(node, "identity_risk"), + payload: format!("{score:.6}"), + }); + } + } + + out +} diff --git a/v2/crates/wifi-densepose-bfld/tests/mqtt_topic_routing.rs b/v2/crates/wifi-densepose-bfld/tests/mqtt_topic_routing.rs new file mode 100644 index 00000000..ec24e0aa --- /dev/null +++ b/v2/crates/wifi-densepose-bfld/tests/mqtt_topic_routing.rs @@ -0,0 +1,138 @@ +//! Acceptance tests for ADR-122 §2.2 — MQTT topic routing. + +#![cfg(feature = "std")] + +use wifi_densepose_bfld::{render_events, BfldEvent, PrivacyClass, TopicMessage}; + +fn sample_event(class: PrivacyClass, with_zone: bool) -> BfldEvent { + BfldEvent::with_privacy_gating( + "seed-01".into(), + 1_700_000_000_000_000_000, + true, + 0.72, + 2, + 0.91, + if with_zone { Some("living_room".into()) } else { None }, + class, + Some(0.34), + Some([0xAB; 32]), + ) +} + +fn topics_for(class: PrivacyClass) -> Vec { + render_events(&sample_event(class, true)) + .into_iter() + .map(|m| m.topic) + .collect() +} + +// --- topic shape --------------------------------------------------------- + +#[test] +fn topic_format_is_ruview_node_bfld_entity_state() { + let t = TopicMessage::ruview_topic("seed-42", "presence"); + assert_eq!(t, "ruview/seed-42/bfld/presence/state"); +} + +#[test] +fn anonymous_class_publishes_six_topics_with_zone() { + let topics = topics_for(PrivacyClass::Anonymous); + assert_eq!(topics.len(), 6, "got {topics:?}"); + let expected: Vec<&str> = vec![ + "ruview/seed-01/bfld/presence/state", + "ruview/seed-01/bfld/motion/state", + "ruview/seed-01/bfld/person_count/state", + "ruview/seed-01/bfld/confidence/state", + "ruview/seed-01/bfld/zone_activity/state", + "ruview/seed-01/bfld/identity_risk/state", + ]; + for t in &expected { + assert!(topics.contains(&t.to_string()), "missing topic {t}"); + } +} + +#[test] +fn anonymous_class_without_zone_omits_zone_activity_topic() { + let topics: Vec = render_events(&sample_event(PrivacyClass::Anonymous, false)) + .into_iter() + .map(|m| m.topic) + .collect(); + assert!(!topics.iter().any(|t| t.contains("zone_activity"))); + assert_eq!(topics.len(), 5); +} + +// --- class-gated routing ------------------------------------------------- + +#[test] +fn restricted_class_omits_identity_risk_topic() { + let topics = topics_for(PrivacyClass::Restricted); + assert!( + !topics.iter().any(|t| t.contains("identity_risk")), + "Restricted (class 3) must NOT publish identity_risk: {topics:?}", + ); + // Other entities still present. + assert!(topics.iter().any(|t| t.contains("presence"))); + assert!(topics.iter().any(|t| t.contains("motion"))); +} + +#[test] +fn raw_and_derived_classes_publish_nothing() { + // Raw (0) and Derived (1) are local-only / research — never on the + // public topic tree. + let raw = render_events(&sample_event(PrivacyClass::Raw, true)); + assert!(raw.is_empty(), "Raw class must publish nothing"); + let derived = render_events(&sample_event(PrivacyClass::Derived, true)); + assert!(derived.is_empty(), "Derived class must publish nothing"); +} + +// --- payload shape ------------------------------------------------------- + +#[test] +fn presence_payload_is_lowercase_json_bool() { + let msgs = render_events(&sample_event(PrivacyClass::Anonymous, false)); + let pres = msgs + .iter() + .find(|m| m.topic.contains("presence")) + .expect("presence topic"); + assert_eq!(pres.payload, "true"); +} + +#[test] +fn motion_payload_is_fixed_precision_decimal() { + let msgs = render_events(&sample_event(PrivacyClass::Anonymous, false)); + let motion = msgs + .iter() + .find(|m| m.topic.contains("motion")) + .expect("motion topic"); + assert_eq!(motion.payload, "0.720000"); +} + +#[test] +fn person_count_payload_is_bare_integer() { + let msgs = render_events(&sample_event(PrivacyClass::Anonymous, false)); + let pc = msgs + .iter() + .find(|m| m.topic.contains("person_count")) + .expect("person_count topic"); + assert_eq!(pc.payload, "2"); +} + +#[test] +fn zone_payload_is_json_string_with_quotes() { + let msgs = render_events(&sample_event(PrivacyClass::Anonymous, true)); + let zone = msgs + .iter() + .find(|m| m.topic.contains("zone_activity")) + .expect("zone_activity topic"); + assert_eq!(zone.payload, "\"living_room\""); +} + +#[test] +fn identity_risk_payload_is_fixed_precision_decimal() { + let msgs = render_events(&sample_event(PrivacyClass::Anonymous, false)); + let risk = msgs + .iter() + .find(|m| m.topic.contains("identity_risk")) + .expect("identity_risk topic"); + assert_eq!(risk.payload, "0.340000"); +}