From fe15ee742d4a9418b79ba586106f5dafb52be221 Mon Sep 17 00:00:00 2001
From: John Preston <johnprestonmail@gmail.com>
Date: Thu, 11 Apr 2019 14:52:49 +0400
Subject: [PATCH] Track in Reader if it is used in streaming.

---
 Telegram/SourceFiles/base/thread_safe_wrap.h  | 60 +++++++++++++++++++
 .../media/streaming/media_streaming_file.cpp  |  9 ++-
 .../media/streaming/media_streaming_file.h    |  2 +-
 .../streaming/media_streaming_player.cpp      | 10 +++-
 .../media/streaming/media_streaming_player.h  |  1 +
 .../streaming/media_streaming_reader.cpp      | 55 +++++++++++++----
 .../media/streaming/media_streaming_reader.h  | 14 ++++-
 Telegram/gyp/lib_base.gyp                     |  1 +
 8 files changed, 133 insertions(+), 19 deletions(-)
 create mode 100644 Telegram/SourceFiles/base/thread_safe_wrap.h

diff --git a/Telegram/SourceFiles/base/thread_safe_wrap.h b/Telegram/SourceFiles/base/thread_safe_wrap.h
new file mode 100644
index 000000000..c0029f1b1
--- /dev/null
+++ b/Telegram/SourceFiles/base/thread_safe_wrap.h
@@ -0,0 +1,60 @@
+/*
+This file is part of Telegram Desktop,
+the official desktop application for the Telegram messaging service.
+
+For license and copyright information please follow this link:
+https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
+*/
+#pragma once
+
+#include <utility>
+
+namespace base {
+
+template <typename T>
+class thread_safe_wrap {
+public:
+	template <typename ...Args>
+	thread_safe_wrap(Args &&...args) : _value(std::forward<Args>(args)...) {
+	}
+
+	template <typename Callback>
+	auto with(Callback &&callback) {
+		QMutexLocker lock(&_mutex);
+		return callback(_value);
+	}
+
+	template <typename Callback>
+	auto with(Callback &&callback) const {
+		QMutexLocker lock(&_mutex);
+		return callback(_value);
+	}
+
+private:
+	T _value;
+	QMutex _mutex;
+
+};
+
+template <typename T>
+class thread_safe_queue {
+public:
+	template <typename ...Args>
+	void emplace(Args &&...args) {
+		_wrap.with([&](std::vector<T> &value) {
+			value.emplace_back(std::forward<Args>(args)...);
+		});
+	}
+
+	std::vector<T> take() {
+		return _wrap.with([&](std::vector<T> &value) {
+			return std::exchange(value, std::vector<T>());
+		});
+	}
+
+private:
+	thread_safe_wrap<std::vector<T>> _wrap;
+
+};
+
+} // namespace base
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp
index 813df6137..f0a96700a 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp
@@ -265,7 +265,9 @@ void File::Context::readNextPacket() {
 		const auto more = _delegate->fileProcessPacket(std::move(*packet));
 		if (!more) {
 			do {
+				_reader->startSleep(&_semaphore);
 				_semaphore.acquire();
+				_reader->stopSleep();
 			} while (!unroll() && !_delegate->fileReadMore());
 		}
 	} else {
@@ -332,8 +334,9 @@ File::File(
 }
 
 void File::start(not_null<FileDelegate*> delegate, crl::time position) {
-	stop();
+	stop(true);
 
+	_reader->startStreaming();
 	_context.emplace(delegate, _reader.get());
 	_thread = std::thread([=, context = &*_context] {
 		context->start(position);
@@ -349,12 +352,12 @@ void File::wake() {
 	_context->wake();
 }
 
-void File::stop() {
+void File::stop(bool stillActive) {
 	if (_thread.joinable()) {
 		_context->interrupt();
 		_thread.join();
 	}
-	_reader->stop();
+	_reader->stopStreaming(stillActive);
 	_context.reset();
 }
 
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_file.h b/Telegram/SourceFiles/media/streaming/media_streaming_file.h
index 31523add7..1c253b17a 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_file.h
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_file.h
@@ -33,7 +33,7 @@ public:
 
 	void start(not_null<FileDelegate*> delegate, crl::time position);
 	void wake();
-	void stop();
+	void stop(bool stillActive = false);
 
 	[[nodiscard]] bool isRemoteLoader() const;
 
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp
index 954060d17..fc40f27e8 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp
@@ -464,7 +464,7 @@ void Player::play(const PlaybackOptions &options) {
 
 	const auto previous = getCurrentReceivedTill(computeTotalDuration());
 
-	stop();
+	stop(true);
 	_lastFailure = std::nullopt;
 
 	savePreviousReceivedTill(options, previous);
@@ -542,6 +542,10 @@ void Player::resume() {
 	updatePausedState();
 }
 
+void Player::stop() {
+	stop(false);
+}
+
 void Player::updatePausedState() {
 	const auto paused = _pausedByUser || _pausedByWaitingForData;
 	if (_paused == paused) {
@@ -680,8 +684,8 @@ void Player::checkVideoStep() {
 	}
 }
 
-void Player::stop() {
-	_file->stop();
+void Player::stop(bool stillActive) {
+	_file->stop(stillActive);
 	_sessionLifetime = rpl::lifetime();
 	_stage = Stage::Uninitialized;
 	_audio = nullptr;
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_player.h b/Telegram/SourceFiles/media/streaming/media_streaming_player.h
index 5cbc06c93..4c5b74ec3 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_player.h
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_player.h
@@ -89,6 +89,7 @@ private:
 	void streamReady(Information &&information);
 	void streamFailed(Error error);
 	void start();
+	void stop(bool stillActive);
 	void provideStartInformation();
 	void fail(Error error);
 	void checkVideoStep();
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp
index 75c571281..9d4e14a2a 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp
@@ -713,8 +713,8 @@ Reader::Reader(
 
 		_loadedParts.emplace(std::move(part));
 
-		if (const auto waiting = _waiting.load()) {
-			_waiting = nullptr;
+		if (const auto waiting = _waiting.load(std::memory_order_acquire)) {
+			_waiting.store(nullptr, std::memory_order_release);
 			waiting->release();
 		}
 	}, _lifetime);
@@ -724,8 +724,34 @@ Reader::Reader(
 	}
 }
 
-void Reader::stop() {
-	_waiting = nullptr;
+void Reader::startSleep(not_null<crl::semaphore*> wake) {
+	_sleeping.store(wake, std::memory_order_release);
+	processDownloaderRequests();
+}
+
+void Reader::wakeFromSleep() {
+	if (const auto sleeping = _sleeping.load(std::memory_order_acquire)) {
+		_sleeping.store(nullptr, std::memory_order_release);
+		sleeping->release();
+	}
+}
+
+void Reader::stopSleep() {
+	_sleeping.store(nullptr, std::memory_order_release);
+}
+
+void Reader::startStreaming() {
+	_streamingActive = true;
+}
+
+void Reader::stopStreaming(bool stillActive) {
+	Expects(_sleeping == nullptr);
+
+	_waiting.store(nullptr, std::memory_order_release);
+	if (!stillActive) {
+		_streamingActive = false;
+		processDownloaderRequests();
+	}
 }
 
 rpl::producer<LoadedPart> Reader::partsForDownloader() const {
@@ -735,7 +761,11 @@ rpl::producer<LoadedPart> Reader::partsForDownloader() const {
 void Reader::loadForDownloader(int offset) {
 	_downloaderAttached.store(true, std::memory_order_release);
 	_downloaderOffsetRequests.emplace(offset);
-	AssertIsDebug(); // wake?
+	if (_streamingActive) {
+		wakeFromSleep();
+	} else {
+		processDownloaderRequests();
+	}
 }
 
 void Reader::cancelForDownloader() {
@@ -745,6 +775,10 @@ void Reader::cancelForDownloader() {
 	}
 }
 
+void Reader::processDownloaderRequests() {
+
+}
+
 bool Reader::isRemoteLoader() const {
 	return _loader->baseCacheKey().has_value();
 }
@@ -772,7 +806,7 @@ void Reader::readFromCache(int sliceNumber) {
 			QMutexLocker lock(&strong->mutex);
 			strong->results.emplace(sliceNumber, std::move(result));
 			if (const auto waiting = strong->waiting.load()) {
-				strong->waiting = nullptr;
+				strong->waiting.store(nullptr, std::memory_order_release);
 				waiting->release();
 			}
 		}
@@ -810,12 +844,12 @@ bool Reader::fill(
 		if (_cacheHelper) {
 			_cacheHelper->waiting = notify.get();
 		}
-		_waiting = notify.get();
+		_waiting.store(notify.get(), std::memory_order_release);
 	};
 	const auto clearWaiting = [&] {
-		_waiting = nullptr;
+		_waiting.store(nullptr, std::memory_order_release);
 		if (_cacheHelper) {
-			_cacheHelper->waiting = nullptr;
+			_cacheHelper->waiting.store(nullptr, std::memory_order_release);
 		}
 	};
 	const auto done = [&] {
@@ -929,6 +963,7 @@ bool Reader::processLoadedParts() {
 		return false;
 	}
 
+
 	auto loaded = _loadedParts.take();
 	for (auto &part : loaded) {
 		if (part.offset == LoadedPart::kFailedOffset
@@ -958,7 +993,7 @@ void Reader::finalizeCache() {
 	}
 	if (_cacheHelper->waiting != nullptr) {
 		QMutexLocker lock(&_cacheHelper->mutex);
-		_cacheHelper->waiting = nullptr;
+		_cacheHelper->waiting.store(nullptr, std::memory_order_release);
 	}
 	auto toCache = _slices.unloadToCache();
 	while (toCache.number >= 0) {
diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h
index 58829be74..c452f29cc 100644
--- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h
+++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h
@@ -10,7 +10,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
 #include "media/streaming/media_streaming_loader.h"
 #include "base/bytes.h"
 #include "base/weak_ptr.h"
-#include "base/thread_safe_queue.h"
+#include "base/thread_safe_wrap.h"
 
 namespace Storage {
 namespace Cache {
@@ -45,9 +45,15 @@ public:
 		not_null<crl::semaphore*> notify);
 	[[nodiscard]] std::optional<Error> failed() const;
 	void headerDone();
-	void stop();
+
+	// Thread safe.
+	void startSleep(not_null<crl::semaphore*> wake);
+	void wakeFromSleep();
+	void stopSleep();
 
 	// Main thread.
+	void startStreaming();
+	void stopStreaming(bool stillActive = false);
 	[[nodiscard]] rpl::producer<LoadedPart> partsForDownloader() const;
 	void loadForDownloader(int offset);
 	void cancelForDownloader();
@@ -184,6 +190,8 @@ private:
 
 	void finalizeCache();
 
+	void processDownloaderRequests();
+
 	static std::shared_ptr<CacheHelper> InitCacheHelper(
 		std::optional<Storage::Cache::Key> baseKey);
 
@@ -193,6 +201,7 @@ private:
 
 	base::thread_safe_queue<LoadedPart> _loadedParts;
 	std::atomic<crl::semaphore*> _waiting = nullptr;
+	std::atomic<crl::semaphore*> _sleeping = nullptr;
 	PriorityQueue _loadingOffsets;
 
 	Slices _slices;
@@ -203,6 +212,7 @@ private:
 
 	// Main thread.
 	rpl::event_stream<LoadedPart> _partsForDownloader;
+	bool _streamingActive = false;
 	rpl::lifetime _lifetime;
 
 };
diff --git a/Telegram/gyp/lib_base.gyp b/Telegram/gyp/lib_base.gyp
index 12dee1cf9..3ffbf7f90 100644
--- a/Telegram/gyp/lib_base.gyp
+++ b/Telegram/gyp/lib_base.gyp
@@ -72,6 +72,7 @@
       '<(src_loc)/base/qthelp_url.h',
       '<(src_loc)/base/runtime_composer.cpp',
       '<(src_loc)/base/runtime_composer.h',
+      '<(src_loc)/base/thread_safe_wrap.h',
       '<(src_loc)/base/timer.cpp',
       '<(src_loc)/base/timer.h',
       '<(src_loc)/base/type_traits.h',