Support priorities in download tasks.

This commit is contained in:
John Preston 2019-12-23 12:37:03 +03:00
parent 8ae6156477
commit fa4d8f2fbd
4 changed files with 79 additions and 42 deletions

View File

@ -51,11 +51,17 @@ void LoaderMtproto::load(int offset) {
if (haveSentRequestForOffset(offset)) {
return;
} else if (_requested.add(offset)) {
addToQueue(); // #TODO download priority
addToQueueWithPriority();
}
});
}
void LoaderMtproto::addToQueueWithPriority() {
addToQueue([&] {
return 1;
}());
}
void LoaderMtproto::stop() {
crl::on_main(this, [=] {
cancelAllRequests();
@ -73,7 +79,7 @@ void LoaderMtproto::cancel(int offset) {
void LoaderMtproto::cancelForOffset(int offset) {
if (haveSentRequestForOffset(offset)) {
cancelRequestForOffset(offset);
addToQueue(); // #TODO download priority
addToQueueWithPriority();
} else {
_requested.remove(offset);
}

View File

@ -46,6 +46,7 @@ private:
void cancelOnFail() override;
void cancelForOffset(int offset);
void addToQueueWithPriority();
const int _size = 0;

View File

@ -38,50 +38,72 @@ constexpr auto kBadRequestDurationThreshold = 8 * crl::time(1000);
} // namespace
void DownloadManagerMtproto::Queue::enqueue(not_null<Task*> task) {
const auto i = ranges::find(_tasks, task);
if (i != end(_tasks)) {
return;
void DownloadManagerMtproto::Queue::enqueue(
not_null<Task*> task,
int priority) {
const auto position = ranges::find_if(_tasks, [&](const Enqueued &task) {
return task.priority <= priority;
}) - begin(_tasks);
const auto now = ranges::find(_tasks, task, &Enqueued::task);
const auto i = [&] {
if (now != end(_tasks)) {
(now->priority = priority);
return now;
}
_tasks.push_back({ task, priority });
return end(_tasks) - 1;
}();
const auto j = begin(_tasks) + position;
if (j < i) {
std::rotate(j, i, i + 1);
} else if (j > i + 1) {
std::rotate(i, i + 1, j);
}
_tasks.push_back(task);
_previousGeneration.erase(
ranges::remove(_previousGeneration, task),
end(_previousGeneration));
}
void DownloadManagerMtproto::Queue::remove(not_null<Task*> task) {
_tasks.erase(ranges::remove(_tasks, task), end(_tasks));
_previousGeneration.erase(
ranges::remove(_previousGeneration, task),
end(_previousGeneration));
_tasks.erase(ranges::remove(_tasks, task, &Enqueued::task), end(_tasks));
}
void DownloadManagerMtproto::Queue::resetGeneration() {
if (!_previousGeneration.empty()) {
_tasks.reserve(_tasks.size() + _previousGeneration.size());
std::copy(
begin(_previousGeneration),
end(_previousGeneration),
std::back_inserter(_tasks));
_previousGeneration.clear();
const auto from = ranges::find(_tasks, 0, &Enqueued::priority);
for (auto &task : ranges::make_subrange(from, end(_tasks))) {
if (task.priority) {
Assert(task.priority == -1);
break;
}
task.priority = -1;
}
std::swap(_tasks, _previousGeneration);
}
bool DownloadManagerMtproto::Queue::empty() const {
return _tasks.empty() && _previousGeneration.empty();
return _tasks.empty();
}
auto DownloadManagerMtproto::Queue::nextTask() const -> Task* {
auto &&all = ranges::view::concat(_tasks, _previousGeneration);
const auto i = ranges::find(all, true, &Task::readyToRequest);
return (i != all.end()) ? i->get() : nullptr;
auto DownloadManagerMtproto::Queue::nextTask(bool onlyHighestPriority) const
-> Task* {
if (_tasks.empty()) {
return nullptr;
}
const auto highestPriority = _tasks.front().priority;
const auto notHighestPriority = [&](const Enqueued &enqueued) {
return (enqueued.priority != highestPriority);
};
const auto till = (onlyHighestPriority && highestPriority > 0)
? ranges::find_if(_tasks, notHighestPriority)
: end(_tasks);
const auto readyToRequest = [&](const Enqueued &enqueued) {
return enqueued.task->readyToRequest();
};
const auto first = ranges::find_if(
ranges::make_subrange(begin(_tasks), till),
readyToRequest);
return (first != till) ? first->task.get() : nullptr;
}
void DownloadManagerMtproto::Queue::removeSession(int index) {
auto &&all = ranges::view::concat(_tasks, _previousGeneration);
for (const auto task : all) {
task->removeSession(index);
for (const auto &enqueued : _tasks) {
enqueued.task->removeSession(index);
}
}
@ -111,10 +133,10 @@ DownloadManagerMtproto::~DownloadManagerMtproto() {
killSessions();
}
void DownloadManagerMtproto::enqueue(not_null<Task*> task) {
void DownloadManagerMtproto::enqueue(not_null<Task*> task, int priority) {
const auto dcId = task->dcId();
auto &queue = _queues[dcId];
queue.enqueue(task);
queue.enqueue(task, priority);
if (!_resetGenerationTimer.isActive()) {
_resetGenerationTimer.callOnce(kResetDownloadPrioritiesTimeout);
}
@ -150,8 +172,9 @@ void DownloadManagerMtproto::checkSendNext(MTP::DcId dcId, Queue &queue) {
}
bool DownloadManagerMtproto::trySendNextPart(MTP::DcId dcId, Queue &queue) {
auto &balanceData = _balanceData[dcId];
const auto &sessions = balanceData.sessions;
const auto bestIndex = [&] {
const auto &sessions = _balanceData[dcId].sessions;
const auto proj = [](const DcSessionBalanceData &data) {
return (data.requested < data.maxWaitedAmount)
? data.requested
@ -165,7 +188,8 @@ bool DownloadManagerMtproto::trySendNextPart(MTP::DcId dcId, Queue &queue) {
if (bestIndex < 0) {
return false;
}
if (const auto task = queue.nextTask()) {
const auto onlyHighestPriority = (balanceData.totalRequested > 0);
if (const auto task = queue.nextTask(onlyHighestPriority)) {
task->loadPart(bestIndex);
return true;
}
@ -180,6 +204,7 @@ int DownloadManagerMtproto::changeRequestedAmount(
Assert(i != _balanceData.end());
Assert(index < i->second.sessions.size());
const auto result = (i->second.sessions[index].requested += delta);
i->second.totalRequested += delta;
const auto findNonEmptySession = [](const DcBalanceData &data) {
using namespace rpl::mappers;
return ranges::find_if(
@ -370,6 +395,7 @@ void DownloadManagerMtproto::killSessions(MTP::DcId dcId) {
const auto i = _balanceData.find(dcId);
if (i != end(_balanceData)) {
auto &dc = i->second;
Assert(dc.totalRequested == 0);
auto sessions = base::take(dc.sessions);
dc = DcBalanceData();
for (auto j = 0; j != int(sessions.size()); ++j) {
@ -807,8 +833,8 @@ void DownloadMtprotoTask::cancelRequest(mtpRequestId requestId) {
}
}
void DownloadMtprotoTask::addToQueue() {
_owner->enqueue(this);
void DownloadMtprotoTask::addToQueue(int priority) {
_owner->enqueue(this, priority);
}
void DownloadMtprotoTask::removeFromQueue() {

View File

@ -35,7 +35,7 @@ public:
return *_api;
}
void enqueue(not_null<Task*> task);
void enqueue(not_null<Task*> task, int priority);
void remove(not_null<Task*> task);
[[nodiscard]] base::Observable<void> &taskFinished() {
@ -53,16 +53,19 @@ public:
private:
class Queue final {
public:
void enqueue(not_null<Task*> task);
void enqueue(not_null<Task*> task, int priority);
void remove(not_null<Task*> task);
void resetGeneration();
[[nodiscard]] bool empty() const;
[[nodiscard]] Task *nextTask() const;
[[nodiscard]] Task *nextTask(bool onlyHighestPriority) const;
void removeSession(int index);
private:
std::vector<not_null<Task*>> _tasks;
std::vector<not_null<Task*>> _previousGeneration;
struct Enqueued {
not_null<Task*> task;
int priority = 0;
};
std::vector<Enqueued> _tasks;
};
struct DcSessionBalanceData {
@ -80,6 +83,7 @@ private:
int sessionRemoveIndex = 0;
int sessionRemoveTimes = 0;
int timeouts = 0; // Since all sessions had successes >= required.
int totalRequested = 0;
};
void checkSendNext();
@ -149,7 +153,7 @@ protected:
void cancelAllRequests();
void cancelRequestForOffset(int offset);
void addToQueue();
void addToQueue(int priority = 0);
void removeFromQueue();
[[nodiscard]] ApiWrap &api() const {