set goals: all, newest, oldest

This commit is contained in:
Andre Staltz 2023-04-12 23:17:02 +03:00
parent f3a8d805e9
commit 7df89f3439
7 changed files with 240 additions and 42 deletions

View File

@ -2,6 +2,15 @@ const { BloomFilter } = require('bloom-filters')
const FeedV1 = require('ppppp-db/lib/feed-v1')
const p = require('util').promisify
const { isEmptyRange, estimateMsgCount } = require('./range')
const { parseGoal } = require('./goal')
/**
* @typedef {import('./range').Range} Range
*/
/**
* @typedef {import('./goal').Goal} Goal
*/
function countIter(iter) {
let count = 0
@ -30,20 +39,70 @@ class Algorithm {
return [0, maxDepth]
}
wantRange(rootMsgId, localHaveRange, remoteHaveRange) {
if (isEmptyRange(remoteHaveRange)) return [1, 0]
const [minLocalHave, maxLocalHave] = localHaveRange
const [minRemoteHave, maxRemoteHave] = remoteHaveRange
if (minRemoteHave !== 0) throw new Error('minRemoteHave must be 0')
return [0, Math.max(maxLocalHave, maxRemoteHave)]
/**
* @param {string} rootMsgHash
* @param {Range} localHaveRange
* @param {Range} remoteHaveRange
* @returns {Range}
*/
#wantAllRange(rootMsgHash, localHaveRange, remoteHaveRange) {
return remoteHaveRange
}
bloomFor(feedId, round, range, extraIds = []) {
/**
* @param {string} rootMsgHash
* @param {Range} localHaveRange
* @param {Range} remoteHaveRange
* @param {number} count
* @returns {Range}
*/
#wantNewestRange(rootMsgHash, localHaveRange, remoteHaveRange, count) {
const [minLocalHave, maxLocalHave] = localHaveRange
const [minRemoteHave, maxRemoteHave] = remoteHaveRange
if (maxRemoteHave <= maxLocalHave) return [1, 0]
const maxWant = maxRemoteHave
const size = Math.max(maxWant - maxLocalHave, count)
const minWant = Math.max(maxWant - size, maxLocalHave + 1, minRemoteHave)
return [minWant, maxWant]
}
/**
* @param {string} rootMsgHash
* @param {Range} localHaveRange
* @param {Range} remoteHaveRange
* @param {number} count
* @returns {Range}
*/
#wantOldestRange(rootMsgHash, localHaveRange, remoteHaveRange, count) {
// FIXME:
}
/**
* @param {string} rootMsgHash // FIXME: delete YAGNI
* @param {Range} localHave
* @param {Range} remoteHave
* @param {Goal?} goal
* @returns {Range}
*/
wantRange(rootMsgHash, localHave, remoteHave, goal) {
if (!goal) return [1, 0]
if (isEmptyRange(remoteHave)) return [1, 0]
const { type, count } = parseGoal(goal)
if (type === 'all') {
return this.#wantAllRange(rootMsgHash, localHave, remoteHave)
} else if (type === 'newest') {
return this.#wantNewestRange(rootMsgHash, localHave, remoteHave, count)
} else if (type === 'oldest') {
return this.#wantOldestRange(rootMsgHash, localHave, remoteHave, count)
}
}
bloomFor(rootMsgHash, round, range, extraIds = []) {
const filterSize =
(isEmptyRange(range) ? 2 : estimateMsgCount(range)) + countIter(extraIds)
const filter = BloomFilter.create(2 * filterSize, 0.00001)
if (!isEmptyRange(range)) {
for (const msg of this.yieldMsgsIn(feedId, range)) {
for (const msg of this.yieldMsgsIn(rootMsgHash, range)) {
filter.add('' + round + FeedV1.getMsgHash(msg))
}
}

53
lib/goal.js Normal file
View File

@ -0,0 +1,53 @@
/**
* @typedef {'all'} GoalAll
*/
/**
* @typedef {`newest-${number}`} GoalNewest
*/
/**
* @typedef {`oldest-${number}`} GoalOldest
*/
/**
* @typedef {GoalAll|GoalNewest|GoalOldest} Goal
*/
/**
* @typedef {{type: 'all'; count: never}} ParsedAll
*/
/**
* @typedef {{type: 'newest' |'oldest'; count: number}} ParsedLimited
*/
/**
* @typedef {ParsedAll | ParsedLimited} ParsedGoal
*/
/**
* @param {Goal} goal
* @returns {ParsedGoal}
*/
function parseGoal(goal) {
if (goal === 'all') {
return { type: 'all' }
}
const matchN = goal.match(/^newest-(\d+)$/)
if (matchN) {
return { type: 'newest', count: Number(matchN[1]) }
}
const matchO = goal.match(/^oldest-(\d+)$/)
if (matchO) {
return { type: 'oldest', count: Number(matchO[1]) }
}
throw new Error(`Invalid goal: ${goal}`)
}
module.exports = {
parseGoal,
}

View File

@ -5,6 +5,10 @@ const getSeverity = require('ssb-network-errors')
const Algorithm = require('./algorithm')
const SyncStream = require('./stream')
/**
* @typedef {import('./goal').Goal} Goal
*/
function isMuxrpcMissingError(err, namespace, methodName) {
const jsErrorMessage = `method:${namespace},${methodName} is not in list of allowed methods`
const goErrorMessage = `muxrpc: no such command: ${namespace}.${methodName}`
@ -15,7 +19,8 @@ module.exports = {
name: 'tangleSync',
manifest: {
connect: 'duplex',
request: 'sync',
setGoal: 'sync',
initiate: 'sync',
},
permissions: {
anonymous: {
@ -24,13 +29,14 @@ module.exports = {
},
init(peer, config) {
const debug = makeDebug(`ppppp:tangleSync`)
const goals = new Map()
const algo = new Algorithm(peer)
const streams = []
function createStream(remoteId, iamClient) {
// prettier-ignore
debug('Opening a stream with remote %s %s', iamClient ? 'server' : 'client', remoteId)
const stream = new SyncStream(peer.id, debug, algo)
const stream = new SyncStream(peer.id, debug, goals, algo)
streams.push(stream)
return stream
}
@ -60,14 +66,24 @@ module.exports = {
return toPull.duplex(createStream(this.id, false))
}
function request(id) {
/**
* @param {string} id
* @param {Goal} goal
*/
function setGoal(id, goal = 'all') {
goals.set(id, goal)
}
function initiate() {
for (const stream of streams) {
stream.request(id)
stream.initiate()
}
}
return {
connect,
request,
setGoal,
initiate,
}
},
}

View File

@ -1,8 +1,20 @@
/**
* @typedef {[number, number]} Range
*/
/**
* @param {Range} range
* @returns {boolean}
*/
function isEmptyRange(range) {
const [min, max] = range
return min > max
}
/**
* @param {Range} range
* @returns {number}
*/
function estimateMsgCount(range) {
const [minDepth, maxDepth] = range
const estimate = 2 * (maxDepth - minDepth + 1)

View File

@ -1,34 +1,81 @@
const Pipeable = require('push-stream/pipeable')
const {isEmptyRange} = require('./range')
const { isEmptyRange } = require('./range')
/**
* @typedef {import('./goal').Goal} Goal
*/
class SyncStream extends Pipeable {
#myId
#debug
#algo
/** Set of tangleId
* @type {Set<string>} */
#requested
/** tangleId => goal
* @type {Map<string, Goal>} */
#goals
/**
* tangleId => have-range by local peer
* @type {Map<string, [number, number]>}
*/
#localHave
/**
* tangleId => want-range by local peer
* @type {Map<string, [number, number]>}
*/
#localWant
/**
* tangleId => have-range by remote peer
* @type {Map<string, [number, number]>}
*/
#remoteHave
/**
* tangleId => want-range by remote peer
* @type {Map<string, [number, number]>}
*/
#remoteWant
/**
* tangleId => Set of msgIDs
* @type {Map<string, Set<string>>}
*/
#receivableMsgs
/**
* tangleId => Set of msgIDs
* @type {Map<string, Set<string>>}
*/
#sendableMsgs
constructor(localId, debug, algo) {
constructor(localId, debug, goals, algo) {
super()
this.paused = false // TODO: should we start as paused=true?
this.ended = false
this.source = this.sink = null
this.#myId = localId.slice(0, 6)
this.#debug = debug
this.#goals = goals
this.#algo = algo
this.#requested = new Set()
this.#remoteHave = new Map() // id => have-range by remote peer
this.#remoteWant = new Map() // id => want-range by remote peer
this.#receivableMsgs = new Map() // id => Set<msgIDs>
this.#sendableMsgs = new Map() // id => Set<msgIDs>
this.#localHave = new Map()
this.#localWant = new Map()
this.#remoteHave = new Map()
this.#remoteWant = new Map()
this.#receivableMsgs = new Map()
this.#sendableMsgs = new Map()
}
// public API
request(id) {
initiate() {
for (const id of this.#goals.keys()) {
this.#requested.add(id)
}
this.resume()
}
@ -54,6 +101,7 @@ class SyncStream extends Pipeable {
#sendLocalHave(id) {
const localHaveRange = this.#algo.haveRange(id)
this.#localHave.set(id, localHaveRange)
// prettier-ignore
this.#debug('%s Stream OUT: send local have-range %o for %s', this.#myId, localHaveRange, id)
this.sink.write({ id, phase: 1, payload: localHaveRange })
@ -63,8 +111,11 @@ class SyncStream extends Pipeable {
// prettier-ignore
this.#debug('%s Stream IN: received remote have-range %o for %s', this.#myId, remoteHaveRange, id)
this.#remoteHave.set(id, remoteHaveRange)
const goal = this.#goals.get(id)
const haveRange = this.#algo.haveRange(id)
const wantRange = this.#algo.wantRange(id, haveRange, remoteHaveRange)
const wantRange = this.#algo.wantRange(id, haveRange, remoteHaveRange, goal)
this.#localHave.set(id, haveRange)
this.#localWant.set(id, wantRange)
// prettier-ignore
this.#debug('%s Stream OUT: send local have-range %o and want-range %o for %s', this.#myId, haveRange, wantRange, id)
this.sink.write({ id, phase: 2, payload: { haveRange, wantRange } })
@ -75,9 +126,11 @@ class SyncStream extends Pipeable {
this.#debug('%s Stream IN: received remote have-range %o and want-range %o for %s', this.#myId, remoteHaveRange, remoteWantRange, id)
this.#remoteHave.set(id, remoteHaveRange)
this.#remoteWant.set(id, remoteWantRange)
const haveRange = this.#algo.haveRange(id)
const wantRange = this.#algo.wantRange(id, haveRange, remoteHaveRange)
const localBloom0 = this.#algo.bloomFor(id, 0, remoteWantRange)
const goal = this.#goals.get(id)
const haveRange = this.#localHave.get(id)
const wantRange = this.#algo.wantRange(id, haveRange, remoteHaveRange, goal)
this.#localWant.set(id, wantRange)
const localBloom0 = this.#algo.bloomFor(id, 0, wantRange)
this.sink.write({
id,
phase: 3,
@ -95,10 +148,11 @@ class SyncStream extends Pipeable {
id,
0,
remoteWantRange,
remoteBloom
remoteBloom // representation of everything they have for me
)
this.#updateSendableMsgs(id, msgIDsForThem)
const localBloom = this.#algo.bloomFor(id, 0, remoteWantRange)
const localWantRange = this.#localWant.get(id)
const localBloom = this.#algo.bloomFor(id, 0, localWantRange)
this.sink.write({
id,
phase: 4,
@ -120,12 +174,9 @@ class SyncStream extends Pipeable {
remoteBloom
)
this.#updateSendableMsgs(id, msgIDsForThem)
const localBloom = this.#algo.bloomFor(
id,
round,
remoteWantRange,
this.#receivableMsgs.get(id)
)
const extras = this.#receivableMsgs.get(id)
const localWantRange = this.#localWant.get(id)
const localBloom = this.#algo.bloomFor(id, round, localWantRange, extras)
this.sink.write({
id,
phase,
@ -147,12 +198,9 @@ class SyncStream extends Pipeable {
remoteBloom
)
this.#updateSendableMsgs(id, msgIDsForThem)
const localBloom = this.#algo.bloomFor(
id,
round,
remoteWantRange,
this.#receivableMsgs.get(id)
)
const extras = this.#receivableMsgs.get(id)
const localWantRange = this.#localWant.get(id)
const localBloom = this.#algo.bloomFor(id, round, localWantRange, extras)
this.sink.write({
id,
phase,
@ -205,6 +253,8 @@ class SyncStream extends Pipeable {
this.sink.write({ id, phase: 10, payload: msgs })
this.#requested.delete(id)
this.#localHave.delete(id)
this.#localWant.delete(id)
this.#remoteHave.delete(id)
this.#remoteWant.delete(id)
this.#receivableMsgs.delete(id)
@ -221,6 +271,8 @@ class SyncStream extends Pipeable {
this.#debug('%s Stream IN: received %s msgs in %s', this.#myId, msgsForMe.length, id)
this.#requested.delete(id)
this.#localHave.delete(id)
this.#localWant.delete(id)
this.#remoteHave.delete(id)
this.#remoteWant.delete(id)
this.#receivableMsgs.delete(id)

View File

@ -67,10 +67,13 @@ test('sync a normal feed', async (t) => {
)
}
bob.tangleSync.setGoal(carolRootHash, 'all')
alice.tangleSync.setGoal(carolRootHash, 'all')
const remoteAlice = await p(bob.connect)(alice.getAddress())
t.pass('bob connected to alice')
bob.tangleSync.request(carolRootHash)
bob.tangleSync.initiate()
await p(setTimeout)(1000)
t.pass('tangleSync!')

View File

@ -150,10 +150,13 @@ test('sync a thread where both peers have portions', async (t) => {
'bob has another portion of the thread'
)
bob.tangleSync.setGoal(startA.hash, 'all')
alice.tangleSync.setGoal(startA.hash, 'all')
const remoteAlice = await p(bob.connect)(alice.getAddress())
t.pass('bob connected to alice')
bob.tangleSync.request(startA.hash)
bob.tangleSync.initiate()
await p(setTimeout)(1000)
t.pass('tangleSync!')