pzp-net/lib/scheduler.js

110 lines
2.5 KiB
JavaScript

const awaitable = require('pull-awaitable')
const run = require('promisify-tuple')
const debug = require('debug')('pzp:net:scheduler')
/**
* @typedef {ReturnType<import('pzp-db').init>} PZPDB
* @typedef {ReturnType<import('pzp-set').init>} PZPSet
* @typedef {`/${string}`} Multiaddr
* @typedef {import('./infos')} Infos
* @typedef {import('./connections')} Connections
* @typedef {{
* db?: PZPDB;
* set?: PZPSet;
* shse: {
* pubkey: string;
* }
* }} Peer
*/
/**
* @template T
* @typedef {[T] extends [void] ?
* (...args: [Error] | []) => void :
* (...args: [Error] | [null, T]) => void
* } CB
*/
class Scheduler {
/** @type {Peer} */
#peer
/** @type {Connections} */
#connections
/** @type {boolean} */
#closed
/**
* @param {Peer} peer
* @param {Connections} connections
*/
constructor(peer, connections) {
this.#peer = peer
this.#connections = connections
this.#closed = true
}
/**
* @param {Multiaddr} multiaddr
*/
async #scheduleWithHub(multiaddr) {
/**@type {any}*/
let hubRPC
try {
hubRPC = await this.#connections.connect(multiaddr)
} catch (err) {
debug('Failed to connect to hub at "%s" because %o', multiaddr, err)
return
}
const attendantsStream = awaitable(hubRPC.hub.attendants())
const hubPubkey = hubRPC.shse.pubkey
for await (const attendants of attendantsStream) {
for (const attendant of attendants) {
if (attendant !== this.#peer.shse.pubkey) {
const tunnelMultiaddr = /** @type {Multiaddr} */ (
`/tunnel/${hubPubkey}.${attendant}/shse/${attendant}`
)
this.#connections.connect(tunnelMultiaddr)
}
}
}
}
/**
* @param {CB<void>} cb
*/
#setupHubDiscovery(cb) {
this.#peer.set?.values('hubs', null, (err, /** @type {Array<Multiaddr> | undefined} */ multiaddrs) => {
if (err) return cb(err)
if (!multiaddrs) return cb()
for (const multiaddr of multiaddrs) {
this.#scheduleWithHub(multiaddr)
}
// @ts-ignore
const stopWatch = this.#peer.set?.watch(({subdomain, event, value}) => {
if (subdomain === 'hubs' && event === 'add') {
this.#scheduleWithHub(value)
}
});
cb()
})
}
/**
* @param {CB<void>} cb
*/
start(cb) {
if (!this.#closed) return
this.#closed = false
this.#setupHubDiscovery(cb);
}
stop() {
this.#closed = true
// FIXME: implement
}
}
module.exports = Scheduler