From 47dafab42d3bfacf4ed234e14ab7780f20790415 Mon Sep 17 00:00:00 2001 From: arsen Date: Sun, 17 May 2026 16:44:21 +0700 Subject: [PATCH] feat(adr-104): phase-domain drift channel (script + server) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit scripts/record-baseline.py and capture_baseline_to_disk now compute per-subcarrier circular mean + variance of phases when the WS stream carries them (ADR-106). Saved as per_subcarrier_phase_mean + per_subcarrier_phase_var in baseline.json. Server loads them into PHASE_BASELINE_PER_SUB; phase_drift_update computes a per-tick score (mean circular distance / π over subcarriers with baseline variance < 0.30) and stores it in PHASE_DRIFT. Surfaces as PerNodeFeatureInfo.phase_drift_score (skip-if-none). Honesty contract: emits None below PHASE_DRIFT_MIN_USABLE = 16 usable subcarriers. Legacy baselines without phase fields fall back to amplitude-only behaviour with no change. Co-Authored-By: claude-flow --- scripts/record-baseline.py | 74 ++++++- .../wifi-densepose-sensing-server/src/main.rs | 200 +++++++++++++++++- 2 files changed, 258 insertions(+), 16 deletions(-) diff --git a/scripts/record-baseline.py b/scripts/record-baseline.py index 40ddaf69..39b94cb9 100755 --- a/scripts/record-baseline.py +++ b/scripts/record-baseline.py @@ -23,6 +23,7 @@ full-broadband mean / median / p95 to data/baseline.json. import argparse import asyncio import json +import math import statistics import sys import time @@ -42,8 +43,31 @@ def full_broadband_mean(amps): return (sum(valid) / len(valid)) if valid else 0.0 +def circular_mean_var(phases): + """ADR-104 phase-domain: circular mean (radians) and circular variance + (1 - |R|, in [0, 1]) over a list of unwrapped/atan2 phase samples. + + Variance close to 0 = phases tightly clustered (stable subcarrier, + suitable for baseline-comparison). Close to 1 = phases scattered + (subcarrier is noisy; baseline reference unreliable). 
+ """ + n = len(phases) + if n == 0: + return (0.0, 1.0) + sx = sum(math.sin(p) for p in phases) / n + cx = sum(math.cos(p) for p in phases) / n + r = math.sqrt(sx * sx + cx * cx) + mean = math.atan2(sx, cx) + var = 1.0 - r + return (mean, var) + + async def record(server: str, duration: float, port: int): - by_node: dict[int, list[tuple[float, list[float], float]]] = {} + # Per-node frame log: (t_sec, amps, phases, rssi). + # ADR-104 phase-domain: phases captured alongside amplitudes when the + # WS payload carries `phases` (ADR-106 full complex CSI). Missing or + # empty phase vectors → trim_and_clean writes only amplitude baseline. + by_node: dict[int, list[tuple[float, list[float], list[float], float]]] = {} url = f"ws://{server}:{port}/ws/sensing" start = time.time() print(f"connecting to {url} — recording {duration:.0f}s …", flush=True) @@ -57,14 +81,23 @@ async def record(server: str, duration: float, port: int): a = n.get("amplitude") or [] if not a: continue - by_node.setdefault(n["node_id"], []).append((t, a, n.get("rssi_dbm", 0.0))) + ph = n.get("phases") or [] + by_node.setdefault(n["node_id"], []).append( + (t, a, ph, n.get("rssi_dbm", 0.0)) + ) if time.time() - start >= duration: break return by_node def trim_and_clean(frames, trim_head_sec=15.0, trim_tail_sec=15.0, clean_window_sec=30.0): - """Trim head/tail transients, then scan for the cleanest sub-window.""" + """Trim head/tail transients, then scan for the cleanest sub-window. + + `frames` is a list of (t_sec, amps, phases, rssi). `phases` may be an + empty list when the server hasn't been upgraded to emit them — in + that case the resulting baseline omits the phase-domain fields and + the server falls back to amplitude-only drift (ADR-104 baseline mode). 
+ """ if not frames: return None t0 = frames[0][0] @@ -104,18 +137,39 @@ def trim_and_clean(frames, trim_head_sec=15.0, trim_tail_sec=15.0, clean_window_ # ── Compute per-node stats on the clean window ─────────────── full_means = [full_broadband_mean(a) for _, a, _ in chunk] - rssis = [r for _, _, r in chunk if r != 0] + rssis = [r for _, _, _, r in chunk if r != 0] sorted_full = sorted(full_means) # Per-subcarrier mean across the clean window (for diagnostic + future # subcarrier-level comparison if the server gets that capability). - n_sub = min(len(a) for _, a, _ in chunk) + n_sub = min(len(a) for _, a, _, _ in chunk) per_sub_means = [] for k in range(n_sub): - vs = [a[k] for _, a, _ in chunk if k < len(a) and a[k] > 0] + vs = [a[k] for _, a, _, _ in chunk if k < len(a) and a[k] > 0] per_sub_means.append(statistics.mean(vs) if vs else 0.0) - return { + # ADR-104 phase-domain: per-subcarrier circular mean + variance of the + # captured phase samples. Only included if the WS stream carried + # phases — server tolerates either schema. 
+ have_phases = any(ph for _, _, ph, _ in chunk) + per_sub_phase_means: list[float] = [] + per_sub_phase_vars: list[float] = [] + if have_phases: + n_phase_sub = min( + (len(ph) for _, _, ph, _ in chunk if ph), + default=0, + ) + for k in range(n_phase_sub): + samples = [ph[k] for _, _, ph, _ in chunk if k < len(ph)] + if not samples: + per_sub_phase_means.append(0.0) + per_sub_phase_vars.append(1.0) + continue + mean, var = circular_mean_var(samples) + per_sub_phase_means.append(mean) + per_sub_phase_vars.append(var) + + result = { # Persistent fields the server reads: "full_broadband_mean": statistics.mean(full_means), "full_broadband_p50": sorted_full[len(sorted_full)//2], @@ -132,6 +186,12 @@ def trim_and_clean(frames, trim_head_sec=15.0, trim_tail_sec=15.0, clean_window_ # subcarrier-level comparison without re-recording): "per_subcarrier_mean": [round(v, 3) for v in per_sub_means], } + if per_sub_phase_means: + # Rounding: 4 decimals on mean phase (radian), 3 on variance + # — phase variance is in [0,1] so 3 decimals is plenty. + result["per_subcarrier_phase_mean"] = [round(v, 4) for v in per_sub_phase_means] + result["per_subcarrier_phase_var"] = [round(v, 3) for v in per_sub_phase_vars] + return result def main(): diff --git a/v2/crates/wifi-densepose-sensing-server/src/main.rs b/v2/crates/wifi-densepose-sensing-server/src/main.rs index b453d4d3..44a9cba9 100644 --- a/v2/crates/wifi-densepose-sensing-server/src/main.rs +++ b/v2/crates/wifi-densepose-sensing-server/src/main.rs @@ -413,6 +413,36 @@ fn amp_baseline_per_sub_init() -> &'static Mutex, Vec)>>> = OnceLock::new(); +fn phase_baseline_per_sub_init() + -> &'static Mutex, Vec)>> +{ + PHASE_BASELINE_PER_SUB.get_or_init(|| Mutex::new(std::collections::HashMap::new())) +} + +/// ADR-104 phase-domain: per-node "phase drift" score in `[0, 1]`, +/// updated each tick. 0 = current phases match the baseline; 1 = π +/// rad away (maximally far on the unit circle). 
Computed only when a +/// phase baseline exposes ≥ 16 usable subcarriers (var < threshold). +static PHASE_DRIFT: OnceLock>> = OnceLock::new(); +fn phase_drift_init() -> &'static Mutex> { + PHASE_DRIFT.get_or_init(|| Mutex::new(std::collections::HashMap::new())) +} +/// Discard subcarriers whose baseline phase variance exceeds this. +/// 0.30 corresponds to mean resultant length R ≈ 0.70 — phases were +/// reasonably clustered during baseline capture. Tunable, conservative. +const PHASE_BASELINE_VAR_MAX: f64 = 0.30; +/// Minimum usable subcarriers required to emit a phase drift score. +/// Below this the score is too noisy to trust and we return None. +const PHASE_DRIFT_MIN_USABLE: usize = 16; + /// ADR-104: per-node "spectral drift" score = mean |Δ amp / baseline| /// across subcarriers, computed against AMP_BASELINE_PER_SUB. Updated /// every classifier tick; read by amp_node_level / amp_classify_from_latest @@ -506,6 +536,21 @@ fn load_baseline_file(path: &str) { o.insert(id, vec); } } + // ADR-104 phase-domain: load per-subcarrier circular mean + + // variance vectors. Optional; only present when the recorder + // captured complex CSI (ADR-106). Lengths must match — if they + // don't we drop the phase baseline rather than silently mixing + // bad data into the drift score. 
+ let p_mean = node.get("per_subcarrier_phase_mean").and_then(|v| v.as_array()); + let p_var = node.get("per_subcarrier_phase_var").and_then(|v| v.as_array()); + if let (Some(m), Some(v)) = (p_mean, p_var) { + let means: Vec = m.iter().filter_map(|x| x.as_f64()).collect(); + let vars: Vec = v.iter().filter_map(|x| x.as_f64()).collect(); + if means.len() == vars.len() && means.len() >= 16 { + let mut o = phase_baseline_per_sub_init().lock().unwrap(); + o.insert(id, (means, vars)); + } + } } if loaded.is_empty() { warn!("baseline: {path} parsed but no usable per-node entries"); @@ -533,6 +578,48 @@ fn load_baseline_file(path: &str) { if cv_summary.is_empty() { "no CV normalization".to_string() } else { cv_summary.join(", ") }); } +/// ADR-104 phase-domain: update PHASE_DRIFT for a node from the +/// current per-subcarrier phases. Compares current phase to baseline +/// using circular distance, averaged over subcarriers whose baseline +/// variance does not exceed `PHASE_BASELINE_VAR_MAX` (unstable subcarriers +/// would dominate with noise). Output is normalised to `[0, 1]` +/// where 0 = phases match baseline exactly and 1 = π rad apart. +/// +/// No-op if a phase baseline isn't loaded for this node, or if fewer +/// than `PHASE_DRIFT_MIN_USABLE` subcarriers pass the variance gate. +/// Honesty contract: better to surface no score than a noisy one. +fn phase_drift_update(node_id: u8, phases: &[f64]) { + if phases.is_empty() { + return; + } + let base = phase_baseline_per_sub_init().lock().unwrap(); + let (b_mean, b_var) = match base.get(&node_id) { + Some(t) => (t.0.clone(), t.1.clone()), + None => return, + }; + drop(base); + let n = b_mean.len().min(phases.len()); + if n == 0 { return; } + let mut sum = 0.0_f64; + let mut usable: usize = 0; + for k in 0..n { + if b_var[k] > PHASE_BASELINE_VAR_MAX { continue; } + // Circular distance: wrap Δφ into [-π, π] with + // atan2(sin Δ, cos Δ), take the absolute value and + // normalise by π so each subcarrier contributes [0, 1].
+ let delta = phases[k] - b_mean[k]; + let s = delta.sin(); + let c = delta.cos(); + let d = s.atan2(c).abs() / std::f64::consts::PI; + sum += d; + usable += 1; + } + if usable < PHASE_DRIFT_MIN_USABLE { return; } + let score = (sum / usable as f64).clamp(0.0, 1.0); + let mut m = phase_drift_init().lock().unwrap(); + m.insert(node_id, score); +} + /// Classify motion/presence for one node from the raw amplitude vector. /// /// Returns `(motion_level, presence, confidence)` where confidence is the @@ -1427,6 +1514,15 @@ struct PerNodeFeatureInfo { /// to see the off-axis presence channel firing in real time. #[serde(skip_serializing_if = "Option::is_none")] drift_score: Option, + /// ADR-104 phase-domain drift score in `[0, 1]`. 0 = current + /// per-subcarrier phases match the captured baseline; 1 = phases + /// are π rad apart on every usable subcarrier. `None` when either + /// (a) no per-subcarrier phase baseline is loaded or (b) fewer + /// than `PHASE_DRIFT_MIN_USABLE` subcarriers pass the baseline- + /// variance gate. More sensitive than amplitude drift to sub-mm + /// chest-wall motion (vital signs). + #[serde(skip_serializing_if = "Option::is_none")] + phase_drift_score: Option, } /// Build a per-node feature snapshot for the WebSocket envelope. @@ -1487,6 +1583,12 @@ fn build_node_features( let m = amp_drift_init().lock().unwrap(); m.get(&node_id).copied() }; + // ADR-104 phase-domain drift (None when no phase baseline + // loaded or too few usable subcarriers).
+ let phase_drift_score = { + let m = phase_drift_init().lock().unwrap(); + m.get(&node_id).copied() + }; PerNodeFeatureInfo { node_id, features, @@ -1497,6 +1599,7 @@ fn build_node_features( stale, novelty_score: ns.last_novelty_score, drift_score, + phase_drift_score, } }) .collect(); @@ -2832,6 +2935,9 @@ async fn windows_wifi_task(state: SharedState, tick_ms: u64) { classification.presence = presence; classification.confidence = conf; } + // ADR-104 phase-domain: update phase drift score for this node + // alongside the amplitude classifier. No-op if no phase baseline. + phase_drift_update(frame.node_id, &frame.phases); drop(s_write_pre); // ── Step 5: Build enhanced fields from pipeline result ─────── @@ -5028,7 +5134,10 @@ async fn capture_baseline_to_disk( out_path: &str, ) -> Result { use std::time::{Instant, SystemTime, UNIX_EPOCH}; - let mut by_node: std::collections::HashMap, f64)>> = + // ADR-104 phase-domain: tuple now (t, amps, phases, rssi). Phases + // may be an empty Vec if the WS payload didn't carry them (legacy + // FW or scan path) — emit-time we just skip the phase block. 
+ let mut by_node: std::collections::HashMap, Vec, f64)>> = std::collections::HashMap::new(); // Read off the broadcast channel directly via subscribing to a WS @@ -5061,8 +5170,12 @@ async fn capture_baseline_to_disk( .map(|a| a.iter().filter_map(|x| x.as_f64()).collect()) .unwrap_or_default(); if amps.is_empty() { continue; } + let phases: Vec = n.get("phases") + .and_then(|v| v.as_array()) + .map(|a| a.iter().filter_map(|x| x.as_f64()).collect()) + .unwrap_or_default(); let rssi = n.get("rssi_dbm").and_then(|v| v.as_f64()).unwrap_or(0.0); - by_node.entry(nid).or_default().push((t, amps, rssi)); + by_node.entry(nid).or_default().push((t, amps, phases, rssi)); } } } @@ -5084,7 +5197,7 @@ async fn capture_baseline_to_disk( let (head, tail) = if dur < trim_sec * 2.0 + clean_window_sec / 2.0 { (dur / 6.0, dur / 6.0) } else { (trim_sec, trim_sec) }; - let trimmed: Vec<&(f64, Vec, f64)> = frames.iter() + let trimmed: Vec<&(f64, Vec, Vec, f64)> = frames.iter() .filter(|f| f.0 >= t0 + head && f.0 <= t1 - tail).collect(); if trimmed.is_empty() { continue; } @@ -5095,14 +5208,14 @@ async fn capture_baseline_to_disk( // Scan windows for lowest-CV chunk. 
let win = clean_window_sec; - let chunk: Vec<&&(f64, Vec, f64)> = if trimmed.last().unwrap().0 - trimmed.first().unwrap().0 <= win { + let chunk: Vec<&&(f64, Vec, Vec, f64)> = if trimmed.last().unwrap().0 - trimmed.first().unwrap().0 <= win { trimmed.iter().collect() } else { - let mut best: Option<(f64, Vec<&&(f64, Vec, f64)>)> = None; + let mut best: Option<(f64, Vec<&&(f64, Vec, Vec, f64)>)> = None; let step = 5.0; let mut cursor = trimmed.first().unwrap().0; while cursor + win <= trimmed.last().unwrap().0 { - let w: Vec<&&(f64, Vec, f64)> = trimmed.iter() + let w: Vec<&&(f64, Vec, Vec, f64)> = trimmed.iter() .filter(|f| f.0 >= cursor && f.0 <= cursor + win).collect(); if w.len() >= 5 { let bms: Vec = w.iter().map(|f| full_mean(&f.1)).collect(); @@ -5129,10 +5242,67 @@ async fn capture_baseline_to_disk( sorted_bms.sort_by(|a,b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)); let p50 = sorted_bms[sorted_bms.len() / 2]; let p95 = sorted_bms[(sorted_bms.len() as f64 * 0.95) as usize]; - let rssis: Vec = chunk.iter().map(|f| f.2).filter(|x| *x != 0.0).collect(); + let rssis: Vec = chunk.iter().map(|f| f.3).filter(|x| *x != 0.0).collect(); let rssi_mean = if rssis.is_empty() { 0.0 } else { rssis.iter().sum::() / rssis.len() as f64 }; - nodes_out.insert(nid.to_string(), serde_json::json!({ + // ADR-104: per-subcarrier amplitude mean for the off-axis drift + // channel. Match the recording-script schema exactly. + let n_sub = chunk.iter().map(|f| f.1.len()).min().unwrap_or(0); + let mut per_sub_means: Vec = Vec::with_capacity(n_sub); + for k in 0..n_sub { + let mut vals: Vec = Vec::with_capacity(chunk.len()); + for f in &chunk { + if let Some(&v) = f.1.get(k) { + if v > 0.0 { vals.push(v); } + } + } + let m = if vals.is_empty() { 0.0 } else { + (vals.iter().sum::() / vals.len() as f64 * 1000.0).round() / 1000.0 + }; + per_sub_means.push(m); + } + + // ADR-104 phase-domain: per-subcarrier circular mean + variance. 
+ // Only emit if any phase samples were captured (older FW / wifi + // scan paths send no phases). Variance is in [0, 1]; values + // close to 0 = stable subcarrier (reliable baseline reference); + // values close to 1 = noisy (server uses var as a usability gate). + let have_phase = chunk.iter().any(|f| !f.2.is_empty()); + let mut per_sub_phase_mean: Vec = Vec::new(); + let mut per_sub_phase_var: Vec = Vec::new(); + if have_phase { + let n_phase_sub = chunk.iter() + .filter(|f| !f.2.is_empty()) + .map(|f| f.2.len()) + .min() + .unwrap_or(0); + for k in 0..n_phase_sub { + let mut sx = 0.0_f64; + let mut cx = 0.0_f64; + let mut count: usize = 0; + for f in &chunk { + if let Some(&p) = f.2.get(k) { + sx += p.sin(); + cx += p.cos(); + count += 1; + } + } + if count == 0 { + per_sub_phase_mean.push(0.0); + per_sub_phase_var.push(1.0); + continue; + } + let n = count as f64; + let sx = sx / n; let cx = cx / n; + let r = (sx * sx + cx * cx).sqrt(); + let m = ((sx.atan2(cx)) * 10000.0).round() / 10000.0; + let v = ((1.0 - r) * 1000.0).round() / 1000.0; + per_sub_phase_mean.push(m); + per_sub_phase_var.push(v); + } + } + + let mut node_obj = serde_json::json!({ "full_broadband_mean": mean, "full_broadband_p50": p50, "full_broadband_p95": p95, @@ -5140,7 +5310,13 @@ async fn capture_baseline_to_disk( "full_broadband_cv_pct": cv * 100.0, "rssi_dbm": rssi_mean, "n_samples": chunk.len(), - })); + "per_subcarrier_mean": per_sub_means, + }); + if !per_sub_phase_mean.is_empty() { + node_obj["per_subcarrier_phase_mean"] = serde_json::json!(per_sub_phase_mean); + node_obj["per_subcarrier_phase_var"] = serde_json::json!(per_sub_phase_var); + } + nodes_out.insert(nid.to_string(), node_obj); } if nodes_out.is_empty() { @@ -5807,6 +5983,12 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) { classification.presence = presence; classification.confidence = conf; } + // ADR-104 phase-domain: update phase drift if a + // phase baseline is loaded and the latest frame + // 
carried phases. + if let Some(ph) = ns.latest_phases.as_ref() { + phase_drift_update(node_id, ph); + } ns.rssi_history.push_back(features.mean_rssi); if ns.rssi_history.len() > 60 {