From b9af3c7f3432949d126620285f7049cc9f575b60 Mon Sep 17 00:00:00 2001 From: John Preston Date: Thu, 9 Aug 2018 23:39:22 +0300 Subject: [PATCH] Add base::ConcurrentTimer. Write removes from cache database once an hour. --- Telegram/SourceFiles/application.cpp | 2 + Telegram/SourceFiles/base/base_pch.h | 2 + .../SourceFiles/base/concurrent_timer.cpp | 367 ++++++++++++++++++ Telegram/SourceFiles/base/concurrent_timer.h | 147 +++++++ Telegram/SourceFiles/base/timer.cpp | 1 - Telegram/SourceFiles/core/launcher.cpp | 3 + .../SourceFiles/core/main_queue_processor.h | 1 - .../storage/cache/storage_cache_database.cpp | 24 +- .../cache/storage_cache_database_tests.cpp | 1 + .../storage/storage_encrypted_file_tests.cpp | 3 + Telegram/ThirdParty/crl | 2 +- Telegram/gyp/lib_base.gyp | 2 + Telegram/gyp/lib_storage.gyp | 5 + Telegram/gyp/tests/tests.gyp | 1 - 14 files changed, 552 insertions(+), 9 deletions(-) create mode 100644 Telegram/SourceFiles/base/concurrent_timer.cpp create mode 100644 Telegram/SourceFiles/base/concurrent_timer.h diff --git a/Telegram/SourceFiles/application.cpp b/Telegram/SourceFiles/application.cpp index 4f4544c7d..ee6d6c4c6 100644 --- a/Telegram/SourceFiles/application.cpp +++ b/Telegram/SourceFiles/application.cpp @@ -15,6 +15,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include "core/crash_reports.h" #include "messenger.h" #include "base/timer.h" +#include "base/concurrent_timer.h" #include "base/qthelp_url.h" #include "base/qthelp_regex.h" #include "core/update_checker.h" @@ -446,6 +447,7 @@ void adjustSingleTimers() { a->adjustSingleTimers(); } base::Timer::Adjust(); + base::ConcurrentTimerEnvironment::Adjust(); } void connect(const char *signal, QObject *object, const char *method) { diff --git a/Telegram/SourceFiles/base/base_pch.h b/Telegram/SourceFiles/base/base_pch.h index 1c3189841..91cfe95ee 100644 --- a/Telegram/SourceFiles/base/base_pch.h +++ b/Telegram/SourceFiles/base/base_pch.h @@ -11,6 +11,8 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include #include #include +#include +#include #include #include diff --git a/Telegram/SourceFiles/base/concurrent_timer.cpp b/Telegram/SourceFiles/base/concurrent_timer.cpp new file mode 100644 index 000000000..4f9fb82f8 --- /dev/null +++ b/Telegram/SourceFiles/base/concurrent_timer.cpp @@ -0,0 +1,367 @@ +/* +This file is part of Telegram Desktop, +the official desktop application for the Telegram messaging service. + +For license and copyright information please follow this link: +https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL +*/ +#include "base/concurrent_timer.h" + +#include +#include + +using namespace base::details; + +namespace base { +namespace details { +namespace { + +constexpr auto kCallDelayedEvent = QEvent::Type(QEvent::User + 1); +constexpr auto kCancelTimerEvent = QEvent::Type(QEvent::User + 2); +static_assert(kCancelTimerEvent < QEvent::MaxUser); + +ConcurrentTimerEnvironment *Environment/* = nullptr*/; +QMutex EnvironmentMutex; + +class CallDelayedEvent : public QEvent { +public: + CallDelayedEvent( + crl::time_type timeout, + Qt::TimerType type, + FnMut method); + + crl::time_type timeout() const; + Qt::TimerType type() const; + FnMut takeMethod(); + +private: + crl::time_type _timeout = 0; + Qt::TimerType _type = Qt::PreciseTimer; + FnMut _method; + +}; + +class CancelTimerEvent : public QEvent { +public: + CancelTimerEvent(); + +}; + +CallDelayedEvent::CallDelayedEvent( + crl::time_type timeout, + Qt::TimerType type, + FnMut method) +: QEvent(kCallDelayedEvent) +, _timeout(timeout) +, _type(type) +, _method(std::move(method)) { + Expects(_timeout >= 0 && _timeout < std::numeric_limits::max()); +} + +crl::time_type CallDelayedEvent::timeout() const { + return _timeout; +} + +Qt::TimerType CallDelayedEvent::type() const { + return _type; +} + +FnMut CallDelayedEvent::takeMethod() { + return base::take(_method); +} + +CancelTimerEvent::CancelTimerEvent() : QEvent(kCancelTimerEvent) { +} + +} // namespace + +class TimerObject : public QObject { +public: + TimerObject( + not_null thread, + not_null adjuster, + Fn adjust); + +protected: + bool event(QEvent *e) override; + +private: + void callDelayed(not_null e); + void callNow(); + void cancel(); + void adjust(); + + FnMut _next; + Fn _adjust; + int _timerId = 0; + +}; + +TimerObject::TimerObject( + not_null thread, + not_null adjuster, + Fn adjust) +: _adjust(std::move(adjust)) { + moveToThread(thread); + connect( + adjuster, + &QObject::destroyed, + this, + &TimerObject::adjust, + Qt::DirectConnection); +} + +bool TimerObject::event(QEvent *e) { + const auto type = e->type(); + switch (type) { + case kCallDelayedEvent: + callDelayed(static_cast(e)); + return true; + case kCancelTimerEvent: + cancel(); + return true; + case QEvent::Timer: + callNow(); + return true; + } + return QObject::event(e); +} + +void TimerObject::callDelayed(not_null e) { + cancel(); + + const auto timeout = e->timeout(); + const auto type = e->type(); + _next = e->takeMethod(); + if (timeout > 0) { + _timerId = startTimer(timeout, type); + } else { + base::take(_next)(); + } +} + +void TimerObject::cancel() { + if (const auto id = base::take(_timerId)) { + killTimer(id); + } + _next = nullptr; +} + +void TimerObject::callNow() { + auto next = base::take(_next); + cancel(); + next(); +} + +void TimerObject::adjust() { + if (_adjust) { + _adjust(); + } +} + +TimerObjectWrap::TimerObjectWrap(Fn adjust) { + QMutexLocker lock(&EnvironmentMutex); + + if (Environment) { + _value = Environment->createTimer(std::move(adjust)); + } +} + +TimerObjectWrap::~TimerObjectWrap() { + if (_value) { + QMutexLocker lock(&EnvironmentMutex); + + if (Environment) { + _value.release()->deleteLater(); + } + } +} + +void TimerObjectWrap::call( + crl::time_type timeout, + Qt::TimerType type, + FnMut method) { + sendEvent(std::make_unique( + timeout, + type, + std::move(method))); +} + +void TimerObjectWrap::cancel() { + sendEvent(std::make_unique()); +} + +void TimerObjectWrap::sendEvent(std::unique_ptr event) { + if (!_value) { + return; + } + QCoreApplication::postEvent( + _value.get(), + event.release(), + Qt::HighEventPriority); +} + +} // namespace details + +ConcurrentTimerEnvironment::ConcurrentTimerEnvironment() { + _thread.start(); + _adjuster.moveToThread(&_thread); + + acquire(); +} + +ConcurrentTimerEnvironment::~ConcurrentTimerEnvironment() { + _thread.quit(); + release(); + _thread.wait(); + QObject::disconnect(&_adjuster, &QObject::destroyed, nullptr, nullptr); +} + +std::unique_ptr ConcurrentTimerEnvironment::createTimer( + Fn adjust) { + return std::make_unique( + &_thread, + &_adjuster, + std::move(adjust)); +} + +void ConcurrentTimerEnvironment::Adjust() { + QMutexLocker lock(&EnvironmentMutex); + if (Environment) { + Environment->adjustTimers(); + } +} + +void ConcurrentTimerEnvironment::adjustTimers() { + QObject emitter; + QObject::connect( + &emitter, + &QObject::destroyed, + &_adjuster, + &QObject::destroyed, + Qt::QueuedConnection); +} + +void ConcurrentTimerEnvironment::acquire() { + Expects(Environment == nullptr); + + QMutexLocker lock(&EnvironmentMutex); + Environment = this; +} + +void ConcurrentTimerEnvironment::release() { + Expects(Environment == this); + + QMutexLocker lock(&EnvironmentMutex); + Environment = nullptr; +} + +ConcurrentTimer::ConcurrentTimer( + Fn)> runner, + Fn callback) +: _runner(std::move(runner)) +, _object(createAdjuster()) +, _callback(std::move(callback)) +, _type(Qt::PreciseTimer) +, _adjusted(false) { + setRepeat(Repeat::Interval); +} + +Fn ConcurrentTimer::createAdjuster() { + auto guards = base::make_binary_guard(); + _guard = std::make_shared(true); + return [=, runner = _runner, guard = std::weak_ptr(_guard)] { + runner([=] { + if (!guard.lock()) { + return; + } + adjust(); + }); + }; +} + +void ConcurrentTimer::start( + TimeMs timeout, + Qt::TimerType type, + Repeat repeat) { + _type = type; + setRepeat(repeat); + _adjusted = false; + setTimeout(timeout); + + cancelAndSchedule(_timeout); + _next = crl::time() + _timeout; +} + +void ConcurrentTimer::cancelAndSchedule(int timeout) { + auto guards = base::make_binary_guard(); + _running = std::move(guards.first); + auto method = [ + =, + runner = _runner, + guard = std::move(guards.second) + ]() mutable { + if (!guard.alive()) { + return; + } + runner([=, guard = std::move(guard)] { + if (!guard.alive()) { + return; + } + timerEvent(); + }); + }; + _object.call(timeout, _type, std::move(method)); +} + +void ConcurrentTimer::timerEvent() { + if (repeat() == Repeat::Interval) { + if (_adjusted) { + start(_timeout, _type, repeat()); + } else { + _next = crl::time() + _timeout; + } + } else { + cancel(); + } + + if (_callback) { + _callback(); + } +} + +void ConcurrentTimer::cancel() { + _running = {}; + if (isActive()) { + _running = base::binary_guard(); + _object.cancel(); + } +} + +TimeMs ConcurrentTimer::remainingTime() const { + if (!isActive()) { + return -1; + } + const auto now = crl::time(); + return (_next > now) ? (_next - now) : TimeMs(0); +} + +void ConcurrentTimer::adjust() { + auto remaining = remainingTime(); + if (remaining >= 0) { + cancelAndSchedule(remaining); + _adjusted = true; + } +} + +void ConcurrentTimer::setTimeout(TimeMs timeout) { + Expects(timeout >= 0 && timeout <= std::numeric_limits::max()); + + _timeout = static_cast(timeout); +} + +int ConcurrentTimer::timeout() const { + return _timeout; +} + +} // namespace base diff --git a/Telegram/SourceFiles/base/concurrent_timer.h b/Telegram/SourceFiles/base/concurrent_timer.h new file mode 100644 index 000000000..1be2af1b7 --- /dev/null +++ b/Telegram/SourceFiles/base/concurrent_timer.h @@ -0,0 +1,147 @@ +/* +This file is part of Telegram Desktop, +the official desktop application for the Telegram messaging service. + +For license and copyright information please follow this link: +https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL +*/ +#pragma once + +#include "base/binary_guard.h" +#include +#include +#include + +namespace base { +namespace details { + +class TimerObject; + +class TimerObjectWrap { +public: + explicit TimerObjectWrap(Fn adjust); + ~TimerObjectWrap(); + + void call( + crl::time_type timeout, + Qt::TimerType type, + FnMut method); + void cancel(); + +private: + void sendEvent(std::unique_ptr event); + + std::unique_ptr _value; + +}; + +} // namespace details + +class ConcurrentTimerEnvironment { +public: + ConcurrentTimerEnvironment(); + ~ConcurrentTimerEnvironment(); + + std::unique_ptr createTimer(Fn adjust); + + static void Adjust(); + +private: + void acquire(); + void release(); + void adjustTimers(); + + QThread _thread; + QObject _adjuster; + +}; + +class ConcurrentTimer { +public: + explicit ConcurrentTimer( + Fn)> runner, + Fn callback = nullptr); + + template + explicit ConcurrentTimer( + crl::weak_on_queue weak, + Fn callback = nullptr); + + static Qt::TimerType DefaultType(TimeMs timeout) { + constexpr auto kThreshold = TimeMs(1000); + return (timeout > kThreshold) ? Qt::CoarseTimer : Qt::PreciseTimer; + } + + void setCallback(Fn callback) { + _callback = std::move(callback); + } + + void callOnce(TimeMs timeout) { + callOnce(timeout, DefaultType(timeout)); + } + + void callEach(TimeMs timeout) { + callEach(timeout, DefaultType(timeout)); + } + + void callOnce(TimeMs timeout, Qt::TimerType type) { + start(timeout, type, Repeat::SingleShot); + } + + void callEach(TimeMs timeout, Qt::TimerType type) { + start(timeout, type, Repeat::Interval); + } + + bool isActive() const { + return _running.alive(); + } + + void cancel(); + TimeMs remainingTime() const; + +private: + enum class Repeat : unsigned { + Interval = 0, + SingleShot = 1, + }; + Fn createAdjuster(); + void start(TimeMs timeout, Qt::TimerType type, Repeat repeat); + void adjust(); + + void cancelAndSchedule(int timeout); + + void setTimeout(TimeMs timeout); + int timeout() const; + + void timerEvent(); + + void setRepeat(Repeat repeat) { + _repeat = static_cast(repeat); + } + Repeat repeat() const { + return static_cast(_repeat); + } + + Fn)> _runner; + std::shared_ptr _guard; // Must be before _object. + details::TimerObjectWrap _object; + Fn _callback; + base::binary_guard _running; + TimeMs _next = 0; + int _timeout = 0; + int _timerId = 0; + + Qt::TimerType _type : 2; + bool _adjusted : 1; + unsigned _repeat : 1; + +}; + +template +ConcurrentTimer::ConcurrentTimer( + crl::weak_on_queue weak, + Fn callback) +: ConcurrentTimer(weak.runner(), std::move(callback)) { +} + +} // namespace base diff --git a/Telegram/SourceFiles/base/timer.cpp b/Telegram/SourceFiles/base/timer.cpp index f0c096793..32388c1e9 100644 --- a/Telegram/SourceFiles/base/timer.cpp +++ b/Telegram/SourceFiles/base/timer.cpp @@ -26,7 +26,6 @@ Timer::Timer( moveToThread(thread); } - Timer::Timer(Fn callback) : QObject(nullptr) , _callback(std::move(callback)) diff --git a/Telegram/SourceFiles/core/launcher.cpp b/Telegram/SourceFiles/core/launcher.cpp index 0a58df067..794061f8a 100644 --- a/Telegram/SourceFiles/core/launcher.cpp +++ b/Telegram/SourceFiles/core/launcher.cpp @@ -12,6 +12,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include "core/crash_reports.h" #include "core/main_queue_processor.h" #include "core/update_checker.h" +#include "base/concurrent_timer.h" #include "application.h" namespace Core { @@ -242,6 +243,8 @@ void Launcher::processArguments() { int Launcher::executeApplication() { MainQueueProcessor processor; + base::ConcurrentTimerEnvironment environment; + Application app(this, _argc, _argv); return app.exec(); } diff --git a/Telegram/SourceFiles/core/main_queue_processor.h b/Telegram/SourceFiles/core/main_queue_processor.h index 0fe1807ac..3e8b1567f 100644 --- a/Telegram/SourceFiles/core/main_queue_processor.h +++ b/Telegram/SourceFiles/core/main_queue_processor.h @@ -12,7 +12,6 @@ namespace Core { class MainQueueProcessor : public QObject { public: MainQueueProcessor(); - ~MainQueueProcessor(); protected: diff --git a/Telegram/SourceFiles/storage/cache/storage_cache_database.cpp b/Telegram/SourceFiles/storage/cache/storage_cache_database.cpp index ad558cc96..cb6c077c1 100644 --- a/Telegram/SourceFiles/storage/cache/storage_cache_database.cpp +++ b/Telegram/SourceFiles/storage/cache/storage_cache_database.cpp @@ -12,6 +12,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include "storage/storage_encrypted_file.h" #include "base/flat_set.h" #include "base/algorithm.h" +#include "base/concurrent_timer.h" #include #include #include @@ -39,6 +40,7 @@ constexpr auto kReadBlockSize = 8 * 1024 * 1024; constexpr auto kRecordSizeUnknown = size_type(-1); constexpr auto kRecordSizeInvalid = size_type(-2); constexpr auto kMaxDataSize = 10 * 1024 * 1024; +constexpr auto kRemoveBundleDelay = 60 * 60 * crl::time_type(1000); using RecordType = uint8; using PlaceId = std::array; @@ -175,6 +177,8 @@ public: void clear(FnMut done); + ~Database(); + private: struct Entry { Entry() = default; @@ -232,6 +236,8 @@ private: std::unordered_map _map; std::set _removing; + base::ConcurrentTimer _writeRemoveTimer; + CleanerWrap _cleaner; }; @@ -253,7 +259,8 @@ Database::Database( const Settings &settings) : _weak(std::move(weak)) , _base(ComputeBasePath(path)) -, _settings(settings) { +, _settings(settings) +, _writeRemoveTimer(_weak, [=] { writeMultiRemove(); }) { } template @@ -483,6 +490,9 @@ bool Database::readRecordMultiRemove(bytes::const_span data) { } void Database::close(FnMut done) { + if (_writeRemoveTimer.isActive()) { + writeMultiRemove(); + } _cleaner = CleanerWrap(); _binlog.close(); invokeCallback(done); @@ -588,11 +598,11 @@ void Database::remove(const Key &key, FnMut done) { const auto i = _map.find(key); if (i != _map.end()) { _removing.emplace(key); - if (true || _removing.size() == kMaxBundledRecords) { + if (_removing.size() == kMaxBundledRecords) { + _writeRemoveTimer.cancel(); writeMultiRemove(); - // cancel timeout?.. - } else { - // timeout?.. + } else if (!_writeRemoveTimer.isActive()) { + _writeRemoveTimer.callOnce(kRemoveBundleDelay); } const auto &entry = i->second; @@ -649,6 +659,10 @@ void Database::clear(FnMut done) { writeVersion(version) ? Error::NoError() : ioError(versionPath())); } +Database::~Database() { + close(nullptr); +} + auto Database::findAvailableVersion() const -> Version { const auto entries = QDir(_base).entryList( QDir::Dirs | QDir::NoDotAndDotDot); diff --git a/Telegram/SourceFiles/storage/cache/storage_cache_database_tests.cpp b/Telegram/SourceFiles/storage/cache/storage_cache_database_tests.cpp index 51f2bcdbf..e9d7d1965 100644 --- a/Telegram/SourceFiles/storage/cache/storage_cache_database_tests.cpp +++ b/Telegram/SourceFiles/storage/cache/storage_cache_database_tests.cpp @@ -9,6 +9,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include "storage/cache/storage_cache_database.h" #include "storage/storage_encryption.h" +#include "base/concurrent_timer.h" #include #include diff --git a/Telegram/SourceFiles/storage/storage_encrypted_file_tests.cpp b/Telegram/SourceFiles/storage/storage_encrypted_file_tests.cpp index 6c539a069..97ae69ed8 100644 --- a/Telegram/SourceFiles/storage/storage_encrypted_file_tests.cpp +++ b/Telegram/SourceFiles/storage/storage_encrypted_file_tests.cpp @@ -9,6 +9,9 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL #include "storage/storage_encrypted_file.h" +#include +#include + #ifdef Q_OS_WIN #include "platform/win/windows_dlls.h" #endif // Q_OS_WIN diff --git a/Telegram/ThirdParty/crl b/Telegram/ThirdParty/crl index 2cab11076..4291015ef 160000 --- a/Telegram/ThirdParty/crl +++ b/Telegram/ThirdParty/crl @@ -1 +1 @@ -Subproject commit 2cab11076d84a9db7d86f165eb2cfb4c6ebcc8f4 +Subproject commit 4291015efab76bda5886a56b5007f4531be17d46 diff --git a/Telegram/gyp/lib_base.gyp b/Telegram/gyp/lib_base.gyp index b802c0aa1..1f3e9c34e 100644 --- a/Telegram/gyp/lib_base.gyp +++ b/Telegram/gyp/lib_base.gyp @@ -51,6 +51,8 @@ '<(src_loc)/base/binary_guard.h', '<(src_loc)/base/build_config.h', '<(src_loc)/base/bytes.h', + '<(src_loc)/base/concurrent_timer.cpp', + '<(src_loc)/base/concurrent_timer.h', '<(src_loc)/base/flags.h', '<(src_loc)/base/enum_mask.h', '<(src_loc)/base/flat_map.h', diff --git a/Telegram/gyp/lib_storage.gyp b/Telegram/gyp/lib_storage.gyp index a86f0ef63..81bb4be52 100644 --- a/Telegram/gyp/lib_storage.gyp +++ b/Telegram/gyp/lib_storage.gyp @@ -34,6 +34,11 @@ ], 'dependencies': [ 'crl.gyp:crl', + 'lib_base.gyp:lib_base', + ], + 'export_dependent_settings': [ + 'crl.gyp:crl', + 'lib_base.gyp:lib_base', ], 'include_dirs': [ '<(src_loc)', diff --git a/Telegram/gyp/tests/tests.gyp b/Telegram/gyp/tests/tests.gyp index 6e574dc02..4d6ef2361 100644 --- a/Telegram/gyp/tests/tests.gyp +++ b/Telegram/gyp/tests/tests.gyp @@ -125,7 +125,6 @@ ], 'dependencies': [ '../lib_storage.gyp:lib_storage', - '../crl.gyp:crl', ], 'sources': [ '<(src_loc)/storage/storage_encrypted_file_tests.cpp',