diff --git a/scripts/align-ground-truth.js b/scripts/align-ground-truth.js index 6d69ec16..744581f8 100644 --- a/scripts/align-ground-truth.js +++ b/scripts/align-ground-truth.js @@ -136,18 +136,42 @@ function extractAmplitude(iqBytes, nSubcarriers) { /** * Load and parse a JSONL file, skipping blank/malformed lines. + * + * Reads byte-by-byte into Buffer slices to avoid Node's + * `String.MaxLength` (~512 MB) cap that `readFileSync(_, 'utf8')` hits + * on 30-min CSI recordings. Each line is decoded individually, so + * memory use stays bounded by the largest single record. */ function loadJsonl(filePath) { - const lines = fs.readFileSync(filePath, 'utf8').split('\n'); const records = []; - for (const line of lines) { - const trimmed = line.trim(); - if (!trimmed) continue; - try { - records.push(JSON.parse(trimmed)); - } catch { - // skip malformed lines + const fd = fs.openSync(filePath, 'r'); + try { + const bufSize = 1 << 20; // 1 MiB + const buf = Buffer.alloc(bufSize); + let leftover = ''; + let bytesRead; + do { + bytesRead = fs.readSync(fd, buf, 0, bufSize, null); + if (bytesRead > 0) { + const chunk = leftover + buf.toString('utf8', 0, bytesRead); + const lines = chunk.split('\n'); + leftover = lines.pop(); // last fragment may be incomplete + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) continue; + try { + records.push(JSON.parse(trimmed)); + } catch { + // skip malformed lines + } + } + } + } while (bytesRead === bufSize); + if (leftover.trim()) { + try { records.push(JSON.parse(leftover.trim())); } catch {} } + } finally { + fs.closeSync(fd); } return records; } @@ -184,8 +208,12 @@ function loadCsi(filePath) { const features = []; for (const r of raw) { - if (!r.timestamp) continue; - const tsMs = isoToMs(r.timestamp); + if (r.timestamp == null) continue; + // Two timestamp formats: ISO string (legacy raw_csi/feature) or + // numeric float-seconds (current sensing_update from the Rust server). + const tsMs = typeof r.timestamp === 'number' + ? r.timestamp * 1000 + : isoToMs(r.timestamp); if (isNaN(tsMs)) continue; if (r.type === 'raw_csi') { @@ -205,6 +233,33 @@ function loadCsi(filePath) { rssi: r.rssi, seq: r.seq, }); + } else if (r.type === 'sensing_update') { + // Current sensing-server schema: one record per tick contains + // already-extracted amplitudes per node plus a server-computed + // feature vector. Project each into rawCsi/features so downstream + // windowing/matrix extraction can reuse its existing paths. + if (Array.isArray(r.nodes)) { + for (const node of r.nodes) { + if (!Array.isArray(node.amplitude) || node.amplitude.length === 0) continue; + rawCsi.push({ + tsMs, + nodeId: node.node_id, + subcarriers: node.amplitude.length, + amplitude: node.amplitude, // pre-extracted, no iq_hex needed + rssi: node.rssi_dbm, + seq: r.tick, + }); + } + } + if (Array.isArray(r.features) && r.features.length > 0) { + features.push({ + tsMs, + nodeId: 0, + features: r.features, + rssi: null, + seq: r.tick, + }); + } } } @@ -297,7 +352,11 @@ function extractCsiMatrix(window) { for (let f = 0; f < nFrames; f++) { const frame = window[f]; - if (frame.iqHex) { + if (frame.amplitude && frame.amplitude.length > 0) { + // Already-extracted amplitudes from sensing_update — copy directly. + const n = Math.min(nSc, frame.amplitude.length); + for (let s = 0; s < n; s++) matrix[f * nSc + s] = frame.amplitude[s]; + } else if (frame.iqHex) { const iq = parseIqHex(frame.iqHex); const amp = extractAmplitude(iq, nSc); matrix.set(amp, f * nSc);