mirror of https://codeberg.org/pzp/pzp-sync.git
update goals API, add type checking
This commit is contained in:
parent
30b5e7cf8a
commit
eff0c57517
|
@ -1,19 +1,21 @@
|
|||
// @ts-ignore
|
||||
const multicb = require('multicb')
|
||||
const p = require('promisify-4loc')
|
||||
const { BloomFilter } = require('bloom-filters')
|
||||
const MsgV3 = require('ppppp-db/msg-v3')
|
||||
const p = require('util').promisify
|
||||
const { EMPTY_RANGE, isEmptyRange, estimateMsgCount } = require('./range')
|
||||
|
||||
/**
|
||||
* @typedef {import('./range').Range} Range
|
||||
* @typedef {ReturnType<import('ppppp-db').init>} PPPPPDB
|
||||
* @typedef {import('ppppp-db/msg-v3').Msg} Msg
|
||||
*
|
||||
* @typedef {{
|
||||
* type: 'all' | 'newest' | 'oldest',
|
||||
* count: number,
|
||||
* id: string
|
||||
* }} Goal
|
||||
* @typedef {import('ppppp-goals').Goal} Goal
|
||||
* @typedef {import('./range').Range} Range
|
||||
* @typedef {string} MsgID
|
||||
*/
|
||||
|
||||
/**
|
||||
* @param {Iterable<unknown>} iter
|
||||
*/
|
||||
function countIter(iter) {
|
||||
let count = 0
|
||||
for (const _ of iter) count++
|
||||
|
@ -21,12 +23,18 @@ function countIter(iter) {
|
|||
}
|
||||
|
||||
class Algorithm {
|
||||
/** @type {{ db: PPPPPDB }} */
|
||||
#peer
|
||||
|
||||
/** @param {{ db: PPPPPDB }} peer */
|
||||
constructor(peer) {
|
||||
this.#peer = peer
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} rootID
|
||||
* @returns {Range}
|
||||
*/
|
||||
haveRange(rootID) {
|
||||
const rootMsg = this.#peer.db.get(rootID)
|
||||
if (!rootMsg) return EMPTY_RANGE
|
||||
|
@ -91,15 +99,23 @@ class Algorithm {
|
|||
wantRange(localHave, remoteHave, goal) {
|
||||
if (!goal) return EMPTY_RANGE
|
||||
if (isEmptyRange(remoteHave)) return EMPTY_RANGE
|
||||
if (goal.type === 'all') {
|
||||
if (goal.type === 'all' || goal.type === 'set' || goal.type === 'record') {
|
||||
return this.#wantAllRange(localHave, remoteHave)
|
||||
} else if (goal.type === 'newest') {
|
||||
return this.#wantNewestRange(localHave, remoteHave, goal.count)
|
||||
} else if (goal.type === 'oldest') {
|
||||
return this.#wantOldestRange(localHave, remoteHave, goal.count)
|
||||
}
|
||||
return EMPTY_RANGE
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} rootID
|
||||
* @param {number} round
|
||||
* @param {Range} range
|
||||
* @param {Iterable<string>} extraIds
|
||||
* @returns {JSON}
|
||||
*/
|
||||
bloomFor(rootID, round, range, extraIds = []) {
|
||||
const filterSize =
|
||||
(isEmptyRange(range) ? 2 : estimateMsgCount(range)) + countIter(extraIds)
|
||||
|
@ -115,6 +131,13 @@ class Algorithm {
|
|||
return filter.saveAsJSON()
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} rootID
|
||||
* @param {number} round
|
||||
* @param {Range} range
|
||||
* @param {JSON} remoteBloomJSON
|
||||
* @returns
|
||||
*/
|
||||
msgsMissing(rootID, round, range, remoteBloomJSON) {
|
||||
if (isEmptyRange(range)) return []
|
||||
const remoteFilter = BloomFilter.fromJSON(remoteBloomJSON)
|
||||
|
@ -128,6 +151,10 @@ class Algorithm {
|
|||
return missing
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} rootID
|
||||
* @param {Range} range
|
||||
*/
|
||||
*yieldMsgsIn(rootID, range) {
|
||||
const [minDepth, maxDepth] = range
|
||||
const rootMsg = this.#peer.db.get(rootID)
|
||||
|
@ -145,6 +172,10 @@ class Algorithm {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} rootID
|
||||
* @param {number} count
|
||||
*/
|
||||
async pruneNewest(rootID, count) {
|
||||
const tangle = this.#peer.db.getTangle(rootID)
|
||||
const sorted = tangle.topoSort()
|
||||
|
@ -204,11 +235,15 @@ class Algorithm {
|
|||
// TODO: Simulate adding this whole tangle, and check if it's valid
|
||||
|
||||
// Add new messages
|
||||
// TODO: optimize perf, avoiding await / try / catch
|
||||
const doneAdding = multicb({ pluck: 1 })
|
||||
for (const msg of validNewMsgs) {
|
||||
try {
|
||||
await p(this.#peer.db.add)(msg, rootID)
|
||||
} catch {}
|
||||
this.#peer.db.add(msg, rootID, doneAdding())
|
||||
}
|
||||
try {
|
||||
await p(doneAdding)()
|
||||
} catch (err) {
|
||||
// TODO:
|
||||
// debug('commit failed %o', err)
|
||||
}
|
||||
|
||||
if (goal.type === 'newest') {
|
||||
|
@ -222,7 +257,7 @@ class Algorithm {
|
|||
/**
|
||||
* @param {string} rootID
|
||||
* @param {Set<string>} msgIDs
|
||||
* @returns
|
||||
* @returns {Array<Msg>}
|
||||
*/
|
||||
getTangleSlice(rootID, msgIDs) {
|
||||
if (msgIDs.size === 0) return []
|
||||
|
@ -235,11 +270,14 @@ class Algorithm {
|
|||
break
|
||||
}
|
||||
}
|
||||
if (oldestMsgID === null) {
|
||||
throw new Error('No common msgID found in tangle given inputs')
|
||||
}
|
||||
const { erasables } = tangle.getDeletablesAndErasables(oldestMsgID)
|
||||
|
||||
const msgs = []
|
||||
for (const msgID of sorted) {
|
||||
let isErasable = erasables.includes(msgID)
|
||||
let isErasable = erasables.has(msgID)
|
||||
if (!msgIDs.has(msgID) && !isErasable) continue
|
||||
const msg = this.#peer.db.get(msgID)
|
||||
if (!msg) continue
|
||||
|
|
87
lib/index.js
87
lib/index.js
|
@ -1,16 +1,54 @@
|
|||
const toPull = require('push-stream-to-pull-stream')
|
||||
// @ts-ignore
|
||||
const toPull = require('push-stream-to-pull-stream') // @ts-ignore
|
||||
const getSeverity = require('ssb-network-errors')
|
||||
const pull = require('pull-stream')
|
||||
const makeDebug = require('debug')
|
||||
const getSeverity = require('ssb-network-errors')
|
||||
const Algorithm = require('./algorithm')
|
||||
const SyncStream = require('./stream')
|
||||
|
||||
/**
|
||||
* @typedef {ReturnType<import('ppppp-goals').init>} PPPPPGoal
|
||||
* @typedef {ReturnType<import('ppppp-db').init>} PPPPPDB
|
||||
* @typedef {import('node:events').EventEmitter} Emitter
|
||||
* @typedef {(cb: (err: Error) => void) => import('pull-stream').Duplex<unknown, unknown>} GetDuplex
|
||||
* @typedef {{ pubkey: string }} SHSE
|
||||
*/
|
||||
|
||||
/**
|
||||
* @param {Error} err
|
||||
* @param {string} namespace
|
||||
* @param {string} methodName
|
||||
*/
|
||||
function isMuxrpcMissingError(err, namespace, methodName) {
|
||||
const jsErrorMessage = `method:${namespace},${methodName} is not in list of allowed methods`
|
||||
const goErrorMessage = `muxrpc: no such command: ${namespace}.${methodName}`
|
||||
return err.message === jsErrorMessage || err.message === goErrorMessage
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {{ db: PPPPPDB | null }} peer
|
||||
* @returns {asserts peer is { db: PPPPPDB }}
|
||||
*/
|
||||
function assertDBExists(peer) {
|
||||
if (!peer.db) throw new Error('tangleSync requires ppppp-db plugin')
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {{ goals: PPPPPGoal | null }} peer
|
||||
* @returns {asserts peer is { goals: PPPPPGoal }}
|
||||
*/
|
||||
function assertGoalsExists(peer) {
|
||||
if (!peer.goals) throw new Error('tangleSync requires ppppp-goals plugin')
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {{ shse: SHSE | null }} peer
|
||||
* @returns {asserts peer is { shse: SHSE }}
|
||||
*/
|
||||
function assertSHSEExists(peer) {
|
||||
if (!peer.shse) throw new Error('tangleSync requires secret-handshake-ext')
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
name: 'tangleSync',
|
||||
manifest: {
|
||||
|
@ -22,15 +60,31 @@ module.exports = {
|
|||
allow: ['connect'],
|
||||
},
|
||||
},
|
||||
|
||||
/**
|
||||
* @param {Emitter & {
|
||||
* db: PPPPPDB | null,
|
||||
* goals: PPPPPGoal | null,
|
||||
* shse: SHSE | null
|
||||
* }} peer
|
||||
* @param {unknown} config
|
||||
*/
|
||||
init(peer, config) {
|
||||
if (!peer.goals) {
|
||||
throw new Error('tangleSync requires the goals plugin')
|
||||
}
|
||||
assertDBExists(peer)
|
||||
assertGoalsExists(peer)
|
||||
assertSHSEExists(peer)
|
||||
const debug = makeDebug(`ppppp:tangleSync`)
|
||||
const algo = new Algorithm(peer)
|
||||
|
||||
const streams = []
|
||||
const streams = /** @type {Array<SyncStream>} */ ([])
|
||||
|
||||
/**
|
||||
* @param {string} remoteId
|
||||
* @param {boolean} iamClient
|
||||
*/
|
||||
function createStream(remoteId, iamClient) {
|
||||
assertSHSEExists(peer)
|
||||
assertGoalsExists(peer)
|
||||
// prettier-ignore
|
||||
debug('Opening a stream with remote %s %s', iamClient ? 'server' : 'client', remoteId)
|
||||
const stream = new SyncStream(peer.shse.pubkey, debug, peer.goals, algo)
|
||||
|
@ -38,12 +92,17 @@ module.exports = {
|
|||
return stream
|
||||
}
|
||||
|
||||
peer.on('rpc:connect', function onSyncRPCConnect(rpc, iamClient) {
|
||||
if (rpc.shse.pubkey === peer.shse.pubkey) return // local client connecting to local server
|
||||
/**
|
||||
* @param {{ shse: SHSE, tangleSync: { connect: GetDuplex } }} rpc
|
||||
* @param {boolean} iamClient
|
||||
*/
|
||||
function onSyncRPCConnect(rpc, iamClient) {
|
||||
assertSHSEExists(peer)
|
||||
if (rpc.shse.pubkey === peer.shse.pubkey) return // connecting to myself
|
||||
if (!iamClient) return
|
||||
const local = toPull.duplex(createStream(rpc.id, true))
|
||||
const local = toPull.duplex(createStream(rpc.shse.pubkey, true))
|
||||
|
||||
let abort
|
||||
let abort = /** @type {CallableFunction | null} */ (null)
|
||||
const remote = rpc.tangleSync.connect((networkError) => {
|
||||
if (networkError && getSeverity(networkError) >= 3) {
|
||||
if (isMuxrpcMissingError(networkError, 'tangleSync', 'connect')) {
|
||||
|
@ -57,11 +116,17 @@ module.exports = {
|
|||
}
|
||||
})
|
||||
abort = pull(local, remote, local)
|
||||
})
|
||||
}
|
||||
peer.on('rpc:connect', onSyncRPCConnect)
|
||||
|
||||
/**
|
||||
* @this {{id: string}}
|
||||
*/
|
||||
function connect() {
|
||||
// `this` refers to the remote peer who called this muxrpc API
|
||||
return toPull.duplex(createStream(this.id, false))
|
||||
// TODO: fix muxrpc to replace this.id with this.shse.pubkey.
|
||||
// this.id comes from muxrpc, not secret-stack
|
||||
}
|
||||
|
||||
function initiate() {
|
||||
|
|
|
@ -23,7 +23,7 @@ function estimateMsgCount(range) {
|
|||
else return estimate
|
||||
}
|
||||
|
||||
const EMPTY_RANGE = [1, 0]
|
||||
const EMPTY_RANGE = /** @type {Range} */ ([1, 0])
|
||||
|
||||
module.exports = {
|
||||
isEmptyRange,
|
||||
|
|
153
lib/stream.js
153
lib/stream.js
|
@ -1,28 +1,45 @@
|
|||
// @ts-ignore
|
||||
const Pipeable = require('push-stream/pipeable')
|
||||
const { isEmptyRange } = require('./range')
|
||||
|
||||
/**
|
||||
* @typedef {ReturnType<import('ppppp-goals').init>} PPPPPGoals
|
||||
* @typedef {import('ppppp-db/msg-v3').Msg} Msg
|
||||
* @typedef {import('./range').Range} Range
|
||||
* @typedef {import('./algorithm')} Algorithm
|
||||
* @typedef {{
|
||||
* type: 'all' | 'newest' | 'oldest',
|
||||
* count: number,
|
||||
* id: string
|
||||
* }} Goal
|
||||
* @typedef {string} MsgID
|
||||
* @typedef {{id: string}} WithId
|
||||
* @typedef {WithId & {phase: 0, payload?: undefined}} Data0
|
||||
* @typedef {WithId & {phase: 1, payload: Range}} Data1
|
||||
* @typedef {WithId & {phase: 2, payload: { haveRange: Range, wantRange: Range }}} Data2
|
||||
* @typedef {WithId & {phase: 3, payload: { wantRange: Range, bloom: JSON }}} Data3
|
||||
* @typedef {WithId & {phase: 4 | 5 | 6 | 7, payload: { msgIDs: Array<MsgID>, bloom: JSON }}} Data4567
|
||||
* @typedef {WithId & {phase: 8, payload: { msgs: Array<Msg>, bloom: JSON }}} Data8
|
||||
* @typedef {WithId & {phase: 9, payload: Array<Msg>}} Data9
|
||||
* @typedef {Data0 | Data1 | Data2 | Data3 | Data4567 | Data8 | Data9} Data
|
||||
*/
|
||||
|
||||
class SyncStream extends Pipeable {
|
||||
#myId
|
||||
#debug
|
||||
|
||||
/** @type {Algorithm} */
|
||||
#algo
|
||||
|
||||
/** @type {CallableFunction} */
|
||||
#debug
|
||||
|
||||
/** Set of tangleId
|
||||
* @type {Set<string>} */
|
||||
#requested
|
||||
|
||||
/** tangleId => goal
|
||||
* @type {{
|
||||
* getByID(id: string): Goal | null,
|
||||
* list(): IterableIterator<Goal>,
|
||||
* }} */
|
||||
* @type {PPPPPGoals}
|
||||
*/
|
||||
#goals
|
||||
|
||||
/**
|
||||
|
@ -61,10 +78,18 @@ class SyncStream extends Pipeable {
|
|||
*/
|
||||
#sendableMsgs
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {string} localId
|
||||
* @param {CallableFunction} debug
|
||||
* @param {PPPPPGoals} goals
|
||||
* @param {Algorithm} algo
|
||||
*/
|
||||
constructor(localId, debug, goals, algo) {
|
||||
super()
|
||||
this.paused = false // TODO: should we start as paused=true?
|
||||
this.ended = false
|
||||
/** @type {any} */
|
||||
this.source = this.sink = null
|
||||
this.#myId = localId.slice(0, 6)
|
||||
this.#debug = debug
|
||||
|
@ -90,22 +115,33 @@ class SyncStream extends Pipeable {
|
|||
return this.sink && !this.sink.paused && !this.ended
|
||||
}
|
||||
|
||||
#updateSendableMsgs(id, msgs) {
|
||||
/**
|
||||
* @param {string} id
|
||||
* @param {Array<MsgID>} msgIDs
|
||||
*/
|
||||
#updateSendableMsgs(id, msgIDs) {
|
||||
const set = this.#sendableMsgs.get(id) ?? new Set()
|
||||
for (const msg of msgs) {
|
||||
set.add(msg)
|
||||
for (const msgID of msgIDs) {
|
||||
set.add(msgID)
|
||||
}
|
||||
this.#sendableMsgs.set(id, set)
|
||||
}
|
||||
|
||||
#updateReceivableMsgs(id, msgs) {
|
||||
/**
|
||||
* @param {string} id
|
||||
* @param {Array<string>} msgIDs
|
||||
*/
|
||||
#updateReceivableMsgs(id, msgIDs) {
|
||||
const set = this.#receivableMsgs.get(id) ?? new Set()
|
||||
for (const msg of msgs) {
|
||||
set.add(msg)
|
||||
for (const msgID of msgIDs) {
|
||||
set.add(msgID)
|
||||
}
|
||||
this.#receivableMsgs.set(id, set)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} id
|
||||
*/
|
||||
#sendLocalHave(id) {
|
||||
const localHaveRange = this.#algo.haveRange(id)
|
||||
this.#localHave.set(id, localHaveRange)
|
||||
|
@ -114,11 +150,15 @@ class SyncStream extends Pipeable {
|
|||
this.sink.write({ id, phase: 1, payload: localHaveRange })
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} id
|
||||
* @param {Range} remoteHaveRange
|
||||
*/
|
||||
#sendLocalHaveAndWant(id, remoteHaveRange) {
|
||||
// prettier-ignore
|
||||
this.#debug('%s Stream IN1: got remote have-range %o for %s', this.#myId, remoteHaveRange, id)
|
||||
this.#remoteHave.set(id, remoteHaveRange)
|
||||
const goal = this.#goals.getByID(id)
|
||||
const goal = this.#goals.get(id)
|
||||
const haveRange = this.#algo.haveRange(id)
|
||||
const wantRange = this.#algo.wantRange(haveRange, remoteHaveRange, goal)
|
||||
this.#localHave.set(id, haveRange)
|
||||
|
@ -128,13 +168,19 @@ class SyncStream extends Pipeable {
|
|||
this.sink.write({ id, phase: 2, payload: { haveRange, wantRange } })
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} id
|
||||
* @param {Range} remoteHaveRange
|
||||
* @param {Range} remoteWantRange
|
||||
*/
|
||||
#sendLocalWantAndInitBloom(id, remoteHaveRange, remoteWantRange) {
|
||||
// prettier-ignore
|
||||
this.#debug('%s Stream IN2: got remote have-range %o and want-range %o for %s', this.#myId, remoteHaveRange, remoteWantRange, id)
|
||||
this.#remoteHave.set(id, remoteHaveRange)
|
||||
this.#remoteWant.set(id, remoteWantRange)
|
||||
const goal = this.#goals.getByID(id)
|
||||
const goal = this.#goals.get(id)
|
||||
const haveRange = this.#localHave.get(id)
|
||||
if (!haveRange) throw new Error('local have-range not set')
|
||||
const wantRange = this.#algo.wantRange(haveRange, remoteHaveRange, goal)
|
||||
this.#localWant.set(id, wantRange)
|
||||
const localBloom0 = this.#algo.bloomFor(id, 0, wantRange)
|
||||
|
@ -147,6 +193,11 @@ class SyncStream extends Pipeable {
|
|||
this.#debug('%s Stream OUT3: send local want-range %o and bloom round 0 for %s', this.#myId, wantRange, id)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} id
|
||||
* @param {Range} remoteWantRange
|
||||
* @param {JSON} remoteBloom
|
||||
*/
|
||||
#sendInitBloomRes(id, remoteWantRange, remoteBloom) {
|
||||
// prettier-ignore
|
||||
this.#debug('%s Stream IN3: got remote want-range %o and bloom round 0 for %s', this.#myId, remoteWantRange, id)
|
||||
|
@ -159,6 +210,7 @@ class SyncStream extends Pipeable {
|
|||
)
|
||||
this.#updateSendableMsgs(id, msgIDsForThem)
|
||||
const localWantRange = this.#localWant.get(id)
|
||||
if (!localWantRange) throw new Error('local want-range not set')
|
||||
const localBloom = this.#algo.bloomFor(id, 0, localWantRange)
|
||||
this.sink.write({
|
||||
id,
|
||||
|
@ -169,10 +221,18 @@ class SyncStream extends Pipeable {
|
|||
this.#debug('%s Stream OUT4: send bloom round 0 plus msgIDs in %s: %o', this.#myId, id, msgIDsForThem)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} id
|
||||
* @param {number} phase
|
||||
* @param {number} round
|
||||
* @param {JSON} remoteBloom
|
||||
* @param {Array<MsgID>} msgIDsForMe
|
||||
*/
|
||||
#sendBloomReq(id, phase, round, remoteBloom, msgIDsForMe) {
|
||||
// prettier-ignore
|
||||
this.#debug('%s Stream IN%s: got bloom round %s plus msgIDs in %s: %o', this.#myId, phase-1, round-1, id, msgIDsForMe)
|
||||
const remoteWantRange = this.#remoteWant.get(id)
|
||||
if (!remoteWantRange) throw new Error('remote want-range not set')
|
||||
this.#updateReceivableMsgs(id, msgIDsForMe)
|
||||
const msgIDsForThem = this.#algo.msgsMissing(
|
||||
id,
|
||||
|
@ -183,6 +243,7 @@ class SyncStream extends Pipeable {
|
|||
this.#updateSendableMsgs(id, msgIDsForThem)
|
||||
const extras = this.#receivableMsgs.get(id)
|
||||
const localWantRange = this.#localWant.get(id)
|
||||
if (!localWantRange) throw new Error('local want-range not set')
|
||||
const localBloom = this.#algo.bloomFor(id, round, localWantRange, extras)
|
||||
this.sink.write({
|
||||
id,
|
||||
|
@ -193,10 +254,18 @@ class SyncStream extends Pipeable {
|
|||
this.#debug('%s Stream OUT%s: send bloom round %s plus msgIDs in %s: %o', this.#myId, phase, round, id, msgIDsForThem)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} id
|
||||
* @param {number} phase
|
||||
* @param {number} round
|
||||
* @param {JSON} remoteBloom
|
||||
* @param {Array<MsgID>} msgIDsForMe
|
||||
*/
|
||||
#sendBloomRes(id, phase, round, remoteBloom, msgIDsForMe) {
|
||||
// prettier-ignore
|
||||
this.#debug('%s Stream IN%s: got bloom round %s plus msgIDs in %s: %o', this.#myId, phase-1, round, id, msgIDsForMe)
|
||||
const remoteWantRange = this.#remoteWant.get(id)
|
||||
if (!remoteWantRange) throw new Error('remote want-range not set')
|
||||
this.#updateReceivableMsgs(id, msgIDsForMe)
|
||||
const msgIDsForThem = this.#algo.msgsMissing(
|
||||
id,
|
||||
|
@ -207,6 +276,7 @@ class SyncStream extends Pipeable {
|
|||
this.#updateSendableMsgs(id, msgIDsForThem)
|
||||
const extras = this.#receivableMsgs.get(id)
|
||||
const localWantRange = this.#localWant.get(id)
|
||||
if (!localWantRange) throw new Error('local want-range not set')
|
||||
const localBloom = this.#algo.bloomFor(id, round, localWantRange, extras)
|
||||
this.sink.write({
|
||||
id,
|
||||
|
@ -217,10 +287,17 @@ class SyncStream extends Pipeable {
|
|||
this.#debug('%s Stream OUT%s: send bloom round %s plus msgIDs in %s: %o', this.#myId, phase, round, id, msgIDsForThem)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} id
|
||||
* @param {number} round
|
||||
* @param {JSON} remoteBloom
|
||||
* @param {Array<MsgID>} msgIDsForMe
|
||||
*/
|
||||
#sendMissingMsgsReq(id, round, remoteBloom, msgIDsForMe) {
|
||||
// prettier-ignore
|
||||
this.#debug('%s Stream IN7: got bloom round %s plus msgIDs in %s: %o', this.#myId, round, id, msgIDsForMe)
|
||||
const remoteWantRange = this.#remoteWant.get(id)
|
||||
if (!remoteWantRange) throw new Error('remote want-range not set')
|
||||
this.#updateReceivableMsgs(id, msgIDsForMe)
|
||||
const msgIDsForThem = this.#algo.msgsMissing(
|
||||
id,
|
||||
|
@ -233,6 +310,7 @@ class SyncStream extends Pipeable {
|
|||
const msgs = this.#algo.getTangleSlice(id, msgIDs)
|
||||
const extras = this.#receivableMsgs.get(id)
|
||||
const localWantRange = this.#localWant.get(id)
|
||||
if (!localWantRange) throw new Error('local want-range not set')
|
||||
const localBloom = this.#algo.bloomFor(id, round, localWantRange, extras)
|
||||
this.sink.write({
|
||||
id,
|
||||
|
@ -243,10 +321,17 @@ class SyncStream extends Pipeable {
|
|||
this.#debug('%s Stream OUT8: send bloom round %s plus %s msgs in %s', this.#myId, round, msgs.length, id)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} id
|
||||
* @param {number} round
|
||||
* @param {JSON} remoteBloom
|
||||
* @param {Array<Msg>} msgsForMe
|
||||
*/
|
||||
#sendMissingMsgsRes(id, round, remoteBloom, msgsForMe) {
|
||||
// prettier-ignore
|
||||
this.#debug('%s Stream IN8: got bloom round %s plus %s msgs in %s', this.#myId, round, msgsForMe.length, id)
|
||||
const remoteWantRange = this.#remoteWant.get(id)
|
||||
if (!remoteWantRange) throw new Error('remote want-range not set')
|
||||
const msgIDsForThem = this.#algo.msgsMissing(
|
||||
id,
|
||||
round,
|
||||
|
@ -260,8 +345,10 @@ class SyncStream extends Pipeable {
|
|||
this.#debug('%s Stream OUT9: send %s msgs in %s', this.#myId, msgs.length, id)
|
||||
this.sink.write({ id, phase: 9, payload: msgs })
|
||||
|
||||
const goal = this.#goals.getByID(id)
|
||||
const localWant = this.#localWant.get(id)
|
||||
const goal = this.#goals.get(id)
|
||||
if (!goal) throw new Error(`No goal found for "${id}"`)
|
||||
const localWantRange = this.#localWant.get(id)
|
||||
if (!localWantRange) throw new Error('local want-range not set')
|
||||
this.#requested.delete(id)
|
||||
this.#localHave.delete(id)
|
||||
this.#localWant.delete(id)
|
||||
|
@ -271,19 +358,26 @@ class SyncStream extends Pipeable {
|
|||
this.#sendableMsgs.delete(id)
|
||||
if (msgsForMe.length === 0) return
|
||||
try {
|
||||
this.#algo.commit(id, msgsForMe, goal, localWant)
|
||||
this.#algo.commit(id, msgsForMe, goal, localWantRange)
|
||||
} catch (err) {
|
||||
// prettier-ignore
|
||||
this.#debug('%s Stream could not commit received messages, because: %s', this.#myId, err)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} id
|
||||
* @param {Array<Msg>} msgsForMe
|
||||
* @returns
|
||||
*/
|
||||
#consumeMissingMsgs(id, msgsForMe) {
|
||||
// prettier-ignore
|
||||
this.#debug('%s Stream IN9: got %s msgs in %s', this.#myId, msgsForMe.length, id)
|
||||
|
||||
const goal = this.#goals.getByID(id)
|
||||
const localWant = this.#localWant.get(id)
|
||||
const goal = this.#goals.get(id)
|
||||
if (!goal) throw new Error(`No goal found for "${id}"`)
|
||||
const localWantRange = this.#localWant.get(id)
|
||||
if (!localWantRange) throw new Error('local want-range not set')
|
||||
this.#requested.delete(id)
|
||||
this.#localHave.delete(id)
|
||||
this.#localWant.delete(id)
|
||||
|
@ -293,13 +387,17 @@ class SyncStream extends Pipeable {
|
|||
this.#sendableMsgs.delete(id)
|
||||
if (msgsForMe.length === 0) return
|
||||
try {
|
||||
this.#algo.commit(id, msgsForMe, goal, localWant)
|
||||
this.#algo.commit(id, msgsForMe, goal, localWantRange)
|
||||
} catch (err) {
|
||||
// prettier-ignore
|
||||
this.#debug('%s Stream could not commit received messages, because: %s', this.#myId, err)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} id
|
||||
* @param {Range} remoteWantRange
|
||||
*/
|
||||
#sendMsgsInRemoteWant(id, remoteWantRange) {
|
||||
const msgs = []
|
||||
for (const msg of this.#algo.yieldMsgsIn(id, remoteWantRange)) {
|
||||
|
@ -310,7 +408,7 @@ class SyncStream extends Pipeable {
|
|||
this.sink.write({ id, phase: 9, payload: msgs })
|
||||
}
|
||||
|
||||
// as a source
|
||||
// source method
|
||||
resume() {
|
||||
if (!this.sink || this.sink.paused) return
|
||||
|
||||
|
@ -320,7 +418,10 @@ class SyncStream extends Pipeable {
|
|||
}
|
||||
}
|
||||
|
||||
// as a sink
|
||||
/**
|
||||
* sink method
|
||||
* @param {Data} data
|
||||
*/
|
||||
write(data) {
|
||||
const { id, phase, payload } = data
|
||||
|
||||
|
@ -380,14 +481,20 @@ class SyncStream extends Pipeable {
|
|||
this.#debug('Stream IN: unknown %o', data)
|
||||
}
|
||||
|
||||
// as a source
|
||||
/**
|
||||
* source method
|
||||
* @param {Error} err
|
||||
*/
|
||||
abort(err) {
|
||||
this.ended = true
|
||||
if (this.source && !this.source.ended) this.source.abort(err)
|
||||
if (this.sink && !this.sink.ended) this.sink.end(err)
|
||||
}
|
||||
|
||||
// as a sink
|
||||
/**
|
||||
* sink method
|
||||
* @param {Error} err
|
||||
*/
|
||||
end(err) {
|
||||
this.ended = true
|
||||
if (this.source && !this.source.ended) this.source.abort(err)
|
||||
|
|
13
package.json
13
package.json
|
@ -26,12 +26,17 @@
|
|||
"dependencies": {
|
||||
"bloom-filters": "^3.0.0",
|
||||
"debug": "^4.3.4",
|
||||
"multicb": "1.2.2",
|
||||
"promisify-4loc": "^1.0.0",
|
||||
"pull-stream": "^3.7.0",
|
||||
"push-stream": "^11.2.0",
|
||||
"push-stream-to-pull-stream": "^1.0.5",
|
||||
"ssb-network-errors": "^1.0.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/debug": "^4.1.9",
|
||||
"@types/pull-stream": "3.6.3",
|
||||
"@types/node": "16.x",
|
||||
"bs58": "^5.0.0",
|
||||
"c8": "7",
|
||||
"ppppp-db": "github:staltz/ppppp-db",
|
||||
|
@ -43,10 +48,14 @@
|
|||
"rimraf": "^4.4.0",
|
||||
"secret-stack": "~7.1.0",
|
||||
"secret-handshake-ext": "^0.0.8",
|
||||
"ssb-box": "^1.0.1"
|
||||
"ssb-box": "^1.0.1",
|
||||
"typescript": "^5.1.3"
|
||||
},
|
||||
"scripts": {
|
||||
"test": "node --test",
|
||||
"clean-check": "tsc --build --clean",
|
||||
"prepublishOnly": "npm run clean-check && tsc --build",
|
||||
"postpublish": "npm run clean-check",
|
||||
"test": "npm run clean-check && node --test",
|
||||
"format-code": "prettier --write \"(lib|test)/**/*.js\"",
|
||||
"format-code-staged": "pretty-quick --staged --pattern \"(lib|test)/**/*.js\"",
|
||||
"coverage": "c8 --reporter=lcov npm run test"
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
{
|
||||
"include": ["lib/**/*.js"],
|
||||
"exclude": ["coverage/", "node_modules/", "test/"],
|
||||
"compilerOptions": {
|
||||
"checkJs": true,
|
||||
"declaration": true,
|
||||
"emitDeclarationOnly": true,
|
||||
"exactOptionalPropertyTypes": true,
|
||||
"forceConsistentCasingInFileNames": true,
|
||||
"lib": ["es2022", "dom"],
|
||||
"module": "node16",
|
||||
"skipLibCheck": true,
|
||||
"strict": true,
|
||||
"target": "es2021"
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue