From b2895a39ed571c59d49014e7dc9dc7aa6bb6f6cc Mon Sep 17 00:00:00 2001 From: John Preston Date: Fri, 12 Apr 2019 14:50:41 +0400 Subject: [PATCH] Register streaming loaders in Storage::Downloader. --- Telegram/SourceFiles/auth_session.cpp | 4 +- Telegram/SourceFiles/data/data_document.cpp | 2 +- .../media_streaming_loader_mtproto.cpp | 35 +++-- .../media_streaming_loader_mtproto.h | 14 +- .../streaming/media_streaming_reader.cpp | 131 +++++++++++------- .../media/streaming/media_streaming_reader.h | 28 ++-- .../SourceFiles/storage/file_download.cpp | 11 +- Telegram/SourceFiles/storage/file_download.h | 10 +- Telegram/SourceFiles/storage/file_upload.cpp | 3 +- Telegram/SourceFiles/storage/file_upload.h | 8 +- .../storage/streamed_file_downloader.cpp | 10 -- 11 files changed, 158 insertions(+), 98 deletions(-) diff --git a/Telegram/SourceFiles/auth_session.cpp b/Telegram/SourceFiles/auth_session.cpp index 70d139c3e..ca8513779 100644 --- a/Telegram/SourceFiles/auth_session.cpp +++ b/Telegram/SourceFiles/auth_session.cpp @@ -417,8 +417,8 @@ AuthSession::AuthSession(const MTPUser &user) : _autoLockTimer([this] { checkAutoLock(); }) , _api(std::make_unique(this)) , _calls(std::make_unique()) -, _downloader(std::make_unique()) -, _uploader(std::make_unique()) +, _downloader(std::make_unique(_api.get())) +, _uploader(std::make_unique(_api.get())) , _storage(std::make_unique()) , _notifications(std::make_unique(this)) , _data(std::make_unique(this)) diff --git a/Telegram/SourceFiles/data/data_document.cpp b/Telegram/SourceFiles/data/data_document.cpp index 526d940b5..bebaf6e77 100644 --- a/Telegram/SourceFiles/data/data_document.cpp +++ b/Telegram/SourceFiles/data/data_document.cpp @@ -1240,7 +1240,7 @@ auto DocumentData::createStreamingLoader( } return hasRemoteLocation() ? std::make_unique( - &session().api(), + &session().downloader(), StorageFileLocation( _dc, session().userId(), diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp index 4b1fe4673..23e193cd1 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp @@ -21,16 +21,23 @@ constexpr auto kMaxConcurrentRequests = 4; } // namespace LoaderMtproto::LoaderMtproto( - not_null api, + not_null owner, const StorageFileLocation &location, int size, Data::FileOrigin origin) -: _api(api) +: _owner(owner) , _location(location) +, _dcId(location.dcId()) , _size(size) , _origin(origin) { } +LoaderMtproto::~LoaderMtproto() { + for (const auto [index, amount] : _amountByDcIndex) { + changeRequestedAmount(index, -amount); + } +} + std::optional LoaderMtproto::baseCacheKey() const { return _location.bigFileBaseCacheKey(); } @@ -97,6 +104,11 @@ void LoaderMtproto::increasePriority() { }); } +void LoaderMtproto::changeRequestedAmount(int index, int amount) { + _owner->requestedAmountIncrement(_dcId, index, amount); + _amountByDcIndex[index] += amount; +} + void LoaderMtproto::sendNext() { if (_requests.size() >= kMaxConcurrentRequests) { return; @@ -106,20 +118,23 @@ void LoaderMtproto::sendNext() { return; } - static auto DcIndex = 0; + const auto index = _owner->chooseDcIndexForRequest(_dcId); + changeRequestedAmount(index, kPartSize); + const auto usedFileReference = _location.fileReference(); const auto id = _sender.request(MTPupload_GetFile( _location.tl(Auth().userId()), MTP_int(offset), MTP_int(kPartSize) )).done([=](const MTPupload_File &result) { + changeRequestedAmount(index, -kPartSize); requestDone(offset, result); }).fail([=](const RPCError &error) { + changeRequestedAmount(index, -kPartSize); requestFailed(offset, error, usedFileReference); - }).toDC(MTP::downloadDcId( - _location.dcId(), - (++DcIndex) % MTP::kDownloadSessionsCount - )).send(); + }).toDC( + MTP::downloadDcId(_dcId, index) + ).send(); _requests.emplace(offset, id); sendNext(); @@ -175,14 +190,14 @@ void LoaderMtproto::requestFailed( sendNext(); } }; - _api->refreshFileReference(_origin, crl::guard(this, callback)); + _owner->api().refreshFileReference( + _origin, + crl::guard(this, callback)); } rpl::producer LoaderMtproto::parts() const { return _parts.events(); } -LoaderMtproto::~LoaderMtproto() = default; - } // namespace Streaming } // namespace Media diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h index 5e21d3dcb..755e63dd9 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h @@ -11,7 +11,9 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include "mtproto/sender.h" #include "data/data_file_origin.h" -class ApiWrap; +namespace Storage { +class Downloader; +} // namespace Storage namespace Media { namespace Streaming { @@ -19,10 +21,11 @@ namespace Streaming { class LoaderMtproto : public Loader, public base::has_weak_ptr { public: LoaderMtproto( - not_null api, + not_null owner, const StorageFileLocation &location, int size, Data::FileOrigin origin); + ~LoaderMtproto(); [[nodiscard]] auto baseCacheKey() const -> std::optional override; @@ -40,8 +43,6 @@ public: Storage::StreamedFileDownloader *downloader) override; void clearAttachedDownloader() override; - ~LoaderMtproto(); - private: void sendNext(); @@ -58,11 +59,13 @@ private: const QByteArray &encryptionIV, const QVector &hashes); void cancelForOffset(int offset); + void changeRequestedAmount(int index, int amount); - const not_null _api; + const not_null _owner; // _location can be changed with an updated file_reference. StorageFileLocation _location; + MTP::DcId _dcId = 0; const int _size = 0; const Data::FileOrigin _origin; @@ -71,6 +74,7 @@ private: PriorityQueue _requested; base::flat_map _requests; + base::flat_map _amountByDcIndex; rpl::event_stream _parts; Storage::StreamedFileDownloader *_downloader = nullptr; diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp index d97e7e385..2a083ea33 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp @@ -26,6 +26,7 @@ constexpr auto kSlicesInMemory = 2; // 1 MB of parts are requested from cloud ahead of reading demand. constexpr auto kPreloadPartsAhead = 8; +constexpr auto kDownloaderRequestsLimit = 4; using PartsMap = base::flat_map; @@ -591,18 +592,16 @@ bool Reader::Slices::waitingForHeaderCache() const { return (_header.flags & Slice::Flag::LoadingFromCache); } -std::optional Reader::Slices::readCacheRequiredFor(int offset) { +bool Reader::Slices::readCacheForDownloaderRequired(int offset) { Expects(offset < _size); Expects(!waitingForHeaderCache()); if (isFullInHeader()) { - return std::nullopt; + return false; } const auto index = offset / kInSlice; auto &slice = _data[index]; - return (slice.flags & Slice::Flag::LoadedFromCache) - ? std::nullopt - : std::make_optional(index + 1); + return !(slice.flags & Slice::Flag::LoadedFromCache); } void Reader::Slices::markSliceUsed(int sliceIndex) { @@ -884,35 +883,39 @@ void Reader::checkForDownloaderChange(int checkItemsCount) { _offsetsForDownloader.erase( begin(_offsetsForDownloader), changed + 1); - _downloaderSliceNumber = 0; - _downloaderSliceCache = std::nullopt; + _downloaderReadCache.clear(); + _downloaderOffsetAcks.take(); } } void Reader::checkForDownloaderReadyOffsets() { // If a requested part is available right now we simply fire it on the // main thread, until the first not-available-right-now offset is found. - const auto ready = [&](int offset, QByteArray &&bytes) { + const auto unavailableInBytes = [&](int offset, QByteArray &&bytes) { + if (bytes.isEmpty()) { + return true; + } crl::on_main(this, [=, bytes = std::move(bytes)]() mutable { _partsForDownloader.fire({ offset, std::move(bytes) }); }); - return true; + return false; + }; + const auto unavailableInCache = [&](int offset) { + const auto index = (offset / kInSlice); + const auto sliceNumber = index + 1; + const auto i = _downloaderReadCache.find(sliceNumber); + if (i == end(_downloaderReadCache) || !i->second) { + return true; + } + const auto j = i->second->find(offset - index * kInSlice); + if (j == end(*i->second)) { + return true; + } + return unavailableInBytes(offset, std::move(j->second)); }; const auto unavailable = [&](int offset) { - auto bytes = _slices.partForDownloader(offset); - if (!bytes.isEmpty()) { - return !ready(offset, std::move(bytes)); - } - const auto sliceIndex = (offset / kInSlice); - if ((sliceIndex + 1 == _downloaderSliceNumber) - && _downloaderSliceCache) { - const auto i = _downloaderSliceCache->find( - offset - sliceIndex * kInSlice); - if (i != _downloaderSliceCache->end()) { - return !ready(offset, std::move(i->second)); - } - } - return true; + return unavailableInBytes(offset, _slices.partForDownloader(offset)) + && unavailableInCache(offset); }; _offsetsForDownloader.erase( begin(_offsetsForDownloader), @@ -923,22 +926,42 @@ void Reader::processDownloaderRequests() { processCacheResults(); enqueueDownloaderOffsets(); checkForDownloaderReadyOffsets(); - if (empty(_offsetsForDownloader)) { - return; + pruneDoneDownloaderRequests(); + if (!empty(_offsetsForDownloader)) { + pruneDownloaderCache(_offsetsForDownloader.front()); + sendDownloaderRequests(); } +} - const auto offset = _offsetsForDownloader.front(); - if (_cacheHelper && downloaderWaitForCachedSlice(offset)) { - return; - } +void Reader::pruneDownloaderCache(int minimalOffset) { + const auto minimalSliceNumber = (minimalOffset / kInSlice) + 1; + const auto removeTill = ranges::lower_bound( + _downloaderReadCache, + minimalSliceNumber, + ranges::less(), + &base::flat_map>::value_type::first); + _downloaderReadCache.erase(_downloaderReadCache.begin(), removeTill); +} +void Reader::pruneDoneDownloaderRequests() { for (const auto done : _downloaderOffsetAcks.take()) { _downloaderOffsetsRequested.remove(done); + const auto i = ranges::find(_offsetsForDownloader, done); + if (i != end(_offsetsForDownloader)) { + _offsetsForDownloader.erase(i); + } } +} - _offsetsForDownloader.pop_front(); - if (_downloaderOffsetsRequested.emplace(offset).second) { - _loader->load(offset); +void Reader::sendDownloaderRequests() { + auto &&offsets = ranges::view::all( + _offsetsForDownloader + ) | ranges::view::take(kDownloaderRequestsLimit); + for (const auto offset : offsets) { + if ((!_cacheHelper || !downloaderWaitForCachedSlice(offset)) + && _downloaderOffsetsRequested.emplace(offset).second) { + _loader->load(offset); + } } } @@ -946,20 +969,22 @@ bool Reader::downloaderWaitForCachedSlice(int offset) { if (_slices.waitingForHeaderCache()) { return true; } - const auto sliceNumber = _slices.readCacheRequiredFor(offset); - if (sliceNumber.value_or(0) != _downloaderSliceNumber) { - _downloaderSliceNumber = sliceNumber.value_or(0); - _downloaderSliceCache = std::nullopt; - if (_downloaderSliceNumber) { - if (readFromCacheForDownloader()) { - return true; - } - _downloaderSliceCache = PartsMap(); - } - } else if (_downloaderSliceNumber && !_downloaderSliceCache) { - return true; + if (!_slices.readCacheForDownloaderRequired(offset)) { + return false; } - return false; + const auto sliceNumber = (offset / kInSlice) + 1; + auto i = _downloaderReadCache.find(sliceNumber); + if (i == _downloaderReadCache.end()) { + // If we didn't request that slice yet, try requesting it. + // If there is no need to (header mode is unknown) - place empty map. + // Otherwise place std::nullopt and wait for the cache result. + i = _downloaderReadCache.emplace( + sliceNumber, + (readFromCacheForDownloader(sliceNumber) + ? std::nullopt + : std::make_optional(PartsMap()))).first; + } + return !i->second; } void Reader::checkCacheResultsForDownloader() { @@ -1018,14 +1043,14 @@ void Reader::readFromCache(int sliceNumber) { }); } -bool Reader::readFromCacheForDownloader() { +bool Reader::readFromCacheForDownloader(int sliceNumber) { Expects(_cacheHelper != nullptr); - Expects(_downloaderSliceNumber > 0); + Expects(sliceNumber > 0); if (_slices.headerModeUnknown()) { return false; } - readFromCache(_downloaderSliceNumber); + readFromCache(sliceNumber); return true; } @@ -1153,10 +1178,12 @@ bool Reader::processCacheResults() { auto loaded = base::take(_cacheHelper->results); lock.unlock(); - if (_downloaderSliceNumber) { - const auto i = loaded.find(_downloaderSliceNumber); - if (i != end(loaded)) { - _downloaderSliceCache = i->second; + for (auto &[sliceNumber, cachedParts] : _downloaderReadCache) { + if (!cachedParts) { + const auto i = loaded.find(sliceNumber); + if (i != end(loaded)) { + cachedParts = i->second; + } } } diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h index bb720c34b..e631eaf2f 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h @@ -146,7 +146,7 @@ private: [[nodiscard]] SerializedSlice unloadToCache(); [[nodiscard]] QByteArray partForDownloader(int offset) const; - [[nodiscard]] std::optional readCacheRequiredFor(int offset); + [[nodiscard]] bool readCacheForDownloaderRequired(int offset); private: enum class HeaderMode { @@ -182,7 +182,7 @@ private: // 0 is for headerData, slice index = sliceNumber - 1. // returns false if asked for a known-empty downloader slice cache. void readFromCache(int sliceNumber); - [[nodiscard]] bool readFromCacheForDownloader(); + [[nodiscard]] bool readFromCacheForDownloader(int sliceNumber); bool processCacheResults(); void putToCache(SerializedSlice &&data); @@ -199,6 +199,9 @@ private: void processDownloaderRequests(); void checkCacheResultsForDownloader(); + void pruneDownloaderCache(int minimalOffset); + void pruneDoneDownloaderRequests(); + void sendDownloaderRequests(); [[nodiscard]] bool downloaderWaitForCachedSlice(int offset); void enqueueDownloaderOffsets(); void checkForDownloaderChange(int checkItemsCount); @@ -221,17 +224,24 @@ private: // Even if streaming had failed, the Reader can work for the downloader. std::optional _streamingError; - Storage::StreamedFileDownloader *_attachedDownloader = nullptr; - base::thread_safe_queue _downloaderOffsetRequests; - base::thread_safe_queue _downloaderOffsetAcks; - std::deque _offsetsForDownloader; - base::flat_set _downloaderOffsetsRequested; - int _downloaderSliceNumber = 0; // > 0 means we want it from cache. - std::optional _downloaderSliceCache; + // In case streaming is active both main and streaming threads have work. + // In case only downloader is active, all work is done on main thread. // Main thread. + Storage::StreamedFileDownloader *_attachedDownloader = nullptr; rpl::event_stream _partsForDownloader; bool _streamingActive = false; + + // Streaming thread. + std::deque _offsetsForDownloader; + base::flat_set _downloaderOffsetsRequested; + base::flat_map> _downloaderReadCache; + + // Communication from main thread to streaming thread. + // Streaming thread to main thread communicates using crl::on_main. + base::thread_safe_queue _downloaderOffsetRequests; + base::thread_safe_queue _downloaderOffsetAcks; + rpl::lifetime _lifetime; }; diff --git a/Telegram/SourceFiles/storage/file_download.cpp b/Telegram/SourceFiles/storage/file_download.cpp index c874fb474..14faaea3c 100644 --- a/Telegram/SourceFiles/storage/file_download.cpp +++ b/Telegram/SourceFiles/storage/file_download.cpp @@ -36,8 +36,9 @@ constexpr auto kMaxWebFileQueries = 8; } // namespace -Downloader::Downloader() -: _killDownloadSessionsTimer([=] { killDownloadSessions(); }) +Downloader::Downloader(not_null api) +: _api(api) +, _killDownloadSessionsTimer([=] { killDownloadSessions(); }) , _queueForWeb(kMaxWebFileQueries) { } @@ -48,14 +49,16 @@ void Downloader::clearPriorities() { void Downloader::requestedAmountIncrement(MTP::DcId dcId, int index, int amount) { Expects(index >= 0 && index < MTP::kDownloadSessionsCount); + using namespace rpl::mappers; + auto it = _requestedBytesAmount.find(dcId); if (it == _requestedBytesAmount.cend()) { it = _requestedBytesAmount.emplace(dcId, RequestedInDc { { 0 } }).first; } it->second[index] += amount; - if (it->second[index]) { + if (amount > 0) { killDownloadSessionsStop(dcId); - } else { + } else if (ranges::find_if(it->second, _1 > 0) == end(it->second)) { killDownloadSessionsStart(dcId); } } diff --git a/Telegram/SourceFiles/storage/file_download.h b/Telegram/SourceFiles/storage/file_download.h index 2727e87b5..b949787d9 100644 --- a/Telegram/SourceFiles/storage/file_download.h +++ b/Telegram/SourceFiles/storage/file_download.h @@ -12,6 +12,8 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include "base/binary_guard.h" #include "data/data_file_origin.h" +class ApiWrap; + namespace Storage { namespace Cache { struct Key; @@ -35,9 +37,13 @@ public: FileLoader *end = nullptr; }; - Downloader(); + explicit Downloader(not_null api); ~Downloader(); + ApiWrap &api() const { + return *_api; + } + int currentPriority() const { return _priority; } @@ -58,6 +64,8 @@ private: void killDownloadSessionsStop(MTP::DcId dcId); void killDownloadSessions(); + not_null _api; + base::Observable _taskFinishedObservable; int _priority = 1; diff --git a/Telegram/SourceFiles/storage/file_upload.cpp b/Telegram/SourceFiles/storage/file_upload.cpp index 66156880d..af3d36ac3 100644 --- a/Telegram/SourceFiles/storage/file_upload.cpp +++ b/Telegram/SourceFiles/storage/file_upload.cpp @@ -140,7 +140,8 @@ const QString &Uploader::File::filename() const { return file ? file->filename : media.filename; } -Uploader::Uploader() { +Uploader::Uploader(not_null api) +: _api(api) { nextTimer.setSingleShot(true); connect(&nextTimer, SIGNAL(timeout()), this, SLOT(sendNext())); stopSessionsTimer.setSingleShot(true); diff --git a/Telegram/SourceFiles/storage/file_upload.h b/Telegram/SourceFiles/storage/file_upload.h index a02f7aa83..16e3f8d30 100644 --- a/Telegram/SourceFiles/storage/file_upload.h +++ b/Telegram/SourceFiles/storage/file_upload.h @@ -9,6 +9,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL struct FileLoadResult; struct SendMediaReady; +class ApiWrap; namespace Storage { @@ -53,7 +54,9 @@ class Uploader : public QObject, public RPCSender { Q_OBJECT public: - Uploader(); + explicit Uploader(not_null api); + ~Uploader(); + void uploadMedia(const FullMsgId &msgId, const SendMediaReady &image); void upload( const FullMsgId &msgId, @@ -96,8 +99,6 @@ public: return _secureFailed.events(); } - ~Uploader(); - public slots: void unpause(); void sendNext(); @@ -111,6 +112,7 @@ private: void currentFailed(); + not_null _api; base::flat_map requestsSent; base::flat_map docRequestsSent; base::flat_map dcMap; diff --git a/Telegram/SourceFiles/storage/streamed_file_downloader.cpp b/Telegram/SourceFiles/storage/streamed_file_downloader.cpp index 8edd0baa0..12c929fee 100644 --- a/Telegram/SourceFiles/storage/streamed_file_downloader.cpp +++ b/Telegram/SourceFiles/storage/streamed_file_downloader.cpp @@ -125,11 +125,6 @@ bool StreamedFileDownloader::loadPart() { _nextPartIndex = index + 1; _reader->loadForDownloader(this, index * kPartSize); - AssertIsDebug(); - //_downloader->requestedAmountIncrement( - // requestData.dcId, - // requestData.dcIndex, - // kPartSize); ++_partsRequested; ++_queue->queriesCount; @@ -154,11 +149,6 @@ void StreamedFileDownloader::savePart(const LoadedPart &part) { ++_partsSaved; if (index < _nextPartIndex) { - AssertIsDebug(); - //_downloader->requestedAmountIncrement( - // requestData.dcId, - // requestData.dcIndex, - // -kPartSize); --_partsRequested; --_queue->queriesCount; }