From 220862105014de0c7e482a9b2356dcd495f78394 Mon Sep 17 00:00:00 2001 From: John Preston Date: Mon, 25 Feb 2019 21:26:08 +0400 Subject: [PATCH] First version of caching in media streaming. --- .../SourceFiles/boxes/local_storage_box.cpp | 1 + Telegram/SourceFiles/data/data_session.cpp | 8 + Telegram/SourceFiles/data/data_session.h | 12 +- Telegram/SourceFiles/data/data_types.cpp | 2 +- .../media/streaming/media_streaming_file.cpp | 4 +- .../streaming/media_streaming_loader.cpp | 70 ++ .../media/streaming/media_streaming_loader.h | 30 +- .../media_streaming_loader_mtproto.cpp | 102 +-- .../media_streaming_loader_mtproto.h | 12 +- .../streaming/media_streaming_reader.cpp | 657 ++++++++++++++++-- .../media/streaming/media_streaming_reader.h | 131 +++- .../storage/cache/storage_cache_database.cpp | 8 + .../storage/cache/storage_cache_database.h | 2 + Telegram/SourceFiles/storage/localstorage.cpp | 51 +- Telegram/SourceFiles/storage/localstorage.h | 7 +- 15 files changed, 979 insertions(+), 118 deletions(-) diff --git a/Telegram/SourceFiles/boxes/local_storage_box.cpp b/Telegram/SourceFiles/boxes/local_storage_box.cpp index eaa99b2a1..1db946a0f 100644 --- a/Telegram/SourceFiles/boxes/local_storage_box.cpp +++ b/Telegram/SourceFiles/boxes/local_storage_box.cpp @@ -471,6 +471,7 @@ void LocalStorageBox::save() { update.totalSizeLimit = _sizeLimit; update.totalTimeLimit = _timeLimit; Local::updateCacheSettings(update); + Local::updateCacheBigFileSettings(update); Auth().data().cache().updateSettings(update); closeBox(); } diff --git a/Telegram/SourceFiles/data/data_session.cpp b/Telegram/SourceFiles/data/data_session.cpp index eb8f38519..a1a43d7e2 100644 --- a/Telegram/SourceFiles/data/data_session.cpp +++ b/Telegram/SourceFiles/data/data_session.cpp @@ -145,11 +145,15 @@ Session::Session(not_null session) , _cache(Core::App().databases().get( Local::cachePath(), Local::cacheSettings())) +, _bigFileCache(Core::App().databases().get( + Local::cacheBigFilePath(), + 
Local::cacheBigFileSettings())) , _selfDestructTimer([=] { checkSelfDestructItems(); }) , _a_sendActions(animation(this, &Session::step_typings)) , _groups(this) , _unmuteByFinishedTimer([=] { unmuteByFinished(); }) { _cache->open(Local::cacheKey()); + _bigFileCache->open(Local::cacheBigFileKey()); setupContactViewsViewer(); setupChannelLeavingViewer(); @@ -745,6 +749,10 @@ Storage::Cache::Database &Session::cache() { return *_cache; } +Storage::Cache::Database &Session::cacheBigFile() { + return *_bigFileCache; +} + void Session::startExport(PeerData *peer) { startExport(peer ? peer->input : MTP_inputPeerEmpty()); } diff --git a/Telegram/SourceFiles/data/data_session.h b/Telegram/SourceFiles/data/data_session.h index 50c5de0f6..d06663164 100644 --- a/Telegram/SourceFiles/data/data_session.h +++ b/Telegram/SourceFiles/data/data_session.h @@ -60,7 +60,7 @@ public: explicit Session(not_null session); ~Session(); - AuthSession &session() const { + [[nodiscard]] AuthSession &session() const { return *_session; } @@ -70,18 +70,21 @@ public: void startExport(const MTPInputPeer &singlePeer); void suggestStartExport(TimeId availableAt); void clearExportSuggestion(); - rpl::producer currentExportView() const; + [[nodiscard]] auto currentExportView() const + -> rpl::producer; bool exportInProgress() const; void stopExportWithConfirmation(FnMut callback); void stopExport(); - const Passport::SavedCredentials *passportCredentials() const; + [[nodiscard]] auto passportCredentials() const + -> const Passport::SavedCredentials*; void rememberPassportCredentials( Passport::SavedCredentials data, crl::time rememberFor); void forgetPassportCredentials(); - Storage::Cache::Database &cache(); + [[nodiscard]] Storage::Cache::Database &cache(); + [[nodiscard]] Storage::Cache::Database &cacheBigFile(); [[nodiscard]] not_null peer(PeerId id); [[nodiscard]] not_null peer(UserId id) = delete; @@ -670,6 +673,7 @@ private: not_null _session; Storage::DatabasePointer _cache; + 
Storage::DatabasePointer _bigFileCache; std::unique_ptr _export; std::unique_ptr _exportPanel; diff --git a/Telegram/SourceFiles/data/data_types.cpp b/Telegram/SourceFiles/data/data_types.cpp index a0c728283..5d99729b5 100644 --- a/Telegram/SourceFiles/data/data_types.cpp +++ b/Telegram/SourceFiles/data/data_types.cpp @@ -71,7 +71,7 @@ Storage::Cache::Key StorageCacheKey(const StorageImageLocation &location) { Storage::Cache::Key WebDocumentCacheKey(const WebFileLocation &location) { const auto dcId = uint64(location.dc()) & 0xFFULL; - const auto url = location.url(); + const auto &url = location.url(); const auto hash = openssl::Sha256(bytes::make_span(url)); const auto bytes = bytes::make_span(hash); const auto bytes1 = bytes.subspan(0, sizeof(uint32)); diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp index 34b12d414..e8f8977c3 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_file.cpp @@ -37,8 +37,9 @@ int File::Context::read(bytes::span buffer) { } else if (!amount) { return amount; } + buffer = buffer.subspan(0, amount); - while (!_reader->fill(buffer, _offset, &_semaphore)) { + while (!_reader->fill(_offset, buffer, &_semaphore)) { _delegate->fileWaitingForData(); _semaphore.acquire(); if (_interrupted) { @@ -236,6 +237,7 @@ void File::Context::start(crl::time position) { return; } + _reader->headerDone(); if (video.codec || audio.codec) { seekToPosition(video.codec ? 
video : audio, position); } diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_loader.cpp index 2c5177f53..c82e3fecb 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader.cpp @@ -10,5 +10,75 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL namespace Media { namespace Streaming { +bool operator<( + const PriorityQueue::Entry &a, + const PriorityQueue::Entry &b) { + if (a.priority > b.priority) { + return true; + } else if (a.priority < b.priority) { + return false; + } else { + return a.value < b.value; + } +} + +bool PriorityQueue::add(int value) { + const auto i = ranges::find(_data, value, &Entry::value); + if (i == end(_data)) { + _data.insert({ value, _priority }); + return true; + } else if (i->priority != _priority) { + _data.erase(i); + _data.insert({ value, _priority }); + return true; + } + return false; +} + +bool PriorityQueue::remove(int value) { + const auto i = ranges::find(_data, value, &Entry::value); + if (i == end(_data)) { + return false; + } + _data.erase(i); + return true; +} + +std::optional PriorityQueue::front() const { + return _data.empty() + ? 
std::nullopt + : std::make_optional(_data.front().value); +} + +std::optional PriorityQueue::take() { + if (_data.empty()) { + return std::nullopt; + } + const auto result = _data.front().value; + _data.erase(_data.begin()); + return result; +} + +base::flat_set PriorityQueue::takeInRange(int from, int till) { + auto result = base::flat_set(); + for (auto i = _data.begin(); i != _data.end();) { + if (i->value >= from && i->value < till) { + result.emplace(i->value); + i = _data.erase(i); + } else { + ++i; + } + } + return result; +} + +void PriorityQueue::clear() { + _data.clear(); +} + +void PriorityQueue::increasePriority() { + ++_priority; +} + } // namespace Streaming } // namespace Media diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader.h b/Telegram/SourceFiles/media/streaming/media_streaming_loader.h index 0b12342fa..9066817ef 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader.h @@ -21,10 +21,13 @@ class Loader { public: static constexpr auto kPartSize = 128 * 1024; - //[[nodiscard]] virtual Storage::Cache::Key baseCacheKey() const = 0; + [[nodiscard]] virtual auto baseCacheKey() const + -> std::optional = 0; [[nodiscard]] virtual int size() const = 0; - virtual void load(int offset, int till = -1) = 0; + virtual void load(int offset) = 0; + virtual void cancel(int offset) = 0; + virtual void increasePriority() = 0; virtual void stop() = 0; // Parts will be sent from the main thread. 
@@ -34,5 +37,28 @@ public: }; +class PriorityQueue { +public: + bool add(int value); + bool remove(int value); + void increasePriority(); + std::optional front() const; + std::optional take(); + base::flat_set takeInRange(int from, int till); + void clear(); + +private: + struct Entry { + int value = 0; + int priority = 0; + }; + + friend bool operator<(const Entry &a, const Entry &b); + + base::flat_set _data; + int _priority = 0; + +}; + } // namespace Streaming } // namespace Media diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp index 672a6f815..4b70c3eb9 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.cpp @@ -8,12 +8,15 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include "media/streaming/media_streaming_loader_mtproto.h" #include "apiwrap.h" +#include "storage/cache/storage_cache_types.h" namespace Media { namespace Streaming { namespace { constexpr auto kMaxConcurrentRequests = 2; +constexpr auto kDocumentBaseCacheTag = 0x0000000000010000ULL; +constexpr auto kDocumentBaseCacheMask = 0x000000000000FF00ULL; } // namespace @@ -30,27 +33,68 @@ LoaderMtproto::LoaderMtproto( , _origin(origin) { } +std::optional LoaderMtproto::baseCacheKey() const { + return _location.match([&](const MTPDinputDocumentFileLocation &data) { + const auto id = data.vid.v; + const auto high = kDocumentBaseCacheTag + | ((uint64(_dcId) << 16) & kDocumentBaseCacheMask) + | (id >> 48); + const auto low = (id << 16); + + Ensures((low & 0xFFULL) == 0); + return Storage::Cache::Key{ high, low }; + }, [](auto &&) -> Storage::Cache::Key { + Unexpected("Not implemented file location type."); + }); +} + int LoaderMtproto::size() const { return _size; } -void LoaderMtproto::load(int offset, int till) { +void LoaderMtproto::load(int offset) { crl::on_main(this, 
[=] { - cancelRequestsBefore(offset); - _till = till; - sendNext(offset); + if (_requests.contains(offset)) { + return; + } else if (_requested.add(offset)) { + sendNext(); + } }); } -void LoaderMtproto::sendNext(int possibleOffset) { - Expects((possibleOffset % kPartSize) == 0); +void LoaderMtproto::stop() { + crl::on_main(this, [=] { + ranges::for_each( + base::take(_requests), + _sender.requestCanceller(), + &base::flat_map::value_type::second); + _requested.clear(); + }); +} - const auto offset = _requests.empty() - ? possibleOffset - : _requests.back().first + kPartSize; - if ((_till >= 0 && offset >= _till) || (_size > 0 && offset >= _size)) { +void LoaderMtproto::cancel(int offset) { + crl::on_main(this, [=] { + if (const auto requestId = _requests.take(offset)) { + _sender.request(*requestId).cancel(); + sendNext(); + } else { + _requested.remove(offset); + } + }); +} + +void LoaderMtproto::increasePriority() { + crl::on_main(this, [=] { + _requested.increasePriority(); + }); +} + +void LoaderMtproto::sendNext() { + if (_requests.size() >= kMaxConcurrentRequests) { return; - } else if (_requests.size() >= kMaxConcurrentRequests) { + } + const auto offset = _requested.take().value_or(-1); + if (offset < 0) { return; } @@ -68,15 +112,13 @@ void LoaderMtproto::sendNext(int possibleOffset) { ).send(); _requests.emplace(offset, id); - sendNext(offset + kPartSize); + sendNext(); } void LoaderMtproto::requestDone(int offset, const MTPupload_File &result) { result.match([&](const MTPDupload_file &data) { _requests.erase(offset); - if (data.vbytes.v.size() == kPartSize) { - sendNext(offset + kPartSize); - } + sendNext(); _parts.fire({ offset, data.vbytes.v }); }, [&](const MTPDupload_fileCdnRedirect &data) { changeCdnParams( @@ -111,41 +153,11 @@ void LoaderMtproto::requestFailed(int offset, const RPCError &error) { _api->refreshFileReference(_origin, crl::guard(this, callback)); } -void LoaderMtproto::stop() { - crl::on_main(this, [=] { - for (const auto [offset, 
requestId] : base::take(_requests)) { - _sender.request(requestId).cancel(); - } - }); -} - rpl::producer LoaderMtproto::parts() const { return _parts.events(); } LoaderMtproto::~LoaderMtproto() = default; -void LoaderMtproto::cancelRequestsBefore(int offset) { - const auto from = begin(_requests); - const auto till = ranges::lower_bound( - _requests, - offset, - ranges::less(), - [](auto pair) { return pair.first; }); - ranges::for_each( - from, - till, - _sender.requestCanceller(), - &base::flat_map::value_type::second); - _requests.erase(from, till); - - if (!_requests.empty() && _requests.front().first > offset) { - ranges::for_each( - base::take(_requests), - _sender.requestCanceller(), - &base::flat_map::value_type::second); - } -} - } // namespace Streaming } // namespace Media diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h index af95f8c75..3174b6236 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_loader_mtproto.h @@ -25,10 +25,13 @@ public: int size, Data::FileOrigin origin); - //[[nodiscard]] Storage::Cache::Key baseCacheKey() const override; + [[nodiscard]] auto baseCacheKey() const + -> std::optional override; [[nodiscard]] int size() const override; - void load(int offset, int till = -1) override; + void load(int offset) override; + void cancel(int offset) override; + void increasePriority() override; void stop() override; // Parts will be sent from the main thread. 
@@ -37,8 +40,7 @@ public: ~LoaderMtproto(); private: - void cancelRequestsBefore(int offset); - void sendNext(int possibleOffset); + void sendNext(); void requestDone(int offset, const MTPupload_File &result); void requestFailed(int offset, const RPCError &error); @@ -56,9 +58,9 @@ private: const int _size = 0; const Data::FileOrigin _origin; - int _till = -1; MTP::Sender _sender; + PriorityQueue _requested; base::flat_map _requests; rpl::event_stream _parts; diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp index 0c92059e7..da9560d70 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.cpp @@ -15,6 +15,16 @@ namespace Media { namespace Streaming { namespace { +constexpr auto kPartSize = Loader::kPartSize; +constexpr auto kPartsInSlice = 64; +constexpr auto kInSlice = kPartsInSlice * kPartSize; +constexpr auto kMaxPartsInHeader = 72; +constexpr auto kMaxInHeader = kMaxPartsInHeader * kPartSize; +constexpr auto kMaxOnlyInHeader = 80 * kPartSize; + +// 1 MB of parts are requested from cloud ahead of reading demand. 
+constexpr auto kPreloadPartsAhead = 8; + template // Range::value_type is Pair int FindNotLoadedStart(Range &&parts, int offset) { auto result = offset; @@ -50,11 +60,419 @@ void CopyLoaded(bytes::span buffer, Range &&parts, int offset, int till) { } // namespace +template +bool Reader::StackIntVector::add(int value) { + using namespace rpl::mappers; + + const auto i = ranges::find_if(_storage, _1 < 0); + if (i == end(_storage)) { + return false; + } + *i = value; + const auto next = i + 1; + if (next != end(_storage)) { + *next = -1; + } + return true; +} + +template +auto Reader::StackIntVector::values() const { + using namespace rpl::mappers; + + return ranges::view::all(_storage) | ranges::view::take_while(_1 >= 0); +} + +struct Reader::CacheHelper { + explicit CacheHelper(Storage::Cache::Key baseKey); + + Storage::Cache::Key key(int sliceNumber) const; + + const Storage::Cache::Key baseKey; + + QMutex mutex; + base::flat_map results; + std::atomic waiting = nullptr; +}; + +Reader::CacheHelper::CacheHelper(Storage::Cache::Key baseKey) +: baseKey(baseKey) { +} + +Storage::Cache::Key Reader::CacheHelper::key(int sliceNumber) const { + return Storage::Cache::Key{ baseKey.high, baseKey.low + sliceNumber }; +} + +bool Reader::Slice::processCacheData(QByteArray &&data, int maxSliceSize) { + Expects((flags & Flag::LoadingFromCache) != 0); + Expects(!(flags & Flag::LoadedFromCache)); + + const auto guard = gsl::finally([&] { + flags |= Flag::LoadedFromCache; + flags &= ~Flag::LoadingFromCache; + }); + + const auto size = data.size(); + if (!(size % kPartSize) || (size == maxSliceSize)) { + if (size > maxSliceSize) { + return false; + } + for (auto offset = 0; offset < size; offset += kPartSize) { + parts.emplace(offset, data.mid(offset, kPartSize)); + } + return true; + } + return processComplexCacheData( + bytes::make_span(data), + maxSliceSize); +} + +bool Reader::Slice::processComplexCacheData( + bytes::const_span data, + int maxSliceSize) { + const auto takeInt = 
[&]() -> std::optional { + if (data.size() < sizeof(int32)) { + return std::nullopt; + } + const auto bytes = data.data(); + const auto result = *reinterpret_cast(bytes); + data = data.subspan(sizeof(int32)); + return result; + }; + const auto takeBytes = [&](int count) -> std::optional { + if (count <= 0 || data.size() < count) { + return std::nullopt; + } + auto result = QByteArray( + reinterpret_cast(data.data()), + count); + data = data.subspan(count); + return result; + }; + const auto count = takeInt().value_or(0); + if (count <= 0) { + return false; + } + for (auto i = 0; i != count; ++i) { + const auto offset = takeInt().value_or(0); + const auto size = takeInt().value_or(0); + auto bytes = takeBytes(size).value_or(QByteArray()); + if (offset < 0 + || offset >= maxSliceSize + || size <= 0 + || size > maxSliceSize + || offset + size > maxSliceSize + || bytes.size() != size) { + return false; + } + parts.emplace(offset, std::move(bytes)); + } + return true; +} + +void Reader::Slice::addPart(int offset, QByteArray bytes) { + Expects(!parts.contains(offset)); + + parts.emplace(offset, std::move(bytes)); + if (flags & Flag::LoadedFromCache) { + flags |= Flag::ChangedSinceCache; + } +} + +auto Reader::Slice::prepareFill(int from, int till) -> PrepareFillResult { + auto result = PrepareFillResult(); + + result.ready = false; + const auto fromOffset = (from / kPartSize) * kPartSize; + const auto tillPart = (till + kPartSize - 1) / kPartSize; + const auto preloadTillOffset = std::min( + (tillPart + kPreloadPartsAhead) * kPartSize, + kInSlice); + + const auto after = ranges::upper_bound( + parts, + from, + ranges::less(), + &base::flat_map::value_type::first); + if (after == begin(parts)) { + result.offsetsFromLoader = offsetsFromLoader( + fromOffset, + preloadTillOffset); + return result; + } + + const auto start = after - 1; + const auto finish = ranges::lower_bound( + start, + end(parts), + till, + ranges::less(), + &base::flat_map::value_type::first); + const 
auto haveTill = FindNotLoadedStart( + ranges::make_iterator_range(start, finish), + fromOffset); + if (haveTill < till) { + result.offsetsFromLoader = offsetsFromLoader( + haveTill, + preloadTillOffset); + return result; + } + result.ready = true; + result.start = start; + result.finish = finish; + result.offsetsFromLoader = offsetsFromLoader( + tillPart * kPartSize, + preloadTillOffset); + return result; +} + +auto Reader::Slice::offsetsFromLoader(int from, int till) const +-> StackIntVector { + auto result = StackIntVector(); + + const auto after = ranges::upper_bound( + parts, + from, + ranges::less(), + &base::flat_map::value_type::first); + auto check = (after == begin(parts)) ? after : (after - 1); + const auto end = parts.end(); + for (auto offset = from; offset != till; offset += kPartSize) { + while (check != end && check->first < offset) { + ++check; + } + if (check != end && check->first == offset) { + continue; + } else if (!result.add(offset)) { + break; + } + } + return result; +} + +Reader::Slices::Slices(int size, bool useCache) +: _size(size) { + Expects(size > 0); + + _header.flags |= Slice::Flag::Header; + if (useCache) { + _header.flags |= Slice::Flag::LoadingFromCache; + // #TODO streaming HeaderMode::Full. + //if (_size <= kMaxOnlyInHeader) { + // _headerMode = HeaderMode::Full; + //} + } else { + _headerMode = HeaderMode::NoCache; + } + + const auto count = ((_size + kInSlice - 1) / kInSlice); + _data.resize(count); +} + +void Reader::Slices::headerDone(bool fromCache) { + if (_headerMode != HeaderMode::Unknown) { + return; + } + // #TODO streaming HeaderMode::Full. 
+ _headerMode = HeaderMode::Small; + if (!fromCache) { + for (auto &slice : _data) { + using Flag = Slice::Flag; + Assert(!(slice.flags + & (Flag::LoadingFromCache | Flag::LoadedFromCache))); + slice.flags |= Slice::Flag::LoadedFromCache; + } + } +} + +void Reader::Slices::applyHeaderCacheData() { + if (_header.parts.empty()) { + return; + } + for (const auto &[offset, part] : _header.parts) { + const auto index = offset / kInSlice; + _data[index].addPart( + offset - index * kInSlice, + base::duplicate(part)); + } + headerDone(true); +} + +bool Reader::Slices::processCacheResult( + int sliceNumber, + QByteArray &&result) { + Expects(sliceNumber >= 0 && sliceNumber <= _data.size()); + + auto &slice = (sliceNumber ? _data[sliceNumber - 1] : _header); + const auto success = slice.processCacheData( + std::move(result), + maxSliceSize(sliceNumber)); + if (!sliceNumber) { + applyHeaderCacheData(); + } + return success; +} + +bool Reader::Slices::processPart(int offset, QByteArray &&bytes) { + Expects(offset / kInSlice < _data.size()); + + const auto index = offset / kInSlice; + if (_headerMode == HeaderMode::Unknown) { + if (_header.parts.contains(offset)) { + return true; + } else if (_header.parts.size() == kMaxPartsInHeader) { + // #TODO streaming later unavailable, full load? + return false; + } + _header.addPart(offset, bytes); + } + _data[index].addPart(offset - index * kInSlice, std::move(bytes)); + return true; +} + +auto Reader::Slices::fill(int offset, bytes::span buffer) -> FillResult { + Expects(!buffer.empty()); + Expects(offset >= 0 && offset < _size); + Expects(offset + buffer.size() <= _size); + Expects(buffer.size() <= kInSlice); + + if (_headerMode != HeaderMode::NoCache + && !(_header.flags & Slice::Flag::LoadedFromCache)) { + // Waiting for initial cache query. 
+ return {}; + } + + auto result = FillResult(); + const auto till = int(offset + buffer.size()); + const auto fromSlice = offset / kInSlice; + const auto tillSlice = (till + kInSlice - 1) / kInSlice; + Assert(fromSlice >= 0 + && (fromSlice + 1 == tillSlice || fromSlice + 2 == tillSlice) + && tillSlice <= _data.size()); + + const auto handlePrepareResult = [&]( + int sliceIndex, + const Slice::PrepareFillResult &prepared) { + if (sliceIndex == _data.size()) { + return; + } + using Flag = Slice::Flag; + if (!(_data[sliceIndex].flags & Flag::LoadedFromCache) + && _headerMode != HeaderMode::NoCache + && _headerMode != HeaderMode::Unknown) { + if (!(_data[sliceIndex].flags & Flag::LoadingFromCache)) { + _data[sliceIndex].flags |= Slice::Flag::LoadingFromCache; + result.sliceNumbersFromCache.add(sliceIndex + 1); + } + return; + } + for (const auto offset : prepared.offsetsFromLoader.values()) { + const auto full = offset + sliceIndex * kInSlice; + if (sliceIndex + 1 != _data.size() || full < _size) { + result.offsetsFromLoader.add(full); + } + } + }; + + const auto firstFrom = offset - fromSlice * kInSlice; + const auto firstTill = std::min(kInSlice, till - fromSlice * kInSlice); + const auto secondFrom = 0; + const auto secondTill = till - (fromSlice + 1) * kInSlice; + const auto first = _data[fromSlice].prepareFill(firstFrom, firstTill); + const auto second = (fromSlice + 1 < tillSlice) + ? 
_data[fromSlice + 1].prepareFill(secondFrom, secondTill) + : Slice::PrepareFillResult(); + handlePrepareResult(fromSlice, first); + handlePrepareResult(fromSlice + 1, second); + if (first.ready && second.ready) { + CopyLoaded( + buffer, + ranges::make_iterator_range(first.start, first.finish), + firstFrom, + firstTill); + if (fromSlice + 1 < tillSlice) { + CopyLoaded( + buffer.subspan(firstTill - firstFrom), + ranges::make_iterator_range(second.start, second.finish), + secondFrom, + secondTill); + } + result.filled = true; + } + return result; +} + +int Reader::Slices::maxSliceSize(int sliceNumber) const { + return (sliceNumber == _data.size()) + ? (_size - (sliceNumber - 1) * kInSlice) + : (sliceNumber > 0) + ? kInSlice + : _size; +} + +Reader::SerializedSlice Reader::Slices::serializeAndUnloadSlice( + int sliceNumber, + Slice &slice) const { + Expects(!slice.parts.empty()); + + const auto count = slice.parts.size(); + auto result = SerializedSlice(); + result.number = sliceNumber; + const auto continuous = FindNotLoadedStart(slice.parts, 0); + if (continuous > slice.parts.back().first) { + // All data is continuous. + result.data.reserve(count * kPartSize); + for (const auto &[offset, part] : slice.parts) { + result.data.append(part); + } + } else { + const auto intSize = sizeof(int32); + result.data.reserve(count * kPartSize + 2 * intSize * (count + 1)); + const auto appendInt = [&](int value) { + auto serialized = int32(value); + result.data.append( + reinterpret_cast(&serialized), + intSize); + }; + appendInt(count); + for (const auto &[offset, part] : slice.parts) { + appendInt(offset); + appendInt(part.size()); + result.data.append(part); + } + if (result.data.size() == maxSliceSize(sliceNumber)) { + // Make sure this data won't be taken for full continuous data. 
+ appendInt(0); + } + } + slice = Slice(); + return result; +} + +Reader::SerializedSlice Reader::Slices::unloadToCache() { + if (_headerMode == HeaderMode::Unknown + || _headerMode == HeaderMode::NoCache) { + return {}; + } + if (_header.flags & Slice::Flag::ChangedSinceCache) { + return serializeAndUnloadSlice(0, _header); + } + auto &&indexed = ranges::view::zip(_data, ranges::view::ints(0)); + for (auto &&[slice, index] : indexed) { + if (slice.flags & Slice::Flag::ChangedSinceCache) { + return serializeAndUnloadSlice(index + 1, slice); + } + } + return {}; +} + Reader::Reader( not_null owner, std::unique_ptr loader) : _owner(owner) -, _loader(std::move(loader)) { +, _loader(std::move(loader)) +, _cacheHelper(InitCacheHelper(_loader->baseCacheKey())) +, _slices(_loader->size(), _cacheHelper != nullptr) { _loader->parts( ) | rpl::start_with_next([=](LoadedPart &&part) { QMutexLocker lock(&_loadedPartsMutex); @@ -66,6 +484,45 @@ Reader::Reader( waiting->release(); } }, _lifetime); + + if (_cacheHelper) { + readFromCache(0); + } +} + +std::shared_ptr Reader::InitCacheHelper( + std::optional baseKey) { + if (!baseKey) { + return nullptr; + } + return std::make_shared(*baseKey); +} + +// 0 is for headerData, slice index = sliceNumber - 1. 
+void Reader::readFromCache(int sliceNumber) { + Expects(_cacheHelper != nullptr); + + const auto key = _cacheHelper->key(sliceNumber); + const auto weak = std::weak_ptr(_cacheHelper); + _owner->cacheBigFile().get(key, [=](QByteArray &&result) { + if (const auto strong = weak.lock()) { + QMutexLocker lock(&strong->mutex); + strong->results.emplace(sliceNumber, std::move(result)); + if (const auto waiting = strong->waiting.load()) { + strong->waiting = nullptr; + waiting->release(); + } + } + }); +} + +void Reader::putToCache(SerializedSlice &&slice) { + Expects(_cacheHelper != nullptr); + Expects(slice.number >= 0); + + _owner->cacheBigFile().put( + _cacheHelper->key(slice.number), + std::move(slice.data)); } int Reader::size() const { @@ -76,86 +533,192 @@ bool Reader::failed() const { return _failed; } +void Reader::headerDone() { + _slices.headerDone(false); +} +static auto fills = 0; +static auto several = 0; +static auto fulltime = 0; +static auto maxtime = 0; bool Reader::fill( - bytes::span buffer, int offset, - crl::semaphore *notify) { + bytes::span buffer, + not_null notify) { Expects(offset + buffer.size() <= size()); - const auto wait = [&](int offset) { - _waiting = notify; - loadFor(offset); - return false; + const auto now = crl::now(); + const auto guard = gsl::finally([&] { + const auto time = int(crl::now() - now); + fulltime += time; + maxtime = std::max(maxtime, time); + }); + ++fills; + + const auto startWaiting = [&] { + if (_cacheHelper) { + _cacheHelper->waiting = notify.get(); + } + _waiting = notify.get(); + }; + const auto clearWaiting = [&] { + _waiting = nullptr; + if (_cacheHelper) { + _cacheHelper->waiting = nullptr; + } }; const auto done = [&] { - _waiting = nullptr; + clearWaiting(); return true; }; const auto failed = [&] { - _waiting = nullptr; - if (notify) { - notify->release(); - } + clearWaiting(); + notify->release(); return false; }; + processCacheResults(); processLoadedParts(); if (_failed) { return failed(); } - const 
auto after = ranges::upper_bound( - _data, - offset, - ranges::less(), - &base::flat_map::value_type::first); - if (after == begin(_data)) { - return wait(offset); - } + do { + if (fillFromSlices(offset, buffer)) { + clearWaiting(); + return true; + } + startWaiting(); + } while (processCacheResults() || processLoadedParts()); - const auto till = int(offset + buffer.size()); - const auto start = after - 1; - const auto finish = ranges::lower_bound( - start, - end(_data), - till, - ranges::less(), - &base::flat_map::value_type::first); - const auto parts = ranges::make_iterator_range(start, finish); - - const auto haveTill = FindNotLoadedStart(parts, offset); - if (haveTill < till) { - return wait(haveTill); - } - CopyLoaded(buffer, parts, offset, till); - return done(); + return _failed ? failed() : false; } -void Reader::processLoadedParts() { - QMutexLocker lock(&_loadedPartsMutex); - auto loaded = std::move(_loadedParts); +bool Reader::fillFromSlices(int offset, bytes::span buffer) { + using namespace rpl::mappers; + + auto result = _slices.fill(offset, buffer); + + for (const auto sliceNumber : result.sliceNumbersFromCache.values()) { + readFromCache(sliceNumber); + } + + ++several; + + if (_cacheHelper && result.toCache.number > 0) { + const auto index = result.toCache.number - 1; + cancelLoadInRange(index * kInSlice, (index + 1) * kInSlice); + putToCache(std::move(result.toCache)); + } + auto checkPriority = true; + for (const auto offset : result.offsetsFromLoader.values()) { + if (checkPriority) { + checkLoadWillBeFirst(offset); + checkPriority = false; + } + loadAtOffset(offset); + } + return result.filled; +} + +void Reader::cancelLoadInRange(int from, int till) { + Expects(from < till); + + for (const auto offset : _loadingOffsets.takeInRange(from, till)) { + LOG(("CANCEL LOAD: %1").arg(offset)); + _loader->cancel(offset); + } +} + +void Reader::checkLoadWillBeFirst(int offset) { + if (_loadingOffsets.front().value_or(offset) != offset) { + 
LOG(("CHANGING PRIORITY, WAS: %1, NOW: %2").arg(_loadingOffsets.front().value_or(offset)).arg(offset)); + _loadingOffsets.increasePriority(); + _loader->increasePriority(); + } +} + +bool Reader::processCacheResults() { + if (!_cacheHelper) { + return false; + } else if (_failed) { + return false; + } + + QMutexLocker lock(&_cacheHelper->mutex); + auto loaded = base::take(_cacheHelper->results); lock.unlock(); - if (_failed) { - return; + for (auto &&part : loaded) { + const auto sliceNumber = part.first; + auto &serialized = part.second; + _slices.processCacheResult(sliceNumber, std::move(serialized)); } + return !loaded.empty(); +} + +bool Reader::processLoadedParts() { + if (_failed) { + return false; + } + + QMutexLocker lock(&_loadedPartsMutex); + auto loaded = base::take(_loadedParts); + lock.unlock(); + for (auto &part : loaded) { if (part.offset == LoadedPart::kFailedOffset || (part.bytes.size() != Loader::kPartSize && part.offset + part.bytes.size() != size())) { _failed = true; - return; + return false; + } else if (!_loadingOffsets.remove(part.offset)) { + continue; + } else if (!_slices.processPart( + part.offset, + std::move(part.bytes))) { + _failed = true; + return false; } - _data.emplace(part.offset, std::move(part.bytes)); + } + return !loaded.empty(); +} + +static auto real = 0; +static auto skip = 0; +void Reader::loadAtOffset(int offset) { + if (offset == 655360) { + int a = 0; + } + if (_loadingOffsets.add(offset)) { + LOG(("START LOAD: %1").arg(offset)); + _loader->load(offset); + ++real; + } else { + ++skip; } } -void Reader::loadFor(int offset) { - const auto part = offset / Loader::kPartSize; - _loader->load(part * Loader::kPartSize); +void Reader::finalizeCache() { + if (!_cacheHelper) { + return; + } + if (_cacheHelper->waiting != nullptr) { + QMutexLocker lock(&_cacheHelper->mutex); + _cacheHelper->waiting = nullptr; + } + auto toCache = _slices.unloadToCache(); + while (toCache.number >= 0) { + putToCache(std::move(toCache)); + toCache 
= _slices.unloadToCache(); + } + _owner->cacheBigFile().sync(); } -Reader::~Reader() = default; +Reader::~Reader() { + const auto now = crl::now(); + finalizeCache(); + LOG(("PARTS: %1, REAL: %2, SKIP: %3, FILLS: %4, TIME: %5, MAX: %6, FINALIZE: %7, REPEATED: %8").arg((_loader->size() + kPartSize - 1) / kPartSize).arg(real).arg(skip).arg(fills).arg(fulltime / float64(fills)).arg(maxtime).arg(crl::now() - now).arg(several)); +} } // namespace Streaming } // namespace Media diff --git a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h index dca3d54b3..e14e7bad1 100644 --- a/Telegram/SourceFiles/media/streaming/media_streaming_reader.h +++ b/Telegram/SourceFiles/media/streaming/media_streaming_reader.h @@ -7,8 +7,15 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL */ #pragma once +#include "media/streaming/media_streaming_loader.h" #include "base/bytes.h" +namespace Storage { +namespace Cache { +struct Key; +} // namespace Cache +} // namespace Storage + namespace Data { class Session; } // namespace Data @@ -25,26 +32,138 @@ public: int size() const; bool fill( - bytes::span buffer, int offset, - crl::semaphore *notify = nullptr); + bytes::span buffer, + not_null notify); bool failed() const; + void headerDone(); + ~Reader(); private: - void processLoadedParts(); - void loadFor(int offset); + static constexpr auto kLoadFromRemoteMax = 8; + + struct CacheHelper; + + template + class StackIntVector { + public: + bool add(int value); + auto values() const; + + private: + std::array _storage = { -1 }; + + }; + + struct SerializedSlice { + int number = -1; + QByteArray data; + }; + struct FillResult { + static constexpr auto kReadFromCacheMax = 2; + + StackIntVector sliceNumbersFromCache; + StackIntVector offsetsFromLoader; + SerializedSlice toCache; + bool filled = false; + }; + + struct Slice { + enum class Flag : uchar { + Header = 0x01, + LoadingFromCache = 0x02, + 
LoadedFromCache = 0x04, + ChangedSinceCache = 0x08, + }; + friend constexpr inline bool is_flag_type(Flag) { return true; } + using Flags = base::flags; + + struct PrepareFillResult { + StackIntVector offsetsFromLoader; + base::flat_map::const_iterator start; + base::flat_map::const_iterator finish; + bool ready = true; + }; + + bool processCacheData(QByteArray &&data, int maxSliceSize); + bool processComplexCacheData( + bytes::const_span data, + int maxSliceSize); + void addPart(int offset, QByteArray bytes); + PrepareFillResult prepareFill(int from, int till); + + // Get up to kLoadFromRemoteMax not loaded parts in from-till range. + StackIntVector offsetsFromLoader( + int from, + int till) const; + + base::flat_map parts; + Flags flags; + + }; + + class Slices { + public: + Slices(int size, bool useCache); + + void headerDone(bool fromCache); + + bool processCacheResult(int sliceNumber, QByteArray &&result); + bool processPart(int offset, QByteArray &&bytes); + + FillResult fill(int offset, bytes::span buffer); + SerializedSlice unloadToCache(); + + private: + enum class HeaderMode { + Unknown, + Small, +// Full, + NoCache, + }; + + void applyHeaderCacheData(); + int maxSliceSize(int sliceNumber) const; + SerializedSlice serializeAndUnloadSlice( + int sliceNumber, + Slice &slice) const; + + std::vector _data; + Slice _header; + int _size = 0; + HeaderMode _headerMode = HeaderMode::Unknown; + + }; + + // 0 is for headerData, slice index = sliceNumber - 1. 
+ void readFromCache(int sliceNumber); + bool processCacheResults(); + void putToCache(SerializedSlice &&data); + + void cancelLoadInRange(int from, int till); + void loadAtOffset(int offset); + void checkLoadWillBeFirst(int offset); + bool processLoadedParts(); + + bool fillFromSlices(int offset, bytes::span buffer); + + void finalizeCache(); + + static std::shared_ptr InitCacheHelper( + std::optional baseKey); const not_null _owner; const std::unique_ptr _loader; + const std::shared_ptr _cacheHelper; QMutex _loadedPartsMutex; std::vector _loadedParts; std::atomic _waiting = nullptr; + PriorityQueue _loadingOffsets; - // #TODO streaming optimize - base::flat_map _data; + Slices _slices; bool _failed = false; rpl::lifetime _lifetime; diff --git a/Telegram/SourceFiles/storage/cache/storage_cache_database.cpp b/Telegram/SourceFiles/storage/cache/storage_cache_database.cpp index 9dab70be5..d7486edce 100644 --- a/Telegram/SourceFiles/storage/cache/storage_cache_database.cpp +++ b/Telegram/SourceFiles/storage/cache/storage_cache_database.cpp @@ -176,6 +176,14 @@ void Database::clearByTag(uint8 tag, FnMut &&done) { }); } +void Database::sync() { + auto semaphore = crl::semaphore(); + _wrapped.with([&](Implementation &) { + semaphore.release(); + }); + semaphore.acquire(); +} + Database::~Database() = default; } // namespace Cache diff --git a/Telegram/SourceFiles/storage/cache/storage_cache_database.h b/Telegram/SourceFiles/storage/cache/storage_cache_database.h index 424f980c2..d28f536a1 100644 --- a/Telegram/SourceFiles/storage/cache/storage_cache_database.h +++ b/Telegram/SourceFiles/storage/cache/storage_cache_database.h @@ -72,6 +72,8 @@ public: void clearByTag(uint8 tag, FnMut &&done = nullptr); void waitForCleaner(FnMut &&done = nullptr); + void sync(); + ~Database(); private: diff --git a/Telegram/SourceFiles/storage/localstorage.cpp b/Telegram/SourceFiles/storage/localstorage.cpp index 09f65d9c4..792ca7e24 100644 --- 
a/Telegram/SourceFiles/storage/localstorage.cpp +++ b/Telegram/SourceFiles/storage/localstorage.cpp @@ -668,6 +668,8 @@ FileKey _recentHashtagsAndBotsKey = 0; bool _recentHashtagsAndBotsWereRead = false; qint64 _cacheTotalSizeLimit = Database::Settings().totalSizeLimit; qint32 _cacheTotalTimeLimit = Database::Settings().totalTimeLimit; +qint64 _cacheBigFileTotalSizeLimit = Database::Settings().totalSizeLimit; +qint32 _cacheBigFileTotalTimeLimit = Database::Settings().totalTimeLimit; FileKey _exportSettingsKey = 0; @@ -1081,6 +1083,8 @@ bool _readSetting(quint32 blockId, QDataStream &stream, int version, ReadSetting _cacheTotalSizeLimit = size; _cacheTotalTimeLimit = time; + _cacheBigFileTotalSizeLimit = size; + _cacheBigFileTotalTimeLimit = time; /* big-file cache limits are not stored separately yet, so they mirror the regular cache's size/time values */ } break; case dbiAnimationsDisabled: { @@ -2105,6 +2109,7 @@ void _writeUserSettings() { data.stream << quint32(dbiAutoPlay) << qint32(cAutoPlayGif() ? 1 : 0); data.stream << quint32(dbiUseExternalVideoPlayer) << qint32(cUseExternalVideoPlayer()); data.stream << quint32(dbiCacheSettings) << qint64(_cacheTotalSizeLimit) << qint32(_cacheTotalTimeLimit); + // #TODO streaming save _cacheBigFileTotal limits?.. 
if (!userData.isEmpty()) { data.stream << quint32(dbiAuthSessionSettings) << userData; } @@ -2813,6 +2818,8 @@ void reset() { _oldMapVersion = _oldSettingsVersion = 0; _cacheTotalSizeLimit = Database::Settings().totalSizeLimit; _cacheTotalTimeLimit = Database::Settings().totalTimeLimit; + _cacheBigFileTotalSizeLimit = Database::Settings().totalSizeLimit; + _cacheBigFileTotalTimeLimit = Database::Settings().totalTimeLimit; StoredAuthSessionCache.reset(); _mapChanged = true; _writeMap(WriteMapWhen::Now); @@ -3176,18 +3183,22 @@ qint32 _storageAudioSize(qint32 rawlen) { return result; } -QString cachePath() { - Expects(!_userDbPath.isEmpty()); - - return _userDbPath + "cache"; -} - Storage::EncryptionKey cacheKey() { Expects(LocalKey != nullptr); return Storage::EncryptionKey(bytes::make_vector(LocalKey->data())); } +Storage::EncryptionKey cacheBigFileKey() { + return cacheKey(); +} + +QString cachePath() { + Expects(!_userDbPath.isEmpty()); + + return _userDbPath + "cache"; +} + Storage::Cache::Database::Settings cacheSettings() { auto result = Storage::Cache::Database::Settings(); result.clearOnWrongKey = true; @@ -3210,6 +3221,34 @@ void updateCacheSettings(Storage::Cache::Database::SettingsUpdate &update) { _writeUserSettings(); } +QString cacheBigFilePath() { + Expects(!_userDbPath.isEmpty()); + + return _userDbPath + "media_cache"; +} + +Storage::Cache::Database::Settings cacheBigFileSettings() { + auto result = Storage::Cache::Database::Settings(); + result.clearOnWrongKey = true; + result.totalSizeLimit = _cacheBigFileTotalSizeLimit; + result.totalTimeLimit = _cacheBigFileTotalTimeLimit; + result.maxDataSize = Storage::kMaxFileInMemory; + return result; +} + +void updateCacheBigFileSettings(Storage::Cache::Database::SettingsUpdate &update) { + Expects(update.totalSizeLimit > Database::Settings().maxDataSize); + Expects(update.totalTimeLimit >= 0); + + if (_cacheBigFileTotalSizeLimit == update.totalSizeLimit + && _cacheBigFileTotalTimeLimit == 
update.totalTimeLimit) { + return; + } + _cacheBigFileTotalSizeLimit = update.totalSizeLimit; + _cacheBigFileTotalTimeLimit = update.totalTimeLimit; + //_writeUserSettings(); // #TODO streaming save those?.. +} + class CountWaveformTask : public Task { public: CountWaveformTask(DocumentData *doc) diff --git a/Telegram/SourceFiles/storage/localstorage.h b/Telegram/SourceFiles/storage/localstorage.h index 8536c2dd5..0f50d24ed 100644 --- a/Telegram/SourceFiles/storage/localstorage.h +++ b/Telegram/SourceFiles/storage/localstorage.h @@ -112,11 +112,16 @@ bool hasDraft(const PeerId &peer); void writeFileLocation(MediaKey location, const FileLocation &local); FileLocation readFileLocation(MediaKey location, bool check = true); -QString cachePath(); Storage::EncryptionKey cacheKey(); +QString cachePath(); Storage::Cache::Database::Settings cacheSettings(); void updateCacheSettings(Storage::Cache::Database::SettingsUpdate &update); +Storage::EncryptionKey cacheBigFileKey(); +QString cacheBigFilePath(); +Storage::Cache::Database::Settings cacheBigFileSettings(); +void updateCacheBigFileSettings(Storage::Cache::Database::SettingsUpdate &update); + void countVoiceWaveform(DocumentData *document); void cancelTask(TaskId id);