From 21c1adbd2a00d0e633da566fe77d50a4c2491181 Mon Sep 17 00:00:00 2001
From: Andre Staltz
Date: Wed, 25 Oct 2023 15:22:07 +0300
Subject: [PATCH] make ghost read APIs synchronous

---
 lib/index.js         |  34 +++++-----
 lib/utils.js         | 148 +++++++++++++++++++++++++++++--------------
 package.json         |   1 +
 test/ghosts.tests.js |  10 +--
 4 files changed, 121 insertions(+), 72 deletions(-)

diff --git a/lib/index.js b/lib/index.js
index 424e42d..fac4558 100644
--- a/lib/index.js
+++ b/lib/index.js
@@ -326,7 +326,9 @@ function initDB(peer, config) {
    */
   function loaded(cb) {
     if (cb === void 0) return promisify(loaded)()
-    scannedLog.onReady(cb)
+    scannedLog.onReady(() => {
+      ghostDB.onReady(cb)
+    })
   }
 
   /**
@@ -961,30 +963,24 @@ function initDB(peer, config) {
   /**
    * @param {MsgID} tangleID
-   * @param {CB<Array<MsgID>>} cb
+   * @returns {Array<MsgID>}
    */
-  function getGhosts(tangleID, cb) {
-    ghostDB.read(tangleID, (err, ghosts) => {
-      if (err) return cb(new Error('ghosts.get() failed', { cause: err }))
-      const msgIDs = [...ghosts.keys()]
-      cb(null, msgIDs)
-    })
+  function getGhosts(tangleID) {
+    const ghosts = ghostDB.read(tangleID)
+    return [...ghosts.keys()]
   }
 
   /**
    * @param {MsgID} tangleID
-   * @param {CB<number>} cb
+   * @returns {number}
    */
-  function getMinGhostDepth(tangleID, cb) {
-    ghostDB.read(tangleID, (err, ghosts) => {
-      // prettier-ignore
-      if (err) return cb(new Error('ghosts.getMinDepth() failed', { cause: err }))
-      let minDepth = Infinity
-      for (const depth of ghosts.values()) {
-        if (depth < minDepth) minDepth = depth
-      }
-      cb(null, minDepth)
-    })
+  function getMinGhostDepth(tangleID) {
+    const ghosts = ghostDB.read(tangleID)
+    let minDepth = Infinity
+    for (const depth of ghosts.values()) {
+      if (depth < minDepth) minDepth = depth
+    }
+    return minDepth
   }
 
   /**
diff --git a/lib/utils.js b/lib/utils.js
index 6617f9c..6e2f4b0 100644
--- a/lib/utils.js
+++ b/lib/utils.js
@@ -1,6 +1,12 @@
+const FS = require('fs')
+const Path = require('path')
 // @ts-ignore
 const atomic = require('atomic-file-rw')
-const Path = require('path')
+// @ts-ignore
+const multicb = require('multicb')
+
+// TODO: fs is only supported in node.js. We should support browser by replacing
+// fs.readdir with a browser "file" that just lists all ghost files.
 
 /**
  * @template T
@@ -31,12 +37,22 @@ class ReadyGate {
     for (const cb of this.#waiting) cb()
     this.#waiting.clear()
   }
+
+  get isReady() {
+    return this.#ready
+  }
 }
 
 class GhostDB {
   /** @type {string} */
   #basePath
 
+  /** @type {ReadyGate} */
+  #loaded
+
+  /** @type {Map<string, Map<string, number>>} */
+  #maps
+
   static encodingOpts = { encoding: 'utf-8' }
 
   /**
@@ -44,6 +60,32 @@ class GhostDB {
    */
   constructor(basePath) {
     this.#basePath = basePath
+    this.#maps = new Map()
+    this.#loaded = new ReadyGate()
+
+    // Load all ghost files into Maps in memory
+    // TODO this is opening up ALL the files at once, perhaps we should allow a
+    // specific max concurrent number of reads? i.e. neither fully sequential
+    // nor fully parallel
+    if (FS.existsSync(basePath)) {
+      const done = multicb({ pluck: 1 })
+      FS.readdirSync(basePath).forEach((tangleID) => {
+        const cb = done()
+        this.#read(tangleID, (err, map) => {
+          // prettier-ignore
+          if (err) return cb(new Error('GhostDB failed to read ghost file', { cause: err }))
+          this.#maps.set(tangleID, map)
+          cb()
+        })
+      })
+      done((/** @type {any} */ err) => {
+        // prettier-ignore
+        if (err) throw new Error('GhostDB failed to load', { cause: err })
+        this.#loaded.setReady()
+      })
+    } else {
+      this.#loaded.setReady()
+    }
   }
 
   /**
@@ -69,63 +111,18 @@ class GhostDB {
     return new Map(JSON.parse(str))
   }
 
-  /**
-   * @param {string} tangleID
-   * @param {string} msgID
-   * @param {number} depth
-   * @param {number} max
-   * @param {CB<void>} cb
-   */
-  save(tangleID, msgID, depth, max, cb) {
-    atomic.readFile(
-      this.#path(tangleID),
-      GhostDB.encodingOpts,
-      (/** @type {any} */ err, /** @type {any} */ str) => {
-        // Load Map
-        /** @type {Map<string, number>} */
-        let map;
-        if (err && err.code === 'ENOENT') map = new Map()
-        // prettier-ignore
-        else if (err) return cb(new Error('GhostDB.save() failed to read ghost file', { cause: err }))
-        else map = this.#deserialize(str)
-
-        map.set(msgID, depth)
-
-        // Garbage collect any ghost smaller than largestDepth - max
-        let largestDepth = -1
-        for (const depth of map.values()) {
-          if (depth > largestDepth) largestDepth = depth
-        }
-        for (const [x, depth] of map.entries()) {
-          if (depth <= largestDepth - max) map.delete(x)
-        }
-
-        atomic.writeFile(
-          this.#path(tangleID),
-          this.#serialize(map),
-          GhostDB.encodingOpts,
-          (/** @type {any} */ err) => {
-            // prettier-ignore
-            if (err) return cb(new Error('GhostDB.save() failed to write ghost file', { cause: err }))
-            else cb()
-          }
-        )
-      }
-    )
-  }
-
   /**
    * @param {string} tangleID
    * @param {CB<Map<string, number>>} cb
    */
-  read(tangleID, cb) {
+  #read(tangleID, cb) {
     atomic.readFile(
       this.#path(tangleID),
       GhostDB.encodingOpts,
       (/** @type {any} */ err, /** @type {any} */ str) => {
         // Load Map
         /** @type {Map<string, number>} */
-        let map;
+        let map
         if (err && err.code === 'ENOENT') map = new Map()
         // prettier-ignore
         else if (err) return cb(new Error('GhostDB.read() failed to read ghost file', { cause: err }))
         else map = this.#deserialize(str)
         cb(null, map)
       }
     )
   }
@@ -135,6 +132,61 @@ class GhostDB {
+
+  /**
+   * @param {() => void} cb
+   */
+  onReady(cb) {
+    this.#loaded.onReady(cb)
+  }
+
+  /**
+   * @param {string} tangleID
+   * @param {string} msgID
+   * @param {number} depth
+   * @param {number} max
+   * @param {CB<void>} cb
+   */
+  save(tangleID, msgID, depth, max, cb) {
+    this.#loaded.onReady(() => {
+      if (!this.#maps.has(tangleID)) this.#maps.set(tangleID, new Map())
+      const map = /** @type {Map<string, number>} */ (this.#maps.get(tangleID))
+      const newMap = new Map(map)
+      newMap.set(msgID, depth)
+
+      // Garbage collect any ghost smaller than largestDepth - max
+      let largestDepth = -1
+      for (const depth of newMap.values()) {
+        if (depth > largestDepth) largestDepth = depth
+      }
+      for (const [x, depth] of newMap.entries()) {
+        if (depth <= largestDepth - max) newMap.delete(x)
+      }
+
+      atomic.writeFile(
+        this.#path(tangleID),
+        this.#serialize(newMap),
+        GhostDB.encodingOpts,
+        (/** @type {any} */ err) => {
+          // prettier-ignore
+          if (err) return cb(new Error('GhostDB.save() failed to write ghost file', { cause: err }))
+          this.#maps.set(tangleID, newMap)
+          cb()
+        }
+      )
+    })
+  }
+
+  /**
+   * @param {string} tangleID
+   * @returns {Map<string, number>}
+   */
+  read(tangleID) {
+    if (!this.#loaded.isReady) {
+      throw new Error('GhostDB.read() called before loaded')
+    }
+    return this.#maps.get(tangleID) ?? new Map()
+  }
 }
 
 module.exports = { ReadyGate, GhostDB }
diff --git a/package.json b/package.json
index b917730..b1404a8 100644
--- a/package.json
+++ b/package.json
@@ -33,6 +33,7 @@
     "b4a": "~1.6.4",
     "bs58": "~5.0.0",
     "json-canon": "~1.0.0",
+    "multicb": "~1.2.2",
     "obz": "~1.1.0",
     "ppppp-keypair": "github:staltz/ppppp-keypair",
     "promisify-4loc": "~1.0.0",
diff --git a/test/ghosts.tests.js b/test/ghosts.tests.js
index 5044350..4a3bb10 100644
--- a/test/ghosts.tests.js
+++ b/test/ghosts.tests.js
@@ -34,7 +34,7 @@ test('ghosts.add, ghosts.get, ghosts.getMinDepth', async (t) => {
   }
 
   const feedID = peer.db.feed.getID(account, 'post')
-  const ghosts0 = await p(peer.db.ghosts.get)(feedID)
+  const ghosts0 = peer.db.ghosts.get(feedID)
   assert.deepEqual(ghosts0, [], 'no ghosts so far')
 
   await p(peer.db.ghosts.add)({ msg: msgIDs[0], tangle: feedID, max: MAX })
@@ -43,16 +43,16 @@ test('ghosts.add, ghosts.get, ghosts.getMinDepth', async (t) => {
   await p(peer.db.ghosts.add)({ msg: msgIDs[3], tangle: feedID, max: MAX })
   await p(peer.db.ghosts.add)({ msg: msgIDs[4], tangle: feedID, max: MAX })
 
-  const ghostsA = await p(peer.db.ghosts.get)(feedID)
+  const ghostsA = peer.db.ghosts.get(feedID)
   assert.deepEqual(ghostsA, msgIDs.slice(0, 5), 'ghosts so far')
-  const depthA = await p(peer.db.ghosts.getMinDepth)(feedID)
+  const depthA = peer.db.ghosts.getMinDepth(feedID)
   assert.equal(depthA, 1, 'min depth so far')
 
   await p(peer.db.ghosts.add)({ msg: msgIDs[5], tangle: feedID, max: MAX })
 
-  const ghostsB = await p(peer.db.ghosts.get)(feedID)
+  const ghostsB = peer.db.ghosts.get(feedID)
   assert.deepEqual(ghostsB, msgIDs.slice(1, 6), 'ghosts so far')
-  const depthB = await p(peer.db.ghosts.getMinDepth)(feedID)
+  const depthB = peer.db.ghosts.getMinDepth(feedID)
   assert.equal(depthB, 2, 'min depth so far')
 
   await p(peer.close)(true)
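
Reviewer note on the constructor TODO above ("neither fully sequential nor fully parallel"): one way to bound the number of concurrent reads is a small worker-pool loop over the directory listing. This is only a sketch, not part of the patch; loadAllGhosts and its limit parameter are hypothetical names, and it assumes the same #read(tangleID, cb) callback signature the constructor already uses.

// Hypothetical sketch: read every ghost file with at most `limit` reads
// in flight at once, collecting results into a Map keyed by tangleID.
function loadAllGhosts(tangleIDs, read, limit, done) {
  const results = new Map()
  let next = 0 // index of the next tangleID to read
  let inFlight = 0 // number of reads currently running
  let failed = false

  function launch() {
    // Start new reads until the pool is full or the queue is empty
    while (inFlight < limit && next < tangleIDs.length) {
      const tangleID = tangleIDs[next++]
      inFlight += 1
      read(tangleID, (err, map) => {
        inFlight -= 1
        if (failed) return // an earlier read already reported failure
        if (err) {
          failed = true
          return done(err)
        }
        results.set(tangleID, map)
        if (inFlight === 0 && next >= tangleIDs.length) {
          return done(null, results)
        }
        launch() // refill the pool
      })
    }
  }

  if (tangleIDs.length === 0) return done(null, results)
  launch()
}

With limit = 1 this degenerates to fully sequential loading, and with limit = tangleIDs.length it matches the current fully parallel behavior, so the constructor could pick a middle-ground value such as 16.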
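
For reviewers unfamiliar with the new multicb dependency (added to package.json above): each call to done() mints one pending callback, and done(finalCb) fires the final callback once every minted callback has been called, or as soon as one fails. A minimal usage sketch, independent of this patch:

const multicb = require('multicb')

const done = multicb({ pluck: 1 }) // collect argument index 1 of each callback

// setTimeout invokes each minted callback as cb(null, value)
setTimeout(done(), 10, null, 'a')
setTimeout(done(), 20, null, 'b')

done((err, results) => {
  if (err) throw err
  console.log(results) // => ['a', 'b']
})

This is the same fan-in pattern the GhostDB constructor uses to wait for all ghost-file reads before calling this.#loaded.setReady().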