feat(engine): compose ADR-138/142/143 + ADR-139 live loop

- ADR-138: process_cycle runs ArrayCoordinator when node geometry is registered;
  array contradictions (CoherenceDrop/GeometryInsufficient) fold into the
  privacy demotion; DirectionalEvidence surfaced in TrustedOutput
- ADR-142: per-node mean-amplitude → EvolutionTracker; cross-link change-point
  recorded as a WorldGraph Event node
- ADR-143: ingest_reflectors() runs Rf-SLAM discovery, writes stable
  Wall/Furniture reflectors as ObjectAnchor nodes
- ADR-139 live loop: update_person_track(), apply_active_privacy_mode()
  (PrivacyRollup suppresses person_track under identity-strict modes),
  snapshot_json()
- Acceptance test live_frame_to_reload_same_contents: full path
  fusion->worldgraph->privacy_rollup->persist->reload->same contents, no raw RF
- 9 engine tests; workspace 0 errors

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
ruv 2026-05-29 08:31:05 -04:00
parent 2eada40e3b
commit 2517a16d88
3 changed files with 352 additions and 9 deletions

1
v2/Cargo.lock generated
View File

@ -10659,6 +10659,7 @@ dependencies = [
"wifi-densepose-bfld",
"wifi-densepose-core",
"wifi-densepose-geo",
"wifi-densepose-ruvector",
"wifi-densepose-signal",
"wifi-densepose-worldgraph",
]

View File

@ -11,6 +11,7 @@ repository.workspace = true
# Composed building blocks (ADR-135..146).
wifi-densepose-core = { version = "0.3.0", path = "../wifi-densepose-core" }
wifi-densepose-signal = { version = "0.3.1", path = "../wifi-densepose-signal", default-features = false }
wifi-densepose-ruvector = { version = "0.3.0", path = "../wifi-densepose-ruvector", default-features = false }
# bfld is no_std by default; the privacy CONTROL PLANE (PrivacyModeRegistry) is
# std-gated, so request std explicitly even under a workspace --no-default-features build.
wifi-densepose-bfld = { version = "0.3.0", path = "../wifi-densepose-bfld", features = ["std"] }

View File

