Track in Reader if it is used in streaming.

This commit is contained in:
John Preston 2019-04-11 14:52:49 +04:00
parent ebf2a678b1
commit fe15ee742d
8 changed files with 133 additions and 19 deletions

View File

@ -0,0 +1,60 @@
/*
This file is part of Telegram Desktop,
the official desktop application for the Telegram messaging service.
For license and copyright information please follow this link:
https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
*/
#pragma once
#include <utility>
namespace base {
template <typename T>
class thread_safe_wrap {
public:
template <typename ...Args>
thread_safe_wrap(Args &&...args) : _value(std::forward<Args>(args)...) {
}
template <typename Callback>
auto with(Callback &&callback) {
QMutexLocker lock(&_mutex);
return callback(_value);
}
template <typename Callback>
auto with(Callback &&callback) const {
QMutexLocker lock(&_mutex);
return callback(_value);
}
private:
T _value;
QMutex _mutex;
};
template <typename T>
class thread_safe_queue {
public:
template <typename ...Args>
void emplace(Args &&...args) {
_wrap.with([&](std::vector<T> &value) {
value.emplace_back(std::forward<Args>(args)...);
});
}
std::vector<T> take() {
return _wrap.with([&](std::vector<T> &value) {
return std::exchange(value, std::vector<T>());
});
}
private:
thread_safe_wrap<std::vector<T>> _wrap;
};
} // namespace base

View File

