diff --git a/lib/algorithm.js b/lib/algorithm.js index 62fc7c6..bd3c569 100644 --- a/lib/algorithm.js +++ b/lib/algorithm.js @@ -340,7 +340,7 @@ class Algorithm { * @param {Range} myWantRange * @returns {Array} */ - #filterReceivedMsgs(rootID, msgs, myWantRange) { + filterReceivedMsgs(rootID, msgs, myWantRange) { const [minWant, maxWant] = myWantRange const validNewMsgs = msgs @@ -351,9 +351,9 @@ class Algorithm { return false // the rootMsg is the only acceptable depth-zero msg } if (!msg.data) { - return depth <= maxWant + return true } else { - return minWant <= depth && depth <= maxWant + return minWant <= depth } }) .sort((a, b) => { @@ -369,6 +369,13 @@ class Algorithm { return validNewMsgs } + /** + * @param {Array} msgs + */ + getMsgIDs(msgs) { + return msgs.map((msg) => MsgV4.getMsgID(msg)) + } + /** * Takes the new msgs and adds them to the database. Also performs pruning as * post-processing. @@ -376,15 +383,12 @@ class Algorithm { * @param {string} rootID * @param {Array} newMsgs * @param {Goal} goal - * @param {Range} myWantRange */ - async commit(rootID, newMsgs, goal, myWantRange) { - const validNewMsgs = this.#filterReceivedMsgs(rootID, newMsgs, myWantRange) - + async commit(rootID, newMsgs, goal) { // TODO: Simulate adding this whole tangle, and check if it's valid // Add new messages - for (const msg of validNewMsgs) { + for (const msg of newMsgs) { try { if (msg.metadata.account === 'self') { await p(this.#peer.db.add)(msg, null /* infer tangleID */) diff --git a/lib/index.js b/lib/index.js index 749d21f..bdf842a 100644 --- a/lib/index.js +++ b/lib/index.js @@ -51,7 +51,8 @@ function initSync(peer, config) { function createStream(remoteId, iamClient) { // prettier-ignore debug('Opening a stream with remote %s %s', iamClient ? 'server' : 'client', remoteId) - const stream = new SyncStream(peer.shse.pubkey, debug, peer.goals, algo) + const { shse, db, goals } = peer + const stream = new SyncStream(shse.pubkey, debug, db, goals, algo) streams.push(stream) return stream } diff --git a/lib/stream.js b/lib/stream.js index baf4a61..aaf330c 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -4,6 +4,8 @@ const { isEmptyRange } = require('./range') /** * @typedef {ReturnType} PPPPPGoals + * @typedef {ReturnType} PPPPPDB + * @typedef {import('ppppp-db').RecPresent} Rec * @typedef {import('ppppp-db/msg-v4').Msg} Msg * @typedef {import('./range').Range} Range * @typedef {import('./algorithm')} Algorithm @@ -28,6 +30,8 @@ class SyncStream extends Pipeable { #debug /** @type {Set} Set of tangleId */ #requested + /** @type {PPPPPDB} */ + #db /** @type {PPPPPGoals} */ #goals /** @@ -60,15 +64,17 @@ class SyncStream extends Pipeable { * @type {Map>} */ #sendableMsgs + /** @type {Set} */ + #realtimeSyncing /** - * * @param {string} localId * @param {CallableFunction} debug + * @param {PPPPPDB} db * @param {PPPPPGoals} goals * @param {Algorithm} algo */ - constructor(localId, debug, goals, algo) { + constructor(localId, debug, db, goals, algo) { super() this.paused = false // TODO: should we start as paused=true? this.ended = false @@ -76,9 +82,11 @@ class SyncStream extends Pipeable { this.source = this.sink = null this.#myId = localId.slice(0, 6) this.#debug = debug + this.#db = db this.#goals = goals this.#algo = algo this.#requested = new Set() + this.#realtimeSyncing = new Set() this.#localHave = new Map() this.#localWant = new Map() this.#remoteHave = new Map() @@ -99,6 +107,23 @@ class SyncStream extends Pipeable { this.resume() } }) + + // Setup real-time syncing + this.#db.onRecordAdded((/** @type {Rec} */ { id: msgID, msg }) => { + const tangleIDs = [msgID].concat(Object.keys(msg.metadata.tangles)) + for (const id of tangleIDs) { + if (this.#realtimeSyncing.has(id)) { + if (this.#receivableMsgs.has(msgID)) continue + if (this.#receivableMsgs.get(id)?.has(msgID)) continue + if (this.#sendableMsgs.has(msgID)) continue + if (this.#sendableMsgs.get(id)?.has(msgID)) continue + this.sink.write({ id, phase: 9, payload: [msg] }) + // prettier-ignore + this.#debug('%s Stream OUTr: sent msg %s in %s', this.#myId, msgID, id) + return + } + } + }) } #canSend() { @@ -135,9 +160,9 @@ class SyncStream extends Pipeable { #sendLocalHave(id) { const localHaveRange = this.#algo.haveRange(id) this.#localHave.set(id, localHaveRange) - // prettier-ignore - this.#debug('%s Stream OUT1: send local have-range %o for %s', this.#myId, localHaveRange, id) this.sink.write({ id, phase: 1, payload: localHaveRange }) + // prettier-ignore + this.#debug('%s Stream OUT1: sent local have-range %o for %s', this.#myId, localHaveRange, id) } /** @@ -153,9 +178,9 @@ class SyncStream extends Pipeable { const wantRange = this.#algo.wantRange(haveRange, remoteHaveRange, goal) this.#localHave.set(id, haveRange) this.#localWant.set(id, wantRange) - // prettier-ignore - this.#debug('%s Stream OUT2: send local have-range %o and want-range %o for %s', this.#myId, haveRange, wantRange, id) this.sink.write({ id, phase: 2, payload: { haveRange, wantRange } }) + // prettier-ignore + this.#debug('%s Stream OUT2: sent local have-range %o and want-range %o for %s', this.#myId, haveRange, wantRange, id) } /** @@ -180,7 +205,7 @@ class SyncStream extends Pipeable { payload: { bloom: localBloom0, wantRange: localWant }, }) // prettier-ignore - this.#debug('%s Stream OUT3: send local want-range %o and bloom round 0 for %s', this.#myId, localWant, id) + this.#debug('%s Stream OUT3: sent local want-range %o and bloom round 0 for %s', this.#myId, localWant, id) } /** @@ -208,7 +233,7 @@ class SyncStream extends Pipeable { payload: { bloom: localBloom, msgIDs: msgIDsForThem }, }) // prettier-ignore - this.#debug('%s Stream OUT4: send bloom round 0 plus msgIDs in %s: %o', this.#myId, id, msgIDsForThem) + this.#debug('%s Stream OUT4: sent bloom round 0 plus msgIDs in %s: %o', this.#myId, id, msgIDsForThem) } /** @@ -241,7 +266,7 @@ class SyncStream extends Pipeable { payload: { bloom: localBloom, msgIDs: msgIDsForThem }, }) // prettier-ignore - this.#debug('%s Stream OUT%s: send bloom round %s plus msgIDs in %s: %o', this.#myId, phase, round, id, msgIDsForThem) + this.#debug('%s Stream OUT%s: sent bloom round %s plus msgIDs in %s: %o', this.#myId, phase, round, id, msgIDsForThem) } /** @@ -274,7 +299,7 @@ class SyncStream extends Pipeable { payload: { bloom: localBloom, msgIDs: msgIDsForThem }, }) // prettier-ignore - this.#debug('%s Stream OUT%s: send bloom round %s plus msgIDs in %s: %o', this.#myId, phase, round, id, msgIDsForThem) + this.#debug('%s Stream OUT%s: sent bloom round %s plus msgIDs in %s: %o', this.#myId, phase, round, id, msgIDsForThem) } /** @@ -310,7 +335,10 @@ class SyncStream extends Pipeable { payload: { msgs, bloom: localBloom }, }) // prettier-ignore - this.#debug('%s Stream OUT8: send bloom round %s plus %s msgs in %s', this.#myId, round, msgs.length, id) + this.#debug('%s Stream OUT8: sent bloom round %s plus %s msgs in %s', this.#myId, round, msgs.length, id) + if (!this.#realtimeSyncing.has(id) && !isEmptyRange(remoteWantRange)) { + this.#realtimeSyncing.add(id) + } } /** @@ -335,28 +363,13 @@ class SyncStream extends Pipeable { const tangleMsgs = this.#algo.getTangleMsgs(id, msgIDs) const accountMsgs = this.#algo.filterAndFetchAccountMsgs(msgIDs) const msgs = accountMsgs.concat(tangleMsgs) - // prettier-ignore - this.#debug('%s Stream OUT9: send %s msgs in %s', this.#myId, msgs.length, id) this.sink.write({ id, phase: 9, payload: msgs }) - - const goal = this.#goals.get(id) - if (!goal) throw new Error(`No goal found for "${id}"`) - const localWantRange = this.#localWant.get(id) - if (!localWantRange) throw new Error(`Local want-range not set for ${id}`) - this.#requested.delete(id) - this.#localHave.delete(id) - this.#localWant.delete(id) - this.#remoteHave.delete(id) - this.#remoteWant.delete(id) - this.#receivableMsgs.delete(id) - this.#sendableMsgs.delete(id) - if (msgsForMe.length === 0) return - try { - this.#algo.commit(id, msgsForMe, goal, localWantRange) - } catch (err) { - // prettier-ignore - this.#debug('%s Stream could not commit received messages, because: %s', this.#myId, err) + // prettier-ignore + this.#debug('%s Stream OUT9: sent %s msgs in %s', this.#myId, msgs.length, id) + if (!this.#realtimeSyncing.has(id) && !isEmptyRange(remoteWantRange)) { + this.#realtimeSyncing.add(id) } + this.#consumeMissingMsgs(id, msgsForMe) } /** @@ -365,24 +378,34 @@ class SyncStream extends Pipeable { * @returns */ #consumeMissingMsgs(id, msgsForMe) { - // prettier-ignore - this.#debug('%s Stream IN9: got %s msgs in %s', this.#myId, msgsForMe.length, id) - const localWantRange = this.#localWant.get(id) - this.#requested.delete(id) this.#localHave.delete(id) - this.#localWant.delete(id) this.#remoteHave.delete(id) this.#remoteWant.delete(id) - this.#receivableMsgs.delete(id) - this.#sendableMsgs.delete(id) if (msgsForMe.length === 0) return const goal = this.#goals.get(id) - if (!goal) throw new Error(`No goal found for "${id}"`) - if (!localWantRange) throw new Error(`Local want-range not set for "${id}"`) + if (!goal) { + this.#debug('%s Stream exception: no goal found for %s', this.#myId, id) + return + } + const localWantRange = this.#localWant.get(id) + if (!localWantRange) { + // prettier-ignore + this.#debug('%s Stream exception: local want-range not set for %s', this.#myId, id) + return + } + + const validMsgs = this.#algo.filterReceivedMsgs( + id, + msgsForMe, + localWantRange + ) + const validMsgIDs = this.#algo.getMsgIDs(validMsgs) + this.#updateReceivableMsgs(id, validMsgIDs) + try { - this.#algo.commit(id, msgsForMe, goal, localWantRange) + this.#algo.commit(id, validMsgs, goal) } catch (err) { // prettier-ignore this.#debug('%s Stream could not commit received messages, because: %s', this.#myId, err) @@ -400,9 +423,14 @@ class SyncStream extends Pipeable { const accountMsgs = this.#algo.getAccountMsgsFor(tangleMsgs) for (const msg of accountMsgs) msgs.push(msg) for (const msg of tangleMsgs) msgs.push(msg) - // prettier-ignore - this.#debug('%s Stream OUT9: send %s msgs in %s', this.#myId, msgs.length, id) + const msgIDs = this.#algo.getMsgIDs(msgs) + this.#updateSendableMsgs(id, msgIDs) this.sink.write({ id, phase: 9, payload: msgs }) + // prettier-ignore + this.#debug('%s Stream OUT9: sent %s msgs in %s', this.#myId, msgs.length, id) + if (!this.#realtimeSyncing.has(id) && !isEmptyRange(remoteWantRange)) { + this.#realtimeSyncing.add(id) + } } // source method @@ -422,6 +450,7 @@ class SyncStream extends Pipeable { write(data) { const { id, phase, payload } = data + // TODO: validate that each data objects has the exact correct shape switch (phase) { case 0: { return this.#sendLocalHave(id) @@ -471,6 +500,8 @@ class SyncStream extends Pipeable { return this.#sendMissingMsgsRes(id, 2, bloom, msgs) } case 9: { + // prettier-ignore + this.#debug('%s Stream IN9: got %s msgs in %s', this.#myId, payload.length, id) return this.#consumeMissingMsgs(id, payload) } } diff --git a/test/realtime.test.js b/test/realtime.test.js new file mode 100644 index 0000000..1ce81bb --- /dev/null +++ b/test/realtime.test.js @@ -0,0 +1,79 @@ +const test = require('node:test') +const assert = require('node:assert') +const p = require('node:util').promisify +const { createPeer } = require('./util') + +test('sync feed msgs in realtime after the 9 rounds', async (t) => { + const alice = createPeer({ name: 'alice' }) + const bob = createPeer({ name: 'bob' }) + + await alice.db.loaded() + await bob.db.loaded() + + const bobID = await p(bob.db.account.create)({ + subdomain: 'account', + _nonce: 'bob', + }) + + await p(bob.db.feed.publish)({ + account: bobID, + domain: 'post', + data: { text: 'm0' }, + }) + assert('bob published post 0') + + const bobPostsID = bob.db.feed.getID(bobID, 'post') + + { + const arr = [...alice.db.msgs()] + .filter((msg) => msg.metadata.account === bobID && msg.data) + .map((msg) => msg.data.text) + assert.deepEqual(arr, [], 'alice has no posts from bob') + } + + bob.goals.set(bobPostsID, 'all') + alice.goals.set(bobPostsID, 'all') + + const remoteAlice = await p(bob.connect)(alice.getAddress()) + assert('bob connected to alice') + + bob.sync.start() + await p(setTimeout)(1000) + assert('sync!') + + { + const arr = [...alice.db.msgs()] + .filter((msg) => msg.metadata.account === bobID && msg.data) + .map((msg) => msg.data.text) + assert.deepEqual(arr, ['m0'], 'alice has post 0 from bob') + } + + await p(bob.db.feed.publish)({ + account: bobID, + domain: 'post', + data: { text: 'm1' }, + }) + assert('bob published post 1') + + await p(bob.db.feed.publish)({ + account: bobID, + domain: 'post', + data: { text: 'm2' }, + }) + assert('bob published post 2') + + for (let i = 0; i < 100; i++) { + const arr = [...alice.db.msgs()] + .filter((msg) => msg.metadata.account === bobID && msg.data) + .map((msg) => msg.data.text) + if (arr.length < 3) { + await p(setTimeout)(200) + continue + } + assert.deepEqual(arr, ['m0', 'm1', 'm2'], 'alice has posts 0..2 from bob') + } + + await p(remoteAlice.close)(true) + await p(alice.close)(true) + await p(bob.close)(true) +})