mirror of https://codeberg.org/pzp/pzp-sync.git
replace internal goals with ppppp-goals
This commit is contained in:
parent
c4bd7f9b49
commit
8e43c0efab
|
@ -2,12 +2,16 @@ const { BloomFilter } = require('bloom-filters')
|
||||||
const MsgV3 = require('ppppp-db/msg-v3')
|
const MsgV3 = require('ppppp-db/msg-v3')
|
||||||
const p = require('util').promisify
|
const p = require('util').promisify
|
||||||
const { EMPTY_RANGE, isEmptyRange, estimateMsgCount } = require('./range')
|
const { EMPTY_RANGE, isEmptyRange, estimateMsgCount } = require('./range')
|
||||||
const { parseGoal } = require('./goal')
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @typedef {import('./range').Range} Range
|
* @typedef {import('./range').Range} Range
|
||||||
* @typedef {import('./goal').Goal} Goal
|
|
||||||
* @typedef {import('ppppp-db/msg-v3').Msg} Msg
|
* @typedef {import('ppppp-db/msg-v3').Msg} Msg
|
||||||
|
*
|
||||||
|
* @typedef {{
|
||||||
|
* type: 'all' | 'newest' | 'oldest',
|
||||||
|
* count: number,
|
||||||
|
* id: string
|
||||||
|
* }} Goal
|
||||||
*/
|
*/
|
||||||
|
|
||||||
function countIter(iter) {
|
function countIter(iter) {
|
||||||
|
@ -87,13 +91,12 @@ class Algorithm {
|
||||||
wantRange(localHave, remoteHave, goal) {
|
wantRange(localHave, remoteHave, goal) {
|
||||||
if (!goal) return EMPTY_RANGE
|
if (!goal) return EMPTY_RANGE
|
||||||
if (isEmptyRange(remoteHave)) return EMPTY_RANGE
|
if (isEmptyRange(remoteHave)) return EMPTY_RANGE
|
||||||
const { type, count } = parseGoal(goal)
|
if (goal.type === 'all') {
|
||||||
if (type === 'all') {
|
|
||||||
return this.#wantAllRange(localHave, remoteHave)
|
return this.#wantAllRange(localHave, remoteHave)
|
||||||
} else if (type === 'newest') {
|
} else if (goal.type === 'newest') {
|
||||||
return this.#wantNewestRange(localHave, remoteHave, count)
|
return this.#wantNewestRange(localHave, remoteHave, goal.count)
|
||||||
} else if (type === 'oldest') {
|
} else if (goal.type === 'oldest') {
|
||||||
return this.#wantOldestRange(localHave, remoteHave, count)
|
return this.#wantOldestRange(localHave, remoteHave, goal.count)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -189,6 +192,12 @@ class Algorithm {
|
||||||
return validNewMsgs
|
return validNewMsgs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {string} rootID
|
||||||
|
* @param {Array<Msg>} newMsgs
|
||||||
|
* @param {Goal} goal
|
||||||
|
* @param {Range} myWantRange
|
||||||
|
*/
|
||||||
async commit(rootID, newMsgs, goal, myWantRange) {
|
async commit(rootID, newMsgs, goal, myWantRange) {
|
||||||
const validNewMsgs = this.#filterReceivedMsgs(rootID, newMsgs, myWantRange)
|
const validNewMsgs = this.#filterReceivedMsgs(rootID, newMsgs, myWantRange)
|
||||||
|
|
||||||
|
@ -202,10 +211,12 @@ class Algorithm {
|
||||||
} catch {}
|
} catch {}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prune. Ideally this should be in a garbage collection module
|
if (goal.type === 'newest') {
|
||||||
const { type, count } = parseGoal(goal)
|
return await this.pruneNewest(rootID, goal.count)
|
||||||
if (type === 'newest') return await this.pruneNewest(rootID, count)
|
}
|
||||||
if (type === 'oldest') throw new Error('not implemented') // TODO:
|
if (goal.type === 'oldest') {
|
||||||
|
throw new Error('not implemented') // TODO
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
53
lib/goal.js
53
lib/goal.js
|
@ -1,53 +0,0 @@
|
||||||
/**
|
|
||||||
* @typedef {'all'} GoalAll
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @typedef {`newest-${number}`} GoalNewest
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @typedef {`oldest-${number}`} GoalOldest
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @typedef {GoalAll|GoalNewest|GoalOldest} Goal
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @typedef {{type: 'all'; count: never}} ParsedAll
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @typedef {{type: 'newest' |'oldest'; count: number}} ParsedLimited
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @typedef {ParsedAll | ParsedLimited} ParsedGoal
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param {Goal} goal
|
|
||||||
* @returns {ParsedGoal}
|
|
||||||
*/
|
|
||||||
function parseGoal(goal) {
|
|
||||||
if (goal === 'all') {
|
|
||||||
return { type: 'all' }
|
|
||||||
}
|
|
||||||
|
|
||||||
const matchN = goal.match(/^newest-(\d+)$/)
|
|
||||||
if (matchN) {
|
|
||||||
return { type: 'newest', count: Number(matchN[1]) }
|
|
||||||
}
|
|
||||||
|
|
||||||
const matchO = goal.match(/^oldest-(\d+)$/)
|
|
||||||
if (matchO) {
|
|
||||||
return { type: 'oldest', count: Number(matchO[1]) }
|
|
||||||
}
|
|
||||||
|
|
||||||
throw new Error(`Invalid goal: ${goal}`)
|
|
||||||
}
|
|
||||||
|
|
||||||
module.exports = {
|
|
||||||
parseGoal,
|
|
||||||
}
|
|
20
lib/index.js
20
lib/index.js
|
@ -5,10 +5,6 @@ const getSeverity = require('ssb-network-errors')
|
||||||
const Algorithm = require('./algorithm')
|
const Algorithm = require('./algorithm')
|
||||||
const SyncStream = require('./stream')
|
const SyncStream = require('./stream')
|
||||||
|
|
||||||
/**
|
|
||||||
* @typedef {import('./goal').Goal} Goal
|
|
||||||
*/
|
|
||||||
|
|
||||||
function isMuxrpcMissingError(err, namespace, methodName) {
|
function isMuxrpcMissingError(err, namespace, methodName) {
|
||||||
const jsErrorMessage = `method:${namespace},${methodName} is not in list of allowed methods`
|
const jsErrorMessage = `method:${namespace},${methodName} is not in list of allowed methods`
|
||||||
const goErrorMessage = `muxrpc: no such command: ${namespace}.${methodName}`
|
const goErrorMessage = `muxrpc: no such command: ${namespace}.${methodName}`
|
||||||
|
@ -19,7 +15,6 @@ module.exports = {
|
||||||
name: 'tangleSync',
|
name: 'tangleSync',
|
||||||
manifest: {
|
manifest: {
|
||||||
connect: 'duplex',
|
connect: 'duplex',
|
||||||
setGoal: 'sync',
|
|
||||||
initiate: 'sync',
|
initiate: 'sync',
|
||||||
},
|
},
|
||||||
permissions: {
|
permissions: {
|
||||||
|
@ -28,15 +23,17 @@ module.exports = {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
init(peer, config) {
|
init(peer, config) {
|
||||||
|
if (!peer.goals) {
|
||||||
|
throw new Error('tangleSync requires the goals plugin')
|
||||||
|
}
|
||||||
const debug = makeDebug(`ppppp:tangleSync`)
|
const debug = makeDebug(`ppppp:tangleSync`)
|
||||||
const goals = new Map()
|
|
||||||
const algo = new Algorithm(peer)
|
const algo = new Algorithm(peer)
|
||||||
|
|
||||||
const streams = []
|
const streams = []
|
||||||
function createStream(remoteId, iamClient) {
|
function createStream(remoteId, iamClient) {
|
||||||
// prettier-ignore
|
// prettier-ignore
|
||||||
debug('Opening a stream with remote %s %s', iamClient ? 'server' : 'client', remoteId)
|
debug('Opening a stream with remote %s %s', iamClient ? 'server' : 'client', remoteId)
|
||||||
const stream = new SyncStream(peer.shse.pubkey, debug, goals, algo)
|
const stream = new SyncStream(peer.shse.pubkey, debug, peer.goals, algo)
|
||||||
streams.push(stream)
|
streams.push(stream)
|
||||||
return stream
|
return stream
|
||||||
}
|
}
|
||||||
|
@ -67,14 +64,6 @@ module.exports = {
|
||||||
return toPull.duplex(createStream(this.id, false))
|
return toPull.duplex(createStream(this.id, false))
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @param {string} tangleId
|
|
||||||
* @param {Goal} goal
|
|
||||||
*/
|
|
||||||
function setGoal(tangleId, goal = 'all') {
|
|
||||||
goals.set(tangleId, goal)
|
|
||||||
}
|
|
||||||
|
|
||||||
function initiate() {
|
function initiate() {
|
||||||
for (const stream of streams) {
|
for (const stream of streams) {
|
||||||
stream.initiate()
|
stream.initiate()
|
||||||
|
@ -83,7 +72,6 @@ module.exports = {
|
||||||
|
|
||||||
return {
|
return {
|
||||||
connect,
|
connect,
|
||||||
setGoal,
|
|
||||||
initiate,
|
initiate,
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
|
@ -2,7 +2,11 @@ const Pipeable = require('push-stream/pipeable')
|
||||||
const { isEmptyRange } = require('./range')
|
const { isEmptyRange } = require('./range')
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @typedef {import('./goal').Goal} Goal
|
* @typedef {{
|
||||||
|
* type: 'all' | 'newest' | 'oldest',
|
||||||
|
* count: number,
|
||||||
|
* id: string
|
||||||
|
* }} Goal
|
||||||
*/
|
*/
|
||||||
|
|
||||||
class SyncStream extends Pipeable {
|
class SyncStream extends Pipeable {
|
||||||
|
@ -15,7 +19,10 @@ class SyncStream extends Pipeable {
|
||||||
#requested
|
#requested
|
||||||
|
|
||||||
/** tangleId => goal
|
/** tangleId => goal
|
||||||
* @type {Map<string, Goal>} */
|
* @type {{
|
||||||
|
* getByID(id: string): Goal | null,
|
||||||
|
* list(): IterableIterator<Goal>,
|
||||||
|
* }} */
|
||||||
#goals
|
#goals
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -73,8 +80,8 @@ class SyncStream extends Pipeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
initiate() {
|
initiate() {
|
||||||
for (const id of this.#goals.keys()) {
|
for (const goal of this.#goals.list()) {
|
||||||
this.#requested.add(id)
|
this.#requested.add(goal.id)
|
||||||
}
|
}
|
||||||
this.resume()
|
this.resume()
|
||||||
}
|
}
|
||||||
|
@ -111,7 +118,7 @@ class SyncStream extends Pipeable {
|
||||||
// prettier-ignore
|
// prettier-ignore
|
||||||
this.#debug('%s Stream IN1: got remote have-range %o for %s', this.#myId, remoteHaveRange, id)
|
this.#debug('%s Stream IN1: got remote have-range %o for %s', this.#myId, remoteHaveRange, id)
|
||||||
this.#remoteHave.set(id, remoteHaveRange)
|
this.#remoteHave.set(id, remoteHaveRange)
|
||||||
const goal = this.#goals.get(id)
|
const goal = this.#goals.getByID(id)
|
||||||
const haveRange = this.#algo.haveRange(id)
|
const haveRange = this.#algo.haveRange(id)
|
||||||
const wantRange = this.#algo.wantRange(haveRange, remoteHaveRange, goal)
|
const wantRange = this.#algo.wantRange(haveRange, remoteHaveRange, goal)
|
||||||
this.#localHave.set(id, haveRange)
|
this.#localHave.set(id, haveRange)
|
||||||
|
@ -126,7 +133,7 @@ class SyncStream extends Pipeable {
|
||||||
this.#debug('%s Stream IN2: got remote have-range %o and want-range %o for %s', this.#myId, remoteHaveRange, remoteWantRange, id)
|
this.#debug('%s Stream IN2: got remote have-range %o and want-range %o for %s', this.#myId, remoteHaveRange, remoteWantRange, id)
|
||||||
this.#remoteHave.set(id, remoteHaveRange)
|
this.#remoteHave.set(id, remoteHaveRange)
|
||||||
this.#remoteWant.set(id, remoteWantRange)
|
this.#remoteWant.set(id, remoteWantRange)
|
||||||
const goal = this.#goals.get(id)
|
const goal = this.#goals.getByID(id)
|
||||||
const haveRange = this.#localHave.get(id)
|
const haveRange = this.#localHave.get(id)
|
||||||
const wantRange = this.#algo.wantRange(haveRange, remoteHaveRange, goal)
|
const wantRange = this.#algo.wantRange(haveRange, remoteHaveRange, goal)
|
||||||
this.#localWant.set(id, wantRange)
|
this.#localWant.set(id, wantRange)
|
||||||
|
@ -253,7 +260,7 @@ class SyncStream extends Pipeable {
|
||||||
this.#debug('%s Stream OUT9: send %s msgs in %s', this.#myId, msgs.length, id)
|
this.#debug('%s Stream OUT9: send %s msgs in %s', this.#myId, msgs.length, id)
|
||||||
this.sink.write({ id, phase: 9, payload: msgs })
|
this.sink.write({ id, phase: 9, payload: msgs })
|
||||||
|
|
||||||
const goal = this.#goals.get(id)
|
const goal = this.#goals.getByID(id)
|
||||||
const localWant = this.#localWant.get(id)
|
const localWant = this.#localWant.get(id)
|
||||||
this.#requested.delete(id)
|
this.#requested.delete(id)
|
||||||
this.#localHave.delete(id)
|
this.#localHave.delete(id)
|
||||||
|
@ -275,7 +282,7 @@ class SyncStream extends Pipeable {
|
||||||
// prettier-ignore
|
// prettier-ignore
|
||||||
this.#debug('%s Stream IN9: got %s msgs in %s', this.#myId, msgsForMe.length, id)
|
this.#debug('%s Stream IN9: got %s msgs in %s', this.#myId, msgsForMe.length, id)
|
||||||
|
|
||||||
const goal = this.#goals.get(id)
|
const goal = this.#goals.getByID(id)
|
||||||
const localWant = this.#localWant.get(id)
|
const localWant = this.#localWant.get(id)
|
||||||
this.#requested.delete(id)
|
this.#requested.delete(id)
|
||||||
this.#localHave.delete(id)
|
this.#localHave.delete(id)
|
||||||
|
|
|
@ -37,6 +37,7 @@
|
||||||
"c8": "7",
|
"c8": "7",
|
||||||
"ppppp-db": "github:staltz/ppppp-db",
|
"ppppp-db": "github:staltz/ppppp-db",
|
||||||
"ppppp-caps": "github:staltz/ppppp-caps",
|
"ppppp-caps": "github:staltz/ppppp-caps",
|
||||||
|
"ppppp-goals": "github:staltz/ppppp-goals",
|
||||||
"ppppp-keypair": "github:staltz/ppppp-keypair",
|
"ppppp-keypair": "github:staltz/ppppp-keypair",
|
||||||
"prettier": "^2.6.2",
|
"prettier": "^2.6.2",
|
||||||
"pretty-quick": "^3.1.3",
|
"pretty-quick": "^3.1.3",
|
||||||
|
|
|
@ -51,8 +51,8 @@ test('sync an account tangle', async (t) => {
|
||||||
"bob doesn't have alice's account tangle"
|
"bob doesn't have alice's account tangle"
|
||||||
)
|
)
|
||||||
|
|
||||||
bob.tangleSync.setGoal(aliceID, 'all')
|
bob.goals.set(aliceID, 'all')
|
||||||
alice.tangleSync.setGoal(aliceID, 'all')
|
alice.goals.set(aliceID, 'all')
|
||||||
|
|
||||||
const remoteAlice = await p(bob.connect)(alice.getAddress())
|
const remoteAlice = await p(bob.connect)(alice.getAddress())
|
||||||
assert('bob connected to alice')
|
assert('bob connected to alice')
|
||||||
|
|
|
@ -55,8 +55,8 @@ test('sync a feed with goal=all', async (t) => {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
bob.tangleSync.setGoal(carolPostsMootID, 'all')
|
bob.goals.set(carolPostsMootID, 'all')
|
||||||
alice.tangleSync.setGoal(carolPostsMootID, 'all')
|
alice.goals.set(carolPostsMootID, 'all')
|
||||||
|
|
||||||
const remoteAlice = await p(bob.connect)(alice.getAddress())
|
const remoteAlice = await p(bob.connect)(alice.getAddress())
|
||||||
assert('bob connected to alice')
|
assert('bob connected to alice')
|
||||||
|
@ -129,8 +129,8 @@ test('sync a feed with goal=newest', async (t) => {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
bob.tangleSync.setGoal(carolPostsMootID, 'newest-5')
|
bob.goals.set(carolPostsMootID, 'newest-5')
|
||||||
alice.tangleSync.setGoal(carolPostsMootID, 'all')
|
alice.goals.set(carolPostsMootID, 'all')
|
||||||
|
|
||||||
const remoteAlice = await p(bob.connect)(alice.getAddress())
|
const remoteAlice = await p(bob.connect)(alice.getAddress())
|
||||||
assert('bob connected to alice')
|
assert('bob connected to alice')
|
||||||
|
@ -211,8 +211,8 @@ test('sync a feed with goal=newest but too far behind', async (t) => {
|
||||||
assert.deepEqual(arr, ['m1', 'm2'], 'bob has msgs 1..2 from carol')
|
assert.deepEqual(arr, ['m1', 'm2'], 'bob has msgs 1..2 from carol')
|
||||||
}
|
}
|
||||||
|
|
||||||
alice.tangleSync.setGoal(carolPostsMootID, 'newest-5')
|
alice.goals.set(carolPostsMootID, 'newest-5')
|
||||||
bob.tangleSync.setGoal(carolPostsMootID, 'newest-8')
|
bob.goals.set(carolPostsMootID, 'newest-8')
|
||||||
|
|
||||||
const remoteAlice = await p(bob.connect)(alice.getAddress())
|
const remoteAlice = await p(bob.connect)(alice.getAddress())
|
||||||
assert('bob connected to alice')
|
assert('bob connected to alice')
|
||||||
|
|
|
@ -149,8 +149,8 @@ test('sync a thread where both peers have portions', async (t) => {
|
||||||
'bob has another portion of the thread'
|
'bob has another portion of the thread'
|
||||||
)
|
)
|
||||||
|
|
||||||
bob.tangleSync.setGoal(startA.id, 'all')
|
bob.goals.set(startA.id, 'all')
|
||||||
alice.tangleSync.setGoal(startA.id, 'all')
|
alice.goals.set(startA.id, 'all')
|
||||||
|
|
||||||
const remoteAlice = await p(bob.connect)(alice.getAddress())
|
const remoteAlice = await p(bob.connect)(alice.getAddress())
|
||||||
assert('bob connected to alice')
|
assert('bob connected to alice')
|
||||||
|
@ -228,9 +228,9 @@ test('sync a thread where initiator does not have the root', async (t) => {
|
||||||
|
|
||||||
assert.deepEqual(getTexts(bob.db.msgs()), [], 'bob has nothing')
|
assert.deepEqual(getTexts(bob.db.msgs()), [], 'bob has nothing')
|
||||||
|
|
||||||
bob.tangleSync.setGoal(rootA.id, 'all')
|
bob.goals.set(rootA.id, 'all')
|
||||||
// ON PURPOSE: alice does not set the goal
|
// ON PURPOSE: alice does not set the goal
|
||||||
// alice.tangleSync.setGoal(rootA.id, 'all')
|
// alice.goals.set(rootA.id, 'all')
|
||||||
|
|
||||||
const remoteAlice = await p(bob.connect)(alice.getAddress())
|
const remoteAlice = await p(bob.connect)(alice.getAddress())
|
||||||
assert('bob connected to alice')
|
assert('bob connected to alice')
|
||||||
|
@ -302,8 +302,8 @@ test('sync a thread where receiver does not have the root', async (t) => {
|
||||||
|
|
||||||
assert.deepEqual(getTexts(bob.db.msgs()), [], 'bob has nothing')
|
assert.deepEqual(getTexts(bob.db.msgs()), [], 'bob has nothing')
|
||||||
|
|
||||||
bob.tangleSync.setGoal(rootA.id, 'all')
|
bob.goals.set(rootA.id, 'all')
|
||||||
alice.tangleSync.setGoal(rootA.id, 'all')
|
alice.goals.set(rootA.id, 'all')
|
||||||
|
|
||||||
const remoteBob = await p(alice.connect)(bob.getAddress())
|
const remoteBob = await p(alice.connect)(bob.getAddress())
|
||||||
assert('alice connected to bob')
|
assert('alice connected to bob')
|
||||||
|
@ -382,8 +382,8 @@ test('sync a thread with reactions too', async (t) => {
|
||||||
|
|
||||||
assert.deepEqual(getTexts(bob.db.msgs()), [], 'bob has nothing')
|
assert.deepEqual(getTexts(bob.db.msgs()), [], 'bob has nothing')
|
||||||
|
|
||||||
bob.tangleSync.setGoal(rootA.id, 'all')
|
bob.goals.set(rootA.id, 'all')
|
||||||
alice.tangleSync.setGoal(rootA.id, 'all')
|
alice.goals.set(rootA.id, 'all')
|
||||||
|
|
||||||
const remoteBob = await p(alice.connect)(bob.getAddress())
|
const remoteBob = await p(alice.connect)(bob.getAddress())
|
||||||
assert('alice connected to bob')
|
assert('alice connected to bob')
|
||||||
|
|
|
@ -18,6 +18,7 @@ function createPeer(opts) {
|
||||||
.use(require('secret-stack/plugins/net'))
|
.use(require('secret-stack/plugins/net'))
|
||||||
.use(require('secret-handshake-ext/secret-stack'))
|
.use(require('secret-handshake-ext/secret-stack'))
|
||||||
.use(require('ppppp-db'))
|
.use(require('ppppp-db'))
|
||||||
|
.use(require('ppppp-goals'))
|
||||||
.use(require('ssb-box'))
|
.use(require('ssb-box'))
|
||||||
.use(require('../lib'))
|
.use(require('../lib'))
|
||||||
.call(null, {
|
.call(null, {
|
||||||
|
|
Loading…
Reference in New Issue