update db, rename to ppppp-sync

This commit is contained in:
Andre Staltz 2023-11-24 15:08:48 +02:00
parent ecd604a46f
commit e7bc0e1918
No known key found for this signature in database
GPG Key ID: 9EDE23EA7E8A4890
9 changed files with 68 additions and 79 deletions

View File

@ -1,6 +1,8 @@
const p = require('promisify-4loc') const p = require('promisify-4loc')
const { BloomFilter } = require('bloom-filters') const { BloomFilter } = require('bloom-filters')
const MsgV3 = require('ppppp-db/msg-v3') const MsgV3 = require('ppppp-db/msg-v3')
const makeDebug = require('debug')
const debug = makeDebug('ppppp:sync')
const { EMPTY_RANGE, isEmptyRange, estimateMsgCount } = require('./range') const { EMPTY_RANGE, isEmptyRange, estimateMsgCount } = require('./range')
/** /**
@ -28,7 +30,7 @@ function countIter(iter) {
*/ */
function assertDictPlugin(peer) { function assertDictPlugin(peer) {
if (!peer.dict) { if (!peer.dict) {
throw new Error('tanglesync plugin requires ppppp-dict plugin') throw new Error('sync plugin requires ppppp-dict plugin')
} }
} }
@ -38,7 +40,7 @@ function assertDictPlugin(peer) {
*/ */
function assertSetPlugin(peer) { function assertSetPlugin(peer) {
if (!peer.set) { if (!peer.set) {
throw new Error('tanglesync plugin requires ppppp-set plugin') throw new Error('sync plugin requires ppppp-set plugin')
} }
} }
@ -287,8 +289,7 @@ class Algorithm {
try { try {
await p(this.#peer.db.add)(msg, rootID) //, doneAdding()) await p(this.#peer.db.add)(msg, rootID) //, doneAdding())
} catch (err) { } catch (err) {
// TODO: debug('Commit failed to add msg in db: %o', err)
// debug('commit failed %o', err)
} }
} }

View File

@ -32,7 +32,7 @@ function isMuxrpcMissingError(err, namespace, methodName) {
* @returns {asserts peer is { db: PPPPPDB }} * @returns {asserts peer is { db: PPPPPDB }}
*/ */
function assertDBExists(peer) { function assertDBExists(peer) {
if (!peer.db) throw new Error('tangleSync requires ppppp-db plugin') if (!peer.db) throw new Error('sync requires ppppp-db plugin')
} }
/** /**
@ -40,7 +40,7 @@ function assertDBExists(peer) {
* @returns {asserts peer is { goals: PPPPPGoals }} * @returns {asserts peer is { goals: PPPPPGoals }}
*/ */
function assertGoalsExists(peer) { function assertGoalsExists(peer) {
if (!peer.goals) throw new Error('tangleSync requires ppppp-goals plugin') if (!peer.goals) throw new Error('sync requires ppppp-goals plugin')
} }
/** /**
@ -48,11 +48,11 @@ function assertGoalsExists(peer) {
* @returns {asserts peer is { shse: SHSE }} * @returns {asserts peer is { shse: SHSE }}
*/ */
function assertSHSEExists(peer) { function assertSHSEExists(peer) {
if (!peer.shse) throw new Error('tangleSync requires secret-handshake-ext') if (!peer.shse) throw new Error('sync requires secret-handshake-ext')
} }
module.exports = { module.exports = {
name: 'tangleSync', name: 'sync',
manifest: { manifest: {
connect: 'duplex', connect: 'duplex',
initiate: 'sync', initiate: 'sync',
@ -77,7 +77,7 @@ module.exports = {
assertDBExists(peer) assertDBExists(peer)
assertGoalsExists(peer) assertGoalsExists(peer)
assertSHSEExists(peer) assertSHSEExists(peer)
const debug = makeDebug(`ppppp:tangleSync`) const debug = makeDebug(`ppppp:sync`)
const algo = new Algorithm(peer) const algo = new Algorithm(peer)
const streams = /** @type {Array<SyncStream>} */ ([]) const streams = /** @type {Array<SyncStream>} */ ([])
@ -97,7 +97,7 @@ module.exports = {
} }
/** /**
* @param {{ shse: SHSE, tangleSync: { connect: GetDuplex } }} rpc * @param {{ shse: SHSE, sync: { connect: GetDuplex } }} rpc
* @param {boolean} iamClient * @param {boolean} iamClient
*/ */
function onSyncRPCConnect(rpc, iamClient) { function onSyncRPCConnect(rpc, iamClient) {
@ -107,14 +107,14 @@ module.exports = {
const local = toPull.duplex(createStream(rpc.shse.pubkey, true)) const local = toPull.duplex(createStream(rpc.shse.pubkey, true))
let abort = /** @type {CallableFunction | null} */ (null) let abort = /** @type {CallableFunction | null} */ (null)
const remote = rpc.tangleSync.connect((networkError) => { const remote = rpc.sync.connect((networkError) => {
if (networkError && getSeverity(networkError) >= 3) { if (networkError && getSeverity(networkError) >= 3) {
if (isMuxrpcMissingError(networkError, 'tangleSync', 'connect')) { if (isMuxrpcMissingError(networkError, 'sync', 'connect')) {
debug('peer %s does not support tangleSync', rpc.shse.pubkey) debug('peer %s does not support sync', rpc.shse.pubkey)
// } else if (isReconnectedError(networkError)) { // TODO: bring back // } else if (isReconnectedError(networkError)) { // TODO: bring back
// Do nothing, this is a harmless error // Do nothing, this is a harmless error
} else { } else {
console.error(`rpc.tangleSync.connect exception:`, networkError) console.error(`rpc.sync.connect exception:`, networkError)
} }
abort?.(true, () => {}) abort?.(true, () => {})
} }
@ -124,16 +124,13 @@ module.exports = {
peer.on('rpc:connect', onSyncRPCConnect) peer.on('rpc:connect', onSyncRPCConnect)
/** /**
* @this {{id: string}} * @this {{shse: {pubkey: string}}}
*/ */
function connect() { function connect() {
// `this` refers to the remote peer who called this muxrpc API return toPull.duplex(createStream(this.shse.pubkey, false))
return toPull.duplex(createStream(this.id, false))
// TODO: fix muxrpc to replace this.id with this.shse.pubkey.
// this.id comes from muxrpc, not secret-stack
} }
function initiate() { function start() {
for (const stream of streams) { for (const stream of streams) {
stream.initiate() stream.initiate()
} }
@ -141,7 +138,7 @@ module.exports = {
return { return {
connect, connect,
initiate, start,
} }
}, },
} }

View File

@ -1,13 +1,13 @@
{ {
"name": "ppppp-tangle-sync", "name": "ppppp-sync",
"version": "1.0.0", "version": "1.0.0",
"description": "PPPPP replication using Kleppmann's hash graph sync", "description": "PPPPP replication using Kleppmann's hash graph sync",
"author": "Andre Staltz <contact@staltz.com>", "author": "Andre Staltz <contact@staltz.com>",
"license": "MIT", "license": "MIT",
"homepage": "https://github.com/staltz/ppppp-tangle-sync", "homepage": "https://github.com/staltz/ppppp-sync",
"repository": { "repository": {
"type": "git", "type": "git",
"url": "git@github.com:staltz/ppppp-tangle-sync.git" "url": "git@github.com:staltz/ppppp-sync.git"
}, },
"main": "index.js", "main": "index.js",
"files": [ "files": [
@ -47,8 +47,8 @@
"prettier": "^2.6.2", "prettier": "^2.6.2",
"pretty-quick": "^3.1.3", "pretty-quick": "^3.1.3",
"rimraf": "^4.4.0", "rimraf": "^4.4.0",
"secret-stack": "~7.1.0", "secret-stack": "~7.1.1",
"secret-handshake-ext": "^0.0.8", "secret-handshake-ext": "0.0.8",
"ssb-box": "^1.0.1", "ssb-box": "^1.0.1",
"typescript": "^5.1.3" "typescript": "^5.1.3"
}, },

View File

@ -10,7 +10,7 @@ const bobKeys = Keypair.generate('ed25519', 'bob')
function getAccount(iter) { function getAccount(iter) {
return [...iter] return [...iter]
.filter((m) => m.metadata.account === 'self' && m.data?.action === 'add') .filter((m) => m.metadata.account === 'self' && m.data?.action === 'add')
.map((m) => m.data.add.key.bytes) .map((m) => m.data.key.bytes)
} }
test('sync an account tangle', async (t) => { test('sync an account tangle', async (t) => {
@ -23,7 +23,7 @@ test('sync an account tangle', async (t) => {
// Alice's account tangle // Alice's account tangle
await alice.db.loaded() await alice.db.loaded()
const aliceID = await p(alice.db.account.create)({ const aliceID = await p(alice.db.account.create)({
domain: 'account', subdomain: 'account',
_nonce: 'alice', _nonce: 'alice',
}) })
@ -57,9 +57,9 @@ test('sync an account tangle', async (t) => {
const remoteAlice = await p(bob.connect)(alice.getAddress()) const remoteAlice = await p(bob.connect)(alice.getAddress())
assert('bob connected to alice') assert('bob connected to alice')
bob.tangleSync.initiate() bob.sync.start()
await p(setTimeout)(1000) await p(setTimeout)(1000)
assert('tangleSync!') assert('sync!')
assert.deepEqual( assert.deepEqual(
getAccount(bob.db.msgs()), getAccount(bob.db.msgs()),

View File

@ -27,7 +27,7 @@ test('sync goal=dict with ghostSpan=2', async (t) => {
// Alice sets up an account and a dict // Alice sets up an account and a dict
const aliceID = await p(alice.db.account.create)({ const aliceID = await p(alice.db.account.create)({
domain: 'account', subdomain: 'account',
_nonce: 'alice', _nonce: 'alice',
}) })
await p(alice.dict.load)(aliceID) await p(alice.dict.load)(aliceID)
@ -107,7 +107,7 @@ test('sync goal=dict with ghostSpan=2', async (t) => {
} }
} }
// Assert situation at Alice before tangleSync // Assert situation at Alice before sync
{ {
const arr = [...alice.db.msgs()] const arr = [...alice.db.msgs()]
.map((msg) => msg.data?.update) .map((msg) => msg.data?.update)
@ -117,16 +117,16 @@ test('sync goal=dict with ghostSpan=2', async (t) => {
} }
assert.deepEqual(alice.db.ghosts.get(moot.id), [rec1.id, rec2.id]) assert.deepEqual(alice.db.ghosts.get(moot.id), [rec1.id, rec2.id])
// Trigger tangleSync // Trigger sync
alice.goals.set(moot.id, 'dict') alice.goals.set(moot.id, 'dict')
bob.goals.set(moot.id, 'dict') bob.goals.set(moot.id, 'dict')
const remoteAlice = await p(bob.connect)(alice.getAddress()) const remoteAlice = await p(bob.connect)(alice.getAddress())
assert('bob connected to alice') assert('bob connected to alice')
bob.tangleSync.initiate() bob.sync.start()
await p(setTimeout)(1000) await p(setTimeout)(1000)
assert('tangleSync!') assert('sync!')
// Assert situation at Alice before tangleSync: she got the branched off msg // Assert situation at Alice before sync: she got the branched off msg
{ {
const arr = [...alice.db.msgs()] const arr = [...alice.db.msgs()]
.map((msg) => msg.data?.update) .map((msg) => msg.data?.update)

View File

@ -16,7 +16,7 @@ test('sync a feed with goal=all', async (t) => {
const carolID = await p(alice.db.account.create)({ const carolID = await p(alice.db.account.create)({
keypair: carolKeypair, keypair: carolKeypair,
domain: 'account', subdomain: 'account',
_nonce: 'carol', _nonce: 'carol',
}) })
const carolAccountRoot = alice.db.get(carolID) const carolAccountRoot = alice.db.get(carolID)
@ -61,9 +61,9 @@ test('sync a feed with goal=all', async (t) => {
const remoteAlice = await p(bob.connect)(alice.getAddress()) const remoteAlice = await p(bob.connect)(alice.getAddress())
assert('bob connected to alice') assert('bob connected to alice')
bob.tangleSync.initiate() bob.sync.start()
await p(setTimeout)(1000) await p(setTimeout)(1000)
assert('tangleSync!') assert('sync!')
{ {
const arr = [...bob.db.msgs()] const arr = [...bob.db.msgs()]
@ -90,7 +90,7 @@ test('sync a feed with goal=newest', async (t) => {
const carolID = await p(alice.db.account.create)({ const carolID = await p(alice.db.account.create)({
keypair: carolKeypair, keypair: carolKeypair,
domain: 'account', subdomain: 'account',
_nonce: 'carol', _nonce: 'carol',
}) })
const carolAccountRoot = alice.db.get(carolID) const carolAccountRoot = alice.db.get(carolID)
@ -135,9 +135,9 @@ test('sync a feed with goal=newest', async (t) => {
const remoteAlice = await p(bob.connect)(alice.getAddress()) const remoteAlice = await p(bob.connect)(alice.getAddress())
assert('bob connected to alice') assert('bob connected to alice')
bob.tangleSync.initiate() bob.sync.start()
await p(setTimeout)(1000) await p(setTimeout)(1000)
assert('tangleSync!') assert('sync!')
{ {
const arr = [...bob.db.msgs()] const arr = [...bob.db.msgs()]
@ -164,7 +164,7 @@ test('sync a feed with goal=newest but too far behind', async (t) => {
const carolID = await p(alice.db.account.create)({ const carolID = await p(alice.db.account.create)({
keypair: carolKeypair, keypair: carolKeypair,
domain: 'account', subdomain: 'account',
_nonce: 'carol', _nonce: 'carol',
}) })
const carolIDMsg = alice.db.get(carolID) const carolIDMsg = alice.db.get(carolID)
@ -217,9 +217,9 @@ test('sync a feed with goal=newest but too far behind', async (t) => {
const remoteAlice = await p(bob.connect)(alice.getAddress()) const remoteAlice = await p(bob.connect)(alice.getAddress())
assert('bob connected to alice') assert('bob connected to alice')
bob.tangleSync.initiate() bob.sync.start()
await p(setTimeout)(1000) await p(setTimeout)(1000)
assert('tangleSync!') assert('sync!')
{ {
const arr = [...bob.db.msgs()] const arr = [...bob.db.msgs()]

View File

@ -35,7 +35,7 @@ test('sync goal=set with ghostSpan=2', async (t) => {
// Alice sets up an account and a set // Alice sets up an account and a set
const aliceID = await p(alice.db.account.create)({ const aliceID = await p(alice.db.account.create)({
domain: 'account', subdomain: 'account',
_nonce: 'alice', _nonce: 'alice',
}) })
await p(alice.set.load)(aliceID) await p(alice.set.load)(aliceID)
@ -68,14 +68,6 @@ test('sync goal=set with ghostSpan=2', async (t) => {
if (rec.msg.data?.add?.[0] === 'Bob') rec6 = rec if (rec.msg.data?.add?.[0] === 'Bob') rec6 = rec
} }
console.log('moot', moot.id);
console.log('msg1', rec1.id);
console.log('msg2', rec2.id);
console.log('msg3', rec3.id);
console.log('msg4', rec4.id);
console.log('msg5', rec5.id);
console.log('msg6', rec6.id);
// Bob knows the whole set // Bob knows the whole set
await p(bob.db.add)(moot.msg, moot.id) await p(bob.db.add)(moot.msg, moot.id)
await p(bob.db.add)(rec1.msg, moot.id) await p(bob.db.add)(rec1.msg, moot.id)
@ -127,24 +119,23 @@ console.log('msg6', rec6.id);
} }
} }
// Assert situation at Alice before tangleSync // Assert situation at Alice before sync
{ {
const arr = getItems([...alice.db.msgs()]) const arr = getItems([...alice.db.msgs()])
console.log(arr)
assert.deepEqual(arr, ['Alice', 'Bob'], 'alice has Alice+Bob set') assert.deepEqual(arr, ['Alice', 'Bob'], 'alice has Alice+Bob set')
} }
assert.deepEqual(alice.db.ghosts.get(moot.id), [rec1.id, rec2.id]) assert.deepEqual(alice.db.ghosts.get(moot.id), [rec1.id, rec2.id])
// Trigger tangleSync // Trigger sync
alice.goals.set(moot.id, 'set') alice.goals.set(moot.id, 'set')
bob.goals.set(moot.id, 'set') bob.goals.set(moot.id, 'set')
const remoteAlice = await p(bob.connect)(alice.getAddress()) const remoteAlice = await p(bob.connect)(alice.getAddress())
assert('bob connected to alice') assert('bob connected to alice')
bob.tangleSync.initiate() bob.sync.start()
await p(setTimeout)(1000) await p(setTimeout)(1000)
assert('tangleSync!') assert('sync!')
// Assert situation at Alice after tangleSync: she got the branched off msg // Assert situation at Alice after sync: she got the branched off msg
{ {
const arr = getItems([...alice.db.msgs()]) const arr = getItems([...alice.db.msgs()])
assert.deepEqual( assert.deepEqual(

View File

@ -54,21 +54,21 @@ test('sync a thread where both peers have portions', async (t) => {
await alice.db.loaded() await alice.db.loaded()
const aliceID = await p(alice.db.account.create)({ const aliceID = await p(alice.db.account.create)({
domain: 'account', subdomain: 'account',
_nonce: 'alice', _nonce: 'alice',
}) })
const aliceIDMsg = alice.db.get(aliceID) const aliceIDMsg = alice.db.get(aliceID)
await bob.db.loaded() await bob.db.loaded()
const bobID = await p(bob.db.account.create)({ const bobID = await p(bob.db.account.create)({
domain: 'account', subdomain: 'account',
_nonce: 'bob', _nonce: 'bob',
}) })
const bobIDMsg = bob.db.get(bobID) const bobIDMsg = bob.db.get(bobID)
// Alice created Carol // Alice created Carol
const carolID = await p(alice.db.account.create)({ const carolID = await p(alice.db.account.create)({
domain: 'account', subdomain: 'account',
keypair: carolKeypair, keypair: carolKeypair,
_nonce: 'carol', _nonce: 'carol',
}) })
@ -76,7 +76,7 @@ test('sync a thread where both peers have portions', async (t) => {
// Alice created Dave // Alice created Dave
const daveID = await p(alice.db.account.create)({ const daveID = await p(alice.db.account.create)({
domain: 'account', subdomain: 'account',
keypair: daveKeypair, keypair: daveKeypair,
_nonce: 'dave', _nonce: 'dave',
}) })
@ -155,9 +155,9 @@ test('sync a thread where both peers have portions', async (t) => {
const remoteAlice = await p(bob.connect)(alice.getAddress()) const remoteAlice = await p(bob.connect)(alice.getAddress())
assert('bob connected to alice') assert('bob connected to alice')
bob.tangleSync.initiate() bob.sync.start()
await p(setTimeout)(1000) await p(setTimeout)(1000)
assert('tangleSync!') assert('sync!')
assert.deepEqual( assert.deepEqual(
getTexts(alice.db.msgs()), getTexts(alice.db.msgs()),
@ -182,14 +182,14 @@ test('sync a thread where initiator does not have the root', async (t) => {
await alice.db.loaded() await alice.db.loaded()
const aliceID = await p(alice.db.account.create)({ const aliceID = await p(alice.db.account.create)({
domain: 'account', subdomain: 'account',
_nonce: 'alice', _nonce: 'alice',
}) })
const aliceIDMsg = alice.db.get(aliceID) const aliceIDMsg = alice.db.get(aliceID)
await bob.db.loaded() await bob.db.loaded()
const bobID = await p(bob.db.account.create)({ const bobID = await p(bob.db.account.create)({
domain: 'account', subdomain: 'account',
_nonce: 'bob', _nonce: 'bob',
}) })
const bobIDMsg = bob.db.get(bobID) const bobIDMsg = bob.db.get(bobID)
@ -235,9 +235,9 @@ test('sync a thread where initiator does not have the root', async (t) => {
const remoteAlice = await p(bob.connect)(alice.getAddress()) const remoteAlice = await p(bob.connect)(alice.getAddress())
assert('bob connected to alice') assert('bob connected to alice')
bob.tangleSync.initiate() bob.sync.start()
await p(setTimeout)(1000) await p(setTimeout)(1000)
assert('tangleSync!') assert('sync!')
assert.deepEqual( assert.deepEqual(
getTexts(bob.db.msgs()), getTexts(bob.db.msgs()),
@ -256,14 +256,14 @@ test('sync a thread where receiver does not have the root', async (t) => {
await alice.db.loaded() await alice.db.loaded()
const aliceID = await p(alice.db.account.create)({ const aliceID = await p(alice.db.account.create)({
domain: 'account', subdomain: 'account',
_nonce: 'alice', _nonce: 'alice',
}) })
const aliceIDMsg = alice.db.get(aliceID) const aliceIDMsg = alice.db.get(aliceID)
await bob.db.loaded() await bob.db.loaded()
const bobID = await p(bob.db.account.create)({ const bobID = await p(bob.db.account.create)({
domain: 'account', subdomain: 'account',
_nonce: 'bob', _nonce: 'bob',
}) })
const bobIDMsg = bob.db.get(bobID) const bobIDMsg = bob.db.get(bobID)
@ -308,9 +308,9 @@ test('sync a thread where receiver does not have the root', async (t) => {
const remoteBob = await p(alice.connect)(bob.getAddress()) const remoteBob = await p(alice.connect)(bob.getAddress())
assert('alice connected to bob') assert('alice connected to bob')
alice.tangleSync.initiate() alice.sync.start()
await p(setTimeout)(1000) await p(setTimeout)(1000)
assert('tangleSync!') assert('sync!')
assert.deepEqual( assert.deepEqual(
getTexts(bob.db.msgs()), getTexts(bob.db.msgs()),
@ -329,14 +329,14 @@ test('sync a thread with reactions too', async (t) => {
await alice.db.loaded() await alice.db.loaded()
const aliceID = await p(alice.db.account.create)({ const aliceID = await p(alice.db.account.create)({
domain: 'account', subdomain: 'account',
_nonce: 'alice', _nonce: 'alice',
}) })
const aliceIDMsg = alice.db.get(aliceID) const aliceIDMsg = alice.db.get(aliceID)
await bob.db.loaded() await bob.db.loaded()
const bobID = await p(bob.db.account.create)({ const bobID = await p(bob.db.account.create)({
domain: 'account', subdomain: 'account',
_nonce: 'bob', _nonce: 'bob',
}) })
const bobIDMsg = bob.db.get(bobID) const bobIDMsg = bob.db.get(bobID)
@ -388,9 +388,9 @@ test('sync a thread with reactions too', async (t) => {
const remoteBob = await p(alice.connect)(bob.getAddress()) const remoteBob = await p(alice.connect)(bob.getAddress())
assert('alice connected to bob') assert('alice connected to bob')
alice.tangleSync.initiate() alice.sync.start()
await p(setTimeout)(1000) await p(setTimeout)(1000)
assert('tangleSync!') assert('sync!')
assert.deepEqual( assert.deepEqual(
getTexts(bob.db.msgs()), getTexts(bob.db.msgs()),

View File

@ -7,7 +7,7 @@ const Keypair = require('ppppp-keypair')
function createPeer(opts) { function createPeer(opts) {
if (opts.name) { if (opts.name) {
const tmp = os.tmpdir() const tmp = os.tmpdir()
opts.path ??= path.join(tmp, `tanglesync-${opts.name}-${Date.now()}`) opts.path ??= path.join(tmp, `ppppp-sync-${opts.name}-${Date.now()}`)
opts.keypair ??= Keypair.generate('ed25519', opts.name) opts.keypair ??= Keypair.generate('ed25519', opts.name)
opts.name = undefined opts.name = undefined
} }