Restore download priorities without streaming.

This commit is contained in:
John Preston 2019-12-23 14:13:32 +03:00
parent 85545dba64
commit 8a3506af89
9 changed files with 48 additions and 8 deletions

View File

@ -328,6 +328,11 @@ void File::Context::handleEndOfFile() {
if (error) { if (error) {
logFatal(qstr("av_seek_frame")); logFatal(qstr("av_seek_frame"));
} }
// If we loaded a file till the end then we think it is fully cached,
// assume we finished loading and don't want to keep all other
// download tasks throttled because of an active streaming.
_reader->tryRemoveLoaderAsync();
} else { } else {
_readTillEnd = true; _readTillEnd = true;
} }
@ -374,12 +379,10 @@ bool File::Context::finished() const {
return unroll() || _readTillEnd; return unroll() || _readTillEnd;
} }
void File::Context::waitTillInterrupted() { void File::Context::stopStreamingAsync() {
while (!interrupted()) { // If we finished loading we don't want to keep all other
_reader->startSleep(&_semaphore); // download tasks throttled because of an active streaming.
_semaphore.acquire(); _reader->stopStreamingAsync();
}
_reader->stopSleep();
} }
File::File( File::File(
@ -398,7 +401,9 @@ void File::start(not_null<FileDelegate*> delegate, crl::time position) {
while (!context->finished()) { while (!context->finished()) {
context->readNextPacket(); context->readNextPacket();
} }
context->waitTillInterrupted(); if (!context->interrupted()) {
context->stopStreamingAsync();
}
}); });
} }

View File

@ -56,7 +56,7 @@ private:
[[nodiscard]] bool failed() const; [[nodiscard]] bool failed() const;
[[nodiscard]] bool finished() const; [[nodiscard]] bool finished() const;
void waitTillInterrupted(); void stopStreamingAsync();
private: private:
enum class SleepPolicy { enum class SleepPolicy {

View File

@ -37,6 +37,9 @@ public:
virtual void setPriority(int priority) = 0; virtual void setPriority(int priority) = 0;
virtual void stop() = 0; virtual void stop() = 0;
// Remove from queue if no requests are in progress.
virtual void tryRemoveFromQueue() = 0;
// Parts will be sent from the main thread. // Parts will be sent from the main thread.
[[nodiscard]] virtual rpl::producer<LoadedPart> parts() const = 0; [[nodiscard]] virtual rpl::producer<LoadedPart> parts() const = 0;

View File

@ -77,6 +77,9 @@ void LoaderLocal::setPriority(int priority) {
void LoaderLocal::stop() { void LoaderLocal::stop() {
} }
void LoaderLocal::tryRemoveFromQueue() {
}
rpl::producer<LoadedPart> LoaderLocal::parts() const { rpl::producer<LoadedPart> LoaderLocal::parts() const {
return _parts.events(); return _parts.events();
} }

View File

@ -30,6 +30,8 @@ public:
void setPriority(int priority) override; void setPriority(int priority) override;
void stop() override; void stop() override;
void tryRemoveFromQueue() override;
// Parts will be sent from the main thread. // Parts will be sent from the main thread.
[[nodiscard]] rpl::producer<LoadedPart> parts() const override; [[nodiscard]] rpl::producer<LoadedPart> parts() const override;

View File

@ -68,6 +68,14 @@ void LoaderMtproto::stop() {
}); });
} }
void LoaderMtproto::tryRemoveFromQueue() {
crl::on_main(this, [=] {
if (_requested.empty() && !haveSentRequests()) {
removeFromQueue();
}
});
}
void LoaderMtproto::cancel(int offset) { void LoaderMtproto::cancel(int offset) {
crl::on_main(this, [=] { crl::on_main(this, [=] {
cancelForOffset(offset); cancelForOffset(offset);

View File

@ -33,6 +33,8 @@ public:
void setPriority(int priority) override; void setPriority(int priority) override;
void stop() override; void stop() override;
void tryRemoveFromQueue() override;
// Parts will be sent from the main thread. // Parts will be sent from the main thread.
[[nodiscard]] rpl::producer<LoadedPart> parts() const override; [[nodiscard]] rpl::producer<LoadedPart> parts() const override;

View File

@ -886,6 +886,19 @@ void Reader::stopSleep() {
_sleeping.store(nullptr, std::memory_order_release); _sleeping.store(nullptr, std::memory_order_release);
} }
void Reader::stopStreamingAsync() {
_stopStreamingAsync = true;
crl::on_main(this, [=] {
if (_stopStreamingAsync) {
stopStreaming(false);
}
});
}
void Reader::tryRemoveLoaderAsync() {
_loader->tryRemoveFromQueue();
}
void Reader::startStreaming() { void Reader::startStreaming() {
_streamingActive = true; _streamingActive = true;
refreshLoaderPriority(); refreshLoaderPriority();
@ -894,6 +907,7 @@ void Reader::startStreaming() {
void Reader::stopStreaming(bool stillActive) { void Reader::stopStreaming(bool stillActive) {
Expects(_sleeping == nullptr); Expects(_sleeping == nullptr);
_stopStreamingAsync = false;
_waiting.store(nullptr, std::memory_order_release); _waiting.store(nullptr, std::memory_order_release);
if (!stillActive) { if (!stillActive) {
_streamingActive = false; _streamingActive = false;

View File

@ -57,6 +57,8 @@ public:
void startSleep(not_null<crl::semaphore*> wake); void startSleep(not_null<crl::semaphore*> wake);
void wakeFromSleep(); void wakeFromSleep();
void stopSleep(); void stopSleep();
void stopStreamingAsync();
void tryRemoveLoaderAsync();
// Main thread. // Main thread.
void startStreaming(); void startStreaming();
@ -233,6 +235,7 @@ private:
base::thread_safe_queue<LoadedPart, std::vector> _loadedParts; base::thread_safe_queue<LoadedPart, std::vector> _loadedParts;
std::atomic<crl::semaphore*> _waiting = nullptr; std::atomic<crl::semaphore*> _waiting = nullptr;
std::atomic<crl::semaphore*> _sleeping = nullptr; std::atomic<crl::semaphore*> _sleeping = nullptr;
std::atomic<bool> _stopStreamingAsync = false;
PriorityQueue _loadingOffsets; PriorityQueue _loadingOffsets;
Slices _slices; Slices _slices;