From 27018d94eeaf7f910d7098cf77a5600c5a243cb0 Mon Sep 17 00:00:00 2001 From: John Preston Date: Thu, 11 Apr 2019 17:07:48 +0400 Subject: [PATCH] Load file parts even when not streaming. --- Telegram/SourceFiles/base/thread_safe_wrap.h | 12 +- .../streaming/media_streaming_loader.cpp | 6 + .../media/streaming/media_streaming_loader.h | 2 + .../streaming/media_streaming_reader.cpp | 162 ++++++++++++++---- .../media/streaming/media_streaming_reader.h | 15 +- 5 files changed, 151 insertions(+), 46 deletions(-) diff --git a/Telegram/SourceFiles/base/thread_safe_wrap.h b/Telegram/SourceFiles/base/thread_safe_wrap.h index c0029f1b1..6d97151c7 100644 --- a/Telegram/SourceFiles/base/thread_safe_wrap.h +++ b/Telegram/SourceFiles/base/thread_safe_wrap.h @@ -36,24 +36,24 @@ private: }; -template +template typename Container = std::deque> class thread_safe_queue { public: template void emplace(Args &&...args) { - _wrap.with([&](std::vector &value) { + _wrap.with([&](Container &value) { value.emplace_back(std::forward(args)...); }); } - std::vector take() { - return _wrap.with([&](std::vector &value) { - return std::exchange(value, std::vector()); + Container take() { + return _wrap.with([&](Container &value) { + return std::exchange(value, Container()); }); } private: - thread_safe_wrap> _wrap; + thread_safe_wrap> _wrap; }; diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_loader.cpp index c82e3fecb..a84c6b39c 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader.cpp @@ -10,6 +10,12 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL namespace Media { namespace Streaming { +bool LoadedPart::valid(int size) const { + return (offset != kFailedOffset) + && ((bytes.size() == Loader::kPartSize) + || (offset + bytes.size() == size)); +} + bool operator<( const PriorityQueue::Entry &a, const PriorityQueue::Entry &b) { diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader.h b/Telegram/SourceFiles/media/streaming/media_streaming_loader.h index 9066817ef..b9c0eb675 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader.h @@ -15,6 +15,8 @@ struct LoadedPart { QByteArray bytes; static constexpr auto kFailedOffset = -1; + + [[nodiscard]] bool valid(int size) const; }; class Loader { diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp index a2e1736db..5e39cddb5 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp @@ -573,6 +573,50 @@ auto Reader::Slices::fillFromHeader(int offset, bytes::span buffer) return result; } +QByteArray Reader::Slices::partForDownloader(int offset) const { + Expects(offset < _size); + + if (const auto i = _header.parts.find(offset); i != end(_header.parts)) { + return i->second; + } else if (isFullInHeader()) { + return QByteArray(); + } + const auto index = offset / kInSlice; + const auto &slice = _data[index]; + const auto i = slice.parts.find(offset - index * kInSlice); + return (i != end(slice.parts)) ? i->second : QByteArray(); +} + +std::optional Reader::Slices::readCacheRequiredFor(int offset) { + Expects(offset < _size); + + using Flag = Slice::Flag; + if ((_header.flags & Flag::LoadingFromCache) || isFullInHeader()) { + return std::nullopt; + } + const auto index = offset / kInSlice; + auto &slice = _data[index]; + if (slice.flags & (Flag::LoadedFromCache | Flag::LoadingFromCache)) { + return std::nullopt; + } + slice.flags |= Flag::LoadingFromCache; + return (index + 1); +} + +bool Reader::Slices::waitForCacheRequiredFor(int offset) const { + Expects(offset < _size); + + using Flag = Slice::Flag; + if (_header.flags & Flag::LoadingFromCache) { + return true; + } else if (isFullInHeader()) { + return false; + } + const auto index = offset / kInSlice; + const auto &slice = _data[index]; + return (slice.flags & Flag::LoadingFromCache); +} + void Reader::Slices::markSliceUsed(int sliceIndex) { const auto i = ranges::find(_usedSlices, sliceIndex); const auto end = _usedSlices.end(); @@ -711,18 +755,6 @@ QByteArray Reader::Slices::serializeAndUnloadFirstSliceNoHeader() { return result; } -template -void Reader::Slices::enumerateParts(int sliceNumber, Callback &&callback) { - const auto shift = sliceNumber ? ((sliceNumber - 1) * kInSlice) : 0; - const auto &slice = (sliceNumber ? _data[sliceNumber - 1] : _header); - for (const auto &[offset, bytes] : slice.parts) { - callback(LoadedPart{ offset + shift, bytes }); - } - if (!sliceNumber && isGoodHeader()) { - enumerateParts(1, std::forward(callback)); - } -} - Reader::SerializedSlice Reader::Slices::unloadToCache() { if (_headerMode == HeaderMode::Unknown || _headerMode == HeaderMode::NoCache) { @@ -816,12 +848,84 @@ void Reader::cancelForDownloader() { } } -void Reader::processDownloaderRequests() { +void Reader::enqueueDownloaderOffsets() { + auto offsets = _downloaderOffsetRequests.take(); + if (!empty(offsets)) { + if (!empty(_offsetsForDownloader)) { + _offsetsForDownloader.insert( + end(_offsetsForDownloader), + std::make_move_iterator(begin(offsets)), + std::make_move_iterator(end(offsets))); + checkForDownloaderChange(offsets.size() + 1); + } else { + _offsetsForDownloader = std::move(offsets); + checkForDownloaderChange(offsets.size()); + } + } +} +void Reader::checkForDownloaderChange(int checkItemsCount) { + Expects(checkItemsCount <= _offsetsForDownloader.size()); + + // If a requested offset is less-or-equal of some previously requested + // offset, it means that the downloader was changed, ignore old offsets. + const auto end = _offsetsForDownloader.end(); + const auto changed = std::adjacent_find( + end - checkItemsCount, + end, + [](int first, int second) { return (second <= first); }); + if (changed != end) { + _offsetsForDownloader.erase( + begin(_offsetsForDownloader), + changed + 1); + } +} + +void Reader::checkForDownloaderReadyOffsets() { + // If a requested part is available right now we simply fire it on the + // main thread, until the first not-available-right-now offset is found. + const auto unavailable = [&](int offset) { + auto bytes = _slices.partForDownloader(offset); + if (!bytes.isEmpty()) { + crl::on_main(this, [=, bytes = std::move(bytes)]() mutable { + _partsForDownloader.fire({ offset, std::move(bytes) }); + }); + return false; + } + return true; + }; + _offsetsForDownloader.erase( + begin(_offsetsForDownloader), + ranges::find_if(_offsetsForDownloader, unavailable)); +} + +void Reader::processDownloaderRequests() { + checkForSomethingMoreReceived(); + enqueueDownloaderOffsets(); + checkForDownloaderReadyOffsets(); + if (empty(_offsetsForDownloader)) { + return; + } + + const auto offset = _offsetsForDownloader.front(); + if (_cacheHelper) { + if (const auto sliceNumber = _slices.readCacheRequiredFor(offset)) { + readFromCache(*sliceNumber); + return; + } else if (_slices.waitForCacheRequiredFor(offset)) { + return; + } + } + + _offsetsForDownloader.pop_front(); + loadAtOffset(offset); } void Reader::checkCacheResultsForDownloader() { - + if (_streamingActive) { + return; + } + processDownloaderRequests(); } bool Reader::isRemoteLoader() const { @@ -922,8 +1026,7 @@ bool Reader::fill( return false; }; - processCacheResults(); - processLoadedParts(); + checkForSomethingMoreReceived(); if (_failed) { return failed(); } @@ -934,7 +1037,7 @@ bool Reader::fill( return true; } startWaiting(); - } while (processCacheResults() || processLoadedParts()); + } while (checkForSomethingMoreReceived()); return _failed ? failed() : false; } @@ -1007,34 +1110,17 @@ bool Reader::processCacheResults() { Assert(loaded.size() > 1); Assert((loaded.begin() + 1)->first == 1); } - if (_downloaderAttached.load(std::memory_order_acquire)) { - for (const auto &[sliceNumber, result] : loaded) { - sendPartsToDownloader(sliceNumber); - } - } return !loaded.empty(); } -void Reader::sendPartsToDownloader(int sliceNumber) { - _slices.enumerateParts(sliceNumber, [&](LoadedPart &&part) { - crl::on_main(this, [=, part = std::move(part)]() mutable { - AssertIsDebug(); // maybe send them with small timeout? - _partsForDownloader.fire(std::move(part)); - }); - }); -} - bool Reader::processLoadedParts() { if (_failed) { return false; } - auto loaded = _loadedParts.take(); for (auto &part : loaded) { - if (part.offset == LoadedPart::kFailedOffset - || (part.bytes.size() != Loader::kPartSize - && part.offset + part.bytes.size() != size())) { + if (!part.valid(size())) { _failed = Error::LoadFailed; return false; } else if (!_loadingOffsets.remove(part.offset)) { @@ -1047,6 +1133,12 @@ bool Reader::processLoadedParts() { return !loaded.empty(); } +bool Reader::checkForSomethingMoreReceived() { + const auto result1 = processCacheResults(); + const auto result2 = processLoadedParts(); + return result1 || result2; +} + void Reader::loadAtOffset(int offset) { if (_loadingOffsets.add(offset)) { _loader->load(offset); diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h index 23233a62b..506b604de 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h @@ -137,9 +137,9 @@ private: [[nodiscard]] FillResult fill(int offset, bytes::span buffer); [[nodiscard]] SerializedSlice unloadToCache(); - // callback(LoadedPart(..)). - template - void enumerateParts(int sliceNumber, Callback &&callback); + [[nodiscard]] QByteArray partForDownloader(int offset) const; + [[nodiscard]] std::optional readCacheRequiredFor(int offset); + [[nodiscard]] bool waitForCacheRequiredFor(int offset) const; private: enum class HeaderMode { @@ -175,7 +175,6 @@ private: // 0 is for headerData, slice index = sliceNumber - 1. void readFromCache(int sliceNumber); bool processCacheResults(); - void sendPartsToDownloader(int sliceNumber); void putToCache(SerializedSlice &&data); void cancelLoadInRange(int from, int till); @@ -183,12 +182,17 @@ private: void checkLoadWillBeFirst(int offset); bool processLoadedParts(); + bool checkForSomethingMoreReceived(); + bool fillFromSlices(int offset, bytes::span buffer); void finalizeCache(); void processDownloaderRequests(); void checkCacheResultsForDownloader(); + void enqueueDownloaderOffsets(); + void checkForDownloaderChange(int checkItemsCount); + void checkForDownloaderReadyOffsets(); static std::shared_ptr InitCacheHelper( std::optional baseKey); @@ -197,7 +201,7 @@ private: const std::unique_ptr _loader; const std::shared_ptr _cacheHelper; - base::thread_safe_queue _loadedParts; + base::thread_safe_queue _loadedParts; std::atomic _waiting = nullptr; std::atomic _sleeping = nullptr; PriorityQueue _loadingOffsets; @@ -207,6 +211,7 @@ private: std::atomic _downloaderAttached = false; base::thread_safe_queue _downloaderOffsetRequests; + std::deque _offsetsForDownloader; // Main thread. rpl::event_stream _partsForDownloader;