From ebf2a678b1f37c3ad7cc2507d16b0eda11747351 Mon Sep 17 00:00:00 2001 From: John Preston Date: Thu, 11 Apr 2019 11:59:18 +0400 Subject: [PATCH] Use a special FileLoader for streamed documents. --- Telegram/SourceFiles/data/data_document.cpp | 5 +- .../streaming/media_streaming_reader.cpp | 58 +++- .../media/streaming/media_streaming_reader.h | 30 +- .../media/view/media_view_overlay_widget.cpp | 20 +- .../media/view/media_view_overlay_widget.h | 1 - .../SourceFiles/storage/file_download.cpp | 309 ++++++++---------- Telegram/SourceFiles/storage/file_download.h | 42 ++- .../storage/streamed_file_downloader.cpp | 90 ++++- .../storage/streamed_file_downloader.h | 9 +- 9 files changed, 345 insertions(+), 219 deletions(-) diff --git a/Telegram/SourceFiles/data/data_document.cpp b/Telegram/SourceFiles/data/data_document.cpp index a5f1b73ab..6416dd16d 100644 --- a/Telegram/SourceFiles/data/data_document.cpp +++ b/Telegram/SourceFiles/data/data_document.cpp @@ -866,9 +866,10 @@ void DocumentData::save( } } else { status = FileReady; -/* if (auto reader = owner().documentStreamedReader(this, origin)) { + if (auto reader = owner().documentStreamedReader(this, origin)) { _loader = new Storage::StreamedFileDownloader( id, + _dc, origin, (saveToCache() ? std::make_optional(Data::DocumentCacheKey(_dc, id)) @@ -881,7 +882,7 @@ void DocumentData::save( fromCloud, autoLoading, cacheTag()); - } else */if (hasWebLocation()) { + } else if (hasWebLocation()) { _loader = new mtpFileLoader( _urlLocation, size, diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp index e6c7426ad..75c571281 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp @@ -670,6 +670,18 @@ QByteArray Reader::Slices::serializeAndUnloadFirstSliceNoHeader() { return result; } +template +void Reader::Slices::enumerateParts(int sliceNumber, Callback &&callback) { + const auto shift = sliceNumber ? ((sliceNumber - 1) * kInSlice) : 0; + const auto &slice = (sliceNumber ? _data[sliceNumber - 1] : _header); + for (const auto &[offset, bytes] : slice.parts) { + callback(LoadedPart{ offset + shift, bytes }); + } + if (!sliceNumber && isGoodHeader()) { + enumerateParts(1, std::forward(callback)); + } +} + Reader::SerializedSlice Reader::Slices::unloadToCache() { if (_headerMode == HeaderMode::Unknown || _headerMode == HeaderMode::NoCache) { @@ -695,9 +707,11 @@ Reader::Reader( , _slices(_loader->size(), _cacheHelper != nullptr) { _loader->parts( ) | rpl::start_with_next([=](LoadedPart &&part) { - QMutexLocker lock(&_loadedPartsMutex); - _loadedParts.push_back(std::move(part)); - lock.unlock(); + if (_downloaderAttached.load(std::memory_order_acquire)) { + _partsForDownloader.fire_copy(part); + } + + _loadedParts.emplace(std::move(part)); if (const auto waiting = _waiting.load()) { _waiting = nullptr; @@ -714,6 +728,23 @@ void Reader::stop() { _waiting = nullptr; } +rpl::producer Reader::partsForDownloader() const { + return _partsForDownloader.events(); +} + +void Reader::loadForDownloader(int offset) { + _downloaderAttached.store(true, std::memory_order_release); + _downloaderOffsetRequests.emplace(offset); + AssertIsDebug(); // wake? +} + +void Reader::cancelForDownloader() { + if (_downloaderAttached.load(std::memory_order_acquire)) { + _downloaderOffsetRequests.take(); + _downloaderAttached.store(false, std::memory_order_release); + } +} + bool Reader::isRemoteLoader() const { return _loader->baseCacheKey().has_value(); } @@ -870,24 +901,35 @@ bool Reader::processCacheResults() { } QMutexLocker lock(&_cacheHelper->mutex); - auto loaded = base::take(_cacheHelper->results); + const auto loaded = base::take(_cacheHelper->results); lock.unlock(); for (const auto &[sliceNumber, result] : loaded) { _slices.processCacheResult(sliceNumber, bytes::make_span(result)); } + if (_downloaderAttached.load(std::memory_order_acquire)) { + for (const auto &[sliceNumber, result] : loaded) { + sendPartsToDownloader(sliceNumber); + } + } return !loaded.empty(); } +void Reader::sendPartsToDownloader(int sliceNumber) { + _slices.enumerateParts(sliceNumber, [&](LoadedPart &&part) { + crl::on_main(this, [=, part = std::move(part)]() mutable { + AssertIsDebug(); // maybe send them with small timeout? + _partsForDownloader.fire(std::move(part)); + }); + }); +} + bool Reader::processLoadedParts() { if (_failed) { return false; } - QMutexLocker lock(&_loadedPartsMutex); - auto loaded = base::take(_loadedParts); - lock.unlock(); - + auto loaded = _loadedParts.take(); for (auto &part : loaded) { if (part.offset == LoadedPart::kFailedOffset || (part.bytes.size() != Loader::kPartSize diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h index 605334aad..58829be74 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h @@ -9,6 +9,8 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include "media/streaming/media_streaming_loader.h" #include "base/bytes.h" +#include "base/weak_ptr.h" +#include "base/thread_safe_queue.h" namespace Storage { namespace Cache { @@ -27,22 +29,28 @@ class Loader; struct LoadedPart; enum class Error; -class Reader final { +class Reader final : public base::has_weak_ptr { public: + // Main thread. Reader(not_null owner, std::unique_ptr loader); + // Any thread. [[nodiscard]] int size() const; + [[nodiscard]] bool isRemoteLoader() const; + + // Single thread. [[nodiscard]] bool fill( int offset, bytes::span buffer, not_null notify); [[nodiscard]] std::optional failed() const; - void headerDone(); - void stop(); - [[nodiscard]] bool isRemoteLoader() const; + // Main thread. + [[nodiscard]] rpl::producer partsForDownloader() const; + void loadForDownloader(int offset); + void cancelForDownloader(); ~Reader(); @@ -126,6 +134,10 @@ private: [[nodiscard]] FillResult fill(int offset, bytes::span buffer); [[nodiscard]] SerializedSlice unloadToCache(); + // callback(LoadedPart(..)). + template + void enumerateParts(int sliceNumber, Callback &&callback); + private: enum class HeaderMode { Unknown, @@ -160,6 +172,7 @@ private: // 0 is for headerData, slice index = sliceNumber - 1. void readFromCache(int sliceNumber); bool processCacheResults(); + void sendPartsToDownloader(int sliceNumber); void putToCache(SerializedSlice &&data); void cancelLoadInRange(int from, int till); @@ -178,13 +191,18 @@ private: const std::unique_ptr _loader; const std::shared_ptr _cacheHelper; - QMutex _loadedPartsMutex; - std::vector _loadedParts; + base::thread_safe_queue _loadedParts; std::atomic _waiting = nullptr; PriorityQueue _loadingOffsets; Slices _slices; std::optional _failed; + + std::atomic _downloaderAttached = false; + base::thread_safe_queue _downloaderOffsetRequests; + + // Main thread. + rpl::event_stream _partsForDownloader; rpl::lifetime _lifetime; }; diff --git a/Telegram/SourceFiles/media/view/media_view_overlay_widget.cpp b/Telegram/SourceFiles/media/view/media_view_overlay_widget.cpp index 818fb05fd..70573db23 100644 --- a/Telegram/SourceFiles/media/view/media_view_overlay_widget.cpp +++ b/Telegram/SourceFiles/media/view/media_view_overlay_widget.cpp @@ -1831,7 +1831,7 @@ void OverlayWidget::displayDocument(DocumentData *doc, HistoryItem *item) { } else { _doc->automaticLoad(fileOrigin(), item); - if (_doc->canBePlayed() && !_doc->loading()) { + if (_doc->canBePlayed()) { initStreaming(); } else if (_doc->isVideoFile()) { initStreamingThumbnail(); @@ -2456,19 +2456,7 @@ void OverlayWidget::validatePhotoCurrentImage() { } } -void OverlayWidget::checkLoadingWhileStreaming() { - if (_streamed && _doc->loading()) { - crl::on_main(this, [=, doc = _doc] { - if (!isHidden() && _doc == doc) { - redisplayContent(); - } - }); - } -} - void OverlayWidget::paintEvent(QPaintEvent *e) { - checkLoadingWhileStreaming(); - const auto r = e->rect(); const auto ®ion = e->region(); const auto rects = region.rects(); @@ -2961,10 +2949,10 @@ void OverlayWidget::keyPressEvent(QKeyEvent *e) { } else if (e->key() == Qt::Key_Copy || (e->key() == Qt::Key_C && ctrl)) { onCopy(); } else if (e->key() == Qt::Key_Enter || e->key() == Qt::Key_Return || e->key() == Qt::Key_Space) { - if (_doc && !_doc->loading() && (documentBubbleShown() || !_doc->loaded())) { - onDocClick(); - } else if (_streamed) { + if (_streamed) { playbackPauseResume(); + } else if (_doc && !_doc->loading() && (documentBubbleShown() || !_doc->loaded())) { + onDocClick(); } } else if (e->key() == Qt::Key_Left) { if (_controlsHideTimer.isActive()) { diff --git a/Telegram/SourceFiles/media/view/media_view_overlay_widget.h b/Telegram/SourceFiles/media/view/media_view_overlay_widget.h index 38df416af..43da370bd 100644 --- a/Telegram/SourceFiles/media/view/media_view_overlay_widget.h +++ b/Telegram/SourceFiles/media/view/media_view_overlay_widget.h @@ -229,7 +229,6 @@ private: void updateActions(); void resizeCenteredControls(); void resizeContentByScreenSize(); - void checkLoadingWhileStreaming(); void displayPhoto(not_null photo, HistoryItem *item); void displayDocument(DocumentData *document, HistoryItem *item); diff --git a/Telegram/SourceFiles/storage/file_download.cpp b/Telegram/SourceFiles/storage/file_download.cpp index 10ef27eaa..4c9a8c5eb 100644 --- a/Telegram/SourceFiles/storage/file_download.cpp +++ b/Telegram/SourceFiles/storage/file_download.cpp @@ -28,10 +28,17 @@ namespace { // How much time without download causes additional session kill. constexpr auto kKillSessionTimeout = crl::time(5000); +// Max 16 file parts downloaded at the same time, 128 KB each. +constexpr auto kMaxFileQueries = 16; + +// Max 8 http[s] files downloaded at the same time. +constexpr auto kMaxWebFileQueries = 8; + } // namespace Downloader::Downloader() -: _killDownloadSessionsTimer([=] { killDownloadSessions(); }) { +: _killDownloadSessionsTimer([=] { killDownloadSessions(); }) +, _queueForWeb(kMaxWebFileQueries) { } void Downloader::clearPriorities() { @@ -106,6 +113,18 @@ int Downloader::chooseDcIndexForRequest(MTP::DcId dcId) const { return result; } +not_null Downloader::queueForDc(MTP::DcId dcId) { + const auto i = _queuesForDc.find(dcId); + const auto result = (i != end(_queuesForDc)) + ? i + : _queuesForDc.emplace(dcId, Queue(kMaxFileQueries)).first; + return &result->second; +} + +not_null Downloader::queueForWeb() { + return &_queueForWeb; +} + Downloader::~Downloader() { killDownloadSessions(); } @@ -116,28 +135,12 @@ namespace { constexpr auto kDownloadPhotoPartSize = 64 * 1024; // 64kb for photo constexpr auto kDownloadDocumentPartSize = 128 * 1024; // 128kb for document -constexpr auto kMaxFileQueries = 16; // max 16 file parts downloaded at the same time -constexpr auto kMaxWebFileQueries = 8; // max 8 http[s] files downloaded at the same time constexpr auto kDownloadCdnPartSize = 128 * 1024; // 128kb for cdn requests } // namespace -struct FileLoaderQueue { - FileLoaderQueue(int queriesLimit) : queriesLimit(queriesLimit) { - } - int queriesCount = 0; - int queriesLimit = 0; - FileLoader *start = nullptr; - FileLoader *end = nullptr; -}; - namespace { -using LoaderQueues = QMap; -LoaderQueues queues; - -FileLoaderQueue _webQueue(kMaxWebFileQueries); - QThread *_webLoadThread = nullptr; WebLoadManager *_webLoadManager = nullptr; WebLoadManager *webLoadManager() { @@ -252,7 +255,7 @@ void FileLoader::notifyAboutProgress() { LoadNextFromQueue(queue); } -void FileLoader::LoadNextFromQueue(not_null queue) { +void FileLoader::LoadNextFromQueue(not_null queue) { if (queue->queriesCount >= queue->queriesLimit) { return; } @@ -524,6 +527,89 @@ void FileLoader::startLoading(bool loadFirst, bool prior) { loadPart(); } +int FileLoader::currentOffset() const { + return (_fileIsOpen ? _file.size() : _data.size()) - _skippedBytes; +} + +bool FileLoader::writeResultPart(int offset, bytes::const_span buffer) { + Expects(!_finished); + + if (!buffer.empty()) { + if (_fileIsOpen) { + auto fsize = _file.size(); + if (offset < fsize) { + _skippedBytes -= buffer.size(); + } else if (offset > fsize) { + _skippedBytes += offset - fsize; + } + _file.seek(offset); + if (_file.write(reinterpret_cast(buffer.data()), buffer.size()) != qint64(buffer.size())) { + cancel(true); + return false; + } + } else { + _data.reserve(offset + buffer.size()); + if (offset > _data.size()) { + _skippedBytes += offset - _data.size(); + _data.resize(offset); + } + if (offset == _data.size()) { + _data.append(reinterpret_cast(buffer.data()), buffer.size()); + } else { + _skippedBytes -= buffer.size(); + if (int64(offset + buffer.size()) > _data.size()) { + _data.resize(offset + buffer.size()); + } + const auto dst = bytes::make_detached_span(_data).subspan( + offset, + buffer.size()); + bytes::copy(dst, buffer); + } + } + } + return true; +} + +bool FileLoader::finalizeResult() { + Expects(!_finished); + + if (!_filename.isEmpty() && (_toCache == LoadToCacheAsWell)) { + if (!_fileIsOpen) { + _fileIsOpen = _file.open(QIODevice::WriteOnly); + } + if (!_fileIsOpen || _file.write(_data) != qint64(_data.size())) { + cancel(true); + return false; + } + } + + _finished = true; + if (_fileIsOpen) { + _file.close(); + _fileIsOpen = false; + Platform::File::PostprocessDownloaded( + QFileInfo(_file).absoluteFilePath()); + } + removeFromQueue(); + + if (_localStatus == LocalStatus::NotFound) { + if (const auto key = fileLocationKey()) { + Local::writeFileLocation(*key, FileLocation(_filename)); + } + if (const auto key = cacheKey()) { + if (_data.size() <= Storage::kMaxFileInMemory) { + Auth().data().cache().put( + *key, + Storage::Cache::Database::TaggedValue( + base::duplicate(_data), + _cacheTag)); + } + } + } + _downloader->taskFinished().notify(); + return true; +} + mtpFileLoader::mtpFileLoader( const StorageFileLocation &location, Data::FileOrigin origin, @@ -544,12 +630,7 @@ mtpFileLoader::mtpFileLoader( cacheTag) , _location(location) , _origin(origin) { - auto shiftedDcId = MTP::downloadDcId(dcId(), 0); - auto i = queues.find(shiftedDcId); - if (i == queues.cend()) { - i = queues.insert(shiftedDcId, FileLoaderQueue(kMaxFileQueries)); - } - _queue = &i.value(); + _queue = _downloader->queueForDc(dcId()); } mtpFileLoader::mtpFileLoader( @@ -567,12 +648,7 @@ mtpFileLoader::mtpFileLoader( autoLoading, cacheTag) , _location(location) { - auto shiftedDcId = MTP::downloadDcId(dcId(), 0); - auto i = queues.find(shiftedDcId); - if (i == queues.cend()) { - i = queues.insert(shiftedDcId, FileLoaderQueue(kMaxFileQueries)); - } - _queue = &i.value(); + _queue = _downloader->queueForDc(dcId()); } mtpFileLoader::mtpFileLoader( @@ -590,16 +666,7 @@ mtpFileLoader::mtpFileLoader( autoLoading, cacheTag) , _location(location) { - auto shiftedDcId = MTP::downloadDcId(dcId(), 0); - auto i = queues.find(shiftedDcId); - if (i == queues.cend()) { - i = queues.insert(shiftedDcId, FileLoaderQueue(kMaxFileQueries)); - } - _queue = &i.value(); -} - -int mtpFileLoader::currentOffset() const { - return (_fileIsOpen ? _file.size() : _data.size()) - _skippedBytes; + _queue = _downloader->queueForDc(dcId()); } Data::FileOrigin mtpFileLoader::fileOrigin() const { @@ -947,88 +1014,17 @@ int mtpFileLoader::finishSentRequestGetOffset(mtpRequestId requestId) { } bool mtpFileLoader::feedPart(int offset, bytes::const_span buffer) { - Expects(!_finished); - - if (!buffer.empty()) { - if (_fileIsOpen) { - auto fsize = _file.size(); - if (offset < fsize) { - _skippedBytes -= buffer.size(); - } else if (offset > fsize) { - _skippedBytes += offset - fsize; - } - _file.seek(offset); - if (_file.write(reinterpret_cast(buffer.data()), buffer.size()) != qint64(buffer.size())) { - cancel(true); - return false; - } - } else { - _data.reserve(offset + buffer.size()); - if (offset > _data.size()) { - _skippedBytes += offset - _data.size(); - _data.resize(offset); - } - if (offset == _data.size()) { - _data.append(reinterpret_cast(buffer.data()), buffer.size()); - } else { - _skippedBytes -= buffer.size(); - if (int64(offset + buffer.size()) > _data.size()) { - _data.resize(offset + buffer.size()); - } - const auto dst = bytes::make_detached_span(_data).subspan( - offset, - buffer.size()); - bytes::copy(dst, buffer); - } - } + if (!writeResultPart(offset, buffer)) { + return false; } if (buffer.empty() || (buffer.size() % 1024)) { // bad next offset _lastComplete = true; } - if (_sentRequests.empty() + const auto finished = _sentRequests.empty() && _cdnUncheckedParts.empty() - && (_lastComplete || (_size && _nextRequestOffset >= _size))) { - if (!_filename.isEmpty() && (_toCache == LoadToCacheAsWell)) { - if (!_fileIsOpen) { - _fileIsOpen = _file.open(QIODevice::WriteOnly); - } - if (!_fileIsOpen || _file.write(_data) != qint64(_data.size())) { - cancel(true); - return false; - } - } - _finished = true; - if (_fileIsOpen) { - _file.close(); - _fileIsOpen = false; - Platform::File::PostprocessDownloaded(QFileInfo(_file).absoluteFilePath()); - } - removeFromQueue(); - - if (_localStatus == LocalStatus::NotFound) { - if (_locationType != UnknownFileLocation - && !_filename.isEmpty()) { - Local::writeFileLocation( - mediaKey(_locationType, dcId(), objId()), - FileLocation(_filename)); - } - if (_location.is() - || _locationType == UnknownFileLocation - || _toCache == LoadToCacheAsWell) { - if (const auto key = cacheKey()) { - if (_data.size() <= Storage::kMaxFileInMemory) { - Auth().data().cache().put( - *key, - Storage::Cache::Database::TaggedValue( - base::duplicate(_data), - _cacheTag)); - } - } - } - } - } - if (_finished) { - _downloader->taskFinished().notify(); + && (_lastComplete || (_size && _nextRequestOffset >= _size)); + if (finished && !finalizeResult()) { + return false; } return true; } @@ -1177,6 +1173,13 @@ std::optional mtpFileLoader::cacheKey() const { }); } +std::optional mtpFileLoader::fileLocationKey() const { + if (_locationType != UnknownFileLocation && !_filename.isEmpty()) { + return mediaKey(_locationType, dcId(), objId()); + } + return std::nullopt; +} + mtpFileLoader::~mtpFileLoader() { cancelRequests(); } @@ -1198,11 +1201,15 @@ webFileLoader::webFileLoader( , _url(url) , _requestSent(false) , _already(0) { - _queue = &_webQueue; + _queue = _downloader->queueForWeb(); } bool webFileLoader::loadPart() { - if (_finished || _requestSent || _webLoadManager == FinishedWebLoadManager) return false; + if (_finished + || _requestSent + || _webLoadManager == FinishedWebLoadManager) { + return false; + } if (!_webLoadManager) { _webLoadMainManager = new WebLoadMainManager(); @@ -1221,55 +1228,21 @@ int webFileLoader::currentOffset() const { return _already; } -void webFileLoader::onProgress(qint64 already, qint64 size) { +void webFileLoader::loadProgress(qint64 already, qint64 size) { _size = size; _already = already; - - emit progress(this); -} - -void webFileLoader::onFinished(const QByteArray &data) { - if (_fileIsOpen) { - if (_file.write(data.constData(), data.size()) != qint64(data.size())) { - return cancel(true); - } - } else { - _data = data; - } - if (!_filename.isEmpty() && (_toCache == LoadToCacheAsWell)) { - if (!_fileIsOpen) _fileIsOpen = _file.open(QIODevice::WriteOnly); - if (!_fileIsOpen) { - return cancel(true); - } - if (_file.write(_data) != qint64(_data.size())) { - return cancel(true); - } - } - _finished = true; - if (_fileIsOpen) { - _file.close(); - _fileIsOpen = false; - Platform::File::PostprocessDownloaded(QFileInfo(_file).absoluteFilePath()); - } - removeFromQueue(); - - if (_localStatus == LocalStatus::NotFound) { - if (const auto key = cacheKey()) { - if (_data.size() <= Storage::kMaxFileInMemory) { - Auth().data().cache().put( - *key, - Storage::Cache::Database::TaggedValue( - base::duplicate(_data), - _cacheTag)); - } - } - } - _downloader->taskFinished().notify(); - notifyAboutProgress(); } -void webFileLoader::onError() { +void webFileLoader::loadFinished(const QByteArray &data) { + if (writeResultPart(0, bytes::make_span(data))) { + if (finalizeResult()) { + notifyAboutProgress(); + } + } +} + +void webFileLoader::loadError() { cancel(true); } @@ -1277,6 +1250,10 @@ std::optional webFileLoader::cacheKey() const { return Data::UrlCacheKey(_url); } +std::optional webFileLoader::fileLocationKey() const { + return std::nullopt; +} + void webFileLoader::cancelRequests() { if (!webLoadManager()) return; webLoadManager()->stop(this); @@ -1621,18 +1598,18 @@ WebLoadManager::~WebLoadManager() { void WebLoadMainManager::progress(webFileLoader *loader, qint64 already, qint64 size) { if (webLoadManager() && webLoadManager()->carries(loader)) { - loader->onProgress(already, size); + loader->loadProgress(already, size); } } void WebLoadMainManager::finished(webFileLoader *loader, QByteArray data) { if (webLoadManager() && webLoadManager()->carries(loader)) { - loader->onFinished(data); + loader->loadFinished(data); } } void WebLoadMainManager::error(webFileLoader *loader) { if (webLoadManager() && webLoadManager()->carries(loader)) { - loader->onError(); + loader->loadError(); } } diff --git a/Telegram/SourceFiles/storage/file_download.h b/Telegram/SourceFiles/storage/file_download.h index 9a1c792f2..4277b7ae9 100644 --- a/Telegram/SourceFiles/storage/file_download.h +++ b/Telegram/SourceFiles/storage/file_download.h @@ -26,7 +26,17 @@ constexpr auto kMaxWallPaperDimension = 4096; // 4096x4096 is max area. class Downloader final { public: + struct Queue { + Queue(int queriesLimit) : queriesLimit(queriesLimit) { + } + int queriesCount = 0; + int queriesLimit = 0; + FileLoader *start = nullptr; + FileLoader *end = nullptr; + }; + Downloader(); + ~Downloader(); int currentPriority() const { return _priority; @@ -40,7 +50,8 @@ public: void requestedAmountIncrement(MTP::DcId dcId, int index, int amount); int chooseDcIndexForRequest(MTP::DcId dcId) const; - ~Downloader(); + not_null queueForDc(MTP::DcId dcId); + not_null queueForWeb(); private: void killDownloadSessionsStart(MTP::DcId dcId); @@ -56,6 +67,9 @@ private: base::flat_map _killDownloadSessionTimes; base::Timer _killDownloadSessionsTimer; + std::map _queuesForDc; + Queue _queueForWeb; + }; } // namespace Storage @@ -72,7 +86,6 @@ struct StorageImageSaved { class mtpFileLoader; class webFileLoader; -struct FileLoaderQueue; class FileLoader : public QObject { Q_OBJECT @@ -106,7 +119,7 @@ public: } virtual Data::FileOrigin fileOrigin() const; float64 currentProgress() const; - virtual int currentOffset() const = 0; + virtual int currentOffset() const; int fullSize() const; bool setFileName(const QString &filename); // set filename for loaders to cache @@ -146,6 +159,8 @@ signals: void failed(FileLoader *loader, bool started); protected: + using Queue = Storage::Downloader::Queue; + enum class LocalStatus { NotTried, NotFound, @@ -158,6 +173,7 @@ protected: bool tryLoadLocal(); void loadLocal(const Storage::Cache::Key &key); virtual std::optional cacheKey() const = 0; + virtual std::optional fileLocationKey() const = 0; virtual void cancelRequests() = 0; void startLoading(bool loadFirst, bool prior); @@ -165,14 +181,17 @@ protected: void cancel(bool failed); void notifyAboutProgress(); - static void LoadNextFromQueue(not_null queue); + static void LoadNextFromQueue(not_null queue); virtual bool loadPart() = 0; + bool writeResultPart(int offset, bytes::const_span buffer); + bool finalizeResult(); + not_null _downloader; FileLoader *_prev = nullptr; FileLoader *_next = nullptr; int _priority = 0; - FileLoaderQueue *_queue = nullptr; + Queue *_queue = nullptr; bool _paused = false; bool _autoLoading = false; @@ -192,6 +211,7 @@ protected: QByteArray _data; int _size = 0; + int _skippedBytes = 0; LocationType _locationType = LocationType(); base::binary_guard _localLoading; @@ -227,7 +247,6 @@ public: bool autoLoading, uint8 cacheTag); - int currentOffset() const override; Data::FileOrigin fileOrigin() const override; uint64 objId() const override; @@ -255,6 +274,7 @@ private: QByteArray hash; }; std::optional cacheKey() const override; + std::optional fileLocationKey() const override; void cancelRequests() override; MTP::DcId dcId() const; @@ -270,8 +290,8 @@ private: void requestMoreCdnFileHashes(); void getCdnFileHashesDone(const MTPVector &result, mtpRequestId requestId); - bool feedPart(int offset, bytes::const_span buffer); void partLoaded(int offset, bytes::const_span buffer); + bool feedPart(int offset, bytes::const_span buffer); bool partFailed(const RPCError &error, mtpRequestId requestId); bool normalPartFailed(QByteArray fileReference, const RPCError &error, mtpRequestId requestId); @@ -294,7 +314,6 @@ private: std::map _sentRequests; bool _lastComplete = false; - int32 _skippedBytes = 0; int32 _nextRequestOffset = 0; base::variant< @@ -327,9 +346,9 @@ public: int currentOffset() const override; - void onProgress(qint64 already, qint64 size); - void onFinished(const QByteArray &data); - void onError(); + void loadProgress(qint64 already, qint64 size); + void loadFinished(const QByteArray &data); + void loadError(); void stop() override { cancelRequests(); @@ -340,6 +359,7 @@ public: protected: void cancelRequests() override; std::optional cacheKey() const override; + std::optional fileLocationKey() const override; bool loadPart() override; QString _url; diff --git a/Telegram/SourceFiles/storage/streamed_file_downloader.cpp b/Telegram/SourceFiles/storage/streamed_file_downloader.cpp index f5d4024ed..f593e1f08 100644 --- a/Telegram/SourceFiles/storage/streamed_file_downloader.cpp +++ b/Telegram/SourceFiles/storage/streamed_file_downloader.cpp @@ -13,15 +13,18 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL namespace Storage { namespace { -constexpr auto kPartSize = Media::Streaming::Loader::kPartSize; +using namespace Media::Streaming; + +constexpr auto kPartSize = Loader::kPartSize; } // namespace StreamedFileDownloader::StreamedFileDownloader( uint64 objectId, + MTP::DcId dcId, Data::FileOrigin origin, std::optional cacheKey, - std::shared_ptr reader, + std::shared_ptr reader, // For FileLoader const QString &toFile, @@ -44,6 +47,17 @@ StreamedFileDownloader::StreamedFileDownloader( , _cacheKey(cacheKey) , _reader(std::move(reader)) { _partIsSaved.resize((size + kPartSize - 1) / kPartSize, false); + + _reader->partsForDownloader( + ) | rpl::start_with_next([=](const LoadedPart &part) { + if (part.offset == LoadedPart::kFailedOffset) { + cancel(true); + } else { + savePart(std::move(part)); + } + }, _lifetime); + + _queue = _downloader->queueForDc(dcId); } StreamedFileDownloader::~StreamedFileDownloader() { @@ -58,10 +72,6 @@ Data::FileOrigin StreamedFileDownloader::fileOrigin() const { return _origin; } -int StreamedFileDownloader::currentOffset() const { - return 0; -} - void StreamedFileDownloader::stop() { cancelRequests(); } @@ -70,11 +80,77 @@ std::optional StreamedFileDownloader::cacheKey() const { return _cacheKey; } +std::optional StreamedFileDownloader::fileLocationKey() const { + return std::nullopt; AssertIsDebug(); +} + void StreamedFileDownloader::cancelRequests() { + const auto requests = std::count( + begin(_partIsSaved), + begin(_partIsSaved) + _nextPartIndex, + false); + _queue->queriesCount -= requests; + _nextPartIndex = 0; + + _reader->cancelForDownloader(); } bool StreamedFileDownloader::loadPart() { - return false; + if (_finished || _nextPartIndex >= size(_partIsSaved)) { + return false; + } + const auto index = std::find( + begin(_partIsSaved) + _nextPartIndex, + end(_partIsSaved), + false + ) - begin(_partIsSaved); + if (index == size(_partIsSaved)) { + _nextPartIndex = index; + return false; + } + _nextPartIndex = index + 1; + _reader->loadForDownloader(index); + AssertIsDebug(); + //_downloader->requestedAmountIncrement( + // requestData.dcId, + // requestData.dcIndex, + // kPartSize); + ++_queue->queriesCount; + + return true; +} + +void StreamedFileDownloader::savePart(const LoadedPart &part) { + Expects(part.offset >= 0 && part.offset < _reader->size()); + Expects(part.offset % kPartSize == 0); + if (_finished || _cancelled) { + return; + } + + const auto index = part.offset / kPartSize; + Assert(index >= 0 && index < _partIsSaved.size()); + if (_partIsSaved[index]) { + return; + } + _partIsSaved[index] = true; + + if (index < _nextPartIndex) { + AssertIsDebug(); + //_downloader->requestedAmountIncrement( + // requestData.dcId, + // requestData.dcIndex, + // -kPartSize); + --_queue->queriesCount; + } + if (!writeResultPart(part.offset, bytes::make_span(part.bytes))) { + return; + } + if (ranges::find(_partIsSaved, false) == end(_partIsSaved)) { + if (!finalizeResult()) { + return; + } + } + notifyAboutProgress(); } } // namespace Storage diff --git a/Telegram/SourceFiles/storage/streamed_file_downloader.h b/Telegram/SourceFiles/storage/streamed_file_downloader.h index 9e5d1f1cf..143154897 100644 --- a/Telegram/SourceFiles/storage/streamed_file_downloader.h +++ b/Telegram/SourceFiles/storage/streamed_file_downloader.h @@ -13,6 +13,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL namespace Media { namespace Streaming { class Reader; +struct LoadedPart; } // namespace Streaming } // namespace Media @@ -22,6 +23,7 @@ class StreamedFileDownloader final : public FileLoader { public: StreamedFileDownloader( uint64 objectId, + MTP::DcId dcId, Data::FileOrigin origin, std::optional cacheKey, std::shared_ptr reader, @@ -38,15 +40,16 @@ public: uint64 objId() const override; Data::FileOrigin fileOrigin() const override; - int currentOffset() const override; void stop() override; private: std::optional cacheKey() const override; + std::optional fileLocationKey() const override; void cancelRequests() override; bool loadPart() override; -private: + void savePart(const Media::Streaming::LoadedPart &part); + uint64 _objectId = 0; Data::FileOrigin _origin; std::optional _cacheKey; @@ -55,6 +58,8 @@ private: std::vector _partIsSaved; // vector :D int _nextPartIndex = 0; + rpl::lifetime _lifetime; + }; } // namespace Storage