Make MTP::Session handle the connection thread.

This commit is contained in:
John Preston 2019-11-27 17:22:22 +03:00
parent 06f5f7f7d9
commit c742d7406c
10 changed files with 277 additions and 367 deletions

View File

@ -453,18 +453,14 @@ void Account::destroyMtpKeys(MTP::AuthKeysList &&keys) {
Core::App().dcOptions(),
MTP::Instance::Mode::KeysDestroyer,
std::move(destroyConfig));
QObject::connect(
_mtpForKeysDestroy.get(),
&MTP::Instance::allKeysDestroyed,
[=] { allKeysDestroyed(); });
}
void Account::allKeysDestroyed() {
LOG(("MTP Info: all keys scheduled for destroy are destroyed."));
crl::on_main(this, [=] {
_mtpForKeysDestroy = nullptr;
Local::writeMtpData();
});
_mtpForKeysDestroy->allKeysDestroyed(
) | rpl::start_with_next([=] {
LOG(("MTP Info: all keys scheduled for destroy are destroyed."));
crl::on_main(this, [=] {
_mtpForKeysDestroy = nullptr;
Local::writeMtpData();
});
}, _lifetime);
}
void Account::suggestMainDcId(MTP::DcId mainDcId) {

View File

@ -85,7 +85,6 @@ private:
void watchSessionChanges();
void destroyMtpKeys(MTP::AuthKeysList &&keys);
void allKeysDestroyed();
void resetAuthorizationKeys();
void loggedOut();

View File

@ -114,75 +114,46 @@ void WrapInvokeAfter(
} // namespace
Connection::Connection(not_null<Instance*> instance)
: _instance(instance) {
Connection::Connection(
not_null<Instance*> instance,
not_null<QThread*> thread,
std::shared_ptr<SessionData> data,
ShiftedDcId shiftedDcId)
: QObject(nullptr)
, _instance(instance)
, _shiftedDcId(shiftedDcId)
, _realDcType(_instance->dcOptions()->dcType(_shiftedDcId))
, _currentDcType(_realDcType)
, _state(DisconnectedState)
, _retryTimer(thread, [=] { retryByTimer(); })
, _oldConnectionTimer(thread, [=] { markConnectionOld(); })
, _waitForConnectedTimer(thread, [=] { waitConnectedFailed(); })
, _waitForReceivedTimer(thread, [=] { waitReceivedFailed(); })
, _waitForBetterTimer(thread, [=] { waitBetterFailed(); })
, _waitForReceived(kMinReceiveTimeout)
, _waitForConnected(kMinConnectedTimeout)
, _pingSender(thread, [=] { sendPingByTimer(); })
, _checkSentRequestsTimer(thread, [=] { checkSentRequests(); })
, _sessionData(std::move(data)) {
Expects(_shiftedDcId != 0);
moveToThread(thread);
connect(thread, &QThread::started, this, [=] {
_checkSentRequestsTimer.callEach(kCheckSentRequestsEach);
connectToServer();
});
}
Connection::~Connection() {
Expects(_private == nullptr);
releaseKeyCreationOnFail();
doDisconnect();
if (_thread) {
waitTillFinish();
}
Expects(!_connection);
Expects(_testConnections.empty());
}
void Connection::start(
std::shared_ptr<SessionData> sessionData,
ShiftedDcId shiftedDcId) {
Expects(_thread == nullptr && _private == nullptr);
_thread = std::make_unique<QThread>();
auto newData = std::make_unique<ConnectionPrivate>(
_instance,
_thread.get(),
this,
std::move(sessionData),
shiftedDcId);
_instance->dcOptions()->changed(
) | rpl::filter([=](DcId dcId) {
return (BareDcId(shiftedDcId) == dcId) && (_private != nullptr);
}) | rpl::start_with_next([=] {
const auto raw = _private;
InvokeQueued(raw, [=] {
raw->dcOptionsChanged();
});
}, _lifetime);
// will be deleted in the thread::finished signal
_private = newData.release();
_thread->start();
}
void Connection::kill() {
Expects(_private != nullptr && _thread != nullptr);
_private->stop();
_private = nullptr;
_thread->quit();
}
void Connection::waitTillFinish() {
Expects(_private == nullptr && _thread != nullptr);
DEBUG_LOG(("Waiting for connectionThread to finish"));
_thread->wait();
_thread.reset();
}
int32 Connection::state() const {
Expects(_private != nullptr && _thread != nullptr);
return _private->getState();
}
QString Connection::transport() const {
Expects(_private != nullptr && _thread != nullptr);
return _private->transport();
}
void ConnectionPrivate::appendTestConnection(
void Connection::appendTestConnection(
DcOptions::Variants::Protocol protocol,
const QString &ip,
int port,
@ -231,7 +202,7 @@ void ConnectionPrivate::appendTestConnection(
});
}
int16 ConnectionPrivate::getProtocolDcId() const {
int16 Connection::getProtocolDcId() const {
const auto dcId = BareDcId(_shiftedDcId);
const auto simpleDcId = isTemporaryDcId(dcId)
? getRealIdFromTemporaryDcId(dcId)
@ -244,7 +215,7 @@ int16 ConnectionPrivate::getProtocolDcId() const {
: testedDcId;
}
void ConnectionPrivate::checkSentRequests() {
void Connection::checkSentRequests() {
// Remove very old (10 minutes) containers and resend requests.
auto removingIds = std::vector<mtpMsgId>();
auto requesting = false;
@ -285,7 +256,7 @@ void ConnectionPrivate::checkSentRequests() {
}
}
void ConnectionPrivate::destroyAllConnections() {
void Connection::destroyAllConnections() {
clearUnboundKeyCreator();
_waitForBetterTimer.cancel();
_waitForReceivedTimer.cancel();
@ -294,67 +265,20 @@ void ConnectionPrivate::destroyAllConnections() {
_connection = nullptr;
}
ConnectionPrivate::ConnectionPrivate(
not_null<Instance*> instance,
not_null<QThread*> thread,
not_null<Connection*> owner,
std::shared_ptr<SessionData> data,
ShiftedDcId shiftedDcId)
: QObject(nullptr)
, _instance(instance)
, _owner(owner)
, _shiftedDcId(shiftedDcId)
, _realDcType(_instance->dcOptions()->dcType(_shiftedDcId))
, _currentDcType(_realDcType)
, _state(DisconnectedState)
, _retryTimer(thread, [=] { retryByTimer(); })
, _oldConnectionTimer(thread, [=] { markConnectionOld(); })
, _waitForConnectedTimer(thread, [=] { waitConnectedFailed(); })
, _waitForReceivedTimer(thread, [=] { waitReceivedFailed(); })
, _waitForBetterTimer(thread, [=] { waitBetterFailed(); })
, _waitForReceived(kMinReceiveTimeout)
, _waitForConnected(kMinConnectedTimeout)
, _pingSender(thread, [=] { sendPingByTimer(); })
, _checkSentRequestsTimer(thread, [=] { checkSentRequests(); })
, _sessionData(std::move(data)) {
Expects(_shiftedDcId != 0);
moveToThread(thread);
connect(thread, &QThread::started, this, [=] {
_checkSentRequestsTimer.callEach(kCheckSentRequestsEach);
connectToServer();
});
connect(thread, &QThread::finished, this, [=] { finishAndDestroy(); });
connect(_sessionData->owner(), SIGNAL(authKeyChanged()), this, SLOT(updateAuthKey()), Qt::QueuedConnection);
connect(_sessionData->owner(), SIGNAL(needToRestart()), this, SLOT(restartNow()), Qt::QueuedConnection);
connect(_sessionData->owner(), SIGNAL(needToSend()), this, SLOT(tryToSend()), Qt::QueuedConnection);
connect(_sessionData->owner(), SIGNAL(needToPing()), this, SLOT(onPingSendForce()), Qt::QueuedConnection);
void Connection::cdnConfigChanged() {
connectToServer(true);
}
ConnectionPrivate::~ConnectionPrivate() {
releaseKeyCreationOnFail();
Expects(_finished);
Expects(!_connection);
Expects(_testConnections.empty());
}
void ConnectionPrivate::onCDNConfigLoaded() {
restart();
}
int32 ConnectionPrivate::getShiftedDcId() const {
int32 Connection::getShiftedDcId() const {
return _shiftedDcId;
}
void ConnectionPrivate::dcOptionsChanged() {
void Connection::dcOptionsChanged() {
_retryTimeout = 1;
connectToServer(true);
}
int32 ConnectionPrivate::getState() const {
int32 Connection::getState() const {
QReadLocker lock(&_stateMutex);
int32 result = _state;
if (_state < 0) {
@ -368,7 +292,7 @@ int32 ConnectionPrivate::getState() const {
return result;
}
QString ConnectionPrivate::transport() const {
QString Connection::transport() const {
QReadLocker lock(&_stateMutex);
if (!_connection || (_state < 0)) {
return QString();
@ -378,8 +302,8 @@ QString ConnectionPrivate::transport() const {
return _connection->transport();
}
bool ConnectionPrivate::setState(int32 state, int32 ifState) {
if (ifState != Connection::UpdateAlways) {
bool Connection::setState(int state, int ifState) {
if (ifState != kUpdateStateAlways) {
QReadLocker lock(&_stateMutex);
if (_state != ifState) {
return false;
@ -402,7 +326,7 @@ bool ConnectionPrivate::setState(int32 state, int32 ifState) {
return true;
}
void ConnectionPrivate::resetSession() {
void Connection::resetSession() {
MTP_LOG(_shiftedDcId, ("Resetting session!"));
_needSessionReset = false;
@ -412,7 +336,7 @@ void ConnectionPrivate::resetSession() {
_sessionData->queueResetDone();
}
void ConnectionPrivate::changeSessionId() {
void Connection::changeSessionId() {
auto sessionId = _sessionId;
do {
sessionId = openssl::RandomValue<uint64>();
@ -429,13 +353,13 @@ void ConnectionPrivate::changeSessionId() {
_receivedMessageIds.clear();
}
uint32 ConnectionPrivate::nextRequestSeqNumber(bool needAck) {
uint32 Connection::nextRequestSeqNumber(bool needAck) {
const auto result = _messagesCounter;
_messagesCounter += (needAck ? 1 : 0);
return result * 2 + (needAck ? 1 : 0);
}
bool ConnectionPrivate::realDcTypeChanged() {
bool Connection::realDcTypeChanged() {
const auto now = _instance->dcOptions()->dcType(_shiftedDcId);
if (_realDcType == now) {
return false;
@ -444,7 +368,7 @@ bool ConnectionPrivate::realDcTypeChanged() {
return true;
}
bool ConnectionPrivate::markSessionAsStarted() {
bool Connection::markSessionAsStarted() {
if (_sessionMarkedAsStarted) {
return false;
}
@ -452,7 +376,7 @@ bool ConnectionPrivate::markSessionAsStarted() {
return true;
}
mtpMsgId ConnectionPrivate::prepareToSend(
mtpMsgId Connection::prepareToSend(
SerializedRequest &request,
mtpMsgId currentLastId,
bool forceNewMsgId) {
@ -477,7 +401,7 @@ mtpMsgId ConnectionPrivate::prepareToSend(
return currentLastId;
}
mtpMsgId ConnectionPrivate::replaceMsgId(SerializedRequest &request, mtpMsgId newId) {
mtpMsgId Connection::replaceMsgId(SerializedRequest &request, mtpMsgId newId) {
Expects(request->size() > 8);
const auto oldMsgId = request.getMsgId();
@ -535,7 +459,7 @@ mtpMsgId ConnectionPrivate::replaceMsgId(SerializedRequest &request, mtpMsgId ne
return newId;
}
mtpMsgId ConnectionPrivate::placeToContainer(
mtpMsgId Connection::placeToContainer(
SerializedRequest &toSendRequest,
mtpMsgId &bigMsgId,
bool forceNewMsgId,
@ -554,7 +478,7 @@ mtpMsgId ConnectionPrivate::placeToContainer(
return msgId;
}
void ConnectionPrivate::tryToSend() {
void Connection::tryToSend() {
if (!_connection || !_keyId) {
return;
}
@ -920,7 +844,7 @@ void ConnectionPrivate::tryToSend() {
sendSecureRequest(std::move(toSendRequest), needAnyResponse);
}
void ConnectionPrivate::retryByTimer() {
void Connection::retryByTimer() {
if (_retryTimeout < 3) {
++_retryTimeout;
} else if (_retryTimeout == 3) {
@ -931,18 +855,14 @@ void ConnectionPrivate::retryByTimer() {
connectToServer();
}
void ConnectionPrivate::restartNow() {
void Connection::restartNow() {
_retryTimeout = 1;
_retryTimer.cancel();
restart();
}
void ConnectionPrivate::connectToServer(bool afterConfig) {
if (_finished) {
DEBUG_LOG(("MTP Error: "
"connectToServer() called for finished connection!"));
return;
} else if (afterConfig && (!_testConnections.empty() || _connection)) {
void Connection::connectToServer(bool afterConfig) {
if (afterConfig && (!_testConnections.empty() || _connection)) {
return;
}
@ -1046,7 +966,7 @@ void ConnectionPrivate::connectToServer(bool afterConfig) {
_waitForConnectedTimer.callOnce(_waitForConnected);
}
void ConnectionPrivate::restart() {
void Connection::restart() {
DEBUG_LOG(("MTP Info: restarting Connection"));
_waitForReceivedTimer.cancel();
@ -1066,7 +986,7 @@ void ConnectionPrivate::restart() {
setState(-_retryTimeout);
}
void ConnectionPrivate::onSentSome(uint64 size) {
void Connection::onSentSome(uint64 size) {
if (!_waitForReceivedTimer.isActive()) {
auto remain = static_cast<uint64>(_waitForReceived);
if (!_oldConnection) {
@ -1087,7 +1007,7 @@ void ConnectionPrivate::onSentSome(uint64 size) {
if (!_firstSentAt) _firstSentAt = crl::now();
}
void ConnectionPrivate::onReceivedSome() {
void Connection::onReceivedSome() {
if (_oldConnection) {
_oldConnection = false;
DEBUG_LOG(("This connection marked as not old!"));
@ -1105,13 +1025,13 @@ void ConnectionPrivate::onReceivedSome() {
}
}
void ConnectionPrivate::markConnectionOld() {
void Connection::markConnectionOld() {
_oldConnection = true;
_waitForReceived = kMinReceiveTimeout;
DEBUG_LOG(("This connection marked as old! _waitForReceived now %1ms").arg(_waitForReceived));
}
void ConnectionPrivate::sendPingByTimer() {
void Connection::sendPingByTimer() {
if (_pingId) {
// _pingSendAt: when to send next ping (lastPingAt + kPingSendAfter)
// could be equal to zero.
@ -1130,7 +1050,7 @@ void ConnectionPrivate::sendPingByTimer() {
}
}
void ConnectionPrivate::onPingSendForce() {
void Connection::sendPingForce() {
if (!_pingId) {
_pingSendAt = 0;
DEBUG_LOG(("Will send ping!"));
@ -1138,7 +1058,7 @@ void ConnectionPrivate::onPingSendForce() {
}
}
void ConnectionPrivate::waitReceivedFailed() {
void Connection::waitReceivedFailed() {
Expects(_connectionOptions != nullptr);
if (!_connectionOptions->useTcp) {
@ -1158,7 +1078,7 @@ void ConnectionPrivate::waitReceivedFailed() {
InvokeQueued(this, [=] { connectToServer(); });
}
void ConnectionPrivate::waitConnectedFailed() {
void Connection::waitConnectedFailed() {
DEBUG_LOG(("MTP Info: can't connect in %1ms").arg(_waitForConnected));
auto maxTimeout = kMaxConnectedTimeout;
for (const auto &connection : _testConnections) {
@ -1174,46 +1094,29 @@ void ConnectionPrivate::waitConnectedFailed() {
InvokeQueued(this, [=] { connectToServer(); });
}
void ConnectionPrivate::waitBetterFailed() {
void Connection::waitBetterFailed() {
confirmBestConnection();
}
void ConnectionPrivate::connectingTimedOut() {
void Connection::connectingTimedOut() {
for (const auto &connection : _testConnections) {
connection.data->timedOut();
}
doDisconnect();
}
void ConnectionPrivate::doDisconnect() {
void Connection::doDisconnect() {
destroyAllConnections();
setState(DisconnectedState);
}
void ConnectionPrivate::finishAndDestroy() {
doDisconnect();
_finished = true;
const auto connection = _owner;
const auto instance = _instance;
InvokeQueued(instance, [=] {
instance->connectionFinished(connection);
});
deleteLater();
}
void ConnectionPrivate::requestCDNConfig() {
connect(
_instance,
SIGNAL(cdnConfigLoaded()),
this,
SLOT(onCDNConfigLoaded()),
Qt::UniqueConnection);
void Connection::requestCDNConfig() {
InvokeQueued(_instance, [instance = _instance] {
instance->requestCDNConfig();
});
}
void ConnectionPrivate::handleReceived() {
void Connection::handleReceived() {
Expects(_encryptionKey != nullptr);
onReceivedSome();
@ -1413,7 +1316,7 @@ void ConnectionPrivate::handleReceived() {
}
}
ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(
Connection::HandleResult Connection::handleOneReceived(
const mtpPrime *from,
const mtpPrime *end,
uint64 msgId,
@ -1931,7 +1834,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(
return HandleResult::Success;
}
ConnectionPrivate::HandleResult ConnectionPrivate::handleBindResponse(
Connection::HandleResult Connection::handleBindResponse(
mtpMsgId requestMsgId,
const mtpBuffer &response) {
if (!_keyCreator || !_bindMsgId || _bindMsgId != requestMsgId) {
@ -1959,7 +1862,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleBindResponse(
Unexpected("Result of BoundKeyCreator::handleBindResponse.");
}
mtpBuffer ConnectionPrivate::ungzip(const mtpPrime *from, const mtpPrime *end) const {
mtpBuffer Connection::ungzip(const mtpPrime *from, const mtpPrime *end) const {
mtpBuffer result; // * 4 because of mtpPrime type
result.resize(0);
@ -2011,7 +1914,7 @@ mtpBuffer ConnectionPrivate::ungzip(const mtpPrime *from, const mtpPrime *end) c
return result;
}
bool ConnectionPrivate::requestsFixTimeSalt(const QVector<MTPlong> &ids, int32 serverTime, uint64 serverSalt) {
bool Connection::requestsFixTimeSalt(const QVector<MTPlong> &ids, int32 serverTime, uint64 serverSalt) {
uint32 idsCount = ids.size();
for (uint32 i = 0; i < idsCount; ++i) {
@ -2026,7 +1929,7 @@ bool ConnectionPrivate::requestsFixTimeSalt(const QVector<MTPlong> &ids, int32 s
return false;
}
void ConnectionPrivate::requestsAcked(const QVector<MTPlong> &ids, bool byResponse) {
void Connection::requestsAcked(const QVector<MTPlong> &ids, bool byResponse) {
uint32 idsCount = ids.size();
DEBUG_LOG(("Message Info: requests acked, ids %1").arg(LogIdsVector(ids)));
@ -2120,7 +2023,7 @@ void ConnectionPrivate::requestsAcked(const QVector<MTPlong> &ids, bool byRespon
}
}
void ConnectionPrivate::handleMsgsStates(const QVector<MTPlong> &ids, const QByteArray &states, QVector<MTPlong> &acked) {
void Connection::handleMsgsStates(const QVector<MTPlong> &ids, const QByteArray &states, QVector<MTPlong> &acked) {
uint32 idsCount = ids.size();
if (!idsCount) {
DEBUG_LOG(("Message Info: void ids vector in handleMsgsStates()"));
@ -2163,7 +2066,7 @@ void ConnectionPrivate::handleMsgsStates(const QVector<MTPlong> &ids, const QByt
}
}
void ConnectionPrivate::clearSpecialMsgId(mtpMsgId msgId) {
void Connection::clearSpecialMsgId(mtpMsgId msgId) {
if (msgId == _pingMsgId) {
_pingMsgId = 0;
_pingId = 0;
@ -2172,7 +2075,7 @@ void ConnectionPrivate::clearSpecialMsgId(mtpMsgId msgId) {
}
}
void ConnectionPrivate::resend(
void Connection::resend(
mtpMsgId msgId,
crl::time msCanWait,
bool forceContainer) {
@ -2211,7 +2114,7 @@ void ConnectionPrivate::resend(
}
}
void ConnectionPrivate::resendAll() {
void Connection::resendAll() {
auto toResend = std::vector<mtpMsgId>();
auto lock = QReadLocker(_sessionData->haveSentMutex());
@ -2230,7 +2133,7 @@ void ConnectionPrivate::resendAll() {
_sessionData->queueSendAnything();
}
void ConnectionPrivate::onConnected(
void Connection::onConnected(
not_null<AbstractConnection*> connection) {
disconnect(connection, &AbstractConnection::connected, nullptr, nullptr);
if (!connection->isConnected()) {
@ -2264,7 +2167,7 @@ void ConnectionPrivate::onConnected(
}
}
void ConnectionPrivate::onDisconnected(
void Connection::onDisconnected(
not_null<AbstractConnection*> connection) {
removeTestConnection(connection);
@ -2276,7 +2179,7 @@ void ConnectionPrivate::onDisconnected(
}
}
void ConnectionPrivate::confirmBestConnection() {
void Connection::confirmBestConnection() {
if (_waitForBetterTimer.isActive()) {
return;
}
@ -2300,7 +2203,7 @@ void ConnectionPrivate::confirmBestConnection() {
checkAuthKey();
}
void ConnectionPrivate::removeTestConnection(
void Connection::removeTestConnection(
not_null<AbstractConnection*> connection) {
_testConnections.erase(
ranges::remove(
@ -2310,7 +2213,7 @@ void ConnectionPrivate::removeTestConnection(
end(_testConnections));
}
void ConnectionPrivate::checkAuthKey() {
void Connection::checkAuthKey() {
if (_keyId) {
authKeyChecked();
} else if (_instance->isKeysDestroyer()) {
@ -2321,7 +2224,7 @@ void ConnectionPrivate::checkAuthKey() {
}
}
void ConnectionPrivate::updateAuthKey() {
void Connection::updateAuthKey() {
if (_instance->isKeysDestroyer() || _keyCreator || !_connection) {
return;
}
@ -2339,7 +2242,7 @@ void ConnectionPrivate::updateAuthKey() {
}
}
void ConnectionPrivate::setCurrentKeyId(uint64 newKeyId) {
void Connection::setCurrentKeyId(uint64 newKeyId) {
if (_keyId == newKeyId) {
return;
}
@ -2349,7 +2252,7 @@ void ConnectionPrivate::setCurrentKeyId(uint64 newKeyId) {
changeSessionId();
}
void ConnectionPrivate::applyAuthKey(AuthKeyPtr &&encryptionKey) {
void Connection::applyAuthKey(AuthKeyPtr &&encryptionKey) {
_encryptionKey = std::move(encryptionKey);
const auto newKeyId = _encryptionKey ? _encryptionKey->keyId() : 0;
if (_keyId) {
@ -2391,7 +2294,7 @@ void ConnectionPrivate::applyAuthKey(AuthKeyPtr &&encryptionKey) {
}
}
bool ConnectionPrivate::destroyOldEnoughPersistentKey() {
bool Connection::destroyOldEnoughPersistentKey() {
Expects(_keyCreator != nullptr);
const auto key = _keyCreator->bindPersistentKey();
@ -2410,7 +2313,7 @@ bool ConnectionPrivate::destroyOldEnoughPersistentKey() {
return true;
}
DcType ConnectionPrivate::tryAcquireKeyCreation() {
DcType Connection::tryAcquireKeyCreation() {
if (_keyCreator) {
return _currentDcType;
} else if (_instance->isKeysDestroyer()) {
@ -2484,7 +2387,7 @@ DcType ConnectionPrivate::tryAcquireKeyCreation() {
return forceUseRegular ? DcType::Regular : _realDcType;
}
void ConnectionPrivate::authKeyChecked() {
void Connection::authKeyChecked() {
connect(_connection, &AbstractConnection::receivedData, [=] {
handleReceived();
});
@ -2497,7 +2400,7 @@ void ConnectionPrivate::authKeyChecked() {
_sessionData->queueNeedToResumeAndSend();
}
void ConnectionPrivate::onError(
void Connection::onError(
not_null<AbstractConnection*> connection,
qint32 errorCode) {
if (errorCode == -429) {
@ -2517,7 +2420,7 @@ void ConnectionPrivate::onError(
}
}
void ConnectionPrivate::handleError(int errorCode) {
void Connection::handleError(int errorCode) {
destroyAllConnections();
_waitForConnectedTimer.cancel();
@ -2529,7 +2432,7 @@ void ConnectionPrivate::handleError(int errorCode) {
}
}
void ConnectionPrivate::destroyTemporaryKey() {
void Connection::destroyTemporaryKey() {
if (_instance->isKeysDestroyer()) {
LOG(("MTP Info: -404 error received in destroyer %1, assuming key was destroyed.").arg(_shiftedDcId));
_instance->keyWasPossiblyDestroyed(_shiftedDcId);
@ -2544,7 +2447,7 @@ void ConnectionPrivate::destroyTemporaryKey() {
restart();
}
bool ConnectionPrivate::sendSecureRequest(
bool Connection::sendSecureRequest(
SerializedRequest &&request,
bool needAnyResponse) {
#ifdef TDESKTOP_MTPROTO_OLD
@ -2628,7 +2531,7 @@ bool ConnectionPrivate::sendSecureRequest(
return true;
}
mtpRequestId ConnectionPrivate::wasSent(mtpMsgId msgId) const {
mtpRequestId Connection::wasSent(mtpMsgId msgId) const {
if (msgId == _pingMsgId || msgId == _bindMsgId) {
return mtpRequestId(0xFFFFFFFF);
}
@ -2651,13 +2554,13 @@ mtpRequestId ConnectionPrivate::wasSent(mtpMsgId msgId) const {
return 0;
}
void ConnectionPrivate::clearUnboundKeyCreator() {
void Connection::clearUnboundKeyCreator() {
if (_keyCreator) {
_keyCreator->stop();
}
}
void ConnectionPrivate::releaseKeyCreationOnFail() {
void Connection::releaseKeyCreationOnFail() {
if (!_keyCreator) {
return;
}
@ -2665,8 +2568,5 @@ void ConnectionPrivate::releaseKeyCreationOnFail() {
_sessionData->releaseKeyCreationOnFail();
}
void ConnectionPrivate::stop() {
}
} // namespace internal
} // namespace MTP

View File

@ -29,72 +29,34 @@ class Instance;
namespace internal {
class AbstractConnection;
class ConnectionPrivate;
class SessionData;
class RSAPublicKey;
struct ConnectionOptions;
class Connection {
class Connection : public QObject {
public:
enum ConnectionType {
TcpConnection,
HttpConnection
};
Connection(not_null<Instance*> instance);
~Connection();
void start(std::shared_ptr<SessionData> data, ShiftedDcId shiftedDcId);
void kill();
void waitTillFinish();
static const int UpdateAlways = 666;
int32 state() const;
QString transport() const;
private:
not_null<Instance*> _instance;
std::unique_ptr<QThread> _thread;
ConnectionPrivate *_private = nullptr;
rpl::lifetime _lifetime;
};
class ConnectionPrivate : public QObject {
Q_OBJECT
public:
ConnectionPrivate(
Connection(
not_null<Instance*> instance,
not_null<QThread*> thread,
not_null<Connection*> owner,
std::shared_ptr<SessionData> data,
ShiftedDcId shiftedDcId);
~ConnectionPrivate();
~Connection();
void stop();
int32 getShiftedDcId() const;
[[nodiscard]] int32 getShiftedDcId() const;
void dcOptionsChanged();
void cdnConfigChanged();
int32 getState() const;
QString transport() const;
public slots:
void restartNow();
void onPingSendForce();
// Sessions signals, when we need to send something
void tryToSend();
[[nodiscard]] int32 getState() const;
[[nodiscard]] QString transport() const;
void updateAuthKey();
void onCDNConfigLoaded();
void restartNow();
void sendPingForce();
void tryToSend();
private:
static constexpr auto kUpdateStateAlways = 666;
struct TestConnection {
ConnectionPointer data;
int priority = 0;
@ -113,7 +75,6 @@ private:
void connectingTimedOut();
void doDisconnect();
void restart();
void finishAndDestroy();
void requestCDNConfig();
void handleError(int errorCode);
void onError(
@ -167,7 +128,7 @@ private:
void handleMsgsStates(const QVector<MTPlong> &ids, const QByteArray &states, QVector<MTPlong> &acked);
// _sessionDataMutex must be locked for read.
bool setState(int32 state, int32 ifState = Connection::UpdateAlways);
bool setState(int state, int ifState = kUpdateStateAlways);
void appendTestConnection(
DcOptions::Variants::Protocol protocol,
@ -206,13 +167,12 @@ private:
[[nodiscard]] bool realDcTypeChanged();
const not_null<Instance*> _instance;
const not_null<Connection*> _owner;
const ShiftedDcId _shiftedDcId = 0;
DcType _realDcType = DcType();
DcType _currentDcType = DcType();
mutable QReadWriteLock _stateMutex;
int32 _state = DisconnectedState;
int _state = DisconnectedState;
bool _needSessionReset = false;
@ -241,8 +201,6 @@ private:
base::Timer _pingSender;
base::Timer _checkSentRequestsTimer;
bool _finished = false;
std::shared_ptr<SessionData> _sessionData;
std::unique_ptr<ConnectionOptions> _connectionOptions;
AuthKeyPtr _encryptionKey;

View File

@ -68,6 +68,11 @@ public:
: _that(that)
, _lock(&_that->_useThroughLockers) {
}
void unlock() {
_lock.unlock();
}
~WriteLocker() {
_that->computeCdnDcIds();
}
@ -84,6 +89,10 @@ public:
: _lock(&that->_useThroughLockers) {
}
void unlock() {
_lock.unlock();
}
private:
QReadLocker _lock;
@ -517,6 +526,10 @@ rpl::producer<DcId> DcOptions::changed() const {
return _changed.events();
}
rpl::producer<> DcOptions::cdnConfigChanged() const {
return _cdnConfigChanged.events();
}
std::vector<DcId> DcOptions::configEnumDcIds() const {
auto result = std::vector<DcId>();
{
@ -553,20 +566,23 @@ DcType DcOptions::dcType(ShiftedDcId shiftedDcId) const {
void DcOptions::setCDNConfig(const MTPDcdnConfig &config) {
WriteLocker lock(this);
_cdnPublicKeys.clear();
for_const (auto &publicKey, config.vpublic_keys().v) {
Expects(publicKey.type() == mtpc_cdnPublicKey);
const auto &keyData = publicKey.c_cdnPublicKey();
const auto keyBytes = bytes::make_span(keyData.vpublic_key().v);
auto key = internal::RSAPublicKey(keyBytes);
if (key.valid()) {
_cdnPublicKeys[keyData.vdc_id().v].emplace(
key.fingerprint(),
std::move(key));
} else {
LOG(("MTP Error: could not read this public RSA key:"));
LOG((qs(keyData.vpublic_key())));
}
for (const auto &key : config.vpublic_keys().v) {
key.match([&](const MTPDcdnPublicKey &data) {
const auto keyBytes = bytes::make_span(data.vpublic_key().v);
auto key = internal::RSAPublicKey(keyBytes);
if (key.valid()) {
_cdnPublicKeys[data.vdc_id().v].emplace(
key.fingerprint(),
std::move(key));
} else {
LOG(("MTP Error: could not read this public RSA key:"));
LOG((qs(data.vpublic_key())));
}
});
}
lock.unlock();
_cdnConfigChanged.fire({});
}
bool DcOptions::hasCDNKeysForDc(DcId dcId) const {

View File

@ -67,6 +67,7 @@ public:
QByteArray serialize() const;
[[nodiscard]] rpl::producer<DcId> changed() const;
[[nodiscard]] rpl::producer<> cdnConfigChanged() const;
void setFromList(const MTPVector<MTPDcOption> &options);
void addFromList(const MTPVector<MTPDcOption> &options);
void addFromOther(DcOptions &&options);
@ -141,6 +142,7 @@ private:
mutable QReadWriteLock _useThroughLockers;
rpl::event_stream<DcId> _changed;
rpl::event_stream<> _cdnConfigChanged;
// True when we have overriden options from a .tdesktop-endpoints file.
bool _immutable = false;

View File

@ -71,6 +71,7 @@ public:
[[nodiscard]] rpl::producer<DcId> dcTemporaryKeyChanged() const;
[[nodiscard]] AuthKeysList getKeysForWrite() const;
void addKeysForDestroy(AuthKeysList &&keys);
[[nodiscard]] rpl::producer<> allKeysDestroyed() const;
[[nodiscard]] not_null<DcOptions*> dcOptions();
@ -106,10 +107,6 @@ public:
void removeDc(ShiftedDcId shiftedDcId);
void unpaused();
void queueQuittingConnection(
std::unique_ptr<Connection> &&connection);
void connectionFinished(not_null<Connection*> connection);
void sendRequest(
mtpRequestId requestId,
SerializedRequest &&request,
@ -216,8 +213,6 @@ private:
base::flat_map<ShiftedDcId, std::unique_ptr<Session>> _sessions;
std::vector<std::unique_ptr<Session>> _sessionsToDestroy;
std::vector<std::unique_ptr<Connection>> _connectionsToDestroy;
std::unique_ptr<ConfigLoader> _configLoader;
std::unique_ptr<DomainResolver> _domainResolver;
std::unique_ptr<SpecialConfigRequest> _httpUnixtimeLoader;
@ -229,6 +224,8 @@ private:
base::flat_map<DcId, AuthKeyPtr> _keysForWrite;
base::flat_map<ShiftedDcId, mtpRequestId> _logoutGuestRequestIds;
rpl::event_stream<> _allKeysDestroyed;
// holds dcWithShift for request to this dc or -dc for request to main dc
std::map<mtpRequestId, ShiftedDcId> _requestsByDc;
mutable QMutex _requestByDcLock;
@ -480,13 +477,10 @@ void Instance::Private::requestCDNConfig() {
MTPhelp_GetCdnConfig()
).done([this](const MTPCdnConfig &result) {
_cdnConfigLoadRequestId = 0;
Expects(result.type() == mtpc_cdnConfig);
dcOptions()->setCDNConfig(result.c_cdnConfig());
result.match([&](const MTPDcdnConfig &data) {
dcOptions()->setCDNConfig(data);
});
Local::writeSettings();
emit _instance->cdnConfigLoaded();
}).send();
}
@ -759,6 +753,10 @@ void Instance::Private::addKeysForDestroy(AuthKeysList &&keys) {
}
}
rpl::producer<> Instance::Private::allKeysDestroyed() const {
return _allKeysDestroyed.events();
}
not_null<DcOptions*> Instance::Private::dcOptions() {
return _dcOptions;
}
@ -777,22 +775,6 @@ void Instance::Private::unpaused() {
}
}
void Instance::Private::queueQuittingConnection(
std::unique_ptr<Connection> &&connection) {
_connectionsToDestroy.push_back(std::move(connection));
}
void Instance::Private::connectionFinished(
not_null<Connection*> connection) {
const auto i = ranges::find(
_connectionsToDestroy,
connection.get(),
&std::unique_ptr<Connection>::get);
if (i != _connectionsToDestroy.end()) {
_connectionsToDestroy.erase(i);
}
}
void Instance::Private::configLoadDone(const MTPConfig &result) {
Expects(result.type() == mtpc_config);
@ -1582,7 +1564,7 @@ void Instance::Private::completedKeyDestroy(ShiftedDcId shiftedDcId) {
_keysForWrite.erase(shiftedDcId);
killSession(shiftedDcId);
if (_dcenters.empty()) {
emit _instance->allKeysDestroyed();
_allKeysDestroyed.fire({});
}
}
@ -1674,6 +1656,10 @@ QString Instance::langPackName() const {
return Lang::Current().langPackName();
}
rpl::producer<> Instance::allKeysDestroyed() const {
return _private->allKeysDestroyed();
}
void Instance::requestConfig() {
_private->requestConfig();
}
@ -1698,10 +1684,6 @@ void Instance::requestCDNConfig() {
_private->requestCDNConfig();
}
void Instance::connectionFinished(not_null<Connection*> connection) {
_private->connectionFinished(connection);
}
void Instance::restart() {
_private->restart();
}
@ -1784,11 +1766,6 @@ void Instance::unpaused() {
_private->unpaused();
}
void Instance::queueQuittingConnection(
std::unique_ptr<Connection> &&connection) {
_private->queueQuittingConnection(std::move(connection));
}
void Instance::setUpdatesHandler(RPCDoneHandlerPtr onDone) {
_private->setUpdatesHandler(onDone);
}

View File

@ -15,7 +15,6 @@ namespace internal {
class Dcenter;
class Session;
class Connection;
[[nodiscard]] int GetNextRequestId();
@ -61,6 +60,8 @@ public:
[[nodiscard]] QString cloudLangCode() const;
[[nodiscard]] QString langPackName() const;
[[nodiscard]] rpl::producer<> allKeysDestroyed() const;
// Thread-safe.
[[nodiscard]] QString deviceModel() const;
[[nodiscard]] QString systemVersion() const;
@ -90,8 +91,6 @@ public:
void unpaused();
void queueQuittingConnection(std::unique_ptr<internal::Connection> &&connection);
void setUpdatesHandler(RPCDoneHandlerPtr onDone);
void setGlobalFailHandler(RPCFailHandlerPtr onFail);
void setStateChangedHandler(Fn<void(ShiftedDcId shiftedDcId, int32 state)> handler);
@ -126,8 +125,6 @@ public:
void syncHttpUnixtime();
void connectionFinished(not_null<internal::Connection*> connection);
void sendAnything(ShiftedDcId shiftedDcId = 0, crl::time msCanWait = 0);
template <typename Request>
@ -199,8 +196,6 @@ public:
}
signals:
void cdnConfigLoaded();
void allKeysDestroyed();
void proxyDomainResolved(
QString host,
QStringList ips,

View File

@ -148,8 +148,7 @@ Session::Session(
not_null<Instance*> instance,
ShiftedDcId shiftedDcId,
not_null<Dcenter*> dc)
: QObject()
, _instance(instance)
: _instance(instance)
, _shiftedDcId(shiftedDcId)
, _dc(dc)
, _data(std::make_shared<SessionData>(this))
@ -157,6 +156,19 @@ Session::Session(
_timeouter.callEach(1000);
refreshOptions();
watchDcKeyChanges();
watchDcOptionsChanges();
}
Session::~Session() {
Expects(!_connection);
Expects(!_thread);
if (_myKeyCreation != CreatingKeyType::None) {
releaseKeyCreationOnFail();
}
for (const auto &thread : _destroyingThreads) {
thread->wait();
}
}
void Session::watchDcKeyChanges() {
@ -166,13 +178,59 @@ void Session::watchDcKeyChanges() {
}) | rpl::start_with_next([=] {
DEBUG_LOG(("AuthKey Info: Session::authKeyCreatedForDC slot, "
"emitting authKeyChanged(), dcWithShift %1").arg(_shiftedDcId));
emit authKeyChanged();
if (const auto connection = _connection) {
InvokeQueued(connection, [=] {
connection->updateAuthKey();
});
}
}, _lifetime);
}
void Session::watchDcOptionsChanges() {
_instance->dcOptions()->changed(
) | rpl::filter([=](DcId dcId) {
return (BareDcId(_shiftedDcId) == dcId) && (_connection != nullptr);
}) | rpl::start_with_next([=] {
InvokeQueued(_connection, [connection = _connection] {
connection->dcOptionsChanged();
});
}, _lifetime);
if (_instance->dcOptions()->dcType(_shiftedDcId) == DcType::Cdn) {
_instance->dcOptions()->cdnConfigChanged(
) | rpl::filter([=] {
return (_connection != nullptr);
}) | rpl::start_with_next([=] {
InvokeQueued(_connection, [connection = _connection] {
connection->cdnConfigChanged();
});
}, _lifetime);
}
}
void Session::start() {
_connection = std::make_unique<Connection>(_instance);
_connection->start(_data, _shiftedDcId);
killConnection();
_thread = std::make_unique<QThread>();
const auto thread = _thread.get();
connect(thread, &QThread::finished, [=] {
InvokeQueued(this, [=] {
const auto i = ranges::find(
_destroyingThreads,
thread,
&std::unique_ptr<QThread>::get);
if (i != _destroyingThreads.end()) {
_destroyingThreads.erase(i);
}
});
});
_connection = new Connection(
_instance,
thread,
_data,
_shiftedDcId);
thread->start();
}
bool Session::rpcErrorOccured(
@ -188,7 +246,11 @@ void Session::restart() {
return;
}
refreshOptions();
emit needToRestart();
if (const auto connection = _connection) {
InvokeQueued(connection, [=] {
connection->restartNow();
});
}
}
void Session::refreshOptions() {
@ -221,14 +283,11 @@ void Session::reInitConnection() {
void Session::stop() {
if (_killed) {
DEBUG_LOG(("Session Error: can't kill a killed session"));
DEBUG_LOG(("Session Error: can't stop a killed session"));
return;
}
DEBUG_LOG(("Session Info: stopping session dcWithShift %1").arg(_shiftedDcId));
if (_connection) {
_connection->kill();
_instance->queueQuittingConnection(std::move(_connection));
}
killConnection();
}
void Session::kill() {
@ -286,12 +345,15 @@ void Session::needToResumeAndSend() {
DEBUG_LOG(("Session Info: resuming session dcWithShift %1").arg(_shiftedDcId));
start();
}
if (_ping) {
_ping = false;
emit needToPing();
} else {
emit needToSend();
}
const auto connection = _connection;
const auto ping = base::take(_ping);
InvokeQueued(connection, [=] {
if (ping) {
connection->sendPingForce();
} else {
connection->tryToSend();
}
});
}
void Session::connectionStateChange(int newState) {
@ -323,7 +385,7 @@ int32 Session::requestState(mtpRequestId requestId) const {
bool connected = false;
if (_connection) {
int32 s = _connection->state();
const auto s = _connection->getState();
if (s == ConnectedState) {
connected = true;
} else if (s == ConnectingState || s == DisconnectedState) {
@ -352,7 +414,7 @@ int32 Session::getState() const {
int32 result = -86400000;
if (_connection) {
int32 s = _connection->state();
const auto s = _connection->getState();
if (s == ConnectedState) {
return s;
} else if (s == ConnectingState || s == DisconnectedState) {
@ -448,7 +510,8 @@ void Session::releaseKeyCreationOnFail() {
}
void Session::notifyDcConnectionInited() {
DEBUG_LOG(("MTP Info: emitting MTProtoDC::connectionWasInited(), dcWithShift %1").arg(_shiftedDcId));
DEBUG_LOG(("MTP Info: MTProtoDC::connectionWasInited(), dcWithShift %1"
).arg(_shiftedDcId));
_dc->setConnectionInited();
}
@ -514,11 +577,19 @@ void Session::tryToReceive() {
}
}
Session::~Session() {
if (_myKeyCreation != CreatingKeyType::None) {
releaseKeyCreationOnFail();
void Session::killConnection() {
Expects(!_thread || _connection);
if (!_connection) {
return;
}
Assert(_connection == nullptr);
base::take(_connection)->deleteLater();
_destroyingThreads.push_back(base::take(_thread));
_destroyingThreads.back()->quit();
Ensures(_connection == nullptr);
Ensures(_thread == nullptr);
}
} // namespace internal

View File

@ -56,7 +56,7 @@ struct ConnectionOptions {
class Session;
class SessionData {
public:
SessionData(not_null<Session*> creator) : _owner(creator) {
explicit SessionData(not_null<Session*> creator) : _owner(creator) {
}
void notifyConnectionInited(const ConnectionOptions &options);
@ -92,11 +92,6 @@ public:
return _receivedUpdates;
}
// Warning! Valid only in constructor, _owner is guaranteed != null.
[[nodiscard]] not_null<Session*> owner() {
return _owner;
}
// Connection -> Session interface.
void queueTryToReceive();
void queueNeedToResumeAndSend();
@ -139,8 +134,6 @@ private:
};
class Session : public QObject {
Q_OBJECT
public:
// Main thread.
Session(
@ -180,8 +173,8 @@ public:
void ping();
void cancel(mtpRequestId requestId, mtpMsgId msgId);
int32 requestState(mtpRequestId requestId) const;
int32 getState() const;
int requestState(mtpRequestId requestId) const;
int getState() const;
QString transport() const;
void tryToReceive();
@ -190,23 +183,26 @@ public:
void resetDone();
void sendAnything(crl::time msCanWait = 0);
signals:
void authKeyChanged();
void needToSend();
void needToPing();
void needToRestart();
private:
void watchDcKeyChanges();
void watchDcOptionsChanges();
bool rpcErrorOccured(mtpRequestId requestId, const RPCFailHandlerPtr &onFail, const RPCError &err);
void killConnection();
bool rpcErrorOccured(
mtpRequestId requestId,
const RPCFailHandlerPtr &onFail,
const RPCError &err);
const not_null<Instance*> _instance;
const ShiftedDcId _shiftedDcId = 0;
const not_null<Dcenter*> _dc;
const std::shared_ptr<SessionData> _data;
std::unique_ptr<Connection> _connection;
std::unique_ptr<QThread> _thread;
std::vector<std::unique_ptr<QThread>> _destroyingThreads;
Connection *_connection = nullptr;
bool _killed = false;
bool _needToReceive = false;