Remove all signals from ConnectionPrivate.

This commit is contained in:
John Preston 2019-11-18 15:53:37 +03:00
parent cfe12f773f
commit d9fc3619c2
8 changed files with 399 additions and 297 deletions

View File

@ -23,15 +23,6 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include "base/qthelp_url.h" #include "base/qthelp_url.h"
#include "base/unixtime.h" #include "base/unixtime.h"
extern "C" {
#include <openssl/bn.h>
#include <openssl/err.h>
#include <openssl/aes.h>
#include <openssl/sha.h>
#include <openssl/md5.h>
#include <openssl/rand.h>
} // extern "C"
#ifdef small #ifdef small
#undef small #undef small
#endif // small #endif // small
@ -107,7 +98,7 @@ Connection::~Connection() {
} }
} }
void Connection::start(SessionData *sessionData, ShiftedDcId shiftedDcId) { void Connection::start(std::shared_ptr<SessionData> sessionData, ShiftedDcId shiftedDcId) {
Expects(_thread == nullptr && _private == nullptr); Expects(_thread == nullptr && _private == nullptr);
_thread = std::make_unique<QThread>(); _thread = std::make_unique<QThread>();
@ -115,7 +106,7 @@ void Connection::start(SessionData *sessionData, ShiftedDcId shiftedDcId) {
_instance, _instance,
_thread.get(), _thread.get(),
this, this,
sessionData, std::move(sessionData),
shiftedDcId); shiftedDcId);
// will be deleted in the thread::finished signal // will be deleted in the thread::finished signal
@ -214,10 +205,7 @@ int16 ConnectionPrivate::getProtocolDcId() const {
} }
void ConnectionPrivate::destroyAllConnections() { void ConnectionPrivate::destroyAllConnections() {
{ clearKeyCreatorOnFail();
QReadLocker lockFinished(&_sessionDataMutex);
clearKeyCreatorOnFail();
}
_waitForBetterTimer.cancel(); _waitForBetterTimer.cancel();
_waitForReceivedTimer.cancel(); _waitForReceivedTimer.cancel();
_waitForConnectedTimer.cancel(); _waitForConnectedTimer.cancel();
@ -229,7 +217,7 @@ ConnectionPrivate::ConnectionPrivate(
not_null<Instance*> instance, not_null<Instance*> instance,
not_null<QThread*> thread, not_null<QThread*> thread,
not_null<Connection*> owner, not_null<Connection*> owner,
not_null<SessionData*> data, std::shared_ptr<SessionData> data,
ShiftedDcId shiftedDcId) ShiftedDcId shiftedDcId)
: QObject(nullptr) : QObject(nullptr)
, _instance(instance) , _instance(instance)
@ -244,7 +232,7 @@ ConnectionPrivate::ConnectionPrivate(
, _waitForReceived(kMinReceiveTimeout) , _waitForReceived(kMinReceiveTimeout)
, _waitForConnected(kMinConnectedTimeout) , _waitForConnected(kMinConnectedTimeout)
, _pingSender(thread, [=] { sendPingByTimer(); }) , _pingSender(thread, [=] { sendPingByTimer(); })
, _sessionData(data) { , _sessionData(std::move(data)) {
Expects(_shiftedDcId != 0); Expects(_shiftedDcId != 0);
moveToThread(thread); moveToThread(thread);
@ -254,33 +242,16 @@ ConnectionPrivate::ConnectionPrivate(
connect(_sessionData->owner(), SIGNAL(authKeyChanged()), this, SLOT(updateAuthKey()), Qt::QueuedConnection); connect(_sessionData->owner(), SIGNAL(authKeyChanged()), this, SLOT(updateAuthKey()), Qt::QueuedConnection);
connect(_sessionData->owner(), SIGNAL(needToRestart()), this, SLOT(restartNow()), Qt::QueuedConnection); connect(_sessionData->owner(), SIGNAL(needToRestart()), this, SLOT(restartNow()), Qt::QueuedConnection);
connect(this, SIGNAL(needToReceive()), _sessionData->owner(), SLOT(tryToReceive()), Qt::QueuedConnection);
connect(this, SIGNAL(stateChanged(qint32)), _sessionData->owner(), SLOT(onConnectionStateChange(qint32)), Qt::QueuedConnection);
connect(_sessionData->owner(), SIGNAL(needToSend()), this, SLOT(tryToSend()), Qt::QueuedConnection); connect(_sessionData->owner(), SIGNAL(needToSend()), this, SLOT(tryToSend()), Qt::QueuedConnection);
connect(_sessionData->owner(), SIGNAL(needToPing()), this, SLOT(onPingSendForce()), Qt::QueuedConnection); connect(_sessionData->owner(), SIGNAL(needToPing()), this, SLOT(onPingSendForce()), Qt::QueuedConnection);
connect(this, SIGNAL(sessionResetDone()), _sessionData->owner(), SLOT(onResetDone()), Qt::QueuedConnection);
static bool _registered = false;
if (!_registered) {
_registered = true;
qRegisterMetaType<QVector<quint64> >("QVector<quint64>");
}
connect(this, SIGNAL(needToSendAsync()), _sessionData->owner(), SLOT(needToResumeAndSend()), Qt::QueuedConnection);
connect(this, SIGNAL(sendAnythingAsync(qint64)), _sessionData->owner(), SLOT(sendAnything(qint64)), Qt::QueuedConnection);
connect(this, SIGNAL(sendHttpWaitAsync()), _sessionData->owner(), SLOT(sendAnything()), Qt::QueuedConnection);
connect(this, SIGNAL(sendPongAsync(quint64,quint64)), _sessionData->owner(), SLOT(sendPong(quint64,quint64)), Qt::QueuedConnection);
connect(this, SIGNAL(sendMsgsStateInfoAsync(quint64, QByteArray)), _sessionData->owner(), SLOT(sendMsgsStateInfo(quint64,QByteArray)), Qt::QueuedConnection);
connect(this, SIGNAL(resendAsync(quint64,qint64,bool,bool)), _sessionData->owner(), SLOT(resend(quint64,qint64,bool,bool)), Qt::QueuedConnection);
connect(this, SIGNAL(resendManyAsync(QVector<quint64>,qint64,bool,bool)), _sessionData->owner(), SLOT(resendMany(QVector<quint64>,qint64,bool,bool)), Qt::QueuedConnection);
connect(this, SIGNAL(resendAllAsync()), _sessionData->owner(), SLOT(resendAll()), Qt::QueuedConnection);
} }
ConnectionPrivate::~ConnectionPrivate() { ConnectionPrivate::~ConnectionPrivate() {
clearKeyCreatorOnFail();
Expects(_finished); Expects(_finished);
Expects(!_connection); Expects(!_connection);
Expects(_testConnections.empty()); Expects(_testConnections.empty());
Expects(!_keyCreator);
} }
void ConnectionPrivate::onConfigLoaded() { void ConnectionPrivate::onConfigLoaded() {
@ -324,6 +295,7 @@ bool ConnectionPrivate::setState(int32 state, int32 ifState) {
QReadLocker lock(&stateConnMutex); QReadLocker lock(&stateConnMutex);
if (_state != ifState) return false; if (_state != ifState) return false;
} }
QWriteLocker lock(&stateConnMutex); QWriteLocker lock(&stateConnMutex);
if (_state == state) return false; if (_state == state) return false;
_state = state; _state = state;
@ -332,7 +304,9 @@ bool ConnectionPrivate::setState(int32 state, int32 ifState) {
_retryTimer.callOnce(_retryTimeout); _retryTimer.callOnce(_retryTimeout);
_retryWillFinish = crl::now() + _retryTimeout; _retryWillFinish = crl::now() + _retryTimeout;
} }
emit stateChanged(state); lock.unlock();
_sessionData->queueConnectionStateChange(state);
return true; return true;
} }
@ -465,7 +439,7 @@ void ConnectionPrivate::resetSession() { // recreate all msg_id and msg_seqno
_sessionData->stateRequestMap().clear(); _sessionData->stateRequestMap().clear();
} }
emit sessionResetDone(); _sessionData->queueResetDone();
} }
mtpMsgId ConnectionPrivate::prepareToSend( mtpMsgId ConnectionPrivate::prepareToSend(
@ -570,12 +544,11 @@ mtpMsgId ConnectionPrivate::placeToContainer(SecureRequest &toSendRequest, mtpMs
} }
void ConnectionPrivate::tryToSend() { void ConnectionPrivate::tryToSend() {
QReadLocker lockFinished(&_sessionDataMutex); if (!_connection || !_keyId) {
if (!_sessionData || !_connection || !_keyId) {
return; return;
} }
auto needsLayer = !_sessionData->owner()->connectionInited(); auto needsLayer = !_sessionData->connectionInited();
auto state = getState(); auto state = getState();
auto sendOnlyFirstPing = (state != ConnectedState); auto sendOnlyFirstPing = (state != ConnectedState);
if (sendOnlyFirstPing && !_pingIdToSend) { if (sendOnlyFirstPing && !_pingIdToSend) {
@ -926,16 +899,10 @@ void ConnectionPrivate::tryToSend() {
toSend.clear(); toSend.clear();
} }
} }
sendSecureRequest( sendSecureRequest(std::move(toSendRequest), needAnyResponse);
std::move(toSendRequest),
needAnyResponse,
lockFinished);
} }
void ConnectionPrivate::retryByTimer() { void ConnectionPrivate::retryByTimer() {
QReadLocker lockFinished(&_sessionDataMutex);
if (!_sessionData) return;
if (_retryTimeout < 3) { if (_retryTimeout < 3) {
++_retryTimeout; ++_retryTimeout;
} else if (_retryTimeout == 3) { } else if (_retryTimeout == 3) {
@ -959,17 +926,11 @@ void ConnectionPrivate::connectToServer(bool afterConfig) {
return; return;
} }
QReadLocker lockFinished(&_sessionDataMutex);
if (!_sessionData) {
DEBUG_LOG(("MTP Error: "
"connectToServer() called for stopped connection!"));
return;
}
_connectionOptions = std::make_unique<ConnectionOptions>( _connectionOptions = std::make_unique<ConnectionOptions>(
_sessionData->connectionOptions()); _sessionData->connectionOptions());
// #TODO race. // #TODO race.
const auto hasKey = (_sessionData->owner()->getKey() != nullptr); const auto hasKey = (_sessionData->getKey() != nullptr);
lockFinished.unlock();
const auto bareDc = BareDcId(_shiftedDcId); const auto bareDc = BareDcId(_shiftedDcId);
_dcType = _instance->dcOptions()->dcType(_shiftedDcId); _dcType = _instance->dcOptions()->dcType(_shiftedDcId);
@ -1063,6 +1024,7 @@ void ConnectionPrivate::connectToServer(bool afterConfig) {
_waitForConnectedTimer.cancel(); _waitForConnectedTimer.cancel();
setState(ConnectingState); setState(ConnectingState);
_pingId = _pingMsgId = _pingIdToSend = _pingSendAt = 0; _pingId = _pingMsgId = _pingIdToSend = _pingSendAt = 0;
_pingSender.cancel(); _pingSender.cancel();
@ -1070,25 +1032,23 @@ void ConnectionPrivate::connectToServer(bool afterConfig) {
} }
void ConnectionPrivate::restart() { void ConnectionPrivate::restart() {
QReadLocker lockFinished(&_sessionDataMutex);
if (!_sessionData) return;
DEBUG_LOG(("MTP Info: restarting Connection")); DEBUG_LOG(("MTP Info: restarting Connection"));
_waitForReceivedTimer.cancel(); _waitForReceivedTimer.cancel();
_waitForConnectedTimer.cancel(); _waitForConnectedTimer.cancel();
lockFinished.unlock();
doDisconnect(); doDisconnect();
lockFinished.relock(); if (_needSessionReset) {
if (_sessionData && _needSessionReset) {
resetSession(); resetSession();
} }
_restarted = true; _restarted = true;
if (_retryTimer.isActive()) return; if (_retryTimer.isActive()) {
return;
}
DEBUG_LOG(("MTP Info: restart timeout: %1ms").arg(_retryTimeout)); DEBUG_LOG(("MTP Info: restart timeout: %1ms").arg(_retryTimeout));
setState(-_retryTimeout); setState(-_retryTimeout);
} }
@ -1152,7 +1112,7 @@ void ConnectionPrivate::sendPingByTimer() {
_pingSender.callOnce(mustSendTill - now); _pingSender.callOnce(mustSendTill - now);
} }
} else { } else {
emit needToSendAsync(); _sessionData->queueNeedToResumeAndSend();
} }
} }
@ -1244,16 +1204,8 @@ void ConnectionPrivate::requestCDNConfig() {
} }
void ConnectionPrivate::handleReceived() { void ConnectionPrivate::handleReceived() {
QReadLocker lockFinished(&_sessionDataMutex);
if (!_sessionData) return;
onReceivedSome(); onReceivedSome();
const auto restartOnError = [&] {
lockFinished.unlock();
restart();
};
while (!_connection->received().empty()) { while (!_connection->received().empty()) {
auto intsBuffer = std::move(_connection->received().front()); auto intsBuffer = std::move(_connection->received().front());
_connection->received().pop_front(); _connection->received().pop_front();
@ -1268,13 +1220,13 @@ void ConnectionPrivate::handleReceived() {
LOG(("TCP Error: bad message received, len %1").arg(intsCount * kIntSize)); LOG(("TCP Error: bad message received, len %1").arg(intsCount * kIntSize));
TCP_LOG(("TCP Error: bad message %1").arg(Logs::mb(ints, intsCount * kIntSize).str())); TCP_LOG(("TCP Error: bad message %1").arg(Logs::mb(ints, intsCount * kIntSize).str()));
return restartOnError(); return restart();
} }
if (_keyId != *(uint64*)ints) { if (_keyId != *(uint64*)ints) {
LOG(("TCP Error: bad auth_key_id %1 instead of %2 received").arg(_keyId).arg(*(uint64*)ints)); LOG(("TCP Error: bad auth_key_id %1 instead of %2 received").arg(_keyId).arg(*(uint64*)ints));
TCP_LOG(("TCP Error: bad message %1").arg(Logs::mb(ints, intsCount * kIntSize).str())); TCP_LOG(("TCP Error: bad message %1").arg(Logs::mb(ints, intsCount * kIntSize).str()));
return restartOnError(); return restart();
} }
auto encryptedInts = ints + kExternalHeaderIntsCount; auto encryptedInts = ints + kExternalHeaderIntsCount;
@ -1301,7 +1253,7 @@ void ConnectionPrivate::handleReceived() {
LOG(("TCP Error: bad messageLength %1").arg(messageLength)); LOG(("TCP Error: bad messageLength %1").arg(messageLength));
TCP_LOG(("TCP Error: bad message %1").arg(Logs::mb(ints, intsCount * kIntSize).str())); TCP_LOG(("TCP Error: bad message %1").arg(Logs::mb(ints, intsCount * kIntSize).str()));
return restartOnError(); return restart();
} }
auto fullDataLength = kEncryptedHeaderIntsCount * kIntSize + messageLength; // Without padding. auto fullDataLength = kEncryptedHeaderIntsCount * kIntSize + messageLength; // Without padding.
@ -1322,7 +1274,7 @@ void ConnectionPrivate::handleReceived() {
LOG(("TCP Error: bad SHA1 hash after aesDecrypt in message.")); LOG(("TCP Error: bad SHA1 hash after aesDecrypt in message."));
TCP_LOG(("TCP Error: bad message %1").arg(Logs::mb(encryptedInts, encryptedBytesCount).str())); TCP_LOG(("TCP Error: bad message %1").arg(Logs::mb(encryptedInts, encryptedBytesCount).str()));
return restartOnError(); return restart();
} }
#else // TDESKTOP_MTPROTO_OLD #else // TDESKTOP_MTPROTO_OLD
constexpr auto kMinPaddingSize = 12U; constexpr auto kMinPaddingSize = 12U;
@ -1342,7 +1294,7 @@ void ConnectionPrivate::handleReceived() {
LOG(("TCP Error: bad SHA256 hash after aesDecrypt in message")); LOG(("TCP Error: bad SHA256 hash after aesDecrypt in message"));
TCP_LOG(("TCP Error: bad message %1").arg(Logs::mb(encryptedInts, encryptedBytesCount).str())); TCP_LOG(("TCP Error: bad message %1").arg(Logs::mb(encryptedInts, encryptedBytesCount).str()));
return restartOnError(); return restart();
} }
#endif // TDESKTOP_MTPROTO_OLD #endif // TDESKTOP_MTPROTO_OLD
@ -1350,7 +1302,7 @@ void ConnectionPrivate::handleReceived() {
LOG(("TCP Error: bad msg_len received %1, data size: %2").arg(messageLength).arg(encryptedBytesCount)); LOG(("TCP Error: bad msg_len received %1, data size: %2").arg(messageLength).arg(encryptedBytesCount));
TCP_LOG(("TCP Error: bad message %1").arg(Logs::mb(encryptedInts, encryptedBytesCount).str())); TCP_LOG(("TCP Error: bad message %1").arg(Logs::mb(encryptedInts, encryptedBytesCount).str()));
return restartOnError(); return restart();
} }
TCP_LOG(("TCP Info: decrypted message %1,%2,%3 is %4 len").arg(msgId).arg(seqNo).arg(Logs::b(needAck)).arg(fullDataLength)); TCP_LOG(("TCP Info: decrypted message %1,%2,%3 is %4 len").arg(msgId).arg(seqNo).arg(Logs::b(needAck)).arg(fullDataLength));
@ -1360,7 +1312,7 @@ void ConnectionPrivate::handleReceived() {
LOG(("MTP Error: bad server session received")); LOG(("MTP Error: bad server session received"));
TCP_LOG(("MTP Error: bad server session %1 instead of %2 in message received").arg(session).arg(serverSession)); TCP_LOG(("MTP Error: bad server session %1 instead of %2 in message received").arg(session).arg(serverSession));
return restartOnError(); return restart();
} }
const auto serverTime = int32(msgId >> 32); const auto serverTime = int32(msgId >> 32);
@ -1369,7 +1321,7 @@ void ConnectionPrivate::handleReceived() {
if (!isReply && ((msgId & 0x03) != 3)) { if (!isReply && ((msgId & 0x03) != 3)) {
LOG(("MTP Error: bad msg_id %1 in message received").arg(msgId)); LOG(("MTP Error: bad msg_id %1 in message received").arg(msgId));
return restartOnError(); return restart();
} }
bool badTime = false; bool badTime = false;
@ -1384,11 +1336,10 @@ void ConnectionPrivate::handleReceived() {
if (!badTime) { if (!badTime) {
DEBUG_LOG(("MTP Info: other salt received... received: %1, my salt: %2, updating...").arg(serverSalt).arg(mySalt)); DEBUG_LOG(("MTP Info: other salt received... received: %1, my salt: %2, updating...").arg(serverSalt).arg(mySalt));
_sessionData->setSalt(serverSalt); _sessionData->setSalt(serverSalt);
if (setState(ConnectedState, ConnectingState)) { // only connected
if (_restarted) { if (setState(ConnectedState, ConnectingState) && _restarted) {
emit resendAllAsync(); _sessionData->queueResendAll();
_restarted = false; _restarted = false;
}
} }
} else { } else {
DEBUG_LOG(("MTP Info: other salt received... received: %1, my salt: %2").arg(serverSalt).arg(mySalt)); DEBUG_LOG(("MTP Info: other salt received... received: %1, my salt: %2").arg(serverSalt).arg(mySalt));
@ -1419,29 +1370,24 @@ void ConnectionPrivate::handleReceived() {
} }
// send acks // send acks
uint32 toAckSize = _ackRequestData.size(); if (const auto toAckSize = _ackRequestData.size()) {
if (toAckSize) {
DEBUG_LOG(("MTP Info: will send %1 acks, ids: %2").arg(toAckSize).arg(LogIdsVector(_ackRequestData))); DEBUG_LOG(("MTP Info: will send %1 acks, ids: %2").arg(toAckSize).arg(LogIdsVector(_ackRequestData)));
emit sendAnythingAsync(kAckSendWaiting); _sessionData->queueSendAnything(kAckSendWaiting);
} }
bool emitSignal = false; auto lock = QReadLocker(_sessionData->haveReceivedMutex());
{ const auto tryToReceive = !_sessionData->haveReceivedResponses().isEmpty() || !_sessionData->haveReceivedUpdates().isEmpty();
QReadLocker locker(_sessionData->haveReceivedMutex()); lock.unlock();
emitSignal = !_sessionData->haveReceivedResponses().isEmpty() || !_sessionData->haveReceivedUpdates().isEmpty();
if (emitSignal) {
DEBUG_LOG(("MTP Info: emitting needToReceive() - need to parse in another thread, %1 responses, %2 updates.").arg(_sessionData->haveReceivedResponses().size()).arg(_sessionData->haveReceivedUpdates().size()));
}
}
if (emitSignal) { if (tryToReceive) {
emit needToReceive(); DEBUG_LOG(("MTP Info: queueTryToReceive() - need to parse in another thread, %1 responses, %2 updates.").arg(_sessionData->haveReceivedResponses().size()).arg(_sessionData->haveReceivedUpdates().size()));
_sessionData->queueTryToReceive();
} }
if (res != HandleResult::Success && res != HandleResult::Ignored) { if (res != HandleResult::Success && res != HandleResult::Ignored) {
_needSessionReset = (res == HandleResult::ResetSession); _needSessionReset = (res == HandleResult::ResetSession);
return restartOnError(); return restart();
} }
_retryTimeout = 1; // reset restart() timer _retryTimeout = 1; // reset restart() timer
@ -1449,12 +1395,12 @@ void ConnectionPrivate::handleReceived() {
if (!wasConnected) { if (!wasConnected) {
if (getState() == ConnectedState) { if (getState() == ConnectedState) {
emit needToSendAsync(); _sessionData->queueNeedToResumeAndSend();
} }
} }
} }
if (_connection->needHttpWait()) { if (_connection->needHttpWait()) {
emit sendHttpWaitAsync(); _sessionData->queueSendAnything();
} }
} }
@ -1679,7 +1625,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
if (setState(ConnectedState, ConnectingState)) { // maybe only connected if (setState(ConnectedState, ConnectingState)) { // maybe only connected
if (_restarted) { if (_restarted) {
emit resendAllAsync(); _sessionData->queueResendAll();
_restarted = false; _restarted = false;
} }
} }
@ -1741,7 +1687,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
info[i] = state; info[i] = state;
} }
} }
emit sendMsgsStateInfoAsync(msgId, info); _sessionData->queueSendMsgsStateInfo(msgId, info);
} return HandleResult::Success; } return HandleResult::Success;
case mtpc_msgs_state_info: { case mtpc_msgs_state_info: {
@ -2273,32 +2219,45 @@ void ConnectionPrivate::handleMsgsStates(const QVector<MTPlong> &ids, const QByt
} }
} }
void ConnectionPrivate::resend(quint64 msgId, qint64 msCanWait, bool forceContainer, bool sendMsgStateInfo) { void ConnectionPrivate::resend(
if (msgId == _pingMsgId) return; mtpMsgId msgId,
emit resendAsync(msgId, msCanWait, forceContainer, sendMsgStateInfo); crl::time msCanWait,
bool forceContainer,
bool sendMsgStateInfo) {
if (msgId == _pingMsgId) {
return;
}
_sessionData->queueResend(
msgId,
msCanWait,
forceContainer,
sendMsgStateInfo);
} }
void ConnectionPrivate::resendMany(QVector<quint64> msgIds, qint64 msCanWait, bool forceContainer, bool sendMsgStateInfo) { void ConnectionPrivate::resendMany(
QVector<mtpMsgId> msgIds,
crl::time msCanWait,
bool forceContainer,
bool sendMsgStateInfo) {
for (int32 i = 0, l = msgIds.size(); i < l; ++i) { for (int32 i = 0, l = msgIds.size(); i < l; ++i) {
if (msgIds.at(i) == _pingMsgId) { if (msgIds.at(i) == _pingMsgId) {
msgIds.remove(i); msgIds.remove(i);
--l; --l;
} }
} }
emit resendManyAsync(msgIds, msCanWait, forceContainer, sendMsgStateInfo); _sessionData->queueResendMany(
std::move(msgIds),
msCanWait,
forceContainer,
sendMsgStateInfo);
} }
void ConnectionPrivate::onConnected( void ConnectionPrivate::onConnected(
not_null<AbstractConnection*> connection) { not_null<AbstractConnection*> connection) {
QReadLocker lockFinished(&_sessionDataMutex);
if (!_sessionData) return;
disconnect(connection, &AbstractConnection::connected, nullptr, nullptr); disconnect(connection, &AbstractConnection::connected, nullptr, nullptr);
if (!connection->isConnected()) { if (!connection->isConnected()) {
LOG(("Connection Error: not connected in onConnected(), " LOG(("Connection Error: not connected in onConnected(), "
"state: %1").arg(connection->debugState())); "state: %1").arg(connection->debugState()));
lockFinished.unlock();
return restart(); return restart();
} }
@ -2323,8 +2282,6 @@ void ConnectionPrivate::onConnected(
_waitForBetterTimer.cancel(); _waitForBetterTimer.cancel();
_connection = std::move(i->data); _connection = std::move(i->data);
_testConnections.clear(); _testConnections.clear();
lockFinished.unlock();
checkAuthKey(); checkAuthKey();
} }
} }
@ -2384,13 +2341,16 @@ void ConnectionPrivate::checkAuthKey() {
} }
void ConnectionPrivate::updateAuthKey() { void ConnectionPrivate::updateAuthKey() {
QReadLocker lockFinished(&_sessionDataMutex); if (_keyCreator) {
if (!_sessionData || _keyCreator) {
return; return;
} }
DEBUG_LOG(("AuthKey Info: Connection updating key from Session, dc %1").arg(_shiftedDcId)); DEBUG_LOG(("AuthKey Info: Connection updating key from Session, dc %1").arg(_shiftedDcId));
_key = _sessionData->owner()->getKey(); applyAuthKey(_sessionData->getKey());
}
void ConnectionPrivate::applyAuthKey(AuthKeyPtr &&key) {
_key = std::move(key);
const auto newKeyId = _key ? _key->keyId() : 0; const auto newKeyId = _key ? _key->keyId() : 0;
if (newKeyId) { if (newKeyId) {
if (_keyId == newKeyId) { if (_keyId == newKeyId) {
@ -2406,7 +2366,6 @@ void ConnectionPrivate::updateAuthKey() {
Assert(already != newKeyId); Assert(already != newKeyId);
DEBUG_LOG(("MTP Error: auth_key id for dc %1 changed").arg(_shiftedDcId)); DEBUG_LOG(("MTP Error: auth_key id for dc %1 changed").arg(_shiftedDcId));
lockFinished.unlock();
restart(); restart();
return; return;
} }
@ -2420,11 +2379,10 @@ void ConnectionPrivate::updateAuthKey() {
LOG(("MTP Error: No key %1 in updateAuthKey() for destroying.").arg(_shiftedDcId)); LOG(("MTP Error: No key %1 in updateAuthKey() for destroying.").arg(_shiftedDcId));
_instance->checkIfKeyWasDestroyed(_shiftedDcId); _instance->checkIfKeyWasDestroyed(_shiftedDcId);
return; return;
} else if (!_sessionData->owner()->acquireKeyCreation()) { } else if (!_sessionData->acquireKeyCreation()) {
DEBUG_LOG(("AuthKey Info: No key in updateAuthKey(), but someone is creating already.")); DEBUG_LOG(("AuthKey Info: No key in updateAuthKey(), but someone is creating already."));
return; return;
} }
lockFinished.unlock();
DEBUG_LOG(("AuthKey Info: No key in updateAuthKey(), creating.")); DEBUG_LOG(("AuthKey Info: No key in updateAuthKey(), creating."));
createDcKey(); createDcKey();
@ -2435,9 +2393,6 @@ void ConnectionPrivate::createDcKey() {
using Error = DcKeyCreator::Error; using Error = DcKeyCreator::Error;
auto delegate = DcKeyCreator::Delegate(); auto delegate = DcKeyCreator::Delegate();
delegate.done = [=](base::expected<Result, Error> result) { delegate.done = [=](base::expected<Result, Error> result) {
QReadLocker lockFinished(&_sessionDataMutex);
if (!_sessionData) return;
if (result) { if (result) {
DEBUG_LOG(("AuthKey Info: auth key gen succeed, id: %1, server salt: %2").arg(result->key->keyId()).arg(result->serverSalt)); DEBUG_LOG(("AuthKey Info: auth key gen succeed, id: %1, server salt: %2").arg(result->key->keyId()).arg(result->serverSalt));
@ -2445,10 +2400,8 @@ void ConnectionPrivate::createDcKey() {
_sessionData->clearForNewKey(_instance); _sessionData->clearForNewKey(_instance);
_keyCreator = nullptr; _keyCreator = nullptr;
_sessionData->owner()->releaseKeyCreationOnDone( _sessionData->releaseKeyCreationOnDone(result->key);
std::move(result->key)); applyAuthKey(std::move(result->key));
updateAuthKey();
return; return;
} }
clearKeyCreatorOnFail(); clearKeyCreatorOnFail();
@ -2484,14 +2437,13 @@ void ConnectionPrivate::authKeyChecked() {
if (_sessionData->getSalt()) { if (_sessionData->getSalt()) {
setState(ConnectedState); setState(ConnectedState);
if (_restarted) { if (_restarted) {
emit resendAllAsync(); _sessionData->queueResendAll();
_restarted = false; _restarted = false;
} }
} // else receive salt in bad_server_salt first, then try to send all the requests } // else receive salt in bad_server_salt first, then try to send all the requests
_pingIdToSend = rand_value<uint64>(); // get server_salt _pingIdToSend = rand_value<uint64>(); // get server_salt
_sessionData->queueNeedToResumeAndSend();
emit needToSendAsync();
} }
void ConnectionPrivate::onError( void ConnectionPrivate::onError(
@ -2537,20 +2489,15 @@ void ConnectionPrivate::handleError(int errorCode) {
void ConnectionPrivate::destroyCdnKey() { void ConnectionPrivate::destroyCdnKey() {
if (_key) { if (_key) {
QReadLocker lockFinished(&_sessionDataMutex); _sessionData->destroyCdnKey(_keyId);
if (_sessionData) {
_sessionData->owner()->destroyCdnKey(_keyId);
}
} }
_key = nullptr; _key = nullptr;
_keyId = 0; _keyId = 0;
} }
bool ConnectionPrivate::sendSecureRequest( bool ConnectionPrivate::sendSecureRequest(
SecureRequest &&request, SecureRequest &&request,
bool needAnyResponse, bool needAnyResponse) {
QReadLocker &lockFinished) {
#ifdef TDESKTOP_MTPROTO_OLD #ifdef TDESKTOP_MTPROTO_OLD
const auto oldPadding = true; const auto oldPadding = true;
#else // TDESKTOP_MTPROTO_OLD #else // TDESKTOP_MTPROTO_OLD
@ -2658,24 +2605,15 @@ mtpRequestId ConnectionPrivate::wasSent(mtpMsgId msgId) const {
return 0; return 0;
} }
// _sessionDataMutex must be locked for read.
void ConnectionPrivate::clearKeyCreatorOnFail() { void ConnectionPrivate::clearKeyCreatorOnFail() {
if (_keyCreator) { if (!_keyCreator) {
_keyCreator = nullptr; return;
Assert(_sessionData != nullptr);
_sessionData->owner()->releaseKeyCreationOnFail();
} }
_keyCreator = nullptr;
_sessionData->releaseKeyCreationOnFail();
} }
void ConnectionPrivate::stop() { void ConnectionPrivate::stop() {
QWriteLocker lockFinished(&_sessionDataMutex);
if (!_sessionData) {
Assert(_keyCreator == nullptr);
return;
}
clearKeyCreatorOnFail();
_sessionData = nullptr;
} }
} // namespace internal } // namespace internal

View File

@ -21,7 +21,7 @@ class DcKeyChecker;
} // namespace details } // namespace details
// How much time to wait for some more requests, when sending msg acks. // How much time to wait for some more requests, when sending msg acks.
constexpr auto kAckSendWaiting = crl::time(10000); constexpr auto kAckSendWaiting = 10 * crl::time(1000);
class Instance; class Instance;
@ -43,7 +43,7 @@ public:
Connection(not_null<Instance*> instance); Connection(not_null<Instance*> instance);
~Connection(); ~Connection();
void start(SessionData *data, ShiftedDcId shiftedDcId); void start(std::shared_ptr<SessionData> data, ShiftedDcId shiftedDcId);
void kill(); void kill();
void waitTillFinish(); void waitTillFinish();
@ -68,7 +68,7 @@ public:
not_null<Instance*> instance, not_null<Instance*> instance,
not_null<QThread*> thread, not_null<QThread*> thread,
not_null<Connection*> owner, not_null<Connection*> owner,
not_null<SessionData*> data, std::shared_ptr<SessionData> data,
ShiftedDcId shiftedDcId); ShiftedDcId shiftedDcId);
~ConnectionPrivate(); ~ConnectionPrivate();
@ -79,21 +79,6 @@ public:
int32 getState() const; int32 getState() const;
QString transport() const; QString transport() const;
signals:
void needToReceive();
void needToRestart();
void stateChanged(qint32 newState);
void sessionResetDone();
void needToSendAsync();
void sendAnythingAsync(qint64 msWait);
void sendHttpWaitAsync();
void sendPongAsync(quint64 msgId, quint64 pingId);
void sendMsgsStateInfoAsync(quint64 msgId, QByteArray data);
void resendAsync(quint64 msgId, qint64 msCanWait, bool forceContainer, bool sendMsgStateInfo);
void resendManyAsync(QVector<quint64> msgIds, qint64 msCanWait, bool forceContainer, bool sendMsgStateInfo);
void resendAllAsync();
public slots: public slots:
void restartNow(); void restartNow();
@ -112,6 +97,15 @@ private:
ConnectionPointer data; ConnectionPointer data;
int priority = 0; int priority = 0;
}; };
enum class HandleResult {
Success,
Ignored,
RestartConnection,
ResetSession,
ParseError,
};
void connectToServer(bool afterConfig = false); void connectToServer(bool afterConfig = false);
void connectingTimedOut(); void connectingTimedOut();
void doDisconnect(); void doDisconnect();
@ -135,8 +129,6 @@ private:
void waitBetterFailed(); void waitBetterFailed();
void markConnectionOld(); void markConnectionOld();
void sendPingByTimer(); void sendPingByTimer();
// Locks _sessionDataMutex.
void destroyAllConnections(); void destroyAllConnections();
void confirmBestConnection(); void confirmBestConnection();
@ -151,23 +143,14 @@ private:
mtpMsgId prepareToSend(SecureRequest &request, mtpMsgId currentLastId); mtpMsgId prepareToSend(SecureRequest &request, mtpMsgId currentLastId);
mtpMsgId replaceMsgId(SecureRequest &request, mtpMsgId newId); mtpMsgId replaceMsgId(SecureRequest &request, mtpMsgId newId);
bool sendSecureRequest( bool sendSecureRequest(SecureRequest &&request, bool needAnyResponse);
SecureRequest &&request,
bool needAnyResponse,
QReadLocker &lockFinished);
mtpRequestId wasSent(mtpMsgId msgId) const; mtpRequestId wasSent(mtpMsgId msgId) const;
enum class HandleResult {
Success,
Ignored,
RestartConnection,
ResetSession,
ParseError,
};
[[nodiscard]] HandleResult handleOneReceived(const mtpPrime *from, const mtpPrime *end, uint64 msgId, int32 serverTime, uint64 serverSalt, bool badTime); [[nodiscard]] HandleResult handleOneReceived(const mtpPrime *from, const mtpPrime *end, uint64 msgId, int32 serverTime, uint64 serverSalt, bool badTime);
mtpBuffer ungzip(const mtpPrime *from, const mtpPrime *end) const; mtpBuffer ungzip(const mtpPrime *from, const mtpPrime *end) const;
void handleMsgsStates(const QVector<MTPlong> &ids, const QByteArray &states, QVector<MTPlong> &acked); void handleMsgsStates(const QVector<MTPlong> &ids, const QByteArray &states, QVector<MTPlong> &acked);
// _sessionDataMutex must be locked for read.
bool setState(int32 state, int32 ifState = Connection::UpdateAlways); bool setState(int32 state, int32 ifState = Connection::UpdateAlways);
void appendTestConnection( void appendTestConnection(
@ -182,19 +165,26 @@ private:
// remove msgs with such ids from sessionData->haveSent, add to sessionData->wereAcked // remove msgs with such ids from sessionData->haveSent, add to sessionData->wereAcked
void requestsAcked(const QVector<MTPlong> &ids, bool byResponse = false); void requestsAcked(const QVector<MTPlong> &ids, bool byResponse = false);
void resend(quint64 msgId, qint64 msCanWait = 0, bool forceContainer = false, bool sendMsgStateInfo = false); void resend(
void resendMany(QVector<quint64> msgIds, qint64 msCanWait = 0, bool forceContainer = false, bool sendMsgStateInfo = false); mtpMsgId msgId,
crl::time msCanWait = 0,
bool forceContainer = false,
bool sendMsgStateInfo = false);
void resendMany(
QVector<mtpMsgId> msgIds,
crl::time msCanWait = 0,
bool forceContainer = false,
bool sendMsgStateInfo = false);
void createDcKey(); void createDcKey();
void resetSession(); void resetSession();
void checkAuthKey(); void checkAuthKey();
void authKeyChecked(); void authKeyChecked();
void destroyCdnKey(); void destroyCdnKey();
// _sessionDataMutex must be locked for read.
void clearKeyCreatorOnFail(); void clearKeyCreatorOnFail();
void applyAuthKey(AuthKeyPtr &&key);
not_null<Instance*> _instance; const not_null<Instance*> _instance;
DcType _dcType = DcType::Regular; DcType _dcType = DcType::Regular;
mutable QReadWriteLock stateConnMutex; mutable QReadWriteLock stateConnMutex;
@ -236,8 +226,7 @@ private:
AuthKeyPtr _key; AuthKeyPtr _key;
uint64 _keyId = 0; uint64 _keyId = 0;
QReadWriteLock _sessionDataMutex; std::shared_ptr<SessionData> _sessionData;
SessionData *_sessionData = nullptr;
std::unique_ptr<ConnectionOptions> _connectionOptions; std::unique_ptr<ConnectionOptions> _connectionOptions;
std::unique_ptr<details::DcKeyCreator> _keyCreator; std::unique_ptr<details::DcKeyCreator> _keyCreator;

View File

@ -37,8 +37,8 @@ AuthKeyPtr Dcenter::getKey() const {
return _key; return _key;
} }
void Dcenter::destroyCdnKey(uint64 keyId) { bool Dcenter::destroyCdnKey(uint64 keyId) {
destroyKey(keyId); return destroyKey(keyId);
} }
bool Dcenter::destroyConfirmedForgottenKey(uint64 keyId) { bool Dcenter::destroyConfirmedForgottenKey(uint64 keyId) {
@ -54,9 +54,6 @@ bool Dcenter::destroyKey(uint64 keyId) {
} }
_key = nullptr; _key = nullptr;
_connectionInited = false; _connectionInited = false;
lock.unlock();
emit authKeyChanged();
return true; return true;
} }
@ -86,18 +83,18 @@ void Dcenter::releaseKeyCreationOnFail() {
_creatingKey = false; _creatingKey = false;
} }
void Dcenter::releaseKeyCreationOnDone(AuthKeyPtr &&key) { void Dcenter::releaseKeyCreationOnDone(const AuthKeyPtr &key) {
Expects(_creatingKey); Expects(_creatingKey);
Expects(_key == nullptr); Expects(_key == nullptr);
QWriteLocker lock(&_mutex); QWriteLocker lock(&_mutex);
DEBUG_LOG(("AuthKey Info: Dcenter::releaseKeyCreationOnDone(%1), emitting authKeyChanged, dc %2").arg(key ? key->keyId() : 0).arg(_id)); DEBUG_LOG(("AuthKey Info: Dcenter::releaseKeyCreationOnDone(%1), "
_key = std::move(key); "emitting authKeyChanged, dc %2"
).arg(key ? key->keyId() : 0
).arg(_id));
_key = key;
_connectionInited = false; _connectionInited = false;
_creatingKey = false; _creatingKey = false;
lock.unlock();
emit authKeyChanged();
} }
} // namespace internal } // namespace internal

View File

@ -16,8 +16,6 @@ using AuthKeyPtr = std::shared_ptr<AuthKey>;
namespace internal { namespace internal {
class Dcenter : public QObject { class Dcenter : public QObject {
Q_OBJECT
public: public:
// Main thread. // Main thread.
Dcenter(DcId dcId, AuthKeyPtr &&key); Dcenter(DcId dcId, AuthKeyPtr &&key);
@ -26,9 +24,9 @@ public:
[[nodiscard]] DcId id() const; [[nodiscard]] DcId id() const;
[[nodiscard]] AuthKeyPtr getKey() const; [[nodiscard]] AuthKeyPtr getKey() const;
void destroyCdnKey(uint64 keyId); bool destroyCdnKey(uint64 keyId);
bool destroyConfirmedForgottenKey(uint64 keyId); bool destroyConfirmedForgottenKey(uint64 keyId);
void releaseKeyCreationOnDone(AuthKeyPtr &&key); void releaseKeyCreationOnDone(const AuthKeyPtr &key);
[[nodiscard]] bool connectionInited() const; [[nodiscard]] bool connectionInited() const;
void setConnectionInited(bool connectionInited = true); void setConnectionInited(bool connectionInited = true);
@ -36,9 +34,6 @@ public:
[[nodiscard]] bool acquireKeyCreation(); [[nodiscard]] bool acquireKeyCreation();
void releaseKeyCreationOnFail(); void releaseKeyCreationOnFail();
signals:
void authKeyChanged();
private: private:
bool destroyKey(uint64 keyId); bool destroyKey(uint64 keyId);

View File

@ -65,7 +65,8 @@ public:
void setMainDcId(DcId mainDcId); void setMainDcId(DcId mainDcId);
[[nodiscard]] DcId mainDcId() const; [[nodiscard]] DcId mainDcId() const;
void setKeyForWrite(DcId dcId, const AuthKeyPtr &key); void dcKeyChanged(DcId dcId, const AuthKeyPtr &key);
[[nodiscard]] rpl::producer<DcId> dcKeyChanged() const;
[[nodiscard]] AuthKeysList getKeysForWrite() const; [[nodiscard]] AuthKeysList getKeysForWrite() const;
void addKeysForDestroy(AuthKeysList &&keys); void addKeysForDestroy(AuthKeysList &&keys);
@ -208,6 +209,7 @@ private:
bool _mainDcIdForced = false; bool _mainDcIdForced = false;
base::flat_map<DcId, std::unique_ptr<Dcenter>> _dcenters; base::flat_map<DcId, std::unique_ptr<Dcenter>> _dcenters;
std::vector<std::unique_ptr<Dcenter>> _dcentersToDestroy; std::vector<std::unique_ptr<Dcenter>> _dcentersToDestroy;
rpl::event_stream<DcId> _dcKeyChanged;
Session *_mainSession = nullptr; Session *_mainSession = nullptr;
base::flat_map<ShiftedDcId, std::unique_ptr<Session>> _sessions; base::flat_map<ShiftedDcId, std::unique_ptr<Session>> _sessions;
@ -693,7 +695,9 @@ not_null<Dcenter*> Instance::Private::getDcById(
return addDc(dcId); return addDc(dcId);
} }
void Instance::Private::setKeyForWrite(DcId dcId, const AuthKeyPtr &key) { void Instance::Private::dcKeyChanged(DcId dcId, const AuthKeyPtr &key) {
_dcKeyChanged.fire_copy(dcId);
if (isTemporaryDcId(dcId)) { if (isTemporaryDcId(dcId)) {
return; return;
} }
@ -710,6 +714,10 @@ void Instance::Private::setKeyForWrite(DcId dcId, const AuthKeyPtr &key) {
}); });
} }
rpl::producer<DcId> Instance::Private::dcKeyChanged() const {
return _dcKeyChanged.events();
}
AuthKeysList Instance::Private::getKeysForWrite() const { AuthKeysList Instance::Private::getKeysForWrite() const {
auto result = AuthKeysList(); auto result = AuthKeysList();
@ -1600,7 +1608,7 @@ void Instance::Private::keyDestroyedOnServer(DcId dcId, uint64 keyId) {
if (const auto dc = findDc(dcId)) { if (const auto dc = findDc(dcId)) {
if (dc->destroyConfirmedForgottenKey(keyId)) { if (dc->destroyConfirmedForgottenKey(keyId)) {
LOG(("Key destroyed!")); LOG(("Key destroyed!"));
setKeyForWrite(dcId, nullptr); dcKeyChanged(dcId, nullptr);
} else { } else {
LOG(("Key already is different.")); LOG(("Key already is different."));
} }
@ -1753,10 +1761,12 @@ void Instance::logout(RPCDoneHandlerPtr onDone, RPCFailHandlerPtr onFail) {
_private->logout(onDone, onFail); _private->logout(onDone, onFail);
} }
void Instance::setKeyForWrite(DcId dcId, const AuthKeyPtr &key) { void Instance::dcKeyChanged(DcId dcId, const AuthKeyPtr &key) {
InvokeQueued(this, [=] { _private->dcKeyChanged(dcId, key);
_private->setKeyForWrite(dcId, key); }
});
rpl::producer<DcId> Instance::dcKeyChanged() const {
return _private->dcKeyChanged();
} }
AuthKeysList Instance::getKeysForWrite() const { AuthKeysList Instance::getKeysForWrite() const {

View File

@ -63,9 +63,10 @@ public:
// Thread-safe. // Thread-safe.
[[nodiscard]] QString deviceModel() const; [[nodiscard]] QString deviceModel() const;
[[nodiscard]] QString systemVersion() const; [[nodiscard]] QString systemVersion() const;
void setKeyForWrite(DcId dcId, const AuthKeyPtr &key);
// Main thread. // Main thread.
void dcKeyChanged(DcId dcId, const AuthKeyPtr &key);
[[nodiscard]] rpl::producer<DcId> dcKeyChanged() const;
[[nodiscard]] AuthKeysList getKeysForWrite() const; [[nodiscard]] AuthKeysList getKeysForWrite() const;
void addKeysForDestroy(AuthKeysList &&keys); void addKeysForDestroy(AuthKeysList &&keys);
@ -110,6 +111,8 @@ public:
bool isKeysDestroyer() const; bool isKeysDestroyer() const;
void scheduleKeyDestroy(ShiftedDcId shiftedDcId); void scheduleKeyDestroy(ShiftedDcId shiftedDcId);
void checkIfKeyWasDestroyed(ShiftedDcId shiftedDcId); void checkIfKeyWasDestroyed(ShiftedDcId shiftedDcId);
// Main thread.
void keyDestroyedOnServer(DcId dcId, uint64 keyId); void keyDestroyedOnServer(DcId dcId, uint64 keyId);
void requestConfig(); void requestConfig();

View File

@ -64,6 +64,19 @@ ConnectionOptions::ConnectionOptions(
, useTcp(useTcp) { , useTcp(useTcp) {
} }
template <typename Callback>
void SessionData::withSession(Callback &&callback) {
QMutexLocker lock(&_ownerMutex);
if (const auto session = _owner) {
InvokeQueued(session, [
session,
callback = std::forward<Callback>(callback)
] {
callback(session);
});
}
}
void SessionData::setCurrentKeyId(uint64 keyId) { void SessionData::setCurrentKeyId(uint64 keyId) {
QWriteLocker locker(&_lock); QWriteLocker locker(&_lock);
if (_keyId == keyId) { if (_keyId == keyId) {
@ -136,6 +149,115 @@ void SessionData::clearForNewKey(not_null<Instance*> instance) {
instance->clearCallbacksDelayed(std::move(clearCallbacks)); instance->clearCallbacksDelayed(std::move(clearCallbacks));
} }
void SessionData::queueTryToReceive() {
withSession([](not_null<Session*> session) {
session->tryToReceive();
});
}
void SessionData::queueNeedToResumeAndSend() {
withSession([](not_null<Session*> session) {
session->needToResumeAndSend();
});
}
void SessionData::queueConnectionStateChange(int newState) {
withSession([=](not_null<Session*> session) {
session->connectionStateChange(newState);
});
}
void SessionData::queueResendAll() {
withSession([](not_null<Session*> session) {
session->resendAll();
});
}
void SessionData::queueResetDone() {
withSession([](not_null<Session*> session) {
session->resetDone();
});
}
void SessionData::queueSendAnything(crl::time msCanWait) {
withSession([=](not_null<Session*> session) {
session->sendAnything(msCanWait);
});
}
void SessionData::queueSendMsgsStateInfo(quint64 msgId, QByteArray data) {
withSession([=](not_null<Session*> session) {
session->sendMsgsStateInfo(msgId, data);
});
}
void SessionData::queueResend(
mtpMsgId msgId,
crl::time msCanWait,
bool forceContainer,
bool sendMsgStateInfo) {
withSession([=](not_null<Session*> session) {
session->resend(msgId, msCanWait, forceContainer, sendMsgStateInfo);
});
}
void SessionData::queueResendMany(
QVector<mtpMsgId> msgIds,
crl::time msCanWait,
bool forceContainer,
bool sendMsgStateInfo) {
withSession([=](not_null<Session*> session) {
for (const auto msgId : msgIds) {
session->resend(
msgId,
msCanWait,
forceContainer,
sendMsgStateInfo);
}
});
}
bool SessionData::connectionInited() const {
QMutexLocker lock(&_ownerMutex);
return _owner ? _owner->connectionInited() : false;
}
AuthKeyPtr SessionData::getKey() const {
QMutexLocker lock(&_ownerMutex);
return _owner ? _owner->getKey() : nullptr;
}
bool SessionData::acquireKeyCreation() {
QMutexLocker lock(&_ownerMutex);
return _owner ? _owner->acquireKeyCreation() : false;
}
void SessionData::releaseKeyCreationOnDone(const AuthKeyPtr &key) {
QMutexLocker lock(&_ownerMutex);
if (_owner) {
_owner->releaseKeyCreationOnDone(key);
}
}
void SessionData::releaseKeyCreationOnFail() {
QMutexLocker lock(&_ownerMutex);
if (_owner) {
_owner->releaseKeyCreationOnFail();
}
}
void SessionData::destroyCdnKey(uint64 keyId) {
QMutexLocker lock(&_ownerMutex);
if (_owner) {
_owner->destroyCdnKey(keyId);
}
}
void SessionData::detach() {
QMutexLocker lock(&_ownerMutex);
_owner = nullptr;
}
Session::Session( Session::Session(
not_null<Instance*> instance, not_null<Instance*> instance,
ShiftedDcId shiftedDcId, ShiftedDcId shiftedDcId,
@ -145,19 +267,30 @@ Session::Session(
, _shiftedDcId(shiftedDcId) , _shiftedDcId(shiftedDcId)
, _ownedDc(dc ? nullptr : std::make_unique<Dcenter>(shiftedDcId, nullptr)) , _ownedDc(dc ? nullptr : std::make_unique<Dcenter>(shiftedDcId, nullptr))
, _dc(dc ? dc : _ownedDc.get()) , _dc(dc ? dc : _ownedDc.get())
, _data(this) , _data(std::make_shared<SessionData>(this))
, _timeouter([=] { checkRequestsByTimer(); }) , _timeouter([=] { checkRequestsByTimer(); })
, _sender([=] { needToResumeAndSend(); }) { , _sender([=] { needToResumeAndSend(); }) {
_timeouter.callEach(1000); _timeouter.callEach(1000);
refreshOptions(); refreshOptions();
if (sharedDc()) { if (sharedDc()) {
connect(_dc, SIGNAL(authKeyChanged()), this, SLOT(authKeyChangedForDC()), Qt::QueuedConnection); watchDcKeyChanges();
} }
} }
void Session::watchDcKeyChanges() {
_instance->dcKeyChanged(
) | rpl::filter([=](DcId dcId) {
return (dcId == _shiftedDcId) || (dcId == BareDcId(_shiftedDcId));
}) | rpl::start_with_next([=] {
DEBUG_LOG(("AuthKey Info: Session::authKeyCreatedForDC slot, "
"emitting authKeyChanged(), dcWithShift %1").arg(_shiftedDcId));
emit authKeyChanged();
}, _lifetime);
}
void Session::start() { void Session::start() {
_connection = std::make_unique<Connection>(_instance); _connection = std::make_unique<Connection>(_instance);
_connection->start(&_data, _shiftedDcId); _connection->start(_data, _shiftedDcId);
if (_instance->isKeysDestroyer()) { if (_instance->isKeysDestroyer()) {
_instance->scheduleKeyDestroy(_shiftedDcId); _instance->scheduleKeyDestroy(_shiftedDcId);
} }
@ -189,7 +322,7 @@ void Session::refreshOptions() {
const auto useHttp = (proxyType != ProxyData::Type::Mtproto); const auto useHttp = (proxyType != ProxyData::Type::Mtproto);
const auto useIPv4 = true; const auto useIPv4 = true;
const auto useIPv6 = Global::TryIPv6(); const auto useIPv6 = Global::TryIPv6();
_data.setConnectionOptions(ConnectionOptions( _data->setConnectionOptions(ConnectionOptions(
_instance->systemLangCode(), _instance->systemLangCode(),
_instance->cloudLangCode(), _instance->cloudLangCode(),
_instance->langPackName(), _instance->langPackName(),
@ -222,22 +355,25 @@ void Session::stop() {
void Session::kill() { void Session::kill() {
stop(); stop();
_killed = true; _killed = true;
_data->detach();
DEBUG_LOG(("Session Info: marked session dcWithShift %1 as killed").arg(_shiftedDcId)); DEBUG_LOG(("Session Info: marked session dcWithShift %1 as killed").arg(_shiftedDcId));
} }
void Session::unpaused() { void Session::unpaused() {
if (_needToReceive) { if (_needToReceive) {
_needToReceive = false; _needToReceive = false;
QTimer::singleShot(0, this, SLOT(tryToReceive())); InvokeQueued(this, [=] {
tryToReceive();
});
} }
} }
void Session::sendDcKeyCheck(const AuthKeyPtr &key) { void Session::sendDcKeyCheck(const AuthKeyPtr &key) {
_data.setKeyForCheck(key); _data->setKeyForCheck(key);
needToResumeAndSend(); needToResumeAndSend();
} }
void Session::sendAnything(qint64 msCanWait) { void Session::sendAnything(crl::time msCanWait) {
if (_killed) { if (_killed) {
DEBUG_LOG(("Session Error: can't send anything in a killed session")); DEBUG_LOG(("Session Error: can't send anything in a killed session"));
return; return;
@ -284,12 +420,6 @@ void Session::needToResumeAndSend() {
} }
} }
void Session::sendPong(quint64 msgId, quint64 pingId) {
_instance->sendProtocolMessage(
_shiftedDcId,
MTPPong(MTP_pong(MTP_long(msgId), MTP_long(pingId))));
}
void Session::sendMsgsStateInfo(quint64 msgId, QByteArray data) { void Session::sendMsgsStateInfo(quint64 msgId, QByteArray data) {
auto info = bytes::vector(); auto info = bytes::vector();
if (!data.isEmpty()) { if (!data.isEmpty()) {
@ -312,8 +442,8 @@ void Session::checkRequestsByTimer() {
QVector<mtpMsgId> stateRequestIds; QVector<mtpMsgId> stateRequestIds;
{ {
QReadLocker locker(_data.haveSentMutex()); QReadLocker locker(_data->haveSentMutex());
auto &haveSent = _data.haveSentMap(); auto &haveSent = _data->haveSentMap();
const auto haveSentCount = haveSent.size(); const auto haveSentCount = haveSent.size();
auto ms = crl::now(); auto ms = crl::now();
for (auto i = haveSent.begin(), e = haveSent.end(); i != e; ++i) { for (auto i = haveSent.begin(), e = haveSent.end(); i != e; ++i) {
@ -340,9 +470,9 @@ void Session::checkRequestsByTimer() {
if (stateRequestIds.size()) { if (stateRequestIds.size()) {
DEBUG_LOG(("MTP Info: requesting state of msgs: %1").arg(LogIds(stateRequestIds))); DEBUG_LOG(("MTP Info: requesting state of msgs: %1").arg(LogIds(stateRequestIds)));
{ {
QWriteLocker locker(_data.stateRequestMutex()); QWriteLocker locker(_data->stateRequestMutex());
for (uint32 i = 0, l = stateRequestIds.size(); i < l; ++i) { for (uint32 i = 0, l = stateRequestIds.size(); i < l; ++i) {
_data.stateRequestMap().insert(stateRequestIds[i], true); _data->stateRequestMap().insert(stateRequestIds[i], true);
} }
} }
sendAnything(kCheckResendWaiting); sendAnything(kCheckResendWaiting);
@ -356,8 +486,8 @@ void Session::checkRequestsByTimer() {
if (!removingIds.isEmpty()) { if (!removingIds.isEmpty()) {
auto clearCallbacks = std::vector<RPCCallbackClear>(); auto clearCallbacks = std::vector<RPCCallbackClear>();
{ {
QWriteLocker locker(_data.haveSentMutex()); QWriteLocker locker(_data->haveSentMutex());
auto &haveSent = _data.haveSentMap(); auto &haveSent = _data->haveSentMap();
for (uint32 i = 0, l = removingIds.size(); i < l; ++i) { for (uint32 i = 0, l = removingIds.size(); i < l; ++i) {
auto j = haveSent.find(removingIds[i]); auto j = haveSent.find(removingIds[i]);
if (j != haveSent.cend()) { if (j != haveSent.cend()) {
@ -372,28 +502,28 @@ void Session::checkRequestsByTimer() {
} }
} }
void Session::onConnectionStateChange(qint32 newState) { void Session::connectionStateChange(int newState) {
_instance->onStateChange(_shiftedDcId, newState); _instance->onStateChange(_shiftedDcId, newState);
} }
void Session::onResetDone() { void Session::resetDone() {
_instance->onSessionReset(_shiftedDcId); _instance->onSessionReset(_shiftedDcId);
} }
void Session::cancel(mtpRequestId requestId, mtpMsgId msgId) { void Session::cancel(mtpRequestId requestId, mtpMsgId msgId) {
if (requestId) { if (requestId) {
QWriteLocker locker(_data.toSendMutex()); QWriteLocker locker(_data->toSendMutex());
_data.toSendMap().remove(requestId); _data->toSendMap().remove(requestId);
} }
if (msgId) { if (msgId) {
QWriteLocker locker(_data.haveSentMutex()); QWriteLocker locker(_data->haveSentMutex());
_data.haveSentMap().remove(msgId); _data->haveSentMap().remove(msgId);
} }
} }
void Session::ping() { void Session::ping() {
_ping = true; _ping = true;
sendAnything(0); sendAnything();
} }
int32 Session::requestState(mtpRequestId requestId) const { int32 Session::requestState(mtpRequestId requestId) const {
@ -419,8 +549,8 @@ int32 Session::requestState(mtpRequestId requestId) const {
} }
if (!requestId) return MTP::RequestSent; if (!requestId) return MTP::RequestSent;
QWriteLocker locker(_data.toSendMutex()); QWriteLocker locker(_data->toSendMutex());
const auto &toSend = _data.toSendMap(); const auto &toSend = _data->toSendMap();
const auto i = toSend.constFind(requestId); const auto i = toSend.constFind(requestId);
if (i != toSend.cend()) { if (i != toSend.cend()) {
return MTP::RequestSending; return MTP::RequestSending;
@ -456,11 +586,15 @@ QString Session::transport() const {
return _connection ? _connection->transport() : QString(); return _connection ? _connection->transport() : QString();
} }
mtpRequestId Session::resend(quint64 msgId, qint64 msCanWait, bool forceContainer, bool sendMsgStateInfo) { mtpRequestId Session::resend(
mtpMsgId msgId,
crl::time msCanWait,
bool forceContainer,
bool sendMsgStateInfo) {
SecureRequest request; SecureRequest request;
{ {
QWriteLocker locker(_data.haveSentMutex()); QWriteLocker locker(_data->haveSentMutex());
auto &haveSent = _data.haveSentMap(); auto &haveSent = _data->haveSentMap();
auto i = haveSent.find(msgId); auto i = haveSent.find(msgId);
if (i == haveSent.end()) { if (i == haveSent.end()) {
@ -493,8 +627,8 @@ mtpRequestId Session::resend(quint64 msgId, qint64 msCanWait, bool forceContaine
request->msDate = forceContainer ? 0 : crl::now(); request->msDate = forceContainer ? 0 : crl::now();
sendPrepared(request, msCanWait, false); sendPrepared(request, msCanWait, false);
{ {
QWriteLocker locker(_data.toResendMutex()); QWriteLocker locker(_data->toResendMutex());
_data.toResendMap().insert(msgId, request->requestId); _data->toResendMap().insert(msgId, request->requestId);
} }
return request->requestId; return request->requestId;
} else { } else {
@ -502,17 +636,11 @@ mtpRequestId Session::resend(quint64 msgId, qint64 msCanWait, bool forceContaine
} }
} }
void Session::resendMany(QVector<quint64> msgIds, qint64 msCanWait, bool forceContainer, bool sendMsgStateInfo) {
for (int32 i = 0, l = msgIds.size(); i < l; ++i) {
resend(msgIds.at(i), msCanWait, forceContainer, sendMsgStateInfo);
}
}
void Session::resendAll() { void Session::resendAll() {
QVector<mtpMsgId> toResend; QVector<mtpMsgId> toResend;
{ {
QReadLocker locker(_data.haveSentMutex()); QReadLocker locker(_data->haveSentMutex());
const auto &haveSent = _data.haveSentMap(); const auto &haveSent = _data->haveSentMap();
toResend.reserve(haveSent.size()); toResend.reserve(haveSent.size());
for (auto i = haveSent.cbegin(), e = haveSent.cend(); i != e; ++i) { for (auto i = haveSent.cbegin(), e = haveSent.cend(); i != e; ++i) {
if (i.value()->requestId) { if (i.value()->requestId) {
@ -532,8 +660,8 @@ void Session::sendPrepared(
DEBUG_LOG(("MTP Info: adding request to toSendMap, msCanWait %1" DEBUG_LOG(("MTP Info: adding request to toSendMap, msCanWait %1"
).arg(msCanWait)); ).arg(msCanWait));
{ {
QWriteLocker locker(_data.toSendMutex()); QWriteLocker locker(_data->toSendMutex());
_data.toSendMap().insert(request->requestId, request); _data->toSendMap().insert(request->requestId, request);
if (newRequest) { if (newRequest) {
*(mtpMsgId*)(request->data() + 4) = 0; *(mtpMsgId*)(request->data() + 4) = 0;
@ -546,29 +674,37 @@ void Session::sendPrepared(
sendAnything(msCanWait); sendAnything(msCanWait);
} }
void Session::authKeyChangedForDC() {
DEBUG_LOG(("AuthKey Info: Session::authKeyCreatedForDC slot, emitting authKeyChanged(), dcWithShift %1").arg(_shiftedDcId));
emit authKeyChanged();
}
bool Session::acquireKeyCreation() { bool Session::acquireKeyCreation() {
return _dc->acquireKeyCreation(); Expects(!_myKeyCreation);
if (!_dc->acquireKeyCreation()) {
return false;
}
_myKeyCreation = true;
return true;
} }
void Session::releaseKeyCreationOnFail() { void Session::releaseKeyCreationOnFail() {
Expects(_myKeyCreation);
_dc->releaseKeyCreationOnFail(); _dc->releaseKeyCreationOnFail();
_myKeyCreation = false;
} }
void Session::releaseKeyCreationOnDone(AuthKeyPtr &&key) { void Session::releaseKeyCreationOnDone(const AuthKeyPtr &key) {
Expects(_myKeyCreation);
DEBUG_LOG(("AuthKey Info: Session key created, setting, dcWithShift %1").arg(_shiftedDcId)); DEBUG_LOG(("AuthKey Info: Session key created, setting, dcWithShift %1").arg(_shiftedDcId));
_dc->releaseKeyCreationOnDone(key);
_myKeyCreation = false;
if (sharedDc()) { if (sharedDc()) {
const auto dcId = _dc->id(); const auto dcId = _dc->id();
const auto instance = _instance; const auto instance = _instance;
InvokeQueued(instance, [=] { InvokeQueued(instance, [=] {
instance->setKeyForWrite(dcId, key); instance->dcKeyChanged(dcId, key);
}); });
} }
_dc->releaseKeyCreationOnDone(std::move(key));
} }
void Session::notifyDcConnectionInited() { void Session::notifyDcConnectionInited() {
@ -577,12 +713,14 @@ void Session::notifyDcConnectionInited() {
} }
void Session::destroyCdnKey(uint64 keyId) { void Session::destroyCdnKey(uint64 keyId) {
_dc->destroyCdnKey(keyId); if (!_dc->destroyCdnKey(keyId)) {
return;
}
if (sharedDc()) { if (sharedDc()) {
const auto dcId = _dc->id(); const auto dcId = _dc->id();
const auto instance = _instance; const auto instance = _instance;
InvokeQueued(instance, [=] { InvokeQueued(instance, [=] {
instance->setKeyForWrite(dcId, nullptr); instance->dcKeyChanged(dcId, nullptr);
}); });
} }
} }
@ -613,11 +751,11 @@ void Session::tryToReceive() {
auto isUpdate = false; auto isUpdate = false;
auto message = SerializedMessage(); auto message = SerializedMessage();
{ {
QWriteLocker locker(_data.haveReceivedMutex()); QWriteLocker locker(_data->haveReceivedMutex());
auto &responses = _data.haveReceivedResponses(); auto &responses = _data->haveReceivedResponses();
auto response = responses.begin(); auto response = responses.begin();
if (response == responses.cend()) { if (response == responses.cend()) {
auto &updates = _data.haveReceivedUpdates(); auto &updates = _data->haveReceivedUpdates();
auto update = updates.begin(); auto update = updates.begin();
if (update == updates.cend()) { if (update == updates.cend()) {
return; return;
@ -643,6 +781,9 @@ void Session::tryToReceive() {
} }
Session::~Session() { Session::~Session() {
if (_myKeyCreation) {
releaseKeyCreationOnFail();
}
Assert(_connection == nullptr); Assert(_connection == nullptr);
} }

View File

@ -249,10 +249,8 @@ public:
return _stateRequest; return _stateRequest;
} }
not_null<Session*> owner() { // Warning! Valid only in constructor, _owner is guaranteed != null.
return _owner; [[nodiscard]] not_null<Session*> owner() {
}
not_null<const Session*> owner() const {
return _owner; return _owner;
} }
@ -265,13 +263,45 @@ public:
void clearForNewKey(not_null<Instance*> instance); void clearForNewKey(not_null<Instance*> instance);
// Connection -> Session interface.
void queueTryToReceive();
void queueNeedToResumeAndSend();
void queueConnectionStateChange(int newState);
void queueResendAll();
void queueResetDone();
void queueSendAnything(crl::time msCanWait = 0);
void queueSendMsgsStateInfo(quint64 msgId, QByteArray data);
void queueResend(
mtpMsgId msgId,
crl::time msCanWait,
bool forceContainer,
bool sendMsgStateInfo);
void queueResendMany(
QVector<mtpMsgId> msgIds,
crl::time msCanWait,
bool forceContainer,
bool sendMsgStateInfo);
[[nodiscard]] bool connectionInited() const;
[[nodiscard]] AuthKeyPtr getKey() const;
[[nodiscard]] bool acquireKeyCreation();
void releaseKeyCreationOnDone(const AuthKeyPtr &key);
void releaseKeyCreationOnFail();
void destroyCdnKey(uint64 keyId);
void detach();
private: private:
template <typename Callback>
void withSession(Callback &&callback);
uint64 _keyId = 0; uint64 _keyId = 0;
uint64 _sessionId = 0; uint64 _sessionId = 0;
uint64 _salt = 0; uint64 _salt = 0;
uint32 _messagesSent = 0; uint32 _messagesSent = 0;
not_null<Session*> _owner; Session *_owner = nullptr;
mutable QMutex _ownerMutex;
AuthKeyPtr _dcKeyForCheck; AuthKeyPtr _dcKeyForCheck;
ConnectionOptions _options; ConnectionOptions _options;
@ -327,7 +357,7 @@ public:
// Connection thread. // Connection thread.
[[nodiscard]] bool acquireKeyCreation(); [[nodiscard]] bool acquireKeyCreation();
void releaseKeyCreationOnFail(); void releaseKeyCreationOnFail();
void releaseKeyCreationOnDone(AuthKeyPtr &&key); void releaseKeyCreationOnDone(const AuthKeyPtr &key);
void destroyCdnKey(uint64 keyId); void destroyCdnKey(uint64 keyId);
void notifyDcConnectionInited(); void notifyDcConnectionInited();
@ -346,32 +376,29 @@ public:
crl::time msCanWait = 0, crl::time msCanWait = 0,
bool newRequest = true); bool newRequest = true);
void tryToReceive();
void needToResumeAndSend();
void connectionStateChange(int newState);
void resendAll(); // After connection restart.
void resetDone();
void sendAnything(crl::time msCanWait = 0);
void sendMsgsStateInfo(quint64 msgId, QByteArray data);
mtpRequestId resend(
mtpMsgId msgId,
crl::time msCanWait = 0,
bool forceContainer = false,
bool sendMsgStateInfo = false);
signals: signals:
void authKeyChanged(); void authKeyChanged();
void needToSend(); void needToSend();
void needToPing(); void needToPing();
void needToRestart(); void needToRestart();
public slots:
void needToResumeAndSend();
mtpRequestId resend(quint64 msgId, qint64 msCanWait = 0, bool forceContainer = false, bool sendMsgStateInfo = false);
void resendMany(QVector<quint64> msgIds, qint64 msCanWait, bool forceContainer, bool sendMsgStateInfo);
void resendAll(); // after connection restart
void authKeyChangedForDC();
void tryToReceive();
void onConnectionStateChange(qint32 newState);
void onResetDone();
void sendAnything(qint64 msCanWait = 0);
void sendPong(quint64 msgId, quint64 pingId);
void sendMsgsStateInfo(quint64 msgId, QByteArray data);
private: private:
[[nodiscard]] bool sharedDc() const; [[nodiscard]] bool sharedDc() const;
void checkRequestsByTimer(); void checkRequestsByTimer();
void watchDcKeyChanges();
bool rpcErrorOccured(mtpRequestId requestId, const RPCFailHandlerPtr &onFail, const RPCError &err); bool rpcErrorOccured(mtpRequestId requestId, const RPCFailHandlerPtr &onFail, const RPCError &err);
@ -379,15 +406,15 @@ private:
const ShiftedDcId _shiftedDcId = 0; const ShiftedDcId _shiftedDcId = 0;
const std::unique_ptr<Dcenter> _ownedDc; const std::unique_ptr<Dcenter> _ownedDc;
const not_null<Dcenter*> _dc; const not_null<Dcenter*> _dc;
const std::shared_ptr<SessionData> _data;
std::unique_ptr<Connection> _connection; std::unique_ptr<Connection> _connection;
bool _killed = false; bool _killed = false;
bool _needToReceive = false; bool _needToReceive = false;
SessionData _data;
AuthKeyPtr _dcKeyForCheck; AuthKeyPtr _dcKeyForCheck;
bool _myKeyCreation = false;
crl::time _msSendCall = 0; crl::time _msSendCall = 0;
crl::time _msWait = 0; crl::time _msWait = 0;
@ -397,6 +424,8 @@ private:
base::Timer _timeouter; base::Timer _timeouter;
base::Timer _sender; base::Timer _sender;
rpl::lifetime _lifetime;
}; };
} // namespace internal } // namespace internal