const awaitable = require('pull-awaitable') const run = require('promisify-tuple') const debug = require('debug')('pzp:net:scheduler') /** * @typedef {ReturnType} PZPDB * @typedef {ReturnType} PZPSet * @typedef {`/${string}`} Multiaddr * @typedef {import('./infos')} Infos * @typedef {import('./connections')} Connections * @typedef {{ * db?: PZPDB; * set?: PZPSet; * shse: { * pubkey: string; * } * }} Peer */ /** * @template T * @typedef {[T] extends [void] ? * (...args: [Error] | []) => void : * (...args: [Error] | [null, T]) => void * } CB */ class Scheduler { /** @type {Peer} */ #peer /** @type {Connections} */ #connections /** @type {boolean} */ #closed /** * @param {Peer} peer * @param {Connections} connections */ constructor(peer, connections) { this.#peer = peer this.#connections = connections this.#closed = true } /** * @param {Multiaddr} multiaddr */ async #scheduleWithHub(multiaddr) { /**@type {any}*/ let hubRPC try { hubRPC = await this.#connections.connect(multiaddr) } catch (err) { debug('Failed to connect to hub at "%s" because %o', multiaddr, err) return } const attendantsStream = awaitable(hubRPC.hub.attendants()) const hubPubkey = hubRPC.shse.pubkey for await (const attendants of attendantsStream) { for (const attendant of attendants) { if (attendant !== this.#peer.shse.pubkey) { const tunnelMultiaddr = /** @type {Multiaddr} */ ( `/tunnel/${hubPubkey}.${attendant}/shse/${attendant}` ) this.#connections.connect(tunnelMultiaddr) } } } } /** * @param {CB} cb */ #setupHubDiscovery(cb) { this.#peer.set?.values('hubs', null, (err, /** @type {Array | undefined} */ multiaddrs) => { if (err) return cb(err) if (!multiaddrs) return cb() for (const multiaddr of multiaddrs) { this.#scheduleWithHub(multiaddr) } // @ts-ignore const stopWatch = this.#peer.set?.watch(({subdomain, event, value}) => { if (subdomain === 'hubs' && event === 'add') { this.#scheduleWithHub(value) } }); cb() }) } /** * @param {CB} cb */ start(cb) { if (!this.#closed) return this.#closed = false this.#setupHubDiscovery(cb); } stop() { this.#closed = true // FIXME: implement } } module.exports = Scheduler