diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp index da9560d70..b4b47af10 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp @@ -21,6 +21,7 @@ constexpr auto kInSlice = kPartsInSlice * kPartSize; constexpr auto kMaxPartsInHeader = 72; constexpr auto kMaxInHeader = kMaxPartsInHeader * kPartSize; constexpr auto kMaxOnlyInHeader = 80 * kPartSize; +constexpr auto kSlicesInMemory = 2; // 1 MB of parts are requested from cloud ahead of reading demand. constexpr auto kPreloadPartsAhead = 8; @@ -304,6 +305,10 @@ bool Reader::Slices::processCacheResult( Expects(sliceNumber >= 0 && sliceNumber <= _data.size()); auto &slice = (sliceNumber ? _data[sliceNumber - 1] : _header); + if (!(slice.flags &Slice::Flag::LoadingFromCache)) { + // We could've already unloaded this slice using LRU _usedSlices. + return true; + } const auto success = slice.processCacheData( std::move(result), maxSliceSize(sliceNumber)); @@ -336,8 +341,10 @@ auto Reader::Slices::fill(int offset, bytes::span buffer) -> FillResult { Expects(offset + buffer.size() <= _size); Expects(buffer.size() <= kInSlice); + using Flag = Slice::Flag; + if (_headerMode != HeaderMode::NoCache - && !(_header.flags & Slice::Flag::LoadedFromCache)) { + && !(_header.flags & Flag::LoadedFromCache)) { // Waiting for initial cache query. return {}; } @@ -356,12 +363,11 @@ auto Reader::Slices::fill(int offset, bytes::span buffer) -> FillResult { if (sliceIndex == _data.size()) { return; } - using Flag = Slice::Flag; if (!(_data[sliceIndex].flags & Flag::LoadedFromCache) && _headerMode != HeaderMode::NoCache && _headerMode != HeaderMode::Unknown) { if (!(_data[sliceIndex].flags & Flag::LoadingFromCache)) { - _data[sliceIndex].flags |= Slice::Flag::LoadingFromCache; + _data[sliceIndex].flags |= Flag::LoadingFromCache; result.sliceNumbersFromCache.add(sliceIndex + 1); } return; @@ -385,23 +391,49 @@ auto Reader::Slices::fill(int offset, bytes::span buffer) -> FillResult { handlePrepareResult(fromSlice, first); handlePrepareResult(fromSlice + 1, second); if (first.ready && second.ready) { + markSliceUsed(fromSlice); CopyLoaded( buffer, ranges::make_iterator_range(first.start, first.finish), firstFrom, firstTill); if (fromSlice + 1 < tillSlice) { + markSliceUsed(fromSlice + 1); CopyLoaded( buffer.subspan(firstTill - firstFrom), ranges::make_iterator_range(second.start, second.finish), secondFrom, secondTill); } + if (_usedSlices.size() > kSlicesInMemory + && _headerMode != HeaderMode::Unknown) { + const auto purgeSlice = _usedSlices.front(); + _usedSlices.pop_front(); + if (_headerMode == HeaderMode::NoCache + || !(_data[purgeSlice].flags & Flag::ChangedSinceCache)) { + _data[purgeSlice] = Slice(); + } else { + result.toCache = serializeAndUnloadSlice(purgeSlice + 1); + } + } result.filled = true; } return result; } +void Reader::Slices::markSliceUsed(int sliceIndex) { + const auto i = ranges::find(_usedSlices, sliceIndex); + const auto end = _usedSlices.end(); + if (i == end) { + _usedSlices.push_back(sliceIndex); + } else { + const auto next = i + 1; + if (next != end) { + std::rotate(i, next, end); + } + } +} + int Reader::Slices::maxSliceSize(int sliceNumber) const { return (sliceNumber == _data.size()) ? (_size - (sliceNumber - 1) * kInSlice) @@ -411,11 +443,13 @@ int Reader::Slices::maxSliceSize(int sliceNumber) const { } Reader::SerializedSlice Reader::Slices::serializeAndUnloadSlice( - int sliceNumber, - Slice &slice) const { - Expects(!slice.parts.empty()); + int sliceNumber) { + Expects(sliceNumber >= 0 && sliceNumber <= _data.size()); + auto &slice = sliceNumber ? _data[sliceNumber - 1] : _header; const auto count = slice.parts.size(); + Assert(count > 0); + auto result = SerializedSlice(); result.number = sliceNumber; const auto continuous = FindNotLoadedStart(slice.parts, 0); @@ -455,12 +489,11 @@ Reader::SerializedSlice Reader::Slices::unloadToCache() { return {}; } if (_header.flags & Slice::Flag::ChangedSinceCache) { - return serializeAndUnloadSlice(0, _header); + return serializeAndUnloadSlice(0); } - auto &&indexed = ranges::view::zip(_data, ranges::view::ints(0)); - for (auto &&[slice, index] : indexed) { - if (slice.flags & Slice::Flag::ChangedSinceCache) { - return serializeAndUnloadSlice(index + 1, slice); + for (auto i = 0, count = int(_data.size()); i != count; ++i) { + if (_data[i].flags & Slice::Flag::ChangedSinceCache) { + return serializeAndUnloadSlice(i + 1); } } return {}; @@ -502,6 +535,7 @@ std::shared_ptr Reader::InitCacheHelper( void Reader::readFromCache(int sliceNumber) { Expects(_cacheHelper != nullptr); + LOG(("READING FROM CACHE: %1").arg(sliceNumber)); const auto key = _cacheHelper->key(sliceNumber); const auto weak = std::weak_ptr(_cacheHelper); _owner->cacheBigFile().get(key, [=](QByteArray &&result) { @@ -686,9 +720,6 @@ bool Reader::processLoadedParts() { static auto real = 0; static auto skip = 0; void Reader::loadAtOffset(int offset) { - if (offset == 655360) { - int a = 0; - } if (_loadingOffsets.add(offset)) { LOG(("START LOAD: %1").arg(offset)); _loader->load(offset); diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h index e14e7bad1..040a6fcce 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h @@ -126,12 +126,12 @@ private: void applyHeaderCacheData(); int maxSliceSize(int sliceNumber) const; - SerializedSlice serializeAndUnloadSlice( - int sliceNumber, - Slice &slice) const; + SerializedSlice serializeAndUnloadSlice(int sliceNumber); + void markSliceUsed(int sliceIndex); std::vector _data; Slice _header; + std::deque _usedSlices; int _size = 0; HeaderMode _headerMode = HeaderMode::Unknown;