mirror of https://codeberg.org/pzp/pzp-sync.git
revise phases in Stream and write down spec
This commit is contained in:
parent
4051ba00e2
commit
2a2b184dd1
|
@ -103,13 +103,13 @@ class SyncStream extends Pipeable {
|
||||||
const localHaveRange = this.#algo.haveRange(id)
|
const localHaveRange = this.#algo.haveRange(id)
|
||||||
this.#localHave.set(id, localHaveRange)
|
this.#localHave.set(id, localHaveRange)
|
||||||
// prettier-ignore
|
// prettier-ignore
|
||||||
this.#debug('%s Stream OUT: send local have-range %o for %s', this.#myId, localHaveRange, id)
|
this.#debug('%s Stream OUT1: send local have-range %o for %s', this.#myId, localHaveRange, id)
|
||||||
this.sink.write({ id, phase: 1, payload: localHaveRange })
|
this.sink.write({ id, phase: 1, payload: localHaveRange })
|
||||||
}
|
}
|
||||||
|
|
||||||
#sendLocalHaveAndWant(id, remoteHaveRange) {
|
#sendLocalHaveAndWant(id, remoteHaveRange) {
|
||||||
// prettier-ignore
|
// prettier-ignore
|
||||||
this.#debug('%s Stream IN: received remote have-range %o for %s', this.#myId, remoteHaveRange, id)
|
this.#debug('%s Stream IN1: got remote have-range %o for %s', this.#myId, remoteHaveRange, id)
|
||||||
this.#remoteHave.set(id, remoteHaveRange)
|
this.#remoteHave.set(id, remoteHaveRange)
|
||||||
const goal = this.#goals.get(id)
|
const goal = this.#goals.get(id)
|
||||||
const haveRange = this.#algo.haveRange(id)
|
const haveRange = this.#algo.haveRange(id)
|
||||||
|
@ -117,13 +117,13 @@ class SyncStream extends Pipeable {
|
||||||
this.#localHave.set(id, haveRange)
|
this.#localHave.set(id, haveRange)
|
||||||
this.#localWant.set(id, wantRange)
|
this.#localWant.set(id, wantRange)
|
||||||
// prettier-ignore
|
// prettier-ignore
|
||||||
this.#debug('%s Stream OUT: send local have-range %o and want-range %o for %s', this.#myId, haveRange, wantRange, id)
|
this.#debug('%s Stream OUT2: send local have-range %o and want-range %o for %s', this.#myId, haveRange, wantRange, id)
|
||||||
this.sink.write({ id, phase: 2, payload: { haveRange, wantRange } })
|
this.sink.write({ id, phase: 2, payload: { haveRange, wantRange } })
|
||||||
}
|
}
|
||||||
|
|
||||||
#sendLocalWantAndInitBloom(id, remoteHaveRange, remoteWantRange) {
|
#sendLocalWantAndInitBloom(id, remoteHaveRange, remoteWantRange) {
|
||||||
// prettier-ignore
|
// prettier-ignore
|
||||||
this.#debug('%s Stream IN: received remote have-range %o and want-range %o for %s', this.#myId, remoteHaveRange, remoteWantRange, id)
|
this.#debug('%s Stream IN2: got remote have-range %o and want-range %o for %s', this.#myId, remoteHaveRange, remoteWantRange, id)
|
||||||
this.#remoteHave.set(id, remoteHaveRange)
|
this.#remoteHave.set(id, remoteHaveRange)
|
||||||
this.#remoteWant.set(id, remoteWantRange)
|
this.#remoteWant.set(id, remoteWantRange)
|
||||||
const goal = this.#goals.get(id)
|
const goal = this.#goals.get(id)
|
||||||
|
@ -137,12 +137,12 @@ class SyncStream extends Pipeable {
|
||||||
payload: { bloom: localBloom0, wantRange },
|
payload: { bloom: localBloom0, wantRange },
|
||||||
})
|
})
|
||||||
// prettier-ignore
|
// prettier-ignore
|
||||||
this.#debug('%s Stream OUT: send local want-range %o and bloom round 0 for %s', this.#myId, wantRange, id)
|
this.#debug('%s Stream OUT3: send local want-range %o and bloom round 0 for %s', this.#myId, wantRange, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
#sendInitBloomRes(id, remoteWantRange, remoteBloom) {
|
#sendInitBloomRes(id, remoteWantRange, remoteBloom) {
|
||||||
// prettier-ignore
|
// prettier-ignore
|
||||||
this.#debug('%s Stream IN: received remote want-range %o and bloom round 0 for %s', this.#myId, remoteWantRange, id)
|
this.#debug('%s Stream IN3: got remote want-range %o and bloom round 0 for %s', this.#myId, remoteWantRange, id)
|
||||||
this.#remoteWant.set(id, remoteWantRange)
|
this.#remoteWant.set(id, remoteWantRange)
|
||||||
const msgIDsForThem = this.#algo.msgsMissing(
|
const msgIDsForThem = this.#algo.msgsMissing(
|
||||||
id,
|
id,
|
||||||
|
@ -159,12 +159,12 @@ class SyncStream extends Pipeable {
|
||||||
payload: { bloom: localBloom, msgIDs: msgIDsForThem },
|
payload: { bloom: localBloom, msgIDs: msgIDsForThem },
|
||||||
})
|
})
|
||||||
// prettier-ignore
|
// prettier-ignore
|
||||||
this.#debug('%s Stream OUT: send bloom round 0 plus msgIDs in %s: %o', this.#myId, id, msgIDsForThem)
|
this.#debug('%s Stream OUT4: send bloom round 0 plus msgIDs in %s: %o', this.#myId, id, msgIDsForThem)
|
||||||
}
|
}
|
||||||
|
|
||||||
#sendBloomReq(id, phase, round, remoteBloom, msgIDsForMe) {
|
#sendBloomReq(id, phase, round, remoteBloom, msgIDsForMe) {
|
||||||
// prettier-ignore
|
// prettier-ignore
|
||||||
this.#debug('%s Stream IN: received bloom round %s plus msgIDs in %s: %o', this.#myId, round-1, id, msgIDsForMe)
|
this.#debug('%s Stream IN%s: got bloom round %s plus msgIDs in %s: %o', this.#myId, phase-1, round-1, id, msgIDsForMe)
|
||||||
const remoteWantRange = this.#remoteWant.get(id)
|
const remoteWantRange = this.#remoteWant.get(id)
|
||||||
this.#updateReceivableMsgs(id, msgIDsForMe)
|
this.#updateReceivableMsgs(id, msgIDsForMe)
|
||||||
const msgIDsForThem = this.#algo.msgsMissing(
|
const msgIDsForThem = this.#algo.msgsMissing(
|
||||||
|
@ -183,12 +183,12 @@ class SyncStream extends Pipeable {
|
||||||
payload: { bloom: localBloom, msgIDs: msgIDsForThem },
|
payload: { bloom: localBloom, msgIDs: msgIDsForThem },
|
||||||
})
|
})
|
||||||
// prettier-ignore
|
// prettier-ignore
|
||||||
this.#debug('%s Stream OUT: send bloom round %s plus msgIDs in %s: %o', this.#myId, round, id, msgIDsForThem)
|
this.#debug('%s Stream OUT%s: send bloom round %s plus msgIDs in %s: %o', this.#myId, phase, round, id, msgIDsForThem)
|
||||||
}
|
}
|
||||||
|
|
||||||
#sendBloomRes(id, phase, round, remoteBloom, msgIDsForMe) {
|
#sendBloomRes(id, phase, round, remoteBloom, msgIDsForMe) {
|
||||||
// prettier-ignore
|
// prettier-ignore
|
||||||
this.#debug('%s Stream IN: received bloom round %s plus msgIDs in %s: %o', this.#myId, round, id, msgIDsForMe)
|
this.#debug('%s Stream IN%s: got bloom round %s plus msgIDs in %s: %o', this.#myId, phase-1, round, id, msgIDsForMe)
|
||||||
const remoteWantRange = this.#remoteWant.get(id)
|
const remoteWantRange = this.#remoteWant.get(id)
|
||||||
this.#updateReceivableMsgs(id, msgIDsForMe)
|
this.#updateReceivableMsgs(id, msgIDsForMe)
|
||||||
const msgIDsForThem = this.#algo.msgsMissing(
|
const msgIDsForThem = this.#algo.msgsMissing(
|
||||||
|
@ -207,12 +207,12 @@ class SyncStream extends Pipeable {
|
||||||
payload: { bloom: localBloom, msgIDs: msgIDsForThem },
|
payload: { bloom: localBloom, msgIDs: msgIDsForThem },
|
||||||
})
|
})
|
||||||
// prettier-ignore
|
// prettier-ignore
|
||||||
this.#debug('%s Stream OUT: send bloom round %s plus msgIDs in %s: %o', this.#myId, round, id, msgIDsForThem)
|
this.#debug('%s Stream OUT%s: send bloom round %s plus msgIDs in %s: %o', this.#myId, phase, round, id, msgIDsForThem)
|
||||||
}
|
}
|
||||||
|
|
||||||
#sendLastBloomRes(id, phase, round, remoteBloom, msgIDsForMe) {
|
#sendMissingMsgsReq(id, round, remoteBloom, msgIDsForMe) {
|
||||||
// prettier-ignore
|
// prettier-ignore
|
||||||
this.#debug('%s Stream IN: received bloom round %s plus msgIDs in %s: %o', this.#myId, round, id, msgIDsForMe)
|
this.#debug('%s Stream IN7: got bloom round %s plus msgIDs in %s: %o', this.#myId, round, id, msgIDsForMe)
|
||||||
const remoteWantRange = this.#remoteWant.get(id)
|
const remoteWantRange = this.#remoteWant.get(id)
|
||||||
this.#updateReceivableMsgs(id, msgIDsForMe)
|
this.#updateReceivableMsgs(id, msgIDsForMe)
|
||||||
const msgIDsForThem = this.#algo.msgsMissing(
|
const msgIDsForThem = this.#algo.msgsMissing(
|
||||||
|
@ -222,30 +222,36 @@ class SyncStream extends Pipeable {
|
||||||
remoteBloom
|
remoteBloom
|
||||||
)
|
)
|
||||||
this.#updateSendableMsgs(id, msgIDsForThem)
|
this.#updateSendableMsgs(id, msgIDsForThem)
|
||||||
this.sink.write({ id, phase, payload: msgIDsForThem })
|
const msgIDs = this.#sendableMsgs.get(id) ?? new Set()
|
||||||
|
const msgs = this.#algo.getTangleSlice(id, msgIDs)
|
||||||
|
const extras = this.#receivableMsgs.get(id)
|
||||||
|
const localWantRange = this.#localWant.get(id)
|
||||||
|
const localBloom = this.#algo.bloomFor(id, round, localWantRange, extras)
|
||||||
|
this.sink.write({
|
||||||
|
id,
|
||||||
|
phase: 8,
|
||||||
|
payload: { msgs, bloom: localBloom },
|
||||||
|
})
|
||||||
// prettier-ignore
|
// prettier-ignore
|
||||||
this.#debug('%s Stream OUT: send msgIDs in %s: %o', this.#myId, id, msgIDsForThem)
|
this.#debug('%s Stream OUT8: send bloom round %s plus %s msgs in %s', this.#myId, round, msgs.length, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
#sendMissingMsgsReq(id, msgIDsForMe) {
|
#sendMissingMsgsRes(id, round, remoteBloom, msgsForMe) {
|
||||||
// prettier-ignore
|
// prettier-ignore
|
||||||
this.#debug('%s Stream IN: received msgIDs in %s: %o', this.#myId, id, msgIDsForMe)
|
this.#debug('%s Stream IN8: got bloom round %s plus %s msgs in %s', this.#myId, round, msgsForMe.length, id)
|
||||||
this.#updateReceivableMsgs(id, msgIDsForMe)
|
const remoteWantRange = this.#remoteWant.get(id)
|
||||||
|
const msgIDsForThem = this.#algo.msgsMissing(
|
||||||
|
id,
|
||||||
|
round,
|
||||||
|
remoteWantRange,
|
||||||
|
remoteBloom
|
||||||
|
)
|
||||||
|
this.#updateSendableMsgs(id, msgIDsForThem)
|
||||||
const msgIDs = this.#sendableMsgs.get(id) ?? new Set()
|
const msgIDs = this.#sendableMsgs.get(id) ?? new Set()
|
||||||
const msgs = this.#algo.getTangleSlice(id, msgIDs)
|
const msgs = this.#algo.getTangleSlice(id, msgIDs)
|
||||||
// prettier-ignore
|
// prettier-ignore
|
||||||
this.#debug('%s Stream OUT: send %s msgs in %s', this.#myId, msgs.length, id)
|
this.#debug('%s Stream OUT9: send %s msgs in %s', this.#myId, msgs.length, id)
|
||||||
this.sink.write({ id, phase: 9, payload: msgs })
|
this.sink.write({ id, phase: 9, payload: msgs })
|
||||||
}
|
|
||||||
|
|
||||||
#sendMissingMsgsRes(id, msgsForMe) {
|
|
||||||
// prettier-ignore
|
|
||||||
this.#debug('%s Stream IN: received %s msgs in %s', this.#myId, msgsForMe.length, id)
|
|
||||||
const msgIDs = this.#sendableMsgs.get(id) ?? new Set()
|
|
||||||
const msgs = this.#algo.getTangleSlice(id, msgIDs)
|
|
||||||
// prettier-ignore
|
|
||||||
this.#debug('%s Stream OUT: send %s msgs in %s', this.#myId, msgs.length, id)
|
|
||||||
this.sink.write({ id, phase: 10, payload: msgs })
|
|
||||||
|
|
||||||
const goal = this.#goals.get(id)
|
const goal = this.#goals.get(id)
|
||||||
const localWant = this.#localWant.get(id)
|
const localWant = this.#localWant.get(id)
|
||||||
|
@ -267,7 +273,7 @@ class SyncStream extends Pipeable {
|
||||||
|
|
||||||
#consumeMissingMsgs(id, msgsForMe) {
|
#consumeMissingMsgs(id, msgsForMe) {
|
||||||
// prettier-ignore
|
// prettier-ignore
|
||||||
this.#debug('%s Stream IN: received %s msgs in %s', this.#myId, msgsForMe.length, id)
|
this.#debug('%s Stream IN9: got %s msgs in %s', this.#myId, msgsForMe.length, id)
|
||||||
|
|
||||||
const goal = this.#goals.get(id)
|
const goal = this.#goals.get(id)
|
||||||
const localWant = this.#localWant.get(id)
|
const localWant = this.#localWant.get(id)
|
||||||
|
@ -293,8 +299,8 @@ class SyncStream extends Pipeable {
|
||||||
msgs.push(msg)
|
msgs.push(msg)
|
||||||
}
|
}
|
||||||
// prettier-ignore
|
// prettier-ignore
|
||||||
this.#debug('%s Stream OUT: send %s msgs in %s', this.#myId, msgs.length, id)
|
this.#debug('%s Stream OUT9: send %s msgs in %s', this.#myId, msgs.length, id)
|
||||||
this.sink.write({ id, phase: 10, payload: msgs })
|
this.sink.write({ id, phase: 9, payload: msgs })
|
||||||
}
|
}
|
||||||
|
|
||||||
// as a source
|
// as a source
|
||||||
|
@ -322,7 +328,7 @@ class SyncStream extends Pipeable {
|
||||||
const { haveRange, wantRange } = payload
|
const { haveRange, wantRange } = payload
|
||||||
if (isEmptyRange(haveRange)) {
|
if (isEmptyRange(haveRange)) {
|
||||||
// prettier-ignore
|
// prettier-ignore
|
||||||
this.#debug('%s Stream IN: received remote have-range %o and want-range %o for %s', this.#myId, haveRange, wantRange, id)
|
this.#debug('%s Stream IN2: received remote have-range %o and want-range %o for %s', this.#myId, haveRange, wantRange, id)
|
||||||
return this.#sendMsgsInRemoteWant(id, wantRange)
|
return this.#sendMsgsInRemoteWant(id, wantRange)
|
||||||
} else {
|
} else {
|
||||||
return this.#sendLocalWantAndInitBloom(id, haveRange, wantRange)
|
return this.#sendLocalWantAndInitBloom(id, haveRange, wantRange)
|
||||||
|
@ -333,7 +339,7 @@ class SyncStream extends Pipeable {
|
||||||
const haveRange = this.#remoteHave.get(id)
|
const haveRange = this.#remoteHave.get(id)
|
||||||
if (haveRange && isEmptyRange(haveRange)) {
|
if (haveRange && isEmptyRange(haveRange)) {
|
||||||
// prettier-ignore
|
// prettier-ignore
|
||||||
this.#debug('%s Stream IN: received remote want-range want-range %o and remember empty have-range %o for %s', this.#myId, wantRange, haveRange, id)
|
this.#debug('%s Stream IN3: received remote want-range want-range %o and remember empty have-range %o for %s', this.#myId, wantRange, haveRange, id)
|
||||||
return this.#sendMsgsInRemoteWant(id, wantRange)
|
return this.#sendMsgsInRemoteWant(id, wantRange)
|
||||||
} else {
|
} else {
|
||||||
return this.#sendInitBloomRes(id, wantRange, bloom)
|
return this.#sendInitBloomRes(id, wantRange, bloom)
|
||||||
|
@ -353,15 +359,13 @@ class SyncStream extends Pipeable {
|
||||||
}
|
}
|
||||||
case 7: {
|
case 7: {
|
||||||
const { bloom, msgIDs } = payload
|
const { bloom, msgIDs } = payload
|
||||||
return this.#sendLastBloomRes(id, phase + 1, 2, bloom, msgIDs)
|
return this.#sendMissingMsgsReq(id, 2, bloom, msgIDs)
|
||||||
}
|
}
|
||||||
case 8: {
|
case 8: {
|
||||||
return this.#sendMissingMsgsReq(id, payload)
|
const { bloom, msgs } = payload
|
||||||
|
return this.#sendMissingMsgsRes(id, 2, bloom, msgs)
|
||||||
}
|
}
|
||||||
case 9: {
|
case 9: {
|
||||||
return this.#sendMissingMsgsRes(id, payload)
|
|
||||||
}
|
|
||||||
case 10: {
|
|
||||||
return this.#consumeMissingMsgs(id, payload)
|
return this.#consumeMissingMsgs(id, payload)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,104 @@
|
||||||
|
The bloom filter is a representation of msgs I already have in my want-range,
|
||||||
|
so you know you can (probably?) skip sending them to me.
|
||||||
|
|
||||||
|
The "probably?" uncertainty is reduced by doing several rounds.
|
||||||
|
|
||||||
|
|
||||||
|
```mermaid
|
||||||
|
sequenceDiagram
|
||||||
|
|
||||||
|
participant A as Alice
|
||||||
|
participant B as Bob
|
||||||
|
note over A: I want to sync tangle with ID "T"
|
||||||
|
A->>B: 1: Send local have-range for T
|
||||||
|
|
||||||
|
%% opt Alice's have-range is empty
|
||||||
|
%% B->>A: 2: Send local have-range and (empty) want-range for ID
|
||||||
|
%% A->>B: Send local want-range for ID
|
||||||
|
%% B->>A: All msgs in remote want-range
|
||||||
|
%% note over A: done
|
||||||
|
%% end
|
||||||
|
|
||||||
|
Note over B: Calculate local want-range based on<br/>local have-range and remote have-range
|
||||||
|
B->>A: 2: Send local have-range and want-range for T
|
||||||
|
|
||||||
|
%% opt Bob's have-range is empty
|
||||||
|
%% A->>B: All msgs in remote want-range
|
||||||
|
%% note over B: done
|
||||||
|
%% end
|
||||||
|
|
||||||
|
Note over A: Calculate BF over all<br />msgs in my want-range
|
||||||
|
A->>B: 3: Send local want-range and BF for round 0
|
||||||
|
Note over B: Read BF to know which<br />msgs they are (maybe) missing
|
||||||
|
Note over B: Calculate BF over all<br />msgs in my want-range
|
||||||
|
B->>A: 4: Send BF for round 0 and A's round 0 missing msg IDs
|
||||||
|
Note over A: ...
|
||||||
|
A->>B: 5: Send BF for round 1 and B's missing round 0 msg IDs
|
||||||
|
Note over B: ...
|
||||||
|
B->>A: 6: Send BF for round 1 and A' missing round 1 msg IDs
|
||||||
|
Note over A: ...
|
||||||
|
A->>B: 7: Send BF for round 2 and B's missing round 2 msg IDs
|
||||||
|
Note over B: ...
|
||||||
|
B->>A: 8: Send BF for round 2 and A's missing msgs
|
||||||
|
Note over A: Commit received msgs
|
||||||
|
A->>B: 9: Send B's missing msgs
|
||||||
|
Note over B: Commit received msgs
|
||||||
|
```
|
||||||
|
|
||||||
|
Peers exchange
|
||||||
|
|
||||||
|
```typescript
|
||||||
|
type Range = [number, number]
|
||||||
|
|
||||||
|
interface WithId {
|
||||||
|
/** TangleID: msg hash of the tangle's root msg */
|
||||||
|
id: string,
|
||||||
|
}
|
||||||
|
|
||||||
|
interface Data1 extends WithId {
|
||||||
|
phase: 1,
|
||||||
|
payload: Range,
|
||||||
|
}
|
||||||
|
|
||||||
|
interface Data2 extends WithId {
|
||||||
|
phase: 2,
|
||||||
|
payload: {
|
||||||
|
haveRange: Range,
|
||||||
|
wantRange: Range,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
interface Data3 extends WithId {
|
||||||
|
phase: 3,
|
||||||
|
payload: {
|
||||||
|
wantRange: Range,
|
||||||
|
bloom: string, // "bloom-filters" specific format TODO: generalize
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
interface Data4567 extends WithId {
|
||||||
|
phase: 4 | 5 | 6 | 7,
|
||||||
|
payload: {
|
||||||
|
msgIDs: Array<string>,
|
||||||
|
bloom: string, // "bloom-filters" specific format TODO: generalize
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
interface Data8 extends WithId {
|
||||||
|
phase: 8,
|
||||||
|
payload: Array<string>,
|
||||||
|
}
|
||||||
|
|
||||||
|
interface Data9 extends WithId {
|
||||||
|
phase: 9,
|
||||||
|
payload: Array<Msg>,
|
||||||
|
}
|
||||||
|
|
||||||
|
type Data = {
|
||||||
|
id: string, // TangleID: msg hash of the tangle's root msg
|
||||||
|
phase: 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10,
|
||||||
|
payload: {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
Loading…
Reference in New Issue