diff --git a/lib/stream.js b/lib/stream.js index 55522a6..5122ed6 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -103,13 +103,13 @@ class SyncStream extends Pipeable { const localHaveRange = this.#algo.haveRange(id) this.#localHave.set(id, localHaveRange) // prettier-ignore - this.#debug('%s Stream OUT: send local have-range %o for %s', this.#myId, localHaveRange, id) + this.#debug('%s Stream OUT1: send local have-range %o for %s', this.#myId, localHaveRange, id) this.sink.write({ id, phase: 1, payload: localHaveRange }) } #sendLocalHaveAndWant(id, remoteHaveRange) { // prettier-ignore - this.#debug('%s Stream IN: received remote have-range %o for %s', this.#myId, remoteHaveRange, id) + this.#debug('%s Stream IN1: got remote have-range %o for %s', this.#myId, remoteHaveRange, id) this.#remoteHave.set(id, remoteHaveRange) const goal = this.#goals.get(id) const haveRange = this.#algo.haveRange(id) @@ -117,13 +117,13 @@ class SyncStream extends Pipeable { this.#localHave.set(id, haveRange) this.#localWant.set(id, wantRange) // prettier-ignore - this.#debug('%s Stream OUT: send local have-range %o and want-range %o for %s', this.#myId, haveRange, wantRange, id) + this.#debug('%s Stream OUT2: send local have-range %o and want-range %o for %s', this.#myId, haveRange, wantRange, id) this.sink.write({ id, phase: 2, payload: { haveRange, wantRange } }) } #sendLocalWantAndInitBloom(id, remoteHaveRange, remoteWantRange) { // prettier-ignore - this.#debug('%s Stream IN: received remote have-range %o and want-range %o for %s', this.#myId, remoteHaveRange, remoteWantRange, id) + this.#debug('%s Stream IN2: got remote have-range %o and want-range %o for %s', this.#myId, remoteHaveRange, remoteWantRange, id) this.#remoteHave.set(id, remoteHaveRange) this.#remoteWant.set(id, remoteWantRange) const goal = this.#goals.get(id) @@ -137,12 +137,12 @@ class SyncStream extends Pipeable { payload: { bloom: localBloom0, wantRange }, }) // prettier-ignore - this.#debug('%s Stream OUT: send local want-range %o and bloom round 0 for %s', this.#myId, wantRange, id) + this.#debug('%s Stream OUT3: send local want-range %o and bloom round 0 for %s', this.#myId, wantRange, id) } #sendInitBloomRes(id, remoteWantRange, remoteBloom) { // prettier-ignore - this.#debug('%s Stream IN: received remote want-range %o and bloom round 0 for %s', this.#myId, remoteWantRange, id) + this.#debug('%s Stream IN3: got remote want-range %o and bloom round 0 for %s', this.#myId, remoteWantRange, id) this.#remoteWant.set(id, remoteWantRange) const msgIDsForThem = this.#algo.msgsMissing( id, @@ -159,12 +159,12 @@ class SyncStream extends Pipeable { payload: { bloom: localBloom, msgIDs: msgIDsForThem }, }) // prettier-ignore - this.#debug('%s Stream OUT: send bloom round 0 plus msgIDs in %s: %o', this.#myId, id, msgIDsForThem) + this.#debug('%s Stream OUT4: send bloom round 0 plus msgIDs in %s: %o', this.#myId, id, msgIDsForThem) } #sendBloomReq(id, phase, round, remoteBloom, msgIDsForMe) { // prettier-ignore - this.#debug('%s Stream IN: received bloom round %s plus msgIDs in %s: %o', this.#myId, round-1, id, msgIDsForMe) + this.#debug('%s Stream IN%s: got bloom round %s plus msgIDs in %s: %o', this.#myId, phase-1, round-1, id, msgIDsForMe) const remoteWantRange = this.#remoteWant.get(id) this.#updateReceivableMsgs(id, msgIDsForMe) const msgIDsForThem = this.#algo.msgsMissing( @@ -183,12 +183,12 @@ class SyncStream extends Pipeable { payload: { bloom: localBloom, msgIDs: msgIDsForThem }, }) // prettier-ignore - this.#debug('%s Stream OUT: send bloom round %s plus msgIDs in %s: %o', this.#myId, round, id, msgIDsForThem) + this.#debug('%s Stream OUT%s: send bloom round %s plus msgIDs in %s: %o', this.#myId, phase, round, id, msgIDsForThem) } #sendBloomRes(id, phase, round, remoteBloom, msgIDsForMe) { // prettier-ignore - this.#debug('%s Stream IN: received bloom round %s plus msgIDs in %s: %o', this.#myId, round, id, msgIDsForMe) + this.#debug('%s Stream IN%s: got bloom round %s plus msgIDs in %s: %o', this.#myId, phase-1, round, id, msgIDsForMe) const remoteWantRange = this.#remoteWant.get(id) this.#updateReceivableMsgs(id, msgIDsForMe) const msgIDsForThem = this.#algo.msgsMissing( @@ -207,12 +207,12 @@ class SyncStream extends Pipeable { payload: { bloom: localBloom, msgIDs: msgIDsForThem }, }) // prettier-ignore - this.#debug('%s Stream OUT: send bloom round %s plus msgIDs in %s: %o', this.#myId, round, id, msgIDsForThem) + this.#debug('%s Stream OUT%s: send bloom round %s plus msgIDs in %s: %o', this.#myId, phase, round, id, msgIDsForThem) } - #sendLastBloomRes(id, phase, round, remoteBloom, msgIDsForMe) { + #sendMissingMsgsReq(id, round, remoteBloom, msgIDsForMe) { // prettier-ignore - this.#debug('%s Stream IN: received bloom round %s plus msgIDs in %s: %o', this.#myId, round, id, msgIDsForMe) + this.#debug('%s Stream IN7: got bloom round %s plus msgIDs in %s: %o', this.#myId, round, id, msgIDsForMe) const remoteWantRange = this.#remoteWant.get(id) this.#updateReceivableMsgs(id, msgIDsForMe) const msgIDsForThem = this.#algo.msgsMissing( @@ -222,30 +222,36 @@ class SyncStream extends Pipeable { remoteBloom ) this.#updateSendableMsgs(id, msgIDsForThem) - this.sink.write({ id, phase, payload: msgIDsForThem }) + const msgIDs = this.#sendableMsgs.get(id) ?? new Set() + const msgs = this.#algo.getTangleSlice(id, msgIDs) + const extras = this.#receivableMsgs.get(id) + const localWantRange = this.#localWant.get(id) + const localBloom = this.#algo.bloomFor(id, round, localWantRange, extras) + this.sink.write({ + id, + phase: 8, + payload: { msgs, bloom: localBloom }, + }) // prettier-ignore - this.#debug('%s Stream OUT: send msgIDs in %s: %o', this.#myId, id, msgIDsForThem) + this.#debug('%s Stream OUT8: send bloom round %s plus %s msgs in %s', this.#myId, round, msgs.length, id) } - #sendMissingMsgsReq(id, msgIDsForMe) { + #sendMissingMsgsRes(id, round, remoteBloom, msgsForMe) { // prettier-ignore - this.#debug('%s Stream IN: received msgIDs in %s: %o', this.#myId, id, msgIDsForMe) - this.#updateReceivableMsgs(id, msgIDsForMe) + this.#debug('%s Stream IN8: got bloom round %s plus %s msgs in %s', this.#myId, round, msgsForMe.length, id) + const remoteWantRange = this.#remoteWant.get(id) + const msgIDsForThem = this.#algo.msgsMissing( + id, + round, + remoteWantRange, + remoteBloom + ) + this.#updateSendableMsgs(id, msgIDsForThem) const msgIDs = this.#sendableMsgs.get(id) ?? new Set() const msgs = this.#algo.getTangleSlice(id, msgIDs) // prettier-ignore - this.#debug('%s Stream OUT: send %s msgs in %s', this.#myId, msgs.length, id) + this.#debug('%s Stream OUT9: send %s msgs in %s', this.#myId, msgs.length, id) this.sink.write({ id, phase: 9, payload: msgs }) - } - - #sendMissingMsgsRes(id, msgsForMe) { - // prettier-ignore - this.#debug('%s Stream IN: received %s msgs in %s', this.#myId, msgsForMe.length, id) - const msgIDs = this.#sendableMsgs.get(id) ?? new Set() - const msgs = this.#algo.getTangleSlice(id, msgIDs) - // prettier-ignore - this.#debug('%s Stream OUT: send %s msgs in %s', this.#myId, msgs.length, id) - this.sink.write({ id, phase: 10, payload: msgs }) const goal = this.#goals.get(id) const localWant = this.#localWant.get(id) @@ -267,7 +273,7 @@ class SyncStream extends Pipeable { #consumeMissingMsgs(id, msgsForMe) { // prettier-ignore - this.#debug('%s Stream IN: received %s msgs in %s', this.#myId, msgsForMe.length, id) + this.#debug('%s Stream IN9: got %s msgs in %s', this.#myId, msgsForMe.length, id) const goal = this.#goals.get(id) const localWant = this.#localWant.get(id) @@ -293,8 +299,8 @@ class SyncStream extends Pipeable { msgs.push(msg) } // prettier-ignore - this.#debug('%s Stream OUT: send %s msgs in %s', this.#myId, msgs.length, id) - this.sink.write({ id, phase: 10, payload: msgs }) + this.#debug('%s Stream OUT9: send %s msgs in %s', this.#myId, msgs.length, id) + this.sink.write({ id, phase: 9, payload: msgs }) } // as a source @@ -322,7 +328,7 @@ class SyncStream extends Pipeable { const { haveRange, wantRange } = payload if (isEmptyRange(haveRange)) { // prettier-ignore - this.#debug('%s Stream IN: received remote have-range %o and want-range %o for %s', this.#myId, haveRange, wantRange, id) + this.#debug('%s Stream IN2: received remote have-range %o and want-range %o for %s', this.#myId, haveRange, wantRange, id) return this.#sendMsgsInRemoteWant(id, wantRange) } else { return this.#sendLocalWantAndInitBloom(id, haveRange, wantRange) @@ -333,7 +339,7 @@ class SyncStream extends Pipeable { const haveRange = this.#remoteHave.get(id) if (haveRange && isEmptyRange(haveRange)) { // prettier-ignore - this.#debug('%s Stream IN: received remote want-range want-range %o and remember empty have-range %o for %s', this.#myId, wantRange, haveRange, id) + this.#debug('%s Stream IN3: received remote want-range want-range %o and remember empty have-range %o for %s', this.#myId, wantRange, haveRange, id) return this.#sendMsgsInRemoteWant(id, wantRange) } else { return this.#sendInitBloomRes(id, wantRange, bloom) @@ -353,15 +359,13 @@ class SyncStream extends Pipeable { } case 7: { const { bloom, msgIDs } = payload - return this.#sendLastBloomRes(id, phase + 1, 2, bloom, msgIDs) + return this.#sendMissingMsgsReq(id, 2, bloom, msgIDs) } case 8: { - return this.#sendMissingMsgsReq(id, payload) + const { bloom, msgs } = payload + return this.#sendMissingMsgsRes(id, 2, bloom, msgs) } case 9: { - return this.#sendMissingMsgsRes(id, payload) - } - case 10: { return this.#consumeMissingMsgs(id, payload) } } diff --git a/protospec.md b/protospec.md new file mode 100644 index 0000000..e341de2 --- /dev/null +++ b/protospec.md @@ -0,0 +1,104 @@ +The bloom filter is a representation of msgs I already have in my want-range, +so you know you can (probably?) skip sending them to me. + +The "probably?" uncertainty is reduced by doing several rounds. + + +```mermaid +sequenceDiagram + +participant A as Alice +participant B as Bob +note over A: I want to sync tangle with ID "T" +A->>B: 1: Send local have-range for T + +%% opt Alice's have-range is empty +%% B->>A: 2: Send local have-range and (empty) want-range for ID +%% A->>B: Send local want-range for ID +%% B->>A: All msgs in remote want-range +%% note over A: done +%% end + +Note over B: Calculate local want-range based on
local have-range and remote have-range +B->>A: 2: Send local have-range and want-range for T + +%% opt Bob's have-range is empty +%% A->>B: All msgs in remote want-range +%% note over B: done +%% end + +Note over A: Calculate BF over all
msgs in my want-range +A->>B: 3: Send local want-range and BF for round 0 +Note over B: Read BF to know which
msgs they are (maybe) missing +Note over B: Calculate BF over all
msgs in my want-range +B->>A: 4: Send BF for round 0 and A's round 0 missing msg IDs +Note over A: ... +A->>B: 5: Send BF for round 1 and B's missing round 0 msg IDs +Note over B: ... +B->>A: 6: Send BF for round 1 and A' missing round 1 msg IDs +Note over A: ... +A->>B: 7: Send BF for round 2 and B's missing round 2 msg IDs +Note over B: ... +B->>A: 8: Send BF for round 2 and A's missing msgs +Note over A: Commit received msgs +A->>B: 9: Send B's missing msgs +Note over B: Commit received msgs +``` + +Peers exchange + +```typescript +type Range = [number, number] + +interface WithId { + /** TangleID: msg hash of the tangle's root msg */ + id: string, +} + +interface Data1 extends WithId { + phase: 1, + payload: Range, +} + +interface Data2 extends WithId { + phase: 2, + payload: { + haveRange: Range, + wantRange: Range, + } +} + +interface Data3 extends WithId { + phase: 3, + payload: { + wantRange: Range, + bloom: string, // "bloom-filters" specific format TODO: generalize + } +} + +interface Data4567 extends WithId { + phase: 4 | 5 | 6 | 7, + payload: { + msgIDs: Array, + bloom: string, // "bloom-filters" specific format TODO: generalize + } +} + +interface Data8 extends WithId { + phase: 8, + payload: Array, +} + +interface Data9 extends WithId { + phase: 9, + payload: Array, +} + +type Data = { + id: string, // TangleID: msg hash of the tangle's root msg + phase: 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10, + payload: { + + } +} +``` \ No newline at end of file