@ -30,13 +30,20 @@
#![forbid(unsafe_code)]
use wifi_densepose_bfld::{PrivacyClass, PrivacyMode, PrivacyModeRegistry};
use std::collections::BTreeMap;
use wifi_densepose_bfld::{PrivacyAction, PrivacyClass, PrivacyMode, PrivacyModeRegistry};
use wifi_densepose_geo::types::GeoRegistration;
use wifi_densepose_ruvector::viewpoint::coherence::ClockQualityScore;
use wifi_densepose_signal::ruvsense::fusion_quality::CalibrationId;
use wifi_densepose_signal::ruvsense::multistatic::{MultistaticConfig, MultistaticFuser};
use wifi_densepose_signal::ruvsense::{MultiBandCsiFrame, QualityScore};
use wifi_densepose_signal::ruvsense::{
ArrayCoordinator, ArrayCoordinatorConfig, ArrayNodeInput, ChangePoint, DirectionalEvidence,
EvolutionTracker, MultiBandCsiFrame, QualityScore, ReflectorObservation, RfSlam,
};
use wifi_densepose_worldgraph::{
EnuPoint, SemanticProvenance, WorldEdge, WorldGraph, WorldId, WorldNode, ZoneBoundsEnu,
AnchorKind, EnuPoint, PrivacyRollup, SemanticProvenance, WorldEdge, WorldGraph, WorldGraphError,
WorldId, WorldNode, ZoneBoundsEnu,
};
/// Errors from an engine cycle.
@ -60,6 +67,14 @@ impl From<wifi_densepose_signal::ruvsense::multistatic::MultistaticError> for En
}
}
/// Geometry of a sensing node, needed to run the ADR-138 array coordinator.
#[derive(Debug, Clone, Copy)]
struct NodeGeom {
x: f32,
y: f32,
azimuth: f32,
}
/// The auditable result of one engine cycle — the trust chain made concrete.
#[derive(Debug, Clone)]
pub struct TrustedOutput {
@ -73,6 +88,12 @@ pub struct TrustedOutput {
pub demoted: bool,
/// The mandatory provenance attached to the semantic node.
pub provenance: SemanticProvenance,
/// ADR-138 directional evidence, when node geometry is registered for every
/// contributing node (else `None`).
pub directional: Option<DirectionalEvidence>,
/// ADR-142 cross-link change-point detected this cycle, if any (and the
/// `Event` node it was recorded as in the WorldGraph).
pub change_point: Option<(ChangePoint, WorldId)>,
}
/// Composition root for the RuView streaming engine.
@ -83,6 +104,15 @@ pub struct StreamingEngine {
world: WorldGraph,
model_version: u16,
cycle: u64,
// ADR-138: array coordinator + per-node geometry (by frame node_id).
array: ArrayCoordinator,
node_geom: BTreeMap<u8, NodeGeom>,
// ADR-142: per-link evolution tracker (sized lazily to the node count).
evolution: Option<EvolutionTracker>,
// ADR-143: persistent reflector discovery (v2 mode).
slam: RfSlam,
// ADR-139 live loop: stable track_id -> PersonTrack WorldId.
person_tracks: BTreeMap<u64, WorldId>,
}
impl StreamingEngine {
@ -97,9 +127,100 @@ impl StreamingEngine {
world: WorldGraph::new(registration),
model_version,
cycle: 0,
array: ArrayCoordinator::new(ArrayCoordinatorConfig::default()),
node_geom: BTreeMap::new(),
evolution: None,
slam: RfSlam::with_discovery(0.5, 5, 0.6),
person_tracks: BTreeMap::new(),
}
}
/// ADR-139 live loop: create or update a `PersonTrack` node by stable
/// `track_id`, locate it in `room`, and wire an `Observes` edge from
/// `sensor` (so the privacy rollup can suppress it under identity-strict
/// modes). Returns the (stable) WorldGraph id.
pub fn update_person_track(
&mut self,
track_id: u64,
x: f32,
y: f32,
room: WorldId,
sensor: WorldId,
) -> WorldId {
let existing = self.person_tracks.get(&track_id).copied();
let node = WorldNode::PersonTrack {
id: existing.unwrap_or(WorldId::UNASSIGNED),
track_id,
last_position: EnuPoint { east_m: f64::from(x), north_m: f64::from(y), up_m: 0.0 },
reid_embedding_ref: None,
};
let id = self.world.upsert_node(node);
if existing.is_none() {
self.person_tracks.insert(track_id, id);
let _ = self.world.add_edge(id, room, WorldEdge::LocatedIn { since_unix_ms: 0 });
let _ = self.world.add_edge(
sensor,
id,
WorldEdge::Observes { quality: 1.0, last_seen_unix_ms: 0 },
);
}
id
}
/// ADR-139 §2.4 / ADR-141: materialise `PrivacyLimitedBy` edges for the
/// active privacy mode. Under an identity-suppressing mode, `person_track`
/// observations are denied; the rollup names what was suppressed.
pub fn apply_active_privacy_mode(&mut self) -> PrivacyRollup {
let mode = self.privacy.active_mode();
let suppress_identity = self.privacy.is_action_enforced(PrivacyAction::SuppressIdentity);
self.world.apply_privacy_mode(
&format!("{mode:?}"),
"SuppressIdentity",
move |_sensor_kind, node_kind| !(suppress_identity && node_kind == "person_track"),
)
}
/// Persist the WorldGraph as deterministic JSON (the RVF payload). Contains
/// only graph nodes/edges — **never** raw RF frames.
///
/// # Errors
/// [`WorldGraphError`] on serialisation failure.
pub fn snapshot_json(&self) -> Result<Vec<u8>, WorldGraphError> {
self.world.to_json()
}
/// Register a contributing node's geometry (ADR-138). When every frame's
/// `node_id` in a cycle has a registered geometry, the cycle runs the array
/// coordinator and folds its contradictions into the privacy decision.
pub fn register_node_geometry(&mut self, node_id: u8, x: f32, y: f32, azimuth: f32) {
self.node_geom.insert(node_id, NodeGeom { x, y, azimuth });
}
/// Ingest CIR-derived reflector sightings (ADR-143) and persist any newly
/// stable static anchors into the WorldGraph as `ObjectAnchor` nodes.
/// Returns the WorldGraph ids written this call.
pub fn ingest_reflectors(&mut self, observations: &[ReflectorObservation]) -> Vec<WorldId> {
for obs in observations {
self.slam.observe(obs);
}
let mut written = Vec::new();
for (pos, class) in self.slam.static_anchors(0.05, 1.0) {
let kind = match class {
wifi_densepose_signal::ruvsense::ReflectorClass::Wall => AnchorKind::Reflector,
wifi_densepose_signal::ruvsense::ReflectorClass::Furniture => AnchorKind::Furniture,
wifi_densepose_signal::ruvsense::ReflectorClass::Mobile => continue,
};
let id = self.world.upsert_node(WorldNode::ObjectAnchor {
id: WorldId::UNASSIGNED,
position: EnuPoint { east_m: pos[0], north_m: pos[1], up_m: pos[2] },
anchor_kind: kind,
confidence: 0.9,
});
written.push(id);
}
written
}
/// Register a room and return its WorldGraph id (the observation scope).
pub fn add_room(&mut self, area_id: &str, name: &str) -> WorldId {
self.world.upsert_node(WorldNode::Room {
@ -165,18 +286,28 @@ impl StreamingEngine {
room: WorldId,
now_ms: i64,
) -> Result<TrustedOutput, EngineError> {
// 1. Fuse + score (ADR-137).
// 1. Array coordination (ADR-138) — only when geometry is known for
// every contributing node. Its contradictions feed the privacy gate.
let directional = self.coordinate_array(node_frames);
let array_contradiction =
directional.as_ref().is_some_and(|d| !d.contradictions.is_empty());
// 2. Fuse + score (ADR-137).
let (fused, mut quality) = self.fuser.fuse_scored(node_frames, self.coherence_accept)?;
// 2. Stamp calibration provenance (ADR-135 → ADR-136 → ADR-137).
// 3. Stamp calibration provenance (ADR-135 → ADR-136 → ADR-137).
quality.calibration_id = Some(calibration);
// 3. Privacy control plane (ADR-141): demote on contradiction.
// 4. Evolution change-point (ADR-142) over per-node mean amplitude.
let change_point = self.track_evolution(node_frames, now_ms, room);
// 5. Privacy control plane (ADR-141): demote on a fusion-level OR an
// array-level contradiction (monotonic — information only removed).
let base_class = self.privacy.active_class();
let demoted = quality.forces_privacy_demotion();
let demoted = quality.forces_privacy_demotion() || array_contradiction;
let effective_class = if demoted { demote_one(base_class) } else { base_class };
// 4. Semantic state with mandatory provenance (ADR-139/140).
// 6. Semantic state with mandatory provenance (ADR-139/140).
let provenance = SemanticProvenance {
evidence: quality.evidence_refs.iter().map(|e| format!("{e:?}")).collect(),
model_version: format!("rfenc-v{}", self.model_version),
@ -196,7 +327,77 @@ impl StreamingEngine {
);
self.cycle += 1;
Ok(TrustedOutput { semantic_id, quality, effective_class, demoted, provenance })
Ok(TrustedOutput {
semantic_id,
quality,
effective_class,
demoted,
provenance,
directional,
change_point,
})
}
/// ADR-138: build per-node array inputs and coordinate, iff every frame's
/// `node_id` has a registered geometry. Returns `None` otherwise.
fn coordinate_array(&self, node_frames: &[MultiBandCsiFrame]) -> Option<DirectionalEvidence> {
if node_frames.is_empty() {
return None;
}
let mut inputs = Vec::with_capacity(node_frames.len());
for f in node_frames {
let g = self.node_geom.get(&f.node_id)?; // bail if any node lacks geometry
inputs.push(ArrayNodeInput {
node_id: u32::from(f.node_id),
position: (g.x, g.y),
azimuth: g.azimuth,
coherence: f.coherence,
clock: ClockQualityScore { offset_stdev_us: 50.0, age_us: 1_000, valid: true },
amplitude: f.channel_frames.first().map(|cf| cf.amplitude.clone()),
});
}
Some(self.array.coordinate(&inputs))
}
/// ADR-142: fold per-node mean amplitude into the evolution tracker and,
/// on a cross-link change-point, record an `Event` node in the WorldGraph.
fn track_evolution(
&mut self,
node_frames: &[MultiBandCsiFrame],
now_ms: i64,
room: WorldId,
) -> Option<(ChangePoint, WorldId)> {
let values: Vec<f64> = node_frames
.iter()
.filter_map(|f| f.channel_frames.first())
.map(|cf| {
if cf.amplitude.is_empty() {
0.0
} else {
cf.amplitude.iter().map(|&a| f64::from(a)).sum::<f64>() / cf.amplitude.len() as f64
}
})
.collect();
if values.is_empty() {
return None;
}
let n = values.len();
let tracker = self
.evolution
.get_or_insert_with(|| EvolutionTracker::new(n, 2.0, (n / 2).max(2)));
// Node count must be stable for the tracker to remain meaningful.
if tracker.n_links() != n {
return None;
}
let cp = tracker.observe_window(&values)?;
let event = self.world.upsert_node(WorldNode::Event {
id: WorldId::UNASSIGNED,
event_type: "baseline_topology_change".to_string(),
at_unix_ms: now_ms,
located_in: Some(room),
});
let _ = self.world.add_edge(event, room, WorldEdge::LocatedIn { since_unix_ms: now_ms });
Some((cp, event))
}
}
@ -300,6 +501,146 @@ mod tests {
assert_eq!(o1.quality.per_node_weights, o2.quality.per_node_weights);
}
fn node_frame_scaled(node_id: u8, ts_us: u64, n_sub: usize, scale: f32) -> MultiBandCsiFrame {
MultiBandCsiFrame {
node_id,
timestamp_us: ts_us,
channel_frames: vec![CanonicalCsiFrame {
amplitude: (0..n_sub).map(|i| scale * (1.0 + 0.1 * i as f32)).collect(),
phase: (0..n_sub).map(|i| i as f32 * 0.05).collect(),
hardware_type: HardwareType::Esp32S3,
}],
frequencies_mhz: vec![2412],
coherence: 0.9,
}
}
/// ADR-138 composed: with node geometry registered, the cycle produces
/// directional evidence (admitted nodes + weights).
#[test]
fn array_coordinator_runs_when_geometry_registered() {
use std::f32::consts::PI;
let (mut e, room) = engine();
e.register_node_geometry(0, 1.0, 0.0, 0.0);
e.register_node_geometry(1, -1.0, 0.0, PI); // opposite → good diversity
let out = e
.process_cycle(&[node_frame(0, 1000, 56), node_frame(1, 1001, 56)], CalibrationId(1), room, 1)
.unwrap();
let d = out.directional.expect("geometry registered → directional evidence");
assert_eq!(d.n_admitted, 2);
assert!((d.weights.iter().map(|(_, w)| *w).sum::<f32>() - 1.0).abs() < 1e-3);
// Well-separated, coherent nodes → no array contradiction → no demotion.
assert!(!out.demoted);
}
/// ADR-138 composed: poor geometry (near-colinear nodes) raises a
/// GeometryInsufficient contradiction that demotes privacy.
#[test]
fn array_geometry_insufficient_demotes() {
let (mut e, room) = engine();
e.register_node_geometry(0, 1.0, 0.0, 0.0);
e.register_node_geometry(1, 1.0, 0.01, 0.01); // nearly colinear → low GDI
let out = e
.process_cycle(&[node_frame(0, 1000, 56), node_frame(1, 1001, 56)], CalibrationId(1), room, 1)
.unwrap();
let d = out.directional.unwrap();
assert!(!d.contradictions.is_empty(), "insufficient geometry flagged");
assert!(out.demoted && out.effective_class == PrivacyClass::Restricted);
}
/// ADR-142 composed: a sustained baseline then a simultaneous amplitude
/// shift on both links yields a change-point + an Event node in the graph.
#[test]
fn evolution_change_point_recorded_as_event() {
let (mut e, room) = engine();
let cal = CalibrationId(1);
// Jittered baseline so each link has non-zero std (constant std=0 is undefined).
for i in 0..30u64 {
let s = if i % 2 == 0 { 0.99 } else { 1.01 };
let out = e
.process_cycle(&[node_frame_scaled(0, 1000, 56, s), node_frame_scaled(1, 1001, 56, s)], cal, room, i as i64)
.unwrap();
assert!(out.change_point.is_none(), "baseline must not trip a change-point");
}
// Large simultaneous excursion on both links → change-point.
let out = e
.process_cycle(&[node_frame_scaled(0, 1000, 56, 1.6), node_frame_scaled(1, 1001, 56, 1.6)], cal, room, 99)
.unwrap();
let (_, event_id) = out.change_point.expect("simultaneous shift → change-point");
assert!(matches!(
e.world().node(event_id),
Some(WorldNode::Event { event_type, .. }) if event_type == "baseline_topology_change"
));
}
/// ADR-143 composed: ingesting stable reflector sightings writes an
/// ObjectAnchor node into the WorldGraph.
#[test]
fn reflector_ingestion_writes_object_anchors() {
use wifi_densepose_signal::ruvsense::ReflectorObservation;
let (mut e, _room) = engine();
let day_ns = 86_400_000_000_000u64;
// 8 tight, coherent sightings spanning ~a day → a stable Wall anchor.
let obs: Vec<ReflectorObservation> = (0..8u64)
.map(|i| {
let j = if i % 2 == 0 { 0.005 } else { -0.005 };
ReflectorObservation { position: [3.0 + j, 1.0, 0.0], delay_ns: 12.0, coherence: 0.9, at_ns: i * (day_ns / 8) }
})
.collect();
let written = e.ingest_reflectors(&obs);
assert!(!written.is_empty(), "stable reflector → ObjectAnchor written");
assert!(matches!(
e.world().node(written[0]),
Some(WorldNode::ObjectAnchor { .. })
));
}
/// ADR-139 live-loop acceptance (the architecture-proving path):
/// `live_frame -> fusion_event -> worldgraph_update -> privacy_rollup ->
/// persist -> reload -> same_contents`, with NO raw RF frame persisted.
#[test]
fn live_frame_to_reload_same_contents() {
let mut e =
StreamingEngine::new(PrivacyMode::StrictNoIdentity, 1, GeoRegistration::default());
let room = e.add_room("living_room", "Living Room");
let sensor = e.add_sensor("esp32-com9", room);
// live_frame -> fusion_event -> worldgraph_update (SemanticState).
let out = e
.process_cycle(&[node_frame(0, 1000, 56), node_frame(1, 1001, 56)], CalibrationId(9), room, 100)
.unwrap();
// person track feeding.
let pt = e.update_person_track(7, 2.0, 2.0, room, sensor);
// privacy_rollup: StrictNoIdentity suppresses the person_track.
let rollup = e.apply_active_privacy_mode();
assert!(rollup.suppressed_nodes.contains(&pt), "person track suppressed");
assert!(rollup.denied_pairs.iter().any(|(_s, n)| *n == pt));
// persist.
let bytes = e.snapshot_json().unwrap();
// No raw RF frame persisted — the snapshot is graph nodes/edges only.
let json = String::from_utf8(bytes.clone()).unwrap();
assert!(!json.contains("\"amplitude\"") && !json.contains("\"data\""), "no raw RF in snapshot");
// reload.
let reloaded = WorldGraph::from_json(&bytes).unwrap();
// same_contents: node count, area resolution, the SemanticState + track,
// and an identical room-contents query before vs after reload.
assert_eq!(reloaded.node_count(), e.world().node_count());
assert_eq!(reloaded.room_for_area("living_room"), e.world().room_for_area("living_room"));
assert!(reloaded.node(out.semantic_id).is_some());
assert!(reloaded.node(pt).is_some());
let mut before = e.world().contents_of(room);
before.sort_by_key(|w| w.0);
let mut after = reloaded.contents_of(room);
after.sort_by_key(|w| w.0);
assert_eq!(before, after, "same room-contents query after reload");
// Deterministic persistence: re-serialising the reload is byte-identical.
assert_eq!(reloaded.to_json().unwrap(), bytes);
}
/// The privacy mode switch is recorded in a verifiable attestation chain
/// (ADR-141), and a stricter mode raises the emitted class.
#[test]