From e1114530abf265146c3754e466bc16e84b40d35e Mon Sep 17 00:00:00 2001
From: John Preston <johnprestonmail@gmail.com>
Date: Fri, 12 Apr 2019 11:34:39 +0400
Subject: [PATCH] Working code for streaming downloader.

---
 Telegram/SourceFiles/data/data_document.cpp   |   5 +-
 .../media/streaming/media_streaming_file.cpp  |   2 +-
 .../streaming/media_streaming_reader.cpp      | 123 ++++++++++++------
 .../media/streaming/media_streaming_reader.h  |  14 +-
 .../SourceFiles/storage/file_download.cpp     |  37 +++---
 Telegram/SourceFiles/storage/file_download.h  |   6 +-
 .../storage/streamed_file_downloader.cpp      |  17 ++-
 .../storage/streamed_file_downloader.h        |   8 +-
 8 files changed, 132 insertions(+), 80 deletions(-)

diff --git a/Telegram/SourceFiles/data/data_document.cpp b/Telegram/SourceFiles/data/data_document.cpp
index 6416dd16d..f130f2c62 100644
--- a/Telegram/SourceFiles/data/data_document.cpp
+++ b/Telegram/SourceFiles/data/data_document.cpp
@@ -871,9 +871,8 @@ void DocumentData::save(
 				id,
 				_dc,
 				origin,
-				(saveToCache()
-					? std::make_optional(Data::DocumentCacheKey(_dc, id))
-					: std::nullopt),
+				Data::DocumentCacheKey(_dc, id),
+				mediaKey(),
 				std::move(reader),
 				toFile,
 				size,
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp
index f0a96700a..5b53285a5 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp
@@ -55,7 +55,7 @@ int File::Context::read(bytes::span buffer) {
 		_semaphore.acquire();
 		if (_interrupted) {
 			return -1;
-		} else if (const auto error = _reader->failed()) {
+		} else if (const auto error = _reader->streamingError()) {
 			fail(*error);
 			return -1;
 		}
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp
index 5e39cddb5..7e2099a24 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp
@@ -596,25 +596,9 @@ std::optional<int> Reader::Slices::readCacheRequiredFor(int offset) {
 	}
 	const auto index = offset / kInSlice;
 	auto &slice = _data[index];
-	if (slice.flags & (Flag::LoadedFromCache | Flag::LoadingFromCache)) {
-		return std::nullopt;
-	}
-	slice.flags |= Flag::LoadingFromCache;
-	return (index + 1);
-}
-
-bool Reader::Slices::waitForCacheRequiredFor(int offset) const {
-	Expects(offset < _size);
-
-	using Flag = Slice::Flag;
-	if (_header.flags & Flag::LoadingFromCache) {
-		return true;
-	} else if (isFullInHeader()) {
-		return false;
-	}
-	const auto index = offset / kInSlice;
-	const auto &slice = _data[index];
-	return (slice.flags & Flag::LoadingFromCache);
+	return (slice.flags & Flag::LoadedFromCache)
+		? std::nullopt
+		: std::make_optional(index + 1);
 }
 
 void Reader::Slices::markSliceUsed(int sliceIndex) {
@@ -841,6 +825,12 @@ void Reader::loadForDownloader(int offset) {
 	}
 }
 
+void Reader::doneForDownloader(int offset) {
+	if (_downloaderOffsetsRequested.remove(offset) && !_streamingActive) {
+		processDownloaderRequests();
+	}
+}
+
 void Reader::cancelForDownloader() {
 	if (_downloaderAttached.load(std::memory_order_acquire)) {
 		_downloaderOffsetRequests.take();
@@ -878,19 +868,33 @@ void Reader::checkForDownloaderChange(int checkItemsCount) {
 		_offsetsForDownloader.erase(
 			begin(_offsetsForDownloader),
 			changed + 1);
+		_downloaderSliceNumber = 0;
+		_downloaderSliceCache = std::nullopt;
 	}
 }
 
 void Reader::checkForDownloaderReadyOffsets() {
 	// If a requested part is available right now we simply fire it on the
 	// main thread, until the first not-available-right-now offset is found.
+	const auto ready = [&](int offset, QByteArray &&bytes) {
+		crl::on_main(this, [=, bytes = std::move(bytes)]() mutable {
+			_partsForDownloader.fire({ offset, std::move(bytes) });
+		});
+		return true;
+	};
 	const auto unavailable = [&](int offset) {
 		auto bytes = _slices.partForDownloader(offset);
 		if (!bytes.isEmpty()) {
-			crl::on_main(this, [=, bytes = std::move(bytes)]() mutable {
-				_partsForDownloader.fire({ offset, std::move(bytes) });
-			});
-			return false;
+			return !ready(offset, std::move(bytes));
+		}
+		const auto sliceIndex = (offset / kInSlice);
+		if ((sliceIndex + 1 == _downloaderSliceNumber)
+			&& _downloaderSliceCache) {
+			const auto i = _downloaderSliceCache->find(
+				offset - sliceIndex * kInSlice);
+			if (i != _downloaderSliceCache->end()) {
+				return !ready(offset, std::move(i->second));
+			}
 		}
 		return true;
 	};
@@ -900,7 +904,7 @@ void Reader::checkForDownloaderReadyOffsets() {
 }
 
 void Reader::processDownloaderRequests() {
-	checkForSomethingMoreReceived();
+	processCacheResults();
 	enqueueDownloaderOffsets();
 	checkForDownloaderReadyOffsets();
 	if (empty(_offsetsForDownloader)) {
@@ -908,17 +912,31 @@ void Reader::processDownloaderRequests() {
 	}
 
 	const auto offset = _offsetsForDownloader.front();
-	if (_cacheHelper) {
-		if (const auto sliceNumber = _slices.readCacheRequiredFor(offset)) {
-			readFromCache(*sliceNumber);
-			return;
-		} else if (_slices.waitForCacheRequiredFor(offset)) {
-			return;
-		}
+	if (_cacheHelper && downloaderWaitForCachedSlice(offset)) {
+		return;
 	}
 
 	_offsetsForDownloader.pop_front();
-	loadAtOffset(offset);
+	if (_downloaderOffsetsRequested.emplace(offset).second) {
+		_loader->load(offset);
+	}
+}
+
+bool Reader::downloaderWaitForCachedSlice(int offset) {
+	const auto sliceNumber = _slices.readCacheRequiredFor(offset);
+	if (sliceNumber.value_or(0) != _downloaderSliceNumber) {
+		_downloaderSliceNumber = sliceNumber.value_or(0);
+		_downloaderSliceCache = std::nullopt;
+		if (_downloaderSliceNumber) {
+			if (readFromCacheForDownloader()) {
+				return true;
+			}
+			_downloaderSliceCache = PartsMap();
+		}
+	} else if (_downloaderSliceNumber && !_downloaderSliceCache) {
+		return true;
+	}
+	return false;
 }
 
 void Reader::checkCacheResultsForDownloader() {
@@ -977,6 +995,17 @@ void Reader::readFromCache(int sliceNumber) {
 	});
 }
 
+bool Reader::readFromCacheForDownloader() {
+	Expects(_cacheHelper != nullptr);
+	Expects(_downloaderSliceNumber > 0);
+
+	if (_slices.headerModeUnknown()) {
+		return false;
+	}
+	readFromCache(_downloaderSliceNumber);
+	return true;
+}
+
 void Reader::putToCache(SerializedSlice &&slice) {
 	Expects(_cacheHelper != nullptr);
 	Expects(slice.number >= 0);
@@ -990,8 +1019,8 @@ int Reader::size() const {
 	return _loader->size();
 }
 
-std::optional<Error> Reader::failed() const {
-	return _failed;
+std::optional<Error> Reader::streamingError() const {
+	return _streamingError;
 }
 
 void Reader::headerDone() {
@@ -1027,7 +1056,7 @@ bool Reader::fill(
 	};
 
 	checkForSomethingMoreReceived();
-	if (_failed) {
+	if (_streamingError) {
 		return failed();
 	}
 
@@ -1039,7 +1068,7 @@ bool Reader::fill(
 		startWaiting();
 	} while (checkForSomethingMoreReceived());
 
-	return _failed ? failed() : false;
+	return _streamingError ? failed() : false;
 }
 
 bool Reader::fillFromSlices(int offset, bytes::span buffer) {
@@ -1047,7 +1076,7 @@ bool Reader::fillFromSlices(int offset, bytes::span buffer) {
 
 	auto result = _slices.fill(offset, buffer);
 	if (!result.filled && _slices.headerWontBeFilled()) {
-		_failed = Error::NotStreamable;
+		_streamingError = Error::NotStreamable;
 		return false;
 	}
 
@@ -1079,7 +1108,9 @@ void Reader::cancelLoadInRange(int from, int till) {
 	Expects(from < till);
 
 	for (const auto offset : _loadingOffsets.takeInRange(from, till)) {
-		_loader->cancel(offset);
+		if (!_downloaderOffsetsRequested.contains(offset)) {
+			_loader->cancel(offset);
+		}
 	}
 }
 
@@ -1093,14 +1124,22 @@ void Reader::checkLoadWillBeFirst(int offset) {
 bool Reader::processCacheResults() {
 	if (!_cacheHelper) {
 		return false;
-	} else if (_failed) {
-		return false;
 	}
 
 	QMutexLocker lock(&_cacheHelper->mutex);
 	auto loaded = base::take(_cacheHelper->results);
 	lock.unlock();
 
+	if (_downloaderSliceNumber) {
+		const auto i = loaded.find(_downloaderSliceNumber);
+		if (i != end(loaded)) {
+			_downloaderSliceCache = i->second;
+		}
+	}
+
+	if (_streamingError) {
+		return false;
+	}
 	for (auto &[sliceNumber, result] : loaded) {
 		_slices.processCacheResult(sliceNumber, std::move(result));
 	}
@@ -1114,14 +1153,14 @@ bool Reader::processCacheResults() {
 }
 
 bool Reader::processLoadedParts() {
-	if (_failed) {
+	if (_streamingError) {
 		return false;
 	}
 
 	auto loaded = _loadedParts.take();
 	for (auto &part : loaded) {
 		if (!part.valid(size())) {
-			_failed = Error::LoadFailed;
+			_streamingError = Error::LoadFailed;
 			return false;
 		} else if (!_loadingOffsets.remove(part.offset)) {
 			continue;
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h
index 506b604de..a2a5e9626 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h
@@ -43,7 +43,7 @@ public:
 		int offset,
 		bytes::span buffer,
 		not_null<crl::semaphore*> notify);
-	[[nodiscard]] std::optional<Error> failed() const;
+	[[nodiscard]] std::optional<Error> streamingError() const;
 	void headerDone();
 
 	// Thread safe.
@@ -56,6 +56,7 @@ public:
 	void stopStreaming(bool stillActive = false);
 	[[nodiscard]] rpl::producer<LoadedPart> partsForDownloader() const;
 	void loadForDownloader(int offset);
+	void doneForDownloader(int offset);
 	void cancelForDownloader();
 
 	~Reader();
@@ -139,7 +140,6 @@ private:
 
 		[[nodiscard]] QByteArray partForDownloader(int offset) const;
 		[[nodiscard]] std::optional<int> readCacheRequiredFor(int offset);
-		[[nodiscard]] bool waitForCacheRequiredFor(int offset) const;
 
 	private:
 		enum class HeaderMode {
@@ -173,7 +173,9 @@ private:
 	};
 
 	// 0 is for headerData, slice index = sliceNumber - 1.
+	// returns false if asked for a known-empty downloader slice cache.
 	void readFromCache(int sliceNumber);
+	[[nodiscard]] bool readFromCacheForDownloader();
 	bool processCacheResults();
 	void putToCache(SerializedSlice &&data);
 
@@ -190,6 +192,7 @@ private:
 
 	void processDownloaderRequests();
 	void checkCacheResultsForDownloader();
+	[[nodiscard]] bool downloaderWaitForCachedSlice(int offset);
 	void enqueueDownloaderOffsets();
 	void checkForDownloaderChange(int checkItemsCount);
 	void checkForDownloaderReadyOffsets();
@@ -207,11 +210,16 @@ private:
 	PriorityQueue _loadingOffsets;
 
 	Slices _slices;
-	std::optional<Error> _failed;
+
+	// Even if streaming had failed, the Reader can work for the downloader.
+	std::optional<Error> _streamingError;
 
 	std::atomic<bool> _downloaderAttached = false;
 	base::thread_safe_queue<int> _downloaderOffsetRequests;
 	std::deque<int> _offsetsForDownloader;
+	base::flat_set<int> _downloaderOffsetsRequested;
+	int _downloaderSliceNumber = 0; // > 0 means we want it from cache.
+	std::optional<PartsMap> _downloaderSliceCache;
 
 	// Main thread.
 	rpl::event_stream<LoadedPart> _partsForDownloader;
diff --git a/Telegram/SourceFiles/storage/file_download.cpp b/Telegram/SourceFiles/storage/file_download.cpp
index 4c9a8c5eb..951b076d6 100644
--- a/Telegram/SourceFiles/storage/file_download.cpp
+++ b/Telegram/SourceFiles/storage/file_download.cpp
@@ -473,8 +473,8 @@ bool FileLoader::tryLoadLocal() {
 	}
 
 	const auto weak = make_weak(this);
-	if (const auto key = cacheKey()) {
-		loadLocal(*key);
+	if (_toCache == LoadToCacheAsWell) {
+		loadLocal(cacheKey());
 		emit progress(this);
 	}
 	if (!weak) {
@@ -594,17 +594,18 @@ bool FileLoader::finalizeResult() {
 
 	if (_localStatus == LocalStatus::NotFound) {
 		if (const auto key = fileLocationKey()) {
-			Local::writeFileLocation(*key, FileLocation(_filename));
-		}
-		if (const auto key = cacheKey()) {
-			if (_data.size() <= Storage::kMaxFileInMemory) {
-				Auth().data().cache().put(
-					*key,
-					Storage::Cache::Database::TaggedValue(
-						base::duplicate(_data),
-						_cacheTag));
+			if (!_filename.isEmpty()) {
+				Local::writeFileLocation(*key, FileLocation(_filename));
 			}
 		}
+		if ((_toCache == LoadToCacheAsWell)
+			&& (_data.size() <= Storage::kMaxFileInMemory)) {
+			Auth().data().cache().put(
+				cacheKey(),
+				Storage::Cache::Database::TaggedValue(
+					base::duplicate(_data),
+					_cacheTag));
+		}
 	}
 	_downloader->taskFinished().notify();
 	return true;
@@ -1161,20 +1162,18 @@ void mtpFileLoader::changeCDNParams(
 	makeRequest(offset);
 }
 
-std::optional<Storage::Cache::Key> mtpFileLoader::cacheKey() const {
+Storage::Cache::Key mtpFileLoader::cacheKey() const {
 	return _location.match([&](const WebFileLocation &location) {
-		return std::make_optional(Data::WebDocumentCacheKey(location));
+		return Data::WebDocumentCacheKey(location);
 	}, [&](const GeoPointLocation &location) {
-		return std::make_optional(Data::GeoPointCacheKey(location));
+		return Data::GeoPointCacheKey(location);
 	}, [&](const StorageFileLocation &location) {
-		return (_toCache == LoadToCacheAsWell)
-			? std::make_optional(location.cacheKey())
-			: std::nullopt;
+		return location.cacheKey();
 	});
 }
 
 std::optional<MediaKey> mtpFileLoader::fileLocationKey() const {
-	if (_locationType != UnknownFileLocation && !_filename.isEmpty()) {
+	if (_locationType != UnknownFileLocation) {
 		return mediaKey(_locationType, dcId(), objId());
 	}
 	return std::nullopt;
@@ -1246,7 +1245,7 @@ void webFileLoader::loadError() {
 	cancel(true);
 }
 
-std::optional<Storage::Cache::Key> webFileLoader::cacheKey() const {
+Storage::Cache::Key webFileLoader::cacheKey() const {
 	return Data::UrlCacheKey(_url);
 }
 
diff --git a/Telegram/SourceFiles/storage/file_download.h b/Telegram/SourceFiles/storage/file_download.h
index 4277b7ae9..9ffbce4a0 100644
--- a/Telegram/SourceFiles/storage/file_download.h
+++ b/Telegram/SourceFiles/storage/file_download.h
@@ -172,7 +172,7 @@ protected:
 
 	bool tryLoadLocal();
 	void loadLocal(const Storage::Cache::Key &key);
-	virtual std::optional<Storage::Cache::Key> cacheKey() const = 0;
+	virtual Storage::Cache::Key cacheKey() const = 0;
 	virtual std::optional<MediaKey> fileLocationKey() const = 0;
 	virtual void cancelRequests() = 0;
 
@@ -273,7 +273,7 @@ private:
 		int limit = 0;
 		QByteArray hash;
 	};
-	std::optional<Storage::Cache::Key> cacheKey() const override;
+	Storage::Cache::Key cacheKey() const override;
 	std::optional<MediaKey> fileLocationKey() const override;
 	void cancelRequests() override;
 
@@ -358,7 +358,7 @@ public:
 
 protected:
 	void cancelRequests() override;
-	std::optional<Storage::Cache::Key> cacheKey() const override;
+	Storage::Cache::Key cacheKey() const override;
 	std::optional<MediaKey> fileLocationKey() const override;
 	bool loadPart() override;
 
diff --git a/Telegram/SourceFiles/storage/streamed_file_downloader.cpp b/Telegram/SourceFiles/storage/streamed_file_downloader.cpp
index f593e1f08..f9783fed9 100644
--- a/Telegram/SourceFiles/storage/streamed_file_downloader.cpp
+++ b/Telegram/SourceFiles/storage/streamed_file_downloader.cpp
@@ -23,7 +23,8 @@ StreamedFileDownloader::StreamedFileDownloader(
 	uint64 objectId,
 	MTP::DcId dcId,
 	Data::FileOrigin origin,
-	std::optional<Cache::Key> cacheKey,
+	Cache::Key cacheKey,
+	MediaKey fileLocationKey,
 	std::shared_ptr<Reader> reader,
 
 	// For FileLoader
@@ -45,6 +46,7 @@ StreamedFileDownloader::StreamedFileDownloader(
 , _objectId(objectId)
 , _origin(origin)
 , _cacheKey(cacheKey)
+, _fileLocationKey(fileLocationKey)
 , _reader(std::move(reader)) {
 	_partIsSaved.resize((size + kPartSize - 1) / kPartSize, false);
 
@@ -76,12 +78,12 @@ void StreamedFileDownloader::stop() {
 	cancelRequests();
 }
 
-std::optional<Storage::Cache::Key> StreamedFileDownloader::cacheKey() const {
+Storage::Cache::Key StreamedFileDownloader::cacheKey() const {
 	return _cacheKey;
 }
 
 std::optional<MediaKey> StreamedFileDownloader::fileLocationKey() const {
-	return std::nullopt; AssertIsDebug();
+	return _fileLocationKey;
 }
 
 void StreamedFileDownloader::cancelRequests() {
@@ -109,7 +111,7 @@ bool StreamedFileDownloader::loadPart() {
 		return false;
 	}
 	_nextPartIndex = index + 1;
-	_reader->loadForDownloader(index);
+	_reader->loadForDownloader(index * kPartSize);
 	AssertIsDebug();
 	//_downloader->requestedAmountIncrement(
 	//	requestData.dcId,
@@ -123,11 +125,13 @@ bool StreamedFileDownloader::loadPart() {
 void StreamedFileDownloader::savePart(const LoadedPart &part) {
 	Expects(part.offset >= 0 && part.offset < _reader->size());
 	Expects(part.offset % kPartSize == 0);
+
 	if (_finished || _cancelled) {
 		return;
 	}
 
-	const auto index = part.offset / kPartSize;
+	const auto offset = part.offset;
+	const auto index = offset / kPartSize;
 	Assert(index >= 0 && index < _partIsSaved.size());
 	if (_partIsSaved[index]) {
 		return;
@@ -142,7 +146,7 @@ void StreamedFileDownloader::savePart(const LoadedPart &part) {
 		//	-kPartSize);
 		--_queue->queriesCount;
 	}
-	if (!writeResultPart(part.offset, bytes::make_span(part.bytes))) {
+	if (!writeResultPart(offset, bytes::make_span(part.bytes))) {
 		return;
 	}
 	if (ranges::find(_partIsSaved, false) == end(_partIsSaved)) {
@@ -150,6 +154,7 @@ void StreamedFileDownloader::savePart(const LoadedPart &part) {
 			return;
 		}
 	}
+	_reader->doneForDownloader(offset);
 	notifyAboutProgress();
 }
 
diff --git a/Telegram/SourceFiles/storage/streamed_file_downloader.h b/Telegram/SourceFiles/storage/streamed_file_downloader.h
index 143154897..dfc9827f0 100644
--- a/Telegram/SourceFiles/storage/streamed_file_downloader.h
+++ b/Telegram/SourceFiles/storage/streamed_file_downloader.h
@@ -25,7 +25,8 @@ public:
 		uint64 objectId,
 		MTP::DcId dcId,
 		Data::FileOrigin origin,
-		std::optional<Cache::Key> cacheKey,
+		Cache::Key cacheKey,
+		MediaKey fileLocationKey,
 		std::shared_ptr<Media::Streaming::Reader> reader,
 
 		// For FileLoader
@@ -43,7 +44,7 @@ public:
 	void stop() override;
 
 private:
-	std::optional<Storage::Cache::Key> cacheKey() const override;
+	Cache::Key cacheKey() const override;
 	std::optional<MediaKey> fileLocationKey() const override;
 	void cancelRequests() override;
 	bool loadPart() override;
@@ -52,7 +53,8 @@ private:
 
 	uint64 _objectId = 0;
 	Data::FileOrigin _origin;
-	std::optional<Cache::Key> _cacheKey;
+	Cache::Key _cacheKey;
+	MediaKey _fileLocationKey;
 	std::shared_ptr<Media::Streaming::Reader> _reader;
 
 	std::vector<bool> _partIsSaved; // vector<bool> :D