diff --git a/lib/algorithm.js b/lib/algorithm.js index 725f084..acecc79 100644 --- a/lib/algorithm.js +++ b/lib/algorithm.js @@ -31,6 +31,7 @@ class Algorithm { let minDepth = Number.MAX_SAFE_INTEGER let maxDepth = 0 for (const rec of this.#peer.db.records()) { + if (!rec.msg) continue const tangles = rec.msg.metadata.tangles if (!rec.msg.content) continue if (rec.hash === rootMsgHash) { @@ -162,30 +163,70 @@ class Algorithm { } } - async commit(newMsgs, rootMsgHash, goal) { - // Add new messages - newMsgs.sort((a, b) => { - const aDepth = a.metadata.tangles[rootMsgHash].depth - const bDepth = b.metadata.tangles[rootMsgHash].depth - return aDepth - bDepth - }) - // FIXME: if invalid (does not reach root), reject - // FIXME: if newMsgs are too new, drop my tangle and reset - // FIXME: if newMsgs are too old, reject newMsgs - for (const msg of newMsgs) { - await p(this.#peer.db.add)(msg, rootMsgHash) + async commit(rootMsgHash, newMsgs, goal, myWantRange) { + // Filter out contentful newMsgs that are not in my want-range + const [minWant, maxWant] = myWantRange + const validNewMsgs = newMsgs + .filter((msg) => { + if (!msg.content) return true // contentless messages are always valid + const depth = msg.metadata.tangles[rootMsgHash]?.depth + if (!depth) return false // if it's not the root, it must have a depth + return minWant <= depth && depth <= maxWant + }) + .sort((a, b) => { + const aDepth = a.metadata.tangles[rootMsgHash]?.depth ?? 0 + const bDepth = b.metadata.tangles[rootMsgHash]?.depth ?? 0 + return aDepth - bDepth + }) + + // Simulate adding this whole tangle, and check if it's valid + let err + if ((err = this.#peer.db.validateTangle(rootMsgHash, validNewMsgs))) { + throw err } + // Add new messages TODO: optimize perf, avoiding await / try / catch + for (const msg of newMsgs) { + try { + await p(this.#peer.db.add)(msg, rootMsgHash) + } catch {} + } + + // Prune. Ideally this should be in a garbage collection module const { type, count } = parseGoal(goal) if (type === 'newest') return await this.pruneNewest(rootMsgHash, count) if (type === 'oldest') throw new Error('not implemented') // TODO: } - getMsgs(msgIds) { + /** + * @param {string} rootMsgHash + * @param {Set} msgHashes + * @returns + */ + getTangleSlice(rootMsgHash, msgHashes) { + if (msgHashes.size === 0) return [] + const tangle = this.#peer.db.getTangle(rootMsgHash) + const sorted = tangle.topoSort() + let oldestMsgHash = null + for (const msgHash of sorted) { + if (msgHashes.has(msgHash)) { + oldestMsgHash = msgHash + break + } + } + const { erasables } = tangle.getDeletablesAndErasables(oldestMsgHash) + const msgs = [] - for (const msgId of msgIds) { - const msg = this.#peer.db.get(msgId) - if (msg) msgs.push(msg) + for (const msgHash of sorted) { + let isErasable = erasables.includes(msgHash) + if (!msgHashes.has(msgHash) && !isErasable) continue + const msg = this.#peer.db.get(msgHash) + if (!msg) continue + if (isErasable) { + msgs.push({ ...msg, content: null }) + } else { + msgs.push(msg) + } } return msgs } diff --git a/lib/stream.js b/lib/stream.js index fbb209f..55522a6 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -231,10 +231,8 @@ class SyncStream extends Pipeable { // prettier-ignore this.#debug('%s Stream IN: received msgIDs in %s: %o', this.#myId, id, msgIDsForMe) this.#updateReceivableMsgs(id, msgIDsForMe) - const msgIDs = this.#sendableMsgs.has(id) - ? [...this.#sendableMsgs.get(id)] - : [] - const msgs = this.#algo.getMsgs(msgIDs) + const msgIDs = this.#sendableMsgs.get(id) ?? new Set() + const msgs = this.#algo.getTangleSlice(id, msgIDs) // prettier-ignore this.#debug('%s Stream OUT: send %s msgs in %s', this.#myId, msgs.length, id) this.sink.write({ id, phase: 9, payload: msgs }) @@ -243,16 +241,14 @@ class SyncStream extends Pipeable { #sendMissingMsgsRes(id, msgsForMe) { // prettier-ignore this.#debug('%s Stream IN: received %s msgs in %s', this.#myId, msgsForMe.length, id) - const msgIDs = this.#sendableMsgs.has(id) - ? [...this.#sendableMsgs.get(id)] - : [] - - const msgs = this.#algo.getMsgs(msgIDs) + const msgIDs = this.#sendableMsgs.get(id) ?? new Set() + const msgs = this.#algo.getTangleSlice(id, msgIDs) // prettier-ignore this.#debug('%s Stream OUT: send %s msgs in %s', this.#myId, msgs.length, id) this.sink.write({ id, phase: 10, payload: msgs }) const goal = this.#goals.get(id) + const localWant = this.#localWant.get(id) this.#requested.delete(id) this.#localHave.delete(id) this.#localWant.delete(id) @@ -261,7 +257,12 @@ class SyncStream extends Pipeable { this.#receivableMsgs.delete(id) this.#sendableMsgs.delete(id) if (msgsForMe.length === 0) return - this.#algo.commit(msgsForMe, id, goal) + try { + this.#algo.commit(id, msgsForMe, goal, localWant) + } catch (err) { + // prettier-ignore + this.#debug('%s Stream could not commit received messages, because: %s', this.#myId, err) + } } #consumeMissingMsgs(id, msgsForMe) { @@ -269,6 +270,7 @@ class SyncStream extends Pipeable { this.#debug('%s Stream IN: received %s msgs in %s', this.#myId, msgsForMe.length, id) const goal = this.#goals.get(id) + const localWant = this.#localWant.get(id) this.#requested.delete(id) this.#localHave.delete(id) this.#localWant.delete(id) @@ -277,7 +279,12 @@ class SyncStream extends Pipeable { this.#receivableMsgs.delete(id) this.#sendableMsgs.delete(id) if (msgsForMe.length === 0) return - this.#algo.commit(msgsForMe, id, goal) + try { + this.#algo.commit(id, msgsForMe, goal, localWant) + } catch (err) { + // prettier-ignore + this.#debug('%s Stream could not commit received messages, because: %s', this.#myId, err) + } } #sendMsgsInRemoteWant(id, remoteWantRange) { diff --git a/test/feed-sync.test.js b/test/feed-sync.test.js index 06685d1..ce81b95 100644 --- a/test/feed-sync.test.js +++ b/test/feed-sync.test.js @@ -6,6 +6,7 @@ const SecretStack = require('secret-stack') const caps = require('ssb-caps') const FeedV1 = require('ppppp-db/lib/feed-v1') const p = require('util').promisify +const Algorithm = require('../lib/algorithm') const { generateKeypair } = require('./util') const createPeer = SecretStack({ appKey: caps.shs }) @@ -170,3 +171,90 @@ test('sync a feed with goal=newest', async (t) => { await p(alice.close)(true) await p(bob.close)(true) }) + +test('with goal=newest but too far behind', async (t) => { + const ALICE_DIR = path.join(os.tmpdir(), 'dagsync-alice') + const BOB_DIR = path.join(os.tmpdir(), 'dagsync-bob') + + rimraf.sync(ALICE_DIR) + rimraf.sync(BOB_DIR) + + const alice = createPeer({ + keys: generateKeypair('alice'), + path: ALICE_DIR, + }) + + const bob = createPeer({ + keys: generateKeypair('bob'), + path: BOB_DIR, + }) + + await alice.db.loaded() + await bob.db.loaded() + + const carolKeys = generateKeypair('carol') + const carolMsgs = [] + const carolID = carolKeys.id + const carolID_b58 = FeedV1.stripAuthor(carolID) + for (let i = 1; i <= 10; i++) { + const rec = await p(alice.db.create)({ + type: 'post', + content: { text: 'm' + i }, + keys: carolKeys, + }) + carolMsgs.push(rec.msg) + } + + const carolRootHash = alice.db.getFeedRoot(carolID, 'post') + const carolRootMsg = alice.db.get(carolRootHash) + + const algo = new Algorithm(alice) + await algo.pruneNewest(carolRootHash, 5) + { + const arr = [...alice.db.msgs()] + .filter((msg) => msg.metadata.who === carolID_b58 && msg.content) + .map((msg) => msg.content.text) + t.deepEquals( + arr, + ['m6', 'm7', 'm8', 'm9', 'm10'], + 'alice has msgs 6..10 from carol' + ) + } + + await p(bob.db.add)(carolRootMsg, carolRootHash) + for (let i = 0; i < 2; i++) { + await p(bob.db.add)(carolMsgs[i], carolRootHash) + } + + { + const arr = [...bob.db.msgs()] + .filter((msg) => msg.metadata.who === carolID_b58 && msg.content) + .map((msg) => msg.content.text) + t.deepEquals(arr, ['m1', 'm2'], 'bob has msgs 1..2 from carol') + } + + alice.tangleSync.setGoal(carolRootHash, 'newest-5') + bob.tangleSync.setGoal(carolRootHash, 'newest-5') + + const remoteAlice = await p(bob.connect)(alice.getAddress()) + t.pass('bob connected to alice') + + bob.tangleSync.initiate() + await p(setTimeout)(1000) + t.pass('tangleSync!') + + { + const arr = [...bob.db.msgs()] + .filter((msg) => msg.metadata.who === carolID_b58 && msg.content) + .map((msg) => msg.content.text) + t.deepEquals( + arr, + ['m6', 'm7', 'm8', 'm9', 'm10'], + 'bob has msgs 6..10 from carol' + ) + } + + await p(remoteAlice.close)(true) + await p(alice.close)(true) + await p(bob.close)(true) +})