diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader.h b/Telegram/SourceFiles/media/streaming/media_streaming_loader.h index df476872b..51c486e7b 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader.h @@ -40,7 +40,7 @@ public: [[nodiscard]] virtual rpl::producer parts() const = 0; virtual void attachDownloader( - Storage::StreamedFileDownloader *downloader) = 0; + not_null downloader) = 0; virtual void clearAttachedDownloader() = 0; virtual ~Loader() = default; diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader_local.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_loader_local.cpp index f5620c1c8..00cef555d 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader_local.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader_local.cpp @@ -79,7 +79,7 @@ rpl::producer LoaderLocal::parts() const { } void LoaderLocal::attachDownloader( - Storage::StreamedFileDownloader *downloader) { + not_null downloader) { Unexpected("Downloader attached to a local streaming loader."); } diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader_local.h b/Telegram/SourceFiles/media/streaming/media_streaming_loader_local.h index d741b18fc..b7436e2a5 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader_local.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader_local.h @@ -33,7 +33,7 @@ public: [[nodiscard]] rpl::producer parts() const override; void attachDownloader( - Storage::StreamedFileDownloader *downloader) override; + not_null downloader) override; void clearAttachedDownloader() override; private: diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp index ca93ac426..6a713909b 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp @@ -91,7 +91,7 @@ void LoaderMtproto::cancelForOffset(int offset) { } void LoaderMtproto::attachDownloader( - Storage::StreamedFileDownloader *downloader) { + not_null downloader) { _downloader = downloader; } diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h index 0a32717df..a7f16f4f8 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h @@ -40,7 +40,7 @@ public: [[nodiscard]] rpl::producer parts() const override; void attachDownloader( - Storage::StreamedFileDownloader *downloader) override; + not_null downloader) override; void clearAttachedDownloader() override; private: diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp index 79bdfcdd1..15d41aad8 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp @@ -906,7 +906,7 @@ rpl::producer Reader::partsForDownloader() const { } void Reader::loadForDownloader( - Storage::StreamedFileDownloader *downloader, + not_null downloader, int offset) { if (_attachedDownloader != downloader) { if (_attachedDownloader) { @@ -931,7 +931,7 @@ void Reader::doneForDownloader(int offset) { } void Reader::cancelForDownloader( - Storage::StreamedFileDownloader *downloader) { + not_null downloader) { if (_attachedDownloader == downloader) { _downloaderOffsetRequests.take(); _attachedDownloader = nullptr; diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h index 7ea333987..45f24187c 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h @@ -61,10 +61,11 @@ public: void stopStreaming(bool stillActive = false); [[nodiscard]] rpl::producer partsForDownloader() const; void loadForDownloader( - Storage::StreamedFileDownloader *downloader, + not_null downloader, int offset); void doneForDownloader(int offset); - void cancelForDownloader(Storage::StreamedFileDownloader *downloader); + void cancelForDownloader( + not_null downloader); ~Reader(); diff --git a/Telegram/SourceFiles/mtproto/facade.h b/Telegram/SourceFiles/mtproto/facade.h index 75868ce83..037085366 100644 --- a/Telegram/SourceFiles/mtproto/facade.h +++ b/Telegram/SourceFiles/mtproto/facade.h @@ -34,13 +34,13 @@ constexpr ShiftedDcId updaterDcId(DcId dcId) { return ShiftDcId(dcId, kUpdaterDcShift); } -constexpr auto kDownloadSessionsCount = 2; constexpr auto kUploadSessionsCount = 2; namespace details { constexpr ShiftedDcId downloadDcId(DcId dcId, int index) { - static_assert(kDownloadSessionsCount < kMaxMediaDcCount, "Too large MTPDownloadSessionsCount!"); + Expects(index < kMaxMediaDcCount); + return ShiftDcId(dcId, kBaseDownloadDcShift + index); }; @@ -48,13 +48,12 @@ constexpr ShiftedDcId downloadDcId(DcId dcId, int index) { // send(req, callbacks, MTP::downloadDcId(dc, index)) - for download shifted dc id inline ShiftedDcId downloadDcId(DcId dcId, int index) { - Expects(index >= 0 && index < kDownloadSessionsCount); return details::downloadDcId(dcId, index); } inline constexpr bool isDownloadDcId(ShiftedDcId shiftedDcId) { return (shiftedDcId >= details::downloadDcId(0, 0)) - && (shiftedDcId < details::downloadDcId(0, kDownloadSessionsCount - 1) + kDcShift); + && (shiftedDcId < details::downloadDcId(0, kMaxMediaDcCount - 1) + kDcShift); } inline bool isCdnDc(MTPDdcOption::Flags flags) { diff --git a/Telegram/SourceFiles/mtproto/session_private.cpp b/Telegram/SourceFiles/mtproto/session_private.cpp index 6dc25c250..9ee0039a5 100644 --- a/Telegram/SourceFiles/mtproto/session_private.cpp +++ b/Telegram/SourceFiles/mtproto/session_private.cpp @@ -1033,8 +1033,6 @@ void SessionPrivate::onSentSome(uint64 size) { } if (isUploadDcId(_shiftedDcId)) { remain *= kUploadSessionsCount; - } else if (isDownloadDcId(_shiftedDcId)) { - remain *= kDownloadSessionsCount; } _waitForReceivedTimer.callOnce(remain); } diff --git a/Telegram/SourceFiles/storage/file_download.cpp b/Telegram/SourceFiles/storage/file_download.cpp index 9dbef1d25..0be8c9cfd 100644 --- a/Telegram/SourceFiles/storage/file_download.cpp +++ b/Telegram/SourceFiles/storage/file_download.cpp @@ -43,6 +43,9 @@ constexpr auto kMaxWebFileQueries = 8; // fixed part size download for hash checking. constexpr auto kPartSize = 128 * 1024; +constexpr auto kStartSessionsCount = 1; +constexpr auto kMaxSessionsCount = 8; + } // namespace void DownloadManager::Queue::enqueue(not_null loader) { @@ -121,6 +124,7 @@ void DownloadManager::checkSendNext() { const auto bestIndex = [&] { const auto i = _requestedBytesAmount.find(dcId); if (i == end(_requestedBytesAmount)) { + _requestedBytesAmount[dcId].resize(kStartSessionsCount); return 0; } const auto j = ranges::min_element(i->second); @@ -136,6 +140,9 @@ void DownloadManager::checkSendNext() { loader->loadPart(bestIndex); } } + if (_requestedBytesAmount[0].empty()) { + _requestedBytesAmount[0] = std::vector(1, 0); + } if (_requestedBytesAmount[0][0] < kMaxWebFileQueries) { if (const auto loader = _webLoaders.nextLoader()) { loader->loadPart(0); @@ -147,13 +154,14 @@ void DownloadManager::requestedAmountIncrement( MTP::DcId dcId, int index, int amount) { - Expects(index >= 0 && index < MTP::kDownloadSessionsCount); - using namespace rpl::mappers; auto it = _requestedBytesAmount.find(dcId); if (it == _requestedBytesAmount.end()) { - it = _requestedBytesAmount.emplace(dcId, RequestedInDc { { 0 } }).first; + it = _requestedBytesAmount.emplace( + dcId, + std::vector(dcId ? kStartSessionsCount : 1, 0) + ).first; } it->second[index] += amount; if (!dcId) { @@ -163,6 +171,7 @@ void DownloadManager::requestedAmountIncrement( killDownloadSessionsStop(dcId); } else if (ranges::find_if(it->second, _1 > 0) == end(it->second)) { killDownloadSessionsStart(dcId); + checkSendNext(); } } @@ -197,8 +206,11 @@ void DownloadManager::killDownloadSessions() { auto left = kKillSessionTimeout; for (auto i = _killDownloadSessionTimes.begin(); i != _killDownloadSessionTimes.end(); ) { if (i->second <= now) { - for (int j = 0; j < MTP::kDownloadSessionsCount; ++j) { - MTP::stopSession(MTP::downloadDcId(i->first, j)); + const auto j = _requestedBytesAmount.find(i->first); + if (j != end(_requestedBytesAmount)) { + for (auto index = 0; index != int(j->second.size()); ++index) { + MTP::stopSession(MTP::downloadDcId(i->first, index)); + } } i = _killDownloadSessionTimes.erase(i); } else { @@ -458,8 +470,6 @@ void FileLoader::cancel(bool fail) { } _data = QByteArray(); - const auto downloader = _downloader; - const auto sessionGuard = &session(); const auto weak = QPointer(this); if (fail) { emit failed(this, started); diff --git a/Telegram/SourceFiles/storage/file_download.h b/Telegram/SourceFiles/storage/file_download.h index c4702611a..73e7e7763 100644 --- a/Telegram/SourceFiles/storage/file_download.h +++ b/Telegram/SourceFiles/storage/file_download.h @@ -41,7 +41,7 @@ public: [[nodiscard]] virtual MTP::DcId dcId() const = 0; [[nodiscard]] virtual bool readyToRequest() const = 0; - [[nodiscard]] virtual void loadPart(int dcIndex) = 0; + virtual void loadPart(int dcIndex) = 0; }; @@ -91,8 +91,7 @@ private: base::Observable _taskFinishedObservable; - using RequestedInDc = std::array; - base::flat_map _requestedBytesAmount; + base::flat_map> _requestedBytesAmount; base::flat_map _killDownloadSessionTimes; base::Timer _killDownloadSessionsTimer;