mirror of https://codeberg.org/pzp/pzp-db.git
879 lines
26 KiB
JavaScript
879 lines
26 KiB
JavaScript
const fs = require('fs')
|
|
const b4a = require('b4a')
|
|
const Cache = require('@alloc/quick-lru') // @ts-ignore
|
|
const RAF = require('polyraf') // @ts-ignore
|
|
const Obv = require('obz') // @ts-ignore
|
|
const AtomicFile = require('atomic-file-rw') // @ts-ignore
|
|
const debounce = require('lodash.debounce') // @ts-ignore
|
|
const isBufferZero = require('is-buffer-zero') // @ts-ignore
|
|
const debug = require('debug')('async-append-only-log') // @ts-ignore
|
|
const mutexify = require('mutexify')
|
|
|
|
const {
|
|
deletedRecordErr,
|
|
nanOffsetErr,
|
|
negativeOffsetErr,
|
|
outOfBoundsOffsetErr,
|
|
// delDuringCompactErr,
|
|
appendLargerThanBlockErr,
|
|
appendTransactionWantsArrayErr,
|
|
unexpectedTruncationErr,
|
|
// compactWithMaxLiveStreamErr,
|
|
} = require('./errors')
|
|
const Record = require('./record')
|
|
// const Compaction = require('./compaction') // FIXME:
|
|
|
|
/**
|
|
* @typedef {Buffer | Uint8Array} B4A
|
|
* @typedef {number} BlockIndex
|
|
* @typedef {{
|
|
* encode: (data: any) => B4A,
|
|
* decode: (data: B4A) => any
|
|
* }} Codec
|
|
* @typedef {{
|
|
* blockSize?: number,
|
|
* codec?: Codec,
|
|
* writeTimeout?: number,
|
|
* validateRecord?: (data: B4A) => boolean
|
|
* }} Options
|
|
*/
|
|
|
|
/**
|
|
* @template T
|
|
* @typedef {T extends void ?
|
|
* (...args: [Error] | []) => void :
|
|
* (...args: [Error] | [null, T]) => void
|
|
* } CB
|
|
*/
|
|
|
|
/**
 * Minimal assertion helper: throws when `check` is falsy, and lets the type
 * checker narrow `check` afterwards.
 *
 * @param {unknown} check value that must be truthy
 * @param {string} message message of the Error thrown when `check` is falsy
 * @returns {asserts check}
 */
function assert(check, message) {
  if (check) return
  throw new Error(message)
}
|
|
|
|
/**
 * The "End of Block" is a special field used to mark the end of a block. In
 * practice it is like a Record header "length" field holding the value 0.
 * The end region of a block is usually larger than this, but at *least*
 * `EOB.SIZE` bytes are always kept free at the end of every block.
 */
const EOB = {
  // Same width as a record header
  SIZE: Record.HEADER_SIZE,
  // A data length of 0 is what readers interpret as "end of block"
  asNumber: 0,
}

/**
 * Pass-through codec used when `opts.codec` is not given.
 * @type {Codec}
 */
const DEFAULT_CODEC = {
  encode(x) {
    return x
  },
  decode(x) {
    return x
  },
}

// Block size in bytes (64 KiB) used when `opts.blockSize` is not given
const DEFAULT_BLOCK_SIZE = 64 * 1024

// Debounce wait handed to lodash.debounce when `opts.writeTimeout` is not given
const DEFAULT_WRITE_TIMEOUT = 250

