From 38676aa2bdf09cf453609e0aa1062823f86ce66b Mon Sep 17 00:00:00 2001 From: ruv Date: Sun, 24 May 2026 18:45:54 -0400 Subject: [PATCH] feat(adr-118/p6.4): spawn_with_oracle for Soul Signature deployments (227/227 GREEN) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Iter 34. Closes the gap where BfldPipelineHandle had no path for an operator-supplied SoulMatchOracle to reach the worker thread. The emit_with_oracle surface added in iter 14 was unreachable through the handle API — Soul Signature deployments (ADR-118 §1.4) had to either drop down to BfldEmitter directly or accept Recalibrate gate-drops on known-enrolled matches. Added (in src/pipeline.rs): - BfldPipeline::process_with_oracle( inputs, embedding, oracle, ) -> Option Wraps emitter.emit_with_oracle then applies the same privacy_mode post-processing as process(). Privacy_mode and oracle are independent — class-3 demote still happens AFTER any oracle Recalibrate exemption. Added (in src/pipeline_handle.rs): - BfldPipelineHandle::spawn_with_oracle(pipeline, publisher, oracle) -> Self where O: SoulMatchOracle + Send + Sync + 'static The worker thread owns the oracle and consults it on every recv(). Worker loop now calls pipeline.process_with_oracle(...) instead of pipeline.process(...). tests/handle_soul_oracle.rs (3 named tests, all green): spawn_with_oracle_null_is_equivalent_to_spawn Parity: 3 identical low-risk inputs through spawn() and spawn_with_oracle(NullOracle) produce the same publish count and the same motion-topic count. spawn_with_always_match_oracle_lets_events_publish_under_high_risk *** Headline test *** 3 high-risk inputs spaced > DEBOUNCE_NS apart. With AlwaysMatch oracle, all 3 produce motion topics — the gate never reaches Recalibrate because the oracle reports an enrolled-person match. spawn_with_null_oracle_drops_events_under_sustained_recalibrate_score Negative control for the above: same 3 inputs through NullOracle, only 1 motion topic survives (the first input lands at Accept; the second and third hit Recalibrate after debounce and are dropped per ADR-121 §2.4). ADR-124 status (iter step 0 sibling check): - docs/adr/ADR-124-rvagent-mcp-ruvector-npm-integration.md unchanged at 431 lines. SENSE-BRIDGE scope remains orthogonal to BFLD core; no overlap with this iter. ACs progressed: - ADR-118 §1.4 Soul Signature companion contract end-to-end through the public handle API. Operators wiring Soul Signature into a RuView deployment now use: BfldPipelineHandle::spawn_with_oracle(pipeline, publisher, my_oracle) …and the rest of the per-frame flow stays identical to spawn(). - ADR-121 §2.6 Recalibrate exemption proven over the worker-thread boundary, not just at the unit level (iter 12 covered the gate-only case). Test config: - cargo test --no-default-features → 72 passed - cargo test → 227 passed (224 + 3) Out of scope (next iter target): - GitHub Actions workflow with mosquitto Docker (lifts iters 24+29 live-broker e2e from skip-mode). Remaining unmet ACs require either external resources (KIT BFId, Pi5/Nexmon) or CI infra. Co-Authored-By: claude-flow --- v2/crates/wifi-densepose-bfld/src/pipeline.rs | 19 +++ .../src/pipeline_handle.rs | 40 +++++ .../tests/handle_soul_oracle.rs | 159 ++++++++++++++++++ 3 files changed, 218 insertions(+) create mode 100644 v2/crates/wifi-densepose-bfld/tests/handle_soul_oracle.rs diff --git a/v2/crates/wifi-densepose-bfld/src/pipeline.rs b/v2/crates/wifi-densepose-bfld/src/pipeline.rs index 9e47386c..10892c36 100644 --- a/v2/crates/wifi-densepose-bfld/src/pipeline.rs +++ b/v2/crates/wifi-densepose-bfld/src/pipeline.rs @@ -14,6 +14,7 @@ #![cfg(feature = "std")] +use crate::coherence_gate::SoulMatchOracle; use crate::emitter::{BfldEmitter, SensingInputs}; use crate::identity_risk::GateAction; use crate::signature_hasher::SignatureHasher; @@ -114,6 +115,24 @@ impl BfldPipeline { Some(event) } + /// Variant of [`Self::process`] that consults a [`SoulMatchOracle`] before + /// the coherence gate fires `Recalibrate`. See ADR-121 §2.6 and ADR-118 + /// §1.4. The privacy_mode post-processing still applies; the oracle only + /// affects whether the gate transitions to Recalibrate at all. + pub fn process_with_oracle( + &mut self, + inputs: SensingInputs, + embedding: Option, + oracle: &O, + ) -> Option { + let mut event = self.emitter.emit_with_oracle(inputs, embedding, oracle)?; + if self.privacy_mode { + event.privacy_class = PrivacyClass::Restricted; + event.apply_privacy_gating(); + } + Some(event) + } + /// Wire-bytes variant of [`Self::process`]: returns a [`BfldFrame`] ready /// to serialize via `BfldFrame::to_bytes()`. Caller supplies a /// `header_template` carrying AP / STA / session identity fields and a diff --git a/v2/crates/wifi-densepose-bfld/src/pipeline_handle.rs b/v2/crates/wifi-densepose-bfld/src/pipeline_handle.rs index 459b0175..f82479b1 100644 --- a/v2/crates/wifi-densepose-bfld/src/pipeline_handle.rs +++ b/v2/crates/wifi-densepose-bfld/src/pipeline_handle.rs @@ -11,6 +11,7 @@ use std::sync::mpsc::{channel, RecvError, SendError, Sender}; use std::thread::{self, JoinHandle}; +use crate::coherence_gate::SoulMatchOracle; use crate::mqtt_topics::{publish_event, Publish}; use crate::pipeline::BfldPipeline; use crate::{IdentityEmbedding, SensingInputs}; @@ -63,6 +64,45 @@ impl BfldPipelineHandle { } } + /// Variant of [`Self::spawn`] that installs a long-lived + /// [`SoulMatchOracle`] used on every per-frame `process` call. The oracle + /// must be `Send + Sync + 'static` because the worker thread consults it + /// on every recv. Pairs with ADR-121 §2.6: when the oracle reports a + /// `Match`, a would-be Recalibrate gate transition is downgraded to + /// `PredictOnly` (high score is the *intended* outcome of a known-enrolled + /// person match, not an attacker-grade sniffer arrival). + #[must_use] + pub fn spawn_with_oracle( + mut pipeline: BfldPipeline, + mut publisher: P, + oracle: O, + ) -> Self + where + P: Publish + Send + 'static, + P::Error: core::fmt::Debug, + O: SoulMatchOracle + Send + Sync + 'static, + { + let (sender, receiver) = channel::(); + let worker = thread::spawn(move || loop { + match receiver.recv() { + Ok(PipelineInput { inputs, embedding }) => { + if let Some(event) = + pipeline.process_with_oracle(inputs, embedding, &oracle) + { + if let Err(e) = publish_event(&mut publisher, &event) { + eprintln!("BFLD publish error: {e:?}"); + } + } + } + Err(RecvError) => break, + } + }); + Self { + sender, + worker: Some(worker), + } + } + /// Enqueue an input. Returns `SendError` (carrying the /// rejected input) if the worker has already shut down. pub fn send(&self, input: PipelineInput) -> Result<(), SendError> { diff --git a/v2/crates/wifi-densepose-bfld/tests/handle_soul_oracle.rs b/v2/crates/wifi-densepose-bfld/tests/handle_soul_oracle.rs new file mode 100644 index 00000000..ee1f549d --- /dev/null +++ b/v2/crates/wifi-densepose-bfld/tests/handle_soul_oracle.rs @@ -0,0 +1,159 @@ +//! Acceptance tests for `BfldPipelineHandle::spawn_with_oracle`. ADR-121 §2.6 +//! end-to-end: the operator-supplied Soul Signature oracle reaches the worker +//! thread and downgrades Recalibrate-grade scores to PredictOnly. + +#![cfg(feature = "std")] + +use std::sync::{Arc, Mutex}; +use std::thread; +use std::time::Duration; + +use wifi_densepose_bfld::coherence_gate::DEBOUNCE_NS; +use wifi_densepose_bfld::{ + BfldConfig, BfldPipeline, BfldPipelineHandle, CapturePublisher, IdentityEmbedding, + MatchOutcome, NullOracle, PipelineInput, SensingInputs, SoulMatchOracle, EMBEDDING_DIM, +}; + +const NS_PER_SEC: u64 = 1_000_000_000; + +fn input_at(ts_secs: f64, risk: [f32; 4]) -> PipelineInput { + let [sep, stab, consist, risk_conf] = risk; + let ts_ns = (ts_secs * NS_PER_SEC as f64) as u64; + PipelineInput { + inputs: SensingInputs { + timestamp_ns: ts_ns, + presence: true, + motion: 0.5, + person_count: 1, + sensing_confidence: 0.9, + sep, + stab, + consist, + risk_conf, + rf_signature_hash: None, + }, + embedding: Some(IdentityEmbedding::from_raw([0.05; EMBEDDING_DIM])), + } +} + +struct AlwaysMatch; +impl SoulMatchOracle for AlwaysMatch { + fn matches_enrolled(&self) -> MatchOutcome { + MatchOutcome::Match { + person_id: 0xDEAD_BEEF, + } + } +} + +fn topic_count(log: &CapturePublisher, contains: &str) -> usize { + log.published + .iter() + .filter(|m| m.topic.contains(contains)) + .count() +} + +#[test] +fn spawn_with_oracle_null_is_equivalent_to_spawn() { + let pub_a = Arc::new(Mutex::new(CapturePublisher::default())); + let pub_b = Arc::new(Mutex::new(CapturePublisher::default())); + + let handle_a = BfldPipelineHandle::spawn( + BfldPipeline::new(BfldConfig::new("seed-null-1")), + pub_a.clone(), + ); + let handle_b = BfldPipelineHandle::spawn_with_oracle( + BfldPipeline::new(BfldConfig::new("seed-null-1")), + pub_b.clone(), + NullOracle, + ); + + for i in 0..3 { + handle_a + .send(input_at(i as f64 * 0.1, [0.2, 0.2, 0.2, 0.2])) + .unwrap(); + handle_b + .send(input_at(i as f64 * 0.1, [0.2, 0.2, 0.2, 0.2])) + .unwrap(); + } + thread::sleep(Duration::from_millis(120)); + handle_a.shutdown(); + handle_b.shutdown(); + + let log_a = pub_a.lock().unwrap(); + let log_b = pub_b.lock().unwrap(); + assert_eq!(log_a.published.len(), log_b.published.len()); + assert_eq!( + topic_count(&log_a, "/motion/state"), + topic_count(&log_b, "/motion/state"), + ); +} + +#[test] +fn spawn_with_always_match_oracle_lets_events_publish_under_high_risk() { + // Without the oracle (or with NullOracle), a sustained Recalibrate-grade + // score (all factors ≈ 1.0) promotes to Recalibrate after DEBOUNCE_NS + // and `process_with_oracle` returns None for those frames. With + // AlwaysMatch, the gate downgrades to PredictOnly, so events keep + // publishing. + let pub_arc = Arc::new(Mutex::new(CapturePublisher::default())); + let handle = BfldPipelineHandle::spawn_with_oracle( + BfldPipeline::new(BfldConfig::new("seed-match")), + pub_arc.clone(), + AlwaysMatch, + ); + + // Send 3 high-risk inputs separated by > DEBOUNCE_NS so the gate would + // have promoted to Recalibrate were it not for the oracle exemption. + handle.send(input_at(0.0, [1.0, 1.0, 1.0, 1.0])).unwrap(); + let ts_after_debounce = (DEBOUNCE_NS as f64) / (NS_PER_SEC as f64); + handle + .send(input_at(ts_after_debounce, [1.0, 1.0, 1.0, 1.0])) + .unwrap(); + handle + .send(input_at(ts_after_debounce * 2.0, [1.0, 1.0, 1.0, 1.0])) + .unwrap(); + thread::sleep(Duration::from_millis(120)); + handle.shutdown(); + + let log = pub_arc.lock().unwrap(); + let motions = topic_count(&log, "/motion/state"); + // All 3 inputs should yield motion topics — none dropped to Recalibrate. + assert_eq!( + motions, 3, + "AlwaysMatch oracle must prevent Recalibrate-drop, got {motions} motion topics", + ); +} + +#[test] +fn spawn_with_null_oracle_drops_events_under_sustained_recalibrate_score() { + // Negative control for the test above: same high-risk input sequence + // through NullOracle should DROP the second + later events (the gate + // promotes to Recalibrate after the first one passes through at Accept + // baseline and the debounce elapses). + let pub_arc = Arc::new(Mutex::new(CapturePublisher::default())); + let handle = BfldPipelineHandle::spawn_with_oracle( + BfldPipeline::new(BfldConfig::new("seed-null-drop")), + pub_arc.clone(), + NullOracle, + ); + + handle.send(input_at(0.0, [1.0, 1.0, 1.0, 1.0])).unwrap(); + let ts_after_debounce = (DEBOUNCE_NS as f64) / (NS_PER_SEC as f64); + handle + .send(input_at(ts_after_debounce, [1.0, 1.0, 1.0, 1.0])) + .unwrap(); + handle + .send(input_at(ts_after_debounce * 2.0, [1.0, 1.0, 1.0, 1.0])) + .unwrap(); + thread::sleep(Duration::from_millis(120)); + handle.shutdown(); + + let log = pub_arc.lock().unwrap(); + let motions = topic_count(&log, "/motion/state"); + // The first input passes (gate still in Accept). The second + third + // hit Recalibrate after debounce → dropped. Expect exactly 1. + assert_eq!( + motions, 1, + "NullOracle must let the gate Recalibrate-drop after debounce, got {motions} motion topics", + ); +}