Merge pull request 'Add realtime sync test with 100 msgs' (#6) from 100-realtime into master

Reviewed-on: https://codeberg.org/pzp/pzp-sync/pulls/6
This commit is contained in:
Powersource 2024-05-29 13:11:30 +00:00
commit 5410d7a87c
3 changed files with 150 additions and 0 deletions

View File

@ -2,6 +2,10 @@
PZP replication using Kleppmann's hash graph sync PZP replication using Kleppmann's hash graph sync
https://martin.kleppmann.com/2020/12/02/bloom-filter-hash-graph-sync.html
https://arxiv.org/abs/2012.00472
## Installation ## Installation
``` ```

View File

@ -108,6 +108,9 @@ class SyncStream extends Pipeable {
this.#db.onRecordAdded((/** @type {Rec} */ { id: msgID, msg }) => { this.#db.onRecordAdded((/** @type {Rec} */ { id: msgID, msg }) => {
if (!this.sink || this.sink.paused) return if (!this.sink || this.sink.paused) return
const tangleIDs = [msgID].concat(Object.keys(msg.metadata.tangles)) const tangleIDs = [msgID].concat(Object.keys(msg.metadata.tangles))
this.resume()
for (const id of tangleIDs) { for (const id of tangleIDs) {
if (this.#realtimeSyncing.has(id)) { if (this.#realtimeSyncing.has(id)) {
if (this.#receivableMsgs.has(msgID)) continue if (this.#receivableMsgs.has(msgID)) continue

View File

@ -167,3 +167,146 @@ test('sync feed msgs in realtime after the 9 rounds, reverse', async (t) => {
await p(alice.close)(true) await p(alice.close)(true)
await p(bob.close)(true) await p(bob.close)(true)
}) })
test('create 100 messages in parallel that still manage to sync realtime (create an initial post before starting realtime too)', async (t) => {
const alice = createPeer({ name: 'alice' })
const bob = createPeer({ name: 'bob' })
await alice.db.loaded()
await bob.db.loaded()
const bobID = await p(bob.db.account.create)({
subdomain: 'account',
_nonce: 'bob',
})
await p(bob.db.feed.publish)({
account: bobID,
domain: 'post',
data: { text: 'm0' },
})
assert('bob published post 0')
const bobPostsID = bob.db.feed.getID(bobID, 'post')
{
const arr = (await flatten(alice.db.msgs()))
.filter((msg) => msg.metadata.account === bobID && msg.data)
.map((msg) => msg.data.text)
assert.deepEqual(arr, [], 'alice has no posts from bob')
}
bob.goals.set(bobPostsID, 'all')
alice.goals.set(bobPostsID, 'all')
const remoteAlice = await p(bob.connect)(alice.getAddress())
assert('alice and bob connected')
bob.sync.start()
await p(setTimeout)(1000)
assert('sync!')
const n = 100
const hundred = []
for (let i = 0; i < n; i++) {
hundred.push(i)
}
await Promise.all(hundred.map(i => p(bob.db.feed.publish)({
account: bobID,
domain: 'post',
data: { text: `post nr ${i}` },
})))
assert('bob published 100 posts in parallel')
const bobMsgs = await flatten(bob.db.msgs())
// 1 for creating bob's account, 1 for the 'post' moot, and 1 for first post
assert.equal(bobMsgs.length, n + 3, "bob has all of his own messages")
let arr
// just waiting for them to arrive
for (let i = 0; i < 100; i++) {
arr = (await flatten(alice.db.msgs()))
// moot doesn't have msg.data
.filter((msg) => msg.metadata.account === bobID && msg.data)
.filter(msg => msg.metadata.domain === 'post')
.map((msg) => msg.data.text)
if (arr.length < n) {
await p(setTimeout)(100)
} else {
break
}
}
assert.equal(arr.length, n + 1, `alice has ${arr.length} posts from bob`)
await p(remoteAlice.close)(true)
await p(alice.close)(true)
await p(bob.close)(true)
})
test('create 100 messages in parallel that still manage to sync realtime (without creating an initial post before starting realtime)', async (t) => {
const alice = createPeer({ name: 'alice' })
const bob = createPeer({ name: 'bob' })
await alice.db.loaded()
await bob.db.loaded()
const bobID = await p(bob.db.account.create)({
subdomain: 'account',
_nonce: 'bob',
})
const bobPostsID = bob.db.feed.getID(bobID, 'post')
{
const arr = (await flatten(alice.db.msgs()))
.filter((msg) => msg.metadata.account === bobID && msg.data)
.map((msg) => msg.data.text)
assert.deepEqual(arr, [], 'alice has no posts from bob')
}
bob.goals.set(bobPostsID, 'all')
alice.goals.set(bobPostsID, 'all')
const remoteAlice = await p(bob.connect)(alice.getAddress())
assert('alice and bob connected')
bob.sync.start()
await p(setTimeout)(1000)
assert('sync!')
const n = 100
const hundred = []
for (let i = 0; i < n; i++) {
hundred.push(i)
}
await Promise.all(hundred.map(i => p(bob.db.feed.publish)({
account: bobID,
domain: 'post',
data: { text: `post nr ${i}` },
})))
assert('bob published 100 posts in parallel')
const bobMsgs = await flatten(bob.db.msgs())
// 1 for creating bob's account, and 1 for the 'post' moot
assert.equal(bobMsgs.length, n + 2, "bob has all of his own messages")
let arr
// just waiting for them to arrive
for (let i = 0; i < 100; i++) {
arr = (await flatten(alice.db.msgs()))
// moot doesn't have msg.data
.filter((msg) => msg.metadata.account === bobID && msg.data)
.filter(msg => msg.metadata.domain === 'post')
.map((msg) => msg.data.text)
if (arr.length < n) {
await p(setTimeout)(100)
} else {
break
}
}
assert.equal(arr.length, n, `alice has ${arr.length} posts from bob`)
await p(remoteAlice.close)(true)
await p(alice.close)(true)
await p(bob.close)(true)
})