diff --git a/Telegram/SourceFiles/data/data_document.cpp b/Telegram/SourceFiles/data/data_document.cpp index 6416dd16d..f130f2c62 100644 --- a/Telegram/SourceFiles/data/data_document.cpp +++ b/Telegram/SourceFiles/data/data_document.cpp @@ -871,9 +871,8 @@ void DocumentData::save( id, _dc, origin, - (saveToCache() - ? std::make_optional(Data::DocumentCacheKey(_dc, id)) - : std::nullopt), + Data::DocumentCacheKey(_dc, id), + mediaKey(), std::move(reader), toFile, size, diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp index f0a96700a..5b53285a5 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp @@ -55,7 +55,7 @@ int File::Context::read(bytes::span buffer) { _semaphore.acquire(); if (_interrupted) { return -1; - } else if (const auto error = _reader->failed()) { + } else if (const auto error = _reader->streamingError()) { fail(*error); return -1; } diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp index 5e39cddb5..7e2099a24 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp @@ -596,25 +596,9 @@ std::optional Reader::Slices::readCacheRequiredFor(int offset) { } const auto index = offset / kInSlice; auto &slice = _data[index]; - if (slice.flags & (Flag::LoadedFromCache | Flag::LoadingFromCache)) { - return std::nullopt; - } - slice.flags |= Flag::LoadingFromCache; - return (index + 1); -} - -bool Reader::Slices::waitForCacheRequiredFor(int offset) const { - Expects(offset < _size); - - using Flag = Slice::Flag; - if (_header.flags & Flag::LoadingFromCache) { - return true; - } else if (isFullInHeader()) { - return false; - } - const auto index = offset / kInSlice; - const auto &slice = _data[index]; - return (slice.flags & Flag::LoadingFromCache); + return (slice.flags & Flag::LoadedFromCache) + ? std::nullopt + : std::make_optional(index + 1); } void Reader::Slices::markSliceUsed(int sliceIndex) { @@ -841,6 +825,12 @@ void Reader::loadForDownloader(int offset) { } } +void Reader::doneForDownloader(int offset) { + if (_downloaderOffsetsRequested.remove(offset) && !_streamingActive) { + processDownloaderRequests(); + } +} + void Reader::cancelForDownloader() { if (_downloaderAttached.load(std::memory_order_acquire)) { _downloaderOffsetRequests.take(); @@ -878,19 +868,33 @@ void Reader::checkForDownloaderChange(int checkItemsCount) { _offsetsForDownloader.erase( begin(_offsetsForDownloader), changed + 1); + _downloaderSliceNumber = 0; + _downloaderSliceCache = std::nullopt; } } void Reader::checkForDownloaderReadyOffsets() { // If a requested part is available right now we simply fire it on the // main thread, until the first not-available-right-now offset is found. + const auto ready = [&](int offset, QByteArray &&bytes) { + crl::on_main(this, [=, bytes = std::move(bytes)]() mutable { + _partsForDownloader.fire({ offset, std::move(bytes) }); + }); + return true; + }; const auto unavailable = [&](int offset) { auto bytes = _slices.partForDownloader(offset); if (!bytes.isEmpty()) { - crl::on_main(this, [=, bytes = std::move(bytes)]() mutable { - _partsForDownloader.fire({ offset, std::move(bytes) }); - }); - return false; + return !ready(offset, std::move(bytes)); + } + const auto sliceIndex = (offset / kInSlice); + if ((sliceIndex + 1 == _downloaderSliceNumber) + && _downloaderSliceCache) { + const auto i = _downloaderSliceCache->find( + offset - sliceIndex * kInSlice); + if (i != _downloaderSliceCache->end()) { + return !ready(offset, std::move(i->second)); + } } return true; }; @@ -900,7 +904,7 @@ void Reader::checkForDownloaderReadyOffsets() { } void Reader::processDownloaderRequests() { - checkForSomethingMoreReceived(); + processCacheResults(); enqueueDownloaderOffsets(); checkForDownloaderReadyOffsets(); if (empty(_offsetsForDownloader)) { @@ -908,17 +912,31 @@ void Reader::processDownloaderRequests() { } const auto offset = _offsetsForDownloader.front(); - if (_cacheHelper) { - if (const auto sliceNumber = _slices.readCacheRequiredFor(offset)) { - readFromCache(*sliceNumber); - return; - } else if (_slices.waitForCacheRequiredFor(offset)) { - return; - } + if (_cacheHelper && downloaderWaitForCachedSlice(offset)) { + return; } _offsetsForDownloader.pop_front(); - loadAtOffset(offset); + if (_downloaderOffsetsRequested.emplace(offset).second) { + _loader->load(offset); + } +} + +bool Reader::downloaderWaitForCachedSlice(int offset) { + const auto sliceNumber = _slices.readCacheRequiredFor(offset); + if (sliceNumber.value_or(0) != _downloaderSliceNumber) { + _downloaderSliceNumber = sliceNumber.value_or(0); + _downloaderSliceCache = std::nullopt; + if (_downloaderSliceNumber) { + if (readFromCacheForDownloader()) { + return true; + } + _downloaderSliceCache = PartsMap(); + } + } else if (_downloaderSliceNumber && !_downloaderSliceCache) { + return true; + } + return false; } void Reader::checkCacheResultsForDownloader() { @@ -977,6 +995,17 @@ void Reader::readFromCache(int sliceNumber) { }); } +bool Reader::readFromCacheForDownloader() { + Expects(_cacheHelper != nullptr); + Expects(_downloaderSliceNumber > 0); + + if (_slices.headerModeUnknown()) { + return false; + } + readFromCache(_downloaderSliceNumber); + return true; +} + void Reader::putToCache(SerializedSlice &&slice) { Expects(_cacheHelper != nullptr); Expects(slice.number >= 0); @@ -990,8 +1019,8 @@ int Reader::size() const { return _loader->size(); } -std::optional Reader::failed() const { - return _failed; +std::optional Reader::streamingError() const { + return _streamingError; } void Reader::headerDone() { @@ -1027,7 +1056,7 @@ bool Reader::fill( }; checkForSomethingMoreReceived(); - if (_failed) { + if (_streamingError) { return failed(); } @@ -1039,7 +1068,7 @@ bool Reader::fill( startWaiting(); } while (checkForSomethingMoreReceived()); - return _failed ? failed() : false; + return _streamingError ? failed() : false; } bool Reader::fillFromSlices(int offset, bytes::span buffer) { @@ -1047,7 +1076,7 @@ bool Reader::fillFromSlices(int offset, bytes::span buffer) { auto result = _slices.fill(offset, buffer); if (!result.filled && _slices.headerWontBeFilled()) { - _failed = Error::NotStreamable; + _streamingError = Error::NotStreamable; return false; } @@ -1079,7 +1108,9 @@ void Reader::cancelLoadInRange(int from, int till) { Expects(from < till); for (const auto offset : _loadingOffsets.takeInRange(from, till)) { - _loader->cancel(offset); + if (!_downloaderOffsetsRequested.contains(offset)) { + _loader->cancel(offset); + } } } @@ -1093,14 +1124,22 @@ void Reader::checkLoadWillBeFirst(int offset) { bool Reader::processCacheResults() { if (!_cacheHelper) { return false; - } else if (_failed) { - return false; } QMutexLocker lock(&_cacheHelper->mutex); auto loaded = base::take(_cacheHelper->results); lock.unlock(); + if (_downloaderSliceNumber) { + const auto i = loaded.find(_downloaderSliceNumber); + if (i != end(loaded)) { + _downloaderSliceCache = i->second; + } + } + + if (_streamingError) { + return false; + } for (auto &[sliceNumber, result] : loaded) { _slices.processCacheResult(sliceNumber, std::move(result)); } @@ -1114,14 +1153,14 @@ bool Reader::processCacheResults() { } bool Reader::processLoadedParts() { - if (_failed) { + if (_streamingError) { return false; } auto loaded = _loadedParts.take(); for (auto &part : loaded) { if (!part.valid(size())) { - _failed = Error::LoadFailed; + _streamingError = Error::LoadFailed; return false; } else if (!_loadingOffsets.remove(part.offset)) { continue; diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h index 506b604de..a2a5e9626 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h @@ -43,7 +43,7 @@ public: int offset, bytes::span buffer, not_null notify); - [[nodiscard]] std::optional failed() const; + [[nodiscard]] std::optional streamingError() const; void headerDone(); // Thread safe. @@ -56,6 +56,7 @@ public: void stopStreaming(bool stillActive = false); [[nodiscard]] rpl::producer partsForDownloader() const; void loadForDownloader(int offset); + void doneForDownloader(int offset); void cancelForDownloader(); ~Reader(); @@ -139,7 +140,6 @@ private: [[nodiscard]] QByteArray partForDownloader(int offset) const; [[nodiscard]] std::optional readCacheRequiredFor(int offset); - [[nodiscard]] bool waitForCacheRequiredFor(int offset) const; private: enum class HeaderMode { @@ -173,7 +173,9 @@ private: }; // 0 is for headerData, slice index = sliceNumber - 1. + // returns false if asked for a known-empty downloader slice cache. void readFromCache(int sliceNumber); + [[nodiscard]] bool readFromCacheForDownloader(); bool processCacheResults(); void putToCache(SerializedSlice &&data); @@ -190,6 +192,7 @@ private: void processDownloaderRequests(); void checkCacheResultsForDownloader(); + [[nodiscard]] bool downloaderWaitForCachedSlice(int offset); void enqueueDownloaderOffsets(); void checkForDownloaderChange(int checkItemsCount); void checkForDownloaderReadyOffsets(); @@ -207,11 +210,16 @@ private: PriorityQueue _loadingOffsets; Slices _slices; - std::optional _failed; + + // Even if streaming had failed, the Reader can work for the downloader. + std::optional _streamingError; std::atomic _downloaderAttached = false; base::thread_safe_queue _downloaderOffsetRequests; std::deque _offsetsForDownloader; + base::flat_set _downloaderOffsetsRequested; + int _downloaderSliceNumber = 0; // > 0 means we want it from cache. + std::optional _downloaderSliceCache; // Main thread. rpl::event_stream _partsForDownloader; diff --git a/Telegram/SourceFiles/storage/file_download.cpp b/Telegram/SourceFiles/storage/file_download.cpp index 4c9a8c5eb..951b076d6 100644 --- a/Telegram/SourceFiles/storage/file_download.cpp +++ b/Telegram/SourceFiles/storage/file_download.cpp @@ -473,8 +473,8 @@ bool FileLoader::tryLoadLocal() { } const auto weak = make_weak(this); - if (const auto key = cacheKey()) { - loadLocal(*key); + if (_toCache == LoadToCacheAsWell) { + loadLocal(cacheKey()); emit progress(this); } if (!weak) { @@ -594,17 +594,18 @@ bool FileLoader::finalizeResult() { if (_localStatus == LocalStatus::NotFound) { if (const auto key = fileLocationKey()) { - Local::writeFileLocation(*key, FileLocation(_filename)); - } - if (const auto key = cacheKey()) { - if (_data.size() <= Storage::kMaxFileInMemory) { - Auth().data().cache().put( - *key, - Storage::Cache::Database::TaggedValue( - base::duplicate(_data), - _cacheTag)); + if (!_filename.isEmpty()) { + Local::writeFileLocation(*key, FileLocation(_filename)); } } + if ((_toCache == LoadToCacheAsWell) + && (_data.size() <= Storage::kMaxFileInMemory)) { + Auth().data().cache().put( + cacheKey(), + Storage::Cache::Database::TaggedValue( + base::duplicate(_data), + _cacheTag)); + } } _downloader->taskFinished().notify(); return true; @@ -1161,20 +1162,18 @@ void mtpFileLoader::changeCDNParams( makeRequest(offset); } -std::optional mtpFileLoader::cacheKey() const { +Storage::Cache::Key mtpFileLoader::cacheKey() const { return _location.match([&](const WebFileLocation &location) { - return std::make_optional(Data::WebDocumentCacheKey(location)); + return Data::WebDocumentCacheKey(location); }, [&](const GeoPointLocation &location) { - return std::make_optional(Data::GeoPointCacheKey(location)); + return Data::GeoPointCacheKey(location); }, [&](const StorageFileLocation &location) { - return (_toCache == LoadToCacheAsWell) - ? std::make_optional(location.cacheKey()) - : std::nullopt; + return location.cacheKey(); }); } std::optional mtpFileLoader::fileLocationKey() const { - if (_locationType != UnknownFileLocation && !_filename.isEmpty()) { + if (_locationType != UnknownFileLocation) { return mediaKey(_locationType, dcId(), objId()); } return std::nullopt; @@ -1246,7 +1245,7 @@ void webFileLoader::loadError() { cancel(true); } -std::optional webFileLoader::cacheKey() const { +Storage::Cache::Key webFileLoader::cacheKey() const { return Data::UrlCacheKey(_url); } diff --git a/Telegram/SourceFiles/storage/file_download.h b/Telegram/SourceFiles/storage/file_download.h index 4277b7ae9..9ffbce4a0 100644 --- a/Telegram/SourceFiles/storage/file_download.h +++ b/Telegram/SourceFiles/storage/file_download.h @@ -172,7 +172,7 @@ protected: bool tryLoadLocal(); void loadLocal(const Storage::Cache::Key &key); - virtual std::optional cacheKey() const = 0; + virtual Storage::Cache::Key cacheKey() const = 0; virtual std::optional fileLocationKey() const = 0; virtual void cancelRequests() = 0; @@ -273,7 +273,7 @@ private: int limit = 0; QByteArray hash; }; - std::optional cacheKey() const override; + Storage::Cache::Key cacheKey() const override; std::optional fileLocationKey() const override; void cancelRequests() override; @@ -358,7 +358,7 @@ public: protected: void cancelRequests() override; - std::optional cacheKey() const override; + Storage::Cache::Key cacheKey() const override; std::optional fileLocationKey() const override; bool loadPart() override; diff --git a/Telegram/SourceFiles/storage/streamed_file_downloader.cpp b/Telegram/SourceFiles/storage/streamed_file_downloader.cpp index f593e1f08..f9783fed9 100644 --- a/Telegram/SourceFiles/storage/streamed_file_downloader.cpp +++ b/Telegram/SourceFiles/storage/streamed_file_downloader.cpp @@ -23,7 +23,8 @@ StreamedFileDownloader::StreamedFileDownloader( uint64 objectId, MTP::DcId dcId, Data::FileOrigin origin, - std::optional cacheKey, + Cache::Key cacheKey, + MediaKey fileLocationKey, std::shared_ptr reader, // For FileLoader @@ -45,6 +46,7 @@ StreamedFileDownloader::StreamedFileDownloader( , _objectId(objectId) , _origin(origin) , _cacheKey(cacheKey) +, _fileLocationKey(fileLocationKey) , _reader(std::move(reader)) { _partIsSaved.resize((size + kPartSize - 1) / kPartSize, false); @@ -76,12 +78,12 @@ void StreamedFileDownloader::stop() { cancelRequests(); } -std::optional StreamedFileDownloader::cacheKey() const { +Storage::Cache::Key StreamedFileDownloader::cacheKey() const { return _cacheKey; } std::optional StreamedFileDownloader::fileLocationKey() const { - return std::nullopt; AssertIsDebug(); + return _fileLocationKey; } void StreamedFileDownloader::cancelRequests() { @@ -109,7 +111,7 @@ bool StreamedFileDownloader::loadPart() { return false; } _nextPartIndex = index + 1; - _reader->loadForDownloader(index); + _reader->loadForDownloader(index * kPartSize); AssertIsDebug(); //_downloader->requestedAmountIncrement( // requestData.dcId, @@ -123,11 +125,13 @@ bool StreamedFileDownloader::loadPart() { void StreamedFileDownloader::savePart(const LoadedPart &part) { Expects(part.offset >= 0 && part.offset < _reader->size()); Expects(part.offset % kPartSize == 0); + if (_finished || _cancelled) { return; } - const auto index = part.offset / kPartSize; + const auto offset = part.offset; + const auto index = offset / kPartSize; Assert(index >= 0 && index < _partIsSaved.size()); if (_partIsSaved[index]) { return; @@ -142,7 +146,7 @@ void StreamedFileDownloader::savePart(const LoadedPart &part) { // -kPartSize); --_queue->queriesCount; } - if (!writeResultPart(part.offset, bytes::make_span(part.bytes))) { + if (!writeResultPart(offset, bytes::make_span(part.bytes))) { return; } if (ranges::find(_partIsSaved, false) == end(_partIsSaved)) { @@ -150,6 +154,7 @@ void StreamedFileDownloader::savePart(const LoadedPart &part) { return; } } + _reader->doneForDownloader(offset); notifyAboutProgress(); } diff --git a/Telegram/SourceFiles/storage/streamed_file_downloader.h b/Telegram/SourceFiles/storage/streamed_file_downloader.h index 143154897..dfc9827f0 100644 --- a/Telegram/SourceFiles/storage/streamed_file_downloader.h +++ b/Telegram/SourceFiles/storage/streamed_file_downloader.h @@ -25,7 +25,8 @@ public: uint64 objectId, MTP::DcId dcId, Data::FileOrigin origin, - std::optional cacheKey, + Cache::Key cacheKey, + MediaKey fileLocationKey, std::shared_ptr reader, // For FileLoader @@ -43,7 +44,7 @@ public: void stop() override; private: - std::optional cacheKey() const override; + Cache::Key cacheKey() const override; std::optional fileLocationKey() const override; void cancelRequests() override; bool loadPart() override; @@ -52,7 +53,8 @@ private: uint64 _objectId = 0; Data::FileOrigin _origin; - std::optional _cacheKey; + Cache::Key _cacheKey; + MediaKey _fileLocationKey; std::shared_ptr _reader; std::vector _partIsSaved; // vector :D