fix(align): stream JSONL + support sensing_update format (#641)

Two blockers discovered while running ADR-079 P7→P8 end-to-end against
a 30-minute paired session (39,088 GT frames + 45,625 CSI frames):

1. `readFileSync(_, 'utf8').split('\n')` hit Node's `String.MaxLength`
   (~512 MB) on the 750 MB CSI recording. Result:
       Error: Cannot create a string longer than 0x1fffffe8 characters
   Replaced loadJsonl with a 1 MiB byte-buffer streaming reader that
   decodes line-by-line, so memory use stays bounded by the largest
   single record.

2. The sensing-server has long since switched from the legacy `raw_csi`
   / `feature` typed records to a single `sensing_update` record per
   tick (with nodes[].amplitude and top-level features). The aligner
   filtered on the old types and produced 0 frames every time. Added a
   `sensing_update` branch that projects each tick into rawCsi/features
   entries the existing windowing code can consume, and updated
   extractCsiMatrix to use already-extracted amplitudes when iqHex is
   absent. timestamp is now accepted as either ISO string (legacy) or
   numeric float-seconds (current).

End-to-end verified: produces 1,077 paired samples at
`--min-confidence 0.3 --window-frames 20` from the full 30-min
recording; downstream `train-wiflow-supervised.js` runs to completion.
See follow-up #640 for the PCK gap (data + GPU needed) — those are
training concerns, not aligner concerns.
This commit is contained in:
rUv 2026-05-19 14:51:03 -04:00 committed by GitHub
parent ad15f1b049
commit ef20a7280d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 70 additions and 11 deletions

View File

@ -136,18 +136,42 @@ function extractAmplitude(iqBytes, nSubcarriers) {
/**
* Load and parse a JSONL file, skipping blank/malformed lines.
*
* Reads byte-by-byte into Buffer slices to avoid Node's
* `String.MaxLength` (~512 MB) cap that `readFileSync(_, 'utf8')` hits
* on 30-min CSI recordings. Each line is decoded individually, so
* memory use stays bounded by the largest single record.
*/
function loadJsonl(filePath) {
const lines = fs.readFileSync(filePath, 'utf8').split('\n');
const records = [];
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed) continue;
try {
records.push(JSON.parse(trimmed));
} catch {
// skip malformed lines
const fd = fs.openSync(filePath, 'r');
try {
const bufSize = 1 << 20; // 1 MiB
const buf = Buffer.alloc(bufSize);
let leftover = '';
let bytesRead;
do {
bytesRead = fs.readSync(fd, buf, 0, bufSize, null);
if (bytesRead > 0) {
const chunk = leftover + buf.toString('utf8', 0, bytesRead);
const lines = chunk.split('\n');
leftover = lines.pop(); // last fragment may be incomplete
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed) continue;
try {
records.push(JSON.parse(trimmed));
} catch {
// skip malformed lines
}
}
}
} while (bytesRead === bufSize);
if (leftover.trim()) {
try { records.push(JSON.parse(leftover.trim())); } catch {}
}
} finally {
fs.closeSync(fd);
}
return records;
}
@ -184,8 +208,12 @@ function loadCsi(filePath) {
const features = [];
for (const r of raw) {
if (!r.timestamp) continue;
const tsMs = isoToMs(r.timestamp);
if (r.timestamp == null) continue;
// Two timestamp formats: ISO string (legacy raw_csi/feature) or
// numeric float-seconds (current sensing_update from the Rust server).
const tsMs = typeof r.timestamp === 'number'
? r.timestamp * 1000
: isoToMs(r.timestamp);
if (isNaN(tsMs)) continue;
if (r.type === 'raw_csi') {
@ -205,6 +233,33 @@ function loadCsi(filePath) {
rssi: r.rssi,
seq: r.seq,
});
} else if (r.type === 'sensing_update') {
// Current sensing-server schema: one record per tick contains
// already-extracted amplitudes per node plus a server-computed
// feature vector. Project each into rawCsi/features so downstream
// windowing/matrix extraction can reuse its existing paths.
if (Array.isArray(r.nodes)) {
for (const node of r.nodes) {
if (!Array.isArray(node.amplitude) || node.amplitude.length === 0) continue;
rawCsi.push({
tsMs,
nodeId: node.node_id,
subcarriers: node.amplitude.length,
amplitude: node.amplitude, // pre-extracted, no iq_hex needed
rssi: node.rssi_dbm,
seq: r.tick,
});
}
}
if (Array.isArray(r.features) && r.features.length > 0) {
features.push({
tsMs,
nodeId: 0,
features: r.features,
rssi: null,
seq: r.tick,
});
}
}
}
@ -297,7 +352,11 @@ function extractCsiMatrix(window) {
for (let f = 0; f < nFrames; f++) {
const frame = window[f];
if (frame.iqHex) {
if (frame.amplitude && frame.amplitude.length > 0) {
// Already-extracted amplitudes from sensing_update — copy directly.
const n = Math.min(nSc, frame.amplitude.length);
for (let s = 0; s < n; s++) matrix[f * nSc + s] = frame.amplitude[s];
} else if (frame.iqHex) {
const iq = parseIqHex(frame.iqHex);
const amp = extractAmplitude(iq, nSc);
matrix.set(amp, f * nSc);