can correctly replicate tangles
This commit is contained in:
parent
e8440b217a
commit
59d7e0d731
126
lib/index.js
126
lib/index.js
|
@ -1,8 +1,132 @@
|
|||
const makeDebug = require('debug')
|
||||
/**
|
||||
* @typedef {ReturnType<import('ppppp-db').init>} PPPPPDB
|
||||
* @typedef {ReturnType<import('ppppp-goals').init>} PPPPPGoal
|
||||
* @typedef {ReturnType<import('ppppp-set').init>} PPPPPSet
|
||||
* @typedef {ReturnType<import('ppppp-sync').init>} PPPPPSync
|
||||
* @typedef {ReturnType<import('ppppp-gc').init>} PPPPPGC
|
||||
* @typedef {[Array<string>, Array<string>]} Rules
|
||||
* @typedef {{
|
||||
* db:PPPPPDB | null,
|
||||
* goals: PPPPPGoal | null,
|
||||
* set: PPPPPSet | null,
|
||||
* sync: PPPPPSync | null,
|
||||
* gc: PPPPPGC | null,
|
||||
* }} UnknownPeer
|
||||
*/
|
||||
|
||||
/**
|
||||
* @param {{ db: PPPPPDB | null }} peer
|
||||
* @returns {asserts peer is { db: PPPPPDB }}
|
||||
*/
|
||||
function assertDBPlugin(peer) {
|
||||
if (!peer.db) throw new Error('conductor plugin needs ppppp-db plugin')
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {{ goals: PPPPPGoal | null }} peer
|
||||
* @returns {asserts peer is { goals: PPPPPGoal }}
|
||||
*/
|
||||
function assertGoalsPlugin(peer) {
|
||||
if (!peer.goals) throw new Error('conductor plugin needs ppppp-goals plugin')
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {{ set: PPPPPSet | null }} peer
|
||||
* @returns {asserts peer is { set: PPPPPSet }}
|
||||
*/
|
||||
function assertSetPlugin(peer) {
|
||||
if (!peer.set) throw new Error('conductor plugin needs ppppp-set plugin')
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {{ gc: PPPPPGC | null }} peer
|
||||
* @returns {asserts peer is { gc: PPPPPGC }}
|
||||
*/
|
||||
function assertGCPlugin(peer) {
|
||||
if (!peer.gc) throw new Error('conductor plugin needs ppppp-gc plugin')
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {{ sync: PPPPPSync | null }} peer
|
||||
* @returns {asserts peer is { sync: PPPPPSync }}
|
||||
*/
|
||||
function assertSyncPlugin(peer) {
|
||||
if (!peer.sync) throw new Error('conductor plugin needs ppppp-sync plugin')
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {UnknownPeer} peer
|
||||
* @param {unknown} config
|
||||
*/
|
||||
function initConductor(peer, config) {
|
||||
assertDBPlugin(peer)
|
||||
assertGoalsPlugin(peer)
|
||||
assertSetPlugin(peer)
|
||||
assertGCPlugin(peer)
|
||||
assertSyncPlugin(peer)
|
||||
|
||||
/**
|
||||
* Set replication goals for various tangles of an account:
|
||||
* - Account tangle
|
||||
* - Follow tangle (a Set)
|
||||
* - Each tangle in the rule
|
||||
*
|
||||
* The "rule" is just a list of domains of feeds.
|
||||
* @param {string} accountID
|
||||
* @param {Array<string>} rule
|
||||
*/
|
||||
function setupAccountGoals(accountID, rule) {
|
||||
assertDBPlugin(peer)
|
||||
assertSetPlugin(peer)
|
||||
assertGoalsPlugin(peer)
|
||||
|
||||
peer.goals.set(accountID, 'all')
|
||||
|
||||
const followDomain = peer.set.getDomain('follow')
|
||||
const followFeedID = peer.db.feed.getID(accountID, followDomain)
|
||||
peer.goals.set(followFeedID, 'set')
|
||||
|
||||
for (const domain of rule) {
|
||||
const feedID = peer.db.feed.getID(accountID, domain)
|
||||
peer.goals.set(feedID, 'all') // TODO better goal?
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts automatic sync and garbage collection.
|
||||
* Assumes that PPPPP Set has been loaded with the same accountID.
|
||||
*
|
||||
* @param {string} myID
|
||||
* @param {Rules} rules
|
||||
* @param {number} maxBytes
|
||||
*/
|
||||
function start(myID, rules, maxBytes) {
|
||||
assertDBPlugin(peer)
|
||||
assertSetPlugin(peer)
|
||||
assertGoalsPlugin(peer)
|
||||
assertGCPlugin(peer)
|
||||
assertSyncPlugin(peer)
|
||||
|
||||
const [myRule, theirRule] = rules
|
||||
|
||||
// TODO: Figure out goals for each tangle, and sizes according to maxLogBytes
|
||||
// TODO: Figure out ghost spans for dicts and sets
|
||||
|
||||
setupAccountGoals(myID, myRule)
|
||||
|
||||
// TODO: watch the set for live updates, on add, syncAccount()
|
||||
// TODO: watch the set for live updates, on remove, forgetAccount()
|
||||
const followedAccounts = peer.set.values('follow')
|
||||
for (const theirID of followedAccounts) {
|
||||
setupAccountGoals(theirID, theirRule)
|
||||
}
|
||||
|
||||
peer.gc.start(maxBytes)
|
||||
peer.sync.start()
|
||||
}
|
||||
|
||||
return {
|
||||
start,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,86 @@
|
|||
const test = require('node:test')
|
||||
const assert = require('node:assert')
|
||||
const p = require('node:util').promisify
|
||||
const { createPeer } = require('./util')
|
||||
|
||||
function getTexts(msgs) {
|
||||
return msgs.filter((msg) => msg.data?.text).map((msg) => msg.data.text)
|
||||
}
|
||||
|
||||
test('Replicate selected feeds of followed accounts', async (t) => {
|
||||
// Alice
|
||||
const alice = createPeer({ name: 'alice' })
|
||||
await alice.db.loaded()
|
||||
// Alice creates her own account
|
||||
const aliceID = await p(alice.db.account.create)({
|
||||
subdomain: 'account',
|
||||
_nonce: 'alice',
|
||||
})
|
||||
await p(alice.set.load)(aliceID)
|
||||
// Alice creates a feed of posts
|
||||
for (let i = 0; i < 5; i++) {
|
||||
await p(alice.db.feed.publish)({
|
||||
account: aliceID,
|
||||
domain: 'post',
|
||||
data: { text: 'A' + i },
|
||||
})
|
||||
}
|
||||
|
||||
// Bob
|
||||
const bob = createPeer({ name: 'bob' })
|
||||
await bob.db.loaded()
|
||||
// Bob creates his own account
|
||||
const bobID = await p(bob.db.account.create)({
|
||||
subdomain: 'account',
|
||||
_nonce: 'bob',
|
||||
})
|
||||
await p(bob.set.load)(bobID)
|
||||
// Bob creates a feed of posts
|
||||
for (let i = 0; i < 5; i++) {
|
||||
await p(bob.db.feed.publish)({
|
||||
account: bobID,
|
||||
domain: 'post',
|
||||
data: { text: 'B' + i },
|
||||
})
|
||||
}
|
||||
|
||||
// Carol
|
||||
const carol = createPeer({ name: 'carol' })
|
||||
await carol.db.loaded()
|
||||
// Carol creates her own account
|
||||
const carolID = await p(carol.db.account.create)({
|
||||
subdomain: 'account',
|
||||
_nonce: 'carol',
|
||||
})
|
||||
await p(carol.set.load)(bobID)
|
||||
// Carol creates a feed of posts
|
||||
for (let i = 0; i < 5; i++) {
|
||||
await p(carol.db.feed.publish)({
|
||||
account: carolID,
|
||||
domain: 'post',
|
||||
data: { text: 'C' + i },
|
||||
})
|
||||
}
|
||||
|
||||
// Alice follows Bob, but not Carol
|
||||
assert(await p(alice.set.add)('follow', bobID), 'alice follows bob')
|
||||
|
||||
alice.conductor.start(aliceID, [['post'], ['post']], 64_000_000)
|
||||
bob.conductor.start(bobID, [['post'], ['post']], 64_000_000)
|
||||
|
||||
const aliceDialingBob = await p(alice.connect)(bob.getAddress())
|
||||
const aliceDialingCarol = await p(alice.connect)(carol.getAddress())
|
||||
await p(setTimeout)(1000)
|
||||
|
||||
assert.deepEqual(
|
||||
getTexts([...alice.db.msgs()]),
|
||||
['A0', 'A1', 'A2', 'A3', 'A4', /* */ 'B0', 'B1', 'B2', 'B3', 'B4'],
|
||||
'alice has alice and bob posts'
|
||||
)
|
||||
|
||||
await p(aliceDialingBob.close)(true)
|
||||
await p(aliceDialingCarol.close)(true)
|
||||
await p(alice.close)(true)
|
||||
await p(bob.close)(true)
|
||||
await p(carol.close)(true)
|
||||
})
|
|
@ -0,0 +1,43 @@
|
|||
const os = require('node:os')
|
||||
const path = require('node:path')
|
||||
const rimraf = require('rimraf')
|
||||
const caps = require('ppppp-caps')
|
||||
const Keypair = require('ppppp-keypair')
|
||||
|
||||
function createPeer(opts) {
|
||||
if (opts.name) {
|
||||
const tmp = os.tmpdir()
|
||||
const now = Date.now()
|
||||
opts.db ??= {path: path.join(tmp, `ppppp-conductor-${opts.name}-${now}`)}
|
||||
opts.keypair ??= Keypair.generate('ed25519', opts.name)
|
||||
opts.name = undefined
|
||||
}
|
||||
if (!opts.db.path) throw new Error('need opts.db.path in createPeer()')
|
||||
if (!opts.keypair) throw new Error('need opts.keypair in createPeer()')
|
||||
|
||||
rimraf.sync(opts.db.path)
|
||||
return require('secret-stack/bare')()
|
||||
.use(require('secret-stack/plugins/net'))
|
||||
.use(require('secret-handshake-ext/secret-stack'))
|
||||
.use(require('ppppp-db'))
|
||||
.use(require('ssb-box'))
|
||||
.use(require('ppppp-set'))
|
||||
.use(require('ppppp-goals'))
|
||||
.use(require('ppppp-sync'))
|
||||
.use(require('ppppp-gc'))
|
||||
.use(require('../lib'))
|
||||
.call(null, {
|
||||
caps,
|
||||
connections: {
|
||||
incoming: {
|
||||
net: [{ scope: 'device', transform: 'shse', port: null }],
|
||||
},
|
||||
outgoing: {
|
||||
net: [{ transform: 'shse' }],
|
||||
},
|
||||
},
|
||||
...opts,
|
||||
})
|
||||
}
|
||||
|
||||
module.exports = { createPeer }
|
Loading…
Reference in New Issue