feat(adr-118/p6.4): spawn_with_oracle for Soul Signature deployments (227/227 GREEN)

Iter 34. Closes the gap where BfldPipelineHandle had no path for an
operator-supplied SoulMatchOracle to reach the worker thread. The
emit_with_oracle surface added in iter 14 was unreachable through the
handle API — Soul Signature deployments (ADR-118 §1.4) had to either
drop down to BfldEmitter directly or accept Recalibrate gate-drops on
known-enrolled matches.

Added (in src/pipeline.rs):
- BfldPipeline::process_with_oracle<O: SoulMatchOracle>(
      inputs, embedding, oracle,
  ) -> Option<BfldEvent>
  Wraps emitter.emit_with_oracle then applies the same privacy_mode
  post-processing as process(). Privacy_mode and oracle are independent
  — class-3 demote still happens AFTER any oracle Recalibrate exemption.

Added (in src/pipeline_handle.rs):
- BfldPipelineHandle::spawn_with_oracle<P, O>(pipeline, publisher, oracle) -> Self
  where O: SoulMatchOracle + Send + Sync + 'static
  The worker thread owns the oracle and consults it on every recv().
  Worker loop now calls pipeline.process_with_oracle(...) instead of
  pipeline.process(...).

tests/handle_soul_oracle.rs (3 named tests, all green):
  spawn_with_oracle_null_is_equivalent_to_spawn
    Parity: 3 identical low-risk inputs through spawn() and
    spawn_with_oracle(NullOracle) produce the same publish count
    and the same motion-topic count.
  spawn_with_always_match_oracle_lets_events_publish_under_high_risk
    *** Headline test ***
    3 high-risk inputs spaced > DEBOUNCE_NS apart. With AlwaysMatch
    oracle, all 3 produce motion topics — the gate never reaches
    Recalibrate because the oracle reports an enrolled-person match.
  spawn_with_null_oracle_drops_events_under_sustained_recalibrate_score
    Negative control for the above: same 3 inputs through NullOracle,
    only 1 motion topic survives (the first input lands at Accept;
    the second and third hit Recalibrate after debounce and are
    dropped per ADR-121 §2.4).

ADR-124 status (iter step 0 sibling check):
- docs/adr/ADR-124-rvagent-mcp-ruvector-npm-integration.md unchanged
  at 431 lines. SENSE-BRIDGE scope remains orthogonal to BFLD core;
  no overlap with this iter.

ACs progressed:
- ADR-118 §1.4 Soul Signature companion contract end-to-end through
  the public handle API. Operators wiring Soul Signature into a
  RuView deployment now use:
      BfldPipelineHandle::spawn_with_oracle(pipeline, publisher, my_oracle)
  …and the rest of the per-frame flow stays identical to spawn().
- ADR-121 §2.6 Recalibrate exemption proven over the worker-thread
  boundary, not just at the unit level (iter 12 covered the gate-only
  case).

Test config:
- cargo test --no-default-features → 72 passed
- cargo test                       → 227 passed (224 + 3)

Out of scope (next iter target):
- GitHub Actions workflow with mosquitto Docker (lifts iters 24+29
  live-broker e2e from skip-mode). Remaining unmet ACs require
  either external resources (KIT BFId, Pi5/Nexmon) or CI infra.

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
ruv 2026-05-24 18:45:54 -04:00
parent 5c9c76bdaf
commit 38676aa2bd
3 changed files with 218 additions and 0 deletions

View File

@ -14,6 +14,7 @@
#![cfg(feature = "std")]
use crate::coherence_gate::SoulMatchOracle;
use crate::emitter::{BfldEmitter, SensingInputs};
use crate::identity_risk::GateAction;
use crate::signature_hasher::SignatureHasher;
@ -114,6 +115,24 @@ impl BfldPipeline {
Some(event)
}
/// Variant of [`Self::process`] that consults a [`SoulMatchOracle`] before
/// the coherence gate fires `Recalibrate`. See ADR-121 §2.6 and ADR-118
/// §1.4. The privacy_mode post-processing still applies; the oracle only
/// affects whether the gate transitions to Recalibrate at all.
pub fn process_with_oracle<O: SoulMatchOracle>(
&mut self,
inputs: SensingInputs,
embedding: Option<IdentityEmbedding>,
oracle: &O,
) -> Option<BfldEvent> {
let mut event = self.emitter.emit_with_oracle(inputs, embedding, oracle)?;
if self.privacy_mode {
event.privacy_class = PrivacyClass::Restricted;
event.apply_privacy_gating();
}
Some(event)
}
/// Wire-bytes variant of [`Self::process`]: returns a [`BfldFrame`] ready
/// to serialize via `BfldFrame::to_bytes()`. Caller supplies a
/// `header_template` carrying AP / STA / session identity fields and a

