diff --git a/lib/algorithm.js b/lib/algorithm.js index a33b1a5..476ec9e 100644 --- a/lib/algorithm.js +++ b/lib/algorithm.js @@ -1,19 +1,21 @@ +// @ts-ignore +const multicb = require('multicb') +const p = require('promisify-4loc') const { BloomFilter } = require('bloom-filters') const MsgV3 = require('ppppp-db/msg-v3') -const p = require('util').promisify const { EMPTY_RANGE, isEmptyRange, estimateMsgCount } = require('./range') /** - * @typedef {import('./range').Range} Range + * @typedef {ReturnType} PPPPPDB * @typedef {import('ppppp-db/msg-v3').Msg} Msg - * - * @typedef {{ - * type: 'all' | 'newest' | 'oldest', - * count: number, - * id: string - * }} Goal + * @typedef {import('ppppp-goals').Goal} Goal + * @typedef {import('./range').Range} Range + * @typedef {string} MsgID */ +/** + * @param {Iterable} iter + */ function countIter(iter) { let count = 0 for (const _ of iter) count++ @@ -21,12 +23,18 @@ function countIter(iter) { } class Algorithm { + /** @type {{ db: PPPPPDB }} */ #peer + /** @param {{ db: PPPPPDB }} peer */ constructor(peer) { this.#peer = peer } + /** + * @param {string} rootID + * @returns {Range} + */ haveRange(rootID) { const rootMsg = this.#peer.db.get(rootID) if (!rootMsg) return EMPTY_RANGE @@ -91,15 +99,23 @@ class Algorithm { wantRange(localHave, remoteHave, goal) { if (!goal) return EMPTY_RANGE if (isEmptyRange(remoteHave)) return EMPTY_RANGE - if (goal.type === 'all') { + if (goal.type === 'all' || goal.type === 'set' || goal.type === 'record') { return this.#wantAllRange(localHave, remoteHave) } else if (goal.type === 'newest') { return this.#wantNewestRange(localHave, remoteHave, goal.count) } else if (goal.type === 'oldest') { return this.#wantOldestRange(localHave, remoteHave, goal.count) } + return EMPTY_RANGE } + /** + * @param {string} rootID + * @param {number} round + * @param {Range} range + * @param {Iterable} extraIds + * @returns {JSON} + */ bloomFor(rootID, round, range, extraIds = []) { const filterSize = (isEmptyRange(range) ? 2 : estimateMsgCount(range)) + countIter(extraIds) @@ -115,6 +131,13 @@ class Algorithm { return filter.saveAsJSON() } + /** + * @param {string} rootID + * @param {number} round + * @param {Range} range + * @param {JSON} remoteBloomJSON + * @returns + */ msgsMissing(rootID, round, range, remoteBloomJSON) { if (isEmptyRange(range)) return [] const remoteFilter = BloomFilter.fromJSON(remoteBloomJSON) @@ -128,6 +151,10 @@ class Algorithm { return missing } + /** + * @param {string} rootID + * @param {Range} range + */ *yieldMsgsIn(rootID, range) { const [minDepth, maxDepth] = range const rootMsg = this.#peer.db.get(rootID) @@ -145,6 +172,10 @@ class Algorithm { } } + /** + * @param {string} rootID + * @param {number} count + */ async pruneNewest(rootID, count) { const tangle = this.#peer.db.getTangle(rootID) const sorted = tangle.topoSort() @@ -204,11 +235,15 @@ class Algorithm { // TODO: Simulate adding this whole tangle, and check if it's valid // Add new messages - // TODO: optimize perf, avoiding await / try / catch + const doneAdding = multicb({ pluck: 1 }) for (const msg of validNewMsgs) { - try { - await p(this.#peer.db.add)(msg, rootID) - } catch {} + this.#peer.db.add(msg, rootID, doneAdding()) + } + try { + await p(doneAdding)() + } catch (err) { + // TODO: + // debug('commit failed %o', err) } if (goal.type === 'newest') { @@ -222,7 +257,7 @@ class Algorithm { /** * @param {string} rootID * @param {Set} msgIDs - * @returns + * @returns {Array} */ getTangleSlice(rootID, msgIDs) { if (msgIDs.size === 0) return [] @@ -235,11 +270,14 @@ class Algorithm { break } } + if (oldestMsgID === null) { + throw new Error('No common msgID found in tangle given inputs') + } const { erasables } = tangle.getDeletablesAndErasables(oldestMsgID) const msgs = [] for (const msgID of sorted) { - let isErasable = erasables.includes(msgID) + let isErasable = erasables.has(msgID) if (!msgIDs.has(msgID) && !isErasable) continue const msg = this.#peer.db.get(msgID) if (!msg) continue diff --git a/lib/index.js b/lib/index.js index 3791ccd..ee19016 100644 --- a/lib/index.js +++ b/lib/index.js @@ -1,16 +1,54 @@ -const toPull = require('push-stream-to-pull-stream') +// @ts-ignore +const toPull = require('push-stream-to-pull-stream') // @ts-ignore +const getSeverity = require('ssb-network-errors') const pull = require('pull-stream') const makeDebug = require('debug') -const getSeverity = require('ssb-network-errors') const Algorithm = require('./algorithm') const SyncStream = require('./stream') +/** + * @typedef {ReturnType} PPPPPGoal + * @typedef {ReturnType} PPPPPDB + * @typedef {import('node:events').EventEmitter} Emitter + * @typedef {(cb: (err: Error) => void) => import('pull-stream').Duplex} GetDuplex + * @typedef {{ pubkey: string }} SHSE + */ + +/** + * @param {Error} err + * @param {string} namespace + * @param {string} methodName + */ function isMuxrpcMissingError(err, namespace, methodName) { const jsErrorMessage = `method:${namespace},${methodName} is not in list of allowed methods` const goErrorMessage = `muxrpc: no such command: ${namespace}.${methodName}` return err.message === jsErrorMessage || err.message === goErrorMessage } +/** + * @param {{ db: PPPPPDB | null }} peer + * @returns {asserts peer is { db: PPPPPDB }} + */ +function assertDBExists(peer) { + if (!peer.db) throw new Error('tangleSync requires ppppp-db plugin') +} + +/** + * @param {{ goals: PPPPPGoal | null }} peer + * @returns {asserts peer is { goals: PPPPPGoal }} + */ +function assertGoalsExists(peer) { + if (!peer.goals) throw new Error('tangleSync requires ppppp-goals plugin') +} + +/** + * @param {{ shse: SHSE | null }} peer + * @returns {asserts peer is { shse: SHSE }} + */ +function assertSHSEExists(peer) { + if (!peer.shse) throw new Error('tangleSync requires secret-handshake-ext') +} + module.exports = { name: 'tangleSync', manifest: { @@ -22,15 +60,31 @@ module.exports = { allow: ['connect'], }, }, + + /** + * @param {Emitter & { + * db: PPPPPDB | null, + * goals: PPPPPGoal | null, + * shse: SHSE | null + * }} peer + * @param {unknown} config + */ init(peer, config) { - if (!peer.goals) { - throw new Error('tangleSync requires the goals plugin') - } + assertDBExists(peer) + assertGoalsExists(peer) + assertSHSEExists(peer) const debug = makeDebug(`ppppp:tangleSync`) const algo = new Algorithm(peer) - const streams = [] + const streams = /** @type {Array} */ ([]) + + /** + * @param {string} remoteId + * @param {boolean} iamClient + */ function createStream(remoteId, iamClient) { + assertSHSEExists(peer) + assertGoalsExists(peer) // prettier-ignore debug('Opening a stream with remote %s %s', iamClient ? 'server' : 'client', remoteId) const stream = new SyncStream(peer.shse.pubkey, debug, peer.goals, algo) @@ -38,12 +92,17 @@ module.exports = { return stream } - peer.on('rpc:connect', function onSyncRPCConnect(rpc, iamClient) { - if (rpc.shse.pubkey === peer.shse.pubkey) return // local client connecting to local server + /** + * @param {{ shse: SHSE, tangleSync: { connect: GetDuplex } }} rpc + * @param {boolean} iamClient + */ + function onSyncRPCConnect(rpc, iamClient) { + assertSHSEExists(peer) + if (rpc.shse.pubkey === peer.shse.pubkey) return // connecting to myself if (!iamClient) return - const local = toPull.duplex(createStream(rpc.id, true)) + const local = toPull.duplex(createStream(rpc.shse.pubkey, true)) - let abort + let abort = /** @type {CallableFunction | null} */ (null) const remote = rpc.tangleSync.connect((networkError) => { if (networkError && getSeverity(networkError) >= 3) { if (isMuxrpcMissingError(networkError, 'tangleSync', 'connect')) { @@ -57,11 +116,17 @@ module.exports = { } }) abort = pull(local, remote, local) - }) + } + peer.on('rpc:connect', onSyncRPCConnect) + /** + * @this {{id: string}} + */ function connect() { // `this` refers to the remote peer who called this muxrpc API return toPull.duplex(createStream(this.id, false)) + // TODO: fix muxrpc to replace this.id with this.shse.pubkey. + // this.id comes from muxrpc, not secret-stack } function initiate() { diff --git a/lib/range.js b/lib/range.js index 0f8aec8..2a6bc7d 100644 --- a/lib/range.js +++ b/lib/range.js @@ -23,7 +23,7 @@ function estimateMsgCount(range) { else return estimate } -const EMPTY_RANGE = [1, 0] +const EMPTY_RANGE = /** @type {Range} */ ([1, 0]) module.exports = { isEmptyRange, diff --git a/lib/stream.js b/lib/stream.js index 0a2bdb6..08d643b 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -1,28 +1,45 @@ +// @ts-ignore const Pipeable = require('push-stream/pipeable') const { isEmptyRange } = require('./range') /** + * @typedef {ReturnType} PPPPPGoals + * @typedef {import('ppppp-db/msg-v3').Msg} Msg + * @typedef {import('./range').Range} Range + * @typedef {import('./algorithm')} Algorithm * @typedef {{ * type: 'all' | 'newest' | 'oldest', * count: number, * id: string * }} Goal + * @typedef {string} MsgID + * @typedef {{id: string}} WithId + * @typedef {WithId & {phase: 0, payload?: undefined}} Data0 + * @typedef {WithId & {phase: 1, payload: Range}} Data1 + * @typedef {WithId & {phase: 2, payload: { haveRange: Range, wantRange: Range }}} Data2 + * @typedef {WithId & {phase: 3, payload: { wantRange: Range, bloom: JSON }}} Data3 + * @typedef {WithId & {phase: 4 | 5 | 6 | 7, payload: { msgIDs: Array, bloom: JSON }}} Data4567 + * @typedef {WithId & {phase: 8, payload: { msgs: Array, bloom: JSON }}} Data8 + * @typedef {WithId & {phase: 9, payload: Array}} Data9 + * @typedef {Data0 | Data1 | Data2 | Data3 | Data4567 | Data8 | Data9} Data */ class SyncStream extends Pipeable { #myId - #debug + + /** @type {Algorithm} */ #algo + /** @type {CallableFunction} */ + #debug + /** Set of tangleId * @type {Set} */ #requested /** tangleId => goal - * @type {{ - * getByID(id: string): Goal | null, - * list(): IterableIterator, - * }} */ + * @type {PPPPPGoals} + */ #goals /** @@ -61,10 +78,18 @@ class SyncStream extends Pipeable { */ #sendableMsgs + /** + * + * @param {string} localId + * @param {CallableFunction} debug + * @param {PPPPPGoals} goals + * @param {Algorithm} algo + */ constructor(localId, debug, goals, algo) { super() this.paused = false // TODO: should we start as paused=true? this.ended = false + /** @type {any} */ this.source = this.sink = null this.#myId = localId.slice(0, 6) this.#debug = debug @@ -90,22 +115,33 @@ class SyncStream extends Pipeable { return this.sink && !this.sink.paused && !this.ended } - #updateSendableMsgs(id, msgs) { + /** + * @param {string} id + * @param {Array} msgIDs + */ + #updateSendableMsgs(id, msgIDs) { const set = this.#sendableMsgs.get(id) ?? new Set() - for (const msg of msgs) { - set.add(msg) + for (const msgID of msgIDs) { + set.add(msgID) } this.#sendableMsgs.set(id, set) } - #updateReceivableMsgs(id, msgs) { + /** + * @param {string} id + * @param {Array} msgIDs + */ + #updateReceivableMsgs(id, msgIDs) { const set = this.#receivableMsgs.get(id) ?? new Set() - for (const msg of msgs) { - set.add(msg) + for (const msgID of msgIDs) { + set.add(msgID) } this.#receivableMsgs.set(id, set) } + /** + * @param {string} id + */ #sendLocalHave(id) { const localHaveRange = this.#algo.haveRange(id) this.#localHave.set(id, localHaveRange) @@ -114,11 +150,15 @@ class SyncStream extends Pipeable { this.sink.write({ id, phase: 1, payload: localHaveRange }) } + /** + * @param {string} id + * @param {Range} remoteHaveRange + */ #sendLocalHaveAndWant(id, remoteHaveRange) { // prettier-ignore this.#debug('%s Stream IN1: got remote have-range %o for %s', this.#myId, remoteHaveRange, id) this.#remoteHave.set(id, remoteHaveRange) - const goal = this.#goals.getByID(id) + const goal = this.#goals.get(id) const haveRange = this.#algo.haveRange(id) const wantRange = this.#algo.wantRange(haveRange, remoteHaveRange, goal) this.#localHave.set(id, haveRange) @@ -128,13 +168,19 @@ class SyncStream extends Pipeable { this.sink.write({ id, phase: 2, payload: { haveRange, wantRange } }) } + /** + * @param {string} id + * @param {Range} remoteHaveRange + * @param {Range} remoteWantRange + */ #sendLocalWantAndInitBloom(id, remoteHaveRange, remoteWantRange) { // prettier-ignore this.#debug('%s Stream IN2: got remote have-range %o and want-range %o for %s', this.#myId, remoteHaveRange, remoteWantRange, id) this.#remoteHave.set(id, remoteHaveRange) this.#remoteWant.set(id, remoteWantRange) - const goal = this.#goals.getByID(id) + const goal = this.#goals.get(id) const haveRange = this.#localHave.get(id) + if (!haveRange) throw new Error('local have-range not set') const wantRange = this.#algo.wantRange(haveRange, remoteHaveRange, goal) this.#localWant.set(id, wantRange) const localBloom0 = this.#algo.bloomFor(id, 0, wantRange) @@ -147,6 +193,11 @@ class SyncStream extends Pipeable { this.#debug('%s Stream OUT3: send local want-range %o and bloom round 0 for %s', this.#myId, wantRange, id) } + /** + * @param {string} id + * @param {Range} remoteWantRange + * @param {JSON} remoteBloom + */ #sendInitBloomRes(id, remoteWantRange, remoteBloom) { // prettier-ignore this.#debug('%s Stream IN3: got remote want-range %o and bloom round 0 for %s', this.#myId, remoteWantRange, id) @@ -159,6 +210,7 @@ class SyncStream extends Pipeable { ) this.#updateSendableMsgs(id, msgIDsForThem) const localWantRange = this.#localWant.get(id) + if (!localWantRange) throw new Error('local want-range not set') const localBloom = this.#algo.bloomFor(id, 0, localWantRange) this.sink.write({ id, @@ -169,10 +221,18 @@ class SyncStream extends Pipeable { this.#debug('%s Stream OUT4: send bloom round 0 plus msgIDs in %s: %o', this.#myId, id, msgIDsForThem) } + /** + * @param {string} id + * @param {number} phase + * @param {number} round + * @param {JSON} remoteBloom + * @param {Array} msgIDsForMe + */ #sendBloomReq(id, phase, round, remoteBloom, msgIDsForMe) { // prettier-ignore this.#debug('%s Stream IN%s: got bloom round %s plus msgIDs in %s: %o', this.#myId, phase-1, round-1, id, msgIDsForMe) const remoteWantRange = this.#remoteWant.get(id) + if (!remoteWantRange) throw new Error('remote want-range not set') this.#updateReceivableMsgs(id, msgIDsForMe) const msgIDsForThem = this.#algo.msgsMissing( id, @@ -183,6 +243,7 @@ class SyncStream extends Pipeable { this.#updateSendableMsgs(id, msgIDsForThem) const extras = this.#receivableMsgs.get(id) const localWantRange = this.#localWant.get(id) + if (!localWantRange) throw new Error('local want-range not set') const localBloom = this.#algo.bloomFor(id, round, localWantRange, extras) this.sink.write({ id, @@ -193,10 +254,18 @@ class SyncStream extends Pipeable { this.#debug('%s Stream OUT%s: send bloom round %s plus msgIDs in %s: %o', this.#myId, phase, round, id, msgIDsForThem) } + /** + * @param {string} id + * @param {number} phase + * @param {number} round + * @param {JSON} remoteBloom + * @param {Array} msgIDsForMe + */ #sendBloomRes(id, phase, round, remoteBloom, msgIDsForMe) { // prettier-ignore this.#debug('%s Stream IN%s: got bloom round %s plus msgIDs in %s: %o', this.#myId, phase-1, round, id, msgIDsForMe) const remoteWantRange = this.#remoteWant.get(id) + if (!remoteWantRange) throw new Error('remote want-range not set') this.#updateReceivableMsgs(id, msgIDsForMe) const msgIDsForThem = this.#algo.msgsMissing( id, @@ -207,6 +276,7 @@ class SyncStream extends Pipeable { this.#updateSendableMsgs(id, msgIDsForThem) const extras = this.#receivableMsgs.get(id) const localWantRange = this.#localWant.get(id) + if (!localWantRange) throw new Error('local want-range not set') const localBloom = this.#algo.bloomFor(id, round, localWantRange, extras) this.sink.write({ id, @@ -217,10 +287,17 @@ class SyncStream extends Pipeable { this.#debug('%s Stream OUT%s: send bloom round %s plus msgIDs in %s: %o', this.#myId, phase, round, id, msgIDsForThem) } + /** + * @param {string} id + * @param {number} round + * @param {JSON} remoteBloom + * @param {Array} msgIDsForMe + */ #sendMissingMsgsReq(id, round, remoteBloom, msgIDsForMe) { // prettier-ignore this.#debug('%s Stream IN7: got bloom round %s plus msgIDs in %s: %o', this.#myId, round, id, msgIDsForMe) const remoteWantRange = this.#remoteWant.get(id) + if (!remoteWantRange) throw new Error('remote want-range not set') this.#updateReceivableMsgs(id, msgIDsForMe) const msgIDsForThem = this.#algo.msgsMissing( id, @@ -233,6 +310,7 @@ class SyncStream extends Pipeable { const msgs = this.#algo.getTangleSlice(id, msgIDs) const extras = this.#receivableMsgs.get(id) const localWantRange = this.#localWant.get(id) + if (!localWantRange) throw new Error('local want-range not set') const localBloom = this.#algo.bloomFor(id, round, localWantRange, extras) this.sink.write({ id, @@ -243,10 +321,17 @@ class SyncStream extends Pipeable { this.#debug('%s Stream OUT8: send bloom round %s plus %s msgs in %s', this.#myId, round, msgs.length, id) } + /** + * @param {string} id + * @param {number} round + * @param {JSON} remoteBloom + * @param {Array} msgsForMe + */ #sendMissingMsgsRes(id, round, remoteBloom, msgsForMe) { // prettier-ignore this.#debug('%s Stream IN8: got bloom round %s plus %s msgs in %s', this.#myId, round, msgsForMe.length, id) const remoteWantRange = this.#remoteWant.get(id) + if (!remoteWantRange) throw new Error('remote want-range not set') const msgIDsForThem = this.#algo.msgsMissing( id, round, @@ -260,8 +345,10 @@ class SyncStream extends Pipeable { this.#debug('%s Stream OUT9: send %s msgs in %s', this.#myId, msgs.length, id) this.sink.write({ id, phase: 9, payload: msgs }) - const goal = this.#goals.getByID(id) - const localWant = this.#localWant.get(id) + const goal = this.#goals.get(id) + if (!goal) throw new Error(`No goal found for "${id}"`) + const localWantRange = this.#localWant.get(id) + if (!localWantRange) throw new Error('local want-range not set') this.#requested.delete(id) this.#localHave.delete(id) this.#localWant.delete(id) @@ -271,19 +358,26 @@ class SyncStream extends Pipeable { this.#sendableMsgs.delete(id) if (msgsForMe.length === 0) return try { - this.#algo.commit(id, msgsForMe, goal, localWant) + this.#algo.commit(id, msgsForMe, goal, localWantRange) } catch (err) { // prettier-ignore this.#debug('%s Stream could not commit received messages, because: %s', this.#myId, err) } } + /** + * @param {string} id + * @param {Array} msgsForMe + * @returns + */ #consumeMissingMsgs(id, msgsForMe) { // prettier-ignore this.#debug('%s Stream IN9: got %s msgs in %s', this.#myId, msgsForMe.length, id) - const goal = this.#goals.getByID(id) - const localWant = this.#localWant.get(id) + const goal = this.#goals.get(id) + if (!goal) throw new Error(`No goal found for "${id}"`) + const localWantRange = this.#localWant.get(id) + if (!localWantRange) throw new Error('local want-range not set') this.#requested.delete(id) this.#localHave.delete(id) this.#localWant.delete(id) @@ -293,13 +387,17 @@ class SyncStream extends Pipeable { this.#sendableMsgs.delete(id) if (msgsForMe.length === 0) return try { - this.#algo.commit(id, msgsForMe, goal, localWant) + this.#algo.commit(id, msgsForMe, goal, localWantRange) } catch (err) { // prettier-ignore this.#debug('%s Stream could not commit received messages, because: %s', this.#myId, err) } } + /** + * @param {string} id + * @param {Range} remoteWantRange + */ #sendMsgsInRemoteWant(id, remoteWantRange) { const msgs = [] for (const msg of this.#algo.yieldMsgsIn(id, remoteWantRange)) { @@ -310,7 +408,7 @@ class SyncStream extends Pipeable { this.sink.write({ id, phase: 9, payload: msgs }) } - // as a source + // source method resume() { if (!this.sink || this.sink.paused) return @@ -320,7 +418,10 @@ class SyncStream extends Pipeable { } } - // as a sink + /** + * sink method + * @param {Data} data + */ write(data) { const { id, phase, payload } = data @@ -380,14 +481,20 @@ class SyncStream extends Pipeable { this.#debug('Stream IN: unknown %o', data) } - // as a source + /** + * source method + * @param {Error} err + */ abort(err) { this.ended = true if (this.source && !this.source.ended) this.source.abort(err) if (this.sink && !this.sink.ended) this.sink.end(err) } - // as a sink + /** + * sink method + * @param {Error} err + */ end(err) { this.ended = true if (this.source && !this.source.ended) this.source.abort(err) diff --git a/package.json b/package.json index da799fe..2110c65 100644 --- a/package.json +++ b/package.json @@ -26,12 +26,17 @@ "dependencies": { "bloom-filters": "^3.0.0", "debug": "^4.3.4", + "multicb": "1.2.2", + "promisify-4loc": "^1.0.0", "pull-stream": "^3.7.0", "push-stream": "^11.2.0", "push-stream-to-pull-stream": "^1.0.5", "ssb-network-errors": "^1.0.1" }, "devDependencies": { + "@types/debug": "^4.1.9", + "@types/pull-stream": "3.6.3", + "@types/node": "16.x", "bs58": "^5.0.0", "c8": "7", "ppppp-db": "github:staltz/ppppp-db", @@ -43,10 +48,14 @@ "rimraf": "^4.4.0", "secret-stack": "~7.1.0", "secret-handshake-ext": "^0.0.8", - "ssb-box": "^1.0.1" + "ssb-box": "^1.0.1", + "typescript": "^5.1.3" }, "scripts": { - "test": "node --test", + "clean-check": "tsc --build --clean", + "prepublishOnly": "npm run clean-check && tsc --build", + "postpublish": "npm run clean-check", + "test": "npm run clean-check && node --test", "format-code": "prettier --write \"(lib|test)/**/*.js\"", "format-code-staged": "pretty-quick --staged --pattern \"(lib|test)/**/*.js\"", "coverage": "c8 --reporter=lcov npm run test" diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..bd2acd5 --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,16 @@ +{ + "include": ["lib/**/*.js"], + "exclude": ["coverage/", "node_modules/", "test/"], + "compilerOptions": { + "checkJs": true, + "declaration": true, + "emitDeclarationOnly": true, + "exactOptionalPropertyTypes": true, + "forceConsistentCasingInFileNames": true, + "lib": ["es2022", "dom"], + "module": "node16", + "skipLibCheck": true, + "strict": true, + "target": "es2021" + } +} \ No newline at end of file