Implement Storage::Cache::Compactor for database.

This commit is contained in:
John Preston 2018-08-21 21:53:11 +03:00
parent 2f9d65b4eb
commit d426f7242a
10 changed files with 595 additions and 75 deletions

View File

@ -11,14 +11,21 @@ namespace Storage {
namespace Cache { namespace Cache {
namespace details { namespace details {
BinlogWrapper::BinlogWrapper(File &binlog, const Settings &settings) BinlogWrapper::BinlogWrapper(
File &binlog,
const Settings &settings,
int64 till)
: _binlog(binlog) : _binlog(binlog)
, _settings(settings) , _settings(settings)
, _till(_binlog.size()) , _till(till ? till : _binlog.size())
, _data(_settings.readBlockSize) , _data(_settings.readBlockSize)
, _full(_data) { , _full(_data) {
} }
bool BinlogWrapper::finished() const {
return _finished;
}
bool BinlogWrapper::failed() const { bool BinlogWrapper::failed() const {
return _failed; return _failed;
} }

View File

@ -21,8 +21,9 @@ class BinlogReader;
class BinlogWrapper { class BinlogWrapper {
public: public:
BinlogWrapper(File &binlog, const Settings &settings); BinlogWrapper(File &binlog, const Settings &settings, int64 till = 0);
bool finished() const;
bool failed() const; bool failed() const;
private: private:

View File

@ -7,47 +7,396 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/ */
#include "storage/cache/storage_cache_compactor.h" #include "storage/cache/storage_cache_compactor.h"
#include "storage/cache/storage_cache_database_object.h"
#include "storage/cache/storage_cache_binlog_reader.h"
#include <unordered_set>
namespace Storage { namespace Storage {
namespace Cache { namespace Cache {
namespace details { namespace details {
class CompactorObject { class CompactorObject {
public: public:
using Info = Compactor::Info;
CompactorObject( CompactorObject(
crl::weak_on_queue<CompactorObject> weak, crl::weak_on_queue<CompactorObject> weak,
const QString &path, crl::weak_on_queue<DatabaseObject> database,
crl::weak_on_queue<DatabaseObject> database); const QString &base,
const Settings &settings,
EncryptionKey &&key,
const Info &info);
private: private:
using Entry = DatabaseObject::Entry;
using Raw = DatabaseObject::Raw;
using RawSpan = gsl::span<const Raw>;
static QString CompactFilename();
void start(); void start();
QString binlogPath() const;
QString compactPath() const;
bool openBinlog();
bool openCompact();
void parseChunk();
void fail();
void done(int64 till);
void finish();
void finalize();
std::vector<Key> readChunk();
bool readBlock(std::vector<Key> &result);
void processValues(const std::vector<Raw> &values);
template <typename MultiRecord>
void initList();
RawSpan fillList(RawSpan values);
template <typename RecordStore>
RawSpan fillList(std::vector<RecordStore> &list, RawSpan values);
template <typename RecordStore>
void addListRecord(
std::vector<RecordStore> &list,
const Raw &raw);
bool writeList();
template <typename MultiRecord>
bool writeMultiStore();
crl::weak_on_queue<CompactorObject> _weak; crl::weak_on_queue<CompactorObject> _weak;
crl::weak_on_queue<DatabaseObject> _database; crl::weak_on_queue<DatabaseObject> _database;
QString _path; QString _base;
Settings _settings;
EncryptionKey _key;
Info _info;
File _binlog;
File _compact;
BinlogWrapper _wrapper;
size_type _partSize = 0;
std::unordered_set<Key> _written;
base::variant<
std::vector<MultiStore::Part>,
std::vector<MultiStoreWithTime::Part>> _list;
}; };
CompactorObject::CompactorObject( CompactorObject::CompactorObject(
crl::weak_on_queue<CompactorObject> weak, crl::weak_on_queue<CompactorObject> weak,
const QString &path, crl::weak_on_queue<DatabaseObject> database,
crl::weak_on_queue<DatabaseObject> database) const QString &base,
const Settings &settings,
EncryptionKey &&key,
const Info &info)
: _weak(std::move(weak)) : _weak(std::move(weak))
, _database(std::move(database)) , _database(std::move(database))
, _path(path) { , _base(base)
, _settings(settings)
, _key(std::move(key))
, _info(info)
, _wrapper(_binlog, _settings, _info.till)
, _partSize(_settings.maxBundledRecords) { // Perhaps a better estimate?
Expects(_settings.compactChunkSize > 0);
_written.reserve(_info.keysCount);
start(); start();
} }
template <typename MultiRecord>
void CompactorObject::initList() {
using Part = typename MultiRecord::Part;
auto list = std::vector<Part>();
list.reserve(_partSize);
_list = std::move(list);
}
void CompactorObject::start() { void CompactorObject::start() {
if (!openBinlog() || !openCompact()) {
fail();
}
if (_settings.trackEstimatedTime) {
initList<MultiStoreWithTime>();
} else {
initList<MultiStore>();
}
parseChunk();
}
QString CompactorObject::CompactFilename() {
return QStringLiteral("binlog-temp");
}
QString CompactorObject::binlogPath() const {
return _base + DatabaseObject::BinlogFilename();
}
QString CompactorObject::compactPath() const {
return _base + CompactFilename();
}
bool CompactorObject::openBinlog() {
const auto path = binlogPath();
const auto result = _binlog.open(path, File::Mode::Read, _key);
return (result == File::Result::Success)
&& (_binlog.size() >= _info.till);
}
bool CompactorObject::openCompact() {
const auto path = compactPath();
const auto result = _compact.open(path, File::Mode::Write, _key);
return (result == File::Result::Success);
}
void CompactorObject::fail() {
_compact.close();
QFile(compactPath()).remove();
_database.with([](DatabaseObject &database) {
database.compactorFail();
});
}
void CompactorObject::done(int64 till) {
const auto path = compactPath();
_database.with([=](DatabaseObject &database) {
database.compactorDone(path, till);
});
}
void CompactorObject::finish() {
if (writeList()) {
finalize();
} else {
fail();
}
}
void CompactorObject::finalize() {
_compact.close();
auto lastCatchUp = 0;
auto from = _info.till;
while (true) {
const auto till = CatchUp(
compactPath(),
binlogPath(),
_key,
from,
_settings.readBlockSize);
if (!till) {
fail();
return;
} else if (till == from
|| (lastCatchUp > 0 && (till - from) >= lastCatchUp)) {
done(till);
return;
}
lastCatchUp = (till - from);
from = till;
}
}
bool CompactorObject::writeList() {
if (_list.is<std::vector<MultiStore::Part>>()) {
return writeMultiStore<MultiStore>();
} else if (_list.is<std::vector<MultiStoreWithTime::Part>>()) {
return writeMultiStore<MultiStoreWithTime>();
} else {
Unexpected("List type in CompactorObject::writeList.");
}
}
template <typename MultiRecord>
bool CompactorObject::writeMultiStore() {
using Part = typename MultiRecord::Part;
Assert(_list.is<std::vector<Part>>());
auto &list = _list.get_unchecked<std::vector<Part>>();
if (list.empty()) {
return true;
}
const auto guard = gsl::finally([&] { list.clear(); });
const auto size = list.size();
auto header = MultiRecord(size);
if (_compact.write(bytes::object_as_span(&header))
&& _compact.write(bytes::make_span(list))) {
_compact.flush();
return true;
}
return false;
}
std::vector<Key> CompactorObject::readChunk() {
const auto limit = _settings.compactChunkSize;
auto result = std::vector<Key>();
while (result.size() < limit) {
if (!readBlock(result)) {
break;
}
}
return result;
}
bool CompactorObject::readBlock(std::vector<Key> &result) {
const auto push = [&](const Store &store) {
result.push_back(store.key);
return true;
};
const auto pushMulti = [&](const auto &element) {
while (const auto record = element()) {
push(*record);
}
return true;
};
if (_settings.trackEstimatedTime) {
BinlogReader<
StoreWithTime,
MultiStoreWithTime,
MultiRemove,
MultiAccess> reader(_wrapper);
return !reader.readTillEnd([&](const StoreWithTime &record) {
return push(record);
}, [&](const MultiStoreWithTime &header, const auto &element) {
return pushMulti(element);
}, [&](const MultiRemove &header, const auto &element) {
return true;
}, [&](const MultiAccess &header, const auto &element) {
return true;
});
} else {
BinlogReader<
Store,
MultiStore,
MultiRemove> reader(_wrapper);
return !reader.readTillEnd([&](const Store &record) {
return push(record);
}, [&](const MultiStore &header, const auto &element) {
return pushMulti(element);
}, [&](const MultiRemove &header, const auto &element) {
return true;
});
}
}
void CompactorObject::parseChunk() {
auto keys = readChunk();
if (_wrapper.failed()) {
fail();
return;
} else if (keys.empty()) {
finish();
return;
}
_database.with([
weak = _weak,
keys = std::move(keys)
](DatabaseObject &database) {
auto result = database.getManyRaw(keys);
weak.with([result = std::move(result)](CompactorObject &that) {
that.processValues(result);
});
});
}
void CompactorObject::processValues(
const std::vector<std::pair<Key, Entry>> &values) {
auto left = gsl::make_span(values);
while (true) {
left = fillList(left);
if (left.empty()) {
break;
} else if (!writeList()) {
fail();
return;
}
}
parseChunk();
}
auto CompactorObject::fillList(RawSpan values) -> RawSpan {
return _list.match([&](auto &list) {
return fillList(list, values);
});
}
template <typename RecordStore>
auto CompactorObject::fillList(
std::vector<RecordStore> &list,
RawSpan values
) -> RawSpan {
const auto b = std::begin(values);
const auto e = std::end(values);
auto i = b;
while (i != e && list.size() != _partSize) {
addListRecord(list, *i++);
}
return values.subspan(i - b);
}
template <typename RecordStore>
void CompactorObject::addListRecord(
std::vector<RecordStore> &list,
const Raw &raw) {
if (!_written.emplace(raw.first).second) {
return;
}
auto record = RecordStore();
record.key = raw.first;
record.size = ReadTo<EntrySize>(raw.second.size);
record.checksum = raw.second.checksum;
record.tag = raw.second.tag;
record.place = raw.second.place;
if constexpr (std::is_same_v<RecordStore, StoreWithTime>) {
record.time.setRelative(raw.second.useTime);
record.time.system = _info.systemTime;
}
list.push_back(record);
} }
Compactor::Compactor( Compactor::Compactor(
const QString &path, crl::weak_on_queue<DatabaseObject> database,
crl::weak_on_queue<DatabaseObject> database) const QString &base,
: _wrapped(path, std::move(database)) { const Settings &settings,
EncryptionKey &&key,
const Info &info)
: _wrapped(std::move(database), base, settings, std::move(key), info) {
} }
Compactor::~Compactor() = default; Compactor::~Compactor() = default;
int64 CatchUp(
const QString &compactPath,
const QString &binlogPath,
const EncryptionKey &key,
int64 from,
size_type block) {
File binlog, compact;
const auto result1 = binlog.open(binlogPath, File::Mode::Read, key);
if (result1 != File::Result::Success) {
return 0;
}
const auto till = binlog.size();
if (till < from || !binlog.seek(from)) {
return 0;
}
const auto result2 = compact.open(
compactPath,
File::Mode::ReadAppend,
key);
if (result2 != File::Result::Success || !compact.seek(compact.size())) {
return 0;
}
auto buffer = bytes::vector(block);
auto bytes = bytes::make_span(buffer);
do {
const auto left = (till - from);
const auto limit = std::min(size_type(left), block);
const auto read = binlog.read(bytes.subspan(0, limit));
if (!read || read > limit) {
return 0;
} else if (!compact.write(bytes.subspan(0, read))) {
return 0;
}
from += read;
} while (from != till);
return till;
}
} // namespace details } // namespace details
} // namespace Cache } // namespace Cache
} // namespace Storage } // namespace Storage

View File

@ -11,6 +11,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include <crl/crl_object_on_queue.h> #include <crl/crl_object_on_queue.h>
namespace Storage { namespace Storage {
class EncryptionKey;
namespace Cache { namespace Cache {
namespace details { namespace details {
@ -19,9 +20,18 @@ class DatabaseObject;
class Compactor { class Compactor {
public: public:
struct Info {
int64 till = 0;
uint32 systemTime = 0;
size_type keysCount = 0;
};
Compactor( Compactor(
const QString &path, crl::weak_on_queue<DatabaseObject> database,
crl::weak_on_queue<DatabaseObject> database); const QString &base,
const Settings &settings,
EncryptionKey &&key,
const Info &info);
~Compactor(); ~Compactor();
@ -31,6 +41,13 @@ private:
}; };
int64 CatchUp(
const QString &compactPath,
const QString &binlogPath,
const EncryptionKey &key,
int64 from,
size_type block);
} // namespace details } // namespace details
} // namespace Cache } // namespace Cache
} // namespace Storage } // namespace Storage

View File

@ -25,6 +25,8 @@ namespace Cache {
namespace details { namespace details {
namespace { namespace {
constexpr auto kMaxDelayAfterFailure = 24 * 60 * 60 * crl::time_type(1000);
uint32 CountChecksum(bytes::const_span data) { uint32 CountChecksum(bytes::const_span data) {
const auto seed = uint32(0); const auto seed = uint32(0);
return XXH32(data.data(), data.size(), seed); return XXH32(data.data(), data.size(), seed);
@ -63,7 +65,7 @@ DatabaseObject::Entry::Entry(
uint8 tag, uint8 tag,
uint32 checksum, uint32 checksum,
size_type size, size_type size,
int64 useTime) uint64 useTime)
: useTime(useTime) : useTime(useTime)
, size(size) , size(size)
, checksum(checksum) , checksum(checksum)
@ -80,8 +82,10 @@ DatabaseObject::DatabaseObject(
, _settings(settings) , _settings(settings)
, _writeBundlesTimer(_weak, [=] { writeBundles(); checkCompactor(); }) , _writeBundlesTimer(_weak, [=] { writeBundles(); checkCompactor(); })
, _pruneTimer(_weak, [=] { prune(); }) { , _pruneTimer(_weak, [=] { prune(); }) {
Expects(_settings.maxDataSize < kDataSizeLimit); Expects(_settings.maxDataSize > 0
Expects(_settings.maxBundledRecords < kBundledRecordsLimit); && _settings.maxDataSize < kDataSizeLimit);
Expects(_settings.maxBundledRecords > 0
&& _settings.maxBundledRecords < kBundledRecordsLimit);
Expects(!_settings.totalTimeLimit Expects(!_settings.totalTimeLimit
|| _settings.totalTimeLimit > 0); || _settings.totalTimeLimit > 0);
Expects(!_settings.totalSizeLimit Expects(!_settings.totalSizeLimit
@ -89,7 +93,9 @@ DatabaseObject::DatabaseObject(
} }
template <typename Callback, typename ...Args> template <typename Callback, typename ...Args>
void DatabaseObject::invokeCallback(Callback &&callback, Args &&...args) { void DatabaseObject::invokeCallback(
Callback &&callback,
Args &&...args) const {
if (callback) { if (callback) {
callback(std::move(args)...); callback(std::move(args)...);
} }
@ -137,23 +143,39 @@ QString DatabaseObject::computePath(Version version) const {
return _base + QString::number(version) + '/'; return _base + QString::number(version) + '/';
} }
QString DatabaseObject::binlogFilename() const { QString DatabaseObject::BinlogFilename() {
return QStringLiteral("binlog"); return QStringLiteral("binlog");
} }
QString DatabaseObject::CompactReadyFilename() {
return QStringLiteral("binlog-ready");
}
QString DatabaseObject::binlogPath(Version version) const { QString DatabaseObject::binlogPath(Version version) const {
return computePath(version) + binlogFilename(); return computePath(version) + BinlogFilename();
} }
QString DatabaseObject::binlogPath() const { QString DatabaseObject::binlogPath() const {
return _path + binlogFilename(); return _path + BinlogFilename();
}
QString DatabaseObject::compactReadyPath(Version version) const {
return computePath(version) + CompactReadyFilename();
}
QString DatabaseObject::compactReadyPath() const {
return _path + CompactReadyFilename();
} }
File::Result DatabaseObject::openBinlog( File::Result DatabaseObject::openBinlog(
Version version, Version version,
File::Mode mode, File::Mode mode,
EncryptionKey &key) { EncryptionKey &key) {
const auto ready = compactReadyPath(version);
const auto path = binlogPath(version); const auto path = binlogPath(version);
if (QFile(ready).exists() && !File::Move(ready, path)) {
return File::Result::Failed;
}
const auto result = _binlog.open(path, mode, key); const auto result = _binlog.open(path, mode, key);
if (result == File::Result::Success) { if (result == File::Result::Success) {
const auto headerRequired = (mode == File::Mode::Read) const auto headerRequired = (mode == File::Mode::Read)
@ -180,14 +202,14 @@ bool DatabaseObject::readHeader() {
!= !!(header.flags & header.kTrackEstimatedTime)) { != !!(header.flags & header.kTrackEstimatedTime)) {
return false; return false;
} }
_relativeTime = _latestSystemTime = header.systemTime; _time.setRelative((_time.system = header.systemTime));
return true; return true;
} }
bool DatabaseObject::writeHeader() { bool DatabaseObject::writeHeader() {
auto header = BasicHeader(); auto header = BasicHeader();
const auto now = _settings.trackEstimatedTime ? GetUnixtime() : 0; const auto now = _settings.trackEstimatedTime ? GetUnixtime() : 0;
_relativeTime = _latestSystemTime = header.systemTime = now; _time.setRelative((_time.system = header.systemTime = now));
if (_settings.trackEstimatedTime) { if (_settings.trackEstimatedTime) {
header.flags |= header.kTrackEstimatedTime; header.flags |= header.kTrackEstimatedTime;
} }
@ -243,16 +265,17 @@ void DatabaseObject::readBinlog() {
optimize(); optimize();
} }
int64 DatabaseObject::countRelativeTime() const { uint64 DatabaseObject::countRelativeTime() const {
const auto now = GetUnixtime(); const auto now = GetUnixtime();
const auto delta = std::max(int64(now) - int64(_latestSystemTime), 0LL); const auto delta = std::max(int64(now) - int64(_time.system), 0LL);
return _relativeTime + delta; return _time.getRelative() + delta;
} }
int64 DatabaseObject::pruneBeforeTime() const { uint64 DatabaseObject::pruneBeforeTime() const {
return _settings.totalTimeLimit const auto relative = countRelativeTime();
? (countRelativeTime() - _settings.totalTimeLimit) return (_settings.totalTimeLimit && relative > _settings.totalTimeLimit)
: 0LL; ? (relative - _settings.totalTimeLimit)
: 0ULL;
} }
void DatabaseObject::optimize() { void DatabaseObject::optimize() {
@ -265,12 +288,13 @@ bool DatabaseObject::startDelayedPruning() {
if (!_settings.trackEstimatedTime || _map.empty()) { if (!_settings.trackEstimatedTime || _map.empty()) {
return false; return false;
} }
const auto before = pruneBeforeTime();
const auto pruning = [&] { const auto pruning = [&] {
if (_settings.totalSizeLimit > 0 if (_settings.totalSizeLimit > 0
&& _totalSize > _settings.totalSizeLimit) { && _totalSize > _settings.totalSizeLimit) {
return true; return true;
} else if (_minimalEntryTime != 0 } else if (_minimalEntryTime != 0
&& _minimalEntryTime <= pruneBeforeTime()) { && _minimalEntryTime <= before) {
return true; return true;
} }
return false; return false;
@ -282,8 +306,8 @@ bool DatabaseObject::startDelayedPruning() {
} }
return true; return true;
} else if (_minimalEntryTime != 0) { } else if (_minimalEntryTime != 0) {
const auto before = pruneBeforeTime(); Assert(_minimalEntryTime > before);
const auto seconds = (_minimalEntryTime - before); const auto seconds = int64(_minimalEntryTime - before);
if (!_pruneTimer.isActive()) { if (!_pruneTimer.isActive()) {
_pruneTimer.callOnce(std::min( _pruneTimer.callOnce(std::min(
seconds * crl::time_type(1000), seconds * crl::time_type(1000),
@ -384,7 +408,7 @@ void DatabaseObject::adjustRelativeTime() {
return; return;
} }
const auto now = GetUnixtime(); const auto now = GetUnixtime();
if (now < _latestSystemTime) { if (now < _time.system) {
writeMultiAccessBlock(); writeMultiAccessBlock();
} }
} }
@ -402,7 +426,7 @@ bool DatabaseObject::processRecordStoreGeneric(
record->tag, record->tag,
record->checksum, record->checksum,
size, size,
_relativeTime); _time.getRelative());
if (!postprocess(entry, record)) { if (!postprocess(entry, record)) {
return false; return false;
} }
@ -424,7 +448,7 @@ bool DatabaseObject::processRecordStore(
Entry &entry, Entry &entry,
not_null<const StoreWithTime*> record) { not_null<const StoreWithTime*> record) {
applyTimePoint(record->time); applyTimePoint(record->time);
entry.useTime = _relativeTime; entry.useTime = record->time.getRelative();
return true; return true;
}; };
return processRecordStoreGeneric(record, postprocess); return processRecordStoreGeneric(record, postprocess);
@ -464,16 +488,14 @@ bool DatabaseObject::processRecordMultiAccess(
const GetElement &element) { const GetElement &element) {
Expects(_settings.trackEstimatedTime); Expects(_settings.trackEstimatedTime);
if (header.time.relativeAdvancement > _settings.maxTimeAdvancement) {
return false;
}
applyTimePoint(header.time); applyTimePoint(header.time);
const auto relative = header.time.getRelative();
_binlogExcessLength += sizeof(header); _binlogExcessLength += sizeof(header);
while (const auto entry = element()) { while (const auto entry = element()) {
_binlogExcessLength += sizeof(*entry); _binlogExcessLength += sizeof(*entry);
if (const auto i = _map.find(*entry); i != end(_map)) { if (const auto i = _map.find(*entry); i != end(_map)) {
i->second.useTime = _relativeTime; i->second.useTime = relative;
} }
} }
return true; return true;
@ -516,26 +538,78 @@ void DatabaseObject::eraseMapEntry(const Map::const_iterator &i) {
} }
EstimatedTimePoint DatabaseObject::countTimePoint() const { EstimatedTimePoint DatabaseObject::countTimePoint() const {
const auto now = std::max(GetUnixtime(), 1); const auto now = GetUnixtime();
const auto delta = std::max(int64(now) - int64(_latestSystemTime), 0LL); const auto delta = std::max(int64(now) - int64(_time.system), 0LL);
auto result = EstimatedTimePoint(); auto result = EstimatedTimePoint();
result.system = now; result.system = now;
result.relativeAdvancement = std::min( result.setRelative(_time.getRelative() + delta);
delta,
int64(_settings.maxTimeAdvancement));
return result; return result;
} }
void DatabaseObject::applyTimePoint(EstimatedTimePoint time) { void DatabaseObject::applyTimePoint(EstimatedTimePoint time) {
_relativeTime += time.relativeAdvancement; const auto possible = time.getRelative();
_latestSystemTime = time.system; const auto current = _time.getRelative();
if (possible > current) {
_time = time;
}
}
void DatabaseObject::compactorDone(
const QString &path,
int64 originalReadTill) {
const auto size = _binlog.size();
const auto binlog = binlogPath();
const auto ready = compactReadyPath();
if (originalReadTill != size) {
originalReadTill = CatchUp(
path,
binlog,
_key,
originalReadTill,
_settings.readBlockSize);
if (originalReadTill != size) {
compactorFail();
return;
}
} else if (!File::Move(path, ready)) {
compactorFail();
return;
}
const auto guard = gsl::finally([&] {
_compactor = CompactorWrap();
});
_binlog.close();
if (!File::Move(ready, binlog)) {
// megafail
compactorFail();
return;
}
const auto result = _binlog.open(path, File::Mode::ReadAppend, _key);
if (result != File::Result::Success || !_binlog.seek(_binlog.size())) {
// megafail
compactorFail();
return;
}
_binlogExcessLength -= _compactor.excessLength;
Assert(_binlogExcessLength >= 0);
}
void DatabaseObject::compactorFail() {
const auto delay = _compactor.delayAfterFailure;
_compactor = CompactorWrap();
_compactor.nextAttempt = crl::time() + delay;
_compactor.delayAfterFailure = std::min(
delay * 2,
kMaxDelayAfterFailure);
QFile(compactReadyPath()).remove();
} }
void DatabaseObject::close(FnMut<void()> done) { void DatabaseObject::close(FnMut<void()> done) {
writeBundles(); writeBundles();
_cleaner = CleanerWrap(); _cleaner = CleanerWrap();
_compactor = nullptr; _compactor = CompactorWrap();
_binlog.close(); _binlog.close();
_key = EncryptionKey();
invokeCallback(done); invokeCallback(done);
_map.clear(); _map.clear();
_binlogExcessLength = 0; _binlogExcessLength = 0;
@ -643,12 +717,14 @@ base::optional<QString> DatabaseObject::writeKeyPlace(
} }
auto record = StoreWithTime(); auto record = StoreWithTime();
record.time = countTimePoint(); record.time = countTimePoint();
if (record.time.relativeAdvancement * crl::time_type(1000) const auto writing = record.time.getRelative();
const auto current = _time.getRelative();
Assert(writing >= current);
if ((writing - current) * crl::time_type(1000)
< _settings.writeBundleDelay) { < _settings.writeBundleDelay) {
// We don't want to produce a lot of unique relativeTime values. // We don't want to produce a lot of unique _time.relative values.
// So if change in it is not large we stick to the old value. // So if change in it is not large we stick to the old value.
record.time.system = _latestSystemTime; record.time = _time;
record.time.relativeAdvancement = 0;
} }
return writeKeyPlaceGeneric(std::move(record), key, data, checksum); return writeKeyPlaceGeneric(std::move(record), key, data, checksum);
} }
@ -782,10 +858,10 @@ void DatabaseObject::writeMultiAccessBlock() {
list.push_back(key); list.push_back(key);
} }
} }
applyTimePoint(time); _time = time;
for (const auto &entry : list) { for (const auto &entry : list) {
if (const auto i = _map.find(entry); i != end(_map)) { if (const auto i = _map.find(entry); i != end(_map)) {
i->second.useTime = _relativeTime; i->second.useTime = _time.getRelative();
} }
} }
@ -826,7 +902,7 @@ void DatabaseObject::cleanerDone(Error error) {
} }
void DatabaseObject::checkCompactor() { void DatabaseObject::checkCompactor() {
if (_compactor if (_compactor.object
|| !_settings.compactAfterExcess || !_settings.compactAfterExcess
|| _binlogExcessLength < _settings.compactAfterExcess) { || _binlogExcessLength < _settings.compactAfterExcess) {
return; return;
@ -835,8 +911,20 @@ void DatabaseObject::checkCompactor() {
< _settings.compactAfterExcess * _binlog.size()) { < _settings.compactAfterExcess * _binlog.size()) {
return; return;
} }
} else if (crl::time() < _compactor.nextAttempt) {
return;
} }
_compactor = std::make_unique<Compactor>(_path, _weak); auto info = Compactor::Info();
info.till = _binlog.size();
info.systemTime = _time.system;
info.keysCount = _map.size();
_compactor.object = std::make_unique<Compactor>(
_weak,
_path,
_settings,
base::duplicate(_key),
info);
_compactor.excessLength = _binlogExcessLength;
} }
void DatabaseObject::clear(FnMut<void(Error)> done) { void DatabaseObject::clear(FnMut<void(Error)> done) {
@ -848,6 +936,18 @@ void DatabaseObject::clear(FnMut<void(Error)> done) {
writeVersion(version) ? Error::NoError() : ioError(versionPath())); writeVersion(version) ? Error::NoError() : ioError(versionPath()));
} }
auto DatabaseObject::getManyRaw(const std::vector<Key> keys) const
-> std::vector<Raw> {
auto result = std::vector<Raw>();
result.reserve(keys.size());
for (const auto &key : keys) {
if (const auto i = _map.find(key); i != end(_map)) {
result.push_back(*i);
}
}
return result;
}
DatabaseObject::~DatabaseObject() { DatabaseObject::~DatabaseObject() {
close(nullptr); close(nullptr);
} }

View File

@ -38,9 +38,12 @@ public:
void clear(FnMut<void(Error)> done); void clear(FnMut<void(Error)> done);
~DatabaseObject(); static QString BinlogFilename();
static QString CompactReadyFilename();
void compactorDone(const QString &path, int64 originalReadTill);
void compactorFail();
private:
struct Entry { struct Entry {
Entry() = default; Entry() = default;
Entry( Entry(
@ -48,29 +51,42 @@ private:
uint8 tag, uint8 tag,
uint32 checksum, uint32 checksum,
size_type size, size_type size,
int64 useTime); uint64 useTime);
int64 useTime = 0; uint64 useTime = 0;
size_type size = 0; size_type size = 0;
uint32 checksum = 0; uint32 checksum = 0;
PlaceId place = { { 0 } }; PlaceId place = { { 0 } };
uint8 tag = 0; uint8 tag = 0;
}; };
using Raw = std::pair<Key, Entry>;
std::vector<Raw> getManyRaw(const std::vector<Key> keys) const;
~DatabaseObject();
private:
struct CleanerWrap { struct CleanerWrap {
std::unique_ptr<Cleaner> object; std::unique_ptr<Cleaner> object;
base::binary_guard guard; base::binary_guard guard;
}; };
struct CompactorWrap {
std::unique_ptr<Compactor> object;
int64 excessLength = 0;
crl::time_type nextAttempt = 0;
crl::time_type delayAfterFailure = 10 * crl::time_type(1000);
};
using Map = std::unordered_map<Key, Entry>; using Map = std::unordered_map<Key, Entry>;
template <typename Callback, typename ...Args> template <typename Callback, typename ...Args>
void invokeCallback(Callback &&callback, Args &&...args); void invokeCallback(Callback &&callback, Args &&...args) const;
Error ioError(const QString &path) const; Error ioError(const QString &path) const;
QString computePath(Version version) const; QString computePath(Version version) const;
QString binlogPath(Version version) const; QString binlogPath(Version version) const;
QString binlogPath() const; QString binlogPath() const;
QString binlogFilename() const; QString compactReadyPath(Version version) const;
QString compactReadyPath() const;
File::Result openBinlog( File::Result openBinlog(
Version version, Version version,
File::Mode mode, File::Mode mode,
@ -106,10 +122,10 @@ private:
void checkCompactor(); void checkCompactor();
void adjustRelativeTime(); void adjustRelativeTime();
bool startDelayedPruning(); bool startDelayedPruning();
int64 countRelativeTime() const; uint64 countRelativeTime() const;
EstimatedTimePoint countTimePoint() const; EstimatedTimePoint countTimePoint() const;
void applyTimePoint(EstimatedTimePoint time); void applyTimePoint(EstimatedTimePoint time);
int64 pruneBeforeTime() const; uint64 pruneBeforeTime() const;
void prune(); void prune();
void collectTimePrune( void collectTimePrune(
base::flat_set<Key> &stale, base::flat_set<Key> &stale,
@ -161,20 +177,18 @@ private:
std::set<Key> _removing; std::set<Key> _removing;
std::set<Key> _accessed; std::set<Key> _accessed;
int64 _relativeTime = 0; EstimatedTimePoint _time;
int64 _timeCorrection = 0;
uint32 _latestSystemTime = 0;
int64 _binlogExcessLength = 0; int64 _binlogExcessLength = 0;
int64 _totalSize = 0; int64 _totalSize = 0;
int64 _minimalEntryTime = 0; uint64 _minimalEntryTime = 0;
size_type _entriesWithMinimalTimeCount = 0; size_type _entriesWithMinimalTimeCount = 0;
base::ConcurrentTimer _writeBundlesTimer; base::ConcurrentTimer _writeBundlesTimer;
base::ConcurrentTimer _pruneTimer; base::ConcurrentTimer _pruneTimer;
CleanerWrap _cleaner; CleanerWrap _cleaner;
std::unique_ptr<Compactor> _compactor; CompactorWrap _compactor;
}; };

View File

@ -40,11 +40,11 @@ struct Settings {
int64 compactAfterExcess = 8 * 1024 * 1024; int64 compactAfterExcess = 8 * 1024 * 1024;
int64 compactAfterFullSize = 0; int64 compactAfterFullSize = 0;
size_type compactChunkSize = 16 * 1024;
bool trackEstimatedTime = true; bool trackEstimatedTime = true;
int64 totalSizeLimit = 1024 * 1024 * 1024; int64 totalSizeLimit = 1024 * 1024 * 1024;
size_type totalTimeLimit = 30 * 86400; // One month in seconds. size_type totalTimeLimit = 30 * 86400; // One month in seconds.
size_type maxTimeAdvancement = 365 * 86400; // One year in seconds.
crl::time_type pruneTimeout = 5 * crl::time_type(1000); crl::time_type pruneTimeout = 5 * crl::time_type(1000);
crl::time_type maxPruneCheckTimeout = 3600 * crl::time_type(1000); crl::time_type maxPruneCheckTimeout = 3600 * crl::time_type(1000);
}; };
@ -131,8 +131,17 @@ struct BasicHeader {
}; };
struct EstimatedTimePoint { struct EstimatedTimePoint {
uint32 relative1 = 0;
uint32 relative2 = 0;
uint32 system = 0; uint32 system = 0;
uint32 relativeAdvancement = 0;
void setRelative(uint64 value) {
relative1 = uint32(value & 0xFFFFFFFFU);
relative2 = uint32((value >> 32) & 0xFFFFFFFFU);
}
uint64 getRelative() const {
return uint64(relative1) | (uint64(relative2) << 32);
}
}; };
struct Store { struct Store {
@ -145,12 +154,10 @@ struct Store {
uint32 checksum = 0; uint32 checksum = 0;
Key key; Key key;
}; };
static_assert(GoodForEncryption<Store>);
struct StoreWithTime : Store { struct StoreWithTime : Store {
EstimatedTimePoint time; EstimatedTimePoint time;
uint32 reserved1 = 0; uint32 reserved = 0;
uint32 reserved2 = 0;
}; };
struct MultiStore { struct MultiStore {
@ -168,6 +175,8 @@ struct MultiStore {
size_type validateCount() const; size_type validateCount() const;
}; };
struct MultiStoreWithTime : MultiStore { struct MultiStoreWithTime : MultiStore {
using MultiStore::MultiStore;
using Part = StoreWithTime; using Part = StoreWithTime;
}; };
@ -196,7 +205,6 @@ struct MultiAccess {
RecordType type = kType; RecordType type = kType;
RecordsCount count = { { 0 } }; RecordsCount count = { { 0 } };
EstimatedTimePoint time; EstimatedTimePoint time;
uint32 reserved = 0;
using Part = Key; using Part = Key;
size_type validateCount() const; size_type validateCount() const;

View File

@ -309,4 +309,25 @@ bool File::seek(int64 offset) {
return true; return true;
} }
bool File::Move(const QString &from, const QString &to) {
QFile source(from);
if (!source.exists()) {
return false;
}
QFile destination(to);
if (destination.exists()) {
FileLock locker;
if (!locker.lock(destination, QIODevice::WriteOnly)) {
return false;
}
locker.unlock();
destination.close();
if (!destination.remove()) {
return false;
}
}
return source.rename(to);
}
} // namespace Storage } // namespace Storage

View File

@ -43,6 +43,8 @@ public:
void close(); void close();
static bool Move(const QString &from, const QString &to);
private: private:
Result attemptOpen(Mode mode, const EncryptionKey &key); Result attemptOpen(Mode mode, const EncryptionKey &key);
Result attemptOpenForRead(const EncryptionKey &key); Result attemptOpenForRead(const EncryptionKey &key);

View File

@ -17,6 +17,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include <vector> #include <vector>
#include <unordered_map> #include <unordered_map>
#include <unordered_set>
#include <set> #include <set>
#include <range/v3/all.hpp> #include <range/v3/all.hpp>