add() supports concurrency

This commit is contained in:
Andre Staltz 2024-01-31 13:12:55 +02:00
parent 73b9a80c73
commit 667b33779d
No known key found for this signature in database
GPG Key ID: 9EDE23EA7E8A4890
5 changed files with 113 additions and 66 deletions

View File

@ -3,7 +3,7 @@ const Path = require('path')
const atomic = require('atomic-file-rw')
const multicb = require('multicb')
const mutexify = require('mutexify')
const ReadyGate = require('./utils/ready-gate')
const Doneable = require('./utils/doneable')
// TODO: fs is only supported in node.js. We should support browser by replacing
// fs.readdir with a browser "file" that just lists all ghost files.
@ -28,7 +28,7 @@ const ReadyGate = require('./utils/ready-gate')
class Ghosts {
/** @type {string} */
#basePath
/** @type {ReadyGate} */
/** @type {Doneable<void>} */
#loaded
/** @type {Map<MsgID, Map<string, number>>} */
#maps
@ -43,7 +43,7 @@ class Ghosts {
constructor(basePath) {
this.#basePath = basePath
this.#maps = new Map()
this.#loaded = new ReadyGate()
this.#loaded = new Doneable()
this.#writeLock = mutexify()
// Load all ghosts files into Maps in memory
@ -64,10 +64,10 @@ class Ghosts {
done((err, _) => {
// prettier-ignore
if (err) throw new Error('GhostDB failed to load', { cause: err })
this.#loaded.setReady()
this.#loaded.done()
})
} else {
this.#loaded.setReady()
this.#loaded.done()
}
}
@ -116,7 +116,7 @@ class Ghosts {
* @param {() => void} cb
*/
onReady(cb) {
this.#loaded.onReady(cb)
this.#loaded.onDone(cb)
}
/**
@ -128,7 +128,7 @@ class Ghosts {
*/
save(tangleID, msgID, depth, span, cb) {
this.#writeLock((unlock) => {
this.#loaded.onReady(() => {
this.#loaded.onDone(() => {
if (!this.#maps.has(tangleID)) this.#maps.set(tangleID, new Map())
const map = this.#maps.get(tangleID)
const newMap = new Map(/** @type {Map<string, number>} */ (map))
@ -165,7 +165,7 @@ class Ghosts {
*/
remove(tangleID, msgID, cb) {
this.#writeLock((unlock) => {
this.#loaded.onReady(() => {
this.#loaded.onDone(() => {
if (!this.#maps.has(tangleID)) return unlock(cb, null, void 0)
const map = /** @type {Map<string, number>} */ (
@ -196,7 +196,7 @@ class Ghosts {
* @returns {Map<string, number>}
*/
read(tangleID) {
if (!this.#loaded.isReady) {
if (!this.#loaded.isDone) {
throw new Error('GhostDB.read() called before loaded')
}
return this.#maps.get(tangleID) ?? new Map()

View File

@ -12,7 +12,7 @@ const {
ACCOUNT_ANY,
ACCOUNT_DOMAIN_PREFIX,
} = require('./msg-v4/constants')
const ReadyGate = require('./utils/ready-gate')
const Doneable = require('./utils/doneable')
const Ghosts = require('./ghosts')
const { decrypt } = require('./encryption')
@ -193,6 +193,8 @@ function initDB(peer, config) {
const recs = []
/** @type {WeakMap<Rec, Misc>} */
const miscRegistry = new WeakMap()
/** @type {Map<MsgID, Doneable<RecPresent>>} */
const msgsBeingAdded = new Map()
/** @type {Map<string, EncryptionFormat>} */
const encryptionFormats = new Map()
/** @type {Obz<Rec>} */
@ -241,7 +243,7 @@ function initDB(peer, config) {
})
})
const scannedLog = new ReadyGate()
const scannedLog = new Doneable()
// setTimeout to let peer.db.* secret-stack become available
// needed by decrypt()
setTimeout(() => {
@ -265,7 +267,7 @@ function initDB(peer, config) {
function scanEnd(err) {
// prettier-ignore
if (err) throw new Error('Failed to initially scan the log', { cause: err });
scannedLog.setReady()
scannedLog.done()
}
)
})
@ -300,13 +302,13 @@ function initDB(peer, config) {
*/
function installEncryptionFormat(encryptionFormat) {
if (encryptionFormat.setup) {
const loaded = new ReadyGate()
const loaded = new Doneable()
encryptionFormat.setup(config, (/** @type {any} */ err) => {
// prettier-ignore
if (err) throw new Error(`Failed to install encryption format "${encryptionFormat.name}"`, {cause: err});
loaded.setReady()
loaded.done()
})
encryptionFormat.onReady = loaded.onReady.bind(loaded)
encryptionFormat.onReady = loaded.onDone.bind(loaded)
}
encryptionFormats.set(encryptionFormat.name, encryptionFormat)
}
@ -388,7 +390,7 @@ function initDB(peer, config) {
*/
function loaded(cb) {
if (cb === void 0) return promisify(loaded)()
scannedLog.onReady(() => {
scannedLog.onDone(() => {
ghosts.onReady(cb)
})
}
@ -485,6 +487,12 @@ function initDB(peer, config) {
function add(msg, tangleID, cb) {
const msgID = MsgV4.getMsgID(msg)
if (msgsBeingAdded.has(msgID)) {
msgsBeingAdded.get(msgID)?.onDone(cb)
return
}
msgsBeingAdded.set(msgID, new Doneable())
// TODO: optimize this. Perhaps have a Map() of msgID -> record
// Or even better, a bloom filter. If you just want to answer no/perhaps.
let rec
@ -505,7 +513,12 @@ function initDB(peer, config) {
if (err) return cb(new Error('add() failed to remove ghost', { cause: err }))
logAppend(msgID, msg, (err, rec) => {
if (err) return cb(new Error('add() failed in the log', { cause: err }))
queueMicrotask(() => onRecordAdded.set(rec))
const doneable = msgsBeingAdded.get(msgID)
msgsBeingAdded.delete(msgID)
queueMicrotask(() => {
doneable?.done([null, rec])
onRecordAdded.set(rec)
})
cb(null, rec)
})
})

49
lib/utils/doneable.js Normal file
View File

@ -0,0 +1,49 @@
/**
* @template T
* @typedef {import('../index').CB<T>} CB
*/
/**
* @template T
* @typedef {[] | [Error] | [null, T]} Args
*/
/**
* @template T
*/
class Doneable {
#waiting
#done
/** @type {Args<T> | null} */
#args
constructor() {
this.#waiting = new Set()
this.#done = false
this.#args = null
}
/**
* @param {CB<T>} cb
*/
onDone(cb) {
// @ts-ignore
if (this.#done) cb(...this.#args)
else this.#waiting.add(cb)
}
/**
* @param {Args<T>=} args
*/
done(args) {
this.#done = true
this.#args = args ?? []
for (const cb of this.#waiting) cb(...this.#args)
this.#waiting.clear()
}
get isDone() {
return this.#done
}
}
module.exports = Doneable

View File

@ -1,28 +0,0 @@
class ReadyGate {
#waiting
#ready
constructor() {
this.#waiting = new Set()
this.#ready = false
}
/**
* @param {() => void} cb
*/
onReady(cb) {
if (this.#ready) cb()
else this.#waiting.add(cb)
}
setReady() {
this.#ready = true
for (const cb of this.#waiting) cb()
this.#waiting.clear()
}
get isReady() {
return this.#ready
}
}
module.exports = ReadyGate

View File

@ -16,10 +16,10 @@ test('add()', async (t) => {
const peer = createPeer({ keypair, path: DIR })
await peer.db.loaded()
const accountMsg0 = MsgV4.createAccount(keypair, 'person', 'aliceNonce')
const id = MsgV4.getMsgID(accountMsg0)
await t.test('basic use case', async () => {
await p(peer.db.add)(accountMsg0, id)
const rootMsg = MsgV4.createMoot(id, 'post', keypair)
@ -46,6 +46,19 @@ test('add()', async (t) => {
const stats = await p(peer.db.log.stats)()
assert.deepEqual(stats, { totalBytes: 1662, deletedBytes: 0 })
})
await t.test('concurrent add of the same msg appends just one', async () => {
const rootMsg = MsgV4.createMoot(id, 'whatever', keypair)
const rootID = MsgV4.getMsgID(rootMsg)
await Promise.all([
p(peer.db.add)(rootMsg, rootID),
p(peer.db.add)(rootMsg, rootID),
])
const stats = await p(peer.db.log.stats)()
assert.deepEqual(stats, { totalBytes: 2072, deletedBytes: 0 })
})
await p(peer.close)(true)
})