more general haveRange()

This commit is contained in:
Andre Staltz 2023-04-17 17:02:15 +03:00
parent 1a76e665de
commit 277f9dbbee
4 changed files with 7 additions and 255 deletions

View File

@@ -28,15 +28,20 @@ class Algorithm {
  haveRange(rootMsgHash) {
    const rootMsg = this.#peer.db.get(rootMsgHash)
    if (!rootMsg) return [1, 0]
    let minDepth = Number.MAX_SAFE_INTEGER
    let maxDepth = 0
    for (const rec of this.#peer.db.records()) {
      const tangles = rec.msg.metadata.tangles
      if (!rec.msg.content) continue
      if (rec.hash === rootMsgHash) {
        minDepth = 0
      } else if (tangles[rootMsgHash]) {
        const depth = tangles[rootMsgHash].depth
        minDepth = Math.min(minDepth, depth)
        maxDepth = Math.max(maxDepth, depth)
      }
    }
    return [minDepth, maxDepth]
  }
  /**

View File

@ -1,81 +0,0 @@
const { BloomFilter } = require('bloom-filters')
const FeedV1 = require('ppppp-db/lib/feed-v1')
module.exports = function syncAlgorithm(opts = {}) {
const {
haveRange,
wantRange,
estimateMsgCount,
yieldMsgsIn,
commit,
} = opts
if (typeof haveRange !== 'function') {
throw new Error('function haveRange is required')
}
if (typeof wantRange !== 'function') {
throw new Error('function wantRange is required')
}
if (typeof estimateMsgCount !== 'function') {
throw new Error('function estimateMsgCount is required')
}
if (typeof yieldMsgsIn !== 'function') {
throw new Error('function yieldMsgsIn is required')
}
if (typeof commit !== 'function') {
throw new Error('function commit is required')
}
function isEmptyRange(range) {
const [min, max] = range
return min > max
}
function countIter(iter) {
let count = 0
for (const _ of iter) count++
return count
}
function betterWantRange(feedId, localHaveRange, remoteHaveRange) {
if (isEmptyRange(remoteHaveRange)) return [1, 0]
else return wantRange(feedId, localHaveRange, remoteHaveRange)
}
function bloomFor(feedId, round, range, extraIds = []) {
const filterSize =
(isEmptyRange(range) ? 2 : estimateMsgCount(range)) + countIter(extraIds)
const filter = BloomFilter.create(2 * filterSize, 0.00001)
if (!isEmptyRange(range)) {
for (const msg of yieldMsgsIn(feedId, range)) {
filter.add('' + round + FeedV1.getMsgHash(msg))
}
}
for (const msgId of extraIds) {
filter.add('' + round + msgId)
}
return filter.saveAsJSON()
}
function msgsMissing(feedId, round, range, remoteBloomJSON) {
if (isEmptyRange(range)) return []
const remoteFilter = BloomFilter.fromJSON(remoteBloomJSON)
const missing = []
for (const msg of yieldMsgsIn(feedId, range)) {
const msgHash = FeedV1.getMsgHash(msg)
if (!remoteFilter.has('' + round + msgHash)) {
missing.push(msgHash)
}
}
return missing
}
return {
haveRange,
wantRange: betterWantRange,
isEmptyRange,
bloomFor,
msgsMissing,
yieldMsgsIn,
commit,
}
}

View File

@ -1,111 +0,0 @@
const p = require('util').promisify
const FeedV1 = require('ppppp-db/lib/feed-v1')
const syncPlugin = require('./plugin')
module.exports = syncPlugin('feedSync', (peer, config) => {
const limit = config.feedSync?.limit ?? 1000
function* take(n, iter) {
if (n === 0) return
let i = 0
for (const item of iter) {
yield item
if (++i >= n) break
}
}
function* filter(iter, fn) {
for (const item of iter) {
if (fn(item)) yield item
}
}
return {
haveRange(feedId) {
let minDepth = Number.MAX_SAFE_INTEGER
let maxDepth = 0
for (const msg of peer.db.msgs()) {
if (FeedV1.getFeedId(msg) === feedId) {
minDepth = Math.min(minDepth, msg.metadata.depth)
maxDepth = Math.max(maxDepth, msg.metadata.depth)
}
}
return [minDepth, maxDepth]
},
wantRange(feedId, localHaveRange, remoteHaveRange) {
const [minLocalHave, maxLocalHave] = localHaveRange
const [minRemoteHave, maxRemoteHave] = remoteHaveRange
if (maxRemoteHave <= maxLocalHave) return [1, 0]
const maxWant = maxRemoteHave
const size = Math.max(maxWant - maxLocalHave, limit)
const minWant = Math.max(maxWant - size, maxLocalHave + 1, minRemoteHave)
return [minWant, maxWant]
},
estimateMsgCount(range) {
const [minDepth, maxDepth] = range
const estimate = 2 * (maxDepth - minDepth + 1)
if (estimate > 1000) return 1000
else if (estimate < 5) return 5
else return estimate
},
*yieldMsgsIn(feedId, range) {
const [minDepth, maxDepth] = range
for (const msg of peer.db.msgs()) {
if (
FeedV1.getFeedId(msg) === feedId &&
msg.metadata.depth >= minDepth &&
msg.metadata.depth <= maxDepth
) {
yield msg
}
}
},
async commit(newMsgs, feedId, cb) {
newMsgs.sort((a, b) => a.metadata.depth - b.metadata.depth) // mutation
const isRelevantRec = (rec) => FeedV1.getFeedId(rec.msg) === feedId
// Find max sequence in the database
let oldLastDepth = 0
let oldCount = 0
for (const rec of peer.db.records()) {
if (!isRelevantRec(rec)) continue
oldCount += 1
oldLastDepth = Math.max(oldLastDepth, rec.msg.metadata.depth)
}
const isContinuation = newMsgs[0].metadata.depth === oldLastDepth + 1
// Refuse creating holes in the feed
if (!isContinuation && newMsgs.length < limit) {
console.error(
`feedSync failed to persist msgs for ${feedId} because ` +
'they are not a continuation, and not enough messages'
)
return cb()
}
// Delete old messages in the database
if (isContinuation) {
// Delete just enough msgs to make room for the new ones
const N = Math.max(0, oldCount + newMsgs.length - limit)
for (const rec of take(N, filter(peer.db.records(), isRelevantRec))) {
await p(peer.db.del)(rec.hash)
}
} else {
// Delete all the old ones
for (const rec of filter(peer.db.records(), isRelevantRec)) {
await p(peer.db.del)(rec.hash)
}
}
// Add new messages
for (const msg of newMsgs) {
await p(peer.db.add)(msg)
}
cb()
},
}
})

View File

@ -1,61 +0,0 @@
const p = require('util').promisify
const dagSyncPlugin = require('./plugin')
module.exports = dagSyncPlugin('threadSync', (peer, config) => ({
haveRange(rootMsgHash) {
const rootMsg = peer.db.get(rootMsgHash)
if (!rootMsg) return [1, 0]
let maxDepth = 0
for (const rec of peer.db.records()) {
const tangles = rec.msg.metadata.tangles
if (rec.hash !== rootMsgHash && tangles?.[rootMsgHash]) {
const depth = tangles[rootMsgHash].depth
maxDepth = Math.max(maxDepth, depth)
}
}
return [0, maxDepth]
},
wantRange(rootMsgId, localHaveRange, remoteHaveRange) {
const [minLocalHave, maxLocalHave] = localHaveRange
const [minRemoteHave, maxRemoteHave] = remoteHaveRange
if (minRemoteHave !== 0) throw new Error('minRemoteHave must be 0')
return [0, Math.max(maxLocalHave, maxRemoteHave)]
},
estimateMsgCount(range) {
const [minDepth, maxDepth] = range
const estimate = 2 * (maxDepth - minDepth + 1)
if (estimate > 1000) return 1000
else if (estimate < 5) return 5
else return estimate
},
*yieldMsgsIn(rootMsgHash, range) {
const [minDepth, maxDepth] = range
const rootMsg = peer.db.get(rootMsgHash)
if (!rootMsg) return
for (const msg of peer.db.msgs()) {
const tangles = msg.metadata.tangles
if (
tangles[rootMsgHash] &&
tangles[rootMsgHash].depth >= minDepth &&
tangles[rootMsgHash].depth <= maxDepth
) {
yield msg
}
}
},
async commit(newMsgs, rootMsgHash, cb) {
newMsgs.sort((a, b) => {
const aDepth = a.metadata.tangles[rootMsgHash].depth
const bDepth = b.metadata.tangles[rootMsgHash].depth
return aDepth - bDepth
})
for (const msg of newMsgs) {
await p(peer.db.add)(msg, rootMsgHash)
}
cb()
},
}))