Working code for streaming downloader.

This commit is contained in:
John Preston 2019-04-12 11:34:39 +04:00
parent 27018d94ee
commit e1114530ab
8 changed files with 132 additions and 80 deletions

View File

@ -871,9 +871,8 @@ void DocumentData::save(
id, id,
_dc, _dc,
origin, origin,
(saveToCache() Data::DocumentCacheKey(_dc, id),
? std::make_optional(Data::DocumentCacheKey(_dc, id)) mediaKey(),
: std::nullopt),
std::move(reader), std::move(reader),
toFile, toFile,
size, size,

View File

@ -55,7 +55,7 @@ int File::Context::read(bytes::span buffer) {
_semaphore.acquire(); _semaphore.acquire();
if (_interrupted) { if (_interrupted) {
return -1; return -1;
} else if (const auto error = _reader->failed()) { } else if (const auto error = _reader->streamingError()) {
fail(*error); fail(*error);
return -1; return -1;
} }

View File

@ -596,25 +596,9 @@ std::optional<int> Reader::Slices::readCacheRequiredFor(int offset) {
} }
const auto index = offset / kInSlice; const auto index = offset / kInSlice;
auto &slice = _data[index]; auto &slice = _data[index];
if (slice.flags & (Flag::LoadedFromCache | Flag::LoadingFromCache)) { return (slice.flags & Flag::LoadedFromCache)
return std::nullopt; ? std::nullopt
} : std::make_optional(index + 1);
slice.flags |= Flag::LoadingFromCache;
return (index + 1);
}
bool Reader::Slices::waitForCacheRequiredFor(int offset) const {
Expects(offset < _size);
using Flag = Slice::Flag;
if (_header.flags & Flag::LoadingFromCache) {
return true;
} else if (isFullInHeader()) {
return false;
}
const auto index = offset / kInSlice;
const auto &slice = _data[index];
return (slice.flags & Flag::LoadingFromCache);
} }
void Reader::Slices::markSliceUsed(int sliceIndex) { void Reader::Slices::markSliceUsed(int sliceIndex) {
@ -841,6 +825,12 @@ void Reader::loadForDownloader(int offset) {
} }
} }
void Reader::doneForDownloader(int offset) {
if (_downloaderOffsetsRequested.remove(offset) && !_streamingActive) {
processDownloaderRequests();
}
}
void Reader::cancelForDownloader() { void Reader::cancelForDownloader() {
if (_downloaderAttached.load(std::memory_order_acquire)) { if (_downloaderAttached.load(std::memory_order_acquire)) {
_downloaderOffsetRequests.take(); _downloaderOffsetRequests.take();
@ -878,19 +868,33 @@ void Reader::checkForDownloaderChange(int checkItemsCount) {
_offsetsForDownloader.erase( _offsetsForDownloader.erase(
begin(_offsetsForDownloader), begin(_offsetsForDownloader),
changed + 1); changed + 1);
_downloaderSliceNumber = 0;
_downloaderSliceCache = std::nullopt;
} }
} }
void Reader::checkForDownloaderReadyOffsets() { void Reader::checkForDownloaderReadyOffsets() {
// If a requested part is available right now we simply fire it on the // If a requested part is available right now we simply fire it on the
// main thread, until the first not-available-right-now offset is found. // main thread, until the first not-available-right-now offset is found.
const auto ready = [&](int offset, QByteArray &&bytes) {
crl::on_main(this, [=, bytes = std::move(bytes)]() mutable {
_partsForDownloader.fire({ offset, std::move(bytes) });
});
return true;
};
const auto unavailable = [&](int offset) { const auto unavailable = [&](int offset) {
auto bytes = _slices.partForDownloader(offset); auto bytes = _slices.partForDownloader(offset);
if (!bytes.isEmpty()) { if (!bytes.isEmpty()) {
crl::on_main(this, [=, bytes = std::move(bytes)]() mutable { return !ready(offset, std::move(bytes));
_partsForDownloader.fire({ offset, std::move(bytes) }); }
}); const auto sliceIndex = (offset / kInSlice);
return false; if ((sliceIndex + 1 == _downloaderSliceNumber)
&& _downloaderSliceCache) {
const auto i = _downloaderSliceCache->find(
offset - sliceIndex * kInSlice);
if (i != _downloaderSliceCache->end()) {
return !ready(offset, std::move(i->second));
}
} }
return true; return true;
}; };
@ -900,7 +904,7 @@ void Reader::checkForDownloaderReadyOffsets() {
} }
void Reader::processDownloaderRequests() { void Reader::processDownloaderRequests() {
checkForSomethingMoreReceived(); processCacheResults();
enqueueDownloaderOffsets(); enqueueDownloaderOffsets();
checkForDownloaderReadyOffsets(); checkForDownloaderReadyOffsets();
if (empty(_offsetsForDownloader)) { if (empty(_offsetsForDownloader)) {
@ -908,17 +912,31 @@ void Reader::processDownloaderRequests() {
} }
const auto offset = _offsetsForDownloader.front(); const auto offset = _offsetsForDownloader.front();
if (_cacheHelper) { if (_cacheHelper && downloaderWaitForCachedSlice(offset)) {
if (const auto sliceNumber = _slices.readCacheRequiredFor(offset)) { return;
readFromCache(*sliceNumber);
return;
} else if (_slices.waitForCacheRequiredFor(offset)) {
return;
}
} }
_offsetsForDownloader.pop_front(); _offsetsForDownloader.pop_front();
loadAtOffset(offset); if (_downloaderOffsetsRequested.emplace(offset).second) {
_loader->load(offset);
}
}
bool Reader::downloaderWaitForCachedSlice(int offset) {
const auto sliceNumber = _slices.readCacheRequiredFor(offset);
if (sliceNumber.value_or(0) != _downloaderSliceNumber) {
_downloaderSliceNumber = sliceNumber.value_or(0);
_downloaderSliceCache = std::nullopt;
if (_downloaderSliceNumber) {
if (readFromCacheForDownloader()) {
return true;
}
_downloaderSliceCache = PartsMap();
}
} else if (_downloaderSliceNumber && !_downloaderSliceCache) {
return true;
}
return false;
} }
void Reader::checkCacheResultsForDownloader() { void Reader::checkCacheResultsForDownloader() {
@ -977,6 +995,17 @@ void Reader::readFromCache(int sliceNumber) {
}); });
} }
bool Reader::readFromCacheForDownloader() {
Expects(_cacheHelper != nullptr);
Expects(_downloaderSliceNumber > 0);
if (_slices.headerModeUnknown()) {
return false;
}
readFromCache(_downloaderSliceNumber);
return true;
}
void Reader::putToCache(SerializedSlice &&slice) { void Reader::putToCache(SerializedSlice &&slice) {
Expects(_cacheHelper != nullptr); Expects(_cacheHelper != nullptr);
Expects(slice.number >= 0); Expects(slice.number >= 0);
@ -990,8 +1019,8 @@ int Reader::size() const {
return _loader->size(); return _loader->size();
} }
std::optional<Error> Reader::failed() const { std::optional<Error> Reader::streamingError() const {
return _failed; return _streamingError;
} }
void Reader::headerDone() { void Reader::headerDone() {
@ -1027,7 +1056,7 @@ bool Reader::fill(
}; };
checkForSomethingMoreReceived(); checkForSomethingMoreReceived();
if (_failed) { if (_streamingError) {
return failed(); return failed();
} }
@ -1039,7 +1068,7 @@ bool Reader::fill(
startWaiting(); startWaiting();
} while (checkForSomethingMoreReceived()); } while (checkForSomethingMoreReceived());
return _failed ? failed() : false; return _streamingError ? failed() : false;
} }
bool Reader::fillFromSlices(int offset, bytes::span buffer) { bool Reader::fillFromSlices(int offset, bytes::span buffer) {
@ -1047,7 +1076,7 @@ bool Reader::fillFromSlices(int offset, bytes::span buffer) {
auto result = _slices.fill(offset, buffer); auto result = _slices.fill(offset, buffer);
if (!result.filled && _slices.headerWontBeFilled()) { if (!result.filled && _slices.headerWontBeFilled()) {
_failed = Error::NotStreamable; _streamingError = Error::NotStreamable;
return false; return false;
} }
@ -1079,7 +1108,9 @@ void Reader::cancelLoadInRange(int from, int till) {
Expects(from < till); Expects(from < till);
for (const auto offset : _loadingOffsets.takeInRange(from, till)) { for (const auto offset : _loadingOffsets.takeInRange(from, till)) {
_loader->cancel(offset); if (!_downloaderOffsetsRequested.contains(offset)) {
_loader->cancel(offset);
}
} }
} }
@ -1093,14 +1124,22 @@ void Reader::checkLoadWillBeFirst(int offset) {
bool Reader::processCacheResults() { bool Reader::processCacheResults() {
if (!_cacheHelper) { if (!_cacheHelper) {
return false; return false;
} else if (_failed) {
return false;
} }
QMutexLocker lock(&_cacheHelper->mutex); QMutexLocker lock(&_cacheHelper->mutex);
auto loaded = base::take(_cacheHelper->results); auto loaded = base::take(_cacheHelper->results);
lock.unlock(); lock.unlock();
if (_downloaderSliceNumber) {
const auto i = loaded.find(_downloaderSliceNumber);
if (i != end(loaded)) {
_downloaderSliceCache = i->second;
}
}
if (_streamingError) {
return false;
}
for (auto &[sliceNumber, result] : loaded) { for (auto &[sliceNumber, result] : loaded) {
_slices.processCacheResult(sliceNumber, std::move(result)); _slices.processCacheResult(sliceNumber, std::move(result));
} }
@ -1114,14 +1153,14 @@ bool Reader::processCacheResults() {
} }
bool Reader::processLoadedParts() { bool Reader::processLoadedParts() {
if (_failed) { if (_streamingError) {
return false; return false;
} }
auto loaded = _loadedParts.take(); auto loaded = _loadedParts.take();
for (auto &part : loaded) { for (auto &part : loaded) {
if (!part.valid(size())) { if (!part.valid(size())) {
_failed = Error::LoadFailed; _streamingError = Error::LoadFailed;
return false; return false;
} else if (!_loadingOffsets.remove(part.offset)) { } else if (!_loadingOffsets.remove(part.offset)) {
continue; continue;

View File

@ -43,7 +43,7 @@ public:
int offset, int offset,
bytes::span buffer, bytes::span buffer,
not_null<crl::semaphore*> notify); not_null<crl::semaphore*> notify);
[[nodiscard]] std::optional<Error> failed() const; [[nodiscard]] std::optional<Error> streamingError() const;
void headerDone(); void headerDone();
// Thread safe. // Thread safe.
@ -56,6 +56,7 @@ public:
void stopStreaming(bool stillActive = false); void stopStreaming(bool stillActive = false);
[[nodiscard]] rpl::producer<LoadedPart> partsForDownloader() const; [[nodiscard]] rpl::producer<LoadedPart> partsForDownloader() const;
void loadForDownloader(int offset); void loadForDownloader(int offset);
void doneForDownloader(int offset);
void cancelForDownloader(); void cancelForDownloader();
~Reader(); ~Reader();
@ -139,7 +140,6 @@ private:
[[nodiscard]] QByteArray partForDownloader(int offset) const; [[nodiscard]] QByteArray partForDownloader(int offset) const;
[[nodiscard]] std::optional<int> readCacheRequiredFor(int offset); [[nodiscard]] std::optional<int> readCacheRequiredFor(int offset);
[[nodiscard]] bool waitForCacheRequiredFor(int offset) const;
private: private:
enum class HeaderMode { enum class HeaderMode {
@ -173,7 +173,9 @@ private:
}; };
// 0 is for headerData, slice index = sliceNumber - 1. // 0 is for headerData, slice index = sliceNumber - 1.
// returns false if asked for a known-empty downloader slice cache.
void readFromCache(int sliceNumber); void readFromCache(int sliceNumber);
[[nodiscard]] bool readFromCacheForDownloader();
bool processCacheResults(); bool processCacheResults();
void putToCache(SerializedSlice &&data); void putToCache(SerializedSlice &&data);
@ -190,6 +192,7 @@ private:
void processDownloaderRequests(); void processDownloaderRequests();
void checkCacheResultsForDownloader(); void checkCacheResultsForDownloader();
[[nodiscard]] bool downloaderWaitForCachedSlice(int offset);
void enqueueDownloaderOffsets(); void enqueueDownloaderOffsets();
void checkForDownloaderChange(int checkItemsCount); void checkForDownloaderChange(int checkItemsCount);
void checkForDownloaderReadyOffsets(); void checkForDownloaderReadyOffsets();
@ -207,11 +210,16 @@ private:
PriorityQueue _loadingOffsets; PriorityQueue _loadingOffsets;
Slices _slices; Slices _slices;
std::optional<Error> _failed;
// Even if streaming had failed, the Reader can work for the downloader.
std::optional<Error> _streamingError;
std::atomic<bool> _downloaderAttached = false; std::atomic<bool> _downloaderAttached = false;
base::thread_safe_queue<int> _downloaderOffsetRequests; base::thread_safe_queue<int> _downloaderOffsetRequests;
std::deque<int> _offsetsForDownloader; std::deque<int> _offsetsForDownloader;
base::flat_set<int> _downloaderOffsetsRequested;
int _downloaderSliceNumber = 0; // > 0 means we want it from cache.
std::optional<PartsMap> _downloaderSliceCache;
// Main thread. // Main thread.
rpl::event_stream<LoadedPart> _partsForDownloader; rpl::event_stream<LoadedPart> _partsForDownloader;

View File

@ -473,8 +473,8 @@ bool FileLoader::tryLoadLocal() {
} }
const auto weak = make_weak(this); const auto weak = make_weak(this);
if (const auto key = cacheKey()) { if (_toCache == LoadToCacheAsWell) {
loadLocal(*key); loadLocal(cacheKey());
emit progress(this); emit progress(this);
} }
if (!weak) { if (!weak) {
@ -594,17 +594,18 @@ bool FileLoader::finalizeResult() {
if (_localStatus == LocalStatus::NotFound) { if (_localStatus == LocalStatus::NotFound) {
if (const auto key = fileLocationKey()) { if (const auto key = fileLocationKey()) {
Local::writeFileLocation(*key, FileLocation(_filename)); if (!_filename.isEmpty()) {
} Local::writeFileLocation(*key, FileLocation(_filename));
if (const auto key = cacheKey()) {
if (_data.size() <= Storage::kMaxFileInMemory) {
Auth().data().cache().put(
*key,
Storage::Cache::Database::TaggedValue(
base::duplicate(_data),
_cacheTag));
} }
} }
if ((_toCache == LoadToCacheAsWell)
&& (_data.size() <= Storage::kMaxFileInMemory)) {
Auth().data().cache().put(
cacheKey(),
Storage::Cache::Database::TaggedValue(
base::duplicate(_data),
_cacheTag));
}
} }
_downloader->taskFinished().notify(); _downloader->taskFinished().notify();
return true; return true;
@ -1161,20 +1162,18 @@ void mtpFileLoader::changeCDNParams(
makeRequest(offset); makeRequest(offset);
} }
std::optional<Storage::Cache::Key> mtpFileLoader::cacheKey() const { Storage::Cache::Key mtpFileLoader::cacheKey() const {
return _location.match([&](const WebFileLocation &location) { return _location.match([&](const WebFileLocation &location) {
return std::make_optional(Data::WebDocumentCacheKey(location)); return Data::WebDocumentCacheKey(location);
}, [&](const GeoPointLocation &location) { }, [&](const GeoPointLocation &location) {
return std::make_optional(Data::GeoPointCacheKey(location)); return Data::GeoPointCacheKey(location);
}, [&](const StorageFileLocation &location) { }, [&](const StorageFileLocation &location) {
return (_toCache == LoadToCacheAsWell) return location.cacheKey();
? std::make_optional(location.cacheKey())
: std::nullopt;
}); });
} }
std::optional<MediaKey> mtpFileLoader::fileLocationKey() const { std::optional<MediaKey> mtpFileLoader::fileLocationKey() const {
if (_locationType != UnknownFileLocation && !_filename.isEmpty()) { if (_locationType != UnknownFileLocation) {
return mediaKey(_locationType, dcId(), objId()); return mediaKey(_locationType, dcId(), objId());
} }
return std::nullopt; return std::nullopt;
@ -1246,7 +1245,7 @@ void webFileLoader::loadError() {
cancel(true); cancel(true);
} }
std::optional<Storage::Cache::Key> webFileLoader::cacheKey() const { Storage::Cache::Key webFileLoader::cacheKey() const {
return Data::UrlCacheKey(_url); return Data::UrlCacheKey(_url);
} }

View File

@ -172,7 +172,7 @@ protected:
bool tryLoadLocal(); bool tryLoadLocal();
void loadLocal(const Storage::Cache::Key &key); void loadLocal(const Storage::Cache::Key &key);
virtual std::optional<Storage::Cache::Key> cacheKey() const = 0; virtual Storage::Cache::Key cacheKey() const = 0;
virtual std::optional<MediaKey> fileLocationKey() const = 0; virtual std::optional<MediaKey> fileLocationKey() const = 0;
virtual void cancelRequests() = 0; virtual void cancelRequests() = 0;
@ -273,7 +273,7 @@ private:
int limit = 0; int limit = 0;
QByteArray hash; QByteArray hash;
}; };
std::optional<Storage::Cache::Key> cacheKey() const override; Storage::Cache::Key cacheKey() const override;
std::optional<MediaKey> fileLocationKey() const override; std::optional<MediaKey> fileLocationKey() const override;
void cancelRequests() override; void cancelRequests() override;
@ -358,7 +358,7 @@ public:
protected: protected:
void cancelRequests() override; void cancelRequests() override;
std::optional<Storage::Cache::Key> cacheKey() const override; Storage::Cache::Key cacheKey() const override;
std::optional<MediaKey> fileLocationKey() const override; std::optional<MediaKey> fileLocationKey() const override;
bool loadPart() override; bool loadPart() override;

View File

@ -23,7 +23,8 @@ StreamedFileDownloader::StreamedFileDownloader(
uint64 objectId, uint64 objectId,
MTP::DcId dcId, MTP::DcId dcId,
Data::FileOrigin origin, Data::FileOrigin origin,
std::optional<Cache::Key> cacheKey, Cache::Key cacheKey,
MediaKey fileLocationKey,
std::shared_ptr<Reader> reader, std::shared_ptr<Reader> reader,
// For FileLoader // For FileLoader
@ -45,6 +46,7 @@ StreamedFileDownloader::StreamedFileDownloader(
, _objectId(objectId) , _objectId(objectId)
, _origin(origin) , _origin(origin)
, _cacheKey(cacheKey) , _cacheKey(cacheKey)
, _fileLocationKey(fileLocationKey)
, _reader(std::move(reader)) { , _reader(std::move(reader)) {
_partIsSaved.resize((size + kPartSize - 1) / kPartSize, false); _partIsSaved.resize((size + kPartSize - 1) / kPartSize, false);
@ -76,12 +78,12 @@ void StreamedFileDownloader::stop() {
cancelRequests(); cancelRequests();
} }
std::optional<Storage::Cache::Key> StreamedFileDownloader::cacheKey() const { Storage::Cache::Key StreamedFileDownloader::cacheKey() const {
return _cacheKey; return _cacheKey;
} }
std::optional<MediaKey> StreamedFileDownloader::fileLocationKey() const { std::optional<MediaKey> StreamedFileDownloader::fileLocationKey() const {
return std::nullopt; AssertIsDebug(); return _fileLocationKey;
} }
void StreamedFileDownloader::cancelRequests() { void StreamedFileDownloader::cancelRequests() {
@ -109,7 +111,7 @@ bool StreamedFileDownloader::loadPart() {
return false; return false;
} }
_nextPartIndex = index + 1; _nextPartIndex = index + 1;
_reader->loadForDownloader(index); _reader->loadForDownloader(index * kPartSize);
AssertIsDebug(); AssertIsDebug();
//_downloader->requestedAmountIncrement( //_downloader->requestedAmountIncrement(
// requestData.dcId, // requestData.dcId,
@ -123,11 +125,13 @@ bool StreamedFileDownloader::loadPart() {
void StreamedFileDownloader::savePart(const LoadedPart &part) { void StreamedFileDownloader::savePart(const LoadedPart &part) {
Expects(part.offset >= 0 && part.offset < _reader->size()); Expects(part.offset >= 0 && part.offset < _reader->size());
Expects(part.offset % kPartSize == 0); Expects(part.offset % kPartSize == 0);
if (_finished || _cancelled) { if (_finished || _cancelled) {
return; return;
} }
const auto index = part.offset / kPartSize; const auto offset = part.offset;
const auto index = offset / kPartSize;
Assert(index >= 0 && index < _partIsSaved.size()); Assert(index >= 0 && index < _partIsSaved.size());
if (_partIsSaved[index]) { if (_partIsSaved[index]) {
return; return;
@ -142,7 +146,7 @@ void StreamedFileDownloader::savePart(const LoadedPart &part) {
// -kPartSize); // -kPartSize);
--_queue->queriesCount; --_queue->queriesCount;
} }
if (!writeResultPart(part.offset, bytes::make_span(part.bytes))) { if (!writeResultPart(offset, bytes::make_span(part.bytes))) {
return; return;
} }
if (ranges::find(_partIsSaved, false) == end(_partIsSaved)) { if (ranges::find(_partIsSaved, false) == end(_partIsSaved)) {
@ -150,6 +154,7 @@ void StreamedFileDownloader::savePart(const LoadedPart &part) {
return; return;
} }
} }
_reader->doneForDownloader(offset);
notifyAboutProgress(); notifyAboutProgress();
} }

View File

@ -25,7 +25,8 @@ public:
uint64 objectId, uint64 objectId,
MTP::DcId dcId, MTP::DcId dcId,
Data::FileOrigin origin, Data::FileOrigin origin,
std::optional<Cache::Key> cacheKey, Cache::Key cacheKey,
MediaKey fileLocationKey,
std::shared_ptr<Media::Streaming::Reader> reader, std::shared_ptr<Media::Streaming::Reader> reader,
// For FileLoader // For FileLoader
@ -43,7 +44,7 @@ public:
void stop() override; void stop() override;
private: private:
std::optional<Storage::Cache::Key> cacheKey() const override; Cache::Key cacheKey() const override;
std::optional<MediaKey> fileLocationKey() const override; std::optional<MediaKey> fileLocationKey() const override;
void cancelRequests() override; void cancelRequests() override;
bool loadPart() override; bool loadPart() override;
@ -52,7 +53,8 @@ private:
uint64 _objectId = 0; uint64 _objectId = 0;
Data::FileOrigin _origin; Data::FileOrigin _origin;
std::optional<Cache::Key> _cacheKey; Cache::Key _cacheKey;
MediaKey _fileLocationKey;
std::shared_ptr<Media::Streaming::Reader> _reader; std::shared_ptr<Media::Streaming::Reader> _reader;
std::vector<bool> _partIsSaved; // vector<bool> :D std::vector<bool> _partIsSaved; // vector<bool> :D