fix ghosts.add() against concurrent writes

This commit is contained in:
Andre Staltz 2023-10-26 12:06:06 +03:00
parent 3fccd4d661
commit dea38e4c1a
No known key found for this signature in database
GPG Key ID: 9EDE23EA7E8A4890
3 changed files with 102 additions and 41 deletions

View File

@ -4,6 +4,8 @@ const Path = require('path')
const atomic = require('atomic-file-rw')
// @ts-ignore
const multicb = require('multicb')
// @ts-ignore
const mutexify = require('mutexify')
const ReadyGate = require('./utils/ready-gate')
// TODO: fs is only supported in node.js. We should support browser by replacing
@ -27,6 +29,9 @@ class Ghosts {
/** @type {Map<string, Map<string, number>>} */
#maps
/** @type {(fn: (unlock: (cb: CB<void>, ...args: ([Error] | [null, null])) => void) => void) => void} */
#writeLock
static encodingOpts = { encoding: 'utf-8' }
/**
@ -36,6 +41,7 @@ class Ghosts {
this.#basePath = basePath
this.#maps = new Map()
this.#loaded = new ReadyGate()
this.#writeLock = mutexify()
// Load all ghosts files into Maps in memory
// TODO this is opening up ALL the files at once, perhaps we should allow a
@ -122,32 +128,34 @@ class Ghosts {
* @param {CB<void>} cb
*/
save(tangleID, msgID, depth, span, cb) {
this.#loaded.onReady(() => {
if (!this.#maps.has(tangleID)) this.#maps.set(tangleID, new Map())
const map = /** @type {Map<string, number>} */ (this.#maps.get(tangleID))
const newMap = new Map(map)
newMap.set(msgID, depth)
this.#writeLock((unlock) => {
this.#loaded.onReady(() => {
if (!this.#maps.has(tangleID)) this.#maps.set(tangleID, new Map())
const map = this.#maps.get(tangleID)
const newMap = new Map(/** @type {Map<string, number>} */ (map))
newMap.set(msgID, depth)
// Garbage collect any ghost smaller than largestDepth - span
let largestDepth = -1
for (const depth of newMap.values()) {
if (depth > largestDepth) largestDepth = depth
}
for (const [x, depth] of newMap.entries()) {
if (depth <= largestDepth - span) newMap.delete(x)
}
atomic.writeFile(
this.#path(tangleID),
this.#serialize(newMap),
Ghosts.encodingOpts,
(/** @type {any} */ err) => {
// prettier-ignore
if (err) return cb(new Error('GhostDB.save() failed to write ghost file', { cause: err }))
this.#maps.set(tangleID, newMap)
cb()
// Garbage collect any ghost smaller than largestDepth - span
let largestDepth = -1
for (const depth of newMap.values()) {
if (depth > largestDepth) largestDepth = depth
}
)
for (const [x, depth] of newMap.entries()) {
if (depth <= largestDepth - span) newMap.delete(x)
}
atomic.writeFile(
this.#path(tangleID),
this.#serialize(newMap),
Ghosts.encodingOpts,
(/** @type {any} */ err) => {
// prettier-ignore
if (err) return unlock(cb, new Error('GhostDB.save() failed to write ghost file', { cause: err }))
this.#maps.set(tangleID, newMap)
unlock(cb, null, null)
}
)
})
})
}
@ -157,26 +165,30 @@ class Ghosts {
* @param {CB<void>} cb
*/
remove(tangleID, msgID, cb) {
this.#loaded.onReady(() => {
if (!this.#maps.has(tangleID)) return cb()
this.#writeLock((unlock) => {
this.#loaded.onReady(() => {
if (!this.#maps.has(tangleID)) return unlock(cb, null, null)
const map = /** @type {Map<string, number>} */ (this.#maps.get(tangleID))
if (!map.has(msgID)) return cb()
const map = /** @type {Map<string, number>} */ (
this.#maps.get(tangleID)
)
if (!map.has(msgID)) return unlock(cb, null, null)
const newMap = new Map(map)
newMap.delete(msgID)
const newMap = new Map(map)
newMap.delete(msgID)
atomic.writeFile(
this.#path(tangleID),
this.#serialize(newMap),
Ghosts.encodingOpts,
(/** @type {any} */ err) => {
// prettier-ignore
if (err) return cb(new Error('GhostDB.save() failed to write ghost file', { cause: err }))
this.#maps.set(tangleID, newMap)
cb()
}
)
atomic.writeFile(
this.#path(tangleID),
this.#serialize(newMap),
Ghosts.encodingOpts,
(/** @type {any} */ err) => {
// prettier-ignore
if (err) return unlock(cb,new Error('GhostDB.save() failed to write ghost file', { cause: err }))
this.#maps.set(tangleID, newMap)
unlock(cb, null, null)
}
)
})
})
}

View File

@ -34,6 +34,7 @@
"bs58": "~5.0.0",
"json-canon": "~1.0.0",
"multicb": "~1.2.2",
"mutexify": "~1.4.0",
"obz": "~1.1.0",
"ppppp-keypair": "github:staltz/ppppp-keypair",
"promisify-4loc": "~1.0.0",

View File

@ -57,3 +57,51 @@ test('ghosts.add, ghosts.get, ghosts.getMinDepth', async (t) => {
await p(peer.close)(true)
})
// Regression test for this commit: GhostDB.save() now serializes concurrent
// writes behind a mutexify writeLock, so several simultaneous ghosts.add()
// calls on the same tangle must all land instead of clobbering each other.
test('ghosts.add queues very-concurrent calls', async (t) => {
// Spin up a peer with a fresh database (SecretStack/keypair/DIR are set up
// earlier in this test file — not visible in this hunk).
const peer = SecretStack({ appKey: caps.shse })
.use(require('../lib'))
.use(require('ssb-box'))
.call(null, { keypair, path: DIR })
await peer.db.loaded()
const account = await p(peer.db.account.create)({ domain: 'person' })
// SPAN controls ghost garbage collection: ghosts at depth <= (largest - SPAN)
// are dropped, i.e. at most 5 ghosts are retained per tangle here.
const SPAN = 5
// Publish 10 sequential feed msgs so we have IDs at increasing tangle depths.
let msgIDs = []
for (let i = 0; i < 10; i++) {
const rec = await p(peer.db.feed.publish)({
account,
domain: 'post',
data: { text: 'hello ' + i },
})
msgIDs.push(rec.id)
}
const tangleID = peer.db.feed.getID(account, 'post')
// Sanity check: nothing has been marked as a ghost yet.
const ghosts0 = peer.db.ghosts.get(tangleID)
assert.deepEqual(ghosts0, [], 'no ghosts so far')
// The core of the test: fire 5 adds concurrently. Before the writeLock fix,
// these read-modify-write cycles could interleave and lose updates.
await Promise.all([
p(peer.db.ghosts.add)({ msgID: msgIDs[0], tangleID, span: SPAN }),
p(peer.db.ghosts.add)({ msgID: msgIDs[1], tangleID, span: SPAN }),
p(peer.db.ghosts.add)({ msgID: msgIDs[2], tangleID, span: SPAN }),
p(peer.db.ghosts.add)({ msgID: msgIDs[3], tangleID, span: SPAN }),
p(peer.db.ghosts.add)({ msgID: msgIDs[4], tangleID, span: SPAN }),
])
// All 5 concurrent adds must have been persisted — none overwritten.
const ghostsA = peer.db.ghosts.get(tangleID)
assert.deepEqual(ghostsA, msgIDs.slice(0, 5), 'ghosts so far')
const depthA = peer.db.ghosts.getMinDepth(tangleID)
assert.equal(depthA, 1, 'min depth so far')
// Adding a 6th ghost exceeds SPAN, so the oldest (msgIDs[0]) is GC'd and
// the retained window slides to msgIDs[1..5]; min depth advances to 2.
await p(peer.db.ghosts.add)({ msgID: msgIDs[5], tangleID, span: SPAN })
const ghostsB = peer.db.ghosts.get(tangleID)
assert.deepEqual(ghostsB, msgIDs.slice(1, 6), 'ghosts so far')
const depthB = peer.db.ghosts.getMinDepth(tangleID)
assert.equal(depthB, 2, 'min depth so far')
await p(peer.close)(true)
})