From 636cd9adcc147bc5f291c66f0eec37fa89a7e0b6 Mon Sep 17 00:00:00 2001 From: Jacob Karlsson Date: Wed, 29 May 2024 14:48:40 +0200 Subject: [PATCH] Add failing test where we don't create a message before realtime sync --- README.md | 4 +++ lib/stream.js | 3 ++ test/realtime.test.js | 73 +++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 77 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 90f3d41..d1bc2b2 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,10 @@ PZP replication using Kleppmann's hash graph sync +https://martin.kleppmann.com/2020/12/02/bloom-filter-hash-graph-sync.html + +https://arxiv.org/abs/2012.00472 + ## Installation ``` diff --git a/lib/stream.js b/lib/stream.js index d11aa9b..d91075a 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -108,6 +108,9 @@ class SyncStream extends Pipeable { this.#db.onRecordAdded((/** @type {Rec} */ { id: msgID, msg }) => { if (!this.sink || this.sink.paused) return const tangleIDs = [msgID].concat(Object.keys(msg.metadata.tangles)) + + //this.resume() + for (const id of tangleIDs) { if (this.#realtimeSyncing.has(id)) { if (this.#receivableMsgs.has(msgID)) continue diff --git a/test/realtime.test.js b/test/realtime.test.js index f0b7050..45ee76c 100644 --- a/test/realtime.test.js +++ b/test/realtime.test.js @@ -168,7 +168,7 @@ test('sync feed msgs in realtime after the 9 rounds, reverse', async (t) => { await p(bob.close)(true) }) -test('create 100 messages in parallel that still manage to sync realtime', async (t) => { +test('create 100 messages in parallel that still manage to sync realtime (create an initial post before starting realtime too)', async (t) => { const alice = createPeer({ name: 'alice' }) const bob = createPeer({ name: 'bob' }) @@ -180,7 +180,6 @@ test('create 100 messages in parallel that still manage to sync realtime', async _nonce: 'bob', }) - // TODO: for some reason realtime doesn't work unless we post this initial one before starting replication. fix the bug and remove this, or create an issue to resolve it later await p(bob.db.feed.publish)({ account: bobID, domain: 'post', @@ -237,7 +236,75 @@ test('create 100 messages in parallel that still manage to sync realtime', async break } } - assert.equal(arr.length, n + 1, `alice has ${arr.length + 1} posts from bob`) + assert.equal(arr.length, n + 1, `alice has ${arr.length} posts from bob`) + + await p(remoteAlice.close)(true) + await p(alice.close)(true) + await p(bob.close)(true) +}) + +test('create 100 messages in parallel that still manage to sync realtime (without creating an initial post before starting realtime)', async (t) => { + const alice = createPeer({ name: 'alice' }) + const bob = createPeer({ name: 'bob' }) + + await alice.db.loaded() + await bob.db.loaded() + + const bobID = await p(bob.db.account.create)({ + subdomain: 'account', + _nonce: 'bob', + }) + + const bobPostsID = bob.db.feed.getID(bobID, 'post') + + { + const arr = (await flatten(alice.db.msgs())) + .filter((msg) => msg.metadata.account === bobID && msg.data) + .map((msg) => msg.data.text) + assert.deepEqual(arr, [], 'alice has no posts from bob') + } + + bob.goals.set(bobPostsID, 'all') + alice.goals.set(bobPostsID, 'all') + + const remoteAlice = await p(bob.connect)(alice.getAddress()) + assert('alice and bob connected') + + bob.sync.start() + await p(setTimeout)(1000) + assert('sync!') + + const n = 100 + const hundred = [] + for (let i = 0; i < n; i++) { + hundred.push(i) + } + await Promise.all(hundred.map(i => p(bob.db.feed.publish)({ + account: bobID, + domain: 'post', + data: { text: `post nr ${i}` }, + }))) + assert('bob published 100 posts in parallel') + + const bobMsgs = await flatten(bob.db.msgs()) + // 1 for creating bob's account, and 1 for the 'post' moot + assert.equal(bobMsgs.length, n + 2, "bob has all of his own messages") + + let arr + // just waiting for them to arrive + for (let i = 0; i < 100; i++) { + arr = (await flatten(alice.db.msgs())) + // moot doesn't have msg.data + .filter((msg) => msg.metadata.account === bobID && msg.data) + .filter(msg => msg.metadata.domain === 'post') + .map((msg) => msg.data.text) + if (arr.length < n) { + await p(setTimeout)(100) + } else { + break + } + } + assert.equal(arr.length, n, `alice has ${arr.length} posts from bob`) await p(remoteAlice.close)(true) await p(alice.close)(true)