From cbf12a5e8c3d30adcbe02290e252a15ba744c446 Mon Sep 17 00:00:00 2001 From: Jacob Karlsson Date: Thu, 23 May 2024 18:46:36 +0200 Subject: [PATCH 1/4] Start adding realtime test with 100 msgs --- test/realtime.test.js | 68 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/test/realtime.test.js b/test/realtime.test.js index 3dc3b12..76763da 100644 --- a/test/realtime.test.js +++ b/test/realtime.test.js @@ -167,3 +167,71 @@ test('sync feed msgs in realtime after the 9 rounds, reverse', async (t) => { await p(alice.close)(true) await p(bob.close)(true) }) + +test('create 100 messages in parallel that still manage to sync realtime', async (t) => { + const alice = createPeer({ name: 'alice' }) + const bob = createPeer({ name: 'bob' }) + + await alice.db.loaded() + await bob.db.loaded() + + const bobID = await p(bob.db.account.create)({ + subdomain: 'account', + _nonce: 'bob', + }) + + const bobPostsID = bob.db.feed.getID(bobID, 'post') + + { + const arr = (await flatten(alice.db.msgs())) + .filter((msg) => msg.metadata.account === bobID && msg.data) + .map((msg) => msg.data.text) + assert.deepEqual(arr, [], 'alice has no posts from bob') + } + + bob.goals.set(bobPostsID, 'all') + alice.goals.set(bobPostsID, 'all') + + const remoteBob = await p(alice.connect)(bob.getAddress()) + assert('alice and bob connected') + + alice.sync.start() + await p(setTimeout)(1000) + assert('sync!') + + const n = 1 + const hundred = [] + for (let i = 0; i < n; i++) { + hundred.push(i) + } + await Promise.all(hundred.map(i => p(bob.db.feed.publish)({ + account: bobID, + domain: 'post', + data: { text: `post nr ${i}` }, + }))) + assert('bob published 100 posts in parallel') + + const bobMsgs = await flatten(bob.db.msgs()) + // 1 for creating bob's account and 1 for the 'post' moot + assert.equal(bobMsgs.length, n + 2, "bob has all of his own messages") + + let arr + // just waiting for them to arrive + for (let i = 0; i < 100; i++) { + arr = (await flatten(alice.db.msgs())) + //.filter((msg) => msg.metadata.account === bobID && msg.data) + .filter(msg => msg.domain === 'post') + .map((msg) => msg.data.text) + if (arr.length < n) { + await p(setTimeout)(100) + } else { + break + } + } + console.log('msgs', arr) + assert.equal(arr.length, n, `alice has ${arr.length} posts from bob`) + + await p(remoteBob.close)(true) + await p(alice.close)(true) + await p(bob.close)(true) +}) \ No newline at end of file From e199eb97d37b1f6c246ac4d90f614d6792fbc979 Mon Sep 17 00:00:00 2001 From: Jacob Karlsson Date: Tue, 28 May 2024 15:38:30 +0200 Subject: [PATCH 2/4] Fix realtime 100 test with a hack --- test/realtime.test.js | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/test/realtime.test.js b/test/realtime.test.js index 76763da..f0b7050 100644 --- a/test/realtime.test.js +++ b/test/realtime.test.js @@ -180,6 +180,14 @@ test('create 100 messages in parallel that still manage to sync realtime', async _nonce: 'bob', }) + // TODO: for some reason realtime doesn't work unless we post this initial one before starting replication. fix the bug and remove this, or create an issue to resolve it later + await p(bob.db.feed.publish)({ + account: bobID, + domain: 'post', + data: { text: 'm0' }, + }) + assert('bob published post 0') + const bobPostsID = bob.db.feed.getID(bobID, 'post') { @@ -192,14 +200,14 @@ test('create 100 messages in parallel that still manage to sync realtime', async bob.goals.set(bobPostsID, 'all') alice.goals.set(bobPostsID, 'all') - const remoteBob = await p(alice.connect)(bob.getAddress()) + const remoteAlice = await p(bob.connect)(alice.getAddress()) assert('alice and bob connected') - alice.sync.start() + bob.sync.start() await p(setTimeout)(1000) assert('sync!') - const n = 1 + const n = 100 const hundred = [] for (let i = 0; i < n; i++) { hundred.push(i) @@ -212,15 +220,16 @@ test('create 100 messages in parallel that still manage to sync realtime', async assert('bob published 100 posts in parallel') const bobMsgs = await flatten(bob.db.msgs()) - // 1 for creating bob's account and 1 for the 'post' moot - assert.equal(bobMsgs.length, n + 2, "bob has all of his own messages") + // 1 for creating bob's account, 1 for the 'post' moot, and 1 for first post + assert.equal(bobMsgs.length, n + 3, "bob has all of his own messages") let arr // just waiting for them to arrive for (let i = 0; i < 100; i++) { arr = (await flatten(alice.db.msgs())) - //.filter((msg) => msg.metadata.account === bobID && msg.data) - .filter(msg => msg.domain === 'post') + // moot doesn't have msg.data + .filter((msg) => msg.metadata.account === bobID && msg.data) + .filter(msg => msg.metadata.domain === 'post') .map((msg) => msg.data.text) if (arr.length < n) { await p(setTimeout)(100) @@ -228,10 +237,9 @@ test('create 100 messages in parallel that still manage to sync realtime', async break } } - console.log('msgs', arr) - assert.equal(arr.length, n, `alice has ${arr.length} posts from bob`) + assert.equal(arr.length, n + 1, `alice has ${arr.length + 1} posts from bob`) - await p(remoteBob.close)(true) + await p(remoteAlice.close)(true) await p(alice.close)(true) await p(bob.close)(true) }) \ No newline at end of file From 636cd9adcc147bc5f291c66f0eec37fa89a7e0b6 Mon Sep 17 00:00:00 2001 From: Jacob Karlsson Date: Wed, 29 May 2024 14:48:40 +0200 Subject: [PATCH 3/4] Add failing test where we don't create a message before realtime sync --- README.md | 4 +++ lib/stream.js | 3 ++ test/realtime.test.js | 73 +++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 77 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 90f3d41..d1bc2b2 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,10 @@ PZP replication using Kleppmann's hash graph sync +https://martin.kleppmann.com/2020/12/02/bloom-filter-hash-graph-sync.html + +https://arxiv.org/abs/2012.00472 + ## Installation ``` diff --git a/lib/stream.js b/lib/stream.js index d11aa9b..d91075a 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -108,6 +108,9 @@ class SyncStream extends Pipeable { this.#db.onRecordAdded((/** @type {Rec} */ { id: msgID, msg }) => { if (!this.sink || this.sink.paused) return const tangleIDs = [msgID].concat(Object.keys(msg.metadata.tangles)) + + //this.resume() + for (const id of tangleIDs) { if (this.#realtimeSyncing.has(id)) { if (this.#receivableMsgs.has(msgID)) continue diff --git a/test/realtime.test.js b/test/realtime.test.js index f0b7050..45ee76c 100644 --- a/test/realtime.test.js +++ b/test/realtime.test.js @@ -168,7 +168,7 @@ test('sync feed msgs in realtime after the 9 rounds, reverse', async (t) => { await p(bob.close)(true) }) -test('create 100 messages in parallel that still manage to sync realtime', async (t) => { +test('create 100 messages in parallel that still manage to sync realtime (create an initial post before starting realtime too)', async (t) => { const alice = createPeer({ name: 'alice' }) const bob = createPeer({ name: 'bob' }) @@ -180,7 +180,6 @@ test('create 100 messages in parallel that still manage to sync realtime', async _nonce: 'bob', }) - // TODO: for some reason realtime doesn't work unless we post this initial one before starting replication. fix the bug and remove this, or create an issue to resolve it later await p(bob.db.feed.publish)({ account: bobID, domain: 'post', @@ -237,7 +236,75 @@ test('create 100 messages in parallel that still manage to sync realtime', async break } } - assert.equal(arr.length, n + 1, `alice has ${arr.length + 1} posts from bob`) + assert.equal(arr.length, n + 1, `alice has ${arr.length} posts from bob`) + + await p(remoteAlice.close)(true) + await p(alice.close)(true) + await p(bob.close)(true) +}) + +test('create 100 messages in parallel that still manage to sync realtime (without creating an initial post before starting realtime)', async (t) => { + const alice = createPeer({ name: 'alice' }) + const bob = createPeer({ name: 'bob' }) + + await alice.db.loaded() + await bob.db.loaded() + + const bobID = await p(bob.db.account.create)({ + subdomain: 'account', + _nonce: 'bob', + }) + + const bobPostsID = bob.db.feed.getID(bobID, 'post') + + { + const arr = (await flatten(alice.db.msgs())) + .filter((msg) => msg.metadata.account === bobID && msg.data) + .map((msg) => msg.data.text) + assert.deepEqual(arr, [], 'alice has no posts from bob') + } + + bob.goals.set(bobPostsID, 'all') + alice.goals.set(bobPostsID, 'all') + + const remoteAlice = await p(bob.connect)(alice.getAddress()) + assert('alice and bob connected') + + bob.sync.start() + await p(setTimeout)(1000) + assert('sync!') + + const n = 100 + const hundred = [] + for (let i = 0; i < n; i++) { + hundred.push(i) + } + await Promise.all(hundred.map(i => p(bob.db.feed.publish)({ + account: bobID, + domain: 'post', + data: { text: `post nr ${i}` }, + }))) + assert('bob published 100 posts in parallel') + + const bobMsgs = await flatten(bob.db.msgs()) + // 1 for creating bob's account, and 1 for the 'post' moot + assert.equal(bobMsgs.length, n + 2, "bob has all of his own messages") + + let arr + // just waiting for them to arrive + for (let i = 0; i < 100; i++) { + arr = (await flatten(alice.db.msgs())) + // moot doesn't have msg.data + .filter((msg) => msg.metadata.account === bobID && msg.data) + .filter(msg => msg.metadata.domain === 'post') + .map((msg) => msg.data.text) + if (arr.length < n) { + await p(setTimeout)(100) + } else { + break + } + } + assert.equal(arr.length, n, `alice has ${arr.length} posts from bob`) await p(remoteAlice.close)(true) await p(alice.close)(true) From 87b6b2568535a84d36c087653b1aed4e764e0485 Mon Sep 17 00:00:00 2001 From: Jacob Karlsson Date: Wed, 29 May 2024 14:56:20 +0200 Subject: [PATCH 4/4] Fix failing realtime test --- lib/stream.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/stream.js b/lib/stream.js index d91075a..6ac728f 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -109,7 +109,7 @@ class SyncStream extends Pipeable { if (!this.sink || this.sink.paused) return const tangleIDs = [msgID].concat(Object.keys(msg.metadata.tangles)) - //this.resume() + this.resume() for (const id of tangleIDs) { if (this.#realtimeSyncing.has(id)) {