From 9caac426ef8bc55814d3cdb98d8499779ecce92c Mon Sep 17 00:00:00 2001
From: John Preston <johnprestonmail@gmail.com>
Date: Mon, 2 Dec 2019 12:40:53 +0300
Subject: [PATCH] Correctly check container lifetime.

---
 Telegram/SourceFiles/mtproto/connection.cpp   | 72 +++++++++-------
 Telegram/SourceFiles/mtproto/mtp_instance.cpp | 85 +------------------
 Telegram/SourceFiles/mtproto/mtp_instance.h   |  3 -
 Telegram/SourceFiles/mtproto/session.cpp      |  6 +-
 4 files changed, 50 insertions(+), 116 deletions(-)

diff --git a/Telegram/SourceFiles/mtproto/connection.cpp b/Telegram/SourceFiles/mtproto/connection.cpp
index 05cc6c457..1e5f17c6a 100644
--- a/Telegram/SourceFiles/mtproto/connection.cpp
+++ b/Telegram/SourceFiles/mtproto/connection.cpp
@@ -218,42 +218,52 @@ int16 Connection::getProtocolDcId() const {
 void Connection::checkSentRequests() {
 	// Remove very old (10 minutes) containers and resend requests.
 	auto removingIds = std::vector<mtpMsgId>();
+	auto restarting = false;
 	auto requesting = false;
 	{
 		QReadLocker locker(_sessionData->haveSentMutex());
 		auto &haveSent = _sessionData->haveSentMap();
 		const auto haveSentCount = haveSent.size();
-		auto now = crl::now();
+		const auto now = crl::now();
+		const auto checkAfter = kCheckSentRequestTimeout;
 		for (const auto &[msgId, request] : haveSent) {
 			if (request.isStateRequest()) {
 				continue;
 			} else if (request.isSentContainer()) {
-				if (base::unixtime::now()
-					> int32(msgId >> 32) + kContainerLives) {
+				if (now > request->lastSentTime + kContainerLives) {
 					removingIds.push_back(msgId);
+					DEBUG_LOG(("MTP Info: Removing old container %1, "
+						"sent: %2, now: %3, current unixtime: %4"
+						).arg(msgId
+						).arg(request->lastSentTime
+						).arg(now
+						).arg(base::unixtime::now()));
 				}
-			} else if (request->lastSentTime + kCheckSentRequestTimeout
-				< now) {
+			} else if (request->lastSentTime + checkAfter < now) {
 				// Need to check state.
 				request->lastSentTime = now;
-				if (_stateRequestData.emplace(msgId).second) {
+				if (_bindMsgId) {
+					restarting = true;
+				} else if (_stateRequestData.emplace(msgId).second) {
 					requesting = true;
 				}
 			}
 		}
 	}
-	if (requesting) {
-		_sessionData->queueSendAnything(kSendStateRequestWaiting);
-	}
 	if (!removingIds.empty()) {
 		QWriteLocker locker(_sessionData->haveSentMutex());
 		auto &haveSent = _sessionData->haveSentMap();
 		for (const auto msgId : removingIds) {
-			if (const auto removed = haveSent.take(msgId)) {
-				Assert(!(*removed)->requestId);
-			}
+			haveSent.remove(msgId);
 		}
 	}
+	if (restarting) {
+		DEBUG_LOG(("MTP Info: "
+			"Request state while key is not bound, restarting."));
+		restart();
+	} else if (requesting) {
+		_sessionData->queueSendAnything(kSendStateRequestWaiting);
+	}
 }
 
 void Connection::destroyAllConnections() {
@@ -479,7 +489,12 @@ mtpMsgId Connection::placeToContainer(
 }
 
 void Connection::tryToSend() {
-	if (!_connection || !_keyId) {
+	DEBUG_LOG(("MTP Info: tryToSend for dc %1.").arg(_shiftedDcId));
+	if (!_connection) {
+		DEBUG_LOG(("MTP Info: not yet connected in dc %1.").arg(_shiftedDcId));
+		return;
+	} else if (!_keyId) {
+		DEBUG_LOG(("MTP Info: not yet with auth key in dc %1.").arg(_shiftedDcId));
 		return;
 	}
 
@@ -1051,6 +1066,7 @@ void Connection::sendPingByTimer() {
 }
 
 void Connection::sendPingForce() {
+	DEBUG_LOG(("MTP Info: send ping force for dcWithShift %1.").arg(_shiftedDcId));
 	if (!_pingId) {
 		_pingSendAt = 0;
 		DEBUG_LOG(("Will send ping!"));
@@ -1496,14 +1512,20 @@ Connection::HandleResult Connection::handleOneReceived(
 			const auto requestId = wasSent(resendId);
 			if (requestId) {
 				LOG(("Message Error: "
-					"bad message notification received, "
-					"msgId %1, error_code %2, fatal: clearing callbacks"
+					"fatal bad message notification received, "
+					"msgId %1, error_code %2, requestId: %3"
 					).arg(badMsgId
 					).arg(errorCode
-					));
-				_instance->clearCallbacksDelayed({ 1, RPCCallbackClear(
-					requestId,
-					-errorCode) });
+					).arg(requestId));
+				auto response = mtpBuffer();
+				MTPRpcError(MTP_rpc_error(
+					MTP_int(500),
+					MTP_string("PROTOCOL_ERROR")
+				)).write(response);
+
+				// Save rpc_error for processing in the main thread.
+				QWriteLocker locker(_sessionData->haveReceivedMutex());
+				_sessionData->haveReceivedResponses().emplace(requestId, response);
 			} else {
 				DEBUG_LOG(("Message Error: "
 					"such message was not sent recently %1").arg(badMsgId));
@@ -2006,18 +2028,10 @@ void Connection::requestsAcked(const QVector<MTPlong> &ids, bool byResponse) {
 		DEBUG_LOG(("Message Info: removing some old acked sent msgIds %1").arg(ackedCount - kIdsBufferSize));
 		clearedBecauseTooOld.reserve(ackedCount - kIdsBufferSize);
 		while (ackedCount-- > kIdsBufferSize) {
-			auto i = _ackedIds.begin();
-			clearedBecauseTooOld.push_back(RPCCallbackClear(
-				i->second,
-				RPCError::TimeoutError));
-			_ackedIds.erase(i);
+			_ackedIds.erase(_ackedIds.begin());
 		}
 	}
 
-	if (!clearedBecauseTooOld.empty()) {
-		_instance->clearCallbacksDelayed(std::move(clearedBecauseTooOld));
-	}
-
 	if (toAckMore.size()) {
 		requestsAcked(toAckMore);
 	}
@@ -2260,7 +2274,7 @@ void Connection::applyAuthKey(AuthKeyPtr &&encryptionKey) {
 			return;
 		}
 		setCurrentKeyId(0);
-		DEBUG_LOG(("MTP Error: auth_key id for dc %1 changed, restarting..."
+		DEBUG_LOG(("MTP Info: auth_key id for dc %1 changed, restarting..."
 			).arg(_shiftedDcId));
 		if (_connection) {
 			restart();
diff --git a/Telegram/SourceFiles/mtproto/mtp_instance.cpp b/Telegram/SourceFiles/mtproto/mtp_instance.cpp
index a24caabee..2af0788f0 100644
--- a/Telegram/SourceFiles/mtproto/mtp_instance.cpp
+++ b/Telegram/SourceFiles/mtproto/mtp_instance.cpp
@@ -122,7 +122,6 @@ public:
 		const SerializedRequest &request,
 		RPCResponseHandler &&callbacks);
 	SerializedRequest getRequest(mtpRequestId requestId);
-	void clearCallbacksDelayed(std::vector<RPCCallbackClear> &&ids);
 	void execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end);
 	bool hasCallbacks(mtpRequestId requestId);
 	void globalCallback(const mtpPrime *from, const mtpPrime *end);
@@ -189,12 +188,6 @@ private:
 	std::optional<ShiftedDcId> changeRequestByDc(
 		mtpRequestId requestId, DcId newdc);
 
-	// RPCError::NoError means do not toggle onError callback.
-	void clearCallbacks(
-		mtpRequestId requestId,
-		int32 errorCode = RPCError::NoError);
-	void clearCallbacks(const std::vector<RPCCallbackClear> &ids);
-
 	void checkDelayedRequests();
 
 	const not_null<Instance*> _instance;
@@ -562,7 +555,9 @@ void Instance::Private::cancel(mtpRequestId requestId) {
 		const auto session = getSession(qAbs(*shiftedDcId));
 		session->cancel(requestId, msgId);
 	}
-	clearCallbacks(requestId);
+
+	QMutexLocker locker(&_parserMapLock);
+	_parserMap.erase(requestId);
 }
 
 // result < 0 means waiting for such count of ms.
@@ -993,76 +988,6 @@ SerializedRequest Instance::Private::getRequest(mtpRequestId requestId) {
 }
 
 
-void Instance::Private::clearCallbacks(mtpRequestId requestId, int32 errorCode) {
-	RPCResponseHandler h;
-	bool found = false;
-	{
-		QMutexLocker locker(&_parserMapLock);
-		auto it = _parserMap.find(requestId);
-		if (it != _parserMap.end()) {
-			h = it->second;
-			found = true;
-
-			_parserMap.erase(it);
-		}
-	}
-	if (errorCode && found) {
-		LOG(("API Error: callbacks cleared without handling! "
-			"Request: %1, error code: %2"
-			).arg(requestId
-			).arg(errorCode));
-		rpcErrorOccured(
-			requestId,
-			h,
-			RPCError::Local(
-				"CLEAR_CALLBACK",
-				QString("did not handle request %1, error code %2"
-				).arg(requestId
-				).arg(errorCode)));
-	}
-}
-
-void Instance::Private::clearCallbacksDelayed(
-		std::vector<RPCCallbackClear> &&ids) {
-	if (ids.empty()) {
-		return;
-	}
-
-	if (Logs::DebugEnabled()) {
-		auto idsString = QStringList();
-		idsString.reserve(ids.size());
-		for (auto &value : ids) {
-			idsString.push_back(QString::number(value.requestId));
-		}
-		DEBUG_LOG(("RPC Info: clear callbacks delayed, msgIds: %1"
-			).arg(idsString.join(", ")));
-	}
-
-	InvokeQueued(_instance, [=, list = std::move(ids)] {
-		clearCallbacks(list);
-	});
-}
-
-void Instance::Private::clearCallbacks(
-		const std::vector<RPCCallbackClear> &ids) {
-	Expects(!ids.empty());
-
-	for (const auto &clearRequest : ids) {
-		if (Logs::DebugEnabled()) {
-			QMutexLocker locker(&_parserMapLock);
-			const auto hasParsers = (_parserMap.find(clearRequest.requestId)
-				!= _parserMap.end());
-			DEBUG_LOG(("RPC Info: "
-				"clearing delayed callback %1, error code %2, parsers: %3"
-				).arg(clearRequest.requestId
-				).arg(clearRequest.errorCode
-				).arg(Logs::b(hasParsers)));
-		}
-		clearCallbacks(clearRequest.requestId, clearRequest.errorCode);
-		unregisterRequest(clearRequest.requestId);
-	}
-}
-
 void Instance::Private::execCallback(
 		mtpRequestId requestId,
 		const mtpPrime *from,
@@ -1859,10 +1784,6 @@ void Instance::onSessionReset(ShiftedDcId shiftedDcId) {
 	_private->onSessionReset(shiftedDcId);
 }
 
-void Instance::clearCallbacksDelayed(std::vector<RPCCallbackClear> &&ids) {
-	_private->clearCallbacksDelayed(std::move(ids));
-}
-
 void Instance::execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end) {
 	_private->execCallback(requestId, from, end);
 }
diff --git a/Telegram/SourceFiles/mtproto/mtp_instance.h b/Telegram/SourceFiles/mtproto/mtp_instance.h
index 2eb80fb8d..7f6d7f463 100644
--- a/Telegram/SourceFiles/mtproto/mtp_instance.h
+++ b/Telegram/SourceFiles/mtproto/mtp_instance.h
@@ -100,9 +100,6 @@ public:
 	void onStateChange(ShiftedDcId shiftedDcId, int32 state);
 	void onSessionReset(ShiftedDcId shiftedDcId);
 
-	// Thread-safe.
-	void clearCallbacksDelayed(std::vector<RPCCallbackClear> &&ids);
-
 	void execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end);
 	bool hasCallbacks(mtpRequestId requestId);
 	void globalCallback(const mtpPrime *from, const mtpPrime *end);
diff --git a/Telegram/SourceFiles/mtproto/session.cpp b/Telegram/SourceFiles/mtproto/session.cpp
index 00294cd3d..b81859c27 100644
--- a/Telegram/SourceFiles/mtproto/session.cpp
+++ b/Telegram/SourceFiles/mtproto/session.cpp
@@ -175,10 +175,12 @@ void Session::watchDcKeyChanges() {
 	) | rpl::filter([=](DcId dcId) {
 		return (dcId == _shiftedDcId) || (dcId == BareDcId(_shiftedDcId));
 	}) | rpl::start_with_next([=] {
-		DEBUG_LOG(("AuthKey Info: Session::authKeyCreatedForDC slot, "
-			"emitting authKeyChanged(), dcWithShift %1").arg(_shiftedDcId));
+		DEBUG_LOG(("AuthKey Info: dcTemporaryKeyChanged in Session %1"
+			).arg(_shiftedDcId));
 		if (const auto connection = _connection) {
 			InvokeQueued(connection, [=] {
+				DEBUG_LOG(("AuthKey Info: calling Connection::updateAuthKey in Session %1"
+					).arg(_shiftedDcId));
 				connection->updateAuthKey();
 			});
 		}