diff --git a/README.md b/README.md index 90f3d41..d1bc2b2 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,10 @@ PZP replication using Kleppmann's hash graph sync +https://martin.kleppmann.com/2020/12/02/bloom-filter-hash-graph-sync.html + +https://arxiv.org/abs/2012.00472 + ## Installation ``` diff --git a/lib/stream.js b/lib/stream.js index d11aa9b..6ac728f 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -108,6 +108,9 @@ class SyncStream extends Pipeable { this.#db.onRecordAdded((/** @type {Rec} */ { id: msgID, msg }) => { if (!this.sink || this.sink.paused) return const tangleIDs = [msgID].concat(Object.keys(msg.metadata.tangles)) + + this.resume() + for (const id of tangleIDs) { if (this.#realtimeSyncing.has(id)) { if (this.#receivableMsgs.has(msgID)) continue diff --git a/test/realtime.test.js b/test/realtime.test.js index 3dc3b12..45ee76c 100644 --- a/test/realtime.test.js +++ b/test/realtime.test.js @@ -167,3 +167,146 @@ test('sync feed msgs in realtime after the 9 rounds, reverse', async (t) => { await p(alice.close)(true) await p(bob.close)(true) }) + +test('create 100 messages in parallel that still manage to sync realtime (create an initial post before starting realtime too)', async (t) => { + const alice = createPeer({ name: 'alice' }) + const bob = createPeer({ name: 'bob' }) + + await alice.db.loaded() + await bob.db.loaded() + + const bobID = await p(bob.db.account.create)({ + subdomain: 'account', + _nonce: 'bob', + }) + + await p(bob.db.feed.publish)({ + account: bobID, + domain: 'post', + data: { text: 'm0' }, + }) + assert('bob published post 0') + + const bobPostsID = bob.db.feed.getID(bobID, 'post') + + { + const arr = (await flatten(alice.db.msgs())) + .filter((msg) => msg.metadata.account === bobID && msg.data) + .map((msg) => msg.data.text) + assert.deepEqual(arr, [], 'alice has no posts from bob') + } + + bob.goals.set(bobPostsID, 'all') + alice.goals.set(bobPostsID, 'all') + + const remoteAlice = await p(bob.connect)(alice.getAddress()) + assert('alice and bob connected') + + bob.sync.start() + await p(setTimeout)(1000) + assert('sync!') + + const n = 100 + const hundred = [] + for (let i = 0; i < n; i++) { + hundred.push(i) + } + await Promise.all(hundred.map(i => p(bob.db.feed.publish)({ + account: bobID, + domain: 'post', + data: { text: `post nr ${i}` }, + }))) + assert('bob published 100 posts in parallel') + + const bobMsgs = await flatten(bob.db.msgs()) + // 1 for creating bob's account, 1 for the 'post' moot, and 1 for first post + assert.equal(bobMsgs.length, n + 3, "bob has all of his own messages") + + let arr + // just waiting for them to arrive + for (let i = 0; i < 100; i++) { + arr = (await flatten(alice.db.msgs())) + // moot doesn't have msg.data + .filter((msg) => msg.metadata.account === bobID && msg.data) + .filter(msg => msg.metadata.domain === 'post') + .map((msg) => msg.data.text) + if (arr.length < n) { + await p(setTimeout)(100) + } else { + break + } + } + assert.equal(arr.length, n + 1, `alice has ${arr.length} posts from bob`) + + await p(remoteAlice.close)(true) + await p(alice.close)(true) + await p(bob.close)(true) +}) + +test('create 100 messages in parallel that still manage to sync realtime (without creating an initial post before starting realtime)', async (t) => { + const alice = createPeer({ name: 'alice' }) + const bob = createPeer({ name: 'bob' }) + + await alice.db.loaded() + await bob.db.loaded() + + const bobID = await p(bob.db.account.create)({ + subdomain: 'account', + _nonce: 'bob', + }) + + const bobPostsID = bob.db.feed.getID(bobID, 'post') + + { + const arr = (await flatten(alice.db.msgs())) + .filter((msg) => msg.metadata.account === bobID && msg.data) + .map((msg) => msg.data.text) + assert.deepEqual(arr, [], 'alice has no posts from bob') + } + + bob.goals.set(bobPostsID, 'all') + alice.goals.set(bobPostsID, 'all') + + const remoteAlice = await p(bob.connect)(alice.getAddress()) + assert('alice and bob connected') + + bob.sync.start() + await p(setTimeout)(1000) + assert('sync!') + + const n = 100 + const hundred = [] + for (let i = 0; i < n; i++) { + hundred.push(i) + } + await Promise.all(hundred.map(i => p(bob.db.feed.publish)({ + account: bobID, + domain: 'post', + data: { text: `post nr ${i}` }, + }))) + assert('bob published 100 posts in parallel') + + const bobMsgs = await flatten(bob.db.msgs()) + // 1 for creating bob's account, and 1 for the 'post' moot + assert.equal(bobMsgs.length, n + 2, "bob has all of his own messages") + + let arr + // just waiting for them to arrive + for (let i = 0; i < 100; i++) { + arr = (await flatten(alice.db.msgs())) + // moot doesn't have msg.data + .filter((msg) => msg.metadata.account === bobID && msg.data) + .filter(msg => msg.metadata.domain === 'post') + .map((msg) => msg.data.text) + if (arr.length < n) { + await p(setTimeout)(100) + } else { + break + } + } + assert.equal(arr.length, n, `alice has ${arr.length} posts from bob`) + + await p(remoteAlice.close)(true) + await p(alice.close)(true) + await p(bob.close)(true) +}) \ No newline at end of file