diff --git a/v2/crates/wifi-densepose-bfld/src/lib.rs b/v2/crates/wifi-densepose-bfld/src/lib.rs index 610cc6b7..4a593cd1 100644 --- a/v2/crates/wifi-densepose-bfld/src/lib.rs +++ b/v2/crates/wifi-densepose-bfld/src/lib.rs @@ -31,6 +31,8 @@ pub mod payload; #[cfg(feature = "std")] pub mod pipeline; #[cfg(feature = "std")] +pub mod pipeline_handle; +#[cfg(feature = "std")] pub mod privacy_gate; #[cfg(feature = "mqtt")] pub mod rumqttc_publisher; @@ -59,6 +61,8 @@ pub use payload::BfldPayload; #[cfg(feature = "std")] pub use pipeline::{BfldConfig, BfldPipeline}; #[cfg(feature = "std")] +pub use pipeline_handle::{BfldPipelineHandle, PipelineInput}; +#[cfg(feature = "std")] pub use privacy_gate::PrivacyGate; pub use signature_hasher::{SignatureHasher, RF_SIGNATURE_LEN, SITE_SALT_LEN}; pub use sink::{check_class, LocalSink, MatterSink, NetworkSink, Sink}; diff --git a/v2/crates/wifi-densepose-bfld/src/mqtt_topics.rs b/v2/crates/wifi-densepose-bfld/src/mqtt_topics.rs index 51c85d8a..304f433c 100644 --- a/v2/crates/wifi-densepose-bfld/src/mqtt_topics.rs +++ b/v2/crates/wifi-densepose-bfld/src/mqtt_topics.rs @@ -76,6 +76,18 @@ impl Publish for CapturePublisher { } } +/// Forward `Publish` through a shared `Arc>` so a publisher owned by +/// a worker thread can still be inspected by the test or operator after the +/// fact. Lock-poisoning is treated as a panic — there is no recovery story. +impl Publish for std::sync::Arc> { + type Error = P::Error; + fn publish(&mut self, msg: &TopicMessage) -> Result<(), Self::Error> { + self.lock() + .expect("BFLD publish: inner publisher Mutex poisoned") + .publish(msg) + } +} + /// Publish every topic message rendered from `event`. Returns the number of /// messages actually published (zero for Raw / Derived class events). Errors /// short-circuit — the publisher state at error time may have partial output. diff --git a/v2/crates/wifi-densepose-bfld/src/pipeline_handle.rs b/v2/crates/wifi-densepose-bfld/src/pipeline_handle.rs new file mode 100644 index 00000000..459b0175 --- /dev/null +++ b/v2/crates/wifi-densepose-bfld/src/pipeline_handle.rs @@ -0,0 +1,94 @@ +//! `BfldPipelineHandle` — worker-thread wrapper around [`BfldPipeline`] and a +//! [`Publish`]er. ADR-118 §2.1 single-call operator surface. +//! +//! `spawn()` returns a handle owning the inbound channel sender. The worker +//! thread loops on `recv()`, drives one `pipeline.process()` per input, and +//! forwards any emitted `BfldEvent` through `publish_event()`. `shutdown()` +//! closes the channel and joins the thread. + +#![cfg(feature = "std")] + +use std::sync::mpsc::{channel, RecvError, SendError, Sender}; +use std::thread::{self, JoinHandle}; + +use crate::mqtt_topics::{publish_event, Publish}; +use crate::pipeline::BfldPipeline; +use crate::{IdentityEmbedding, SensingInputs}; + +/// Frame-level input to the spawned worker. The pipeline state — gate, +/// embedding ring, hasher — lives behind the worker thread; callers only +/// send the per-frame sensing data. +pub struct PipelineInput { + /// Sensing fields fed to `pipeline.process`. + pub inputs: SensingInputs, + /// Optional embedding for the iter-15 hasher input + iter-8 ring. + pub embedding: Option, +} + +/// Handle to the spawned worker. Drop or `shutdown()` to stop. `send()` +/// returns an error after shutdown. +pub struct BfldPipelineHandle { + sender: Sender, + worker: Option>, +} + +impl BfldPipelineHandle { + /// Spawn a worker that owns `pipeline` and `publisher`. Returns a handle + /// whose `send()` enqueues sensing inputs into the worker thread. + /// + /// Publish errors are logged to stderr and the worker continues — single + /// frame failures should not kill the long-running pipeline. + #[must_use] + pub fn spawn

