Write removes/touches in bundles. Apply limits.

This commit is contained in:
John Preston 2018-08-18 21:45:12 +03:00
parent b9af3c7f34
commit e5dda6dd49
5 changed files with 901 additions and 81 deletions

View File

@ -11,6 +11,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include "storage/storage_encryption.h"
#include "storage/storage_encrypted_file.h"
#include "base/flat_set.h"
#include "base/flat_map.h"
#include "base/algorithm.h"
#include "base/concurrent_timer.h"
#include <crl/crl.h>
@ -35,20 +36,18 @@ namespace Cache {
namespace details {
namespace {
constexpr auto kMaxBundledRecords = 256 * 1024;
constexpr auto kReadBlockSize = 8 * 1024 * 1024;
constexpr auto kRecordSizeUnknown = size_type(-1);
constexpr auto kRecordSizeInvalid = size_type(-2);
constexpr auto kMaxDataSize = 10 * 1024 * 1024;
constexpr auto kRemoveBundleDelay = 60 * 60 * crl::time_type(1000);
using RecordType = uint8;
using PlaceId = std::array<uint8, 7>;
using EntrySize = std::array<uint8, 3>;
using RecordsCount = std::array<uint8, 3>;
static_assert(kMaxBundledRecords < (1 << (RecordsCount().size() * 8)));
static_assert(kMaxDataSize < (1 << (EntrySize().size() * 8)));
constexpr auto kRecordSizeUnknown = size_type(-1);
constexpr auto kRecordSizeInvalid = size_type(-2);
constexpr auto kBundledRecordsLimit = (1 << (RecordsCount().size() * 8));
constexpr auto kDataSizeLimit = (1 << (EntrySize().size() * 8));
template <typename Record>
constexpr auto GoodForEncryption = ((sizeof(Record) & 0x0F) == 0);
template <typename Packed>
Packed ReadTo(size_type count) {
@ -93,24 +92,48 @@ QString PlaceFromId(PlaceId place) {
};
for (auto i = 0; i != place.size(); ++i) {
push(place[i]);
if (i == 1) {
if (!i) {
result.push_back('/');
}
}
return result;
}
int32 GetUnixtime() {
return int32(time(nullptr));
}
enum class Format : uint32 {
Format_0,
};
struct BasicHeader {
BasicHeader();
Format format : 8;
uint32 flags : 24;
uint32 date = 0;
uint32 reserved1 = 0;
uint32 reserved2 = 0;
};
static_assert(GoodForEncryption<BasicHeader>);
BasicHeader::BasicHeader()
: format(Format::Format_0)
, flags(0) {
}
struct Store {
static constexpr auto kType = RecordType(0x01);
RecordType type = kType;
uint8 tag = 0;
PlaceId place = { { 0 } };
EntrySize size = { { 0 } };
PlaceId place = { { 0 } };
uint32 checksum = 0;
Key key;
};
static_assert(sizeof(Store) == 1 + 7 + 1 + 3 + 4 + 16);
static_assert(GoodForEncryption<Store>);
struct MultiStoreHeader {
static constexpr auto kType = RecordType(0x02);
@ -119,22 +142,25 @@ struct MultiStoreHeader {
RecordType type = kType;
RecordsCount count = { { 0 } };
uint32 reserved1 = 0;
uint32 reserved2 = 0;
uint32 reserved3 = 0;
};
struct MultiStorePart {
uint8 reserved = 0;
PlaceId place = { { 0 } };
uint8 tag = 0;
EntrySize size = { { 0 } };
PlaceId place = { { 0 } };
uint32 checksum = 0;
Key key;
};
static_assert(sizeof(MultiStoreHeader) == 4);
static_assert(sizeof(MultiStorePart) == sizeof(Store));
static_assert(GoodForEncryption<MultiStoreHeader>);
static_assert(GoodForEncryption<MultiStorePart>);
MultiStoreHeader::MultiStoreHeader(size_type count)
: type(kType)
, count(ReadTo<RecordsCount>(count)) {
Expects(count >= 0 && count < kMaxBundledRecords);
Expects(count >= 0 && count < kBundledRecordsLimit);
}
struct MultiRemoveHeader {
@ -144,17 +170,51 @@ struct MultiRemoveHeader {
RecordType type = kType;
RecordsCount count = { { 0 } };
uint32 reserved1 = 0;
uint32 reserved2 = 0;
uint32 reserved3 = 0;
};
struct MultiRemovePart {
Key key;
};
static_assert(sizeof(MultiRemoveHeader) == 4);
static_assert(sizeof(MultiRemovePart) == 16);
static_assert(GoodForEncryption<MultiRemoveHeader>);
static_assert(GoodForEncryption<MultiRemovePart>);
MultiRemoveHeader::MultiRemoveHeader(size_type count)
: type(kType)
, count(ReadTo<RecordsCount>(count)) {
Expects(count >= 0 && count < kMaxBundledRecords);
Expects(count >= 0 && count < kBundledRecordsLimit);
}
struct MultiTouchedHeader {
static constexpr auto kType = RecordType(0x04);
MultiTouchedHeader(
uint32 time,
uint32 advancement,
size_type count = 0);
RecordType type = kType;
RecordsCount count = { { 0 } };
uint32 timeAdvancement = 0;
uint32 systemTime = 0;
uint32 reserved = 0;
};
struct MultiTouchedPart {
Key key;
};
static_assert(GoodForEncryption<MultiTouchedHeader>);
static_assert(GoodForEncryption<MultiTouchedPart>);
MultiTouchedHeader::MultiTouchedHeader(
uint32 time,
uint32 advancement,
size_type count)
: type(kType)
, count(ReadTo<RecordsCount>(count))
, timeAdvancement(advancement)
, systemTime(time) {
Expects(count >= 0 && count < kBundledRecordsLimit);
}
} // namespace
@ -173,7 +233,7 @@ public:
void put(const Key &key, QByteArray value, FnMut<void(Error)> done);
void get(const Key &key, FnMut<void(QByteArray)> done);
void remove(const Key &key, FnMut<void()> done);
void remove(const Key &key, FnMut<void()> done = nullptr);
void clear(FnMut<void(Error)> done);
@ -182,17 +242,24 @@ public:
private:
struct Entry {
Entry() = default;
Entry(PlaceId place, uint8 tag, uint32 checksum, size_type size);
Entry(
PlaceId place,
uint8 tag,
uint32 checksum,
size_type size,
int64 useTime);
uint64 tag = 0;
uint32 checksum = 0;
int64 useTime = 0;
size_type size = 0;
uint32 checksum = 0;
PlaceId place = { { 0 } };
uint8 tag = 0;
};
struct CleanerWrap {
std::unique_ptr<Cleaner> object;
base::binary_guard guard;
};
using Map = std::unordered_map<Key, Entry>;
template <typename Callback, typename ...Args>
void invokeCallback(Callback &&callback, Args &&...args);
@ -207,6 +274,8 @@ private:
Version version,
File::Mode mode,
EncryptionKey &key);
bool readHeader();
bool writeHeader();
void readBinlog();
size_type readBinlogRecords(bytes::const_span data);
size_type readBinlogRecordSize(bytes::const_span data) const;
@ -214,6 +283,22 @@ private:
bool readRecordStore(bytes::const_span data);
bool readRecordMultiStore(bytes::const_span data);
bool readRecordMultiRemove(bytes::const_span data);
bool readRecordMultiTouched(bytes::const_span data);
void adjustRelativeTime();
void startDelayedPruning();
int64 countRelativeTime() const;
int64 pruneBeforeTime() const;
void prune();
void collectTimePrune(
base::flat_set<Key> &stale,
int64 &staleTotalSize);
void collectSizePrune(
base::flat_set<Key> &stale,
int64 &staleTotalSize);
void setMapEntry(const Key &key, Entry &&entry);
void eraseMapEntry(const Map::const_iterator &i);
Version findAvailableVersion() const;
QString versionPath() const;
@ -223,7 +308,13 @@ private:
QString placePath(PlaceId place) const;
bool isFreePlace(PlaceId place) const;
QString writeKeyPlace(const Key &key, size_type size, uint32 checksum);
void writeMultiRemoveLazy();
void writeMultiRemove();
void writeMultiTouchedLazy();
void writeMultiTouched();
void writeMultiTouchedBlock();
void writeBundlesLazy();
void writeBundles();
void createCleaner();
void cleanerDone(Error error);
@ -233,10 +324,20 @@ private:
Settings _settings;
EncryptionKey _key;
File _binlog;
std::unordered_map<Key, Entry> _map;
Map _map;
std::set<Key> _removing;
std::set<Key> _touched;
base::ConcurrentTimer _writeRemoveTimer;
int64 _relativeTime = 0;
int64 _timeCorrection = 0;
uint32 _latestSystemTime = 0;
int64 _totalSize = 0;
int64 _minimalEntryTime = 0;
size_type _entriesWithMinimalTimeCount = 0;
base::ConcurrentTimer _writeBundlesTimer;
base::ConcurrentTimer _pruneTimer;
CleanerWrap _cleaner;
@ -246,11 +347,13 @@ Database::Entry::Entry(
PlaceId place,
uint8 tag,
uint32 checksum,
size_type size)
: tag(tag)
size_type size,
int64 useTime)
: useTime(useTime)
, size(size)
, checksum(checksum)
, place(place)
, size(size) {
, tag(tag)
, place(place) {
}
Database::Database(
@ -260,19 +363,20 @@ Database::Database(
: _weak(std::move(weak))
, _base(ComputeBasePath(path))
, _settings(settings)
, _writeRemoveTimer(_weak, [=] { writeMultiRemove(); }) {
, _writeBundlesTimer(_weak, [=] { writeBundles(); })
, _pruneTimer(_weak, [=] { prune(); }) {
Expects(_settings.maxDataSize < kDataSizeLimit);
Expects(_settings.maxBundledRecords < kBundledRecordsLimit);
Expects(!_settings.totalTimeLimit
|| _settings.totalTimeLimit > 0);
Expects(!_settings.totalSizeLimit
|| _settings.totalSizeLimit > _settings.maxDataSize);
}
template <typename Callback, typename ...Args>
void Database::invokeCallback(Callback &&callback, Args &&...args) {
if (callback) {
callback(std::move(args)...);
//crl::on_main([
// callback = std::move(callback),
// args = std::forward<Args>(args)...
//]() mutable {
// callback(std::move(args)...);
//});
}
}
@ -337,16 +441,40 @@ File::Result Database::openBinlog(
const auto path = binlogPath(version);
const auto result = _binlog.open(path, mode, key);
if (result == File::Result::Success) {
_path = computePath(version);
_key = std::move(key);
createCleaner();
readBinlog();
const auto headerRequired = (mode == File::Mode::Read)
|| (mode == File::Mode::ReadAppend && _binlog.size() > 0);
if (headerRequired ? readHeader() : writeHeader()) {
_path = computePath(version);
_key = std::move(key);
createCleaner();
readBinlog();
} else {
return File::Result::Failed;
}
}
return result;
}
bool Database::readHeader() {
auto header = BasicHeader();
if (_binlog.read(bytes::object_as_span(&header)) != sizeof(header)) {
return false;
} else if (header.format != Format::Format_0) {
return false;
}
_relativeTime = _latestSystemTime = header.date;
return true;
}
bool Database::writeHeader() {
auto header = BasicHeader();
const auto now = std::max(GetUnixtime(), 1);
_relativeTime = _latestSystemTime = header.date = now;
return _binlog.write(bytes::object_as_span(&header));
}
void Database::readBinlog() {
auto data = bytes::vector(kReadBlockSize);
auto data = bytes::vector(_settings.readBlockSize);
const auto full = bytes::make_span(data);
auto notParsedBytes = index_type(0);
while (true) {
@ -368,6 +496,144 @@ void Database::readBinlog() {
}
}
_binlog.seek(_binlog.offset() - notParsedBytes);
adjustRelativeTime();
startDelayedPruning();
}
int64 Database::countRelativeTime() const {
const auto now = std::max(GetUnixtime(), 1);
const auto delta = std::max(int64(now) - int64(_latestSystemTime), 0LL);
return _relativeTime + delta;
}
int64 Database::pruneBeforeTime() const {
return _settings.totalTimeLimit
? (countRelativeTime() - _settings.totalTimeLimit)
: 0LL;
}
void Database::startDelayedPruning() {
if (_map.empty()) {
return;
}
const auto pruning = [&] {
if (_settings.totalSizeLimit > 0
&& _totalSize > _settings.totalSizeLimit) {
return true;
} else if (_minimalEntryTime != 0
&& _minimalEntryTime <= pruneBeforeTime()) {
return true;
}
return false;
}();
if (pruning) {
if (!_pruneTimer.isActive()
|| _pruneTimer.remainingTime() > _settings.pruneTimeout) {
_pruneTimer.callOnce(_settings.pruneTimeout);
}
} else if (_minimalEntryTime != 0) {
const auto before = pruneBeforeTime();
const auto seconds = (_minimalEntryTime - before);
if (!_pruneTimer.isActive()) {
_pruneTimer.callOnce(std::min(
seconds * crl::time_type(1000),
_settings.maxPruneCheckTimeout));
}
}
}
void Database::prune() {
auto stale = base::flat_set<Key>();
auto staleTotalSize = int64();
collectTimePrune(stale, staleTotalSize);
collectSizePrune(stale, staleTotalSize);
for (const auto &key : stale) {
remove(key);
}
startDelayedPruning();
}
void Database::collectTimePrune(
base::flat_set<Key> &stale,
int64 &staleTotalSize) {
if (!_settings.totalTimeLimit) {
return;
}
const auto before = pruneBeforeTime();
if (!_minimalEntryTime || _minimalEntryTime > before) {
return;
}
_minimalEntryTime = 0;
_entriesWithMinimalTimeCount = 0;
for (const auto &[key, entry] : _map) {
if (entry.useTime <= before) {
stale.emplace(key);
staleTotalSize += entry.size;
} else if (!_minimalEntryTime
|| _minimalEntryTime > entry.useTime) {
_minimalEntryTime = entry.useTime;
_entriesWithMinimalTimeCount = 1;
} else if (_minimalEntryTime == entry.useTime) {
++_entriesWithMinimalTimeCount;
}
}
}
void Database::collectSizePrune(
base::flat_set<Key> &stale,
int64 &staleTotalSize) {
const auto removeSize = (_settings.totalSizeLimit > 0)
? (_totalSize - staleTotalSize - _settings.totalSizeLimit)
: 0;
if (removeSize <= 0) {
return;
}
using Bucket = std::pair<const Key, Entry>;
auto oldest = base::flat_multi_map<
int64,
const Bucket*,
std::greater<>>();
auto oldestTotalSize = int64();
const auto canRemoveFirst = [&](const Entry &adding) {
const auto totalSizeAfterAdd = oldestTotalSize + adding.size;
const auto &first = oldest.begin()->second->second;
return (adding.useTime <= first.useTime
&& (totalSizeAfterAdd - removeSize >= first.size));
};
for (const auto &bucket : _map) {
const auto &entry = bucket.second;
if (stale.contains(bucket.first)) {
continue;
}
const auto add = (oldestTotalSize < removeSize)
? true
: (entry.useTime < oldest.begin()->second->second.useTime);
if (!add) {
continue;
}
while (!oldest.empty() && canRemoveFirst(entry)) {
oldestTotalSize -= oldest.begin()->second->second.size;
oldest.erase(oldest.begin());
}
oldestTotalSize += entry.size;
oldest.emplace(entry.useTime, &bucket);
}
for (const auto &pair : oldest) {
stale.emplace(pair.second->first);
}
staleTotalSize += oldestTotalSize;
}
void Database::adjustRelativeTime() {
const auto now = std::max(GetUnixtime(), 1);
if (now < _latestSystemTime) {
writeMultiTouchedBlock();
}
}
size_type Database::readBinlogRecords(bytes::const_span data) {
@ -399,7 +665,7 @@ size_type Database::readBinlogRecordSize(bytes::const_span data) const {
const auto header = reinterpret_cast<const MultiStoreHeader*>(
data.data());
const auto count = ReadFrom(header->count);
return (count > 0 && count < kMaxBundledRecords)
return (count > 0 && count < _settings.maxBundledRecords)
? (sizeof(MultiStoreHeader)
+ count * sizeof(MultiStorePart))
: kRecordSizeInvalid;
@ -411,13 +677,25 @@ size_type Database::readBinlogRecordSize(bytes::const_span data) const {
const auto header = reinterpret_cast<const MultiRemoveHeader*>(
data.data());
const auto count = ReadFrom(header->count);
return (count > 0 && count < kMaxBundledRecords)
return (count > 0 && count < _settings.maxBundledRecords)
? (sizeof(MultiRemoveHeader)
+ count * sizeof(MultiRemovePart))
: kRecordSizeInvalid;
}
return kRecordSizeUnknown;
case MultiTouchedHeader::kType:
if (data.size() >= sizeof(MultiTouchedHeader)) {
const auto header = reinterpret_cast<const MultiTouchedHeader*>(
data.data());
const auto count = ReadFrom(header->count);
return (count > 0 && count < _settings.maxBundledRecords)
? (sizeof(MultiTouchedHeader)
+ count * sizeof(MultiTouchedPart))
: kRecordSizeInvalid;
}
return kRecordSizeUnknown;
}
return kRecordSizeInvalid;
}
@ -435,25 +713,35 @@ bool Database::readBinlogRecord(bytes::const_span data) {
case MultiRemoveHeader::kType:
return readRecordMultiRemove(data);
case MultiTouchedHeader::kType:
return readRecordMultiTouched(data);
}
Unexpected("Bad type in Database::readBinlogRecord.");
}
bool Database::readRecordStore(bytes::const_span data) {
Expects(data.size() >= sizeof(Store));
const auto record = reinterpret_cast<const Store*>(data.data());
const auto size = ReadFrom(record->size);
if (size > kMaxDataSize) {
if (size > _settings.maxDataSize) {
return false;
}
_map[record->key] = Entry(
record->place,
record->tag,
record->checksum,
size);
setMapEntry(
record->key,
Entry(
record->place,
record->tag,
record->checksum,
size,
_relativeTime));
return true;
}
bool Database::readRecordMultiStore(bytes::const_span data) {
Expects(data.size() >= sizeof(MultiStoreHeader));
const auto bytes = data.data();
const auto record = reinterpret_cast<const MultiStoreHeader*>(bytes);
const auto count = ReadFrom(record->count);
@ -465,15 +753,55 @@ bool Database::readRecordMultiStore(bytes::const_span data) {
count);
for (const auto &part : parts) {
const auto size = ReadFrom(part.size);
if (part.reserved != 0 || size > kMaxDataSize) {
if (size > _settings.maxDataSize) {
return false;
}
_map[part.key] = Entry(part.place, part.tag, part.checksum, size);
setMapEntry(
part.key,
Entry(
part.place,
part.tag,
part.checksum,
size,
_relativeTime));
}
return true;
}
void Database::setMapEntry(const Key &key, Entry &&entry) {
auto &already = _map[key];
_totalSize += entry.size - already.size;
if (entry.useTime != 0
&& (entry.useTime < _minimalEntryTime || !_minimalEntryTime)) {
_minimalEntryTime = entry.useTime;
_entriesWithMinimalTimeCount = 1;
} else if (_minimalEntryTime != 0 && already.useTime != entry.useTime) {
if (entry.useTime == _minimalEntryTime) {
Assert(_entriesWithMinimalTimeCount > 0);
++_entriesWithMinimalTimeCount;
} else if (already.useTime == _minimalEntryTime) {
Assert(_entriesWithMinimalTimeCount > 0);
--_entriesWithMinimalTimeCount;
}
}
already = std::move(entry);
}
void Database::eraseMapEntry(const Map::const_iterator &i) {
if (i != end(_map)) {
const auto &entry = i->second;
_totalSize -= entry.size;
if (_minimalEntryTime != 0 && entry.useTime == _minimalEntryTime) {
Assert(_entriesWithMinimalTimeCount > 0);
--_entriesWithMinimalTimeCount;
}
_map.erase(i);
}
}
bool Database::readRecordMultiRemove(bytes::const_span data) {
Expects(data.size() >= sizeof(MultiRemoveHeader));
const auto bytes = data.data();
const auto record = reinterpret_cast<const MultiRemoveHeader*>(bytes);
const auto count = ReadFrom(record->count);
@ -484,15 +812,38 @@ bool Database::readRecordMultiRemove(bytes::const_span data) {
bytes + sizeof(MultiRemoveHeader)),
count);
for (const auto &part : parts) {
_map.erase(part.key);
eraseMapEntry(_map.find(part.key));
}
return true;
}
bool Database::readRecordMultiTouched(bytes::const_span data) {
Expects(data.size() >= sizeof(MultiTouchedHeader));
const auto bytes = data.data();
const auto record = reinterpret_cast<const MultiTouchedHeader*>(bytes);
if (record->timeAdvancement > _settings.maxTimeAdvancement) {
return false;
}
_relativeTime += record->timeAdvancement;
_latestSystemTime = record->systemTime;
const auto count = ReadFrom(record->count);
Assert(data.size() >= sizeof(MultiTouchedHeader)
+ count * sizeof(MultiTouchedPart));
const auto parts = gsl::make_span(
reinterpret_cast<const MultiTouchedPart*>(
bytes + sizeof(MultiTouchedHeader)),
count);
for (const auto &part : parts) {
if (const auto i = _map.find(part.key); i != end(_map)) {
i->second.useTime = _relativeTime;
}
}
return true;
}
void Database::close(FnMut<void()> done) {
if (_writeRemoveTimer.isActive()) {
writeMultiRemove();
}
writeBundles();
_cleaner = CleanerWrap();
_binlog.close();
invokeCallback(done);
@ -502,6 +853,8 @@ void Database::put(
const Key &key,
QByteArray value,
FnMut<void(Error)> done) {
_removing.erase(key);
const auto checksum = CountChecksum(bytes::make_span(value));
const auto path = writeKeyPlace(key, value.size(), checksum);
if (path.isEmpty()) {
@ -528,6 +881,10 @@ void Database::put(
} else {
data.flush();
invokeCallback(done, Error::NoError());
_touched.emplace(key);
writeMultiTouchedLazy();
startDelayedPruning();
}
} break;
@ -539,16 +896,19 @@ QString Database::writeKeyPlace(
const Key &key,
size_type size,
uint32 checksum) {
Expects(size <= kMaxDataSize);
Expects(size <= _settings.maxDataSize);
auto record = Store();
record.key = key;
record.size = ReadTo<EntrySize>(size);
record.checksum = checksum;
do {
bytes::set_random(bytes::object_as_span(&record.place));
} while (!isFreePlace(record.place));
if (const auto i = _map.find(key); i != end(_map)) {
record.place = i->second.place;
} else {
do {
bytes::set_random(bytes::object_as_span(&record.place));
} while (!isFreePlace(record.place));
}
const auto result = placePath(record.place);
auto writeable = record;
const auto success = _binlog.write(bytes::object_as_span(&writeable));
@ -561,12 +921,17 @@ QString Database::writeKeyPlace(
}
void Database::get(const Key &key, FnMut<void(QByteArray)> done) {
if (_removing.find(key) != end(_removing)) {
invokeCallback(done, QByteArray());
return;
}
const auto i = _map.find(key);
if (i == _map.end()) {
invokeCallback(done, QByteArray());
return;
}
const auto &entry = i->second;
const auto path = placePath(entry.place);
File data;
const auto result = data.open(path, File::Mode::Read, _key);
@ -587,6 +952,10 @@ void Database::get(const Key &key, FnMut<void(QByteArray)> done) {
invokeCallback(done, QByteArray());
} else {
invokeCallback(done, std::move(result));
_touched.emplace(key);
writeMultiTouchedLazy();
startDelayedPruning();
}
} break;
@ -598,23 +967,31 @@ void Database::remove(const Key &key, FnMut<void()> done) {
const auto i = _map.find(key);
if (i != _map.end()) {
_removing.emplace(key);
if (_removing.size() == kMaxBundledRecords) {
_writeRemoveTimer.cancel();
writeMultiRemove();
} else if (!_writeRemoveTimer.isActive()) {
_writeRemoveTimer.callOnce(kRemoveBundleDelay);
}
writeMultiRemoveLazy();
const auto &entry = i->second;
const auto path = placePath(entry.place);
_map.erase(i);
const auto path = placePath(i->second.place);
eraseMapEntry(i);
QFile(path).remove();
}
invokeCallback(done);
}
void Database::writeBundlesLazy() {
if (!_writeBundlesTimer.isActive()) {
_writeBundlesTimer.callOnce(_settings.writeBundleDelay);
}
}
void Database::writeMultiRemoveLazy() {
if (_removing.size() == _settings.maxBundledRecords) {
writeMultiRemove();
} else {
writeBundlesLazy();
}
}
void Database::writeMultiRemove() {
Expects(_removing.size() <= kMaxBundledRecords);
Expects(_removing.size() <= _settings.maxBundledRecords);
if (_removing.empty()) {
return;
@ -632,6 +1009,60 @@ void Database::writeMultiRemove() {
}
}
void Database::writeMultiTouchedLazy() {
if (_touched.size() == _settings.maxBundledRecords) {
writeMultiTouched();
} else {
writeBundlesLazy();
}
}
void Database::writeMultiTouched() {
if (!_touched.empty()) {
writeMultiTouchedBlock();
}
}
void Database::writeMultiTouchedBlock() {
Expects(_touched.size() <= _settings.maxBundledRecords);
const auto now = std::max(GetUnixtime(), 1);
const auto delta = std::max(int64(now) - int64(_latestSystemTime), 0LL);
const auto advancement = std::min(
delta,
int64(_settings.maxTimeAdvancement));
const auto size = _touched.size();
auto header = MultiTouchedHeader(now, advancement, size);
auto list = std::vector<MultiTouchedPart>();
if (size > 0) {
list.reserve(size);
for (const auto &key : base::take(_touched)) {
list.push_back({ key });
}
}
_latestSystemTime = now;
if (advancement > 0) {
_relativeTime += advancement;
for (const auto &entry : list) {
if (const auto i = _map.find(entry.key); i != end(_map)) {
i->second.useTime = _relativeTime;
}
}
}
if (_binlog.write(bytes::object_as_span(&header))) {
if (size > 0) {
_binlog.write(bytes::make_span(list));
}
_binlog.flush();
}
}
void Database::writeBundles() {
writeMultiRemove();
writeMultiTouched();
}
void Database::createCleaner() {
auto [left, right] = base::make_binary_guard();
_cleaner.guard = std::move(left);

View File

@ -10,6 +10,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include "storage/cache/storage_cache_types.h"
#include "base/basic_types.h"
#include <crl/crl_object_on_queue.h>
#include <crl/crl_time.h>
#include <QtCore/QString>
namespace Storage {
@ -39,7 +40,15 @@ inline bool operator<(const Key &a, const Key &b) {
class Database {
public:
struct Settings {
size_type sizeLimit = 0;
int64 totalSizeLimit = 1024 * 1024 * 1024;
size_type totalTimeLimit = 30 * 86400; // One month in seconds.
size_type maxBundledRecords = 16 * 1024;
size_type readBlockSize = 8 * 1024 * 1024;
size_type maxDataSize = 10 * 1024 * 1024;
crl::time_type writeBundleDelay = 15 * 60 * crl::time_type(1000);
size_type maxTimeAdvancement = 365 * 86400; // One year in seconds.
crl::time_type pruneTimeout = 5 * crl::time_type(1000);
crl::time_type maxPruneCheckTimeout = 60 * 60 * crl::time_type(1000);
};
Database(const QString &path, const Settings &settings);

View File

@ -9,8 +9,11 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include "storage/cache/storage_cache_database.h"
#include "storage/storage_encryption.h"
#include "storage/storage_encrypted_file.h"
#include "base/concurrent_timer.h"
#include <crl/crl.h>
#include <QtCore/QFile>
#include <QtWidgets/QApplication>
#include <thread>
using namespace Storage::Cache;
@ -25,6 +28,33 @@ abcdefgh01234567abcdefgh01234567abcdefgh01234567abcdefgh01234567\
const auto name = QString("test.db");
const auto SmallSleep = [] {
static auto SleepTime = 0;
if (SleepTime > 5000) {
return false;
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
SleepTime += 10;
return true;
};
QString GetBinlogPath() {
using namespace Storage;
QFile versionFile(name + "/version");
while (!versionFile.open(QIODevice::ReadOnly)) {
if (!SmallSleep()) {
return QString();
}
}
const auto bytes = versionFile.readAll();
if (bytes.size() != 4) {
return QString();
}
const auto version = *reinterpret_cast<const int32*>(bytes.data());
return name + '/' + QString::number(version) + "/binlog";
}
const auto TestValue1 = QByteArray("testbytetestbyt");
const auto TestValue2 = QByteArray("bytetestbytetestb");
@ -42,9 +72,26 @@ const auto GetValue = [](QByteArray value) {
Semaphore.release();
};
const auto Settings = Database::Settings{ 1024 };
const auto Settings = [] {
auto result = Database::Settings();
result.writeBundleDelay = 1 * crl::time_type(1000);
result.pruneTimeout = 1 * crl::time_type(1500);
result.maxDataSize = 20;
return result;
}();
const auto AdvanceTime = [](int32 seconds) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000) * seconds);
};
TEST_CASE("encrypted cache db", "[storage_cache_database]") {
static auto init = [] {
int argc = 0;
char **argv = nullptr;
static QCoreApplication application(argc, argv);
static base::ConcurrentTimerEnvironment environment;
return true;
}();
SECTION("writing db") {
Database db(name, Settings);
@ -103,5 +150,331 @@ TEST_CASE("encrypted cache db", "[storage_cache_database]") {
db.get(Key{ 1, 0 }, GetValue);
Semaphore.acquire();
REQUIRE((Value == TestValue2));
db.close([&] { Semaphore.release(); });
Semaphore.acquire();
}
}
TEST_CASE("cache db remove", "[storage_cache_database]") {
SECTION("db remove deletes value") {
Database db(name, Settings);
db.clear(GetResult);
Semaphore.acquire();
REQUIRE(Result.type == Error::Type::None);
db.open(key, GetResult);
Semaphore.acquire();
REQUIRE(Result.type == Error::Type::None);
db.put(Key{ 0, 1 }, TestValue1, GetResult);
Semaphore.acquire();
REQUIRE(Result.type == Error::Type::None);
db.put(Key{ 1, 0 }, TestValue2, GetResult);
Semaphore.acquire();
REQUIRE(Result.type == Error::Type::None);
db.remove(Key{ 0, 1 }, nullptr);
db.get(Key{ 0, 1 }, GetValue);
Semaphore.acquire();
REQUIRE(Value.isEmpty());
db.get(Key{ 1, 0 }, GetValue);
Semaphore.acquire();
REQUIRE((Value == TestValue2));
db.close([&] { Semaphore.release(); });
Semaphore.acquire();
}
SECTION("db remove deletes value permanently") {
Database db(name, Settings);
db.open(key, GetResult);
Semaphore.acquire();
REQUIRE(Result.type == Error::Type::None);
db.get(Key{ 1, 0 }, GetValue);
Semaphore.acquire();
REQUIRE((Value == TestValue2));
db.close([&] { Semaphore.release(); });
Semaphore.acquire();
}
}
TEST_CASE("cache db bundled actions", "[storage_cache_database]") {
SECTION("db touched written lazily") {
Database db(name, Settings);
db.clear(GetResult);
Semaphore.acquire();
REQUIRE(Result.type == Error::Type::None);
db.open(key, GetResult);
Semaphore.acquire();
REQUIRE(Result.type == Error::Type::None);
const auto path = GetBinlogPath();
db.put(Key{ 0, 1 }, TestValue1, GetResult);
Semaphore.acquire();
REQUIRE(Result.type == Error::Type::None);
const auto size = QFile(path).size();
db.get(Key{ 0, 1 }, GetValue);
Semaphore.acquire();
REQUIRE((Value == TestValue1));
const auto same = QFile(path).size();
REQUIRE(same == size);
AdvanceTime(2);
const auto next = QFile(path).size();
REQUIRE(next > size);
db.close([&] { Semaphore.release(); });
Semaphore.acquire();
}
SECTION("db touched written on close") {
Database db(name, Settings);
db.clear(GetResult);
Semaphore.acquire();
REQUIRE(Result.type == Error::Type::None);
db.open(key, GetResult);
Semaphore.acquire();
REQUIRE(Result.type == Error::Type::None);
const auto path = GetBinlogPath();
db.put(Key{ 0, 1 }, TestValue1, GetResult);
Semaphore.acquire();
REQUIRE(Result.type == Error::Type::None);
const auto size = QFile(path).size();
db.get(Key{ 0, 1 }, GetValue);
Semaphore.acquire();
REQUIRE((Value == TestValue1));
const auto same = QFile(path).size();
REQUIRE(same == size);
db.close([&] { Semaphore.release(); });
Semaphore.acquire();
const auto next = QFile(path).size();
REQUIRE(next > size);
}
SECTION("db remove written lazily") {
Database db(name, Settings);
db.clear(GetResult);
Semaphore.acquire();
REQUIRE(Result.type == Error::Type::None);
db.open(key, GetResult);
Semaphore.acquire();
REQUIRE(Result.type == Error::Type::None);
const auto path = GetBinlogPath();
db.put(Key{ 0, 1 }, TestValue1, GetResult);
Semaphore.acquire();
REQUIRE(Result.type == Error::Type::None);
const auto size = QFile(path).size();
db.remove(Key{ 0, 1 }, [&] { Semaphore.release(); });
Semaphore.acquire();
const auto same = QFile(path).size();
REQUIRE(same == size);
AdvanceTime(2);
const auto next = QFile(path).size();
REQUIRE(next > size);
db.close([&] { Semaphore.release(); });
Semaphore.acquire();
}
SECTION("db remove written on close") {
Database db(name, Settings);
db.clear(GetResult);
Semaphore.acquire();
REQUIRE(Result.type == Error::Type::None);
db.open(key, GetResult);
Semaphore.acquire();
REQUIRE(Result.type == Error::Type::None);
const auto path = GetBinlogPath();
db.put(Key{ 0, 1 }, TestValue1, GetResult);
Semaphore.acquire();
REQUIRE(Result.type == Error::Type::None);
const auto size = QFile(path).size();
db.remove(Key{ 0, 1 }, [&] { Semaphore.release(); });
Semaphore.acquire();
const auto same = QFile(path).size();
REQUIRE(same == size);
db.close([&] { Semaphore.release(); });
Semaphore.acquire();
const auto next = QFile(path).size();
REQUIRE(next > size);
}
}
TEST_CASE("cache db limits", "[storage_cache_database]") {
SECTION("db both limit") {
auto settings = Settings;
settings.totalSizeLimit = 17 * 3 + 1;
settings.totalTimeLimit = 4;
Database db(name, settings);
db.clear(nullptr);
db.open(key, nullptr);
db.put(Key{ 0, 1 }, TestValue1, nullptr);
db.put(Key{ 1, 0 }, TestValue2, nullptr);
AdvanceTime(2);
db.get(Key{ 1, 0 }, nullptr);
AdvanceTime(3);
db.put(Key{ 1, 1 }, TestValue1, nullptr);
db.put(Key{ 2, 0 }, TestValue2, nullptr);
db.put(Key{ 0, 2 }, TestValue1, nullptr);
AdvanceTime(2);
db.get(Key{ 0, 1 }, GetValue);
Semaphore.acquire();
REQUIRE(Value.isEmpty());
db.get(Key{ 1, 0 }, GetValue);
Semaphore.acquire();
REQUIRE(Value.isEmpty());
db.get(Key{ 1, 1 }, GetValue);
Semaphore.acquire();
REQUIRE((Value == TestValue1));
db.get(Key{ 2, 0 }, GetValue);
Semaphore.acquire();
REQUIRE((Value == TestValue2));
db.get(Key{ 0, 2 }, GetValue);
Semaphore.acquire();
REQUIRE((Value == TestValue1));
db.close([&] { Semaphore.release(); });
Semaphore.acquire();
}
SECTION("db size limit") {
auto settings = Settings;
settings.totalSizeLimit = 17 * 3 + 1;
Database db(name, settings);
db.clear(nullptr);
db.open(key, nullptr);
db.put(Key{ 0, 1 }, TestValue1, nullptr);
AdvanceTime(2);
db.put(Key{ 1, 0 }, TestValue2, nullptr);
AdvanceTime(2);
db.put(Key{ 1, 1 }, TestValue1, nullptr);
db.get(Key{ 0, 1 }, nullptr);
AdvanceTime(2);
db.put(Key{ 2, 0 }, TestValue2, nullptr);
// Removing { 1, 0 } will be scheduled.
db.get(Key{ 0, 1 }, GetValue);
Semaphore.acquire();
REQUIRE((Value == TestValue1));
db.get(Key{ 1, 1 }, GetValue);
Semaphore.acquire();
REQUIRE((Value == TestValue1));
db.get(Key{ 2, 0 }, GetValue);
Semaphore.acquire();
REQUIRE((Value == TestValue2));
AdvanceTime(2);
// Removing { 1, 0 } performed.
db.get(Key{ 1, 0 }, GetValue);
Semaphore.acquire();
REQUIRE(Value.isEmpty());
db.get(Key{ 1, 1 }, GetValue);
Semaphore.acquire();
REQUIRE((Value == TestValue1));
db.put(Key{ 0, 2 }, TestValue1, nullptr);
db.put(Key{ 2, 2 }, TestValue2, GetResult);
Semaphore.acquire();
REQUIRE(Result.type == Error::Type::None);
// Removing { 0, 1 } and { 2, 0 } will be scheduled.
AdvanceTime(2);
// Removing { 0, 1 } and { 2, 0 } performed.
db.get(Key{ 0, 1 }, GetValue);
Semaphore.acquire();
REQUIRE(Value.isEmpty());
db.get(Key{ 2, 0 }, GetValue);
Semaphore.acquire();
REQUIRE(Value.isEmpty());
db.get(Key{ 1, 1 }, GetValue);
Semaphore.acquire();
REQUIRE((Value == TestValue1));
db.get(Key{ 0, 2 }, GetValue);
Semaphore.acquire();
REQUIRE((Value == TestValue1));
db.get(Key{ 2, 2 }, GetValue);
Semaphore.acquire();
REQUIRE((Value == TestValue2));
db.close([&] { Semaphore.release(); });
Semaphore.acquire();
}
SECTION("db time limit") {
auto settings = Settings;
settings.totalTimeLimit = 3;
Database db(name, settings);
db.clear(nullptr);
db.open(key, nullptr);
db.put(Key{ 0, 1 }, TestValue1, nullptr);
db.put(Key{ 1, 0 }, TestValue2, nullptr);
db.put(Key{ 1, 1 }, TestValue1, nullptr);
db.put(Key{ 2, 0 }, TestValue2, nullptr);
AdvanceTime(1);
db.get(Key{ 1, 0 }, nullptr);
db.get(Key{ 1, 1 }, nullptr);
AdvanceTime(1);
db.get(Key{ 1, 0 }, nullptr);
db.get(Key{ 0, 1 }, nullptr);
AdvanceTime(1);
db.get(Key{ 1, 0 }, nullptr);
db.get(Key{ 0, 1 }, nullptr);
AdvanceTime(2);
db.get(Key{ 2, 0 }, GetValue);
Semaphore.acquire();
REQUIRE(Value.isEmpty());
db.get(Key{ 1, 1 }, GetValue);
Semaphore.acquire();
REQUIRE(Value.isEmpty());
db.get(Key{ 1, 0 }, GetValue);
Semaphore.acquire();
REQUIRE((Value == TestValue2));
db.get(Key{ 0, 1 }, GetValue);
Semaphore.acquire();
REQUIRE((Value == TestValue1));
db.close([&] { Semaphore.release(); });
Semaphore.acquire();
}
}

View File

@ -14,16 +14,28 @@ namespace {
constexpr auto kBlockSize = CtrState::kBlockSize;
} // namespace
enum class Format : uint32 {
Format_0,
};
struct BasicHeader {
BasicHeader();
struct File::BasicHeader {
bytes::array<kSaltSize> salt = { { bytes::type() } };
Format format = Format::Format_0;
uint32 reserved = 0;
Format format : 8;
uint32 reserved1 : 24;
uint32 reserved2 = 0;
uint64 applicationVersion = 0;
bytes::array<openssl::kSha256Size> checksum = { { bytes::type() } };
};
BasicHeader::BasicHeader()
: format(Format::Format_0)
, reserved1(0) {
}
} // namespace
File::Result File::open(
const QString &path,
Mode mode,

View File

@ -44,11 +44,6 @@ public:
void close();
private:
enum class Format : uint32 {
Format_0,
};
struct BasicHeader;
Result attemptOpen(Mode mode, const EncryptionKey &key);
Result attemptOpenForRead(const EncryptionKey &key);
Result attemptOpenForReadAppend(const EncryptionKey &key);