From 84b09795f369e91bc54a584d399d717b9c568a10 Mon Sep 17 00:00:00 2001 From: John Preston Date: Fri, 8 Mar 2019 15:23:34 +0400 Subject: [PATCH] Store first slice in the header cache key. --- .../streaming/media_streaming_reader.cpp | 238 +++++++++++++----- .../media/streaming/media_streaming_reader.h | 32 ++- 2 files changed, 194 insertions(+), 76 deletions(-) diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp index e43cf71a6..da75de80e 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp @@ -21,6 +21,7 @@ constexpr auto kPartsInSlice = 64; constexpr auto kInSlice = kPartsInSlice * kPartSize; constexpr auto kMaxPartsInHeader = 16; constexpr auto kMaxOnlyInHeader = 80 * kPartSize; +constexpr auto kPartsOutsideFirstSliceGood = 8; constexpr auto kSlicesInMemory = 2; // 1 MB of header parts can be outside the first slice for us to still @@ -30,6 +31,10 @@ constexpr auto kSlicesInMemory = 2; // 1 MB of parts are requested from cloud ahead of reading demand. 
constexpr auto kPreloadPartsAhead = 8; +bool IsContiguousSerialization(int serializedSize, int maxSliceSize) { + return !(serializedSize % kPartSize) || (serializedSize == maxSliceSize); +} + template // Range::value_type is Pair int FindNotLoadedStart(Range &&parts, int offset) { auto result = offset; @@ -108,7 +113,9 @@ Storage::Cache::Key Reader::CacheHelper::key(int sliceNumber) const { return Storage::Cache::Key{ baseKey.high, baseKey.low + sliceNumber }; } -bool Reader::Slice::processCacheData(QByteArray &&data, int maxSliceSize) { +bytes::const_span Reader::Slice::processCacheData( + bytes::const_span data, + int maxSize) { Expects((flags & Flag::LoadingFromCache) != 0); Expects(!(flags & Flag::LoadedFromCache)); @@ -117,24 +124,28 @@ bool Reader::Slice::processCacheData(QByteArray &&data, int maxSliceSize) { flags &= ~Flag::LoadingFromCache; }); - const auto size = data.size(); - if (!(size % kPartSize) || (size == maxSliceSize)) { - if (size > maxSliceSize) { - return false; + const auto size = int(data.size()); + if (IsContiguousSerialization(size, maxSize)) { + if (size > maxSize) { + return {}; } for (auto offset = 0; offset < size; offset += kPartSize) { - parts.emplace(offset, data.mid(offset, kPartSize)); + const auto part = data.subspan( + offset, + std::min(kPartSize, size - offset)); + parts.try_emplace( + offset, + reinterpret_cast(part.data()), + part.size()); } - return true; + return {}; } - return processComplexCacheData( - bytes::make_span(data), - maxSliceSize); + return processComplexCacheData(bytes::make_span(data), maxSize); } -bool Reader::Slice::processComplexCacheData( +bytes::const_span Reader::Slice::processComplexCacheData( bytes::const_span data, - int maxSliceSize) { + int maxSize) { const auto takeInt = [&]() -> std::optional { if (data.size() < sizeof(int32)) { return std::nullopt; @@ -144,35 +155,42 @@ bool Reader::Slice::processComplexCacheData( data = data.subspan(sizeof(int32)); return result; }; - const auto takeBytes = 
[&](int count) -> std::optional { + const auto takeBytes = [&](int count) { if (count <= 0 || data.size() < count) { - return std::nullopt; + return bytes::const_span(); } - auto result = QByteArray( - reinterpret_cast(data.data()), - count); + const auto result = data.subspan(0, count); data = data.subspan(count); return result; }; - const auto count = takeInt().value_or(0); - if (count <= 0) { - return false; + const auto maybeCount = takeInt(); + if (!maybeCount) { + return {}; + } + const auto count = *maybeCount; + if (count < 0) { + return {}; + } else if (!count) { + return data; } for (auto i = 0; i != count; ++i) { const auto offset = takeInt().value_or(0); const auto size = takeInt().value_or(0); - auto bytes = takeBytes(size).value_or(QByteArray()); + const auto bytes = takeBytes(size); if (offset < 0 - || offset >= maxSliceSize + || offset >= maxSize || size <= 0 - || size > maxSliceSize - || offset + size > maxSliceSize + || size > maxSize + || offset + size > maxSize || bytes.size() != size) { - return false; + return {}; } - parts.emplace(offset, std::move(bytes)); + parts.try_emplace( + offset, + reinterpret_cast(bytes.data()), + bytes.size()); } - return true; + return data; } void Reader::Slice::addPart(int offset, QByteArray bytes) { @@ -258,26 +276,50 @@ Reader::Slices::Slices(int size, bool useCache) : _size(size) { Expects(size > 0); - _header.flags |= Slice::Flag::Header; if (useCache) { _header.flags |= Slice::Flag::LoadingFromCache; } else { _headerMode = HeaderMode::NoCache; } - if (!fullInHeader()) { + if (!isFullInHeader()) { _data.resize((_size + kInSlice - 1) / kInSlice); } } -bool Reader::Slices::fullInHeader() const { +bool Reader::Slices::headerModeUnknown() const { + return (_headerMode == HeaderMode::Unknown); +} + +bool Reader::Slices::isFullInHeader() const { return (_size <= kMaxOnlyInHeader); } +bool Reader::Slices::isGoodHeader() const { + return (_headerMode == HeaderMode::Good); +} + +bool 
Reader::Slices::computeIsGoodHeader() const { + if (isFullInHeader()) { + return false; + } + const auto outsideFirstSliceIt = ranges::lower_bound( + _header.parts, + kInSlice, + ranges::less(), + &base::flat_map::value_type::first); + const auto outsideFirstSlice = end(_header.parts) - outsideFirstSliceIt; + return (outsideFirstSlice <= kPartsOutsideFirstSliceGood); +} + void Reader::Slices::headerDone(bool fromCache) { if (_headerMode != HeaderMode::Unknown) { return; } - _headerMode = HeaderMode::Small; + _headerMode = isFullInHeader() + ? HeaderMode::Full + : computeIsGoodHeader() + ? HeaderMode::Good + : HeaderMode::Small; if (!fromCache) { for (auto &slice : _data) { using Flag = Slice::Flag; @@ -289,14 +331,14 @@ void Reader::Slices::headerDone(bool fromCache) { } bool Reader::Slices::headerWontBeFilled() const { - return (_headerMode == HeaderMode::Unknown) + return headerModeUnknown() && (_header.parts.size() >= kMaxPartsInHeader); } void Reader::Slices::applyHeaderCacheData() { - if (_header.parts.empty()) { + if (_header.parts.empty() || _headerMode != HeaderMode::Unknown) { return; - } else if (fullInHeader()) { + } else if (isFullInHeader()) { headerDone(true); return; } @@ -311,28 +353,43 @@ void Reader::Slices::applyHeaderCacheData() { void Reader::Slices::processCacheResult( int sliceNumber, - QByteArray &&result) { + bytes::const_span result) { Expects(sliceNumber >= 0 && sliceNumber <= _data.size()); auto &slice = (sliceNumber ? _data[sliceNumber - 1] : _header); - if (!(slice.flags &Slice::Flag::LoadingFromCache)) { + if (!sliceNumber && isGoodHeader()) { + // We've loaded header slice because really we wanted first slice. + if (!(_data[0].flags & Slice::Flag::LoadingFromCache)) { + // We could've already unloaded this slice using LRU _usedSlices. + return; + } + // So just process whole result even if we didn't want header really. 
+ slice.flags |= Slice::Flag::LoadingFromCache; + } + if (!(slice.flags & Slice::Flag::LoadingFromCache)) { // We could've already unloaded this slice using LRU _usedSlices. return; } - const auto success = slice.processCacheData( - std::move(result), + const auto remaining = slice.processCacheData( + result, maxSliceSize(sliceNumber)); if (!sliceNumber) { applyHeaderCacheData(); + if (isGoodHeader()) { + // When we first read header we don't request the first slice. + // But we get it, so let's apply it anyway. + _data[0].flags |= Slice::Flag::LoadingFromCache; + processCacheResult(1, remaining); + } } } void Reader::Slices::processPart( int offset, QByteArray &&bytes) { - Expects(fullInHeader() || (offset / kInSlice < _data.size())); + Expects(isFullInHeader() || (offset / kInSlice < _data.size())); - if (fullInHeader()) { + if (isFullInHeader()) { _header.addPart(offset, bytes); return; } else if (_headerMode == HeaderMode::Unknown) { @@ -357,8 +414,9 @@ auto Reader::Slices::fill(int offset, bytes::span buffer) -> FillResult { if (_headerMode != HeaderMode::NoCache && !(_header.flags & Flag::LoadedFromCache)) { // Waiting for initial cache query. + Assert(_header.flags & Flag::LoadingFromCache); return {}; - } else if (fullInHeader()) { + } else if (isFullInHeader()) { return fillFromHeader(offset, buffer); } @@ -496,42 +554,84 @@ Reader::SerializedSlice Reader::Slices::serializeAndUnloadUnused() { } Reader::SerializedSlice Reader::Slices::serializeAndUnloadSlice( - int sliceNumber) { + int sliceNumber) { Expects(sliceNumber >= 0 && sliceNumber <= _data.size()); + if (isGoodHeader() && (sliceNumber == 1)) { + return serializeAndUnloadSlice(0); + } + const auto writeHeaderAndSlice = isGoodHeader() && !sliceNumber; + auto &slice = sliceNumber ? 
_data[sliceNumber - 1] : _header; const auto count = slice.parts.size(); Assert(count > 0); auto result = SerializedSlice(); result.number = sliceNumber; - const auto continuous = FindNotLoadedStart(slice.parts, 0); - if (continuous > slice.parts.back().first) { + + // We always use complex serialization for header + first slice. + const auto continuousTill = writeHeaderAndSlice + ? 0 + : FindNotLoadedStart(slice.parts, 0); + const auto continuous = (continuousTill > slice.parts.back().first); + if (continuous) { // All data is continuous. result.data.reserve(count * kPartSize); for (const auto &[offset, part] : slice.parts) { result.data.append(part); } } else { - const auto intSize = sizeof(int32); - result.data.reserve(count * kPartSize + 2 * intSize * (count + 1)); - const auto appendInt = [&](int value) { - auto serialized = int32(value); - result.data.append( - reinterpret_cast(&serialized), - intSize); - }; - appendInt(count); - for (const auto &[offset, part] : slice.parts) { - appendInt(offset); - appendInt(part.size()); - result.data.append(part); + result.data = serializeComplexSlice(slice); + if (writeHeaderAndSlice) { + result.data.append(serializeAndUnloadFirstSliceNoHeader()); } - if (result.data.size() == maxSliceSize(sliceNumber)) { - // Make sure this data won't be taken for full continuous data. - appendInt(0); + + // Make sure this data won't be taken for full continuous data. + const auto maxSize = maxSliceSize(sliceNumber); + while (IsContiguousSerialization(result.data.size(), maxSize)) { + result.data.push_back(char(0)); } } + + // We may serialize header in the middle of streaming, if we use + // HeaderMode::Good and we unload first slice. We still require + // header data to continue working, so don't really unload the header. 
+ if (sliceNumber) { + slice = Slice(); + } else { + slice.flags &= ~Slice::Flag::ChangedSinceCache; + } + return result; +} + +QByteArray Reader::Slices::serializeComplexSlice(const Slice &slice) const { + auto result = QByteArray(); + const auto count = slice.parts.size(); + const auto intSize = sizeof(int32); + result.reserve(count * kPartSize + 2 * intSize * (count + 1)); + const auto appendInt = [&](int value) { + auto serialized = int32(value); + result.append( + reinterpret_cast(&serialized), + intSize); + }; + appendInt(count); + for (const auto &[offset, part] : slice.parts) { + appendInt(offset); + appendInt(part.size()); + result.append(part); + } + return result; +} + +QByteArray Reader::Slices::serializeAndUnloadFirstSliceNoHeader() { + Expects(_data[0].flags & Slice::Flag::LoadedFromCache); + + auto &slice = _data[0]; + for (const auto &[offset, part] : _header.parts) { + slice.parts.erase(offset); + } + auto result = serializeComplexSlice(slice); slice = Slice(); return result; } @@ -595,7 +695,11 @@ std::shared_ptr Reader::InitCacheHelper( // 0 is for headerData, slice index = sliceNumber - 1. void Reader::readFromCache(int sliceNumber) { Expects(_cacheHelper != nullptr); + Expects(!sliceNumber || !_slices.headerModeUnknown()); + if (sliceNumber == 1 && _slices.isGoodHeader()) { + return readFromCache(0); + } LOG(("READING FROM CACHE: %1").arg(sliceNumber)); const auto key = _cacheHelper->key(sliceNumber); const auto weak = std::weak_ptr(_cacheHelper); @@ -703,8 +807,12 @@ bool Reader::fillFromSlices(int offset, bytes::span buffer) { ++several; - if (_cacheHelper && result.toCache.number > 0) { - const auto index = result.toCache.number - 1; + if (_cacheHelper && result.toCache.number >= 0) { + // If we put to cache the header (number == 0) that means we're in + // HeaderMode::Good and really are putting the first slice to cache. 
+ Assert(result.toCache.number > 0 || _slices.isGoodHeader()); + + const auto index = std::max(result.toCache.number, 1) - 1; cancelLoadInRange(index * kInSlice, (index + 1) * kInSlice); putToCache(std::move(result.toCache)); } @@ -747,10 +855,8 @@ bool Reader::processCacheResults() { auto loaded = base::take(_cacheHelper->results); lock.unlock(); - for (auto &&part : loaded) { - const auto sliceNumber = part.first; - auto &serialized = part.second; - _slices.processCacheResult(sliceNumber, std::move(serialized)); + for (const auto &[sliceNumber, result] : loaded) { + _slices.processCacheResult(sliceNumber, bytes::make_span(result)); } return !loaded.empty(); } diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h index 97efd9a53..700830249 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h @@ -77,7 +77,6 @@ private: struct Slice { enum class Flag : uchar { - Header = 0x01, LoadingFromCache = 0x02, LoadedFromCache = 0x04, ChangedSinceCache = 0x08, @@ -92,10 +91,12 @@ private: bool ready = true; }; - bool processCacheData(QByteArray &&data, int maxSliceSize); - bool processComplexCacheData( + bytes::const_span processCacheData( bytes::const_span data, - int maxSliceSize); + int maxSize); + bytes::const_span processComplexCacheData( + bytes::const_span data, + int maxSize); void addPart(int offset, QByteArray bytes); PrepareFillResult prepareFill(int from, int till); @@ -115,8 +116,11 @@ private: void headerDone(bool fromCache); [[nodiscard]] bool headerWontBeFilled() const; + [[nodiscard]] bool headerModeUnknown() const; + [[nodiscard]] bool isFullInHeader() const; + [[nodiscard]] bool isGoodHeader() const; - void processCacheResult(int sliceNumber, QByteArray &&result); + void processCacheResult(int sliceNumber, bytes::const_span result); void processPart(int offset, QByteArray &&bytes); 
[[nodiscard]] FillResult fill(int offset, bytes::span buffer); @@ -126,16 +130,24 @@ private: enum class HeaderMode { Unknown, Small, + Good, + Full, NoCache, }; void applyHeaderCacheData(); - int maxSliceSize(int sliceNumber) const; - SerializedSlice serializeAndUnloadSlice(int sliceNumber); - SerializedSlice serializeAndUnloadUnused(); + [[nodiscard]] int maxSliceSize(int sliceNumber) const; + [[nodiscard]] SerializedSlice serializeAndUnloadSlice( + int sliceNumber); + [[nodiscard]] SerializedSlice serializeAndUnloadUnused(); + [[nodiscard]] QByteArray serializeComplexSlice( + const Slice &slice) const; + [[nodiscard]] QByteArray serializeAndUnloadFirstSliceNoHeader(); void markSliceUsed(int sliceIndex); - bool fullInHeader() const; - FillResult fillFromHeader(int offset, bytes::span buffer); + [[nodiscard]] bool computeIsGoodHeader() const; + [[nodiscard]] FillResult fillFromHeader( + int offset, + bytes::span buffer); std::vector _data; Slice _header;