// @ts-ignore const Pipeable = require('push-stream/pipeable') const { isEmptyRange } = require('./range') /** * @typedef {ReturnType} PPPPPGoals * @typedef {ReturnType} PPPPPDB * @typedef {import('ppppp-db').RecPresent} Rec * @typedef {import('ppppp-db/msg-v4').Msg} Msg * @typedef {import('./range').Range} Range * @typedef {import('./algorithm')} Algorithm * @typedef {import('ppppp-goals').Goal} Goal * @typedef {string} MsgID * @typedef {{id: string}} WithId * @typedef {WithId & {phase: 0, payload?: undefined}} Data0 * @typedef {WithId & {phase: 1, payload: Range}} Data1 * @typedef {WithId & {phase: 2, payload: { haveRange: Range, wantRange: Range }}} Data2 * @typedef {WithId & {phase: 3, payload: { wantRange: Range, bloom: JSON }}} Data3 * @typedef {WithId & {phase: 4 | 5 | 6 | 7, payload: { msgIDs: Array, bloom: JSON }}} Data4567 * @typedef {WithId & {phase: 8, payload: { msgs: Array, bloom: JSON }}} Data8 * @typedef {WithId & {phase: 9, payload: Array}} Data9 * @typedef {Data0 | Data1 | Data2 | Data3 | Data4567 | Data8 | Data9} Data */ class SyncStream extends Pipeable { #myId /** @type {Algorithm} */ #algo /** @type {CallableFunction} */ #debug /** @type {Set} Set of tangleId */ #requested /** @type {PPPPPDB} */ #db /** @type {PPPPPGoals} */ #goals /** * tangleId => have-range by local peer * @type {Map} */ #localHave /** * tangleId => want-range by local peer * @type {Map} */ #localWant /** * tangleId => have-range by remote peer * @type {Map} */ #remoteHave /** * tangleId => want-range by remote peer * @type {Map} */ #remoteWant /** * tangleId => Set of msgIDs * @type {Map>} */ #receivableMsgs /** * tangleId => Set of msgIDs * @type {Map>} */ #sendableMsgs /** @type {Set} */ #realtimeSyncing /** * @param {string} localId * @param {CallableFunction} debug * @param {PPPPPDB} db * @param {PPPPPGoals} goals * @param {Algorithm} algo */ constructor(localId, debug, db, goals, algo) { super() this.paused = false // TODO: should we start as paused=true? this.ended = false /** @type {any} */ this.source = this.sink = null this.#myId = localId.slice(0, 6) this.#debug = debug this.#db = db this.#goals = goals this.#algo = algo this.#requested = new Set() this.#realtimeSyncing = new Set() this.#localHave = new Map() this.#localWant = new Map() this.#remoteHave = new Map() this.#remoteWant = new Map() this.#receivableMsgs = new Map() this.#sendableMsgs = new Map() } initiate() { for (const goal of this.#goals.list()) { this.#requested.add(goal.id) } this.resume() this.#goals.watch((/** @type {any} */ goal) => { if (!this.#requested.has(goal.id) && goal.type !== 'none') { this.#requested.add(goal.id) this.resume() } }) // Setup real-time syncing this.#db.onRecordAdded((/** @type {Rec} */ { id: msgID, msg }) => { const tangleIDs = [msgID].concat(Object.keys(msg.metadata.tangles)) for (const id of tangleIDs) { if (this.#realtimeSyncing.has(id)) { if (this.#receivableMsgs.has(msgID)) continue if (this.#receivableMsgs.get(id)?.has(msgID)) continue if (this.#sendableMsgs.has(msgID)) continue if (this.#sendableMsgs.get(id)?.has(msgID)) continue this.sink.write({ id, phase: 9, payload: [msg] }) // prettier-ignore this.#debug('%s Stream OUTr: sent msg %s in %s', this.#myId, msgID, id) return } } }) } #canSend() { return this.sink && !this.sink.paused && !this.ended } /** * @param {string} id * @param {Array} msgIDs */ #updateSendableMsgs(id, msgIDs) { const set = this.#sendableMsgs.get(id) ?? new Set() for (const msgID of msgIDs) { set.add(msgID) } this.#sendableMsgs.set(id, set) } /** * @param {string} id * @param {Array} msgIDs */ #updateReceivableMsgs(id, msgIDs) { const set = this.#receivableMsgs.get(id) ?? new Set() for (const msgID of msgIDs) { set.add(msgID) } this.#receivableMsgs.set(id, set) } /** * @param {string} id */ #sendLocalHave(id) { const localHaveRange = this.#algo.haveRange(id) this.#localHave.set(id, localHaveRange) this.sink.write({ id, phase: 1, payload: localHaveRange }) // prettier-ignore this.#debug('%s Stream OUT1: sent local have-range %o for %s', this.#myId, localHaveRange, id) } /** * @param {string} id * @param {Range} remoteHaveRange */ #sendLocalHaveAndWant(id, remoteHaveRange) { // prettier-ignore this.#debug('%s Stream IN1: got remote have-range %o for %s', this.#myId, remoteHaveRange, id) this.#remoteHave.set(id, remoteHaveRange) const goal = this.#goals.get(id) const haveRange = this.#algo.haveRange(id) const wantRange = this.#algo.wantRange(haveRange, remoteHaveRange, goal) this.#localHave.set(id, haveRange) this.#localWant.set(id, wantRange) this.sink.write({ id, phase: 2, payload: { haveRange, wantRange } }) // prettier-ignore this.#debug('%s Stream OUT2: sent local have-range %o and want-range %o for %s', this.#myId, haveRange, wantRange, id) } /** * @param {string} id * @param {Range} remoteHaveRange * @param {Range} remoteWantRange */ #sendLocalWantAndInitBloom(id, remoteHaveRange, remoteWantRange) { // prettier-ignore this.#debug('%s Stream IN2: got remote have-range %o and want-range %o for %s', this.#myId, remoteHaveRange, remoteWantRange, id) this.#remoteHave.set(id, remoteHaveRange) this.#remoteWant.set(id, remoteWantRange) const goal = this.#goals.get(id) const haveRange = this.#localHave.get(id) if (!haveRange) throw new Error(`Local have-range not set for ${id}`) const localWant = this.#algo.wantRange(haveRange, remoteHaveRange, goal) this.#localWant.set(id, localWant) const localBloom0 = this.#algo.bloomFor(id, 0, localWant) this.sink.write({ id, phase: 3, payload: { bloom: localBloom0, wantRange: localWant }, }) // prettier-ignore this.#debug('%s Stream OUT3: sent local want-range %o and bloom round 0 for %s', this.#myId, localWant, id) } /** * @param {string} id * @param {Range} remoteWantRange * @param {JSON} remoteBloom */ #sendInitBloomRes(id, remoteWantRange, remoteBloom) { // prettier-ignore this.#debug('%s Stream IN3: got remote want-range %o and bloom round 0 for %s', this.#myId, remoteWantRange, id) this.#remoteWant.set(id, remoteWantRange) const msgIDsForThem = this.#algo.getMsgsMissing( id, 0, remoteWantRange, remoteBloom // representation of everything they have for me ) this.#updateSendableMsgs(id, msgIDsForThem) const localWantRange = this.#localWant.get(id) if (!localWantRange) throw new Error(`Local want-range not set for ${id}`) const localBloom = this.#algo.bloomFor(id, 0, localWantRange) this.sink.write({ id, phase: 4, payload: { bloom: localBloom, msgIDs: msgIDsForThem }, }) // prettier-ignore this.#debug('%s Stream OUT4: sent bloom round 0 plus msgIDs in %s: %o', this.#myId, id, msgIDsForThem) } /** * @param {string} id * @param {number} phase * @param {number} round * @param {JSON} remoteBloom * @param {Array} msgIDsForMe */ #sendBloomReq(id, phase, round, remoteBloom, msgIDsForMe) { // prettier-ignore this.#debug('%s Stream IN%s: got bloom round %s plus msgIDs in %s: %o', this.#myId, phase-1, round-1, id, msgIDsForMe) const remoteWantRange = this.#remoteWant.get(id) if (!remoteWantRange) throw new Error(`Remote want-range not set for ${id}`) this.#updateReceivableMsgs(id, msgIDsForMe) const msgIDsForThem = this.#algo.getMsgsMissing( id, round - 1, remoteWantRange, remoteBloom ) this.#updateSendableMsgs(id, msgIDsForThem) const extras = this.#receivableMsgs.get(id) const localWantRange = this.#localWant.get(id) if (!localWantRange) throw new Error(`Local want-range not set for ${id}`) const localBloom = this.#algo.bloomFor(id, round, localWantRange, extras) this.sink.write({ id, phase, payload: { bloom: localBloom, msgIDs: msgIDsForThem }, }) // prettier-ignore this.#debug('%s Stream OUT%s: sent bloom round %s plus msgIDs in %s: %o', this.#myId, phase, round, id, msgIDsForThem) } /** * @param {string} id * @param {number} phase * @param {number} round * @param {JSON} remoteBloom * @param {Array} msgIDsForMe */ #sendBloomRes(id, phase, round, remoteBloom, msgIDsForMe) { // prettier-ignore this.#debug('%s Stream IN%s: got bloom round %s plus msgIDs in %s: %o', this.#myId, phase-1, round, id, msgIDsForMe) const remoteWantRange = this.#remoteWant.get(id) if (!remoteWantRange) throw new Error(`Remote want-range not set for ${id}`) this.#updateReceivableMsgs(id, msgIDsForMe) const msgIDsForThem = this.#algo.getMsgsMissing( id, round, remoteWantRange, remoteBloom ) this.#updateSendableMsgs(id, msgIDsForThem) const extras = this.#receivableMsgs.get(id) const localWantRange = this.#localWant.get(id) if (!localWantRange) throw new Error(`Local want-range not set for ${id}`) const localBloom = this.#algo.bloomFor(id, round, localWantRange, extras) this.sink.write({ id, phase, payload: { bloom: localBloom, msgIDs: msgIDsForThem }, }) // prettier-ignore this.#debug('%s Stream OUT%s: sent bloom round %s plus msgIDs in %s: %o', this.#myId, phase, round, id, msgIDsForThem) } /** * @param {string} id * @param {number} round * @param {JSON} remoteBloom * @param {Array} msgIDsForMe */ #sendMissingMsgsReq(id, round, remoteBloom, msgIDsForMe) { // prettier-ignore this.#debug('%s Stream IN7: got bloom round %s plus msgIDs in %s: %o', this.#myId, round, id, msgIDsForMe) const remoteWantRange = this.#remoteWant.get(id) if (!remoteWantRange) throw new Error(`Remote want-range not set for ${id}`) this.#updateReceivableMsgs(id, msgIDsForMe) const msgIDsForThem = this.#algo.getMsgsMissing( id, round, remoteWantRange, remoteBloom ) this.#updateSendableMsgs(id, msgIDsForThem) const msgIDs = this.#sendableMsgs.get(id) ?? new Set() const tangleMsgs = this.#algo.getTangleMsgs(id, msgIDs) const accountMsgs = this.#algo.filterAndFetchAccountMsgs(msgIDs) const msgs = accountMsgs.concat(tangleMsgs) const extras = this.#receivableMsgs.get(id) const localWantRange = this.#localWant.get(id) if (!localWantRange) throw new Error(`Local want-range not set for ${id}`) const localBloom = this.#algo.bloomFor(id, round, localWantRange, extras) this.sink.write({ id, phase: 8, payload: { msgs, bloom: localBloom }, }) // prettier-ignore this.#debug('%s Stream OUT8: sent bloom round %s plus %s msgs in %s', this.#myId, round, msgs.length, id) if (!this.#realtimeSyncing.has(id) && !isEmptyRange(remoteWantRange)) { this.#realtimeSyncing.add(id) } } /** * @param {string} id * @param {number} round * @param {JSON} remoteBloom * @param {Array} msgsForMe */ #sendMissingMsgsRes(id, round, remoteBloom, msgsForMe) { // prettier-ignore this.#debug('%s Stream IN8: got bloom round %s plus %s msgs in %s', this.#myId, round, msgsForMe.length, id) const remoteWantRange = this.#remoteWant.get(id) if (!remoteWantRange) throw new Error(`Remote want-range not set for ${id}`) const msgIDsForThem = this.#algo.getMsgsMissing( id, round, remoteWantRange, remoteBloom ) this.#updateSendableMsgs(id, msgIDsForThem) const msgIDs = this.#sendableMsgs.get(id) ?? new Set() const tangleMsgs = this.#algo.getTangleMsgs(id, msgIDs) const accountMsgs = this.#algo.filterAndFetchAccountMsgs(msgIDs) const msgs = accountMsgs.concat(tangleMsgs) this.sink.write({ id, phase: 9, payload: msgs }) // prettier-ignore this.#debug('%s Stream OUT9: sent %s msgs in %s', this.#myId, msgs.length, id) if (!this.#realtimeSyncing.has(id) && !isEmptyRange(remoteWantRange)) { this.#realtimeSyncing.add(id) } this.#consumeMissingMsgs(id, msgsForMe) } /** * @param {string} id * @param {Array} msgsForMe * @returns */ #consumeMissingMsgs(id, msgsForMe) { this.#requested.delete(id) this.#localHave.delete(id) this.#remoteHave.delete(id) this.#remoteWant.delete(id) if (msgsForMe.length === 0) return const goal = this.#goals.get(id) if (!goal) { this.#debug('%s Stream exception: no goal found for %s', this.#myId, id) return } const localWantRange = this.#localWant.get(id) if (!localWantRange) { // prettier-ignore this.#debug('%s Stream exception: local want-range not set for %s', this.#myId, id) return } const validMsgs = this.#algo.filterReceivedMsgs( id, msgsForMe, localWantRange ) const validMsgIDs = this.#algo.getMsgIDs(validMsgs) this.#updateReceivableMsgs(id, validMsgIDs) try { this.#algo.commit(id, validMsgs, goal) } catch (err) { // prettier-ignore this.#debug('%s Stream could not commit received messages, because: %s', this.#myId, err) } } /** * @param {string} id * @param {Range} remoteWantRange */ #sendMsgsInRemoteWant(id, remoteWantRange) { const msgs = [] const rangeMsgs = this.#algo.getMsgsInRange(id, remoteWantRange) const tangleMsgs = this.#algo.getTangleMsgs(id, rangeMsgs) const accountMsgs = this.#algo.getAccountMsgsFor(tangleMsgs) for (const msg of accountMsgs) msgs.push(msg) for (const msg of tangleMsgs) msgs.push(msg) const msgIDs = this.#algo.getMsgIDs(msgs) this.#updateSendableMsgs(id, msgIDs) this.sink.write({ id, phase: 9, payload: msgs }) // prettier-ignore this.#debug('%s Stream OUT9: sent %s msgs in %s', this.#myId, msgs.length, id) if (!this.#realtimeSyncing.has(id) && !isEmptyRange(remoteWantRange)) { this.#realtimeSyncing.add(id) } } // source method resume() { if (!this.sink || this.sink.paused) return for (const id of this.#requested) { if (!this.#canSend()) return this.write({ id, phase: 0 }) } } /** * sink method * @param {Data} data */ write(data) { const { id, phase, payload } = data // TODO: validate that each data objects has the exact correct shape switch (phase) { case 0: { return this.#sendLocalHave(id) } case 1: { return this.#sendLocalHaveAndWant(id, payload) } case 2: { const { haveRange, wantRange } = payload if (isEmptyRange(haveRange)) { // prettier-ignore this.#debug('%s Stream IN2: received remote have-range %o and want-range %o for %s', this.#myId, haveRange, wantRange, id) return this.#sendMsgsInRemoteWant(id, wantRange) } else { return this.#sendLocalWantAndInitBloom(id, haveRange, wantRange) } } case 3: { const { wantRange, bloom } = payload const haveRange = this.#remoteHave.get(id) if (haveRange && isEmptyRange(haveRange)) { // prettier-ignore this.#debug('%s Stream IN3: received remote want-range %o and remembers empty have-range %o for %s', this.#myId, wantRange, haveRange, id) return this.#sendMsgsInRemoteWant(id, wantRange) } else { return this.#sendInitBloomRes(id, wantRange, bloom) } } case 4: { const { bloom, msgIDs } = payload return this.#sendBloomReq(id, phase + 1, 1, bloom, msgIDs) } case 5: { const { bloom, msgIDs } = payload return this.#sendBloomRes(id, phase + 1, 1, bloom, msgIDs) } case 6: { const { bloom, msgIDs } = payload return this.#sendBloomReq(id, phase + 1, 2, bloom, msgIDs) } case 7: { const { bloom, msgIDs } = payload return this.#sendMissingMsgsReq(id, 2, bloom, msgIDs) } case 8: { const { bloom, msgs } = payload return this.#sendMissingMsgsRes(id, 2, bloom, msgs) } case 9: { // prettier-ignore this.#debug('%s Stream IN9: got %s msgs in %s', this.#myId, payload.length, id) return this.#consumeMissingMsgs(id, payload) } } this.#debug('Stream IN: unknown %o', data) } /** * source method * @param {Error} err */ abort(err) { this.ended = true if (this.source && !this.source.ended) this.source.abort(err) if (this.sink && !this.sink.ended) this.sink.end(err) } /** * sink method * @param {Error} err */ end(err) { this.ended = true if (this.source && !this.source.ended) this.source.abort(err) if (this.sink && !this.sink.ended) this.sink.end(err) } } module.exports = SyncStream