log: improve overwrite() with scheduled flushes

This commit is contained in:
Andre Staltz 2023-11-10 14:50:22 +02:00
parent 2f527613c2
commit 9e7feb3d41
No known key found for this signature in database
GPG Key ID: 9EDE23EA7E8A4890
3 changed files with 59 additions and 48 deletions

View File

@ -103,12 +103,12 @@ function Log(filename, opts) {
/** @type {Map<BlockIndex, Array<CallableFunction>>} */
const waitingDrain = new Map() // blockIndex -> []
/** @type {Array<CB<any>>} */
const waitingFlushDelete = []
const waitingFlushOverwrites = []
/** @type {Map<BlockIndex, {blockBuf: B4A; offset: number}>} */
const blocksToBeWritten = new Map() // blockIndex -> { blockBuf, offset }
/** @type {Map<BlockIndex, B4A>} */
const blocksWithDeletables = new Map() // blockIndex -> blockBuf
let flushingDelete = false
const blocksWithOverwritables = new Map() // blockIndex -> blockBuf
let flushingOverwrites = false
let writingBlockIndex = -1
let latestBlockBuf = /** @type {B4A | null} */ (null)
@ -473,18 +473,20 @@ function Log(filename, opts) {
(err, blockBuf) => {
if (err) return cb(err)
assert(blockBuf, 'blockBuf should be defined in gotBlockForDelete')
const actualBlockBuf = blocksWithDeletables.get(blockIndex) ?? blockBuf
const blockBufNow = blocksWithOverwritables.get(blockIndex) ?? blockBuf
const offsetInBlock = getOffsetInBlock(offset)
Record.overwriteAsEmpty(actualBlockBuf, offsetInBlock)
deletedBytes += Record.readSize(actualBlockBuf, offsetInBlock)
blocksWithDeletables.set(blockIndex, actualBlockBuf)
scheduleFlushDelete()
Record.overwriteAsEmpty(blockBufNow, offsetInBlock)
deletedBytes += Record.readSize(blockBufNow, offsetInBlock)
blocksWithOverwritables.set(blockIndex, blockBufNow)
scheduleFlushOverwrites()
cb()
}
)
if (blocksWithDeletables.has(blockIndex)) {
const blockBuf = /** @type {any} */ (blocksWithDeletables.get(blockIndex))
if (blocksWithOverwritables.has(blockIndex)) {
const blockBuf = /** @type {any} */ (
blocksWithOverwritables.get(blockIndex)
)
gotBlockForDelete(null, blockBuf)
} else {
getBlock(offset, gotBlockForDelete)
@ -499,40 +501,46 @@ function Log(filename, opts) {
return offsetInBlock + Record.size(dataBuf) + Record.EOB_SIZE > blockSize
}
const scheduleFlushDelete = debounce(flushDelete, writeTimeout)
const scheduleFlushOverwrites = debounce(flushOverwrites, writeTimeout)
function flushDelete() {
if (blocksWithDeletables.size === 0) {
for (const cb of waitingFlushDelete) cb()
waitingFlushDelete.length = 0
function flushOverwrites() {
if (blocksWithOverwritables.size === 0) {
for (const cb of waitingFlushOverwrites) cb()
waitingFlushOverwrites.length = 0
return
}
const blockIndex = blocksWithDeletables.keys().next().value
const blockIndex = blocksWithOverwritables.keys().next().value
const blockStart = blockIndex * blockSize
const blockBuf = blocksWithDeletables.get(blockIndex)
blocksWithDeletables.delete(blockIndex)
flushingDelete = true
const blockBuf = blocksWithOverwritables.get(blockIndex)
blocksWithOverwritables.delete(blockIndex)
flushingOverwrites = true
writeWithFSync(blockStart, blockBuf, null, function flushedDelete(err, _) {
writeWithFSync(
blockStart,
blockBuf,
null,
function flushedOverwrites(err, _) {
if (err) debug('error flushing overwrites with fsync: %s', err.message)
saveStats(function onSavedStats(err, _) {
if (err) debug('error saving stats: %s', err.message)
flushingDelete = false
flushingOverwrites = false
if (err) {
for (const cb of waitingFlushDelete) cb(err)
waitingFlushDelete.length = 0
for (const cb of waitingFlushOverwrites) cb(err)
waitingFlushOverwrites.length = 0
return
}
flushDelete() // next
})
flushOverwrites() // next
})
}
)
}
/**
* @param {CB<void>} cb
*/
function onDeletesFlushed(cb) {
if (flushingDelete || blocksWithDeletables.size > 0) {
waitingFlushDelete.push(cb)
function onOverwritesFlushed(cb) {
if (flushingOverwrites || blocksWithOverwritables.size > 0) {
waitingFlushOverwrites.push(cb)
} else cb()
}
@ -650,7 +658,7 @@ function Log(filename, opts) {
/**
* @param {number} offset
* @param {extractCodecType<typeof codec>} data
* @param {CB<null>} cb
* @param {CB<void>} cb
*/
function overwrite(offset, data, cb) {
let encodedData = codec.encode(data)
@ -659,6 +667,7 @@ function Log(filename, opts) {
assert(typeof latestBlockIndex === 'number', 'latestBlockIndex not set')
assert(typeof nextOffsetInBlock === 'number', 'nextOffsetInBlock not set')
const logSize = latestBlockIndex * blockSize + nextOffsetInBlock
const blockIndex = getBlockIndex(offset)
if (typeof offset !== 'number') return cb(nanOffsetErr(offset))
if (isNaN(offset)) return cb(nanOffsetErr(offset))
if (offset < 0) return cb(negativeOffsetErr(offset))
@ -667,9 +676,10 @@ function Log(filename, opts) {
// Get the existing record at offset
getBlock(offset, function gotBlock(err, blockBuf) {
if (err) return cb(err)
const blockBufNow = blocksWithOverwritables.get(blockIndex) ?? blockBuf
const offsetInBlock = getOffsetInBlock(offset)
const oldDataLength = Record.readDataLength(blockBuf, offsetInBlock)
const oldEmptyLength = Record.readEmptyLength(blockBuf, offsetInBlock)
const oldDataLength = Record.readDataLength(blockBufNow, offsetInBlock)
const oldEmptyLength = Record.readEmptyLength(blockBufNow, offsetInBlock)
// Make sure encodedData fits inside existing record
if (Record.size(encodedData) > oldDataLength + oldEmptyLength) {
return cb(overwriteLargerThanOld())
@ -677,10 +687,10 @@ function Log(filename, opts) {
const newEmptyLength = oldDataLength - encodedData.length
deletedBytes += newEmptyLength
// write
Record.write(blockBuf, offsetInBlock, encodedData, newEmptyLength)
const blockStart = getBlockStart(offset)
writeWithFSync(blockStart, blockBuf, null, cb)
Record.write(blockBufNow, offsetInBlock, encodedData, newEmptyLength)
blocksWithOverwritables.set(blockIndex, blockBufNow)
scheduleFlushOverwrites()
cb()
})
}
@ -755,7 +765,7 @@ function Log(filename, opts) {
// }
// onStreamsDone(function startCompactAfterStreamsDone() {
// onDrain(function startCompactAfterDrain() {
// onDeletesFlushed(function startCompactAfterDeletes() {
// onOverwritesFlushed(function startCompactAfterDeletes() {
// if (compactionProgress.value.done) {
// compactionProgress.set({ percent: 0, done: false })
// }
@ -793,7 +803,7 @@ function Log(filename, opts) {
*/
function close(cb) {
onDrain(function closeAfterHavingDrained() {
onDeletesFlushed(function closeAfterDeletesFlushed() {
onOverwritesFlushed(function closeAfterOverwritesFlushed() {
raf.close(cb)
})
})
@ -854,7 +864,7 @@ function Log(filename, opts) {
overwrite: onLoad(overwrite), // TODO
close: onLoad(close), // TODO
onDrain: onLoad(onDrain), // TODO
onDeletesFlushed: onLoad(onDeletesFlushed),
onOverwritesFlushed: onLoad(onOverwritesFlushed),
// compact: onLoad(compact), // FIXME:
// compactionProgress,
since,

View File

@ -37,7 +37,7 @@ test('Log deletes', async (t) => {
assert.equal(buf3.toString(), msg3.toString())
await p(log.del)(offset2)
await p(log.onDeletesFlushed)()
await p(log.onOverwritesFlushed)()
await assert.rejects(p(log._get)(offset2), (err) => {
assert.ok(err)
assert.equal(err.message, 'Record has been deleted')
@ -80,7 +80,7 @@ test('Log deletes', async (t) => {
const offset3 = await p(log.append)({ text: 'm2' })
await p(log.del)(offset2)
await p(log.onDeletesFlushed)()
await p(log.onOverwritesFlushed)()
await p(log.close)()
@ -122,7 +122,7 @@ test('Log deletes', async (t) => {
await p(log.del)(offset1)
await p(log.onDrain)()
await p(log.onDeletesFlushed)()
await p(log.onOverwritesFlushed)()
const arr = []
await new Promise((resolve) => {
@ -166,7 +166,7 @@ test('Log deletes', async (t) => {
if (process.env.VERBOSE) console.timeEnd('delete ' + TOTAL / 2)
assert('deleted messages')
await p(log.onDeletesFlushed)()
await p(log.onOverwritesFlushed)()
await new Promise((resolve) => {
let i = 0

View File

@ -26,6 +26,7 @@ test('Log overwrites', async (t) => {
assert.equal(buf2.toString(), msg2.toString())
await p(log.overwrite)(offset1, Buffer.from('hi world'))
await p(log.onOverwritesFlushed)()
const buf = await p(log._get)(offset1)
assert.equal(buf.toString(), 'hi world')