From b6d17e947fc3e2e77074299d027a8c7ef25f220e Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Tue, 7 Nov 2023 10:48:20 +0200 Subject: [PATCH] log reading methods match log codec type --- lib/encryption.js | 9 ++++-- lib/index.js | 70 +++++++++++++++++++++++++---------------------- lib/log/index.js | 50 +++++++++++++++++++++++---------- test/add.test.js | 2 +- 4 files changed, 80 insertions(+), 51 deletions(-) diff --git a/lib/encryption.js b/lib/encryption.js index c86c995..4b30308 100644 --- a/lib/encryption.js +++ b/lib/encryption.js @@ -3,7 +3,9 @@ const b4a = require('b4a') const MsgV3 = require('./msg-v3') /** + * @typedef {import('./index').Msg} Msg * @typedef {import('./index').RecPresent} RecPresent + * @typedef {RecPresent['misc']} Misc * @typedef {import('ppppp-keypair').Keypair} Keypair * * @typedef {Buffer | Uint8Array} B4A @@ -43,9 +45,11 @@ function keypairToSSBKeys(keypair) { const decryptCache = new WeakMap() /** - * @param {Pick & Partial>} rec + * @template {{msg: Msg, misc?: Misc}} T + * @param {T} rec * @param {any} peer * @param {any} config + * @returns {T & {misc?: Misc}} */ function decrypt(rec, peer, config) { if (decryptCache.has(rec)) return decryptCache.get(rec) @@ -66,9 +70,8 @@ function decrypt(rec, peer, config) { const msgDecrypted = MsgV3.fromPlaintextBuffer(plaintextBuf, msgEncrypted) const recDecrypted = { - id: rec.id, + ...rec, msg: msgDecrypted, - received: rec.received, misc: { ...rec.misc, private: true, diff --git a/lib/index.js b/lib/index.js index 366e61d..ae6fbb0 100644 --- a/lib/index.js +++ b/lib/index.js @@ -45,6 +45,12 @@ const { decrypt } = require('./encryption') * id: MsgID; * msg: Msg; * received: number; + * }} RecInLog + * + * @typedef {{ + * id: MsgID; + * msg: Msg; + * received: number; * misc: { * offset: number; * size: number; @@ -129,29 +135,32 @@ function initDB(peer, config) { const onRecordAdded = Obz() + const codec = { + /** + * @param {RecInLog} msg + * @returns {B4A} + */ + encode(msg) { + return b4a.from(JSON.stringify(msg), 'utf8') + }, + /** + * @param {B4A} buf + * @returns {RecInLog} + */ + decode(buf) { + return JSON.parse(b4a.toString(buf, 'utf8')) + }, + } + const log = Log(Path.join(config.path, 'db.bin'), { blockSize: 64 * 1024, - codec: { - /** - * @param {Msg} msg - */ - encode(msg) { - return b4a.from(JSON.stringify(msg), 'utf8') - }, - /** - * @param {B4A} buf - * @returns {Msg} - */ - decode(buf) { - return JSON.parse(b4a.toString(buf, 'utf8')) - }, - }, + codec, /** * @param {B4A} buf */ validateRecord(buf) { try { - JSON.parse(b4a.toString(buf, 'utf8')) + codec.decode(buf) return true } catch { return false @@ -174,9 +183,9 @@ function initDB(peer, config) { setTimeout(() => { let i = -1 log.scan( - function scanEach(offset, logRec, size) { + function scanEach(offset, recInLog, size) { i += 1 - if (!logRec) { + if (!recInLog) { // deleted record /** @type {RecDeleted} */ const rec = { misc: { offset, size, seq: i } } @@ -186,8 +195,8 @@ function initDB(peer, config) { // TODO: for performance, dont decrypt on startup, instead decrypt on // demand, or decrypt in the background. Or then store the log with // decrypted msgs and only encrypt when moving it to the network. - // @ts-ignore // FIXME: - const rec = decrypt(logRec, peer, config) + /** @type {RecPresent} */ + const rec = /** @type {any} */ (decrypt(recInLog, peer, config)) rec.misc ??= /** @type {Rec['misc']} */ ({}) rec.misc.offset = offset rec.misc.size = size @@ -208,24 +217,21 @@ function initDB(peer, config) { * @param {CB} cb */ function logAppend(id, msg, cb) { - /** @type {RecPresent} */ - const rec = { + /** @type {RecInLog} */ + const recInLog = { id, msg, received: Date.now(), - misc: { - offset: 0, - size: 0, - seq: 0, - }, } - log.append(rec, (/** @type {any} */ err, /** @type {number} */ offset) => { + log.append(recInLog, (err, offset) => { if (err) return cb(new Error('logAppend failed', { cause: err })) - const size = b4a.from(JSON.stringify(rec), 'utf8').length + const size = b4a.from(JSON.stringify(recInLog), 'utf8').length const seq = recs.length - const recExposed = decrypt(rec, peer, config) - rec.misc = recExposed.misc = { offset, size, seq } - recs.push(recExposed) + const recExposed = decrypt(recInLog, peer, config) + const rec = /** @type {RecPresent} */ (recInLog); + rec.misc = { offset, size, seq } + recExposed.misc = {...recExposed.misc, ...rec.misc} + recs.push(/** @type {any} */ (recExposed)) cb(null, rec) }) } diff --git a/lib/log/index.js b/lib/log/index.js index 09e527a..e6f2212 100644 --- a/lib/log/index.js +++ b/lib/log/index.js @@ -26,13 +26,26 @@ const Record = require('./record') /** * @typedef {Buffer | Uint8Array} B4A * @typedef {number} BlockIndex + */ + +/** + * @template T * @typedef {{ - * encode: (data: any) => B4A, - * decode: (data: B4A) => any + * encode: (data: T) => B4A, + * decode: (data: B4A) => T * }} Codec + */ + +/** + * @template Type + * @typedef {Type extends Codec ? X : never} extractCodecType + */ + +/** + * @template T * @typedef {{ * blockSize?: number, - * codec?: Codec, + * codec?: Codec, * writeTimeout?: number, * validateRecord?: (data: B4A) => boolean * }} Options @@ -66,8 +79,6 @@ const EOB = { asNumber: 0, } -/** @type {Codec} */ -const DEFAULT_CODEC = { encode: (x) => x, decode: (x) => x } const DEFAULT_BLOCK_SIZE = 65536 const DEFAULT_WRITE_TIMEOUT = 250 const DEFAULT_VALIDATE = () => true @@ -75,10 +86,18 @@ const DEFAULT_VALIDATE = () => true // const COMPACTION_PROGRESS_EMIT_INTERVAL = 1000 // FIXME: /** + * @template [T=B4A] * @param {string} filename - * @param {Options} opts + * @param {Options} opts */ function AsyncAppendOnlyLog(filename, opts) { + const DEFAULT_CODEC = /** @type {Codec} */ ( + /** @type {any} */ ({ + encode: (/** @type {any} */ x) => x, + decode: (/** @type {any} */ x) => x, + }) + ) + const cache = new Cache({ maxSize: 1024 }) // This is potentially 64 MiB! const raf = RAF(filename) const statsFilename = filename + '.stats' @@ -357,7 +376,7 @@ function AsyncAppendOnlyLog(filename, opts) { /** * @param {number} offset - * @param {CB} cb + * @param {CB>} cb */ function get(offset, cb) { assert(typeof latestBlockIndex === 'number', 'latestBlockIndex not set') @@ -372,6 +391,7 @@ function AsyncAppendOnlyLog(filename, opts) { if (err) return cb(err) const [dataBuf] = Record.read(blockBuf, getOffsetInBlock(offset)) if (isBufferZero(dataBuf)) return cb(deletedRecordErr()) + // @ts-ignore cb(null, codec.decode(dataBuf)) }) } @@ -384,9 +404,9 @@ function AsyncAppendOnlyLog(filename, opts) { * * `>0`: next record within block * @param {Buffer} blockBuf * @param {number} offset - * @param {boolean} asRaw + * @return {[number, extractCodecType | null, number]} */ - function getDataNextOffset(blockBuf, offset, asRaw = false) { + function getDataNextOffset(blockBuf, offset) { const offsetInBlock = getOffsetInBlock(offset) const [dataBuf, recSize] = Record.read(blockBuf, offsetInBlock) const nextLength = Record.readDataLength(blockBuf, offsetInBlock + recSize) @@ -400,11 +420,11 @@ function AsyncAppendOnlyLog(filename, opts) { } if (isBufferZero(dataBuf)) return [nextOffset, null, recSize] - else return [nextOffset, asRaw ? dataBuf : codec.decode(dataBuf), recSize] + else return [nextOffset, codec.decode(dataBuf), recSize] } /** - * @param {(offset: number, value: B4A, size: number) => void} onNext + * @param {(offset: number, data: extractCodecType | null, size: number) => void} onNext * @param {(error?: Error) => void} onDone */ function scan(onNext, onDone) { @@ -416,8 +436,8 @@ function AsyncAppendOnlyLog(filename, opts) { if (err) return onDone(err) if (isBufferZero(blockBuf)) return onDone() while (true) { - const [offset, value, size] = getDataNextOffset(blockBuf, cursor) - onNext(cursor, value, size) + const [offset, data, size] = getDataNextOffset(blockBuf, cursor) + onNext(cursor, data, size) if (offset === 0) { cursor = getNextBlockStart(cursor) getNextBlock() @@ -524,7 +544,7 @@ function AsyncAppendOnlyLog(filename, opts) { } /** - * @param {any} data + * @param {extractCodecType} data * @returns {number} */ function appendSingle(data) { @@ -559,7 +579,7 @@ function AsyncAppendOnlyLog(filename, opts) { } /** - * @param {any | Array} data + * @param {extractCodecType | Array>} data * @param {CB} cb */ function append(data, cb) { diff --git a/test/add.test.js b/test/add.test.js index c8720e0..0b81c01 100644 --- a/test/add.test.js +++ b/test/add.test.js @@ -50,7 +50,7 @@ test('add()', async (t) => { await p(peer.db._getLog().onDrain)() const stats = await p(peer.db.logStats)() - assert.deepEqual(stats, { totalBytes: 974, deletedBytes: 0 }) + assert.deepEqual(stats, { totalBytes: 900, deletedBytes: 0 }) await p(peer.close)(true) })