diff --git a/Telegram/SourceFiles/mtproto/connection.cpp b/Telegram/SourceFiles/mtproto/connection.cpp index 2a3072383..e5d5be76a 100644 --- a/Telegram/SourceFiles/mtproto/connection.cpp +++ b/Telegram/SourceFiles/mtproto/connection.cpp @@ -443,13 +443,10 @@ mtpMsgId ConnectionPrivate::prepareToSend( if (const auto msgId = request.getMsgId()) { // resending this request - QWriteLocker lock(_sessionData->toResendMutex()); - auto &toResend = _sessionData->toResendMap(); - const auto i = toResend.find(msgId); - if (i != toResend.cend()) { - toResend.erase(i); + const auto i = _resendingIds.find(msgId); + if (i != _resendingIds.cend()) { + _resendingIds.erase(i); } - lock.unlock(); return (forceNewMsgId || msgId > currentLastId) ? replaceMsgId(request, currentLastId) @@ -470,33 +467,29 @@ mtpMsgId ConnectionPrivate::replaceMsgId(SecureRequest &request, mtpMsgId newId) if (oldMsgId == newId) { return newId; } - QWriteLocker locker(_sessionData->toResendMutex()); - // haveSentMutex() and wereAckedMutex() were locked in tryToSend() - - auto &toResend = _sessionData->toResendMap(); - auto &wereAcked = _sessionData->wereAckedMap(); + // haveSentMutex() was locked in tryToSend() auto &haveSent = _sessionData->haveSentMap(); - while (toResend.constFind(newId) != toResend.cend() - || wereAcked.constFind(newId) != wereAcked.cend() + while (_resendingIds.contains(newId) + || _ackedIds.contains(newId) || haveSent.constFind(newId) != haveSent.cend()) { newId = base::unixtime::mtproto_msg_id(); } MTP_LOG(_shiftedDcId, ("[r%1] msg_id %2 -> %3").arg(request->requestId).arg(oldMsgId).arg(newId)); - const auto i = toResend.find(oldMsgId); - if (i != toResend.cend()) { - const auto req = i.value(); - toResend.erase(i); - toResend.insert(newId, req); + const auto i = _resendingIds.find(oldMsgId); + if (i != _resendingIds.end()) { + const auto requestId = i->second; + _resendingIds.erase(i); + _resendingIds.emplace(newId, requestId); } - const auto j = wereAcked.find(oldMsgId); - if (j != wereAcked.cend()) { - const auto req = j.value(); - wereAcked.erase(j); - wereAcked.insert(newId, req); + const auto j = _ackedIds.find(oldMsgId); + if (j != _ackedIds.cend()) { + const auto requestId = j->second; + _ackedIds.erase(j); + _ackedIds.emplace(newId, requestId); } const auto k = haveSent.find(oldMsgId); @@ -618,14 +611,14 @@ void ConnectionPrivate::tryToSend() { } stateRequest = SecureRequest::Serialize(MTPMsgsStateReq( MTP_msgs_state_req(MTP_vector<MTPlong>(ids)))); - // Add to haveSent / wereAcked maps, but don't add to requestMap. + // Add to haveSent / _ackedIds, but don't add to requestMap. stateRequest->requestId = GetNextRequestId(); } if (_connection->usingHttpWait()) { httpWaitRequest = SecureRequest::Serialize(MTPHttpWait( MTP_http_wait(MTP_int(100), MTP_int(30), MTP_int(25000)))); } - if (_keyCreator && _keyCreator->bindReadyToRequest()) { + if (!_bindMsgId && _keyCreator && _keyCreator->readyToBind()) { bindDcKeyRequest = _keyCreator->prepareBindRequest( _encryptionKey, _sessionId); @@ -635,22 +628,6 @@ void ConnectionPrivate::tryToSend() { // seqNo for it manually here. bindDcKeyRequest.setSeqNo( nextRequestSeqNumber(bindDcKeyRequest.needAck())); - //} else if (!_keyChecker) { - // if (const auto &keyForCheck = _sessionData->getKeyForCheck()) { - // _keyChecker = std::make_unique<details::DcKeyChecker>( - // _instance, - // _shiftedDcId, - // keyForCheck); - // bindDcKeyRequest = _keyChecker->prepareRequest( - // _encryptionKey, - // _sessionId); - - // // This is a special request with msgId used inside the message - // // body, so it is prepared already with a msgId and we place - // // seqNo for it manually here. - // bindDcKeyRequest.setSeqNo( - // nextRequestSeqNumber(bindDcKeyRequest.needAck())); - // } } } @@ -750,8 +727,11 @@ void ConnectionPrivate::tryToSend() { const auto msgId = prepareToSend( toSendRequest, base::unixtime::mtproto_msg_id(), - forceNewMsgId); - if (pingRequest) { + forceNewMsgId && !bindDcKeyRequest); + if (bindDcKeyRequest) { + _bindMsgId = msgId; + needAnyResponse = true; + } else if (pingRequest) { _pingMsgId = msgId; needAnyResponse = true; } else if (resendRequest || stateRequest) { @@ -792,8 +772,7 @@ void ConnectionPrivate::tryToSend() { needAnyResponse = true; } else { - QWriteLocker locker3(_sessionData->wereAckedMutex()); - _sessionData->wereAckedMap().insert(msgId, toSendRequest->requestId); + _ackedIds.emplace(msgId, toSendRequest->requestId); } } } else { // send in container @@ -833,10 +812,6 @@ void ConnectionPrivate::tryToSend() { QWriteLocker locker2(_sessionData->haveSentMutex()); auto &haveSent = _sessionData->haveSentMap(); - // the fact of this lock is used in replaceMsgId() - QWriteLocker locker3(_sessionData->wereAckedMutex()); - auto &wereAcked = _sessionData->wereAckedMap(); - // prepare "request-like" wrap for msgId vector auto haveSentIdsWrap = SecureRequest::Prepare(idsWrapSize); haveSentIdsWrap->msDate = 0; // Container: msDate = 0, seqNo = 0. @@ -844,6 +819,15 @@ void ConnectionPrivate::tryToSend() { haveSentIdsWrap->resize(haveSentIdsWrap->size() + idsWrapSize); auto haveSentArr = (mtpMsgId*)(haveSentIdsWrap->data() + 8); + if (bindDcKeyRequest) { + _bindMsgId = placeToContainer( + toSendRequest, + bigMsgId, + false, + haveSentArr, + bindDcKeyRequest); + needAnyResponse = true; + } if (pingRequest) { _pingMsgId = placeToContainer( toSendRequest, @@ -852,7 +836,8 @@ void ConnectionPrivate::tryToSend() { haveSentArr, pingRequest); needAnyResponse = true; - } else if (resendRequest || stateRequest || bindDcKeyRequest) { + } + if (resendRequest || stateRequest) { needAnyResponse = true; } for (auto i = toSend.begin(), e = toSend.end(); i != e; ++i) { @@ -890,7 +875,7 @@ void ConnectionPrivate::tryToSend() { needAnyResponse = true; } else { - wereAcked.insert(msgId, req->requestId); + _ackedIds.emplace(msgId, req->requestId); } } if (!added) { @@ -908,7 +893,6 @@ void ConnectionPrivate::tryToSend() { if (resendRequest) placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, resendRequest); if (ackRequest) placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, ackRequest); if (httpWaitRequest) placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, httpWaitRequest); - if (bindDcKeyRequest) placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, bindDcKeyRequest); const auto containerMsgId = prepareToSend( toSendRequest, @@ -1044,6 +1028,7 @@ void ConnectionPrivate::connectToServer(bool afterConfig) { setState(ConnectingState); + _bindMsgId = 0; _pingId = _pingMsgId = _pingIdToSend = _pingSendAt = 0; _pingSender.cancel(); @@ -1352,7 +1337,7 @@ void ConnectionPrivate::handleReceived() { _sessionSalt = serverSalt; if (setState(ConnectedState, ConnectingState)) { - _sessionData->resendAll(); + resendAll(); } } else { DEBUG_LOG(("MTP Info: other salt received... received: %1, my salt: %2").arg(serverSalt).arg(_sessionSalt)); @@ -1416,10 +1401,16 @@ void ConnectionPrivate::handleReceived() { } } -ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPrime *from, const mtpPrime *end, uint64 msgId, int32 serverTime, uint64 serverSalt, bool badTime) { - const auto cons = mtpTypeId(*from); +ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived( + const mtpPrime *from, + const mtpPrime *end, + uint64 msgId, + int32 serverTime, + uint64 serverSalt, + bool badTime) { + Expects(from < end); - switch (cons) { + switch (mtpTypeId(*from)) { case mtpc_gzip_packed: { DEBUG_LOG(("Message Info: gzip container")); @@ -1495,18 +1486,15 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr if (!msg.read(from, end)) { return HandleResult::ParseError; } - auto &ids = msg.c_msgs_ack().vmsg_ids().v; - uint32 idsCount = ids.size(); + const auto &ids = msg.c_msgs_ack().vmsg_ids().v; + DEBUG_LOG(("Message Info: acks received, ids: %1" + ).arg(LogIdsVector(ids))); + if (ids.isEmpty()) { + return badTime ? HandleResult::Ignored : HandleResult::Success; + } - DEBUG_LOG(("Message Info: acks received, ids: %1").arg(LogIdsVector(ids))); - if (!idsCount) return (badTime ? HandleResult::Ignored : HandleResult::Success); - - if (badTime) { - if (requestsFixTimeSalt(ids, serverTime, serverSalt)) { - badTime = false; - } else { - return HandleResult::Ignored; - } + if (badTime && !requestsFixTimeSalt(ids, serverTime, serverSalt)) { + return HandleResult::Ignored; } requestsAcked(ids); } return HandleResult::Success; @@ -1519,11 +1507,8 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr const auto &data(msg.c_bad_msg_notification()); LOG(("Message Info: bad message notification received (error_code %3) for msg_id = %1, seq_no = %2").arg(data.vbad_msg_id().v).arg(data.vbad_msg_seqno().v).arg(data.verror_code().v)); - mtpMsgId resendId = data.vbad_msg_id().v; - if (resendId == _pingMsgId) { - _pingId = 0; - } - int32 errorCode = data.verror_code().v; + const auto resendId = data.vbad_msg_id().v; + const auto errorCode = data.verror_code().v; if (false || errorCode == 16 || errorCode == 17 @@ -1619,13 +1604,11 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr if (!msg.read(from, end)) { return HandleResult::ParseError; } - const auto &data(msg.c_bad_server_salt()); + const auto &data = msg.c_bad_server_salt(); DEBUG_LOG(("Message Info: bad server salt received (error_code %4) for msg_id = %1, seq_no = %2, new salt: %3").arg(data.vbad_msg_id().v).arg(data.vbad_msg_seqno().v).arg(data.vnew_server_salt().v).arg(data.verror_code().v)); - mtpMsgId resendId = data.vbad_msg_id().v; - if (resendId == _pingMsgId) { - _pingId = 0; - } else if (!wasSent(resendId)) { + const auto resendId = data.vbad_msg_id().v; + if (!wasSent(resendId)) { DEBUG_LOG(("Message Error: such message was not sent recently %1").arg(resendId)); return (badTime ? HandleResult::Ignored : HandleResult::Success); } @@ -1634,7 +1617,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr base::unixtime::update(serverTime); if (setState(ConnectedState, ConnectingState)) { - _sessionData->resendAll(); + resendAll(); } badTime = false; @@ -1661,11 +1644,6 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr { const auto minRecv = _receivedMessageIds.min(); const auto maxRecv = _receivedMessageIds.max(); - - QReadLocker locker(_sessionData->wereAckedMutex()); - const auto &wereAcked = _sessionData->wereAckedMap(); - const auto wereAckedEnd = wereAcked.cend(); - for (uint32 i = 0, l = idsCount; i < l; ++i) { char state = 0; uint64 reqMsgId = ids[i].v; @@ -1679,7 +1657,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr state |= 0x02; } else { state |= 0x04; - if (wereAcked.constFind(reqMsgId) != wereAckedEnd) { + if (_ackedIds.contains(reqMsgId)) { state |= 0x80; // we know, that server knows, that we received request } if (msgIdState == ReceivedIdsManager::State::NeedsAck) { // need ack, so we sent ack @@ -1837,20 +1815,21 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr if (!reqMsgId.read(++from, end)) { return HandleResult::ParseError; } - mtpTypeId typeId = from[0]; + const auto requestMsgId = reqMsgId.v; - DEBUG_LOG(("RPC Info: response received for %1, queueing...").arg(reqMsgId.v)); + DEBUG_LOG(("RPC Info: response received for %1, queueing...").arg(requestMsgId)); QVector<MTPlong> ids(1, reqMsgId); if (badTime) { if (requestsFixTimeSalt(ids, serverTime, serverSalt)) { badTime = false; } else { - DEBUG_LOG(("Message Info: error, such message was not sent recently %1").arg(reqMsgId.v)); + DEBUG_LOG(("Message Info: error, such message was not sent recently %1").arg(requestMsgId)); return HandleResult::Ignored; } } + mtpTypeId typeId = from[0]; if (typeId == mtpc_gzip_packed) { DEBUG_LOG(("RPC Info: gzip container")); response = ungzip(++from, end); @@ -1874,36 +1853,17 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr } requestsAcked(ids, true); - if (_keyCreator) { - const auto result = _keyCreator->handleBindResponse( - reqMsgId, - response); - switch (result) { - case DcKeyBindState::Success: - if (!_sessionData->releaseKeyCreationOnDone( - _encryptionKey, - base::take(_keyCreator)->bindPersistentKey())) { - return HandleResult::DestroyTemporaryKey; - } - _sessionData->queueNeedToResumeAndSend(); - return HandleResult::Success; - case DcKeyBindState::DefinitelyDestroyed: - if (destroyOldEnoughPersistentKey()) { - return HandleResult::DestroyTemporaryKey; - } - [[fallthrough]]; - case DcKeyBindState::Failed: - _sessionData->queueNeedToResumeAndSend(); - return HandleResult::Success; - } + const auto bindResult = handleBindResponse(requestMsgId, response); + if (bindResult != HandleResult::Ignored) { + return bindResult; } - auto requestId = wasSent(reqMsgId.v); + const auto requestId = wasSent(requestMsgId); if (requestId && requestId != mtpRequestId(0xFFFFFFFF)) { // Save rpc_result for processing in the main thread. QWriteLocker locker(_sessionData->haveReceivedMutex()); _sessionData->haveReceivedResponses().insert(requestId, response); } else { - DEBUG_LOG(("RPC Info: requestId not found for msgId %1").arg(reqMsgId.v)); + DEBUG_LOG(("RPC Info: requestId not found for msgId %1").arg(requestMsgId)); } } return HandleResult::Success; @@ -1942,7 +1902,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr } } for (const auto msgId : toResend) { - _sessionData->resend(msgId, 10, true); + resend(msgId, 10, true); } mtpBuffer update(from - start); @@ -1991,22 +1951,13 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr if (_currentDcType == DcType::Regular) { mtpBuffer update(end - from); - if (end > from) memcpy(update.data(), from, (end - from) * sizeof(mtpPrime)); + if (end > from) { + memcpy(update.data(), from, (end - from) * sizeof(mtpPrime)); + } // Notify main process about the new updates. QWriteLocker locker(_sessionData->haveReceivedMutex()); _sessionData->haveReceivedUpdates().push_back(SerializedMessage(update)); - - if (cons != mtpc_updatesTooLong - && cons != mtpc_updateShortMessage - && cons != mtpc_updateShortChatMessage - && cons != mtpc_updateShortSentMessage - && cons != mtpc_updateShort - && cons != mtpc_updatesCombined - && cons != mtpc_updates) { - // Maybe some new unknown update? - LOG(("Message Error: unknown constructor 0x%1").arg(cons, 0, 16)); - } } else { LOG(("Message Error: unexpected updates in dcType: %1" ).arg(static_cast<int>(_currentDcType))); @@ -2015,6 +1966,34 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr return HandleResult::Success; } +ConnectionPrivate::HandleResult ConnectionPrivate::handleBindResponse( + mtpMsgId requestMsgId, + const mtpBuffer &response) { + if (!_keyCreator || !_bindMsgId || _bindMsgId != requestMsgId) { + return HandleResult::Ignored; + } + const auto result = _keyCreator->handleBindResponse(response); + switch (result) { + case DcKeyBindState::Success: + if (!_sessionData->releaseKeyCreationOnDone( + _encryptionKey, + base::take(_keyCreator)->bindPersistentKey())) { + return HandleResult::DestroyTemporaryKey; + } + _sessionData->queueNeedToResumeAndSend(); + return HandleResult::Success; + case DcKeyBindState::DefinitelyDestroyed: + if (destroyOldEnoughPersistentKey()) { + return HandleResult::DestroyTemporaryKey; + } + [[fallthrough]]; + case DcKeyBindState::Failed: + _sessionData->queueNeedToResumeAndSend(); + return HandleResult::Success; + } + Unexpected("Result of BoundKeyCreator::handleBindResponse."); +} + mtpBuffer ConnectionPrivate::ungzip(const mtpPrime *from, const mtpPrime *end) const { mtpBuffer result; // * 4 because of mtpPrime type result.resize(0); @@ -2090,87 +2069,80 @@ void ConnectionPrivate::requestsAcked(const QVector<MTPlong> &ids, bool byRespon auto clearedBecauseTooOld = std::vector<RPCCallbackClear>(); QVector<MTPlong> toAckMore; { - QWriteLocker locker1(_sessionData->wereAckedMutex()); - auto &wereAcked = _sessionData->wereAckedMap(); + QWriteLocker locker2(_sessionData->haveSentMutex()); + auto &haveSent = _sessionData->haveSentMap(); - { - QWriteLocker locker2(_sessionData->haveSentMutex()); - auto &haveSent = _sessionData->haveSentMap(); - - for (uint32 i = 0; i < idsCount; ++i) { - mtpMsgId msgId = ids[i].v; - const auto req = haveSent.find(msgId); - if (req != haveSent.cend()) { - if (!req.value()->msDate) { - DEBUG_LOG(("Message Info: container ack received, msgId %1").arg(ids[i].v)); - uint32 inContCount = ((*req)->size() - 8) / 2; - const mtpMsgId *inContId = (const mtpMsgId *)(req.value()->constData() + 8); - toAckMore.reserve(toAckMore.size() + inContCount); - for (uint32 j = 0; j < inContCount; ++j) { - toAckMore.push_back(MTP_long(*(inContId++))); - } + for (uint32 i = 0; i < idsCount; ++i) { + mtpMsgId msgId = ids[i].v; + const auto req = haveSent.find(msgId); + if (req != haveSent.cend()) { + if (!req.value()->msDate) { + DEBUG_LOG(("Message Info: container ack received, msgId %1").arg(ids[i].v)); + uint32 inContCount = ((*req)->size() - 8) / 2; + const mtpMsgId *inContId = (const mtpMsgId *)(req.value()->constData() + 8); + toAckMore.reserve(toAckMore.size() + inContCount); + for (uint32 j = 0; j < inContCount; ++j) { + toAckMore.push_back(MTP_long(*(inContId++))); + } + haveSent.erase(req); + } else { + mtpRequestId reqId = req.value()->requestId; + bool moveToAcked = byResponse; + if (!moveToAcked) { // ignore ACK, if we need a response (if we have a handler) + moveToAcked = !_instance->hasCallbacks(reqId); + } + if (moveToAcked) { + _ackedIds.emplace(msgId, reqId); haveSent.erase(req); } else { - mtpRequestId reqId = req.value()->requestId; - bool moveToAcked = byResponse; - if (!moveToAcked) { // ignore ACK, if we need a response (if we have a handler) - moveToAcked = !_instance->hasCallbacks(reqId); - } - if (moveToAcked) { - wereAcked.insert(msgId, reqId); - haveSent.erase(req); + DEBUG_LOG(("Message Info: ignoring ACK for msgId %1 because request %2 requires a response").arg(msgId).arg(reqId)); + } + } + } else { + DEBUG_LOG(("Message Info: msgId %1 was not found in recent sent, while acking requests, searching in resend...").arg(msgId)); + const auto reqIt = _resendingIds.find(msgId); + if (reqIt != _resendingIds.end()) { + const auto reqId = reqIt->second; + bool moveToAcked = byResponse; + if (!moveToAcked) { // ignore ACK, if we need a response (if we have a handler) + moveToAcked = !_instance->hasCallbacks(reqId); + } + if (moveToAcked) { + QWriteLocker locker4(_sessionData->toSendMutex()); + auto &toSend = _sessionData->toSendMap(); + const auto req = toSend.find(reqId); + if (req != toSend.cend()) { + _ackedIds.emplace(msgId, req.value()->requestId); + if (req.value()->requestId != reqId) { + DEBUG_LOG(("Message Error: for msgId %1 found resent request, requestId %2, contains requestId %3").arg(msgId).arg(reqId).arg(req.value()->requestId)); + } else { + DEBUG_LOG(("Message Info: acked msgId %1 that was prepared to resend, requestId %2").arg(msgId).arg(reqId)); + } + toSend.erase(req); } else { - DEBUG_LOG(("Message Info: ignoring ACK for msgId %1 because request %2 requires a response").arg(msgId).arg(reqId)); + DEBUG_LOG(("Message Info: msgId %1 was found in recent resent, requestId %2 was not found in prepared to send").arg(msgId)); } + _resendingIds.erase(reqIt); + } else { + DEBUG_LOG(("Message Info: ignoring ACK for msgId %1 because request %2 requires a response").arg(msgId).arg(reqId)); } } else { - DEBUG_LOG(("Message Info: msgId %1 was not found in recent sent, while acking requests, searching in resend...").arg(msgId)); - QWriteLocker locker3(_sessionData->toResendMutex()); - auto &toResend = _sessionData->toResendMap(); - const auto reqIt = toResend.find(msgId); - if (reqIt != toResend.cend()) { - const auto reqId = reqIt.value(); - bool moveToAcked = byResponse; - if (!moveToAcked) { // ignore ACK, if we need a response (if we have a handler) - moveToAcked = !_instance->hasCallbacks(reqId); - } - if (moveToAcked) { - QWriteLocker locker4(_sessionData->toSendMutex()); - auto &toSend = _sessionData->toSendMap(); - const auto req = toSend.find(reqId); - if (req != toSend.cend()) { - wereAcked.insert(msgId, req.value()->requestId); - if (req.value()->requestId != reqId) { - DEBUG_LOG(("Message Error: for msgId %1 found resent request, requestId %2, contains requestId %3").arg(msgId).arg(reqId).arg(req.value()->requestId)); - } else { - DEBUG_LOG(("Message Info: acked msgId %1 that was prepared to resend, requestId %2").arg(msgId).arg(reqId)); - } - toSend.erase(req); - } else { - DEBUG_LOG(("Message Info: msgId %1 was found in recent resent, requestId %2 was not found in prepared to send").arg(msgId)); - } - toResend.erase(reqIt); - } else { - DEBUG_LOG(("Message Info: ignoring ACK for msgId %1 because request %2 requires a response").arg(msgId).arg(reqId)); - } - } else { - DEBUG_LOG(("Message Info: msgId %1 was not found in recent resent either").arg(msgId)); - } + DEBUG_LOG(("Message Info: msgId %1 was not found in recent resent either").arg(msgId)); } } } + } - uint32 ackedCount = wereAcked.size(); - if (ackedCount > kIdsBufferSize) { - DEBUG_LOG(("Message Info: removing some old acked sent msgIds %1").arg(ackedCount - kIdsBufferSize)); - clearedBecauseTooOld.reserve(ackedCount - kIdsBufferSize); - while (ackedCount-- > kIdsBufferSize) { - auto i = wereAcked.begin(); - clearedBecauseTooOld.push_back(RPCCallbackClear( - i.value(), - RPCError::TimeoutError)); - wereAcked.erase(i); - } + auto ackedCount = _ackedIds.size(); + if (ackedCount > kIdsBufferSize) { + DEBUG_LOG(("Message Info: removing some old acked sent msgIds %1").arg(ackedCount - kIdsBufferSize)); + clearedBecauseTooOld.reserve(ackedCount - kIdsBufferSize); + while (ackedCount-- > kIdsBufferSize) { + auto i = _ackedIds.begin(); + clearedBecauseTooOld.push_back(RPCCallbackClear( + i->second, + RPCError::TimeoutError)); + _ackedIds.erase(i); } } @@ -2204,10 +2176,8 @@ void ConnectionPrivate::handleMsgsStates(const QVector<MTPlong> &ids, const QByt const auto haveSentEnd = haveSent.cend(); if (haveSent.find(requestMsgId) == haveSentEnd) { DEBUG_LOG(("Message Info: state was received for msgId %1, but request is not found, looking in resent requests...").arg(requestMsgId)); - QWriteLocker locker2(_sessionData->toResendMutex()); - auto &toResend = _sessionData->toResendMap(); - const auto reqIt = toResend.find(requestMsgId); - if (reqIt != toResend.cend()) { + const auto reqIt = _resendingIds.find(requestMsgId); + if (reqIt != _resendingIds.cend()) { if ((state & 0x07) != 0x04) { // was received DEBUG_LOG(("Message Info: state was received for msgId %1, state %2, already resending in container").arg(requestMsgId).arg((int32)state)); } else { @@ -2230,22 +2200,70 @@ void ConnectionPrivate::handleMsgsStates(const QVector<MTPlong> &ids, const QByt } } +void ConnectionPrivate::clearSpecialMsgId(mtpMsgId msgId) { + if (msgId == _pingMsgId) { + _pingMsgId = 0; + _pingId = 0; + } else if (msgId == _bindMsgId) { + _bindMsgId = 0; + } +} + void ConnectionPrivate::resend( mtpMsgId msgId, crl::time msCanWait, bool forceContainer) { - if (msgId != _pingMsgId) { - _sessionData->resend(msgId, msCanWait, forceContainer); + const auto guard = gsl::finally([&] { + clearSpecialMsgId(msgId); + if (msCanWait >= 0) { + _sessionData->queueSendAnything(msCanWait); + } + }); + + auto lock = QWriteLocker(_sessionData->haveSentMutex()); + auto &haveSent = _sessionData->haveSentMap(); + auto i = haveSent.find(msgId); + if (i == haveSent.end()) { + return; + } + auto request = i.value(); + haveSent.erase(i); + lock.unlock(); + + // For container just resend all messages we can. + if (request.isSentContainer()) { + DEBUG_LOG(("Message Info: resending container from haveSent, msgId %1").arg(msgId)); + const mtpMsgId *ids = (const mtpMsgId *)(request->constData() + 8); + for (uint32 i = 0, l = (request->size() - 8) >> 1; i < l; ++i) { + resend(ids[i], -1, true); + } + } else if (!request.isStateRequest()) { + request->msDate = forceContainer ? 0 : crl::now(); + _resendingIds.emplace(msgId, request->requestId); + { + QWriteLocker locker(_sessionData->toSendMutex()); + _sessionData->toSendMap().insert(request->requestId, request); + } } } -void ConnectionPrivate::resendMany( - QVector<mtpMsgId> msgIds, - crl::time msCanWait, - bool forceContainer) { - for (const auto msgId : msgIds) { - resend(msgId, msCanWait, forceContainer); +void ConnectionPrivate::resendAll() { + auto toResend = std::vector<mtpMsgId>(); + + auto lock = QReadLocker(_sessionData->haveSentMutex()); + const auto &haveSent = _sessionData->haveSentMap(); + toResend.reserve(haveSent.size()); + for (auto i = haveSent.cbegin(), e = haveSent.cend(); i != e; ++i) { + if (!i.value().isSentContainer()) { + toResend.push_back(i.key()); + } } + lock.unlock(); + + for (const auto msgId : toResend) { + resend(msgId, -1, true); + } + _sessionData->queueSendAnything(); } void ConnectionPrivate::onConnected( @@ -2470,10 +2488,6 @@ DcType ConnectionPrivate::tryAcquireKeyCreation() { ).arg(result->persistentServerSalt)); _sessionSalt = result->temporaryServerSalt; - if (result->persistentKey) { - _sessionData->clearForNewKey(_instance); - } - auto key = result->persistentKey ? std::move(result->persistentKey) : _sessionData->getPersistentKey(); @@ -2512,7 +2526,7 @@ void ConnectionPrivate::authKeyChecked() { }); if (_sessionSalt && setState(ConnectedState)) { - _sessionData->resendAll(); + resendAll(); } // else receive salt in bad_server_salt first, then try to send all the requests _pingIdToSend = rand_value<uint64>(); // get server_salt @@ -2651,7 +2665,9 @@ bool ConnectionPrivate::sendSecureRequest( } mtpRequestId ConnectionPrivate::wasSent(mtpMsgId msgId) const { - if (msgId == _pingMsgId) return mtpRequestId(0xFFFFFFFF); + if (msgId == _pingMsgId || msgId == _bindMsgId) { + return mtpRequestId(0xFFFFFFFF); + } { QReadLocker locker(_sessionData->haveSentMutex()); const auto &haveSent = _sessionData->haveSentMap(); @@ -2662,17 +2678,11 @@ mtpRequestId ConnectionPrivate::wasSent(mtpMsgId msgId) const { : mtpRequestId(0xFFFFFFFF); } } - { - QReadLocker locker(_sessionData->toResendMutex()); - const auto &toResend = _sessionData->toResendMap(); - const auto i = toResend.constFind(msgId); - if (i != toResend.cend()) return i.value(); + if (const auto i = _resendingIds.find(msgId); i != end(_resendingIds)) { + return i->second; } - { - QReadLocker locker(_sessionData->wereAckedMutex()); - const auto &wereAcked = _sessionData->wereAckedMap(); - const auto i = wereAcked.constFind(msgId); - if (i != wereAcked.cend()) return i.value(); + if (const auto i = _ackedIds.find(msgId); i != end(_ackedIds)) { + return i->second; } return 0; } diff --git a/Telegram/SourceFiles/mtproto/connection.h b/Telegram/SourceFiles/mtproto/connection.h index b53a827ad..36c5706d3 100644 --- a/Telegram/SourceFiles/mtproto/connection.h +++ b/Telegram/SourceFiles/mtproto/connection.h @@ -154,6 +154,9 @@ private: mtpRequestId wasSent(mtpMsgId msgId) const; [[nodiscard]] HandleResult handleOneReceived(const mtpPrime *from, const mtpPrime *end, uint64 msgId, int32 serverTime, uint64 serverSalt, bool badTime); + [[nodiscard]] HandleResult handleBindResponse( + mtpMsgId requestMsgId, + const mtpBuffer &response); mtpBuffer ungzip(const mtpPrime *from, const mtpPrime *end) const; void handleMsgsStates(const QVector<MTPlong> &ids, const QByteArray &states, QVector<MTPlong> &acked); @@ -176,10 +179,8 @@ private: mtpMsgId msgId, crl::time msCanWait = 0, bool forceContainer = false); - void resendMany( - QVector<mtpMsgId> msgIds, - crl::time msCanWait = 0, - bool forceContainer = false); + void resendAll(); + void clearSpecialMsgId(mtpMsgId msgId); [[nodiscard]] DcType tryAcquireKeyCreation(); void resetSession(); @@ -249,8 +250,11 @@ private: QVector<MTPlong> _resendRequestData; base::flat_set<mtpMsgId> _stateRequestData; details::ReceivedIdsManager _receivedMessageIds; + base::flat_map<mtpMsgId, mtpRequestId> _resendingIds; + base::flat_map<mtpMsgId, mtpRequestId> _ackedIds; std::unique_ptr<details::BoundKeyCreator> _keyCreator; + mtpMsgId _bindMsgId = 0; }; diff --git a/Telegram/SourceFiles/mtproto/details/mtproto_bound_key_creator.cpp b/Telegram/SourceFiles/mtproto/details/mtproto_bound_key_creator.cpp index ab8dd2488..ae68422b5 100644 --- a/Telegram/SourceFiles/mtproto/details/mtproto_bound_key_creator.cpp +++ b/Telegram/SourceFiles/mtproto/details/mtproto_bound_key_creator.cpp @@ -50,8 +50,8 @@ void BoundKeyCreator::restartBinder() { } } -bool BoundKeyCreator::bindReadyToRequest() const { - return _binder ? !_binder->requested() : false; +bool BoundKeyCreator::readyToBind() const { + return _binder.has_value(); } SecureRequest BoundKeyCreator::prepareBindRequest( @@ -63,11 +63,10 @@ SecureRequest BoundKeyCreator::prepareBindRequest( } DcKeyBindState BoundKeyCreator::handleBindResponse( - MTPlong requestMsgId, const mtpBuffer &response) { - return _binder - ? _binder->handleResponse(requestMsgId, response) - : DcKeyBindState::Unknown; + Expects(_binder.has_value()); + + return _binder->handleResponse(response); } AuthKeyPtr BoundKeyCreator::bindPersistentKey() const { diff --git a/Telegram/SourceFiles/mtproto/details/mtproto_bound_key_creator.h b/Telegram/SourceFiles/mtproto/details/mtproto_bound_key_creator.h index 78d624b96..e8a7bf100 100644 --- a/Telegram/SourceFiles/mtproto/details/mtproto_bound_key_creator.h +++ b/Telegram/SourceFiles/mtproto/details/mtproto_bound_key_creator.h @@ -31,12 +31,11 @@ public: void bind(AuthKeyPtr &&persistentKey); void restartBinder(); - [[nodiscard]] bool bindReadyToRequest() const; + [[nodiscard]] bool readyToBind() const; [[nodiscard]] SecureRequest prepareBindRequest( const AuthKeyPtr &temporaryKey, uint64 sessionId); [[nodiscard]] DcKeyBindState handleBindResponse( - MTPlong requestMsgId, const mtpBuffer &response); [[nodiscard]] AuthKeyPtr bindPersistentKey() const; diff --git a/Telegram/SourceFiles/mtproto/details/mtproto_dc_key_binder.cpp b/Telegram/SourceFiles/mtproto/details/mtproto_dc_key_binder.cpp index 44ca8a4c7..736a6f4fa 100644 --- a/Telegram/SourceFiles/mtproto/details/mtproto_dc_key_binder.cpp +++ b/Telegram/SourceFiles/mtproto/details/mtproto_dc_key_binder.cpp @@ -77,46 +77,34 @@ DcKeyBinder::DcKeyBinder(AuthKeyPtr &&persistentKey) Expects(_persistentKey != nullptr); } -bool DcKeyBinder::requested() const { - return _requestMsgId != 0; -} - SecureRequest DcKeyBinder::prepareRequest( const AuthKeyPtr &temporaryKey, uint64 sessionId) { - Expects(_requestMsgId == 0); Expects(temporaryKey != nullptr); Expects(temporaryKey->expiresAt() != 0); const auto nonce = openssl::RandomValue<uint64>(); - _requestMsgId = base::unixtime::mtproto_msg_id(); + const auto msgId = base::unixtime::mtproto_msg_id(); auto result = SecureRequest::Serialize(MTPauth_BindTempAuthKey( MTP_long(_persistentKey->keyId()), MTP_long(nonce), MTP_int(temporaryKey->expiresAt()), MTP_bytes(EncryptBindAuthKeyInner( _persistentKey, - _requestMsgId, + msgId, MTP_bind_auth_key_inner( MTP_long(nonce), MTP_long(temporaryKey->keyId()), MTP_long(_persistentKey->keyId()), MTP_long(sessionId), MTP_int(temporaryKey->expiresAt())))))); - result.setMsgId(_requestMsgId); + result.setMsgId(msgId); return result; } -DcKeyBindState DcKeyBinder::handleResponse( - MTPlong requestMsgId, - const mtpBuffer &response) { +DcKeyBindState DcKeyBinder::handleResponse(const mtpBuffer &response) { Expects(!response.isEmpty()); - if (!_requestMsgId || requestMsgId.v != _requestMsgId) { - return DcKeyBindState::Unknown; - } - _requestMsgId = 0; - auto from = response.begin(); const auto end = from + response.size(); auto error = MTPRpcError(); diff --git a/Telegram/SourceFiles/mtproto/details/mtproto_dc_key_binder.h b/Telegram/SourceFiles/mtproto/details/mtproto_dc_key_binder.h index db311b3b4..ba18176e4 100644 --- a/Telegram/SourceFiles/mtproto/details/mtproto_dc_key_binder.h +++ b/Telegram/SourceFiles/mtproto/details/mtproto_dc_key_binder.h @@ -17,7 +17,6 @@ class Instance; namespace MTP::details { enum class DcKeyBindState { - Unknown, Success, Failed, DefinitelyDestroyed, @@ -27,18 +26,14 @@ class DcKeyBinder final { public: explicit DcKeyBinder(AuthKeyPtr &&persistentKey); - [[nodiscard]] bool requested() const; [[nodiscard]] SecureRequest prepareRequest( const AuthKeyPtr &temporaryKey, uint64 sessionId); - [[nodiscard]] DcKeyBindState handleResponse( - MTPlong requestMsgId, - const mtpBuffer &response); + [[nodiscard]] DcKeyBindState handleResponse(const mtpBuffer &response); [[nodiscard]] AuthKeyPtr persistentKey() const; private: AuthKeyPtr _persistentKey; - mtpMsgId _requestMsgId = 0; }; diff --git a/Telegram/SourceFiles/mtproto/session.cpp b/Telegram/SourceFiles/mtproto/session.cpp index 2dc955cb6..ba2ce7fd4 100644 --- a/Telegram/SourceFiles/mtproto/session.cpp +++ b/Telegram/SourceFiles/mtproto/session.cpp @@ -64,48 +64,6 @@ void SessionData::notifyConnectionInited(const ConnectionOptions &options) { } } -void SessionData::clearForNewKey(not_null<Instance*> instance) { - auto clearCallbacks = std::vector<RPCCallbackClear>(); - { - QReadLocker locker1(haveSentMutex()); - QReadLocker locker2(toResendMutex()); - QReadLocker locker3(haveReceivedMutex()); - QReadLocker locker4(wereAckedMutex()); - clearCallbacks.reserve(_haveSent.size() + _toResend.size() + _wereAcked.size()); - for (auto i = _haveSent.cbegin(), e = _haveSent.cend(); i != e; ++i) { - auto requestId = i.value()->requestId; - if (!_receivedResponses.contains(requestId)) { - clearCallbacks.push_back(requestId); - } - } - for (auto i = _toResend.cbegin(), e = _toResend.cend(); i != e; ++i) { - auto requestId = i.value(); - if (!_receivedResponses.contains(requestId)) { - clearCallbacks.push_back(requestId); - } - } - for (auto i = _wereAcked.cbegin(), e = _wereAcked.cend(); i != e; ++i) { - auto requestId = i.value(); - if (!_receivedResponses.contains(requestId)) { - clearCallbacks.push_back(requestId); - } - } - } - { - QWriteLocker locker(haveSentMutex()); - _haveSent.clear(); - } - { - QWriteLocker locker(toResendMutex()); - _toResend.clear(); - } - { - QWriteLocker locker(wereAckedMutex()); - _wereAcked.clear(); - } - instance->clearCallbacksDelayed(std::move(clearCallbacks)); -} - void SessionData::queueTryToReceive() { withSession([](not_null<Session*> session) { session->tryToReceive(); @@ -142,23 +100,6 @@ void SessionData::queueSendMsgsStateInfo(quint64 msgId, QByteArray data) { }); } -void SessionData::resend( - mtpMsgId msgId, - crl::time msCanWait, - bool forceContainer) { - QMutexLocker lock(&_ownerMutex); - if (_owner) { - _owner->resend(msgId, msCanWait, forceContainer); - } -} - -void SessionData::resendAll() { - QMutexLocker lock(&_ownerMutex); - if (_owner) { - _owner->resendAll(); - } -} - bool SessionData::connectionInited() const { QMutexLocker lock(&_ownerMutex); return _owner ? _owner->connectionInited() : false; @@ -455,72 +396,16 @@ QString Session::transport() const { return _connection ? _connection->transport() : QString(); } -void Session::resend( - mtpMsgId msgId, - crl::time msCanWait, - bool forceContainer) { - auto lock = QWriteLocker(_data->haveSentMutex()); - auto &haveSent = _data->haveSentMap(); - - auto i = haveSent.find(msgId); - if (i == haveSent.end()) { - return; - } - auto request = i.value(); - haveSent.erase(i); - lock.unlock(); - - // For container just resend all messages we can. - if (request.isSentContainer()) { - DEBUG_LOG(("Message Info: resending container from haveSent, msgId %1").arg(msgId)); - const mtpMsgId *ids = (const mtpMsgId *)(request->constData() + 8); - for (uint32 i = 0, l = (request->size() - 8) >> 1; i < l; ++i) { - resend(ids[i], 10, true); - } - } else if (!request.isStateRequest()) { - request->msDate = forceContainer ? 0 : crl::now(); - { - QWriteLocker locker(_data->toResendMutex()); - _data->toResendMap().insert(msgId, request->requestId); - } - sendPrepared(request, msCanWait, false); - } -} - -void Session::resendAll() { - QVector<mtpMsgId> toResend; - { - QReadLocker locker(_data->haveSentMutex()); - const auto &haveSent = _data->haveSentMap(); - toResend.reserve(haveSent.size()); - for (auto i = haveSent.cbegin(), e = haveSent.cend(); i != e; ++i) { - if (i.value()->requestId) { - toResend.push_back(i.key()); - } - } - } - for (uint32 i = 0, l = toResend.size(); i < l; ++i) { - resend(toResend[i], -1, true); - } - InvokeQueued(this, [=] { - sendAnything(); - }); -} - void Session::sendPrepared( const SecureRequest &request, - crl::time msCanWait, - bool newRequest) { + crl::time msCanWait) { DEBUG_LOG(("MTP Info: adding request to toSendMap, msCanWait %1" ).arg(msCanWait)); { QWriteLocker locker(_data->toSendMutex()); _data->toSendMap().insert(request->requestId, request); - - if (newRequest) { - *(mtpMsgId*)(request->data() + 4) = 0; - *(request->data() + 6) = 0; - } + *(mtpMsgId*)(request->data() + 4) = 0; + *(request->data() + 6) = 0; } DEBUG_LOG(("MTP Info: added, requestId %1").arg(request->requestId)); diff --git a/Telegram/SourceFiles/mtproto/session.h b/Telegram/SourceFiles/mtproto/session.h index d7dd23b3d..896dafe4b 100644 --- a/Telegram/SourceFiles/mtproto/session.h +++ b/Telegram/SourceFiles/mtproto/session.h @@ -29,22 +29,6 @@ enum class CreatingKeyType; using PreRequestMap = QMap<mtpRequestId, SecureRequest>; using RequestMap = QMap<mtpMsgId, SecureRequest>; - -class RequestIdsMap : public QMap<mtpMsgId, mtpRequestId> { -public: - using ParentType = QMap<mtpMsgId, mtpRequestId>; - - mtpMsgId min() const { - return size() ? cbegin().key() : 0; - } - - mtpMsgId max() const { - ParentType::const_iterator e(cend()); - return size() ? (--e).key() : 0; - } - -}; - using SerializedMessage = mtpBuffer; inline bool ResponseNeedsAck(const SerializedMessage &response) { @@ -102,12 +86,6 @@ public: not_null<QReadWriteLock*> haveSentMutex() const { return &_haveSentLock; } - not_null<QReadWriteLock*> toResendMutex() const { - return &_toResendLock; - } - not_null<QReadWriteLock*> wereAckedMutex() const { - return &_wereAckedLock; - } not_null<QReadWriteLock*> haveReceivedMutex() const { return &_haveReceivedLock; } @@ -124,18 +102,6 @@ public: const RequestMap &haveSentMap() const { return _haveSent; } - RequestIdsMap &toResendMap() { // msgId -> requestId, on which toSend: requestId -> request for resended requests - return _toResend; - } - const RequestIdsMap &toResendMap() const { - return _toResend; - } - RequestIdsMap &wereAckedMap() { - return _wereAcked; - } - const RequestIdsMap &wereAckedMap() const { - return _wereAcked; - } QMap<mtpRequestId, SerializedMessage> &haveReceivedResponses() { return _receivedResponses; } @@ -154,8 +120,6 @@ public: return _owner; } - void clearForNewKey(not_null<Instance*> instance); - // Connection -> Session interface. void queueTryToReceive(); void queueNeedToResumeAndSend(); @@ -173,11 +137,6 @@ public: const AuthKeyPtr &persistentKeyUsedForBind); void releaseKeyCreationOnFail(); void destroyTemporaryKey(uint64 keyId); - void resend( - mtpMsgId msgId, - crl::time msCanWait, - bool forceContainer); - void resendAll(); void detach(); @@ -192,8 +151,6 @@ private: PreRequestMap _toSend; // map of request_id -> request, that is waiting to be sent RequestMap _haveSent; // map of msg_id -> request, that was sent, msDate = 0 for msgs_state_req (no resend / state req), msDate = 0, seqNo = 0 for containers - RequestIdsMap _toResend; // map of msg_id -> request_id, that request_id -> request lies in toSend and is waiting to be resent - RequestIdsMap _wereAcked; // map of msg_id -> request_id, this msg_ids already were acked or do not need ack QMap<mtpRequestId, SerializedMessage> _receivedResponses; // map of request_id -> response that should be processed in the main thread QList<SerializedMessage> _receivedUpdates; // list of updates that should be processed in the main thread @@ -202,8 +159,6 @@ private: mutable QReadWriteLock _optionsLock; mutable QReadWriteLock _toSendLock; mutable QReadWriteLock _haveSentLock; - mutable QReadWriteLock _toResendLock; - mutable QReadWriteLock _wereAckedLock; mutable QReadWriteLock _haveReceivedLock; }; @@ -234,18 +189,7 @@ public: [[nodiscard]] AuthKeyPtr getPersistentKey() const; [[nodiscard]] AuthKeyPtr getTemporaryKey(TemporaryKeyType type) const; [[nodiscard]] bool connectionInited() const; - void resend( - mtpMsgId msgId, - crl::time msCanWait = 0, - bool forceContainer = false); - void resendAll(); - - // Thread-safe. - // Nulls msgId and seqNo in request, if newRequest = true. - void sendPrepared( - const SecureRequest &request, - crl::time msCanWait = 0, - bool newRequest = true); + void sendPrepared(const SecureRequest &request, crl::time msCanWait = 0); // Connection thread. [[nodiscard]] CreatingKeyType acquireKeyCreation(TemporaryKeyType type);