mirror of https://codeberg.org/pzp/pzp-db.git
add ghosts API
This commit is contained in:
parent
6996fb2d20
commit
d74695b7e7
65
lib/index.js
65
lib/index.js
|
@ -15,7 +15,7 @@ const {
|
|||
ACCOUNT_SELF,
|
||||
ACCOUNT_ANY,
|
||||
} = require('./msg-v3/constants')
|
||||
const { ReadyGate } = require('./utils')
|
||||
const { ReadyGate, GhostDB } = require('./utils')
|
||||
const { decrypt } = require('./encryption')
|
||||
|
||||
/**
|
||||
|
@ -162,6 +162,8 @@ function initDB(peer, config) {
|
|||
},
|
||||
})
|
||||
|
||||
const ghostDB = new GhostDB(Path.join(config.path, 'ghosts'))
|
||||
|
||||
peer.close.hook(function (/** @type {any} */ fn, /** @type {any} */ args) {
|
||||
log.close(() => {
|
||||
// @ts-ignore
|
||||
|
@ -929,6 +931,62 @@ function initDB(peer, config) {
|
|||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {{ msg: MsgID; tangle: MsgID; max: number; }} opts
|
||||
* @param {CB<void>} cb
|
||||
*/
|
||||
function addGhost(opts, cb) {
|
||||
if (!opts) return cb(new Error('ghosts.add() requires an `opts`'))
|
||||
// prettier-ignore
|
||||
if (!opts.msg || typeof opts.msg !== 'string') return cb(new Error('ghosts.add() requires msgID of the deleted msg in `opts.msg`'))
|
||||
// prettier-ignore
|
||||
if (!opts.tangle || typeof opts.tangle !== 'string') return cb(new Error('ghosts.add() requires tangleID for the deleted msg in `opts.tangle`'))
|
||||
// prettier-ignore
|
||||
if (!opts.max || typeof opts.max !== 'number') return cb(new Error('ghosts.add() requires max depth distance in `opts.max`'))
|
||||
const { msg: msgID, tangle: tangleID, max } = opts
|
||||
const rec = getRecord(msgID)
|
||||
if (!rec) return cb()
|
||||
if (!rec.msg) return cb()
|
||||
const tangleData = rec.msg.metadata.tangles[tangleID]
|
||||
// prettier-ignore
|
||||
if (!tangleData) return cb(new Error(`ghosts.add() opts.msg "${opts.msg}" does not belong to opts.tangle "${opts.tangle}"`))
|
||||
const depth = tangleData.depth
|
||||
|
||||
ghostDB.save(tangleID, msgID, depth, max, (err) => {
|
||||
// prettier-ignore
|
||||
if (err) cb(new Error('ghosts.add() failed to save to disk', { cause: err }))
|
||||
else cb()
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {MsgID} tangleID
|
||||
* @param {CB<Array<string>>} cb
|
||||
*/
|
||||
function getGhosts(tangleID, cb) {
|
||||
ghostDB.read(tangleID, (err, ghosts) => {
|
||||
if (err) return cb(new Error('ghosts.get() failed', { cause: err }))
|
||||
const msgIDs = [...ghosts.keys()]
|
||||
cb(null, msgIDs)
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {MsgID} tangleID
|
||||
* @param {CB<number>} cb
|
||||
*/
|
||||
function getMinGhostDepth(tangleID, cb) {
|
||||
ghostDB.read(tangleID, (err, ghosts) => {
|
||||
// prettier-ignore
|
||||
if (err) return cb(new Error('ghosts.getMinDepth() failed', { cause: err }))
|
||||
let minDepth = Infinity
|
||||
for (const depth of ghosts.values()) {
|
||||
if (depth < minDepth) minDepth = depth
|
||||
}
|
||||
cb(null, minDepth)
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {MsgID} msgID
|
||||
* @param {CB<void>} cb
|
||||
|
@ -993,6 +1051,11 @@ function initDB(peer, config) {
|
|||
get,
|
||||
del,
|
||||
erase,
|
||||
ghosts: {
|
||||
add: addGhost,
|
||||
get: getGhosts,
|
||||
getMinDepth: getMinGhostDepth,
|
||||
},
|
||||
onRecordAdded,
|
||||
getTangle,
|
||||
msgs,
|
||||
|
|
114
lib/utils.js
114
lib/utils.js
|
@ -1,3 +1,15 @@
|
|||
// @ts-ignore
|
||||
const atomic = require('atomic-file-rw')
|
||||
const Path = require('path')
|
||||
|
||||
/**
|
||||
* @template T
|
||||
* @typedef {T extends void ?
|
||||
* (...args: [Error] | []) => void :
|
||||
* (...args: [Error] | [null, T]) => void
|
||||
* } CB
|
||||
*/
|
||||
|
||||
class ReadyGate {
|
||||
#waiting
|
||||
#ready
|
||||
|
@ -21,4 +33,104 @@ class ReadyGate {
|
|||
}
|
||||
}
|
||||
|
||||
module.exports = { ReadyGate }
|
||||
class GhostDB {
|
||||
/** @type {string} */
|
||||
#basePath
|
||||
|
||||
static encodingOpts = { encoding: 'utf-8' }
|
||||
|
||||
/**
|
||||
* @param {string} basePath
|
||||
*/
|
||||
constructor(basePath) {
|
||||
this.#basePath = basePath
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} tangleID
|
||||
*/
|
||||
#path(tangleID) {
|
||||
return Path.join(this.#basePath, tangleID)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {Map<string, number>} map
|
||||
* @returns {string}
|
||||
*/
|
||||
#serialize(map) {
|
||||
return JSON.stringify([...map])
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} str
|
||||
* @returns {Map<string, number>}
|
||||
*/
|
||||
#deserialize(str) {
|
||||
return new Map(JSON.parse(str))
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} tangleID
|
||||
* @param {string} msgID
|
||||
* @param {number} depth
|
||||
* @param {number} max
|
||||
* @param {CB<void>} cb
|
||||
*/
|
||||
save(tangleID, msgID, depth, max, cb) {
|
||||
atomic.readFile(
|
||||
this.#path(tangleID),
|
||||
GhostDB.encodingOpts,
|
||||
(/** @type {any} */ err, /** @type {any} */ str) => {
|
||||
// Load Map
|
||||
/** @type {Map<string, number>} */
|
||||
let map;
|
||||
if (err && err.code === 'ENOENT') map = new Map()
|
||||
// prettier-ignore
|
||||
else if (err) return cb(new Error('GhostDB.save() failed to read ghost file', { cause: err }))
|
||||
else map = this.#deserialize(str)
|
||||
|
||||
map.set(msgID, depth)
|
||||
|
||||
// Garbage collect any ghost smaller than largestDepth - max
|
||||
let largestDepth = -1
|
||||
for (const depth of map.values()) {
|
||||
if (depth > largestDepth) largestDepth = depth
|
||||
}
|
||||
for (const [x, depth] of map.entries()) {
|
||||
if (depth <= largestDepth - max) map.delete(x)
|
||||
}
|
||||
|
||||
atomic.writeFile(
|
||||
this.#path(tangleID),
|
||||
this.#serialize(map),
|
||||
GhostDB.encodingOpts,
|
||||
(/** @type {any} */ err) => {
|
||||
// prettier-ignore
|
||||
if (err) return cb(new Error('GhostDB.save() failed to write ghost file', { cause: err }))
|
||||
else cb()
|
||||
}
|
||||
)
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} tangleID
|
||||
* @param {CB<Map<string, number>>} cb
|
||||
*/
|
||||
read(tangleID, cb) {
|
||||
atomic.readFile(
|
||||
this.#path(tangleID),
|
||||
GhostDB.encodingOpts,
|
||||
(/** @type {any} */ err, /** @type {any} */ str) => {
|
||||
// prettier-ignore
|
||||
if (err) return cb(new Error('GhostDB.read() failed to read ghost file', { cause: err }))
|
||||
const map = this.#deserialize(str)
|
||||
|
||||
cb(null, map)
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = { ReadyGate, GhostDB }
|
||||
|
|
|
@ -28,6 +28,7 @@
|
|||
},
|
||||
"dependencies": {
|
||||
"async-append-only-log": "~4.3.10",
|
||||
"atomic-file-rw": "~0.3.0",
|
||||
"blake3": "~2.1.7",
|
||||
"b4a": "~1.6.4",
|
||||
"bs58": "~5.0.0",
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
const test = require('node:test')
|
||||
const assert = require('node:assert')
|
||||
const path = require('node:path')
|
||||
const os = require('node:os')
|
||||
const p = require('node:util').promisify
|
||||
const rimraf = require('rimraf')
|
||||
const SecretStack = require('secret-stack')
|
||||
const caps = require('ppppp-caps')
|
||||
const Keypair = require('ppppp-keypair')
|
||||
|
||||
const DIR = path.join(os.tmpdir(), 'ppppp-db-ghosts')
|
||||
rimraf.sync(DIR)
|
||||
|
||||
const keypair = Keypair.generate('ed25519', 'alice')
|
||||
test('ghosts.add, ghosts.get, ghosts.getMinDepth', async (t) => {
|
||||
const peer = SecretStack({ appKey: caps.shse })
|
||||
.use(require('../lib'))
|
||||
.use(require('ssb-box'))
|
||||
.call(null, { keypair, path: DIR })
|
||||
|
||||
await peer.db.loaded()
|
||||
|
||||
const account = await p(peer.db.account.create)({ domain: 'person' })
|
||||
const MAX = 5
|
||||
|
||||
let msgIDs = []
|
||||
for (let i = 0; i < 10; i++) {
|
||||
const rec = await p(peer.db.feed.publish)({
|
||||
account,
|
||||
domain: 'post',
|
||||
data: { text: 'hello ' + i },
|
||||
})
|
||||
msgIDs.push(rec.id)
|
||||
}
|
||||
const feedID = peer.db.feed.getID(account, 'post')
|
||||
|
||||
await p(peer.db.ghosts.add)({ msg: msgIDs[0], tangle: feedID, max: MAX })
|
||||
await p(peer.db.ghosts.add)({ msg: msgIDs[1], tangle: feedID, max: MAX })
|
||||
await p(peer.db.ghosts.add)({ msg: msgIDs[2], tangle: feedID, max: MAX })
|
||||
await p(peer.db.ghosts.add)({ msg: msgIDs[3], tangle: feedID, max: MAX })
|
||||
await p(peer.db.ghosts.add)({ msg: msgIDs[4], tangle: feedID, max: MAX })
|
||||
|
||||
const ghostsA = await p(peer.db.ghosts.get)(feedID)
|
||||
assert.deepEqual(ghostsA, msgIDs.slice(0, 5), 'ghosts so far')
|
||||
const depthA = await p(peer.db.ghosts.getMinDepth)(feedID)
|
||||
assert.equal(depthA, 1, 'min depth so far')
|
||||
|
||||
await p(peer.db.ghosts.add)({ msg: msgIDs[5], tangle: feedID, max: MAX })
|
||||
|
||||
const ghostsB = await p(peer.db.ghosts.get)(feedID)
|
||||
assert.deepEqual(ghostsB, msgIDs.slice(1, 6), 'ghosts so far')
|
||||
const depthB = await p(peer.db.ghosts.getMinDepth)(feedID)
|
||||
assert.equal(depthB, 2, 'min depth so far')
|
||||
|
||||
await p(peer.close)(true)
|
||||
})
|
Loading…
Reference in New Issue