feat(adr-103 v2): stable persistent baseline + NBVI quiet-window finder

Problem from ADR-103 v1: persisted NBVI-subset mean (19.86 in operator's
recording) drifted out of comparability after server restart because
NBVI re-selected a different top-12 subset, yielding a different mean
from the same channel. The classifier saw a current/baseline ratio > 1
even in a clearly empty room.

Fix:

1. Separate FULL-broadband mean (all non-zero subcarriers) from
   NBVI-subset mean in amp_presence_override. NBVI subset still drives
   CV / motion sensitivity. FULL is what gets compared to the
   persistent baseline — stable across NBVI re-selection.

2. baseline.json schema v2: full_broadband_{mean,p50,p95,std,cv_pct}
   replaces NBVI-only p95_amp/mean_amp. Loader prefers full_*; falls
   back to legacy fields for backward compat.

3. NBVI Step 1 quiet-window finder (ESPectre): nbvi_select_top_k now
   slides a window across the calibration history, picks the lowest-CV
   sub-window, and ranks subcarriers using only that. Robust to brief
   motion during the calibration buffer.

4. scripts/record-baseline.py v2: emits v2 schema, computes
   full-broadband stats per node, trims head/tail transients, picks
   cleanest 30-s sub-window, also saves per_subcarrier_mean for future
   subcarrier-level comparison.

Operator workflow now: step out → run script → restart server →
forget about the empty-room ritual forever.
This commit is contained in:
arsen 2026-05-17 10:11:24 +07:00
parent 764388c0bf
commit f411992435
2 changed files with 350 additions and 14 deletions

181
scripts/record-baseline.py Executable file
View File

@ -0,0 +1,181 @@
#!/usr/bin/env python3
"""
Record an empty-room baseline for the RuView sensing-server.
ADR-103 v2 persistent baseline override that's stable across NBVI
re-selection between server restarts. Computes baseline from the FULL
amplitude vector (all non-zero subcarriers), not from the dynamic NBVI
top-K subset.
Usage:
1. Operator steps out of the room.
2. Run: scripts/record-baseline.py [--duration 90] [--server localhost]
3. Wait for the "saved" message. Operator can come back.
4. Restart sensing-server to pick up the new baseline.
The script connects to the live WebSocket stream, records `duration`
seconds of per-node amplitudes, trims the first and last 15 seconds
(catches door-opening transients), then for each node finds the most
stable 30-second sub-window (lowest broadband CV) and writes per-node
full-broadband mean / median / p95 to the baseline file (default
v2/data/baseline.json; override with --out).
"""
import argparse
import asyncio
import json
import statistics
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
try:
import websockets
except ImportError:
print("error: pip install websockets", file=sys.stderr)
sys.exit(2)
def full_broadband_mean(amps):
    """Return the average of the strictly positive entries of *amps*.

    Entries that are zero or negative (guard tones) are ignored. When no
    entry is positive the result is ``0.0``.
    """
    total = 0
    count = 0
    for value in amps:
        if value > 0:
            total += value
            count += 1
    if count == 0:
        return 0.0
    return total / count
async def record(server: str, duration: float, port: int):
    """Collect per-node amplitude frames from the live sensing WebSocket.

    Connects to ``ws://{server}:{port}/ws/sensing`` and keeps every node
    entry of each ``sensing_update`` message until ``duration`` seconds have
    elapsed (measured from just before the connection attempt).

    Args:
        server: Host name or address of the sensing-server.
        duration: Recording length in seconds.
        port: WebSocket port.

    Returns:
        Dict mapping node id to a list of
        ``(seconds_since_start, amplitude_vector, rssi_dbm)`` tuples in
        arrival order. Empty when no usable frame was received.
    """
    by_node: dict[int, list[tuple[float, list[float], float]]] = {}
    url = f"ws://{server}:{port}/ws/sensing"
    # Monotonic clock: wall-clock adjustments (NTP, DST) must not stretch or
    # shrink the recording.
    start = time.monotonic()
    deadline = start + duration
    print(f"connecting to {url} — recording {duration:.0f}s …", flush=True)
    async with websockets.connect(url) as ws:
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            try:
                # Bound the wait so an idle or stalled stream cannot keep the
                # script running past the requested duration.
                msg = await asyncio.wait_for(ws.recv(), timeout=remaining)
            except asyncio.TimeoutError:
                break
            except websockets.ConnectionClosedOK:
                # Clean close by the server: keep what was captured so far.
                break
            try:
                d = json.loads(msg)
            except ValueError:
                # One malformed frame should not abort the whole capture.
                continue
            if not isinstance(d, dict) or d.get("type") != "sensing_update":
                continue
            t = time.monotonic() - start
            for n in d.get("nodes") or []:
                a = n.get("amplitude") or []
                node_id = n.get("node_id")
                if not a or node_id is None:
                    continue
                # A JSON null RSSI is stored as 0.0, the "no reading" marker.
                rssi = n.get("rssi_dbm") or 0.0
                by_node.setdefault(node_id, []).append((t, a, rssi))
    return by_node
