diff --git a/lib/ghosts.js b/lib/ghosts.js index 5157a7b..6225412 100644 --- a/lib/ghosts.js +++ b/lib/ghosts.js @@ -3,7 +3,7 @@ const Path = require('path') const atomic = require('atomic-file-rw') const multicb = require('multicb') const mutexify = require('mutexify') -const ReadyGate = require('./utils/ready-gate') +const Doneable = require('./utils/doneable') // TODO: fs is only supported in node.js. We should support browser by replacing // fs.readdir with a browser "file" that just lists all ghost files. @@ -28,7 +28,7 @@ const ReadyGate = require('./utils/ready-gate') class Ghosts { /** @type {string} */ #basePath - /** @type {ReadyGate} */ + /** @type {Doneable} */ #loaded /** @type {Map>} */ #maps @@ -43,7 +43,7 @@ class Ghosts { constructor(basePath) { this.#basePath = basePath this.#maps = new Map() - this.#loaded = new ReadyGate() + this.#loaded = new Doneable() this.#writeLock = mutexify() // Load all ghosts files into Maps in memory @@ -64,10 +64,10 @@ class Ghosts { done((err, _) => { // prettier-ignore if (err) throw new Error('GhostDB failed to load', { cause: err }) - this.#loaded.setReady() + this.#loaded.done() }) } else { - this.#loaded.setReady() + this.#loaded.done() } } @@ -116,7 +116,7 @@ class Ghosts { * @param {() => void} cb */ onReady(cb) { - this.#loaded.onReady(cb) + this.#loaded.onDone(cb) } /** @@ -128,7 +128,7 @@ class Ghosts { */ save(tangleID, msgID, depth, span, cb) { this.#writeLock((unlock) => { - this.#loaded.onReady(() => { + this.#loaded.onDone(() => { if (!this.#maps.has(tangleID)) this.#maps.set(tangleID, new Map()) const map = this.#maps.get(tangleID) const newMap = new Map(/** @type {Map} */ (map)) @@ -165,7 +165,7 @@ class Ghosts { */ remove(tangleID, msgID, cb) { this.#writeLock((unlock) => { - this.#loaded.onReady(() => { + this.#loaded.onDone(() => { if (!this.#maps.has(tangleID)) return unlock(cb, null, void 0) const map = /** @type {Map} */ ( @@ -196,7 +196,7 @@ class Ghosts { * @returns {Map} */ read(tangleID) { - if (!this.#loaded.isReady) { + if (!this.#loaded.isDone) { throw new Error('GhostDB.read() called before loaded') } return this.#maps.get(tangleID) ?? new Map() diff --git a/lib/index.js b/lib/index.js index e620ce1..289255c 100644 --- a/lib/index.js +++ b/lib/index.js @@ -12,7 +12,7 @@ const { ACCOUNT_ANY, ACCOUNT_DOMAIN_PREFIX, } = require('./msg-v4/constants') -const ReadyGate = require('./utils/ready-gate') +const Doneable = require('./utils/doneable') const Ghosts = require('./ghosts') const { decrypt } = require('./encryption') @@ -193,6 +193,8 @@ function initDB(peer, config) { const recs = [] /** @type {WeakMap} */ const miscRegistry = new WeakMap() + /** @type {Map>} */ + const msgsBeingAdded = new Map() /** @type {Map} */ const encryptionFormats = new Map() /** @type {Obz} */ @@ -241,7 +243,7 @@ function initDB(peer, config) { }) }) - const scannedLog = new ReadyGate() + const scannedLog = new Doneable() // setTimeout to let peer.db.* secret-stack become available // needed by decrypt() setTimeout(() => { @@ -265,7 +267,7 @@ function initDB(peer, config) { function scanEnd(err) { // prettier-ignore if (err) throw new Error('Failed to initially scan the log', { cause: err }); - scannedLog.setReady() + scannedLog.done() } ) }) @@ -300,13 +302,13 @@ function initDB(peer, config) { */ function installEncryptionFormat(encryptionFormat) { if (encryptionFormat.setup) { - const loaded = new ReadyGate() + const loaded = new Doneable() encryptionFormat.setup(config, (/** @type {any} */ err) => { // prettier-ignore if (err) throw new Error(`Failed to install encryption format "${encryptionFormat.name}"`, {cause: err}); - loaded.setReady() + loaded.done() }) - encryptionFormat.onReady = loaded.onReady.bind(loaded) + encryptionFormat.onReady = loaded.onDone.bind(loaded) } encryptionFormats.set(encryptionFormat.name, encryptionFormat) } @@ -388,7 +390,7 @@ function initDB(peer, config) { */ function loaded(cb) { if (cb === void 0) return promisify(loaded)() - scannedLog.onReady(() => { + scannedLog.onDone(() => { ghosts.onReady(cb) }) } @@ -485,6 +487,12 @@ function initDB(peer, config) { function add(msg, tangleID, cb) { const msgID = MsgV4.getMsgID(msg) + if (msgsBeingAdded.has(msgID)) { + msgsBeingAdded.get(msgID)?.onDone(cb) + return + } + msgsBeingAdded.set(msgID, new Doneable()) + // TODO: optimize this. Perhaps have a Map() of msgID -> record // Or even better, a bloom filter. If you just want to answer no/perhaps. let rec @@ -505,7 +513,12 @@ function initDB(peer, config) { if (err) return cb(new Error('add() failed to remove ghost', { cause: err })) logAppend(msgID, msg, (err, rec) => { if (err) return cb(new Error('add() failed in the log', { cause: err })) - queueMicrotask(() => onRecordAdded.set(rec)) + const doneable = msgsBeingAdded.get(msgID) + msgsBeingAdded.delete(msgID) + queueMicrotask(() => { + doneable?.done([null, rec]) + onRecordAdded.set(rec) + }) cb(null, rec) }) }) diff --git a/lib/utils/doneable.js b/lib/utils/doneable.js new file mode 100644 index 0000000..d09d4a6 --- /dev/null +++ b/lib/utils/doneable.js @@ -0,0 +1,49 @@ +/** + * @template T + * @typedef {import('../index').CB} CB + */ + +/** + * @template T + * @typedef {[] | [Error] | [null, T]} Args + */ + +/** + * @template T + */ +class Doneable { + #waiting + #done + /** @type {Args | null} */ + #args + constructor() { + this.#waiting = new Set() + this.#done = false + this.#args = null + } + + /** + * @param {CB} cb + */ + onDone(cb) { + // @ts-ignore + if (this.#done) cb(...this.#args) + else this.#waiting.add(cb) + } + + /** + * @param {Args=} args + */ + done(args) { + this.#done = true + this.#args = args ?? [] + for (const cb of this.#waiting) cb(...this.#args) + this.#waiting.clear() + } + + get isDone() { + return this.#done + } +} + +module.exports = Doneable diff --git a/lib/utils/ready-gate.js b/lib/utils/ready-gate.js deleted file mode 100644 index ece500f..0000000 --- a/lib/utils/ready-gate.js +++ /dev/null @@ -1,28 +0,0 @@ -class ReadyGate { - #waiting - #ready - constructor() { - this.#waiting = new Set() - this.#ready = false - } - - /** - * @param {() => void} cb - */ - onReady(cb) { - if (this.#ready) cb() - else this.#waiting.add(cb) - } - - setReady() { - this.#ready = true - for (const cb of this.#waiting) cb() - this.#waiting.clear() - } - - get isReady() { - return this.#ready - } -} - -module.exports = ReadyGate diff --git a/test/add.test.js b/test/add.test.js index f994bb4..f230604 100644 --- a/test/add.test.js +++ b/test/add.test.js @@ -16,36 +16,49 @@ test('add()', async (t) => { const peer = createPeer({ keypair, path: DIR }) await peer.db.loaded() - const accountMsg0 = MsgV4.createAccount(keypair, 'person', 'aliceNonce') const id = MsgV4.getMsgID(accountMsg0) - await p(peer.db.add)(accountMsg0, id) + await t.test('basic use case', async () => { + await p(peer.db.add)(accountMsg0, id) - const rootMsg = MsgV4.createMoot(id, 'post', keypair) - const rootID = MsgV4.getMsgID(rootMsg) + const rootMsg = MsgV4.createMoot(id, 'post', keypair) + const rootID = MsgV4.getMsgID(rootMsg) - const recRoot = await p(peer.db.add)(rootMsg, rootID) - assert.equal(recRoot.msg.metadata.dataSize, 0, 'root msg added') - const tangle = new MsgV4.Tangle(rootID) - tangle.add(recRoot.id, recRoot.msg) + const recRoot = await p(peer.db.add)(rootMsg, rootID) + assert.equal(recRoot.msg.metadata.dataSize, 0, 'root msg added') + const tangle = new MsgV4.Tangle(rootID) + tangle.add(recRoot.id, recRoot.msg) - const inputMsg = MsgV4.create({ - keypair, - domain: 'post', - data: { text: 'This is the first post!' }, - account: id, - accountTips: [id], - tangles: { - [rootID]: tangle, - }, + const inputMsg = MsgV4.create({ + keypair, + domain: 'post', + data: { text: 'This is the first post!' }, + account: id, + accountTips: [id], + tangles: { + [rootID]: tangle, + }, + }) + + const rec = await p(peer.db.add)(inputMsg, null) // tangleID implicit + assert.equal(rec.msg.data.text, 'This is the first post!') + + const stats = await p(peer.db.log.stats)() + assert.deepEqual(stats, { totalBytes: 1662, deletedBytes: 0 }) }) - const rec = await p(peer.db.add)(inputMsg, null) // tangleID implicit - assert.equal(rec.msg.data.text, 'This is the first post!') + await t.test('concurrent add of the same msg appends just one', async () => { + const rootMsg = MsgV4.createMoot(id, 'whatever', keypair) + const rootID = MsgV4.getMsgID(rootMsg) + await Promise.all([ + p(peer.db.add)(rootMsg, rootID), + p(peer.db.add)(rootMsg, rootID), + ]) - const stats = await p(peer.db.log.stats)() - assert.deepEqual(stats, { totalBytes: 1662, deletedBytes: 0 }) + const stats = await p(peer.db.log.stats)() + assert.deepEqual(stats, { totalBytes: 2072, deletedBytes: 0 }) + }) await p(peer.close)(true) })