Fix compactor and add simple tests for it.

This commit is contained in:
John Preston 2018-08-25 13:06:54 +03:00
parent 2940023cb0
commit f6a6a39d3b
8 changed files with 321 additions and 45 deletions

View File

@ -30,6 +30,23 @@ bool BinlogWrapper::failed() const {
return _failed; return _failed;
} }
base::optional<BasicHeader> BinlogWrapper::ReadHeader(
File &binlog,
const Settings &settings) {
auto result = BasicHeader();
if (binlog.offset() != 0) {
return {};
} else if (binlog.read(bytes::object_as_span(&result)) != sizeof(result)) {
return {};
} else if (result.format != Format::Format_0) {
return {};
} else if (settings.trackEstimatedTime
!= !!(result.flags & result.kTrackEstimatedTime)) {
return {};
}
return result;
}
bool BinlogWrapper::readPart() { bool BinlogWrapper::readPart() {
if (_finished) { if (_finished) {
return false; return false;
@ -44,21 +61,20 @@ bool BinlogWrapper::readPart() {
return no(); return no();
} }
Assert(_notParsedBytes >= 0 && _notParsedBytes <= _part.size()); if (!_part.empty() && _full.data() != _part.data()) {
if (_notParsedBytes > 0 && _notParsedBytes < _part.size()) { bytes::move(_full, _part);
bytes::move(_full, _part.subspan(_part.size() - _notParsedBytes)); _part = _full.subspan(0, _part.size());
} }
const auto amount = std::min( const auto amount = std::min(
left, left,
int64(_full.size() - _notParsedBytes)); int64(_full.size() - _part.size()));
Assert(amount > 0); Assert(amount > 0);
const auto readBytes = _binlog.read( const auto readBytes = _binlog.read(
_full.subspan(_notParsedBytes, amount)); _full.subspan(_part.size(), amount));
if (!readBytes) { if (!readBytes) {
return no(); return no();
} }
_notParsedBytes += readBytes; _part = _full.subspan(0, _part.size() + readBytes);
_part = _full.subspan(0, _notParsedBytes);
return true; return true;
} }
@ -74,10 +90,9 @@ bytes::const_span BinlogWrapper::readRecord(ReadRecordSize readRecordSize) {
_finished = _failed = true; _finished = _failed = true;
return {}; return {};
} }
Assert(size >= 0 && size <= _notParsedBytes); Assert(size >= 0);
const auto result = _part.subspan(0, size); const auto result = _part.subspan(0, size);
_part = _part.subspan(size); _part = _part.subspan(size);
_notParsedBytes -= size;
return result; return result;
} }
@ -86,9 +101,9 @@ void BinlogWrapper::finish(size_type rollback) {
if (rollback > 0) { if (rollback > 0) {
_failed = true; _failed = true;
_notParsedBytes += rollback;
} }
_binlog.seek(_binlog.offset() - _notParsedBytes); rollback += _part.size();
_binlog.seek(_binlog.offset() - rollback);
} }
} // namespace details } // namespace details

View File

