feat(adr-118/p5.1): MQTT topic router (BfldEvent → Vec<TopicMessage>) — 162/162 GREEN

Iter 21. Lands ADR-122 §2.2 topic shape + class-gated routing as a pure
function. No broker dep yet — that lands in iter 22 with tokio + rumqttc
behind an `mqtt` feature. This iter is the routing policy, separated for
testability.

Added (gated on `feature = "std"`):
- src/mqtt_topics.rs:
  * TopicMessage { topic: String, payload: String }
  * TopicMessage::ruview_topic(node, entity) builds the canonical
    `ruview/<node>/bfld/<entity>/state` shape
  * render_events(&BfldEvent) -> Vec<TopicMessage>:
      class < Anonymous (0/1): returns empty (raw/derived are local only)
      class >= Anonymous (2/3): emits presence + motion + person_count +
        confidence, plus zone_activity if zone_id set
      class == Anonymous (2) ONLY: also emits identity_risk
      class == Restricted (3): identity_risk is suppressed even with score
- pub use render_events, TopicMessage from lib.rs

Payload encoding:
- presence:     "true" | "false"
- motion:       "{:.6}" — fixed-precision decimal in [0.0, 1.0]
- person_count: bare integer string
- confidence:   "{:.6}"
- zone_activity: JSON-string with quotes — "\"living_room\""
- identity_risk: "{:.6}"

tests/mqtt_topic_routing.rs (10 named tests, all green):
  topic_format_is_ruview_node_bfld_entity_state
  anonymous_class_publishes_six_topics_with_zone
    (6 = presence/motion/count/conf/zone/identity_risk)
  anonymous_class_without_zone_omits_zone_activity_topic (5 topics)
  restricted_class_omits_identity_risk_topic (class 3 → 5 topics, no risk)
  raw_and_derived_classes_publish_nothing
    *** structural enforcement of "raw stays local" at the topic layer ***
  presence_payload_is_lowercase_json_bool
  motion_payload_is_fixed_precision_decimal
  person_count_payload_is_bare_integer
  zone_payload_is_json_string_with_quotes
  identity_risk_payload_is_fixed_precision_decimal

ACs progressed:
- ADR-122 §2.2 topic shape now matches the documented format byte-for-byte.
- ADR-122 AC4 — per-class topic gating: classes 2 / 3 publish disjoint
  sets, with identity_risk uniquely guarded.
- ADR-118 invariant I1 reaching the public surface — Raw frames produce
  zero topic messages, so even a buggy publisher loop cannot leak them.

Test config:
- cargo test --no-default-features → 72 passed (mqtt_topics cfg-out)
- cargo test                       → 162 passed (152 + 10)

Out of scope (next iter target):
- tokio + rumqttc behind a new `mqtt` feature gate
- BfldPipelineHandle: Arc<Mutex<BfldPipeline>> + a tokio task that pumps
  inbound SensingInputs, runs render_events on each emitted BfldEvent,
  and calls client.publish() for each TopicMessage
- mosquitto integration test pattern (cf. feedback_mqtt_integration_test_patterns
  memory: per-test client_id, pump until SubAck, wait for publisher discovery)

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
ruv 2026-05-24 16:47:11 -04:00
parent 24f63466c1
commit f674efff9d
3 changed files with 244 additions and 0 deletions

View File

