feat(adr-118/p5.2): Publish trait + publish_event free function — 169/169 GREEN
Iter 22. Abstracts the MQTT publish boundary without pulling in tokio or
rumqttc yet. The trait is sync (callers can hold &mut self without an
async runtime); the production rumqttc-backed impl in iter 23 will drive
a tokio task internally and present the same sync surface here.
Added (in src/mqtt_topics.rs, gated on `feature = "std"`):
- Publish trait with associated Error type
- CapturePublisher (Vec-backed; default-constructible) for unit tests
- publish_event<P: Publish>(publisher, event) -> Result<usize, P::Error>
Iterates render_events(event) and forwards each TopicMessage to
publisher.publish(). Returns the count actually published, or the
publisher's error short-circuited on first failure.
- pub use Publish, CapturePublisher, publish_event from lib.rs
tests/mqtt_publish_loop.rs (7 named tests, all green):
capture_publisher_records_every_message
publish_returns_zero_for_raw_and_derived_events
(parameterized — class 0 and class 1 both produce zero publishes,
reinforcing the invariant I1 surface enforcement from iter 21)
published_topics_match_render_events_ordering
(stable per-event topic sequence for MQTT consumers)
restricted_class_publishes_no_identity_risk_topic
anonymous_without_zone_publishes_five_messages (5 = no zone_activity)
publisher_error_short_circuits_publish_event
(FailingPublisher fails on 3rd publish; publish_event surfaces the
error AND leaves the first two messages durably published)
capture_publisher_error_type_is_infallible
(compile-time witness that CapturePublisher cannot panic the loop)
ACs progressed:
- ADR-122 §2.2 publisher boundary — the broker-facing surface is now a
named trait operators can mock, swap, or wrap with retries.
- ADR-122 AC4 — publish_event respects the iter-21 class gating; Raw /
Derived events produce zero broker traffic by definition.
- ADR-118 invariant I1 — even if the broker connection somehow regressed,
the trait-level publish_event cannot exfiltrate a Raw frame because
render_events returns empty first.
Test config:
- cargo test --no-default-features → 72 passed (mqtt_publish_loop cfg-out)
- cargo test → 169 passed (162 + 7)
Out of scope (next iter target):
- New `mqtt` feature gate; tokio + rumqttc deps under it
- RumqttPublisher: impl Publish that holds an MqttClient + a small tokio
block_on or oneshot send to bridge sync trait to async client
- Optional: BfldPipelineHandle that owns Arc<Mutex<BfldPipeline>> + a
spawn-and-forget tokio task pumping inbound (inputs, embedding) →
process → publish_event(&rumqtt_pub, &event)
- mosquitto integration test following the patterns from
feedback_mqtt_integration_test_patterns memory note
Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
parent
f674efff9d
commit
0cb037c007
|
|
@ -41,7 +41,7 @@ pub use emitter::{BfldEmitter, SensingInputs};
|
|||
#[cfg(feature = "std")]
|
||||
pub use event::BfldEvent;
|
||||
#[cfg(feature = "std")]
|
||||
pub use mqtt_topics::{render_events, TopicMessage};
|
||||
pub use mqtt_topics::{publish_event, render_events, CapturePublisher, Publish, TopicMessage};
|
||||
pub use embedding::{IdentityEmbedding, EMBEDDING_DIM};
|
||||
pub use embedding_ring::{EmbeddingRing, RING_CAPACITY};
|
||||
#[cfg(feature = "std")]
|
||||
|
|
|
|||
|
|
@ -48,6 +48,49 @@ impl TopicMessage {
|
|||
}
|
||||
}
|
||||
|
||||
/// Abstract MQTT publisher boundary. The crate ships only the trait + a
|
||||
/// capture-impl for tests; the production rumqttc-backed impl lands in a
|
||||
/// follow-up iter behind a `mqtt` feature gate.
|
||||
///
|
||||
/// `publish` is synchronous so callers can hold a `&mut self` without an
|
||||
/// async runtime; the rumqttc wrapper drives a tokio task internally.
|
||||
pub trait Publish {
|
||||
/// Error type — typically the broker's transport error.
|
||||
type Error;
|
||||
/// Publish a single rendered message. Implementations may buffer.
|
||||
fn publish(&mut self, msg: &TopicMessage) -> Result<(), Self::Error>;
|
||||
}
|
||||
|
||||
/// Capture-impl for unit tests. Stores every published message in order.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct CapturePublisher {
|
||||
/// Every `publish()` call appends to this vec.
|
||||
pub published: Vec<TopicMessage>,
|
||||
}
|
||||
|
||||
impl Publish for CapturePublisher {
|
||||
type Error = core::convert::Infallible;
|
||||
fn publish(&mut self, msg: &TopicMessage) -> Result<(), Self::Error> {
|
||||
self.published.push(msg.clone());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Publish every topic message rendered from `event`. Returns the number of
|
||||
/// messages actually published (zero for Raw / Derived class events). Errors
|
||||
/// short-circuit — the publisher state at error time may have partial output.
|
||||
pub fn publish_event<P: Publish>(
|
||||
publisher: &mut P,
|
||||
event: &BfldEvent,
|
||||
) -> Result<usize, P::Error> {
|
||||
let mut count = 0;
|
||||
for msg in render_events(event) {
|
||||
publisher.publish(&msg)?;
|
||||
count += 1;
|
||||
}
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
/// Render an event into the per-entity MQTT messages it should publish. Returns
|
||||
/// an empty vec for events that fail the class gate (e.g., raw class 0).
|
||||
#[must_use]
|
||||
|
|
|
|||
|
|
@ -0,0 +1,115 @@
|
|||
//! Acceptance tests for ADR-122 §2.2 — `Publish` trait + `publish_event`.
|
||||
|
||||
#![cfg(feature = "std")]
|
||||
|
||||
use wifi_densepose_bfld::{
|
||||
publish_event, BfldEvent, CapturePublisher, PrivacyClass, Publish, TopicMessage,
|
||||
};
|
||||
|
||||
fn sample_event(class: PrivacyClass, with_zone: bool) -> BfldEvent {
|
||||
BfldEvent::with_privacy_gating(
|
||||
"seed-99".into(),
|
||||
1_700_000_000_000_000_000,
|
||||
true,
|
||||
0.5,
|
||||
1,
|
||||
0.8,
|
||||
if with_zone { Some("kitchen".into()) } else { None },
|
||||
class,
|
||||
Some(0.25),
|
||||
Some([0xCD; 32]),
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn capture_publisher_records_every_message() {
|
||||
let mut p = CapturePublisher::default();
|
||||
let count = publish_event(&mut p, &sample_event(PrivacyClass::Anonymous, true))
|
||||
.expect("publish must succeed");
|
||||
assert_eq!(count, p.published.len(), "return value must equal publish count");
|
||||
assert_eq!(count, 6, "Anonymous + zone publishes 6 topics");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn publish_returns_zero_for_raw_and_derived_events() {
|
||||
for class in [PrivacyClass::Raw, PrivacyClass::Derived] {
|
||||
let mut p = CapturePublisher::default();
|
||||
let count = publish_event(&mut p, &sample_event(class, true)).unwrap();
|
||||
assert_eq!(count, 0, "class {class:?} must publish nothing");
|
||||
assert!(p.published.is_empty());
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn published_topics_match_render_events_ordering() {
|
||||
// The publish loop must iterate in the same order as render_events so
|
||||
// that downstream MQTT consumers see a stable per-event topic sequence.
|
||||
let event = sample_event(PrivacyClass::Anonymous, true);
|
||||
let mut p = CapturePublisher::default();
|
||||
publish_event(&mut p, &event).unwrap();
|
||||
let rendered = wifi_densepose_bfld::render_events(&event);
|
||||
assert_eq!(p.published, rendered);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn restricted_class_publishes_no_identity_risk_topic() {
|
||||
let mut p = CapturePublisher::default();
|
||||
publish_event(&mut p, &sample_event(PrivacyClass::Restricted, true)).unwrap();
|
||||
assert!(
|
||||
!p.published.iter().any(|m| m.topic.contains("identity_risk")),
|
||||
"Restricted must not publish identity_risk, got: {:?}",
|
||||
p.published.iter().map(|m| &m.topic).collect::<Vec<_>>(),
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn anonymous_without_zone_publishes_five_messages() {
|
||||
let mut p = CapturePublisher::default();
|
||||
let count = publish_event(&mut p, &sample_event(PrivacyClass::Anonymous, false)).unwrap();
|
||||
assert_eq!(count, 5);
|
||||
}
|
||||
|
||||
// --- error propagation --------------------------------------------------
|
||||
|
||||
struct FailingPublisher {
|
||||
fails_after: usize,
|
||||
published_so_far: usize,
|
||||
}
|
||||
|
||||
impl Publish for FailingPublisher {
|
||||
type Error = &'static str;
|
||||
fn publish(&mut self, _msg: &TopicMessage) -> Result<(), Self::Error> {
|
||||
if self.published_so_far >= self.fails_after {
|
||||
return Err("broker offline");
|
||||
}
|
||||
self.published_so_far += 1;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn publisher_error_short_circuits_publish_event() {
|
||||
let mut p = FailingPublisher {
|
||||
fails_after: 2,
|
||||
published_so_far: 0,
|
||||
};
|
||||
let result = publish_event(&mut p, &sample_event(PrivacyClass::Anonymous, true));
|
||||
match result {
|
||||
Err("broker offline") => {}
|
||||
other => panic!("expected broker-offline error, got {other:?}"),
|
||||
}
|
||||
assert_eq!(
|
||||
p.published_so_far, 2,
|
||||
"exactly the first two messages should land before the error",
|
||||
);
|
||||
}
|
||||
|
||||
// --- error type ergonomics ----------------------------------------------
|
||||
|
||||
#[test]
|
||||
fn capture_publisher_error_type_is_infallible() {
|
||||
let mut p = CapturePublisher::default();
|
||||
let r: Result<usize, core::convert::Infallible> =
|
||||
publish_event(&mut p, &sample_event(PrivacyClass::Anonymous, false));
|
||||
assert!(r.is_ok());
|
||||
}
|
||||
Loading…
Reference in New Issue