diff --git a/lib/log/errors.js b/lib/log/errors.js index d505a51..83d26d7 100644 --- a/lib/log/errors.js +++ b/lib/log/errors.js @@ -62,11 +62,6 @@ function appendLargerThanBlockErr() { return new Error('Data to be appended is larger than block size') } -function unexpectedTruncationErr() { - // prettier-ignore - return new Error('truncate() is trying to *increase* the log size, which is totally unexpected. There may be a logic bug in the log') -} - module.exports = { ErrorWithCode, nanOffsetErr, @@ -77,5 +72,4 @@ module.exports = { compactWithMaxLiveStreamErr, overwriteLargerThanOld, appendLargerThanBlockErr, - unexpectedTruncationErr, } diff --git a/lib/log/index.js b/lib/log/index.js index d0e0434..4cbb35d 100644 --- a/lib/log/index.js +++ b/lib/log/index.js @@ -1,12 +1,13 @@ const fs = require('fs') const b4a = require('b4a') +const p = require('promisify-tuple') const Cache = require('@alloc/quick-lru') // @ts-ignore const RAF = require('polyraf') // @ts-ignore const Obv = require('obz') // @ts-ignore const AtomicFile = require('atomic-file-rw') // @ts-ignore const debounce = require('lodash.debounce') // @ts-ignore const isBufferZero = require('is-buffer-zero') // @ts-ignore -const debug = require('debug')('log') // @ts-ignore +const debug = require('debug')('ppppp-db:log') // @ts-ignore const mutexify = require('mutexify') const { @@ -14,14 +15,11 @@ const { nanOffsetErr, negativeOffsetErr, outOfBoundsOffsetErr, - // delDuringCompactErr, appendLargerThanBlockErr, - unexpectedTruncationErr, overwriteLargerThanOld, - // compactWithMaxLiveStreamErr, + delDuringCompactErr, } = require('./errors') const Record = require('./record') -// const Compaction = require('./compaction') // FIXME: /** * @typedef {Buffer | Uint8Array} B4A @@ -54,8 +52,8 @@ const Record = require('./record') /** * @template T * @typedef {T extends void ? - * (...args: [Error] | []) => void : - * (...args: [Error] | [null, T]) => void + * (...args: [NodeJS.ErrnoException] | []) => void : + * (...args: [NodeJS.ErrnoException] | [null, T]) => void * } CB */ @@ -72,7 +70,14 @@ const DEFAULT_BLOCK_SIZE = 65536 const DEFAULT_WRITE_TIMEOUT = 250 const DEFAULT_VALIDATE = () => true -// const COMPACTION_PROGRESS_EMIT_INTERVAL = 1000 // FIXME: +const COMPACTION_PROGRESS_START = { percent: 0, done: false } +const COMPACTION_PROGRESS_END_EMPTY = { + percent: 1, + done: true, + sizeDiff: 0, + holesFound: 0, +} +const COMPACTION_PROGRESS_EMIT_INTERVAL = 500 /** * @template [T=B4A] @@ -88,7 +93,7 @@ function Log(filename, opts) { ) const cache = new Cache({ maxSize: 1024 }) // This is potentially 64 MiB! - const raf = RAF(filename) + let raf = RAF(filename) const statsFilename = filename + '.stats' const blockSize = opts?.blockSize ?? DEFAULT_BLOCK_SIZE const codec = opts?.codec ?? DEFAULT_CODEC @@ -116,56 +121,41 @@ function Log(filename, opts) { let nextOffsetInBlock = /** @type {number | null} */ (null) let deletedBytes = 0 const since = Obv() // offset of last written record - // let compaction = null // FIXME: - // const compactionProgress = Obv() - // if (typeof window !== 'undefined') { - // // fs sync not working in browser - // compactionProgress.set({ percent: 1, done: true, sizeDiff: 0 }) - // } else { - // compactionProgress.set( - // Compaction.stateFileExists(filename) - // ? { percent: 0, done: false } - // : { percent: 1, done: true, sizeDiff: 0 } - // ) - // } - // const waitingCompaction = [] - // onLoad(function maybeResumeCompaction() { - // // fs sync not working in browser - // if (typeof window !== 'undefined') return - // if (Compaction.stateFileExists(filename)) { - // compact(function onCompactDone(err) { - // if (err) throw err - // }) - // } - // })() + + let compacting = false + const compactionProgress = Obv() + compactionProgress.set(COMPACTION_PROGRESS_START) + /** @type {Array>} */ + const waitingCompaction = [] AtomicFile.readFile( statsFilename, 'utf8', - /** @type {CB} */ - function statsUp(err, json) { + /** @type {CB} */ function doneLoadingStatsFile(err, json) { if (err) { - debug('error loading stats: %s', err.message) + // prettier-ignore + if (err.code !== 'ENOENT') debug('Failed loading stats file: %s', err.message) deletedBytes = 0 } else { try { const stats = JSON.parse(json) deletedBytes = stats.deletedBytes } catch (err) { - debug('error parsing stats: %s', /** @type {Error} */ (err).message) + // prettier-ignore + debug('Failed parsing stats file: %s', /** @type {Error} */ (err).message) deletedBytes = 0 } } raf.stat( - /** @type {CB<{size: number}>} */ - function onRAFStatDone(err, stat) { - if (err) debug('failed to stat ' + filename, err) + /** @type {CB<{size: number}>} */ function onRAFStatDone(err, stat) { + // prettier-ignore + if (err && err.code !== 'ENOENT') debug('Failed to read %s stats: %s', filename, err.message) const fileSize = stat ? stat.size : -1 if (fileSize <= 0) { - debug('empty file') + debug('Opened log file, which is empty') latestBlockBuf = b4a.alloc(blockSize) latestBlockIndex = 0 nextOffsetInBlock = 0 @@ -177,9 +167,12 @@ function Log(filename, opts) { const blockStart = fileSize - blockSize loadLatestBlock(blockStart, function onLoadedLatestBlock(err) { if (err) throw err - debug('opened file, since: %d', since.value) - // @ts-ignore - while (waitingLoad.length) waitingLoad.shift()() + debug('Opened log file, since: %d', since.value) + compact(function doneCompactingOnStartup(err) { + if (err) debug('Failed compacting on startup: %s', err.message) + // @ts-ignore + while (waitingLoad.length) waitingLoad.shift()() + }) }) } } @@ -274,29 +267,6 @@ function Log(filename, opts) { }) } - /** - * @param {number} newSize - * @param {CB} cb - */ - function truncateWithFSync(newSize, cb) { - writeLock(function onWriteLockReleasedForTruncate(unlock) { - raf.del( - newSize, - Infinity, - function onRAFDeleteDone(/** @type {Error | null} */ err) { - if (err) return unlock(cb, err) - - if (raf.fd) { - fs.fsync(raf.fd, function onFSyncDoneForTruncate(err) { - if (err) unlock(cb, err) - else unlock(cb, null, undefined) - }) - } else unlock(cb, null, undefined) - } - ) - }) - } - /** * @param {B4A} blockBuf * @param {number} badOffsetInBlock @@ -305,7 +275,8 @@ function Log(filename, opts) { * @param {CB} cb */ function fixBlock(blockBuf, badOffsetInBlock, blockStart, successValue, cb) { - debug('found invalid record at %d, fixing last block', badOffsetInBlock) + // prettier-ignore + debug('Fixing block with an invalid record at block offset %d', badOffsetInBlock) blockBuf.fill(0, badOffsetInBlock, blockSize) writeWithFSync(blockStart, blockBuf, successValue, cb) } @@ -341,11 +312,11 @@ function Log(filename, opts) { const blockIndex = getBlockIndex(offset) if (cache.has(blockIndex)) { - debug('getting offset %d from cache', offset) + debug('Reading block at log offset %d from cache', offset) const cachedBlockBuf = cache.get(blockIndex) cb(null, cachedBlockBuf) } else { - debug('getting offset %d from disc', offset) + debug('Reading block at log offset %d from disc', offset) const blockStart = getBlockStart(offset) raf.read( blockStart, @@ -396,9 +367,10 @@ function Log(filename, opts) { * * `>0`: next record within block * @param {Buffer} blockBuf * @param {number} offset - * @return {[number, extractCodecType | null, number]} + * @param {boolean} asRaw + * @return {[number, extractCodecType | B4A | null, number]} */ - function getDataNextOffset(blockBuf, offset) { + function getDataNextOffset(blockBuf, offset, asRaw = false) { const offsetInBlock = getOffsetInBlock(offset) const [dataBuf, recSize, dataLength, emptyLength] = Record.read( blockBuf, @@ -415,24 +387,30 @@ function Log(filename, opts) { } if (dataLength === 0 && emptyLength > 0) return [nextOffset, null, recSize] - else return [nextOffset, codec.decode(dataBuf), recSize] + else return [nextOffset, asRaw ? dataBuf : codec.decode(dataBuf), recSize] } /** - * @param {(offset: number, data: extractCodecType | null, size: number) => void} onNext + * @param {(offset: number, data: extractCodecType | null, size: number) => Promise | void} onNext * @param {(error?: Error) => void} onDone */ - function scan(onNext, onDone) { + function scan(onNext, onDone, asRaw = false) { let cursor = 0 const gotNextBlock = /** @type {CB} */ ( - (err, blockBuf) => { + async (err, blockBuf) => { if (err) return onDone(err) if (isBufferZero(blockBuf)) return onDone() while (true) { - const [offset, data, size] = getDataNextOffset(blockBuf, cursor) - onNext(cursor, data, size) + const [offset, data, size] = getDataNextOffset( + blockBuf, + cursor, + asRaw + ) + // @ts-ignore + const promise = onNext(cursor, data, size) + if (promise) await promise if (offset === 0) { cursor = getNextBlockStart(cursor) getNextBlock() @@ -457,10 +435,10 @@ function Log(filename, opts) { * @param {CB} cb */ function del(offset, cb) { - // if (compaction) { // FIXME: - // cb(delDuringCompactErr()) - // return - // } + if (compacting) { + cb(delDuringCompactErr()) + return + } const blockIndex = getBlockIndex(offset) if (blocksToBeWritten.has(blockIndex)) { onDrain(function delAfterDrained() { @@ -520,9 +498,10 @@ function Log(filename, opts) { blockBuf, null, function flushedOverwrites(err, _) { - if (err) debug('error flushing overwrites with fsync: %s', err.message) + if (err) debug('Failed to flush overwrites with fsync: %s', err.message) saveStats(function onSavedStats(err, _) { - if (err) debug('error saving stats: %s', err.message) + // prettier-ignore + if (err) debug('Failed to save stats file after flugshing overwrites: %s', err.message) flushingOverwrites = false if (err) { for (const cb of waitingFlushOverwrites) cb(err) @@ -563,7 +542,8 @@ function Log(filename, opts) { latestBlockBuf = nextBlockBuf latestBlockIndex += 1 nextOffsetInBlock = 0 - debug("data doesn't fit current block, creating new") + // prettier-ignore + debug('New block created at log offset %d to fit new record', latestBlockIndex * blockSize) } assert(latestBlockBuf, 'latestBlockBuf not set') @@ -576,7 +556,7 @@ function Log(filename, opts) { }) nextOffsetInBlock += Record.size(encodedData) scheduleWrite() - debug('data inserted at offset %d', offset) + debug('New record written at log offset %d', offset) return offset } @@ -585,10 +565,10 @@ function Log(filename, opts) { * @param {CB} cb */ function append(data, cb) { - // if (compaction) { // FIXME: - // waitingCompaction.push(() => append(data, cb)) - // return - // } + if (compacting) { + waitingCompaction.push(() => append(data, cb)) + return + } let offset try { @@ -612,19 +592,19 @@ function Log(filename, opts) { blocksToBeWritten.delete(blockIndex) // prettier-ignore - debug('writing block of size: %d, to offset: %d', blockBuf.length, blockIndex * blockSize) + debug('Writing block of size %d at log offset %d', blockBuf.length, blockStart) writingBlockIndex = blockIndex writeWithFSync(blockStart, blockBuf, null, function onBlockWritten(err, _) { const drainsBefore = (waitingDrain.get(blockIndex) || []).slice(0) writingBlockIndex = -1 if (err) { - debug('failed to write block %d', blockIndex) + debug('Failed to write block at log offset %d', blockStart) throw err } else { since.set(offset) // prettier-ignore - debug('draining the waiting queue for %d, items: %d', blockIndex, drainsBefore.length) + if (drainsBefore.length > 0) debug('Draining the waiting queue (%d functions) for the block at log offset %d', drainsBefore.length, blockStart) for (let i = 0; i < drainsBefore.length; ++i) drainsBefore[i]() // the resumed streams might have added more to waiting @@ -635,7 +615,7 @@ function Log(filename, opts) { else waitingDrain.set( blockIndex, - // @ts-ignore // FIXME: + // @ts-ignore waitingDrain.get(blockIndex).slice(drainsBefore.length) ) @@ -644,17 +624,6 @@ function Log(filename, opts) { }) } - /** - * @param {number} blockIndex - * @param {B4A } blockBuf - * @param {CB} cb - */ - function overwriteBlock(blockIndex, blockBuf, cb) { - cache.set(blockIndex, blockBuf) - const blockStart = blockIndex * blockSize - writeWithFSync(blockStart, blockBuf, null, cb) - } - /** * @param {number} offset * @param {extractCodecType} data @@ -694,39 +663,6 @@ function Log(filename, opts) { }) } - /** - * @param {number} newLatestBlockIndex - * @param {CB} cb - */ - function truncate(newLatestBlockIndex, cb) { - assert(typeof latestBlockIndex === 'number', 'latestBlockIndex not set') - if (newLatestBlockIndex > latestBlockIndex) { - return cb(unexpectedTruncationErr()) - } - if (newLatestBlockIndex === latestBlockIndex) { - const blockStart = latestBlockIndex * blockSize - loadLatestBlock(blockStart, function onTruncateLoadedLatestBlock1(err) { - if (err) cb(err) - else cb(null, 0) - }) - return - } - const size = (latestBlockIndex + 1) * blockSize - const newSize = (newLatestBlockIndex + 1) * blockSize - for (let i = newLatestBlockIndex + 1; i < latestBlockIndex; ++i) { - cache.delete(i) - } - truncateWithFSync(newSize, function onTruncateWithFSyncDone(err) { - if (err) return cb(err) - const blockStart = newSize - blockSize - loadLatestBlock(blockStart, function onTruncateLoadedLatestBlock2(err) { - if (err) return cb(err) - const sizeDiff = size - newSize - cb(null, sizeDiff) - }) - }) - } - /** * @param {CB<{ totalBytes: number; deletedBytes: number }>} cb */ @@ -742,61 +678,165 @@ function Log(filename, opts) { } /** - * @param {CB} cb + * @param {CB} cb */ function saveStats(cb) { const stats = JSON.stringify({ deletedBytes }) AtomicFile.writeFile(statsFilename, stats, 'utf8', cb) } + /** @type {CB} */ + function logError(err) { + if (err) console.error(err) + } + /** - * @param {CB} cb + * Compaction is the process of removing deleted records from the log by + * creating a new log with only the undeleted records, and then atomically + * swapping the new log for the old one. + * @param {CB?} cb */ - // function compact(cb) { // FIXME: - // if (compaction) { - // debug('compaction already in progress') - // waitingCompaction.push(cb) - // return - // } - // for (const stream of self.streams) { - // if (stream.live && (stream.max || stream.max_inclusive)) { - // return cb(compactWithMaxLiveStreamErr()) - // } - // } - // onStreamsDone(function startCompactAfterStreamsDone() { - // onDrain(function startCompactAfterDrain() { - // onOverwritesFlushed(function startCompactAfterDeletes() { - // if (compactionProgress.value.done) { - // compactionProgress.set({ percent: 0, done: false }) - // } - // compaction = new Compaction(self, (/** @type {any} */ err, /** @type {any} */ stats) => { - // compaction = null - // if (err) return cb(err) - // deletedBytes = 0 - // saveStats(function onSavedStatsAfterCompaction(err) { - // if (err) - // debug('error saving stats after compaction: %s', err.message) - // }) - // for (const stream of self.streams) { - // if (stream.live) stream.postCompactionReset(since.value) - // } - // compactionProgress.set({ ...stats, percent: 1, done: true }) - // for (const callback of waitingCompaction) callback() - // waitingCompaction.length = 0 - // cb() - // }) - // let prevUpdate = 0 - // compaction.progress((/** @type {any} */ stats) => { - // const now = Date.now() - // if (now - prevUpdate > COMPACTION_PROGRESS_EMIT_INTERVAL) { - // prevUpdate = now - // compactionProgress.set({ ...stats, done: false }) - // } - // }) - // }) - // }) - // }) - // } + async function compact(cb) { + if (compacting) { + if (cb) waitingCompaction.push(cb) + return + } + cb ??= logError + const debug2 = debug.extend('compact') + if (deletedBytes === 0) { + debug2('Skipping compaction since there are no deleted bytes') + compactionProgress.set(COMPACTION_PROGRESS_END_EMPTY) + return cb() + } + await p(onDrain)() + const [err1] = await p(onOverwritesFlushed)() + if (err1) return cb(err1) + + compacting = true + if (compactionProgress.value.done) { + compactionProgress.set(COMPACTION_PROGRESS_START) + } + + const filenameNew = filename + '.compacting' + const [err2] = await p(fs.unlink.bind(fs))(filenameNew) + if (err2 && err2.code !== 'ENOENT') return cb(err2) + + const rafNew = RAF(filenameNew) + + /** + * @param {number} blockIndex + * @param {B4A} blockBuf + * @returns {Promise} + */ + function writeBlock(blockIndex, blockBuf) { + const blockStart = blockIndex * blockSize + // prettier-ignore + debug2('Writing block of size %d at log offset %d', blockBuf.length, blockStart) + return new Promise((resolve, reject) => { + rafNew.write( + blockStart, + blockBuf, + /** @type {CB} */ + function onCompactRAFWriteDone(err) { + if (err) return reject(err) + if (rafNew.fd) { + fs.fsync(rafNew.fd, function onCompactFSyncDone(err) { + if (err) reject(err) + else resolve() + }) + } else resolve() + } + ) + }) + } + + // Scan the old log and write blocks on the new log + let latestBlockBufNew = b4a.alloc(blockSize) + let latestBlockIndexNew = 0 + let nextOffsetInBlockNew = 0 + let holesFound = 0 + let timestampLastEmit = Date.now() + const oldLogSize = since.value + const err3 = await new Promise((done) => { + scan( + function compactScanningRecord(oldOffset, data, size) { + const now = Date.now() + if (now - timestampLastEmit > COMPACTION_PROGRESS_EMIT_INTERVAL) { + timestampLastEmit = now + const percent = oldOffset / oldLogSize + compactionProgress.set({ percent, done: false }) + } + if (!data) { + holesFound += 1 + return + } + const dataBuf = /** @type {B4A} */ (/** @type {any} */ (data)) + /** @type {Promise | undefined} */ + let promiseWriteBlock = void 0 + + if (hasNoSpaceFor(dataBuf, nextOffsetInBlockNew)) { + promiseWriteBlock = writeBlock( + latestBlockIndexNew, + latestBlockBufNew + ) + latestBlockBufNew = b4a.alloc(blockSize) + latestBlockIndexNew += 1 + nextOffsetInBlockNew = 0 + // prettier-ignore + debug2('New block created at log offset %d to fit new record', latestBlockIndexNew * blockSize) + } + + Record.write(latestBlockBufNew, nextOffsetInBlockNew, dataBuf) + debug2( + 'New record written at log offset %d', + latestBlockIndexNew * blockSize + nextOffsetInBlockNew + ) + nextOffsetInBlockNew += Record.size(dataBuf) + return promiseWriteBlock + }, + done, + true + ) + }) + if (err3) { + await p(rafNew.close.bind(rafNew))() + compacting = false + return cb(err3) + } + await writeBlock(latestBlockIndexNew, latestBlockBufNew) + + // Swap the new log for the old one + const [[err4], [err5]] = await Promise.all([ + p(raf.close.bind(raf))(), + p(rafNew.close.bind(rafNew))(), + ]) + if (err4 ?? err5) { + compacting = false + return cb(err4 ?? err5) + } + const [err6] = await p(fs.rename.bind(fs))(filenameNew, filename) + if (err6) { + compacting = false + return cb(err6) + } + raf = RAF(filename) + latestBlockBuf = latestBlockBufNew + latestBlockIndex = latestBlockIndexNew + nextOffsetInBlock = nextOffsetInBlockNew + cache.clear() + const nextSince = latestBlockIndex * blockSize + nextOffsetInBlock + const sizeDiff = oldLogSize - nextSince + since.set(nextSince) + compacting = false + deletedBytes = 0 + saveStats(function onSavedStatsAfterCompaction(err) { + if (err) debug2('Failed to save stats file: %s', err.message) + }) + compactionProgress.set({ percent: 1, done: true, sizeDiff, holesFound }) + for (const callback of waitingCompaction) callback() + waitingCompaction.length = 0 + cb() + } /** * @param {CB} cb @@ -827,13 +867,13 @@ function Log(filename, opts) { } /** - * @param {CallableFunction} fn + * @param {() => void} fn */ function onDrain(fn) { - // if (compaction) { // FIXME: - // waitingCompaction.push(fn) - // return - // } + if (compacting) { + waitingCompaction.push(fn) + return + } if (blocksToBeWritten.size === 0 && writingBlockIndex === -1) fn() else { const latestBlockIndex = /** @type {number} */ ( @@ -865,18 +905,11 @@ function Log(filename, opts) { close: onLoad(close), // TODO onDrain: onLoad(onDrain), // TODO onOverwritesFlushed: onLoad(onOverwritesFlushed), - // compact: onLoad(compact), // FIXME: - // compactionProgress, + compact: onLoad(compact), // TODO + compactionProgress, since, stats, // TODO - // Internals needed by ./compaction.js: - filename, - blockSize, - overwriteBlock, - truncate, - hasNoSpaceFor, - // Useful for tests _get: onLoad(get), } diff --git a/package.json b/package.json index 1377aaf..f0aa3d6 100644 --- a/package.json +++ b/package.json @@ -42,6 +42,7 @@ "ppppp-keypair": "github:staltz/ppppp-keypair", "polyraf": "^1.1.0", "promisify-4loc": "~1.0.0", + "promisify-tuple": "~1.2.0", "push-stream": "~11.2.0", "set.prototype.union": "~1.0.2" }, diff --git a/test/log/compact.test.js b/test/log/compact.test.js new file mode 100644 index 0000000..3df1369 --- /dev/null +++ b/test/log/compact.test.js @@ -0,0 +1,162 @@ +const test = require('node:test') +const assert = require('node:assert') +const p = require('node:util').promisify +const Log = require('../../lib/log') + +test('Log compaction', async (t) => { + await t.test('compact a log that does not have holes', async (t) => { + const file = '/tmp/ppppp-db-log-compaction-test-' + Date.now() + '.log' + const log = Log(file, { blockSize: 15 }) + + const stats = await p(log.stats)() + assert.equal(stats.totalBytes, 0, 'stats.totalBytes (1)') + assert.equal(stats.deletedBytes, 0, 'stats.deletedBytes (1)') + + const buf1 = Buffer.from('first') + const buf2 = Buffer.from('second') + + const offset1 = await p(log.append)(buf1) + const offset2 = await p(log.append)(buf2) + await p(log.onDrain)() + assert('append two records') + + const stats2 = await p(log.stats)() + assert.equal(stats2.totalBytes, 15, 'stats.totalBytes (2)') + assert.equal(stats2.deletedBytes, 0, 'stats.deletedBytes (2)') + + const progressArr = [] + log.compactionProgress((stats) => { + progressArr.push(stats) + }) + + await p(log.compact)() + await p(log.onDrain)() + + assert.deepEqual( + progressArr, + [ + { percent: 0, done: false }, + { percent: 1, done: true, sizeDiff: 0, holesFound: 0 }, + ], + 'progress events' + ) + + await new Promise((resolve, reject) => { + const arr = [] + log.scan( + (offset, data, size) => { + arr.push(data) + }, + (err) => { + if (err) return reject(err) + assert.deepEqual(arr, [buf1, buf2], 'both records exist') + resolve() + } + ) + }) + + await p(log.close)() + }) + + await t.test('delete first record, compact, stream', async (t) => { + const file = '/tmp/ppppp-db-log-compaction-test-' + Date.now() + '.log' + const log = Log(file, { blockSize: 15 }) + + const buf1 = Buffer.from('first') + const buf2 = Buffer.from('second') + + const progressArr = [] + log.compactionProgress((stats) => { + progressArr.push(stats) + }) + + const offset1 = await p(log.append)(buf1) + const offset2 = await p(log.append)(buf2) + await p(log.onDrain)() + assert('append two records') + + await p(log.del)(offset1) + await p(log.onOverwritesFlushed)() + assert('delete first record') + + await p(log.compact)() + await p(log.onDrain)() + + assert.deepEqual( + progressArr, + [ + { percent: 0, done: false }, + { percent: 1, done: true, sizeDiff: 5, holesFound: 1 }, + ], + 'progress events' + ) + + await new Promise((resolve, reject) => { + const arr = [] + log.scan( + (offset, data, size) => { + arr.push(data) + }, + (err) => { + if (err) return reject(err) + assert.deepEqual(arr, [buf2], 'only second record exists') + resolve() + } + ) + }) + + await p(log.close)() + }) + + await t.test('delete last record, compact, stream', async (t) => { + const file = '/tmp/ppppp-db-log-compaction-test-' + Date.now() + '.log' + const log = Log(file, { blockSize: 15 }) + + const buf1 = Buffer.from('first') + const buf2 = Buffer.from('second') + const buf3 = Buffer.from('third') + + const offset1 = await p(log.append)(buf1) + const offset2 = await p(log.append)(buf2) + const offset3 = await p(log.append)(buf3) + await p(log.onDrain)() + assert('append three records') + + await p(log.del)(offset3) + await p(log.onOverwritesFlushed)() + assert('delete third record') + + await new Promise((resolve, reject) => { + const arr = [] + log.scan( + (offset, data, size) => { + arr.push(data) + }, + (err) => { + if (err) return reject(err) + assert.deepEqual(arr, [buf1, buf2, null], 'all blocks') + resolve() + } + ) + }) + + await p(log.compact)() + await p(log.onDrain)() + + await new Promise((resolve, reject) => { + const arr = [] + log.scan( + (offset, data, size) => { + arr.push(data) + }, + (err) => { + if (err) return reject(err) + assert.deepEqual(arr, [buf1, buf2], 'last block truncated away') + resolve() + } + ) + }) + + await p(log.close)() + }) +}) diff --git a/test/log/delete.test.js b/test/log/delete.test.js index 7505280..17f58e8 100644 --- a/test/log/delete.test.js +++ b/test/log/delete.test.js @@ -48,7 +48,7 @@ test('Log deletes', async (t) => { await p(log.close)() }) - await t.test('Deleted records are not invalid upon re-opening', async (t) => { + await t.test('Deleted records auto-compacted upon re-opening', async (t) => { const file = '/tmp/ppppp-db-log-test-del-invalid.log' try { fs.unlinkSync(file) @@ -94,19 +94,12 @@ test('Log deletes', async (t) => { }, (err) => { assert.ifError(err) - assert.deepEqual(arr, [{ text: 'm0' }, null, { text: 'm2' }]) + assert.deepEqual(arr, [{ text: 'm0' }, { text: 'm2' }]) resolve() } ) }) - await assert.rejects(p(log2._get)(offset2), (err) => { - assert.ok(err) - assert.equal(err.message, 'Record has been deleted') - assert.equal(err.code, 'ERR_AAOL_DELETED_RECORD') - return true - }) - await p(log2.close)() })