pzp-gc/lib/index.js

284 lines
7.7 KiB
JavaScript

// @ts-ignore
const multicb = require('multicb')
const makeDebug = require('debug')
// @ts-ignore
const p = (fn) => (...args) => {
return new Promise((res, rej) => {
// @ts-ignore
fn(...args, (err, val) => {
if (err) return rej(err)
return res(val)
})
})
}
/**
* @typedef {ReturnType<import('pzp-db').init>} pzpDB
* @typedef {ReturnType<import('pzp-goals').init>} pzpGoal
* @typedef {{
* gc: {
* maxLogBytes: number
* compactionInterval?: number
* }
* }} ExpectedConfig
* @typedef {{gc?: Partial<ExpectedConfig['gc']>}} Config
*/
/**
* @template T
* @typedef {T extends void ?
* (...args: [Error] | []) => void :
* (...args: [Error] | [null, T]) => void
* } CB
*/
/**
* @param {{ db: pzpDB, goals: pzpGoal }} peer
* @param {Config} config
*/
function initGC(peer, config) {
// State
const debug = makeDebug('pzp:gc')
let stopMonitoringLogSize = /** @type {CallableFunction | null} */ (null)
let hasCleanupScheduled = false
/**
* @param {Error} err
*/
function flattenCauseChain(err) {
let str = ''
while (err?.message ?? err) {
str += (err.message ?? err) + ': '
err = /**@type {Error}*/ (err.cause)
}
return str
}
/**
* Deletes messages that don't correspond to any goal.
* @private
* @return {Promise<void>}
*/
async function cleanup() {
return new Promise(async (res, rej) => {
debug('Cleanup started')
const startTime = Date.now()
const done = multicb({ pluck: 1 })
/**
* @param {string} errExplanation
*/
function makeRecCB(errExplanation) {
const cb = done()
return (/**@type {Error=}*/ err) => {
if (err) debug('%s: %s', errExplanation, flattenCauseChain(err))
cb()
}
}
let waiting = false
for await (const rec of peer.db.records()) {
if (!rec.msg) continue
const { id: msgID, msg } = rec
const [purpose, details] = await p(peer.goals.getMsgPurpose)(msgID, msg)
switch (purpose) {
case 'goal': {
continue // don't cleanup
}
case 'none': {
const recCB = makeRecCB('Failed to delete msg when cleaning up')
debug('Deleting msg %s with purpose=none', msgID)
peer.db.del(msgID, recCB)
waiting = true
continue
}
case 'ghost': {
const { tangleID, span } = details
const recCB = makeRecCB('Failed to delete ghost msg when cleaning up')
// TODO: Could one msg be a ghostable in MANY tangles? Or just one?
debug('Deleting and ghosting msg %s with purpose=ghost', msgID)
peer.db.ghosts.add({ tangleID, msgID, span }, (err) => {
if (err) return recCB(err)
peer.db.del(msgID, recCB)
})
waiting = true
continue
}
case 'trail': {
if (!msg.data) continue // it's already erased
const recCB = makeRecCB('Failed to erase trail msg when cleaning up')
debug('Erasing msg %s with purpose=trail', msgID)
peer.db.erase(msgID, recCB)
waiting = true
continue
}
default: {
rej(new Error('Unreachable'))
return
}
}
}
if (waiting) done(whenEnded)
else whenEnded()
/** @param {Error=} err */
function whenEnded(err) {
const duration = Date.now() - startTime
if (err) debug('Cleanup ended with an error %s', err.message ?? err)
else debug('Cleanup completed in %sms', duration)
res()
}
})
}
/**
* Recreates the log so it has no empty space.
* @private
* @param {CB<void>} cb
*/
function compact(cb) {
debug('Compaction started')
const startTime = Date.now()
peer.db.log.compact((err) => {
const duration = Date.now() - startTime
if (err) debug('Compaction ended with an error %s', err.message ?? err)
else debug('Compaction completed in %sms', duration)
cb()
})
}
/**
* @param {number} percentUsed
* @param {number} maxLogBytes
* @param {{ totalBytes: number; }} stats
*/
function reportCleanupNeed(percentUsed, maxLogBytes, stats) {
const bytesRemaining = maxLogBytes - stats.totalBytes
const kbRemaining = bytesRemaining >> 10
const mbRemaining = bytesRemaining >> 20
const remaining =
mbRemaining > 0
? `${mbRemaining}MB`
: kbRemaining > 0
? `${kbRemaining}KB`
: `${bytesRemaining}B`
if (bytesRemaining > 0) {
// prettier-ignore
debug('Log is %s% full (only %s of free space), needs cleanup', percentUsed.toFixed(0), remaining)
} else if (bytesRemaining === 0) {
debug('Log is full, needs cleanup')
} else {
// prettier-ignore
debug('Log is %s% full (%s beyond limit), needs cleanup', percentUsed.toFixed(0), remaining.slice(1))
}
}
/**
* @param {number} percentDeleted
* @param {{ deletedBytes: any; }} stats
*/
function reportCompactionNeed(percentDeleted, stats) {
const unusedBytes = stats.deletedBytes
const kbUnused = unusedBytes >> 10
const mbUnused = unusedBytes >> 20
const unused =
mbUnused > 0
? `${mbUnused}MB`
: kbUnused > 0
? `${kbUnused}KB`
: `${unusedBytes}B`
// prettier-ignore
debug('Log is %s% deleted (%s of unused space), needs compaction', percentDeleted.toFixed(0), unused)
}
/**
* Monitor the log size and schedule compaction and/or cleanup.
*
* @param {number} maxLogBytes
*/
function monitorLogSize(maxLogBytes) {
/** Number of records that match roughly 1% of the max log size */
const CHECKPOINT = Math.floor((maxLogBytes * 0.01) / 500) // assuming 1 record = 500 bytes
function checkLogSize() {
peer.db.log.stats((err, stats) => {
if (err) return
const percentUsed = (stats.totalBytes / maxLogBytes) * 100
const percentDeleted = (stats.deletedBytes / stats.totalBytes) * 100
const needsCleanup = percentUsed > 80
const needsCompaction = percentDeleted > 30
// Schedule clean up
if ((needsCleanup || needsCompaction) && !hasCleanupScheduled) {
if (needsCleanup) reportCleanupNeed(percentUsed, maxLogBytes, stats)
if (needsCompaction) reportCompactionNeed(percentDeleted, stats)
hasCleanupScheduled = true
if (needsCleanup) {
cleanup().finally(() => {
compact(() => {
hasCleanupScheduled = false
})
})
} else {
compact(() => {
hasCleanupScheduled = false
})
}
}
})
}
let count = 0
stopMonitoringLogSize = peer.db.onRecordAdded(() => {
count += 1
if (count >= CHECKPOINT) {
count = 0
checkLogSize()
}
})
checkLogSize()
}
/**
* @param {number?} maxLogBytes
*/
function start(maxLogBytes) {
stop()
const actualMaxLogBytes = maxLogBytes ?? config.gc?.maxLogBytes ?? null
// prettier-ignore
if (!actualMaxLogBytes) throw new Error('gc plugin requires maxLogBytes via start() argument or config.gc.maxLogBytes')
if (!stopMonitoringLogSize) {
monitorLogSize(actualMaxLogBytes)
}
}
function stop() {
if (stopMonitoringLogSize) {
stopMonitoringLogSize()
stopMonitoringLogSize = null
}
}
/**
* @param {CB<void>} cb
*/
function forceImmediately(cb) {
debug('Force clean and compact immediately')
cleanup().finally(() => {
compact(cb)
})
}
return {
start,
stop,
forceImmediately,
}
}
exports.name = 'gc'
exports.needs = ['db', 'goals']
exports.init = initGC