log: implement compaction (sift-clone and rename)

Andre Staltz 2023-11-23 15:04:40 +02:00
parent d21c7ed697
commit a84dd297a5
No known key found for this signature in database
GPG Key ID: 9EDE23EA7E8A4890
5 changed files with 395 additions and 212 deletions
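
The compaction strategy named in the commit title ("sift-clone and rename") scans the existing log, copies only the live (non-deleted) records into a fresh <filename>.compacting file, fsyncs it, and then renames it over the original log. A minimal standalone sketch of that idea follows; it assumes a hypothetical newline-delimited record format where a deleted record is an empty line, whereas the real log below uses fixed-size blocks and a binary record codec:

// Sketch only, not the real implementation: simplified newline-delimited records.
const fs = require('fs')

async function compactSketch(filename) {
  const filenameNew = filename + '.compacting'
  const lines = (await fs.promises.readFile(filename, 'utf8')).split('\n')
  const live = lines.filter((line) => line.length > 0) // "sift": keep only live records
  await fs.promises.writeFile(filenameNew, live.join('\n')) // "clone" into a new file
  const fh = await fs.promises.open(filenameNew, 'r+')
  await fh.sync() // fsync before swapping, so the clone is durable
  await fh.close()
  await fs.promises.rename(filenameNew, filename) // atomic swap on the same filesystem
}

The rename is what makes the swap safe: readers see either the old log or the fully written new one, never a half-compacted file.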


@@ -62,11 +62,6 @@ function appendLargerThanBlockErr() {
   return new Error('Data to be appended is larger than block size')
 }
 
-function unexpectedTruncationErr() {
-  // prettier-ignore
-  return new Error('truncate() is trying to *increase* the log size, which is totally unexpected. There may be a logic bug in the log')
-}
-
 module.exports = {
   ErrorWithCode,
   nanOffsetErr,
@@ -77,5 +72,4 @@ module.exports = {
   compactWithMaxLiveStreamErr,
   overwriteLargerThanOld,
   appendLargerThanBlockErr,
-  unexpectedTruncationErr,
 }


@@ -1,12 +1,13 @@
 const fs = require('fs')
 const b4a = require('b4a')
+const p = require('promisify-tuple')
 const Cache = require('@alloc/quick-lru') // @ts-ignore
 const RAF = require('polyraf') // @ts-ignore
 const Obv = require('obz') // @ts-ignore
 const AtomicFile = require('atomic-file-rw') // @ts-ignore
 const debounce = require('lodash.debounce') // @ts-ignore
 const isBufferZero = require('is-buffer-zero') // @ts-ignore
-const debug = require('debug')('log') // @ts-ignore
+const debug = require('debug')('ppppp-db:log') // @ts-ignore
 const mutexify = require('mutexify')
 const {
@@ -14,14 +15,11 @@ const {
   nanOffsetErr,
   negativeOffsetErr,
   outOfBoundsOffsetErr,
-  // delDuringCompactErr,
   appendLargerThanBlockErr,
-  unexpectedTruncationErr,
   overwriteLargerThanOld,
-  // compactWithMaxLiveStreamErr,
+  delDuringCompactErr,
 } = require('./errors')
 const Record = require('./record')
-// const Compaction = require('./compaction') // FIXME:
 
 /**
  * @typedef {Buffer | Uint8Array} B4A
@@ -54,8 +52,8 @@ const Record = require('./record')
 /**
  * @template T
  * @typedef {T extends void ?
- *   (...args: [Error] | []) => void :
- *   (...args: [Error] | [null, T]) => void
+ *   (...args: [NodeJS.ErrnoException] | []) => void :
+ *   (...args: [NodeJS.ErrnoException] | [null, T]) => void
  * } CB
  */
@@ -72,7 +70,14 @@ const DEFAULT_BLOCK_SIZE = 65536
 const DEFAULT_WRITE_TIMEOUT = 250
 const DEFAULT_VALIDATE = () => true
 
-// const COMPACTION_PROGRESS_EMIT_INTERVAL = 1000 // FIXME:
+const COMPACTION_PROGRESS_START = { percent: 0, done: false }
+const COMPACTION_PROGRESS_END_EMPTY = {
+  percent: 1,
+  done: true,
+  sizeDiff: 0,
+  holesFound: 0,
+}
+const COMPACTION_PROGRESS_EMIT_INTERVAL = 500
 
 /**
  * @template [T=B4A]
@@ -88,7 +93,7 @@ function Log(filename, opts) {
   )
   const cache = new Cache({ maxSize: 1024 }) // This is potentially 64 MiB!
-  const raf = RAF(filename)
+  let raf = RAF(filename)
   const statsFilename = filename + '.stats'
   const blockSize = opts?.blockSize ?? DEFAULT_BLOCK_SIZE
   const codec = opts?.codec ?? DEFAULT_CODEC
@@ -116,56 +121,41 @@ function Log(filename, opts) {
   let nextOffsetInBlock = /** @type {number | null} */ (null)
   let deletedBytes = 0
   const since = Obv() // offset of last written record
-  // let compaction = null // FIXME:
-  // const compactionProgress = Obv()
-  // if (typeof window !== 'undefined') {
-  //   // fs sync not working in browser
-  //   compactionProgress.set({ percent: 1, done: true, sizeDiff: 0 })
-  // } else {
-  //   compactionProgress.set(
-  //     Compaction.stateFileExists(filename)
-  //       ? { percent: 0, done: false }
-  //       : { percent: 1, done: true, sizeDiff: 0 }
-  //   )
-  // }
-  // const waitingCompaction = []
-  // onLoad(function maybeResumeCompaction() {
-  //   // fs sync not working in browser
-  //   if (typeof window !== 'undefined') return
-  //   if (Compaction.stateFileExists(filename)) {
-  //     compact(function onCompactDone(err) {
-  //       if (err) throw err
-  //     })
-  //   }
-  // })()
+  let compacting = false
+  const compactionProgress = Obv()
+  compactionProgress.set(COMPACTION_PROGRESS_START)
+  /** @type {Array<CB<any>>} */
+  const waitingCompaction = []
 
   AtomicFile.readFile(
     statsFilename,
     'utf8',
-    /** @type {CB<string>} */
-    function statsUp(err, json) {
+    /** @type {CB<string>} */ function doneLoadingStatsFile(err, json) {
       if (err) {
-        debug('error loading stats: %s', err.message)
+        // prettier-ignore
+        if (err.code !== 'ENOENT') debug('Failed loading stats file: %s', err.message)
         deletedBytes = 0
       } else {
         try {
           const stats = JSON.parse(json)
           deletedBytes = stats.deletedBytes
         } catch (err) {
-          debug('error parsing stats: %s', /** @type {Error} */ (err).message)
+          // prettier-ignore
+          debug('Failed parsing stats file: %s', /** @type {Error} */ (err).message)
           deletedBytes = 0
         }
       }
 
       raf.stat(
-        /** @type {CB<{size: number}>} */
-        function onRAFStatDone(err, stat) {
-          if (err) debug('failed to stat ' + filename, err)
+        /** @type {CB<{size: number}>} */ function onRAFStatDone(err, stat) {
+          // prettier-ignore
+          if (err && err.code !== 'ENOENT') debug('Failed to read %s stats: %s', filename, err.message)
 
           const fileSize = stat ? stat.size : -1
 
           if (fileSize <= 0) {
-            debug('empty file')
+            debug('Opened log file, which is empty')
             latestBlockBuf = b4a.alloc(blockSize)
             latestBlockIndex = 0
             nextOffsetInBlock = 0
@@ -177,10 +167,13 @@ function Log(filename, opts) {
             const blockStart = fileSize - blockSize
             loadLatestBlock(blockStart, function onLoadedLatestBlock(err) {
               if (err) throw err
-              debug('opened file, since: %d', since.value)
+              debug('Opened log file, since: %d', since.value)
+              compact(function doneCompactingOnStartup(err) {
+                if (err) debug('Failed compacting on startup: %s', err.message)
                 // @ts-ignore
                 while (waitingLoad.length) waitingLoad.shift()()
               })
+            })
           }
         }
       )
@ -274,29 +267,6 @@ function Log(filename, opts) {
}) })
} }
/**
* @param {number} newSize
* @param {CB<void>} cb
*/
function truncateWithFSync(newSize, cb) {
writeLock(function onWriteLockReleasedForTruncate(unlock) {
raf.del(
newSize,
Infinity,
function onRAFDeleteDone(/** @type {Error | null} */ err) {
if (err) return unlock(cb, err)
if (raf.fd) {
fs.fsync(raf.fd, function onFSyncDoneForTruncate(err) {
if (err) unlock(cb, err)
else unlock(cb, null, undefined)
})
} else unlock(cb, null, undefined)
}
)
})
}
/** /**
* @param {B4A} blockBuf * @param {B4A} blockBuf
* @param {number} badOffsetInBlock * @param {number} badOffsetInBlock
@@ -305,7 +275,8 @@ function Log(filename, opts) {
    * @param {CB<number>} cb
    */
   function fixBlock(blockBuf, badOffsetInBlock, blockStart, successValue, cb) {
-    debug('found invalid record at %d, fixing last block', badOffsetInBlock)
+    // prettier-ignore
+    debug('Fixing block with an invalid record at block offset %d', badOffsetInBlock)
     blockBuf.fill(0, badOffsetInBlock, blockSize)
     writeWithFSync(blockStart, blockBuf, successValue, cb)
   }
@@ -341,11 +312,11 @@ function Log(filename, opts) {
     const blockIndex = getBlockIndex(offset)
 
     if (cache.has(blockIndex)) {
-      debug('getting offset %d from cache', offset)
+      debug('Reading block at log offset %d from cache', offset)
       const cachedBlockBuf = cache.get(blockIndex)
       cb(null, cachedBlockBuf)
     } else {
-      debug('getting offset %d from disc', offset)
+      debug('Reading block at log offset %d from disc', offset)
       const blockStart = getBlockStart(offset)
       raf.read(
         blockStart,
@@ -396,9 +367,10 @@ function Log(filename, opts) {
    *   * `>0`: next record within block
    * @param {Buffer} blockBuf
    * @param {number} offset
-   * @return {[number, extractCodecType<typeof codec> | null, number]}
+   * @param {boolean} asRaw
+   * @return {[number, extractCodecType<typeof codec> | B4A | null, number]}
    */
-  function getDataNextOffset(blockBuf, offset) {
+  function getDataNextOffset(blockBuf, offset, asRaw = false) {
     const offsetInBlock = getOffsetInBlock(offset)
     const [dataBuf, recSize, dataLength, emptyLength] = Record.read(
       blockBuf,
@@ -415,24 +387,30 @@ function Log(filename, opts) {
     }
 
     if (dataLength === 0 && emptyLength > 0) return [nextOffset, null, recSize]
-    else return [nextOffset, codec.decode(dataBuf), recSize]
+    else return [nextOffset, asRaw ? dataBuf : codec.decode(dataBuf), recSize]
   }
 
   /**
-   * @param {(offset: number, data: extractCodecType<typeof codec> | null, size: number) => void} onNext
+   * @param {(offset: number, data: extractCodecType<typeof codec> | null, size: number) => Promise<void> | void} onNext
    * @param {(error?: Error) => void} onDone
    */
-  function scan(onNext, onDone) {
+  function scan(onNext, onDone, asRaw = false) {
     let cursor = 0
     const gotNextBlock =
       /** @type {CB<B4A>} */
       (
-        (err, blockBuf) => {
+        async (err, blockBuf) => {
           if (err) return onDone(err)
           if (isBufferZero(blockBuf)) return onDone()
           while (true) {
-            const [offset, data, size] = getDataNextOffset(blockBuf, cursor)
-            onNext(cursor, data, size)
+            const [offset, data, size] = getDataNextOffset(
+              blockBuf,
+              cursor,
+              asRaw
+            )
+            // @ts-ignore
+            const promise = onNext(cursor, data, size)
+            if (promise) await promise
             if (offset === 0) {
               cursor = getNextBlockStart(cursor)
               getNextBlock()
@@ -457,10 +435,10 @@ function Log(filename, opts) {
    * @param {CB<void>} cb
    */
   function del(offset, cb) {
-    // if (compaction) { // FIXME:
-    //   cb(delDuringCompactErr())
-    //   return
-    // }
+    if (compacting) {
+      cb(delDuringCompactErr())
+      return
+    }
     const blockIndex = getBlockIndex(offset)
     if (blocksToBeWritten.has(blockIndex)) {
       onDrain(function delAfterDrained() {
@@ -520,9 +498,10 @@ function Log(filename, opts) {
       blockBuf,
       null,
       function flushedOverwrites(err, _) {
-        if (err) debug('error flushing overwrites with fsync: %s', err.message)
+        if (err) debug('Failed to flush overwrites with fsync: %s', err.message)
         saveStats(function onSavedStats(err, _) {
-          if (err) debug('error saving stats: %s', err.message)
+          // prettier-ignore
+          if (err) debug('Failed to save stats file after flushing overwrites: %s', err.message)
           flushingOverwrites = false
           if (err) {
             for (const cb of waitingFlushOverwrites) cb(err)
@@ -563,7 +542,8 @@ function Log(filename, opts) {
       latestBlockBuf = nextBlockBuf
       latestBlockIndex += 1
       nextOffsetInBlock = 0
-      debug("data doesn't fit current block, creating new")
+      // prettier-ignore
+      debug('New block created at log offset %d to fit new record', latestBlockIndex * blockSize)
     }
 
     assert(latestBlockBuf, 'latestBlockBuf not set')
@@ -576,7 +556,7 @@ function Log(filename, opts) {
     })
     nextOffsetInBlock += Record.size(encodedData)
     scheduleWrite()
-    debug('data inserted at offset %d', offset)
+    debug('New record written at log offset %d', offset)
     return offset
   }
@@ -585,10 +565,10 @@ function Log(filename, opts) {
    * @param {CB<number>} cb
    */
   function append(data, cb) {
-    // if (compaction) { // FIXME:
-    //   waitingCompaction.push(() => append(data, cb))
-    //   return
-    // }
+    if (compacting) {
+      waitingCompaction.push(() => append(data, cb))
+      return
+    }
     let offset
     try {
@@ -612,19 +592,19 @@ function Log(filename, opts) {
     blocksToBeWritten.delete(blockIndex)
     // prettier-ignore
-    debug('writing block of size: %d, to offset: %d', blockBuf.length, blockIndex * blockSize)
+    debug('Writing block of size %d at log offset %d', blockBuf.length, blockStart)
     writingBlockIndex = blockIndex
     writeWithFSync(blockStart, blockBuf, null, function onBlockWritten(err, _) {
       const drainsBefore = (waitingDrain.get(blockIndex) || []).slice(0)
       writingBlockIndex = -1
       if (err) {
-        debug('failed to write block %d', blockIndex)
+        debug('Failed to write block at log offset %d', blockStart)
         throw err
       } else {
         since.set(offset)
         // prettier-ignore
-        debug('draining the waiting queue for %d, items: %d', blockIndex, drainsBefore.length)
+        if (drainsBefore.length > 0) debug('Draining the waiting queue (%d functions) for the block at log offset %d', drainsBefore.length, blockStart)
         for (let i = 0; i < drainsBefore.length; ++i) drainsBefore[i]()
 
         // the resumed streams might have added more to waiting
@@ -635,7 +615,7 @@ function Log(filename, opts) {
         else
           waitingDrain.set(
             blockIndex,
-            // @ts-ignore // FIXME:
+            // @ts-ignore
             waitingDrain.get(blockIndex).slice(drainsBefore.length)
           )
@@ -644,17 +624,6 @@ function Log(filename, opts) {
     })
   }
 
-  /**
-   * @param {number} blockIndex
-   * @param {B4A } blockBuf
-   * @param {CB<null>} cb
-   */
-  function overwriteBlock(blockIndex, blockBuf, cb) {
-    cache.set(blockIndex, blockBuf)
-    const blockStart = blockIndex * blockSize
-    writeWithFSync(blockStart, blockBuf, null, cb)
-  }
-
   /**
    * @param {number} offset
    * @param {extractCodecType<typeof codec>} data
@@ -694,39 +663,6 @@ function Log(filename, opts) {
     })
   }
 
-  /**
-   * @param {number} newLatestBlockIndex
-   * @param {CB<number>} cb
-   */
-  function truncate(newLatestBlockIndex, cb) {
-    assert(typeof latestBlockIndex === 'number', 'latestBlockIndex not set')
-    if (newLatestBlockIndex > latestBlockIndex) {
-      return cb(unexpectedTruncationErr())
-    }
-    if (newLatestBlockIndex === latestBlockIndex) {
-      const blockStart = latestBlockIndex * blockSize
-      loadLatestBlock(blockStart, function onTruncateLoadedLatestBlock1(err) {
-        if (err) cb(err)
-        else cb(null, 0)
-      })
-      return
-    }
-    const size = (latestBlockIndex + 1) * blockSize
-    const newSize = (newLatestBlockIndex + 1) * blockSize
-    for (let i = newLatestBlockIndex + 1; i < latestBlockIndex; ++i) {
-      cache.delete(i)
-    }
-    truncateWithFSync(newSize, function onTruncateWithFSyncDone(err) {
-      if (err) return cb(err)
-      const blockStart = newSize - blockSize
-      loadLatestBlock(blockStart, function onTruncateLoadedLatestBlock2(err) {
-        if (err) return cb(err)
-        const sizeDiff = size - newSize
-        cb(null, sizeDiff)
-      })
-    })
-  }
-
   /**
    * @param {CB<{ totalBytes: number; deletedBytes: number }>} cb
    */
@@ -742,61 +678,165 @@ function Log(filename, opts) {
   }
 
   /**
-   * @param {CB<unknown>} cb
+   * @param {CB<void>} cb
    */
   function saveStats(cb) {
     const stats = JSON.stringify({ deletedBytes })
     AtomicFile.writeFile(statsFilename, stats, 'utf8', cb)
   }
 
+  /** @type {CB<void>} */
+  function logError(err) {
+    if (err) console.error(err)
+  }
+
   /**
-   * @param {CB<unknown>} cb
+   * Compaction is the process of removing deleted records from the log by
+   * creating a new log with only the undeleted records, and then atomically
+   * swapping the new log for the old one.
+   * @param {CB<void>?} cb
    */
-  // function compact(cb) { // FIXME:
-  //   if (compaction) {
-  //     debug('compaction already in progress')
-  //     waitingCompaction.push(cb)
-  //     return
-  //   }
-  //   for (const stream of self.streams) {
-  //     if (stream.live && (stream.max || stream.max_inclusive)) {
-  //       return cb(compactWithMaxLiveStreamErr())
-  //     }
-  //   }
-  //   onStreamsDone(function startCompactAfterStreamsDone() {
-  //     onDrain(function startCompactAfterDrain() {
-  //       onOverwritesFlushed(function startCompactAfterDeletes() {
-  //         if (compactionProgress.value.done) {
-  //           compactionProgress.set({ percent: 0, done: false })
-  //         }
-  //         compaction = new Compaction(self, (/** @type {any} */ err, /** @type {any} */ stats) => {
-  //           compaction = null
-  //           if (err) return cb(err)
-  //           deletedBytes = 0
-  //           saveStats(function onSavedStatsAfterCompaction(err) {
-  //             if (err)
-  //               debug('error saving stats after compaction: %s', err.message)
-  //           })
-  //           for (const stream of self.streams) {
-  //             if (stream.live) stream.postCompactionReset(since.value)
-  //           }
-  //           compactionProgress.set({ ...stats, percent: 1, done: true })
-  //           for (const callback of waitingCompaction) callback()
-  //           waitingCompaction.length = 0
-  //           cb()
-  //         })
-  //         let prevUpdate = 0
-  //         compaction.progress((/** @type {any} */ stats) => {
-  //           const now = Date.now()
-  //           if (now - prevUpdate > COMPACTION_PROGRESS_EMIT_INTERVAL) {
-  //             prevUpdate = now
-  //             compactionProgress.set({ ...stats, done: false })
-  //           }
-  //         })
-  //       })
-  //     })
-  //   })
-  // }
+  async function compact(cb) {
+    if (compacting) {
+      if (cb) waitingCompaction.push(cb)
+      return
+    }
+    cb ??= logError
+    const debug2 = debug.extend('compact')
+    if (deletedBytes === 0) {
+      debug2('Skipping compaction since there are no deleted bytes')
+      compactionProgress.set(COMPACTION_PROGRESS_END_EMPTY)
+      return cb()
+    }
+    await p(onDrain)()
+    const [err1] = await p(onOverwritesFlushed)()
+    if (err1) return cb(err1)
+
+    compacting = true
+    if (compactionProgress.value.done) {
+      compactionProgress.set(COMPACTION_PROGRESS_START)
+    }
+
+    const filenameNew = filename + '.compacting'
+    const [err2] = await p(fs.unlink.bind(fs))(filenameNew)
+    if (err2 && err2.code !== 'ENOENT') return cb(err2)
+
+    const rafNew = RAF(filenameNew)
+
+    /**
+     * @param {number} blockIndex
+     * @param {B4A} blockBuf
+     * @returns {Promise<void>}
+     */
+    function writeBlock(blockIndex, blockBuf) {
+      const blockStart = blockIndex * blockSize
+      // prettier-ignore
+      debug2('Writing block of size %d at log offset %d', blockBuf.length, blockStart)
+      return new Promise((resolve, reject) => {
+        rafNew.write(
+          blockStart,
+          blockBuf,
+          /** @type {CB<void>} */
+          function onCompactRAFWriteDone(err) {
+            if (err) return reject(err)
+            if (rafNew.fd) {
+              fs.fsync(rafNew.fd, function onCompactFSyncDone(err) {
+                if (err) reject(err)
+                else resolve()
+              })
+            } else resolve()
+          }
+        )
+      })
+    }
+
+    // Scan the old log and write blocks on the new log
+    let latestBlockBufNew = b4a.alloc(blockSize)
+    let latestBlockIndexNew = 0
+    let nextOffsetInBlockNew = 0
+    let holesFound = 0
+    let timestampLastEmit = Date.now()
+    const oldLogSize = since.value
+    const err3 = await new Promise((done) => {
+      scan(
+        function compactScanningRecord(oldOffset, data, size) {
+          const now = Date.now()
+          if (now - timestampLastEmit > COMPACTION_PROGRESS_EMIT_INTERVAL) {
+            timestampLastEmit = now
+            const percent = oldOffset / oldLogSize
+            compactionProgress.set({ percent, done: false })
+          }
+          if (!data) {
+            holesFound += 1
+            return
+          }
+          const dataBuf = /** @type {B4A} */ (/** @type {any} */ (data))
+          /** @type {Promise<void> | undefined} */
+          let promiseWriteBlock = void 0
+          if (hasNoSpaceFor(dataBuf, nextOffsetInBlockNew)) {
+            promiseWriteBlock = writeBlock(
+              latestBlockIndexNew,
+              latestBlockBufNew
+            )
+            latestBlockBufNew = b4a.alloc(blockSize)
+            latestBlockIndexNew += 1
+            nextOffsetInBlockNew = 0
+            // prettier-ignore
+            debug2('New block created at log offset %d to fit new record', latestBlockIndexNew * blockSize)
+          }
+          Record.write(latestBlockBufNew, nextOffsetInBlockNew, dataBuf)
+          debug2(
+            'New record written at log offset %d',
+            latestBlockIndexNew * blockSize + nextOffsetInBlockNew
+          )
+          nextOffsetInBlockNew += Record.size(dataBuf)
+          return promiseWriteBlock
+        },
+        done,
+        true
+      )
+    })
+    if (err3) {
+      await p(rafNew.close.bind(rafNew))()
+      compacting = false
+      return cb(err3)
+    }
+    await writeBlock(latestBlockIndexNew, latestBlockBufNew)
+
+    // Swap the new log for the old one
+    const [[err4], [err5]] = await Promise.all([
+      p(raf.close.bind(raf))(),
+      p(rafNew.close.bind(rafNew))(),
+    ])
+    if (err4 ?? err5) {
+      compacting = false
+      return cb(err4 ?? err5)
+    }
+    const [err6] = await p(fs.rename.bind(fs))(filenameNew, filename)
+    if (err6) {
+      compacting = false
+      return cb(err6)
+    }
+    raf = RAF(filename)
+    latestBlockBuf = latestBlockBufNew
+    latestBlockIndex = latestBlockIndexNew
+    nextOffsetInBlock = nextOffsetInBlockNew
+    cache.clear()
+    const nextSince = latestBlockIndex * blockSize + nextOffsetInBlock
+    const sizeDiff = oldLogSize - nextSince
+
+    since.set(nextSince)
+    compacting = false
+    deletedBytes = 0
+    saveStats(function onSavedStatsAfterCompaction(err) {
+      if (err) debug2('Failed to save stats file: %s', err.message)
+    })
+    compactionProgress.set({ percent: 1, done: true, sizeDiff, holesFound })
+    for (const callback of waitingCompaction) callback()
+    waitingCompaction.length = 0
+    cb()
+  }
 
   /**
   * @param {CB<unknown>} cb
@@ -827,13 +867,13 @@ function Log(filename, opts) {
   }
 
   /**
-   * @param {CallableFunction} fn
+   * @param {() => void} fn
    */
   function onDrain(fn) {
-    // if (compaction) { // FIXME:
-    //   waitingCompaction.push(fn)
-    //   return
-    // }
+    if (compacting) {
+      waitingCompaction.push(fn)
+      return
+    }
     if (blocksToBeWritten.size === 0 && writingBlockIndex === -1) fn()
     else {
       const latestBlockIndex = /** @type {number} */ (
@@ -865,18 +905,11 @@ function Log(filename, opts) {
     close: onLoad(close), // TODO
     onDrain: onLoad(onDrain), // TODO
     onOverwritesFlushed: onLoad(onOverwritesFlushed),
-    // compact: onLoad(compact), // FIXME:
-    // compactionProgress,
+    compact: onLoad(compact), // TODO
+    compactionProgress,
     since,
     stats, // TODO
-    // Internals needed by ./compaction.js:
-    filename,
-    blockSize,
-    overwriteBlock,
-    truncate,
-    hasNoSpaceFor,
 
     // Useful for tests
     _get: onLoad(get),
   }
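
For reference, this is roughly how a caller would drive the new API surface (compact and compactionProgress), in the same promisified style as the tests further below; the require path and log file path here are illustrative, not taken from the diff:

const p = require('node:util').promisify
const Log = require('./lib/log') // assumed path to this module

async function main() {
  const log = Log('/tmp/example.log', { blockSize: 65536 }) // illustrative file path
  log.compactionProgress((stats) => {
    // { percent, done } while running; { percent: 1, done: true, sizeDiff, holesFound } at the end
    console.log('compaction progress:', stats)
  })
  const offset = await p(log.append)(Buffer.from('hello'))
  await p(log.del)(offset)
  await p(log.onOverwritesFlushed)()
  await p(log.compact)() // sift-clone into <file>.compacting, then rename over the old log
  await p(log.close)()
}

main()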


@@ -42,6 +42,7 @@
     "ppppp-keypair": "github:staltz/ppppp-keypair",
     "polyraf": "^1.1.0",
     "promisify-4loc": "~1.0.0",
+    "promisify-tuple": "~1.2.0",
     "push-stream": "~11.2.0",
     "set.prototype.union": "~1.0.2"
   },

test/log/compact.test.js (new file, 162 lines)

@@ -0,0 +1,162 @@
const test = require('node:test')
const assert = require('node:assert')
const p = require('node:util').promisify
const Log = require('../../lib/log')

test('Log compaction', async (t) => {
  await t.test('compact a log that does not have holes', async (t) => {
    const file = '/tmp/ppppp-db-log-compaction-test-' + Date.now() + '.log'
    const log = Log(file, { blockSize: 15 })

    const stats = await p(log.stats)()
    assert.equal(stats.totalBytes, 0, 'stats.totalBytes (1)')
    assert.equal(stats.deletedBytes, 0, 'stats.deletedBytes (1)')

    const buf1 = Buffer.from('first')
    const buf2 = Buffer.from('second')
    const offset1 = await p(log.append)(buf1)
    const offset2 = await p(log.append)(buf2)
    await p(log.onDrain)()
    assert('append two records')

    const stats2 = await p(log.stats)()
    assert.equal(stats2.totalBytes, 15, 'stats.totalBytes (2)')
    assert.equal(stats2.deletedBytes, 0, 'stats.deletedBytes (2)')

    const progressArr = []
    log.compactionProgress((stats) => {
      progressArr.push(stats)
    })

    await p(log.compact)()
    await p(log.onDrain)()

    assert.deepEqual(
      progressArr,
      [
        { percent: 0, done: false },
        { percent: 1, done: true, sizeDiff: 0, holesFound: 0 },
      ],
      'progress events'
    )

    await new Promise((resolve, reject) => {
      const arr = []
      log.scan(
        (offset, data, size) => {
          arr.push(data)
        },
        (err) => {
          if (err) return reject(err)
          assert.deepEqual(arr, [buf1, buf2], 'both records exist')
          resolve()
        }
      )
    })

    await p(log.close)()
  })

  await t.test('delete first record, compact, stream', async (t) => {
    const file = '/tmp/ppppp-db-log-compaction-test-' + Date.now() + '.log'
    const log = Log(file, { blockSize: 15 })

    const buf1 = Buffer.from('first')
    const buf2 = Buffer.from('second')

    const progressArr = []
    log.compactionProgress((stats) => {
      progressArr.push(stats)
    })

    const offset1 = await p(log.append)(buf1)
    const offset2 = await p(log.append)(buf2)
    await p(log.onDrain)()
    assert('append two records')

    await p(log.del)(offset1)
    await p(log.onOverwritesFlushed)()
    assert('delete first record')

    await p(log.compact)()
    await p(log.onDrain)()

    assert.deepEqual(
      progressArr,
      [
        { percent: 0, done: false },
        { percent: 1, done: true, sizeDiff: 5, holesFound: 1 },
      ],
      'progress events'
    )

    await new Promise((resolve, reject) => {
      const arr = []
      log.scan(
        (offset, data, size) => {
          arr.push(data)
        },
        (err) => {
          if (err) return reject(err)
          assert.deepEqual(arr, [buf2], 'only second record exists')
          resolve()
        }
      )
    })

    await p(log.close)()
  })

  await t.test('delete last record, compact, stream', async (t) => {
    const file = '/tmp/ppppp-db-log-compaction-test-' + Date.now() + '.log'
    const log = Log(file, { blockSize: 15 })

    const buf1 = Buffer.from('first')
    const buf2 = Buffer.from('second')
    const buf3 = Buffer.from('third')

    const offset1 = await p(log.append)(buf1)
    const offset2 = await p(log.append)(buf2)
    const offset3 = await p(log.append)(buf3)
    await p(log.onDrain)()
    assert('append three records')

    await p(log.del)(offset3)
    await p(log.onOverwritesFlushed)()
    assert('delete third record')

    await new Promise((resolve, reject) => {
      const arr = []
      log.scan(
        (offset, data, size) => {
          arr.push(data)
        },
        (err) => {
          if (err) return reject(err)
          assert.deepEqual(arr, [buf1, buf2, null], 'all blocks')
          resolve()
        }
      )
    })

    await p(log.compact)()
    await p(log.onDrain)()

    await new Promise((resolve, reject) => {
      const arr = []
      log.scan(
        (offset, data, size) => {
          arr.push(data)
        },
        (err) => {
          if (err) return reject(err)
          assert.deepEqual(arr, [buf1, buf2], 'last block truncated away')
          resolve()
        }
      )
    })

    await p(log.close)()
  })
})


@@ -48,7 +48,7 @@ test('Log deletes', async (t) => {
     await p(log.close)()
   })
 
-  await t.test('Deleted records are not invalid upon re-opening', async (t) => {
+  await t.test('Deleted records auto-compacted upon re-opening', async (t) => {
     const file = '/tmp/ppppp-db-log-test-del-invalid.log'
     try {
       fs.unlinkSync(file)
@@ -94,19 +94,12 @@ test('Log deletes', async (t) => {
         },
         (err) => {
           assert.ifError(err)
-          assert.deepEqual(arr, [{ text: 'm0' }, null, { text: 'm2' }])
+          assert.deepEqual(arr, [{ text: 'm0' }, { text: 'm2' }])
           resolve()
         }
       )
     })
 
-    await assert.rejects(p(log2._get)(offset2), (err) => {
-      assert.ok(err)
-      assert.equal(err.message, 'Record has been deleted')
-      assert.equal(err.code, 'ERR_AAOL_DELETED_RECORD')
-      return true
-    })
-
     await p(log2.close)()
   })