From 4434b235a5664b42fce1d4e552a3de3c125d5bd9 Mon Sep 17 00:00:00 2001 From: ruv Date: Sun, 24 May 2026 18:59:29 -0400 Subject: [PATCH] feat(adr-118/p6.6): pipeline event-stream JSON determinism (248/248 GREEN) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Iter 37. Adds the cross-pipeline counterpart to iter 31's I3 isolation tests. Iter 31 proved hash DIFFERENCES across sites and days; this iter proves event-stream EQUALITY across two pipeline instances with matching configuration. Operators capturing BFI for offline replay analysis can now trust that replaying the same input stream produces byte-identical JSON output across BFLD versions. Added (in v2/crates/wifi-densepose-bfld/tests/pipeline_determinism.rs): - 5 named tests, all green: two_pipelines_with_identical_config_produce_identical_event_streams Build two BfldPipelines from the same BfldConfig (same node_id, same SignatureHasher salt, same class), drive both with 5 identical (timestamp, motion, embedding) tuples, then walk both event vecs field-by-field asserting equality of every publishable BfldEvent field including the derived rf_signature_hash and identity_risk_score. two_pipelines_produce_byte_identical_event_json_streams (gated on serde-json) — same fixture, but compares the serde_json::to_string output as Vec. This is the operator's true wire-form replay guarantee. replaying_same_input_sequence_after_pipeline_reset_reproduces_events Catches accidental hidden state by building, draining, and rebuilding the pipeline twice; asserts the hash sequences match. If a future PR adds an internal counter that affects output, this test fires. different_input_sequences_diverge_after_the_first_difference Negative control: identical first two inputs produce identical hashes; changing the third input (different embedding) produces a different hash. Pins that the determinism is genuine, not "always returns the same value." class_3_pipelines_produce_identical_stripped_event_streams Determinism property must hold across privacy classes too — operators running Restricted deployments need replay to work even though identity fields are stripped. ADR-124 status (iter step 0 sibling check): - docs/adr/ADR-124-rvagent-mcp-ruvector-npm-integration.md unchanged at 431 lines. SENSE-BRIDGE scope remains orthogonal. ACs progressed: - ADR-119 AC6 (deterministic serialization) lifted from the BfldFrame layer (iter 2) to the BfldEvent + JSON layer. Operators get end-to-end determinism guarantees from sensing input through to MQTT topic payload. - ADR-118 §2.1 pipeline correctness — two-pipeline equality is the strongest form of the "same input → same output" contract the facade can offer. Combined with iter 31's I3 difference proof, the pipeline now has both "should match" and "should differ" invariants pinned at the public-API level. Test config: - cargo test --no-default-features → 80 passed (pipeline_determinism cfg-out) - cargo test → 248 passed (243 + 5) Out of scope (next iter target): - PR-readiness pivot — CHANGELOG batch, witness bundle, AC closeout table for the eventual PR description. All in-crate ACs are now covered by iters 1-37; remaining work is either external-resource- gated (KIT BFId, Pi5/Nexmon) or PR-prep. Co-Authored-By: claude-flow --- .../tests/pipeline_determinism.rs | 176 ++++++++++++++++++ 1 file changed, 176 insertions(+) create mode 100644 v2/crates/wifi-densepose-bfld/tests/pipeline_determinism.rs diff --git a/v2/crates/wifi-densepose-bfld/tests/pipeline_determinism.rs b/v2/crates/wifi-densepose-bfld/tests/pipeline_determinism.rs new file mode 100644 index 00000000..69d2f313 --- /dev/null +++ b/v2/crates/wifi-densepose-bfld/tests/pipeline_determinism.rs @@ -0,0 +1,176 @@ +//! Pipeline event-stream determinism. Operators capturing BFI for offline +//! analysis need the guarantee that **two pipelines with identical config + +//! salt + input streams produce byte-identical event JSON sequences**. +//! Without this, replay-driven regression testing across BFLD versions is +//! impossible. +//! +//! This is the cross-pipeline counterpart to iter 31's I3 isolation test +//! (which proves hash *differences* across sites/days); here we prove hash +//! *and full-event* equality across two pipeline instances with matching +//! configuration. + +#![cfg(feature = "std")] + +use wifi_densepose_bfld::{ + BfldConfig, BfldEvent, BfldPipeline, IdentityEmbedding, PrivacyClass, SensingInputs, + SignatureHasher, EMBEDDING_DIM, SITE_SALT_LEN, +}; + +const NS_PER_SEC: u64 = 1_000_000_000; + +fn salt() -> [u8; SITE_SALT_LEN] { + let mut s = [0u8; SITE_SALT_LEN]; + for (i, b) in s.iter_mut().enumerate() { + *b = i as u8; + } + s +} + +fn person_embedding(seed: f32) -> IdentityEmbedding { + let mut a = [0.0f32; EMBEDDING_DIM]; + for (i, v) in a.iter_mut().enumerate() { + *v = (seed + i as f32) * 0.0073; + } + IdentityEmbedding::from_raw(a) +} + +fn inputs_at(unix_secs: u64, motion: f32) -> SensingInputs { + SensingInputs { + timestamp_ns: unix_secs * NS_PER_SEC, + presence: true, + motion, + person_count: 1, + sensing_confidence: 0.91, + sep: 0.2, + stab: 0.2, + consist: 0.2, + risk_conf: 0.2, + rf_signature_hash: None, + } +} + +fn fresh_pipeline() -> BfldPipeline { + BfldPipeline::new( + BfldConfig::new("seed-det") + .with_signature_hasher(SignatureHasher::new(salt())), + ) +} + +fn drive(p: &mut BfldPipeline, n: usize) -> Vec { + (0..n) + .map(|i| { + let secs = 1_700_000_000 + i as u64; + let motion = 0.1 + (i as f32) * 0.1; + p.process(inputs_at(secs, motion), Some(person_embedding(i as f32))) + .expect("low-risk emit") + }) + .collect() +} + +#[test] +fn two_pipelines_with_identical_config_produce_identical_event_streams() { + let mut a = fresh_pipeline(); + let mut b = fresh_pipeline(); + let n = 5; + let events_a = drive(&mut a, n); + let events_b = drive(&mut b, n); + assert_eq!(events_a.len(), n); + assert_eq!(events_b.len(), n); + for (i, (ea, eb)) in events_a.iter().zip(events_b.iter()).enumerate() { + assert_eq!(ea.timestamp_ns, eb.timestamp_ns, "event[{i}] ts differs"); + assert_eq!(ea.presence, eb.presence, "event[{i}] presence differs"); + assert_eq!(ea.motion, eb.motion, "event[{i}] motion differs"); + assert_eq!(ea.person_count, eb.person_count); + assert_eq!(ea.confidence, eb.confidence); + assert_eq!(ea.zone_id, eb.zone_id); + assert_eq!(ea.privacy_class, eb.privacy_class); + assert_eq!(ea.identity_risk_score, eb.identity_risk_score); + assert_eq!(ea.rf_signature_hash, eb.rf_signature_hash); + } +} + +#[cfg(feature = "serde-json")] +#[test] +fn two_pipelines_produce_byte_identical_event_json_streams() { + let mut a = fresh_pipeline(); + let mut b = fresh_pipeline(); + let n = 5; + let json_a: Vec = drive(&mut a, n) + .iter() + .map(|e| e.to_json().unwrap()) + .collect(); + let json_b: Vec = drive(&mut b, n) + .iter() + .map(|e| e.to_json().unwrap()) + .collect(); + assert_eq!(json_a, json_b, "event JSON streams must be byte-identical"); + // Sanity: each JSON includes the derived hash field, so the equality is + // covering the salt/day/embedding → hash path too. + assert!(json_a.iter().all(|j| j.contains("rf_signature_hash"))); +} + +#[test] +fn replaying_same_input_sequence_after_pipeline_reset_reproduces_events() { + // Same instance, two passes: build → drive → record → drop → rebuild → + // drive → record → compare. Catches any accidental hidden state that + // wouldn't be carried in BfldConfig but would still influence output. + let n = 5; + let pass_a = drive(&mut fresh_pipeline(), n); + let pass_b = drive(&mut fresh_pipeline(), n); + for (i, (ea, eb)) in pass_a.iter().zip(pass_b.iter()).enumerate() { + assert_eq!( + ea.rf_signature_hash, eb.rf_signature_hash, + "rf_signature_hash differs at event[{i}] across pipeline rebuilds", + ); + } +} + +#[test] +fn different_input_sequences_diverge_after_the_first_difference() { + let mut a = fresh_pipeline(); + let mut b = fresh_pipeline(); + // First two inputs identical: + let ea0 = a + .process(inputs_at(1_700_000_000, 0.1), Some(person_embedding(0.0))) + .unwrap(); + let eb0 = b + .process(inputs_at(1_700_000_000, 0.1), Some(person_embedding(0.0))) + .unwrap(); + assert_eq!(ea0.rf_signature_hash, eb0.rf_signature_hash); + // Third input differs in embedding: + let ea1 = a + .process(inputs_at(1_700_000_001, 0.2), Some(person_embedding(1.0))) + .unwrap(); + let eb1 = b + .process(inputs_at(1_700_000_001, 0.2), Some(person_embedding(99.0))) + .unwrap(); + assert_ne!( + ea1.rf_signature_hash, eb1.rf_signature_hash, + "different embeddings must produce different hashes", + ); +} + +#[test] +fn class_3_pipelines_produce_identical_stripped_event_streams() { + // Determinism property must hold across privacy classes too — operators + // running Restricted deployments should be able to replay captures and + // see the same (stripped) event sequences. + let make = || { + BfldPipeline::new( + BfldConfig::new("seed-r3") + .with_privacy_class(PrivacyClass::Restricted) + .with_signature_hasher(SignatureHasher::new(salt())), + ) + }; + let mut a = make(); + let mut b = make(); + let n = 3; + let events_a = drive(&mut a, n); + let events_b = drive(&mut b, n); + for (i, (ea, eb)) in events_a.iter().zip(events_b.iter()).enumerate() { + assert!(ea.identity_risk_score.is_none(), "event[{i}] class-3 strip"); + assert!(ea.rf_signature_hash.is_none(), "event[{i}] class-3 strip"); + assert_eq!(ea.motion, eb.motion, "event[{i}] motion still deterministic"); + assert_eq!(ea.presence, eb.presence); + } +}