Log: fix concurrent overwrite() and compact()

This commit is contained in:
Andre Staltz 2024-01-18 16:40:39 +02:00
parent 844b808e78
commit 9e41400cdc
No known key found for this signature in database
GPG Key ID: 9EDE23EA7E8A4890
2 changed files with 13 additions and 9 deletions

View File

@ -634,6 +634,11 @@ function Log(filename, opts) {
* @param {CB<void>} cb * @param {CB<void>} cb
*/ */
function overwrite(offset, data, cb) { function overwrite(offset, data, cb) {
if (compacting) {
waitingCompaction.push(() => overwrite(offset, data, cb))
return
}
let encodedData = codec.encode(data) let encodedData = codec.encode(data)
if (typeof encodedData === 'string') encodedData = b4a.from(encodedData) if (typeof encodedData === 'string') encodedData = b4a.from(encodedData)
@ -707,10 +712,6 @@ function Log(filename, opts) {
* @param {CB<void>?} cb * @param {CB<void>?} cb
*/ */
async function compact(cb) { async function compact(cb) {
if (compacting) {
if (cb) waitingCompaction.push(cb)
return
}
cb ??= logError cb ??= logError
const debug2 = debug.extend('compact') const debug2 = debug.extend('compact')
if (deletedBytes === 0) { if (deletedBytes === 0) {
@ -724,9 +725,13 @@ function Log(filename, opts) {
// prettier-ignore // prettier-ignore
return cb(new Error('Compact failed to pre-flush overwrites', { cause: err1 })) return cb(new Error('Compact failed to pre-flush overwrites', { cause: err1 }))
} }
if (compacting) {
if (cb) waitingCompaction.push(cb)
return
}
compacting = true
const startCompactTimestamp = Date.now() const startCompactTimestamp = Date.now()
compacting = true
if (compactionProgress.value.done) { if (compactionProgress.value.done) {
compactionProgress.set(COMPACTION_PROGRESS_START) compactionProgress.set(COMPACTION_PROGRESS_START)
} }
@ -734,6 +739,7 @@ function Log(filename, opts) {
const filenameNew = filename + '.compacting' const filenameNew = filename + '.compacting'
const [err2] = await p(fs.unlink.bind(fs))(filenameNew) const [err2] = await p(fs.unlink.bind(fs))(filenameNew)
if (err2 && err2.code !== 'ENOENT') { if (err2 && err2.code !== 'ENOENT') {
compacting = false
// prettier-ignore // prettier-ignore
return cb(new Error('Compact failed to get rid of previous compacting log', { cause: err2 })) return cb(new Error('Compact failed to get rid of previous compacting log', { cause: err2 }))
} }
@ -846,15 +852,16 @@ function Log(filename, opts) {
const nextSince = latestBlockIndex * blockSize + nextOffsetInBlock const nextSince = latestBlockIndex * blockSize + nextOffsetInBlock
const sizeDiff = oldTotalBytes - getTotalBytes() const sizeDiff = oldTotalBytes - getTotalBytes()
lastRecOffset.set(nextSince) lastRecOffset.set(nextSince)
compacting = false
const duration = Date.now() - startCompactTimestamp const duration = Date.now() - startCompactTimestamp
debug2('Completed in %d ms', duration) debug2('Completed in %d ms', duration)
deletedBytes = 0 deletedBytes = 0
const [err7] = await p(saveStats)() const [err7] = await p(saveStats)()
if (err7) { if (err7) {
compacting = false
return cb(new Error('Compact failed to save stats file', { cause: err7 })) return cb(new Error('Compact failed to save stats file', { cause: err7 }))
} }
compactionProgress.set({ percent: 1, done: true, sizeDiff, holesFound }) compactionProgress.set({ percent: 1, done: true, sizeDiff, holesFound })
compacting = false
for (const callback of waitingCompaction) callback() for (const callback of waitingCompaction) callback()
waitingCompaction.length = 0 waitingCompaction.length = 0
cb() cb()

View File

@ -30,7 +30,6 @@ test('Log compaction', async (t) => {
}) })
await p(log.compact)() await p(log.compact)()
await p(log.onDrain)()
assert.deepEqual( assert.deepEqual(
progressArr, progressArr,
@ -88,7 +87,6 @@ test('Log compaction', async (t) => {
assert('delete first record') assert('delete first record')
await p(log.compact)() await p(log.compact)()
await p(log.onDrain)()
assert.deepEqual( assert.deepEqual(
progressArr, progressArr,
@ -153,7 +151,6 @@ test('Log compaction', async (t) => {
}) })
await p(log.compact)() await p(log.compact)()
await p(log.onDrain)()
await new Promise((resolve, reject) => { await new Promise((resolve, reject) => {
const arr = [] const arr = []