account msgs piggyback on tangle msg sync

This commit is contained in:
Andre Staltz 2023-12-16 16:18:23 +02:00
parent 33ef08c62b
commit 6a9f46b337
No known key found for this signature in database
GPG Key ID: 9EDE23EA7E8A4890
4 changed files with 334 additions and 72 deletions

View File

@ -54,6 +54,9 @@ class Algorithm {
}
/**
* Calculates the range ([minDepth, maxDepth]) of msgs that the peer has for
* the given tangle known by the `rootID`.
*
* @param {string} rootID
* @returns {Range}
*/
@ -77,6 +80,9 @@ class Algorithm {
}
/**
* Calculates the range ([minDepth, maxDepth]) of msgs that the peer wants,
* given the goal "all" and local and remote have ranges.
*
* @param {Range} localHaveRange
* @param {Range} remoteHaveRange
* @returns {Range}
@ -86,6 +92,10 @@ class Algorithm {
}
/**
* Calculates the range ([minDepth, maxDepth]) of msgs that the peer wants,
* given the goal "newest" (alongside with a `count` parameter) and local and
* remote have ranges.
*
* @param {Range} localHaveRange
* @param {Range} remoteHaveRange
* @param {number} count
@ -106,6 +116,9 @@ class Algorithm {
}
/**
* Calculates the range ([minDepth, maxDepth]) of msgs that the peer wants,
* given the goal "dict" or "set" and local and remote have ranges.
*
* @param {number} minGhostDepth
* @param {Range} remoteHaveRange
* @returns {Range}
@ -119,6 +132,9 @@ class Algorithm {
}
/**
* Calculates the range ([minDepth, maxDepth]) of msgs that the peer wants,
* given a `goal`.
*
* @param {Range} localHave
* @param {Range} remoteHave
* @param {Goal?} goal
@ -154,6 +170,13 @@ class Algorithm {
}
/**
* Returns a bloom filter that represents the msgs that this peer has in the
* database, matching the given tangle `rootID` and `range`. The `round` is
* used to identify bloom filter items from different rounds.
*
* The bloom filter also includes account msgs that are outside the tangle
* `rootID`, but required for validation of tangle `rootID` msgs.
*
* @param {string} rootID
* @param {number} round
* @param {Range} range
@ -165,7 +188,9 @@ class Algorithm {
(isEmptyRange(range) ? 2 : estimateMsgCount(range)) + countIter(extraIds)
const filter = BloomFilter.create(2 * filterSize, 0.00001)
if (!isEmptyRange(range)) {
for (const msg of this.yieldMsgsIn(rootID, range)) {
const rangeMsgs = this.getMsgsInRange(rootID, range)
const accountMsgs = this.getAccountMsgsFor(rangeMsgs)
for (const msg of accountMsgs.concat(rangeMsgs)) {
filter.add('' + round + MsgV3.getMsgID(msg))
}
}
@ -182,17 +207,26 @@ class Algorithm {
}
/**
* Returns msg IDs for msgs that are missing in the remote peer's database for
* the tangle `rootID` within `range`, judging by the given `remoteBloomJSON`
* (and `round`) bloom filter.
*
* This may also contain account msgs that are outside the tangle `rootID`,
* but required to validate the msgs in that tangle.
*
* @param {string} rootID
* @param {number} round
* @param {Range} range
* @param {JSON} remoteBloomJSON
* @returns
* @returns {Array<MsgID>}
*/
msgsMissing(rootID, round, range, remoteBloomJSON) {
getMsgsMissing(rootID, round, range, remoteBloomJSON) {
if (isEmptyRange(range)) return []
const remoteFilter = BloomFilter.fromJSON(remoteBloomJSON)
const missing = []
for (const msg of this.yieldMsgsIn(rootID, range)) {
const rangeMsgs = this.getMsgsInRange(rootID, range)
const accountMsgs = this.getAccountMsgsFor(rangeMsgs)
for (const msg of accountMsgs.concat(rangeMsgs)) {
const msgID = MsgV3.getMsgID(msg)
if (!remoteFilter.has('' + round + msgID)) {
missing.push(msgID)
@ -202,27 +236,99 @@ class Algorithm {
}
/**
* @param {string} rootID
* @param {Range} range
* Returns an array of account msgs that are required for validating the given
* `msgs`.
*
* @param {Array<Msg>} msgs
* @returns {Array<Msg>}
*/
*yieldMsgsIn(rootID, range) {
const [minDepth, maxDepth] = range
const rootMsg = this.#peer.db.get(rootID)
if (!rootMsg) return
if (minDepth === 0) yield rootMsg
for (const msg of this.#peer.db.msgs()) {
const tangles = msg.metadata.tangles
if (
tangles[rootID] &&
tangles[rootID].depth >= minDepth &&
tangles[rootID].depth <= maxDepth
) {
yield msg
getAccountMsgsFor(msgs) {
const accountTips = /** @type {Map<MsgID, Set<string>>} */ (new Map())
for (const msg of msgs) {
if (MsgV3.isFeedMsg(msg)) {
const set = accountTips.get(msg.metadata.account) ?? new Set()
for (const tip of msg.metadata.accountTips) {
set.add(tip)
}
accountTips.set(msg.metadata.account, set)
}
}
const accountMsgs = []
for (const [accountID, tips] of accountTips) {
const accountTangle = this.#peer.db.getTangle(accountID)
accountMsgs.push(...accountTangle.slice([], [...tips]))
}
return accountMsgs
}
/**
* Among the given `msgIDs`, find those that are account msgs and return them
* as msgs.
*
* @param {Iterable<MsgID>} msgIDs
* @returns {Array<Msg>}
*/
filterAndFetchAccountMsgs(msgIDs) {
const accountMsgs = []
for (const msgID of msgIDs) {
const msg = this.#peer.db.get(msgID)
if (msg?.metadata.account === 'self') {
accountMsgs.push(msg)
}
}
return accountMsgs
}
/**
* Returns msgs that have a depth within the given `range` for the tangle
* `rootID`.
*
* @param {string} rootID
* @param {Range} range
* @returns {Array<Msg>}
*/
getMsgsInRange(rootID, range) {
const [minDepth, maxDepth] = range
const rootMsg = this.#peer.db.get(rootID)
if (!rootMsg) return []
const msgs = []
if (minDepth === 0) {
msgs.push(rootMsg)
}
const tangle = this.#peer.db.getTangle(rootID)
for (const msg of tangle.slice()) {
const depth = msg.metadata.tangles[rootID]?.depth ?? 0
if (depth >= minDepth && depth <= maxDepth) {
msgs.push(msg)
}
}
return msgs
}
/**
* Given the input msgs (or msg IDs), return those that are part of the tangle
* `rootID`, plus dataless msgs part of a trail to the tangle root, including
* the root itself.
*
* @param {string} rootID
* @param {Set<string> | Array<Msg>} msgs
* @returns {Array<Msg>}
*/
getTangleMsgs(rootID, msgs) {
if (Array.isArray(msgs) && msgs.length === 0) return []
if (!Array.isArray(msgs) && msgs.size === 0) return []
const msgIDs = [...msgs].map((m) =>
typeof m === 'string' ? m : MsgV3.getMsgID(m)
)
const tangle = this.#peer.db.getTangle(rootID)
return tangle.slice(msgIDs, [])
}
/**
* Erase or delete low-depth msgs from the tangle `rootID`, preserving at
* least `count` high-depth msgs.
*
* @param {string} rootID
* @param {number} count
*/
@ -243,7 +349,10 @@ class Algorithm {
}
/**
* Filter out msgs I didn't actually ask for. "Trust but verify"
* Filter out msgs I didn't actually ask for. "Trust but verify". Also sort
* them by depth. Also sorts such that (not-this-tangle) account msgs are
* first.
*
* @param {string} rootID
* @param {Array<Msg>} msgs
* @param {Range} myWantRange
@ -254,6 +363,7 @@ class Algorithm {
const validNewMsgs = msgs
.filter((msg) => {
if (msg.metadata.account === 'self') return true
const depth = msg.metadata.tangles[rootID]?.depth ?? 0
if (depth === 0 && MsgV3.getMsgID(msg) !== rootID) {
return false // the rootMsg is the only acceptable depth-zero msg
@ -265,6 +375,10 @@ class Algorithm {
}
})
.sort((a, b) => {
const aAccount = a.metadata.account
const bAccount = b.metadata.account
if (aAccount === 'self' && bAccount !== 'self') return -1
if (aAccount !== 'self' && bAccount === 'self') return 1
const aDepth = a.metadata.tangles[rootID]?.depth ?? 0
const bDepth = b.metadata.tangles[rootID]?.depth ?? 0
return aDepth - bDepth
@ -274,6 +388,9 @@ class Algorithm {
}
/**
* Takes the new msgs and adds them to the database. Also performs pruning as
* post-processing.
*
* @param {string} rootID
* @param {Array<Msg>} newMsgs
* @param {Goal} goal
@ -287,7 +404,11 @@ class Algorithm {
// Add new messages
for (const msg of validNewMsgs) {
try {
await p(this.#peer.db.add)(msg, rootID) //, doneAdding())
if (msg.metadata.account === 'self') {
await p(this.#peer.db.add)(msg, null /* infer tangleID */)
} else {
await p(this.#peer.db.add)(msg, rootID)
}
} catch (err) {
debug('Commit failed to add msg in db: %o', err)
}
@ -297,42 +418,6 @@ class Algorithm {
return await this.pruneNewest(rootID, goal.count)
}
}
/**
* @param {string} rootID
* @param {Set<string>} msgIDs
* @returns {Array<Msg>}
*/
getTangleSlice(rootID, msgIDs) {
if (msgIDs.size === 0) return []
const tangle = this.#peer.db.getTangle(rootID)
const sorted = tangle.topoSort()
let oldestMsgID = null
for (const msgID of sorted) {
if (msgIDs.has(msgID)) {
oldestMsgID = msgID
break
}
}
if (oldestMsgID === null) {
throw new Error('No common msgID found in tangle given inputs')
}
const { erasables } = tangle.getDeletablesAndErasables(oldestMsgID)
const msgs = []
for (const msgID of sorted) {
let isErasable = erasables.has(msgID)
if (!msgIDs.has(msgID) && !isErasable) continue
const msg = this.#peer.db.get(msgID)
if (!msg) continue
if (isErasable) {
msgs.push({ ...msg, data: null })
} else {
msgs.push(msg)
}
}
return msgs
}
}
module.exports = Algorithm

View File

@ -198,7 +198,7 @@ class SyncStream extends Pipeable {
// prettier-ignore
this.#debug('%s Stream IN3: got remote want-range %o and bloom round 0 for %s', this.#myId, remoteWantRange, id)
this.#remoteWant.set(id, remoteWantRange)
const msgIDsForThem = this.#algo.msgsMissing(
const msgIDsForThem = this.#algo.getMsgsMissing(
id,
0,
remoteWantRange,
@ -230,7 +230,7 @@ class SyncStream extends Pipeable {
const remoteWantRange = this.#remoteWant.get(id)
if (!remoteWantRange) throw new Error('remote want-range not set')
this.#updateReceivableMsgs(id, msgIDsForMe)
const msgIDsForThem = this.#algo.msgsMissing(
const msgIDsForThem = this.#algo.getMsgsMissing(
id,
round - 1,
remoteWantRange,
@ -263,7 +263,7 @@ class SyncStream extends Pipeable {
const remoteWantRange = this.#remoteWant.get(id)
if (!remoteWantRange) throw new Error('remote want-range not set')
this.#updateReceivableMsgs(id, msgIDsForMe)
const msgIDsForThem = this.#algo.msgsMissing(
const msgIDsForThem = this.#algo.getMsgsMissing(
id,
round,
remoteWantRange,
@ -295,7 +295,7 @@ class SyncStream extends Pipeable {
const remoteWantRange = this.#remoteWant.get(id)
if (!remoteWantRange) throw new Error('remote want-range not set')
this.#updateReceivableMsgs(id, msgIDsForMe)
const msgIDsForThem = this.#algo.msgsMissing(
const msgIDsForThem = this.#algo.getMsgsMissing(
id,
round,
remoteWantRange,
@ -303,7 +303,9 @@ class SyncStream extends Pipeable {
)
this.#updateSendableMsgs(id, msgIDsForThem)
const msgIDs = this.#sendableMsgs.get(id) ?? new Set()
const msgs = this.#algo.getTangleSlice(id, msgIDs)
const tangleMsgs = this.#algo.getTangleMsgs(id, msgIDs)
const accountMsgs = this.#algo.filterAndFetchAccountMsgs(msgIDs)
const msgs = accountMsgs.concat(tangleMsgs)
const extras = this.#receivableMsgs.get(id)
const localWantRange = this.#localWant.get(id)
if (!localWantRange) throw new Error('local want-range not set')
@ -328,7 +330,7 @@ class SyncStream extends Pipeable {
this.#debug('%s Stream IN8: got bloom round %s plus %s msgs in %s', this.#myId, round, msgsForMe.length, id)
const remoteWantRange = this.#remoteWant.get(id)
if (!remoteWantRange) throw new Error('remote want-range not set')
const msgIDsForThem = this.#algo.msgsMissing(
const msgIDsForThem = this.#algo.getMsgsMissing(
id,
round,
remoteWantRange,
@ -336,7 +338,9 @@ class SyncStream extends Pipeable {
)
this.#updateSendableMsgs(id, msgIDsForThem)
const msgIDs = this.#sendableMsgs.get(id) ?? new Set()
const msgs = this.#algo.getTangleSlice(id, msgIDs)
const tangleMsgs = this.#algo.getTangleMsgs(id, msgIDs)
const accountMsgs = this.#algo.filterAndFetchAccountMsgs(msgIDs)
const msgs = accountMsgs.concat(tangleMsgs)
// prettier-ignore
this.#debug('%s Stream OUT9: send %s msgs in %s', this.#myId, msgs.length, id)
this.sink.write({ id, phase: 9, payload: msgs })
@ -397,9 +401,11 @@ class SyncStream extends Pipeable {
*/
#sendMsgsInRemoteWant(id, remoteWantRange) {
const msgs = []
for (const msg of this.#algo.yieldMsgsIn(id, remoteWantRange)) {
msgs.push(msg)
}
const rangeMsgs = this.#algo.getMsgsInRange(id, remoteWantRange)
const tangleMsgs = this.#algo.getTangleMsgs(id, rangeMsgs)
const accountMsgs = this.#algo.getAccountMsgsFor(tangleMsgs)
for (const msg of accountMsgs) msgs.push(msg)
for (const msg of tangleMsgs) msgs.push(msg)
// prettier-ignore
this.#debug('%s Stream OUT9: send %s msgs in %s', this.#myId, msgs.length, id)
this.sink.write({ id, phase: 9, payload: msgs })

View File

@ -1,7 +1,11 @@
The bloom filter is a representation of msgs I already have in my want-range,
so you know you can (probably?) skip sending them to me.
For each given tangle, peers exchange ranges (tuples `[minDepth, maxDepth]`)
"haveRange" and "wantRange". Then each peer creates a bloom filter representing
the msgs they currently have in their wantRange, and these such bloom filters.
Based on the remote peer's bloom filter, they exchange msgs that appear to be
missing. The bloom filter is a representation of msgs I already have in my want-range,
so you know you can (probably?) skip sending them to me. The "probably?" uncertainty is reduced by doing several rounds of such exchange. In the end, each peer knows with high certainty which msgs the other peer is missing in their declared want-range, and thus exchange such msgs.
The "probably?" uncertainty is reduced by doing several rounds.
In the process, associated account msgs are included even though the tangle being replicated might not be an account tangle. This is because validation of a received tangle msg may require the account msgs.
```mermaid
@ -54,8 +58,9 @@ Note over A: commit(aliceMsgs)
Note over A: bobMiss2 := msgsMissing(T, 2, bobWant, bobBF2)
Note over A: bobMiss := bobMiss0 + bobMiss1 + bobMiss2
Note over A: bobMsgs := tangleSlice(T, bobMiss)
A->>B: Phase 9: Send T and bobMsgs
Note over B: commit(bobMsgs)
Note over A: msgs := bobMsgs + associatedAccountMsgs(bobMsgs)
A->>B: Phase 9: Send T and msgs
Note over B: commit(msgs)
```
Methods:
@ -74,10 +79,19 @@ getHaveRange(tangleID) -> [minDepth, maxDepth]
getWantRange(localHaveRange, remoteHaveRange, goal) -> [minDepth, maxDepth]
```
```
/**
* For each `msg` in `msgs`, pick the set of msgs from the tangle `msg.metadata.account` (up to `msg.metadata.accountTips`), then combine together all these subsets.
* Returns all such account msgs.
*/
associatedAccountMsgs(msgs)
```
```
/**
* Creates a serialized bloom filter containing the identifiers `${round}${msgID}` for:
* - Each msg in the tangle `tangleID` within depth `range` (inclusive)
* - Each msg in associatedAccountMsgs(tangle msgs above)
* - Each "ghost" msg ID for this tangle
* - Each "extra" msg ID from `extraMsgIDs`
*/
@ -89,6 +103,7 @@ bloomFor(tangleId, round, range, extraMsgIDs) -> Bloom
* Returns the msg IDs in the tangle `tangleID` which satisfy:
* - `msg.metadata.tangles[tangleID].depth` within `range` (inclusive)
* - `${round}${msgID}` not in `bloom`
* Plus msg IDs of associatedAccountMsgs(tangle msgs above)
*/
msgsMissing(tangleID, round, range, bloom) -> Array<MsgID>
```

View File

@ -6,6 +6,162 @@ const Algorithm = require('../lib/algorithm')
const { createPeer } = require('./util')
const carolKeypair = Keypair.generate('ed25519', 'carol')
const bobKeypair2 = Keypair.generate('ed25519', 'bob2')
test('sync a feed without pre-knowing the owner account', async (t) => {
const alice = createPeer({ name: 'alice' })
const bob = createPeer({ name: 'bob' })
await alice.db.loaded()
await bob.db.loaded()
const bobID = await p(bob.db.account.create)({
subdomain: 'account',
_nonce: 'bob',
})
for (let i = 1; i <= 5; i++) {
await p(bob.db.feed.publish)({
account: bobID,
domain: 'post',
data: { text: 'm' + i },
})
}
assert('bob published posts 1..5')
const bobPostsID = bob.db.feed.getID(bobID, 'post')
{
const arr = [...alice.db.msgs()]
.filter((msg) => msg.metadata.account === bobID && msg.data)
.map((msg) => msg.data.text)
assert.deepEqual(arr, [], 'alice has no posts from bob')
}
bob.goals.set(bobPostsID, 'all')
alice.goals.set(bobPostsID, 'all')
const remoteAlice = await p(bob.connect)(alice.getAddress())
assert('bob connected to alice')
bob.sync.start()
await p(setTimeout)(1000)
assert('sync!')
{
const arr = [...alice.db.msgs()]
.filter((msg) => msg.metadata.account === bobID && msg.data)
.map((msg) => msg.data.text)
assert.deepEqual(
arr,
['m1', 'm2', 'm3', 'm4', 'm5'],
'alice has posts 1..5 from bob'
)
}
await p(remoteAlice.close)(true)
await p(alice.close)(true)
await p(bob.close)(true)
})
test('sync a feed with updated msgs from new account keypair', async (t) => {
const alice = createPeer({ name: 'alice' })
const bob = createPeer({ name: 'bob' })
await alice.db.loaded()
await bob.db.loaded()
const bobID = await p(bob.db.account.create)({
subdomain: 'account',
_nonce: 'bob',
})
for (let i = 1; i <= 5; i++) {
await p(bob.db.feed.publish)({
account: bobID,
domain: 'post',
data: { text: 'm' + i },
})
}
assert('bob published posts 1..5')
const bobPostsID = bob.db.feed.getID(bobID, 'post')
{
const arr = [...alice.db.msgs()]
.filter((msg) => msg.metadata.account === bobID && msg.data)
.map((msg) => msg.data.text)
assert.deepEqual(arr, [], 'alice has no posts from bob')
}
bob.goals.set(bobPostsID, 'all')
alice.goals.set(bobPostsID, 'all')
const remoteAlice = await p(bob.connect)(alice.getAddress())
assert('bob connected to alice')
bob.sync.start()
await p(setTimeout)(1000)
assert('sync!')
{
const arr = [...alice.db.msgs()]
.filter((msg) => msg.metadata.account === bobID && msg.data)
.map((msg) => msg.data.text)
assert.deepEqual(
arr,
['m1', 'm2', 'm3', 'm4', 'm5'],
'alice has posts 1..5 from bob'
)
}
await p(remoteAlice.close)(true)
// --------------------------------------------
// Bob adds a new keypair and published with it
// --------------------------------------------
const consent = bob.db.account.consent({
account: bobID,
keypair: bobKeypair2,
})
await p(bob.db.account.add)({
account: bobID,
keypair: bobKeypair2,
consent,
powers: [],
})
for (let i = 6; i <= 7; i++) {
await p(bob.db.feed.publish)({
account: bobID,
keypair: bobKeypair2,
domain: 'post',
data: { text: 'm' + i },
})
}
assert('bob with new keypair published posts 6..7')
const remoteAlice2 = await p(bob.connect)(alice.getAddress())
assert('bob connected to alice')
bob.sync.start()
await p(setTimeout)(1000)
assert('sync!')
{
const arr = [...alice.db.msgs()]
.filter((msg) => msg.metadata.account === bobID && msg.data)
.map((msg) => msg.data.text)
assert.deepEqual(
arr,
['m1', 'm2', 'm3', 'm4', 'm5', 'm6', 'm7'],
'alice has posts 1..7 from bob'
)
}
await p(remoteAlice2.close)(true)
await p(alice.close)(true)
await p(bob.close)(true)
})
test('sync a feed with goal=all', async (t) => {
const alice = createPeer({ name: 'alice' })