wifi-densepose/scripts/generate-replay-fixtures.py

109 lines
4.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""ADR-114: generate 1000 idle + 1000 motion CSI replay fixtures.
Two files are written under
`v2/crates/wifi-densepose-sensing-server/tests/fixtures/`:
* `replay_idle.jsonl` — 1000 frames of empty-room baseline +
per-frame Gaussian noise (low CV).
* `replay_motion.jsonl` — 1000 frames of the same baseline + 1.5 Hz
coherent modulation + per-frame Gaussian
noise (high CV).
Format: one JSON object per line:
{"node_id": <u8>, "amplitude": [<f64>; 56]}
These are *synthetic but parameter-matched to live data* (baseline
mean = 27.04 / 14.72 from data/baseline.json, CV ≈ 2.6 / 3.6 %).
They exist to provide deterministic regression coverage of the
amp_presence_override classifier. Real captured-from-sensor fixtures
can replace them in-place (same filename, same line format) without
changing the test code.
Deterministic by seed so the test result is reproducible across
machines. Re-run only when you want to regenerate.
"""
from __future__ import annotations
import json
import math
import random
from pathlib import Path
OUT_DIR = (
Path(__file__).resolve().parent.parent
/ "v2"
/ "crates"
/ "wifi-densepose-sensing-server"
/ "tests"
/ "fixtures"
)
# Per-node baseline mean amplitude pulled from a real recording of
# this deployment (data/baseline.json). Holding them in code keeps
# the fixture script self-contained.
NODE_BASELINES = {1: 27.04, 2: 14.72}
N_SUB = 56
FRAMES_PER_NODE = 500 # 500 × 2 nodes = 1000 per fixture file
def gen_subcarrier_profile(rng: random.Random, mean: float) -> list[float]:
"""Static per-subcarrier mean profile — same for the whole capture."""
return [max(1.0, mean * rng.uniform(0.7, 1.3)) for _ in range(N_SUB)]
def write_fixture(path: Path, motion: bool, seed: int) -> int:
rng = random.Random(seed)
profiles = {
nid: gen_subcarrier_profile(rng, mean) for nid, mean in NODE_BASELINES.items()
}
count = 0
with path.open("w") as f:
# Interleave nodes round-robin so the test driver gets per-node
# streams of the same length, like a real WS feed.
for i in range(FRAMES_PER_NODE):
for nid, profile in profiles.items():
t = i / 20.0 # 20 Hz tick
# AMP_SHORT_WIN in the server is 90 frames = 4.5 s.
# Idle: small per-frame noise → rolling-window CV stays
# well below the universal threshold.
# Motion: a slow ~0.15 Hz coherent envelope (6.7 s cycle,
# longer than the 4.5 s averaging window) drives the
# broadband mean up/down by ±40 %, producing a high
# rolling CV. Mimics body position changes during
# walking — the channel response shifts slowly relative
# to the classifier window.
if motion:
envelope = 1.0 + 0.40 * math.sin(2 * math.pi * 0.15 * t)
else:
envelope = 1.0
amps: list[float] = []
for mu in profile:
noise_sigma = mu * (0.05 if motion else 0.018)
n = rng.gauss(0.0, noise_sigma)
amps.append(round(mu * envelope + n, 3))
f.write(json.dumps({"node_id": nid, "amplitude": amps}) + "\n")
count += 1
return count
def main() -> None:
OUT_DIR.mkdir(parents=True, exist_ok=True)
idle_path = OUT_DIR / "replay_idle.jsonl"
motion_path = OUT_DIR / "replay_motion.jsonl"
n_idle = write_fixture(idle_path, motion=False, seed=42)
n_motion = write_fixture(motion_path, motion=True, seed=43)
print(f"wrote {n_idle} idle frames → {idle_path}")
print(f"wrote {n_motion} motion frames → {motion_path}")
print()
print("These fixtures are SYNTHETIC parameter-matched to live data —")
print("the cargo test that consumes them measures classifier")
print("consistency, not real-world accuracy. Replace with live")
print("captures (same line format, same filenames) when operator")
print("time allows for a true empty-vs-walking ground-truth pair.")
if __name__ == "__main__":
main()