From a473c8fec132ed4e5042151b90624b03a85c2b18 Mon Sep 17 00:00:00 2001 From: Andre Staltz Date: Sun, 9 Apr 2023 11:24:49 +0300 Subject: [PATCH] init from dagsync --- .github/workflows/node.js.yml | 25 +++ .gitignore | 9 + .prettierrc.yaml | 7 + LICENSE | 121 +++++++++++++ README.md | 1 + index.js | 1 + lib/algorithm.js | 81 +++++++++ lib/feed-sync.js | 111 ++++++++++++ lib/plugin.js | 85 +++++++++ lib/stream.js | 328 ++++++++++++++++++++++++++++++++++ lib/thread-sync.js | 61 +++++++ package.json | 54 ++++++ test/feed-sync.test.js | 153 ++++++++++++++++ test/thread-sync.test.js | 303 +++++++++++++++++++++++++++++++ test/util.js | 14 ++ 15 files changed, 1354 insertions(+) create mode 100644 .github/workflows/node.js.yml create mode 100644 .gitignore create mode 100644 .prettierrc.yaml create mode 100644 LICENSE create mode 100644 README.md create mode 100644 index.js create mode 100644 lib/algorithm.js create mode 100644 lib/feed-sync.js create mode 100644 lib/plugin.js create mode 100644 lib/stream.js create mode 100644 lib/thread-sync.js create mode 100644 package.json create mode 100644 test/feed-sync.test.js create mode 100644 test/thread-sync.test.js create mode 100644 test/util.js diff --git a/.github/workflows/node.js.yml b/.github/workflows/node.js.yml new file mode 100644 index 0000000..0e22925 --- /dev/null +++ b/.github/workflows/node.js.yml @@ -0,0 +1,25 @@ +name: CI + +on: + push: + branches: [master] + pull_request: + branches: [master] + +jobs: + test: + runs-on: ubuntu-latest + timeout-minutes: 10 + + strategy: + matrix: + node-version: [16.x, 18.x] + + steps: + - uses: actions/checkout@v2 + - name: Use Node.js ${{ matrix.node-version }} + uses: actions/setup-node@v1 + with: + node-version: ${{ matrix.node-version }} + - run: npm install + - run: npm test diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4b96477 --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +.vscode +node_modules +pnpm-lock.yaml +package-lock.json +coverage +*~ + +# For misc scripts and experiments: +/gitignored diff --git a/.prettierrc.yaml b/.prettierrc.yaml new file mode 100644 index 0000000..7dffc51 --- /dev/null +++ b/.prettierrc.yaml @@ -0,0 +1,7 @@ +# SPDX-FileCopyrightText: 2021 Anders Rune Jensen +# SPDX-FileCopyrightText: 2021 Andre 'Staltz' Medeiros +# +# SPDX-License-Identifier: Unlicense + +semi: false +singleQuote: true diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..0e259d4 --- /dev/null +++ b/LICENSE @@ -0,0 +1,121 @@ +Creative Commons Legal Code + +CC0 1.0 Universal + + CREATIVE COMMONS CORPORATION IS NOT A LAW FIRM AND DOES NOT PROVIDE + LEGAL SERVICES. DISTRIBUTION OF THIS DOCUMENT DOES NOT CREATE AN + ATTORNEY-CLIENT RELATIONSHIP. CREATIVE COMMONS PROVIDES THIS + INFORMATION ON AN "AS-IS" BASIS. CREATIVE COMMONS MAKES NO WARRANTIES + REGARDING THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS + PROVIDED HEREUNDER, AND DISCLAIMS LIABILITY FOR DAMAGES RESULTING FROM + THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS PROVIDED + HEREUNDER. + +Statement of Purpose + +The laws of most jurisdictions throughout the world automatically confer +exclusive Copyright and Related Rights (defined below) upon the creator +and subsequent owner(s) (each and all, an "owner") of an original work of +authorship and/or a database (each, a "Work"). 
+ +Certain owners wish to permanently relinquish those rights to a Work for +the purpose of contributing to a commons of creative, cultural and +scientific works ("Commons") that the public can reliably and without fear +of later claims of infringement build upon, modify, incorporate in other +works, reuse and redistribute as freely as possible in any form whatsoever +and for any purposes, including without limitation commercial purposes. +These owners may contribute to the Commons to promote the ideal of a free +culture and the further production of creative, cultural and scientific +works, or to gain reputation or greater distribution for their Work in +part through the use and efforts of others. + +For these and/or other purposes and motivations, and without any +expectation of additional consideration or compensation, the person +associating CC0 with a Work (the "Affirmer"), to the extent that he or she +is an owner of Copyright and Related Rights in the Work, voluntarily +elects to apply CC0 to the Work and publicly distribute the Work under its +terms, with knowledge of his or her Copyright and Related Rights in the +Work and the meaning and intended legal effect of CC0 on those rights. + +1. Copyright and Related Rights. A Work made available under CC0 may be +protected by copyright and related or neighboring rights ("Copyright and +Related Rights"). Copyright and Related Rights include, but are not +limited to, the following: + + i. the right to reproduce, adapt, distribute, perform, display, + communicate, and translate a Work; + ii. moral rights retained by the original author(s) and/or performer(s); +iii. publicity and privacy rights pertaining to a person's image or + likeness depicted in a Work; + iv. rights protecting against unfair competition in regards to a Work, + subject to the limitations in paragraph 4(a), below; + v. rights protecting the extraction, dissemination, use and reuse of data + in a Work; + vi. database rights (such as those arising under Directive 96/9/EC of the + European Parliament and of the Council of 11 March 1996 on the legal + protection of databases, and under any national implementation + thereof, including any amended or successor version of such + directive); and +vii. other similar, equivalent or corresponding rights throughout the + world based on applicable law or treaty, and any national + implementations thereof. + +2. Waiver. To the greatest extent permitted by, but not in contravention +of, applicable law, Affirmer hereby overtly, fully, permanently, +irrevocably and unconditionally waives, abandons, and surrenders all of +Affirmer's Copyright and Related Rights and associated claims and causes +of action, whether now known or unknown (including existing as well as +future claims and causes of action), in the Work (i) in all territories +worldwide, (ii) for the maximum duration provided by applicable law or +treaty (including future time extensions), (iii) in any current or future +medium and for any number of copies, and (iv) for any purpose whatsoever, +including without limitation commercial, advertising or promotional +purposes (the "Waiver"). 
Affirmer makes the Waiver for the benefit of each +member of the public at large and to the detriment of Affirmer's heirs and +successors, fully intending that such Waiver shall not be subject to +revocation, rescission, cancellation, termination, or any other legal or +equitable action to disrupt the quiet enjoyment of the Work by the public +as contemplated by Affirmer's express Statement of Purpose. + +3. Public License Fallback. Should any part of the Waiver for any reason +be judged legally invalid or ineffective under applicable law, then the +Waiver shall be preserved to the maximum extent permitted taking into +account Affirmer's express Statement of Purpose. In addition, to the +extent the Waiver is so judged Affirmer hereby grants to each affected +person a royalty-free, non transferable, non sublicensable, non exclusive, +irrevocable and unconditional license to exercise Affirmer's Copyright and +Related Rights in the Work (i) in all territories worldwide, (ii) for the +maximum duration provided by applicable law or treaty (including future +time extensions), (iii) in any current or future medium and for any number +of copies, and (iv) for any purpose whatsoever, including without +limitation commercial, advertising or promotional purposes (the +"License"). The License shall be deemed effective as of the date CC0 was +applied by Affirmer to the Work. Should any part of the License for any +reason be judged legally invalid or ineffective under applicable law, such +partial invalidity or ineffectiveness shall not invalidate the remainder +of the License, and in such case Affirmer hereby affirms that he or she +will not (i) exercise any of his or her remaining Copyright and Related +Rights in the Work or (ii) assert any associated claims and causes of +action with respect to the Work, in either case contrary to Affirmer's +express Statement of Purpose. + +4. Limitations and Disclaimers. + + a. No trademark or patent rights held by Affirmer are waived, abandoned, + surrendered, licensed or otherwise affected by this document. + b. Affirmer offers the Work as-is and makes no representations or + warranties of any kind concerning the Work, express, implied, + statutory or otherwise, including without limitation warranties of + title, merchantability, fitness for a particular purpose, non + infringement, or the absence of latent or other defects, accuracy, or + the present or absence of errors, whether or not discoverable, all to + the greatest extent permissible under applicable law. + c. Affirmer disclaims responsibility for clearing rights of other persons + that may apply to the Work or any use thereof, including without + limitation any person's Copyright and Related Rights in the Work. + Further, Affirmer disclaims responsibility for obtaining any necessary + consents, permissions or other rights required for any use of the + Work. + d. Affirmer understands and acknowledges that Creative Commons is not a + party to this document and has no duty or obligation with respect to + this CC0 or use of the Work. 
diff --git a/README.md b/README.md new file mode 100644 index 0000000..ba9d17d --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +**Work in progress** \ No newline at end of file diff --git a/index.js b/index.js new file mode 100644 index 0000000..fb8b856 --- /dev/null +++ b/index.js @@ -0,0 +1 @@ +module.exports = [require('./lib/feed-sync'), require('./lib/thread-sync')] \ No newline at end of file diff --git a/lib/algorithm.js b/lib/algorithm.js new file mode 100644 index 0000000..f54e352 --- /dev/null +++ b/lib/algorithm.js @@ -0,0 +1,81 @@ +const { BloomFilter } = require('bloom-filters') +const FeedV1 = require('ppppp-db/lib/feed-v1') + +module.exports = function syncAlgorithm(opts = {}) { + const { + haveRange, + wantRange, + estimateMsgCount, + yieldMsgsIn, + commit, + } = opts + if (typeof haveRange !== 'function') { + throw new Error('function haveRange is required') + } + if (typeof wantRange !== 'function') { + throw new Error('function wantRange is required') + } + if (typeof estimateMsgCount !== 'function') { + throw new Error('function estimateMsgCount is required') + } + if (typeof yieldMsgsIn !== 'function') { + throw new Error('function yieldMsgsIn is required') + } + if (typeof commit !== 'function') { + throw new Error('function commit is required') + } + + function isEmptyRange(range) { + const [min, max] = range + return min > max + } + + function countIter(iter) { + let count = 0 + for (const _ of iter) count++ + return count + } + + function betterWantRange(feedId, localHaveRange, remoteHaveRange) { + if (isEmptyRange(remoteHaveRange)) return [1, 0] + else return wantRange(feedId, localHaveRange, remoteHaveRange) + } + + function bloomFor(feedId, round, range, extraIds = []) { + const filterSize = + (isEmptyRange(range) ? 2 : estimateMsgCount(range)) + countIter(extraIds) + const filter = BloomFilter.create(2 * filterSize, 0.00001) + if (!isEmptyRange(range)) { + for (const msg of yieldMsgsIn(feedId, range)) { + filter.add('' + round + FeedV1.getMsgHash(msg)) + } + } + for (const msgId of extraIds) { + filter.add('' + round + msgId) + } + return filter.saveAsJSON() + } + + function msgsMissing(feedId, round, range, remoteBloomJSON) { + if (isEmptyRange(range)) return [] + const remoteFilter = BloomFilter.fromJSON(remoteBloomJSON) + const missing = [] + for (const msg of yieldMsgsIn(feedId, range)) { + const msgHash = FeedV1.getMsgHash(msg) + if (!remoteFilter.has('' + round + msgHash)) { + missing.push(msgHash) + } + } + return missing + } + + return { + haveRange, + wantRange: betterWantRange, + isEmptyRange, + bloomFor, + msgsMissing, + yieldMsgsIn, + commit, + } +} diff --git a/lib/feed-sync.js b/lib/feed-sync.js new file mode 100644 index 0000000..7eb999f --- /dev/null +++ b/lib/feed-sync.js @@ -0,0 +1,111 @@ +const p = require('util').promisify +const FeedV1 = require('ppppp-db/lib/feed-v1') +const syncPlugin = require('./plugin') + +module.exports = syncPlugin('feedSync', (peer, config) => { + const limit = config.feedSync?.limit ?? 
1000
+
+  function* take(n, iter) {
+    if (n === 0) return
+    let i = 0
+    for (const item of iter) {
+      yield item
+      if (++i >= n) break
+    }
+  }
+
+  function* filter(iter, fn) {
+    for (const item of iter) {
+      if (fn(item)) yield item
+    }
+  }
+
+  return {
+    haveRange(feedId) {
+      let minDepth = Number.MAX_SAFE_INTEGER
+      let maxDepth = 0
+      for (const msg of peer.db.msgs()) {
+        if (FeedV1.getFeedId(msg) === feedId) {
+          minDepth = Math.min(minDepth, msg.metadata.depth)
+          maxDepth = Math.max(maxDepth, msg.metadata.depth)
+        }
+      }
+      return [minDepth, maxDepth]
+    },
+
+    wantRange(feedId, localHaveRange, remoteHaveRange) {
+      const [minLocalHave, maxLocalHave] = localHaveRange
+      const [minRemoteHave, maxRemoteHave] = remoteHaveRange
+      if (maxRemoteHave <= maxLocalHave) return [1, 0] // empty want-range
+      const maxWant = maxRemoteHave
+      const size = Math.max(maxWant - maxLocalHave, limit)
+      const minWant = Math.max(maxWant - size, maxLocalHave + 1, minRemoteHave)
+      return [minWant, maxWant]
+    },
+
+    estimateMsgCount(range) {
+      const [minDepth, maxDepth] = range
+      const estimate = 2 * (maxDepth - minDepth + 1)
+      if (estimate > 1000) return 1000
+      else if (estimate < 5) return 5
+      else return estimate
+    },
+
+    *yieldMsgsIn(feedId, range) {
+      const [minDepth, maxDepth] = range
+      for (const msg of peer.db.msgs()) {
+        if (
+          FeedV1.getFeedId(msg) === feedId &&
+          msg.metadata.depth >= minDepth &&
+          msg.metadata.depth <= maxDepth
+        ) {
+          yield msg
+        }
+      }
+    },
+
+    async commit(newMsgs, feedId, cb) {
+      newMsgs.sort((a, b) => a.metadata.depth - b.metadata.depth) // mutation
+      const isRelevantRec = (rec) => FeedV1.getFeedId(rec.msg) === feedId
+
+      // Find the max depth and count of this feed's msgs in the database
+      let oldLastDepth = 0
+      let oldCount = 0
+      for (const rec of peer.db.records()) {
+        if (!isRelevantRec(rec)) continue
+        oldCount += 1
+        oldLastDepth = Math.max(oldLastDepth, rec.msg.metadata.depth)
+      }
+
+      const isContinuation = newMsgs[0].metadata.depth === oldLastDepth + 1
+      // Refuse to create holes in the feed
+      if (!isContinuation && newMsgs.length < limit) {
+        console.error(
+          `feedSync failed to persist msgs for ${feedId} because ` +
+            'they are not a continuation and there are too few of them'
+        )
+        return cb()
+      }
+
+      // Delete old messages in the database
+      if (isContinuation) {
+        // Delete just enough msgs to make room for the new ones
+        const N = Math.max(0, oldCount + newMsgs.length - limit)
+        for (const rec of take(N, filter(peer.db.records(), isRelevantRec))) {
+          await p(peer.db.del)(rec.hash)
+        }
+      } else {
+        // Delete all the old ones
+        for (const rec of filter(peer.db.records(), isRelevantRec)) {
+          await p(peer.db.del)(rec.hash)
+        }
+      }
+
+      // Add new messages
+      for (const msg of newMsgs) {
+        await p(peer.db.add)(msg)
+      }
+      cb()
+    },
+  }
+})
diff --git a/lib/plugin.js b/lib/plugin.js
new file mode 100644
index 0000000..8efbfa7
--- /dev/null
+++ b/lib/plugin.js
@@ -0,0 +1,85 @@
+const toPull = require('push-stream-to-pull-stream')
+const pull = require('pull-stream')
+const makeDebug = require('debug')
+const getSeverity = require('ssb-network-errors')
+const syncAlgorithm = require('./algorithm')
+const SyncStream = require('./stream')
+
+function isMuxrpcMissingError(err, namespace, methodName) {
+  const jsErrorMessage = `method:${namespace},${methodName} is not in list of allowed methods`
+  const goErrorMessage = `muxrpc: no such command: ${namespace}.${methodName}`
+  return err.message === jsErrorMessage || err.message === goErrorMessage
+}
+
+module.exports = function makeSyncPlugin(name, getOpts) {
+  return {
+    name: name,
+    manifest: {
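+      // muxrpc method types: 'duplex' is a bidirectional stream,
+      // 'sync' is a plain call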
+ connect: 'duplex', + request: 'sync', + }, + permissions: { + anonymous: { + allow: ['connect'], + }, + }, + init(peer, config) { + const debug = makeDebug(`ppppp:${name}`) + const opts = getOpts(peer, config) + const algo = syncAlgorithm(opts) + + algo.getMsgs = function getMsgs(msgIds) { + const msgs = [] + for (const msgId of msgIds) { + const msg = peer.db.get(msgId) + if (msg) msgs.push(msg) + } + return msgs + } + + const streams = [] + function createStream(remoteId, isClient) { + // prettier-ignore + debug('Opening a stream with remote %s %s', isClient ? 'server' : 'client', remoteId) + const stream = new SyncStream(peer.id, debug, algo) + streams.push(stream) + return stream + } + + peer.on('rpc:connect', function onSyncRPCConnect(rpc, isClient) { + if (rpc.id === peer.id) return // local client connecting to local server + if (!isClient) return + const local = toPull.duplex(createStream(rpc.id, true)) + + const remote = rpc[name].connect((networkError) => { + if (networkError && getSeverity(networkError) >= 3) { + if (isMuxrpcMissingError(networkError, name, 'connect')) { + console.warn(`peer ${rpc.id} does not support sync connect`) + // } else if (isReconnectedError(networkError)) { // TODO: bring back + // Do nothing, this is a harmless error + } else { + console.error(`rpc.${name}.connect exception:`, networkError) + } + } + }) + + pull(local, remote, local) + }) + + function connect() { + // `this` refers to the remote peer who called this muxrpc API + return toPull.duplex(createStream(this.id, false)) + } + + function request(id) { + for (const stream of streams) { + stream.request(id) + } + } + return { + connect, + request, + } + }, + } +} diff --git a/lib/stream.js b/lib/stream.js new file mode 100644 index 0000000..69a6ebc --- /dev/null +++ b/lib/stream.js @@ -0,0 +1,328 @@ +const Pipeable = require('push-stream/pipeable') + +class SyncStream extends Pipeable { + #myId + #debug + #algo + #requested + #remoteHave + #remoteWant + #receivableMsgs + #sendableMsgs + + constructor(localId, debug, algo) { + super() + this.paused = false // TODO: should we start as paused=true? + this.ended = false + this.source = this.sink = null + this.#myId = localId.slice(0, 6) + this.#debug = debug + this.#algo = algo + this.#requested = new Set() + this.#remoteHave = new Map() // id => have-range by remote peer + this.#remoteWant = new Map() // id => want-range by remote peer + this.#receivableMsgs = new Map() // id => Set + this.#sendableMsgs = new Map() // id => Set + } + + // public API + request(id) { + this.#requested.add(id) + this.resume() + } + + #canSend() { + return this.sink && !this.sink.paused && !this.ended + } + + #updateSendableMsgs(id, msgs) { + const set = this.#sendableMsgs.get(id) ?? new Set() + for (const msg of msgs) { + set.add(msg) + } + this.#sendableMsgs.set(id, set) + } + + #updateReceivableMsgs(id, msgs) { + const set = this.#receivableMsgs.get(id) ?? 
new Set() + for (const msg of msgs) { + set.add(msg) + } + this.#receivableMsgs.set(id, set) + } + + #sendLocalHave(id) { + const localHaveRange = this.#algo.haveRange(id) + // prettier-ignore + this.#debug('%s Stream OUT: send local have-range %o for %s', this.#myId, localHaveRange, id) + this.sink.write({ id, phase: 1, payload: localHaveRange }) + } + + #sendLocalHaveAndWant(id, remoteHaveRange) { + // prettier-ignore + this.#debug('%s Stream IN: received remote have-range %o for %s', this.#myId, remoteHaveRange, id) + this.#remoteHave.set(id, remoteHaveRange) + const haveRange = this.#algo.haveRange(id) + const wantRange = this.#algo.wantRange(id, haveRange, remoteHaveRange) + // prettier-ignore + this.#debug('%s Stream OUT: send local have-range %o and want-range %o for %s', this.#myId, haveRange, wantRange, id) + this.sink.write({ id, phase: 2, payload: { haveRange, wantRange } }) + } + + #sendLocalWantAndInitBloom(id, remoteHaveRange, remoteWantRange) { + // prettier-ignore + this.#debug('%s Stream IN: received remote have-range %o and want-range %o for %s', this.#myId, remoteHaveRange, remoteWantRange, id) + this.#remoteHave.set(id, remoteHaveRange) + this.#remoteWant.set(id, remoteWantRange) + const haveRange = this.#algo.haveRange(id) + const wantRange = this.#algo.wantRange(id, haveRange, remoteHaveRange) + const localBloom0 = this.#algo.bloomFor(id, 0, remoteWantRange) + this.sink.write({ + id, + phase: 3, + payload: { bloom: localBloom0, wantRange }, + }) + // prettier-ignore + this.#debug('%s Stream OUT: send local want-range %o and bloom round 0 for %s', this.#myId, wantRange, id) + } + + #sendInitBloomRes(id, remoteWantRange, remoteBloom) { + // prettier-ignore + this.#debug('%s Stream IN: received remote want-range %o and bloom round 0 for %s', this.#myId, remoteWantRange, id) + this.#remoteWant.set(id, remoteWantRange) + const msgIDsForThem = this.#algo.msgsMissing( + id, + 0, + remoteWantRange, + remoteBloom + ) + this.#updateSendableMsgs(id, msgIDsForThem) + const localBloom = this.#algo.bloomFor(id, 0, remoteWantRange) + this.sink.write({ + id, + phase: 4, + payload: { bloom: localBloom, msgIDs: msgIDsForThem }, + }) + // prettier-ignore + this.#debug('%s Stream OUT: send bloom round 0 plus msgIDs in %s: %o', this.#myId, id, msgIDsForThem) + } + + #sendBloomReq(id, phase, round, remoteBloom, msgIDsForMe) { + // prettier-ignore + this.#debug('%s Stream IN: received bloom round %s plus msgIDs in %s: %o', this.#myId, round-1, id, msgIDsForMe) + const remoteWantRange = this.#remoteWant.get(id) + this.#updateReceivableMsgs(id, msgIDsForMe) + const msgIDsForThem = this.#algo.msgsMissing( + id, + round - 1, + remoteWantRange, + remoteBloom + ) + this.#updateSendableMsgs(id, msgIDsForThem) + const localBloom = this.#algo.bloomFor( + id, + round, + remoteWantRange, + this.#receivableMsgs.get(id) + ) + this.sink.write({ + id, + phase, + payload: { bloom: localBloom, msgIDs: msgIDsForThem }, + }) + // prettier-ignore + this.#debug('%s Stream OUT: send bloom round %s plus msgIDs in %s: %o', this.#myId, round, id, msgIDsForThem) + } + + #sendBloomRes(id, phase, round, remoteBloom, msgIDsForMe) { + // prettier-ignore + this.#debug('%s Stream IN: received bloom round %s plus msgIDs in %s: %o', this.#myId, round, id, msgIDsForMe) + const remoteWantRange = this.#remoteWant.get(id) + this.#updateReceivableMsgs(id, msgIDsForMe) + const msgIDsForThem = this.#algo.msgsMissing( + id, + round, + remoteWantRange, + remoteBloom + ) + this.#updateSendableMsgs(id, msgIDsForThem) + const 
localBloom = this.#algo.bloomFor(
+      id,
+      round,
+      remoteWantRange,
+      this.#receivableMsgs.get(id)
+    )
+    this.sink.write({
+      id,
+      phase,
+      payload: { bloom: localBloom, msgIDs: msgIDsForThem },
+    })
+    // prettier-ignore
+    this.#debug('%s Stream OUT: send bloom round %s plus msgIDs in %s: %o', this.#myId, round, id, msgIDsForThem)
+  }
+
+  #sendLastBloomRes(id, phase, round, remoteBloom, msgIDsForMe) {
+    // prettier-ignore
+    this.#debug('%s Stream IN: received bloom round %s plus msgIDs in %s: %o', this.#myId, round, id, msgIDsForMe)
+    const remoteWantRange = this.#remoteWant.get(id)
+    this.#updateReceivableMsgs(id, msgIDsForMe)
+    const msgIDsForThem = this.#algo.msgsMissing(
+      id,
+      round,
+      remoteWantRange,
+      remoteBloom
+    )
+    this.#updateSendableMsgs(id, msgIDsForThem)
+    this.sink.write({ id, phase, payload: msgIDsForThem })
+    // prettier-ignore
+    this.#debug('%s Stream OUT: send msgIDs in %s: %o', this.#myId, id, msgIDsForThem)
+  }
+
+  #sendMissingMsgsReq(id, msgIDsForMe) {
+    // prettier-ignore
+    this.#debug('%s Stream IN: received msgIDs in %s: %o', this.#myId, id, msgIDsForMe)
+    this.#updateReceivableMsgs(id, msgIDsForMe)
+    const msgIDs = this.#sendableMsgs.has(id)
+      ? [...this.#sendableMsgs.get(id)]
+      : []
+    const msgs = this.#algo.getMsgs(msgIDs)
+    // prettier-ignore
+    this.#debug('%s Stream OUT: send %s msgs in %s', this.#myId, msgs.length, id)
+    this.sink.write({ id, phase: 9, payload: msgs })
+  }
+
+  #sendMissingMsgsRes(id, msgsForMe) {
+    // prettier-ignore
+    this.#debug('%s Stream IN: received %s msgs in %s', this.#myId, msgsForMe.length, id)
+    const msgIDs = this.#sendableMsgs.has(id)
+      ? [...this.#sendableMsgs.get(id)]
+      : []
+
+    const msgs = this.#algo.getMsgs(msgIDs)
+    // prettier-ignore
+    this.#debug('%s Stream OUT: send %s msgs in %s', this.#myId, msgs.length, id)
+    this.sink.write({ id, phase: 10, payload: msgs })
+
+    this.#requested.delete(id)
+    this.#remoteHave.delete(id)
+    this.#remoteWant.delete(id)
+    this.#receivableMsgs.delete(id)
+    this.#sendableMsgs.delete(id)
+    if (msgsForMe.length === 0) return
+    this.#algo.commit(msgsForMe, id, (err) => {
+      // prettier-ignore
+      if (err) throw new Error('sendMissingMsgsRes failed because commit failed', {cause: err})
+    })
+  }
+
+  #consumeMissingMsgs(id, msgsForMe) {
+    // prettier-ignore
+    this.#debug('%s Stream IN: received %s msgs in %s', this.#myId, msgsForMe.length, id)
+
+    this.#requested.delete(id)
+    this.#remoteHave.delete(id)
+    this.#remoteWant.delete(id)
+    this.#receivableMsgs.delete(id)
+    this.#sendableMsgs.delete(id)
+    if (msgsForMe.length === 0) return
+    this.#algo.commit(msgsForMe, id, (err) => {
+      // prettier-ignore
+      if (err) throw new Error('consumeMissingMsgs failed because commit failed', {cause: err})
+    })
+  }
+
+  #sendMsgsInRemoteWant(id, remoteWantRange) {
+    const msgs = []
+    for (const msg of this.#algo.yieldMsgsIn(id, remoteWantRange)) {
+      msgs.push(msg)
+    }
+    // prettier-ignore
+    this.#debug('%s Stream OUT: send %s msgs in %s', this.#myId, msgs.length, id)
+    this.sink.write({ id, phase: 10, payload: msgs })
+  }
+
+  // as a source
+  resume() {
+    if (!this.sink || this.sink.paused) return
+
+    for (const id of this.#requested) {
+      if (!this.#canSend()) return
+      this.#sendLocalHave(id)
+    }
+  }
+
+  // as a sink
+  // Incoming protocol phases, per requested id:
+  //   1: remote have-range
+  //   2: remote have-range + want-range
+  //   3: remote want-range + bloom round 0
+  //   4..7: bloom rounds + candidate missing msg IDs
+  //   8: final missing msg IDs
+  //   9..10: the msgs themselves
+  write(data) {
+    const { id, phase, payload } = data
+
+    switch (phase) {
+      case 1: {
+        return this.#sendLocalHaveAndWant(id, payload)
+      }
+      case 2: {
+        const { haveRange, wantRange } = payload
+        if (this.#algo.isEmptyRange(haveRange)) {
+          // prettier-ignore
+          this.#debug('%s Stream IN: received remote have-range %o and want-range %o for %s', this.#myId, haveRange, wantRange, id)
+          return this.#sendMsgsInRemoteWant(id, wantRange)
+        } else {
+          return this.#sendLocalWantAndInitBloom(id, haveRange, wantRange)
+        }
+      }
+      case 3: {
+        const { wantRange, bloom } = payload
+        const haveRange = this.#remoteHave.get(id)
+        if (haveRange && this.#algo.isEmptyRange(haveRange)) {
+          // prettier-ignore
+          this.#debug('%s Stream IN: received remote want-range %o and remembered empty have-range %o for %s', this.#myId, wantRange, haveRange, id)
+          return this.#sendMsgsInRemoteWant(id, wantRange)
+        } else {
+          return this.#sendInitBloomRes(id, wantRange, bloom)
+        }
+      }
+      case 4: {
+        const { bloom, msgIDs } = payload
+        return this.#sendBloomReq(id, phase + 1, 1, bloom, msgIDs)
+      }
+      case 5: {
+        const { bloom, msgIDs } = payload
+        return this.#sendBloomRes(id, phase + 1, 1, bloom, msgIDs)
+      }
+      case 6: {
+        const { bloom, msgIDs } = payload
+        return this.#sendBloomReq(id, phase + 1, 2, bloom, msgIDs)
+      }
+      case 7: {
+        const { bloom, msgIDs } = payload
+        return this.#sendLastBloomRes(id, phase + 1, 2, bloom, msgIDs)
+      }
+      case 8: {
+        return this.#sendMissingMsgsReq(id, payload)
+      }
+      case 9: {
+        return this.#sendMissingMsgsRes(id, payload)
+      }
+      case 10: {
+        return this.#consumeMissingMsgs(id, payload)
+      }
+    }
+
+    this.#debug('Stream IN: unknown %o', data)
+  }
+
+  // as a source
+  abort(err) {
+    this.ended = true
+    if (this.source && !this.source.ended) this.source.abort(err)
+    if (this.sink && !this.sink.ended) this.sink.end(err)
+  }
+
+  // as a sink
+  end(err) {
+    this.ended = true
+    if (this.source && !this.source.ended) this.source.abort(err)
+    if (this.sink && !this.sink.ended) this.sink.end(err)
+  }
+}
+
+module.exports = SyncStream
diff --git a/lib/thread-sync.js b/lib/thread-sync.js
new file mode 100644
index 0000000..58e2c00
--- /dev/null
+++ b/lib/thread-sync.js
@@ -0,0 +1,61 @@
+const p = require('util').promisify
+const dagSyncPlugin = require('./plugin')
+
+module.exports = dagSyncPlugin('threadSync', (peer, config) => ({
+  haveRange(rootMsgHash) {
+    const rootMsg = peer.db.get(rootMsgHash)
+    if (!rootMsg) return [1, 0]
+    let maxDepth = 0
+    for (const rec of peer.db.records()) {
+      const tangles = rec.msg.metadata.tangles
+      if (rec.hash !== rootMsgHash && tangles?.[rootMsgHash]) {
+        const depth = tangles[rootMsgHash].depth
+        maxDepth = Math.max(maxDepth, depth)
+      }
+    }
+    return [0, maxDepth]
+  },
+
+  wantRange(rootMsgId, localHaveRange, remoteHaveRange) {
+    const [minLocalHave, maxLocalHave] = localHaveRange
+    const [minRemoteHave, maxRemoteHave] = remoteHaveRange
+    if (minRemoteHave !== 0) throw new Error('minRemoteHave must be 0')
+    return [0, Math.max(maxLocalHave, maxRemoteHave)]
+  },
+
+  estimateMsgCount(range) {
+    const [minDepth, maxDepth] = range
+    const estimate = 2 * (maxDepth - minDepth + 1)
+    if (estimate > 1000) return 1000
+    else if (estimate < 5) return 5
+    else return estimate
+  },
+
+  *yieldMsgsIn(rootMsgId, range) {
+    const [minDepth, maxDepth] = range
+    const rootMsg = peer.db.get(rootMsgId)
+    if (!rootMsg) return
+    for (const msg of peer.db.msgs()) {
+      const tangles = msg.metadata.tangles
+      if (
+        tangles?.[rootMsgId] &&
+        tangles[rootMsgId].depth >= minDepth &&
+        tangles[rootMsgId].depth <= maxDepth
+      ) {
+        yield msg
+      }
+    }
+  },
+
+  async commit(newMsgs, rootMsgId, cb) {
+    newMsgs.sort((a, b) => {
+      const aDepth = a.metadata.tangles[rootMsgId].depth
+      const bDepth = b.metadata.tangles[rootMsgId].depth
+      return aDepth - bDepth
+    })
+    // Add msgs in ascending tangle depth, so that parents precede children
+    for (const msg of newMsgs) {
+      await
p(peer.db.add)(msg) + } + cb() + }, +})) diff --git a/package.json b/package.json new file mode 100644 index 0000000..938372d --- /dev/null +++ b/package.json @@ -0,0 +1,54 @@ +{ + "name": "dagsync", + "version": "1.0.0", + "description": "SSB replication using Kleppmann's hash graph sync", + "author": "Andre Staltz ", + "license": "CC0-1.0", + "homepage": "https://github.com/staltz/dagsync", + "repository": { + "type": "git", + "url": "git@github.com:staltz/dagsync.git" + }, + "main": "index.js", + "files": [ + "*.js", + "lib/*.js", + "compat/*.js" + ], + "engines": { + "node": ">=16" + }, + "dependencies": { + "bloom-filters": "^3.0.0", + "debug": "^4.3.4", + "multicb": "^1.2.2", + "pull-stream": "^3.7.0", + "push-stream": "^11.2.0", + "push-stream-to-pull-stream": "^1.0.5", + "ssb-network-errors": "^1.0.1" + }, + "devDependencies": { + "bs58": "^5.0.0", + "ppppp-db": "../db", + "rimraf": "^4.4.0", + "secret-stack": "^6.4.1", + "ssb-box": "^1.0.1", + "ssb-caps": "^1.1.0", + "ssb-classic": "^1.1.0", + "ssb-keys": "^8.5.0", + "ssb-uri2": "^2.4.1", + "tap-arc": "^0.3.5", + "tape": "^5.6.3" + }, + "scripts": { + "test": "tape test/*.js | tap-arc --bail", + "format-code": "prettier --write \"*.js\" \"(test|compat|indexes|operators)/*.js\"", + "format-code-staged": "pretty-quick --staged --pattern \"*.js\" --pattern \"(test|compat|indexes|operators)/*.js\"", + "coverage": "c8 --reporter=lcov npm run test" + }, + "husky": { + "hooks": { + "pre-commit": "npm run format-code-staged" + } + } +} diff --git a/test/feed-sync.test.js b/test/feed-sync.test.js new file mode 100644 index 0000000..9d765ff --- /dev/null +++ b/test/feed-sync.test.js @@ -0,0 +1,153 @@ +const test = require('tape') +const path = require('path') +const os = require('os') +const rimraf = require('rimraf') +const SecretStack = require('secret-stack') +const caps = require('ssb-caps') +const p = require('util').promisify +const { generateKeypair } = require('./util') + +const createPeer = SecretStack({ appKey: caps.shs }) + .use(require('ppppp-db')) + .use(require('ssb-box')) + .use(require('../')) + +test('sync a sliced classic feed', async (t) => { + const ALICE_DIR = path.join(os.tmpdir(), 'dagsync-alice') + const BOB_DIR = path.join(os.tmpdir(), 'dagsync-bob') + + rimraf.sync(ALICE_DIR) + rimraf.sync(BOB_DIR) + + const alice = createPeer({ + keys: generateKeypair('alice'), + path: ALICE_DIR, + }) + + const bob = createPeer({ + keys: generateKeypair('bob'), + path: BOB_DIR, + }) + + await alice.db.loaded() + await bob.db.loaded() + + const carolKeys = generateKeypair('carol') + const carolMsgs = [] + const carolID = carolKeys.id + const carolID_b58 = carolID.split('ppppp:feed/v1/')[1] + const carolPostFeedId = carolID + '/post' + for (let i = 1; i <= 10; i++) { + const rec = await p(alice.db.create)({ + type: 'post', + content: { text: 'm' + i }, + keys: carolKeys, + }) + carolMsgs.push(rec.msg) + } + t.pass('alice has msgs 1..10 from carol') + + for (let i = 0; i < 7; i++) { + await p(bob.db.add)(carolMsgs[i]) + } + + { + const arr = [...bob.db.msgs()] + .filter((msg) => msg.metadata.who === carolID_b58) + .map((msg) => msg.content.text) + t.deepEquals( + arr, + ['m1', 'm2', 'm3', 'm4', 'm5', 'm6', 'm7'], + 'bob has msgs 1..7 from carol' + ) + } + + const remoteAlice = await p(bob.connect)(alice.getAddress()) + t.pass('bob connected to alice') + + bob.feedSync.request(carolPostFeedId) + await p(setTimeout)(1000) + t.pass('feedSync!') + + { + const arr = [...bob.db.msgs()] + .filter((msg) => msg.metadata.who === carolID_b58) + 
.map((msg) => msg.content.text) + t.deepEquals( + arr, + ['m1', 'm2', 'm3', 'm4', 'm5', 'm6', 'm7', 'm8', 'm9', 'm10'], + 'bob has msgs 1..10 from carol' + ) + } + + await p(remoteAlice.close)(true) + await p(alice.close)(true) + await p(bob.close)(true) +}) + +// FIXME: +test.skip('delete old msgs and sync latest msgs', async (t) => { + const ALICE_DIR = path.join(os.tmpdir(), 'dagsync-alice') + const BOB_DIR = path.join(os.tmpdir(), 'dagsync-bob') + + rimraf.sync(ALICE_DIR) + rimraf.sync(BOB_DIR) + + const alice = createPeer({ + keys: generateKeypair('alice'), + path: ALICE_DIR, + }) + + const bob = createPeer({ + keys: generateKeypair('bob'), + path: BOB_DIR, + feedSync: { + limit: 3, + }, + }) + + await alice.db.loaded() + await bob.db.loaded() + + const carolKeys = generateKeypair('carol') + const carolMsgs = [] + const carolID = carolKeys.id + for (let i = 1; i <= 10; i++) { + const msg = await p(alice.db.create)({ + feedFormat: 'classic', + content: { type: 'post', text: 'm' + i }, + keys: carolKeys, + }) + carolMsgs.push(msg) + } + t.pass('alice has msgs 1..10 from carol') + + await p(bob.db.add)(carolMsgs[5].value) + await p(bob.db.add)(carolMsgs[6].value) + await p(bob.db.add)(carolMsgs[7].value) + + { + const arr = bob.db + .filterAsArray((msg) => msg?.value.author === carolID) + .map((msg) => msg.value.content.text) + t.deepEquals(arr, ['m6', 'm7', 'm8'], 'bob has msgs 6..8 from carol') + } + + const remoteAlice = await p(bob.connect)(alice.getAddress()) + t.pass('bob connected to alice') + + bob.feedSync.request(carolID) + await p(setTimeout)(1000) + t.pass('feedSync!') + + { + const arr = bob.db + .filterAsArray((msg) => msg?.value.author === carolID) + .map((msg) => msg.value.content.text) + t.deepEquals(arr, ['m8', 'm9', 'm10'], 'bob has msgs 8..10 from carol') + } + + await p(remoteAlice.close)(true) + await p(alice.close)(true) + await p(bob.close)(true) +}) diff --git a/test/thread-sync.test.js b/test/thread-sync.test.js new file mode 100644 index 0000000..00df205 --- /dev/null +++ b/test/thread-sync.test.js @@ -0,0 +1,303 @@ +const test = require('tape') +const ssbKeys = require('ssb-keys') +const path = require('path') +const os = require('os') +const rimraf = require('rimraf') +const SecretStack = require('secret-stack') +const caps = require('ssb-caps') +const FeedV1 = require('ppppp-db/lib/feed-v1') +const p = require('util').promisify +const { generateKeypair } = require('./util') + +const createSSB = SecretStack({ appKey: caps.shs }) + .use(require('ppppp-db')) + .use(require('ssb-box')) + .use(require('../')) + +/* +BEFORE dagsync: +```mermaid +graph TB; + subgraph Bob + direction TB + rootAb[root by A] + replyB1b[reply by B] + replyB2b[reply by B] + replyD1b[reply by D] + rootAb-->replyB1b-->replyB2b & replyD1b + end + subgraph Alice + direction TB + rootAa[root by A] + replyB1a[reply by B] + replyB2a[reply by B] + replyC1a[reply by C] + rootAa-->replyB1a-->replyB2a + rootAa-->replyC1a + end +``` + +AFTER dagsync: +```mermaid +graph TB; + subgraph Bob + rootA[root by A] + replyB1[reply by B] + replyB2[reply by B] + replyC1[reply by C] + replyD1[reply by D] + rootA-->replyB1-->replyB2 & replyD1 + rootA-->replyC1 + end +``` +*/ +test('sync a thread where both peers have portions', async (t) => { + const ALICE_DIR = path.join(os.tmpdir(), 'dagsync-alice') + const BOB_DIR = path.join(os.tmpdir(), 'dagsync-bob') + + rimraf.sync(ALICE_DIR) + rimraf.sync(BOB_DIR) + + const alice = createSSB({ + keys: generateKeypair('alice'), + path: ALICE_DIR, + }) + + const bob = 
createSSB({ + keys: generateKeypair('bob'), + path: BOB_DIR, + }) + + const carolKeys = generateKeypair('carol') + const carolID = carolKeys.id + + const daveKeys = generateKeypair('dave') + const daveID = daveKeys.id + + await alice.db.loaded() + await bob.db.loaded() + + const rootA = await p(alice.db.create)({ + type: 'post', + content: { text: 'A' }, + keys: alice.config.keys, + }) + await p(bob.db.add)(rootA.msg) + + await p(setTimeout)(10) + + const replyB1 = await p(bob.db.create)({ + type: 'post', + content: { text: 'B1' }, + tangles: [rootA.hash], + keys: bob.config.keys, + }) + + await p(setTimeout)(10) + + const replyB2 = await p(bob.db.create)({ + type: 'post', + content: { text: 'B2' }, + tangles: [rootA.hash], + keys: bob.config.keys, + }) + await p(alice.db.add)(replyB1.msg) + await p(alice.db.add)(replyB2.msg) + + await p(setTimeout)(10) + + const replyC1 = await p(alice.db.create)({ + type: 'post', + content: { text: 'C1' }, + tangles: [rootA.hash], + keys: carolKeys, + }) + + await p(setTimeout)(10) + + const replyD1 = await p(bob.db.create)({ + type: 'post', + content: { text: 'D1' }, + tangles: [rootA.hash], + keys: daveKeys, + }) + + t.deepEquals( + [...alice.db.msgs()].map((msg) => msg.content.text), + ['A', 'B1', 'B2', 'C1'], + 'alice has a portion of the thread' + ) + + t.deepEquals( + [...bob.db.msgs()].map((msg) => msg.content.text), + ['A', 'B1', 'B2', 'D1'], + 'bob has another portion of the thread' + ) + + const remoteAlice = await p(bob.connect)(alice.getAddress()) + t.pass('bob connected to alice') + + bob.threadSync.request(rootA.hash) + await p(setTimeout)(1000) + t.pass('threadSync!') + + t.deepEquals( + [...bob.db.msgs()].map((msg) => msg.content.text), + ['A', 'B1', 'B2', 'D1', 'C1'], + 'bob has the full thread' + ) + + t.deepEquals( + [...alice.db.msgs()].map((msg) => msg.content.text), + ['A', 'B1', 'B2', 'C1', 'D1'], + 'alice has the full thread' + ) + + await p(remoteAlice.close)(true) + await p(alice.close)(true) + await p(bob.close)(true) +}) + +test('sync a thread where first peer does not have the root', async (t) => { + const ALICE_DIR = path.join(os.tmpdir(), 'dagsync-alice') + const BOB_DIR = path.join(os.tmpdir(), 'dagsync-bob') + + rimraf.sync(ALICE_DIR) + rimraf.sync(BOB_DIR) + + const alice = createSSB({ + keys: ssbKeys.generate('ed25519', 'alice'), + path: ALICE_DIR, + }) + + const bob = createSSB({ + keys: ssbKeys.generate('ed25519', 'bob'), + path: BOB_DIR, + }) + + await alice.db.loaded() + await bob.db.loaded() + + const rootA = await p(alice.db.create)({ + feedFormat: 'classic', + content: { type: 'post', text: 'A' }, + keys: alice.config.keys, + }) + + await p(setTimeout)(10) + + const replyA1 = await p(alice.db.create)({ + feedFormat: 'classic', + content: { type: 'post', text: 'A1', root: rootA.key, branch: rootA.key }, + keys: alice.config.keys, + }) + + await p(setTimeout)(10) + + const replyA2 = await p(alice.db.create)({ + feedFormat: 'classic', + content: { type: 'post', text: 'A2', root: rootA.key, branch: replyA1.key }, + keys: alice.config.keys, + }) + + t.deepEquals( + alice.db.filterAsArray((msg) => true).map((msg) => msg.value.content.text), + ['A', 'A1', 'A2'], + 'alice has the full thread' + ) + + t.deepEquals( + bob.db.filterAsArray((msg) => true).map((msg) => msg.value.content.text), + [], + 'bob has nothing' + ) + + const remoteAlice = await p(bob.connect)(alice.getAddress()) + t.pass('bob connected to alice') + + bob.threadSync.request(rootA.key) + await p(setTimeout)(1000) + t.pass('threadSync!') + + 
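+  // Bob's have-range for this thread was empty, so alice skips the bloom
+  // rounds and sends everything in bob's want-range in one go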
t.deepEquals( + bob.db.filterAsArray((msg) => true).map((msg) => msg.value.content.text), + ['A', 'A1', 'A2'], + 'bob has the full thread' + ) + + await p(remoteAlice.close)(true) + await p(alice.close)(true) + await p(bob.close)(true) +}) + +test('sync a thread where second peer does not have the root', async (t) => { + const ALICE_DIR = path.join(os.tmpdir(), 'dagsync-alice') + const BOB_DIR = path.join(os.tmpdir(), 'dagsync-bob') + + rimraf.sync(ALICE_DIR) + rimraf.sync(BOB_DIR) + + const alice = createSSB({ + keys: ssbKeys.generate('ed25519', 'alice'), + path: ALICE_DIR, + }) + + const bob = createSSB({ + keys: ssbKeys.generate('ed25519', 'bob'), + path: BOB_DIR, + }) + + await alice.db.loaded() + await bob.db.loaded() + + const rootA = await p(alice.db.create)({ + feedFormat: 'classic', + content: { type: 'post', text: 'A' }, + keys: alice.config.keys, + }) + + await p(setTimeout)(10) + + const replyA1 = await p(alice.db.create)({ + feedFormat: 'classic', + content: { type: 'post', text: 'A1', root: rootA.key, branch: rootA.key }, + keys: alice.config.keys, + }) + + await p(setTimeout)(10) + + const replyA2 = await p(alice.db.create)({ + feedFormat: 'classic', + content: { type: 'post', text: 'A2', root: rootA.key, branch: replyA1.key }, + keys: alice.config.keys, + }) + + t.deepEquals( + alice.db.filterAsArray((msg) => true).map((msg) => msg.value.content.text), + ['A', 'A1', 'A2'], + 'alice has the full thread' + ) + + t.deepEquals( + bob.db.filterAsArray((msg) => true).map((msg) => msg.value.content.text), + [], + 'bob has nothing' + ) + + const remoteBob = await p(alice.connect)(bob.getAddress()) + t.pass('alice connected to bob') + + alice.threadSync.request(rootA.key) + await p(setTimeout)(1000) + t.pass('threadSync!') + + t.deepEquals( + bob.db.filterAsArray((msg) => true).map((msg) => msg.value.content.text), + ['A', 'A1', 'A2'], + 'bob has the full thread' + ) + + await p(remoteBob.close)(true) + await p(alice.close)(true) + await p(bob.close)(true) +}) diff --git a/test/util.js b/test/util.js new file mode 100644 index 0000000..51feb19 --- /dev/null +++ b/test/util.js @@ -0,0 +1,14 @@ +const ssbKeys = require('ssb-keys') +const SSBURI = require('ssb-uri2') +const base58 = require('bs58') + +function generateKeypair(seed) { + const keys = ssbKeys.generate('ed25519', seed, 'buttwoo-v1') + const { data } = SSBURI.decompose(keys.id) + keys.id = `ppppp:feed/v1/${base58.encode(Buffer.from(data, 'base64'))}` + return keys +} + +module.exports = { + generateKeypair, +}
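
For quick reference, a minimal usage sketch distilled from the tests above; the address, feed ID, and root msg hash below are illustrative placeholders, not real values:

```js
// Sketch: wire up a peer with ppppp-db plus both sync plugins, then
// request replication of one feed and one thread. Mirrors the stack
// used in test/feed-sync.test.js.
const SecretStack = require('secret-stack')
const caps = require('ssb-caps')
const p = require('util').promisify
const { generateKeypair } = require('./test/util')

const createPeer = SecretStack({ appKey: caps.shs })
  .use(require('ppppp-db'))
  .use(require('ssb-box'))
  .use(require('./index')) // registers both feedSync and threadSync

async function main() {
  const alice = createPeer({
    keys: generateKeypair('alice'),
    path: '/tmp/ppppp-alice', // placeholder directory
  })
  await alice.db.loaded()

  // Multiserver address of the remote peer (placeholder)
  await p(alice.connect)('net:localhost:8008~shs:<base58-key>')

  alice.feedSync.request('ppppp:feed/v1/<base58-pubkey>/post') // a feed ID
  alice.threadSync.request('<root-msg-hash>') // a thread's root msg hash
}

main()
```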