mirror of https://codeberg.org/pzp/pzp-sync.git

pruneNewest

parent 182156e1a2
commit 1a76e665de
@@ -74,7 +74,8 @@ class Algorithm {
    * @returns {Range}
    */
   #wantOldestRange(localHaveRange, remoteHaveRange, count) {
-    // FIXME:
+    // TODO: implement
+    throw new Error('not implemented')
   }
 
   /**
@@ -140,16 +141,39 @@ class Algorithm {
     }
   }
 
-  async commit(newMsgs, rootMsgHash, cb) {
+  async pruneNewest(rootMsgHash, count) {
+    const tangle = this.#peer.db.getTangle(rootMsgHash)
+    const sorted = tangle.topoSort()
+    if (sorted.length <= count) return
+    const msgHash = sorted[sorted.length - count]
+    const { deletables, erasables } = tangle.getDeletablesAndErasables(msgHash)
+    const del = p(this.#peer.db.del)
+    const erase = p(this.#peer.db.erase)
+    for (const msgHash of deletables) {
+      await del(msgHash)
+    }
+    for (const msgHash of erasables) {
+      await erase(msgHash)
+    }
+  }
+
+  async commit(newMsgs, rootMsgHash, goal) {
+    // Add new messages
     newMsgs.sort((a, b) => {
       const aDepth = a.metadata.tangles[rootMsgHash].depth
       const bDepth = b.metadata.tangles[rootMsgHash].depth
       return aDepth - bDepth
     })
+    // FIXME: if invalid (does not reach root), reject
+    // FIXME: if newMsgs are too new, drop my tangle and reset
+    // FIXME: if newMsgs are too old, reject newMsgs
     for (const msg of newMsgs) {
       await p(this.#peer.db.add)(msg, rootMsgHash)
     }
-    cb()
+
+    const { type, count } = parseGoal(goal)
+    if (type === 'newest') return await this.pruneNewest(rootMsgHash, count)
+    if (type === 'oldest') throw new Error('not implemented') // TODO:
   }
 
   getMsgs(msgIds) {
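(Not part of the commit.) The new pruneNewest() keeps only the newest count messages of a tangle: it topologically sorts the tangle, takes sorted[sorted.length - count] as the cut point, and removes everything older via getDeletablesAndErasables(). The new commit() also calls a parseGoal() helper that is not shown in this diff. A minimal sketch of what such a parser could look like, assuming goal strings of the form 'all', 'newest-N', or 'oldest-N' as used elsewhere in this commit (the regex and error handling are illustrative, not the project's actual code):

function parseGoal(goal) {
  // 'all' carries no count; 'newest-5' / 'oldest-5' carry the number of msgs to keep
  if (goal === 'all') return { type: 'all', count: Infinity }
  const match = /^(newest|oldest)-(\d+)$/.exec(goal)
  if (!match) throw new Error('Unrecognized goal: ' + goal)
  return { type: match[1], count: Number(match[2]) }
}

// e.g. parseGoal('newest-5') => { type: 'newest', count: 5 }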
@@ -252,6 +252,7 @@ class SyncStream extends Pipeable {
     this.#debug('%s Stream OUT: send %s msgs in %s', this.#myId, msgs.length, id)
     this.sink.write({ id, phase: 10, payload: msgs })
 
+    const goal = this.#goals.get(id)
     this.#requested.delete(id)
     this.#localHave.delete(id)
     this.#localWant.delete(id)
@@ -260,16 +261,14 @@ class SyncStream extends Pipeable {
     this.#receivableMsgs.delete(id)
     this.#sendableMsgs.delete(id)
     if (msgsForMe.length === 0) return
-    this.#algo.commit(msgsForMe, id, (err) => {
-      // prettier-ignore
-      if (err) throw new Error('sendMissingMsgsRes failed because sink failed', {cause: err})
-    })
+    this.#algo.commit(msgsForMe, id, goal)
   }
 
   #consumeMissingMsgs(id, msgsForMe) {
     // prettier-ignore
     this.#debug('%s Stream IN: received %s msgs in %s', this.#myId, msgsForMe.length, id)
 
+    const goal = this.#goals.get(id)
     this.#requested.delete(id)
     this.#localHave.delete(id)
     this.#localWant.delete(id)
@@ -278,10 +277,7 @@ class SyncStream extends Pipeable {
     this.#receivableMsgs.delete(id)
     this.#sendableMsgs.delete(id)
     if (msgsForMe.length === 0) return
-    this.#algo.commit(msgsForMe, id, (err) => {
-      // prettier-ignore
-      if (err) throw new Error('sendMissingMsgsRes failed because sink failed', {cause: err})
-    })
+    this.#algo.commit(msgsForMe, id, goal)
   }
 
   #sendMsgsInRemoteWant(id, remoteWantRange) {
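(Not part of the commit.) Both call sites above now look the per-tangle goal up in the stream's #goals map and pass it straight into Algorithm.commit(), instead of passing an error callback. How #goals gets populated is outside this diff; a rough sketch of the assumed wiring, based on the setGoal() calls in the test below (the class name and getGoal() are hypothetical):

class Goals {
  #goals = new Map() // tangle root msg hash (id) -> goal string, e.g. 'all' or 'newest-5'

  setGoal(rootMsgHash, goal) {
    this.#goals.set(rootMsgHash, goal)
  }

  getGoal(rootMsgHash) {
    return this.#goals.get(rootMsgHash)
  }
}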
@@ -13,7 +13,7 @@ const createPeer = SecretStack({ appKey: caps.shs })
   .use(require('ssb-box'))
   .use(require('../'))
 
-test('sync a normal feed', async (t) => {
+test('sync a feed with goal=all', async (t) => {
   const ALICE_DIR = path.join(os.tmpdir(), 'dagsync-alice')
   const BOB_DIR = path.join(os.tmpdir(), 'dagsync-bob')
@@ -92,11 +92,7 @@ test('sync a normal feed', async (t) => {
   await p(bob.close)(true)
 })
 
-// FIXME:
-test.skip('sync a sliced feed', async (t) => {})
-
-// FIXME:
-test.skip('delete old msgs and sync latest msgs', async (t) => {
+test('sync a feed with goal=newest', async (t) => {
   const ALICE_DIR = path.join(os.tmpdir(), 'dagsync-alice')
   const BOB_DIR = path.join(os.tmpdir(), 'dagsync-bob')
@@ -111,9 +107,6 @@ test.skip('delete old msgs and sync latest msgs', async (t) => {
   const bob = createPeer({
     keys: generateKeypair('bob'),
     path: BOB_DIR,
-    feedSync: {
-      limit: 3,
-    },
   })
 
   await alice.db.loaded()
@@ -122,39 +115,55 @@ test.skip('delete old msgs and sync latest msgs', async (t) => {
   const carolKeys = generateKeypair('carol')
   const carolMsgs = []
   const carolID = carolKeys.id
+  const carolID_b58 = FeedV1.stripAuthor(carolID)
   for (let i = 1; i <= 10; i++) {
-    const msg = await p(alice.db.create)({
-      feedFormat: 'classic',
-      content: { type: 'post', text: 'm' + i },
+    const rec = await p(alice.db.create)({
+      type: 'post',
+      content: { text: 'm' + i },
       keys: carolKeys,
     })
-    carolMsgs.push(msg)
+    carolMsgs.push(rec.msg)
   }
   t.pass('alice has msgs 1..10 from carol')
 
-  await p(bob.db.add)(carolMsgs[5].value)
-  await p(bob.db.add)(carolMsgs[6].value)
-  await p(bob.db.add)(carolMsgs[7].value)
+  const carolRootHash = alice.db.getFeedRoot(carolID, 'post')
+  const carolRootMsg = alice.db.get(carolRootHash)
+
+  await p(bob.db.add)(carolRootMsg, carolRootHash)
+  for (let i = 0; i < 7; i++) {
+    await p(bob.db.add)(carolMsgs[i], carolRootHash)
+  }
 
   {
-    const arr = bob.db
-      .filterAsArray((msg) => msg?.value.author === carolID)
-      .map((msg) => msg.value.content.text)
-    t.deepEquals(arr, ['m6', 'm7', 'm8'], 'bob has msgs 6..8 from carol')
+    const arr = [...bob.db.msgs()]
+      .filter((msg) => msg.metadata.who === carolID_b58 && msg.content)
+      .map((msg) => msg.content.text)
+    t.deepEquals(
+      arr,
+      ['m1', 'm2', 'm3', 'm4', 'm5', 'm6', 'm7'],
+      'bob has msgs 1..7 from carol'
+    )
   }
 
+  bob.tangleSync.setGoal(carolRootHash, 'newest-5')
+  alice.tangleSync.setGoal(carolRootHash, 'all')
+
   const remoteAlice = await p(bob.connect)(alice.getAddress())
   t.pass('bob connected to alice')
 
-  bob.feedSync.request(carolID)
+  bob.tangleSync.initiate()
   await p(setTimeout)(1000)
-  t.pass('feedSync!')
+  t.pass('tangleSync!')
 
   {
-    const arr = bob.db
-      .filterAsArray((msg) => msg?.value.author === carolID)
-      .map((msg) => msg.value.content.text)
-    t.deepEquals(arr, ['m8', 'm9', 'm10'], 'bob has msgs 8..10 from carol')
+    const arr = [...bob.db.msgs()]
+      .filter((msg) => msg.metadata.who === carolID_b58 && msg.content)
+      .map((msg) => msg.content.text)
+    t.deepEquals(
+      arr,
+      ['m6', 'm7', 'm8', 'm9', 'm10'],
+      'bob has msgs 6..10 from carol'
+    )
   }
 
   await p(remoteAlice.close)(true)
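(Not part of the commit.) For the 'newest-5' goal in this test, the pruneNewest() cut point works out as follows, assuming topoSort() yields carol's root message followed by m1..m10 once bob has synced the whole tangle (illustrative arithmetic only):

const sorted = ['root', 'm1', 'm2', 'm3', 'm4', 'm5', 'm6', 'm7', 'm8', 'm9', 'm10']
const count = 5 // from goal 'newest-5'
const cutPoint = sorted[sorted.length - count]
console.log(cutPoint) // 'm6': everything older gets deleted or erased, so bob keeps m6..m10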