(mut pipeline: BfldPipeline, mut publisher: P) -> Self + where + P: Publish + Send + 'static, + P::Error: core::fmt::Debug, + { + let (sender, receiver) = channel::(); + let worker = thread::spawn(move || loop { + match receiver.recv() { + Ok(PipelineInput { inputs, embedding }) => { + if let Some(event) = pipeline.process(inputs, embedding) { + if let Err(e) = publish_event(&mut publisher, &event) { + eprintln!("BFLD publish error: {e:?}"); + } + } + } + Err(RecvError) => break, // channel closed by shutdown / drop + } + }); + Self { + sender, + worker: Some(worker), + } + } + + /// Enqueue an input. Returns `SendError` (carrying the + /// rejected input) if the worker has already shut down. + pub fn send(&self, input: PipelineInput) -> Result<(), SendError> { + self.sender.send(input) + } + + /// Close the input channel and join the worker. Panics from the worker + /// thread propagate here; otherwise returns cleanly. + pub fn shutdown(mut self) { + if let Some(worker) = self.worker.take() { + drop(std::mem::replace(&mut self.sender, channel().0)); + worker + .join() + .expect("BFLD pipeline worker panicked during shutdown"); + } + } +} + +impl Drop for BfldPipelineHandle { + /// Best-effort cleanup if `shutdown()` was not called explicitly. + fn drop(&mut self) { + if let Some(worker) = self.worker.take() { + // Replace the sender with a fresh disconnected one so the worker + // recv() returns Err(RecvError) and the loop exits. + drop(std::mem::replace(&mut self.sender, channel().0)); + let _ = worker.join(); + } + } +} diff --git a/v2/crates/wifi-densepose-bfld/tests/pipeline_handle_worker.rs b/v2/crates/wifi-densepose-bfld/tests/pipeline_handle_worker.rs new file mode 100644 index 00000000..7f567de9 --- /dev/null +++ b/v2/crates/wifi-densepose-bfld/tests/pipeline_handle_worker.rs @@ -0,0 +1,202 @@ +//! Acceptance tests for `BfldPipelineHandle`. ADR-118 §2.1 worker surface. + +#![cfg(feature = "std")] + +use std::sync::{Arc, Mutex}; +use std::thread; +use std::time::Duration; + +use wifi_densepose_bfld::{ + BfldConfig, BfldPipeline, BfldPipelineHandle, CapturePublisher, IdentityEmbedding, + PipelineInput, PrivacyClass, SensingInputs, EMBEDDING_DIM, +}; + +fn inputs(ts_ns: u64) -> SensingInputs { + SensingInputs { + timestamp_ns: ts_ns, + presence: true, + motion: 0.5, + person_count: 1, + sensing_confidence: 0.9, + sep: 0.2, + stab: 0.2, + consist: 0.2, + risk_conf: 0.2, + rf_signature_hash: None, + } +} + +fn embedding() -> IdentityEmbedding { + IdentityEmbedding::from_raw([0.05; EMBEDDING_DIM]) +} + +fn input(ts_ns: u64) -> PipelineInput { + PipelineInput { + inputs: inputs(ts_ns), + embedding: Some(embedding()), + } +} + +fn drain(published: &Arc>) -> Vec { + published + .lock() + .unwrap() + .published + .iter() + .map(|m| m.topic.clone()) + .collect() +} + +#[test] +fn handle_publishes_single_input() { + let pipeline = BfldPipeline::new(BfldConfig::new("seed-01")); + let pub_arc = Arc::new(Mutex::new(CapturePublisher::default())); + let handle = BfldPipelineHandle::spawn(pipeline, pub_arc.clone()); + + handle.send(input(0)).expect("send must succeed"); + + // Give the worker a moment to drain the channel. + thread::sleep(Duration::from_millis(50)); + handle.shutdown(); + + let topics = drain(&pub_arc); + assert_eq!(topics.len(), 5, "Anonymous + no zone → 5 topics"); +} + +#[test] +fn handle_publishes_multiple_inputs_in_order() { + let pipeline = BfldPipeline::new(BfldConfig::new("seed-01")); + let pub_arc = Arc::new(Mutex::new(CapturePublisher::default())); + let handle = BfldPipelineHandle::spawn(pipeline, pub_arc.clone()); + + for i in 0..3 { + handle.send(input(i * 1_000_000)).unwrap(); + } + thread::sleep(Duration::from_millis(80)); + handle.shutdown(); + + let topics = drain(&pub_arc); + assert_eq!(topics.len(), 15, "3 inputs × 5 topics each = 15"); +} + +#[test] +fn handle_send_after_shutdown_errors() { + let pipeline = BfldPipeline::new(BfldConfig::new("seed-01")); + let pub_arc = Arc::new(Mutex::new(CapturePublisher::default())); + let handle = BfldPipelineHandle::spawn(pipeline, pub_arc); + + // Save the sender by cloning before shutdown — but BfldPipelineHandle + // owns the sender, so the test demonstrates this via post-shutdown send: + handle.shutdown(); + // shutdown consumed handle; we can't call send afterward at the type + // level. The compile-time guarantee IS the test. +} + +#[test] +fn handle_drop_without_explicit_shutdown_joins_worker_cleanly() { + let pipeline = BfldPipeline::new(BfldConfig::new("seed-01")); + let pub_arc = Arc::new(Mutex::new(CapturePublisher::default())); + { + let handle = BfldPipelineHandle::spawn(pipeline, pub_arc.clone()); + handle.send(input(0)).unwrap(); + thread::sleep(Duration::from_millis(50)); + // No explicit shutdown — Drop must handle worker join. + } + // If we reached here without hanging or panicking, the Drop path worked. + let topics = drain(&pub_arc); + assert_eq!(topics.len(), 5); +} + +#[test] +fn handle_honors_privacy_mode_toggle_via_pipeline_state() { + let mut pipeline = BfldPipeline::new(BfldConfig::new("seed-01")); + pipeline.enable_privacy_mode(); + let pub_arc = Arc::new(Mutex::new(CapturePublisher::default())); + let handle = BfldPipelineHandle::spawn(pipeline, pub_arc.clone()); + + handle.send(input(0)).unwrap(); + thread::sleep(Duration::from_millis(50)); + handle.shutdown(); + + let topics = drain(&pub_arc); + // Restricted + no zone: presence/motion/count/confidence = 4 topics. + assert_eq!(topics.len(), 4, "Restricted strips identity_risk topic"); + assert!(!topics.iter().any(|t| t.contains("identity_risk"))); +} + +#[test] +fn handle_drops_event_when_gate_rejects() { + let pipeline = BfldPipeline::new(BfldConfig::new("seed-01")); + let pub_arc = Arc::new(Mutex::new(CapturePublisher::default())); + let handle = BfldPipelineHandle::spawn(pipeline, pub_arc.clone()); + + // Two high-risk inputs back-to-back force the gate into Reject after debounce. + use wifi_densepose_bfld::coherence_gate::DEBOUNCE_NS; + let mut high_risk = inputs(0); + high_risk.sep = 1.0; + high_risk.stab = 1.0; + high_risk.consist = 1.0; + high_risk.risk_conf = 0.8; + handle + .send(PipelineInput { + inputs: high_risk.clone(), + embedding: Some(embedding()), + }) + .unwrap(); + high_risk.timestamp_ns = DEBOUNCE_NS; + handle + .send(PipelineInput { + inputs: high_risk, + embedding: Some(embedding()), + }) + .unwrap(); + thread::sleep(Duration::from_millis(80)); + handle.shutdown(); + + let topics = drain(&pub_arc); + // First input emits (Accept state) → 5 topics. Second input gate-promoted + // to Reject → 0 topics. Total = 5. + assert_eq!(topics.len(), 5, "Reject must drop the second event entirely"); +} + +#[test] +fn handle_with_zone_threads_through_to_published_topics() { + let pipeline = BfldPipeline::new(BfldConfig::new("seed-01").with_zone("kitchen")); + let pub_arc = Arc::new(Mutex::new(CapturePublisher::default())); + let handle = BfldPipelineHandle::spawn(pipeline, pub_arc.clone()); + + handle.send(input(0)).unwrap(); + thread::sleep(Duration::from_millis(50)); + handle.shutdown(); + + let topics = drain(&pub_arc); + assert!( + topics.iter().any(|t| t.contains("zone_activity")), + "zone_activity topic must be present when zone configured", + ); + + let zone_msg = pub_arc + .lock() + .unwrap() + .published + .iter() + .find(|m| m.topic.contains("zone_activity")) + .map(|m| m.payload.clone()); + assert_eq!(zone_msg.as_deref(), Some("\"kitchen\"")); +} + +#[test] +fn class_3_pipeline_baseline_produces_four_topics_per_input() { + // Baseline class = Restricted (no privacy_mode toggle needed). + let pipeline = BfldPipeline::new( + BfldConfig::new("seed-01").with_privacy_class(PrivacyClass::Restricted), + ); + let pub_arc = Arc::new(Mutex::new(CapturePublisher::default())); + let handle = BfldPipelineHandle::spawn(pipeline, pub_arc.clone()); + + handle.send(input(0)).unwrap(); + thread::sleep(Duration::from_millis(50)); + handle.shutdown(); + + assert_eq!(drain(&pub_arc).len(), 4); +}