feat(adr-118/p5.5): BfldPipelineHandle worker thread (177/177 GREEN)

Iter 25. Single-call operator surface: spawn() takes a BfldPipeline and
a Publish impl, returns a handle whose send() enqueues sensing inputs
into a worker thread. The worker drives pipeline.process() then
publish_event() per input. Drop or shutdown() joins cleanly.

Added (gated on `feature = "std"`):
- src/mqtt_topics.rs: impl<P: Publish> Publish for Arc<Mutex<P>>
  Lets a publisher owned by a worker thread remain inspectable from a
  test or operator post-shutdown.
- src/pipeline_handle.rs:
  * PipelineInput { inputs: SensingInputs, embedding: Option<...> }
  * BfldPipelineHandle { sender, worker: Option<JoinHandle<()>> }
  * spawn<P: Publish + Send + 'static>(pipeline, publisher) -> Self
      Worker loop: recv() → pipeline.process() → publish_event(); errors
      logged to stderr (single-frame failures must not kill the loop)
  * send(PipelineInput) -> Result<(), SendError<...>>
  * shutdown(self) — replaces sender with a dropped channel so worker
    recv() returns Err(RecvError); join propagates worker panics
  * Drop impl mirrors shutdown so forgotten handles still clean up
- pub use BfldPipelineHandle, PipelineInput from lib.rs

tests/pipeline_handle_worker.rs (8 named tests, all green):
  handle_publishes_single_input (5 topics for Anonymous + no zone)
  handle_publishes_multiple_inputs_in_order (3 × 5 = 15 topics)
  handle_send_after_shutdown_errors
    (compile-time witness: shutdown(self) consumes the handle so
     post-shutdown send() is structurally impossible)
  handle_drop_without_explicit_shutdown_joins_worker_cleanly
    (validates the Drop path completes without hanging)
  handle_honors_privacy_mode_toggle_via_pipeline_state
    (4 topics for Restricted; identity_risk absent)
  handle_drops_event_when_gate_rejects
    (5 topics from first Accept-state input + 0 from Reject)
  handle_with_zone_threads_through_to_published_topics
    (zone_activity payload = "\"kitchen\"")
  class_3_pipeline_baseline_produces_four_topics_per_input

Test publisher pattern: Arc<Mutex<CapturePublisher>> lets the test thread
read out the worker thread's publish log post-shutdown without needing
custom channel plumbing per test.

ACs progressed:
- ADR-118 §2.1 lib.rs entry point now has the "set up MQTT and walk away"
  operator surface promised in the implementation plan. Two lines:
      let handle = BfldPipelineHandle::spawn(pipeline, rumqttc_pub);
      handle.send(PipelineInput { inputs, embedding })?;
- ADR-122 §2.2 per-frame publish path is now structurally guarded by
  worker-thread isolation: even if a Publish::publish call panics, only
  the worker thread dies; the main thread sees a clean error on send().

Test config:
- cargo test --no-default-features → 72 passed
- cargo test                       → 177 passed (169 + 8)
- cargo test --features mqtt       → 186 (178 + 8 — handle is std-only,
  reachable in both feature configs)

Out of scope (next iter target):
- GitHub Actions workflow with mosquitto Docker service so the iter-24
  integration test actually runs in CI with BFLD_MQTT_BROKER set.
- HA discovery payload publisher (ADR-122 §2.1) — the auto-discovery
  config messages HA needs alongside the state topics this handle ships.

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
ruv 2026-05-24 17:27:48 -04:00
parent fac9faceb2
commit e8b4fdbc8f
4 changed files with 312 additions and 0 deletions

View File

