diff --git a/v2/crates/wifi-densepose-bfld/src/pipeline.rs b/v2/crates/wifi-densepose-bfld/src/pipeline.rs index 9e47386c..10892c36 100644 --- a/v2/crates/wifi-densepose-bfld/src/pipeline.rs +++ b/v2/crates/wifi-densepose-bfld/src/pipeline.rs @@ -14,6 +14,7 @@ #![cfg(feature = "std")] +use crate::coherence_gate::SoulMatchOracle; use crate::emitter::{BfldEmitter, SensingInputs}; use crate::identity_risk::GateAction; use crate::signature_hasher::SignatureHasher; @@ -114,6 +115,24 @@ impl BfldPipeline { Some(event) } + /// Variant of [`Self::process`] that consults a [`SoulMatchOracle`] before + /// the coherence gate fires `Recalibrate`. See ADR-121 §2.6 and ADR-118 + /// §1.4. The privacy_mode post-processing still applies; the oracle only + /// affects whether the gate transitions to Recalibrate at all. + pub fn process_with_oracle( + &mut self, + inputs: SensingInputs, + embedding: Option, + oracle: &O, + ) -> Option { + let mut event = self.emitter.emit_with_oracle(inputs, embedding, oracle)?; + if self.privacy_mode { + event.privacy_class = PrivacyClass::Restricted; + event.apply_privacy_gating(); + } + Some(event) + } + /// Wire-bytes variant of [`Self::process`]: returns a [`BfldFrame`] ready /// to serialize via `BfldFrame::to_bytes()`. Caller supplies a /// `header_template` carrying AP / STA / session identity fields and a diff --git a/v2/crates/wifi-densepose-bfld/src/pipeline_handle.rs b/v2/crates/wifi-densepose-bfld/src/pipeline_handle.rs index 459b0175..f82479b1 100644 --- a/v2/crates/wifi-densepose-bfld/src/pipeline_handle.rs +++ b/v2/crates/wifi-densepose-bfld/src/pipeline_handle.rs @@ -11,6 +11,7 @@ use std::sync::mpsc::{channel, RecvError, SendError, Sender}; use std::thread::{self, JoinHandle}; +use crate::coherence_gate::SoulMatchOracle; use crate::mqtt_topics::{publish_event, Publish}; use crate::pipeline::BfldPipeline; use crate::{IdentityEmbedding, SensingInputs}; @@ -63,6 +64,45 @@ impl BfldPipelineHandle { } } + /// Variant of [`Self::spawn`] that installs a long-lived + /// [`SoulMatchOracle`] used on every per-frame `process` call. The oracle + /// must be `Send + Sync + 'static` because the worker thread consults it + /// on every recv. Pairs with ADR-121 §2.6: when the oracle reports a + /// `Match`, a would-be Recalibrate gate transition is downgraded to + /// `PredictOnly` (high score is the *intended* outcome of a known-enrolled + /// person match, not an attacker-grade sniffer arrival). + #[must_use] + pub fn spawn_with_oracle( + mut pipeline: BfldPipeline, + mut publisher: P, + oracle: O, + ) -> Self + where + P: Publish + Send + 'static, + P::Error: core::fmt::Debug, + O: SoulMatchOracle + Send + Sync + 'static, + { + let (sender, receiver) = channel::(); + let worker = thread::spawn(move || loop { + match receiver.recv() { + Ok(PipelineInput { inputs, embedding }) => { + if let Some(event) = + pipeline.process_with_oracle(inputs, embedding, &oracle) + { + if let Err(e) = publish_event(&mut publisher, &event) { + eprintln!("BFLD publish error: {e:?}"); + } + } + } + Err(RecvError) => break, + } + }); + Self { + sender, + worker: Some(worker), + } + } + /// Enqueue an input. Returns `SendError` (carrying the /// rejected input) if the worker has already shut down. pub fn send(&self, input: PipelineInput) -> Result<(), SendError> { diff --git a/v2/crates/wifi-densepose-bfld/tests/handle_soul_oracle.rs b/v2/crates/wifi-densepose-bfld/tests/handle_soul_oracle.rs new file mode 100644 index 00000000..ee1f549d --- /dev/null +++ b/v2/crates/wifi-densepose-bfld/tests/handle_soul_oracle.rs @@ -0,0 +1,159 @@ +//! Acceptance tests for `BfldPipelineHandle::spawn_with_oracle`. ADR-121 §2.6 +//! end-to-end: the operator-supplied Soul Signature oracle reaches the worker +//! thread and downgrades Recalibrate-grade scores to PredictOnly. + +#![cfg(feature = "std")] + +use std::sync::{Arc, Mutex}; +use std::thread; +use std::time::Duration; + +use wifi_densepose_bfld::coherence_gate::DEBOUNCE_NS; +use wifi_densepose_bfld::{ + BfldConfig, BfldPipeline, BfldPipelineHandle, CapturePublisher, IdentityEmbedding, + MatchOutcome, NullOracle, PipelineInput, SensingInputs, SoulMatchOracle, EMBEDDING_DIM, +}; + +const NS_PER_SEC: u64 = 1_000_000_000; + +fn input_at(ts_secs: f64, risk: [f32; 4]) -> PipelineInput { + let [sep, stab, consist, risk_conf] = risk; + let ts_ns = (ts_secs * NS_PER_SEC as f64) as u64; + PipelineInput { + inputs: SensingInputs { + timestamp_ns: ts_ns, + presence: true, + motion: 0.5, + person_count: 1, + sensing_confidence: 0.9, + sep, + stab, + consist, + risk_conf, + rf_signature_hash: None, + }, + embedding: Some(IdentityEmbedding::from_raw([0.05; EMBEDDING_DIM])), + } +} + +struct AlwaysMatch; +impl SoulMatchOracle for AlwaysMatch { + fn matches_enrolled(&self) -> MatchOutcome { + MatchOutcome::Match { + person_id: 0xDEAD_BEEF, + } + } +} + +fn topic_count(log: &CapturePublisher, contains: &str) -> usize { + log.published + .iter() + .filter(|m| m.topic.contains(contains)) + .count() +} + +#[test] +fn spawn_with_oracle_null_is_equivalent_to_spawn() { + let pub_a = Arc::new(Mutex::new(CapturePublisher::default())); + let pub_b = Arc::new(Mutex::new(CapturePublisher::default())); + + let handle_a = BfldPipelineHandle::spawn( + BfldPipeline::new(BfldConfig::new("seed-null-1")), + pub_a.clone(), + ); + let handle_b = BfldPipelineHandle::spawn_with_oracle( + BfldPipeline::new(BfldConfig::new("seed-null-1")), + pub_b.clone(), + NullOracle, + ); + + for i in 0..3 { + handle_a + .send(input_at(i as f64 * 0.1, [0.2, 0.2, 0.2, 0.2])) + .unwrap(); + handle_b + .send(input_at(i as f64 * 0.1, [0.2, 0.2, 0.2, 0.2])) + .unwrap(); + } + thread::sleep(Duration::from_millis(120)); + handle_a.shutdown(); + handle_b.shutdown(); + + let log_a = pub_a.lock().unwrap(); + let log_b = pub_b.lock().unwrap(); + assert_eq!(log_a.published.len(), log_b.published.len()); + assert_eq!( + topic_count(&log_a, "/motion/state"), + topic_count(&log_b, "/motion/state"), + ); +} + +#[test] +fn spawn_with_always_match_oracle_lets_events_publish_under_high_risk() { + // Without the oracle (or with NullOracle), a sustained Recalibrate-grade + // score (all factors ≈ 1.0) promotes to Recalibrate after DEBOUNCE_NS + // and `process_with_oracle` returns None for those frames. With + // AlwaysMatch, the gate downgrades to PredictOnly, so events keep + // publishing. + let pub_arc = Arc::new(Mutex::new(CapturePublisher::default())); + let handle = BfldPipelineHandle::spawn_with_oracle( + BfldPipeline::new(BfldConfig::new("seed-match")), + pub_arc.clone(), + AlwaysMatch, + ); + + // Send 3 high-risk inputs separated by > DEBOUNCE_NS so the gate would + // have promoted to Recalibrate were it not for the oracle exemption. + handle.send(input_at(0.0, [1.0, 1.0, 1.0, 1.0])).unwrap(); + let ts_after_debounce = (DEBOUNCE_NS as f64) / (NS_PER_SEC as f64); + handle + .send(input_at(ts_after_debounce, [1.0, 1.0, 1.0, 1.0])) + .unwrap(); + handle + .send(input_at(ts_after_debounce * 2.0, [1.0, 1.0, 1.0, 1.0])) + .unwrap(); + thread::sleep(Duration::from_millis(120)); + handle.shutdown(); + + let log = pub_arc.lock().unwrap(); + let motions = topic_count(&log, "/motion/state"); + // All 3 inputs should yield motion topics — none dropped to Recalibrate. + assert_eq!( + motions, 3, + "AlwaysMatch oracle must prevent Recalibrate-drop, got {motions} motion topics", + ); +} + +#[test] +fn spawn_with_null_oracle_drops_events_under_sustained_recalibrate_score() { + // Negative control for the test above: same high-risk input sequence + // through NullOracle should DROP the second + later events (the gate + // promotes to Recalibrate after the first one passes through at Accept + // baseline and the debounce elapses). + let pub_arc = Arc::new(Mutex::new(CapturePublisher::default())); + let handle = BfldPipelineHandle::spawn_with_oracle( + BfldPipeline::new(BfldConfig::new("seed-null-drop")), + pub_arc.clone(), + NullOracle, + ); + + handle.send(input_at(0.0, [1.0, 1.0, 1.0, 1.0])).unwrap(); + let ts_after_debounce = (DEBOUNCE_NS as f64) / (NS_PER_SEC as f64); + handle + .send(input_at(ts_after_debounce, [1.0, 1.0, 1.0, 1.0])) + .unwrap(); + handle + .send(input_at(ts_after_debounce * 2.0, [1.0, 1.0, 1.0, 1.0])) + .unwrap(); + thread::sleep(Duration::from_millis(120)); + handle.shutdown(); + + let log = pub_arc.lock().unwrap(); + let motions = topic_count(&log, "/motion/state"); + // The first input passes (gate still in Accept). The second + third + // hit Recalibrate after debounce → dropped. Expect exactly 1. + assert_eq!( + motions, 1, + "NullOracle must let the gate Recalibrate-drop after debounce, got {motions} motion topics", + ); +}