diff --git a/lib/algorithm.js b/lib/algorithm.js index e5a83ad..ee4fb96 100644 --- a/lib/algorithm.js +++ b/lib/algorithm.js @@ -74,7 +74,8 @@ class Algorithm { * @returns {Range} */ #wantOldestRange(localHaveRange, remoteHaveRange, count) { - // FIXME: + // TODO: implement + throw new Error('not implemented') } /** @@ -140,16 +141,39 @@ class Algorithm { } } - async commit(newMsgs, rootMsgHash, cb) { + async pruneNewest(rootMsgHash, count) { + const tangle = this.#peer.db.getTangle(rootMsgHash) + const sorted = tangle.topoSort() + if (sorted.length <= count) return + const msgHash = sorted[sorted.length - count] + const { deletables, erasables } = tangle.getDeletablesAndErasables(msgHash) + const del = p(this.#peer.db.del) + const erase = p(this.#peer.db.erase) + for (const msgHash of deletables) { + await del(msgHash) + } + for (const msgHash of erasables) { + await erase(msgHash) + } + } + + async commit(newMsgs, rootMsgHash, goal) { + // Add new messages newMsgs.sort((a, b) => { const aDepth = a.metadata.tangles[rootMsgHash].depth const bDepth = b.metadata.tangles[rootMsgHash].depth return aDepth - bDepth }) + // FIXME: if invalid (does not reach root), reject + // FIXME: if newMsgs are too new, drop my tangle and reset + // FIXME: if newMsgs are too old, reject newMsgs for (const msg of newMsgs) { await p(this.#peer.db.add)(msg, rootMsgHash) } - cb() + + const { type, count } = parseGoal(goal) + if (type === 'newest') return await this.pruneNewest(rootMsgHash, count) + if (type === 'oldest') throw new Error('not implemented') // TODO: } getMsgs(msgIds) { diff --git a/lib/stream.js b/lib/stream.js index 9be419b..fbb209f 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -252,6 +252,7 @@ class SyncStream extends Pipeable { this.#debug('%s Stream OUT: send %s msgs in %s', this.#myId, msgs.length, id) this.sink.write({ id, phase: 10, payload: msgs }) + const goal = this.#goals.get(id) this.#requested.delete(id) this.#localHave.delete(id) this.#localWant.delete(id) @@ -260,16 +261,14 @@ class SyncStream extends Pipeable { this.#receivableMsgs.delete(id) this.#sendableMsgs.delete(id) if (msgsForMe.length === 0) return - this.#algo.commit(msgsForMe, id, (err) => { - // prettier-ignore - if (err) throw new Error('sendMissingMsgsRes failed because sink failed', {cause: err}) - }) + this.#algo.commit(msgsForMe, id, goal) } #consumeMissingMsgs(id, msgsForMe) { // prettier-ignore this.#debug('%s Stream IN: received %s msgs in %s', this.#myId, msgsForMe.length, id) + const goal = this.#goals.get(id) this.#requested.delete(id) this.#localHave.delete(id) this.#localWant.delete(id) @@ -278,10 +277,7 @@ class SyncStream extends Pipeable { this.#receivableMsgs.delete(id) this.#sendableMsgs.delete(id) if (msgsForMe.length === 0) return - this.#algo.commit(msgsForMe, id, (err) => { - // prettier-ignore - if (err) throw new Error('sendMissingMsgsRes failed because sink failed', {cause: err}) - }) + this.#algo.commit(msgsForMe, id, goal) } #sendMsgsInRemoteWant(id, remoteWantRange) { diff --git a/test/feed-sync.test.js b/test/feed-sync.test.js index c19a7ee..06685d1 100644 --- a/test/feed-sync.test.js +++ b/test/feed-sync.test.js @@ -13,7 +13,7 @@ const createPeer = SecretStack({ appKey: caps.shs }) .use(require('ssb-box')) .use(require('../')) -test('sync a normal feed', async (t) => { +test('sync a feed with goal=all', async (t) => { const ALICE_DIR = path.join(os.tmpdir(), 'dagsync-alice') const BOB_DIR = path.join(os.tmpdir(), 'dagsync-bob') @@ -92,11 +92,7 @@ test('sync a normal feed', async (t) => { await p(bob.close)(true) }) -// FIXME: -test.skip('sync a sliced feed', async (t) => {}) - -// FIXME: -test.skip('delete old msgs and sync latest msgs', async (t) => { +test('sync a feed with goal=newest', async (t) => { const ALICE_DIR = path.join(os.tmpdir(), 'dagsync-alice') const BOB_DIR = path.join(os.tmpdir(), 'dagsync-bob') @@ -111,9 +107,6 @@ test.skip('delete old msgs and sync latest msgs', async (t) => { const bob = createPeer({ keys: generateKeypair('bob'), path: BOB_DIR, - feedSync: { - limit: 3, - }, }) await alice.db.loaded() @@ -122,39 +115,55 @@ test.skip('delete old msgs and sync latest msgs', async (t) => { const carolKeys = generateKeypair('carol') const carolMsgs = [] const carolID = carolKeys.id + const carolID_b58 = FeedV1.stripAuthor(carolID) for (let i = 1; i <= 10; i++) { - const msg = await p(alice.db.create)({ - feedFormat: 'classic', - content: { type: 'post', text: 'm' + i }, + const rec = await p(alice.db.create)({ + type: 'post', + content: { text: 'm' + i }, keys: carolKeys, }) - carolMsgs.push(msg) + carolMsgs.push(rec.msg) } t.pass('alice has msgs 1..10 from carol') - await p(bob.db.add)(carolMsgs[5].value) - await p(bob.db.add)(carolMsgs[6].value) - await p(bob.db.add)(carolMsgs[7].value) + const carolRootHash = alice.db.getFeedRoot(carolID, 'post') + const carolRootMsg = alice.db.get(carolRootHash) + + await p(bob.db.add)(carolRootMsg, carolRootHash) + for (let i = 0; i < 7; i++) { + await p(bob.db.add)(carolMsgs[i], carolRootHash) + } { - const arr = bob.db - .filterAsArray((msg) => msg?.value.author === carolID) - .map((msg) => msg.value.content.text) - t.deepEquals(arr, ['m6', 'm7', 'm8'], 'bob has msgs 6..8 from carol') + const arr = [...bob.db.msgs()] + .filter((msg) => msg.metadata.who === carolID_b58 && msg.content) + .map((msg) => msg.content.text) + t.deepEquals( + arr, + ['m1', 'm2', 'm3', 'm4', 'm5', 'm6', 'm7'], + 'bob has msgs 1..7 from carol' + ) } + bob.tangleSync.setGoal(carolRootHash, 'newest-5') + alice.tangleSync.setGoal(carolRootHash, 'all') + const remoteAlice = await p(bob.connect)(alice.getAddress()) t.pass('bob connected to alice') - bob.feedSync.request(carolID) + bob.tangleSync.initiate() await p(setTimeout)(1000) - t.pass('feedSync!') + t.pass('tangleSync!') { - const arr = bob.db - .filterAsArray((msg) => msg?.value.author === carolID) - .map((msg) => msg.value.content.text) - t.deepEquals(arr, ['m8', 'm9', 'm10'], 'bob has msgs 8..10 from carol') + const arr = [...bob.db.msgs()] + .filter((msg) => msg.metadata.who === carolID_b58 && msg.content) + .map((msg) => msg.content.text) + t.deepEquals( + arr, + ['m6', 'm7', 'm8', 'm9', 'm10'], + 'bob has msgs 6..10 from carol' + ) } await p(remoteAlice.close)(true)