pzp-db/lib/plugin.js

405 lines
12 KiB
JavaScript

const path = require('path')
const push = require('push-stream')
const AAOL = require('async-append-only-log')
const promisify = require('promisify-4loc')
const Obz = require('obz')
const { ReadyGate } = require('./utils')
const { decrypt, reEncrypt } = require('./encryption')
exports.name = 'db'
exports.init = function initMemDB(ssb, config) {
const hmacKey = null
const msgs = []
const feedFormats = new Map()
const encryptionFormats = new Map()
const onMsgAdded = Obz()
const latestMsgPerFeed = {
_map: new Map(), // feedId => nativeMsg
preupdateFromKVT(kvtf, i) {
const feedId = kvtf.feed ?? kvtf.value.author
this._map.set(feedId, i)
},
commitAllPreupdates() {
for (const i of this._map.values()) {
if (typeof i === 'number') {
this.updateFromKVT(msgs[i])
}
}
},
updateFromKVT(kvtf) {
const feedId = kvtf.feed ?? kvtf.value.author
const feedFormat = findFeedFormatForAuthor(feedId)
if (!feedFormat) {
console.warn('No feed format installed understands ' + feedId)
return
}
const msg = reEncrypt(kvtf)
const nativeMsg = feedFormat.toNativeMsg(msg.value, 'js')
this._map.set(feedId, nativeMsg)
},
update(feedId, nativeMsg) {
this._map.set(feedId, nativeMsg)
},
get(feedId) {
return this._map.get(feedId) ?? null
},
has(feedId) {
return this._map.has(feedId)
},
getAsKV(feedId, feedFormat) {
const nativeMsg = this._map.get(feedId)
if (!nativeMsg) return null
const feedFormat2 = feedFormat ?? findFeedFormatForAuthor(feedId)
if (!feedFormat2) {
throw new Error('No feed format installed understands ' + feedId)
}
const key = feedFormat2.getMsgId(nativeMsg, 'js')
const value = feedFormat2.fromNativeMsg(nativeMsg, 'js')
return { key, value }
},
deleteKVT(kvtf) {
const feedId = kvtf.feed ?? kvtf.value.author
const nativeMsg = this._map.get(feedId)
if (!nativeMsg) return
const feedFormat = findFeedFormatForAuthor(feedId)
if (!feedFormat) {
console.warn('No feed format installed understands ' + feedId)
return
}
const msgId = feedFormat.getMsgId(nativeMsg, 'js')
if (msgId === kvtf.key) this._map.delete(feedId)
},
delete(feedId) {
this._map.delete(feedId)
},
}
const log = AAOL(path.join(config.path, 'memdb-log.bin'), {
cacheSize: 1,
blockSize: 64 * 1024,
codec: {
encode(msg) {
return Buffer.from(JSON.stringify(msg), 'utf8')
},
decode(buf) {
return JSON.parse(buf.toString('utf8'))
},
},
validateRecord(buf) {
try {
JSON.parse(buf.toString('utf8'))
return true
} catch {
return false
}
},
})
ssb.close.hook(function (fn, args) {
log.close(() => {
fn.apply(this, args)
})
})
const scannedLog = new ReadyGate()
// setTimeout to let ssb.db.* secret-stack become available
setTimeout(() => {
let i = -1
log.stream({ offsets: true, values: true, sizes: true }).pipe(
push.drain(
function drainEach({ offset, value, size }) {
i += 1
if (!value) {
// deleted record
msgs.push(null)
return
}
// TODO: for performance, dont decrypt on startup, instead decrypt on
// demand, or decrypt in the background. Or then store the log with
// decrypted msgs and only encrypt when moving it to the network.
const msg = decrypt(value, ssb, config)
msg.meta ??= {}
msg.meta.offset = offset
msg.meta.size = size
msg.meta.seq = i
msgs.push(msg)
latestMsgPerFeed.preupdateFromKVT(msg, i)
},
function drainEnd(err) {
// prettier-ignore
if (err) throw new Error('Failed to initially scan the log', { cause: err });
latestMsgPerFeed.commitAllPreupdates()
scannedLog.setReady()
}
)
)
})
function logAppend(key, value, feedId, isOOO, cb) {
const kvt = {
key,
value,
timestamp: Date.now(),
}
if (feedId !== value.author) kvt.feed = feedId
if (isOOO) kvt.ooo = isOOO
log.append(kvt, (err, newOffset) => {
if (err) return cb(new Error('logAppend failed', { cause: err }))
const offset = newOffset // latestOffset
const size = Buffer.from(JSON.stringify(kvt), 'utf8').length
const seq = msgs.length
const kvtExposed = decrypt(kvt, ssb, config)
kvt.meta = kvtExposed.meta = { offset, size, seq }
msgs.push(kvtExposed)
cb(null, kvt)
})
}
function installFeedFormat(feedFormat) {
if (!feedFormat.encodings.includes('js')) {
// prettier-ignore
throw new Error(`Failed to install feed format "${feedFormat.name}" because it must support JS encoding`)
}
feedFormats.set(feedFormat.name, feedFormat)
}
function installEncryptionFormat(encryptionFormat) {
if (encryptionFormat.setup) {
const loaded = new ReadyGate()
encryptionFormat.setup(config, (err) => {
// prettier-ignore
if (err) throw new Error(`Failed to install encryption format "${encryptionFormat.name}"`, {cause: err});
loaded.setReady()
})
encryptionFormat.onReady = loaded.onReady.bind(loaded)
}
encryptionFormats.set(encryptionFormat.name, encryptionFormat)
}
function findFeedFormatForAuthor(author) {
for (const feedFormat of feedFormats.values()) {
if (feedFormat.isAuthor(author)) return feedFormat
}
return null
}
function findFeedFormatForNativeMsg(nativeMsg) {
for (const feedFormat of feedFormats.values()) {
if (feedFormat.isNativeMsg(nativeMsg)) return feedFormat
}
return null
}
function findEncryptionFormatFor(ciphertextJS) {
if (!ciphertextJS) return null
if (typeof ciphertextJS !== 'string') return null
const suffix = ciphertextJS.split('.').pop()
const encryptionFormat = encryptionFormats.get(suffix) ?? null
return encryptionFormat
}
function add(nativeMsg, cb) {
const feedFormat = findFeedFormatForNativeMsg(nativeMsg)
if (!feedFormat) {
// prettier-ignore
return cb(new Error('add() failed because no installed feed format understands the native message'))
}
const feedId = feedFormat.getFeedId(nativeMsg)
const prevNativeMsg = latestMsgPerFeed.get(feedId)
if (prevNativeMsg) {
feedFormat.validate(nativeMsg, prevNativeMsg, hmacKey, validationCB)
} else {
feedFormat.validateOOO(nativeMsg, hmacKey, validationCB)
}
function validationCB(err) {
// prettier-ignore
if (err) return cb(new Error('add() failed validation for feed format ' + feedFormat.name, {cause: err}))
const msgId = feedFormat.getMsgId(nativeMsg)
const msgVal = feedFormat.fromNativeMsg(nativeMsg)
latestMsgPerFeed.update(feedId, nativeMsg)
logAppend(msgId, msgVal, feedId, false, (err, kvt) => {
if (err) return cb(new Error('add() failed in the log', { cause: err }))
onMsgAdded.set({
kvt,
nativeMsg,
feedFormat: feedFormat.name,
})
cb(null, kvt)
})
}
}
function create(opts, cb) {
const keys = opts.keys ?? config.keys
const feedFormat = feedFormats.get(opts.feedFormat)
const encryptionFormat = encryptionFormats.get(opts.encryptionFormat)
// prettier-ignore
if (!feedFormat) return cb(new Error(`create() does not support feed format "${opts.feedFormat}"`))
// prettier-ignore
if (!feedFormat.isAuthor(keys.id)) return cb(new Error(`create() failed because keys.id ${keys.id} is not a valid author for feed format "${feedFormat.name}"`))
// prettier-ignore
if (opts.content.recps) {
if (!encryptionFormat) {
return cb(new Error(`create() does not support encryption format "${opts.encryptionFormat}"`))
}
}
if (!opts.content) return cb(new Error('create() requires a `content`'))
// Create full opts:
let provisionalNativeMsg
try {
provisionalNativeMsg = feedFormat.newNativeMsg({
timestamp: Date.now(),
...opts,
previous: null,
keys,
})
} catch (err) {
return cb(new Error('create() failed', { cause: err }))
}
const feedId = feedFormat.getFeedId(provisionalNativeMsg)
const previous = latestMsgPerFeed.getAsKV(feedId, feedFormat)
const fullOpts = {
timestamp: Date.now(),
...opts,
previous,
keys,
hmacKey,
}
// If opts ask for encryption, encrypt and put ciphertext in opts.content
const recps = fullOpts.content.recps
if (Array.isArray(recps) && recps.length > 0) {
const plaintext = feedFormat.toPlaintextBuffer(fullOpts)
const encryptOpts = {
...fullOpts,
keys,
recps,
previous: previous ? previous.key : null,
}
let ciphertextBuf
try {
ciphertextBuf = encryptionFormat.encrypt(plaintext, encryptOpts)
} catch (err) {
// prettier-ignore
return cb(new Error('create() failed to encrypt content', {cause: err}));
}
if (!ciphertextBuf) {
// prettier-ignore
return cb(new Error('create() failed to encrypt with ' + encryptionFormat.name))
}
const ciphertextBase64 = ciphertextBuf.toString('base64')
fullOpts.content = ciphertextBase64 + '.' + encryptionFormat.name
}
// Create the native message:
let nativeMsg
try {
nativeMsg = feedFormat.newNativeMsg(fullOpts)
} catch (err) {
return cb(new Error('create() failed', { cause: err }))
}
const msgId = feedFormat.getMsgId(nativeMsg)
const msgVal = feedFormat.fromNativeMsg(nativeMsg, 'js')
latestMsgPerFeed.update(feedId, nativeMsg)
// Encode the native message and append it to the log:
logAppend(msgId, msgVal, feedId, false, (err, kvt) => {
// prettier-ignore
if (err) return cb(new Error('create() failed to append the log', { cause: err }))
onMsgAdded.set({
kvt,
nativeMsg,
feedFormat: feedFormat.name,
})
cb(null, kvt)
})
}
function del(msgId, cb) {
const kvt = getKVT(msgId)
latestMsgPerFeed.deleteKVT(kvt)
msgs[kvt.meta.seq] = null
log.onDrain(() => {
log.del(kvt.meta.offset, cb)
})
}
function filterAsPullStream(fn) {
let i = 0
return function source(end, cb) {
if (end) return cb(end)
if (i >= msgs.length) return cb(true)
for (; i < msgs.length; i++) {
const msg = msgs[i]
if (msg && fn(msg, i, msgs)) {
i += 1
return cb(null, msg)
}
}
return cb(true)
}
}
function* filterAsIterator(fn) {
for (let i = 0; i < msgs.length; i++) {
const msg = msgs[i]
if (msg && fn(msg, i, msgs)) yield msg
}
}
function filterAsArray(fn) {
return msgs.filter(fn)
}
function forEach(fn) {
for (let i = 0; i < msgs.length; i++) if (msgs[i]) fn(msgs[i], i, msgs)
}
function getKVT(msgKey) {
for (let i = 0; i < msgs.length; i++) {
const msg = msgs[i]
if (msg && msg.key === msgKey) return msg
}
return null
}
function get(msgKey) {
return getKVT(msgKey)?.value
}
function loaded(cb) {
if (cb === void 0) return promisify(loaded)()
scannedLog.onReady(cb)
}
return {
// public
installFeedFormat,
installEncryptionFormat,
loaded,
add,
create,
del,
onMsgAdded,
filterAsPullStream,
filterAsIterator,
filterAsArray,
forEach,
getKVT,
get,
// internal
findEncryptionFormatFor,
findFeedFormatForAuthor,
}
}