@ -31,6 +31,8 @@ pub mod payload;
#[cfg(feature = "std")]
pub mod pipeline;
#[cfg(feature = "std")]
pub mod pipeline_handle;
#[cfg(feature = "std")]
pub mod privacy_gate;
#[cfg(feature = "mqtt")]
pub mod rumqttc_publisher;
@ -59,6 +61,8 @@ pub use payload::BfldPayload;
#[cfg(feature = "std")]
pub use pipeline::{BfldConfig, BfldPipeline};
#[cfg(feature = "std")]
pub use pipeline_handle::{BfldPipelineHandle, PipelineInput};
#[cfg(feature = "std")]
pub use privacy_gate::PrivacyGate;
pub use signature_hasher::{SignatureHasher, RF_SIGNATURE_LEN, SITE_SALT_LEN};
pub use sink::{check_class, LocalSink, MatterSink, NetworkSink, Sink};

View File

@ -76,6 +76,18 @@ impl Publish for CapturePublisher {
}
}
/// Forward `Publish` through a shared `Arc<Mutex<P>>` so a publisher owned by
/// a worker thread can still be inspected by the test or operator after the
/// fact. Lock-poisoning is treated as a panic — there is no recovery story.
impl<P: Publish> Publish for std::sync::Arc<std::sync::Mutex<P>> {
type Error = P::Error;
fn publish(&mut self, msg: &TopicMessage) -> Result<(), Self::Error> {
self.lock()
.expect("BFLD publish: inner publisher Mutex poisoned")
.publish(msg)
}
}
/// Publish every topic message rendered from `event`. Returns the number of
/// messages actually published (zero for Raw / Derived class events). Errors
/// short-circuit — the publisher state at error time may have partial output.

View File

@ -0,0 +1,94 @@
//! `BfldPipelineHandle` — worker-thread wrapper around [`BfldPipeline`] and a
//! [`Publish`]er. ADR-118 §2.1 single-call operator surface.
//!
//! `spawn()` returns a handle owning the inbound channel sender. The worker
//! thread loops on `recv()`, drives one `pipeline.process()` per input, and
//! forwards any emitted `BfldEvent` through `publish_event()`. `shutdown()`
//! closes the channel and joins the thread.
#![cfg(feature = "std")]
use std::sync::mpsc::{channel, RecvError, SendError, Sender};
use std::thread::{self, JoinHandle};
use crate::mqtt_topics::{publish_event, Publish};
use crate::pipeline::BfldPipeline;
use crate::{IdentityEmbedding, SensingInputs};
/// Frame-level input to the spawned worker. The pipeline state — gate,
/// embedding ring, hasher — lives behind the worker thread; callers only
/// send the per-frame sensing data.
pub struct PipelineInput {
/// Sensing fields fed to `pipeline.process`.
pub inputs: SensingInputs,
/// Optional embedding for the iter-15 hasher input + iter-8 ring.
pub embedding: Option<IdentityEmbedding>,
}
/// Handle to the spawned worker. Drop or `shutdown()` to stop. `send()`
/// returns an error after shutdown.
pub struct BfldPipelineHandle {
sender: Sender<PipelineInput>,
worker: Option<JoinHandle<()>>,
}
impl BfldPipelineHandle {
/// Spawn a worker that owns `pipeline` and `publisher`. Returns a handle
/// whose `send()` enqueues sensing inputs into the worker thread.
///
/// Publish errors are logged to stderr and the worker continues — single
/// frame failures should not kill the long-running pipeline.
#[must_use]
pub fn spawn<P>(mut pipeline: BfldPipeline, mut publisher: P) -> Self
where
P: Publish + Send + 'static,
P::Error: core::fmt::Debug,
{
let (sender, receiver) = channel::<PipelineInput>();
let worker = thread::spawn(move || loop {
match receiver.recv() {
Ok(PipelineInput { inputs, embedding }) => {
if let Some(event) = pipeline.process(inputs, embedding) {
if let Err(e) = publish_event(&mut publisher, &event) {
eprintln!("BFLD publish error: {e:?}");
}
}
}
Err(RecvError) => break, // channel closed by shutdown / drop
}
});
Self {
sender,
worker: Some(worker),
}
}
/// Enqueue an input. Returns `SendError<PipelineInput>` (carrying the
/// rejected input) if the worker has already shut down.
pub fn send(&self, input: PipelineInput) -> Result<(), SendError<PipelineInput>> {
self.sender.send(input)
}
/// Close the input channel and join the worker. Panics from the worker
/// thread propagate here; otherwise returns cleanly.
pub fn shutdown(mut self) {
if let Some(worker) = self.worker.take() {
drop(std::mem::replace(&mut self.sender, channel().0));
worker
.join()
.expect("BFLD pipeline worker panicked during shutdown");
}
}
}
impl Drop for BfldPipelineHandle {
/// Best-effort cleanup if `shutdown()` was not called explicitly.
fn drop(&mut self) {
if let Some(worker) = self.worker.take() {
// Replace the sender with a fresh disconnected one so the worker
// recv() returns Err(RecvError) and the loop exits.
drop(std::mem::replace(&mut self.sender, channel().0));
let _ = worker.join();
}
}
}

