mirror of https://codeberg.org/pzp/pzp-db.git
support erase() persistence
This commit is contained in:
parent
9e7feb3d41
commit
ae122c815e
|
@ -5,7 +5,8 @@ const MsgV3 = require('./msg-v3')
|
|||
/**
|
||||
* @typedef {import('./index').Msg} Msg
|
||||
* @typedef {import('./index').RecPresent} RecPresent
|
||||
* @typedef {RecPresent['misc']} Misc
|
||||
* @typedef {import('./index').Rec} Rec
|
||||
* @typedef {import('./index').Misc} Misc
|
||||
* @typedef {import('ppppp-keypair').Keypair} Keypair
|
||||
*
|
||||
* @typedef {Buffer | Uint8Array} B4A
|
||||
|
@ -45,11 +46,11 @@ function keypairToSSBKeys(keypair) {
|
|||
const decryptCache = new WeakMap()
|
||||
|
||||
/**
|
||||
* @template {{msg: Msg, misc?: Misc}} T
|
||||
* @template {{msg: Msg}} T
|
||||
* @param {T} rec
|
||||
* @param {any} peer
|
||||
* @param {any} config
|
||||
* @returns {T & {misc?: Misc}}
|
||||
* @returns {T}
|
||||
*/
|
||||
function decrypt(rec, peer, config) {
|
||||
if (decryptCache.has(rec)) return decryptCache.get(rec)
|
||||
|
@ -73,7 +74,7 @@ function decrypt(rec, peer, config) {
|
|||
...rec,
|
||||
msg: msgDecrypted,
|
||||
misc: {
|
||||
...rec.misc,
|
||||
// ...rec.misc,
|
||||
private: true,
|
||||
originalData: data,
|
||||
encryptionFormat: encryptionFormat.name,
|
||||
|
@ -87,20 +88,20 @@ function decrypt(rec, peer, config) {
|
|||
* @param {RecPresent} rec
|
||||
* @returns {RecPresent}
|
||||
*/
|
||||
function reEncrypt(rec) {
|
||||
return {
|
||||
id: rec.id,
|
||||
msg: { ...rec.msg, data: rec.misc.originalData },
|
||||
received: rec.received,
|
||||
misc: {
|
||||
seq: rec.misc.seq,
|
||||
offset: rec.misc.offset,
|
||||
size: rec.misc.size,
|
||||
},
|
||||
}
|
||||
}
|
||||
// function reEncrypt(rec) {
|
||||
// return {
|
||||
// id: rec.id,
|
||||
// msg: { ...rec.msg, data: rec.misc.originalData },
|
||||
// received: rec.received,
|
||||
// misc: {
|
||||
// seq: rec.misc.seq,
|
||||
// offset: rec.misc.offset,
|
||||
// size: rec.misc.size,
|
||||
// },
|
||||
// }
|
||||
// }
|
||||
|
||||
module.exports = {
|
||||
decrypt,
|
||||
reEncrypt,
|
||||
// reEncrypt,
|
||||
}
|
||||
|
|
75
lib/index.js
75
lib/index.js
|
@ -34,11 +34,6 @@ const { decrypt } = require('./encryption')
|
|||
* id?: never;
|
||||
* msg?: never;
|
||||
* received?: never;
|
||||
* misc: {
|
||||
* offset: number;
|
||||
* size: number;
|
||||
* seq: number;
|
||||
* };
|
||||
* }} RecDeleted
|
||||
*
|
||||
* @typedef {{
|
||||
|
@ -51,16 +46,17 @@ const { decrypt } = require('./encryption')
|
|||
* id: MsgID;
|
||||
* msg: Msg;
|
||||
* received: number;
|
||||
* misc: {
|
||||
* offset: number;
|
||||
* size: number;
|
||||
* seq: number;
|
||||
* private?: boolean;
|
||||
* originalData?: any;
|
||||
* encryptionFormat?: string;
|
||||
* }
|
||||
* }} RecPresent
|
||||
*
|
||||
* @typedef {{
|
||||
* offset: number;
|
||||
* size: number;
|
||||
* seq: number;
|
||||
* private?: boolean;
|
||||
* originalData?: any;
|
||||
* encryptionFormat?: string;
|
||||
* }} Misc
|
||||
*
|
||||
* @typedef {RecPresent | RecDeleted} Rec
|
||||
*/
|
||||
|
||||
|
@ -127,9 +123,12 @@ class DBTangle extends MsgV3.Tangle {
|
|||
* @param {{ path: string; keypair: Keypair; }} config
|
||||
*/
|
||||
function initDB(peer, config) {
|
||||
/** @type {Array<Rec>} */
|
||||
/** @type {Array<Rec | null>} */
|
||||
const recs = []
|
||||
|
||||
/** @type {WeakMap<Rec, Misc>} */
|
||||
const miscRegistry = new WeakMap()
|
||||
|
||||
/** @type {Map<string, EncryptionFormat>} */
|
||||
const encryptionFormats = new Map()
|
||||
|
||||
|
@ -181,26 +180,21 @@ function initDB(peer, config) {
|
|||
// setTimeout to let peer.db.* secret-stack become available
|
||||
// needed by decrypt()
|
||||
setTimeout(() => {
|
||||
let i = -1
|
||||
let seq = -1
|
||||
log.scan(
|
||||
function scanEach(offset, recInLog, size) {
|
||||
i += 1
|
||||
seq += 1
|
||||
if (!recInLog) {
|
||||
// deleted record
|
||||
/** @type {RecDeleted} */
|
||||
const rec = { misc: { offset, size, seq: i } }
|
||||
recs.push(rec)
|
||||
recs.push(null)
|
||||
return
|
||||
}
|
||||
// TODO: for performance, dont decrypt on startup, instead decrypt on
|
||||
// demand, or decrypt in the background. Or then store the log with
|
||||
// decrypted msgs and only encrypt when moving it to the network.
|
||||
/** @type {RecPresent} */
|
||||
const rec = /** @type {any} */ (decrypt(recInLog, peer, config))
|
||||
rec.misc ??= /** @type {Rec['misc']} */ ({})
|
||||
rec.misc.offset = offset
|
||||
rec.misc.size = size
|
||||
rec.misc.seq = i
|
||||
const rec = decrypt(recInLog, peer, config)
|
||||
miscRegistry.set(rec, { offset, size, seq })
|
||||
recs.push(rec)
|
||||
},
|
||||
function scanEnd(err) {
|
||||
|
@ -227,11 +221,11 @@ function initDB(peer, config) {
|
|||
if (err) return cb(new Error('logAppend failed', { cause: err }))
|
||||
const size = b4a.from(JSON.stringify(recInLog), 'utf8').length
|
||||
const seq = recs.length
|
||||
// FIXME: where do we put originalData ???
|
||||
const recExposed = decrypt(recInLog, peer, config)
|
||||
const rec = /** @type {RecPresent} */ (recInLog);
|
||||
rec.misc = { offset, size, seq }
|
||||
recExposed.misc = {...recExposed.misc, ...rec.misc}
|
||||
recs.push(/** @type {any} */ (recExposed))
|
||||
const rec = /** @type {RecPresent} */ (recInLog)
|
||||
miscRegistry.set(rec, { offset, size, seq })
|
||||
recs.push(recExposed)
|
||||
cb(null, rec)
|
||||
})
|
||||
}
|
||||
|
@ -938,8 +932,13 @@ function initDB(peer, config) {
|
|||
const rec = getRecord(msgID)
|
||||
if (!rec) return cb()
|
||||
if (!rec.msg) return cb()
|
||||
const { offset, size, seq } = rec.misc
|
||||
recs[rec.misc.seq] = { misc: { offset, size, seq } }
|
||||
const misc = miscRegistry.get(rec)
|
||||
const seq = misc?.seq ?? -1
|
||||
const offset = misc?.offset ?? -1
|
||||
if (seq === -1) {
|
||||
return cb(new Error('del() failed to find record in miscRegistry'))
|
||||
}
|
||||
recs[seq] = null
|
||||
log.onDrain(() => {
|
||||
log.del(offset, cb)
|
||||
})
|
||||
|
@ -1022,9 +1021,17 @@ function initDB(peer, config) {
|
|||
if (!rec) return cb()
|
||||
if (!rec.msg) return cb()
|
||||
if (!rec.msg.data) return cb()
|
||||
recs[rec.misc.seq].msg = MsgV3.erase(rec.msg)
|
||||
// FIXME: persist this change to disk!! Not supported by AAOL yet
|
||||
cb()
|
||||
rec.msg = MsgV3.erase(rec.msg)
|
||||
const misc = miscRegistry.get(rec)
|
||||
const seq = misc?.seq ?? -1
|
||||
const offset = misc?.offset ?? -1
|
||||
if (seq === -1) {
|
||||
return cb(new Error('erase() failed to find record in miscRegistry'))
|
||||
}
|
||||
recs[seq] = rec
|
||||
log.onDrain(() => {
|
||||
log.overwrite(offset, rec, cb)
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1038,7 +1045,7 @@ function initDB(peer, config) {
|
|||
function* msgs() {
|
||||
for (let i = 0; i < recs.length; i++) {
|
||||
const rec = recs[i]
|
||||
if (rec.msg) yield rec.msg
|
||||
if (rec?.msg) yield rec.msg
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -681,7 +681,7 @@ function Log(filename, opts) {
|
|||
const oldDataLength = Record.readDataLength(blockBufNow, offsetInBlock)
|
||||
const oldEmptyLength = Record.readEmptyLength(blockBufNow, offsetInBlock)
|
||||
// Make sure encodedData fits inside existing record
|
||||
if (Record.size(encodedData) > oldDataLength + oldEmptyLength) {
|
||||
if (encodedData.length > oldDataLength + oldEmptyLength) {
|
||||
return cb(overwriteLargerThanOld())
|
||||
}
|
||||
const newEmptyLength = oldDataLength - encodedData.length
|
||||
|
|
|
@ -68,40 +68,44 @@ test('erase()', async (t) => {
|
|||
|
||||
await p(peer.close)(true)
|
||||
|
||||
// FIXME:
|
||||
// const log = AAOL(path.join(DIR, 'db.bin'), {
|
||||
// cacheSize: 1,
|
||||
// blockSize: 64 * 1024,
|
||||
// codec: {
|
||||
// encode(msg) {
|
||||
// return Buffer.from(JSON.stringify(msg), 'utf8')
|
||||
// },
|
||||
// decode(buf) {
|
||||
// return JSON.parse(buf.toString('utf8'))
|
||||
// },
|
||||
// },
|
||||
// })
|
||||
const log = Log(path.join(DIR, 'db.bin'), {
|
||||
cacheSize: 1,
|
||||
blockSize: 64 * 1024,
|
||||
codec: {
|
||||
encode(msg) {
|
||||
return Buffer.from(JSON.stringify(msg), 'utf8')
|
||||
},
|
||||
decode(buf) {
|
||||
return JSON.parse(buf.toString('utf8'))
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
// const persistedMsgs = await new Promise((resolve, reject) => {
|
||||
// let persistedMsgs = []
|
||||
// log.stream({ offsets: true, values: true, sizes: true }).pipe(
|
||||
// push.drain(
|
||||
// function drainEach({ offset, value, size }) {
|
||||
// if (value) {
|
||||
// persistedMsgs.push(value.msg)
|
||||
// }
|
||||
// },
|
||||
// function drainEnd(err) {
|
||||
// if (err) return reject(err)
|
||||
// resolve(persistedMsgs)
|
||||
// }
|
||||
// )
|
||||
// )
|
||||
// })
|
||||
const persistedMsgs = await new Promise((resolve, reject) => {
|
||||
let persistedMsgs = []
|
||||
log.scan(
|
||||
function drainEach(offset, rec, size) {
|
||||
if (rec) {
|
||||
persistedMsgs.push(rec.msg)
|
||||
}
|
||||
},
|
||||
function drainEnd(err) {
|
||||
if (err) return reject(err)
|
||||
resolve(persistedMsgs)
|
||||
}
|
||||
)
|
||||
})
|
||||
|
||||
// t.deepEqual(
|
||||
// persistedMsgs.filter((msg) => msg.content).map((msg) => msg.content.text),
|
||||
// ['m0', 'm1', 'm3', 'm4'],
|
||||
// 'msgs in disk after the delete'
|
||||
// )
|
||||
const afterReopen = []
|
||||
for (const msg of persistedMsgs) {
|
||||
if (msg.data && msg.metadata.account?.length > 4) {
|
||||
afterReopen.push(msg.data.text)
|
||||
}
|
||||
}
|
||||
|
||||
assert.deepEqual(
|
||||
afterReopen,
|
||||
['m0', 'm1', 'm3', 'm4'],
|
||||
'4 msgs after the erase'
|
||||
)
|
||||
})
|
||||
|
|
|
@ -35,7 +35,7 @@ test('records() iterator', async (t) => {
|
|||
for (const rec of peer.db.records()) {
|
||||
if (!rec.msg.data) continue
|
||||
if (rec.msg.metadata.account === 'self') continue
|
||||
assert.ok(rec.misc.size > rec.msg.metadata.dataSize, 'size > dataSize')
|
||||
assert.ok(rec.received, 'received')
|
||||
count++
|
||||
}
|
||||
assert.equal(count, 6)
|
||||
|
|
Loading…
Reference in New Issue