Merge pull request #633 from ruvnet/integrate/pr-491-adaptive-person-count

Merge #491: feat(sensing-server): adaptive person count — RollingP95 + dedup_factor (integration on schwarztim's behalf)
This commit is contained in:
rUv 2026-05-19 08:26:36 -04:00 committed by GitHub
commit 961c01f4bd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 305 additions and 18 deletions

View File

@ -144,6 +144,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- **README: corrected the camera-supervised pose-accuracy claim** (audit finding #5; see PR #535) — "92.9% PCK@20" → the ADR-079 target (35%+; proxy baseline 35.3%), noting P7/P8/P9 are pending.
### Added
- **`RollingP95` adaptive feature normalizer** (`v2/crates/wifi-densepose-sensing-server`) —
Streaming P95 estimator (600-sample / ~30 s sliding window) that self-calibrates
feature normalization to whatever distribution the deployment produces. Replaces
fixed-scale denominators (`variance/300`, `motion/250`, `spectral/500`) which saturated
when live ESP32 values exceeded those limits, collapsing dynamic range to zero.
Cold-start (<60 samples) falls back to the legacy denominators so day-0 behaviour
is preserved. Deployment-neutral: no hardcoded values. (ADR-044 §5.2)
- **`dedup_factor` runtime configuration API** (`v2/crates/wifi-densepose-sensing-server`) —
Exposes the multi-node person-count deduplication divisor at runtime via REST:
- `GET /api/v1/config/dedup-factor` — read current value.
- `POST /api/v1/config/dedup-factor` — set value (clamped 1.010.0, persisted).
- `POST /api/v1/config/ground-truth` — auto-tunes `dedup_factor` from a known
person count (`{"count": N}`); derives optimal divisor from current node-sum.
Config is persisted to `data/config.json` and reloaded on restart. (ADR-044 §5.3)
- **`nvsim` crate — deterministic NV-diamond magnetometer pipeline simulator** (ADR-089) —
New standalone leaf crate at `v2/crates/nvsim` modeling a forward-only
magnetic sensing path: scene → source synthesis (BiotSavart, dipole,

View File

@ -555,6 +555,93 @@ fn build_node_features(
Some(entries)
}
// ── ADR-044 §5.2: Rolling P95 adaptive feature normalizer ────────────────────
/// Streaming P95 estimator over a fixed-size sliding window.
///
/// Self-calibrates feature normalization to whatever distribution the deployment
/// produces — no hardcoded scale values that can saturate in large rooms or
/// degrade in high-interference environments.
///
/// O(n log n) per query via sorted copy — acceptable at 20 Hz with window=600.
/// Cold-start (len < min_samples) returns `None` so the caller uses the legacy
/// fixed denominator, preserving day-0 behaviour.
pub struct RollingP95 {
buf: std::collections::VecDeque<f64>,
window: usize,
min_samples: usize,
}
impl RollingP95 {
pub fn new(window: usize, min_samples: usize) -> Self {
Self {
buf: std::collections::VecDeque::with_capacity(window),
window,
min_samples,
}
}
pub fn push(&mut self, v: f64) {
if self.buf.len() == self.window {
self.buf.pop_front();
}
self.buf.push_back(v);
}
/// Returns `Some(p95)` once enough samples have accumulated, else `None`.
pub fn current(&self) -> Option<f64> {
if self.buf.len() < self.min_samples {
return None;
}
let mut sorted: Vec<f64> = self.buf.iter().copied().collect();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let idx = ((sorted.len() as f64) * 0.95).ceil() as usize;
Some(sorted[idx.saturating_sub(1).min(sorted.len() - 1)])
}
#[allow(dead_code)]
pub fn len(&self) -> usize {
self.buf.len()
}
}
// ── ADR-044 §5.3: Runtime config persistence ─────────────────────────────────
/// Runtime configuration that persists across server restarts via `data/config.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct RuntimeConfig {
/// Divisor for multi-node person-count deduplication (sum / factor).
pub dedup_factor: f64,
}
impl Default for RuntimeConfig {
fn default() -> Self {
Self { dedup_factor: 3.0 }
}
}
/// Load persisted runtime config from `<data_dir>/config.json`.
/// Falls back to [`RuntimeConfig::default`] if the file is absent or malformed.
pub(crate) fn load_runtime_config(data_dir: &std::path::Path) -> RuntimeConfig {
let path = data_dir.join("config.json");
match std::fs::read_to_string(&path) {
Ok(json) => serde_json::from_str(&json).unwrap_or_default(),
Err(_) => RuntimeConfig::default(),
}
}
/// Persist runtime config to `<data_dir>/config.json`.
pub(crate) fn save_runtime_config(data_dir: &std::path::Path, config: &RuntimeConfig) {
let path = data_dir.join("config.json");
if let Ok(json) = serde_json::to_string_pretty(config) {
if let Err(e) = std::fs::write(&path, json) {
warn!("Failed to save runtime config to {}: {e}", path.display());
} else {
info!("Runtime config saved to {}", path.display());
}
}
}
/// Shared application state
struct AppStateInner {
latest_update: Option<SensingUpdate>,
@ -662,6 +749,21 @@ struct AppStateInner {
multistatic_fuser: MultistaticFuser,
/// SVD-based room field model for eigenvalue person counting (None until calibration).
field_model: Option<FieldModel>,
// ── ADR-044 §5.2: adaptive rolling-p95 normalization ─────────────────────
/// Rolling P95 of `FeatureInfo.variance` over the last ~30 s (600 frames @ 20 Hz).
pub(crate) p95_variance: RollingP95,
/// Rolling P95 of `FeatureInfo.motion_band_power` over the last ~30 s.
pub(crate) p95_motion_band_power: RollingP95,
/// Rolling P95 of `FeatureInfo.spectral_power` over the last ~30 s.
pub(crate) p95_spectral_power: RollingP95,
// ── ADR-044 §5.3: runtime-configurable dedup factor ───────────────────────
/// Divisor for multi-node person-count deduplication (sum / factor).
/// Default 3.0 (one body visible to ~3 nodes on average).
/// Configurable at runtime via `POST /api/v1/config/dedup-factor` and
/// `POST /api/v1/config/ground-truth`. Persisted across restarts.
pub(crate) dedup_factor: f64,
/// Data directory for persisting runtime config (parent of `firmware_dir`).
pub(crate) data_dir: std::path::PathBuf,
}
/// If no ESP32 frame arrives within this duration, source reverts to offline.
@ -1748,8 +1850,13 @@ async fn windows_wifi_task(state: SharedState, tick_ms: u64) {
let feat_variance = features.variance;
// ADR-044 §5.2: feed raw features into rolling-P95 estimators before scoring.
s.p95_variance.push(features.variance);
s.p95_motion_band_power.push(features.motion_band_power);
s.p95_spectral_power.push(features.spectral_power);
// Multi-person estimation with temporal smoothing (EMA α=0.10).
let raw_score = compute_person_score(&features);
let raw_score = compute_person_score(&*s, &features);
s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10;
let est_persons = if classification.presence {
let count = s.person_count();
@ -1887,8 +1994,13 @@ async fn windows_wifi_fallback_tick(state: &SharedState, seq: u32) {
let feat_variance = features.variance;
// ADR-044 §5.2: feed raw features into rolling-P95 estimators before scoring.
s.p95_variance.push(features.variance);
s.p95_motion_band_power.push(features.motion_band_power);
s.p95_spectral_power.push(features.spectral_power);
// Multi-person estimation with temporal smoothing (EMA α=0.10).
let raw_score = compute_person_score(&features);
let raw_score = compute_person_score(&*s, &features);
s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10;
let est_persons = if classification.presence {
let count = s.person_count();
@ -2350,13 +2462,19 @@ fn fuse_multi_node_features(
///
/// Returns a raw score (0.0..1.0) that the caller converts to person count
/// after temporal smoothing.
fn compute_person_score(feat: &FeatureInfo) -> f64 {
// Normalize each feature to [0, 1] using ranges calibrated from real
// ESP32 hardware (COM6/COM9 on ruv.net, March 2026).
let var_norm = (feat.variance / 300.0).clamp(0.0, 1.0);
fn compute_person_score(state: &AppStateInner, feat: &FeatureInfo) -> f64 {
// ADR-044 §5.2: adaptive rolling-P95 normalization.
// Legacy fixed denominators (variance/300, motion/250, spectral/500) saturate
// when live ESP32 values exceed those limits — zero dynamic range results.
// Use the P95 of the last ~30 s of history instead, falling back to the legacy
// denominators during cold-start (<60 samples) to preserve day-0 behaviour.
let var_denom = state.p95_variance.current().map(|p| p.max(50.0)).unwrap_or(300.0);
let motion_denom = state.p95_motion_band_power.current().map(|p| p.max(50.0)).unwrap_or(250.0);
let sp_denom = state.p95_spectral_power.current().map(|p| p.max(100.0)).unwrap_or(500.0);
let var_norm = (feat.variance / var_denom).clamp(0.0, 1.0);
let cp_norm = (feat.change_points as f64 / 30.0).clamp(0.0, 1.0);
let motion_norm = (feat.motion_band_power / 250.0).clamp(0.0, 1.0);
let sp_norm = (feat.spectral_power / 500.0).clamp(0.0, 1.0);
let motion_norm = (feat.motion_band_power / motion_denom).clamp(0.0, 1.0);
let sp_norm = (feat.spectral_power / sp_denom).clamp(0.0, 1.0);
var_norm * 0.40 + cp_norm * 0.20 + motion_norm * 0.25 + sp_norm * 0.15
}
@ -3805,8 +3923,9 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
// Aggregate person count: gate on presence first (matching WiFi path).
let now = std::time::Instant::now();
let total_persons = if vitals.presence {
let dedup = s.dedup_factor;
let (fused, fallback_count) = multistatic_bridge::fuse_or_fallback(
&s.multistatic_fuser, &s.node_states,
&s.multistatic_fuser, &s.node_states, dedup,
);
match fused {
Some(ref f) => {
@ -4091,8 +4210,9 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
// Aggregate person count: gate on presence first (matching WiFi path).
let now = std::time::Instant::now();
let total_persons = if classification.presence {
let dedup = s.dedup_factor;
let (fused, fallback_count) = multistatic_bridge::fuse_or_fallback(
&s.multistatic_fuser, &s.node_states,
&s.multistatic_fuser, &s.node_states, dedup,
);
match fused {
Some(ref f) => {
@ -4244,8 +4364,13 @@ async fn simulated_data_task(state: SharedState, tick_ms: u64) {
let frame_amplitudes = frame.amplitudes.clone();
let frame_n_sub = frame.n_subcarriers;
// ADR-044 §5.2: feed raw features into rolling-P95 estimators before scoring.
s.p95_variance.push(features.variance);
s.p95_motion_band_power.push(features.motion_band_power);
s.p95_spectral_power.push(features.spectral_power);
// Multi-person estimation with temporal smoothing (EMA α=0.10).
let raw_score = compute_person_score(&features);
let raw_score = compute_person_score(&*s, &features);
s.smoothed_person_score = s.smoothed_person_score * 0.90 + raw_score * 0.10;
let est_persons = if classification.presence {
let count = s.person_count();
@ -4915,6 +5040,11 @@ async fn main() {
let initial_recordings = scan_recording_files();
info!("Discovered {} model files, {} recording files", initial_models.len(), initial_recordings.len());
// ADR-044 §5.3: load persisted runtime config from the data directory.
let data_dir = std::path::PathBuf::from("data");
let runtime_config = load_runtime_config(&data_dir);
info!("Loaded runtime config: dedup_factor={:.2}", runtime_config.dedup_factor);
let (tx, _) = broadcast::channel::<String>(256);
// ADR-099: parallel broadcast for the per-frame introspection snapshot stream
// consumed by `/ws/introspection`. Same ring size as `tx` (256) — slow
@ -4996,6 +5126,13 @@ async fn main() {
} else {
None
},
// ADR-044 §5.2: rolling-P95 over ~30 s at 20 Hz; warm-up after 60 samples.
p95_variance: RollingP95::new(600, 60),
p95_motion_band_power: RollingP95::new(600, 60),
p95_spectral_power: RollingP95::new(600, 60),
// ADR-044 §5.3: runtime-configurable dedup factor (persisted).
dedup_factor: runtime_config.dedup_factor,
data_dir: data_dir.clone(),
}));
// Start background tasks based on source
@ -5147,6 +5284,9 @@ async fn main() {
.route("/api/v1/calibration/start", post(calibration_start))
.route("/api/v1/calibration/stop", post(calibration_stop))
.route("/api/v1/calibration/status", get(calibration_status))
// ADR-044 §5.3: runtime-configurable dedup factor
.route("/api/v1/config/dedup-factor", get(config_get_dedup_factor).post(config_set_dedup_factor))
.route("/api/v1/config/ground-truth", post(config_set_ground_truth))
// Static UI files
.nest_service("/ui", ServeDir::new(&ui_path))
.layer(SetResponseHeaderLayer::overriding(
@ -5272,3 +5412,131 @@ mod novelty_tests {
assert!(ns.last_novelty_score.is_some());
}
}
// ── ADR-044 §5.3: dedup_factor runtime configuration endpoints ────────────────
/// `GET /api/v1/config/dedup-factor` — read the current dedup factor.
async fn config_get_dedup_factor(
State(state): State<SharedState>,
) -> Json<serde_json::Value> {
let s = state.read().await;
Json(serde_json::json!({
"dedup_factor": s.dedup_factor,
"description": "Divisor for multi-node person count deduplication (sum / factor). Range: 1.010.0."
}))
}
/// `POST /api/v1/config/dedup-factor` — set the dedup factor (clamped 1.010.0).
///
/// Body: `{ "value": <f64> }`
async fn config_set_dedup_factor(
State(state): State<SharedState>,
Json(body): Json<serde_json::Value>,
) -> Json<serde_json::Value> {
let value = body.get("value").and_then(|v| v.as_f64()).unwrap_or(3.0);
let clamped = value.clamp(1.0, 10.0);
let mut s = state.write().await;
s.dedup_factor = clamped;
let data_dir = s.data_dir.clone();
drop(s);
save_runtime_config(&data_dir, &RuntimeConfig { dedup_factor: clamped });
Json(serde_json::json!({
"status": "ok",
"dedup_factor": clamped,
}))
}
/// `POST /api/v1/config/ground-truth` — auto-tune dedup factor from a known person count.
///
/// Derives `dedup_factor = raw_node_sum / ground_truth_count` from the current
/// per-node person counts, clamped to [1.0, 10.0]. Persisted immediately.
///
/// Body: `{ "count": <u64> }`
async fn config_set_ground_truth(
State(state): State<SharedState>,
Json(body): Json<serde_json::Value>,
) -> Json<serde_json::Value> {
let ground_truth = match body.get("count").and_then(|v| v.as_u64()) {
Some(n) if n > 0 => n as usize,
_ => return Json(serde_json::json!({"error": "count must be a positive integer"})),
};
let mut s = state.write().await;
let raw_sum: usize = s.node_states.values()
.filter(|ns| ns.last_frame_time
.map(|t| t.elapsed() < std::time::Duration::from_secs(10))
.unwrap_or(false))
.map(|ns| ns.prev_person_count)
.sum();
let optimal = if raw_sum > 0 {
(raw_sum as f64) / (ground_truth as f64)
} else {
3.0
};
let clamped = optimal.clamp(1.0, 10.0);
s.dedup_factor = clamped;
let data_dir = s.data_dir.clone();
drop(s);
save_runtime_config(&data_dir, &RuntimeConfig { dedup_factor: clamped });
Json(serde_json::json!({
"status": "ok",
"ground_truth": ground_truth,
"raw_sum": raw_sum,
"computed_dedup_factor": clamped,
}))
}
// ── Unit tests: RollingP95 ─────────────────────────────────────────────────────
#[cfg(test)]
mod rolling_p95_tests {
use super::RollingP95;
#[test]
fn cold_start_returns_none() {
let p = RollingP95::new(100, 10);
assert!(p.current().is_none(), "empty buffer must return None");
}
#[test]
fn below_min_samples_returns_none() {
let mut p = RollingP95::new(100, 10);
for i in 1..=9 {
p.push(i as f64);
}
assert!(p.current().is_none(), "fewer than min_samples must return None");
}
#[test]
fn p95_of_ramp_is_near_95() {
let mut p = RollingP95::new(100, 10);
for i in 1..=100 {
p.push(i as f64);
}
let p95 = p.current().expect("should have value after 100 samples");
assert!(
p95 >= 94.0 && p95 <= 96.0,
"P95 of 1..=100 should be ~95, got {p95}"
);
}
#[test]
fn window_slides_evicts_oldest() {
let mut p = RollingP95::new(5, 3);
// Push 1..=5, then 100 — oldest (1) is evicted.
for i in 1..=5 {
p.push(i as f64);
}
p.push(100.0); // evicts 1; buf = [2, 3, 4, 5, 100]
let p95 = p.current().expect("6 pushes, window=5 → 5 samples");
// P95 of [2,3,4,5,100]: idx = ceil(5*0.95)=5 → sorted[4]=100
assert_eq!(p95, 100.0, "largest value should dominate p95 after eviction");
}
#[test]
fn len_reports_buffer_size() {
let mut p = RollingP95::new(10, 5);
assert_eq!(p.len(), 0);
p.push(1.0);
assert_eq!(p.len(), 1);
}
}

View File

@ -97,6 +97,7 @@ pub fn node_frames_from_states(node_states: &HashMap<u8, NodeState>) -> Vec<Mult
pub fn fuse_or_fallback(
fuser: &MultistaticFuser,
node_states: &HashMap<u8, NodeState>,
dedup_factor: f64,
) -> (Option<FusedSensingFrame>, Option<usize>) {
let frames = node_frames_from_states(node_states);
if frames.is_empty() {
@ -109,9 +110,11 @@ pub fn fuse_or_fallback(
(Some(fused), None)
}
Err(e) => {
tracing::debug!("Multistatic fusion failed ({e}), using per-node max fallback");
// Use max (not sum) to avoid double-counting when nodes have overlapping coverage.
let max_count: usize = node_states
tracing::debug!("Multistatic fusion failed ({e}), using per-node sum/dedup fallback");
// Sum per-node counts then divide by dedup_factor (assumed average
// visibility per body across nodes). ADR-044 §5.1.
// dedup_factor is runtime-configurable; default 3.0.
let total: usize = node_states
.values()
.filter(|ns| {
ns.last_frame_time
@ -119,9 +122,9 @@ pub fn fuse_or_fallback(
.unwrap_or(false)
})
.map(|ns| ns.prev_person_count)
.max()
.unwrap_or(0);
(None, Some(max_count))
.sum();
let estimated = ((total as f64) / dedup_factor).ceil() as usize;
(None, Some(estimated))
}
}
}
@ -257,7 +260,7 @@ mod tests {
fn test_fuse_or_fallback_empty() {
let fuser = MultistaticFuser::new();
let states: HashMap<u8, NodeState> = HashMap::new();
let (fused, count) = fuse_or_fallback(&fuser, &states);
let (fused, count) = fuse_or_fallback(&fuser, &states, 3.0);
assert!(fused.is_none());
assert_eq!(count, Some(0));
}