pzp-hub-client/lib/hub-observer.js

228 lines
6.1 KiB
JavaScript

const debug = require('debug')('ppppp:hub-client')
const pull = require('pull-stream')
// @ts-ignore
const getSeverity = require('ssb-network-errors')
/**
* @typedef {(pull.Sink<AttendantsEvent> & {abort: () => void})} Drain
* @typedef {{type: 'state', pubkeys: Array<string>}} AttendantsEventState
* @typedef {{type: 'joined', pubkey: string}} AttendantsEventJoined
* @typedef {{type: 'left', pubkey: string}} AttendantsEventLeft
*/
/**
* @typedef {AttendantsEventState | AttendantsEventJoined | AttendantsEventLeft} AttendantsEvent
*/
module.exports = class HubObserver {
/**
* @readonly
* @type {any}
*/
#peer
/**
* @type {string}
*/
#hubPubkey
/**
* @type {string}
*/
#address
/**
* @type {{name: string, admin: string}}
*/
#hubMetadata
/**
* @type {Set<string>}
*/
#attendants
/**
* @type {Drain | undefined}
*/
#attendantsDrain
/**
* @param {any} peer
* @param {string} hubPubkey
* @param {string} address
* @param {{name: string, admin: string}} hubMetadata
* @param {any} rpc
* @param {any} onConnect
*/
constructor(peer, hubPubkey, address, hubMetadata, rpc, onConnect) {
this.#peer = peer
this.#hubPubkey = hubPubkey
this.#address = address
this.#hubMetadata = hubMetadata
this.#attendants = new Set()
this.rpc = rpc
/**
* @param {any} stream
* @param {string} peerPubkey
*/
this.handler = (stream, peerPubkey) => {
stream.address = `tunnel:${this.#hubPubkey}:${peerPubkey}`
// prettier-ignore
debug('Handler will call onConnect for the stream.address: %s', stream.address);
onConnect(stream)
}
// If metadata is a plain object with at least one field
if (
typeof this.#hubMetadata === 'object' &&
this.#hubMetadata &&
Object.keys(this.#hubMetadata).length >= 1
) {
/** @type {Record<string, string>} */
const metadata = { type: 'hub' }
const { name, admin } = this.#hubMetadata
if (name) metadata.name = name
if (admin) metadata.admin = admin
this.#peer.conn.db().update(this.#address, metadata)
this.#peer.conn.hub().update(this.#address, metadata)
}
debug('Announcing myself to hub %s', this.#hubPubkey)
pull(
this.rpc.hub.attendants(),
(this.#attendantsDrain = /** @type {Drain} */ (
pull.drain(this.#attendantsUpdated, this.#attendantsEnded)
))
)
}
/**
* @param {AttendantsEvent} event
*/
#attendantsUpdated = (event) => {
// debug log
if (event.type === 'state') {
// prettier-ignore
debug('initial attendants in %s: %s', this.#hubPubkey, JSON.stringify(event.pubkeys))
} else if (event.type === 'joined') {
debug('attendant joined %s: %s', this.#hubPubkey, event.pubkey)
} else if (event.type === 'left') {
debug('attendant left %s: %s', this.#hubPubkey, event.pubkey)
}
// Update attendants set
if (event.type === 'state') {
this.#attendants.clear()
for (const pubkey of event.pubkeys) {
this.#attendants.add(pubkey)
}
} else if (event.type === 'joined') {
this.#attendants.add(event.pubkey)
} else if (event.type === 'left') {
this.#attendants.delete(event.pubkey)
}
// Update onlineCount metadata for this hub
const onlineCount = this.#attendants.size
this.#peer.conn.hub().update(this.#address, { onlineCount })
// Update ssb-conn-staging
const hubName = this.#hubMetadata?.name
if (event.type === 'state') {
for (const pubkey of event.pubkeys) {
this.#notifyNewAttendant(pubkey, this.#hubPubkey, hubName)
}
} else if (event.type === 'joined') {
this.#notifyNewAttendant(event.pubkey, this.#hubPubkey, hubName)
} else if (event.type === 'left') {
const address = this.#getAddress(event.pubkey)
debug('Will disconnect and unstage %s', address)
this.#peer.conn.unstage(address)
this.#peer.conn.disconnect(address)
}
}
/**
* @param {Error | boolean | null | undefined} err
* @returns
*/
#attendantsEnded = (err) => {
if (err && err !== true) {
this.#handleStreamError(err)
}
}
/**
* Typically, this should stage the new attendant, but it's not up to us to
* decide that. We just notify other modules (like the ssb-conn scheduler) and
* they listen to the notify stream and stage it if they want.
*
* @param {string} attendantPubkey
* @param {string} hubPubkey
* @param {string} hubName
*/
#notifyNewAttendant(attendantPubkey, hubPubkey, hubName) {
if (attendantPubkey === hubPubkey) return
if (attendantPubkey === this.#peer.shse.pubkey) return
const address = this.#getAddress(attendantPubkey)
this.#peer.hubClient._notifyDiscoveredAttendant({
address,
attendantPubkey,
hubPubkey,
hubName,
})
}
/**
* @param {Error} err
*/
#handleStreamError(err) {
const severity = getSeverity(err)
if (severity === 1) {
// pre-emptively destroy the stream, assuming the other
// end is packet-stream 2.0.0 sending end messages.
this.close()
} else if (severity >= 2) {
// prettier-ignore
console.error(`error getting updates from hub ${this.#hubPubkey} because ${err.message}`);
}
}
/**
* @param {string} pubkey
*/
#getAddress(pubkey) {
return `tunnel:${this.#hubPubkey}:${pubkey}~shse:${pubkey}`
}
/**
* Similar to close(), but just destroys this "observer", not the
* underlying connections.
*/
cancel() {
this.#attendantsDrain?.abort()
}
/**
* Similar to cancel(), but also closes connection with the hub server.
*/
close() {
this.#attendantsDrain?.abort()
for (const pubkey of this.#attendants) {
const address = this.#getAddress(pubkey)
this.#peer.conn.unstage(address)
}
for (const [addr, data] of this.#peer.conn.staging().entries()) {
if (data.hub === this.#hubPubkey) {
this.#peer.conn.unstage(addr)
}
}
this.rpc.close(true, (/** @type {any} */ err) => {
if (err) debug('error when closing connection with room: %o', err)
})
this.#peer.conn.disconnect(this.#address, () => {})
}
}