realtime sync works in both connection directions

This commit is contained in:
Andre Staltz 2024-02-29 11:30:39 +02:00
parent 54aa67a08f
commit 47eb2dd27f
No known key found for this signature in database
GPG Key ID: 9EDE23EA7E8A4890
2 changed files with 104 additions and 21 deletions

View File

@ -93,23 +93,10 @@ class SyncStream extends Pipeable {
this.#remoteWant = new Map() this.#remoteWant = new Map()
this.#receivableMsgs = new Map() this.#receivableMsgs = new Map()
this.#sendableMsgs = new Map() this.#sendableMsgs = new Map()
}
initiate() {
for (const goal of this.#goals.list()) {
this.#requested.add(goal.id)
}
this.resume()
this.#goals.watch((/** @type {any} */ goal) => {
if (!this.#requested.has(goal.id) && goal.type !== 'none') {
this.#requested.add(goal.id)
this.resume()
}
})
// Setup real-time syncing // Setup real-time syncing
this.#db.onRecordAdded((/** @type {Rec} */ { id: msgID, msg }) => { this.#db.onRecordAdded((/** @type {Rec} */ { id: msgID, msg }) => {
if (!this.sink || this.sink.paused) return
const tangleIDs = [msgID].concat(Object.keys(msg.metadata.tangles)) const tangleIDs = [msgID].concat(Object.keys(msg.metadata.tangles))
for (const id of tangleIDs) { for (const id of tangleIDs) {
if (this.#realtimeSyncing.has(id)) { if (this.#realtimeSyncing.has(id)) {
@ -126,6 +113,20 @@ class SyncStream extends Pipeable {
}) })
} }
initiate() {
for (const goal of this.#goals.list()) {
this.#requested.add(goal.id)
}
this.resume()
this.#goals.watch((/** @type {any} */ goal) => {
if (!this.#requested.has(goal.id) && goal.type !== 'none') {
this.#requested.add(goal.id)
this.resume()
}
})
}
#canSend() { #canSend() {
return this.sink && !this.sink.paused && !this.ended return this.sink && !this.sink.paused && !this.ended
} }

View File

@ -62,13 +62,16 @@ test('sync feed msgs in realtime after the 9 rounds', async (t) => {
}) })
assert('bob published post 2') assert('bob published post 2')
for (let i = 0; i < 100; i++) { {
const arr = [...alice.db.msgs()] let arr
.filter((msg) => msg.metadata.account === bobID && msg.data) for (let i = 0; i < 100; i++) {
.map((msg) => msg.data.text) arr = [...alice.db.msgs()]
if (arr.length < 3) { .filter((msg) => msg.metadata.account === bobID && msg.data)
await p(setTimeout)(200) .map((msg) => msg.data.text)
continue if (arr.length < 3) {
await p(setTimeout)(200)
continue
}
} }
assert.deepEqual(arr, ['m0', 'm1', 'm2'], 'alice has posts 0..2 from bob') assert.deepEqual(arr, ['m0', 'm1', 'm2'], 'alice has posts 0..2 from bob')
} }
@ -77,3 +80,82 @@ test('sync feed msgs in realtime after the 9 rounds', async (t) => {
await p(alice.close)(true) await p(alice.close)(true)
await p(bob.close)(true) await p(bob.close)(true)
}) })
test('sync feed msgs in realtime after the 9 rounds, reverse', async (t) => {
const alice = createPeer({ name: 'alice' })
const bob = createPeer({ name: 'bob' })
await alice.db.loaded()
await bob.db.loaded()
const bobID = await p(bob.db.account.create)({
subdomain: 'account',
_nonce: 'bob',
})
await p(bob.db.feed.publish)({
account: bobID,
domain: 'post',
data: { text: 'm0' },
})
assert('bob published post 0')
const bobPostsID = bob.db.feed.getID(bobID, 'post')
{
const arr = [...alice.db.msgs()]
.filter((msg) => msg.metadata.account === bobID && msg.data)
.map((msg) => msg.data.text)
assert.deepEqual(arr, [], 'alice has no posts from bob')
}
bob.goals.set(bobPostsID, 'all')
alice.goals.set(bobPostsID, 'all')
const remoteBob = await p(alice.connect)(bob.getAddress())
assert('bob connected to alice')
// Reverse direction of who "starts"
alice.sync.start()
await p(setTimeout)(1000)
assert('sync!')
{
const arr = [...alice.db.msgs()]
.filter((msg) => msg.metadata.account === bobID && msg.data)
.map((msg) => msg.data.text)
assert.deepEqual(arr, ['m0'], 'alice has post 0 from bob')
}
await p(bob.db.feed.publish)({
account: bobID,
domain: 'post',
data: { text: 'm1' },
})
assert('bob published post 1')
await p(bob.db.feed.publish)({
account: bobID,
domain: 'post',
data: { text: 'm2' },
})
assert('bob published post 2')
{
let arr
for (let i = 0; i < 100; i++) {
arr = [...alice.db.msgs()]
.filter((msg) => msg.metadata.account === bobID && msg.data)
.map((msg) => msg.data.text)
if (arr.length < 3) {
await p(setTimeout)(200)
continue
}
}
assert.deepEqual(arr, ['m0', 'm1', 'm2'], 'alice has posts 0..2 from bob')
}
await p(remoteBob.close)(true)
await p(alice.close)(true)
await p(bob.close)(true)
})