@ -22,6 +22,8 @@ pub mod emitter;
pub mod event;
pub mod frame;
#[cfg(feature = "std")]
pub mod mqtt_topics;
#[cfg(feature = "std")]
pub mod identity_features;
pub mod identity_risk;
#[cfg(feature = "std")]
@ -38,6 +40,8 @@ pub use coherence_gate::{CoherenceGate, MatchOutcome, NullOracle, SoulMatchOracl
pub use emitter::{BfldEmitter, SensingInputs};
#[cfg(feature = "std")]
pub use event::BfldEvent;
#[cfg(feature = "std")]
pub use mqtt_topics::{render_events, TopicMessage};
pub use embedding::{IdentityEmbedding, EMBEDDING_DIM};
pub use embedding_ring::{EmbeddingRing, RING_CAPACITY};
#[cfg(feature = "std")]

View File

@ -0,0 +1,102 @@
//! MQTT topic router. ADR-122 §2.2.
//!
//! Pure-function module that maps a [`BfldEvent`] into a list of per-entity
//! MQTT topic + payload pairs. No broker dependency lives here — the actual
//! `publish` call is a thin wrapper around `Client::publish(topic, payload)`
//! once a broker integration lands (deferred to a follow-up iter).
//!
//! Topic shape (ADR-122 §2.2):
//!
//! ```text
//! ruview/<node_id>/bfld/presence/state # class >= 2
//! ruview/<node_id>/bfld/motion/state # class >= 2
//! ruview/<node_id>/bfld/person_count/state # class >= 2
//! ruview/<node_id>/bfld/zone_activity/state # class >= 2 (when zone_id set)
//! ruview/<node_id>/bfld/confidence/state # class >= 2
//! ruview/<node_id>/bfld/identity_risk/state # class == 2 only
//! ```
//!
//! `raw` (class-1) and `availability` topics are intentionally not yet emitted
//! by this router; they belong to the broker-connection lifecycle, not to the
//! per-event publish loop.
#![cfg(feature = "std")]
use crate::{BfldEvent, PrivacyClass};
/// Per-topic MQTT message ready to feed into `Client::publish(topic, payload)`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TopicMessage {
/// Full MQTT topic, e.g. `ruview/seed-01/bfld/presence/state`.
pub topic: String,
/// UTF-8 payload bytes — single JSON scalar (`true`, `0.72`, `"living_room"`)
/// or a compact JSON object for diagnostics.
pub payload: String,
}
impl TopicMessage {
/// Build a topic of the form `ruview/<node_id>/bfld/<suffix>/state`.
#[must_use]
pub fn ruview_topic(node_id: &str, entity: &str) -> String {
let mut s = String::with_capacity(7 + node_id.len() + 6 + entity.len() + 6);
s.push_str("ruview/");
s.push_str(node_id);
s.push_str("/bfld/");
s.push_str(entity);
s.push_str("/state");
s
}
}
/// Render an event into the per-entity MQTT messages it should publish. Returns
/// an empty vec for events that fail the class gate (e.g., raw class 0).
#[must_use]
pub fn render_events(event: &BfldEvent) -> Vec<TopicMessage> {
let class_byte = event.privacy_class.as_u8();
if class_byte < PrivacyClass::Anonymous.as_u8() {
// Raw + Derived stay local — never published on the public topic tree.
return Vec::new();
}
let mut out = Vec::with_capacity(6);
let node = &event.node_id;
out.push(TopicMessage {
topic: TopicMessage::ruview_topic(node, "presence"),
payload: if event.presence { "true".into() } else { "false".into() },
});
out.push(TopicMessage {
topic: TopicMessage::ruview_topic(node, "motion"),
payload: format!("{:.6}", event.motion),
});
out.push(TopicMessage {
topic: TopicMessage::ruview_topic(node, "person_count"),
payload: format!("{}", event.person_count),
});
out.push(TopicMessage {
topic: TopicMessage::ruview_topic(node, "confidence"),
payload: format!("{:.6}", event.confidence),
});
if let Some(zone) = &event.zone_id {
// Emit a JSON string so consumers can distinguish "no zone" (omitted)
// from "single-zone deployment" (always the same zone string).
out.push(TopicMessage {
topic: TopicMessage::ruview_topic(node, "zone_activity"),
payload: format!("\"{zone}\""),
});
}
// Identity risk is only published at exactly class 2 (Anonymous). Class 3
// (Restricted) computes the score internally but never emits it.
if class_byte == PrivacyClass::Anonymous.as_u8() {
if let Some(score) = event.identity_risk_score {
out.push(TopicMessage {
topic: TopicMessage::ruview_topic(node, "identity_risk"),
payload: format!("{score:.6}"),
});
}
}
out
}

View File

