mirror of https://codeberg.org/pzp/pzp-net.git
315 lines
8.8 KiB
JavaScript
315 lines
8.8 KiB
JavaScript
const debug = require('debug')('ppppp:net:connections')
|
|
const createNotify = require('pull-notify')
|
|
const run = require('promisify-tuple')
|
|
const IP = require('ip')
|
|
const Multiaddr = require('./multiaddr')
|
|
|
|
/**
 * @typedef {import('./index').RpcConnectListener} RpcConnectListener
 * @typedef {import('./index').Multiaddr} Multiaddr
 * @typedef {import('./index').RPC} RPC
 * @typedef {import('./index').Peer} Peer
 * @typedef {import('./infos').Info} Info
 * @typedef {import('./infos')} Infos
 * @typedef {{
 *   type:
 *     | 'connecting'
 *     | 'connected'
 *     | 'connecting-failed'
 *     | 'disconnecting'
 *     | 'disconnected';
 *   multiaddr: Multiaddr;
 *   pubkey: string | undefined;
 *   details?: any;
 * }} ConnectionEvent
 * @typedef {{
 *   (ev: ConnectionEvent): void;
 *   listen: () => import('pull-stream').Source<ConnectionEvent>;
 *   end: () => void;
 * }} NotifyEvent
 */
|
|
|
|
/**
 * Keeps track of the RPC connections of a peer, keyed by multiaddr.
 *
 * Connection state is mirrored into the `Infos` instance given to the
 * constructor, and every state transition is published as a
 * `ConnectionEvent` on the stream returned by `listen()`.
 */
class Connections {
  /** @type {Peer} */
  #peer
  /** @type {Infos} */
  #infos
  /** @type {boolean} */
  #closed
  /** @type {NotifyEvent} */
  #notifyEvent
  /** @type {Map<Multiaddr, RPC>} */
  #rpcs

  /**
   * Used only to schedule a connect when a disconnect is in progress.
   * @type {Set<Multiaddr>}
   */
  #connectRetries

  /**
   * @param {Peer} peer
   * @param {Infos} infos
   */
  constructor(peer, infos) {
    this.#peer = peer
    this.#infos = infos
    this.#closed = false
    this.#notifyEvent = /**@type {any}*/ (createNotify())
    this.#rpcs = new Map()
    this.#connectRetries = new Set()

    this.#peer.addListener('rpc:connect', this.#onRpcConnect)
  }

  /**
   * Classifies an address by its prefix: `tunnel:` addresses are 'tunnel',
   * `net:` addresses are 'lan' when `IP.isPrivate(host)` is true and
   * 'internet' otherwise. Anything else yields `undefined`.
   *
   * @param {string} address
   * @returns {Info['inferredType']}
   */
  static inferPeerType(address) {
    if (address.startsWith('tunnel:')) return 'tunnel'
    if (address.startsWith('net:')) {
      // Only the part before '~' is inspected; its second ':' field is the host
      const netAddr = address.split('~')[0]
      const [, host] = netAddr.split(':')
      if (host) {
        return IP.isPrivate(host) ? 'lan' : 'internet'
      }
    }
    return undefined
  }

  /**
   * Scans a ';'-separated list of addresses and returns the pubkey of the
   * first one whose section after '~' starts with `shse:`.
   *
   * @param {string} addresses
   * @returns {string | undefined} the pubkey, or `undefined` when no address
   * carries an `shse` section.
   */
  static extractSHSEPubkey(addresses) {
    for (const address of addresses.split(';')) {
      const [, transform] = address.split('~')
      // Addresses without a '~' section have nothing to inspect; skip them
      // instead of crashing on `undefined.split`.
      if (!transform) continue
      const [name, pubkey] = transform.split(':')
      if (name === 'shse') return pubkey
    }
    return undefined
  }

