mirror of https://codeberg.org/pzp/pzp-sync.git
Compare commits
7 Commits
Author | SHA1 | Date |
---|---|---|
|
89401de0c6 | |
|
39f8cca208 | |
|
cb557ef15a | |
|
779d7aab31 | |
|
8f2bd08a60 | |
|
a517fd465c | |
|
2b74aa6b1a |
|
@ -10,6 +10,7 @@ const { EMPTY_RANGE, isEmptyRange, estimateMsgCount } = require('./range')
|
|||
* @typedef {ReturnType<import('pzp-dict').init>} PZPDict
|
||||
* @typedef {ReturnType<import('pzp-set').init>} PZPSet
|
||||
* @typedef {import('pzp-db/msg-v4').Msg} Msg
|
||||
* @typedef {import('pzp-db/db-tangle')} DBTangle
|
||||
* @typedef {import('pzp-goals').Goal} Goal
|
||||
* @typedef {import('./range').Range} Range
|
||||
* @typedef {string} MsgID
|
||||
|
@ -314,12 +315,15 @@ class Algorithm {
|
|||
* @param {number} count
|
||||
*/
|
||||
async pruneNewest(rootID, count) {
|
||||
/** @type {DBTangle | null} */
|
||||
const tangle = await p(this.#peer.db.getTangle)(rootID)
|
||||
if (!tangle) return
|
||||
const sorted = tangle.topoSort()
|
||||
if (sorted.length <= count) return
|
||||
const msgID = sorted[sorted.length - count] // New "oldest dataful msg"
|
||||
const { deletables, erasables } = tangle.getDeletablesAndErasables(msgID)
|
||||
const deletablesErasables = tangle.getDeletablesAndErasables(msgID)
|
||||
if (!deletablesErasables) return
|
||||
const { deletables, erasables } = deletablesErasables
|
||||
const del = p(this.#peer.db.del)
|
||||
const erase = p(this.#peer.db.erase)
|
||||
for (const msgID of deletables) {
|
||||
|
|
|
@ -208,8 +208,7 @@ class SyncStream extends Pipeable {
|
|||
this.#remoteHave.set(id, remoteHaveRange)
|
||||
this.#remoteWant.set(id, remoteWantRange)
|
||||
const goal = this.#goals.get(id)
|
||||
const haveRange = this.#localHave.get(id)
|
||||
if (!haveRange) throw new Error(`Local have-range not set for ${id}`)
|
||||
const haveRange = this.#localHave.get(id) ?? [-1, -1]
|
||||
const localWant = await this.#algo.wantRange(haveRange, remoteHaveRange, goal)
|
||||
this.#localWant.set(id, localWant)
|
||||
const localBloom0 = await this.#algo.bloomFor(id, 0, localWant)
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"name": "pzp-sync",
|
||||
"version": "1.0.2",
|
||||
"version": "1.0.4",
|
||||
"description": "PZP replication using Kleppmann's hash graph sync",
|
||||
"author": "Andre Staltz <contact@staltz.com>",
|
||||
"license": "MIT",
|
||||
|
@ -38,7 +38,7 @@
|
|||
"@types/node": "16.x",
|
||||
"c8": "7",
|
||||
"pzp-caps": "^1.0.0",
|
||||
"pzp-db": "^1.0.1",
|
||||
"pzp-db": "^1.0.4",
|
||||
"pzp-dict": "^1.0.0",
|
||||
"pzp-goals": "^1.0.0",
|
||||
"pzp-keypair": "^1.0.0",
|
||||
|
|
|
@ -306,6 +306,76 @@ test('create 100 messages in parallel that still manage to sync realtime (withou
|
|||
}
|
||||
assert.equal(arr.length, n, `alice has ${arr.length} posts from bob`)
|
||||
|
||||
await p(remoteAlice.close)(true)
|
||||
await p(alice.close)(true)
|
||||
await p(bob.close)(true)
|
||||
})
|
||||
|
||||
test('create 200 messages that manage to replicate with low "newest" goals', async (t) => {
|
||||
const n = 200
|
||||
|
||||
const alice = createPeer({ name: 'alice' })
|
||||
const bob = createPeer({ name: 'bob' })
|
||||
|
||||
await alice.db.loaded()
|
||||
await bob.db.loaded()
|
||||
|
||||
const bobID = await p(bob.db.account.create)({
|
||||
subdomain: 'account',
|
||||
_nonce: 'bob',
|
||||
})
|
||||
|
||||
const bobPostsID = bob.db.feed.getID(bobID, 'post')
|
||||
|
||||
{
|
||||
const arr = (await flatten(alice.db.msgs()))
|
||||
.filter((msg) => msg.metadata.account === bobID && msg.data)
|
||||
.map((msg) => msg.data.text)
|
||||
assert.deepEqual(arr, [], 'alice has no posts from bob')
|
||||
}
|
||||
|
||||
const confirmed = []
|
||||
// for keeping track of which msgs have arrived
|
||||
for (let i = 0; i < n; i++) {
|
||||
confirmed.push(false)
|
||||
}
|
||||
|
||||
alice.db.onRecordAdded(rec => {
|
||||
if (rec.msg.data?.text) {
|
||||
const num = Number.parseInt(rec.msg.data.text)
|
||||
confirmed[num] = true
|
||||
}
|
||||
})
|
||||
|
||||
bob.goals.set(bobPostsID, 'newest-50')
|
||||
alice.goals.set(bobPostsID, 'newest-50')
|
||||
|
||||
const remoteAlice = await p(bob.connect)(alice.getAddress())
|
||||
assert('alice and bob connected')
|
||||
|
||||
bob.sync.start()
|
||||
await p(setTimeout)(1000)
|
||||
assert('sync!')
|
||||
|
||||
const hundred = []
|
||||
for (let i = 0; i < n; i++) {
|
||||
hundred.push(i)
|
||||
}
|
||||
Promise.all(hundred.map(i => p(bob.db.feed.publish)({
|
||||
account: bobID,
|
||||
domain: 'post',
|
||||
data: { text: `${i}` },
|
||||
})))
|
||||
assert(`bob published ${n} posts in parallel`)
|
||||
|
||||
let tries = 30
|
||||
// just waiting for them to arrive
|
||||
do {
|
||||
await p(setTimeout)(100)
|
||||
} while (!confirmed.every(v => v === true) && tries-- > 0)
|
||||
|
||||
assert.equal(confirmed.filter(v => v === true).length, n, `alice has all of bob's posts`)
|
||||
|
||||
await p(remoteAlice.close)(true)
|
||||
await p(alice.close)(true)
|
||||
await p(bob.close)(true)
|
||||
|
|
Loading…
Reference in New Issue