From fa4d8f2fbdf1df23a6927aa39fafed4589330a52 Mon Sep 17 00:00:00 2001 From: John Preston Date: Mon, 23 Dec 2019 12:37:03 +0300 Subject: [PATCH] Support priorities in download tasks. --- .../media_streaming_loader_mtproto.cpp | 10 +- .../media_streaming_loader_mtproto.h | 1 + .../storage/download_manager_mtproto.cpp | 94 ++++++++++++------- .../storage/download_manager_mtproto.h | 16 ++-- 4 files changed, 79 insertions(+), 42 deletions(-) diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp index e54f73520..1b49998c2 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp @@ -51,11 +51,17 @@ void LoaderMtproto::load(int offset) { if (haveSentRequestForOffset(offset)) { return; } else if (_requested.add(offset)) { - addToQueue(); // #TODO download priority + addToQueueWithPriority(); } }); } +void LoaderMtproto::addToQueueWithPriority() { + addToQueue([&] { + return 1; + }()); +} + void LoaderMtproto::stop() { crl::on_main(this, [=] { cancelAllRequests(); @@ -73,7 +79,7 @@ void LoaderMtproto::cancel(int offset) { void LoaderMtproto::cancelForOffset(int offset) { if (haveSentRequestForOffset(offset)) { cancelRequestForOffset(offset); - addToQueue(); // #TODO download priority + addToQueueWithPriority(); } else { _requested.remove(offset); } diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h index 380890d42..1b301e810 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h @@ -46,6 +46,7 @@ private: void cancelOnFail() override; void cancelForOffset(int offset); + void addToQueueWithPriority(); const int _size = 0; diff --git a/Telegram/SourceFiles/storage/download_manager_mtproto.cpp b/Telegram/SourceFiles/storage/download_manager_mtproto.cpp index e261401a8..9d6f25be4 100644 --- a/Telegram/SourceFiles/storage/download_manager_mtproto.cpp +++ b/Telegram/SourceFiles/storage/download_manager_mtproto.cpp @@ -38,50 +38,72 @@ constexpr auto kBadRequestDurationThreshold = 8 * crl::time(1000); } // namespace -void DownloadManagerMtproto::Queue::enqueue(not_null task) { - const auto i = ranges::find(_tasks, task); - if (i != end(_tasks)) { - return; +void DownloadManagerMtproto::Queue::enqueue( + not_null task, + int priority) { + const auto position = ranges::find_if(_tasks, [&](const Enqueued &task) { + return task.priority <= priority; + }) - begin(_tasks); + const auto now = ranges::find(_tasks, task, &Enqueued::task); + const auto i = [&] { + if (now != end(_tasks)) { + (now->priority = priority); + return now; + } + _tasks.push_back({ task, priority }); + return end(_tasks) - 1; + }(); + const auto j = begin(_tasks) + position; + if (j < i) { + std::rotate(j, i, i + 1); + } else if (j > i + 1) { + std::rotate(i, i + 1, j); } - _tasks.push_back(task); - _previousGeneration.erase( - ranges::remove(_previousGeneration, task), - end(_previousGeneration)); } void DownloadManagerMtproto::Queue::remove(not_null task) { - _tasks.erase(ranges::remove(_tasks, task), end(_tasks)); - _previousGeneration.erase( - ranges::remove(_previousGeneration, task), - end(_previousGeneration)); + _tasks.erase(ranges::remove(_tasks, task, &Enqueued::task), end(_tasks)); } void DownloadManagerMtproto::Queue::resetGeneration() { - if (!_previousGeneration.empty()) { - _tasks.reserve(_tasks.size() + _previousGeneration.size()); - std::copy( - begin(_previousGeneration), - end(_previousGeneration), - std::back_inserter(_tasks)); - _previousGeneration.clear(); + const auto from = ranges::find(_tasks, 0, &Enqueued::priority); + for (auto &task : ranges::make_subrange(from, end(_tasks))) { + if (task.priority) { + Assert(task.priority == -1); + break; + } + task.priority = -1; } - std::swap(_tasks, _previousGeneration); } bool DownloadManagerMtproto::Queue::empty() const { - return _tasks.empty() && _previousGeneration.empty(); + return _tasks.empty(); } -auto DownloadManagerMtproto::Queue::nextTask() const -> Task* { - auto &&all = ranges::view::concat(_tasks, _previousGeneration); - const auto i = ranges::find(all, true, &Task::readyToRequest); - return (i != all.end()) ? i->get() : nullptr; +auto DownloadManagerMtproto::Queue::nextTask(bool onlyHighestPriority) const +-> Task* { + if (_tasks.empty()) { + return nullptr; + } + const auto highestPriority = _tasks.front().priority; + const auto notHighestPriority = [&](const Enqueued &enqueued) { + return (enqueued.priority != highestPriority); + }; + const auto till = (onlyHighestPriority && highestPriority > 0) + ? ranges::find_if(_tasks, notHighestPriority) + : end(_tasks); + const auto readyToRequest = [&](const Enqueued &enqueued) { + return enqueued.task->readyToRequest(); + }; + const auto first = ranges::find_if( + ranges::make_subrange(begin(_tasks), till), + readyToRequest); + return (first != till) ? first->task.get() : nullptr; } void DownloadManagerMtproto::Queue::removeSession(int index) { - auto &&all = ranges::view::concat(_tasks, _previousGeneration); - for (const auto task : all) { - task->removeSession(index); + for (const auto &enqueued : _tasks) { + enqueued.task->removeSession(index); } } @@ -111,10 +133,10 @@ DownloadManagerMtproto::~DownloadManagerMtproto() { killSessions(); } -void DownloadManagerMtproto::enqueue(not_null task) { +void DownloadManagerMtproto::enqueue(not_null task, int priority) { const auto dcId = task->dcId(); auto &queue = _queues[dcId]; - queue.enqueue(task); + queue.enqueue(task, priority); if (!_resetGenerationTimer.isActive()) { _resetGenerationTimer.callOnce(kResetDownloadPrioritiesTimeout); } @@ -150,8 +172,9 @@ void DownloadManagerMtproto::checkSendNext(MTP::DcId dcId, Queue &queue) { } bool DownloadManagerMtproto::trySendNextPart(MTP::DcId dcId, Queue &queue) { + auto &balanceData = _balanceData[dcId]; + const auto &sessions = balanceData.sessions; const auto bestIndex = [&] { - const auto &sessions = _balanceData[dcId].sessions; const auto proj = [](const DcSessionBalanceData &data) { return (data.requested < data.maxWaitedAmount) ? data.requested @@ -165,7 +188,8 @@ bool DownloadManagerMtproto::trySendNextPart(MTP::DcId dcId, Queue &queue) { if (bestIndex < 0) { return false; } - if (const auto task = queue.nextTask()) { + const auto onlyHighestPriority = (balanceData.totalRequested > 0); + if (const auto task = queue.nextTask(onlyHighestPriority)) { task->loadPart(bestIndex); return true; } @@ -180,6 +204,7 @@ int DownloadManagerMtproto::changeRequestedAmount( Assert(i != _balanceData.end()); Assert(index < i->second.sessions.size()); const auto result = (i->second.sessions[index].requested += delta); + i->second.totalRequested += delta; const auto findNonEmptySession = [](const DcBalanceData &data) { using namespace rpl::mappers; return ranges::find_if( @@ -370,6 +395,7 @@ void DownloadManagerMtproto::killSessions(MTP::DcId dcId) { const auto i = _balanceData.find(dcId); if (i != end(_balanceData)) { auto &dc = i->second; + Assert(dc.totalRequested == 0); auto sessions = base::take(dc.sessions); dc = DcBalanceData(); for (auto j = 0; j != int(sessions.size()); ++j) { @@ -807,8 +833,8 @@ void DownloadMtprotoTask::cancelRequest(mtpRequestId requestId) { } } -void DownloadMtprotoTask::addToQueue() { - _owner->enqueue(this); +void DownloadMtprotoTask::addToQueue(int priority) { + _owner->enqueue(this, priority); } void DownloadMtprotoTask::removeFromQueue() { diff --git a/Telegram/SourceFiles/storage/download_manager_mtproto.h b/Telegram/SourceFiles/storage/download_manager_mtproto.h index 2a7a0bff8..de342fd68 100644 --- a/Telegram/SourceFiles/storage/download_manager_mtproto.h +++ b/Telegram/SourceFiles/storage/download_manager_mtproto.h @@ -35,7 +35,7 @@ public: return *_api; } - void enqueue(not_null task); + void enqueue(not_null task, int priority); void remove(not_null task); [[nodiscard]] base::Observable &taskFinished() { @@ -53,16 +53,19 @@ public: private: class Queue final { public: - void enqueue(not_null task); + void enqueue(not_null task, int priority); void remove(not_null task); void resetGeneration(); [[nodiscard]] bool empty() const; - [[nodiscard]] Task *nextTask() const; + [[nodiscard]] Task *nextTask(bool onlyHighestPriority) const; void removeSession(int index); private: - std::vector> _tasks; - std::vector> _previousGeneration; + struct Enqueued { + not_null task; + int priority = 0; + }; + std::vector _tasks; }; struct DcSessionBalanceData { @@ -80,6 +83,7 @@ private: int sessionRemoveIndex = 0; int sessionRemoveTimes = 0; int timeouts = 0; // Since all sessions had successes >= required. + int totalRequested = 0; }; void checkSendNext(); @@ -149,7 +153,7 @@ protected: void cancelAllRequests(); void cancelRequestForOffset(int offset); - void addToQueue(); + void addToQueue(int priority = 0); void removeFromQueue(); [[nodiscard]] ApiWrap &api() const {