diff --git a/lib/algorithm.js b/lib/algorithm.js index ee4fb96..725f084 100644 --- a/lib/algorithm.js +++ b/lib/algorithm.js @@ -28,15 +28,20 @@ class Algorithm { haveRange(rootMsgHash) { const rootMsg = this.#peer.db.get(rootMsgHash) if (!rootMsg) return [1, 0] + let minDepth = Number.MAX_SAFE_INTEGER let maxDepth = 0 for (const rec of this.#peer.db.records()) { const tangles = rec.msg.metadata.tangles - if (rec.hash !== rootMsgHash && tangles[rootMsgHash]) { + if (!rec.msg.content) continue + if (rec.hash === rootMsgHash) { + minDepth = 0 + } else if (tangles[rootMsgHash]) { const depth = tangles[rootMsgHash].depth + minDepth = Math.min(minDepth, depth) maxDepth = Math.max(maxDepth, depth) } } - return [0, maxDepth] + return [minDepth, maxDepth] } /** diff --git a/lib/old-algorithm.js b/lib/old-algorithm.js deleted file mode 100644 index f54e352..0000000 --- a/lib/old-algorithm.js +++ /dev/null @@ -1,81 +0,0 @@ -const { BloomFilter } = require('bloom-filters') -const FeedV1 = require('ppppp-db/lib/feed-v1') - -module.exports = function syncAlgorithm(opts = {}) { - const { - haveRange, - wantRange, - estimateMsgCount, - yieldMsgsIn, - commit, - } = opts - if (typeof haveRange !== 'function') { - throw new Error('function haveRange is required') - } - if (typeof wantRange !== 'function') { - throw new Error('function wantRange is required') - } - if (typeof estimateMsgCount !== 'function') { - throw new Error('function estimateMsgCount is required') - } - if (typeof yieldMsgsIn !== 'function') { - throw new Error('function yieldMsgsIn is required') - } - if (typeof commit !== 'function') { - throw new Error('function commit is required') - } - - function isEmptyRange(range) { - const [min, max] = range - return min > max - } - - function countIter(iter) { - let count = 0 - for (const _ of iter) count++ - return count - } - - function betterWantRange(feedId, localHaveRange, remoteHaveRange) { - if (isEmptyRange(remoteHaveRange)) return [1, 0] - else return wantRange(feedId, localHaveRange, remoteHaveRange) - } - - function bloomFor(feedId, round, range, extraIds = []) { - const filterSize = - (isEmptyRange(range) ? 2 : estimateMsgCount(range)) + countIter(extraIds) - const filter = BloomFilter.create(2 * filterSize, 0.00001) - if (!isEmptyRange(range)) { - for (const msg of yieldMsgsIn(feedId, range)) { - filter.add('' + round + FeedV1.getMsgHash(msg)) - } - } - for (const msgId of extraIds) { - filter.add('' + round + msgId) - } - return filter.saveAsJSON() - } - - function msgsMissing(feedId, round, range, remoteBloomJSON) { - if (isEmptyRange(range)) return [] - const remoteFilter = BloomFilter.fromJSON(remoteBloomJSON) - const missing = [] - for (const msg of yieldMsgsIn(feedId, range)) { - const msgHash = FeedV1.getMsgHash(msg) - if (!remoteFilter.has('' + round + msgHash)) { - missing.push(msgHash) - } - } - return missing - } - - return { - haveRange, - wantRange: betterWantRange, - isEmptyRange, - bloomFor, - msgsMissing, - yieldMsgsIn, - commit, - } -} diff --git a/lib/old-feed-sync.js b/lib/old-feed-sync.js deleted file mode 100644 index 7eb999f..0000000 --- a/lib/old-feed-sync.js +++ /dev/null @@ -1,111 +0,0 @@ -const p = require('util').promisify -const FeedV1 = require('ppppp-db/lib/feed-v1') -const syncPlugin = require('./plugin') - -module.exports = syncPlugin('feedSync', (peer, config) => { - const limit = config.feedSync?.limit ?? 1000 - - function* take(n, iter) { - if (n === 0) return - let i = 0 - for (const item of iter) { - yield item - if (++i >= n) break - } - } - - function* filter(iter, fn) { - for (const item of iter) { - if (fn(item)) yield item - } - } - - return { - haveRange(feedId) { - let minDepth = Number.MAX_SAFE_INTEGER - let maxDepth = 0 - for (const msg of peer.db.msgs()) { - if (FeedV1.getFeedId(msg) === feedId) { - minDepth = Math.min(minDepth, msg.metadata.depth) - maxDepth = Math.max(maxDepth, msg.metadata.depth) - } - } - return [minDepth, maxDepth] - }, - - wantRange(feedId, localHaveRange, remoteHaveRange) { - const [minLocalHave, maxLocalHave] = localHaveRange - const [minRemoteHave, maxRemoteHave] = remoteHaveRange - if (maxRemoteHave <= maxLocalHave) return [1, 0] - const maxWant = maxRemoteHave - const size = Math.max(maxWant - maxLocalHave, limit) - const minWant = Math.max(maxWant - size, maxLocalHave + 1, minRemoteHave) - return [minWant, maxWant] - }, - - estimateMsgCount(range) { - const [minDepth, maxDepth] = range - const estimate = 2 * (maxDepth - minDepth + 1) - if (estimate > 1000) return 1000 - else if (estimate < 5) return 5 - else return estimate - }, - - *yieldMsgsIn(feedId, range) { - const [minDepth, maxDepth] = range - for (const msg of peer.db.msgs()) { - if ( - FeedV1.getFeedId(msg) === feedId && - msg.metadata.depth >= minDepth && - msg.metadata.depth <= maxDepth - ) { - yield msg - } - } - }, - - async commit(newMsgs, feedId, cb) { - newMsgs.sort((a, b) => a.metadata.depth - b.metadata.depth) // mutation - const isRelevantRec = (rec) => FeedV1.getFeedId(rec.msg) === feedId - - // Find max sequence in the database - let oldLastDepth = 0 - let oldCount = 0 - for (const rec of peer.db.records()) { - if (!isRelevantRec(rec)) continue - oldCount += 1 - oldLastDepth = Math.max(oldLastDepth, rec.msg.metadata.depth) - } - - const isContinuation = newMsgs[0].metadata.depth === oldLastDepth + 1 - // Refuse creating holes in the feed - if (!isContinuation && newMsgs.length < limit) { - console.error( - `feedSync failed to persist msgs for ${feedId} because ` + - 'they are not a continuation, and not enough messages' - ) - return cb() - } - - // Delete old messages in the database - if (isContinuation) { - // Delete just enough msgs to make room for the new ones - const N = Math.max(0, oldCount + newMsgs.length - limit) - for (const rec of take(N, filter(peer.db.records(), isRelevantRec))) { - await p(peer.db.del)(rec.hash) - } - } else { - // Delete all the old ones - for (const rec of filter(peer.db.records(), isRelevantRec)) { - await p(peer.db.del)(rec.hash) - } - } - - // Add new messages - for (const msg of newMsgs) { - await p(peer.db.add)(msg) - } - cb() - }, - } -}) diff --git a/lib/old-thread-sync.js b/lib/old-thread-sync.js deleted file mode 100644 index d706037..0000000 --- a/lib/old-thread-sync.js +++ /dev/null @@ -1,61 +0,0 @@ -const p = require('util').promisify -const dagSyncPlugin = require('./plugin') - -module.exports = dagSyncPlugin('threadSync', (peer, config) => ({ - haveRange(rootMsgHash) { - const rootMsg = peer.db.get(rootMsgHash) - if (!rootMsg) return [1, 0] - let maxDepth = 0 - for (const rec of peer.db.records()) { - const tangles = rec.msg.metadata.tangles - if (rec.hash !== rootMsgHash && tangles?.[rootMsgHash]) { - const depth = tangles[rootMsgHash].depth - maxDepth = Math.max(maxDepth, depth) - } - } - return [0, maxDepth] - }, - - wantRange(rootMsgId, localHaveRange, remoteHaveRange) { - const [minLocalHave, maxLocalHave] = localHaveRange - const [minRemoteHave, maxRemoteHave] = remoteHaveRange - if (minRemoteHave !== 0) throw new Error('minRemoteHave must be 0') - return [0, Math.max(maxLocalHave, maxRemoteHave)] - }, - - estimateMsgCount(range) { - const [minDepth, maxDepth] = range - const estimate = 2 * (maxDepth - minDepth + 1) - if (estimate > 1000) return 1000 - else if (estimate < 5) return 5 - else return estimate - }, - - *yieldMsgsIn(rootMsgHash, range) { - const [minDepth, maxDepth] = range - const rootMsg = peer.db.get(rootMsgHash) - if (!rootMsg) return - for (const msg of peer.db.msgs()) { - const tangles = msg.metadata.tangles - if ( - tangles[rootMsgHash] && - tangles[rootMsgHash].depth >= minDepth && - tangles[rootMsgHash].depth <= maxDepth - ) { - yield msg - } - } - }, - - async commit(newMsgs, rootMsgHash, cb) { - newMsgs.sort((a, b) => { - const aDepth = a.metadata.tangles[rootMsgHash].depth - const bDepth = b.metadata.tangles[rootMsgHash].depth - return aDepth - bDepth - }) - for (const msg of newMsgs) { - await p(peer.db.add)(msg, rootMsgHash) - } - cb() - }, -}))