Rename to pzp and use async db fns

This commit is contained in:
Jacob Karlsson 2024-05-05 20:22:09 +02:00
parent f46b979d8d
commit 001ab41071
8 changed files with 144 additions and 124 deletions

View File

@ -1,25 +0,0 @@
name: CI
on:
push:
branches: [master]
pull_request:
branches: [master]
jobs:
test:
runs-on: ubuntu-latest
timeout-minutes: 10
strategy:
matrix:
node-version: [18.x, 20.x]
steps:
- uses: actions/checkout@v3
- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v3
with:
node-version: ${{ matrix.node-version }}
- run: npm install
- run: npm test

13
.woodpecker.yaml Normal file
View File

@ -0,0 +1,13 @@
matrix:
NODE_VERSION:
- 18
- 20
steps:
test:
when:
event: [push]
image: node:${NODE_VERSION}
commands:
- npm install
- npm test

View File

@ -1,9 +1,9 @@
**Work in progress** # pzp-conductor
PZP manager that sets tangle goals
## Installation ## Installation
We're not on npm yet. In your package.json, include this as
```js ```js
"ppppp-conductor": "github:staltz/ppppp-conductor" npm install pzp-conductor
``` ```

View File

@ -1,26 +1,34 @@
const makeDebug = require('debug') const makeDebug = require('debug')
const MsgV4 = require('ppppp-db/msg-v4') const MsgV4 = require('pzp-db/msg-v4')
/** /**
* @typedef {ReturnType<import('ppppp-db').init>} PPPPPDB * @typedef {ReturnType<import('pzp-db').init>} PZPDB
* @typedef {ReturnType<import('ppppp-goals').init>} PPPPPGoal * @typedef {ReturnType<import('pzp-goals').init>} PZPGoal
* @typedef {import('ppppp-goals').GoalDSL} GoalDSL * @typedef {import('pzp-goals').GoalDSL} GoalDSL
* @typedef {ReturnType<import('ppppp-set').init>} PPPPPSet * @typedef {ReturnType<import('pzp-set').init>} PZPSet
* @typedef {ReturnType<import('ppppp-dict').init>} PPPPPDict * @typedef {ReturnType<import('pzp-dict').init>} PZPDict
* @typedef {ReturnType<import('ppppp-sync').init>} PPPPPSync * @typedef {ReturnType<import('pzp-sync').init>} PZPSync
* @typedef {ReturnType<import('ppppp-gc').init>} PPPPPGC * @typedef {ReturnType<import('pzp-gc').init>} PZPGC
* @typedef {`${string}@${GoalDSL}`} Rule * @typedef {`${string}@${GoalDSL}`} Rule
* @typedef {[Array<Rule>, Array<Rule>]} Rules * @typedef {[Array<Rule>, Array<Rule>]} Rules
* @typedef {{ * @typedef {{
* db: PPPPPDB, * db: PZPDB,
* goals: PPPPPGoal, * goals: PZPGoal,
* set: PPPPPSet, * set: PZPSet,
* sync: PPPPPSync, * sync: PZPSync,
* gc: PPPPPGC, * gc: PZPGC,
* dict: PPPPPDict | null, * dict: PZPDict | null,
* }} Peer * }} Peer
*/ */
/**
* @template T
* @typedef {T extends void ?
* (...args: [Error] | []) => void :
* (...args: [Error] | [null, T]) => void
* } CB
*/
/** /**
* @param {any} rule * @param {any} rule
* @returns {[string, GoalDSL]} * @returns {[string, GoalDSL]}
@ -81,7 +89,7 @@ function initConductor(peer, config) {
*/ */
const MAX_DECENT_MAXBYTES = 100 * 1024 * 1024 // 100 MB const MAX_DECENT_MAXBYTES = 100 * 1024 * 1024 // 100 MB
const debug = makeDebug('ppppp:conductor') const debug = makeDebug('pzp:conductor')
/** /**
* @param {Array<Rule>} rules * @param {Array<Rule>} rules
@ -213,57 +221,63 @@ function initConductor(peer, config) {
/** /**
* Starts automatic sync and garbage collection. * Starts automatic sync and garbage collection.
* Assumes that PPPPP Set has been loaded with the same accountID. * Assumes that PZP Set has been loaded with the same accountID.
* *
* @param {string} myID * @param {string} myID
* @param {[Array<Rule>, Array<Rule>]} rules * @param {[Array<Rule>, Array<Rule>]} rules
* @param {number} maxBytes * @param {number} maxBytes
* @param {CB<void>} cb
*/ */
function start(myID, rules, maxBytes) { function start(myID, rules, maxBytes, cb) {
if (maxBytes < MIN_MAXBYTES) { if (maxBytes < MIN_MAXBYTES) {
// prettier-ignore // prettier-ignore
throw new Error(`ppppp-conductor maxBytes must be at least ${MIN_MAXBYTES} bytes, got ${maxBytes}`) return cb(Error(`pzp-conductor maxBytes must be at least ${MIN_MAXBYTES} bytes, got ${maxBytes}`))
} }
if (maxBytes > MAX_DECENT_MAXBYTES) { if (maxBytes > MAX_DECENT_MAXBYTES) {
// prettier-ignore // prettier-ignore
debug('WARNING. maxBytes is too big, we recommend at most %s bytes', MAX_DECENT_MAXBYTES) debug('WARNING. maxBytes is too big, we recommend at most %s bytes', MAX_DECENT_MAXBYTES)
} }
const follows = peer.set.values('follows') peer.set.values('follows', null, (err, follows) => {
const numFollows = follows.length if (err) return cb(err)
const [myRules, theirRules] = validateRules(rules, numFollows, maxBytes)
// Set up goals for my account and each account I follow const numFollows = follows.length
setupAccountGoals(myID, myRules) const [myRules, theirRules] = validateRules(rules, numFollows, maxBytes)
for (const theirID of follows) {
setupAccountGoals(theirID, theirRules) // Set up goals for my account and each account I follow
} setupAccountGoals(myID, myRules)
// @ts-ignore for (const theirID of follows) {
peer.set.watch(({ event, subdomain, value }) => {
const theirID = value
if (subdomain === 'follows' && event === 'add') {
setupAccountGoals(theirID, theirRules) setupAccountGoals(theirID, theirRules)
} }
if (subdomain === 'follows' && event === 'del') { // @ts-ignore
teardownAccountGoals(theirID, theirRules) peer.set.watch(({ event, subdomain, value }) => {
} const theirID = value
if (subdomain === 'blocks' && event === 'add') { if (subdomain === 'follows' && event === 'add') {
teardownAccountGoals(theirID, theirRules) setupAccountGoals(theirID, theirRules)
} }
if (subdomain === 'follows' && event === 'del') {
teardownAccountGoals(theirID, theirRules)
}
if (subdomain === 'blocks' && event === 'add') {
teardownAccountGoals(theirID, theirRules)
}
})
// Figure out ghost span for each account
const totalGhostableFeeds =
countGhostableFeeds(myRules) +
numFollows * countGhostableFeeds(theirRules)
const TOTAL_GHOSTS = ESTIMATE_TOTAL_GHOST_BYTES / MSG_ID_BYTES
const ghostSpan = Math.round(TOTAL_GHOSTS / totalGhostableFeeds)
peer.set.setGhostSpan(ghostSpan)
peer.dict?.setGhostSpan(ghostSpan)
// Kick off garbage collection and synchronization
peer.gc.start(maxBytes)
peer.sync.start()
cb()
}) })
// Figure out ghost span for each account
const totalGhostableFeeds =
countGhostableFeeds(myRules) +
numFollows * countGhostableFeeds(theirRules)
const TOTAL_GHOSTS = ESTIMATE_TOTAL_GHOST_BYTES / MSG_ID_BYTES
const ghostSpan = Math.round(TOTAL_GHOSTS / totalGhostableFeeds)
peer.set.setGhostSpan(ghostSpan)
peer.dict?.setGhostSpan(ghostSpan)
// Kick off garbage collection and synchronization
peer.gc.start(maxBytes)
peer.sync.start()
} }
return { return {

View File

@ -1,13 +1,13 @@
{ {
"name": "ppppp-conductor", "name": "pzp-conductor",
"version": "1.0.0", "version": "0.0.1",
"description": "PPPPP manager that sets tangle goals", "description": "PZP manager that sets tangle goals",
"author": "Andre Staltz <contact@staltz.com>", "author": "Andre Staltz <contact@staltz.com>",
"license": "MIT", "license": "MIT",
"homepage": "https://github.com/staltz/ppppp-conductor", "homepage": "https://codeberg.org/pzp/pzp-conductor",
"repository": { "repository": {
"type": "git", "type": "git",
"url": "git@github.com:staltz/ppppp-conductor.git" "url": "git@codeberg.org:pzp/pzp-conductor.git"
}, },
"type": "commonjs", "type": "commonjs",
"main": "index.js", "main": "index.js",
@ -26,18 +26,18 @@
"debug": "^4.3.4" "debug": "^4.3.4"
}, },
"devDependencies": { "devDependencies": {
"@types/node": "18.x", "@types/node": "^18.19.31",
"@types/debug": "4.1.9", "@types/debug": "4.1.9",
"bs58": "^5.0.0", "bs58": "^5.0.0",
"c8": "7", "c8": "7",
"ppppp-caps": "github:staltz/ppppp-caps#93fa810b9a40b78aef4872d4c2a8412cccb52929", "pzp-caps": "^1.0.0",
"ppppp-db": "github:staltz/ppppp-db#cf1532965ea1d16929ed2291a9b737a4ce74caac", "pzp-db": "^1.0.1",
"ppppp-dict": "github:staltz/ppppp-dict#c40d51be6cb96982b4fe691a292b3c12b6f49a36", "pzp-dict": "^1.0.0",
"ppppp-gc": "github:staltz/ppppp-gc#9075f983d8fa9a13c18a63451a78bed5912e78d0", "pzp-gc": "^1.0.0",
"ppppp-goals": "github:staltz/ppppp-goals#46a8d8889c668cf291607963fd7301f21aa634b5", "pzp-goals": "^1.0.0",
"ppppp-keypair": "github:staltz/ppppp-keypair#c33980c580e33f9a35cb0c672b916ec9fe8b4c6d", "pzp-keypair": "^1.0.0",
"ppppp-set": "github:staltz/ppppp-set#07c3e295b2d09d2d6c3ac6b5b93ad2ea80698452", "pzp-set": "^1.0.0",
"ppppp-sync": "github:staltz/ppppp-sync#93f00dbd04267f472fbf2f3ae63495092d3a921e", "pzp-sync": "^1.0.0",
"prettier": "^2.6.2", "prettier": "^2.6.2",
"pretty-quick": "^3.1.3", "pretty-quick": "^3.1.3",
"rimraf": "^4.4.0", "rimraf": "^4.4.0",

View File

@ -3,7 +3,11 @@ const assert = require('node:assert')
const p = require('node:util').promisify const p = require('node:util').promisify
const { createPeer } = require('./util') const { createPeer } = require('./util')
function getTexts(msgs) { async function getTexts(iter) {
const msgs = []
for await (i of iter) {
msgs.push(i)
}
return msgs.filter((msg) => msg.data?.text).map((msg) => msg.data.text) return msgs.filter((msg) => msg.data?.text).map((msg) => msg.data.text)
} }
@ -18,7 +22,7 @@ test('Sets goals according to input rules', async (t) => {
}) })
await p(alice.set.load)(aliceID) await p(alice.set.load)(aliceID)
alice.conductor.start( await p(alice.conductor.start)(
aliceID, aliceID,
[['posts@newest-100', 'hubs@set', 'profile@dict']], [['posts@newest-100', 'hubs@set', 'profile@dict']],
64_000_000 64_000_000
@ -107,15 +111,15 @@ test('Replicate selected feeds of followed accounts', async (t) => {
// Alice follows Bob, but not Carol // Alice follows Bob, but not Carol
assert(await p(alice.set.add)('follows', bobID), 'alice follows bob') assert(await p(alice.set.add)('follows', bobID), 'alice follows bob')
alice.conductor.start(aliceID, [['post@all'], ['post@all']], 64_000_000) await p(alice.conductor.start)(aliceID, [['post@all'], ['post@all']], 64_000_000)
bob.conductor.start(bobID, [['post@all'], ['post@all']], 64_000_000) await p(bob.conductor.start)(bobID, [['post@all'], ['post@all']], 64_000_000)
const aliceDialingBob = await p(alice.connect)(bob.getAddress()) const aliceDialingBob = await p(alice.connect)(bob.getAddress())
const aliceDialingCarol = await p(alice.connect)(carol.getAddress()) const aliceDialingCarol = await p(alice.connect)(carol.getAddress())
await p(setTimeout)(1000) await p(setTimeout)(1000)
assert.deepEqual( assert.deepEqual(
getTexts([...alice.db.msgs()]), await getTexts(alice.db.msgs()),
['A0', 'A1', 'A2', 'A3', 'A4', /* */ 'B0', 'B1', 'B2', 'B3', 'B4'], ['A0', 'A1', 'A2', 'A3', 'A4', /* */ 'B0', 'B1', 'B2', 'B3', 'B4'],
'alice has alice and bob posts' 'alice has alice and bob posts'
) )
@ -185,15 +189,15 @@ test('GC selected feeds of followed accounts', async (t) => {
// Alice follows Bob, but not Carol // Alice follows Bob, but not Carol
assert(await p(alice.set.add)('follows', bobID), 'alice follows bob') assert(await p(alice.set.add)('follows', bobID), 'alice follows bob')
alice.conductor.start(aliceID, [['post@all'], ['post@all']], 64_000_000) await p(alice.conductor.start)(aliceID, [['post@all'], ['post@all']], 64_000_000)
bob.conductor.start(bobID, [['post@all'], ['post@all']], 64_000_000) await p(bob.conductor.start)(bobID, [['post@all'], ['post@all']], 64_000_000)
const aliceDialingBob = await p(alice.connect)(bob.getAddress()) const aliceDialingBob = await p(alice.connect)(bob.getAddress())
const aliceDialingCarol = await p(alice.connect)(carol.getAddress()) const aliceDialingCarol = await p(alice.connect)(carol.getAddress())
await p(setTimeout)(1000) await p(setTimeout)(1000)
assert.deepEqual( assert.deepEqual(
getTexts([...alice.db.msgs()]), await getTexts(alice.db.msgs()),
['A0', 'A1', 'A2', 'A3', 'A4', /* */ 'B0', 'B1', 'B2', 'B3', 'B4'], ['A0', 'A1', 'A2', 'A3', 'A4', /* */ 'B0', 'B1', 'B2', 'B3', 'B4'],
'alice has alice and bob posts' 'alice has alice and bob posts'
) )
@ -201,13 +205,13 @@ test('GC selected feeds of followed accounts', async (t) => {
await p(aliceDialingBob.close)(true) await p(aliceDialingBob.close)(true)
await p(aliceDialingCarol.close)(true) await p(aliceDialingCarol.close)(true)
alice.conductor.start(aliceID, [['post@all'], ['post@newest-2']], 8_000) await p(alice.conductor.start)(aliceID, [['post@all'], ['post@newest-2']], 8_000)
const aliceDialingBob2 = await p(alice.connect)(bob.getAddress()) const aliceDialingBob2 = await p(alice.connect)(bob.getAddress())
const aliceDialingCarol2 = await p(alice.connect)(carol.getAddress()) const aliceDialingCarol2 = await p(alice.connect)(carol.getAddress())
await p(setTimeout)(1000) await p(setTimeout)(1000)
assert.deepEqual( assert.deepEqual(
getTexts([...alice.db.msgs()]), await getTexts(alice.db.msgs()),
['A0', 'A1', 'A2', 'A3', 'A4', /* */ 'B3', 'B4'], ['A0', 'A1', 'A2', 'A3', 'A4', /* */ 'B3', 'B4'],
'alice has alice and bob posts' 'alice has alice and bob posts'
) )
@ -277,15 +281,15 @@ test('GC recently-unfollowed accounts', async (t) => {
// Alice follows Bob, but not Carol // Alice follows Bob, but not Carol
assert(await p(alice.set.add)('follows', bobID), 'alice follows bob') assert(await p(alice.set.add)('follows', bobID), 'alice follows bob')
alice.conductor.start(aliceID, [['post@all'], ['post@all']], 4_000) await p(alice.conductor.start)(aliceID, [['post@all'], ['post@all']], 4_000)
bob.conductor.start(bobID, [['post@all'], ['post@all']], 4_000) await p(bob.conductor.start)(bobID, [['post@all'], ['post@all']], 4_000)
const aliceDialingBob = await p(alice.connect)(bob.getAddress()) const aliceDialingBob = await p(alice.connect)(bob.getAddress())
const aliceDialingCarol = await p(alice.connect)(carol.getAddress()) const aliceDialingCarol = await p(alice.connect)(carol.getAddress())
await p(setTimeout)(2000) await p(setTimeout)(2000)
assert.deepEqual( assert.deepEqual(
getTexts([...alice.db.msgs()]), await getTexts(alice.db.msgs()),
['A0', 'A1', 'A2', 'A3', 'A4', /* */ 'B0', 'B1', 'B2', 'B3', 'B4'], ['A0', 'A1', 'A2', 'A3', 'A4', /* */ 'B0', 'B1', 'B2', 'B3', 'B4'],
'alice has alice and bob posts' 'alice has alice and bob posts'
) )
@ -294,7 +298,7 @@ test('GC recently-unfollowed accounts', async (t) => {
await p(setTimeout)(1000) await p(setTimeout)(1000)
assert.deepEqual( assert.deepEqual(
getTexts([...alice.db.msgs()]), await getTexts(alice.db.msgs()),
['A0', 'A1', 'A2', 'A3', 'A4'], ['A0', 'A1', 'A2', 'A3', 'A4'],
'alice has alice posts' 'alice has alice posts'
) )
@ -364,15 +368,15 @@ test('GC recently-blocked accounts', async (t) => {
// Alice follows Bob, but not Carol // Alice follows Bob, but not Carol
assert(await p(alice.set.add)('follows', bobID), 'alice follows bob') assert(await p(alice.set.add)('follows', bobID), 'alice follows bob')
alice.conductor.start(aliceID, [['post@all'], ['post@all']], 4_000) await p(alice.conductor.start)(aliceID, [['post@all'], ['post@all']], 4_000)
bob.conductor.start(bobID, [['post@all'], ['post@all']], 4_000) await p(bob.conductor.start)(bobID, [['post@all'], ['post@all']], 4_000)
const aliceDialingBob = await p(alice.connect)(bob.getAddress()) const aliceDialingBob = await p(alice.connect)(bob.getAddress())
const aliceDialingCarol = await p(alice.connect)(carol.getAddress()) const aliceDialingCarol = await p(alice.connect)(carol.getAddress())
await p(setTimeout)(2000) await p(setTimeout)(2000)
assert.deepEqual( assert.deepEqual(
getTexts([...alice.db.msgs()]), await getTexts(alice.db.msgs()),
['A0', 'A1', 'A2', 'A3', 'A4', /* */ 'B0', 'B1', 'B2', 'B3', 'B4'], ['A0', 'A1', 'A2', 'A3', 'A4', /* */ 'B0', 'B1', 'B2', 'B3', 'B4'],
'alice has alice and bob posts' 'alice has alice and bob posts'
) )
@ -381,7 +385,7 @@ test('GC recently-blocked accounts', async (t) => {
await p(setTimeout)(1000) await p(setTimeout)(1000)
assert.deepEqual( assert.deepEqual(
getTexts([...alice.db.msgs()]), await getTexts(alice.db.msgs()),
['A0', 'A1', 'A2', 'A3', 'A4'], ['A0', 'A1', 'A2', 'A3', 'A4'],
'alice has alice posts' 'alice has alice posts'
) )
@ -427,8 +431,8 @@ test('Set and Dict ghost spans', async (t) => {
// Alice follows Bob, but not Carol // Alice follows Bob, but not Carol
assert(await p(alice.set.add)('follows', bobID), 'alice follows bob') assert(await p(alice.set.add)('follows', bobID), 'alice follows bob')
alice.conductor.start(aliceID, [['post@all'], ['post@all']], 4_000) await p(alice.conductor.start)(aliceID, [['post@all'], ['post@all']], 4_000)
bob.conductor.start(bobID, [['post@all'], ['post@all']], 4_000) await p(bob.conductor.start)(bobID, [['post@all'], ['post@all']], 4_000)
assert.equal(alice.set.getGhostSpan(), 5958, 'alice set ghost span is 2') assert.equal(alice.set.getGhostSpan(), 5958, 'alice set ghost span is 2')
assert.equal(alice.dict.getGhostSpan(), 5958, 'alice set ghost span is 2') assert.equal(alice.dict.getGhostSpan(), 5958, 'alice set ghost span is 2')

View File

@ -1,15 +1,17 @@
const OS = require('node:os') const OS = require('node:os')
const Path = require('node:path') const Path = require('node:path')
const rimraf = require('rimraf') const rimraf = require('rimraf')
const caps = require('ppppp-caps') /** @type {string} */
const Keypair = require('ppppp-keypair') // @ts-ignore
const caps = require('pzp-caps')
const Keypair = require('pzp-keypair')
function createPeer(config) { function createPeer(config) {
if (config.name) { if (config.name) {
const name = config.name const name = config.name
const tmp = OS.tmpdir() const tmp = OS.tmpdir()
config.global ??= {} config.global ??= {}
config.global.path ??= Path.join(tmp, `ppppp-conduct-${name}-${Date.now()}`) config.global.path ??= Path.join(tmp, `pzp-conduct-${name}-${Date.now()}`)
config.global.keypair ??= Keypair.generate('ed25519', name) config.global.keypair ??= Keypair.generate('ed25519', name)
delete config.name delete config.name
} }
@ -24,16 +26,19 @@ function createPeer(config) {
} }
rimraf.sync(config.global.path) rimraf.sync(config.global.path)
// @ts-ignore
return require('secret-stack/bare')() return require('secret-stack/bare')()
// @ts-ignore
.use(require('secret-stack/plugins/net')) .use(require('secret-stack/plugins/net'))
.use(require('secret-handshake-ext/secret-stack')) .use(require('secret-handshake-ext/secret-stack'))
// @ts-ignore
.use(require('ssb-box')) .use(require('ssb-box'))
.use(require('ppppp-db')) .use(require('pzp-db'))
.use(require('ppppp-set')) .use(require('pzp-set'))
.use(require('ppppp-dict')) .use(require('pzp-dict'))
.use(require('ppppp-goals')) .use(require('pzp-goals'))
.use(require('ppppp-sync')) .use(require('pzp-sync'))
.use(require('ppppp-gc')) .use(require('pzp-gc'))
.use(require('../lib')) .use(require('../lib'))
.call(null, { .call(null, {
shse: { caps }, shse: { caps },

View File

@ -1,13 +1,22 @@
{ {
"include": ["lib/**/*.js"], "include": [
"exclude": ["coverage/", "node_modules/", "test/"], "lib/**/*.js"
],
"exclude": [
"coverage/",
"node_modules/",
"test/"
],
"compilerOptions": { "compilerOptions": {
"checkJs": true, "checkJs": true,
"declaration": true, "declaration": true,
"emitDeclarationOnly": true, "emitDeclarationOnly": true,
"exactOptionalPropertyTypes": true, "exactOptionalPropertyTypes": true,
"forceConsistentCasingInFileNames": true, "forceConsistentCasingInFileNames": true,
"lib": ["es2022", "dom"], "lib": [
"es2022",
"dom"
],
"module": "node16", "module": "node16",
"skipLibCheck": true, "skipLibCheck": true,
"strict": true, "strict": true,