const Pipeable = require('push-stream/pipeable') class SyncStream extends Pipeable { #myId #debug #algo #requested #remoteHave #remoteWant #receivableMsgs #sendableMsgs constructor(localId, debug, algo) { super() this.paused = false // TODO: should we start as paused=true? this.ended = false this.source = this.sink = null this.#myId = localId.slice(0, 6) this.#debug = debug this.#algo = algo this.#requested = new Set() this.#remoteHave = new Map() // id => have-range by remote peer this.#remoteWant = new Map() // id => want-range by remote peer this.#receivableMsgs = new Map() // id => Set this.#sendableMsgs = new Map() // id => Set } // public API request(id) { this.#requested.add(id) this.resume() } #canSend() { return this.sink && !this.sink.paused && !this.ended } #updateSendableMsgs(id, msgs) { const set = this.#sendableMsgs.get(id) ?? new Set() for (const msg of msgs) { set.add(msg) } this.#sendableMsgs.set(id, set) } #updateReceivableMsgs(id, msgs) { const set = this.#receivableMsgs.get(id) ?? new Set() for (const msg of msgs) { set.add(msg) } this.#receivableMsgs.set(id, set) } #sendLocalHave(id) { const localHaveRange = this.#algo.haveRange(id) // prettier-ignore this.#debug('%s Stream OUT: send local have-range %o for %s', this.#myId, localHaveRange, id) this.sink.write({ id, phase: 1, payload: localHaveRange }) } #sendLocalHaveAndWant(id, remoteHaveRange) { // prettier-ignore this.#debug('%s Stream IN: received remote have-range %o for %s', this.#myId, remoteHaveRange, id) this.#remoteHave.set(id, remoteHaveRange) const haveRange = this.#algo.haveRange(id) const wantRange = this.#algo.wantRange(id, haveRange, remoteHaveRange) // prettier-ignore this.#debug('%s Stream OUT: send local have-range %o and want-range %o for %s', this.#myId, haveRange, wantRange, id) this.sink.write({ id, phase: 2, payload: { haveRange, wantRange } }) } #sendLocalWantAndInitBloom(id, remoteHaveRange, remoteWantRange) { // prettier-ignore this.#debug('%s Stream IN: received remote have-range %o and want-range %o for %s', this.#myId, remoteHaveRange, remoteWantRange, id) this.#remoteHave.set(id, remoteHaveRange) this.#remoteWant.set(id, remoteWantRange) const haveRange = this.#algo.haveRange(id) const wantRange = this.#algo.wantRange(id, haveRange, remoteHaveRange) const localBloom0 = this.#algo.bloomFor(id, 0, remoteWantRange) this.sink.write({ id, phase: 3, payload: { bloom: localBloom0, wantRange }, }) // prettier-ignore this.#debug('%s Stream OUT: send local want-range %o and bloom round 0 for %s', this.#myId, wantRange, id) } #sendInitBloomRes(id, remoteWantRange, remoteBloom) { // prettier-ignore this.#debug('%s Stream IN: received remote want-range %o and bloom round 0 for %s', this.#myId, remoteWantRange, id) this.#remoteWant.set(id, remoteWantRange) const msgIDsForThem = this.#algo.msgsMissing( id, 0, remoteWantRange, remoteBloom ) this.#updateSendableMsgs(id, msgIDsForThem) const localBloom = this.#algo.bloomFor(id, 0, remoteWantRange) this.sink.write({ id, phase: 4, payload: { bloom: localBloom, msgIDs: msgIDsForThem }, }) // prettier-ignore this.#debug('%s Stream OUT: send bloom round 0 plus msgIDs in %s: %o', this.#myId, id, msgIDsForThem) } #sendBloomReq(id, phase, round, remoteBloom, msgIDsForMe) { // prettier-ignore this.#debug('%s Stream IN: received bloom round %s plus msgIDs in %s: %o', this.#myId, round-1, id, msgIDsForMe) const remoteWantRange = this.#remoteWant.get(id) this.#updateReceivableMsgs(id, msgIDsForMe) const msgIDsForThem = this.#algo.msgsMissing( id, round - 1, remoteWantRange, remoteBloom ) this.#updateSendableMsgs(id, msgIDsForThem) const localBloom = this.#algo.bloomFor( id, round, remoteWantRange, this.#receivableMsgs.get(id) ) this.sink.write({ id, phase, payload: { bloom: localBloom, msgIDs: msgIDsForThem }, }) // prettier-ignore this.#debug('%s Stream OUT: send bloom round %s plus msgIDs in %s: %o', this.#myId, round, id, msgIDsForThem) } #sendBloomRes(id, phase, round, remoteBloom, msgIDsForMe) { // prettier-ignore this.#debug('%s Stream IN: received bloom round %s plus msgIDs in %s: %o', this.#myId, round, id, msgIDsForMe) const remoteWantRange = this.#remoteWant.get(id) this.#updateReceivableMsgs(id, msgIDsForMe) const msgIDsForThem = this.#algo.msgsMissing( id, round, remoteWantRange, remoteBloom ) this.#updateSendableMsgs(id, msgIDsForThem) const localBloom = this.#algo.bloomFor( id, round, remoteWantRange, this.#receivableMsgs.get(id) ) this.sink.write({ id, phase, payload: { bloom: localBloom, msgIDs: msgIDsForThem }, }) // prettier-ignore this.#debug('%s Stream OUT: send bloom round %s plus msgIDs in %s: %o', this.#myId, round, id, msgIDsForThem) } #sendLastBloomRes(id, phase, round, remoteBloom, msgIDsForMe) { // prettier-ignore this.#debug('%s Stream IN: received bloom round %s plus msgIDs in %s: %o', this.#myId, round, id, msgIDsForMe) const remoteWantRange = this.#remoteWant.get(id) this.#updateReceivableMsgs(id, msgIDsForMe) const msgIDsForThem = this.#algo.msgsMissing( id, round, remoteWantRange, remoteBloom ) this.#updateSendableMsgs(id, msgIDsForThem) this.sink.write({ id, phase, payload: msgIDsForThem }) // prettier-ignore this.#debug('%s Stream OUT: send msgIDs in %s: %o', this.#myId, id, msgIDsForThem) } #sendMissingMsgsReq(id, msgIDsForMe) { // prettier-ignore this.#debug('%s Stream IN: received msgIDs in %s: %o', this.#myId, id, msgIDsForMe) this.#updateReceivableMsgs(id, msgIDsForMe) const msgIDs = this.#sendableMsgs.has(id) ? [...this.#sendableMsgs.get(id)] : [] const msgs = this.#algo.getMsgs(msgIDs) // prettier-ignore this.#debug('%s Stream OUT: send %s msgs in %s', this.#myId, msgs.length, id) this.sink.write({ id, phase: 9, payload: msgs }) } #sendMissingMsgsRes(id, msgsForMe) { // prettier-ignore this.#debug('%s Stream IN: received %s msgs in %s', this.#myId, msgsForMe.length, id) const msgIDs = this.#sendableMsgs.has(id) ? [...this.#sendableMsgs.get(id)] : [] const msgs = this.#algo.getMsgs(msgIDs) // prettier-ignore this.#debug('%s Stream OUT: send %s msgs in %s', this.#myId, msgs.length, id) this.sink.write({ id, phase: 10, payload: msgs }) this.#requested.delete(id) this.#remoteHave.delete(id) this.#remoteWant.delete(id) this.#receivableMsgs.delete(id) this.#sendableMsgs.delete(id) if (msgsForMe.length === 0) return this.#algo.commit(msgsForMe, id, (err) => { // prettier-ignore if (err) throw new Error('sendMissingMsgsRes failed because sink failed', {cause: err}) }) } #consumeMissingMsgs(id, msgsForMe) { // prettier-ignore this.#debug('%s Stream IN: received %s msgs in %s', this.#myId, msgsForMe.length, id) this.#requested.delete(id) this.#remoteHave.delete(id) this.#remoteWant.delete(id) this.#receivableMsgs.delete(id) this.#sendableMsgs.delete(id) if (msgsForMe.length === 0) return this.#algo.commit(msgsForMe, id, (err) => { // prettier-ignore if (err) throw new Error('sendMissingMsgsRes failed because sink failed', {cause: err}) }) } #sendMsgsInRemoteWant(id, remoteWantRange) { const msgs = [] for (const msg of this.#algo.yieldMsgsIn(id, remoteWantRange)) { msgs.push(msg) } // prettier-ignore this.#debug('%s Stream OUT: send %s msgs in %s', this.#myId, msgs.length, id) this.sink.write({ id, phase: 10, payload: msgs }) } // as a source resume() { if (!this.sink || this.sink.paused) return for (const id of this.#requested) { if (!this.#canSend()) return this.#sendLocalHave(id) } } // as a sink write(data) { const { id, phase, payload } = data switch (phase) { case 1: { return this.#sendLocalHaveAndWant(id, payload) } case 2: { const { haveRange, wantRange } = payload if (this.#algo.isEmptyRange(haveRange)) { // prettier-ignore this.#debug('%s Stream IN: received remote have-range %o and want-range %o for %s', this.#myId, haveRange, wantRange, id) return this.#sendMsgsInRemoteWant(id, wantRange) } else { return this.#sendLocalWantAndInitBloom(id, haveRange, wantRange) } } case 3: { const { wantRange, bloom } = payload const haveRange = this.#remoteHave.get(id) if (haveRange && this.#algo.isEmptyRange(haveRange)) { // prettier-ignore this.#debug('%s Stream IN: received remote want-range want-range %o and remember empty have-range %o for %s', this.#myId, wantRange, haveRange, id) return this.#sendMsgsInRemoteWant(id, wantRange) } else { return this.#sendInitBloomRes(id, wantRange, bloom) } } case 4: { const { bloom, msgIDs } = payload return this.#sendBloomReq(id, phase + 1, 1, bloom, msgIDs) } case 5: { const { bloom, msgIDs } = payload return this.#sendBloomRes(id, phase + 1, 1, bloom, msgIDs) } case 6: { const { bloom, msgIDs } = payload return this.#sendBloomReq(id, phase + 1, 2, bloom, msgIDs) } case 7: { const { bloom, msgIDs } = payload return this.#sendLastBloomRes(id, phase + 1, 2, bloom, msgIDs) } case 8: { return this.#sendMissingMsgsReq(id, payload) } case 9: { return this.#sendMissingMsgsRes(id, payload) } case 10: { return this.#consumeMissingMsgs(id, payload) } } this.#debug('Stream IN: unknown %o', data) } // as a source abort(err) { this.ended = true if (this.source && !this.source.ended) this.source.abort(err) if (this.sink && !this.sink.ended) this.sink.end(err) } // as a sink end(err) { this.ended = true if (this.source && !this.source.ended) this.source.abort(err) if (this.sink && !this.sink.ended) this.sink.end(err) } } module.exports = SyncStream