Compare commits

...

72 Commits
rev1 ... master

Author SHA1 Message Date
Powersource 89401de0c6 Merge pull request 'Add test for low newest goals' (#14) from low-newest into master
Reviewed-on: https://codeberg.org/pzp/pzp-sync/pulls/14
2024-06-28 11:04:37 +00:00
Jacob Karlsson 39f8cca208 Remove log 2024-06-28 13:02:38 +02:00
Jacob Karlsson cb557ef15a Add test for low newest goals 2024-06-28 12:54:28 +02:00
Jacob Karlsson 779d7aab31 1.0.4 2024-06-09 17:43:04 +02:00
Jacob Karlsson 8f2bd08a60 Don't throw on missing local want range 2024-06-09 17:42:24 +02:00
Jacob Karlsson a517fd465c 1.0.3 2024-06-05 17:30:11 +02:00
Jacob Karlsson 2b74aa6b1a Fix not checking null on dbtangle call 2024-06-05 17:29:54 +02:00
Jacob Karlsson b851d3eaf1 1.0.2 2024-05-29 17:29:21 +02:00
Powersource dbb4bb28e6 Merge pull request 'Handle missing remoteWants without throwing' (#9) from missing-remotewant into master
Reviewed-on: https://codeberg.org/pzp/pzp-sync/pulls/9
2024-05-29 15:27:51 +00:00
Jacob Karlsson 5797116a87 Handle missing remoteWants without throwing 2024-05-29 17:17:47 +02:00
Jacob Karlsson c6d142dd0c 1.0.1 2024-05-29 15:13:06 +02:00
Powersource 5410d7a87c Merge pull request 'Add realtime sync test with 100 msgs' (#6) from 100-realtime into master
Reviewed-on: https://codeberg.org/pzp/pzp-sync/pulls/6
2024-05-29 13:11:30 +00:00
Jacob Karlsson 87b6b25685 Fix failing realtime test 2024-05-29 14:56:20 +02:00
Jacob Karlsson 636cd9adcc Add failing test where we don't create a message before realtime sync 2024-05-29 14:48:40 +02:00
Jacob Karlsson e199eb97d3 Fix realtime 100 test with a hack 2024-05-28 15:38:30 +02:00
Jacob Karlsson cbf12a5e8c Start adding realtime test with 100 msgs 2024-05-23 18:46:36 +02:00
Jacob Karlsson c1a5b0a9e2 1.0.0 2024-05-05 17:27:07 +02:00
Powersource 661d8d8b0b Merge pull request 'Use async db fns and rename to pzp' (#2) from async-db into master
Reviewed-on: https://codeberg.org/pzp/pzp-sync/pulls/2
2024-05-05 15:21:18 +00:00
Jacob Karlsson 358b673c5a Add codeberg ci 2024-05-05 16:48:00 +02:00
Jacob Karlsson 6a8ed87f6f Remove todo comment in favor of #3 2024-05-04 18:54:32 +02:00
Jacob Karlsson 7e56b024c9 Use async db fns and rename to pzp 2024-05-04 15:54:04 +02:00
Powersource 5fef427ebb
Validate all incoming payloads in Stream (#6) 2024-03-13 13:04:45 +01:00
Andre Staltz 93f00dbd04
Test that dataless msgs can be replaced by dataful 2024-03-05 17:07:33 +02:00
Andre Staltz 47eb2dd27f
realtime sync works in both connection directions 2024-02-29 11:30:39 +02:00
Andre Staltz 54aa67a08f
realtime sync 2024-02-22 16:37:28 +02:00
Andre Staltz e9971410eb
Improve error messages in Stream 2024-02-13 15:36:57 +02:00
Andre Staltz 1d3169724d
package.json ppppp deps specify commit tag 2024-01-31 13:49:18 +02:00
Andre Staltz 0bc100557e
treat cases where getTangle() is null 2024-01-30 17:50:09 +02:00
Andre Staltz 2a4f39a832
minor refactor 2024-01-23 18:24:38 +02:00
Andre Staltz cdbc2de15e
add tests for empty dict and set 2024-01-16 16:17:15 +02:00
Andre Staltz 6e54ca2724
close connect() stream on rpc closed 2024-01-14 13:09:44 +02:00
Andre Staltz ae3d1d83d8
update secret-stack to 8.1 2024-01-08 11:33:45 +02:00
Andre Staltz 26dcefc459
update plugin assertion errors 2024-01-02 13:13:27 +02:00
Andre Staltz 0cd2a56fc8
update secret-stack to 8.0 2023-12-29 13:00:45 +02:00
Andre Staltz d7a5c2694f
update to msg-v4 2023-12-25 12:31:55 +02:00
Andre Staltz dd8499cf91
handle "none" goals 2023-12-19 16:29:42 +02:00
Andre Staltz f4ab599bd1
support start() before peers connect 2023-12-16 16:55:45 +02:00
Andre Staltz 6a9f46b337
account msgs piggyback on tangle msg sync 2023-12-16 16:18:23 +02:00
Andre Staltz 33ef08c62b
update protospec 2023-12-14 14:00:39 +02:00
Andre Staltz 6ba8d55dde
fix stream phase 9 again 2023-12-13 16:15:29 +02:00
Andre Staltz d1ccc3426e
fix stream phase 9 when no msgs received 2023-12-13 16:08:42 +02:00
Andre Staltz 782b435a98
better secret-stack plugin shape 2023-12-13 15:14:36 +02:00
Andre Staltz 0ae7704994
update shse 2023-11-28 10:09:09 +02:00
Andre Staltz fd559349f2
ci: use node.js 18 and 20 2023-11-28 10:08:17 +02:00
Andre Staltz 4b11ab9d9c
fix tests 2023-11-28 10:08:05 +02:00
Andre Staltz 6aeaf896de
update readme 2023-11-24 15:09:26 +02:00
Andre Staltz e7bc0e1918
update db, rename to ppppp-sync 2023-11-24 15:08:48 +02:00
Andre Staltz ecd604a46f
support goal=dict and goal=set 2023-11-03 13:34:02 +02:00
Andre Staltz 4c29073028
replace ppppp-record with ppppp-dict 2023-10-26 13:27:23 +03:00
Andre Staltz 0bc405739e
update a test 2023-10-26 12:55:27 +03:00
Andre Staltz 4218dabb60
can replicate record tangles containing ghosts 2023-10-25 19:35:27 +03:00
Andre Staltz 31ed9d0036
cosmetic refactor 2023-10-25 15:33:44 +03:00
Andre Staltz 574d43f1a6
fix algorithm wantRange plus test it 2023-10-25 14:13:15 +03:00
Andre Staltz 20be886872
remove goal "oldest" 2023-10-12 15:31:41 +03:00
Andre Staltz 4637a24960
fix calculation of wantRange for newest goal
We shouldn't use maxLocalHave+1 because we are interested in
EVERYTHING the remote peer has, as long as it is inside the "size"
defined for this goal. Bloom filters will then make sure we are
not getting what we already have in this range.
2023-10-12 13:13:52 +03:00
Andre Staltz eff0c57517
update goals API, add type checking 2023-09-29 14:48:54 +03:00
Andre Staltz 30b5e7cf8a
remove unused dep 2023-09-29 12:18:50 +03:00
Andre Staltz 8e43c0efab
replace internal goals with ppppp-goals 2023-09-08 16:42:27 +03:00
Andre Staltz c4bd7f9b49
improve test of feed sync when pruning range hole 2023-09-08 13:47:21 +03:00
Andre Staltz cfb186c0fe
refactor algorithm commit() 2023-09-08 13:46:54 +03:00
Andre Staltz fc30b3e2d2
refactor range 2023-09-08 13:46:18 +03:00
Andre Staltz 317f7c0a05
update to new ppppp-db with rec.id 2023-09-07 17:31:11 +03:00
Andre Staltz f6f16bdd47
rename identity to account 2023-08-10 10:40:11 +03:00
Andre Staltz ffb8860fb8
use cutting-edge secret-stack with bare and shse.pubkey 2023-07-14 13:35:38 +03:00
Andre Staltz 403fcefc3a
improve logic when an RPC connect is detected 2023-07-13 11:15:16 +03:00
Andre Staltz af5ac3435d
replace tape with node:test 2023-07-11 11:08:53 +03:00
Andre Staltz 07f83677d4
update to msg-v3 format 2023-07-11 11:02:22 +03:00
Andre Staltz 17b720cb60
use ppppp-db latest 2023-06-05 11:31:44 +03:00
Andre Staltz 7a84f8b51e
tests for identity tangle sync 2023-05-26 14:46:08 +03:00
Andre Staltz 1a2d73cf8d
use feed.getId() in tests 2023-05-26 14:45:42 +03:00
Andre Staltz 82fead6c2d
update tests to not directly use MsgV2 APIs 2023-05-26 14:14:11 +03:00
Andre Staltz 32c5b903b1
update to support msg-v2 2023-05-26 13:54:32 +03:00
21 changed files with 2610 additions and 766 deletions

View File

@ -1,25 +0,0 @@
name: CI
on:
push:
branches: [master]
pull_request:
branches: [master]
jobs:
test:
runs-on: ubuntu-latest
timeout-minutes: 10
strategy:
matrix:
node-version: [16.x, 18.x]
steps:
- uses: actions/checkout@v3
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v3
with:
node-version: ${{ matrix.node-version }}
- run: npm install
- run: npm test

1
.gitignore vendored
View File

@ -4,6 +4,7 @@ pnpm-lock.yaml
package-lock.json package-lock.json
coverage coverage
*~ *~
lib/*.d.ts
# For misc scripts and experiments: # For misc scripts and experiments:
/gitignored /gitignored

13
.woodpecker.yaml Normal file
View File

@ -0,0 +1,13 @@
matrix:
NODE_VERSION:
- 18
- 20
steps:
test:
when:
event: [push]
image: node:${NODE_VERSION}
commands:
- npm install
- npm test

View File

@ -1,9 +1,13 @@
**Work in progress** # pzp-sync
PZP replication using Kleppmann's hash graph sync
https://martin.kleppmann.com/2020/12/02/bloom-filter-hash-graph-sync.html
https://arxiv.org/abs/2012.00472
## Installation ## Installation
We're not on npm yet. In your package.json, include this as ```
npm install pzp-sync
```js
"ppppp-tangle-sync": "github:staltz/ppppp-tangle-sync"
``` ```

View File

@ -1,17 +1,24 @@
const p = require('promisify-4loc')
const { BloomFilter } = require('bloom-filters') const { BloomFilter } = require('bloom-filters')
const FeedV1 = require('ppppp-db/feed-v1') const MsgV4 = require('pzp-db/msg-v4')
const p = require('util').promisify const makeDebug = require('debug')
const { isEmptyRange, estimateMsgCount } = require('./range') const debug = makeDebug('pzp:sync')
const { parseGoal } = require('./goal') const { EMPTY_RANGE, isEmptyRange, estimateMsgCount } = require('./range')
/** /**
* @typedef {ReturnType<import('pzp-db').init>} PZPDB
* @typedef {ReturnType<import('pzp-dict').init>} PZPDict
* @typedef {ReturnType<import('pzp-set').init>} PZPSet
* @typedef {import('pzp-db/msg-v4').Msg} Msg
* @typedef {import('pzp-db/db-tangle')} DBTangle
* @typedef {import('pzp-goals').Goal} Goal
* @typedef {import('./range').Range} Range * @typedef {import('./range').Range} Range
* @typedef {string} MsgID
*/ */
/** /**
* @typedef {import('./goal').Goal} Goal * @param {Iterable<unknown>} iter
*/ */
function countIter(iter) { function countIter(iter) {
let count = 0 let count = 0
for (const _ of iter) count++ for (const _ of iter) count++
@ -19,24 +26,33 @@ function countIter(iter) {
} }
class Algorithm { class Algorithm {
/** @type {ConstructorParameters<typeof Algorithm>[0]} */
#peer #peer
/** @param {{ db: PZPDB, dict: PZPDict, set: PZPSet }} peer */
constructor(peer) { constructor(peer) {
this.#peer = peer this.#peer = peer
} }
haveRange(rootMsgHash) { /**
const rootMsg = this.#peer.db.get(rootMsgHash) * Calculates the range ([minDepth, maxDepth]) of msgs that the peer has for
if (!rootMsg) return [1, 0] * the given tangle known by the `rootID`.
*
* @param {string} rootID
* @returns {Promise<Range>}
*/
async haveRange(rootID) {
const rootMsg = await p(this.#peer.db.get)(rootID)
if (!rootMsg) return EMPTY_RANGE
let minDepth = Number.MAX_SAFE_INTEGER let minDepth = Number.MAX_SAFE_INTEGER
let maxDepth = 0 let maxDepth = 0
for (const rec of this.#peer.db.records()) { for await (const rec of this.#peer.db.records()) {
if (!rec.msg?.content) continue if (!rec?.msg?.data && rec.id !== rootID) continue
const tangles = rec.msg.metadata.tangles const tangles = rec.msg.metadata.tangles
if (rec.hash === rootMsgHash) { if (rec.id === rootID) {
minDepth = 0 minDepth = 0
} else if (tangles[rootMsgHash]) { } else if (tangles[rootID]) {
const depth = tangles[rootMsgHash].depth const depth = tangles[rootID].depth
minDepth = Math.min(minDepth, depth) minDepth = Math.min(minDepth, depth)
maxDepth = Math.max(maxDepth, depth) maxDepth = Math.max(maxDepth, depth)
} }
@ -45,6 +61,9 @@ class Algorithm {
} }
/** /**
* Calculates the range ([minDepth, maxDepth]) of msgs that the peer wants,
* given the goal "all" and local and remote have ranges.
*
* @param {Range} localHaveRange * @param {Range} localHaveRange
* @param {Range} remoteHaveRange * @param {Range} remoteHaveRange
* @returns {Range} * @returns {Range}
@ -54,6 +73,10 @@ class Algorithm {
} }
/** /**
* Calculates the range ([minDepth, maxDepth]) of msgs that the peer wants,
* given the goal "newest" (alongside with a `count` parameter) and local and
* remote have ranges.
*
* @param {Range} localHaveRange * @param {Range} localHaveRange
* @param {Range} remoteHaveRange * @param {Range} remoteHaveRange
* @param {number} count * @param {number} count
@ -62,173 +85,329 @@ class Algorithm {
#wantNewestRange(localHaveRange, remoteHaveRange, count) { #wantNewestRange(localHaveRange, remoteHaveRange, count) {
const [minLocalHave, maxLocalHave] = localHaveRange const [minLocalHave, maxLocalHave] = localHaveRange
const [minRemoteHave, maxRemoteHave] = remoteHaveRange const [minRemoteHave, maxRemoteHave] = remoteHaveRange
if (maxRemoteHave <= maxLocalHave) return [1, 0] if (maxRemoteHave < minLocalHave) return EMPTY_RANGE
const maxWant = maxRemoteHave const maxWant = maxRemoteHave
const size = Math.max(maxWant - maxLocalHave, count) const size = count > maxWant - maxLocalHave ? count : maxWant - maxLocalHave
const minWant = Math.max(maxWant - size, maxLocalHave + 1, minRemoteHave) const minWant = Math.max(
maxWant - size + 1,
maxLocalHave - size + 1,
minRemoteHave
)
return [minWant, maxWant] return [minWant, maxWant]
} }
/** /**
* @param {Range} localHaveRange * Calculates the range ([minDepth, maxDepth]) of msgs that the peer wants,
* given the goal "dict" or "set" and local and remote have ranges.
*
* @param {number} minGhostDepth
* @param {Range} remoteHaveRange * @param {Range} remoteHaveRange
* @param {number} count
* @returns {Range} * @returns {Range}
*/ */
#wantOldestRange(localHaveRange, remoteHaveRange, count) { #wantDictOrSetRange(minGhostDepth, remoteHaveRange) {
// TODO: implement const [minRemoteHave, maxRemoteHave] = remoteHaveRange
throw new Error('not implemented') if (maxRemoteHave < minGhostDepth) return EMPTY_RANGE
const maxWant = maxRemoteHave
const minWant = Math.max(minGhostDepth, minRemoteHave)
return [minWant, maxWant]
} }
/** /**
* Calculates the range ([minDepth, maxDepth]) of msgs that the peer wants,
* given a `goal`.
*
* @param {Range} localHave * @param {Range} localHave
* @param {Range} remoteHave * @param {Range} remoteHave
* @param {Goal?} goal * @param {Goal?} goal
* @returns {Range} * @returns {Promise<Range>}
*/ */
wantRange(localHave, remoteHave, goal) { async wantRange(localHave, remoteHave, goal) {
if (!goal) return [1, 0] if (!goal) return EMPTY_RANGE
if (isEmptyRange(remoteHave)) return [1, 0] if (isEmptyRange(remoteHave)) return EMPTY_RANGE
const { type, count } = parseGoal(goal)
if (type === 'all') { switch (goal.type) {
return this.#wantAllRange(localHave, remoteHave) case 'all':
} else if (type === 'newest') { return this.#wantAllRange(localHave, remoteHave)
return this.#wantNewestRange(localHave, remoteHave, count)
} else if (type === 'oldest') { case 'dict':
return this.#wantOldestRange(localHave, remoteHave, count) const minDictGhostDepth = await p(this.#peer.dict.minGhostDepth)(goal.id)
return this.#wantDictOrSetRange(minDictGhostDepth, remoteHave)
case 'set':
const minSetGhostDepth = await p(this.#peer.set.minGhostDepth)(goal.id)
return this.#wantDictOrSetRange(minSetGhostDepth, remoteHave)
case 'newest':
return this.#wantNewestRange(localHave, remoteHave, goal.count)
case 'none':
return EMPTY_RANGE
default:
throw new Error(`Unrecognized goal type: ${goal.type}`)
} }
} }
bloomFor(rootMsgHash, round, range, extraIds = []) { /**
* Returns a bloom filter that represents the msgs that this peer has in the
* database, matching the given tangle `rootID` and `range`. The `round` is
* used to identify bloom filter items from different rounds.
*
* The bloom filter also includes account msgs that are outside the tangle
* `rootID`, but required for validation of tangle `rootID` msgs.
*
* @param {string} rootID
* @param {number} round
* @param {Range} range
* @param {Iterable<string>} extraIds
* @returns {Promise<JSON>}
*/
async bloomFor(rootID, round, range, extraIds = []) {
const filterSize = const filterSize =
(isEmptyRange(range) ? 2 : estimateMsgCount(range)) + countIter(extraIds) (isEmptyRange(range) ? 2 : estimateMsgCount(range)) + countIter(extraIds)
const filter = BloomFilter.create(2 * filterSize, 0.00001) const filter = BloomFilter.create(2 * filterSize, 0.00001)
if (!isEmptyRange(range)) { if (!isEmptyRange(range)) {
for (const msg of this.yieldMsgsIn(rootMsgHash, range)) { const rangeMsgs = await this.getMsgsInRange(rootID, range)
filter.add('' + round + FeedV1.getMsgHash(msg)) const accountMsgs = await this.getAccountMsgsFor(rangeMsgs)
for (const msg of accountMsgs.concat(rangeMsgs)) {
filter.add('' + round + MsgV4.getMsgID(msg))
} }
} }
for (const msgId of extraIds) { const ghosts = this.#peer.db.ghosts.get(rootID)
filter.add('' + round + msgId) for (const ghostMsgID of ghosts) {
// No need to check depths because the `range` is by definition taking
// into account local ghost depths
filter.add('' + round + ghostMsgID)
}
for (const msgID of extraIds) {
filter.add('' + round + msgID)
} }
return filter.saveAsJSON() return filter.saveAsJSON()
} }
msgsMissing(rootMsgHash, round, range, remoteBloomJSON) { /**
* Returns msg IDs for msgs that are missing in the remote peer's database for
* the tangle `rootID` within `range`, judging by the given `remoteBloomJSON`
* (and `round`) bloom filter.
*
* This may also contain account msgs that are outside the tangle `rootID`,
* but required to validate the msgs in that tangle.
*
* @param {string} rootID
* @param {number} round
* @param {Range} range
* @param {JSON} remoteBloomJSON
* @returns {Promise<Array<MsgID>>}
*/
async getMsgsMissing(rootID, round, range, remoteBloomJSON) {
if (isEmptyRange(range)) return [] if (isEmptyRange(range)) return []
const remoteFilter = BloomFilter.fromJSON(remoteBloomJSON) const remoteFilter = BloomFilter.fromJSON(remoteBloomJSON)
const missing = [] const missing = []
for (const msg of this.yieldMsgsIn(rootMsgHash, range)) { const rangeMsgs = await this.getMsgsInRange(rootID, range)
const msgHash = FeedV1.getMsgHash(msg) const accountMsgs = await this.getAccountMsgsFor(rangeMsgs)
if (!remoteFilter.has('' + round + msgHash)) { for (const msg of accountMsgs.concat(rangeMsgs)) {
missing.push(msgHash) const msgID = MsgV4.getMsgID(msg)
if (!remoteFilter.has('' + round + msgID)) {
missing.push(msgID)
} }
} }
return missing return missing
} }
*yieldMsgsIn(rootMsgHash, range) { /**
const [minDepth, maxDepth] = range * Returns an array of account msgs that are required for validating the given
const rootMsg = this.#peer.db.get(rootMsgHash) * `msgs`.
if (!rootMsg) return *
if (minDepth === 0) yield rootMsg * @param {Array<Msg>} msgs
for (const msg of this.#peer.db.msgs()) { * @returns {Promise<Array<Msg>>}
const tangles = msg.metadata.tangles */
if ( async getAccountMsgsFor(msgs) {
tangles[rootMsgHash] && const accountTips = /** @type {Map<MsgID, Set<string>>} */ (new Map())
tangles[rootMsgHash].depth >= minDepth && for (const msg of msgs) {
tangles[rootMsgHash].depth <= maxDepth if (MsgV4.isFeedMsg(msg)) {
) { const set = accountTips.get(msg.metadata.account) ?? new Set()
yield msg for (const tip of msg.metadata.accountTips) {
set.add(tip)
}
accountTips.set(msg.metadata.account, set)
} }
} }
}
async pruneNewest(rootMsgHash, count) { const accountMsgs = []
const tangle = this.#peer.db.getTangle(rootMsgHash) for (const [accountID, tips] of accountTips) {
const sorted = tangle.topoSort() const accountTangle = await p(this.#peer.db.getTangle)(accountID)
if (sorted.length <= count) return if (!accountTangle) continue
const msgHash = sorted[sorted.length - count] accountMsgs.push(...(await accountTangle.slice([], [...tips])))
const { deletables, erasables } = tangle.getDeletablesAndErasables(msgHash)
const del = p(this.#peer.db.del)
const erase = p(this.#peer.db.erase)
for (const msgHash of deletables) {
await del(msgHash)
} }
for (const msgHash of erasables) { return accountMsgs
await erase(msgHash)
}
}
async commit(rootMsgHash, newMsgs, goal, myWantRange) {
// Filter out contentful newMsgs that are not in my want-range
const [minWant, maxWant] = myWantRange
const validNewMsgs = newMsgs
.filter((msg) => {
if (!msg.content) return true // contentless messages are always valid
const depth = msg.metadata.tangles[rootMsgHash]?.depth ?? 0
if (depth === 0 && FeedV1.getMsgHash(msg) !== rootMsgHash) {
return false // the rootMsg is the only acceptable depth-zero msg
}
return minWant <= depth && depth <= maxWant
})
.sort((a, b) => {
const aDepth = a.metadata.tangles[rootMsgHash]?.depth ?? 0
const bDepth = b.metadata.tangles[rootMsgHash]?.depth ?? 0
return aDepth - bDepth
})
// Simulate adding this whole tangle, and check if it's valid
let err
if ((err = this.#peer.db.validateTangle(rootMsgHash, validNewMsgs))) {
throw err
}
// Add new messages TODO: optimize perf, avoiding await / try / catch
for (const msg of newMsgs) {
try {
await p(this.#peer.db.add)(msg, rootMsgHash)
} catch {}
}
// Prune. Ideally this should be in a garbage collection module
const { type, count } = parseGoal(goal)
if (type === 'newest') return await this.pruneNewest(rootMsgHash, count)
if (type === 'oldest') throw new Error('not implemented') // TODO:
} }
/** /**
* @param {string} rootMsgHash * Among the given `msgIDs`, find those that are account msgs and return them
* @param {Set<string>} msgHashes * as msgs.
* @returns *
* @param {Iterable<MsgID>} msgIDs
* @returns {Promise<Array<Msg>>}
*/ */
getTangleSlice(rootMsgHash, msgHashes) { async filterAndFetchAccountMsgs(msgIDs) {
if (msgHashes.size === 0) return [] const accountMsgs = []
const tangle = this.#peer.db.getTangle(rootMsgHash) for (const msgID of msgIDs) {
const sorted = tangle.topoSort() const msg = await p(this.#peer.db.get)(msgID)
let oldestMsgHash = null if (msg?.metadata.account === 'self') {
for (const msgHash of sorted) { accountMsgs.push(msg)
if (msgHashes.has(msgHash)) {
oldestMsgHash = msgHash
break
} }
} }
const { erasables } = tangle.getDeletablesAndErasables(oldestMsgHash) return accountMsgs
}
/**
* Returns msgs that have a depth within the given `range` for the tangle
* `rootID`.
*
* @param {string} rootID
* @param {Range} range
* @returns {Promise<Array<Msg>>}
*/
async getMsgsInRange(rootID, range) {
const [minDepth, maxDepth] = range
const rootMsg = await p(this.#peer.db.get)(rootID)
if (!rootMsg) return []
const msgs = [] const msgs = []
for (const msgHash of sorted) { if (minDepth === 0) {
let isErasable = erasables.includes(msgHash) msgs.push(rootMsg)
if (!msgHashes.has(msgHash) && !isErasable) continue }
const msg = this.#peer.db.get(msgHash) const tangle = await p(this.#peer.db.getTangle)(rootID)
if (!msg) continue if (!tangle) return msgs
if (isErasable) { for (const msg of await tangle.slice()) {
msgs.push({ ...msg, content: null }) const depth = msg.metadata.tangles[rootID]?.depth ?? 0
} else { if (depth >= minDepth && depth <= maxDepth) {
msgs.push(msg) msgs.push(msg)
} }
} }
return msgs return msgs
} }
/**
* Given the input msgs (or msg IDs), return those that are part of the tangle
* `rootID`, plus dataless msgs part of a trail to the tangle root, including
* the root itself.
*
* @param {string} rootID
* @param {Set<string> | Array<Msg>} msgs
* @returns {Promise<Array<Msg>>}
*/
async getTangleMsgs(rootID, msgs) {
if (Array.isArray(msgs) && msgs.length === 0) return []
if (!Array.isArray(msgs) && msgs.size === 0) return []
const msgIDs = [...msgs].map((m) =>
typeof m === 'string' ? m : MsgV4.getMsgID(m)
)
const tangle = await p(this.#peer.db.getTangle)(rootID)
if (!tangle) return []
return tangle.slice(msgIDs, [])
}
/**
* Erase or delete low-depth msgs from the tangle `rootID`, preserving at
* least `count` high-depth msgs.
*
* @param {string} rootID
* @param {number} count
*/
async pruneNewest(rootID, count) {
/** @type {DBTangle | null} */
const tangle = await p(this.#peer.db.getTangle)(rootID)
if (!tangle) return
const sorted = tangle.topoSort()
if (sorted.length <= count) return
const msgID = sorted[sorted.length - count] // New "oldest dataful msg"
const deletablesErasables = tangle.getDeletablesAndErasables(msgID)
if (!deletablesErasables) return
const { deletables, erasables } = deletablesErasables
const del = p(this.#peer.db.del)
const erase = p(this.#peer.db.erase)
for (const msgID of deletables) {
await del(msgID)
}
for (const msgID of erasables) {
await erase(msgID)
}
}
/**
* Filter out msgs I didn't actually ask for. "Trust but verify". Also sort
* them by depth. Also sorts such that (not-this-tangle) account msgs are
* first.
*
* @param {string} rootID
* @param {Array<Msg>} msgs
* @param {Range} myWantRange
* @returns {Array<Msg>}
*/
filterReceivedMsgs(rootID, msgs, myWantRange) {
const [minWant, maxWant] = myWantRange
const validNewMsgs = msgs
.filter((msg) => {
if (msg.metadata.account === 'self') return true
const depth = msg.metadata.tangles[rootID]?.depth ?? 0
if (depth === 0 && MsgV4.getMsgID(msg) !== rootID) {
return false // the rootMsg is the only acceptable depth-zero msg
}
if (!msg.data) {
return true
} else {
return minWant <= depth
}
})
.sort((a, b) => {
const aAccount = a.metadata.account
const bAccount = b.metadata.account
if (aAccount === 'self' && bAccount !== 'self') return -1
if (aAccount !== 'self' && bAccount === 'self') return 1
const aDepth = a.metadata.tangles[rootID]?.depth ?? 0
const bDepth = b.metadata.tangles[rootID]?.depth ?? 0
return aDepth - bDepth
})
return validNewMsgs
}
/**
* @param {Array<Msg>} msgs
*/
getMsgIDs(msgs) {
return msgs.map((msg) => MsgV4.getMsgID(msg))
}
/**
* Takes the new msgs and adds them to the database. Also performs pruning as
* post-processing.
*
* @param {string} rootID
* @param {Array<Msg>} newMsgs
* @param {Goal} goal
*/
async commit(rootID, newMsgs, goal) {
// TODO: Simulate adding this whole tangle, and check if it's valid
// Add new messages
for (const msg of newMsgs) {
try {
if (msg.metadata.account === 'self') {
await p(this.#peer.db.add)(msg, null /* infer tangleID */)
} else {
await p(this.#peer.db.add)(msg, rootID)
}
} catch (err) {
debug('Commit failed to add msg in db: %o', err)
}
}
if (goal.type === 'newest') {
return await this.pruneNewest(rootID, goal.count)
}
}
} }
module.exports = Algorithm module.exports = Algorithm

View File

@ -1,53 +0,0 @@
/**
* @typedef {'all'} GoalAll
*/
/**
* @typedef {`newest-${number}`} GoalNewest
*/
/**
* @typedef {`oldest-${number}`} GoalOldest
*/
/**
* @typedef {GoalAll|GoalNewest|GoalOldest} Goal
*/
/**
* @typedef {{type: 'all'; count: never}} ParsedAll
*/
/**
* @typedef {{type: 'newest' |'oldest'; count: number}} ParsedLimited
*/
/**
* @typedef {ParsedAll | ParsedLimited} ParsedGoal
*/
/**
* @param {Goal} goal
* @returns {ParsedGoal}
*/
function parseGoal(goal) {
if (goal === 'all') {
return { type: 'all' }
}
const matchN = goal.match(/^newest-(\d+)$/)
if (matchN) {
return { type: 'newest', count: Number(matchN[1]) }
}
const matchO = goal.match(/^oldest-(\d+)$/)
if (matchO) {
return { type: 'oldest', count: Number(matchO[1]) }
}
throw new Error(`Invalid goal: ${goal}`)
}
module.exports = {
parseGoal,
}

View File

@ -1,96 +1,123 @@
const toPull = require('push-stream-to-pull-stream') // @ts-ignore
const pull = require('pull-stream') const toPull = require('push-stream-to-pull-stream') // @ts-ignore
const FeedV1 = require('ppppp-db/feed-v1')
const makeDebug = require('debug')
const getSeverity = require('ssb-network-errors') const getSeverity = require('ssb-network-errors')
const pull = require('pull-stream')
const makeDebug = require('debug')
const Algorithm = require('./algorithm') const Algorithm = require('./algorithm')
const SyncStream = require('./stream') const SyncStream = require('./stream')
/** /**
* @typedef {import('./goal').Goal} Goal * @typedef {ReturnType<import('pzp-db').init>} PZPDB
* @typedef {ReturnType<import('pzp-dict').init>} PZPDict
* @typedef {ReturnType<import('pzp-set').init>} PZPSet
* @typedef {ReturnType<import('pzp-goals').init>} PZPGoals
* @typedef {import('node:events').EventEmitter} Emitter
* @typedef {(cb: (err: Error) => void) => import('pull-stream').Duplex<unknown, unknown>} GetDuplex
* @typedef {{ pubkey: string }} SHSE
*/ */
/**
* @param {Error} err
* @param {string} namespace
* @param {string} methodName
*/
function isMuxrpcMissingError(err, namespace, methodName) { function isMuxrpcMissingError(err, namespace, methodName) {
const jsErrorMessage = `method:${namespace},${methodName} is not in list of allowed methods` const jsErrorMessage = `method:${namespace},${methodName} is not in list of allowed methods`
const goErrorMessage = `muxrpc: no such command: ${namespace}.${methodName}` const goErrorMessage = `muxrpc: no such command: ${namespace}.${methodName}`
return err.message === jsErrorMessage || err.message === goErrorMessage return err.message === jsErrorMessage || err.message === goErrorMessage
} }
module.exports = { /**
name: 'tangleSync', * @param {Emitter & {
manifest: { * db: PZPDB,
connect: 'duplex', * dict: PZPDict,
setGoal: 'sync', * set: PZPSet,
initiate: 'sync', * goals: PZPGoals,
}, * shse: SHSE
permissions: { * }} peer
anonymous: { * @param {unknown} config
allow: ['connect'], */
}, function initSync(peer, config) {
}, const debug = makeDebug(`pzp:sync`)
init(peer, config) { const algo = new Algorithm(peer)
const debug = makeDebug(`ppppp:tangleSync`) let started = false
const goals = new Map()
const algo = new Algorithm(peer)
const streams = [] const streams = /** @type {Array<SyncStream>} */ ([])
function createStream(remoteId, iamClient) {
// prettier-ignore
debug('Opening a stream with remote %s %s', iamClient ? 'server' : 'client', remoteId)
const stream = new SyncStream(peer.id, debug, goals, algo)
streams.push(stream)
return stream
}
peer.on('rpc:connect', function onSyncRPCConnect(rpc, iamClient) { /**
if (rpc.id === peer.id) return // local client connecting to local server * @param {string} remoteId
if (!iamClient) return * @param {boolean} iamClient
const local = toPull.duplex(createStream(rpc.id, true)) */
function createStream(remoteId, iamClient) {
// prettier-ignore
debug('Opening a stream with remote %s %s', iamClient ? 'server' : 'client', remoteId)
const { shse, db, goals } = peer
const stream = new SyncStream(shse.pubkey, debug, db, goals, algo)
streams.push(stream)
return stream
}
const remote = rpc.tangleSync.connect((networkError) => { /**
if (networkError && getSeverity(networkError) >= 3) { * @param {{ shse: SHSE, sync: { connect: GetDuplex }, once: CallableFunction }} rpc
if (isMuxrpcMissingError(networkError, 'tangleSync', 'connect')) { * @param {boolean} iamClient
console.warn(`peer ${rpc.id} does not support sync connect`) */
// } else if (isReconnectedError(networkError)) { // TODO: bring back function onSyncRPCConnect(rpc, iamClient) {
// Do nothing, this is a harmless error if (rpc.shse.pubkey === peer.shse.pubkey) return // connecting to myself
} else { if (!iamClient) return
console.error(`rpc.tangleSync.connect exception:`, networkError) const stream = createStream(rpc.shse.pubkey, true)
} const local = toPull.duplex(stream)
let abort = /** @type {CallableFunction | null} */ (null)
const remote = rpc.sync.connect((networkError) => {
if (networkError && getSeverity(networkError) >= 3) {
if (isMuxrpcMissingError(networkError, 'sync', 'connect')) {
debug('peer %s does not support sync', rpc.shse.pubkey)
// } else if (isReconnectedError(networkError)) { // TODO: bring back
// Do nothing, this is a harmless error
} else {
console.error(`rpc.sync.connect exception:`, networkError)
} }
}) abort?.(true, () => {})
pull(local, remote, local)
})
function connect() {
// `this` refers to the remote peer who called this muxrpc API
return toPull.duplex(createStream(this.id, false))
}
/**
* @param {string} tangleId
* @param {Goal} goal
*/
function setGoal(tangleId, goal = 'all') {
goals.set(tangleId, goal)
}
function setFeedGoal(author, type, goal = 'all') {
const tangleId = FeedV1.getFeedRootHash(author, type)
goals.set(tangleId, goal)
}
function initiate() {
for (const stream of streams) {
stream.initiate()
} }
} })
abort = pull(local, remote, local)
rpc.once('closed', () => {
abort?.(true, () => {})
})
if (started) stream.initiate()
}
peer.on('rpc:connect', onSyncRPCConnect)
return { /**
connect, * @this {{shse: {pubkey: string}}}
setGoal, */
setFeedGoal, function connect() {
initiate, return toPull.duplex(createStream(this.shse.pubkey, false))
}
function start() {
if (started) return
started = true
for (const stream of streams) {
stream.initiate()
} }
}
return {
connect,
start,
}
}
exports.name = 'sync'
exports.needs = ['db', 'dict', 'set', 'goals', 'shse']
exports.manifest = {
connect: 'duplex',
initiate: 'sync',
}
exports.init = initSync
exports.permissions = {
anonymous: {
allow: ['connect'],
}, },
} }

View File

@ -1,3 +1,14 @@
/**
* @param {any} range
* @return {range is Range}
*/
function isRange(range) {
if (!Array.isArray(range)) return false
if (range.length !== 2) return false
if (!Number.isInteger(range[0]) || !Number.isInteger(range[1])) return false
return true
}
/** /**
* @typedef {[number, number]} Range * @typedef {[number, number]} Range
*/ */
@ -23,7 +34,11 @@ function estimateMsgCount(range) {
else return estimate else return estimate
} }
const EMPTY_RANGE = /** @type {Range} */ ([1, 0])
module.exports = { module.exports = {
isRange,
isEmptyRange, isEmptyRange,
estimateMsgCount, estimateMsgCount,
EMPTY_RANGE,
} }

View File

@ -1,150 +1,236 @@
// @ts-ignore
const Pipeable = require('push-stream/pipeable') const Pipeable = require('push-stream/pipeable')
const { isEmptyRange } = require('./range') const p = require('promisify-4loc')
const { isRange, isEmptyRange } = require('./range')
const { isMsgId, isBloom, isMsgIds, isMsgs } = require('./util')
/** /**
* @typedef {import('./goal').Goal} Goal * @typedef {ReturnType<import('pzp-goals').init>} PZPGoals
* @typedef {ReturnType<import('pzp-db').init>} PZPDB
* @typedef {import('pzp-db').RecPresent} Rec
* @typedef {import('pzp-db/msg-v4').Msg} Msg
* @typedef {import('./range').Range} Range
* @typedef {import('./algorithm')} Algorithm
* @typedef {import('pzp-goals').Goal} Goal
* @typedef {string} MsgID
* @typedef {{id: string}} WithId
* @typedef {WithId & {phase: 0, payload?: undefined}} Data0
* @typedef {WithId & {phase: 1, payload: Range}} Data1
* @typedef {WithId & {phase: 2, payload: { haveRange: Range, wantRange: Range }}} Data2
* @typedef {WithId & {phase: 3, payload: { wantRange: Range, bloom: JSON }}} Data3
* @typedef {WithId & {phase: 4 | 5 | 6 | 7, payload: { msgIDs: Array<MsgID>, bloom: JSON }}} Data4567
* @typedef {WithId & {phase: 8, payload: { msgs: Array<Msg>, bloom: JSON }}} Data8
* @typedef {WithId & {phase: 9, payload: Array<Msg>}} Data9
* @typedef {Data0 | Data1 | Data2 | Data3 | Data4567 | Data8 | Data9} Data
*/
/**
* @template T
* @typedef {[T] extends [void] ?
* (...args: [Error] | []) => void :
* (...args: [Error] | [null, T]) => void
* } CB
*/ */
class SyncStream extends Pipeable { class SyncStream extends Pipeable {
#myId #myId
#debug /** @type {Algorithm} */
#algo #algo
/** @type {CallableFunction} */
/** Set of tangleId #debug
* @type {Set<string>} */ /** @type {Set<string>} Set of tangleId */
#requested #requested
/** @type {PZPDB} */
/** tangleId => goal #db
* @type {Map<string, Goal>} */ /** @type {PZPGoals} */
#goals #goals
/** /**
* tangleId => have-range by local peer * tangleId => have-range by local peer
* @type {Map<string, [number, number]>} * @type {Map<string, [number, number]>}
*/ */
#localHave #localHave
/** /**
* tangleId => want-range by local peer * tangleId => want-range by local peer
* @type {Map<string, [number, number]>} * @type {Map<string, [number, number]>}
*/ */
#localWant #localWant
/** /**
* tangleId => have-range by remote peer * tangleId => have-range by remote peer
* @type {Map<string, [number, number]>} * @type {Map<string, [number, number]>}
*/ */
#remoteHave #remoteHave
/** /**
* tangleId => want-range by remote peer * tangleId => want-range by remote peer
* @type {Map<string, [number, number]>} * @type {Map<string, [number, number]>}
*/ */
#remoteWant #remoteWant
/** /**
* tangleId => Set of msgIDs * tangleId => Set of msgIDs
* @type {Map<string, Set<string>>} * @type {Map<string, Set<string>>}
*/ */
#receivableMsgs #receivableMsgs
/** /**
* tangleId => Set of msgIDs * tangleId => Set of msgIDs
* @type {Map<string, Set<string>>} * @type {Map<string, Set<string>>}
*/ */
#sendableMsgs #sendableMsgs
/** @type {Set<string>} */
#realtimeSyncing
constructor(localId, debug, goals, algo) { /**
* @param {string} localId
* @param {CallableFunction} debug
* @param {PZPDB} db
* @param {PZPGoals} goals
* @param {Algorithm} algo
*/
constructor(localId, debug, db, goals, algo) {
super() super()
this.paused = false // TODO: should we start as paused=true? this.paused = false // TODO: should we start as paused=true?
this.ended = false this.ended = false
/** @type {any} */
this.source = this.sink = null this.source = this.sink = null
this.#myId = localId.slice(0, 6) this.#myId = localId.slice(0, 6)
this.#debug = debug this.#debug = debug
this.#db = db
this.#goals = goals this.#goals = goals
this.#algo = algo this.#algo = algo
this.#requested = new Set() this.#requested = new Set()
this.#realtimeSyncing = new Set()
this.#localHave = new Map() this.#localHave = new Map()
this.#localWant = new Map() this.#localWant = new Map()
this.#remoteHave = new Map() this.#remoteHave = new Map()
this.#remoteWant = new Map() this.#remoteWant = new Map()
this.#receivableMsgs = new Map() this.#receivableMsgs = new Map()
this.#sendableMsgs = new Map() this.#sendableMsgs = new Map()
// Setup real-time syncing
this.#db.onRecordAdded((/** @type {Rec} */ { id: msgID, msg }) => {
if (!this.sink || this.sink.paused) return
const tangleIDs = [msgID].concat(Object.keys(msg.metadata.tangles))
this.resume()
for (const id of tangleIDs) {
if (this.#realtimeSyncing.has(id)) {
if (this.#receivableMsgs.has(msgID)) continue
if (this.#receivableMsgs.get(id)?.has(msgID)) continue
if (this.#sendableMsgs.has(msgID)) continue
if (this.#sendableMsgs.get(id)?.has(msgID)) continue
this.sink.write({ id, phase: 9, payload: [msg] })
// prettier-ignore
this.#debug('%s Stream OUTr: sent msg %s in %s', this.#myId, msgID, id)
return
}
}
})
} }
initiate() { initiate() {
for (const id of this.#goals.keys()) { for (const goal of this.#goals.list()) {
this.#requested.add(id) this.#requested.add(goal.id)
} }
this.resume() this.resume()
this.#goals.watch((/** @type {any} */ goal) => {
if (!this.#requested.has(goal.id) && goal.type !== 'none') {
this.#requested.add(goal.id)
this.resume()
}
})
} }
#canSend() { #canSend() {
return this.sink && !this.sink.paused && !this.ended return this.sink && !this.sink.paused && !this.ended
} }
#updateSendableMsgs(id, msgs) { /**
* @param {string} id
* @param {Array<MsgID>} msgIDs
*/
#updateSendableMsgs(id, msgIDs) {
const set = this.#sendableMsgs.get(id) ?? new Set() const set = this.#sendableMsgs.get(id) ?? new Set()
for (const msg of msgs) { for (const msgID of msgIDs) {
set.add(msg) set.add(msgID)
} }
this.#sendableMsgs.set(id, set) this.#sendableMsgs.set(id, set)
} }
#updateReceivableMsgs(id, msgs) { /**
* @param {string} id
* @param {Array<string>} msgIDs
*/
#updateReceivableMsgs(id, msgIDs) {
const set = this.#receivableMsgs.get(id) ?? new Set() const set = this.#receivableMsgs.get(id) ?? new Set()
for (const msg of msgs) { for (const msgID of msgIDs) {
set.add(msg) set.add(msgID)
} }
this.#receivableMsgs.set(id, set) this.#receivableMsgs.set(id, set)
} }
#sendLocalHave(id) { /**
const localHaveRange = this.#algo.haveRange(id) * @param {string} id
*/
async #sendLocalHave(id) {
const localHaveRange = await this.#algo.haveRange(id)
this.#localHave.set(id, localHaveRange) this.#localHave.set(id, localHaveRange)
// prettier-ignore
this.#debug('%s Stream OUT1: send local have-range %o for %s', this.#myId, localHaveRange, id)
this.sink.write({ id, phase: 1, payload: localHaveRange }) this.sink.write({ id, phase: 1, payload: localHaveRange })
// prettier-ignore
this.#debug('%s Stream OUT1: sent local have-range %o for %s', this.#myId, localHaveRange, id)
} }
#sendLocalHaveAndWant(id, remoteHaveRange) { /**
* @param {string} id
* @param {Range} remoteHaveRange
*/
async #sendLocalHaveAndWant(id, remoteHaveRange) {
// prettier-ignore // prettier-ignore
this.#debug('%s Stream IN1: got remote have-range %o for %s', this.#myId, remoteHaveRange, id) this.#debug('%s Stream IN1: got remote have-range %o for %s', this.#myId, remoteHaveRange, id)
this.#remoteHave.set(id, remoteHaveRange) this.#remoteHave.set(id, remoteHaveRange)
const goal = this.#goals.get(id) const goal = this.#goals.get(id)
const haveRange = this.#algo.haveRange(id) const haveRange = await this.#algo.haveRange(id)
const wantRange = this.#algo.wantRange(haveRange, remoteHaveRange, goal) const wantRange = await this.#algo.wantRange(haveRange, remoteHaveRange, goal)
this.#localHave.set(id, haveRange) this.#localHave.set(id, haveRange)
this.#localWant.set(id, wantRange) this.#localWant.set(id, wantRange)
// prettier-ignore
this.#debug('%s Stream OUT2: send local have-range %o and want-range %o for %s', this.#myId, haveRange, wantRange, id)
this.sink.write({ id, phase: 2, payload: { haveRange, wantRange } }) this.sink.write({ id, phase: 2, payload: { haveRange, wantRange } })
// prettier-ignore
this.#debug('%s Stream OUT2: sent local have-range %o and want-range %o for %s', this.#myId, haveRange, wantRange, id)
} }
#sendLocalWantAndInitBloom(id, remoteHaveRange, remoteWantRange) { /**
* @param {string} id
* @param {Range} remoteHaveRange
* @param {Range} remoteWantRange
*/
async #sendLocalWantAndInitBloom(id, remoteHaveRange, remoteWantRange) {
// prettier-ignore // prettier-ignore
this.#debug('%s Stream IN2: got remote have-range %o and want-range %o for %s', this.#myId, remoteHaveRange, remoteWantRange, id) this.#debug('%s Stream IN2: got remote have-range %o and want-range %o for %s', this.#myId, remoteHaveRange, remoteWantRange, id)
this.#remoteHave.set(id, remoteHaveRange) this.#remoteHave.set(id, remoteHaveRange)
this.#remoteWant.set(id, remoteWantRange) this.#remoteWant.set(id, remoteWantRange)
const goal = this.#goals.get(id) const goal = this.#goals.get(id)
const haveRange = this.#localHave.get(id) const haveRange = this.#localHave.get(id) ?? [-1, -1]
const wantRange = this.#algo.wantRange(haveRange, remoteHaveRange, goal) const localWant = await this.#algo.wantRange(haveRange, remoteHaveRange, goal)
this.#localWant.set(id, wantRange) this.#localWant.set(id, localWant)
const localBloom0 = this.#algo.bloomFor(id, 0, wantRange) const localBloom0 = await this.#algo.bloomFor(id, 0, localWant)
this.sink.write({ this.sink.write({
id, id,
phase: 3, phase: 3,
payload: { bloom: localBloom0, wantRange }, payload: { bloom: localBloom0, wantRange: localWant },
}) })
// prettier-ignore // prettier-ignore
this.#debug('%s Stream OUT3: send local want-range %o and bloom round 0 for %s', this.#myId, wantRange, id) this.#debug('%s Stream OUT3: sent local want-range %o and bloom round 0 for %s', this.#myId, localWant, id)
} }
#sendInitBloomRes(id, remoteWantRange, remoteBloom) { /**
* @param {string} id
* @param {Range} remoteWantRange
* @param {JSON} remoteBloom
*/
async #sendInitBloomRes(id, remoteWantRange, remoteBloom) {
// prettier-ignore // prettier-ignore
this.#debug('%s Stream IN3: got remote want-range %o and bloom round 0 for %s', this.#myId, remoteWantRange, id) this.#debug('%s Stream IN3: got remote want-range %o and bloom round 0 for %s', this.#myId, remoteWantRange, id)
this.#remoteWant.set(id, remoteWantRange) this.#remoteWant.set(id, remoteWantRange)
const msgIDsForThem = this.#algo.msgsMissing( const msgIDsForThem = await this.#algo.getMsgsMissing(
id, id,
0, 0,
remoteWantRange, remoteWantRange,
@ -152,22 +238,30 @@ class SyncStream extends Pipeable {
) )
this.#updateSendableMsgs(id, msgIDsForThem) this.#updateSendableMsgs(id, msgIDsForThem)
const localWantRange = this.#localWant.get(id) const localWantRange = this.#localWant.get(id)
const localBloom = this.#algo.bloomFor(id, 0, localWantRange) if (!localWantRange) throw new Error(`Local want-range not set for ${id}`)
const localBloom = await this.#algo.bloomFor(id, 0, localWantRange)
this.sink.write({ this.sink.write({
id, id,
phase: 4, phase: 4,
payload: { bloom: localBloom, msgIDs: msgIDsForThem }, payload: { bloom: localBloom, msgIDs: msgIDsForThem },
}) })
// prettier-ignore // prettier-ignore
this.#debug('%s Stream OUT4: send bloom round 0 plus msgIDs in %s: %o', this.#myId, id, msgIDsForThem) this.#debug('%s Stream OUT4: sent bloom round 0 plus msgIDs in %s: %o', this.#myId, id, msgIDsForThem)
} }
#sendBloomReq(id, phase, round, remoteBloom, msgIDsForMe) { /**
* @param {string} id
* @param {number} phase
* @param {number} round
* @param {JSON} remoteBloom
* @param {Array<MsgID>} msgIDsForMe
*/
async #sendBloomReq(id, phase, round, remoteBloom, msgIDsForMe) {
// prettier-ignore // prettier-ignore
this.#debug('%s Stream IN%s: got bloom round %s plus msgIDs in %s: %o', this.#myId, phase-1, round-1, id, msgIDsForMe) this.#debug('%s Stream IN%s: got bloom round %s plus msgIDs in %s: %o', this.#myId, phase-1, round-1, id, msgIDsForMe)
const remoteWantRange = this.#remoteWant.get(id) const remoteWantRange = this.#remoteWant.get(id) ?? [0, 0]
this.#updateReceivableMsgs(id, msgIDsForMe) this.#updateReceivableMsgs(id, msgIDsForMe)
const msgIDsForThem = this.#algo.msgsMissing( const msgIDsForThem = await this.#algo.getMsgsMissing(
id, id,
round - 1, round - 1,
remoteWantRange, remoteWantRange,
@ -176,22 +270,30 @@ class SyncStream extends Pipeable {
this.#updateSendableMsgs(id, msgIDsForThem) this.#updateSendableMsgs(id, msgIDsForThem)
const extras = this.#receivableMsgs.get(id) const extras = this.#receivableMsgs.get(id)
const localWantRange = this.#localWant.get(id) const localWantRange = this.#localWant.get(id)
const localBloom = this.#algo.bloomFor(id, round, localWantRange, extras) if (!localWantRange) throw new Error(`Local want-range not set for ${id}`)
const localBloom = await this.#algo.bloomFor(id, round, localWantRange, extras)
this.sink.write({ this.sink.write({
id, id,
phase, phase,
payload: { bloom: localBloom, msgIDs: msgIDsForThem }, payload: { bloom: localBloom, msgIDs: msgIDsForThem },
}) })
// prettier-ignore // prettier-ignore
this.#debug('%s Stream OUT%s: send bloom round %s plus msgIDs in %s: %o', this.#myId, phase, round, id, msgIDsForThem) this.#debug('%s Stream OUT%s: sent bloom round %s plus msgIDs in %s: %o', this.#myId, phase, round, id, msgIDsForThem)
} }
#sendBloomRes(id, phase, round, remoteBloom, msgIDsForMe) { /**
* @param {string} id
* @param {number} phase
* @param {number} round
* @param {JSON} remoteBloom
* @param {Array<MsgID>} msgIDsForMe
*/
async #sendBloomRes(id, phase, round, remoteBloom, msgIDsForMe) {
// prettier-ignore // prettier-ignore
this.#debug('%s Stream IN%s: got bloom round %s plus msgIDs in %s: %o', this.#myId, phase-1, round, id, msgIDsForMe) this.#debug('%s Stream IN%s: got bloom round %s plus msgIDs in %s: %o', this.#myId, phase-1, round, id, msgIDsForMe)
const remoteWantRange = this.#remoteWant.get(id) const remoteWantRange = this.#remoteWant.get(id) ?? [0, 0]
this.#updateReceivableMsgs(id, msgIDsForMe) this.#updateReceivableMsgs(id, msgIDsForMe)
const msgIDsForThem = this.#algo.msgsMissing( const msgIDsForThem = await this.#algo.getMsgsMissing(
id, id,
round, round,
remoteWantRange, remoteWantRange,
@ -200,22 +302,29 @@ class SyncStream extends Pipeable {
this.#updateSendableMsgs(id, msgIDsForThem) this.#updateSendableMsgs(id, msgIDsForThem)
const extras = this.#receivableMsgs.get(id) const extras = this.#receivableMsgs.get(id)
const localWantRange = this.#localWant.get(id) const localWantRange = this.#localWant.get(id)
const localBloom = this.#algo.bloomFor(id, round, localWantRange, extras) if (!localWantRange) throw new Error(`Local want-range not set for ${id}`)
const localBloom = await this.#algo.bloomFor(id, round, localWantRange, extras)
this.sink.write({ this.sink.write({
id, id,
phase, phase,
payload: { bloom: localBloom, msgIDs: msgIDsForThem }, payload: { bloom: localBloom, msgIDs: msgIDsForThem },
}) })
// prettier-ignore // prettier-ignore
this.#debug('%s Stream OUT%s: send bloom round %s plus msgIDs in %s: %o', this.#myId, phase, round, id, msgIDsForThem) this.#debug('%s Stream OUT%s: sent bloom round %s plus msgIDs in %s: %o', this.#myId, phase, round, id, msgIDsForThem)
} }
#sendMissingMsgsReq(id, round, remoteBloom, msgIDsForMe) { /**
* @param {string} id
* @param {number} round
* @param {JSON} remoteBloom
* @param {Array<MsgID>} msgIDsForMe
*/
async #sendMissingMsgsReq(id, round, remoteBloom, msgIDsForMe) {
// prettier-ignore // prettier-ignore
this.#debug('%s Stream IN7: got bloom round %s plus msgIDs in %s: %o', this.#myId, round, id, msgIDsForMe) this.#debug('%s Stream IN7: got bloom round %s plus msgIDs in %s: %o', this.#myId, round, id, msgIDsForMe)
const remoteWantRange = this.#remoteWant.get(id) const remoteWantRange = this.#remoteWant.get(id) ?? [0, 0]
this.#updateReceivableMsgs(id, msgIDsForMe) this.#updateReceivableMsgs(id, msgIDsForMe)
const msgIDsForThem = this.#algo.msgsMissing( const msgIDsForThem = await this.#algo.getMsgsMissing(
id, id,
round, round,
remoteWantRange, remoteWantRange,
@ -223,24 +332,36 @@ class SyncStream extends Pipeable {
) )
this.#updateSendableMsgs(id, msgIDsForThem) this.#updateSendableMsgs(id, msgIDsForThem)
const msgIDs = this.#sendableMsgs.get(id) ?? new Set() const msgIDs = this.#sendableMsgs.get(id) ?? new Set()
const msgs = this.#algo.getTangleSlice(id, msgIDs) const tangleMsgs = await this.#algo.getTangleMsgs(id, msgIDs)
const accountMsgs = await this.#algo.filterAndFetchAccountMsgs(msgIDs)
const msgs = accountMsgs.concat(tangleMsgs)
const extras = this.#receivableMsgs.get(id) const extras = this.#receivableMsgs.get(id)
const localWantRange = this.#localWant.get(id) const localWantRange = this.#localWant.get(id)
const localBloom = this.#algo.bloomFor(id, round, localWantRange, extras) if (!localWantRange) throw new Error(`Local want-range not set for ${id}`)
const localBloom = await this.#algo.bloomFor(id, round, localWantRange, extras)
this.sink.write({ this.sink.write({
id, id,
phase: 8, phase: 8,
payload: { msgs, bloom: localBloom }, payload: { msgs, bloom: localBloom },
}) })
// prettier-ignore // prettier-ignore
this.#debug('%s Stream OUT8: send bloom round %s plus %s msgs in %s', this.#myId, round, msgs.length, id) this.#debug('%s Stream OUT8: sent bloom round %s plus %s msgs in %s', this.#myId, round, msgs.length, id)
if (!this.#realtimeSyncing.has(id) && !isEmptyRange(remoteWantRange)) {
this.#realtimeSyncing.add(id)
}
} }
#sendMissingMsgsRes(id, round, remoteBloom, msgsForMe) { /**
* @param {string} id
* @param {number} round
* @param {JSON} remoteBloom
* @param {Array<Msg>} msgsForMe
*/
async #sendMissingMsgsRes(id, round, remoteBloom, msgsForMe) {
// prettier-ignore // prettier-ignore
this.#debug('%s Stream IN8: got bloom round %s plus %s msgs in %s', this.#myId, round, msgsForMe.length, id) this.#debug('%s Stream IN8: got bloom round %s plus %s msgs in %s', this.#myId, round, msgsForMe.length, id)
const remoteWantRange = this.#remoteWant.get(id) const remoteWantRange = this.#remoteWant.get(id) ?? [0, 0]
const msgIDsForThem = this.#algo.msgsMissing( const msgIDsForThem = await this.#algo.getMsgsMissing(
id, id,
round, round,
remoteWantRange, remoteWantRange,
@ -248,62 +369,80 @@ class SyncStream extends Pipeable {
) )
this.#updateSendableMsgs(id, msgIDsForThem) this.#updateSendableMsgs(id, msgIDsForThem)
const msgIDs = this.#sendableMsgs.get(id) ?? new Set() const msgIDs = this.#sendableMsgs.get(id) ?? new Set()
const msgs = this.#algo.getTangleSlice(id, msgIDs) const tangleMsgs = await this.#algo.getTangleMsgs(id, msgIDs)
// prettier-ignore const accountMsgs = await this.#algo.filterAndFetchAccountMsgs(msgIDs)
this.#debug('%s Stream OUT9: send %s msgs in %s', this.#myId, msgs.length, id) const msgs = accountMsgs.concat(tangleMsgs)
this.sink.write({ id, phase: 9, payload: msgs }) this.sink.write({ id, phase: 9, payload: msgs })
// prettier-ignore
const goal = this.#goals.get(id) this.#debug('%s Stream OUT9: sent %s msgs in %s', this.#myId, msgs.length, id)
const localWant = this.#localWant.get(id) if (!this.#realtimeSyncing.has(id) && !isEmptyRange(remoteWantRange)) {
this.#requested.delete(id) this.#realtimeSyncing.add(id)
this.#localHave.delete(id)
this.#localWant.delete(id)
this.#remoteHave.delete(id)
this.#remoteWant.delete(id)
this.#receivableMsgs.delete(id)
this.#sendableMsgs.delete(id)
if (msgsForMe.length === 0) return
try {
this.#algo.commit(id, msgsForMe, goal, localWant)
} catch (err) {
// prettier-ignore
this.#debug('%s Stream could not commit received messages, because: %s', this.#myId, err)
} }
this.#consumeMissingMsgs(id, msgsForMe)
} }
/**
* @param {string} id
* @param {Array<Msg>} msgsForMe
* @returns
*/
#consumeMissingMsgs(id, msgsForMe) { #consumeMissingMsgs(id, msgsForMe) {
// prettier-ignore
this.#debug('%s Stream IN9: got %s msgs in %s', this.#myId, msgsForMe.length, id)
const goal = this.#goals.get(id)
const localWant = this.#localWant.get(id)
this.#requested.delete(id) this.#requested.delete(id)
this.#localHave.delete(id) this.#localHave.delete(id)
this.#localWant.delete(id)
this.#remoteHave.delete(id) this.#remoteHave.delete(id)
this.#remoteWant.delete(id) this.#remoteWant.delete(id)
this.#receivableMsgs.delete(id)
this.#sendableMsgs.delete(id)
if (msgsForMe.length === 0) return if (msgsForMe.length === 0) return
const goal = this.#goals.get(id)
if (!goal) {
this.#debug('%s Stream exception: no goal found for %s', this.#myId, id)
return
}
const localWantRange = this.#localWant.get(id)
if (!localWantRange) {
// prettier-ignore
this.#debug('%s Stream exception: local want-range not set for %s', this.#myId, id)
return
}
const validMsgs = this.#algo.filterReceivedMsgs(
id,
msgsForMe,
localWantRange
)
const validMsgIDs = this.#algo.getMsgIDs(validMsgs)
this.#updateReceivableMsgs(id, validMsgIDs)
try { try {
this.#algo.commit(id, msgsForMe, goal, localWant) this.#algo.commit(id, validMsgs, goal)
} catch (err) { } catch (err) {
// prettier-ignore // prettier-ignore
this.#debug('%s Stream could not commit received messages, because: %s', this.#myId, err) this.#debug('%s Stream could not commit received messages, because: %s', this.#myId, err)
} }
} }
#sendMsgsInRemoteWant(id, remoteWantRange) { /**
* @param {string} id
* @param {Range} remoteWantRange
*/
async #sendMsgsInRemoteWant(id, remoteWantRange) {
const msgs = [] const msgs = []
for (const msg of this.#algo.yieldMsgsIn(id, remoteWantRange)) { const rangeMsgs = await this.#algo.getMsgsInRange(id, remoteWantRange)
msgs.push(msg) const tangleMsgs = await this.#algo.getTangleMsgs(id, rangeMsgs)
} const accountMsgs = await this.#algo.getAccountMsgsFor(tangleMsgs)
// prettier-ignore for (const msg of accountMsgs) msgs.push(msg)
this.#debug('%s Stream OUT9: send %s msgs in %s', this.#myId, msgs.length, id) for (const msg of tangleMsgs) msgs.push(msg)
const msgIDs = this.#algo.getMsgIDs(msgs)
this.#updateSendableMsgs(id, msgIDs)
this.sink.write({ id, phase: 9, payload: msgs }) this.sink.write({ id, phase: 9, payload: msgs })
// prettier-ignore
this.#debug('%s Stream OUT9: sent %s msgs in %s', this.#myId, msgs.length, id)
if (!this.#realtimeSyncing.has(id) && !isEmptyRange(remoteWantRange)) {
this.#realtimeSyncing.add(id)
}
} }
// as a source // source method
resume() { resume() {
if (!this.sink || this.sink.paused) return if (!this.sink || this.sink.paused) return
@ -313,19 +452,40 @@ class SyncStream extends Pipeable {
} }
} }
// as a sink /**
* sink method
* @param {Data} data
*/
write(data) { write(data) {
// prettier-ignore
if (!data) return this.#debug('Invalid data from remote peer: missing data')
// prettier-ignore
if (typeof data !== 'object') return this.#debug('Invalid data from remote peer: not an object')
// prettier-ignore
if (Array.isArray(data)) return this.#debug('Invalid data from remote peer: is an array')
const { id, phase, payload } = data const { id, phase, payload } = data
// prettier-ignore
if (typeof phase !== 'number') return this.#debug("Invalid data from remote peer: phase isn't a number")
// prettier-ignore
if (!isMsgId(id)) return this.#debug('Invalid data from remote peer: id is not a valid msg id')
// prettier-ignore
if (phase !== 0 && !payload) return this.#debug('Invalid data from remote peer: payload is missing')
switch (phase) { switch (phase) {
case 0: { case 0: {
return this.#sendLocalHave(id) return this.#sendLocalHave(id)
} }
case 1: { case 1: {
// prettier-ignore
if (!isRange(payload)) return this.#debug('Invalid data from remote peer: payload is not a range in phase 1')
return this.#sendLocalHaveAndWant(id, payload) return this.#sendLocalHaveAndWant(id, payload)
} }
case 2: { case 2: {
const { haveRange, wantRange } = payload const { haveRange, wantRange } = payload
// prettier-ignore
if (!isRange(haveRange) || !isRange(wantRange)) return this.#debug('Invalid data from remote peer: haveRange or wantRange is not a range in phase 2')
if (isEmptyRange(haveRange)) { if (isEmptyRange(haveRange)) {
// prettier-ignore // prettier-ignore
this.#debug('%s Stream IN2: received remote have-range %o and want-range %o for %s', this.#myId, haveRange, wantRange, id) this.#debug('%s Stream IN2: received remote have-range %o and want-range %o for %s', this.#myId, haveRange, wantRange, id)
@ -336,10 +496,15 @@ class SyncStream extends Pipeable {
} }
case 3: { case 3: {
const { wantRange, bloom } = payload const { wantRange, bloom } = payload
// prettier-ignore
if (!isRange(wantRange)) return this.#debug('Invalid data from remote peer: wantRange is not a range in phase 3')
// prettier-ignore
if (!isBloom(bloom)) return this.#debug('Invalid data from remote peer: bloom is not a bloom in phase 3')
const haveRange = this.#remoteHave.get(id) const haveRange = this.#remoteHave.get(id)
if (haveRange && isEmptyRange(haveRange)) { if (haveRange && isEmptyRange(haveRange)) {
// prettier-ignore // prettier-ignore
this.#debug('%s Stream IN3: received remote want-range want-range %o and remember empty have-range %o for %s', this.#myId, wantRange, haveRange, id) this.#debug('%s Stream IN3: received remote want-range %o and remembers empty have-range %o for %s', this.#myId, wantRange, haveRange, id)
return this.#sendMsgsInRemoteWant(id, wantRange) return this.#sendMsgsInRemoteWant(id, wantRange)
} else { } else {
return this.#sendInitBloomRes(id, wantRange, bloom) return this.#sendInitBloomRes(id, wantRange, bloom)
@ -347,40 +512,78 @@ class SyncStream extends Pipeable {
} }
case 4: { case 4: {
const { bloom, msgIDs } = payload const { bloom, msgIDs } = payload
// prettier-ignore
if (!isBloom(bloom)) return this.#debug('Invalid data from remote peer: bloom is not a bloom in phase 4')
// prettier-ignore
if (!isMsgIds(msgIDs)) return this.#debug('Invalid data from remote peer: msgIDs is not an array of msg ids in phase 4')
return this.#sendBloomReq(id, phase + 1, 1, bloom, msgIDs) return this.#sendBloomReq(id, phase + 1, 1, bloom, msgIDs)
} }
case 5: { case 5: {
const { bloom, msgIDs } = payload const { bloom, msgIDs } = payload
// prettier-ignore
if (!isBloom(bloom)) return this.#debug('Invalid data from remote peer: bloom is not a bloom in phase 5')
// prettier-ignore
if (!isMsgIds(msgIDs)) return this.#debug('Invalid data from remote peer: msgIDs is not an array of msg ids in phase 5')
return this.#sendBloomRes(id, phase + 1, 1, bloom, msgIDs) return this.#sendBloomRes(id, phase + 1, 1, bloom, msgIDs)
} }
case 6: { case 6: {
const { bloom, msgIDs } = payload const { bloom, msgIDs } = payload
// prettier-ignore
if (!isBloom(bloom)) return this.#debug('Invalid data from remote peer: bloom is not a bloom in phase 6')
// prettier-ignore
if (!isMsgIds(msgIDs)) return this.#debug('Invalid data from remote peer: msgIDs is not an array of msg ids in phase 6')
return this.#sendBloomReq(id, phase + 1, 2, bloom, msgIDs) return this.#sendBloomReq(id, phase + 1, 2, bloom, msgIDs)
} }
case 7: { case 7: {
const { bloom, msgIDs } = payload const { bloom, msgIDs } = payload
// prettier-ignore
if (!isBloom(bloom)) return this.#debug('Invalid data from remote peer: bloom is not a bloom in phase 7')
// prettier-ignore
if (!isMsgIds(msgIDs)) return this.#debug('Invalid data from remote peer: msgIDs is not an array of msg ids in phase 7')
return this.#sendMissingMsgsReq(id, 2, bloom, msgIDs) return this.#sendMissingMsgsReq(id, 2, bloom, msgIDs)
} }
case 8: { case 8: {
const { bloom, msgs } = payload const { bloom, msgs } = payload
// prettier-ignore
if (!isBloom(bloom)) return this.#debug('Invalid data from remote peer: bloom is not a bloom in phase 8')
// prettier-ignore
if (!isMsgs(msgs)) return this.#debug('Invalid data from remote peer: msgs is not an array of msgs in phase 8')
return this.#sendMissingMsgsRes(id, 2, bloom, msgs) return this.#sendMissingMsgsRes(id, 2, bloom, msgs)
} }
case 9: { case 9: {
// prettier-ignore
if (!isMsgs(payload)) return this.#debug('Invalid data from remote peer: payload is not an array of msgs in phase 9')
// prettier-ignore
this.#debug('%s Stream IN9: got %s msgs in %s', this.#myId, payload.length, id)
return this.#consumeMissingMsgs(id, payload) return this.#consumeMissingMsgs(id, payload)
} }
default: {
// prettier-ignore
return this.#debug('Invalid data from remote peer: phase is an invalid number')
}
} }
this.#debug('Stream IN: unknown %o', data)
} }
// as a source /**
* source method
* @param {Error} err
*/
abort(err) { abort(err) {
this.ended = true this.ended = true
if (this.source && !this.source.ended) this.source.abort(err) if (this.source && !this.source.ended) this.source.abort(err)
if (this.sink && !this.sink.ended) this.sink.end(err) if (this.sink && !this.sink.ended) this.sink.end(err)
} }
// as a sink /**
* sink method
* @param {Error} err
*/
end(err) { end(err) {
this.ended = true this.ended = true
if (this.source && !this.source.ended) this.source.abort(err) if (this.source && !this.source.ended) this.source.abort(err)

94
lib/util.js Normal file
View File

@ -0,0 +1,94 @@
const bs58 = require('bs58')
/**
* @typedef {import('./range').Range} Range
* @typedef {import('pzp-db/msg-v4').Msg} Msg
*/
/**
* @param {any} msgId
* @return {msgId is string}
*/
function isMsgId(msgId) {
try {
const d = bs58.decode(msgId)
return d.length === 32
} catch {
return false
}
}
/**
* @param {any} msgIds
* @return {msgIds is Array<string>}
*/
function isMsgIds(msgIds) {
if (!Array.isArray(msgIds)) return false
return msgIds.every(isMsgId)
}
/**
* @param {any} msgs
* @return {msgs is Array<Msg>}
*/
function isMsgs(msgs) {
if (!Array.isArray(msgs)) return false
return msgs.every(isMsg)
}
/**
* @param {any} bloom
* @return {bloom is string}
*/
function isBloom(bloom) {
// TODO: validate when blooming is stabilized
return !!bloom
}
/**
* @param {any} msg
* @returns {msg is Msg}
*/
function isMsg(msg) {
if (!msg || typeof msg !== 'object') {
return false
}
if (!('data' in msg)) {
return false
}
if (!msg.metadata || typeof msg.metadata !== 'object') {
return false
}
if (!('dataHash' in msg.metadata)) {
return false
}
if (!('dataSize' in msg.metadata)) {
return false
}
if (!('account' in msg.metadata)) {
return false
}
if (!('accountTips' in msg.metadata)) {
return false
}
if (!('tangles' in msg.metadata)) {
return false
}
if (!('domain' in msg.metadata)) {
return false
}
if (msg.metadata.v !== 4) {
return false
}
if (typeof msg.sig !== 'string') {
return false
}
return true
}
module.exports = {
isMsgId,
isMsgIds,
isMsgs,
isBloom,
isMsg,
}

View File

@ -1,18 +1,17 @@
{ {
"name": "ppppp-tangle-sync", "name": "pzp-sync",
"version": "1.0.0", "version": "1.0.4",
"description": "PPPPP replication using Kleppmann's hash graph sync", "description": "PZP replication using Kleppmann's hash graph sync",
"author": "Andre Staltz <contact@staltz.com>", "author": "Andre Staltz <contact@staltz.com>",
"license": "MIT", "license": "MIT",
"homepage": "https://github.com/staltz/ppppp-tangle-sync", "homepage": "https://codeberg.org/pzp/pzp-sync",
"repository": { "repository": {
"type": "git", "type": "git",
"url": "git@github.com:staltz/ppppp-tangle-sync.git" "url": "git@codeberg.org:pzp/pzp-sync.git"
}, },
"main": "index.js", "main": "index.js",
"files": [ "files": [
"*.js", "lib/**"
"lib/*.js"
], ],
"exports": { "exports": {
".": { ".": {
@ -25,31 +24,38 @@
}, },
"dependencies": { "dependencies": {
"bloom-filters": "^3.0.0", "bloom-filters": "^3.0.0",
"bs58": "^5.0.0",
"debug": "^4.3.4", "debug": "^4.3.4",
"multicb": "^1.2.2", "promisify-4loc": "^1.0.0",
"pull-stream": "^3.7.0", "pull-stream": "^3.7.0",
"push-stream": "^11.2.0", "push-stream": "^11.2.0",
"push-stream-to-pull-stream": "^1.0.5", "push-stream-to-pull-stream": "^1.0.5",
"ssb-network-errors": "^1.0.1" "ssb-network-errors": "^1.0.1"
}, },
"devDependencies": { "devDependencies": {
"bs58": "^5.0.0", "@types/debug": "^4.1.9",
"@types/pull-stream": "3.6.3",
"@types/node": "16.x",
"c8": "7", "c8": "7",
"ppppp-db": "github:staltz/ppppp-db", "pzp-caps": "^1.0.0",
"pzp-db": "^1.0.4",
"pzp-dict": "^1.0.0",
"pzp-goals": "^1.0.0",
"pzp-keypair": "^1.0.0",
"pzp-set": "^1.0.0",
"prettier": "^2.6.2", "prettier": "^2.6.2",
"pretty-quick": "^3.1.3", "pretty-quick": "^3.1.3",
"rimraf": "^4.4.0", "rimraf": "^4.4.0",
"secret-stack": "^6.4.1", "secret-stack": "~8.1.0",
"secret-handshake-ext": "0.0.11",
"ssb-box": "^1.0.1", "ssb-box": "^1.0.1",
"ssb-caps": "^1.1.0", "typescript": "^5.1.3"
"ssb-classic": "^1.1.0",
"ssb-keys": "^8.5.0",
"ssb-uri2": "^2.4.1",
"tap-arc": "^0.3.5",
"tape": "^5.6.3"
}, },
"scripts": { "scripts": {
"test": "tape test/*.js | tap-arc --bail", "clean-check": "tsc --build --clean",
"prepublishOnly": "npm run clean-check && tsc --build",
"postpublish": "npm run clean-check",
"test": "npm run clean-check && node --test",
"format-code": "prettier --write \"(lib|test)/**/*.js\"", "format-code": "prettier --write \"(lib|test)/**/*.js\"",
"format-code-staged": "pretty-quick --staged --pattern \"(lib|test)/**/*.js\"", "format-code-staged": "pretty-quick --staged --pattern \"(lib|test)/**/*.js\"",
"coverage": "c8 --reporter=lcov npm run test" "coverage": "c8 --reporter=lcov npm run test"

View File

@ -1,7 +1,11 @@
The bloom filter is a representation of msgs I already have in my want-range, For each given tangle, peers exchange ranges (tuples `[minDepth, maxDepth]`)
so you know you can (probably?) skip sending them to me. "haveRange" and "wantRange". Then each peer creates a bloom filter representing
the msgs they currently have in their wantRange, and these such bloom filters.
Based on the remote peer's bloom filter, they exchange msgs that appear to be
missing. The bloom filter is a representation of msgs I already have in my want-range,
so you know you can (probably?) skip sending them to me. The "probably?" uncertainty is reduced by doing several rounds of such exchange. In the end, each peer knows with high certainty which msgs the other peer is missing in their declared want-range, and thus exchange such msgs.
The "probably?" uncertainty is reduced by doing several rounds. In the process, associated account msgs are included even though the tangle being replicated might not be an account tangle. This is because validation of a received tangle msg may require the account msgs.
```mermaid ```mermaid
@ -9,8 +13,10 @@ sequenceDiagram
participant A as Alice participant A as Alice
participant B as Bob participant B as Bob
note over A: I want to sync tangle with ID "T" note over A: I want to sync tangle<br/>with ID "T" and goal aliceG
A->>B: 1: Send local have-range for T note over B: I want to sync tangle<br/>with ID "T" and goal bobG
note over A: aliceHave := getHaveRange(T)
A->>B: Phase 1: Send T and aliceHave
%% opt Alice's have-range is empty %% opt Alice's have-range is empty
%% B->>A: 2: Send local have-range and (empty) want-range for ID %% B->>A: 2: Send local have-range and (empty) want-range for ID
@ -19,30 +25,106 @@ A->>B: 1: Send local have-range for T
%% note over A: done %% note over A: done
%% end %% end
Note over B: Calculate local want-range based on<br/>local have-range and remote have-range Note over B: bobHave := getHaveRange(T)
B->>A: 2: Send local have-range and want-range for T Note over B: bobWant := getWantRange(bobHave, aliceHave, bobG)
B->>A: Phase 2: Send T, bobHave and bobWant
%% opt Bob's have-range is empty %% opt Bob's have-range is empty
%% A->>B: All msgs in remote want-range %% A->>B: All msgs in remote want-range
%% note over B: done %% note over B: done
%% end %% end
Note over A: Calculate BF over all<br />msgs in my want-range Note over A: aliceWant := getWantRange(aliceHave, bobHave, aliceG)
A->>B: 3: Send local want-range and BF for round 0 Note over A: aliceBF0 := bloomFor(T, 0, aliceWant)
Note over B: Read BF to know which<br />msgs they are (maybe) missing A->>B: Phase 3: Send T, aliceWant and aliceBF0
Note over B: Calculate BF over all<br />msgs in my want-range Note over B: aliceMiss0 := msgsMissing(T, 0, aliceWant, aliceBF0)
B->>A: 4: Send BF for round 0 and A's round 0 missing msg IDs Note over B: bobBF0 := bloomFor(T, 0, bobWant)
Note over A: ... B->>A: Phase 4: Send T, bobBF0 and aliceMiss0
A->>B: 5: Send BF for round 1 and B's missing round 0 msg IDs Note over A: bobMiss0 := msgsMissing(T, 0, bobWant, bobBF0)
Note over B: ... Note over A: aliceBF1 := bloomFor(T, 1, aliceWant, aliceMiss0)
B->>A: 6: Send BF for round 1 and A' missing round 1 msg IDs A->>B: Phase 5: Send T, aliceBF1 and bobMiss0
Note over A: ... Note over B: aliceMiss1 := msgsMissing(T, 1, aliceWant, aliceBF1)
A->>B: 7: Send BF for round 2 and B's missing round 2 msg IDs Note over B: bobBF1 := bloomFor(T, 1, bobWant, bobMiss0)
Note over B: ... B->>A: Phase 6: Send T, bobBF1 and aliceMiss1
B->>A: 8: Send BF for round 2 and A's missing msgs Note over A: bobMiss1 := msgsMissing(T, 1, bobWant, bobBF1)
Note over A: Commit received msgs Note over A: aliceBF2 := bloomFor(T, 2, aliceWant, aliceMiss0 + aliceMiss1)
A->>B: 9: Send B's missing msgs A->>B: Phase 7: Send T, aliceBF2 and bobMiss1
Note over B: Commit received msgs Note over B: aliceMiss2 := msgsMissing(T, 2, aliceWant, aliceBF2)
Note over B: aliceMiss := aliceMiss0 + aliceMiss1 + aliceMiss2
Note over B: aliceMsgs := tangleSlice(T, aliceMiss)
Note over B: bobBF2 := bloomFor(T, 2, bobWant, bobMiss0 + bobMiss1)
B->>A: Phase 8: Send T, bobBF2 and aliceMsgs
Note over A: commit(aliceMsgs)
Note over A: bobMiss2 := msgsMissing(T, 2, bobWant, bobBF2)
Note over A: bobMiss := bobMiss0 + bobMiss1 + bobMiss2
Note over A: bobMsgs := tangleSlice(T, bobMiss)
Note over A: msgs := bobMsgs + associatedAccountMsgs(bobMsgs)
A->>B: Phase 9: Send T and msgs
Note over B: commit(msgs)
```
Methods:
```
/**
* Determines the range of depths of msgs I have in the tangle
*/
getHaveRange(tangleID) -> [minDepth, maxDepth]
```
```
/**
* Determines the range of depths of (new) msgs I want from the tangle
*/
getWantRange(localHaveRange, remoteHaveRange, goal) -> [minDepth, maxDepth]
```
```
/**
* For each `msg` in `msgs`, pick the set of msgs from the tangle `msg.metadata.account` (up to `msg.metadata.accountTips`), then combine together all these subsets.
* Returns all such account msgs.
*/
associatedAccountMsgs(msgs)
```
```
/**
* Creates a serialized bloom filter containing the identifiers `${round}${msgID}` for:
* - Each msg in the tangle `tangleID` within depth `range` (inclusive)
* - Each msg in associatedAccountMsgs(tangle msgs above)
* - Each "ghost" msg ID for this tangle
* - Each "extra" msg ID from `extraMsgIDs`
*/
bloomFor(tangleId, round, range, extraMsgIDs) -> Bloom
```
```
/**
* Returns the msg IDs in the tangle `tangleID` which satisfy:
* - `msg.metadata.tangles[tangleID].depth` within `range` (inclusive)
* - `${round}${msgID}` not in `bloom`
* Plus msg IDs of associatedAccountMsgs(tangle msgs above)
*/
msgsMissing(tangleID, round, range, bloom) -> Array<MsgID>
```
```
/**
* Identifies the lowest depth msg in `msgID` as "lowest" and then returns an
* Array of msgs with:
* - `lowest`
* - msgs posterior to `lowest`
* - trail from `lowest` to the root
* The Array is topologically sorted.
*/
tangleSlice(tangleID, msgIDs) -> Array<Msg>
```
```
/**
* Receives an Array of PZP msgs, validates and persists each in the database.
*/
commit(msgs) -> void
``` ```
Peers exchange Peers exchange

79
test/account-sync.test.js Normal file
View File

@ -0,0 +1,79 @@
const test = require('node:test')
const assert = require('node:assert')
const p = require('util').promisify
const Keypair = require('pzp-keypair')
const { createPeer } = require('./util')
const aliceKeypair = Keypair.generate('ed25519', 'alice')
const bobKeys = Keypair.generate('ed25519', 'bob')
/**
 * Drains an (async) iterable of msgs and returns the key bytes of every
 * "add" msg on a self-account tangle, in iteration order.
 */
async function getAccount(iter) {
  const keyBytes = []
  for await (const msg of iter) {
    if (msg.metadata.account === 'self' && msg.data?.action === 'add') {
      keyBytes.push(msg.data.key.bytes)
    }
  }
  return keyBytes
}
// Verifies that Alice's account tangle (root plus "add" msgs for two extra
// keypairs) replicates to Bob, with sync started before connecting and
// goals set after connecting.
// Fix: removed a duplicated `await alice.db.loaded()` call.
test('sync an account tangle', async (t) => {
  const alice = createPeer({ name: 'alice', global: { keypair: aliceKeypair } })
  const bob = createPeer({ name: 'bob', global: { keypair: bobKeys } })

  await alice.db.loaded()
  await bob.db.loaded()

  // Alice's account tangle
  const aliceID = await p(alice.db.account.create)({
    subdomain: 'account',
    _nonce: 'alice',
  })
  const aliceKeypair1 = Keypair.generate('ed25519', 'alice1')
  await p(alice.db.account.add)({
    account: aliceID,
    keypair: aliceKeypair1,
  })
  const aliceKeypair2 = Keypair.generate('ed25519', 'alice2')
  await p(alice.db.account.add)({
    account: aliceID,
    keypair: aliceKeypair2,
  })

  assert.deepEqual(
    await getAccount(alice.db.msgs()),
    [aliceKeypair.public, aliceKeypair1.public, aliceKeypair2.public],
    'alice has her account tangle'
  )
  assert.deepEqual(
    await getAccount(bob.db.msgs()),
    [],
    "bob doesn't have alice's account tangle"
  )

  // start() on purpose before connect, to test whether this also works
  bob.sync.start()
  const remoteAlice = await p(bob.connect)(alice.getAddress())
  assert('bob connected to alice')

  // Set goals on purpose after connect, to test whether this also works
  bob.goals.set(aliceID, 'all')
  alice.goals.set(aliceID, 'all')
  await p(setTimeout)(1000)
  assert('sync!')

  assert.deepEqual(
    await getAccount(bob.db.msgs()),
    [aliceKeypair.public, aliceKeypair1.public, aliceKeypair2.public],
    "bob has alice's account tangle"
  )

  await p(remoteAlice.close)(true)
  await p(alice.close)(true)
  await p(bob.close)(true)
})

219
test/dict-sync.test.js Normal file
View File

@ -0,0 +1,219 @@
const test = require('node:test')
const assert = require('node:assert')
const p = require('node:util').promisify
const Keypair = require('pzp-keypair')
const MsgV4 = require('pzp-db/msg-v4')
const { createPeer } = require('./util')
const aliceKeypair = Keypair.generate('ed25519', 'alice')
/** Collects every item of an (async) iterable into a plain Array. */
async function flatten(iter) {
  const items = []
  for await (const item of iter) items.push(item)
  return items
}
// Verifies that a dict tangle replicates from Alice to Bob when both set
// goal 'dict' and Bob starts from scratch (knowing only Alice's account).
// Fix: the assertion messages for Bob's DB wrongly said "alice".
test('sync goal=dict from scratch', async (t) => {
  const SPAN = 5
  const alice = createPeer({
    name: 'alice',
    global: {
      keypair: aliceKeypair,
    },
    dict: { ghostSpan: SPAN },
  })
  const bob = createPeer({ name: 'bob' })

  await alice.db.loaded()
  await bob.db.loaded()

  // Alice sets up an account and a dict
  const aliceID = await p(alice.db.account.create)({
    subdomain: 'account',
    _nonce: 'alice',
  })
  await p(alice.dict.load)(aliceID)
  const aliceAccountRoot = await p(alice.db.get)(aliceID)

  // Bob knows Alice
  await p(bob.db.add)(aliceAccountRoot, aliceID)

  // Alice constructs a dict
  await p(alice.dict.update)('profile', { age: 25 })
  await p(alice.dict.update)('profile', { name: 'ALICE' })
  const mootID = alice.dict.getFeedID('profile')

  // Assert situation at Alice before sync
  {
    const arr = (await flatten(alice.db.msgs()))
      .map((msg) => msg.data?.update)
      .filter((x) => !!x)
      .map((x) => x.age ?? x.name ?? x.gender)
    assert.deepEqual(arr, [25, 'ALICE'], 'alice has age+name dict')
  }

  // Assert situation at Bob before sync
  {
    const arr = (await flatten(bob.db.msgs()))
      .map((msg) => msg.data?.update)
      .filter((x) => !!x)
      .map((x) => x.age ?? x.name ?? x.gender)
    assert.deepEqual(arr, [], 'bob has empty dict')
  }

  // Trigger sync
  alice.goals.set(mootID, 'dict')
  bob.goals.set(mootID, 'dict')
  const remoteAlice = await p(bob.connect)(alice.getAddress())
  assert('bob connected to alice')
  bob.sync.start()
  await p(setTimeout)(1000)
  assert('sync!')

  // Assert situation at Bob after sync
  {
    const arr = (await flatten(bob.db.msgs()))
      .map((msg) => msg.data?.update)
      .filter((x) => !!x)
      .map((x) => x.age ?? x.name ?? x.gender)
    assert.deepEqual(arr, [25, 'ALICE'], 'bob has age+name dict')
  }

  await p(remoteAlice.close)(true)
  await p(alice.close)(true)
  await p(bob.close)(true)
})
//
// R-?-?-o-o
//  \
//   o
//
// where "o" is a dict update and "?" is a ghost
//
// Alice garbage-collects the oldest part of her dict tangle (turning two
// msgs into ghosts), while Bob holds the full tangle plus a branched-off
// msg Alice has never seen. After sync, Alice should gain the branched-off
// update, and one of her ghosts should be resolved back into a real msg.
test('sync goal=dict with ghostSpan=2', async (t) => {
  const SPAN = 5
  const alice = createPeer({
    name: 'alice',
    global: {
      keypair: aliceKeypair,
    },
    dict: { ghostSpan: SPAN },
  })
  const bob = createPeer({ name: 'bob' })

  await alice.db.loaded()
  await bob.db.loaded()

  // Alice sets up an account and a dict
  const aliceID = await p(alice.db.account.create)({
    subdomain: 'account',
    _nonce: 'alice',
  })
  await p(alice.dict.load)(aliceID)
  const aliceAccountRoot = await p(alice.db.get)(aliceID)

  // Bob knows Alice
  await p(bob.db.add)(aliceAccountRoot, aliceID)

  // Alice constructs a dict with five successive updates
  await p(alice.dict.update)('profile', { name: 'alice' })
  await p(alice.dict.update)('profile', { age: 24 })
  await p(alice.dict.update)('profile', { name: 'Alice' })
  await p(alice.dict.update)('profile', { age: 25 })
  await p(alice.dict.update)('profile', { name: 'ALICE' })

  // Locate the moot (dataSize 0) and each update's record by its payload
  let moot
  let rec1
  let rec2
  let rec3
  let rec4
  let rec5
  for await (const rec of alice.db.records()) {
    if (rec.msg.metadata.dataSize === 0) moot = rec
    if (rec.msg.data?.update?.name === 'alice') rec1 = rec
    if (rec.msg.data?.update?.age === 24) rec2 = rec
    if (rec.msg.data?.update?.name === 'Alice') rec3 = rec
    if (rec.msg.data?.update?.age === 25) rec4 = rec
    if (rec.msg.data?.update?.name === 'ALICE') rec5 = rec
  }

  // Bob knows the whole dict
  await p(bob.db.add)(moot.msg, moot.id)
  await p(bob.db.add)(rec1.msg, moot.id)
  await p(bob.db.add)(rec2.msg, moot.id)
  await p(bob.db.add)(rec3.msg, moot.id)
  await p(bob.db.add)(rec4.msg, moot.id)
  await p(bob.db.add)(rec5.msg, moot.id)

  // Bob knows a branched off msg that Alice doesn't know
  // (its tangle only extends moot + rec1, creating the fork in the diagram)
  {
    const tangle = new MsgV4.Tangle(moot.id)
    tangle.add(moot.id, moot.msg)
    tangle.add(rec1.id, rec1.msg)
    const msg = MsgV4.create({
      keypair: aliceKeypair,
      domain: 'dict_v1__profile',
      account: aliceID,
      accountTips: [aliceID],
      data: { update: { gender: 'w' }, supersedes: [] },
      tangles: {
        [moot.id]: tangle,
      },
    })
    await p(bob.db.add)(msg, moot.id)
  }

  // Simulate Alice garbage collecting part of the dict:
  // delete the deletables (registering them as ghosts) and erase the
  // erasables (except the moot, which must remain as the tangle root)
  {
    const fieldRoots = alice.dict._getFieldRoots('profile')
    assert.deepEqual(fieldRoots.age, [rec4.id])
    assert.deepEqual(fieldRoots.name, [rec5.id])

    const tangle = await p(alice.db.getTangle)(alice.dict.getFeedID('profile'))
    const { deletables, erasables } = tangle.getDeletablesAndErasables(rec4.id)
    assert.equal(deletables.size, 2)
    assert.equal(erasables.size, 2)
    assert.ok(deletables.has(rec1.id))
    assert.ok(deletables.has(rec2.id))
    assert.ok(erasables.has(rec3.id))
    assert.ok(erasables.has(moot.id))

    for (const msgID of deletables) {
      await p(alice.db.ghosts.add)({ msgID, tangleID: moot.id, span: SPAN })
      await p(alice.db.del)(msgID)
    }
    for (const msgID of erasables) {
      if (msgID === moot.id) continue
      await p(alice.db.erase)(msgID)
    }
  }

  // Assert situation at Alice before sync
  assert.deepEqual(
    await p(alice.dict.read)(aliceID, 'profile'),
    { age: 25, name: 'ALICE' },
    'alice has age+name dict'
  )
  assert.deepEqual(alice.db.ghosts.get(moot.id), [rec1.id, rec2.id])

  // Trigger sync
  alice.goals.set(moot.id, 'dict')
  bob.goals.set(moot.id, 'dict')
  const remoteAlice = await p(bob.connect)(alice.getAddress())
  assert('bob connected to alice')
  bob.sync.start()
  await p(setTimeout)(1000)
  assert('sync!')

  // Assert situation at Alice after sync: she got the branched off msg,
  // and rec1 is no longer a ghost (rec2 still is)
  assert.deepEqual(
    await p(alice.dict.read)(aliceID, 'profile'),
    { age: 25, name: 'ALICE', gender: 'w' },
    'alice has age+name+gender dict'
  )
  assert.deepEqual(alice.db.ghosts.get(moot.id), [rec2.id])

  await p(remoteAlice.close)(true)
  await p(alice.close)(true)
  await p(bob.close)(true)
})

View File

@ -1,89 +1,239 @@
const test = require('tape') const test = require('node:test')
const path = require('path') const assert = require('node:assert')
const os = require('os') const p = require('node:util').promisify
const rimraf = require('rimraf') const Keypair = require('pzp-keypair')
const SecretStack = require('secret-stack')
const caps = require('ssb-caps')
const FeedV1 = require('ppppp-db/feed-v1')
const p = require('util').promisify
const Algorithm = require('../lib/algorithm') const Algorithm = require('../lib/algorithm')
const { generateKeypair } = require('./util') const { createPeer } = require('./util')
const createPeer = SecretStack({ appKey: caps.shs }) const carolKeypair = Keypair.generate('ed25519', 'carol')
.use(require('ppppp-db')) const bobKeypair2 = Keypair.generate('ed25519', 'bob2')
.use(require('ssb-box'))
.use(require('../lib'))
const ALICE_DIR = path.join(os.tmpdir(), 'dagsync-alice') async function flatten(iter) {
const BOB_DIR = path.join(os.tmpdir(), 'dagsync-bob') const ary = []
const aliceKeys = generateKeypair('alice') for await (const it of iter) {
const bobKeys = generateKeypair('bob') ary.push(it)
}
return ary
}
test('sync a feed with goal=all', async (t) => { test('sync a feed without pre-knowing the owner account', async (t) => {
rimraf.sync(ALICE_DIR) const alice = createPeer({ name: 'alice' })
rimraf.sync(BOB_DIR) const bob = createPeer({ name: 'bob' })
const alice = createPeer({
keys: aliceKeys,
path: ALICE_DIR,
})
const bob = createPeer({
keys: bobKeys,
path: BOB_DIR,
})
await alice.db.loaded() await alice.db.loaded()
await bob.db.loaded() await bob.db.loaded()
const carolKeys = generateKeypair('carol') const bobID = await p(bob.db.account.create)({
subdomain: 'account',
_nonce: 'bob',
})
for (let i = 1; i <= 5; i++) {
await p(bob.db.feed.publish)({
account: bobID,
domain: 'post',
data: { text: 'm' + i },
})
}
assert('bob published posts 1..5')
const bobPostsID = bob.db.feed.getID(bobID, 'post')
{
const arr = (await flatten(alice.db.msgs()))
.filter((msg) => msg.metadata.account === bobID && msg.data)
.map((msg) => msg.data.text)
assert.deepEqual(arr, [], 'alice has no posts from bob')
}
bob.goals.set(bobPostsID, 'all')
alice.goals.set(bobPostsID, 'all')
const remoteAlice = await p(bob.connect)(alice.getAddress())
assert('bob connected to alice')
bob.sync.start()
await p(setTimeout)(1000)
assert('sync!')
{
const arr = (await flatten(alice.db.msgs()))
.filter((msg) => msg.metadata.account === bobID && msg.data)
.map((msg) => msg.data.text)
assert.deepEqual(
arr,
['m1', 'm2', 'm3', 'm4', 'm5'],
'alice has posts 1..5 from bob'
)
}
await p(remoteAlice.close)(true)
await p(alice.close)(true)
await p(bob.close)(true)
})
test('sync a feed with updated msgs from new account keypair', async (t) => {
const alice = createPeer({ name: 'alice' })
const bob = createPeer({ name: 'bob' })
await alice.db.loaded()
await bob.db.loaded()
const bobID = await p(bob.db.account.create)({
subdomain: 'account',
_nonce: 'bob',
})
for (let i = 1; i <= 5; i++) {
await p(bob.db.feed.publish)({
account: bobID,
domain: 'post',
data: { text: 'm' + i },
})
}
assert('bob published posts 1..5')
const bobPostsID = bob.db.feed.getID(bobID, 'post')
{
const arr = (await flatten(alice.db.msgs()))
.filter((msg) => msg.metadata.account === bobID && msg.data)
.map((msg) => msg.data.text)
assert.deepEqual(arr, [], 'alice has no posts from bob')
}
bob.goals.set(bobPostsID, 'all')
alice.goals.set(bobPostsID, 'all')
const remoteAlice = await p(bob.connect)(alice.getAddress())
assert('bob connected to alice')
bob.sync.start()
await p(setTimeout)(1000)
assert('sync!')
{
const arr = (await flatten(alice.db.msgs()))
.filter((msg) => msg.metadata.account === bobID && msg.data)
.map((msg) => msg.data.text)
assert.deepEqual(
arr,
['m1', 'm2', 'm3', 'm4', 'm5'],
'alice has posts 1..5 from bob'
)
}
await p(remoteAlice.close)(true)
// --------------------------------------------
// Bob adds a new keypair and published with it
// --------------------------------------------
const consent = bob.db.account.consent({
account: bobID,
keypair: bobKeypair2,
})
await p(bob.db.account.add)({
account: bobID,
keypair: bobKeypair2,
consent,
powers: [],
})
for (let i = 6; i <= 7; i++) {
await p(bob.db.feed.publish)({
account: bobID,
keypair: bobKeypair2,
domain: 'post',
data: { text: 'm' + i },
})
}
assert('bob with new keypair published posts 6..7')
const remoteAlice2 = await p(bob.connect)(alice.getAddress())
assert('bob connected to alice')
bob.sync.start()
await p(setTimeout)(1000)
assert('sync!')
{
const arr = (await flatten(alice.db.msgs()))
.filter((msg) => msg.metadata.account === bobID && msg.data)
.map((msg) => msg.data.text)
assert.deepEqual(
arr,
['m1', 'm2', 'm3', 'm4', 'm5', 'm6', 'm7'],
'alice has posts 1..7 from bob'
)
}
await p(remoteAlice2.close)(true)
await p(alice.close)(true)
await p(bob.close)(true)
})
test('sync a feed with goal=all', async (t) => {
const alice = createPeer({ name: 'alice' })
const bob = createPeer({ name: 'bob' })
await alice.db.loaded()
await bob.db.loaded()
const carolID = await p(alice.db.account.create)({
keypair: carolKeypair,
subdomain: 'account',
_nonce: 'carol',
})
const carolAccountRoot = await p(alice.db.get)(carolID)
// Bob knows Carol
await p(bob.db.add)(carolAccountRoot, carolID)
const carolMsgs = [] const carolMsgs = []
const carolID = carolKeys.id
const carolID_b58 = FeedV1.stripAuthor(carolID)
for (let i = 1; i <= 10; i++) { for (let i = 1; i <= 10; i++) {
const rec = await p(alice.db.create)({ const rec = await p(alice.db.feed.publish)({
type: 'post', account: carolID,
content: { text: 'm' + i }, domain: 'post',
keys: carolKeys, data: { text: 'm' + i },
keypair: carolKeypair,
}) })
carolMsgs.push(rec.msg) carolMsgs.push(rec.msg)
} }
t.pass('alice has msgs 1..10 from carol') assert('alice has msgs 1..10 from carol')
const carolRootHash = alice.db.getFeedRoot(carolID, 'post') const carolPostsMootID = alice.db.feed.getID(carolID, 'post')
const carolRootMsg = alice.db.get(carolRootHash) const carolPostsMoot = await p(alice.db.get)(carolPostsMootID)
await p(bob.db.add)(carolRootMsg, carolRootHash) await p(bob.db.add)(carolPostsMoot, carolPostsMootID)
for (let i = 0; i < 7; i++) { for (let i = 0; i < 7; i++) {
await p(bob.db.add)(carolMsgs[i], carolRootHash) await p(bob.db.add)(carolMsgs[i], carolPostsMootID)
} }
{ {
const arr = [...bob.db.msgs()] const arr = (await flatten(bob.db.msgs()))
.filter((msg) => msg.metadata.who === carolID_b58 && msg.content) .filter((msg) => msg.metadata.account === carolID && msg.data)
.map((msg) => msg.content.text) .map((msg) => msg.data.text)
t.deepEquals( assert.deepEqual(
arr, arr,
['m1', 'm2', 'm3', 'm4', 'm5', 'm6', 'm7'], ['m1', 'm2', 'm3', 'm4', 'm5', 'm6', 'm7'],
'bob has msgs 1..7 from carol' 'bob has msgs 1..7 from carol'
) )
} }
bob.tangleSync.setGoal(carolRootHash, 'all') bob.goals.set(carolPostsMootID, 'all')
alice.tangleSync.setGoal(carolRootHash, 'all') alice.goals.set(carolPostsMootID, 'all')
const remoteAlice = await p(bob.connect)(alice.getAddress()) const remoteAlice = await p(bob.connect)(alice.getAddress())
t.pass('bob connected to alice') assert('bob connected to alice')
bob.tangleSync.initiate() bob.sync.start()
await p(setTimeout)(1000) await p(setTimeout)(1000)
t.pass('tangleSync!') assert('sync!')
{ {
const arr = [...bob.db.msgs()] const arr = (await flatten(bob.db.msgs()))
.filter((msg) => msg.metadata.who === carolID_b58 && msg.content) .filter((msg) => msg.metadata.account === carolID && msg.data)
.map((msg) => msg.content.text) .map((msg) => msg.data.text)
t.deepEquals( assert.deepEqual(
arr, arr,
['m1', 'm2', 'm3', 'm4', 'm5', 'm6', 'm7', 'm8', 'm9', 'm10'], ['m1', 'm2', 'm3', 'm4', 'm5', 'm6', 'm7', 'm8', 'm9', 'm10'],
'bob has msgs 1..10 from carol' 'bob has msgs 1..10 from carol'
@ -96,70 +246,68 @@ test('sync a feed with goal=all', async (t) => {
}) })
test('sync a feed with goal=newest', async (t) => { test('sync a feed with goal=newest', async (t) => {
rimraf.sync(ALICE_DIR) const alice = createPeer({ name: 'alice' })
rimraf.sync(BOB_DIR) const bob = createPeer({ name: 'bob' })
const alice = createPeer({
keys: aliceKeys,
path: ALICE_DIR,
})
const bob = createPeer({
keys: bobKeys,
path: BOB_DIR,
})
await alice.db.loaded() await alice.db.loaded()
await bob.db.loaded() await bob.db.loaded()
const carolKeys = generateKeypair('carol') const carolID = await p(alice.db.account.create)({
keypair: carolKeypair,
subdomain: 'account',
_nonce: 'carol',
})
const carolAccountRoot = await p(alice.db.get)(carolID)
// Bob knows Carol
await p(bob.db.add)(carolAccountRoot, carolID)
const carolMsgs = [] const carolMsgs = []
const carolID = carolKeys.id
const carolID_b58 = FeedV1.stripAuthor(carolID)
for (let i = 1; i <= 10; i++) { for (let i = 1; i <= 10; i++) {
const rec = await p(alice.db.create)({ const rec = await p(alice.db.feed.publish)({
type: 'post', account: carolID,
content: { text: 'm' + i }, domain: 'post',
keys: carolKeys, data: { text: 'm' + i },
keypair: carolKeypair,
}) })
carolMsgs.push(rec.msg) carolMsgs.push(rec.msg)
} }
t.pass('alice has msgs 1..10 from carol') assert('alice has msgs 1..10 from carol')
const carolRootHash = alice.db.getFeedRoot(carolID, 'post') const carolPostsMootID = alice.db.feed.getID(carolID, 'post')
const carolRootMsg = alice.db.get(carolRootHash) const carolPostsMoot = await p(alice.db.get)(carolPostsMootID)
await p(bob.db.add)(carolRootMsg, carolRootHash) await p(bob.db.add)(carolPostsMoot, carolPostsMootID)
for (let i = 0; i < 7; i++) { for (let i = 0; i < 7; i++) {
await p(bob.db.add)(carolMsgs[i], carolRootHash) await p(bob.db.add)(carolMsgs[i], carolPostsMootID)
} }
{ {
const arr = [...bob.db.msgs()] const arr = (await flatten(bob.db.msgs()))
.filter((msg) => msg.metadata.who === carolID_b58 && msg.content) .filter((msg) => msg.metadata.account === carolID && msg.data)
.map((msg) => msg.content.text) .map((msg) => msg.data.text)
t.deepEquals( assert.deepEqual(
arr, arr,
['m1', 'm2', 'm3', 'm4', 'm5', 'm6', 'm7'], ['m1', 'm2', 'm3', 'm4', 'm5', 'm6', 'm7'],
'bob has msgs 1..7 from carol' 'bob has msgs 1..7 from carol'
) )
} }
bob.tangleSync.setGoal(carolRootHash, 'newest-5') bob.goals.set(carolPostsMootID, 'newest-5')
alice.tangleSync.setGoal(carolRootHash, 'all') alice.goals.set(carolPostsMootID, 'all')
const remoteAlice = await p(bob.connect)(alice.getAddress()) const remoteAlice = await p(bob.connect)(alice.getAddress())
t.pass('bob connected to alice') assert('bob connected to alice')
bob.tangleSync.initiate() bob.sync.start()
await p(setTimeout)(1000) await p(setTimeout)(1000)
t.pass('tangleSync!') assert('sync!')
{ {
const arr = [...bob.db.msgs()] const arr = (await flatten(bob.db.msgs()))
.filter((msg) => msg.metadata.who === carolID_b58 && msg.content) .filter((msg) => msg.metadata.account === carolID && msg.data)
.map((msg) => msg.content.text) .map((msg) => msg.data.text)
t.deepEquals( assert.deepEqual(
arr, arr,
['m6', 'm7', 'm8', 'm9', 'm10'], ['m6', 'm7', 'm8', 'm9', 'm10'],
'bob has msgs 6..10 from carol' 'bob has msgs 6..10 from carol'
@ -172,78 +320,76 @@ test('sync a feed with goal=newest', async (t) => {
}) })
test('sync a feed with goal=newest but too far behind', async (t) => { test('sync a feed with goal=newest but too far behind', async (t) => {
rimraf.sync(ALICE_DIR) const alice = createPeer({ name: 'alice' })
rimraf.sync(BOB_DIR) const bob = createPeer({ name: 'bob' })
const alice = createPeer({
keys: aliceKeys,
path: ALICE_DIR,
})
const bob = createPeer({
keys: bobKeys,
path: BOB_DIR,
})
await alice.db.loaded() await alice.db.loaded()
await bob.db.loaded() await bob.db.loaded()
const carolKeys = generateKeypair('carol') const carolID = await p(alice.db.account.create)({
keypair: carolKeypair,
subdomain: 'account',
_nonce: 'carol',
})
const carolIDMsg = await p(alice.db.get)(carolID)
// Bob knows Carol
await p(bob.db.add)(carolIDMsg, carolID)
const carolMsgs = [] const carolMsgs = []
const carolID = carolKeys.id
const carolID_b58 = FeedV1.stripAuthor(carolID)
for (let i = 1; i <= 10; i++) { for (let i = 1; i <= 10; i++) {
const rec = await p(alice.db.create)({ const rec = await p(alice.db.feed.publish)({
type: 'post', account: carolID,
content: { text: 'm' + i }, domain: 'post',
keys: carolKeys, data: { text: 'm' + i },
keypair: carolKeypair,
}) })
carolMsgs.push(rec.msg) carolMsgs.push(rec.msg)
} }
const carolRootHash = alice.db.getFeedRoot(carolID, 'post') const carolPostsMootID = alice.db.feed.getID(carolID, 'post')
const carolRootMsg = alice.db.get(carolRootHash) const carolPostsMoot = await p(alice.db.get)(carolPostsMootID)
const algo = new Algorithm(alice) const algo = new Algorithm(alice)
await algo.pruneNewest(carolRootHash, 5) await algo.pruneNewest(carolPostsMootID, 5)
{ {
const arr = [...alice.db.msgs()] const arr = (await flatten(alice.db.msgs()))
.filter((msg) => msg.metadata.who === carolID_b58 && msg.content) .filter((msg) => msg.metadata.account === carolID && msg.data)
.map((msg) => msg.content.text) .map((msg) => msg.data.text)
t.deepEquals( assert.deepEqual(
arr, arr,
['m6', 'm7', 'm8', 'm9', 'm10'], ['m6', 'm7', 'm8', 'm9', 'm10'],
'alice has msgs 6..10 from carol' 'alice has msgs 6..10 from carol'
) )
} }
await p(bob.db.add)(carolRootMsg, carolRootHash) await p(bob.db.add)(carolPostsMoot, carolPostsMootID)
for (let i = 0; i < 2; i++) { for (let i = 0; i < 2; i++) {
await p(bob.db.add)(carolMsgs[i], carolRootHash) await p(bob.db.add)(carolMsgs[i], carolPostsMootID)
} }
{ {
const arr = [...bob.db.msgs()] const arr = (await flatten(bob.db.msgs()))
.filter((msg) => msg.metadata.who === carolID_b58 && msg.content) .filter((msg) => msg.metadata.account === carolID && msg.data)
.map((msg) => msg.content.text) .map((msg) => msg.data.text)
t.deepEquals(arr, ['m1', 'm2'], 'bob has msgs 1..2 from carol') assert.deepEqual(arr, ['m1', 'm2'], 'bob has msgs 1..2 from carol')
} }
alice.tangleSync.setFeedGoal(carolID, 'post', 'newest-5') alice.goals.set(carolPostsMootID, 'newest-5')
bob.tangleSync.setFeedGoal(carolID, 'post', 'newest-5') bob.goals.set(carolPostsMootID, 'newest-8')
const remoteAlice = await p(bob.connect)(alice.getAddress()) const remoteAlice = await p(bob.connect)(alice.getAddress())
t.pass('bob connected to alice') assert('bob connected to alice')
bob.tangleSync.initiate() bob.sync.start()
await p(setTimeout)(1000) await p(setTimeout)(1000)
t.pass('tangleSync!') assert('sync!')
{ {
const arr = [...bob.db.msgs()] const arr = (await flatten(bob.db.msgs()))
.filter((msg) => msg.metadata.who === carolID_b58 && msg.content) .filter((msg) => msg.metadata.account === carolID && msg.data)
.map((msg) => msg.content.text) .map((msg) => msg.data.text)
t.deepEquals( assert.deepEqual(
arr, arr,
['m6', 'm7', 'm8', 'm9', 'm10'], ['m6', 'm7', 'm8', 'm9', 'm10'],
'bob has msgs 6..10 from carol' 'bob has msgs 6..10 from carol'
@ -254,3 +400,122 @@ test('sync a feed with goal=newest but too far behind', async (t) => {
await p(alice.close)(true) await p(alice.close)(true)
await p(bob.close)(true) await p(bob.close)(true)
}) })
// Bob replicates a small "newest" part of Carol's feed, then
// Alice replicates what Bob has, even though she wants more.
// Finally, Alice replicates from Carol the whole feed.
// Fix: the third connection's assertion message wrongly said
// 'alice connected to alice'; the dial is to carol.
test('sync small newest slice of a feed, then the whole feed', async (t) => {
  const alice = createPeer({ name: 'alice' })
  const bob = createPeer({ name: 'bob' })
  const carol = createPeer({ name: 'carol' })

  await alice.db.loaded()
  await bob.db.loaded()
  await carol.db.loaded()

  const carolID = await p(carol.db.account.create)({
    subdomain: 'account',
    _nonce: 'carol',
  })
  const carolIDMsg = await p(carol.db.get)(carolID)

  // Alice and Bob know Carol
  await p(alice.db.add)(carolIDMsg, carolID)
  await p(bob.db.add)(carolIDMsg, carolID)

  // Carol publishes posts 1..9
  const carolPosts = []
  for (let i = 1; i <= 9; i++) {
    const rec = await p(carol.db.feed.publish)({
      account: carolID,
      domain: 'post',
      data: { text: 'm' + i },
    })
    carolPosts.push(rec.msg)
  }
  const carolPostsMootID = carol.db.feed.getID(carolID, 'post')
  const carolPostsMoot = await p(carol.db.get)(carolPostsMootID)

  {
    const arr = (await flatten(bob.db.msgs()))
      .filter((msg) => msg.metadata.account === carolID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(arr, [], 'bob has nothing from carol')
  }
  {
    const arr = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === carolID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(arr, [], 'alice has nothing from carol')
  }

  // Alice wants everything; Bob only wants the newest 4
  alice.goals.set(carolPostsMootID, 'all')
  bob.goals.set(carolPostsMootID, 'newest-4')
  carol.goals.set(carolPostsMootID, 'all')

  // Step 1: Bob syncs his small slice from Carol
  const bobDialingCarol = await p(bob.connect)(carol.getAddress())
  assert('bob connected to carol')
  bob.sync.start()
  await p(setTimeout)(1000)
  assert('sync!')

  {
    const arr = (await flatten(bob.db.msgs()))
      .filter((msg) => msg.metadata.account === carolID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(
      arr,
      ['m6', 'm7', 'm8', 'm9'],
      'bob has msgs 6..9 from carol'
    )
  }
  await p(bobDialingCarol.close)(true)

  // Step 2: Alice syncs from Bob, getting only what Bob has
  const aliceDialingBob = await p(alice.connect)(bob.getAddress())
  assert('alice connected to bob')
  alice.sync.start()
  await p(setTimeout)(1000)
  assert('sync!')

  {
    const arr = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === carolID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(
      arr,
      ['m6', 'm7', 'm8', 'm9'],
      'alice has msgs 6..9 from carol'
    )
  }
  await p(aliceDialingBob.close)(true)

  // Step 3: Alice syncs from Carol, completing the feed
  const aliceDialingCarol = await p(alice.connect)(carol.getAddress())
  assert('alice connected to carol')
  alice.sync.start()
  await p(setTimeout)(2000)
  assert('sync!')

  {
    const arr = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === carolID && msg.data)
      .map((msg) => msg.data.text)
      .sort()
    assert.deepEqual(
      arr,
      ['m1', 'm2', 'm3', 'm4', 'm5', 'm6', 'm7', 'm8', 'm9'],
      'alice has msgs 1..9 from carol'
    )
  }
  await p(aliceDialingCarol.close)(true)

  await p(alice.close)(true)
  await p(bob.close)(true)
  await p(carol.close)(true)
})

382
test/realtime.test.js Normal file
View File

@ -0,0 +1,382 @@
const test = require('node:test')
const assert = require('node:assert')
const p = require('node:util').promisify
const { createPeer } = require('./util')
// Drain an (async or sync) iterable into a plain array.
async function flatten(iter) {
  const collected = []
  for await (const item of iter) {
    collected.push(item)
  }
  return collected
}
test('sync feed msgs in realtime after the 9 rounds', async (t) => {
  const alice = createPeer({ name: 'alice' })
  const bob = createPeer({ name: 'bob' })

  await alice.db.loaded()
  await bob.db.loaded()

  const bobID = await p(bob.db.account.create)({
    subdomain: 'account',
    _nonce: 'bob',
  })
  await p(bob.db.feed.publish)({
    account: bobID,
    domain: 'post',
    data: { text: 'm0' },
  })
  assert('bob published post 0')
  const bobPostsID = bob.db.feed.getID(bobID, 'post')

  // Sanity check: alice starts with none of bob's posts
  {
    const arr = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(arr, [], 'alice has no posts from bob')
  }

  bob.goals.set(bobPostsID, 'all')
  alice.goals.set(bobPostsID, 'all')

  const remoteAlice = await p(bob.connect)(alice.getAddress())
  assert('bob connected to alice')
  bob.sync.start()
  await p(setTimeout)(1000)
  assert('sync!')

  {
    const arr = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(arr, ['m0'], 'alice has post 0 from bob')
  }

  // Publish two more posts while the connection is live
  await p(bob.db.feed.publish)({
    account: bobID,
    domain: 'post',
    data: { text: 'm1' },
  })
  assert('bob published post 1')
  await p(bob.db.feed.publish)({
    account: bobID,
    domain: 'post',
    data: { text: 'm2' },
  })
  assert('bob published post 2')

  {
    let arr
    // Poll until alice has all 3 posts, or give up after 100 tries
    for (let i = 0; i < 100; i++) {
      arr = (await flatten(alice.db.msgs()))
        .filter((msg) => msg.metadata.account === bobID && msg.data)
        .map((msg) => msg.data.text)
      if (arr.length < 3) {
        await p(setTimeout)(200)
      } else {
        // FIX: previously the loop had no break and kept re-querying all
        // 100 rounds even after the posts had arrived
        break
      }
    }
    assert.deepEqual(arr, ['m0', 'm1', 'm2'], 'alice has posts 0..2 from bob')
  }

  await p(remoteAlice.close)(true)
  await p(alice.close)(true)
  await p(bob.close)(true)
})
test('sync feed msgs in realtime after the 9 rounds, reverse', async (t) => {
  const alice = createPeer({ name: 'alice' })
  const bob = createPeer({ name: 'bob' })

  await alice.db.loaded()
  await bob.db.loaded()

  const bobID = await p(bob.db.account.create)({
    subdomain: 'account',
    _nonce: 'bob',
  })
  await p(bob.db.feed.publish)({
    account: bobID,
    domain: 'post',
    data: { text: 'm0' },
  })
  assert('bob published post 0')
  const bobPostsID = bob.db.feed.getID(bobID, 'post')

  // Sanity check: alice starts with none of bob's posts
  {
    const arr = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(arr, [], 'alice has no posts from bob')
  }

  bob.goals.set(bobPostsID, 'all')
  alice.goals.set(bobPostsID, 'all')

  const remoteBob = await p(alice.connect)(bob.getAddress())
  // FIX: the message wrongly said "bob connected to alice"; it is alice who dials bob here
  assert('alice connected to bob')
  // Reverse direction of who "starts"
  alice.sync.start()
  await p(setTimeout)(1000)
  assert('sync!')

  {
    const arr = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(arr, ['m0'], 'alice has post 0 from bob')
  }

  // Publish two more posts while the connection is live
  await p(bob.db.feed.publish)({
    account: bobID,
    domain: 'post',
    data: { text: 'm1' },
  })
  assert('bob published post 1')
  await p(bob.db.feed.publish)({
    account: bobID,
    domain: 'post',
    data: { text: 'm2' },
  })
  assert('bob published post 2')

  {
    let arr
    // Poll until alice has all 3 posts, or give up after 100 tries
    for (let i = 0; i < 100; i++) {
      arr = (await flatten(alice.db.msgs()))
        .filter((msg) => msg.metadata.account === bobID && msg.data)
        .map((msg) => msg.data.text)
      if (arr.length < 3) {
        await p(setTimeout)(200)
      } else {
        // FIX: previously the loop had no break and kept re-querying all
        // 100 rounds even after the posts had arrived
        break
      }
    }
    assert.deepEqual(arr, ['m0', 'm1', 'm2'], 'alice has posts 0..2 from bob')
  }

  await p(remoteBob.close)(true)
  await p(alice.close)(true)
  await p(bob.close)(true)
})
test('create 100 messages in parallel that still manage to sync realtime (create an initial post before starting realtime too)', async (t) => {
  const alice = createPeer({ name: 'alice' })
  const bob = createPeer({ name: 'bob' })

  await alice.db.loaded()
  await bob.db.loaded()

  const bobID = await p(bob.db.account.create)({
    subdomain: 'account',
    _nonce: 'bob',
  })
  await p(bob.db.feed.publish)({
    account: bobID,
    domain: 'post',
    data: { text: 'm0' },
  })
  assert('bob published post 0')
  const bobPostsID = bob.db.feed.getID(bobID, 'post')

  // Sanity check: alice starts with none of bob's posts
  {
    const arr = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(arr, [], 'alice has no posts from bob')
  }

  bob.goals.set(bobPostsID, 'all')
  alice.goals.set(bobPostsID, 'all')

  const remoteAlice = await p(bob.connect)(alice.getAddress())
  assert('alice and bob connected')
  bob.sync.start()
  await p(setTimeout)(1000)
  assert('sync!')

  // Publish n posts concurrently while the connection is live
  const n = 100
  const hundred = []
  for (let i = 0; i < n; i++) {
    hundred.push(i)
  }
  await Promise.all(hundred.map(i => p(bob.db.feed.publish)({
    account: bobID,
    domain: 'post',
    data: { text: `post nr ${i}` },
  })))
  assert('bob published 100 posts in parallel')

  const bobMsgs = await flatten(bob.db.msgs())
  // 1 for creating bob's account, 1 for the 'post' moot, and 1 for first post
  assert.equal(bobMsgs.length, n + 3, "bob has all of his own messages")

  let arr
  // Poll until all n+1 posts (the initial 'm0' plus the n parallel ones) arrive
  for (let i = 0; i < 100; i++) {
    arr = (await flatten(alice.db.msgs()))
      // moot doesn't have msg.data
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .filter(msg => msg.metadata.domain === 'post')
      .map((msg) => msg.data.text)
    // FIX: was `arr.length < n`, which could break out one post short of the
    // n + 1 posts the assertion below expects, failing spuriously
    if (arr.length < n + 1) {
      await p(setTimeout)(100)
    } else {
      break
    }
  }
  assert.equal(arr.length, n + 1, `alice has ${arr.length} posts from bob`)

  await p(remoteAlice.close)(true)
  await p(alice.close)(true)
  await p(bob.close)(true)
})
test('create 100 messages in parallel that still manage to sync realtime (without creating an initial post before starting realtime)', async (t) => {
  const alice = createPeer({ name: 'alice' })
  const bob = createPeer({ name: 'bob' })

  await alice.db.loaded()
  await bob.db.loaded()

  const bobID = await p(bob.db.account.create)({
    subdomain: 'account',
    _nonce: 'bob',
  })
  const bobPostsID = bob.db.feed.getID(bobID, 'post')

  // Sanity check: alice starts out with none of bob's posts
  {
    const texts = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(texts, [], 'alice has no posts from bob')
  }

  bob.goals.set(bobPostsID, 'all')
  alice.goals.set(bobPostsID, 'all')

  const remoteAlice = await p(bob.connect)(alice.getAddress())
  assert('alice and bob connected')
  bob.sync.start()
  await p(setTimeout)(1000)
  assert('sync!')

  // Publish n posts concurrently while the connection is live
  const n = 100
  const indices = Array.from({ length: n }, (_, i) => i)
  await Promise.all(
    indices.map((i) =>
      p(bob.db.feed.publish)({
        account: bobID,
        domain: 'post',
        data: { text: `post nr ${i}` },
      })
    )
  )
  assert('bob published 100 posts in parallel')

  const bobMsgs = await flatten(bob.db.msgs())
  // 1 for creating bob's account, and 1 for the 'post' moot
  assert.equal(bobMsgs.length, n + 2, "bob has all of his own messages")

  // Poll until all n posts have arrived at alice, or give up after 100 tries
  let arr
  for (let attempt = 0; attempt < 100; attempt++) {
    arr = (await flatten(alice.db.msgs()))
      // moot doesn't have msg.data
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .filter((msg) => msg.metadata.domain === 'post')
      .map((msg) => msg.data.text)
    if (arr.length >= n) break
    await p(setTimeout)(100)
  }
  assert.equal(arr.length, n, `alice has ${arr.length} posts from bob`)

  await p(remoteAlice.close)(true)
  await p(alice.close)(true)
  await p(bob.close)(true)
})
test('create 200 messages that manage to replicate with low "newest" goals', async (t) => {
  const n = 200
  const alice = createPeer({ name: 'alice' })
  const bob = createPeer({ name: 'bob' })

  await alice.db.loaded()
  await bob.db.loaded()

  const bobID = await p(bob.db.account.create)({
    subdomain: 'account',
    _nonce: 'bob',
  })
  const bobPostsID = bob.db.feed.getID(bobID, 'post')

  // Sanity check: alice starts with none of bob's posts
  {
    const arr = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(arr, [], 'alice has no posts from bob')
  }

  const confirmed = []
  // for keeping track of which msgs have arrived
  for (let i = 0; i < n; i++) {
    confirmed.push(false)
  }
  alice.db.onRecordAdded(rec => {
    if (rec.msg.data?.text) {
      // FIX: always pass a radix to parseInt to force decimal parsing
      const num = Number.parseInt(rec.msg.data.text, 10)
      confirmed[num] = true
    }
  })

  bob.goals.set(bobPostsID, 'newest-50')
  alice.goals.set(bobPostsID, 'newest-50')

  const remoteAlice = await p(bob.connect)(alice.getAddress())
  assert('alice and bob connected')
  bob.sync.start()
  await p(setTimeout)(1000)
  assert('sync!')

  const hundred = []
  for (let i = 0; i < n; i++) {
    hundred.push(i)
  }
  // FIX: await the publishes — this Promise.all used to be left floating, so a
  // publish failure would surface as an unhandled rejection instead of
  // failing the test
  await Promise.all(hundred.map(i => p(bob.db.feed.publish)({
    account: bobID,
    domain: 'post',
    data: { text: `${i}` },
  })))
  assert(`bob published ${n} posts in parallel`)

  let tries = 30
  // just waiting for them to arrive
  do {
    await p(setTimeout)(100)
  } while (!confirmed.every(v => v === true) && tries-- > 0)

  assert.equal(confirmed.filter(v => v === true).length, n, `alice has all of bob's posts`)

  await p(remoteAlice.close)(true)
  await p(alice.close)(true)
  await p(bob.close)(true)
})

218
test/set-sync.test.js Normal file
View File

@ -0,0 +1,218 @@
const test = require('node:test')
const assert = require('node:assert')
const p = require('node:util').promisify
const Keypair = require('pzp-keypair')
const MsgV4 = require('pzp-db/msg-v4')
const { createPeer } = require('./util')
const aliceKeypair = Keypair.generate('ed25519', 'alice')
// From an array of msgs, pick the set_v1 updates for `subdomain` and render
// each as its added item, or '-item' for a deletion.
function getItems(subdomain, arr) {
  const wantedDomain = `set_v1__${subdomain}`
  const items = []
  for (const msg of arr) {
    if (msg.metadata.domain !== wantedDomain) continue
    const data = msg.data
    if (!data) continue
    items.push(data.add?.[0] ?? '-' + data.del?.[0])
  }
  return items
}
test('sync goal=set from scratch', async (t) => {
  const GHOST_SPAN = 5
  const alice = createPeer({
    name: 'alice',
    global: {
      keypair: aliceKeypair,
    },
    set: { ghostSpan: GHOST_SPAN },
  })
  const bob = createPeer({ name: 'bob' })

  await alice.db.loaded()
  await bob.db.loaded()

  // Alice sets up an account and a set
  const aliceID = await p(alice.db.account.create)({
    subdomain: 'account',
    _nonce: 'alice',
  })
  await p(alice.set.load)(aliceID)
  const aliceRootMsg = await p(alice.db.get)(aliceID)

  // Bob sets up his own account, then learns about Alice
  const bobID = await p(bob.db.account.create)({
    subdomain: 'account',
    _nonce: 'bob',
  })
  await p(bob.set.load)(bobID)
  await p(bob.db.add)(aliceRootMsg, aliceID)

  // Alice constructs a set
  await p(alice.set.add)('names', 'Alice')
  await p(alice.set.add)('names', 'Bob')
  const mootID = alice.set.getFeedID('names')

  // Before syncing: alice has both items, bob has none
  assert.deepEqual(
    await p(alice.set.values)('names', aliceID),
    ['Alice', 'Bob'],
    'alice has Alice+Bob set'
  )
  assert.deepEqual(await p(bob.set.values)('names', aliceID), [], 'bob has empty set')

  // Trigger sync
  alice.goals.set(mootID, 'set')
  bob.goals.set(mootID, 'set')
  const bobToAlice = await p(bob.connect)(alice.getAddress())
  assert('bob connected to alice')
  bob.sync.start()
  await p(setTimeout)(2000)
  assert('sync!')

  // After syncing: bob now has the full set
  assert.deepEqual(
    await p(bob.set.values)('names', aliceID),
    ['Alice', 'Bob'],
    'bob has Alice+Bob set'
  )

  await p(bobToAlice.close)(true)
  await p(alice.close)(true)
  await p(bob.close)(true)
})
//
//   R-?-?-?-?-o-o
//        \
//         o
//
// where "o" is a set update and "?" is a ghost
test('sync goal=set with ghostSpan=5', async (t) => {
  const SPAN = 5
  const alice = createPeer({
    name: 'alice',
    global: {
      keypair: aliceKeypair,
    },
    set: { ghostSpan: SPAN },
  })
  const bob = createPeer({ name: 'bob' })

  await alice.db.loaded()
  await bob.db.loaded()

  // Alice sets up an account and a set
  const aliceID = await p(alice.db.account.create)({
    subdomain: 'account',
    _nonce: 'alice',
  })
  await p(alice.set.load)(aliceID)
  const aliceAccountRoot = await p(alice.db.get)(aliceID)

  // Bob knows Alice
  await p(bob.db.add)(aliceAccountRoot, aliceID)

  // Alice constructs a set: two adds, both deleted, then two fresh adds,
  // so only 'Alice' and 'Bob' survive as the current items
  await p(alice.set.add)('follows', 'alice')
  await p(alice.set.add)('follows', 'bob')
  await p(alice.set.del)('follows', 'alice')
  await p(alice.set.del)('follows', 'bob')
  await p(alice.set.add)('follows', 'Alice')
  await p(alice.set.add)('follows', 'Bob')

  // Fish out the record for each of the six set updates; the moot is
  // identified as the only msg with dataSize === 0
  let moot
  let rec1
  let rec2
  let rec3
  let rec4
  let rec5
  let rec6
  for await (const rec of alice.db.records()) {
    if (rec.msg.metadata.dataSize === 0) moot = rec
    if (rec.msg.data?.add?.[0] === 'alice') rec1 = rec
    if (rec.msg.data?.add?.[0] === 'bob') rec2 = rec
    if (rec.msg.data?.del?.[0] === 'alice') rec3 = rec
    if (rec.msg.data?.del?.[0] === 'bob') rec4 = rec
    if (rec.msg.data?.add?.[0] === 'Alice') rec5 = rec
    if (rec.msg.data?.add?.[0] === 'Bob') rec6 = rec
  }

  // Bob knows the whole set
  await p(bob.db.add)(moot.msg, moot.id)
  await p(bob.db.add)(rec1.msg, moot.id)
  await p(bob.db.add)(rec2.msg, moot.id)
  await p(bob.db.add)(rec3.msg, moot.id)
  await p(bob.db.add)(rec4.msg, moot.id)
  await p(bob.db.add)(rec5.msg, moot.id)
  await p(bob.db.add)(rec6.msg, moot.id)

  // Bob knows a branched off msg that Alice doesn't know: a second
  // tangle branch rooted after rec1, adding 'Carol' (signed with alice's
  // keypair since bob learned it out-of-band)
  {
    const tangle = new MsgV4.Tangle(moot.id)
    tangle.add(moot.id, moot.msg)
    tangle.add(rec1.id, rec1.msg)
    const msg = MsgV4.create({
      keypair: aliceKeypair,
      domain: 'set_v1__follows',
      account: aliceID,
      accountTips: [aliceID],
      data: { add: ['Carol'], del: [], supersedes: [] },
      tangles: {
        [moot.id]: tangle,
      },
    })
    await p(bob.db.add)(msg, moot.id)
  }

  // Simulate Alice garbage collecting part of the set: superseded msgs
  // before rec5 become deletable (turned into ghosts) or erasable
  {
    const itemRoots = alice.set._getItemRoots('follows')
    assert.deepEqual(itemRoots, { Alice: [rec5.id], Bob: [rec6.id] })
    const tangle = await p(alice.db.getTangle)(alice.set.getFeedID('follows'))
    const { deletables, erasables } = tangle.getDeletablesAndErasables(rec5.id)
    assert.equal(deletables.size, 2)
    assert.equal(erasables.size, 3)
    assert.ok(deletables.has(rec1.id))
    assert.ok(deletables.has(rec2.id))
    assert.ok(erasables.has(moot.id))
    assert.ok(erasables.has(rec3.id))
    assert.ok(erasables.has(rec4.id))
    // Deleted msgs are remembered as ghosts so they aren't re-replicated
    for (const msgID of deletables) {
      await p(alice.db.ghosts.add)({ msgID, tangleID: moot.id, span: SPAN })
      await p(alice.db.del)(msgID)
    }
    // Erase (keep metadata, drop data) everything erasable except the moot
    for (const msgID of erasables) {
      if (msgID === moot.id) continue
      await p(alice.db.erase)(msgID)
    }
  }

  // Assert situation at Alice before sync
  assert.deepEqual(
    await p(alice.set.values)('follows', aliceID),
    ['Alice', 'Bob'],
    'alice has Alice+Bob set'
  )
  assert.deepEqual(alice.db.ghosts.get(moot.id), [rec1.id, rec2.id])

  // Trigger sync
  alice.goals.set(moot.id, 'set')
  bob.goals.set(moot.id, 'set')
  const remoteAlice = await p(bob.connect)(alice.getAddress())
  assert('bob connected to alice')
  bob.sync.start()
  await p(setTimeout)(2000)
  assert('sync!')

  // Assert situation at Alice after sync: she got the branched off msg,
  // and rec1 stopped being a ghost (it was revived as a tangle ancestor
  // of the 'Carol' branch)
  assert.deepEqual(
    await p(alice.set.values)('follows', aliceID),
    ['Carol', 'Alice', 'Bob'],
    'alice has Alice+Bob+Carol set'
  )
  assert.deepEqual(alice.db.ghosts.get(moot.id), [rec2.id])

  await p(remoteAlice.close)(true)
  await p(alice.close)(true)
  await p(bob.close)(true)
})

View File

@ -1,24 +1,18 @@
const test = require('tape') const test = require('node:test')
const path = require('path') const assert = require('node:assert')
const os = require('os')
const rimraf = require('rimraf')
const SecretStack = require('secret-stack')
const caps = require('ssb-caps')
const p = require('util').promisify const p = require('util').promisify
const { generateKeypair } = require('./util') const Keypair = require('pzp-keypair')
const { createPeer } = require('./util')
const createSSB = SecretStack({ appKey: caps.shs }) const carolKeypair = Keypair.generate('ed25519', 'carol')
.use(require('ppppp-db')) const daveKeypair = Keypair.generate('ed25519', 'dave')
.use(require('ssb-box'))
.use(require('../lib'))
const ALICE_DIR = path.join(os.tmpdir(), 'dagsync-alice') async function getTexts(iter) {
const BOB_DIR = path.join(os.tmpdir(), 'dagsync-bob') const ary = []
const aliceKeys = generateKeypair('alice') for await (i of iter) {
const bobKeys = generateKeypair('bob') ary.push(i)
}
function getTexts(iter) { return ary.filter((msg) => msg.data?.text).map((msg) => msg.data.text)
return [...iter].filter((msg) => msg.content).map((msg) => msg.content.text)
} }
/* /*
@ -59,103 +53,124 @@ graph TB;
``` ```
*/ */
test('sync a thread where both peers have portions', async (t) => { test('sync a thread where both peers have portions', async (t) => {
rimraf.sync(ALICE_DIR) const alice = createPeer({ name: 'alice' })
rimraf.sync(BOB_DIR) const bob = createPeer({ name: 'bob' })
const alice = createSSB({
keys: aliceKeys,
path: ALICE_DIR,
})
const bob = createSSB({
keys: bobKeys,
path: BOB_DIR,
})
const carolKeys = generateKeypair('carol')
const carolID = carolKeys.id
const daveKeys = generateKeypair('dave')
const daveID = daveKeys.id
await alice.db.loaded() await alice.db.loaded()
await bob.db.loaded() const aliceID = await p(alice.db.account.create)({
subdomain: 'account',
const startA = await p(alice.db.create)({ _nonce: 'alice',
type: 'post',
content: { text: 'A' },
keys: aliceKeys,
}) })
const rootHashA = alice.db.getFeedRoot(aliceKeys.id, 'post') const aliceIDMsg = await p(alice.db.get)(aliceID)
const rootMsgA = alice.db.get(rootHashA)
await bob.db.loaded()
const bobID = await p(bob.db.account.create)({
subdomain: 'account',
_nonce: 'bob',
})
const bobIDMsg = await p(bob.db.get)(bobID)
// Alice created Carol
const carolID = await p(alice.db.account.create)({
subdomain: 'account',
keypair: carolKeypair,
_nonce: 'carol',
})
const carolIDMsg = await p(alice.db.get)(carolID)
// Alice created Dave
const daveID = await p(alice.db.account.create)({
subdomain: 'account',
keypair: daveKeypair,
_nonce: 'dave',
})
const daveIDMsg = await p(alice.db.get)(daveID)
// Alice knows Bob
await p(alice.db.add)(bobIDMsg, bobID)
// Bob knows Alice, Carol, and Dave
await p(bob.db.add)(aliceIDMsg, aliceID)
await p(bob.db.add)(carolIDMsg, carolID)
await p(bob.db.add)(daveIDMsg, daveID)
const startA = await p(alice.db.feed.publish)({
account: aliceID,
domain: 'post',
data: { text: 'A' },
})
const rootHashA = alice.db.feed.getID(aliceID, 'post')
const rootMsgA = await p(alice.db.get)(rootHashA)
await p(bob.db.add)(rootMsgA, rootHashA) await p(bob.db.add)(rootMsgA, rootHashA)
await p(bob.db.add)(startA.msg, rootHashA) await p(bob.db.add)(startA.msg, rootHashA)
const replyB1 = await p(bob.db.create)({ const replyB1 = await p(bob.db.feed.publish)({
type: 'post', account: bobID,
content: { text: 'B1' }, domain: 'post',
tangles: [startA.hash], data: { text: 'B1' },
keys: bobKeys, tangles: [startA.id],
}) })
const replyB2 = await p(bob.db.create)({ const replyB2 = await p(bob.db.feed.publish)({
type: 'post', account: bobID,
content: { text: 'B2' }, domain: 'post',
tangles: [startA.hash], data: { text: 'B2' },
keys: bobKeys, tangles: [startA.id],
}) })
const rootHashB = bob.db.getFeedRoot(bobKeys.id, 'post') const rootHashB = bob.db.feed.getID(bobID, 'post')
const rootMsgB = bob.db.get(rootHashB) const rootMsgB = await p(bob.db.get)(rootHashB)
await p(alice.db.add)(rootMsgB, rootHashB) await p(alice.db.add)(rootMsgB, rootHashB)
await p(alice.db.add)(replyB1.msg, rootHashB) await p(alice.db.add)(replyB1.msg, rootHashB)
await p(alice.db.add)(replyB2.msg, rootHashB) await p(alice.db.add)(replyB2.msg, rootHashB)
const replyC1 = await p(alice.db.create)({ const replyC1 = await p(alice.db.feed.publish)({
type: 'post', account: carolID,
content: { text: 'C1' }, domain: 'post',
tangles: [startA.hash], data: { text: 'C1' },
keys: carolKeys, tangles: [startA.id],
keypair: carolKeypair,
}) })
const replyD1 = await p(bob.db.create)({ const replyD1 = await p(bob.db.feed.publish)({
type: 'post', account: daveID,
content: { text: 'D1' }, domain: 'post',
tangles: [startA.hash], data: { text: 'D1' },
keys: daveKeys, tangles: [startA.id],
keypair: daveKeypair,
}) })
t.deepEquals( assert.deepEqual(
getTexts(alice.db.msgs()), await getTexts(alice.db.msgs()),
['A', 'B1', 'B2', 'C1'], ['A', 'B1', 'B2', 'C1'],
'alice has a portion of the thread' 'alice has a portion of the thread'
) )
t.deepEquals( assert.deepEqual(
getTexts(bob.db.msgs()), await getTexts(bob.db.msgs()),
['A', 'B1', 'B2', 'D1'], ['A', 'B1', 'B2', 'D1'],
'bob has another portion of the thread' 'bob has another portion of the thread'
) )
bob.tangleSync.setGoal(startA.hash, 'all') bob.goals.set(startA.id, 'all')
alice.tangleSync.setGoal(startA.hash, 'all') alice.goals.set(startA.id, 'all')
const remoteAlice = await p(bob.connect)(alice.getAddress()) const remoteAlice = await p(bob.connect)(alice.getAddress())
t.pass('bob connected to alice') assert('bob connected to alice')
bob.tangleSync.initiate() bob.sync.start()
await p(setTimeout)(1000) await p(setTimeout)(1000)
t.pass('tangleSync!') assert('sync!')
t.deepEquals( assert.deepEqual(
getTexts(alice.db.msgs()), await getTexts(alice.db.msgs()),
['A', 'B1', 'B2', 'C1', 'D1'], ['A', 'B1', 'B2', 'C1', 'D1'],
'alice has the full thread' 'alice has the full thread'
) )
t.deepEquals( assert.deepEqual(
getTexts(bob.db.msgs()), await getTexts(bob.db.msgs()),
['A', 'B1', 'B2', 'D1', 'C1'], ['A', 'B1', 'B2', 'D1', 'C1'],
'bob has the full thread' 'bob has the full thread'
) )
@ -166,63 +181,70 @@ test('sync a thread where both peers have portions', async (t) => {
}) })
test('sync a thread where initiator does not have the root', async (t) => { test('sync a thread where initiator does not have the root', async (t) => {
rimraf.sync(ALICE_DIR) const alice = createPeer({ name: 'alice' })
rimraf.sync(BOB_DIR) const bob = createPeer({ name: 'bob' })
const alice = createSSB({
keys: aliceKeys,
path: ALICE_DIR,
})
const bob = createSSB({
keys: bobKeys,
path: BOB_DIR,
})
await alice.db.loaded() await alice.db.loaded()
const aliceID = await p(alice.db.account.create)({
subdomain: 'account',
_nonce: 'alice',
})
const aliceIDMsg = await p(alice.db.get)(aliceID)
await bob.db.loaded() await bob.db.loaded()
const bobID = await p(bob.db.account.create)({
subdomain: 'account',
_nonce: 'bob',
})
const bobIDMsg = await p(bob.db.get)(bobID)
const rootA = await p(alice.db.create)({ // Alice knows Bob
type: 'post', await p(alice.db.add)(bobIDMsg, bobID)
content: { text: 'A' },
keys: aliceKeys, // Bob knows Alice
await p(bob.db.add)(aliceIDMsg, aliceID)
const rootA = await p(alice.db.feed.publish)({
account: aliceID,
domain: 'post',
data: { text: 'A' },
}) })
const replyA1 = await p(alice.db.create)({ const replyA1 = await p(alice.db.feed.publish)({
type: 'post', account: aliceID,
content: { text: 'A1' }, domain: 'post',
tangles: [rootA.hash], data: { text: 'A1' },
keys: aliceKeys, tangles: [rootA.id],
}) })
const replyA2 = await p(alice.db.create)({ const replyA2 = await p(alice.db.feed.publish)({
type: 'post', account: aliceID,
content: { text: 'A2' }, domain: 'post',
tangles: [rootA.hash], data: { text: 'A2' },
keys: aliceKeys, tangles: [rootA.id],
}) })
t.deepEquals( assert.deepEqual(
getTexts(alice.db.msgs()), await getTexts(alice.db.msgs()),
['A', 'A1', 'A2'], ['A', 'A1', 'A2'],
'alice has the full thread' 'alice has the full thread'
) )
t.deepEquals(getTexts(bob.db.msgs()), [], 'bob has nothing') assert.deepEqual(await getTexts(bob.db.msgs()), [], 'bob has nothing')
bob.tangleSync.setGoal(rootA.hash, 'all') bob.goals.set(rootA.id, 'all')
// ON PURPOSE: alice does not set the goal // ON PURPOSE: alice does not set the goal
// alice.tangleSync.setGoal(rootA.hash, 'all') // alice.goals.set(rootA.id, 'all')
const remoteAlice = await p(bob.connect)(alice.getAddress()) const remoteAlice = await p(bob.connect)(alice.getAddress())
t.pass('bob connected to alice') assert('bob connected to alice')
bob.tangleSync.initiate() bob.sync.start()
await p(setTimeout)(1000) await p(setTimeout)(1000)
t.pass('tangleSync!') assert('sync!')
t.deepEquals( assert.deepEqual(
getTexts(bob.db.msgs()), await getTexts(bob.db.msgs()),
['A', 'A1', 'A2'], ['A', 'A1', 'A2'],
'bob has the full thread' 'bob has the full thread'
) )
@ -233,62 +255,69 @@ test('sync a thread where initiator does not have the root', async (t) => {
}) })
test('sync a thread where receiver does not have the root', async (t) => { test('sync a thread where receiver does not have the root', async (t) => {
rimraf.sync(ALICE_DIR) const alice = createPeer({ name: 'alice' })
rimraf.sync(BOB_DIR) const bob = createPeer({ name: 'bob' })
const alice = createSSB({
keys: aliceKeys,
path: ALICE_DIR,
})
const bob = createSSB({
keys: bobKeys,
path: BOB_DIR,
})
await alice.db.loaded() await alice.db.loaded()
const aliceID = await p(alice.db.account.create)({
subdomain: 'account',
_nonce: 'alice',
})
const aliceIDMsg = await p(alice.db.get)(aliceID)
await bob.db.loaded() await bob.db.loaded()
const bobID = await p(bob.db.account.create)({
subdomain: 'account',
_nonce: 'bob',
})
const bobIDMsg = await p(bob.db.get)(bobID)
const rootA = await p(alice.db.create)({ // Alice knows Bob
type: 'post', await p(alice.db.add)(bobIDMsg, bobID)
content: { text: 'A' },
keys: aliceKeys, // Bob knows Alice
await p(bob.db.add)(aliceIDMsg, aliceID)
const rootA = await p(alice.db.feed.publish)({
account: aliceID,
domain: 'post',
data: { text: 'A' },
}) })
const replyA1 = await p(alice.db.create)({ const replyA1 = await p(alice.db.feed.publish)({
type: 'post', account: aliceID,
content: { text: 'A1' }, domain: 'post',
tangles: [rootA.hash], data: { text: 'A1' },
keys: aliceKeys, tangles: [rootA.id],
}) })
const replyA2 = await p(alice.db.create)({ const replyA2 = await p(alice.db.feed.publish)({
type: 'post', account: aliceID,
content: { text: 'A2' }, domain: 'post',
tangles: [rootA.hash], data: { text: 'A2' },
keys: aliceKeys, tangles: [rootA.id],
}) })
t.deepEquals( assert.deepEqual(
getTexts(alice.db.msgs()), await getTexts(alice.db.msgs()),
['A', 'A1', 'A2'], ['A', 'A1', 'A2'],
'alice has the full thread' 'alice has the full thread'
) )
t.deepEquals(getTexts(bob.db.msgs()), [], 'bob has nothing') assert.deepEqual(await getTexts(bob.db.msgs()), [], 'bob has nothing')
bob.tangleSync.setGoal(rootA.hash, 'all') bob.goals.set(rootA.id, 'all')
alice.tangleSync.setGoal(rootA.hash, 'all') alice.goals.set(rootA.id, 'all')
const remoteBob = await p(alice.connect)(bob.getAddress()) const remoteBob = await p(alice.connect)(bob.getAddress())
t.pass('alice connected to bob') assert('alice connected to bob')
alice.tangleSync.initiate() alice.sync.start()
await p(setTimeout)(1000) await p(setTimeout)(1000)
t.pass('tangleSync!') assert('sync!')
t.deepEquals( assert.deepEqual(
getTexts(bob.db.msgs()), await getTexts(bob.db.msgs()),
['A', 'A1', 'A2'], ['A', 'A1', 'A2'],
'bob has the full thread' 'bob has the full thread'
) )
@ -299,69 +328,76 @@ test('sync a thread where receiver does not have the root', async (t) => {
}) })
test('sync a thread with reactions too', async (t) => { test('sync a thread with reactions too', async (t) => {
rimraf.sync(ALICE_DIR) const alice = createPeer({ name: 'alice' })
rimraf.sync(BOB_DIR) const bob = createPeer({ name: 'bob' })
const alice = createSSB({
keys: aliceKeys,
path: ALICE_DIR,
})
const bob = createSSB({
keys: bobKeys,
path: BOB_DIR,
})
await alice.db.loaded() await alice.db.loaded()
const aliceID = await p(alice.db.account.create)({
subdomain: 'account',
_nonce: 'alice',
})
const aliceIDMsg = await p(alice.db.get)(aliceID)
await bob.db.loaded() await bob.db.loaded()
const bobID = await p(bob.db.account.create)({
subdomain: 'account',
_nonce: 'bob',
})
const bobIDMsg = await p(bob.db.get)(bobID)
const rootA = await p(alice.db.create)({ // Alice knows Bob
type: 'post', await p(alice.db.add)(bobIDMsg, bobID)
content: { text: 'A' },
keys: aliceKeys, // Bob knows Alice
await p(bob.db.add)(aliceIDMsg, aliceID)
const rootA = await p(alice.db.feed.publish)({
account: aliceID,
domain: 'post',
data: { text: 'A' },
}) })
const replyA1 = await p(alice.db.create)({ const replyA1 = await p(alice.db.feed.publish)({
type: 'post', account: aliceID,
content: { text: 'A1' }, domain: 'post',
tangles: [rootA.hash], data: { text: 'A1' },
keys: aliceKeys, tangles: [rootA.id],
}) })
const replyA2 = await p(alice.db.create)({ const replyA2 = await p(alice.db.feed.publish)({
type: 'post', account: aliceID,
content: { text: 'A2' }, domain: 'post',
tangles: [rootA.hash], data: { text: 'A2' },
keys: aliceKeys, tangles: [rootA.id],
}) })
const reactionA3 = await p(alice.db.create)({ const reactionA3 = await p(alice.db.feed.publish)({
type: 'reaction', account: aliceID,
content: { text: 'yes', link: replyA1.hash }, domain: 'reaction',
tangles: [rootA.hash, replyA1.hash], data: { text: 'yes', link: replyA1.id },
keys: aliceKeys, tangles: [rootA.id, replyA1.id],
}) })
t.deepEquals( assert.deepEqual(
getTexts(alice.db.msgs()), await getTexts(alice.db.msgs()),
['A', 'A1', 'A2', 'yes'], ['A', 'A1', 'A2', 'yes'],
'alice has the full thread' 'alice has the full thread'
) )
t.deepEquals(getTexts(bob.db.msgs()), [], 'bob has nothing') assert.deepEqual(await getTexts(bob.db.msgs()), [], 'bob has nothing')
bob.tangleSync.setGoal(rootA.hash, 'all') bob.goals.set(rootA.id, 'all')
alice.tangleSync.setGoal(rootA.hash, 'all') alice.goals.set(rootA.id, 'all')
const remoteBob = await p(alice.connect)(bob.getAddress()) const remoteBob = await p(alice.connect)(bob.getAddress())
t.pass('alice connected to bob') assert('alice connected to bob')
alice.tangleSync.initiate() alice.sync.start()
await p(setTimeout)(1000) await p(setTimeout)(1000)
t.pass('tangleSync!') assert('sync!')
t.deepEquals( assert.deepEqual(
getTexts(bob.db.msgs()), await getTexts(bob.db.msgs()),
['A', 'A1', 'A2', 'yes'], ['A', 'A1', 'A2', 'yes'],
'bob has the full thread' 'bob has the full thread'
) )

View File

@ -1,14 +1,53 @@
const OS = require('node:os')
const Path = require('node:path')
const rimraf = require('rimraf')
const caps = require('pzp-caps')
const Keypair = require('pzp-keypair')

/**
 * Build a fully-wired secret-stack peer for tests.
 *
 * Two calling conventions:
 *  - `{ name }` shorthand: derives a throwaway `global.path` under the OS
 *    tmpdir and a deterministic ed25519 keypair seeded by the name, then
 *    removes `name` from the config (it is not a secret-stack option).
 *  - explicit `{ global: { path, keypair, ... } }`: used as-is.
 *
 * @param {Object} config - peer configuration; mutated in place when the
 *   `name` shorthand is used (defaults are filled in with `??=`).
 * @returns {Object} a started secret-stack instance with net, shse,
 *   pzp-db/dict/set/goals, ssb-box and this repo's sync plugin loaded.
 * @throws {Error} when `config.global`, `config.global.path` or
 *   `config.global.keypair` is missing after shorthand expansion.
 */
function createPeer(config) {
  if (config.name) {
    const name = config.name
    const tmp = OS.tmpdir()
    config.global ??= {}
    // Date.now() keeps paths unique across repeated test runs
    config.global.path ??= Path.join(tmp, `pzp-sync-${name}-${Date.now()}`)
    config.global.keypair ??= Keypair.generate('ed25519', name)
    delete config.name
  }
  if (!config.global) {
    throw new Error('need config.global in createPeer()')
  }
  if (!config.global.path) {
    throw new Error('need config.global.path in createPeer()')
  }
  if (!config.global.keypair) {
    throw new Error('need config.global.keypair in createPeer()')
  }
  // Start each test peer from an empty database directory
  rimraf.sync(config.global.path)
  return require('secret-stack/bare')()
    .use(require('secret-stack/plugins/net'))
    .use(require('secret-handshake-ext/secret-stack'))
    .use(require('pzp-db'))
    .use(require('pzp-dict'))
    .use(require('pzp-set'))
    .use(require('pzp-goals'))
    .use(require('ssb-box'))
    .use(require('../lib'))
    .call(null, {
      shse: { caps },
      ...config,
      global: {
        // default loopback-only net transport; caller's global wins on conflict
        connections: {
          incoming: {
            net: [{ scope: 'device', transform: 'shse', port: null }],
          },
          outgoing: {
            net: [{ transform: 'shse' }],
          },
        },
        ...config.global,
      },
    })
}

module.exports = { createPeer }

44
test/want-range.test.js Normal file
View File

@ -0,0 +1,44 @@
const test = require('node:test')
const assert = require('node:assert')
const Algorithm = require('../lib/algorithm')

// Sentinel for "want nothing": lower bound above upper bound.
const EMPTY = [1, 0]

/**
 * Run a table of wantRange cases against one algorithm/goal pair.
 * Each case is [localHave, remoteHave, expectedWant].
 */
async function checkWantRanges(algo, goal, cases) {
  for (const [localHave, remoteHave, expected] of cases) {
    assert.deepStrictEqual(
      await algo.wantRange(localHave, remoteHave, goal),
      expected
    )
  }
}

test('want-range for goal=newest-3', async (t) => {
  const algo = new Algorithm({ db: null })
  await checkWantRanges(algo, { type: 'newest', count: 3 }, [
    [[2, 4], [1, 3], [2, 3]],
    [[2, 4], [1, 5], [3, 5]],
    [[1, 3], [2, 4], [2, 4]],
    [[1, 5], [2, 4], [3, 4]],
    [[1, 3], [4, 6], [4, 6]],
    [[4, 6], [1, 3], EMPTY],
    [[1, 3], [6, 7], [6, 7]],
  ])
})

test('want-range for goal=all', async (t) => {
  const algo = new Algorithm({ db: null })
  await checkWantRanges(algo, { type: 'all' }, [
    [[2, 4], [1, 3], [1, 3]],
    [[2, 4], [1, 5], [1, 5]],
    [[1, 3], [2, 4], [2, 4]],
    [[1, 5], [2, 4], [2, 4]],
    [[1, 3], [4, 6], [4, 6]],
    [[4, 6], [1, 3], [1, 3]],
    [[1, 3], [6, 7], [6, 7]],
  ])
})

test('want-range for goal=dict', async (t) => {
  // Stub dict: every id reports a minimum ghost depth of 3.
  const algo = new Algorithm({ db: null, dict: { minGhostDepth: (id, cb) => cb(null, 3) } })
  await checkWantRanges(algo, { type: 'dict' }, [
    [[2, 4], [1, 3], [3, 3]],
    [[2, 4], [1, 5], [3, 5]],
    [[1, 3], [2, 4], [3, 4]],
    [[1, 5], [2, 4], [3, 4]],
    [[1, 3], [4, 6], [4, 6]],
    [[4, 6], [1, 3], [3, 3]],
    [[1, 3], [6, 7], [6, 7]],
  ])
})

16
tsconfig.json Normal file
View File

@ -0,0 +1,16 @@
{
"include": ["lib/**/*.js"],
"exclude": ["coverage/", "node_modules/", "test/"],
"compilerOptions": {
"checkJs": true,
"declaration": true,
"emitDeclarationOnly": true,
"exactOptionalPropertyTypes": true,
"forceConsistentCasingInFileNames": true,
"lib": ["es2022", "dom"],
"module": "node16",
"skipLibCheck": true,
"strict": true,
"target": "es2022"
}
}