From 873ccf8096e550c9ec80256f9054c0f30b8f9a4b Mon Sep 17 00:00:00 2001 From: John Preston Date: Mon, 4 Sep 2017 22:14:44 +0300 Subject: [PATCH] Implement SharedMediaWithLastViewer using rpl. --- .../history/history_shared_media.cpp | 670 +++++++++++------- .../history/history_shared_media.h | 175 ++--- .../history/history_user_photos.cpp | 70 +- Telegram/SourceFiles/mediaview.cpp | 28 +- Telegram/SourceFiles/mediaview.h | 8 +- .../SourceFiles/storage/storage_facade.cpp | 40 +- Telegram/SourceFiles/storage/storage_facade.h | 10 +- .../storage/storage_shared_media.cpp | 83 +-- .../storage/storage_shared_media.h | 37 +- 9 files changed, 605 insertions(+), 516 deletions(-) diff --git a/Telegram/SourceFiles/history/history_shared_media.cpp b/Telegram/SourceFiles/history/history_shared_media.cpp index b3ea72123..a4cb3bfd0 100644 --- a/Telegram/SourceFiles/history/history_shared_media.cpp +++ b/Telegram/SourceFiles/history/history_shared_media.cpp @@ -28,7 +28,7 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org namespace { -using Type = SharedMediaViewer::Type; +using Type = SharedMediaSlice::Type; inline MediaOverviewType SharedMediaTypeToOverview(Type type) { switch (type) { @@ -61,14 +61,116 @@ void SharedMediaShowOverview( } } -SharedMediaSlice::SharedMediaSlice(Key key) : SharedMediaSlice(key, base::none) { +class SharedMediaSliceBuilder { +public: + using Type = Storage::SharedMediaType; + using Key = Storage::SharedMediaKey; + + SharedMediaSliceBuilder(Key key, int limitBefore, int limitAfter); + + using Result = Storage::SharedMediaResult; + using SliceUpdate = Storage::SharedMediaSliceUpdate; + using RemoveOne = Storage::SharedMediaRemoveOne; + using RemoveAll = Storage::SharedMediaRemoveAll; + bool applyUpdate(const Result &result); + bool applyUpdate(const SliceUpdate &update); + bool applyUpdate(const RemoveOne &update); + bool applyUpdate(const RemoveAll &update); + + void checkInsufficientMedia(); + using AroundData = std::pair; + rpl::producer insufficientMediaAround() const { + return _insufficientMediaAround.events(); + } + + SharedMediaSlice snapshot() const; + +private: + enum class RequestDirection { + Before, + After, + }; + void requestMessages(RequestDirection direction); + void sliceToLimits(); + + void mergeSliceData( + base::optional count, + const base::flat_set &messageIds, + base::optional skippedBefore = base::none, + base::optional skippedAfter = base::none); + + Key _key; + base::flat_set _ids; + MsgRange _range; + base::optional _fullCount; + base::optional _skippedBefore; + base::optional _skippedAfter; + int _limitBefore = 0; + int _limitAfter = 0; + + rpl::event_stream _insufficientMediaAround; + +}; + +class SharedMediaMergedSliceBuilder { +public: + using Type = SharedMediaMergedSlice::Type; + using Key = SharedMediaMergedSlice::Key; + + SharedMediaMergedSliceBuilder(Key key); + + void applyPartUpdate(SharedMediaSlice &&update); + void applyMigratedUpdate(SharedMediaSlice &&update); + + SharedMediaMergedSlice snapshot() const; + +private: + Key _key; + SharedMediaSlice _part; + base::optional _migrated; + +}; + +class SharedMediaWithLastSliceBuilder { +public: + using Type = SharedMediaWithLastSlice::Type; + using Key = SharedMediaWithLastSlice::Key; + + SharedMediaWithLastSliceBuilder(Key key); + + void applyViewerUpdate(SharedMediaMergedSlice &&update); + void applyEndingUpdate(SharedMediaMergedSlice &&update); + + SharedMediaWithLastSlice snapshot() const; + +private: + Key _key; + SharedMediaWithLastSlice _data; + +}; + +SharedMediaSlice::SharedMediaSlice(Key key) : SharedMediaSlice( + key, + {}, + {}, + base::none, + base::none, + base::none) { } SharedMediaSlice::SharedMediaSlice( Key key, - base::optional fullCount) + const base::flat_set &ids, + MsgRange range, + base::optional fullCount, + base::optional skippedBefore, + base::optional skippedAfter) : _key(key) - , _fullCount(fullCount) { + , _ids(ids) + , _range(range) + , _fullCount(fullCount) + , _skippedBefore(skippedBefore) + , _skippedAfter(skippedAfter) { } base::optional SharedMediaSlice::indexOf(MsgId msgId) const { @@ -119,129 +221,35 @@ QString SharedMediaSlice::debug() const { return before + middle + after; } -SharedMediaViewer::SharedMediaViewer( +SharedMediaSliceBuilder::SharedMediaSliceBuilder( Key key, int limitBefore, int limitAfter) : _key(key) , _limitBefore(limitBefore) - , _limitAfter(limitAfter) - , _data(_key) { - Expects(IsServerMsgId(key.messageId) || (key.messageId == 0)); - Expects((key.messageId != 0) || (limitBefore == 0 && limitAfter == 0)); + , _limitAfter(limitAfter) { } -void SharedMediaViewer::start() { - auto applyUpdateCallback = [this](auto &update) { - this->applyUpdate(update); - }; - subscribe(Auth().storage().sharedMediaSliceUpdated(), applyUpdateCallback); - subscribe(Auth().storage().sharedMediaOneRemoved(), applyUpdateCallback); - subscribe(Auth().storage().sharedMediaAllRemoved(), applyUpdateCallback); - - loadInitial(); -} - -void SharedMediaViewer::loadInitial() { - auto weak = base::make_weak_unique(this); - Auth().storage().query(Storage::SharedMediaQuery( - _key, - _limitBefore, - _limitAfter), [weak](Storage::SharedMediaResult &&result) { - if (weak) { - weak->applyStoredResult(std::move(result)); - } - }); -} - -void SharedMediaViewer::applyStoredResult(Storage::SharedMediaResult &&result) { +bool SharedMediaSliceBuilder::applyUpdate(const Result &result) { mergeSliceData( result.count, result.messageIds, result.skippedBefore, result.skippedAfter); + return true; } -void SharedMediaViewer::mergeSliceData( - base::optional count, - const base::flat_set &messageIds, - base::optional skippedBefore, - base::optional skippedAfter) { - if (messageIds.empty()) { - if (count && _data._fullCount != count) { - _data._fullCount = count; - if (*_data._fullCount <= _data.size()) { - _data._fullCount = _data.size(); - _data._skippedBefore = _data._skippedAfter = 0; - } - updated.notify(_data); - } - sliceToLimits(); - return; - } - if (count) { - _data._fullCount = count; - } - auto wasMinId = _data._ids.empty() ? -1 : _data._ids.front(); - auto wasMaxId = _data._ids.empty() ? -1 : _data._ids.back(); - _data._ids.merge(messageIds.begin(), messageIds.end()); - - auto adjustSkippedBefore = [&](MsgId oldId, int oldSkippedBefore) { - auto it = _data._ids.find(oldId); - Assert(it != _data._ids.end()); - _data._skippedBefore = oldSkippedBefore - (it - _data._ids.begin()); - accumulate_max(*_data._skippedBefore, 0); - }; - if (skippedBefore) { - adjustSkippedBefore(messageIds.front(), *skippedBefore); - } else if (wasMinId >= 0 && _data._skippedBefore) { - adjustSkippedBefore(wasMinId, *_data._skippedBefore); - } else { - _data._skippedBefore = base::none; - } - - auto adjustSkippedAfter = [&](MsgId oldId, int oldSkippedAfter) { - auto it = _data._ids.find(oldId); - Assert(it != _data._ids.end()); - _data._skippedAfter = oldSkippedAfter - (_data._ids.end() - it - 1); - accumulate_max(*_data._skippedAfter, 0); - }; - if (skippedAfter) { - adjustSkippedAfter(messageIds.back(), *skippedAfter); - } else if (wasMaxId >= 0 && _data._skippedAfter) { - adjustSkippedAfter(wasMaxId, *_data._skippedAfter); - } else { - _data._skippedAfter = base::none; - } - - if (_data._fullCount) { - if (_data._skippedBefore && !_data._skippedAfter) { - _data._skippedAfter = *_data._fullCount - - *_data._skippedBefore - - int(_data._ids.size()); - } else if (_data._skippedAfter && !_data._skippedBefore) { - _data._skippedBefore = *_data._fullCount - - *_data._skippedAfter - - int(_data._ids.size()); - } - } - - sliceToLimits(); - - updated.notify(_data); -} - -void SharedMediaViewer::applyUpdate(const SliceUpdate &update) { +bool SharedMediaSliceBuilder::applyUpdate(const SliceUpdate &update) { if (update.peerId != _key.peerId || update.type != _key.type) { - return; + return false; } auto intersects = [](MsgRange range1, MsgRange range2) { return (range1.from <= range2.till) && (range2.from <= range1.till); }; if (!intersects(update.range, { - _data._ids.empty() ? _key.messageId : _data._ids.front(), - _data._ids.empty() ? _key.messageId : _data._ids.back() })) { - return; + _ids.empty() ? _key.messageId : _ids.front(), + _ids.empty() ? _key.messageId : _ids.back() })) { + return false; } auto skippedBefore = (update.range.from == 0) ? 0 @@ -254,92 +262,226 @@ void SharedMediaViewer::applyUpdate(const SliceUpdate &update) { update.messages ? *update.messages : base::flat_set {}, skippedBefore, skippedAfter); + return true; } -void SharedMediaViewer::applyUpdate(const OneRemoved &update) { +bool SharedMediaSliceBuilder::applyUpdate(const RemoveOne &update) { if (update.peerId != _key.peerId || !update.types.test(_key.type)) { - return; + return false; } auto changed = false; - if (_data._fullCount && *_data._fullCount > 0) { - --*_data._fullCount; + if (_fullCount && *_fullCount > 0) { + --*_fullCount; changed = true; } - if (_data._ids.contains(update.messageId)) { - _data._ids.remove(update.messageId); + if (_ids.contains(update.messageId)) { + _ids.remove(update.messageId); changed = true; - } else if (!_data._ids.empty()) { - if (_data._ids.front() > update.messageId - && _data._skippedBefore - && *_data._skippedBefore > 0) { - --*_data._skippedBefore; + } else if (!_ids.empty()) { + if (_ids.front() > update.messageId + && _skippedBefore + && *_skippedBefore > 0) { + --*_skippedBefore; changed = true; - } else if (_data._ids.back() < update.messageId - && _data._skippedAfter - && *_data._skippedAfter > 0) { - --*_data._skippedAfter; + } else if (_ids.back() < update.messageId + && _skippedAfter + && *_skippedAfter > 0) { + --*_skippedAfter; changed = true; } } - if (changed) { - updated.notify(_data); - } + return changed; } -void SharedMediaViewer::applyUpdate(const AllRemoved &update) { +bool SharedMediaSliceBuilder::applyUpdate(const RemoveAll &update) { if (update.peerId != _key.peerId) { + return false; + } + _ids = {}; + _range = { 0, ServerMaxMsgId }; + _fullCount = 0; + _skippedBefore = 0; + _skippedAfter = 0; + return true; +} + +void SharedMediaSliceBuilder::checkInsufficientMedia() { + sliceToLimits(); +} + +void SharedMediaSliceBuilder::mergeSliceData( + base::optional count, + const base::flat_set &messageIds, + base::optional skippedBefore, + base::optional skippedAfter) { + if (messageIds.empty()) { + if (count && _fullCount != count) { + _fullCount = count; + if (*_fullCount <= _ids.size()) { + _fullCount = _ids.size(); + _skippedBefore = _skippedAfter = 0; + } + } + sliceToLimits(); return; } - _data = SharedMediaSlice(_key, 0); - updated.notify(_data); + if (count) { + _fullCount = count; + } + auto wasMinId = _ids.empty() ? -1 : _ids.front(); + auto wasMaxId = _ids.empty() ? -1 : _ids.back(); + _ids.merge(messageIds.begin(), messageIds.end()); + + auto adjustSkippedBefore = [&](MsgId oldId, int oldSkippedBefore) { + auto it = _ids.find(oldId); + Assert(it != _ids.end()); + _skippedBefore = oldSkippedBefore - (it - _ids.begin()); + accumulate_max(*_skippedBefore, 0); + }; + if (skippedBefore) { + adjustSkippedBefore(messageIds.front(), *skippedBefore); + } else if (wasMinId >= 0 && _skippedBefore) { + adjustSkippedBefore(wasMinId, *_skippedBefore); + } else { + _skippedBefore = base::none; + } + + auto adjustSkippedAfter = [&](MsgId oldId, int oldSkippedAfter) { + auto it = _ids.find(oldId); + Assert(it != _ids.end()); + _skippedAfter = oldSkippedAfter - (_ids.end() - it - 1); + accumulate_max(*_skippedAfter, 0); + }; + if (skippedAfter) { + adjustSkippedAfter(messageIds.back(), *skippedAfter); + } else if (wasMaxId >= 0 && _skippedAfter) { + adjustSkippedAfter(wasMaxId, *_skippedAfter); + } else { + _skippedAfter = base::none; + } + + if (_fullCount) { + if (_skippedBefore && !_skippedAfter) { + _skippedAfter = *_fullCount + - *_skippedBefore + - int(_ids.size()); + } else if (_skippedAfter && !_skippedBefore) { + _skippedBefore = *_fullCount + - *_skippedAfter + - int(_ids.size()); + } + } + + sliceToLimits(); } -void SharedMediaViewer::sliceToLimits() { - auto aroundIt = base::lower_bound(_data._ids, _key.messageId); - auto removeFromBegin = (aroundIt - _data._ids.begin() - _limitBefore); - auto removeFromEnd = (_data._ids.end() - aroundIt - _limitAfter - 1); +void SharedMediaSliceBuilder::sliceToLimits() { + auto aroundIt = base::lower_bound(_ids, _key.messageId); + auto removeFromBegin = (aroundIt - _ids.begin() - _limitBefore); + auto removeFromEnd = (_ids.end() - aroundIt - _limitAfter - 1); if (removeFromBegin > 0) { - _data._ids.erase(_data._ids.begin(), _data._ids.begin() + removeFromBegin); - if (_data._skippedBefore) { - *_data._skippedBefore += removeFromBegin; + _ids.erase(_ids.begin(), _ids.begin() + removeFromBegin); + if (_skippedBefore) { + *_skippedBefore += removeFromBegin; } - } else if (removeFromBegin < 0 && (!_data._skippedBefore || *_data._skippedBefore > 0)) { + } else if (removeFromBegin < 0 && (!_skippedBefore || *_skippedBefore > 0)) { requestMessages(RequestDirection::Before); } if (removeFromEnd > 0) { - _data._ids.erase(_data._ids.end() - removeFromEnd, _data._ids.end()); - if (_data._skippedAfter) { - *_data._skippedAfter += removeFromEnd; + _ids.erase(_ids.end() - removeFromEnd, _ids.end()); + if (_skippedAfter) { + *_skippedAfter += removeFromEnd; } - } else if (removeFromEnd < 0 && (!_data._skippedAfter || *_data._skippedAfter > 0)) { + } else if (removeFromEnd < 0 && (!_skippedAfter || *_skippedAfter > 0)) { requestMessages(RequestDirection::After); } } -void SharedMediaViewer::requestMessages(RequestDirection direction) { +void SharedMediaSliceBuilder::requestMessages(RequestDirection direction) { using SliceType = ApiWrap::SliceType; - auto requestAroundData = [&]() -> std::pair { - if (_data._ids.empty()) { + auto requestAroundData = [&]() -> AroundData { + if (_ids.empty()) { return { _key.messageId, SliceType::Around }; } else if (direction == RequestDirection::Before) { - return { _data._ids.front(), SliceType::Before }; + return { _ids.front(), SliceType::Before }; } - return { _data._ids.back(), SliceType::After }; - }(); - Auth().api().requestSharedMedia( - App::peer(_key.peerId), - _key.type, - requestAroundData.first, - requestAroundData.second); + return { _ids.back(), SliceType::After }; + }; + _insufficientMediaAround.fire(requestAroundData()); } -SharedMediaSliceMerged::SharedMediaSliceMerged(Key key) : SharedMediaSliceMerged( +SharedMediaSlice SharedMediaSliceBuilder::snapshot() const { + return SharedMediaSlice( + _key, + _ids, + _range, + _fullCount, + _skippedBefore, + _skippedAfter); +} + +rpl::producer SharedMediaViewer( + SharedMediaSlice::Key key, + int limitBefore, + int limitAfter) { + Expects(IsServerMsgId(key.messageId) || (key.messageId == 0)); + Expects((key.messageId != 0) || (limitBefore == 0 && limitAfter == 0)); + + return [=](auto consumer) { + auto lifetime = rpl::lifetime(); + auto builder = lifetime.make_state( + key, + limitBefore, + limitAfter); + auto applyUpdate = [=](auto &&update) { + if (builder->applyUpdate(std::move(update))) { + consumer.put_next(builder->snapshot()); + } + }; + auto requestMediaAround = [ + peer = App::peer(key.peerId), + type = key.type + ](SharedMediaSliceBuilder::AroundData data) { + Auth().api().requestSharedMedia( + peer, + type, + data.first, + data.second); + }; + builder->insufficientMediaAround() + | rpl::on_next(requestMediaAround) + | rpl::start(lifetime); + + Auth().storage().sharedMediaSliceUpdated() + | rpl::on_next(applyUpdate) + | rpl::start(lifetime); + Auth().storage().sharedMediaOneRemoved() + | rpl::on_next(applyUpdate) + | rpl::start(lifetime); + Auth().storage().sharedMediaAllRemoved() + | rpl::on_next(applyUpdate) + | rpl::start(lifetime); + + Auth().storage().query(Storage::SharedMediaQuery( + key, + limitBefore, + limitAfter + )) + | rpl::on_next(applyUpdate) + | rpl::on_done([=] { builder->checkInsufficientMedia(); }) + | rpl::start(lifetime); + + return lifetime; + }; +} + +SharedMediaMergedSlice::SharedMediaMergedSlice(Key key) : SharedMediaMergedSlice( key, SharedMediaSlice(PartKey(key)), MigratedSlice(key)) { } -SharedMediaSliceMerged::SharedMediaSliceMerged( +SharedMediaMergedSlice::SharedMediaMergedSlice( Key key, SharedMediaSlice part, base::optional migrated) @@ -348,13 +490,13 @@ SharedMediaSliceMerged::SharedMediaSliceMerged( , _migrated(std::move(migrated)) { } -base::optional SharedMediaSliceMerged::fullCount() const { +base::optional SharedMediaMergedSlice::fullCount() const { return Add( _part.fullCount(), _migrated ? _migrated->fullCount() : 0); } -base::optional SharedMediaSliceMerged::skippedBefore() const { +base::optional SharedMediaMergedSlice::skippedBefore() const { return Add( isolatedInMigrated() ? 0 : _part.skippedBefore(), _migrated @@ -365,14 +507,14 @@ base::optional SharedMediaSliceMerged::skippedBefore() const { ); } -base::optional SharedMediaSliceMerged::skippedAfter() const { +base::optional SharedMediaMergedSlice::skippedAfter() const { return Add( isolatedInMigrated() ? _part.fullCount() : _part.skippedAfter(), isolatedInPart() ? 0 : _migrated->skippedAfter() ); } -base::optional SharedMediaSliceMerged::indexOf(FullMsgId fullId) const { +base::optional SharedMediaMergedSlice::indexOf(FullMsgId fullId) const { return isFromPart(fullId) ? (_part.indexOf(fullId.msg) | func::add(migratedSize())) : isolatedInPart() @@ -382,12 +524,12 @@ base::optional SharedMediaSliceMerged::indexOf(FullMsgId fullId) const { : base::none; } -int SharedMediaSliceMerged::size() const { +int SharedMediaMergedSlice::size() const { return (isolatedInPart() ? 0 : migratedSize()) + (isolatedInMigrated() ? 0 : _part.size()); } -FullMsgId SharedMediaSliceMerged::operator[](int index) const { +FullMsgId SharedMediaMergedSlice::operator[](int index) const { Expects(index >= 0 && index < size()); if (auto size = migratedSize()) { @@ -399,7 +541,7 @@ FullMsgId SharedMediaSliceMerged::operator[](int index) const { return ComputeId(_part, index); } -base::optional SharedMediaSliceMerged::distance(const Key &a, const Key &b) const { +base::optional SharedMediaMergedSlice::distance(const Key &a, const Key &b) const { if (a.type != _key.type || b.type != _key.type || a.peerId != _key.peerId @@ -416,65 +558,79 @@ base::optional SharedMediaSliceMerged::distance(const Key &a, const Key &b) return base::none; } -QString SharedMediaSliceMerged::debug() const { +QString SharedMediaMergedSlice::debug() const { return (_migrated ? (_migrated->debug() + '|') : QString()) + _part.debug(); } -SharedMediaViewerMerged::SharedMediaViewerMerged( - Key key, - int limitBefore, - int limitAfter) +SharedMediaMergedSliceBuilder::SharedMediaMergedSliceBuilder(Key key) : _key(key) - , _limitBefore(limitBefore) - , _limitAfter(limitAfter) - , _part(SharedMediaSliceMerged::PartKey(_key), _limitBefore, _limitAfter) - , _migrated(MigratedViewer(_key, _limitBefore, _limitAfter)) - , _data(_key) { + , _part(SharedMediaMergedSlice::PartKey(_key)) + , _migrated(SharedMediaMergedSlice::MigratedSlice(_key)) { +} + +void SharedMediaMergedSliceBuilder::applyPartUpdate(SharedMediaSlice &&update) { + _part = std::move(update); +} + +void SharedMediaMergedSliceBuilder::applyMigratedUpdate(SharedMediaSlice &&update) { + _migrated = std::move(update); +} + +SharedMediaMergedSlice SharedMediaMergedSliceBuilder::snapshot() const { + return SharedMediaMergedSlice( + _key, + _part, + _migrated + ); +} + +rpl::producer SharedMediaMergedViewer( + SharedMediaMergedSlice::Key key, + int limitBefore, + int limitAfter) { Expects(IsServerMsgId(key.universalId) || (key.universalId == 0) || (IsServerMsgId(-key.universalId) && key.migratedPeerId != 0)); Expects((key.universalId != 0) || (limitBefore == 0 && limitAfter == 0)); -} -std::unique_ptr SharedMediaViewerMerged::MigratedViewer( - const Key &key, - int limitBefore, - int limitAfter) { - return key.migratedPeerId - ? std::make_unique( - SharedMediaSliceMerged::MigratedKey(key), + return [=](auto consumer) { + auto lifetime = rpl::lifetime(); + auto builder = lifetime.make_state(key); + + SharedMediaViewer( + SharedMediaMergedSlice::PartKey(key), limitBefore, - limitAfter) - : nullptr; + limitAfter + ) | rpl::on_next([=](SharedMediaSlice &&update) { + builder->applyPartUpdate(std::move(update)); + consumer.put_next(builder->snapshot()); + }) | rpl::start(lifetime); + + if (key.migratedPeerId) { + SharedMediaViewer( + SharedMediaMergedSlice::MigratedKey(key), + limitBefore, + limitAfter + ) | rpl::on_next([=](SharedMediaSlice &&update) { + builder->applyMigratedUpdate(std::move(update)); + consumer.put_next(builder->snapshot()); + }) | rpl::start(lifetime); + } + + return lifetime; + }; } -void SharedMediaViewerMerged::start() { - subscribe(_part.updated, [this](const SharedMediaSlice &update) { - _data = SharedMediaSliceMerged(_key, update, std::move(_data._migrated)); - updated.notify(_data); - }); - if (_migrated) { - subscribe(_migrated->updated, [this](const SharedMediaSlice &update) { - _data = SharedMediaSliceMerged(_key, std::move(_data._part), update); - updated.notify(_data); - }); - } - _part.start(); - if (_migrated) { - _migrated->start(); - } -} - -SharedMediaSliceWithLast::SharedMediaSliceWithLast(Key key) : SharedMediaSliceWithLast( +SharedMediaWithLastSlice::SharedMediaWithLastSlice(Key key) : SharedMediaWithLastSlice( key, - SharedMediaSliceMerged(ViewerKey(key)), + SharedMediaMergedSlice(ViewerKey(key)), EndingSlice(key)) { } -SharedMediaSliceWithLast::SharedMediaSliceWithLast( +SharedMediaWithLastSlice::SharedMediaWithLastSlice( Key key, - SharedMediaSliceMerged slice, - base::optional ending) + SharedMediaMergedSlice slice, + base::optional ending) : _key(key) , _slice(std::move(slice)) , _ending(std::move(ending)) @@ -484,17 +640,17 @@ SharedMediaSliceWithLast::SharedMediaSliceWithLast( : false) { } -base::optional SharedMediaSliceWithLast::fullCount() const { +base::optional SharedMediaWithLastSlice::fullCount() const { return Add( _slice.fullCount(), _isolatedLastPhoto | [](bool isolated) { return isolated ? 1 : 0; }); } -base::optional SharedMediaSliceWithLast::skippedBefore() const { +base::optional SharedMediaWithLastSlice::skippedBefore() const { return _slice.skippedBefore(); } -base::optional SharedMediaSliceWithLast::skippedAfter() const { +base::optional SharedMediaWithLastSlice::skippedAfter() const { return isolatedInSlice() ? Add( _slice.skippedAfter(), @@ -502,7 +658,7 @@ base::optional SharedMediaSliceWithLast::skippedAfter() const { : (lastPhotoSkip() | [](int) { return 0; }); } -base::optional SharedMediaSliceWithLast::indexOf(Value value) const { +base::optional SharedMediaWithLastSlice::indexOf(Value value) const { return base::get_if(&value) ? _slice.indexOf(*base::get_if(&value)) : (isolatedInSlice() @@ -511,12 +667,12 @@ base::optional SharedMediaSliceWithLast::indexOf(Value value) const { : Add(_slice.size() - 1, lastPhotoSkip()); } -int SharedMediaSliceWithLast::size() const { +int SharedMediaWithLastSlice::size() const { return _slice.size() + ((!isolatedInSlice() && lastPhotoSkip() == 1) ? 1 : 0); } -SharedMediaSliceWithLast::Value SharedMediaSliceWithLast::operator[](int index) const { +SharedMediaWithLastSlice::Value SharedMediaWithLastSlice::operator[](int index) const { Expects(index >= 0 && index < size()); return (index < _slice.size()) @@ -524,7 +680,7 @@ SharedMediaSliceWithLast::Value SharedMediaSliceWithLast::operator[](int index) : Value(App::photo(_lastPhotoId)); } -base::optional SharedMediaSliceWithLast::distance(const Key &a, const Key &b) const { +base::optional SharedMediaWithLastSlice::distance(const Key &a, const Key &b) const { if (a.type != _key.type || b.type != _key.type || a.peerId != _key.peerId @@ -541,22 +697,22 @@ base::optional SharedMediaSliceWithLast::distance(const Key &a, const Key & return base::none; } -QString SharedMediaSliceWithLast::debug() const { +QString SharedMediaWithLastSlice::debug() const { return _slice.debug() + (_isolatedLastPhoto ? (*_isolatedLastPhoto ? "@" : "") : "?"); } -PhotoId SharedMediaSliceWithLast::LastPeerPhotoId(PeerId peerId) { +PhotoId SharedMediaWithLastSlice::LastPeerPhotoId(PeerId peerId) { if (auto peer = App::peerLoaded(peerId)) { return peer->photoId; } return UnknownPeerPhotoId; } -base::optional SharedMediaSliceWithLast::IsLastIsolated( - const SharedMediaSliceMerged &slice, - const base::optional &ending, +base::optional SharedMediaWithLastSlice::IsLastIsolated( + const SharedMediaMergedSlice &slice, + const base::optional &ending, PhotoId lastPeerPhotoId) { if (lastPeerPhotoId == UnknownPeerPhotoId) { return base::none; @@ -575,8 +731,8 @@ base::optional SharedMediaSliceWithLast::IsLastIsolated( | [&](PhotoId photoId) { return lastPeerPhotoId != photoId; }; } -base::optional SharedMediaSliceWithLast::LastFullMsgId( - const SharedMediaSliceMerged &slice) { +base::optional SharedMediaWithLastSlice::LastFullMsgId( + const SharedMediaMergedSlice &slice) { if (slice.fullCount() == 0) { return FullMsgId(); } else if (slice.size() == 0 || slice.skippedAfter() != 0) { @@ -585,43 +741,59 @@ base::optional SharedMediaSliceWithLast::LastFullMsgId( return slice[slice.size() - 1]; } -SharedMediaViewerWithLast::SharedMediaViewerWithLast( - Key key, - int limitBefore, - int limitAfter) +rpl::producer SharedMediaWithLastViewer( + SharedMediaWithLastSlice::Key key, + int limitBefore, + int limitAfter) { + return [=](auto consumer) { + auto lifetime = rpl::lifetime(); + auto builder = lifetime.make_state(key); + + SharedMediaMergedViewer( + SharedMediaWithLastSlice::ViewerKey(key), + limitBefore, + limitAfter + ) | rpl::on_next([=](SharedMediaMergedSlice &&update) { + builder->applyViewerUpdate(std::move(update)); + consumer.put_next(builder->snapshot()); + }) | rpl::start(lifetime); + + if (base::get_if(&key.universalId)) { + SharedMediaMergedViewer( + SharedMediaWithLastSlice::EndingKey(key), + 1, + 1 + ) | rpl::on_next([=](SharedMediaMergedSlice &&update) { + builder->applyEndingUpdate(std::move(update)); + consumer.put_next(builder->snapshot()); + }) | rpl::start(lifetime); + } + + return lifetime; + }; +} + +SharedMediaWithLastSliceBuilder::SharedMediaWithLastSliceBuilder(Key key) : _key(key) - , _limitBefore(limitBefore) - , _limitAfter(limitAfter) - , _viewer(SharedMediaSliceWithLast::ViewerKey(_key), _limitBefore, _limitAfter) - , _ending(EndingViewer(_key, _limitBefore, _limitAfter)) , _data(_key) { } -std::unique_ptr SharedMediaViewerWithLast::EndingViewer( - const Key &key, - int limitBefore, - int limitAfter) { - return base::get_if(&key.universalId) - ? std::make_unique( - SharedMediaSliceWithLast::EndingKey(key), - 1, - 1) - : nullptr; +void SharedMediaWithLastSliceBuilder::applyViewerUpdate( + SharedMediaMergedSlice &&update) { + _data = SharedMediaWithLastSlice( + _key, + std::move(update), + std::move(_data._ending)); } -void SharedMediaViewerWithLast::start() { - subscribe(_viewer.updated, [this](const SharedMediaSliceMerged &update) { - _data = SharedMediaSliceWithLast(_key, update, std::move(_data._ending)); - updated.notify(_data); - }); - if (_ending) { - subscribe(_ending->updated, [this](const SharedMediaSliceMerged &update) { - _data = SharedMediaSliceWithLast(_key, std::move(_data._slice), update); - updated.notify(_data); - }); - } - _viewer.start(); - if (_ending) { - _ending->start(); - } +void SharedMediaWithLastSliceBuilder::applyEndingUpdate( + SharedMediaMergedSlice &&update) { + _data = SharedMediaWithLastSlice( + _key, + std::move(_data._slice), + std::move(update)); +} + +SharedMediaWithLastSlice SharedMediaWithLastSliceBuilder::snapshot() const { + return _data; } diff --git a/Telegram/SourceFiles/history/history_shared_media.h b/Telegram/SourceFiles/history/history_shared_media.h index e4abec3bb..8cb6f8f28 100644 --- a/Telegram/SourceFiles/history/history_shared_media.h +++ b/Telegram/SourceFiles/history/history_shared_media.h @@ -29,13 +29,19 @@ void SharedMediaShowOverview( Storage::SharedMediaType type, not_null history); -class SharedMediaViewer; class SharedMediaSlice { public: + using Type = Storage::SharedMediaType; using Key = Storage::SharedMediaKey; SharedMediaSlice(Key key); - SharedMediaSlice(Key key, base::optional fullCount); + SharedMediaSlice( + Key key, + const base::flat_set &ids, + MsgRange range, + base::optional fullCount, + base::optional skippedBefore, + base::optional skippedAfter); const Key &key() const { return _key; } @@ -57,56 +63,16 @@ private: base::optional _skippedBefore; base::optional _skippedAfter; - friend class SharedMediaViewer; + class SharedMediaSliceBuilder; }; -class SharedMediaViewer : - private base::Subscriber, - public base::enable_weak_from_this { -public: - using Type = Storage::SharedMediaType; - using Key = Storage::SharedMediaKey; +rpl::producer SharedMediaViewer( + SharedMediaSlice::Key key, + int limitBefore, + int limitAfter); - SharedMediaViewer(Key key, int limitBefore, int limitAfter); - - void start(); - - base::Observable updated; - -private: - using InitialResult = Storage::SharedMediaResult; - using SliceUpdate = Storage::SharedMediaSliceUpdate; - using OneRemoved = Storage::SharedMediaRemoveOne; - using AllRemoved = Storage::SharedMediaRemoveAll; - - void loadInitial(); - enum class RequestDirection { - Before, - After, - }; - void requestMessages(RequestDirection direction); - void applyStoredResult(InitialResult &&result); - void applyUpdate(const SliceUpdate &update); - void applyUpdate(const OneRemoved &update); - void applyUpdate(const AllRemoved &update); - void sliceToLimits(); - - void mergeSliceData( - base::optional count, - const base::flat_set &messageIds, - base::optional skippedBefore = base::none, - base::optional skippedAfter = base::none); - - Key _key; - int _limitBefore = 0; - int _limitAfter = 0; - SharedMediaSlice _data; - -}; - -class SharedMediaViewerMerged; -class SharedMediaSliceMerged { +class SharedMediaMergedSlice { public: using Type = Storage::SharedMediaType; using UniversalMsgId = MsgId; @@ -136,8 +102,8 @@ public: }; - SharedMediaSliceMerged(Key key); - SharedMediaSliceMerged( + SharedMediaMergedSlice(Key key); + SharedMediaMergedSlice( Key key, SharedMediaSlice part, base::optional migrated); @@ -154,7 +120,6 @@ public: QString debug() const; -private: static SharedMediaSlice::Key PartKey(const Key &key) { return { key.peerId, @@ -169,6 +134,8 @@ private: (key.universalId <= 0) ? (-key.universalId) : (ServerMaxMsgId - 1) }; } + +private: static base::optional MigratedSlice(const Key &key) { return key.migratedPeerId ? base::make_optional(SharedMediaSlice(MigratedKey(key))) @@ -222,47 +189,22 @@ private: SharedMediaSlice _part; base::optional _migrated; - friend class SharedMediaViewerMerged; + friend class SharedMediaMergedSliceBuilder; }; -class SharedMediaViewerMerged : private base::Subscriber { -public: - using Type = SharedMediaSliceMerged::Type; - using Key = SharedMediaSliceMerged::Key; +rpl::producer SharedMediaMergedViewer( + SharedMediaMergedSlice::Key key, + int limitBefore, + int limitAfter); - SharedMediaViewerMerged( - Key key, - int limitBefore, - int limitAfter); - - void start(); - - base::Observable updated; - -private: - static std::unique_ptr MigratedViewer( - const Key &key, - int limitBefore, - int limitAfter); - - Key _key; - int _limitBefore = 0; - int _limitAfter = 0; - SharedMediaViewer _part; - std::unique_ptr _migrated; - SharedMediaSliceMerged _data; - -}; - -class SharedMediaViewerWithLast; -class SharedMediaSliceWithLast { +class SharedMediaWithLastSlice { public: using Type = Storage::SharedMediaType; // base::none in those mean CurrentPeerPhoto. using Value = base::variant>; - using MessageId = SharedMediaSliceMerged::UniversalMsgId; + using MessageId = SharedMediaMergedSlice::UniversalMsgId; using UniversalMsgId = base::variant< MessageId, not_null>; @@ -295,11 +237,11 @@ public: }; - SharedMediaSliceWithLast(Key key); - SharedMediaSliceWithLast( + SharedMediaWithLastSlice(Key key); + SharedMediaWithLastSlice( Key key, - SharedMediaSliceMerged slice, - base::optional ending); + SharedMediaMergedSlice slice, + base::optional ending); base::optional fullCount() const; base::optional skippedBefore() const; @@ -311,18 +253,17 @@ public: QString debug() const; -private: - static SharedMediaSliceMerged::Key ViewerKey(const Key &key) { + static SharedMediaMergedSlice::Key ViewerKey(const Key &key) { return { key.peerId, key.migratedPeerId, key.type, base::get_if(&key.universalId) - ? (*base::get_if(&key.universalId)) - : ServerMaxMsgId - 1 + ? (*base::get_if(&key.universalId)) + : ServerMaxMsgId - 1 }; } - static SharedMediaSliceMerged::Key EndingKey(const Key &key) { + static SharedMediaMergedSlice::Key EndingKey(const Key &key) { return { key.peerId, key.migratedPeerId, @@ -330,19 +271,21 @@ private: ServerMaxMsgId - 1 }; } - static base::optional EndingSlice(const Key &key) { + +private: + static base::optional EndingSlice(const Key &key) { return base::get_if(&key.universalId) - ? base::make_optional(SharedMediaSliceMerged(EndingKey(key))) + ? base::make_optional(SharedMediaMergedSlice(EndingKey(key))) : base::none; } static PhotoId LastPeerPhotoId(PeerId peerId); static base::optional IsLastIsolated( - const SharedMediaSliceMerged &slice, - const base::optional &ending, + const SharedMediaMergedSlice &slice, + const base::optional &ending, PhotoId lastPeerPhotoId); static base::optional LastFullMsgId( - const SharedMediaSliceMerged &slice); + const SharedMediaMergedSlice &slice); static base::optional Add( const base::optional &a, const base::optional &b) { @@ -371,40 +314,16 @@ private: } Key _key; - SharedMediaSliceMerged _slice; - base::optional _ending; + SharedMediaMergedSlice _slice; + base::optional _ending; PhotoId _lastPhotoId = 0; base::optional _isolatedLastPhoto; - friend class SharedMediaViewerWithLast; + friend class SharedMediaWithLastSliceBuilder; }; -class SharedMediaViewerWithLast : private base::Subscriber { -public: - using Type = SharedMediaSliceWithLast::Type; - using Key = SharedMediaSliceWithLast::Key; - - SharedMediaViewerWithLast( - Key key, - int limitBefore, - int limitAfter); - - void start(); - - base::Observable updated; - -private: - static std::unique_ptr EndingViewer( - const Key &key, - int limitBefore, - int limitAfter); - - Key _key; - int _limitBefore = 0; - int _limitAfter = 0; - SharedMediaViewerMerged _viewer; - std::unique_ptr _ending; - SharedMediaSliceWithLast _data; - -}; +rpl::producer SharedMediaWithLastViewer( + SharedMediaWithLastSlice::Key key, + int limitBefore, + int limitAfter); diff --git a/Telegram/SourceFiles/history/history_user_photos.cpp b/Telegram/SourceFiles/history/history_user_photos.cpp index 987755d4e..823421f70 100644 --- a/Telegram/SourceFiles/history/history_user_photos.cpp +++ b/Telegram/SourceFiles/history/history_user_photos.cpp @@ -25,6 +25,41 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org #include "storage/storage_facade.h" #include "storage/storage_user_photos.h" +class UserPhotosSliceBuilder { +public: + using Key = UserPhotosSlice::Key; + + UserPhotosSliceBuilder(Key key, int limitBefore, int limitAfter); + + bool applyUpdate(const Storage::UserPhotosResult &update); + bool applyUpdate(const Storage::UserPhotosSliceUpdate &update); + void checkInsufficientPhotos(); + rpl::producer insufficientPhotosAround() const { + return _insufficientPhotosAround.events(); + } + + UserPhotosSlice snapshot() const; + +private: + void mergeSliceData( + base::optional count, + const std::deque &photoIds, + base::optional skippedBefore, + int skippedAfter); + void sliceToLimits(); + + Key _key; + std::deque _ids; + base::optional _fullCount; + base::optional _skippedBefore; + int _skippedAfter = 0; + int _limitBefore = 0; + int _limitAfter = 0; + + rpl::event_stream _insufficientPhotosAround; + +}; + UserPhotosSlice::UserPhotosSlice(Key key) : UserPhotosSlice( key, {}, @@ -90,41 +125,6 @@ QString UserPhotosSlice::debug() const { return before + middle + after; } -class UserPhotosSliceBuilder { -public: - using Key = UserPhotosSlice::Key; - - UserPhotosSliceBuilder(Key key, int limitBefore, int limitAfter); - - bool applyUpdate(const Storage::UserPhotosResult &update); - bool applyUpdate(const Storage::UserPhotosSliceUpdate &update); - void checkInsufficientPhotos(); - rpl::producer insufficientPhotosAround() const { - return _insufficientPhotosAround.events(); - } - - UserPhotosSlice snapshot() const; - -private: - void mergeSliceData( - base::optional count, - const std::deque &photoIds, - base::optional skippedBefore, - int skippedAfter); - void sliceToLimits(); - - Key _key; - std::deque _ids; - base::optional _fullCount; - base::optional _skippedBefore; - int _skippedAfter = 0; - int _limitBefore = 0; - int _limitAfter = 0; - - rpl::event_stream _insufficientPhotosAround; - -}; - UserPhotosSliceBuilder::UserPhotosSliceBuilder( Key key, int limitBefore, diff --git a/Telegram/SourceFiles/mediaview.cpp b/Telegram/SourceFiles/mediaview.cpp index 31b871164..cc8615a25 100644 --- a/Telegram/SourceFiles/mediaview.cpp +++ b/Telegram/SourceFiles/mediaview.cpp @@ -68,18 +68,15 @@ constexpr auto kIdsPreloadAfter = 28; } // namespace struct MediaView::SharedMedia { - SharedMedia(SharedMediaViewerWithLast::Key key) - : key(key) - , slice(key, kIdsLimit, kIdsLimit) { + SharedMedia(SharedMediaWithLastSlice::Key key) : key(key) { } - SharedMediaViewerWithLast::Key key; - SharedMediaViewerWithLast slice; + SharedMediaWithLastSlice::Key key; + rpl::lifetime lifetime; }; struct MediaView::UserPhotos { - UserPhotos(UserPhotosSlice::Key key) - : key(key) { + UserPhotos(UserPhotosSlice::Key key) : key(key) { } UserPhotosSlice::Key key; @@ -1044,7 +1041,7 @@ bool MediaView::validSharedMedia() const { return false; } auto countDistanceInData = [](const auto &a, const auto &b) { - return [&](const SharedMediaSliceWithLast &data) { + return [&](const SharedMediaWithLastSlice &data) { return data.distance(a, b); }; }; @@ -1063,21 +1060,24 @@ bool MediaView::validSharedMedia() const { void MediaView::validateSharedMedia() { if (auto key = sharedMediaKey()) { _sharedMedia = std::make_unique(*key); - subscribe(_sharedMedia->slice.updated, [this](const SharedMediaSliceWithLast &data) { - handleSharedMediaUpdate(data); - }); - _sharedMedia->slice.start(); + SharedMediaWithLastViewer( + *key, + kIdsLimit, + kIdsLimit + ) | rpl::on_next([this](SharedMediaWithLastSlice &&update) { + handleSharedMediaUpdate(std::move(update)); + }) | rpl::start(_sharedMedia->lifetime); } else { _sharedMedia = nullptr; _sharedMediaData = base::none; } } -void MediaView::handleSharedMediaUpdate(const SharedMediaSliceWithLast &update) { +void MediaView::handleSharedMediaUpdate(SharedMediaWithLastSlice &&update) { if ((!_photo && !_doc) || !_sharedMedia) { _sharedMediaData = base::none; } else { - _sharedMediaData = update; + _sharedMediaData = std::move(update); } findCurrent(); updateControls(); diff --git a/Telegram/SourceFiles/mediaview.h b/Telegram/SourceFiles/mediaview.h index e2cffb6f4..1fe4aa088 100644 --- a/Telegram/SourceFiles/mediaview.h +++ b/Telegram/SourceFiles/mediaview.h @@ -169,13 +169,13 @@ private: void updateMixerVideoVolume() const; struct SharedMedia; - using SharedMediaType = SharedMediaViewerWithLast::Type; - using SharedMediaKey = SharedMediaViewerWithLast::Key; + using SharedMediaType = SharedMediaWithLastSlice::Type; + using SharedMediaKey = SharedMediaWithLastSlice::Key; base::optional sharedMediaType() const; base::optional sharedMediaKey() const; bool validSharedMedia() const; void validateSharedMedia(); - void handleSharedMediaUpdate(const SharedMediaSliceWithLast &update); + void handleSharedMediaUpdate(SharedMediaWithLastSlice &&update); struct UserPhotos; using UserPhotosKey = UserPhotosSlice::Key; @@ -251,7 +251,7 @@ private: PhotoData *_photo = nullptr; DocumentData *_doc = nullptr; std::unique_ptr _sharedMedia; - base::optional _sharedMediaData; + base::optional _sharedMediaData; std::unique_ptr _userPhotos; base::optional _userPhotosData; diff --git a/Telegram/SourceFiles/storage/storage_facade.cpp b/Telegram/SourceFiles/storage/storage_facade.cpp index 17c50faba..b881e657a 100644 --- a/Telegram/SourceFiles/storage/storage_facade.cpp +++ b/Telegram/SourceFiles/storage/storage_facade.cpp @@ -32,13 +32,11 @@ public: void add(SharedMediaAddSlice &&query); void remove(SharedMediaRemoveOne &&query); void remove(SharedMediaRemoveAll &&query); - void query( - SharedMediaQuery &&query, - base::lambda_once &&callback); + rpl::producer query(SharedMediaQuery &&query) const; - base::Observable &sharedMediaSliceUpdated(); - base::Observable &sharedMediaOneRemoved(); - base::Observable &sharedMediaAllRemoved(); + rpl::producer sharedMediaSliceUpdated() const; + rpl::producer sharedMediaOneRemoved() const; + rpl::producer sharedMediaAllRemoved() const; void add(UserPhotosAddNew &&query); void add(UserPhotosAddSlice &&query); @@ -74,22 +72,20 @@ void Facade::Impl::remove(SharedMediaRemoveAll &&query) { _sharedMedia.remove(std::move(query)); } -void Facade::Impl::query( - SharedMediaQuery &&query, - base::lambda_once &&callback) { - _sharedMedia.query(query, std::move(callback)); +rpl::producer Facade::Impl::query(SharedMediaQuery &&query) const { + return _sharedMedia.query(std::move(query)); } -base::Observable &Facade::Impl::sharedMediaSliceUpdated() { - return _sharedMedia.sliceUpdated; +rpl::producer Facade::Impl::sharedMediaSliceUpdated() const { + return _sharedMedia.sliceUpdated(); } -base::Observable &Facade::Impl::sharedMediaOneRemoved() { - return _sharedMedia.oneRemoved; +rpl::producer Facade::Impl::sharedMediaOneRemoved() const { + return _sharedMedia.oneRemoved(); } -base::Observable &Facade::Impl::sharedMediaAllRemoved() { - return _sharedMedia.allRemoved; +rpl::producer Facade::Impl::sharedMediaAllRemoved() const { + return _sharedMedia.allRemoved(); } void Facade::Impl::add(UserPhotosAddNew &&query) { @@ -139,21 +135,19 @@ void Facade::remove(SharedMediaRemoveAll &&query) { _impl->remove(std::move(query)); } -void Facade::query( - SharedMediaQuery &&query, - base::lambda_once &&callback) { - _impl->query(std::move(query), std::move(callback)); +rpl::producer Facade::query(SharedMediaQuery &&query) const { + return _impl->query(std::move(query)); } -base::Observable &Facade::sharedMediaSliceUpdated() { +rpl::producer Facade::sharedMediaSliceUpdated() const { return _impl->sharedMediaSliceUpdated(); } -base::Observable &Facade::sharedMediaOneRemoved() { +rpl::producer Facade::sharedMediaOneRemoved() const { return _impl->sharedMediaOneRemoved(); } -base::Observable &Facade::sharedMediaAllRemoved() { +rpl::producer Facade::sharedMediaAllRemoved() const { return _impl->sharedMediaAllRemoved(); } diff --git a/Telegram/SourceFiles/storage/storage_facade.h b/Telegram/SourceFiles/storage/storage_facade.h index 7de37f450..86264a206 100644 --- a/Telegram/SourceFiles/storage/storage_facade.h +++ b/Telegram/SourceFiles/storage/storage_facade.h @@ -51,13 +51,11 @@ public: void add(SharedMediaAddSlice &&query); void remove(SharedMediaRemoveOne &&query); void remove(SharedMediaRemoveAll &&query); - void query( - SharedMediaQuery &&query, - base::lambda_once &&callback); - base::Observable &sharedMediaSliceUpdated(); - base::Observable &sharedMediaOneRemoved(); - base::Observable &sharedMediaAllRemoved(); + rpl::producer query(SharedMediaQuery &&query) const; + rpl::producer sharedMediaSliceUpdated() const; + rpl::producer sharedMediaOneRemoved() const; + rpl::producer sharedMediaAllRemoved() const; void add(UserPhotosAddNew &&query); void add(UserPhotosAddSlice &&query); diff --git a/Telegram/SourceFiles/storage/storage_shared_media.cpp b/Telegram/SourceFiles/storage/storage_shared_media.cpp index eca1aee20..f812b4b42 100644 --- a/Telegram/SourceFiles/storage/storage_shared_media.cpp +++ b/Telegram/SourceFiles/storage/storage_shared_media.cpp @@ -128,7 +128,7 @@ void SharedMedia::List::addRange( } } update.count = _count; - sliceUpdated.notify(update, true); + _sliceUpdated.fire(std::move(update)); } void SharedMedia::List::addNew(MsgId messageId) { @@ -171,33 +171,28 @@ void SharedMedia::List::removeAll() { _count = 0; } -void SharedMedia::List::query( - const SharedMediaQuery &query, - base::lambda_once &&callback) { - auto result = SharedMediaResult {}; - result.count = _count; - - auto slice = base::lower_bound( - _slices, - query.key.messageId, - [](const Slice &slice, MsgId id) { return slice.range.till < id; }); - if (slice != _slices.end() && slice->range.from <= query.key.messageId) { - result = queryFromSlice(query, *slice); - } else { - result.count = _count; - } - base::TaskQueue::Main().Put( - [ - callback = std::move(callback), - result = std::move(result) - ]() mutable { - callback(std::move(result)); - }); +rpl::producer SharedMedia::List::query( + SharedMediaQuery &&query) const { + return [this, query = std::move(query)](auto consumer) { + auto slice = base::lower_bound( + _slices, + query.key.messageId, + [](const Slice &slice, MsgId id) { return slice.range.till < id; }); + if (slice != _slices.end() && slice->range.from <= query.key.messageId) { + consumer.put_next(queryFromSlice(query, *slice)); + } else if (_count) { + auto result = SharedMediaResult {}; + result.count = _count; + consumer.put_next(std::move(result)); + } + consumer.put_done(); + return rpl::lifetime(); + }; } SharedMediaResult SharedMedia::List::queryFromSlice( const SharedMediaQuery &query, - const Slice &slice) { + const Slice &slice) const { auto result = SharedMediaResult {}; auto position = base::lower_bound(slice.messages, query.key.messageId); auto haveBefore = int(position - slice.messages.begin()); @@ -237,14 +232,17 @@ std::map::iterator for (auto index = 0; index != kSharedMediaTypeCount; ++index) { auto &list = result->second[index]; auto type = static_cast(index); - subscribe(list.sliceUpdated, [this, type, peer](const SliceUpdate &update) { - sliceUpdated.notify(SharedMediaSliceUpdate( - peer, - type, - update.messages, - update.range, - update.count), true); - }); + + list.sliceUpdated() + | rpl::on_next([this, peer, type](SliceUpdate &&update) { + _sliceUpdated.fire(SharedMediaSliceUpdate( + peer, + type, + update.messages, + update.range, + update.count)); + }) + | rpl::start(_lifetime); } return result; } @@ -284,7 +282,7 @@ void SharedMedia::remove(SharedMediaRemoveOne &&query) { auto type = static_cast(index); if (query.types.test(type)) { peerIt->second[index].removeOne(query.messageId); - oneRemoved.notify(query, true); + _oneRemoved.fire(std::move(query)); } } } @@ -296,26 +294,21 @@ void SharedMedia::remove(SharedMediaRemoveAll &&query) { for (auto index = 0; index != kSharedMediaTypeCount; ++index) { peerIt->second[index].removeAll(); } - allRemoved.notify(query, true); + _allRemoved.fire(std::move(query)); } } -void SharedMedia::query( - const SharedMediaQuery &query, - base::lambda_once &&callback) { +rpl::producer SharedMedia::query(SharedMediaQuery &&query) const { Expects(IsValidSharedMediaType(query.key.type)); auto peerIt = _lists.find(query.key.peerId); if (peerIt != _lists.end()) { auto index = static_cast(query.key.type); - peerIt->second[index].query(query, std::move(callback)); - } else { - base::TaskQueue::Main().Put( - [ - callback = std::move(callback) - ]() mutable { - callback(SharedMediaResult()); - }); + return peerIt->second[index].query(std::move(query)); } + return [](auto consumer) { + consumer.put_done(); + return rpl::lifetime(); + }; } } // namespace Storage diff --git a/Telegram/SourceFiles/storage/storage_shared_media.h b/Telegram/SourceFiles/storage/storage_shared_media.h index cf86bbb1b..ea46ef296 100644 --- a/Telegram/SourceFiles/storage/storage_shared_media.h +++ b/Telegram/SourceFiles/storage/storage_shared_media.h @@ -21,6 +21,7 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org #pragma once #include "storage/storage_facade.h" +#include "rpl/event_stream.h" namespace Storage { @@ -191,7 +192,7 @@ struct SharedMediaSliceUpdate { base::optional count; }; -class SharedMedia : private base::Subscriber { +class SharedMedia { public: using Type = SharedMediaType; @@ -200,13 +201,18 @@ public: void add(SharedMediaAddSlice &&query); void remove(SharedMediaRemoveOne &&query); void remove(SharedMediaRemoveAll &&query); - void query( - const SharedMediaQuery &query, - base::lambda_once &&callback); - base::Observable sliceUpdated; - base::Observable oneRemoved; - base::Observable allRemoved; + rpl::producer query(SharedMediaQuery &&query) const; + + rpl::producer sliceUpdated() const { + return _sliceUpdated.events(); + } + rpl::producer oneRemoved() const { + return _oneRemoved.events(); + } + rpl::producer allRemoved() const { + return _allRemoved.events(); + } private: class List { @@ -219,16 +225,16 @@ private: base::optional count); void removeOne(MsgId messageId); void removeAll(); - void query( - const SharedMediaQuery &query, - base::lambda_once &&callback); + rpl::producer query(SharedMediaQuery &&query) const; struct SliceUpdate { const base::flat_set *messages = nullptr; MsgRange range; base::optional count; }; - base::Observable sliceUpdated; + rpl::producer sliceUpdated() const { + return _sliceUpdated.events(); + } private: struct Slice { @@ -267,11 +273,13 @@ private: SharedMediaResult queryFromSlice( const SharedMediaQuery &query, - const Slice &slice); + const Slice &slice) const; base::optional _count; base::flat_set _slices; + rpl::event_stream _sliceUpdated; + }; using SliceUpdate = List::SliceUpdate; using Lists = std::array; @@ -280,6 +288,11 @@ private: std::map _lists; + rpl::lifetime _lifetime; + rpl::event_stream _sliceUpdated; + rpl::event_stream _oneRemoved; + rpl::event_stream _allRemoved; + }; } // namespace Storage