mirror of https://codeberg.org/pzp/pzp-sync.git
Compare commits
No commits in common. "master" and "v1.0.0" have entirely different histories.
In the unified hunks below, `-` lines are from the "master" side of the compare and `+` lines are from the "v1.0.0" side.

@@ -2,10 +2,6 @@
 PZP replication using Kleppmann's hash graph sync
 
-https://martin.kleppmann.com/2020/12/02/bloom-filter-hash-graph-sync.html
-
-https://arxiv.org/abs/2012.00472
-
 ## Installation
 
 ```
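For orientation, here is a minimal usage sketch assembled from the test code that appears further down in this compare (the three removed tests). `createPeer` is a helper from this repository's test suite, not a published export, and the require path below is assumed, so treat the whole snippet as illustrative rather than authoritative.

```js
// Illustrative only: mirrors the setup used by the tests in this compare.
// `createPeer` is a test-suite helper (require path assumed, not a real export).
const p = require('node:util').promisify
const { createPeer } = require('./test/util') // assumed helper location

async function main() {
  const alice = createPeer({ name: 'alice' })
  const bob = createPeer({ name: 'bob' })
  await alice.db.loaded()
  await bob.db.loaded()

  // Bob creates an account and a 'post' feed; both peers declare a
  // replication goal for that feed, then connect and start syncing.
  const bobID = await p(bob.db.account.create)({ subdomain: 'account', _nonce: 'bob' })
  const bobPostsID = bob.db.feed.getID(bobID, 'post')
  bob.goals.set(bobPostsID, 'all')
  alice.goals.set(bobPostsID, 'all')

  await p(bob.connect)(alice.getAddress())
  bob.sync.start()
}

main().catch(console.error)
```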
@@ -10,7 +10,6 @@ const { EMPTY_RANGE, isEmptyRange, estimateMsgCount } = require('./range')
  * @typedef {ReturnType<import('pzp-dict').init>} PZPDict
  * @typedef {ReturnType<import('pzp-set').init>} PZPSet
  * @typedef {import('pzp-db/msg-v4').Msg} Msg
- * @typedef {import('pzp-db/db-tangle')} DBTangle
  * @typedef {import('pzp-goals').Goal} Goal
  * @typedef {import('./range').Range} Range
  * @typedef {string} MsgID
@@ -315,15 +314,12 @@ class Algorithm {
    * @param {number} count
    */
   async pruneNewest(rootID, count) {
-    /** @type {DBTangle | null} */
     const tangle = await p(this.#peer.db.getTangle)(rootID)
     if (!tangle) return
     const sorted = tangle.topoSort()
     if (sorted.length <= count) return
     const msgID = sorted[sorted.length - count] // New "oldest dataful msg"
-    const deletablesErasables = tangle.getDeletablesAndErasables(msgID)
-    if (!deletablesErasables) return
-    const { deletables, erasables } = deletablesErasables
+    const { deletables, erasables } = tangle.getDeletablesAndErasables(msgID)
     const del = p(this.#peer.db.del)
     const erase = p(this.#peer.db.erase)
     for (const msgID of deletables) {
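To make the control flow of this hunk easier to follow outside the diff, here is a sketch of `Algorithm#pruneNewest` with the callback-style `pzp-db` calls replaced by injected async stand-ins. The erase loop is not visible in the hunk and is assumed to mirror the delete loop, so this is illustrative, not the library implementation.

```js
// Sketch of the pruning flow shown above. `db` is a stand-in object whose
// getTangle/del/erase are assumed to be promise-returning wrappers around
// the pzp-db callback APIs used in the real code.
async function pruneNewestSketch(rootID, count, db) {
  const tangle = await db.getTangle(rootID)
  if (!tangle) return

  const sorted = tangle.topoSort()
  if (sorted.length <= count) return

  // The message that becomes the new "oldest dataful msg":
  const msgID = sorted[sorted.length - count]
  const { deletables, erasables } = tangle.getDeletablesAndErasables(msgID)

  for (const id of deletables) await db.del(id)   // remove entirely
  for (const id of erasables) await db.erase(id)  // assumed: wipe data, keep metadata
}
```

The only difference between the two sides of this hunk is whether `getDeletablesAndErasables` may return a nullish value: the master side guards against that before destructuring, while the v1.0.0 side destructures directly.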
@@ -108,9 +108,6 @@ class SyncStream extends Pipeable {
     this.#db.onRecordAdded((/** @type {Rec} */ { id: msgID, msg }) => {
       if (!this.sink || this.sink.paused) return
       const tangleIDs = [msgID].concat(Object.keys(msg.metadata.tangles))
-
-      this.resume()
-
       for (const id of tangleIDs) {
         if (this.#realtimeSyncing.has(id)) {
           if (this.#receivableMsgs.has(msgID)) continue
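The hunk above is the realtime path: every record added to the local database is checked against the set of tangles currently being realtime-synced. A small sketch of that check follows, with the private stream state passed in explicitly; the `forward` callback is hypothetical, since what happens after the checks is outside this hunk.

```js
// Sketch of the realtime check above. realtimeSyncing and receivableMsgs are
// Sets standing in for this.#realtimeSyncing / this.#receivableMsgs; `forward`
// is a hypothetical callback for whatever the stream does next.
function onRecordAddedSketch({ id: msgID, msg }, realtimeSyncing, receivableMsgs, forward) {
  // A message belongs to its own tangle (its ID) plus every tangle listed
  // in its metadata.
  const tangleIDs = [msgID].concat(Object.keys(msg.metadata.tangles))
  for (const id of tangleIDs) {
    if (!realtimeSyncing.has(id)) continue
    if (receivableMsgs.has(msgID)) continue // already tracked as receivable; skip
    forward(id, msgID, msg)
  }
}
```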
@@ -208,7 +205,8 @@ class SyncStream extends Pipeable {
     this.#remoteHave.set(id, remoteHaveRange)
     this.#remoteWant.set(id, remoteWantRange)
     const goal = this.#goals.get(id)
-    const haveRange = this.#localHave.get(id) ?? [-1, -1]
+    const haveRange = this.#localHave.get(id)
+    if (!haveRange) throw new Error(`Local have-range not set for ${id}`)
     const localWant = await this.#algo.wantRange(haveRange, remoteHaveRange, goal)
     this.#localWant.set(id, localWant)
     const localBloom0 = await this.#algo.bloomFor(id, 0, localWant)
@@ -259,7 +257,8 @@ class SyncStream extends Pipeable {
   async #sendBloomReq(id, phase, round, remoteBloom, msgIDsForMe) {
     // prettier-ignore
     this.#debug('%s Stream IN%s: got bloom round %s plus msgIDs in %s: %o', this.#myId, phase-1, round-1, id, msgIDsForMe)
-    const remoteWantRange = this.#remoteWant.get(id) ?? [0, 0]
+    const remoteWantRange = this.#remoteWant.get(id)
+    if (!remoteWantRange) throw new Error(`Remote want-range not set for ${id}`)
     this.#updateReceivableMsgs(id, msgIDsForMe)
     const msgIDsForThem = await this.#algo.getMsgsMissing(
       id,
@@ -291,7 +290,8 @@ class SyncStream extends Pipeable {
   async #sendBloomRes(id, phase, round, remoteBloom, msgIDsForMe) {
     // prettier-ignore
     this.#debug('%s Stream IN%s: got bloom round %s plus msgIDs in %s: %o', this.#myId, phase-1, round, id, msgIDsForMe)
-    const remoteWantRange = this.#remoteWant.get(id) ?? [0, 0]
+    const remoteWantRange = this.#remoteWant.get(id)
+    if (!remoteWantRange) throw new Error(`Remote want-range not set for ${id}`)
     this.#updateReceivableMsgs(id, msgIDsForMe)
     const msgIDsForThem = await this.#algo.getMsgsMissing(
       id,
@@ -322,7 +322,8 @@ class SyncStream extends Pipeable {
   async #sendMissingMsgsReq(id, round, remoteBloom, msgIDsForMe) {
     // prettier-ignore
     this.#debug('%s Stream IN7: got bloom round %s plus msgIDs in %s: %o', this.#myId, round, id, msgIDsForMe)
-    const remoteWantRange = this.#remoteWant.get(id) ?? [0, 0]
+    const remoteWantRange = this.#remoteWant.get(id)
+    if (!remoteWantRange) throw new Error(`Remote want-range not set for ${id}`)
     this.#updateReceivableMsgs(id, msgIDsForMe)
     const msgIDsForThem = await this.#algo.getMsgsMissing(
       id,
@@ -360,7 +361,8 @@ class SyncStream extends Pipeable {
   async #sendMissingMsgsRes(id, round, remoteBloom, msgsForMe) {
     // prettier-ignore
     this.#debug('%s Stream IN8: got bloom round %s plus %s msgs in %s', this.#myId, round, msgsForMe.length, id)
-    const remoteWantRange = this.#remoteWant.get(id) ?? [0, 0]
+    const remoteWantRange = this.#remoteWant.get(id)
+    if (!remoteWantRange) throw new Error(`Remote want-range not set for ${id}`)
     const msgIDsForThem = await this.#algo.getMsgsMissing(
       id,
       round,
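The five hunks above all make the same change: on the master side a missing have/want range silently falls back to a default (`[-1, -1]` for the local have-range, `[0, 0]` for the remote want-range), while on the v1.0.0 side a missing range throws. Extracted as a standalone sketch, where the plain `Map` stands in for the private `#localHave` / `#remoteWant` fields:

```js
// master side: a missing range degrades to a default value.
function getRangeWithFallback(ranges, id, fallback) {
  return ranges.get(id) ?? fallback // e.g. [-1, -1] or [0, 0]
}

// v1.0.0 side: a missing range is a hard error, reported with the tangle ID.
function getRangeOrThrow(ranges, id, label) {
  const range = ranges.get(id)
  if (!range) throw new Error(`${label} range not set for ${id}`)
  return range
}

// Example with a hypothetical tangle ID:
const localHave = new Map([['tangleA', [0, 7]]])
getRangeWithFallback(localHave, 'tangleB', [-1, -1]) // => [-1, -1]
// getRangeOrThrow(localHave, 'tangleB', 'Local have') // => throws
```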
@@ -1,6 +1,6 @@
 {
   "name": "pzp-sync",
-  "version": "1.0.4",
+  "version": "1.0.0",
   "description": "PZP replication using Kleppmann's hash graph sync",
   "author": "Andre Staltz <contact@staltz.com>",
   "license": "MIT",
@@ -38,7 +38,7 @@
     "@types/node": "16.x",
     "c8": "7",
     "pzp-caps": "^1.0.0",
-    "pzp-db": "^1.0.4",
+    "pzp-db": "^1.0.1",
     "pzp-dict": "^1.0.0",
     "pzp-goals": "^1.0.0",
     "pzp-keypair": "^1.0.0",
@@ -167,216 +167,3 @@ test('sync feed msgs in realtime after the 9 rounds, reverse', async (t) => {
   await p(alice.close)(true)
   await p(bob.close)(true)
 })
-
-test('create 100 messages in parallel that still manage to sync realtime (create an initial post before starting realtime too)', async (t) => {
-  const alice = createPeer({ name: 'alice' })
-  const bob = createPeer({ name: 'bob' })
-
-  await alice.db.loaded()
-  await bob.db.loaded()
-
-  const bobID = await p(bob.db.account.create)({
-    subdomain: 'account',
-    _nonce: 'bob',
-  })
-
-  await p(bob.db.feed.publish)({
-    account: bobID,
-    domain: 'post',
-    data: { text: 'm0' },
-  })
-  assert('bob published post 0')
-
-  const bobPostsID = bob.db.feed.getID(bobID, 'post')
-
-  {
-    const arr = (await flatten(alice.db.msgs()))
-      .filter((msg) => msg.metadata.account === bobID && msg.data)
-      .map((msg) => msg.data.text)
-    assert.deepEqual(arr, [], 'alice has no posts from bob')
-  }
-
-  bob.goals.set(bobPostsID, 'all')
-  alice.goals.set(bobPostsID, 'all')
-
-  const remoteAlice = await p(bob.connect)(alice.getAddress())
-  assert('alice and bob connected')
-
-  bob.sync.start()
-  await p(setTimeout)(1000)
-  assert('sync!')
-
-  const n = 100
-  const hundred = []
-  for (let i = 0; i < n; i++) {
-    hundred.push(i)
-  }
-  await Promise.all(hundred.map(i => p(bob.db.feed.publish)({
-    account: bobID,
-    domain: 'post',
-    data: { text: `post nr ${i}` },
-  })))
-  assert('bob published 100 posts in parallel')
-
-  const bobMsgs = await flatten(bob.db.msgs())
-  // 1 for creating bob's account, 1 for the 'post' moot, and 1 for first post
-  assert.equal(bobMsgs.length, n + 3, "bob has all of his own messages")
-
-  let arr
-  // just waiting for them to arrive
-  for (let i = 0; i < 100; i++) {
-    arr = (await flatten(alice.db.msgs()))
-      // moot doesn't have msg.data
-      .filter((msg) => msg.metadata.account === bobID && msg.data)
-      .filter(msg => msg.metadata.domain === 'post')
-      .map((msg) => msg.data.text)
-    if (arr.length < n) {
-      await p(setTimeout)(100)
-    } else {
-      break
-    }
-  }
-  assert.equal(arr.length, n + 1, `alice has ${arr.length} posts from bob`)
-
-  await p(remoteAlice.close)(true)
-  await p(alice.close)(true)
-  await p(bob.close)(true)
-})
-
-test('create 100 messages in parallel that still manage to sync realtime (without creating an initial post before starting realtime)', async (t) => {
-  const alice = createPeer({ name: 'alice' })
-  const bob = createPeer({ name: 'bob' })
-
-  await alice.db.loaded()
-  await bob.db.loaded()
-
-  const bobID = await p(bob.db.account.create)({
-    subdomain: 'account',
-    _nonce: 'bob',
-  })
-
-  const bobPostsID = bob.db.feed.getID(bobID, 'post')
-
-  {
-    const arr = (await flatten(alice.db.msgs()))
-      .filter((msg) => msg.metadata.account === bobID && msg.data)
-      .map((msg) => msg.data.text)
-    assert.deepEqual(arr, [], 'alice has no posts from bob')
-  }
-
-  bob.goals.set(bobPostsID, 'all')
-  alice.goals.set(bobPostsID, 'all')
-
-  const remoteAlice = await p(bob.connect)(alice.getAddress())
-  assert('alice and bob connected')
-
-  bob.sync.start()
-  await p(setTimeout)(1000)
-  assert('sync!')
-
-  const n = 100
-  const hundred = []
-  for (let i = 0; i < n; i++) {
-    hundred.push(i)
-  }
-  await Promise.all(hundred.map(i => p(bob.db.feed.publish)({
-    account: bobID,
-    domain: 'post',
-    data: { text: `post nr ${i}` },
-  })))
-  assert('bob published 100 posts in parallel')
-
-  const bobMsgs = await flatten(bob.db.msgs())
-  // 1 for creating bob's account, and 1 for the 'post' moot
-  assert.equal(bobMsgs.length, n + 2, "bob has all of his own messages")
-
-  let arr
-  // just waiting for them to arrive
-  for (let i = 0; i < 100; i++) {
-    arr = (await flatten(alice.db.msgs()))
-      // moot doesn't have msg.data
-      .filter((msg) => msg.metadata.account === bobID && msg.data)
-      .filter(msg => msg.metadata.domain === 'post')
-      .map((msg) => msg.data.text)
-    if (arr.length < n) {
-      await p(setTimeout)(100)
-    } else {
-      break
-    }
-  }
-  assert.equal(arr.length, n, `alice has ${arr.length} posts from bob`)
-
-  await p(remoteAlice.close)(true)
-  await p(alice.close)(true)
-  await p(bob.close)(true)
-})
-
-test('create 200 messages that manage to replicate with low "newest" goals', async (t) => {
-  const n = 200
-
-  const alice = createPeer({ name: 'alice' })
-  const bob = createPeer({ name: 'bob' })
-
-  await alice.db.loaded()
-  await bob.db.loaded()
-
-  const bobID = await p(bob.db.account.create)({
-    subdomain: 'account',
-    _nonce: 'bob',
-  })
-
-  const bobPostsID = bob.db.feed.getID(bobID, 'post')
-
-  {
-    const arr = (await flatten(alice.db.msgs()))
-      .filter((msg) => msg.metadata.account === bobID && msg.data)
-      .map((msg) => msg.data.text)
-    assert.deepEqual(arr, [], 'alice has no posts from bob')
-  }
-
-  const confirmed = []
-  // for keeping track of which msgs have arrived
-  for (let i = 0; i < n; i++) {
-    confirmed.push(false)
-  }
-
-  alice.db.onRecordAdded(rec => {
-    if (rec.msg.data?.text) {
-      const num = Number.parseInt(rec.msg.data.text)
-      confirmed[num] = true
-    }
-  })
-
-  bob.goals.set(bobPostsID, 'newest-50')
-  alice.goals.set(bobPostsID, 'newest-50')
-
-  const remoteAlice = await p(bob.connect)(alice.getAddress())
-  assert('alice and bob connected')
-
-  bob.sync.start()
-  await p(setTimeout)(1000)
-  assert('sync!')
-
-  const hundred = []
-  for (let i = 0; i < n; i++) {
-    hundred.push(i)
-  }
-  Promise.all(hundred.map(i => p(bob.db.feed.publish)({
-    account: bobID,
-    domain: 'post',
-    data: { text: `${i}` },
-  })))
-  assert(`bob published ${n} posts in parallel`)
-
-  let tries = 30
-  // just waiting for them to arrive
-  do {
-    await p(setTimeout)(100)
-  } while (!confirmed.every(v => v === true) && tries-- > 0)
-
-  assert.equal(confirmed.filter(v => v === true).length, n, `alice has all of bob's posts`)
-
-  await p(remoteAlice.close)(true)
-  await p(alice.close)(true)
-  await p(bob.close)(true)
-})
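The three removed tests each repeat the same wait-and-poll loop: publish messages in parallel, then poll until they show up on the other peer. A generic helper in that spirit, purely illustrative and not part of the test suite:

```js
// Illustrative helper: poll `predicate` every `intervalMs` until it returns
// true or `tries` attempts run out, like the wait loops in the removed tests.
const { setTimeout: delay } = require('node:timers/promises')

async function waitUntil(predicate, tries = 30, intervalMs = 100) {
  for (let i = 0; i < tries; i++) {
    if (await predicate()) return true
    await delay(intervalMs)
  }
  return false
}

// Hypothetical usage in the style of the removed tests:
// const ok = await waitUntil(async () =>
//   (await flatten(alice.db.msgs()))
//     .filter((msg) => msg.metadata.account === bobID && msg.data).length >= n
// )
```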