// Accept every record when `opts.validateRecord` is not given
const DEFAULT_VALIDATE = () => true
|
|
|
|
// const COMPACTION_PROGRESS_EMIT_INTERVAL = 1000 // FIXME:
|
|
|
|
/**
|
|
* @param {string} filename
|
|
* @param {Options} opts
|
|
*/
|
|
function AsyncAppendOnlyLog(filename, opts) {
|
|
// Cache of block buffers keyed by block index.
const cache = new Cache({ maxSize: 1024 }) // This is potentially 64 MiB!
const raf = RAF(filename)
const statsFilename = filename + '.stats'

// Options; `??` only falls back to the default on null/undefined.
const blockSize = opts?.blockSize ?? DEFAULT_BLOCK_SIZE
const codec = opts?.codec ?? DEFAULT_CODEC
const writeTimeout = opts?.writeTimeout ?? DEFAULT_WRITE_TIMEOUT
const validateRecord = opts?.validateRecord ?? DEFAULT_VALIDATE

/**
 * Calls deferred by onLoad() until the latest block has been loaded.
 * @type {Array<CallableFunction>}
 */
const waitingLoad = []

/**
 * Callbacks registered through onDrain(), grouped by the block whose write
 * they are waiting for.
 * @type {Map<BlockIndex, Array<CallableFunction>>}
 */
const waitingDrain = new Map() // blockIndex -> []

/**
 * Callbacks registered through onDeletesFlushed().
 * @type {Array<CB<any>>}
 */
const waitingFlushDelete = []

/**
 * Blocks with appended records that still have to be written to the file,
 * together with the offset of the last record appended to each.
 * @type {Map<BlockIndex, {blockBuf: B4A; offset: number}>}
 */
const blocksToBeWritten = new Map() // blockIndex -> { blockBuf, offset }

/**
 * Blocks containing zeroed-out (deleted) records not yet written to the file.
 * @type {Map<BlockIndex, B4A>}
 */
const blocksWithDeletables = new Map() // blockIndex -> blockBuf

// True while flushDelete() has a block write / stats save in flight
let flushingDelete = false
// Index of the block currently being written by write(), or -1 when idle
let writingBlockIndex = -1

// These three stay null until the log has been loaded; onLoad() relies on
// `latestBlockBuf === null` to detect that.
let latestBlockBuf = /** @type {B4A | null} */ (null)
let latestBlockIndex = /** @type {number | null} */ (null)
let nextOffsetInBlock = /** @type {number | null} */ (null)

// Running total of bytes occupied by deleted records (persisted in the stats file)
let deletedBytes = 0
const since = Obv() // offset of last written record
|
|
// let compaction = null // FIXME:
|
|
// const compactionProgress = Obv()
|
|
// if (typeof window !== 'undefined') {
|
|
// // fs sync not working in browser
|
|
// compactionProgress.set({ percent: 1, done: true, sizeDiff: 0 })
|
|
// } else {
|
|
// compactionProgress.set(
|
|
// Compaction.stateFileExists(filename)
|
|
// ? { percent: 0, done: false }
|
|
// : { percent: 1, done: true, sizeDiff: 0 }
|
|
// )
|
|
// }
|
|
// const waitingCompaction = []
|
|
// onLoad(function maybeResumeCompaction() {
|
|
// // fs sync not working in browser
|
|
// if (typeof window !== 'undefined') return
|
|
// if (Compaction.stateFileExists(filename)) {
|
|
// compact(function onCompactDone(err) {
|
|
// if (err) throw err
|
|
// })
|
|
// }
|
|
// })()
|
|
|
|
// Bootstrap: read the persisted stats, then stat the log file and load its
// latest block. Calls deferred by onLoad() are released once this finishes.
AtomicFile.readFile(
  statsFilename,
  'utf8',
  /** @type {CB<string>} */
  function statsUp(err, json) {
    if (err) {
      debug('error loading stats: %s', err.message)
      deletedBytes = 0
    } else {
      try {
        const stats = JSON.parse(json)
        const persisted = stats.deletedBytes
        // A well-formed JSON file may still lack the field or hold garbage;
        // accepting it as-is would turn every later `deletedBytes +=` into NaN.
        if (Number.isFinite(persisted) && persisted >= 0) {
          deletedBytes = persisted
        } else {
          debug('invalid deletedBytes in stats: %o', persisted)
          deletedBytes = 0
        }
      } catch (err) {
        debug('error parsing stats: %s', /** @type {Error} */ (err).message)
        deletedBytes = 0
      }
    }

    // Runs, in order, every call that onLoad() queued while loading
    function releaseWaitingLoad() {
      // @ts-ignore
      while (waitingLoad.length) waitingLoad.shift()()
    }

    raf.stat(
      /** @type {CB<{size: number}>} */
      function onRAFStatDone(err, stat) {
        if (err) debug('failed to stat ' + filename, err)

        // A failed stat is treated like an empty file
        const fileSize = stat ? stat.size : -1

        if (fileSize <= 0) {
          debug('empty file')
          latestBlockBuf = b4a.alloc(blockSize)
          latestBlockIndex = 0
          nextOffsetInBlock = 0
          cache.set(0, latestBlockBuf)
          since.set(-1)
          releaseWaitingLoad()
        } else {
          // The latest block is the last `blockSize` bytes of the file
          const blockStart = fileSize - blockSize
          loadLatestBlock(blockStart, function onLoadedLatestBlock(err) {
            if (err) throw err
            debug('opened file, since: %d', since.value)
            releaseWaitingLoad()
          })
        }
      }
    )
  }
)
|
|
|
|
/**
 * Reads the block starting at `blockStart` from the file, finds its last good
 * record (repairing the block if needed) and makes it the latest block:
 * updates `latestBlockBuf`, `latestBlockIndex`, `nextOffsetInBlock` and
 * `since`.
 *
 * @param {number} blockStart file offset where the block begins
 * @param {CB<void>} cb called with an error, or with no arguments on success
 */
function loadLatestBlock(blockStart, cb) {
  /** @type {CB<B4A>} */
  const onRAFReadLastDone = (readErr, blockBuf) => {
    if (readErr) return cb(readErr)

    getLastGoodRecord(
      blockBuf,
      blockStart,
      function gotLastGoodRecord(scanErr, lastRecordOffset) {
        if (scanErr) return cb(scanErr)
        latestBlockBuf = blockBuf
        latestBlockIndex = blockStart / blockSize
        // Appends continue right after the last good record
        nextOffsetInBlock =
          lastRecordOffset + Record.readSize(blockBuf, lastRecordOffset)
        since.set(blockStart + lastRecordOffset)
        cb()
      }
    )
  }

  raf.read(blockStart, blockSize, onRAFReadLastDone)
}
|
|
|
|
/**
 * Position of `offset` relative to the start of the block containing it.
 *
 * @param {number} offset offset in the log
 */
function getOffsetInBlock(offset) {
  const offsetInBlock = offset % blockSize
  return offsetInBlock
}

/**
 * Log offset at which the block containing `offset` begins.
 *
 * @param {number} offset offset in the log
 */
function getBlockStart(offset) {
  return offset - (offset % blockSize)
}

/**
 * Log offset at which the block after the one containing `offset` begins.
 *
 * @param {number} offset offset in the log
 */
function getNextBlockStart(offset) {
  const currentBlockStart = offset - (offset % blockSize)
  return currentBlockStart + blockSize
}

/**
 * Index (0-based) of the block containing `offset`.
 *
 * @param {number} offset offset in the log
 */
function getBlockIndex(offset) {
  const currentBlockStart = offset - (offset % blockSize)
  return currentBlockStart / blockSize
}
|
|
|
|
/** @type {(fn: (unlock: (cb: CB<any>, ...args: ([Error] | [null, any])) => void) => void) => void} */
const writeLock = mutexify()

/**
 * Shared tail of the locked file operations below. On error, releases the
 * lock with that error. Otherwise fsyncs the file when `raf` exposes a file
 * descriptor, and then releases the lock with `successValue` (or with the
 * fsync error).
 *
 * @param {(cb: CB<any>, ...args: ([Error] | [null, any])) => void} unlock
 * @param {CB<any>} cb
 * @param {Error | null} err result of the preceding raf operation
 * @param {any} successValue
 */
function releaseAfterFSync(unlock, cb, err, successValue) {
  if (err) {
    unlock(cb, err)
    return
  }
  if (!raf.fd) {
    // No file descriptor available, nothing to fsync
    unlock(cb, null, successValue)
    return
  }
  fs.fsync(raf.fd, function onFSyncDone(fsyncErr) {
    if (fsyncErr) unlock(cb, fsyncErr)
    else unlock(cb, null, successValue)
  })
}

/**
 * Writes `blockBuf` at `blockStart` while holding the write lock, then
 * fsyncs, and reports `successValue` through `cb`.
 *
 * @template T
 * @param {number} blockStart
 * @param {B4A | undefined} blockBuf
 * @param {T} successValue
 * @param {CB<T>} cb
 */
function writeWithFSync(blockStart, blockBuf, successValue, cb) {
  writeLock(function onWriteLockReleased(unlock) {
    raf.write(
      blockStart,
      blockBuf,
      function onRAFWriteDone(/** @type {Error | null} */ err) {
        releaseAfterFSync(unlock, cb, err, successValue)
      }
    )
  })
}

/**
 * Removes everything from `newSize` to the end of the file while holding the
 * write lock, then fsyncs.
 *
 * @param {number} newSize
 * @param {CB<void>} cb
 */
function truncateWithFSync(newSize, cb) {
  writeLock(function onWriteLockReleasedForTruncate(unlock) {
    raf.del(
      newSize,
      Infinity,
      function onRAFDeleteDone(/** @type {Error | null} */ err) {
        releaseAfterFSync(unlock, cb, err, undefined)
      }
    )
  })
}
|
|
|
|
/**
 * Repairs a block holding an invalid record: zeroes the block from the bad
 * record up to `blockSize`, then writes the block back to the file.
 *
 * @param {B4A} blockBuf block to repair (mutated in place)
 * @param {number} badOffsetInBlock where the invalid record starts
 * @param {number} blockStart file offset of the block
 * @param {number} successValue value handed to `cb` once the write succeeded
 * @param {CB<number>} cb
 */
function fixBlock(blockBuf, badOffsetInBlock, blockStart, successValue, cb) {
  debug('found invalid record at %d, fixing last block', badOffsetInBlock)
  const zeroFrom = badOffsetInBlock
  const zeroUntil = blockSize
  blockBuf.fill(0, zeroFrom, zeroUntil)
  writeWithFSync(blockStart, blockBuf, successValue, cb)
}
|
|
|
|
/**
 * Walks the records of a block from its start and reports the offset (within
 * the block) of the last good record. When a record overruns the block or
 * fails `validateRecord`, the block is repaired with fixBlock() and the
 * offset of the last good record before it is reported instead.
 *
 * @param {B4A} blockBuf
 * @param {number} blockStart file offset of the block, needed for the repair
 * @param {CB<number>} cb
 */
function getLastGoodRecord(blockBuf, blockStart, cb) {
  let lastGoodOffset = 0
  let cursor = 0

  while (cursor < blockSize) {
    // A zero length marks the end of the used part of the block
    if (Record.readDataLength(blockBuf, cursor) === EOB.asNumber) break

    const [dataBuf, recSize] = Record.read(blockBuf, cursor)
    const overrunsBlock = cursor + recSize > blockSize
    // All-zero data is a deleted record, which is not validated
    const failsValidation = !isBufferZero(dataBuf) && !validateRecord(dataBuf)
    if (overrunsBlock || failsValidation) {
      fixBlock(blockBuf, cursor, blockStart, lastGoodOffset, cb)
      return
    }

    lastGoodOffset = cursor
    cursor += recSize
  }

  cb(null, lastGoodOffset)
}
|
|
|
|
/**
 * Provides the block containing `offset`, from the cache when present,
 * otherwise read from the file (and then cached).
 *
 * @param {number} offset offset in the log
 * @param {CB<B4A>} cb
 */
function getBlock(offset, cb) {
  const blockIndex = getBlockIndex(offset)

  if (!cache.has(blockIndex)) {
    debug('getting offset %d from disc', offset)
    const onRAFReadDone = /** @type {CB<B4A>} */ (
      (err, blockBuf) => {
        if (err) return cb(err)
        cache.set(blockIndex, blockBuf)
        cb(null, blockBuf)
      }
    )
    raf.read(getBlockStart(offset), blockSize, onRAFReadDone)
    return
  }

  debug('getting offset %d from cache', offset)
  cb(null, cache.get(blockIndex))
}
|
|
|
|
/**
 * Reads and decodes the record stored at `offset`.
 *
 * `cb` receives an error when `offset` is not a number, is NaN, is negative,
 * is at or past the end of the log, or points to a deleted record.
 *
 * @param {number} offset offset in the log
 * @param {CB<B4A>} cb
 */
function get(offset, cb) {
  assert(typeof latestBlockIndex === 'number', 'latestBlockIndex not set')
  assert(typeof nextOffsetInBlock === 'number', 'nextOffsetInBlock not set')
  const logSize = latestBlockIndex * blockSize + nextOffsetInBlock

  if (typeof offset !== 'number' || Number.isNaN(offset)) {
    return cb(nanOffsetErr(offset))
  }
  if (offset < 0) return cb(negativeOffsetErr(offset))
  if (offset >= logSize) return cb(outOfBoundsOffsetErr(offset, logSize))

  getBlock(offset, function gotBlock(err, blockBuf) {
    if (err) return cb(err)
    const [dataBuf] = Record.read(blockBuf, getOffsetInBlock(offset))
    // Deleted records have their data overwritten with zeroes
    if (isBufferZero(dataBuf)) return cb(deletedRecordErr())
    cb(null, codec.decode(dataBuf))
  })
}
|
|
|
|
/**
 * Reads the record at `offset` and works out where the next one is.
 *
 * Returns [nextOffset, decodedRecord, recordSize] where nextOffset can take 3
 * forms:
 * * `-1`: end of log
 * * `0`: need a new block
 * * `>0`: log offset of the next record, which is in the same block
 *
 * decodedRecord is `null` when the record's data is all zeroes (deleted).
 *
 * @param {Buffer} blockBuf block containing `offset`
 * @param {number} offset offset in the log
 * @param {boolean} asRaw when true, the data is returned without decoding
 */
function getDataNextOffset(blockBuf, offset, asRaw = false) {
  const offsetInBlock = getOffsetInBlock(offset)
  const [dataBuf, recSize] = Record.read(blockBuf, offsetInBlock)
  const nextLength = Record.readDataLength(blockBuf, offsetInBlock + recSize)

  let nextOffset
  if (nextLength !== EOB.asNumber) {
    nextOffset = offset + recSize
  } else {
    // End of this block: the log ends here unless the next block starts at
    // or before the last written record.
    nextOffset = getNextBlockStart(offset) > since.value ? -1 : 0
  }

  let value = null
  if (!isBufferZero(dataBuf)) value = asRaw ? dataBuf : codec.decode(dataBuf)
  return [nextOffset, value, recSize]
}
|
|
|
|
/**
 * Iterates over every record of the log, starting at offset 0.
 *
 * @param {(offset: number, value: B4A, size: number) => void} onNext called
 * for each record with its offset, its decoded value (`null` for a deleted
 * record) and its size
 * @param {(error?: Error) => void} onDone called once, with an error if a
 * block could not be fetched, otherwise without arguments when the scan ends
 */
function scan(onNext, onDone) {
  let cursor = 0

  const fetchBlockAtCursor = () => {
    // setTimeout(..., 0) defers the fetch to a later timer tick
    setTimeout(getBlock, 0, cursor, onBlockFetched)
  }

  const onBlockFetched =
    /** @type {CB<B4A>} */
    (
      (err, blockBuf) => {
        if (err) return onDone(err)
        if (isBufferZero(blockBuf)) return onDone()

        for (;;) {
          const [next, value, size] = getDataNextOffset(blockBuf, cursor)
          onNext(cursor, value, size)

          if (next === -1) {
            // End of log
            onDone()
            return
          }
          if (next === 0) {
            // Block exhausted, continue with the following block
            cursor = getNextBlockStart(cursor)
            fetchBlockAtCursor()
            return
          }
          cursor = next
        }
      }
    )

  fetchBlockAtCursor()
}
|
|
|
|
/**
 * Deletes the record at `offset` by overwriting it with zeroes in memory and
 * scheduling a flush of the affected block to the file.
 *
 * @param {number} offset offset in the log
 * @param {CB<void>} cb called once the in-memory block has been updated (the
 * flush to the file happens later, see onDeletesFlushed)
 */
function del(offset, cb) {
  // FIXME: once compaction is restored, answer with delDuringCompactErr()
  // while a compaction is running.
  const blockIndex = getBlockIndex(offset)

  // The block still has appended records waiting to be written; try again
  // after those writes have drained.
  if (blocksToBeWritten.has(blockIndex)) {
    onDrain(function delAfterDrained() {
      del(offset, cb)
    })
    return
  }

  const gotBlockForDelete = /** @type {CB<B4A>} */ (
    (err, blockBuf) => {
      if (err) return cb(err)
      assert(blockBuf, 'blockBuf should be defined in gotBlockForDelete')
      // Prefer the copy that already carries other pending deletes
      const targetBuf = blocksWithDeletables.get(blockIndex) ?? blockBuf
      const offsetInBlock = getOffsetInBlock(offset)
      Record.overwriteWithZeroes(targetBuf, offsetInBlock)
      deletedBytes += Record.readSize(targetBuf, offsetInBlock)
      blocksWithDeletables.set(blockIndex, targetBuf)
      scheduleFlushDelete()
      cb()
    }
  )

  if (!blocksWithDeletables.has(blockIndex)) {
    getBlock(offset, gotBlockForDelete)
    return
  }
  const pendingBuf = /** @type {any} */ (blocksWithDeletables.get(blockIndex))
  gotBlockForDelete(null, pendingBuf)
}
|
|
|
|
/**
 * Tells whether a record holding `dataBuf`, placed at `offsetInBlock`, would
 * leave less than `EOB.SIZE` bytes free at the end of the block.
 *
 * @param {Uint8Array} dataBuf
 * @param {number} offsetInBlock
 */
function hasNoSpaceFor(dataBuf, offsetInBlock) {
  const endIncludingEOB = offsetInBlock + Record.size(dataBuf) + EOB.SIZE
  return endIncludingEOB > blockSize
}
|
|
|
|
const scheduleFlushDelete = debounce(flushDelete, writeTimeout)

/**
 * Writes the blocks in `blocksWithDeletables` to the file, one per call,
 * saving the stats after each block and then recursing for the next one.
 * When no block is left, every callback in `waitingFlushDelete` is called
 * without arguments. When a block write or a stats save fails, the callbacks
 * are called with that error and flushing stops.
 */
function flushDelete() {
  /**
   * Calls and clears all callbacks registered via onDeletesFlushed().
   * @param {Error=} err
   */
  function notifyWaiting(err) {
    for (const cb of waitingFlushDelete) {
      if (err) cb(err)
      else cb()
    }
    waitingFlushDelete.length = 0
  }

  if (blocksWithDeletables.size === 0) {
    notifyWaiting()
    return
  }

  // Take the first pending block (Map iteration follows insertion order)
  const blockIndex = blocksWithDeletables.keys().next().value
  const blockStart = blockIndex * blockSize
  const blockBuf = blocksWithDeletables.get(blockIndex)
  blocksWithDeletables.delete(blockIndex)
  flushingDelete = true

  writeWithFSync(
    blockStart,
    blockBuf,
    null,
    function flushedDelete(writeErr, _) {
      if (writeErr) {
        // The block did not reach the file: report it instead of pretending
        // that the deletes were flushed.
        debug('error flushing deletes of block %d: %s', blockIndex, writeErr.message)
        flushingDelete = false
        notifyWaiting(writeErr)
        return
      }

      saveStats(function onSavedStats(err, _) {
        if (err) debug('error saving stats: %s', err.message)
        flushingDelete = false
        if (err) {
          notifyWaiting(err)
          return
        }
        flushDelete() // next
      })
    }
  )
}
|
|
|
|
/**
 * Calls `cb` once there are no deletes waiting to be flushed: immediately
 * when nothing is pending or in flight, otherwise when flushDelete() is done.
 *
 * @param {CB<void>} cb
 */
function onDeletesFlushed(cb) {
  const nothingPending = !flushingDelete && blocksWithDeletables.size === 0
  if (nothingPending) {
    cb()
    return
  }
  waitingFlushDelete.push(cb)
}
|
|
|
|
/**
 * Encodes `data` and places it as a record in the latest block in memory,
 * starting a new block when the current one has no space left, then
 * schedules the write to the file.
 *
 * @param {any} data
 * @returns {number} offset in the log where the record was placed
 * @throws when the encoded record cannot fit in an empty block
 */
function appendSingle(data) {
  const encoded = codec.encode(data)
  const payload = typeof encoded === 'string' ? b4a.from(encoded) : encoded
  const recordSize = Record.size(payload)

  if (recordSize + EOB.SIZE > blockSize) throw appendLargerThanBlockErr()

  assert(typeof latestBlockIndex === 'number', 'latestBlockIndex not set')
  assert(typeof nextOffsetInBlock === 'number', 'nextOffsetInBlock not set')
  if (hasNoSpaceFor(payload, nextOffsetInBlock)) {
    // Start a fresh, zero-filled block
    latestBlockBuf = b4a.alloc(blockSize)
    latestBlockIndex += 1
    nextOffsetInBlock = 0
    debug("data doesn't fit current block, creating new")
  }

  assert(latestBlockBuf, 'latestBlockBuf not set')
  Record.write(latestBlockBuf, nextOffsetInBlock, payload)
  cache.set(latestBlockIndex, latestBlockBuf) // update cache

  const offset = latestBlockIndex * blockSize + nextOffsetInBlock
  const pendingWrite = { blockBuf: latestBlockBuf, offset }
  blocksToBeWritten.set(latestBlockIndex, pendingWrite)
  nextOffsetInBlock += recordSize

  scheduleWrite()
  debug('data inserted at offset %d', offset)
  return offset
}
|
|
|
|
/**
 * Appends one record, or each item of an array as its own record.
 *
 * Failures of appendSingle() (e.g. a record larger than a block) are reported
 * through `cb` rather than thrown. This function is exposed wrapped in
 * onLoad(), so a throw could surface inside the load callback instead of at
 * the caller, and a callback is also how appendTransaction() reports the same
 * error. When an array item fails, the items before it stay appended.
 *
 * @param {any | Array<any>} data
 * @param {CB<number>} cb receives the offset of the (last) appended record;
 * `0` when `data` is an empty array
 */
function append(data, cb) {
  // FIXME: once compaction is restored, queue this call in waitingCompaction
  // while a compaction is running.

  let offset = 0
  try {
    if (Array.isArray(data)) {
      for (const item of data) offset = appendSingle(item)
    } else {
      offset = appendSingle(data)
    }
  } catch (err) {
    return cb(/** @type {any} */ (err))
  }

  // Outside the try block so that an exception thrown by `cb` itself is not
  // mistaken for an append failure (which would call `cb` twice).
  cb(null, offset)
}
|
|
|
|
/**
 * Appends all items of `dataArray` as consecutive records within a single
 * block: if they do not all fit in the current block, a new block is started
 * first.
 *
 * @param {Array<any>} dataArray
 * @param {CB<Array<number>>} cb receives the offsets of the appended records,
 * or an error when `dataArray` is not an array or the records together do
 * not fit in one block
 */
function appendTransaction(dataArray, cb) {
  if (!Array.isArray(dataArray)) {
    return cb(appendTransactionWantsArrayErr())
  }
  // FIXME: once compaction is restored, queue this call in waitingCompaction
  // while a compaction is running.

  const encodedDataArray = dataArray.map((data) => {
    const encoded = codec.encode(data)
    return typeof encoded === 'string' ? b4a.from(encoded) : encoded
  })
  const recordsSize = encodedDataArray.reduce(
    (sum, encodedData) => sum + Record.size(encodedData),
    0
  )
  const size = recordsSize + EOB.SIZE

  if (size > blockSize) return cb(appendLargerThanBlockErr())

  assert(typeof nextOffsetInBlock === 'number', 'nextOffsetInBlock not set')
  assert(typeof latestBlockIndex === 'number', 'latestBlockIndex not set')
  const fitsCurrentBlock = nextOffsetInBlock + size <= blockSize
  if (!fitsCurrentBlock) {
    latestBlockBuf = b4a.alloc(blockSize)
    latestBlockIndex += 1
    nextOffsetInBlock = 0
    debug("data doesn't fit current block, creating new")
  }

  assert(latestBlockBuf, 'latestBlockBuf not set')
  const offsets = /** @type {Array<number>} */ ([])
  for (const encodedData of encodedDataArray) {
    Record.write(latestBlockBuf, nextOffsetInBlock, encodedData)
    cache.set(latestBlockIndex, latestBlockBuf) // update cache

    const offset = latestBlockIndex * blockSize + nextOffsetInBlock
    offsets.push(offset)
    // Re-setting the entry keeps `offset` pointing at the newest record
    blocksToBeWritten.set(latestBlockIndex, { blockBuf: latestBlockBuf, offset })
    nextOffsetInBlock += Record.size(encodedData)
    debug('data inserted at offset %d', offset)
  }

  scheduleWrite()

  return cb(null, offsets)
}
|
|
|
|
const scheduleWrite = debounce(write, writeTimeout)

/**
 * Writes the first block of `blocksToBeWritten` to the file. Once written,
 * it updates `since`, runs the onDrain() callbacks that were waiting for that
 * block, and recurses to write the next pending block. A failed write is
 * rethrown from the write callback.
 */
function write() {
  if (blocksToBeWritten.size === 0) return

  // First pending block (Map iteration follows insertion order)
  const blockIndex = blocksToBeWritten.keys().next().value
  const blockStart = blockIndex * blockSize
  const pending = /** @type {{ blockBuf: B4A, offset: number }} */ (
    blocksToBeWritten.get(blockIndex)
  )
  const blockBuf = pending.blockBuf
  const offset = pending.offset
  blocksToBeWritten.delete(blockIndex)

  // prettier-ignore
  debug('writing block of size: %d, to offset: %d', blockBuf.length, blockStart)
  writingBlockIndex = blockIndex

  writeWithFSync(blockStart, blockBuf, null, function onBlockWritten(err, _) {
    // Snapshot: the callbacks run below may register further drains
    const drainsBefore = (waitingDrain.get(blockIndex) || []).slice(0)
    writingBlockIndex = -1

    if (err) {
      debug('failed to write block %d', blockIndex)
      throw err
    }

    since.set(offset)

    // prettier-ignore
    debug('draining the waiting queue for %d, items: %d', blockIndex, drainsBefore.length)
    for (const drain of drainsBefore) drain()

    // the resumed streams might have added more to waiting
    const drainsAfter = waitingDrain.get(blockIndex) || []
    const nothingAdded = drainsBefore.length === drainsAfter.length
    if (nothingAdded || drainsAfter.length === 0) {
      waitingDrain.delete(blockIndex)
    } else {
      // Keep only the callbacks registered while draining
      waitingDrain.set(blockIndex, drainsAfter.slice(drainsBefore.length))
    }

    write() // next!
  })
}
|
|
|
|
/**
 * Replaces a whole block: stores `blockBuf` in the cache under `blockIndex`
 * and writes it to the file at that block's position.
 *
 * @param {number} blockIndex
 * @param {B4A } blockBuf
 * @param {CB<null>} cb
 */
function overwrite(blockIndex, blockBuf, cb) {
  cache.set(blockIndex, blockBuf)
  writeWithFSync(blockIndex * blockSize, blockBuf, null, cb)
}
|
|
|
|
/**
 * Shrinks the log so that `newLatestBlockIndex` becomes its latest block,
 * then reloads that block as the latest one.
 *
 * @param {number} newLatestBlockIndex
 * @param {CB<number>} cb receives how many bytes were removed from the file
 * (`0` when `newLatestBlockIndex` already is the latest block), or an error;
 * asking for an index beyond the current latest block is an error
 */
function truncate(newLatestBlockIndex, cb) {
  assert(typeof latestBlockIndex === 'number', 'latestBlockIndex not set')
  if (newLatestBlockIndex > latestBlockIndex) {
    return cb(unexpectedTruncationErr())
  }

  if (newLatestBlockIndex === latestBlockIndex) {
    // Nothing to cut off, only reload the latest block from the file
    const blockStart = latestBlockIndex * blockSize
    loadLatestBlock(blockStart, function onTruncateLoadedLatestBlock1(err) {
      if (err) cb(err)
      else cb(null, 0)
    })
    return
  }

  const size = (latestBlockIndex + 1) * blockSize
  const newSize = (newLatestBlockIndex + 1) * blockSize

  // Evict every block that is being cut off. The bound is inclusive: the old
  // latest block is removed from the file too, so it must not stay cached.
  for (let i = newLatestBlockIndex + 1; i <= latestBlockIndex; ++i) {
    cache.delete(i)
  }

  truncateWithFSync(newSize, function onTruncateWithFSyncDone(err) {
    if (err) return cb(err)
    const blockStart = newSize - blockSize
    loadLatestBlock(blockStart, function onTruncateLoadedLatestBlock2(err) {
      if (err) return cb(err)
      const sizeDiff = size - newSize
      cb(null, sizeDiff)
    })
  })
}
|
|
|
|
/**
 * Reports `totalBytes` (the value of `since`, clamped to be at least 0) and
 * `deletedBytes`. When `since` has no value yet, the report is sent when its
 * first value arrives.
 *
 * @param {CB<{ totalBytes: number; deletedBytes: number }>} cb
 */
function stats(cb) {
  const report = (/** @type {number} */ lastOffset) => {
    cb(null, { totalBytes: Math.max(0, lastOffset), deletedBytes })
  }

  if (since.value != null) {
    report(since.value)
    return
  }

  since((/** @type {number} */ totalBytes) => {
    report(totalBytes)
    // NOTE(review): returning false presumably unsubscribes this listener
    // (obz convention) — confirm
    return false
  })
}
|
|
|
|
/**
 * Persists the current `deletedBytes` counter as JSON in the stats file.
 *
 * @param {CB<unknown>} cb
 */
function saveStats(cb) {
  const serialized = JSON.stringify({ deletedBytes })
  AtomicFile.writeFile(statsFilename, serialized, 'utf8', cb)
}
|
|
|
|
/**
|
|
* @param {CB<unknown>} cb
|
|
*/
|
|
// function compact(cb) { // FIXME:
|
|
// if (compaction) {
|
|
// debug('compaction already in progress')
|
|
// waitingCompaction.push(cb)
|
|
// return
|
|
// }
|
|
// for (const stream of self.streams) {
|
|
// if (stream.live && (stream.max || stream.max_inclusive)) {
|
|
// return cb(compactWithMaxLiveStreamErr())
|
|
// }
|
|
// }
|
|
// onStreamsDone(function startCompactAfterStreamsDone() {
|
|
// onDrain(function startCompactAfterDrain() {
|
|
// onDeletesFlushed(function startCompactAfterDeletes() {
|
|
// if (compactionProgress.value.done) {
|
|
// compactionProgress.set({ percent: 0, done: false })
|
|
// }
|
|
// compaction = new Compaction(self, (/** @type {any} */ err, /** @type {any} */ stats) => {
|
|
// compaction = null
|
|
// if (err) return cb(err)
|
|
// deletedBytes = 0
|
|
// saveStats(function onSavedStatsAfterCompaction(err) {
|
|
// if (err)
|
|
// debug('error saving stats after compaction: %s', err.message)
|
|
// })
|
|
// for (const stream of self.streams) {
|
|
// if (stream.live) stream.postCompactionReset(since.value)
|
|
// }
|
|
// compactionProgress.set({ ...stats, percent: 1, done: true })
|
|
// for (const callback of waitingCompaction) callback()
|
|
// waitingCompaction.length = 0
|
|
// cb()
|
|
// })
|
|
// let prevUpdate = 0
|
|
// compaction.progress((/** @type {any} */ stats) => {
|
|
// const now = Date.now()
|
|
// if (now - prevUpdate > COMPACTION_PROGRESS_EMIT_INTERVAL) {
|
|
// prevUpdate = now
|
|
// compactionProgress.set({ ...stats, done: false })
|
|
// }
|
|
// })
|
|
// })
|
|
// })
|
|
// })
|
|
// }
|
|
|
|
/**
 * Closes the underlying file after pending block writes have drained and
 * pending deletes have been flushed.
 *
 * @param {CB<unknown>} cb passed on to `raf.close`
 */
function close(cb) {
  const closeAfterDeletesFlushed = () => {
    raf.close(cb)
  }
  const closeAfterHavingDrained = () => {
    onDeletesFlushed(closeAfterDeletesFlushed)
  }
  onDrain(closeAfterHavingDrained)
}
|
|
|
|
/**
 * Wraps `fn` so that calls made before the log has been loaded are queued in
 * `waitingLoad` (with their arguments) instead of running right away. Once
 * loaded, the wrapper calls `fn` directly.
 *
 * @template T
 * @param {T} fn
 * @returns {T}
 */
function onLoad(fn) {
  const target = /** @type {(this: null | void, ...args: Array<any> )=>void} */ (
    fn
  )

  /** @param {any[]} args */
  function waitForLogLoaded(...args) {
    // `latestBlockBuf` stays null until loading has finished
    if (latestBlockBuf !== null) {
      target(...args)
      return
    }
    waitingLoad.push(target.bind(null, ...args))
  }

  return /** @type {any} */ (waitForLogLoaded)
}
|
|
|
|
/**
 * Calls `fn` when no block is waiting to be written or being written:
 * immediately if that is already the case, otherwise after the most recently
 * queued block (or the one currently being written) has been written.
 *
 * @param {CallableFunction} fn
 */
function onDrain(fn) {
  // FIXME: once compaction is restored, queue `fn` in waitingCompaction while
  // a compaction is running.
  const hasQueuedBlocks = blocksToBeWritten.size > 0
  if (!hasQueuedBlocks && writingBlockIndex === -1) {
    fn()
    return
  }

  // Wait for the newest queued block, else for the write in progress
  const targetBlockIndex = /** @type {number} */ (
    hasQueuedBlocks ? last(blocksToBeWritten.keys()) : writingBlockIndex
  )
  const drains = waitingDrain.get(targetBlockIndex) || []
  drains.push(fn)
  waitingDrain.set(targetBlockIndex, drains)
}
|
|
|
|
/**
 * Consumes `iterable` and returns the last value it yields, or `null` when it
 * yields nothing.
 *
 * @param {IterableIterator<number>} iterable
 */
function last(iterable) {
  const iterator = iterable[Symbol.iterator]()
  let tail = null
  for (let step = iterator.next(); !step.done; step = iterator.next()) {
    tail = step.value
  }
  return tail
}
|
|
|
|
return {
|
|
// Public API:
|
|
get: onLoad(get),
|
|
scan: onLoad(scan),
|
|
del: onLoad(del), // TODO
|
|
append: onLoad(append), // TODO
|
|
appendTransaction: onLoad(appendTransaction),
|
|
close: onLoad(close), // TODO
|
|
onDrain: onLoad(onDrain), // TODO
|
|
onDeletesFlushed: onLoad(onDeletesFlushed),
|
|
// compact: onLoad(compact), // FIXME:
|
|
// compactionProgress,
|
|
since,
|
|
stats, // TODO
|
|
|
|
// Internals needed by ./compaction.js:
|
|
filename,
|
|
blockSize,
|
|
overwrite,
|
|
truncate,
|
|
hasNoSpaceFor,
|
|
}
|
|
}
|
|
|
|
module.exports = AsyncAppendOnlyLog
|