From 8a3506af892cf635507617d7c73d7d265d640954 Mon Sep 17 00:00:00 2001 From: John Preston Date: Mon, 23 Dec 2019 14:13:32 +0300 Subject: [PATCH] Restore download priorities without streaming. --- .../media/streaming/media_streaming_file.cpp | 19 ++++++++++++------- .../media/streaming/media_streaming_file.h | 2 +- .../media/streaming/media_streaming_loader.h | 3 +++ .../media_streaming_loader_local.cpp | 3 +++ .../streaming/media_streaming_loader_local.h | 2 ++ .../media_streaming_loader_mtproto.cpp | 8 ++++++++ .../media_streaming_loader_mtproto.h | 2 ++ .../streaming/media_streaming_reader.cpp | 14 ++++++++++++++ .../media/streaming/media_streaming_reader.h | 3 +++ 9 files changed, 48 insertions(+), 8 deletions(-) diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp index 76eca72e5..e230ed7cb 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp @@ -328,6 +328,11 @@ void File::Context::handleEndOfFile() { if (error) { logFatal(qstr("av_seek_frame")); } + + // If we loaded a file till the end then we think it is fully cached, + // assume we finished loading and don't want to keep all other + // download tasks throttled because of an active streaming. + _reader->tryRemoveLoaderAsync(); } else { _readTillEnd = true; } @@ -374,12 +379,10 @@ bool File::Context::finished() const { return unroll() || _readTillEnd; } -void File::Context::waitTillInterrupted() { - while (!interrupted()) { - _reader->startSleep(&_semaphore); - _semaphore.acquire(); - } - _reader->stopSleep(); +void File::Context::stopStreamingAsync() { + // If we finished loading we don't want to keep all other + // download tasks throttled because of an active streaming. + _reader->stopStreamingAsync(); } File::File( @@ -398,7 +401,9 @@ void File::start(not_null delegate, crl::time position) { while (!context->finished()) { context->readNextPacket(); } - context->waitTillInterrupted(); + if (!context->interrupted()) { + context->stopStreamingAsync(); + } }); } diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_file.h b/Telegram/SourceFiles/media/streaming/media_streaming_file.h index e6992021a..6c059b09f 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_file.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_file.h @@ -56,7 +56,7 @@ private: [[nodiscard]] bool failed() const; [[nodiscard]] bool finished() const; - void waitTillInterrupted(); + void stopStreamingAsync(); private: enum class SleepPolicy { diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader.h b/Telegram/SourceFiles/media/streaming/media_streaming_loader.h index 6b315b692..5ccdae833 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader.h @@ -37,6 +37,9 @@ public: virtual void setPriority(int priority) = 0; virtual void stop() = 0; + // Remove from queue if no requests are in progress. + virtual void tryRemoveFromQueue() = 0; + // Parts will be sent from the main thread. [[nodiscard]] virtual rpl::producer parts() const = 0; diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader_local.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_loader_local.cpp index a8b462a9d..dd8d937b3 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader_local.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader_local.cpp @@ -77,6 +77,9 @@ void LoaderLocal::setPriority(int priority) { void LoaderLocal::stop() { } +void LoaderLocal::tryRemoveFromQueue() { +} + rpl::producer LoaderLocal::parts() const { return _parts.events(); } diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader_local.h b/Telegram/SourceFiles/media/streaming/media_streaming_loader_local.h index 5b8b80e9b..fccd53495 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader_local.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader_local.h @@ -30,6 +30,8 @@ public: void setPriority(int priority) override; void stop() override; + void tryRemoveFromQueue() override; + // Parts will be sent from the main thread. [[nodiscard]] rpl::producer parts() const override; diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp index 137ddbf6f..555665643 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp @@ -68,6 +68,14 @@ void LoaderMtproto::stop() { }); } +void LoaderMtproto::tryRemoveFromQueue() { + crl::on_main(this, [=] { + if (_requested.empty() && !haveSentRequests()) { + removeFromQueue(); + } + }); +} + void LoaderMtproto::cancel(int offset) { crl::on_main(this, [=] { cancelForOffset(offset); diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h index 15b5b628c..4d4c33fa3 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h @@ -33,6 +33,8 @@ public: void setPriority(int priority) override; void stop() override; + void tryRemoveFromQueue() override; + // Parts will be sent from the main thread. [[nodiscard]] rpl::producer parts() const override; diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp index c6e24e62c..365d1270b 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp @@ -886,6 +886,19 @@ void Reader::stopSleep() { _sleeping.store(nullptr, std::memory_order_release); } +void Reader::stopStreamingAsync() { + _stopStreamingAsync = true; + crl::on_main(this, [=] { + if (_stopStreamingAsync) { + stopStreaming(false); + } + }); +} + +void Reader::tryRemoveLoaderAsync() { + _loader->tryRemoveFromQueue(); +} + void Reader::startStreaming() { _streamingActive = true; refreshLoaderPriority(); @@ -894,6 +907,7 @@ void Reader::startStreaming() { void Reader::stopStreaming(bool stillActive) { Expects(_sleeping == nullptr); + _stopStreamingAsync = false; _waiting.store(nullptr, std::memory_order_release); if (!stillActive) { _streamingActive = false; diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h index 126fade25..7d1b97652 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h @@ -57,6 +57,8 @@ public: void startSleep(not_null wake); void wakeFromSleep(); void stopSleep(); + void stopStreamingAsync(); + void tryRemoveLoaderAsync(); // Main thread. void startStreaming(); @@ -233,6 +235,7 @@ private: base::thread_safe_queue _loadedParts; std::atomic _waiting = nullptr; std::atomic _sleeping = nullptr; + std::atomic _stopStreamingAsync = false; PriorityQueue _loadingOffsets; Slices _slices;