Use a special FileLoader for streamed documents.

This commit is contained in:
John Preston 2019-04-11 11:59:18 +04:00
parent 8c0cd9b9e9
commit ebf2a678b1
9 changed files with 345 additions and 219 deletions

View File

@ -866,9 +866,10 @@ void DocumentData::save(
} }
} else { } else {
status = FileReady; status = FileReady;
/* if (auto reader = owner().documentStreamedReader(this, origin)) { if (auto reader = owner().documentStreamedReader(this, origin)) {
_loader = new Storage::StreamedFileDownloader( _loader = new Storage::StreamedFileDownloader(
id, id,
_dc,
origin, origin,
(saveToCache() (saveToCache()
? std::make_optional(Data::DocumentCacheKey(_dc, id)) ? std::make_optional(Data::DocumentCacheKey(_dc, id))
@ -881,7 +882,7 @@ void DocumentData::save(
fromCloud, fromCloud,
autoLoading, autoLoading,
cacheTag()); cacheTag());
} else */if (hasWebLocation()) { } else if (hasWebLocation()) {
_loader = new mtpFileLoader( _loader = new mtpFileLoader(
_urlLocation, _urlLocation,
size, size,

View File

@ -670,6 +670,18 @@ QByteArray Reader::Slices::serializeAndUnloadFirstSliceNoHeader() {
return result; return result;
} }
template <typename Callback>
void Reader::Slices::enumerateParts(int sliceNumber, Callback &&callback) {
const auto shift = sliceNumber ? ((sliceNumber - 1) * kInSlice) : 0;
const auto &slice = (sliceNumber ? _data[sliceNumber - 1] : _header);
for (const auto &[offset, bytes] : slice.parts) {
callback(LoadedPart{ offset + shift, bytes });
}
if (!sliceNumber && isGoodHeader()) {
enumerateParts(1, std::forward<Callback>(callback));
}
}
Reader::SerializedSlice Reader::Slices::unloadToCache() { Reader::SerializedSlice Reader::Slices::unloadToCache() {
if (_headerMode == HeaderMode::Unknown if (_headerMode == HeaderMode::Unknown
|| _headerMode == HeaderMode::NoCache) { || _headerMode == HeaderMode::NoCache) {
@ -695,9 +707,11 @@ Reader::Reader(
, _slices(_loader->size(), _cacheHelper != nullptr) { , _slices(_loader->size(), _cacheHelper != nullptr) {
_loader->parts( _loader->parts(
) | rpl::start_with_next([=](LoadedPart &&part) { ) | rpl::start_with_next([=](LoadedPart &&part) {
QMutexLocker lock(&_loadedPartsMutex); if (_downloaderAttached.load(std::memory_order_acquire)) {
_loadedParts.push_back(std::move(part)); _partsForDownloader.fire_copy(part);
lock.unlock(); }
_loadedParts.emplace(std::move(part));
if (const auto waiting = _waiting.load()) { if (const auto waiting = _waiting.load()) {
_waiting = nullptr; _waiting = nullptr;
@ -714,6 +728,23 @@ void Reader::stop() {
_waiting = nullptr; _waiting = nullptr;
} }
rpl::producer<LoadedPart> Reader::partsForDownloader() const {
return _partsForDownloader.events();
}
void Reader::loadForDownloader(int offset) {
_downloaderAttached.store(true, std::memory_order_release);
_downloaderOffsetRequests.emplace(offset);
AssertIsDebug(); // wake?
}
void Reader::cancelForDownloader() {
if (_downloaderAttached.load(std::memory_order_acquire)) {
_downloaderOffsetRequests.take();
_downloaderAttached.store(false, std::memory_order_release);
}
}
bool Reader::isRemoteLoader() const { bool Reader::isRemoteLoader() const {
return _loader->baseCacheKey().has_value(); return _loader->baseCacheKey().has_value();
} }
@ -870,24 +901,35 @@ bool Reader::processCacheResults() {
} }
QMutexLocker lock(&_cacheHelper->mutex); QMutexLocker lock(&_cacheHelper->mutex);
auto loaded = base::take(_cacheHelper->results); const auto loaded = base::take(_cacheHelper->results);
lock.unlock(); lock.unlock();
for (const auto &[sliceNumber, result] : loaded) { for (const auto &[sliceNumber, result] : loaded) {
_slices.processCacheResult(sliceNumber, bytes::make_span(result)); _slices.processCacheResult(sliceNumber, bytes::make_span(result));
} }
if (_downloaderAttached.load(std::memory_order_acquire)) {
for (const auto &[sliceNumber, result] : loaded) {
sendPartsToDownloader(sliceNumber);
}
}
return !loaded.empty(); return !loaded.empty();
} }
void Reader::sendPartsToDownloader(int sliceNumber) {
_slices.enumerateParts(sliceNumber, [&](LoadedPart &&part) {
crl::on_main(this, [=, part = std::move(part)]() mutable {
AssertIsDebug(); // maybe send them with small timeout?
_partsForDownloader.fire(std::move(part));
});
});
}
bool Reader::processLoadedParts() { bool Reader::processLoadedParts() {
if (_failed) { if (_failed) {
return false; return false;
} }
QMutexLocker lock(&_loadedPartsMutex); auto loaded = _loadedParts.take();
auto loaded = base::take(_loadedParts);
lock.unlock();
for (auto &part : loaded) { for (auto &part : loaded) {
if (part.offset == LoadedPart::kFailedOffset if (part.offset == LoadedPart::kFailedOffset
|| (part.bytes.size() != Loader::kPartSize || (part.bytes.size() != Loader::kPartSize

View File

@ -9,6 +9,8 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include "media/streaming/media_streaming_loader.h" #include "media/streaming/media_streaming_loader.h"
#include "base/bytes.h" #include "base/bytes.h"
#include "base/weak_ptr.h"
#include "base/thread_safe_queue.h"
namespace Storage { namespace Storage {
namespace Cache { namespace Cache {
@ -27,22 +29,28 @@ class Loader;
struct LoadedPart; struct LoadedPart;
enum class Error; enum class Error;
class Reader final { class Reader final : public base::has_weak_ptr {
public: public:
// Main thread.
Reader(not_null<Data::Session*> owner, std::unique_ptr<Loader> loader); Reader(not_null<Data::Session*> owner, std::unique_ptr<Loader> loader);
// Any thread.
[[nodiscard]] int size() const; [[nodiscard]] int size() const;
[[nodiscard]] bool isRemoteLoader() const;
// Single thread.
[[nodiscard]] bool fill( [[nodiscard]] bool fill(
int offset, int offset,
bytes::span buffer, bytes::span buffer,
not_null<crl::semaphore*> notify); not_null<crl::semaphore*> notify);
[[nodiscard]] std::optional<Error> failed() const; [[nodiscard]] std::optional<Error> failed() const;
void headerDone(); void headerDone();
void stop(); void stop();
[[nodiscard]] bool isRemoteLoader() const; // Main thread.
[[nodiscard]] rpl::producer<LoadedPart> partsForDownloader() const;
void loadForDownloader(int offset);
void cancelForDownloader();
~Reader(); ~Reader();
@ -126,6 +134,10 @@ private:
[[nodiscard]] FillResult fill(int offset, bytes::span buffer); [[nodiscard]] FillResult fill(int offset, bytes::span buffer);
[[nodiscard]] SerializedSlice unloadToCache(); [[nodiscard]] SerializedSlice unloadToCache();
// callback(LoadedPart(..)).
template <typename Callback>
void enumerateParts(int sliceNumber, Callback &&callback);
private: private:
enum class HeaderMode { enum class HeaderMode {
Unknown, Unknown,
@ -160,6 +172,7 @@ private:
// 0 is for headerData, slice index = sliceNumber - 1. // 0 is for headerData, slice index = sliceNumber - 1.
void readFromCache(int sliceNumber); void readFromCache(int sliceNumber);
bool processCacheResults(); bool processCacheResults();
void sendPartsToDownloader(int sliceNumber);
void putToCache(SerializedSlice &&data); void putToCache(SerializedSlice &&data);
void cancelLoadInRange(int from, int till); void cancelLoadInRange(int from, int till);
@ -178,13 +191,18 @@ private:
const std::unique_ptr<Loader> _loader; const std::unique_ptr<Loader> _loader;
const std::shared_ptr<CacheHelper> _cacheHelper; const std::shared_ptr<CacheHelper> _cacheHelper;
QMutex _loadedPartsMutex; base::thread_safe_queue<LoadedPart> _loadedParts;
std::vector<LoadedPart> _loadedParts;
std::atomic<crl::semaphore*> _waiting = nullptr; std::atomic<crl::semaphore*> _waiting = nullptr;
PriorityQueue _loadingOffsets; PriorityQueue _loadingOffsets;
Slices _slices; Slices _slices;
std::optional<Error> _failed; std::optional<Error> _failed;
std::atomic<bool> _downloaderAttached = false;
base::thread_safe_queue<int> _downloaderOffsetRequests;
// Main thread.
rpl::event_stream<LoadedPart> _partsForDownloader;
rpl::lifetime _lifetime; rpl::lifetime _lifetime;
}; };

View File

@ -1831,7 +1831,7 @@ void OverlayWidget::displayDocument(DocumentData *doc, HistoryItem *item) {
} else { } else {
_doc->automaticLoad(fileOrigin(), item); _doc->automaticLoad(fileOrigin(), item);
if (_doc->canBePlayed() && !_doc->loading()) { if (_doc->canBePlayed()) {
initStreaming(); initStreaming();
} else if (_doc->isVideoFile()) { } else if (_doc->isVideoFile()) {
initStreamingThumbnail(); initStreamingThumbnail();
@ -2456,19 +2456,7 @@ void OverlayWidget::validatePhotoCurrentImage() {
} }
} }
void OverlayWidget::checkLoadingWhileStreaming() {
if (_streamed && _doc->loading()) {
crl::on_main(this, [=, doc = _doc] {
if (!isHidden() && _doc == doc) {
redisplayContent();
}
});
}
}
void OverlayWidget::paintEvent(QPaintEvent *e) { void OverlayWidget::paintEvent(QPaintEvent *e) {
checkLoadingWhileStreaming();
const auto r = e->rect(); const auto r = e->rect();
const auto &region = e->region(); const auto &region = e->region();
const auto rects = region.rects(); const auto rects = region.rects();
@ -2961,10 +2949,10 @@ void OverlayWidget::keyPressEvent(QKeyEvent *e) {
} else if (e->key() == Qt::Key_Copy || (e->key() == Qt::Key_C && ctrl)) { } else if (e->key() == Qt::Key_Copy || (e->key() == Qt::Key_C && ctrl)) {
onCopy(); onCopy();
} else if (e->key() == Qt::Key_Enter || e->key() == Qt::Key_Return || e->key() == Qt::Key_Space) { } else if (e->key() == Qt::Key_Enter || e->key() == Qt::Key_Return || e->key() == Qt::Key_Space) {
if (_doc && !_doc->loading() && (documentBubbleShown() || !_doc->loaded())) { if (_streamed) {
onDocClick();
} else if (_streamed) {
playbackPauseResume(); playbackPauseResume();
} else if (_doc && !_doc->loading() && (documentBubbleShown() || !_doc->loaded())) {
onDocClick();
} }
} else if (e->key() == Qt::Key_Left) { } else if (e->key() == Qt::Key_Left) {
if (_controlsHideTimer.isActive()) { if (_controlsHideTimer.isActive()) {

View File

@ -229,7 +229,6 @@ private:
void updateActions(); void updateActions();
void resizeCenteredControls(); void resizeCenteredControls();
void resizeContentByScreenSize(); void resizeContentByScreenSize();
void checkLoadingWhileStreaming();
void displayPhoto(not_null<PhotoData*> photo, HistoryItem *item); void displayPhoto(not_null<PhotoData*> photo, HistoryItem *item);
void displayDocument(DocumentData *document, HistoryItem *item); void displayDocument(DocumentData *document, HistoryItem *item);

View File

@ -28,10 +28,17 @@ namespace {
// How much time without download causes additional session kill. // How much time without download causes additional session kill.
constexpr auto kKillSessionTimeout = crl::time(5000); constexpr auto kKillSessionTimeout = crl::time(5000);
// Max 16 file parts downloaded at the same time, 128 KB each.
constexpr auto kMaxFileQueries = 16;
// Max 8 http[s] files downloaded at the same time.
constexpr auto kMaxWebFileQueries = 8;
} // namespace } // namespace
Downloader::Downloader() Downloader::Downloader()
: _killDownloadSessionsTimer([=] { killDownloadSessions(); }) { : _killDownloadSessionsTimer([=] { killDownloadSessions(); })
, _queueForWeb(kMaxWebFileQueries) {
} }
void Downloader::clearPriorities() { void Downloader::clearPriorities() {
@ -106,6 +113,18 @@ int Downloader::chooseDcIndexForRequest(MTP::DcId dcId) const {
return result; return result;
} }
not_null<Downloader::Queue*> Downloader::queueForDc(MTP::DcId dcId) {
const auto i = _queuesForDc.find(dcId);
const auto result = (i != end(_queuesForDc))
? i
: _queuesForDc.emplace(dcId, Queue(kMaxFileQueries)).first;
return &result->second;
}
not_null<Downloader::Queue*> Downloader::queueForWeb() {
return &_queueForWeb;
}
Downloader::~Downloader() { Downloader::~Downloader() {
killDownloadSessions(); killDownloadSessions();
} }
@ -116,28 +135,12 @@ namespace {
constexpr auto kDownloadPhotoPartSize = 64 * 1024; // 64kb for photo constexpr auto kDownloadPhotoPartSize = 64 * 1024; // 64kb for photo
constexpr auto kDownloadDocumentPartSize = 128 * 1024; // 128kb for document constexpr auto kDownloadDocumentPartSize = 128 * 1024; // 128kb for document
constexpr auto kMaxFileQueries = 16; // max 16 file parts downloaded at the same time
constexpr auto kMaxWebFileQueries = 8; // max 8 http[s] files downloaded at the same time
constexpr auto kDownloadCdnPartSize = 128 * 1024; // 128kb for cdn requests constexpr auto kDownloadCdnPartSize = 128 * 1024; // 128kb for cdn requests
} // namespace } // namespace
struct FileLoaderQueue {
FileLoaderQueue(int queriesLimit) : queriesLimit(queriesLimit) {
}
int queriesCount = 0;
int queriesLimit = 0;
FileLoader *start = nullptr;
FileLoader *end = nullptr;
};
namespace { namespace {
using LoaderQueues = QMap<int32, FileLoaderQueue>;
LoaderQueues queues;
FileLoaderQueue _webQueue(kMaxWebFileQueries);
QThread *_webLoadThread = nullptr; QThread *_webLoadThread = nullptr;
WebLoadManager *_webLoadManager = nullptr; WebLoadManager *_webLoadManager = nullptr;
WebLoadManager *webLoadManager() { WebLoadManager *webLoadManager() {
@ -252,7 +255,7 @@ void FileLoader::notifyAboutProgress() {
LoadNextFromQueue(queue); LoadNextFromQueue(queue);
} }
void FileLoader::LoadNextFromQueue(not_null<FileLoaderQueue*> queue) { void FileLoader::LoadNextFromQueue(not_null<Queue*> queue) {
if (queue->queriesCount >= queue->queriesLimit) { if (queue->queriesCount >= queue->queriesLimit) {
return; return;
} }
@ -524,6 +527,89 @@ void FileLoader::startLoading(bool loadFirst, bool prior) {
loadPart(); loadPart();
} }
int FileLoader::currentOffset() const {
return (_fileIsOpen ? _file.size() : _data.size()) - _skippedBytes;
}
bool FileLoader::writeResultPart(int offset, bytes::const_span buffer) {
Expects(!_finished);
if (!buffer.empty()) {
if (_fileIsOpen) {
auto fsize = _file.size();
if (offset < fsize) {
_skippedBytes -= buffer.size();
} else if (offset > fsize) {
_skippedBytes += offset - fsize;
}
_file.seek(offset);
if (_file.write(reinterpret_cast<const char*>(buffer.data()), buffer.size()) != qint64(buffer.size())) {
cancel(true);
return false;
}
} else {
_data.reserve(offset + buffer.size());
if (offset > _data.size()) {
_skippedBytes += offset - _data.size();
_data.resize(offset);
}
if (offset == _data.size()) {
_data.append(reinterpret_cast<const char*>(buffer.data()), buffer.size());
} else {
_skippedBytes -= buffer.size();
if (int64(offset + buffer.size()) > _data.size()) {
_data.resize(offset + buffer.size());
}
const auto dst = bytes::make_detached_span(_data).subspan(
offset,
buffer.size());
bytes::copy(dst, buffer);
}
}
}
return true;
}
bool FileLoader::finalizeResult() {
Expects(!_finished);
if (!_filename.isEmpty() && (_toCache == LoadToCacheAsWell)) {
if (!_fileIsOpen) {
_fileIsOpen = _file.open(QIODevice::WriteOnly);
}
if (!_fileIsOpen || _file.write(_data) != qint64(_data.size())) {
cancel(true);
return false;
}
}
_finished = true;
if (_fileIsOpen) {
_file.close();
_fileIsOpen = false;
Platform::File::PostprocessDownloaded(
QFileInfo(_file).absoluteFilePath());
}
removeFromQueue();
if (_localStatus == LocalStatus::NotFound) {
if (const auto key = fileLocationKey()) {
Local::writeFileLocation(*key, FileLocation(_filename));
}
if (const auto key = cacheKey()) {
if (_data.size() <= Storage::kMaxFileInMemory) {
Auth().data().cache().put(
*key,
Storage::Cache::Database::TaggedValue(
base::duplicate(_data),
_cacheTag));
}
}
}
_downloader->taskFinished().notify();
return true;
}
mtpFileLoader::mtpFileLoader( mtpFileLoader::mtpFileLoader(
const StorageFileLocation &location, const StorageFileLocation &location,
Data::FileOrigin origin, Data::FileOrigin origin,
@ -544,12 +630,7 @@ mtpFileLoader::mtpFileLoader(
cacheTag) cacheTag)
, _location(location) , _location(location)
, _origin(origin) { , _origin(origin) {
auto shiftedDcId = MTP::downloadDcId(dcId(), 0); _queue = _downloader->queueForDc(dcId());
auto i = queues.find(shiftedDcId);
if (i == queues.cend()) {
i = queues.insert(shiftedDcId, FileLoaderQueue(kMaxFileQueries));
}
_queue = &i.value();
} }
mtpFileLoader::mtpFileLoader( mtpFileLoader::mtpFileLoader(
@ -567,12 +648,7 @@ mtpFileLoader::mtpFileLoader(
autoLoading, autoLoading,
cacheTag) cacheTag)
, _location(location) { , _location(location) {
auto shiftedDcId = MTP::downloadDcId(dcId(), 0); _queue = _downloader->queueForDc(dcId());
auto i = queues.find(shiftedDcId);
if (i == queues.cend()) {
i = queues.insert(shiftedDcId, FileLoaderQueue(kMaxFileQueries));
}
_queue = &i.value();
} }
mtpFileLoader::mtpFileLoader( mtpFileLoader::mtpFileLoader(
@ -590,16 +666,7 @@ mtpFileLoader::mtpFileLoader(
autoLoading, autoLoading,
cacheTag) cacheTag)
, _location(location) { , _location(location) {
auto shiftedDcId = MTP::downloadDcId(dcId(), 0); _queue = _downloader->queueForDc(dcId());
auto i = queues.find(shiftedDcId);
if (i == queues.cend()) {
i = queues.insert(shiftedDcId, FileLoaderQueue(kMaxFileQueries));
}
_queue = &i.value();
}
int mtpFileLoader::currentOffset() const {
return (_fileIsOpen ? _file.size() : _data.size()) - _skippedBytes;
} }
Data::FileOrigin mtpFileLoader::fileOrigin() const { Data::FileOrigin mtpFileLoader::fileOrigin() const {
@ -947,88 +1014,17 @@ int mtpFileLoader::finishSentRequestGetOffset(mtpRequestId requestId) {
} }
bool mtpFileLoader::feedPart(int offset, bytes::const_span buffer) { bool mtpFileLoader::feedPart(int offset, bytes::const_span buffer) {
Expects(!_finished); if (!writeResultPart(offset, buffer)) {
return false;
if (!buffer.empty()) {
if (_fileIsOpen) {
auto fsize = _file.size();
if (offset < fsize) {
_skippedBytes -= buffer.size();
} else if (offset > fsize) {
_skippedBytes += offset - fsize;
}
_file.seek(offset);
if (_file.write(reinterpret_cast<const char*>(buffer.data()), buffer.size()) != qint64(buffer.size())) {
cancel(true);
return false;
}
} else {
_data.reserve(offset + buffer.size());
if (offset > _data.size()) {
_skippedBytes += offset - _data.size();
_data.resize(offset);
}
if (offset == _data.size()) {
_data.append(reinterpret_cast<const char*>(buffer.data()), buffer.size());
} else {
_skippedBytes -= buffer.size();
if (int64(offset + buffer.size()) > _data.size()) {
_data.resize(offset + buffer.size());
}
const auto dst = bytes::make_detached_span(_data).subspan(
offset,
buffer.size());
bytes::copy(dst, buffer);
}
}
} }
if (buffer.empty() || (buffer.size() % 1024)) { // bad next offset if (buffer.empty() || (buffer.size() % 1024)) { // bad next offset
_lastComplete = true; _lastComplete = true;
} }
if (_sentRequests.empty() const auto finished = _sentRequests.empty()
&& _cdnUncheckedParts.empty() && _cdnUncheckedParts.empty()
&& (_lastComplete || (_size && _nextRequestOffset >= _size))) { && (_lastComplete || (_size && _nextRequestOffset >= _size));
if (!_filename.isEmpty() && (_toCache == LoadToCacheAsWell)) { if (finished && !finalizeResult()) {
if (!_fileIsOpen) { return false;
_fileIsOpen = _file.open(QIODevice::WriteOnly);
}
if (!_fileIsOpen || _file.write(_data) != qint64(_data.size())) {
cancel(true);
return false;
}
}
_finished = true;
if (_fileIsOpen) {
_file.close();
_fileIsOpen = false;
Platform::File::PostprocessDownloaded(QFileInfo(_file).absoluteFilePath());
}
removeFromQueue();
if (_localStatus == LocalStatus::NotFound) {
if (_locationType != UnknownFileLocation
&& !_filename.isEmpty()) {
Local::writeFileLocation(
mediaKey(_locationType, dcId(), objId()),
FileLocation(_filename));
}
if (_location.is<WebFileLocation>()
|| _locationType == UnknownFileLocation
|| _toCache == LoadToCacheAsWell) {
if (const auto key = cacheKey()) {
if (_data.size() <= Storage::kMaxFileInMemory) {
Auth().data().cache().put(
*key,
Storage::Cache::Database::TaggedValue(
base::duplicate(_data),
_cacheTag));
}
}
}
}
}
if (_finished) {
_downloader->taskFinished().notify();
} }
return true; return true;
} }
@ -1177,6 +1173,13 @@ std::optional<Storage::Cache::Key> mtpFileLoader::cacheKey() const {
}); });
} }
std::optional<MediaKey> mtpFileLoader::fileLocationKey() const {
if (_locationType != UnknownFileLocation && !_filename.isEmpty()) {
return mediaKey(_locationType, dcId(), objId());
}
return std::nullopt;
}
mtpFileLoader::~mtpFileLoader() { mtpFileLoader::~mtpFileLoader() {
cancelRequests(); cancelRequests();
} }
@ -1198,11 +1201,15 @@ webFileLoader::webFileLoader(
, _url(url) , _url(url)
, _requestSent(false) , _requestSent(false)
, _already(0) { , _already(0) {
_queue = &_webQueue; _queue = _downloader->queueForWeb();
} }
bool webFileLoader::loadPart() { bool webFileLoader::loadPart() {
if (_finished || _requestSent || _webLoadManager == FinishedWebLoadManager) return false; if (_finished
|| _requestSent
|| _webLoadManager == FinishedWebLoadManager) {
return false;
}
if (!_webLoadManager) { if (!_webLoadManager) {
_webLoadMainManager = new WebLoadMainManager(); _webLoadMainManager = new WebLoadMainManager();
@ -1221,55 +1228,21 @@ int webFileLoader::currentOffset() const {
return _already; return _already;
} }
void webFileLoader::onProgress(qint64 already, qint64 size) { void webFileLoader::loadProgress(qint64 already, qint64 size) {
_size = size; _size = size;
_already = already; _already = already;
emit progress(this);
}
void webFileLoader::onFinished(const QByteArray &data) {
if (_fileIsOpen) {
if (_file.write(data.constData(), data.size()) != qint64(data.size())) {
return cancel(true);
}
} else {
_data = data;
}
if (!_filename.isEmpty() && (_toCache == LoadToCacheAsWell)) {
if (!_fileIsOpen) _fileIsOpen = _file.open(QIODevice::WriteOnly);
if (!_fileIsOpen) {
return cancel(true);
}
if (_file.write(_data) != qint64(_data.size())) {
return cancel(true);
}
}
_finished = true;
if (_fileIsOpen) {
_file.close();
_fileIsOpen = false;
Platform::File::PostprocessDownloaded(QFileInfo(_file).absoluteFilePath());
}
removeFromQueue();
if (_localStatus == LocalStatus::NotFound) {
if (const auto key = cacheKey()) {
if (_data.size() <= Storage::kMaxFileInMemory) {
Auth().data().cache().put(
*key,
Storage::Cache::Database::TaggedValue(
base::duplicate(_data),
_cacheTag));
}
}
}
_downloader->taskFinished().notify();
notifyAboutProgress(); notifyAboutProgress();
} }
void webFileLoader::onError() { void webFileLoader::loadFinished(const QByteArray &data) {
if (writeResultPart(0, bytes::make_span(data))) {
if (finalizeResult()) {
notifyAboutProgress();
}
}
}
void webFileLoader::loadError() {
cancel(true); cancel(true);
} }
@ -1277,6 +1250,10 @@ std::optional<Storage::Cache::Key> webFileLoader::cacheKey() const {
return Data::UrlCacheKey(_url); return Data::UrlCacheKey(_url);
} }
std::optional<MediaKey> webFileLoader::fileLocationKey() const {
return std::nullopt;
}
void webFileLoader::cancelRequests() { void webFileLoader::cancelRequests() {
if (!webLoadManager()) return; if (!webLoadManager()) return;
webLoadManager()->stop(this); webLoadManager()->stop(this);
@ -1621,18 +1598,18 @@ WebLoadManager::~WebLoadManager() {
void WebLoadMainManager::progress(webFileLoader *loader, qint64 already, qint64 size) { void WebLoadMainManager::progress(webFileLoader *loader, qint64 already, qint64 size) {
if (webLoadManager() && webLoadManager()->carries(loader)) { if (webLoadManager() && webLoadManager()->carries(loader)) {
loader->onProgress(already, size); loader->loadProgress(already, size);
} }
} }
void WebLoadMainManager::finished(webFileLoader *loader, QByteArray data) { void WebLoadMainManager::finished(webFileLoader *loader, QByteArray data) {
if (webLoadManager() && webLoadManager()->carries(loader)) { if (webLoadManager() && webLoadManager()->carries(loader)) {
loader->onFinished(data); loader->loadFinished(data);
} }
} }
void WebLoadMainManager::error(webFileLoader *loader) { void WebLoadMainManager::error(webFileLoader *loader) {
if (webLoadManager() && webLoadManager()->carries(loader)) { if (webLoadManager() && webLoadManager()->carries(loader)) {
loader->onError(); loader->loadError();
} }
} }

View File

@ -26,7 +26,17 @@ constexpr auto kMaxWallPaperDimension = 4096; // 4096x4096 is max area.
class Downloader final { class Downloader final {
public: public:
struct Queue {
Queue(int queriesLimit) : queriesLimit(queriesLimit) {
}
int queriesCount = 0;
int queriesLimit = 0;
FileLoader *start = nullptr;
FileLoader *end = nullptr;
};
Downloader(); Downloader();
~Downloader();
int currentPriority() const { int currentPriority() const {
return _priority; return _priority;
@ -40,7 +50,8 @@ public:
void requestedAmountIncrement(MTP::DcId dcId, int index, int amount); void requestedAmountIncrement(MTP::DcId dcId, int index, int amount);
int chooseDcIndexForRequest(MTP::DcId dcId) const; int chooseDcIndexForRequest(MTP::DcId dcId) const;
~Downloader(); not_null<Queue*> queueForDc(MTP::DcId dcId);
not_null<Queue*> queueForWeb();
private: private:
void killDownloadSessionsStart(MTP::DcId dcId); void killDownloadSessionsStart(MTP::DcId dcId);
@ -56,6 +67,9 @@ private:
base::flat_map<MTP::DcId, crl::time> _killDownloadSessionTimes; base::flat_map<MTP::DcId, crl::time> _killDownloadSessionTimes;
base::Timer _killDownloadSessionsTimer; base::Timer _killDownloadSessionsTimer;
std::map<MTP::DcId, Queue> _queuesForDc;
Queue _queueForWeb;
}; };
} // namespace Storage } // namespace Storage
@ -72,7 +86,6 @@ struct StorageImageSaved {
class mtpFileLoader; class mtpFileLoader;
class webFileLoader; class webFileLoader;
struct FileLoaderQueue;
class FileLoader : public QObject { class FileLoader : public QObject {
Q_OBJECT Q_OBJECT
@ -106,7 +119,7 @@ public:
} }
virtual Data::FileOrigin fileOrigin() const; virtual Data::FileOrigin fileOrigin() const;
float64 currentProgress() const; float64 currentProgress() const;
virtual int currentOffset() const = 0; virtual int currentOffset() const;
int fullSize() const; int fullSize() const;
bool setFileName(const QString &filename); // set filename for loaders to cache bool setFileName(const QString &filename); // set filename for loaders to cache
@ -146,6 +159,8 @@ signals:
void failed(FileLoader *loader, bool started); void failed(FileLoader *loader, bool started);
protected: protected:
using Queue = Storage::Downloader::Queue;
enum class LocalStatus { enum class LocalStatus {
NotTried, NotTried,
NotFound, NotFound,
@ -158,6 +173,7 @@ protected:
bool tryLoadLocal(); bool tryLoadLocal();
void loadLocal(const Storage::Cache::Key &key); void loadLocal(const Storage::Cache::Key &key);
virtual std::optional<Storage::Cache::Key> cacheKey() const = 0; virtual std::optional<Storage::Cache::Key> cacheKey() const = 0;
virtual std::optional<MediaKey> fileLocationKey() const = 0;
virtual void cancelRequests() = 0; virtual void cancelRequests() = 0;
void startLoading(bool loadFirst, bool prior); void startLoading(bool loadFirst, bool prior);
@ -165,14 +181,17 @@ protected:
void cancel(bool failed); void cancel(bool failed);
void notifyAboutProgress(); void notifyAboutProgress();
static void LoadNextFromQueue(not_null<FileLoaderQueue*> queue); static void LoadNextFromQueue(not_null<Queue*> queue);
virtual bool loadPart() = 0; virtual bool loadPart() = 0;
bool writeResultPart(int offset, bytes::const_span buffer);
bool finalizeResult();
not_null<Storage::Downloader*> _downloader; not_null<Storage::Downloader*> _downloader;
FileLoader *_prev = nullptr; FileLoader *_prev = nullptr;
FileLoader *_next = nullptr; FileLoader *_next = nullptr;
int _priority = 0; int _priority = 0;
FileLoaderQueue *_queue = nullptr; Queue *_queue = nullptr;
bool _paused = false; bool _paused = false;
bool _autoLoading = false; bool _autoLoading = false;
@ -192,6 +211,7 @@ protected:
QByteArray _data; QByteArray _data;
int _size = 0; int _size = 0;
int _skippedBytes = 0;
LocationType _locationType = LocationType(); LocationType _locationType = LocationType();
base::binary_guard _localLoading; base::binary_guard _localLoading;
@ -227,7 +247,6 @@ public:
bool autoLoading, bool autoLoading,
uint8 cacheTag); uint8 cacheTag);
int currentOffset() const override;
Data::FileOrigin fileOrigin() const override; Data::FileOrigin fileOrigin() const override;
uint64 objId() const override; uint64 objId() const override;
@ -255,6 +274,7 @@ private:
QByteArray hash; QByteArray hash;
}; };
std::optional<Storage::Cache::Key> cacheKey() const override; std::optional<Storage::Cache::Key> cacheKey() const override;
std::optional<MediaKey> fileLocationKey() const override;
void cancelRequests() override; void cancelRequests() override;
MTP::DcId dcId() const; MTP::DcId dcId() const;
@ -270,8 +290,8 @@ private:
void requestMoreCdnFileHashes(); void requestMoreCdnFileHashes();
void getCdnFileHashesDone(const MTPVector<MTPFileHash> &result, mtpRequestId requestId); void getCdnFileHashesDone(const MTPVector<MTPFileHash> &result, mtpRequestId requestId);
bool feedPart(int offset, bytes::const_span buffer);
void partLoaded(int offset, bytes::const_span buffer); void partLoaded(int offset, bytes::const_span buffer);
bool feedPart(int offset, bytes::const_span buffer);
bool partFailed(const RPCError &error, mtpRequestId requestId); bool partFailed(const RPCError &error, mtpRequestId requestId);
bool normalPartFailed(QByteArray fileReference, const RPCError &error, mtpRequestId requestId); bool normalPartFailed(QByteArray fileReference, const RPCError &error, mtpRequestId requestId);
@ -294,7 +314,6 @@ private:
std::map<mtpRequestId, RequestData> _sentRequests; std::map<mtpRequestId, RequestData> _sentRequests;
bool _lastComplete = false; bool _lastComplete = false;
int32 _skippedBytes = 0;
int32 _nextRequestOffset = 0; int32 _nextRequestOffset = 0;
base::variant< base::variant<
@ -327,9 +346,9 @@ public:
int currentOffset() const override; int currentOffset() const override;
void onProgress(qint64 already, qint64 size); void loadProgress(qint64 already, qint64 size);
void onFinished(const QByteArray &data); void loadFinished(const QByteArray &data);
void onError(); void loadError();
void stop() override { void stop() override {
cancelRequests(); cancelRequests();
@ -340,6 +359,7 @@ public:
protected: protected:
void cancelRequests() override; void cancelRequests() override;
std::optional<Storage::Cache::Key> cacheKey() const override; std::optional<Storage::Cache::Key> cacheKey() const override;
std::optional<MediaKey> fileLocationKey() const override;
bool loadPart() override; bool loadPart() override;
QString _url; QString _url;

View File

@ -13,15 +13,18 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
namespace Storage { namespace Storage {
namespace { namespace {
constexpr auto kPartSize = Media::Streaming::Loader::kPartSize; using namespace Media::Streaming;
constexpr auto kPartSize = Loader::kPartSize;
} // namespace } // namespace
StreamedFileDownloader::StreamedFileDownloader( StreamedFileDownloader::StreamedFileDownloader(
uint64 objectId, uint64 objectId,
MTP::DcId dcId,
Data::FileOrigin origin, Data::FileOrigin origin,
std::optional<Cache::Key> cacheKey, std::optional<Cache::Key> cacheKey,
std::shared_ptr<Media::Streaming::Reader> reader, std::shared_ptr<Reader> reader,
// For FileLoader // For FileLoader
const QString &toFile, const QString &toFile,
@ -44,6 +47,17 @@ StreamedFileDownloader::StreamedFileDownloader(
, _cacheKey(cacheKey) , _cacheKey(cacheKey)
, _reader(std::move(reader)) { , _reader(std::move(reader)) {
_partIsSaved.resize((size + kPartSize - 1) / kPartSize, false); _partIsSaved.resize((size + kPartSize - 1) / kPartSize, false);
_reader->partsForDownloader(
) | rpl::start_with_next([=](const LoadedPart &part) {
if (part.offset == LoadedPart::kFailedOffset) {
cancel(true);
} else {
savePart(std::move(part));
}
}, _lifetime);
_queue = _downloader->queueForDc(dcId);
} }
StreamedFileDownloader::~StreamedFileDownloader() { StreamedFileDownloader::~StreamedFileDownloader() {
@ -58,10 +72,6 @@ Data::FileOrigin StreamedFileDownloader::fileOrigin() const {
return _origin; return _origin;
} }
int StreamedFileDownloader::currentOffset() const {
return 0;
}
void StreamedFileDownloader::stop() { void StreamedFileDownloader::stop() {
cancelRequests(); cancelRequests();
} }
@ -70,11 +80,77 @@ std::optional<Storage::Cache::Key> StreamedFileDownloader::cacheKey() const {
return _cacheKey; return _cacheKey;
} }
std::optional<MediaKey> StreamedFileDownloader::fileLocationKey() const {
return std::nullopt; AssertIsDebug();
}
void StreamedFileDownloader::cancelRequests() { void StreamedFileDownloader::cancelRequests() {
const auto requests = std::count(
begin(_partIsSaved),
begin(_partIsSaved) + _nextPartIndex,
false);
_queue->queriesCount -= requests;
_nextPartIndex = 0;
_reader->cancelForDownloader();
} }
bool StreamedFileDownloader::loadPart() { bool StreamedFileDownloader::loadPart() {
return false; if (_finished || _nextPartIndex >= size(_partIsSaved)) {
return false;
}
const auto index = std::find(
begin(_partIsSaved) + _nextPartIndex,
end(_partIsSaved),
false
) - begin(_partIsSaved);
if (index == size(_partIsSaved)) {
_nextPartIndex = index;
return false;
}
_nextPartIndex = index + 1;
_reader->loadForDownloader(index);
AssertIsDebug();
//_downloader->requestedAmountIncrement(
// requestData.dcId,
// requestData.dcIndex,
// kPartSize);
++_queue->queriesCount;
return true;
}
void StreamedFileDownloader::savePart(const LoadedPart &part) {
Expects(part.offset >= 0 && part.offset < _reader->size());
Expects(part.offset % kPartSize == 0);
if (_finished || _cancelled) {
return;
}
const auto index = part.offset / kPartSize;
Assert(index >= 0 && index < _partIsSaved.size());
if (_partIsSaved[index]) {
return;
}
_partIsSaved[index] = true;
if (index < _nextPartIndex) {
AssertIsDebug();
//_downloader->requestedAmountIncrement(
// requestData.dcId,
// requestData.dcIndex,
// -kPartSize);
--_queue->queriesCount;
}
if (!writeResultPart(part.offset, bytes::make_span(part.bytes))) {
return;
}
if (ranges::find(_partIsSaved, false) == end(_partIsSaved)) {
if (!finalizeResult()) {
return;
}
}
notifyAboutProgress();
} }
} // namespace Storage } // namespace Storage

View File

@ -13,6 +13,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
namespace Media { namespace Media {
namespace Streaming { namespace Streaming {
class Reader; class Reader;
struct LoadedPart;
} // namespace Streaming } // namespace Streaming
} // namespace Media } // namespace Media
@ -22,6 +23,7 @@ class StreamedFileDownloader final : public FileLoader {
public: public:
StreamedFileDownloader( StreamedFileDownloader(
uint64 objectId, uint64 objectId,
MTP::DcId dcId,
Data::FileOrigin origin, Data::FileOrigin origin,
std::optional<Cache::Key> cacheKey, std::optional<Cache::Key> cacheKey,
std::shared_ptr<Media::Streaming::Reader> reader, std::shared_ptr<Media::Streaming::Reader> reader,
@ -38,15 +40,16 @@ public:
uint64 objId() const override; uint64 objId() const override;
Data::FileOrigin fileOrigin() const override; Data::FileOrigin fileOrigin() const override;
int currentOffset() const override;
void stop() override; void stop() override;
private: private:
std::optional<Storage::Cache::Key> cacheKey() const override; std::optional<Storage::Cache::Key> cacheKey() const override;
std::optional<MediaKey> fileLocationKey() const override;
void cancelRequests() override; void cancelRequests() override;
bool loadPart() override; bool loadPart() override;
private: void savePart(const Media::Streaming::LoadedPart &part);
uint64 _objectId = 0; uint64 _objectId = 0;
Data::FileOrigin _origin; Data::FileOrigin _origin;
std::optional<Cache::Key> _cacheKey; std::optional<Cache::Key> _cacheKey;
@ -55,6 +58,8 @@ private:
std::vector<bool> _partIsSaved; // vector<bool> :D std::vector<bool> _partIsSaved; // vector<bool> :D
int _nextPartIndex = 0; int _nextPartIndex = 0;
rpl::lifetime _lifetime;
}; };
} // namespace Storage } // namespace Storage