diff --git a/declarations/pull-awaitable.d.ts b/declarations/pull-awaitable.d.ts new file mode 100644 index 0000000..fe7a985 --- /dev/null +++ b/declarations/pull-awaitable.d.ts @@ -0,0 +1,4 @@ +declare module 'pull-awaitable' { + function awaitable(pullStream: any): AsyncIterableIterator + export = awaitable +} diff --git a/lib/index.js b/lib/index.js index 0cc5242..dcd9e9d 100644 --- a/lib/index.js +++ b/lib/index.js @@ -75,7 +75,7 @@ function initNet(peer, config) { const infos = new Infos() const stats = new Stats(netDir, infos, config.net?.persistTimeout) const connections = new Connections(peer, infos) - const scheduler = new Scheduler() + const scheduler = new Scheduler(peer, connections) peer.close.hook(function (fn, args) { scheduler.stop() diff --git a/lib/scheduler.js b/lib/scheduler.js index 3556f79..d81ec4d 100644 --- a/lib/scheduler.js +++ b/lib/scheduler.js @@ -1,10 +1,76 @@ +const awaitable = require('pull-awaitable') +const run = require('promisify-tuple') +const debug = require('debug')('ppppp:net:scheduler') + +/** + * @typedef {ReturnType} PPPPPDB + * @typedef {ReturnType} PPPPPSet + * @typedef {`/${string}`} Multiaddr + * @typedef {import('./infos')} Infos + * @typedef {import('./connections')} Connections + * @typedef {{ + * db?: PPPPPDB; + * set?: PPPPPSet; + * shse: { + * pubkey: string; + * } + * }} Peer + */ + class Scheduler { - constructor() { - // FIXME: implement + /** @type {Peer} */ + #peer + /** @type {Connections} */ + #connections + + /** + * @param {Peer} peer + * @param {Connections} connections + */ + constructor(peer, connections) { + this.#peer = peer + this.#connections = connections + } + + /** + * @param {Multiaddr} multiaddr + */ + async #scheduleWithHub(multiaddr) { + /** @type {[Error, null] | [null, any]} */ + const [err, hubRPC] = await run(this.#connections.connect)(multiaddr) + if (err) { + debug('Failed to connect to hub at "%s" because %o', multiaddr, err) + return + } + + const attendantsStream = awaitable(hubRPC.hub.attendants()) + const hubPubkey = hubRPC.shse.pubkey + for await (const attendants of attendantsStream) { + for (const attendant of attendants) { + if (attendant !== this.#peer.shse.pubkey) { + const tunnelMultiaddr = /** @type {Multiaddr} */ ( + `/tunnel/${hubPubkey}.${attendant}/shse/${attendant}` + ) + this.#connections.connect(tunnelMultiaddr) + } + } + } + } + + #setupHubDiscovery() { + /** @type {Array | undefined} */ + const multiaddrs = this.#peer.set?.values('hubs') + if (!multiaddrs) return + for (const multiaddr of multiaddrs) { + this.#scheduleWithHub(multiaddr) + } } start() { - // FIXME: implement + if (!this.closed) return + this.closed = false + + this.#setupHubDiscovery(); } stop() { @@ -12,4 +78,4 @@ class Scheduler { } } -module.exports = Scheduler \ No newline at end of file +module.exports = Scheduler diff --git a/package.json b/package.json index f170327..5c778a7 100644 --- a/package.json +++ b/package.json @@ -35,6 +35,7 @@ "on-change-network-strict": "1.0.0", "on-wakeup": "^1.0.1", "promisify-tuple": "^1.0.1", + "pull-awaitable": "1.0.0", "pull-cat": "~1.1.11", "pull-notify": "^0.1.2", "pull-pause": "~0.0.2", @@ -50,7 +51,9 @@ "bs58": "^5.0.0", "c8": "7", "ppppp-caps": "github:staltz/ppppp-caps#93fa810b9a40b78aef4872d4c2a8412cccb52929", + "ppppp-db": "github:staltz/ppppp-db#667b33779d98aff12a9b0cd2d7c80469a95cd04e", "ppppp-keypair": "github:staltz/ppppp-keypair#61ef4420578f450dc2cc7b1efc1c5a691a871c74", + "ppppp-set": "github:staltz/ppppp-set#8983ba29f03db95a76b4bd9a55aa4392b350fdbb", "prettier": "^2.6.2", "pretty-quick": "^3.1.3", "rimraf": "^4.4.0",