From 0cb037c00781e066d0e24b541721dacf7582c5f8 Mon Sep 17 00:00:00 2001 From: ruv Date: Sun, 24 May 2026 16:57:05 -0400 Subject: [PATCH] =?UTF-8?q?feat(adr-118/p5.2):=20Publish=20trait=20+=20pub?= =?UTF-8?q?lish=5Fevent=20free=20function=20=E2=80=94=20169/169=20GREEN?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Iter 22. Abstracts the MQTT publish boundary without pulling in tokio or rumqttc yet. The trait is sync (callers can hold &mut self without an async runtime); the production rumqttc-backed impl in iter 23 will drive a tokio task internally and present the same sync surface here. Added (in src/mqtt_topics.rs, gated on `feature = "std"`): - Publish trait with associated Error type - CapturePublisher (Vec-backed; default-constructible) for unit tests - publish_event(publisher, event) -> Result Iterates render_events(event) and forwards each TopicMessage to publisher.publish(). Returns the count actually published, or the publisher's error short-circuited on first failure. - pub use Publish, CapturePublisher, publish_event from lib.rs tests/mqtt_publish_loop.rs (7 named tests, all green): capture_publisher_records_every_message publish_returns_zero_for_raw_and_derived_events (parameterized — class 0 and class 1 both produce zero publishes, reinforcing the invariant I1 surface enforcement from iter 21) published_topics_match_render_events_ordering (stable per-event topic sequence for MQTT consumers) restricted_class_publishes_no_identity_risk_topic anonymous_without_zone_publishes_five_messages (5 = no zone_activity) publisher_error_short_circuits_publish_event (FailingPublisher fails on 3rd publish; publish_event surfaces the error AND leaves the first two messages durably published) capture_publisher_error_type_is_infallible (compile-time witness that CapturePublisher cannot panic the loop) ACs progressed: - ADR-122 §2.2 publisher boundary — the broker-facing surface is now a named trait operators can mock, swap, or wrap with retries. - ADR-122 AC4 — publish_event respects the iter-21 class gating; Raw / Derived events produce zero broker traffic by definition. - ADR-118 invariant I1 — even if the broker connection somehow regressed, the trait-level publish_event cannot exfiltrate a Raw frame because render_events returns empty first. Test config: - cargo test --no-default-features → 72 passed (mqtt_publish_loop cfg-out) - cargo test → 169 passed (162 + 7) Out of scope (next iter target): - New `mqtt` feature gate; tokio + rumqttc deps under it - RumqttPublisher: impl Publish that holds an MqttClient + a small tokio block_on or oneshot send to bridge sync trait to async client - Optional: BfldPipelineHandle that owns Arc> + a spawn-and-forget tokio task pumping inbound (inputs, embedding) → process → publish_event(&rumqtt_pub, &event) - mosquitto integration test following the patterns from feedback_mqtt_integration_test_patterns memory note Co-Authored-By: claude-flow --- v2/crates/wifi-densepose-bfld/src/lib.rs | 2 +- .../wifi-densepose-bfld/src/mqtt_topics.rs | 43 +++++++ .../tests/mqtt_publish_loop.rs | 115 ++++++++++++++++++ 3 files changed, 159 insertions(+), 1 deletion(-) create mode 100644 v2/crates/wifi-densepose-bfld/tests/mqtt_publish_loop.rs diff --git a/v2/crates/wifi-densepose-bfld/src/lib.rs b/v2/crates/wifi-densepose-bfld/src/lib.rs index 68fd1a2a..cf5dc09d 100644 --- a/v2/crates/wifi-densepose-bfld/src/lib.rs +++ b/v2/crates/wifi-densepose-bfld/src/lib.rs @@ -41,7 +41,7 @@ pub use emitter::{BfldEmitter, SensingInputs}; #[cfg(feature = "std")] pub use event::BfldEvent; #[cfg(feature = "std")] -pub use mqtt_topics::{render_events, TopicMessage}; +pub use mqtt_topics::{publish_event, render_events, CapturePublisher, Publish, TopicMessage}; pub use embedding::{IdentityEmbedding, EMBEDDING_DIM}; pub use embedding_ring::{EmbeddingRing, RING_CAPACITY}; #[cfg(feature = "std")] diff --git a/v2/crates/wifi-densepose-bfld/src/mqtt_topics.rs b/v2/crates/wifi-densepose-bfld/src/mqtt_topics.rs index cf7c751d..51c85d8a 100644 --- a/v2/crates/wifi-densepose-bfld/src/mqtt_topics.rs +++ b/v2/crates/wifi-densepose-bfld/src/mqtt_topics.rs @@ -48,6 +48,49 @@ impl TopicMessage { } } +/// Abstract MQTT publisher boundary. The crate ships only the trait + a +/// capture-impl for tests; the production rumqttc-backed impl lands in a +/// follow-up iter behind a `mqtt` feature gate. +/// +/// `publish` is synchronous so callers can hold a `&mut self` without an +/// async runtime; the rumqttc wrapper drives a tokio task internally. +pub trait Publish { + /// Error type — typically the broker's transport error. + type Error; + /// Publish a single rendered message. Implementations may buffer. + fn publish(&mut self, msg: &TopicMessage) -> Result<(), Self::Error>; +} + +/// Capture-impl for unit tests. Stores every published message in order. +#[derive(Debug, Default)] +pub struct CapturePublisher { + /// Every `publish()` call appends to this vec. + pub published: Vec, +} + +impl Publish for CapturePublisher { + type Error = core::convert::Infallible; + fn publish(&mut self, msg: &TopicMessage) -> Result<(), Self::Error> { + self.published.push(msg.clone()); + Ok(()) + } +} + +/// Publish every topic message rendered from `event`. Returns the number of +/// messages actually published (zero for Raw / Derived class events). Errors +/// short-circuit — the publisher state at error time may have partial output. +pub fn publish_event( + publisher: &mut P, + event: &BfldEvent, +) -> Result { + let mut count = 0; + for msg in render_events(event) { + publisher.publish(&msg)?; + count += 1; + } + Ok(count) +} + /// Render an event into the per-entity MQTT messages it should publish. Returns /// an empty vec for events that fail the class gate (e.g., raw class 0). #[must_use] diff --git a/v2/crates/wifi-densepose-bfld/tests/mqtt_publish_loop.rs b/v2/crates/wifi-densepose-bfld/tests/mqtt_publish_loop.rs new file mode 100644 index 00000000..13eb2408 --- /dev/null +++ b/v2/crates/wifi-densepose-bfld/tests/mqtt_publish_loop.rs @@ -0,0 +1,115 @@ +//! Acceptance tests for ADR-122 §2.2 — `Publish` trait + `publish_event`. + +#![cfg(feature = "std")] + +use wifi_densepose_bfld::{ + publish_event, BfldEvent, CapturePublisher, PrivacyClass, Publish, TopicMessage, +}; + +fn sample_event(class: PrivacyClass, with_zone: bool) -> BfldEvent { + BfldEvent::with_privacy_gating( + "seed-99".into(), + 1_700_000_000_000_000_000, + true, + 0.5, + 1, + 0.8, + if with_zone { Some("kitchen".into()) } else { None }, + class, + Some(0.25), + Some([0xCD; 32]), + ) +} + +#[test] +fn capture_publisher_records_every_message() { + let mut p = CapturePublisher::default(); + let count = publish_event(&mut p, &sample_event(PrivacyClass::Anonymous, true)) + .expect("publish must succeed"); + assert_eq!(count, p.published.len(), "return value must equal publish count"); + assert_eq!(count, 6, "Anonymous + zone publishes 6 topics"); +} + +#[test] +fn publish_returns_zero_for_raw_and_derived_events() { + for class in [PrivacyClass::Raw, PrivacyClass::Derived] { + let mut p = CapturePublisher::default(); + let count = publish_event(&mut p, &sample_event(class, true)).unwrap(); + assert_eq!(count, 0, "class {class:?} must publish nothing"); + assert!(p.published.is_empty()); + } +} + +#[test] +fn published_topics_match_render_events_ordering() { + // The publish loop must iterate in the same order as render_events so + // that downstream MQTT consumers see a stable per-event topic sequence. + let event = sample_event(PrivacyClass::Anonymous, true); + let mut p = CapturePublisher::default(); + publish_event(&mut p, &event).unwrap(); + let rendered = wifi_densepose_bfld::render_events(&event); + assert_eq!(p.published, rendered); +} + +#[test] +fn restricted_class_publishes_no_identity_risk_topic() { + let mut p = CapturePublisher::default(); + publish_event(&mut p, &sample_event(PrivacyClass::Restricted, true)).unwrap(); + assert!( + !p.published.iter().any(|m| m.topic.contains("identity_risk")), + "Restricted must not publish identity_risk, got: {:?}", + p.published.iter().map(|m| &m.topic).collect::>(), + ); +} + +#[test] +fn anonymous_without_zone_publishes_five_messages() { + let mut p = CapturePublisher::default(); + let count = publish_event(&mut p, &sample_event(PrivacyClass::Anonymous, false)).unwrap(); + assert_eq!(count, 5); +} + +// --- error propagation -------------------------------------------------- + +struct FailingPublisher { + fails_after: usize, + published_so_far: usize, +} + +impl Publish for FailingPublisher { + type Error = &'static str; + fn publish(&mut self, _msg: &TopicMessage) -> Result<(), Self::Error> { + if self.published_so_far >= self.fails_after { + return Err("broker offline"); + } + self.published_so_far += 1; + Ok(()) + } +} + +#[test] +fn publisher_error_short_circuits_publish_event() { + let mut p = FailingPublisher { + fails_after: 2, + published_so_far: 0, + }; + let result = publish_event(&mut p, &sample_event(PrivacyClass::Anonymous, true)); + match result { + Err("broker offline") => {} + other => panic!("expected broker-offline error, got {other:?}"), + } + assert_eq!( + p.published_so_far, 2, + "exactly the first two messages should land before the error", + ); +} + +// --- error type ergonomics ---------------------------------------------- + +#[test] +fn capture_publisher_error_type_is_infallible() { + let mut p = CapturePublisher::default(); + let r: Result = + publish_event(&mut p, &sample_event(PrivacyClass::Anonymous, false)); + assert!(r.is_ok()); +}