diff --git a/lib/hub-observer.js b/lib/hub-observer.js index 48e8d93..b3ecac3 100644 --- a/lib/hub-observer.js +++ b/lib/hub-observer.js @@ -4,6 +4,8 @@ const pull = require('pull-stream') const getSeverity = require('ssb-network-errors') /** + * @typedef {ReturnType} PPPPPNet + * @typedef {{net: PPPPPNet, shse: {pubkey: string}}} Peer * @typedef {(pull.Sink & {abort: () => void})} Drain * @typedef {{type: 'state', pubkeys: Array}} AttendantsEventState * @typedef {{type: 'joined', pubkey: string}} AttendantsEventJoined @@ -15,49 +17,31 @@ const getSeverity = require('ssb-network-errors') */ module.exports = class HubObserver { - /** - * @readonly - * @type {any} - */ + /**@readonly @type {Peer}*/ #peer - - /** - * @type {string} - */ + /**@type {string}*/ #hubPubkey - - /** - * @type {string} - */ - #address - - /** - * @type {{name: string, admin: string}} - */ + /**@type {`/${string}`}*/ + #multiaddr + /**@type {{name: string, admin: string}}*/ #hubMetadata - - /** - * @type {Set} - */ + /**@type {Set}*/ #attendants - - /** - * @type {Drain | undefined} - */ + /**@type {Drain | undefined}*/ #attendantsDrain /** - * @param {any} peer + * @param {Peer} peer * @param {string} hubPubkey - * @param {string} address + * @param {`/${string}`} multiaddr * @param {{name: string, admin: string}} hubMetadata * @param {any} rpc * @param {any} onConnect */ - constructor(peer, hubPubkey, address, hubMetadata, rpc, onConnect) { + constructor(peer, hubPubkey, multiaddr, hubMetadata, rpc, onConnect) { this.#peer = peer this.#hubPubkey = hubPubkey - this.#address = address + this.#multiaddr = multiaddr this.#hubMetadata = hubMetadata this.#attendants = new Set() @@ -80,12 +64,12 @@ module.exports = class HubObserver { this.#hubMetadata && Object.keys(this.#hubMetadata).length >= 1 ) { - /** @type {Record} */ + /** @type {any} */ const metadata = { type: 'hub' } const { name, admin } = this.#hubMetadata if (name) metadata.name = name if (admin) metadata.admin = admin - this.#peer.net.updateInfo(this.#address, metadata) + this.#peer.net.updateInfo(this.#multiaddr, metadata) } debug('Announcing myself to hub %s', this.#hubPubkey) @@ -125,7 +109,7 @@ module.exports = class HubObserver { // Update onlineCount metadata for this hub const onlineCount = this.#attendants.size - this.#peer.net.updateInfo(this.#address, { onlineCount }) + this.#peer.net.updateInfo(this.#multiaddr, /**@type {any}*/ ({ onlineCount })) const hubName = this.#hubMetadata?.name if (event.type === 'state') { @@ -135,9 +119,9 @@ module.exports = class HubObserver { } else if (event.type === 'joined') { this.#notifyNewAttendant(event.pubkey, this.#hubPubkey, hubName) } else if (event.type === 'left') { - const address = this.#getAddress(event.pubkey) - debug('Will disconnect and unstage %s', address) - this.#peer.net.disconnect(address) + const multiaddr = this.#getMultiaddr(event.pubkey) + debug('Will disconnect and unstage %s', multiaddr) + this.#peer.net.disconnect(multiaddr) } } @@ -163,9 +147,10 @@ module.exports = class HubObserver { #notifyNewAttendant(attendantPubkey, hubPubkey, hubName) { if (attendantPubkey === hubPubkey) return if (attendantPubkey === this.#peer.shse.pubkey) return - const address = this.#getAddress(attendantPubkey) + const multiaddr = this.#getMultiaddr(attendantPubkey) + // @ts-ignore this.#peer.hubClient._notifyDiscoveredAttendant({ - address, + multiaddr, attendantPubkey, hubPubkey, hubName, @@ -190,8 +175,8 @@ module.exports = class HubObserver { /** * @param {string} pubkey */ - #getAddress(pubkey) { - return `tunnel:${this.#hubPubkey}:${pubkey}~shse:${pubkey}` + #getMultiaddr(pubkey) { + return `/tunnel/${this.#hubPubkey}.${pubkey}/shse/${pubkey}` } /** @@ -208,12 +193,12 @@ module.exports = class HubObserver { close() { this.#attendantsDrain?.abort() for (const pubkey of this.#attendants) { - const address = this.#getAddress(pubkey) - this.#peer.net.forget(address) + const multiaddr = this.#getMultiaddr(pubkey) + this.#peer.net.forget(multiaddr) } this.rpc.close(true, (/** @type {any} */ err) => { if (err) debug('error when closing connection with room: %o', err) }) - this.#peer.net.disconnect(this.#address) + this.#peer.net.disconnect(this.#multiaddr) } } diff --git a/lib/ms-tunnel.js b/lib/ms-tunnel.js index e13c302..64585c4 100644 --- a/lib/ms-tunnel.js +++ b/lib/ms-tunnel.js @@ -1,4 +1,3 @@ -const bs58 = require('bs58') const debug = require('debug')('ppppp:hub-client') const pull = require('pull-stream') const run = require('promisify-tuple') @@ -6,27 +5,13 @@ const HubObserver = require('./hub-observer') const { muxrpcMissing } = require('./utils') /** + * @typedef {ReturnType} PPPPPNet * @typedef {Map} Hubs */ -/** - * @param {string} addresses - * @returns {string | undefined} - */ -function extractSHSEPubkey(addresses) { - for (const address of addresses.split(';')) { - for (const [transport, transform] of address.split('~')) { - const [name, pubkey, extra] = transform.split(':') - if (name === 'shse') { - return pubkey - } - } - } -} - /** * @param {Hubs} hubs - * @param {any} peer + * @param {{net: PPPPPNet, shse: {pubkey: string}}} peer */ const makeTunnelPlugin = (hubs, peer) => (/** @type {any}} */ msConfig) => { const self = { @@ -45,12 +30,11 @@ const makeTunnelPlugin = (hubs, peer) => (/** @type {any}} */ msConfig) => { pull( peer.net.listen(), pull.filter(({ type }) => type === 'connected'), - pull.drain(async ({ address, details }) => { - const pubkey = extractSHSEPubkey(address) + pull.drain(async ({ multiaddr, pubkey, details }) => { if (!pubkey) return if (hubs.has(pubkey)) return if (!details?.rpc) return - if (address.startsWith('tunnel:')) return + if (multiaddr.startsWith('/tunnel/')) return const rpc = details.rpc const [err, res] = await run(rpc.hub.metadata)() if (muxrpcMissing(err)) return @@ -63,7 +47,14 @@ const makeTunnelPlugin = (hubs, peer) => (/** @type {any}} */ msConfig) => { hubs.get(pubkey)?.cancel() hubs.delete(pubkey) } - const obs = new HubObserver(peer, pubkey, address, res, rpc, onConnect) + const obs = new HubObserver( + peer, + pubkey, + multiaddr, + res, + rpc, + onConnect + ) hubs.set(pubkey, obs) }) ) @@ -72,8 +63,7 @@ const makeTunnelPlugin = (hubs, peer) => (/** @type {any}} */ msConfig) => { pull( peer.net.listen(), pull.filter(({ type }) => type === 'disconnected'), - pull.drain(({ address }) => { - const pubkey = extractSHSEPubkey(address) + pull.drain(({ pubkey }) => { if (!pubkey) return if (!hubs.has(pubkey)) return hubs.get(pubkey)?.close() @@ -97,7 +87,7 @@ const makeTunnelPlugin = (hubs, peer) => (/** @type {any}} */ msConfig) => { * @param {CallableFunction} cb */ async client(address, cb) { - debug(`we wish to connect to %o`, address) + debug(`We wish to connect to %o`, address) const opts = self.parse(address) if (!opts) { cb(new Error(`invalid tunnel address ${address}`)) @@ -121,12 +111,12 @@ const makeTunnelPlugin = (hubs, peer) => (/** @type {any}} */ msConfig) => { return } - debug(`will call createTunnel with ${target} via hub ${hub}`) + debug(`Will call createTunnel with ${target} via hub ${hub}`) const duplex = hubRPC.hub.createTunnel( target, (/** @type {any} */ err) => { // prettier-ignore - if (err) debug('tunnel duplex broken with %o because %s', address, err.message ?? err) + if (err) debug('Tunnel duplex broken with %o because %s', address, err.message ?? err) } ) cb(null, duplex) diff --git a/lib/plugin-hub-client.js b/lib/plugin-hub-client.js index 92c44ab..51b6c82 100644 --- a/lib/plugin-hub-client.js +++ b/lib/plugin-hub-client.js @@ -3,11 +3,34 @@ const DuplexPair = require('pull-pair/duplex') // @ts-ignore const Notify = require('pull-notify') const debug = require('debug')('ppppp:hub-client') const makeTunnelPlugin = require('./ms-tunnel') -const { ErrorDuplex } = require('./utils') +const { ErrorDuplex, msaddrToMultiaddr, multiaddrToMsaddr } = require('./utils') + +const HUBS_SUBDOMAIN = 'hubs' + +/** + * @typedef {ReturnType} PPPPPNet + * @typedef {ReturnType} PPPPPSet + * @typedef {import('ppppp-net').Info} Info + * @typedef {{ + * net: PPPPPNet, + * set: PPPPPSet, + * multiserver: { + * transport(transport: any): void + * }, + * shse: { + * pubkey: string + * } + * }} Peer + */ + +/** + * @template T + * @typedef {(...args: [Error] | [null, T]) => void } CB + */ module.exports = { name: 'hubClient', - needs: ['net'], + needs: ['shse', 'net', 'set'], manifest: { connect: 'duplex', }, @@ -18,7 +41,7 @@ module.exports = { }, /** - * @param {any} peer + * @param {Peer} peer * @param {any} config */ init(peer, config) { @@ -42,6 +65,47 @@ module.exports = { }) return { + /** + * @param {`/${string}`} multiaddr + * @param {CB} cb + */ + addHub(multiaddr, cb) { + peer.set.add(HUBS_SUBDOMAIN, multiaddr, (err, _) => { + // prettier-ignore + if (err) return cb(new Error('Failed to add Hub to my Set feed', {cause: err})) + peer.net.connect(multiaddr, (err, rpc) => { + // prettier-ignore + if (err) return cb(new Error('Failed to connect to Hub after adding it to my Set feed', {cause: err})) + cb(null, void 0) + }) + }) + }, + + /** + * @param {number} amount + * @param {CB>} cb + */ + getHubs(amount, cb) { + const source = peer.net.peers() + source(null, (err, peers) => { + if (err === true || !peers) return cb(null, []) + // prettier-ignore + if (err) return cb(new Error('Failed to get hubs', { cause: err })) + // @ts-ignore + const infoMap = /**@type {Map<`/${string}`, Info>}*/ (new Map(peers)) + const multiaddrs = peer.set.values(HUBS_SUBDOMAIN) + const hubs = [] + for (const multiaddr of multiaddrs) { + const stats = infoMap.get(multiaddr)?.stats ?? { failure: 1 } + hubs.push({ multiaddr, stats }) + } + hubs.sort((a, b) => (a.stats.failure ?? 1) - (b.stats.failure ?? 1)) + hubs.splice(amount) + const returnable = hubs.map((h) => h.multiaddr) + cb(null, returnable) + }) + }, + /** * @param {string} origin * @returns {import('pull-stream').Duplex} @@ -61,11 +125,6 @@ module.exports = { } }, - // Needed due to https://github.com/ssb-ngi-pointer/ssb-room-client/pull/3#issuecomment-808322434 - ping() { - return Date.now() - }, - // Internal method, needed for api-plugin.ts getHubsMap() { return hubs diff --git a/package.json b/package.json index 863d9e5..e101d7d 100644 --- a/package.json +++ b/package.json @@ -23,8 +23,10 @@ "node": ">=16" }, "dependencies": { + "@types/ip": "1.1.3", "bs58": "^5.0.0", "debug": "^4.3.4", + "ip": "1.1.8", "promisify-tuple": "~1.2.0", "pull-notify": "~0.1.2", "pull-pair": "~1.1.0", @@ -37,6 +39,8 @@ "@types/pull-stream": "^3.6.2", "c8": "7", "husky": "^4.3.0", + "ppppp-net": "github:staltz/ppppp-net", + "ppppp-set": "github:staltz/ppppp-set", "prettier": "^2.6.2", "pretty-quick": "^3.1.3", "typescript": "^5.1.3"