From f0070499d60c3dd8be888a55363101684c35c0e4 Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Fri, 12 Jan 2024 13:15:51 +0200 Subject: [PATCH] init --- .github/workflows/node.js.yml | 25 +++ .gitignore | 7 + .prettierrc.yaml | 2 + LICENSE | 20 ++ README.md | 18 ++ declarations/atomic-file-rw.d.ts | 7 + declarations/multiserver.d.ts | 7 + declarations/pull-cat.d.ts | 4 + declarations/pull-notify.d.ts | 9 + declarations/pull-ping.d.ts | 5 + lib/connections.js | 276 +++++++++++++++++++++++++ lib/firewall.js | 11 + lib/index.js | 176 ++++++++++++++++ lib/infos.js | 91 ++++++++ lib/scheduler.js | 15 ++ lib/stats.js | 194 +++++++++++++++++ package.json | 79 +++++++ test/fixtures/absent/README.md | 1 + test/fixtures/corrupted/stats.json | 7 + test/fixtures/irrecoverable/stats.json | 12 ++ test/fixtures/present/stats.json | 11 + test/glue.test.js | 46 +++++ test/index.test.js | 85 ++++++++ test/stats.test.js | 78 +++++++ test/util.js | 80 +++++++ tsconfig.json | 17 ++ 26 files changed, 1283 insertions(+) create mode 100644 .github/workflows/node.js.yml create mode 100644 .gitignore create mode 100644 .prettierrc.yaml create mode 100644 LICENSE create mode 100644 README.md create mode 100644 declarations/atomic-file-rw.d.ts create mode 100644 declarations/multiserver.d.ts create mode 100644 declarations/pull-cat.d.ts create mode 100644 declarations/pull-notify.d.ts create mode 100644 declarations/pull-ping.d.ts create mode 100644 lib/connections.js create mode 100644 lib/firewall.js create mode 100644 lib/index.js create mode 100644 lib/infos.js create mode 100644 lib/scheduler.js create mode 100644 lib/stats.js create mode 100644 package.json create mode 100644 test/fixtures/absent/README.md create mode 100644 test/fixtures/corrupted/stats.json create mode 100644 test/fixtures/irrecoverable/stats.json create mode 100644 test/fixtures/present/stats.json create mode 100644 test/glue.test.js create mode 100644 test/index.test.js create mode 100644 test/stats.test.js create mode 100644 test/util.js create mode 100644 tsconfig.json diff --git a/.github/workflows/node.js.yml b/.github/workflows/node.js.yml new file mode 100644 index 0000000..b3558e2 --- /dev/null +++ b/.github/workflows/node.js.yml @@ -0,0 +1,25 @@ +name: CI + +on: + push: + branches: [master] + pull_request: + branches: [master] + +jobs: + test: + runs-on: ubuntu-latest + timeout-minutes: 10 + + strategy: + matrix: + node-version: [18.x, 20.x] + + steps: + - uses: actions/checkout@v3 + - name: Use Node.js ${{ matrix.node-version }} + uses: actions/setup-node@v3 + with: + node-version: ${{ matrix.node-version }} + - run: npm install + - run: npm test diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b093961 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +.vscode +node_modules +pnpm-lock.yaml +package-lock.json +coverage +*~ +lib/*.d.ts \ No newline at end of file diff --git a/.prettierrc.yaml b/.prettierrc.yaml new file mode 100644 index 0000000..1d2127c --- /dev/null +++ b/.prettierrc.yaml @@ -0,0 +1,2 @@ +semi: false +singleQuote: true diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..7b90542 --- /dev/null +++ b/LICENSE @@ -0,0 +1,20 @@ +The MIT License (MIT) + +Copyright (c) 2024 Andre 'Staltz' Medeiros + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..fd8c41c --- /dev/null +++ b/README.md @@ -0,0 +1,18 @@ +**Work in progress** + +## Installation + +We're not on npm yet. In your package.json, include this as + +```js +"ppppp-net": "github:staltz/ppppp-net" +``` + +**TODO:** + +- [x] connect +- [x] stage +- [x] stats.json +- [ ] interpool glue +- [ ] firewall +- [ ] scheduler diff --git a/declarations/atomic-file-rw.d.ts b/declarations/atomic-file-rw.d.ts new file mode 100644 index 0000000..620bae9 --- /dev/null +++ b/declarations/atomic-file-rw.d.ts @@ -0,0 +1,7 @@ +type CB = (...args: [Error] | [null, T]) => void + +declare module 'atomic-file-rw' { + export function readFile(path: string, encoding: string, cb: CB): void; + export function writeFile(path: string, data: string, encoding: string, cb: CB): void; + export function deleteFile(path: string, cb: CB): void; +} \ No newline at end of file diff --git a/declarations/multiserver.d.ts b/declarations/multiserver.d.ts new file mode 100644 index 0000000..9584f6b --- /dev/null +++ b/declarations/multiserver.d.ts @@ -0,0 +1,7 @@ +declare module 'multiserver/plugins/net' { + interface NetPlugin { + parse(addr: string): { host: string; port: number } | undefined + } + function createNetPlugin(options: any): NetPlugin + export = createNetPlugin +} diff --git a/declarations/pull-cat.d.ts b/declarations/pull-cat.d.ts new file mode 100644 index 0000000..f79718c --- /dev/null +++ b/declarations/pull-cat.d.ts @@ -0,0 +1,4 @@ +declare module 'pull-cat' { + function concat(...args: Array): any; + export = concat; +} \ No newline at end of file diff --git a/declarations/pull-notify.d.ts b/declarations/pull-notify.d.ts new file mode 100644 index 0000000..db21e66 --- /dev/null +++ b/declarations/pull-notify.d.ts @@ -0,0 +1,9 @@ +declare module 'pull-notify' { + interface Notify { + (data: any): void; + listen(): unknown; + end(): void; + } + function CreateNotify(): Notify + export = CreateNotify +} diff --git a/declarations/pull-ping.d.ts b/declarations/pull-ping.d.ts new file mode 100644 index 0000000..5a0d411 --- /dev/null +++ b/declarations/pull-ping.d.ts @@ -0,0 +1,5 @@ + +declare module 'pull-ping' { + function pullPing(opts: {timeout: number, serve?: boolean}): unknown; + export = pullPing; +} \ No newline at end of file diff --git a/lib/connections.js b/lib/connections.js new file mode 100644 index 0000000..43fd74c --- /dev/null +++ b/lib/connections.js @@ -0,0 +1,276 @@ +const debug = require('debug')('ppppp:net:connections') +const createNotify = require('pull-notify') +const run = require('promisify-tuple') +const IP = require('ip') +const msNetPlugin = require('multiserver/plugins/net')({}) + +/** + * @typedef {import('./index').RpcConnectListener} RpcConnectListener + * @typedef {import('./index').Address} Address + * @typedef {import('./index').RPC} RPC + * @typedef {import('./index').Peer} Peer + * @typedef {import('./infos').Info} Info + * @typedef {import('./infos')} Infos + */ + +class Connections { + /** @type {Peer} */ + #peer + /** @type {Infos} */ + #infos + /** @type {boolean} */ + #closed + /** @type {ReturnType} */ + #notifyEvent + /** @type {Map} */ + #rpcs + + /** + * Used only to schedule a connect when a disconnect is in progress. + * @type {Set
} + */ + #connectRetries + + /** + * @param {Peer} peer + * @param {Infos} infos + */ + constructor(peer, infos) { + this.#peer = peer + this.#infos = infos + this.#closed = false + this.#notifyEvent = createNotify() + this.#rpcs = new Map() + this.#connectRetries = new Set() + + this.#peer.addListener('rpc:connect', this.#onRpcConnect) + } + + /** + * @param {Address} address + * @returns {Info['inferredType']} + */ + static inferPeerType(address) { + // TODO perhaps the `type` should be provided by each multiserver plugin? + // like when multiserver plugins provide the `stream.address` to secret-stack + if (address.startsWith('tunnel:')) return 'tunnel' + if (address.startsWith('net:')) { + const netAddr = address.split('~')[0] + const parsed = msNetPlugin.parse(netAddr) + if (parsed?.host) { + if (IP.isPrivate(parsed.host)) return 'lan' + else return 'internet' + } + } + return + } + + #assertNotClosed() { + if (this.#closed) { + throw new Error('This Connections instance is closed, create a new one.') + } + } + + /** + * @type {RpcConnectListener} + */ + #onRpcConnect = (rpc, weAreClient) => { + // Don't process self connections, whatever that means: + if (rpc.shse.pubkey === this.#peer.shse.pubkey) return + // This branch is already handled by this.connect() + if (weAreClient) return + + this.#prepareConnectedRPC(rpc.stream.address, rpc, weAreClient) + } + + /** + * @type {(address: Address, rpc: RPC, weAreClient: boolean) => void} + */ + #prepareConnectedRPC = (address, rpc, weAreClient) => { + const initiator = weAreClient ? 'we' : 'they' + debug('Connected to %s, %s initiated it', address, initiator) + + this.#rpcs.set(address, rpc) + rpc.once('closed', () => { + debug('Disconnected from %s', address) + this.#rpcs.delete(address) + this.#infos.update(address, { state: 'disconnected' }) + this.#notifyEvent({ type: 'disconnected', address }) + this.#infos.emit() + }) + + const state = /**@type {Info['state']}*/ ('connected') + const inferredType = Connections.inferPeerType(address) + this.#infos.update(address, { state, inferredType }) + this.#notifyEvent({ + type: state, + address, + details: { rpc, weAreClient }, + }) + this.#infos.emit() + } + + /** + * @param {string} address + * @returns {Promise} + */ + async connect(address) { + this.#assertNotClosed() + // this._assertValidAddress(address); + + const prevInfo = this.#infos.get(address) + switch (prevInfo?.state ?? 'disconnected') { + case 'connected': { + const rpc = this.#rpcs.get(address) + if (!rpc) { + // prettier-ignore + throw new Error(`Failed to connect to ${address} due to inconsistent internal state`); + } + return rpc + } + + case 'disconnecting': { + // If disconnecting, schedule a connect() after disconnection completed + this.#connectRetries.add(address) + // note: control flow should fall through below! + } + case 'connecting': { + return new Promise((resolve, reject) => { + let timeout = 100 + const checkAgain = () => { + const rpc = this.#rpcs.get(address) + if (rpc) resolve(rpc) + else if (timeout > 5 * 60e3) { + // prettier-ignore + reject(new Error(`Failed to connect to ${address} after waiting a long time`)) + } else { + timeout *= 2 + setTimeout(checkAgain, timeout) + } + } + checkAgain() + }) + } + + case 'disconnected': { + debug('Connecting to %s', address) + const state = /**@type {Info['state']}*/ ('connecting') + this.#infos.update(address, { state }) + this.#notifyEvent({ type: state, address }) + this.#infos.emit() + + const [err, rpc] = await run(this.#peer.connect)(address) + if (err) { + this.#infos.update(address, { state: 'disconnected' }) + debug('Failed to connect to %s because: %s', address, err.message) + this.#notifyEvent({ + type: 'connecting-failed', + address, + details: err, + }) + this.#infos.emit() + throw err + } + + const concurrentInfo = this.#infos.get(address) + if (!concurrentInfo || concurrentInfo.state !== 'connected') { + this.#prepareConnectedRPC(address, rpc, true) + return rpc + } else { + const rpc2 = this.#rpcs.get(address) + if (!rpc2) { + // prettier-ignore + throw new Error(`Failed to connect to ${address} due to inconsistent internal state`); + } + return rpc2 + } + } + + default: { + // prettier-ignore + debug('Unexpected control flow, peer %s has bad state %o', address, prevInfo) + // prettier-ignore + throw new Error(`Unexpected control flow, peer ${address} has bad state "${prevInfo?.state ?? '?'}"`) + } + } + } + + /** + * @param {Address} address + * @returns {Promise} + */ + async disconnect(address) { + this.#assertNotClosed() + const prevInfo = this.#infos.get(address) + if (!prevInfo || prevInfo?.state === 'disconnected') return false + if (prevInfo.state === 'disconnecting') return false + + /**@type {RPC}*/ + let rpc + if (prevInfo.state === 'connecting') { + rpc = await new Promise((resolve) => { + let timeout = 100 + const checkAgain = () => { + const rpc = this.#rpcs.get(address) + if (rpc) resolve(rpc) + else { + timeout *= 2 + timeout = Math.min(timeout, 30e3) + setTimeout(checkAgain, 100) + } + } + checkAgain() + }) + } else if (prevInfo.state === 'connected') { + const maybeRPC = this.#rpcs.get(address) + if (!maybeRPC) { + // prettier-ignore + throw new Error(`Failed to disconnect from ${address} due to inconsistent internal state`); + } else { + rpc = maybeRPC + } + } + + debug('Disconnecting from %s', address) + const state = /**@type {Info['state']}*/ ('disconnecting') + this.#infos.update(address, { state }) + this.#notifyEvent({ type: state, address }) + this.#infos.emit() + // @ts-ignore + await run(rpc.close)(true) + // Additional cleanup will execute in the "closed" event handler + + // Re-connect because while disconnect() was running, + // someone called connect() + if (this.#connectRetries.has(address)) { + this.#connectRetries.delete(address) + this.connect(address) + } + + return true + } + + listen() { + this.#assertNotClosed() + return this.#notifyEvent.listen() + } + + reset() { + if (this.#closed) return + for (const rpc of this.#rpcs.values()) { + rpc.close(true) + } + } + + close() { + this.reset() + this.#peer.removeListener('rpc:connect', this.#onRpcConnect) + this.#closed = true + this.#rpcs.clear() + this.#connectRetries.clear() + this.#notifyEvent.end() + debug('Closed') + } +} + +module.exports = Connections diff --git a/lib/firewall.js b/lib/firewall.js new file mode 100644 index 0000000..8277987 --- /dev/null +++ b/lib/firewall.js @@ -0,0 +1,11 @@ +class Firewall { + constructor() { + // FIXME: implement + } + + start() { + // FIXME: implement + } +} + +module.exports = Firewall \ No newline at end of file diff --git a/lib/index.js b/lib/index.js new file mode 100644 index 0000000..f6af342 --- /dev/null +++ b/lib/index.js @@ -0,0 +1,176 @@ +const pullPing = require('pull-ping') +const Path = require('path') +const Infos = require('./infos') +const Stats = require('./stats') +const Connections = require('./connections') +const Scheduler = require('./scheduler') + +/** + * @typedef {string} Address + * @typedef {(rpc: RPC, weAreClient: boolean) => void} RpcConnectListener + * @typedef {{ + * shse: {pubkey: string}; + * close: { + * (errOrEnd: boolean, cb?: CB): void, + * hook(hookIt: (this: unknown, fn: any, args: any) => any): void + * }; + * connect(address: string, cb: CB): void; + * once(event: 'closed', cb: CB): void; + * addListener(event: 'rpc:connect', listener: RpcConnectListener): void; + * removeListener(event: 'rpc:connect', listener: RpcConnectListener): void; + * }} Peer + * @typedef {Peer & {stream: {address: string}}} RPC + * @typedef {{ + * global: { + * path?: string + * timers?: { + * ping?: number + * }, + * }, + * net?: { + * autostart?: boolean, + * persistTimeout?: number, + * }, + * }} Config + * @typedef {Config & {global: {path: string}}} ExpectedConfig + * @typedef {import('./infos').Info} Info + */ + +/** + * @template T + * @typedef {(...args: [Error] | [null, T]) => void } CB + */ + +/** + * @param {Config} config + * @returns {asserts config is ExpectedConfig} + */ +function assertValidConfig(config) { + if (typeof config.global?.path !== 'string') { + throw new Error('net plugin requires config.global.path') + } +} + +/** + * @param {Peer} peer + * @param {Config} config + */ +function initNet(peer, config) { + assertValidConfig(config) + const autostart = config.net?.autostart ?? true + const netDir = Path.join(config.global.path, 'net') + + const infos = new Infos() + const stats = new Stats(netDir, infos, config.net?.persistTimeout) + const connections = new Connections(peer, infos) + const scheduler = new Scheduler() + + peer.close.hook(function (fn, args) { + scheduler.stop() + connections.close() + stats.close() + return fn.apply(this, args) + }) + + if (autostart) { + start() + } + + async function start() { + await stats.loaded() + queueMicrotask(scheduler.start.bind(scheduler)) + } + + function stop() { + scheduler.stop() + } + + /** + * @param {Address} address + * @param {Partial} info + */ + function stage(address, info) { + if (info.state) throw new Error('Cannot stage peer info with "state" field') + if (infos.has(address)) { + return false + } else { + infos.update(address, info) + return true + } + } + + /** + * @param {Address} address + * @param {CB} cb + */ + function connect(address, cb) { + connections.connect(address).then( + (result) => cb(null, result), + (err) => cb(err) + ) + } + + /** + * @param {Address} address + * @param {CB} cb + */ + function disconnect(address, cb) { + return connections.disconnect(address).then( + (result) => cb(null, result), + (err) => cb(err) + ) + } + + /** + * @param {Address} address + * @param {Info} info + */ + function updateInfo(address, info) { + infos.update(address, info) + } + + function listen() { + return connections.listen() + } + + function peers() { + return infos.liveEntries() + } + + function ping() { + const MIN = 10e3 // 10sec + const DEFAULT = 5 * 60e3 // 5min + const MAX = 30 * 60e3 // 30min + let timeout = config.global.timers?.ping ?? DEFAULT + timeout = Math.max(MIN, Math.min(timeout, MAX)) + return pullPing({ timeout }) + } + + return { + start, + stop, + stage, + connect, + disconnect, + updateInfo, + listen, + peers, + ping, + } +} + +exports.name = 'net' +exports.manifest = { + start: 'sync', + stop: 'sync', + stage: 'sync', + connect: 'async', + disconnect: 'async', + listen: 'source', + peers: 'source', + ping: 'duplex', +} +exports.permissions = { + anonymous: { allow: ['ping'] }, +} +exports.init = initNet diff --git a/lib/infos.js b/lib/infos.js new file mode 100644 index 0000000..bd01949 --- /dev/null +++ b/lib/infos.js @@ -0,0 +1,91 @@ +const createNotify = require('pull-notify') +const pullConcat = require('pull-cat') +const pull = require('pull-stream') + +/** + * @typedef {import('./index').Address} Address + * @typedef {import('./stats').StatsInfo} StatsInfo + * @typedef {{ + * state: 'connected' | 'disconnected' | 'connecting' | 'disconnecting', + * connBirth?: number, + * connUpdated?: number, + * inferredType?: 'internet' | 'lan' | 'tunnel' | undefined; + * stats?: StatsInfo + * }} Info + */ + +class Infos { + /** @type {Map} */ + #map + /** @type {ReturnType} */ + #notify + + constructor() { + this.#map = new Map() + this.#notify = createNotify() + } + + /** + * @param {Address} address + * @returns {Info | undefined} + */ + get(address) { + return this.#map.get(address) + } + + /** + * @param {Address} address + * @returns {boolean} + */ + has(address) { + return this.#map.has(address) + } + + /** + * @param {Address} address + * @param {Partial} info + * @returns {void} + */ + update(address, info) { + const now = Date.now() + const connUpdated = now // FIXME: not just conn + const prevInfo = this.#map.get(address) + if (prevInfo) { + for (const key of Object.keys(info)) { + const k = /**@type {keyof Info}*/ (key) + if (typeof info[k] === 'undefined') delete info[k] + } + this.#map.set(address, { ...prevInfo, connUpdated, ...info }) + } else if (!info.state) { + this.#map.set(address, { ...info, state: 'disconnected' }) + } else { + const connBirth = now + this.#map.set(address, { + .../**@type {Info}*/ (info), + connBirth, + connUpdated, + }) + } + } + + size() { + return this.#map.size + } + + emit() { + this.#notify(Array.from(this.#map.entries())) + } + + entries() { + return this.#map.entries() + } + + liveEntries() { + return pullConcat([ + pull.values([Array.from(this.#map.entries())]), + this.#notify.listen(), + ]) + } +} + +module.exports = Infos diff --git a/lib/scheduler.js b/lib/scheduler.js new file mode 100644 index 0000000..3556f79 --- /dev/null +++ b/lib/scheduler.js @@ -0,0 +1,15 @@ +class Scheduler { + constructor() { + // FIXME: implement + } + + start() { + // FIXME: implement + } + + stop() { + // FIXME: implement + } +} + +module.exports = Scheduler \ No newline at end of file diff --git a/lib/stats.js b/lib/stats.js new file mode 100644 index 0000000..a95f218 --- /dev/null +++ b/lib/stats.js @@ -0,0 +1,194 @@ +const Path = require('path') +const FS = require('fs') +const debug = require('debug')('ppppp:net:stats') +const atomic = require('atomic-file-rw') + +/** + * @typedef {import('./index').Address} Address + * @typedef {import('./infos')} Infos + * @typedef {{ + * mean: number; + * stdev: number; + * count: number; + * sum: number; + * sqsum: number; + * }} Statistics + * @typedef {{ + * birth?: number; + * key?: string; + * source?: string; + * failure?: number; + * stateChange?: number; + * duration?: Statistics; + * ping?: { + * rtt: Statistics; + * skew: Statistics; + * }; + * [name: string]: any; + * }} StatsInfo + */ + +/** + * @template T + * @typedef {import('./index').CB} CB + */ + +/** + * Automatically heal from corruption .json files. + * + * - Remove (some) extraneous characters from the end of the file + * - If nothing works, return empty object instead of crashing + */ +const SelfHealingJSONCodec = { + /** + * @param {any} obj + */ + encode(obj) { + return JSON.stringify(obj, null, 2) + }, + /** + * @param {any} input + * @returns {Record} + */ + decode(input) { + if (!input) return {} + const str = /**@type {string}*/ (input.toString()) + const MAX_TRIM = 10 + let foundCorruption = false + for (let i = 0; i < MAX_TRIM; i++) { + try { + return JSON.parse(str.substring(0, str.length - i)) + } catch (err) { + if (!foundCorruption) { + foundCorruption = true + // prettier-ignore + console.warn(`WARNING: ppppp-net found a corrupted ${Stats.FILENAME} file and is attempting to heal it`) + } + continue + } + } + console.error( + `ERROR! ppppp-net failed to heal corrupted ${Stats.FILENAME} file` + ) + return {} + }, +} + +class Stats { + /** @type {string} */ + #path + /** @type {number} */ + #persistTimeout + /** @type {boolean} */ + #closed + /** @type {Infos} */ + #infos + /** @type {Promise} */ + #loadedPromise + /** @type {(value: true) => void} */ + // @ts-ignore + #loadedResolve + /** @type {(reason: any) => void} */ + // @ts-ignore + #loadedReject + + static FILENAME = 'stats.json' + static DEFAULT_PERSIST_TIMEOUT = 2000 + + /** + * @param {string} dir + * @param {Infos} infos + * @param {number | undefined} persistTimeout + */ + constructor(dir, infos, persistTimeout) { + this.#path = Path.join(dir, Stats.FILENAME) + this.#persistTimeout = persistTimeout ?? Stats.DEFAULT_PERSIST_TIMEOUT + this.#closed = false + this.#infos = infos + this.#loadedPromise = new Promise((resolve, reject) => { + this.#loadedResolve = resolve + this.#loadedReject = reject + }) + + this.#readFromDisk(this.#path, (err, fileContents) => { + if (err) { + this.#loadedReject(err) + debug(`Failed to load ${Stats.FILENAME}`) + return + } else if (fileContents) { + const vals = SelfHealingJSONCodec.decode(fileContents) + for (const [address, statsInfo] of Object.entries(vals)) { + this.#infos.update(address, { stats: statsInfo }) + } + this.#loadedResolve(true) + debug('Loaded conn.json into ConnDB in memory') + } else { + atomic.writeFile(this.#path, '{}', 'utf8', () => {}) + this.#loadedResolve(true) + // prettier-ignore + debug(`Created new ${Stats.FILENAME} because there was no existing one.`); + return + } + }) + } + + /** + * @param {string} path + * @param {CB} cb + */ + #readFromDisk(path, cb) { + if (typeof localStorage !== 'undefined' && localStorage !== null) { + // In a browser + atomic.readFile(path, 'utf8', cb) + } else { + // In Node.js + if (FS.existsSync(path)) { + atomic.readFile(path, 'utf8', cb) + } else { + cb(null, null) + } + } + } + + /** + * @param {CB=} cb + * @returns {void} + */ + #writeToDisk(cb) { + if (this.#infos.size() === 0) return + debug(`Begun serializing and writing ${Stats.FILENAME}`) + const record = /**@type {Record}*/ ({}) + for (let [address, info] of this.#infos.entries()) { + if (info.stats) { + record[address] = info.stats + } + } + const json = SelfHealingJSONCodec.encode(record) + atomic.writeFile(this.#path, json, 'utf8', (err, x) => { + if (!err) debug(`Done serializing and writing ${Stats.FILENAME}`) + if (err) return cb?.(err) + cb?.(null, null) + }) + } + + close() { + this.#closed = true; + // FIXME: implement + // this._cancelScheduleWrite(); + // this._write(); + // this._map?.clear(); + // (this as any)._map = void 0; + // (this as any)._notify = void 0; + // (this as any)._stateFile = void 0; + debug('Closed the Stats instance'); + } + + /** + * @returns {Promise} + */ + loaded() { + return this.#loadedPromise + } +} + +module.exports = Stats diff --git a/package.json b/package.json new file mode 100644 index 0000000..8ce8565 --- /dev/null +++ b/package.json @@ -0,0 +1,79 @@ +{ + "name": "ppppp-net", + "version": "1.0.0", + "description": "PPPPP plugin to manage connections with hubs and peers", + "author": "Andre Staltz ", + "license": "MIT", + "homepage": "https://github.com/staltz/ppppp-net", + "repository": { + "type": "git", + "url": "git@github.com:staltz/ppppp-net.git" + }, + "main": "index.js", + "files": [ + "*.js", + "lib/*.js", + "lib/*.d.ts" + ], + "types": "types/index.d.ts", + "exports": { + ".": { + "require": "./lib/index.js" + } + }, + "type": "commonjs", + "engines": { + "node": ">=18" + }, + "dependencies": { + "@types/pull-stream": "^3.6.7", + "atomic-file-rw": "^0.3.0", + "debug": "^4.3.2", + "has-network2": ">=0.0.3", + "ip": "^1.1.5", + "multiserver": "3", + "on-change-network-strict": "1.0.0", + "on-wakeup": "^1.0.1", + "promisify-tuple": "^1.0.1", + "pull-cat": "~1.1.11", + "pull-notify": "^0.1.2", + "pull-pause": "~0.0.2", + "pull-ping": "^2.0.3", + "pull-stream": "^3.6.14", + "statistics": "^3.3.0", + "ziii": "~1.0.2" + }, + "devDependencies": { + "@types/debug": "^4.1.12", + "@types/ip": "^1.1.3", + "@types/node": "18", + "bs58": "^5.0.0", + "c8": "7", + "ppppp-caps": "file:../caps", + "ppppp-db": "file:../db", + "ppppp-dict": "file:../dict", + "ppppp-keypair": "file:../keypair", + "ppppp-set": "file:../set", + "prettier": "^2.6.2", + "pretty-quick": "^3.1.3", + "rimraf": "^4.4.0", + "secret-handshake-ext": "0.0.11", + "secret-stack": "~8.1.0", + "ssb-box": "^1.0.1", + "typescript": "^5.1.3" + }, + "scripts": { + "clean-check": "tsc --build --clean", + "prepublishOnly": "npm run clean-check && tsc --build", + "postpublish": "npm run clean-check", + "test": "npm run clean-check && node --test", + "format-code": "prettier --write \"(lib|test)/**/*.js\"", + "format-code-staged": "pretty-quick --staged --pattern \"(lib|test)/**/*.js\"", + "coverage": "c8 --reporter=lcov npm run test" + }, + "husky": { + "hooks": { + "pre-commit": "npm run format-code-staged" + } + } +} diff --git a/test/fixtures/absent/README.md b/test/fixtures/absent/README.md new file mode 100644 index 0000000..7738c42 --- /dev/null +++ b/test/fixtures/absent/README.md @@ -0,0 +1 @@ +The purpose of this file is to show, in git, that this directory should be empty, to test the use case where `stats.json` is absent. \ No newline at end of file diff --git a/test/fixtures/corrupted/stats.json b/test/fixtures/corrupted/stats.json new file mode 100644 index 0000000..6157e97 --- /dev/null +++ b/test/fixtures/corrupted/stats.json @@ -0,0 +1,7 @@ +{ + "net:staltz.com:8008~noauth": { + "source": "stored" + } +}, +} +}, \ No newline at end of file diff --git a/test/fixtures/irrecoverable/stats.json b/test/fixtures/irrecoverable/stats.json new file mode 100644 index 0000000..9a67845 --- /dev/null +++ b/test/fixtures/irrecoverable/stats.json @@ -0,0 +1,12 @@ +absolute +025xyyx rubbish !!!!file--------that +cannot be +healed because ======= +it's not-f-c-c +} +} +} +} +} +JSON#### +of any kind \ No newline at end of file diff --git a/test/fixtures/present/stats.json b/test/fixtures/present/stats.json new file mode 100644 index 0000000..81590e0 --- /dev/null +++ b/test/fixtures/present/stats.json @@ -0,0 +1,11 @@ +{ + "net:staltz.com:8008~noauth": { + "duration": { + "mean": 0, + "stdev": 0, + "count": 440, + "sum": 0, + "sqsum": 0 + } + } +} \ No newline at end of file diff --git a/test/glue.test.js b/test/glue.test.js new file mode 100644 index 0000000..5745cb0 --- /dev/null +++ b/test/glue.test.js @@ -0,0 +1,46 @@ +const test = require('node:test') +const assert = require('node:assert') +const p = require('node:util').promisify +const { createPeerMock } = require('./util') + +const TEST_ADDR = + 'net:localhost:9752~shse:EqTMFv7zm8hpPyAkj789qdJgqtz81AEbcinpAs24RRUC' + +test('Glueing together stats with connections', async (t) => { + await t.test('stage() is ignored when peer already connected', async () => { + const peer = createPeerMock() + + const address = TEST_ADDR + const result = await p(peer.net.connect)(address) + assert.ok(result, 'connect was succesful') + + const entriesBefore = await p(peer.net.peers())(null) + assert.equal(entriesBefore.length, 1, 'there is one entry in peers()') + assert.equal(entriesBefore[0][0], address, 'entry addr ok') + assert.equal(entriesBefore[0][1].state, 'connected', 'entry state ok') + + const stagingResult = peer.net.stage(address, { mode: 'internet' }) + assert.equal(stagingResult, false, 'stage() should refuse') + + const entriesAfter = await p(peer.net.peers())(null) + assert.equal(entriesAfter.length, 1, 'there is one entry in peers()') + assert.equal(entriesAfter[0][0], address, 'entry addr ok') + assert.equal(entriesAfter[0][1].state, 'connected', 'entry state ok') + }) + + await t.test('stage() successful', async (t) => { + const peer = createPeerMock() + const address = TEST_ADDR + + const entriesBefore = await p(peer.net.peers())(null) + assert.equal(entriesBefore.length, 0, 'there is no entry in peers()') + + const stagingResult = peer.net.stage(address, { mode: 'internet' }) + assert.equal(stagingResult, true, 'stage() should refuse') + + const entriesAfter = await p(peer.net.peers())(null) + assert.equal(entriesAfter.length, 1, 'there is one entry in peers()') + assert.equal(entriesAfter[0][0], address, 'entry addr ok') + assert.equal(entriesAfter[0][1].state, 'disconnected', 'entry state ok') + }) +}) diff --git a/test/index.test.js b/test/index.test.js new file mode 100644 index 0000000..8f4422d --- /dev/null +++ b/test/index.test.js @@ -0,0 +1,85 @@ +const test = require('node:test') +const assert = require('node:assert') +const p = require('node:util').promisify +const pull = require('pull-stream') +const { createPeer } = require('./util') + +const TEST_ADDR = + 'net:localhost:9752~shse:EqTMFv7zm8hpPyAkj789qdJgqtz81AEbcinpAs24RRUC' + +test('net', async (t) => { + await t.test('connect() rejects given unreachable address', async () => { + const peer = createPeer({ name: 'alice' }) + await assert.rejects(p(peer.net.connect)(TEST_ADDR), /ECONNREFUSED/) + await p(peer.close)(true) + }) + + await t.test('peers() emits all entries as they update', async () => { + const peer = createPeer({ name: 'alice' }) + + await new Promise((resolve, reject) => { + let i = 0 + pull( + peer.net.peers(), + pull.drain((entries) => { + ++i + if (i === 1) { + assert('FIRST EMISSION') + assert.equal(entries.length, 0, 'entries === []') + } else if (i === 2) { + assert('SECOND EMISSION') + assert.equal(entries.length, 1, 'there is one entry') + const entry = entries[0] + assert.equal(entry[0], TEST_ADDR, 'left is the address') + assert.equal(typeof entry[1], 'object', 'right is the data') + assert.equal(entry[1].state, 'connecting', 'state is connecting') + } else if (i === 3) { + assert('THIRD EMISSION') + assert.equal(entries.length, 1, 'entries === []') + const entry = entries[0] + assert.equal(entry[0], TEST_ADDR, 'left is the address') + assert.equal(typeof entry[1], 'object', 'right is the data') + assert.equal(entry[1].state, 'disconnected', 'state disconnected') + resolve() + } else { + reject(new Error('too many emissions')) + } + }) + ) + + peer.net.connect(TEST_ADDR, () => {}) + }) + + await p(peer.close)(true) + }) + + await t.test('listen() emits events', async () => { + const peer = createPeer({ name: 'alice' }) + + await new Promise((resolve, reject) => { + let i = 0 + pull( + peer.net.listen(), + pull.drain((ev) => { + ++i + if (i === 1) { + assert.equal(ev.type, 'connecting', 'event.type ok') + assert.equal(ev.address, TEST_ADDR, 'event.address ok') + } else if (i === 2) { + assert.equal(ev.type, 'connecting-failed', 'event.type ok') + assert.equal(ev.address, TEST_ADDR, 'event.address ok') + assert.ok(ev.details, 'event.details ok') + assert.equal(ev.details.code, 'ECONNREFUSED', 'event.details err') + resolve() + } else { + reject(new Error('too many emissions')) + } + }) + ) + + peer.net.connect(TEST_ADDR, () => {}) + }) + + await p(peer.close)(true) + }) +}) diff --git a/test/stats.test.js b/test/stats.test.js new file mode 100644 index 0000000..2371c19 --- /dev/null +++ b/test/stats.test.js @@ -0,0 +1,78 @@ +const test = require('node:test') +const assert = require('node:assert') +const Path = require('node:path') +const FS = require('node:fs') +const p = require('node:util').promisify +const Stats = require('../lib/stats') +const Infos = require('../lib/infos') + +test('Stats', async (t) => { + await t.test('Recovers from corrupted JSON file', async () => { + const dirPath = Path.join(__dirname, './fixtures/corrupted') + const infos = new Infos() + const stats = new Stats(dirPath, infos) + assert.ok(stats, 'Stats instance was created') + + const entriesBefore = Array.from(infos.entries()) + assert.equal(entriesBefore.length, 0, 'before loaded(), there is no data') + + await stats.loaded() + const entriesAfter = Array.from(infos.entries()) + assert.equal(entriesAfter.length, 1, 'after loaded(), there is data') + const [address, info] = entriesAfter[0] + assert.equal(address, 'net:staltz.com:8008~noauth', 'the address looks ok') + assert.equal(info.stats.source, 'stored', 'the info looks ok') + }) + + await t.test('Creates JSON file when it is absent', async () => { + const dirPath = Path.join(__dirname, './fixtures/absent') + const statsJSONPath = Path.join(dirPath, './stats.json') + assert.equal(FS.existsSync(statsJSONPath), false, 'stats.json doesnt exist') + + const infos = new Infos() + const stats = new Stats(dirPath, infos) + assert.ok(stats, 'Stats instance was created') + + while (FS.existsSync(statsJSONPath) === false) { + await p(setTimeout)(1) + } + + const fileContents = FS.readFileSync(statsJSONPath, 'utf8') + assert.equal(fileContents, '{}', 'stats.json data should be empty JSON') + + FS.unlinkSync(statsJSONPath) + }) + + await t.test('Loads when JSON file is present', async () => { + const dirPath = Path.join(__dirname, './fixtures/present') + const statsJSONPath = Path.join(dirPath, './stats.json') + assert.equal(FS.existsSync(statsJSONPath), true, 'stats.json exists') + + const infos = new Infos() + const stats = new Stats(dirPath, infos) + assert.ok(stats, 'Stats instance was created') + + await stats.loaded() + + const entries = Array.from(infos.entries()) + assert.equal(entries.length === 1, true, 'stats has one entry') + assert.equal(entries[0][0], 'net:staltz.com:8008~noauth', 'entry addr ok') + assert.ok(entries[0][1].stats.duration, 'entry stats.duration ok') + }) + + await t.test('Cannot recover from totally broken JSON file', async () => { + const dirPath = Path.join(__dirname, './fixtures/irrecoverable') + + const infos = new Infos() + const stats = new Stats(dirPath, infos) + assert.ok(stats, 'Stats instance was created') + + const entriesBefore = Array.from(infos.entries()) + assert.equal(entriesBefore.length, 0, 'before loaded(), there is no data') + + await stats.loaded() + + const entriesAfter = Array.from(infos.entries()) + assert.equal(entriesAfter.length, 0, 'after loaded(), there is data') + }) +}) diff --git a/test/util.js b/test/util.js new file mode 100644 index 0000000..b87de50 --- /dev/null +++ b/test/util.js @@ -0,0 +1,80 @@ +const OS = require('node:os') +const FS = require('node:fs') +const Path = require('node:path') +const rimraf = require('rimraf') +const caps = require('ppppp-caps') +const Keypair = require('ppppp-keypair') +const net = require('../lib/index') + +function createPeer(config) { + if (config.name) { + const name = config.name + const tmp = OS.tmpdir() + config.global ??= {} + config.global.path ??= Path.join(tmp, `ppppp-net-${name}-${Date.now()}`) + config.global.keypair ??= Keypair.generate('ed25519', name) + delete config.name + } + if (!config.global) { + throw new Error('need config.global in createPeer()') + } + if (!config.global.path) { + throw new Error('need config.global.path in createPeer()') + } + if (!config.global.keypair) { + throw new Error('need config.global.keypair in createPeer()') + } + + rimraf.sync(config.global.path) + return require('secret-stack/bare')() + .use(require('secret-stack/plugins/net')) + .use(require('secret-handshake-ext/secret-stack')) + .use(net) + .call(null, { + shse: { caps }, + ...config, + global: { + connections: { + incoming: { + net: [{ scope: 'device', transform: 'shse', port: null }], + }, + outgoing: { + net: [{ transform: 'shse' }], + }, + }, + ...config.global, + }, + }) +} + +function createPeerMock() { + const testPath = FS.mkdtempSync(Path.join(OS.tmpdir(), 'ppppp-net-')) + + const mockPeer = { + addListener() {}, + close: { + hook: () => {}, + }, + post: () => {}, + connect: (_address, cb) => { + setTimeout(() => { + cb(null, { + once: () => {}, + }) + }, 200) + }, + once: () => {}, + } + const mockConfig = { + global: { + path: testPath, + }, + // shse: { caps } + } + + mockPeer.net = net.init(mockPeer, mockConfig) + + return mockPeer +} + +module.exports = { createPeer, createPeerMock } diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..7bd7313 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,17 @@ +{ + "include": ["declarations", "lib/**/*.js"], + "exclude": ["coverage/", "node_modules/", "test/"], + "compilerOptions": { + "checkJs": true, + "declaration": true, + "emitDeclarationOnly": true, + "exactOptionalPropertyTypes": true, + "forceConsistentCasingInFileNames": true, + "lib": ["es2022", "dom"], + "module": "node16", + "skipLibCheck": true, + "strict": true, + "target": "es2022", + "typeRoots": ["node_modules/@types", "declarations"] + } +} \ No newline at end of file