improve debug logs in the Log module

This commit is contained in:
Andre Staltz 2024-01-18 16:25:00 +02:00
parent 17742f9ed2
commit 69a0a1f60c
No known key found for this signature in database
GPG Key ID: 9EDE23EA7E8A4890
1 changed file with 24 additions and 18 deletions

View File

@ -168,7 +168,7 @@ function Log(filename, opts) {
loadLatestBlock(blockStart, function onLoadedLatestBlock(err) { loadLatestBlock(blockStart, function onLoadedLatestBlock(err) {
if (err) throw err if (err) throw err
// prettier-ignore // prettier-ignore
debug('Opened log file, last record is at log offset %d', lastRecOffset.value) debug('Opened log file, last record is at log offset %d, block %d', lastRecOffset.value, latestBlockIndex)
// @ts-ignore // @ts-ignore
while (waitingLoad.length) waitingLoad.shift()() while (waitingLoad.length) waitingLoad.shift()()
}) })
@ -274,7 +274,7 @@ function Log(filename, opts) {
*/ */
function fixBlock(blockBuf, badOffsetInBlock, blockStart, successValue, cb) { function fixBlock(blockBuf, badOffsetInBlock, blockStart, successValue, cb) {
// prettier-ignore // prettier-ignore
debug('Fixing block with an invalid record at block offset %d', badOffsetInBlock) debug('Fixing a block with an invalid record at block offset %d', badOffsetInBlock)
blockBuf.fill(0, badOffsetInBlock, blockSize) blockBuf.fill(0, badOffsetInBlock, blockSize)
writeWithFSync(blockStart, blockBuf, successValue, cb) writeWithFSync(blockStart, blockBuf, successValue, cb)
} }
@ -310,11 +310,11 @@ function Log(filename, opts) {
const blockIndex = getBlockIndex(offset) const blockIndex = getBlockIndex(offset)
if (cache.has(blockIndex)) { if (cache.has(blockIndex)) {
debug('Reading block at log offset %d from cache', offset) debug('Reading block %d at log offset %d from cache', blockIndex, offset)
const cachedBlockBuf = cache.get(blockIndex) const cachedBlockBuf = cache.get(blockIndex)
cb(null, cachedBlockBuf) cb(null, cachedBlockBuf)
} else { } else {
debug('Reading block at log offset %d from disc', offset) debug('Reading block %d at log offset %d from disc', blockIndex, offset)
const blockStart = getBlockStart(offset) const blockStart = getBlockStart(offset)
raf.read( raf.read(
blockStart, blockStart,
@ -455,6 +455,8 @@ function Log(filename, opts) {
deletedBytes += Record.readSize(blockBufNow, offsetInBlock) deletedBytes += Record.readSize(blockBufNow, offsetInBlock)
blocksWithOverwritables.set(blockIndex, blockBufNow) blocksWithOverwritables.set(blockIndex, blockBufNow)
scheduleFlushOverwrites() scheduleFlushOverwrites()
// prettier-ignore
debug('Deleted record at log offset %d, block %d, block offset %d', offset, blockIndex, offsetInBlock)
cb() cb()
} }
) )
@ -541,9 +543,11 @@ function Log(filename, opts) {
latestBlockIndex += 1 latestBlockIndex += 1
nextOffsetInBlock = 0 nextOffsetInBlock = 0
// prettier-ignore // prettier-ignore
debug('New block created at log offset %d to fit new record', latestBlockIndex * blockSize) debug('Block %d created at log offset %d to fit new record', latestBlockIndex, latestBlockIndex * blockSize)
} }
// prettier-ignore
debug('Appending record at log offset %d, blockIndex %d, block offset %d', latestBlockIndex * blockSize + nextOffsetInBlock, latestBlockIndex, nextOffsetInBlock)
assert(latestBlockBuf, 'latestBlockBuf not set') assert(latestBlockBuf, 'latestBlockBuf not set')
Record.write(latestBlockBuf, nextOffsetInBlock, encodedData) Record.write(latestBlockBuf, nextOffsetInBlock, encodedData)
cache.set(latestBlockIndex, latestBlockBuf) // update cache cache.set(latestBlockIndex, latestBlockBuf) // update cache
@ -554,7 +558,6 @@ function Log(filename, opts) {
}) })
nextOffsetInBlock += Record.size(encodedData) nextOffsetInBlock += Record.size(encodedData)
scheduleWrite() scheduleWrite()
debug('New record written at log offset %d', offset)
return offset return offset
} }
@ -590,32 +593,35 @@ function Log(filename, opts) {
blocksToBeWritten.delete(blockIndex) blocksToBeWritten.delete(blockIndex)
// prettier-ignore // prettier-ignore
debug('Writing block of size %d at log offset %d', blockBuf.length, blockStart) debug('Writing block %d of size %d at log offset %d', blockIndex, blockBuf.length, blockStart)
writingBlockIndex = blockIndex writingBlockIndex = blockIndex
writeWithFSync(blockStart, blockBuf, null, function onBlockWritten(err, _) { writeWithFSync(blockStart, blockBuf, null, function onBlockWritten(err, _) {
const drainsBefore = (waitingDrain.get(blockIndex) || []).slice(0) const drainsBefore = (waitingDrain.get(blockIndex) || []).slice(0)
writingBlockIndex = -1 writingBlockIndex = -1
if (err) { if (err) {
debug('Failed to write block at log offset %d', blockStart) // prettier-ignore
debug('Failed to write block %d at log offset %d', blockIndex, blockStart)
throw err throw err
} else { } else {
lastRecOffset.set(offset) lastRecOffset.set(offset)
// prettier-ignore // prettier-ignore
if (drainsBefore.length > 0) debug('Draining the waiting queue (%d functions) for the block at log offset %d', drainsBefore.length, blockStart) if (drainsBefore.length > 0) debug('Draining the waiting queue (%d functions) for block %d at log offset %d', drainsBefore.length, blockIndex, blockStart)
for (let i = 0; i < drainsBefore.length; ++i) drainsBefore[i]() for (let i = 0; i < drainsBefore.length; ++i) drainsBefore[i]()
// the resumed streams might have added more to waiting // the resumed streams might have added more to waiting
let drainsAfter = waitingDrain.get(blockIndex) || [] let drainsAfter = waitingDrain.get(blockIndex) || []
if (drainsBefore.length === drainsAfter.length) if (drainsBefore.length === drainsAfter.length) {
waitingDrain.delete(blockIndex) waitingDrain.delete(blockIndex)
else if (drainsAfter.length === 0) waitingDrain.delete(blockIndex) } else if (drainsAfter.length === 0) {
else waitingDrain.delete(blockIndex)
} else {
waitingDrain.set( waitingDrain.set(
blockIndex, blockIndex,
// @ts-ignore // @ts-ignore
waitingDrain.get(blockIndex).slice(drainsBefore.length) waitingDrain.get(blockIndex).slice(drainsBefore.length)
) )
}
write() // next! write() // next!
} }
@ -657,6 +663,8 @@ function Log(filename, opts) {
Record.write(blockBufNow, offsetInBlock, encodedData, newEmptyLength) Record.write(blockBufNow, offsetInBlock, encodedData, newEmptyLength)
blocksWithOverwritables.set(blockIndex, blockBufNow) blocksWithOverwritables.set(blockIndex, blockBufNow)
scheduleFlushOverwrites() scheduleFlushOverwrites()
// prettier-ignore
debug('Overwrote record at log offset %d, block %d, block offset %d', offset, blockIndex, offsetInBlock)
cb() cb()
}) })
} }
@ -740,7 +748,7 @@ function Log(filename, opts) {
function writeBlock(blockIndex, blockBuf) { function writeBlock(blockIndex, blockBuf) {
const blockStart = blockIndex * blockSize const blockStart = blockIndex * blockSize
// prettier-ignore // prettier-ignore
debug2('Writing block of size %d at log offset %d', blockBuf.length, blockStart) debug2('Writing block %d of size %d at log offset %d', blockIndex, blockBuf.length, blockStart)
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
rafNew.write( rafNew.write(
blockStart, blockStart,
@ -793,14 +801,12 @@ function Log(filename, opts) {
latestBlockIndexNew += 1 latestBlockIndexNew += 1
nextOffsetInBlockNew = 0 nextOffsetInBlockNew = 0
// prettier-ignore // prettier-ignore
debug2('New block created at log offset %d to fit new record', latestBlockIndexNew * blockSize) debug2('Block %d created for log offset %d to fit new record', latestBlockIndexNew, latestBlockIndexNew * blockSize)
} }
Record.write(latestBlockBufNew, nextOffsetInBlockNew, dataBuf) Record.write(latestBlockBufNew, nextOffsetInBlockNew, dataBuf)
debug2( // prettier-ignore
'New record written at log offset %d', debug2('Record copied into log offset %d, block %d, block offset %d', latestBlockIndexNew * blockSize + nextOffsetInBlockNew, latestBlockIndexNew, nextOffsetInBlockNew)
latestBlockIndexNew * blockSize + nextOffsetInBlockNew
)
nextOffsetInBlockNew += Record.size(dataBuf) nextOffsetInBlockNew += Record.size(dataBuf)
return promiseWriteBlock return promiseWriteBlock
}, },