update to ppppp-net
This commit is contained in:
parent
d5b72c3abc
commit
c499e0f945
|
@ -85,8 +85,7 @@ module.exports = class HubObserver {
|
||||||
const { name, admin } = this.#hubMetadata
|
const { name, admin } = this.#hubMetadata
|
||||||
if (name) metadata.name = name
|
if (name) metadata.name = name
|
||||||
if (admin) metadata.admin = admin
|
if (admin) metadata.admin = admin
|
||||||
this.#peer.conn.db().update(this.#address, metadata)
|
this.#peer.net.updateInfo(this.#address, metadata)
|
||||||
this.#peer.conn.hub().update(this.#address, metadata)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
debug('Announcing myself to hub %s', this.#hubPubkey)
|
debug('Announcing myself to hub %s', this.#hubPubkey)
|
||||||
|
@ -126,9 +125,8 @@ module.exports = class HubObserver {
|
||||||
|
|
||||||
// Update onlineCount metadata for this hub
|
// Update onlineCount metadata for this hub
|
||||||
const onlineCount = this.#attendants.size
|
const onlineCount = this.#attendants.size
|
||||||
this.#peer.conn.hub().update(this.#address, { onlineCount })
|
this.#peer.net.updateInfo(this.#address, { onlineCount })
|
||||||
|
|
||||||
// Update ssb-conn-staging
|
|
||||||
const hubName = this.#hubMetadata?.name
|
const hubName = this.#hubMetadata?.name
|
||||||
if (event.type === 'state') {
|
if (event.type === 'state') {
|
||||||
for (const pubkey of event.pubkeys) {
|
for (const pubkey of event.pubkeys) {
|
||||||
|
@ -139,8 +137,7 @@ module.exports = class HubObserver {
|
||||||
} else if (event.type === 'left') {
|
} else if (event.type === 'left') {
|
||||||
const address = this.#getAddress(event.pubkey)
|
const address = this.#getAddress(event.pubkey)
|
||||||
debug('Will disconnect and unstage %s', address)
|
debug('Will disconnect and unstage %s', address)
|
||||||
this.#peer.conn.unstage(address)
|
this.#peer.net.disconnect(address)
|
||||||
this.#peer.conn.disconnect(address)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -156,7 +153,7 @@ module.exports = class HubObserver {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Typically, this should stage the new attendant, but it's not up to us to
|
* Typically, this should stage the new attendant, but it's not up to us to
|
||||||
* decide that. We just notify other modules (like the ssb-conn scheduler) and
|
* decide that. We just notify other modules (like the net scheduler) and
|
||||||
* they listen to the notify stream and stage it if they want.
|
* they listen to the notify stream and stage it if they want.
|
||||||
*
|
*
|
||||||
* @param {string} attendantPubkey
|
* @param {string} attendantPubkey
|
||||||
|
@ -212,16 +209,11 @@ module.exports = class HubObserver {
|
||||||
this.#attendantsDrain?.abort()
|
this.#attendantsDrain?.abort()
|
||||||
for (const pubkey of this.#attendants) {
|
for (const pubkey of this.#attendants) {
|
||||||
const address = this.#getAddress(pubkey)
|
const address = this.#getAddress(pubkey)
|
||||||
this.#peer.conn.unstage(address)
|
this.#peer.net.forget(address)
|
||||||
}
|
|
||||||
for (const [addr, data] of this.#peer.conn.staging().entries()) {
|
|
||||||
if (data.hub === this.#hubPubkey) {
|
|
||||||
this.#peer.conn.unstage(addr)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
this.rpc.close(true, (/** @type {any} */ err) => {
|
this.rpc.close(true, (/** @type {any} */ err) => {
|
||||||
if (err) debug('error when closing connection with room: %o', err)
|
if (err) debug('error when closing connection with room: %o', err)
|
||||||
})
|
})
|
||||||
this.#peer.conn.disconnect(this.#address, () => {})
|
this.#peer.net.disconnect(this.#address)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,18 @@ const { muxrpcMissing } = require('./utils')
|
||||||
* @typedef {Map<string, HubObserver>} Hubs
|
* @typedef {Map<string, HubObserver>} Hubs
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {Array<[any, any]>} parsedAddress
|
||||||
|
* @returns {string | undefined}
|
||||||
|
*/
|
||||||
|
function extractSHSEPubkey(parsedAddress) {
|
||||||
|
for (const [transport, transform] of parsedAddress) {
|
||||||
|
if (transform.name === 'shse') {
|
||||||
|
return bs58.encode(transform.pubkey)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {Hubs} hubs
|
* @param {Hubs} hubs
|
||||||
* @param {any} peer
|
* @param {any} peer
|
||||||
|
@ -28,12 +40,11 @@ const makeTunnelPlugin = (hubs, peer) => (/** @type {any}} */ msConfig) => {
|
||||||
server(onConnect, startedCB) {
|
server(onConnect, startedCB) {
|
||||||
// Once a peer connects, detect hubs, and setup observers
|
// Once a peer connects, detect hubs, and setup observers
|
||||||
pull(
|
pull(
|
||||||
peer.conn.hub().listen(),
|
peer.net.listen(),
|
||||||
pull.filter(({ type }) => type === 'connected'),
|
pull.filter(({ type }) => type === 'connected'),
|
||||||
pull.drain(async ({ address, key, details }) => {
|
pull.drain(async ({ address, parsedAddress, details }) => {
|
||||||
if (!key) return
|
const pubkey = extractSHSEPubkey(parsedAddress)
|
||||||
// TODO: once ssb-conn is replaced, we won't need this anymore:
|
if (!pubkey) return
|
||||||
const pubkey = key.slice(1, -8) // not base64, it's @BASE58.ed25519
|
|
||||||
if (hubs.has(pubkey)) return
|
if (hubs.has(pubkey)) return
|
||||||
if (!details?.rpc) return
|
if (!details?.rpc) return
|
||||||
if (address.startsWith('tunnel:')) return
|
if (address.startsWith('tunnel:')) return
|
||||||
|
@ -56,12 +67,11 @@ const makeTunnelPlugin = (hubs, peer) => (/** @type {any}} */ msConfig) => {
|
||||||
|
|
||||||
// Once a hub disconnects, teardown
|
// Once a hub disconnects, teardown
|
||||||
pull(
|
pull(
|
||||||
peer.conn.hub().listen(),
|
peer.net.listen(),
|
||||||
pull.filter(({ type }) => type === 'disconnected'),
|
pull.filter(({ type }) => type === 'disconnected'),
|
||||||
pull.drain(({ key }) => {
|
pull.drain(({ parsedAddress }) => {
|
||||||
if (!key) return
|
const pubkey = extractSHSEPubkey(parsedAddress)
|
||||||
// TODO: once ssb-conn is replaced, we won't need this anymore:
|
if (!pubkey) return
|
||||||
const pubkey = key.slice(1, -8) // not base64, it's @BASE58.ed25519
|
|
||||||
if (!hubs.has(pubkey)) return
|
if (!hubs.has(pubkey)) return
|
||||||
hubs.get(pubkey)?.close()
|
hubs.get(pubkey)?.close()
|
||||||
hubs.delete(pubkey)
|
hubs.delete(pubkey)
|
||||||
|
@ -101,41 +111,6 @@ const makeTunnelPlugin = (hubs, peer) => (/** @type {any}} */ msConfig) => {
|
||||||
hubRPC = hubs.get(hub)?.rpc
|
hubRPC = hubs.get(hub)?.rpc
|
||||||
}
|
}
|
||||||
|
|
||||||
// If no hub found, look up hub in connDB and connect to it
|
|
||||||
if (!hubRPC) {
|
|
||||||
for (const [msaddr] of peer.conn.db().entries()) {
|
|
||||||
const pubkey = msaddr.split('~shse:')[1]
|
|
||||||
if (pubkey === hub) {
|
|
||||||
// prettier-ignore
|
|
||||||
debug(`to connect to ${addrStr} we first have to connect to ${hub}`)
|
|
||||||
const [err, rpc] = await run(peer.conn.connect)(msaddr)
|
|
||||||
if (err) {
|
|
||||||
// prettier-ignore
|
|
||||||
cb(new Error(`cant connect to ${addrStr} because cant reach the hub ${hub} due to: ` + err.message ?? err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
hubRPC = rpc
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If no hub found, find tunnel addr in connDB and connect to its `hub`
|
|
||||||
if (!hubRPC) {
|
|
||||||
const addrStrPlusShse = `tunnel:${hub}:${target}~shse:${target}`
|
|
||||||
const peerData = peer.conn.db().get(addrStrPlusShse)
|
|
||||||
if (peerData?.hub === hub && peerData?.hubAddress) {
|
|
||||||
// prettier-ignore
|
|
||||||
debug(`to connect to ${addrStr} we first have to connect to ${hub}`)
|
|
||||||
const [err, rpc] = await run(peer.conn.connect)(peerData.hubAddress)
|
|
||||||
if (err) {
|
|
||||||
// prettier-ignore
|
|
||||||
cb(new Error(`cant connect to ${addrStr} because cant reach the hub ${hub} due to: ` + err.message ?? err))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
hubRPC = rpc
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// If still no hub is found, consider it unknown
|
// If still no hub is found, consider it unknown
|
||||||
if (!hubRPC) {
|
if (!hubRPC) {
|
||||||
// prettier-ignore
|
// prettier-ignore
|
||||||
|
@ -158,11 +133,10 @@ const makeTunnelPlugin = (hubs, peer) => (/** @type {any}} */ msConfig) => {
|
||||||
* @param {any} addr
|
* @param {any} addr
|
||||||
*/
|
*/
|
||||||
parse(addr) {
|
parse(addr) {
|
||||||
if (typeof addr === 'object') return addr
|
if (typeof addr === 'object' && addr.name === 'tunnel') return addr
|
||||||
|
const [name, hub, target] = addr.split(':')
|
||||||
const [prefix, hub, target] = addr.split(':')
|
if (name !== 'tunnel') return
|
||||||
if (prefix !== 'tunnel') return
|
return { name, hub, target }
|
||||||
return { prefix, hub, target }
|
|
||||||
},
|
},
|
||||||
|
|
||||||
stringify() {
|
stringify() {
|
||||||
|
|
|
@ -7,7 +7,7 @@ const { ErrorDuplex } = require('./utils')
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
name: 'hubClient',
|
name: 'hubClient',
|
||||||
// needs: ['conn'], // FIXME: uncomment once we re-write conn
|
needs: ['net'],
|
||||||
manifest: {
|
manifest: {
|
||||||
connect: 'duplex',
|
connect: 'duplex',
|
||||||
},
|
},
|
||||||
|
|
Loading…
Reference in New Issue