@ -0,0 +1,138 @@
//! Acceptance tests for ADR-122 §2.2 — MQTT topic routing.
#![cfg(feature = "std")]
use wifi_densepose_bfld::{render_events, BfldEvent, PrivacyClass, TopicMessage};
fn sample_event(class: PrivacyClass, with_zone: bool) -> BfldEvent {
BfldEvent::with_privacy_gating(
"seed-01".into(),
1_700_000_000_000_000_000,
true,
0.72,
2,
0.91,
if with_zone { Some("living_room".into()) } else { None },
class,
Some(0.34),
Some([0xAB; 32]),
)
}
fn topics_for(class: PrivacyClass) -> Vec<String> {
render_events(&sample_event(class, true))
.into_iter()
.map(|m| m.topic)
.collect()
}
// --- topic shape ---------------------------------------------------------
#[test]
fn topic_format_is_ruview_node_bfld_entity_state() {
let t = TopicMessage::ruview_topic("seed-42", "presence");
assert_eq!(t, "ruview/seed-42/bfld/presence/state");
}
#[test]
fn anonymous_class_publishes_six_topics_with_zone() {
let topics = topics_for(PrivacyClass::Anonymous);
assert_eq!(topics.len(), 6, "got {topics:?}");
let expected: Vec<&str> = vec![
"ruview/seed-01/bfld/presence/state",
"ruview/seed-01/bfld/motion/state",
"ruview/seed-01/bfld/person_count/state",
"ruview/seed-01/bfld/confidence/state",
"ruview/seed-01/bfld/zone_activity/state",
"ruview/seed-01/bfld/identity_risk/state",
];
for t in &expected {
assert!(topics.contains(&t.to_string()), "missing topic {t}");
}
}
#[test]
fn anonymous_class_without_zone_omits_zone_activity_topic() {
let topics: Vec<String> = render_events(&sample_event(PrivacyClass::Anonymous, false))
.into_iter()
.map(|m| m.topic)
.collect();
assert!(!topics.iter().any(|t| t.contains("zone_activity")));
assert_eq!(topics.len(), 5);
}
// --- class-gated routing -------------------------------------------------
#[test]
fn restricted_class_omits_identity_risk_topic() {
let topics = topics_for(PrivacyClass::Restricted);
assert!(
!topics.iter().any(|t| t.contains("identity_risk")),
"Restricted (class 3) must NOT publish identity_risk: {topics:?}",
);
// Other entities still present.
assert!(topics.iter().any(|t| t.contains("presence")));
assert!(topics.iter().any(|t| t.contains("motion")));
}
#[test]
fn raw_and_derived_classes_publish_nothing() {
// Raw (0) and Derived (1) are local-only / research — never on the
// public topic tree.
let raw = render_events(&sample_event(PrivacyClass::Raw, true));
assert!(raw.is_empty(), "Raw class must publish nothing");
let derived = render_events(&sample_event(PrivacyClass::Derived, true));
assert!(derived.is_empty(), "Derived class must publish nothing");
}
// --- payload shape -------------------------------------------------------
#[test]
fn presence_payload_is_lowercase_json_bool() {
let msgs = render_events(&sample_event(PrivacyClass::Anonymous, false));
let pres = msgs
.iter()
.find(|m| m.topic.contains("presence"))
.expect("presence topic");
assert_eq!(pres.payload, "true");
}
#[test]
fn motion_payload_is_fixed_precision_decimal() {
let msgs = render_events(&sample_event(PrivacyClass::Anonymous, false));
let motion = msgs
.iter()
.find(|m| m.topic.contains("motion"))
.expect("motion topic");
assert_eq!(motion.payload, "0.720000");
}
#[test]
fn person_count_payload_is_bare_integer() {
let msgs = render_events(&sample_event(PrivacyClass::Anonymous, false));
let pc = msgs
.iter()
.find(|m| m.topic.contains("person_count"))
.expect("person_count topic");
assert_eq!(pc.payload, "2");
}
#[test]
fn zone_payload_is_json_string_with_quotes() {
let msgs = render_events(&sample_event(PrivacyClass::Anonymous, true));
let zone = msgs
.iter()
.find(|m| m.topic.contains("zone_activity"))
.expect("zone_activity topic");
assert_eq!(zone.payload, "\"living_room\"");
}
#[test]
fn identity_risk_payload_is_fixed_precision_decimal() {
let msgs = render_events(&sample_event(PrivacyClass::Anonymous, false));
let risk = msgs
.iter()
.find(|m| m.topic.contains("identity_risk"))
.expect("identity_risk topic");
assert_eq!(risk.payload, "0.340000");
}