View File

@ -11,6 +11,7 @@
use std::sync::mpsc::{channel, RecvError, SendError, Sender};
use std::thread::{self, JoinHandle};
use crate::coherence_gate::SoulMatchOracle;
use crate::mqtt_topics::{publish_event, Publish};
use crate::pipeline::BfldPipeline;
use crate::{IdentityEmbedding, SensingInputs};
@ -63,6 +64,45 @@ impl BfldPipelineHandle {
}
}
/// Variant of [`Self::spawn`] that installs a long-lived
/// [`SoulMatchOracle`] used on every per-frame `process` call. The oracle
/// must be `Send + Sync + 'static` because the worker thread consults it
/// on every recv. Pairs with ADR-121 §2.6: when the oracle reports a
/// `Match`, a would-be Recalibrate gate transition is downgraded to
/// `PredictOnly` (high score is the *intended* outcome of a known-enrolled
/// person match, not an attacker-grade sniffer arrival).
#[must_use]
pub fn spawn_with_oracle<P, O>(
mut pipeline: BfldPipeline,
mut publisher: P,
oracle: O,
) -> Self
where
P: Publish + Send + 'static,
P::Error: core::fmt::Debug,
O: SoulMatchOracle + Send + Sync + 'static,
{
let (sender, receiver) = channel::<PipelineInput>();
let worker = thread::spawn(move || loop {
match receiver.recv() {
Ok(PipelineInput { inputs, embedding }) => {
if let Some(event) =
pipeline.process_with_oracle(inputs, embedding, &oracle)
{
if let Err(e) = publish_event(&mut publisher, &event) {
eprintln!("BFLD publish error: {e:?}");
}
}
}
Err(RecvError) => break,
}
});
Self {
sender,
worker: Some(worker),
}
}
/// Enqueue an input. Returns `SendError<PipelineInput>` (carrying the
/// rejected input) if the worker has already shut down.
pub fn send(&self, input: PipelineInput) -> Result<(), SendError<PipelineInput>> {

View File

@ -0,0 +1,159 @@
//! Acceptance tests for `BfldPipelineHandle::spawn_with_oracle`. ADR-121 §2.6
//! end-to-end: the operator-supplied Soul Signature oracle reaches the worker
//! thread and downgrades Recalibrate-grade scores to PredictOnly.
#![cfg(feature = "std")]
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use wifi_densepose_bfld::coherence_gate::DEBOUNCE_NS;
use wifi_densepose_bfld::{
BfldConfig, BfldPipeline, BfldPipelineHandle, CapturePublisher, IdentityEmbedding,
MatchOutcome, NullOracle, PipelineInput, SensingInputs, SoulMatchOracle, EMBEDDING_DIM,
};
const NS_PER_SEC: u64 = 1_000_000_000;
fn input_at(ts_secs: f64, risk: [f32; 4]) -> PipelineInput {
let [sep, stab, consist, risk_conf] = risk;
let ts_ns = (ts_secs * NS_PER_SEC as f64) as u64;
PipelineInput {
inputs: SensingInputs {
timestamp_ns: ts_ns,
presence: true,
motion: 0.5,
person_count: 1,
sensing_confidence: 0.9,
sep,
stab,
consist,
risk_conf,
rf_signature_hash: None,
},
embedding: Some(IdentityEmbedding::from_raw([0.05; EMBEDDING_DIM])),
}
}
struct AlwaysMatch;
impl SoulMatchOracle for AlwaysMatch {
fn matches_enrolled(&self) -> MatchOutcome {
MatchOutcome::Match {
person_id: 0xDEAD_BEEF,
}
}
}
fn topic_count(log: &CapturePublisher, contains: &str) -> usize {
log.published
.iter()
.filter(|m| m.topic.contains(contains))
.count()
}
#[test]
fn spawn_with_oracle_null_is_equivalent_to_spawn() {
let pub_a = Arc::new(Mutex::new(CapturePublisher::default()));
let pub_b = Arc::new(Mutex::new(CapturePublisher::default()));
let handle_a = BfldPipelineHandle::spawn(
BfldPipeline::new(BfldConfig::new("seed-null-1")),
pub_a.clone(),
);
let handle_b = BfldPipelineHandle::spawn_with_oracle(
BfldPipeline::new(BfldConfig::new("seed-null-1")),
pub_b.clone(),
NullOracle,
);
for i in 0..3 {
handle_a
.send(input_at(i as f64 * 0.1, [0.2, 0.2, 0.2, 0.2]))
.unwrap();
handle_b
.send(input_at(i as f64 * 0.1, [0.2, 0.2, 0.2, 0.2]))
.unwrap();
}
thread::sleep(Duration::from_millis(120));
handle_a.shutdown();
handle_b.shutdown();
let log_a = pub_a.lock().unwrap();
let log_b = pub_b.lock().unwrap();
assert_eq!(log_a.published.len(), log_b.published.len());
assert_eq!(
topic_count(&log_a, "/motion/state"),
topic_count(&log_b, "/motion/state"),
);
}
#[test]
fn spawn_with_always_match_oracle_lets_events_publish_under_high_risk() {
// Without the oracle (or with NullOracle), a sustained Recalibrate-grade
// score (all factors ≈ 1.0) promotes to Recalibrate after DEBOUNCE_NS
// and `process_with_oracle` returns None for those frames. With
// AlwaysMatch, the gate downgrades to PredictOnly, so events keep
// publishing.
let pub_arc = Arc::new(Mutex::new(CapturePublisher::default()));
let handle = BfldPipelineHandle::spawn_with_oracle(
BfldPipeline::new(BfldConfig::new("seed-match")),
pub_arc.clone(),
AlwaysMatch,
);
// Send 3 high-risk inputs separated by > DEBOUNCE_NS so the gate would
// have promoted to Recalibrate were it not for the oracle exemption.
handle.send(input_at(0.0, [1.0, 1.0, 1.0, 1.0])).unwrap();
let ts_after_debounce = (DEBOUNCE_NS as f64) / (NS_PER_SEC as f64);
handle
.send(input_at(ts_after_debounce, [1.0, 1.0, 1.0, 1.0]))
.unwrap();
handle
.send(input_at(ts_after_debounce * 2.0, [1.0, 1.0, 1.0, 1.0]))
.unwrap();
thread::sleep(Duration::from_millis(120));
handle.shutdown();
let log = pub_arc.lock().unwrap();
let motions = topic_count(&log, "/motion/state");
// All 3 inputs should yield motion topics — none dropped to Recalibrate.
assert_eq!(
motions, 3,
"AlwaysMatch oracle must prevent Recalibrate-drop, got {motions} motion topics",
);
}
#[test]
fn spawn_with_null_oracle_drops_events_under_sustained_recalibrate_score() {
// Negative control for the test above: same high-risk input sequence
// through NullOracle should DROP the second + later events (the gate
// promotes to Recalibrate after the first one passes through at Accept
// baseline and the debounce elapses).
let pub_arc = Arc::new(Mutex::new(CapturePublisher::default()));
let handle = BfldPipelineHandle::spawn_with_oracle(
BfldPipeline::new(BfldConfig::new("seed-null-drop")),
pub_arc.clone(),
NullOracle,
);
handle.send(input_at(0.0, [1.0, 1.0, 1.0, 1.0])).unwrap();
let ts_after_debounce = (DEBOUNCE_NS as f64) / (NS_PER_SEC as f64);
handle
.send(input_at(ts_after_debounce, [1.0, 1.0, 1.0, 1.0]))
.unwrap();
handle
.send(input_at(ts_after_debounce * 2.0, [1.0, 1.0, 1.0, 1.0]))
.unwrap();
thread::sleep(Duration::from_millis(120));
handle.shutdown();
let log = pub_arc.lock().unwrap();
let motions = topic_count(&log, "/motion/state");
// The first input passes (gate still in Accept). The second + third
// hit Recalibrate after debounce → dropped. Expect exactly 1.
assert_eq!(
motions, 1,
"NullOracle must let the gate Recalibrate-drop after debounce, got {motions} motion topics",
);
}