@ -26,6 +26,10 @@ public:
bool finished() const; bool finished() const;
bool failed() const; bool failed() const;
static base::optional<BasicHeader> ReadHeader(
File &binlog,
const Settings &settings);
private: private:
template <typename ...Records> template <typename ...Records>
friend class BinlogReader; friend class BinlogReader;
@ -45,7 +49,6 @@ private:
bytes::vector _data; bytes::vector _data;
bytes::span _full; bytes::span _full;
bytes::span _part; bytes::span _part;
index_type _notParsedBytes = 0;
bool _finished = false; bool _finished = false;
bool _failed = false; bool _failed = false;
@ -158,7 +161,7 @@ inline size_type BinlogReaderRecursive<Record, Other...>::ReadRecordSize(
} }
const auto head = reinterpret_cast<const Head*>(data.data()); const auto head = reinterpret_cast<const Head*>(data.data());
const auto count = head->validateCount(); const auto count = head->validateCount();
return (count >= 0 && count < partsLimit) return (count >= 0 && count <= partsLimit)
? (sizeof(Head) + count * sizeof(Part)) ? (sizeof(Head) + count * sizeof(Part))
: kRecordSizeInvalid; : kRecordSizeInvalid;
} else { } else {

View File

@ -37,6 +37,7 @@ private:
QString binlogPath() const; QString binlogPath() const;
QString compactPath() const; QString compactPath() const;
bool openBinlog(); bool openBinlog();
bool readHeader();
bool openCompact(); bool openCompact();
void parseChunk(); void parseChunk();
void fail(); void fail();
@ -66,6 +67,7 @@ private:
QString _base; QString _base;
Settings _settings; Settings _settings;
EncryptionKey _key; EncryptionKey _key;
BasicHeader _header;
Info _info; Info _info;
File _binlog; File _binlog;
File _compact; File _compact;
@ -108,7 +110,7 @@ void CompactorObject::initList() {
} }
void CompactorObject::start() { void CompactorObject::start() {
if (!openBinlog() || !openCompact()) { if (!openBinlog() || !readHeader() || !openCompact()) {
fail(); fail();
} }
if (_settings.trackEstimatedTime) { if (_settings.trackEstimatedTime) {
@ -138,10 +140,24 @@ bool CompactorObject::openBinlog() {
&& (_binlog.size() >= _info.till); && (_binlog.size() >= _info.till);
} }
bool CompactorObject::readHeader() {
const auto header = BinlogWrapper::ReadHeader(_binlog, _settings);
if (!header) {
return false;
}
_header = *header;
return true;
}
bool CompactorObject::openCompact() { bool CompactorObject::openCompact() {
const auto path = compactPath(); const auto path = compactPath();
const auto result = _compact.open(path, File::Mode::Write, _key); const auto result = _compact.open(path, File::Mode::Write, _key);
return (result == File::Result::Success); if (result != File::Result::Success) {
return false;
} else if (!_compact.write(bytes::object_as_span(&_header))) {
return false;
}
return true;
} }
void CompactorObject::fail() { void CompactorObject::fail() {
@ -168,6 +184,7 @@ void CompactorObject::finish() {
} }
void CompactorObject::finalize() { void CompactorObject::finalize() {
_binlog.close();
_compact.close(); _compact.close();
auto lastCatchUp = 0; auto lastCatchUp = 0;
@ -360,18 +377,20 @@ Compactor::Compactor(
Compactor::~Compactor() = default; Compactor::~Compactor() = default;
int64 CatchUp( int64 CatchUp(
const QString &compactPath, const QString &compactPath,
const QString &binlogPath, const QString &binlogPath,
const EncryptionKey &key, const EncryptionKey &key,
int64 from, int64 from,
size_type block) { size_type block) {
File binlog, compact; File binlog, compact;
const auto result1 = binlog.open(binlogPath, File::Mode::Read, key); const auto result1 = binlog.open(binlogPath, File::Mode::Read, key);
if (result1 != File::Result::Success) { if (result1 != File::Result::Success) {
return 0; return 0;
} }
const auto till = binlog.size(); const auto till = binlog.size();
if (till < from || !binlog.seek(from)) { if (till == from) {
return till;
} else if (till < from || !binlog.seek(from)) {
return 0; return 0;
} }
const auto result2 = compact.open( const auto result2 = compact.open(

View File

@ -193,17 +193,11 @@ File::Result DatabaseObject::openBinlog(
} }
bool DatabaseObject::readHeader() { bool DatabaseObject::readHeader() {
auto header = BasicHeader(); if (const auto header = BinlogWrapper::ReadHeader(_binlog, _settings)) {
if (_binlog.read(bytes::object_as_span(&header)) != sizeof(header)) { _time.setRelative((_time.system = header->systemTime));
return false; return true;
} else if (header.format != Format::Format_0) {
return false;
} else if (_settings.trackEstimatedTime
!= !!(header.flags & header.kTrackEstimatedTime)) {
return false;
} }
_time.setRelative((_time.system = header.systemTime)); return false;
return true;
} }
bool DatabaseObject::writeHeader() { bool DatabaseObject::writeHeader() {
@ -571,7 +565,8 @@ void DatabaseObject::compactorDone(
compactorFail(); compactorFail();
return; return;
} }
} else if (!File::Move(path, ready)) { }
if (!File::Move(path, ready)) {
compactorFail(); compactorFail();
return; return;
} }
@ -584,7 +579,7 @@ void DatabaseObject::compactorDone(
compactorFail(); compactorFail();
return; return;
} }
const auto result = _binlog.open(path, File::Mode::ReadAppend, _key); const auto result = _binlog.open(binlog, File::Mode::ReadAppend, _key);
if (result != File::Result::Success || !_binlog.seek(_binlog.size())) { if (result != File::Result::Success || !_binlog.seek(_binlog.size())) {
// megafail // megafail
compactorFail(); compactorFail();
@ -906,11 +901,10 @@ void DatabaseObject::checkCompactor() {
|| !_settings.compactAfterExcess || !_settings.compactAfterExcess
|| _binlogExcessLength < _settings.compactAfterExcess) { || _binlogExcessLength < _settings.compactAfterExcess) {
return; return;
} else if (_settings.compactAfterFullSize) { } else if (_settings.compactAfterFullSize
if (_binlogExcessLength * _settings.compactAfterFullSize && (_binlogExcessLength * _settings.compactAfterFullSize
< _settings.compactAfterExcess * _binlog.size()) { < _settings.compactAfterExcess * _binlog.size())) {
return; return;
}
} else if (crl::time() < _compactor.nextAttempt) { } else if (crl::time() < _compactor.nextAttempt) {
return; return;
} }

View File

@ -13,6 +13,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include "base/concurrent_timer.h" #include "base/concurrent_timer.h"
#include "base/bytes.h" #include "base/bytes.h"
#include "base/flat_set.h" #include "base/flat_set.h"
#include <set>
namespace Storage { namespace Storage {
namespace Cache { namespace Cache {

View File

@ -18,6 +18,10 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
using namespace Storage::Cache; using namespace Storage::Cache;
const auto DisableLimitsTests = true;
const auto DisableCompactTests = false;
const auto DisableLargeTest = false;
const auto key = Storage::EncryptionKey(bytes::make_vector( const auto key = Storage::EncryptionKey(bytes::make_vector(
bytes::make_span("\ bytes::make_span("\
abcdefgh01234567abcdefgh01234567abcdefgh01234567abcdefgh01234567\ abcdefgh01234567abcdefgh01234567abcdefgh01234567abcdefgh01234567\
@ -102,7 +106,7 @@ Error Put(Database &db, const Key &key, const QByteArray &value) {
} }
void Remove(Database &db, const Key &key) { void Remove(Database &db, const Key &key) {
db.remove(Key{ 0, 1 }, [&] { Semaphore.release(); }); db.remove(key, [&] { Semaphore.release(); });
Semaphore.acquire(); Semaphore.acquire();
} }
@ -119,7 +123,7 @@ const auto AdvanceTime = [](int32 seconds) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000) * seconds); std::this_thread::sleep_for(std::chrono::milliseconds(1000) * seconds);
}; };
TEST_CASE("encrypted cache db", "[storage_cache_database]") { TEST_CASE("init timers", "[storage_cache_database]") {
static auto init = [] { static auto init = [] {
int argc = 0; int argc = 0;
char **argv = nullptr; char **argv = nullptr;
@ -127,6 +131,211 @@ TEST_CASE("encrypted cache db", "[storage_cache_database]") {
static base::ConcurrentTimerEnvironment environment; static base::ConcurrentTimerEnvironment environment;
return true; return true;
}(); }();
}
TEST_CASE("compacting db", "[storage_cache_database]") {
if (DisableCompactTests) {
return;
}
const auto write = [](Database &db, uint32 from, uint32 till, QByteArray base) {
for (auto i = from; i != till; ++i) {
auto value = base;
value[0] = char('A') + i;
const auto result = Put(db, Key{ i, i + 1 }, value);
REQUIRE(result.type == Error::Type::None);
}
};
const auto put = [&](Database &db, uint32 from, uint32 till) {
write(db, from, till, TestValue1);
};
const auto reput = [&](Database &db, uint32 from, uint32 till) {
write(db, from, till, TestValue2);
};
const auto remove = [](Database &db, uint32 from, uint32 till) {
for (auto i = from; i != till; ++i) {
Remove(db, Key{ i, i + 1 });
}
};
const auto get = [](Database &db, uint32 from, uint32 till) {
for (auto i = from; i != till; ++i) {
db.get(Key{ i, i + 1 }, nullptr);
}
};
const auto check = [](Database &db, uint32 from, uint32 till, QByteArray base) {
for (auto i = from; i != till; ++i) {
auto value = base;
if (!value.isEmpty()) {
value[0] = char('A') + i;
}
const auto result = Get(db, Key{ i, i + 1 });
REQUIRE((result == value));
}
};
SECTION("simple compact with min size") {
auto settings = Settings;
settings.writeBundleDelay = crl::time_type(100);
settings.readBlockSize = 512;
settings.maxBundledRecords = 5;
settings.compactAfterExcess = (3 * (16 * 5 + 16) + 15 * 32) / 2;
settings.compactAfterFullSize = (sizeof(details::BasicHeader)
+ 40 * 32) / 2
+ settings.compactAfterExcess;
Database db(name, settings);
REQUIRE(Clear(db).type == Error::Type::None);
REQUIRE(Open(db, key).type == Error::Type::None);
put(db, 0, 30);
remove(db, 0, 15);
put(db, 30, 40);
reput(db, 15, 29);
AdvanceTime(1);
const auto path = GetBinlogPath();
const auto size = QFile(path).size();
reput(db, 29, 30); // starts compactor
AdvanceTime(2);
REQUIRE(QFile(path).size() < size);
remove(db, 30, 35);
reput(db, 35, 37);
put(db, 15, 20);
put(db, 40, 45);
const auto fullcheck = [&] {
check(db, 0, 15, {});
check(db, 15, 20, TestValue1);
check(db, 20, 30, TestValue2);
check(db, 30, 35, {});
check(db, 35, 37, TestValue2);
check(db, 37, 45, TestValue1);
};
fullcheck();
Close(db);
REQUIRE(Open(db, key).type == Error::Type::None);
fullcheck();
Close(db);
}
SECTION("simple compact without min size") {
auto settings = Settings;
settings.writeBundleDelay = crl::time_type(100);
settings.readBlockSize = 512;
settings.maxBundledRecords = 5;
settings.compactAfterExcess = 3 * (16 * 5 + 16) + 15 * 32;
Database db(name, settings);
REQUIRE(Clear(db).type == Error::Type::None);
REQUIRE(Open(db, key).type == Error::Type::None);
put(db, 0, 30);
remove(db, 0, 15);
put(db, 30, 40);
reput(db, 15, 29);
AdvanceTime(1);
const auto path = GetBinlogPath();
const auto size = QFile(path).size();
reput(db, 29, 30); // starts compactor
AdvanceTime(2);
REQUIRE(QFile(path).size() < size);
remove(db, 30, 35);
reput(db, 35, 37);
put(db, 15, 20);
put(db, 40, 45);
const auto fullcheck = [&] {
check(db, 0, 15, {});
check(db, 15, 20, TestValue1);
check(db, 20, 30, TestValue2);
check(db, 30, 35, {});
check(db, 35, 37, TestValue2);
check(db, 37, 45, TestValue1);
};
fullcheck();
Close(db);
REQUIRE(Open(db, key).type == Error::Type::None);
fullcheck();
Close(db);
}
SECTION("double compact") {
auto settings = Settings;
settings.writeBundleDelay = crl::time_type(100);
settings.readBlockSize = 512;
settings.maxBundledRecords = 5;
settings.compactAfterExcess = 3 * (16 * 5 + 16) + 15 * 32;
Database db(name, settings);
REQUIRE(Clear(db).type == Error::Type::None);
REQUIRE(Open(db, key).type == Error::Type::None);
put(db, 0, 30);
remove(db, 0, 15);
reput(db, 15, 29);
AdvanceTime(1);
const auto path = GetBinlogPath();
const auto size1 = QFile(path).size();
reput(db, 29, 30); // starts compactor
AdvanceTime(2);
REQUIRE(QFile(path).size() < size1);
put(db, 30, 45);
remove(db, 20, 35);
put(db, 15, 20);
reput(db, 35, 44);
const auto size2 = QFile(path).size();
reput(db, 44, 45); // starts compactor
AdvanceTime(2);
const auto after = QFile(path).size();
REQUIRE(after < size1);
REQUIRE(after < size2);
const auto fullcheck = [&] {
check(db, 0, 15, {});
check(db, 15, 20, TestValue1);
check(db, 20, 35, {});
check(db, 35, 45, TestValue2);
};
fullcheck();
Close(db);
REQUIRE(Open(db, key).type == Error::Type::None);
fullcheck();
Close(db);
}
SECTION("time tracking compact") {
auto settings = Settings;
settings.writeBundleDelay = crl::time_type(100);
settings.trackEstimatedTime = true;
settings.readBlockSize = 512;
settings.maxBundledRecords = 5;
settings.compactAfterExcess = 6 * (16 * 5 + 16)
+ 3 * (16 * 5 + 16)
+ 15 * 48
+ 3 * (16 * 5 + 16)
+ (16 * 1 + 16);
Database db(name, settings);
REQUIRE(Clear(db).type == Error::Type::None);
REQUIRE(Open(db, key).type == Error::Type::None);
put(db, 0, 30);
get(db, 0, 30);
//AdvanceTime(1); get's will be written instantly becase !(30 % 5)
remove(db, 0, 15);
reput(db, 15, 30);
get(db, 0, 30);
AdvanceTime(1);
const auto path = GetBinlogPath();
const auto size = QFile(path).size();
get(db, 29, 30); // starts compactor delayed
AdvanceTime(2);
REQUIRE(QFile(path).size() < size);
const auto fullcheck = [&] {
check(db, 15, 30, TestValue2);
};
fullcheck();
Close(db);
REQUIRE(Open(db, key).type == Error::Type::None);
fullcheck();
Close(db);
}
}
TEST_CASE("encrypted cache db", "[storage_cache_database]") {
SECTION("writing db") { SECTION("writing db") {
Database db(name, Settings); Database db(name, Settings);
@ -169,6 +378,33 @@ TEST_CASE("encrypted cache db", "[storage_cache_database]") {
REQUIRE(same == next); REQUIRE(same == next);
Close(db); Close(db);
} }
SECTION("reading db in many chunks") {
auto settings = Settings;
settings.readBlockSize = 512;
settings.maxBundledRecords = 5;
settings.trackEstimatedTime = true;
Database db(name, settings);
const auto count = 30U;
REQUIRE(Clear(db).type == Error::Type::None);
REQUIRE(Open(db, key).type == Error::Type::None);
for (auto i = 0U; i != count; ++i) {
auto value = TestValue1;
value[0] = char('A') + i;
const auto result = Put(db, Key{ i, i * 2 }, value);
REQUIRE(result.type == Error::Type::None);
}
Close(db);
REQUIRE(Open(db, key).type == Error::Type::None);
for (auto i = 0U; i != count; ++i) {
auto value = TestValue1;
value[0] = char('A') + i;
REQUIRE((Get(db, Key{ i, i * 2 }) == value));
}
Close(db);
}
} }
TEST_CASE("cache db remove", "[storage_cache_database]") { TEST_CASE("cache db remove", "[storage_cache_database]") {
@ -179,7 +415,7 @@ TEST_CASE("cache db remove", "[storage_cache_database]") {
REQUIRE(Open(db, key).type == Error::Type::None); REQUIRE(Open(db, key).type == Error::Type::None);
REQUIRE(Put(db, Key{ 0, 1 }, TestValue1).type == Error::Type::None); REQUIRE(Put(db, Key{ 0, 1 }, TestValue1).type == Error::Type::None);
REQUIRE(Put(db, Key{ 1, 0 }, TestValue2).type == Error::Type::None); REQUIRE(Put(db, Key{ 1, 0 }, TestValue2).type == Error::Type::None);
db.remove(Key{ 0, 1 }, nullptr); Remove(db, Key{ 0, 1 });
REQUIRE(Get(db, Key{ 0, 1 }).isEmpty()); REQUIRE(Get(db, Key{ 0, 1 }).isEmpty());
REQUIRE((Get(db, Key{ 1, 0 }) == TestValue2)); REQUIRE((Get(db, Key{ 1, 0 }) == TestValue2));
Close(db); Close(db);
@ -256,6 +492,9 @@ TEST_CASE("cache db bundled actions", "[storage_cache_database]") {
} }
TEST_CASE("cache db limits", "[storage_cache_database]") { TEST_CASE("cache db limits", "[storage_cache_database]") {
if (DisableLimitsTests) {
return;
}
SECTION("db both limit") { SECTION("db both limit") {
auto settings = Settings; auto settings = Settings;
settings.trackEstimatedTime = true; settings.trackEstimatedTime = true;

View File

@ -316,11 +316,12 @@ bool File::Move(const QString &from, const QString &to) {
} }
QFile destination(to); QFile destination(to);
if (destination.exists()) { if (destination.exists()) {
FileLock locker; {
if (!locker.lock(destination, QIODevice::WriteOnly)) { FileLock locker;
return false; if (!locker.lock(destination, QIODevice::WriteOnly)) {
return false;
}
} }
locker.unlock();
destination.close(); destination.close();
if (!destination.remove()) { if (!destination.remove()) {
return false; return false;

View File

@ -160,6 +160,10 @@ TEST_CASE("simple encrypted file", "[storage_encrypted_file]") {
REQUIRE(read == data.size()); REQUIRE(read == data.size());
REQUIRE(data == bytes::concatenate(Test1, Test1)); REQUIRE(data == bytes::concatenate(Test1, Test1));
} }
SECTION("moving file") {
const auto result = Storage::File::Move(Name, "other.file");
REQUIRE(result);
}
} }
TEST_CASE("two process encrypted file", "[storage_encrypted_file]") { TEST_CASE("two process encrypted file", "[storage_encrypted_file]") {