diff --git a/Telegram/SourceFiles/mtproto/connection.cpp b/Telegram/SourceFiles/mtproto/connection.cpp index 6dd933189..711b46d94 100644 --- a/Telegram/SourceFiles/mtproto/connection.cpp +++ b/Telegram/SourceFiles/mtproto/connection.cpp @@ -1479,9 +1479,9 @@ void ConnectionPrivate::handleReceived() { bool emitSignal = false; { QReadLocker locker(sessionData->haveReceivedMutex()); - emitSignal = !sessionData->haveReceivedMap().isEmpty(); + emitSignal = !sessionData->haveReceivedResponses().isEmpty() || !sessionData->haveReceivedUpdates().isEmpty(); if (emitSignal) { - DEBUG_LOG(("MTP Info: emitting needToReceive() - need to parse in another thread, haveReceivedMap.size() = %1").arg(sessionData->haveReceivedMap().size())); + DEBUG_LOG(("MTP Info: emitting needToReceive() - need to parse in another thread, %1 responses, %2 updates.").arg(sessionData->haveReceivedResponses().size()).arg(sessionData->haveReceivedUpdates().size())); } } @@ -1908,7 +1908,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr case mtpc_rpc_result: { if (from + 3 > end) throw mtpErrorInsufficient(); - mtpResponse response; + auto response = SerializedMessage(); MTPlong reqMsgId; reqMsgId.read(++from, end); @@ -1943,10 +1943,11 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr sessionData->owner()->notifyLayerInited(true); } - mtpRequestId requestId = wasSent(reqMsgId.v); + auto requestId = wasSent(reqMsgId.v); if (requestId && requestId != mtpRequestId(0xFFFFFFFF)) { + // Save rpc_result for processing in the main thread. QWriteLocker locker(sessionData->haveReceivedMutex()); - sessionData->haveReceivedMap().insert(requestId, response); // save rpc_result for processing in main mtp thread + sessionData->haveReceivedResponses().insert(requestId, response); } else { DEBUG_LOG(("RPC Info: requestId not found for msgId %1").arg(reqMsgId.v)); } @@ -1986,10 +1987,9 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr mtpBuffer update(from - start); if (from > start) memcpy(update.data(), start, (from - start) * sizeof(mtpPrime)); + // Notify main process about new session - need to get difference. QWriteLocker locker(sessionData->haveReceivedMutex()); - mtpResponseMap &haveReceived(sessionData->haveReceivedMap()); - mtpRequestId fakeRequestId = sessionData->nextFakeRequestId(); - haveReceived.insert(fakeRequestId, mtpResponse(update)); // notify main process about new session - need to get difference + sessionData->haveReceivedUpdates().push_back(SerializedMessage(update)); } return HandleResult::Success; case mtpc_ping: { @@ -2044,10 +2044,9 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr mtpBuffer update(end - from); if (end > from) memcpy(update.data(), from, (end - from) * sizeof(mtpPrime)); + // Notify main process about the new updates. QWriteLocker locker(sessionData->haveReceivedMutex()); - mtpResponseMap &haveReceived(sessionData->haveReceivedMap()); - mtpRequestId fakeRequestId = sessionData->nextFakeRequestId(); - haveReceived.insert(fakeRequestId, mtpResponse(update)); // notify main process about new updates + sessionData->haveReceivedUpdates().push_back(SerializedMessage(update)); if (cons != mtpc_updatesTooLong && cons != mtpc_updateShortMessage && cons != mtpc_updateShortChatMessage && cons != mtpc_updateShortSentMessage && cons != mtpc_updateShort && cons != mtpc_updatesCombined && cons != mtpc_updates) { LOG(("Message Error: unknown constructor %1").arg(cons)); // maybe new api?.. diff --git a/Telegram/SourceFiles/mtproto/core_types.h b/Telegram/SourceFiles/mtproto/core_types.h index da889b528..00589cd8c 100644 --- a/Telegram/SourceFiles/mtproto/core_types.h +++ b/Telegram/SourceFiles/mtproto/core_types.h @@ -119,23 +119,6 @@ inline void mtpRequest::write(mtpBuffer &to) const { memcpy(to.data() + was, value->constData() + 8, s * sizeof(mtpPrime)); } -class mtpResponse : public mtpBuffer { -public: - mtpResponse() { - } - mtpResponse(const mtpBuffer &v) : mtpBuffer(v) { - } - mtpResponse &operator=(const mtpBuffer &v) { - mtpBuffer::operator=(v); - return (*this); - } - bool needAck() const { - if (size() < 8) return false; - uint32 seqNo = *(uint32*)(constData() + 6); - return (seqNo & 0x01) ? true : false; - } -}; - using mtpPreRequestMap = QMap; using mtpRequestMap = QMap; using mtpMsgIdsSet = QMap; @@ -154,8 +137,6 @@ public: } }; -using mtpResponseMap = QMap; - class mtpErrorUnexpected : public Exception { public: mtpErrorUnexpected(mtpTypeId typeId, const QString &type) : Exception(QString("MTP Unexpected type id #%1 read in %2").arg(uint32(typeId), 0, 16).arg(type), false) { // maybe api changed?.. diff --git a/Telegram/SourceFiles/mtproto/session.cpp b/Telegram/SourceFiles/mtproto/session.cpp index 3a68fb94c..9ceb485e1 100644 --- a/Telegram/SourceFiles/mtproto/session.cpp +++ b/Telegram/SourceFiles/mtproto/session.cpp @@ -29,42 +29,42 @@ void SessionData::clear(Instance *instance) { RPCCallbackClears clearCallbacks; { QReadLocker locker1(haveSentMutex()), locker2(toResendMutex()), locker3(haveReceivedMutex()), locker4(wereAckedMutex()); - mtpResponseMap::const_iterator end = haveReceived.cend(); - clearCallbacks.reserve(haveSent.size() + wereAcked.size()); - for (mtpRequestMap::const_iterator i = haveSent.cbegin(), e = haveSent.cend(); i != e; ++i) { - mtpRequestId requestId = i.value()->requestId; - if (haveReceived.find(requestId) == end) { + auto receivedResponsesEnd = _receivedResponses.cend(); + clearCallbacks.reserve(_haveSent.size() + _wereAcked.size()); + for (auto i = _haveSent.cbegin(), e = _haveSent.cend(); i != e; ++i) { + auto requestId = i.value()->requestId; + if (!_receivedResponses.contains(requestId)) { clearCallbacks.push_back(requestId); } } - for (mtpRequestIdsMap::const_iterator i = toResend.cbegin(), e = toResend.cend(); i != e; ++i) { - mtpRequestId requestId = i.value(); - if (haveReceived.find(requestId) == end) { + for (auto i = _toResend.cbegin(), e = _toResend.cend(); i != e; ++i) { + auto requestId = i.value(); + if (!_receivedResponses.contains(requestId)) { clearCallbacks.push_back(requestId); } } - for (mtpRequestIdsMap::const_iterator i = wereAcked.cbegin(), e = wereAcked.cend(); i != e; ++i) { - mtpRequestId requestId = i.value(); - if (haveReceived.find(requestId) == end) { + for (auto i = _wereAcked.cbegin(), e = _wereAcked.cend(); i != e; ++i) { + auto requestId = i.value(); + if (!_receivedResponses.contains(requestId)) { clearCallbacks.push_back(requestId); } } } { QWriteLocker locker(haveSentMutex()); - haveSent.clear(); + _haveSent.clear(); } { QWriteLocker locker(toResendMutex()); - toResend.clear(); + _toResend.clear(); } { QWriteLocker locker(wereAckedMutex()); - wereAcked.clear(); + _wereAcked.clear(); } { QWriteLocker locker(receivedIdsMutex()); - receivedIds.clear(); + _receivedIds.clear(); } instance->clearCallbacksDelayed(clearCallbacks); } @@ -494,28 +494,37 @@ void Session::tryToReceive() { _needToReceive = true; return; } - int32 cnt = 0; while (true) { - mtpRequestId requestId; - mtpResponse response; + auto requestId = mtpRequestId(0); + auto isUpdate = false; + auto message = SerializedMessage(); { QWriteLocker locker(data.haveReceivedMutex()); - mtpResponseMap &responses(data.haveReceivedMap()); - mtpResponseMap::iterator i = responses.begin(); - if (i == responses.end()) return; - - requestId = i.key(); - response = i.value(); - responses.erase(i); + auto &responses = data.haveReceivedResponses(); + auto response = responses.begin(); + if (response == responses.cend()) { + auto &updates = data.haveReceivedUpdates(); + auto update = updates.begin(); + if (update == updates.cend()) { + return; + } else { + message = std::move(*update); + isUpdate = true; + updates.pop_front(); + } + } else { + requestId = response.key(); + message = std::move(response.value()); + responses.erase(response); + } } - if (requestId <= 0) { + if (isUpdate) { if (dcWithShift == bareDcId(dcWithShift)) { // call globalCallback only in main session - _instance->globalCallback(response.constData(), response.constData() + response.size()); + _instance->globalCallback(message.constData(), message.constData() + message.size()); } } else { - _instance->execCallback(requestId, response.constData(), response.constData() + response.size()); + _instance->execCallback(requestId, message.constData(), message.constData() + message.size()); } - ++cnt; } } diff --git a/Telegram/SourceFiles/mtproto/session.h b/Telegram/SourceFiles/mtproto/session.h index 5b08dba64..205bf7ab0 100644 --- a/Telegram/SourceFiles/mtproto/session.h +++ b/Telegram/SourceFiles/mtproto/session.h @@ -85,6 +85,16 @@ private: }; +using SerializedMessage = mtpBuffer; + +inline bool ResponseNeedsAck(const SerializedMessage &response) { + if (response.size() < 8) { + return false; + } + auto seqNo = *(uint32*)(response.constData() + 6); + return (seqNo & 0x01) ? true : false; +} + class Session; class SessionData { public: @@ -94,31 +104,31 @@ public: void setSession(uint64 session) { DEBUG_LOG(("MTP Info: setting server_session: %1").arg(session)); - QWriteLocker locker(&lock); + QWriteLocker locker(&_lock); if (_session != session) { _session = session; _messagesSent = 0; } } uint64 getSession() const { - QReadLocker locker(&lock); + QReadLocker locker(&_lock); return _session; } bool layerWasInited() const { - QReadLocker locker(&lock); + QReadLocker locker(&_lock); return _layerInited; } void setLayerWasInited(bool was) { - QWriteLocker locker(&lock); + QWriteLocker locker(&_lock); _layerInited = was; } void setSalt(uint64 salt) { - QWriteLocker locker(&lock); + QWriteLocker locker(&_lock); _salt = salt; } uint64 getSalt() const { - QReadLocker locker(&lock); + QReadLocker locker(&_lock); return _salt; } @@ -131,7 +141,7 @@ public: _authKey = key; DEBUG_LOG(("MTP Info: new auth key set in SessionData, id %1, setting random server_session %2").arg(key ? key->keyId() : 0).arg(session)); - QWriteLocker locker(&lock); + QWriteLocker locker(&_lock); if (_session != session) { _session = session; _messagesSent = 0; @@ -141,88 +151,85 @@ public: } bool isCheckedKey() const { - QReadLocker locker(&lock); + QReadLocker locker(&_lock); return _keyChecked; } void setCheckedKey(bool checked) { - QWriteLocker locker(&lock); + QWriteLocker locker(&_lock); _keyChecked = checked; } QReadWriteLock *keyMutex() const; QReadWriteLock *toSendMutex() const { - return &toSendLock; + return &_toSendLock; } QReadWriteLock *haveSentMutex() const { - return &haveSentLock; + return &_haveSentLock; } QReadWriteLock *toResendMutex() const { - return &toResendLock; + return &_toResendLock; } QReadWriteLock *wereAckedMutex() const { - return &wereAckedLock; + return &_wereAckedLock; } QReadWriteLock *receivedIdsMutex() const { - return &receivedIdsLock; + return &_receivedIdsLock; } QReadWriteLock *haveReceivedMutex() const { - return &haveReceivedLock; + return &_haveReceivedLock; } QReadWriteLock *stateRequestMutex() const { - return &stateRequestLock; + return &_stateRequestLock; } mtpPreRequestMap &toSendMap() { - return toSend; + return _toSend; } const mtpPreRequestMap &toSendMap() const { - return toSend; + return _toSend; } mtpRequestMap &haveSentMap() { - return haveSent; + return _haveSent; } const mtpRequestMap &haveSentMap() const { - return haveSent; + return _haveSent; } mtpRequestIdsMap &toResendMap() { // msgId -> requestId, on which toSend: requestId -> request for resended requests - return toResend; + return _toResend; } const mtpRequestIdsMap &toResendMap() const { - return toResend; + return _toResend; } ReceivedMsgIds &receivedIdsSet() { - return receivedIds; + return _receivedIds; } const ReceivedMsgIds &receivedIdsSet() const { - return receivedIds; + return _receivedIds; } mtpRequestIdsMap &wereAckedMap() { - return wereAcked; + return _wereAcked; } const mtpRequestIdsMap &wereAckedMap() const { - return wereAcked; + return _wereAcked; } - mtpResponseMap &haveReceivedMap() { - return haveReceived; + QMap &haveReceivedResponses() { + return _receivedResponses; } - const mtpResponseMap &haveReceivedMap() const { - return haveReceived; + const QMap &haveReceivedResponses() const { + return _receivedResponses; + } + QList &haveReceivedUpdates() { + return _receivedUpdates; + } + const QList &haveReceivedUpdates() const { + return _receivedUpdates; } mtpMsgIdsSet &stateRequestMap() { - return stateRequest; + return _stateRequest; } const mtpMsgIdsSet &stateRequestMap() const { - return stateRequest; - } - - mtpRequestId nextFakeRequestId() { // must be locked by haveReceivedMutex() - if (haveReceived.isEmpty() || haveReceived.cbegin().key() > 0) { - _fakeRequestId = -2000000000; - } else { - ++_fakeRequestId; - } - return _fakeRequestId; + return _stateRequest; } Session *owner() { @@ -233,8 +240,8 @@ public: } uint32 nextRequestSeqNumber(bool needAck = true) { - QWriteLocker locker(&lock); - uint32 result(_messagesSent); + QWriteLocker locker(&_lock); + auto result = _messagesSent; _messagesSent += (needAck ? 1 : 0); return result * 2 + (needAck ? 1 : 0); } @@ -246,7 +253,6 @@ private: uint64 _salt = 0; uint32 _messagesSent = 0; - mtpRequestId _fakeRequestId = -2000000000; Session *_owner = nullptr; @@ -254,23 +260,25 @@ private: bool _keyChecked = false; bool _layerInited = false; - mtpPreRequestMap toSend; // map of request_id -> request, that is waiting to be sent - mtpRequestMap haveSent; // map of msg_id -> request, that was sent, msDate = 0 for msgs_state_req (no resend / state req), msDate = 0, seqNo = 0 for containers - mtpRequestIdsMap toResend; // map of msg_id -> request_id, that request_id -> request lies in toSend and is waiting to be resent - ReceivedMsgIds receivedIds; // set of received msg_id's, for checking new msg_ids - mtpRequestIdsMap wereAcked; // map of msg_id -> request_id, this msg_ids already were acked or do not need ack - mtpResponseMap haveReceived; // map of request_id -> response, that should be processed in other thread - mtpMsgIdsSet stateRequest; // set of msg_id's, whose state should be requested + mtpPreRequestMap _toSend; // map of request_id -> request, that is waiting to be sent + mtpRequestMap _haveSent; // map of msg_id -> request, that was sent, msDate = 0 for msgs_state_req (no resend / state req), msDate = 0, seqNo = 0 for containers + mtpRequestIdsMap _toResend; // map of msg_id -> request_id, that request_id -> request lies in toSend and is waiting to be resent + ReceivedMsgIds _receivedIds; // set of received msg_id's, for checking new msg_ids + mtpRequestIdsMap _wereAcked; // map of msg_id -> request_id, this msg_ids already were acked or do not need ack + mtpMsgIdsSet _stateRequest; // set of msg_id's, whose state should be requested + + QMap _receivedResponses; // map of request_id -> response that should be processed in the main thread + QList _receivedUpdates; // list of updates that should be processed in the main thread // mutexes - mutable QReadWriteLock lock; - mutable QReadWriteLock toSendLock; - mutable QReadWriteLock haveSentLock; - mutable QReadWriteLock toResendLock; - mutable QReadWriteLock receivedIdsLock; - mutable QReadWriteLock wereAckedLock; - mutable QReadWriteLock haveReceivedLock; - mutable QReadWriteLock stateRequestLock; + mutable QReadWriteLock _lock; + mutable QReadWriteLock _toSendLock; + mutable QReadWriteLock _haveSentLock; + mutable QReadWriteLock _toResendLock; + mutable QReadWriteLock _receivedIdsLock; + mutable QReadWriteLock _wereAckedLock; + mutable QReadWriteLock _haveReceivedLock; + mutable QReadWriteLock _stateRequestLock; };