From 7df89f343974386a7443878d74f113ff9dec5ba1 Mon Sep 17 00:00:00 2001
From: Andre Staltz
Date: Wed, 12 Apr 2023 23:17:02 +0300
Subject: [PATCH] set goals: all, newest, oldest

---
Review notes (placed after the three-dash line, so `git am` does not
record them in the commit message):

* This copy of the patch had lost its line breaks and indentation. They
  were re-created here; blank context lines were positioned from the hunk
  line counts, so run `git apply --check` before applying. The author
  e-mail is absent from the From: header in this copy.
* lib/algorithm.js: #wantOldestRange had a FIXME-only body and returned
  undefined, which wantRange() would hand on as the want-range. It now
  throws an explicit error until 'oldest' goals are implemented.
* lib/goal.js: ParsedAll declares `count` as optional, because
  parseGoal('all') returns an object without a `count` property.
* lib/stream.js: the @type of #receivableMsgs and #sendableMsgs was the
  malformed `Map>`; it is written out as Map<string, Set<string>>
  (string msgIDs are presumed, confirm against the msg hash type).

 lib/algorithm.js         |  75 ++++++++++++++++++++++++---
 lib/goal.js              |  53 ++++++++++++++++++++
 lib/plugin.js            |  26 ++++++++--
 lib/range.js             |  12 +++++
 lib/stream.js            | 106 +++++++++++++++++++++++++++++----------
 test/feed-sync.test.js   |   5 +-
 test/thread-sync.test.js |   5 +-
 7 files changed, 240 insertions(+), 42 deletions(-)
 create mode 100644 lib/goal.js

diff --git a/lib/algorithm.js b/lib/algorithm.js
index 55394e3..625e7f8 100644
--- a/lib/algorithm.js
+++ b/lib/algorithm.js
@@ -2,6 +2,15 @@ const { BloomFilter } = require('bloom-filters')
 const FeedV1 = require('ppppp-db/lib/feed-v1')
 const p = require('util').promisify
 const { isEmptyRange, estimateMsgCount } = require('./range')