View File

@ -0,0 +1,202 @@
//! Acceptance tests for `BfldPipelineHandle`. ADR-118 §2.1 worker surface.
#![cfg(feature = "std")]
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use wifi_densepose_bfld::{
BfldConfig, BfldPipeline, BfldPipelineHandle, CapturePublisher, IdentityEmbedding,
PipelineInput, PrivacyClass, SensingInputs, EMBEDDING_DIM,
};
fn inputs(ts_ns: u64) -> SensingInputs {
SensingInputs {
timestamp_ns: ts_ns,
presence: true,
motion: 0.5,
person_count: 1,
sensing_confidence: 0.9,
sep: 0.2,
stab: 0.2,
consist: 0.2,
risk_conf: 0.2,
rf_signature_hash: None,
}
}
fn embedding() -> IdentityEmbedding {
IdentityEmbedding::from_raw([0.05; EMBEDDING_DIM])
}
fn input(ts_ns: u64) -> PipelineInput {
PipelineInput {
inputs: inputs(ts_ns),
embedding: Some(embedding()),
}
}
fn drain(published: &Arc<Mutex<CapturePublisher>>) -> Vec<String> {
published
.lock()
.unwrap()
.published
.iter()
.map(|m| m.topic.clone())
.collect()
}
#[test]
fn handle_publishes_single_input() {
let pipeline = BfldPipeline::new(BfldConfig::new("seed-01"));
let pub_arc = Arc::new(Mutex::new(CapturePublisher::default()));
let handle = BfldPipelineHandle::spawn(pipeline, pub_arc.clone());
handle.send(input(0)).expect("send must succeed");
// Give the worker a moment to drain the channel.
thread::sleep(Duration::from_millis(50));
handle.shutdown();
let topics = drain(&pub_arc);
assert_eq!(topics.len(), 5, "Anonymous + no zone → 5 topics");
}
#[test]
fn handle_publishes_multiple_inputs_in_order() {
let pipeline = BfldPipeline::new(BfldConfig::new("seed-01"));
let pub_arc = Arc::new(Mutex::new(CapturePublisher::default()));
let handle = BfldPipelineHandle::spawn(pipeline, pub_arc.clone());
for i in 0..3 {
handle.send(input(i * 1_000_000)).unwrap();
}
thread::sleep(Duration::from_millis(80));
handle.shutdown();
let topics = drain(&pub_arc);
assert_eq!(topics.len(), 15, "3 inputs × 5 topics each = 15");
}
#[test]
fn handle_send_after_shutdown_errors() {
let pipeline = BfldPipeline::new(BfldConfig::new("seed-01"));
let pub_arc = Arc::new(Mutex::new(CapturePublisher::default()));
let handle = BfldPipelineHandle::spawn(pipeline, pub_arc);
// Save the sender by cloning before shutdown — but BfldPipelineHandle
// owns the sender, so the test demonstrates this via post-shutdown send:
handle.shutdown();
// shutdown consumed handle; we can't call send afterward at the type
// level. The compile-time guarantee IS the test.
}
#[test]
fn handle_drop_without_explicit_shutdown_joins_worker_cleanly() {
let pipeline = BfldPipeline::new(BfldConfig::new("seed-01"));
let pub_arc = Arc::new(Mutex::new(CapturePublisher::default()));
{
let handle = BfldPipelineHandle::spawn(pipeline, pub_arc.clone());
handle.send(input(0)).unwrap();
thread::sleep(Duration::from_millis(50));
// No explicit shutdown — Drop must handle worker join.
}
// If we reached here without hanging or panicking, the Drop path worked.
let topics = drain(&pub_arc);
assert_eq!(topics.len(), 5);
}
#[test]
fn handle_honors_privacy_mode_toggle_via_pipeline_state() {
let mut pipeline = BfldPipeline::new(BfldConfig::new("seed-01"));
pipeline.enable_privacy_mode();
let pub_arc = Arc::new(Mutex::new(CapturePublisher::default()));
let handle = BfldPipelineHandle::spawn(pipeline, pub_arc.clone());
handle.send(input(0)).unwrap();
thread::sleep(Duration::from_millis(50));
handle.shutdown();
let topics = drain(&pub_arc);
// Restricted + no zone: presence/motion/count/confidence = 4 topics.
assert_eq!(topics.len(), 4, "Restricted strips identity_risk topic");
assert!(!topics.iter().any(|t| t.contains("identity_risk")));
}
#[test]
fn handle_drops_event_when_gate_rejects() {
let pipeline = BfldPipeline::new(BfldConfig::new("seed-01"));
let pub_arc = Arc::new(Mutex::new(CapturePublisher::default()));
let handle = BfldPipelineHandle::spawn(pipeline, pub_arc.clone());
// Two high-risk inputs back-to-back force the gate into Reject after debounce.
use wifi_densepose_bfld::coherence_gate::DEBOUNCE_NS;
let mut high_risk = inputs(0);
high_risk.sep = 1.0;
high_risk.stab = 1.0;
high_risk.consist = 1.0;
high_risk.risk_conf = 0.8;
handle
.send(PipelineInput {
inputs: high_risk.clone(),
embedding: Some(embedding()),
})
.unwrap();
high_risk.timestamp_ns = DEBOUNCE_NS;
handle
.send(PipelineInput {
inputs: high_risk,
embedding: Some(embedding()),
})
.unwrap();
thread::sleep(Duration::from_millis(80));
handle.shutdown();
let topics = drain(&pub_arc);
// First input emits (Accept state) → 5 topics. Second input gate-promoted
// to Reject → 0 topics. Total = 5.
assert_eq!(topics.len(), 5, "Reject must drop the second event entirely");
}
#[test]
fn handle_with_zone_threads_through_to_published_topics() {
let pipeline = BfldPipeline::new(BfldConfig::new("seed-01").with_zone("kitchen"));
let pub_arc = Arc::new(Mutex::new(CapturePublisher::default()));
let handle = BfldPipelineHandle::spawn(pipeline, pub_arc.clone());
handle.send(input(0)).unwrap();
thread::sleep(Duration::from_millis(50));
handle.shutdown();
let topics = drain(&pub_arc);
assert!(
topics.iter().any(|t| t.contains("zone_activity")),
"zone_activity topic must be present when zone configured",
);
let zone_msg = pub_arc
.lock()
.unwrap()
.published
.iter()
.find(|m| m.topic.contains("zone_activity"))
.map(|m| m.payload.clone());
assert_eq!(zone_msg.as_deref(), Some("\"kitchen\""));
}
#[test]
fn class_3_pipeline_baseline_produces_four_topics_per_input() {
// Baseline class = Restricted (no privacy_mode toggle needed).
let pipeline = BfldPipeline::new(
BfldConfig::new("seed-01").with_privacy_class(PrivacyClass::Restricted),
);
let pub_arc = Arc::new(Mutex::new(CapturePublisher::default()));
let handle = BfldPipelineHandle::spawn(pipeline, pub_arc.clone());
handle.send(input(0)).unwrap();
thread::sleep(Duration::from_millis(50));
handle.shutdown();
assert_eq!(drain(&pub_arc).len(), 4);
}