From d426f7242a523b510fdfe6c9309c23928847c311 Mon Sep 17 00:00:00 2001 From: John Preston Date: Tue, 21 Aug 2018 21:53:11 +0300 Subject: [PATCH] Implement Storage::Cache::Compactor for database. --- .../cache/storage_cache_binlog_reader.cpp | 11 +- .../cache/storage_cache_binlog_reader.h | 3 +- .../storage/cache/storage_cache_compactor.cpp | 367 +++++++++++++++++- .../storage/cache/storage_cache_compactor.h | 21 +- .../cache/storage_cache_database_object.cpp | 184 +++++++-- .../cache/storage_cache_database_object.h | 40 +- .../storage/cache/storage_cache_types.h | 20 +- .../storage/storage_encrypted_file.cpp | 21 + .../storage/storage_encrypted_file.h | 2 + Telegram/SourceFiles/storage/storage_pch.h | 1 + 10 files changed, 595 insertions(+), 75 deletions(-) diff --git a/Telegram/SourceFiles/storage/cache/storage_cache_binlog_reader.cpp b/Telegram/SourceFiles/storage/cache/storage_cache_binlog_reader.cpp index 492d5f940..50756f0f2 100644 --- a/Telegram/SourceFiles/storage/cache/storage_cache_binlog_reader.cpp +++ b/Telegram/SourceFiles/storage/cache/storage_cache_binlog_reader.cpp @@ -11,14 +11,21 @@ namespace Storage { namespace Cache { namespace details { -BinlogWrapper::BinlogWrapper(File &binlog, const Settings &settings) +BinlogWrapper::BinlogWrapper( + File &binlog, + const Settings &settings, + int64 till) : _binlog(binlog) , _settings(settings) -, _till(_binlog.size()) +, _till(till ? till : _binlog.size()) , _data(_settings.readBlockSize) , _full(_data) { } +bool BinlogWrapper::finished() const { + return _finished; +} + bool BinlogWrapper::failed() const { return _failed; } diff --git a/Telegram/SourceFiles/storage/cache/storage_cache_binlog_reader.h b/Telegram/SourceFiles/storage/cache/storage_cache_binlog_reader.h index ef48d19e4..fede34185 100644 --- a/Telegram/SourceFiles/storage/cache/storage_cache_binlog_reader.h +++ b/Telegram/SourceFiles/storage/cache/storage_cache_binlog_reader.h @@ -21,8 +21,9 @@ class BinlogReader; class BinlogWrapper { public: - BinlogWrapper(File &binlog, const Settings &settings); + BinlogWrapper(File &binlog, const Settings &settings, int64 till = 0); + bool finished() const; bool failed() const; private: diff --git a/Telegram/SourceFiles/storage/cache/storage_cache_compactor.cpp b/Telegram/SourceFiles/storage/cache/storage_cache_compactor.cpp index 981ae015b..876692c9b 100644 --- a/Telegram/SourceFiles/storage/cache/storage_cache_compactor.cpp +++ b/Telegram/SourceFiles/storage/cache/storage_cache_compactor.cpp @@ -7,47 +7,396 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL */ #include "storage/cache/storage_cache_compactor.h" +#include "storage/cache/storage_cache_database_object.h" +#include "storage/cache/storage_cache_binlog_reader.h" +#include + namespace Storage { namespace Cache { namespace details { class CompactorObject { public: + using Info = Compactor::Info; + CompactorObject( crl::weak_on_queue weak, - const QString &path, - crl::weak_on_queue database); + crl::weak_on_queue database, + const QString &base, + const Settings &settings, + EncryptionKey &&key, + const Info &info); private: + using Entry = DatabaseObject::Entry; + using Raw = DatabaseObject::Raw; + using RawSpan = gsl::span; + static QString CompactFilename(); + void start(); + QString binlogPath() const; + QString compactPath() const; + bool openBinlog(); + bool openCompact(); + void parseChunk(); + void fail(); + void done(int64 till); + void finish(); + void finalize(); + + std::vector readChunk(); + bool readBlock(std::vector &result); + void processValues(const std::vector &values); + + template + void initList(); + RawSpan fillList(RawSpan values); + template + RawSpan fillList(std::vector &list, RawSpan values); + template + void addListRecord( + std::vector &list, + const Raw &raw); + bool writeList(); + template + bool writeMultiStore(); crl::weak_on_queue _weak; crl::weak_on_queue _database; - QString _path; + QString _base; + Settings _settings; + EncryptionKey _key; + Info _info; + File _binlog; + File _compact; + BinlogWrapper _wrapper; + size_type _partSize = 0; + std::unordered_set _written; + base::variant< + std::vector, + std::vector> _list; }; CompactorObject::CompactorObject( crl::weak_on_queue weak, - const QString &path, - crl::weak_on_queue database) + crl::weak_on_queue database, + const QString &base, + const Settings &settings, + EncryptionKey &&key, + const Info &info) : _weak(std::move(weak)) , _database(std::move(database)) -, _path(path) { +, _base(base) +, _settings(settings) +, _key(std::move(key)) +, _info(info) +, _wrapper(_binlog, _settings, _info.till) +, _partSize(_settings.maxBundledRecords) { // Perhaps a better estimate? + Expects(_settings.compactChunkSize > 0); + + _written.reserve(_info.keysCount); start(); } +template +void CompactorObject::initList() { + using Part = typename MultiRecord::Part; + auto list = std::vector(); + list.reserve(_partSize); + _list = std::move(list); +} + void CompactorObject::start() { + if (!openBinlog() || !openCompact()) { + fail(); + } + if (_settings.trackEstimatedTime) { + initList(); + } else { + initList(); + } + parseChunk(); +} + +QString CompactorObject::CompactFilename() { + return QStringLiteral("binlog-temp"); +} + +QString CompactorObject::binlogPath() const { + return _base + DatabaseObject::BinlogFilename(); +} + +QString CompactorObject::compactPath() const { + return _base + CompactFilename(); +} + +bool CompactorObject::openBinlog() { + const auto path = binlogPath(); + const auto result = _binlog.open(path, File::Mode::Read, _key); + return (result == File::Result::Success) + && (_binlog.size() >= _info.till); +} + +bool CompactorObject::openCompact() { + const auto path = compactPath(); + const auto result = _compact.open(path, File::Mode::Write, _key); + return (result == File::Result::Success); +} + +void CompactorObject::fail() { + _compact.close(); + QFile(compactPath()).remove(); + _database.with([](DatabaseObject &database) { + database.compactorFail(); + }); +} + +void CompactorObject::done(int64 till) { + const auto path = compactPath(); + _database.with([=](DatabaseObject &database) { + database.compactorDone(path, till); + }); +} + +void CompactorObject::finish() { + if (writeList()) { + finalize(); + } else { + fail(); + } +} + +void CompactorObject::finalize() { + _compact.close(); + + auto lastCatchUp = 0; + auto from = _info.till; + while (true) { + const auto till = CatchUp( + compactPath(), + binlogPath(), + _key, + from, + _settings.readBlockSize); + if (!till) { + fail(); + return; + } else if (till == from + || (lastCatchUp > 0 && (till - from) >= lastCatchUp)) { + done(till); + return; + } + lastCatchUp = (till - from); + from = till; + } +} + +bool CompactorObject::writeList() { + if (_list.is>()) { + return writeMultiStore(); + } else if (_list.is>()) { + return writeMultiStore(); + } else { + Unexpected("List type in CompactorObject::writeList."); + } +} + +template +bool CompactorObject::writeMultiStore() { + using Part = typename MultiRecord::Part; + Assert(_list.is>()); + auto &list = _list.get_unchecked>(); + if (list.empty()) { + return true; + } + const auto guard = gsl::finally([&] { list.clear(); }); + const auto size = list.size(); + auto header = MultiRecord(size); + if (_compact.write(bytes::object_as_span(&header)) + && _compact.write(bytes::make_span(list))) { + _compact.flush(); + return true; + } + return false; +} + +std::vector CompactorObject::readChunk() { + const auto limit = _settings.compactChunkSize; + auto result = std::vector(); + while (result.size() < limit) { + if (!readBlock(result)) { + break; + } + } + return result; +} + +bool CompactorObject::readBlock(std::vector &result) { + const auto push = [&](const Store &store) { + result.push_back(store.key); + return true; + }; + const auto pushMulti = [&](const auto &element) { + while (const auto record = element()) { + push(*record); + } + return true; + }; + if (_settings.trackEstimatedTime) { + BinlogReader< + StoreWithTime, + MultiStoreWithTime, + MultiRemove, + MultiAccess> reader(_wrapper); + return !reader.readTillEnd([&](const StoreWithTime &record) { + return push(record); + }, [&](const MultiStoreWithTime &header, const auto &element) { + return pushMulti(element); + }, [&](const MultiRemove &header, const auto &element) { + return true; + }, [&](const MultiAccess &header, const auto &element) { + return true; + }); + } else { + BinlogReader< + Store, + MultiStore, + MultiRemove> reader(_wrapper); + return !reader.readTillEnd([&](const Store &record) { + return push(record); + }, [&](const MultiStore &header, const auto &element) { + return pushMulti(element); + }, [&](const MultiRemove &header, const auto &element) { + return true; + }); + } +} + +void CompactorObject::parseChunk() { + auto keys = readChunk(); + if (_wrapper.failed()) { + fail(); + return; + } else if (keys.empty()) { + finish(); + return; + } + _database.with([ + weak = _weak, + keys = std::move(keys) + ](DatabaseObject &database) { + auto result = database.getManyRaw(keys); + weak.with([result = std::move(result)](CompactorObject &that) { + that.processValues(result); + }); + }); +} + +void CompactorObject::processValues( + const std::vector> &values) { + auto left = gsl::make_span(values); + while (true) { + left = fillList(left); + if (left.empty()) { + break; + } else if (!writeList()) { + fail(); + return; + } + } + parseChunk(); +} + +auto CompactorObject::fillList(RawSpan values) -> RawSpan { + return _list.match([&](auto &list) { + return fillList(list, values); + }); +} + +template +auto CompactorObject::fillList( + std::vector &list, + RawSpan values +) -> RawSpan { + const auto b = std::begin(values); + const auto e = std::end(values); + auto i = b; + while (i != e && list.size() != _partSize) { + addListRecord(list, *i++); + } + return values.subspan(i - b); +} + +template +void CompactorObject::addListRecord( + std::vector &list, + const Raw &raw) { + if (!_written.emplace(raw.first).second) { + return; + } + auto record = RecordStore(); + record.key = raw.first; + record.size = ReadTo(raw.second.size); + record.checksum = raw.second.checksum; + record.tag = raw.second.tag; + record.place = raw.second.place; + if constexpr (std::is_same_v) { + record.time.setRelative(raw.second.useTime); + record.time.system = _info.systemTime; + } + list.push_back(record); } Compactor::Compactor( - const QString &path, - crl::weak_on_queue database) -: _wrapped(path, std::move(database)) { + crl::weak_on_queue database, + const QString &base, + const Settings &settings, + EncryptionKey &&key, + const Info &info) +: _wrapped(std::move(database), base, settings, std::move(key), info) { } Compactor::~Compactor() = default; +int64 CatchUp( + const QString &compactPath, + const QString &binlogPath, + const EncryptionKey &key, + int64 from, + size_type block) { + File binlog, compact; + const auto result1 = binlog.open(binlogPath, File::Mode::Read, key); + if (result1 != File::Result::Success) { + return 0; + } + const auto till = binlog.size(); + if (till < from || !binlog.seek(from)) { + return 0; + } + const auto result2 = compact.open( + compactPath, + File::Mode::ReadAppend, + key); + if (result2 != File::Result::Success || !compact.seek(compact.size())) { + return 0; + } + auto buffer = bytes::vector(block); + auto bytes = bytes::make_span(buffer); + do { + const auto left = (till - from); + const auto limit = std::min(size_type(left), block); + const auto read = binlog.read(bytes.subspan(0, limit)); + if (!read || read > limit) { + return 0; + } else if (!compact.write(bytes.subspan(0, read))) { + return 0; + } + from += read; + } while (from != till); + return till; +} + } // namespace details } // namespace Cache } // namespace Storage diff --git a/Telegram/SourceFiles/storage/cache/storage_cache_compactor.h b/Telegram/SourceFiles/storage/cache/storage_cache_compactor.h index 8abf7ee39..1752b4358 100644 --- a/Telegram/SourceFiles/storage/cache/storage_cache_compactor.h +++ b/Telegram/SourceFiles/storage/cache/storage_cache_compactor.h @@ -11,6 +11,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include namespace Storage { +class EncryptionKey; namespace Cache { namespace details { @@ -19,9 +20,18 @@ class DatabaseObject; class Compactor { public: + struct Info { + int64 till = 0; + uint32 systemTime = 0; + size_type keysCount = 0; + }; + Compactor( - const QString &path, - crl::weak_on_queue database); + crl::weak_on_queue database, + const QString &base, + const Settings &settings, + EncryptionKey &&key, + const Info &info); ~Compactor(); @@ -31,6 +41,13 @@ private: }; +int64 CatchUp( + const QString &compactPath, + const QString &binlogPath, + const EncryptionKey &key, + int64 from, + size_type block); + } // namespace details } // namespace Cache } // namespace Storage diff --git a/Telegram/SourceFiles/storage/cache/storage_cache_database_object.cpp b/Telegram/SourceFiles/storage/cache/storage_cache_database_object.cpp index 654181c70..7ee2ec104 100644 --- a/Telegram/SourceFiles/storage/cache/storage_cache_database_object.cpp +++ b/Telegram/SourceFiles/storage/cache/storage_cache_database_object.cpp @@ -25,6 +25,8 @@ namespace Cache { namespace details { namespace { +constexpr auto kMaxDelayAfterFailure = 24 * 60 * 60 * crl::time_type(1000); + uint32 CountChecksum(bytes::const_span data) { const auto seed = uint32(0); return XXH32(data.data(), data.size(), seed); @@ -63,7 +65,7 @@ DatabaseObject::Entry::Entry( uint8 tag, uint32 checksum, size_type size, - int64 useTime) + uint64 useTime) : useTime(useTime) , size(size) , checksum(checksum) @@ -80,8 +82,10 @@ DatabaseObject::DatabaseObject( , _settings(settings) , _writeBundlesTimer(_weak, [=] { writeBundles(); checkCompactor(); }) , _pruneTimer(_weak, [=] { prune(); }) { - Expects(_settings.maxDataSize < kDataSizeLimit); - Expects(_settings.maxBundledRecords < kBundledRecordsLimit); + Expects(_settings.maxDataSize > 0 + && _settings.maxDataSize < kDataSizeLimit); + Expects(_settings.maxBundledRecords > 0 + && _settings.maxBundledRecords < kBundledRecordsLimit); Expects(!_settings.totalTimeLimit || _settings.totalTimeLimit > 0); Expects(!_settings.totalSizeLimit @@ -89,7 +93,9 @@ DatabaseObject::DatabaseObject( } template -void DatabaseObject::invokeCallback(Callback &&callback, Args &&...args) { +void DatabaseObject::invokeCallback( + Callback &&callback, + Args &&...args) const { if (callback) { callback(std::move(args)...); } @@ -137,23 +143,39 @@ QString DatabaseObject::computePath(Version version) const { return _base + QString::number(version) + '/'; } -QString DatabaseObject::binlogFilename() const { +QString DatabaseObject::BinlogFilename() { return QStringLiteral("binlog"); } +QString DatabaseObject::CompactReadyFilename() { + return QStringLiteral("binlog-ready"); +} + QString DatabaseObject::binlogPath(Version version) const { - return computePath(version) + binlogFilename(); + return computePath(version) + BinlogFilename(); } QString DatabaseObject::binlogPath() const { - return _path + binlogFilename(); + return _path + BinlogFilename(); +} + +QString DatabaseObject::compactReadyPath(Version version) const { + return computePath(version) + CompactReadyFilename(); +} + +QString DatabaseObject::compactReadyPath() const { + return _path + CompactReadyFilename(); } File::Result DatabaseObject::openBinlog( Version version, File::Mode mode, EncryptionKey &key) { + const auto ready = compactReadyPath(version); const auto path = binlogPath(version); + if (QFile(ready).exists() && !File::Move(ready, path)) { + return File::Result::Failed; + } const auto result = _binlog.open(path, mode, key); if (result == File::Result::Success) { const auto headerRequired = (mode == File::Mode::Read) @@ -180,14 +202,14 @@ bool DatabaseObject::readHeader() { != !!(header.flags & header.kTrackEstimatedTime)) { return false; } - _relativeTime = _latestSystemTime = header.systemTime; + _time.setRelative((_time.system = header.systemTime)); return true; } bool DatabaseObject::writeHeader() { auto header = BasicHeader(); const auto now = _settings.trackEstimatedTime ? GetUnixtime() : 0; - _relativeTime = _latestSystemTime = header.systemTime = now; + _time.setRelative((_time.system = header.systemTime = now)); if (_settings.trackEstimatedTime) { header.flags |= header.kTrackEstimatedTime; } @@ -243,16 +265,17 @@ void DatabaseObject::readBinlog() { optimize(); } -int64 DatabaseObject::countRelativeTime() const { +uint64 DatabaseObject::countRelativeTime() const { const auto now = GetUnixtime(); - const auto delta = std::max(int64(now) - int64(_latestSystemTime), 0LL); - return _relativeTime + delta; + const auto delta = std::max(int64(now) - int64(_time.system), 0LL); + return _time.getRelative() + delta; } -int64 DatabaseObject::pruneBeforeTime() const { - return _settings.totalTimeLimit - ? (countRelativeTime() - _settings.totalTimeLimit) - : 0LL; +uint64 DatabaseObject::pruneBeforeTime() const { + const auto relative = countRelativeTime(); + return (_settings.totalTimeLimit && relative > _settings.totalTimeLimit) + ? (relative - _settings.totalTimeLimit) + : 0ULL; } void DatabaseObject::optimize() { @@ -265,12 +288,13 @@ bool DatabaseObject::startDelayedPruning() { if (!_settings.trackEstimatedTime || _map.empty()) { return false; } + const auto before = pruneBeforeTime(); const auto pruning = [&] { if (_settings.totalSizeLimit > 0 && _totalSize > _settings.totalSizeLimit) { return true; } else if (_minimalEntryTime != 0 - && _minimalEntryTime <= pruneBeforeTime()) { + && _minimalEntryTime <= before) { return true; } return false; @@ -282,8 +306,8 @@ bool DatabaseObject::startDelayedPruning() { } return true; } else if (_minimalEntryTime != 0) { - const auto before = pruneBeforeTime(); - const auto seconds = (_minimalEntryTime - before); + Assert(_minimalEntryTime > before); + const auto seconds = int64(_minimalEntryTime - before); if (!_pruneTimer.isActive()) { _pruneTimer.callOnce(std::min( seconds * crl::time_type(1000), @@ -384,7 +408,7 @@ void DatabaseObject::adjustRelativeTime() { return; } const auto now = GetUnixtime(); - if (now < _latestSystemTime) { + if (now < _time.system) { writeMultiAccessBlock(); } } @@ -402,7 +426,7 @@ bool DatabaseObject::processRecordStoreGeneric( record->tag, record->checksum, size, - _relativeTime); + _time.getRelative()); if (!postprocess(entry, record)) { return false; } @@ -424,7 +448,7 @@ bool DatabaseObject::processRecordStore( Entry &entry, not_null record) { applyTimePoint(record->time); - entry.useTime = _relativeTime; + entry.useTime = record->time.getRelative(); return true; }; return processRecordStoreGeneric(record, postprocess); @@ -464,16 +488,14 @@ bool DatabaseObject::processRecordMultiAccess( const GetElement &element) { Expects(_settings.trackEstimatedTime); - if (header.time.relativeAdvancement > _settings.maxTimeAdvancement) { - return false; - } applyTimePoint(header.time); + const auto relative = header.time.getRelative(); _binlogExcessLength += sizeof(header); while (const auto entry = element()) { _binlogExcessLength += sizeof(*entry); if (const auto i = _map.find(*entry); i != end(_map)) { - i->second.useTime = _relativeTime; + i->second.useTime = relative; } } return true; @@ -516,26 +538,78 @@ void DatabaseObject::eraseMapEntry(const Map::const_iterator &i) { } EstimatedTimePoint DatabaseObject::countTimePoint() const { - const auto now = std::max(GetUnixtime(), 1); - const auto delta = std::max(int64(now) - int64(_latestSystemTime), 0LL); + const auto now = GetUnixtime(); + const auto delta = std::max(int64(now) - int64(_time.system), 0LL); auto result = EstimatedTimePoint(); result.system = now; - result.relativeAdvancement = std::min( - delta, - int64(_settings.maxTimeAdvancement)); + result.setRelative(_time.getRelative() + delta); return result; } void DatabaseObject::applyTimePoint(EstimatedTimePoint time) { - _relativeTime += time.relativeAdvancement; - _latestSystemTime = time.system; + const auto possible = time.getRelative(); + const auto current = _time.getRelative(); + if (possible > current) { + _time = time; + } +} + +void DatabaseObject::compactorDone( + const QString &path, + int64 originalReadTill) { + const auto size = _binlog.size(); + const auto binlog = binlogPath(); + const auto ready = compactReadyPath(); + if (originalReadTill != size) { + originalReadTill = CatchUp( + path, + binlog, + _key, + originalReadTill, + _settings.readBlockSize); + if (originalReadTill != size) { + compactorFail(); + return; + } + } else if (!File::Move(path, ready)) { + compactorFail(); + return; + } + const auto guard = gsl::finally([&] { + _compactor = CompactorWrap(); + }); + _binlog.close(); + if (!File::Move(ready, binlog)) { + // megafail + compactorFail(); + return; + } + const auto result = _binlog.open(path, File::Mode::ReadAppend, _key); + if (result != File::Result::Success || !_binlog.seek(_binlog.size())) { + // megafail + compactorFail(); + return; + } + _binlogExcessLength -= _compactor.excessLength; + Assert(_binlogExcessLength >= 0); +} + +void DatabaseObject::compactorFail() { + const auto delay = _compactor.delayAfterFailure; + _compactor = CompactorWrap(); + _compactor.nextAttempt = crl::time() + delay; + _compactor.delayAfterFailure = std::min( + delay * 2, + kMaxDelayAfterFailure); + QFile(compactReadyPath()).remove(); } void DatabaseObject::close(FnMut done) { writeBundles(); _cleaner = CleanerWrap(); - _compactor = nullptr; + _compactor = CompactorWrap(); _binlog.close(); + _key = EncryptionKey(); invokeCallback(done); _map.clear(); _binlogExcessLength = 0; @@ -643,12 +717,14 @@ base::optional DatabaseObject::writeKeyPlace( } auto record = StoreWithTime(); record.time = countTimePoint(); - if (record.time.relativeAdvancement * crl::time_type(1000) + const auto writing = record.time.getRelative(); + const auto current = _time.getRelative(); + Assert(writing >= current); + if ((writing - current) * crl::time_type(1000) < _settings.writeBundleDelay) { - // We don't want to produce a lot of unique relativeTime values. + // We don't want to produce a lot of unique _time.relative values. // So if change in it is not large we stick to the old value. - record.time.system = _latestSystemTime; - record.time.relativeAdvancement = 0; + record.time = _time; } return writeKeyPlaceGeneric(std::move(record), key, data, checksum); } @@ -782,10 +858,10 @@ void DatabaseObject::writeMultiAccessBlock() { list.push_back(key); } } - applyTimePoint(time); + _time = time; for (const auto &entry : list) { if (const auto i = _map.find(entry); i != end(_map)) { - i->second.useTime = _relativeTime; + i->second.useTime = _time.getRelative(); } } @@ -826,7 +902,7 @@ void DatabaseObject::cleanerDone(Error error) { } void DatabaseObject::checkCompactor() { - if (_compactor + if (_compactor.object || !_settings.compactAfterExcess || _binlogExcessLength < _settings.compactAfterExcess) { return; @@ -835,8 +911,20 @@ void DatabaseObject::checkCompactor() { < _settings.compactAfterExcess * _binlog.size()) { return; } + } else if (crl::time() < _compactor.nextAttempt) { + return; } - _compactor = std::make_unique(_path, _weak); + auto info = Compactor::Info(); + info.till = _binlog.size(); + info.systemTime = _time.system; + info.keysCount = _map.size(); + _compactor.object = std::make_unique( + _weak, + _path, + _settings, + base::duplicate(_key), + info); + _compactor.excessLength = _binlogExcessLength; } void DatabaseObject::clear(FnMut done) { @@ -848,6 +936,18 @@ void DatabaseObject::clear(FnMut done) { writeVersion(version) ? Error::NoError() : ioError(versionPath())); } +auto DatabaseObject::getManyRaw(const std::vector keys) const +-> std::vector { + auto result = std::vector(); + result.reserve(keys.size()); + for (const auto &key : keys) { + if (const auto i = _map.find(key); i != end(_map)) { + result.push_back(*i); + } + } + return result; +} + DatabaseObject::~DatabaseObject() { close(nullptr); } diff --git a/Telegram/SourceFiles/storage/cache/storage_cache_database_object.h b/Telegram/SourceFiles/storage/cache/storage_cache_database_object.h index 5c88c4636..0a4b3609b 100644 --- a/Telegram/SourceFiles/storage/cache/storage_cache_database_object.h +++ b/Telegram/SourceFiles/storage/cache/storage_cache_database_object.h @@ -38,9 +38,12 @@ public: void clear(FnMut done); - ~DatabaseObject(); + static QString BinlogFilename(); + static QString CompactReadyFilename(); + + void compactorDone(const QString &path, int64 originalReadTill); + void compactorFail(); -private: struct Entry { Entry() = default; Entry( @@ -48,29 +51,42 @@ private: uint8 tag, uint32 checksum, size_type size, - int64 useTime); + uint64 useTime); - int64 useTime = 0; + uint64 useTime = 0; size_type size = 0; uint32 checksum = 0; PlaceId place = { { 0 } }; uint8 tag = 0; }; + using Raw = std::pair; + std::vector getManyRaw(const std::vector keys) const; + + ~DatabaseObject(); + +private: struct CleanerWrap { std::unique_ptr object; base::binary_guard guard; }; + struct CompactorWrap { + std::unique_ptr object; + int64 excessLength = 0; + crl::time_type nextAttempt = 0; + crl::time_type delayAfterFailure = 10 * crl::time_type(1000); + }; using Map = std::unordered_map; template - void invokeCallback(Callback &&callback, Args &&...args); + void invokeCallback(Callback &&callback, Args &&...args) const; Error ioError(const QString &path) const; QString computePath(Version version) const; QString binlogPath(Version version) const; QString binlogPath() const; - QString binlogFilename() const; + QString compactReadyPath(Version version) const; + QString compactReadyPath() const; File::Result openBinlog( Version version, File::Mode mode, @@ -106,10 +122,10 @@ private: void checkCompactor(); void adjustRelativeTime(); bool startDelayedPruning(); - int64 countRelativeTime() const; + uint64 countRelativeTime() const; EstimatedTimePoint countTimePoint() const; void applyTimePoint(EstimatedTimePoint time); - int64 pruneBeforeTime() const; + uint64 pruneBeforeTime() const; void prune(); void collectTimePrune( base::flat_set &stale, @@ -161,20 +177,18 @@ private: std::set _removing; std::set _accessed; - int64 _relativeTime = 0; - int64 _timeCorrection = 0; - uint32 _latestSystemTime = 0; + EstimatedTimePoint _time; int64 _binlogExcessLength = 0; int64 _totalSize = 0; - int64 _minimalEntryTime = 0; + uint64 _minimalEntryTime = 0; size_type _entriesWithMinimalTimeCount = 0; base::ConcurrentTimer _writeBundlesTimer; base::ConcurrentTimer _pruneTimer; CleanerWrap _cleaner; - std::unique_ptr _compactor; + CompactorWrap _compactor; }; diff --git a/Telegram/SourceFiles/storage/cache/storage_cache_types.h b/Telegram/SourceFiles/storage/cache/storage_cache_types.h index 4c8632179..392051098 100644 --- a/Telegram/SourceFiles/storage/cache/storage_cache_types.h +++ b/Telegram/SourceFiles/storage/cache/storage_cache_types.h @@ -40,11 +40,11 @@ struct Settings { int64 compactAfterExcess = 8 * 1024 * 1024; int64 compactAfterFullSize = 0; + size_type compactChunkSize = 16 * 1024; bool trackEstimatedTime = true; int64 totalSizeLimit = 1024 * 1024 * 1024; size_type totalTimeLimit = 30 * 86400; // One month in seconds. - size_type maxTimeAdvancement = 365 * 86400; // One year in seconds. crl::time_type pruneTimeout = 5 * crl::time_type(1000); crl::time_type maxPruneCheckTimeout = 3600 * crl::time_type(1000); }; @@ -131,8 +131,17 @@ struct BasicHeader { }; struct EstimatedTimePoint { + uint32 relative1 = 0; + uint32 relative2 = 0; uint32 system = 0; - uint32 relativeAdvancement = 0; + + void setRelative(uint64 value) { + relative1 = uint32(value & 0xFFFFFFFFU); + relative2 = uint32((value >> 32) & 0xFFFFFFFFU); + } + uint64 getRelative() const { + return uint64(relative1) | (uint64(relative2) << 32); + } }; struct Store { @@ -145,12 +154,10 @@ struct Store { uint32 checksum = 0; Key key; }; -static_assert(GoodForEncryption); struct StoreWithTime : Store { EstimatedTimePoint time; - uint32 reserved1 = 0; - uint32 reserved2 = 0; + uint32 reserved = 0; }; struct MultiStore { @@ -168,6 +175,8 @@ struct MultiStore { size_type validateCount() const; }; struct MultiStoreWithTime : MultiStore { + using MultiStore::MultiStore; + using Part = StoreWithTime; }; @@ -196,7 +205,6 @@ struct MultiAccess { RecordType type = kType; RecordsCount count = { { 0 } }; EstimatedTimePoint time; - uint32 reserved = 0; using Part = Key; size_type validateCount() const; diff --git a/Telegram/SourceFiles/storage/storage_encrypted_file.cpp b/Telegram/SourceFiles/storage/storage_encrypted_file.cpp index 27af9e037..9db8f30e7 100644 --- a/Telegram/SourceFiles/storage/storage_encrypted_file.cpp +++ b/Telegram/SourceFiles/storage/storage_encrypted_file.cpp @@ -309,4 +309,25 @@ bool File::seek(int64 offset) { return true; } +bool File::Move(const QString &from, const QString &to) { + QFile source(from); + if (!source.exists()) { + return false; + } + QFile destination(to); + if (destination.exists()) { + FileLock locker; + if (!locker.lock(destination, QIODevice::WriteOnly)) { + return false; + } + locker.unlock(); + destination.close(); + if (!destination.remove()) { + return false; + } + } + return source.rename(to); +} + + } // namespace Storage diff --git a/Telegram/SourceFiles/storage/storage_encrypted_file.h b/Telegram/SourceFiles/storage/storage_encrypted_file.h index a07f1f3e0..31d899002 100644 --- a/Telegram/SourceFiles/storage/storage_encrypted_file.h +++ b/Telegram/SourceFiles/storage/storage_encrypted_file.h @@ -43,6 +43,8 @@ public: void close(); + static bool Move(const QString &from, const QString &to); + private: Result attemptOpen(Mode mode, const EncryptionKey &key); Result attemptOpenForRead(const EncryptionKey &key); diff --git a/Telegram/SourceFiles/storage/storage_pch.h b/Telegram/SourceFiles/storage/storage_pch.h index 9fcfcb244..64d635c3d 100644 --- a/Telegram/SourceFiles/storage/storage_pch.h +++ b/Telegram/SourceFiles/storage/storage_pch.h @@ -17,6 +17,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include #include +#include #include #include