diff --git a/Telegram/SourceFiles/application.cpp b/Telegram/SourceFiles/application.cpp index c2eeae415..f899532f7 100644 --- a/Telegram/SourceFiles/application.cpp +++ b/Telegram/SourceFiles/application.cpp @@ -33,6 +33,7 @@ Copyright (c) 2014-2016 John Preston, https://desktop.telegram.org #include "autoupdater.h" #include "core/observer.h" #include "observer_peer.h" +#include "core/observer.h" namespace { void mtpStateChanged(int32 dc, int32 state) { @@ -922,6 +923,10 @@ void AppClass::call_handleDelayedPeerUpdates() { Notify::peerUpdatedSendDelayed(); } +void AppClass::call_handleObservables() { + base::HandleObservables(); +} + void AppClass::killDownloadSessions() { uint64 ms = getms(), left = MTPAckSendWaiting + MTPKillFileSessionTimeout; for (QMap::iterator i = killDownloadSessionTimes.begin(); i != killDownloadSessionTimes.end(); ) { diff --git a/Telegram/SourceFiles/application.h b/Telegram/SourceFiles/application.h index 78983d070..025164cd6 100644 --- a/Telegram/SourceFiles/application.h +++ b/Telegram/SourceFiles/application.h @@ -205,6 +205,7 @@ public slots: void call_handleUnreadCounterUpdate(); void call_handleFileDialogQueue(); void call_handleDelayedPeerUpdates(); + void call_handleObservables(); private: diff --git a/Telegram/SourceFiles/core/basic_types.h b/Telegram/SourceFiles/core/basic_types.h index 943772903..8bb5d3efb 100644 --- a/Telegram/SourceFiles/core/basic_types.h +++ b/Telegram/SourceFiles/core/basic_types.h @@ -33,13 +33,6 @@ T *getPointerAndReset(T *&ptr) { return result; } -template -T createAndSwap(T &value) { - T result = T(); - std::swap(result, value); - return result; -} - struct NullType { }; @@ -491,6 +484,13 @@ struct is_base_of { } // namespace std_ +template +T createAndSwap(T &value) { + T result = T(); + std::swap(result, value); + return std_::move(result); +} + #include "logs.h" static volatile int *t_assert_nullptr = nullptr; diff --git a/Telegram/SourceFiles/core/observer.cpp b/Telegram/SourceFiles/core/observer.cpp index 6ec33a341..cc9b9497f 100644 --- a/Telegram/SourceFiles/core/observer.cpp +++ b/Telegram/SourceFiles/core/observer.cpp @@ -113,3 +113,62 @@ void observerRegisteredDefault(Observer *observer, ConnectionId connection) { } // namespace internal } // namespace Notify + +namespace base { +namespace internal { +namespace { + +bool CantUseObservables = false; + +struct ObservableListWrap { + ~ObservableListWrap() { + CantUseObservables = true; + } + OrderedSet list; +}; + +ObservableListWrap &PendingObservables() { + static ObservableListWrap result; + return result; +} + +ObservableListWrap &ActiveObservables() { + static ObservableListWrap result; + return result; +} + +} // namespace + +void RegisterPendingObservable(ObservableCallHandlers *handlers) { + if (CantUseObservables) return; + PendingObservables().list.insert(handlers); + Global::RefHandleObservables().call(); +} + +void UnregisterActiveObservable(ObservableCallHandlers *handlers) { + if (CantUseObservables) return; + ActiveObservables().list.remove(handlers); +} + +void UnregisterObservable(ObservableCallHandlers *handlers) { + if (CantUseObservables) return; + PendingObservables().list.remove(handlers); + ActiveObservables().list.remove(handlers); +} + +} // namespace internal + +void HandleObservables() { + if (internal::CantUseObservables) return; + auto &active = internal::ActiveObservables().list; + qSwap(active, internal::PendingObservables().list); + while (!active.empty()) { + auto first = *active.begin(); + (*first)(); + if (!active.empty() && *active.begin() == first) { + active.erase(active.begin()); + } + } +} + +} // namespace base diff --git a/Telegram/SourceFiles/core/observer.h b/Telegram/SourceFiles/core/observer.h index 7b0df81a2..02fc10bac 100644 --- a/Telegram/SourceFiles/core/observer.h +++ b/Telegram/SourceFiles/core/observer.h @@ -240,3 +240,324 @@ inline void observerRegistered(ObserverType *observer, ConnectionId connection) } } // namespace Notify + +namespace base { +namespace internal { + +using ObservableCallHandlers = base::lambda_unique; +void RegisterPendingObservable(ObservableCallHandlers *handlers); +void UnregisterActiveObservable(ObservableCallHandlers *handlers); +void UnregisterObservable(ObservableCallHandlers *handlers); + +template +struct SubscriptionHandlerHelper { + using type = base::lambda_unique; +}; + +template <> +struct SubscriptionHandlerHelper { + using type = base::lambda_unique; +}; + +template +using SubscriptionHandler = typename SubscriptionHandlerHelper::type; + +// Required because QShared/WeakPointer can't point to void. +class BaseObservableData { +}; + +template +class CommonObservableData; + +template +class ObservableData; + +} // namespace internal + +class Subscription { +public: + Subscription() = default; + Subscription(const Subscription &) = delete; + Subscription &operator=(const Subscription &) = delete; + Subscription(Subscription &&other) : _node(createAndSwap(other._node)), _removeMethod(other._removeMethod) { + } + Subscription &operator=(Subscription &&other) { + qSwap(_node, other._node); + qSwap(_removeMethod, other._removeMethod); + return *this; + } + void destroy() { + if (_node) { + (*_removeMethod)(_node); + delete _node; + _node = nullptr; + } + } + ~Subscription() { + destroy(); + } + +private: + struct Node { + Node(const QSharedPointer &observable) : observable(observable) { + } + Node *next = nullptr; + Node *prev = nullptr; + QWeakPointer observable; + }; + using RemoveMethod = void(*)(Node*); + Subscription(Node *node, RemoveMethod removeMethod) : _node(node), _removeMethod(removeMethod) { + } + + Node *_node = nullptr; + RemoveMethod _removeMethod; + + template + friend class internal::CommonObservableData; + + template + friend class internal::ObservableData; + +}; + +template +class Observable; + +namespace internal { + +template +class CommonObservable { +public: + using Handler = typename CommonObservableData::Handler; + + Subscription subscribe(Handler &&handler) { + if (_data) { + _data->append(std_::forward(handler)); + } else { + _data = MakeShared>(this, std_::forward(handler)); + } + return _data->last(); + } + +private: + QSharedPointer> _data; + + friend class CommonObservableData; + friend class Observable; + +}; + +} // namespace internal + +template +class Observable : public internal::CommonObservable { +public: + void notify(EventType &&event) { + if (_data) { + _data->notify(std_::move(event)); + } + } + +}; + +namespace internal { + +template +class CommonObservableData : public BaseObservableData { +public: + using Handler = SubscriptionHandler; + + CommonObservableData(CommonObservable *observable, Handler &&handler) : _observable(observable) + , _begin(new Node(observable->_data, std_::forward(handler))) + , _end(_begin) { + } + + void append(Handler &&handler) { + auto node = new Node(_observable->_data, std_::forward(handler)); + + _end->next = node; + node->prev = _end; + _end = node; + } + + Subscription last() { + return { _end, &CommonObservableData::destroyNode }; + } + + bool empty() const { + return !_begin; + } + +private: + struct Node : public Subscription::Node { + Node(const QSharedPointer &observer, Handler &&handler) : Subscription::Node(observer), handler(std_::move(handler)) { + } + Handler handler; + }; + + void remove(Subscription::Node *node) { + if (node->prev) { + node->prev->next = node->next; + } + if (node->next) { + node->next->prev = node->prev; + } + if (_begin == node) { + _begin = static_cast(node->next); + } + if (_end == node) { + _end = static_cast(node->prev); + } + if (_current == node) { + _current = static_cast(node->prev); + } else if (!_begin) { + _observable->_data.reset(); + } + } + + static void destroyNode(Subscription::Node *node) { + if (auto that = node->observable.lock()) { + static_cast(that.data())->remove(node); + } + } + + template + void notifyEnumerate(CallCurrent callCurrent) { + _current = _begin; + do { + callCurrent(); + if (_current) { + _current = static_cast(_current->next); + } else if (_begin) { + _current = _begin; + } else { + break; + } + } while (_current); + + if (!_begin) { + _observable->_data.reset(); + } + } + + CommonObservable *_observable = nullptr; + Node *_begin; + Node *_current = nullptr; + Node *_end; + ObservableCallHandlers _callHandlers; + + friend class ObservableData; + +}; + +template +class ObservableData : public CommonObservableData { +public: + using CommonObservableData::CommonObservableData; + + void notify(EventType &&event) { + if (!_callHandlers) { + _callHandlers = [this]() { + callHandlers(); + }; + } + if (_events.empty()) { + RegisterPendingObservable(&_callHandlers); + } + _events.push_back(std_::move(event)); + } + + ~ObservableData() { + UnregisterObservable(&_callHandlers); + } + +private: + void callHandlers() { + auto events = createAndSwap(_events); + for (auto &event : events) { + notifyEnumerate([this, &event]() { + _current->handler(event); + }); + } + UnregisterActiveObservable(&_callHandlers); + } + + std_::vector_of_moveable _events; + +}; + +template <> +class ObservableData : public CommonObservableData { +public: + using CommonObservableData::CommonObservableData; + + void notify() { + if (!_callHandlers) { + _callHandlers = [this]() { + callHandlers(); + }; + } + if (!_eventsCount) { + RegisterPendingObservable(&_callHandlers); + } + ++_eventsCount; + } + + ~ObservableData() { + UnregisterObservable(&_callHandlers); + } + +private: + void callHandlers() { + auto eventsCount = createAndSwap(_eventsCount); + for (int i = 0; i != eventsCount; ++i) { + notifyEnumerate([this]() { + _current->handler(); + }); + } + UnregisterActiveObservable(&_callHandlers); + } + + int _eventsCount = 0; + +}; + +} // namespace internal + +template <> +class Observable : public internal::CommonObservable { +public: + void notify() { + if (_data) { + _data->notify(); + } + } + +}; + +class Subscriber { +protected: + template + int subscribe(base::Observable &observable, Lambda &&handler) { + _subscriptions.push_back(observable.subscribe(std_::forward(handler))); + return _subscriptions.size() - 1; + } + + template + int subscribe(base::Observable *observable, Lambda &&handler) { + return subscribe(*observable, std_::forward(handler)); + } + + void unsubscribe(int index) { + t_assert(index >= 0 && index < _subscriptions.size()); + _subscriptions[index].destroy(); + } + +private: + std_::vector_of_moveable _subscriptions; + +}; + +void HandleObservables(); + +} // namespace base diff --git a/Telegram/SourceFiles/core/vector_of_moveable.h b/Telegram/SourceFiles/core/vector_of_moveable.h index 608bb0796..6119a9678 100644 --- a/Telegram/SourceFiles/core/vector_of_moveable.h +++ b/Telegram/SourceFiles/core/vector_of_moveable.h @@ -30,6 +30,21 @@ class vector_of_moveable { void *_plaindata = nullptr; public: + vector_of_moveable() = default; + vector_of_moveable(const vector_of_moveable &other) = delete; + vector_of_moveable &operator=(const vector_of_moveable &other) = delete; + vector_of_moveable(vector_of_moveable &&other) + : _size(createAndSwap(other._size)) + , _capacity(createAndSwap(other._capacity)) + , _plaindata(createAndSwap(other._plaindata)) { + } + vector_of_moveable &operator=(vector_of_moveable &&other) { + std::swap(_size, other._size); + std::swap(_capacity, other._capacity); + std::swap(_plaindata, other._plaindata); + return *this; + } + inline T *data() { return reinterpret_cast(_plaindata); } diff --git a/Telegram/SourceFiles/facades.cpp b/Telegram/SourceFiles/facades.cpp index 4cfdfd51c..a4294ff16 100644 --- a/Telegram/SourceFiles/facades.cpp +++ b/Telegram/SourceFiles/facades.cpp @@ -577,6 +577,7 @@ struct Data { SingleDelayedCall HandleUnreadCounterUpdate = { App::app(), "call_handleUnreadCounterUpdate" }; SingleDelayedCall HandleFileDialogQueue = { App::app(), "call_handleFileDialogQueue" }; SingleDelayedCall HandleDelayedPeerUpdates = { App::app(), "call_handleDelayedPeerUpdates" }; + SingleDelayedCall HandleObservables = { App::app(), "call_handleObservables" }; Adaptive::Layout AdaptiveLayout = Adaptive::NormalLayout; bool AdaptiveForWide = true; @@ -654,6 +655,7 @@ DefineRefVar(Global, SingleDelayedCall, HandleHistoryUpdate); DefineRefVar(Global, SingleDelayedCall, HandleUnreadCounterUpdate); DefineRefVar(Global, SingleDelayedCall, HandleFileDialogQueue); DefineRefVar(Global, SingleDelayedCall, HandleDelayedPeerUpdates); +DefineRefVar(Global, SingleDelayedCall, HandleObservables); DefineVar(Global, Adaptive::Layout, AdaptiveLayout); DefineVar(Global, bool, AdaptiveForWide); diff --git a/Telegram/SourceFiles/facades.h b/Telegram/SourceFiles/facades.h index c0b0a4613..2fdd64591 100644 --- a/Telegram/SourceFiles/facades.h +++ b/Telegram/SourceFiles/facades.h @@ -230,6 +230,7 @@ DeclareRefVar(SingleDelayedCall, HandleHistoryUpdate); DeclareRefVar(SingleDelayedCall, HandleUnreadCounterUpdate); DeclareRefVar(SingleDelayedCall, HandleFileDialogQueue); DeclareRefVar(SingleDelayedCall, HandleDelayedPeerUpdates); +DeclareRefVar(SingleDelayedCall, HandleObservables); DeclareVar(Adaptive::Layout, AdaptiveLayout); DeclareVar(bool, AdaptiveForWide); diff --git a/Telegram/SourceFiles/ui/twidget.h b/Telegram/SourceFiles/ui/twidget.h index 6760a2698..7b6c12261 100644 --- a/Telegram/SourceFiles/ui/twidget.h +++ b/Telegram/SourceFiles/ui/twidget.h @@ -275,16 +275,16 @@ public: SingleDelayedCall(QObject *parent, const char *member) : QObject(parent), _member(member) { } void call() { - if (!_pending.loadAcquire()) { - _pending.storeRelease(1); + if (_pending.testAndSetOrdered(0, 1)) { QMetaObject::invokeMethod(this, "makeDelayedCall", Qt::QueuedConnection); } } private slots: void makeDelayedCall() { - _pending.storeRelease(0); - QMetaObject::invokeMethod(parent(), _member); + if (_pending.testAndSetOrdered(1, 0)) { + QMetaObject::invokeMethod(parent(), _member); + } } private: diff --git a/Telegram/gyp/Telegram.gyp b/Telegram/gyp/Telegram.gyp index 0532b5024..01b17232b 100644 --- a/Telegram/gyp/Telegram.gyp +++ b/Telegram/gyp/Telegram.gyp @@ -187,6 +187,7 @@ '<(src_loc)/core/observer.h', '<(src_loc)/core/qthelp_url.cpp', '<(src_loc)/core/qthelp_url.h', + '<(src_loc)/core/vector_of_moveable.h', '<(src_loc)/data/data_abstract_structure.cpp', '<(src_loc)/data/data_abstract_structure.h', '<(src_loc)/data/data_drafts.cpp',