From 4f371a08cf3a5a2c360ee9d9b7a521da3cf0a880 Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Fri, 16 Jun 2023 13:12:07 +0300 Subject: [PATCH] init --- .gitignore | 9 ++ .prettierrc.yaml | 2 + LICENSE | 20 ++++ README.md | 1 + lib/hub-observer.js | 227 +++++++++++++++++++++++++++++++++++++++ lib/index.js | 4 + lib/ms-tunnel.js | 165 ++++++++++++++++++++++++++++ lib/plugin-hub-client.js | 78 ++++++++++++++ lib/plugin-hub.js | 56 ++++++++++ lib/utils.js | 34 ++++++ package.json | 54 ++++++++++ tsconfig.json | 15 +++ 12 files changed, 665 insertions(+) create mode 100644 .gitignore create mode 100644 .prettierrc.yaml create mode 100644 LICENSE create mode 100644 README.md create mode 100644 lib/hub-observer.js create mode 100644 lib/index.js create mode 100644 lib/ms-tunnel.js create mode 100644 lib/plugin-hub-client.js create mode 100644 lib/plugin-hub.js create mode 100644 lib/utils.js create mode 100644 package.json create mode 100644 tsconfig.json diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4b96477 --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +.vscode +node_modules +pnpm-lock.yaml +package-lock.json +coverage +*~ + +# For misc scripts and experiments: +/gitignored diff --git a/.prettierrc.yaml b/.prettierrc.yaml new file mode 100644 index 0000000..1d2127c --- /dev/null +++ b/.prettierrc.yaml @@ -0,0 +1,2 @@ +semi: false +singleQuote: true diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..fcb6945 --- /dev/null +++ b/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2023 Andre 'Staltz' Medeiros + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..ba9d17d --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +**Work in progress** \ No newline at end of file diff --git a/lib/hub-observer.js b/lib/hub-observer.js new file mode 100644 index 0000000..250fac7 --- /dev/null +++ b/lib/hub-observer.js @@ -0,0 +1,227 @@ +const debug = require('debug')('ppppp:hub-client') +const pull = require('pull-stream') +// @ts-ignore +const getSeverity = require('ssb-network-errors') + +/** + * @typedef {(pull.Sink & {abort: () => void})} Drain + * @typedef {{type: 'state', pubkeys: Array}} AttendantsEventState + * @typedef {{type: 'joined', pubkey: string}} AttendantsEventJoined + * @typedef {{type: 'left', pubkey: string}} AttendantsEventLeft + */ + +/** + * @typedef {AttendantsEventState | AttendantsEventJoined | AttendantsEventLeft} AttendantsEvent + */ + +module.exports = class HubObserver { + /** + * @readonly + * @type {any} + */ + #sstack + + /** + * @type {string} + */ + #hubPubkey + + /** + * @type {string} + */ + #address + + /** + * @type {{name: string, admin: string}} + */ + #hubMetadata + + /** + * @type {Set} + */ + #attendants + + /** + * @type {Drain | undefined} + */ + #attendantsDrain + + /** + * @param {any} sstack + * @param {string} hubPubkey + * @param {string} address + * @param {{name: string, admin: string}} hubMetadata + * @param {any} rpc + * @param {any} onConnect + */ + constructor(sstack, hubPubkey, address, hubMetadata, rpc, onConnect) { + this.#sstack = sstack + this.#hubPubkey = hubPubkey + this.#address = address + this.#hubMetadata = hubMetadata + this.#attendants = new Set() + + this.rpc = rpc + + /** + * @param {any} stream + * @param {string} peerPubkey + */ + this.handler = (stream, peerPubkey) => { + stream.address = `tunnel:${this.#hubPubkey}:${peerPubkey}` + // prettier-ignore + debug('Handler will call onConnect for the stream.address: %s', stream.address); + onConnect(stream) + } + + // If metadata is a plain object with at least one field + if ( + typeof this.#hubMetadata === 'object' && + this.#hubMetadata && + Object.keys(this.#hubMetadata).length >= 1 + ) { + /** @type {Record} */ + const metadata = { type: 'hub' } + const { name, admin } = this.#hubMetadata + if (name) metadata.name = name + if (admin) metadata.admin = admin + this.#sstack.conn.db().update(this.#address, metadata) + this.#sstack.conn.hub().update(this.#address, metadata) + } + + debug('Announcing myself to hub %s', this.#hubPubkey) + pull( + this.rpc.hub.attendants(), + (this.#attendantsDrain = /** @type {Drain} */ ( + pull.drain(this.#attendantsUpdated, this.#attendantsEnded) + )) + ) + } + + /** + * @param {AttendantsEvent} event + */ + #attendantsUpdated = (event) => { + // debug log + if (event.type === 'state') { + // prettier-ignore + debug('initial attendants in %s: %s', this.#hubPubkey, JSON.stringify(event.pubkeys)) + } else if (event.type === 'joined') { + debug('attendant joined %s: %s', this.#hubPubkey, event.pubkey) + } else if (event.type === 'left') { + debug('attendant left %s: %s', this.#hubPubkey, event.pubkey) + } + + // Update attendants set + if (event.type === 'state') { + this.#attendants.clear() + for (const pubkey of event.pubkeys) { + this.#attendants.add(pubkey) + } + } else if (event.type === 'joined') { + this.#attendants.add(event.pubkey) + } else if (event.type === 'left') { + this.#attendants.delete(event.pubkey) + } + + // Update onlineCount metadata for this hub + const onlineCount = this.#attendants.size + this.#sstack.conn.hub().update(this.#address, { onlineCount }) + + // Update ssb-conn-staging + const hubName = this.#hubMetadata?.name + if (event.type === 'state') { + for (const pubkey of event.pubkeys) { + this.#notifyNewAttendant(pubkey, this.#hubPubkey, hubName) + } + } else if (event.type === 'joined') { + this.#notifyNewAttendant(event.pubkey, this.#hubPubkey, hubName) + } else if (event.type === 'left') { + const address = this.#getAddress(event.pubkey) + debug('Will disconnect and unstage %s', address) + this.#sstack.conn.unstage(address) + this.#sstack.conn.disconnect(address) + } + } + + /** + * @param {Error | boolean | null | undefined} err + * @returns + */ + #attendantsEnded = (err) => { + if (err && err !== true) { + this.#handleStreamError(err) + } + } + + /** + * Typically, this should stage the new attendant, but it's not up to us to + * decide that. We just notify other modules (like the ssb-conn scheduler) and + * they listen to the notify stream and stage it if they want. + * + * @param {string} attendantPubkey + * @param {string} hubPubkey + * @param {string} hubName + */ + #notifyNewAttendant(attendantPubkey, hubPubkey, hubName) { + if (attendantPubkey === hubPubkey) return + if (attendantPubkey === this.#sstack.id) return + const address = this.#getAddress(attendantPubkey) + this.#sstack.hubClient._notifyDiscoveredAttendant({ + address, + attendantPubkey, + hubPubkey, + hubName, + }) + } + + /** + * @param {Error} err + */ + #handleStreamError(err) { + const severity = getSeverity(err) + if (severity === 1) { + // pre-emptively destroy the stream, assuming the other + // end is packet-stream 2.0.0 sending end messages. + this.close() + } else if (severity >= 2) { + // prettier-ignore + console.error(`error getting updates from hub ${this.#hubPubkey} because ${err.message}`); + } + } + + /** + * @param {string} pubkey + */ + #getAddress(pubkey) { + return `tunnel:${this.#hubPubkey}:${pubkey}~shse:${pubkey}` + } + + /** + * Similar to close(), but just destroys this "observer", not the + * underlying connections. + */ + cancel() { + this.#attendantsDrain?.abort() + } + + /** + * Similar to cancel(), but also closes connection with the hub server. + */ + close() { + this.#attendantsDrain?.abort() + for (const pubkey of this.#attendants) { + const address = this.#getAddress(pubkey) + this.#sstack.conn.unstage(address) + } + for (const [addr, data] of this.#sstack.conn.staging().entries()) { + if (data.hub === this.#hubPubkey) { + this.#sstack.conn.unstage(addr) + } + } + this.rpc.close(true, (/** @type {any} */ err) => { + if (err) debug('error when closing connection with room: %o', err) + }) + this.#sstack.conn.disconnect(this.#address, () => {}) + } +} diff --git a/lib/index.js b/lib/index.js new file mode 100644 index 0000000..58bea17 --- /dev/null +++ b/lib/index.js @@ -0,0 +1,4 @@ +const hub = require('./plugin-hub') +const hubClient = require('./plugin-hub-client') + +module.exports = [hub, hubClient] diff --git a/lib/ms-tunnel.js b/lib/ms-tunnel.js new file mode 100644 index 0000000..b464c19 --- /dev/null +++ b/lib/ms-tunnel.js @@ -0,0 +1,165 @@ +const debug = require('debug')('ppppp:hub-client') +const pull = require('pull-stream') +const run = require('promisify-tuple') +const HubObserver = require('./hub-observer') +const { muxrpcMissing } = require('./utils') + +/** + * @typedef {Map} Hubs + */ + +/** + * @param {Hubs} hubs + * @param {any} sstack + */ +const makeTunnelPlugin = (hubs, sstack) => (/** @type {any}} */ msConfig) => { + const self = { + name: 'tunnel', + + scope() { + return msConfig.scope + }, + + /** + * @param {CallableFunction} onConnect + * @param {CallableFunction} startedCB + */ + server(onConnect, startedCB) { + // Once a peer connects, detect hubs, and setup observers + pull( + sstack.conn.hub().listen(), + pull.filter(({ type }) => type === 'connected'), + pull.drain(async ({ address, key, details }) => { + if (!key) return + if (hubs.has(key)) return + if (!details?.rpc) return + if (address.startsWith('tunnel:')) return + const rpc = details.rpc + const [err, res] = await run(rpc.hub.metadata)() + if (muxrpcMissing(err)) return + if (err) { + debug('failure when calling hub.metadata: %s', err.message ?? err) + return + } + debug('connected to hub %s', key) + if (hubs.has(key)) { + hubs.get(key)?.cancel() + hubs.delete(key) + } + const obs = new HubObserver(sstack, key, address, rpc, res, onConnect) + hubs.set(key, obs) + }) + ) + + // Once a hub disconnects, teardown + pull( + sstack.conn.hub().listen(), + pull.filter(({ type }) => type === 'disconnected'), + pull.drain(({ key }) => { + if (!key) return + if (!hubs.has(key)) return + hubs.get(key)?.close() + hubs.delete(key) + }) + ) + + startedCB() + + // close this ms plugin + return () => { + for (const hubObserver of hubs.values()) { + hubObserver.close() + } + hubs.clear() + } + }, + + /** + * @param {string} address + * @param {CallableFunction} cb + */ + async client(address, cb) { + debug(`we wish to connect to %o`, address) + const opts = self.parse(address) + if (!opts) { + cb(new Error(`invalid tunnel address ${address}`)) + return + } + const { hub, target } = opts + const addrStr = `tunnel:${hub}:${target}` + + let hubRPC = hubs.get(hub)?.rpc + + // If no hub found, look up hub in connDB and connect to it + if (!hubRPC) { + for (const [msaddr] of sstack.conn.db().entries()) { + const pubkey = msaddr.split('~shse:')[1] + if (pubkey === hub) { + // prettier-ignore + debug(`to connect to ${addrStr} we first have to connect to ${hub}`) + const [err, rpc] = await run(sstack.conn.connect)(msaddr) + if (err) { + // prettier-ignore + cb(new Error(`cant connect to ${addrStr} because cant reach the hub ${hub} due to: ` + err.message ?? err)) + return + } + hubRPC = rpc + } + } + } + + // If no hub found, find tunnel addr in connDB and connect to its `hub` + if (!hubRPC) { + const addrStrPlusShse = `tunnel:${hub}:${target}~shse:${target}` + const peerData = sstack.conn.db().get(addrStrPlusShse) + if (peerData?.hub === hub && peerData?.hubAddress) { + // prettier-ignore + debug(`to connect to ${addrStr} we first have to connect to ${hub}`) + const [err, rpc] = await run(sstack.conn.connect)(peerData.hubAddress) + if (err) { + // prettier-ignore + cb(new Error(`cant connect to ${addrStr} because cant reach the hub ${hub} due to: ` + err.message ?? err)) + return + } + hubRPC = rpc + } + } + + // If still no hub is found, consider it unknown + if (!hubRPC) { + // prettier-ignore + cb(new Error(`cant connect to ${addrStr} because hub ${hub} is offline or unknown`)) + return + } + + debug(`will call createTunnel with ${target} via hub ${hub}`) + const duplex = hubRPC.hub.createTunnel( + target, + (/** @type {any} */ err) => { + // prettier-ignore + if (err) debug('tunnel duplex broken with %o because %s', address, err.message ?? err) + } + ) + cb(null, duplex) + }, + + /** + * @param {any} addr + */ + parse(addr) { + if (typeof addr === 'object') return addr + + const [prefix, hub, target] = addr.split(':') + if (prefix !== 'tunnel') return + return { prefix, hub, target } + }, + + stringify() { + return undefined + }, + } + + return self +} + +module.exports = makeTunnelPlugin diff --git a/lib/plugin-hub-client.js b/lib/plugin-hub-client.js new file mode 100644 index 0000000..a08503d --- /dev/null +++ b/lib/plugin-hub-client.js @@ -0,0 +1,78 @@ +// @ts-ignore +const DuplexPair = require('pull-pair/duplex') // @ts-ignore +const Notify = require('pull-notify') +const debug = require('debug')('ppppp:hub-client') +const makeTunnelPlugin = require('./ms-tunnel') +const { ErrorDuplex } = require('./utils') + +module.exports = { + name: 'hubClient', + manifest: { + add: 'async', + connect: 'duplex', + }, + /** + * @param {any} sstack + * @param {any} config + */ + init(sstack, config) { + if (!sstack.conn?.connect) { + throw new Error('hub-client is missing the required ssb-conn plugin') + } + + const hubs = new Map() + + sstack.multiserver.transport({ + name: 'tunnel', + create: makeTunnelPlugin(hubs, sstack), + }) + + // Setup discoveredAttendants source pull-stream + const _notifyDiscoveredAttendant = Notify() + function discoveredAttendants() { + return _notifyDiscoveredAttendant.listen() + } + // @ts-ignore + sstack.close.hook(function (fn, args) { + _notifyDiscoveredAttendant?.end() + // @ts-ignore + fn.apply(this, args) + }) + + return { + /** + * @param {string} origin + * @returns {import('pull-stream').Duplex} + */ + connect(origin) { + // @ts-ignore + const hub = this.id // FIXME: this comes from secret-stack + debug('received hubClient.connect(%s) via hub %s', origin, hub) + if (hubs.has(hub) && origin) { + debug('connect() will resolve because handler exists') + const handler = hubs.get(hub).handler + const [ins, outs] = DuplexPair() + handler(ins, origin) + return outs + } else { + return ErrorDuplex(`Could not connect to ${origin} via ${hub}`) + } + }, + + // Needed due to https://github.com/ssb-ngi-pointer/ssb-room-client/pull/3#issuecomment-808322434 + ping() { + return Date.now() + }, + + // Internal method, needed for api-plugin.ts + getHubsMap() { + return hubs + }, + + discoveredAttendants, + + // underscore so other modules IN THIS LIBRARY can use it + _notifyDiscoveredAttendant, + } + }, +} diff --git a/lib/plugin-hub.js b/lib/plugin-hub.js new file mode 100644 index 0000000..05d1814 --- /dev/null +++ b/lib/plugin-hub.js @@ -0,0 +1,56 @@ +const pull = require('pull-stream') +const { ErrorDuplex } = require('./utils') + +module.exports = { + name: 'hub', + version: '1.0.0', + manifest: { + ping: 'sync', + metadata: 'async', + attendants: 'source', + createTunnel: 'duplex', + createToken: 'async', + }, + permissions: { + anonymous: { + allow: ['createTunnel', 'ping', 'attendants', 'createToken', 'metadata'], + }, + }, + + /** + * @param {any} sstack + */ + init(sstack) { + return { + attendants() { + return pull.error(new Error('Not implemented on the client')) + }, + + /** + * @param {string} origin + * @returns {pull.Duplex} + */ + connect(origin) { + return ErrorDuplex('Not implemented on the client') + }, + + ping() { + throw new Error('Not implemented on the client') + }, + + /** + * @param {CallableFunction} cb + */ + createToken(cb) { + cb(new Error('Not implemented on the client')) + }, + + /** + * @param {CallableFunction} cb + */ + metadata(cb) { + cb(new Error('Not implemented on the client')) + }, + } + }, +} diff --git a/lib/utils.js b/lib/utils.js new file mode 100644 index 0000000..8917714 --- /dev/null +++ b/lib/utils.js @@ -0,0 +1,34 @@ +/** + * @param {Error | string | null | undefined} err + */ +function muxrpcMissing(err) { + if (!err) return false + const errString = + typeof err === 'string' + ? err + : typeof err.message === 'string' + ? err.message + : '' + return errString.endsWith('not in list of allowed methods') +} + +/** + * @param {string} message + * @returns {import('pull-stream').Duplex} + */ +function ErrorDuplex(message) { + const err = new Error(message) + return { + source(_abort, cb) { + cb(err) + }, + sink(read) { + read(err, () => {}) + }, + } +} + +module.exports = { + muxrpcMissing, + ErrorDuplex, +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..da70a04 --- /dev/null +++ b/package.json @@ -0,0 +1,54 @@ +{ + "name": "ppppp-hub-client", + "version": "0.0.1", + "description": "secret-stack plugin to connect to a ppppp-hub", + "author": "Andre Staltz ", + "license": "MIT", + "homepage": "https://github.com/staltz/ppppp-hub-client", + "repository": { + "type": "git", + "url": "git@github.com:staltz/ppppp-hub-client.git" + }, + "main": "index.js", + "files": [ + "lib/**/*.js" + ], + "exports": { + ".": { + "require": "./lib/index.js" + } + }, + "type": "commonjs", + "engines": { + "node": ">=16" + }, + "dependencies": { + "debug": "^4.3.4", + "promisify-tuple": "~1.2.0", + "pull-notify": "~0.1.2", + "pull-pair": "~1.1.0", + "pull-stream": "~3.7.0", + "ssb-network-errors": "~1.0.1" + }, + "devDependencies": { + "@types/debug": "^4.1.8", + "@types/pull-stream": "^3.6.2", + "bs58": "^5.0.0", + "c8": "7", + "husky": "^4.3.0", + "prettier": "^2.6.2", + "pretty-quick": "^3.1.3", + "typescript": "^5.1.3" + }, + "scripts": { + "test": "node --test", + "format-code": "prettier --write \"(lib|test)/**/*.js\"", + "format-code-staged": "pretty-quick --staged --pattern \"(lib|test)/**/*.js\"", + "coverage": "c8 --reporter=lcov npm run test" + }, + "husky": { + "hooks": { + "pre-commit": "npm run format-code-staged" + } + } +} diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..f6eb639 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,15 @@ +{ + "include": ["lib/**/*.js"], + "exclude": ["coverage/", "node_modules/", "test/"], + "compilerOptions": { + "checkJs": true, + "noEmit": true, + "exactOptionalPropertyTypes": true, + "forceConsistentCasingInFileNames": true, + "lib": ["es2021", "dom"], + "module": "node16", + "skipLibCheck": true, + "strict": true, + "target": "es2021" + } +} \ No newline at end of file