From fc7674bde90f41fe5f9d7a09e6b7fa7947ac2772 Mon Sep 17 00:00:00 2001 From: ruv Date: Thu, 28 May 2026 23:09:06 -0400 Subject: [PATCH] feat(signal,ruvector): ADR-138 LinkGroup/ArrayCoordinator clock-quality gating (#842) - ruvector viewpoint/coherence.rs: ClockQualityScore, ClockQualityGate, ClockGateDecision (Admit/MonitorOnly/Reject), ClockRejectReason. 200us floor, 9s staleness ceiling per ADR-110. - signal ruvsense/array_coordinator.rs: ArrayCoordinator domain service + DirectionalEvidence. Gates nodes, computes GDI + Cramer-Rao credence, builds attention weights (real node_attention_weights when amplitudes present, else clock-quality softmax), emits CoherenceDrop + GeometryInsufficient flags. - Cycle resolution: ArrayCoordinator lives in signal (depends on ruvector), not ruvector, so it can emit ADR-137 canonical ContradictionFlag. Documented. - 8 tests (5 coordinator + 3 clock gate); workspace 0 errors. Co-Authored-By: claude-flow --- .../src/viewpoint/coherence.rs | 141 +++++++ .../src/viewpoint/mod.rs | 5 +- .../src/ruvsense/array_coordinator.rs | 343 ++++++++++++++++++ .../wifi-densepose-signal/src/ruvsense/mod.rs | 6 + 4 files changed, 494 insertions(+), 1 deletion(-) create mode 100644 v2/crates/wifi-densepose-signal/src/ruvsense/array_coordinator.rs diff --git a/v2/crates/wifi-densepose-ruvector/src/viewpoint/coherence.rs b/v2/crates/wifi-densepose-ruvector/src/viewpoint/coherence.rs index 14b375aa..1c05bdbf 100644 --- a/v2/crates/wifi-densepose-ruvector/src/viewpoint/coherence.rs +++ b/v2/crates/wifi-densepose-ruvector/src/viewpoint/coherence.rs @@ -212,6 +212,109 @@ impl CoherenceGate { } } +// --------------------------------------------------------------------------- +// ADR-138 — Clock-quality gate (coherence × clock dispersion/age) +// --------------------------------------------------------------------------- + +/// Per-node clock-synchronisation quality (ADR-138 §2.2), derived from the +/// ADR-110 802.15.4 time-sync follower offset statistics. +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct ClockQualityScore { + /// EMA-smoothed follower offset standard deviation (µs). ADR-110 measured + /// ~104 µs against the ±100 µs target on COM9↔COM12. + pub offset_stdev_us: f32, + /// Age of the most recent sync packet (µs). The sensing server enforces a + /// 9 s staleness ceiling. + pub age_us: u64, + /// Whether a valid sync has ever been observed for this node. + pub valid: bool, +} + +impl ClockQualityScore { + /// Scalar clock quality in `[0, 1]` (1 = perfectly synced). Reaches 0 at + /// `5 × max_offset_stdev_us`; used to bias directional attention weights. + #[must_use] + pub fn quality(&self, max_offset_stdev_us: f32) -> f32 { + if !self.valid || max_offset_stdev_us <= 0.0 { + return 0.0; + } + (1.0 - self.offset_stdev_us / (5.0 * max_offset_stdev_us)).clamp(0.0, 1.0) + } +} + +/// Why a node failed the clock-quality gate hard. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ClockRejectReason { + /// Phase coherence below the gate threshold. + Incoherent, + /// Sync packet older than the staleness ceiling. + ClockStale, + /// Offset dispersion far beyond the floor (≥ 5× the monitor threshold). + ClockDispersed, + /// No valid sync ever observed for this node. + ClockInvalid, +} + +/// One node's gate decision for one sensing cycle (ADR-138 §2.2). +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum ClockGateDecision { + /// Both terms pass: node admitted at full weight. + Admit, + /// Phase OK but clock degraded: evidence-only, NO environment/model update. + MonitorOnly { clock_quality: f32 }, + /// Either term fails hard: node excluded this cycle. + Reject { reason: ClockRejectReason }, +} + +/// Clock-quality gate: combines the phase [`CoherenceGate`] with clock +/// dispersion and age terms (ADR-138 §2.2). +#[derive(Debug, Clone)] +pub struct ClockQualityGate { + /// Phase-coherence gate (threshold + hysteresis). + pub coherence: CoherenceGate, + /// Offset-stdev floor (µs): at or above ⇒ `MonitorOnly`. Default 200.0. + pub max_offset_stdev_us: f32, + /// Sync-age ceiling (µs): above ⇒ hard reject. Default 9_000_000. + pub max_age_us: u64, +} + +impl ClockQualityGate { + /// Construct from a phase gate and the two clock thresholds. + pub fn new(coherence: CoherenceGate, max_offset_stdev_us: f32, max_age_us: u64) -> Self { + Self { coherence, max_offset_stdev_us, max_age_us } + } + + /// Defaults: phase gate 0.7/0.05, 200 µs floor, 9 s staleness ceiling. + pub fn default_params() -> Self { + Self::new(CoherenceGate::default_params(), 200.0, 9_000_000) + } + + /// Evaluate both terms for one node this cycle. `coherence_value` is the + /// rolling phasor coherence ([`CoherenceState::coherence`]). + pub fn evaluate(&mut self, coherence_value: f32, clock: &ClockQualityScore) -> ClockGateDecision { + if !clock.valid { + return ClockGateDecision::Reject { reason: ClockRejectReason::ClockInvalid }; + } + if clock.age_us > self.max_age_us { + return ClockGateDecision::Reject { reason: ClockRejectReason::ClockStale }; + } + if clock.offset_stdev_us >= 5.0 * self.max_offset_stdev_us { + return ClockGateDecision::Reject { reason: ClockRejectReason::ClockDispersed }; + } + // Phase term (hysteretic). Clock-degraded but coherent ⇒ MonitorOnly. + if !self.coherence.evaluate(coherence_value) { + return ClockGateDecision::Reject { reason: ClockRejectReason::Incoherent }; + } + if clock.offset_stdev_us >= self.max_offset_stdev_us { + ClockGateDecision::MonitorOnly { + clock_quality: clock.quality(self.max_offset_stdev_us), + } + } else { + ClockGateDecision::Admit + } + } +} + /// Stateless coherence gate function matching the ADR-031 specification. /// /// Computes the complex mean of unit phasors from the given phase differences @@ -388,4 +491,42 @@ mod tests { } assert_eq!(state.len(), 5, "count should be capped at window size"); } + + // ===== ADR-138 clock-quality gate ===== + + #[test] + fn clock_gate_invalid_rejected() { + let mut g = ClockQualityGate::default_params(); + let c = ClockQualityScore { offset_stdev_us: 10.0, age_us: 0, valid: false }; + assert_eq!( + g.evaluate(0.9, &c), + ClockGateDecision::Reject { reason: ClockRejectReason::ClockInvalid } + ); + } + + #[test] + fn clock_gate_dispersed_rejected() { + let mut g = ClockQualityGate::default_params(); // floor 200 → 5× = 1000 µs + let c = ClockQualityScore { offset_stdev_us: 1500.0, age_us: 0, valid: true }; + assert_eq!( + g.evaluate(0.9, &c), + ClockGateDecision::Reject { reason: ClockRejectReason::ClockDispersed } + ); + } + + #[test] + fn clock_gate_admit_and_monitor_and_quality() { + let mut g = ClockQualityGate::default_params(); + let good = ClockQualityScore { offset_stdev_us: 50.0, age_us: 0, valid: true }; + assert_eq!(g.evaluate(0.9, &good), ClockGateDecision::Admit); + // quality: 1 - 50/(5*200) = 0.95 + assert!((good.quality(200.0) - 0.95).abs() < 1e-4); + + let mut g2 = ClockQualityGate::default_params(); + let degraded = ClockQualityScore { offset_stdev_us: 250.0, age_us: 0, valid: true }; + assert!(matches!( + g2.evaluate(0.9, °raded), + ClockGateDecision::MonitorOnly { .. } + )); + } } diff --git a/v2/crates/wifi-densepose-ruvector/src/viewpoint/mod.rs b/v2/crates/wifi-densepose-ruvector/src/viewpoint/mod.rs index 76c934cb..0bbeed1a 100644 --- a/v2/crates/wifi-densepose-ruvector/src/viewpoint/mod.rs +++ b/v2/crates/wifi-densepose-ruvector/src/viewpoint/mod.rs @@ -22,6 +22,9 @@ pub mod geometry; // Re-export primary types at the module root for ergonomic imports. pub use attention::{CrossViewpointAttention, GeometricBias}; -pub use coherence::{CoherenceGate, CoherenceState}; +pub use coherence::{ + ClockGateDecision, ClockQualityGate, ClockQualityScore, ClockRejectReason, CoherenceGate, + CoherenceState, +}; pub use fusion::{FusedEmbedding, FusionConfig, MultistaticArray, ViewpointEmbedding}; pub use geometry::{CramerRaoBound, GeometricDiversityIndex}; diff --git a/v2/crates/wifi-densepose-signal/src/ruvsense/array_coordinator.rs b/v2/crates/wifi-densepose-signal/src/ruvsense/array_coordinator.rs new file mode 100644 index 00000000..2fbcfe05 --- /dev/null +++ b/v2/crates/wifi-densepose-signal/src/ruvsense/array_coordinator.rs @@ -0,0 +1,343 @@ +//! ADR-138 — `ArrayCoordinator`: a stateless-per-call domain service that gates +//! array nodes on geometry and clock quality and projects *directional evidence* +//! (not pose decisions). +//! +//! # Crate placement (deviation from ADR-138 §2.3, deliberate) +//! +//! ADR-138 placed `ArrayCoordinator` in `wifi-densepose-ruvector` +//! (`viewpoint/fusion.rs`). But `wifi-densepose-signal` already **depends on** +//! `wifi-densepose-ruvector`, and the coordinator must emit the canonical +//! [`ContradictionFlag`](super::fusion_quality::ContradictionFlag) owned by +//! ADR-137 (in this crate). Placing it in ruvector would create a dependency +//! cycle. It therefore lives here in `wifi-densepose-signal`, which can see both +//! ruvector's geometry/coherence types and ADR-137's `ContradictionFlag`. The +//! `ClockQualityGate` (which needs no `ContradictionFlag`) stays in ruvector per +//! the ADR. + +use wifi_densepose_ruvector::viewpoint::coherence::{ + ClockGateDecision, ClockQualityGate, ClockQualityScore, +}; +use wifi_densepose_ruvector::viewpoint::geometry::{ + CramerRaoBound, GeometricDiversityIndex, NodeId, ViewpointPosition, +}; + +use super::fusion_quality::ContradictionFlag; +use super::multistatic::node_attention_weights; + +/// One node's contribution to the array for a single sensing cycle. +#[derive(Debug, Clone)] +pub struct ArrayNodeInput { + /// Stable node identifier. + pub node_id: NodeId, + /// Node position (x, y) in metres (deployment geometry). + pub position: (f32, f32), + /// Azimuth (radians) of the node from the array centroid. + pub azimuth: f32, + /// Rolling phasor coherence for this node (`CoherenceState::coherence()`). + pub coherence: f32, + /// Clock-sync quality (ADR-110 follower offset stats). + pub clock: ClockQualityScore, + /// Optional per-node amplitude vector; when present across nodes, the + /// directional weights use the real fusion attention (ADR-137 + /// `node_attention_weights`) instead of the clock-only fallback. + pub amplitude: Option>, +} + +/// Directional evidence: what the array can resolve right now and how much to +/// trust each direction (ADR-138 §2.3). NOT a pose decision. +#[derive(Debug, Clone)] +pub struct DirectionalEvidence { + /// Per-admitted-viewpoint attention weight (sums to ~1.0 over admitted). + pub weights: Vec<(NodeId, f32)>, + /// Geometric Diversity Index over admitted nodes. `None` when < 2 admitted. + /// + /// (ADR-138 §2.3 typed this non-optional; made `Option` here because GDI is + /// undefined for < 2 viewpoints and a sentinel would be misleading.) + pub gdi: Option, + /// Cramér-Rao RMSE lower bound (m) for a centroid target. `None` when + /// < 3 admitted viewpoints (under-determined). + pub credence_rmse_m: Option, + /// Per-node gate decisions — the audit trail. + pub gate_decisions: Vec<(NodeId, ClockGateDecision)>, + /// Contradiction flags forwarded to the ADR-137 fusion-quality machinery. + pub contradictions: Vec, + /// Viewpoints admitted at full weight. + pub n_admitted: usize, + /// Viewpoints admitted MonitorOnly (evidence-only, no environment update). + pub n_monitoring: usize, +} + +/// Configuration for [`ArrayCoordinator`]. +#[derive(Debug, Clone)] +pub struct ArrayCoordinatorConfig { + /// Per-node clock+coherence gate (cloned per node so hysteresis state does + /// not leak across nodes within a cycle). + pub gate: ClockQualityGate, + /// σ multiple defining a cross-sectional coherence-drop contradiction. + pub contradiction_sigma: f32, + /// Per-measurement noise std (m) for the Cramér-Rao credence estimate. + pub crb_noise_std_m: f32, + /// Attention temperature for the directional weight softmax. + pub attention_temperature: f32, +} + +impl Default for ArrayCoordinatorConfig { + fn default() -> Self { + Self { + gate: ClockQualityGate::default_params(), + contradiction_sigma: 2.0, + crb_noise_std_m: 0.1, + attention_temperature: 1.0, + } + } +} + +/// Stateless-per-call domain service (ADR-138 §2.3). +#[derive(Debug, Clone)] +pub struct ArrayCoordinator { + config: ArrayCoordinatorConfig, +} + +impl ArrayCoordinator { + /// Create a coordinator with the given configuration. + pub fn new(config: ArrayCoordinatorConfig) -> Self { + Self { config } + } + + /// Gate the nodes on clock+coherence, then over the admitted set compute + /// GDI, Cramér-Rao credence, and attention weights, collecting contradiction + /// flags (cross-sectional coherence drops + geometry insufficiency). + pub fn coordinate(&self, nodes: &[ArrayNodeInput]) -> DirectionalEvidence { + // 1. Per-node clock+coherence gate (fresh gate per node). + let mut gate_decisions = Vec::with_capacity(nodes.len()); + for n in nodes { + let mut gate = self.config.gate.clone(); + gate_decisions.push((n.node_id, gate.evaluate(n.coherence, &n.clock))); + } + + // Admitted = full-weight; monitoring = evidence-only. + let admitted_idx: Vec = (0..nodes.len()) + .filter(|&i| matches!(gate_decisions[i].1, ClockGateDecision::Admit)) + .collect(); + let monitoring_idx: Vec = (0..nodes.len()) + .filter(|&i| matches!(gate_decisions[i].1, ClockGateDecision::MonitorOnly { .. })) + .collect(); + let evidence_idx: Vec = + admitted_idx.iter().chain(monitoring_idx.iter()).copied().collect(); + + let mut contradictions = Vec::new(); + + // 2. Cross-sectional coherence-drop contradictions over the evidence set. + if evidence_idx.len() >= 3 { + let cohs: Vec = evidence_idx.iter().map(|&i| nodes[i].coherence).collect(); + let mean = cohs.iter().sum::() / cohs.len() as f32; + let var = cohs.iter().map(|c| (c - mean).powi(2)).sum::() / cohs.len() as f32; + let std = var.sqrt(); + if std > 1e-6 { + for &i in &evidence_idx { + let sigma = (mean - nodes[i].coherence) / std; + if sigma > self.config.contradiction_sigma { + contradictions.push(ContradictionFlag::CoherenceDrop { node_idx: i, sigma }); + } + } + } + } + + // 3. GDI over admitted nodes. + let gdi = if admitted_idx.len() >= 2 { + let azimuths: Vec = admitted_idx.iter().map(|&i| nodes[i].azimuth).collect(); + let ids: Vec = admitted_idx.iter().map(|&i| nodes[i].node_id).collect(); + GeometricDiversityIndex::compute(&azimuths, &ids) + } else { + None + }; + if let Some(ref g) = gdi { + if !g.is_sufficient() { + contradictions.push(ContradictionFlag::GeometryInsufficient { gdi: g.value }); + } + } + + // 4. Cramér-Rao credence for a centroid target over admitted nodes. + let credence_rmse_m = if admitted_idx.len() >= 3 { + let vps: Vec = admitted_idx + .iter() + .map(|&i| ViewpointPosition { + x: nodes[i].position.0, + y: nodes[i].position.1, + noise_std: self.config.crb_noise_std_m, + }) + .collect(); + let cx = vps.iter().map(|v| v.x).sum::() / vps.len() as f32; + let cy = vps.iter().map(|v| v.y).sum::() / vps.len() as f32; + CramerRaoBound::estimate((cx, cy), &vps).map(|crb| crb.rmse_lower_bound) + } else { + None + }; + + // 5. Attention weights over admitted nodes. + let weights = self.admitted_weights(nodes, &admitted_idx); + + DirectionalEvidence { + weights, + gdi, + credence_rmse_m, + gate_decisions, + contradictions, + n_admitted: admitted_idx.len(), + n_monitoring: monitoring_idx.len(), + } + } + + /// Directional weights over the admitted set. When every admitted node has + /// an amplitude vector of equal length, reuse the ADR-137 fusion attention + /// (`node_attention_weights`); otherwise fall back to a clock-quality + /// softmax so well-clocked nodes weigh more. + fn admitted_weights( + &self, + nodes: &[ArrayNodeInput], + admitted_idx: &[usize], + ) -> Vec<(NodeId, f32)> { + if admitted_idx.is_empty() { + return Vec::new(); + } + // Try the real fusion-attention path when amplitudes are present + uniform. + let amps: Option> = admitted_idx + .iter() + .map(|&i| nodes[i].amplitude.as_deref()) + .collect(); + if let Some(amps) = amps { + let len0 = amps.first().map(|a| a.len()).unwrap_or(0); + if len0 > 0 && amps.iter().all(|a| a.len() == len0) { + let w = node_attention_weights(&s, self.config.attention_temperature); + return admitted_idx.iter().map(|&i| nodes[i].node_id).zip(w).collect(); + } + } + + // Clock-quality softmax fallback. + let max_floor = self.config.gate.max_offset_stdev_us; + let logits: Vec = admitted_idx + .iter() + .map(|&i| nodes[i].clock.quality(max_floor) / self.config.attention_temperature) + .collect(); + let max_logit = logits.iter().cloned().fold(f32::NEG_INFINITY, f32::max); + let exps: Vec = logits.iter().map(|l| (l - max_logit).exp()).collect(); + let sum: f32 = exps.iter().sum::().max(1e-12); + admitted_idx + .iter() + .zip(exps) + .map(|(&i, e)| (nodes[i].node_id, e / sum)) + .collect() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn clock(stdev: f32, age_us: u64) -> ClockQualityScore { + ClockQualityScore { offset_stdev_us: stdev, age_us, valid: true } + } + + fn node(id: NodeId, x: f32, y: f32, az: f32, coh: f32, stdev: f32) -> ArrayNodeInput { + ArrayNodeInput { + node_id: id, + position: (x, y), + azimuth: az, + coherence: coh, + clock: clock(stdev, 1000), + amplitude: None, + } + } + + /// 4 well-placed, well-clocked, coherent nodes → all admitted, weights sum + /// to 1, credence available, no contradictions. + #[test] + fn ac_four_good_nodes_all_admitted() { + use std::f32::consts::PI; + let coord = ArrayCoordinator::new(ArrayCoordinatorConfig::default()); + let nodes = vec![ + node(0, 1.0, 0.0, 0.0, 0.9, 50.0), + node(1, 0.0, 1.0, PI / 2.0, 0.9, 50.0), + node(2, -1.0, 0.0, PI, 0.9, 50.0), + node(3, 0.0, -1.0, 3.0 * PI / 2.0, 0.9, 50.0), + ]; + let ev = coord.coordinate(&nodes); + assert_eq!(ev.n_admitted, 4); + assert_eq!(ev.n_monitoring, 0); + assert!((ev.weights.iter().map(|(_, w)| *w).sum::() - 1.0).abs() < 1e-4); + assert!(ev.credence_rmse_m.is_some()); + assert!(ev.gdi.is_some() && ev.gdi.as_ref().unwrap().is_sufficient()); + assert!(ev.contradictions.is_empty()); + } + + /// A clock-degraded node (offset ≥ 200 µs floor) is MonitorOnly: evidence + /// yes, not counted as admitted. + #[test] + fn ac_clock_degraded_node_is_monitor_only() { + use std::f32::consts::PI; + let coord = ArrayCoordinator::new(ArrayCoordinatorConfig::default()); + let mut nodes = vec![ + node(0, 1.0, 0.0, 0.0, 0.9, 50.0), + node(1, 0.0, 1.0, PI / 2.0, 0.9, 50.0), + node(2, -1.0, 0.0, PI, 0.9, 50.0), + ]; + nodes[2].clock = clock(250.0, 1000); // above 200 µs floor, below 1000 µs hard + let ev = coord.coordinate(&nodes); + assert_eq!(ev.n_admitted, 2); + assert_eq!(ev.n_monitoring, 1); + assert!(matches!( + ev.gate_decisions[2].1, + ClockGateDecision::MonitorOnly { .. } + )); + } + + /// A stale node (age > 9 s) is hard-rejected. + #[test] + fn ac_stale_node_rejected() { + let coord = ArrayCoordinator::new(ArrayCoordinatorConfig::default()); + let mut n0 = node(0, 1.0, 0.0, 0.0, 0.9, 50.0); + n0.clock = clock(50.0, 10_000_000); // 10 s > 9 s ceiling + let ev = coord.coordinate(&[n0]); + assert_eq!(ev.n_admitted, 0); + assert!(matches!( + ev.gate_decisions[0].1, + ClockGateDecision::Reject { + reason: wifi_densepose_ruvector::viewpoint::coherence::ClockRejectReason::ClockStale + } + )); + } + + /// An incoherent node (coherence below the phase gate) is rejected. + #[test] + fn ac_incoherent_node_rejected() { + let coord = ArrayCoordinator::new(ArrayCoordinatorConfig::default()); + let n0 = node(0, 1.0, 0.0, 0.0, 0.2, 50.0); // 0.2 < 0.7 gate + let ev = coord.coordinate(&[n0]); + assert_eq!(ev.n_admitted, 0); + } + + /// A cross-sectional coherence outlier raises a `CoherenceDrop` flag. + /// + /// Uses 6 nodes: with a single outlier among N equal values the outlier's + /// z-score is exactly √(N-1), so N≥6 is required to exceed the default 2σ + /// threshold (√5≈2.24). This is an inherent property of cross-sectional + /// outlier detection, not a tuning artefact. + #[test] + fn ac_coherence_outlier_flagged() { + use std::f32::consts::PI; + let coord = ArrayCoordinator::new(ArrayCoordinatorConfig::default()); + let nodes: Vec = (0..6) + .map(|i| { + let az = i as f32 * PI / 3.0; + // Node 5 is the low-coherence outlier (still above the 0.7 gate). + let coh = if i == 5 { 0.71 } else { 0.95 }; + node(i, az.cos(), az.sin(), az, coh, 50.0) + }) + .collect(); + let ev = coord.coordinate(&nodes); + assert!(ev + .contradictions + .iter() + .any(|c| matches!(c, ContradictionFlag::CoherenceDrop { node_idx: 5, .. }))); + } +} diff --git a/v2/crates/wifi-densepose-signal/src/ruvsense/mod.rs b/v2/crates/wifi-densepose-signal/src/ruvsense/mod.rs index 888bea7e..f6c3c13f 100644 --- a/v2/crates/wifi-densepose-signal/src/ruvsense/mod.rs +++ b/v2/crates/wifi-densepose-signal/src/ruvsense/mod.rs @@ -61,12 +61,18 @@ pub mod cir; // ADR-137: Fusion-engine quality scoring (evidence + contradiction flags) pub mod fusion_quality; +// ADR-138: Array coordinator — clock-quality gating + directional evidence +pub mod array_coordinator; + // ADR-135: Empty-room baseline calibration (Welford online, circular phase) pub mod calibration; // Re-export core types for ergonomic access pub use coherence::CoherenceState; pub use coherence_gate::{GateDecision, GatePolicy}; +pub use array_coordinator::{ + ArrayCoordinator, ArrayCoordinatorConfig, ArrayNodeInput, DirectionalEvidence, +}; pub use fusion_quality::{ CalibrationId, ContradictionFlag, EvidenceRef, FamilyId, QualityScore, };