Compare commits

..

No commits in common. "master" and "rev1" have entirely different histories.
master ... rev1

21 changed files with 779 additions and 2623 deletions

25
.github/workflows/node.js.yml vendored Normal file
View File

@@ -0,0 +1,25 @@
# CI workflow: runs the test suite on every push / PR targeting master.
name: CI
on:
  push:
    branches: [master]
  pull_request:
    branches: [master]
jobs:
  test:
    runs-on: ubuntu-latest
    # Kill hung test runs instead of consuming the 6-hour default.
    timeout-minutes: 10
    strategy:
      # Run the suite once per supported Node.js release line.
      matrix:
        node-version: [16.x, 18.x]
    steps:
      - uses: actions/checkout@v3
      - name: Use Node.js ${{ matrix.node-version }}
        uses: actions/setup-node@v3
        with:
          node-version: ${{ matrix.node-version }}
      # npm install (not `npm ci`) because the lockfile is gitignored upstream.
      - run: npm install
      - run: npm test

1
.gitignore vendored
View File

@@ -4,7 +4,6 @@ pnpm-lock.yaml
package-lock.json
coverage
*~
lib/*.d.ts
# For misc scripts and experiments:
/gitignored

View File

@ -1,13 +0,0 @@
matrix:
NODE_VERSION:
- 18
- 20
steps:
test:
when:
event: [push]
image: node:${NODE_VERSION}
commands:
- npm install
- npm test

View File

@@ -1,13 +1,9 @@
# pzp-sync
PZP replication using Kleppmann's hash graph sync
https://martin.kleppmann.com/2020/12/02/bloom-filter-hash-graph-sync.html
https://arxiv.org/abs/2012.00472
**Work in progress**
## Installation
```
npm install pzp-sync
We're not on npm yet. In your package.json, include this as
```js
"ppppp-tangle-sync": "github:staltz/ppppp-tangle-sync"
```

View File

@ -1,24 +1,17 @@
const p = require('promisify-4loc')
const { BloomFilter } = require('bloom-filters')
const MsgV4 = require('pzp-db/msg-v4')
const makeDebug = require('debug')
const debug = makeDebug('pzp:sync')
const { EMPTY_RANGE, isEmptyRange, estimateMsgCount } = require('./range')
const FeedV1 = require('ppppp-db/feed-v1')
const p = require('util').promisify
const { isEmptyRange, estimateMsgCount } = require('./range')
const { parseGoal } = require('./goal')
/**
* @typedef {ReturnType<import('pzp-db').init>} PZPDB
* @typedef {ReturnType<import('pzp-dict').init>} PZPDict
* @typedef {ReturnType<import('pzp-set').init>} PZPSet
* @typedef {import('pzp-db/msg-v4').Msg} Msg
* @typedef {import('pzp-db/db-tangle')} DBTangle
* @typedef {import('pzp-goals').Goal} Goal
* @typedef {import('./range').Range} Range
* @typedef {string} MsgID
*/
/**
* @param {Iterable<unknown>} iter
* @typedef {import('./goal').Goal} Goal
*/
function countIter(iter) {
let count = 0
for (const _ of iter) count++
@ -26,33 +19,24 @@ function countIter(iter) {
}
class Algorithm {
/** @type {ConstructorParameters<typeof Algorithm>[0]} */
#peer
/** @param {{ db: PZPDB, dict: PZPDict, set: PZPSet }} peer */
constructor(peer) {
this.#peer = peer
}
/**
* Calculates the range ([minDepth, maxDepth]) of msgs that the peer has for
* the given tangle known by the `rootID`.
*
* @param {string} rootID
* @returns {Promise<Range>}
*/
async haveRange(rootID) {
const rootMsg = await p(this.#peer.db.get)(rootID)
if (!rootMsg) return EMPTY_RANGE
haveRange(rootMsgHash) {
const rootMsg = this.#peer.db.get(rootMsgHash)
if (!rootMsg) return [1, 0]
let minDepth = Number.MAX_SAFE_INTEGER
let maxDepth = 0
for await (const rec of this.#peer.db.records()) {
if (!rec?.msg?.data && rec.id !== rootID) continue
for (const rec of this.#peer.db.records()) {
if (!rec.msg?.content) continue
const tangles = rec.msg.metadata.tangles
if (rec.id === rootID) {
if (rec.hash === rootMsgHash) {
minDepth = 0
} else if (tangles[rootID]) {
const depth = tangles[rootID].depth
} else if (tangles[rootMsgHash]) {
const depth = tangles[rootMsgHash].depth
minDepth = Math.min(minDepth, depth)
maxDepth = Math.max(maxDepth, depth)
}
@ -61,9 +45,6 @@ class Algorithm {
}
/**
* Calculates the range ([minDepth, maxDepth]) of msgs that the peer wants,
* given the goal "all" and local and remote have ranges.
*
* @param {Range} localHaveRange
* @param {Range} remoteHaveRange
* @returns {Range}
@ -73,10 +54,6 @@ class Algorithm {
}
/**
* Calculates the range ([minDepth, maxDepth]) of msgs that the peer wants,
* given the goal "newest" (alongside with a `count` parameter) and local and
* remote have ranges.
*
* @param {Range} localHaveRange
* @param {Range} remoteHaveRange
* @param {number} count
@ -85,329 +62,173 @@ class Algorithm {
#wantNewestRange(localHaveRange, remoteHaveRange, count) {
const [minLocalHave, maxLocalHave] = localHaveRange
const [minRemoteHave, maxRemoteHave] = remoteHaveRange
if (maxRemoteHave < minLocalHave) return EMPTY_RANGE
if (maxRemoteHave <= maxLocalHave) return [1, 0]
const maxWant = maxRemoteHave
const size = count > maxWant - maxLocalHave ? count : maxWant - maxLocalHave
const minWant = Math.max(
maxWant - size + 1,
maxLocalHave - size + 1,
minRemoteHave
)
const size = Math.max(maxWant - maxLocalHave, count)
const minWant = Math.max(maxWant - size, maxLocalHave + 1, minRemoteHave)
return [minWant, maxWant]
}
/**
* Calculates the range ([minDepth, maxDepth]) of msgs that the peer wants,
* given the goal "dict" or "set" and local and remote have ranges.
*
* @param {number} minGhostDepth
* @param {Range} localHaveRange
* @param {Range} remoteHaveRange
* @param {number} count
* @returns {Range}
*/
#wantDictOrSetRange(minGhostDepth, remoteHaveRange) {
const [minRemoteHave, maxRemoteHave] = remoteHaveRange
if (maxRemoteHave < minGhostDepth) return EMPTY_RANGE
const maxWant = maxRemoteHave
const minWant = Math.max(minGhostDepth, minRemoteHave)
return [minWant, maxWant]
#wantOldestRange(localHaveRange, remoteHaveRange, count) {
// TODO: implement
throw new Error('not implemented')
}
/**
* Calculates the range ([minDepth, maxDepth]) of msgs that the peer wants,
* given a `goal`.
*
* @param {Range} localHave
* @param {Range} remoteHave
* @param {Goal?} goal
* @returns {Promise<Range>}
* @returns {Range}
*/
async wantRange(localHave, remoteHave, goal) {
if (!goal) return EMPTY_RANGE
if (isEmptyRange(remoteHave)) return EMPTY_RANGE
switch (goal.type) {
case 'all':
return this.#wantAllRange(localHave, remoteHave)
case 'dict':
const minDictGhostDepth = await p(this.#peer.dict.minGhostDepth)(goal.id)
return this.#wantDictOrSetRange(minDictGhostDepth, remoteHave)
case 'set':
const minSetGhostDepth = await p(this.#peer.set.minGhostDepth)(goal.id)
return this.#wantDictOrSetRange(minSetGhostDepth, remoteHave)
case 'newest':
return this.#wantNewestRange(localHave, remoteHave, goal.count)
case 'none':
return EMPTY_RANGE
default:
throw new Error(`Unrecognized goal type: ${goal.type}`)
wantRange(localHave, remoteHave, goal) {
if (!goal) return [1, 0]
if (isEmptyRange(remoteHave)) return [1, 0]
const { type, count } = parseGoal(goal)
if (type === 'all') {
return this.#wantAllRange(localHave, remoteHave)
} else if (type === 'newest') {
return this.#wantNewestRange(localHave, remoteHave, count)
} else if (type === 'oldest') {
return this.#wantOldestRange(localHave, remoteHave, count)
}
}
/**
* Returns a bloom filter that represents the msgs that this peer has in the
* database, matching the given tangle `rootID` and `range`. The `round` is
* used to identify bloom filter items from different rounds.
*
* The bloom filter also includes account msgs that are outside the tangle
* `rootID`, but required for validation of tangle `rootID` msgs.
*
* @param {string} rootID
* @param {number} round
* @param {Range} range
* @param {Iterable<string>} extraIds
* @returns {Promise<JSON>}
*/
async bloomFor(rootID, round, range, extraIds = []) {
bloomFor(rootMsgHash, round, range, extraIds = []) {
const filterSize =
(isEmptyRange(range) ? 2 : estimateMsgCount(range)) + countIter(extraIds)
const filter = BloomFilter.create(2 * filterSize, 0.00001)
if (!isEmptyRange(range)) {
const rangeMsgs = await this.getMsgsInRange(rootID, range)
const accountMsgs = await this.getAccountMsgsFor(rangeMsgs)
for (const msg of accountMsgs.concat(rangeMsgs)) {
filter.add('' + round + MsgV4.getMsgID(msg))
for (const msg of this.yieldMsgsIn(rootMsgHash, range)) {
filter.add('' + round + FeedV1.getMsgHash(msg))
}
}
const ghosts = this.#peer.db.ghosts.get(rootID)
for (const ghostMsgID of ghosts) {
// No need to check depths because the `range` is by definition taking
// into account local ghost depths
filter.add('' + round + ghostMsgID)
}
for (const msgID of extraIds) {
filter.add('' + round + msgID)
for (const msgId of extraIds) {
filter.add('' + round + msgId)
}
return filter.saveAsJSON()
}
/**
* Returns msg IDs for msgs that are missing in the remote peer's database for
* the tangle `rootID` within `range`, judging by the given `remoteBloomJSON`
* (and `round`) bloom filter.
*
* This may also contain account msgs that are outside the tangle `rootID`,
* but required to validate the msgs in that tangle.
*
* @param {string} rootID
* @param {number} round
* @param {Range} range
* @param {JSON} remoteBloomJSON
* @returns {Promise<Array<MsgID>>}
*/
async getMsgsMissing(rootID, round, range, remoteBloomJSON) {
msgsMissing(rootMsgHash, round, range, remoteBloomJSON) {
if (isEmptyRange(range)) return []
const remoteFilter = BloomFilter.fromJSON(remoteBloomJSON)
const missing = []
const rangeMsgs = await this.getMsgsInRange(rootID, range)
const accountMsgs = await this.getAccountMsgsFor(rangeMsgs)
for (const msg of accountMsgs.concat(rangeMsgs)) {
const msgID = MsgV4.getMsgID(msg)
if (!remoteFilter.has('' + round + msgID)) {
missing.push(msgID)
for (const msg of this.yieldMsgsIn(rootMsgHash, range)) {
const msgHash = FeedV1.getMsgHash(msg)
if (!remoteFilter.has('' + round + msgHash)) {
missing.push(msgHash)
}
}
return missing
}
/**
* Returns an array of account msgs that are required for validating the given
* `msgs`.
*
* @param {Array<Msg>} msgs
* @returns {Promise<Array<Msg>>}
*/
async getAccountMsgsFor(msgs) {
const accountTips = /** @type {Map<MsgID, Set<string>>} */ (new Map())
for (const msg of msgs) {
if (MsgV4.isFeedMsg(msg)) {
const set = accountTips.get(msg.metadata.account) ?? new Set()
for (const tip of msg.metadata.accountTips) {
set.add(tip)
}
accountTips.set(msg.metadata.account, set)
}
}
const accountMsgs = []
for (const [accountID, tips] of accountTips) {
const accountTangle = await p(this.#peer.db.getTangle)(accountID)
if (!accountTangle) continue
accountMsgs.push(...(await accountTangle.slice([], [...tips])))
}
return accountMsgs
}
/**
* Among the given `msgIDs`, find those that are account msgs and return them
* as msgs.
*
* @param {Iterable<MsgID>} msgIDs
* @returns {Promise<Array<Msg>>}
*/
async filterAndFetchAccountMsgs(msgIDs) {
const accountMsgs = []
for (const msgID of msgIDs) {
const msg = await p(this.#peer.db.get)(msgID)
if (msg?.metadata.account === 'self') {
accountMsgs.push(msg)
}
}
return accountMsgs
}
/**
* Returns msgs that have a depth within the given `range` for the tangle
* `rootID`.
*
* @param {string} rootID
* @param {Range} range
* @returns {Promise<Array<Msg>>}
*/
async getMsgsInRange(rootID, range) {
*yieldMsgsIn(rootMsgHash, range) {
const [minDepth, maxDepth] = range
const rootMsg = await p(this.#peer.db.get)(rootID)
if (!rootMsg) return []
const msgs = []
if (minDepth === 0) {
msgs.push(rootMsg)
const rootMsg = this.#peer.db.get(rootMsgHash)
if (!rootMsg) return
if (minDepth === 0) yield rootMsg
for (const msg of this.#peer.db.msgs()) {
const tangles = msg.metadata.tangles
if (
tangles[rootMsgHash] &&
tangles[rootMsgHash].depth >= minDepth &&
tangles[rootMsgHash].depth <= maxDepth
) {
yield msg
}
}
const tangle = await p(this.#peer.db.getTangle)(rootID)
if (!tangle) return msgs
for (const msg of await tangle.slice()) {
const depth = msg.metadata.tangles[rootID]?.depth ?? 0
if (depth >= minDepth && depth <= maxDepth) {
}
async pruneNewest(rootMsgHash, count) {
const tangle = this.#peer.db.getTangle(rootMsgHash)
const sorted = tangle.topoSort()
if (sorted.length <= count) return
const msgHash = sorted[sorted.length - count]
const { deletables, erasables } = tangle.getDeletablesAndErasables(msgHash)
const del = p(this.#peer.db.del)
const erase = p(this.#peer.db.erase)
for (const msgHash of deletables) {
await del(msgHash)
}
for (const msgHash of erasables) {
await erase(msgHash)
}
}
async commit(rootMsgHash, newMsgs, goal, myWantRange) {
// Filter out contentful newMsgs that are not in my want-range
const [minWant, maxWant] = myWantRange
const validNewMsgs = newMsgs
.filter((msg) => {
if (!msg.content) return true // contentless messages are always valid
const depth = msg.metadata.tangles[rootMsgHash]?.depth ?? 0
if (depth === 0 && FeedV1.getMsgHash(msg) !== rootMsgHash) {
return false // the rootMsg is the only acceptable depth-zero msg
}
return minWant <= depth && depth <= maxWant
})
.sort((a, b) => {
const aDepth = a.metadata.tangles[rootMsgHash]?.depth ?? 0
const bDepth = b.metadata.tangles[rootMsgHash]?.depth ?? 0
return aDepth - bDepth
})
// Simulate adding this whole tangle, and check if it's valid
let err
if ((err = this.#peer.db.validateTangle(rootMsgHash, validNewMsgs))) {
throw err
}
// Add new messages TODO: optimize perf, avoiding await / try / catch
for (const msg of newMsgs) {
try {
await p(this.#peer.db.add)(msg, rootMsgHash)
} catch {}
}
// Prune. Ideally this should be in a garbage collection module
const { type, count } = parseGoal(goal)
if (type === 'newest') return await this.pruneNewest(rootMsgHash, count)
if (type === 'oldest') throw new Error('not implemented') // TODO:
}
/**
* @param {string} rootMsgHash
* @param {Set<string>} msgHashes
* @returns
*/
getTangleSlice(rootMsgHash, msgHashes) {
if (msgHashes.size === 0) return []
const tangle = this.#peer.db.getTangle(rootMsgHash)
const sorted = tangle.topoSort()
let oldestMsgHash = null
for (const msgHash of sorted) {
if (msgHashes.has(msgHash)) {
oldestMsgHash = msgHash
break
}
}
const { erasables } = tangle.getDeletablesAndErasables(oldestMsgHash)
const msgs = []
for (const msgHash of sorted) {
let isErasable = erasables.includes(msgHash)
if (!msgHashes.has(msgHash) && !isErasable) continue
const msg = this.#peer.db.get(msgHash)
if (!msg) continue
if (isErasable) {
msgs.push({ ...msg, content: null })
} else {
msgs.push(msg)
}
}
return msgs
}
/**
* Given the input msgs (or msg IDs), return those that are part of the tangle
* `rootID`, plus dataless msgs part of a trail to the tangle root, including
* the root itself.
*
* @param {string} rootID
* @param {Set<string> | Array<Msg>} msgs
* @returns {Promise<Array<Msg>>}
*/
async getTangleMsgs(rootID, msgs) {
if (Array.isArray(msgs) && msgs.length === 0) return []
if (!Array.isArray(msgs) && msgs.size === 0) return []
const msgIDs = [...msgs].map((m) =>
typeof m === 'string' ? m : MsgV4.getMsgID(m)
)
const tangle = await p(this.#peer.db.getTangle)(rootID)
if (!tangle) return []
return tangle.slice(msgIDs, [])
}
/**
* Erase or delete low-depth msgs from the tangle `rootID`, preserving at
* least `count` high-depth msgs.
*
* @param {string} rootID
* @param {number} count
*/
async pruneNewest(rootID, count) {
/** @type {DBTangle | null} */
const tangle = await p(this.#peer.db.getTangle)(rootID)
if (!tangle) return
const sorted = tangle.topoSort()
if (sorted.length <= count) return
const msgID = sorted[sorted.length - count] // New "oldest dataful msg"
const deletablesErasables = tangle.getDeletablesAndErasables(msgID)
if (!deletablesErasables) return
const { deletables, erasables } = deletablesErasables
const del = p(this.#peer.db.del)
const erase = p(this.#peer.db.erase)
for (const msgID of deletables) {
await del(msgID)
}
for (const msgID of erasables) {
await erase(msgID)
}
}
/**
* Filter out msgs I didn't actually ask for. "Trust but verify". Also sort
* them by depth. Also sorts such that (not-this-tangle) account msgs are
* first.
*
* @param {string} rootID
* @param {Array<Msg>} msgs
* @param {Range} myWantRange
* @returns {Array<Msg>}
*/
filterReceivedMsgs(rootID, msgs, myWantRange) {
const [minWant, maxWant] = myWantRange
const validNewMsgs = msgs
.filter((msg) => {
if (msg.metadata.account === 'self') return true
const depth = msg.metadata.tangles[rootID]?.depth ?? 0
if (depth === 0 && MsgV4.getMsgID(msg) !== rootID) {
return false // the rootMsg is the only acceptable depth-zero msg
}
if (!msg.data) {
return true
} else {
return minWant <= depth
}
})
.sort((a, b) => {
const aAccount = a.metadata.account
const bAccount = b.metadata.account
if (aAccount === 'self' && bAccount !== 'self') return -1
if (aAccount !== 'self' && bAccount === 'self') return 1
const aDepth = a.metadata.tangles[rootID]?.depth ?? 0
const bDepth = b.metadata.tangles[rootID]?.depth ?? 0
return aDepth - bDepth
})
return validNewMsgs
}
/**
* @param {Array<Msg>} msgs
*/
getMsgIDs(msgs) {
return msgs.map((msg) => MsgV4.getMsgID(msg))
}
/**
* Takes the new msgs and adds them to the database. Also performs pruning as
* post-processing.
*
* @param {string} rootID
* @param {Array<Msg>} newMsgs
* @param {Goal} goal
*/
async commit(rootID, newMsgs, goal) {
// TODO: Simulate adding this whole tangle, and check if it's valid
// Add new messages
for (const msg of newMsgs) {
try {
if (msg.metadata.account === 'self') {
await p(this.#peer.db.add)(msg, null /* infer tangleID */)
} else {
await p(this.#peer.db.add)(msg, rootID)
}
} catch (err) {
debug('Commit failed to add msg in db: %o', err)
}
}
if (goal.type === 'newest') {
return await this.pruneNewest(rootID, goal.count)
}
}
}
module.exports = Algorithm

53
lib/goal.js Normal file
View File

@@ -0,0 +1,53 @@
/**
* @typedef {'all'} GoalAll
*/
/**
* @typedef {`newest-${number}`} GoalNewest
*/
/**
* @typedef {`oldest-${number}`} GoalOldest
*/
/**
* @typedef {GoalAll|GoalNewest|GoalOldest} Goal
*/
/**
* @typedef {{type: 'all'; count: never}} ParsedAll
*/
/**
* @typedef {{type: 'newest' |'oldest'; count: number}} ParsedLimited
*/
/**
* @typedef {ParsedAll | ParsedLimited} ParsedGoal
*/
/**
 * Parse a `Goal` string into a structured `{type, count}` descriptor.
 *
 * Accepted shapes: `'all'`, `'newest-N'`, `'oldest-N'` (N a non-negative
 * integer).
 *
 * @param {Goal} goal
 * @returns {ParsedGoal}
 * @throws {Error} `Invalid goal: …` for any unrecognized or non-string input
 */
function parseGoal(goal) {
  if (goal === 'all') {
    return { type: 'all' }
  }
  // Guard non-strings (null, undefined, numbers, …) so callers get the
  // descriptive "Invalid goal" error instead of `goal.match is not a function`.
  // `newest-N` and `oldest-N` share one shape, so one regex handles both.
  const match =
    typeof goal === 'string' ? goal.match(/^(newest|oldest)-(\d+)$/) : null
  if (match) {
    return { type: match[1], count: Number(match[2]) }
  }
  throw new Error(`Invalid goal: ${goal}`)
}
module.exports = {
parseGoal,
}

View File

@ -1,123 +1,96 @@
// @ts-ignore
const toPull = require('push-stream-to-pull-stream') // @ts-ignore
const getSeverity = require('ssb-network-errors')
const toPull = require('push-stream-to-pull-stream')
const pull = require('pull-stream')
const FeedV1 = require('ppppp-db/feed-v1')
const makeDebug = require('debug')
const getSeverity = require('ssb-network-errors')
const Algorithm = require('./algorithm')
const SyncStream = require('./stream')
/**
* @typedef {ReturnType<import('pzp-db').init>} PZPDB
* @typedef {ReturnType<import('pzp-dict').init>} PZPDict
* @typedef {ReturnType<import('pzp-set').init>} PZPSet
* @typedef {ReturnType<import('pzp-goals').init>} PZPGoals
* @typedef {import('node:events').EventEmitter} Emitter
* @typedef {(cb: (err: Error) => void) => import('pull-stream').Duplex<unknown, unknown>} GetDuplex
* @typedef {{ pubkey: string }} SHSE
* @typedef {import('./goal').Goal} Goal
*/
/**
* @param {Error} err
* @param {string} namespace
* @param {string} methodName
*/
function isMuxrpcMissingError(err, namespace, methodName) {
const jsErrorMessage = `method:${namespace},${methodName} is not in list of allowed methods`
const goErrorMessage = `muxrpc: no such command: ${namespace}.${methodName}`
return err.message === jsErrorMessage || err.message === goErrorMessage
}
/**
* @param {Emitter & {
* db: PZPDB,
* dict: PZPDict,
* set: PZPSet,
* goals: PZPGoals,
* shse: SHSE
* }} peer
* @param {unknown} config
*/
function initSync(peer, config) {
const debug = makeDebug(`pzp:sync`)
const algo = new Algorithm(peer)
let started = false
module.exports = {
name: 'tangleSync',
manifest: {
connect: 'duplex',
setGoal: 'sync',
initiate: 'sync',
},
permissions: {
anonymous: {
allow: ['connect'],
},
},
init(peer, config) {
const debug = makeDebug(`ppppp:tangleSync`)
const goals = new Map()
const algo = new Algorithm(peer)
const streams = /** @type {Array<SyncStream>} */ ([])
/**
* @param {string} remoteId
* @param {boolean} iamClient
*/
function createStream(remoteId, iamClient) {
// prettier-ignore
debug('Opening a stream with remote %s %s', iamClient ? 'server' : 'client', remoteId)
const { shse, db, goals } = peer
const stream = new SyncStream(shse.pubkey, debug, db, goals, algo)
streams.push(stream)
return stream
}
/**
* @param {{ shse: SHSE, sync: { connect: GetDuplex }, once: CallableFunction }} rpc
* @param {boolean} iamClient
*/
function onSyncRPCConnect(rpc, iamClient) {
if (rpc.shse.pubkey === peer.shse.pubkey) return // connecting to myself
if (!iamClient) return
const stream = createStream(rpc.shse.pubkey, true)
const local = toPull.duplex(stream)
let abort = /** @type {CallableFunction | null} */ (null)
const remote = rpc.sync.connect((networkError) => {
if (networkError && getSeverity(networkError) >= 3) {
if (isMuxrpcMissingError(networkError, 'sync', 'connect')) {
debug('peer %s does not support sync', rpc.shse.pubkey)
// } else if (isReconnectedError(networkError)) { // TODO: bring back
// Do nothing, this is a harmless error
} else {
console.error(`rpc.sync.connect exception:`, networkError)
}
abort?.(true, () => {})
}
})
abort = pull(local, remote, local)
rpc.once('closed', () => {
abort?.(true, () => {})
})
if (started) stream.initiate()
}
peer.on('rpc:connect', onSyncRPCConnect)
/**
* @this {{shse: {pubkey: string}}}
*/
function connect() {
return toPull.duplex(createStream(this.shse.pubkey, false))
}
function start() {
if (started) return
started = true
for (const stream of streams) {
stream.initiate()
const streams = []
function createStream(remoteId, iamClient) {
// prettier-ignore
debug('Opening a stream with remote %s %s', iamClient ? 'server' : 'client', remoteId)
const stream = new SyncStream(peer.id, debug, goals, algo)
streams.push(stream)
return stream
}
}
return {
connect,
start,
}
}
peer.on('rpc:connect', function onSyncRPCConnect(rpc, iamClient) {
if (rpc.id === peer.id) return // local client connecting to local server
if (!iamClient) return
const local = toPull.duplex(createStream(rpc.id, true))
exports.name = 'sync'
exports.needs = ['db', 'dict', 'set', 'goals', 'shse']
exports.manifest = {
connect: 'duplex',
initiate: 'sync',
}
exports.init = initSync
exports.permissions = {
anonymous: {
allow: ['connect'],
const remote = rpc.tangleSync.connect((networkError) => {
if (networkError && getSeverity(networkError) >= 3) {
if (isMuxrpcMissingError(networkError, 'tangleSync', 'connect')) {
console.warn(`peer ${rpc.id} does not support sync connect`)
// } else if (isReconnectedError(networkError)) { // TODO: bring back
// Do nothing, this is a harmless error
} else {
console.error(`rpc.tangleSync.connect exception:`, networkError)
}
}
})
pull(local, remote, local)
})
function connect() {
// `this` refers to the remote peer who called this muxrpc API
return toPull.duplex(createStream(this.id, false))
}
/**
* @param {string} tangleId
* @param {Goal} goal
*/
function setGoal(tangleId, goal = 'all') {
goals.set(tangleId, goal)
}
function setFeedGoal(author, type, goal = 'all') {
const tangleId = FeedV1.getFeedRootHash(author, type)
goals.set(tangleId, goal)
}
function initiate() {
for (const stream of streams) {
stream.initiate()
}
}
return {
connect,
setGoal,
setFeedGoal,
initiate,
}
},
}

View File

@ -1,14 +1,3 @@
/**
* @param {any} range
* @return {range is Range}
*/
function isRange(range) {
if (!Array.isArray(range)) return false
if (range.length !== 2) return false
if (!Number.isInteger(range[0]) || !Number.isInteger(range[1])) return false
return true
}
/**
* @typedef {[number, number]} Range
*/
@ -34,11 +23,7 @@ function estimateMsgCount(range) {
else return estimate
}
const EMPTY_RANGE = /** @type {Range} */ ([1, 0])
module.exports = {
isRange,
isEmptyRange,
estimateMsgCount,
EMPTY_RANGE,
}

View File

@ -1,236 +1,150 @@
// @ts-ignore
const Pipeable = require('push-stream/pipeable')
const p = require('promisify-4loc')
const { isRange, isEmptyRange } = require('./range')
const { isMsgId, isBloom, isMsgIds, isMsgs } = require('./util')
const { isEmptyRange } = require('./range')
/**
* @typedef {ReturnType<import('pzp-goals').init>} PZPGoals
* @typedef {ReturnType<import('pzp-db').init>} PZPDB
* @typedef {import('pzp-db').RecPresent} Rec
* @typedef {import('pzp-db/msg-v4').Msg} Msg
* @typedef {import('./range').Range} Range
* @typedef {import('./algorithm')} Algorithm
* @typedef {import('pzp-goals').Goal} Goal
* @typedef {string} MsgID
* @typedef {{id: string}} WithId
* @typedef {WithId & {phase: 0, payload?: undefined}} Data0
* @typedef {WithId & {phase: 1, payload: Range}} Data1
* @typedef {WithId & {phase: 2, payload: { haveRange: Range, wantRange: Range }}} Data2
* @typedef {WithId & {phase: 3, payload: { wantRange: Range, bloom: JSON }}} Data3
* @typedef {WithId & {phase: 4 | 5 | 6 | 7, payload: { msgIDs: Array<MsgID>, bloom: JSON }}} Data4567
* @typedef {WithId & {phase: 8, payload: { msgs: Array<Msg>, bloom: JSON }}} Data8
* @typedef {WithId & {phase: 9, payload: Array<Msg>}} Data9
* @typedef {Data0 | Data1 | Data2 | Data3 | Data4567 | Data8 | Data9} Data
*/
/**
* @template T
* @typedef {[T] extends [void] ?
* (...args: [Error] | []) => void :
* (...args: [Error] | [null, T]) => void
* } CB
* @typedef {import('./goal').Goal} Goal
*/
class SyncStream extends Pipeable {
#myId
/** @type {Algorithm} */
#algo
/** @type {CallableFunction} */
#debug
/** @type {Set<string>} Set of tangleId */
#algo
/** Set of tangleId
* @type {Set<string>} */
#requested
/** @type {PZPDB} */
#db
/** @type {PZPGoals} */
/** tangleId => goal
* @type {Map<string, Goal>} */
#goals
/**
* tangleId => have-range by local peer
* @type {Map<string, [number, number]>}
*/
#localHave
/**
* tangleId => want-range by local peer
* @type {Map<string, [number, number]>}
*/
#localWant
/**
* tangleId => have-range by remote peer
* @type {Map<string, [number, number]>}
*/
#remoteHave
/**
* tangleId => want-range by remote peer
* @type {Map<string, [number, number]>}
*/
#remoteWant
/**
* tangleId => Set of msgIDs
* @type {Map<string, Set<string>>}
*/
#receivableMsgs
/**
* tangleId => Set of msgIDs
* @type {Map<string, Set<string>>}
*/
#sendableMsgs
/** @type {Set<string>} */
#realtimeSyncing
/**
* @param {string} localId
* @param {CallableFunction} debug
* @param {PZPDB} db
* @param {PZPGoals} goals
* @param {Algorithm} algo
*/
constructor(localId, debug, db, goals, algo) {
constructor(localId, debug, goals, algo) {
super()
this.paused = false // TODO: should we start as paused=true?
this.ended = false
/** @type {any} */
this.source = this.sink = null
this.#myId = localId.slice(0, 6)
this.#debug = debug
this.#db = db
this.#goals = goals
this.#algo = algo
this.#requested = new Set()
this.#realtimeSyncing = new Set()
this.#localHave = new Map()
this.#localWant = new Map()
this.#remoteHave = new Map()
this.#remoteWant = new Map()
this.#receivableMsgs = new Map()
this.#sendableMsgs = new Map()
// Setup real-time syncing
this.#db.onRecordAdded((/** @type {Rec} */ { id: msgID, msg }) => {
if (!this.sink || this.sink.paused) return
const tangleIDs = [msgID].concat(Object.keys(msg.metadata.tangles))
this.resume()
for (const id of tangleIDs) {
if (this.#realtimeSyncing.has(id)) {
if (this.#receivableMsgs.has(msgID)) continue
if (this.#receivableMsgs.get(id)?.has(msgID)) continue
if (this.#sendableMsgs.has(msgID)) continue
if (this.#sendableMsgs.get(id)?.has(msgID)) continue
this.sink.write({ id, phase: 9, payload: [msg] })
// prettier-ignore
this.#debug('%s Stream OUTr: sent msg %s in %s', this.#myId, msgID, id)
return
}
}
})
}
initiate() {
for (const goal of this.#goals.list()) {
this.#requested.add(goal.id)
for (const id of this.#goals.keys()) {
this.#requested.add(id)
}
this.resume()
this.#goals.watch((/** @type {any} */ goal) => {
if (!this.#requested.has(goal.id) && goal.type !== 'none') {
this.#requested.add(goal.id)
this.resume()
}
})
}
#canSend() {
return this.sink && !this.sink.paused && !this.ended
}
/**
* @param {string} id
* @param {Array<MsgID>} msgIDs
*/
#updateSendableMsgs(id, msgIDs) {
#updateSendableMsgs(id, msgs) {
const set = this.#sendableMsgs.get(id) ?? new Set()
for (const msgID of msgIDs) {
set.add(msgID)
for (const msg of msgs) {
set.add(msg)
}
this.#sendableMsgs.set(id, set)
}
/**
* @param {string} id
* @param {Array<string>} msgIDs
*/
#updateReceivableMsgs(id, msgIDs) {
#updateReceivableMsgs(id, msgs) {
const set = this.#receivableMsgs.get(id) ?? new Set()
for (const msgID of msgIDs) {
set.add(msgID)
for (const msg of msgs) {
set.add(msg)
}
this.#receivableMsgs.set(id, set)
}
/**
* @param {string} id
*/
async #sendLocalHave(id) {
const localHaveRange = await this.#algo.haveRange(id)
#sendLocalHave(id) {
const localHaveRange = this.#algo.haveRange(id)
this.#localHave.set(id, localHaveRange)
this.sink.write({ id, phase: 1, payload: localHaveRange })
// prettier-ignore
this.#debug('%s Stream OUT1: sent local have-range %o for %s', this.#myId, localHaveRange, id)
this.#debug('%s Stream OUT1: send local have-range %o for %s', this.#myId, localHaveRange, id)
this.sink.write({ id, phase: 1, payload: localHaveRange })
}
/**
* @param {string} id
* @param {Range} remoteHaveRange
*/
async #sendLocalHaveAndWant(id, remoteHaveRange) {
#sendLocalHaveAndWant(id, remoteHaveRange) {
// prettier-ignore
this.#debug('%s Stream IN1: got remote have-range %o for %s', this.#myId, remoteHaveRange, id)
this.#remoteHave.set(id, remoteHaveRange)
const goal = this.#goals.get(id)
const haveRange = await this.#algo.haveRange(id)
const wantRange = await this.#algo.wantRange(haveRange, remoteHaveRange, goal)
const haveRange = this.#algo.haveRange(id)
const wantRange = this.#algo.wantRange(haveRange, remoteHaveRange, goal)
this.#localHave.set(id, haveRange)
this.#localWant.set(id, wantRange)
this.sink.write({ id, phase: 2, payload: { haveRange, wantRange } })
// prettier-ignore
this.#debug('%s Stream OUT2: sent local have-range %o and want-range %o for %s', this.#myId, haveRange, wantRange, id)
this.#debug('%s Stream OUT2: send local have-range %o and want-range %o for %s', this.#myId, haveRange, wantRange, id)
this.sink.write({ id, phase: 2, payload: { haveRange, wantRange } })
}
/**
* @param {string} id
* @param {Range} remoteHaveRange
* @param {Range} remoteWantRange
*/
async #sendLocalWantAndInitBloom(id, remoteHaveRange, remoteWantRange) {
#sendLocalWantAndInitBloom(id, remoteHaveRange, remoteWantRange) {
// prettier-ignore
this.#debug('%s Stream IN2: got remote have-range %o and want-range %o for %s', this.#myId, remoteHaveRange, remoteWantRange, id)
this.#remoteHave.set(id, remoteHaveRange)
this.#remoteWant.set(id, remoteWantRange)
const goal = this.#goals.get(id)
const haveRange = this.#localHave.get(id) ?? [-1, -1]
const localWant = await this.#algo.wantRange(haveRange, remoteHaveRange, goal)
this.#localWant.set(id, localWant)
const localBloom0 = await this.#algo.bloomFor(id, 0, localWant)
const haveRange = this.#localHave.get(id)
const wantRange = this.#algo.wantRange(haveRange, remoteHaveRange, goal)
this.#localWant.set(id, wantRange)
const localBloom0 = this.#algo.bloomFor(id, 0, wantRange)
this.sink.write({
id,
phase: 3,
payload: { bloom: localBloom0, wantRange: localWant },
payload: { bloom: localBloom0, wantRange },
})
// prettier-ignore
this.#debug('%s Stream OUT3: sent local want-range %o and bloom round 0 for %s', this.#myId, localWant, id)
this.#debug('%s Stream OUT3: send local want-range %o and bloom round 0 for %s', this.#myId, wantRange, id)
}
/**
* @param {string} id
* @param {Range} remoteWantRange
* @param {JSON} remoteBloom
*/
async #sendInitBloomRes(id, remoteWantRange, remoteBloom) {
#sendInitBloomRes(id, remoteWantRange, remoteBloom) {
// prettier-ignore
this.#debug('%s Stream IN3: got remote want-range %o and bloom round 0 for %s', this.#myId, remoteWantRange, id)
this.#remoteWant.set(id, remoteWantRange)
const msgIDsForThem = await this.#algo.getMsgsMissing(
const msgIDsForThem = this.#algo.msgsMissing(
id,
0,
remoteWantRange,
@ -238,30 +152,22 @@ class SyncStream extends Pipeable {
)
this.#updateSendableMsgs(id, msgIDsForThem)
const localWantRange = this.#localWant.get(id)
if (!localWantRange) throw new Error(`Local want-range not set for ${id}`)
const localBloom = await this.#algo.bloomFor(id, 0, localWantRange)
const localBloom = this.#algo.bloomFor(id, 0, localWantRange)
this.sink.write({
id,
phase: 4,
payload: { bloom: localBloom, msgIDs: msgIDsForThem },
})
// prettier-ignore
this.#debug('%s Stream OUT4: sent bloom round 0 plus msgIDs in %s: %o', this.#myId, id, msgIDsForThem)
this.#debug('%s Stream OUT4: send bloom round 0 plus msgIDs in %s: %o', this.#myId, id, msgIDsForThem)
}
/**
* @param {string} id
* @param {number} phase
* @param {number} round
* @param {JSON} remoteBloom
* @param {Array<MsgID>} msgIDsForMe
*/
async #sendBloomReq(id, phase, round, remoteBloom, msgIDsForMe) {
#sendBloomReq(id, phase, round, remoteBloom, msgIDsForMe) {
// prettier-ignore
this.#debug('%s Stream IN%s: got bloom round %s plus msgIDs in %s: %o', this.#myId, phase-1, round-1, id, msgIDsForMe)
const remoteWantRange = this.#remoteWant.get(id) ?? [0, 0]
const remoteWantRange = this.#remoteWant.get(id)
this.#updateReceivableMsgs(id, msgIDsForMe)
const msgIDsForThem = await this.#algo.getMsgsMissing(
const msgIDsForThem = this.#algo.msgsMissing(
id,
round - 1,
remoteWantRange,
@ -270,30 +176,22 @@ class SyncStream extends Pipeable {
this.#updateSendableMsgs(id, msgIDsForThem)
const extras = this.#receivableMsgs.get(id)
const localWantRange = this.#localWant.get(id)
if (!localWantRange) throw new Error(`Local want-range not set for ${id}`)
const localBloom = await this.#algo.bloomFor(id, round, localWantRange, extras)
const localBloom = this.#algo.bloomFor(id, round, localWantRange, extras)
this.sink.write({
id,
phase,
payload: { bloom: localBloom, msgIDs: msgIDsForThem },
})
// prettier-ignore
this.#debug('%s Stream OUT%s: sent bloom round %s plus msgIDs in %s: %o', this.#myId, phase, round, id, msgIDsForThem)
this.#debug('%s Stream OUT%s: send bloom round %s plus msgIDs in %s: %o', this.#myId, phase, round, id, msgIDsForThem)
}
/**
* @param {string} id
* @param {number} phase
* @param {number} round
* @param {JSON} remoteBloom
* @param {Array<MsgID>} msgIDsForMe
*/
async #sendBloomRes(id, phase, round, remoteBloom, msgIDsForMe) {
#sendBloomRes(id, phase, round, remoteBloom, msgIDsForMe) {
// prettier-ignore
this.#debug('%s Stream IN%s: got bloom round %s plus msgIDs in %s: %o', this.#myId, phase-1, round, id, msgIDsForMe)
const remoteWantRange = this.#remoteWant.get(id) ?? [0, 0]
const remoteWantRange = this.#remoteWant.get(id)
this.#updateReceivableMsgs(id, msgIDsForMe)
const msgIDsForThem = await this.#algo.getMsgsMissing(
const msgIDsForThem = this.#algo.msgsMissing(
id,
round,
remoteWantRange,
@ -302,29 +200,22 @@ class SyncStream extends Pipeable {
this.#updateSendableMsgs(id, msgIDsForThem)
const extras = this.#receivableMsgs.get(id)
const localWantRange = this.#localWant.get(id)
if (!localWantRange) throw new Error(`Local want-range not set for ${id}`)
const localBloom = await this.#algo.bloomFor(id, round, localWantRange, extras)
const localBloom = this.#algo.bloomFor(id, round, localWantRange, extras)
this.sink.write({
id,
phase,
payload: { bloom: localBloom, msgIDs: msgIDsForThem },
})
// prettier-ignore
this.#debug('%s Stream OUT%s: sent bloom round %s plus msgIDs in %s: %o', this.#myId, phase, round, id, msgIDsForThem)
this.#debug('%s Stream OUT%s: send bloom round %s plus msgIDs in %s: %o', this.#myId, phase, round, id, msgIDsForThem)
}
/**
* @param {string} id
* @param {number} round
* @param {JSON} remoteBloom
* @param {Array<MsgID>} msgIDsForMe
*/
async #sendMissingMsgsReq(id, round, remoteBloom, msgIDsForMe) {
#sendMissingMsgsReq(id, round, remoteBloom, msgIDsForMe) {
// prettier-ignore
this.#debug('%s Stream IN7: got bloom round %s plus msgIDs in %s: %o', this.#myId, round, id, msgIDsForMe)
const remoteWantRange = this.#remoteWant.get(id) ?? [0, 0]
const remoteWantRange = this.#remoteWant.get(id)
this.#updateReceivableMsgs(id, msgIDsForMe)
const msgIDsForThem = await this.#algo.getMsgsMissing(
const msgIDsForThem = this.#algo.msgsMissing(
id,
round,
remoteWantRange,
@ -332,36 +223,24 @@ class SyncStream extends Pipeable {
)
this.#updateSendableMsgs(id, msgIDsForThem)
const msgIDs = this.#sendableMsgs.get(id) ?? new Set()
const tangleMsgs = await this.#algo.getTangleMsgs(id, msgIDs)
const accountMsgs = await this.#algo.filterAndFetchAccountMsgs(msgIDs)
const msgs = accountMsgs.concat(tangleMsgs)
const msgs = this.#algo.getTangleSlice(id, msgIDs)
const extras = this.#receivableMsgs.get(id)
const localWantRange = this.#localWant.get(id)
if (!localWantRange) throw new Error(`Local want-range not set for ${id}`)
const localBloom = await this.#algo.bloomFor(id, round, localWantRange, extras)
const localBloom = this.#algo.bloomFor(id, round, localWantRange, extras)
this.sink.write({
id,
phase: 8,
payload: { msgs, bloom: localBloom },
})
// prettier-ignore
this.#debug('%s Stream OUT8: sent bloom round %s plus %s msgs in %s', this.#myId, round, msgs.length, id)
if (!this.#realtimeSyncing.has(id) && !isEmptyRange(remoteWantRange)) {
this.#realtimeSyncing.add(id)
}
this.#debug('%s Stream OUT8: send bloom round %s plus %s msgs in %s', this.#myId, round, msgs.length, id)
}
/**
* @param {string} id
* @param {number} round
* @param {JSON} remoteBloom
* @param {Array<Msg>} msgsForMe
*/
async #sendMissingMsgsRes(id, round, remoteBloom, msgsForMe) {
#sendMissingMsgsRes(id, round, remoteBloom, msgsForMe) {
// prettier-ignore
this.#debug('%s Stream IN8: got bloom round %s plus %s msgs in %s', this.#myId, round, msgsForMe.length, id)
const remoteWantRange = this.#remoteWant.get(id) ?? [0, 0]
const msgIDsForThem = await this.#algo.getMsgsMissing(
const remoteWantRange = this.#remoteWant.get(id)
const msgIDsForThem = this.#algo.msgsMissing(
id,
round,
remoteWantRange,
@ -369,80 +248,62 @@ class SyncStream extends Pipeable {
)
this.#updateSendableMsgs(id, msgIDsForThem)
const msgIDs = this.#sendableMsgs.get(id) ?? new Set()
const tangleMsgs = await this.#algo.getTangleMsgs(id, msgIDs)
const accountMsgs = await this.#algo.filterAndFetchAccountMsgs(msgIDs)
const msgs = accountMsgs.concat(tangleMsgs)
this.sink.write({ id, phase: 9, payload: msgs })
const msgs = this.#algo.getTangleSlice(id, msgIDs)
// prettier-ignore
this.#debug('%s Stream OUT9: sent %s msgs in %s', this.#myId, msgs.length, id)
if (!this.#realtimeSyncing.has(id) && !isEmptyRange(remoteWantRange)) {
this.#realtimeSyncing.add(id)
}
this.#consumeMissingMsgs(id, msgsForMe)
}
this.#debug('%s Stream OUT9: send %s msgs in %s', this.#myId, msgs.length, id)
this.sink.write({ id, phase: 9, payload: msgs })
/**
* @param {string} id
* @param {Array<Msg>} msgsForMe
* @returns
*/
#consumeMissingMsgs(id, msgsForMe) {
const goal = this.#goals.get(id)
const localWant = this.#localWant.get(id)
this.#requested.delete(id)
this.#localHave.delete(id)
this.#localWant.delete(id)
this.#remoteHave.delete(id)
this.#remoteWant.delete(id)
this.#receivableMsgs.delete(id)
this.#sendableMsgs.delete(id)
if (msgsForMe.length === 0) return
const goal = this.#goals.get(id)
if (!goal) {
this.#debug('%s Stream exception: no goal found for %s', this.#myId, id)
return
}
const localWantRange = this.#localWant.get(id)
if (!localWantRange) {
// prettier-ignore
this.#debug('%s Stream exception: local want-range not set for %s', this.#myId, id)
return
}
const validMsgs = this.#algo.filterReceivedMsgs(
id,
msgsForMe,
localWantRange
)
const validMsgIDs = this.#algo.getMsgIDs(validMsgs)
this.#updateReceivableMsgs(id, validMsgIDs)
try {
this.#algo.commit(id, validMsgs, goal)
this.#algo.commit(id, msgsForMe, goal, localWant)
} catch (err) {
// prettier-ignore
this.#debug('%s Stream could not commit received messages, because: %s', this.#myId, err)
}
}
/**
* @param {string} id
* @param {Range} remoteWantRange
*/
async #sendMsgsInRemoteWant(id, remoteWantRange) {
const msgs = []
const rangeMsgs = await this.#algo.getMsgsInRange(id, remoteWantRange)
const tangleMsgs = await this.#algo.getTangleMsgs(id, rangeMsgs)
const accountMsgs = await this.#algo.getAccountMsgsFor(tangleMsgs)
for (const msg of accountMsgs) msgs.push(msg)
for (const msg of tangleMsgs) msgs.push(msg)
const msgIDs = this.#algo.getMsgIDs(msgs)
this.#updateSendableMsgs(id, msgIDs)
this.sink.write({ id, phase: 9, payload: msgs })
#consumeMissingMsgs(id, msgsForMe) {
// prettier-ignore
this.#debug('%s Stream OUT9: sent %s msgs in %s', this.#myId, msgs.length, id)
if (!this.#realtimeSyncing.has(id) && !isEmptyRange(remoteWantRange)) {
this.#realtimeSyncing.add(id)
this.#debug('%s Stream IN9: got %s msgs in %s', this.#myId, msgsForMe.length, id)
const goal = this.#goals.get(id)
const localWant = this.#localWant.get(id)
this.#requested.delete(id)
this.#localHave.delete(id)
this.#localWant.delete(id)
this.#remoteHave.delete(id)
this.#remoteWant.delete(id)
this.#receivableMsgs.delete(id)
this.#sendableMsgs.delete(id)
if (msgsForMe.length === 0) return
try {
this.#algo.commit(id, msgsForMe, goal, localWant)
} catch (err) {
// prettier-ignore
this.#debug('%s Stream could not commit received messages, because: %s', this.#myId, err)
}
}
// source method
#sendMsgsInRemoteWant(id, remoteWantRange) {
const msgs = []
for (const msg of this.#algo.yieldMsgsIn(id, remoteWantRange)) {
msgs.push(msg)
}
// prettier-ignore
this.#debug('%s Stream OUT9: send %s msgs in %s', this.#myId, msgs.length, id)
this.sink.write({ id, phase: 9, payload: msgs })
}
// as a source
resume() {
if (!this.sink || this.sink.paused) return
@ -452,40 +313,19 @@ class SyncStream extends Pipeable {
}
}
/**
* sink method
* @param {Data} data
*/
// as a sink
write(data) {
// prettier-ignore
if (!data) return this.#debug('Invalid data from remote peer: missing data')
// prettier-ignore
if (typeof data !== 'object') return this.#debug('Invalid data from remote peer: not an object')
// prettier-ignore
if (Array.isArray(data)) return this.#debug('Invalid data from remote peer: is an array')
const { id, phase, payload } = data
// prettier-ignore
if (typeof phase !== 'number') return this.#debug("Invalid data from remote peer: phase isn't a number")
// prettier-ignore
if (!isMsgId(id)) return this.#debug('Invalid data from remote peer: id is not a valid msg id')
// prettier-ignore
if (phase !== 0 && !payload) return this.#debug('Invalid data from remote peer: payload is missing')
switch (phase) {
case 0: {
return this.#sendLocalHave(id)
}
case 1: {
// prettier-ignore
if (!isRange(payload)) return this.#debug('Invalid data from remote peer: payload is not a range in phase 1')
return this.#sendLocalHaveAndWant(id, payload)
}
case 2: {
const { haveRange, wantRange } = payload
// prettier-ignore
if (!isRange(haveRange) || !isRange(wantRange)) return this.#debug('Invalid data from remote peer: haveRange or wantRange is not a range in phase 2')
if (isEmptyRange(haveRange)) {
// prettier-ignore
this.#debug('%s Stream IN2: received remote have-range %o and want-range %o for %s', this.#myId, haveRange, wantRange, id)
@ -496,15 +336,10 @@ class SyncStream extends Pipeable {
}
case 3: {
const { wantRange, bloom } = payload
// prettier-ignore
if (!isRange(wantRange)) return this.#debug('Invalid data from remote peer: wantRange is not a range in phase 3')
// prettier-ignore
if (!isBloom(bloom)) return this.#debug('Invalid data from remote peer: bloom is not a bloom in phase 3')
const haveRange = this.#remoteHave.get(id)
if (haveRange && isEmptyRange(haveRange)) {
// prettier-ignore
this.#debug('%s Stream IN3: received remote want-range %o and remembers empty have-range %o for %s', this.#myId, wantRange, haveRange, id)
this.#debug('%s Stream IN3: received remote want-range want-range %o and remember empty have-range %o for %s', this.#myId, wantRange, haveRange, id)
return this.#sendMsgsInRemoteWant(id, wantRange)
} else {
return this.#sendInitBloomRes(id, wantRange, bloom)
@ -512,78 +347,40 @@ class SyncStream extends Pipeable {
}
case 4: {
const { bloom, msgIDs } = payload
// prettier-ignore
if (!isBloom(bloom)) return this.#debug('Invalid data from remote peer: bloom is not a bloom in phase 4')
// prettier-ignore
if (!isMsgIds(msgIDs)) return this.#debug('Invalid data from remote peer: msgIDs is not an array of msg ids in phase 4')
return this.#sendBloomReq(id, phase + 1, 1, bloom, msgIDs)
}
case 5: {
const { bloom, msgIDs } = payload
// prettier-ignore
if (!isBloom(bloom)) return this.#debug('Invalid data from remote peer: bloom is not a bloom in phase 5')
// prettier-ignore
if (!isMsgIds(msgIDs)) return this.#debug('Invalid data from remote peer: msgIDs is not an array of msg ids in phase 5')
return this.#sendBloomRes(id, phase + 1, 1, bloom, msgIDs)
}
case 6: {
const { bloom, msgIDs } = payload
// prettier-ignore
if (!isBloom(bloom)) return this.#debug('Invalid data from remote peer: bloom is not a bloom in phase 6')
// prettier-ignore
if (!isMsgIds(msgIDs)) return this.#debug('Invalid data from remote peer: msgIDs is not an array of msg ids in phase 6')
return this.#sendBloomReq(id, phase + 1, 2, bloom, msgIDs)
}
case 7: {
const { bloom, msgIDs } = payload
// prettier-ignore
if (!isBloom(bloom)) return this.#debug('Invalid data from remote peer: bloom is not a bloom in phase 7')
// prettier-ignore
if (!isMsgIds(msgIDs)) return this.#debug('Invalid data from remote peer: msgIDs is not an array of msg ids in phase 7')
return this.#sendMissingMsgsReq(id, 2, bloom, msgIDs)
}
case 8: {
const { bloom, msgs } = payload
// prettier-ignore
if (!isBloom(bloom)) return this.#debug('Invalid data from remote peer: bloom is not a bloom in phase 8')
// prettier-ignore
if (!isMsgs(msgs)) return this.#debug('Invalid data from remote peer: msgs is not an array of msgs in phase 8')
return this.#sendMissingMsgsRes(id, 2, bloom, msgs)
}
case 9: {
// prettier-ignore
if (!isMsgs(payload)) return this.#debug('Invalid data from remote peer: payload is not an array of msgs in phase 9')
// prettier-ignore
this.#debug('%s Stream IN9: got %s msgs in %s', this.#myId, payload.length, id)
return this.#consumeMissingMsgs(id, payload)
}
default: {
// prettier-ignore
return this.#debug('Invalid data from remote peer: phase is an invalid number')
}
}
this.#debug('Stream IN: unknown %o', data)
}
/**
* source method
* @param {Error} err
*/
// as a source
abort(err) {
this.ended = true
if (this.source && !this.source.ended) this.source.abort(err)
if (this.sink && !this.sink.ended) this.sink.end(err)
}
/**
* sink method
* @param {Error} err
*/
// as a sink
end(err) {
this.ended = true
if (this.source && !this.source.ended) this.source.abort(err)

View File

@ -1,94 +0,0 @@
const bs58 = require('bs58')
/**
* @typedef {import('./range').Range} Range
* @typedef {import('pzp-db/msg-v4').Msg} Msg
*/
/**
* @param {any} msgId
* @return {msgId is string}
*/
function isMsgId(msgId) {
try {
const d = bs58.decode(msgId)
return d.length === 32
} catch {
return false
}
}
/**
* @param {any} msgIds
* @return {msgIds is Array<string>}
*/
function isMsgIds(msgIds) {
if (!Array.isArray(msgIds)) return false
return msgIds.every(isMsgId)
}
/**
* @param {any} msgs
* @return {msgs is Array<Msg>}
*/
function isMsgs(msgs) {
if (!Array.isArray(msgs)) return false
return msgs.every(isMsg)
}
/**
* @param {any} bloom
* @return {bloom is string}
*/
function isBloom(bloom) {
// TODO: validate when blooming is stabilized
return !!bloom
}
/**
* @param {any} msg
* @returns {msg is Msg}
*/
function isMsg(msg) {
if (!msg || typeof msg !== 'object') {
return false
}
if (!('data' in msg)) {
return false
}
if (!msg.metadata || typeof msg.metadata !== 'object') {
return false
}
if (!('dataHash' in msg.metadata)) {
return false
}
if (!('dataSize' in msg.metadata)) {
return false
}
if (!('account' in msg.metadata)) {
return false
}
if (!('accountTips' in msg.metadata)) {
return false
}
if (!('tangles' in msg.metadata)) {
return false
}
if (!('domain' in msg.metadata)) {
return false
}
if (msg.metadata.v !== 4) {
return false
}
if (typeof msg.sig !== 'string') {
return false
}
return true
}
module.exports = {
isMsgId,
isMsgIds,
isMsgs,
isBloom,
isMsg,
}

View File

@ -1,17 +1,18 @@
{
"name": "pzp-sync",
"version": "1.0.4",
"description": "PZP replication using Kleppmann's hash graph sync",
"name": "ppppp-tangle-sync",
"version": "1.0.0",
"description": "PPPPP replication using Kleppmann's hash graph sync",
"author": "Andre Staltz <contact@staltz.com>",
"license": "MIT",
"homepage": "https://codeberg.org/pzp/pzp-sync",
"homepage": "https://github.com/staltz/ppppp-tangle-sync",
"repository": {
"type": "git",
"url": "git@codeberg.org:pzp/pzp-sync.git"
"url": "git@github.com:staltz/ppppp-tangle-sync.git"
},
"main": "index.js",
"files": [
"lib/**"
"*.js",
"lib/*.js"
],
"exports": {
".": {
@ -24,38 +25,31 @@
},
"dependencies": {
"bloom-filters": "^3.0.0",
"bs58": "^5.0.0",
"debug": "^4.3.4",
"promisify-4loc": "^1.0.0",
"multicb": "^1.2.2",
"pull-stream": "^3.7.0",
"push-stream": "^11.2.0",
"push-stream-to-pull-stream": "^1.0.5",
"ssb-network-errors": "^1.0.1"
},
"devDependencies": {
"@types/debug": "^4.1.9",
"@types/pull-stream": "3.6.3",
"@types/node": "16.x",
"bs58": "^5.0.0",
"c8": "7",
"pzp-caps": "^1.0.0",
"pzp-db": "^1.0.4",
"pzp-dict": "^1.0.0",
"pzp-goals": "^1.0.0",
"pzp-keypair": "^1.0.0",
"pzp-set": "^1.0.0",
"ppppp-db": "github:staltz/ppppp-db",
"prettier": "^2.6.2",
"pretty-quick": "^3.1.3",
"rimraf": "^4.4.0",
"secret-stack": "~8.1.0",
"secret-handshake-ext": "0.0.11",
"secret-stack": "^6.4.1",
"ssb-box": "^1.0.1",
"typescript": "^5.1.3"
"ssb-caps": "^1.1.0",
"ssb-classic": "^1.1.0",
"ssb-keys": "^8.5.0",
"ssb-uri2": "^2.4.1",
"tap-arc": "^0.3.5",
"tape": "^5.6.3"
},
"scripts": {
"clean-check": "tsc --build --clean",
"prepublishOnly": "npm run clean-check && tsc --build",
"postpublish": "npm run clean-check",
"test": "npm run clean-check && node --test",
"test": "tape test/*.js | tap-arc --bail",
"format-code": "prettier --write \"(lib|test)/**/*.js\"",
"format-code-staged": "pretty-quick --staged --pattern \"(lib|test)/**/*.js\"",
"coverage": "c8 --reporter=lcov npm run test"

View File

@ -1,11 +1,7 @@
For each given tangle, peers exchange ranges (tuples `[minDepth, maxDepth]`)
"haveRange" and "wantRange". Then each peer creates a bloom filter representing
the msgs they currently have in their wantRange, and these such bloom filters.
Based on the remote peer's bloom filter, they exchange msgs that appear to be
missing. The bloom filter is a representation of msgs I already have in my want-range,
so you know you can (probably?) skip sending them to me. The "probably?" uncertainty is reduced by doing several rounds of such exchange. In the end, each peer knows with high certainty which msgs the other peer is missing in their declared want-range, and thus exchange such msgs.
The bloom filter is a representation of msgs I already have in my want-range,
so you know you can (probably?) skip sending them to me.
In the process, associated account msgs are included even though the tangle being replicated might not be an account tangle. This is because validation of a received tangle msg may require the account msgs.
The "probably?" uncertainty is reduced by doing several rounds.
```mermaid
@ -13,10 +9,8 @@ sequenceDiagram
participant A as Alice
participant B as Bob
note over A: I want to sync tangle<br/>with ID "T" and goal aliceG
note over B: I want to sync tangle<br/>with ID "T" and goal bobG
note over A: aliceHave := getHaveRange(T)
A->>B: Phase 1: Send T and aliceHave
note over A: I want to sync tangle with ID "T"
A->>B: 1: Send local have-range for T
%% opt Alice's have-range is empty
%% B->>A: 2: Send local have-range and (empty) want-range for ID
@ -25,106 +19,30 @@ A->>B: Phase 1: Send T and aliceHave
%% note over A: done
%% end
Note over B: bobHave := getHaveRange(T)
Note over B: bobWant := getWantRange(bobHave, aliceHave, bobG)
B->>A: Phase 2: Send T, bobHave and bobWant
Note over B: Calculate local want-range based on<br/>local have-range and remote have-range
B->>A: 2: Send local have-range and want-range for T
%% opt Bob's have-range is empty
%% A->>B: All msgs in remote want-range
%% note over B: done
%% end
Note over A: aliceWant := getWantRange(aliceHave, bobHave, aliceG)
Note over A: aliceBF0 := bloomFor(T, 0, aliceWant)
A->>B: Phase 3: Send T, aliceWant and aliceBF0
Note over B: aliceMiss0 := msgsMissing(T, 0, aliceWant, aliceBF0)
Note over B: bobBF0 := bloomFor(T, 0, bobWant)
B->>A: Phase 4: Send T, bobBF0 and aliceMiss0
Note over A: bobMiss0 := msgsMissing(T, 0, bobWant, bobBF0)
Note over A: aliceBF1 := bloomFor(T, 1, aliceWant, aliceMiss0)
A->>B: Phase 5: Send T, aliceBF1 and bobMiss0
Note over B: aliceMiss1 := msgsMissing(T, 1, aliceWant, aliceBF1)
Note over B: bobBF1 := bloomFor(T, 1, bobWant, bobMiss0)
B->>A: Phase 6: Send T, bobBF1 and aliceMiss1
Note over A: bobMiss1 := msgsMissing(T, 1, bobWant, bobBF1)
Note over A: aliceBF2 := bloomFor(T, 2, aliceWant, aliceMiss0 + aliceMiss1)
A->>B: Phase 7: Send T, aliceBF2 and bobMiss1
Note over B: aliceMiss2 := msgsMissing(T, 2, aliceWant, aliceBF2)
Note over B: aliceMiss := aliceMiss0 + aliceMiss1 + aliceMiss2
Note over B: aliceMsgs := tangleSlice(T, aliceMiss)
Note over B: bobBF2 := bloomFor(T, 2, bobWant, bobMiss0 + bobMiss1)
B->>A: Phase 8: Send T, bobBF2 and aliceMsgs
Note over A: commit(aliceMsgs)
Note over A: bobMiss2 := msgsMissing(T, 2, bobWant, bobBF2)
Note over A: bobMiss := bobMiss0 + bobMiss1 + bobMiss2
Note over A: bobMsgs := tangleSlice(T, bobMiss)
Note over A: msgs := bobMsgs + associatedAccountMsgs(bobMsgs)
A->>B: Phase 9: Send T and msgs
Note over B: commit(msgs)
```
Methods:
```
/**
* Determines the range of depths of msgs I have in the tangle
*/
getHaveRange(tangleID) -> [minDepth, maxDepth]
```
```
/**
* Determines the range of depths of (new) msgs I want from the tangle
*/
getWantRange(localHaveRange, remoteHaveRange, goal) -> [minDepth, maxDepth]
```
```
/**
* For each `msg` in `msgs`, pick the set of msgs from the tangle `msg.metadata.account` (up to `msg.metadata.accountTips`), then combine together all these subsets.
* Returns all such account msgs.
*/
associatedAccountMsgs(msgs)
```
```
/**
* Creates a serialized bloom filter containing the identifiers `${round}${msgID}` for:
* - Each msg in the tangle `tangleID` within depth `range` (inclusive)
* - Each msg in associatedAccountMsgs(tangle msgs above)
* - Each "ghost" msg ID for this tangle
* - Each "extra" msg ID from `extraMsgIDs`
*/
bloomFor(tangleId, round, range, extraMsgIDs) -> Bloom
```
```
/**
* Returns the msg IDs in the tangle `tangleID` which satisfy:
* - `msg.metadata.tangles[tangleID].depth` within `range` (inclusive)
* - `${round}${msgID}` not in `bloom`
* Plus msg IDs of associatedAccountMsgs(tangle msgs above)
*/
msgsMissing(tangleID, round, range, bloom) -> Array<MsgID>
```
```
/**
* Identifies the lowest depth msg in `msgID` as "lowest" and then returns an
* Array of msgs with:
* - `lowest`
* - msgs posterior to `lowest`
* - trail from `lowest` to the root
* The Array is topologically sorted.
*/
tangleSlice(tangleID, msgIDs) -> Array<Msg>
```
```
/**
* Receives an Array of PZP msgs, validates and persists each in the database.
*/
commit(msgs) -> void
Note over A: Calculate BF over all<br />msgs in my want-range
A->>B: 3: Send local want-range and BF for round 0
Note over B: Read BF to know which<br />msgs they are (maybe) missing
Note over B: Calculate BF over all<br />msgs in my want-range
B->>A: 4: Send BF for round 0 and A's round 0 missing msg IDs
Note over A: ...
A->>B: 5: Send BF for round 1 and B's missing round 0 msg IDs
Note over B: ...
B->>A: 6: Send BF for round 1 and A' missing round 1 msg IDs
Note over A: ...
A->>B: 7: Send BF for round 2 and B's missing round 2 msg IDs
Note over B: ...
B->>A: 8: Send BF for round 2 and A's missing msgs
Note over A: Commit received msgs
A->>B: 9: Send B's missing msgs
Note over B: Commit received msgs
```
Peers exchange

View File

@ -1,79 +0,0 @@
const test = require('node:test')
const assert = require('node:assert')
const p = require('util').promisify
const Keypair = require('pzp-keypair')
const { createPeer } = require('./util')
const aliceKeypair = Keypair.generate('ed25519', 'alice')
const bobKeys = Keypair.generate('ed25519', 'bob')
async function getAccount(iter) {
const ary = []
for await (const it of iter) {
ary.push(it)
}
return ary
.filter((m) => m.metadata.account === 'self' && m.data?.action === 'add')
.map((m) => m.data.key.bytes)
}
test('sync an account tangle', async (t) => {
const alice = createPeer({ name: 'alice', global: { keypair: aliceKeypair } })
const bob = createPeer({ name: 'bob', global: { keypair: bobKeys } })
await alice.db.loaded()
await bob.db.loaded()
// Alice's account tangle
await alice.db.loaded()
const aliceID = await p(alice.db.account.create)({
subdomain: 'account',
_nonce: 'alice',
})
const aliceKeypair1 = Keypair.generate('ed25519', 'alice1')
await p(alice.db.account.add)({
account: aliceID,
keypair: aliceKeypair1,
})
const aliceKeypair2 = Keypair.generate('ed25519', 'alice2')
await p(alice.db.account.add)({
account: aliceID,
keypair: aliceKeypair2,
})
assert.deepEqual(
await getAccount(alice.db.msgs()),
[aliceKeypair.public, aliceKeypair1.public, aliceKeypair2.public],
'alice has her account tangle'
)
assert.deepEqual(
await getAccount(bob.db.msgs()),
[],
"bob doesn't have alice's account tangle"
)
// start() on purpose before connect, to test whether this also works
bob.sync.start()
const remoteAlice = await p(bob.connect)(alice.getAddress())
assert('bob connected to alice')
// Set goals on purpose after connect, to test whether this also works
bob.goals.set(aliceID, 'all')
alice.goals.set(aliceID, 'all')
await p(setTimeout)(1000)
assert('sync!')
assert.deepEqual(
await getAccount(bob.db.msgs()),
[aliceKeypair.public, aliceKeypair1.public, aliceKeypair2.public],
"bob has alice's account tangle"
)
await p(remoteAlice.close)(true)
await p(alice.close)(true)
await p(bob.close)(true)
})

View File

@ -1,219 +0,0 @@
const test = require('node:test')
const assert = require('node:assert')
const p = require('node:util').promisify
const Keypair = require('pzp-keypair')
const MsgV4 = require('pzp-db/msg-v4')
const { createPeer } = require('./util')
const aliceKeypair = Keypair.generate('ed25519', 'alice')
async function flatten(iter) {
const ary = []
for await (const it of iter) {
ary.push(it)
}
return ary
}
test('sync goal=dict from scratch', async (t) => {
const SPAN = 5
const alice = createPeer({
name: 'alice',
global: {
keypair: aliceKeypair,
},
dict: { ghostSpan: SPAN },
})
const bob = createPeer({ name: 'bob' })
await alice.db.loaded()
await bob.db.loaded()
// Alice sets up an account and a dict
const aliceID = await p(alice.db.account.create)({
subdomain: 'account',
_nonce: 'alice',
})
await p(alice.dict.load)(aliceID)
const aliceAccountRoot = await p(alice.db.get)(aliceID)
// Bob knows Alice
await p(bob.db.add)(aliceAccountRoot, aliceID)
// Alice constructs a dict
await p(alice.dict.update)('profile', { age: 25 })
await p(alice.dict.update)('profile', { name: 'ALICE' })
const mootID = alice.dict.getFeedID('profile')
// Assert situation at Alice before sync
{
const arr = (await flatten(alice.db.msgs()))
.map((msg) => msg.data?.update)
.filter((x) => !!x)
.map((x) => x.age ?? x.name ?? x.gender)
assert.deepEqual(arr, [25, 'ALICE'], 'alice has age+name dict')
}
// Assert situation at Bob before sync
{
const arr = (await flatten(bob.db.msgs()))
.map((msg) => msg.data?.update)
.filter((x) => !!x)
.map((x) => x.age ?? x.name ?? x.gender)
assert.deepEqual(arr, [], 'alice has empty dict')
}
// Trigger sync
alice.goals.set(mootID, 'dict')
bob.goals.set(mootID, 'dict')
const remoteAlice = await p(bob.connect)(alice.getAddress())
assert('bob connected to alice')
bob.sync.start()
await p(setTimeout)(1000)
assert('sync!')
// Assert situation at Bob after sync
{
const arr = (await flatten(bob.db.msgs()))
.map((msg) => msg.data?.update)
.filter((x) => !!x)
.map((x) => x.age ?? x.name ?? x.gender)
assert.deepEqual(arr, [25, 'ALICE'], 'alice has age+name dict')
}
await p(remoteAlice.close)(true)
await p(alice.close)(true)
await p(bob.close)(true)
})
//
// R-?-?-o-o
// \
// o
//
// where "o" is a dict update and "?" is a ghost
test('sync goal=dict with ghostSpan=2', async (t) => {
const SPAN = 5
const alice = createPeer({
name: 'alice',
global: {
keypair: aliceKeypair,
},
dict: { ghostSpan: SPAN },
})
const bob = createPeer({ name: 'bob' })
await alice.db.loaded()
await bob.db.loaded()
// Alice sets up an account and a dict
const aliceID = await p(alice.db.account.create)({
subdomain: 'account',
_nonce: 'alice',
})
await p(alice.dict.load)(aliceID)
const aliceAccountRoot = await p(alice.db.get)(aliceID)
// Bob knows Alice
await p(bob.db.add)(aliceAccountRoot, aliceID)
// Alice constructs a dict
await p(alice.dict.update)('profile', { name: 'alice' })
await p(alice.dict.update)('profile', { age: 24 })
await p(alice.dict.update)('profile', { name: 'Alice' })
await p(alice.dict.update)('profile', { age: 25 })
await p(alice.dict.update)('profile', { name: 'ALICE' })
let moot
let rec1
let rec2
let rec3
let rec4
let rec5
for await (const rec of alice.db.records()) {
if (rec.msg.metadata.dataSize === 0) moot = rec
if (rec.msg.data?.update?.name === 'alice') rec1 = rec
if (rec.msg.data?.update?.age === 24) rec2 = rec
if (rec.msg.data?.update?.name === 'Alice') rec3 = rec
if (rec.msg.data?.update?.age === 25) rec4 = rec
if (rec.msg.data?.update?.name === 'ALICE') rec5 = rec
}
// Bob knows the whole dict
await p(bob.db.add)(moot.msg, moot.id)
await p(bob.db.add)(rec1.msg, moot.id)
await p(bob.db.add)(rec2.msg, moot.id)
await p(bob.db.add)(rec3.msg, moot.id)
await p(bob.db.add)(rec4.msg, moot.id)
await p(bob.db.add)(rec5.msg, moot.id)
// Bob knows a branched off msg that Alice doesn't know
{
const tangle = new MsgV4.Tangle(moot.id)
tangle.add(moot.id, moot.msg)
tangle.add(rec1.id, rec1.msg)
const msg = MsgV4.create({
keypair: aliceKeypair,
domain: 'dict_v1__profile',
account: aliceID,
accountTips: [aliceID],
data: { update: { gender: 'w' }, supersedes: [] },
tangles: {
[moot.id]: tangle,
},
})
await p(bob.db.add)(msg, moot.id)
}
// Simulate Alice garbage collecting part of the dict
{
const fieldRoots = alice.dict._getFieldRoots('profile')
assert.deepEqual(fieldRoots.age, [rec4.id])
assert.deepEqual(fieldRoots.name, [rec5.id])
const tangle = await p(alice.db.getTangle)(alice.dict.getFeedID('profile'))
const { deletables, erasables } = tangle.getDeletablesAndErasables(rec4.id)
assert.equal(deletables.size, 2)
assert.equal(erasables.size, 2)
assert.ok(deletables.has(rec1.id))
assert.ok(deletables.has(rec2.id))
assert.ok(erasables.has(rec3.id))
assert.ok(erasables.has(moot.id))
for (const msgID of deletables) {
await p(alice.db.ghosts.add)({ msgID, tangleID: moot.id, span: SPAN })
await p(alice.db.del)(msgID)
}
for (const msgID of erasables) {
if (msgID === moot.id) continue
await p(alice.db.erase)(msgID)
}
}
// Assert situation at Alice before sync
assert.deepEqual(
await p(alice.dict.read)(aliceID, 'profile'),
{ age: 25, name: 'ALICE' },
'alice has age+name dict'
)
assert.deepEqual(alice.db.ghosts.get(moot.id), [rec1.id, rec2.id])
// Trigger sync
alice.goals.set(moot.id, 'dict')
bob.goals.set(moot.id, 'dict')
const remoteAlice = await p(bob.connect)(alice.getAddress())
assert('bob connected to alice')
bob.sync.start()
await p(setTimeout)(1000)
assert('sync!')
// Assert situation at Alice before sync: she got the branched off msg
assert.deepEqual(
await p(alice.dict.read)(aliceID, 'profile'),
{ age: 25, name: 'ALICE', gender: 'w' },
'alice has age+name+gender dict'
)
assert.deepEqual(alice.db.ghosts.get(moot.id), [rec2.id])
await p(remoteAlice.close)(true)
await p(alice.close)(true)
await p(bob.close)(true)
})

View File

@ -1,239 +1,89 @@
const test = require('node:test')
const assert = require('node:assert')
const p = require('node:util').promisify
const Keypair = require('pzp-keypair')
const test = require('tape')
const path = require('path')
const os = require('os')
const rimraf = require('rimraf')
const SecretStack = require('secret-stack')
const caps = require('ssb-caps')
const FeedV1 = require('ppppp-db/feed-v1')
const p = require('util').promisify
const Algorithm = require('../lib/algorithm')
const { createPeer } = require('./util')
const { generateKeypair } = require('./util')
const carolKeypair = Keypair.generate('ed25519', 'carol')
const bobKeypair2 = Keypair.generate('ed25519', 'bob2')
const createPeer = SecretStack({ appKey: caps.shs })
.use(require('ppppp-db'))
.use(require('ssb-box'))
.use(require('../lib'))
/**
 * Drain an async iterable into a plain array, preserving order.
 * @param {AsyncIterable<any>} iter - source async iterable (e.g. db.msgs())
 * @returns {Promise<Array<any>>} every yielded item, in yield order
 */
async function flatten(iter) {
  const collected = []
  for await (const item of iter) collected.push(item)
  return collected
}
// Bob publishes to a feed Alice has never seen (she doesn't even know bob's
// account), then both set goal 'all' on the feed and Alice should end up with
// every post after one sync round-trip.
test('sync a feed without pre-knowing the owner account', async (t) => {
  const alice = createPeer({ name: 'alice' })
  const bob = createPeer({ name: 'bob' })
  await alice.db.loaded()
  await bob.db.loaded()
  const bobID = await p(bob.db.account.create)({
    subdomain: 'account',
    _nonce: 'bob',
  })
  for (let i = 1; i <= 5; i++) {
    await p(bob.db.feed.publish)({
      account: bobID,
      domain: 'post',
      data: { text: 'm' + i },
    })
  }
  assert('bob published posts 1..5')
  const bobPostsID = bob.db.feed.getID(bobID, 'post')
  // Sanity check: Alice starts with nothing from bob
  {
    const arr = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(arr, [], 'alice has no posts from bob')
  }
  // Both peers want the full feed
  bob.goals.set(bobPostsID, 'all')
  alice.goals.set(bobPostsID, 'all')
  const remoteAlice = await p(bob.connect)(alice.getAddress())
  assert('bob connected to alice')
  bob.sync.start()
  await p(setTimeout)(1000) // give the sync protocol time to complete
  assert('sync!')
  {
    const arr = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(
      arr,
      ['m1', 'm2', 'm3', 'm4', 'm5'],
      'alice has posts 1..5 from bob'
    )
  }
  await p(remoteAlice.close)(true)
  await p(alice.close)(true)
  await p(bob.close)(true)
})
// After a first successful sync, Bob adds a second keypair to his account and
// publishes more posts signed with it; a second sync should deliver those too.
test('sync a feed with updated msgs from new account keypair', async (t) => {
  const alice = createPeer({ name: 'alice' })
  const bob = createPeer({ name: 'bob' })
  await alice.db.loaded()
  await bob.db.loaded()
  const bobID = await p(bob.db.account.create)({
    subdomain: 'account',
    _nonce: 'bob',
  })
  for (let i = 1; i <= 5; i++) {
    await p(bob.db.feed.publish)({
      account: bobID,
      domain: 'post',
      data: { text: 'm' + i },
    })
  }
  assert('bob published posts 1..5')
  const bobPostsID = bob.db.feed.getID(bobID, 'post')
  // Sanity check: Alice starts with nothing from bob
  {
    const arr = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(arr, [], 'alice has no posts from bob')
  }
  bob.goals.set(bobPostsID, 'all')
  alice.goals.set(bobPostsID, 'all')
  const remoteAlice = await p(bob.connect)(alice.getAddress())
  assert('bob connected to alice')
  bob.sync.start()
  await p(setTimeout)(1000) // give the first sync time to complete
  assert('sync!')
  {
    const arr = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(
      arr,
      ['m1', 'm2', 'm3', 'm4', 'm5'],
      'alice has posts 1..5 from bob'
    )
  }
  await p(remoteAlice.close)(true)
  // --------------------------------------------
  // Bob adds a new keypair and published with it
  // --------------------------------------------
  const consent = bob.db.account.consent({
    account: bobID,
    keypair: bobKeypair2,
  })
  await p(bob.db.account.add)({
    account: bobID,
    keypair: bobKeypair2,
    consent,
    powers: [],
  })
  for (let i = 6; i <= 7; i++) {
    await p(bob.db.feed.publish)({
      account: bobID,
      keypair: bobKeypair2,
      domain: 'post',
      data: { text: 'm' + i },
    })
  }
  assert('bob with new keypair published posts 6..7')
  // Second sync session: the new-keypair msgs should replicate too
  const remoteAlice2 = await p(bob.connect)(alice.getAddress())
  assert('bob connected to alice')
  bob.sync.start()
  await p(setTimeout)(1000)
  assert('sync!')
  {
    const arr = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(
      arr,
      ['m1', 'm2', 'm3', 'm4', 'm5', 'm6', 'm7'],
      'alice has posts 1..7 from bob'
    )
  }
  await p(remoteAlice2.close)(true)
  await p(alice.close)(true)
  await p(bob.close)(true)
})
const ALICE_DIR = path.join(os.tmpdir(), 'dagsync-alice')
const BOB_DIR = path.join(os.tmpdir(), 'dagsync-bob')
const aliceKeys = generateKeypair('alice')
const bobKeys = generateKeypair('bob')
test('sync a feed with goal=all', async (t) => {
const alice = createPeer({ name: 'alice' })
const bob = createPeer({ name: 'bob' })
rimraf.sync(ALICE_DIR)
rimraf.sync(BOB_DIR)
const alice = createPeer({
keys: aliceKeys,
path: ALICE_DIR,
})
const bob = createPeer({
keys: bobKeys,
path: BOB_DIR,
})
await alice.db.loaded()
await bob.db.loaded()
const carolID = await p(alice.db.account.create)({
keypair: carolKeypair,
subdomain: 'account',
_nonce: 'carol',
})
const carolAccountRoot = await p(alice.db.get)(carolID)
// Bob knows Carol
await p(bob.db.add)(carolAccountRoot, carolID)
const carolKeys = generateKeypair('carol')
const carolMsgs = []
const carolID = carolKeys.id
const carolID_b58 = FeedV1.stripAuthor(carolID)
for (let i = 1; i <= 10; i++) {
const rec = await p(alice.db.feed.publish)({
account: carolID,
domain: 'post',
data: { text: 'm' + i },
keypair: carolKeypair,
const rec = await p(alice.db.create)({
type: 'post',
content: { text: 'm' + i },
keys: carolKeys,
})
carolMsgs.push(rec.msg)
}
assert('alice has msgs 1..10 from carol')
t.pass('alice has msgs 1..10 from carol')
const carolPostsMootID = alice.db.feed.getID(carolID, 'post')
const carolPostsMoot = await p(alice.db.get)(carolPostsMootID)
const carolRootHash = alice.db.getFeedRoot(carolID, 'post')
const carolRootMsg = alice.db.get(carolRootHash)
await p(bob.db.add)(carolPostsMoot, carolPostsMootID)
await p(bob.db.add)(carolRootMsg, carolRootHash)
for (let i = 0; i < 7; i++) {
await p(bob.db.add)(carolMsgs[i], carolPostsMootID)
await p(bob.db.add)(carolMsgs[i], carolRootHash)
}
{
const arr = (await flatten(bob.db.msgs()))
.filter((msg) => msg.metadata.account === carolID && msg.data)
.map((msg) => msg.data.text)
assert.deepEqual(
const arr = [...bob.db.msgs()]
.filter((msg) => msg.metadata.who === carolID_b58 && msg.content)
.map((msg) => msg.content.text)
t.deepEquals(
arr,
['m1', 'm2', 'm3', 'm4', 'm5', 'm6', 'm7'],
'bob has msgs 1..7 from carol'
)
}
bob.goals.set(carolPostsMootID, 'all')
alice.goals.set(carolPostsMootID, 'all')
bob.tangleSync.setGoal(carolRootHash, 'all')
alice.tangleSync.setGoal(carolRootHash, 'all')
const remoteAlice = await p(bob.connect)(alice.getAddress())
assert('bob connected to alice')
t.pass('bob connected to alice')
bob.sync.start()
bob.tangleSync.initiate()
await p(setTimeout)(1000)
assert('sync!')
t.pass('tangleSync!')
{
const arr = (await flatten(bob.db.msgs()))
.filter((msg) => msg.metadata.account === carolID && msg.data)
.map((msg) => msg.data.text)
assert.deepEqual(
const arr = [...bob.db.msgs()]
.filter((msg) => msg.metadata.who === carolID_b58 && msg.content)
.map((msg) => msg.content.text)
t.deepEquals(
arr,
['m1', 'm2', 'm3', 'm4', 'm5', 'm6', 'm7', 'm8', 'm9', 'm10'],
'bob has msgs 1..10 from carol'
@ -246,68 +96,70 @@ test('sync a feed with goal=all', async (t) => {
})
test('sync a feed with goal=newest', async (t) => {
const alice = createPeer({ name: 'alice' })
const bob = createPeer({ name: 'bob' })
rimraf.sync(ALICE_DIR)
rimraf.sync(BOB_DIR)
const alice = createPeer({
keys: aliceKeys,
path: ALICE_DIR,
})
const bob = createPeer({
keys: bobKeys,
path: BOB_DIR,
})
await alice.db.loaded()
await bob.db.loaded()
const carolID = await p(alice.db.account.create)({
keypair: carolKeypair,
subdomain: 'account',
_nonce: 'carol',
})
const carolAccountRoot = await p(alice.db.get)(carolID)
// Bob knows Carol
await p(bob.db.add)(carolAccountRoot, carolID)
const carolKeys = generateKeypair('carol')
const carolMsgs = []
const carolID = carolKeys.id
const carolID_b58 = FeedV1.stripAuthor(carolID)
for (let i = 1; i <= 10; i++) {
const rec = await p(alice.db.feed.publish)({
account: carolID,
domain: 'post',
data: { text: 'm' + i },
keypair: carolKeypair,
const rec = await p(alice.db.create)({
type: 'post',
content: { text: 'm' + i },
keys: carolKeys,
})
carolMsgs.push(rec.msg)
}
assert('alice has msgs 1..10 from carol')
t.pass('alice has msgs 1..10 from carol')
const carolPostsMootID = alice.db.feed.getID(carolID, 'post')
const carolPostsMoot = await p(alice.db.get)(carolPostsMootID)
const carolRootHash = alice.db.getFeedRoot(carolID, 'post')
const carolRootMsg = alice.db.get(carolRootHash)
await p(bob.db.add)(carolPostsMoot, carolPostsMootID)
await p(bob.db.add)(carolRootMsg, carolRootHash)
for (let i = 0; i < 7; i++) {
await p(bob.db.add)(carolMsgs[i], carolPostsMootID)
await p(bob.db.add)(carolMsgs[i], carolRootHash)
}
{
const arr = (await flatten(bob.db.msgs()))
.filter((msg) => msg.metadata.account === carolID && msg.data)
.map((msg) => msg.data.text)
assert.deepEqual(
const arr = [...bob.db.msgs()]
.filter((msg) => msg.metadata.who === carolID_b58 && msg.content)
.map((msg) => msg.content.text)
t.deepEquals(
arr,
['m1', 'm2', 'm3', 'm4', 'm5', 'm6', 'm7'],
'bob has msgs 1..7 from carol'
)
}
bob.goals.set(carolPostsMootID, 'newest-5')
alice.goals.set(carolPostsMootID, 'all')
bob.tangleSync.setGoal(carolRootHash, 'newest-5')
alice.tangleSync.setGoal(carolRootHash, 'all')
const remoteAlice = await p(bob.connect)(alice.getAddress())
assert('bob connected to alice')
t.pass('bob connected to alice')
bob.sync.start()
bob.tangleSync.initiate()
await p(setTimeout)(1000)
assert('sync!')
t.pass('tangleSync!')
{
const arr = (await flatten(bob.db.msgs()))
.filter((msg) => msg.metadata.account === carolID && msg.data)
.map((msg) => msg.data.text)
assert.deepEqual(
const arr = [...bob.db.msgs()]
.filter((msg) => msg.metadata.who === carolID_b58 && msg.content)
.map((msg) => msg.content.text)
t.deepEquals(
arr,
['m6', 'm7', 'm8', 'm9', 'm10'],
'bob has msgs 6..10 from carol'
@ -320,76 +172,78 @@ test('sync a feed with goal=newest', async (t) => {
})
test('sync a feed with goal=newest but too far behind', async (t) => {
const alice = createPeer({ name: 'alice' })
const bob = createPeer({ name: 'bob' })
rimraf.sync(ALICE_DIR)
rimraf.sync(BOB_DIR)
const alice = createPeer({
keys: aliceKeys,
path: ALICE_DIR,
})
const bob = createPeer({
keys: bobKeys,
path: BOB_DIR,
})
await alice.db.loaded()
await bob.db.loaded()
const carolID = await p(alice.db.account.create)({
keypair: carolKeypair,
subdomain: 'account',
_nonce: 'carol',
})
const carolIDMsg = await p(alice.db.get)(carolID)
// Bob knows Carol
await p(bob.db.add)(carolIDMsg, carolID)
const carolKeys = generateKeypair('carol')
const carolMsgs = []
const carolID = carolKeys.id
const carolID_b58 = FeedV1.stripAuthor(carolID)
for (let i = 1; i <= 10; i++) {
const rec = await p(alice.db.feed.publish)({
account: carolID,
domain: 'post',
data: { text: 'm' + i },
keypair: carolKeypair,
const rec = await p(alice.db.create)({
type: 'post',
content: { text: 'm' + i },
keys: carolKeys,
})
carolMsgs.push(rec.msg)
}
const carolPostsMootID = alice.db.feed.getID(carolID, 'post')
const carolPostsMoot = await p(alice.db.get)(carolPostsMootID)
const carolRootHash = alice.db.getFeedRoot(carolID, 'post')
const carolRootMsg = alice.db.get(carolRootHash)
const algo = new Algorithm(alice)
await algo.pruneNewest(carolPostsMootID, 5)
await algo.pruneNewest(carolRootHash, 5)
{
const arr = (await flatten(alice.db.msgs()))
.filter((msg) => msg.metadata.account === carolID && msg.data)
.map((msg) => msg.data.text)
assert.deepEqual(
const arr = [...alice.db.msgs()]
.filter((msg) => msg.metadata.who === carolID_b58 && msg.content)
.map((msg) => msg.content.text)
t.deepEquals(
arr,
['m6', 'm7', 'm8', 'm9', 'm10'],
'alice has msgs 6..10 from carol'
)
}
await p(bob.db.add)(carolPostsMoot, carolPostsMootID)
await p(bob.db.add)(carolRootMsg, carolRootHash)
for (let i = 0; i < 2; i++) {
await p(bob.db.add)(carolMsgs[i], carolPostsMootID)
await p(bob.db.add)(carolMsgs[i], carolRootHash)
}
{
const arr = (await flatten(bob.db.msgs()))
.filter((msg) => msg.metadata.account === carolID && msg.data)
.map((msg) => msg.data.text)
assert.deepEqual(arr, ['m1', 'm2'], 'bob has msgs 1..2 from carol')
const arr = [...bob.db.msgs()]
.filter((msg) => msg.metadata.who === carolID_b58 && msg.content)
.map((msg) => msg.content.text)
t.deepEquals(arr, ['m1', 'm2'], 'bob has msgs 1..2 from carol')
}
alice.goals.set(carolPostsMootID, 'newest-5')
bob.goals.set(carolPostsMootID, 'newest-8')
alice.tangleSync.setFeedGoal(carolID, 'post', 'newest-5')
bob.tangleSync.setFeedGoal(carolID, 'post', 'newest-5')
const remoteAlice = await p(bob.connect)(alice.getAddress())
assert('bob connected to alice')
t.pass('bob connected to alice')
bob.sync.start()
bob.tangleSync.initiate()
await p(setTimeout)(1000)
assert('sync!')
t.pass('tangleSync!')
{
const arr = (await flatten(bob.db.msgs()))
.filter((msg) => msg.metadata.account === carolID && msg.data)
.map((msg) => msg.data.text)
assert.deepEqual(
const arr = [...bob.db.msgs()]
.filter((msg) => msg.metadata.who === carolID_b58 && msg.content)
.map((msg) => msg.content.text)
t.deepEquals(
arr,
['m6', 'm7', 'm8', 'm9', 'm10'],
'bob has msgs 6..10 from carol'
@ -400,122 +254,3 @@ test('sync a feed with goal=newest but too far behind', async (t) => {
await p(alice.close)(true)
await p(bob.close)(true)
})
// Bob replicates a small "newest" part of Carol's feed, then
// Alice replicates what Bob has, even though she wants more.
// Finally, Alice replicates from Carol the whole feed.
test('sync small newest slice of a feed, then the whole feed', async (t) => {
  const alice = createPeer({ name: 'alice' })
  const bob = createPeer({ name: 'bob' })
  const carol = createPeer({ name: 'carol' })
  await alice.db.loaded()
  await bob.db.loaded()
  await carol.db.loaded()
  const carolID = await p(carol.db.account.create)({
    subdomain: 'account',
    _nonce: 'carol',
  })
  const carolIDMsg = await p(carol.db.get)(carolID)
  // Alice and Bob know Carol
  await p(alice.db.add)(carolIDMsg, carolID)
  await p(bob.db.add)(carolIDMsg, carolID)
  const carolPosts = []
  for (let i = 1; i <= 9; i++) {
    const rec = await p(carol.db.feed.publish)({
      account: carolID,
      domain: 'post',
      data: { text: 'm' + i },
    })
    carolPosts.push(rec.msg)
  }
  const carolPostsMootID = carol.db.feed.getID(carolID, 'post')
  // FIX: removed unused `carolPostsMoot` local (the moot msg was fetched via
  // `carol.db.get` but never referenced afterwards).
  // Sanity checks: neither Alice nor Bob start with anything from Carol
  {
    const arr = (await flatten(bob.db.msgs()))
      .filter((msg) => msg.metadata.account === carolID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(arr, [], 'bob has nothing from carol')
  }
  {
    const arr = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === carolID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(arr, [], 'alice has nothing from carol')
  }
  // Alice wants everything, Bob only the newest 4, Carol serves everything
  alice.goals.set(carolPostsMootID, 'all')
  bob.goals.set(carolPostsMootID, 'newest-4')
  carol.goals.set(carolPostsMootID, 'all')
  // Phase 1: Bob syncs his small slice directly from Carol
  const bobDialingCarol = await p(bob.connect)(carol.getAddress())
  assert('bob connected to carol')
  bob.sync.start()
  await p(setTimeout)(1000)
  assert('sync!')
  {
    const arr = (await flatten(bob.db.msgs()))
      .filter((msg) => msg.metadata.account === carolID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(
      arr,
      ['m6', 'm7', 'm8', 'm9'],
      'bob has msgs 6..9 from carol'
    )
  }
  await p(bobDialingCarol.close)(true)
  // Phase 2: Alice syncs from Bob, getting only what Bob has
  const aliceDialingBob = await p(alice.connect)(bob.getAddress())
  assert('alice connected to bob')
  alice.sync.start()
  await p(setTimeout)(1000)
  assert('sync!')
  {
    const arr = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === carolID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(
      arr,
      ['m6', 'm7', 'm8', 'm9'],
      'alice has msgs 6..9 from carol'
    )
  }
  await p(aliceDialingBob.close)(true)
  // Phase 3: Alice syncs from Carol and fills in the rest of the feed
  const aliceDialingCarol = await p(alice.connect)(carol.getAddress())
  assert('alice connected to carol') // FIX: message previously said "alice"
  alice.sync.start()
  await p(setTimeout)(2000)
  assert('sync!')
  {
    const arr = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === carolID && msg.data)
      .map((msg) => msg.data.text)
      .sort()
    assert.deepEqual(
      arr,
      ['m1', 'm2', 'm3', 'm4', 'm5', 'm6', 'm7', 'm8', 'm9'],
      'alice has msgs 1..9 from carol'
    )
  }
  await p(aliceDialingCarol.close)(true)
  await p(alice.close)(true)
  await p(bob.close)(true)
  await p(carol.close)(true)
})

View File

@ -1,382 +0,0 @@
const test = require('node:test')
const assert = require('node:assert')
const p = require('node:util').promisify
const { createPeer } = require('./util')
/**
 * Drain an async iterable into a plain array, preserving order.
 * @param {AsyncIterable<any>} iter - source async iterable (e.g. db.msgs())
 * @returns {Promise<Array<any>>} every yielded item, in yield order
 */
async function flatten(iter) {
  const collected = []
  for await (const item of iter) collected.push(item)
  return collected
}
// After the initial 9-round sync completes, newly-published msgs should flow
// to Alice in realtime (without starting a new sync session).
test('sync feed msgs in realtime after the 9 rounds', async (t) => {
  const alice = createPeer({ name: 'alice' })
  const bob = createPeer({ name: 'bob' })
  await alice.db.loaded()
  await bob.db.loaded()
  const bobID = await p(bob.db.account.create)({
    subdomain: 'account',
    _nonce: 'bob',
  })
  await p(bob.db.feed.publish)({
    account: bobID,
    domain: 'post',
    data: { text: 'm0' },
  })
  assert('bob published post 0')
  const bobPostsID = bob.db.feed.getID(bobID, 'post')
  // Sanity check: Alice starts with nothing from bob
  {
    const arr = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(arr, [], 'alice has no posts from bob')
  }
  bob.goals.set(bobPostsID, 'all')
  alice.goals.set(bobPostsID, 'all')
  const remoteAlice = await p(bob.connect)(alice.getAddress())
  assert('bob connected to alice')
  bob.sync.start()
  await p(setTimeout)(1000) // let the initial sync rounds finish
  assert('sync!')
  {
    const arr = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(arr, ['m0'], 'alice has post 0 from bob')
  }
  // Publish two more posts while the connection is live
  await p(bob.db.feed.publish)({
    account: bobID,
    domain: 'post',
    data: { text: 'm1' },
  })
  assert('bob published post 1')
  await p(bob.db.feed.publish)({
    account: bobID,
    domain: 'post',
    data: { text: 'm2' },
  })
  assert('bob published post 2')
  {
    let arr
    // Poll (up to ~20s) until both realtime msgs have arrived at Alice
    for (let i = 0; i < 100; i++) {
      arr = (await flatten(alice.db.msgs()))
        .filter((msg) => msg.metadata.account === bobID && msg.data)
        .map((msg) => msg.data.text)
      if (arr.length < 3) {
        await p(setTimeout)(200)
        continue
      }
      break // FIX: stop polling once all 3 msgs arrived (loop previously
      //      kept re-scanning the db for all 100 iterations)
    }
    assert.deepEqual(arr, ['m0', 'm1', 'm2'], 'alice has posts 0..2 from bob')
  }
  await p(remoteAlice.close)(true)
  await p(alice.close)(true)
  await p(bob.close)(true)
})
// Same as the previous realtime test, but Alice (the receiver) is the peer
// that initiates the sync, exercising the reverse "who starts" direction.
test('sync feed msgs in realtime after the 9 rounds, reverse', async (t) => {
  const alice = createPeer({ name: 'alice' })
  const bob = createPeer({ name: 'bob' })
  await alice.db.loaded()
  await bob.db.loaded()
  const bobID = await p(bob.db.account.create)({
    subdomain: 'account',
    _nonce: 'bob',
  })
  await p(bob.db.feed.publish)({
    account: bobID,
    domain: 'post',
    data: { text: 'm0' },
  })
  assert('bob published post 0')
  const bobPostsID = bob.db.feed.getID(bobID, 'post')
  // Sanity check: Alice starts with nothing from bob
  {
    const arr = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(arr, [], 'alice has no posts from bob')
  }
  bob.goals.set(bobPostsID, 'all')
  alice.goals.set(bobPostsID, 'all')
  const remoteBob = await p(alice.connect)(bob.getAddress())
  assert('bob connected to alice')
  // Reverse direction of who "starts"
  alice.sync.start()
  await p(setTimeout)(1000) // let the initial sync rounds finish
  assert('sync!')
  {
    const arr = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(arr, ['m0'], 'alice has post 0 from bob')
  }
  // Publish two more posts while the connection is live
  await p(bob.db.feed.publish)({
    account: bobID,
    domain: 'post',
    data: { text: 'm1' },
  })
  assert('bob published post 1')
  await p(bob.db.feed.publish)({
    account: bobID,
    domain: 'post',
    data: { text: 'm2' },
  })
  assert('bob published post 2')
  {
    let arr
    // Poll (up to ~20s) until both realtime msgs have arrived at Alice
    for (let i = 0; i < 100; i++) {
      arr = (await flatten(alice.db.msgs()))
        .filter((msg) => msg.metadata.account === bobID && msg.data)
        .map((msg) => msg.data.text)
      if (arr.length < 3) {
        await p(setTimeout)(200)
        continue
      }
      break // FIX: stop polling once all 3 msgs arrived (loop previously
      //      kept re-scanning the db for all 100 iterations)
    }
    assert.deepEqual(arr, ['m0', 'm1', 'm2'], 'alice has posts 0..2 from bob')
  }
  await p(remoteBob.close)(true)
  await p(alice.close)(true)
  await p(bob.close)(true)
})
// Stress test: 100 msgs published concurrently must all replicate in realtime.
// This variant publishes one post BEFORE the realtime session starts, so the
// feed moot and first post are transferred by the normal sync rounds.
test('create 100 messages in parallel that still manage to sync realtime (create an initial post before starting realtime too)', async (t) => {
  const alice = createPeer({ name: 'alice' })
  const bob = createPeer({ name: 'bob' })
  await alice.db.loaded()
  await bob.db.loaded()
  const bobID = await p(bob.db.account.create)({
    subdomain: 'account',
    _nonce: 'bob',
  })
  await p(bob.db.feed.publish)({
    account: bobID,
    domain: 'post',
    data: { text: 'm0' },
  })
  assert('bob published post 0')
  const bobPostsID = bob.db.feed.getID(bobID, 'post')
  // Sanity check: Alice starts with nothing from bob
  {
    const arr = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(arr, [], 'alice has no posts from bob')
  }
  bob.goals.set(bobPostsID, 'all')
  alice.goals.set(bobPostsID, 'all')
  const remoteAlice = await p(bob.connect)(alice.getAddress())
  assert('alice and bob connected')
  bob.sync.start()
  await p(setTimeout)(1000) // let the initial sync rounds finish
  assert('sync!')
  const n = 100
  const hundred = []
  for (let i = 0; i < n; i++) {
    hundred.push(i)
  }
  // Publish all n posts concurrently to stress realtime replication
  await Promise.all(hundred.map(i => p(bob.db.feed.publish)({
    account: bobID,
    domain: 'post',
    data: { text: `post nr ${i}` },
  })))
  assert('bob published 100 posts in parallel')
  const bobMsgs = await flatten(bob.db.msgs())
  // 1 for creating bob's account, 1 for the 'post' moot, and 1 for first post
  assert.equal(bobMsgs.length, n + 3, "bob has all of his own messages")
  let arr
  // just waiting for them to arrive (up to ~10s)
  for (let i = 0; i < 100; i++) {
    arr = (await flatten(alice.db.msgs()))
      // moot doesn't have msg.data
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .filter(msg => msg.metadata.domain === 'post')
      .map((msg) => msg.data.text)
    if (arr.length < n) {
      await p(setTimeout)(100)
    } else {
      break
    }
  }
  // n parallel posts + the initial 'm0' post
  assert.equal(arr.length, n + 1, `alice has ${arr.length} posts from bob`)
  await p(remoteAlice.close)(true)
  await p(alice.close)(true)
  await p(bob.close)(true)
})
// Same stress test as above, but with NO post before the realtime session:
// the feed moot itself must also be discovered during realtime replication.
test('create 100 messages in parallel that still manage to sync realtime (without creating an initial post before starting realtime)', async (t) => {
  const alice = createPeer({ name: 'alice' })
  const bob = createPeer({ name: 'bob' })
  await alice.db.loaded()
  await bob.db.loaded()
  const bobID = await p(bob.db.account.create)({
    subdomain: 'account',
    _nonce: 'bob',
  })
  const bobPostsID = bob.db.feed.getID(bobID, 'post')
  // Sanity check: Alice starts with nothing from bob
  {
    const arr = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(arr, [], 'alice has no posts from bob')
  }
  bob.goals.set(bobPostsID, 'all')
  alice.goals.set(bobPostsID, 'all')
  const remoteAlice = await p(bob.connect)(alice.getAddress())
  assert('alice and bob connected')
  bob.sync.start()
  await p(setTimeout)(1000) // let the initial sync rounds finish
  assert('sync!')
  const n = 100
  const hundred = []
  for (let i = 0; i < n; i++) {
    hundred.push(i)
  }
  // Publish all n posts concurrently to stress realtime replication
  await Promise.all(hundred.map(i => p(bob.db.feed.publish)({
    account: bobID,
    domain: 'post',
    data: { text: `post nr ${i}` },
  })))
  assert('bob published 100 posts in parallel')
  const bobMsgs = await flatten(bob.db.msgs())
  // 1 for creating bob's account, and 1 for the 'post' moot
  assert.equal(bobMsgs.length, n + 2, "bob has all of his own messages")
  let arr
  // just waiting for them to arrive (up to ~10s)
  for (let i = 0; i < 100; i++) {
    arr = (await flatten(alice.db.msgs()))
      // moot doesn't have msg.data
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .filter(msg => msg.metadata.domain === 'post')
      .map((msg) => msg.data.text)
    if (arr.length < n) {
      await p(setTimeout)(100)
    } else {
      break
    }
  }
  assert.equal(arr.length, n, `alice has ${arr.length} posts from bob`)
  await p(remoteAlice.close)(true)
  await p(alice.close)(true)
  await p(bob.close)(true)
})
// 200 msgs published concurrently while both peers only hold 'newest-50'
// goals: every msg must still be OBSERVED by Alice (tracked via the
// onRecordAdded listener) even though neither peer retains all of them.
test('create 200 messages that manage to replicate with low "newest" goals', async (t) => {
  const n = 200
  const alice = createPeer({ name: 'alice' })
  const bob = createPeer({ name: 'bob' })
  await alice.db.loaded()
  await bob.db.loaded()
  const bobID = await p(bob.db.account.create)({
    subdomain: 'account',
    _nonce: 'bob',
  })
  const bobPostsID = bob.db.feed.getID(bobID, 'post')
  // Sanity check: Alice starts with nothing from bob
  {
    const arr = (await flatten(alice.db.msgs()))
      .filter((msg) => msg.metadata.account === bobID && msg.data)
      .map((msg) => msg.data.text)
    assert.deepEqual(arr, [], 'alice has no posts from bob')
  }
  const confirmed = []
  // for keeping track of which msgs have arrived
  for (let i = 0; i < n; i++) {
    confirmed.push(false)
  }
  // Each post's text is its index, so arrival marks the matching slot
  alice.db.onRecordAdded(rec => {
    if (rec.msg.data?.text) {
      const num = Number.parseInt(rec.msg.data.text)
      confirmed[num] = true
    }
  })
  bob.goals.set(bobPostsID, 'newest-50')
  alice.goals.set(bobPostsID, 'newest-50')
  const remoteAlice = await p(bob.connect)(alice.getAddress())
  assert('alice and bob connected')
  bob.sync.start()
  await p(setTimeout)(1000) // let the initial sync rounds finish
  assert('sync!')
  const hundred = []
  for (let i = 0; i < n; i++) {
    hundred.push(i)
  }
  // NOTE(review): deliberately NOT awaited, so replication races with
  // publishing — but a rejection here would go unhandled. TODO confirm
  // whether a .catch should be attached.
  Promise.all(hundred.map(i => p(bob.db.feed.publish)({
    account: bobID,
    domain: 'post',
    data: { text: `${i}` },
  })))
  assert(`bob published ${n} posts in parallel`)
  let tries = 30
  // just waiting for them to arrive (up to ~3s after the last check)
  do {
    await p(setTimeout)(100)
  } while (!confirmed.every(v => v === true) && tries-- > 0)
  assert.equal(confirmed.filter(v => v === true).length, n, `alice has all of bob's posts`)
  await p(remoteAlice.close)(true)
  await p(alice.close)(true)
  await p(bob.close)(true)
})

View File

@ -1,218 +0,0 @@
const test = require('node:test')
const assert = require('node:assert')
const p = require('node:util').promisify
const Keypair = require('pzp-keypair')
const MsgV4 = require('pzp-db/msg-v4')
const { createPeer } = require('./util')
const aliceKeypair = Keypair.generate('ed25519', 'alice')
/**
 * Extract set-operation markers from a list of msgs belonging to the given
 * set subdomain: additions as the item itself, deletions prefixed with '-'.
 * @param {string} subdomain - set subdomain (matched as `set_v1__<subdomain>`)
 * @param {Array<any>} arr - msgs with `metadata.domain` and optional `data`
 * @returns {Array<string>} e.g. ['Alice', '-Bob'] in msg order
 */
function getItems(subdomain, arr) {
  const wantedDomain = `set_v1__${subdomain}`
  const items = []
  for (const msg of arr) {
    if (msg.metadata.domain !== wantedDomain) continue
    const data = msg.data
    if (!data) continue // erased/moot msgs carry no data
    items.push(data.add?.[0] ?? '-' + data.del?.[0])
  }
  return items
}
// Alice builds a CRDT set ('names') and Bob, starting empty, replicates it
// using the 'set' sync goal.
test('sync goal=set from scratch', async (t) => {
  const SPAN = 5 // ghost span for alice's set plugin
  const alice = createPeer({
    name: 'alice',
    global: {
      keypair: aliceKeypair,
    },
    set: { ghostSpan: SPAN },
  })
  const bob = createPeer({ name: 'bob' })
  await alice.db.loaded()
  await bob.db.loaded()
  // Alice sets up an account and a set
  const aliceID = await p(alice.db.account.create)({
    subdomain: 'account',
    _nonce: 'alice',
  })
  await p(alice.set.load)(aliceID)
  const aliceAccountRoot = await p(alice.db.get)(aliceID)
  // Bob knows Alice
  const bobID = await p(bob.db.account.create)({
    subdomain: 'account',
    _nonce: 'bob',
  })
  await p(bob.set.load)(bobID)
  await p(bob.db.add)(aliceAccountRoot, aliceID)
  // Alice constructs a set
  await p(alice.set.add)('names', 'Alice')
  await p(alice.set.add)('names', 'Bob')
  const mootID = alice.set.getFeedID('names')
  // Assert situation at Alice before sync
  assert.deepEqual(
    await p(alice.set.values)('names', aliceID),
    ['Alice', 'Bob'],
    'alice has Alice+Bob set'
  )
  // Assert situation at Bob before sync
  assert.deepEqual(await p(bob.set.values)('names', aliceID), [], 'bob has empty set')
  // Trigger sync
  alice.goals.set(mootID, 'set')
  bob.goals.set(mootID, 'set')
  const remoteAlice = await p(bob.connect)(alice.getAddress())
  assert('bob connected to alice')
  bob.sync.start()
  await p(setTimeout)(2000) // give the sync protocol time to complete
  assert('sync!')
  // Assert situation at Bob after sync
  assert.deepEqual(
    await p(bob.set.values)('names', aliceID),
    ['Alice', 'Bob'],
    'bob has Alice+Bob set'
  )
  await p(remoteAlice.close)(true)
  await p(alice.close)(true)
  await p(bob.close)(true)
})
//
//   R-?-?-?-?-o-o
//        \
//         o
//
// where "o" is a set update and "?" is a ghost
//
// Alice garbage-collects (deletes/erases) part of her set tangle, keeping
// ghosts for the deleted msgs. Bob holds the full tangle plus a branched-off
// msg Alice never saw. After a 'set' sync, Alice must receive the branch and
// her ghost window must advance.
test('sync goal=set with ghostSpan=5', async (t) => {
  const SPAN = 5 // ghost span: how many deleted msgs are remembered as ghosts
  const alice = createPeer({
    name: 'alice',
    global: {
      keypair: aliceKeypair,
    },
    set: { ghostSpan: SPAN },
  })
  const bob = createPeer({ name: 'bob' })
  await alice.db.loaded()
  await bob.db.loaded()
  // Alice sets up an account and a set
  const aliceID = await p(alice.db.account.create)({
    subdomain: 'account',
    _nonce: 'alice',
  })
  await p(alice.set.load)(aliceID)
  const aliceAccountRoot = await p(alice.db.get)(aliceID)
  // Bob knows Alice
  await p(bob.db.add)(aliceAccountRoot, aliceID)
  // Alice constructs a set (6 updates: 2 adds, 2 dels, 2 adds)
  await p(alice.set.add)('follows', 'alice')
  await p(alice.set.add)('follows', 'bob')
  await p(alice.set.del)('follows', 'alice')
  await p(alice.set.del)('follows', 'bob')
  await p(alice.set.add)('follows', 'Alice')
  await p(alice.set.add)('follows', 'Bob')
  let moot
  let rec1
  let rec2
  let rec3
  let rec4
  let rec5
  let rec6
  // Identify each update record by its payload (the moot has dataSize 0)
  for await (const rec of alice.db.records()) {
    if (rec.msg.metadata.dataSize === 0) moot = rec
    if (rec.msg.data?.add?.[0] === 'alice') rec1 = rec
    if (rec.msg.data?.add?.[0] === 'bob') rec2 = rec
    if (rec.msg.data?.del?.[0] === 'alice') rec3 = rec
    if (rec.msg.data?.del?.[0] === 'bob') rec4 = rec
    if (rec.msg.data?.add?.[0] === 'Alice') rec5 = rec
    if (rec.msg.data?.add?.[0] === 'Bob') rec6 = rec
  }
  // Bob knows the whole set
  await p(bob.db.add)(moot.msg, moot.id)
  await p(bob.db.add)(rec1.msg, moot.id)
  await p(bob.db.add)(rec2.msg, moot.id)
  await p(bob.db.add)(rec3.msg, moot.id)
  await p(bob.db.add)(rec4.msg, moot.id)
  await p(bob.db.add)(rec5.msg, moot.id)
  await p(bob.db.add)(rec6.msg, moot.id)
  // Bob knows a branched off msg that Alice doesn't know
  {
    // The branch forks right after rec1, deep inside Alice's GC'd region
    const tangle = new MsgV4.Tangle(moot.id)
    tangle.add(moot.id, moot.msg)
    tangle.add(rec1.id, rec1.msg)
    const msg = MsgV4.create({
      keypair: aliceKeypair,
      domain: 'set_v1__follows',
      account: aliceID,
      accountTips: [aliceID],
      data: { add: ['Carol'], del: [], supersedes: [] },
      tangles: {
        [moot.id]: tangle,
      },
    })
    await p(bob.db.add)(msg, moot.id)
  }
  // Simulate Alice garbage collecting part of the set
  {
    const itemRoots = alice.set._getItemRoots('follows')
    assert.deepEqual(itemRoots, { Alice: [rec5.id], Bob: [rec6.id] })
    const tangle = await p(alice.db.getTangle)(alice.set.getFeedID('follows'))
    const { deletables, erasables } = tangle.getDeletablesAndErasables(rec5.id)
    assert.equal(deletables.size, 2)
    assert.equal(erasables.size, 3)
    assert.ok(deletables.has(rec1.id))
    assert.ok(deletables.has(rec2.id))
    assert.ok(erasables.has(moot.id))
    assert.ok(erasables.has(rec3.id))
    assert.ok(erasables.has(rec4.id))
    // Deleted msgs become ghosts (the "?" in the diagram above)
    for (const msgID of deletables) {
      await p(alice.db.ghosts.add)({ msgID, tangleID: moot.id, span: SPAN })
      await p(alice.db.del)(msgID)
    }
    // Erased msgs keep metadata but lose data; the moot is kept intact
    for (const msgID of erasables) {
      if (msgID === moot.id) continue
      await p(alice.db.erase)(msgID)
    }
  }
  // Assert situation at Alice before sync
  assert.deepEqual(
    await p(alice.set.values)('follows', aliceID),
    ['Alice', 'Bob'],
    'alice has Alice+Bob set'
  )
  assert.deepEqual(alice.db.ghosts.get(moot.id), [rec1.id, rec2.id])
  // Trigger sync
  alice.goals.set(moot.id, 'set')
  bob.goals.set(moot.id, 'set')
  const remoteAlice = await p(bob.connect)(alice.getAddress())
  assert('bob connected to alice')
  bob.sync.start()
  await p(setTimeout)(2000) // give the sync protocol time to complete
  assert('sync!')
  // Assert situation at Alice after sync: she got the branched off msg
  assert.deepEqual(
    await p(alice.set.values)('follows', aliceID),
    ['Carol', 'Alice', 'Bob'],
    'alice has Alice+Bob+Carol set'
  )
  // rec1 was de-ghosted by the incoming branch; rec2 remains a ghost
  assert.deepEqual(alice.db.ghosts.get(moot.id), [rec2.id])
  await p(remoteAlice.close)(true)
  await p(alice.close)(true)
  await p(bob.close)(true)
})

View File

@ -1,18 +1,24 @@
const test = require('node:test')
const assert = require('node:assert')
const test = require('tape')
const path = require('path')
const os = require('os')
const rimraf = require('rimraf')
const SecretStack = require('secret-stack')
const caps = require('ssb-caps')
const p = require('util').promisify
const Keypair = require('pzp-keypair')
const { createPeer } = require('./util')
const { generateKeypair } = require('./util')
const carolKeypair = Keypair.generate('ed25519', 'carol')
const daveKeypair = Keypair.generate('ed25519', 'dave')
const createSSB = SecretStack({ appKey: caps.shs })
.use(require('ppppp-db'))
.use(require('ssb-box'))
.use(require('../lib'))
async function getTexts(iter) {
const ary = []
for await (i of iter) {
ary.push(i)
}
return ary.filter((msg) => msg.data?.text).map((msg) => msg.data.text)
const ALICE_DIR = path.join(os.tmpdir(), 'dagsync-alice')
const BOB_DIR = path.join(os.tmpdir(), 'dagsync-bob')
const aliceKeys = generateKeypair('alice')
const bobKeys = generateKeypair('bob')
function getTexts(iter) {
return [...iter].filter((msg) => msg.content).map((msg) => msg.content.text)
}
/*
@ -53,124 +59,103 @@ graph TB;
```
*/
test('sync a thread where both peers have portions', async (t) => {
const alice = createPeer({ name: 'alice' })
const bob = createPeer({ name: 'bob' })
rimraf.sync(ALICE_DIR)
rimraf.sync(BOB_DIR)
const alice = createSSB({
keys: aliceKeys,
path: ALICE_DIR,
})
const bob = createSSB({
keys: bobKeys,
path: BOB_DIR,
})
const carolKeys = generateKeypair('carol')
const carolID = carolKeys.id
const daveKeys = generateKeypair('dave')
const daveID = daveKeys.id
await alice.db.loaded()
const aliceID = await p(alice.db.account.create)({
subdomain: 'account',
_nonce: 'alice',
})
const aliceIDMsg = await p(alice.db.get)(aliceID)
await bob.db.loaded()
const bobID = await p(bob.db.account.create)({
subdomain: 'account',
_nonce: 'bob',
const startA = await p(alice.db.create)({
type: 'post',
content: { text: 'A' },
keys: aliceKeys,
})
const bobIDMsg = await p(bob.db.get)(bobID)
// Alice created Carol
const carolID = await p(alice.db.account.create)({
subdomain: 'account',
keypair: carolKeypair,
_nonce: 'carol',
})
const carolIDMsg = await p(alice.db.get)(carolID)
// Alice created Dave
const daveID = await p(alice.db.account.create)({
subdomain: 'account',
keypair: daveKeypair,
_nonce: 'dave',
})
const daveIDMsg = await p(alice.db.get)(daveID)
// Alice knows Bob
await p(alice.db.add)(bobIDMsg, bobID)
// Bob knows Alice, Carol, and Dave
await p(bob.db.add)(aliceIDMsg, aliceID)
await p(bob.db.add)(carolIDMsg, carolID)
await p(bob.db.add)(daveIDMsg, daveID)
const startA = await p(alice.db.feed.publish)({
account: aliceID,
domain: 'post',
data: { text: 'A' },
})
const rootHashA = alice.db.feed.getID(aliceID, 'post')
const rootMsgA = await p(alice.db.get)(rootHashA)
const rootHashA = alice.db.getFeedRoot(aliceKeys.id, 'post')
const rootMsgA = alice.db.get(rootHashA)
await p(bob.db.add)(rootMsgA, rootHashA)
await p(bob.db.add)(startA.msg, rootHashA)
const replyB1 = await p(bob.db.feed.publish)({
account: bobID,
domain: 'post',
data: { text: 'B1' },
tangles: [startA.id],
const replyB1 = await p(bob.db.create)({
type: 'post',
content: { text: 'B1' },
tangles: [startA.hash],
keys: bobKeys,
})
const replyB2 = await p(bob.db.feed.publish)({
account: bobID,
domain: 'post',
data: { text: 'B2' },
tangles: [startA.id],
const replyB2 = await p(bob.db.create)({
type: 'post',
content: { text: 'B2' },
tangles: [startA.hash],
keys: bobKeys,
})
const rootHashB = bob.db.feed.getID(bobID, 'post')
const rootMsgB = await p(bob.db.get)(rootHashB)
const rootHashB = bob.db.getFeedRoot(bobKeys.id, 'post')
const rootMsgB = bob.db.get(rootHashB)
await p(alice.db.add)(rootMsgB, rootHashB)
await p(alice.db.add)(replyB1.msg, rootHashB)
await p(alice.db.add)(replyB2.msg, rootHashB)
const replyC1 = await p(alice.db.feed.publish)({
account: carolID,
domain: 'post',
data: { text: 'C1' },
tangles: [startA.id],
keypair: carolKeypair,
const replyC1 = await p(alice.db.create)({
type: 'post',
content: { text: 'C1' },
tangles: [startA.hash],
keys: carolKeys,
})
const replyD1 = await p(bob.db.feed.publish)({
account: daveID,
domain: 'post',
data: { text: 'D1' },
tangles: [startA.id],
keypair: daveKeypair,
const replyD1 = await p(bob.db.create)({
type: 'post',
content: { text: 'D1' },
tangles: [startA.hash],
keys: daveKeys,
})
assert.deepEqual(
await getTexts(alice.db.msgs()),
t.deepEquals(
getTexts(alice.db.msgs()),
['A', 'B1', 'B2', 'C1'],
'alice has a portion of the thread'
)
assert.deepEqual(
await getTexts(bob.db.msgs()),
t.deepEquals(
getTexts(bob.db.msgs()),
['A', 'B1', 'B2', 'D1'],
'bob has another portion of the thread'
)
bob.goals.set(startA.id, 'all')
alice.goals.set(startA.id, 'all')
bob.tangleSync.setGoal(startA.hash, 'all')
alice.tangleSync.setGoal(startA.hash, 'all')
const remoteAlice = await p(bob.connect)(alice.getAddress())
assert('bob connected to alice')
t.pass('bob connected to alice')
bob.sync.start()
bob.tangleSync.initiate()
await p(setTimeout)(1000)
assert('sync!')
t.pass('tangleSync!')
assert.deepEqual(
await getTexts(alice.db.msgs()),
t.deepEquals(
getTexts(alice.db.msgs()),
['A', 'B1', 'B2', 'C1', 'D1'],
'alice has the full thread'
)
assert.deepEqual(
await getTexts(bob.db.msgs()),
t.deepEquals(
getTexts(bob.db.msgs()),
['A', 'B1', 'B2', 'D1', 'C1'],
'bob has the full thread'
)
@ -181,70 +166,63 @@ test('sync a thread where both peers have portions', async (t) => {
})
test('sync a thread where initiator does not have the root', async (t) => {
const alice = createPeer({ name: 'alice' })
const bob = createPeer({ name: 'bob' })
rimraf.sync(ALICE_DIR)
rimraf.sync(BOB_DIR)
const alice = createSSB({
keys: aliceKeys,
path: ALICE_DIR,
})
const bob = createSSB({
keys: bobKeys,
path: BOB_DIR,
})
await alice.db.loaded()
const aliceID = await p(alice.db.account.create)({
subdomain: 'account',
_nonce: 'alice',
})
const aliceIDMsg = await p(alice.db.get)(aliceID)
await bob.db.loaded()
const bobID = await p(bob.db.account.create)({
subdomain: 'account',
_nonce: 'bob',
})
const bobIDMsg = await p(bob.db.get)(bobID)
// Alice knows Bob
await p(alice.db.add)(bobIDMsg, bobID)
// Bob knows Alice
await p(bob.db.add)(aliceIDMsg, aliceID)
const rootA = await p(alice.db.feed.publish)({
account: aliceID,
domain: 'post',
data: { text: 'A' },
const rootA = await p(alice.db.create)({
type: 'post',
content: { text: 'A' },
keys: aliceKeys,
})
const replyA1 = await p(alice.db.feed.publish)({
account: aliceID,
domain: 'post',
data: { text: 'A1' },
tangles: [rootA.id],
const replyA1 = await p(alice.db.create)({
type: 'post',
content: { text: 'A1' },
tangles: [rootA.hash],
keys: aliceKeys,
})
const replyA2 = await p(alice.db.feed.publish)({
account: aliceID,
domain: 'post',
data: { text: 'A2' },
tangles: [rootA.id],
const replyA2 = await p(alice.db.create)({
type: 'post',
content: { text: 'A2' },
tangles: [rootA.hash],
keys: aliceKeys,
})
assert.deepEqual(
await getTexts(alice.db.msgs()),
t.deepEquals(
getTexts(alice.db.msgs()),
['A', 'A1', 'A2'],
'alice has the full thread'
)
assert.deepEqual(await getTexts(bob.db.msgs()), [], 'bob has nothing')
t.deepEquals(getTexts(bob.db.msgs()), [], 'bob has nothing')
bob.goals.set(rootA.id, 'all')
bob.tangleSync.setGoal(rootA.hash, 'all')
// ON PURPOSE: alice does not set the goal
// alice.goals.set(rootA.id, 'all')
// alice.tangleSync.setGoal(rootA.hash, 'all')
const remoteAlice = await p(bob.connect)(alice.getAddress())
assert('bob connected to alice')
t.pass('bob connected to alice')
bob.sync.start()
bob.tangleSync.initiate()
await p(setTimeout)(1000)
assert('sync!')
t.pass('tangleSync!')
assert.deepEqual(
await getTexts(bob.db.msgs()),
t.deepEquals(
getTexts(bob.db.msgs()),
['A', 'A1', 'A2'],
'bob has the full thread'
)
@ -255,69 +233,62 @@ test('sync a thread where initiator does not have the root', async (t) => {
})
test('sync a thread where receiver does not have the root', async (t) => {
const alice = createPeer({ name: 'alice' })
const bob = createPeer({ name: 'bob' })
rimraf.sync(ALICE_DIR)
rimraf.sync(BOB_DIR)
const alice = createSSB({
keys: aliceKeys,
path: ALICE_DIR,
})
const bob = createSSB({
keys: bobKeys,
path: BOB_DIR,
})
await alice.db.loaded()
const aliceID = await p(alice.db.account.create)({
subdomain: 'account',
_nonce: 'alice',
})
const aliceIDMsg = await p(alice.db.get)(aliceID)
await bob.db.loaded()
const bobID = await p(bob.db.account.create)({
subdomain: 'account',
_nonce: 'bob',
})
const bobIDMsg = await p(bob.db.get)(bobID)
// Alice knows Bob
await p(alice.db.add)(bobIDMsg, bobID)
// Bob knows Alice
await p(bob.db.add)(aliceIDMsg, aliceID)
const rootA = await p(alice.db.feed.publish)({
account: aliceID,
domain: 'post',
data: { text: 'A' },
const rootA = await p(alice.db.create)({
type: 'post',
content: { text: 'A' },
keys: aliceKeys,
})
const replyA1 = await p(alice.db.feed.publish)({
account: aliceID,
domain: 'post',
data: { text: 'A1' },
tangles: [rootA.id],
const replyA1 = await p(alice.db.create)({
type: 'post',
content: { text: 'A1' },
tangles: [rootA.hash],
keys: aliceKeys,
})
const replyA2 = await p(alice.db.feed.publish)({
account: aliceID,
domain: 'post',
data: { text: 'A2' },
tangles: [rootA.id],
const replyA2 = await p(alice.db.create)({
type: 'post',
content: { text: 'A2' },
tangles: [rootA.hash],
keys: aliceKeys,
})
assert.deepEqual(
await getTexts(alice.db.msgs()),
t.deepEquals(
getTexts(alice.db.msgs()),
['A', 'A1', 'A2'],
'alice has the full thread'
)
assert.deepEqual(await getTexts(bob.db.msgs()), [], 'bob has nothing')
t.deepEquals(getTexts(bob.db.msgs()), [], 'bob has nothing')
bob.goals.set(rootA.id, 'all')
alice.goals.set(rootA.id, 'all')
bob.tangleSync.setGoal(rootA.hash, 'all')
alice.tangleSync.setGoal(rootA.hash, 'all')
const remoteBob = await p(alice.connect)(bob.getAddress())
assert('alice connected to bob')
t.pass('alice connected to bob')
alice.sync.start()
alice.tangleSync.initiate()
await p(setTimeout)(1000)
assert('sync!')
t.pass('tangleSync!')
assert.deepEqual(
await getTexts(bob.db.msgs()),
t.deepEquals(
getTexts(bob.db.msgs()),
['A', 'A1', 'A2'],
'bob has the full thread'
)
@ -328,76 +299,69 @@ test('sync a thread where receiver does not have the root', async (t) => {
})
test('sync a thread with reactions too', async (t) => {
const alice = createPeer({ name: 'alice' })
const bob = createPeer({ name: 'bob' })
rimraf.sync(ALICE_DIR)
rimraf.sync(BOB_DIR)
const alice = createSSB({
keys: aliceKeys,
path: ALICE_DIR,
})
const bob = createSSB({
keys: bobKeys,
path: BOB_DIR,
})
await alice.db.loaded()
const aliceID = await p(alice.db.account.create)({
subdomain: 'account',
_nonce: 'alice',
})
const aliceIDMsg = await p(alice.db.get)(aliceID)
await bob.db.loaded()
const bobID = await p(bob.db.account.create)({
subdomain: 'account',
_nonce: 'bob',
})
const bobIDMsg = await p(bob.db.get)(bobID)
// Alice knows Bob
await p(alice.db.add)(bobIDMsg, bobID)
// Bob knows Alice
await p(bob.db.add)(aliceIDMsg, aliceID)
const rootA = await p(alice.db.feed.publish)({
account: aliceID,
domain: 'post',
data: { text: 'A' },
const rootA = await p(alice.db.create)({
type: 'post',
content: { text: 'A' },
keys: aliceKeys,
})
const replyA1 = await p(alice.db.feed.publish)({
account: aliceID,
domain: 'post',
data: { text: 'A1' },
tangles: [rootA.id],
const replyA1 = await p(alice.db.create)({
type: 'post',
content: { text: 'A1' },
tangles: [rootA.hash],
keys: aliceKeys,
})
const replyA2 = await p(alice.db.feed.publish)({
account: aliceID,
domain: 'post',
data: { text: 'A2' },
tangles: [rootA.id],
const replyA2 = await p(alice.db.create)({
type: 'post',
content: { text: 'A2' },
tangles: [rootA.hash],
keys: aliceKeys,
})
const reactionA3 = await p(alice.db.feed.publish)({
account: aliceID,
domain: 'reaction',
data: { text: 'yes', link: replyA1.id },
tangles: [rootA.id, replyA1.id],
const reactionA3 = await p(alice.db.create)({
type: 'reaction',
content: { text: 'yes', link: replyA1.hash },
tangles: [rootA.hash, replyA1.hash],
keys: aliceKeys,
})
assert.deepEqual(
await getTexts(alice.db.msgs()),
t.deepEquals(
getTexts(alice.db.msgs()),
['A', 'A1', 'A2', 'yes'],
'alice has the full thread'
)
assert.deepEqual(await getTexts(bob.db.msgs()), [], 'bob has nothing')
t.deepEquals(getTexts(bob.db.msgs()), [], 'bob has nothing')
bob.goals.set(rootA.id, 'all')
alice.goals.set(rootA.id, 'all')
bob.tangleSync.setGoal(rootA.hash, 'all')
alice.tangleSync.setGoal(rootA.hash, 'all')
const remoteBob = await p(alice.connect)(bob.getAddress())
assert('alice connected to bob')
t.pass('alice connected to bob')
alice.sync.start()
alice.tangleSync.initiate()
await p(setTimeout)(1000)
assert('sync!')
t.pass('tangleSync!')
assert.deepEqual(
await getTexts(bob.db.msgs()),
t.deepEquals(
getTexts(bob.db.msgs()),
['A', 'A1', 'A2', 'yes'],
'bob has the full thread'
)

View File

@ -1,53 +1,14 @@
const OS = require('node:os')
const Path = require('node:path')
const rimraf = require('rimraf')
const caps = require('pzp-caps')
const Keypair = require('pzp-keypair')
const ssbKeys = require('ssb-keys')
const SSBURI = require('ssb-uri2')
const base58 = require('bs58')
function createPeer(config) {
if (config.name) {
const name = config.name
const tmp = OS.tmpdir()
config.global ??= {}
config.global.path ??= Path.join(tmp, `pzp-sync-${name}-${Date.now()}`)
config.global.keypair ??= Keypair.generate('ed25519', name)
delete config.name
}
if (!config.global) {
throw new Error('need config.global in createPeer()')
}
if (!config.global.path) {
throw new Error('need config.global.path in createPeer()')
}
if (!config.global.keypair) {
throw new Error('need config.global.keypair in createPeer()')
}
rimraf.sync(config.global.path)
return require('secret-stack/bare')()
.use(require('secret-stack/plugins/net'))
.use(require('secret-handshake-ext/secret-stack'))
.use(require('pzp-db'))
.use(require('pzp-dict'))
.use(require('pzp-set'))
.use(require('pzp-goals'))
.use(require('ssb-box'))
.use(require('../lib'))
.call(null, {
shse: { caps },
...config,
global: {
connections: {
incoming: {
net: [{ scope: 'device', transform: 'shse', port: null }],
},
outgoing: {
net: [{ transform: 'shse' }],
},
},
...config.global,
},
})
function generateKeypair(seed) {
const keys = ssbKeys.generate('ed25519', seed, 'buttwoo-v1')
const { data } = SSBURI.decompose(keys.id)
keys.id = `ppppp:feed/v1/${base58.encode(Buffer.from(data, 'base64'))}`
return keys
}
module.exports = { createPeer }
module.exports = {
generateKeypair,
}

View File

@ -1,44 +0,0 @@
const test = require('node:test')
const assert = require('node:assert')
const Algorithm = require('../lib/algorithm')

// Sentinel range meaning "want nothing": lower bound above upper bound.
const EMPTY = [1, 0]

test('want-range for goal=newest-3', async (t) => {
  const algorithm = new Algorithm({ db: null })
  const goal = { type: 'newest', count: 3 }
  // want(localHave, remoteHave) -> range of msgs we want from the remote
  const want = (local, remote) => algorithm.wantRange(local, remote, goal)

  assert.deepStrictEqual(await want([2, 4], [1, 3]), [2, 3])
  assert.deepStrictEqual(await want([2, 4], [1, 5]), [3, 5])
  assert.deepStrictEqual(await want([1, 3], [2, 4]), [2, 4])
  assert.deepStrictEqual(await want([1, 5], [2, 4]), [3, 4])
  assert.deepStrictEqual(await want([1, 3], [4, 6]), [4, 6])
  assert.deepStrictEqual(await want([4, 6], [1, 3]), EMPTY)
  assert.deepStrictEqual(await want([1, 3], [6, 7]), [6, 7])
})

test('want-range for goal=all', async (t) => {
  const algorithm = new Algorithm({ db: null })
  const goal = { type: 'all' }
  // With goal=all we want the remote's whole range, whatever we hold locally.
  const want = (local, remote) => algorithm.wantRange(local, remote, goal)

  assert.deepStrictEqual(await want([2, 4], [1, 3]), [1, 3])
  assert.deepStrictEqual(await want([2, 4], [1, 5]), [1, 5])
  assert.deepStrictEqual(await want([1, 3], [2, 4]), [2, 4])
  assert.deepStrictEqual(await want([1, 5], [2, 4]), [2, 4])
  assert.deepStrictEqual(await want([1, 3], [4, 6]), [4, 6])
  assert.deepStrictEqual(await want([4, 6], [1, 3]), [1, 3])
  assert.deepStrictEqual(await want([1, 3], [6, 7]), [6, 7])
})

test('want-range for goal=dict', async (t) => {
  // Stub dict reports a minimum ghost depth of 3, so wanted ranges are
  // clamped to start no earlier than depth 3.
  const algorithm = new Algorithm({
    db: null,
    dict: { minGhostDepth: (id, cb) => cb(null, 3) },
  })
  const goal = { type: 'dict' }
  const want = (local, remote) => algorithm.wantRange(local, remote, goal)

  assert.deepStrictEqual(await want([2, 4], [1, 3]), [3, 3])
  assert.deepStrictEqual(await want([2, 4], [1, 5]), [3, 5])
  assert.deepStrictEqual(await want([1, 3], [2, 4]), [3, 4])
  assert.deepStrictEqual(await want([1, 5], [2, 4]), [3, 4])
  assert.deepStrictEqual(await want([1, 3], [4, 6]), [4, 6])
  assert.deepStrictEqual(await want([4, 6], [1, 3]), [3, 3])
  assert.deepStrictEqual(await want([1, 3], [6, 7]), [6, 7])
})

View File

@ -1,16 +0,0 @@
{
"include": ["lib/**/*.js"],
"exclude": ["coverage/", "node_modules/", "test/"],
"compilerOptions": {
"checkJs": true,
"declaration": true,
"emitDeclarationOnly": true,
"exactOptionalPropertyTypes": true,
"forceConsistentCasingInFileNames": true,
"lib": ["es2022", "dom"],
"module": "node16",
"skipLibCheck": true,
"strict": true,
"target": "es2022"
}
}