diff --git a/lib/log/index.js b/lib/log/index.js index 4cbb35d..5d45622 100644 --- a/lib/log/index.js +++ b/lib/log/index.js @@ -120,7 +120,7 @@ function Log(filename, opts) { let latestBlockIndex = /** @type {number | null} */ (null) let nextOffsetInBlock = /** @type {number | null} */ (null) let deletedBytes = 0 - const since = Obv() // offset of last written record + const lastRecOffset = Obv() // offset of last written record let compacting = false const compactionProgress = Obv() @@ -160,14 +160,15 @@ function Log(filename, opts) { latestBlockIndex = 0 nextOffsetInBlock = 0 cache.set(0, latestBlockBuf) - since.set(-1) + lastRecOffset.set(-1) // @ts-ignore while (waitingLoad.length) waitingLoad.shift()() } else { const blockStart = fileSize - blockSize loadLatestBlock(blockStart, function onLoadedLatestBlock(err) { if (err) throw err - debug('Opened log file, since: %d', since.value) + // prettier-ignore + debug('Opened log file, last record is at log offset %d', lastRecOffset.value) compact(function doneCompactingOnStartup(err) { if (err) debug('Failed compacting on startup: %s', err.message) // @ts-ignore @@ -201,7 +202,7 @@ function Log(filename, opts) { latestBlockIndex = blockStart / blockSize const recSize = Record.readSize(blockBuf, offsetInBlock) nextOffsetInBlock = offsetInBlock + recSize - since.set(blockStart + offsetInBlock) + lastRecOffset.set(blockStart + offsetInBlock) cb() } ) @@ -380,7 +381,7 @@ function Log(filename, opts) { let nextOffset if (Record.isEOB(blockBuf, nextOffsetInBlock)) { - if (getNextBlockStart(offset) > since.value) nextOffset = -1 + if (getNextBlockStart(offset) > lastRecOffset.value) nextOffset = -1 else nextOffset = 0 } else { nextOffset = offset + recSize @@ -601,7 +602,7 @@ function Log(filename, opts) { debug('Failed to write block at log offset %d', blockStart) throw err } else { - since.set(offset) + lastRecOffset.set(offset) // prettier-ignore if (drainsBefore.length > 0) debug('Draining the waiting queue (%d functions) for the block at log offset %d', drainsBefore.length, blockStart) @@ -663,18 +664,22 @@ function Log(filename, opts) { }) } + function getTotalBytes() { + assert(typeof latestBlockIndex === 'number', 'latestBlockIndex not set') + assert(typeof nextOffsetInBlock === 'number', 'nextOffsetInBlock not set') + return latestBlockIndex * blockSize + nextOffsetInBlock + } + /** * @param {CB<{ totalBytes: number; deletedBytes: number }>} cb */ function stats(cb) { - if (since.value == null) { - since((/** @type {number} */ totalBytes) => { - cb(null, { totalBytes: Math.max(0, totalBytes), deletedBytes }) - return false + onLoad(() => { + cb(null, { + totalBytes: getTotalBytes(), + deletedBytes, }) - } else { - cb(null, { totalBytes: Math.max(0, since.value), deletedBytes }) - } + })() } /** @@ -751,19 +756,20 @@ function Log(filename, opts) { } // Scan the old log and write blocks on the new log + const oldTotalBytes = getTotalBytes() + const oldLastRecOffset = lastRecOffset.value let latestBlockBufNew = b4a.alloc(blockSize) let latestBlockIndexNew = 0 let nextOffsetInBlockNew = 0 let holesFound = 0 let timestampLastEmit = Date.now() - const oldLogSize = since.value const err3 = await new Promise((done) => { scan( - function compactScanningRecord(oldOffset, data, size) { + function compactScanningRecord(oldRecOffset, data, size) { const now = Date.now() if (now - timestampLastEmit > COMPACTION_PROGRESS_EMIT_INTERVAL) { timestampLastEmit = now - const percent = oldOffset / oldLogSize + const percent = oldRecOffset / oldLastRecOffset compactionProgress.set({ percent, done: false }) } if (!data) { @@ -825,8 +831,8 @@ function Log(filename, opts) { nextOffsetInBlock = nextOffsetInBlockNew cache.clear() const nextSince = latestBlockIndex * blockSize + nextOffsetInBlock - const sizeDiff = oldLogSize - nextSince - since.set(nextSince) + const sizeDiff = oldTotalBytes - getTotalBytes() + lastRecOffset.set(nextSince) compacting = false deletedBytes = 0 saveStats(function onSavedStatsAfterCompaction(err) { @@ -907,7 +913,7 @@ function Log(filename, opts) { onOverwritesFlushed: onLoad(onOverwritesFlushed), compact: onLoad(compact), // TODO compactionProgress, - since, + lastRecOffset, stats, // TODO // Useful for tests diff --git a/test/add.test.js b/test/add.test.js index 9c263f4..b4ab4f6 100644 --- a/test/add.test.js +++ b/test/add.test.js @@ -50,7 +50,7 @@ test('add()', async (t) => { await p(peer.db._getLog().onDrain)() const stats = await p(peer.db.logStats)() - assert.deepEqual(stats, { totalBytes: 943, deletedBytes: 0 }) + assert.deepEqual(stats, { totalBytes: 1450, deletedBytes: 0 }) await p(peer.close)(true) }) diff --git a/test/log/basic.test.js b/test/log/basic.test.js index 5c173d8..e0e9480 100644 --- a/test/log/basic.test.js +++ b/test/log/basic.test.js @@ -66,7 +66,7 @@ test('Log basics', async function (t) { }) await p(log.onDrain)() - assert.equal(log.since.value, 22) + assert.equal(log.lastRecOffset.value, 22) const rec1 = await p(log._get)(0) assert.deepEqual(rec1, json1) diff --git a/test/log/compact.test.js b/test/log/compact.test.js index 3df1369..aa32ce9 100644 --- a/test/log/compact.test.js +++ b/test/log/compact.test.js @@ -21,7 +21,7 @@ test('Log compaction', async (t) => { assert('append two records') const stats2 = await p(log.stats)() - assert.equal(stats2.totalBytes, 15, 'stats.totalBytes (2)') + assert.equal(stats2.totalBytes, 25, 'stats.totalBytes (2)') assert.equal(stats2.deletedBytes, 0, 'stats.deletedBytes (2)') const progressArr = [] @@ -41,6 +41,10 @@ test('Log compaction', async (t) => { 'progress events' ) + const stats3 = await p(log.stats)() + assert.equal(stats3.totalBytes, 25, 'stats.totalBytes (3)') + assert.equal(stats3.deletedBytes, 0, 'stats.deletedBytes (3)') + await new Promise((resolve, reject) => { const arr = [] log.scan( @@ -75,6 +79,10 @@ test('Log compaction', async (t) => { await p(log.onDrain)() assert('append two records') + const stats1 = await p(log.stats)() + assert.equal(stats1.totalBytes, 25, 'stats.totalBytes before') + assert.equal(stats1.deletedBytes, 0, 'stats.deletedBytes before') + await p(log.del)(offset1) await p(log.onOverwritesFlushed)() assert('delete first record') @@ -86,11 +94,15 @@ test('Log compaction', async (t) => { progressArr, [ { percent: 0, done: false }, - { percent: 1, done: true, sizeDiff: 5, holesFound: 1 }, + { percent: 1, done: true, sizeDiff: 15, holesFound: 1 }, ], 'progress events' ) + const stats2 = await p(log.stats)() + assert.equal(stats2.totalBytes, 10, 'stats.totalBytes after') + assert.equal(stats2.deletedBytes, 0, 'stats.deletedBytes after') + await new Promise((resolve, reject) => { const arr = [] log.scan(