Keep in sent container only important msgIds.

This commit is contained in:
John Preston 2019-12-02 13:49:33 +03:00
parent efaa3ba453
commit 3b703d7262
2 changed files with 23 additions and 18 deletions

View File

@ -249,20 +249,31 @@ void Connection::checkSentRequests() {
} }
void Connection::clearOldContainers() { void Connection::clearOldContainers() {
auto resent = false;
const auto now = crl::now(); const auto now = crl::now();
for (auto i = _sentContainers.begin(); i != _sentContainers.end();) { for (auto i = _sentContainers.begin(); i != _sentContainers.end();) {
if (now > i->second.sent + kSentContainerLives) { if (now > i->second.sent + kSentContainerLives) {
DEBUG_LOG(("MTP Info: Removing old container %1, " DEBUG_LOG(("MTP Info: Removing old container with resending %1, "
"sent: %2, now: %3, current unixtime: %4" "sent: %2, now: %3, current unixtime: %4"
).arg(i->first ).arg(i->first
).arg(i->second.sent ).arg(i->second.sent
).arg(now ).arg(now
).arg(base::unixtime::now())); ).arg(base::unixtime::now()));
const auto ids = std::move(i->second.messages);
i = _sentContainers.erase(i); i = _sentContainers.erase(i);
resent = resent || !ids.empty();
for (const auto innerMsgId : ids) {
resend(innerMsgId, -1, true);
}
} else { } else {
++i; ++i;
} }
} }
if (resent) {
_sessionData->queueNeedToResumeAndSend();
}
} }
void Connection::destroyAllConnections() { void Connection::destroyAllConnections() {
@ -464,7 +475,6 @@ mtpMsgId Connection::replaceMsgId(SerializedRequest &request, mtpMsgId newId) {
} }
mtpMsgId Connection::placeToContainer( mtpMsgId Connection::placeToContainer(
SentContainer &sentIdsWrap,
SerializedRequest &toSendRequest, SerializedRequest &toSendRequest,
mtpMsgId &bigMsgId, mtpMsgId &bigMsgId,
bool forceNewMsgId, bool forceNewMsgId,
@ -473,7 +483,6 @@ mtpMsgId Connection::placeToContainer(
if (msgId >= bigMsgId) { if (msgId >= bigMsgId) {
bigMsgId = base::unixtime::mtproto_msg_id(); bigMsgId = base::unixtime::mtproto_msg_id();
} }
sentIdsWrap.messages.push_back(msgId);
uint32 from = toSendRequest->size(), len = req.messageSize(); uint32 from = toSendRequest->size(), len = req.messageSize();
toSendRequest->resize(from + len); toSendRequest->resize(from + len);
@ -554,6 +563,8 @@ void Connection::tryToSend() {
resendRequest = SerializedRequest::Serialize(MTPMsgResendReq( resendRequest = SerializedRequest::Serialize(MTPMsgResendReq(
MTP_msg_resend_req(MTP_vector<MTPlong>( MTP_msg_resend_req(MTP_vector<MTPlong>(
base::take(_resendRequestData))))); base::take(_resendRequestData)))));
// Add to haveSent / _ackedIds, but don't add to requestMap.
resendRequest->requestId = GetNextRequestId();
} }
if (!_stateRequestData.empty()) { if (!_stateRequestData.empty()) {
auto ids = QVector<MTPlong>(); auto ids = QVector<MTPlong>();
@ -768,7 +779,6 @@ void Connection::tryToSend() {
if (bindDcKeyRequest) { if (bindDcKeyRequest) {
_bindMsgId = placeToContainer( _bindMsgId = placeToContainer(
sentIdsWrap,
toSendRequest, toSendRequest,
bigMsgId, bigMsgId,
false, false,
@ -777,22 +787,17 @@ void Connection::tryToSend() {
} }
if (pingRequest) { if (pingRequest) {
_pingMsgId = placeToContainer( _pingMsgId = placeToContainer(
sentIdsWrap,
toSendRequest, toSendRequest,
bigMsgId, bigMsgId,
forceNewMsgId, forceNewMsgId,
pingRequest); pingRequest);
needAnyResponse = true; needAnyResponse = true;
} }
if (resendRequest || stateRequest) {
needAnyResponse = true;
}
for (auto &[requestId, request] : toSend) { for (auto &[requestId, request] : toSend) {
const auto msgId = prepareToSend( const auto msgId = prepareToSend(
request, request,
bigMsgId, bigMsgId,
forceNewMsgId); forceNewMsgId);
sentIdsWrap.messages.push_back(msgId);
if (msgId >= bigMsgId) { if (msgId >= bigMsgId) {
bigMsgId = base::unixtime::mtproto_msg_id(); bigMsgId = base::unixtime::mtproto_msg_id();
} }
@ -816,9 +821,10 @@ void Connection::tryToSend() {
*(toSendRequest->data() + reqNeedsLayer + 3) += initSize; *(toSendRequest->data() + reqNeedsLayer + 3) += initSize;
added = true; added = true;
} }
Assert(!haveSent.contains(msgId)); Assert(!haveSent.contains(msgId));
haveSent.emplace(msgId, request); haveSent.emplace(msgId, request);
sentIdsWrap.messages.push_back(msgId);
needAnyResponse = true; needAnyResponse = true;
} else { } else {
_ackedIds.emplace(msgId, request->requestId); _ackedIds.emplace(msgId, request->requestId);
@ -832,25 +838,28 @@ void Connection::tryToSend() {
} }
if (stateRequest) { if (stateRequest) {
const auto msgId = placeToContainer( const auto msgId = placeToContainer(
sentIdsWrap,
toSendRequest, toSendRequest,
bigMsgId, bigMsgId,
forceNewMsgId, forceNewMsgId,
stateRequest); stateRequest);
Assert(!haveSent.contains(msgId)); Assert(!haveSent.contains(msgId));
haveSent.emplace(msgId, stateRequest); haveSent.emplace(msgId, stateRequest);
sentIdsWrap.messages.push_back(msgId);
needAnyResponse = true;
} }
if (resendRequest) { if (resendRequest) {
placeToContainer( const auto msgId = placeToContainer(
sentIdsWrap,
toSendRequest, toSendRequest,
bigMsgId, bigMsgId,
forceNewMsgId, forceNewMsgId,
resendRequest); resendRequest);
Assert(!haveSent.contains(msgId));
haveSent.emplace(msgId, resendRequest);
sentIdsWrap.messages.push_back(msgId);
needAnyResponse = true;
} }
if (ackRequest) { if (ackRequest) {
placeToContainer( placeToContainer(
sentIdsWrap,
toSendRequest, toSendRequest,
bigMsgId, bigMsgId,
forceNewMsgId, forceNewMsgId,
@ -858,7 +867,6 @@ void Connection::tryToSend() {
} }
if (httpWaitRequest) { if (httpWaitRequest) {
placeToContainer( placeToContainer(
sentIdsWrap,
toSendRequest, toSendRequest,
bigMsgId, bigMsgId,
forceNewMsgId, forceNewMsgId,
@ -1959,7 +1967,6 @@ void Connection::requestsAcked(const QVector<MTPlong> &ids, bool byResponse) {
DEBUG_LOG(("Message Info: requests acked, ids %1").arg(LogIdsVector(ids))); DEBUG_LOG(("Message Info: requests acked, ids %1").arg(LogIdsVector(ids)));
auto clearedBecauseTooOld = std::vector<RPCCallbackClear>();
QVector<MTPlong> toAckMore; QVector<MTPlong> toAckMore;
{ {
QWriteLocker locker2(_sessionData->haveSentMutex()); QWriteLocker locker2(_sessionData->haveSentMutex());
@ -2023,7 +2030,6 @@ void Connection::requestsAcked(const QVector<MTPlong> &ids, bool byResponse) {
auto ackedCount = _ackedIds.size(); auto ackedCount = _ackedIds.size();
if (ackedCount > kIdsBufferSize) { if (ackedCount > kIdsBufferSize) {
DEBUG_LOG(("Message Info: removing some old acked sent msgIds %1").arg(ackedCount - kIdsBufferSize)); DEBUG_LOG(("Message Info: removing some old acked sent msgIds %1").arg(ackedCount - kIdsBufferSize));
clearedBecauseTooOld.reserve(ackedCount - kIdsBufferSize);
while (ackedCount-- > kIdsBufferSize) { while (ackedCount-- > kIdsBufferSize) {
_ackedIds.erase(_ackedIds.begin()); _ackedIds.erase(_ackedIds.begin());
} }

View File

@ -106,7 +106,6 @@ private:
void clearOldContainers(); void clearOldContainers();
mtpMsgId placeToContainer( mtpMsgId placeToContainer(
SentContainer &sentIdsWrap,
details::SerializedRequest &toSendRequest, details::SerializedRequest &toSendRequest,
mtpMsgId &bigMsgId, mtpMsgId &bigMsgId,
bool forceNewMsgId, bool forceNewMsgId,