From 47eb2dd27fa32f79db9a320e0fff8c5cdc393bdf Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Thu, 29 Feb 2024 11:30:39 +0200 Subject: [PATCH] realtime sync works in both connection directions --- lib/stream.js | 29 ++++++------- test/realtime.test.js | 96 +++++++++++++++++++++++++++++++++++++++---- 2 files changed, 104 insertions(+), 21 deletions(-) diff --git a/lib/stream.js b/lib/stream.js index aaf330c..774e555 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -93,23 +93,10 @@ class SyncStream extends Pipeable { this.#remoteWant = new Map() this.#receivableMsgs = new Map() this.#sendableMsgs = new Map() - } - - initiate() { - for (const goal of this.#goals.list()) { - this.#requested.add(goal.id) - } - this.resume() - - this.#goals.watch((/** @type {any} */ goal) => { - if (!this.#requested.has(goal.id) && goal.type !== 'none') { - this.#requested.add(goal.id) - this.resume() - } - }) // Setup real-time syncing this.#db.onRecordAdded((/** @type {Rec} */ { id: msgID, msg }) => { + if (!this.sink || this.sink.paused) return const tangleIDs = [msgID].concat(Object.keys(msg.metadata.tangles)) for (const id of tangleIDs) { if (this.#realtimeSyncing.has(id)) { @@ -126,6 +113,20 @@ class SyncStream extends Pipeable { }) } + initiate() { + for (const goal of this.#goals.list()) { + this.#requested.add(goal.id) + } + this.resume() + + this.#goals.watch((/** @type {any} */ goal) => { + if (!this.#requested.has(goal.id) && goal.type !== 'none') { + this.#requested.add(goal.id) + this.resume() + } + }) + } + #canSend() { return this.sink && !this.sink.paused && !this.ended } diff --git a/test/realtime.test.js b/test/realtime.test.js index 1ce81bb..3e9eca0 100644 --- a/test/realtime.test.js +++ b/test/realtime.test.js @@ -62,13 +62,16 @@ test('sync feed msgs in realtime after the 9 rounds', async (t) => { }) assert('bob published post 2') - for (let i = 0; i < 100; i++) { - const arr = [...alice.db.msgs()] - .filter((msg) => msg.metadata.account === bobID && msg.data) - .map((msg) => msg.data.text) - if (arr.length < 3) { - await p(setTimeout)(200) - continue + { + let arr + for (let i = 0; i < 100; i++) { + arr = [...alice.db.msgs()] + .filter((msg) => msg.metadata.account === bobID && msg.data) + .map((msg) => msg.data.text) + if (arr.length < 3) { + await p(setTimeout)(200) + continue + } } assert.deepEqual(arr, ['m0', 'm1', 'm2'], 'alice has posts 0..2 from bob') } @@ -77,3 +80,82 @@ test('sync feed msgs in realtime after the 9 rounds', async (t) => { await p(alice.close)(true) await p(bob.close)(true) }) + +test('sync feed msgs in realtime after the 9 rounds, reverse', async (t) => { + const alice = createPeer({ name: 'alice' }) + const bob = createPeer({ name: 'bob' }) + + await alice.db.loaded() + await bob.db.loaded() + + const bobID = await p(bob.db.account.create)({ + subdomain: 'account', + _nonce: 'bob', + }) + + await p(bob.db.feed.publish)({ + account: bobID, + domain: 'post', + data: { text: 'm0' }, + }) + assert('bob published post 0') + + const bobPostsID = bob.db.feed.getID(bobID, 'post') + + { + const arr = [...alice.db.msgs()] + .filter((msg) => msg.metadata.account === bobID && msg.data) + .map((msg) => msg.data.text) + assert.deepEqual(arr, [], 'alice has no posts from bob') + } + + bob.goals.set(bobPostsID, 'all') + alice.goals.set(bobPostsID, 'all') + + const remoteBob = await p(alice.connect)(bob.getAddress()) + assert('bob connected to alice') + + // Reverse direction of who "starts" + alice.sync.start() + await p(setTimeout)(1000) + assert('sync!') + + { + const arr = [...alice.db.msgs()] + .filter((msg) => msg.metadata.account === bobID && msg.data) + .map((msg) => msg.data.text) + assert.deepEqual(arr, ['m0'], 'alice has post 0 from bob') + } + + await p(bob.db.feed.publish)({ + account: bobID, + domain: 'post', + data: { text: 'm1' }, + }) + assert('bob published post 1') + + await p(bob.db.feed.publish)({ + account: bobID, + domain: 'post', + data: { text: 'm2' }, + }) + assert('bob published post 2') + + { + let arr + for (let i = 0; i < 100; i++) { + arr = [...alice.db.msgs()] + .filter((msg) => msg.metadata.account === bobID && msg.data) + .map((msg) => msg.data.text) + if (arr.length < 3) { + await p(setTimeout)(200) + continue + } + } + assert.deepEqual(arr, ['m0', 'm1', 'm2'], 'alice has posts 0..2 from bob') + } + + await p(remoteBob.close)(true) + await p(alice.close)(true) + await p(bob.close)(true) +})