mirror of https://codeberg.org/pzp/pzp-net.git
interpool glue, parsedAddress, and forget()
This commit is contained in:
parent
a2521e0146
commit
b0e9039c21
|
@ -13,6 +13,7 @@ We're not on npm yet. In your package.json, include this as
|
||||||
- [x] connect
|
- [x] connect
|
||||||
- [x] stage
|
- [x] stage
|
||||||
- [x] stats.json
|
- [x] stats.json
|
||||||
- [ ] interpool glue
|
- [x] interpool glue
|
||||||
|
- [ ] writing hubs to stats.json
|
||||||
- [ ] firewall
|
- [ ] firewall
|
||||||
- [ ] scheduler
|
- [ ] scheduler
|
||||||
|
|
|
@ -1,7 +1,12 @@
|
||||||
type CB<T> = (...args: [Error] | [null, T]) => void
|
type CB<T> = (...args: [Error] | [null, T]) => void
|
||||||
|
|
||||||
declare module 'atomic-file-rw' {
|
declare module 'atomic-file-rw' {
|
||||||
export function readFile(path: string, encoding: string, cb: CB<string>): void;
|
export function readFile(path: string, encoding: string, cb: CB<string>): void
|
||||||
export function writeFile(path: string, data: string, encoding: string, cb: CB<string>): void;
|
export function writeFile(
|
||||||
export function deleteFile(path: string, cb: CB<null>): void;
|
path: string,
|
||||||
}
|
data: string,
|
||||||
|
encoding: string,
|
||||||
|
cb: CB<string>
|
||||||
|
): void
|
||||||
|
export function deleteFile(path: string, cb: CB<null>): void
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,10 @@
|
||||||
|
declare module 'obz' {
|
||||||
|
type Remove = () => void
|
||||||
|
export interface Obz<X> {
|
||||||
|
(listener: (value: X) => void): Remove
|
||||||
|
set(value: X): unknown
|
||||||
|
value: X
|
||||||
|
}
|
||||||
|
function createObz(): Obz
|
||||||
|
export = createObz
|
||||||
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
declare module 'pull-cat' {
|
declare module 'pull-cat' {
|
||||||
function concat(...args: Array<any>): any;
|
function concat(...args: Array<any>): any
|
||||||
export = concat;
|
export = concat
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
declare module 'pull-notify' {
|
declare module 'pull-notify' {
|
||||||
interface Notify {
|
interface Notify {
|
||||||
(data: any): void;
|
(data: any): void
|
||||||
listen(): unknown;
|
listen(): unknown
|
||||||
end(): void;
|
end(): void
|
||||||
}
|
}
|
||||||
function CreateNotify(): Notify
|
function CreateNotify(): Notify
|
||||||
export = CreateNotify
|
export = CreateNotify
|
||||||
|
|
|
@ -1,5 +1,16 @@
|
||||||
|
|
||||||
declare module 'pull-ping' {
|
declare module 'pull-ping' {
|
||||||
function pullPing(opts: {timeout: number, serve?: boolean}): unknown;
|
type Abort = Error | boolean | null
|
||||||
export = pullPing;
|
type EndOrError = Abort
|
||||||
}
|
type SourceCallback<T> = (end: EndOrError, data?: T) => void
|
||||||
|
type Source = (endOrError: Abort, cb: SourceCallback<T>) => void
|
||||||
|
export interface PullPingDuplex {
|
||||||
|
(endOrError: Abort, cb: SourceCallback<T>): void
|
||||||
|
rtt: any
|
||||||
|
skew: any
|
||||||
|
}
|
||||||
|
function pullPing(
|
||||||
|
opts: { timeout: number; serve?: boolean },
|
||||||
|
cb?: CallableFunction
|
||||||
|
): PullPingDuplex
|
||||||
|
export = pullPing
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,14 @@
|
||||||
|
declare module 'statistics' {
|
||||||
|
export interface Statistics {
|
||||||
|
mean: number
|
||||||
|
stdev: number
|
||||||
|
count: number
|
||||||
|
sum: number
|
||||||
|
sqsum: number
|
||||||
|
}
|
||||||
|
function stats(
|
||||||
|
x: Statistics | number | null | undefined,
|
||||||
|
y: number
|
||||||
|
): Statistics
|
||||||
|
export = stats
|
||||||
|
}
|
|
@ -2,7 +2,6 @@ const debug = require('debug')('ppppp:net:connections')
|
||||||
const createNotify = require('pull-notify')
|
const createNotify = require('pull-notify')
|
||||||
const run = require('promisify-tuple')
|
const run = require('promisify-tuple')
|
||||||
const IP = require('ip')
|
const IP = require('ip')
|
||||||
const msNetPlugin = require('multiserver/plugins/net')({})
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @typedef {import('./index').RpcConnectListener} RpcConnectListener
|
* @typedef {import('./index').RpcConnectListener} RpcConnectListener
|
||||||
|
@ -11,6 +10,22 @@ const msNetPlugin = require('multiserver/plugins/net')({})
|
||||||
* @typedef {import('./index').Peer} Peer
|
* @typedef {import('./index').Peer} Peer
|
||||||
* @typedef {import('./infos').Info} Info
|
* @typedef {import('./infos').Info} Info
|
||||||
* @typedef {import('./infos')} Infos
|
* @typedef {import('./infos')} Infos
|
||||||
|
* @typedef {{
|
||||||
|
* type:
|
||||||
|
* | 'connecting'
|
||||||
|
* | 'connected'
|
||||||
|
* | 'connecting-failed'
|
||||||
|
* | 'disconnecting'
|
||||||
|
* | 'disconnected';
|
||||||
|
* address: Address;
|
||||||
|
* parsedAddress: any;
|
||||||
|
* details?: any;
|
||||||
|
* }} ConnectionEvent
|
||||||
|
* @typedef {{
|
||||||
|
* (ev: ConnectionEvent): void;
|
||||||
|
* listen: () => import('pull-stream').Source<ConnectionEvent>;
|
||||||
|
* end: () => void;
|
||||||
|
* }} NotifyEvent
|
||||||
*/
|
*/
|
||||||
|
|
||||||
class Connections {
|
class Connections {
|
||||||
|
@ -20,7 +35,7 @@ class Connections {
|
||||||
#infos
|
#infos
|
||||||
/** @type {boolean} */
|
/** @type {boolean} */
|
||||||
#closed
|
#closed
|
||||||
/** @type {ReturnType<createNotify>} */
|
/** @type {NotifyEvent} */
|
||||||
#notifyEvent
|
#notifyEvent
|
||||||
/** @type {Map<Address, RPC>} */
|
/** @type {Map<Address, RPC>} */
|
||||||
#rpcs
|
#rpcs
|
||||||
|
@ -39,7 +54,7 @@ class Connections {
|
||||||
this.#peer = peer
|
this.#peer = peer
|
||||||
this.#infos = infos
|
this.#infos = infos
|
||||||
this.#closed = false
|
this.#closed = false
|
||||||
this.#notifyEvent = createNotify()
|
this.#notifyEvent = /**@type {any}*/ (createNotify())
|
||||||
this.#rpcs = new Map()
|
this.#rpcs = new Map()
|
||||||
this.#connectRetries = new Set()
|
this.#connectRetries = new Set()
|
||||||
|
|
||||||
|
@ -47,18 +62,15 @@ class Connections {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {Address} address
|
* @param {Array<any>} parsedAddress
|
||||||
* @returns {Info['inferredType']}
|
* @returns {Info['inferredType']}
|
||||||
*/
|
*/
|
||||||
static inferPeerType(address) {
|
static inferPeerType(parsedAddress) {
|
||||||
// TODO perhaps the `type` should be provided by each multiserver plugin?
|
for (const subParsed of parsedAddress) {
|
||||||
// like when multiserver plugins provide the `stream.address` to secret-stack
|
const [transport, transform] = subParsed
|
||||||
if (address.startsWith('tunnel:')) return 'tunnel'
|
if (transport.name === 'tunnel') return 'tunnel'
|
||||||
if (address.startsWith('net:')) {
|
if (transport.name === 'net') {
|
||||||
const netAddr = address.split('~')[0]
|
if (IP.isPrivate(transport.host)) return 'lan'
|
||||||
const parsed = msNetPlugin.parse(netAddr)
|
|
||||||
if (parsed?.host) {
|
|
||||||
if (IP.isPrivate(parsed.host)) return 'lan'
|
|
||||||
else return 'internet'
|
else return 'internet'
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -90,33 +102,50 @@ class Connections {
|
||||||
const initiator = weAreClient ? 'we' : 'they'
|
const initiator = weAreClient ? 'we' : 'they'
|
||||||
debug('Connected to %s, %s initiated it', address, initiator)
|
debug('Connected to %s, %s initiated it', address, initiator)
|
||||||
|
|
||||||
|
const parsedAddress = this.#peer.multiserver.parse(address)
|
||||||
this.#rpcs.set(address, rpc)
|
this.#rpcs.set(address, rpc)
|
||||||
rpc.once('closed', () => {
|
rpc.once('closed', () => {
|
||||||
debug('Disconnected from %s', address)
|
debug('Disconnected from %s', address)
|
||||||
this.#rpcs.delete(address)
|
this.#rpcs.delete(address)
|
||||||
this.#infos.update(address, { state: 'disconnected' })
|
this.#infos.update(address, { state: 'disconnected' })
|
||||||
this.#notifyEvent({ type: 'disconnected', address })
|
this.#notifyEvent({ type: 'disconnected', address, parsedAddress })
|
||||||
this.#infos.emit()
|
this.#infos.emit()
|
||||||
})
|
})
|
||||||
|
|
||||||
const state = /**@type {Info['state']}*/ ('connected')
|
const state = /**@type {Info['state']}*/ ('connected')
|
||||||
const inferredType = Connections.inferPeerType(address)
|
const inferredType = Connections.inferPeerType(parsedAddress)
|
||||||
this.#infos.update(address, { state, inferredType })
|
this.#infos.update(address, { state, inferredType })
|
||||||
this.#notifyEvent({
|
this.#notifyEvent({
|
||||||
type: state,
|
type: state,
|
||||||
address,
|
address,
|
||||||
|
parsedAddress,
|
||||||
details: { rpc, weAreClient },
|
details: { rpc, weAreClient },
|
||||||
})
|
})
|
||||||
this.#infos.emit()
|
this.#infos.emit()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* TODO: We need to fix secret-stack so that it has a ".ready()" async method,
|
||||||
|
* after it has initialized multiserver. Then we can remove this hack.
|
||||||
|
*/
|
||||||
|
async #waitForSecretStackToInitMultiserver() {
|
||||||
|
for( let i = 0; i < 10; i++ ) {
|
||||||
|
try {
|
||||||
|
this.#peer.multiserver.parse('')
|
||||||
|
return
|
||||||
|
} catch (err) {
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 16))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {string} address
|
* @param {string} address
|
||||||
* @returns {Promise<RPC>}
|
* @returns {Promise<RPC>}
|
||||||
*/
|
*/
|
||||||
async connect(address) {
|
async connect(address) {
|
||||||
this.#assertNotClosed()
|
this.#assertNotClosed()
|
||||||
// this._assertValidAddress(address);
|
await this.#waitForSecretStackToInitMultiserver()
|
||||||
|
|
||||||
const prevInfo = this.#infos.get(address)
|
const prevInfo = this.#infos.get(address)
|
||||||
switch (prevInfo?.state ?? 'disconnected') {
|
switch (prevInfo?.state ?? 'disconnected') {
|
||||||
|
@ -155,8 +184,9 @@ class Connections {
|
||||||
case 'disconnected': {
|
case 'disconnected': {
|
||||||
debug('Connecting to %s', address)
|
debug('Connecting to %s', address)
|
||||||
const state = /**@type {Info['state']}*/ ('connecting')
|
const state = /**@type {Info['state']}*/ ('connecting')
|
||||||
|
const parsedAddress = this.#peer.multiserver.parse(address)
|
||||||
this.#infos.update(address, { state })
|
this.#infos.update(address, { state })
|
||||||
this.#notifyEvent({ type: state, address })
|
this.#notifyEvent({ type: state, address, parsedAddress })
|
||||||
this.#infos.emit()
|
this.#infos.emit()
|
||||||
|
|
||||||
const [err, rpc] = await run(this.#peer.connect)(address)
|
const [err, rpc] = await run(this.#peer.connect)(address)
|
||||||
|
@ -166,6 +196,7 @@ class Connections {
|
||||||
this.#notifyEvent({
|
this.#notifyEvent({
|
||||||
type: 'connecting-failed',
|
type: 'connecting-failed',
|
||||||
address,
|
address,
|
||||||
|
parsedAddress,
|
||||||
details: err,
|
details: err,
|
||||||
})
|
})
|
||||||
this.#infos.emit()
|
this.#infos.emit()
|
||||||
|
@ -201,6 +232,7 @@ class Connections {
|
||||||
*/
|
*/
|
||||||
async disconnect(address) {
|
async disconnect(address) {
|
||||||
this.#assertNotClosed()
|
this.#assertNotClosed()
|
||||||
|
await this.#waitForSecretStackToInitMultiserver()
|
||||||
const prevInfo = this.#infos.get(address)
|
const prevInfo = this.#infos.get(address)
|
||||||
if (!prevInfo || prevInfo?.state === 'disconnected') return false
|
if (!prevInfo || prevInfo?.state === 'disconnected') return false
|
||||||
if (prevInfo.state === 'disconnecting') return false
|
if (prevInfo.state === 'disconnecting') return false
|
||||||
|
@ -233,8 +265,9 @@ class Connections {
|
||||||
|
|
||||||
debug('Disconnecting from %s', address)
|
debug('Disconnecting from %s', address)
|
||||||
const state = /**@type {Info['state']}*/ ('disconnecting')
|
const state = /**@type {Info['state']}*/ ('disconnecting')
|
||||||
|
const parsedAddress = this.#peer.multiserver.parse(address)
|
||||||
this.#infos.update(address, { state })
|
this.#infos.update(address, { state })
|
||||||
this.#notifyEvent({ type: state, address })
|
this.#notifyEvent({ type: state, address, parsedAddress })
|
||||||
this.#infos.emit()
|
this.#infos.emit()
|
||||||
// @ts-ignore
|
// @ts-ignore
|
||||||
await run(rpc.close)(true)
|
await run(rpc.close)(true)
|
||||||
|
@ -250,9 +283,12 @@ class Connections {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @returns {import('pull-stream').Source<ConnectionEvent>}
|
||||||
|
*/
|
||||||
listen() {
|
listen() {
|
||||||
this.#assertNotClosed()
|
this.#assertNotClosed()
|
||||||
return this.#notifyEvent.listen()
|
return /**@type {any}*/ (this.#notifyEvent.listen())
|
||||||
}
|
}
|
||||||
|
|
||||||
reset() {
|
reset() {
|
||||||
|
|
|
@ -0,0 +1,114 @@
|
||||||
|
const pull = require('pull-stream')
|
||||||
|
const stats = require('statistics')
|
||||||
|
const ping = require('pull-ping')
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @typedef {import('./index').Address} Address
|
||||||
|
* @typedef {import('./index').RPC} RPC
|
||||||
|
* @typedef {import('./index').Peer} Peer
|
||||||
|
* @typedef {import('./connections')} Connections
|
||||||
|
* @typedef {import('./connections').ConnectionEvent} Event
|
||||||
|
* @typedef {import('./infos')} Infos
|
||||||
|
*/
|
||||||
|
|
||||||
|
const PROGRAM_STARTUP = Date.now()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {Infos} infos
|
||||||
|
* @param {Connections} connections
|
||||||
|
*/
|
||||||
|
function glue(infos, connections) {
|
||||||
|
/**
|
||||||
|
* @param {Address} address
|
||||||
|
* @param {RPC} rpc
|
||||||
|
*/
|
||||||
|
function setupPing(address, rpc) {
|
||||||
|
const PING_TIMEOUT = 5 * 6e4 // 5 minutes
|
||||||
|
const pp = ping({ serve: true, timeout: PING_TIMEOUT }, () => {})
|
||||||
|
infos.updateStats(address, () => ({
|
||||||
|
ping: {
|
||||||
|
rtt: pp.rtt,
|
||||||
|
skew: pp.skew,
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
pull(
|
||||||
|
pp,
|
||||||
|
rpc.net.ping({ timeout: PING_TIMEOUT }, (err, _) => {
|
||||||
|
console.warn('remote peer ping err', err)
|
||||||
|
// if (err?.name === 'TypeError') {
|
||||||
|
// infos.update(address, {stats: {ping: {fail: true}}});
|
||||||
|
// }
|
||||||
|
}),
|
||||||
|
pp
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {Event} ev
|
||||||
|
*/
|
||||||
|
function onConnectingFailed(ev) {
|
||||||
|
infos.updateStats(ev.address, (prevStats) => ({
|
||||||
|
failure: (prevStats?.failure ?? 0) + 1,
|
||||||
|
stateChange: Date.now(),
|
||||||
|
duration: stats(prevStats?.duration, 0),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {Event} ev
|
||||||
|
*/
|
||||||
|
function onConnected(ev) {
|
||||||
|
infos.updateStats(ev.address, () => ({
|
||||||
|
stateChange: Date.now(),
|
||||||
|
failure: 0,
|
||||||
|
}))
|
||||||
|
if (ev.details.weAreClient) setupPing(ev.address, ev.details.rpc)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {Event} ev
|
||||||
|
*/
|
||||||
|
function bumpStateChange(ev) {
|
||||||
|
infos.updateStats(ev.address, () => ({
|
||||||
|
stateChange: Date.now(),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {Event} ev
|
||||||
|
*/
|
||||||
|
function onDisconnected(ev) {
|
||||||
|
infos.updateStats(ev.address, (prevStats) => ({
|
||||||
|
stateChange: Date.now(),
|
||||||
|
duration: stats(
|
||||||
|
prevStats?.duration,
|
||||||
|
Date.now() - (prevStats?.stateChange ?? PROGRAM_STARTUP)
|
||||||
|
),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
pull(
|
||||||
|
connections.listen(),
|
||||||
|
pull.drain((ev) => {
|
||||||
|
switch (ev.type) {
|
||||||
|
case 'connecting':
|
||||||
|
case 'disconnecting':
|
||||||
|
bumpStateChange(ev)
|
||||||
|
break
|
||||||
|
case 'connecting-failed':
|
||||||
|
onConnectingFailed(ev)
|
||||||
|
break
|
||||||
|
case 'connected':
|
||||||
|
onConnected(ev)
|
||||||
|
break
|
||||||
|
case 'disconnected':
|
||||||
|
onDisconnected(ev)
|
||||||
|
break
|
||||||
|
default:
|
||||||
|
throw new Error('Unknown connection event type: ' + ev.type)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = glue
|
22
lib/index.js
22
lib/index.js
|
@ -4,8 +4,11 @@ const Infos = require('./infos')
|
||||||
const Stats = require('./stats')
|
const Stats = require('./stats')
|
||||||
const Connections = require('./connections')
|
const Connections = require('./connections')
|
||||||
const Scheduler = require('./scheduler')
|
const Scheduler = require('./scheduler')
|
||||||
|
const glue = require('./glue')
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* @typedef {import('pull-stream').Duplex<unknown, unknown>} Duplex
|
||||||
|
* @typedef {import('./connections').ConnectionEvent} ConnectionEvent
|
||||||
* @typedef {string} Address
|
* @typedef {string} Address
|
||||||
* @typedef {(rpc: RPC, weAreClient: boolean) => void} RpcConnectListener
|
* @typedef {(rpc: RPC, weAreClient: boolean) => void} RpcConnectListener
|
||||||
* @typedef {{
|
* @typedef {{
|
||||||
|
@ -18,6 +21,13 @@ const Scheduler = require('./scheduler')
|
||||||
* once(event: 'closed', cb: CB<void>): void;
|
* once(event: 'closed', cb: CB<void>): void;
|
||||||
* addListener(event: 'rpc:connect', listener: RpcConnectListener): void;
|
* addListener(event: 'rpc:connect', listener: RpcConnectListener): void;
|
||||||
* removeListener(event: 'rpc:connect', listener: RpcConnectListener): void;
|
* removeListener(event: 'rpc:connect', listener: RpcConnectListener): void;
|
||||||
|
* multiserver: {
|
||||||
|
* parse(address: string): any
|
||||||
|
* },
|
||||||
|
* net: {
|
||||||
|
* ping(opts: {timeout: number}, cb: CB<void>): Duplex;
|
||||||
|
* listen(): import('pull-stream').Source<ConnectionEvent>;
|
||||||
|
* },
|
||||||
* }} Peer
|
* }} Peer
|
||||||
* @typedef {Peer & {stream: {address: string}}} RPC
|
* @typedef {Peer & {stream: {address: string}}} RPC
|
||||||
* @typedef {{
|
* @typedef {{
|
||||||
|
@ -78,6 +88,7 @@ function initNet(peer, config) {
|
||||||
|
|
||||||
async function start() {
|
async function start() {
|
||||||
await stats.loaded()
|
await stats.loaded()
|
||||||
|
glue(infos, connections)
|
||||||
queueMicrotask(scheduler.start.bind(scheduler))
|
queueMicrotask(scheduler.start.bind(scheduler))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -121,6 +132,15 @@ function initNet(peer, config) {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {Address} address
|
||||||
|
*/
|
||||||
|
function forget(address) {
|
||||||
|
disconnect(address, () => {
|
||||||
|
infos.remove(address)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {Address} address
|
* @param {Address} address
|
||||||
* @param {Info} info
|
* @param {Info} info
|
||||||
|
@ -152,6 +172,7 @@ function initNet(peer, config) {
|
||||||
stage,
|
stage,
|
||||||
connect,
|
connect,
|
||||||
disconnect,
|
disconnect,
|
||||||
|
forget,
|
||||||
updateInfo,
|
updateInfo,
|
||||||
listen,
|
listen,
|
||||||
peers,
|
peers,
|
||||||
|
@ -164,6 +185,7 @@ exports.manifest = {
|
||||||
start: 'sync',
|
start: 'sync',
|
||||||
stop: 'sync',
|
stop: 'sync',
|
||||||
stage: 'sync',
|
stage: 'sync',
|
||||||
|
forget: 'sync',
|
||||||
connect: 'async',
|
connect: 'async',
|
||||||
disconnect: 'async',
|
disconnect: 'async',
|
||||||
listen: 'source',
|
listen: 'source',
|
||||||
|
|
59
lib/infos.js
59
lib/infos.js
|
@ -1,14 +1,18 @@
|
||||||
const createNotify = require('pull-notify')
|
const createNotify = require('pull-notify')
|
||||||
const pullConcat = require('pull-cat')
|
const pullConcat = require('pull-cat')
|
||||||
const pull = require('pull-stream')
|
const pull = require('pull-stream')
|
||||||
|
const Obz = require('obz')
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @template T
|
||||||
|
* @typedef {import('obz').Obz<T>} Obz
|
||||||
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @typedef {import('./index').Address} Address
|
* @typedef {import('./index').Address} Address
|
||||||
* @typedef {import('./stats').StatsInfo} StatsInfo
|
* @typedef {import('./stats').StatsInfo} StatsInfo
|
||||||
* @typedef {{
|
* @typedef {{
|
||||||
* state: 'connected' | 'disconnected' | 'connecting' | 'disconnecting',
|
* state: 'connected' | 'disconnected' | 'connecting' | 'disconnecting',
|
||||||
* connBirth?: number,
|
|
||||||
* connUpdated?: number,
|
|
||||||
* inferredType?: 'internet' | 'lan' | 'tunnel' | undefined;
|
* inferredType?: 'internet' | 'lan' | 'tunnel' | undefined;
|
||||||
* stats?: StatsInfo
|
* stats?: StatsInfo
|
||||||
* }} Info
|
* }} Info
|
||||||
|
@ -19,10 +23,13 @@ class Infos {
|
||||||
#map
|
#map
|
||||||
/** @type {ReturnType<createNotify>} */
|
/** @type {ReturnType<createNotify>} */
|
||||||
#notify
|
#notify
|
||||||
|
/** @type {Obz<Address>} */
|
||||||
|
#onStatsUpdated
|
||||||
|
|
||||||
constructor() {
|
constructor() {
|
||||||
this.#map = new Map()
|
this.#map = new Map()
|
||||||
this.#notify = createNotify()
|
this.#notify = createNotify()
|
||||||
|
this.#onStatsUpdated = Obz()
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -47,25 +54,55 @@ class Infos {
|
||||||
* @returns {void}
|
* @returns {void}
|
||||||
*/
|
*/
|
||||||
update(address, info) {
|
update(address, info) {
|
||||||
const now = Date.now()
|
const hasNewStats = !!info.stats
|
||||||
const connUpdated = now // FIXME: not just conn
|
|
||||||
const prevInfo = this.#map.get(address)
|
const prevInfo = this.#map.get(address)
|
||||||
if (prevInfo) {
|
if (prevInfo) {
|
||||||
for (const key of Object.keys(info)) {
|
for (const key of Object.keys(info)) {
|
||||||
const k = /**@type {keyof Info}*/ (key)
|
const k = /**@type {keyof Info}*/ (key)
|
||||||
if (typeof info[k] === 'undefined') delete info[k]
|
if (typeof info[k] === 'undefined') delete info[k]
|
||||||
}
|
}
|
||||||
this.#map.set(address, { ...prevInfo, connUpdated, ...info })
|
this.#map.set(address, { ...prevInfo, ...info })
|
||||||
} else if (!info.state) {
|
} else if (!info.state) {
|
||||||
this.#map.set(address, { ...info, state: 'disconnected' })
|
this.#map.set(address, { ...info, state: 'disconnected' })
|
||||||
} else {
|
} else {
|
||||||
const connBirth = now
|
this.#map.set(address, /**@type {Info}*/ (info))
|
||||||
this.#map.set(address, {
|
|
||||||
.../**@type {Info}*/ (info),
|
|
||||||
connBirth,
|
|
||||||
connUpdated,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
if (hasNewStats) {
|
||||||
|
this.#onStatsUpdated.set(address)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {Address} address
|
||||||
|
* @param {(prevStats: Partial<Info['stats']>) => Partial<Info['stats']>} getStats
|
||||||
|
* @returns {void}
|
||||||
|
*/
|
||||||
|
updateStats(address, getStats) {
|
||||||
|
const prevInfo = this.#map.get(address)
|
||||||
|
if (!prevInfo) return
|
||||||
|
this.#map.set(address, {
|
||||||
|
...prevInfo,
|
||||||
|
stats: {
|
||||||
|
...prevInfo?.stats,
|
||||||
|
...getStats(prevInfo?.stats),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
this.#onStatsUpdated.set(address)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {Parameters<Obz<Address>>[0]} listener
|
||||||
|
*/
|
||||||
|
onStatsUpdated(listener) {
|
||||||
|
return this.#onStatsUpdated(listener)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {Address} address
|
||||||
|
*/
|
||||||
|
remove(address) {
|
||||||
|
this.#map.delete(address)
|
||||||
|
this.#onStatsUpdated.set(address)
|
||||||
}
|
}
|
||||||
|
|
||||||
size() {
|
size() {
|
||||||
|
|
53
lib/stats.js
53
lib/stats.js
|
@ -6,13 +6,7 @@ const atomic = require('atomic-file-rw')
|
||||||
/**
|
/**
|
||||||
* @typedef {import('./index').Address} Address
|
* @typedef {import('./index').Address} Address
|
||||||
* @typedef {import('./infos')} Infos
|
* @typedef {import('./infos')} Infos
|
||||||
* @typedef {{
|
* @typedef {import('statistics').Statistics} Statistics
|
||||||
* mean: number;
|
|
||||||
* stdev: number;
|
|
||||||
* count: number;
|
|
||||||
* sum: number;
|
|
||||||
* sqsum: number;
|
|
||||||
* }} Statistics
|
|
||||||
* @typedef {{
|
* @typedef {{
|
||||||
* birth?: number;
|
* birth?: number;
|
||||||
* key?: string;
|
* key?: string;
|
||||||
|
@ -91,6 +85,8 @@ class Stats {
|
||||||
/** @type {(reason: any) => void} */
|
/** @type {(reason: any) => void} */
|
||||||
// @ts-ignore
|
// @ts-ignore
|
||||||
#loadedReject
|
#loadedReject
|
||||||
|
/** @type {NodeJS.Timeout | null} */
|
||||||
|
#scheduledWriteTask
|
||||||
|
|
||||||
static FILENAME = 'stats.json'
|
static FILENAME = 'stats.json'
|
||||||
static DEFAULT_PERSIST_TIMEOUT = 2000
|
static DEFAULT_PERSIST_TIMEOUT = 2000
|
||||||
|
@ -109,6 +105,7 @@ class Stats {
|
||||||
this.#loadedResolve = resolve
|
this.#loadedResolve = resolve
|
||||||
this.#loadedReject = reject
|
this.#loadedReject = reject
|
||||||
})
|
})
|
||||||
|
this.#scheduledWriteTask = null
|
||||||
|
|
||||||
this.#readFromDisk(this.#path, (err, fileContents) => {
|
this.#readFromDisk(this.#path, (err, fileContents) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
|
@ -130,6 +127,10 @@ class Stats {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
this.#infos.onStatsUpdated(() => {
|
||||||
|
if (!this.#closed) this.#scheduleWrite()
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -150,12 +151,34 @@ class Stats {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#cancelScheduleWrite() {
|
||||||
|
if (this.#scheduledWriteTask) {
|
||||||
|
clearTimeout(this.#scheduledWriteTask)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#scheduleWrite() {
|
||||||
|
if (this.#persistTimeout === 0) {
|
||||||
|
this.#writeToDisk()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
this.#cancelScheduleWrite()
|
||||||
|
this.#scheduledWriteTask = setTimeout(() => {
|
||||||
|
this.#writeToDisk((err, _) => {
|
||||||
|
if (err) {
|
||||||
|
console.error(`Failed to write to disk ${Stats.FILENAME}`, err)
|
||||||
|
}
|
||||||
|
this.#scheduledWriteTask = null
|
||||||
|
})
|
||||||
|
}, this.#persistTimeout)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {CB<unknown>=} cb
|
* @param {CB<unknown>=} cb
|
||||||
* @returns {void}
|
* @returns {void}
|
||||||
*/
|
*/
|
||||||
#writeToDisk(cb) {
|
#writeToDisk(cb) {
|
||||||
if (this.#infos.size() === 0) return
|
|
||||||
debug(`Begun serializing and writing ${Stats.FILENAME}`)
|
debug(`Begun serializing and writing ${Stats.FILENAME}`)
|
||||||
const record = /**@type {Record<Address, StatsInfo>}*/ ({})
|
const record = /**@type {Record<Address, StatsInfo>}*/ ({})
|
||||||
for (let [address, info] of this.#infos.entries()) {
|
for (let [address, info] of this.#infos.entries()) {
|
||||||
|
@ -172,15 +195,11 @@ class Stats {
|
||||||
}
|
}
|
||||||
|
|
||||||
close() {
|
close() {
|
||||||
this.#closed = true;
|
this.#closed = true
|
||||||
// FIXME: implement
|
this.#cancelScheduleWrite()
|
||||||
// this._cancelScheduleWrite();
|
this.#writeToDisk()
|
||||||
// this._write();
|
;/**@type {any}*/ (this).#infos = void 0
|
||||||
// this._map?.clear();
|
debug('Closed the Stats instance')
|
||||||
// (this as any)._map = void 0;
|
|
||||||
// (this as any)._notify = void 0;
|
|
||||||
// (this as any)._stateFile = void 0;
|
|
||||||
debug('Closed the Stats instance');
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -31,7 +31,7 @@
|
||||||
"debug": "^4.3.2",
|
"debug": "^4.3.2",
|
||||||
"has-network2": ">=0.0.3",
|
"has-network2": ">=0.0.3",
|
||||||
"ip": "^1.1.5",
|
"ip": "^1.1.5",
|
||||||
"multiserver": "3",
|
"obz": "~1.1.0",
|
||||||
"on-change-network-strict": "1.0.0",
|
"on-change-network-strict": "1.0.0",
|
||||||
"on-wakeup": "^1.0.1",
|
"on-wakeup": "^1.0.1",
|
||||||
"promisify-tuple": "^1.0.1",
|
"promisify-tuple": "^1.0.1",
|
||||||
|
@ -50,14 +50,11 @@
|
||||||
"bs58": "^5.0.0",
|
"bs58": "^5.0.0",
|
||||||
"c8": "7",
|
"c8": "7",
|
||||||
"ppppp-caps": "github:staltz/ppppp-caps",
|
"ppppp-caps": "github:staltz/ppppp-caps",
|
||||||
"ppppp-db": "github:staltz/ppppp-db",
|
|
||||||
"ppppp-dict": "github:staltz/ppppp-dict",
|
|
||||||
"ppppp-keypair": "github:staltz/ppppp-keypair",
|
"ppppp-keypair": "github:staltz/ppppp-keypair",
|
||||||
"ppppp-set": "github:staltz/ppppp-set",
|
|
||||||
"prettier": "^2.6.2",
|
"prettier": "^2.6.2",
|
||||||
"pretty-quick": "^3.1.3",
|
"pretty-quick": "^3.1.3",
|
||||||
"rimraf": "^4.4.0",
|
"rimraf": "^4.4.0",
|
||||||
"secret-handshake-ext": "0.0.11",
|
"secret-handshake-ext": "0.0.12",
|
||||||
"secret-stack": "~8.1.0",
|
"secret-stack": "~8.1.0",
|
||||||
"ssb-box": "^1.0.1",
|
"ssb-box": "^1.0.1",
|
||||||
"typescript": "^5.1.3"
|
"typescript": "^5.1.3"
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
const test = require('node:test')
|
const test = require('node:test')
|
||||||
const assert = require('node:assert')
|
const assert = require('node:assert')
|
||||||
|
const FS = require('node:fs')
|
||||||
|
const Path = require('node:path')
|
||||||
const p = require('node:util').promisify
|
const p = require('node:util').promisify
|
||||||
const { createPeerMock } = require('./util')
|
const { createPeerMock } = require('./util')
|
||||||
|
|
||||||
|
@ -36,11 +38,58 @@ test('Glueing together stats with connections', async (t) => {
|
||||||
assert.equal(entriesBefore.length, 0, 'there is no entry in peers()')
|
assert.equal(entriesBefore.length, 0, 'there is no entry in peers()')
|
||||||
|
|
||||||
const stagingResult = peer.net.stage(address, { mode: 'internet' })
|
const stagingResult = peer.net.stage(address, { mode: 'internet' })
|
||||||
assert.equal(stagingResult, true, 'stage() should refuse')
|
assert.equal(stagingResult, true, 'stage() successful')
|
||||||
|
|
||||||
const entriesAfter = await p(peer.net.peers())(null)
|
const entriesAfter = await p(peer.net.peers())(null)
|
||||||
assert.equal(entriesAfter.length, 1, 'there is one entry in peers()')
|
assert.equal(entriesAfter.length, 1, 'there is one entry in peers()')
|
||||||
assert.equal(entriesAfter[0][0], address, 'entry addr ok')
|
assert.equal(entriesAfter[0][0], address, 'entry addr ok')
|
||||||
assert.equal(entriesAfter[0][1].state, 'disconnected', 'entry state ok')
|
assert.equal(entriesAfter[0][1].state, 'disconnected', 'entry state ok')
|
||||||
})
|
})
|
||||||
|
|
||||||
|
await t.test('connect() will trigger stats persistence', async (t) => {
|
||||||
|
const peer = createPeerMock()
|
||||||
|
const address = TEST_ADDR
|
||||||
|
|
||||||
|
const entriesBefore = await p(peer.net.peers())(null)
|
||||||
|
assert.equal(entriesBefore.length, 0, 'there is no entry in peers()')
|
||||||
|
|
||||||
|
const rpc = await p(peer.net.connect)(address)
|
||||||
|
assert.ok(rpc, 'connect() successful')
|
||||||
|
|
||||||
|
const statsJSONPath = Path.join(peer.mockDir, 'net', './stats.json')
|
||||||
|
while (FS.existsSync(statsJSONPath) === false) {
|
||||||
|
await p(setTimeout)(1)
|
||||||
|
}
|
||||||
|
const fileContents = FS.readFileSync(statsJSONPath, 'utf8')
|
||||||
|
const json = JSON.parse(fileContents)
|
||||||
|
assert.deepEqual(Object.keys(json), [TEST_ADDR])
|
||||||
|
assert.deepEqual(Object.keys(json[TEST_ADDR]), ['stateChange'])
|
||||||
|
})
|
||||||
|
|
||||||
|
await t.test('forget() will remove stats', async (t) => {
|
||||||
|
const peer = createPeerMock()
|
||||||
|
const address = TEST_ADDR
|
||||||
|
|
||||||
|
const entriesBefore = await p(peer.net.peers())(null)
|
||||||
|
assert.equal(entriesBefore.length, 0, 'there is no entry in peers()')
|
||||||
|
|
||||||
|
const rpc = await p(peer.net.connect)(address)
|
||||||
|
assert.ok(rpc, 'connect() successful')
|
||||||
|
|
||||||
|
const statsJSONPath = Path.join(peer.mockDir, 'net', './stats.json')
|
||||||
|
while (FS.existsSync(statsJSONPath) === false) {
|
||||||
|
await p(setTimeout)(1)
|
||||||
|
}
|
||||||
|
const fileContents = FS.readFileSync(statsJSONPath, 'utf8')
|
||||||
|
assert.equal(fileContents.length > 10, true, 'stats.json is not empty')
|
||||||
|
|
||||||
|
peer.net.forget(address)
|
||||||
|
await p(setTimeout)(200)
|
||||||
|
|
||||||
|
const entriesAfterForget = await p(peer.net.peers())(null)
|
||||||
|
assert.equal(entriesAfterForget.length, 0, 'there is no entry in peers()')
|
||||||
|
|
||||||
|
const fileContents2 = FS.readFileSync(statsJSONPath, 'utf8')
|
||||||
|
assert.equal(fileContents2, '{}', 'stats.json is empty')
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
|
@ -64,18 +64,30 @@ test('net', async (t) => {
|
||||||
pull(
|
pull(
|
||||||
peer.net.listen(),
|
peer.net.listen(),
|
||||||
pull.drain((ev) => {
|
pull.drain((ev) => {
|
||||||
++i
|
try {
|
||||||
if (i === 1) {
|
++i
|
||||||
assert.equal(ev.type, 'connecting', 'event.type ok')
|
if (i === 1) {
|
||||||
assert.equal(ev.address, TEST_ADDR, 'event.address ok')
|
assert.equal(ev.type, 'connecting', 'event.type ok')
|
||||||
} else if (i === 2) {
|
assert.equal(ev.address, TEST_ADDR, 'event.address ok')
|
||||||
assert.equal(ev.type, 'connecting-failed', 'event.type ok')
|
assert.equal(ev.parsedAddress.length, 1)
|
||||||
assert.equal(ev.address, TEST_ADDR, 'event.address ok')
|
assert.equal(ev.parsedAddress[0].length, 2)
|
||||||
assert.ok(ev.details, 'event.details ok')
|
assert.deepEqual(ev.parsedAddress[0][0], {
|
||||||
assert.equal(ev.details.code, 'ECONNREFUSED', 'event.details err')
|
name: 'net',
|
||||||
resolve()
|
host: 'localhost',
|
||||||
} else {
|
port: 9752,
|
||||||
reject(new Error('too many emissions'))
|
})
|
||||||
|
assert.equal(ev.parsedAddress[0][1].name, 'shse')
|
||||||
|
} else if (i === 2) {
|
||||||
|
assert.equal(ev.type, 'connecting-failed', 'event.type ok')
|
||||||
|
assert.equal(ev.address, TEST_ADDR, 'event.address ok')
|
||||||
|
assert.ok(ev.details, 'event.details ok')
|
||||||
|
assert.equal(ev.details.code, 'ECONNREFUSED', 'event.details err')
|
||||||
|
queueMicrotask(resolve)
|
||||||
|
} else {
|
||||||
|
queueMicrotask(() => reject(new Error('too many emissions')))
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
reject(err)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
|
|
|
@ -9,6 +9,10 @@ const Infos = require('../lib/infos')
|
||||||
test('Stats', async (t) => {
|
test('Stats', async (t) => {
|
||||||
await t.test('Recovers from corrupted JSON file', async () => {
|
await t.test('Recovers from corrupted JSON file', async () => {
|
||||||
const dirPath = Path.join(__dirname, './fixtures/corrupted')
|
const dirPath = Path.join(__dirname, './fixtures/corrupted')
|
||||||
|
const statsJSONPath = Path.join(dirPath, './stats.json')
|
||||||
|
const fileContents = FS.readFileSync(statsJSONPath, 'utf8')
|
||||||
|
console.log(fileContents);
|
||||||
|
|
||||||
const infos = new Infos()
|
const infos = new Infos()
|
||||||
const stats = new Stats(dirPath, infos)
|
const stats = new Stats(dirPath, infos)
|
||||||
assert.ok(stats, 'Stats instance was created')
|
assert.ok(stats, 'Stats instance was created')
|
||||||
|
@ -22,6 +26,10 @@ test('Stats', async (t) => {
|
||||||
const [address, info] = entriesAfter[0]
|
const [address, info] = entriesAfter[0]
|
||||||
assert.equal(address, 'net:staltz.com:8008~noauth', 'the address looks ok')
|
assert.equal(address, 'net:staltz.com:8008~noauth', 'the address looks ok')
|
||||||
assert.equal(info.stats.source, 'stored', 'the info looks ok')
|
assert.equal(info.stats.source, 'stored', 'the info looks ok')
|
||||||
|
|
||||||
|
stats.close()
|
||||||
|
await p(setTimeout)(50)
|
||||||
|
FS.writeFileSync(statsJSONPath, fileContents)
|
||||||
})
|
})
|
||||||
|
|
||||||
await t.test('Creates JSON file when it is absent', async () => {
|
await t.test('Creates JSON file when it is absent', async () => {
|
||||||
|
|
15
test/util.js
15
test/util.js
|
@ -60,15 +60,30 @@ function createPeerMock() {
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
cb(null, {
|
cb(null, {
|
||||||
once: () => {},
|
once: () => {},
|
||||||
|
net: {
|
||||||
|
ping() {
|
||||||
|
return {
|
||||||
|
source: () => {},
|
||||||
|
sink: () => {},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
})
|
})
|
||||||
}, 200)
|
}, 200)
|
||||||
},
|
},
|
||||||
once: () => {},
|
once: () => {},
|
||||||
|
multiserver: {
|
||||||
|
parse: () => [[{ name: 'net', host: 'localhost', port: 9752 }]],
|
||||||
|
},
|
||||||
|
mockDir: testPath,
|
||||||
}
|
}
|
||||||
const mockConfig = {
|
const mockConfig = {
|
||||||
global: {
|
global: {
|
||||||
path: testPath,
|
path: testPath,
|
||||||
},
|
},
|
||||||
|
net: {
|
||||||
|
persistTimeout: 0,
|
||||||
|
}
|
||||||
// shse: { caps }
|
// shse: { caps }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue