diff --git a/scripts/record-baseline.py b/scripts/record-baseline.py
new file mode 100755
index 00000000..40ddaf69
--- /dev/null
+++ b/scripts/record-baseline.py
@@ -0,0 +1,227 @@
+#!/usr/bin/env python3
+"""
+Record an empty-room baseline for the RuView sensing-server.
+
+ADR-103 v2 — persistent baseline override that's stable across NBVI
+re-selection between server restarts. Computes baseline from the FULL
+amplitude vector (all non-zero subcarriers), not from the dynamic NBVI
+top-K subset.
+
+Usage:
+  1. Operator steps out of the room.
+  2. Run: scripts/record-baseline.py [--duration 90] [--server localhost]
+  3. Wait for the "saved" message. Operator can come back.
+  4. Restart sensing-server to pick up the new baseline.
+
+The script connects to the live WebSocket stream, records `duration`
+seconds of per-node amplitudes, trims the first and last 15 seconds
+(catches door-opening transients), then for each node finds the most
+stable 30-second sub-window (lowest broadband CV) and writes per-node
+full-broadband mean / median / p95 to the --out file (default
+v2/data/baseline.json). Nothing is written when no node yields a usable
+window, so an existing baseline is never replaced by an empty one.
+"""
+
+import argparse
+import asyncio
+import json
+import statistics
+import sys
+import time
+from datetime import datetime, timezone
+from pathlib import Path
+
+try:
+    import websockets
+except ImportError:
+    print("error: pip install websockets", file=sys.stderr)
+    sys.exit(2)
+
+
+def full_broadband_mean(amps):
+    """Mean over all non-zero subcarriers (skips guard tones).
+
+    Only strictly positive entries are averaged; returns 0.0 when there
+    are none.
+    """
+    valid = [v for v in amps if v > 0]
+    return (sum(valid) / len(valid)) if valid else 0.0
+
+
+async def record(server: str, duration: float, port: int):
+    """Collect per-node frames from the sensing WebSocket for `duration` s.
+
+    Returns a dict mapping node_id to a list of (t, amplitude, rssi_dbm)
+    tuples, where t is seconds since recording started. The deadline is
+    enforced even if the stream stalls or only carries other message
+    types, and frames gathered before the server closes the connection
+    are kept.
+    """
+    by_node: dict[int, list[tuple[float, list[float], float]]] = {}
+    url = f"ws://{server}:{port}/ws/sensing"
+    # Monotonic clock: a wall-clock step must not stretch or cut the run.
+    start = time.monotonic()
+    print(f"connecting to {url} — recording {duration:.0f}s …", flush=True)
+    async with websockets.connect(url) as ws:
+        while True:
+            remaining = duration - (time.monotonic() - start)
+            if remaining <= 0:
+                break
+            try:
+                msg = await asyncio.wait_for(ws.recv(), timeout=remaining)
+            except asyncio.TimeoutError:
+                break  # deadline hit while the stream was silent
+            except websockets.ConnectionClosed:
+                break  # server went away; keep what was recorded
+            try:
+                d = json.loads(msg)
+            except ValueError:
+                continue  # skip a malformed frame instead of losing the run
+            if not isinstance(d, dict) or d.get("type") != "sensing_update":
+                continue
+            t = time.monotonic() - start
+            for n in d.get("nodes") or []:
+                a = n.get("amplitude") or []
+                nid = n.get("node_id")
+                if not a or nid is None:
+                    continue
+                # `or 0.0` also maps an explicit JSON null to "no RSSI".
+                rssi = n.get("rssi_dbm") or 0.0
+                by_node.setdefault(nid, []).append((t, a, rssi))
+    return by_node
+
+
+def trim_and_clean(frames, trim_head_sec=15.0, trim_tail_sec=15.0, clean_window_sec=30.0):
+    """Trim head/tail transients, then scan for the cleanest sub-window.
+
+    `frames` is one node's list of (t, amplitude, rssi_dbm) tuples in time
+    order. Returns the stats dict stored for that node, or None when
+    nothing survives trimming or no scanned window has at least 5 frames
+    and a positive broadband mean.
+    """
+    if not frames:
+        return None
+    t0 = frames[0][0]
+    t1 = frames[-1][0]
+    dur = t1 - t0
+    if dur < trim_head_sec + trim_tail_sec + clean_window_sec / 2:
+        # Recording too short for the requested trims: cut a sixth off each end.
+        head = dur / 6
+        tail = dur / 6
+    else:
+        head = trim_head_sec
+        tail = trim_tail_sec
+    trimmed = [f for f in frames if t0 + head <= f[0] <= t1 - tail]
+    if not trimmed:
+        return None
+
+    win = clean_window_sec
+    if (trimmed[-1][0] - trimmed[0][0]) <= win:
+        chunk = trimmed
+    else:
+        best = None  # (cv, frames)
+        step = 5.0
+        cursor = trimmed[0][0]
+        while cursor + win <= trimmed[-1][0]:
+            window = [f for f in trimmed if cursor <= f[0] <= cursor + win]
+            if len(window) >= 5:
+                bms = [full_broadband_mean(a) for _, a, _ in window]
+                mu = statistics.mean(bms)
+                if mu > 0:
+                    sd = statistics.pstdev(bms)
+                    cv = sd / mu
+                    if best is None or cv < best[0]:
+                        best = (cv, window)
+            cursor += step
+        if best is None or not best[1]:
+            return None
+        chunk = best[1]
+
+    # ── Compute per-node stats on the clean window ───────────────
+    full_means = [full_broadband_mean(a) for _, a, _ in chunk]
+    rssis = [r for _, _, r in chunk if r != 0]
+    sorted_full = sorted(full_means)
+    mean_full = statistics.mean(full_means)
+    std_full = statistics.pstdev(full_means)
+
+    # Per-subcarrier mean across the clean window (for diagnostic + future
+    # subcarrier-level comparison if the server gets that capability).
+    n_sub = min(len(a) for _, a, _ in chunk)
+    per_sub_means = []
+    for k in range(n_sub):
+        vs = [a[k] for _, a, _ in chunk if a[k] > 0]
+        per_sub_means.append(statistics.mean(vs) if vs else 0.0)
+
+    return {
+        # Persistent fields the server reads:
+        "full_broadband_mean": mean_full,
+        "full_broadband_p50": sorted_full[len(sorted_full) // 2],
+        "full_broadband_p95": sorted_full[int(len(sorted_full) * 0.95)],
+        "full_broadband_std": std_full,
+        "full_broadband_cv_pct": 100 * std_full / mean_full if mean_full else 0.0,
+        # Reference:
+        "rssi_dbm": statistics.mean(rssis) if rssis else 0.0,
+        "n_samples": len(full_means),
+        "window_start_sec": chunk[0][0],
+        "window_end_sec": chunk[-1][0],
+        # Per-subcarrier diagnostic (kept so future server versions can do
+        # subcarrier-level comparison without re-recording):
+        "per_subcarrier_mean": [round(v, 3) for v in per_sub_means],
+    }
+
+
+def main():
+    """Parse CLI options, record, compute per-node stats, write the JSON."""
+    ap = argparse.ArgumentParser(description=__doc__.splitlines()[1])
+    ap.add_argument("--duration", type=float, default=90.0, help="seconds to record (default 90)")
+    ap.add_argument("--server", default="localhost", help="sensing-server host")
+    ap.add_argument("--port", type=int, default=8765, help="ws port (default 8765)")
+    ap.add_argument("--out", type=Path, default=Path("v2/data/baseline.json"))
+    ap.add_argument("--trim-head", type=float, default=15.0)
+    ap.add_argument("--trim-tail", type=float, default=15.0)
+    ap.add_argument("--clean-window", type=float, default=30.0)
+    args = ap.parse_args()
+
+    try:
+        by_node = asyncio.run(record(args.server, args.duration, args.port))
+    except OSError as e:
+        print(f"error: connection to {args.server}:{args.port} failed: {e}", file=sys.stderr)
+        sys.exit(1)
+    if not by_node:
+        print("no data received from server", file=sys.stderr)
+        sys.exit(1)
+
+    out = {
+        "version": 2,
+        "captured_at": datetime.now(timezone.utc).isoformat(timespec="seconds"),
+        "duration_sec": args.duration,
+        "trim_head_sec": args.trim_head,
+        "trim_tail_sec": args.trim_tail,
+        "clean_window_sec": args.clean_window,
+        "method": "record → trim head/tail → find lowest-CV sub-window → FULL-broadband stats per node",
+        "nodes": {},
+    }
+    print()
+    for nid, frames in sorted(by_node.items()):
+        result = trim_and_clean(frames, args.trim_head, args.trim_tail, args.clean_window)
+        if not result:
+            print(f"node {nid}: not enough data for cleaning (skipped)")
+            continue
+        out["nodes"][str(nid)] = result
+        print(f"node {nid}: {len(frames)} raw frames, kept cleanest {result['n_samples']}-sample window")
+        print(f" FULL broadband: mean={result['full_broadband_mean']:.2f} std={result['full_broadband_std']:.2f} CV={result['full_broadband_cv_pct']:.2f}%")
+        print(f" full p50={result['full_broadband_p50']:.2f} p95={result['full_broadband_p95']:.2f} rssi={result['rssi_dbm']:.1f}")
+
+    if not out["nodes"]:
+        # Do not overwrite a previously recorded baseline with an empty one.
+        print("no node produced a usable baseline; nothing written", file=sys.stderr)
+        sys.exit(1)
+
+    args.out.parent.mkdir(parents=True, exist_ok=True)
+    args.out.write_text(json.dumps(out, indent=2))
+    print(f"\nsaved → {args.out}")
+    print("restart sensing-server to load the new baseline.")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/v2/crates/wifi-densepose-sensing-server/src/main.rs b/v2/crates/wifi-densepose-sensing-server/src/main.rs
index d71c562f..f7575ebd 100644
--- a/v2/crates/wifi-densepose-sensing-server/src/main.rs
+++ b/v2/crates/wifi-densepose-sensing-server/src/main.rs
@@ -207,8 +207,15 @@ const NBVI_REFRESH_TICKS: u32 = 200; const NBVI_DEAD_GATE_PCT: f64 = 0.25; struct AmpState { + /// 
Rolling short window of NBVI-subset broadband mean (used for CV). short: VecDeque, + /// Rolling long window of NBVI-subset broadband mean (fallback baseline + /// via p95 when no persistent override is loaded). long: VecDeque, + /// Rolling short window of FULL broadband mean across all non-zero + /// subcarriers. Used for the persistent-baseline drop comparison — + /// stable across NBVI re-selection between server restarts (ADR-103). + short_full: VecDeque, /// Rolling buffer of full per-subcarrier amplitude vectors. nbvi_history: VecDeque>, /// Indices of currently-selected best subcarriers (sorted by NBVI @@ -220,21 +227,64 @@ struct AmpState { /// Compute the top-K NBVI subcarrier indices over the provided history. /// Returns empty if the history is too short to give a stable ranking. +/// +/// ADR-102 v2: ESPectre's Step 1 quiet-window finder is now active. We +/// slide a fixed window across `history`, score each window by its +/// broadband-mean coefficient of variation, and rank subcarriers using +/// only the calmest window. This makes the selection robust to brief +/// motion that happens during the calibration buffer (someone walks by +/// during boot, dog enters room) — the noisy windows are ignored. fn nbvi_select_top_k(history: &VecDeque>, k: usize) -> Vec { if history.len() < AMP_SHORT_WIN { return Vec::new(); } let n_sub = history.front().map(|v| v.len()).unwrap_or(0); if n_sub == 0 { return Vec::new(); } - // Per-subcarrier mean and std over the buffered frames. - let n = history.len() as f64; + // ── ESPectre Step 1: pick the quietest sub-window ──────────────── + // + // Slide AMP_SHORT_WIN-sized window across history with stride + // AMP_SHORT_WIN/3 (overlapping). For each window, compute the CV + // of its broadband mean. Lowest-CV window wins. If history is small, + // use the whole thing. 
+ let window_size = AMP_SHORT_WIN; + let stride = (window_size / 3).max(1); + let frames: Vec<&Vec> = history.iter().collect(); + let total = frames.len(); + + let quiet_slice: &[&Vec] = if total <= window_size { + &frames[..] + } else { + let mut best_start = 0usize; + let mut best_cv = f64::INFINITY; + let mut start = 0usize; + while start + window_size <= total { + let window = &frames[start..start + window_size]; + // Per-frame broadband mean (any valid subcarrier). + let bb: Vec = window.iter().map(|f| { + let mut s = 0.0; let mut c = 0; + for &v in f.iter() { if v > 0.0 { s += v; c += 1; } } + if c == 0 { 0.0 } else { s / c as f64 } + }).collect(); + let mu: f64 = bb.iter().sum::() / bb.len() as f64; + if mu > 0.0 { + let var: f64 = bb.iter().map(|x| (x - mu).powi(2)).sum::() / bb.len() as f64; + let cv = var.sqrt() / mu; + if cv < best_cv { best_cv = cv; best_start = start; } + } + start += stride; + } + &frames[best_start..best_start + window_size] + }; + + // Per-subcarrier mean and std over the quietest window only. + let n = quiet_slice.len() as f64; let mut means = vec![0.0_f64; n_sub]; let mut sums = vec![0.0_f64; n_sub]; - for frame in history { + for frame in quiet_slice.iter() { for k in 0..n_sub.min(frame.len()) { sums[k] += frame[k]; } } for k in 0..n_sub { means[k] = sums[k] / n; } let mut stds = vec![0.0_f64; n_sub]; - for frame in history { + for frame in quiet_slice.iter() { for k in 0..n_sub.min(frame.len()) { let d = frame[k] - means[k]; stds[k] += d * d; @@ -288,6 +338,64 @@ fn amp_hold_init() -> &'static Mutex<(String, u32)> { AMP_HOLD.get_or_init(|| Mutex::new(("absent".to_string(), 0))) } +/// ADR-103: persistent baseline override (per-node mean_amp value). +/// When set, `amp_presence_override` uses this instead of the rolling +/// 95th-percentile from AMP_HIST.long. Loaded from `data/baseline.json` +/// at startup so a fresh server boot doesn't require the "step out for +/// 60 s" calibration ritual. 
Empty map = fall back to rolling p95. +static AMP_BASELINE_OVERRIDE: OnceLock>> = OnceLock::new(); + +fn amp_baseline_override_init() -> &'static Mutex> { + AMP_BASELINE_OVERRIDE.get_or_init(|| Mutex::new(std::collections::HashMap::new())) +} + +/// Load persistent baseline from JSON file. Tolerant: missing file or +/// parse errors are non-fatal (server falls back to rolling p95). +fn load_baseline_file(path: &str) { + let s = match std::fs::read_to_string(path) { + Ok(s) => s, + Err(_) => { + info!("baseline: no file at {path} — using rolling p95 (60-s warmup)"); + return; + } + }; + let v: serde_json::Value = match serde_json::from_str(&s) { + Ok(v) => v, + Err(e) => { + warn!("baseline: parse error at {path}: {e}"); + return; + } + }; + let nodes = match v.get("nodes").and_then(|n| n.as_object()) { + Some(n) => n, + None => { warn!("baseline: no .nodes object in {path}"); return; } + }; + let mut loaded: Vec<(u8, f64)> = Vec::new(); + for (k, node) in nodes { + let id: u8 = match k.parse() { Ok(i) => i, Err(_) => continue }; + // ADR-103 v2 schema (preferred): full_broadband_p95 / full_broadband_mean + // — stable across NBVI re-selection between restarts. Falls back to + // legacy NBVI-subset p95/mean if a v1 baseline.json was loaded. 
+ let full_p95 = node.get("full_broadband_p95").and_then(|v| v.as_f64()); + let full_mean = node.get("full_broadband_mean").and_then(|v| v.as_f64()); + let nbvi_p95 = node.get("p95_amp").and_then(|v| v.as_f64()); + let nbvi_mean = node.get("mean_amp").and_then(|v| v.as_f64()); + let baseline = [full_p95, full_mean, nbvi_p95, nbvi_mean] + .into_iter().flatten().find(|v| *v > 0.0); + let Some(b) = baseline else { continue }; + loaded.push((id, b)); + } + if loaded.is_empty() { + warn!("baseline: {path} parsed but no usable per-node entries"); + return; + } + let mut o = amp_baseline_override_init().lock().unwrap(); + for (id, b) in &loaded { o.insert(*id, *b); } + let summary: Vec = loaded.iter().map(|(id, b)| format!("node{id}={b:.2}")).collect(); + info!("baseline: loaded {} node overrides from {} ({})", + loaded.len(), path, summary.join(", ")); +} + /// Classify motion/presence for one node from the raw amplitude vector. /// /// Returns `(motion_level, presence, confidence)` where confidence is the @@ -301,11 +409,24 @@ fn amp_presence_override(node_id: u8, amplitudes: &[f64]) -> Option<(String, boo let st = map.entry(node_id).or_insert_with(|| AmpState { short: VecDeque::with_capacity(AMP_SHORT_WIN), long: VecDeque::with_capacity(AMP_LONG_WIN), + short_full: VecDeque::with_capacity(AMP_SHORT_WIN), nbvi_history: VecDeque::with_capacity(NBVI_HISTORY_LEN), nbvi_selected: Vec::new(), nbvi_ticks: 0, }); + // ADR-103 v2: compute FULL broadband mean (all non-zero subcarriers) + // for the persistent baseline drop check. Stable across NBVI + // re-selection between server restarts. NBVI subset is still used + // for CV (motion sensitivity). 
+ let full_mean: f64 = { + let mut s = 0.0; let mut c = 0; + for &v in amplitudes.iter() { if v > 0.0 { s += v; c += 1; } } + if c == 0 { 0.0 } else { s / c as f64 } + }; + st.short_full.push_back(full_mean); + while st.short_full.len() > AMP_SHORT_WIN { st.short_full.pop_front(); } + // Push current frame into NBVI history for ranking. st.nbvi_history.push_back(amplitudes.to_vec()); while st.nbvi_history.len() > NBVI_HISTORY_LEN { st.nbvi_history.pop_front(); } @@ -352,22 +473,49 @@ fn amp_presence_override(node_id: u8, amplitudes: &[f64]) -> Option<(String, boo let var: f64 = st.short.iter().map(|x| (x - mean_short).powi(2)).sum::() / n; let cv = if mean_short > 0.0 { var.sqrt() / mean_short } else { 0.0 }; - // Baseline = 95th percentile of long window once we have ≥ 5 s of data. - // A body in the channel attenuates amplitude, so the baseline (= - // empty-room amplitude) sits at the upper end of recent history. - let baseline = if st.long.len() >= AMP_SHORT_WIN * 3 { - let mut sorted: Vec = st.long.iter().copied().collect(); - sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)); - let idx = ((sorted.len() as f64) * 0.95) as usize; - Some(sorted[idx.min(sorted.len() - 1)]) + // Baseline: + // 1. Persistent override (ADR-103) loaded from data/baseline.json + // at boot: first positive of full_broadband_p95 / full_broadband_mean + // (legacy: p95_amp / mean_amp). Full-broadband values stay stable across NBVI re-selection. + // 2. Fall back to the rolling 95th percentile of the long + // NBVI-subset window when no override is present. + // + // A body in the channel attenuates amplitude, so the baseline + // (= empty-room amplitude) sits at the upper end of recent history. + let baseline = { + let ovr = amp_baseline_override_init().lock().unwrap(); + if let Some(&fixed) = ovr.get(&node_id) { + Some(fixed) + } else if st.long.len() >= AMP_SHORT_WIN * 3 { + // Rolling fallback uses NBVI-subset (long) for backwards + // compatibility with the legacy non-baseline mode. 
+ let mut sorted: Vec = st.long.iter().copied().collect(); + sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)); + let idx = ((sorted.len() as f64) * 0.95) as usize; + Some(sorted[idx.min(sorted.len() - 1)]) + } else { + None + } + }; + + // mean_for_baseline_check: when override is loaded → use FULL + // broadband (stable across NBVI changes). Otherwise use NBVI subset + // (matches the legacy rolling baseline). Same source on both sides + // of the comparison. + let use_full = { + let ovr = amp_baseline_override_init().lock().unwrap(); + ovr.contains_key(&node_id) + }; + let mean_for_baseline = if use_full && !st.short_full.is_empty() { + st.short_full.iter().sum::() / st.short_full.len() as f64 } else { - None + mean_short }; // Stash this node's contribution for cross-node fusion. { let mut latest = amp_latest_init().lock().unwrap(); - latest.insert(node_id, (cv, mean_short, baseline)); + latest.insert(node_id, (cv, mean_for_baseline, baseline)); } amp_classify_from_latest() @@ -5415,6 +5563,13 @@ async fn main() { info!("Data source: {source}"); + // ADR-103: load persistent empty-room baseline if present so the + // classifier has a meaningful baseline from the first frame + // instead of waiting ~60 s for the rolling p95 to warm up. + load_baseline_file( + &std::env::var("RUVIEW_BASELINE_FILE").unwrap_or_else(|_| "data/baseline.json".into()) + ); + // Shared state // Vital sign sample rate derives from tick interval (e.g. 500ms tick => 2 Hz) let vital_sample_rate = 1000.0 / args.tick_ms as f64;