diff --git a/Telegram/SourceFiles/base/thread_safe_wrap.h b/Telegram/SourceFiles/base/thread_safe_wrap.h new file mode 100644 index 000000000..c0029f1b1 --- /dev/null +++ b/Telegram/SourceFiles/base/thread_safe_wrap.h @@ -0,0 +1,60 @@ +/* +This file is part of Telegram Desktop, +the official desktop application for the Telegram messaging service. + +For license and copyright information please follow this link: +https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL +*/ +#pragma once + +#include + +namespace base { + +template +class thread_safe_wrap { +public: + template + thread_safe_wrap(Args &&...args) : _value(std::forward(args)...) { + } + + template + auto with(Callback &&callback) { + QMutexLocker lock(&_mutex); + return callback(_value); + } + + template + auto with(Callback &&callback) const { + QMutexLocker lock(&_mutex); + return callback(_value); + } + +private: + T _value; + QMutex _mutex; + +}; + +template +class thread_safe_queue { +public: + template + void emplace(Args &&...args) { + _wrap.with([&](std::vector &value) { + value.emplace_back(std::forward(args)...); + }); + } + + std::vector take() { + return _wrap.with([&](std::vector &value) { + return std::exchange(value, std::vector()); + }); + } + +private: + thread_safe_wrap> _wrap; + +}; + +} // namespace base diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp index 813df6137..f0a96700a 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp @@ -265,7 +265,9 @@ void File::Context::readNextPacket() { const auto more = _delegate->fileProcessPacket(std::move(*packet)); if (!more) { do { + _reader->startSleep(&_semaphore); _semaphore.acquire(); + _reader->stopSleep(); } while (!unroll() && !_delegate->fileReadMore()); } } else { @@ -332,8 +334,9 @@ File::File( } void File::start(not_null delegate, crl::time position) { - stop(); + stop(true); + _reader->startStreaming(); _context.emplace(delegate, _reader.get()); _thread = std::thread([=, context = &*_context] { context->start(position); @@ -349,12 +352,12 @@ void File::wake() { _context->wake(); } -void File::stop() { +void File::stop(bool stillActive) { if (_thread.joinable()) { _context->interrupt(); _thread.join(); } - _reader->stop(); + _reader->stopStreaming(stillActive); _context.reset(); } diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_file.h b/Telegram/SourceFiles/media/streaming/media_streaming_file.h index 31523add7..1c253b17a 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_file.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_file.h @@ -33,7 +33,7 @@ public: void start(not_null delegate, crl::time position); void wake(); - void stop(); + void stop(bool stillActive = false); [[nodiscard]] bool isRemoteLoader() const; diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp index 954060d17..fc40f27e8 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_player.cpp @@ -464,7 +464,7 @@ void Player::play(const PlaybackOptions &options) { const auto previous = getCurrentReceivedTill(computeTotalDuration()); - stop(); + stop(true); _lastFailure = std::nullopt; savePreviousReceivedTill(options, previous); @@ -542,6 +542,10 @@ void Player::resume() { updatePausedState(); } +void Player::stop() { + stop(false); +} + void Player::updatePausedState() { const auto paused = _pausedByUser || _pausedByWaitingForData; if (_paused == paused) { @@ -680,8 +684,8 @@ void Player::checkVideoStep() { } } -void Player::stop() { - _file->stop(); +void Player::stop(bool stillActive) { + _file->stop(stillActive); _sessionLifetime = rpl::lifetime(); _stage = Stage::Uninitialized; _audio = nullptr; diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_player.h b/Telegram/SourceFiles/media/streaming/media_streaming_player.h index 5cbc06c93..4c5b74ec3 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_player.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_player.h @@ -89,6 +89,7 @@ private: void streamReady(Information &&information); void streamFailed(Error error); void start(); + void stop(bool stillActive); void provideStartInformation(); void fail(Error error); void checkVideoStep(); diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp index 75c571281..9d4e14a2a 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp @@ -713,8 +713,8 @@ Reader::Reader( _loadedParts.emplace(std::move(part)); - if (const auto waiting = _waiting.load()) { - _waiting = nullptr; + if (const auto waiting = _waiting.load(std::memory_order_acquire)) { + _waiting.store(nullptr, std::memory_order_release); waiting->release(); } }, _lifetime); @@ -724,8 +724,34 @@ Reader::Reader( } } -void Reader::stop() { - _waiting = nullptr; +void Reader::startSleep(not_null wake) { + _sleeping.store(wake, std::memory_order_release); + processDownloaderRequests(); +} + +void Reader::wakeFromSleep() { + if (const auto sleeping = _sleeping.load(std::memory_order_acquire)) { + _sleeping.store(nullptr, std::memory_order_release); + sleeping->release(); + } +} + +void Reader::stopSleep() { + _sleeping.store(nullptr, std::memory_order_release); +} + +void Reader::startStreaming() { + _streamingActive = true; +} + +void Reader::stopStreaming(bool stillActive) { + Expects(_sleeping == nullptr); + + _waiting.store(nullptr, std::memory_order_release); + if (!stillActive) { + _streamingActive = false; + processDownloaderRequests(); + } } rpl::producer Reader::partsForDownloader() const { @@ -735,7 +761,11 @@ rpl::producer Reader::partsForDownloader() const { void Reader::loadForDownloader(int offset) { _downloaderAttached.store(true, std::memory_order_release); _downloaderOffsetRequests.emplace(offset); - AssertIsDebug(); // wake? + if (_streamingActive) { + wakeFromSleep(); + } else { + processDownloaderRequests(); + } } void Reader::cancelForDownloader() { @@ -745,6 +775,10 @@ void Reader::cancelForDownloader() { } } +void Reader::processDownloaderRequests() { + +} + bool Reader::isRemoteLoader() const { return _loader->baseCacheKey().has_value(); } @@ -772,7 +806,7 @@ void Reader::readFromCache(int sliceNumber) { QMutexLocker lock(&strong->mutex); strong->results.emplace(sliceNumber, std::move(result)); if (const auto waiting = strong->waiting.load()) { - strong->waiting = nullptr; + strong->waiting.store(nullptr, std::memory_order_release); waiting->release(); } } @@ -810,12 +844,12 @@ bool Reader::fill( if (_cacheHelper) { _cacheHelper->waiting = notify.get(); } - _waiting = notify.get(); + _waiting.store(notify.get(), std::memory_order_release); }; const auto clearWaiting = [&] { - _waiting = nullptr; + _waiting.store(nullptr, std::memory_order_release); if (_cacheHelper) { - _cacheHelper->waiting = nullptr; + _cacheHelper->waiting.store(nullptr, std::memory_order_release); } }; const auto done = [&] { @@ -929,6 +963,7 @@ bool Reader::processLoadedParts() { return false; } + auto loaded = _loadedParts.take(); for (auto &part : loaded) { if (part.offset == LoadedPart::kFailedOffset @@ -958,7 +993,7 @@ void Reader::finalizeCache() { } if (_cacheHelper->waiting != nullptr) { QMutexLocker lock(&_cacheHelper->mutex); - _cacheHelper->waiting = nullptr; + _cacheHelper->waiting.store(nullptr, std::memory_order_release); } auto toCache = _slices.unloadToCache(); while (toCache.number >= 0) { diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h index 58829be74..c452f29cc 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h @@ -10,7 +10,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include "media/streaming/media_streaming_loader.h" #include "base/bytes.h" #include "base/weak_ptr.h" -#include "base/thread_safe_queue.h" +#include "base/thread_safe_wrap.h" namespace Storage { namespace Cache { @@ -45,9 +45,15 @@ public: not_null notify); [[nodiscard]] std::optional failed() const; void headerDone(); - void stop(); + + // Thread safe. + void startSleep(not_null wake); + void wakeFromSleep(); + void stopSleep(); // Main thread. + void startStreaming(); + void stopStreaming(bool stillActive = false); [[nodiscard]] rpl::producer partsForDownloader() const; void loadForDownloader(int offset); void cancelForDownloader(); @@ -184,6 +190,8 @@ private: void finalizeCache(); + void processDownloaderRequests(); + static std::shared_ptr InitCacheHelper( std::optional baseKey); @@ -193,6 +201,7 @@ private: base::thread_safe_queue _loadedParts; std::atomic _waiting = nullptr; + std::atomic _sleeping = nullptr; PriorityQueue _loadingOffsets; Slices _slices; @@ -203,6 +212,7 @@ private: // Main thread. rpl::event_stream _partsForDownloader; + bool _streamingActive = false; rpl::lifetime _lifetime; }; diff --git a/Telegram/gyp/lib_base.gyp b/Telegram/gyp/lib_base.gyp index 12dee1cf9..3ffbf7f90 100644 --- a/Telegram/gyp/lib_base.gyp +++ b/Telegram/gyp/lib_base.gyp @@ -72,6 +72,7 @@ '<(src_loc)/base/qthelp_url.h', '<(src_loc)/base/runtime_composer.cpp', '<(src_loc)/base/runtime_composer.h', + '<(src_loc)/base/thread_safe_wrap.h', '<(src_loc)/base/timer.cpp', '<(src_loc)/base/timer.h', '<(src_loc)/base/type_traits.h',