From dea38e4c1ae7e71fd5bbb78db064e6bdbff9c302 Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Thu, 26 Oct 2023 12:06:06 +0300 Subject: [PATCH] fix ghosts.add() against concurrent writes --- lib/ghosts.js | 94 +++++++++++++++++++++++++------------------- package.json | 1 + test/ghosts.tests.js | 48 ++++++++++++++++++++++ 3 files changed, 102 insertions(+), 41 deletions(-) diff --git a/lib/ghosts.js b/lib/ghosts.js index 15d93a9..e544b3d 100644 --- a/lib/ghosts.js +++ b/lib/ghosts.js @@ -4,6 +4,8 @@ const Path = require('path') const atomic = require('atomic-file-rw') // @ts-ignore const multicb = require('multicb') +// @ts-ignore +const mutexify = require('mutexify') const ReadyGate = require('./utils/ready-gate') // TODO: fs is only supported in node.js. We should support browser by replacing @@ -27,6 +29,9 @@ class Ghosts { /** @type {Map>} */ #maps + /** @type {(fn: (unlock: (cb: CB, ...args: ([Error] | [null, null])) => void) => void) => void} */ + #writeLock + static encodingOpts = { encoding: 'utf-8' } /** @@ -36,6 +41,7 @@ class Ghosts { this.#basePath = basePath this.#maps = new Map() this.#loaded = new ReadyGate() + this.#writeLock = mutexify() // Load all ghosts files into Maps in memory // TODO this is opening up ALL the files at once, perhaps we should allow a @@ -122,32 +128,34 @@ class Ghosts { * @param {CB} cb */ save(tangleID, msgID, depth, span, cb) { - this.#loaded.onReady(() => { - if (!this.#maps.has(tangleID)) this.#maps.set(tangleID, new Map()) - const map = /** @type {Map} */ (this.#maps.get(tangleID)) - const newMap = new Map(map) - newMap.set(msgID, depth) + this.#writeLock((unlock) => { + this.#loaded.onReady(() => { + if (!this.#maps.has(tangleID)) this.#maps.set(tangleID, new Map()) + const map = this.#maps.get(tangleID) + const newMap = new Map(/** @type {Map} */ (map)) + newMap.set(msgID, depth) - // Garbage collect any ghost smaller than largestDepth - span - let largestDepth = -1 - for (const depth of newMap.values()) { - if (depth 
> largestDepth) largestDepth = depth - } - for (const [x, depth] of newMap.entries()) { - if (depth <= largestDepth - span) newMap.delete(x) - } - - atomic.writeFile( - this.#path(tangleID), - this.#serialize(newMap), - Ghosts.encodingOpts, - (/** @type {any} */ err) => { - // prettier-ignore - if (err) return cb(new Error('GhostDB.save() failed to write ghost file', { cause: err })) - this.#maps.set(tangleID, newMap) - cb() + // Garbage collect any ghost smaller than largestDepth - span + let largestDepth = -1 + for (const depth of newMap.values()) { + if (depth > largestDepth) largestDepth = depth } - ) + for (const [x, depth] of newMap.entries()) { + if (depth <= largestDepth - span) newMap.delete(x) + } + + atomic.writeFile( + this.#path(tangleID), + this.#serialize(newMap), + Ghosts.encodingOpts, + (/** @type {any} */ err) => { + // prettier-ignore + if (err) return unlock(cb, new Error('GhostDB.save() failed to write ghost file', { cause: err })) + this.#maps.set(tangleID, newMap) + unlock(cb, null, null) + } + ) + }) }) } @@ -157,26 +165,30 @@ class Ghosts { * @param {CB} cb */ remove(tangleID, msgID, cb) { - this.#loaded.onReady(() => { - if (!this.#maps.has(tangleID)) return cb() + this.#writeLock((unlock) => { + this.#loaded.onReady(() => { + if (!this.#maps.has(tangleID)) return unlock(cb, null, null) - const map = /** @type {Map} */ (this.#maps.get(tangleID)) - if (!map.has(msgID)) return cb() + const map = /** @type {Map} */ ( + this.#maps.get(tangleID) + ) + if (!map.has(msgID)) return unlock(cb, null, null) - const newMap = new Map(map) - newMap.delete(msgID) + const newMap = new Map(map) + newMap.delete(msgID) - atomic.writeFile( - this.#path(tangleID), - this.#serialize(newMap), - Ghosts.encodingOpts, - (/** @type {any} */ err) => { - // prettier-ignore - if (err) return cb(new Error('GhostDB.save() failed to write ghost file', { cause: err })) - this.#maps.set(tangleID, newMap) - cb() - } - ) + atomic.writeFile( + this.#path(tangleID), + 
this.#serialize(newMap), + Ghosts.encodingOpts, + (/** @type {any} */ err) => { + // prettier-ignore + if (err) return unlock(cb,new Error('GhostDB.remove() failed to write ghost file', { cause: err })) + this.#maps.set(tangleID, newMap) + unlock(cb, null, null) + } + ) + }) }) } diff --git a/package.json b/package.json index b1404a8..510a51e 100644 --- a/package.json +++ b/package.json @@ -34,6 +34,7 @@ "bs58": "~5.0.0", "json-canon": "~1.0.0", "multicb": "~1.2.2", + "mutexify": "~1.4.0", "obz": "~1.1.0", "ppppp-keypair": "github:staltz/ppppp-keypair", "promisify-4loc": "~1.0.0", diff --git a/test/ghosts.tests.js b/test/ghosts.tests.js index ee5b97c..388a6cd 100644 --- a/test/ghosts.tests.js +++ b/test/ghosts.tests.js @@ -57,3 +57,51 @@ test('ghosts.add, ghosts.get, ghosts.getMinDepth', async (t) => { await p(peer.close)(true) }) + +test('ghosts.add queues very-concurrent calls', async (t) => { + const peer = SecretStack({ appKey: caps.shse }) + .use(require('../lib')) + .use(require('ssb-box')) + .call(null, { keypair, path: DIR }) + + await peer.db.loaded() + + const account = await p(peer.db.account.create)({ domain: 'person' }) + const SPAN = 5 + + let msgIDs = [] + for (let i = 0; i < 10; i++) { + const rec = await p(peer.db.feed.publish)({ + account, + domain: 'post', + data: { text: 'hello ' + i }, + }) + msgIDs.push(rec.id) + } + const tangleID = peer.db.feed.getID(account, 'post') + + const ghosts0 = peer.db.ghosts.get(tangleID) + assert.deepEqual(ghosts0, [], 'no ghosts so far') + + await Promise.all([ + p(peer.db.ghosts.add)({ msgID: msgIDs[0], tangleID, span: SPAN }), + p(peer.db.ghosts.add)({ msgID: msgIDs[1], tangleID, span: SPAN }), + p(peer.db.ghosts.add)({ msgID: msgIDs[2], tangleID, span: SPAN }), + p(peer.db.ghosts.add)({ msgID: msgIDs[3], tangleID, span: SPAN }), + p(peer.db.ghosts.add)({ msgID: msgIDs[4], tangleID, span: SPAN }), + ]) + + const ghostsA = peer.db.ghosts.get(tangleID) + assert.deepEqual(ghostsA, msgIDs.slice(0, 5), 'ghosts so 
far') + const depthA = peer.db.ghosts.getMinDepth(tangleID) + assert.equal(depthA, 1, 'min depth so far') + + await p(peer.db.ghosts.add)({ msgID: msgIDs[5], tangleID, span: SPAN }) + + const ghostsB = peer.db.ghosts.get(tangleID) + assert.deepEqual(ghostsB, msgIDs.slice(1, 6), 'ghosts so far') + const depthB = peer.db.ghosts.getMinDepth(tangleID) + assert.equal(depthB, 2, 'min depth so far') + + await p(peer.close)(true) +})