  /**
   * @throws {Error} if `close()` was already called on this instance.
   */
  #assertNotClosed() {
    if (this.#closed) {
      throw new Error('This Connections instance is closed, create a new one.')
    }
  }

  /**
   * Handles 'rpc:connect' events from the peer. Only connections initiated
   * by the remote side are registered here.
   *
   * @type {RpcConnectListener}
   */
  #onRpcConnect = (rpc, weAreClient) => {
    // Don't process self connections, whatever that means:
    if (rpc.shse.pubkey === this.#peer.shse.pubkey) return
    // This branch is already handled by this.connect()
    if (weAreClient) return

    this.#prepareConnectedRPC(rpc.stream.address, rpc, weAreClient)
  }

  /**
   * Registers a freshly connected RPC: stores it in `#rpcs`, marks the
   * multiaddr as 'connected' in infos, emits a 'connected' event, and
   * arranges for the matching cleanup when the RPC emits 'closed'.
   *
   * @type {(address: string, rpc: RPC, weAreClient: boolean) => void}
   */
  #prepareConnectedRPC = (address, rpc, weAreClient) => {
    const initiator = weAreClient ? 'we' : 'they'
    const multiaddr = Multiaddr.fromMs(address)
    debug('Connected to %s, %s initiated it', multiaddr, initiator)

    const pubkey = Connections.extractSHSEPubkey(address)
    this.#rpcs.set(multiaddr, rpc)
    rpc.once('closed', () => {
      debug('Disconnected from %s', multiaddr)
      this.#rpcs.delete(multiaddr)
      this.#infos.update(multiaddr, { state: 'disconnected' })
      this.#notifyEvent({ type: 'disconnected', multiaddr, pubkey })
      this.#infos.emit()
    })

    const state = /**@type {Info['state']}*/ ('connected')
    const inferredType = Connections.inferPeerType(address)
    this.#infos.update(multiaddr, { state, inferredType })
    this.#notifyEvent({
      type: state,
      multiaddr,
      pubkey,
      details: { rpc, weAreClient },
    })
    this.#infos.emit()
  }

  /**
   * Connects to `multiaddr`, or returns the RPC of the connection that
   * already exists or is currently being established.
   *
   * @param {Multiaddr} multiaddr
   * @returns {Promise<RPC>}
   * @throws {Error} if this instance is closed, if the underlying
   * `peer.connect` fails, if waiting for a concurrent connection takes too
   * long, or if the recorded state is inconsistent or unknown.
   */
  async connect(multiaddr) {
    this.#assertNotClosed()

    const address = Multiaddr.toMs(multiaddr)
    const prevInfo = this.#infos.get(multiaddr)
    const prevState = prevInfo?.state ?? 'disconnected'
    switch (prevState) {
      case 'connected': {
        const rpc = this.#rpcs.get(multiaddr)
        if (!rpc) {
          // prettier-ignore
          throw new Error(`Failed to connect to ${multiaddr} due to inconsistent internal state`);
        }
        return rpc
      }

      case 'disconnecting': {
        // If disconnecting, schedule a connect() after disconnection completed
        this.#connectRetries.add(multiaddr)
        // note: control flow should fall through below!
      }
      case 'connecting': {
        // While disconnecting, the RPC being closed is still stored in #rpcs
        // (it is only removed by its 'closed' handler), so it must not be
        // mistaken for the new connection we are waiting for.
        const staleRPC =
          prevState === 'disconnecting' ? this.#rpcs.get(multiaddr) : undefined
        return new Promise((resolve, reject) => {
          let timeout = 100
          const checkAgain = () => {
            const rpc = this.#rpcs.get(multiaddr)
            if (rpc && rpc !== staleRPC) resolve(rpc)
            else if (timeout > 5 * 60e3) {
              // prettier-ignore
              reject(new Error(`Failed to connect to ${multiaddr} after waiting a long time`))
            } else {
              // Poll with exponential backoff
              timeout *= 2
              setTimeout(checkAgain, timeout)
            }
          }
          checkAgain()
        })
      }

      case 'disconnected': {
        debug('Connecting to %s', multiaddr)
        const state = /**@type {Info['state']}*/ ('connecting')
        const pubkey = Connections.extractSHSEPubkey(address)
        this.#infos.update(multiaddr, { state })
        this.#notifyEvent({ type: state, multiaddr, pubkey })
        this.#infos.emit()

        const [err, rpc] = await run(this.#peer.connect)(address)
        if (err) {
          this.#infos.update(multiaddr, { state: 'disconnected' })
          debug('Failed to connect to %s because: %s', multiaddr, err.message)
          this.#notifyEvent({
            type: 'connecting-failed',
            multiaddr,
            pubkey,
            details: err,
          })
          this.#infos.emit()
          throw err
        }

        // The state may have changed while we were awaiting peer.connect
        const concurrentInfo = this.#infos.get(multiaddr)
        if (!concurrentInfo || concurrentInfo.state !== 'connected') {
          this.#prepareConnectedRPC(address, rpc, true)
          return rpc
        } else {
          const rpc2 = this.#rpcs.get(multiaddr)
          if (!rpc2) {
            // prettier-ignore
            throw new Error(`Failed to connect to ${multiaddr} due to inconsistent internal state`);
          }
          return rpc2
        }
      }

      default: {
        // prettier-ignore
        debug('Unexpected control flow, peer %s has bad state %o', multiaddr, prevInfo)
        // prettier-ignore
        throw new Error(`Unexpected control flow, peer ${multiaddr} has bad state "${prevInfo?.state ?? '?'}"`)
      }
    }
  }

  /**
   * Closes the connection to `multiaddr`. If a connection attempt is still
   * in progress, waits for it to settle first.
   *
   * @param {Multiaddr} multiaddr
   * @returns {Promise<boolean>} `true` if a disconnection was performed,
   * `false` if there was nothing to disconnect (unknown, already
   * disconnected or disconnecting, or the pending connection never
   * produced an RPC).
   * @throws {Error} if this instance is closed, or if the state is
   * 'connected' but no RPC is stored for `multiaddr`.
   */
  async disconnect(multiaddr) {
    this.#assertNotClosed()
    const address = Multiaddr.toMs(multiaddr)
    const prevInfo = this.#infos.get(multiaddr)
    if (!prevInfo || prevInfo.state === 'disconnected') return false
    if (prevInfo.state === 'disconnecting') return false

    /**@type {RPC | undefined}*/
    let rpc
    if (prevInfo.state === 'connecting') {
      rpc = await new Promise((resolve) => {
        let timeout = 100
        const checkAgain = () => {
          const pendingRPC = this.#rpcs.get(multiaddr)
          if (pendingRPC) {
            resolve(pendingRPC)
            return
          }
          // Stop polling once the connection attempt is over without an RPC
          // (e.g. it failed), or once this instance was closed; otherwise
          // this timer chain would never end.
          const currentState = this.#infos.get(multiaddr)?.state
          if (this.#closed || currentState !== 'connecting') {
            resolve(undefined)
            return
          }
          // Poll with exponential backoff, capped at 30 seconds
          timeout = Math.min(timeout * 2, 30e3)
          setTimeout(checkAgain, timeout)
        }
        checkAgain()
      })
    } else if (prevInfo.state === 'connected') {
      const maybeRPC = this.#rpcs.get(multiaddr)
      if (!maybeRPC) {
        // prettier-ignore
        throw new Error(`Failed to disconnect from ${multiaddr} due to inconsistent internal state`);
      } else {
        rpc = maybeRPC
      }
    }

    // No RPC to close means there is nothing to disconnect from
    if (!rpc) return false

    debug('Disconnecting from %s', multiaddr)
    const state = /**@type {Info['state']}*/ ('disconnecting')
    const pubkey = Connections.extractSHSEPubkey(address)
    this.#infos.update(multiaddr, { state })
    this.#notifyEvent({ type: state, multiaddr, pubkey })
    this.#infos.emit()
    await run(rpc.close)(true)
    // Additional cleanup will execute in the "closed" event handler

    // Re-connect because while disconnect() was running,
    // someone called connect()
    if (this.#connectRetries.delete(multiaddr)) {
      // Deliberately not awaited, but a failure must not surface as an
      // unhandled rejection; connect() already emits 'connecting-failed'.
      this.connect(multiaddr).catch((err) => {
        // prettier-ignore
        debug('Failed to re-connect to %s because: %s', multiaddr, err?.message ?? err)
      })
    }

    return true
  }

  /**
   * @returns {import('pull-stream').Source<ConnectionEvent>}
   * @throws {Error} if this instance is closed.
   */
  listen() {
    this.#assertNotClosed()
    return /**@type {any}*/ (this.#notifyEvent.listen())
  }

  /**
   * Closes every RPC currently stored. Does nothing once this instance has
   * been closed.
   */
  reset() {
    if (this.#closed) return
    for (const rpc of this.#rpcs.values()) {
      rpc.close(true)
    }
  }

  /**
   * Closes all RPCs, stops listening to the peer's 'rpc:connect' events,
   * ends the event stream and marks this instance as closed.
   */
  close() {
    this.reset()
    this.#peer.removeListener('rpc:connect', this.#onRpcConnect)
    this.#closed = true
    this.#rpcs.clear()
    this.#connectRetries.clear()
    this.#notifyEvent.end()
    debug('Closed')
  }
}
|
|
|
|
// Expose the Connections class as this module's CommonJS export.
module.exports = Connections
|