Compare commits

...

16 Commits

Author SHA1 Message Date
Powersource 89401de0c6 Merge pull request 'Add test for low newest goals' (#14) from low-newest into master
Reviewed-on: https://codeberg.org/pzp/pzp-sync/pulls/14
2024-06-28 11:04:37 +00:00
Jacob Karlsson 39f8cca208 Remove log 2024-06-28 13:02:38 +02:00
Jacob Karlsson cb557ef15a Add test for low newest goals 2024-06-28 12:54:28 +02:00
Jacob Karlsson 779d7aab31 1.0.4 2024-06-09 17:43:04 +02:00
Jacob Karlsson 8f2bd08a60 Don't throw on missing local want range 2024-06-09 17:42:24 +02:00
Jacob Karlsson a517fd465c 1.0.3 2024-06-05 17:30:11 +02:00
Jacob Karlsson 2b74aa6b1a Fix not checking null on dbtangle call 2024-06-05 17:29:54 +02:00
Jacob Karlsson b851d3eaf1 1.0.2 2024-05-29 17:29:21 +02:00
Powersource dbb4bb28e6 Merge pull request 'Handle missing remoteWants without throwing' (#9) from missing-remotewant into master
Reviewed-on: https://codeberg.org/pzp/pzp-sync/pulls/9
2024-05-29 15:27:51 +00:00
Jacob Karlsson 5797116a87 Handle missing remoteWants without throwing 2024-05-29 17:17:47 +02:00
Jacob Karlsson c6d142dd0c 1.0.1 2024-05-29 15:13:06 +02:00
Powersource 5410d7a87c Merge pull request 'Add realtime sync test with 100 msgs' (#6) from 100-realtime into master
Reviewed-on: https://codeberg.org/pzp/pzp-sync/pulls/6
2024-05-29 13:11:30 +00:00
Jacob Karlsson 87b6b25685 Fix failing realtime test 2024-05-29 14:56:20 +02:00
Jacob Karlsson 636cd9adcc Add failing test where we don't create a message before realtime sync 2024-05-29 14:48:40 +02:00
Jacob Karlsson e199eb97d3 Fix realtime 100 test with a hack 2024-05-28 15:38:30 +02:00
Jacob Karlsson cbf12a5e8c Start adding realtime test with 100 msgs 2024-05-23 18:46:36 +02:00
5 changed files with 232 additions and 13 deletions

View File

@@ -2,6 +2,10 @@
PZP replication using Kleppmann's hash graph sync
https://martin.kleppmann.com/2020/12/02/bloom-filter-hash-graph-sync.html
https://arxiv.org/abs/2012.00472
## Installation
```

View File

@@ -10,6 +10,7 @@ const { EMPTY_RANGE, isEmptyRange, estimateMsgCount } = require('./range')
* @typedef {ReturnType<import('pzp-dict').init>} PZPDict
* @typedef {ReturnType<import('pzp-set').init>} PZPSet
* @typedef {import('pzp-db/msg-v4').Msg} Msg
* @typedef {import('pzp-db/db-tangle')} DBTangle
* @typedef {import('pzp-goals').Goal} Goal
* @typedef {import('./range').Range} Range
* @typedef {string} MsgID
@@ -314,12 +315,15 @@ class Algorithm {
* @param {number} count
*/
async pruneNewest(rootID, count) {
/** @type {DBTangle | null} */
const tangle = await p(this.#peer.db.getTangle)(rootID)
if (!tangle) return
const sorted = tangle.topoSort()
if (sorted.length <= count) return
const msgID = sorted[sorted.length - count] // New "oldest dataful msg"
const { deletables, erasables } = tangle.getDeletablesAndErasables(msgID)
const deletablesErasables = tangle.getDeletablesAndErasables(msgID)
if (!deletablesErasables) return
const { deletables, erasables } = deletablesErasables
const del = p(this.#peer.db.del)
const erase = p(this.#peer.db.erase)
for (const msgID of deletables) {

View File

@@ -108,6 +108,9 @@ class SyncStream extends Pipeable {
this.#db.onRecordAdded((/** @type {Rec} */ { id: msgID, msg }) => {
if (!this.sink || this.sink.paused) return
const tangleIDs = [msgID].concat(Object.keys(msg.metadata.tangles))
this.resume()
for (const id of tangleIDs) {
if (this.#realtimeSyncing.has(id)) {
if (this.#receivableMsgs.has(msgID)) continue
@@ -205,8 +208,7 @@
this.#remoteHave.set(id, remoteHaveRange)
this.#remoteWant.set(id, remoteWantRange)
const goal = this.#goals.get(id)
const haveRange = this.#localHave.get(id)
if (!haveRange) throw new Error(`Local have-range not set for ${id}`)
const haveRange = this.#localHave.get(id) ?? [-1, -1]
const localWant = await this.#algo.wantRange(haveRange, remoteHaveRange, goal)
this.#localWant.set(id, localWant)
const localBloom0 = await this.#algo.bloomFor(id, 0, localWant)
@@ -257,8 +259,7 @@
async #sendBloomReq(id, phase, round, remoteBloom, msgIDsForMe) {
// prettier-ignore
this.#debug('%s Stream IN%s: got bloom round %s plus msgIDs in %s: %o', this.#myId, phase-1, round-1, id, msgIDsForMe)
const remoteWantRange = this.#remoteWant.get(id)
if (!remoteWantRange) throw new Error(`Remote want-range not set for ${id}`)
const remoteWantRange = this.#remoteWant.get(id) ?? [0, 0]
this.#updateReceivableMsgs(id, msgIDsForMe)
const msgIDsForThem = await this.#algo.getMsgsMissing(
id,
@@ -290,8 +291,7 @@
async #sendBloomRes(id, phase, round, remoteBloom, msgIDsForMe) {
// prettier-ignore
this.#debug('%s Stream IN%s: got bloom round %s plus msgIDs in %s: %o', this.#myId, phase-1, round, id, msgIDsForMe)
const remoteWantRange = this.#remoteWant.get(id)
if (!remoteWantRange) throw new Error(`Remote want-range not set for ${id}`)
const remoteWantRange = this.#remoteWant.get(id) ?? [0, 0]
this.#updateReceivableMsgs(id, msgIDsForMe)
const msgIDsForThem = await this.#algo.getMsgsMissing(
id,
@@ -322,8 +322,7 @@
async #sendMissingMsgsReq(id, round, remoteBloom, msgIDsForMe) {
// prettier-ignore
this.#debug('%s Stream IN7: got bloom round %s plus msgIDs in %s: %o', this.#myId, round, id, msgIDsForMe)
const remoteWantRange = this.#remoteWant.get(id)
if (!remoteWantRange) throw new Error(`Remote want-range not set for ${id}`)
const remoteWantRange = this.#remoteWant.get(id) ?? [0, 0]
this.#updateReceivableMsgs(id, msgIDsForMe)
const msgIDsForThem = await this.#algo.getMsgsMissing(
id,
@@ -361,8 +360,7 @@
async #sendMissingMsgsRes(id, round, remoteBloom, msgsForMe) {
// prettier-ignore
this.#debug('%s Stream IN8: got bloom round %s plus %s msgs in %s', this.#myId, round, msgsForMe.length, id)
const remoteWantRange = this.#remoteWant.get(id)
if (!remoteWantRange) throw new Error(`Remote want-range not set for ${id}`)
const remoteWantRange = this.#remoteWant.get(id) ?? [0, 0]
const msgIDsForThem = await this.#algo.getMsgsMissing(
id,
round,

View File

@@ -1,6 +1,6 @@
{
"name": "pzp-sync",
"version": "1.0.0",
"version": "1.0.4",
"description": "PZP replication using Kleppmann's hash graph sync",
"author": "Andre Staltz <contact@staltz.com>",
"license": "MIT",
@@ -38,7 +38,7 @@
"@types/node": "16.x",
"c8": "7",
"pzp-caps": "^1.0.0",
"pzp-db": "^1.0.1",
"pzp-db": "^1.0.4",
"pzp-dict": "^1.0.0",
"pzp-goals": "^1.0.0",
"pzp-keypair": "^1.0.0",

View File

@@ -167,3 +167,216 @@ test('sync feed msgs in realtime after the 9 rounds, reverse', async (t) => {
await p(alice.close)(true)
await p(bob.close)(true)
})
test('create 100 messages in parallel that still manage to sync realtime (create an initial post before starting realtime too)', async (t) => {
  // Two fresh peers; wait for both databases to be ready before publishing.
  const alice = createPeer({ name: 'alice' })
  const bob = createPeer({ name: 'bob' })
  await alice.db.loaded()
  await bob.db.loaded()

  const bobID = await p(bob.db.account.create)({
    subdomain: 'account',
    _nonce: 'bob',
  })
  // One dataful post exists BEFORE realtime sync starts (this is the point
  // of this test variant, vs. the "without initial post" one below).
  await p(bob.db.feed.publish)({
    account: bobID,
    domain: 'post',
    data: { text: 'm0' },
  })
  assert('bob published post 0')
  const bobPostsID = bob.db.feed.getID(bobID, 'post')

  // Sanity check: alice starts with none of bob's posts.
  {
    const arr = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(arr, [], 'alice has no posts from bob')
  }

  bob.goals.set(bobPostsID, 'all')
  alice.goals.set(bobPostsID, 'all')

  const remoteAlice = await p(bob.connect)(alice.getAddress())
  assert('alice and bob connected')

  bob.sync.start()
  await p(setTimeout)(1000)
  assert('sync!')

  // Publish n posts concurrently while realtime sync is already running.
  const n = 100
  const hundred = []
  for (let i = 0; i < n; i++) {
    hundred.push(i)
  }
  await Promise.all(hundred.map(i => p(bob.db.feed.publish)({
    account: bobID,
    domain: 'post',
    data: { text: `post nr ${i}` },
  })))
  assert('bob published 100 posts in parallel')

  const bobMsgs = await flatten(bob.db.msgs())
  // 1 for creating bob's account, 1 for the 'post' moot, and 1 for first post
  assert.equal(bobMsgs.length, n + 3, "bob has all of his own messages")

  let arr
  // Poll until alice has received ALL expected posts. The expected count is
  // n + 1 (the n parallel posts plus the initial 'm0' post), matching the
  // assertion below. Polling only until `n` (as before) could stop one
  // message early and make the test flaky.
  for (let i = 0; i < 100; i++) {
    arr = (await flatten(alice.db.msgs()))
      // moot doesn't have msg.data
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .filter(msg => msg.metadata.domain === 'post')
      .map((msg) => msg.data.text)
    if (arr.length < n + 1) {
      await p(setTimeout)(100)
    } else {
      break
    }
  }
  assert.equal(arr.length, n + 1, `alice has ${arr.length} posts from bob`)

  await p(remoteAlice.close)(true)
  await p(alice.close)(true)
  await p(bob.close)(true)
})
test('create 100 messages in parallel that still manage to sync realtime (without creating an initial post before starting realtime)', async (t) => {
  // Two fresh peers; wait for both databases to be ready.
  const alice = createPeer({ name: 'alice' })
  const bob = createPeer({ name: 'bob' })
  await alice.db.loaded()
  await bob.db.loaded()

  const bobID = await p(bob.db.account.create)({
    subdomain: 'account',
    _nonce: 'bob',
  })
  const bobPostsID = bob.db.feed.getID(bobID, 'post')

  // Sanity check: alice starts with none of bob's posts.
  {
    const texts = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(texts, [], 'alice has no posts from bob')
  }

  bob.goals.set(bobPostsID, 'all')
  alice.goals.set(bobPostsID, 'all')

  const remoteAlice = await p(bob.connect)(alice.getAddress())
  assert('alice and bob connected')

  bob.sync.start()
  await p(setTimeout)(1000)
  assert('sync!')

  // Publish n posts concurrently while realtime sync is already running.
  const n = 100
  const indices = Array.from({ length: n }, (_, i) => i)
  await Promise.all(
    indices.map((i) =>
      p(bob.db.feed.publish)({
        account: bobID,
        domain: 'post',
        data: { text: `post nr ${i}` },
      })
    )
  )
  assert('bob published 100 posts in parallel')

  const bobMsgs = await flatten(bob.db.msgs())
  // 1 for creating bob's account, and 1 for the 'post' moot
  assert.equal(bobMsgs.length, n + 2, "bob has all of his own messages")

  // Poll until all n posts have replicated to alice, up to 100 rounds
  // of 100ms each.
  let arr
  for (let attempt = 0; attempt < 100; attempt++) {
    const msgs = await flatten(alice.db.msgs())
    arr = msgs
      // moot doesn't have msg.data
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .filter((msg) => msg.metadata.domain === 'post')
      .map((msg) => msg.data.text)
    if (arr.length >= n) break
    await p(setTimeout)(100)
  }
  assert.equal(arr.length, n, `alice has ${arr.length} posts from bob`)

  await p(remoteAlice.close)(true)
  await p(alice.close)(true)
  await p(bob.close)(true)
})
test('create 200 messages that manage to replicate with low "newest" goals', async (t) => {
  const n = 200
  // Two fresh peers; wait for both databases to be ready.
  const alice = createPeer({ name: 'alice' })
  const bob = createPeer({ name: 'bob' })
  await alice.db.loaded()
  await bob.db.loaded()

  const bobID = await p(bob.db.account.create)({
    subdomain: 'account',
    _nonce: 'bob',
  })
  const bobPostsID = bob.db.feed.getID(bobID, 'post')

  // Sanity check: alice starts with none of bob's posts.
  {
    const arr = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(arr, [], 'alice has no posts from bob')
  }

  const confirmed = []
  // for keeping track of which msgs have arrived
  for (let i = 0; i < n; i++) {
    confirmed.push(false)
  }
  alice.db.onRecordAdded(rec => {
    if (rec.msg.data?.text) {
      // Each post's text is its numeric index; pass the radix explicitly.
      const num = Number.parseInt(rec.msg.data.text, 10)
      confirmed[num] = true
    }
  })

  // 'newest-50' keeps only the newest 50 msgs, so replication of all 200
  // relies on msgs arriving while they are still within the goal window.
  bob.goals.set(bobPostsID, 'newest-50')
  alice.goals.set(bobPostsID, 'newest-50')

  const remoteAlice = await p(bob.connect)(alice.getAddress())
  assert('alice and bob connected')

  bob.sync.start()
  await p(setTimeout)(1000)
  assert('sync!')

  const hundred = []
  for (let i = 0; i < n; i++) {
    hundred.push(i)
  }
  // Await the parallel publishes: previously this Promise was left floating,
  // so a failed publish would surface as an unhandled rejection instead of
  // failing the test.
  await Promise.all(hundred.map(i => p(bob.db.feed.publish)({
    account: bobID,
    domain: 'post',
    data: { text: `${i}` },
  })))
  assert(`bob published ${n} posts in parallel`)

  let tries = 30
  // just waiting for them to arrive
  do {
    await p(setTimeout)(100)
  } while (!confirmed.every(v => v === true) && tries-- > 0)
  assert.equal(confirmed.filter(v => v === true).length, n, `alice has all of bob's posts`)

  await p(remoteAlice.close)(true)
  await p(alice.close)(true)
  await p(bob.close)(true)
})