@ -265,7 +265,9 @@ void File::Context::readNextPacket() {
const auto more = _delegate->fileProcessPacket(std::move(*packet));
if (!more) {
do {
_reader->startSleep(&_semaphore);
_semaphore.acquire();
_reader->stopSleep();
} while (!unroll() && !_delegate->fileReadMore());
}
} else {
@ -332,8 +334,9 @@ File::File(
}
void File::start(not_null<FileDelegate*> delegate, crl::time position) {
stop();
stop(true);
_reader->startStreaming();
_context.emplace(delegate, _reader.get());
_thread = std::thread([=, context = &*_context] {
context->start(position);
@ -349,12 +352,12 @@ void File::wake() {
_context->wake();
}
void File::stop() {
void File::stop(bool stillActive) {
if (_thread.joinable()) {
_context->interrupt();
_thread.join();
}
_reader->stop();
_reader->stopStreaming(stillActive);
_context.reset();
}

View File

@ -33,7 +33,7 @@ public:
void start(not_null<FileDelegate*> delegate, crl::time position);
void wake();
void stop();
void stop(bool stillActive = false);
[[nodiscard]] bool isRemoteLoader() const;

View File

@ -464,7 +464,7 @@ void Player::play(const PlaybackOptions &options) {
const auto previous = getCurrentReceivedTill(computeTotalDuration());
stop();
stop(true);
_lastFailure = std::nullopt;
savePreviousReceivedTill(options, previous);
@ -542,6 +542,10 @@ void Player::resume() {
updatePausedState();
}
void Player::stop() {
stop(false);
}
void Player::updatePausedState() {
const auto paused = _pausedByUser || _pausedByWaitingForData;
if (_paused == paused) {
@ -680,8 +684,8 @@ void Player::checkVideoStep() {
}
}
void Player::stop() {
_file->stop();
void Player::stop(bool stillActive) {
_file->stop(stillActive);
_sessionLifetime = rpl::lifetime();
_stage = Stage::Uninitialized;
_audio = nullptr;

View File

@ -89,6 +89,7 @@ private:
void streamReady(Information &&information);
void streamFailed(Error error);
void start();
void stop(bool stillActive);
void provideStartInformation();
void fail(Error error);
void checkVideoStep();

View File

@ -713,8 +713,8 @@ Reader::Reader(
_loadedParts.emplace(std::move(part));
if (const auto waiting = _waiting.load()) {
_waiting = nullptr;
if (const auto waiting = _waiting.load(std::memory_order_acquire)) {
_waiting.store(nullptr, std::memory_order_release);
waiting->release();
}
}, _lifetime);
@ -724,8 +724,34 @@ Reader::Reader(
}
}
void Reader::stop() {
_waiting = nullptr;
void Reader::startSleep(not_null<crl::semaphore*> wake) {
_sleeping.store(wake, std::memory_order_release);
processDownloaderRequests();
}
void Reader::wakeFromSleep() {
if (const auto sleeping = _sleeping.load(std::memory_order_acquire)) {
_sleeping.store(nullptr, std::memory_order_release);
sleeping->release();
}
}
void Reader::stopSleep() {
_sleeping.store(nullptr, std::memory_order_release);
}
void Reader::startStreaming() {
_streamingActive = true;
}
void Reader::stopStreaming(bool stillActive) {
Expects(_sleeping == nullptr);
_waiting.store(nullptr, std::memory_order_release);
if (!stillActive) {
_streamingActive = false;
processDownloaderRequests();
}
}
rpl::producer<LoadedPart> Reader::partsForDownloader() const {
@ -735,7 +761,11 @@ rpl::producer<LoadedPart> Reader::partsForDownloader() const {
void Reader::loadForDownloader(int offset) {
_downloaderAttached.store(true, std::memory_order_release);
_downloaderOffsetRequests.emplace(offset);
AssertIsDebug(); // wake?
if (_streamingActive) {
wakeFromSleep();
} else {
processDownloaderRequests();
}
}
void Reader::cancelForDownloader() {
@ -745,6 +775,10 @@ void Reader::cancelForDownloader() {
}
}
void Reader::processDownloaderRequests() {
}
bool Reader::isRemoteLoader() const {
return _loader->baseCacheKey().has_value();
}
@ -772,7 +806,7 @@ void Reader::readFromCache(int sliceNumber) {
QMutexLocker lock(&strong->mutex);
strong->results.emplace(sliceNumber, std::move(result));
if (const auto waiting = strong->waiting.load()) {
strong->waiting = nullptr;
strong->waiting.store(nullptr, std::memory_order_release);
waiting->release();
}
}
@ -810,12 +844,12 @@ bool Reader::fill(
if (_cacheHelper) {
_cacheHelper->waiting = notify.get();
}
_waiting = notify.get();
_waiting.store(notify.get(), std::memory_order_release);
};
const auto clearWaiting = [&] {
_waiting = nullptr;
_waiting.store(nullptr, std::memory_order_release);
if (_cacheHelper) {
_cacheHelper->waiting = nullptr;
_cacheHelper->waiting.store(nullptr, std::memory_order_release);
}
};
const auto done = [&] {
@ -929,6 +963,7 @@ bool Reader::processLoadedParts() {
return false;
}
auto loaded = _loadedParts.take();
for (auto &part : loaded) {
if (part.offset == LoadedPart::kFailedOffset
@ -958,7 +993,7 @@ void Reader::finalizeCache() {
}
if (_cacheHelper->waiting != nullptr) {
QMutexLocker lock(&_cacheHelper->mutex);
_cacheHelper->waiting = nullptr;
_cacheHelper->waiting.store(nullptr, std::memory_order_release);
}
auto toCache = _slices.unloadToCache();
while (toCache.number >= 0) {

View File

@ -10,7 +10,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include "media/streaming/media_streaming_loader.h"
#include "base/bytes.h"
#include "base/weak_ptr.h"
#include "base/thread_safe_queue.h"
#include "base/thread_safe_wrap.h"
namespace Storage {
namespace Cache {
@ -45,9 +45,15 @@ public:
not_null<crl::semaphore*> notify);
[[nodiscard]] std::optional<Error> failed() const;
void headerDone();
void stop();
// Thread safe.
void startSleep(not_null<crl::semaphore*> wake);
void wakeFromSleep();
void stopSleep();
// Main thread.
void startStreaming();
void stopStreaming(bool stillActive = false);
[[nodiscard]] rpl::producer<LoadedPart> partsForDownloader() const;
void loadForDownloader(int offset);
void cancelForDownloader();
@ -184,6 +190,8 @@ private:
void finalizeCache();
void processDownloaderRequests();
static std::shared_ptr<CacheHelper> InitCacheHelper(
std::optional<Storage::Cache::Key> baseKey);
@ -193,6 +201,7 @@ private:
base::thread_safe_queue<LoadedPart> _loadedParts;
std::atomic<crl::semaphore*> _waiting = nullptr;
std::atomic<crl::semaphore*> _sleeping = nullptr;
PriorityQueue _loadingOffsets;
Slices _slices;
@ -203,6 +212,7 @@ private:
// Main thread.
rpl::event_stream<LoadedPart> _partsForDownloader;
bool _streamingActive = false;
rpl::lifetime _lifetime;
};

View File

@ -72,6 +72,7 @@
'<(src_loc)/base/qthelp_url.h',
'<(src_loc)/base/runtime_composer.cpp',
'<(src_loc)/base/runtime_composer.h',
'<(src_loc)/base/thread_safe_wrap.h',
'<(src_loc)/base/timer.cpp',
'<(src_loc)/base/timer.h',
'<(src_loc)/base/type_traits.h',