diff --git a/lib/algorithm.js b/lib/algorithm.js index 05fb485..a33b1a5 100644 --- a/lib/algorithm.js +++ b/lib/algorithm.js @@ -2,12 +2,16 @@ const { BloomFilter } = require('bloom-filters') const MsgV3 = require('ppppp-db/msg-v3') const p = require('util').promisify const { EMPTY_RANGE, isEmptyRange, estimateMsgCount } = require('./range') -const { parseGoal } = require('./goal') /** * @typedef {import('./range').Range} Range - * @typedef {import('./goal').Goal} Goal * @typedef {import('ppppp-db/msg-v3').Msg} Msg + * + * @typedef {{ + * type: 'all' | 'newest' | 'oldest', + * count: number, + * id: string + * }} Goal */ function countIter(iter) { @@ -87,13 +91,12 @@ class Algorithm { wantRange(localHave, remoteHave, goal) { if (!goal) return EMPTY_RANGE if (isEmptyRange(remoteHave)) return EMPTY_RANGE - const { type, count } = parseGoal(goal) - if (type === 'all') { + if (goal.type === 'all') { return this.#wantAllRange(localHave, remoteHave) - } else if (type === 'newest') { - return this.#wantNewestRange(localHave, remoteHave, count) - } else if (type === 'oldest') { - return this.#wantOldestRange(localHave, remoteHave, count) + } else if (goal.type === 'newest') { + return this.#wantNewestRange(localHave, remoteHave, goal.count) + } else if (goal.type === 'oldest') { + return this.#wantOldestRange(localHave, remoteHave, goal.count) } } @@ -189,6 +192,12 @@ class Algorithm { return validNewMsgs } + /** + * @param {string} rootID + * @param {Array} newMsgs + * @param {Goal} goal + * @param {Range} myWantRange + */ async commit(rootID, newMsgs, goal, myWantRange) { const validNewMsgs = this.#filterReceivedMsgs(rootID, newMsgs, myWantRange) @@ -202,10 +211,12 @@ class Algorithm { } catch {} } - // Prune. Ideally this should be in a garbage collection module - const { type, count } = parseGoal(goal) - if (type === 'newest') return await this.pruneNewest(rootID, count) - if (type === 'oldest') throw new Error('not implemented') // TODO: + if (goal.type === 'newest') { + return await this.pruneNewest(rootID, goal.count) + } + if (goal.type === 'oldest') { + throw new Error('not implemented') // TODO + } } /** diff --git a/lib/goal.js b/lib/goal.js deleted file mode 100644 index b4a89b3..0000000 --- a/lib/goal.js +++ /dev/null @@ -1,53 +0,0 @@ -/** - * @typedef {'all'} GoalAll - */ - -/** - * @typedef {`newest-${number}`} GoalNewest - */ - -/** - * @typedef {`oldest-${number}`} GoalOldest - */ - -/** - * @typedef {GoalAll|GoalNewest|GoalOldest} Goal - */ - -/** - * @typedef {{type: 'all'; count: never}} ParsedAll - */ - -/** - * @typedef {{type: 'newest' |'oldest'; count: number}} ParsedLimited - */ - -/** - * @typedef {ParsedAll | ParsedLimited} ParsedGoal - */ - -/** - * @param {Goal} goal - * @returns {ParsedGoal} - */ -function parseGoal(goal) { - if (goal === 'all') { - return { type: 'all' } - } - - const matchN = goal.match(/^newest-(\d+)$/) - if (matchN) { - return { type: 'newest', count: Number(matchN[1]) } - } - - const matchO = goal.match(/^oldest-(\d+)$/) - if (matchO) { - return { type: 'oldest', count: Number(matchO[1]) } - } - - throw new Error(`Invalid goal: ${goal}`) -} - -module.exports = { - parseGoal, -} diff --git a/lib/index.js b/lib/index.js index 2649a13..3791ccd 100644 --- a/lib/index.js +++ b/lib/index.js @@ -5,10 +5,6 @@ const getSeverity = require('ssb-network-errors') const Algorithm = require('./algorithm') const SyncStream = require('./stream') -/** - * @typedef {import('./goal').Goal} Goal - */ - function isMuxrpcMissingError(err, namespace, methodName) { const jsErrorMessage = `method:${namespace},${methodName} is not in list of allowed methods` const goErrorMessage = `muxrpc: no such command: ${namespace}.${methodName}` @@ -19,7 +15,6 @@ module.exports = { name: 'tangleSync', manifest: { connect: 'duplex', - setGoal: 'sync', initiate: 'sync', }, permissions: { @@ -28,15 +23,17 @@ module.exports = { }, }, init(peer, config) { + if (!peer.goals) { + throw new Error('tangleSync requires the goals plugin') + } const debug = makeDebug(`ppppp:tangleSync`) - const goals = new Map() const algo = new Algorithm(peer) const streams = [] function createStream(remoteId, iamClient) { // prettier-ignore debug('Opening a stream with remote %s %s', iamClient ? 'server' : 'client', remoteId) - const stream = new SyncStream(peer.shse.pubkey, debug, goals, algo) + const stream = new SyncStream(peer.shse.pubkey, debug, peer.goals, algo) streams.push(stream) return stream } @@ -67,14 +64,6 @@ module.exports = { return toPull.duplex(createStream(this.id, false)) } - /** - * @param {string} tangleId - * @param {Goal} goal - */ - function setGoal(tangleId, goal = 'all') { - goals.set(tangleId, goal) - } - function initiate() { for (const stream of streams) { stream.initiate() @@ -83,7 +72,6 @@ module.exports = { return { connect, - setGoal, initiate, } }, diff --git a/lib/stream.js b/lib/stream.js index 5122ed6..0a2bdb6 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -2,7 +2,11 @@ const Pipeable = require('push-stream/pipeable') const { isEmptyRange } = require('./range') /** - * @typedef {import('./goal').Goal} Goal + * @typedef {{ + * type: 'all' | 'newest' | 'oldest', + * count: number, + * id: string + * }} Goal */ class SyncStream extends Pipeable { @@ -15,7 +19,10 @@ class SyncStream extends Pipeable { #requested /** tangleId => goal - * @type {Map} */ + * @type {{ + * getByID(id: string): Goal | null, + * list(): IterableIterator, + * }} */ #goals /** @@ -73,8 +80,8 @@ class SyncStream extends Pipeable { } initiate() { - for (const id of this.#goals.keys()) { - this.#requested.add(id) + for (const goal of this.#goals.list()) { + this.#requested.add(goal.id) } this.resume() } @@ -111,7 +118,7 @@ class SyncStream extends Pipeable { // prettier-ignore this.#debug('%s Stream IN1: got remote have-range %o for %s', this.#myId, remoteHaveRange, id) this.#remoteHave.set(id, remoteHaveRange) - const goal = this.#goals.get(id) + const goal = this.#goals.getByID(id) const haveRange = this.#algo.haveRange(id) const wantRange = this.#algo.wantRange(haveRange, remoteHaveRange, goal) this.#localHave.set(id, haveRange) @@ -126,7 +133,7 @@ class SyncStream extends Pipeable { this.#debug('%s Stream IN2: got remote have-range %o and want-range %o for %s', this.#myId, remoteHaveRange, remoteWantRange, id) this.#remoteHave.set(id, remoteHaveRange) this.#remoteWant.set(id, remoteWantRange) - const goal = this.#goals.get(id) + const goal = this.#goals.getByID(id) const haveRange = this.#localHave.get(id) const wantRange = this.#algo.wantRange(haveRange, remoteHaveRange, goal) this.#localWant.set(id, wantRange) @@ -253,7 +260,7 @@ class SyncStream extends Pipeable { this.#debug('%s Stream OUT9: send %s msgs in %s', this.#myId, msgs.length, id) this.sink.write({ id, phase: 9, payload: msgs }) - const goal = this.#goals.get(id) + const goal = this.#goals.getByID(id) const localWant = this.#localWant.get(id) this.#requested.delete(id) this.#localHave.delete(id) @@ -275,7 +282,7 @@ class SyncStream extends Pipeable { // prettier-ignore this.#debug('%s Stream IN9: got %s msgs in %s', this.#myId, msgsForMe.length, id) - const goal = this.#goals.get(id) + const goal = this.#goals.getByID(id) const localWant = this.#localWant.get(id) this.#requested.delete(id) this.#localHave.delete(id) diff --git a/package.json b/package.json index 5a86d6b..debf82f 100644 --- a/package.json +++ b/package.json @@ -37,6 +37,7 @@ "c8": "7", "ppppp-db": "github:staltz/ppppp-db", "ppppp-caps": "github:staltz/ppppp-caps", + "ppppp-goals": "github:staltz/ppppp-goals", "ppppp-keypair": "github:staltz/ppppp-keypair", "prettier": "^2.6.2", "pretty-quick": "^3.1.3", diff --git a/test/account-sync.test.js b/test/account-sync.test.js index 2292ecb..98f9f82 100644 --- a/test/account-sync.test.js +++ b/test/account-sync.test.js @@ -51,8 +51,8 @@ test('sync an account tangle', async (t) => { "bob doesn't have alice's account tangle" ) - bob.tangleSync.setGoal(aliceID, 'all') - alice.tangleSync.setGoal(aliceID, 'all') + bob.goals.set(aliceID, 'all') + alice.goals.set(aliceID, 'all') const remoteAlice = await p(bob.connect)(alice.getAddress()) assert('bob connected to alice') diff --git a/test/feed-sync.test.js b/test/feed-sync.test.js index 2494d55..e49c9ac 100644 --- a/test/feed-sync.test.js +++ b/test/feed-sync.test.js @@ -55,8 +55,8 @@ test('sync a feed with goal=all', async (t) => { ) } - bob.tangleSync.setGoal(carolPostsMootID, 'all') - alice.tangleSync.setGoal(carolPostsMootID, 'all') + bob.goals.set(carolPostsMootID, 'all') + alice.goals.set(carolPostsMootID, 'all') const remoteAlice = await p(bob.connect)(alice.getAddress()) assert('bob connected to alice') @@ -129,8 +129,8 @@ test('sync a feed with goal=newest', async (t) => { ) } - bob.tangleSync.setGoal(carolPostsMootID, 'newest-5') - alice.tangleSync.setGoal(carolPostsMootID, 'all') + bob.goals.set(carolPostsMootID, 'newest-5') + alice.goals.set(carolPostsMootID, 'all') const remoteAlice = await p(bob.connect)(alice.getAddress()) assert('bob connected to alice') @@ -211,8 +211,8 @@ test('sync a feed with goal=newest but too far behind', async (t) => { assert.deepEqual(arr, ['m1', 'm2'], 'bob has msgs 1..2 from carol') } - alice.tangleSync.setGoal(carolPostsMootID, 'newest-5') - bob.tangleSync.setGoal(carolPostsMootID, 'newest-8') + alice.goals.set(carolPostsMootID, 'newest-5') + bob.goals.set(carolPostsMootID, 'newest-8') const remoteAlice = await p(bob.connect)(alice.getAddress()) assert('bob connected to alice') diff --git a/test/thread-sync.test.js b/test/thread-sync.test.js index 9d98e22..9cbf3f5 100644 --- a/test/thread-sync.test.js +++ b/test/thread-sync.test.js @@ -149,8 +149,8 @@ test('sync a thread where both peers have portions', async (t) => { 'bob has another portion of the thread' ) - bob.tangleSync.setGoal(startA.id, 'all') - alice.tangleSync.setGoal(startA.id, 'all') + bob.goals.set(startA.id, 'all') + alice.goals.set(startA.id, 'all') const remoteAlice = await p(bob.connect)(alice.getAddress()) assert('bob connected to alice') @@ -228,9 +228,9 @@ test('sync a thread where initiator does not have the root', async (t) => { assert.deepEqual(getTexts(bob.db.msgs()), [], 'bob has nothing') - bob.tangleSync.setGoal(rootA.id, 'all') + bob.goals.set(rootA.id, 'all') // ON PURPOSE: alice does not set the goal - // alice.tangleSync.setGoal(rootA.id, 'all') + // alice.goals.set(rootA.id, 'all') const remoteAlice = await p(bob.connect)(alice.getAddress()) assert('bob connected to alice') @@ -302,8 +302,8 @@ test('sync a thread where receiver does not have the root', async (t) => { assert.deepEqual(getTexts(bob.db.msgs()), [], 'bob has nothing') - bob.tangleSync.setGoal(rootA.id, 'all') - alice.tangleSync.setGoal(rootA.id, 'all') + bob.goals.set(rootA.id, 'all') + alice.goals.set(rootA.id, 'all') const remoteBob = await p(alice.connect)(bob.getAddress()) assert('alice connected to bob') @@ -382,8 +382,8 @@ test('sync a thread with reactions too', async (t) => { assert.deepEqual(getTexts(bob.db.msgs()), [], 'bob has nothing') - bob.tangleSync.setGoal(rootA.id, 'all') - alice.tangleSync.setGoal(rootA.id, 'all') + bob.goals.set(rootA.id, 'all') + alice.goals.set(rootA.id, 'all') const remoteBob = await p(alice.connect)(bob.getAddress()) assert('alice connected to bob') diff --git a/test/util.js b/test/util.js index 0d8e070..57f22b4 100644 --- a/test/util.js +++ b/test/util.js @@ -18,6 +18,7 @@ function createPeer(opts) { .use(require('secret-stack/plugins/net')) .use(require('secret-handshake-ext/secret-stack')) .use(require('ppppp-db')) + .use(require('ppppp-goals')) .use(require('ssb-box')) .use(require('../lib')) .call(null, {