mirror of https://codeberg.org/pzp/pzp-sync.git
support start() before peers connect
This commit is contained in:
parent
6a9f46b337
commit
f4ab599bd1
|
@ -67,6 +67,7 @@ function initSync(peer, config) {
|
||||||
assertSHSEExists(peer)
|
assertSHSEExists(peer)
|
||||||
const debug = makeDebug(`ppppp:sync`)
|
const debug = makeDebug(`ppppp:sync`)
|
||||||
const algo = new Algorithm(peer)
|
const algo = new Algorithm(peer)
|
||||||
|
let started = false
|
||||||
|
|
||||||
const streams = /** @type {Array<SyncStream>} */ ([])
|
const streams = /** @type {Array<SyncStream>} */ ([])
|
||||||
|
|
||||||
|
@ -92,7 +93,8 @@ function initSync(peer, config) {
|
||||||
assertSHSEExists(peer)
|
assertSHSEExists(peer)
|
||||||
if (rpc.shse.pubkey === peer.shse.pubkey) return // connecting to myself
|
if (rpc.shse.pubkey === peer.shse.pubkey) return // connecting to myself
|
||||||
if (!iamClient) return
|
if (!iamClient) return
|
||||||
const local = toPull.duplex(createStream(rpc.shse.pubkey, true))
|
const stream = createStream(rpc.shse.pubkey, true)
|
||||||
|
const local = toPull.duplex(stream)
|
||||||
|
|
||||||
let abort = /** @type {CallableFunction | null} */ (null)
|
let abort = /** @type {CallableFunction | null} */ (null)
|
||||||
const remote = rpc.sync.connect((networkError) => {
|
const remote = rpc.sync.connect((networkError) => {
|
||||||
|
@ -108,6 +110,7 @@ function initSync(peer, config) {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
abort = pull(local, remote, local)
|
abort = pull(local, remote, local)
|
||||||
|
if (started) stream.initiate()
|
||||||
}
|
}
|
||||||
peer.on('rpc:connect', onSyncRPCConnect)
|
peer.on('rpc:connect', onSyncRPCConnect)
|
||||||
|
|
||||||
|
@ -119,6 +122,8 @@ function initSync(peer, config) {
|
||||||
}
|
}
|
||||||
|
|
||||||
function start() {
|
function start() {
|
||||||
|
if (started) return
|
||||||
|
started = true
|
||||||
for (const stream of streams) {
|
for (const stream of streams) {
|
||||||
stream.initiate()
|
stream.initiate()
|
||||||
}
|
}
|
||||||
|
|
|
@ -105,6 +105,13 @@ class SyncStream extends Pipeable {
|
||||||
this.#requested.add(goal.id)
|
this.#requested.add(goal.id)
|
||||||
}
|
}
|
||||||
this.resume()
|
this.resume()
|
||||||
|
|
||||||
|
this.#goals.watch((/** @type {any} */ goal) => {
|
||||||
|
if (!this.#requested.has(goal.id)) {
|
||||||
|
this.#requested.add(goal.id)
|
||||||
|
this.resume()
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
#canSend() {
|
#canSend() {
|
||||||
|
|
|
@ -51,13 +51,16 @@ test('sync an account tangle', async (t) => {
|
||||||
"bob doesn't have alice's account tangle"
|
"bob doesn't have alice's account tangle"
|
||||||
)
|
)
|
||||||
|
|
||||||
bob.goals.set(aliceID, 'all')
|
|
||||||
alice.goals.set(aliceID, 'all')
|
|
||||||
|
|
||||||
|
// start() on purpose before connect, to test whether this also works
|
||||||
|
bob.sync.start()
|
||||||
const remoteAlice = await p(bob.connect)(alice.getAddress())
|
const remoteAlice = await p(bob.connect)(alice.getAddress())
|
||||||
assert('bob connected to alice')
|
assert('bob connected to alice')
|
||||||
|
|
||||||
bob.sync.start()
|
// Set goals on purpose after connect, to test whether this also works
|
||||||
|
bob.goals.set(aliceID, 'all')
|
||||||
|
alice.goals.set(aliceID, 'all')
|
||||||
|
|
||||||
await p(setTimeout)(1000)
|
await p(setTimeout)(1000)
|
||||||
assert('sync!')
|
assert('sync!')
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue