diff --git a/README.md b/README.md index fd8c41c..72e1e40 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ We're not on npm yet. In your package.json, include this as - [x] connect - [x] stage - [x] stats.json -- [ ] interpool glue +- [x] interpool glue +- [ ] writing hubs to stats.json - [ ] firewall - [ ] scheduler diff --git a/declarations/atomic-file-rw.d.ts b/declarations/atomic-file-rw.d.ts index 620bae9..59c2868 100644 --- a/declarations/atomic-file-rw.d.ts +++ b/declarations/atomic-file-rw.d.ts @@ -1,7 +1,12 @@ type CB = (...args: [Error] | [null, T]) => void declare module 'atomic-file-rw' { - export function readFile(path: string, encoding: string, cb: CB): void; - export function writeFile(path: string, data: string, encoding: string, cb: CB): void; - export function deleteFile(path: string, cb: CB): void; -} \ No newline at end of file + export function readFile(path: string, encoding: string, cb: CB): void + export function writeFile( + path: string, + data: string, + encoding: string, + cb: CB + ): void + export function deleteFile(path: string, cb: CB): void +} diff --git a/declarations/obz.d.ts b/declarations/obz.d.ts new file mode 100644 index 0000000..11de2fc --- /dev/null +++ b/declarations/obz.d.ts @@ -0,0 +1,10 @@ +declare module 'obz' { + type Remove = () => void + export interface Obz { + (listener: (value: X) => void): Remove + set(value: X): unknown + value: X + } + function createObz(): Obz + export = createObz +} \ No newline at end of file diff --git a/declarations/pull-cat.d.ts b/declarations/pull-cat.d.ts index f79718c..d4e5c7d 100644 --- a/declarations/pull-cat.d.ts +++ b/declarations/pull-cat.d.ts @@ -1,4 +1,4 @@ declare module 'pull-cat' { - function concat(...args: Array): any; - export = concat; -} \ No newline at end of file + function concat(...args: Array): any + export = concat +} diff --git a/declarations/pull-notify.d.ts b/declarations/pull-notify.d.ts index db21e66..daa2d84 100644 --- a/declarations/pull-notify.d.ts +++ b/declarations/pull-notify.d.ts @@ -1,8 +1,8 @@ declare module 'pull-notify' { interface Notify { - (data: any): void; - listen(): unknown; - end(): void; + (data: any): void + listen(): unknown + end(): void } function CreateNotify(): Notify export = CreateNotify diff --git a/declarations/pull-ping.d.ts b/declarations/pull-ping.d.ts index 5a0d411..927a4ae 100644 --- a/declarations/pull-ping.d.ts +++ b/declarations/pull-ping.d.ts @@ -1,5 +1,16 @@ - declare module 'pull-ping' { - function pullPing(opts: {timeout: number, serve?: boolean}): unknown; - export = pullPing; -} \ No newline at end of file + type Abort = Error | boolean | null + type EndOrError = Abort + type SourceCallback = (end: EndOrError, data?: T) => void + type Source = (endOrError: Abort, cb: SourceCallback) => void + export interface PullPingDuplex { + (endOrError: Abort, cb: SourceCallback): void + rtt: any + skew: any + } + function pullPing( + opts: { timeout: number; serve?: boolean }, + cb?: CallableFunction + ): PullPingDuplex + export = pullPing +} diff --git a/declarations/statistics.d.ts b/declarations/statistics.d.ts new file mode 100644 index 0000000..faa5990 --- /dev/null +++ b/declarations/statistics.d.ts @@ -0,0 +1,14 @@ +declare module 'statistics' { + export interface Statistics { + mean: number + stdev: number + count: number + sum: number + sqsum: number + } + function stats( + x: Statistics | number | null | undefined, + y: number + ): Statistics + export = stats +} diff --git a/lib/connections.js b/lib/connections.js index 43fd74c..b2ab823 100644 --- a/lib/connections.js +++ b/lib/connections.js @@ -2,7 +2,6 @@ const debug = require('debug')('ppppp:net:connections') const createNotify = require('pull-notify') const run = require('promisify-tuple') const IP = require('ip') -const msNetPlugin = require('multiserver/plugins/net')({}) /** * @typedef {import('./index').RpcConnectListener} RpcConnectListener @@ -11,6 +10,22 @@ const msNetPlugin = require('multiserver/plugins/net')({}) * @typedef {import('./index').Peer} Peer * @typedef {import('./infos').Info} Info * @typedef {import('./infos')} Infos + * @typedef {{ + * type: + * | 'connecting' + * | 'connected' + * | 'connecting-failed' + * | 'disconnecting' + * | 'disconnected'; + * address: Address; + * parsedAddress: any; + * details?: any; + * }} ConnectionEvent + * @typedef {{ + * (ev: ConnectionEvent): void; + * listen: () => import('pull-stream').Source; + * end: () => void; + * }} NotifyEvent */ class Connections { @@ -20,7 +35,7 @@ class Connections { #infos /** @type {boolean} */ #closed - /** @type {ReturnType} */ + /** @type {NotifyEvent} */ #notifyEvent /** @type {Map} */ #rpcs @@ -39,7 +54,7 @@ class Connections { this.#peer = peer this.#infos = infos this.#closed = false - this.#notifyEvent = createNotify() + this.#notifyEvent = /**@type {any}*/ (createNotify()) this.#rpcs = new Map() this.#connectRetries = new Set() @@ -47,18 +62,15 @@ class Connections { } /** - * @param {Address} address + * @param {Array} parsedAddress * @returns {Info['inferredType']} */ - static inferPeerType(address) { - // TODO perhaps the `type` should be provided by each multiserver plugin? - // like when multiserver plugins provide the `stream.address` to secret-stack - if (address.startsWith('tunnel:')) return 'tunnel' - if (address.startsWith('net:')) { - const netAddr = address.split('~')[0] - const parsed = msNetPlugin.parse(netAddr) - if (parsed?.host) { - if (IP.isPrivate(parsed.host)) return 'lan' + static inferPeerType(parsedAddress) { + for (const subParsed of parsedAddress) { + const [transport, transform] = subParsed + if (transport.name === 'tunnel') return 'tunnel' + if (transport.name === 'net') { + if (IP.isPrivate(transport.host)) return 'lan' else return 'internet' } } @@ -90,33 +102,50 @@ class Connections { const initiator = weAreClient ? 'we' : 'they' debug('Connected to %s, %s initiated it', address, initiator) + const parsedAddress = this.#peer.multiserver.parse(address) this.#rpcs.set(address, rpc) rpc.once('closed', () => { debug('Disconnected from %s', address) this.#rpcs.delete(address) this.#infos.update(address, { state: 'disconnected' }) - this.#notifyEvent({ type: 'disconnected', address }) + this.#notifyEvent({ type: 'disconnected', address, parsedAddress }) this.#infos.emit() }) const state = /**@type {Info['state']}*/ ('connected') - const inferredType = Connections.inferPeerType(address) + const inferredType = Connections.inferPeerType(parsedAddress) this.#infos.update(address, { state, inferredType }) this.#notifyEvent({ type: state, address, + parsedAddress, details: { rpc, weAreClient }, }) this.#infos.emit() } + /** + * TODO: We need to fix secret-stack so that it has a ".ready()" async method, + * after it has initialized multiserver. Then we can remove this hack. + */ + async #waitForSecretStackToInitMultiserver() { + for( let i = 0; i < 10; i++ ) { + try { + this.#peer.multiserver.parse('') + return + } catch (err) { + await new Promise((resolve) => setTimeout(resolve, 16)) + } + } + } + /** * @param {string} address * @returns {Promise} */ async connect(address) { this.#assertNotClosed() - // this._assertValidAddress(address); + await this.#waitForSecretStackToInitMultiserver() const prevInfo = this.#infos.get(address) switch (prevInfo?.state ?? 'disconnected') { @@ -155,8 +184,9 @@ class Connections { case 'disconnected': { debug('Connecting to %s', address) const state = /**@type {Info['state']}*/ ('connecting') + const parsedAddress = this.#peer.multiserver.parse(address) this.#infos.update(address, { state }) - this.#notifyEvent({ type: state, address }) + this.#notifyEvent({ type: state, address, parsedAddress }) this.#infos.emit() const [err, rpc] = await run(this.#peer.connect)(address) @@ -166,6 +196,7 @@ class Connections { this.#notifyEvent({ type: 'connecting-failed', address, + parsedAddress, details: err, }) this.#infos.emit() @@ -201,6 +232,7 @@ class Connections { */ async disconnect(address) { this.#assertNotClosed() + await this.#waitForSecretStackToInitMultiserver() const prevInfo = this.#infos.get(address) if (!prevInfo || prevInfo?.state === 'disconnected') return false if (prevInfo.state === 'disconnecting') return false @@ -233,8 +265,9 @@ class Connections { debug('Disconnecting from %s', address) const state = /**@type {Info['state']}*/ ('disconnecting') + const parsedAddress = this.#peer.multiserver.parse(address) this.#infos.update(address, { state }) - this.#notifyEvent({ type: state, address }) + this.#notifyEvent({ type: state, address, parsedAddress }) this.#infos.emit() // @ts-ignore await run(rpc.close)(true) @@ -250,9 +283,12 @@ class Connections { return true } + /** + * @returns {import('pull-stream').Source} + */ listen() { this.#assertNotClosed() - return this.#notifyEvent.listen() + return /**@type {any}*/ (this.#notifyEvent.listen()) } reset() { diff --git a/lib/glue.js b/lib/glue.js new file mode 100644 index 0000000..b7b0327 --- /dev/null +++ b/lib/glue.js @@ -0,0 +1,114 @@ +const pull = require('pull-stream') +const stats = require('statistics') +const ping = require('pull-ping') + +/** + * @typedef {import('./index').Address} Address + * @typedef {import('./index').RPC} RPC + * @typedef {import('./index').Peer} Peer + * @typedef {import('./connections')} Connections + * @typedef {import('./connections').ConnectionEvent} Event + * @typedef {import('./infos')} Infos + */ + +const PROGRAM_STARTUP = Date.now() + +/** + * @param {Infos} infos + * @param {Connections} connections + */ +function glue(infos, connections) { + /** + * @param {Address} address + * @param {RPC} rpc + */ + function setupPing(address, rpc) { + const PING_TIMEOUT = 5 * 6e4 // 5 minutes + const pp = ping({ serve: true, timeout: PING_TIMEOUT }, () => {}) + infos.updateStats(address, () => ({ + ping: { + rtt: pp.rtt, + skew: pp.skew, + }, + })) + pull( + pp, + rpc.net.ping({ timeout: PING_TIMEOUT }, (err, _) => { + console.warn('remote peer ping err', err) + // if (err?.name === 'TypeError') { + // infos.update(address, {stats: {ping: {fail: true}}}); + // } + }), + pp + ) + } + + /** + * @param {Event} ev + */ + function onConnectingFailed(ev) { + infos.updateStats(ev.address, (prevStats) => ({ + failure: (prevStats?.failure ?? 0) + 1, + stateChange: Date.now(), + duration: stats(prevStats?.duration, 0), + })) + } + + /** + * @param {Event} ev + */ + function onConnected(ev) { + infos.updateStats(ev.address, () => ({ + stateChange: Date.now(), + failure: 0, + })) + if (ev.details.weAreClient) setupPing(ev.address, ev.details.rpc) + } + + /** + * @param {Event} ev + */ + function bumpStateChange(ev) { + infos.updateStats(ev.address, () => ({ + stateChange: Date.now(), + })) + } + + /** + * @param {Event} ev + */ + function onDisconnected(ev) { + infos.updateStats(ev.address, (prevStats) => ({ + stateChange: Date.now(), + duration: stats( + prevStats?.duration, + Date.now() - (prevStats?.stateChange ?? PROGRAM_STARTUP) + ), + })) + } + + pull( + connections.listen(), + pull.drain((ev) => { + switch (ev.type) { + case 'connecting': + case 'disconnecting': + bumpStateChange(ev) + break + case 'connecting-failed': + onConnectingFailed(ev) + break + case 'connected': + onConnected(ev) + break + case 'disconnected': + onDisconnected(ev) + break + default: + throw new Error('Unknown connection event type: ' + ev.type) + } + }) + ) +} + +module.exports = glue diff --git a/lib/index.js b/lib/index.js index f0b4dc8..c9f7151 100644 --- a/lib/index.js +++ b/lib/index.js @@ -4,8 +4,11 @@ const Infos = require('./infos') const Stats = require('./stats') const Connections = require('./connections') const Scheduler = require('./scheduler') +const glue = require('./glue') /** + * @typedef {import('pull-stream').Duplex} Duplex + * @typedef {import('./connections').ConnectionEvent} ConnectionEvent * @typedef {string} Address * @typedef {(rpc: RPC, weAreClient: boolean) => void} RpcConnectListener * @typedef {{ @@ -18,6 +21,13 @@ const Scheduler = require('./scheduler') * once(event: 'closed', cb: CB): void; * addListener(event: 'rpc:connect', listener: RpcConnectListener): void; * removeListener(event: 'rpc:connect', listener: RpcConnectListener): void; + * multiserver: { + * parse(address: string): any + * }, + * net: { + * ping(opts: {timeout: number}, cb: CB): Duplex; + * listen(): import('pull-stream').Source; + * }, * }} Peer * @typedef {Peer & {stream: {address: string}}} RPC * @typedef {{ @@ -78,6 +88,7 @@ function initNet(peer, config) { async function start() { await stats.loaded() + glue(infos, connections) queueMicrotask(scheduler.start.bind(scheduler)) } @@ -121,6 +132,15 @@ function initNet(peer, config) { ) } + /** + * @param {Address} address + */ + function forget(address) { + disconnect(address, () => { + infos.remove(address) + }) + } + /** * @param {Address} address * @param {Info} info @@ -152,6 +172,7 @@ function initNet(peer, config) { stage, connect, disconnect, + forget, updateInfo, listen, peers, @@ -164,6 +185,7 @@ exports.manifest = { start: 'sync', stop: 'sync', stage: 'sync', + forget: 'sync', connect: 'async', disconnect: 'async', listen: 'source', diff --git a/lib/infos.js b/lib/infos.js index bd01949..3c5e2ee 100644 --- a/lib/infos.js +++ b/lib/infos.js @@ -1,14 +1,18 @@ const createNotify = require('pull-notify') const pullConcat = require('pull-cat') const pull = require('pull-stream') +const Obz = require('obz') + +/** + * @template T + * @typedef {import('obz').Obz} Obz + */ /** * @typedef {import('./index').Address} Address * @typedef {import('./stats').StatsInfo} StatsInfo * @typedef {{ * state: 'connected' | 'disconnected' | 'connecting' | 'disconnecting', - * connBirth?: number, - * connUpdated?: number, * inferredType?: 'internet' | 'lan' | 'tunnel' | undefined; * stats?: StatsInfo * }} Info @@ -19,10 +23,13 @@ class Infos { #map /** @type {ReturnType} */ #notify + /** @type {Obz
} */ + #onStatsUpdated constructor() { this.#map = new Map() this.#notify = createNotify() + this.#onStatsUpdated = Obz() } /** @@ -47,25 +54,55 @@ class Infos { * @returns {void} */ update(address, info) { - const now = Date.now() - const connUpdated = now // FIXME: not just conn + const hasNewStats = !!info.stats const prevInfo = this.#map.get(address) if (prevInfo) { for (const key of Object.keys(info)) { const k = /**@type {keyof Info}*/ (key) if (typeof info[k] === 'undefined') delete info[k] } - this.#map.set(address, { ...prevInfo, connUpdated, ...info }) + this.#map.set(address, { ...prevInfo, ...info }) } else if (!info.state) { this.#map.set(address, { ...info, state: 'disconnected' }) } else { - const connBirth = now - this.#map.set(address, { - .../**@type {Info}*/ (info), - connBirth, - connUpdated, - }) + this.#map.set(address, /**@type {Info}*/ (info)) } + if (hasNewStats) { + this.#onStatsUpdated.set(address) + } + } + + /** + * @param {Address} address + * @param {(prevStats: Partial) => Partial} getStats + * @returns {void} + */ + updateStats(address, getStats) { + const prevInfo = this.#map.get(address) + if (!prevInfo) return + this.#map.set(address, { + ...prevInfo, + stats: { + ...prevInfo?.stats, + ...getStats(prevInfo?.stats), + }, + }) + this.#onStatsUpdated.set(address) + } + + /** + * @param {Parameters>[0]} listener + */ + onStatsUpdated(listener) { + return this.#onStatsUpdated(listener) + } + + /** + * @param {Address} address + */ + remove(address) { + this.#map.delete(address) + this.#onStatsUpdated.set(address) } size() { diff --git a/lib/stats.js b/lib/stats.js index a95f218..5a3188c 100644 --- a/lib/stats.js +++ b/lib/stats.js @@ -6,13 +6,7 @@ const atomic = require('atomic-file-rw') /** * @typedef {import('./index').Address} Address * @typedef {import('./infos')} Infos - * @typedef {{ - * mean: number; - * stdev: number; - * count: number; - * sum: number; - * sqsum: number; - * }} Statistics + * @typedef {import('statistics').Statistics} Statistics * @typedef {{ * birth?: number; * key?: string; @@ -91,6 +85,8 @@ class Stats { /** @type {(reason: any) => void} */ // @ts-ignore #loadedReject + /** @type {NodeJS.Timeout | null} */ + #scheduledWriteTask static FILENAME = 'stats.json' static DEFAULT_PERSIST_TIMEOUT = 2000 @@ -109,6 +105,7 @@ class Stats { this.#loadedResolve = resolve this.#loadedReject = reject }) + this.#scheduledWriteTask = null this.#readFromDisk(this.#path, (err, fileContents) => { if (err) { @@ -130,6 +127,10 @@ class Stats { return } }) + + this.#infos.onStatsUpdated(() => { + if (!this.#closed) this.#scheduleWrite() + }) } /** @@ -150,12 +151,34 @@ class Stats { } } + #cancelScheduleWrite() { + if (this.#scheduledWriteTask) { + clearTimeout(this.#scheduledWriteTask) + } + } + + #scheduleWrite() { + if (this.#persistTimeout === 0) { + this.#writeToDisk() + return + } + + this.#cancelScheduleWrite() + this.#scheduledWriteTask = setTimeout(() => { + this.#writeToDisk((err, _) => { + if (err) { + console.error(`Failed to write to disk ${Stats.FILENAME}`, err) + } + this.#scheduledWriteTask = null + }) + }, this.#persistTimeout) + } + /** * @param {CB=} cb * @returns {void} */ #writeToDisk(cb) { - if (this.#infos.size() === 0) return debug(`Begun serializing and writing ${Stats.FILENAME}`) const record = /**@type {Record}*/ ({}) for (let [address, info] of this.#infos.entries()) { @@ -172,15 +195,11 @@ class Stats { } close() { - this.#closed = true; - // FIXME: implement - // this._cancelScheduleWrite(); - // this._write(); - // this._map?.clear(); - // (this as any)._map = void 0; - // (this as any)._notify = void 0; - // (this as any)._stateFile = void 0; - debug('Closed the Stats instance'); + this.#closed = true + this.#cancelScheduleWrite() + this.#writeToDisk() + ;/**@type {any}*/ (this).#infos = void 0 + debug('Closed the Stats instance') } /** diff --git a/package.json b/package.json index bcee009..fd9350a 100644 --- a/package.json +++ b/package.json @@ -31,7 +31,7 @@ "debug": "^4.3.2", "has-network2": ">=0.0.3", "ip": "^1.1.5", - "multiserver": "3", + "obz": "~1.1.0", "on-change-network-strict": "1.0.0", "on-wakeup": "^1.0.1", "promisify-tuple": "^1.0.1", @@ -50,14 +50,11 @@ "bs58": "^5.0.0", "c8": "7", "ppppp-caps": "github:staltz/ppppp-caps", - "ppppp-db": "github:staltz/ppppp-db", - "ppppp-dict": "github:staltz/ppppp-dict", "ppppp-keypair": "github:staltz/ppppp-keypair", - "ppppp-set": "github:staltz/ppppp-set", "prettier": "^2.6.2", "pretty-quick": "^3.1.3", "rimraf": "^4.4.0", - "secret-handshake-ext": "0.0.11", + "secret-handshake-ext": "0.0.12", "secret-stack": "~8.1.0", "ssb-box": "^1.0.1", "typescript": "^5.1.3" diff --git a/test/glue.test.js b/test/glue.test.js index 5745cb0..1c40e53 100644 --- a/test/glue.test.js +++ b/test/glue.test.js @@ -1,5 +1,7 @@ const test = require('node:test') const assert = require('node:assert') +const FS = require('node:fs') +const Path = require('node:path') const p = require('node:util').promisify const { createPeerMock } = require('./util') @@ -36,11 +38,58 @@ test('Glueing together stats with connections', async (t) => { assert.equal(entriesBefore.length, 0, 'there is no entry in peers()') const stagingResult = peer.net.stage(address, { mode: 'internet' }) - assert.equal(stagingResult, true, 'stage() should refuse') + assert.equal(stagingResult, true, 'stage() successful') const entriesAfter = await p(peer.net.peers())(null) assert.equal(entriesAfter.length, 1, 'there is one entry in peers()') assert.equal(entriesAfter[0][0], address, 'entry addr ok') assert.equal(entriesAfter[0][1].state, 'disconnected', 'entry state ok') }) + + await t.test('connect() will trigger stats persistence', async (t) => { + const peer = createPeerMock() + const address = TEST_ADDR + + const entriesBefore = await p(peer.net.peers())(null) + assert.equal(entriesBefore.length, 0, 'there is no entry in peers()') + + const rpc = await p(peer.net.connect)(address) + assert.ok(rpc, 'connect() successful') + + const statsJSONPath = Path.join(peer.mockDir, 'net', './stats.json') + while (FS.existsSync(statsJSONPath) === false) { + await p(setTimeout)(1) + } + const fileContents = FS.readFileSync(statsJSONPath, 'utf8') + const json = JSON.parse(fileContents) + assert.deepEqual(Object.keys(json), [TEST_ADDR]) + assert.deepEqual(Object.keys(json[TEST_ADDR]), ['stateChange']) + }) + + await t.test('forget() will remove stats', async (t) => { + const peer = createPeerMock() + const address = TEST_ADDR + + const entriesBefore = await p(peer.net.peers())(null) + assert.equal(entriesBefore.length, 0, 'there is no entry in peers()') + + const rpc = await p(peer.net.connect)(address) + assert.ok(rpc, 'connect() successful') + + const statsJSONPath = Path.join(peer.mockDir, 'net', './stats.json') + while (FS.existsSync(statsJSONPath) === false) { + await p(setTimeout)(1) + } + const fileContents = FS.readFileSync(statsJSONPath, 'utf8') + assert.equal(fileContents.length > 10, true, 'stats.json is not empty') + + peer.net.forget(address) + await p(setTimeout)(200) + + const entriesAfterForget = await p(peer.net.peers())(null) + assert.equal(entriesAfterForget.length, 0, 'there is no entry in peers()') + + const fileContents2 = FS.readFileSync(statsJSONPath, 'utf8') + assert.equal(fileContents2, '{}', 'stats.json is empty') + }) }) diff --git a/test/index.test.js b/test/index.test.js index 2a5e26e..6034706 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -64,18 +64,30 @@ test('net', async (t) => { pull( peer.net.listen(), pull.drain((ev) => { - ++i - if (i === 1) { - assert.equal(ev.type, 'connecting', 'event.type ok') - assert.equal(ev.address, TEST_ADDR, 'event.address ok') - } else if (i === 2) { - assert.equal(ev.type, 'connecting-failed', 'event.type ok') - assert.equal(ev.address, TEST_ADDR, 'event.address ok') - assert.ok(ev.details, 'event.details ok') - assert.equal(ev.details.code, 'ECONNREFUSED', 'event.details err') - resolve() - } else { - reject(new Error('too many emissions')) + try { + ++i + if (i === 1) { + assert.equal(ev.type, 'connecting', 'event.type ok') + assert.equal(ev.address, TEST_ADDR, 'event.address ok') + assert.equal(ev.parsedAddress.length, 1) + assert.equal(ev.parsedAddress[0].length, 2) + assert.deepEqual(ev.parsedAddress[0][0], { + name: 'net', + host: 'localhost', + port: 9752, + }) + assert.equal(ev.parsedAddress[0][1].name, 'shse') + } else if (i === 2) { + assert.equal(ev.type, 'connecting-failed', 'event.type ok') + assert.equal(ev.address, TEST_ADDR, 'event.address ok') + assert.ok(ev.details, 'event.details ok') + assert.equal(ev.details.code, 'ECONNREFUSED', 'event.details err') + queueMicrotask(resolve) + } else { + queueMicrotask(() => reject(new Error('too many emissions'))) + } + } catch (err) { + reject(err) } }) ) diff --git a/test/stats.test.js b/test/stats.test.js index 2371c19..4fe5a18 100644 --- a/test/stats.test.js +++ b/test/stats.test.js @@ -9,6 +9,10 @@ const Infos = require('../lib/infos') test('Stats', async (t) => { await t.test('Recovers from corrupted JSON file', async () => { const dirPath = Path.join(__dirname, './fixtures/corrupted') + const statsJSONPath = Path.join(dirPath, './stats.json') + const fileContents = FS.readFileSync(statsJSONPath, 'utf8') + console.log(fileContents); + const infos = new Infos() const stats = new Stats(dirPath, infos) assert.ok(stats, 'Stats instance was created') @@ -22,6 +26,10 @@ test('Stats', async (t) => { const [address, info] = entriesAfter[0] assert.equal(address, 'net:staltz.com:8008~noauth', 'the address looks ok') assert.equal(info.stats.source, 'stored', 'the info looks ok') + + stats.close() + await p(setTimeout)(50) + FS.writeFileSync(statsJSONPath, fileContents) }) await t.test('Creates JSON file when it is absent', async () => { diff --git a/test/util.js b/test/util.js index b87de50..c715c62 100644 --- a/test/util.js +++ b/test/util.js @@ -60,15 +60,30 @@ function createPeerMock() { setTimeout(() => { cb(null, { once: () => {}, + net: { + ping() { + return { + source: () => {}, + sink: () => {}, + } + }, + }, }) }, 200) }, once: () => {}, + multiserver: { + parse: () => [[{ name: 'net', host: 'localhost', port: 9752 }]], + }, + mockDir: testPath, } const mockConfig = { global: { path: testPath, }, + net: { + persistTimeout: 0, + } // shse: { caps } }