+const { parseGoal } = require('./goal')
+
+/**
+ * @typedef {import('./range').Range} Range
+ */
+
+/**
+ * @typedef {import('./goal').Goal} Goal
+ */
 
 function countIter(iter) {
   let count = 0
@@ -30,20 +39,70 @@ class Algorithm {
     return [0, maxDepth]
   }
 
-  wantRange(rootMsgId, localHaveRange, remoteHaveRange) {
-    if (isEmptyRange(remoteHaveRange)) return [1, 0]
-    const [minLocalHave, maxLocalHave] = localHaveRange
-    const [minRemoteHave, maxRemoteHave] = remoteHaveRange
-    if (minRemoteHave !== 0) throw new Error('minRemoteHave must be 0')
-    return [0, Math.max(maxLocalHave, maxRemoteHave)]
+  /**
+   * @param {string} rootMsgHash
+   * @param {Range} localHaveRange
+   * @param {Range} remoteHaveRange
+   * @returns {Range}
+   */
+  #wantAllRange(rootMsgHash, localHaveRange, remoteHaveRange) {
+    return remoteHaveRange
   }
 
-  bloomFor(feedId, round, range, extraIds = []) {
+  /**
+   * @param {string} rootMsgHash
+   * @param {Range} localHaveRange
+   * @param {Range} remoteHaveRange
+   * @param {number} count
+   * @returns {Range}
+   */
+  #wantNewestRange(rootMsgHash, localHaveRange, remoteHaveRange, count) {
+    const [minLocalHave, maxLocalHave] = localHaveRange
+    const [minRemoteHave, maxRemoteHave] = remoteHaveRange
+    if (maxRemoteHave <= maxLocalHave) return [1, 0]
+    const maxWant = maxRemoteHave
+    const size = Math.max(maxWant - maxLocalHave, count)
+    const minWant = Math.max(maxWant - size, maxLocalHave + 1, minRemoteHave)
+    return [minWant, maxWant]
+  }
+
+  /**
+   * @param {string} rootMsgHash
+   * @param {Range} localHaveRange
+   * @param {Range} remoteHaveRange
+   * @param {number} count
+   * @returns {Range}
+   */
+  #wantOldestRange(rootMsgHash, localHaveRange, remoteHaveRange, count) {
+    throw new Error('Not implemented: "oldest" goals') // FIXME: implement
+  }
+
+  /**
+   * @param {string} rootMsgHash // FIXME: delete YAGNI
+   * @param {Range} localHave
+   * @param {Range} remoteHave
+   * @param {Goal?} goal
+   * @returns {Range}
+   */
+  wantRange(rootMsgHash, localHave, remoteHave, goal) {
+    if (!goal) return [1, 0]
+    if (isEmptyRange(remoteHave)) return [1, 0]
+    const { type, count } = parseGoal(goal)
+    if (type === 'all') {
+      return this.#wantAllRange(rootMsgHash, localHave, remoteHave)
+    } else if (type === 'newest') {
+      return this.#wantNewestRange(rootMsgHash, localHave, remoteHave, count)
+    } else if (type === 'oldest') {
+      return this.#wantOldestRange(rootMsgHash, localHave, remoteHave, count)
+    }
+  }
+
+  bloomFor(rootMsgHash, round, range, extraIds = []) {
     const filterSize =
       (isEmptyRange(range) ? 2 : estimateMsgCount(range)) + countIter(extraIds)
     const filter = BloomFilter.create(2 * filterSize, 0.00001)
     if (!isEmptyRange(range)) {
-      for (const msg of this.yieldMsgsIn(feedId, range)) {
+      for (const msg of this.yieldMsgsIn(rootMsgHash, range)) {
         filter.add('' + round + FeedV1.getMsgHash(msg))
       }
     }
diff --git a/lib/goal.js b/lib/goal.js
new file mode 100644
index 0000000..b4a89b3
--- /dev/null
+++ b/lib/goal.js
@@ -0,0 +1,53 @@
+/**
+ * @typedef {'all'} GoalAll
+ */
+
+/**
+ * @typedef {`newest-${number}`} GoalNewest
+ */
+
+/**
+ * @typedef {`oldest-${number}`} GoalOldest
+ */
+
+/**
+ * @typedef {GoalAll|GoalNewest|GoalOldest} Goal
+ */
+
+/**
+ * @typedef {{type: 'all'; count?: never}} ParsedAll
+ */
+
+/**
+ * @typedef {{type: 'newest' |'oldest'; count: number}} ParsedLimited
+ */
+
+/**
+ * @typedef {ParsedAll | ParsedLimited} ParsedGoal
+ */
+
+/**
+ * @param {Goal} goal
+ * @returns {ParsedGoal}
+ */
+function parseGoal(goal) {
+  if (goal === 'all') {
+    return { type: 'all' }
+  }
+
+  const matchN = goal.match(/^newest-(\d+)$/)
+  if (matchN) {
+    return { type: 'newest', count: Number(matchN[1]) }
+  }
+
+  const matchO = goal.match(/^oldest-(\d+)$/)
+  if (matchO) {
+    return { type: 'oldest', count: Number(matchO[1]) }
+  }
+
+  throw new Error(`Invalid goal: ${goal}`)
+}
+
+module.exports = {
+  parseGoal,
+}
diff --git a/lib/plugin.js b/lib/plugin.js
index 8bad376..325402c 100644
--- a/lib/plugin.js
+++ b/lib/plugin.js
@@ -5,6 +5,10 @@ const getSeverity = require('ssb-network-errors')
 const Algorithm = require('./algorithm')
 const SyncStream = require('./stream')
 
+/**
+ * @typedef {import('./goal').Goal} Goal
+ */
+
 function isMuxrpcMissingError(err, namespace, methodName) {
   const jsErrorMessage = `method:${namespace},${methodName} is not in list of allowed methods`
   const goErrorMessage = `muxrpc: no such command: ${namespace}.${methodName}`
@@ -15,7 +19,8 @@ module.exports = {
   name: 'tangleSync',
   manifest: {
     connect: 'duplex',
-    request: 'sync',
+    setGoal: 'sync',
+    initiate: 'sync',
   },
   permissions: {
     anonymous: {
@@ -24,13 +29,14 @@ module.exports = {
   },
   init(peer, config) {
     const debug = makeDebug(`ppppp:tangleSync`)
+    const goals = new Map()
     const algo = new Algorithm(peer)
 
     const streams = []
     function createStream(remoteId, iamClient) {
       // prettier-ignore
       debug('Opening a stream with remote %s %s', iamClient ? 'server' : 'client', remoteId)
-      const stream = new SyncStream(peer.id, debug, algo)
+      const stream = new SyncStream(peer.id, debug, goals, algo)
       streams.push(stream)
       return stream
     }
@@ -60,14 +66,24 @@ module.exports = {
       return toPull.duplex(createStream(this.id, false))
     }
 
-    function request(id) {
+    /**
+     * @param {string} id
+     * @param {Goal} goal
+     */
+    function setGoal(id, goal = 'all') {
+      goals.set(id, goal)
+    }
+
+    function initiate() {
       for (const stream of streams) {
-        stream.request(id)
+        stream.initiate()
       }
     }
+
     return {
       connect,
-      request,
+      setGoal,
+      initiate,
     }
   },
 }
diff --git a/lib/range.js b/lib/range.js
index 8031f66..25c5e85 100644
--- a/lib/range.js
+++ b/lib/range.js
@@ -1,8 +1,20 @@
+/**
+ * @typedef {[number, number]} Range
+ */
+
+/**
+ * @param {Range} range
+ * @returns {boolean}
+ */
 function isEmptyRange(range) {
   const [min, max] = range
   return min > max
 }
 
+/**
+ * @param {Range} range
+ * @returns {number}
+ */
 function estimateMsgCount(range) {
   const [minDepth, maxDepth] = range
   const estimate = 2 * (maxDepth - minDepth + 1)
diff --git a/lib/stream.js b/lib/stream.js
index e82be8c..bb64f61 100644
--- a/lib/stream.js
+++ b/lib/stream.js
@@ -1,34 +1,81 @@
 const Pipeable = require('push-stream/pipeable')
-const {isEmptyRange} = require('./range')
+const { isEmptyRange } = require('./range')
+
+/**
+ * @typedef {import('./goal').Goal} Goal
+ */
 
 class SyncStream extends Pipeable {
   #myId
   #debug
   #algo
+
+  /** Set of tangleId
+   * @type {Set} */
   #requested
+
+  /** tangleId => goal
+   * @type {Map} */
+  #goals
+
+  /**
+   * tangleId => have-range by local peer
+   * @type {Map}
+   */
+  #localHave
+
+  /**
+   * tangleId => want-range by local peer
+   * @type {Map}
+   */
+  #localWant
+
+  /**
+   * tangleId => have-range by remote peer
+   * @type {Map}
+   */
   #remoteHave
+
+  /**
+   * tangleId => want-range by remote peer
+   * @type {Map}
+   */
   #remoteWant
+
+  /**
+   * tangleId => Set of msgIDs
+   * @type {Map<string, Set<string>>}
+   */
   #receivableMsgs
+
+  /**
+   * tangleId => Set of msgIDs
+   * @type {Map<string, Set<string>>}
+   */
   #sendableMsgs
 
-  constructor(localId, debug, algo) {
+  constructor(localId, debug, goals, algo) {
     super()
     this.paused = false // TODO: should we start as paused=true?
     this.ended = false
     this.source = this.sink = null
     this.#myId = localId.slice(0, 6)
     this.#debug = debug
+    this.#goals = goals
     this.#algo = algo
     this.#requested = new Set()
-    this.#remoteHave = new Map() // id => have-range by remote peer
-    this.#remoteWant = new Map() // id => want-range by remote peer
-    this.#receivableMsgs = new Map() // id => Set
-    this.#sendableMsgs = new Map() // id => Set
+    this.#localHave = new Map()
+    this.#localWant = new Map()
+    this.#remoteHave = new Map()
+    this.#remoteWant = new Map()
+    this.#receivableMsgs = new Map()
+    this.#sendableMsgs = new Map()
   }
 
-  // public API
-  request(id) {
-    this.#requested.add(id)
+  initiate() {
+    for (const id of this.#goals.keys()) {
+      this.#requested.add(id)
+    }
     this.resume()
   }
 
@@ -54,6 +101,7 @@ class SyncStream extends Pipeable {
 
   #sendLocalHave(id) {
     const localHaveRange = this.#algo.haveRange(id)
+    this.#localHave.set(id, localHaveRange)
     // prettier-ignore
     this.#debug('%s Stream OUT: send local have-range %o for %s', this.#myId, localHaveRange, id)
     this.sink.write({ id, phase: 1, payload: localHaveRange })
@@ -63,8 +111,11 @@ class SyncStream extends Pipeable {
     // prettier-ignore
     this.#debug('%s Stream IN: received remote have-range %o for %s', this.#myId, remoteHaveRange, id)
     this.#remoteHave.set(id, remoteHaveRange)
+    const goal = this.#goals.get(id)
     const haveRange = this.#algo.haveRange(id)
-    const wantRange = this.#algo.wantRange(id, haveRange, remoteHaveRange)
+    const wantRange = this.#algo.wantRange(id, haveRange, remoteHaveRange, goal)
+    this.#localHave.set(id, haveRange)
+    this.#localWant.set(id, wantRange)
     // prettier-ignore
     this.#debug('%s Stream OUT: send local have-range %o and want-range %o for %s', this.#myId, haveRange, wantRange, id)
     this.sink.write({ id, phase: 2, payload: { haveRange, wantRange } })
@@ -75,9 +126,11 @@ class SyncStream extends Pipeable {
     this.#debug('%s Stream IN: received remote have-range %o and want-range %o for %s', this.#myId, remoteHaveRange, remoteWantRange, id)
     this.#remoteHave.set(id, remoteHaveRange)
     this.#remoteWant.set(id, remoteWantRange)
-    const haveRange = this.#algo.haveRange(id)
-    const wantRange = this.#algo.wantRange(id, haveRange, remoteHaveRange)
-    const localBloom0 = this.#algo.bloomFor(id, 0, remoteWantRange)
+    const goal = this.#goals.get(id)
+    const haveRange = this.#localHave.get(id)
+    const wantRange = this.#algo.wantRange(id, haveRange, remoteHaveRange, goal)
+    this.#localWant.set(id, wantRange)
+    const localBloom0 = this.#algo.bloomFor(id, 0, wantRange)
     this.sink.write({
       id,
       phase: 3,
@@ -95,10 +148,11 @@ class SyncStream extends Pipeable {
       id,
       0,
       remoteWantRange,
-      remoteBloom
+      remoteBloom // representation of everything they have for me
     )
     this.#updateSendableMsgs(id, msgIDsForThem)
-    const localBloom = this.#algo.bloomFor(id, 0, remoteWantRange)
+    const localWantRange = this.#localWant.get(id)
+    const localBloom = this.#algo.bloomFor(id, 0, localWantRange)
     this.sink.write({
       id,
       phase: 4,
@@ -120,12 +174,9 @@ class SyncStream extends Pipeable {
       remoteBloom
     )
     this.#updateSendableMsgs(id, msgIDsForThem)
-    const localBloom = this.#algo.bloomFor(
-      id,
-      round,
-      remoteWantRange,
-      this.#receivableMsgs.get(id)
-    )
+    const extras = this.#receivableMsgs.get(id)
+    const localWantRange = this.#localWant.get(id)
+    const localBloom = this.#algo.bloomFor(id, round, localWantRange, extras)
     this.sink.write({
       id,
       phase,
@@ -147,12 +198,9 @@ class SyncStream extends Pipeable {
       remoteBloom
     )
     this.#updateSendableMsgs(id, msgIDsForThem)
-    const localBloom = this.#algo.bloomFor(
-      id,
-      round,
-      remoteWantRange,
-      this.#receivableMsgs.get(id)
-    )
+    const extras = this.#receivableMsgs.get(id)
+    const localWantRange = this.#localWant.get(id)
+    const localBloom = this.#algo.bloomFor(id, round, localWantRange, extras)
     this.sink.write({
       id,
       phase,
@@ -205,6 +253,8 @@ class SyncStream extends Pipeable {
     this.sink.write({ id, phase: 10, payload: msgs })
 
     this.#requested.delete(id)
+    this.#localHave.delete(id)
+    this.#localWant.delete(id)
     this.#remoteHave.delete(id)
     this.#remoteWant.delete(id)
     this.#receivableMsgs.delete(id)
@@ -221,6 +271,8 @@ class SyncStream extends Pipeable {
     this.#debug('%s Stream IN: received %s msgs in %s', this.#myId, msgsForMe.length, id)
 
     this.#requested.delete(id)
+    this.#localHave.delete(id)
+    this.#localWant.delete(id)
     this.#remoteHave.delete(id)
     this.#remoteWant.delete(id)
     this.#receivableMsgs.delete(id)
diff --git a/test/feed-sync.test.js b/test/feed-sync.test.js
index 2762347..a5f3c14 100644
--- a/test/feed-sync.test.js
+++ b/test/feed-sync.test.js
@@ -67,10 +67,13 @@ test('sync a normal feed', async (t) => {
     )
   }
 
+  bob.tangleSync.setGoal(carolRootHash, 'all')
+  alice.tangleSync.setGoal(carolRootHash, 'all')
+
   const remoteAlice = await p(bob.connect)(alice.getAddress())
   t.pass('bob connected to alice')
 
-  bob.tangleSync.request(carolRootHash)
+  bob.tangleSync.initiate()
   await p(setTimeout)(1000)
   t.pass('tangleSync!')
 
diff --git a/test/thread-sync.test.js b/test/thread-sync.test.js
index 77187d0..dbcfde6 100644
--- a/test/thread-sync.test.js
+++ b/test/thread-sync.test.js
@@ -150,10 +150,13 @@ test('sync a thread where both peers have portions', async (t) => {
     'bob has another portion of the thread'
   )
 
+  bob.tangleSync.setGoal(startA.hash, 'all')
+  alice.tangleSync.setGoal(startA.hash, 'all')
+
   const remoteAlice = await p(bob.connect)(alice.getAddress())
   t.pass('bob connected to alice')
 
-  bob.tangleSync.request(startA.hash)
+  bob.tangleSync.initiate()
   await p(setTimeout)(1000)
   t.pass('tangleSync!')
 