use thread-sync for feeds too

Andre Staltz 2023-04-10 22:32:28 +03:00
parent a473c8fec1
commit 5b0244709b
4 changed files with 30 additions and 16 deletions

View File

@@ -2,7 +2,7 @@ const toPull = require('push-stream-to-pull-stream')
 const pull = require('pull-stream')
 const makeDebug = require('debug')
 const getSeverity = require('ssb-network-errors')
-const syncAlgorithm = require('./algorithm')
+const syncAlgorithm = require('./old-algorithm')
 const SyncStream = require('./stream')
 function isMuxrpcMissingError(err, namespace, methodName) {

View File

@@ -31,30 +31,30 @@ module.exports = dagSyncPlugin('threadSync', (peer, config) => ({
     else return estimate
   },
-  *yieldMsgsIn(rootMsgId, range) {
+  *yieldMsgsIn(rootMsgHash, range) {
     const [minDepth, maxDepth] = range
-    const rootMsg = peer.db.get(rootMsgId)
+    const rootMsg = peer.db.get(rootMsgHash)
     if (!rootMsg) return
     for (const msg of peer.db.msgs()) {
       const tangles = msg.metadata.tangles
       if (
-        tangles?.[rootMsgId] &&
-        tangles[rootMsgId].depth >= minDepth &&
-        tangles[rootMsgId].depth <= maxDepth
+        tangles[rootMsgHash] &&
+        tangles[rootMsgHash].depth >= minDepth &&
+        tangles[rootMsgHash].depth <= maxDepth
       ) {
         yield msg
       }
     }
   },
-  async commit(newMsgs, rootMsgId, cb) {
+  async commit(newMsgs, rootMsgHash, cb) {
     newMsgs.sort((a, b) => {
-      const aDepth = a.metadata.tangles[rootMsgId].depth
-      const bDepth = b.metadata.tangles[rootMsgId].depth
+      const aDepth = a.metadata.tangles[rootMsgHash].depth
+      const bDepth = b.metadata.tangles[rootMsgHash].depth
       return aDepth - bDepth
     })
     for (const msg of newMsgs) {
-      await p(peer.db.add)(msg)
+      await p(peer.db.add)(msg, rootMsgHash)
     }
     cb()
   },
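For illustration only (not part of the commit): yieldMsgsIn above selects every known message whose tangle entry for the given root hash has a depth inside [minDepth, maxDepth], and commit inserts the received messages in ascending depth so parents are added before their descendants. A minimal standalone sketch of that behavior, assuming messages shaped like the fields this hunk touches (metadata.tangles[rootMsgHash].depth); the helper name and sample data are hypothetical:

// Hypothetical helper (not in the commit): yield messages that belong to the
// tangle rooted at `rootMsgHash` and whose depth lies within the given range.
function* msgsInRange(msgs, rootMsgHash, minDepth, maxDepth) {
  for (const msg of msgs) {
    const tangle = msg.metadata.tangles?.[rootMsgHash]
    if (tangle && tangle.depth >= minDepth && tangle.depth <= maxDepth) {
      yield msg
    }
  }
}

// Example data with the same shape as the diff uses:
const msgs = [
  { metadata: { tangles: { root1: { depth: 2 } } }, content: 'b' },
  { metadata: { tangles: { root1: { depth: 1 } } }, content: 'a' },
  { metadata: { tangles: { other: { depth: 5 } } }, content: 'x' },
]
const picked = [...msgsInRange(msgs, 'root1', 1, 2)].sort(
  (a, b) => a.metadata.tangles.root1.depth - b.metadata.tangles.root1.depth
)
// picked holds 'a' then 'b' (depth order); the unrelated tangle is skipped.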

View File

@@ -4,6 +4,7 @@ const os = require('os')
 const rimraf = require('rimraf')
 const SecretStack = require('secret-stack')
 const caps = require('ssb-caps')
+const FeedV1 = require('ppppp-db/lib/feed-v1')
 const p = require('util').promisify
 const { generateKeypair } = require('./util')
@@ -12,7 +13,7 @@ const createPeer = SecretStack({ appKey: caps.shs })
   .use(require('ssb-box'))
   .use(require('../'))
-test('sync a sliced classic feed', async (t) => {
+test('sync a normal feed', async (t) => {
   const ALICE_DIR = path.join(os.tmpdir(), 'dagsync-alice')
   const BOB_DIR = path.join(os.tmpdir(), 'dagsync-bob')
@@ -47,13 +48,23 @@ test('sync a sliced classic feed', async (t) => {
   }
   t.pass('alice has msgs 1..10 from carol')
+  let carolRootMsg = null
+  for (const msg of alice.db.msgs()) {
+    if (msg.metadata.who === carolID_b58 && !msg.content) {
+      carolRootMsg = msg
+      break
+    }
+  }
+  const carolRootHash = FeedV1.getMsgHash(carolRootMsg)
+  await p(bob.db.add)(carolRootMsg, carolRootHash)
   for (let i = 0; i < 7; i++) {
-    await p(bob.db.add)(carolMsgs[i])
+    await p(bob.db.add)(carolMsgs[i], carolRootHash)
   }
   {
     const arr = [...bob.db.msgs()]
-      .filter((msg) => msg.metadata.who === carolID_b58)
+      .filter((msg) => msg.metadata.who === carolID_b58 && msg.content)
       .map((msg) => msg.content.text)
     t.deepEquals(
       arr,
@@ -65,13 +76,13 @@ test('sync a sliced classic feed', async (t) => {
   const remoteAlice = await p(bob.connect)(alice.getAddress())
   t.pass('bob connected to alice')
-  bob.feedSync.request(carolPostFeedId)
+  bob.threadSync.request(carolRootHash)
   await p(setTimeout)(1000)
-  t.pass('feedSync!')
+  t.pass('tangleSync!')
   {
     const arr = [...bob.db.msgs()]
-      .filter((msg) => msg.metadata.who === carolID_b58)
+      .filter((msg) => msg.metadata.who === carolID_b58 && msg.content)
       .map((msg) => msg.content.text)
     t.deepEquals(
       arr,
@@ -85,6 +96,9 @@ test('sync a sliced classic feed', async (t) => {
   await p(bob.close)(true)
 })
+// FIXME:
+test.skip('sync a sliced feed', async (t) => {})
 // FIXME:
 test.skip('delete old msgs and sync latest msgs', async (t) => {
   const ALICE_DIR = path.join(os.tmpdir(), 'dagsync-alice')
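For context, here is a condensed sketch (my own summary, not code from the commit) of the flow the renamed test now exercises: find the feed's root message (the content-less one), hash it with FeedV1.getMsgHash, seed the receiving peer's database with that root, then request threadSync on the root hash. The wrapper function name is hypothetical; the individual calls are the ones the test itself makes.

const p = require('util').promisify
const FeedV1 = require('ppppp-db/lib/feed-v1')

// Hypothetical wrapper condensing the test's sync flow.
async function syncCarolFeed(alice, bob, carolID_b58) {
  // 1. Find carol's root message (no content) among alice's messages.
  let carolRootMsg = null
  for (const msg of alice.db.msgs()) {
    if (msg.metadata.who === carolID_b58 && !msg.content) {
      carolRootMsg = msg
      break
    }
  }
  // 2. Hash the root and add it to bob's database so the tangle has an anchor.
  const carolRootHash = FeedV1.getMsgHash(carolRootMsg)
  await p(bob.db.add)(carolRootMsg, carolRootHash)
  // 3. Connect and ask threadSync to replicate the tangle rooted at that hash.
  await p(bob.connect)(alice.getAddress())
  bob.threadSync.request(carolRootHash)
}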