log reading methods match log codec type

Andre Staltz 2023-11-07 10:48:20 +02:00
parent db915d0287
commit b6d17e947f
No known key found for this signature in database
GPG Key ID: 9EDE23EA7E8A4890
4 changed files with 80 additions and 51 deletions
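
The diffs below thread a type parameter through the log codec so that the log's reading and appending methods are typed in terms of whatever the codec encodes and decodes, instead of raw buffers. A minimal standalone sketch of that JSDoc pattern follows (not part of the commit): Codec and extractCodecType mirror the typedefs in the diff, while jsonCodec and readRecord are hypothetical names used only for illustration.

/** @typedef {Buffer | Uint8Array} B4A */

/**
 * A codec knows how to turn records of type T into bytes and back.
 * @template T
 * @typedef {{ encode: (data: T) => B4A, decode: (data: B4A) => T }} Codec
 */

/**
 * Recovers the record type T from a Codec<T>.
 * @template Type
 * @typedef {Type extends Codec<infer X> ? X : never} extractCodecType
 */

/** @type {Codec<{id: string, msg: object, received: number}>} */
const jsonCodec = {
  // Serialize a record to bytes before writing it to the log file
  encode: (rec) => Buffer.from(JSON.stringify(rec), 'utf8'),
  // Parse bytes read from the log file back into a record object
  decode: (buf) => JSON.parse(Buffer.from(buf).toString('utf8')),
}

/**
 * A reader typed against the codec yields decoded records, not raw buffers.
 * @param {B4A} buf
 * @returns {extractCodecType<typeof jsonCodec>}
 */
function readRecord(buf) {
  return jsonCodec.decode(buf)
}

// Round trip: readRecord(jsonCodec.encode({ id: 'a', msg: {}, received: 0 }))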

View File

@@ -3,7 +3,9 @@ const b4a = require('b4a')
 const MsgV3 = require('./msg-v3')
 
 /**
+ * @typedef {import('./index').Msg} Msg
  * @typedef {import('./index').RecPresent} RecPresent
+ * @typedef {RecPresent['misc']} Misc
  * @typedef {import('ppppp-keypair').Keypair} Keypair
  *
  * @typedef {Buffer | Uint8Array} B4A
@@ -43,9 +45,11 @@ function keypairToSSBKeys(keypair) {
 const decryptCache = new WeakMap()
 
 /**
- * @param {Pick<RecPresent, 'msg'> & Partial<Pick<RecPresent, 'id' | 'misc' | 'received'>>} rec
+ * @template {{msg: Msg, misc?: Misc}} T
+ * @param {T} rec
  * @param {any} peer
  * @param {any} config
+ * @returns {T & {misc?: Misc}}
  */
 function decrypt(rec, peer, config) {
   if (decryptCache.has(rec)) return decryptCache.get(rec)
@@ -66,9 +70,8 @@ function decrypt(rec, peer, config) {
   const msgDecrypted = MsgV3.fromPlaintextBuffer(plaintextBuf, msgEncrypted)
 
   const recDecrypted = {
-    id: rec.id,
+    ...rec,
     msg: msgDecrypted,
-    received: rec.received,
     misc: {
       ...rec.misc,
       private: true,

View File

@@ -45,6 +45,12 @@ const { decrypt } = require('./encryption')
  *   id: MsgID;
  *   msg: Msg;
  *   received: number;
+ * }} RecInLog
+ *
+ * @typedef {{
+ *   id: MsgID;
+ *   msg: Msg;
+ *   received: number;
  *   misc: {
  *     offset: number;
  *     size: number;
@@ -129,29 +135,32 @@ function initDB(peer, config) {
   const onRecordAdded = Obz()
 
+  const codec = {
+    /**
+     * @param {RecInLog} msg
+     * @returns {B4A}
+     */
+    encode(msg) {
+      return b4a.from(JSON.stringify(msg), 'utf8')
+    },
+    /**
+     * @param {B4A} buf
+     * @returns {RecInLog}
+     */
+    decode(buf) {
+      return JSON.parse(b4a.toString(buf, 'utf8'))
+    },
+  }
+
   const log = Log(Path.join(config.path, 'db.bin'), {
     blockSize: 64 * 1024,
-    codec: {
-      /**
-       * @param {Msg} msg
-       */
-      encode(msg) {
-        return b4a.from(JSON.stringify(msg), 'utf8')
-      },
-      /**
-       * @param {B4A} buf
-       * @returns {Msg}
-       */
-      decode(buf) {
-        return JSON.parse(b4a.toString(buf, 'utf8'))
-      },
-    },
+    codec,
     /**
      * @param {B4A} buf
      */
     validateRecord(buf) {
       try {
-        JSON.parse(b4a.toString(buf, 'utf8'))
+        codec.decode(buf)
         return true
       } catch {
         return false
@@ -174,9 +183,9 @@ function initDB(peer, config) {
     setTimeout(() => {
       let i = -1
       log.scan(
-        function scanEach(offset, logRec, size) {
+        function scanEach(offset, recInLog, size) {
           i += 1
-          if (!logRec) {
+          if (!recInLog) {
             // deleted record
             /** @type {RecDeleted} */
             const rec = { misc: { offset, size, seq: i } }
@@ -186,8 +195,8 @@ function initDB(peer, config) {
           // TODO: for performance, dont decrypt on startup, instead decrypt on
           // demand, or decrypt in the background. Or then store the log with
           // decrypted msgs and only encrypt when moving it to the network.
-          // @ts-ignore
-          const rec = decrypt(logRec, peer, config)
+          // FIXME: /** @type {RecPresent} */
+          const rec = /** @type {any} */ (decrypt(recInLog, peer, config))
           rec.misc ??= /** @type {Rec['misc']} */ ({})
           rec.misc.offset = offset
           rec.misc.size = size
@@ -208,24 +217,21 @@ function initDB(peer, config) {
    * @param {CB<RecPresent>} cb
    */
   function logAppend(id, msg, cb) {
-    /** @type {RecPresent} */
-    const rec = {
+    /** @type {RecInLog} */
+    const recInLog = {
       id,
       msg,
       received: Date.now(),
-      misc: {
-        offset: 0,
-        size: 0,
-        seq: 0,
-      },
     }
-    log.append(rec, (/** @type {any} */ err, /** @type {number} */ offset) => {
+    log.append(recInLog, (err, offset) => {
       if (err) return cb(new Error('logAppend failed', { cause: err }))
-      const size = b4a.from(JSON.stringify(rec), 'utf8').length
+      const size = b4a.from(JSON.stringify(recInLog), 'utf8').length
       const seq = recs.length
-      const recExposed = decrypt(rec, peer, config)
-      rec.misc = recExposed.misc = { offset, size, seq }
-      recs.push(recExposed)
+      const recExposed = decrypt(recInLog, peer, config)
+      const rec = /** @type {RecPresent} */ (recInLog);
+      rec.misc = { offset, size, seq }
+      recExposed.misc = {...recExposed.misc, ...rec.misc}
+      recs.push(/** @type {any} */ (recExposed))
       cb(null, rec)
     })
   }

View File

@@ -26,13 +26,26 @@ const Record = require('./record')
 /**
  * @typedef {Buffer | Uint8Array} B4A
  * @typedef {number} BlockIndex
+ */
+
+/**
+ * @template T
  * @typedef {{
- *   encode: (data: any) => B4A,
- *   decode: (data: B4A) => any
+ *   encode: (data: T) => B4A,
+ *   decode: (data: B4A) => T
  * }} Codec
+ */
+
+/**
+ * @template Type
+ * @typedef {Type extends Codec<infer X> ? X : never} extractCodecType
+ */
+
+/**
+ * @template T
  * @typedef {{
  *   blockSize?: number,
- *   codec?: Codec,
+ *   codec?: Codec<T>,
  *   writeTimeout?: number,
  *   validateRecord?: (data: B4A) => boolean
  * }} Options
@@ -66,8 +79,6 @@ const EOB = {
   asNumber: 0,
 }
 
-/** @type {Codec} */
-const DEFAULT_CODEC = { encode: (x) => x, decode: (x) => x }
 const DEFAULT_BLOCK_SIZE = 65536
 const DEFAULT_WRITE_TIMEOUT = 250
 const DEFAULT_VALIDATE = () => true
@@ -75,10 +86,18 @@ const DEFAULT_VALIDATE = () => true
 // const COMPACTION_PROGRESS_EMIT_INTERVAL = 1000 // FIXME:
 
 /**
+ * @template [T=B4A]
  * @param {string} filename
- * @param {Options} opts
+ * @param {Options<T>} opts
  */
 function AsyncAppendOnlyLog(filename, opts) {
+  const DEFAULT_CODEC = /** @type {Codec<T>} */ (
+    /** @type {any} */ ({
+      encode: (/** @type {any} */ x) => x,
+      decode: (/** @type {any} */ x) => x,
+    })
+  )
+
   const cache = new Cache({ maxSize: 1024 }) // This is potentially 64 MiB!
   const raf = RAF(filename)
   const statsFilename = filename + '.stats'
@@ -357,7 +376,7 @@ function AsyncAppendOnlyLog(filename, opts) {
 
   /**
    * @param {number} offset
-   * @param {CB<B4A>} cb
+   * @param {CB<extractCodecType<typeof codec>>} cb
    */
   function get(offset, cb) {
     assert(typeof latestBlockIndex === 'number', 'latestBlockIndex not set')
@@ -372,6 +391,7 @@ function AsyncAppendOnlyLog(filename, opts) {
       if (err) return cb(err)
       const [dataBuf] = Record.read(blockBuf, getOffsetInBlock(offset))
       if (isBufferZero(dataBuf)) return cb(deletedRecordErr())
+      // @ts-ignore
      cb(null, codec.decode(dataBuf))
     })
   }
@@ -384,9 +404,9 @@ function AsyncAppendOnlyLog(filename, opts) {
    * * `>0`: next record within block
    * @param {Buffer} blockBuf
    * @param {number} offset
-   * @param {boolean} asRaw
+   * @return {[number, extractCodecType<typeof codec> | null, number]}
    */
-  function getDataNextOffset(blockBuf, offset, asRaw = false) {
+  function getDataNextOffset(blockBuf, offset) {
     const offsetInBlock = getOffsetInBlock(offset)
     const [dataBuf, recSize] = Record.read(blockBuf, offsetInBlock)
     const nextLength = Record.readDataLength(blockBuf, offsetInBlock + recSize)
@@ -400,11 +420,11 @@ function AsyncAppendOnlyLog(filename, opts) {
     }
 
     if (isBufferZero(dataBuf)) return [nextOffset, null, recSize]
-    else return [nextOffset, asRaw ? dataBuf : codec.decode(dataBuf), recSize]
+    else return [nextOffset, codec.decode(dataBuf), recSize]
   }
 
   /**
-   * @param {(offset: number, value: B4A, size: number) => void} onNext
+   * @param {(offset: number, data: extractCodecType<typeof codec> | null, size: number) => void} onNext
    * @param {(error?: Error) => void} onDone
    */
   function scan(onNext, onDone) {
@@ -416,8 +436,8 @@ function AsyncAppendOnlyLog(filename, opts) {
         if (err) return onDone(err)
         if (isBufferZero(blockBuf)) return onDone()
         while (true) {
-          const [offset, value, size] = getDataNextOffset(blockBuf, cursor)
-          onNext(cursor, value, size)
+          const [offset, data, size] = getDataNextOffset(blockBuf, cursor)
+          onNext(cursor, data, size)
           if (offset === 0) {
             cursor = getNextBlockStart(cursor)
             getNextBlock()
@@ -524,7 +544,7 @@ function AsyncAppendOnlyLog(filename, opts) {
   }
 
   /**
-   * @param {any} data
+   * @param {extractCodecType<typeof codec>} data
    * @returns {number}
    */
   function appendSingle(data) {
@@ -559,7 +579,7 @@ function AsyncAppendOnlyLog(filename, opts) {
   }
 
   /**
-   * @param {any | Array<any>} data
+   * @param {extractCodecType<typeof codec> | Array<extractCodecType<typeof codec>>} data
   * @param {CB<number>} cb
   */
  function append(data, cb) {

View File

@@ -50,7 +50,7 @@ test('add()', async (t) => {
 
   await p(peer.db._getLog().onDrain)()
   const stats = await p(peer.db.logStats)()
-  assert.deepEqual(stats, { totalBytes: 974, deletedBytes: 0 })
+  assert.deepEqual(stats, { totalBytes: 900, deletedBytes: 0 })
 
   await p(peer.close)(true)
 })