diff --git a/lib/log/errors.js b/lib/log/errors.js
index e4f2b59..d505a51 100644
--- a/lib/log/errors.js
+++ b/lib/log/errors.js
@@ -53,6 +53,11 @@ function compactWithMaxLiveStreamErr() {
   return new Error('Compaction cannot run if there are live streams configured with opts.lt or opts.lte')
 }
 
+function overwriteLargerThanOld() {
+  // prettier-ignore
+  return new Error('Data to be overwritten should not be larger than existing data')
+}
+
 function appendLargerThanBlockErr() {
   return new Error('Data to be appended is larger than block size')
 }
@@ -70,6 +75,7 @@ module.exports = {
   deletedRecordErr,
   delDuringCompactErr,
   compactWithMaxLiveStreamErr,
+  overwriteLargerThanOld,
   appendLargerThanBlockErr,
   unexpectedTruncationErr,
 }
diff --git a/lib/log/index.js b/lib/log/index.js
index d4e98b7..c21045f 100644
--- a/lib/log/index.js
+++ b/lib/log/index.js
@@ -17,6 +17,7 @@ const {
   // delDuringCompactErr,
   appendLargerThanBlockErr,
   unexpectedTruncationErr,
+  overwriteLargerThanOld,
   // compactWithMaxLiveStreamErr,
 } = require('./errors')
 const Record = require('./record')
@@ -67,17 +68,6 @@ function assert(check, message) {
   if (!check) throw new Error(message)
 }
 
-/**
- * The "End of Block" is a special field used to mark the end of a block, and
- * in practice it's like a Record header "length" field, with the value 0.
- * In most cases, the end region of a block will have a larger length than this,
- * but we want to guarantee there is at *least* this many bytes at the end.
- */
-const EOB = {
-  SIZE: Record.HEADER_SIZE,
-  asNumber: 0,
-}
-
 const DEFAULT_BLOCK_SIZE = 65536
 const DEFAULT_WRITE_TIMEOUT = 250
 const DEFAULT_VALIDATE = () => true
@@ -89,7 +79,7 @@
  * @param {string} filename
  * @param {Options} opts
  */
-function AsyncAppendOnlyLog(filename, opts) {
+function Log(filename, opts) {
   const DEFAULT_CODEC = /** @type {Codec} */ (
     /** @type {any} */ ({
       encode: (/** @type {any} */ x) => x,
@@ -327,18 +317,17 @@
    */
   function getLastGoodRecord(blockBuf, blockStart, cb) {
     let lastGoodOffset = 0
-    for (let offsetInRecord = 0; offsetInRecord < blockSize; ) {
-      const length = Record.readDataLength(blockBuf, offsetInRecord)
-      if (length === EOB.asNumber) break
-      const [dataBuf, recSize] = Record.read(blockBuf, offsetInRecord)
-      const isLengthCorrupt = offsetInRecord + recSize > blockSize
-      const isDataCorrupt = !isBufferZero(dataBuf) && !validateRecord(dataBuf)
+    for (let offsetInRec = 0; offsetInRec < blockSize; ) {
+      if (Record.isEOB(blockBuf, offsetInRec)) break
+      const [dataBuf, recSize, dataLength] = Record.read(blockBuf, offsetInRec)
+      const isLengthCorrupt = offsetInRec + recSize > blockSize
+      const isDataCorrupt = dataLength > 0 && !validateRecord(dataBuf)
       if (isLengthCorrupt || isDataCorrupt) {
-        fixBlock(blockBuf, offsetInRecord, blockStart, lastGoodOffset, cb)
+        fixBlock(blockBuf, offsetInRec, blockStart, lastGoodOffset, cb)
         return
       }
-      lastGoodOffset = offsetInRecord
-      offsetInRecord += recSize
+      lastGoodOffset = offsetInRec
+      offsetInRec += recSize
     }
 
     cb(null, lastGoodOffset)
@@ -388,8 +377,13 @@
     getBlock(offset, function gotBlock(err, blockBuf) {
       if (err) return cb(err)
-      const [dataBuf] = Record.read(blockBuf, getOffsetInBlock(offset))
-      if (isBufferZero(dataBuf)) return cb(deletedRecordErr())
+      const offsetInBlock = getOffsetInBlock(offset)
+      const [dataBuf, _recSize, dataLength, emptyLength] = Record.read(
+        blockBuf,
+        offsetInBlock
+      )
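+      // dataLength === 0 with emptyLength > 0 marks a deleted record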
+      if (dataLength === 0 && emptyLength > 0) return cb(deletedRecordErr())
       // @ts-ignore
       cb(null, codec.decode(dataBuf))
     })
@@ -407,18 +400,21 @@
    */
   function getDataNextOffset(blockBuf, offset) {
     const offsetInBlock = getOffsetInBlock(offset)
-    const [dataBuf, recSize] = Record.read(blockBuf, offsetInBlock)
-    const nextLength = Record.readDataLength(blockBuf, offsetInBlock + recSize)
+    const [dataBuf, recSize, dataLength, emptyLength] = Record.read(
+      blockBuf,
+      offsetInBlock
+    )
+    const nextOffsetInBlock = offsetInBlock + recSize
 
     let nextOffset
-    if (nextLength === EOB.asNumber) {
+    if (Record.isEOB(blockBuf, nextOffsetInBlock)) {
       if (getNextBlockStart(offset) > since.value) nextOffset = -1
       else nextOffset = 0
     } else {
       nextOffset = offset + recSize
     }
 
-    if (isBufferZero(dataBuf)) return [nextOffset, null, recSize]
+    if (dataLength === 0 && emptyLength > 0) return [nextOffset, null, recSize]
     else return [nextOffset, codec.decode(dataBuf), recSize]
   }
@@ -478,11 +474,9 @@
       if (err) return cb(err)
       assert(blockBuf, 'blockBuf should be defined in gotBlockForDelete')
       const actualBlockBuf = blocksWithDeletables.get(blockIndex) ?? blockBuf
-      Record.overwriteWithZeroes(actualBlockBuf, getOffsetInBlock(offset))
-      deletedBytes += Record.readSize(
-        actualBlockBuf,
-        getOffsetInBlock(offset)
-      )
+      const offsetInBlock = getOffsetInBlock(offset)
+      Record.overwriteAsEmpty(actualBlockBuf, offsetInBlock)
+      deletedBytes += Record.readSize(actualBlockBuf, offsetInBlock)
       blocksWithDeletables.set(blockIndex, actualBlockBuf)
       scheduleFlushDelete()
       cb()
@@ -502,7 +496,7 @@
    * @param {number} offsetInBlock
    */
   function hasNoSpaceFor(dataBuf, offsetInBlock) {
-    return offsetInBlock + Record.size(dataBuf) + EOB.SIZE > blockSize
+    return offsetInBlock + Record.size(dataBuf) + Record.EOB_SIZE > blockSize
   }
 
   const scheduleFlushDelete = debounce(flushDelete, writeTimeout)
@@ -550,8 +544,9 @@
     let encodedData = codec.encode(data)
     if (typeof encodedData === 'string') encodedData = b4a.from(encodedData)
 
-    if (Record.size(encodedData) + EOB.SIZE > blockSize)
+    if (Record.size(encodedData) + Record.EOB_SIZE > blockSize) {
       throw appendLargerThanBlockErr()
+    }
 
     assert(typeof latestBlockIndex === 'number', 'latestBlockIndex not set')
     assert(typeof nextOffsetInBlock === 'number', 'nextOffsetInBlock not set')
@@ -646,12 +641,53 @@
    * @param {B4A} blockBuf
    * @param {CB} cb
    */
-  function overwrite(blockIndex, blockBuf, cb) {
+  function overwriteBlock(blockIndex, blockBuf, cb) {
     cache.set(blockIndex, blockBuf)
     const blockStart = blockIndex * blockSize
     writeWithFSync(blockStart, blockBuf, null, cb)
   }
 
+  /**
+   * @param {number} offset
+   * @param {extractCodecType<typeof codec>} data
+   * @param {CB} cb
+   */
+  function overwrite(offset, data, cb) {
+    let encodedData = codec.encode(data)
+    if (typeof encodedData === 'string') encodedData = b4a.from(encodedData)
+
+    assert(typeof latestBlockIndex === 'number', 'latestBlockIndex not set')
+    assert(typeof nextOffsetInBlock === 'number', 'nextOffsetInBlock not set')
+    const logSize = latestBlockIndex * blockSize + nextOffsetInBlock
+    if (typeof offset !== 'number') return cb(nanOffsetErr(offset))
+    if (isNaN(offset)) return cb(nanOffsetErr(offset))
+    if (offset < 0) return cb(negativeOffsetErr(offset))
+    if (offset >= logSize) return cb(outOfBoundsOffsetErr(offset, logSize))
+
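+    // The new data plus leftover empty space must occupy exactly the bytes
+    // of the old record, so the offsets of all later records stay valid.
+    // E.g. overwriting a 36-byte data region with 8 bytes of data leaves
+    // 4 (header) + 8 (data) + 28 (empty) = 40 bytes, the same total as before.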
+    // Get the existing record at offset
+    getBlock(offset, function gotBlock(err, blockBuf) {
+      if (err) return cb(err)
+      const offsetInBlock = getOffsetInBlock(offset)
+      const oldDataLength = Record.readDataLength(blockBuf, offsetInBlock)
+      const oldEmptyLength = Record.readEmptyLength(blockBuf, offsetInBlock)
+      // Make sure encodedData fits inside the existing record
+      if (Record.size(encodedData) > oldDataLength + oldEmptyLength) {
+        return cb(overwriteLargerThanOld())
+      }
+      const newEmptyLength = oldDataLength + oldEmptyLength - encodedData.length
+      deletedBytes += newEmptyLength - oldEmptyLength
+      // Rewrite the record in place, marking the leftover space as empty
+      Record.write(blockBuf, offsetInBlock, encodedData, newEmptyLength)
+
+      const blockStart = getBlockStart(offset)
+      writeWithFSync(blockStart, blockBuf, null, cb)
+    })
+  }
+
   /**
    * @param {number} newLatestBlockIndex
    * @param {CB} cb
   */
@@ -819,6 +851,7 @@
     scan: onLoad(scan), // TODO
     del: onLoad(del), // TODO
     append: onLoad(append), // TODO
+    overwrite: onLoad(overwrite), // TODO
     close: onLoad(close), // TODO
     onDrain: onLoad(onDrain), // TODO
     onDeletesFlushed: onLoad(onDeletesFlushed),
@@ -830,7 +863,7 @@
     // Internals needed by ./compaction.js:
     filename,
     blockSize,
-    overwrite,
+    overwriteBlock,
     truncate,
     hasNoSpaceFor,
@@ -839,4 +872,4 @@
 }
 
-module.exports = AsyncAppendOnlyLog
+module.exports = Log
diff --git a/lib/log/record.js b/lib/log/record.js
index 76a49b3..5256f6d 100644
--- a/lib/log/record.js
+++ b/lib/log/record.js
@@ -8,20 +8,22 @@ const b4a = require('b4a')
 Binary format for a Record:
 
-<dataLength: UInt16LE>
-<dataBuf: Arbitrary Bytes>
+<dataLength: UInt16LE><emptyLength: UInt16LE>
+<dataBuf: Arbitrary Bytes><emptyBuf: Zero Bytes>
 
-The "Header" is the first two bytes for the dataLength.
+The "Header" is four bytes: two for the dataLength, then two for the emptyLength.
 */
 
-const HEADER_SIZE = 2 // uint16
+const HEADER_D = 2 // uint16
+const HEADER_E = 2 // uint16
+const HEADER_SIZE = HEADER_D + HEADER_E // 4 bytes
 
 /**
  * @param {B4A} dataBuf
  */
 function size(dataBuf) {
-  return HEADER_SIZE + dataBuf.length
+  return HEADER_D + HEADER_E + dataBuf.length
 }
@@ -41,54 +43,129 @@ function readDataLength(blockBuf, offsetInBlock) {
  * @param {B4A} blockBuf
  * @param {number} offsetInBlock
  */
-function readSize(blockBuf, offsetInBlock) {
-  const dataLength = readDataLength(blockBuf, offsetInBlock)
-  return HEADER_SIZE + dataLength
+function readEmptyLength(blockBuf, offsetInBlock) {
+  const view = new DataView(
+    blockBuf.buffer,
+    blockBuf.byteOffset,
+    blockBuf.byteLength
+  )
+  return view.getUint16(offsetInBlock + 2, true)
 }
 
 /**
  * @param {B4A} blockBuf
  * @param {number} offsetInBlock
- * @returns {[B4A, number]}
+ */
+function isEmpty(blockBuf, offsetInBlock) {
+  return (
+    readDataLength(blockBuf, offsetInBlock) === 0 &&
+    readEmptyLength(blockBuf, offsetInBlock) > 0
+  )
+}
+
+/**
+ * The "End of Block" (EOB) is a special 4-byte field used to mark the end
+ * of a block. In practice it's like a Record header whose "dataLength" and
+ * "emptyLength" fields are both 0.
+ *
+ * In most cases, the end region of a block will contain many more than 4
+ * zero bytes, but we want to guarantee there are at *least* 4 at the end.
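+ *
+ * For example, a block whose first record holds the 3 bytes "abc" (and no
+ * empty bytes), immediately followed by EOB, starts like this:
+ *
+ *   03 00 | 00 00 | 61 62 63 | 00 00 00 00 ...
+ *   dataLength=3, emptyLength=0, dataBuf, then the EOB marker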
+ * @param {B4A} blockBuf
+ * @param {number} offsetInBlock
+ */
+function isEOB(blockBuf, offsetInBlock) {
+  return (
+    readDataLength(blockBuf, offsetInBlock) === 0 &&
+    readEmptyLength(blockBuf, offsetInBlock) === 0
+  )
+}
+
+/**
+ * @param {B4A} blockBuf
+ * @param {number} offsetInBlock
+ */
+function readSize(blockBuf, offsetInBlock) {
+  const dataLength = readDataLength(blockBuf, offsetInBlock)
+  const emptyLength = readEmptyLength(blockBuf, offsetInBlock)
+  return HEADER_D + HEADER_E + dataLength + emptyLength
+}
+
+/**
+ * @param {B4A} blockBuf
+ * @param {number} offsetInBlock
+ * @returns {[B4A, number, number, number]}
  */
 function read(blockBuf, offsetInBlock) {
   const dataLength = readDataLength(blockBuf, offsetInBlock)
-  const dataStart = offsetInBlock + HEADER_SIZE
-  const dataBuf = blockBuf.slice(dataStart, dataStart + dataLength)
-  const size = HEADER_SIZE + dataLength
-  return [dataBuf, size]
+  const emptyLength = readEmptyLength(blockBuf, offsetInBlock)
+  const dataStart = offsetInBlock + HEADER_D + HEADER_E
+  const dataBuf = blockBuf.subarray(dataStart, dataStart + dataLength)
+  const size = HEADER_D + HEADER_E + dataLength + emptyLength
+  return [dataBuf, size, dataLength, emptyLength]
 }
 
 /**
  * @param {B4A} blockBuf
  * @param {number} offsetInBlock
  * @param {B4A} dataBuf
+ * @param {number} emptySize
  */
-function write(blockBuf, offsetInBlock, dataBuf) {
-  // write dataLength
-  const view = new DataView(blockBuf.buffer, blockBuf.byteOffset, blockBuf.byteLength)
-  view.setUint16(offsetInBlock, dataBuf.length, true)
-  // write dataBuf
-  b4a.copy(dataBuf, blockBuf, offsetInBlock + HEADER_SIZE)
+function write(blockBuf, offsetInBlock, dataBuf, emptySize = 0) {
+  const dataSize = dataBuf.length
+  const dataHeaderPos = offsetInBlock
+  const emptyHeaderPos = dataHeaderPos + HEADER_D
+  const dataBodyPos = emptyHeaderPos + HEADER_E
+  const emptyBodyPos = dataBodyPos + dataSize
+
+  // write header
+  {
+    const view = new DataView(
+      blockBuf.buffer,
+      blockBuf.byteOffset,
+      blockBuf.byteLength
+    )
+    view.setUint16(dataHeaderPos, dataSize, true)
+    if (emptySize > 0) {
+      view.setUint16(emptyHeaderPos, emptySize, true)
+    }
+  }
+
+  // write body
+  {
+    if (dataSize > 0) {
+      b4a.copy(dataBuf, blockBuf, dataBodyPos)
+    }
+    if (emptySize > 0) {
+      b4a.fill(blockBuf, 0, emptyBodyPos, emptyBodyPos + emptySize)
+    }
+  }
 }
 
 /**
  * @param {B4A} blockBuf
  * @param {number} offsetInBlock
  */
-function overwriteWithZeroes(blockBuf, offsetInBlock) {
+function overwriteAsEmpty(blockBuf, offsetInBlock) {
   const dataLength = readDataLength(blockBuf, offsetInBlock)
-  const dataStart = offsetInBlock + HEADER_SIZE
-  const dataEnd = dataStart + dataLength
-  blockBuf.fill(0, dataStart, dataEnd)
+  const emptyLength = readEmptyLength(blockBuf, offsetInBlock)
+  write(blockBuf, offsetInBlock, b4a.alloc(0), dataLength + emptyLength)
 }
 
 module.exports = {
-  HEADER_SIZE,
+  EOB_SIZE: HEADER_D + HEADER_E,
   size,
   readDataLength,
+  readEmptyLength,
   readSize,
   read,
   write,
-  overwriteWithZeroes,
+  overwriteAsEmpty,
+  isEmpty,
+  isEOB,
 }
diff --git a/protospec.md b/protospec.md
index 09318d8..277ba72 100644
--- a/protospec.md
+++ b/protospec.md
@@ -177,6 +177,10 @@ The `prev` array for a tangle should list:
 Whenever we need to serialize any JSON in the context of creating a Feed V1
 message, we follow the "JSON Canonicalization Scheme" (JSC) defined by [RFC 8785](https://tools.ietf.org/html/rfc8785).
 
+A serialized msg must not be larger than 65535 UTF-8 bytes.
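+(65535 is the largest length that the log's record header can encode, since
+it stores `dataLength` as a UInt16LE.)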
+
 # Msg V2
 
 Background: https://github.com/ssbc/ssb2-discussion-forum/issues/24
diff --git a/test/add.test.js b/test/add.test.js
index 0b81c01..3b42a7d 100644
--- a/test/add.test.js
+++ b/test/add.test.js
@@ -50,7 +50,7 @@ test('add()', async (t) => {
   await p(peer.db._getLog().onDrain)()
   const stats = await p(peer.db.logStats)()
 
-  assert.deepEqual(stats, { totalBytes: 900, deletedBytes: 0 })
+  assert.deepEqual(stats, { totalBytes: 904, deletedBytes: 0 })
 
   await p(peer.close)(true)
 })
diff --git a/test/log/basic.test.js b/test/log/basic.test.js
index 15930d4..5c173d8 100644
--- a/test/log/basic.test.js
+++ b/test/log/basic.test.js
@@ -19,7 +19,7 @@ test('Log basics', async function (t) {
     assert.equal(offset1, 0)
     const offset2 = await p(log.append)(msg2)
-    assert.equal(offset2, msg1.length + 2)
+    assert.equal(offset2, msg1.length + 4)
 
     const b1 = await p(log._get)(offset1)
     assert.equal(b1.toString(), msg1.toString())
@@ -47,7 +47,7 @@ test('Log basics', async function (t) {
     assert.equal(offset1, 0)
     const offset2 = await p(log.append)(json2)
-    assert.equal(offset2, 20)
+    assert.equal(offset2, 22)
 
     const rec1 = await p(log._get)(offset1)
     assert.deepEqual(rec1, json1)
@@ -66,12 +66,12 @@ test('Log basics', async function (t) {
   })
 
   await p(log.onDrain)()
-  assert.equal(log.since.value, 20)
+  assert.equal(log.since.value, 22)
 
   const rec1 = await p(log._get)(0)
   assert.deepEqual(rec1, json1)
 
-  const rec2 = await p(log._get)(20)
+  const rec2 = await p(log._get)(22)
   assert.deepEqual(rec2, json2)
 
   await p(log.close)()
diff --git a/test/log/corrupt-records.test.js b/test/log/corrupt-records.test.js
index d1a9797..3c6d4ef 100644
--- a/test/log/corrupt-records.test.js
+++ b/test/log/corrupt-records.test.js
@@ -107,9 +107,11 @@ test('Log handles corrupted length', async (t) => {
   const msg2 = encode({ bool: true, test: 'testing2' })
 
   block.writeUInt16LE(msg1.length, 0)
-  msg1.copy(block, 2)
-  block.writeUInt16LE(65534, 2 + msg1.length) // corrupt!
-  msg2.copy(block, 2 + msg1.length + 2)
+  msg1.copy(block, 4)
+  block.writeUInt16LE(65534, 4 + msg1.length) // corrupt!
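+  // msg1's record is 4 (header) + msg1.length bytes, so the next record's
+  // (corrupted) header starts at 4 + msg1.length, and its data 4 bytes later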
+  msg2.copy(block, 4 + msg1.length + 4)
 
   await p(raf.write.bind(raf))(0, block)
diff --git a/test/log/fix-buggy-write.test.js b/test/log/fix-buggy-write.test.js
index cbe4e21..26d462b 100644
--- a/test/log/fix-buggy-write.test.js
+++ b/test/log/fix-buggy-write.test.js
@@ -22,7 +22,7 @@ test('Log fix buggy write', async (t) => {
   const offset1 = await p(log.append)(msg1)
   assert.equal(offset1, 0)
   const offset2 = await p(log.append)(msg2)
-  assert.equal(offset2, 36)
+  assert.equal(offset2, 38)
 
   await p(log.onDrain)()
   let arr = []
diff --git a/test/log/overwrite.test.js b/test/log/overwrite.test.js
new file mode 100644
index 0000000..c0dfc3e
--- /dev/null
+++ b/test/log/overwrite.test.js
@@ -0,0 +1,79 @@
+const test = require('node:test')
+const assert = require('node:assert')
+const fs = require('node:fs')
+const p = require('node:util').promisify
+const Log = require('../../lib/log')
+
+const msg1 = Buffer.from('hello world hello world hello world')
+const msg2 = Buffer.from('ola mundo ola mundo ola mundo')
+
+test('Log overwrites', async (t) => {
+  await t.test('Simple overwrite', async (t) => {
+    const file = '/tmp/ppppp-db-log-test-overwrite.log'
+    try {
+      fs.unlinkSync(file)
+    } catch (_) {}
+    const log = Log(file, { blockSize: 2 * 1024 })
+
+    const offset1 = await p(log.append)(msg1)
+    assert.equal(offset1, 0)
+    const offset2 = await p(log.append)(msg2)
+    assert.ok(offset2 > offset1)
+
+    const buf1 = await p(log._get)(offset1)
+    assert.equal(buf1.toString(), msg1.toString())
+    const buf2 = await p(log._get)(offset2)
+    assert.equal(buf2.toString(), msg2.toString())
+
+    await p(log.overwrite)(offset1, Buffer.from('hi world'))
+    const buf = await p(log._get)(offset1)
+    assert.equal(buf.toString(), 'hi world')
+
+    let arr = []
+    await new Promise((resolve, reject) => {
+      log.scan(
+        (offset, data, size) => {
+          arr.push(data.toString())
+        },
+        (err) => {
+          if (err) reject(err)
+          else resolve()
+        }
+      )
+    })
+
+    assert.deepEqual(arr, ['hi world', 'ola mundo ola mundo ola mundo'])
+
+    await p(log.close)()
+  })
+
+  await t.test('Cannot overwrite larger data', async (t) => {
+    const file = '/tmp/ppppp-db-log-test-overwrite-larger.log'
+    try {
+      fs.unlinkSync(file)
+    } catch (_) {}
+    const log = Log(file, { blockSize: 2 * 1024 })
+
+    const offset1 = await p(log.append)(msg1)
+    assert.equal(offset1, 0)
+    const offset2 = await p(log.append)(msg2)
+    assert.ok(offset2 > offset1)
+
+    const buf1 = await p(log._get)(offset1)
+    assert.equal(buf1.toString(), msg1.toString())
+    const buf2 = await p(log._get)(offset2)
+    assert.equal(buf2.toString(), msg2.toString())
+
+    const promise = p(log.overwrite)(
+      offset1,
+      Buffer.from('hello world hello world hello world hello world')
+    )
+    await assert.rejects(promise, (err) => {
+      assert.ok(err)
+      assert.match(err.message, /should not be larger than existing data/)
+      return true
+    })
+
+    await p(log.close)()
+  })
+})