mirror of https://github.com/procxx/kepka.git
Correctly check container lifetime.
This commit is contained in:
parent
cbebcb1bc9
commit
9caac426ef
|
@ -218,42 +218,52 @@ int16 Connection::getProtocolDcId() const {
|
||||||
void Connection::checkSentRequests() {
|
void Connection::checkSentRequests() {
|
||||||
// Remove very old (10 minutes) containers and resend requests.
|
// Remove very old (10 minutes) containers and resend requests.
|
||||||
auto removingIds = std::vector<mtpMsgId>();
|
auto removingIds = std::vector<mtpMsgId>();
|
||||||
|
auto restarting = false;
|
||||||
auto requesting = false;
|
auto requesting = false;
|
||||||
{
|
{
|
||||||
QReadLocker locker(_sessionData->haveSentMutex());
|
QReadLocker locker(_sessionData->haveSentMutex());
|
||||||
auto &haveSent = _sessionData->haveSentMap();
|
auto &haveSent = _sessionData->haveSentMap();
|
||||||
const auto haveSentCount = haveSent.size();
|
const auto haveSentCount = haveSent.size();
|
||||||
auto now = crl::now();
|
const auto now = crl::now();
|
||||||
|
const auto checkAfter = kCheckSentRequestTimeout;
|
||||||
for (const auto &[msgId, request] : haveSent) {
|
for (const auto &[msgId, request] : haveSent) {
|
||||||
if (request.isStateRequest()) {
|
if (request.isStateRequest()) {
|
||||||
continue;
|
continue;
|
||||||
} else if (request.isSentContainer()) {
|
} else if (request.isSentContainer()) {
|
||||||
if (base::unixtime::now()
|
if (now > request->lastSentTime + kContainerLives) {
|
||||||
> int32(msgId >> 32) + kContainerLives) {
|
|
||||||
removingIds.push_back(msgId);
|
removingIds.push_back(msgId);
|
||||||
|
DEBUG_LOG(("MTP Info: Removing old container %1, "
|
||||||
|
"sent: %2, now: %3, current unixtime: %4"
|
||||||
|
).arg(msgId
|
||||||
|
).arg(request->lastSentTime
|
||||||
|
).arg(now
|
||||||
|
).arg(base::unixtime::now()));
|
||||||
}
|
}
|
||||||
} else if (request->lastSentTime + kCheckSentRequestTimeout
|
} else if (request->lastSentTime + checkAfter < now) {
|
||||||
< now) {
|
|
||||||
// Need to check state.
|
// Need to check state.
|
||||||
request->lastSentTime = now;
|
request->lastSentTime = now;
|
||||||
if (_stateRequestData.emplace(msgId).second) {
|
if (_bindMsgId) {
|
||||||
|
restarting = true;
|
||||||
|
} else if (_stateRequestData.emplace(msgId).second) {
|
||||||
requesting = true;
|
requesting = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (requesting) {
|
|
||||||
_sessionData->queueSendAnything(kSendStateRequestWaiting);
|
|
||||||
}
|
|
||||||
if (!removingIds.empty()) {
|
if (!removingIds.empty()) {
|
||||||
QWriteLocker locker(_sessionData->haveSentMutex());
|
QWriteLocker locker(_sessionData->haveSentMutex());
|
||||||
auto &haveSent = _sessionData->haveSentMap();
|
auto &haveSent = _sessionData->haveSentMap();
|
||||||
for (const auto msgId : removingIds) {
|
for (const auto msgId : removingIds) {
|
||||||
if (const auto removed = haveSent.take(msgId)) {
|
haveSent.remove(msgId);
|
||||||
Assert(!(*removed)->requestId);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (restarting) {
|
||||||
|
DEBUG_LOG(("MTP Info: "
|
||||||
|
"Request state while key is not bound, restarting."));
|
||||||
|
restart();
|
||||||
|
} else if (requesting) {
|
||||||
|
_sessionData->queueSendAnything(kSendStateRequestWaiting);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void Connection::destroyAllConnections() {
|
void Connection::destroyAllConnections() {
|
||||||
|
@ -479,7 +489,12 @@ mtpMsgId Connection::placeToContainer(
|
||||||
}
|
}
|
||||||
|
|
||||||
void Connection::tryToSend() {
|
void Connection::tryToSend() {
|
||||||
if (!_connection || !_keyId) {
|
DEBUG_LOG(("MTP Info: tryToSend for dc %1.").arg(_shiftedDcId));
|
||||||
|
if (!_connection) {
|
||||||
|
DEBUG_LOG(("MTP Info: not yet connected in dc %1.").arg(_shiftedDcId));
|
||||||
|
return;
|
||||||
|
} else if (!_keyId) {
|
||||||
|
DEBUG_LOG(("MTP Info: not yet with auth key in dc %1.").arg(_shiftedDcId));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1051,6 +1066,7 @@ void Connection::sendPingByTimer() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void Connection::sendPingForce() {
|
void Connection::sendPingForce() {
|
||||||
|
DEBUG_LOG(("MTP Info: send ping force for dcWithShift %1.").arg(_shiftedDcId));
|
||||||
if (!_pingId) {
|
if (!_pingId) {
|
||||||
_pingSendAt = 0;
|
_pingSendAt = 0;
|
||||||
DEBUG_LOG(("Will send ping!"));
|
DEBUG_LOG(("Will send ping!"));
|
||||||
|
@ -1496,14 +1512,20 @@ Connection::HandleResult Connection::handleOneReceived(
|
||||||
const auto requestId = wasSent(resendId);
|
const auto requestId = wasSent(resendId);
|
||||||
if (requestId) {
|
if (requestId) {
|
||||||
LOG(("Message Error: "
|
LOG(("Message Error: "
|
||||||
"bad message notification received, "
|
"fatal bad message notification received, "
|
||||||
"msgId %1, error_code %2, fatal: clearing callbacks"
|
"msgId %1, error_code %2, requestId: %3"
|
||||||
).arg(badMsgId
|
).arg(badMsgId
|
||||||
).arg(errorCode
|
).arg(errorCode
|
||||||
));
|
).arg(requestId));
|
||||||
_instance->clearCallbacksDelayed({ 1, RPCCallbackClear(
|
auto response = mtpBuffer();
|
||||||
requestId,
|
MTPRpcError(MTP_rpc_error(
|
||||||
-errorCode) });
|
MTP_int(500),
|
||||||
|
MTP_string("PROTOCOL_ERROR")
|
||||||
|
)).write(response);
|
||||||
|
|
||||||
|
// Save rpc_error for processing in the main thread.
|
||||||
|
QWriteLocker locker(_sessionData->haveReceivedMutex());
|
||||||
|
_sessionData->haveReceivedResponses().emplace(requestId, response);
|
||||||
} else {
|
} else {
|
||||||
DEBUG_LOG(("Message Error: "
|
DEBUG_LOG(("Message Error: "
|
||||||
"such message was not sent recently %1").arg(badMsgId));
|
"such message was not sent recently %1").arg(badMsgId));
|
||||||
|
@ -2006,18 +2028,10 @@ void Connection::requestsAcked(const QVector<MTPlong> &ids, bool byResponse) {
|
||||||
DEBUG_LOG(("Message Info: removing some old acked sent msgIds %1").arg(ackedCount - kIdsBufferSize));
|
DEBUG_LOG(("Message Info: removing some old acked sent msgIds %1").arg(ackedCount - kIdsBufferSize));
|
||||||
clearedBecauseTooOld.reserve(ackedCount - kIdsBufferSize);
|
clearedBecauseTooOld.reserve(ackedCount - kIdsBufferSize);
|
||||||
while (ackedCount-- > kIdsBufferSize) {
|
while (ackedCount-- > kIdsBufferSize) {
|
||||||
auto i = _ackedIds.begin();
|
_ackedIds.erase(_ackedIds.begin());
|
||||||
clearedBecauseTooOld.push_back(RPCCallbackClear(
|
|
||||||
i->second,
|
|
||||||
RPCError::TimeoutError));
|
|
||||||
_ackedIds.erase(i);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!clearedBecauseTooOld.empty()) {
|
|
||||||
_instance->clearCallbacksDelayed(std::move(clearedBecauseTooOld));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (toAckMore.size()) {
|
if (toAckMore.size()) {
|
||||||
requestsAcked(toAckMore);
|
requestsAcked(toAckMore);
|
||||||
}
|
}
|
||||||
|
@ -2260,7 +2274,7 @@ void Connection::applyAuthKey(AuthKeyPtr &&encryptionKey) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
setCurrentKeyId(0);
|
setCurrentKeyId(0);
|
||||||
DEBUG_LOG(("MTP Error: auth_key id for dc %1 changed, restarting..."
|
DEBUG_LOG(("MTP Info: auth_key id for dc %1 changed, restarting..."
|
||||||
).arg(_shiftedDcId));
|
).arg(_shiftedDcId));
|
||||||
if (_connection) {
|
if (_connection) {
|
||||||
restart();
|
restart();
|
||||||
|
|
|
@ -122,7 +122,6 @@ public:
|
||||||
const SerializedRequest &request,
|
const SerializedRequest &request,
|
||||||
RPCResponseHandler &&callbacks);
|
RPCResponseHandler &&callbacks);
|
||||||
SerializedRequest getRequest(mtpRequestId requestId);
|
SerializedRequest getRequest(mtpRequestId requestId);
|
||||||
void clearCallbacksDelayed(std::vector<RPCCallbackClear> &&ids);
|
|
||||||
void execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end);
|
void execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end);
|
||||||
bool hasCallbacks(mtpRequestId requestId);
|
bool hasCallbacks(mtpRequestId requestId);
|
||||||
void globalCallback(const mtpPrime *from, const mtpPrime *end);
|
void globalCallback(const mtpPrime *from, const mtpPrime *end);
|
||||||
|
@ -189,12 +188,6 @@ private:
|
||||||
std::optional<ShiftedDcId> changeRequestByDc(
|
std::optional<ShiftedDcId> changeRequestByDc(
|
||||||
mtpRequestId requestId, DcId newdc);
|
mtpRequestId requestId, DcId newdc);
|
||||||
|
|
||||||
// RPCError::NoError means do not toggle onError callback.
|
|
||||||
void clearCallbacks(
|
|
||||||
mtpRequestId requestId,
|
|
||||||
int32 errorCode = RPCError::NoError);
|
|
||||||
void clearCallbacks(const std::vector<RPCCallbackClear> &ids);
|
|
||||||
|
|
||||||
void checkDelayedRequests();
|
void checkDelayedRequests();
|
||||||
|
|
||||||
const not_null<Instance*> _instance;
|
const not_null<Instance*> _instance;
|
||||||
|
@ -562,7 +555,9 @@ void Instance::Private::cancel(mtpRequestId requestId) {
|
||||||
const auto session = getSession(qAbs(*shiftedDcId));
|
const auto session = getSession(qAbs(*shiftedDcId));
|
||||||
session->cancel(requestId, msgId);
|
session->cancel(requestId, msgId);
|
||||||
}
|
}
|
||||||
clearCallbacks(requestId);
|
|
||||||
|
QMutexLocker locker(&_parserMapLock);
|
||||||
|
_parserMap.erase(requestId);
|
||||||
}
|
}
|
||||||
|
|
||||||
// result < 0 means waiting for such count of ms.
|
// result < 0 means waiting for such count of ms.
|
||||||
|
@ -993,76 +988,6 @@ SerializedRequest Instance::Private::getRequest(mtpRequestId requestId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void Instance::Private::clearCallbacks(mtpRequestId requestId, int32 errorCode) {
|
|
||||||
RPCResponseHandler h;
|
|
||||||
bool found = false;
|
|
||||||
{
|
|
||||||
QMutexLocker locker(&_parserMapLock);
|
|
||||||
auto it = _parserMap.find(requestId);
|
|
||||||
if (it != _parserMap.end()) {
|
|
||||||
h = it->second;
|
|
||||||
found = true;
|
|
||||||
|
|
||||||
_parserMap.erase(it);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (errorCode && found) {
|
|
||||||
LOG(("API Error: callbacks cleared without handling! "
|
|
||||||
"Request: %1, error code: %2"
|
|
||||||
).arg(requestId
|
|
||||||
).arg(errorCode));
|
|
||||||
rpcErrorOccured(
|
|
||||||
requestId,
|
|
||||||
h,
|
|
||||||
RPCError::Local(
|
|
||||||
"CLEAR_CALLBACK",
|
|
||||||
QString("did not handle request %1, error code %2"
|
|
||||||
).arg(requestId
|
|
||||||
).arg(errorCode)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void Instance::Private::clearCallbacksDelayed(
|
|
||||||
std::vector<RPCCallbackClear> &&ids) {
|
|
||||||
if (ids.empty()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (Logs::DebugEnabled()) {
|
|
||||||
auto idsString = QStringList();
|
|
||||||
idsString.reserve(ids.size());
|
|
||||||
for (auto &value : ids) {
|
|
||||||
idsString.push_back(QString::number(value.requestId));
|
|
||||||
}
|
|
||||||
DEBUG_LOG(("RPC Info: clear callbacks delayed, msgIds: %1"
|
|
||||||
).arg(idsString.join(", ")));
|
|
||||||
}
|
|
||||||
|
|
||||||
InvokeQueued(_instance, [=, list = std::move(ids)] {
|
|
||||||
clearCallbacks(list);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
void Instance::Private::clearCallbacks(
|
|
||||||
const std::vector<RPCCallbackClear> &ids) {
|
|
||||||
Expects(!ids.empty());
|
|
||||||
|
|
||||||
for (const auto &clearRequest : ids) {
|
|
||||||
if (Logs::DebugEnabled()) {
|
|
||||||
QMutexLocker locker(&_parserMapLock);
|
|
||||||
const auto hasParsers = (_parserMap.find(clearRequest.requestId)
|
|
||||||
!= _parserMap.end());
|
|
||||||
DEBUG_LOG(("RPC Info: "
|
|
||||||
"clearing delayed callback %1, error code %2, parsers: %3"
|
|
||||||
).arg(clearRequest.requestId
|
|
||||||
).arg(clearRequest.errorCode
|
|
||||||
).arg(Logs::b(hasParsers)));
|
|
||||||
}
|
|
||||||
clearCallbacks(clearRequest.requestId, clearRequest.errorCode);
|
|
||||||
unregisterRequest(clearRequest.requestId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void Instance::Private::execCallback(
|
void Instance::Private::execCallback(
|
||||||
mtpRequestId requestId,
|
mtpRequestId requestId,
|
||||||
const mtpPrime *from,
|
const mtpPrime *from,
|
||||||
|
@ -1859,10 +1784,6 @@ void Instance::onSessionReset(ShiftedDcId shiftedDcId) {
|
||||||
_private->onSessionReset(shiftedDcId);
|
_private->onSessionReset(shiftedDcId);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Instance::clearCallbacksDelayed(std::vector<RPCCallbackClear> &&ids) {
|
|
||||||
_private->clearCallbacksDelayed(std::move(ids));
|
|
||||||
}
|
|
||||||
|
|
||||||
void Instance::execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end) {
|
void Instance::execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end) {
|
||||||
_private->execCallback(requestId, from, end);
|
_private->execCallback(requestId, from, end);
|
||||||
}
|
}
|
||||||
|
|
|
@ -100,9 +100,6 @@ public:
|
||||||
void onStateChange(ShiftedDcId shiftedDcId, int32 state);
|
void onStateChange(ShiftedDcId shiftedDcId, int32 state);
|
||||||
void onSessionReset(ShiftedDcId shiftedDcId);
|
void onSessionReset(ShiftedDcId shiftedDcId);
|
||||||
|
|
||||||
// Thread-safe.
|
|
||||||
void clearCallbacksDelayed(std::vector<RPCCallbackClear> &&ids);
|
|
||||||
|
|
||||||
void execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end);
|
void execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end);
|
||||||
bool hasCallbacks(mtpRequestId requestId);
|
bool hasCallbacks(mtpRequestId requestId);
|
||||||
void globalCallback(const mtpPrime *from, const mtpPrime *end);
|
void globalCallback(const mtpPrime *from, const mtpPrime *end);
|
||||||
|
|
|
@ -175,10 +175,12 @@ void Session::watchDcKeyChanges() {
|
||||||
) | rpl::filter([=](DcId dcId) {
|
) | rpl::filter([=](DcId dcId) {
|
||||||
return (dcId == _shiftedDcId) || (dcId == BareDcId(_shiftedDcId));
|
return (dcId == _shiftedDcId) || (dcId == BareDcId(_shiftedDcId));
|
||||||
}) | rpl::start_with_next([=] {
|
}) | rpl::start_with_next([=] {
|
||||||
DEBUG_LOG(("AuthKey Info: Session::authKeyCreatedForDC slot, "
|
DEBUG_LOG(("AuthKey Info: dcTemporaryKeyChanged in Session %1"
|
||||||
"emitting authKeyChanged(), dcWithShift %1").arg(_shiftedDcId));
|
).arg(_shiftedDcId));
|
||||||
if (const auto connection = _connection) {
|
if (const auto connection = _connection) {
|
||||||
InvokeQueued(connection, [=] {
|
InvokeQueued(connection, [=] {
|
||||||
|
DEBUG_LOG(("AuthKey Info: calling Connection::updateAuthKey in Session %1"
|
||||||
|
).arg(_shiftedDcId));
|
||||||
connection->updateAuthKey();
|
connection->updateAuthKey();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue