From dd933cf61cf393c7c5b0f7bd53f58bb084709363 Mon Sep 17 00:00:00 2001 From: John Preston Date: Fri, 24 Feb 2017 20:15:41 +0300 Subject: [PATCH] MTP global state moved to MTP::Instance class. Now there will be ability to start multiple mtproto instances. --- Telegram/SourceFiles/app.cpp | 6 +- Telegram/SourceFiles/core/utils.cpp | 2 +- Telegram/SourceFiles/core/utils.h | 47 +- Telegram/SourceFiles/intro/introwidget.cpp | 2 +- Telegram/SourceFiles/localstorage.cpp | 36 +- Telegram/SourceFiles/logs.cpp | 6 +- Telegram/SourceFiles/mainwidget.cpp | 8 +- Telegram/SourceFiles/messenger.cpp | 136 +- Telegram/SourceFiles/messenger.h | 16 +- Telegram/SourceFiles/mtproto/connection.cpp | 89 +- Telegram/SourceFiles/mtproto/connection.h | 35 +- Telegram/SourceFiles/mtproto/dc_options.cpp | 5 +- Telegram/SourceFiles/mtproto/dcenter.cpp | 195 +-- Telegram/SourceFiles/mtproto/dcenter.h | 44 +- Telegram/SourceFiles/mtproto/facade.cpp | 851 +----------- Telegram/SourceFiles/mtproto/facade.h | 221 ++- Telegram/SourceFiles/mtproto/mtp_instance.cpp | 1237 +++++++++++++++++ Telegram/SourceFiles/mtproto/mtp_instance.h | 131 ++ Telegram/SourceFiles/mtproto/session.cpp | 96 +- Telegram/SourceFiles/mtproto/session.h | 34 +- Telegram/SourceFiles/passcodewidget.cpp | 2 +- Telegram/SourceFiles/structs.h | 4 +- Telegram/gyp/Telegram.gyp | 6 +- 23 files changed, 1868 insertions(+), 1341 deletions(-) create mode 100644 Telegram/SourceFiles/mtproto/mtp_instance.cpp create mode 100644 Telegram/SourceFiles/mtproto/mtp_instance.h diff --git a/Telegram/SourceFiles/app.cpp b/Telegram/SourceFiles/app.cpp index 0f368d69e..a9fde250f 100644 --- a/Telegram/SourceFiles/app.cpp +++ b/Telegram/SourceFiles/app.cpp @@ -226,11 +226,11 @@ namespace { } // namespace void logOut() { - if (MTP::started()) { - MTP::logoutKeys(rpcDone(&loggedOut), rpcFail(&loggedOut)); + if (auto mtproto = Messenger::Instance().mtp()) { + mtproto->logout(rpcDone(&loggedOut), rpcFail(&loggedOut)); } else { loggedOut(); - MTP::start(); + Messenger::Instance().startMtp(); } } diff --git a/Telegram/SourceFiles/core/utils.cpp b/Telegram/SourceFiles/core/utils.cpp index 025fbe5fb..40ca068a4 100644 --- a/Telegram/SourceFiles/core/utils.cpp +++ b/Telegram/SourceFiles/core/utils.cpp @@ -124,7 +124,7 @@ void unixtimeSet(int32 serverTime, bool force) { } TimeId unixtime() { - TimeId result = myunixtime(); + auto result = myunixtime(); QReadLocker locker(&unixtimeLock); return result + unixtimeDelta; diff --git a/Telegram/SourceFiles/core/utils.h b/Telegram/SourceFiles/core/utils.h index 3e7213199..afef280e0 100644 --- a/Telegram/SourceFiles/core/utils.h +++ b/Telegram/SourceFiles/core/utils.h @@ -23,6 +23,7 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org #include "core/basic_types.h" #include #include +#include namespace base { @@ -89,6 +90,50 @@ inline bool contains(const Container &container, const T &value) { return std::find(std::begin(container), end, value) != end; } +// We need a custom comparator for std::set>::find to work with pointers. +// thanks to http://stackoverflow.com/questions/18939882/raw-pointer-lookup-for-sets-of-unique-ptrs +template +struct pointer_comparator { + using is_transparent = std::true_type; + + // helper does some magic in order to reduce the number of + // pairs of types we need to know how to compare: it turns + // everything into a pointer, and then uses `std::less` + // to do the comparison: + struct helper { + T *ptr = nullptr; + helper() = default; + helper(const helper &other) = default; + helper(T *p) : ptr(p) { + } + template + helper(const std::shared_ptr &other) : ptr(other.get()) { + } + template + helper(const std::unique_ptr &other) : ptr(other.get()) { + } + bool operator<(helper other) const { + return std::less()(ptr, other.ptr); + } + }; + + // without helper, we'd need 2^n different overloads, where + // n is the number of types we want to support (so, 8 with + // raw pointers, unique pointers, and shared pointers). That + // seems silly. + // && helps enforce rvalue use only + bool operator()(const helper &&lhs, const helper &&rhs) const { + return lhs < rhs; + } + +}; + +template +using set_of_unique_ptr = std::set, base::pointer_comparator>; + +template +using set_of_shared_ptr = std::set, base::pointer_comparator>; + } // namespace base // using for_const instead of plain range-based for loop to ensure usage of const_iterator @@ -162,7 +207,6 @@ inline void t_assert_fail(const char *message, const char *file, int32 line) { class Exception : public std::exception { public: - Exception(const QString &msg, bool isFatal = true) : _fatal(isFatal), _msg(msg.toUtf8()) { LOG(("Exception: %1").arg(msg)); } @@ -179,6 +223,7 @@ public: private: bool _fatal; QByteArray _msg; + }; class MTPint; diff --git a/Telegram/SourceFiles/intro/introwidget.cpp b/Telegram/SourceFiles/intro/introwidget.cpp index efe110e9e..5f68a7fd9 100644 --- a/Telegram/SourceFiles/intro/introwidget.cpp +++ b/Telegram/SourceFiles/intro/introwidget.cpp @@ -265,7 +265,7 @@ bool Widget::resetFail(const RPCError &error) { void Widget::gotNearestDC(const MTPNearestDc &result) { auto &nearest = result.c_nearestDc(); DEBUG_LOG(("Got nearest dc, country: %1, nearest: %2, this: %3").arg(nearest.vcountry.c_string().v.c_str()).arg(nearest.vnearest_dc.v).arg(nearest.vthis_dc.v)); - MTP::setdc(nearest.vnearest_dc.v, true); + Messenger::Instance().mtp()->suggestMainDcId(nearest.vnearest_dc.v); auto nearestCountry = qs(nearest.vcountry); if (getData()->country != nearestCountry) { getData()->country = nearestCountry; diff --git a/Telegram/SourceFiles/localstorage.cpp b/Telegram/SourceFiles/localstorage.cpp index a38bd6d79..da9edeb8c 100644 --- a/Telegram/SourceFiles/localstorage.cpp +++ b/Telegram/SourceFiles/localstorage.cpp @@ -504,7 +504,7 @@ enum { // Local Storage Keys enum { dbiKey = 0x00, dbiUser = 0x01, - dbiDcOptionOld = 0x02, + dbiDcOptionOldOld = 0x02, dbiChatSizeMax = 0x03, dbiMutePeer = 0x04, dbiSendKey = 0x05, @@ -541,7 +541,7 @@ enum { dbiRecentEmojiOld = 0x24, dbiEmojiVariantsOld = 0x25, dbiRecentStickers = 0x26, - dbiDcOption = 0x27, + dbiDcOptionOld = 0x27, dbiTryIPv6 = 0x28, dbiSongVolume = 0x29, dbiWindowsNotificationsOld = 0x30, @@ -567,6 +567,7 @@ enum { dbiDialogsWidthRatio = 0x48, dbiUseExternalVideoPlayer = 0x49, dbiDcOptions = 0x4a, + dbiMtpAuthorization = 0x4b, dbiEncryptedWithSalt = 333, dbiEncrypted = 444, @@ -843,7 +844,7 @@ void applyReadContext(const ReadSettingsContext &context) { bool _readSetting(quint32 blockId, QDataStream &stream, int version, ReadSettingsContext &context) { switch (blockId) { - case dbiDcOptionOld: { + case dbiDcOptionOldOld: { quint32 dcId, port; QString host, ip; stream >> dcId >> host >> ip >> port; @@ -852,7 +853,7 @@ bool _readSetting(quint32 blockId, QDataStream &stream, int version, ReadSetting context.dcOptions.constructAddOne(dcId, 0, ip.toStdString(), port); } break; - case dbiDcOption: { + case dbiDcOptionOld: { quint32 dcIdWithShift, port; qint32 flags; QString ip; @@ -863,7 +864,7 @@ bool _readSetting(quint32 blockId, QDataStream &stream, int version, ReadSetting } break; case dbiDcOptions: { - QByteArray serialized; + auto serialized = QByteArray(); stream >> serialized; if (!_checkStreamStatus(stream)) return false; @@ -909,8 +910,7 @@ bool _readSetting(quint32 blockId, QDataStream &stream, int version, ReadSetting if (!_checkStreamStatus(stream)) return false; DEBUG_LOG(("MTP Info: user found, dc %1, uid %2").arg(dcId).arg(userId)); - MTP::configure(dcId); - + Messenger::Instance().setMtpMainDcId(dcId); if (userId) { Messenger::Instance().authSessionCreate(UserId(userId)); } @@ -923,7 +923,15 @@ bool _readSetting(quint32 blockId, QDataStream &stream, int version, ReadSetting stream.readRawData(key.data(), key.size()); if (!_checkStreamStatus(stream)) return false; - MTP::setKey(dcId, key); + Messenger::Instance().setMtpKey(dcId, key); + } break; + + case dbiMtpAuthorization: { + auto serialized = QByteArray(); + stream >> serialized; + if (!_checkStreamStatus(stream)) return false; + + Messenger::Instance().setMtpAuthorization(serialized); } break; case dbiAutoStart: { @@ -1777,18 +1785,12 @@ void _writeMtpData() { return; } - auto keys = MTP::getKeys(); + auto mtpAuthorizationSerialized = Messenger::Instance().serializeMtpAuthorization(); - quint32 size = sizeof(quint32) + sizeof(qint32) + sizeof(quint32); - size += keys.size() * (sizeof(quint32) + sizeof(quint32) + MTP::AuthKey::kSize); + quint32 size = sizeof(quint32) + Serialize::bytearraySize(mtpAuthorizationSerialized); EncryptedDescriptor data(size); - data.stream << quint32(dbiUser) << qint32(AuthSession::CurrentUserId()) << quint32(MTP::maindc()); - for_const (auto &key, keys) { - data.stream << quint32(dbiKey) << quint32(key->getDC()); - key->write(data.stream); - } - + data.stream << quint32(dbiMtpAuthorization) << mtpAuthorizationSerialized; mtp.writeEncrypted(data); } diff --git a/Telegram/SourceFiles/logs.cpp b/Telegram/SourceFiles/logs.cpp index 115618300..17fd4b685 100644 --- a/Telegram/SourceFiles/logs.cpp +++ b/Telegram/SourceFiles/logs.cpp @@ -25,6 +25,7 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org #include #include "pspecific.h" +#include "mtproto/connection.h" #ifndef TDESKTOP_DISABLE_CRASH_REPORTS @@ -87,9 +88,8 @@ QString _logsEntryStart() { static int32 index = 0; QDateTime tm(QDateTime::currentDateTime()); - QThread *thread = QThread::currentThread(); - MTP::internal::Thread *mtpThread = qobject_cast(thread); - uint threadId = mtpThread ? mtpThread->getThreadId() : 0; + auto thread = qobject_cast(QThread::currentThread()); + auto threadId = thread ? thread->getThreadIndex() : 0; return QString("[%1 %2-%3]").arg(tm.toString("hh:mm:ss.zzz")).arg(QString("%1").arg(threadId, 2, 10, QChar('0'))).arg(++index, 7, 10, QChar('0')); } diff --git a/Telegram/SourceFiles/mainwidget.cpp b/Telegram/SourceFiles/mainwidget.cpp index 518bf6ae2..c8570b0d4 100644 --- a/Telegram/SourceFiles/mainwidget.cpp +++ b/Telegram/SourceFiles/mainwidget.cpp @@ -82,7 +82,9 @@ MainWidget::MainWidget(QWidget *parent) : TWidget(parent) , _playerPanel(this, Media::Player::Panel::Layout::Full) , _mediaType(this, st::defaultDropdownMenu) , _api(new ApiWrap(this)) { - MTP::setGlobalDoneHandler(rpcDone(&MainWidget::updateReceived)); + Messenger::Instance().mtp()->setUpdatesHandler(rpcDone(&MainWidget::updateReceived)); + Messenger::Instance().mtp()->setGlobalFailHandler(rpcFail(&MainWidget::updateFail)); + _ptsWaiter.setRequesting(true); updateScrollColors(); @@ -174,8 +176,6 @@ MainWidget::MainWidget(QWidget *parent) : TWidget(parent) orderWidgets(); - MTP::setGlobalFailHandler(rpcFail(&MainWidget::updateFail)); - _mediaType->hide(); _mediaType->setOrigin(Ui::PanelAnimation::Origin::TopRight); _topBar->mediaTypeButton()->installEventFilter(_mediaType); @@ -4279,7 +4279,7 @@ MainWidget::~MainWidget() { _hider = nullptr; delete hider; } - MTP::clearGlobalHandlers(); + Messenger::Instance().mtp()->clearGlobalHandlers(); if (App::wnd()) App::wnd()->noMain(this); } diff --git a/Telegram/SourceFiles/messenger.cpp b/Telegram/SourceFiles/messenger.cpp index 70521d7fe..aefde0a40 100644 --- a/Telegram/SourceFiles/messenger.cpp +++ b/Telegram/SourceFiles/messenger.cpp @@ -32,6 +32,7 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org #include "fileuploader.h" #include "mainwidget.h" #include "mtproto/dc_options.h" +#include "mtproto/mtp_instance.h" #include "media/player/media_player_instance.h" #include "window/notifications_manager.h" #include "window/themes/window_theme.h" @@ -43,25 +44,18 @@ namespace { Messenger *SingleInstance = nullptr; -void mtpStateChanged(int32 dc, int32 state) { - if (App::wnd()) { - App::wnd()->mtpStateChanged(dc, state); - } -} - -void mtpSessionReset(int32 dc) { - if (App::main() && dc == MTP::maindc()) { - App::main()->getDifference(); - } -} - } // namespace Messenger *Messenger::InstancePointer() { return SingleInstance; } -Messenger::Messenger() : QObject() { +struct Messenger::Private { + MTP::Instance::Config mtpConfig; +}; + +Messenger::Messenger() : QObject() +, _private(std::make_unique()) { t_assert(SingleInstance == nullptr); SingleInstance = this; @@ -126,12 +120,9 @@ Messenger::Messenger() : QObject() { DEBUG_LOG(("Application Info: passcode needed...")); } else { DEBUG_LOG(("Application Info: local map read...")); - MTP::start(); + startMtp(); } - MTP::setStateChangedHandler(mtpStateChanged); - MTP::setSessionResetHandler(mtpSessionReset); - DEBUG_LOG(("Application Info: MTP started...")); DEBUG_LOG(("Application Info: showing.")); @@ -168,6 +159,107 @@ Messenger::Messenger() : QObject() { } } +void Messenger::setMtpMainDcId(MTP::DcId mainDcId) { + t_assert(!_mtproto); + _private->mtpConfig.mainDcId = mainDcId; +} + +void Messenger::setMtpKey(MTP::DcId dcId, const MTP::AuthKey::Data &keyData) { + t_assert(!_mtproto); + _private->mtpConfig.keys.insert(std::make_pair(dcId, keyData)); +} + +QByteArray Messenger::serializeMtpAuthorization() const { + auto serialize = [this](auto keysCount, auto mainDcId, auto writeKeys) { + auto result = QByteArray(); + auto size = sizeof(qint32) + sizeof(qint32) + sizeof(qint32); // userId + mainDcId + keys count + size += keysCount * (sizeof(qint32) + MTP::AuthKey::Data().size()); + result.reserve(size); + { + QBuffer buffer(&result); + if (!buffer.open(QIODevice::WriteOnly)) { + LOG(("MTP Error: could not open buffer to serialize mtp authorization.")); + return result; + } + QDataStream stream(&buffer); + stream.setVersion(QDataStream::Qt_5_1); + + stream << qint32(AuthSession::CurrentUserId()) << qint32(mainDcId) << qint32(keysCount); + writeKeys(stream); + } + return result; + }; + if (_mtproto) { + auto keys = _mtproto->getKeysForWrite(); + return serialize(keys.size(), _mtproto->mainDcId(), [&keys](QDataStream &stream) { + for (auto &key : keys) { + stream << qint32(key->getDC()); + key->write(stream); + } + }); + } + auto &keys = _private->mtpConfig.keys; + return serialize(keys.size(), _private->mtpConfig.mainDcId, [&keys](QDataStream &stream) { + for (auto &key : keys) { + stream << qint32(key.first); + stream.writeRawData(key.second.data(), key.second.size()); + } + }); +} + +void Messenger::setMtpAuthorization(const QByteArray &serialized) { + t_assert(!_mtproto); + t_assert(!authSession()); + + auto readonly = serialized; + QBuffer buffer(&readonly); + if (!buffer.open(QIODevice::ReadOnly)) { + LOG(("MTP Error: could not open serialized mtp authorization for reading.")); + return; + } + QDataStream stream(&buffer); + stream.setVersion(QDataStream::Qt_5_1); + + qint32 userId = 0, mainDcId = 0, count = 0; + stream >> userId >> mainDcId >> count; + if (stream.status() != QDataStream::Ok) { + LOG(("MTP Error: could not read main fields from serialized mtp authorization.")); + return; + } + + if (userId) { + authSessionCreate(userId); + } + _private->mtpConfig.mainDcId = mainDcId; + for (auto i = 0; i != count; ++i) { + qint32 dcId = 0; + MTP::AuthKey::Data keyData; + stream >> dcId; + stream.readRawData(keyData.data(), keyData.size()); + if (stream.status() != QDataStream::Ok) { + LOG(("MTP Error: could not read key from serialized mtp authorization.")); + return; + } + _private->mtpConfig.keys.insert(std::make_pair(dcId, keyData)); + } +} + +void Messenger::startMtp() { + t_assert(!_mtproto); + _mtproto = std::make_unique(_dcOptions.get(), std::move(_private->mtpConfig)); + + _mtproto->setStateChangedHandler([](MTP::ShiftedDcId shiftedDcId, int32 state) { + if (App::wnd()) { + App::wnd()->mtpStateChanged(shiftedDcId, state); + } + }); + _mtproto->setSessionResetHandler([](MTP::ShiftedDcId shiftedDcId) { + if (App::main() && shiftedDcId == MTP::maindc()) { + App::main()->getDifference(); + } + }); +} + void Messenger::loadLanguage() { if (cLang() < languageTest) { cSetLang(Sandbox::LangSystem()); @@ -199,10 +291,12 @@ void Messenger::startLocalStorage() { _dcOptions = std::make_unique(); _dcOptions->constructFromBuiltIn(); Local::start(); - subscribe(_dcOptions->changed(), [](const MTP::DcOptions::Ids &ids) { + subscribe(_dcOptions->changed(), [this](const MTP::DcOptions::Ids &ids) { Local::writeSettings(); - for (auto id : ids) { - MTP::restart(id); + if (auto instance = mtp()) { + for (auto id : ids) { + instance->restart(id); + } } }); } @@ -499,7 +593,7 @@ void Messenger::checkMapVersion() { void Messenger::prepareToDestroy() { _window.reset(); - MTP::finish(); + _mtproto.reset(); } Messenger::~Messenger() { diff --git a/Telegram/SourceFiles/messenger.h b/Telegram/SourceFiles/messenger.h index 309ef7d30..3b9c2be7e 100644 --- a/Telegram/SourceFiles/messenger.h +++ b/Telegram/SourceFiles/messenger.h @@ -24,6 +24,7 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org namespace MTP { class DcOptions; +class Instance; } // namespace MTP class AuthSession; @@ -55,6 +56,16 @@ public: MTP::DcOptions *dcOptions() { return _dcOptions.get(); } + + void setMtpMainDcId(MTP::DcId mainDcId); + void setMtpKey(MTP::DcId dcId, const MTP::AuthKey::Data &keyData); + QByteArray serializeMtpAuthorization() const; + void setMtpAuthorization(const QByteArray &serialized); + void startMtp(); + MTP::Instance *mtp() { + return _mtproto.get(); + } + AuthSession *authSession() { return _authSession.get(); } @@ -114,13 +125,16 @@ private: QMap killDownloadSessionTimes; SingleTimer killDownloadSessionsTimer; - TimeMs _lastActionTime = 0; + // Some fields are just moved from the declaration. + struct Private; + const std::unique_ptr _private; std::unique_ptr _window; FileUploader *_uploader = nullptr; Translator *_translator = nullptr; std::unique_ptr _dcOptions; + std::unique_ptr _mtproto; std::unique_ptr _authSession; }; diff --git a/Telegram/SourceFiles/mtproto/connection.cpp b/Telegram/SourceFiles/mtproto/connection.cpp index 95381c639..bfddf2e1c 100644 --- a/Telegram/SourceFiles/mtproto/connection.cpp +++ b/Telegram/SourceFiles/mtproto/connection.cpp @@ -40,9 +40,31 @@ using std::string; namespace MTP { namespace internal { - namespace { +void wrapInvokeAfter(mtpRequest &to, const mtpRequest &from, const mtpRequestMap &haveSent, int32 skipBeforeRequest = 0) { + mtpMsgId afterId(*(mtpMsgId*)(from->after->data() + 4)); + mtpRequestMap::const_iterator i = afterId ? haveSent.constFind(afterId) : haveSent.cend(); + int32 size = to->size(), lenInInts = (from.innerLength() >> 2), headlen = 4, fulllen = headlen + lenInInts; + if (i == haveSent.constEnd()) { // no invoke after or such msg was not sent or was completed recently + to->resize(size + fulllen + skipBeforeRequest); + if (skipBeforeRequest) { + memcpy(to->data() + size, from->constData() + 4, headlen * sizeof(mtpPrime)); + memcpy(to->data() + size + headlen + skipBeforeRequest, from->constData() + 4 + headlen, lenInInts * sizeof(mtpPrime)); + } else { + memcpy(to->data() + size, from->constData() + 4, fulllen * sizeof(mtpPrime)); + } + } else { + to->resize(size + fulllen + skipBeforeRequest + 3); + memcpy(to->data() + size, from->constData() + 4, headlen * sizeof(mtpPrime)); + (*to)[size + 3] += 3 * sizeof(mtpPrime); + *((mtpTypeId*)&((*to)[size + headlen + skipBeforeRequest])) = mtpc_invokeAfterMsg; + memcpy(to->data() + size + headlen + skipBeforeRequest + 1, &afterId, 2 * sizeof(mtpPrime)); + memcpy(to->data() + size + headlen + skipBeforeRequest + 3, from->constData() + 4 + headlen, lenInInts * sizeof(mtpPrime)); + if (size + 3 != 7) (*to)[7] += 3 * sizeof(mtpPrime); + } +} + bool parsePQ(const string &pqStr, string &pStr, string &qStr) { if (pqStr.length() > 8) return false; // more than 64 bit pq @@ -321,35 +343,20 @@ RSAPublicKeys InitRSAPublicKeys() { } // namespace -uint32 ThreadIdIncrement = 0; - -Thread::Thread() : QThread(nullptr) -, _threadId(++ThreadIdIncrement) { -} - -uint32 Thread::getThreadId() const { - return _threadId; -} - -Thread::~Thread() { -} - -Connection::Connection() : thread(nullptr), data(nullptr) { +Connection::Connection(Instance *instance) : _instance(instance) { } int32 Connection::prepare(SessionData *sessionData, int32 dc) { t_assert(thread == nullptr && data == nullptr); - thread = new Thread(); - data = new ConnectionPrivate(thread, this, sessionData, dc); - - dc = data->getDC(); - if (!dc) { - delete data; - data = nullptr; - delete thread; - thread = nullptr; - return 0; + thread = std::make_unique(); + auto newData = std::make_unique(_instance, thread.get(), this, sessionData, dc); + dc = newData->getDC(); + if (dc) { + // will be deleted in the thread::finished signal + data = newData.release(); + } else { + thread.reset(); } return dc; } @@ -361,9 +368,8 @@ void Connection::start() { void Connection::kill() { t_assert(data != nullptr && thread != nullptr); data->stop(); - data = nullptr; // will be deleted in thread::finished signal + data = nullptr; thread->quit(); - queueQuittingConnection(this); } void Connection::waitTillFinish() { @@ -371,8 +377,7 @@ void Connection::waitTillFinish() { DEBUG_LOG(("Waiting for connectionThread to finish")); thread->wait(); - delete thread; - thread = nullptr; + thread.reset(); } int32 Connection::state() const { @@ -388,7 +393,10 @@ QString Connection::transport() const { } Connection::~Connection() { - t_assert(data == nullptr && thread == nullptr); + t_assert(data == nullptr); + if (thread) { + waitTillFinish(); + } } void ConnectionPrivate::createConn(bool createIPv4, bool createIPv6) { @@ -440,7 +448,8 @@ void ConnectionPrivate::destroyConn(AbstractConnection **conn) { } } -ConnectionPrivate::ConnectionPrivate(QThread *thread, Connection *owner, SessionData *data, uint32 _dc) : QObject(nullptr) +ConnectionPrivate::ConnectionPrivate(Instance *instance, QThread *thread, Connection *owner, SessionData *data, uint32 _dc) : QObject() +, _instance(instance) , _state(DisconnectedState) , dc(_dc) , _owner(owner) @@ -463,7 +472,7 @@ ConnectionPrivate::ConnectionPrivate(QThread *thread, Connection *owner, Session connect(thread, SIGNAL(started()), this, SLOT(socketStart())); connect(thread, SIGNAL(finished()), this, SLOT(doFinish())); - connect(this, SIGNAL(finished(Connection*)), globalSlotCarrier(), SLOT(connectionFinished(Connection*)), Qt::QueuedConnection); + connect(this, SIGNAL(finished(internal::Connection*)), _instance, SLOT(connectionFinished(internal::Connection*)), Qt::QueuedConnection); connect(&retryTimer, SIGNAL(timeout()), this, SLOT(retryByTimer())); connect(&_waitForConnectedTimer, SIGNAL(timeout()), this, SLOT(onWaitConnectedFailed())); @@ -1104,8 +1113,8 @@ void ConnectionPrivate::socketStart(bool afterConfig) { } if (noIPv4) DEBUG_LOG(("MTP Info: DC %1 options for IPv4 over HTTP not found, waiting for config").arg(dc)); if (Global::TryIPv6() && noIPv6) DEBUG_LOG(("MTP Info: DC %1 options for IPv6 over HTTP not found, waiting for config").arg(dc)); - connect(configLoader(), SIGNAL(loaded()), this, SLOT(onConfigLoaded())); - configLoader()->load(); + connect(_instance, SIGNAL(configLoaded()), this, SLOT(onConfigLoaded()), Qt::UniqueConnection); + QMetaObject::invokeMethod(_instance, "configLoadRequest", Qt::QueuedConnection); return; } @@ -1635,7 +1644,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr mtpRequestId requestId = wasSent(resendId); if (requestId) { LOG(("Message Error: bad message notification received, msgId %1, error_code %2, fatal: clearing callbacks").arg(data.vbad_msg_id.v).arg(errorCode)); - clearCallbacksDelayed(RPCCallbackClears(1, RPCCallbackClear(requestId, -errorCode))); + _instance->clearCallbacksDelayed(RPCCallbackClears(1, RPCCallbackClear(requestId, -errorCode))); } else { DEBUG_LOG(("Message Error: such message was not sent recently %1").arg(resendId)); } @@ -2101,7 +2110,7 @@ void ConnectionPrivate::requestsAcked(const QVector &ids, bool byRespon mtpRequestId reqId = req.value()->requestId; bool moveToAcked = byResponse; if (!moveToAcked) { // ignore ACK, if we need a response (if we have a handler) - moveToAcked = !hasCallbacks(reqId); + moveToAcked = !_instance->hasCallbacks(reqId); } if (moveToAcked) { wereAcked.insert(msgId, reqId); @@ -2119,7 +2128,7 @@ void ConnectionPrivate::requestsAcked(const QVector &ids, bool byRespon mtpRequestId reqId = reqIt.value(); bool moveToAcked = byResponse; if (!moveToAcked) { // ignore ACK, if we need a response (if we have a handler) - moveToAcked = !hasCallbacks(reqId); + moveToAcked = !_instance->hasCallbacks(reqId); } if (moveToAcked) { QWriteLocker locker4(sessionData->toSendMutex()); @@ -2160,7 +2169,7 @@ void ConnectionPrivate::requestsAcked(const QVector &ids, bool byRespon } if (clearedAcked.size()) { - clearCallbacksDelayed(clearedAcked); + _instance->clearCallbacksDelayed(clearedAcked); } if (toAckMore.size()) { @@ -2681,8 +2690,8 @@ void ConnectionPrivate::dhClientParamsAnswered() { DEBUG_LOG(("AuthKey Info: auth key gen succeed, id: %1, server salt: %2").arg(authKey->keyId()).arg(serverSalt)); - sessionData->owner()->notifyKeyCreated(authKey); // slot will call authKeyCreated() - sessionData->clear(); + sessionData->owner()->notifyKeyCreated(std::move(authKey)); // slot will call authKeyCreated() + sessionData->clear(_instance); unlockKey(); } return; diff --git a/Telegram/SourceFiles/mtproto/connection.h b/Telegram/SourceFiles/mtproto/connection.h index f38f3af20..57cbe4a9d 100644 --- a/Telegram/SourceFiles/mtproto/connection.h +++ b/Telegram/SourceFiles/mtproto/connection.h @@ -25,6 +25,9 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org #include "core/single_timer.h" namespace MTP { + +class Instance; + namespace internal { class AbstractConnection; @@ -35,24 +38,27 @@ class Thread : public QThread { Q_OBJECT public: - Thread(); - uint32 getThreadId() const; - ~Thread(); + Thread() { + static int ThreadCounter = 0; + _threadIndex = ++ThreadCounter; + } + int getThreadIndex() const { + return _threadIndex; + } private: - uint32 _threadId; + int _threadIndex = 0; }; class Connection { public: - enum ConnectionType { TcpConnection, HttpConnection }; - Connection(); + Connection(Instance *instance); int32 prepare(SessionData *data, int32 dc = 0); // return dc void start(); @@ -67,9 +73,9 @@ public: QString transport() const; private: - - QThread *thread; - ConnectionPrivate *data; + Instance *_instance = nullptr; + std::unique_ptr thread; + ConnectionPrivate *data = nullptr; }; @@ -77,7 +83,7 @@ class ConnectionPrivate : public QObject { Q_OBJECT public: - ConnectionPrivate(QThread *thread, Connection *owner, SessionData *data, uint32 dc); + ConnectionPrivate(Instance *instance, QThread *thread, Connection *owner, SessionData *data, uint32 dc); ~ConnectionPrivate(); void stop(); @@ -102,10 +108,9 @@ signals: void resendManyAsync(QVector msgIds, qint64 msCanWait, bool forceContainer, bool sendMsgStateInfo); void resendAllAsync(); - void finished(Connection *connection); + void finished(internal::Connection *connection); public slots: - void retryByTimer(); void restartNow(); void restart(bool mayBeBadKey = false); @@ -149,7 +154,6 @@ public slots: void onConfigLoaded(); private: - void doDisconnect(); void createConn(bool createIPv4, bool createIPv6); @@ -175,8 +179,11 @@ private: void clearMessages(); bool setState(int32 state, int32 ifState = Connection::UpdateAlways); + + Instance *_instance = nullptr; + mutable QReadWriteLock stateConnMutex; - int32 _state; + int32 _state = DisconnectedState; bool _needSessionReset = false; void resetSession(); diff --git a/Telegram/SourceFiles/mtproto/dc_options.cpp b/Telegram/SourceFiles/mtproto/dc_options.cpp index 0bec478a2..b991a9508 100644 --- a/Telegram/SourceFiles/mtproto/dc_options.cpp +++ b/Telegram/SourceFiles/mtproto/dc_options.cpp @@ -200,6 +200,7 @@ void DcOptions::constructFromSerialized(const QByteArray &serialized) { return; } QDataStream stream(&buffer); + stream.setVersion(QDataStream::Qt_5_1); qint32 count = 0; stream >> count; if (stream.status() != QDataStream::Ok) { @@ -243,10 +244,6 @@ DcId DcOptions::getDefaultDcId() const { auto result = sortedDcIds(); t_assert(!result.empty()); - auto main = internal::mainDC(); - if (base::contains(result, main)) { - return main; - } return result[0]; } diff --git a/Telegram/SourceFiles/mtproto/dcenter.cpp b/Telegram/SourceFiles/mtproto/dcenter.cpp index cbded784f..7d83a73c7 100644 --- a/Telegram/SourceFiles/mtproto/dcenter.cpp +++ b/Telegram/SourceFiles/mtproto/dcenter.cpp @@ -23,82 +23,22 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org #include "mtproto/facade.h" #include "mtproto/dc_options.h" -#include "messenger.h" +#include "mtproto/mtp_instance.h" #include "localstorage.h" namespace MTP { namespace internal { namespace { -DcenterMap gDCs; -bool configLoadedOnce = false; -bool mainDCChanged = false; -int32 _mainDC = 2; - -typedef QMap _KeysMapForWrite; -_KeysMapForWrite _keysMapForWrite; -QMutex _keysMapForWriteMutex; - constexpr auto kEnumerateDcTimeout = 8000; // 8 seconds timeout for help_getConfig to work (then move to other dc) } // namespace -DcenterMap &DCMap() { - return gDCs; -} - -bool configNeeded() { - return !configLoadedOnce; -} - -int32 mainDC() { - return _mainDC; -} - -namespace { - QMap logoutGuestMap; // dcWithShift to logout request id - bool logoutDone(mtpRequestId req) { - for (QMap::iterator i = logoutGuestMap.begin(); i != logoutGuestMap.end(); ++i) { - if (i.value() == req) { - MTP::killSession(i.key()); - logoutGuestMap.erase(i); - return true; - } - } - return false; - } -} - -void logoutOtherDCs() { - QList dcs; - { - QMutexLocker lock(&_keysMapForWriteMutex); - dcs = _keysMapForWrite.keys(); - } - for (int32 i = 0, cnt = dcs.size(); i != cnt; ++i) { - if (dcs[i] != MTP::maindc()) { - logoutGuestMap.insert(MTP::lgtDcId(dcs[i]), MTP::send(MTPauth_LogOut(), rpcDone(&logoutDone), rpcFail(&logoutDone), MTP::lgtDcId(dcs[i]))); - } - } -} - -void setDC(int32 dc, bool firstOnly) { - if (!dc || (firstOnly && mainDCChanged)) return; - mainDCChanged = true; - if (dc != _mainDC) { - _mainDC = dc; - } -} - -Dcenter::Dcenter(int32 id, const AuthKeyPtr &key) : _id(id), _key(key), _connectionInited(false) { +Dcenter::Dcenter(Instance *instance, DcId dcId, AuthKeyPtr &&key) +: _instance(instance) +, _id(dcId) +, _key(std::move(key)) { connect(this, SIGNAL(authKeyCreated()), this, SLOT(authKeyWrite()), Qt::QueuedConnection); - - QMutexLocker lock(&_keysMapForWriteMutex); - if (_key) { - _keysMapForWrite[_id] = _key; - } else { - _keysMapForWrite.remove(_id); - } } void Dcenter::authKeyWrite() { @@ -108,18 +48,13 @@ void Dcenter::authKeyWrite() { } } -void Dcenter::setKey(const AuthKeyPtr &key) { +void Dcenter::setKey(AuthKeyPtr &&key) { DEBUG_LOG(("AuthKey Info: MTProtoDC::setKey(%1), emitting authKeyCreated, dc %2").arg(key ? key->keyId() : 0).arg(_id)); - _key = key; + _key = std::move(key); _connectionInited = false; emit authKeyCreated(); - QMutexLocker lock(&_keysMapForWriteMutex); - if (_key) { - _keysMapForWrite[_id] = _key; - } else { - _keysMapForWrite.remove(_id); - } + _instance->setKeyForWrite(_id, _key); } QReadWriteLock *Dcenter::keyMutex() const { @@ -132,99 +67,45 @@ const AuthKeyPtr &Dcenter::getKey() const { void Dcenter::destroyKey() { setKey(AuthKeyPtr()); - - QMutexLocker lock(&_keysMapForWriteMutex); - _keysMapForWrite.remove(_id); } -namespace { - -ConfigLoader *_configLoader = nullptr; -auto loadingConfig = false; - -void configLoaded(const MTPConfig &result) { - loadingConfig = false; - - auto &data = result.c_config(); - - DEBUG_LOG(("MTP Info: got config, chat_size_max: %1, date: %2, test_mode: %3, this_dc: %4, dc_options.length: %5").arg(data.vchat_size_max.v).arg(data.vdate.v).arg(mtpIsTrue(data.vtest_mode)).arg(data.vthis_dc.v).arg(data.vdc_options.c_vector().v.size())); - - if (data.vdc_options.c_vector().v.empty()) { - LOG(("MTP Error: config with empty dc_options received!")); - } else { - Messenger::Instance().dcOptions()->setFromList(data.vdc_options); - } - - Global::SetChatSizeMax(data.vchat_size_max.v); - Global::SetMegagroupSizeMax(data.vmegagroup_size_max.v); - Global::SetForwardedCountMax(data.vforwarded_count_max.v); - Global::SetOnlineUpdatePeriod(data.vonline_update_period_ms.v); - Global::SetOfflineBlurTimeout(data.voffline_blur_timeout_ms.v); - Global::SetOfflineIdleTimeout(data.voffline_idle_timeout_ms.v); - Global::SetOnlineCloudTimeout(data.vonline_cloud_timeout_ms.v); - Global::SetNotifyCloudDelay(data.vnotify_cloud_delay_ms.v); - Global::SetNotifyDefaultDelay(data.vnotify_default_delay_ms.v); - Global::SetChatBigSize(data.vchat_big_size.v); // ? - Global::SetPushChatPeriod(data.vpush_chat_period_ms.v); // ? - Global::SetPushChatLimit(data.vpush_chat_limit.v); // ? - Global::SetSavedGifsLimit(data.vsaved_gifs_limit.v); - Global::SetEditTimeLimit(data.vedit_time_limit.v); // ? - Global::SetStickersRecentLimit(data.vstickers_recent_limit.v); - Global::SetPinnedDialogsCountMax(data.vpinned_dialogs_count_max.v); - - configLoadedOnce = true; - Local::writeSettings(); - - configLoader()->done(); -} - -bool configFailed(const RPCError &error) { - if (MTP::isDefaultHandledError(error)) return false; - - loadingConfig = false; - LOG(("MTP Error: failed to get config!")); - return false; -} - -}; - -ConfigLoader::ConfigLoader() { +ConfigLoader::ConfigLoader(Instance *instance, RPCDoneHandlerPtr onDone, RPCFailHandlerPtr onFail) : _instance(instance) +, _doneHandler(onDone) +, _failHandler(onFail) { connect(&_enumDCTimer, SIGNAL(timeout()), this, SLOT(enumDC())); } void ConfigLoader::load() { - if (loadingConfig) return; - loadingConfig = true; - - MTP::send(MTPhelp_GetConfig(), rpcDone(configLoaded), rpcFail(configFailed)); + sendRequest(_instance->mainDcId()); _enumDCTimer.start(kEnumerateDcTimeout); } -void ConfigLoader::done() { +mtpRequestId ConfigLoader::sendRequest(ShiftedDcId shiftedDcId) { + return _instance->send(MTPhelp_GetConfig(), _doneHandler, _failHandler, shiftedDcId); +} + +ConfigLoader::~ConfigLoader() { _enumDCTimer.stop(); if (_enumRequest) { - MTP::cancel(_enumRequest); - _enumRequest = 0; + _instance->cancel(_enumRequest); } if (_enumCurrent) { - MTP::killSession(MTP::cfgDcId(_enumCurrent)); - _enumCurrent = 0; + _instance->killSession(MTP::cfgDcId(_enumCurrent)); } - emit loaded(); } void ConfigLoader::enumDC() { - if (!loadingConfig) return; - - if (_enumRequest) MTP::cancel(_enumRequest); + if (_enumRequest) { + _instance->cancel(_enumRequest); + } if (!_enumCurrent) { - _enumCurrent = _mainDC; + _enumCurrent = _instance->mainDcId(); } else { - MTP::killSession(MTP::cfgDcId(_enumCurrent)); + _instance->killSession(MTP::cfgDcId(_enumCurrent)); } - auto ids = Messenger::Instance().dcOptions()->sortedDcIds(); + auto ids = _instance->dcOptions()->sortedDcIds(); t_assert(!ids.empty()); auto i = std::find(ids.cbegin(), ids.cend(), _enumCurrent); @@ -233,34 +114,10 @@ void ConfigLoader::enumDC() { } else { _enumCurrent = *i; } - _enumRequest = MTP::send(MTPhelp_GetConfig(), rpcDone(configLoaded), rpcFail(configFailed), MTP::cfgDcId(_enumCurrent)); + _enumRequest = sendRequest(MTP::cfgDcId(_enumCurrent)); _enumDCTimer.start(kEnumerateDcTimeout); } -ConfigLoader *configLoader() { - if (!_configLoader) _configLoader = new ConfigLoader(); - return _configLoader; -} - -void destroyConfigLoader() { - delete _configLoader; - _configLoader = nullptr; -} - -AuthKeysMap getAuthKeys() { - AuthKeysMap result; - QMutexLocker lock(&_keysMapForWriteMutex); - for_const (const AuthKeyPtr &key, _keysMapForWrite) { - result.push_back(key); - } - return result; -} - -void setAuthKey(int32 dcId, AuthKeyPtr key) { - DcenterPtr dc(new Dcenter(dcId, key)); - gDCs.insert(dcId, dc); -} - } // namespace internal } // namespace MTP diff --git a/Telegram/SourceFiles/mtproto/dcenter.h b/Telegram/SourceFiles/mtproto/dcenter.h index 698bf1538..9be724fa4 100644 --- a/Telegram/SourceFiles/mtproto/dcenter.h +++ b/Telegram/SourceFiles/mtproto/dcenter.h @@ -21,19 +21,24 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org #pragma once #include "core/single_timer.h" +#include "mtproto/rpc_sender.h" +#include "mtproto/auth_key.h" namespace MTP { + +class Instance; + namespace internal { class Dcenter : public QObject { Q_OBJECT public: - Dcenter(int32 id, const AuthKeyPtr &key); + Dcenter(Instance *instance, DcId dcId, AuthKeyPtr &&key); QReadWriteLock *keyMutex() const; const AuthKeyPtr &getKey() const; - void setKey(const AuthKeyPtr &key); + void setKey(AuthKeyPtr &&key); void destroyKey(); bool connectionInited() const { @@ -56,47 +61,40 @@ private slots: private: mutable QReadWriteLock keyLock; mutable QMutex initLock; - int32 _id; + Instance *_instance = nullptr; + DcId _id = 0; AuthKeyPtr _key; - bool _connectionInited; + bool _connectionInited = false; }; -typedef QSharedPointer DcenterPtr; -typedef QMap DcenterMap; +using DcenterPtr = std::shared_ptr; +using DcenterMap = std::map; class ConfigLoader : public QObject { Q_OBJECT public: - ConfigLoader(); + ConfigLoader(Instance *instance, RPCDoneHandlerPtr onDone, RPCFailHandlerPtr onFail); + ~ConfigLoader(); + void load(); - void done(); public slots: void enumDC(); -signals: - void loaded(); - private: + mtpRequestId sendRequest(ShiftedDcId shiftedDcId); + + Instance *_instance = nullptr; SingleTimer _enumDCTimer; DcId _enumCurrent = 0; mtpRequestId _enumRequest = 0; + RPCDoneHandlerPtr _doneHandler; + RPCFailHandlerPtr _failHandler; + }; -ConfigLoader *configLoader(); -void destroyConfigLoader(); - -DcenterMap &DCMap(); -bool configNeeded(); -int32 mainDC(); -void logoutOtherDCs(); -void setDC(int32 dc, bool firstOnly = false); - -AuthKeysMap getAuthKeys(); -void setAuthKey(int32 dc, AuthKeyPtr key); - } // namespace internal } // namespace MTP diff --git a/Telegram/SourceFiles/mtproto/facade.cpp b/Telegram/SourceFiles/mtproto/facade.cpp index b36c87802..67ec76ff4 100644 --- a/Telegram/SourceFiles/mtproto/facade.cpp +++ b/Telegram/SourceFiles/mtproto/facade.cpp @@ -22,366 +22,16 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org #include "mtproto/facade.h" #include "localstorage.h" -#include "auth_session.h" +#include "messenger.h" namespace MTP { - +namespace internal { namespace { - typedef QMap Sessions; - Sessions sessions; - internal::Session *mainSession; - - typedef QMap RequestsByDC; // holds dcWithShift for request to this dc or -dc for request to main dc - RequestsByDC requestsByDC; - QMutex requestByDCLock; - - typedef QMap AuthExportRequests; // holds target dcWithShift for auth export request - AuthExportRequests authExportRequests; - - bool _started = false; - - uint32 layer; - - typedef QMap ParserMap; - ParserMap parserMap; - QMutex parserMapLock; - - typedef QMap RequestMap; - RequestMap requestMap; - QReadWriteLock requestMapLock; - - typedef QPair DelayedRequest; - typedef QList DelayedRequestsList; - DelayedRequestsList delayedRequests; - - typedef QMap RequestsDelays; - RequestsDelays requestsDelays; - - typedef QSet BadGuestDCRequests; - BadGuestDCRequests badGuestDCRequests; - - typedef QVector DCAuthWaiters; - typedef QMap AuthWaiters; // holds request ids waiting for auth import to specific dc - AuthWaiters authWaiters; - - typedef OrderedSet MTPQuittingConnections; - MTPQuittingConnections quittingConnections; - - QMutex toClearLock; - RPCCallbackClears toClear; - - RPCResponseHandler globalHandler; - MTPStateChangedHandler stateChangedHandler = 0; - MTPSessionResetHandler sessionResetHandler = 0; - internal::GlobalSlotCarrier *_globalSlotCarrier = 0; - - bool hasAuthorization() { - return (AuthSession::Current() != nullptr); - } - - void importDone(const MTPauth_Authorization &result, mtpRequestId req) { - QMutexLocker locker1(&requestByDCLock); - - RequestsByDC::iterator i = requestsByDC.find(req); - if (i == requestsByDC.end()) { - LOG(("MTP Error: auth import request not found in requestsByDC, requestId: %1").arg(req)); - RPCError error(internal::rpcClientError("AUTH_IMPORT_FAIL", QString("did not find import request in requestsByDC, request %1").arg(req))); - if (globalHandler.onFail && hasAuthorization()) (*globalHandler.onFail)(req, error); // auth failed in main dc - return; - } - DcId newdc = bareDcId(i.value()); - - DEBUG_LOG(("MTP Info: auth import to dc %1 succeeded").arg(newdc)); - - DCAuthWaiters &waiters(authWaiters[newdc]); - if (waiters.size()) { - QReadLocker locker(&requestMapLock); - for (DCAuthWaiters::iterator i = waiters.begin(), e = waiters.end(); i != e; ++i) { - mtpRequestId requestId = *i; - RequestMap::const_iterator j = requestMap.constFind(requestId); - if (j == requestMap.cend()) { - LOG(("MTP Error: could not find request %1 for resending").arg(requestId)); - continue; - } - ShiftedDcId dcWithShift = newdc; - { - RequestsByDC::iterator k = requestsByDC.find(requestId); - if (k == requestsByDC.cend()) { - LOG(("MTP Error: could not find request %1 by dc for resending").arg(requestId)); - continue; - } - if (k.value() < 0) { - setdc(newdc); - k.value() = -newdc; - } else { - dcWithShift = shiftDcId(newdc, getDcIdShift(k.value())); - k.value() = dcWithShift; - } - DEBUG_LOG(("MTP Info: resending request %1 to dc %2 after import auth").arg(requestId).arg(k.value())); - } - if (internal::Session *session = internal::getSession(dcWithShift)) { - session->sendPrepared(j.value()); - } - } - waiters.clear(); - } - } - - bool importFail(const RPCError &error, mtpRequestId req) { - if (isDefaultHandledError(error)) return false; - - if (globalHandler.onFail && hasAuthorization()) (*globalHandler.onFail)(req, error); // auth import failed - return true; - } - - void exportDone(const MTPauth_ExportedAuthorization &result, mtpRequestId req) { - AuthExportRequests::const_iterator i = authExportRequests.constFind(req); - if (i == authExportRequests.cend()) { - LOG(("MTP Error: auth export request target dcWithShift not found, requestId: %1").arg(req)); - RPCError error(internal::rpcClientError("AUTH_IMPORT_FAIL", QString("did not find target dcWithShift, request %1").arg(req))); - if (globalHandler.onFail && hasAuthorization()) (*globalHandler.onFail)(req, error); // auth failed in main dc - return; - } - - const auto &data(result.c_auth_exportedAuthorization()); - send(MTPauth_ImportAuthorization(data.vid, data.vbytes), rpcDone(importDone), rpcFail(importFail), i.value()); - authExportRequests.remove(req); - } - - bool exportFail(const RPCError &error, mtpRequestId req) { - if (isDefaultHandledError(error)) return false; - - AuthExportRequests::const_iterator i = authExportRequests.constFind(req); - if (i != authExportRequests.cend()) { - authWaiters[bareDcId(i.value())].clear(); - } - if (globalHandler.onFail && hasAuthorization()) (*globalHandler.onFail)(req, error); // auth failed in main dc - return true; - } - - bool onErrorDefault(mtpRequestId requestId, const RPCError &error) { - const QString &err(error.type()); - int32 code = error.code(); - if (!isFloodError(error) && err != qstr("AUTH_KEY_UNREGISTERED")) { - int breakpoint = 0; - } - bool badGuestDC = (code == 400) && (err == qsl("FILE_ID_INVALID")); - QRegularExpressionMatch m; - if ((m = QRegularExpression("^(FILE|PHONE|NETWORK|USER)_MIGRATE_(\\d+)$").match(err)).hasMatch()) { - if (!requestId) return false; - - ShiftedDcId dcWithShift = 0, newdcWithShift = m.captured(2).toInt(); - { - QMutexLocker locker(&requestByDCLock); - RequestsByDC::iterator i = requestsByDC.find(requestId); - if (i == requestsByDC.end()) { - LOG(("MTP Error: could not find request %1 for migrating to %2").arg(requestId).arg(newdcWithShift)); - } else { - dcWithShift = i.value(); - } - } - if (!dcWithShift || !newdcWithShift) return false; - - DEBUG_LOG(("MTP Info: changing request %1 from dcWithShift%2 to dc%3").arg(requestId).arg(dcWithShift).arg(newdcWithShift)); - if (dcWithShift < 0) { // newdc shift = 0 - if (false && hasAuthorization() && !authExportRequests.contains(requestId)) { // migrate not supported at this moment - DEBUG_LOG(("MTP Info: importing auth to dc %1").arg(newdcWithShift)); - DCAuthWaiters &waiters(authWaiters[newdcWithShift]); - if (!waiters.size()) { - authExportRequests.insert(send(MTPauth_ExportAuthorization(MTP_int(newdcWithShift)), rpcDone(exportDone), rpcFail(exportFail)), newdcWithShift); - } - waiters.push_back(requestId); - return true; - } else { - MTP::setdc(newdcWithShift); - } - } else { - newdcWithShift = shiftDcId(newdcWithShift, getDcIdShift(dcWithShift)); - } - - mtpRequest req; - { - QReadLocker locker(&requestMapLock); - RequestMap::const_iterator i = requestMap.constFind(requestId); - if (i == requestMap.cend()) { - LOG(("MTP Error: could not find request %1").arg(requestId)); - return false; - } - req = i.value(); - } - if (auto session = internal::getSession(newdcWithShift)) { - internal::registerRequest(requestId, (dcWithShift < 0) ? -newdcWithShift : newdcWithShift); - session->sendPrepared(req); - } - return true; - } else if (code < 0 || code >= 500 || (m = QRegularExpression("^FLOOD_WAIT_(\\d+)$").match(err)).hasMatch()) { - if (!requestId) return false; - - int32 secs = 1; - if (code < 0 || code >= 500) { - RequestsDelays::iterator i = requestsDelays.find(requestId); - if (i != requestsDelays.cend()) { - secs = (i.value() > 60) ? i.value() : (i.value() *= 2); - } else { - requestsDelays.insert(requestId, secs); - } - } else { - secs = m.captured(1).toInt(); -// if (secs >= 60) return false; - } - auto sendAt = getms(true) + secs * 1000 + 10; - DelayedRequestsList::iterator i = delayedRequests.begin(), e = delayedRequests.end(); - for (; i != e; ++i) { - if (i->first == requestId) return true; - if (i->second > sendAt) break; - } - delayedRequests.insert(i, DelayedRequest(requestId, sendAt)); - - if (_globalSlotCarrier) _globalSlotCarrier->checkDelayed(); - - return true; - } else if (code == 401 || (badGuestDC && badGuestDCRequests.constFind(requestId) == badGuestDCRequests.cend())) { - int32 dcWithShift = 0; - { - QMutexLocker locker(&requestByDCLock); - RequestsByDC::iterator i = requestsByDC.find(requestId); - if (i != requestsByDC.end()) { - dcWithShift = i.value(); - } else { - LOG(("MTP Error: unauthorized request without dc info, requestId %1").arg(requestId)); - } - } - int32 newdc = bareDcId(qAbs(dcWithShift)); - if (!newdc || newdc == internal::mainDC() || !hasAuthorization()) { - if (!badGuestDC && globalHandler.onFail) (*globalHandler.onFail)(requestId, error); // auth failed in main dc - return false; - } - - DEBUG_LOG(("MTP Info: importing auth to dcWithShift %1").arg(dcWithShift)); - DCAuthWaiters &waiters(authWaiters[newdc]); - if (!waiters.size()) { - authExportRequests.insert(send(MTPauth_ExportAuthorization(MTP_int(newdc)), rpcDone(exportDone), rpcFail(exportFail)), abs(dcWithShift)); - } - waiters.push_back(requestId); - if (badGuestDC) badGuestDCRequests.insert(requestId); - return true; - } else if (err == qstr("CONNECTION_NOT_INITED") || err == qstr("CONNECTION_LAYER_INVALID")) { - mtpRequest req; - { - QReadLocker locker(&requestMapLock); - RequestMap::const_iterator i = requestMap.constFind(requestId); - if (i == requestMap.cend()) { - LOG(("MTP Error: could not find request %1").arg(requestId)); - return false; - } - req = i.value(); - } - int32 dcWithShift = 0; - { - QMutexLocker locker(&requestByDCLock); - RequestsByDC::iterator i = requestsByDC.find(requestId); - if (i == requestsByDC.end()) { - LOG(("MTP Error: could not find request %1 for resending with init connection").arg(requestId)); - } else { - dcWithShift = i.value(); - } - } - if (!dcWithShift) return false; - - if (internal::Session *session = internal::getSession(qAbs(dcWithShift))) { - req->needsLayer = true; - session->sendPrepared(req); - } - return true; - } else if (err == qstr("MSG_WAIT_FAILED")) { - mtpRequest req; - { - QReadLocker locker(&requestMapLock); - RequestMap::const_iterator i = requestMap.constFind(requestId); - if (i == requestMap.cend()) { - LOG(("MTP Error: could not find request %1").arg(requestId)); - return false; - } - req = i.value(); - } - if (!req->after) { - LOG(("MTP Error: wait failed for not dependent request %1").arg(requestId)); - return false; - } - int32 dcWithShift = 0; - { - QMutexLocker locker(&requestByDCLock); - RequestsByDC::iterator i = requestsByDC.find(requestId), j = requestsByDC.find(req->after->requestId); - if (i == requestsByDC.end()) { - LOG(("MTP Error: could not find request %1 by dc").arg(requestId)); - } else if (j == requestsByDC.end()) { - LOG(("MTP Error: could not find dependent request %1 by dc").arg(req->after->requestId)); - } else { - dcWithShift = i.value(); - if (i.value() != j.value()) { - req->after = mtpRequest(); - } - } - } - if (!dcWithShift) return false; - - if (!req->after) { - if (internal::Session *session = internal::getSession(qAbs(dcWithShift))) { - req->needsLayer = true; - session->sendPrepared(req); - } - } else { - int32 newdc = bareDcId(qAbs(dcWithShift)); - DCAuthWaiters &waiters(authWaiters[newdc]); - if (waiters.indexOf(req->after->requestId) >= 0) { - if (waiters.indexOf(requestId) < 0) { - waiters.push_back(requestId); - } - if (badGuestDCRequests.constFind(req->after->requestId) != badGuestDCRequests.cend()) { - if (badGuestDCRequests.constFind(requestId) == badGuestDCRequests.cend()) { - badGuestDCRequests.insert(requestId); - } - } - } else { - DelayedRequestsList::iterator i = delayedRequests.begin(), e = delayedRequests.end(); - for (; i != e; ++i) { - if (i->first == requestId) return true; - if (i->first == req->after->requestId) break; - } - if (i != e) { - delayedRequests.insert(i, DelayedRequest(requestId, i->second)); - } - - if (_globalSlotCarrier) _globalSlotCarrier->checkDelayed(); - } - } - return true; - } - if (badGuestDC) badGuestDCRequests.remove(requestId); - return false; - } - int PauseLevel = 0; } // namespace -namespace internal { - -Session *getSession(ShiftedDcId shiftedDcId) { - if (!_started) return nullptr; - if (!shiftedDcId) return mainSession; - if (!bareDcId(shiftedDcId)) { - shiftedDcId += bareDcId(mainSession->getDcWithShift()); - } - - Sessions::const_iterator i = sessions.constFind(shiftedDcId); - if (i == sessions.cend()) { - i = sessions.insert(shiftedDcId, new Session(shiftedDcId)); - } - return i.value(); -} - bool paused() { return PauseLevel > 0; } @@ -392,504 +42,17 @@ void pause() { void unpause() { --PauseLevel; - if (_started) { - for_const (auto session, sessions) { - session->unpaused(); + if (!PauseLevel) { + if (auto instance = MainInstance()) { + instance->unpaused(); } } } -void registerRequest(mtpRequestId requestId, int32 dcWithShift) { - { - QMutexLocker locker(&requestByDCLock); - requestsByDC.insert(requestId, dcWithShift); - } - internal::performDelayedClear(); // need to do it somewhere... -} - -void unregisterRequest(mtpRequestId requestId) { - requestsDelays.remove(requestId); - - { - QWriteLocker locker(&requestMapLock); - requestMap.remove(requestId); - } - - QMutexLocker locker(&requestByDCLock); - requestsByDC.remove(requestId); -} - -mtpRequestId storeRequest(mtpRequest &request, const RPCResponseHandler &parser) { - mtpRequestId res = reqid(); - request->requestId = res; - if (parser.onDone || parser.onFail) { - QMutexLocker locker(&parserMapLock); - parserMap.insert(res, parser); - } - { - QWriteLocker locker(&requestMapLock); - requestMap.insert(res, request); - } - return res; -} - -mtpRequest getRequest(mtpRequestId reqId) { - static mtpRequest zero; - mtpRequest req; - { - QReadLocker locker(&requestMapLock); - RequestMap::const_iterator i = requestMap.constFind(reqId); - req = (i == requestMap.cend()) ? zero : i.value(); - } - return req; -} - -void wrapInvokeAfter(mtpRequest &to, const mtpRequest &from, const mtpRequestMap &haveSent, int32 skipBeforeRequest) { - mtpMsgId afterId(*(mtpMsgId*)(from->after->data() + 4)); - mtpRequestMap::const_iterator i = afterId ? haveSent.constFind(afterId) : haveSent.cend(); - int32 size = to->size(), lenInInts = (from.innerLength() >> 2), headlen = 4, fulllen = headlen + lenInInts; - if (i == haveSent.constEnd()) { // no invoke after or such msg was not sent or was completed recently - to->resize(size + fulllen + skipBeforeRequest); - if (skipBeforeRequest) { - memcpy(to->data() + size, from->constData() + 4, headlen * sizeof(mtpPrime)); - memcpy(to->data() + size + headlen + skipBeforeRequest, from->constData() + 4 + headlen, lenInInts * sizeof(mtpPrime)); - } else { - memcpy(to->data() + size, from->constData() + 4, fulllen * sizeof(mtpPrime)); - } - } else { - to->resize(size + fulllen + skipBeforeRequest + 3); - memcpy(to->data() + size, from->constData() + 4, headlen * sizeof(mtpPrime)); - (*to)[size + 3] += 3 * sizeof(mtpPrime); - *((mtpTypeId*)&((*to)[size + headlen + skipBeforeRequest])) = mtpc_invokeAfterMsg; - memcpy(to->data() + size + headlen + skipBeforeRequest + 1, &afterId, 2 * sizeof(mtpPrime)); - memcpy(to->data() + size + headlen + skipBeforeRequest + 3, from->constData() + 4 + headlen, lenInInts * sizeof(mtpPrime)); - if (size + 3 != 7) (*to)[7] += 3 * sizeof(mtpPrime); - } -} - -void clearCallbacks(mtpRequestId requestId, int32 errorCode) { - RPCResponseHandler h; - bool found = false; - { - QMutexLocker locker(&parserMapLock); - ParserMap::iterator i = parserMap.find(requestId); - if (i != parserMap.end()) { - h = i.value(); - found = true; - - parserMap.erase(i); - } - } - if (errorCode && found) { - rpcErrorOccured(requestId, h, rpcClientError("CLEAR_CALLBACK", QString("did not handle request %1, error code %2").arg(requestId).arg(errorCode))); - } -} - -void clearCallbacksDelayed(const RPCCallbackClears &requestIds) { - uint32 idsCount = requestIds.size(); - if (!idsCount) return; - - if (cDebug()) { - QString idsStr = QString("%1").arg(requestIds[0].requestId); - for (uint32 i = 1; i < idsCount; ++i) { - idsStr += QString(", %1").arg(requestIds[i].requestId); - } - DEBUG_LOG(("RPC Info: clear callbacks delayed, msgIds: %1").arg(idsStr)); - } - - QMutexLocker lock(&toClearLock); - uint32 toClearNow = toClear.size(); - if (toClearNow) { - toClear.resize(toClearNow + idsCount); - memcpy(toClear.data() + toClearNow, requestIds.constData(), idsCount * sizeof(RPCCallbackClear)); - } else { - toClear = requestIds; - } -} - -void performDelayedClear() { - QMutexLocker lock(&toClearLock); - if (!toClear.isEmpty()) { - for (RPCCallbackClears::iterator i = toClear.begin(), e = toClear.end(); i != e; ++i) { - if (cDebug()) { - QMutexLocker locker(&parserMapLock); - if (parserMap.find(i->requestId) != parserMap.end()) { - DEBUG_LOG(("RPC Info: clearing delayed callback %1, error code %2").arg(i->requestId).arg(i->errorCode)); - } - } - clearCallbacks(i->requestId, i->errorCode); - internal::unregisterRequest(i->requestId); - } - toClear.clear(); - } -} - -void execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end) { - RPCResponseHandler h; - { - QMutexLocker locker(&parserMapLock); - ParserMap::iterator i = parserMap.find(requestId); - if (i != parserMap.cend()) { - h = i.value(); - parserMap.erase(i); - - DEBUG_LOG(("RPC Info: found parser for request %1, trying to parse response...").arg(requestId)); - } - } - if (h.onDone || h.onFail) { - try { - if (from >= end) throw mtpErrorInsufficient(); - - if (*from == mtpc_rpc_error) { - RPCError err(MTPRpcError(from, end)); - DEBUG_LOG(("RPC Info: error received, code %1, type %2, description: %3").arg(err.code()).arg(err.type()).arg(err.description())); - if (!rpcErrorOccured(requestId, h, err)) { - QMutexLocker locker(&parserMapLock); - parserMap.insert(requestId, h); - return; - } - } else { - if (h.onDone) { -// t_assert(App::app() != 0); - (*h.onDone)(requestId, from, end); - } - } - } catch (Exception &e) { - if (!rpcErrorOccured(requestId, h, rpcClientError("RESPONSE_PARSE_FAILED", QString("exception text: ") + e.what()))) { - QMutexLocker locker(&parserMapLock); - parserMap.insert(requestId, h); - return; - } - } - } else { - DEBUG_LOG(("RPC Info: parser not found for %1").arg(requestId)); - } - unregisterRequest(requestId); -} - -bool hasCallbacks(mtpRequestId requestId) { - QMutexLocker locker(&parserMapLock); - ParserMap::iterator i = parserMap.find(requestId); - return (i != parserMap.cend()); -} - -void globalCallback(const mtpPrime *from, const mtpPrime *end) { - if (globalHandler.onDone) (*globalHandler.onDone)(0, from, end); // some updates were received -} - -void onStateChange(int32 dcWithShift, int32 state) { - if (stateChangedHandler) stateChangedHandler(dcWithShift, state); -} - -void onSessionReset(int32 dcWithShift) { - if (sessionResetHandler) sessionResetHandler(dcWithShift); -} - -bool rpcErrorOccured(mtpRequestId requestId, const RPCFailHandlerPtr &onFail, const RPCError &err) { // return true if need to clean request data - if (isDefaultHandledError(err)) { - if (onFail && (*onFail)(requestId, err)) return true; - } - - if (onErrorDefault(requestId, err)) { - return false; - } - LOG(("RPC Error: request %1 got fail with code %2, error %3%4").arg(requestId).arg(err.code()).arg(err.type()).arg(err.description().isEmpty() ? QString() : QString(": %1").arg(err.description()))); - onFail && (*onFail)(requestId, err); - return true; -} - -GlobalSlotCarrier::GlobalSlotCarrier() { - connect(&_timer, SIGNAL(timeout()), this, SLOT(checkDelayed())); -} - -void GlobalSlotCarrier::checkDelayed() { - auto now = getms(true); - while (!delayedRequests.isEmpty() && now >= delayedRequests.front().second) { - mtpRequestId requestId = delayedRequests.front().first; - delayedRequests.pop_front(); - - int32 dcWithShift = 0; - { - QMutexLocker locker(&requestByDCLock); - RequestsByDC::const_iterator i = requestsByDC.constFind(requestId); - if (i != requestsByDC.cend()) { - dcWithShift = i.value(); - } else { - LOG(("MTP Error: could not find request dc for delayed resend, requestId %1").arg(requestId)); - continue; - } - } - - mtpRequest req; - { - QReadLocker locker(&requestMapLock); - RequestMap::const_iterator j = requestMap.constFind(requestId); - if (j == requestMap.cend()) { - DEBUG_LOG(("MTP Error: could not find request %1").arg(requestId)); - continue; - } - req = j.value(); - } - if (Session *session = getSession(qAbs(dcWithShift))) { - session->sendPrepared(req); - } - } - - if (!delayedRequests.isEmpty()) { - _timer.start(delayedRequests.front().second - now); - } -} - -void GlobalSlotCarrier::connectionFinished(Connection *connection) { - MTPQuittingConnections::iterator i = quittingConnections.find(connection); - if (i != quittingConnections.cend()) { - quittingConnections.erase(i); - } - - connection->waitTillFinish(); - delete connection; -} - -GlobalSlotCarrier *globalSlotCarrier() { - return _globalSlotCarrier; -} - -void queueQuittingConnection(Connection *connection) { - quittingConnections.insert(connection); -} - } // namespace internal -void start() { - if (started()) return; - - unixtimeInit(); - - internal::DcenterMap &dcs(internal::DCMap()); - - _globalSlotCarrier = new internal::GlobalSlotCarrier(); - - mainSession = new internal::Session(internal::mainDC()); - sessions.insert(mainSession->getDcWithShift(), mainSession); - - _started = true; - - if (internal::configNeeded()) { - internal::configLoader()->load(); - } -} - -bool started() { - return _started; -} - -void restart() { - if (!_started) return; - - for (auto i = sessions.cbegin(), e = sessions.cend(); i != e; ++i) { - i.value()->restart(); - } -} - -void restart(int32 dcMask) { - if (!_started) return; - - dcMask = bareDcId(dcMask); - for (Sessions::const_iterator i = sessions.cbegin(), e = sessions.cend(); i != e; ++i) { - if (bareDcId(i.value()->getDcWithShift()) == dcMask) { - i.value()->restart(); - } - } -} - -void configure(int32 dc) { - if (_started) return; - internal::setDC(dc); -} - -void setdc(int32 dc, bool fromZeroOnly) { - if (!dc || !_started) return; - internal::setDC(dc, fromZeroOnly); - int32 oldMainDc = mainSession->getDcWithShift(); - if (maindc() != oldMainDc) { - killSession(oldMainDc); - } - Local::writeMtpData(); -} - -int32 maindc() { - return internal::mainDC(); -} - -int32 dcstate(int32 dc) { - if (!_started) return 0; - - if (!dc) return mainSession->getState(); - if (!bareDcId(dc)) { - dc += bareDcId(mainSession->getDcWithShift()); - } - - Sessions::const_iterator i = sessions.constFind(dc); - if (i != sessions.cend()) return i.value()->getState(); - - return DisconnectedState; -} - -QString dctransport(int32 dc) { - if (!_started) return QString(); - - if (!dc) return mainSession->transport(); - if (!bareDcId(dc)) { - dc += bareDcId(mainSession->getDcWithShift()); - } - - Sessions::const_iterator i = sessions.constFind(dc); - if (i != sessions.cend()) return i.value()->transport(); - - return QString(); -} - -void ping() { - if (internal::Session *session = internal::getSession(0)) { - session->ping(); - } -} - -void cancel(mtpRequestId requestId) { - if (!_started || !requestId) return; - - mtpMsgId msgId = 0; - requestsDelays.remove(requestId); - { - QWriteLocker locker(&requestMapLock); - RequestMap::iterator i = requestMap.find(requestId); - if (i != requestMap.end()) { - msgId = *(mtpMsgId*)(i.value()->constData() + 4); - requestMap.erase(i); - } - } - { - QMutexLocker locker(&requestByDCLock); - RequestsByDC::iterator i = requestsByDC.find(requestId); - if (i != requestsByDC.end()) { - if (internal::Session *session = internal::getSession(qAbs(i.value()))) { - session->cancel(requestId, msgId); - } - requestsByDC.erase(i); - } - } - internal::clearCallbacks(requestId); -} - -void killSession(int32 dc) { - Sessions::iterator i = sessions.find(dc); - if (i != sessions.cend()) { - bool wasMain = (i.value() == mainSession); - - i.value()->kill(); - i.value()->deleteLater(); - sessions.erase(i); - - if (wasMain) { - mainSession = new internal::Session(internal::mainDC()); - int32 newdc = mainSession->getDcWithShift(); - i = sessions.find(newdc); - if (i != sessions.cend()) { - i.value()->kill(); - i.value()->deleteLater(); - sessions.erase(i); - } - sessions.insert(newdc, mainSession); - } - } -} - -void stopSession(int32 dc) { - Sessions::iterator i = sessions.find(dc); - if (i != sessions.end()) { - if (i.value() != mainSession) { // don't stop main session - i.value()->stop(); - } - } -} - -int32 state(mtpRequestId requestId) { - if (requestId > 0) { - QMutexLocker locker(&requestByDCLock); - RequestsByDC::iterator i = requestsByDC.find(requestId); - if (i != requestsByDC.end()) { - if (internal::Session *session = internal::getSession(qAbs(i.value()))) { - return session->requestState(requestId); - } - return MTP::RequestConnecting; - } - return MTP::RequestSent; - } - if (internal::Session *session = internal::getSession(-requestId)) { - return session->requestState(0); - } - return MTP::RequestConnecting; -} - -void finish() { - mainSession = nullptr; - for (auto session : base::take(sessions)) { - session->kill(); - delete session; - } - - for_const (auto connection, quittingConnections) { - connection->waitTillFinish(); - delete connection; - } - quittingConnections.clear(); - - delete _globalSlotCarrier; - _globalSlotCarrier = nullptr; - - internal::destroyConfigLoader(); - - _started = false; -} - -void logoutKeys(RPCDoneHandlerPtr onDone, RPCFailHandlerPtr onFail) { - mtpRequestId req = MTP::send(MTPauth_LogOut(), onDone, onFail); - internal::logoutOtherDCs(); -} - -void setGlobalDoneHandler(RPCDoneHandlerPtr handler) { - globalHandler.onDone = handler; -} - -void setGlobalFailHandler(RPCFailHandlerPtr handler) { - globalHandler.onFail = handler; -} - -void setStateChangedHandler(MTPStateChangedHandler handler) { - stateChangedHandler = handler; -} - -void setSessionResetHandler(MTPSessionResetHandler handler) { - sessionResetHandler = handler; -} - -void clearGlobalHandlers() { - setGlobalDoneHandler(RPCDoneHandlerPtr()); - setGlobalFailHandler(RPCFailHandlerPtr()); - setStateChangedHandler(0); - setSessionResetHandler(0); -} - -AuthKeysMap getKeys() { - return internal::getAuthKeys(); -} - -void setKey(int dc, const AuthKey::Data &key) { - auto dcId = MTP::bareDcId(dc); - auto keyPtr = std::make_shared(); - keyPtr->setDC(dcId); - keyPtr->setKey(key); - return internal::setAuthKey(dc, std::move(keyPtr)); +Instance *MainInstance() { + return Messenger::Instance().mtp(); } } // namespace MTP diff --git a/Telegram/SourceFiles/mtproto/facade.h b/Telegram/SourceFiles/mtproto/facade.h index ebe1cdeb2..1711ab336 100644 --- a/Telegram/SourceFiles/mtproto/facade.h +++ b/Telegram/SourceFiles/mtproto/facade.h @@ -23,58 +23,41 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org #include "mtproto/core_types.h" #include "mtproto/session.h" #include "core/single_timer.h" +#include "mtproto/mtp_instance.h" namespace MTP { namespace internal { -Session *getSession(ShiftedDcId shiftedDcId); // 0 - current set dc - bool paused(); void pause(); void unpause(); -void registerRequest(mtpRequestId requestId, int32 dc); -void unregisterRequest(mtpRequestId requestId); - -mtpRequestId storeRequest(mtpRequest &request, const RPCResponseHandler &parser); -mtpRequest getRequest(mtpRequestId req); -void wrapInvokeAfter(mtpRequest &to, const mtpRequest &from, const mtpRequestMap &haveSent, int32 skipBeforeRequest = 0); -void clearCallbacks(mtpRequestId requestId, int32 errorCode = RPCError::NoError); // 0 - do not toggle onError callback -void clearCallbacksDelayed(const RPCCallbackClears &requestIds); -void performDelayedClear(); -void execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end); -bool hasCallbacks(mtpRequestId requestId); -void globalCallback(const mtpPrime *from, const mtpPrime *end); -void onStateChange(int32 dcWithShift, int32 state); -void onSessionReset(int32 dcWithShift); -bool rpcErrorOccured(mtpRequestId requestId, const RPCFailHandlerPtr &onFail, const RPCError &err); // return true if need to clean request data -inline bool rpcErrorOccured(mtpRequestId requestId, const RPCResponseHandler &handler, const RPCError &err) { - return rpcErrorOccured(requestId, handler.onFail, err); -} - -// used for: -// - resending requests by timer which were postponed by flood delay -// - destroying MTProtoConnections whose thread has finished -class GlobalSlotCarrier : public QObject { - Q_OBJECT +} // namespace internal +class PauseHolder { public: - GlobalSlotCarrier(); - -public slots: - void checkDelayed(); - void connectionFinished(Connection *connection); + PauseHolder() { + restart(); + } + void restart() { + if (!base::take(_paused, true)) { + internal::pause(); + } + } + void release() { + if (base::take(_paused)) { + internal::unpause(); + } + } + ~PauseHolder() { + release(); + } private: - SingleTimer _timer; + bool _paused = false; }; -GlobalSlotCarrier *globalSlotCarrier(); -void queueQuittingConnection(Connection *connection); - -} // namespace internal - constexpr ShiftedDcId DCShift = 10000; constexpr DcId bareDcId(ShiftedDcId shiftedDcId) { return (shiftedDcId % DCShift); @@ -135,115 +118,105 @@ constexpr bool isUplDcId(ShiftedDcId shiftedDcId) { return (shiftedDcId >= internal::uploadDcId(0, 0)) && (shiftedDcId < internal::uploadDcId(0, MTPUploadSessionsCount - 1) + DCShift); } -void start(); -bool started(); -void restart(); -void restart(int32 dcMask); - -class PauseHolder { -public: - PauseHolder() { - restart(); - } - void restart() { - if (!base::take(_paused, true)) { - internal::pause(); - } - } - void release() { - if (base::take(_paused)) { - internal::unpause(); - } - } - ~PauseHolder() { - release(); - } - -private: - bool _paused = false; - -}; - -void configure(int32 dc); - -void setdc(int32 dc, bool fromZeroOnly = false); -int32 maindc(); - enum { DisconnectedState = 0, ConnectingState = 1, ConnectedState = 2, }; -int32 dcstate(int32 dc = 0); -QString dctransport(int32 dc = 0); - -template -inline mtpRequestId send(const TRequest &request, RPCResponseHandler callbacks = RPCResponseHandler(), int32 dc = 0, TimeMs msCanWait = 0, mtpRequestId after = 0) { - if (internal::Session *session = internal::getSession(dc)) { - return session->send(request, callbacks, msCanWait, true, !dc, after); - } - return 0; -} -template -inline mtpRequestId send(const TRequest &request, RPCDoneHandlerPtr onDone, RPCFailHandlerPtr onFail = RPCFailHandlerPtr(), int32 dc = 0, TimeMs msCanWait = 0, mtpRequestId after = 0) { - return send(request, RPCResponseHandler(onDone, onFail), dc, msCanWait, after); -} -inline void sendAnything(int32 dc = 0, TimeMs msCanWait = 0) { - if (auto session = internal::getSession(dc)) { - return session->sendAnything(msCanWait); - } -} -void ping(); -void cancel(mtpRequestId req); -void killSession(int32 dc); -void stopSession(int32 dc); enum { RequestSent = 0, RequestConnecting = 1, RequestSending = 2 }; -int32 state(mtpRequestId req); // < 0 means waiting for such count of ms -void finish(); +Instance *MainInstance(); -void logoutKeys(RPCDoneHandlerPtr onDone, RPCFailHandlerPtr onFail); +inline void restart() { + return MainInstance()->restart(); +} -void setGlobalDoneHandler(RPCDoneHandlerPtr handler); -void setGlobalFailHandler(RPCFailHandlerPtr handler); -void setStateChangedHandler(MTPStateChangedHandler handler); -void setSessionResetHandler(MTPSessionResetHandler handler); -void clearGlobalHandlers(); +inline void restart(ShiftedDcId shiftedDcId) { + return MainInstance()->restart(shiftedDcId); +} -AuthKeysMap getKeys(); -void setKey(int dc, const AuthKey::Data &key); +inline DcId maindc() { + return MainInstance()->mainDcId(); +} + +inline int32 dcstate(ShiftedDcId shiftedDcId = 0) { + if (auto instance = MainInstance()) { + return instance->dcstate(shiftedDcId); + } + return DisconnectedState; +} + +inline QString dctransport(ShiftedDcId shiftedDcId = 0) { + if (auto instance = MainInstance()) { + return instance->dctransport(shiftedDcId); + } + return QString(); +} + +template +inline mtpRequestId send(const TRequest &request, RPCResponseHandler callbacks = RPCResponseHandler(), ShiftedDcId dcId = 0, TimeMs msCanWait = 0, mtpRequestId after = 0) { + return MainInstance()->send(request, std::move(callbacks), dcId, msCanWait, after); +} + +template +inline mtpRequestId send(const TRequest &request, RPCDoneHandlerPtr onDone, RPCFailHandlerPtr onFail = RPCFailHandlerPtr(), ShiftedDcId dcId = 0, TimeMs msCanWait = 0, mtpRequestId after = 0) { + return MainInstance()->send(request, std::move(onDone), std::move(onFail), dcId, msCanWait, after); +} + +inline void sendAnything(ShiftedDcId shiftedDcId = 0, TimeMs msCanWait = 0) { + return MainInstance()->sendAnything(shiftedDcId, msCanWait); +} + +inline void cancel(mtpRequestId requestId) { + return MainInstance()->cancel(requestId); +} + +inline void ping() { + return MainInstance()->ping(); +} + +inline void killSession(ShiftedDcId shiftedDcId) { + return MainInstance()->killSession(shiftedDcId); +} + +inline void stopSession(ShiftedDcId shiftedDcId) { + return MainInstance()->stopSession(shiftedDcId); +} + +inline int32 state(mtpRequestId requestId) { // < 0 means waiting for such count of ms + return MainInstance()->state(requestId); +} namespace internal { - template - mtpRequestId Session::send(const TRequest &request, RPCResponseHandler callbacks, TimeMs msCanWait, bool needsLayer, bool toMainDC, mtpRequestId after) { - mtpRequestId requestId = 0; - try { - uint32 requestSize = request.innerLength() >> 2; - mtpRequest reqSerialized(mtpRequestData::prepare(requestSize)); - request.write(*reqSerialized); +template +mtpRequestId Session::send(const TRequest &request, RPCResponseHandler callbacks, TimeMs msCanWait, bool needsLayer, bool toMainDC, mtpRequestId after) { + mtpRequestId requestId = 0; + try { + uint32 requestSize = request.innerLength() >> 2; + mtpRequest reqSerialized(mtpRequestData::prepare(requestSize)); + request.write(*reqSerialized); - DEBUG_LOG(("MTP Info: adding request to toSendMap, msCanWait %1").arg(msCanWait)); + DEBUG_LOG(("MTP Info: adding request to toSendMap, msCanWait %1").arg(msCanWait)); - reqSerialized->msDate = getms(true); // > 0 - can send without container - reqSerialized->needsLayer = needsLayer; - if (after) reqSerialized->after = MTP::internal::getRequest(after); - requestId = MTP::internal::storeRequest(reqSerialized, callbacks); + reqSerialized->msDate = getms(true); // > 0 - can send without container + reqSerialized->needsLayer = needsLayer; + if (after) reqSerialized->after = getRequest(after); + requestId = storeRequest(reqSerialized, callbacks); - sendPrepared(reqSerialized, msCanWait); - } catch (Exception &e) { - requestId = 0; - MTP::internal::rpcErrorOccured(requestId, callbacks, rpcClientError("NO_REQUEST_ID", QString("send() failed to queue request, exception: %1").arg(e.what()))); - } - if (requestId) MTP::internal::registerRequest(requestId, toMainDC ? -getDcWithShift() : getDcWithShift()); - return requestId; + sendPrepared(reqSerialized, msCanWait); + } catch (Exception &e) { + requestId = 0; + rpcErrorOccured(requestId, callbacks.onFail, rpcClientError("NO_REQUEST_ID", QString("send() failed to queue request, exception: %1").arg(e.what()))); } + if (requestId) registerRequest(requestId, toMainDC ? -getDcWithShift() : getDcWithShift()); + return requestId; +} } // namespace internal - } // namespace MTP diff --git a/Telegram/SourceFiles/mtproto/mtp_instance.cpp b/Telegram/SourceFiles/mtproto/mtp_instance.cpp new file mode 100644 index 000000000..43b3f9c12 --- /dev/null +++ b/Telegram/SourceFiles/mtproto/mtp_instance.cpp @@ -0,0 +1,1237 @@ +/* +This file is part of Telegram Desktop, +the official desktop version of Telegram messaging app, see https://telegram.org + +Telegram Desktop is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +It is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +In addition, as a special exception, the copyright holders give permission +to link the code of portions of this program with the OpenSSL library. + +Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE +Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org +*/ +#include "stdafx.h" +#include "mtproto/mtp_instance.h" + +#include "mtproto/dc_options.h" +#include "localstorage.h" +#include "auth_session.h" +#include "messenger.h" +#include "mtproto/connection.h" + +namespace MTP { + +class Instance::Private { +public: + Private(Instance *instance, DcOptions *options); + + void start(Config &&config); + + void suggestMainDcId(DcId mainDcId); + void setMainDcId(DcId mainDcId); + DcId mainDcId() const; + + void setKeyForWrite(DcId dcId, const AuthKeyPtr &key); + AuthKeysMap getKeysForWrite() const; + + DcOptions *dcOptions(); + + void configLoadRequest(); + + void restart(); + void restart(ShiftedDcId shiftedDcId); + int32 dcstate(ShiftedDcId shiftedDcId = 0); + QString dctransport(ShiftedDcId shiftedDcId = 0); + void ping(); + void cancel(mtpRequestId requestId); + int32 state(mtpRequestId requestId); // < 0 means waiting for such count of ms + void killSession(ShiftedDcId shiftedDcId); + void stopSession(ShiftedDcId shiftedDcId); + void logout(RPCDoneHandlerPtr onDone, RPCFailHandlerPtr onFail); + + internal::DcenterPtr getDcById(DcId dcId); + void unpaused(); + + void queueQuittingConnection(std::unique_ptr connection); + void connectionFinished(internal::Connection *connection); + + void registerRequest(mtpRequestId requestId, int32 dcWithShift); + void unregisterRequest(mtpRequestId requestId); + mtpRequestId storeRequest(mtpRequest &request, const RPCResponseHandler &parser); + mtpRequest getRequest(mtpRequestId requestId); + void clearCallbacks(mtpRequestId requestId, int32 errorCode = RPCError::NoError); // 0 - do not toggle onError callback + void clearCallbacksDelayed(const RPCCallbackClears &requestIds); + void performDelayedClear(); + void execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end); + bool hasCallbacks(mtpRequestId requestId); + void globalCallback(const mtpPrime *from, const mtpPrime *end); + + void onStateChange(ShiftedDcId dcWithShift, int32 state); + void onSessionReset(ShiftedDcId dcWithShift); + + // return true if need to clean request data + bool rpcErrorOccured(mtpRequestId requestId, const RPCFailHandlerPtr &onFail, const RPCError &err); + inline bool rpcErrorOccured(mtpRequestId requestId, const RPCResponseHandler &handler, const RPCError &err) { + return rpcErrorOccured(requestId, handler.onFail, err); + } + + void setUpdatesHandler(RPCDoneHandlerPtr onDone); + void setGlobalFailHandler(RPCFailHandlerPtr onFail); + void setStateChangedHandler(base::lambda &&handler); + void setSessionResetHandler(base::lambda &&handler); + void clearGlobalHandlers(); + + internal::Session *getSession(ShiftedDcId shiftedDcId); + + ~Private(); + +private: + bool hasAuthorization(); + void importDone(const MTPauth_Authorization &result, mtpRequestId requestId); + bool importFail(const RPCError &error, mtpRequestId requestId); + void exportDone(const MTPauth_ExportedAuthorization &result, mtpRequestId requestId); + bool exportFail(const RPCError &error, mtpRequestId requestId); + bool onErrorDefault(mtpRequestId requestId, const RPCError &error); + + bool logoutGuestDone(mtpRequestId requestId); + + void configLoadDone(const MTPConfig &result); + bool configLoadFail(const RPCError &error); + + void checkDelayedRequests(); + + Instance *_instance = nullptr; + + DcOptions *_dcOptions = nullptr; + + DcId _mainDcId = Config::kDefaultMainDc; + bool _mainDcIdForced = false; + internal::DcenterMap _dcenters; + + internal::Session *_mainSession = nullptr; + std::map> _sessions; + + base::set_of_unique_ptr _quittingConnections; + + std::unique_ptr _configLoader; + + std::map _keysForWrite; + mutable QReadWriteLock _keysForWriteLock; + + std::map _logoutGuestRequestIds; + + // holds dcWithShift for request to this dc or -dc for request to main dc + std::map _requestsByDc; + QMutex _requestByDcLock; + + // holds target dcWithShift for auth export request + std::map _authExportRequests; + + std::map _parserMap; + QMutex _parserMapLock; + + std::map _requestMap; + QReadWriteLock _requestMapLock; + + std::deque> _delayedRequests; + + std::map _requestsDelays; + + std::set _badGuestDcRequests; + + std::map> _authWaiters; + + QMutex _toClearLock; + RPCCallbackClears _toClear; + + RPCResponseHandler _globalHandler; + base::lambda _stateChangedHandler; + base::lambda _sessionResetHandler; + + SingleTimer _checkDelayedTimer; + +}; + +Instance::Private::Private(Instance *instance, DcOptions *options) : _instance(instance) +, _dcOptions(options) { +} + +void Instance::Private::start(Config &&config) { + unixtimeInit(); + + for (auto &keyData : config.keys) { + auto dcId = keyData.first; + auto key = std::make_shared(); + key->setDC(dcId); + key->setKey(keyData.second); + + _keysForWrite[dcId] = key; + + auto dc = std::make_shared(_instance, dcId, std::move(key)); + _dcenters.emplace(dcId, std::move(dc)); + } + + if (config.mainDcId != Config::kNotSetMainDc) { + _mainDcId = config.mainDcId; + _mainDcIdForced = true; + } + if (_mainDcId != Config::kNoneMainDc) { + auto main = std::make_unique(_instance, _mainDcId); + _mainSession = main.get(); + auto newMainDcId = main->getDcWithShift(); + _sessions.emplace(newMainDcId, std::move(main)); + } + + _checkDelayedTimer.setTimeoutHandler([this] { + checkDelayedRequests(); + }); + + configLoadRequest(); +} + +void Instance::Private::suggestMainDcId(DcId mainDcId) { + if (_mainDcIdForced) return; + setMainDcId(mainDcId); +} + +void Instance::Private::setMainDcId(DcId mainDcId) { + if (!_mainSession) { + LOG(("MTP Error: attempting to change mainDcId in an MTP instance without main session.")); + return; + } + + _mainDcIdForced = true; + auto oldMainDcId = _mainSession->getDcWithShift(); + _mainDcId = mainDcId; + if (oldMainDcId != _mainDcId) { + killSession(oldMainDcId); + } + Local::writeMtpData(); +} + +DcId Instance::Private::mainDcId() const { + t_assert(_mainDcId != Config::kNoneMainDc); + return _mainDcId; +} + +void Instance::Private::configLoadRequest() { + if (_configLoader) { + return; + } + _configLoader = std::make_unique(_instance, rpcDone([this](const MTPConfig &result) { + configLoadDone(result); + }), rpcFail([this](const RPCError &error) { + return configLoadFail(error); + })); + _configLoader->load(); +} + +void Instance::Private::restart() { + for (auto &session : _sessions) { + session.second->restart(); + } +} + +void Instance::Private::restart(ShiftedDcId shiftedDcId) { + auto dcId = bareDcId(shiftedDcId); + for (auto &session : _sessions) { + if (bareDcId(session.second->getDcWithShift()) == dcId) { + session.second->restart(); + } + } +} + +int32 Instance::Private::dcstate(ShiftedDcId shiftedDcId) { + if (!shiftedDcId) { + t_assert(_mainSession != nullptr); + return _mainSession->getState(); + } + + if (!bareDcId(shiftedDcId)) { + t_assert(_mainSession != nullptr); + shiftedDcId += bareDcId(_mainSession->getDcWithShift()); + } + + auto it = _sessions.find(shiftedDcId); + if (it != _sessions.cend()) return it->second->getState(); + + return DisconnectedState; +} + +QString Instance::Private::dctransport(ShiftedDcId shiftedDcId) { + if (!shiftedDcId) { + t_assert(_mainSession != nullptr); + return _mainSession->transport(); + } + if (!bareDcId(shiftedDcId)) { + t_assert(_mainSession != nullptr); + shiftedDcId += bareDcId(_mainSession->getDcWithShift()); + } + + auto it = _sessions.find(shiftedDcId); + if (it != _sessions.cend()) { + return it->second->transport(); + } + + return QString(); +} + +void Instance::Private::ping() { + if (auto session = getSession(0)) { + session->ping(); + } +} + +void Instance::Private::cancel(mtpRequestId requestId) { + if (!requestId) return; + + mtpMsgId msgId = 0; + _requestsDelays.erase(requestId); + { + QWriteLocker locker(&_requestMapLock); + auto it = _requestMap.find(requestId); + if (it != _requestMap.end()) { + msgId = *(mtpMsgId*)(it->second->constData() + 4); + _requestMap.erase(it); + } + } + { + QMutexLocker locker(&_requestByDcLock); + auto it = _requestsByDc.find(requestId); + if (it != _requestsByDc.end()) { + if (auto session = getSession(qAbs(it->second))) { + session->cancel(requestId, msgId); + } + _requestsByDc.erase(it); + } + } + clearCallbacks(requestId); +} + +int32 Instance::Private::state(mtpRequestId requestId) { // < 0 means waiting for such count of ms + if (requestId > 0) { + QMutexLocker locker(&_requestByDcLock); + auto i = _requestsByDc.find(requestId); + if (i != _requestsByDc.end()) { + if (auto session = getSession(qAbs(i->second))) { + return session->requestState(requestId); + } + return MTP::RequestConnecting; + } + return MTP::RequestSent; + } + if (auto session = getSession(-requestId)) { + return session->requestState(0); + } + return MTP::RequestConnecting; +} + +void Instance::Private::killSession(ShiftedDcId shiftedDcId) { + auto it = _sessions.find(shiftedDcId); + if (it != _sessions.cend()) { + bool wasMain = (it->second.get() == _mainSession); + + it->second->kill(); + it->second.release()->deleteLater(); + _sessions.erase(it); + + if (wasMain) { + auto main = std::make_unique(_instance, _mainDcId); + _mainSession = main.get(); + auto newMainDcId = main->getDcWithShift(); + it = _sessions.find(newMainDcId); + if (it != _sessions.cend()) { + it->second->kill(); + it->second.release()->deleteLater(); + _sessions.erase(it); + } + _sessions.insert(std::make_pair(newMainDcId, std::move(main))); + } + } +} + +void Instance::Private::stopSession(ShiftedDcId shiftedDcId) { + auto it = _sessions.find(shiftedDcId); + if (it != _sessions.end()) { + if (it->second.get() != _mainSession) { // don't stop main session + it->second->stop(); + } + } +} + +void Instance::Private::logout(RPCDoneHandlerPtr onDone, RPCFailHandlerPtr onFail) { + _instance->send(MTPauth_LogOut(), onDone, onFail); + + auto dcIds = std::vector(); + { + QReadLocker lock(&_keysForWriteLock); + dcIds.reserve(_keysForWrite.size()); + for (auto &key : _keysForWrite) { + dcIds.push_back(key.first); + } + } + for (auto dcId : dcIds) { + if (dcId != mainDcId()) { + auto shiftedDcId = MTP::lgtDcId(dcId); + auto requestId = _instance->send(MTPauth_LogOut(), rpcDone([this](mtpRequestId requestId) { + logoutGuestDone(requestId); + }), rpcFail([this](mtpRequestId requestId) { + return logoutGuestDone(requestId); + }), shiftedDcId); + _logoutGuestRequestIds.insert(std::make_pair(shiftedDcId, requestId)); + } + } +} + +bool Instance::Private::logoutGuestDone(mtpRequestId requestId) { + for (auto i = _logoutGuestRequestIds.begin(), e = _logoutGuestRequestIds.end(); i != e; ++i) { + if (i->second == requestId) { + killSession(i->first); + _logoutGuestRequestIds.erase(i); + return true; + } + } + return false; +} + +internal::DcenterPtr Instance::Private::getDcById(DcId dcId) { + auto it = _dcenters.find(dcId); + if (it == _dcenters.cend()) { + auto result = std::make_shared(_instance, dcId, AuthKeyPtr()); + auto insert = std::make_pair(dcId, std::move(result)); + it = _dcenters.insert(std::move(insert)).first; + } + return it->second; +} + +void Instance::Private::setKeyForWrite(DcId dcId, const AuthKeyPtr &key) { + QWriteLocker lock(&_keysForWriteLock); + if (key) { + _keysForWrite[dcId] = key; + } else { + _keysForWrite.erase(dcId); + } +} + +AuthKeysMap Instance::Private::getKeysForWrite() const { + auto result = AuthKeysMap(); + QReadLocker lock(&_keysForWriteLock); + for (auto &key : _keysForWrite) { + result.push_back(key.second); + } + return result; +} + +DcOptions *Instance::Private::dcOptions() { + return _dcOptions; +} + +void Instance::Private::unpaused() { + for (auto &session : _sessions) { + session.second->unpaused(); + } +} + +void Instance::Private::queueQuittingConnection(std::unique_ptr connection) { + _quittingConnections.insert(std::move(connection)); +} + +void Instance::Private::connectionFinished(internal::Connection *connection) { + auto it = _quittingConnections.find(connection); + if (it != _quittingConnections.end()) { + _quittingConnections.erase(it); + } +} + +void Instance::Private::configLoadDone(const MTPConfig &result) { + _configLoader.reset(); + + if (result.type() != mtpc_config) { + LOG(("MTP Error: wrong config constructor: %1").arg(result.type())); + return; + } + auto &data = result.c_config(); + + DEBUG_LOG(("MTP Info: got config, chat_size_max: %1, date: %2, test_mode: %3, this_dc: %4, dc_options.length: %5").arg(data.vchat_size_max.v).arg(data.vdate.v).arg(mtpIsTrue(data.vtest_mode)).arg(data.vthis_dc.v).arg(data.vdc_options.c_vector().v.size())); + + if (data.vdc_options.c_vector().v.empty()) { + LOG(("MTP Error: config with empty dc_options received!")); + } else { + _dcOptions->setFromList(data.vdc_options); + } + + Global::SetChatSizeMax(data.vchat_size_max.v); + Global::SetMegagroupSizeMax(data.vmegagroup_size_max.v); + Global::SetForwardedCountMax(data.vforwarded_count_max.v); + Global::SetOnlineUpdatePeriod(data.vonline_update_period_ms.v); + Global::SetOfflineBlurTimeout(data.voffline_blur_timeout_ms.v); + Global::SetOfflineIdleTimeout(data.voffline_idle_timeout_ms.v); + Global::SetOnlineCloudTimeout(data.vonline_cloud_timeout_ms.v); + Global::SetNotifyCloudDelay(data.vnotify_cloud_delay_ms.v); + Global::SetNotifyDefaultDelay(data.vnotify_default_delay_ms.v); + Global::SetChatBigSize(data.vchat_big_size.v); // ? + Global::SetPushChatPeriod(data.vpush_chat_period_ms.v); // ? + Global::SetPushChatLimit(data.vpush_chat_limit.v); // ? + Global::SetSavedGifsLimit(data.vsaved_gifs_limit.v); + Global::SetEditTimeLimit(data.vedit_time_limit.v); // ? + Global::SetStickersRecentLimit(data.vstickers_recent_limit.v); + Global::SetPinnedDialogsCountMax(data.vpinned_dialogs_count_max.v); + + Local::writeSettings(); + + emit _instance->configLoaded(); +} + +bool Instance::Private::configLoadFail(const RPCError &error) { + if (isDefaultHandledError(error)) return false; + + // loadingConfig = false; + LOG(("MTP Error: failed to get config!")); + return false; +} + +void Instance::Private::checkDelayedRequests() { + auto now = getms(true); + while (!_delayedRequests.empty() && now >= _delayedRequests.front().second) { + auto requestId = _delayedRequests.front().first; + _delayedRequests.pop_front(); + + auto dcWithShift = ShiftedDcId(0); + { + QMutexLocker locker(&_requestByDcLock); + auto it = _requestsByDc.find(requestId); + if (it != _requestsByDc.cend()) { + dcWithShift = it->second; + } else { + LOG(("MTP Error: could not find request dc for delayed resend, requestId %1").arg(requestId)); + continue; + } + } + + auto request = mtpRequest(); + { + QReadLocker locker(&_requestMapLock); + auto it = _requestMap.find(requestId); + if (it == _requestMap.cend()) { + DEBUG_LOG(("MTP Error: could not find request %1").arg(requestId)); + continue; + } + request = it->second; + } + if (auto session = getSession(qAbs(dcWithShift))) { + session->sendPrepared(request); + } + } + + if (!_delayedRequests.empty()) { + _checkDelayedTimer.start(_delayedRequests.front().second - now); + } +} + +void Instance::Private::registerRequest(mtpRequestId requestId, int32 dcWithShift) { + { + QMutexLocker locker(&_requestByDcLock); + _requestsByDc.emplace(requestId, dcWithShift); + } + performDelayedClear(); // need to do it somewhere... +} + +void Instance::Private::unregisterRequest(mtpRequestId requestId) { + _requestsDelays.erase(requestId); + + { + QWriteLocker locker(&_requestMapLock); + _requestMap.erase(requestId); + } + + QMutexLocker locker(&_requestByDcLock); + _requestsByDc.erase(requestId); +} + +mtpRequestId Instance::Private::storeRequest(mtpRequest &request, const RPCResponseHandler &parser) { + mtpRequestId res = reqid(); + request->requestId = res; + if (parser.onDone || parser.onFail) { + QMutexLocker locker(&_parserMapLock); + _parserMap.emplace(res, parser); + } + { + QWriteLocker locker(&_requestMapLock); + _requestMap.emplace(res, request); + } + return res; +} + +mtpRequest Instance::Private::getRequest(mtpRequestId requestId) { + auto result = mtpRequest(); + { + QReadLocker locker(&_requestMapLock); + auto it = _requestMap.find(requestId); + if (it != _requestMap.cend()) { + result = it->second; + } + } + return result; +} + + +void Instance::Private::clearCallbacks(mtpRequestId requestId, int32 errorCode) { + RPCResponseHandler h; + bool found = false; + { + QMutexLocker locker(&_parserMapLock); + auto it = _parserMap.find(requestId); + if (it != _parserMap.end()) { + h = it->second; + found = true; + + _parserMap.erase(it); + } + } + if (errorCode && found) { + rpcErrorOccured(requestId, h, internal::rpcClientError("CLEAR_CALLBACK", QString("did not handle request %1, error code %2").arg(requestId).arg(errorCode))); + } +} + +void Instance::Private::clearCallbacksDelayed(const RPCCallbackClears &requestIds) { + uint32 idsCount = requestIds.size(); + if (!idsCount) return; + + if (cDebug()) { + QString idsStr = QString("%1").arg(requestIds[0].requestId); + for (uint32 i = 1; i < idsCount; ++i) { + idsStr += QString(", %1").arg(requestIds[i].requestId); + } + DEBUG_LOG(("RPC Info: clear callbacks delayed, msgIds: %1").arg(idsStr)); + } + + QMutexLocker lock(&_toClearLock); + uint32 toClearNow = _toClear.size(); + if (toClearNow) { + _toClear.resize(toClearNow + idsCount); + memcpy(_toClear.data() + toClearNow, requestIds.constData(), idsCount * sizeof(RPCCallbackClear)); + } else { + _toClear = requestIds; + } +} + +void Instance::Private::performDelayedClear() { + QMutexLocker lock(&_toClearLock); + if (!_toClear.isEmpty()) { + for (auto &clearRequest : _toClear) { + if (cDebug()) { + QMutexLocker locker(&_parserMapLock); + if (_parserMap.find(clearRequest.requestId) != _parserMap.end()) { + DEBUG_LOG(("RPC Info: clearing delayed callback %1, error code %2").arg(clearRequest.requestId).arg(clearRequest.errorCode)); + } + } + clearCallbacks(clearRequest.requestId, clearRequest.errorCode); + unregisterRequest(clearRequest.requestId); + } + _toClear.clear(); + } +} + +void Instance::Private::execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end) { + RPCResponseHandler h; + { + QMutexLocker locker(&_parserMapLock); + auto it = _parserMap.find(requestId); + if (it != _parserMap.cend()) { + h = it->second; + _parserMap.erase(it); + + DEBUG_LOG(("RPC Info: found parser for request %1, trying to parse response...").arg(requestId)); + } + } + if (h.onDone || h.onFail) { + try { + if (from >= end) throw mtpErrorInsufficient(); + + if (*from == mtpc_rpc_error) { + RPCError err(MTPRpcError(from, end)); + DEBUG_LOG(("RPC Info: error received, code %1, type %2, description: %3").arg(err.code()).arg(err.type()).arg(err.description())); + if (!rpcErrorOccured(requestId, h, err)) { + QMutexLocker locker(&_parserMapLock); + _parserMap.emplace(requestId, h); + return; + } + } else { + if (h.onDone) { + (*h.onDone)(requestId, from, end); + } + } + } catch (Exception &e) { + if (!rpcErrorOccured(requestId, h, internal::rpcClientError("RESPONSE_PARSE_FAILED", QString("exception text: ") + e.what()))) { + QMutexLocker locker(&_parserMapLock); + _parserMap.emplace(requestId, h); + return; + } + } + } else { + DEBUG_LOG(("RPC Info: parser not found for %1").arg(requestId)); + } + unregisterRequest(requestId); +} + +bool Instance::Private::hasCallbacks(mtpRequestId requestId) { + QMutexLocker locker(&_parserMapLock); + auto it = _parserMap.find(requestId); + return (it != _parserMap.cend()); +} + +void Instance::Private::globalCallback(const mtpPrime *from, const mtpPrime *end) { + if (_globalHandler.onDone) { + (*_globalHandler.onDone)(0, from, end); // some updates were received + } +} + +void Instance::Private::onStateChange(int32 dcWithShift, int32 state) { + if (_stateChangedHandler) { + _stateChangedHandler(dcWithShift, state); + } +} + +void Instance::Private::onSessionReset(int32 dcWithShift) { + if (_sessionResetHandler) { + _sessionResetHandler(dcWithShift); + } +} + +bool Instance::Private::rpcErrorOccured(mtpRequestId requestId, const RPCFailHandlerPtr &onFail, const RPCError &err) { // return true if need to clean request data + if (isDefaultHandledError(err)) { + if (onFail && (*onFail)(requestId, err)) return true; + } + + if (onErrorDefault(requestId, err)) { + return false; + } + LOG(("RPC Error: request %1 got fail with code %2, error %3%4").arg(requestId).arg(err.code()).arg(err.type()).arg(err.description().isEmpty() ? QString() : QString(": %1").arg(err.description()))); + onFail && (*onFail)(requestId, err); + return true; +} + +bool Instance::Private::hasAuthorization() { + return (AuthSession::Current() != nullptr); +} + +void Instance::Private::importDone(const MTPauth_Authorization &result, mtpRequestId requestId) { + QMutexLocker locker1(&_requestByDcLock); + + auto it = _requestsByDc.find(requestId); + if (it == _requestsByDc.end()) { + LOG(("MTP Error: auth import request not found in requestsByDC, requestId: %1").arg(requestId)); + RPCError error(internal::rpcClientError("AUTH_IMPORT_FAIL", QString("did not find import request in requestsByDC, request %1").arg(requestId))); + if (_globalHandler.onFail && hasAuthorization()) { + (*_globalHandler.onFail)(requestId, error); // auth failed in main dc + } + return; + } + auto newdc = bareDcId(it->second); + + DEBUG_LOG(("MTP Info: auth import to dc %1 succeeded").arg(newdc)); + + auto &waiters = _authWaiters[newdc]; + if (waiters.size()) { + QReadLocker locker(&_requestMapLock); + for (auto waitedRequestId : waiters) { + auto it = _requestMap.find(waitedRequestId); + if (it == _requestMap.cend()) { + LOG(("MTP Error: could not find request %1 for resending").arg(waitedRequestId)); + continue; + } + auto dcWithShift = ShiftedDcId(newdc); + { + auto k = _requestsByDc.find(waitedRequestId); + if (k == _requestsByDc.cend()) { + LOG(("MTP Error: could not find request %1 by dc for resending").arg(waitedRequestId)); + continue; + } + if (k->second < 0) { + _instance->setMainDcId(newdc); + k->second = -newdc; + } else { + dcWithShift = shiftDcId(newdc, getDcIdShift(k->second)); + k->second = dcWithShift; + } + DEBUG_LOG(("MTP Info: resending request %1 to dc %2 after import auth").arg(waitedRequestId).arg(k->second)); + } + if (auto session = getSession(dcWithShift)) { + session->sendPrepared(it->second); + } + } + waiters.clear(); + } +} + +bool Instance::Private::importFail(const RPCError &error, mtpRequestId requestId) { + if (isDefaultHandledError(error)) return false; + + if (_globalHandler.onFail && hasAuthorization()) { + (*_globalHandler.onFail)(requestId, error); // auth import failed + } + return true; +} + +void Instance::Private::exportDone(const MTPauth_ExportedAuthorization &result, mtpRequestId requestId) { + auto it = _authExportRequests.find(requestId); + if (it == _authExportRequests.cend()) { + LOG(("MTP Error: auth export request target dcWithShift not found, requestId: %1").arg(requestId)); + RPCError error(internal::rpcClientError("AUTH_IMPORT_FAIL", QString("did not find target dcWithShift, request %1").arg(requestId))); + if (_globalHandler.onFail && hasAuthorization()) { + (*_globalHandler.onFail)(requestId, error); // auth failed in main dc + } + return; + } + + auto &data = result.c_auth_exportedAuthorization(); + _instance->send(MTPauth_ImportAuthorization(data.vid, data.vbytes), rpcDone([this](const MTPauth_Authorization &result, mtpRequestId requestId) { + importDone(result, requestId); + }), rpcFail([this](const RPCError &error, mtpRequestId requestId) { + return importFail(error, requestId); + }), it->second); + _authExportRequests.erase(requestId); +} + +bool Instance::Private::exportFail(const RPCError &error, mtpRequestId requestId) { + if (isDefaultHandledError(error)) return false; + + auto it = _authExportRequests.find(requestId); + if (it != _authExportRequests.cend()) { + _authWaiters[bareDcId(it->second)].clear(); + } + if (_globalHandler.onFail && hasAuthorization()) { + (*_globalHandler.onFail)(requestId, error); // auth failed in main dc + } + return true; +} + +bool Instance::Private::onErrorDefault(mtpRequestId requestId, const RPCError &error) { + auto &err(error.type()); + auto code = error.code(); + if (!isFloodError(error) && err != qstr("AUTH_KEY_UNREGISTERED")) { + int breakpoint = 0; + } + auto badGuestDc = (code == 400) && (err == qsl("FILE_ID_INVALID")); + QRegularExpressionMatch m; + if ((m = QRegularExpression("^(FILE|PHONE|NETWORK|USER)_MIGRATE_(\\d+)$").match(err)).hasMatch()) { + if (!requestId) return false; + + ShiftedDcId dcWithShift = 0, newdcWithShift = m.captured(2).toInt(); + { + QMutexLocker locker(&_requestByDcLock); + auto it = _requestsByDc.find(requestId); + if (it == _requestsByDc.end()) { + LOG(("MTP Error: could not find request %1 for migrating to %2").arg(requestId).arg(newdcWithShift)); + } else { + dcWithShift = it->second; + } + } + if (!dcWithShift || !newdcWithShift) return false; + + DEBUG_LOG(("MTP Info: changing request %1 from dcWithShift%2 to dc%3").arg(requestId).arg(dcWithShift).arg(newdcWithShift)); + if (dcWithShift < 0) { // newdc shift = 0 + if (false && hasAuthorization() && _authExportRequests.find(requestId) == _authExportRequests.cend()) { + // + // migrate not supported at this moment + // this was not tested even once + // + //DEBUG_LOG(("MTP Info: importing auth to dc %1").arg(newdcWithShift)); + //auto &waiters(_authWaiters[newdcWithShift]); + //if (waiters.empty()) { + // auto exportRequestId = _instance->send(MTPauth_ExportAuthorization(MTP_int(newdcWithShift)), rpcDone([this](const MTPauth_ExportedAuthorization &result, mtpRequestId requestId) { + // exportDone(result, requestId); + // }), rpcFail([this](const RPCError &error, mtpRequestId requestId) { + // return exportFail(error, requestId); + // })); + // _authExportRequests.emplace(exportRequestId, newdcWithShift); + //} + //waiters.push_back(requestId); + //return true; + } else { + _instance->setMainDcId(newdcWithShift); + } + } else { + newdcWithShift = shiftDcId(newdcWithShift, getDcIdShift(dcWithShift)); + } + + auto request = mtpRequest(); + { + QReadLocker locker(&_requestMapLock); + auto it = _requestMap.find(requestId); + if (it == _requestMap.cend()) { + LOG(("MTP Error: could not find request %1").arg(requestId)); + return false; + } + request = it->second; + } + if (auto session = getSession(newdcWithShift)) { + registerRequest(requestId, (dcWithShift < 0) ? -newdcWithShift : newdcWithShift); + session->sendPrepared(request); + } + return true; + } else if (code < 0 || code >= 500 || (m = QRegularExpression("^FLOOD_WAIT_(\\d+)$").match(err)).hasMatch()) { + if (!requestId) return false; + + int32 secs = 1; + if (code < 0 || code >= 500) { + auto it = _requestsDelays.find(requestId); + if (it != _requestsDelays.cend()) { + secs = (it->second > 60) ? it->second : (it->second *= 2); + } else { + _requestsDelays.emplace(requestId, secs); + } + } else { + secs = m.captured(1).toInt(); +// if (secs >= 60) return false; + } + auto sendAt = getms(true) + secs * 1000 + 10; + auto it = _delayedRequests.begin(), e = _delayedRequests.end(); + for (; it != e; ++it) { + if (it->first == requestId) return true; + if (it->second > sendAt) break; + } + _delayedRequests.insert(it, std::make_pair(requestId, sendAt)); + + checkDelayedRequests(); + + return true; + } else if (code == 401 || (badGuestDc && _badGuestDcRequests.find(requestId) == _badGuestDcRequests.cend())) { + auto dcWithShift = ShiftedDcId(0); + { + QMutexLocker locker(&_requestByDcLock); + auto it = _requestsByDc.find(requestId); + if (it != _requestsByDc.end()) { + dcWithShift = it->second; + } else { + LOG(("MTP Error: unauthorized request without dc info, requestId %1").arg(requestId)); + } + } + auto newdc = bareDcId(qAbs(dcWithShift)); + if (!newdc || newdc == mainDcId() || !hasAuthorization()) { + if (!badGuestDc && _globalHandler.onFail) { + (*_globalHandler.onFail)(requestId, error); // auth failed in main dc + } + return false; + } + + DEBUG_LOG(("MTP Info: importing auth to dcWithShift %1").arg(dcWithShift)); + auto &waiters(_authWaiters[newdc]); + if (!waiters.size()) { + auto exportRequestId = _instance->send(MTPauth_ExportAuthorization(MTP_int(newdc)), rpcDone([this](const MTPauth_ExportedAuthorization &result, mtpRequestId requestId) { + exportDone(result, requestId); + }), rpcFail([this](const RPCError &error, mtpRequestId requestId) { + return exportFail(error, requestId); + })); + _authExportRequests.emplace(exportRequestId, abs(dcWithShift)); + } + waiters.push_back(requestId); + if (badGuestDc) _badGuestDcRequests.insert(requestId); + return true; + } else if (err == qstr("CONNECTION_NOT_INITED") || err == qstr("CONNECTION_LAYER_INVALID")) { + mtpRequest request; + { + QReadLocker locker(&_requestMapLock); + auto it = _requestMap.find(requestId); + if (it == _requestMap.cend()) { + LOG(("MTP Error: could not find request %1").arg(requestId)); + return false; + } + request = it->second; + } + auto dcWithShift = ShiftedDcId(0); + { + QMutexLocker locker(&_requestByDcLock); + auto it = _requestsByDc.find(requestId); + if (it == _requestsByDc.end()) { + LOG(("MTP Error: could not find request %1 for resending with init connection").arg(requestId)); + } else { + dcWithShift = it->second; + } + } + if (!dcWithShift) return false; + + if (auto session = getSession(qAbs(dcWithShift))) { + request->needsLayer = true; + session->sendPrepared(request); + } + return true; + } else if (err == qstr("MSG_WAIT_FAILED")) { + mtpRequest request; + { + QReadLocker locker(&_requestMapLock); + auto it = _requestMap.find(requestId); + if (it == _requestMap.cend()) { + LOG(("MTP Error: could not find request %1").arg(requestId)); + return false; + } + request = it->second; + } + if (!request->after) { + LOG(("MTP Error: wait failed for not dependent request %1").arg(requestId)); + return false; + } + auto dcWithShift = ShiftedDcId(0); + { + QMutexLocker locker(&_requestByDcLock); + auto it = _requestsByDc.find(requestId); + auto afterIt = _requestsByDc.find(request->after->requestId); + if (it == _requestsByDc.end()) { + LOG(("MTP Error: could not find request %1 by dc").arg(requestId)); + } else if (afterIt == _requestsByDc.end()) { + LOG(("MTP Error: could not find dependent request %1 by dc").arg(request->after->requestId)); + } else { + dcWithShift = it->second; + if (it->second != afterIt->second) { + request->after = mtpRequest(); + } + } + } + if (!dcWithShift) return false; + + if (!request->after) { + if (auto session = getSession(qAbs(dcWithShift))) { + request->needsLayer = true; + session->sendPrepared(request); + } + } else { + auto newdc = bareDcId(qAbs(dcWithShift)); + auto &waiters(_authWaiters[newdc]); + if (base::contains(waiters, request->after->requestId)) { + if (!base::contains(waiters, requestId)) { + waiters.push_back(requestId); + } + if (_badGuestDcRequests.find(request->after->requestId) != _badGuestDcRequests.cend()) { + if (_badGuestDcRequests.find(requestId) == _badGuestDcRequests.cend()) { + _badGuestDcRequests.insert(requestId); + } + } + } else { + auto i = _delayedRequests.begin(), e = _delayedRequests.end(); + for (; i != e; ++i) { + if (i->first == requestId) return true; + if (i->first == request->after->requestId) break; + } + if (i != e) { + _delayedRequests.insert(i, std::make_pair(requestId, i->second)); + } + + checkDelayedRequests(); + } + } + return true; + } + if (badGuestDc) _badGuestDcRequests.erase(requestId); + return false; +} + +internal::Session *Instance::Private::getSession(ShiftedDcId shiftedDcId) { + if (!shiftedDcId) { + t_assert(_mainSession != nullptr); + return _mainSession; + } + if (!bareDcId(shiftedDcId)) { + t_assert(_mainSession != nullptr); + shiftedDcId += bareDcId(_mainSession->getDcWithShift()); + } + + auto it = _sessions.find(shiftedDcId); + if (it == _sessions.cend()) { + it = _sessions.emplace(shiftedDcId, std::make_unique(_instance, shiftedDcId)).first; + } + return it->second.get(); +} + +void Instance::Private::setUpdatesHandler(RPCDoneHandlerPtr onDone) { + _globalHandler.onDone = onDone; +} + +void Instance::Private::setGlobalFailHandler(RPCFailHandlerPtr onFail) { + _globalHandler.onFail = onFail; +} + +void Instance::Private::setStateChangedHandler(base::lambda &&handler) { + _stateChangedHandler = std::move(handler); +} + +void Instance::Private::setSessionResetHandler(base::lambda &&handler) { + _sessionResetHandler = std::move(handler); +} + +void Instance::Private::clearGlobalHandlers() { + setUpdatesHandler(RPCDoneHandlerPtr()); + setGlobalFailHandler(RPCFailHandlerPtr()); + setStateChangedHandler(base::lambda()); + setSessionResetHandler(base::lambda()); +} + +Instance::Private::~Private() { + for (auto &session : base::take(_sessions)) { + session.second->kill(); + } +} + +Instance::Instance(DcOptions *options, Config &&config) : QObject() +, _private(std::make_unique(this, options)) { + _private->start(std::move(config)); +} + +void Instance::suggestMainDcId(DcId mainDcId) { + _private->suggestMainDcId(mainDcId); +} + +void Instance::setMainDcId(DcId mainDcId) { + _private->setMainDcId(mainDcId); +} + +DcId Instance::mainDcId() const { + return _private->mainDcId(); +} + +void Instance::configLoadRequest() { + _private->configLoadRequest(); +} + +void Instance::connectionFinished(internal::Connection *connection) { + _private->connectionFinished(connection); +} + +void Instance::restart() { + _private->restart(); +} + +void Instance::restart(ShiftedDcId shiftedDcId) { + _private->restart(shiftedDcId); +} + +int32 Instance::dcstate(ShiftedDcId shiftedDcId) { + return _private->dcstate(shiftedDcId); +} + +QString Instance::dctransport(ShiftedDcId shiftedDcId) { + return _private->dctransport(shiftedDcId); +} + +void Instance::ping() { + _private->ping(); +} + +void Instance::cancel(mtpRequestId requestId) { + _private->cancel(requestId); +} + +int32 Instance::state(mtpRequestId requestId) { // < 0 means waiting for such count of ms + return _private->state(requestId); +} + +void Instance::killSession(ShiftedDcId shiftedDcId) { + _private->killSession(shiftedDcId); +} + +void Instance::stopSession(ShiftedDcId shiftedDcId) { + _private->stopSession(shiftedDcId); +} + +void Instance::logout(RPCDoneHandlerPtr onDone, RPCFailHandlerPtr onFail) { + _private->logout(onDone, onFail); +} + +internal::DcenterPtr Instance::getDcById(DcId dcId) { + return _private->getDcById(dcId); +} + +void Instance::setKeyForWrite(DcId dcId, const AuthKeyPtr &key) { + _private->setKeyForWrite(dcId, key); +} + +AuthKeysMap Instance::getKeysForWrite() const { + return _private->getKeysForWrite(); +} + +DcOptions *Instance::dcOptions() { + return _private->dcOptions(); +} + +void Instance::unpaused() { + _private->unpaused(); +} + +void Instance::queueQuittingConnection(std::unique_ptr connection) { + _private->queueQuittingConnection(std::move(connection)); +} + +void Instance::setUpdatesHandler(RPCDoneHandlerPtr onDone) { + _private->setUpdatesHandler(onDone); +} + +void Instance::setGlobalFailHandler(RPCFailHandlerPtr onFail) { + _private->setGlobalFailHandler(onFail); +} + +void Instance::setStateChangedHandler(base::lambda &&handler) { + _private->setStateChangedHandler(std::move(handler)); +} + +void Instance::setSessionResetHandler(base::lambda &&handler) { + _private->setSessionResetHandler(std::move(handler)); +} + +void Instance::clearGlobalHandlers() { + _private->clearGlobalHandlers(); +} + +void Instance::onStateChange(ShiftedDcId dcWithShift, int32 state) { + _private->onStateChange(dcWithShift, state); +} + +void Instance::onSessionReset(ShiftedDcId dcWithShift) { + _private->onSessionReset(dcWithShift); +} + +void Instance::registerRequest(mtpRequestId requestId, ShiftedDcId dcWithShift) { + _private->registerRequest(requestId, dcWithShift); +} + +mtpRequestId Instance::storeRequest(mtpRequest &request, const RPCResponseHandler &parser) { + return _private->storeRequest(request, parser); +} + +mtpRequest Instance::getRequest(mtpRequestId requestId) { + return _private->getRequest(requestId); +} + +void Instance::clearCallbacksDelayed(const RPCCallbackClears &requestIds) { + _private->clearCallbacksDelayed(requestIds); +} + +void Instance::execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end) { + _private->execCallback(requestId, from, end); +} + +bool Instance::hasCallbacks(mtpRequestId requestId) { + return _private->hasCallbacks(requestId); +} + +void Instance::globalCallback(const mtpPrime *from, const mtpPrime *end) { + _private->globalCallback(from, end); +} + +bool Instance::rpcErrorOccured(mtpRequestId requestId, const RPCFailHandlerPtr &onFail, const RPCError &err) { + return _private->rpcErrorOccured(requestId, onFail, err); +} + +internal::Session *Instance::getSession(ShiftedDcId shiftedDcId) { + return _private->getSession(shiftedDcId); +} + +Instance::~Instance() = default; + +} // namespace MTP diff --git a/Telegram/SourceFiles/mtproto/mtp_instance.h b/Telegram/SourceFiles/mtproto/mtp_instance.h new file mode 100644 index 000000000..82e5a05fc --- /dev/null +++ b/Telegram/SourceFiles/mtproto/mtp_instance.h @@ -0,0 +1,131 @@ +/* +This file is part of Telegram Desktop, +the official desktop version of Telegram messaging app, see https://telegram.org + +Telegram Desktop is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +It is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +In addition, as a special exception, the copyright holders give permission +to link the code of portions of this program with the OpenSSL library. + +Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE +Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org +*/ +#pragma once + +#include "mtproto/dcenter.h" +#include +#include + +namespace MTP { + +class DcOptions; +class Session; + +class Instance : public QObject { + Q_OBJECT + +public: + struct Config { + static constexpr auto kNoneMainDc = -1; + static constexpr auto kNotSetMainDc = 0; + static constexpr auto kDefaultMainDc = 2; + + DcId mainDcId = kNotSetMainDc; + std::map keys; + }; + Instance(DcOptions *options, Config &&config); + + Instance(const Instance &other) = delete; + Instance &operator=(const Instance &other) = delete; + + void suggestMainDcId(DcId mainDcId); + void setMainDcId(DcId mainDcId); + DcId mainDcId() const; + + void Instance::setKeyForWrite(DcId dcId, const AuthKeyPtr &key); + AuthKeysMap Instance::getKeysForWrite() const; + + DcOptions *dcOptions(); + + template + mtpRequestId send(const TRequest &request, RPCResponseHandler callbacks = RPCResponseHandler(), ShiftedDcId dcId = 0, TimeMs msCanWait = 0, mtpRequestId after = 0) { + if (auto session = getSession(dcId)) { + return session->send(request, callbacks, msCanWait, true, !dcId, after); + } + return 0; + } + + template + mtpRequestId send(const TRequest &request, RPCDoneHandlerPtr onDone, RPCFailHandlerPtr onFail = RPCFailHandlerPtr(), int32 dc = 0, TimeMs msCanWait = 0, mtpRequestId after = 0) { + return send(request, RPCResponseHandler(onDone, onFail), dc, msCanWait, after); + } + + void sendAnything(ShiftedDcId dcId = 0, TimeMs msCanWait = 0) { + if (auto session = getSession(dcId)) { + session->sendAnything(msCanWait); + } + } + + void restart(); + void restart(ShiftedDcId shiftedDcId); + int32 dcstate(ShiftedDcId shiftedDcId = 0); + QString dctransport(ShiftedDcId shiftedDcId = 0); + void ping(); + void cancel(mtpRequestId requestId); + int32 state(mtpRequestId requestId); // < 0 means waiting for such count of ms + void killSession(ShiftedDcId shiftedDcId); + void stopSession(ShiftedDcId shiftedDcId); + void logout(RPCDoneHandlerPtr onDone, RPCFailHandlerPtr onFail); + + internal::DcenterPtr getDcById(DcId dcId); + void unpaused(); + + void queueQuittingConnection(std::unique_ptr connection); + + void setUpdatesHandler(RPCDoneHandlerPtr onDone); + void setGlobalFailHandler(RPCFailHandlerPtr onFail); + void setStateChangedHandler(base::lambda &&handler); + void setSessionResetHandler(base::lambda &&handler); + void clearGlobalHandlers(); + + void onStateChange(ShiftedDcId dcWithShift, int32 state); + void onSessionReset(ShiftedDcId dcWithShift); + + void registerRequest(mtpRequestId requestId, ShiftedDcId dcWithShift); + mtpRequestId storeRequest(mtpRequest &request, const RPCResponseHandler &parser); + mtpRequest getRequest(mtpRequestId requestId); + void clearCallbacksDelayed(const RPCCallbackClears &requestIds); + + void execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end); + bool hasCallbacks(mtpRequestId requestId); + void globalCallback(const mtpPrime *from, const mtpPrime *end); + + // return true if need to clean request data + bool rpcErrorOccured(mtpRequestId requestId, const RPCFailHandlerPtr &onFail, const RPCError &err); + + ~Instance(); + +public slots: + void configLoadRequest(); + void connectionFinished(internal::Connection *connection); + +signals: + void configLoaded(); + +private: + internal::Session *getSession(ShiftedDcId shiftedDcId); + + class Private; + const std::unique_ptr _private; + +}; + +} // namespace MTP diff --git a/Telegram/SourceFiles/mtproto/session.cpp b/Telegram/SourceFiles/mtproto/session.cpp index 70bc96afa..dbe149877 100644 --- a/Telegram/SourceFiles/mtproto/session.cpp +++ b/Telegram/SourceFiles/mtproto/session.cpp @@ -19,13 +19,14 @@ Full license: https://github.com/telegramdesktop/tdesktop/blob/master/LICENSE Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org */ #include "stdafx.h" - #include "mtproto/session.h" +#include "mtproto/connection.h" + namespace MTP { namespace internal { -void SessionData::clear() { +void SessionData::clear(Instance *instance) { RPCCallbackClears clearCallbacks; { QReadLocker locker1(haveSentMutex()), locker2(toResendMutex()), locker3(haveReceivedMutex()), locker4(wereAckedMutex()); @@ -66,42 +67,23 @@ void SessionData::clear() { QWriteLocker locker(receivedIdsMutex()); receivedIds.clear(); } - clearCallbacksDelayed(clearCallbacks); + instance->clearCallbacksDelayed(clearCallbacks); } -Session::Session(int32 requestedDcId) : QObject() -, _connection(0) -, _killed(false) -, _needToReceive(false) -, data(this) -, dcWithShift(0) -, dc(0) -, msSendCall(0) -, msWait(0) -, _ping(false) { - if (_killed) { - DEBUG_LOG(("Session Error: can't start a killed session")); - return; - } - if (dcWithShift) { - DEBUG_LOG(("Session Info: Session::start called on already started session")); - return; - } - - msSendCall = msWait = 0; - +Session::Session(Instance *instance, ShiftedDcId requestedShiftedDcId) : QObject() +, _instance(instance) +, data(this) { connect(&timeouter, SIGNAL(timeout()), this, SLOT(checkRequestsByTimer())); timeouter.start(1000); connect(&sender, SIGNAL(timeout()), this, SLOT(needToResumeAndSend())); - _connection = new Connection(); - dcWithShift = _connection->prepare(&data, requestedDcId); + _connection = std::make_unique(_instance); + dcWithShift = _connection->prepare(&data, requestedShiftedDcId); if (!dcWithShift) { - delete _connection; - _connection = 0; - DEBUG_LOG(("Session Info: could not start connection to dc %1").arg(requestedDcId)); + _connection.reset(); + DEBUG_LOG(("Session Info: could not start connection to dc %1").arg(requestedShiftedDcId)); return; } createDcData(); @@ -112,24 +94,33 @@ void Session::createDcData() { if (dc) { return; } - int32 dcId = bareDcId(dcWithShift); + auto dcId = bareDcId(dcWithShift); - auto &dcs = DCMap(); - auto dcIndex = dcs.constFind(dcId); - if (dcIndex == dcs.cend()) { - dc = DcenterPtr(new Dcenter(dcId, AuthKeyPtr())); - dcs.insert(dcId, dc); - } else { - dc = dcIndex.value(); - } + dc = _instance->getDcById(dcId); ReadLockerAttempt lock(keyMutex()); data.setKey(lock ? dc->getKey() : AuthKeyPtr()); if (lock && dc->connectionInited()) { data.setLayerWasInited(true); } - connect(dc.data(), SIGNAL(authKeyCreated()), this, SLOT(authKeyCreatedForDC()), Qt::QueuedConnection); - connect(dc.data(), SIGNAL(layerWasInited(bool)), this, SLOT(layerWasInitedForDC(bool)), Qt::QueuedConnection); + connect(dc.get(), SIGNAL(authKeyCreated()), this, SLOT(authKeyCreatedForDC()), Qt::QueuedConnection); + connect(dc.get(), SIGNAL(layerWasInited(bool)), this, SLOT(layerWasInitedForDC(bool)), Qt::QueuedConnection); +} + +void Session::registerRequest(mtpRequestId requestId, ShiftedDcId dcWithShift) { + return _instance->registerRequest(requestId, dcWithShift); +} + +mtpRequestId Session::storeRequest(mtpRequest &request, const RPCResponseHandler &parser) { + return _instance->storeRequest(request, parser); +} + +mtpRequest Session::getRequest(mtpRequestId requestId) { + return _instance->getRequest(requestId); +} + +bool Session::rpcErrorOccured(mtpRequestId requestId, const RPCFailHandlerPtr &onFail, const RPCError &err) { // return true if need to clean request data + return _instance->rpcErrorOccured(requestId, onFail, err); } void Session::restart() { @@ -148,7 +139,7 @@ void Session::stop() { DEBUG_LOG(("Session Info: stopping session dcWithShift %1").arg(dcWithShift)); if (_connection) { _connection->kill(); - _connection = 0; + _instance->queueQuittingConnection(std::move(_connection)); } } @@ -202,12 +193,9 @@ void Session::needToResumeAndSend() { } if (!_connection) { DEBUG_LOG(("Session Info: resuming session dcWithShift %1").arg(dcWithShift)); - DcenterMap &dcs(DCMap()); - - _connection = new Connection(); + _connection = std::make_unique(_instance); if (!_connection->prepare(&data, dcWithShift)) { - delete _connection; - _connection = 0; + _connection.reset(); DEBUG_LOG(("Session Info: could not start connection to dcWithShift %1").arg(dcWithShift)); dcWithShift = 0; return; @@ -298,16 +286,16 @@ void Session::checkRequestsByTimer() { } } } - clearCallbacksDelayed(clearCallbacks); + _instance->clearCallbacksDelayed(clearCallbacks); } } void Session::onConnectionStateChange(qint32 newState) { - onStateChange(dcWithShift, newState); + _instance->onStateChange(dcWithShift, newState); } void Session::onResetDone() { - onSessionReset(dcWithShift); + _instance->onSessionReset(dcWithShift); } void Session::cancel(mtpRequestId requestId, mtpMsgId msgId) { @@ -473,9 +461,9 @@ void Session::authKeyCreatedForDC() { emit authKeyCreated(); } -void Session::notifyKeyCreated(const AuthKeyPtr &key) { +void Session::notifyKeyCreated(AuthKeyPtr &&key) { DEBUG_LOG(("AuthKey Info: Session::keyCreated(), setting, dcWithShift %1").arg(dcWithShift)); - dc->setKey(key); + dc->setKey(std::move(key)); } void Session::layerWasInitedForDC(bool wasInited) { @@ -530,17 +518,17 @@ void Session::tryToReceive() { } if (requestId <= 0) { if (dcWithShift == bareDcId(dcWithShift)) { // call globalCallback only in main session - globalCallback(response.constData(), response.constData() + response.size()); + _instance->globalCallback(response.constData(), response.constData() + response.size()); } } else { - execCallback(requestId, response.constData(), response.constData() + response.size()); + _instance->execCallback(requestId, response.constData(), response.constData() + response.size()); } ++cnt; } } Session::~Session() { - t_assert(_connection == 0); + t_assert(_connection == nullptr); } MTPrpcError rpcClientError(const QString &type, const QString &description) { diff --git a/Telegram/SourceFiles/mtproto/session.h b/Telegram/SourceFiles/mtproto/session.h index 02ddb7eaa..9136bd65d 100644 --- a/Telegram/SourceFiles/mtproto/session.h +++ b/Telegram/SourceFiles/mtproto/session.h @@ -20,14 +20,17 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org */ #pragma once -#include "mtproto/connection.h" #include "mtproto/dcenter.h" -#include "mtproto/rpc_sender.h" #include "core/single_timer.h" namespace MTP { + +class Instance; + namespace internal { +class Connection; + class ReceivedMsgIds { public: bool registerMsgId(mtpMsgId msgId, bool needAck) { @@ -236,7 +239,7 @@ public: return result * 2 + (needAck ? 1 : 0); } - void clear(); + void clear(Instance *instance); private: uint64 _session = 0; @@ -275,7 +278,7 @@ class Session : public QObject { Q_OBJECT public: - Session(int32 dcenter); + Session(Instance *instance, ShiftedDcId requestedShiftedDcId); void restart(); void stop(); @@ -283,10 +286,10 @@ public: void unpaused(); - int32 getDcWithShift() const; + ShiftedDcId getDcWithShift() const; QReadWriteLock *keyMutex() const; - void notifyKeyCreated(const AuthKeyPtr &key); + void notifyKeyCreated(AuthKeyPtr &&key); void destroyKey(); void notifyLayerInited(bool wasInited); @@ -331,19 +334,26 @@ public slots: private: void createDcData(); - Connection *_connection; + void registerRequest(mtpRequestId requestId, ShiftedDcId dcWithShift); + mtpRequestId storeRequest(mtpRequest &request, const RPCResponseHandler &parser); + mtpRequest getRequest(mtpRequestId requestId); + bool rpcErrorOccured(mtpRequestId requestId, const RPCFailHandlerPtr &onFail, const RPCError &err); - bool _killed; - bool _needToReceive; + Instance *_instance; + std::unique_ptr _connection; + + bool _killed = false; + bool _needToReceive = false; SessionData data; - int32 dcWithShift; + ShiftedDcId dcWithShift = 0; DcenterPtr dc; - TimeMs msSendCall, msWait; + TimeMs msSendCall = 0; + TimeMs msWait = 0; - bool _ping; + bool _ping = false; QTimer timeouter; SingleTimer sender; diff --git a/Telegram/SourceFiles/passcodewidget.cpp b/Telegram/SourceFiles/passcodewidget.cpp index 932c3cd2b..f9983fd78 100644 --- a/Telegram/SourceFiles/passcodewidget.cpp +++ b/Telegram/SourceFiles/passcodewidget.cpp @@ -72,7 +72,7 @@ void PasscodeWidget::onSubmit() { if (Local::readMap(_passcode->text().toUtf8()) != Local::ReadMapPassNeeded) { cSetPasscodeBadTries(0); - MTP::start(); + Messenger::Instance().startMtp(); if (AuthSession::Current()) { App::wnd()->setupMain(); } else { diff --git a/Telegram/SourceFiles/structs.h b/Telegram/SourceFiles/structs.h index 12a74af5f..bbee9c0ca 100644 --- a/Telegram/SourceFiles/structs.h +++ b/Telegram/SourceFiles/structs.h @@ -186,9 +186,9 @@ static const NotifySettingsPtr EmptyNotifySettings = NotifySettingsPtr(1); extern NotifySettings globalNotifyAll, globalNotifyUsers, globalNotifyChats; extern NotifySettingsPtr globalNotifyAllPtr, globalNotifyUsersPtr, globalNotifyChatsPtr; -inline bool isNotifyMuted(NotifySettingsPtr settings, TimeId *changeIn = 0) { +inline bool isNotifyMuted(NotifySettingsPtr settings, TimeId *changeIn = nullptr) { if (settings != UnknownNotifySettings && settings != EmptyNotifySettings) { - TimeId t = unixtime(); + auto t = unixtime(); if (settings->mute > t) { if (changeIn) *changeIn = settings->mute - t + 1; return true; diff --git a/Telegram/gyp/Telegram.gyp b/Telegram/gyp/Telegram.gyp index 2da26f0bc..53aa6bf99 100644 --- a/Telegram/gyp/Telegram.gyp +++ b/Telegram/gyp/Telegram.gyp @@ -323,8 +323,6 @@ '<(src_loc)/media/media_clip_qtgif.h', '<(src_loc)/media/media_clip_reader.cpp', '<(src_loc)/media/media_clip_reader.h', - '<(src_loc)/mtproto/facade.cpp', - '<(src_loc)/mtproto/facade.h', '<(src_loc)/mtproto/auth_key.cpp', '<(src_loc)/mtproto/auth_key.h', '<(src_loc)/mtproto/connection.cpp', @@ -343,8 +341,12 @@ '<(src_loc)/mtproto/dcenter.h', '<(src_loc)/mtproto/dc_options.cpp', '<(src_loc)/mtproto/dc_options.h', + '<(src_loc)/mtproto/facade.cpp', + '<(src_loc)/mtproto/facade.h', '<(src_loc)/mtproto/file_download.cpp', '<(src_loc)/mtproto/file_download.h', + '<(src_loc)/mtproto/mtp_instance.cpp', + '<(src_loc)/mtproto/mtp_instance.h', '<(src_loc)/mtproto/rsa_public_key.cpp', '<(src_loc)/mtproto/rsa_public_key.h', '<(src_loc)/mtproto/rpc_sender.cpp',