feat(adr-104): phase-domain drift channel (script + server)

scripts/record-baseline.py and capture_baseline_to_disk now
compute per-subcarrier circular mean + variance of phases when the
WS stream carries them (ADR-106). Saved as per_subcarrier_phase_mean
+ per_subcarrier_phase_var in baseline.json.

Server loads them into PHASE_BASELINE_PER_SUB; phase_drift_update
computes a per-tick score (mean circular distance / π over
subcarriers with baseline variance < 0.30) and stores it in
PHASE_DRIFT. Surfaces as PerNodeFeatureInfo.phase_drift_score
(skip-if-none). Honesty contract: emits None below
PHASE_DRIFT_MIN_USABLE = 16 usable subcarriers.

Legacy baselines without phase fields fall back to amplitude-only
behaviour with no change.

Co-Authored-By: claude-flow <ruv@ruv.net>
This commit is contained in:
arsen 2026-05-17 16:44:21 +07:00
parent 5a79127780
commit 47dafab42d
2 changed files with 258 additions and 16 deletions

View File

@ -23,6 +23,7 @@ full-broadband mean / median / p95 to data/baseline.json.
import argparse
import asyncio
import json
import math
import statistics
import sys
import time
@ -42,8 +43,31 @@ def full_broadband_mean(amps):
return (sum(valid) / len(valid)) if valid else 0.0
def circular_mean_var(phases):
"""ADR-104 phase-domain: circular mean (radians) and circular variance
(1 - |R|, in [0, 1]) over a list of unwrapped/atan2 phase samples.
Variance close to 0 = phases tightly clustered (stable subcarrier,
suitable for baseline-comparison). Close to 1 = phases scattered
(subcarrier is noisy; baseline reference unreliable).
"""
n = len(phases)
if n == 0:
return (0.0, 1.0)
sx = sum(math.sin(p) for p in phases) / n
cx = sum(math.cos(p) for p in phases) / n
r = math.sqrt(sx * sx + cx * cx)
mean = math.atan2(sx, cx)
var = 1.0 - r
return (mean, var)
async def record(server: str, duration: float, port: int):
by_node: dict[int, list[tuple[float, list[float], float]]] = {}
# Per-node frame log: (t_sec, amps, phases, rssi).
# ADR-104 phase-domain: phases captured alongside amplitudes when the
# WS payload carries `phases` (ADR-106 full complex CSI). Missing or
# empty phase vectors → trim_and_clean writes only amplitude baseline.
by_node: dict[int, list[tuple[float, list[float], list[float], float]]] = {}
url = f"ws://{server}:{port}/ws/sensing"
start = time.time()
print(f"connecting to {url} — recording {duration:.0f}s …", flush=True)
@ -57,14 +81,23 @@ async def record(server: str, duration: float, port: int):
a = n.get("amplitude") or []
if not a:
continue
by_node.setdefault(n["node_id"], []).append((t, a, n.get("rssi_dbm", 0.0)))
ph = n.get("phases") or []
by_node.setdefault(n["node_id"], []).append(
(t, a, ph, n.get("rssi_dbm", 0.0))
)
if time.time() - start >= duration:
break
return by_node
def trim_and_clean(frames, trim_head_sec=15.0, trim_tail_sec=15.0, clean_window_sec=30.0):
"""Trim head/tail transients, then scan for the cleanest sub-window."""
"""Trim head/tail transients, then scan for the cleanest sub-window.
`frames` is a list of (t_sec, amps, phases, rssi). `phases` may be an
empty list when the server hasn't been upgraded to emit them — in
that case the resulting baseline omits the phase-domain fields and
the server falls back to amplitude-only drift (ADR-104 baseline mode).
"""
if not frames:
return None
t0 = frames[0][0]
@ -104,18 +137,39 @@ def trim_and_clean(frames, trim_head_sec=15.0, trim_tail_sec=15.0, clean_window_
# ── Compute per-node stats on the clean window ───────────────
full_means = [full_broadband_mean(a) for _, a, _ in chunk]
rssis = [r for _, _, r in chunk if r != 0]
rssis = [r for _, _, _, r in chunk if r != 0]
sorted_full = sorted(full_means)
# Per-subcarrier mean across the clean window (for diagnostic + future
# subcarrier-level comparison if the server gets that capability).
n_sub = min(len(a) for _, a, _ in chunk)
n_sub = min(len(a) for _, a, _, _ in chunk)
per_sub_means = []
for k in range(n_sub):
vs = [a[k] for _, a, _ in chunk if k < len(a) and a[k] > 0]
vs = [a[k] for _, a, _, _ in chunk if k < len(a) and a[k] > 0]
per_sub_means.append(statistics.mean(vs) if vs else 0.0)
return {
# ADR-104 phase-domain: per-subcarrier circular mean + variance of the
# captured phase samples. Only included if the WS stream carried
# phases — server tolerates either schema.
have_phases = any(ph for _, _, ph, _ in chunk)
per_sub_phase_means: list[float] = []
per_sub_phase_vars: list[float] = []
if have_phases:
n_phase_sub = min(
(len(ph) for _, _, ph, _ in chunk if ph),
default=0,
)
for k in range(n_phase_sub):
samples = [ph[k] for _, _, ph, _ in chunk if k < len(ph)]
if not samples:
per_sub_phase_means.append(0.0)
per_sub_phase_vars.append(1.0)
continue
mean, var = circular_mean_var(samples)
per_sub_phase_means.append(mean)
per_sub_phase_vars.append(var)
result = {
# Persistent fields the server reads:
"full_broadband_mean": statistics.mean(full_means),
"full_broadband_p50": sorted_full[len(sorted_full)//2],
@ -132,6 +186,12 @@ def trim_and_clean(frames, trim_head_sec=15.0, trim_tail_sec=15.0, clean_window_
# subcarrier-level comparison without re-recording):
"per_subcarrier_mean": [round(v, 3) for v in per_sub_means],
}
if per_sub_phase_means:
# Rounding: 4 decimals on mean phase (radian), 3 on variance
# — phase variance is in [0,1] so 3 decimals is plenty.
result["per_subcarrier_phase_mean"] = [round(v, 4) for v in per_sub_phase_means]
result["per_subcarrier_phase_var"] = [round(v, 3) for v in per_sub_phase_vars]
return result
def main():

View File

@ -413,6 +413,36 @@ fn amp_baseline_per_sub_init() -> &'static Mutex<std::collections::HashMap<u8, V
AMP_BASELINE_PER_SUB.get_or_init(|| Mutex::new(std::collections::HashMap::new()))
}
/// ADR-104 phase-domain: per-node `(phase_mean_rad, phase_var)` vectors
/// loaded from baseline.json `per_subcarrier_phase_mean` +
/// `per_subcarrier_phase_var`. Optional — present only when the
/// recorder captured complex CSI. A high `var` (close to 1.0) on a
/// subcarrier means the baseline phase was unstable across the
/// recording window, so that subcarrier's per-tick phase delta is
/// unreliable and the server discards it from the phase drift score.
static PHASE_BASELINE_PER_SUB: OnceLock<Mutex<std::collections::HashMap<u8, (Vec<f64>, Vec<f64>)>>> = OnceLock::new();
fn phase_baseline_per_sub_init()
-> &'static Mutex<std::collections::HashMap<u8, (Vec<f64>, Vec<f64>)>>
{
PHASE_BASELINE_PER_SUB.get_or_init(|| Mutex::new(std::collections::HashMap::new()))
}
/// ADR-104 phase-domain: per-node "phase drift" score in `[0, 1]`,
/// updated each tick. 0 = current phases match the baseline; 1 = π
/// rad away (maximally far on the unit circle). Computed only when a
/// phase baseline exposes ≥ 16 usable subcarriers (var < threshold).
static PHASE_DRIFT: OnceLock<Mutex<std::collections::HashMap<u8, f64>>> = OnceLock::new();
fn phase_drift_init() -> &'static Mutex<std::collections::HashMap<u8, f64>> {
PHASE_DRIFT.get_or_init(|| Mutex::new(std::collections::HashMap::new()))
}
/// Discard subcarriers whose baseline phase variance exceeds this.
/// 0.30 corresponds to mean resultant length R ≈ 0.70 — phases were
/// reasonably clustered during baseline capture. Tunable, conservative.
const PHASE_BASELINE_VAR_MAX: f64 = 0.30;
/// Minimum usable subcarriers required to emit a phase drift score.
/// Below this the score is too noisy to trust and we return None.
const PHASE_DRIFT_MIN_USABLE: usize = 16;
/// ADR-104: per-node "spectral drift" score = mean |Δ amp / baseline|
/// across subcarriers, computed against AMP_BASELINE_PER_SUB. Updated
/// every classifier tick; read by amp_node_level / amp_classify_from_latest
@ -506,6 +536,21 @@ fn load_baseline_file(path: &str) {
o.insert(id, vec);
}
}
// ADR-104 phase-domain: load per-subcarrier circular mean +
// variance vectors. Optional; only present when the recorder
// captured complex CSI (ADR-106). Lengths must match — if they
// don't we drop the phase baseline rather than silently mixing
// bad data into the drift score.
let p_mean = node.get("per_subcarrier_phase_mean").and_then(|v| v.as_array());
let p_var = node.get("per_subcarrier_phase_var").and_then(|v| v.as_array());
if let (Some(m), Some(v)) = (p_mean, p_var) {
let means: Vec<f64> = m.iter().filter_map(|x| x.as_f64()).collect();
let vars: Vec<f64> = v.iter().filter_map(|x| x.as_f64()).collect();
if means.len() == vars.len() && means.len() >= 16 {
let mut o = phase_baseline_per_sub_init().lock().unwrap();
o.insert(id, (means, vars));
}
}
}
if loaded.is_empty() {
warn!("baseline: {path} parsed but no usable per-node entries");
@ -533,6 +578,48 @@ fn load_baseline_file(path: &str) {
if cv_summary.is_empty() { "no CV normalization".to_string() } else { cv_summary.join(", ") });
}
/// ADR-104 phase-domain: update PHASE_DRIFT for a node from the
/// current per-subcarrier phases. Compares current phase to baseline
/// using circular distance, averaged over subcarriers whose baseline
/// variance is below `PHASE_BASELINE_VAR_MAX` (unstable subcarriers
/// would dominate with noise). Output is normalised to `[0, 1]`
/// where 0 = phases match baseline exactly and 1 = π rad apart.
///
/// No-op if a phase baseline isn't loaded for this node, or if fewer
/// than `PHASE_DRIFT_MIN_USABLE` subcarriers pass the variance gate.
/// Honesty contract: better to surface no score than a noisy one.
fn phase_drift_update(node_id: u8, phases: &[f64]) {
if phases.is_empty() {
return;
}
let base = phase_baseline_per_sub_init().lock().unwrap();
let (b_mean, b_var) = match base.get(&node_id) {
Some(t) => (t.0.clone(), t.1.clone()),
None => return,
};
drop(base);
let n = b_mean.len().min(phases.len());
if n == 0 { return; }
let mut sum = 0.0_f64;
let mut usable: usize = 0;
for k in 0..n {
if b_var[k] > PHASE_BASELINE_VAR_MAX { continue; }
// Circular distance via the imaginary part of e^(i Δφ),
// taken |.| and normalised by π. Equivalent to
// |atan2(sin Δ, cos Δ)| / π but cheaper.
let delta = phases[k] - b_mean[k];
let s = delta.sin();
let c = delta.cos();
let d = s.atan2(c).abs() / std::f64::consts::PI;
sum += d;
usable += 1;
}
if usable < PHASE_DRIFT_MIN_USABLE { return; }
let score = (sum / usable as f64).clamp(0.0, 1.0);
let mut m = phase_drift_init().lock().unwrap();
m.insert(node_id, score);
}
/// Classify motion/presence for one node from the raw amplitude vector.
///
/// Returns `(motion_level, presence, confidence)` where confidence is the
@ -1427,6 +1514,15 @@ struct PerNodeFeatureInfo {
/// to see the off-axis presence channel firing in real time.
#[serde(skip_serializing_if = "Option::is_none")]
drift_score: Option<f64>,
/// ADR-104 phase-domain drift score in `[0, 1]`. 0 = current
/// per-subcarrier phases match the captured baseline; 1 = phases
/// are π rad apart on every usable subcarrier. `None` until either
/// (a) no per-subcarrier phase baseline is loaded or (b) fewer
/// than `PHASE_DRIFT_MIN_USABLE` subcarriers pass the baseline-
/// variance gate. More sensitive than amplitude drift to sub-mm
/// chest-wall motion (vital signs).
#[serde(skip_serializing_if = "Option::is_none")]
phase_drift_score: Option<f64>,
}
/// Build a per-node feature snapshot for the WebSocket envelope.
@ -1487,6 +1583,12 @@ fn build_node_features(
let m = amp_drift_init().lock().unwrap();
m.get(&node_id).copied()
};
// ADR-104 phase-domain drift (None when no phase baseline
// loaded or too few usable subcarriers).
let phase_drift_score = {
let m = phase_drift_init().lock().unwrap();
m.get(&node_id).copied()
};
PerNodeFeatureInfo {
node_id,
features,
@ -1497,6 +1599,7 @@ fn build_node_features(
stale,
novelty_score: ns.last_novelty_score,
drift_score,
phase_drift_score,
}
})
.collect();
@ -2832,6 +2935,9 @@ async fn windows_wifi_task(state: SharedState, tick_ms: u64) {
classification.presence = presence;
classification.confidence = conf;
}
// ADR-104 phase-domain: update phase drift score for this node
// alongside the amplitude classifier. No-op if no phase baseline.
phase_drift_update(frame.node_id, &frame.phases);
drop(s_write_pre);
// ── Step 5: Build enhanced fields from pipeline result ───────
@ -5028,7 +5134,10 @@ async fn capture_baseline_to_disk(
out_path: &str,
) -> Result<serde_json::Value, String> {
use std::time::{Instant, SystemTime, UNIX_EPOCH};
let mut by_node: std::collections::HashMap<u8, Vec<(f64, Vec<f64>, f64)>> =
// ADR-104 phase-domain: tuple now (t, amps, phases, rssi). Phases
// may be an empty Vec if the WS payload didn't carry them (legacy
// FW or scan path) — emit-time we just skip the phase block.
let mut by_node: std::collections::HashMap<u8, Vec<(f64, Vec<f64>, Vec<f64>, f64)>> =
std::collections::HashMap::new();
// Read off the broadcast channel directly via subscribing to a WS
@ -5061,8 +5170,12 @@ async fn capture_baseline_to_disk(
.map(|a| a.iter().filter_map(|x| x.as_f64()).collect())
.unwrap_or_default();
if amps.is_empty() { continue; }
let phases: Vec<f64> = n.get("phases")
.and_then(|v| v.as_array())
.map(|a| a.iter().filter_map(|x| x.as_f64()).collect())
.unwrap_or_default();
let rssi = n.get("rssi_dbm").and_then(|v| v.as_f64()).unwrap_or(0.0);
by_node.entry(nid).or_default().push((t, amps, rssi));
by_node.entry(nid).or_default().push((t, amps, phases, rssi));
}
}
}
@ -5084,7 +5197,7 @@ async fn capture_baseline_to_disk(
let (head, tail) = if dur < trim_sec * 2.0 + clean_window_sec / 2.0 {
(dur / 6.0, dur / 6.0)
} else { (trim_sec, trim_sec) };
let trimmed: Vec<&(f64, Vec<f64>, f64)> = frames.iter()
let trimmed: Vec<&(f64, Vec<f64>, Vec<f64>, f64)> = frames.iter()
.filter(|f| f.0 >= t0 + head && f.0 <= t1 - tail).collect();
if trimmed.is_empty() { continue; }
@ -5095,14 +5208,14 @@ async fn capture_baseline_to_disk(
// Scan windows for lowest-CV chunk.
let win = clean_window_sec;
let chunk: Vec<&&(f64, Vec<f64>, f64)> = if trimmed.last().unwrap().0 - trimmed.first().unwrap().0 <= win {
let chunk: Vec<&&(f64, Vec<f64>, Vec<f64>, f64)> = if trimmed.last().unwrap().0 - trimmed.first().unwrap().0 <= win {
trimmed.iter().collect()
} else {
let mut best: Option<(f64, Vec<&&(f64, Vec<f64>, f64)>)> = None;
let mut best: Option<(f64, Vec<&&(f64, Vec<f64>, Vec<f64>, f64)>)> = None;
let step = 5.0;
let mut cursor = trimmed.first().unwrap().0;
while cursor + win <= trimmed.last().unwrap().0 {
let w: Vec<&&(f64, Vec<f64>, f64)> = trimmed.iter()
let w: Vec<&&(f64, Vec<f64>, Vec<f64>, f64)> = trimmed.iter()
.filter(|f| f.0 >= cursor && f.0 <= cursor + win).collect();
if w.len() >= 5 {
let bms: Vec<f64> = w.iter().map(|f| full_mean(&f.1)).collect();
@ -5129,10 +5242,67 @@ async fn capture_baseline_to_disk(
sorted_bms.sort_by(|a,b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let p50 = sorted_bms[sorted_bms.len() / 2];
let p95 = sorted_bms[(sorted_bms.len() as f64 * 0.95) as usize];
let rssis: Vec<f64> = chunk.iter().map(|f| f.2).filter(|x| *x != 0.0).collect();
let rssis: Vec<f64> = chunk.iter().map(|f| f.3).filter(|x| *x != 0.0).collect();
let rssi_mean = if rssis.is_empty() { 0.0 } else { rssis.iter().sum::<f64>() / rssis.len() as f64 };
nodes_out.insert(nid.to_string(), serde_json::json!({
// ADR-104: per-subcarrier amplitude mean for the off-axis drift
// channel. Match the recording-script schema exactly.
let n_sub = chunk.iter().map(|f| f.1.len()).min().unwrap_or(0);
let mut per_sub_means: Vec<f64> = Vec::with_capacity(n_sub);
for k in 0..n_sub {
let mut vals: Vec<f64> = Vec::with_capacity(chunk.len());
for f in &chunk {
if let Some(&v) = f.1.get(k) {
if v > 0.0 { vals.push(v); }
}
}
let m = if vals.is_empty() { 0.0 } else {
(vals.iter().sum::<f64>() / vals.len() as f64 * 1000.0).round() / 1000.0
};
per_sub_means.push(m);
}
// ADR-104 phase-domain: per-subcarrier circular mean + variance.
// Only emit if any phase samples were captured (older FW / wifi
// scan paths send no phases). Variance is in [0, 1]; values
// close to 0 = stable subcarrier (reliable baseline reference);
// values close to 1 = noisy (server uses var as a usability gate).
let have_phase = chunk.iter().any(|f| !f.2.is_empty());
let mut per_sub_phase_mean: Vec<f64> = Vec::new();
let mut per_sub_phase_var: Vec<f64> = Vec::new();
if have_phase {
let n_phase_sub = chunk.iter()
.filter(|f| !f.2.is_empty())
.map(|f| f.2.len())
.min()
.unwrap_or(0);
for k in 0..n_phase_sub {
let mut sx = 0.0_f64;
let mut cx = 0.0_f64;
let mut count: usize = 0;
for f in &chunk {
if let Some(&p) = f.2.get(k) {
sx += p.sin();
cx += p.cos();
count += 1;
}
}
if count == 0 {
per_sub_phase_mean.push(0.0);
per_sub_phase_var.push(1.0);
continue;
}
let n = count as f64;
let sx = sx / n; let cx = cx / n;
let r = (sx * sx + cx * cx).sqrt();
let m = ((sx.atan2(cx)) * 10000.0).round() / 10000.0;
let v = ((1.0 - r) * 1000.0).round() / 1000.0;
per_sub_phase_mean.push(m);
per_sub_phase_var.push(v);
}
}
let mut node_obj = serde_json::json!({
"full_broadband_mean": mean,
"full_broadband_p50": p50,
"full_broadband_p95": p95,
@ -5140,7 +5310,13 @@ async fn capture_baseline_to_disk(
"full_broadband_cv_pct": cv * 100.0,
"rssi_dbm": rssi_mean,
"n_samples": chunk.len(),
}));
"per_subcarrier_mean": per_sub_means,
});
if !per_sub_phase_mean.is_empty() {
node_obj["per_subcarrier_phase_mean"] = serde_json::json!(per_sub_phase_mean);
node_obj["per_subcarrier_phase_var"] = serde_json::json!(per_sub_phase_var);
}
nodes_out.insert(nid.to_string(), node_obj);
}
if nodes_out.is_empty() {
@ -5807,6 +5983,12 @@ async fn udp_receiver_task(state: SharedState, udp_port: u16) {
classification.presence = presence;
classification.confidence = conf;
}
// ADR-104 phase-domain: update phase drift if a
// phase baseline is loaded and the latest frame
// carried phases.
if let Some(ph) = ns.latest_phases.as_ref() {
phase_drift_update(node_id, ph);
}
ns.rssi_history.push_back(features.mean_rssi);
if ns.rssi_history.len() > 60 {