Re-scan log into memory after compacting

This commit is contained in:
Andre Staltz 2024-02-27 16:52:01 +02:00
parent 667b33779d
commit 60afd4b64b
No known key found for this signature in database
GPG Key ID: 9EDE23EA7E8A4890
3 changed files with 80 additions and 20 deletions

View File

@ -192,7 +192,7 @@ function initDB(peer, config) {
/** @type {Array<Rec | null>} */ /** @type {Array<Rec | null>} */
const recs = [] const recs = []
/** @type {WeakMap<Rec, Misc>} */ /** @type {WeakMap<Rec, Misc>} */
const miscRegistry = new WeakMap() let miscRegistry = new WeakMap()
/** @type {Map<MsgID, Doneable<RecPresent>>} */ /** @type {Map<MsgID, Doneable<RecPresent>>} */
const msgsBeingAdded = new Map() const msgsBeingAdded = new Map()
/** @type {Map<string, EncryptionFormat>} */ /** @type {Map<string, EncryptionFormat>} */
@ -272,6 +272,34 @@ function initDB(peer, config) {
) )
}) })
/**
* @param {CB<void>} cb
*/
function rescanLogPostCompaction(cb) {
miscRegistry = new WeakMap()
let seq = -1
log.scan(
function rescanEach(offset, recInLog, size) {
seq += 1
if (!recInLog) {
// deleted record
recs[seq] = null
return
}
const rec = decrypt(recInLog, peer, config)
miscRegistry.set(rec, { offset, size, seq })
recs[seq] = rec
},
function rescanEnd(err) {
// prettier-ignore
if (err) return cb(new Error('Failed to rescan the log after compaction', { cause: err }))
recs.length = seq + 1
cb()
},
false // asRaw
)
}
/** /**
* @param {MsgID} id * @param {MsgID} id
* @param {Msg} msg * @param {Msg} msg
@ -1183,6 +1211,23 @@ function initDB(peer, config) {
} }
} }
/** @type {CB<void>} */
function logError(err) {
if (err) console.error(err)
}
/**
* @param {CB<void>} cb
*/
function compact(cb) {
cb ??= logError
log.compact((err) => {
// prettier-ignore
if (err) return cb?.(err)
rescanLogPostCompaction(cb)
});
}
return { return {
// public // public
installEncryptionFormat, installEncryptionFormat,
@ -1217,7 +1262,7 @@ function initDB(peer, config) {
records, records,
log: { log: {
stats: log.stats.bind(log), stats: log.stats.bind(log),
compact: log.compact.bind(log), compact,
}, },
// internal // internal

View File

@ -398,6 +398,7 @@ function Log(filename, opts) {
/** /**
* @param {(offset: number, data: extractCodecType<typeof codec> | null, size: number) => Promise<void> | void} onNext * @param {(offset: number, data: extractCodecType<typeof codec> | null, size: number) => Promise<void> | void} onNext
* @param {(error?: Error) => void} onDone * @param {(error?: Error) => void} onDone
* @param {boolean} asRaw
*/ */
function scan(onNext, onDone, asRaw = false) { function scan(onNext, onDone, asRaw = false) {
let cursor = 0 let cursor = 0

View File

@ -34,18 +34,19 @@ test('del()', async (t) => {
msgIDs.push(rec.id) msgIDs.push(rec.id)
} }
const before = [] {
const texts = []
for (const msg of peer.db.msgs()) { for (const msg of peer.db.msgs()) {
if (msg.data && msg.metadata.account?.length > 4) { if (msg.data && msg.metadata.account?.length > 4) {
before.push(msg.data.text) texts.push(msg.data.text)
} }
} }
assert.deepEqual( assert.deepEqual(
before, texts,
['m0', 'm1', 'm2', 'm3', 'm4'], ['m0', 'm1', 'm2', 'm3', 'm4'],
'msgs before the delete' 'msgs before the delete'
) )
}
const stats1 = await p(peer.db.log.stats)() const stats1 = await p(peer.db.log.stats)()
assert.deepEqual( assert.deepEqual(
@ -55,19 +56,32 @@ test('del()', async (t) => {
) )
await p(peer.db.del)(msgIDs[2]) await p(peer.db.del)(msgIDs[2])
await p(peer.db.del)(msgIDs[3])
const after = [] {
const texts = []
for (const msg of peer.db.msgs()) { for (const msg of peer.db.msgs()) {
if (msg.data && msg.metadata.account?.length > 4) { if (msg.data && msg.metadata.account?.length > 4) {
after.push(msg.data.text) texts.push(msg.data.text)
} }
} }
assert.deepEqual(texts, ['m0', 'm1', 'm4'], 'msgs after the delete')
assert.deepEqual(after, ['m0', 'm1', 'm3', 'm4'], 'msgs after the delete') }
await p(peer.db.log.compact)() await p(peer.db.log.compact)()
assert('compacted') assert('compacted')
await p(peer.db.del)(msgIDs[4])
{
const texts = []
for (const msg of peer.db.msgs()) {
if (msg.data && msg.metadata.account?.length > 4) {
texts.push(msg.data.text)
}
}
assert.deepEqual(texts, ['m0', 'm1'], 'msgs when deleted after compacted')
}
await p(peer.close)(true) await p(peer.close)(true)
const log = Log(path.join(DIR, 'db', 'log'), { const log = Log(path.join(DIR, 'db', 'log'), {
@ -101,7 +115,7 @@ test('del()', async (t) => {
const stats2 = await p(log.stats)() const stats2 = await p(log.stats)()
assert.deepEqual( assert.deepEqual(
stats2, stats2,
{ totalBytes: 3495, deletedBytes: 0 }, { totalBytes: 2880, deletedBytes: 615 },
'stats after delete and compact' 'stats after delete and compact'
) )
@ -109,7 +123,7 @@ test('del()', async (t) => {
persistedMsgs persistedMsgs
.filter((msg) => msg.data && msg.metadata.account?.length > 4) .filter((msg) => msg.data && msg.metadata.account?.length > 4)
.map((msg) => msg.data.text), .map((msg) => msg.data.text),
['m0', 'm1', 'm3', 'm4'], ['m0', 'm1'],
'msgs in disk after the delete' 'msgs in disk after the delete'
) )
}) })