From 08d9fd1eda253533a5f9551bd59ee75075b2e929 Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Mon, 10 Apr 2023 22:53:36 +0300 Subject: [PATCH] make new algorithm.js --- index.js | 2 +- lib/algorithm.js | 107 ++++++++++++++++++++++++++++++++++++++ lib/plugin.js | 114 ++++++++++++++++++----------------------- lib/range.js | 17 ++++++ lib/stream.js | 5 +- test/feed-sync.test.js | 2 +- 6 files changed, 180 insertions(+), 67 deletions(-) create mode 100644 lib/algorithm.js create mode 100644 lib/range.js diff --git a/index.js b/index.js index fb8b856..a736313 100644 --- a/index.js +++ b/index.js @@ -1 +1 @@ -module.exports = [require('./lib/feed-sync'), require('./lib/thread-sync')] \ No newline at end of file +module.exports = [require('./lib/plugin')] \ No newline at end of file diff --git a/lib/algorithm.js b/lib/algorithm.js new file mode 100644 index 0000000..55394e3 --- /dev/null +++ b/lib/algorithm.js @@ -0,0 +1,107 @@ +const { BloomFilter } = require('bloom-filters') +const FeedV1 = require('ppppp-db/lib/feed-v1') +const p = require('util').promisify +const { isEmptyRange, estimateMsgCount } = require('./range') + +function countIter(iter) { + let count = 0 + for (const _ of iter) count++ + return count +} + +class Algorithm { + #peer + + constructor(peer) { + this.#peer = peer + } + + haveRange(rootMsgHash) { + const rootMsg = this.#peer.db.get(rootMsgHash) + if (!rootMsg) return [1, 0] + let maxDepth = 0 + for (const rec of this.#peer.db.records()) { + const tangles = rec.msg.metadata.tangles + if (rec.hash !== rootMsgHash && tangles[rootMsgHash]) { + const depth = tangles[rootMsgHash].depth + maxDepth = Math.max(maxDepth, depth) + } + } + return [0, maxDepth] + } + + wantRange(rootMsgId, localHaveRange, remoteHaveRange) { + if (isEmptyRange(remoteHaveRange)) return [1, 0] + const [minLocalHave, maxLocalHave] = localHaveRange + const [minRemoteHave, maxRemoteHave] = remoteHaveRange + if (minRemoteHave !== 0) throw new Error('minRemoteHave must be 0') + return [0, Math.max(maxLocalHave, maxRemoteHave)] + } + + bloomFor(feedId, round, range, extraIds = []) { + const filterSize = + (isEmptyRange(range) ? 2 : estimateMsgCount(range)) + countIter(extraIds) + const filter = BloomFilter.create(2 * filterSize, 0.00001) + if (!isEmptyRange(range)) { + for (const msg of this.yieldMsgsIn(feedId, range)) { + filter.add('' + round + FeedV1.getMsgHash(msg)) + } + } + for (const msgId of extraIds) { + filter.add('' + round + msgId) + } + return filter.saveAsJSON() + } + + msgsMissing(rootMsgHash, round, range, remoteBloomJSON) { + if (isEmptyRange(range)) return [] + const remoteFilter = BloomFilter.fromJSON(remoteBloomJSON) + const missing = [] + for (const msg of this.yieldMsgsIn(rootMsgHash, range)) { + const msgHash = FeedV1.getMsgHash(msg) + if (!remoteFilter.has('' + round + msgHash)) { + missing.push(msgHash) + } + } + return missing + } + + *yieldMsgsIn(rootMsgHash, range) { + const [minDepth, maxDepth] = range + const rootMsg = this.#peer.db.get(rootMsgHash) + if (!rootMsg) return + for (const msg of this.#peer.db.msgs()) { + const tangles = msg.metadata.tangles + if ( + tangles[rootMsgHash] && + tangles[rootMsgHash].depth >= minDepth && + tangles[rootMsgHash].depth <= maxDepth + ) { + yield msg + } + } + } + + async commit(newMsgs, rootMsgHash, cb) { + newMsgs.sort((a, b) => { + const aDepth = a.metadata.tangles[rootMsgHash].depth + const bDepth = b.metadata.tangles[rootMsgHash].depth + return aDepth - bDepth + }) + for (const msg of newMsgs) { + await p(this.#peer.db.add)(msg, rootMsgHash) + } + cb() + } + + getMsgs(msgIds) { + const msgs = [] + for (const msgId of msgIds) { + const msg = this.#peer.db.get(msgId) + if (msg) msgs.push(msg) + } + return msgs + } +} + +module.exports = Algorithm diff --git a/lib/plugin.js b/lib/plugin.js index 436aa82..43aa1a2 100644 --- a/lib/plugin.js +++ b/lib/plugin.js @@ -2,7 +2,7 @@ const toPull = require('push-stream-to-pull-stream') const pull = require('pull-stream') const makeDebug = require('debug') const getSeverity = require('ssb-network-errors') -const syncAlgorithm = require('./old-algorithm') +const Algorithm = require('./algorithm') const SyncStream = require('./stream') function isMuxrpcMissingError(err, namespace, methodName) { @@ -11,75 +11,63 @@ function isMuxrpcMissingError(err, namespace, methodName) { return err.message === jsErrorMessage || err.message === goErrorMessage } -module.exports = function makeSyncPlugin(name, getOpts) { - return { - name: name, - manifest: { - connect: 'duplex', - request: 'sync', +module.exports = { + name: 'tangleSync', + manifest: { + connect: 'duplex', + request: 'sync', + }, + permissions: { + anonymous: { + allow: ['connect'], }, - permissions: { - anonymous: { - allow: ['connect'], - }, - }, - init(peer, config) { - const debug = makeDebug(`ppppp:${name}`) - const opts = getOpts(peer, config) - const algo = syncAlgorithm(opts) + }, + init(peer, config) { + const debug = makeDebug(`ppppp:tangleSync`) + const algo = new Algorithm(peer) - algo.getMsgs = function getMsgs(msgIds) { - const msgs = [] - for (const msgId of msgIds) { - const msg = peer.db.get(msgId) - if (msg) msgs.push(msg) - } - return msgs - } + const streams = [] + function createStream(remoteId, isClient) { + // prettier-ignore + debug('Opening a stream with remote %s %s', isClient ? 'server' : 'client', remoteId) + const stream = new SyncStream(peer.id, debug, algo) + streams.push(stream) + return stream + } - const streams = [] - function createStream(remoteId, isClient) { - // prettier-ignore - debug('Opening a stream with remote %s %s', isClient ? 'server' : 'client', remoteId) - const stream = new SyncStream(peer.id, debug, algo) - streams.push(stream) - return stream - } + peer.on('rpc:connect', function onSyncRPCConnect(rpc, isClient) { + if (rpc.id === peer.id) return // local client connecting to local server + if (!isClient) return + const local = toPull.duplex(createStream(rpc.id, true)) - peer.on('rpc:connect', function onSyncRPCConnect(rpc, isClient) { - if (rpc.id === peer.id) return // local client connecting to local server - if (!isClient) return - const local = toPull.duplex(createStream(rpc.id, true)) - - const remote = rpc[name].connect((networkError) => { - if (networkError && getSeverity(networkError) >= 3) { - if (isMuxrpcMissingError(networkError, name, 'connect')) { - console.warn(`peer ${rpc.id} does not support sync connect`) - // } else if (isReconnectedError(networkError)) { // TODO: bring back - // Do nothing, this is a harmless error - } else { - console.error(`rpc.${name}.connect exception:`, networkError) - } + const remote = rpc.tangleSync.connect((networkError) => { + if (networkError && getSeverity(networkError) >= 3) { + if (isMuxrpcMissingError(networkError, 'tangleSync', 'connect')) { + console.warn(`peer ${rpc.id} does not support sync connect`) + // } else if (isReconnectedError(networkError)) { // TODO: bring back + // Do nothing, this is a harmless error + } else { + console.error(`rpc.tangleSync.connect exception:`, networkError) } - }) - - pull(local, remote, local) + } }) - function connect() { - // `this` refers to the remote peer who called this muxrpc API - return toPull.duplex(createStream(this.id, false)) - } + pull(local, remote, local) + }) - function request(id) { - for (const stream of streams) { - stream.request(id) - } + function connect() { + // `this` refers to the remote peer who called this muxrpc API + return toPull.duplex(createStream(this.id, false)) + } + + function request(id) { + for (const stream of streams) { + stream.request(id) } - return { - connect, - request, - } - }, - } + } + return { + connect, + request, + } + }, } diff --git a/lib/range.js b/lib/range.js new file mode 100644 index 0000000..8031f66 --- /dev/null +++ b/lib/range.js @@ -0,0 +1,17 @@ +function isEmptyRange(range) { + const [min, max] = range + return min > max +} + +function estimateMsgCount(range) { + const [minDepth, maxDepth] = range + const estimate = 2 * (maxDepth - minDepth + 1) + if (estimate > 1000) return 1000 + else if (estimate < 5) return 5 + else return estimate +} + +module.exports = { + isEmptyRange, + estimateMsgCount +} diff --git a/lib/stream.js b/lib/stream.js index 69a6ebc..6704907 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -1,4 +1,5 @@ const Pipeable = require('push-stream/pipeable') +const {isEmptyRange} = require('./range') class SyncStream extends Pipeable { #myId @@ -261,7 +262,7 @@ class SyncStream extends Pipeable { } case 2: { const { haveRange, wantRange } = payload - if (this.#algo.isEmptyRange(haveRange)) { + if (isEmptyRange(haveRange)) { // prettier-ignore this.#debug('%s Stream IN: received remote have-range %o and want-range %o for %s', this.#myId, haveRange, wantRange, id) return this.#sendMsgsInRemoteWant(id, wantRange) @@ -272,7 +273,7 @@ class SyncStream extends Pipeable { case 3: { const { wantRange, bloom } = payload const haveRange = this.#remoteHave.get(id) - if (haveRange && this.#algo.isEmptyRange(haveRange)) { + if (haveRange && isEmptyRange(haveRange)) { // prettier-ignore this.#debug('%s Stream IN: received remote want-range want-range %o and remember empty have-range %o for %s', this.#myId, wantRange, haveRange, id) return this.#sendMsgsInRemoteWant(id, wantRange) diff --git a/test/feed-sync.test.js b/test/feed-sync.test.js index c8cafb9..1ae1da0 100644 --- a/test/feed-sync.test.js +++ b/test/feed-sync.test.js @@ -76,7 +76,7 @@ test('sync a normal feed', async (t) => { const remoteAlice = await p(bob.connect)(alice.getAddress()) t.pass('bob connected to alice') - bob.threadSync.request(carolRootHash) + bob.tangleSync.request(carolRootHash) await p(setTimeout)(1000) t.pass('tangleSync!')