From 5b0244709be9974fcedd4a27f997ea4fa4c5650f Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Mon, 10 Apr 2023 22:32:28 +0300 Subject: [PATCH] use thread-sync for feeds too --- lib/{algorithm.js => old-algorithm.js} | 0 lib/plugin.js | 2 +- lib/thread-sync.js | 18 +++++++++--------- test/feed-sync.test.js | 26 ++++++++++++++++++++------ 4 files changed, 30 insertions(+), 16 deletions(-) rename lib/{algorithm.js => old-algorithm.js} (100%) diff --git a/lib/algorithm.js b/lib/old-algorithm.js similarity index 100% rename from lib/algorithm.js rename to lib/old-algorithm.js diff --git a/lib/plugin.js b/lib/plugin.js index 8efbfa7..436aa82 100644 --- a/lib/plugin.js +++ b/lib/plugin.js @@ -2,7 +2,7 @@ const toPull = require('push-stream-to-pull-stream') const pull = require('pull-stream') const makeDebug = require('debug') const getSeverity = require('ssb-network-errors') -const syncAlgorithm = require('./algorithm') +const syncAlgorithm = require('./old-algorithm') const SyncStream = require('./stream') function isMuxrpcMissingError(err, namespace, methodName) { diff --git a/lib/thread-sync.js b/lib/thread-sync.js index 58e2c00..d706037 100644 --- a/lib/thread-sync.js +++ b/lib/thread-sync.js @@ -31,30 +31,30 @@ module.exports = dagSyncPlugin('threadSync', (peer, config) => ({ else return estimate }, - *yieldMsgsIn(rootMsgId, range) { + *yieldMsgsIn(rootMsgHash, range) { const [minDepth, maxDepth] = range - const rootMsg = peer.db.get(rootMsgId) + const rootMsg = peer.db.get(rootMsgHash) if (!rootMsg) return for (const msg of peer.db.msgs()) { const tangles = msg.metadata.tangles if ( - tangles?.[rootMsgId] && - tangles[rootMsgId].depth >= minDepth && - tangles[rootMsgId].depth <= maxDepth + tangles[rootMsgHash] && + tangles[rootMsgHash].depth >= minDepth && + tangles[rootMsgHash].depth <= maxDepth ) { yield msg } } }, - async commit(newMsgs, rootMsgId, cb) { + async commit(newMsgs, rootMsgHash, cb) { newMsgs.sort((a, b) => { - const aDepth = a.metadata.tangles[rootMsgId].depth - const bDepth = b.metadata.tangles[rootMsgId].depth + const aDepth = a.metadata.tangles[rootMsgHash].depth + const bDepth = b.metadata.tangles[rootMsgHash].depth return aDepth - bDepth }) for (const msg of newMsgs) { - await p(peer.db.add)(msg) + await p(peer.db.add)(msg, rootMsgHash) } cb() }, diff --git a/test/feed-sync.test.js b/test/feed-sync.test.js index 9d765ff..c8cafb9 100644 --- a/test/feed-sync.test.js +++ b/test/feed-sync.test.js @@ -4,6 +4,7 @@ const os = require('os') const rimraf = require('rimraf') const SecretStack = require('secret-stack') const caps = require('ssb-caps') +const FeedV1 = require('ppppp-db/lib/feed-v1') const p = require('util').promisify const { generateKeypair } = require('./util') @@ -12,7 +13,7 @@ const createPeer = SecretStack({ appKey: caps.shs }) .use(require('ssb-box')) .use(require('../')) -test('sync a sliced classic feed', async (t) => { +test('sync a normal feed', async (t) => { const ALICE_DIR = path.join(os.tmpdir(), 'dagsync-alice') const BOB_DIR = path.join(os.tmpdir(), 'dagsync-bob') @@ -47,13 +48,23 @@ test('sync a sliced classic feed', async (t) => { } t.pass('alice has msgs 1..10 from carol') + let carolRootMsg = null + for (const msg of alice.db.msgs()) { + if (msg.metadata.who === carolID_b58 && !msg.content) { + carolRootMsg = msg + break + } + } + const carolRootHash = FeedV1.getMsgHash(carolRootMsg) + + await p(bob.db.add)(carolRootMsg, carolRootHash) for (let i = 0; i < 7; i++) { - await p(bob.db.add)(carolMsgs[i]) + await p(bob.db.add)(carolMsgs[i], carolRootHash) } { const arr = [...bob.db.msgs()] - .filter((msg) => msg.metadata.who === carolID_b58) + .filter((msg) => msg.metadata.who === carolID_b58 && msg.content) .map((msg) => msg.content.text) t.deepEquals( arr, @@ -65,13 +76,13 @@ test('sync a sliced classic feed', async (t) => { const remoteAlice = await p(bob.connect)(alice.getAddress()) t.pass('bob connected to alice') - bob.feedSync.request(carolPostFeedId) + bob.threadSync.request(carolRootHash) await p(setTimeout)(1000) - t.pass('feedSync!') + t.pass('tangleSync!') { const arr = [...bob.db.msgs()] - .filter((msg) => msg.metadata.who === carolID_b58) + .filter((msg) => msg.metadata.who === carolID_b58 && msg.content) .map((msg) => msg.content.text) t.deepEquals( arr, @@ -85,6 +96,9 @@ test('sync a sliced classic feed', async (t) => { await p(bob.close)(true) }) +// FIXME: +test.skip('sync a sliced feed', async (t) => {}) + // FIXME: test.skip('delete old msgs and sync latest msgs', async (t) => { const ALICE_DIR = path.join(os.tmpdir(), 'dagsync-alice')