From ef20a7280dbfb8f94e04630aab4992d9a4b91815 Mon Sep 17 00:00:00 2001 From: rUv Date: Tue, 19 May 2026 14:51:03 -0400 Subject: [PATCH] fix(align): stream JSONL + support sensing_update format (#641) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two blockers discovered while running ADR-079 P7→P8 end-to-end against a 30-minute paired session (39,088 GT frames + 45,625 CSI frames): 1. `readFileSync(_, 'utf8').split('\n')` hit Node's `String.MaxLength` (~512 MB) on the 750 MB CSI recording. Result: Error: Cannot create a string longer than 0x1fffffe8 characters Replaced loadJsonl with a 1 MiB byte-buffer streaming reader that decodes line-by-line, so memory use stays bounded by the largest single record. 2. The sensing-server has long since switched from the legacy `raw_csi` / `feature` typed records to a single `sensing_update` record per tick (with nodes[].amplitude and top-level features). The aligner filtered on the old types and produced 0 frames every time. Added a `sensing_update` branch that projects each tick into rawCsi/features entries the existing windowing code can consume, and updated extractCsiMatrix to use already-extracted amplitudes when iqHex is absent. timestamp is now accepted as either ISO string (legacy) or numeric float-seconds (current). End-to-end verified: produces 1,077 paired samples at `--min-confidence 0.3 --window-frames 20` from the full 30-min recording; downstream `train-wiflow-supervised.js` runs to completion. See follow-up #640 for the PCK gap (data + GPU needed) — those are training concerns, not aligner concerns. --- scripts/align-ground-truth.js | 81 ++++++++++++++++++++++++++++++----- 1 file changed, 70 insertions(+), 11 deletions(-) diff --git a/scripts/align-ground-truth.js b/scripts/align-ground-truth.js index 6d69ec16..744581f8 100644 --- a/scripts/align-ground-truth.js +++ b/scripts/align-ground-truth.js @@ -136,18 +136,42 @@ function extractAmplitude(iqBytes, nSubcarriers) { /** * Load and parse a JSONL file, skipping blank/malformed lines. + * + * Reads byte-by-byte into Buffer slices to avoid Node's + * `String.MaxLength` (~512 MB) cap that `readFileSync(_, 'utf8')` hits + * on 30-min CSI recordings. Each line is decoded individually, so + * memory use stays bounded by the largest single record. */ function loadJsonl(filePath) { - const lines = fs.readFileSync(filePath, 'utf8').split('\n'); const records = []; - for (const line of lines) { - const trimmed = line.trim(); - if (!trimmed) continue; - try { - records.push(JSON.parse(trimmed)); - } catch { - // skip malformed lines + const fd = fs.openSync(filePath, 'r'); + try { + const bufSize = 1 << 20; // 1 MiB + const buf = Buffer.alloc(bufSize); + let leftover = ''; + let bytesRead; + do { + bytesRead = fs.readSync(fd, buf, 0, bufSize, null); + if (bytesRead > 0) { + const chunk = leftover + buf.toString('utf8', 0, bytesRead); + const lines = chunk.split('\n'); + leftover = lines.pop(); // last fragment may be incomplete + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) continue; + try { + records.push(JSON.parse(trimmed)); + } catch { + // skip malformed lines + } + } + } + } while (bytesRead === bufSize); + if (leftover.trim()) { + try { records.push(JSON.parse(leftover.trim())); } catch {} } + } finally { + fs.closeSync(fd); } return records; } @@ -184,8 +208,12 @@ function loadCsi(filePath) { const features = []; for (const r of raw) { - if (!r.timestamp) continue; - const tsMs = isoToMs(r.timestamp); + if (r.timestamp == null) continue; + // Two timestamp formats: ISO string (legacy raw_csi/feature) or + // numeric float-seconds (current sensing_update from the Rust server). + const tsMs = typeof r.timestamp === 'number' + ? r.timestamp * 1000 + : isoToMs(r.timestamp); if (isNaN(tsMs)) continue; if (r.type === 'raw_csi') { @@ -205,6 +233,33 @@ function loadCsi(filePath) { rssi: r.rssi, seq: r.seq, }); + } else if (r.type === 'sensing_update') { + // Current sensing-server schema: one record per tick contains + // already-extracted amplitudes per node plus a server-computed + // feature vector. Project each into rawCsi/features so downstream + // windowing/matrix extraction can reuse its existing paths. + if (Array.isArray(r.nodes)) { + for (const node of r.nodes) { + if (!Array.isArray(node.amplitude) || node.amplitude.length === 0) continue; + rawCsi.push({ + tsMs, + nodeId: node.node_id, + subcarriers: node.amplitude.length, + amplitude: node.amplitude, // pre-extracted, no iq_hex needed + rssi: node.rssi_dbm, + seq: r.tick, + }); + } + } + if (Array.isArray(r.features) && r.features.length > 0) { + features.push({ + tsMs, + nodeId: 0, + features: r.features, + rssi: null, + seq: r.tick, + }); + } } } @@ -297,7 +352,11 @@ function extractCsiMatrix(window) { for (let f = 0; f < nFrames; f++) { const frame = window[f]; - if (frame.iqHex) { + if (frame.amplitude && frame.amplitude.length > 0) { + // Already-extracted amplitudes from sensing_update — copy directly. + const n = Math.min(nSc, frame.amplitude.length); + for (let s = 0; s < n; s++) matrix[f * nSc + s] = frame.amplitude[s]; + } else if (frame.iqHex) { const iq = parseIqHex(frame.iqHex); const amp = extractAmplitude(iq, nSc); matrix.set(amp, f * nSc);