mirror of https://codeberg.org/pzp/pzp-db.git
make ghost read APIs synchronous
This commit is contained in:
parent
b87ca604eb
commit
21c1adbd2a
34
lib/index.js
34
lib/index.js
|
@ -326,7 +326,9 @@ function initDB(peer, config) {
|
||||||
*/
|
*/
|
||||||
function loaded(cb) {
|
function loaded(cb) {
|
||||||
if (cb === void 0) return promisify(loaded)()
|
if (cb === void 0) return promisify(loaded)()
|
||||||
scannedLog.onReady(cb)
|
scannedLog.onReady(() => {
|
||||||
|
ghostDB.onReady(cb)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -961,30 +963,24 @@ function initDB(peer, config) {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {MsgID} tangleID
|
* @param {MsgID} tangleID
|
||||||
* @param {CB<Array<string>>} cb
|
* @returns {Array<string>}
|
||||||
*/
|
*/
|
||||||
function getGhosts(tangleID, cb) {
|
function getGhosts(tangleID) {
|
||||||
ghostDB.read(tangleID, (err, ghosts) => {
|
const ghosts = ghostDB.read(tangleID)
|
||||||
if (err) return cb(new Error('ghosts.get() failed', { cause: err }))
|
return [...ghosts.keys()]
|
||||||
const msgIDs = [...ghosts.keys()]
|
|
||||||
cb(null, msgIDs)
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {MsgID} tangleID
|
* @param {MsgID} tangleID
|
||||||
* @param {CB<number>} cb
|
* @returns {number}
|
||||||
*/
|
*/
|
||||||
function getMinGhostDepth(tangleID, cb) {
|
function getMinGhostDepth(tangleID) {
|
||||||
ghostDB.read(tangleID, (err, ghosts) => {
|
const ghosts = ghostDB.read(tangleID)
|
||||||
// prettier-ignore
|
let minDepth = Infinity
|
||||||
if (err) return cb(new Error('ghosts.getMinDepth() failed', { cause: err }))
|
for (const depth of ghosts.values()) {
|
||||||
let minDepth = Infinity
|
if (depth < minDepth) minDepth = depth
|
||||||
for (const depth of ghosts.values()) {
|
}
|
||||||
if (depth < minDepth) minDepth = depth
|
return minDepth
|
||||||
}
|
|
||||||
cb(null, minDepth)
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
148
lib/utils.js
148
lib/utils.js
|
@ -1,6 +1,12 @@
|
||||||
|
const FS = require('fs')
|
||||||
|
const Path = require('path')
|
||||||
// @ts-ignore
|
// @ts-ignore
|
||||||
const atomic = require('atomic-file-rw')
|
const atomic = require('atomic-file-rw')
|
||||||
const Path = require('path')
|
// @ts-ignore
|
||||||
|
const multicb = require('multicb')
|
||||||
|
|
||||||
|
// TODO: fs is only supported in node.js. We should support browser by replacing
|
||||||
|
// fs.readdir with a browser "file" that just lists all ghost files.
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @template T
|
* @template T
|
||||||
|
@ -31,12 +37,22 @@ class ReadyGate {
|
||||||
for (const cb of this.#waiting) cb()
|
for (const cb of this.#waiting) cb()
|
||||||
this.#waiting.clear()
|
this.#waiting.clear()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
get isReady() {
|
||||||
|
return this.#ready
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class GhostDB {
|
class GhostDB {
|
||||||
/** @type {string} */
|
/** @type {string} */
|
||||||
#basePath
|
#basePath
|
||||||
|
|
||||||
|
/** @type {ReadyGate} */
|
||||||
|
#loaded
|
||||||
|
|
||||||
|
/** @type {Map<string, Map<string, number>>} */
|
||||||
|
#maps
|
||||||
|
|
||||||
static encodingOpts = { encoding: 'utf-8' }
|
static encodingOpts = { encoding: 'utf-8' }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -44,6 +60,32 @@ class GhostDB {
|
||||||
*/
|
*/
|
||||||
constructor(basePath) {
|
constructor(basePath) {
|
||||||
this.#basePath = basePath
|
this.#basePath = basePath
|
||||||
|
this.#maps = new Map()
|
||||||
|
this.#loaded = new ReadyGate()
|
||||||
|
|
||||||
|
// Load all ghosts files into Maps in memory
|
||||||
|
// TODO this is opening up ALL the files at once, perhaps we should allow a
|
||||||
|
// specific max concurrent number of reads? i.e. not fully sequential
|
||||||
|
// neither fully parallel
|
||||||
|
if (FS.existsSync(basePath)) {
|
||||||
|
const done = multicb({ pluck: 1 })
|
||||||
|
FS.readdirSync(basePath).forEach((tangleID) => {
|
||||||
|
const cb = done()
|
||||||
|
this.#read(tangleID, (err, map) => {
|
||||||
|
// prettier-ignore
|
||||||
|
if (err) return cb(new Error('GhostDB failed to read ghost file', { cause: err }))
|
||||||
|
this.#maps.set(tangleID, map)
|
||||||
|
cb()
|
||||||
|
})
|
||||||
|
})
|
||||||
|
done((/** @type {any} */ err) => {
|
||||||
|
// prettier-ignore
|
||||||
|
if (err) throw new Error('GhostDB failed to load', { cause: err })
|
||||||
|
this.#loaded.setReady()
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
this.#loaded.setReady()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -69,63 +111,18 @@ class GhostDB {
|
||||||
return new Map(JSON.parse(str))
|
return new Map(JSON.parse(str))
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @param {string} tangleID
|
|
||||||
* @param {string} msgID
|
|
||||||
* @param {number} depth
|
|
||||||
* @param {number} max
|
|
||||||
* @param {CB<void>} cb
|
|
||||||
*/
|
|
||||||
save(tangleID, msgID, depth, max, cb) {
|
|
||||||
atomic.readFile(
|
|
||||||
this.#path(tangleID),
|
|
||||||
GhostDB.encodingOpts,
|
|
||||||
(/** @type {any} */ err, /** @type {any} */ str) => {
|
|
||||||
// Load Map
|
|
||||||
/** @type {Map<string, number>} */
|
|
||||||
let map;
|
|
||||||
if (err && err.code === 'ENOENT') map = new Map()
|
|
||||||
// prettier-ignore
|
|
||||||
else if (err) return cb(new Error('GhostDB.save() failed to read ghost file', { cause: err }))
|
|
||||||
else map = this.#deserialize(str)
|
|
||||||
|
|
||||||
map.set(msgID, depth)
|
|
||||||
|
|
||||||
// Garbage collect any ghost smaller than largestDepth - max
|
|
||||||
let largestDepth = -1
|
|
||||||
for (const depth of map.values()) {
|
|
||||||
if (depth > largestDepth) largestDepth = depth
|
|
||||||
}
|
|
||||||
for (const [x, depth] of map.entries()) {
|
|
||||||
if (depth <= largestDepth - max) map.delete(x)
|
|
||||||
}
|
|
||||||
|
|
||||||
atomic.writeFile(
|
|
||||||
this.#path(tangleID),
|
|
||||||
this.#serialize(map),
|
|
||||||
GhostDB.encodingOpts,
|
|
||||||
(/** @type {any} */ err) => {
|
|
||||||
// prettier-ignore
|
|
||||||
if (err) return cb(new Error('GhostDB.save() failed to write ghost file', { cause: err }))
|
|
||||||
else cb()
|
|
||||||
}
|
|
||||||
)
|
|
||||||
}
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {string} tangleID
|
* @param {string} tangleID
|
||||||
* @param {CB<Map<string, number>>} cb
|
* @param {CB<Map<string, number>>} cb
|
||||||
*/
|
*/
|
||||||
read(tangleID, cb) {
|
#read(tangleID, cb) {
|
||||||
atomic.readFile(
|
atomic.readFile(
|
||||||
this.#path(tangleID),
|
this.#path(tangleID),
|
||||||
GhostDB.encodingOpts,
|
GhostDB.encodingOpts,
|
||||||
(/** @type {any} */ err, /** @type {any} */ str) => {
|
(/** @type {any} */ err, /** @type {any} */ str) => {
|
||||||
// Load Map
|
// Load Map
|
||||||
/** @type {Map<string, number>} */
|
/** @type {Map<string, number>} */
|
||||||
let map;
|
let map
|
||||||
if (err && err.code === 'ENOENT') map = new Map()
|
if (err && err.code === 'ENOENT') map = new Map()
|
||||||
// prettier-ignore
|
// prettier-ignore
|
||||||
else if (err) return cb(new Error('GhostDB.read() failed to read ghost file', { cause: err }))
|
else if (err) return cb(new Error('GhostDB.read() failed to read ghost file', { cause: err }))
|
||||||
|
@ -135,6 +132,61 @@ class GhostDB {
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {() => void} cb
|
||||||
|
*/
|
||||||
|
onReady(cb) {
|
||||||
|
this.#loaded.onReady(cb)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {string} tangleID
|
||||||
|
* @param {string} msgID
|
||||||
|
* @param {number} depth
|
||||||
|
* @param {number} max
|
||||||
|
* @param {CB<void>} cb
|
||||||
|
*/
|
||||||
|
save(tangleID, msgID, depth, max, cb) {
|
||||||
|
this.#loaded.onReady(() => {
|
||||||
|
if (!this.#maps.has(tangleID)) this.#maps.set(tangleID, new Map())
|
||||||
|
const map = /** @type {Map<string, number>} */ (this.#maps.get(tangleID))
|
||||||
|
const newMap = new Map(map)
|
||||||
|
newMap.set(msgID, depth)
|
||||||
|
|
||||||
|
// Garbage collect any ghost smaller than largestDepth - max
|
||||||
|
let largestDepth = -1
|
||||||
|
for (const depth of newMap.values()) {
|
||||||
|
if (depth > largestDepth) largestDepth = depth
|
||||||
|
}
|
||||||
|
for (const [x, depth] of newMap.entries()) {
|
||||||
|
if (depth <= largestDepth - max) newMap.delete(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
atomic.writeFile(
|
||||||
|
this.#path(tangleID),
|
||||||
|
this.#serialize(newMap),
|
||||||
|
GhostDB.encodingOpts,
|
||||||
|
(/** @type {any} */ err) => {
|
||||||
|
// prettier-ignore
|
||||||
|
if (err) return cb(new Error('GhostDB.save() failed to write ghost file', { cause: err }))
|
||||||
|
this.#maps.set(tangleID, newMap)
|
||||||
|
cb()
|
||||||
|
}
|
||||||
|
)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {string} tangleID
|
||||||
|
* @returns {Map<string, number>}
|
||||||
|
*/
|
||||||
|
read(tangleID) {
|
||||||
|
if (!this.#loaded.isReady) {
|
||||||
|
throw new Error('GhostDB.read() called before loaded')
|
||||||
|
}
|
||||||
|
return this.#maps.get(tangleID) ?? new Map()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = { ReadyGate, GhostDB }
|
module.exports = { ReadyGate, GhostDB }
|
||||||
|
|
|
@ -33,6 +33,7 @@
|
||||||
"b4a": "~1.6.4",
|
"b4a": "~1.6.4",
|
||||||
"bs58": "~5.0.0",
|
"bs58": "~5.0.0",
|
||||||
"json-canon": "~1.0.0",
|
"json-canon": "~1.0.0",
|
||||||
|
"multicb": "~1.2.2",
|
||||||
"obz": "~1.1.0",
|
"obz": "~1.1.0",
|
||||||
"ppppp-keypair": "github:staltz/ppppp-keypair",
|
"ppppp-keypair": "github:staltz/ppppp-keypair",
|
||||||
"promisify-4loc": "~1.0.0",
|
"promisify-4loc": "~1.0.0",
|
||||||
|
|
|
@ -34,7 +34,7 @@ test('ghosts.add, ghosts.get, ghosts.getMinDepth', async (t) => {
|
||||||
}
|
}
|
||||||
const feedID = peer.db.feed.getID(account, 'post')
|
const feedID = peer.db.feed.getID(account, 'post')
|
||||||
|
|
||||||
const ghosts0 = await p(peer.db.ghosts.get)(feedID)
|
const ghosts0 = peer.db.ghosts.get(feedID)
|
||||||
assert.deepEqual(ghosts0, [], 'no ghosts so far')
|
assert.deepEqual(ghosts0, [], 'no ghosts so far')
|
||||||
|
|
||||||
await p(peer.db.ghosts.add)({ msg: msgIDs[0], tangle: feedID, max: MAX })
|
await p(peer.db.ghosts.add)({ msg: msgIDs[0], tangle: feedID, max: MAX })
|
||||||
|
@ -43,16 +43,16 @@ test('ghosts.add, ghosts.get, ghosts.getMinDepth', async (t) => {
|
||||||
await p(peer.db.ghosts.add)({ msg: msgIDs[3], tangle: feedID, max: MAX })
|
await p(peer.db.ghosts.add)({ msg: msgIDs[3], tangle: feedID, max: MAX })
|
||||||
await p(peer.db.ghosts.add)({ msg: msgIDs[4], tangle: feedID, max: MAX })
|
await p(peer.db.ghosts.add)({ msg: msgIDs[4], tangle: feedID, max: MAX })
|
||||||
|
|
||||||
const ghostsA = await p(peer.db.ghosts.get)(feedID)
|
const ghostsA = peer.db.ghosts.get(feedID)
|
||||||
assert.deepEqual(ghostsA, msgIDs.slice(0, 5), 'ghosts so far')
|
assert.deepEqual(ghostsA, msgIDs.slice(0, 5), 'ghosts so far')
|
||||||
const depthA = await p(peer.db.ghosts.getMinDepth)(feedID)
|
const depthA = peer.db.ghosts.getMinDepth(feedID)
|
||||||
assert.equal(depthA, 1, 'min depth so far')
|
assert.equal(depthA, 1, 'min depth so far')
|
||||||
|
|
||||||
await p(peer.db.ghosts.add)({ msg: msgIDs[5], tangle: feedID, max: MAX })
|
await p(peer.db.ghosts.add)({ msg: msgIDs[5], tangle: feedID, max: MAX })
|
||||||
|
|
||||||
const ghostsB = await p(peer.db.ghosts.get)(feedID)
|
const ghostsB = peer.db.ghosts.get(feedID)
|
||||||
assert.deepEqual(ghostsB, msgIDs.slice(1, 6), 'ghosts so far')
|
assert.deepEqual(ghostsB, msgIDs.slice(1, 6), 'ghosts so far')
|
||||||
const depthB = await p(peer.db.ghosts.getMinDepth)(feedID)
|
const depthB = peer.db.ghosts.getMinDepth(feedID)
|
||||||
assert.equal(depthB, 2, 'min depth so far')
|
assert.equal(depthB, 2, 'min depth so far')
|
||||||
|
|
||||||
await p(peer.close)(true)
|
await p(peer.close)(true)
|
||||||
|
|
Loading…
Reference in New Issue