From 06c724df016af312a484f6d6fb075c9431bcae33 Mon Sep 17 00:00:00 2001 From: John Preston <johnprestonmail@gmail.com> Date: Tue, 2 Jan 2018 16:44:12 +0300 Subject: [PATCH] Clear callbacks async in MTP::Instance. Also fix previous build. --- Telegram/SourceFiles/history/history_item.cpp | 1 + Telegram/SourceFiles/mtproto/connection.cpp | 60 ++++++++---- Telegram/SourceFiles/mtproto/mtp_instance.cpp | 98 ++++++++++--------- Telegram/SourceFiles/mtproto/mtp_instance.h | 2 +- Telegram/SourceFiles/mtproto/rpc_sender.h | 6 +- Telegram/SourceFiles/mtproto/session.cpp | 12 +-- 6 files changed, 103 insertions(+), 76 deletions(-) diff --git a/Telegram/SourceFiles/history/history_item.cpp b/Telegram/SourceFiles/history/history_item.cpp index ede83f1f0..925451b6a 100644 --- a/Telegram/SourceFiles/history/history_item.cpp +++ b/Telegram/SourceFiles/history/history_item.cpp @@ -41,6 +41,7 @@ Copyright (c) 2014-2017 John Preston, https://desktop.telegram.org #include "messenger.h" #include "mainwindow.h" #include "window/window_controller.h" +#include "core/crash_reports.h" namespace { diff --git a/Telegram/SourceFiles/mtproto/connection.cpp b/Telegram/SourceFiles/mtproto/connection.cpp index 27e440b31..e61337379 100644 --- a/Telegram/SourceFiles/mtproto/connection.cpp +++ b/Telegram/SourceFiles/mtproto/connection.cpp @@ -1580,10 +1580,17 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr _pingId = 0; } int32 errorCode = data.verror_code.v; - if (errorCode == 16 || errorCode == 17 || errorCode == 32 || errorCode == 33 || errorCode == 64) { // can handle - bool needResend = (errorCode == 16 || errorCode == 17); // bad msg_id + if (false + || errorCode == 16 + || errorCode == 17 + || errorCode == 32 + || errorCode == 33 + || errorCode == 64) { // can handle + const auto needResend = false + || (errorCode == 16) // bad msg_id + || (errorCode == 17) // bad msg_id + || (errorCode == 64); // bad container if (errorCode == 64) { // bad container! - needResend = true; if (cDebug()) { mtpRequest request; { @@ -1600,7 +1607,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr if (request) { if (mtpRequestData::isSentContainer(request)) { QStringList lst; - const mtpMsgId *ids = (const mtpMsgId *)(request->constData() + 8); + const auto ids = (const mtpMsgId *)(request->constData() + 8); for (uint32 i = 0, l = (request->size() - 8) >> 1; i < l; ++i) { lst.push_back(QString::number(ids[i])); } @@ -1613,11 +1620,14 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr } if (!wasSent(resendId)) { - DEBUG_LOG(("Message Error: such message was not sent recently %1").arg(resendId)); - return (badTime ? HandleResult::Ignored : HandleResult::Success); + DEBUG_LOG(("Message Error: " + "such message was not sent recently %1").arg(resendId)); + return badTime + ? HandleResult::Ignored + : HandleResult::Success; } - if (needResend) { // bad msg_id + if (needResend) { // bad msg_id or bad container if (serverSalt) sessionData->setSalt(serverSalt); unixtimeSet(serverTime, true); @@ -1634,17 +1644,25 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr return HandleResult::ResetSession; } } else { // fatal (except 48, but it must not get here) - mtpMsgId resendId = data.vbad_msg_id.v; - mtpRequestId requestId = wasSent(resendId); + const auto badMsgId = mtpMsgId(data.vbad_msg_id.v); + const auto requestId = wasSent(resendId); if (requestId) { - LOG(("Message Error: bad message notification received, msgId %1, error_code %2, fatal: clearing callbacks").arg(data.vbad_msg_id.v).arg(errorCode)); - _instance->clearCallbacksDelayed(RPCCallbackClears( - 1, - RPCCallbackClear(requestId, -errorCode))); + LOG(("Message Error: " + "bad message notification received, " + "msgId %1, error_code %2, fatal: clearing callbacks" + ).arg(badMsgId + ).arg(errorCode + )); + _instance->clearCallbacksDelayed({ 1, RPCCallbackClear( + requestId, + -errorCode) }); } else { - DEBUG_LOG(("Message Error: such message was not sent recently %1").arg(resendId)); + DEBUG_LOG(("Message Error: " + "such message was not sent recently %1").arg(badMsgId)); } - return (badTime ? HandleResult::Ignored : HandleResult::Success); + return badTime + ? HandleResult::Ignored + : HandleResult::Success; } } return HandleResult::Success; @@ -2101,7 +2119,7 @@ void ConnectionPrivate::requestsAcked(const QVector<MTPlong> &ids, bool byRespon DEBUG_LOG(("Message Info: requests acked, ids %1").arg(LogIdsVector(ids))); - RPCCallbackClears clearedAcked; + auto clearedBecauseTooOld = std::vector<RPCCallbackClear>(); QVector<MTPlong> toAckMore; { QWriteLocker locker1(sessionData->wereAckedMutex()); @@ -2177,10 +2195,10 @@ void ConnectionPrivate::requestsAcked(const QVector<MTPlong> &ids, bool byRespon uint32 ackedCount = wereAcked.size(); if (ackedCount > MTPIdsBufferSize) { DEBUG_LOG(("Message Info: removing some old acked sent msgIds %1").arg(ackedCount - MTPIdsBufferSize)); - clearedAcked.reserve(ackedCount - MTPIdsBufferSize); + clearedBecauseTooOld.reserve(ackedCount - MTPIdsBufferSize); while (ackedCount-- > MTPIdsBufferSize) { - mtpRequestIdsMap::iterator i(wereAcked.begin()); - clearedAcked.push_back(RPCCallbackClear( + auto i = wereAcked.begin(); + clearedBecauseTooOld.push_back(RPCCallbackClear( i.key(), RPCError::TimeoutError)); wereAcked.erase(i); @@ -2188,8 +2206,8 @@ void ConnectionPrivate::requestsAcked(const QVector<MTPlong> &ids, bool byRespon } } - if (clearedAcked.size()) { - _instance->clearCallbacksDelayed(clearedAcked); + if (!clearedBecauseTooOld.empty()) { + _instance->clearCallbacksDelayed(std::move(clearedBecauseTooOld)); } if (toAckMore.size()) { diff --git a/Telegram/SourceFiles/mtproto/mtp_instance.cpp b/Telegram/SourceFiles/mtproto/mtp_instance.cpp index 4b4bf88f0..51d5fda1f 100644 --- a/Telegram/SourceFiles/mtproto/mtp_instance.cpp +++ b/Telegram/SourceFiles/mtproto/mtp_instance.cpp @@ -76,15 +76,13 @@ public: std::unique_ptr<internal::Connection> &&connection); void connectionFinished(internal::Connection *connection); - void registerRequest(mtpRequestId requestId, int32 dcWithShift); + void registerRequest(mtpRequestId requestId, ShiftedDcId dcWithShift); void unregisterRequest(mtpRequestId requestId); mtpRequestId storeRequest( mtpRequest &request, RPCResponseHandler &&callbacks); mtpRequest getRequest(mtpRequestId requestId); - void clearCallbacks(mtpRequestId requestId, int32 errorCode = RPCError::NoError); // 0 - do not toggle onError callback - void clearCallbacksDelayed(const RPCCallbackClears &requestIds); - void performDelayedClear(); + void clearCallbacksDelayed(std::vector<RPCCallbackClear> &&ids); void execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end); bool hasCallbacks(mtpRequestId requestId); void globalCallback(const mtpPrime *from, const mtpPrime *end); @@ -140,6 +138,12 @@ private: void cdnConfigLoadDone(const MTPCdnConfig &result); bool cdnConfigLoadFail(const RPCError &error); + // RPCError::NoError means do not toggle onError callback. + void clearCallbacks( + mtpRequestId requestId, + int32 errorCode = RPCError::NoError); + void clearCallbacks(const std::vector<RPCCallbackClear> &ids); + void checkDelayedRequests(); not_null<Instance*> _instance; @@ -185,9 +189,6 @@ private: std::map<DcId, std::vector<mtpRequestId>> _authWaiters; - QMutex _toClearLock; - RPCCallbackClears _toClear; - RPCResponseHandler _globalHandler; base::lambda<void(ShiftedDcId shiftedDcId, int32 state)> _stateChangedHandler; base::lambda<void(ShiftedDcId shiftedDcId)> _sessionResetHandler; @@ -681,12 +682,11 @@ void Instance::Private::checkDelayedRequests() { } } -void Instance::Private::registerRequest(mtpRequestId requestId, int32 dcWithShift) { - { - QMutexLocker locker(&_requestByDcLock); - _requestsByDc.emplace(requestId, dcWithShift); - } - performDelayedClear(); // need to do it somewhere... +void Instance::Private::registerRequest( + mtpRequestId requestId, + ShiftedDcId dcWithShift) { + QMutexLocker locker(&_requestByDcLock); + _requestsByDc.emplace(requestId, dcWithShift); } void Instance::Private::unregisterRequest(mtpRequestId requestId) { @@ -748,46 +748,50 @@ void Instance::Private::clearCallbacks(mtpRequestId requestId, int32 errorCode) } } -void Instance::Private::clearCallbacksDelayed(const RPCCallbackClears &requestIds) { - uint32 idsCount = requestIds.size(); - if (!idsCount) return; +void Instance::Private::clearCallbacksDelayed( + std::vector<RPCCallbackClear> &&ids) { + if (ids.empty()) { + return; + } if (cDebug()) { - QString idsStr = QString("%1").arg(requestIds[0].requestId); - for (uint32 i = 1; i < idsCount; ++i) { - idsStr += QString(", %1").arg(requestIds[i].requestId); + auto idsString = QStringList(); + idsString.reserve(ids.size()); + for (auto &value : ids) { + idsString.push_back(QString::number(value.requestId)); } - DEBUG_LOG(("RPC Info: clear callbacks delayed, msgIds: %1").arg(idsStr)); + DEBUG_LOG(("RPC Info: clear callbacks delayed, msgIds: %1" + ).arg(idsString.join(", "))); } - QMutexLocker lock(&_toClearLock); - uint32 toClearNow = _toClear.size(); - if (toClearNow) { - _toClear.resize(toClearNow + idsCount); - memcpy(_toClear.data() + toClearNow, requestIds.constData(), idsCount * sizeof(RPCCallbackClear)); - } else { - _toClear = requestIds; - } + crl::on_main(_instance, [this, list = std::move(ids)] { + clearCallbacks(list); + }); } -void Instance::Private::performDelayedClear() { - QMutexLocker lock(&_toClearLock); - if (!_toClear.isEmpty()) { - for (auto &clearRequest : _toClear) { - if (cDebug()) { - QMutexLocker locker(&_parserMapLock); - if (_parserMap.find(clearRequest.requestId) != _parserMap.end()) { - DEBUG_LOG(("RPC Info: clearing delayed callback %1, error code %2").arg(clearRequest.requestId).arg(clearRequest.errorCode)); - } +void Instance::Private::clearCallbacks( + const std::vector<RPCCallbackClear> &ids) { + Expects(!ids.empty()); + + for (const auto &clearRequest : ids) { + if (cDebug()) { + QMutexLocker locker(&_parserMapLock); + if (_parserMap.find(clearRequest.requestId) != _parserMap.end()) { + DEBUG_LOG(("RPC Info: " + "clearing delayed callback %1, error code %2" + ).arg(clearRequest.requestId + ).arg(clearRequest.errorCode)); } - clearCallbacks(clearRequest.requestId, clearRequest.errorCode); - unregisterRequest(clearRequest.requestId); } - _toClear.clear(); + clearCallbacks(clearRequest.requestId, clearRequest.errorCode); + unregisterRequest(clearRequest.requestId); } } -void Instance::Private::execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end) { +void Instance::Private::execCallback( + mtpRequestId requestId, + const mtpPrime *from, + const mtpPrime *end) { RPCResponseHandler h; { QMutexLocker locker(&_parserMapLock); @@ -1023,7 +1027,9 @@ bool Instance::Private::onErrorDefault(mtpRequestId requestId, const RPCError &e request = it->second; } if (auto session = getSession(newdcWithShift)) { - registerRequest(requestId, (dcWithShift < 0) ? -newdcWithShift : newdcWithShift); + registerRequest( + requestId, + (dcWithShift < 0) ? -newdcWithShift : newdcWithShift); session->sendPrepared(request); } return true; @@ -1424,7 +1430,9 @@ void Instance::onSessionReset(ShiftedDcId dcWithShift) { _private->onSessionReset(dcWithShift); } -void Instance::registerRequest(mtpRequestId requestId, ShiftedDcId dcWithShift) { +void Instance::registerRequest( + mtpRequestId requestId, + ShiftedDcId dcWithShift) { _private->registerRequest(requestId, dcWithShift); } @@ -1438,8 +1446,8 @@ mtpRequest Instance::getRequest(mtpRequestId requestId) { return _private->getRequest(requestId); } -void Instance::clearCallbacksDelayed(const RPCCallbackClears &requestIds) { - _private->clearCallbacksDelayed(requestIds); +void Instance::clearCallbacksDelayed(std::vector<RPCCallbackClear> &&ids) { + _private->clearCallbacksDelayed(std::move(ids)); } void Instance::execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end) { diff --git a/Telegram/SourceFiles/mtproto/mtp_instance.h b/Telegram/SourceFiles/mtproto/mtp_instance.h index 076eff88b..420565a78 100644 --- a/Telegram/SourceFiles/mtproto/mtp_instance.h +++ b/Telegram/SourceFiles/mtproto/mtp_instance.h @@ -135,7 +135,7 @@ public: mtpRequest &request, RPCResponseHandler &&callbacks); mtpRequest getRequest(mtpRequestId requestId); - void clearCallbacksDelayed(const RPCCallbackClears &requestIds); + void clearCallbacksDelayed(std::vector<RPCCallbackClear> &&ids); void execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end); bool hasCallbacks(mtpRequestId requestId); diff --git a/Telegram/SourceFiles/mtproto/rpc_sender.h b/Telegram/SourceFiles/mtproto/rpc_sender.h index 5f3f8550b..2988869cd 100644 --- a/Telegram/SourceFiles/mtproto/rpc_sender.h +++ b/Telegram/SourceFiles/mtproto/rpc_sender.h @@ -270,7 +270,9 @@ private: }; struct RPCCallbackClear { - RPCCallbackClear(mtpRequestId id = 0, int32 code = RPCError::NoError) : requestId(id), errorCode(code) { + RPCCallbackClear(mtpRequestId id , int32 code = RPCError::NoError) + : requestId(id) + , errorCode(code) { } mtpRequestId requestId; @@ -278,8 +280,6 @@ struct RPCCallbackClear { }; -using RPCCallbackClears = QVector<RPCCallbackClear> ; - template <typename TReturn> inline RPCDoneHandlerPtr rpcDone(TReturn (*onDone)(const mtpPrime *, const mtpPrime *)) { // done(from, end) return RPCDoneHandlerPtr(new RPCDoneHandlerBare<TReturn>(onDone)); diff --git a/Telegram/SourceFiles/mtproto/session.cpp b/Telegram/SourceFiles/mtproto/session.cpp index 9fe8f4eeb..f0466ebe9 100644 --- a/Telegram/SourceFiles/mtproto/session.cpp +++ b/Telegram/SourceFiles/mtproto/session.cpp @@ -56,7 +56,7 @@ void SessionData::setKey(const AuthKeyPtr &key) { } void SessionData::clear(Instance *instance) { - RPCCallbackClears clearCallbacks; + auto clearCallbacks = std::vector<RPCCallbackClear>(); { QReadLocker locker1(haveSentMutex()), locker2(toResendMutex()), locker3(haveReceivedMutex()), locker4(wereAckedMutex()); auto receivedResponsesEnd = _receivedResponses.cend(); @@ -96,7 +96,7 @@ void SessionData::clear(Instance *instance) { QWriteLocker locker(receivedIdsMutex()); _receivedIds.clear(); } - instance->clearCallbacksDelayed(clearCallbacks); + instance->clearCallbacksDelayed(std::move(clearCallbacks)); } Session::Session(not_null<Instance*> instance, ShiftedDcId shiftedDcId) : QObject() @@ -305,12 +305,12 @@ void Session::checkRequestsByTimer() { } } if (!removingIds.isEmpty()) { - RPCCallbackClears clearCallbacks; + auto clearCallbacks = std::vector<RPCCallbackClear>(); { QWriteLocker locker(data.haveSentMutex()); - mtpRequestMap &haveSent(data.haveSentMap()); + auto &haveSent = data.haveSentMap(); for (uint32 i = 0, l = removingIds.size(); i < l; ++i) { - mtpRequestMap::iterator j = haveSent.find(removingIds[i]); + auto j = haveSent.find(removingIds[i]); if (j != haveSent.cend()) { if (j.value()->requestId) { clearCallbacks.push_back(j.value()->requestId); @@ -319,7 +319,7 @@ void Session::checkRequestsByTimer() { } } } - _instance->clearCallbacksDelayed(clearCallbacks); + _instance->clearCallbacksDelayed(std::move(clearCallbacks)); } }