mirror of https://codeberg.org/pzp/pzp-dict.git
589 lines
15 KiB
JavaScript
589 lines
15 KiB
JavaScript
const MsgV4 = require('pzp-db/msg-v4')
|
|
const pull = require('pull-stream')
|
|
|
|
const PREFIX = 'dict_v1__'
|
|
|
|
/**
|
|
* @typedef {ReturnType<import('pzp-db').init>} PZPDB
|
|
* @typedef {import('pzp-db').RecPresent} RecPresent
|
|
* @typedef {{
|
|
* hook: (
|
|
* cb: (
|
|
* this: any,
|
|
* fn: (this: any, ...a: Array<any>) => any,
|
|
* args: Array<any>
|
|
* ) => void
|
|
* ) => void
|
|
* }} ClosableHook
|
|
* @typedef {string} Subdomain
|
|
* @typedef {string} MsgID
|
|
* @typedef {`${Subdomain}.${string}`} SubdomainField
|
|
* @typedef {{
|
|
* update: {
|
|
* [field in string]: any
|
|
* },
|
|
* supersedes: Array<MsgID>,
|
|
* }} DictMsgData
|
|
* @typedef {{
|
|
* dict?: {
|
|
* ghostSpan?: number
|
|
* }
|
|
* }} Config
|
|
*/
|
|
|
|
/**
|
|
* @template [T = any]
|
|
* @typedef {import('pzp-db/msg-v4').Msg<T>} Msg<T>
|
|
*/
|
|
|
|
/**
|
|
* @template T
|
|
* @typedef {[T] extends [void] ?
|
|
* (...args: [Error] | []) => void :
|
|
* (...args: [Error] | [null, T]) => void
|
|
* } CB
|
|
*/
|
|
|
|
/**
|
|
* @param {string} domain
|
|
* @returns {Subdomain}
|
|
*/
|
|
function toSubdomain(domain) {
|
|
return domain.slice(PREFIX.length)
|
|
}
|
|
|
|
/**
|
|
* @param {Subdomain} subdomain
|
|
* @returns {string}
|
|
*/
|
|
function fromSubdomain(subdomain) {
|
|
return PREFIX + subdomain
|
|
}
|
|
|
|
/**
|
|
* @param {{ db: PZPDB, close: ClosableHook }} peer
|
|
* @param {Config} config
|
|
*/
|
|
function initDict(peer, config) {
|
|
let ghostSpan = config.dict?.ghostSpan ?? 32
|
|
if (ghostSpan < 1) throw new Error('config.dict.ghostSpan must be >= 0')
|
|
|
|
//#region state
|
|
let loadedAccountID = /** @type {string | null} */ (null)
|
|
let loadPromise = /** @type {Promise<void> | null} */ (null)
|
|
let cancelOnRecordAdded = /** @type {CallableFunction | null} */ (null)
|
|
const tangles = /** @type {Map<Subdomain, MsgV4.Tangle>} */ (new Map())
|
|
|
|
const fieldRoots = {
|
|
_map: /** @type {Map<SubdomainField, Set<MsgID>>} */ (new Map()),
|
|
/**
|
|
* @param {string} subdomain
|
|
* @param {string} field
|
|
* @returns {SubdomainField}
|
|
*/
|
|
_getKey(subdomain, field) {
|
|
return `${subdomain}.${field}`
|
|
},
|
|
/**
|
|
* @param {string} subdomain
|
|
* @returns {{[field in string]: Array<MsgID>}}
|
|
*/
|
|
getAll(subdomain) {
|
|
const out = /** @type {{[field in string]: Array<MsgID>}} */ ({})
|
|
for (const [key, value] of this._map.entries()) {
|
|
if (key.startsWith(subdomain + '.')) {
|
|
const field = key.slice(subdomain.length + 1)
|
|
out[field] = [...value]
|
|
}
|
|
}
|
|
return out
|
|
},
|
|
/**
|
|
* @param {string} subdomain
|
|
* @param {string} field
|
|
* @returns {Set<MsgID> | undefined}
|
|
*/
|
|
get(subdomain, field) {
|
|
const key = this._getKey(subdomain, field)
|
|
return this._map.get(key)
|
|
},
|
|
/**
|
|
* @param {string} subdomain
|
|
* @param {string} field
|
|
* @param {MsgID} msgID
|
|
*/
|
|
add(subdomain, field, msgID) {
|
|
const key = this._getKey(subdomain, field)
|
|
const set = this._map.get(key) ?? new Set()
|
|
set.add(msgID)
|
|
return this._map.set(key, set)
|
|
},
|
|
/**
|
|
* @param {string} subdomain
|
|
* @param {string} field
|
|
* @param {MsgID} msgID
|
|
*/
|
|
del(subdomain, field, msgID) {
|
|
const key = this._getKey(subdomain, field)
|
|
const set = this._map.get(key)
|
|
if (!set) return false
|
|
set.delete(msgID)
|
|
if (set.size === 0) this._map.delete(key)
|
|
return true
|
|
},
|
|
toString() {
|
|
return this._map
|
|
},
|
|
}
|
|
//#endregion
|
|
|
|
//#region active processes
|
|
peer.close.hook(function (fn, args) {
|
|
cancelOnRecordAdded?.()
|
|
fn.apply(this, args)
|
|
})
|
|
//#endregion
|
|
|
|
//#region internal methods
|
|
/**
|
|
* @private
|
|
* @param {Msg | null | undefined} msg
|
|
* @returns {msg is Msg}
|
|
*/
|
|
function isValidDictMoot(msg) {
|
|
if (!msg) return false
|
|
if (msg.metadata.account !== loadedAccountID) return false
|
|
const domain = msg.metadata.domain
|
|
if (!domain.startsWith(PREFIX)) return false
|
|
return MsgV4.isMoot(msg, loadedAccountID, domain)
|
|
}
|
|
|
|
/**
|
|
* @private
|
|
* @param {Msg | null | undefined} msg
|
|
* @returns {msg is Msg<DictMsgData>}
|
|
*/
|
|
function isValidDictMsg(msg) {
|
|
if (!msg) return false
|
|
if (!msg.data) return false
|
|
if (msg.metadata.account !== loadedAccountID) return false
|
|
if (!msg.metadata.domain.startsWith(PREFIX)) return false
|
|
if (!msg.data.update) return false
|
|
if (typeof msg.data.update !== 'object') return false
|
|
if (Array.isArray(msg.data.update)) return false
|
|
if (!Array.isArray(msg.data.supersedes)) return false
|
|
return true
|
|
}
|
|
|
|
/**
|
|
* @private
|
|
* @param {string} mootID
|
|
* @param {Msg} moot
|
|
*/
|
|
function learnDictMoot(mootID, moot) {
|
|
const subdomain = toSubdomain(moot.metadata.domain)
|
|
const tangle = tangles.get(subdomain) ?? new MsgV4.Tangle(mootID)
|
|
tangle.add(mootID, moot)
|
|
tangles.set(subdomain, tangle)
|
|
}
|
|
|
|
/**
|
|
* @private
|
|
* @param {string} msgID
|
|
* @param {Msg<DictMsgData>} msg
|
|
*/
|
|
function learnDictUpdate(msgID, msg) {
|
|
const { account, domain } = msg.metadata
|
|
const mootID = MsgV4.getMootID(account, domain)
|
|
const subdomain = toSubdomain(domain)
|
|
const tangle = tangles.get(subdomain) ?? new MsgV4.Tangle(mootID)
|
|
tangle.add(msgID, msg)
|
|
tangles.set(subdomain, tangle)
|
|
|
|
for (const field in msg.data.update) {
|
|
const existing = fieldRoots.get(subdomain, field)
|
|
if (!existing) {
|
|
fieldRoots.add(subdomain, field, msgID)
|
|
} else {
|
|
for (const existingID of existing) {
|
|
if (tangle.precedes(existingID, msgID)) {
|
|
fieldRoots.del(subdomain, field, existingID)
|
|
fieldRoots.add(subdomain, field, msgID)
|
|
} else {
|
|
fieldRoots.add(subdomain, field, msgID)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @private
|
|
* @param {string} msgID
|
|
* @param {Msg} msg
|
|
*/
|
|
function maybeLearnAboutDict(msgID, msg) {
|
|
if (msg.metadata.account !== loadedAccountID) return
|
|
if (isValidDictMoot(msg)) {
|
|
learnDictMoot(msgID, msg)
|
|
return
|
|
}
|
|
if (isValidDictMsg(msg)) {
|
|
learnDictUpdate(msgID, msg)
|
|
return
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @private
|
|
* @param {CB<void>} cb
|
|
*/
|
|
function loaded(cb) {
|
|
if (cb === void 0) return loadPromise
|
|
else loadPromise?.then(() => cb(), cb)
|
|
}
|
|
|
|
/**
|
|
* @private
|
|
* @param {string} subdomain
|
|
* @param {CB<number>} cb
|
|
*/
|
|
function _squeezePotential(subdomain, cb) {
|
|
// prettier-ignore
|
|
if (!loadedAccountID) return cb(Error('Cannot squeeze potential before loading'))
|
|
// TODO: improve this so that the squeezePotential is the size of the
|
|
// tangle suffix built as a slice from the fieldRoots
|
|
const mootID = MsgV4.getMootID(loadedAccountID, fromSubdomain(subdomain))
|
|
peer.db.getTangle(mootID, (err, tangle) => {
|
|
if (err) return cb(err)
|
|
|
|
if (!tangle) return cb(null, 0)
|
|
const maxDepth = tangle.maxDepth
|
|
const fieldRoots = _getFieldRoots(subdomain)
|
|
let minDepth = Infinity
|
|
for (const field in fieldRoots) {
|
|
for (const msgID of fieldRoots[field]) {
|
|
const depth = tangle.getDepth(msgID)
|
|
if (depth < minDepth) minDepth = depth
|
|
}
|
|
}
|
|
return cb(null, maxDepth - minDepth)
|
|
})
|
|
}
|
|
|
|
/**
|
|
* @param {string} subdomain
|
|
* @param {{[field in string]: any}} update
|
|
* @param {CB<boolean>} cb
|
|
*/
|
|
function forceUpdate(subdomain, update, cb) {
|
|
if (!loadedAccountID) throw new Error('Cannot force update before loading')
|
|
const domain = fromSubdomain(subdomain)
|
|
|
|
// Populate supersedes
|
|
const supersedes = []
|
|
for (const field in update) {
|
|
const existing = fieldRoots.get(subdomain, field)
|
|
if (existing) supersedes.push(...existing)
|
|
}
|
|
|
|
peer.db.feed.publish(
|
|
{ account: loadedAccountID, domain, data: { update, supersedes } },
|
|
(err, rec) => {
|
|
// prettier-ignore
|
|
if (err) return cb(new Error('Failed to create msg when force-updating Dict', { cause: err }))
|
|
cb(null, true)
|
|
}
|
|
)
|
|
}
|
|
//#endregion
|
|
|
|
//#region public methods
|
|
/**
|
|
* @param {string} accountID
|
|
* @param {CB<void>} cb
|
|
*/
|
|
function load(accountID, cb) {
|
|
if (accountID === loadedAccountID) {
|
|
loaded(cb)
|
|
return
|
|
}
|
|
if (loadedAccountID !== null) {
|
|
// prettier-ignore
|
|
cb(new Error(`Cannot load Dict for account "${accountID}" because Dict for account "${loadedAccountID}" is already loaded`))
|
|
return
|
|
}
|
|
loadedAccountID = accountID
|
|
loadPromise = new Promise((resolve, reject) => {
|
|
// microtask is needed to ensure that loadPromise is assigned BEFORE this
|
|
// body is executed (which in turn does inversion of control when `cb` or
|
|
// `resolve` is called)
|
|
queueMicrotask(async () => {
|
|
for await (const rec of peer.db.records()) {
|
|
if (!rec.msg) continue
|
|
maybeLearnAboutDict(rec.id, rec.msg)
|
|
}
|
|
cancelOnRecordAdded = peer.db.onRecordAdded(
|
|
(/** @type {RecPresent} */ rec) => {
|
|
try {
|
|
maybeLearnAboutDict(rec.id, rec.msg)
|
|
} catch (err) {
|
|
console.error(err)
|
|
}
|
|
}
|
|
)
|
|
resolve()
|
|
cb()
|
|
})
|
|
})
|
|
}
|
|
|
|
/**
|
|
* @param {string} subdomain
|
|
*/
|
|
function _getFieldRoots(subdomain) {
|
|
// prettier-ignore
|
|
if (!loadedAccountID) throw new Error('Cannot getFieldRoots() before loading')
|
|
return fieldRoots.getAll(subdomain)
|
|
}
|
|
|
|
/**
|
|
* @public
|
|
* @param {string} tangleID
|
|
* @param {CB<number>} cb
|
|
*/
|
|
function minRequiredDepth(tangleID, cb) {
|
|
peer.db.getTangle(tangleID, (err, tangle) => {
|
|
if (err) return cb(err)
|
|
|
|
// prettier-ignore
|
|
if (!tangle) return cb(null, 0)
|
|
if (!tangle.root) return cb(null, 0)
|
|
// prettier-ignore
|
|
if (!MsgV4.isMoot(tangle.root)) return cb(Error(`Tangle "${tangleID}" is not a moot`))
|
|
const domain = tangle.root.metadata.domain
|
|
// prettier-ignore
|
|
if (!domain.startsWith(PREFIX)) return cb(Error(`Tangle "${tangleID}" is not a Dict moot`))
|
|
|
|
// Discover field roots
|
|
const fieldRoots = new Set()
|
|
|
|
pull(
|
|
pull.values(tangle.topoSort()),
|
|
pull.asyncMap((msgID, cb) => {
|
|
peer.db.get(msgID, (err, msg) => {
|
|
if (err) return cb(err)
|
|
|
|
if (!msg?.data) return cb(null, null)
|
|
for (const supersededMsgID of msg.data.supersedes) {
|
|
fieldRoots.delete(supersededMsgID)
|
|
}
|
|
fieldRoots.add(msgID)
|
|
|
|
return cb(null, null)
|
|
})
|
|
}),
|
|
pull.drain(() => {}, (err) => {
|
|
// prettier-ignore
|
|
if (err) return cb(Error("minRequiredDepth() stream in dict failed", { cause: err }))
|
|
|
|
// Get minimum depth of all field roots
|
|
let minDepth = Infinity
|
|
for (const msgID of fieldRoots) {
|
|
const depth = tangle.getDepth(msgID)
|
|
if (depth < minDepth) minDepth = depth
|
|
}
|
|
|
|
return cb(null, minDepth)
|
|
})
|
|
)
|
|
})
|
|
}
|
|
|
|
/**
|
|
* @public
|
|
* @param {string} tangleID
|
|
* @param {CB<number>} cb
|
|
*/
|
|
function minGhostDepth(tangleID, cb) {
|
|
minRequiredDepth(tangleID, (err, minDepth) => {
|
|
if (err) return cb(err)
|
|
|
|
return cb(null, Math.max(0, minDepth - ghostSpan))
|
|
})
|
|
}
|
|
|
|
/**
|
|
* @public
|
|
* @param {string} id
|
|
* @param {string} subdomain
|
|
* @param {CB<{[field in string]: any} | null>} cb
|
|
*/
|
|
function read(id, subdomain, cb) {
|
|
const domain = fromSubdomain(subdomain)
|
|
const mootID = MsgV4.getMootID(id, domain)
|
|
peer.db.getTangle(mootID, (err, tangle) => {
|
|
if (err) return cb(err)
|
|
|
|
if (!tangle) {
|
|
if (id === loadedAccountID) return cb(null, {})
|
|
else return cb(null, null)
|
|
}
|
|
|
|
const dict = /** @type {{[field in string]: any}} */ ({})
|
|
|
|
pull(
|
|
pull.values(tangle.topoSort()),
|
|
pull.asyncMap((msgID, cb) => {
|
|
peer.db.get(msgID, cb)
|
|
}),
|
|
pull.drain((msg) => {
|
|
if (isValidDictMsg(msg)) {
|
|
const { update } = msg.data
|
|
Object.assign(dict, update)
|
|
}
|
|
}, (err) => {
|
|
if (err) return cb(Error("dict read failed", { cause: err }))
|
|
cb(null, dict)
|
|
})
|
|
)
|
|
})
|
|
}
|
|
|
|
/**
|
|
* @public
|
|
* @param {string} subdomain
|
|
* @returns {string}
|
|
*/
|
|
function getFeedID(subdomain) {
|
|
if (!loadedAccountID) throw new Error('Cannot getFeedID() before loading')
|
|
const domain = fromSubdomain(subdomain)
|
|
return MsgV4.getMootID(loadedAccountID, domain)
|
|
}
|
|
|
|
/**
|
|
* @public
|
|
* @param {MsgID} ghostableMsgID
|
|
* @param {MsgID} tangleID
|
|
* @param {CB<boolean>} cb
|
|
*/
|
|
function isGhostable(ghostableMsgID, tangleID, cb) {
|
|
if (ghostableMsgID === tangleID) return cb(null, false)
|
|
let i = 0
|
|
|
|
peer.db.get(ghostableMsgID, (err, msg) => {
|
|
if (err) return cb(err)
|
|
|
|
// prettier-ignore
|
|
if (!msg) return cb(Error(`isGhostable() msgID "${ghostableMsgID}" does not exist in the database`))
|
|
|
|
minRequiredDepth(tangleID, (err, minFieldRootDepth) => {
|
|
if (err) return cb(err)
|
|
|
|
const minGhostDepth = minFieldRootDepth - ghostSpan
|
|
const msgDepth = msg.metadata.tangles[tangleID].depth
|
|
if (minGhostDepth <= msgDepth && msgDepth < minFieldRootDepth) return cb(null, true)
|
|
return cb(null, false)
|
|
})
|
|
})
|
|
}
|
|
|
|
/**
|
|
* @returns {number}
|
|
*/
|
|
function getGhostSpan() {
|
|
return ghostSpan
|
|
}
|
|
|
|
/**
|
|
* @param {number} span
|
|
* @returns {void}
|
|
*/
|
|
function setGhostSpan(span) {
|
|
if (span < 1) throw new Error('ghostSpan must be >= 0')
|
|
ghostSpan = span
|
|
}
|
|
|
|
/**
|
|
* @public
|
|
* @param {string} subdomain
|
|
* @param {{[field in string]: any}} update
|
|
* @param {CB<boolean>} cb
|
|
*/
|
|
function update(subdomain, update, cb) {
|
|
if (!loadedAccountID) return cb(new Error('Cannot update before loading'))
|
|
|
|
loaded(() => {
|
|
// prettier-ignore
|
|
if (!loadedAccountID) return cb(new Error('Expected account to be loaded'))
|
|
read(loadedAccountID, subdomain, (err, dict) => {
|
|
if (err) return cb(err)
|
|
|
|
// prettier-ignore
|
|
if (!dict) return cb(Error(`Cannot update non-existent dict "${subdomain}`))
|
|
|
|
let hasChanges = false
|
|
for (const [field, value] of Object.entries(update)) {
|
|
if (value !== dict[field]) {
|
|
hasChanges = true
|
|
break
|
|
}
|
|
}
|
|
if (!hasChanges) return cb(null, false)
|
|
forceUpdate(subdomain, update, cb)
|
|
})
|
|
})
|
|
}
|
|
|
|
/**
|
|
* @param {string} subdomain
|
|
* @param {CB<boolean>} cb
|
|
*/
|
|
function squeeze(subdomain, cb) {
|
|
if (!loadedAccountID) return cb(new Error('Cannot squeeze before loading'))
|
|
_squeezePotential(subdomain, (err, potential) => {
|
|
if (err) return cb(err)
|
|
|
|
if (potential < 1) return cb(null, false)
|
|
|
|
loaded(() => {
|
|
// prettier-ignore
|
|
if (!loadedAccountID) return cb(new Error('Expected account to be loaded'))
|
|
read(loadedAccountID, subdomain, (err, dict) => {
|
|
if (err) return cb(err)
|
|
|
|
// prettier-ignore
|
|
if (!dict) return cb(new Error(`Cannot squeeze non-existent Dict "${subdomain}"`))
|
|
forceUpdate(subdomain, dict, (err, _forceUpdated) => {
|
|
// prettier-ignore
|
|
if (err) return cb(new Error(`Failed to force update when squeezing Dict "${subdomain}"`, { cause: err }))
|
|
cb(null, true)
|
|
})
|
|
})
|
|
})
|
|
})
|
|
}
|
|
//#endregion
|
|
|
|
return {
|
|
load,
|
|
update,
|
|
read,
|
|
getDomain: fromSubdomain,
|
|
getFeedID,
|
|
isGhostable,
|
|
getGhostSpan,
|
|
setGhostSpan,
|
|
minGhostDepth,
|
|
minRequiredDepth,
|
|
squeeze,
|
|
|
|
_getFieldRoots,
|
|
_squeezePotential,
|
|
}
|
|
}
|
|
|
|
exports.name = 'dict'
|
|
exports.needs = ['db']
|
|
exports.init = initDict
|