diff --git a/Telegram/SourceFiles/mtproto/connection.cpp b/Telegram/SourceFiles/mtproto/connection.cpp index 6cef04aed..e21f8df16 100644 --- a/Telegram/SourceFiles/mtproto/connection.cpp +++ b/Telegram/SourceFiles/mtproto/connection.cpp @@ -291,11 +291,15 @@ QString ConnectionPrivate::transport() const { bool ConnectionPrivate::setState(int32 state, int32 ifState) { if (ifState != Connection::UpdateAlways) { QReadLocker lock(&stateConnMutex); - if (_state != ifState) return false; + if (_state != ifState) { + return false; + } } QWriteLocker lock(&stateConnMutex); - if (_state == state) return false; + if (_state == state) { + return false; + } _state = state; if (state < 0) { _retryTimeout = -state; @@ -308,118 +312,14 @@ bool ConnectionPrivate::setState(int32 state, int32 ifState) { return true; } -void ConnectionPrivate::resetSession() { // recreate all msg_id and msg_seqno +void ConnectionPrivate::resetSession() { + MTP_LOG(_shiftedDcId, ("Resetting session!")); _needSessionReset = false; - MTP_LOG(_shiftedDcId, ("Resetting session!")); - - QWriteLocker locker1(_sessionData->haveSentMutex()); - QWriteLocker locker2(_sessionData->toResendMutex()); - QWriteLocker locker3(_sessionData->toSendMutex()); - QWriteLocker locker4(_sessionData->wereAckedMutex()); - auto &haveSent = _sessionData->haveSentMap(); - auto &toResend = _sessionData->toResendMap(); - auto &toSend = _sessionData->toSendMap(); - auto &wereAcked = _sessionData->wereAckedMap(); - - auto newId = base::unixtime::mtproto_msg_id(); - auto setSeqNumbers = RequestMap(); - auto replaces = QMap(); - for (auto i = haveSent.cbegin(), e = haveSent.cend(); i != e; ++i) { - if (!i.value().isSentContainer()) { - if (!*(mtpMsgId*)(i.value()->constData() + 4)) continue; - - mtpMsgId id = i.key(); - if (id > newId) { - while (toResend.constFind(newId) != toResend.cend() - || wereAcked.constFind(newId) != wereAcked.cend() - || haveSent.constFind(newId) != haveSent.cend()) { - newId = base::unixtime::mtproto_msg_id(); - } - - MTP_LOG(_shiftedDcId, ("Replacing msgId %1 to %2!" - ).arg(id - ).arg(newId)); - replaces.insert(id, newId); - id = newId; - *(mtpMsgId*)(i.value()->data() + 4) = id; - } - setSeqNumbers.insert(id, i.value()); - } - } - // Collect all non-container requests. - for (auto i = toResend.cbegin(), e = toResend.cend(); i != e; ++i) { - const auto j = toSend.constFind(i.value()); - if (j == toSend.cend()) continue; - - if (!j.value().isSentContainer()) { - if (!*(mtpMsgId*)(j.value()->constData() + 4)) continue; - - mtpMsgId id = i.key(); - if (id > newId) { - while (toResend.constFind(newId) != toResend.cend() - || wereAcked.constFind(newId) != wereAcked.cend() - || haveSent.constFind(newId) != haveSent.cend()) { - newId = base::unixtime::mtproto_msg_id(); - } - - MTP_LOG(_shiftedDcId, ("Replacing msgId %1 to %2!" - ).arg(id - ).arg(newId)); - replaces.insert(id, newId); - id = newId; - *(mtpMsgId*)(j.value()->data() + 4) = id; - } - setSeqNumbers.insert(id, j.value()); - } - } - - const auto sessionId = rand_value(); - DEBUG_LOG(("MTP Info: creating new session after bad_msg_notification, setting random server_session %1").arg(sessionId)); - _sessionData->setSessionId(sessionId); - - for (auto i = setSeqNumbers.cbegin(), e = setSeqNumbers.cend(); i != e; ++i) { // generate new seq_numbers - bool wasNeedAck = (*(i.value()->data() + 6) & 1); - *(i.value()->data() + 6) = _sessionData->nextRequestSeqNumber(wasNeedAck); - } - if (!replaces.isEmpty()) { - for (auto i = replaces.cbegin(), e = replaces.cend(); i != e; ++i) { // replace msgIds keys in all data structs - const auto j = haveSent.find(i.key()); - if (j != haveSent.cend()) { - const auto req = j.value(); - haveSent.erase(j); - haveSent.insert(i.value(), req); - } - const auto k = toResend.find(i.key()); - if (k != toResend.cend()) { - const auto req = k.value(); - toResend.erase(k); - toResend.insert(i.value(), req); - } - const auto l = wereAcked.find(i.key()); - if (l != wereAcked.cend()) { - DEBUG_LOG(("MTP Info: Replaced %1 with %2 in wereAcked." - ).arg(i.key() - ).arg(i.value())); - - const auto req = l.value(); - wereAcked.erase(l); - wereAcked.insert(i.value(), req); - } - } - for (auto i = haveSent.cbegin(), e = haveSent.cend(); i != e; ++i) { // replace msgIds in saved containers - if (i.value().isSentContainer()) { - mtpMsgId *ids = (mtpMsgId*)(i.value()->data() + 8); - for (uint32 j = 0, l = (i.value()->size() - 8) >> 1; j < l; ++j) { - const auto k = replaces.constFind(ids[j]); - if (k != replaces.cend()) { - ids[j] = k.value(); - } - } - } - } - } + DEBUG_LOG(("MTP Info: creating new session in resetSession.")); + _sessionData->changeSessionId(); + // #TODO move to sessionData, clear on changeSessionIdLocked. _ackRequestData.clear(); _resendRequestData.clear(); { @@ -432,93 +332,99 @@ void ConnectionPrivate::resetSession() { // recreate all msg_id and msg_seqno mtpMsgId ConnectionPrivate::prepareToSend( SecureRequest &request, - mtpMsgId currentLastId) { - if (request->size() < 9) { - return 0; - } + mtpMsgId currentLastId, + bool forceNewMsgId) { + Expects(request->size() > 8); + if (const auto msgId = request.getMsgId()) { // resending this request - QWriteLocker locker(_sessionData->toResendMutex()); + QWriteLocker lock(_sessionData->toResendMutex()); auto &toResend = _sessionData->toResendMap(); const auto i = toResend.find(msgId); if (i != toResend.cend()) { toResend.erase(i); } - return msgId; + lock.unlock(); + + return (forceNewMsgId || msgId > currentLastId) + ? replaceMsgId(request, currentLastId) + : msgId; } request.setMsgId(currentLastId); request.setSeqNo(_sessionData->nextRequestSeqNumber(request.needAck())); + if (request->requestId) { + MTP_LOG(_shiftedDcId, ("[r%1] msg_id 0 -> %2").arg(request->requestId).arg(currentLastId)); + } return currentLastId; } mtpMsgId ConnectionPrivate::replaceMsgId(SecureRequest &request, mtpMsgId newId) { - if (request->size() < 9) return 0; + Expects(request->size() > 8); const auto oldMsgId = request.getMsgId(); - if (oldMsgId != newId) { - if (oldMsgId) { - QWriteLocker locker(_sessionData->toResendMutex()); - // haveSentMutex() and wereAckedMutex() were locked in tryToSend() - - auto &toResend = _sessionData->toResendMap(); - auto &wereAcked = _sessionData->wereAckedMap(); - auto &haveSent = _sessionData->haveSentMap(); - - while (true) { - if (toResend.constFind(newId) == toResend.cend() && wereAcked.constFind(newId) == wereAcked.cend() && haveSent.constFind(newId) == haveSent.cend()) { - break; - } - const auto m = base::unixtime::mtproto_msg_id(); - if (m <= newId) break; // wtf - - newId = m; - } - - const auto i = toResend.find(oldMsgId); - if (i != toResend.cend()) { - const auto req = i.value(); - toResend.erase(i); - toResend.insert(newId, req); - } - - const auto j = wereAcked.find(oldMsgId); - if (j != wereAcked.cend()) { - const auto req = j.value(); - wereAcked.erase(j); - wereAcked.insert(newId, req); - } - - const auto k = haveSent.find(oldMsgId); - if (k != haveSent.cend()) { - const auto req = k.value(); - haveSent.erase(k); - haveSent.insert(newId, req); - } - - for (auto l = haveSent.begin(); l != haveSent.cend(); ++l) { - const auto req = l.value(); - if (req.isSentContainer()) { - const auto ids = (mtpMsgId *)(req->data() + 8); - for (uint32 i = 0, l = (req->size() - 8) >> 1; i < l; ++i) { - if (ids[i] == oldMsgId) { - ids[i] = newId; - } - } - } - } - } else { - request.setSeqNo(_sessionData->nextRequestSeqNumber(request.needAck())); - } - request.setMsgId(newId); + if (oldMsgId == newId) { + return newId; } + QWriteLocker locker(_sessionData->toResendMutex()); + // haveSentMutex() and wereAckedMutex() were locked in tryToSend() + + auto &toResend = _sessionData->toResendMap(); + auto &wereAcked = _sessionData->wereAckedMap(); + auto &haveSent = _sessionData->haveSentMap(); + + while (toResend.constFind(newId) != toResend.cend() + || wereAcked.constFind(newId) != wereAcked.cend() + || haveSent.constFind(newId) != haveSent.cend()) { + newId = base::unixtime::mtproto_msg_id(); + } + + MTP_LOG(_shiftedDcId, ("[r%1] msg_id %2 -> %3").arg(request->requestId).arg(oldMsgId).arg(newId)); + + const auto i = toResend.find(oldMsgId); + if (i != toResend.cend()) { + const auto req = i.value(); + toResend.erase(i); + toResend.insert(newId, req); + } + + const auto j = wereAcked.find(oldMsgId); + if (j != wereAcked.cend()) { + const auto req = j.value(); + wereAcked.erase(j); + wereAcked.insert(newId, req); + } + + const auto k = haveSent.find(oldMsgId); + if (k != haveSent.cend()) { + const auto req = k.value(); + haveSent.erase(k); + haveSent.insert(newId, req); + } + + for (auto l = haveSent.begin(); l != haveSent.cend(); ++l) { + const auto req = l.value(); + if (req.isSentContainer()) { + const auto ids = (mtpMsgId *)(req->data() + 8); + for (uint32 i = 0, l = (req->size() - 8) >> 1; i < l; ++i) { + if (ids[i] == oldMsgId) { + ids[i] = newId; + } + } + } + } + + request.setMsgId(newId); + request.setSeqNo(_sessionData->nextRequestSeqNumber(request.needAck())); return newId; } -mtpMsgId ConnectionPrivate::placeToContainer(SecureRequest &toSendRequest, mtpMsgId &bigMsgId, mtpMsgId *&haveSentArr, SecureRequest &req) { - auto msgId = prepareToSend(req, bigMsgId); - if (msgId > bigMsgId) { - msgId = replaceMsgId(req, bigMsgId); - } +mtpMsgId ConnectionPrivate::placeToContainer( + SecureRequest &toSendRequest, + mtpMsgId &bigMsgId, + bool forceNewMsgId, + mtpMsgId *&haveSentArr, + SecureRequest &req) { + const auto msgId = prepareToSend(req, bigMsgId, forceNewMsgId); if (msgId >= bigMsgId) { bigMsgId = base::unixtime::mtproto_msg_id(); } @@ -551,7 +457,12 @@ void ConnectionPrivate::tryToSend() { && _pingSendAt <= crl::now()) { _pingIdToSend = openssl::RandomValue(); } + const auto forceNewMsgId = sendAll + && _sessionData->markSessionAsStarted(); + if (forceNewMsgId) { + int a = 0; + } auto pingRequest = SecureRequest(); auto ackRequest = SecureRequest(); auto resendRequest = SecureRequest(); @@ -744,7 +655,8 @@ void ConnectionPrivate::tryToSend() { const auto msgId = prepareToSend( toSendRequest, - base::unixtime::mtproto_msg_id()); + base::unixtime::mtproto_msg_id(), + forceNewMsgId); if (pingRequest) { _pingMsgId = msgId; needAnyResponse = true; @@ -839,17 +751,22 @@ void ConnectionPrivate::tryToSend() { auto haveSentArr = (mtpMsgId*)(haveSentIdsWrap->data() + 8); if (pingRequest) { - _pingMsgId = placeToContainer(toSendRequest, bigMsgId, haveSentArr, pingRequest); + _pingMsgId = placeToContainer( + toSendRequest, + bigMsgId, + forceNewMsgId, + haveSentArr, + pingRequest); needAnyResponse = true; } else if (resendRequest || stateRequest || bindDcKeyRequest) { needAnyResponse = true; } for (auto i = toSend.begin(), e = toSend.end(); i != e; ++i) { auto &req = i.value(); - auto msgId = prepareToSend(req, bigMsgId); - if (msgId > bigMsgId) { - msgId = replaceMsgId(req, bigMsgId); - } + const auto msgId = prepareToSend( + req, + bigMsgId, + forceNewMsgId); if (msgId >= bigMsgId) { bigMsgId = base::unixtime::mtproto_msg_id(); } @@ -889,21 +806,24 @@ void ConnectionPrivate::tryToSend() { } } if (stateRequest) { - mtpMsgId msgId = placeToContainer(toSendRequest, bigMsgId, haveSentArr, stateRequest); + mtpMsgId msgId = placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, stateRequest); stateRequest->msDate = 0; // 0 for state request, do not request state of it Assert(!haveSent.contains(msgId)); haveSent.insert(msgId, stateRequest); } - if (resendRequest) placeToContainer(toSendRequest, bigMsgId, haveSentArr, resendRequest); - if (ackRequest) placeToContainer(toSendRequest, bigMsgId, haveSentArr, ackRequest); - if (httpWaitRequest) placeToContainer(toSendRequest, bigMsgId, haveSentArr, httpWaitRequest); - if (bindDcKeyRequest) placeToContainer(toSendRequest, bigMsgId, haveSentArr, bindDcKeyRequest); + if (resendRequest) placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, resendRequest); + if (ackRequest) placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, ackRequest); + if (httpWaitRequest) placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, httpWaitRequest); + if (bindDcKeyRequest) placeToContainer(toSendRequest, bigMsgId, forceNewMsgId, haveSentArr, bindDcKeyRequest); - mtpMsgId contMsgId = prepareToSend(toSendRequest, bigMsgId); - *(mtpMsgId*)(haveSentIdsWrap->data() + 4) = contMsgId; + const auto containerMsgId = prepareToSend( + toSendRequest, + bigMsgId, + forceNewMsgId); + *(mtpMsgId*)(haveSentIdsWrap->data() + 4) = containerMsgId; (*haveSentIdsWrap)[6] = 0; // for container, msDate = 0, seqNo = 0 - Assert(!haveSent.contains(contMsgId)); - haveSent.insert(contMsgId, haveSentIdsWrap); + Assert(!haveSent.contains(containerMsgId)); + haveSent.insert(containerMsgId, haveSentIdsWrap); toSend.clear(); } } @@ -1050,7 +970,6 @@ void ConnectionPrivate::restart() { if (_needSessionReset) { resetSession(); } - _restarted = true; if (_retryTimer.isActive()) { return; } @@ -1144,7 +1063,6 @@ void ConnectionPrivate::waitReceivedFailed() { _waitForReceived *= 2; } doDisconnect(); - _restarted = true; if (_retryTimer.isActive()) { return; } @@ -1164,7 +1082,6 @@ void ConnectionPrivate::waitConnectedFailed() { } connectingTimedOut(); - _restarted = true; DEBUG_LOG(("MTP Info: immediate restart!")); InvokeQueued(this, [=] { connectToServer(); }); @@ -1183,9 +1100,7 @@ void ConnectionPrivate::connectingTimedOut() { void ConnectionPrivate::doDisconnect() { destroyAllConnections(); - setState(DisconnectedState); - _restarted = false; } void ConnectionPrivate::finishAndDestroy() { @@ -1212,6 +1127,8 @@ void ConnectionPrivate::requestCDNConfig() { } void ConnectionPrivate::handleReceived() { + Expects(_temporaryKey != nullptr); + onReceivedSome(); while (!_connection->received().empty()) { @@ -1345,9 +1262,8 @@ void ConnectionPrivate::handleReceived() { DEBUG_LOG(("MTP Info: other salt received... received: %1, my salt: %2, updating...").arg(serverSalt).arg(mySalt)); _sessionData->setSalt(serverSalt); - if (setState(ConnectedState, ConnectingState) && _restarted) { - _sessionData->queueResendAll(); - _restarted = false; + if (setState(ConnectedState, ConnectingState)) { + _sessionData->resendAll(); } } else { DEBUG_LOG(("MTP Info: other salt received... received: %1, my salt: %2").arg(serverSalt).arg(mySalt)); @@ -1393,9 +1309,10 @@ void ConnectionPrivate::handleReceived() { } if (res != HandleResult::Success && res != HandleResult::Ignored) { - _needSessionReset = (res == HandleResult::ResetSession); if (res == HandleResult::DestroyTemporaryKey) { destroyTemporaryKey(); + } else if (res == HandleResult::ResetSession) { + _needSessionReset = true; } return restart(); } @@ -1633,11 +1550,8 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr _sessionData->setSalt(serverSalt); base::unixtime::update(serverTime); - if (setState(ConnectedState, ConnectingState)) { // maybe only connected - if (_restarted) { - _sessionData->queueResendAll(); - _restarted = false; - } + if (setState(ConnectedState, ConnectingState)) { + _sessionData->resendAll(); } badTime = false; @@ -1863,7 +1777,6 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr return HandleResult::Ignored; } } - requestsAcked(ids, true); if (typeId == mtpc_gzip_packed) { DEBUG_LOG(("RPC Info: gzip container")); @@ -1880,12 +1793,13 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr if (DcKeyBinder::IsDestroyedTemporaryKeyError(response)) { return HandleResult::DestroyTemporaryKey; } - } else { // An error could be some RPC_CALL_FAIL or other error inside // the initConnection, so we're not sure yet that it was inited. // Wait till a good response is received. + } else { _sessionData->notifyConnectionInited(*_connectionOptions); } + requestsAcked(ids, true); if (_keyBinder) { const auto result = _keyBinder->handleResponse( @@ -1945,11 +1859,16 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr const auto &haveSent = _sessionData->haveSentMap(); toResend.reserve(haveSent.size()); for (auto i = haveSent.cbegin(), e = haveSent.cend(); i != e; ++i) { - if (i.key() >= firstMsgId) break; - if (i.value()->requestId) toResend.push_back(i.key()); + if (i.key() >= firstMsgId) { + break; + } else if (i.value()->requestId) { + toResend.push_back(i.key()); + } } } - resendMany(toResend, 10, true); + for (const auto msgId : toResend) { + _sessionData->resend(msgId, 10, true); + } mtpBuffer update(from - start); if (from > start) memcpy(update.data(), start, (from - start) * sizeof(mtpPrime)); @@ -2237,26 +2156,18 @@ void ConnectionPrivate::resend( mtpMsgId msgId, crl::time msCanWait, bool forceContainer) { - if (msgId == _pingMsgId) { - return; + if (msgId != _pingMsgId) { + _sessionData->resend(msgId, msCanWait, forceContainer); } - _sessionData->queueResend(msgId, msCanWait, forceContainer); } void ConnectionPrivate::resendMany( QVector msgIds, crl::time msCanWait, bool forceContainer) { - for (int32 i = 0, l = msgIds.size(); i < l; ++i) { - if (msgIds.at(i) == _pingMsgId) { - msgIds.remove(i); - --l; - } + for (const auto msgId : msgIds) { + resend(msgId, msCanWait, forceContainer); } - _sessionData->queueResendMany( - std::move(msgIds), - msCanWait, - forceContainer); } void ConnectionPrivate::onConnected( @@ -2340,10 +2251,13 @@ void ConnectionPrivate::removeTestConnection( } void ConnectionPrivate::checkAuthKey() { - if (!_keyId) { - updateAuthKey(); - } else { + Expects(_keyCreator == nullptr); + Expects(_keyBinder == nullptr || _keyId != 0); + + if (_keyId) { authKeyChecked(); + } else { + applyAuthKey(_sessionData->getTemporaryKey()); } } @@ -2359,23 +2273,40 @@ void ConnectionPrivate::updateAuthKey() { void ConnectionPrivate::applyAuthKey(AuthKeyPtr &&temporaryKey) { _temporaryKey = std::move(temporaryKey); const auto newKeyId = _temporaryKey ? _temporaryKey->keyId() : 0; - if (newKeyId) { + if (_keyId) { if (_keyId == newKeyId) { return; } - _sessionData->setCurrentKeyId(newKeyId); + _keyId = 0; + if (_sessionData->setCurrentKeyId(_keyId)) { + _ackRequestData.clear(); // #TODO move to sessionData. + _resendRequestData.clear(); + { + QWriteLocker locker5(_sessionData->stateRequestMutex()); + _sessionData->stateRequestMap().clear(); + } + } + DEBUG_LOG(("MTP Error: auth_key id for dc %1 changed, restarting..." + ).arg(_shiftedDcId)); + if (_connection) { + restart(); + } + return; } - _keyId = newKeyId; if (!_connection) { return; } - if (const auto already = _connection->sentEncryptedWithKeyId()) { - Assert(already != newKeyId); - DEBUG_LOG(("MTP Error: auth_key id for dc %1 changed").arg(_shiftedDcId)); - - restart(); - return; + if (newKeyId && _sessionData->setCurrentKeyId(newKeyId)) { + _ackRequestData.clear(); // #TODO move to sessionData. + _resendRequestData.clear(); + { + QWriteLocker locker5(_sessionData->stateRequestMutex()); + _sessionData->stateRequestMap().clear(); + } } + _keyId = newKeyId; + Assert(!_connection->sentEncryptedWithKeyId()); + DEBUG_LOG(("AuthKey Info: Connection update key from Session, dc %1 result: %2").arg(_shiftedDcId).arg(Logs::mb(&_keyId, sizeof(_keyId)).str())); if (_keyId) { return authKeyChecked(); @@ -2470,12 +2401,8 @@ void ConnectionPrivate::authKeyChecked() { handleReceived(); }); - if (_sessionData->getSalt()) { - setState(ConnectedState); - if (_restarted) { - _sessionData->queueResendAll(); - _restarted = false; - } + if (_sessionData->getSalt() && setState(ConnectedState)) { + _sessionData->resendAll(); } // else receive salt in bad_server_salt first, then try to send all the requests _pingIdToSend = rand_value(); // get server_salt @@ -2525,7 +2452,6 @@ void ConnectionPrivate::destroyTemporaryKey() { if (_temporaryKey) { _sessionData->destroyTemporaryKey(_temporaryKey->keyId()); } - _needSessionReset = true; applyAuthKey(nullptr); } diff --git a/Telegram/SourceFiles/mtproto/connection.h b/Telegram/SourceFiles/mtproto/connection.h index 1aa3ae376..d786f80ea 100644 --- a/Telegram/SourceFiles/mtproto/connection.h +++ b/Telegram/SourceFiles/mtproto/connection.h @@ -139,9 +139,13 @@ private: mtpMsgId placeToContainer( SecureRequest &toSendRequest, mtpMsgId &bigMsgId, + bool forceNewMsgId, mtpMsgId *&haveSentArr, SecureRequest &req); - mtpMsgId prepareToSend(SecureRequest &request, mtpMsgId currentLastId); + mtpMsgId prepareToSend( + SecureRequest &request, + mtpMsgId currentLastId, + bool forceNewMsgId); mtpMsgId replaceMsgId(SecureRequest &request, mtpMsgId newId); bool sendSecureRequest(SecureRequest &&request, bool needAnyResponse); @@ -221,7 +225,6 @@ private: mtpMsgId _pingMsgId = 0; base::Timer _pingSender; - bool _restarted = false; bool _finished = false; AuthKeyPtr _temporaryKey; diff --git a/Telegram/SourceFiles/mtproto/mtp_instance.cpp b/Telegram/SourceFiles/mtproto/mtp_instance.cpp index 0f73de8da..f4c0a22e9 100644 --- a/Telegram/SourceFiles/mtproto/mtp_instance.cpp +++ b/Telegram/SourceFiles/mtproto/mtp_instance.cpp @@ -1156,7 +1156,9 @@ void Instance::Private::onSessionReset(ShiftedDcId dcWithShift) { bool Instance::Private::rpcErrorOccured(mtpRequestId requestId, const RPCFailHandlerPtr &onFail, const RPCError &err) { // return true if need to clean request data if (isDefaultHandledError(err)) { - if (onFail && (*onFail)(requestId, err)) return true; + if (onFail && (*onFail)(requestId, err)) { + return true; + } } if (onErrorDefault(requestId, err)) { diff --git a/Telegram/SourceFiles/mtproto/session.cpp b/Telegram/SourceFiles/mtproto/session.cpp index 38f3586c8..0504cb045 100644 --- a/Telegram/SourceFiles/mtproto/session.cpp +++ b/Telegram/SourceFiles/mtproto/session.cpp @@ -26,10 +26,6 @@ constexpr auto kCheckResendTimeout = crl::time(10000); // when resending request or checking its state. constexpr auto kCheckResendWaiting = crl::time(1000); -// How much ints should message contain for us not to resend, -// but instead to check its state. -constexpr auto kResendThreshold = 1; - // Container lives 10 minutes in haveSent map. constexpr auto kContainerLives = 600; @@ -76,15 +72,51 @@ void SessionData::withSession(Callback &&callback) { } } -void SessionData::setCurrentKeyId(uint64 keyId) { +bool SessionData::setCurrentKeyId(uint64 keyId) { QWriteLocker locker(&_lock); if (_keyId == keyId) { - return; + return false; } _keyId = keyId; - _sessionId = openssl::RandomValue(); + + DEBUG_LOG(("MTP Info: auth key set in SessionData, id %1").arg(keyId)); + + changeSessionIdLocked(); + return true; +} + +void SessionData::changeSessionId() { + QWriteLocker locker(&_lock); + changeSessionIdLocked(); +} + +void SessionData::changeSessionIdLocked() { + auto sessionId = _sessionId; + do { + sessionId = openssl::RandomValue(); + } while (_sessionId == sessionId); + + DEBUG_LOG(("MTP Info: setting server_session: %1").arg(sessionId)); + + _sessionId = sessionId; _messagesSent = 0; - DEBUG_LOG(("MTP Info: new auth key set in SessionData, id %1, setting random server_session %2").arg(_keyId).arg(_sessionId)); + _sessionMarkedAsStarted = false; +} + +uint32 SessionData::nextRequestSeqNumber(bool needAck) { + QWriteLocker locker(&_lock); + auto result = _messagesSent; + _messagesSent += (needAck ? 1 : 0); + return result * 2 + (needAck ? 1 : 0); +} + +bool SessionData::markSessionAsStarted() { + QWriteLocker locker(&_lock); + if (_sessionMarkedAsStarted) { + return false; + } + _sessionMarkedAsStarted = true; + return true; } void SessionData::setKeyForCheck(const AuthKeyPtr &key) { @@ -166,12 +198,6 @@ void SessionData::queueConnectionStateChange(int newState) { }); } -void SessionData::queueResendAll() { - withSession([](not_null session) { - session->resendAll(); - }); -} - void SessionData::queueResetDone() { withSession([](not_null session) { session->resetDone(); @@ -190,24 +216,21 @@ void SessionData::queueSendMsgsStateInfo(quint64 msgId, QByteArray data) { }); } -void SessionData::queueResend( +void SessionData::resend( mtpMsgId msgId, crl::time msCanWait, bool forceContainer) { - withSession([=](not_null session) { - session->resend(msgId, msCanWait, forceContainer); - }); + QMutexLocker lock(&_ownerMutex); + if (_owner) { + _owner->resend(msgId, msCanWait, forceContainer); + } } -void SessionData::queueResendMany( - QVector msgIds, - crl::time msCanWait, - bool forceContainer) { - withSession([=](not_null session) { - for (const auto msgId : msgIds) { - session->resend(msgId, msCanWait, forceContainer); - } - }); +void SessionData::resendAll() { + QMutexLocker lock(&_ownerMutex); + if (_owner) { + _owner->resendAll(); + } } bool SessionData::connectionInited() const { @@ -437,7 +460,6 @@ bool Session::sharedDc() const { } void Session::checkRequestsByTimer() { - QVector resendingIds; QVector removingIds; // remove very old (10 minutes) containers and resend requests QVector stateRequestIds; @@ -450,14 +472,9 @@ void Session::checkRequestsByTimer() { auto &req = i.value(); if (req->msDate > 0) { if (req->msDate + kCheckResendTimeout < ms) { // need to resend or check state - if (req.messageSize() < kResendThreshold) { // resend - resendingIds.reserve(haveSentCount); - resendingIds.push_back(i.key()); - } else { - req->msDate = ms; - stateRequestIds.reserve(haveSentCount); - stateRequestIds.push_back(i.key()); - } + req->msDate = ms; + stateRequestIds.reserve(haveSentCount); + stateRequestIds.push_back(i.key()); } } else if (base::unixtime::now() > int32(i.key() >> 32) + kContainerLives) { @@ -477,12 +494,6 @@ void Session::checkRequestsByTimer() { } sendAnything(kCheckResendWaiting); } - if (!resendingIds.isEmpty()) { - for (uint32 i = 0, l = resendingIds.size(); i < l; ++i) { - DEBUG_LOG(("MTP Info: resending request %1").arg(resendingIds[i])); - resend(resendingIds[i], kCheckResendWaiting); - } - } if (!removingIds.isEmpty()) { auto clearCallbacks = std::vector(); { @@ -586,40 +597,35 @@ QString Session::transport() const { return _connection ? _connection->transport() : QString(); } -mtpRequestId Session::resend( +void Session::resend( mtpMsgId msgId, crl::time msCanWait, bool forceContainer) { - SecureRequest request; - { - QWriteLocker locker(_data->haveSentMutex()); - auto &haveSent = _data->haveSentMap(); + auto lock = QWriteLocker(_data->haveSentMutex()); + auto &haveSent = _data->haveSentMap(); - auto i = haveSent.find(msgId); - if (i == haveSent.end()) { - return 0; - } - - request = i.value(); - haveSent.erase(i); + auto i = haveSent.find(msgId); + if (i == haveSent.end()) { + return; } - if (request.isSentContainer()) { // for container just resend all messages we can + auto request = i.value(); + haveSent.erase(i); + lock.unlock(); + + // For container just resend all messages we can. + if (request.isSentContainer()) { DEBUG_LOG(("Message Info: resending container from haveSent, msgId %1").arg(msgId)); const mtpMsgId *ids = (const mtpMsgId *)(request->constData() + 8); for (uint32 i = 0, l = (request->size() - 8) >> 1; i < l; ++i) { resend(ids[i], 10, true); } - return 0xFFFFFFFF; } else if (!request.isStateRequest()) { request->msDate = forceContainer ? 0 : crl::now(); - sendPrepared(request, msCanWait, false); { QWriteLocker locker(_data->toResendMutex()); _data->toResendMap().insert(msgId, request->requestId); } - return request->requestId; - } else { - return 0; + sendPrepared(request, msCanWait, false); } } @@ -636,8 +642,11 @@ void Session::resendAll() { } } for (uint32 i = 0, l = toResend.size(); i < l; ++i) { - resend(toResend[i], 10, true); + resend(toResend[i], -1, true); } + InvokeQueued(this, [=] { + sendAnything(); + }); } void Session::sendPrepared( @@ -657,8 +666,11 @@ void Session::sendPrepared( } DEBUG_LOG(("MTP Info: added, requestId %1").arg(request->requestId)); - - sendAnything(msCanWait); + if (msCanWait >= 0) { + InvokeQueued(this, [=] { + sendAnything(msCanWait); + }); + } } bool Session::acquireKeyCreation() { diff --git a/Telegram/SourceFiles/mtproto/session.h b/Telegram/SourceFiles/mtproto/session.h index 7432bec33..b3034dcc8 100644 --- a/Telegram/SourceFiles/mtproto/session.h +++ b/Telegram/SourceFiles/mtproto/session.h @@ -140,16 +140,8 @@ public: SessionData(not_null creator) : _owner(creator) { } - void setCurrentKeyId(uint64 keyId); - void setSessionId(uint64 sessionId) { - DEBUG_LOG(("MTP Info: setting server_session: %1").arg(sessionId)); - - QWriteLocker locker(&_lock); - if (_sessionId != sessionId) { - _sessionId = sessionId; - _messagesSent = 0; - } - } + bool setCurrentKeyId(uint64 keyId); + void changeSessionId(); [[nodiscard]] uint64 getSessionId() const { QReadLocker locker(&_lock); return _sessionId; @@ -254,12 +246,8 @@ public: return _owner; } - uint32 nextRequestSeqNumber(bool needAck = true) { - QWriteLocker locker(&_lock); - auto result = _messagesSent; - _messagesSent += (needAck ? 1 : 0); - return result * 2 + (needAck ? 1 : 0); - } + [[nodiscard]] bool markSessionAsStarted(); + [[nodiscard]] uint32 nextRequestSeqNumber(bool needAck); void clearForNewKey(not_null instance); @@ -267,18 +255,9 @@ public: void queueTryToReceive(); void queueNeedToResumeAndSend(); void queueConnectionStateChange(int newState); - void queueResendAll(); void queueResetDone(); void queueSendAnything(crl::time msCanWait = 0); void queueSendMsgsStateInfo(quint64 msgId, QByteArray data); - void queueResend( - mtpMsgId msgId, - crl::time msCanWait, - bool forceContainer); - void queueResendMany( - QVector msgIds, - crl::time msCanWait, - bool forceContainer); [[nodiscard]] bool connectionInited() const; [[nodiscard]] AuthKeyPtr getPersistentKey() const; @@ -289,10 +268,17 @@ public: const AuthKeyPtr &persistentKey); void releaseKeyCreationOnFail(); void destroyTemporaryKey(uint64 keyId); + void resend( + mtpMsgId msgId, + crl::time msCanWait, + bool forceContainer); + void resendAll(); void detach(); private: + void changeSessionIdLocked(); + template void withSession(Callback &&callback); @@ -300,6 +286,7 @@ private: uint64 _sessionId = 0; uint64 _salt = 0; uint32 _messagesSent = 0; + bool _sessionMarkedAsStarted = false; Session *_owner = nullptr; mutable QMutex _ownerMutex; @@ -355,6 +342,18 @@ public: [[nodiscard]] AuthKeyPtr getPersistentKey() const; [[nodiscard]] AuthKeyPtr getTemporaryKey() const; [[nodiscard]] bool connectionInited() const; + void resend( + mtpMsgId msgId, + crl::time msCanWait = 0, + bool forceContainer = false); + void resendAll(); + + // Thread-safe. + // Nulls msgId and seqNo in request, if newRequest = true. + void sendPrepared( + const SecureRequest &request, + crl::time msCanWait = 0, + bool newRequest = true); // Connection thread. [[nodiscard]] bool acquireKeyCreation(); @@ -374,23 +373,12 @@ public: void sendDcKeyCheck(const AuthKeyPtr &key); - // Nulls msgId and seqNo in request, if newRequest = true. - void sendPrepared( - const SecureRequest &request, - crl::time msCanWait = 0, - bool newRequest = true); - void tryToReceive(); void needToResumeAndSend(); void connectionStateChange(int newState); - void resendAll(); // After connection restart. void resetDone(); void sendAnything(crl::time msCanWait = 0); void sendMsgsStateInfo(quint64 msgId, QByteArray data); - mtpRequestId resend( - mtpMsgId msgId, - crl::time msCanWait = 0, - bool forceContainer = false); signals: void authKeyChanged();