diff --git a/lib/log/index.js b/lib/log/index.js
index c21045f..46fcc87 100644
--- a/lib/log/index.js
+++ b/lib/log/index.js
@@ -103,12 +103,12 @@ function Log(filename, opts) {
   /** @type {Map>} */
   const waitingDrain = new Map() // blockIndex -> []
   /** @type {Array>} */
-  const waitingFlushDelete = []
+  const waitingFlushOverwrites = []
   /** @type {Map} */
   const blocksToBeWritten = new Map() // blockIndex -> { blockBuf, offset }
   /** @type {Map} */
-  const blocksWithDeletables = new Map() // blockIndex -> blockBuf
-  let flushingDelete = false
+  const blocksWithOverwritables = new Map() // blockIndex -> blockBuf
+  let flushingOverwrites = false
   let writingBlockIndex = -1
   let latestBlockBuf = /** @type {B4A | null} */ (null)
@@ -473,18 +473,20 @@ function Log(filename, opts) {
       (err, blockBuf) => {
         if (err) return cb(err)
         assert(blockBuf, 'blockBuf should be defined in gotBlockForDelete')
-        const actualBlockBuf = blocksWithDeletables.get(blockIndex) ?? blockBuf
+        const blockBufNow = blocksWithOverwritables.get(blockIndex) ?? blockBuf
         const offsetInBlock = getOffsetInBlock(offset)
-        Record.overwriteAsEmpty(actualBlockBuf, offsetInBlock)
-        deletedBytes += Record.readSize(actualBlockBuf, offsetInBlock)
-        blocksWithDeletables.set(blockIndex, actualBlockBuf)
-        scheduleFlushDelete()
+        Record.overwriteAsEmpty(blockBufNow, offsetInBlock)
+        deletedBytes += Record.readSize(blockBufNow, offsetInBlock)
+        blocksWithOverwritables.set(blockIndex, blockBufNow)
+        scheduleFlushOverwrites()
         cb()
       }
     )
-    if (blocksWithDeletables.has(blockIndex)) {
-      const blockBuf = /** @type {any} */ (blocksWithDeletables.get(blockIndex))
+    if (blocksWithOverwritables.has(blockIndex)) {
+      const blockBuf = /** @type {any} */ (
+        blocksWithOverwritables.get(blockIndex)
+      )
       gotBlockForDelete(null, blockBuf)
     } else {
       getBlock(offset, gotBlockForDelete)
@@ -499,40 +501,46 @@ function Log(filename, opts) {
     return offsetInBlock + Record.size(dataBuf) + Record.EOB_SIZE > blockSize
   }

-  const scheduleFlushDelete = debounce(flushDelete, writeTimeout)
+  const scheduleFlushOverwrites = debounce(flushOverwrites, writeTimeout)

-  function flushDelete() {
-    if (blocksWithDeletables.size === 0) {
-      for (const cb of waitingFlushDelete) cb()
-      waitingFlushDelete.length = 0
+  function flushOverwrites() {
+    if (blocksWithOverwritables.size === 0) {
+      for (const cb of waitingFlushOverwrites) cb()
+      waitingFlushOverwrites.length = 0
       return
     }
-    const blockIndex = blocksWithDeletables.keys().next().value
+    const blockIndex = blocksWithOverwritables.keys().next().value
     const blockStart = blockIndex * blockSize
-    const blockBuf = blocksWithDeletables.get(blockIndex)
-    blocksWithDeletables.delete(blockIndex)
-    flushingDelete = true
+    const blockBuf = blocksWithOverwritables.get(blockIndex)
+    blocksWithOverwritables.delete(blockIndex)
+    flushingOverwrites = true

-    writeWithFSync(blockStart, blockBuf, null, function flushedDelete(err, _) {
-      saveStats(function onSavedStats(err, _) {
-        if (err) debug('error saving stats: %s', err.message)
-        flushingDelete = false
-        if (err) {
-          for (const cb of waitingFlushDelete) cb(err)
-          waitingFlushDelete.length = 0
-          return
-        }
-        flushDelete() // next
-      })
-    })
+    writeWithFSync(
+      blockStart,
+      blockBuf,
+      null,
+      function flushedOverwrites(err, _) {
+        if (err) debug('error flushing overwrites with fsync: %s', err.message)
+        saveStats(function onSavedStats(err, _) {
+          if (err) debug('error saving stats: %s', err.message)
+          flushingOverwrites = false
+          if (err) {
+            for (const cb of waitingFlushOverwrites) cb(err)
+            waitingFlushOverwrites.length = 0
+            return
+          }
+          flushOverwrites() // next
+        })
+      }
+    )
   }

   /**
    * @param {CB} cb
    */
-  function onDeletesFlushed(cb) {
-    if (flushingDelete || blocksWithDeletables.size > 0) {
-      waitingFlushDelete.push(cb)
+  function onOverwritesFlushed(cb) {
+    if (flushingOverwrites || blocksWithOverwritables.size > 0) {
+      waitingFlushOverwrites.push(cb)
     } else cb()
   }
@@ -650,7 +658,7 @@ function Log(filename, opts) {
   /**
    * @param {number} offset
    * @param {extractCodecType} data
-   * @param {CB} cb
+   * @param {CB} cb
    */
   function overwrite(offset, data, cb) {
     let encodedData = codec.encode(data)
@@ -659,6 +667,7 @@ function Log(filename, opts) {
     assert(typeof latestBlockIndex === 'number', 'latestBlockIndex not set')
     assert(typeof nextOffsetInBlock === 'number', 'nextOffsetInBlock not set')
     const logSize = latestBlockIndex * blockSize + nextOffsetInBlock
+    const blockIndex = getBlockIndex(offset)
     if (typeof offset !== 'number') return cb(nanOffsetErr(offset))
     if (isNaN(offset)) return cb(nanOffsetErr(offset))
     if (offset < 0) return cb(negativeOffsetErr(offset))
@@ -667,9 +676,10 @@ function Log(filename, opts) {
     // Get the existing record at offset
     getBlock(offset, function gotBlock(err, blockBuf) {
       if (err) return cb(err)
+      const blockBufNow = blocksWithOverwritables.get(blockIndex) ?? blockBuf
       const offsetInBlock = getOffsetInBlock(offset)
-      const oldDataLength = Record.readDataLength(blockBuf, offsetInBlock)
-      const oldEmptyLength = Record.readEmptyLength(blockBuf, offsetInBlock)
+      const oldDataLength = Record.readDataLength(blockBufNow, offsetInBlock)
+      const oldEmptyLength = Record.readEmptyLength(blockBufNow, offsetInBlock)
       // Make sure encodedData fits inside existing record
       if (Record.size(encodedData) > oldDataLength + oldEmptyLength) {
         return cb(overwriteLargerThanOld())
@@ -677,10 +687,10 @@ function Log(filename, opts) {
       const newEmptyLength = oldDataLength - encodedData.length
      deletedBytes += newEmptyLength
       // write
-      Record.write(blockBuf, offsetInBlock, encodedData, newEmptyLength)
-
-      const blockStart = getBlockStart(offset)
-      writeWithFSync(blockStart, blockBuf, null, cb)
+      Record.write(blockBufNow, offsetInBlock, encodedData, newEmptyLength)
+      blocksWithOverwritables.set(blockIndex, blockBufNow)
+      scheduleFlushOverwrites()
+      cb()
     })
   }
@@ -755,7 +765,7 @@ function Log(filename, opts) {
   //   }
   //   onStreamsDone(function startCompactAfterStreamsDone() {
   //     onDrain(function startCompactAfterDrain() {
-  //       onDeletesFlushed(function startCompactAfterDeletes() {
+  //       onOverwritesFlushed(function startCompactAfterDeletes() {
   //         if (compactionProgress.value.done) {
   //           compactionProgress.set({ percent: 0, done: false })
   //         }
@@ -793,7 +803,7 @@ function Log(filename, opts) {
    */
   function close(cb) {
     onDrain(function closeAfterHavingDrained() {
-      onDeletesFlushed(function closeAfterDeletesFlushed() {
+      onOverwritesFlushed(function closeAfterOverwritesFlushed() {
         raf.close(cb)
       })
     })
@@ -854,7 +864,7 @@ function Log(filename, opts) {
    overwrite: onLoad(overwrite), // TODO
    close: onLoad(close), // TODO
    onDrain: onLoad(onDrain), // TODO
-    onDeletesFlushed: onLoad(onDeletesFlushed),
+    onOverwritesFlushed: onLoad(onOverwritesFlushed),
    // compact: onLoad(compact), // FIXME:
    // compactionProgress,
    since,
diff --git a/test/log/delete.test.js b/test/log/delete.test.js
index d783d81..7505280 100644
--- a/test/log/delete.test.js
+++ b/test/log/delete.test.js
@@ -37,7 +37,7 @@ test('Log deletes', async (t) => {
   assert.equal(buf3.toString(), msg3.toString())

   await p(log.del)(offset2)
-  await p(log.onDeletesFlushed)()
+  await p(log.onOverwritesFlushed)()
   await assert.rejects(p(log._get)(offset2), (err) => {
     assert.ok(err)
     assert.equal(err.message, 'Record has been deleted')
@@ -80,7 +80,7 @@ test('Log deletes', async (t) => {
   const offset3 = await p(log.append)({ text: 'm2' })

   await p(log.del)(offset2)
-  await p(log.onDeletesFlushed)()
+  await p(log.onOverwritesFlushed)()

   await p(log.close)()
@@ -122,7 +122,7 @@ test('Log deletes', async (t) => {
   await p(log.del)(offset1)
   await p(log.onDrain)()
-  await p(log.onDeletesFlushed)()
+  await p(log.onOverwritesFlushed)()

   const arr = []
   await new Promise((resolve) => {
@@ -166,7 +166,7 @@ test('Log deletes', async (t) => {
   if (process.env.VERBOSE) console.timeEnd('delete ' + TOTAL / 2)
   assert('deleted messages')

-  await p(log.onDeletesFlushed)()
+  await p(log.onOverwritesFlushed)()

   await new Promise((resolve) => {
     let i = 0
diff --git a/test/log/overwrite.test.js b/test/log/overwrite.test.js
index c0dfc3e..6ff1d8c 100644
--- a/test/log/overwrite.test.js
+++ b/test/log/overwrite.test.js
@@ -26,6 +26,7 @@ test('Log overwrites', async (t) => {
   assert.equal(buf2.toString(), msg2.toString())

   await p(log.overwrite)(offset1, Buffer.from('hi world'))
+  await p(log.onOverwritesFlushed)()
   const buf = await p(log._get)(offset1)
   assert.equal(buf.toString(), 'hi world')
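
With this patch, overwrite() no longer fsyncs before calling back: it stages the modified block in blocksWithOverwritables and lets the debounced flushOverwrites() persist it, so callers that need the new bytes on disk must await onOverwritesFlushed(), as the updated overwrite test does. A minimal caller sketch, assuming Log is the factory exported from lib/log/index.js, that p is Node's util.promisify as in the tests, and that the file path and options below are purely illustrative:

const { promisify: p } = require('node:util')
const Log = require('./lib/log') // assumed entry point for lib/log/index.js

async function demo() {
  // Hypothetical filename and empty options object, for illustration only.
  const log = Log('/tmp/overwrite-demo.log', {})

  const offset = await p(log.append)(Buffer.from('hello world'))
  await p(log.onDrain)()

  // overwrite() now only stages the change in blocksWithOverwritables and
  // calls back without waiting for the disk write; the debounced
  // flushOverwrites() later writes the block with fsync.
  await p(log.overwrite)(offset, Buffer.from('hi world'))

  // Wait for staged overwrites (and deletes) to reach disk before reading back.
  await p(log.onOverwritesFlushed)()

  // _get is the internal read helper used by the tests.
  const buf = await p(log._get)(offset)
  console.log(buf.toString()) // 'hi world'

  await p(log.close)()
}

demo().catch(console.error)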