def trim_and_clean(frames, trim_head_sec=15.0, trim_tail_sec=15.0, clean_window_sec=30.0,
                   step_sec=5.0, min_window_frames=5):
    """Trim head/tail transients, then summarise the calmest sub-window.

    Args:
        frames: Time-ordered ``(t_sec, amplitudes, rssi_dbm)`` tuples for one
            node.
        trim_head_sec: Seconds dropped from the start of the recording.
        trim_tail_sec: Seconds dropped from the end of the recording.
        clean_window_sec: Length of the sub-window that is searched for.
        step_sec: Stride of the sliding-window scan; must be positive.
        min_window_frames: Candidate windows with fewer frames are ignored;
            must be at least 1.

    Returns:
        A dict of full-broadband statistics for the chosen window (see the
        keys below), or ``None`` when there are no frames, nothing survives
        trimming, or no candidate window qualifies.

    Raises:
        ValueError: If ``step_sec`` or ``min_window_frames`` is out of range.
    """
    if step_sec <= 0:
        raise ValueError("step_sec must be positive")
    if min_window_frames < 1:
        raise ValueError("min_window_frames must be at least 1")
    if not frames:
        return None

    t0 = frames[0][0]
    t1 = frames[-1][0]
    dur = t1 - t0
    if dur < trim_head_sec + trim_tail_sec + clean_window_sec / 2:
        # Recording too short for the requested trims: cut a sixth off each
        # end instead so something is left to analyse.
        head = tail = dur / 6
    else:
        head, tail = trim_head_sec, trim_tail_sec
    trimmed = [f for f in frames if t0 + head <= f[0] <= t1 - tail]
    if not trimmed:
        return None

    win = clean_window_sec
    first_t = trimmed[0][0]
    last_t = trimmed[-1][0]
    if last_t - first_t <= win:
        # Not enough material to slide a window: use everything that is left.
        chunk = trimmed
    else:
        best = None  # (cv, frames) of the calmest window seen so far
        cursor = first_t
        while cursor + win <= last_t:
            window = [f for f in trimmed if cursor <= f[0] <= cursor + win]
            if len(window) >= min_window_frames:
                bms = [full_broadband_mean(a) for _, a, _ in window]
                mu = statistics.mean(bms)
                if mu > 0:
                    cv = statistics.pstdev(bms) / mu
                    if best is None or cv < best[0]:
                        best = (cv, window)
            cursor += step_sec
        if best is None or not best[1]:
            return None
        chunk = best[1]

    # ── Compute per-node stats on the clean window ───────────────
    full_means = [full_broadband_mean(a) for _, a, _ in chunk]
    # Zero (and a missing/null reading) means "no RSSI" and is left out.
    rssis = [r for _, _, r in chunk if r]
    sorted_full = sorted(full_means)
    mean_full = statistics.mean(full_means)
    std_full = statistics.pstdev(full_means)

    # Per-subcarrier mean across the clean window (for diagnostic + future
    # subcarrier-level comparison if the server gets that capability).
    # Only indices present in every frame are reported.
    n_sub = min(len(a) for _, a, _ in chunk)
    per_sub_means = []
    for k in range(n_sub):
        vs = [a[k] for _, a, _ in chunk if a[k] > 0]
        per_sub_means.append(statistics.mean(vs) if vs else 0.0)

    return {
        # Persistent fields the server reads:
        "full_broadband_mean": mean_full,
        "full_broadband_p50": sorted_full[len(sorted_full) // 2],
        "full_broadband_p95": sorted_full[int(len(sorted_full) * 0.95)],
        "full_broadband_std": std_full,
        "full_broadband_cv_pct": 100 * std_full / mean_full if mean_full else 0.0,
        # Reference:
        "rssi_dbm": statistics.mean(rssis) if rssis else 0.0,
        "n_samples": len(full_means),
        "window_start_sec": chunk[0][0],
        "window_end_sec": chunk[-1][0],
        # Per-subcarrier diagnostic (kept so future server versions can do
        # subcarrier-level comparison without re-recording):
        "per_subcarrier_mean": [round(v, 3) for v in per_sub_means],
    }
def main():
    """CLI entry point: record, clean, and persist the per-node baseline.

    Records from the sensing-server, reduces each node's frames with
    ``trim_and_clean`` and writes the v2 baseline JSON to ``--out``.

    Exits with status 1, without touching the output file, when no data was
    received or when no node yielded a usable window; an existing baseline
    is therefore never replaced by an empty one.
    """
    ap = argparse.ArgumentParser(description=__doc__.splitlines()[1])
    ap.add_argument("--duration", type=float, default=90.0, help="seconds to record (default 90)")
    ap.add_argument("--server", default="localhost", help="sensing-server host")
    ap.add_argument("--port", type=int, default=8765, help="ws port (default 8765)")
    ap.add_argument("--out", type=Path, default=Path("v2/data/baseline.json"))
    ap.add_argument("--trim-head", type=float, default=15.0)
    ap.add_argument("--trim-tail", type=float, default=15.0)
    ap.add_argument("--clean-window", type=float, default=30.0)
    args = ap.parse_args()
    if args.duration <= 0:
        ap.error("--duration must be positive")

    by_node = asyncio.run(record(args.server, args.duration, args.port))
    if not by_node:
        print("no data received from server", file=sys.stderr)
        sys.exit(1)

    out = {
        "version": 2,
        "captured_at": datetime.now(timezone.utc).isoformat(timespec="seconds"),
        "duration_sec": args.duration,
        "trim_head_sec": args.trim_head,
        "trim_tail_sec": args.trim_tail,
        "clean_window_sec": args.clean_window,
        "method": "record → trim head/tail → find lowest-CV sub-window → FULL-broadband stats per node",
        "nodes": {},
    }
    print()
    for nid, frames in sorted(by_node.items()):
        result = trim_and_clean(frames, args.trim_head, args.trim_tail, args.clean_window)
        if not result:
            print(f"node {nid}: not enough data for cleaning (skipped)")
            continue
        out["nodes"][str(nid)] = result
        print(f"node {nid}: {len(frames)} raw frames, kept cleanest {result['n_samples']}-sample window")
        print(f" FULL broadband: mean={result['full_broadband_mean']:.2f} std={result['full_broadband_std']:.2f} CV={result['full_broadband_cv_pct']:.2f}%")
        print(f" full p50={result['full_broadband_p50']:.2f} p95={result['full_broadband_p95']:.2f} rssi={result['rssi_dbm']:.1f}")

    if not out["nodes"]:
        # Writing an empty "nodes" map would clobber a previously good
        # baseline with one the server cannot use.
        print("no node produced a usable baseline window; nothing written", file=sys.stderr)
        sys.exit(1)

    args.out.parent.mkdir(parents=True, exist_ok=True)
    args.out.write_text(json.dumps(out, indent=2), encoding="utf-8")
    print(f"\nsaved → {args.out}")
    print("restart sensing-server to load the new baseline.")


if __name__ == "__main__":
    main()

View File

@ -207,8 +207,15 @@ const NBVI_REFRESH_TICKS: u32 = 200;
const NBVI_DEAD_GATE_PCT: f64 = 0.25;
struct AmpState {
/// Rolling short window of NBVI-subset broadband mean (used for CV).
short: VecDeque<f64>,
/// Rolling long window of NBVI-subset broadband mean (fallback baseline
/// via p95 when no persistent override is loaded).
long: VecDeque<f64>,
/// Rolling short window of FULL broadband mean across all non-zero
/// subcarriers. Used for the persistent-baseline drop comparison —
/// stable across NBVI re-selection between server restarts (ADR-103).
short_full: VecDeque<f64>,
/// Rolling buffer of full per-subcarrier amplitude vectors.
nbvi_history: VecDeque<Vec<f64>>,
/// Indices of currently-selected best subcarriers (sorted by NBVI
@ -220,21 +227,64 @@ struct AmpState {
/// Compute the top-K NBVI subcarrier indices over the provided history.
/// Returns empty if the history is too short to give a stable ranking.
///
/// ADR-102 v2: ESPectre's Step 1 quiet-window finder is now active. We
/// slide a fixed window across `history`, score each window by its
/// broadband-mean coefficient of variation, and rank subcarriers using
/// only the calmest window. This makes the selection robust to brief
/// motion that happens during the calibration buffer (someone walks by
/// during boot, dog enters room) — the noisy windows are ignored.
fn nbvi_select_top_k(history: &VecDeque<Vec<f64>>, k: usize) -> Vec<usize> {
if history.len() < AMP_SHORT_WIN { return Vec::new(); }
let n_sub = history.front().map(|v| v.len()).unwrap_or(0);
if n_sub == 0 { return Vec::new(); }
// Per-subcarrier mean and std over the buffered frames.
let n = history.len() as f64;
// ── ESPectre Step 1: pick the quietest sub-window ────────────────
//
// Slide AMP_SHORT_WIN-sized window across history with stride
// AMP_SHORT_WIN/3 (overlapping). For each window, compute the CV
// of its broadband mean. Lowest-CV window wins. If history is small,
// use the whole thing.
let window_size = AMP_SHORT_WIN;
let stride = (window_size / 3).max(1);
let frames: Vec<&Vec<f64>> = history.iter().collect();
let total = frames.len();
let quiet_slice: &[&Vec<f64>] = if total <= window_size {
&frames[..]
} else {
let mut best_start = 0usize;
let mut best_cv = f64::INFINITY;
let mut start = 0usize;
while start + window_size <= total {
let window = &frames[start..start + window_size];
// Per-frame broadband mean (any valid subcarrier).
let bb: Vec<f64> = window.iter().map(|f| {
let mut s = 0.0; let mut c = 0;
for &v in f.iter() { if v > 0.0 { s += v; c += 1; } }
if c == 0 { 0.0 } else { s / c as f64 }
}).collect();
let mu: f64 = bb.iter().sum::<f64>() / bb.len() as f64;
if mu > 0.0 {
let var: f64 = bb.iter().map(|x| (x - mu).powi(2)).sum::<f64>() / bb.len() as f64;
let cv = var.sqrt() / mu;
if cv < best_cv { best_cv = cv; best_start = start; }
}
start += stride;
}
&frames[best_start..best_start + window_size]
};
// Per-subcarrier mean and std over the quietest window only.
let n = quiet_slice.len() as f64;
let mut means = vec![0.0_f64; n_sub];
let mut sums = vec![0.0_f64; n_sub];
for frame in history {
for frame in quiet_slice.iter() {
for k in 0..n_sub.min(frame.len()) { sums[k] += frame[k]; }
}
for k in 0..n_sub { means[k] = sums[k] / n; }
let mut stds = vec![0.0_f64; n_sub];
for frame in history {
for frame in quiet_slice.iter() {
for k in 0..n_sub.min(frame.len()) {
let d = frame[k] - means[k];
stds[k] += d * d;
@ -288,6 +338,64 @@ fn amp_hold_init() -> &'static Mutex<(String, u32)> {
AMP_HOLD.get_or_init(|| Mutex::new(("absent".to_string(), 0)))
}
/// ADR-103: per-node persistent baseline overrides, keyed by node id.
///
/// `load_baseline_file` fills this map at startup. When a node has an
/// entry, `amp_presence_override` uses that value as its baseline instead
/// of the rolling 95th percentile, so a fresh server boot does not need the
/// "step out for 60 s" calibration ritual. An empty map means every node
/// falls back to the rolling p95.
static AMP_BASELINE_OVERRIDE: OnceLock<Mutex<std::collections::HashMap<u8, f64>>> = OnceLock::new();

/// Return the shared override map, creating it empty on first access.
fn amp_baseline_override_init() -> &'static Mutex<std::collections::HashMap<u8, f64>> {
    AMP_BASELINE_OVERRIDE.get_or_init(Default::default)
}
/// Load the persistent empty-room baseline from a JSON file into
/// `AMP_BASELINE_OVERRIDE`.
///
/// Tolerant by design: a missing or unreadable file, a parse error, or a
/// file without usable entries is logged and otherwise ignored, leaving the
/// override map untouched (the server then falls back to rolling p95).
///
/// Expected shape: a top-level `nodes` object whose keys parse as `u8` node
/// ids. For each node the first strictly positive value among
/// `full_broadband_p95`, `full_broadband_mean` (ADR-103 v2) and then
/// `p95_amp`, `mean_amp` (legacy v1) becomes the baseline.
fn load_baseline_file(path: &str) {
    let s = match std::fs::read_to_string(path) {
        Ok(s) => s,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
            info!("baseline: no file at {path} — using rolling p95 (60-s warmup)");
            return;
        }
        Err(e) => {
            // Permissions, invalid UTF-8, etc.: the file exists but cannot
            // be used, which the operator needs to see.
            warn!("baseline: cannot read {path}: {e} — using rolling p95 (60-s warmup)");
            return;
        }
    };
    let v: serde_json::Value = match serde_json::from_str(&s) {
        Ok(v) => v,
        Err(e) => {
            warn!("baseline: parse error at {path}: {e}");
            return;
        }
    };
    let nodes = match v.get("nodes").and_then(|n| n.as_object()) {
        Some(n) => n,
        None => { warn!("baseline: no .nodes object in {path}"); return; }
    };

    let mut loaded: Vec<(u8, f64)> = Vec::new();
    let mut legacy_ids: Vec<u8> = Vec::new();
    for (k, node) in nodes {
        let id: u8 = match k.parse() {
            Ok(i) => i,
            Err(_) => {
                warn!("baseline: skipping node key {k:?} in {path} (not a u8 node id)");
                continue;
            }
        };
        // Strictly positive numeric field, or None.
        let positive = |field: &str| {
            node.get(field).and_then(|v| v.as_f64()).filter(|v| *v > 0.0)
        };
        // ADR-103 v2 schema (preferred): full_broadband_p95 / full_broadband_mean
        // — stable across NBVI re-selection between restarts.
        let full = positive("full_broadband_p95").or_else(|| positive("full_broadband_mean"));
        let b = match full {
            Some(b) => b,
            // Legacy v1 schema: NBVI-subset p95/mean.
            None => match positive("p95_amp").or_else(|| positive("mean_amp")) {
                Some(b) => { legacy_ids.push(id); b }
                None => continue,
            },
        };
        loaded.push((id, b));
    }
    if loaded.is_empty() {
        warn!("baseline: {path} parsed but no usable per-node entries");
        return;
    }
    if !legacy_ids.is_empty() {
        // Once an override exists, the classifier compares it against the
        // FULL-broadband live mean, so an NBVI-subset baseline is not
        // measured on the same scale.
        warn!("baseline: node(s) {legacy_ids:?} in {path} only have legacy NBVI-subset fields; \
               re-record with scripts/record-baseline.py for a comparable FULL-broadband baseline");
    }

    let mut o = amp_baseline_override_init().lock().unwrap();
    for (id, b) in &loaded { o.insert(*id, *b); }
    let summary: Vec<String> = loaded.iter().map(|(id, b)| format!("node{id}={b:.2}")).collect();
    info!("baseline: loaded {} node overrides from {} ({})",
          loaded.len(), path, summary.join(", "));
}
/// Classify motion/presence for one node from the raw amplitude vector.
///
/// Returns `(motion_level, presence, confidence)` where confidence is the
@ -301,11 +409,24 @@ fn amp_presence_override(node_id: u8, amplitudes: &[f64]) -> Option<(String, boo
let st = map.entry(node_id).or_insert_with(|| AmpState {
short: VecDeque::with_capacity(AMP_SHORT_WIN),
long: VecDeque::with_capacity(AMP_LONG_WIN),
short_full: VecDeque::with_capacity(AMP_SHORT_WIN),
nbvi_history: VecDeque::with_capacity(NBVI_HISTORY_LEN),
nbvi_selected: Vec::new(),
nbvi_ticks: 0,
});
// ADR-103 v2: compute FULL broadband mean (all non-zero subcarriers)
// for the persistent baseline drop check. Stable across NBVI
// re-selection between server restarts. NBVI subset is still used
// for CV (motion sensitivity).
let full_mean: f64 = {
let mut s = 0.0; let mut c = 0;
for &v in amplitudes.iter() { if v > 0.0 { s += v; c += 1; } }
if c == 0 { 0.0 } else { s / c as f64 }
};
st.short_full.push_back(full_mean);
while st.short_full.len() > AMP_SHORT_WIN { st.short_full.pop_front(); }
// Push current frame into NBVI history for ranking.
st.nbvi_history.push_back(amplitudes.to_vec());
while st.nbvi_history.len() > NBVI_HISTORY_LEN { st.nbvi_history.pop_front(); }
@ -352,22 +473,49 @@ fn amp_presence_override(node_id: u8, amplitudes: &[f64]) -> Option<(String, boo
let var: f64 = st.short.iter().map(|x| (x - mean_short).powi(2)).sum::<f64>() / n;
let cv = if mean_short > 0.0 { var.sqrt() / mean_short } else { 0.0 };
// Baseline = 95th percentile of long window once we have ≥ 5 s of data.
// A body in the channel attenuates amplitude, so the baseline (=
// empty-room amplitude) sits at the upper end of recent history.
let baseline = if st.long.len() >= AMP_SHORT_WIN * 3 {
let mut sorted: Vec<f64> = st.long.iter().copied().collect();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let idx = ((sorted.len() as f64) * 0.95) as usize;
Some(sorted[idx.min(sorted.len() - 1)])
// Baseline:
// 1. Persistent override (ADR-103) loaded from data/baseline.json
// at boot. Represents the FULL-broadband empty-room level
// (p95 preferred, mean otherwise; see load_baseline_file).
// Stable across NBVI re-selection between restarts.
// 2. Fall back to the rolling 95th percentile of the long
// NBVI-subset window when no override is present.
//
// A body in the channel attenuates amplitude, so the baseline
// (= empty-room amplitude) sits at the upper end of recent history.
let baseline = {
let ovr = amp_baseline_override_init().lock().unwrap();
if let Some(&fixed) = ovr.get(&node_id) {
Some(fixed)
} else if st.long.len() >= AMP_SHORT_WIN * 3 {
// Rolling fallback uses NBVI-subset (long) for backwards
// compatibility with the legacy non-baseline mode.
let mut sorted: Vec<f64> = st.long.iter().copied().collect();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let idx = ((sorted.len() as f64) * 0.95) as usize;
Some(sorted[idx.min(sorted.len() - 1)])
} else {
None
}
};
// mean_for_baseline_check: when override is loaded → use FULL
// broadband (stable across NBVI changes). Otherwise use NBVI subset
// (matches the legacy rolling baseline). Same source on both sides
// of the comparison.
let use_full = {
let ovr = amp_baseline_override_init().lock().unwrap();
ovr.contains_key(&node_id)
};
let mean_for_baseline = if use_full && !st.short_full.is_empty() {
st.short_full.iter().sum::<f64>() / st.short_full.len() as f64
} else {
None
mean_short
};
// Stash this node's contribution for cross-node fusion.
{
let mut latest = amp_latest_init().lock().unwrap();
latest.insert(node_id, (cv, mean_short, baseline));
latest.insert(node_id, (cv, mean_for_baseline, baseline));
}
amp_classify_from_latest()
@ -5415,6 +5563,13 @@ async fn main() {
info!("Data source: {source}");
// ADR-103: load persistent empty-room baseline if present so the
// classifier has a meaningful baseline from the first frame
// instead of waiting ~60 s for the rolling p95 to warm up.
load_baseline_file(
&std::env::var("RUVIEW_BASELINE_FILE").unwrap_or_else(|_| "data/baseline.json".into())
);
// Shared state
// Vital sign sample rate derives from tick interval (e.g. 500ms tick => 2 Hz)
let vital_sample_rate = 1000.0 / args.tick_ms as f64;