Remove from memory old file slices.

This commit is contained in:
John Preston 2019-02-26 17:57:47 +04:00
parent 2208621050
commit 4636c74586
2 changed files with 48 additions and 17 deletions

View File

@ -21,6 +21,7 @@ constexpr auto kInSlice = kPartsInSlice * kPartSize;
constexpr auto kMaxPartsInHeader = 72; constexpr auto kMaxPartsInHeader = 72;
constexpr auto kMaxInHeader = kMaxPartsInHeader * kPartSize; constexpr auto kMaxInHeader = kMaxPartsInHeader * kPartSize;
constexpr auto kMaxOnlyInHeader = 80 * kPartSize; constexpr auto kMaxOnlyInHeader = 80 * kPartSize;
constexpr auto kSlicesInMemory = 2;
// 1 MB of parts are requested from cloud ahead of reading demand. // 1 MB of parts are requested from cloud ahead of reading demand.
constexpr auto kPreloadPartsAhead = 8; constexpr auto kPreloadPartsAhead = 8;
@ -304,6 +305,10 @@ bool Reader::Slices::processCacheResult(
Expects(sliceNumber >= 0 && sliceNumber <= _data.size()); Expects(sliceNumber >= 0 && sliceNumber <= _data.size());
auto &slice = (sliceNumber ? _data[sliceNumber - 1] : _header); auto &slice = (sliceNumber ? _data[sliceNumber - 1] : _header);
if (!(slice.flags &Slice::Flag::LoadingFromCache)) {
// We could've already unloaded this slice using LRU _usedSlices.
return true;
}
const auto success = slice.processCacheData( const auto success = slice.processCacheData(
std::move(result), std::move(result),
maxSliceSize(sliceNumber)); maxSliceSize(sliceNumber));
@ -336,8 +341,10 @@ auto Reader::Slices::fill(int offset, bytes::span buffer) -> FillResult {
Expects(offset + buffer.size() <= _size); Expects(offset + buffer.size() <= _size);
Expects(buffer.size() <= kInSlice); Expects(buffer.size() <= kInSlice);
using Flag = Slice::Flag;
if (_headerMode != HeaderMode::NoCache if (_headerMode != HeaderMode::NoCache
&& !(_header.flags & Slice::Flag::LoadedFromCache)) { && !(_header.flags & Flag::LoadedFromCache)) {
// Waiting for initial cache query. // Waiting for initial cache query.
return {}; return {};
} }
@ -356,12 +363,11 @@ auto Reader::Slices::fill(int offset, bytes::span buffer) -> FillResult {
if (sliceIndex == _data.size()) { if (sliceIndex == _data.size()) {
return; return;
} }
using Flag = Slice::Flag;
if (!(_data[sliceIndex].flags & Flag::LoadedFromCache) if (!(_data[sliceIndex].flags & Flag::LoadedFromCache)
&& _headerMode != HeaderMode::NoCache && _headerMode != HeaderMode::NoCache
&& _headerMode != HeaderMode::Unknown) { && _headerMode != HeaderMode::Unknown) {
if (!(_data[sliceIndex].flags & Flag::LoadingFromCache)) { if (!(_data[sliceIndex].flags & Flag::LoadingFromCache)) {
_data[sliceIndex].flags |= Slice::Flag::LoadingFromCache; _data[sliceIndex].flags |= Flag::LoadingFromCache;
result.sliceNumbersFromCache.add(sliceIndex + 1); result.sliceNumbersFromCache.add(sliceIndex + 1);
} }
return; return;
@ -385,23 +391,49 @@ auto Reader::Slices::fill(int offset, bytes::span buffer) -> FillResult {
handlePrepareResult(fromSlice, first); handlePrepareResult(fromSlice, first);
handlePrepareResult(fromSlice + 1, second); handlePrepareResult(fromSlice + 1, second);
if (first.ready && second.ready) { if (first.ready && second.ready) {
markSliceUsed(fromSlice);
CopyLoaded( CopyLoaded(
buffer, buffer,
ranges::make_iterator_range(first.start, first.finish), ranges::make_iterator_range(first.start, first.finish),
firstFrom, firstFrom,
firstTill); firstTill);
if (fromSlice + 1 < tillSlice) { if (fromSlice + 1 < tillSlice) {
markSliceUsed(fromSlice + 1);
CopyLoaded( CopyLoaded(
buffer.subspan(firstTill - firstFrom), buffer.subspan(firstTill - firstFrom),
ranges::make_iterator_range(second.start, second.finish), ranges::make_iterator_range(second.start, second.finish),
secondFrom, secondFrom,
secondTill); secondTill);
} }
if (_usedSlices.size() > kSlicesInMemory
&& _headerMode != HeaderMode::Unknown) {
const auto purgeSlice = _usedSlices.front();
_usedSlices.pop_front();
if (_headerMode == HeaderMode::NoCache
|| !(_data[purgeSlice].flags & Flag::ChangedSinceCache)) {
_data[purgeSlice] = Slice();
} else {
result.toCache = serializeAndUnloadSlice(purgeSlice + 1);
}
}
result.filled = true; result.filled = true;
} }
return result; return result;
} }
void Reader::Slices::markSliceUsed(int sliceIndex) {
const auto i = ranges::find(_usedSlices, sliceIndex);
const auto end = _usedSlices.end();
if (i == end) {
_usedSlices.push_back(sliceIndex);
} else {
const auto next = i + 1;
if (next != end) {
std::rotate(i, next, end);
}
}
}
int Reader::Slices::maxSliceSize(int sliceNumber) const { int Reader::Slices::maxSliceSize(int sliceNumber) const {
return (sliceNumber == _data.size()) return (sliceNumber == _data.size())
? (_size - (sliceNumber - 1) * kInSlice) ? (_size - (sliceNumber - 1) * kInSlice)
@ -411,11 +443,13 @@ int Reader::Slices::maxSliceSize(int sliceNumber) const {
} }
Reader::SerializedSlice Reader::Slices::serializeAndUnloadSlice( Reader::SerializedSlice Reader::Slices::serializeAndUnloadSlice(
int sliceNumber, int sliceNumber) {
Slice &slice) const { Expects(sliceNumber >= 0 && sliceNumber <= _data.size());
Expects(!slice.parts.empty());
auto &slice = sliceNumber ? _data[sliceNumber - 1] : _header;
const auto count = slice.parts.size(); const auto count = slice.parts.size();
Assert(count > 0);
auto result = SerializedSlice(); auto result = SerializedSlice();
result.number = sliceNumber; result.number = sliceNumber;
const auto continuous = FindNotLoadedStart(slice.parts, 0); const auto continuous = FindNotLoadedStart(slice.parts, 0);
@ -455,12 +489,11 @@ Reader::SerializedSlice Reader::Slices::unloadToCache() {
return {}; return {};
} }
if (_header.flags & Slice::Flag::ChangedSinceCache) { if (_header.flags & Slice::Flag::ChangedSinceCache) {
return serializeAndUnloadSlice(0, _header); return serializeAndUnloadSlice(0);
} }
auto &&indexed = ranges::view::zip(_data, ranges::view::ints(0)); for (auto i = 0, count = int(_data.size()); i != count; ++i) {
for (auto &&[slice, index] : indexed) { if (_data[i].flags & Slice::Flag::ChangedSinceCache) {
if (slice.flags & Slice::Flag::ChangedSinceCache) { return serializeAndUnloadSlice(i + 1);
return serializeAndUnloadSlice(index + 1, slice);
} }
} }
return {}; return {};
@ -502,6 +535,7 @@ std::shared_ptr<Reader::CacheHelper> Reader::InitCacheHelper(
void Reader::readFromCache(int sliceNumber) { void Reader::readFromCache(int sliceNumber) {
Expects(_cacheHelper != nullptr); Expects(_cacheHelper != nullptr);
LOG(("READING FROM CACHE: %1").arg(sliceNumber));
const auto key = _cacheHelper->key(sliceNumber); const auto key = _cacheHelper->key(sliceNumber);
const auto weak = std::weak_ptr<CacheHelper>(_cacheHelper); const auto weak = std::weak_ptr<CacheHelper>(_cacheHelper);
_owner->cacheBigFile().get(key, [=](QByteArray &&result) { _owner->cacheBigFile().get(key, [=](QByteArray &&result) {
@ -686,9 +720,6 @@ bool Reader::processLoadedParts() {
static auto real = 0; static auto real = 0;
static auto skip = 0; static auto skip = 0;
void Reader::loadAtOffset(int offset) { void Reader::loadAtOffset(int offset) {
if (offset == 655360) {
int a = 0;
}
if (_loadingOffsets.add(offset)) { if (_loadingOffsets.add(offset)) {
LOG(("START LOAD: %1").arg(offset)); LOG(("START LOAD: %1").arg(offset));
_loader->load(offset); _loader->load(offset);

View File

@ -126,12 +126,12 @@ private:
void applyHeaderCacheData(); void applyHeaderCacheData();
int maxSliceSize(int sliceNumber) const; int maxSliceSize(int sliceNumber) const;
SerializedSlice serializeAndUnloadSlice( SerializedSlice serializeAndUnloadSlice(int sliceNumber);
int sliceNumber, void markSliceUsed(int sliceIndex);
Slice &slice) const;
std::vector<Slice> _data; std::vector<Slice> _data;
Slice _header; Slice _header;
std::deque<int> _usedSlices;
int _size = 0; int _size = 0;
HeaderMode _headerMode = HeaderMode::Unknown; HeaderMode _headerMode = HeaderMode::Unknown;