log: fix how stats are counted

This commit is contained in:
Andre Staltz 2023-11-23 15:38:05 +02:00
parent a84dd297a5
commit 5e63142e50
No known key found for this signature in database
GPG Key ID: 9EDE23EA7E8A4890
4 changed files with 41 additions and 23 deletions

View File

@ -120,7 +120,7 @@ function Log(filename, opts) {
let latestBlockIndex = /** @type {number | null} */ (null) let latestBlockIndex = /** @type {number | null} */ (null)
let nextOffsetInBlock = /** @type {number | null} */ (null) let nextOffsetInBlock = /** @type {number | null} */ (null)
let deletedBytes = 0 let deletedBytes = 0
const since = Obv() // offset of last written record const lastRecOffset = Obv() // offset of last written record
let compacting = false let compacting = false
const compactionProgress = Obv() const compactionProgress = Obv()
@ -160,14 +160,15 @@ function Log(filename, opts) {
latestBlockIndex = 0 latestBlockIndex = 0
nextOffsetInBlock = 0 nextOffsetInBlock = 0
cache.set(0, latestBlockBuf) cache.set(0, latestBlockBuf)
since.set(-1) lastRecOffset.set(-1)
// @ts-ignore // @ts-ignore
while (waitingLoad.length) waitingLoad.shift()() while (waitingLoad.length) waitingLoad.shift()()
} else { } else {
const blockStart = fileSize - blockSize const blockStart = fileSize - blockSize
loadLatestBlock(blockStart, function onLoadedLatestBlock(err) { loadLatestBlock(blockStart, function onLoadedLatestBlock(err) {
if (err) throw err if (err) throw err
debug('Opened log file, since: %d', since.value) // prettier-ignore
debug('Opened log file, last record is at log offset %d', lastRecOffset.value)
compact(function doneCompactingOnStartup(err) { compact(function doneCompactingOnStartup(err) {
if (err) debug('Failed compacting on startup: %s', err.message) if (err) debug('Failed compacting on startup: %s', err.message)
// @ts-ignore // @ts-ignore
@ -201,7 +202,7 @@ function Log(filename, opts) {
latestBlockIndex = blockStart / blockSize latestBlockIndex = blockStart / blockSize
const recSize = Record.readSize(blockBuf, offsetInBlock) const recSize = Record.readSize(blockBuf, offsetInBlock)
nextOffsetInBlock = offsetInBlock + recSize nextOffsetInBlock = offsetInBlock + recSize
since.set(blockStart + offsetInBlock) lastRecOffset.set(blockStart + offsetInBlock)
cb() cb()
} }
) )
@ -380,7 +381,7 @@ function Log(filename, opts) {
let nextOffset let nextOffset
if (Record.isEOB(blockBuf, nextOffsetInBlock)) { if (Record.isEOB(blockBuf, nextOffsetInBlock)) {
if (getNextBlockStart(offset) > since.value) nextOffset = -1 if (getNextBlockStart(offset) > lastRecOffset.value) nextOffset = -1
else nextOffset = 0 else nextOffset = 0
} else { } else {
nextOffset = offset + recSize nextOffset = offset + recSize
@ -601,7 +602,7 @@ function Log(filename, opts) {
debug('Failed to write block at log offset %d', blockStart) debug('Failed to write block at log offset %d', blockStart)
throw err throw err
} else { } else {
since.set(offset) lastRecOffset.set(offset)
// prettier-ignore // prettier-ignore
if (drainsBefore.length > 0) debug('Draining the waiting queue (%d functions) for the block at log offset %d', drainsBefore.length, blockStart) if (drainsBefore.length > 0) debug('Draining the waiting queue (%d functions) for the block at log offset %d', drainsBefore.length, blockStart)
@ -663,18 +664,22 @@ function Log(filename, opts) {
}) })
} }
function getTotalBytes() {
assert(typeof latestBlockIndex === 'number', 'latestBlockIndex not set')
assert(typeof nextOffsetInBlock === 'number', 'nextOffsetInBlock not set')
return latestBlockIndex * blockSize + nextOffsetInBlock
}
/** /**
* @param {CB<{ totalBytes: number; deletedBytes: number }>} cb * @param {CB<{ totalBytes: number; deletedBytes: number }>} cb
*/ */
function stats(cb) { function stats(cb) {
if (since.value == null) { onLoad(() => {
since((/** @type {number} */ totalBytes) => { cb(null, {
cb(null, { totalBytes: Math.max(0, totalBytes), deletedBytes }) totalBytes: getTotalBytes(),
return false deletedBytes,
}) })
} else { })()
cb(null, { totalBytes: Math.max(0, since.value), deletedBytes })
}
} }
/** /**
@ -751,19 +756,20 @@ function Log(filename, opts) {
} }
// Scan the old log and write blocks on the new log // Scan the old log and write blocks on the new log
const oldTotalBytes = getTotalBytes()
const oldLastRecOffset = lastRecOffset.value
let latestBlockBufNew = b4a.alloc(blockSize) let latestBlockBufNew = b4a.alloc(blockSize)
let latestBlockIndexNew = 0 let latestBlockIndexNew = 0
let nextOffsetInBlockNew = 0 let nextOffsetInBlockNew = 0
let holesFound = 0 let holesFound = 0
let timestampLastEmit = Date.now() let timestampLastEmit = Date.now()
const oldLogSize = since.value
const err3 = await new Promise((done) => { const err3 = await new Promise((done) => {
scan( scan(
function compactScanningRecord(oldOffset, data, size) { function compactScanningRecord(oldRecOffset, data, size) {
const now = Date.now() const now = Date.now()
if (now - timestampLastEmit > COMPACTION_PROGRESS_EMIT_INTERVAL) { if (now - timestampLastEmit > COMPACTION_PROGRESS_EMIT_INTERVAL) {
timestampLastEmit = now timestampLastEmit = now
const percent = oldOffset / oldLogSize const percent = oldRecOffset / oldLastRecOffset
compactionProgress.set({ percent, done: false }) compactionProgress.set({ percent, done: false })
} }
if (!data) { if (!data) {
@ -825,8 +831,8 @@ function Log(filename, opts) {
nextOffsetInBlock = nextOffsetInBlockNew nextOffsetInBlock = nextOffsetInBlockNew
cache.clear() cache.clear()
const nextSince = latestBlockIndex * blockSize + nextOffsetInBlock const nextSince = latestBlockIndex * blockSize + nextOffsetInBlock
const sizeDiff = oldLogSize - nextSince const sizeDiff = oldTotalBytes - getTotalBytes()
since.set(nextSince) lastRecOffset.set(nextSince)
compacting = false compacting = false
deletedBytes = 0 deletedBytes = 0
saveStats(function onSavedStatsAfterCompaction(err) { saveStats(function onSavedStatsAfterCompaction(err) {
@ -907,7 +913,7 @@ function Log(filename, opts) {
onOverwritesFlushed: onLoad(onOverwritesFlushed), onOverwritesFlushed: onLoad(onOverwritesFlushed),
compact: onLoad(compact), // TODO compact: onLoad(compact), // TODO
compactionProgress, compactionProgress,
since, lastRecOffset,
stats, // TODO stats, // TODO
// Useful for tests // Useful for tests

View File

@ -50,7 +50,7 @@ test('add()', async (t) => {
await p(peer.db._getLog().onDrain)() await p(peer.db._getLog().onDrain)()
const stats = await p(peer.db.logStats)() const stats = await p(peer.db.logStats)()
assert.deepEqual(stats, { totalBytes: 943, deletedBytes: 0 }) assert.deepEqual(stats, { totalBytes: 1450, deletedBytes: 0 })
await p(peer.close)(true) await p(peer.close)(true)
}) })

View File

@ -66,7 +66,7 @@ test('Log basics', async function (t) {
}) })
await p(log.onDrain)() await p(log.onDrain)()
assert.equal(log.since.value, 22) assert.equal(log.lastRecOffset.value, 22)
const rec1 = await p(log._get)(0) const rec1 = await p(log._get)(0)
assert.deepEqual(rec1, json1) assert.deepEqual(rec1, json1)

View File

@ -21,7 +21,7 @@ test('Log compaction', async (t) => {
assert('append two records') assert('append two records')
const stats2 = await p(log.stats)() const stats2 = await p(log.stats)()
assert.equal(stats2.totalBytes, 15, 'stats.totalBytes (2)') assert.equal(stats2.totalBytes, 25, 'stats.totalBytes (2)')
assert.equal(stats2.deletedBytes, 0, 'stats.deletedBytes (2)') assert.equal(stats2.deletedBytes, 0, 'stats.deletedBytes (2)')
const progressArr = [] const progressArr = []
@ -41,6 +41,10 @@ test('Log compaction', async (t) => {
'progress events' 'progress events'
) )
const stats3 = await p(log.stats)()
assert.equal(stats3.totalBytes, 25, 'stats.totalBytes (3)')
assert.equal(stats3.deletedBytes, 0, 'stats.deletedBytes (3)')
await new Promise((resolve, reject) => { await new Promise((resolve, reject) => {
const arr = [] const arr = []
log.scan( log.scan(
@ -75,6 +79,10 @@ test('Log compaction', async (t) => {
await p(log.onDrain)() await p(log.onDrain)()
assert('append two records') assert('append two records')
const stats1 = await p(log.stats)()
assert.equal(stats1.totalBytes, 25, 'stats.totalBytes before')
assert.equal(stats1.deletedBytes, 0, 'stats.deletedBytes before')
await p(log.del)(offset1) await p(log.del)(offset1)
await p(log.onOverwritesFlushed)() await p(log.onOverwritesFlushed)()
assert('delete first record') assert('delete first record')
@ -86,11 +94,15 @@ test('Log compaction', async (t) => {
progressArr, progressArr,
[ [
{ percent: 0, done: false }, { percent: 0, done: false },
{ percent: 1, done: true, sizeDiff: 5, holesFound: 1 }, { percent: 1, done: true, sizeDiff: 15, holesFound: 1 },
], ],
'progress events' 'progress events'
) )
const stats2 = await p(log.stats)()
assert.equal(stats2.totalBytes, 10, 'stats.totalBytes after')
assert.equal(stats2.deletedBytes, 0, 'stats.deletedBytes after')
await new Promise((resolve, reject) => { await new Promise((resolve, reject) => {
const arr = [] const arr = []
log.scan( log.scan(