Load streaming parts using Storage::DownloadManager.

This commit is contained in:
John Preston 2019-12-04 10:42:55 +03:00
parent 3ae2986c25
commit f522cc9444
8 changed files with 82 additions and 52 deletions

View File

@ -50,6 +50,10 @@ bool PriorityQueue::remove(int value) {
return true; return true;
} }
bool PriorityQueue::empty() const {
return _data.empty();
}
std::optional<int> PriorityQueue::front() const { std::optional<int> PriorityQueue::front() const {
return _data.empty() return _data.empty()
? std::nullopt ? std::nullopt

View File

@ -52,9 +52,10 @@ public:
bool add(int value); bool add(int value);
bool remove(int value); bool remove(int value);
void increasePriority(); void increasePriority();
std::optional<int> front() const; [[nodiscard]] bool empty() const;
std::optional<int> take(); [[nodiscard]] std::optional<int> front() const;
base::flat_set<int> takeInRange(int from, int till); [[nodiscard]] std::optional<int> take();
[[nodiscard]] base::flat_set<int> takeInRange(int from, int till);
void clear(); void clear();
private: private:

View File

@ -37,6 +37,7 @@ LoaderMtproto::~LoaderMtproto() {
for (const auto [index, amount] : _amountByDcIndex) { for (const auto [index, amount] : _amountByDcIndex) {
changeRequestedAmount(index, -amount); changeRequestedAmount(index, -amount);
} }
_owner->remove(this);
} }
std::optional<Storage::Cache::Key> LoaderMtproto::baseCacheKey() const { std::optional<Storage::Cache::Key> LoaderMtproto::baseCacheKey() const {
@ -60,7 +61,7 @@ void LoaderMtproto::load(int offset) {
if (_requests.contains(offset)) { if (_requests.contains(offset)) {
return; return;
} else if (_requested.add(offset)) { } else if (_requested.add(offset)) {
sendNext(); _owner->enqueue(this); // #TODO download priority
} }
}); });
} }
@ -72,6 +73,7 @@ void LoaderMtproto::stop() {
_api.requestCanceller(), _api.requestCanceller(),
&base::flat_map<int, mtpRequestId>::value_type::second); &base::flat_map<int, mtpRequestId>::value_type::second);
_requested.clear(); _requested.clear();
_owner->remove(this);
}); });
} }
@ -84,7 +86,7 @@ void LoaderMtproto::cancel(int offset) {
void LoaderMtproto::cancelForOffset(int offset) { void LoaderMtproto::cancelForOffset(int offset) {
if (const auto requestId = _requests.take(offset)) { if (const auto requestId = _requests.take(offset)) {
_api.request(*requestId).cancel(); _api.request(*requestId).cancel();
sendNext(); _owner->enqueue(this);
} else { } else {
_requested.remove(offset); _requested.remove(offset);
} }
@ -110,17 +112,21 @@ void LoaderMtproto::changeRequestedAmount(int index, int amount) {
_amountByDcIndex[index] += amount; _amountByDcIndex[index] += amount;
} }
void LoaderMtproto::sendNext() { MTP::DcId LoaderMtproto::dcId() const {
if (_requests.size() >= kMaxConcurrentRequests) { return _dcId;
return; }
}
bool LoaderMtproto::readyToRequest() const {
return !_requested.empty();
}
void LoaderMtproto::loadPart(int dcIndex) {
const auto offset = _requested.take().value_or(-1); const auto offset = _requested.take().value_or(-1);
if (offset < 0) { if (offset < 0) {
return; return;
} }
const auto index = _owner->chooseDcIndexForRequest(_dcId); changeRequestedAmount(dcIndex, kPartSize);
changeRequestedAmount(index, kPartSize);
const auto usedFileReference = _location.fileReference(); const auto usedFileReference = _location.fileReference();
const auto id = _api.request(MTPupload_GetFile( const auto id = _api.request(MTPupload_GetFile(
@ -129,23 +135,21 @@ void LoaderMtproto::sendNext() {
MTP_int(offset), MTP_int(offset),
MTP_int(kPartSize) MTP_int(kPartSize)
)).done([=](const MTPupload_File &result) { )).done([=](const MTPupload_File &result) {
changeRequestedAmount(index, -kPartSize); changeRequestedAmount(dcIndex, -kPartSize);
requestDone(offset, result); requestDone(offset, result);
}).fail([=](const RPCError &error) { }).fail([=](const RPCError &error) {
changeRequestedAmount(index, -kPartSize); changeRequestedAmount(dcIndex, -kPartSize);
requestFailed(offset, error, usedFileReference); requestFailed(offset, error, usedFileReference);
}).toDC( }).toDC(
MTP::downloadDcId(_dcId, index) MTP::downloadDcId(_dcId, dcIndex)
).send(); ).send();
_requests.emplace(offset, id); _requests.emplace(offset, id);
sendNext();
} }
void LoaderMtproto::requestDone(int offset, const MTPupload_File &result) { void LoaderMtproto::requestDone(int offset, const MTPupload_File &result) {
result.match([&](const MTPDupload_file &data) { result.match([&](const MTPDupload_file &data) {
_requests.erase(offset); _requests.erase(offset);
sendNext(); _owner->enqueue(this);
_parts.fire({ offset, data.vbytes().v }); _parts.fire({ offset, data.vbytes().v });
}, [&](const MTPDupload_fileCdnRedirect &data) { }, [&](const MTPDupload_fileCdnRedirect &data) {
changeCdnParams( changeCdnParams(
@ -189,7 +193,7 @@ void LoaderMtproto::requestFailed(
return; return;
} else { } else {
_requested.add(offset); _requested.add(offset);
sendNext(); _owner->enqueue(this);
} }
}; };
_owner->api().refreshFileReference( _owner->api().refreshFileReference(

View File

@ -10,15 +10,15 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include "media/streaming/media_streaming_loader.h" #include "media/streaming/media_streaming_loader.h"
#include "mtproto/sender.h" #include "mtproto/sender.h"
#include "data/data_file_origin.h" #include "data/data_file_origin.h"
#include "storage/file_download.h"
namespace Storage {
class DownloadManager;
} // namespace Storage
namespace Media { namespace Media {
namespace Streaming { namespace Streaming {
class LoaderMtproto : public Loader, public base::has_weak_ptr { class LoaderMtproto
: public Loader
, public base::has_weak_ptr
, public Storage::Downloader {
public: public:
LoaderMtproto( LoaderMtproto(
not_null<Storage::DownloadManager*> owner, not_null<Storage::DownloadManager*> owner,
@ -44,7 +44,9 @@ public:
void clearAttachedDownloader() override; void clearAttachedDownloader() override;
private: private:
void sendNext(); MTP::DcId dcId() const override;
bool readyToRequest() const override;
void loadPart(int dcIndex) override;
void requestDone(int offset, const MTPupload_File &result); void requestDone(int offset, const MTPupload_File &result);
void requestFailed( void requestFailed(

View File

@ -45,6 +45,7 @@ constexpr auto kPartSize = 128 * 1024;
constexpr auto kStartSessionsCount = 1; constexpr auto kStartSessionsCount = 1;
constexpr auto kMaxSessionsCount = 8; constexpr auto kMaxSessionsCount = 8;
constexpr auto kResetDownloadPrioritiesTimeout = crl::time(200);
} // namespace } // namespace
@ -78,6 +79,10 @@ void DownloadManager::Queue::resetGeneration() {
std::swap(_loaders, _previousGeneration); std::swap(_loaders, _previousGeneration);
} }
bool DownloadManager::Queue::empty() const {
return _loaders.empty() && _previousGeneration.empty();
}
Downloader *DownloadManager::Queue::nextLoader() const { Downloader *DownloadManager::Queue::nextLoader() const {
auto &&all = ranges::view::concat(_loaders, _previousGeneration); auto &&all = ranges::view::concat(_loaders, _previousGeneration);
const auto i = ranges::find(all, true, &FileLoader::readyToRequest); const auto i = ranges::find(all, true, &FileLoader::readyToRequest);
@ -86,6 +91,7 @@ Downloader *DownloadManager::Queue::nextLoader() const {
DownloadManager::DownloadManager(not_null<ApiWrap*> api) DownloadManager::DownloadManager(not_null<ApiWrap*> api)
: _api(api) : _api(api)
, _resetGenerationTimer([=] { resetGeneration(); })
, _killDownloadSessionsTimer([=] { killDownloadSessions(); }) { , _killDownloadSessionsTimer([=] { killDownloadSessions(); }) {
} }
@ -96,11 +102,8 @@ DownloadManager::~DownloadManager() {
void DownloadManager::enqueue(not_null<Downloader*> loader) { void DownloadManager::enqueue(not_null<Downloader*> loader) {
const auto dcId = loader->dcId(); const auto dcId = loader->dcId();
(dcId ? _mtprotoLoaders[dcId] : _webLoaders).enqueue(loader); (dcId ? _mtprotoLoaders[dcId] : _webLoaders).enqueue(loader);
if (!_resettingGeneration) { if (!_resetGenerationTimer.isActive()) {
_resettingGeneration = true; _resetGenerationTimer.callOnce(kResetDownloadPrioritiesTimeout);
crl::on_main(this, [=] {
resetGeneration();
});
} }
checkSendNext(); checkSendNext();
} }
@ -112,7 +115,7 @@ void DownloadManager::remove(not_null<Downloader*> loader) {
} }
void DownloadManager::resetGeneration() { void DownloadManager::resetGeneration() {
_resettingGeneration = false; _resetGenerationTimer.cancel();
for (auto &[dcId, queue] : _mtprotoLoaders) { for (auto &[dcId, queue] : _mtprotoLoaders) {
queue.resetGeneration(); queue.resetGeneration();
} }
@ -121,6 +124,9 @@ void DownloadManager::resetGeneration() {
void DownloadManager::checkSendNext() { void DownloadManager::checkSendNext() {
for (auto &[dcId, queue] : _mtprotoLoaders) { for (auto &[dcId, queue] : _mtprotoLoaders) {
if (queue.empty()) {
continue;
}
const auto bestIndex = [&] { const auto bestIndex = [&] {
const auto i = _requestedBytesAmount.find(dcId); const auto i = _requestedBytesAmount.find(dcId);
if (i == end(_requestedBytesAmount)) { if (i == end(_requestedBytesAmount)) {

View File

@ -71,6 +71,7 @@ private:
void enqueue(not_null<Downloader*> loader); void enqueue(not_null<Downloader*> loader);
void remove(not_null<Downloader*> loader); void remove(not_null<Downloader*> loader);
void resetGeneration(); void resetGeneration();
[[nodiscard]] bool empty() const;
[[nodiscard]] Downloader *nextLoader() const; [[nodiscard]] Downloader *nextLoader() const;
private: private:
@ -92,13 +93,13 @@ private:
base::Observable<void> _taskFinishedObservable; base::Observable<void> _taskFinishedObservable;
base::flat_map<MTP::DcId, std::vector<int>> _requestedBytesAmount; base::flat_map<MTP::DcId, std::vector<int>> _requestedBytesAmount;
base::Timer _resetGenerationTimer;
base::flat_map<MTP::DcId, crl::time> _killDownloadSessionTimes; base::flat_map<MTP::DcId, crl::time> _killDownloadSessionTimes;
base::Timer _killDownloadSessionsTimer; base::Timer _killDownloadSessionsTimer;
base::flat_map<MTP::DcId, Queue> _mtprotoLoaders; base::flat_map<MTP::DcId, Queue> _mtprotoLoaders;
Queue _webLoaders; Queue _webLoaders;
bool _resettingGeneration = false;
}; };

View File

@ -16,6 +16,7 @@ namespace {
using namespace Media::Streaming; using namespace Media::Streaming;
constexpr auto kPartSize = Loader::kPartSize; constexpr auto kPartSize = Loader::kPartSize;
constexpr auto kRequestPartsCount = 8;
} // namespace } // namespace
@ -60,6 +61,8 @@ StreamedFileDownloader::StreamedFileDownloader(
savePart(std::move(part)); savePart(std::move(part));
} }
}, _lifetime); }, _lifetime);
requestParts();
} }
StreamedFileDownloader::~StreamedFileDownloader() { StreamedFileDownloader::~StreamedFileDownloader() {
@ -78,6 +81,31 @@ void StreamedFileDownloader::stop() {
cancelRequests(); cancelRequests();
} }
void StreamedFileDownloader::requestParts() {
while (!_finished
&& _nextPartIndex < _partsCount
&& _partsRequested < kRequestPartsCount) {
requestPart();
}
}
void StreamedFileDownloader::requestPart() {
Expects(!_finished);
const auto index = std::find(
begin(_partIsSaved) + _nextPartIndex,
end(_partIsSaved),
false
) - begin(_partIsSaved);
if (index == _partsCount) {
_nextPartIndex = _partsCount;
return;
}
_nextPartIndex = index + 1;
_reader->loadForDownloader(this, index * kPartSize);
++_partsRequested;
}
QByteArray StreamedFileDownloader::readLoadedPart(int offset) { QByteArray StreamedFileDownloader::readLoadedPart(int offset) {
Expects(offset >= 0 && offset < _size); Expects(offset >= 0 && offset < _size);
Expects(!(offset % kPartSize)); Expects(!(offset % kPartSize));
@ -104,30 +132,11 @@ void StreamedFileDownloader::cancelRequests() {
} }
bool StreamedFileDownloader::readyToRequest() const { bool StreamedFileDownloader::readyToRequest() const {
if (_finished || _nextPartIndex >= _partsCount) { return false;
return false;
}
_nextPartIndex = std::find(
begin(_partIsSaved) + _nextPartIndex,
end(_partIsSaved),
false
) - begin(_partIsSaved);
return (_nextPartIndex < _partsCount);
} }
void StreamedFileDownloader::loadPart(int dcIndex) { void StreamedFileDownloader::loadPart(int dcIndex) {
const auto index = std::find( Unexpected("StreamedFileDownloader can't load parts.");
begin(_partIsSaved) + _nextPartIndex,
end(_partIsSaved),
false
) - begin(_partIsSaved);
if (index == _partsCount) {
_nextPartIndex = _partsCount;
return;
}
_nextPartIndex = index + 1;
_reader->loadForDownloader(this, index * kPartSize);
++_partsRequested;
} }
void StreamedFileDownloader::savePart(const LoadedPart &part) { void StreamedFileDownloader::savePart(const LoadedPart &part) {
@ -159,6 +168,7 @@ void StreamedFileDownloader::savePart(const LoadedPart &part) {
} }
} }
_reader->doneForDownloader(offset); _reader->doneForDownloader(offset);
requestParts();
notifyAboutProgress(); notifyAboutProgress();
} }

View File

@ -51,6 +51,8 @@ private:
void cancelRequests() override; void cancelRequests() override;
bool readyToRequest() const override; bool readyToRequest() const override;
void loadPart(int dcIndex) override; void loadPart(int dcIndex) override;
void requestParts();
void requestPart();
void savePart(const Media::Streaming::LoadedPart &part); void savePart(const Media::Streaming::LoadedPart &part);